Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * method_worker.c
4 : * AIO - perform AIO using worker processes
5 : *
6 : * IO workers consume IOs from a shared memory submission queue, run
7 : * traditional synchronous system calls, and perform the shared completion
8 : * handling immediately. Client code submits most requests by pushing IOs
9 : * into the submission queue, and waits (if necessary) using condition
10 : * variables. Some IOs cannot be performed in another process due to lack of
11 : * infrastructure for reopening the file, and must processed synchronously by
12 : * the client code when submitted.
13 : *
14 : * So that the submitter can make just one system call when submitting a batch
15 : * of IOs, wakeups "fan out"; each woken IO worker can wake two more. XXX This
16 : * could be improved by using futexes instead of latches to wake N waiters.
17 : *
18 : * This method of AIO is available in all builds on all operating systems, and
19 : * is the default.
20 : *
21 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
22 : * Portions Copyright (c) 1994, Regents of the University of California
23 : *
24 : * IDENTIFICATION
25 : * src/backend/storage/aio/method_worker.c
26 : *
27 : *-------------------------------------------------------------------------
28 : */
29 :
30 : #include "postgres.h"
31 :
32 : #include "libpq/pqsignal.h"
33 : #include "miscadmin.h"
34 : #include "port/pg_bitutils.h"
35 : #include "postmaster/auxprocess.h"
36 : #include "postmaster/interrupt.h"
37 : #include "storage/aio.h"
38 : #include "storage/aio_internal.h"
39 : #include "storage/aio_subsys.h"
40 : #include "storage/io_worker.h"
41 : #include "storage/ipc.h"
42 : #include "storage/latch.h"
43 : #include "storage/proc.h"
44 : #include "tcop/tcopprot.h"
45 : #include "utils/injection_point.h"
46 : #include "utils/memdebug.h"
47 : #include "utils/ps_status.h"
48 : #include "utils/wait_event.h"
49 :
50 :
/*
 * How many workers should each worker wake up if needed?  A worker that
 * dequeues an IO while further IOs are still queued wakes up to this many
 * idle peers, so a submitted batch "fans out" across workers.
 */
#define IO_WORKER_WAKEUP_FANOUT 2


/*
 * Shared ring buffer of IOs waiting to be picked up by a worker.  All code in
 * this file reads and writes it while holding AioWorkerSubmissionQueueLock.
 */
typedef struct PgAioWorkerSubmissionQueue
{
	uint32		size;			/* number of sqes[] entries; a power of two */
	uint32		mask;			/* NOTE(review): not read by the code in this
								 * file, which wraps indices with size - 1 */
	uint32		head;			/* next slot to fill on insert */
	uint32		tail;			/* next slot to consume; head == tail means
								 * the queue is empty */
	uint32		sqes[FLEXIBLE_ARRAY_MEMBER];	/* ids from pgaio_io_get_id() */
} PgAioWorkerSubmissionQueue;

/* Per-worker registration slot in PgAioWorkerControl. */
typedef struct PgAioWorkerSlot
{
	Latch	   *latch;			/* the worker's MyLatch, NULL when slot free */
	bool		in_use;			/* slot is claimed by a registered worker */
} PgAioWorkerSlot;

/*
 * Shared worker bookkeeping, accessed under AioWorkerSubmissionQueueLock.
 */
typedef struct PgAioWorkerControl
{
	uint64		idle_worker_mask;	/* bit i set => worker i is idle and may
									 * be chosen for a wakeup */
	PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER]; /* MAX_IO_WORKERS slots */
} PgAioWorkerControl;
75 :
76 :
/* Forward declarations of the IoMethodOps callbacks implemented below. */
static size_t pgaio_worker_shmem_size(void);
static void pgaio_worker_shmem_init(bool first_time);

static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh);
static int	pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);


/*
 * Callback table for the "worker" IO method: shared memory sizing and
 * initialization, the test for IOs that must be executed by the submitting
 * process itself, and batch submission into the shared queue.
 */
const IoMethodOps pgaio_worker_ops = {
	.shmem_size = pgaio_worker_shmem_size,
	.shmem_init = pgaio_worker_shmem_init,

	.needs_synchronous_execution = pgaio_worker_needs_synchronous_execution,
	.submit = pgaio_worker_submit,
};
91 :
92 :
/* GUCs */
/* NOTE(review): number of IO worker processes; not read in this file. */
int			io_workers = 3;


/*
 * Requested number of submission queue entries.  It is rounded up to the next
 * power of two when shared memory is sized; one entry is always kept free, so
 * the usable capacity is one less than the rounded size.
 */
static int	io_worker_queue_size = 64;
/* This worker's index into io_worker_control->workers[], set at registration. */
static int	MyIoWorkerId;
/* Pointers into shared memory, set up by pgaio_worker_shmem_init(). */
static PgAioWorkerSubmissionQueue *io_worker_submission_queue;
static PgAioWorkerControl *io_worker_control;
101 :
102 :
103 : static size_t
104 6072 : pgaio_worker_queue_shmem_size(int *queue_size)
105 : {
106 : /* Round size up to next power of two so we can make a mask. */
107 6072 : *queue_size = pg_nextpower2_32(io_worker_queue_size);
108 :
109 12144 : return offsetof(PgAioWorkerSubmissionQueue, sqes) +
110 6072 : sizeof(uint32) * *queue_size;
111 : }
112 :
113 : static size_t
114 6072 : pgaio_worker_control_shmem_size(void)
115 : {
116 6072 : return offsetof(PgAioWorkerControl, workers) +
117 : sizeof(PgAioWorkerSlot) * MAX_IO_WORKERS;
118 : }
119 :
/*
 * IoMethodOps.shmem_size callback: total shared memory needed by the worker
 * IO method, i.e. the submission queue plus the worker control area.
 */
static size_t
pgaio_worker_shmem_size(void)
{
	int			unused_queue_size;	/* rounded size is not needed here */

	return add_size(pgaio_worker_queue_shmem_size(&unused_queue_size),
					pgaio_worker_control_shmem_size());
}
131 :
132 : static void
133 2124 : pgaio_worker_shmem_init(bool first_time)
134 : {
135 : bool found;
136 : int queue_size;
137 :
138 2124 : io_worker_submission_queue =
139 2124 : ShmemInitStruct("AioWorkerSubmissionQueue",
140 : pgaio_worker_queue_shmem_size(&queue_size),
141 : &found);
142 2124 : if (!found)
143 : {
144 2124 : io_worker_submission_queue->size = queue_size;
145 2124 : io_worker_submission_queue->head = 0;
146 2124 : io_worker_submission_queue->tail = 0;
147 : }
148 :
149 2124 : io_worker_control =
150 2124 : ShmemInitStruct("AioWorkerControl",
151 : pgaio_worker_control_shmem_size(),
152 : &found);
153 2124 : if (!found)
154 : {
155 2124 : io_worker_control->idle_worker_mask = 0;
156 70092 : for (int i = 0; i < MAX_IO_WORKERS; ++i)
157 : {
158 67968 : io_worker_control->workers[i].latch = NULL;
159 67968 : io_worker_control->workers[i].in_use = false;
160 : }
161 : }
162 2124 : }
163 :
164 : static int
165 1102422 : pgaio_worker_choose_idle(void)
166 : {
167 : int worker;
168 :
169 1102422 : if (io_worker_control->idle_worker_mask == 0)
170 12618 : return -1;
171 :
172 : /* Find the lowest bit position, and clear it. */
173 1089804 : worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask);
174 1089804 : io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
175 : Assert(io_worker_control->workers[worker].in_use);
176 :
177 1089804 : return worker;
178 : }
179 :
180 : static bool
181 1074654 : pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
182 : {
183 : PgAioWorkerSubmissionQueue *queue;
184 : uint32 new_head;
185 :
186 1074654 : queue = io_worker_submission_queue;
187 1074654 : new_head = (queue->head + 1) & (queue->size - 1);
188 1074654 : if (new_head == queue->tail)
189 : {
190 0 : pgaio_debug(DEBUG3, "io queue is full, at %u elements",
191 : io_worker_submission_queue->size);
192 0 : return false; /* full */
193 : }
194 :
195 1074654 : queue->sqes[queue->head] = pgaio_io_get_id(ioh);
196 1074654 : queue->head = new_head;
197 :
198 1074654 : return true;
199 : }
200 :
201 : static uint32
202 1753930 : pgaio_worker_submission_queue_consume(void)
203 : {
204 : PgAioWorkerSubmissionQueue *queue;
205 : uint32 result;
206 :
207 1753930 : queue = io_worker_submission_queue;
208 1753930 : if (queue->tail == queue->head)
209 888294 : return UINT32_MAX; /* empty */
210 :
211 865636 : result = queue->sqes[queue->tail];
212 865636 : queue->tail = (queue->tail + 1) & (queue->size - 1);
213 :
214 865636 : return result;
215 : }
216 :
217 : static uint32
218 1729414 : pgaio_worker_submission_queue_depth(void)
219 : {
220 : uint32 head;
221 : uint32 tail;
222 :
223 1729414 : head = io_worker_submission_queue->head;
224 1729414 : tail = io_worker_submission_queue->tail;
225 :
226 1729414 : if (tail > head)
227 906 : head += io_worker_submission_queue->size;
228 :
229 : Assert(head >= tail);
230 :
231 1729414 : return head - tail;
232 : }
233 :
234 : static bool
235 1082804 : pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
236 : {
237 : return
238 1082804 : !IsUnderPostmaster
239 1076958 : || ioh->flags & PGAIO_HF_REFERENCES_LOCAL
240 2159762 : || !pgaio_io_can_reopen(ioh);
241 : }
242 :
243 : static void
244 1073568 : pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
245 : {
246 : PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
247 1073568 : int nsync = 0;
248 1073568 : Latch *wakeup = NULL;
249 : int worker;
250 :
251 : Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
252 :
253 1073568 : LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
254 2148222 : for (int i = 0; i < num_staged_ios; ++i)
255 : {
256 : Assert(!pgaio_worker_needs_synchronous_execution(staged_ios[i]));
257 1074654 : if (!pgaio_worker_submission_queue_insert(staged_ios[i]))
258 : {
259 : /*
260 : * We'll do it synchronously, but only after we've sent as many as
261 : * we can to workers, to maximize concurrency.
262 : */
263 0 : synchronous_ios[nsync++] = staged_ios[i];
264 0 : continue;
265 : }
266 :
267 1074654 : if (wakeup == NULL)
268 : {
269 : /* Choose an idle worker to wake up if we haven't already. */
270 1073574 : worker = pgaio_worker_choose_idle();
271 1073574 : if (worker >= 0)
272 1068100 : wakeup = io_worker_control->workers[worker].latch;
273 :
274 1073574 : pgaio_debug_io(DEBUG4, staged_ios[i],
275 : "choosing worker %d",
276 : worker);
277 : }
278 : }
279 1073568 : LWLockRelease(AioWorkerSubmissionQueueLock);
280 :
281 1073568 : if (wakeup)
282 1068100 : SetLatch(wakeup);
283 :
284 : /* Run whatever is left synchronously. */
285 1073568 : if (nsync > 0)
286 : {
287 0 : for (int i = 0; i < nsync; ++i)
288 : {
289 0 : pgaio_io_perform_synchronously(synchronous_ios[i]);
290 : }
291 : }
292 1073568 : }
293 :
294 : static int
295 1073568 : pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
296 : {
297 2148222 : for (int i = 0; i < num_staged_ios; i++)
298 : {
299 1074654 : PgAioHandle *ioh = staged_ios[i];
300 :
301 1074654 : pgaio_io_prepare_submit(ioh);
302 : }
303 :
304 1073568 : pgaio_worker_submit_internal(num_staged_ios, staged_ios);
305 :
306 1073568 : return num_staged_ios;
307 : }
308 :
309 : /*
310 : * on_shmem_exit() callback that releases the worker's slot in
311 : * io_worker_control.
312 : */
313 : static void
314 3174 : pgaio_worker_die(int code, Datum arg)
315 : {
316 3174 : LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
317 : Assert(io_worker_control->workers[MyIoWorkerId].in_use);
318 : Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
319 :
320 3174 : io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
321 3174 : io_worker_control->workers[MyIoWorkerId].in_use = false;
322 3174 : io_worker_control->workers[MyIoWorkerId].latch = NULL;
323 3174 : LWLockRelease(AioWorkerSubmissionQueueLock);
324 3174 : }
325 :
326 : /*
327 : * Register the worker in shared memory, assign MyIoWorkerId and register a
328 : * shutdown callback to release registration.
329 : */
330 : static void
331 3174 : pgaio_worker_register(void)
332 : {
333 3174 : MyIoWorkerId = -1;
334 :
335 : /*
336 : * XXX: This could do with more fine-grained locking. But it's also not
337 : * very common for the number of workers to change at the moment...
338 : */
339 3174 : LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
340 :
341 7348 : for (int i = 0; i < MAX_IO_WORKERS; ++i)
342 : {
343 7348 : if (!io_worker_control->workers[i].in_use)
344 : {
345 : Assert(io_worker_control->workers[i].latch == NULL);
346 3174 : io_worker_control->workers[i].in_use = true;
347 3174 : MyIoWorkerId = i;
348 3174 : break;
349 : }
350 : else
351 : Assert(io_worker_control->workers[i].latch != NULL);
352 : }
353 :
354 3174 : if (MyIoWorkerId == -1)
355 0 : elog(ERROR, "couldn't find a free worker slot");
356 :
357 3174 : io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
358 3174 : io_worker_control->workers[MyIoWorkerId].latch = MyLatch;
359 3174 : LWLockRelease(AioWorkerSubmissionQueueLock);
360 :
361 3174 : on_shmem_exit(pgaio_worker_die, 0);
362 3174 : }
363 :
364 : static void
365 2254 : pgaio_worker_error_callback(void *arg)
366 : {
367 : ProcNumber owner;
368 : PGPROC *owner_proc;
369 : int32 owner_pid;
370 2254 : PgAioHandle *ioh = arg;
371 :
372 2254 : if (!ioh)
373 0 : return;
374 :
375 : Assert(ioh->owner_procno != MyProcNumber);
376 : Assert(MyBackendType == B_IO_WORKER);
377 :
378 2254 : owner = ioh->owner_procno;
379 2254 : owner_proc = GetPGProcByNumber(owner);
380 2254 : owner_pid = owner_proc->pid;
381 :
382 2254 : errcontext("I/O worker executing I/O on behalf of process %d", owner_pid);
383 : }
384 :
385 : void
386 3174 : IoWorkerMain(const void *startup_data, size_t startup_data_len)
387 : {
388 : sigjmp_buf local_sigjmp_buf;
389 3174 : PgAioHandle *volatile error_ioh = NULL;
390 3174 : ErrorContextCallback errcallback = {0};
391 3174 : volatile int error_errno = 0;
392 : char cmd[128];
393 :
394 3174 : MyBackendType = B_IO_WORKER;
395 3174 : AuxiliaryProcessMainCommon();
396 :
397 3174 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
398 3174 : pqsignal(SIGINT, die); /* to allow manually triggering worker restart */
399 :
400 : /*
401 : * Ignore SIGTERM, will get explicit shutdown via SIGUSR2 later in the
402 : * shutdown sequence, similar to checkpointer.
403 : */
404 3174 : pqsignal(SIGTERM, SIG_IGN);
405 : /* SIGQUIT handler was already set up by InitPostmasterChild */
406 3174 : pqsignal(SIGALRM, SIG_IGN);
407 3174 : pqsignal(SIGPIPE, SIG_IGN);
408 3174 : pqsignal(SIGUSR1, procsignal_sigusr1_handler);
409 3174 : pqsignal(SIGUSR2, SignalHandlerForShutdownRequest);
410 :
411 : /* also registers a shutdown callback to unregister */
412 3174 : pgaio_worker_register();
413 :
414 3174 : sprintf(cmd, "%d", MyIoWorkerId);
415 3174 : set_ps_display(cmd);
416 :
417 3174 : errcallback.callback = pgaio_worker_error_callback;
418 3174 : errcallback.previous = error_context_stack;
419 3174 : error_context_stack = &errcallback;
420 :
421 : /* see PostgresMain() */
422 3174 : if (sigsetjmp(local_sigjmp_buf, 1) != 0)
423 : {
424 2 : error_context_stack = NULL;
425 2 : HOLD_INTERRUPTS();
426 :
427 2 : EmitErrorReport();
428 :
429 : /*
430 : * In the - very unlikely - case that the IO failed in a way that
431 : * raises an error we need to mark the IO as failed.
432 : *
433 : * Need to do just enough error recovery so that we can mark the IO as
434 : * failed and then exit (postmaster will start a new worker).
435 : */
436 2 : LWLockReleaseAll();
437 :
438 2 : if (error_ioh != NULL)
439 : {
440 : /* should never fail without setting error_errno */
441 : Assert(error_errno != 0);
442 :
443 2 : errno = error_errno;
444 :
445 2 : START_CRIT_SECTION();
446 2 : pgaio_io_process_completion(error_ioh, -error_errno);
447 2 : END_CRIT_SECTION();
448 : }
449 :
450 2 : proc_exit(1);
451 : }
452 :
453 : /* We can now handle ereport(ERROR) */
454 3174 : PG_exception_stack = &local_sigjmp_buf;
455 :
456 3174 : sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
457 :
458 1757070 : while (!ShutdownRequestPending)
459 : {
460 : uint32 io_index;
461 : Latch *latches[IO_WORKER_WAKEUP_FANOUT];
462 1753930 : int nlatches = 0;
463 1753930 : int nwakeups = 0;
464 : int worker;
465 :
466 : /*
467 : * Try to get a job to do.
468 : *
469 : * The lwlock acquisition also provides the necessary memory barrier
470 : * to ensure that we don't see an outdated data in the handle.
471 : */
472 1753930 : LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
473 1753930 : if ((io_index = pgaio_worker_submission_queue_consume()) == UINT32_MAX)
474 : {
475 : /*
476 : * Nothing to do. Mark self idle.
477 : *
478 : * XXX: Invent some kind of back pressure to reduce useless
479 : * wakeups?
480 : */
481 888294 : io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
482 : }
483 : else
484 : {
485 : /* Got one. Clear idle flag. */
486 865636 : io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
487 :
488 : /* See if we can wake up some peers. */
489 865636 : nwakeups = Min(pgaio_worker_submission_queue_depth(),
490 : IO_WORKER_WAKEUP_FANOUT);
491 887340 : for (int i = 0; i < nwakeups; ++i)
492 : {
493 28848 : if ((worker = pgaio_worker_choose_idle()) < 0)
494 7144 : break;
495 21704 : latches[nlatches++] = io_worker_control->workers[worker].latch;
496 : }
497 : }
498 1753930 : LWLockRelease(AioWorkerSubmissionQueueLock);
499 :
500 1775634 : for (int i = 0; i < nlatches; ++i)
501 21704 : SetLatch(latches[i]);
502 :
503 1753930 : if (io_index != UINT32_MAX)
504 : {
505 865636 : PgAioHandle *ioh = NULL;
506 :
507 865636 : ioh = &pgaio_ctl->io_handles[io_index];
508 865636 : error_ioh = ioh;
509 865636 : errcallback.arg = ioh;
510 :
511 865636 : pgaio_debug_io(DEBUG4, ioh,
512 : "worker %d processing IO",
513 : MyIoWorkerId);
514 :
515 : /*
516 : * Prevent interrupts between pgaio_io_reopen() and
517 : * pgaio_io_perform_synchronously() that otherwise could lead to
518 : * the FD getting closed in that window.
519 : */
520 865636 : HOLD_INTERRUPTS();
521 :
522 : /*
523 : * It's very unlikely, but possible, that reopen fails. E.g. due
524 : * to memory allocations failing or file permissions changing or
525 : * such. In that case we need to fail the IO.
526 : *
527 : * There's not really a good errno we can report here.
528 : */
529 865636 : error_errno = ENOENT;
530 865636 : pgaio_io_reopen(ioh);
531 :
532 : /*
533 : * To be able to exercise the reopen-fails path, allow injection
534 : * points to trigger a failure at this point.
535 : */
536 865636 : INJECTION_POINT("aio-worker-after-reopen", ioh);
537 :
538 865634 : error_errno = 0;
539 865634 : error_ioh = NULL;
540 :
541 : /*
542 : * As part of IO completion the buffer will be marked as NOACCESS,
543 : * until the buffer is pinned again - which never happens in io
544 : * workers. Therefore the next time there is IO for the same
545 : * buffer, the memory will be considered inaccessible. To avoid
546 : * that, explicitly allow access to the memory before reading data
547 : * into it.
548 : */
549 : #ifdef USE_VALGRIND
550 : {
551 : struct iovec *iov;
552 : uint16 iov_length = pgaio_io_get_iovec_length(ioh, &iov);
553 :
554 : for (int i = 0; i < iov_length; i++)
555 : VALGRIND_MAKE_MEM_UNDEFINED(iov[i].iov_base, iov[i].iov_len);
556 : }
557 : #endif
558 :
559 : /*
560 : * We don't expect this to ever fail with ERROR or FATAL, no need
561 : * to keep error_ioh set to the IO.
562 : * pgaio_io_perform_synchronously() contains a critical section to
563 : * ensure we don't accidentally fail.
564 : */
565 865634 : pgaio_io_perform_synchronously(ioh);
566 :
567 865634 : RESUME_INTERRUPTS();
568 865634 : errcallback.arg = NULL;
569 : }
570 : else
571 : {
572 888294 : WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
573 : WAIT_EVENT_IO_WORKER_MAIN);
574 888270 : ResetLatch(MyLatch);
575 : }
576 :
577 1753904 : CHECK_FOR_INTERRUPTS();
578 :
579 1753896 : if (ConfigReloadPending)
580 : {
581 444 : ConfigReloadPending = false;
582 444 : ProcessConfigFile(PGC_SIGHUP);
583 : }
584 : }
585 :
586 3140 : error_context_stack = errcallback.previous;
587 3140 : proc_exit(0);
588 : }
589 :
590 : bool
591 251974 : pgaio_workers_enabled(void)
592 : {
593 251974 : return io_method == IOMETHOD_WORKER;
594 : }
|