Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * aio_callback.c
4 : * AIO - Functionality related to callbacks that can be registered on IO
5 : * Handles
6 : *
7 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994, Regents of the University of California
9 : *
10 : * IDENTIFICATION
11 : * src/backend/storage/aio/aio_callback.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : #include "postgres.h"
17 :
18 : #include "miscadmin.h"
19 : #include "storage/aio.h"
20 : #include "storage/aio_internal.h"
21 : #include "storage/bufmgr.h"
22 : #include "storage/md.h"
23 :
24 :
25 : /* just to have something to put into aio_handle_cbs */
26 : static const PgAioHandleCallbacks aio_invalid_cb = {0};
27 :
28 : typedef struct PgAioHandleCallbacksEntry
29 : {
30 : const PgAioHandleCallbacks *const cb;
31 : const char *const name;
32 : } PgAioHandleCallbacksEntry;
33 :
34 : /*
35 : * Callback definition for the callbacks that can be registered on an IO
36 : * handle. See PgAioHandleCallbackID's definition for an explanation for why
37 : * callbacks are not identified by a pointer.
38 : */
39 : static const PgAioHandleCallbacksEntry aio_handle_cbs[] = {
40 : #define CALLBACK_ENTRY(id, callback) [id] = {.cb = &callback, .name = #callback}
41 : CALLBACK_ENTRY(PGAIO_HCB_INVALID, aio_invalid_cb),
42 :
43 : CALLBACK_ENTRY(PGAIO_HCB_MD_READV, aio_md_readv_cb),
44 :
45 : CALLBACK_ENTRY(PGAIO_HCB_SHARED_BUFFER_READV, aio_shared_buffer_readv_cb),
46 :
47 : CALLBACK_ENTRY(PGAIO_HCB_LOCAL_BUFFER_READV, aio_local_buffer_readv_cb),
48 : #undef CALLBACK_ENTRY
49 : };
50 :
51 :
52 :
53 : /* --------------------------------------------------------------------------------
54 : * Public callback related functions operating on IO Handles
55 : * --------------------------------------------------------------------------------
56 : */
57 :
58 : /*
59 : * Register callback for the IO handle.
60 : *
61 : * Only a limited number (PGAIO_HANDLE_MAX_CALLBACKS) of callbacks can be
62 : * registered for each IO.
63 : *
64 : * Callbacks need to be registered before [indirectly] calling
65 : * pgaio_io_start_*(), as the IO may be executed immediately.
66 : *
67 : * A callback can be passed a small bit of data, e.g. to indicate whether to
68 : * zero a buffer if it is invalid.
69 : *
70 : *
71 : * Note that callbacks are executed in critical sections. This is necessary
72 : * to be able to execute IO in critical sections (consider e.g. WAL
73 : * logging). To perform AIO we first need to acquire a handle, which, if there
74 : * are no free handles, requires waiting for IOs to complete and to execute
75 : * their completion callbacks.
76 : *
77 : * Callbacks may be executed in the issuing backend but also in another
78 : * backend (because that backend is waiting for the IO) or in IO workers (if
79 : * io_method=worker is used).
80 : *
81 : *
82 : * See PgAioHandleCallbackID's definition for an explanation for why
83 : * callbacks are not identified by a pointer.
84 : */
85 : void
86 4793570 : pgaio_io_register_callbacks(PgAioHandle *ioh, PgAioHandleCallbackID cb_id,
87 : uint8 cb_data)
88 : {
89 4793570 : const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
90 :
91 : Assert(cb_id <= PGAIO_HCB_MAX);
92 4793570 : if (cb_id >= lengthof(aio_handle_cbs))
93 0 : elog(ERROR, "callback %d is out of range", cb_id);
94 4793570 : if (aio_handle_cbs[cb_id].cb->complete_shared == NULL &&
95 3582 : aio_handle_cbs[cb_id].cb->complete_local == NULL)
96 0 : elog(ERROR, "callback %d does not have a completion callback", cb_id);
97 4793570 : if (ioh->num_callbacks >= PGAIO_HANDLE_MAX_CALLBACKS)
98 0 : elog(PANIC, "too many callbacks, the max is %d",
99 : PGAIO_HANDLE_MAX_CALLBACKS);
100 4793570 : ioh->callbacks[ioh->num_callbacks] = cb_id;
101 4793570 : ioh->callbacks_data[ioh->num_callbacks] = cb_data;
102 :
103 4793570 : pgaio_debug_io(DEBUG3, ioh,
104 : "adding cb #%d, id %d/%s",
105 : ioh->num_callbacks + 1,
106 : cb_id, ce->name);
107 :
108 4793570 : ioh->num_callbacks++;
109 4793570 : }
110 :
111 : /*
112 : * Associate an array of data with the Handle. This is e.g. useful to the
113 : * transport knowledge about which buffers a multi-block IO affects to
114 : * completion callbacks.
115 : *
116 : * Right now this can be done only once for each IO, even though multiple
117 : * callbacks can be registered. There aren't any known usecases requiring more
118 : * and the required amount of shared memory does add up, so it doesn't seem
119 : * worth multiplying memory usage by PGAIO_HANDLE_MAX_CALLBACKS.
120 : */
121 : void
122 0 : pgaio_io_set_handle_data_64(PgAioHandle *ioh, uint64 *data, uint8 len)
123 : {
124 : Assert(ioh->state == PGAIO_HS_HANDED_OUT);
125 : Assert(ioh->handle_data_len == 0);
126 : Assert(len <= PG_IOV_MAX);
127 : Assert(len <= io_max_combine_limit);
128 :
129 0 : for (int i = 0; i < len; i++)
130 0 : pgaio_ctl->handle_data[ioh->iovec_off + i] = data[i];
131 0 : ioh->handle_data_len = len;
132 0 : }
133 :
134 : /*
135 : * Convenience version of pgaio_io_set_handle_data_64() that converts a 32bit
136 : * array to a 64bit array. Without it callers would end up needing to
137 : * open-code equivalent code.
138 : */
139 : void
140 2396800 : pgaio_io_set_handle_data_32(PgAioHandle *ioh, uint32 *data, uint8 len)
141 : {
142 : Assert(ioh->state == PGAIO_HS_HANDED_OUT);
143 : Assert(ioh->handle_data_len == 0);
144 : Assert(len <= PG_IOV_MAX);
145 : Assert(len <= io_max_combine_limit);
146 :
147 5118762 : for (int i = 0; i < len; i++)
148 2721962 : pgaio_ctl->handle_data[ioh->iovec_off + i] = data[i];
149 2396800 : ioh->handle_data_len = len;
150 2396800 : }
151 :
152 : /*
153 : * Return data set with pgaio_io_set_handle_data_*().
154 : */
155 : uint64 *
156 4584974 : pgaio_io_get_handle_data(PgAioHandle *ioh, uint8 *len)
157 : {
158 : Assert(ioh->handle_data_len > 0);
159 :
160 4584974 : *len = ioh->handle_data_len;
161 :
162 4584974 : return &pgaio_ctl->handle_data[ioh->iovec_off];
163 : }
164 :
165 :
166 :
167 : /* --------------------------------------------------------------------------------
168 : * Public IO Result related functions
169 : * --------------------------------------------------------------------------------
170 : */
171 :
172 : void
173 600 : pgaio_result_report(PgAioResult result, const PgAioTargetData *target_data, int elevel)
174 : {
175 600 : PgAioHandleCallbackID cb_id = result.id;
176 600 : const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
177 :
178 : Assert(result.status != PGAIO_RS_UNKNOWN);
179 : Assert(result.status != PGAIO_RS_OK);
180 :
181 600 : if (ce->cb->report == NULL)
182 0 : elog(ERROR, "callback %d/%s does not have report callback",
183 : result.id, ce->name);
184 :
185 600 : ce->cb->report(result, target_data, elevel);
186 498 : }
187 :
188 :
189 :
190 : /* --------------------------------------------------------------------------------
191 : * Internal callback related functions operating on IO Handles
192 : * --------------------------------------------------------------------------------
193 : */
194 :
195 : /*
196 : * Internal function which invokes ->stage for all the registered callbacks.
197 : */
198 : void
199 2396770 : pgaio_io_call_stage(PgAioHandle *ioh)
200 : {
201 : Assert(ioh->target > PGAIO_TID_INVALID && ioh->target < PGAIO_TID_COUNT);
202 : Assert(ioh->op > PGAIO_OP_INVALID && ioh->op < PGAIO_OP_COUNT);
203 :
204 7190310 : for (int i = ioh->num_callbacks; i > 0; i--)
205 : {
206 4793540 : PgAioHandleCallbackID cb_id = ioh->callbacks[i - 1];
207 4793540 : uint8 cb_data = ioh->callbacks_data[i - 1];
208 4793540 : const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
209 :
210 4793540 : if (!ce->cb->stage)
211 2396770 : continue;
212 :
213 2396770 : pgaio_debug_io(DEBUG3, ioh,
214 : "calling cb #%d %d/%s->stage(%u)",
215 : i, cb_id, ce->name, cb_data);
216 2396770 : ce->cb->stage(ioh, cb_data);
217 : }
218 2396770 : }
219 :
220 : /*
221 : * Internal function which invokes ->complete_shared for all the registered
222 : * callbacks.
223 : */
224 : void
225 2188204 : pgaio_io_call_complete_shared(PgAioHandle *ioh)
226 : {
227 : PgAioResult result;
228 :
229 2188204 : START_CRIT_SECTION();
230 :
231 : Assert(ioh->target > PGAIO_TID_INVALID && ioh->target < PGAIO_TID_COUNT);
232 : Assert(ioh->op > PGAIO_OP_INVALID && ioh->op < PGAIO_OP_COUNT);
233 :
234 2188204 : result.status = PGAIO_RS_OK; /* low level IO is always considered OK */
235 2188204 : result.result = ioh->result;
236 2188204 : result.id = PGAIO_HCB_INVALID;
237 2188204 : result.error_data = 0;
238 :
239 : /*
240 : * Call callbacks with the last registered (innermost) callback first.
241 : * Each callback can modify the result forwarded to the next callback.
242 : */
243 6564612 : for (int i = ioh->num_callbacks; i > 0; i--)
244 : {
245 4376408 : PgAioHandleCallbackID cb_id = ioh->callbacks[i - 1];
246 4376408 : uint8 cb_data = ioh->callbacks_data[i - 1];
247 4376408 : const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
248 :
249 4376408 : if (!ce->cb->complete_shared)
250 3582 : continue;
251 :
252 4372826 : pgaio_debug_io(DEBUG4, ioh,
253 : "calling cb #%d, id %d/%s->complete_shared(%u) with distilled result: (status %s, id %u, error_data %d, result %d)",
254 : i, cb_id, ce->name,
255 : cb_data,
256 : pgaio_result_status_string(result.status),
257 : result.id, result.error_data, result.result);
258 4372826 : result = ce->cb->complete_shared(ioh, result, cb_data);
259 : }
260 :
261 2188204 : ioh->distilled_result = result;
262 :
263 2188204 : pgaio_debug_io(DEBUG3, ioh,
264 : "after shared completion: distilled result: (status %s, id %u, error_data: %d, result %d), raw_result: %d",
265 : pgaio_result_status_string(result.status),
266 : result.id, result.error_data, result.result,
267 : ioh->result);
268 :
269 2188204 : END_CRIT_SECTION();
270 2188204 : }
271 :
272 : /*
273 : * Internal function which invokes ->complete_local for all the registered
274 : * callbacks.
275 : *
276 : * Returns ioh->distilled_result after, possibly, being modified by local
277 : * callbacks.
278 : *
279 : * XXX: It'd be nice to deduplicate with pgaio_io_call_complete_shared().
280 : */
281 : PgAioResult
282 2396770 : pgaio_io_call_complete_local(PgAioHandle *ioh)
283 : {
284 : PgAioResult result;
285 :
286 2396770 : START_CRIT_SECTION();
287 :
288 : Assert(ioh->target > PGAIO_TID_INVALID && ioh->target < PGAIO_TID_COUNT);
289 : Assert(ioh->op > PGAIO_OP_INVALID && ioh->op < PGAIO_OP_COUNT);
290 :
291 : /* start with distilled result from shared callback */
292 2396770 : result = ioh->distilled_result;
293 :
294 7190310 : for (int i = ioh->num_callbacks; i > 0; i--)
295 : {
296 4793540 : PgAioHandleCallbackID cb_id = ioh->callbacks[i - 1];
297 4793540 : uint8 cb_data = ioh->callbacks_data[i - 1];
298 4793540 : const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
299 :
300 4793540 : if (!ce->cb->complete_local)
301 2396770 : continue;
302 :
303 2396770 : pgaio_debug_io(DEBUG4, ioh,
304 : "calling cb #%d, id %d/%s->complete_local(%u) with distilled result: status %s, id %u, error_data %d, result %d",
305 : i, cb_id, ce->name, cb_data,
306 : pgaio_result_status_string(result.status),
307 : result.id, result.error_data, result.result);
308 2396770 : result = ce->cb->complete_local(ioh, result, cb_data);
309 : }
310 :
311 : /*
312 : * Note that we don't save the result in ioh->distilled_result, the local
313 : * callback's result should not ever matter to other waiters. However, the
314 : * local backend does care, so we return the result as modified by local
315 : * callbacks, which then can be passed to ioh->report_return->result.
316 : */
317 2396770 : pgaio_debug_io(DEBUG3, ioh,
318 : "after local completion: result: (status %s, id %u, error_data %d, result %d), raw_result: %d",
319 : pgaio_result_status_string(result.status),
320 : result.id, result.error_data, result.result,
321 : ioh->result);
322 :
323 2396770 : END_CRIT_SECTION();
324 :
325 2396770 : return result;
326 : }
|