Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * aio.c
4 : * AIO - Core Logic
5 : *
6 : * For documentation about how AIO works on a higher level, including a
7 : * schematic example, see README.md.
8 : *
9 : *
10 : * AIO is a complicated subsystem. To keep things navigable, it is split
11 : * across a number of files:
12 : *
13 : * - method_*.c - different ways of executing AIO (e.g. worker process)
14 : *
15 : * - aio_target.c - IO on different kinds of targets
16 : *
17 : * - aio_io.c - method-independent code for specific IO ops (e.g. readv)
18 : *
19 : * - aio_callback.c - callbacks at IO operation lifecycle events
20 : *
21 : * - aio_init.c - per-server and per-backend initialization
22 : *
23 : * - aio.c - all other topics
24 : *
25 : * - read_stream.c - helper for reading buffered relation data
26 : *
27 : *
28 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
29 : * Portions Copyright (c) 1994, Regents of the University of California
30 : *
31 : * IDENTIFICATION
32 : * src/backend/storage/aio/aio.c
33 : *
34 : *-------------------------------------------------------------------------
35 : */
36 :
37 : #include "postgres.h"
38 :
39 : #include "lib/ilist.h"
40 : #include "miscadmin.h"
41 : #include "port/atomics.h"
42 : #include "storage/aio.h"
43 : #include "storage/aio_internal.h"
44 : #include "storage/aio_subsys.h"
45 : #include "utils/guc.h"
46 : #include "utils/guc_hooks.h"
47 : #include "utils/resowner.h"
48 : #include "utils/wait_event_types.h"
49 :
50 : #ifdef USE_INJECTION_POINTS
51 : #include "utils/injection_point.h"
52 : #endif
53 :
54 :
55 : static inline void pgaio_io_update_state(PgAioHandle *ioh, PgAioHandleState new_state);
56 : static void pgaio_io_reclaim(PgAioHandle *ioh);
57 : static void pgaio_io_resowner_register(PgAioHandle *ioh);
58 : static void pgaio_io_wait_for_free(void);
59 : static PgAioHandle *pgaio_io_from_wref(PgAioWaitRef *iow, uint64 *ref_generation);
60 : static const char *pgaio_io_state_get_name(PgAioHandleState s);
61 : static void pgaio_io_wait(PgAioHandle *ioh, uint64 ref_generation);
62 :
63 :
64 : /* Options for io_method. */
65 : const struct config_enum_entry io_method_options[] = {
66 : {"sync", IOMETHOD_SYNC, false},
67 : {"worker", IOMETHOD_WORKER, false},
68 : #ifdef IOMETHOD_IO_URING_ENABLED
69 : {"io_uring", IOMETHOD_IO_URING, false},
70 : #endif
71 : {NULL, 0, false}
72 : };
73 :
74 : /* GUCs */
75 : int io_method = DEFAULT_IO_METHOD;
76 : int io_max_concurrency = -1;
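
/*
 * For illustration, a matching postgresql.conf fragment could look like the
 * following (values are examples, not recommendations; -1 asks for the
 * auto-tuning described in check_io_max_concurrency() below):
 *
 *		io_method = worker
 *		io_max_concurrency = -1
 */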
77 :
78 : /* global control for AIO */
79 : PgAioCtl *pgaio_ctl;
80 :
81 : /* current backend's per-backend state */
82 : PgAioBackend *pgaio_my_backend;
83 :
84 :
85 : static const IoMethodOps *const pgaio_method_ops_table[] = {
86 : [IOMETHOD_SYNC] = &pgaio_sync_ops,
87 : [IOMETHOD_WORKER] = &pgaio_worker_ops,
88 : #ifdef IOMETHOD_IO_URING_ENABLED
89 : [IOMETHOD_IO_URING] = &pgaio_uring_ops,
90 : #endif
91 : };
92 :
93 : /* callbacks for the configured io_method, set by assign_io_method */
94 : const IoMethodOps *pgaio_method_ops;
95 :
96 :
97 : /*
98 : * Currently there's no infrastructure to pass arguments to injection points,
99 : * so we instead set this up for the duration of the injection point
100 : * invocation. See pgaio_io_call_inj().
101 : */
102 : #ifdef USE_INJECTION_POINTS
103 : static PgAioHandle *pgaio_inj_cur_handle;
104 : #endif
105 :
106 :
107 :
108 : /* --------------------------------------------------------------------------------
109 : * Public Functions related to PgAioHandle
110 : * --------------------------------------------------------------------------------
111 : */
112 :
113 : /*
114 : * Acquire an AioHandle, waiting for IO completion if necessary.
115 : *
116 : * Each backend can only have one AIO handle that has been "handed out" to
117 : * code, but not yet submitted or released. This restriction is necessary to
118 : * ensure that it is possible for code to wait for an unused handle by waiting
119 : * for in-flight IO to complete. There is a limited number of handles in each
120 : * backend; if multiple handles could be handed out without being submitted,
121 : * waiting for all in-flight IO to complete would not guarantee that handles
122 : * free up.
123 : *
124 : * It is cheap to acquire an IO handle, unless all handles are in use. In that
125 : * case this function waits for the oldest IO to complete. If that is not
126 : * desirable, use pgaio_io_acquire_nb().
127 : *
128 : * If a handle was acquired but then does not turn out to be needed,
129 : * e.g. because pgaio_io_acquire() is called before starting an IO in a
130 : * critical section, the handle needs to be released with pgaio_io_release().
131 : *
132 : *
133 : * To react to the completion of the IO as soon as it is known to have
134 : * completed, callbacks can be registered with pgaio_io_register_callbacks().
135 : *
136 : * To actually execute IO using the returned handle, the pgaio_io_start_*()
137 : * family of functions is used. In many cases the pgaio_io_start_*() call will
138 : * not be done directly by code that acquired the handle, but by lower level
139 : * code that gets passed the handle. E.g. if code in bufmgr.c wants to perform
140 : * AIO, it typically will pass the handle to smgr.c, which will pass it on to
141 : * md.c, on to fd.c, which then finally calls pgaio_io_start_*(). This
142 : * forwarding allows the various layers to react to the IO's completion by
143 : * registering callbacks. These callbacks in turn can translate a lower
144 : * layer's result into a result understandable by a higher layer.
145 : *
146 : * During pgaio_io_start_*() the IO is staged (i.e. prepared for execution but
147 : * not submitted to the kernel). Unless in batchmode
148 : * (c.f. pgaio_enter_batchmode()), the IO will also get submitted for
149 : * execution. Note that, whether in batchmode or not, the IO might even
150 : * complete before the functions return.
151 : *
152 : * After pgaio_io_start_*() the AioHandle is "consumed" and may not be
153 : * referenced by the IO issuing code. To e.g. wait for IO, references to the
154 : * IO can be established with pgaio_io_get_wref() *before* pgaio_io_start_*()
155 : * is called. pgaio_wref_wait() can be used to wait for the IO to complete.
156 : *
157 : *
158 : * To know if the IO [partially] succeeded or failed, a PgAioReturn * can be
159 : * passed to pgaio_io_acquire(). Once the issuing backend has called
160 : * pgaio_wref_wait(), the PgAioReturn contains information about whether the
161 : * operation succeeded and details about the first failure, if any. The error
162 : * can be raised / logged with pgaio_result_report().
163 : *
164 : * The lifetime of the memory pointed to by *ret needs to be at least as long
165 : * as the passed in resowner. If the resowner releases resources before the IO
166 : * completes (typically due to an error), the reference to *ret will be
167 : * cleared. In case of resowner cleanup *ret will not be updated with the
168 : * results of the IO operation.
169 : */
170 : PgAioHandle *
171 10776 : pgaio_io_acquire(struct ResourceOwnerData *resowner, PgAioReturn *ret)
172 : {
173 : PgAioHandle *h;
174 :
175 : while (true)
176 : {
177 10776 : h = pgaio_io_acquire_nb(resowner, ret);
178 :
179 10776 : if (h != NULL)
180 5388 : return h;
181 :
182 : /*
183 : * Evidently all handles by this backend are in use. Just wait for
184 : * some to complete.
185 : */
186 5388 : pgaio_io_wait_for_free();
187 : }
188 : }
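
/*
 * A minimal usage sketch of the flow described above, for a caller that
 * issues a single read and waits for its result. start_read_via_lower_layers()
 * is a hypothetical stand-in for the code path that eventually calls
 * pgaio_io_start_*() (e.g. via smgr.c/md.c/fd.c), and the exact
 * pgaio_result_report() call shown is an assumption about its signature:
 *
 *		PgAioReturn ioret;
 *		PgAioWaitRef iow;
 *		PgAioHandle *ioh;
 *
 *		ioh = pgaio_io_acquire(CurrentResourceOwner, &ioret);
 *
 * Take the wait reference before the handle is consumed, then hand the
 * handle to the lower layers:
 *
 *		pgaio_io_get_wref(ioh, &iow);
 *		start_read_via_lower_layers(ioh, ...);
 *
 * Afterwards the handle may not be referenced anymore; wait via the
 * reference and inspect the result:
 *
 *		pgaio_wref_wait(&iow);
 *
 *		if (ioret.result.status == PGAIO_RS_ERROR)
 *			pgaio_result_report(ioret.result, &ioret.target_data, ERROR);
 */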
189 :
190 : /*
191 : * Acquire an AioHandle, returning NULL if no handles are free.
192 : *
193 : * See pgaio_io_acquire(). The only difference is that this function will return
194 : * NULL if there are no idle handles, instead of blocking.
195 : */
196 : PgAioHandle *
197 2438952 : pgaio_io_acquire_nb(struct ResourceOwnerData *resowner, PgAioReturn *ret)
198 : {
199 2438952 : if (pgaio_my_backend->num_staged_ios >= PGAIO_SUBMIT_BATCH_SIZE)
200 : {
201 : Assert(pgaio_my_backend->num_staged_ios == PGAIO_SUBMIT_BATCH_SIZE);
202 0 : pgaio_submit_staged();
203 : }
204 :
205 2438952 : if (pgaio_my_backend->handed_out_io)
206 0 : elog(ERROR, "API violation: Only one IO can be handed out");
207 :
208 2438952 : if (!dclist_is_empty(&pgaio_my_backend->idle_ios))
209 : {
210 2428176 : dlist_node *ion = dclist_pop_head_node(&pgaio_my_backend->idle_ios);
211 2428176 : PgAioHandle *ioh = dclist_container(PgAioHandle, node, ion);
212 :
213 : Assert(ioh->state == PGAIO_HS_IDLE);
214 : Assert(ioh->owner_procno == MyProcNumber);
215 :
216 2428176 : pgaio_io_update_state(ioh, PGAIO_HS_HANDED_OUT);
217 2428176 : pgaio_my_backend->handed_out_io = ioh;
218 :
219 2428176 : if (resowner)
220 2428176 : pgaio_io_resowner_register(ioh);
221 :
222 2428176 : if (ret)
223 : {
224 2428176 : ioh->report_return = ret;
225 2428176 : ret->result.status = PGAIO_RS_UNKNOWN;
226 : }
227 :
228 2428176 : return ioh;
229 : }
230 :
231 10776 : return NULL;
232 : }
233 :
234 : /*
235 : * Release IO handle that turned out to not be required.
236 : *
237 : * See pgaio_io_acquire() for more details.
238 : */
239 : void
240 4850 : pgaio_io_release(PgAioHandle *ioh)
241 : {
242 4850 : if (ioh == pgaio_my_backend->handed_out_io)
243 : {
244 : Assert(ioh->state == PGAIO_HS_HANDED_OUT);
245 : Assert(ioh->resowner);
246 :
247 4850 : pgaio_my_backend->handed_out_io = NULL;
248 4850 : pgaio_io_reclaim(ioh);
249 : }
250 : else
251 : {
252 0 : elog(ERROR, "release in unexpected state");
253 : }
254 4850 : }
255 :
256 : /*
257 : * Release IO handle during resource owner cleanup.
258 : */
259 : void
260 34 : pgaio_io_release_resowner(dlist_node *ioh_node, bool on_error)
261 : {
262 34 : PgAioHandle *ioh = dlist_container(PgAioHandle, resowner_node, ioh_node);
263 :
264 : Assert(ioh->resowner);
265 :
266 34 : ResourceOwnerForgetAioHandle(ioh->resowner, &ioh->resowner_node);
267 34 : ioh->resowner = NULL;
268 :
269 34 : switch (ioh->state)
270 : {
271 0 : case PGAIO_HS_IDLE:
272 0 : elog(ERROR, "unexpected");
273 : break;
274 30 : case PGAIO_HS_HANDED_OUT:
275 : Assert(ioh == pgaio_my_backend->handed_out_io || pgaio_my_backend->handed_out_io == NULL);
276 :
277 30 : if (ioh == pgaio_my_backend->handed_out_io)
278 : {
279 30 : pgaio_my_backend->handed_out_io = NULL;
280 30 : if (!on_error)
281 0 : elog(WARNING, "leaked AIO handle");
282 : }
283 :
284 30 : pgaio_io_reclaim(ioh);
285 30 : break;
286 0 : case PGAIO_HS_DEFINED:
287 : case PGAIO_HS_STAGED:
288 0 : if (!on_error)
289 0 : elog(WARNING, "AIO handle was not submitted");
290 0 : pgaio_submit_staged();
291 0 : break;
292 4 : case PGAIO_HS_SUBMITTED:
293 : case PGAIO_HS_COMPLETED_IO:
294 : case PGAIO_HS_COMPLETED_SHARED:
295 : case PGAIO_HS_COMPLETED_LOCAL:
296 : /* this is expected to happen */
297 4 : break;
298 : }
299 :
300 : /*
301 : * Need to unregister the reporting of the IO's result; the memory it's
302 : * referencing likely has gone away.
303 : */
304 34 : if (ioh->report_return)
305 4 : ioh->report_return = NULL;
306 34 : }
307 :
308 : /*
309 : * Add a [set of] flags to the IO.
310 : *
311 : * Note that this combines the passed-in flags with the already set flags,
312 : * rather than overwriting the latter. This is to allow multiple callsites
313 : * to set flags.
314 : */
315 : void
316 4843836 : pgaio_io_set_flag(PgAioHandle *ioh, PgAioHandleFlags flag)
317 : {
318 : Assert(ioh->state == PGAIO_HS_HANDED_OUT);
319 :
320 4843836 : ioh->flags |= flag;
321 4843836 : }
322 :
323 : /*
324 : * Returns an ID uniquely identifying the IO handle. This is only really
325 : * useful for logging, as handles are reused across multiple IOs.
326 : */
327 : int
328 1182350 : pgaio_io_get_id(PgAioHandle *ioh)
329 : {
330 : Assert(ioh >= pgaio_ctl->io_handles &&
331 : ioh < (pgaio_ctl->io_handles + pgaio_ctl->io_handle_count));
332 1182350 : return ioh - pgaio_ctl->io_handles;
333 : }
334 :
335 : /*
336 : * Return the ProcNumber for the process that can use an IO handle. The
337 : * mapping from IO handles to PGPROCs is static; therefore this even works
338 : * when the corresponding PGPROC is not in use.
339 : */
340 : ProcNumber
341 0 : pgaio_io_get_owner(PgAioHandle *ioh)
342 : {
343 0 : return ioh->owner_procno;
344 : }
345 :
346 : /*
347 : * Return a wait reference for the IO. Only wait references can be used to
348 : * wait for an IO's completion, as handles themselves can be reused after
349 : * completion. See also the comment above pgaio_io_acquire().
350 : */
351 : void
352 4846622 : pgaio_io_get_wref(PgAioHandle *ioh, PgAioWaitRef *iow)
353 : {
354 : Assert(ioh->state == PGAIO_HS_HANDED_OUT ||
355 : ioh->state == PGAIO_HS_DEFINED ||
356 : ioh->state == PGAIO_HS_STAGED);
357 : Assert(ioh->generation != 0);
358 :
359 4846622 : iow->aio_index = ioh - pgaio_ctl->io_handles;
360 4846622 : iow->generation_upper = (uint32) (ioh->generation >> 32);
361 4846622 : iow->generation_lower = (uint32) ioh->generation;
362 4846622 : }
363 :
364 :
365 :
366 : /* --------------------------------------------------------------------------------
367 : * Internal Functions related to PgAioHandle
368 : * --------------------------------------------------------------------------------
369 : */
370 :
371 : static inline void
372 18976960 : pgaio_io_update_state(PgAioHandle *ioh, PgAioHandleState new_state)
373 : {
374 18976960 : pgaio_debug_io(DEBUG5, ioh,
375 : "updating state to %s",
376 : pgaio_io_state_get_name(new_state));
377 :
378 : /*
379 : * Ensure the changes signified by the new state are visible before the
380 : * new state becomes visible.
381 : */
382 18976960 : pg_write_barrier();
383 :
384 18976960 : ioh->state = new_state;
385 18976960 : }
386 :
387 : static void
388 2428176 : pgaio_io_resowner_register(PgAioHandle *ioh)
389 : {
390 : Assert(!ioh->resowner);
391 : Assert(CurrentResourceOwner);
392 :
393 2428176 : ResourceOwnerRememberAioHandle(CurrentResourceOwner, &ioh->resowner_node);
394 2428176 : ioh->resowner = CurrentResourceOwner;
395 2428176 : }
396 :
397 : /*
398 : * Stage IO for execution and, if appropriate, submit it immediately.
399 : *
400 : * Should only be called from pgaio_io_start_*().
401 : */
402 : void
403 2423296 : pgaio_io_stage(PgAioHandle *ioh, PgAioOp op)
404 : {
405 : bool needs_synchronous;
406 :
407 : Assert(ioh->state == PGAIO_HS_HANDED_OUT);
408 : Assert(pgaio_my_backend->handed_out_io == ioh);
409 : Assert(pgaio_io_has_target(ioh));
410 :
411 2423296 : ioh->op = op;
412 2423296 : ioh->result = 0;
413 :
414 2423296 : pgaio_io_update_state(ioh, PGAIO_HS_DEFINED);
415 :
416 : /* allow a new IO to be staged */
417 2423296 : pgaio_my_backend->handed_out_io = NULL;
418 :
419 2423296 : pgaio_io_call_stage(ioh);
420 :
421 2423296 : pgaio_io_update_state(ioh, PGAIO_HS_STAGED);
422 :
423 : /*
424 : * Synchronous execution has to be executed, well, synchronously, so check
425 : * that first.
426 : */
427 2423296 : needs_synchronous = pgaio_io_needs_synchronous_execution(ioh);
428 :
429 2423296 : pgaio_debug_io(DEBUG3, ioh,
430 : "staged (synchronous: %d, in_batch: %d)",
431 : needs_synchronous, pgaio_my_backend->in_batchmode);
432 :
433 2423296 : if (!needs_synchronous)
434 : {
435 1182350 : pgaio_my_backend->staged_ios[pgaio_my_backend->num_staged_ios++] = ioh;
436 : Assert(pgaio_my_backend->num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
437 :
438 : /*
439 : * Unless code explicitly opted into batching IOs, submit the IO
440 : * immediately.
441 : */
442 1182350 : if (!pgaio_my_backend->in_batchmode)
443 203868 : pgaio_submit_staged();
444 : }
445 : else
446 : {
447 1240946 : pgaio_io_prepare_submit(ioh);
448 1240946 : pgaio_io_perform_synchronously(ioh);
449 : }
450 2423296 : }
451 :
452 : bool
453 2423296 : pgaio_io_needs_synchronous_execution(PgAioHandle *ioh)
454 : {
455 : /*
456 : * If the caller said to execute the IO synchronously, do so.
457 : *
458 : * XXX: We could optimize the logic when to execute synchronously by first
459 : * checking if there are other IOs in flight and only synchronously
460 : * executing if not. Unclear whether that'll be sufficiently common to be
461 : * worth worrying about.
462 : */
463 2423296 : if (ioh->flags & PGAIO_HF_SYNCHRONOUS)
464 1233132 : return true;
465 :
466 : /* Check if the IO method requires synchronous execution of IO */
467 1190164 : if (pgaio_method_ops->needs_synchronous_execution)
468 1190164 : return pgaio_method_ops->needs_synchronous_execution(ioh);
469 :
470 0 : return false;
471 : }
472 :
473 : /*
474 : * Handle IO being processed by IO method.
475 : *
476 : * Should be called by IO methods / synchronous IO execution, just before the
477 : * IO is performed.
478 : */
479 : void
480 2423296 : pgaio_io_prepare_submit(PgAioHandle *ioh)
481 : {
482 2423296 : pgaio_io_update_state(ioh, PGAIO_HS_SUBMITTED);
483 :
484 2423296 : dclist_push_tail(&pgaio_my_backend->in_flight_ios, &ioh->node);
485 2423296 : }
486 :
487 : /*
488 : * Handle IO getting completed by a method.
489 : *
490 : * Should be called by IO methods / synchronous IO execution, just after the
491 : * IO has been performed.
492 : *
493 : * Expects to be called in a critical section. We expect IOs to be usable for
494 : * WAL etc, which requires being able to execute completion callbacks in a
495 : * critical section.
496 : */
497 : void
498 2213712 : pgaio_io_process_completion(PgAioHandle *ioh, int result)
499 : {
500 : Assert(ioh->state == PGAIO_HS_SUBMITTED);
501 :
502 : Assert(CritSectionCount > 0);
503 :
504 2213712 : ioh->result = result;
505 :
506 2213712 : pgaio_io_update_state(ioh, PGAIO_HS_COMPLETED_IO);
507 :
508 2213712 : pgaio_io_call_inj(ioh, "AIO_PROCESS_COMPLETION_BEFORE_SHARED");
509 :
510 2213712 : pgaio_io_call_complete_shared(ioh);
511 :
512 2213712 : pgaio_io_update_state(ioh, PGAIO_HS_COMPLETED_SHARED);
513 :
514 : /* condition variable broadcast ensures state is visible before wakeup */
515 2213712 : ConditionVariableBroadcast(&ioh->cv);
516 :
517 : /* contains call to pgaio_io_call_complete_local() */
518 2213712 : if (ioh->owner_procno == MyProcNumber)
519 1240946 : pgaio_io_reclaim(ioh);
520 2213712 : }
521 :
522 : /*
523 : * Has the IO completed and thus the IO handle been reused?
524 : *
525 : * This is useful when waiting for IO completion at a low level (e.g. in an IO
526 : * method's ->wait_one() callback).
527 : */
528 : bool
529 3925002 : pgaio_io_was_recycled(PgAioHandle *ioh, uint64 ref_generation, PgAioHandleState *state)
530 : {
531 3925002 : *state = ioh->state;
532 3925002 : pg_read_barrier();
533 :
534 3925002 : return ioh->generation != ref_generation;
535 : }
536 :
537 : /*
538 : * Wait for IO to complete. External code should never use this; outside of
539 : * the AIO subsystem, waits are only allowed via pgaio_wref_wait().
540 : */
541 : static void
542 550370 : pgaio_io_wait(PgAioHandle *ioh, uint64 ref_generation)
543 : {
544 : PgAioHandleState state;
545 : bool am_owner;
546 :
547 550370 : am_owner = ioh->owner_procno == MyProcNumber;
548 :
549 550370 : if (pgaio_io_was_recycled(ioh, ref_generation, &state))
550 2 : return;
551 :
552 550368 : if (am_owner)
553 : {
554 546252 : if (state != PGAIO_HS_SUBMITTED
555 125142 : && state != PGAIO_HS_COMPLETED_IO
556 526 : && state != PGAIO_HS_COMPLETED_SHARED
557 0 : && state != PGAIO_HS_COMPLETED_LOCAL)
558 : {
559 0 : elog(PANIC, "waiting for own IO in wrong state: %d",
560 : state);
561 : }
562 : }
563 :
564 : while (true)
565 : {
566 1099932 : if (pgaio_io_was_recycled(ioh, ref_generation, &state))
567 2370 : return;
568 :
569 1097562 : switch (state)
570 : {
571 0 : case PGAIO_HS_IDLE:
572 : case PGAIO_HS_HANDED_OUT:
573 0 : elog(ERROR, "IO in wrong state: %d", state);
574 : break;
575 :
576 423510 : case PGAIO_HS_SUBMITTED:
577 :
578 : /*
579 : * If we need to wait via the IO method, do so now. Don't
580 : * check via the IO method if the issuing backend is executing
581 : * the IO synchronously.
582 : */
583 423510 : if (pgaio_method_ops->wait_one && !(ioh->flags & PGAIO_HF_SYNCHRONOUS))
584 : {
585 0 : pgaio_method_ops->wait_one(ioh, ref_generation);
586 0 : continue;
587 : }
588 : /* fallthrough */
589 :
590 : /* waiting for owner to submit */
591 : case PGAIO_HS_DEFINED:
592 : case PGAIO_HS_STAGED:
593 : /* waiting for reaper to complete */
594 : /* fallthrough */
595 : case PGAIO_HS_COMPLETED_IO:
596 : /* shouldn't be able to hit this otherwise */
597 : Assert(IsUnderPostmaster);
598 : /* ensure we're going to get woken up */
599 549564 : ConditionVariablePrepareToSleep(&ioh->cv);
600 :
601 1097740 : while (!pgaio_io_was_recycled(ioh, ref_generation, &state))
602 : {
603 1095402 : if (state == PGAIO_HS_COMPLETED_SHARED ||
604 548234 : state == PGAIO_HS_COMPLETED_LOCAL)
605 : break;
606 548176 : ConditionVariableSleep(&ioh->cv, WAIT_EVENT_AIO_IO_COMPLETION);
607 : }
608 :
609 549564 : ConditionVariableCancelSleep();
610 549564 : break;
611 :
612 547998 : case PGAIO_HS_COMPLETED_SHARED:
613 : case PGAIO_HS_COMPLETED_LOCAL:
614 : /* see above */
615 547998 : if (am_owner)
616 546252 : pgaio_io_reclaim(ioh);
617 547998 : return;
618 : }
619 549564 : }
620 : }
621 :
622 : /*
623 : * Make IO handle ready to be reused after IO has completed or after the
624 : * handle has been released without being used.
625 : */
626 : static void
627 2428176 : pgaio_io_reclaim(PgAioHandle *ioh)
628 : {
629 : /* This is only ok if it's our IO */
630 : Assert(ioh->owner_procno == MyProcNumber);
631 : Assert(ioh->state != PGAIO_HS_IDLE);
632 :
633 : /*
634 : * It's a bit ugly, but right now the easiest place to put the execution
635 : * of local completion callbacks is this function, as we need to execute
636 : * local callbacks just before reclaiming at multiple callsites.
637 : */
638 2428176 : if (ioh->state == PGAIO_HS_COMPLETED_SHARED)
639 : {
640 : PgAioResult local_result;
641 :
642 2423296 : local_result = pgaio_io_call_complete_local(ioh);
643 2423296 : pgaio_io_update_state(ioh, PGAIO_HS_COMPLETED_LOCAL);
644 :
645 2423296 : if (ioh->report_return)
646 : {
647 2423292 : ioh->report_return->result = local_result;
648 2423292 : ioh->report_return->target_data = ioh->target_data;
649 : }
650 : }
651 :
652 2428176 : pgaio_debug_io(DEBUG4, ioh,
653 : "reclaiming: distilled_result: (status %s, id %u, error_data %d), raw_result: %d",
654 : pgaio_result_status_string(ioh->distilled_result.status),
655 : ioh->distilled_result.id,
656 : ioh->distilled_result.error_data,
657 : ioh->result);
658 :
659 : /* if the IO has been defined, it's on the in-flight list; remove it */
660 2428176 : if (ioh->state != PGAIO_HS_HANDED_OUT)
661 2423296 : dclist_delete_from(&pgaio_my_backend->in_flight_ios, &ioh->node);
662 :
663 2428176 : if (ioh->resowner)
664 : {
665 2428142 : ResourceOwnerForgetAioHandle(ioh->resowner, &ioh->resowner_node);
666 2428142 : ioh->resowner = NULL;
667 : }
668 :
669 : Assert(!ioh->resowner);
670 :
671 2428176 : ioh->op = PGAIO_OP_INVALID;
672 2428176 : ioh->target = PGAIO_TID_INVALID;
673 2428176 : ioh->flags = 0;
674 2428176 : ioh->num_callbacks = 0;
675 2428176 : ioh->handle_data_len = 0;
676 2428176 : ioh->report_return = NULL;
677 2428176 : ioh->result = 0;
678 2428176 : ioh->distilled_result.status = PGAIO_RS_UNKNOWN;
679 :
680 : /* XXX: the barrier is probably superfluous */
681 2428176 : pg_write_barrier();
682 2428176 : ioh->generation++;
683 :
684 2428176 : pgaio_io_update_state(ioh, PGAIO_HS_IDLE);
685 :
686 : /*
687 : * We push the IO to the head of the idle IO list; that seems more cache
688 : * efficient in cases where only a few IOs are used.
689 : */
690 2428176 : dclist_push_head(&pgaio_my_backend->idle_ios, &ioh->node);
691 2428176 : }
692 :
693 : /*
694 : * Wait for an IO handle to become usable.
695 : *
696 : * This is only really useful for pgaio_io_acquire().
697 : */
698 : static void
699 5388 : pgaio_io_wait_for_free(void)
700 : {
701 5388 : int reclaimed = 0;
702 :
703 5388 : pgaio_debug(DEBUG2, "waiting for self with %d pending",
704 : pgaio_my_backend->num_staged_ios);
705 :
706 : /*
707 : * First check if any of our IOs actually have completed - when using
708 : * worker, that'll often be the case. We could do so as part of the loop
709 : * below, but that'd potentially lead us to wait for an IO that was
710 : * submitted earlier.
711 : */
712 10776 : for (int i = 0; i < io_max_concurrency; i++)
713 : {
714 5388 : PgAioHandle *ioh = &pgaio_ctl->io_handles[pgaio_my_backend->io_handle_off + i];
715 :
716 5388 : if (ioh->state == PGAIO_HS_COMPLETED_SHARED)
717 : {
718 4412 : pgaio_io_reclaim(ioh);
719 4412 : reclaimed++;
720 : }
721 : }
722 :
723 5388 : if (reclaimed > 0)
724 4412 : return;
725 :
726 : /*
727 : * If we have any unsubmitted IOs, submit them now. We'll start waiting in
728 : * a second, so it's better they're in flight. This also addresses the
729 : * edge-case that all IOs are unsubmitted.
730 : */
731 976 : if (pgaio_my_backend->num_staged_ios > 0)
732 0 : pgaio_submit_staged();
733 :
734 976 : if (dclist_count(&pgaio_my_backend->in_flight_ios) == 0)
735 0 : elog(ERROR, "no free IOs despite no in-flight IOs");
736 :
737 : /*
738 : * Wait for the oldest in-flight IO to complete.
739 : *
740 : * XXX: Reusing the general IO wait is suboptimal, we don't need to wait
741 : * for that specific IO to complete, we just need *any* IO to complete.
742 : */
743 : {
744 976 : PgAioHandle *ioh = dclist_head_element(PgAioHandle, node,
745 : &pgaio_my_backend->in_flight_ios);
746 :
747 976 : switch (ioh->state)
748 : {
749 : /* should not be in in-flight list */
750 0 : case PGAIO_HS_IDLE:
751 : case PGAIO_HS_DEFINED:
752 : case PGAIO_HS_HANDED_OUT:
753 : case PGAIO_HS_STAGED:
754 : case PGAIO_HS_COMPLETED_LOCAL:
755 0 : elog(ERROR, "shouldn't get here with io:%d in state %d",
756 : pgaio_io_get_id(ioh), ioh->state);
757 : break;
758 :
759 972 : case PGAIO_HS_COMPLETED_IO:
760 : case PGAIO_HS_SUBMITTED:
761 972 : pgaio_debug_io(DEBUG2, ioh,
762 : "waiting for free io with %d in flight",
763 : dclist_count(&pgaio_my_backend->in_flight_ios));
764 :
765 : /*
766 : * In a more general case this would be racy, because the
767 : * generation could increase after we read ioh->state above.
768 : * But we are only looking at IOs by the current backend and
769 : * the IO can only be recycled by this backend.
770 : */
771 972 : pgaio_io_wait(ioh, ioh->generation);
772 972 : break;
773 :
774 4 : case PGAIO_HS_COMPLETED_SHARED:
775 : /* it's possible that another backend just finished this IO */
776 4 : pgaio_io_reclaim(ioh);
777 4 : break;
778 : }
779 :
780 976 : if (dclist_count(&pgaio_my_backend->idle_ios) == 0)
781 0 : elog(PANIC, "no idle IO after waiting for IO to terminate");
782 976 : return;
783 : }
784 : }
785 :
786 : /*
787 : * Internal - code outside of AIO should never need this and it'd be hard for
788 : * such code to be safe.
789 : */
790 : static PgAioHandle *
791 1726358 : pgaio_io_from_wref(PgAioWaitRef *iow, uint64 *ref_generation)
792 : {
793 : PgAioHandle *ioh;
794 :
795 : Assert(iow->aio_index < pgaio_ctl->io_handle_count);
796 :
797 1726358 : ioh = &pgaio_ctl->io_handles[iow->aio_index];
798 :
799 1726358 : *ref_generation = ((uint64) iow->generation_upper) << 32 |
800 1726358 : iow->generation_lower;
801 :
802 : Assert(*ref_generation != 0);
803 :
804 1726358 : return ioh;
805 : }
806 :
807 : static const char *
808 0 : pgaio_io_state_get_name(PgAioHandleState s)
809 : {
810 : #define PGAIO_HS_TOSTR_CASE(sym) case PGAIO_HS_##sym: return #sym
811 0 : switch (s)
812 : {
813 0 : PGAIO_HS_TOSTR_CASE(IDLE);
814 0 : PGAIO_HS_TOSTR_CASE(HANDED_OUT);
815 0 : PGAIO_HS_TOSTR_CASE(DEFINED);
816 0 : PGAIO_HS_TOSTR_CASE(STAGED);
817 0 : PGAIO_HS_TOSTR_CASE(SUBMITTED);
818 0 : PGAIO_HS_TOSTR_CASE(COMPLETED_IO);
819 0 : PGAIO_HS_TOSTR_CASE(COMPLETED_SHARED);
820 0 : PGAIO_HS_TOSTR_CASE(COMPLETED_LOCAL);
821 : }
822 : #undef PGAIO_HS_TOSTR_CASE
823 :
824 0 : return NULL; /* silence compiler */
825 : }
826 :
827 : const char *
828 0 : pgaio_io_get_state_name(PgAioHandle *ioh)
829 : {
830 0 : return pgaio_io_state_get_name(ioh->state);
831 : }
832 :
833 : const char *
834 0 : pgaio_result_status_string(PgAioResultStatus rs)
835 : {
836 0 : switch (rs)
837 : {
838 0 : case PGAIO_RS_UNKNOWN:
839 0 : return "UNKNOWN";
840 0 : case PGAIO_RS_OK:
841 0 : return "OK";
842 0 : case PGAIO_RS_WARNING:
843 0 : return "WARNING";
844 0 : case PGAIO_RS_PARTIAL:
845 0 : return "PARTIAL";
846 0 : case PGAIO_RS_ERROR:
847 0 : return "ERROR";
848 : }
849 :
850 0 : return NULL; /* silence compiler */
851 : }
852 :
853 :
854 :
855 : /* --------------------------------------------------------------------------------
856 : * Functions primarily related to IO Wait References
857 : * --------------------------------------------------------------------------------
858 : */
859 :
860 : /*
861 : * Mark a wait reference as invalid
862 : */
863 : void
864 24732900 : pgaio_wref_clear(PgAioWaitRef *iow)
865 : {
866 24732900 : iow->aio_index = PG_UINT32_MAX;
867 24732900 : }
868 :
869 : /* Is the wait reference valid? */
870 : bool
871 4941218 : pgaio_wref_valid(PgAioWaitRef *iow)
872 : {
873 4941218 : return iow->aio_index != PG_UINT32_MAX;
874 : }
875 :
876 : /*
877 : * Similar to pgaio_io_get_id(), just for wait references.
878 : */
879 : int
880 0 : pgaio_wref_get_id(PgAioWaitRef *iow)
881 : {
882 : Assert(pgaio_wref_valid(iow));
883 0 : return iow->aio_index;
884 : }
885 :
886 : /*
887 : * Wait for the IO to have completed. Can be called in any process, not just
888 : * in the issuing backend.
889 : */
890 : void
891 549398 : pgaio_wref_wait(PgAioWaitRef *iow)
892 : {
893 : uint64 ref_generation;
894 : PgAioHandle *ioh;
895 :
896 549398 : ioh = pgaio_io_from_wref(iow, &ref_generation);
897 :
898 549398 : pgaio_io_wait(ioh, ref_generation);
899 549398 : }
900 :
901 : /*
902 : * Check if the referenced IO completed, without blocking.
903 : */
904 : bool
905 1176960 : pgaio_wref_check_done(PgAioWaitRef *iow)
906 : {
907 : uint64 ref_generation;
908 : PgAioHandleState state;
909 : bool am_owner;
910 : PgAioHandle *ioh;
911 :
912 1176960 : ioh = pgaio_io_from_wref(iow, &ref_generation);
913 :
914 1176960 : if (pgaio_io_was_recycled(ioh, ref_generation, &state))
915 0 : return true;
916 :
917 1176960 : if (state == PGAIO_HS_IDLE)
918 0 : return true;
919 :
920 1176960 : am_owner = ioh->owner_procno == MyProcNumber;
921 :
922 1176960 : if (state == PGAIO_HS_COMPLETED_SHARED ||
923 545278 : state == PGAIO_HS_COMPLETED_LOCAL)
924 : {
925 631682 : if (am_owner)
926 631682 : pgaio_io_reclaim(ioh);
927 631682 : return true;
928 : }
929 :
930 : /*
931 : * XXX: It likely would be worth checking in with the io method, to give
932 : * the IO method a chance to check if there are completion events queued.
933 : */
934 :
935 545278 : return false;
936 : }
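
/*
 * Wait references are plain values and can be stored wherever the issuing
 * code needs them, e.g. in a struct tracking an optional in-progress IO. In
 * the following sketch the field name io_wref is purely illustrative: the
 * reference is cleared while no IO is in progress and filled via
 * pgaio_io_get_wref() just before an IO is started; later the IO can be
 * polled or waited for:
 *
 *		if (pgaio_wref_valid(&state->io_wref))
 *		{
 *			if (!pgaio_wref_check_done(&state->io_wref))
 *				pgaio_wref_wait(&state->io_wref);
 *			pgaio_wref_clear(&state->io_wref);
 *		}
 */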
937 :
938 :
939 :
940 : /* --------------------------------------------------------------------------------
941 : * Actions on multiple IOs.
942 : * --------------------------------------------------------------------------------
943 : */
944 :
945 : /*
946 : * Submit IOs in batches going forward.
947 : *
948 : * Submitting multiple IOs at once can be substantially faster than doing so
949 : * one-by-one. At the same time, submitting multiple IOs at once requires more
950 : * care to avoid deadlocks.
951 : *
952 : * Consider backend A staging an IO for buffer 1 and then trying to start IO
953 : * on buffer 2, while backend B does the inverse. If A submitted the IO before
954 : * moving on to buffer 2, this works just fine; B will wait for the IO to
955 : * complete. But if batching were used, each backend will wait for IO that has
956 : * not yet been submitted to complete, i.e. forever.
957 : *
958 : * End batch submission mode with pgaio_exit_batchmode(). (Throwing errors is
959 : * allowed; error recovery will end the batch.)
960 : *
961 : * To avoid deadlocks, code needs to ensure that it will not wait for another
962 : * backend while there is unsubmitted IO. E.g. by using conditional lock
963 : * acquisition when acquiring buffer locks. To check if there currently are
964 : * staged IOs, call pgaio_have_staged() and to submit all staged IOs call
965 : * pgaio_submit_staged().
966 : *
967 : * It is not allowed to enter batchmode while already in batchmode; it's
968 : * unlikely to ever be needed, as code needs to be explicitly aware of being
969 : * called in batchmode, to avoid the deadlock risks explained above.
970 : *
971 : * Note that IOs may get submitted before pgaio_exit_batchmode() is called,
972 : * e.g. because too many IOs have been staged or because pgaio_submit_staged()
973 : * was called.
974 : */
975 : void
976 4553450 : pgaio_enter_batchmode(void)
977 : {
978 4553450 : if (pgaio_my_backend->in_batchmode)
979 0 : elog(ERROR, "starting batch while batch already in progress");
980 4553450 : pgaio_my_backend->in_batchmode = true;
981 4553450 : }
982 :
983 : /*
984 : * Stop submitting IOs in batches.
985 : */
986 : void
987 4553438 : pgaio_exit_batchmode(void)
988 : {
989 : Assert(pgaio_my_backend->in_batchmode);
990 :
991 4553438 : pgaio_submit_staged();
992 4553438 : pgaio_my_backend->in_batchmode = false;
993 4553438 : }
994 :
995 : /*
996 : * Are there staged but unsubmitted IOs?
997 : *
998 : * See comment above pgaio_enter_batchmode() for why code may need to check if
999 : * there is IO in that state.
1000 : */
1001 : bool
1002 2428176 : pgaio_have_staged(void)
1003 : {
1004 : Assert(pgaio_my_backend->in_batchmode ||
1005 : pgaio_my_backend->num_staged_ios == 0);
1006 2428176 : return pgaio_my_backend->num_staged_ios > 0;
1007 : }
1008 :
1009 : /*
1010 : * Submit all staged but not yet submitted IOs.
1011 : *
1012 : * Unless in batch mode, this never needs to be called, as IOs get submitted
1013 : * as soon as possible. While in batchmode pgaio_submit_staged() can be called
1014 : * before waiting on another backend, to avoid the risk of deadlocks. See
1015 : * pgaio_enter_batchmode().
1016 : */
1017 : void
1018 21171302 : pgaio_submit_staged(void)
1019 : {
1020 21171302 : int total_submitted = 0;
1021 : int did_submit;
1022 :
1023 21171302 : if (pgaio_my_backend->num_staged_ios == 0)
1024 19989720 : return;
1025 :
1026 :
1027 1181582 : START_CRIT_SECTION();
1028 :
1029 1181582 : did_submit = pgaio_method_ops->submit(pgaio_my_backend->num_staged_ios,
1030 1181582 : pgaio_my_backend->staged_ios);
1031 :
1032 1181582 : END_CRIT_SECTION();
1033 :
1034 1181582 : total_submitted += did_submit;
1035 :
1036 : Assert(total_submitted == did_submit);
1037 :
1038 1181582 : pgaio_my_backend->num_staged_ios = 0;
1039 :
1040 1181582 : pgaio_debug(DEBUG4,
1041 : "aio: submitted %d IOs",
1042 : total_submitted);
1043 : }
1044 :
1045 :
1046 :
1047 : /* --------------------------------------------------------------------------------
1048 : * Other
1049 : * --------------------------------------------------------------------------------
1050 : */
1051 :
1052 :
1053 : /*
1054 : * Perform AIO related cleanup after an error.
1055 : *
1056 : * This should be called early in the error recovery paths, as later steps may
1057 : * need to issue AIO (e.g. to record a transaction abort WAL record).
1058 : */
1059 : void
1060 57748 : pgaio_error_cleanup(void)
1061 : {
1062 : /*
1063 : * It is possible that code errored out after pgaio_enter_batchmode() but
1064 : * before pgaio_exit_batchmode() was called. In that case we need to
1065 : * submit the IO now.
1066 : */
1067 57748 : if (pgaio_my_backend->in_batchmode)
1068 : {
1069 12 : pgaio_my_backend->in_batchmode = false;
1070 :
1071 12 : pgaio_submit_staged();
1072 : }
1073 :
1074 : /*
1075 : * As we aren't in batchmode, there shouldn't be any unsubmitted IOs.
1076 : */
1077 : Assert(pgaio_my_backend->num_staged_ios == 0);
1078 57748 : }
1079 :
1080 : /*
1081 : * Perform AIO related checks at (sub-)transactional boundaries.
1082 : *
1083 : * This should be called late during (sub-)transactional commit/abort, after
1084 : * all steps that might need to perform AIO, so that we can verify that the
1085 : * AIO subsystem is in a valid state at the end of a transaction.
1086 : */
1087 : void
1088 866638 : AtEOXact_Aio(bool is_commit)
1089 : {
1090 : /*
1091 : * We should never be in batch mode at transactional boundaries. In case
1092 : * an error was thrown while in batch mode, pgaio_error_cleanup() should
1093 : * have exited batchmode.
1094 : *
1095 : * In case we are in batchmode somehow, make sure to submit all staged
1096 : * IOs; other backends may need them to complete in order to continue.
1097 : */
1098 866638 : if (pgaio_my_backend->in_batchmode)
1099 : {
1100 0 : pgaio_error_cleanup();
1101 0 : elog(WARNING, "open AIO batch at end of (sub-)transaction");
1102 : }
1103 :
1104 : /*
1105 : * As we aren't in batchmode, there shouldn't be any unsubmitted IOs.
1106 : */
1107 : Assert(pgaio_my_backend->num_staged_ios == 0);
1108 866638 : }
1109 :
1110 : /*
1111 : * Submit staged but not yet submitted IOs that use the fd; otherwise the IO
1112 : * would end up targeting something bogus.
1113 : */
1114 : void
1115 16422292 : pgaio_closing_fd(int fd)
1116 : {
1117 : /*
1118 : * Might be called before AIO is initialized or in a subprocess that
1119 : * doesn't use AIO.
1120 : */
1121 16422292 : if (!pgaio_my_backend)
1122 13696 : return;
1123 :
1124 : /*
1125 : * For now just submit all staged IOs - we could be more selective, but
1126 : * it's probably not worth it.
1127 : */
1128 16408596 : pgaio_submit_staged();
1129 :
1130 : /*
1131 : * If requested by the IO method, wait for all IOs that use the
1132 : * to-be-closed FD.
1133 : */
1134 16408596 : if (pgaio_method_ops->wait_on_fd_before_close)
1135 : {
1136 : /*
1137 : * As waiting for one IO to complete may complete multiple IOs, we
1138 : * can't just use a mutable list iterator. The maximum number of
1139 : * in-flight IOs is fairly small, so just restart the loop after
1140 : * waiting for an IO.
1141 : */
1142 0 : while (!dclist_is_empty(&pgaio_my_backend->in_flight_ios))
1143 : {
1144 : dlist_iter iter;
1145 0 : PgAioHandle *ioh = NULL;
1146 :
1147 0 : dclist_foreach(iter, &pgaio_my_backend->in_flight_ios)
1148 : {
1149 0 : ioh = dclist_container(PgAioHandle, node, iter.cur);
1150 :
1151 0 : if (pgaio_io_uses_fd(ioh, fd))
1152 0 : break;
1153 : else
1154 0 : ioh = NULL;
1155 : }
1156 :
1157 0 : if (!ioh)
1158 0 : break;
1159 :
1160 : /* see comment in pgaio_io_wait_for_free() about raciness */
1161 0 : pgaio_io_wait(ioh, ioh->generation);
1162 : }
1163 : }
1164 : }
1165 :
1166 : /*
1167 : * Registered as before_shmem_exit() callback in pgaio_init_backend()
1168 : */
1169 : void
1170 39308 : pgaio_shutdown(int code, Datum arg)
1171 : {
1172 : Assert(pgaio_my_backend);
1173 : Assert(!pgaio_my_backend->handed_out_io);
1174 :
1175 : /* first clean up resources as we would at a transaction boundary */
1176 39308 : AtEOXact_Aio(code == 0);
1177 :
1178 : /*
1179 : * Before exiting, make sure that all IOs are finished. That has two main
1180 : * purposes:
1181 : *
1182 : * - Some kernel-level AIO mechanisms don't deal well with the issuer of
1183 : * an AIO exiting before IO completed
1184 : *
1185 : * - It'd be confusing to see partially finished IOs in stats views etc
1186 : */
1187 39308 : while (!dclist_is_empty(&pgaio_my_backend->in_flight_ios))
1188 : {
1189 0 : PgAioHandle *ioh = dclist_head_element(PgAioHandle, node, &pgaio_my_backend->in_flight_ios);
1190 :
1191 : /* see comment in pgaio_io_wait_for_free() about raciness */
1192 0 : pgaio_io_wait(ioh, ioh->generation);
1193 : }
1194 :
1195 39308 : pgaio_my_backend = NULL;
1196 39308 : }
1197 :
1198 : void
1199 2098 : assign_io_method(int newval, void *extra)
1200 : {
1201 : Assert(pgaio_method_ops_table[newval] != NULL);
1202 : Assert(newval < lengthof(io_method_options));
1203 :
1204 2098 : pgaio_method_ops = pgaio_method_ops_table[newval];
1205 2098 : }
1206 :
1207 : bool
1208 4126 : check_io_max_concurrency(int *newval, void **extra, GucSource source)
1209 : {
1210 4126 : if (*newval == -1)
1211 : {
1212 : /*
1213 : * Auto-tuning will be applied later during startup, as auto-tuning
1214 : * depends on the value of various GUCs.
1215 : */
1216 2098 : return true;
1217 : }
1218 2028 : else if (*newval == 0)
1219 : {
1220 0 : GUC_check_errdetail("Only -1 or values bigger than 0 are valid.");
1221 0 : return false;
1222 : }
1223 :
1224 2028 : return true;
1225 : }
1226 :
1227 :
1228 :
1229 : /* --------------------------------------------------------------------------------
1230 : * Injection point support
1231 : * --------------------------------------------------------------------------------
1232 : */
1233 :
1234 : #ifdef USE_INJECTION_POINTS
1235 :
1236 : /*
1237 : * Call injection point with support for pgaio_inj_io_get().
1238 : */
1239 : void
1240 3186478 : pgaio_io_call_inj(PgAioHandle *ioh, const char *injection_point)
1241 : {
1242 3186478 : pgaio_inj_cur_handle = ioh;
1243 :
1244 3186478 : PG_TRY();
1245 : {
1246 3186478 : InjectionPointCached(injection_point);
1247 : }
1248 0 : PG_FINALLY();
1249 : {
1250 3186478 : pgaio_inj_cur_handle = NULL;
1251 : }
1252 3186478 : PG_END_TRY();
1253 3186478 : }
1254 :
1255 : /*
1256 : * Return IO associated with injection point invocation. This is only needed
1257 : * as injection points currently don't support arguments.
1258 : */
1259 : PgAioHandle *
1260 0 : pgaio_inj_io_get(void)
1261 : {
1262 0 : return pgaio_inj_cur_handle;
1263 : }
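
/*
 * A sketch of how test code attached to one of the AIO injection points
 * (e.g. "AIO_PROCESS_COMPLETION_BEFORE_SHARED" above) could get at the
 * IO, assuming it is reached via pgaio_io_call_inj():
 *
 *		PgAioHandle *ioh = pgaio_inj_io_get();
 *
 *		if (ioh != NULL)
 *			elog(LOG, "injection point fired for io %d",
 *				 pgaio_io_get_id(ioh));
 */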
1264 :
1265 : #endif
|