Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * explain_dr.c
4 : * Explain DestReceiver to measure serialization overhead
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994-5, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/commands/explain_dr.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 : #include "postgres.h"
15 :
16 : #include "commands/explain.h"
17 : #include "commands/explain_dr.h"
18 : #include "commands/explain_state.h"
19 : #include "libpq/pqformat.h"
20 : #include "libpq/protocol.h"
21 : #include "utils/lsyscache.h"
22 : #include "varatt.h"
23 :
24 : /*
25 : * DestReceiver functions for SERIALIZE option
26 : *
27 : * A DestReceiver for query tuples, that serializes passed rows into RowData
28 : * messages while measuring the resources expended and total serialized size,
29 : * while never sending the data to the client. This allows measuring the
30 : * overhead of deTOASTing and datatype out/sendfuncs, which are not otherwise
31 : * exercisable without actually hitting the network.
32 : */
33 : typedef struct SerializeDestReceiver
34 : {
35 : DestReceiver pub;
36 : ExplainState *es; /* this EXPLAIN statement's ExplainState */
37 : int8 format; /* text or binary, like pq wire protocol */
38 : TupleDesc attrinfo; /* the output tuple desc */
39 : int nattrs; /* current number of columns */
40 : FmgrInfo *finfos; /* precomputed call info for output fns */
41 : MemoryContext tmpcontext; /* per-row temporary memory context */
42 : StringInfoData buf; /* buffer to hold the constructed message */
43 : SerializeMetrics metrics; /* collected metrics */
44 : } SerializeDestReceiver;
45 :
46 : /*
47 : * Get the function lookup info that we'll need for output.
48 : *
49 : * This is a subset of what printtup_prepare_info() does. We don't need to
50 : * cope with format choices varying across columns, so it's slightly simpler.
51 : */
52 : static void
53 24 : serialize_prepare_info(SerializeDestReceiver *receiver,
54 : TupleDesc typeinfo, int nattrs)
55 : {
56 : /* get rid of any old data */
57 24 : if (receiver->finfos)
58 0 : pfree(receiver->finfos);
59 24 : receiver->finfos = NULL;
60 :
61 24 : receiver->attrinfo = typeinfo;
62 24 : receiver->nattrs = nattrs;
63 24 : if (nattrs <= 0)
64 0 : return;
65 :
66 24 : receiver->finfos = (FmgrInfo *) palloc0(nattrs * sizeof(FmgrInfo));
67 :
68 72 : for (int i = 0; i < nattrs; i++)
69 : {
70 48 : FmgrInfo *finfo = receiver->finfos + i;
71 48 : Form_pg_attribute attr = TupleDescAttr(typeinfo, i);
72 : Oid typoutput;
73 : Oid typsend;
74 : bool typisvarlena;
75 :
76 48 : if (receiver->format == 0)
77 : {
78 : /* wire protocol format text */
79 36 : getTypeOutputInfo(attr->atttypid,
80 : &typoutput,
81 : &typisvarlena);
82 36 : fmgr_info(typoutput, finfo);
83 : }
84 12 : else if (receiver->format == 1)
85 : {
86 : /* wire protocol format binary */
87 12 : getTypeBinaryOutputInfo(attr->atttypid,
88 : &typsend,
89 : &typisvarlena);
90 12 : fmgr_info(typsend, finfo);
91 : }
92 : else
93 0 : ereport(ERROR,
94 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
95 : errmsg("unsupported format code: %d", receiver->format)));
96 : }
97 : }
98 :
99 : /*
100 : * serializeAnalyzeReceive - collect tuples for EXPLAIN (SERIALIZE)
101 : *
102 : * This should match printtup() in printtup.c as closely as possible,
103 : * except for the addition of measurement code.
104 : */
105 : static bool
106 120 : serializeAnalyzeReceive(TupleTableSlot *slot, DestReceiver *self)
107 : {
108 120 : TupleDesc typeinfo = slot->tts_tupleDescriptor;
109 120 : SerializeDestReceiver *myState = (SerializeDestReceiver *) self;
110 : MemoryContext oldcontext;
111 120 : StringInfo buf = &myState->buf;
112 120 : int natts = typeinfo->natts;
113 : instr_time start,
114 : end;
115 : BufferUsage instr_start;
116 :
117 : /* only measure time, buffers if requested */
118 120 : if (myState->es->timing)
119 90 : INSTR_TIME_SET_CURRENT(start);
120 120 : if (myState->es->buffers)
121 90 : instr_start = pgBufferUsage;
122 :
123 : /* Set or update my derived attribute info, if needed */
124 120 : if (myState->attrinfo != typeinfo || myState->nattrs != natts)
125 24 : serialize_prepare_info(myState, typeinfo, natts);
126 :
127 : /* Make sure the tuple is fully deconstructed */
128 120 : slot_getallattrs(slot);
129 :
130 : /* Switch into per-row context so we can recover memory below */
131 120 : oldcontext = MemoryContextSwitchTo(myState->tmpcontext);
132 :
133 : /*
134 : * Prepare a DataRow message (note buffer is in per-query context)
135 : *
136 : * Note that we fill a StringInfo buffer the same as printtup() does, so
137 : * as to capture the costs of manipulating the strings accurately.
138 : */
139 120 : pq_beginmessage_reuse(buf, PqMsg_DataRow);
140 :
141 120 : pq_sendint16(buf, natts);
142 :
143 : /*
144 : * send the attributes of this tuple
145 : */
146 360 : for (int i = 0; i < natts; i++)
147 : {
148 240 : FmgrInfo *finfo = myState->finfos + i;
149 240 : Datum attr = slot->tts_values[i];
150 :
151 240 : if (slot->tts_isnull[i])
152 : {
153 0 : pq_sendint32(buf, -1);
154 0 : continue;
155 : }
156 :
157 240 : if (myState->format == 0)
158 : {
159 : /* Text output */
160 : char *outputstr;
161 :
162 180 : outputstr = OutputFunctionCall(finfo, attr);
163 180 : pq_sendcountedtext(buf, outputstr, strlen(outputstr));
164 : }
165 : else
166 : {
167 : /* Binary output */
168 : bytea *outputbytes;
169 :
170 60 : outputbytes = SendFunctionCall(finfo, attr);
171 60 : pq_sendint32(buf, VARSIZE(outputbytes) - VARHDRSZ);
172 60 : pq_sendbytes(buf, VARDATA(outputbytes),
173 60 : VARSIZE(outputbytes) - VARHDRSZ);
174 : }
175 : }
176 :
177 : /*
178 : * We mustn't call pq_endmessage_reuse(), since that would actually send
179 : * the data to the client. Just count the data, instead. We can leave
180 : * the buffer alone; it'll be reset on the next iteration (as would also
181 : * happen in printtup()).
182 : */
183 120 : myState->metrics.bytesSent += buf->len;
184 :
185 : /* Return to caller's context, and flush row's temporary memory */
186 120 : MemoryContextSwitchTo(oldcontext);
187 120 : MemoryContextReset(myState->tmpcontext);
188 :
189 : /* Update timing data */
190 120 : if (myState->es->timing)
191 : {
192 90 : INSTR_TIME_SET_CURRENT(end);
193 90 : INSTR_TIME_ACCUM_DIFF(myState->metrics.timeSpent, end, start);
194 : }
195 :
196 : /* Update buffer metrics */
197 120 : if (myState->es->buffers)
198 90 : BufferUsageAccumDiff(&myState->metrics.bufferUsage,
199 : &pgBufferUsage,
200 : &instr_start);
201 :
202 120 : return true;
203 : }
204 :
205 : /*
206 : * serializeAnalyzeStartup - start up the serializeAnalyze receiver
207 : */
208 : static void
209 24 : serializeAnalyzeStartup(DestReceiver *self, int operation, TupleDesc typeinfo)
210 : {
211 24 : SerializeDestReceiver *receiver = (SerializeDestReceiver *) self;
212 :
213 : Assert(receiver->es != NULL);
214 :
215 24 : switch (receiver->es->serialize)
216 : {
217 0 : case EXPLAIN_SERIALIZE_NONE:
218 : Assert(false);
219 0 : break;
220 18 : case EXPLAIN_SERIALIZE_TEXT:
221 18 : receiver->format = 0; /* wire protocol format text */
222 18 : break;
223 6 : case EXPLAIN_SERIALIZE_BINARY:
224 6 : receiver->format = 1; /* wire protocol format binary */
225 6 : break;
226 : }
227 :
228 : /* Create per-row temporary memory context */
229 24 : receiver->tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
230 : "SerializeTupleReceive",
231 : ALLOCSET_DEFAULT_SIZES);
232 :
233 : /* The output buffer is re-used across rows, as in printtup.c */
234 24 : initStringInfo(&receiver->buf);
235 :
236 : /* Initialize results counters */
237 24 : memset(&receiver->metrics, 0, sizeof(SerializeMetrics));
238 24 : INSTR_TIME_SET_ZERO(receiver->metrics.timeSpent);
239 24 : }
240 :
241 : /*
242 : * serializeAnalyzeShutdown - shut down the serializeAnalyze receiver
243 : */
244 : static void
245 24 : serializeAnalyzeShutdown(DestReceiver *self)
246 : {
247 24 : SerializeDestReceiver *receiver = (SerializeDestReceiver *) self;
248 :
249 24 : if (receiver->finfos)
250 24 : pfree(receiver->finfos);
251 24 : receiver->finfos = NULL;
252 :
253 24 : if (receiver->buf.data)
254 24 : pfree(receiver->buf.data);
255 24 : receiver->buf.data = NULL;
256 :
257 24 : if (receiver->tmpcontext)
258 24 : MemoryContextDelete(receiver->tmpcontext);
259 24 : receiver->tmpcontext = NULL;
260 24 : }
261 :
262 : /*
263 : * serializeAnalyzeDestroy - destroy the serializeAnalyze receiver
264 : */
265 : static void
266 24 : serializeAnalyzeDestroy(DestReceiver *self)
267 : {
268 24 : pfree(self);
269 24 : }
270 :
271 : /*
272 : * Build a DestReceiver for EXPLAIN (SERIALIZE) instrumentation.
273 : */
274 : DestReceiver *
275 24 : CreateExplainSerializeDestReceiver(ExplainState *es)
276 : {
277 : SerializeDestReceiver *self;
278 :
279 24 : self = (SerializeDestReceiver *) palloc0(sizeof(SerializeDestReceiver));
280 :
281 24 : self->pub.receiveSlot = serializeAnalyzeReceive;
282 24 : self->pub.rStartup = serializeAnalyzeStartup;
283 24 : self->pub.rShutdown = serializeAnalyzeShutdown;
284 24 : self->pub.rDestroy = serializeAnalyzeDestroy;
285 24 : self->pub.mydest = DestExplainSerialize;
286 :
287 24 : self->es = es;
288 :
289 24 : return (DestReceiver *) self;
290 : }
291 :
292 : /*
293 : * GetSerializationMetrics - collect metrics
294 : *
295 : * We have to be careful here since the receiver could be an IntoRel
296 : * receiver if the subject statement is CREATE TABLE AS. In that
297 : * case, return all-zeroes stats.
298 : */
299 : SerializeMetrics
300 30 : GetSerializationMetrics(DestReceiver *dest)
301 : {
302 : SerializeMetrics empty;
303 :
304 30 : if (dest->mydest == DestExplainSerialize)
305 24 : return ((SerializeDestReceiver *) dest)->metrics;
306 :
307 6 : memset(&empty, 0, sizeof(SerializeMetrics));
308 6 : INSTR_TIME_SET_ZERO(empty.timeSpent);
309 :
310 6 : return empty;
311 : }
|