Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * aio.c
4 : * AIO - Core Logic
5 : *
6 : * For documentation about how AIO works on a higher level, including a
7 : * schematic example, see README.md.
8 : *
9 : *
10 : * AIO is a complicated subsystem. To keep things navigable, it is split
11 : * across a number of files:
12 : *
13 : * - method_*.c - different ways of executing AIO (e.g. worker process)
14 : *
15 : * - aio_target.c - IO on different kinds of targets
16 : *
17 : * - aio_io.c - method-independent code for specific IO ops (e.g. readv)
18 : *
19 : * - aio_callback.c - callbacks at IO operation lifecycle events
20 : *
21 : * - aio_init.c - per-server and per-backend initialization
22 : *
23 : * - aio.c - all other topics
24 : *
25 : * - read_stream.c - helper for reading buffered relation data
26 : *
27 : * - README.md - higher-level overview over AIO
28 : *
29 : *
30 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
31 : * Portions Copyright (c) 1994, Regents of the University of California
32 : *
33 : * IDENTIFICATION
34 : * src/backend/storage/aio/aio.c
35 : *
36 : *-------------------------------------------------------------------------
37 : */
38 :
39 : #include "postgres.h"
40 :
41 : #include "lib/ilist.h"
42 : #include "miscadmin.h"
43 : #include "port/atomics.h"
44 : #include "storage/aio.h"
45 : #include "storage/aio_internal.h"
46 : #include "storage/aio_subsys.h"
47 : #include "utils/guc.h"
48 : #include "utils/guc_hooks.h"
49 : #include "utils/injection_point.h"
50 : #include "utils/resowner.h"
51 : #include "utils/wait_event_types.h"
52 :
53 :
54 : static inline void pgaio_io_update_state(PgAioHandle *ioh, PgAioHandleState new_state);
55 : static void pgaio_io_reclaim(PgAioHandle *ioh);
56 : static void pgaio_io_resowner_register(PgAioHandle *ioh);
57 : static void pgaio_io_wait_for_free(void);
58 : static PgAioHandle *pgaio_io_from_wref(PgAioWaitRef *iow, uint64 *ref_generation);
59 : static const char *pgaio_io_state_get_name(PgAioHandleState s);
60 : static void pgaio_io_wait(PgAioHandle *ioh, uint64 ref_generation);
61 :
62 :
63 : /* Values accepted for io_method; the list ends with a NULL-named entry. */
64 : const struct config_enum_entry io_method_options[] = {
65 : {"sync", IOMETHOD_SYNC, false},
66 : {"worker", IOMETHOD_WORKER, false},
67 : #ifdef IOMETHOD_IO_URING_ENABLED
68 : {"io_uring", IOMETHOD_IO_URING, false},
69 : #endif
70 : {NULL, 0, false}
71 : };
72 :
73 : /* GUC variables; io_max_concurrency starts out as -1 */
74 : int io_method = DEFAULT_IO_METHOD;
75 : int io_max_concurrency = -1;
76 :
77 : /* global AIO control structure (holds, among others, the io_handles array) */
78 : PgAioCtl *pgaio_ctl;
79 :
80 : /* per-backend AIO state of the current backend */
81 : PgAioBackend *pgaio_my_backend;
82 :
83 :
84 : static const IoMethodOps *const pgaio_method_ops_table[] = { /* indexed by IOMETHOD_* value */
85 : [IOMETHOD_SYNC] = &pgaio_sync_ops,
86 : [IOMETHOD_WORKER] = &pgaio_worker_ops,
87 : #ifdef IOMETHOD_IO_URING_ENABLED
88 : [IOMETHOD_IO_URING] = &pgaio_uring_ops,
89 : #endif
90 : };
91 :
92 : /* callbacks of the currently configured io_method, set by assign_io_method */
93 : const IoMethodOps *pgaio_method_ops;
94 :
95 :
96 : /* --------------------------------------------------------------------------------
97 : * Public Functions related to PgAioHandle
98 : * --------------------------------------------------------------------------------
99 : */
100 :
101 : /*
102 : * Acquire an AioHandle, waiting for IO completion if necessary.
103 : *
104 : * Each backend can only have one AIO handle that has been "handed out" to
105 : * code, but not yet submitted or released. This restriction is necessary to
106 : * ensure that it is possible for code to wait for an unused handle by waiting
107 : * for in-flight IO to complete. There is a limited number of handles in each
108 : * backend, if multiple handles could be handed out without being submitted,
109 : * waiting for all in-flight IO to complete would not guarantee that handles
110 : * free up.
111 : *
112 : * It is cheap to acquire an IO handle, unless all handles are in use. In that
113 : * case this function waits for the oldest IO to complete. If that is not
114 : * desirable, use pgaio_io_acquire_nb().
115 : *
116 : * If a handle was acquired but then does not turn out to be needed,
117 : * e.g. because pgaio_io_acquire() is called before starting an IO in a
118 : * critical section, the handle needs to be released with pgaio_io_release().
119 : *
120 : *
121 : * To react to the completion of the IO as soon as it is known to have
122 : * completed, callbacks can be registered with pgaio_io_register_callbacks().
123 : *
124 : * To actually execute IO using the returned handle, the pgaio_io_start_*()
125 : * family of functions is used. In many cases the pgaio_io_start_*() call will
126 : * not be done directly by code that acquired the handle, but by lower level
127 : * code that gets passed the handle. E.g. if code in bufmgr.c wants to perform
128 : * AIO, it typically will pass the handle to smgr.c, which will pass it on to
129 : * md.c, on to fd.c, which then finally calls pgaio_io_start_*(). This
130 : * forwarding allows the various layers to react to the IO's completion by
131 : * registering callbacks. These callbacks in turn can translate a lower
132 : * layer's result into a result understandable by a higher layer.
133 : *
134 : * During pgaio_io_start_*() the IO is staged (i.e. prepared for execution but
135 : * not submitted to the kernel). Unless in batchmode
136 : * (c.f. pgaio_enter_batchmode()), the IO will also get submitted for
137 : * execution. Note that, whether in batchmode or not, the IO might even
138 : * complete before the functions return.
139 : *
140 : * After pgaio_io_start_*() the AioHandle is "consumed" and may not be
141 : * referenced by the IO issuing code. To e.g. wait for IO, references to the
142 : * IO can be established with pgaio_io_get_wref() *before* pgaio_io_start_*()
143 : * is called. pgaio_wref_wait() can be used to wait for the IO to complete.
144 : *
145 : *
146 : * To know if the IO [partially] succeeded or failed, a PgAioReturn * can be
147 : * passed to pgaio_io_acquire(). Once the issuing backend has called
148 : * pgaio_wref_wait(), the PgAioReturn contains information about whether the
149 : * operation succeeded and details about the first failure, if any. The error
150 : * can be raised / logged with pgaio_result_report().
151 : *
152 : * The lifetime of the memory pointed to by *ret needs to be at least as long
153 : * as the passed in resowner. If the resowner releases resources before the IO
154 : * completes (typically due to an error), the reference to *ret will be
155 : * cleared. In case of resowner cleanup *ret will not be updated with the
156 : * results of the IO operation.
157 : */
158 : PgAioHandle *
159 11120 : pgaio_io_acquire(struct ResourceOwnerData *resowner, PgAioReturn *ret)
160 : {
161 : PgAioHandle *h;
162 :
163 : while (true)
164 : {
165 11120 : h = pgaio_io_acquire_nb(resowner, ret);
166 :
167 11116 : if (h != NULL)
168 5656 : return h;
169 :
170 : /*
171 : * Evidently all handles by this backend are in use. Just wait for
172 : * some to complete.
173 : */
174 5460 : pgaio_io_wait_for_free();
175 : }
176 : }
177 :
178 : /*
179 : * Acquire an AioHandle, returning NULL if no handles are free.
180 : *
181 : * See pgaio_io_acquire(). The only difference is that this function will return
182 : * NULL if there are no idle handles, instead of blocking.
183 : */
184 : PgAioHandle *
185 2496878 : pgaio_io_acquire_nb(struct ResourceOwnerData *resowner, PgAioReturn *ret)
186 : {
187 2496878 : if (pgaio_my_backend->num_staged_ios >= PGAIO_SUBMIT_BATCH_SIZE)
188 : {
189 : Assert(pgaio_my_backend->num_staged_ios == PGAIO_SUBMIT_BATCH_SIZE);
190 0 : pgaio_submit_staged();
191 : }
192 :
193 2496878 : if (pgaio_my_backend->handed_out_io)
194 4 : elog(ERROR, "API violation: Only one IO can be handed out");
195 :
196 2496874 : if (!dclist_is_empty(&pgaio_my_backend->idle_ios))
197 : {
198 2485954 : dlist_node *ion = dclist_pop_head_node(&pgaio_my_backend->idle_ios);
199 2485954 : PgAioHandle *ioh = dclist_container(PgAioHandle, node, ion);
200 :
201 : Assert(ioh->state == PGAIO_HS_IDLE);
202 : Assert(ioh->owner_procno == MyProcNumber);
203 :
204 2485954 : pgaio_io_update_state(ioh, PGAIO_HS_HANDED_OUT);
205 2485954 : pgaio_my_backend->handed_out_io = ioh;
206 :
207 2485954 : if (resowner)
208 2485954 : pgaio_io_resowner_register(ioh);
209 :
210 2485954 : if (ret)
211 : {
212 2485902 : ioh->report_return = ret;
213 2485902 : ret->result.status = PGAIO_RS_UNKNOWN;
214 : }
215 :
216 2485954 : return ioh;
217 : }
218 :
219 10920 : return NULL;
220 : }
221 :
222 : /*
223 : * Release IO handle that turned out to not be required.
224 : *
225 : * See pgaio_io_acquire() for more details.
226 : */
227 : void
228 3734 : pgaio_io_release(PgAioHandle *ioh)
229 : {
230 3734 : if (ioh == pgaio_my_backend->handed_out_io)
231 : {
232 : Assert(ioh->state == PGAIO_HS_HANDED_OUT);
233 : Assert(ioh->resowner);
234 :
235 3730 : pgaio_my_backend->handed_out_io = NULL;
236 3730 : pgaio_io_reclaim(ioh);
237 : }
238 : else
239 : {
240 4 : elog(ERROR, "release in unexpected state");
241 : }
242 3730 : }
243 :
244 : /*
245 : * Release IO handle during resource owner cleanup.
246 : */
247 : void
248 96 : pgaio_io_release_resowner(dlist_node *ioh_node, bool on_error)
249 : {
250 96 : PgAioHandle *ioh = dlist_container(PgAioHandle, resowner_node, ioh_node);
251 :
252 : Assert(ioh->resowner);
253 :
254 96 : ResourceOwnerForgetAioHandle(ioh->resowner, &ioh->resowner_node);
255 96 : ioh->resowner = NULL;
256 :
257 96 : switch (ioh->state)
258 : {
259 0 : case PGAIO_HS_IDLE:
260 0 : elog(ERROR, "unexpected");
261 : break;
262 66 : case PGAIO_HS_HANDED_OUT:
263 : Assert(ioh == pgaio_my_backend->handed_out_io || pgaio_my_backend->handed_out_io == NULL);
264 :
265 66 : if (ioh == pgaio_my_backend->handed_out_io)
266 : {
267 66 : pgaio_my_backend->handed_out_io = NULL;
268 66 : if (!on_error)
269 20 : elog(WARNING, "leaked AIO handle");
270 : }
271 :
272 66 : pgaio_io_reclaim(ioh);
273 66 : break;
274 0 : case PGAIO_HS_DEFINED:
275 : case PGAIO_HS_STAGED:
276 0 : if (!on_error)
277 0 : elog(WARNING, "AIO handle was not submitted");
278 0 : pgaio_submit_staged();
279 0 : break;
280 30 : case PGAIO_HS_SUBMITTED:
281 : case PGAIO_HS_COMPLETED_IO:
282 : case PGAIO_HS_COMPLETED_SHARED:
283 : case PGAIO_HS_COMPLETED_LOCAL:
284 : /* this is expected to happen */
285 30 : break;
286 : }
287 :
288 : /*
289 : * Need to unregister the reporting of the IO's result, the memory it's
290 : * referencing likely has gone away.
291 : */
292 96 : if (ioh->report_return)
293 30 : ioh->report_return = NULL;
294 96 : }
295 :
296 : /*
297 : * Add a [set of] flags to the IO.
298 : *
299 : * Note that this combines flags with already set flags, rather than set flags
300 : * to explicitly the passed in parameters. This is to allow multiple callsites
301 : * to set flags.
302 : */
303 : void
304 4961466 : pgaio_io_set_flag(PgAioHandle *ioh, PgAioHandleFlags flag)
305 : {
306 : Assert(ioh->state == PGAIO_HS_HANDED_OUT);
307 :
308 4961466 : ioh->flags |= flag;
309 4961466 : }
310 :
311 : /*
312 : * Returns an ID uniquely identifying the IO handle. This is only really
313 : * useful for logging, as handles are reused across multiple IOs.
314 : */
315 : int
316 1207642 : pgaio_io_get_id(PgAioHandle *ioh)
317 : {
318 : Assert(ioh >= pgaio_ctl->io_handles &&
319 : ioh < (pgaio_ctl->io_handles + pgaio_ctl->io_handle_count));
320 1207642 : return ioh - pgaio_ctl->io_handles;
321 : }
322 :
323 : /*
324 : * Return the ProcNumber for the process that can use an IO handle. The
325 : * mapping from IO handles to PGPROCs is static, therefore this even works
326 : * when the corresponding PGPROC is not in use.
327 : */
328 : ProcNumber
329 0 : pgaio_io_get_owner(PgAioHandle *ioh)
330 : {
331 0 : return ioh->owner_procno;
332 : }
333 :
334 : /*
335 : * Return a wait reference for the IO. Only wait references can be used to
336 : * wait for an IOs completion, as handles themselves can be reused after
337 : * completion. See also the comment above pgaio_io_acquire().
338 : */
339 : void
340 4964346 : pgaio_io_get_wref(PgAioHandle *ioh, PgAioWaitRef *iow)
341 : {
342 : Assert(ioh->state == PGAIO_HS_HANDED_OUT ||
343 : ioh->state == PGAIO_HS_DEFINED ||
344 : ioh->state == PGAIO_HS_STAGED);
345 : Assert(ioh->generation != 0);
346 :
347 4964346 : iow->aio_index = ioh - pgaio_ctl->io_handles;
348 4964346 : iow->generation_upper = (uint32) (ioh->generation >> 32);
349 4964346 : iow->generation_lower = (uint32) ioh->generation;
350 4964346 : }
351 :
352 :
353 :
354 : /* --------------------------------------------------------------------------------
355 : * Internal Functions related to PgAioHandle
356 : * --------------------------------------------------------------------------------
357 : */
358 :
359 : static inline void
360 19444580 : pgaio_io_update_state(PgAioHandle *ioh, PgAioHandleState new_state)
361 : {
362 19444580 : pgaio_debug_io(DEBUG5, ioh,
363 : "updating state to %s",
364 : pgaio_io_state_get_name(new_state));
365 :
366 : /*
367 : * Ensure the changes signified by the new state are visible before the
368 : * new state becomes visible.
369 : */
370 19444580 : pg_write_barrier();
371 :
372 19444580 : ioh->state = new_state;
373 19444580 : }
374 :
375 : static void
376 2485954 : pgaio_io_resowner_register(PgAioHandle *ioh)
377 : {
378 : Assert(!ioh->resowner);
379 : Assert(CurrentResourceOwner);
380 :
381 2485954 : ResourceOwnerRememberAioHandle(CurrentResourceOwner, &ioh->resowner_node);
382 2485954 : ioh->resowner = CurrentResourceOwner;
383 2485954 : }
384 :
385 : /*
386 : * Stage IO for execution and, if appropriate, submit it immediately.
387 : *
388 : * Should only be called from pgaio_io_start_*().
389 : */
390 : void
391 2482158 : pgaio_io_stage(PgAioHandle *ioh, PgAioOp op)
392 : {
393 : bool needs_synchronous;
394 :
395 : Assert(ioh->state == PGAIO_HS_HANDED_OUT);
396 : Assert(pgaio_my_backend->handed_out_io == ioh);
397 : Assert(pgaio_io_has_target(ioh));
398 :
399 2482158 : ioh->op = op;
400 2482158 : ioh->result = 0;
401 :
402 2482158 : pgaio_io_update_state(ioh, PGAIO_HS_DEFINED);
403 :
404 : /* allow a new IO to be staged */
405 2482158 : pgaio_my_backend->handed_out_io = NULL;
406 :
407 2482158 : pgaio_io_call_stage(ioh);
408 :
409 2482158 : pgaio_io_update_state(ioh, PGAIO_HS_STAGED);
410 :
411 : /*
412 : * Synchronous execution has to be executed, well, synchronously, so check
413 : * that first.
414 : */
415 2482158 : needs_synchronous = pgaio_io_needs_synchronous_execution(ioh);
416 :
417 2482158 : pgaio_debug_io(DEBUG3, ioh,
418 : "staged (synchronous: %d, in_batch: %d)",
419 : needs_synchronous, pgaio_my_backend->in_batchmode);
420 :
421 2482158 : if (!needs_synchronous)
422 : {
423 1149896 : pgaio_my_backend->staged_ios[pgaio_my_backend->num_staged_ios++] = ioh;
424 : Assert(pgaio_my_backend->num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
425 :
426 : /*
427 : * Unless code explicitly opted into batching IOs, submit the IO
428 : * immediately.
429 : */
430 1149896 : if (!pgaio_my_backend->in_batchmode)
431 48226 : pgaio_submit_staged();
432 : }
433 : else
434 : {
435 1332262 : pgaio_io_prepare_submit(ioh);
436 1332262 : pgaio_io_perform_synchronously(ioh);
437 : }
438 2482158 : }
439 :
440 : bool
441 2482158 : pgaio_io_needs_synchronous_execution(PgAioHandle *ioh)
442 : {
443 : /*
444 : * If the caller said to execute the IO synchronously, do so.
445 : *
446 : * XXX: We could optimize the logic when to execute synchronously by first
447 : * checking if there are other IOs in flight and only synchronously
448 : * executing if not. Unclear whether that'll be sufficiently common to be
449 : * worth worrying about.
450 : */
451 2482158 : if (ioh->flags & PGAIO_HF_SYNCHRONOUS)
452 1324236 : return true;
453 :
454 : /* Check if the IO method requires synchronous execution of IO */
455 1157922 : if (pgaio_method_ops->needs_synchronous_execution)
456 1157922 : return pgaio_method_ops->needs_synchronous_execution(ioh);
457 :
458 0 : return false;
459 : }
460 :
461 : /*
462 : * Handle IO being processed by IO method.
463 : *
464 : * Should be called by IO methods / synchronous IO execution, just before the
465 : * IO is performed.
466 : */
467 : void
468 2482158 : pgaio_io_prepare_submit(PgAioHandle *ioh)
469 : {
470 2482158 : pgaio_io_update_state(ioh, PGAIO_HS_SUBMITTED);
471 :
472 2482158 : dclist_push_tail(&pgaio_my_backend->in_flight_ios, &ioh->node);
473 2482158 : }
474 :
475 : /*
476 : * Handle IO getting completed by a method.
477 : *
478 : * Should be called by IO methods / synchronous IO execution, just after the
479 : * IO has been performed.
480 : *
481 : * Expects to be called in a critical section. We expect IOs to be usable for
482 : * WAL etc, which requires being able to execute completion callbacks in a
483 : * critical section.
484 : */
485 : void
486 2272020 : pgaio_io_process_completion(PgAioHandle *ioh, int result)
487 : {
488 : Assert(ioh->state == PGAIO_HS_SUBMITTED);
489 :
490 : Assert(CritSectionCount > 0);
491 :
492 2272020 : ioh->result = result;
493 :
494 2272020 : pgaio_io_update_state(ioh, PGAIO_HS_COMPLETED_IO);
495 :
496 2272020 : INJECTION_POINT("aio-process-completion-before-shared", ioh);
497 :
498 2272020 : pgaio_io_call_complete_shared(ioh);
499 :
500 2272020 : pgaio_io_update_state(ioh, PGAIO_HS_COMPLETED_SHARED);
501 :
502 : /* condition variable broadcast ensures state is visible before wakeup */
503 2272020 : ConditionVariableBroadcast(&ioh->cv);
504 :
505 : /* contains call to pgaio_io_call_complete_local() */
506 2272020 : if (ioh->owner_procno == MyProcNumber)
507 1332262 : pgaio_io_reclaim(ioh);
508 2272020 : }
509 :
510 : /*
511 : * Has the IO completed and thus the IO handle been reused?
512 : *
513 : * This is useful when waiting for IO completion at a low level (e.g. in an IO
514 : * method's ->wait_one() callback).
515 : */
516 : bool
517 3625064 : pgaio_io_was_recycled(PgAioHandle *ioh, uint64 ref_generation, PgAioHandleState *state)
518 : {
519 3625064 : *state = ioh->state;
520 3625064 : pg_read_barrier();
521 :
522 3625064 : return ioh->generation != ref_generation;
523 : }
524 :
525 : /*
526 : * Wait for the IO identified by handle + generation to complete. External
527 : * code should never use this, outside of the AIO subsystem waits are only
527 : * allowed via pgaio_wref_wait(). Returns immediately if the handle has
527 : * already been recycled; the owning backend also reclaims the handle.
528 : */
529 : static void
530 497024 : pgaio_io_wait(PgAioHandle *ioh, uint64 ref_generation)
531 : {
532 : PgAioHandleState state;
533 : bool am_owner;
534 :
535 497024 : am_owner = ioh->owner_procno == MyProcNumber;
536 :
537 497024 : if (pgaio_io_was_recycled(ioh, ref_generation, &state))
538 68 : return;
539 :
540 496956 : if (am_owner)
541 : {
542 493764 : if (state != PGAIO_HS_SUBMITTED
543 122314 : && state != PGAIO_HS_COMPLETED_IO
544 678 : && state != PGAIO_HS_COMPLETED_SHARED
545 0 : && state != PGAIO_HS_COMPLETED_LOCAL)
546 : {
547 0 : elog(PANIC, "waiting for own IO in wrong state: %d",
548 : state);
549 : }
550 : }
551 :
552 : while (true)
553 : {
554 992984 : if (pgaio_io_was_recycled(ioh, ref_generation, &state))
555 1936 : return;
556 :
557 991048 : switch (state)
558 : {
559 0 : case PGAIO_HS_IDLE:
560 : case PGAIO_HS_HANDED_OUT:
561 0 : elog(ERROR, "IO in wrong state: %d", state);
562 : break;
563 :
564 373208 : case PGAIO_HS_SUBMITTED:
565 :
566 : /*
567 : * If the IO method provides ->wait_one(), wait through it and
568 : * then re-check the state from the top of the loop. Don't
569 : * check via the IO method if the issuing backend is executing
570 : * the IO synchronously.
571 : */
572 373208 : if (pgaio_method_ops->wait_one && !(ioh->flags & PGAIO_HF_SYNCHRONOUS))
573 : {
574 0 : pgaio_method_ops->wait_one(ioh, ref_generation);
575 0 : continue;
576 : }
577 : /* fallthrough */
578 :
579 : /* DEFINED / STAGED: waiting for owner to submit */
580 : case PGAIO_HS_DEFINED:
581 : case PGAIO_HS_STAGED:
582 : /* COMPLETED_IO: waiting for reaper to complete */
583 : /* fallthrough */
584 : case PGAIO_HS_COMPLETED_IO:
585 : /* shouldn't be able to hit this otherwise */
586 : Assert(IsUnderPostmaster);
587 : /* ensure we're going to get woken up */
588 496028 : ConditionVariablePrepareToSleep(&ioh->cv);
589 :
590 990670 : while (!pgaio_io_was_recycled(ioh, ref_generation, &state))
591 : {
592 988746 : if (state == PGAIO_HS_COMPLETED_SHARED ||
593 494670 : state == PGAIO_HS_COMPLETED_LOCAL)
594 : break;
595 494642 : ConditionVariableSleep(&ioh->cv, WAIT_EVENT_AIO_IO_COMPLETION);
596 : }
597 :
598 496028 : ConditionVariableCancelSleep();
599 496028 : break;
600 :
601 495020 : case PGAIO_HS_COMPLETED_SHARED:
602 : case PGAIO_HS_COMPLETED_LOCAL:
603 : /* the IO is done; only the owning backend reclaims the handle */
604 495020 : if (am_owner)
605 493764 : pgaio_io_reclaim(ioh);
606 495020 : return;
607 : }
608 496028 : }
609 : }
609 :
610 : /*
611 : * Make IO handle ready to be reused after IO has completed or after the
612 : * handle has been released without being used. For an IO in state
612 : * COMPLETED_SHARED this also runs the local completion callbacks and, if
612 : * requested, reports the result. Must only be called for our own handles.
613 : */
614 : static void
615 2485954 : pgaio_io_reclaim(PgAioHandle *ioh)
616 : {
617 : /* This is only ok if it's our IO */
618 : Assert(ioh->owner_procno == MyProcNumber);
619 : Assert(ioh->state != PGAIO_HS_IDLE);
620 :
621 : /*
622 : * It's a bit ugly, but right now the easiest place to put the execution
623 : * of local completion callbacks is this function, as we need to execute
624 : * local callbacks just before reclaiming at multiple callsites.
625 : */
626 2485954 : if (ioh->state == PGAIO_HS_COMPLETED_SHARED)
627 : {
628 : PgAioResult local_result;
629 :
630 2482158 : local_result = pgaio_io_call_complete_local(ioh);
631 2482158 : pgaio_io_update_state(ioh, PGAIO_HS_COMPLETED_LOCAL);
632 :
633 2482158 : if (ioh->report_return)
634 : {
635 2482128 : ioh->report_return->result = local_result;
636 2482128 : ioh->report_return->target_data = ioh->target_data;
637 : }
638 : }
639 :
640 2485954 : pgaio_debug_io(DEBUG4, ioh,
641 : "reclaiming: distilled_result: (status %s, id %u, error_data %d), raw_result: %d",
642 : pgaio_result_status_string(ioh->distilled_result.status),
643 : ioh->distilled_result.id,
644 : ioh->distilled_result.error_data,
645 : ioh->result);
646 :
647 : /* unless the handle was merely handed out, it's on the in-flight list, remove */
648 2485954 : if (ioh->state != PGAIO_HS_HANDED_OUT)
649 2482158 : dclist_delete_from(&pgaio_my_backend->in_flight_ios, &ioh->node);
650 :
651 2485954 : if (ioh->resowner)
652 : {
653 2485858 : ResourceOwnerForgetAioHandle(ioh->resowner, &ioh->resowner_node);
654 2485858 : ioh->resowner = NULL;
655 : }
656 :
657 : Assert(!ioh->resowner);
658 :
659 : /*
660 : * Update generation & state first, before resetting the IO's fields,
661 : * otherwise a concurrent "viewer" could think the fields are valid, even
662 : * though they are being reset. Increment the generation first, so that
663 : * we can assert elsewhere that we never wait for an IDLE IO. While it's
664 : * a bit weird for the state to go backwards for a generation, it's OK
665 : * here, as there cannot be references to the "reborn" IO yet. Can't
666 : * update both at once, so something has to give.
667 : */
668 2485954 : ioh->generation++;
669 2485954 : pgaio_io_update_state(ioh, PGAIO_HS_IDLE);
670 :
671 : /* ensure the state update is visible before we reset fields */
672 2485954 : pg_write_barrier();
673 :
674 2485954 : ioh->op = PGAIO_OP_INVALID;
675 2485954 : ioh->target = PGAIO_TID_INVALID;
676 2485954 : ioh->flags = 0;
677 2485954 : ioh->num_callbacks = 0;
678 2485954 : ioh->handle_data_len = 0;
679 2485954 : ioh->report_return = NULL;
680 2485954 : ioh->result = 0;
681 2485954 : ioh->distilled_result.status = PGAIO_RS_UNKNOWN;
682 :
683 : /*
684 : * We push the IO to the head of the idle IO list, that seems more cache
685 : * efficient in cases where only a few IOs are used.
686 : */
687 2485954 : dclist_push_head(&pgaio_my_backend->idle_ios, &ioh->node);
688 2485954 : }
689 :
690 : /*
691 : * Wait for an IO handle to become usable.
692 : *
693 : * This only really is useful for pgaio_io_acquire().
694 : */
695 : static void
696 5460 : pgaio_io_wait_for_free(void)
697 : {
698 5460 : int reclaimed = 0;
699 :
700 5460 : pgaio_debug(DEBUG2, "waiting for free IO with %d pending, %d in-flight, %d idle IOs",
701 : pgaio_my_backend->num_staged_ios,
702 : dclist_count(&pgaio_my_backend->in_flight_ios),
703 : dclist_is_empty(&pgaio_my_backend->idle_ios));
704 :
705 : /*
706 : * First check if any of our IOs actually have completed - when using
707 : * worker, that'll often be the case. We could do so as part of the loop
708 : * below, but that'd potentially lead us to wait for some IO submitted
709 : * before.
710 : */
711 10920 : for (int i = 0; i < io_max_concurrency; i++)
712 : {
713 5460 : PgAioHandle *ioh = &pgaio_ctl->io_handles[pgaio_my_backend->io_handle_off + i];
714 :
715 5460 : if (ioh->state == PGAIO_HS_COMPLETED_SHARED)
716 : {
717 4554 : pgaio_io_reclaim(ioh);
718 4554 : reclaimed++;
719 : }
720 : }
721 :
722 5460 : if (reclaimed > 0)
723 4554 : return;
724 :
725 : /*
726 : * If we have any unsubmitted IOs, submit them now. We'll start waiting in
727 : * a second, so it's better they're in flight. This also addresses the
728 : * edge-case that all IOs are unsubmitted.
729 : */
730 906 : if (pgaio_my_backend->num_staged_ios > 0)
731 0 : pgaio_submit_staged();
732 :
733 906 : if (dclist_count(&pgaio_my_backend->in_flight_ios) == 0)
734 0 : ereport(ERROR,
735 : errmsg_internal("no free IOs despite no in-flight IOs"),
736 : errdetail_internal("%d pending, %d in-flight, %d idle IOs",
737 : pgaio_my_backend->num_staged_ios,
738 : dclist_count(&pgaio_my_backend->in_flight_ios),
739 : dclist_is_empty(&pgaio_my_backend->idle_ios)));
740 :
741 : /*
742 : * Wait for the oldest in-flight IO to complete.
743 : *
744 : * XXX: Reusing the general IO wait is suboptimal, we don't need to wait
745 : * for that specific IO to complete, we just need *any* IO to complete.
746 : */
747 : {
748 906 : PgAioHandle *ioh = dclist_head_element(PgAioHandle, node,
749 : &pgaio_my_backend->in_flight_ios);
750 :
751 906 : switch (ioh->state)
752 : {
753 : /* should not be in in-flight list */
754 0 : case PGAIO_HS_IDLE:
755 : case PGAIO_HS_DEFINED:
756 : case PGAIO_HS_HANDED_OUT:
757 : case PGAIO_HS_STAGED:
758 : case PGAIO_HS_COMPLETED_LOCAL:
759 0 : elog(ERROR, "shouldn't get here with io:%d in state %d",
760 : pgaio_io_get_id(ioh), ioh->state);
761 : break;
762 :
763 902 : case PGAIO_HS_COMPLETED_IO:
764 : case PGAIO_HS_SUBMITTED:
765 902 : pgaio_debug_io(DEBUG2, ioh,
766 : "waiting for free io with %d in flight",
767 : dclist_count(&pgaio_my_backend->in_flight_ios));
768 :
769 : /*
770 : * In a more general case this would be racy, because the
771 : * generation could increase after we read ioh->state above.
772 : * But we are only looking at IOs by the current backend and
773 : * the IO can only be recycled by this backend.
774 : */
775 902 : pgaio_io_wait(ioh, ioh->generation);
776 902 : break;
777 :
778 4 : case PGAIO_HS_COMPLETED_SHARED:
779 : /* it's possible that another backend just finished this IO */
780 4 : pgaio_io_reclaim(ioh);
781 4 : break;
782 : }
783 :
784 906 : if (dclist_count(&pgaio_my_backend->idle_ios) == 0)
785 0 : elog(PANIC, "no idle IO after waiting for IO to terminate");
786 906 : return;
787 : }
788 : }
789 :
790 : /*
791 : * Internal - code outside of AIO should never need this and it'd be hard for
792 : * such code to be safe.
793 : */
794 : static PgAioHandle *
795 1640490 : pgaio_io_from_wref(PgAioWaitRef *iow, uint64 *ref_generation)
796 : {
797 : PgAioHandle *ioh;
798 :
799 : Assert(iow->aio_index < pgaio_ctl->io_handle_count);
800 :
801 1640490 : ioh = &pgaio_ctl->io_handles[iow->aio_index];
802 :
803 1640490 : *ref_generation = ((uint64) iow->generation_upper) << 32 |
804 1640490 : iow->generation_lower;
805 :
806 : Assert(*ref_generation != 0);
807 :
808 1640490 : return ioh;
809 : }
810 :
811 : static const char *
812 14310 : pgaio_io_state_get_name(PgAioHandleState s)
813 : {
814 : #define PGAIO_HS_TOSTR_CASE(sym) case PGAIO_HS_##sym: return #sym
815 14310 : switch (s)
816 : {
817 0 : PGAIO_HS_TOSTR_CASE(IDLE);
818 4764 : PGAIO_HS_TOSTR_CASE(HANDED_OUT);
819 2382 : PGAIO_HS_TOSTR_CASE(DEFINED);
820 2382 : PGAIO_HS_TOSTR_CASE(STAGED);
821 0 : PGAIO_HS_TOSTR_CASE(SUBMITTED);
822 2382 : PGAIO_HS_TOSTR_CASE(COMPLETED_IO);
823 2400 : PGAIO_HS_TOSTR_CASE(COMPLETED_SHARED);
824 0 : PGAIO_HS_TOSTR_CASE(COMPLETED_LOCAL);
825 : }
826 : #undef PGAIO_HS_TOSTR_CASE
827 :
828 0 : return NULL; /* silence compiler */
829 : }
830 :
831 : const char *
832 14310 : pgaio_io_get_state_name(PgAioHandle *ioh)
833 : {
834 14310 : return pgaio_io_state_get_name(ioh->state);
835 : }
836 :
837 : const char *
838 4764 : pgaio_result_status_string(PgAioResultStatus rs)
839 : {
840 4764 : switch (rs)
841 : {
842 0 : case PGAIO_RS_UNKNOWN:
843 0 : return "UNKNOWN";
844 4404 : case PGAIO_RS_OK:
845 4404 : return "OK";
846 136 : case PGAIO_RS_WARNING:
847 136 : return "WARNING";
848 40 : case PGAIO_RS_PARTIAL:
849 40 : return "PARTIAL";
850 184 : case PGAIO_RS_ERROR:
851 184 : return "ERROR";
852 : }
853 :
854 0 : return NULL; /* silence compiler */
855 : }
856 :
857 :
858 :
859 : /* --------------------------------------------------------------------------------
860 : * Functions primarily related to IO Wait References
861 : * --------------------------------------------------------------------------------
862 : */
863 :
864 : /*
865 : * Mark a wait reference as invalid
866 : */
867 : void
868 25750748 : pgaio_wref_clear(PgAioWaitRef *iow)
869 : {
870 25750748 : iow->aio_index = PG_UINT32_MAX;
871 25750748 : }
872 :
873 : /* Is the wait reference valid? */
874 : bool
875 5067840 : pgaio_wref_valid(PgAioWaitRef *iow)
876 : {
877 5067840 : return iow->aio_index != PG_UINT32_MAX;
878 : }
879 :
880 : /*
881 : * Similar to pgaio_io_get_id(), just for wait references.
882 : */
883 : int
884 0 : pgaio_wref_get_id(PgAioWaitRef *iow)
885 : {
886 : Assert(pgaio_wref_valid(iow));
887 0 : return iow->aio_index;
888 : }
889 :
890 : /*
891 : * Wait for the IO to have completed. Can be called in any process, not just
892 : * in the issuing backend.
893 : */
894 : void
895 496104 : pgaio_wref_wait(PgAioWaitRef *iow)
896 : {
897 : uint64 ref_generation;
898 : PgAioHandle *ioh;
899 :
900 496104 : ioh = pgaio_io_from_wref(iow, &ref_generation);
901 :
902 496104 : pgaio_io_wait(ioh, ref_generation);
903 496104 : }
904 :
905 : /*
906 : * Check if the referenced IO completed, without blocking.
907 : */
908 : bool
909 1144386 : pgaio_wref_check_done(PgAioWaitRef *iow)
910 : {
911 : uint64 ref_generation;
912 : PgAioHandleState state;
913 : bool am_owner;
914 : PgAioHandle *ioh;
915 :
916 1144386 : ioh = pgaio_io_from_wref(iow, &ref_generation);
917 :
918 1144386 : if (pgaio_io_was_recycled(ioh, ref_generation, &state))
919 0 : return true;
920 :
921 1144386 : if (state == PGAIO_HS_IDLE)
922 0 : return true;
923 :
924 1144386 : am_owner = ioh->owner_procno == MyProcNumber;
925 :
926 1144386 : if (state == PGAIO_HS_COMPLETED_SHARED ||
927 492812 : state == PGAIO_HS_COMPLETED_LOCAL)
928 : {
929 651574 : if (am_owner)
930 651574 : pgaio_io_reclaim(ioh);
931 651574 : return true;
932 : }
933 :
934 : /*
935 : * XXX: It likely would be worth checking in with the io method, to give
936 : * the IO method a chance to check if there are completion events queued.
937 : */
938 :
939 492812 : return false;
940 : }
941 :
942 :
943 :
944 : /* --------------------------------------------------------------------------------
945 : * Actions on multiple IOs.
946 : * --------------------------------------------------------------------------------
947 : */
948 :
949 : /*
950 : * Submit IOs in batches going forward.
951 : *
952 : * Submitting multiple IOs at once can be substantially faster than doing so
953 : * one-by-one. At the same time, submitting multiple IOs at once requires more
954 : * care to avoid deadlocks.
955 : *
956 : * Consider backend A staging an IO for buffer 1 and then trying to start IO
957 : * on buffer 2, while backend B does the inverse. If A submitted the IO before
958 : * moving on to buffer 2, this works just fine, B will wait for the IO to
959 : * complete. But if batching were used, each backend will wait for IO that has
960 : * not yet been submitted to complete, i.e. forever.
961 : *
962 : * End batch submission mode with pgaio_exit_batchmode(). (Throwing errors is
963 : * allowed; error recovery will end the batch.)
964 : *
965 : * To avoid deadlocks, code needs to ensure that it will not wait for another
966 : * backend while there is unsubmitted IO. E.g. by using conditional lock
967 : * acquisition when acquiring buffer locks. To check if there currently are
968 : * staged IOs, call pgaio_have_staged() and to submit all staged IOs call
969 : * pgaio_submit_staged().
970 : *
971 : * It is not allowed to enter batchmode while already in batchmode, it's
972 : * unlikely to ever be needed, as code needs to be explicitly aware of being
973 : * called in batchmode, to avoid the deadlock risks explained above.
974 : *
975 : * Note that IOs may get submitted before pgaio_exit_batchmode() is called,
976 : * e.g. because too many IOs have been staged or because pgaio_submit_staged()
977 : * was called.
978 : */
979 : void
980 5407554 : pgaio_enter_batchmode(void)
981 : {
982 5407554 : if (pgaio_my_backend->in_batchmode)
983 0 : elog(ERROR, "starting batch while batch already in progress");
984 5407554 : pgaio_my_backend->in_batchmode = true;
985 5407554 : }
986 :
987 : /*
988 : * Stop submitting IOs in batches.
989 : */
990 : void
991 5407534 : pgaio_exit_batchmode(void)
992 : {
993 : Assert(pgaio_my_backend->in_batchmode);
994 :
995 5407534 : pgaio_submit_staged();
996 5407534 : pgaio_my_backend->in_batchmode = false;
997 5407534 : }
998 :
999 : /*
1000 : * Are there staged but unsubmitted IOs?
1001 : *
1002 : * See comment above pgaio_enter_batchmode() for why code may need to check if
1003 : * there is IO in that state.
1004 : */
1005 : bool
1006 2485758 : pgaio_have_staged(void)
1007 : {
1008 : Assert(pgaio_my_backend->in_batchmode ||
1009 : pgaio_my_backend->num_staged_ios == 0);
1010 2485758 : return pgaio_my_backend->num_staged_ios > 0;
1011 : }
1012 :
1013 : /*
1014 : * Submit all staged but not yet submitted IOs.
1015 : *
1016 : * Unless in batch mode, this never needs to be called, as IOs get submitted
1017 : * as soon as possible. While in batchmode pgaio_submit_staged() can be called
1018 : * before waiting on another backend, to avoid the risk of deadlocks. See
1019 : * pgaio_enter_batchmode().
1020 : */
1021 : void
1022 5461244 : pgaio_submit_staged(void)
1023 : {
1024 5461244 : int total_submitted = 0;
1025 : int did_submit;
1026 :
1027 5461244 : if (pgaio_my_backend->num_staged_ios == 0)
1028 4312480 : return;
1029 :
1030 :
1031 1148764 : START_CRIT_SECTION();
1032 :
1033 1148764 : did_submit = pgaio_method_ops->submit(pgaio_my_backend->num_staged_ios,
1034 1148764 : pgaio_my_backend->staged_ios);
1035 :
1036 1148764 : END_CRIT_SECTION();
1037 :
1038 1148764 : total_submitted += did_submit;
1039 :
1040 : Assert(total_submitted == did_submit);
1041 :
1042 1148764 : pgaio_my_backend->num_staged_ios = 0;
1043 :
1044 1148764 : pgaio_debug(DEBUG4,
1045 : "aio: submitted %d IOs",
1046 : total_submitted);
1047 : }
1048 :
1049 :
1050 :
1051 : /* --------------------------------------------------------------------------------
1052 : * Other
1053 : * --------------------------------------------------------------------------------
1054 : */
1055 :
1056 :
1057 : /*
1058 : * Perform AIO related cleanup after an error.
1059 : *
1060 : * This should be called early in the error recovery paths, as later steps may
1061 : * need to issue AIO (e.g. to record a transaction abort WAL record).
1062 : */
1063 : void
1064 58474 : pgaio_error_cleanup(void)
1065 : {
1066 : /*
1067 : * It is possible that code errored out after pgaio_enter_batchmode() but
1068 : * before pgaio_exit_batchmode() was called. In that case we need to
1069 : * submit the IO now.
1070 : */
1071 58474 : if (pgaio_my_backend->in_batchmode)
1072 : {
1073 20 : pgaio_my_backend->in_batchmode = false;
1074 :
1075 20 : pgaio_submit_staged();
1076 : }
1077 :
1078 : /*
1079 : * As we aren't in batchmode, there shouldn't be any unsubmitted IOs.
1080 : */
1081 : Assert(pgaio_my_backend->num_staged_ios == 0);
1082 58474 : }
1083 :
1084 : /*
1085 : * Perform AIO related checks at (sub-)transactional boundaries.
1086 : *
1087 : * This should be called late during (sub-)transactional commit/abort, after
1088 : * all steps that might need to perform AIO, so that we can verify that the
1089 : * AIO subsystem is in a valid state at the end of a transaction.
1090 : */
1091 : void
1092 917988 : AtEOXact_Aio(bool is_commit)
1093 : {
1094 : /*
1095 : * We should never be in batch mode at transactional boundaries. In case
1096 : * an error was thrown while in batch mode, pgaio_error_cleanup() should
1097 : * have exited batchmode.
1098 : *
1099 : * In case we are in batchmode somehow, make sure to submit all staged
1100 : * IOs, other backends may need them to complete to continue.
1101 : */
1102 917988 : if (pgaio_my_backend->in_batchmode)
1103 : {
1104 8 : pgaio_error_cleanup();
1105 8 : elog(WARNING, "open AIO batch at end of (sub-)transaction");
1106 : }
1107 :
1108 : /*
1109 : * As we aren't in batchmode, there shouldn't be any unsubmitted IOs.
1110 : */
1111 : Assert(pgaio_my_backend->num_staged_ios == 0);
1112 917988 : }
1113 :
1114 : /*
1115 : * Need to submit staged but not yet submitted IOs using the fd, otherwise
1116 : * the IO would end up targeting something bogus.
1117 : */
1118 : void
1119 16513558 : pgaio_closing_fd(int fd)
1120 : {
1121 : /*
1122 : * Might be called before AIO is initialized or in a subprocess that
1123 : * doesn't use AIO.
1124 : */
1125 16513558 : if (!pgaio_my_backend)
1126 13980 : return;
1127 :
1128 : /*
1129 : * For now just submit all staged IOs - we could be more selective, but
1130 : * it's probably not worth it.
1131 : */
1132 16499578 : if (pgaio_my_backend->num_staged_ios > 0)
1133 : {
1134 4 : pgaio_debug(DEBUG2,
1135 : "submitting %d IOs before FD %d gets closed",
1136 : pgaio_my_backend->num_staged_ios, fd);
1137 4 : pgaio_submit_staged();
1138 : }
1139 :
1140 : /*
1141 : * If requested by the IO method, wait for all IOs that use the
1142 : * to-be-closed FD.
1143 : */
1144 16499578 : if (pgaio_method_ops->wait_on_fd_before_close)
1145 : {
1146 : /*
1147 : * As waiting for one IO to complete may complete multiple IOs, we
1148 : * can't just use a mutable list iterator. The maximum number of
1149 : * in-flight IOs is fairly small, so just restart the loop after
1150 : * waiting for an IO.
1151 : */
1152 0 : while (!dclist_is_empty(&pgaio_my_backend->in_flight_ios))
1153 : {
1154 : dlist_iter iter;
1155 0 : PgAioHandle *ioh = NULL;
1156 :
1157 0 : dclist_foreach(iter, &pgaio_my_backend->in_flight_ios)
1158 : {
1159 0 : ioh = dclist_container(PgAioHandle, node, iter.cur);
1160 :
1161 0 : if (pgaio_io_uses_fd(ioh, fd))
1162 0 : break;
1163 : else
1164 0 : ioh = NULL;
1165 : }
1166 :
1167 0 : if (!ioh)
1168 0 : break;
1169 :
1170 0 : pgaio_debug_io(DEBUG2, ioh,
1171 : "waiting for IO before FD %d gets closed, %d in-flight IOs",
1172 : fd, dclist_count(&pgaio_my_backend->in_flight_ios));
1173 :
1174 : /* see comment in pgaio_io_wait_for_free() about raciness */
1175 0 : pgaio_io_wait(ioh, ioh->generation);
1176 : }
1177 : }
1178 : }
1179 :
1180 : /*
1181 : * Registered as before_shmem_exit() callback in pgaio_init_backend()
1182 : */
1183 : void
1184 40282 : pgaio_shutdown(int code, Datum arg)
1185 : {
1186 : Assert(pgaio_my_backend);
1187 : Assert(!pgaio_my_backend->handed_out_io);
1188 :
1189 : /* first clean up resources as we would at a transaction boundary */
1190 40282 : AtEOXact_Aio(code == 0);
1191 :
1192 : /*
1193 : * Before exiting, make sure that all IOs are finished. That has two main
1194 : * purposes:
1195 : *
1196 : * - Some kernel-level AIO mechanisms don't deal well with the issuer of
1197 : * an AIO exiting before IO completed
1198 : *
1199 : * - It'd be confusing to see partially finished IOs in stats views etc
1200 : */
1201 40300 : while (!dclist_is_empty(&pgaio_my_backend->in_flight_ios))
1202 : {
1203 18 : PgAioHandle *ioh = dclist_head_element(PgAioHandle, node, &pgaio_my_backend->in_flight_ios);
1204 :
1205 18 : pgaio_debug_io(DEBUG2, ioh,
1206 : "waiting for IO to complete during shutdown, %d in-flight IOs",
1207 : dclist_count(&pgaio_my_backend->in_flight_ios));
1208 :
1209 : /* see comment in pgaio_io_wait_for_free() about raciness */
1210 18 : pgaio_io_wait(ioh, ioh->generation);
1211 : }
1212 :
1213 40282 : pgaio_my_backend = NULL;
1214 40282 : }
1215 :
1216 : void
1217 2190 : assign_io_method(int newval, void *extra)
1218 : {
1219 : Assert(pgaio_method_ops_table[newval] != NULL);
1220 : Assert(newval < lengthof(io_method_options));
1221 :
1222 2190 : pgaio_method_ops = pgaio_method_ops_table[newval];
1223 2190 : }
1224 :
1225 : bool
1226 4264 : check_io_max_concurrency(int *newval, void **extra, GucSource source)
1227 : {
1228 4264 : if (*newval == -1)
1229 : {
1230 : /*
1231 : * Auto-tuning will be applied later during startup, as auto-tuning
1232 : * depends on the value of various GUCs.
1233 : */
1234 2168 : return true;
1235 : }
1236 2096 : else if (*newval == 0)
1237 : {
1238 0 : GUC_check_errdetail("Only -1 or values bigger than 0 are valid.");
1239 0 : return false;
1240 : }
1241 :
1242 2096 : return true;
1243 : }
|