LCOV - code coverage report
Current view: top level - src/backend/storage/aio - aio.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 281 327 85.9 %
Date: 2025-04-24 13:15:39 Functions: 37 39 94.9 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * aio.c
       4             :  *    AIO - Core Logic
       5             :  *
       6             :  * For documentation about how AIO works on a higher level, including a
       7             :  * schematic example, see README.md.
       8             :  *
       9             :  *
      10             :  * AIO is a complicated subsystem. To keep things navigable, it is split
      11             :  * across a number of files:
      12             :  *
      13             :  * - method_*.c - different ways of executing AIO (e.g. worker process)
      14             :  *
      15             :  * - aio_target.c - IO on different kinds of targets
      16             :  *
      17             :  * - aio_io.c - method-independent code for specific IO ops (e.g. readv)
      18             :  *
      19             :  * - aio_callback.c - callbacks at IO operation lifecycle events
      20             :  *
      21             :  * - aio_init.c - per-server and per-backend initialization
      22             :  *
      23             :  * - aio.c - all other topics
      24             :  *
      25             :  * - read_stream.c - helper for reading buffered relation data
      26             :  *
      27             :  * - README.md - higher-level overview of AIO
      28             :  *
      29             :  *
      30             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
      31             :  * Portions Copyright (c) 1994, Regents of the University of California
      32             :  *
      33             :  * IDENTIFICATION
      34             :  *    src/backend/storage/aio/aio.c
      35             :  *
      36             :  *-------------------------------------------------------------------------
      37             :  */
      38             : 
      39             : #include "postgres.h"
      40             : 
      41             : #include "lib/ilist.h"
      42             : #include "miscadmin.h"
      43             : #include "port/atomics.h"
      44             : #include "storage/aio.h"
      45             : #include "storage/aio_internal.h"
      46             : #include "storage/aio_subsys.h"
      47             : #include "utils/guc.h"
      48             : #include "utils/guc_hooks.h"
      49             : #include "utils/resowner.h"
      50             : #include "utils/wait_event_types.h"
      51             : 
      52             : #ifdef USE_INJECTION_POINTS
      53             : #include "utils/injection_point.h"
      54             : #endif
      55             : 
      56             : 
      57             : static inline void pgaio_io_update_state(PgAioHandle *ioh, PgAioHandleState new_state);
      58             : static void pgaio_io_reclaim(PgAioHandle *ioh);
      59             : static void pgaio_io_resowner_register(PgAioHandle *ioh);
      60             : static void pgaio_io_wait_for_free(void);
      61             : static PgAioHandle *pgaio_io_from_wref(PgAioWaitRef *iow, uint64 *ref_generation);
      62             : static const char *pgaio_io_state_get_name(PgAioHandleState s);
      63             : static void pgaio_io_wait(PgAioHandle *ioh, uint64 ref_generation);
      64             : 
      65             : 
      66             : /* Options for io_method: maps each setting name to its IOMETHOD_* value. */
      67             : const struct config_enum_entry io_method_options[] = {
      68             :     {"sync", IOMETHOD_SYNC, false},
      69             :     {"worker", IOMETHOD_WORKER, false},
      70             : #ifdef IOMETHOD_IO_URING_ENABLED
      71             :     {"io_uring", IOMETHOD_IO_URING, false},    /* only when IOMETHOD_IO_URING_ENABLED is defined */
      72             : #endif
      73             :     {NULL, 0, false}    /* terminating entry */
      74             : };
      75             : 
      76             : /* GUCs */
      77             : int         io_method = DEFAULT_IO_METHOD;    /* one of the IOMETHOD_* values, see io_method_options */
      78             : int         io_max_concurrency = -1;
      79             : 
      80             : /* global control for AIO; holds, among others, the io_handles array */
      81             : PgAioCtl   *pgaio_ctl;
      82             : 
      83             : /* current backend's per-backend state (idle/staged/in-flight/handed-out IOs) */
      84             : PgAioBackend *pgaio_my_backend;
      85             : 
      86             : 
      87             : static const IoMethodOps *const pgaio_method_ops_table[] = {    /* indexed by IOMETHOD_* value */
      88             :     [IOMETHOD_SYNC] = &pgaio_sync_ops,
      89             :     [IOMETHOD_WORKER] = &pgaio_worker_ops,
      90             : #ifdef IOMETHOD_IO_URING_ENABLED
      91             :     [IOMETHOD_IO_URING] = &pgaio_uring_ops,
      92             : #endif
      93             : };
      94             : 
      95             : /* callbacks for the configured io_method, set by assign_io_method */
      96             : const IoMethodOps *pgaio_method_ops;
      97             : 
      98             : 
      99             : /*
     100             :  * Currently there's no infrastructure to pass arguments to injection points,
     101             :  * so we instead set this up for the duration of the injection point
     102             :  * invocation. See pgaio_io_call_inj().
     103             :  */
     104             : #ifdef USE_INJECTION_POINTS
     105             : static PgAioHandle *pgaio_inj_cur_handle;    /* handle exposed to the running injection point */
     106             : #endif
     107             : 
     108             : 
     109             : 
     110             : /* --------------------------------------------------------------------------------
     111             :  * Public Functions related to PgAioHandle
     112             :  * --------------------------------------------------------------------------------
     113             :  */
     114             : 
     115             : /*
     116             :  * Acquire an AioHandle, waiting for IO completion if necessary.
     117             :  *
     118             :  * Each backend can only have one AIO handle that has been "handed out" to
     119             :  * code, but not yet submitted or released. This restriction is necessary to
     120             :  * ensure that it is possible for code to wait for an unused handle by waiting
     121             :  * for in-flight IO to complete. There is a limited number of handles in each
     122             :  * backend; if multiple handles could be handed out without being submitted,
     123             :  * waiting for all in-flight IO to complete would not guarantee that handles
     124             :  * free up.
     125             :  *
     126             :  * It is cheap to acquire an IO handle, unless all handles are in use. In that
     127             :  * case this function waits for the oldest IO to complete. If that is not
     128             :  * desirable, use pgaio_io_acquire_nb().
     129             :  *
     130             :  * If a handle was acquired but then does not turn out to be needed,
     131             :  * e.g. because pgaio_io_acquire() is called before starting an IO in a
     132             :  * critical section, the handle needs to be released with pgaio_io_release().
     133             :  *
     134             :  *
     135             :  * To react to the completion of the IO as soon as it is known to have
     136             :  * completed, callbacks can be registered with pgaio_io_register_callbacks().
     137             :  *
     138             :  * To actually execute IO using the returned handle, the pgaio_io_start_*()
     139             :  * family of functions is used. In many cases the pgaio_io_start_*() call will
     140             :  * not be done directly by code that acquired the handle, but by lower level
     141             :  * code that gets passed the handle. E.g. if code in bufmgr.c wants to perform
     142             :  * AIO, it typically will pass the handle to smgr.c, which will pass it on to
     143             :  * md.c, on to fd.c, which then finally calls pgaio_io_start_*().  This
     144             :  * forwarding allows the various layers to react to the IO's completion by
     145             :  * registering callbacks. These callbacks in turn can translate a lower
     146             :  * layer's result into a result understandable by a higher layer.
     147             :  *
     148             :  * During pgaio_io_start_*() the IO is staged (i.e. prepared for execution but
     149             :  * not submitted to the kernel). Unless in batchmode
     150             :  * (c.f. pgaio_enter_batchmode()), the IO will also get submitted for
     151             :  * execution. Note that, whether in batchmode or not, the IO might even
     152             :  * complete before the functions return.
     153             :  *
     154             :  * After pgaio_io_start_*() the AioHandle is "consumed" and may not be
     155             :  * referenced by the IO issuing code. To e.g. wait for IO, references to the
     156             :  * IO can be established with pgaio_io_get_wref() *before* pgaio_io_start_*()
     157             :  * is called.  pgaio_wref_wait() can be used to wait for the IO to complete.
     158             :  *
     159             :  *
     160             :  * To know if the IO [partially] succeeded or failed, a PgAioReturn * can be
     161             :  * passed to pgaio_io_acquire(). Once the issuing backend has called
     162             :  * pgaio_wref_wait(), the PgAioReturn contains information about whether the
     163             :  * operation succeeded and details about the first failure, if any. The error
     164             :  * can be raised / logged with pgaio_result_report().
     165             :  *
     166             :  * The lifetime of the memory pointed to by *ret needs to be at least as long
     167             :  * as the passed in resowner. If the resowner releases resources before the IO
     168             :  * completes (typically due to an error), the reference to *ret will be
     169             :  * cleared. In case of resowner cleanup *ret will not be updated with the
     170             :  * results of the IO operation.
     171             :  */
     172             : PgAioHandle *
     173       10432 : pgaio_io_acquire(struct ResourceOwnerData *resowner, PgAioReturn *ret)
     174             : {
     175             :     PgAioHandle *h;
     176             : 
     177             :     while (true)    /* retry until the non-blocking variant hands out a handle */
     178             :     {
     179       10432 :         h = pgaio_io_acquire_nb(resowner, ret);
     180             : 
     181       10428 :         if (h != NULL)
     182        5312 :             return h;
     183             : 
     184             :         /*
     185             :          * Evidently all of this backend's handles are in use. Just wait
     186             :          * for some to complete, then try again.
     187             :          */
     188        5116 :         pgaio_io_wait_for_free();
     189             :     }
     190             : }
     191             : 
     192             : /*
     193             :  * Acquire an AioHandle, returning NULL if no handles are free.
     194             :  *
     195             :  * See pgaio_io_acquire(). The only difference is that this function will return
     196             :  * NULL if there are no idle handles, instead of blocking.
     197             :  */
     198             : PgAioHandle *
     199     2480958 : pgaio_io_acquire_nb(struct ResourceOwnerData *resowner, PgAioReturn *ret)
     200             : {
     201     2480958 :     if (pgaio_my_backend->num_staged_ios >= PGAIO_SUBMIT_BATCH_SIZE)    /* staging array full: flush first */
     202             :     {
     203             :         Assert(pgaio_my_backend->num_staged_ios == PGAIO_SUBMIT_BATCH_SIZE);
     204           0 :         pgaio_submit_staged();
     205             :     }
     206             : 
     207     2480958 :     if (pgaio_my_backend->handed_out_io)    /* a previous handle was neither staged nor released */
     208           4 :         elog(ERROR, "API violation: Only one IO can be handed out");
     209             : 
     210     2480954 :     if (!dclist_is_empty(&pgaio_my_backend->idle_ios))
     211             :     {
     212     2470722 :         dlist_node *ion = dclist_pop_head_node(&pgaio_my_backend->idle_ios);
     213     2470722 :         PgAioHandle *ioh = dclist_container(PgAioHandle, node, ion);
     214             : 
     215             :         Assert(ioh->state == PGAIO_HS_IDLE);
     216             :         Assert(ioh->owner_procno == MyProcNumber);
     217             : 
     218     2470722 :         pgaio_io_update_state(ioh, PGAIO_HS_HANDED_OUT);
     219     2470722 :         pgaio_my_backend->handed_out_io = ioh;
     220             : 
     221     2470722 :         if (resowner)    /* NOTE(review): only tested for NULL; registration uses CurrentResourceOwner */
     222     2470722 :             pgaio_io_resowner_register(ioh);
     223             : 
     224     2470722 :         if (ret)    /* caller wants the IO's result reported into *ret */
     225             :         {
     226     2470670 :             ioh->report_return = ret;
     227     2470670 :             ret->result.status = PGAIO_RS_UNKNOWN;    /* result not known yet */
     228             :         }
     229             : 
     230     2470722 :         return ioh;
     231             :     }
     232             : 
     233       10232 :     return NULL;    /* no idle handle available */
     234             : }
     235             : 
     236             : /*
     237             :  * Release IO handle that turned out to not be required.
     238             :  *
     239             :  * See pgaio_io_acquire() for more details.
     240             :  */
     241             : void
     242        5016 : pgaio_io_release(PgAioHandle *ioh)
     243             : {
     244        5016 :     if (ioh == pgaio_my_backend->handed_out_io)    /* only the currently handed-out handle may be released */
     245             :     {
     246             :         Assert(ioh->state == PGAIO_HS_HANDED_OUT);
     247             :         Assert(ioh->resowner);    /* NOTE(review): pgaio_io_acquire_nb() accepts a NULL resowner - confirm this holds */
     248             : 
     249        5012 :         pgaio_my_backend->handed_out_io = NULL;    /* allow another handle to be acquired */
     250        5012 :         pgaio_io_reclaim(ioh);
     251             :     }
     252             :     else
     253             :     {
     254           4 :         elog(ERROR, "release in unexpected state");
     255             :     }
     256        5012 : }
     257             : 
     258             : /*
     259             :  * Release IO handle during resource owner cleanup.
     260             :  */
     261             : void
     262          86 : pgaio_io_release_resowner(dlist_node *ioh_node, bool on_error)
     263             : {
     264          86 :     PgAioHandle *ioh = dlist_container(PgAioHandle, resowner_node, ioh_node);    /* list node -> containing handle */
     265             : 
     266             :     Assert(ioh->resowner);
     267             : 
     268          86 :     ResourceOwnerForgetAioHandle(ioh->resowner, &ioh->resowner_node);    /* detach from the owner first */
     269          86 :     ioh->resowner = NULL;
     270             : 
     271          86 :     switch (ioh->state)
     272             :     {
     273           0 :         case PGAIO_HS_IDLE:    /* idle handles are not expected to be registered */
     274           0 :             elog(ERROR, "unexpected");
     275             :             break;
     276          66 :         case PGAIO_HS_HANDED_OUT:    /* acquired but never started */
     277             :             Assert(ioh == pgaio_my_backend->handed_out_io || pgaio_my_backend->handed_out_io == NULL);
     278             : 
     279          66 :             if (ioh == pgaio_my_backend->handed_out_io)
     280             :             {
     281          66 :                 pgaio_my_backend->handed_out_io = NULL;
     282          66 :                 if (!on_error)    /* outside error cleanup this is a leak worth reporting */
     283          20 :                     elog(WARNING, "leaked AIO handle");
     284             :             }
     285             : 
     286          66 :             pgaio_io_reclaim(ioh);
     287          66 :             break;
     288           0 :         case PGAIO_HS_DEFINED:    /* started but not yet submitted */
     289             :         case PGAIO_HS_STAGED:
     290           0 :             if (!on_error)
     291           0 :                 elog(WARNING, "AIO handle was not submitted");
     292           0 :             pgaio_submit_staged();
     293           0 :             break;
     294          20 :         case PGAIO_HS_SUBMITTED:
     295             :         case PGAIO_HS_COMPLETED_IO:
     296             :         case PGAIO_HS_COMPLETED_SHARED:
     297             :         case PGAIO_HS_COMPLETED_LOCAL:
     298             :             /* this is expected to happen; nothing to do for these states here */
     299          20 :             break;
     300             :     }
     301             : 
     302             :     /*
     303             :      * Need to unregister the reporting of the IO's result, the memory it's
     304             :      * referencing likely has gone away.
     305             :      */
     306          86 :     if (ioh->report_return)
     307          20 :         ioh->report_return = NULL;
     308          86 : }
     309             : 
     310             : /*
     311             :  * Add a [set of] flags to the IO.
     312             :  *
     313             :  * Note that this combines flags with already set flags, rather than
     314             :  * replacing the flags with the passed in value. This is to allow multiple
     315             :  * callsites to set flags.
     316             :  */
     317             : void
     318     4928446 : pgaio_io_set_flag(PgAioHandle *ioh, PgAioHandleFlags flag)
     319             : {
     320             :     Assert(ioh->state == PGAIO_HS_HANDED_OUT);    /* flags may only be set before the IO is staged */
     321             : 
     322     4928446 :     ioh->flags |= flag;    /* OR into the existing flags, never clears any */
     323     4928446 : }
     324             : 
     325             : /*
     326             :  * Returns an ID uniquely identifying the IO handle. This is only really
     327             :  * useful for logging, as handles are reused across multiple IOs.
     328             :  */
     329             : int
     330     1209862 : pgaio_io_get_id(PgAioHandle *ioh)
     331             : {
     332             :     Assert(ioh >= pgaio_ctl->io_handles &&
     333             :            ioh < (pgaio_ctl->io_handles + pgaio_ctl->io_handle_count));
     334     1209862 :     return ioh - pgaio_ctl->io_handles;    /* the ID is the handle's index in pgaio_ctl->io_handles */
     335             : }
     336             : 
     337             : /*
     338             :  * Return the ProcNumber for the process that can use an IO handle. The
     339             :  * mapping from IO handles to PGPROCs is static, therefore this even works
     340             :  * when the corresponding PGPROC is not in use.
     341             :  */
     342             : ProcNumber
     343           0 : pgaio_io_get_owner(PgAioHandle *ioh)
     344             : {
     345           0 :     return ioh->owner_procno;    /* plain field read, no state check */
     346             : }
     347             : 
     348             : /*
     349             :  * Return a wait reference for the IO. Only wait references can be used to
     350             :  * wait for an IO's completion, as handles themselves can be reused after
     351             :  * completion.  See also the comment above pgaio_io_acquire().
     352             :  */
     353             : void
     354     4931318 : pgaio_io_get_wref(PgAioHandle *ioh, PgAioWaitRef *iow)
     355             : {
     356             :     Assert(ioh->state == PGAIO_HS_HANDED_OUT ||
     357             :            ioh->state == PGAIO_HS_DEFINED ||
     358             :            ioh->state == PGAIO_HS_STAGED);
     359             :     Assert(ioh->generation != 0);
     360             : 
     361     4931318 :     iow->aio_index = ioh - pgaio_ctl->io_handles;    /* index into pgaio_ctl->io_handles */
     362     4931318 :     iow->generation_upper = (uint32) (ioh->generation >> 32);    /* bits 32 and up of the generation */
     363     4931318 :     iow->generation_lower = (uint32) ioh->generation;    /* low 32 bits of the generation */
     364     4931318 : }
     365             : 
     366             : 
     367             : 
     368             : /* --------------------------------------------------------------------------------
     369             :  * Internal Functions related to PgAioHandle
     370             :  * --------------------------------------------------------------------------------
     371             :  */
     372             : 
     373             : static inline void    /* set ioh->state to new_state, logging the transition at DEBUG5 */
     374    19314784 : pgaio_io_update_state(PgAioHandle *ioh, PgAioHandleState new_state)
     375             : {
     376    19314784 :     pgaio_debug_io(DEBUG5, ioh,
     377             :                    "updating state to %s",
     378             :                    pgaio_io_state_get_name(new_state));
     379             : 
     380             :     /*
     381             :      * Ensure the changes signified by the new state are visible before the
     382             :      * new state becomes visible.
     383             :      */
     384    19314784 :     pg_write_barrier();
     385             : 
     386    19314784 :     ioh->state = new_state;
     387    19314784 : }
     388             : 
     389             : static void    /* register ioh with CurrentResourceOwner and remember that owner in the handle */
     390     2470722 : pgaio_io_resowner_register(PgAioHandle *ioh)
     391             : {
     392             :     Assert(!ioh->resowner);
     393             :     Assert(CurrentResourceOwner);
     394             : 
     395     2470722 :     ResourceOwnerRememberAioHandle(CurrentResourceOwner, &ioh->resowner_node);    /* NOTE(review): not the resowner passed to pgaio_io_acquire_nb() */
     396     2470722 :     ioh->resowner = CurrentResourceOwner;
     397     2470722 : }
     398             : 
     399             : /*
     400             :  * Stage IO for execution and, if appropriate, submit it immediately.
     401             :  *
     402             :  * Should only be called from pgaio_io_start_*().
     403             :  */
     404             : void
     405     2465644 : pgaio_io_stage(PgAioHandle *ioh, PgAioOp op)
     406             : {
     407             :     bool        needs_synchronous;
     408             : 
     409             :     Assert(ioh->state == PGAIO_HS_HANDED_OUT);
     410             :     Assert(pgaio_my_backend->handed_out_io == ioh);
     411             :     Assert(pgaio_io_has_target(ioh));
     412             : 
     413     2465644 :     ioh->op = op;
     414     2465644 :     ioh->result = 0;
     415             : 
     416     2465644 :     pgaio_io_update_state(ioh, PGAIO_HS_DEFINED);
     417             : 
     418             :     /* the handle is no longer "handed out", so a new one may be acquired */
     419     2465644 :     pgaio_my_backend->handed_out_io = NULL;
     420             : 
     421     2465644 :     pgaio_io_call_stage(ioh);
     422             : 
     423     2465644 :     pgaio_io_update_state(ioh, PGAIO_HS_STAGED);
     424             : 
     425             :     /*
     426             :      * Synchronous execution has to be executed, well, synchronously, so check
     427             :      * that first.
     428             :      */
     429     2465644 :     needs_synchronous = pgaio_io_needs_synchronous_execution(ioh);
     430             : 
     431     2465644 :     pgaio_debug_io(DEBUG3, ioh,
     432             :                    "staged (synchronous: %d, in_batch: %d)",
     433             :                    needs_synchronous, pgaio_my_backend->in_batchmode);
     434             : 
     435     2465644 :     if (!needs_synchronous)
     436             :     {
     437     1152156 :         pgaio_my_backend->staged_ios[pgaio_my_backend->num_staged_ios++] = ioh;    /* queue for later submission */
     438             :         Assert(pgaio_my_backend->num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
     439             : 
     440             :         /*
     441             :          * Unless code explicitly opted into batching IOs, submit the IO
     442             :          * immediately.
     443             :          */
     444     1152156 :         if (!pgaio_my_backend->in_batchmode)
     445       49078 :             pgaio_submit_staged();
     446             :     }
     447             :     else
     448             :     {
     449     1313488 :         pgaio_io_prepare_submit(ioh);    /* bypasses the staged_ios array entirely */
     450     1313488 :         pgaio_io_perform_synchronously(ioh);
     451             :     }
     452     2465644 : }
     453             : 
     454             : bool    /* true if ioh has to be executed synchronously rather than via the IO method */
     455     2465644 : pgaio_io_needs_synchronous_execution(PgAioHandle *ioh)
     456             : {
     457             :     /*
     458             :      * If the caller said to execute the IO synchronously, do so.
     459             :      *
     460             :      * XXX: We could optimize the logic when to execute synchronously by first
     461             :      * checking if there are other IOs in flight and only synchronously
     462             :      * executing if not. Unclear whether that'll be sufficiently common to be
     463             :      * worth worrying about.
     464             :      */
     465     2465644 :     if (ioh->flags & PGAIO_HF_SYNCHRONOUS)
     466     1305444 :         return true;
     467             : 
     468             :     /* Check if the IO method requires synchronous execution of IO */
     469     1160200 :     if (pgaio_method_ops->needs_synchronous_execution)    /* callback is optional */
     470     1160200 :         return pgaio_method_ops->needs_synchronous_execution(ioh);
     471             : 
     472           0 :     return false;    /* method has no such callback: never forces synchronous execution */
     473             : }
     474             : 
     475             : /*
     476             :  * Handle IO being processed by IO method.
     477             :  *
     478             :  * Should be called by IO methods / synchronous IO execution, just before the
     479             :  * IO is performed.
     480             :  */
     481             : void
     482     2465644 : pgaio_io_prepare_submit(PgAioHandle *ioh)
     483             : {
     484     2465644 :     pgaio_io_update_state(ioh, PGAIO_HS_SUBMITTED);
     485             : 
     486     2465644 :     dclist_push_tail(&pgaio_my_backend->in_flight_ios, &ioh->node);    /* appended at the tail of this backend's in-flight list */
     487     2465644 : }
     488             : 
     489             : /*
     490             :  * Handle IO getting completed by a method.
     491             :  *
     492             :  * Should be called by IO methods / synchronous IO execution, just after the
     493             :  * IO has been performed.
     494             :  *
     495             :  * Expects to be called in a critical section. We expect IOs to be usable for
     496             :  * WAL etc, which requires being able to execute completion callbacks in a
     497             :  * critical section.
     498             :  */
     499             : void
     500     2255382 : pgaio_io_process_completion(PgAioHandle *ioh, int result)
     501             : {
     502             :     Assert(ioh->state == PGAIO_HS_SUBMITTED);
     503             : 
     504             :     Assert(CritSectionCount > 0);
     505             : 
     506     2255382 :     ioh->result = result;    /* stored before the state change that publishes it */
     507             : 
     508     2255382 :     pgaio_io_update_state(ioh, PGAIO_HS_COMPLETED_IO);
     509             : 
     510     2255382 :     pgaio_io_call_inj(ioh, "aio-process-completion-before-shared");
     511             : 
     512     2255382 :     pgaio_io_call_complete_shared(ioh);
     513             : 
     514     2255382 :     pgaio_io_update_state(ioh, PGAIO_HS_COMPLETED_SHARED);
     515             : 
     516             :     /* condition variable broadcast ensures state is visible before wakeup */
     517     2255382 :     ConditionVariableBroadcast(&ioh->cv);
     518             : 
     519             :     /* contains call to pgaio_io_call_complete_local() */
     520     2255382 :     if (ioh->owner_procno == MyProcNumber)    /* only the owning backend reclaims here */
     521     1313488 :         pgaio_io_reclaim(ioh);
     522     2255382 : }
     523             : 
     524             : /*
     525             :  * Has the IO completed and thus the IO handle been reused?
     526             :  *
     527             :  * This is useful when waiting for IO completion at a low level (e.g. in an IO
     528             :  * method's ->wait_one() callback).
     529             :  */
     530             : bool
     531     3593678 : pgaio_io_was_recycled(PgAioHandle *ioh, uint64 ref_generation, PgAioHandleState *state)
     532             : {
     533     3593678 :     *state = ioh->state;    /* out parameter: state as read before the generation check */
     534     3593678 :     pg_read_barrier();    /* order the state load before the generation load */
     535             : 
     536     3593678 :     return ioh->generation != ref_generation;    /* differing generation: handle now belongs to another IO */
     537             : }
     538             : 
     539             : /*
     540             :  * Wait for IO to complete. External code should never use this, outside of
     541             :  * the AIO subsystem waits are only allowed via pgaio_wref_wait().
     542             :  */
     543             : static void
     544      490082 : pgaio_io_wait(PgAioHandle *ioh, uint64 ref_generation)
     545             : {
     546             :     PgAioHandleState state;
     547             :     bool        am_owner;
     548             : 
     549      490082 :     am_owner = ioh->owner_procno == MyProcNumber;
     550             : 
     551      490082 :     if (pgaio_io_was_recycled(ioh, ref_generation, &state))    /* handle already reused: nothing to wait for */
     552          68 :         return;
     553             : 
     554      490014 :     if (am_owner)    /* an owner may only wait for its IO once it has been submitted */
     555             :     {
     556      485598 :         if (state != PGAIO_HS_SUBMITTED
     557      117596 :             && state != PGAIO_HS_COMPLETED_IO
     558         492 :             && state != PGAIO_HS_COMPLETED_SHARED
     559           0 :             && state != PGAIO_HS_COMPLETED_LOCAL)
     560             :         {
     561           0 :             elog(PANIC, "waiting for own IO in wrong state: %d",
     562             :                  state);
     563             :         }
     564             :     }
     565             : 
     566             :     while (true)    /* re-read the state each iteration until completed or recycled */
     567             :     {
     568      979284 :         if (pgaio_io_was_recycled(ioh, ref_generation, &state))
     569        2468 :             return;
     570             : 
     571      976816 :         switch (state)
     572             :         {
     573           0 :             case PGAIO_HS_IDLE:
     574             :             case PGAIO_HS_HANDED_OUT:
     575           0 :                 elog(ERROR, "IO in wrong state: %d", state);
     576             :                 break;
     577             : 
     578      370622 :             case PGAIO_HS_SUBMITTED:
     579             : 
     580             :                 /*
     581             :                  * If we need to wait via the IO method, do so now. Don't
     582             :                  * check via the IO method if the issuing backend is executing
     583             :                  * the IO synchronously.
     584             :                  */
     585      370622 :                 if (pgaio_method_ops->wait_one && !(ioh->flags & PGAIO_HF_SYNCHRONOUS))
     586             :                 {
     587           0 :                     pgaio_method_ops->wait_one(ioh, ref_generation);
     588           0 :                     continue;    /* back to the top of the loop to re-check the state */
     589             :                 }
     590             :                 /* fallthrough */
     591             : 
     592             :                 /* waiting for owner to submit */
     593             :             case PGAIO_HS_DEFINED:
     594             :             case PGAIO_HS_STAGED:
     595             :                 /* waiting for reaper to complete */
     596             :                 /* fallthrough */
     597             :             case PGAIO_HS_COMPLETED_IO:
     598             :                 /* shouldn't be able to hit this otherwise */
     599             :                 Assert(IsUnderPostmaster);
     600             :                 /* ensure we're going to get woken up */
     601      489270 :                 ConditionVariablePrepareToSleep(&ioh->cv);
     602             : 
     603      977322 :                 while (!pgaio_io_was_recycled(ioh, ref_generation, &state))    /* sleep on the handle's CV */
     604             :                 {
     605      974862 :                     if (state == PGAIO_HS_COMPLETED_SHARED ||
     606      488074 :                         state == PGAIO_HS_COMPLETED_LOCAL)
     607             :                         break;
     608      488052 :                     ConditionVariableSleep(&ioh->cv, WAIT_EVENT_AIO_IO_COMPLETION);
     609             :                 }
     610             : 
     611      489270 :                 ConditionVariableCancelSleep();
     612      489270 :                 break;
     613             : 
     614      487546 :             case PGAIO_HS_COMPLETED_SHARED:
     615             :             case PGAIO_HS_COMPLETED_LOCAL:
     616             :                 /* IO is done; only the owning backend reclaims the handle */
     617      487546 :                 if (am_owner)
     618      485598 :                     pgaio_io_reclaim(ioh);
     619      487546 :                 return;
     620             :         }
     621      489270 :     }
     622             : }
     623             : 
     624             : /*
     625             :  * Make IO handle ready to be reused after IO has completed or after the
     626             :  * handle has been released without being used.
                        *
                        * Must be called by the backend that owns the handle (asserted below). If
                        * the handle is still in PGAIO_HS_COMPLETED_SHARED, the local completion
                        * callbacks are run first and, if ioh->report_return is set, their result
                        * and the handle's target_data are copied there. The handle is then taken
                        * off the in-flight list (unless it was only handed out), detached from
                        * its resource owner, reset, given a new generation, switched to
                        * PGAIO_HS_IDLE and pushed onto the head of this backend's idle list.
     627             :  */
     628             : static void
     629     2470722 : pgaio_io_reclaim(PgAioHandle *ioh)
     630             : {
     631             :     /* This is only ok if it's our IO */
     632             :     Assert(ioh->owner_procno == MyProcNumber);
     633             :     Assert(ioh->state != PGAIO_HS_IDLE);
     634             : 
     635             :     /*
     636             :      * It's a bit ugly, but right now the easiest place to put the execution
     637             :      * of local completion callbacks is this function, as we need to execute
     638             :      * local callbacks just before reclaiming at multiple callsites.
     639             :      */
     640     2470722 :     if (ioh->state == PGAIO_HS_COMPLETED_SHARED)
     641             :     {
     642             :         PgAioResult local_result;
     643             : 
     644     2465644 :         local_result = pgaio_io_call_complete_local(ioh);
     645     2465644 :         pgaio_io_update_state(ioh, PGAIO_HS_COMPLETED_LOCAL);
     646             : 
                               /* publish the result only if a report location was registered */
     647     2465644 :         if (ioh->report_return)
     648             :         {
     649     2465624 :             ioh->report_return->result = local_result;
     650     2465624 :             ioh->report_return->target_data = ioh->target_data;
     651             :         }
     652             :     }
     653             : 
     654     2470722 :     pgaio_debug_io(DEBUG4, ioh,
     655             :                    "reclaiming: distilled_result: (status %s, id %u, error_data %d), raw_result: %d",
     656             :                    pgaio_result_status_string(ioh->distilled_result.status),
     657             :                    ioh->distilled_result.id,
     658             :                    ioh->distilled_result.error_data,
     659             :                    ioh->result);
     660             : 
     661             :     /* if the IO has been defined, it's on the in-flight list, remove */
     662     2470722 :     if (ioh->state != PGAIO_HS_HANDED_OUT)
     663     2465644 :         dclist_delete_from(&pgaio_my_backend->in_flight_ios, &ioh->node);
     664             : 
     665     2470722 :     if (ioh->resowner)
     666             :     {
     667     2470636 :         ResourceOwnerForgetAioHandle(ioh->resowner, &ioh->resowner_node);
     668     2470636 :         ioh->resowner = NULL;
     669             :     }
     670             : 
     671             :     Assert(!ioh->resowner);
     672             : 
                           /* wipe the per-IO fields so nothing carries over into the next use */
     673     2470722 :     ioh->op = PGAIO_OP_INVALID;
     674     2470722 :     ioh->target = PGAIO_TID_INVALID;
     675     2470722 :     ioh->flags = 0;
     676     2470722 :     ioh->num_callbacks = 0;
     677     2470722 :     ioh->handle_data_len = 0;
     678     2470722 :     ioh->report_return = NULL;
     679     2470722 :     ioh->result = 0;
     680     2470722 :     ioh->distilled_result.status = PGAIO_RS_UNKNOWN;
     681             : 
     682             :     /* XXX: the barrier is probably superfluous */
     683     2470722 :     pg_write_barrier();
                           /*
                            * NOTE(review): the new generation is presumably what lets
                            * pgaio_io_was_recycled() recognise references to the previous use as
                            * stale; its definition is outside this excerpt - confirm.
                            */
     684     2470722 :     ioh->generation++;
     685             : 
     686     2470722 :     pgaio_io_update_state(ioh, PGAIO_HS_IDLE);
     687             : 
     688             :     /*
     689             :      * We push the IO to the head of the idle IO list, that seems more cache
     690             :      * efficient in cases where only a few IOs are used.
     691             :      */
     692     2470722 :     dclist_push_head(&pgaio_my_backend->idle_ios, &ioh->node);
     693     2470722 : }
     694             : 
     695             : /*
     696             :  * Wait for an IO handle to become usable.
     697             :  *
     698             :  * This only really is useful for pgaio_io_acquire().
                        *
                        * Proceeds in three steps: (1) reclaim every handle of this backend that
                        * is already in PGAIO_HS_COMPLETED_SHARED and return if that freed any;
                        * (2) submit IOs that are still staged; (3) wait for, or reclaim, the
                        * handle at the head of this backend's in-flight list. Raises an ERROR if
                        * nothing is in flight or the head handle is in a state that must not be
                        * on that list, and PANICs if the idle list is still empty after step 3.
     699             :  */
     700             : static void
     701        5116 : pgaio_io_wait_for_free(void)
     702             : {
     703        5116 :     int         reclaimed = 0;
     704             : 
     705        5116 :     pgaio_debug(DEBUG2, "waiting for self with %d pending",
     706             :                 pgaio_my_backend->num_staged_ios);
     707             : 
     708             :     /*
     709             :      * First check if any of our IOs actually have completed - when using
     710             :      * worker, that'll often be the case. We could do so as part of the loop
     711             :      * below, but that'd potentially lead us to wait for some IO submitted
     712             :      * before.
     713             :      */
     714       10232 :     for (int i = 0; i < io_max_concurrency; i++)
     715             :     {
                               /* this backend's handles start at io_handle_off in the shared array */
     716        5116 :         PgAioHandle *ioh = &pgaio_ctl->io_handles[pgaio_my_backend->io_handle_off + i];
     717             : 
     718        5116 :         if (ioh->state == PGAIO_HS_COMPLETED_SHARED)
     719             :         {
     720        4332 :             pgaio_io_reclaim(ioh);
     721        4332 :             reclaimed++;
     722             :         }
     723             :     }
     724             : 
                           /* reclaiming put at least one handle on the idle list, nothing to wait for */
     725        5116 :     if (reclaimed > 0)
     726        4332 :         return;
     727             : 
     728             :     /*
     729             :      * If we have any unsubmitted IOs, submit them now. We'll start waiting in
     730             :      * a second, so it's better they're in flight. This also addresses the
     731             :      * edge-case that all IOs are unsubmitted.
     732             :      */
     733         784 :     if (pgaio_my_backend->num_staged_ios > 0)
     734           0 :         pgaio_submit_staged();
     735             : 
     736         784 :     if (dclist_count(&pgaio_my_backend->in_flight_ios) == 0)
     737           0 :         elog(ERROR, "no free IOs despite no in-flight IOs");
     738             : 
     739             :     /*
     740             :      * Wait for the oldest in-flight IO to complete.
     741             :      *
     742             :      * XXX: Reusing the general IO wait is suboptimal, we don't need to wait
     743             :      * for that specific IO to complete, we just need *any* IO to complete.
     744             :      */
     745             :     {
     746         784 :         PgAioHandle *ioh = dclist_head_element(PgAioHandle, node,
     747             :                                                &pgaio_my_backend->in_flight_ios);
     748             : 
     749         784 :         switch (ioh->state)
     750             :         {
     751             :                 /* should not be in in-flight list */
     752           0 :             case PGAIO_HS_IDLE:
     753             :             case PGAIO_HS_DEFINED:
     754             :             case PGAIO_HS_HANDED_OUT:
     755             :             case PGAIO_HS_STAGED:
     756             :             case PGAIO_HS_COMPLETED_LOCAL:
     757           0 :                 elog(ERROR, "shouldn't get here with io:%d in state %d",
     758             :                      pgaio_io_get_id(ioh), ioh->state);
     759             :                 break;
     760             : 
     761         778 :             case PGAIO_HS_COMPLETED_IO:
     762             :             case PGAIO_HS_SUBMITTED:
     763         778 :                 pgaio_debug_io(DEBUG2, ioh,
     764             :                                "waiting for free io with %d in flight",
     765             :                                dclist_count(&pgaio_my_backend->in_flight_ios));
     766             : 
     767             :                 /*
     768             :                  * In a more general case this would be racy, because the
     769             :                  * generation could increase after we read ioh->state above.
     770             :                  * But we are only looking at IOs by the current backend and
     771             :                  * the IO can only be recycled by this backend.
     772             :                  */
     773         778 :                 pgaio_io_wait(ioh, ioh->generation);
     774         778 :                 break;
     775             : 
     776           6 :             case PGAIO_HS_COMPLETED_SHARED:
     777             :                 /* it's possible that another backend just finished this IO */
     778           6 :                 pgaio_io_reclaim(ioh);
     779           6 :                 break;
     780             :         }
     781             : 
                               /* whichever branch ran, a handle is expected on the idle list now */
     782         784 :         if (dclist_count(&pgaio_my_backend->idle_ios) == 0)
     783           0 :             elog(PANIC, "no idle IO after waiting for IO to terminate");
     784         784 :         return;
     785             :     }
     786             : }
     787             : 
     788             : /*
     789             :  * Internal - code outside of AIO should never need this and it'd be hard for
     790             :  * such code to be safe.
                        *
                        * Resolve a wait reference: returns the handle at iow->aio_index in
                        * pgaio_ctl->io_handles and stores in *ref_generation the generation
                        * recorded in the reference (generation_upper shifted left by 32, OR'ed
                        * with generation_lower). No check is made here that the handle still
                        * belongs to that generation; callers in this file pass both values on
                        * to pgaio_io_wait() or pgaio_io_was_recycled().
     791             :  */
     792             : static PgAioHandle *
     793     1636276 : pgaio_io_from_wref(PgAioWaitRef *iow, uint64 *ref_generation)
     794             : {
     795             :     PgAioHandle *ioh;
     796             : 
     797             :     Assert(iow->aio_index < pgaio_ctl->io_handle_count);
     798             : 
     799     1636276 :     ioh = &pgaio_ctl->io_handles[iow->aio_index];
     800             : 
     801     1636276 :     *ref_generation = ((uint64) iow->generation_upper) << 32 |
     802     1636276 :         iow->generation_lower;
     803             : 
                           /* a reference is never expected to carry generation 0 */
     804             :     Assert(*ref_generation != 0);
     805             : 
     806     1636276 :     return ioh;
     807             : }
     808             : 
                       /*
                        * Return the name of handle state s as a string literal: the enum symbol
                        * with its PGAIO_HS_ prefix removed (e.g. "IDLE"). Returns NULL if s
                        * matches none of the states listed in the switch.
                        */
     809             : static const char *
     810       14270 : pgaio_io_state_get_name(PgAioHandleState s)
     811             : {
                       /* expands to "case PGAIO_HS_<sym>: return "<sym>"" */
     812             : #define PGAIO_HS_TOSTR_CASE(sym) case PGAIO_HS_##sym: return #sym
     813       14270 :     switch (s)
     814             :     {
     815           0 :             PGAIO_HS_TOSTR_CASE(IDLE);
     816        4756 :             PGAIO_HS_TOSTR_CASE(HANDED_OUT);
     817        2378 :             PGAIO_HS_TOSTR_CASE(DEFINED);
     818        2378 :             PGAIO_HS_TOSTR_CASE(STAGED);
     819           0 :             PGAIO_HS_TOSTR_CASE(SUBMITTED);
     820        2380 :             PGAIO_HS_TOSTR_CASE(COMPLETED_IO);
     821        2378 :             PGAIO_HS_TOSTR_CASE(COMPLETED_SHARED);
     822           0 :             PGAIO_HS_TOSTR_CASE(COMPLETED_LOCAL);
     823             :     }
     824             : #undef PGAIO_HS_TOSTR_CASE
     825             : 
     826           0 :     return NULL;                /* silence compiler */
     827             : }
     828             : 
                       /*
                        * Return the name of the state ioh is currently in, as produced by
                        * pgaio_io_state_get_name() for ioh->state.
                        */
     829             : const char *
     830       14270 : pgaio_io_get_state_name(PgAioHandle *ioh)
     831             : {
     832       14270 :     return pgaio_io_state_get_name(ioh->state);
     833             : }
     834             : 
                       /*
                        * Return a constant string naming result status rs: "UNKNOWN", "OK",
                        * "WARNING", "PARTIAL" or "ERROR". Returns NULL if rs is none of the
                        * values handled in the switch.
                        */
     835             : const char *
     836        4756 : pgaio_result_status_string(PgAioResultStatus rs)
     837             : {
     838        4756 :     switch (rs)
     839             :     {
     840           0 :         case PGAIO_RS_UNKNOWN:
     841           0 :             return "UNKNOWN";
     842        4396 :         case PGAIO_RS_OK:
     843        4396 :             return "OK";
     844         136 :         case PGAIO_RS_WARNING:
     845         136 :             return "WARNING";
     846          40 :         case PGAIO_RS_PARTIAL:
     847          40 :             return "PARTIAL";
     848         184 :         case PGAIO_RS_ERROR:
     849         184 :             return "ERROR";
     850             :     }
     851             : 
     852           0 :     return NULL;                /* silence compiler */
     853             : }
     854             : 
     855             : 
     856             : 
     857             : /* --------------------------------------------------------------------------------
     858             :  * Functions primarily related to IO Wait References
     859             :  * --------------------------------------------------------------------------------
     860             :  */
     861             : 
     862             : /*
     863             :  * Mark a wait reference as invalid
                        *
                        * This stores PG_UINT32_MAX in aio_index, the value pgaio_wref_valid()
                        * tests for. The generation fields of the reference are left untouched.
     864             :  */
     865             : void
     866    25718598 : pgaio_wref_clear(PgAioWaitRef *iow)
     867             : {
     868    25718598 :     iow->aio_index = PG_UINT32_MAX;
     869    25718598 : }
     870             : 
     871             : /* Is the wait reference valid? */
                       /*
                        * A reference counts as valid unless its aio_index holds PG_UINT32_MAX,
                        * the marker written by pgaio_wref_clear(). Nothing else is inspected.
                        */
     872             : bool
     873     5036166 : pgaio_wref_valid(PgAioWaitRef *iow)
     874             : {
     875     5036166 :     return iow->aio_index != PG_UINT32_MAX;
     876             : }
     877             : 
     878             : /*
     879             :  * Similar to pgaio_io_get_id(), just for wait references.
                        *
                        * Returns the reference's aio_index. The reference must be valid, which
                        * is only checked by an assertion.
     880             :  */
     881             : int
     882           0 : pgaio_wref_get_id(PgAioWaitRef *iow)
     883             : {
     884             :     Assert(pgaio_wref_valid(iow));
     885           0 :     return iow->aio_index;
     886             : }
     887             : 
     888             : /*
     889             :  * Wait for the IO to have completed. Can be called in any process, not just
     890             :  * in the issuing backend.
                        *
                        * The reference is resolved with pgaio_io_from_wref() and the resulting
                        * handle and generation are handed to pgaio_io_wait().
     891             :  */
     892             : void
     893      489286 : pgaio_wref_wait(PgAioWaitRef *iow)
     894             : {
     895             :     uint64      ref_generation;
     896             :     PgAioHandle *ioh;
     897             : 
     898      489286 :     ioh = pgaio_io_from_wref(iow, &ref_generation);
     899             : 
     900      489286 :     pgaio_io_wait(ioh, ref_generation);
     901      489286 : }
     902             : 
     903             : /*
     904             :  * Check if the referenced IO completed, without blocking.
                        *
                        * Returns true if the handle has been recycled since the reference was
                        * taken, is idle, or is in PGAIO_HS_COMPLETED_SHARED or
                        * PGAIO_HS_COMPLETED_LOCAL; returns false for every other state. As a
                        * side effect, a completed handle is reclaimed when the caller is the
                        * backend owning it.
     905             :  */
     906             : bool
     907     1146990 : pgaio_wref_check_done(PgAioWaitRef *iow)
     908             : {
     909             :     uint64      ref_generation;
     910             :     PgAioHandleState state;
     911             :     bool        am_owner;
     912             :     PgAioHandle *ioh;
     913             : 
     914     1146990 :     ioh = pgaio_io_from_wref(iow, &ref_generation);
     915             : 
                           /* also fills in state, which the checks below rely on */
     916     1146990 :     if (pgaio_io_was_recycled(ioh, ref_generation, &state))
     917           0 :         return true;
     918             : 
     919     1146990 :     if (state == PGAIO_HS_IDLE)
     920           0 :         return true;
     921             : 
     922     1146990 :     am_owner = ioh->owner_procno == MyProcNumber;
     923             : 
     924     1146990 :     if (state == PGAIO_HS_COMPLETED_SHARED ||
     925      484770 :         state == PGAIO_HS_COMPLETED_LOCAL)
     926             :     {
                               /* only the owning backend may reclaim, see pgaio_io_reclaim() */
     927      662220 :         if (am_owner)
     928      662220 :             pgaio_io_reclaim(ioh);
     929      662220 :         return true;
     930             :     }
     931             : 
     932             :     /*
     933             :      * XXX: It likely would be worth checking in with the io method, to give
     934             :      * the IO method a chance to check if there are completion events queued.
     935             :      */
     936             : 
     937      484770 :     return false;
     938             : }
     939             : 
     940             : 
     941             : 
     942             : /* --------------------------------------------------------------------------------
     943             :  * Actions on multiple IOs.
     944             :  * --------------------------------------------------------------------------------
     945             :  */
     946             : 
     947             : /*
     948             :  * Submit IOs in batches going forward.
     949             :  *
     950             :  * Submitting multiple IOs at once can be substantially faster than doing so
     951             :  * one-by-one. At the same time, submitting multiple IOs at once requires more
     952             :  * care to avoid deadlocks.
     953             :  *
     954             :  * Consider backend A staging an IO for buffer 1 and then trying to start IO
     955             :  * on buffer 2, while backend B does the inverse. If A submitted the IO before
     956             :  * moving on to buffer 2, this works just fine, B will wait for the IO to
     957             :  * complete. But if batching were used, each backend will wait for IO that has
     958             :  * not yet been submitted to complete, i.e. forever.
     959             :  *
     960             :  * End batch submission mode with pgaio_exit_batchmode().  (Throwing errors is
     961             :  * allowed; error recovery will end the batch.)
     962             :  *
     963             :  * To avoid deadlocks, code needs to ensure that it will not wait for another
     964             :  * backend while there is unsubmitted IO. E.g. by using conditional lock
     965             :  * acquisition when acquiring buffer locks. To check if there currently are
     966             :  * staged IOs, call pgaio_have_staged() and to submit all staged IOs call
     967             :  * pgaio_submit_staged().
     968             :  *
     969             :  * It is not allowed to enter batchmode while already in batchmode, it's
     970             :  * unlikely to ever be needed, as code needs to be explicitly aware of being
     971             :  * called in batchmode, to avoid the deadlock risks explained above.
     972             :  *
     973             :  * Note that IOs may get submitted before pgaio_exit_batchmode() is called,
     974             :  * e.g. because too many IOs have been staged or because pgaio_submit_staged()
     975             :  * was called.
                        *
                        * Raises an ERROR if a batch is already open. Otherwise the only action
                        * taken here is setting this backend's in_batchmode flag.
     976             :  */
     977             : void
     978     5413964 : pgaio_enter_batchmode(void)
     979             : {
     980     5413964 :     if (pgaio_my_backend->in_batchmode)
     981           0 :         elog(ERROR, "starting batch while batch already in progress");
     982     5413964 :     pgaio_my_backend->in_batchmode = true;
     983     5413964 : }
     984             : 
     985             : /*
     986             :  * Stop submitting IOs in batches.
                        *
                        * Being in batchmode is asserted. IOs that are still staged are submitted
                        * before the in_batchmode flag is cleared.
     987             :  */
     988             : void
     989     5413944 : pgaio_exit_batchmode(void)
     990             : {
     991             :     Assert(pgaio_my_backend->in_batchmode);
     992             : 
     993     5413944 :     pgaio_submit_staged();
     994     5413944 :     pgaio_my_backend->in_batchmode = false;
     995     5413944 : }
     996             : 
     997             : /*
     998             :  * Are there staged but unsubmitted IOs?
     999             :  *
    1000             :  * See comment above pgaio_enter_batchmode() for why code may need to check if
    1001             :  * there is IO in that state.
                        *
                        * Returns true if this backend's num_staged_ios is above zero. Outside of
                        * batchmode the count is asserted to be zero.
    1002             :  */
    1003             : bool
    1004     2470526 : pgaio_have_staged(void)
    1005             : {
    1006             :     Assert(pgaio_my_backend->in_batchmode ||
    1007             :            pgaio_my_backend->num_staged_ios == 0);
    1008     2470526 :     return pgaio_my_backend->num_staged_ios > 0;
    1009             : }
    1010             : 
    1011             : /*
    1012             :  * Submit all staged but not yet submitted IOs.
    1013             :  *
    1014             :  * Unless in batch mode, this never needs to be called, as IOs get submitted
    1015             :  * as soon as possible. While in batchmode pgaio_submit_staged() can be called
    1016             :  * before waiting on another backend, to avoid the risk of deadlocks. See
    1017             :  * pgaio_enter_batchmode().
                        *
                        * Does nothing when no IOs are staged. Otherwise all staged IOs are
                        * passed to the IO method's submit callback inside a critical section
                        * and the staged count is reset to zero.
    1018             :  */
    1019             : void
    1020    21967064 : pgaio_submit_staged(void)
    1021             : {
    1022    21967064 :     int         total_submitted = 0;
    1023             :     int         did_submit;
    1024             : 
    1025    21967064 :     if (pgaio_my_backend->num_staged_ios == 0)
    1026    20816054 :         return;
    1027             : 
    1028             : 
    1029     1151010 :     START_CRIT_SECTION();
    1030             : 
    1031     1151010 :     did_submit = pgaio_method_ops->submit(pgaio_my_backend->num_staged_ios,
    1032     1151010 :                                           pgaio_my_backend->staged_ios);
    1033             : 
    1034     1151010 :     END_CRIT_SECTION();
    1035             : 
    1036     1151010 :     total_submitted += did_submit;
    1037             : 
    1038             :     Assert(total_submitted == did_submit);
    1039             : 
                           /*
                            * The staged count is reset unconditionally; the value returned by the
                            * submit callback is not compared against it and only feeds the debug
                            * message below.
                            */
    1040     1151010 :     pgaio_my_backend->num_staged_ios = 0;
    1041             : 
    1042     1151010 :     pgaio_debug(DEBUG4,
    1043             :                 "aio: submitted %d IOs",
    1044             :                 total_submitted);
    1045             : }
    1046             : 
    1047             : 
    1048             : 
    1049             : /* --------------------------------------------------------------------------------
    1050             :  * Other
    1051             :  * --------------------------------------------------------------------------------
    1052             :  */
    1053             : 
    1054             : 
    1055             : /*
    1056             :  * Perform AIO related cleanup after an error.
    1057             :  *
    1058             :  * This should be called early in the error recovery paths, as later steps may
    1059             :  * need to issue AIO (e.g. to record a transaction abort WAL record).
                        *
                        * If a batch is open, batchmode is left and the staged IOs are submitted;
                        * otherwise nothing is changed. Either way no staged IOs are expected to
                        * remain afterwards (asserted).
    1060             :  */
    1061             : void
    1062       58292 : pgaio_error_cleanup(void)
    1063             : {
    1064             :     /*
    1065             :      * It is possible that code errored out after pgaio_enter_batchmode() but
    1066             :      * before pgaio_exit_batchmode() was called. In that case we need to
    1067             :      * submit the IO now.
    1068             :      */
    1069       58292 :     if (pgaio_my_backend->in_batchmode)
    1070             :     {
    1071          20 :         pgaio_my_backend->in_batchmode = false;
    1072             : 
    1073          20 :         pgaio_submit_staged();
    1074             :     }
    1075             : 
    1076             :     /*
    1077             :      * As we aren't in batchmode, there shouldn't be any unsubmitted IOs.
    1078             :      */
    1079             :     Assert(pgaio_my_backend->num_staged_ios == 0);
    1080       58292 : }
    1081             : 
    1082             : /*
    1083             :  * Perform AIO related checks at (sub-)transactional boundaries.
    1084             :  *
    1085             :  * This should be called late during (sub-)transactional commit/abort, after
    1086             :  * all steps that might need to perform AIO, so that we can verify that the
    1087             :  * AIO subsystem is in a valid state at the end of a transaction.
                        *
                        * is_commit is not consulted by the current implementation. If a batch
                        * is still open it is closed through pgaio_error_cleanup() and a WARNING
                        * is emitted.
    1088             :  */
    1089             : void
    1090      915656 : AtEOXact_Aio(bool is_commit)
    1091             : {
    1092             :     /*
    1093             :      * We should never be in batch mode at transactional boundaries. In case
    1094             :      * an error was thrown while in batch mode, pgaio_error_cleanup() should
    1095             :      * have exited batchmode.
    1096             :      *
    1097             :      * In case we are in batchmode somehow, make sure to submit all staged
    1098             :      * IOs, other backends may need them to complete to continue.
    1099             :      */
    1100      915656 :     if (pgaio_my_backend->in_batchmode)
    1101             :     {
    1102           8 :         pgaio_error_cleanup();
    1103           8 :         elog(WARNING, "open AIO batch at end of (sub-)transaction");
    1104             :     }
    1105             : 
    1106             :     /*
    1107             :      * As we aren't in batchmode, there shouldn't be any unsubmitted IOs.
    1108             :      */
    1109             :     Assert(pgaio_my_backend->num_staged_ios == 0);
    1110      915656 : }
    1111             : 
    1112             : /*
    1113             :  * Need to submit staged but not yet submitted IOs using the fd, otherwise
    1114             :  * the IO would end up targeting something bogus.
                        *
                        * fd is the file descriptor about to be closed. Returns immediately if
                        * this process has no AIO backend state. Otherwise all staged IOs are
                        * submitted and, if the IO method sets wait_on_fd_before_close, every
                        * in-flight IO of this backend that uses fd is waited for.
    1115             :  */
    1116             : void
    1117    16512764 : pgaio_closing_fd(int fd)
    1118             : {
    1119             :     /*
    1120             :      * Might be called before AIO is initialized or in a subprocess that
    1121             :      * doesn't use AIO.
    1122             :      */
    1123    16512764 :     if (!pgaio_my_backend)
    1124       13858 :         return;
    1125             : 
    1126             :     /*
    1127             :      * For now just submit all staged IOs - we could be more selective, but
    1128             :      * it's probably not worth it.
    1129             :      */
    1130    16498906 :     pgaio_submit_staged();
    1131             : 
    1132             :     /*
    1133             :      * If requested by the IO method, wait for all IOs that use the
    1134             :      * to-be-closed FD.
    1135             :      */
    1136    16498906 :     if (pgaio_method_ops->wait_on_fd_before_close)
    1137             :     {
    1138             :         /*
    1139             :          * As waiting for one IO to complete may complete multiple IOs, we
    1140             :          * can't just use a mutable list iterator. The maximum number of
    1141             :          * in-flight IOs is fairly small, so just restart the loop after
    1142             :          * waiting for an IO.
    1143             :          */
    1144           0 :         while (!dclist_is_empty(&pgaio_my_backend->in_flight_ios))
    1145             :         {
    1146             :             dlist_iter  iter;
    1147           0 :             PgAioHandle *ioh = NULL;
    1148             : 
                                   /* find the first in-flight IO using fd; ioh stays NULL if none */
    1149           0 :             dclist_foreach(iter, &pgaio_my_backend->in_flight_ios)
    1150             :             {
    1151           0 :                 ioh = dclist_container(PgAioHandle, node, iter.cur);
    1152             : 
    1153           0 :                 if (pgaio_io_uses_fd(ioh, fd))
    1154           0 :                     break;
    1155             :                 else
    1156           0 :                     ioh = NULL;
    1157             :             }
    1158             : 
                                   /* no remaining in-flight IO refers to fd, done */
    1159           0 :             if (!ioh)
    1160           0 :                 break;
    1161             : 
    1162             :             /* see comment in pgaio_io_wait_for_free() about raciness */
    1163           0 :             pgaio_io_wait(ioh, ioh->generation);
    1164             :         }
    1165             :     }
    1166             : }
    1167             : 
    1168             : /*
    1169             :  * Registered as before_shmem_exit() callback in pgaio_init_backend()
                        *
                        * code is the exit code passed to the callback; code == 0 is forwarded to
                        * AtEOXact_Aio() as its is_commit argument. arg is unused. After all of
                        * this backend's in-flight IOs have been waited for, pgaio_my_backend is
                        * set to NULL.
    1170             :  */
    1171             : void
    1172       40082 : pgaio_shutdown(int code, Datum arg)
    1173             : {
    1174             :     Assert(pgaio_my_backend);
    1175             :     Assert(!pgaio_my_backend->handed_out_io);
    1176             : 
    1177             :     /* first clean up resources as we would at a transaction boundary */
    1178       40082 :     AtEOXact_Aio(code == 0);
    1179             : 
    1180             :     /*
    1181             :      * Before exiting, make sure that all IOs are finished. That has two main
    1182             :      * purposes:
    1183             :      *
    1184             :      * - Some kernel-level AIO mechanisms don't deal well with the issuer of
    1185             :      * an AIO exiting before IO completed
    1186             :      *
    1187             :      * - It'd be confusing to see partially finished IOs in stats views etc
    1188             :      */
    1189       40100 :     while (!dclist_is_empty(&pgaio_my_backend->in_flight_ios))
    1190             :     {
    1191          18 :         PgAioHandle *ioh = dclist_head_element(PgAioHandle, node, &pgaio_my_backend->in_flight_ios);
    1192             : 
    1193             :         /* see comment in pgaio_io_wait_for_free() about raciness */
    1194          18 :         pgaio_io_wait(ioh, ioh->generation);
    1195             :     }
    1196             : 
    1197       40082 :     pgaio_my_backend = NULL;
    1198       40082 : }
    1199             : 
                       /*
                        * Select the active IO method: makes pgaio_method_ops point at entry
                        * newval of pgaio_method_ops_table. extra is unused.
                        *
                        * NOTE(review): the signature suggests this is the assign hook of the
                        * io_method setting; the registration is outside this excerpt - confirm.
                        */
    1200             : void
    1201        2190 : assign_io_method(int newval, void *extra)
    1202             : {
                           /* check the index before it is used to subscript the table */
    1203             :     Assert(newval < lengthof(io_method_options));
    1204             :     Assert(pgaio_method_ops_table[newval] != NULL);
    1205             : 
    1206        2190 :     pgaio_method_ops = pgaio_method_ops_table[newval];
    1207        2190 : }
    1208             : 
                       /*
                        * Validate a proposed io_max_concurrency value. -1 is accepted (it
                        * requests auto-tuning), 0 is rejected with an error detail, and every
                        * other value is accepted. extra and source are unused.
                        *
                        * NOTE(review): negative values other than -1 are not rejected here;
                        * presumably the setting's declared range already excludes them -
                        * confirm against its definition.
                        */
    1209             : bool
    1210        4264 : check_io_max_concurrency(int *newval, void **extra, GucSource source)
    1211             : {
    1212        4264 :     if (*newval == -1)
    1213             :     {
    1214             :         /*
    1215             :          * Auto-tuning will be applied later during startup, as auto-tuning
    1216             :          * depends on the value of various GUCs.
    1217             :          */
    1218        2168 :         return true;
    1219             :     }
    1220        2096 :     else if (*newval == 0)
    1221             :     {
    1222           0 :         GUC_check_errdetail("Only -1 or values bigger than 0 are valid.");
    1223           0 :         return false;
    1224             :     }
    1225             : 
    1226        2096 :     return true;
    1227             : }
    1228             : 
    1229             : 
    1230             : 
    1231             : /* --------------------------------------------------------------------------------
    1232             :  * Injection point support
    1233             :  * --------------------------------------------------------------------------------
    1234             :  */
    1235             : 
    1236             : #ifdef USE_INJECTION_POINTS
    1237             : 
    1238             : /*
    1239             :  * Call injection point with support for pgaio_inj_io_get().
                        *
                        * ioh is stored in pgaio_inj_cur_handle for the duration of the
                        * InjectionPointCached(injection_point) call, which is what
                        * pgaio_inj_io_get() returns.  The variable is reset to NULL in the
                        * PG_FINALLY block, so the handle does not stay published after the call,
                        * including when the injection point raises an error.
    1240             :  */
    1241             : void
    1242     3197276 : pgaio_io_call_inj(PgAioHandle *ioh, const char *injection_point)
    1243             : {
                           /* publish the handle so pgaio_inj_io_get() can return it */
    1244     3197276 :     pgaio_inj_cur_handle = ioh;
    1245             : 
    1246     3197276 :     PG_TRY();
    1247             :     {
    1248     3197276 :         InjectionPointCached(injection_point);
    1249             :     }
    1250           2 :     PG_FINALLY();
    1251             :     {
                               /* executed on both the normal and the error path */
    1252     3197276 :         pgaio_inj_cur_handle = NULL;
    1253             :     }
    1254     3197276 :     PG_END_TRY();
    1255     3197274 : }
    1256             : 
    1257             : /*
    1258             :  * Return IO associated with injection point invocation. This is only needed
    1259             :  * as injection points currently don't support arguments.
                        *
                        * The value returned is the current content of pgaio_inj_cur_handle, which
                        * pgaio_io_call_inj() sets to the IO handle before invoking the injection
                        * point and resets to NULL afterwards.
    1260             :  */
    1261             : PgAioHandle *
    1262          96 : pgaio_inj_io_get(void)
    1263             : {
    1264          96 :     return pgaio_inj_cur_handle;
    1265             : }
    1266             : 
    1267             : #endif

Generated by: LCOV version 1.14