Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * aio_funcs.c
4 : * AIO - SQL interface for AIO
5 : *
6 : *
7 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994, Regents of the University of California
9 : *
10 : * IDENTIFICATION
11 : * src/backend/storage/aio/aio_funcs.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : #include "postgres.h"
17 :
18 : #include "fmgr.h"
19 : #include "funcapi.h"
20 : #include "nodes/execnodes.h"
21 : #include "port/atomics.h"
22 : #include "storage/aio_internal.h"
23 : #include "storage/lock.h"
24 : #include "storage/proc.h"
25 : #include "storage/procnumber.h"
26 : #include "utils/builtins.h"
27 : #include "utils/fmgrprotos.h"
28 : #include "utils/tuplestore.h"
29 :
30 :
/*
 * Sum up the iov_len fields of the first 'cnt' entries of the iovec array
 * 'iov', i.e. the total number of bytes the vector describes.
 *
 * Returns 0 if 'cnt' is zero or negative.
 */
static size_t
iov_byte_length(const struct iovec *iov, int cnt)
{
	size_t		total = 0;
	int			idx = 0;

	while (idx < cnt)
	{
		total += iov[idx].iov_len;
		idx++;
	}

	return total;
}
46 :
47 : Datum
48 3 : pg_get_aios(PG_FUNCTION_ARGS)
49 : {
50 3 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
51 :
52 3 : InitMaterializedSRF(fcinfo, 0);
53 :
54 : #define PG_GET_AIOS_COLS 15
55 :
56 21721 : for (uint64 i = 0; i < pgaio_ctl->io_handle_count; i++)
57 : {
58 21718 : PgAioHandle *live_ioh = &pgaio_ctl->io_handles[i];
59 21718 : int ioh_id = pgaio_io_get_id(live_ioh);
60 21718 : Datum values[PG_GET_AIOS_COLS] = {0};
61 21718 : bool nulls[PG_GET_AIOS_COLS] = {0};
62 : ProcNumber owner;
63 : PGPROC *owner_proc;
64 : int32 owner_pid;
65 : PgAioHandleState start_state;
66 : uint64 start_generation;
67 : PgAioHandle ioh_copy;
68 : struct iovec iov_copy[PG_IOV_MAX];
69 :
70 :
71 : /*
72 : * There is no lock that could prevent the state of the IO to advance
73 : * concurrently - and we don't want to introduce one, as that would
74 : * introduce atomics into a very common path. Instead we
75 : *
76 : * 1) Determine the state + generation of the IO.
77 : *
78 : * 2) Copy the IO to local memory.
79 : *
80 : * 3) Check if state or generation of the IO changed. If the state
81 : * changed, retry, if the generation changed don't display the IO.
82 : */
83 :
84 : /* 1) from above */
85 21718 : start_generation = live_ioh->generation;
86 :
87 : /*
88 : * Retry at this point, so we can accept changing states, but not
89 : * changing generations.
90 : */
91 21718 : retry:
92 21718 : pg_read_barrier();
93 21718 : start_state = live_ioh->state;
94 :
95 21718 : if (start_state == PGAIO_HS_IDLE)
96 21718 : continue;
97 :
98 : /* 2) from above */
99 0 : memcpy(&ioh_copy, live_ioh, sizeof(PgAioHandle));
100 :
101 : /*
102 : * Safe to copy even if no iovec is used - we always reserve the
103 : * required space.
104 : */
105 0 : memcpy(&iov_copy, &pgaio_ctl->iovecs[ioh_copy.iovec_off],
106 : PG_IOV_MAX * sizeof(struct iovec));
107 :
108 : /*
109 : * Copy information about owner before 3) below, if the process exited
110 : * it'd have to wait for the IO to finish first, which we would detect
111 : * in 3).
112 : */
113 0 : owner = ioh_copy.owner_procno;
114 0 : owner_proc = GetPGProcByNumber(owner);
115 0 : owner_pid = owner_proc->pid;
116 :
117 : /* 3) from above */
118 0 : pg_read_barrier();
119 :
120 : /*
121 : * The IO completed and a new one was started with the same ID. Don't
122 : * display it - it really started after this function was called.
123 : * There be a risk of a livelock if we just retried endlessly, if IOs
124 : * complete very quickly.
125 : */
126 0 : if (live_ioh->generation != start_generation)
127 0 : continue;
128 :
129 : /*
130 : * The IO's state changed while we were "rendering" it. Just start
131 : * from scratch. There's no risk of a livelock here, as an IO has a
132 : * limited sets of states it can be in, and state changes go only in a
133 : * single direction.
134 : */
135 0 : if (live_ioh->state != start_state)
136 0 : goto retry;
137 :
138 : /*
139 : * Now that we have copied the IO into local memory and checked that
140 : * it's still in the same state, we are not allowed to access "live"
141 : * memory anymore. To make it slightly easier to catch such cases, set
142 : * the "live" pointers to NULL.
143 : */
144 0 : live_ioh = NULL;
145 0 : owner_proc = NULL;
146 :
147 :
148 : /* column: owning pid */
149 0 : if (owner_pid != 0)
150 0 : values[0] = Int32GetDatum(owner_pid);
151 : else
152 0 : nulls[0] = false;
153 :
154 : /* column: IO's id */
155 0 : values[1] = Int32GetDatum(ioh_id);
156 :
157 : /* column: IO's generation */
158 0 : values[2] = Int64GetDatum(start_generation);
159 :
160 : /* column: IO's state */
161 0 : values[3] = CStringGetTextDatum(pgaio_io_get_state_name(&ioh_copy));
162 :
163 : /*
164 : * If the IO is in PGAIO_HS_HANDED_OUT state, none of the following
165 : * fields are valid yet (or are in the process of being set).
166 : * Therefore we don't want to display any other columns.
167 : */
168 0 : if (start_state == PGAIO_HS_HANDED_OUT)
169 : {
170 0 : memset(nulls + 4, 1, (lengthof(nulls) - 4) * sizeof(bool));
171 0 : goto display;
172 : }
173 :
174 : /* column: IO's operation */
175 0 : values[4] = CStringGetTextDatum(pgaio_io_get_op_name(&ioh_copy));
176 :
177 : /* columns: details about the IO's operation (offset, length) */
178 0 : switch ((PgAioOp) ioh_copy.op)
179 : {
180 0 : case PGAIO_OP_INVALID:
181 0 : nulls[5] = true;
182 0 : nulls[6] = true;
183 0 : break;
184 0 : case PGAIO_OP_READV:
185 0 : values[5] = Int64GetDatum(ioh_copy.op_data.read.offset);
186 0 : values[6] =
187 0 : Int64GetDatum(iov_byte_length(iov_copy, ioh_copy.op_data.read.iov_length));
188 0 : break;
189 0 : case PGAIO_OP_WRITEV:
190 0 : values[5] = Int64GetDatum(ioh_copy.op_data.write.offset);
191 0 : values[6] =
192 0 : Int64GetDatum(iov_byte_length(iov_copy, ioh_copy.op_data.write.iov_length));
193 0 : break;
194 : }
195 :
196 : /* column: IO's target */
197 0 : values[7] = CStringGetTextDatum(pgaio_io_get_target_name(&ioh_copy));
198 :
199 : /* column: length of IO's data array */
200 0 : values[8] = Int16GetDatum(ioh_copy.handle_data_len);
201 :
202 : /* column: raw result (i.e. some form of syscall return value) */
203 0 : if (start_state == PGAIO_HS_COMPLETED_IO
204 0 : || start_state == PGAIO_HS_COMPLETED_SHARED
205 0 : || start_state == PGAIO_HS_COMPLETED_LOCAL)
206 0 : values[9] = Int32GetDatum(ioh_copy.result);
207 : else
208 0 : nulls[9] = true;
209 :
210 : /*
211 : * column: result in the higher level representation (unknown if not
212 : * finished)
213 : */
214 0 : values[10] =
215 0 : CStringGetTextDatum(pgaio_result_status_string(ioh_copy.distilled_result.status));
216 :
217 : /* column: target description */
218 0 : values[11] = CStringGetTextDatum(pgaio_io_get_target_description(&ioh_copy));
219 :
220 : /* columns: one for each flag */
221 0 : values[12] = BoolGetDatum(ioh_copy.flags & PGAIO_HF_SYNCHRONOUS);
222 0 : values[13] = BoolGetDatum(ioh_copy.flags & PGAIO_HF_REFERENCES_LOCAL);
223 0 : values[14] = BoolGetDatum(ioh_copy.flags & PGAIO_HF_BUFFERED);
224 :
225 0 : display:
226 0 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
227 : }
228 :
229 3 : return (Datum) 0;
230 : }
|