Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * method_worker.c
4 : * AIO - perform AIO using worker processes
5 : *
6 : * IO workers consume IOs from a shared memory submission queue, run
7 : * traditional synchronous system calls, and perform the shared completion
8 : * handling immediately. Client code submits most requests by pushing IOs
9 : * into the submission queue, and waits (if necessary) using condition
10 : * variables. Some IOs cannot be performed in another process due to lack of
 * infrastructure for reopening the file, and must be processed synchronously by
12 : * the client code when submitted.
13 : *
14 : * The pool of workers tries to stabilize at a size that can handle recently
15 : * seen variation in demand, within the configured limits.
16 : *
17 : * This method of AIO is available in all builds on all operating systems, and
18 : * is the default.
19 : *
20 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
21 : * Portions Copyright (c) 1994, Regents of the University of California
22 : *
23 : * IDENTIFICATION
24 : * src/backend/storage/aio/method_worker.c
25 : *
26 : *-------------------------------------------------------------------------
27 : */
28 :
29 : #include "postgres.h"
30 :
31 : #include <limits.h>
32 :
33 : #include "libpq/pqsignal.h"
34 : #include "miscadmin.h"
35 : #include "port/pg_bitutils.h"
36 : #include "postmaster/auxprocess.h"
37 : #include "postmaster/interrupt.h"
38 : #include "storage/aio.h"
39 : #include "storage/aio_internal.h"
40 : #include "storage/aio_subsys.h"
41 : #include "storage/io_worker.h"
42 : #include "storage/ipc.h"
43 : #include "storage/latch.h"
44 : #include "storage/lwlock.h"
45 : #include "storage/pmsignal.h"
46 : #include "storage/proc.h"
47 : #include "storage/shmem.h"
48 : #include "tcop/tcopprot.h"
49 : #include "utils/injection_point.h"
50 : #include "utils/memdebug.h"
51 : #include "utils/ps_status.h"
52 : #include "utils/wait_event.h"
53 :
54 : /*
55 : * Saturation for counters used to estimate wakeup:IO ratio.
56 : *
57 : * We maintain hist_wakeups for wakeups received and hist_ios for IOs
58 : * processed by each worker. When either counter reaches this saturation
59 : * value, we divide both by two. The result is an exponentially decaying
60 : * ratio of wakeups to IOs, with a very short memory.
61 : *
62 : * If a worker is itself experiencing useless wakeups, it assumes that
63 : * higher-numbered workers would experience even more, so it should end the
64 : * chain.
65 : */
66 : #define PGAIO_WORKER_WAKEUP_RATIO_SATURATE 4
67 :
68 : /* Debugging support: show current IO and wakeups:ios statistics in ps. */
69 : /* #define PGAIO_WORKER_SHOW_PS_INFO */
70 :
/*
 * Ring buffer of IO handle IDs waiting to be picked up by an IO worker.
 * Accessed under AioWorkerSubmissionQueueLock (see the Asserts in the
 * queue manipulation functions below).
 */
typedef struct PgAioWorkerSubmissionQueue
{
	uint32		size;			/* slot count; always a power of two, so
								 * (size - 1) can be used as a wrap mask */
	uint32		head;			/* next slot to insert into */
	uint32		tail;			/* next slot to consume from; head == tail
								 * means empty */
	int			sqes[FLEXIBLE_ARRAY_MEMBER];	/* IO handle IDs */
} PgAioWorkerSubmissionQueue;
78 :
/* Per-worker entry in PgAioWorkerControl->workers[]. */
typedef struct PgAioWorkerSlot
{
	ProcNumber	proc_number;	/* the worker's PGPROC number, or
								 * INVALID_PROC_NUMBER if the slot is free */
} PgAioWorkerSlot;
83 :
/*
 * Sets of worker IDs are held in a simple bitmap, accessed through functions
 * that provide a more readable abstraction.  If we wanted to support more
 * workers than that, the contention on the single queue would surely get too
 * high, so we might want to consider multiple pools instead of widening this.
 */
typedef uint64 PgAioWorkerSet;

/* Width of the bitmap, i.e. the hard ceiling on representable worker IDs. */
#define PGAIO_WORKERSET_BITS (sizeof(PgAioWorkerSet) * CHAR_BIT)

static_assert(PGAIO_WORKERSET_BITS >= MAX_IO_WORKERS, "too small");
95 :
/* Shared-memory control state for the IO worker pool. */
typedef struct PgAioWorkerControl
{
	/* Seen by postmaster */
	bool		grow;			/* a new worker has been requested and not
								 * yet canceled */
	bool		grow_signal_sent;	/* PMSIGNAL_IO_WORKER_GROW already sent
									 * and not yet cleared by postmaster */

	/* Protected by AioWorkerSubmissionQueueLock. */
	PgAioWorkerSet idle_workerset;	/* workers currently waiting for work */

	/* Protected by AioWorkerControlLock. */
	PgAioWorkerSet workerset;	/* all registered workers */
	int			nworkers;		/* == popcount of workerset */

	/* Protected by AioWorkerControlLock. */
	PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
} PgAioWorkerControl;
112 :
113 :
114 : static void pgaio_worker_shmem_request(void *arg);
115 : static void pgaio_worker_shmem_init(void *arg);
116 :
117 : static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh);
118 : static int pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);
119 :
120 :
/* Entry points the AIO core uses for the worker IO method. */
const IoMethodOps pgaio_worker_ops = {
	.shmem_callbacks.request_fn = pgaio_worker_shmem_request,
	.shmem_callbacks.init_fn = pgaio_worker_shmem_init,

	.needs_synchronous_execution = pgaio_worker_needs_synchronous_execution,
	.submit = pgaio_worker_submit,
};
128 :
129 :
/* GUCs */
int			io_min_workers = 2; /* pool never shrinks below this */
int			io_max_workers = 8; /* pool never grows above this */
int			io_worker_idle_timeout = 60000; /* ms of idleness before the
											 * highest worker may exit; -1
											 * disables the timeout */
int			io_worker_launch_interval = 100;	/* ms the postmaster waits
												 * between worker launches */


/* Requested submission queue size; rounded up to a power of two at init. */
static int	io_worker_queue_size = 64;
/* This process's worker ID, or -1 if not an IO worker. */
static int	MyIoWorkerId = -1;
/* Pointers into the shared-memory structures requested below. */
static PgAioWorkerSubmissionQueue *io_worker_submission_queue;
static PgAioWorkerControl *io_worker_control;
141 :
142 :
143 : static void
144 2476 : pgaio_workerset_initialize(PgAioWorkerSet *set)
145 : {
146 2476 : *set = 0;
147 2476 : }
148 :
149 : static bool
150 705566 : pgaio_workerset_is_empty(PgAioWorkerSet *set)
151 : {
152 705566 : return *set == 0;
153 : }
154 :
155 : static PgAioWorkerSet
156 1749182 : pgaio_workerset_singleton(int worker)
157 : {
158 : Assert(worker >= 0 && worker < MAX_IO_WORKERS);
159 1749182 : return UINT64_C(1) << worker;
160 : }
161 :
162 : static void
163 1326 : pgaio_workerset_all(PgAioWorkerSet *set)
164 : {
165 1326 : *set = UINT64_MAX >> (PGAIO_WORKERSET_BITS - MAX_IO_WORKERS);
166 1326 : }
167 :
168 : static void
169 1326 : pgaio_workerset_subtract(PgAioWorkerSet *set1, const PgAioWorkerSet *set2)
170 : {
171 1326 : *set1 &= ~*set2;
172 1326 : }
173 :
174 : static void
175 535290 : pgaio_workerset_insert(PgAioWorkerSet *set, int worker)
176 : {
177 : Assert(worker >= 0 && worker < MAX_IO_WORKERS);
178 535290 : *set |= pgaio_workerset_singleton(worker);
179 535290 : }
180 :
181 : static void
182 1213892 : pgaio_workerset_remove(PgAioWorkerSet *set, int worker)
183 : {
184 : Assert(worker >= 0 && worker < MAX_IO_WORKERS);
185 1213892 : *set &= ~pgaio_workerset_singleton(worker);
186 1213892 : }
187 :
188 : static void
189 16611 : pgaio_workerset_remove_lte(PgAioWorkerSet *set, int worker)
190 : {
191 : Assert(worker >= 0 && worker < MAX_IO_WORKERS);
192 16611 : *set &= (~(PgAioWorkerSet) 0) << (worker + 1);
193 16611 : }
194 :
195 : static int
196 20904 : pgaio_workerset_get_highest(PgAioWorkerSet *set)
197 : {
198 : Assert(!pgaio_workerset_is_empty(set));
199 20904 : return pg_leftmost_one_pos64(*set);
200 : }
201 :
202 : static int
203 664827 : pgaio_workerset_get_lowest(PgAioWorkerSet *set)
204 : {
205 : Assert(!pgaio_workerset_is_empty(set));
206 664827 : return pg_rightmost_one_pos64(*set);
207 : }
208 :
209 : static int
210 2400 : pgaio_workerset_pop_lowest(PgAioWorkerSet *set)
211 : {
212 2400 : int worker = pgaio_workerset_get_lowest(set);
213 :
214 2400 : pgaio_workerset_remove(set, worker);
215 2400 : return worker;
216 : }
217 :
#ifdef USE_ASSERT_CHECKING
/* Is the given worker ID a member of the set?  (Assertions only.) */
static bool
pgaio_workerset_contains(PgAioWorkerSet *set, int worker)
{
	Assert(worker >= 0 && worker < MAX_IO_WORKERS);
	return (*set & pgaio_workerset_singleton(worker)) != 0;
}

/* Number of members in the set.  (Assertions only.) */
static int
pgaio_workerset_count(PgAioWorkerSet *set)
{
	return pg_popcount64(*set);
}
#endif
232 :
/*
 * Shmem-request callback: reserve shared memory for the submission queue
 * (with io_worker_queue_size entries, rounded up) and the worker control
 * structure (sized for MAX_IO_WORKERS slots).
 */
static void
pgaio_worker_shmem_request(void *arg)
{
	size_t		size;
	int			queue_size;

	/* Round size up to next power of two so we can make a mask. */
	queue_size = pg_nextpower2_32(io_worker_queue_size);

	size = offsetof(PgAioWorkerSubmissionQueue, sqes) + sizeof(int) * queue_size;
	ShmemRequestStruct(.name = "AioWorkerSubmissionQueue",
					   .size = size,
					   .ptr = (void **) &io_worker_submission_queue,
		);

	size = offsetof(PgAioWorkerControl, workers) + sizeof(PgAioWorkerSlot) * MAX_IO_WORKERS;
	ShmemRequestStruct(.name = "AioWorkerControl",
					   .size = size,
					   .ptr = (void **) &io_worker_control,
		);
}
254 :
255 : static void
256 1238 : pgaio_worker_shmem_init(void *arg)
257 : {
258 : int queue_size;
259 :
260 : /* Round size up like in pgaio_worker_shmem_request() */
261 1238 : queue_size = pg_nextpower2_32(io_worker_queue_size);
262 :
263 1238 : io_worker_submission_queue->size = queue_size;
264 1238 : io_worker_submission_queue->head = 0;
265 1238 : io_worker_submission_queue->tail = 0;
266 1238 : io_worker_control->grow = false;
267 1238 : pgaio_workerset_initialize(&io_worker_control->workerset);
268 1238 : pgaio_workerset_initialize(&io_worker_control->idle_workerset);
269 :
270 40854 : for (int i = 0; i < MAX_IO_WORKERS; ++i)
271 39616 : io_worker_control->workers[i].proc_number = INVALID_PROC_NUMBER;
272 1238 : }
273 :
/*
 * Tell postmaster that we think a new worker is needed.
 *
 * Cheap to call repeatedly: it returns early when the pool already looks
 * full, when a request is already pending, or when the postmaster has
 * already been signaled.  Barriers order the flag writes before the signal.
 */
static void
pgaio_worker_request_grow(void)
{
	/*
	 * Suppress useless signaling if we already know that we're at the
	 * maximum.  This uses an unlocked read of nworkers, but that's OK for
	 * this heuristic purpose.
	 */
	if (io_worker_control->nworkers >= io_max_workers)
		return;

	/* Already requested? */
	if (io_worker_control->grow)
		return;

	io_worker_control->grow = true;
	pg_memory_barrier();

	/*
	 * If the postmaster has already been signaled, don't do it again until
	 * the postmaster clears this flag.  There is no point in repeated signals
	 * if grow is being set and cleared repeatedly while the postmaster is
	 * waiting for io_worker_launch_interval, which it applies even to
	 * canceled requests.
	 */
	if (io_worker_control->grow_signal_sent)
		return;

	io_worker_control->grow_signal_sent = true;
	pg_memory_barrier();
	SendPostmasterSignal(PMSIGNAL_IO_WORKER_GROW);
}
309 :
310 : /*
311 : * Cancel any request for a new worker, after observing an empty queue.
312 : */
313 : static void
314 533964 : pgaio_worker_cancel_grow(void)
315 : {
316 533964 : if (!io_worker_control->grow)
317 533929 : return;
318 :
319 35 : io_worker_control->grow = false;
320 35 : pg_memory_barrier();
321 : }
322 :
323 : /*
324 : * Called by the postmaster to check if a new worker has been requested (but
325 : * possibly canceled since).
326 : */
327 : bool
328 305127 : pgaio_worker_pm_test_grow_signal_sent(void)
329 : {
330 305127 : pg_memory_barrier();
331 305127 : return io_worker_control && io_worker_control->grow_signal_sent;
332 : }
333 :
334 : /*
335 : * Called by the postmaster to check if a new worker has been requested and
336 : * not canceled since.
337 : */
338 : bool
339 49 : pgaio_worker_pm_test_grow(void)
340 : {
341 49 : pg_memory_barrier();
342 49 : return io_worker_control && io_worker_control->grow;
343 : }
344 :
345 : /*
346 : * Called by the postmaster to clear the request for a new worker.
347 : */
348 : void
349 36 : pgaio_worker_pm_clear_grow_signal_sent(void)
350 : {
351 36 : if (!io_worker_control)
352 0 : return;
353 :
354 36 : io_worker_control->grow = false;
355 36 : io_worker_control->grow_signal_sent = false;
356 36 : pg_memory_barrier();
357 : }
358 :
359 : static int
360 699188 : pgaio_worker_choose_idle(int only_workers_above)
361 : {
362 : PgAioWorkerSet workerset;
363 : int worker;
364 :
365 : Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
366 :
367 699188 : workerset = io_worker_control->idle_workerset;
368 699188 : if (only_workers_above >= 0)
369 16611 : pgaio_workerset_remove_lte(&workerset, only_workers_above);
370 699188 : if (pgaio_workerset_is_empty(&workerset))
371 38087 : return -1;
372 :
373 : /* Find the lowest numbered idle worker and mark it not idle. */
374 661101 : worker = pgaio_workerset_get_lowest(&workerset);
375 661101 : pgaio_workerset_remove(&io_worker_control->idle_workerset, worker);
376 :
377 661101 : return worker;
378 : }
379 :
380 : /*
381 : * Try to wake a worker by setting its latch, to tell it there are IOs to
382 : * process in the submission queue.
383 : */
384 : static void
385 663501 : pgaio_worker_wake(int worker)
386 : {
387 : ProcNumber proc_number;
388 :
389 : /*
390 : * If the selected worker is concurrently exiting, then pgaio_worker_die()
391 : * had not yet removed it as of when we saw it in idle_workerset. That's
392 : * OK, because it will wake all remaining workers to close wakeup-vs-exit
393 : * races: *someone* will see the queued IO. If there are no workers
394 : * running, the postmaster will start a new one.
395 : */
396 663501 : proc_number = io_worker_control->workers[worker].proc_number;
397 663501 : if (proc_number != INVALID_PROC_NUMBER)
398 663499 : SetLatch(&GetPGProcByNumber(proc_number)->procLatch);
399 663501 : }
400 :
401 : /*
402 : * Try to wake a set of workers. Used on pool change, to close races
403 : * described in the callers.
404 : */
405 : static void
406 2652 : pgaio_workerset_wake(PgAioWorkerSet workerset)
407 : {
408 5052 : while (!pgaio_workerset_is_empty(&workerset))
409 2400 : pgaio_worker_wake(pgaio_workerset_pop_lowest(&workerset));
410 2652 : }
411 :
412 : static bool
413 682770 : pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
414 : {
415 : PgAioWorkerSubmissionQueue *queue;
416 : uint32 new_head;
417 :
418 : Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
419 :
420 682770 : queue = io_worker_submission_queue;
421 682770 : new_head = (queue->head + 1) & (queue->size - 1);
422 682770 : if (new_head == queue->tail)
423 : {
424 0 : pgaio_debug(DEBUG3, "io queue is full, at %u elements",
425 : io_worker_submission_queue->size);
426 0 : return false; /* full */
427 : }
428 :
429 682770 : queue->sqes[queue->head] = pgaio_io_get_id(ioh);
430 682770 : queue->head = new_head;
431 :
432 682770 : return true;
433 : }
434 :
435 : static int
436 1081703 : pgaio_worker_submission_queue_consume(void)
437 : {
438 : PgAioWorkerSubmissionQueue *queue;
439 : int result;
440 :
441 : Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
442 :
443 1081703 : queue = io_worker_submission_queue;
444 1081703 : if (queue->tail == queue->head)
445 533964 : return -1; /* empty */
446 :
447 547739 : result = queue->sqes[queue->tail];
448 547739 : queue->tail = (queue->tail + 1) & (queue->size - 1);
449 :
450 547739 : return result;
451 : }
452 :
453 : static uint32
454 330212 : pgaio_worker_submission_queue_depth(void)
455 : {
456 : uint32 head;
457 : uint32 tail;
458 :
459 : Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
460 :
461 330212 : head = io_worker_submission_queue->head;
462 330212 : tail = io_worker_submission_queue->tail;
463 :
464 330212 : if (tail > head)
465 295 : head += io_worker_submission_queue->size;
466 :
467 : Assert(head >= tail);
468 :
469 330212 : return head - tail;
470 : }
471 :
472 : static bool
473 690144 : pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
474 : {
475 : return
476 690144 : !IsUnderPostmaster
477 686419 : || ioh->flags & PGAIO_HF_REFERENCES_LOCAL
478 1376563 : || !pgaio_io_can_reopen(ioh);
479 : }
480 :
/*
 * IO method submit callback: hand a batch of staged IOs to the worker pool.
 *
 * IOs are pushed into the shared submission queue and one idle worker is
 * woken to start consuming them; that worker wakes further peers as needed.
 * If the queue lock cannot be acquired immediately, or the queue overflows,
 * the remaining IOs are executed synchronously in this backend instead, so
 * the return value is always num_staged_ios.
 */
static int
pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
{
	PgAioHandle **synchronous_ios = NULL;
	int			nsync = 0;
	int			worker = -1;

	Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);

	for (int i = 0; i < num_staged_ios; i++)
		pgaio_io_prepare_submit(staged_ios[i]);

	/* Conditional acquire: never block submitters behind the queue lock. */
	if (LWLockConditionalAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE))
	{
		for (int i = 0; i < num_staged_ios; ++i)
		{
			Assert(!pgaio_worker_needs_synchronous_execution(staged_ios[i]));
			if (!pgaio_worker_submission_queue_insert(staged_ios[i]))
			{
				/*
				 * The queue is full, so give up and do the rest
				 * synchronously.  We're holding an exclusive lock on the
				 * queue, so nothing can consume entries to make space.
				 */
				synchronous_ios = &staged_ios[i];
				nsync = (num_staged_ios - i);

				break;
			}
		}
		/* Choose one worker to wake for this batch. */
		worker = pgaio_worker_choose_idle(-1);
		LWLockRelease(AioWorkerSubmissionQueueLock);

		/* Wake up chosen worker.  It will wake peers if necessary. */
		if (worker != -1)
			pgaio_worker_wake(worker);
	}
	else
	{
		/* do everything synchronously, no wakeup needed */
		synchronous_ios = staged_ios;
		nsync = num_staged_ios;
	}

	/* Run whatever is left synchronously. */
	if (nsync > 0)
	{
		for (int i = 0; i < nsync; ++i)
		{
			pgaio_io_perform_synchronously(synchronous_ios[i]);
		}
	}

	return num_staged_ios;
}
537 :
/*
 * on_shmem_exit() callback that releases the worker's slot in
 * io_worker_control.
 */
static void
pgaio_worker_die(int code, Datum arg)
{
	PgAioWorkerSet notify_set;

	/* First make sure no submitter can still pick us as a wakeup target. */
	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
	pgaio_workerset_remove(&io_worker_control->idle_workerset, MyIoWorkerId);
	LWLockRelease(AioWorkerSubmissionQueueLock);

	/* Then release our worker ID and shrink the registered pool. */
	LWLockAcquire(AioWorkerControlLock, LW_EXCLUSIVE);
	Assert(io_worker_control->workers[MyIoWorkerId].proc_number == MyProcNumber);
	io_worker_control->workers[MyIoWorkerId].proc_number = INVALID_PROC_NUMBER;
	Assert(pgaio_workerset_contains(&io_worker_control->workerset, MyIoWorkerId));
	pgaio_workerset_remove(&io_worker_control->workerset, MyIoWorkerId);
	notify_set = io_worker_control->workerset;
	Assert(io_worker_control->nworkers > 0);
	io_worker_control->nworkers--;
	Assert(pgaio_workerset_count(&io_worker_control->workerset) ==
		   io_worker_control->nworkers);
	LWLockRelease(AioWorkerControlLock);

	/*
	 * Notify other workers on pool change.  This allows the new highest
	 * worker to know that it is now the one that can time out, and closes a
	 * wakeup-loss race described in pgaio_worker_wake().
	 */
	pgaio_workerset_wake(notify_set);
}
570 :
/*
 * Register the worker in shared memory, assign MyIoWorkerId and register a
 * shutdown callback to release registration.
 */
static void
pgaio_worker_register(void)
{
	PgAioWorkerSet free_workerset;
	PgAioWorkerSet old_workerset;

	MyIoWorkerId = -1;

	LWLockAcquire(AioWorkerControlLock, LW_EXCLUSIVE);
	/* Find lowest unused worker ID. */
	pgaio_workerset_all(&free_workerset);
	pgaio_workerset_subtract(&free_workerset, &io_worker_control->workerset);
	if (!pgaio_workerset_is_empty(&free_workerset))
		MyIoWorkerId = pgaio_workerset_get_lowest(&free_workerset);
	if (MyIoWorkerId == -1)
		elog(ERROR, "couldn't find a free worker ID");

	/* Claim the slot and add ourselves to the registered pool. */
	Assert(io_worker_control->workers[MyIoWorkerId].proc_number ==
		   INVALID_PROC_NUMBER);
	io_worker_control->workers[MyIoWorkerId].proc_number = MyProcNumber;

	old_workerset = io_worker_control->workerset;
	Assert(!pgaio_workerset_contains(&old_workerset, MyIoWorkerId));
	pgaio_workerset_insert(&io_worker_control->workerset, MyIoWorkerId);
	io_worker_control->nworkers++;
	Assert(io_worker_control->nworkers <= MAX_IO_WORKERS);
	Assert(pgaio_workerset_count(&io_worker_control->workerset) ==
		   io_worker_control->nworkers);
	LWLockRelease(AioWorkerControlLock);

	/*
	 * Notify other workers on pool change.  If we were the highest worker,
	 * this allows the new highest worker to know that it can time out.
	 */
	pgaio_workerset_wake(old_workerset);

	on_shmem_exit(pgaio_worker_die, 0);
}
613 :
614 : static void
615 1634 : pgaio_worker_error_callback(void *arg)
616 : {
617 : ProcNumber owner;
618 : PGPROC *owner_proc;
619 : int32 owner_pid;
620 1634 : PgAioHandle *ioh = arg;
621 :
622 1634 : if (!ioh)
623 0 : return;
624 :
625 : Assert(ioh->owner_procno != MyProcNumber);
626 : Assert(MyBackendType == B_IO_WORKER);
627 :
628 1634 : owner = ioh->owner_procno;
629 1634 : owner_proc = GetPGProcByNumber(owner);
630 1634 : owner_pid = owner_proc->pid;
631 :
632 1634 : errcontext("I/O worker executing I/O on behalf of process %d", owner_pid);
633 : }
634 :
/*
 * Check if this backend is allowed to time out, and thus should use a
 * non-infinite sleep time.  Only the highest-numbered worker is allowed to
 * time out, and only if the pool is above io_min_workers.  Serializing
 * timeouts keeps IDs in a range 0..N without gaps, and avoids undershooting
 * io_min_workers.
 *
 * The result is only instantaneously true and may be temporarily inconsistent
 * in different workers around transitions, but all workers are woken up on
 * pool size or GUC changes making the result eventually consistent.
 */
static bool
pgaio_worker_can_timeout(void)
{
	PgAioWorkerSet workerset;

	/* Workers below the configured minimum never exit due to idleness. */
	if (MyIoWorkerId < io_min_workers)
		return false;

	/* Serialize against pool size changes. */
	LWLockAcquire(AioWorkerControlLock, LW_SHARED);
	workerset = io_worker_control->workerset;
	LWLockRelease(AioWorkerControlLock);

	/* Only the currently highest-numbered worker may time out. */
	if (MyIoWorkerId != pgaio_workerset_get_highest(&workerset))
		return false;

	return true;
}
664 :
/*
 * Main entry point of an IO worker process.
 *
 * After signal setup and pool registration, loops consuming IOs from the
 * shared submission queue and executing them synchronously on behalf of the
 * submitting backends.  When the queue is empty the worker marks itself idle
 * and sleeps on its latch; only the highest-numbered worker uses a finite
 * sleep, allowing it to exit after io_worker_idle_timeout and shrink the
 * pool towards io_min_workers.  Errors raised while executing an IO are
 * caught via sigsetjmp, the IO is failed, and the process exits (the
 * postmaster starts a replacement).
 */
void
IoWorkerMain(const void *startup_data, size_t startup_data_len)
{
	sigjmp_buf	local_sigjmp_buf;
	TimestampTz idle_timeout_abs = 0;	/* 0 means "no deadline computed" */
	int			timeout_guc_used = 0;	/* GUC value idle_timeout_abs is
										 * based on */
	PgAioHandle *volatile error_ioh = NULL; /* IO to fail in error path */
	ErrorContextCallback errcallback = {0};
	volatile int error_errno = 0;
	char		cmd[128];
	int			hist_ios = 0;	/* decaying count of IOs processed */
	int			hist_wakeups = 0;	/* decaying count of latch wakeups */

	AuxiliaryProcessMainCommon();

	pqsignal(SIGHUP, SignalHandlerForConfigReload);
	pqsignal(SIGINT, die);		/* to allow manually triggering worker restart */

	/*
	 * Ignore SIGTERM, will get explicit shutdown via SIGUSR2 later in the
	 * shutdown sequence, similar to checkpointer.
	 */
	pqsignal(SIGTERM, PG_SIG_IGN);
	/* SIGQUIT handler was already set up by InitPostmasterChild */
	pqsignal(SIGALRM, PG_SIG_IGN);
	pqsignal(SIGPIPE, PG_SIG_IGN);
	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
	pqsignal(SIGUSR2, SignalHandlerForShutdownRequest);

	/* also registers a shutdown callback to unregister */
	pgaio_worker_register();

	sprintf(cmd, "%d", MyIoWorkerId);
	set_ps_display(cmd);

	errcallback.callback = pgaio_worker_error_callback;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;

	/* see PostgresMain() */
	if (sigsetjmp(local_sigjmp_buf, 1) != 0)
	{
		error_context_stack = NULL;
		HOLD_INTERRUPTS();

		EmitErrorReport();

		/*
		 * In the - very unlikely - case that the IO failed in a way that
		 * raises an error we need to mark the IO as failed.
		 *
		 * Need to do just enough error recovery so that we can mark the IO as
		 * failed and then exit (postmaster will start a new worker).
		 */
		LWLockReleaseAll();

		if (error_ioh != NULL)
		{
			/* should never fail without setting error_errno */
			Assert(error_errno != 0);

			errno = error_errno;

			START_CRIT_SECTION();
			pgaio_io_process_completion(error_ioh, -error_errno);
			END_CRIT_SECTION();
		}

		proc_exit(1);
	}

	/* We can now handle ereport(ERROR) */
	PG_exception_stack = &local_sigjmp_buf;

	sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);

	while (!ShutdownRequestPending)
	{
		uint32		io_index;
		int			worker = -1;
		int			queue_depth = 0;
		bool		maybe_grow = false;

		/*
		 * Try to get a job to do.
		 *
		 * The lwlock acquisition also provides the necessary memory barrier
		 * to ensure that we don't see an outdated data in the handle.
		 */
		LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
		/* NB: deliberately compares uint32 against -1 via usual conversion */
		if ((io_index = pgaio_worker_submission_queue_consume()) == -1)
		{
			/* Nothing to do.  Mark self idle. */
			pgaio_workerset_insert(&io_worker_control->idle_workerset,
								   MyIoWorkerId);
		}
		else
		{
			/* Got one.  Clear idle flag. */
			pgaio_workerset_remove(&io_worker_control->idle_workerset,
								   MyIoWorkerId);

			/*
			 * See if we should wake up a higher numbered peer.  Only do that
			 * if this worker is not receiving spurious wakeups itself.  The
			 * intention is create a frontier beyond which idle workers stay
			 * asleep.
			 *
			 * This heuristic tries to discover the useful wakeup propagation
			 * chain length when IOs are very fast and workers wake up to find
			 * that all IOs have already been taken.
			 *
			 * If we chose not to wake a worker when we ideally should have,
			 * then the ratio will soon change to correct that.
			 */
			if (hist_wakeups <= hist_ios)
			{
				queue_depth = pgaio_worker_submission_queue_depth();
				if (queue_depth > 0)
				{
					/* Choose a worker higher than me to wake. */
					worker = pgaio_worker_choose_idle(MyIoWorkerId);
					if (worker == -1)
						maybe_grow = true;
				}
			}
		}
		LWLockRelease(AioWorkerSubmissionQueueLock);

		/* Propagate wakeups. */
		if (worker != -1)
		{
			pgaio_worker_wake(worker);
		}
		else if (maybe_grow)
		{
			/*
			 * We know there was at least one more item in the queue, and we
			 * failed to find a higher-numbered idle worker to wake.  Now we
			 * decide if we should try to start one more worker.
			 *
			 * We do this with a simple heuristic: is the queue depth greater
			 * than the current number of workers?
			 *
			 * Consider the following situations:
			 *
			 * 1. The queue depth is constantly increasing, because IOs are
			 * arriving faster than they can possibly be serviced.  It doesn't
			 * matter much which threshold we choose, as we will surely hit
			 * it.  Crossing the current worker count is a useful signal
			 * because it's clearly too deep to avoid queuing latency already,
			 * but still leaves a small window of opportunity to improve the
			 * situation before the queue overflows.
			 *
			 * 2. The worker pool is keeping up, no latency is being
			 * introduced and an extra worker would be a waste of resources.
			 * Queue depth distributions tend to be heavily skewed, with long
			 * tails of low probability spikes (due to submission clustering,
			 * scheduling, jitter, stalls, noisy neighbors, etc).  We want a
			 * number that is very unlikely to be triggered by an outlier, and
			 * we bet that an exponential or similar distribution whose
			 * outliers never reach this threshold must be almost entirely
			 * concentrated at the low end.  If we do see a spike as big as
			 * the worker count, we take it as a signal that the distribution
			 * is surely too wide.
			 *
			 * On its own, this is an extremely crude signal.  When combined
			 * with the wakeup propagation test that precedes it (but on its
			 * own tends to overshoot) and io_worker_launch_interval, the
			 * result is that we gradually test each pool size until we find
			 * one that doesn't trigger further expansion, and then hold it
			 * for at least io_worker_idle_timeout.
			 *
			 * XXX Perhaps ideas from queueing theory or control theory could
			 * do a better job of this.
			 */

			/* Read nworkers without lock for this heuristic purpose. */
			if (queue_depth > io_worker_control->nworkers)
				pgaio_worker_request_grow();
		}

		if (io_index != -1)
		{
			PgAioHandle *ioh = NULL;

			/* Cancel timeout and update wakeup:work ratio. */
			idle_timeout_abs = 0;
			if (++hist_ios == PGAIO_WORKER_WAKEUP_RATIO_SATURATE)
			{
				hist_wakeups /= 2;
				hist_ios /= 2;
			}

			ioh = &pgaio_ctl->io_handles[io_index];
			error_ioh = ioh;
			errcallback.arg = ioh;

			pgaio_debug_io(DEBUG4, ioh,
						   "worker %d processing IO",
						   MyIoWorkerId);

			/*
			 * Prevent interrupts between pgaio_io_reopen() and
			 * pgaio_io_perform_synchronously() that otherwise could lead to
			 * the FD getting closed in that window.
			 */
			HOLD_INTERRUPTS();

			/*
			 * It's very unlikely, but possible, that reopen fails.  E.g. due
			 * to memory allocations failing or file permissions changing or
			 * such.  In that case we need to fail the IO.
			 *
			 * There's not really a good errno we can report here.
			 */
			error_errno = ENOENT;
			pgaio_io_reopen(ioh);

			/*
			 * To be able to exercise the reopen-fails path, allow injection
			 * points to trigger a failure at this point.
			 */
			INJECTION_POINT("aio-worker-after-reopen", ioh);

			error_errno = 0;
			error_ioh = NULL;

			/*
			 * As part of IO completion the buffer will be marked as NOACCESS,
			 * until the buffer is pinned again - which never happens in io
			 * workers.  Therefore the next time there is IO for the same
			 * buffer, the memory will be considered inaccessible.  To avoid
			 * that, explicitly allow access to the memory before reading data
			 * into it.
			 */
#ifdef USE_VALGRIND
			{
				struct iovec *iov;
				uint16		iov_length = pgaio_io_get_iovec_length(ioh, &iov);

				for (int i = 0; i < iov_length; i++)
					VALGRIND_MAKE_MEM_UNDEFINED(iov[i].iov_base, iov[i].iov_len);
			}
#endif

#ifdef PGAIO_WORKER_SHOW_PS_INFO
			{
				char	   *description = pgaio_io_get_target_description(ioh);

				sprintf(cmd, "%d: [%s] %s",
						MyIoWorkerId,
						pgaio_io_get_op_name(ioh),
						description);
				pfree(description);
				set_ps_display(cmd);
			}
#endif

			/*
			 * We don't expect this to ever fail with ERROR or FATAL, no need
			 * to keep error_ioh set to the IO.
			 * pgaio_io_perform_synchronously() contains a critical section to
			 * ensure we don't accidentally fail.
			 */
			pgaio_io_perform_synchronously(ioh);

			RESUME_INTERRUPTS();
			errcallback.arg = NULL;
		}
		else
		{
			int			timeout_ms;

			/* Cancel new worker request if pending. */
			pgaio_worker_cancel_grow();

			/* Compute the remaining allowed idle time. */
			if (io_worker_idle_timeout == -1)
			{
				/* Never time out. */
				timeout_ms = -1;
			}
			else
			{
				TimestampTz now = GetCurrentTimestamp();

				/* If the GUC changes, reset timer. */
				if (idle_timeout_abs != 0 &&
					io_worker_idle_timeout != timeout_guc_used)
					idle_timeout_abs = 0;

				/* Only the highest-numbered worker can time out. */
				if (pgaio_worker_can_timeout())
				{
					if (idle_timeout_abs == 0)
					{
						/*
						 * I have just been promoted to the timeout worker, or
						 * the GUC changed.  Compute new absolute time from
						 * now.
						 */
						idle_timeout_abs =
							TimestampTzPlusMilliseconds(now,
														io_worker_idle_timeout);
						timeout_guc_used = io_worker_idle_timeout;
					}
					timeout_ms =
						TimestampDifferenceMilliseconds(now, idle_timeout_abs);
				}
				else
				{
					/* No timeout for me. */
					idle_timeout_abs = 0;
					timeout_ms = -1;
				}
			}

#ifdef PGAIO_WORKER_SHOW_PS_INFO
			sprintf(cmd, "%d: idle, wakeups:ios = %d:%d",
					MyIoWorkerId, hist_wakeups, hist_ios);
			set_ps_display(cmd);
#endif

			if (WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT,
						  timeout_ms,
						  WAIT_EVENT_IO_WORKER_MAIN) == WL_TIMEOUT)
			{
				/* WL_TIMEOUT */
				if (pgaio_worker_can_timeout())
					if (GetCurrentTimestamp() >= idle_timeout_abs)
						break;
			}
			else
			{
				/* WL_LATCH_SET */
				if (++hist_wakeups == PGAIO_WORKER_WAKEUP_RATIO_SATURATE)
				{
					hist_wakeups /= 2;
					hist_ios /= 2;
				}
			}
			ResetLatch(MyLatch);
		}

		CHECK_FOR_INTERRUPTS();

		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);

			/* If io_max_workers has been decreased, exit highest first. */
			if (MyIoWorkerId >= io_max_workers)
				break;
		}
	}

	error_context_stack = errcallback.previous;
	proc_exit(0);
}
1026 :
1027 : bool
1028 314975 : pgaio_workers_enabled(void)
1029 : {
1030 314975 : return io_method == IOMETHOD_WORKER;
1031 : }
|