Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * explain_dr.c
4 : * Explain DestReceiver to measure serialization overhead
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994-5, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/commands/explain_dr.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 : #include "postgres.h"
15 :
16 : #include "commands/explain.h"
17 : #include "commands/explain_dr.h"
18 : #include "commands/explain_state.h"
19 : #include "libpq/pqformat.h"
20 : #include "libpq/protocol.h"
21 : #include "utils/lsyscache.h"
22 :
23 : /*
24 : * DestReceiver functions for SERIALIZE option
25 : *
26 : * A DestReceiver for query tuples, that serializes passed rows into DataRow
27 : * messages while measuring the resources expended and total serialized size,
28 : * while never sending the data to the client. This allows measuring the
29 : * overhead of deTOASTing and datatype out/sendfuncs, which are not otherwise
30 : * exercisable without actually hitting the network.
31 : */
32 : typedef struct SerializeDestReceiver
33 : {
34 : DestReceiver pub;
35 : ExplainState *es; /* this EXPLAIN statement's ExplainState */
36 : int8 format; /* text or binary, like pq wire protocol */
37 : TupleDesc attrinfo; /* the output tuple desc */
38 : int nattrs; /* current number of columns */
39 : FmgrInfo *finfos; /* precomputed call info for output fns */
40 : MemoryContext tmpcontext; /* per-row temporary memory context */
41 : StringInfoData buf; /* buffer to hold the constructed message */
42 : SerializeMetrics metrics; /* collected metrics */
43 : } SerializeDestReceiver;
44 :
45 : /*
46 : * Get the function lookup info that we'll need for output.
47 : *
48 : * This is a subset of what printtup_prepare_info() does. We don't need to
49 : * cope with format choices varying across columns, so it's slightly simpler.
50 : */
51 : static void
52 24 : serialize_prepare_info(SerializeDestReceiver *receiver,
53 : TupleDesc typeinfo, int nattrs)
54 : {
55 : /* get rid of any old data */
56 24 : if (receiver->finfos)
57 0 : pfree(receiver->finfos);
58 24 : receiver->finfos = NULL;
59 :
60 24 : receiver->attrinfo = typeinfo;
61 24 : receiver->nattrs = nattrs;
62 24 : if (nattrs <= 0)
63 0 : return;
64 :
65 24 : receiver->finfos = (FmgrInfo *) palloc0(nattrs * sizeof(FmgrInfo));
66 :
67 72 : for (int i = 0; i < nattrs; i++)
68 : {
69 48 : FmgrInfo *finfo = receiver->finfos + i;
70 48 : Form_pg_attribute attr = TupleDescAttr(typeinfo, i);
71 : Oid typoutput;
72 : Oid typsend;
73 : bool typisvarlena;
74 :
75 48 : if (receiver->format == 0)
76 : {
77 : /* wire protocol format text */
78 36 : getTypeOutputInfo(attr->atttypid,
79 : &typoutput,
80 : &typisvarlena);
81 36 : fmgr_info(typoutput, finfo);
82 : }
83 12 : else if (receiver->format == 1)
84 : {
85 : /* wire protocol format binary */
86 12 : getTypeBinaryOutputInfo(attr->atttypid,
87 : &typsend,
88 : &typisvarlena);
89 12 : fmgr_info(typsend, finfo);
90 : }
91 : else
92 0 : ereport(ERROR,
93 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
94 : errmsg("unsupported format code: %d", receiver->format)));
95 : }
96 : }
97 :
98 : /*
99 : * serializeAnalyzeReceive - collect tuples for EXPLAIN (SERIALIZE)
100 : *
101 : * This should match printtup() in printtup.c as closely as possible,
102 : * except for the addition of measurement code.
103 : */
104 : static bool
105 120 : serializeAnalyzeReceive(TupleTableSlot *slot, DestReceiver *self)
106 : {
107 120 : TupleDesc typeinfo = slot->tts_tupleDescriptor;
108 120 : SerializeDestReceiver *myState = (SerializeDestReceiver *) self;
109 : MemoryContext oldcontext;
110 120 : StringInfo buf = &myState->buf;
111 120 : int natts = typeinfo->natts;
112 : instr_time start,
113 : end;
114 : BufferUsage instr_start;
115 :
116 : /* only measure time, buffers if requested */
117 120 : if (myState->es->timing)
118 90 : INSTR_TIME_SET_CURRENT(start);
119 120 : if (myState->es->buffers)
120 90 : instr_start = pgBufferUsage;
121 :
122 : /* Set or update my derived attribute info, if needed */
123 120 : if (myState->attrinfo != typeinfo || myState->nattrs != natts)
124 24 : serialize_prepare_info(myState, typeinfo, natts);
125 :
126 : /* Make sure the tuple is fully deconstructed */
127 120 : slot_getallattrs(slot);
128 :
129 : /* Switch into per-row context so we can recover memory below */
130 120 : oldcontext = MemoryContextSwitchTo(myState->tmpcontext);
131 :
132 : /*
133 : * Prepare a DataRow message (note buffer is in per-query context)
134 : *
135 : * Note that we fill a StringInfo buffer the same as printtup() does, so
136 : * as to capture the costs of manipulating the strings accurately.
137 : */
138 120 : pq_beginmessage_reuse(buf, PqMsg_DataRow);
139 :
140 120 : pq_sendint16(buf, natts);
141 :
142 : /*
143 : * send the attributes of this tuple
144 : */
145 360 : for (int i = 0; i < natts; i++)
146 : {
147 240 : FmgrInfo *finfo = myState->finfos + i;
148 240 : Datum attr = slot->tts_values[i];
149 :
150 240 : if (slot->tts_isnull[i])
151 : {
152 0 : pq_sendint32(buf, -1);
153 0 : continue;
154 : }
155 :
156 240 : if (myState->format == 0)
157 : {
158 : /* Text output */
159 : char *outputstr;
160 :
161 180 : outputstr = OutputFunctionCall(finfo, attr);
162 180 : pq_sendcountedtext(buf, outputstr, strlen(outputstr));
163 : }
164 : else
165 : {
166 : /* Binary output */
167 : bytea *outputbytes;
168 :
169 60 : outputbytes = SendFunctionCall(finfo, attr);
170 60 : pq_sendint32(buf, VARSIZE(outputbytes) - VARHDRSZ);
171 60 : pq_sendbytes(buf, VARDATA(outputbytes),
172 60 : VARSIZE(outputbytes) - VARHDRSZ);
173 : }
174 : }
175 :
176 : /*
177 : * We mustn't call pq_endmessage_reuse(), since that would actually send
178 : * the data to the client. Just count the data, instead. We can leave
179 : * the buffer alone; it'll be reset on the next iteration (as would also
180 : * happen in printtup()).
181 : */
182 120 : myState->metrics.bytesSent += buf->len;
183 :
184 : /* Return to caller's context, and flush row's temporary memory */
185 120 : MemoryContextSwitchTo(oldcontext);
186 120 : MemoryContextReset(myState->tmpcontext);
187 :
188 : /* Update timing data */
189 120 : if (myState->es->timing)
190 : {
191 90 : INSTR_TIME_SET_CURRENT(end);
192 90 : INSTR_TIME_ACCUM_DIFF(myState->metrics.timeSpent, end, start);
193 : }
194 :
195 : /* Update buffer metrics */
196 120 : if (myState->es->buffers)
197 90 : BufferUsageAccumDiff(&myState->metrics.bufferUsage,
198 : &pgBufferUsage,
199 : &instr_start);
200 :
201 120 : return true;
202 : }
203 :
204 : /*
205 : * serializeAnalyzeStartup - start up the serializeAnalyze receiver
206 : */
207 : static void
208 24 : serializeAnalyzeStartup(DestReceiver *self, int operation, TupleDesc typeinfo)
209 : {
210 24 : SerializeDestReceiver *receiver = (SerializeDestReceiver *) self;
211 :
212 : Assert(receiver->es != NULL);
213 :
214 24 : switch (receiver->es->serialize)
215 : {
216 0 : case EXPLAIN_SERIALIZE_NONE:
217 : Assert(false);
218 0 : break;
219 18 : case EXPLAIN_SERIALIZE_TEXT:
220 18 : receiver->format = 0; /* wire protocol format text */
221 18 : break;
222 6 : case EXPLAIN_SERIALIZE_BINARY:
223 6 : receiver->format = 1; /* wire protocol format binary */
224 6 : break;
225 : }
226 :
227 : /* Create per-row temporary memory context */
228 24 : receiver->tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
229 : "SerializeTupleReceive",
230 : ALLOCSET_DEFAULT_SIZES);
231 :
232 : /* The output buffer is re-used across rows, as in printtup.c */
233 24 : initStringInfo(&receiver->buf);
234 :
235 : /* Initialize results counters */
236 24 : memset(&receiver->metrics, 0, sizeof(SerializeMetrics));
237 24 : INSTR_TIME_SET_ZERO(receiver->metrics.timeSpent);
238 24 : }
239 :
240 : /*
241 : * serializeAnalyzeShutdown - shut down the serializeAnalyze receiver
242 : */
243 : static void
244 24 : serializeAnalyzeShutdown(DestReceiver *self)
245 : {
246 24 : SerializeDestReceiver *receiver = (SerializeDestReceiver *) self;
247 :
248 24 : if (receiver->finfos)
249 24 : pfree(receiver->finfos);
250 24 : receiver->finfos = NULL;
251 :
252 24 : if (receiver->buf.data)
253 24 : pfree(receiver->buf.data);
254 24 : receiver->buf.data = NULL;
255 :
256 24 : if (receiver->tmpcontext)
257 24 : MemoryContextDelete(receiver->tmpcontext);
258 24 : receiver->tmpcontext = NULL;
259 24 : }
260 :
261 : /*
262 : * serializeAnalyzeDestroy - destroy the serializeAnalyze receiver
263 : */
264 : static void
265 24 : serializeAnalyzeDestroy(DestReceiver *self)
266 : {
267 24 : pfree(self);
268 24 : }
269 :
270 : /*
271 : * Build a DestReceiver for EXPLAIN (SERIALIZE) instrumentation.
272 : */
273 : DestReceiver *
274 24 : CreateExplainSerializeDestReceiver(ExplainState *es)
275 : {
276 : SerializeDestReceiver *self;
277 :
278 24 : self = (SerializeDestReceiver *) palloc0(sizeof(SerializeDestReceiver));
279 :
280 24 : self->pub.receiveSlot = serializeAnalyzeReceive;
281 24 : self->pub.rStartup = serializeAnalyzeStartup;
282 24 : self->pub.rShutdown = serializeAnalyzeShutdown;
283 24 : self->pub.rDestroy = serializeAnalyzeDestroy;
284 24 : self->pub.mydest = DestExplainSerialize;
285 :
286 24 : self->es = es;
287 :
288 24 : return (DestReceiver *) self;
289 : }
290 :
291 : /*
292 : * GetSerializationMetrics - collect metrics
293 : *
294 : * We have to be careful here since the receiver could be an IntoRel
295 : * receiver if the subject statement is CREATE TABLE AS. In that
296 : * case, return all-zeroes stats.
297 : */
298 : SerializeMetrics
299 30 : GetSerializationMetrics(DestReceiver *dest)
300 : {
301 : SerializeMetrics empty;
302 :
303 30 : if (dest->mydest == DestExplainSerialize)
304 24 : return ((SerializeDestReceiver *) dest)->metrics;
305 :
306 6 : memset(&empty, 0, sizeof(SerializeMetrics));
307 6 : INSTR_TIME_SET_ZERO(empty.timeSpent);
308 :
309 6 : return empty;
310 : }
|