Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * aio_init.c
4 : * AIO - Subsystem Initialization
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/storage/aio/aio_init.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "miscadmin.h"
18 : #include "storage/aio.h"
19 : #include "storage/aio_internal.h"
20 : #include "storage/aio_subsys.h"
21 : #include "storage/bufmgr.h"
22 : #include "storage/io_worker.h"
23 : #include "storage/ipc.h"
24 : #include "storage/proc.h"
25 : #include "storage/shmem.h"
26 : #include "utils/guc.h"
27 :
28 :
29 :
30 : static Size
31 8420 : AioCtlShmemSize(void)
32 : {
33 : /* pgaio_ctl itself */
34 8420 : return sizeof(PgAioCtl);
35 : }
36 :
37 : static uint32
38 356718 : AioProcs(void)
39 : {
40 : /*
41 : * While AIO workers don't need their own AIO context, we can't currently
42 : * guarantee nothing gets assigned to the a ProcNumber for an IO worker if
43 : * we just subtracted MAX_IO_WORKERS.
44 : */
45 356718 : return MaxBackends + NUM_AUXILIARY_PROCS;
46 : }
47 :
48 : static Size
49 6240 : AioBackendShmemSize(void)
50 : {
51 6240 : return mul_size(AioProcs(), sizeof(PgAioBackend));
52 : }
53 :
54 : static Size
55 6240 : AioHandleShmemSize(void)
56 : {
57 : Size sz;
58 :
59 : /* verify AioChooseMaxConcurrency() did its thing */
60 : Assert(io_max_concurrency > 0);
61 :
62 : /* io handles */
63 6240 : sz = mul_size(AioProcs(),
64 : mul_size(io_max_concurrency, sizeof(PgAioHandle)));
65 :
66 6240 : return sz;
67 : }
68 :
69 : static Size
70 6240 : AioHandleIOVShmemSize(void)
71 : {
72 : /* each IO handle can have up to io_max_combine_limit iovec objects */
73 6240 : return mul_size(sizeof(struct iovec),
74 6240 : mul_size(mul_size(io_max_combine_limit, AioProcs()),
75 : io_max_concurrency));
76 : }
77 :
78 : static Size
79 6240 : AioHandleDataShmemSize(void)
80 : {
81 : /* each buffer referenced by an iovec can have associated data */
82 6240 : return mul_size(sizeof(uint64),
83 6240 : mul_size(mul_size(io_max_combine_limit, AioProcs()),
84 : io_max_concurrency));
85 : }
86 :
87 : /*
88 : * Choose a suitable value for io_max_concurrency.
89 : *
90 : * It's unlikely that we could have more IOs in flight than buffers that we
91 : * would be allowed to pin.
92 : *
93 : * On the upper end, apply a cap too - just because shared_buffers is large,
94 : * it doesn't make sense have millions of buffers undergo IO concurrently.
95 : */
96 : static int
97 2174 : AioChooseMaxConcurrency(void)
98 : {
99 : uint32 max_backends;
100 : int max_proportional_pins;
101 :
102 : /* Similar logic to LimitAdditionalPins() */
103 2174 : max_backends = MaxBackends + NUM_AUXILIARY_PROCS;
104 2174 : max_proportional_pins = NBuffers / max_backends;
105 :
106 2174 : max_proportional_pins = Max(max_proportional_pins, 1);
107 :
108 : /* apply upper limit */
109 2174 : return Min(max_proportional_pins, 64);
110 : }
111 :
112 : Size
113 4060 : AioShmemSize(void)
114 : {
115 4060 : Size sz = 0;
116 :
117 : /*
118 : * We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
119 : * However, if the DBA explicitly set io_max_concurrency = -1 in the
120 : * config file, then PGC_S_DYNAMIC_DEFAULT will fail to override that and
121 : * we must force the matter with PGC_S_OVERRIDE.
122 : */
123 4060 : if (io_max_concurrency == -1)
124 : {
125 : char buf[32];
126 :
127 2174 : snprintf(buf, sizeof(buf), "%d", AioChooseMaxConcurrency());
128 2174 : SetConfigOption("io_max_concurrency", buf, PGC_POSTMASTER,
129 : PGC_S_DYNAMIC_DEFAULT);
130 2174 : if (io_max_concurrency == -1) /* failed to apply it? */
131 0 : SetConfigOption("io_max_concurrency", buf, PGC_POSTMASTER,
132 : PGC_S_OVERRIDE);
133 : }
134 :
135 4060 : sz = add_size(sz, AioCtlShmemSize());
136 4060 : sz = add_size(sz, AioBackendShmemSize());
137 4060 : sz = add_size(sz, AioHandleShmemSize());
138 4060 : sz = add_size(sz, AioHandleIOVShmemSize());
139 4060 : sz = add_size(sz, AioHandleDataShmemSize());
140 :
141 : /* Reserve space for method specific resources. */
142 4060 : if (pgaio_method_ops->shmem_size)
143 4046 : sz = add_size(sz, pgaio_method_ops->shmem_size());
144 :
145 4060 : return sz;
146 : }
147 :
148 : void
149 2180 : AioShmemInit(void)
150 : {
151 : bool found;
152 2180 : uint32 io_handle_off = 0;
153 2180 : uint32 iovec_off = 0;
154 2180 : uint32 per_backend_iovecs = io_max_concurrency * io_max_combine_limit;
155 :
156 2180 : pgaio_ctl = (PgAioCtl *)
157 2180 : ShmemInitStruct("AioCtl", AioCtlShmemSize(), &found);
158 :
159 2180 : if (found)
160 0 : goto out;
161 :
162 2180 : memset(pgaio_ctl, 0, AioCtlShmemSize());
163 :
164 2180 : pgaio_ctl->io_handle_count = AioProcs() * io_max_concurrency;
165 2180 : pgaio_ctl->iovec_count = AioProcs() * per_backend_iovecs;
166 :
167 4360 : pgaio_ctl->backend_state = (PgAioBackend *)
168 2180 : ShmemInitStruct("AioBackend", AioBackendShmemSize(), &found);
169 :
170 4360 : pgaio_ctl->io_handles = (PgAioHandle *)
171 2180 : ShmemInitStruct("AioHandle", AioHandleShmemSize(), &found);
172 :
173 4360 : pgaio_ctl->iovecs = (struct iovec *)
174 2180 : ShmemInitStruct("AioHandleIOV", AioHandleIOVShmemSize(), &found);
175 4360 : pgaio_ctl->handle_data = (uint64 *)
176 2180 : ShmemInitStruct("AioHandleData", AioHandleDataShmemSize(), &found);
177 :
178 286356 : for (int procno = 0; procno < AioProcs(); procno++)
179 : {
180 284176 : PgAioBackend *bs = &pgaio_ctl->backend_state[procno];
181 :
182 284176 : bs->io_handle_off = io_handle_off;
183 284176 : io_handle_off += io_max_concurrency;
184 :
185 284176 : dclist_init(&bs->idle_ios);
186 284176 : memset(bs->staged_ios, 0, sizeof(PgAioHandle *) * PGAIO_SUBMIT_BATCH_SIZE);
187 284176 : dclist_init(&bs->in_flight_ios);
188 :
189 : /* initialize per-backend IOs */
190 13617540 : for (int i = 0; i < io_max_concurrency; i++)
191 : {
192 13333364 : PgAioHandle *ioh = &pgaio_ctl->io_handles[bs->io_handle_off + i];
193 :
194 13333364 : ioh->generation = 1;
195 13333364 : ioh->owner_procno = procno;
196 13333364 : ioh->iovec_off = iovec_off;
197 13333364 : ioh->handle_data_len = 0;
198 13333364 : ioh->report_return = NULL;
199 13333364 : ioh->resowner = NULL;
200 13333364 : ioh->num_callbacks = 0;
201 13333364 : ioh->distilled_result.status = PGAIO_RS_UNKNOWN;
202 13333364 : ioh->flags = 0;
203 :
204 13333364 : ConditionVariableInit(&ioh->cv);
205 :
206 13333364 : dclist_push_tail(&bs->idle_ios, &ioh->node);
207 13333364 : iovec_off += io_max_combine_limit;
208 : }
209 : }
210 :
211 2180 : out:
212 : /* Initialize IO method specific resources. */
213 2180 : if (pgaio_method_ops->shmem_init)
214 2170 : pgaio_method_ops->shmem_init(!found);
215 2180 : }
216 :
217 : void
218 44344 : pgaio_init_backend(void)
219 : {
220 : /* shouldn't be initialized twice */
221 : Assert(!pgaio_my_backend);
222 :
223 44344 : if (MyBackendType == B_IO_WORKER)
224 3302 : return;
225 :
226 41042 : if (MyProc == NULL || MyProcNumber >= AioProcs())
227 0 : elog(ERROR, "aio requires a normal PGPROC");
228 :
229 41042 : pgaio_my_backend = &pgaio_ctl->backend_state[MyProcNumber];
230 :
231 41042 : if (pgaio_method_ops->init_backend)
232 0 : pgaio_method_ops->init_backend();
233 :
234 41042 : before_shmem_exit(pgaio_shutdown, 0);
235 : }
|