LCOV - code coverage report
Current view: top level - src/backend/storage/aio - aio_callback.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 61 76 80.3 %
Date: 2025-04-01 15:15:16 Functions: 6 8 75.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * aio_callback.c
       4             :  *    AIO - Functionality related to callbacks that can be registered on IO
       5             :  *    Handles
       6             :  *
       7             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       8             :  * Portions Copyright (c) 1994, Regents of the University of California
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *    src/backend/storage/aio/aio_callback.c
      12             :  *
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : 
      16             : #include "postgres.h"
      17             : 
      18             : #include "miscadmin.h"
      19             : #include "storage/aio.h"
      20             : #include "storage/aio_internal.h"
      21             : #include "storage/bufmgr.h"
      22             : #include "storage/md.h"
      23             : 
      24             : 
      25             : /* all-zero placeholder, so the PGAIO_HCB_INVALID slot of aio_handle_cbs has something to point to */
      26             : static const PgAioHandleCallbacks aio_invalid_cb = {0};
      27             : 
      28             : typedef struct PgAioHandleCallbacksEntry  /* element type of aio_handle_cbs */
      29             : {
      30             :     const PgAioHandleCallbacks *const cb;   /* the set of callbacks */
      31             :     const char *const name;     /* callback's name, printed in debug and error messages */
      32             : } PgAioHandleCallbacksEntry;
      33             : 
      34             : /*
      35             :  * Table of the callback sets that can be registered on an IO handle, indexed
      36             :  * by callback ID; every entry also carries the callback's name as a string.
      37             :  * See PgAioHandleCallbackID's definition for why callbacks are not identified by a pointer.
      38             :  */
      39             : static const PgAioHandleCallbacksEntry aio_handle_cbs[] = {
      40             : #define CALLBACK_ENTRY(id, callback)  [id] = {.cb = &callback, .name = #callback}
      41             :     CALLBACK_ENTRY(PGAIO_HCB_INVALID, aio_invalid_cb),
      42             : 
      43             :     CALLBACK_ENTRY(PGAIO_HCB_MD_READV, aio_md_readv_cb),
      44             : 
      45             :     CALLBACK_ENTRY(PGAIO_HCB_SHARED_BUFFER_READV, aio_shared_buffer_readv_cb),
      46             : 
      47             :     CALLBACK_ENTRY(PGAIO_HCB_LOCAL_BUFFER_READV, aio_local_buffer_readv_cb),
      48             : #undef CALLBACK_ENTRY
      49             : };
      50             : 
      51             : 
      52             : 
      53             : /* --------------------------------------------------------------------------------
      54             :  * Public callback related functions operating on IO Handles
      55             :  * --------------------------------------------------------------------------------
      56             :  */
      57             : 
      58             : /*
      59             :  * Register callback for the IO handle.
      60             :  *
      61             :  * Only a limited number (PGAIO_HANDLE_MAX_CALLBACKS) of callbacks can be
      62             :  * registered for each IO.
      63             :  *
      64             :  * Callbacks need to be registered before [indirectly] calling
      65             :  * pgaio_io_start_*(), as the IO may be executed immediately.
      66             :  *
      67             :  * A callback can be passed a small bit of data, e.g. to indicate whether to
      68             :  * zero a buffer if it is invalid.
      69             :  * cb_data is stored next to cb_id on the handle and passed back to the callback's functions.
      70             :  *
      71             :  * Note that callbacks are executed in critical sections.  This is necessary
      72             :  * to be able to execute IO in critical sections (consider e.g. WAL
      73             :  * logging). To perform AIO we first need to acquire a handle, which, if there
      74             :  * are no free handles, requires waiting for IOs to complete and to execute
      75             :  * their completion callbacks.
      76             :  *
      77             :  * Callbacks may be executed in the issuing backend but also in another
      78             :  * backend (because that backend is waiting for the IO) or in IO workers (if
      79             :  * io_method=worker is used).
      80             :  *
      81             :  *
      82             :  * See PgAioHandleCallbackID's definition for an explanation for why
      83             :  * callbacks are not identified by a pointer.
      84             :  */
      85             : void
      86     4846622 : pgaio_io_register_callbacks(PgAioHandle *ioh, PgAioHandleCallbackID cb_id,
      87             :                             uint8 cb_data)
      88             : {
      89     4846622 :     const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
      90             :     /* NOTE(review): ce is formed from cb_id before the range check below; it is only dereferenced for the debug message */
      91             :     Assert(cb_id <= PGAIO_HCB_MAX);
      92     4846622 :     if (cb_id >= lengthof(aio_handle_cbs))
      93           0 :         elog(ERROR, "callback %d is out of range", cb_id);
      94     4846622 :     if (aio_handle_cbs[cb_id].cb->complete_shared == NULL &&
      95        3492 :         aio_handle_cbs[cb_id].cb->complete_local == NULL)
      96           0 :         elog(ERROR, "callback %d does not have a completion callback", cb_id);
      97     4846622 :     if (ioh->num_callbacks >= PGAIO_HANDLE_MAX_CALLBACKS)
      98           0 :         elog(PANIC, "too many callbacks, the max is %d",
      99             :              PGAIO_HANDLE_MAX_CALLBACKS);
     100     4846622 :     ioh->callbacks[ioh->num_callbacks] = cb_id;
     101     4846622 :     ioh->callbacks_data[ioh->num_callbacks] = cb_data;
     102             :     /* the debug message numbers callbacks from 1; num_callbacks is only incremented afterwards */
     103     4846622 :     pgaio_debug_io(DEBUG3, ioh,
     104             :                    "adding cb #%d, id %d/%s",
     105             :                    ioh->num_callbacks + 1,
     106             :                    cb_id, ce->name);
     107             : 
     108     4846622 :     ioh->num_callbacks++;
     109     4846622 : }
     110             : 
     111             : /*
     112             :  * Associate an array of data with the Handle. This is e.g. useful to
     113             :  * transport knowledge about which buffers a multi-block IO affects to
     114             :  * completion callbacks.
     115             :  *
     116             :  * Right now this can be done only once for each IO, even though multiple
     117             :  * callbacks can be registered. There aren't any known usecases requiring more
     118             :  * and the required amount of shared memory does add up, so it doesn't seem
     119             :  * worth multiplying memory usage by PGAIO_HANDLE_MAX_CALLBACKS.
     120             :  */
     121             : void
     122           0 : pgaio_io_set_handle_data_64(PgAioHandle *ioh, uint64 *data, uint8 len)
     123             : {
     124             :     Assert(ioh->state == PGAIO_HS_HANDED_OUT);
     125             :     Assert(ioh->handle_data_len == 0);
     126             :     Assert(len <= PG_IOV_MAX);
     127             :     /* preconditions above are Assert-only; copy data[0..len) to pgaio_ctl->handle_data starting at ioh->iovec_off */
     128           0 :     for (int i = 0; i < len; i++)
     129           0 :         pgaio_ctl->handle_data[ioh->iovec_off + i] = data[i];
     130           0 :     ioh->handle_data_len = len;
     131           0 : }
     132             : 
     133             : /*
     134             :  * Convenience version of pgaio_io_set_handle_data_64() that converts a 32bit
     135             :  * array to a 64bit array. Without it callers would end up needing to
     136             :  * open-code equivalent code.
     137             :  */
     138             : void
     139     2423326 : pgaio_io_set_handle_data_32(PgAioHandle *ioh, uint32 *data, uint8 len)
     140             : {
     141             :     Assert(ioh->state == PGAIO_HS_HANDED_OUT);
     142             :     Assert(ioh->handle_data_len == 0);
     143             :     Assert(len <= PG_IOV_MAX);
     144             :     /* each uint32 element is widened by the assignment into the uint64 handle_data array */
     145     5168484 :     for (int i = 0; i < len; i++)
     146     2745158 :         pgaio_ctl->handle_data[ioh->iovec_off + i] = data[i];
     147     2423326 :     ioh->handle_data_len = len;
     148     2423326 : }
     149             : 
     150             : /*
     151             :  * Return data set with pgaio_io_set_handle_data_*(); *len receives the number of elements.
     152             :  */
     153             : uint64 *
     154     4637008 : pgaio_io_get_handle_data(PgAioHandle *ioh, uint8 *len)
     155             : {
     156             :     Assert(ioh->handle_data_len > 0);
     157             : 
     158     4637008 :     *len = ioh->handle_data_len;
     159             :     /* the returned pointer refers into pgaio_ctl->handle_data itself, it is not a copy */
     160     4637008 :     return &pgaio_ctl->handle_data[ioh->iovec_off];
     161             : }
     162             : 
     163             : 
     164             : 
     165             : /* --------------------------------------------------------------------------------
     166             :  * Public IO Result related functions
     167             :  * --------------------------------------------------------------------------------
     168             :  */
     169             : /* Hand result to the ->report function of the callback identified by result.id, at elevel; ERROR if it has none. */
     170             : void
     171           0 : pgaio_result_report(PgAioResult result, const PgAioTargetData *target_data, int elevel)
     172             : {
     173           0 :     PgAioHandleCallbackID cb_id = result.id;
     174           0 :     const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
     175             :     /* NOTE(review): result.id is used as a table index without a range check here -- confirm callers guarantee it is valid */
     176             :     Assert(result.status != PGAIO_RS_UNKNOWN);
     177             :     Assert(result.status != PGAIO_RS_OK);
     178             : 
     179           0 :     if (ce->cb->report == NULL)
     180           0 :         elog(ERROR, "callback %d/%s does not have report callback",
     181             :              result.id, ce->name);
     182             : 
     183           0 :     ce->cb->report(result, target_data, elevel);
     184           0 : }
     185             : 
     186             : 
     187             : 
     188             : /* --------------------------------------------------------------------------------
     189             :  * Internal callback related functions operating on IO Handles
     190             :  * --------------------------------------------------------------------------------
     191             :  */
     192             : 
     193             : /*
     194             :  * Internal function which invokes ->stage for all the registered callbacks.
     195             :  */
     196             : void
     197     2423296 : pgaio_io_call_stage(PgAioHandle *ioh)
     198             : {
     199             :     Assert(ioh->target > PGAIO_TID_INVALID && ioh->target < PGAIO_TID_COUNT);
     200             :     Assert(ioh->op > PGAIO_OP_INVALID && ioh->op < PGAIO_OP_COUNT);
     201             :     /* walk the callbacks last-registered first; entries without a ->stage function are skipped */
     202     7269888 :     for (int i = ioh->num_callbacks; i > 0; i--)
     203             :     {
     204     4846592 :         PgAioHandleCallbackID cb_id = ioh->callbacks[i - 1];
     205     4846592 :         uint8       cb_data = ioh->callbacks_data[i - 1];
     206     4846592 :         const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
     207             : 
     208     4846592 :         if (!ce->cb->stage)
     209     2423296 :             continue;
     210             : 
     211     2423296 :         pgaio_debug_io(DEBUG3, ioh,
     212             :                        "calling cb #%d %d/%s->stage(%u)",
     213             :                        i, cb_id, ce->name, cb_data);
     214     2423296 :         ce->cb->stage(ioh, cb_data);
     215             :     }
     216     2423296 : }
     217             : 
     218             : /*
     219             :  * Internal function which invokes ->complete_shared for all the registered
     220             :  * callbacks and stores the resulting result in ioh->distilled_result.
     221             :  */
     222             : void
     223     2213712 : pgaio_io_call_complete_shared(PgAioHandle *ioh)
     224             : {
     225             :     PgAioResult result;
     226             : 
     227     2213712 :     START_CRIT_SECTION();
     228             : 
     229             :     Assert(ioh->target > PGAIO_TID_INVALID && ioh->target < PGAIO_TID_COUNT);
     230             :     Assert(ioh->op > PGAIO_OP_INVALID && ioh->op < PGAIO_OP_COUNT);
     231             :     /* starting point handed to the first callback: the raw IO result, no callback ID, no error data */
     232     2213712 :     result.status = PGAIO_RS_OK;    /* low level IO is always considered OK */
     233     2213712 :     result.result = ioh->result;
     234     2213712 :     result.id = PGAIO_HCB_INVALID;
     235     2213712 :     result.error_data = 0;
     236             : 
     237             :     /*
     238             :      * Call callbacks with the last registered (innermost) callback first.
     239             :      * Each callback can modify the result forwarded to the next callback.
     240             :      */
     241     6641136 :     for (int i = ioh->num_callbacks; i > 0; i--)
     242             :     {
     243     4427424 :         PgAioHandleCallbackID cb_id = ioh->callbacks[i - 1];
     244     4427424 :         uint8       cb_data = ioh->callbacks_data[i - 1];
     245     4427424 :         const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
     246             : 
     247     4427424 :         if (!ce->cb->complete_shared)
     248        3492 :             continue;
     249             : 
     250     4423932 :         pgaio_debug_io(DEBUG4, ioh,
     251             :                        "calling cb #%d, id %d/%s->complete_shared(%u) with distilled result: (status %s, id %u, error_data %d, result %d)",
     252             :                        i, cb_id, ce->name,
     253             :                        cb_data,
     254             :                        pgaio_result_status_string(result.status),
     255             :                        result.id, result.error_data, result.result);
     256     4423932 :         result = ce->cb->complete_shared(ioh, result, cb_data);
     257             :     }
     258             :     /* the result as it stands after the loop becomes the handle's distilled result */
     259     2213712 :     ioh->distilled_result = result;
     260             : 
     261     2213712 :     pgaio_debug_io(DEBUG3, ioh,
     262             :                    "after shared completion: distilled result: (status %s, id %u, error_data: %d, result %d), raw_result: %d",
     263             :                    pgaio_result_status_string(result.status),
     264             :                    result.id, result.error_data, result.result,
     265             :                    ioh->result);
     266             : 
     267     2213712 :     END_CRIT_SECTION();
     268     2213712 : }
     269             : 
     270             : /*
     271             :  * Internal function which invokes ->complete_local for all the registered
     272             :  * callbacks.
     273             :  *
     274             :  * Returns ioh->distilled_result after, possibly, being modified by local
     275             :  * callbacks.
     276             :  *
     277             :  * XXX: It'd be nice to deduplicate with pgaio_io_call_complete_shared().
     278             :  */
     279             : PgAioResult
     280     2423296 : pgaio_io_call_complete_local(PgAioHandle *ioh)
     281             : {
     282             :     PgAioResult result;
     283             : 
     284     2423296 :     START_CRIT_SECTION();
     285             : 
     286             :     Assert(ioh->target > PGAIO_TID_INVALID && ioh->target < PGAIO_TID_COUNT);
     287             :     Assert(ioh->op > PGAIO_OP_INVALID && ioh->op < PGAIO_OP_COUNT);
     288             : 
     289             :     /* start with distilled result from shared callback */
     290     2423296 :     result = ioh->distilled_result;
     291             :     /* walk the callbacks last-registered first; entries without a ->complete_local function are skipped */
     292     7269888 :     for (int i = ioh->num_callbacks; i > 0; i--)
     293             :     {
     294     4846592 :         PgAioHandleCallbackID cb_id = ioh->callbacks[i - 1];
     295     4846592 :         uint8       cb_data = ioh->callbacks_data[i - 1];
     296     4846592 :         const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
     297             : 
     298     4846592 :         if (!ce->cb->complete_local)
     299     2423296 :             continue;
     300             : 
     301     2423296 :         pgaio_debug_io(DEBUG4, ioh,
     302             :                        "calling cb #%d, id %d/%s->complete_local(%u) with distilled result: status %s, id %u, error_data %d, result %d",
     303             :                        i, cb_id, ce->name, cb_data,
     304             :                        pgaio_result_status_string(result.status),
     305             :                        result.id, result.error_data, result.result);
     306     2423296 :         result = ce->cb->complete_local(ioh, result, cb_data);
     307             :     }
     308             : 
     309             :     /*
     310             :      * Note that we don't save the result in ioh->distilled_result, the local
     311             :      * callback's result should not ever matter to other waiters. However, the
     312             :      * local backend does care, so we return the result as modified by local
     313             :      * callbacks, which then can be passed to ioh->report_return->result.
     314             :      */
     315     2423296 :     pgaio_debug_io(DEBUG3, ioh,
     316             :                    "after local completion: result: (status %s, id %u, error_data %d, result %d), raw_result: %d",
     317             :                    pgaio_result_status_string(result.status),
     318             :                    result.id, result.error_data, result.result,
     319             :                    ioh->result);
     320             : 
     321     2423296 :     END_CRIT_SECTION();
     322             : 
     323     2423296 :     return result;
     324             : }

Generated by: LCOV version 1.14