Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * aio_init.c
4 : * AIO - Subsystem Initialization
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/storage/aio/aio_init.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "miscadmin.h"
18 : #include "storage/aio.h"
19 : #include "storage/aio_internal.h"
20 : #include "storage/aio_subsys.h"
21 : #include "storage/bufmgr.h"
22 : #include "storage/io_worker.h"
23 : #include "storage/ipc.h"
24 : #include "storage/proc.h"
25 : #include "storage/shmem.h"
26 : #include "utils/guc.h"
27 :
28 :
29 :
30 : static Size
31 7858 : AioCtlShmemSize(void)
32 : {
33 : Size sz;
34 :
35 : /* pgaio_ctl itself */
36 7858 : sz = offsetof(PgAioCtl, io_handles);
37 :
38 7858 : return sz;
39 : }
40 :
41 : static uint32
42 335160 : AioProcs(void)
43 : {
44 : /*
45 : * While AIO workers don't need their own AIO context, we can't currently
46 : * guarantee nothing gets assigned to the a ProcNumber for an IO worker if
47 : * we just subtracted MAX_IO_WORKERS.
48 : */
49 335160 : return MaxBackends + NUM_AUXILIARY_PROCS;
50 : }
51 :
52 : static Size
53 5826 : AioBackendShmemSize(void)
54 : {
55 5826 : return mul_size(AioProcs(), sizeof(PgAioBackend));
56 : }
57 :
58 : static Size
59 5826 : AioHandleShmemSize(void)
60 : {
61 : Size sz;
62 :
63 : /* verify AioChooseMaxConcurrency() did its thing */
64 : Assert(io_max_concurrency > 0);
65 :
66 : /* io handles */
67 5826 : sz = mul_size(AioProcs(),
68 : mul_size(io_max_concurrency, sizeof(PgAioHandle)));
69 :
70 5826 : return sz;
71 : }
72 :
73 : static Size
74 5826 : AioHandleIOVShmemSize(void)
75 : {
76 : /* each IO handle can have up to io_max_combine_limit iovec objects */
77 5826 : return mul_size(sizeof(struct iovec),
78 5826 : mul_size(mul_size(io_max_combine_limit, AioProcs()),
79 : io_max_concurrency));
80 : }
81 :
82 : static Size
83 5826 : AioHandleDataShmemSize(void)
84 : {
85 : /* each buffer referenced by an iovec can have associated data */
86 5826 : return mul_size(sizeof(uint64),
87 5826 : mul_size(mul_size(io_max_combine_limit, AioProcs()),
88 : io_max_concurrency));
89 : }
90 :
91 : /*
92 : * Choose a suitable value for io_max_concurrency.
93 : *
94 : * It's unlikely that we could have more IOs in flight than buffers that we
95 : * would be allowed to pin.
96 : *
97 : * On the upper end, apply a cap too - just because shared_buffers is large,
98 : * it doesn't make sense have millions of buffers undergo IO concurrently.
99 : */
100 : static int
101 2028 : AioChooseMaxConcurrency(void)
102 : {
103 : uint32 max_backends;
104 : int max_proportional_pins;
105 :
106 : /* Similar logic to LimitAdditionalPins() */
107 2028 : max_backends = MaxBackends + NUM_AUXILIARY_PROCS;
108 2028 : max_proportional_pins = NBuffers / max_backends;
109 :
110 2028 : max_proportional_pins = Max(max_proportional_pins, 1);
111 :
112 : /* apply upper limit */
113 2028 : return Min(max_proportional_pins, 64);
114 : }
115 :
116 : Size
117 3794 : AioShmemSize(void)
118 : {
119 3794 : Size sz = 0;
120 :
121 : /*
122 : * We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
123 : * However, if the DBA explicitly set io_max_concurrency = -1 in the
124 : * config file, then PGC_S_DYNAMIC_DEFAULT will fail to override that and
125 : * we must force the matter with PGC_S_OVERRIDE.
126 : */
127 3794 : if (io_max_concurrency == -1)
128 : {
129 : char buf[32];
130 :
131 2028 : snprintf(buf, sizeof(buf), "%d", AioChooseMaxConcurrency());
132 2028 : SetConfigOption("io_max_concurrency", buf, PGC_POSTMASTER,
133 : PGC_S_DYNAMIC_DEFAULT);
134 2028 : if (io_max_concurrency == -1) /* failed to apply it? */
135 0 : SetConfigOption("io_max_concurrency", buf, PGC_POSTMASTER,
136 : PGC_S_OVERRIDE);
137 : }
138 :
139 3794 : sz = add_size(sz, AioCtlShmemSize());
140 3794 : sz = add_size(sz, AioBackendShmemSize());
141 3794 : sz = add_size(sz, AioHandleShmemSize());
142 3794 : sz = add_size(sz, AioHandleIOVShmemSize());
143 3794 : sz = add_size(sz, AioHandleDataShmemSize());
144 :
145 : /* Reserve space for method specific resources. */
146 3794 : if (pgaio_method_ops->shmem_size)
147 3794 : sz = add_size(sz, pgaio_method_ops->shmem_size());
148 :
149 3794 : return sz;
150 : }
151 :
152 : void
153 2032 : AioShmemInit(void)
154 : {
155 : bool found;
156 2032 : uint32 io_handle_off = 0;
157 2032 : uint32 iovec_off = 0;
158 2032 : uint32 per_backend_iovecs = io_max_concurrency * io_max_combine_limit;
159 :
160 2032 : pgaio_ctl = (PgAioCtl *)
161 2032 : ShmemInitStruct("AioCtl", AioCtlShmemSize(), &found);
162 :
163 2032 : if (found)
164 0 : goto out;
165 :
166 2032 : memset(pgaio_ctl, 0, AioCtlShmemSize());
167 :
168 2032 : pgaio_ctl->io_handle_count = AioProcs() * io_max_concurrency;
169 2032 : pgaio_ctl->iovec_count = AioProcs() * per_backend_iovecs;
170 :
171 4064 : pgaio_ctl->backend_state = (PgAioBackend *)
172 2032 : ShmemInitStruct("AioBackend", AioBackendShmemSize(), &found);
173 :
174 4064 : pgaio_ctl->io_handles = (PgAioHandle *)
175 2032 : ShmemInitStruct("AioHandle", AioHandleShmemSize(), &found);
176 :
177 4064 : pgaio_ctl->iovecs = (struct iovec *)
178 2032 : ShmemInitStruct("AioHandleIOV", AioHandleIOVShmemSize(), &found);
179 4064 : pgaio_ctl->handle_data = (uint64 *)
180 2032 : ShmemInitStruct("AioHandleData", AioHandleDataShmemSize(), &found);
181 :
182 268484 : for (int procno = 0; procno < AioProcs(); procno++)
183 : {
184 266452 : PgAioBackend *bs = &pgaio_ctl->backend_state[procno];
185 :
186 266452 : bs->io_handle_off = io_handle_off;
187 266452 : io_handle_off += io_max_concurrency;
188 :
189 266452 : dclist_init(&bs->idle_ios);
190 266452 : memset(bs->staged_ios, 0, sizeof(PgAioHandle *) * PGAIO_SUBMIT_BATCH_SIZE);
191 266452 : dclist_init(&bs->in_flight_ios);
192 :
193 : /* initialize per-backend IOs */
194 12927768 : for (int i = 0; i < io_max_concurrency; i++)
195 : {
196 12661316 : PgAioHandle *ioh = &pgaio_ctl->io_handles[bs->io_handle_off + i];
197 :
198 12661316 : ioh->generation = 1;
199 12661316 : ioh->owner_procno = procno;
200 12661316 : ioh->iovec_off = iovec_off;
201 12661316 : ioh->handle_data_len = 0;
202 12661316 : ioh->report_return = NULL;
203 12661316 : ioh->resowner = NULL;
204 12661316 : ioh->num_callbacks = 0;
205 12661316 : ioh->distilled_result.status = PGAIO_RS_UNKNOWN;
206 12661316 : ioh->flags = 0;
207 :
208 12661316 : ConditionVariableInit(&ioh->cv);
209 :
210 12661316 : dclist_push_tail(&bs->idle_ios, &ioh->node);
211 12661316 : iovec_off += io_max_combine_limit;
212 : }
213 : }
214 :
215 2032 : out:
216 : /* Initialize IO method specific resources. */
217 2032 : if (pgaio_method_ops->shmem_init)
218 2032 : pgaio_method_ops->shmem_init(!found);
219 2032 : }
220 :
221 : void
222 42272 : pgaio_init_backend(void)
223 : {
224 : /* shouldn't be initialized twice */
225 : Assert(!pgaio_my_backend);
226 :
227 42272 : if (MyBackendType == B_IO_WORKER)
228 2964 : return;
229 :
230 39308 : if (MyProc == NULL || MyProcNumber >= AioProcs())
231 0 : elog(ERROR, "aio requires a normal PGPROC");
232 :
233 39308 : pgaio_my_backend = &pgaio_ctl->backend_state[MyProcNumber];
234 :
235 39308 : if (pgaio_method_ops->init_backend)
236 0 : pgaio_method_ops->init_backend();
237 :
238 39308 : before_shmem_exit(pgaio_shutdown, 0);
239 : }
|