LCOV - code coverage report
Current view: top level - src/backend/storage/aio - method_worker.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 149 166 89.8 %
Date: 2025-04-01 15:15:16 Functions: 15 15 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * method_worker.c
       4             :  *    AIO - perform AIO using worker processes
       5             :  *
       6             :  * IO workers consume IOs from a shared memory submission queue, run
       7             :  * traditional synchronous system calls, and perform the shared completion
       8             :  * handling immediately.  Client code submits most requests by pushing IOs
       9             :  * into the submission queue, and waits (if necessary) using condition
      10             :  * variables.  Some IOs cannot be performed in another process due to lack of
      11             :  * infrastructure for reopening the file, and must be processed synchronously by
      12             :  * the client code when submitted.
      13             :  *
      14             :  * So that the submitter can make just one system call when submitting a batch
      15             :  * of IOs, wakeups "fan out"; each woken IO worker can wake two more. XXX This
      16             :  * could be improved by using futexes instead of latches to wake N waiters.
      17             :  *
      18             :  * This method of AIO is available in all builds on all operating systems, and
      19             :  * is the default.
      20             :  *
      21             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
      22             :  * Portions Copyright (c) 1994, Regents of the University of California
      23             :  *
      24             :  * IDENTIFICATION
      25             :  *    src/backend/storage/aio/method_worker.c
      26             :  *
      27             :  *-------------------------------------------------------------------------
      28             :  */
      29             : 
      30             : #include "postgres.h"
      31             : 
      32             : #include "libpq/pqsignal.h"
      33             : #include "miscadmin.h"
      34             : #include "port/pg_bitutils.h"
      35             : #include "postmaster/auxprocess.h"
      36             : #include "postmaster/interrupt.h"
      37             : #include "storage/aio.h"
      38             : #include "storage/aio_internal.h"
      39             : #include "storage/aio_subsys.h"
      40             : #include "storage/io_worker.h"
      41             : #include "storage/ipc.h"
      42             : #include "storage/latch.h"
      43             : #include "storage/proc.h"
      44             : #include "tcop/tcopprot.h"
      45             : #include "utils/ps_status.h"
      46             : #include "utils/wait_event.h"
      47             : 
      48             : 
      49             : /* How many workers should each worker wake up if needed? */
      50             : #define IO_WORKER_WAKEUP_FANOUT 2
      51             : 
      52             : 
      53             : typedef struct AioWorkerSubmissionQueue
      54             : {
      55             :     uint32      size;
      56             :     uint32      mask;
      57             :     uint32      head;
      58             :     uint32      tail;
      59             :     uint32      ios[FLEXIBLE_ARRAY_MEMBER];
      60             : } AioWorkerSubmissionQueue;
      61             : 
      62             : typedef struct AioWorkerSlot
      63             : {
      64             :     Latch      *latch;
      65             :     bool        in_use;
      66             : } AioWorkerSlot;
      67             : 
      68             : typedef struct AioWorkerControl
      69             : {
      70             :     uint64      idle_worker_mask;
      71             :     AioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
      72             : } AioWorkerControl;
      73             : 
      74             : 
      75             : static size_t pgaio_worker_shmem_size(void);
      76             : static void pgaio_worker_shmem_init(bool first_time);
      77             : 
      78             : static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh);
      79             : static int  pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);
      80             : 
      81             : 
      82             : const IoMethodOps pgaio_worker_ops = {
      83             :     .shmem_size = pgaio_worker_shmem_size,
      84             :     .shmem_init = pgaio_worker_shmem_init,
      85             : 
      86             :     .needs_synchronous_execution = pgaio_worker_needs_synchronous_execution,
      87             :     .submit = pgaio_worker_submit,
      88             : };
      89             : 
      90             : 
      91             : /* GUCs */
      92             : int         io_workers = 3;
      93             : 
      94             : 
      95             : static int  io_worker_queue_size = 64;
      96             : static int  MyIoWorkerId;
      97             : static AioWorkerSubmissionQueue *io_worker_submission_queue;
      98             : static AioWorkerControl *io_worker_control;
      99             : 
     100             : 
     101             : static size_t
     102        5826 : pgaio_worker_queue_shmem_size(int *queue_size)
     103             : {
     104             :     /* Round size up to next power of two so we can make a mask. */
     105        5826 :     *queue_size = pg_nextpower2_32(io_worker_queue_size);
     106             : 
     107       11652 :     return offsetof(AioWorkerSubmissionQueue, ios) +
     108        5826 :         sizeof(uint32) * *queue_size;
     109             : }
     110             : 
     111             : static size_t
     112        5826 : pgaio_worker_control_shmem_size(void)
     113             : {
     114        5826 :     return offsetof(AioWorkerControl, workers) +
     115             :         sizeof(AioWorkerSlot) * MAX_IO_WORKERS;
     116             : }
     117             : 
     118             : static size_t
     119        3794 : pgaio_worker_shmem_size(void)
     120             : {
     121             :     size_t      sz;
     122             :     int         queue_size;
     123             : 
     124        3794 :     sz = pgaio_worker_queue_shmem_size(&queue_size);
     125        3794 :     sz = add_size(sz, pgaio_worker_control_shmem_size());
     126             : 
     127        3794 :     return sz;
     128             : }
     129             : 
     130             : static void
     131        2032 : pgaio_worker_shmem_init(bool first_time)
     132             : {
     133             :     bool        found;
     134             :     int         queue_size;
     135             : 
     136        2032 :     io_worker_submission_queue =
     137        2032 :         ShmemInitStruct("AioWorkerSubmissionQueue",
     138             :                         pgaio_worker_queue_shmem_size(&queue_size),
     139             :                         &found);
     140        2032 :     if (!found)
     141             :     {
     142        2032 :         io_worker_submission_queue->size = queue_size;
     143        2032 :         io_worker_submission_queue->head = 0;
     144        2032 :         io_worker_submission_queue->tail = 0;
     145             :     }
     146             : 
     147        2032 :     io_worker_control =
     148        2032 :         ShmemInitStruct("AioWorkerControl",
     149             :                         pgaio_worker_control_shmem_size(),
     150             :                         &found);
     151        2032 :     if (!found)
     152             :     {
     153        2032 :         io_worker_control->idle_worker_mask = 0;
     154       67056 :         for (int i = 0; i < MAX_IO_WORKERS; ++i)
     155             :         {
     156       65024 :             io_worker_control->workers[i].latch = NULL;
     157       65024 :             io_worker_control->workers[i].in_use = false;
     158             :         }
     159             :     }
     160        2032 : }
     161             : 
     162             : static int
     163     1216326 : pgaio_choose_idle_worker(void)
     164             : {
     165             :     int         worker;
     166             : 
     167     1216326 :     if (io_worker_control->idle_worker_mask == 0)
     168       23500 :         return -1;
     169             : 
     170             :     /* Find the lowest bit position, and clear it. */
     171     1192826 :     worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask);
     172     1192826 :     io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
     173             : 
     174     1192826 :     return worker;
     175             : }
     176             : 
     177             : static bool
     178     1182350 : pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
     179             : {
     180             :     AioWorkerSubmissionQueue *queue;
     181             :     uint32      new_head;
     182             : 
     183     1182350 :     queue = io_worker_submission_queue;
     184     1182350 :     new_head = (queue->head + 1) & (queue->size - 1);
     185     1182350 :     if (new_head == queue->tail)
     186             :     {
     187           0 :         pgaio_debug(DEBUG3, "io queue is full, at %u elements",
     188             :                     io_worker_submission_queue->size);
     189           0 :         return false;           /* full */
     190             :     }
     191             : 
     192     1182350 :     queue->ios[queue->head] = pgaio_io_get_id(ioh);
     193     1182350 :     queue->head = new_head;
     194             : 
     195     1182350 :     return true;
     196             : }
     197             : 
     198             : static uint32
     199     1962930 : pgaio_worker_submission_queue_consume(void)
     200             : {
     201             :     AioWorkerSubmissionQueue *queue;
     202             :     uint32      result;
     203             : 
     204     1962930 :     queue = io_worker_submission_queue;
     205     1962930 :     if (queue->tail == queue->head)
     206      990164 :         return UINT32_MAX;      /* empty */
     207             : 
     208      972766 :     result = queue->ios[queue->tail];
     209      972766 :     queue->tail = (queue->tail + 1) & (queue->size - 1);
     210             : 
     211      972766 :     return result;
     212             : }
     213             : 
     214             : static uint32
     215     1943970 : pgaio_worker_submission_queue_depth(void)
     216             : {
     217             :     uint32      head;
     218             :     uint32      tail;
     219             : 
     220     1943970 :     head = io_worker_submission_queue->head;
     221     1943970 :     tail = io_worker_submission_queue->tail;
     222             : 
     223     1943970 :     if (tail > head)
     224         996 :         head += io_worker_submission_queue->size;
     225             : 
     226             :     Assert(head >= tail);
     227             : 
     228     1943970 :     return head - tail;
     229             : }
     230             : 
     231             : static bool
     232     1190164 : pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
     233             : {
     234             :     return
     235     1190164 :         !IsUnderPostmaster
     236     1184618 :         || ioh->flags & PGAIO_HF_REFERENCES_LOCAL
     237     2374782 :         || !pgaio_io_can_reopen(ioh);
     238             : }
     239             : 
     240             : static void
     241     1181582 : pgaio_worker_submit_internal(int nios, PgAioHandle *ios[])
     242             : {
     243             :     PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
     244     1181582 :     int         nsync = 0;
     245     1181582 :     Latch      *wakeup = NULL;
     246             :     int         worker;
     247             : 
     248             :     Assert(nios <= PGAIO_SUBMIT_BATCH_SIZE);
     249             : 
     250     1181582 :     LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
     251     2363932 :     for (int i = 0; i < nios; ++i)
     252             :     {
     253             :         Assert(!pgaio_worker_needs_synchronous_execution(ios[i]));
     254     1182350 :         if (!pgaio_worker_submission_queue_insert(ios[i]))
     255             :         {
     256             :             /*
     257             :              * We'll do it synchronously, but only after we've sent as many as
     258             :              * we can to workers, to maximize concurrency.
     259             :              */
     260           0 :             synchronous_ios[nsync++] = ios[i];
     261           0 :             continue;
     262             :         }
     263             : 
     264     1182350 :         if (wakeup == NULL)
     265             :         {
     266             :             /* Choose an idle worker to wake up if we haven't already. */
     267     1181602 :             worker = pgaio_choose_idle_worker();
     268     1181602 :             if (worker >= 0)
     269     1169504 :                 wakeup = io_worker_control->workers[worker].latch;
     270             : 
     271     1181602 :             pgaio_debug_io(DEBUG4, ios[i],
     272             :                            "choosing worker %d",
     273             :                            worker);
     274             :         }
     275             :     }
     276     1181582 :     LWLockRelease(AioWorkerSubmissionQueueLock);
     277             : 
     278     1181582 :     if (wakeup)
     279     1169504 :         SetLatch(wakeup);
     280             : 
     281             :     /* Run whatever is left synchronously. */
     282     1181582 :     if (nsync > 0)
     283             :     {
     284           0 :         for (int i = 0; i < nsync; ++i)
     285             :         {
     286           0 :             pgaio_io_perform_synchronously(synchronous_ios[i]);
     287             :         }
     288             :     }
     289     1181582 : }
     290             : 
     291             : static int
     292     1181582 : pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
     293             : {
     294     2363932 :     for (int i = 0; i < num_staged_ios; i++)
     295             :     {
     296     1182350 :         PgAioHandle *ioh = staged_ios[i];
     297             : 
     298     1182350 :         pgaio_io_prepare_submit(ioh);
     299             :     }
     300             : 
     301     1181582 :     pgaio_worker_submit_internal(num_staged_ios, staged_ios);
     302             : 
     303     1181582 :     return num_staged_ios;
     304             : }
     305             : 
     306             : /*
     307             :  * on_shmem_exit() callback that releases the worker's slot in
     308             :  * io_worker_control.
     309             :  */
     310             : static void
     311        2964 : pgaio_worker_die(int code, Datum arg)
     312             : {
     313        2964 :     LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
     314             :     Assert(io_worker_control->workers[MyIoWorkerId].in_use);
     315             :     Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
     316             : 
     317        2964 :     io_worker_control->workers[MyIoWorkerId].in_use = false;
     318        2964 :     io_worker_control->workers[MyIoWorkerId].latch = NULL;
     319        2964 :     LWLockRelease(AioWorkerSubmissionQueueLock);
     320        2964 : }
     321             : 
     322             : /*
     323             :  * Register the worker in shared memory, assign MyIoWorkerId and register a
     324             :  * shutdown callback to release registration.
     325             :  */
     326             : static void
     327        2964 : pgaio_worker_register(void)
     328             : {
     329        2964 :     MyIoWorkerId = -1;
     330             : 
     331             :     /*
     332             :      * XXX: This could do with more fine-grained locking. But it's also not
     333             :      * very common for the number of workers to change at the moment...
     334             :      */
     335        2964 :     LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
     336             : 
     337        5928 :     for (int i = 0; i < MAX_IO_WORKERS; ++i)
     338             :     {
     339        5928 :         if (!io_worker_control->workers[i].in_use)
     340             :         {
     341             :             Assert(io_worker_control->workers[i].latch == NULL);
     342        2964 :             io_worker_control->workers[i].in_use = true;
     343        2964 :             MyIoWorkerId = i;
     344        2964 :             break;
     345             :         }
     346             :         else
     347             :             Assert(io_worker_control->workers[i].latch != NULL);
     348             :     }
     349             : 
     350        2964 :     if (MyIoWorkerId == -1)
     351           0 :         elog(ERROR, "couldn't find a free worker slot");
     352             : 
     353        2964 :     io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
     354        2964 :     io_worker_control->workers[MyIoWorkerId].latch = MyLatch;
     355        2964 :     LWLockRelease(AioWorkerSubmissionQueueLock);
     356             : 
     357        2964 :     on_shmem_exit(pgaio_worker_die, 0);
     358        2964 : }
     359             : 
     360             : void
     361        2964 : IoWorkerMain(const void *startup_data, size_t startup_data_len)
     362             : {
     363             :     sigjmp_buf  local_sigjmp_buf;
     364        2964 :     PgAioHandle *volatile error_ioh = NULL;
     365        2964 :     volatile int error_errno = 0;
     366             :     char        cmd[128];
     367             : 
     368        2964 :     MyBackendType = B_IO_WORKER;
     369        2964 :     AuxiliaryProcessMainCommon();
     370             : 
     371        2964 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
     372        2964 :     pqsignal(SIGINT, die);      /* to allow manually triggering worker restart */
     373             : 
     374             :     /*
     375             :      * Ignore SIGTERM, will get explicit shutdown via SIGUSR2 later in the
     376             :      * shutdown sequence, similar to checkpointer.
     377             :      */
     378        2964 :     pqsignal(SIGTERM, SIG_IGN);
     379             :     /* SIGQUIT handler was already set up by InitPostmasterChild */
     380        2964 :     pqsignal(SIGALRM, SIG_IGN);
     381        2964 :     pqsignal(SIGPIPE, SIG_IGN);
     382        2964 :     pqsignal(SIGUSR1, procsignal_sigusr1_handler);
     383        2964 :     pqsignal(SIGUSR2, SignalHandlerForShutdownRequest);
     384             : 
     385             :     /* also registers a shutdown callback to unregister */
     386        2964 :     pgaio_worker_register();
     387             : 
     388        2964 :     sprintf(cmd, "%d", MyIoWorkerId);
     389        2964 :     set_ps_display(cmd);
     390             : 
     391             :     /* see PostgresMain() */
     392        2964 :     if (sigsetjmp(local_sigjmp_buf, 1) != 0)
     393             :     {
     394           0 :         error_context_stack = NULL;
     395           0 :         HOLD_INTERRUPTS();
     396             : 
     397           0 :         EmitErrorReport();
     398             : 
     399             :         /*
     400             :          * In the - very unlikely - case that the IO failed in a way that
     401             :          * raises an error we need to mark the IO as failed.
     402             :          *
     403             :          * Need to do just enough error recovery so that we can mark the IO as
     404             :          * failed and then exit (postmaster will start a new worker).
     405             :          */
     406           0 :         LWLockReleaseAll();
     407             : 
     408           0 :         if (error_ioh != NULL)
     409             :         {
     410             :             /* should never fail without setting error_errno */
     411             :             Assert(error_errno != 0);
     412             : 
     413           0 :             errno = error_errno;
     414             : 
     415           0 :             START_CRIT_SECTION();
     416           0 :             pgaio_io_process_completion(error_ioh, -error_errno);
     417           0 :             END_CRIT_SECTION();
     418             :         }
     419             : 
     420           0 :         proc_exit(1);
     421             :     }
     422             : 
     423             :     /* We can now handle ereport(ERROR) */
     424        2964 :     PG_exception_stack = &local_sigjmp_buf;
     425             : 
     426        2964 :     sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
     427             : 
     428     1965870 :     while (!ShutdownRequestPending)
     429             :     {
     430             :         uint32      io_index;
     431             :         Latch      *latches[IO_WORKER_WAKEUP_FANOUT];
     432     1962930 :         int         nlatches = 0;
     433     1962930 :         int         nwakeups = 0;
     434             :         int         worker;
     435             : 
     436             :         /* Try to get a job to do. */
     437     1962930 :         LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
     438     1962930 :         if ((io_index = pgaio_worker_submission_queue_consume()) == UINT32_MAX)
     439             :         {
     440             :             /*
     441             :              * Nothing to do.  Mark self idle.
     442             :              *
     443             :              * XXX: Invent some kind of back pressure to reduce useless
     444             :              * wakeups?
     445             :              */
     446      990164 :             io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
     447             :         }
     448             :         else
     449             :         {
     450             :             /* Got one.  Clear idle flag. */
     451      972766 :             io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
     452             : 
     453             :             /* See if we can wake up some peers. */
     454      972766 :             nwakeups = Min(pgaio_worker_submission_queue_depth(),
     455             :                            IO_WORKER_WAKEUP_FANOUT);
     456      996088 :             for (int i = 0; i < nwakeups; ++i)
     457             :             {
     458       34724 :                 if ((worker = pgaio_choose_idle_worker()) < 0)
     459       11402 :                     break;
     460       23322 :                 latches[nlatches++] = io_worker_control->workers[worker].latch;
     461             :             }
     462             :         }
     463     1962930 :         LWLockRelease(AioWorkerSubmissionQueueLock);
     464             : 
     465     1986252 :         for (int i = 0; i < nlatches; ++i)
     466       23322 :             SetLatch(latches[i]);
     467             : 
     468     1962930 :         if (io_index != UINT32_MAX)
     469             :         {
     470      972766 :             PgAioHandle *ioh = NULL;
     471             : 
     472      972766 :             ioh = &pgaio_ctl->io_handles[io_index];
     473      972766 :             error_ioh = ioh;
     474             : 
     475      972766 :             pgaio_debug_io(DEBUG4, ioh,
     476             :                            "worker %d processing IO",
     477             :                            MyIoWorkerId);
     478             : 
     479             :             /*
     480             :              * Prevent interrupts between pgaio_io_reopen() and
     481             :              * pgaio_io_perform_synchronously() that otherwise could lead to
     482             :              * the FD getting closed in that window.
     483             :              */
     484      972766 :             HOLD_INTERRUPTS();
     485             : 
     486             :             /*
     487             :              * It's very unlikely, but possible, that reopen fails. E.g. due
     488             :              * to memory allocations failing or file permissions changing or
     489             :              * such.  In that case we need to fail the IO.
     490             :              *
     491             :              * There's not really a good errno we can report here.
     492             :              */
     493      972766 :             error_errno = ENOENT;
     494      972766 :             pgaio_io_reopen(ioh);
     495             : 
     496             :             /*
     497             :              * To be able to exercise the reopen-fails path, allow injection
     498             :              * points to trigger a failure at this point.
     499             :              */
     500      972766 :             pgaio_io_call_inj(ioh, "AIO_WORKER_AFTER_REOPEN");
     501             : 
     502      972766 :             error_errno = 0;
     503      972766 :             error_ioh = NULL;
     504             : 
     505             :             /*
     506             :              * We don't expect this to ever fail with ERROR or FATAL, no need
     507             :              * to keep error_ioh set to the IO.
     508             :              * pgaio_io_perform_synchronously() contains a critical section to
     509             :              * ensure we don't accidentally fail.
     510             :              */
     511      972766 :             pgaio_io_perform_synchronously(ioh);
     512             : 
     513      972766 :             RESUME_INTERRUPTS();
     514             :         }
     515             :         else
     516             :         {
     517      990164 :             WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
     518             :                       WAIT_EVENT_IO_WORKER_MAIN);
     519      990140 :             ResetLatch(MyLatch);
     520             :         }
     521             : 
     522     1962906 :         CHECK_FOR_INTERRUPTS();
     523             :     }
     524             : 
     525        2940 :     proc_exit(0);
     526             : }
     527             : 
     528             : bool
     529      257048 : pgaio_workers_enabled(void)
     530             : {
     531      257048 :     return io_method == IOMETHOD_WORKER;
     532             : }

Generated by: LCOV version 1.14