Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * parallel_slot.c
4 : * Parallel support for front-end parallel database connections
5 : *
6 : *
7 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994, Regents of the University of California
9 : *
10 : * src/fe_utils/parallel_slot.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #if defined(WIN32) && FD_SETSIZE < 1024
16 : #error FD_SETSIZE needs to have been increased
17 : #endif
18 :
19 : #include "postgres_fe.h"
20 :
21 : #include <sys/select.h>
22 :
23 : #include "common/logging.h"
24 : #include "fe_utils/cancel.h"
25 : #include "fe_utils/parallel_slot.h"
26 : #include "fe_utils/query_utils.h"
27 :
28 : #define ERRCODE_UNDEFINED_TABLE "42P01"
29 :
30 : static int select_loop(int maxFd, fd_set *workerset);
31 : static bool processQueryResult(ParallelSlot *slot, PGresult *result);
32 :
33 : /*
34 : * Process (and delete) a query result. Returns true if there's no problem,
35 : * false otherwise. It's up to the handler to decide what constitutes a
36 : * problem.
37 : */
38 : static bool
39 22638 : processQueryResult(ParallelSlot *slot, PGresult *result)
40 : {
41 : Assert(slot->handler != NULL);
42 :
43 : /* On failure, the handler should return NULL after freeing the result */
44 22638 : if (!slot->handler(result, slot->connection, slot->handler_context))
45 12 : return false;
46 :
47 : /* Ok, we have to free it ourself */
48 22626 : PQclear(result);
49 22626 : return true;
50 : }
51 :
52 : /*
53 : * Consume all the results generated for the given connection until
54 : * nothing remains. If at least one error is encountered, return false.
55 : * Note that this will block if the connection is busy.
56 : */
57 : static bool
58 406 : consumeQueryResult(ParallelSlot *slot)
59 : {
60 406 : bool ok = true;
61 : PGresult *result;
62 :
63 406 : SetCancelConn(slot->connection);
64 812 : while ((result = PQgetResult(slot->connection)) != NULL)
65 : {
66 406 : if (!processQueryResult(slot, result))
67 12 : ok = false;
68 : }
69 406 : ResetCancelConn();
70 406 : return ok;
71 : }
72 :
73 : /*
74 : * Wait until a file descriptor from the given set becomes readable.
75 : *
76 : * Returns the number of ready descriptors, or -1 on failure (including
77 : * getting a cancel request).
78 : */
79 : static int
80 22338 : select_loop(int maxFd, fd_set *workerset)
81 : {
82 : int i;
83 22338 : fd_set saveSet = *workerset;
84 :
85 22338 : if (CancelRequested)
86 0 : return -1;
87 :
88 : for (;;)
89 0 : {
90 : /*
91 : * On Windows, we need to check once in a while for cancel requests;
92 : * on other platforms we rely on select() returning when interrupted.
93 : */
94 : struct timeval *tvp;
95 : #ifdef WIN32
96 : struct timeval tv = {0, 1000000};
97 :
98 : tvp = &tv;
99 : #else
100 22338 : tvp = NULL;
101 : #endif
102 :
103 22338 : *workerset = saveSet;
104 22338 : i = select(maxFd + 1, workerset, NULL, NULL, tvp);
105 :
106 : #ifdef WIN32
107 : if (i == SOCKET_ERROR)
108 : {
109 : i = -1;
110 :
111 : if (WSAGetLastError() == WSAEINTR)
112 : errno = EINTR;
113 : }
114 : #endif
115 :
116 22338 : if (i < 0 && errno == EINTR)
117 0 : continue; /* ignore this */
118 22338 : if (i < 0 || CancelRequested)
119 0 : return -1; /* but not this */
120 22338 : if (i == 0)
121 0 : continue; /* timeout (Win32 only) */
122 22338 : break;
123 : }
124 :
125 22338 : return i;
126 : }
127 :
128 : /*
129 : * Return the offset of a suitable idle slot, or -1 if none are available. If
130 : * the given dbname is not null, only idle slots connected to the given
131 : * database are considered suitable, otherwise all idle connected slots are
132 : * considered suitable.
133 : */
134 : static int
135 44974 : find_matching_idle_slot(const ParallelSlotArray *sa, const char *dbname)
136 : {
137 : int i;
138 :
139 67580 : for (i = 0; i < sa->numslots; i++)
140 : {
141 45200 : if (sa->slots[i].inUse)
142 22564 : continue;
143 :
144 22636 : if (sa->slots[i].connection == NULL)
145 14 : continue;
146 :
147 22622 : if (dbname == NULL ||
148 15558 : strcmp(PQdb(sa->slots[i].connection), dbname) == 0)
149 22594 : return i;
150 : }
151 22380 : return -1;
152 : }
153 :
154 : /*
155 : * Return the offset of the first slot without a database connection, or -1 if
156 : * all slots are connected.
157 : */
158 : static int
159 22664 : find_unconnected_slot(const ParallelSlotArray *sa)
160 : {
161 : int i;
162 :
163 45180 : for (i = 0; i < sa->numslots; i++)
164 : {
165 22814 : if (sa->slots[i].inUse)
166 22488 : continue;
167 :
168 326 : if (sa->slots[i].connection == NULL)
169 298 : return i;
170 : }
171 :
172 22366 : return -1;
173 : }
174 :
175 : /*
176 : * Return the offset of the first idle slot, or -1 if all slots are busy.
177 : */
178 : static int
179 22366 : find_any_idle_slot(const ParallelSlotArray *sa)
180 : {
181 : int i;
182 :
183 44846 : for (i = 0; i < sa->numslots; i++)
184 22508 : if (!sa->slots[i].inUse)
185 28 : return i;
186 :
187 22338 : return -1;
188 : }
189 :
190 : /*
191 : * Wait for any slot's connection to have query results, consume the results,
192 : * and update the slot's status as appropriate. Returns true on success,
193 : * false on cancellation, on error, or if no slots are connected.
194 : */
195 : static bool
196 22338 : wait_on_slots(ParallelSlotArray *sa)
197 : {
198 : int i;
199 : fd_set slotset;
200 22338 : int maxFd = 0;
201 22338 : PGconn *cancelconn = NULL;
202 :
203 : /* We must reconstruct the fd_set for each call to select_loop */
204 22338 : FD_ZERO(&slotset);
205 :
206 44818 : for (i = 0; i < sa->numslots; i++)
207 : {
208 : int sock;
209 :
210 : /* We shouldn't get here if we still have slots without connections */
211 : Assert(sa->slots[i].connection != NULL);
212 :
213 22480 : sock = PQsocket(sa->slots[i].connection);
214 :
215 : /*
216 : * We don't really expect any connections to lose their sockets after
217 : * startup, but just in case, cope by ignoring them.
218 : */
219 22480 : if (sock < 0)
220 0 : continue;
221 :
222 : /* Keep track of the first valid connection we see. */
223 22480 : if (cancelconn == NULL)
224 22338 : cancelconn = sa->slots[i].connection;
225 :
226 22480 : FD_SET(sock, &slotset);
227 22480 : if (sock > maxFd)
228 22480 : maxFd = sock;
229 : }
230 :
231 : /*
232 : * If we get this far with no valid connections, processing cannot
233 : * continue.
234 : */
235 22338 : if (cancelconn == NULL)
236 0 : return false;
237 :
238 22338 : SetCancelConn(cancelconn);
239 22338 : i = select_loop(maxFd, &slotset);
240 22338 : ResetCancelConn();
241 :
242 : /* failure? */
243 22338 : if (i < 0)
244 0 : return false;
245 :
246 44818 : for (i = 0; i < sa->numslots; i++)
247 : {
248 : int sock;
249 :
250 22480 : sock = PQsocket(sa->slots[i].connection);
251 :
252 22480 : if (sock >= 0 && FD_ISSET(sock, &slotset))
253 : {
254 : /* select() says input is available, so consume it */
255 22340 : PQconsumeInput(sa->slots[i].connection);
256 : }
257 :
258 : /* Collect result(s) as long as any are available */
259 44712 : while (!PQisBusy(sa->slots[i].connection))
260 : {
261 44464 : PGresult *result = PQgetResult(sa->slots[i].connection);
262 :
263 44464 : if (result != NULL)
264 : {
265 : /* Handle and discard the command result */
266 22232 : if (!processQueryResult(&sa->slots[i], result))
267 0 : return false;
268 : }
269 : else
270 : {
271 : /* This connection has become idle */
272 22232 : sa->slots[i].inUse = false;
273 22232 : ParallelSlotClearHandler(&sa->slots[i]);
274 22232 : break;
275 : }
276 : }
277 : }
278 22338 : return true;
279 : }
280 :
281 : /*
282 : * Open a new database connection using the stored connection parameters and
283 : * optionally a given dbname if not null, execute the stored initial command if
284 : * any, and associate the new connection with the given slot.
285 : */
286 : static void
287 42 : connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname)
288 : {
289 : const char *old_override;
290 42 : ParallelSlot *slot = &sa->slots[slotno];
291 :
292 42 : old_override = sa->cparams->override_dbname;
293 42 : if (dbname)
294 34 : sa->cparams->override_dbname = dbname;
295 42 : slot->connection = connectDatabase(sa->cparams, sa->progname, sa->echo, false, true);
296 42 : sa->cparams->override_dbname = old_override;
297 :
298 : /*
299 : * POSIX defines FD_SETSIZE as the highest file descriptor acceptable to
300 : * FD_SET() and allied macros. Windows defines it as a ceiling on the
301 : * count of file descriptors in the set, not a ceiling on the value of
302 : * each file descriptor; see
303 : * https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-select
304 : * and
305 : * https://learn.microsoft.com/en-us/windows/win32/api/winsock/ns-winsock-fd_set.
306 : * We can't ignore that, because Windows starts file descriptors at a
307 : * higher value, delays reuse, and skips values. With less than ten
308 : * concurrent file descriptors, opened and closed rapidly, one can reach
309 : * file descriptor 1024.
310 : *
311 : * Doing a hard exit here is a bit grotty, but it doesn't seem worth
312 : * complicating the API to make it less grotty.
313 : */
314 : #ifdef WIN32
315 : if (slotno >= FD_SETSIZE)
316 : {
317 : pg_log_error("too many jobs for this platform: %d", slotno);
318 : exit(1);
319 : }
320 : #else
321 : {
322 42 : int fd = PQsocket(slot->connection);
323 :
324 42 : if (fd >= FD_SETSIZE)
325 : {
326 0 : pg_log_error("socket file descriptor out of range for select(): %d",
327 : fd);
328 0 : pg_log_error_hint("Try fewer jobs.");
329 0 : exit(1);
330 : }
331 : }
332 : #endif
333 :
334 : /* Setup the connection using the supplied command, if any. */
335 42 : if (sa->initcmd)
336 0 : executeCommand(slot->connection, sa->initcmd, sa->echo);
337 42 : }
338 :
339 : /*
340 : * ParallelSlotsGetIdle
341 : * Return a connection slot that is ready to execute a command.
342 : *
343 : * The slot returned is chosen as follows:
344 : *
345 : * If any idle slot already has an open connection, and if either dbname is
346 : * null or the existing connection is to the given database, that slot will be
347 : * returned allowing the connection to be reused.
348 : *
349 : * Otherwise, if any idle slot is not yet connected to any database, the slot
350 : * will be returned with it's connection opened using the stored cparams and
351 : * optionally the given dbname if not null.
352 : *
353 : * Otherwise, if any idle slot exists, an idle slot will be chosen and returned
354 : * after having it's connection disconnected and reconnected using the stored
355 : * cparams and optionally the given dbname if not null.
356 : *
357 : * Otherwise, if any slots have connections that are busy, we loop on select()
358 : * until one socket becomes available. When this happens, we read the whole
359 : * set and mark as free all sockets that become available. We then select a
360 : * slot using the same rules as above.
361 : *
362 : * Otherwise, we cannot return a slot, which is an error, and NULL is returned.
363 : *
364 : * For any connection created, if the stored initcmd is not null, it will be
365 : * executed as a command on the newly formed connection before the slot is
366 : * returned.
367 : *
368 : * If an error occurs, NULL is returned.
369 : */
370 : ParallelSlot *
371 44974 : ParallelSlotsGetIdle(ParallelSlotArray *sa, const char *dbname)
372 : {
373 : int offset;
374 :
375 : Assert(sa);
376 : Assert(sa->numslots > 0);
377 :
378 : while (1)
379 : {
380 : /* First choice: a slot already connected to the desired database. */
381 44974 : offset = find_matching_idle_slot(sa, dbname);
382 44974 : if (offset >= 0)
383 : {
384 22594 : sa->slots[offset].inUse = true;
385 22594 : return &sa->slots[offset];
386 : }
387 :
388 : /* Second choice: a slot not connected to any database. */
389 22380 : offset = find_unconnected_slot(sa);
390 22380 : if (offset >= 0)
391 : {
392 14 : connect_slot(sa, offset, dbname);
393 14 : sa->slots[offset].inUse = true;
394 14 : return &sa->slots[offset];
395 : }
396 :
397 : /* Third choice: a slot connected to the wrong database. */
398 22366 : offset = find_any_idle_slot(sa);
399 22366 : if (offset >= 0)
400 : {
401 28 : disconnectDatabase(sa->slots[offset].connection);
402 28 : sa->slots[offset].connection = NULL;
403 28 : connect_slot(sa, offset, dbname);
404 28 : sa->slots[offset].inUse = true;
405 28 : return &sa->slots[offset];
406 : }
407 :
408 : /*
409 : * Fourth choice: block until one or more slots become available. If
410 : * any slots hit a fatal error, we'll find out about that here and
411 : * return NULL.
412 : */
413 22338 : if (!wait_on_slots(sa))
414 0 : return NULL;
415 : }
416 : }
417 :
418 : /*
419 : * ParallelSlotsSetup
420 : * Prepare a set of parallel slots but do not connect to any database.
421 : *
422 : * This creates and initializes a set of slots, marking all parallel slots as
423 : * free and ready to use. Establishing connections is delayed until requesting
424 : * a free slot. The cparams, progname, echo, and initcmd are stored for later
425 : * use and must remain valid for the lifetime of the returned array.
426 : */
427 : ParallelSlotArray *
428 290 : ParallelSlotsSetup(int numslots, ConnParams *cparams, const char *progname,
429 : bool echo, const char *initcmd)
430 : {
431 : ParallelSlotArray *sa;
432 :
433 : Assert(numslots > 0);
434 : Assert(cparams != NULL);
435 : Assert(progname != NULL);
436 :
437 290 : sa = (ParallelSlotArray *) palloc0(offsetof(ParallelSlotArray, slots) +
438 290 : numslots * sizeof(ParallelSlot));
439 :
440 290 : sa->numslots = numslots;
441 290 : sa->cparams = cparams;
442 290 : sa->progname = progname;
443 290 : sa->echo = echo;
444 290 : sa->initcmd = initcmd;
445 :
446 290 : return sa;
447 : }
448 :
449 : /*
450 : * ParallelSlotsAdoptConn
451 : * Assign an open connection to the slots array for reuse.
452 : *
453 : * This turns over ownership of an open connection to a slots array. The
454 : * caller should not further use or close the connection. All the connection's
455 : * parameters (user, host, port, etc.) except possibly dbname should match
456 : * those of the slots array's cparams, as given in ParallelSlotsSetup. If
457 : * these parameters differ, subsequent behavior is undefined.
458 : */
459 : void
460 284 : ParallelSlotsAdoptConn(ParallelSlotArray *sa, PGconn *conn)
461 : {
462 : int offset;
463 :
464 284 : offset = find_unconnected_slot(sa);
465 284 : if (offset >= 0)
466 284 : sa->slots[offset].connection = conn;
467 : else
468 0 : disconnectDatabase(conn);
469 284 : }
470 :
471 : /*
472 : * ParallelSlotsTerminate
473 : * Clean up a set of parallel slots
474 : *
475 : * Iterate through all connections in a given set of ParallelSlots and
476 : * terminate all connections.
477 : */
478 : void
479 290 : ParallelSlotsTerminate(ParallelSlotArray *sa)
480 : {
481 : int i;
482 :
483 588 : for (i = 0; i < sa->numslots; i++)
484 : {
485 298 : PGconn *conn = sa->slots[i].connection;
486 :
487 298 : if (conn == NULL)
488 0 : continue;
489 :
490 298 : disconnectDatabase(conn);
491 : }
492 290 : }
493 :
494 : /*
495 : * ParallelSlotsWaitCompletion
496 : *
497 : * Wait for all connections to finish, returning false if at least one
498 : * error has been found on the way.
499 : */
500 : bool
501 396 : ParallelSlotsWaitCompletion(ParallelSlotArray *sa)
502 : {
503 : int i;
504 :
505 790 : for (i = 0; i < sa->numslots; i++)
506 : {
507 406 : if (sa->slots[i].connection == NULL)
508 0 : continue;
509 406 : if (!consumeQueryResult(&sa->slots[i]))
510 12 : return false;
511 : /* Mark connection as idle */
512 394 : sa->slots[i].inUse = false;
513 394 : ParallelSlotClearHandler(&sa->slots[i]);
514 : }
515 :
516 384 : return true;
517 : }
518 :
519 : /*
520 : * TableCommandResultHandler
521 : *
522 : * ParallelSlotResultHandler for results of commands (not queries) against
523 : * tables.
524 : *
525 : * Requires that the result status is either PGRES_COMMAND_OK or an error about
526 : * a missing table. This is useful for utilities that compile a list of tables
527 : * to process and then run commands (vacuum, reindex, or whatever) against
528 : * those tables, as there is a race condition between the time the list is
529 : * compiled and the time the command attempts to open the table.
530 : *
531 : * For missing tables, logs an error but allows processing to continue.
532 : *
533 : * For all other errors, logs an error and terminates further processing.
534 : *
535 : * res: PGresult from the query executed on the slot's connection
536 : * conn: connection belonging to the slot
537 : * context: unused
538 : */
539 : bool
540 7074 : TableCommandResultHandler(PGresult *res, PGconn *conn, void *context)
541 : {
542 : Assert(res != NULL);
543 : Assert(conn != NULL);
544 :
545 : /*
546 : * If it's an error, report it. Errors about a missing table are harmless
547 : * so we continue processing; but die for other errors.
548 : */
549 7074 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
550 : {
551 12 : char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
552 :
553 12 : pg_log_error("processing of database \"%s\" failed: %s",
554 : PQdb(conn), PQerrorMessage(conn));
555 :
556 12 : if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
557 : {
558 12 : PQclear(res);
559 12 : return false;
560 : }
561 : }
562 :
563 7062 : return true;
564 : }
|