Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * method_worker.c
4 : * AIO - perform AIO using worker processes
5 : *
6 : * IO workers consume IOs from a shared memory submission queue, run
7 : * traditional synchronous system calls, and perform the shared completion
8 : * handling immediately. Client code submits most requests by pushing IOs
9 : * into the submission queue, and waits (if necessary) using condition
10 : * variables. Some IOs cannot be performed in another process due to lack of
11 : * infrastructure for reopening the file, and must processed synchronously by
12 : * the client code when submitted.
13 : *
14 : * So that the submitter can make just one system call when submitting a batch
15 : * of IOs, wakeups "fan out"; each woken IO worker can wake two more. XXX This
16 : * could be improved by using futexes instead of latches to wake N waiters.
17 : *
18 : * This method of AIO is available in all builds on all operating systems, and
19 : * is the default.
20 : *
21 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
22 : * Portions Copyright (c) 1994, Regents of the University of California
23 : *
24 : * IDENTIFICATION
25 : * src/backend/storage/aio/method_worker.c
26 : *
27 : *-------------------------------------------------------------------------
28 : */
29 :
30 : #include "postgres.h"
31 :
32 : #include "libpq/pqsignal.h"
33 : #include "miscadmin.h"
34 : #include "port/pg_bitutils.h"
35 : #include "postmaster/auxprocess.h"
36 : #include "postmaster/interrupt.h"
37 : #include "storage/aio.h"
38 : #include "storage/aio_internal.h"
39 : #include "storage/aio_subsys.h"
40 : #include "storage/io_worker.h"
41 : #include "storage/ipc.h"
42 : #include "storage/latch.h"
43 : #include "storage/proc.h"
44 : #include "tcop/tcopprot.h"
45 : #include "utils/memdebug.h"
46 : #include "utils/ps_status.h"
47 : #include "utils/wait_event.h"
48 :
49 :
50 : /* How many workers should each worker wake up if needed? */
51 : #define IO_WORKER_WAKEUP_FANOUT 2
52 :
53 :
/*
 * Fixed-size ring buffer of IO handle IDs in shared memory, protected by
 * AioWorkerSubmissionQueueLock.  head is the next insertion position, tail
 * the next entry to consume; the queue is empty when head == tail.  size is
 * always a power of two, so (size - 1) can be used as an index mask.
 */
typedef struct AioWorkerSubmissionQueue
{
	uint32		size;			/* number of entries, a power of two */
	uint32		mask;			/* NOTE(review): never set in this file; code
								 * uses (size - 1) directly — confirm whether
								 * this field is dead */
	uint32		head;			/* next slot to insert into */
	uint32		tail;			/* next slot to consume from */
	uint32		ios[FLEXIBLE_ARRAY_MEMBER]; /* IDs from pgaio_io_get_id() */
} AioWorkerSubmissionQueue;

/* Per-worker slot; all fields protected by AioWorkerSubmissionQueueLock. */
typedef struct AioWorkerSlot
{
	Latch	   *latch;			/* worker's latch, for wakeups; NULL if free */
	bool		in_use;			/* slot claimed by a running worker? */
} AioWorkerSlot;

/* Shared control area, sized for MAX_IO_WORKERS slots. */
typedef struct AioWorkerControl
{
	uint64		idle_worker_mask;	/* bit N set => worker N is idle */
	AioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
} AioWorkerControl;
74 :
75 :
76 : static size_t pgaio_worker_shmem_size(void);
77 : static void pgaio_worker_shmem_init(bool first_time);
78 :
79 : static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh);
80 : static int pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);
81 :
82 :
/*
 * Method table for the worker IO method.  Callbacks not needed by this
 * method (e.g. wait_one) are left unset.
 */
const IoMethodOps pgaio_worker_ops = {
	.shmem_size = pgaio_worker_shmem_size,
	.shmem_init = pgaio_worker_shmem_init,

	.needs_synchronous_execution = pgaio_worker_needs_synchronous_execution,
	.submit = pgaio_worker_submit,
};
90 :
91 :
/* GUCs */
int			io_workers = 3;		/* number of IO worker processes */

/* Requested queue size; rounded up to a power of two at shmem sizing time. */
static int	io_worker_queue_size = 64;
static int	MyIoWorkerId;		/* this worker's slot index, set at startup */
static AioWorkerSubmissionQueue *io_worker_submission_queue;
static AioWorkerControl *io_worker_control;
100 :
101 :
102 : static size_t
103 5982 : pgaio_worker_queue_shmem_size(int *queue_size)
104 : {
105 : /* Round size up to next power of two so we can make a mask. */
106 5982 : *queue_size = pg_nextpower2_32(io_worker_queue_size);
107 :
108 11964 : return offsetof(AioWorkerSubmissionQueue, ios) +
109 5982 : sizeof(uint32) * *queue_size;
110 : }
111 :
112 : static size_t
113 5982 : pgaio_worker_control_shmem_size(void)
114 : {
115 5982 : return offsetof(AioWorkerControl, workers) +
116 : sizeof(AioWorkerSlot) * MAX_IO_WORKERS;
117 : }
118 :
119 : static size_t
120 3892 : pgaio_worker_shmem_size(void)
121 : {
122 : size_t sz;
123 : int queue_size;
124 :
125 3892 : sz = pgaio_worker_queue_shmem_size(&queue_size);
126 3892 : sz = add_size(sz, pgaio_worker_control_shmem_size());
127 :
128 3892 : return sz;
129 : }
130 :
131 : static void
132 2090 : pgaio_worker_shmem_init(bool first_time)
133 : {
134 : bool found;
135 : int queue_size;
136 :
137 2090 : io_worker_submission_queue =
138 2090 : ShmemInitStruct("AioWorkerSubmissionQueue",
139 : pgaio_worker_queue_shmem_size(&queue_size),
140 : &found);
141 2090 : if (!found)
142 : {
143 2090 : io_worker_submission_queue->size = queue_size;
144 2090 : io_worker_submission_queue->head = 0;
145 2090 : io_worker_submission_queue->tail = 0;
146 : }
147 :
148 2090 : io_worker_control =
149 2090 : ShmemInitStruct("AioWorkerControl",
150 : pgaio_worker_control_shmem_size(),
151 : &found);
152 2090 : if (!found)
153 : {
154 2090 : io_worker_control->idle_worker_mask = 0;
155 68970 : for (int i = 0; i < MAX_IO_WORKERS; ++i)
156 : {
157 66880 : io_worker_control->workers[i].latch = NULL;
158 66880 : io_worker_control->workers[i].in_use = false;
159 : }
160 : }
161 2090 : }
162 :
163 : static int
164 1180294 : pgaio_choose_idle_worker(void)
165 : {
166 : int worker;
167 :
168 1180294 : if (io_worker_control->idle_worker_mask == 0)
169 15786 : return -1;
170 :
171 : /* Find the lowest bit position, and clear it. */
172 1164508 : worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask);
173 1164508 : io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
174 :
175 1164508 : return worker;
176 : }
177 :
178 : static bool
179 1152156 : pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
180 : {
181 : AioWorkerSubmissionQueue *queue;
182 : uint32 new_head;
183 :
184 1152156 : queue = io_worker_submission_queue;
185 1152156 : new_head = (queue->head + 1) & (queue->size - 1);
186 1152156 : if (new_head == queue->tail)
187 : {
188 0 : pgaio_debug(DEBUG3, "io queue is full, at %u elements",
189 : io_worker_submission_queue->size);
190 0 : return false; /* full */
191 : }
192 :
193 1152156 : queue->ios[queue->head] = pgaio_io_get_id(ioh);
194 1152156 : queue->head = new_head;
195 :
196 1152156 : return true;
197 : }
198 :
199 : static uint32
200 1903554 : pgaio_worker_submission_queue_consume(void)
201 : {
202 : AioWorkerSubmissionQueue *queue;
203 : uint32 result;
204 :
205 1903554 : queue = io_worker_submission_queue;
206 1903554 : if (queue->tail == queue->head)
207 961660 : return UINT32_MAX; /* empty */
208 :
209 941894 : result = queue->ios[queue->tail];
210 941894 : queue->tail = (queue->tail + 1) & (queue->size - 1);
211 :
212 941894 : return result;
213 : }
214 :
215 : static uint32
216 1882838 : pgaio_worker_submission_queue_depth(void)
217 : {
218 : uint32 head;
219 : uint32 tail;
220 :
221 1882838 : head = io_worker_submission_queue->head;
222 1882838 : tail = io_worker_submission_queue->tail;
223 :
224 1882838 : if (tail > head)
225 898 : head += io_worker_submission_queue->size;
226 :
227 : Assert(head >= tail);
228 :
229 1882838 : return head - tail;
230 : }
231 :
232 : static bool
233 1160128 : pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
234 : {
235 : return
236 1160128 : !IsUnderPostmaster
237 1154460 : || ioh->flags & PGAIO_HF_REFERENCES_LOCAL
238 2314588 : || !pgaio_io_can_reopen(ioh);
239 : }
240 :
/*
 * Push a batch of IOs into the shared submission queue and wake at most one
 * idle worker (further fan-out happens in the workers themselves).  Any IOs
 * that don't fit into the queue are executed synchronously here, after the
 * rest have been handed off, to maximize concurrency.
 */
static void
pgaio_worker_submit_internal(int nios, PgAioHandle *ios[])
{
	PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
	int			nsync = 0;
	Latch	   *wakeup = NULL;
	int			worker;

	Assert(nios <= PGAIO_SUBMIT_BATCH_SIZE);

	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
	for (int i = 0; i < nios; ++i)
	{
		Assert(!pgaio_worker_needs_synchronous_execution(ios[i]));
		if (!pgaio_worker_submission_queue_insert(ios[i]))
		{
			/*
			 * We'll do it synchronously, but only after we've sent as many as
			 * we can to workers, to maximize concurrency.
			 */
			synchronous_ios[nsync++] = ios[i];
			continue;
		}

		if (wakeup == NULL)
		{
			/* Choose an idle worker to wake up if we haven't already. */
			worker = pgaio_choose_idle_worker();
			if (worker >= 0)
				wakeup = io_worker_control->workers[worker].latch;

			pgaio_debug_io(DEBUG4, ios[i],
						   "choosing worker %d",
						   worker);
		}
	}
	LWLockRelease(AioWorkerSubmissionQueueLock);

	/* Set the latch only after releasing the lock, to keep the hold short. */
	if (wakeup)
		SetLatch(wakeup);

	/* Run whatever is left synchronously. */
	if (nsync > 0)
	{
		for (int i = 0; i < nsync; ++i)
		{
			pgaio_io_perform_synchronously(synchronous_ios[i]);
		}
	}
}
291 :
292 : static int
293 1151010 : pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
294 : {
295 2303166 : for (int i = 0; i < num_staged_ios; i++)
296 : {
297 1152156 : PgAioHandle *ioh = staged_ios[i];
298 :
299 1152156 : pgaio_io_prepare_submit(ioh);
300 : }
301 :
302 1151010 : pgaio_worker_submit_internal(num_staged_ios, staged_ios);
303 :
304 1151010 : return num_staged_ios;
305 : }
306 :
307 : /*
308 : * on_shmem_exit() callback that releases the worker's slot in
309 : * io_worker_control.
310 : */
311 : static void
312 3084 : pgaio_worker_die(int code, Datum arg)
313 : {
314 3084 : LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
315 : Assert(io_worker_control->workers[MyIoWorkerId].in_use);
316 : Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
317 :
318 3084 : io_worker_control->workers[MyIoWorkerId].in_use = false;
319 3084 : io_worker_control->workers[MyIoWorkerId].latch = NULL;
320 3084 : LWLockRelease(AioWorkerSubmissionQueueLock);
321 3084 : }
322 :
/*
 * Register the worker in shared memory, assign MyIoWorkerId and register a
 * shutdown callback to release registration.  ERRORs out if no free slot is
 * available.
 */
static void
pgaio_worker_register(void)
{
	MyIoWorkerId = -1;

	/*
	 * XXX: This could do with more fine-grained locking. But it's also not
	 * very common for the number of workers to change at the moment...
	 */
	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);

	/* Claim the first slot not in use by another worker. */
	for (int i = 0; i < MAX_IO_WORKERS; ++i)
	{
		if (!io_worker_control->workers[i].in_use)
		{
			Assert(io_worker_control->workers[i].latch == NULL);
			io_worker_control->workers[i].in_use = true;
			MyIoWorkerId = i;
			break;
		}
		else
			Assert(io_worker_control->workers[i].latch != NULL);
	}

	if (MyIoWorkerId == -1)
		elog(ERROR, "couldn't find a free worker slot");

	/* Start out idle, and advertise our latch so submitters can wake us. */
	io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
	io_worker_control->workers[MyIoWorkerId].latch = MyLatch;
	LWLockRelease(AioWorkerSubmissionQueueLock);

	on_shmem_exit(pgaio_worker_die, 0);
}
360 :
361 : static void
362 2254 : pgaio_worker_error_callback(void *arg)
363 : {
364 : ProcNumber owner;
365 : PGPROC *owner_proc;
366 : int32 owner_pid;
367 2254 : PgAioHandle *ioh = arg;
368 :
369 2254 : if (!ioh)
370 0 : return;
371 :
372 : Assert(ioh->owner_procno != MyProcNumber);
373 : Assert(MyBackendType == B_IO_WORKER);
374 :
375 2254 : owner = ioh->owner_procno;
376 2254 : owner_proc = GetPGProcByNumber(owner);
377 2254 : owner_pid = owner_proc->pid;
378 :
379 2254 : errcontext("I/O worker executing I/O on behalf of process %d", owner_pid);
380 : }
381 :
/*
 * Main function of an IO worker (auxiliary process).
 *
 * Loops consuming IOs from the shared submission queue and executing them
 * synchronously, waking further idle workers (fan-out) when the queue holds
 * more work.  Sleeps on its latch when the queue is empty.  Errors raised
 * while processing an IO are reported, the IO is marked as failed, and the
 * worker exits; the postmaster starts a replacement.
 */
void
IoWorkerMain(const void *startup_data, size_t startup_data_len)
{
	sigjmp_buf	local_sigjmp_buf;
	/* IO to fail if we longjmp out of processing; volatile for sigsetjmp */
	PgAioHandle *volatile error_ioh = NULL;
	ErrorContextCallback errcallback = {0};
	volatile int error_errno = 0;
	char		cmd[128];

	MyBackendType = B_IO_WORKER;
	AuxiliaryProcessMainCommon();

	pqsignal(SIGHUP, SignalHandlerForConfigReload);
	pqsignal(SIGINT, die);		/* to allow manually triggering worker restart */

	/*
	 * Ignore SIGTERM, will get explicit shutdown via SIGUSR2 later in the
	 * shutdown sequence, similar to checkpointer.
	 */
	pqsignal(SIGTERM, SIG_IGN);
	/* SIGQUIT handler was already set up by InitPostmasterChild */
	pqsignal(SIGALRM, SIG_IGN);
	pqsignal(SIGPIPE, SIG_IGN);
	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
	pqsignal(SIGUSR2, SignalHandlerForShutdownRequest);

	/* also registers a shutdown callback to unregister */
	pgaio_worker_register();

	/* show the worker id in the ps display */
	sprintf(cmd, "%d", MyIoWorkerId);
	set_ps_display(cmd);

	/* attribute errors to the IO's submitter while processing IOs */
	errcallback.callback = pgaio_worker_error_callback;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;

	/* see PostgresMain() */
	if (sigsetjmp(local_sigjmp_buf, 1) != 0)
	{
		error_context_stack = NULL;
		HOLD_INTERRUPTS();

		EmitErrorReport();

		/*
		 * In the - very unlikely - case that the IO failed in a way that
		 * raises an error we need to mark the IO as failed.
		 *
		 * Need to do just enough error recovery so that we can mark the IO as
		 * failed and then exit (postmaster will start a new worker).
		 */
		LWLockReleaseAll();

		if (error_ioh != NULL)
		{
			/* should never fail without setting error_errno */
			Assert(error_errno != 0);

			errno = error_errno;

			START_CRIT_SECTION();
			pgaio_io_process_completion(error_ioh, -error_errno);
			END_CRIT_SECTION();
		}

		proc_exit(1);
	}

	/* We can now handle ereport(ERROR) */
	PG_exception_stack = &local_sigjmp_buf;

	sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);

	while (!ShutdownRequestPending)
	{
		uint32		io_index;
		Latch	   *latches[IO_WORKER_WAKEUP_FANOUT];
		int			nlatches = 0;
		int			nwakeups = 0;
		int			worker;

		/* Try to get a job to do. */
		LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
		if ((io_index = pgaio_worker_submission_queue_consume()) == UINT32_MAX)
		{
			/*
			 * Nothing to do. Mark self idle.
			 *
			 * XXX: Invent some kind of back pressure to reduce useless
			 * wakeups?
			 */
			io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
		}
		else
		{
			/* Got one. Clear idle flag. */
			io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);

			/* See if we can wake up some peers. */
			nwakeups = Min(pgaio_worker_submission_queue_depth(),
						   IO_WORKER_WAKEUP_FANOUT);
			for (int i = 0; i < nwakeups; ++i)
			{
				if ((worker = pgaio_choose_idle_worker()) < 0)
					break;
				latches[nlatches++] = io_worker_control->workers[worker].latch;
			}
		}
		LWLockRelease(AioWorkerSubmissionQueueLock);

		/* Latches are set outside the lock to keep the hold time short. */
		for (int i = 0; i < nlatches; ++i)
			SetLatch(latches[i]);

		if (io_index != UINT32_MAX)
		{
			PgAioHandle *ioh = NULL;

			ioh = &pgaio_ctl->io_handles[io_index];
			error_ioh = ioh;
			errcallback.arg = ioh;

			pgaio_debug_io(DEBUG4, ioh,
						   "worker %d processing IO",
						   MyIoWorkerId);

			/*
			 * Prevent interrupts between pgaio_io_reopen() and
			 * pgaio_io_perform_synchronously() that otherwise could lead to
			 * the FD getting closed in that window.
			 */
			HOLD_INTERRUPTS();

			/*
			 * It's very unlikely, but possible, that reopen fails. E.g. due
			 * to memory allocations failing or file permissions changing or
			 * such. In that case we need to fail the IO.
			 *
			 * There's not really a good errno we can report here.
			 */
			error_errno = ENOENT;
			pgaio_io_reopen(ioh);

			/*
			 * To be able to exercise the reopen-fails path, allow injection
			 * points to trigger a failure at this point.
			 */
			pgaio_io_call_inj(ioh, "aio-worker-after-reopen");

			error_errno = 0;
			error_ioh = NULL;

			/*
			 * As part of IO completion the buffer will be marked as NOACCESS,
			 * until the buffer is pinned again - which never happens in io
			 * workers. Therefore the next time there is IO for the same
			 * buffer, the memory will be considered inaccessible. To avoid
			 * that, explicitly allow access to the memory before reading data
			 * into it.
			 */
#ifdef USE_VALGRIND
			{
				struct iovec *iov;
				uint16		iov_length = pgaio_io_get_iovec_length(ioh, &iov);

				for (int i = 0; i < iov_length; i++)
					VALGRIND_MAKE_MEM_UNDEFINED(iov[i].iov_base, iov[i].iov_len);
			}
#endif

			/*
			 * We don't expect this to ever fail with ERROR or FATAL, no need
			 * to keep error_ioh set to the IO.
			 * pgaio_io_perform_synchronously() contains a critical section to
			 * ensure we don't accidentally fail.
			 */
			pgaio_io_perform_synchronously(ioh);

			RESUME_INTERRUPTS();
			errcallback.arg = NULL;
		}
		else
		{
			/* Queue empty: sleep until a submitter or peer sets our latch. */
			WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
					  WAIT_EVENT_IO_WORKER_MAIN);
			ResetLatch(MyLatch);
		}

		CHECK_FOR_INTERRUPTS();
	}

	error_context_stack = errcallback.previous;
	proc_exit(0);
}
575 :
576 : bool
577 245784 : pgaio_workers_enabled(void)
578 : {
579 245784 : return io_method == IOMETHOD_WORKER;
580 : }
|