LCOV - code coverage report
Current view: top level - src/backend/storage/aio - method_worker.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 96.0 % 297 285
Test Date: 2026-05-04 15:16:29 Functions: 100.0 % 32 32
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * method_worker.c
       4              :  *    AIO - perform AIO using worker processes
       5              :  *
       6              :  * IO workers consume IOs from a shared memory submission queue, run
       7              :  * traditional synchronous system calls, and perform the shared completion
       8              :  * handling immediately.  Client code submits most requests by pushing IOs
       9              :  * into the submission queue, and waits (if necessary) using condition
      10              :  * variables.  Some IOs cannot be performed in another process due to lack of
      11              :  * infrastructure for reopening the file, and must processed synchronously by
      12              :  * the client code when submitted.
      13              :  *
      14              :  * The pool of workers tries to stabilize at a size that can handle recently
      15              :  * seen variation in demand, within the configured limits.
      16              :  *
      17              :  * This method of AIO is available in all builds on all operating systems, and
      18              :  * is the default.
      19              :  *
      20              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      21              :  * Portions Copyright (c) 1994, Regents of the University of California
      22              :  *
      23              :  * IDENTIFICATION
      24              :  *    src/backend/storage/aio/method_worker.c
      25              :  *
      26              :  *-------------------------------------------------------------------------
      27              :  */
      28              : 
      29              : #include "postgres.h"
      30              : 
      31              : #include <limits.h>
      32              : 
      33              : #include "libpq/pqsignal.h"
      34              : #include "miscadmin.h"
      35              : #include "port/pg_bitutils.h"
      36              : #include "postmaster/auxprocess.h"
      37              : #include "postmaster/interrupt.h"
      38              : #include "storage/aio.h"
      39              : #include "storage/aio_internal.h"
      40              : #include "storage/aio_subsys.h"
      41              : #include "storage/io_worker.h"
      42              : #include "storage/ipc.h"
      43              : #include "storage/latch.h"
      44              : #include "storage/lwlock.h"
      45              : #include "storage/pmsignal.h"
      46              : #include "storage/proc.h"
      47              : #include "storage/shmem.h"
      48              : #include "tcop/tcopprot.h"
      49              : #include "utils/injection_point.h"
      50              : #include "utils/memdebug.h"
      51              : #include "utils/ps_status.h"
      52              : #include "utils/wait_event.h"
      53              : 
      54              : /*
      55              :  * Saturation for counters used to estimate wakeup:IO ratio.
      56              :  *
      57              :  * We maintain hist_wakeups for wakeups received and hist_ios for IOs
      58              :  * processed by each worker.  When either counter reaches this saturation
      59              :  * value, we divide both by two.  The result is an exponentially decaying
      60              :  * ratio of wakeups to IOs, with a very short memory.
      61              :  *
      62              :  * If a worker is itself experiencing useless wakeups, it assumes that
      63              :  * higher-numbered workers would experience even more, so it should end the
      64              :  * chain.
      65              :  */
      66              : #define PGAIO_WORKER_WAKEUP_RATIO_SATURATE 4
      67              : 
      68              : /* Debugging support: show current IO and wakeups:ios statistics in ps. */
      69              : /* #define PGAIO_WORKER_SHOW_PS_INFO */
      70              : 
      71              : typedef struct PgAioWorkerSubmissionQueue
      72              : {
      73              :     uint32      size;
      74              :     uint32      head;
      75              :     uint32      tail;
      76              :     int         sqes[FLEXIBLE_ARRAY_MEMBER];
      77              : } PgAioWorkerSubmissionQueue;
      78              : 
      79              : typedef struct PgAioWorkerSlot
      80              : {
      81              :     ProcNumber  proc_number;
      82              : } PgAioWorkerSlot;
      83              : 
      84              : /*
      85              :  * Sets of worker IDs are held in a simple bitmap, accessed through functions
      86              :  * that provide a more readable abstraction.  If we wanted to support more
      87              :  * workers than that, the contention on the single queue would surely get too
      88              :  * high, so we might want to consider multiple pools instead of widening this.
      89              :  */
      90              : typedef uint64 PgAioWorkerSet;
      91              : 
      92              : #define PGAIO_WORKERSET_BITS (sizeof(PgAioWorkerSet) * CHAR_BIT)
      93              : 
      94              : static_assert(PGAIO_WORKERSET_BITS >= MAX_IO_WORKERS, "too small");
      95              : 
      96              : typedef struct PgAioWorkerControl
      97              : {
      98              :     /* Seen by postmaster */
      99              :     bool        grow;
     100              :     bool        grow_signal_sent;
     101              : 
     102              :     /* Protected by AioWorkerSubmissionQueueLock. */
     103              :     PgAioWorkerSet idle_workerset;
     104              : 
     105              :     /* Protected by AioWorkerControlLock. */
     106              :     PgAioWorkerSet workerset;
     107              :     int         nworkers;
     108              : 
     109              :     /* Protected by AioWorkerControlLock. */
     110              :     PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
     111              : } PgAioWorkerControl;
     112              : 
     113              : 
     114              : static void pgaio_worker_shmem_request(void *arg);
     115              : static void pgaio_worker_shmem_init(void *arg);
     116              : 
     117              : static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh);
     118              : static int  pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);
     119              : 
     120              : 
     121              : const IoMethodOps pgaio_worker_ops = {
     122              :     .shmem_callbacks.request_fn = pgaio_worker_shmem_request,
     123              :     .shmem_callbacks.init_fn = pgaio_worker_shmem_init,
     124              : 
     125              :     .needs_synchronous_execution = pgaio_worker_needs_synchronous_execution,
     126              :     .submit = pgaio_worker_submit,
     127              : };
     128              : 
     129              : 
     130              : /* GUCs */
     131              : int         io_min_workers = 2;
     132              : int         io_max_workers = 8;
     133              : int         io_worker_idle_timeout = 60000;
     134              : int         io_worker_launch_interval = 100;
     135              : 
     136              : 
     137              : static int  io_worker_queue_size = 64;
     138              : static int  MyIoWorkerId = -1;
     139              : static PgAioWorkerSubmissionQueue *io_worker_submission_queue;
     140              : static PgAioWorkerControl *io_worker_control;
     141              : 
     142              : 
     143              : static void
     144         2476 : pgaio_workerset_initialize(PgAioWorkerSet *set)
     145              : {
     146         2476 :     *set = 0;
     147         2476 : }
     148              : 
     149              : static bool
     150       705566 : pgaio_workerset_is_empty(PgAioWorkerSet *set)
     151              : {
     152       705566 :     return *set == 0;
     153              : }
     154              : 
     155              : static PgAioWorkerSet
     156      1749182 : pgaio_workerset_singleton(int worker)
     157              : {
     158              :     Assert(worker >= 0 && worker < MAX_IO_WORKERS);
     159      1749182 :     return UINT64_C(1) << worker;
     160              : }
     161              : 
     162              : static void
     163         1326 : pgaio_workerset_all(PgAioWorkerSet *set)
     164              : {
     165         1326 :     *set = UINT64_MAX >> (PGAIO_WORKERSET_BITS - MAX_IO_WORKERS);
     166         1326 : }
     167              : 
     168              : static void
     169         1326 : pgaio_workerset_subtract(PgAioWorkerSet *set1, const PgAioWorkerSet *set2)
     170              : {
     171         1326 :     *set1 &= ~*set2;
     172         1326 : }
     173              : 
     174              : static void
     175       535290 : pgaio_workerset_insert(PgAioWorkerSet *set, int worker)
     176              : {
     177              :     Assert(worker >= 0 && worker < MAX_IO_WORKERS);
     178       535290 :     *set |= pgaio_workerset_singleton(worker);
     179       535290 : }
     180              : 
     181              : static void
     182      1213892 : pgaio_workerset_remove(PgAioWorkerSet *set, int worker)
     183              : {
     184              :     Assert(worker >= 0 && worker < MAX_IO_WORKERS);
     185      1213892 :     *set &= ~pgaio_workerset_singleton(worker);
     186      1213892 : }
     187              : 
     188              : static void
     189        16611 : pgaio_workerset_remove_lte(PgAioWorkerSet *set, int worker)
     190              : {
     191              :     Assert(worker >= 0 && worker < MAX_IO_WORKERS);
     192        16611 :     *set &= (~(PgAioWorkerSet) 0) << (worker + 1);
     193        16611 : }
     194              : 
     195              : static int
     196        20904 : pgaio_workerset_get_highest(PgAioWorkerSet *set)
     197              : {
     198              :     Assert(!pgaio_workerset_is_empty(set));
     199        20904 :     return pg_leftmost_one_pos64(*set);
     200              : }
     201              : 
     202              : static int
     203       664827 : pgaio_workerset_get_lowest(PgAioWorkerSet *set)
     204              : {
     205              :     Assert(!pgaio_workerset_is_empty(set));
     206       664827 :     return pg_rightmost_one_pos64(*set);
     207              : }
     208              : 
     209              : static int
     210         2400 : pgaio_workerset_pop_lowest(PgAioWorkerSet *set)
     211              : {
     212         2400 :     int         worker = pgaio_workerset_get_lowest(set);
     213              : 
     214         2400 :     pgaio_workerset_remove(set, worker);
     215         2400 :     return worker;
     216              : }
     217              : 
     218              : #ifdef USE_ASSERT_CHECKING
     219              : static bool
     220              : pgaio_workerset_contains(PgAioWorkerSet *set, int worker)
     221              : {
     222              :     Assert(worker >= 0 && worker < MAX_IO_WORKERS);
     223              :     return (*set & pgaio_workerset_singleton(worker)) != 0;
     224              : }
     225              : 
     226              : static int
     227              : pgaio_workerset_count(PgAioWorkerSet *set)
     228              : {
     229              :     return pg_popcount64(*set);
     230              : }
     231              : #endif
     232              : 
     233              : static void
     234         1241 : pgaio_worker_shmem_request(void *arg)
     235              : {
     236              :     size_t      size;
     237              :     int         queue_size;
     238              : 
     239              :     /* Round size up to next power of two so we can make a mask. */
     240         1241 :     queue_size = pg_nextpower2_32(io_worker_queue_size);
     241              : 
     242         1241 :     size = offsetof(PgAioWorkerSubmissionQueue, sqes) + sizeof(int) * queue_size;
     243         1241 :     ShmemRequestStruct(.name = "AioWorkerSubmissionQueue",
     244              :                        .size = size,
     245              :                        .ptr = (void **) &io_worker_submission_queue,
     246              :         );
     247              : 
     248         1241 :     size = offsetof(PgAioWorkerControl, workers) + sizeof(PgAioWorkerSlot) * MAX_IO_WORKERS;
     249         1241 :     ShmemRequestStruct(.name = "AioWorkerControl",
     250              :                        .size = size,
     251              :                        .ptr = (void **) &io_worker_control,
     252              :         );
     253         1241 : }
     254              : 
     255              : static void
     256         1238 : pgaio_worker_shmem_init(void *arg)
     257              : {
     258              :     int         queue_size;
     259              : 
     260              :     /* Round size up like in pgaio_worker_shmem_request() */
     261         1238 :     queue_size = pg_nextpower2_32(io_worker_queue_size);
     262              : 
     263         1238 :     io_worker_submission_queue->size = queue_size;
     264         1238 :     io_worker_submission_queue->head = 0;
     265         1238 :     io_worker_submission_queue->tail = 0;
     266         1238 :     io_worker_control->grow = false;
     267         1238 :     pgaio_workerset_initialize(&io_worker_control->workerset);
     268         1238 :     pgaio_workerset_initialize(&io_worker_control->idle_workerset);
     269              : 
     270        40854 :     for (int i = 0; i < MAX_IO_WORKERS; ++i)
     271        39616 :         io_worker_control->workers[i].proc_number = INVALID_PROC_NUMBER;
     272         1238 : }
     273              : 
     274              : /*
     275              :  * Tell postmaster that we think a new worker is needed.
     276              :  */
     277              : static void
     278           46 : pgaio_worker_request_grow(void)
     279              : {
     280              :     /*
     281              :      * Suppress useless signaling if we already know that we're at the
     282              :      * maximum.  This uses an unlocked read of nworkers, but that's OK for
     283              :      * this heuristic purpose.
     284              :      */
     285           46 :     if (io_worker_control->nworkers >= io_max_workers)
     286            0 :         return;
     287              : 
     288              :     /* Already requested? */
     289           46 :     if (io_worker_control->grow)
     290           11 :         return;
     291              : 
     292           35 :     io_worker_control->grow = true;
     293           35 :     pg_memory_barrier();
     294              : 
     295              :     /*
     296              :      * If the postmaster has already been signaled, don't do it again until
     297              :      * the postmaster clears this flag.  There is no point in repeated signals
     298              :      * if grow is being set and cleared repeatedly while the postmaster is
     299              :      * waiting for io_worker_launch_interval, which it applies even to
     300              :      * canceled requests.
     301              :      */
     302           35 :     if (io_worker_control->grow_signal_sent)
     303            4 :         return;
     304              : 
     305           31 :     io_worker_control->grow_signal_sent = true;
     306           31 :     pg_memory_barrier();
     307           31 :     SendPostmasterSignal(PMSIGNAL_IO_WORKER_GROW);
     308              : }
     309              : 
     310              : /*
     311              :  * Cancel any request for a new worker, after observing an empty queue.
     312              :  */
     313              : static void
     314       533964 : pgaio_worker_cancel_grow(void)
     315              : {
     316       533964 :     if (!io_worker_control->grow)
     317       533929 :         return;
     318              : 
     319           35 :     io_worker_control->grow = false;
     320           35 :     pg_memory_barrier();
     321              : }
     322              : 
     323              : /*
     324              :  * Called by the postmaster to check if a new worker has been requested (but
     325              :  * possibly canceled since).
     326              :  */
     327              : bool
     328       305127 : pgaio_worker_pm_test_grow_signal_sent(void)
     329              : {
     330       305127 :     pg_memory_barrier();
     331       305127 :     return io_worker_control && io_worker_control->grow_signal_sent;
     332              : }
     333              : 
     334              : /*
     335              :  * Called by the postmaster to check if a new worker has been requested and
     336              :  * not canceled since.
     337              :  */
     338              : bool
     339           49 : pgaio_worker_pm_test_grow(void)
     340              : {
     341           49 :     pg_memory_barrier();
     342           49 :     return io_worker_control && io_worker_control->grow;
     343              : }
     344              : 
     345              : /*
     346              :  * Called by the postmaster to clear the request for a new worker.
     347              :  */
     348              : void
     349           36 : pgaio_worker_pm_clear_grow_signal_sent(void)
     350              : {
     351           36 :     if (!io_worker_control)
     352            0 :         return;
     353              : 
     354           36 :     io_worker_control->grow = false;
     355           36 :     io_worker_control->grow_signal_sent = false;
     356           36 :     pg_memory_barrier();
     357              : }
     358              : 
     359              : static int
     360       699188 : pgaio_worker_choose_idle(int only_workers_above)
     361              : {
     362              :     PgAioWorkerSet workerset;
     363              :     int         worker;
     364              : 
     365              :     Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
     366              : 
     367       699188 :     workerset = io_worker_control->idle_workerset;
     368       699188 :     if (only_workers_above >= 0)
     369        16611 :         pgaio_workerset_remove_lte(&workerset, only_workers_above);
     370       699188 :     if (pgaio_workerset_is_empty(&workerset))
     371        38087 :         return -1;
     372              : 
     373              :     /* Find the lowest numbered idle worker and mark it not idle. */
     374       661101 :     worker = pgaio_workerset_get_lowest(&workerset);
     375       661101 :     pgaio_workerset_remove(&io_worker_control->idle_workerset, worker);
     376              : 
     377       661101 :     return worker;
     378              : }
     379              : 
     380              : /*
     381              :  * Try to wake a worker by setting its latch, to tell it there are IOs to
     382              :  * process in the submission queue.
     383              :  */
     384              : static void
     385       663501 : pgaio_worker_wake(int worker)
     386              : {
     387              :     ProcNumber  proc_number;
     388              : 
     389              :     /*
     390              :      * If the selected worker is concurrently exiting, then pgaio_worker_die()
     391              :      * had not yet removed it as of when we saw it in idle_workerset.  That's
     392              :      * OK, because it will wake all remaining workers to close wakeup-vs-exit
     393              :      * races: *someone* will see the queued IO.  If there are no workers
     394              :      * running, the postmaster will start a new one.
     395              :      */
     396       663501 :     proc_number = io_worker_control->workers[worker].proc_number;
     397       663501 :     if (proc_number != INVALID_PROC_NUMBER)
     398       663499 :         SetLatch(&GetPGProcByNumber(proc_number)->procLatch);
     399       663501 : }
     400              : 
     401              : /*
     402              :  * Try to wake a set of workers.  Used on pool change, to close races
     403              :  * described in the callers.
     404              :  */
     405              : static void
     406         2652 : pgaio_workerset_wake(PgAioWorkerSet workerset)
     407              : {
     408         5052 :     while (!pgaio_workerset_is_empty(&workerset))
     409         2400 :         pgaio_worker_wake(pgaio_workerset_pop_lowest(&workerset));
     410         2652 : }
     411              : 
     412              : static bool
     413       682770 : pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
     414              : {
     415              :     PgAioWorkerSubmissionQueue *queue;
     416              :     uint32      new_head;
     417              : 
     418              :     Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
     419              : 
     420       682770 :     queue = io_worker_submission_queue;
     421       682770 :     new_head = (queue->head + 1) & (queue->size - 1);
     422       682770 :     if (new_head == queue->tail)
     423              :     {
     424            0 :         pgaio_debug(DEBUG3, "io queue is full, at %u elements",
     425              :                     io_worker_submission_queue->size);
     426            0 :         return false;           /* full */
     427              :     }
     428              : 
     429       682770 :     queue->sqes[queue->head] = pgaio_io_get_id(ioh);
     430       682770 :     queue->head = new_head;
     431              : 
     432       682770 :     return true;
     433              : }
     434              : 
     435              : static int
     436      1081703 : pgaio_worker_submission_queue_consume(void)
     437              : {
     438              :     PgAioWorkerSubmissionQueue *queue;
     439              :     int         result;
     440              : 
     441              :     Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
     442              : 
     443      1081703 :     queue = io_worker_submission_queue;
     444      1081703 :     if (queue->tail == queue->head)
     445       533964 :         return -1;              /* empty */
     446              : 
     447       547739 :     result = queue->sqes[queue->tail];
     448       547739 :     queue->tail = (queue->tail + 1) & (queue->size - 1);
     449              : 
     450       547739 :     return result;
     451              : }
     452              : 
     453              : static uint32
     454       330212 : pgaio_worker_submission_queue_depth(void)
     455              : {
     456              :     uint32      head;
     457              :     uint32      tail;
     458              : 
     459              :     Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
     460              : 
     461       330212 :     head = io_worker_submission_queue->head;
     462       330212 :     tail = io_worker_submission_queue->tail;
     463              : 
     464       330212 :     if (tail > head)
     465          295 :         head += io_worker_submission_queue->size;
     466              : 
     467              :     Assert(head >= tail);
     468              : 
     469       330212 :     return head - tail;
     470              : }
     471              : 
     472              : static bool
     473       690144 : pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
     474              : {
     475              :     return
     476       690144 :         !IsUnderPostmaster
     477       686419 :         || ioh->flags & PGAIO_HF_REFERENCES_LOCAL
     478      1376563 :         || !pgaio_io_can_reopen(ioh);
     479              : }
     480              : 
     481              : static int
     482       684288 : pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
     483              : {
     484       684288 :     PgAioHandle **synchronous_ios = NULL;
     485       684288 :     int         nsync = 0;
     486       684288 :     int         worker = -1;
     487              : 
     488              :     Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
     489              : 
     490      1368769 :     for (int i = 0; i < num_staged_ios; i++)
     491       684481 :         pgaio_io_prepare_submit(staged_ios[i]);
     492              : 
     493       684288 :     if (LWLockConditionalAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE))
     494              :     {
     495      1365347 :         for (int i = 0; i < num_staged_ios; ++i)
     496              :         {
     497              :             Assert(!pgaio_worker_needs_synchronous_execution(staged_ios[i]));
     498       682770 :             if (!pgaio_worker_submission_queue_insert(staged_ios[i]))
     499              :             {
     500              :                 /*
     501              :                  * Do the rest synchronously. If the queue is full, give up
     502              :                  * and do the rest synchronously. We're holding an exclusive
     503              :                  * lock on the queue so nothing can consume entries.
     504              :                  */
     505            0 :                 synchronous_ios = &staged_ios[i];
     506            0 :                 nsync = (num_staged_ios - i);
     507              : 
     508            0 :                 break;
     509              :             }
     510              :         }
     511              :         /* Choose one worker to wake for this batch. */
     512       682577 :         worker = pgaio_worker_choose_idle(-1);
     513       682577 :         LWLockRelease(AioWorkerSubmissionQueueLock);
     514              : 
     515              :         /* Wake up chosen worker.  It will wake peers if necessary. */
     516       682577 :         if (worker != -1)
     517       658037 :             pgaio_worker_wake(worker);
     518              :     }
     519              :     else
     520              :     {
     521              :         /* do everything synchronously, no wakeup needed */
     522         1711 :         synchronous_ios = staged_ios;
     523         1711 :         nsync = num_staged_ios;
     524              :     }
     525              : 
     526              :     /* Run whatever is left synchronously. */
     527       684288 :     if (nsync > 0)
     528              :     {
     529         3422 :         for (int i = 0; i < nsync; ++i)
     530              :         {
     531         1711 :             pgaio_io_perform_synchronously(synchronous_ios[i]);
     532              :         }
     533              :     }
     534              : 
     535       684288 :     return num_staged_ios;
     536              : }
     537              : 
     538              : /*
     539              :  * on_shmem_exit() callback that releases the worker's slot in
     540              :  * io_worker_control.
     541              :  */
     542              : static void
     543         1326 : pgaio_worker_die(int code, Datum arg)
     544              : {
     545              :     PgAioWorkerSet notify_set;
     546              : 
     547         1326 :     LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
     548         1326 :     pgaio_workerset_remove(&io_worker_control->idle_workerset, MyIoWorkerId);
     549         1326 :     LWLockRelease(AioWorkerSubmissionQueueLock);
     550              : 
     551         1326 :     LWLockAcquire(AioWorkerControlLock, LW_EXCLUSIVE);
     552              :     Assert(io_worker_control->workers[MyIoWorkerId].proc_number == MyProcNumber);
     553         1326 :     io_worker_control->workers[MyIoWorkerId].proc_number = INVALID_PROC_NUMBER;
     554              :     Assert(pgaio_workerset_contains(&io_worker_control->workerset, MyIoWorkerId));
     555         1326 :     pgaio_workerset_remove(&io_worker_control->workerset, MyIoWorkerId);
     556         1326 :     notify_set = io_worker_control->workerset;
     557              :     Assert(io_worker_control->nworkers > 0);
     558         1326 :     io_worker_control->nworkers--;
     559              :     Assert(pgaio_workerset_count(&io_worker_control->workerset) ==
     560              :            io_worker_control->nworkers);
     561         1326 :     LWLockRelease(AioWorkerControlLock);
     562              : 
     563              :     /*
     564              :      * Notify other workers on pool change.  This allows the new highest
     565              :      * worker to know that it is now the one that can time out, and closes a
     566              :      * wakeup-loss race described in pgaio_worker_wake().
     567              :      */
     568         1326 :     pgaio_workerset_wake(notify_set);
     569         1326 : }
     570              : 
     571              : /*
     572              :  * Register the worker in shared memory, assign MyIoWorkerId and register a
     573              :  * shutdown callback to release registration.
     574              :  */
     575              : static void
     576         1326 : pgaio_worker_register(void)
     577              : {
     578              :     PgAioWorkerSet free_workerset;
     579              :     PgAioWorkerSet old_workerset;
     580              : 
     581         1326 :     MyIoWorkerId = -1;
     582              : 
     583         1326 :     LWLockAcquire(AioWorkerControlLock, LW_EXCLUSIVE);
     584              :     /* Find lowest unused worker ID. */
     585         1326 :     pgaio_workerset_all(&free_workerset);
     586         1326 :     pgaio_workerset_subtract(&free_workerset, &io_worker_control->workerset);
     587         1326 :     if (!pgaio_workerset_is_empty(&free_workerset))
     588         1326 :         MyIoWorkerId = pgaio_workerset_get_lowest(&free_workerset);
     589         1326 :     if (MyIoWorkerId == -1)
     590            0 :         elog(ERROR, "couldn't find a free worker ID");
     591              : 
     592              :     Assert(io_worker_control->workers[MyIoWorkerId].proc_number ==
     593              :            INVALID_PROC_NUMBER);
     594         1326 :     io_worker_control->workers[MyIoWorkerId].proc_number = MyProcNumber;
     595              : 
     596         1326 :     old_workerset = io_worker_control->workerset;
     597              :     Assert(!pgaio_workerset_contains(&old_workerset, MyIoWorkerId));
     598         1326 :     pgaio_workerset_insert(&io_worker_control->workerset, MyIoWorkerId);
     599         1326 :     io_worker_control->nworkers++;
     600              :     Assert(io_worker_control->nworkers <= MAX_IO_WORKERS);
     601              :     Assert(pgaio_workerset_count(&io_worker_control->workerset) ==
     602              :            io_worker_control->nworkers);
     603         1326 :     LWLockRelease(AioWorkerControlLock);
     604              : 
     605              :     /*
     606              :      * Notify other workers on pool change.  If we were the highest worker,
     607              :      * this allows the new highest worker to know that it can time out.
     608              :      */
     609         1326 :     pgaio_workerset_wake(old_workerset);
     610              : 
     611         1326 :     on_shmem_exit(pgaio_worker_die, 0);
     612         1326 : }
     613              : 
     614              : static void
     615         1634 : pgaio_worker_error_callback(void *arg)
     616              : {
     617              :     ProcNumber  owner;
     618              :     PGPROC     *owner_proc;
     619              :     int32       owner_pid;
     620         1634 :     PgAioHandle *ioh = arg;
     621              : 
     622         1634 :     if (!ioh)
     623            0 :         return;
     624              : 
     625              :     Assert(ioh->owner_procno != MyProcNumber);
     626              :     Assert(MyBackendType == B_IO_WORKER);
     627              : 
     628         1634 :     owner = ioh->owner_procno;
     629         1634 :     owner_proc = GetPGProcByNumber(owner);
     630         1634 :     owner_pid = owner_proc->pid;
     631              : 
     632         1634 :     errcontext("I/O worker executing I/O on behalf of process %d", owner_pid);
     633              : }
     634              : 
     635              : /*
     636              :  * Check if this backend is allowed to time out, and thus should use a
     637              :  * non-infinite sleep time.  Only the highest-numbered worker is allowed to
     638              :  * time out, and only if the pool is above io_min_workers.  Serializing
     639              :  * timeouts keeps IDs in a range 0..N without gaps, and avoids undershooting
     640              :  * io_min_workers.
     641              :  *
     642              :  * The result is only instantaneously true and may be temporarily inconsistent
     643              :  * in different workers around transitions, but all workers are woken up on
     644              :  * pool size or GUC changes making the result eventually consistent.
     645              :  */
     646              : static bool
     647       533995 : pgaio_worker_can_timeout(void)
     648              : {
     649              :     PgAioWorkerSet workerset;
     650              : 
     651       533995 :     if (MyIoWorkerId < io_min_workers)
     652       513091 :         return false;
     653              : 
     654              :     /* Serialize against pool size changes. */
     655        20904 :     LWLockAcquire(AioWorkerControlLock, LW_SHARED);
     656        20904 :     workerset = io_worker_control->workerset;
     657        20904 :     LWLockRelease(AioWorkerControlLock);
     658              : 
     659        20904 :     if (MyIoWorkerId != pgaio_workerset_get_highest(&workerset))
     660          139 :         return false;
     661              : 
     662        20765 :     return true;
     663              : }
     664              : 
     665              : void
     666         1326 : IoWorkerMain(const void *startup_data, size_t startup_data_len)
     667              : {
     668              :     sigjmp_buf  local_sigjmp_buf;
     669         1326 :     TimestampTz idle_timeout_abs = 0;
     670         1326 :     int         timeout_guc_used = 0;
     671         1326 :     PgAioHandle *volatile error_ioh = NULL;
     672         1326 :     ErrorContextCallback errcallback = {0};
     673         1326 :     volatile int error_errno = 0;
     674              :     char        cmd[128];
     675         1326 :     int         hist_ios = 0;
     676         1326 :     int         hist_wakeups = 0;
     677              : 
     678         1326 :     AuxiliaryProcessMainCommon();
     679              : 
     680         1326 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
     681         1326 :     pqsignal(SIGINT, die);      /* to allow manually triggering worker restart */
     682              : 
     683              :     /*
     684              :      * Ignore SIGTERM, will get explicit shutdown via SIGUSR2 later in the
     685              :      * shutdown sequence, similar to checkpointer.
     686              :      */
     687         1326 :     pqsignal(SIGTERM, PG_SIG_IGN);
     688              :     /* SIGQUIT handler was already set up by InitPostmasterChild */
     689         1326 :     pqsignal(SIGALRM, PG_SIG_IGN);
     690         1326 :     pqsignal(SIGPIPE, PG_SIG_IGN);
     691         1326 :     pqsignal(SIGUSR1, procsignal_sigusr1_handler);
     692         1326 :     pqsignal(SIGUSR2, SignalHandlerForShutdownRequest);
     693              : 
     694              :     /* also registers a shutdown callback to unregister */
     695         1326 :     pgaio_worker_register();
     696              : 
     697         1326 :     sprintf(cmd, "%d", MyIoWorkerId);
     698         1326 :     set_ps_display(cmd);
     699              : 
     700         1326 :     errcallback.callback = pgaio_worker_error_callback;
     701         1326 :     errcallback.previous = error_context_stack;
     702         1326 :     error_context_stack = &errcallback;
     703              : 
     704              :     /* see PostgresMain() */
     705         1326 :     if (sigsetjmp(local_sigjmp_buf, 1) != 0)
     706              :     {
     707            1 :         error_context_stack = NULL;
     708            1 :         HOLD_INTERRUPTS();
     709              : 
     710            1 :         EmitErrorReport();
     711              : 
     712              :         /*
     713              :          * In the - very unlikely - case that the IO failed in a way that
     714              :          * raises an error we need to mark the IO as failed.
     715              :          *
     716              :          * Need to do just enough error recovery so that we can mark the IO as
     717              :          * failed and then exit (postmaster will start a new worker).
     718              :          */
     719            1 :         LWLockReleaseAll();
     720              : 
     721            1 :         if (error_ioh != NULL)
     722              :         {
     723              :             /* should never fail without setting error_errno */
     724              :             Assert(error_errno != 0);
     725              : 
     726            1 :             errno = error_errno;
     727              : 
     728            1 :             START_CRIT_SECTION();
     729            1 :             pgaio_io_process_completion(error_ioh, -error_errno);
     730            1 :             END_CRIT_SECTION();
     731              :         }
     732              : 
     733            1 :         proc_exit(1);
     734              :     }
     735              : 
     736              :     /* We can now handle ereport(ERROR) */
     737         1326 :     PG_exception_stack = &local_sigjmp_buf;
     738              : 
     739         1326 :     sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
     740              : 
     741      1082987 :     while (!ShutdownRequestPending)
     742              :     {
     743              :         uint32      io_index;
     744      1081703 :         int         worker = -1;
     745      1081703 :         int         queue_depth = 0;
     746      1081703 :         bool        maybe_grow = false;
     747              : 
     748              :         /*
     749              :          * Try to get a job to do.
     750              :          *
     751              :          * The lwlock acquisition also provides the necessary memory barrier
     752              :          * to ensure that we don't see an outdated data in the handle.
     753              :          */
     754      1081703 :         LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
     755      1081703 :         if ((io_index = pgaio_worker_submission_queue_consume()) == -1)
     756              :         {
     757              :             /* Nothing to do.  Mark self idle. */
     758       533964 :             pgaio_workerset_insert(&io_worker_control->idle_workerset,
     759              :                                    MyIoWorkerId);
     760              :         }
     761              :         else
     762              :         {
     763              :             /* Got one.  Clear idle flag. */
     764       547739 :             pgaio_workerset_remove(&io_worker_control->idle_workerset,
     765              :                                    MyIoWorkerId);
     766              : 
     767              :             /*
     768              :              * See if we should wake up a higher numbered peer.  Only do that
     769              :              * if this worker is not receiving spurious wakeups itself.  The
     770              :              * intention is create a frontier beyond which idle workers stay
     771              :              * asleep.
     772              :              *
     773              :              * This heuristic tries to discover the useful wakeup propagation
     774              :              * chain length when IOs are very fast and workers wake up to find
     775              :              * that all IOs have already been taken.
     776              :              *
     777              :              * If we chose not to wake a worker when we ideally should have,
     778              :              * then the ratio will soon change to correct that.
     779              :              */
     780       547739 :             if (hist_wakeups <= hist_ios)
     781              :             {
     782       330212 :                 queue_depth = pgaio_worker_submission_queue_depth();
     783       330212 :                 if (queue_depth > 0)
     784              :                 {
     785              :                     /* Choose a worker higher than me to wake. */
     786        16611 :                     worker = pgaio_worker_choose_idle(MyIoWorkerId);
     787        16611 :                     if (worker == -1)
     788        13547 :                         maybe_grow = true;
     789              :                 }
     790              :             }
     791              :         }
     792      1081703 :         LWLockRelease(AioWorkerSubmissionQueueLock);
     793              : 
     794              :         /* Propagate wakeups. */
     795      1081703 :         if (worker != -1)
     796              :         {
     797         3064 :             pgaio_worker_wake(worker);
     798              :         }
     799      1078639 :         else if (maybe_grow)
     800              :         {
     801              :             /*
     802              :              * We know there was at least one more item in the queue, and we
     803              :              * failed to find a higher-numbered idle worker to wake.  Now we
     804              :              * decide if we should try to start one more worker.
     805              :              *
     806              :              * We do this with a simple heuristic: is the queue depth greater
     807              :              * than the current number of workers?
     808              :              *
     809              :              * Consider the following situations:
     810              :              *
     811              :              * 1. The queue depth is constantly increasing, because IOs are
     812              :              * arriving faster than they can possibly be serviced.  It doesn't
     813              :              * matter much which threshold we choose, as we will surely hit
     814              :              * it.  Crossing the current worker count is a useful signal
     815              :              * because it's clearly too deep to avoid queuing latency already,
     816              :              * but still leaves a small window of opportunity to improve the
     817              :              * situation before the queue overflows.
     818              :              *
     819              :              * 2. The worker pool is keeping up, no latency is being
     820              :              * introduced and an extra worker would be a waste of resources.
     821              :              * Queue depth distributions tend to be heavily skewed, with long
     822              :              * tails of low probability spikes (due to submission clustering,
     823              :              * scheduling, jitter, stalls, noisy neighbors, etc).  We want a
     824              :              * number that is very unlikely to be triggered by an outlier, and
     825              :              * we bet that an exponential or similar distribution whose
     826              :              * outliers never reach this threshold must be almost entirely
     827              :              * concentrated at the low end.  If we do see a spike as big as
     828              :              * the worker count, we take it as a signal that the distribution
     829              :              * is surely too wide.
     830              :              *
     831              :              * On its own, this is an extremely crude signal.  When combined
     832              :              * with the wakeup propagation test that precedes it (but on its
     833              :              * own tends to overshoot) and io_worker_launch_interval, the
     834              :              * result is that we gradually test each pool size until we find
     835              :              * one that doesn't trigger further expansion, and then hold it
     836              :              * for at least io_worker_idle_timeout.
     837              :              *
     838              :              * XXX Perhaps ideas from queueing theory or control theory could
     839              :              * do a better job of this.
     840              :              */
     841              : 
     842              :             /* Read nworkers without lock for this heuristic purpose. */
     843        13547 :             if (queue_depth > io_worker_control->nworkers)
     844           46 :                 pgaio_worker_request_grow();
     845              :         }
     846              : 
     847      1081703 :         if (io_index != -1)
     848              :         {
     849       547739 :             PgAioHandle *ioh = NULL;
     850              : 
     851              :             /* Cancel timeout and update wakeup:work ratio. */
     852       547739 :             idle_timeout_abs = 0;
     853       547739 :             if (++hist_ios == PGAIO_WORKER_WAKEUP_RATIO_SATURATE)
     854              :             {
     855       165157 :                 hist_wakeups /= 2;
     856       165157 :                 hist_ios /= 2;
     857              :             }
     858              : 
     859       547739 :             ioh = &pgaio_ctl->io_handles[io_index];
     860       547739 :             error_ioh = ioh;
     861       547739 :             errcallback.arg = ioh;
     862              : 
     863       547739 :             pgaio_debug_io(DEBUG4, ioh,
     864              :                            "worker %d processing IO",
     865              :                            MyIoWorkerId);
     866              : 
     867              :             /*
     868              :              * Prevent interrupts between pgaio_io_reopen() and
     869              :              * pgaio_io_perform_synchronously() that otherwise could lead to
     870              :              * the FD getting closed in that window.
     871              :              */
     872       547739 :             HOLD_INTERRUPTS();
     873              : 
     874              :             /*
     875              :              * It's very unlikely, but possible, that reopen fails. E.g. due
     876              :              * to memory allocations failing or file permissions changing or
     877              :              * such.  In that case we need to fail the IO.
     878              :              *
     879              :              * There's not really a good errno we can report here.
     880              :              */
     881       547739 :             error_errno = ENOENT;
     882       547739 :             pgaio_io_reopen(ioh);
     883              : 
     884              :             /*
     885              :              * To be able to exercise the reopen-fails path, allow injection
     886              :              * points to trigger a failure at this point.
     887              :              */
     888       547739 :             INJECTION_POINT("aio-worker-after-reopen", ioh);
     889              : 
     890       547738 :             error_errno = 0;
     891       547738 :             error_ioh = NULL;
     892              : 
     893              :             /*
     894              :              * As part of IO completion the buffer will be marked as NOACCESS,
     895              :              * until the buffer is pinned again - which never happens in io
     896              :              * workers. Therefore the next time there is IO for the same
     897              :              * buffer, the memory will be considered inaccessible. To avoid
     898              :              * that, explicitly allow access to the memory before reading data
     899              :              * into it.
     900              :              */
     901              : #ifdef USE_VALGRIND
     902              :             {
     903              :                 struct iovec *iov;
     904              :                 uint16      iov_length = pgaio_io_get_iovec_length(ioh, &iov);
     905              : 
     906              :                 for (int i = 0; i < iov_length; i++)
     907              :                     VALGRIND_MAKE_MEM_UNDEFINED(iov[i].iov_base, iov[i].iov_len);
     908              :             }
     909              : #endif
     910              : 
     911              : #ifdef PGAIO_WORKER_SHOW_PS_INFO
     912              :             {
     913              :                 char       *description = pgaio_io_get_target_description(ioh);
     914              : 
     915              :                 sprintf(cmd, "%d: [%s] %s",
     916              :                         MyIoWorkerId,
     917              :                         pgaio_io_get_op_name(ioh),
     918              :                         description);
     919              :                 pfree(description);
     920              :                 set_ps_display(cmd);
     921              :             }
     922              : #endif
     923              : 
     924              :             /*
     925              :              * We don't expect this to ever fail with ERROR or FATAL, no need
     926              :              * to keep error_ioh set to the IO.
     927              :              * pgaio_io_perform_synchronously() contains a critical section to
     928              :              * ensure we don't accidentally fail.
     929              :              */
     930       547738 :             pgaio_io_perform_synchronously(ioh);
     931              : 
     932       547738 :             RESUME_INTERRUPTS();
     933       547738 :             errcallback.arg = NULL;
     934              :         }
     935              :         else
     936              :         {
     937              :             int         timeout_ms;
     938              : 
     939              :             /* Cancel new worker request if pending. */
     940       533964 :             pgaio_worker_cancel_grow();
     941              : 
     942              :             /* Compute the remaining allowed idle time. */
     943       533964 :             if (io_worker_idle_timeout == -1)
     944              :             {
     945              :                 /* Never time out. */
     946            0 :                 timeout_ms = -1;
     947              :             }
     948              :             else
     949              :             {
     950       533964 :                 TimestampTz now = GetCurrentTimestamp();
     951              : 
     952              :                 /* If the GUC changes, reset timer. */
     953       533964 :                 if (idle_timeout_abs != 0 &&
     954         4048 :                     io_worker_idle_timeout != timeout_guc_used)
     955            0 :                     idle_timeout_abs = 0;
     956              : 
     957              :                 /* Only the highest-numbered worker can time out. */
     958       533964 :                 if (pgaio_worker_can_timeout())
     959              :                 {
     960        20734 :                     if (idle_timeout_abs == 0)
     961              :                     {
     962              :                         /*
     963              :                          * I have just been promoted to the timeout worker, or
     964              :                          * the GUC changed.  Compute new absolute time from
     965              :                          * now.
     966              :                          */
     967        16686 :                         idle_timeout_abs =
     968        16686 :                             TimestampTzPlusMilliseconds(now,
     969              :                                                         io_worker_idle_timeout);
     970        16686 :                         timeout_guc_used = io_worker_idle_timeout;
     971              :                     }
     972        20734 :                     timeout_ms =
     973        20734 :                         TimestampDifferenceMilliseconds(now, idle_timeout_abs);
     974              :                 }
     975              :                 else
     976              :                 {
     977              :                     /* No timeout for me. */
     978       513230 :                     idle_timeout_abs = 0;
     979       513230 :                     timeout_ms = -1;
     980              :                 }
     981              :             }
     982              : 
     983              : #ifdef PGAIO_WORKER_SHOW_PS_INFO
     984              :             sprintf(cmd, "%d: idle, wakeups:ios = %d:%d",
     985              :                     MyIoWorkerId, hist_wakeups, hist_ios);
     986              :             set_ps_display(cmd);
     987              : #endif
     988              : 
     989       533964 :             if (WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT,
     990              :                           timeout_ms,
     991              :                           WAIT_EVENT_IO_WORKER_MAIN) == WL_TIMEOUT)
     992              :             {
     993              :                 /* WL_TIMEOUT */
     994           31 :                 if (pgaio_worker_can_timeout())
     995           31 :                     if (GetCurrentTimestamp() >= idle_timeout_abs)
     996           31 :                         break;
     997              :             }
     998              :             else
     999              :             {
    1000              :                 /* WL_LATCH_SET */
    1001       533927 :                 if (++hist_wakeups == PGAIO_WORKER_WAKEUP_RATIO_SATURATE)
    1002              :                 {
    1003       114107 :                     hist_wakeups /= 2;
    1004       114107 :                     hist_ios /= 2;
    1005              :                 }
    1006              :             }
    1007       533927 :             ResetLatch(MyLatch);
    1008              :         }
    1009              : 
    1010      1081665 :         CHECK_FOR_INTERRUPTS();
    1011              : 
    1012      1081661 :         if (ConfigReloadPending)
    1013              :         {
    1014          203 :             ConfigReloadPending = false;
    1015          203 :             ProcessConfigFile(PGC_SIGHUP);
    1016              : 
    1017              :             /* If io_max_workers has been decreased, exit highest first. */
    1018          203 :             if (MyIoWorkerId >= io_max_workers)
    1019            0 :                 break;
    1020              :         }
    1021              :     }
    1022              : 
    1023         1315 :     error_context_stack = errcallback.previous;
    1024         1315 :     proc_exit(0);
    1025              : }
    1026              : 
    1027              : bool
    1028       314975 : pgaio_workers_enabled(void)
    1029              : {
    1030       314975 :     return io_method == IOMETHOD_WORKER;
    1031              : }
        

Generated by: LCOV version 2.0-1