Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * method_worker.c
4 : * AIO - perform AIO using worker processes
5 : *
6 : * IO workers consume IOs from a shared memory submission queue, run
7 : * traditional synchronous system calls, and perform the shared completion
8 : * handling immediately. Client code submits most requests by pushing IOs
9 : * into the submission queue, and waits (if necessary) using condition
10 : * variables. Some IOs cannot be performed in another process due to lack of
11 : * infrastructure for reopening the file, and must be processed synchronously by
12 : * the client code when submitted.
13 : *
14 : * So that the submitter can make just one system call when submitting a batch
15 : * of IOs, wakeups "fan out"; each woken IO worker can wake two more. XXX This
16 : * could be improved by using futexes instead of latches to wake N waiters.
17 : *
18 : * This method of AIO is available in all builds on all operating systems, and
19 : * is the default.
20 : *
21 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
22 : * Portions Copyright (c) 1994, Regents of the University of California
23 : *
24 : * IDENTIFICATION
25 : * src/backend/storage/aio/method_worker.c
26 : *
27 : *-------------------------------------------------------------------------
28 : */
29 :
30 : #include "postgres.h"
31 :
32 : #include "libpq/pqsignal.h"
33 : #include "miscadmin.h"
34 : #include "port/pg_bitutils.h"
35 : #include "postmaster/auxprocess.h"
36 : #include "postmaster/interrupt.h"
37 : #include "storage/aio.h"
38 : #include "storage/aio_internal.h"
39 : #include "storage/aio_subsys.h"
40 : #include "storage/io_worker.h"
41 : #include "storage/ipc.h"
42 : #include "storage/latch.h"
43 : #include "storage/proc.h"
44 : #include "tcop/tcopprot.h"
45 : #include "utils/ps_status.h"
46 : #include "utils/wait_event.h"
47 :
48 :
49 : /* How many workers should each worker wake up if needed? */
50 : #define IO_WORKER_WAKEUP_FANOUT 2
51 :
52 :
53 : typedef struct AioWorkerSubmissionQueue
54 : {
55 : uint32 size;
56 : uint32 mask;
57 : uint32 head;
58 : uint32 tail;
59 : uint32 ios[FLEXIBLE_ARRAY_MEMBER];
60 : } AioWorkerSubmissionQueue;
61 :
62 : typedef struct AioWorkerSlot
63 : {
64 : Latch *latch;
65 : bool in_use;
66 : } AioWorkerSlot;
67 :
68 : typedef struct AioWorkerControl
69 : {
70 : uint64 idle_worker_mask;
71 : AioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
72 : } AioWorkerControl;
73 :
74 :
75 : static size_t pgaio_worker_shmem_size(void);
76 : static void pgaio_worker_shmem_init(bool first_time);
77 :
78 : static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh);
79 : static int pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);
80 :
81 :
82 : const IoMethodOps pgaio_worker_ops = {
83 : .shmem_size = pgaio_worker_shmem_size,
84 : .shmem_init = pgaio_worker_shmem_init,
85 :
86 : .needs_synchronous_execution = pgaio_worker_needs_synchronous_execution,
87 : .submit = pgaio_worker_submit,
88 : };
89 :
90 :
91 : /* GUCs */
92 : int io_workers = 3;
93 :
94 :
95 : static int io_worker_queue_size = 64;
96 : static int MyIoWorkerId;
97 : static AioWorkerSubmissionQueue *io_worker_submission_queue;
98 : static AioWorkerControl *io_worker_control;
99 :
100 :
101 : static size_t
102 5826 : pgaio_worker_queue_shmem_size(int *queue_size)
103 : {
104 : /* Round size up to next power of two so we can make a mask. */
105 5826 : *queue_size = pg_nextpower2_32(io_worker_queue_size);
106 :
107 11652 : return offsetof(AioWorkerSubmissionQueue, ios) +
108 5826 : sizeof(uint32) * *queue_size;
109 : }
110 :
111 : static size_t
112 5826 : pgaio_worker_control_shmem_size(void)
113 : {
114 5826 : return offsetof(AioWorkerControl, workers) +
115 : sizeof(AioWorkerSlot) * MAX_IO_WORKERS;
116 : }
117 :
118 : static size_t
119 3794 : pgaio_worker_shmem_size(void)
120 : {
121 : size_t sz;
122 : int queue_size;
123 :
124 3794 : sz = pgaio_worker_queue_shmem_size(&queue_size);
125 3794 : sz = add_size(sz, pgaio_worker_control_shmem_size());
126 :
127 3794 : return sz;
128 : }
129 :
130 : static void
131 2032 : pgaio_worker_shmem_init(bool first_time)
132 : {
133 : bool found;
134 : int queue_size;
135 :
136 2032 : io_worker_submission_queue =
137 2032 : ShmemInitStruct("AioWorkerSubmissionQueue",
138 : pgaio_worker_queue_shmem_size(&queue_size),
139 : &found);
140 2032 : if (!found)
141 : {
142 2032 : io_worker_submission_queue->size = queue_size;
143 2032 : io_worker_submission_queue->head = 0;
144 2032 : io_worker_submission_queue->tail = 0;
145 : }
146 :
147 2032 : io_worker_control =
148 2032 : ShmemInitStruct("AioWorkerControl",
149 : pgaio_worker_control_shmem_size(),
150 : &found);
151 2032 : if (!found)
152 : {
153 2032 : io_worker_control->idle_worker_mask = 0;
154 67056 : for (int i = 0; i < MAX_IO_WORKERS; ++i)
155 : {
156 65024 : io_worker_control->workers[i].latch = NULL;
157 65024 : io_worker_control->workers[i].in_use = false;
158 : }
159 : }
160 2032 : }
161 :
162 : static int
163 1216326 : pgaio_choose_idle_worker(void)
164 : {
165 : int worker;
166 :
167 1216326 : if (io_worker_control->idle_worker_mask == 0)
168 23500 : return -1;
169 :
170 : /* Find the lowest bit position, and clear it. */
171 1192826 : worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask);
172 1192826 : io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
173 :
174 1192826 : return worker;
175 : }
176 :
177 : static bool
178 1182350 : pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
179 : {
180 : AioWorkerSubmissionQueue *queue;
181 : uint32 new_head;
182 :
183 1182350 : queue = io_worker_submission_queue;
184 1182350 : new_head = (queue->head + 1) & (queue->size - 1);
185 1182350 : if (new_head == queue->tail)
186 : {
187 0 : pgaio_debug(DEBUG3, "io queue is full, at %u elements",
188 : io_worker_submission_queue->size);
189 0 : return false; /* full */
190 : }
191 :
192 1182350 : queue->ios[queue->head] = pgaio_io_get_id(ioh);
193 1182350 : queue->head = new_head;
194 :
195 1182350 : return true;
196 : }
197 :
198 : static uint32
199 1962930 : pgaio_worker_submission_queue_consume(void)
200 : {
201 : AioWorkerSubmissionQueue *queue;
202 : uint32 result;
203 :
204 1962930 : queue = io_worker_submission_queue;
205 1962930 : if (queue->tail == queue->head)
206 990164 : return UINT32_MAX; /* empty */
207 :
208 972766 : result = queue->ios[queue->tail];
209 972766 : queue->tail = (queue->tail + 1) & (queue->size - 1);
210 :
211 972766 : return result;
212 : }
213 :
214 : static uint32
215 1943970 : pgaio_worker_submission_queue_depth(void)
216 : {
217 : uint32 head;
218 : uint32 tail;
219 :
220 1943970 : head = io_worker_submission_queue->head;
221 1943970 : tail = io_worker_submission_queue->tail;
222 :
223 1943970 : if (tail > head)
224 996 : head += io_worker_submission_queue->size;
225 :
226 : Assert(head >= tail);
227 :
228 1943970 : return head - tail;
229 : }
230 :
231 : static bool
232 1190164 : pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
233 : {
234 : return
235 1190164 : !IsUnderPostmaster
236 1184618 : || ioh->flags & PGAIO_HF_REFERENCES_LOCAL
237 2374782 : || !pgaio_io_can_reopen(ioh);
238 : }
239 :
240 : static void
241 1181582 : pgaio_worker_submit_internal(int nios, PgAioHandle *ios[])
242 : {
243 : PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
244 1181582 : int nsync = 0;
245 1181582 : Latch *wakeup = NULL;
246 : int worker;
247 :
248 : Assert(nios <= PGAIO_SUBMIT_BATCH_SIZE);
249 :
250 1181582 : LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
251 2363932 : for (int i = 0; i < nios; ++i)
252 : {
253 : Assert(!pgaio_worker_needs_synchronous_execution(ios[i]));
254 1182350 : if (!pgaio_worker_submission_queue_insert(ios[i]))
255 : {
256 : /*
257 : * We'll do it synchronously, but only after we've sent as many as
258 : * we can to workers, to maximize concurrency.
259 : */
260 0 : synchronous_ios[nsync++] = ios[i];
261 0 : continue;
262 : }
263 :
264 1182350 : if (wakeup == NULL)
265 : {
266 : /* Choose an idle worker to wake up if we haven't already. */
267 1181602 : worker = pgaio_choose_idle_worker();
268 1181602 : if (worker >= 0)
269 1169504 : wakeup = io_worker_control->workers[worker].latch;
270 :
271 1181602 : pgaio_debug_io(DEBUG4, ios[i],
272 : "choosing worker %d",
273 : worker);
274 : }
275 : }
276 1181582 : LWLockRelease(AioWorkerSubmissionQueueLock);
277 :
278 1181582 : if (wakeup)
279 1169504 : SetLatch(wakeup);
280 :
281 : /* Run whatever is left synchronously. */
282 1181582 : if (nsync > 0)
283 : {
284 0 : for (int i = 0; i < nsync; ++i)
285 : {
286 0 : pgaio_io_perform_synchronously(synchronous_ios[i]);
287 : }
288 : }
289 1181582 : }
290 :
291 : static int
292 1181582 : pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
293 : {
294 2363932 : for (int i = 0; i < num_staged_ios; i++)
295 : {
296 1182350 : PgAioHandle *ioh = staged_ios[i];
297 :
298 1182350 : pgaio_io_prepare_submit(ioh);
299 : }
300 :
301 1181582 : pgaio_worker_submit_internal(num_staged_ios, staged_ios);
302 :
303 1181582 : return num_staged_ios;
304 : }
305 :
306 : /*
307 : * on_shmem_exit() callback that releases the worker's slot in
308 : * io_worker_control.
309 : */
310 : static void
311 2964 : pgaio_worker_die(int code, Datum arg)
312 : {
313 2964 : LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
314 : Assert(io_worker_control->workers[MyIoWorkerId].in_use);
315 : Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
316 :
317 2964 : io_worker_control->workers[MyIoWorkerId].in_use = false;
318 2964 : io_worker_control->workers[MyIoWorkerId].latch = NULL;
319 2964 : LWLockRelease(AioWorkerSubmissionQueueLock);
320 2964 : }
321 :
322 : /*
323 : * Register the worker in shared memory, assign MyWorkerId and register a
324 : * shutdown callback to release registration.
325 : */
326 : static void
327 2964 : pgaio_worker_register(void)
328 : {
329 2964 : MyIoWorkerId = -1;
330 :
331 : /*
332 : * XXX: This could do with more fine-grained locking. But it's also not
333 : * very common for the number of workers to change at the moment...
334 : */
335 2964 : LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
336 :
337 5928 : for (int i = 0; i < MAX_IO_WORKERS; ++i)
338 : {
339 5928 : if (!io_worker_control->workers[i].in_use)
340 : {
341 : Assert(io_worker_control->workers[i].latch == NULL);
342 2964 : io_worker_control->workers[i].in_use = true;
343 2964 : MyIoWorkerId = i;
344 2964 : break;
345 : }
346 : else
347 : Assert(io_worker_control->workers[i].latch != NULL);
348 : }
349 :
350 2964 : if (MyIoWorkerId == -1)
351 0 : elog(ERROR, "couldn't find a free worker slot");
352 :
353 2964 : io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
354 2964 : io_worker_control->workers[MyIoWorkerId].latch = MyLatch;
355 2964 : LWLockRelease(AioWorkerSubmissionQueueLock);
356 :
357 2964 : on_shmem_exit(pgaio_worker_die, 0);
358 2964 : }
359 :
360 : void
361 2964 : IoWorkerMain(const void *startup_data, size_t startup_data_len)
362 : {
363 : sigjmp_buf local_sigjmp_buf;
364 2964 : PgAioHandle *volatile error_ioh = NULL;
365 2964 : volatile int error_errno = 0;
366 : char cmd[128];
367 :
368 2964 : MyBackendType = B_IO_WORKER;
369 2964 : AuxiliaryProcessMainCommon();
370 :
371 2964 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
372 2964 : pqsignal(SIGINT, die); /* to allow manually triggering worker restart */
373 :
374 : /*
375 : * Ignore SIGTERM, will get explicit shutdown via SIGUSR2 later in the
376 : * shutdown sequence, similar to checkpointer.
377 : */
378 2964 : pqsignal(SIGTERM, SIG_IGN);
379 : /* SIGQUIT handler was already set up by InitPostmasterChild */
380 2964 : pqsignal(SIGALRM, SIG_IGN);
381 2964 : pqsignal(SIGPIPE, SIG_IGN);
382 2964 : pqsignal(SIGUSR1, procsignal_sigusr1_handler);
383 2964 : pqsignal(SIGUSR2, SignalHandlerForShutdownRequest);
384 :
385 : /* also registers a shutdown callback to unregister */
386 2964 : pgaio_worker_register();
387 :
388 2964 : sprintf(cmd, "%d", MyIoWorkerId);
389 2964 : set_ps_display(cmd);
390 :
391 : /* see PostgresMain() */
392 2964 : if (sigsetjmp(local_sigjmp_buf, 1) != 0)
393 : {
394 0 : error_context_stack = NULL;
395 0 : HOLD_INTERRUPTS();
396 :
397 0 : EmitErrorReport();
398 :
399 : /*
400 : * In the - very unlikely - case that the IO failed in a way that
401 : * raises an error we need to mark the IO as failed.
402 : *
403 : * Need to do just enough error recovery so that we can mark the IO as
404 : * failed and then exit (postmaster will start a new worker).
405 : */
406 0 : LWLockReleaseAll();
407 :
408 0 : if (error_ioh != NULL)
409 : {
410 : /* should never fail without setting error_errno */
411 : Assert(error_errno != 0);
412 :
413 0 : errno = error_errno;
414 :
415 0 : START_CRIT_SECTION();
416 0 : pgaio_io_process_completion(error_ioh, -error_errno);
417 0 : END_CRIT_SECTION();
418 : }
419 :
420 0 : proc_exit(1);
421 : }
422 :
423 : /* We can now handle ereport(ERROR) */
424 2964 : PG_exception_stack = &local_sigjmp_buf;
425 :
426 2964 : sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
427 :
428 1965870 : while (!ShutdownRequestPending)
429 : {
430 : uint32 io_index;
431 : Latch *latches[IO_WORKER_WAKEUP_FANOUT];
432 1962930 : int nlatches = 0;
433 1962930 : int nwakeups = 0;
434 : int worker;
435 :
436 : /* Try to get a job to do. */
437 1962930 : LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
438 1962930 : if ((io_index = pgaio_worker_submission_queue_consume()) == UINT32_MAX)
439 : {
440 : /*
441 : * Nothing to do. Mark self idle.
442 : *
443 : * XXX: Invent some kind of back pressure to reduce useless
444 : * wakeups?
445 : */
446 990164 : io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
447 : }
448 : else
449 : {
450 : /* Got one. Clear idle flag. */
451 972766 : io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
452 :
453 : /* See if we can wake up some peers. */
454 972766 : nwakeups = Min(pgaio_worker_submission_queue_depth(),
455 : IO_WORKER_WAKEUP_FANOUT);
456 996088 : for (int i = 0; i < nwakeups; ++i)
457 : {
458 34724 : if ((worker = pgaio_choose_idle_worker()) < 0)
459 11402 : break;
460 23322 : latches[nlatches++] = io_worker_control->workers[worker].latch;
461 : }
462 : }
463 1962930 : LWLockRelease(AioWorkerSubmissionQueueLock);
464 :
465 1986252 : for (int i = 0; i < nlatches; ++i)
466 23322 : SetLatch(latches[i]);
467 :
468 1962930 : if (io_index != UINT32_MAX)
469 : {
470 972766 : PgAioHandle *ioh = NULL;
471 :
472 972766 : ioh = &pgaio_ctl->io_handles[io_index];
473 972766 : error_ioh = ioh;
474 :
475 972766 : pgaio_debug_io(DEBUG4, ioh,
476 : "worker %d processing IO",
477 : MyIoWorkerId);
478 :
479 : /*
480 : * Prevent interrupts between pgaio_io_reopen() and
481 : * pgaio_io_perform_synchronously() that otherwise could lead to
482 : * the FD getting closed in that window.
483 : */
484 972766 : HOLD_INTERRUPTS();
485 :
486 : /*
487 : * It's very unlikely, but possible, that reopen fails. E.g. due
488 : * to memory allocations failing or file permissions changing or
489 : * such. In that case we need to fail the IO.
490 : *
491 : * There's not really a good errno we can report here.
492 : */
493 972766 : error_errno = ENOENT;
494 972766 : pgaio_io_reopen(ioh);
495 :
496 : /*
497 : * To be able to exercise the reopen-fails path, allow injection
498 : * points to trigger a failure at this point.
499 : */
500 972766 : pgaio_io_call_inj(ioh, "AIO_WORKER_AFTER_REOPEN");
501 :
502 972766 : error_errno = 0;
503 972766 : error_ioh = NULL;
504 :
505 : /*
506 : * We don't expect this to ever fail with ERROR or FATAL, no need
507 : * to keep error_ioh set to the IO.
508 : * pgaio_io_perform_synchronously() contains a critical section to
509 : * ensure we don't accidentally fail.
510 : */
511 972766 : pgaio_io_perform_synchronously(ioh);
512 :
513 972766 : RESUME_INTERRUPTS();
514 : }
515 : else
516 : {
517 990164 : WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
518 : WAIT_EVENT_IO_WORKER_MAIN);
519 990140 : ResetLatch(MyLatch);
520 : }
521 :
522 1962906 : CHECK_FOR_INTERRUPTS();
523 : }
524 :
525 2940 : proc_exit(0);
526 : }
527 :
528 : bool
529 257048 : pgaio_workers_enabled(void)
530 : {
531 257048 : return io_method == IOMETHOD_WORKER;
532 : }
|