Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * method_worker.c
4 : * AIO - perform AIO using worker processes
5 : *
6 : * IO workers consume IOs from a shared memory submission queue, run
7 : * traditional synchronous system calls, and perform the shared completion
8 : * handling immediately. Client code submits most requests by pushing IOs
9 : * into the submission queue, and waits (if necessary) using condition
10 : * variables. Some IOs cannot be performed in another process due to lack of
 * infrastructure for reopening the file, and must be processed synchronously by
12 : * the client code when submitted.
13 : *
14 : * So that the submitter can make just one system call when submitting a batch
15 : * of IOs, wakeups "fan out"; each woken IO worker can wake two more. XXX This
16 : * could be improved by using futexes instead of latches to wake N waiters.
17 : *
18 : * This method of AIO is available in all builds on all operating systems, and
19 : * is the default.
20 : *
21 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
22 : * Portions Copyright (c) 1994, Regents of the University of California
23 : *
24 : * IDENTIFICATION
25 : * src/backend/storage/aio/method_worker.c
26 : *
27 : *-------------------------------------------------------------------------
28 : */
29 :
30 : #include "postgres.h"
31 :
32 : #include "libpq/pqsignal.h"
33 : #include "miscadmin.h"
34 : #include "port/pg_bitutils.h"
35 : #include "postmaster/auxprocess.h"
36 : #include "postmaster/interrupt.h"
37 : #include "storage/aio.h"
38 : #include "storage/aio_internal.h"
39 : #include "storage/aio_subsys.h"
40 : #include "storage/io_worker.h"
41 : #include "storage/ipc.h"
42 : #include "storage/latch.h"
43 : #include "storage/proc.h"
44 : #include "tcop/tcopprot.h"
45 : #include "utils/injection_point.h"
46 : #include "utils/memdebug.h"
47 : #include "utils/ps_status.h"
48 : #include "utils/wait_event.h"
49 :
50 :
/*
 * How many workers should each worker wake up if needed?  A worker that has
 * just taken an IO from the queue wakes at most this many idle peers, and
 * never more than the number of IOs still left in the queue.
 */
#define IO_WORKER_WAKEUP_FANOUT 2
53 :
54 :
/*
 * Shared-memory ring buffer of IOs waiting to be executed by an IO worker.
 *
 * Positions wrap with "& (size - 1)", so size must be a power of two.  One
 * slot is always left unused so that head == tail means "empty"; the ring
 * therefore holds at most size - 1 IOs.  Inserts and removals are done while
 * holding AioWorkerSubmissionQueueLock.
 */
typedef struct AioWorkerSubmissionQueue
{
	uint32		size;			/* number of entries in ios[], a power of two */
	uint32		mask;			/* NOTE(review): not read anywhere in this
								 * file; wraparound uses size - 1 instead */
	uint32		head;			/* next position to insert at */
	uint32		tail;			/* next position to consume from */
	uint32		ios[FLEXIBLE_ARRAY_MEMBER];	/* ids from pgaio_io_get_id() */
} AioWorkerSubmissionQueue;
63 :
/*
 * Registration slot for one IO worker, claimed by pgaio_worker_register()
 * and released by pgaio_worker_die().
 */
typedef struct AioWorkerSlot
{
	Latch	   *latch;			/* registered worker's latch; NULL when free */
	bool		in_use;			/* slot claimed by a running worker? */
} AioWorkerSlot;
69 :
/*
 * Shared state describing the set of IO workers.
 */
typedef struct AioWorkerControl
{
	/*
	 * Bit i is set when the worker in workers[i] is idle.  Being a uint64,
	 * this can only describe 64 slots, so MAX_IO_WORKERS must not exceed 64.
	 */
	uint64		idle_worker_mask;
	AioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];	/* MAX_IO_WORKERS entries */
} AioWorkerControl;
75 :
76 :
/* Callbacks referenced by pgaio_worker_ops, defined further down. */
static size_t pgaio_worker_shmem_size(void);
static void pgaio_worker_shmem_init(bool first_time);

static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh);
static int	pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);
82 :
83 :
/*
 * IoMethodOps table for the worker IO method: shared-memory sizing and setup,
 * plus the submission-side callbacks.
 */
const IoMethodOps pgaio_worker_ops = {
	.shmem_size = pgaio_worker_shmem_size,
	.shmem_init = pgaio_worker_shmem_init,

	.needs_synchronous_execution = pgaio_worker_needs_synchronous_execution,
	.submit = pgaio_worker_submit,
};
91 :
92 :
/* GUCs */
int			io_workers = 3;		/* presumably the number of IO worker
								 * processes to run; consumed outside this
								 * file - confirm */


/*
 * Requested submission queue length; rounded up to a power of two by
 * pgaio_worker_queue_shmem_size().
 */
static int	io_worker_queue_size = 64;

/* This process's slot in io_worker_control->workers, see pgaio_worker_register() */
static int	MyIoWorkerId;

/* Shared-memory structs, attached by pgaio_worker_shmem_init() */
static AioWorkerSubmissionQueue *io_worker_submission_queue;
static AioWorkerControl *io_worker_control;
101 :
102 :
103 : static size_t
104 5982 : pgaio_worker_queue_shmem_size(int *queue_size)
105 : {
106 : /* Round size up to next power of two so we can make a mask. */
107 5982 : *queue_size = pg_nextpower2_32(io_worker_queue_size);
108 :
109 11964 : return offsetof(AioWorkerSubmissionQueue, ios) +
110 5982 : sizeof(uint32) * *queue_size;
111 : }
112 :
113 : static size_t
114 5982 : pgaio_worker_control_shmem_size(void)
115 : {
116 5982 : return offsetof(AioWorkerControl, workers) +
117 : sizeof(AioWorkerSlot) * MAX_IO_WORKERS;
118 : }
119 :
120 : static size_t
121 3892 : pgaio_worker_shmem_size(void)
122 : {
123 : size_t sz;
124 : int queue_size;
125 :
126 3892 : sz = pgaio_worker_queue_shmem_size(&queue_size);
127 3892 : sz = add_size(sz, pgaio_worker_control_shmem_size());
128 :
129 3892 : return sz;
130 : }
131 :
132 : static void
133 2090 : pgaio_worker_shmem_init(bool first_time)
134 : {
135 : bool found;
136 : int queue_size;
137 :
138 2090 : io_worker_submission_queue =
139 2090 : ShmemInitStruct("AioWorkerSubmissionQueue",
140 : pgaio_worker_queue_shmem_size(&queue_size),
141 : &found);
142 2090 : if (!found)
143 : {
144 2090 : io_worker_submission_queue->size = queue_size;
145 2090 : io_worker_submission_queue->head = 0;
146 2090 : io_worker_submission_queue->tail = 0;
147 : }
148 :
149 2090 : io_worker_control =
150 2090 : ShmemInitStruct("AioWorkerControl",
151 : pgaio_worker_control_shmem_size(),
152 : &found);
153 2090 : if (!found)
154 : {
155 2090 : io_worker_control->idle_worker_mask = 0;
156 68970 : for (int i = 0; i < MAX_IO_WORKERS; ++i)
157 : {
158 66880 : io_worker_control->workers[i].latch = NULL;
159 66880 : io_worker_control->workers[i].in_use = false;
160 : }
161 : }
162 2090 : }
163 :
164 : static int
165 1213110 : pgaio_choose_idle_worker(void)
166 : {
167 : int worker;
168 :
169 1213110 : if (io_worker_control->idle_worker_mask == 0)
170 16866 : return -1;
171 :
172 : /* Find the lowest bit position, and clear it. */
173 1196244 : worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask);
174 1196244 : io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
175 :
176 1196244 : return worker;
177 : }
178 :
179 : static bool
180 1183530 : pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
181 : {
182 : AioWorkerSubmissionQueue *queue;
183 : uint32 new_head;
184 :
185 1183530 : queue = io_worker_submission_queue;
186 1183530 : new_head = (queue->head + 1) & (queue->size - 1);
187 1183530 : if (new_head == queue->tail)
188 : {
189 0 : pgaio_debug(DEBUG3, "io queue is full, at %u elements",
190 : io_worker_submission_queue->size);
191 0 : return false; /* full */
192 : }
193 :
194 1183530 : queue->ios[queue->head] = pgaio_io_get_id(ioh);
195 1183530 : queue->head = new_head;
196 :
197 1183530 : return true;
198 : }
199 :
200 : static uint32
201 1966952 : pgaio_worker_submission_queue_consume(void)
202 : {
203 : AioWorkerSubmissionQueue *queue;
204 : uint32 result;
205 :
206 1966952 : queue = io_worker_submission_queue;
207 1966952 : if (queue->tail == queue->head)
208 993562 : return UINT32_MAX; /* empty */
209 :
210 973390 : result = queue->ios[queue->tail];
211 973390 : queue->tail = (queue->tail + 1) & (queue->size - 1);
212 :
213 973390 : return result;
214 : }
215 :
216 : static uint32
217 1945660 : pgaio_worker_submission_queue_depth(void)
218 : {
219 : uint32 head;
220 : uint32 tail;
221 :
222 1945660 : head = io_worker_submission_queue->head;
223 1945660 : tail = io_worker_submission_queue->tail;
224 :
225 1945660 : if (tail > head)
226 1040 : head += io_worker_submission_queue->size;
227 :
228 : Assert(head >= tail);
229 :
230 1945660 : return head - tail;
231 : }
232 :
233 : static bool
234 1191484 : pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
235 : {
236 : return
237 1191484 : !IsUnderPostmaster
238 1185834 : || ioh->flags & PGAIO_HF_REFERENCES_LOCAL
239 2377318 : || !pgaio_io_can_reopen(ioh);
240 : }
241 :
242 : static void
243 1182404 : pgaio_worker_submit_internal(int nios, PgAioHandle *ios[])
244 : {
245 : PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
246 1182404 : int nsync = 0;
247 1182404 : Latch *wakeup = NULL;
248 : int worker;
249 :
250 : Assert(nios <= PGAIO_SUBMIT_BATCH_SIZE);
251 :
252 1182404 : LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
253 2365934 : for (int i = 0; i < nios; ++i)
254 : {
255 : Assert(!pgaio_worker_needs_synchronous_execution(ios[i]));
256 1183530 : if (!pgaio_worker_submission_queue_insert(ios[i]))
257 : {
258 : /*
259 : * We'll do it synchronously, but only after we've sent as many as
260 : * we can to workers, to maximize concurrency.
261 : */
262 0 : synchronous_ios[nsync++] = ios[i];
263 0 : continue;
264 : }
265 :
266 1183530 : if (wakeup == NULL)
267 : {
268 : /* Choose an idle worker to wake up if we haven't already. */
269 1182422 : worker = pgaio_choose_idle_worker();
270 1182422 : if (worker >= 0)
271 1174034 : wakeup = io_worker_control->workers[worker].latch;
272 :
273 1182422 : pgaio_debug_io(DEBUG4, ios[i],
274 : "choosing worker %d",
275 : worker);
276 : }
277 : }
278 1182404 : LWLockRelease(AioWorkerSubmissionQueueLock);
279 :
280 1182404 : if (wakeup)
281 1174034 : SetLatch(wakeup);
282 :
283 : /* Run whatever is left synchronously. */
284 1182404 : if (nsync > 0)
285 : {
286 0 : for (int i = 0; i < nsync; ++i)
287 : {
288 0 : pgaio_io_perform_synchronously(synchronous_ios[i]);
289 : }
290 : }
291 1182404 : }
292 :
293 : static int
294 1182404 : pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
295 : {
296 2365934 : for (int i = 0; i < num_staged_ios; i++)
297 : {
298 1183530 : PgAioHandle *ioh = staged_ios[i];
299 :
300 1183530 : pgaio_io_prepare_submit(ioh);
301 : }
302 :
303 1182404 : pgaio_worker_submit_internal(num_staged_ios, staged_ios);
304 :
305 1182404 : return num_staged_ios;
306 : }
307 :
308 : /*
309 : * on_shmem_exit() callback that releases the worker's slot in
310 : * io_worker_control.
311 : */
312 : static void
313 3106 : pgaio_worker_die(int code, Datum arg)
314 : {
315 3106 : LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
316 : Assert(io_worker_control->workers[MyIoWorkerId].in_use);
317 : Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
318 :
319 3106 : io_worker_control->workers[MyIoWorkerId].in_use = false;
320 3106 : io_worker_control->workers[MyIoWorkerId].latch = NULL;
321 3106 : LWLockRelease(AioWorkerSubmissionQueueLock);
322 3106 : }
323 :
324 : /*
325 : * Register the worker in shared memory, assign MyIoWorkerId and register a
326 : * shutdown callback to release registration.
327 : */
328 : static void
329 3106 : pgaio_worker_register(void)
330 : {
331 3106 : MyIoWorkerId = -1;
332 :
333 : /*
334 : * XXX: This could do with more fine-grained locking. But it's also not
335 : * very common for the number of workers to change at the moment...
336 : */
337 3106 : LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
338 :
339 7780 : for (int i = 0; i < MAX_IO_WORKERS; ++i)
340 : {
341 7780 : if (!io_worker_control->workers[i].in_use)
342 : {
343 : Assert(io_worker_control->workers[i].latch == NULL);
344 3106 : io_worker_control->workers[i].in_use = true;
345 3106 : MyIoWorkerId = i;
346 3106 : break;
347 : }
348 : else
349 : Assert(io_worker_control->workers[i].latch != NULL);
350 : }
351 :
352 3106 : if (MyIoWorkerId == -1)
353 0 : elog(ERROR, "couldn't find a free worker slot");
354 :
355 3106 : io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
356 3106 : io_worker_control->workers[MyIoWorkerId].latch = MyLatch;
357 3106 : LWLockRelease(AioWorkerSubmissionQueueLock);
358 :
359 3106 : on_shmem_exit(pgaio_worker_die, 0);
360 3106 : }
361 :
362 : static void
363 2254 : pgaio_worker_error_callback(void *arg)
364 : {
365 : ProcNumber owner;
366 : PGPROC *owner_proc;
367 : int32 owner_pid;
368 2254 : PgAioHandle *ioh = arg;
369 :
370 2254 : if (!ioh)
371 0 : return;
372 :
373 : Assert(ioh->owner_procno != MyProcNumber);
374 : Assert(MyBackendType == B_IO_WORKER);
375 :
376 2254 : owner = ioh->owner_procno;
377 2254 : owner_proc = GetPGProcByNumber(owner);
378 2254 : owner_pid = owner_proc->pid;
379 :
380 2254 : errcontext("I/O worker executing I/O on behalf of process %d", owner_pid);
381 : }
382 :
383 : void
384 3106 : IoWorkerMain(const void *startup_data, size_t startup_data_len)
385 : {
386 : sigjmp_buf local_sigjmp_buf;
387 3106 : PgAioHandle *volatile error_ioh = NULL;
388 3106 : ErrorContextCallback errcallback = {0};
389 3106 : volatile int error_errno = 0;
390 : char cmd[128];
391 :
392 3106 : MyBackendType = B_IO_WORKER;
393 3106 : AuxiliaryProcessMainCommon();
394 :
395 3106 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
396 3106 : pqsignal(SIGINT, die); /* to allow manually triggering worker restart */
397 :
398 : /*
399 : * Ignore SIGTERM, will get explicit shutdown via SIGUSR2 later in the
400 : * shutdown sequence, similar to checkpointer.
401 : */
402 3106 : pqsignal(SIGTERM, SIG_IGN);
403 : /* SIGQUIT handler was already set up by InitPostmasterChild */
404 3106 : pqsignal(SIGALRM, SIG_IGN);
405 3106 : pqsignal(SIGPIPE, SIG_IGN);
406 3106 : pqsignal(SIGUSR1, procsignal_sigusr1_handler);
407 3106 : pqsignal(SIGUSR2, SignalHandlerForShutdownRequest);
408 :
409 : /* also registers a shutdown callback to unregister */
410 3106 : pgaio_worker_register();
411 :
412 3106 : sprintf(cmd, "%d", MyIoWorkerId);
413 3106 : set_ps_display(cmd);
414 :
415 3106 : errcallback.callback = pgaio_worker_error_callback;
416 3106 : errcallback.previous = error_context_stack;
417 3106 : error_context_stack = &errcallback;
418 :
419 : /* see PostgresMain() */
420 3106 : if (sigsetjmp(local_sigjmp_buf, 1) != 0)
421 : {
422 2 : error_context_stack = NULL;
423 2 : HOLD_INTERRUPTS();
424 :
425 2 : EmitErrorReport();
426 :
427 : /*
428 : * In the - very unlikely - case that the IO failed in a way that
429 : * raises an error we need to mark the IO as failed.
430 : *
431 : * Need to do just enough error recovery so that we can mark the IO as
432 : * failed and then exit (postmaster will start a new worker).
433 : */
434 2 : LWLockReleaseAll();
435 :
436 2 : if (error_ioh != NULL)
437 : {
438 : /* should never fail without setting error_errno */
439 : Assert(error_errno != 0);
440 :
441 2 : errno = error_errno;
442 :
443 2 : START_CRIT_SECTION();
444 2 : pgaio_io_process_completion(error_ioh, -error_errno);
445 2 : END_CRIT_SECTION();
446 : }
447 :
448 2 : proc_exit(1);
449 : }
450 :
451 : /* We can now handle ereport(ERROR) */
452 3106 : PG_exception_stack = &local_sigjmp_buf;
453 :
454 3106 : sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
455 :
456 1970024 : while (!ShutdownRequestPending)
457 : {
458 : uint32 io_index;
459 : Latch *latches[IO_WORKER_WAKEUP_FANOUT];
460 1966952 : int nlatches = 0;
461 1966952 : int nwakeups = 0;
462 : int worker;
463 :
464 : /* Try to get a job to do. */
465 1966952 : LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
466 1966952 : if ((io_index = pgaio_worker_submission_queue_consume()) == UINT32_MAX)
467 : {
468 : /*
469 : * Nothing to do. Mark self idle.
470 : *
471 : * XXX: Invent some kind of back pressure to reduce useless
472 : * wakeups?
473 : */
474 993562 : io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
475 : }
476 : else
477 : {
478 : /* Got one. Clear idle flag. */
479 973390 : io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
480 :
481 : /* See if we can wake up some peers. */
482 973390 : nwakeups = Min(pgaio_worker_submission_queue_depth(),
483 : IO_WORKER_WAKEUP_FANOUT);
484 995600 : for (int i = 0; i < nwakeups; ++i)
485 : {
486 30688 : if ((worker = pgaio_choose_idle_worker()) < 0)
487 8478 : break;
488 22210 : latches[nlatches++] = io_worker_control->workers[worker].latch;
489 : }
490 : }
491 1966952 : LWLockRelease(AioWorkerSubmissionQueueLock);
492 :
493 1989162 : for (int i = 0; i < nlatches; ++i)
494 22210 : SetLatch(latches[i]);
495 :
496 1966952 : if (io_index != UINT32_MAX)
497 : {
498 973390 : PgAioHandle *ioh = NULL;
499 :
500 973390 : ioh = &pgaio_ctl->io_handles[io_index];
501 973390 : error_ioh = ioh;
502 973390 : errcallback.arg = ioh;
503 :
504 973390 : pgaio_debug_io(DEBUG4, ioh,
505 : "worker %d processing IO",
506 : MyIoWorkerId);
507 :
508 : /*
509 : * Prevent interrupts between pgaio_io_reopen() and
510 : * pgaio_io_perform_synchronously() that otherwise could lead to
511 : * the FD getting closed in that window.
512 : */
513 973390 : HOLD_INTERRUPTS();
514 :
515 : /*
516 : * It's very unlikely, but possible, that reopen fails. E.g. due
517 : * to memory allocations failing or file permissions changing or
518 : * such. In that case we need to fail the IO.
519 : *
520 : * There's not really a good errno we can report here.
521 : */
522 973390 : error_errno = ENOENT;
523 973390 : pgaio_io_reopen(ioh);
524 :
525 : /*
526 : * To be able to exercise the reopen-fails path, allow injection
527 : * points to trigger a failure at this point.
528 : */
529 973390 : INJECTION_POINT("aio-worker-after-reopen", ioh);
530 :
531 973388 : error_errno = 0;
532 973388 : error_ioh = NULL;
533 :
534 : /*
535 : * As part of IO completion the buffer will be marked as NOACCESS,
536 : * until the buffer is pinned again - which never happens in io
537 : * workers. Therefore the next time there is IO for the same
538 : * buffer, the memory will be considered inaccessible. To avoid
539 : * that, explicitly allow access to the memory before reading data
540 : * into it.
541 : */
542 : #ifdef USE_VALGRIND
543 : {
544 : struct iovec *iov;
545 : uint16 iov_length = pgaio_io_get_iovec_length(ioh, &iov);
546 :
547 : for (int i = 0; i < iov_length; i++)
548 : VALGRIND_MAKE_MEM_UNDEFINED(iov[i].iov_base, iov[i].iov_len);
549 : }
550 : #endif
551 :
552 : /*
553 : * We don't expect this to ever fail with ERROR or FATAL, no need
554 : * to keep error_ioh set to the IO.
555 : * pgaio_io_perform_synchronously() contains a critical section to
556 : * ensure we don't accidentally fail.
557 : */
558 973388 : pgaio_io_perform_synchronously(ioh);
559 :
560 973388 : RESUME_INTERRUPTS();
561 973388 : errcallback.arg = NULL;
562 : }
563 : else
564 : {
565 993562 : WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
566 : WAIT_EVENT_IO_WORKER_MAIN);
567 993538 : ResetLatch(MyLatch);
568 : }
569 :
570 1966926 : CHECK_FOR_INTERRUPTS();
571 : }
572 :
573 3072 : error_context_stack = errcallback.previous;
574 3072 : proc_exit(0);
575 : }
576 :
577 : bool
578 251734 : pgaio_workers_enabled(void)
579 : {
580 251734 : return io_method == IOMETHOD_WORKER;
581 : }
|