Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * method_worker.c
4 : * AIO - perform AIO using worker processes
5 : *
6 : * IO workers consume IOs from a shared memory submission queue, run
7 : * traditional synchronous system calls, and perform the shared completion
8 : * handling immediately. Client code submits most requests by pushing IOs
9 : * into the submission queue, and waits (if necessary) using condition
10 : * variables. Some IOs cannot be performed in another process due to lack of
11 : * infrastructure for reopening the file, and must be processed synchronously by
12 : * the client code when submitted.
13 : *
14 : * So that the submitter can make just one system call when submitting a batch
15 : * of IOs, wakeups "fan out"; each woken IO worker can wake two more. XXX This
16 : * could be improved by using futexes instead of latches to wake N waiters.
17 : *
18 : * This method of AIO is available in all builds on all operating systems, and
19 : * is the default.
20 : *
21 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
22 : * Portions Copyright (c) 1994, Regents of the University of California
23 : *
24 : * IDENTIFICATION
25 : * src/backend/storage/aio/method_worker.c
26 : *
27 : *-------------------------------------------------------------------------
28 : */
29 :
30 : #include "postgres.h"
31 :
32 : #include "libpq/pqsignal.h"
33 : #include "miscadmin.h"
34 : #include "port/pg_bitutils.h"
35 : #include "postmaster/auxprocess.h"
36 : #include "postmaster/interrupt.h"
37 : #include "storage/aio.h"
38 : #include "storage/aio_internal.h"
39 : #include "storage/aio_subsys.h"
40 : #include "storage/io_worker.h"
41 : #include "storage/ipc.h"
42 : #include "storage/latch.h"
43 : #include "storage/proc.h"
44 : #include "storage/shmem.h"
45 : #include "tcop/tcopprot.h"
46 : #include "utils/injection_point.h"
47 : #include "utils/memdebug.h"
48 : #include "utils/ps_status.h"
49 : #include "utils/wait_event.h"
50 :
51 :
52 : /* How many workers should each worker wake up if needed? */
53 : #define IO_WORKER_WAKEUP_FANOUT 2
54 :
55 :
/*
 * Ring buffer of submitted IO handle ids, shared between submitters and IO
 * workers.  Protected by AioWorkerSubmissionQueueLock.
 */
typedef struct PgAioWorkerSubmissionQueue
{
	uint32		size;			/* number of slots in sqes[]; always a power
								 * of two, so (size - 1) can be used as a
								 * wraparound mask */
	uint32		head;			/* next slot to insert into */
	uint32		tail;			/* next slot to consume from; head == tail
								 * means empty */
	int			sqes[FLEXIBLE_ARRAY_MEMBER];	/* queued IO handle ids */
} PgAioWorkerSubmissionQueue;
63 :
/*
 * Per-worker registration record; one per possible IO worker.
 */
typedef struct PgAioWorkerSlot
{
	Latch	   *latch;			/* worker's process latch, for wakeups; NULL
								 * while the slot is free */
	bool		in_use;			/* true once a worker has claimed this slot */
} PgAioWorkerSlot;
69 :
/*
 * Shared control state for all IO workers.  Protected by
 * AioWorkerSubmissionQueueLock.
 */
typedef struct PgAioWorkerControl
{
	uint64		idle_worker_mask;	/* bit i set => worker i is idle; the
									 * 64-bit width bounds usable workers to
									 * 64 — presumably MAX_IO_WORKERS <= 64,
									 * enforced elsewhere */
	PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER]; /* MAX_IO_WORKERS entries */
} PgAioWorkerControl;
75 :
76 :
77 : static void pgaio_worker_shmem_request(void *arg);
78 : static void pgaio_worker_shmem_init(void *arg);
79 :
80 : static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh);
81 : static int pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);
82 :
83 :
/* Method table for the worker-based AIO implementation. */
const IoMethodOps pgaio_worker_ops = {
	.shmem_callbacks.request_fn = pgaio_worker_shmem_request,
	.shmem_callbacks.init_fn = pgaio_worker_shmem_init,

	.needs_synchronous_execution = pgaio_worker_needs_synchronous_execution,
	.submit = pgaio_worker_submit,
};
91 :
92 :
/* GUCs */
int			io_workers = 3;		/* number of IO worker processes to run */


static int	io_worker_queue_size = 64;	/* requested submission queue slots;
										 * rounded up to a power of two */
static int	MyIoWorkerId;		/* this worker's index into
								 * io_worker_control->workers[] */
static PgAioWorkerSubmissionQueue *io_worker_submission_queue;	/* in shmem */
static PgAioWorkerControl *io_worker_control;	/* in shmem */
101 :
102 :
103 : static void
104 1227 : pgaio_worker_shmem_request(void *arg)
105 : {
106 : size_t size;
107 : int queue_size;
108 :
109 : /* Round size up to next power of two so we can make a mask. */
110 1227 : queue_size = pg_nextpower2_32(io_worker_queue_size);
111 :
112 1227 : size = offsetof(PgAioWorkerSubmissionQueue, sqes) + sizeof(int) * queue_size;
113 1227 : ShmemRequestStruct(.name = "AioWorkerSubmissionQueue",
114 : .size = size,
115 : .ptr = (void **) &io_worker_submission_queue,
116 : );
117 :
118 1227 : size = offsetof(PgAioWorkerControl, workers) + sizeof(PgAioWorkerSlot) * MAX_IO_WORKERS;
119 1227 : ShmemRequestStruct(.name = "AioWorkerControl",
120 : .size = size,
121 : .ptr = (void **) &io_worker_control,
122 : );
123 1227 : }
124 :
125 : static void
126 1224 : pgaio_worker_shmem_init(void *arg)
127 : {
128 : int queue_size;
129 :
130 : /* Round size up like in pgaio_worker_shmem_request() */
131 1224 : queue_size = pg_nextpower2_32(io_worker_queue_size);
132 :
133 1224 : io_worker_submission_queue->size = queue_size;
134 1224 : io_worker_submission_queue->head = 0;
135 1224 : io_worker_submission_queue->tail = 0;
136 :
137 1224 : io_worker_control->idle_worker_mask = 0;
138 40392 : for (int i = 0; i < MAX_IO_WORKERS; ++i)
139 : {
140 39168 : io_worker_control->workers[i].latch = NULL;
141 39168 : io_worker_control->workers[i].in_use = false;
142 : }
143 1224 : }
144 :
145 : static int
146 697484 : pgaio_worker_choose_idle(void)
147 : {
148 : int worker;
149 :
150 697484 : if (io_worker_control->idle_worker_mask == 0)
151 29334 : return -1;
152 :
153 : /* Find the lowest bit position, and clear it. */
154 668150 : worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask);
155 668150 : io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
156 : Assert(io_worker_control->workers[worker].in_use);
157 :
158 668150 : return worker;
159 : }
160 :
161 : static bool
162 677482 : pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
163 : {
164 : PgAioWorkerSubmissionQueue *queue;
165 : uint32 new_head;
166 :
167 677482 : queue = io_worker_submission_queue;
168 677482 : new_head = (queue->head + 1) & (queue->size - 1);
169 677482 : if (new_head == queue->tail)
170 : {
171 0 : pgaio_debug(DEBUG3, "io queue is full, at %u elements",
172 : io_worker_submission_queue->size);
173 0 : return false; /* full */
174 : }
175 :
176 677482 : queue->sqes[queue->head] = pgaio_io_get_id(ioh);
177 677482 : queue->head = new_head;
178 :
179 677482 : return true;
180 : }
181 :
182 : static int
183 1080627 : pgaio_worker_submission_queue_consume(void)
184 : {
185 : PgAioWorkerSubmissionQueue *queue;
186 : int result;
187 :
188 1080627 : queue = io_worker_submission_queue;
189 1080627 : if (queue->tail == queue->head)
190 538702 : return -1; /* empty */
191 :
192 541925 : result = queue->sqes[queue->tail];
193 541925 : queue->tail = (queue->tail + 1) & (queue->size - 1);
194 :
195 541925 : return result;
196 : }
197 :
198 : static uint32
199 1082351 : pgaio_worker_submission_queue_depth(void)
200 : {
201 : uint32 head;
202 : uint32 tail;
203 :
204 1082351 : head = io_worker_submission_queue->head;
205 1082351 : tail = io_worker_submission_queue->tail;
206 :
207 1082351 : if (tail > head)
208 657 : head += io_worker_submission_queue->size;
209 :
210 : Assert(head >= tail);
211 :
212 1082351 : return head - tail;
213 : }
214 :
215 : static bool
216 686612 : pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
217 : {
218 : return
219 686612 : !IsUnderPostmaster
220 682887 : || ioh->flags & PGAIO_HF_REFERENCES_LOCAL
221 1369499 : || !pgaio_io_can_reopen(ioh);
222 : }
223 :
/*
 * Submission callback: push staged IOs onto the shared submission queue and
 * wake one idle IO worker.  IOs that cannot be queued (queue full, or the
 * queue lock is contended) are executed synchronously by this process
 * instead.  Returns the number of IOs submitted (always num_staged_ios).
 */
static int
pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
{
	PgAioHandle **synchronous_ios = NULL;	/* suffix of staged_ios to run here */
	int			nsync = 0;		/* length of that suffix */
	Latch	   *wakeup = NULL;	/* latch of the worker chosen for wakeup */
	int			worker;

	Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);

	for (int i = 0; i < num_staged_ios; i++)
		pgaio_io_prepare_submit(staged_ios[i]);

	/* Conditional acquire: if the lock is busy, fall back to synchronous. */
	if (LWLockConditionalAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE))
	{
		for (int i = 0; i < num_staged_ios; ++i)
		{
			Assert(!pgaio_worker_needs_synchronous_execution(staged_ios[i]));
			if (!pgaio_worker_submission_queue_insert(staged_ios[i]))
			{
				/*
				 * The queue is full, so give up and run the rest
				 * synchronously below.  We're holding an exclusive lock on
				 * the queue, so nothing can consume entries and make room
				 * while we retry.
				 */
				synchronous_ios = &staged_ios[i];
				nsync = (num_staged_ios - i);

				break;
			}

			if (wakeup == NULL)
			{
				/* Choose an idle worker to wake up if we haven't already. */
				worker = pgaio_worker_choose_idle();
				if (worker >= 0)
					wakeup = io_worker_control->workers[worker].latch;

				pgaio_debug_io(DEBUG4, staged_ios[i],
							   "choosing worker %d",
							   worker);
			}
		}
		LWLockRelease(AioWorkerSubmissionQueueLock);
	}
	else
	{
		/* do everything synchronously, no wakeup needed */
		synchronous_ios = staged_ios;
		nsync = num_staged_ios;
	}

	/* Wake the chosen worker, after the lock has been released. */
	if (wakeup)
		SetLatch(wakeup);

	/* Run whatever is left synchronously. */
	if (nsync > 0)
	{
		for (int i = 0; i < nsync; ++i)
		{
			pgaio_io_perform_synchronously(synchronous_ios[i]);
		}
	}

	return num_staged_ios;
}
290 :
/*
 * on_shmem_exit() callback that releases the worker's slot in
 * io_worker_control, so a replacement worker can claim it.
 */
static void
pgaio_worker_die(int code, Datum arg)
{
	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
	Assert(io_worker_control->workers[MyIoWorkerId].in_use);
	Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);

	/* Stop advertising as idle and free the slot. */
	io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
	io_worker_control->workers[MyIoWorkerId].in_use = false;
	io_worker_control->workers[MyIoWorkerId].latch = NULL;
	LWLockRelease(AioWorkerSubmissionQueueLock);
}
307 :
308 : /*
309 : * Register the worker in shared memory, assign MyIoWorkerId and register a
310 : * shutdown callback to release registration.
311 : */
312 : static void
313 1926 : pgaio_worker_register(void)
314 : {
315 1926 : MyIoWorkerId = -1;
316 :
317 : /*
318 : * XXX: This could do with more fine-grained locking. But it's also not
319 : * very common for the number of workers to change at the moment...
320 : */
321 1926 : LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
322 :
323 4319 : for (int i = 0; i < MAX_IO_WORKERS; ++i)
324 : {
325 4319 : if (!io_worker_control->workers[i].in_use)
326 : {
327 : Assert(io_worker_control->workers[i].latch == NULL);
328 1926 : io_worker_control->workers[i].in_use = true;
329 1926 : MyIoWorkerId = i;
330 1926 : break;
331 : }
332 : else
333 : Assert(io_worker_control->workers[i].latch != NULL);
334 : }
335 :
336 1926 : if (MyIoWorkerId == -1)
337 0 : elog(ERROR, "couldn't find a free worker slot");
338 :
339 1926 : io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
340 1926 : io_worker_control->workers[MyIoWorkerId].latch = MyLatch;
341 1926 : LWLockRelease(AioWorkerSubmissionQueueLock);
342 :
343 1926 : on_shmem_exit(pgaio_worker_die, 0);
344 1926 : }
345 :
346 : static void
347 1634 : pgaio_worker_error_callback(void *arg)
348 : {
349 : ProcNumber owner;
350 : PGPROC *owner_proc;
351 : int32 owner_pid;
352 1634 : PgAioHandle *ioh = arg;
353 :
354 1634 : if (!ioh)
355 0 : return;
356 :
357 : Assert(ioh->owner_procno != MyProcNumber);
358 : Assert(MyBackendType == B_IO_WORKER);
359 :
360 1634 : owner = ioh->owner_procno;
361 1634 : owner_proc = GetPGProcByNumber(owner);
362 1634 : owner_pid = owner_proc->pid;
363 :
364 1634 : errcontext("I/O worker executing I/O on behalf of process %d", owner_pid);
365 : }
366 :
/*
 * Main entry point of an IO worker process.  Sets up signal handling and
 * error recovery, registers in shared memory, then loops: consume an IO from
 * the shared submission queue and execute it synchronously, or, if the queue
 * is empty, mark self idle and sleep on the process latch.
 */
void
IoWorkerMain(const void *startup_data, size_t startup_data_len)
{
	sigjmp_buf	local_sigjmp_buf;
	PgAioHandle *volatile error_ioh = NULL; /* IO to fail if we error out */
	ErrorContextCallback errcallback = {0};
	volatile int error_errno = 0;
	char		cmd[128];

	AuxiliaryProcessMainCommon();

	pqsignal(SIGHUP, SignalHandlerForConfigReload);
	pqsignal(SIGINT, die);		/* to allow manually triggering worker restart */

	/*
	 * Ignore SIGTERM, will get explicit shutdown via SIGUSR2 later in the
	 * shutdown sequence, similar to checkpointer.
	 */
	pqsignal(SIGTERM, SIG_IGN);
	/* SIGQUIT handler was already set up by InitPostmasterChild */
	pqsignal(SIGALRM, SIG_IGN);
	pqsignal(SIGPIPE, SIG_IGN);
	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
	pqsignal(SIGUSR2, SignalHandlerForShutdownRequest);

	/* also registers a shutdown callback to unregister */
	pgaio_worker_register();

	/* show our worker id in the ps status line */
	sprintf(cmd, "%d", MyIoWorkerId);
	set_ps_display(cmd);

	/* attribute errors raised while processing an IO to its owner */
	errcallback.callback = pgaio_worker_error_callback;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;

	/* see PostgresMain() */
	if (sigsetjmp(local_sigjmp_buf, 1) != 0)
	{
		error_context_stack = NULL;
		HOLD_INTERRUPTS();

		EmitErrorReport();

		/*
		 * In the - very unlikely - case that the IO failed in a way that
		 * raises an error we need to mark the IO as failed.
		 *
		 * Need to do just enough error recovery so that we can mark the IO as
		 * failed and then exit (postmaster will start a new worker).
		 */
		LWLockReleaseAll();

		if (error_ioh != NULL)
		{
			/* should never fail without setting error_errno */
			Assert(error_errno != 0);

			errno = error_errno;

			START_CRIT_SECTION();
			pgaio_io_process_completion(error_ioh, -error_errno);
			END_CRIT_SECTION();
		}

		proc_exit(1);
	}

	/* We can now handle ereport(ERROR) */
	PG_exception_stack = &local_sigjmp_buf;

	sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);

	while (!ShutdownRequestPending)
	{
		uint32		io_index;
		Latch	   *latches[IO_WORKER_WAKEUP_FANOUT];
		int			nlatches = 0;
		int			nwakeups = 0;
		int			worker;

		/*
		 * Try to get a job to do.
		 *
		 * The lwlock acquisition also provides the necessary memory barrier
		 * to ensure that we don't see an outdated data in the handle.
		 */
		LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);

		/*
		 * NOTE(review): consume() returns int -1 on empty, stored into a
		 * uint32; the comparisons work via the usual unsigned conversion of
		 * -1, but using a signed variable would be clearer — confirm intent.
		 */
		if ((io_index = pgaio_worker_submission_queue_consume()) == -1)
		{
			/*
			 * Nothing to do. Mark self idle.
			 *
			 * XXX: Invent some kind of back pressure to reduce useless
			 * wakeups?
			 */
			io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
		}
		else
		{
			/* Got one. Clear idle flag. */
			io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);

			/* See if we can wake up some peers. */
			nwakeups = Min(pgaio_worker_submission_queue_depth(),
						   IO_WORKER_WAKEUP_FANOUT);
			for (int i = 0; i < nwakeups; ++i)
			{
				if ((worker = pgaio_worker_choose_idle()) < 0)
					break;
				latches[nlatches++] = io_worker_control->workers[worker].latch;
			}
		}
		LWLockRelease(AioWorkerSubmissionQueueLock);

		/* set latches only after releasing the lock */
		for (int i = 0; i < nlatches; ++i)
			SetLatch(latches[i]);

		if (io_index != -1)
		{
			PgAioHandle *ioh = NULL;

			ioh = &pgaio_ctl->io_handles[io_index];
			error_ioh = ioh;
			errcallback.arg = ioh;

			pgaio_debug_io(DEBUG4, ioh,
						   "worker %d processing IO",
						   MyIoWorkerId);

			/*
			 * Prevent interrupts between pgaio_io_reopen() and
			 * pgaio_io_perform_synchronously() that otherwise could lead to
			 * the FD getting closed in that window.
			 */
			HOLD_INTERRUPTS();

			/*
			 * It's very unlikely, but possible, that reopen fails. E.g. due
			 * to memory allocations failing or file permissions changing or
			 * such. In that case we need to fail the IO.
			 *
			 * There's not really a good errno we can report here.
			 */
			error_errno = ENOENT;
			pgaio_io_reopen(ioh);

			/*
			 * To be able to exercise the reopen-fails path, allow injection
			 * points to trigger a failure at this point.
			 */
			INJECTION_POINT("aio-worker-after-reopen", ioh);

			error_errno = 0;
			error_ioh = NULL;

			/*
			 * As part of IO completion the buffer will be marked as NOACCESS,
			 * until the buffer is pinned again - which never happens in io
			 * workers. Therefore the next time there is IO for the same
			 * buffer, the memory will be considered inaccessible. To avoid
			 * that, explicitly allow access to the memory before reading data
			 * into it.
			 */
#ifdef USE_VALGRIND
			{
				struct iovec *iov;
				uint16		iov_length = pgaio_io_get_iovec_length(ioh, &iov);

				for (int i = 0; i < iov_length; i++)
					VALGRIND_MAKE_MEM_UNDEFINED(iov[i].iov_base, iov[i].iov_len);
			}
#endif

			/*
			 * We don't expect this to ever fail with ERROR or FATAL, no need
			 * to keep error_ioh set to the IO.
			 * pgaio_io_perform_synchronously() contains a critical section to
			 * ensure we don't accidentally fail.
			 */
			pgaio_io_perform_synchronously(ioh);

			RESUME_INTERRUPTS();
			errcallback.arg = NULL;
		}
		else
		{
			/* queue was empty; sleep until a submitter sets our latch */
			WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
					  WAIT_EVENT_IO_WORKER_MAIN);
			ResetLatch(MyLatch);
		}

		CHECK_FOR_INTERRUPTS();

		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
		}
	}

	error_context_stack = errcallback.previous;
	proc_exit(0);
}
570 :
571 : bool
572 184198 : pgaio_workers_enabled(void)
573 : {
574 184198 : return io_method == IOMETHOD_WORKER;
575 : }
|