Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_walinspect.c
4 : * Functions to inspect contents of PostgreSQL Write-Ahead Log
5 : *
6 : * Copyright (c) 2022-2023, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * contrib/pg_walinspect/pg_walinspect.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "access/xlog.h"
16 : #include "access/xlog_internal.h"
17 : #include "access/xlogreader.h"
18 : #include "access/xlogrecovery.h"
19 : #include "access/xlogstats.h"
20 : #include "access/xlogutils.h"
21 : #include "funcapi.h"
22 : #include "miscadmin.h"
23 : #include "utils/array.h"
24 : #include "utils/builtins.h"
25 : #include "utils/pg_lsn.h"
26 :
27 : /*
28 : * NOTE: For any code change or issue fix here, it is highly recommended to
29 : * give a thought about doing the same in pg_waldump tool as well.
30 : */
31 :
32 16 : PG_MODULE_MAGIC;
33 :
34 12 : PG_FUNCTION_INFO_V1(pg_get_wal_block_info);
35 12 : PG_FUNCTION_INFO_V1(pg_get_wal_record_info);
36 18 : PG_FUNCTION_INFO_V1(pg_get_wal_records_info);
37 12 : PG_FUNCTION_INFO_V1(pg_get_wal_records_info_till_end_of_wal);
38 12 : PG_FUNCTION_INFO_V1(pg_get_wal_stats);
39 12 : PG_FUNCTION_INFO_V1(pg_get_wal_stats_till_end_of_wal);
40 :
41 : static void ValidateInputLSNs(XLogRecPtr start_lsn, XLogRecPtr *end_lsn);
42 : static XLogRecPtr GetCurrentLSN(void);
43 : static XLogReaderState *InitXLogReaderState(XLogRecPtr lsn);
44 : static XLogRecord *ReadNextXLogRecord(XLogReaderState *xlogreader);
45 : static void GetWALRecordInfo(XLogReaderState *record, Datum *values,
46 : bool *nulls, uint32 ncols);
47 : static void GetWALRecordsInfo(FunctionCallInfo fcinfo,
48 : XLogRecPtr start_lsn,
49 : XLogRecPtr end_lsn);
50 : static void GetXLogSummaryStats(XLogStats *stats, ReturnSetInfo *rsinfo,
51 : Datum *values, bool *nulls, uint32 ncols,
52 : bool stats_per_record);
53 : static void FillXLogStatsRow(const char *name, uint64 n, uint64 total_count,
54 : uint64 rec_len, uint64 total_rec_len,
55 : uint64 fpi_len, uint64 total_fpi_len,
56 : uint64 tot_len, uint64 total_len,
57 : Datum *values, bool *nulls, uint32 ncols);
58 : static void GetWalStats(FunctionCallInfo fcinfo,
59 : XLogRecPtr start_lsn,
60 : XLogRecPtr end_lsn,
61 : bool stats_per_record);
62 : static void GetWALBlockInfo(FunctionCallInfo fcinfo, XLogReaderState *record,
63 : bool show_data);
64 :
65 : /*
66 : * Return the LSN up to which the server has WAL.
67 : */
68 : static XLogRecPtr
69 60 : GetCurrentLSN(void)
70 : {
71 : XLogRecPtr curr_lsn;
72 :
73 : /*
74 : * We determine the current LSN of the server similar to how page_read
75 : * callback read_local_xlog_page_no_wait does.
76 : */
77 60 : if (!RecoveryInProgress())
78 60 : curr_lsn = GetFlushRecPtr(NULL);
79 : else
80 0 : curr_lsn = GetXLogReplayRecPtr(NULL);
81 :
82 : Assert(!XLogRecPtrIsInvalid(curr_lsn));
83 :
84 60 : return curr_lsn;
85 : }
86 :
87 : /*
88 : * Initialize WAL reader and identify first valid LSN.
89 : */
90 : static XLogReaderState *
91 42 : InitXLogReaderState(XLogRecPtr lsn)
92 : {
93 : XLogReaderState *xlogreader;
94 : ReadLocalXLogPageNoWaitPrivate *private_data;
95 : XLogRecPtr first_valid_record;
96 :
97 : /*
98 : * Reading WAL below the first page of the first segments isn't allowed.
99 : * This is a bootstrap WAL page and the page_read callback fails to read
100 : * it.
101 : */
102 42 : if (lsn < XLOG_BLCKSZ)
103 8 : ereport(ERROR,
104 : (errmsg("could not read WAL at LSN %X/%X",
105 : LSN_FORMAT_ARGS(lsn))));
106 :
107 : private_data = (ReadLocalXLogPageNoWaitPrivate *)
108 34 : palloc0(sizeof(ReadLocalXLogPageNoWaitPrivate));
109 :
110 34 : xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
111 34 : XL_ROUTINE(.page_read = &read_local_xlog_page_no_wait,
112 : .segment_open = &wal_segment_open,
113 : .segment_close = &wal_segment_close),
114 : private_data);
115 :
116 34 : if (xlogreader == NULL)
117 0 : ereport(ERROR,
118 : (errcode(ERRCODE_OUT_OF_MEMORY),
119 : errmsg("out of memory"),
120 : errdetail("Failed while allocating a WAL reading processor.")));
121 :
122 : /* first find a valid recptr to start from */
123 34 : first_valid_record = XLogFindNextRecord(xlogreader, lsn);
124 :
125 34 : if (XLogRecPtrIsInvalid(first_valid_record))
126 0 : ereport(ERROR,
127 : (errmsg("could not find a valid record after %X/%X",
128 : LSN_FORMAT_ARGS(lsn))));
129 :
130 34 : return xlogreader;
131 : }
132 :
133 : /*
134 : * Read next WAL record.
135 : *
136 : * By design, to be less intrusive in a running system, no slot is allocated
137 : * to reserve the WAL we're about to read. Therefore this function can
138 : * encounter read errors for historical WAL.
139 : *
140 : * We guard against ordinary errors trying to read WAL that hasn't been
141 : * written yet by limiting end_lsn to the flushed WAL, but that can also
142 : * encounter errors if the flush pointer falls in the middle of a record. In
143 : * that case we'll return NULL.
144 : */
145 : static XLogRecord *
146 68778 : ReadNextXLogRecord(XLogReaderState *xlogreader)
147 : {
148 : XLogRecord *record;
149 : char *errormsg;
150 :
151 68778 : record = XLogReadRecord(xlogreader, &errormsg);
152 :
153 68778 : if (record == NULL)
154 : {
155 : ReadLocalXLogPageNoWaitPrivate *private_data;
156 :
157 : /* return NULL, if end of WAL is reached */
158 20 : private_data = (ReadLocalXLogPageNoWaitPrivate *)
159 : xlogreader->private_data;
160 :
161 20 : if (private_data->end_of_wal)
162 20 : return NULL;
163 :
164 0 : if (errormsg)
165 0 : ereport(ERROR,
166 : (errcode_for_file_access(),
167 : errmsg("could not read WAL at %X/%X: %s",
168 : LSN_FORMAT_ARGS(xlogreader->EndRecPtr), errormsg)));
169 : else
170 0 : ereport(ERROR,
171 : (errcode_for_file_access(),
172 : errmsg("could not read WAL at %X/%X",
173 : LSN_FORMAT_ARGS(xlogreader->EndRecPtr))));
174 : }
175 :
176 68758 : return record;
177 : }
178 :
179 : /*
180 : * Output values that make up a row describing caller's WAL record.
181 : *
182 : * This function leaks memory. Caller may need to use its own custom memory
183 : * context.
184 : *
185 : * Keep this in sync with GetWALBlockInfo.
186 : */
187 : static void
188 68672 : GetWALRecordInfo(XLogReaderState *record, Datum *values,
189 : bool *nulls, uint32 ncols)
190 : {
191 : const char *record_type;
192 : RmgrData desc;
193 68672 : uint32 fpi_len = 0;
194 : StringInfoData rec_desc;
195 : StringInfoData rec_blk_ref;
196 68672 : int i = 0;
197 :
198 68672 : desc = GetRmgr(XLogRecGetRmid(record));
199 68672 : record_type = desc.rm_identify(XLogRecGetInfo(record));
200 :
201 68672 : if (record_type == NULL)
202 0 : record_type = psprintf("UNKNOWN (%x)", XLogRecGetInfo(record) & ~XLR_INFO_MASK);
203 :
204 68672 : initStringInfo(&rec_desc);
205 68672 : desc.rm_desc(&rec_desc, record);
206 :
207 68672 : if (XLogRecHasAnyBlockRefs(record))
208 : {
209 68630 : initStringInfo(&rec_blk_ref);
210 68630 : XLogRecGetBlockRefInfo(record, false, true, &rec_blk_ref, &fpi_len);
211 : }
212 :
213 68672 : values[i++] = LSNGetDatum(record->ReadRecPtr);
214 68672 : values[i++] = LSNGetDatum(record->EndRecPtr);
215 68672 : values[i++] = LSNGetDatum(XLogRecGetPrev(record));
216 68672 : values[i++] = TransactionIdGetDatum(XLogRecGetXid(record));
217 68672 : values[i++] = CStringGetTextDatum(desc.rm_name);
218 68672 : values[i++] = CStringGetTextDatum(record_type);
219 68672 : values[i++] = UInt32GetDatum(XLogRecGetTotalLen(record));
220 68672 : values[i++] = UInt32GetDatum(XLogRecGetDataLen(record));
221 68672 : values[i++] = UInt32GetDatum(fpi_len);
222 :
223 68672 : if (rec_desc.len > 0)
224 68644 : values[i++] = CStringGetTextDatum(rec_desc.data);
225 : else
226 28 : nulls[i++] = true;
227 :
228 68672 : if (XLogRecHasAnyBlockRefs(record))
229 68630 : values[i++] = CStringGetTextDatum(rec_blk_ref.data);
230 : else
231 42 : nulls[i++] = true;
232 :
233 : Assert(i == ncols);
234 68672 : }
235 :
236 :
237 : /*
238 : * Output one or more rows in rsinfo tuple store, each describing a single
239 : * block reference from caller's WAL record. (Should only be called with
240 : * records that have block references.)
241 : *
242 : * This function leaks memory. Caller may need to use its own custom memory
243 : * context.
244 : *
245 : * Keep this in sync with GetWALRecordInfo.
246 : */
247 : static void
248 32 : GetWALBlockInfo(FunctionCallInfo fcinfo, XLogReaderState *record,
249 : bool show_data)
250 : {
251 : #define PG_GET_WAL_BLOCK_INFO_COLS 20
252 : int block_id;
253 32 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
254 : RmgrData desc;
255 : const char *record_type;
256 : StringInfoData rec_desc;
257 :
258 : Assert(XLogRecHasAnyBlockRefs(record));
259 :
260 32 : desc = GetRmgr(XLogRecGetRmid(record));
261 32 : record_type = desc.rm_identify(XLogRecGetInfo(record));
262 :
263 32 : if (record_type == NULL)
264 0 : record_type = psprintf("UNKNOWN (%x)",
265 0 : XLogRecGetInfo(record) & ~XLR_INFO_MASK);
266 :
267 32 : initStringInfo(&rec_desc);
268 32 : desc.rm_desc(&rec_desc, record);
269 :
270 64 : for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
271 : {
272 : DecodedBkpBlock *blk;
273 : BlockNumber blkno;
274 : RelFileLocator rnode;
275 : ForkNumber forknum;
276 32 : Datum values[PG_GET_WAL_BLOCK_INFO_COLS] = {0};
277 32 : bool nulls[PG_GET_WAL_BLOCK_INFO_COLS] = {0};
278 32 : uint32 block_data_len = 0,
279 32 : block_fpi_len = 0;
280 32 : ArrayType *block_fpi_info = NULL;
281 32 : int i = 0;
282 :
283 32 : if (!XLogRecHasBlockRef(record, block_id))
284 0 : continue;
285 :
286 32 : blk = XLogRecGetBlock(record, block_id);
287 :
288 32 : (void) XLogRecGetBlockTagExtended(record, block_id,
289 : &rnode, &forknum, &blkno, NULL);
290 :
291 : /* Save block_data_len */
292 32 : if (blk->has_data)
293 30 : block_data_len = blk->data_len;
294 :
295 32 : if (blk->has_image)
296 : {
297 : /* Block reference has an FPI, so prepare relevant output */
298 : int bitcnt;
299 2 : int cnt = 0;
300 : Datum *flags;
301 :
302 : /* Save block_fpi_len */
303 2 : block_fpi_len = blk->bimg_len;
304 :
305 : /* Construct and save block_fpi_info */
306 2 : bitcnt = pg_popcount((const char *) &blk->bimg_info,
307 : sizeof(uint8));
308 2 : flags = (Datum *) palloc0(sizeof(Datum) * bitcnt);
309 2 : if ((blk->bimg_info & BKPIMAGE_HAS_HOLE) != 0)
310 2 : flags[cnt++] = CStringGetTextDatum("HAS_HOLE");
311 2 : if (blk->apply_image)
312 2 : flags[cnt++] = CStringGetTextDatum("APPLY");
313 2 : if ((blk->bimg_info & BKPIMAGE_COMPRESS_PGLZ) != 0)
314 0 : flags[cnt++] = CStringGetTextDatum("COMPRESS_PGLZ");
315 2 : if ((blk->bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0)
316 0 : flags[cnt++] = CStringGetTextDatum("COMPRESS_LZ4");
317 2 : if ((blk->bimg_info & BKPIMAGE_COMPRESS_ZSTD) != 0)
318 0 : flags[cnt++] = CStringGetTextDatum("COMPRESS_ZSTD");
319 :
320 : Assert(cnt <= bitcnt);
321 2 : block_fpi_info = construct_array_builtin(flags, cnt, TEXTOID);
322 : }
323 :
324 : /* start_lsn, end_lsn, prev_lsn, and blockid outputs */
325 32 : values[i++] = LSNGetDatum(record->ReadRecPtr);
326 32 : values[i++] = LSNGetDatum(record->EndRecPtr);
327 32 : values[i++] = LSNGetDatum(XLogRecGetPrev(record));
328 32 : values[i++] = Int16GetDatum(block_id);
329 :
330 : /* relfile and block related outputs */
331 32 : values[i++] = ObjectIdGetDatum(blk->rlocator.spcOid);
332 32 : values[i++] = ObjectIdGetDatum(blk->rlocator.dbOid);
333 32 : values[i++] = ObjectIdGetDatum(blk->rlocator.relNumber);
334 32 : values[i++] = Int16GetDatum(forknum);
335 32 : values[i++] = Int64GetDatum((int64) blkno);
336 :
337 : /* xid, resource_manager, and record_type outputs */
338 32 : values[i++] = TransactionIdGetDatum(XLogRecGetXid(record));
339 32 : values[i++] = CStringGetTextDatum(desc.rm_name);
340 32 : values[i++] = CStringGetTextDatum(record_type);
341 :
342 : /*
343 : * record_length, main_data_length, block_data_len, and
344 : * block_fpi_length outputs
345 : */
346 32 : values[i++] = UInt32GetDatum(XLogRecGetTotalLen(record));
347 32 : values[i++] = UInt32GetDatum(XLogRecGetDataLen(record));
348 32 : values[i++] = UInt32GetDatum(block_data_len);
349 32 : values[i++] = UInt32GetDatum(block_fpi_len);
350 :
351 : /* block_fpi_info (text array) output */
352 32 : if (block_fpi_info)
353 2 : values[i++] = PointerGetDatum(block_fpi_info);
354 : else
355 30 : nulls[i++] = true;
356 :
357 : /* description output (describes WAL record) */
358 32 : if (rec_desc.len > 0)
359 32 : values[i++] = CStringGetTextDatum(rec_desc.data);
360 : else
361 0 : nulls[i++] = true;
362 :
363 : /* block_data output */
364 32 : if (blk->has_data && show_data)
365 30 : {
366 : bytea *block_data;
367 :
368 30 : block_data = (bytea *) palloc(block_data_len + VARHDRSZ);
369 30 : SET_VARSIZE(block_data, block_data_len + VARHDRSZ);
370 30 : memcpy(VARDATA(block_data), blk->data, block_data_len);
371 30 : values[i++] = PointerGetDatum(block_data);
372 : }
373 : else
374 2 : nulls[i++] = true;
375 :
376 : /* block_fpi_data output */
377 32 : if (blk->has_image && show_data)
378 2 : {
379 : PGAlignedBlock buf;
380 : Page page;
381 : bytea *block_fpi_data;
382 :
383 2 : page = (Page) buf.data;
384 2 : if (!RestoreBlockImage(record, block_id, page))
385 0 : ereport(ERROR,
386 : (errcode(ERRCODE_INTERNAL_ERROR),
387 : errmsg_internal("%s", record->errormsg_buf)));
388 :
389 2 : block_fpi_data = (bytea *) palloc(BLCKSZ + VARHDRSZ);
390 2 : SET_VARSIZE(block_fpi_data, BLCKSZ + VARHDRSZ);
391 2 : memcpy(VARDATA(block_fpi_data), page, BLCKSZ);
392 2 : values[i++] = PointerGetDatum(block_fpi_data);
393 : }
394 : else
395 30 : nulls[i++] = true;
396 :
397 : Assert(i == PG_GET_WAL_BLOCK_INFO_COLS);
398 :
399 : /* Store a tuple for this block reference */
400 32 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
401 : values, nulls);
402 : }
403 :
404 : #undef PG_GET_WAL_BLOCK_INFO_COLS
405 32 : }
406 :
407 : /*
408 : * Get WAL record info, unnested by block reference
409 : */
410 : Datum
411 14 : pg_get_wal_block_info(PG_FUNCTION_ARGS)
412 : {
413 14 : XLogRecPtr start_lsn = PG_GETARG_LSN(0);
414 14 : XLogRecPtr end_lsn = PG_GETARG_LSN(1);
415 14 : bool show_data = PG_GETARG_BOOL(2);
416 : XLogReaderState *xlogreader;
417 : MemoryContext old_cxt;
418 : MemoryContext tmp_cxt;
419 :
420 14 : ValidateInputLSNs(start_lsn, &end_lsn);
421 :
422 10 : InitMaterializedSRF(fcinfo, 0);
423 :
424 10 : xlogreader = InitXLogReaderState(start_lsn);
425 :
426 8 : tmp_cxt = AllocSetContextCreate(CurrentMemoryContext,
427 : "pg_get_wal_block_info temporary cxt",
428 : ALLOCSET_DEFAULT_SIZES);
429 :
430 56 : while (ReadNextXLogRecord(xlogreader) &&
431 50 : xlogreader->EndRecPtr <= end_lsn)
432 : {
433 48 : CHECK_FOR_INTERRUPTS();
434 :
435 48 : if (!XLogRecHasAnyBlockRefs(xlogreader))
436 16 : continue;
437 :
438 : /* Use the tmp context so we can clean up after each tuple is done */
439 32 : old_cxt = MemoryContextSwitchTo(tmp_cxt);
440 :
441 32 : GetWALBlockInfo(fcinfo, xlogreader, show_data);
442 :
443 : /* clean up and switch back */
444 32 : MemoryContextSwitchTo(old_cxt);
445 32 : MemoryContextReset(tmp_cxt);
446 : }
447 :
448 8 : MemoryContextDelete(tmp_cxt);
449 8 : pfree(xlogreader->private_data);
450 8 : XLogReaderFree(xlogreader);
451 :
452 8 : PG_RETURN_VOID();
453 : }
454 :
455 : /*
456 : * Get WAL record info.
457 : */
458 : Datum
459 8 : pg_get_wal_record_info(PG_FUNCTION_ARGS)
460 : {
461 : #define PG_GET_WAL_RECORD_INFO_COLS 11
462 : Datum result;
463 8 : Datum values[PG_GET_WAL_RECORD_INFO_COLS] = {0};
464 8 : bool nulls[PG_GET_WAL_RECORD_INFO_COLS] = {0};
465 : XLogRecPtr lsn;
466 : XLogRecPtr curr_lsn;
467 : XLogReaderState *xlogreader;
468 : TupleDesc tupdesc;
469 : HeapTuple tuple;
470 :
471 8 : lsn = PG_GETARG_LSN(0);
472 8 : curr_lsn = GetCurrentLSN();
473 :
474 8 : if (lsn > curr_lsn)
475 2 : ereport(ERROR,
476 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
477 : errmsg("WAL input LSN must be less than current LSN"),
478 : errdetail("Current WAL LSN on the database system is at %X/%X.",
479 : LSN_FORMAT_ARGS(curr_lsn))));
480 :
481 : /* Build a tuple descriptor for our result type. */
482 6 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
483 0 : elog(ERROR, "return type must be a row type");
484 :
485 6 : xlogreader = InitXLogReaderState(lsn);
486 :
487 4 : if (!ReadNextXLogRecord(xlogreader))
488 0 : ereport(ERROR,
489 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
490 : errmsg("could not read WAL at %X/%X",
491 : LSN_FORMAT_ARGS(xlogreader->EndRecPtr))));
492 :
493 4 : GetWALRecordInfo(xlogreader, values, nulls, PG_GET_WAL_RECORD_INFO_COLS);
494 :
495 4 : pfree(xlogreader->private_data);
496 4 : XLogReaderFree(xlogreader);
497 :
498 4 : tuple = heap_form_tuple(tupdesc, values, nulls);
499 4 : result = HeapTupleGetDatum(tuple);
500 :
501 4 : PG_RETURN_DATUM(result);
502 : #undef PG_GET_WAL_RECORD_INFO_COLS
503 : }
504 :
505 : /*
506 : * Validate start and end LSNs coming from the function inputs.
507 : *
508 : * If end_lsn is found to be higher than the current LSN reported by the
509 : * cluster, use the current LSN as the upper bound.
510 : */
511 : static void
512 44 : ValidateInputLSNs(XLogRecPtr start_lsn, XLogRecPtr *end_lsn)
513 : {
514 44 : XLogRecPtr curr_lsn = GetCurrentLSN();
515 :
516 44 : if (start_lsn > curr_lsn)
517 6 : ereport(ERROR,
518 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
519 : errmsg("WAL start LSN must be less than current LSN"),
520 : errdetail("Current WAL LSN on the database system is at %X/%X.",
521 : LSN_FORMAT_ARGS(curr_lsn))));
522 :
523 38 : if (start_lsn > *end_lsn)
524 6 : ereport(ERROR,
525 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
526 : errmsg("WAL start LSN must be less than end LSN")));
527 :
528 32 : if (*end_lsn > curr_lsn)
529 8 : *end_lsn = curr_lsn;
530 32 : }
531 :
532 : /*
533 : * Get info of all WAL records between start LSN and end LSN.
534 : */
535 : static void
536 18 : GetWALRecordsInfo(FunctionCallInfo fcinfo, XLogRecPtr start_lsn,
537 : XLogRecPtr end_lsn)
538 : {
539 : #define PG_GET_WAL_RECORDS_INFO_COLS 11
540 : XLogReaderState *xlogreader;
541 18 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
542 : MemoryContext old_cxt;
543 : MemoryContext tmp_cxt;
544 :
545 : Assert(start_lsn <= end_lsn);
546 :
547 18 : InitMaterializedSRF(fcinfo, 0);
548 :
549 18 : xlogreader = InitXLogReaderState(start_lsn);
550 :
551 16 : tmp_cxt = AllocSetContextCreate(CurrentMemoryContext,
552 : "GetWALRecordsInfo temporary cxt",
553 : ALLOCSET_DEFAULT_SIZES);
554 :
555 68684 : while (ReadNextXLogRecord(xlogreader) &&
556 68674 : xlogreader->EndRecPtr <= end_lsn)
557 : {
558 68668 : Datum values[PG_GET_WAL_RECORDS_INFO_COLS] = {0};
559 68668 : bool nulls[PG_GET_WAL_RECORDS_INFO_COLS] = {0};
560 :
561 : /* Use the tmp context so we can clean up after each tuple is done */
562 68668 : old_cxt = MemoryContextSwitchTo(tmp_cxt);
563 :
564 68668 : GetWALRecordInfo(xlogreader, values, nulls,
565 : PG_GET_WAL_RECORDS_INFO_COLS);
566 :
567 68668 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
568 : values, nulls);
569 :
570 : /* clean up and switch back */
571 68668 : MemoryContextSwitchTo(old_cxt);
572 68668 : MemoryContextReset(tmp_cxt);
573 :
574 68668 : CHECK_FOR_INTERRUPTS();
575 : }
576 :
577 16 : MemoryContextDelete(tmp_cxt);
578 16 : pfree(xlogreader->private_data);
579 16 : XLogReaderFree(xlogreader);
580 :
581 : #undef PG_GET_WAL_RECORDS_INFO_COLS
582 16 : }
583 :
584 : /*
585 : * Get info of all WAL records between start LSN and end LSN.
586 : */
587 : Datum
588 20 : pg_get_wal_records_info(PG_FUNCTION_ARGS)
589 : {
590 20 : XLogRecPtr start_lsn = PG_GETARG_LSN(0);
591 20 : XLogRecPtr end_lsn = PG_GETARG_LSN(1);
592 :
593 20 : ValidateInputLSNs(start_lsn, &end_lsn);
594 16 : GetWALRecordsInfo(fcinfo, start_lsn, end_lsn);
595 :
596 14 : PG_RETURN_VOID();
597 : }
598 :
599 : /*
600 : * Fill single row of record counts and sizes for an rmgr or record.
601 : */
602 : static void
603 132 : FillXLogStatsRow(const char *name,
604 : uint64 n, uint64 total_count,
605 : uint64 rec_len, uint64 total_rec_len,
606 : uint64 fpi_len, uint64 total_fpi_len,
607 : uint64 tot_len, uint64 total_len,
608 : Datum *values, bool *nulls, uint32 ncols)
609 : {
610 : double n_pct,
611 : rec_len_pct,
612 : fpi_len_pct,
613 : tot_len_pct;
614 132 : int i = 0;
615 :
616 132 : n_pct = 0;
617 132 : if (total_count != 0)
618 132 : n_pct = 100 * (double) n / total_count;
619 :
620 132 : rec_len_pct = 0;
621 132 : if (total_rec_len != 0)
622 132 : rec_len_pct = 100 * (double) rec_len / total_rec_len;
623 :
624 132 : fpi_len_pct = 0;
625 132 : if (total_fpi_len != 0)
626 0 : fpi_len_pct = 100 * (double) fpi_len / total_fpi_len;
627 :
628 132 : tot_len_pct = 0;
629 132 : if (total_len != 0)
630 132 : tot_len_pct = 100 * (double) tot_len / total_len;
631 :
632 132 : values[i++] = CStringGetTextDatum(name);
633 132 : values[i++] = Int64GetDatum(n);
634 132 : values[i++] = Float8GetDatum(n_pct);
635 132 : values[i++] = Int64GetDatum(rec_len);
636 132 : values[i++] = Float8GetDatum(rec_len_pct);
637 132 : values[i++] = Int64GetDatum(fpi_len);
638 132 : values[i++] = Float8GetDatum(fpi_len_pct);
639 132 : values[i++] = Int64GetDatum(tot_len);
640 132 : values[i++] = Float8GetDatum(tot_len_pct);
641 :
642 : Assert(i == ncols);
643 132 : }
644 :
645 : /*
646 : * Get summary statistics about the records seen so far.
647 : */
648 : static void
649 6 : GetXLogSummaryStats(XLogStats *stats, ReturnSetInfo *rsinfo,
650 : Datum *values, bool *nulls, uint32 ncols,
651 : bool stats_per_record)
652 : {
653 : MemoryContext old_cxt;
654 : MemoryContext tmp_cxt;
655 6 : uint64 total_count = 0;
656 6 : uint64 total_rec_len = 0;
657 6 : uint64 total_fpi_len = 0;
658 6 : uint64 total_len = 0;
659 : int ri;
660 :
661 : /*
662 : * Each row shows its percentages of the total, so make a first pass to
663 : * calculate column totals.
664 : */
665 1542 : for (ri = 0; ri <= RM_MAX_ID; ri++)
666 : {
667 1536 : if (!RmgrIdIsValid(ri))
668 636 : continue;
669 :
670 900 : total_count += stats->rmgr_stats[ri].count;
671 900 : total_rec_len += stats->rmgr_stats[ri].rec_len;
672 900 : total_fpi_len += stats->rmgr_stats[ri].fpi_len;
673 : }
674 6 : total_len = total_rec_len + total_fpi_len;
675 :
676 6 : tmp_cxt = AllocSetContextCreate(CurrentMemoryContext,
677 : "GetXLogSummaryStats temporary cxt",
678 : ALLOCSET_DEFAULT_SIZES);
679 :
680 1542 : for (ri = 0; ri <= RM_MAX_ID; ri++)
681 : {
682 : uint64 count;
683 : uint64 rec_len;
684 : uint64 fpi_len;
685 : uint64 tot_len;
686 : RmgrData desc;
687 :
688 1536 : if (!RmgrIdIsValid(ri))
689 1404 : continue;
690 :
691 900 : if (!RmgrIdExists(ri))
692 768 : continue;
693 :
694 132 : desc = GetRmgr(ri);
695 :
696 132 : if (stats_per_record)
697 : {
698 : int rj;
699 :
700 0 : for (rj = 0; rj < MAX_XLINFO_TYPES; rj++)
701 : {
702 : const char *id;
703 :
704 0 : count = stats->record_stats[ri][rj].count;
705 0 : rec_len = stats->record_stats[ri][rj].rec_len;
706 0 : fpi_len = stats->record_stats[ri][rj].fpi_len;
707 0 : tot_len = rec_len + fpi_len;
708 :
709 : /* Skip undefined combinations and ones that didn't occur */
710 0 : if (count == 0)
711 0 : continue;
712 :
713 0 : old_cxt = MemoryContextSwitchTo(tmp_cxt);
714 :
715 : /* the upper four bits in xl_info are the rmgr's */
716 0 : id = desc.rm_identify(rj << 4);
717 0 : if (id == NULL)
718 0 : id = psprintf("UNKNOWN (%x)", rj << 4);
719 :
720 0 : FillXLogStatsRow(psprintf("%s/%s", desc.rm_name, id), count,
721 : total_count, rec_len, total_rec_len, fpi_len,
722 : total_fpi_len, tot_len, total_len,
723 : values, nulls, ncols);
724 :
725 0 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
726 : values, nulls);
727 :
728 : /* clean up and switch back */
729 0 : MemoryContextSwitchTo(old_cxt);
730 0 : MemoryContextReset(tmp_cxt);
731 : }
732 : }
733 : else
734 : {
735 132 : count = stats->rmgr_stats[ri].count;
736 132 : rec_len = stats->rmgr_stats[ri].rec_len;
737 132 : fpi_len = stats->rmgr_stats[ri].fpi_len;
738 132 : tot_len = rec_len + fpi_len;
739 :
740 132 : old_cxt = MemoryContextSwitchTo(tmp_cxt);
741 :
742 132 : FillXLogStatsRow(desc.rm_name, count, total_count, rec_len,
743 : total_rec_len, fpi_len, total_fpi_len, tot_len,
744 : total_len, values, nulls, ncols);
745 :
746 132 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
747 : values, nulls);
748 :
749 : /* clean up and switch back */
750 132 : MemoryContextSwitchTo(old_cxt);
751 132 : MemoryContextReset(tmp_cxt);
752 : }
753 : }
754 :
755 6 : MemoryContextDelete(tmp_cxt);
756 6 : }
757 :
758 : /*
759 : * Get WAL stats between start LSN and end LSN.
760 : */
761 : static void
762 8 : GetWalStats(FunctionCallInfo fcinfo, XLogRecPtr start_lsn, XLogRecPtr end_lsn,
763 : bool stats_per_record)
764 : {
765 : #define PG_GET_WAL_STATS_COLS 9
766 : XLogReaderState *xlogreader;
767 8 : XLogStats stats = {0};
768 8 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
769 8 : Datum values[PG_GET_WAL_STATS_COLS] = {0};
770 8 : bool nulls[PG_GET_WAL_STATS_COLS] = {0};
771 :
772 : Assert(start_lsn <= end_lsn);
773 :
774 8 : InitMaterializedSRF(fcinfo, 0);
775 :
776 8 : xlogreader = InitXLogReaderState(start_lsn);
777 :
778 34 : while (ReadNextXLogRecord(xlogreader) &&
779 30 : xlogreader->EndRecPtr <= end_lsn)
780 : {
781 28 : XLogRecStoreStats(&stats, xlogreader);
782 :
783 28 : CHECK_FOR_INTERRUPTS();
784 : }
785 :
786 6 : pfree(xlogreader->private_data);
787 6 : XLogReaderFree(xlogreader);
788 :
789 6 : GetXLogSummaryStats(&stats, rsinfo, values, nulls,
790 : PG_GET_WAL_STATS_COLS,
791 : stats_per_record);
792 :
793 : #undef PG_GET_WAL_STATS_COLS
794 6 : }
795 :
796 : /*
797 : * Get stats of all WAL records between start LSN and end LSN.
798 : */
799 : Datum
800 10 : pg_get_wal_stats(PG_FUNCTION_ARGS)
801 : {
802 10 : XLogRecPtr start_lsn = PG_GETARG_LSN(0);
803 10 : XLogRecPtr end_lsn = PG_GETARG_LSN(1);
804 10 : bool stats_per_record = PG_GETARG_BOOL(2);
805 :
806 10 : ValidateInputLSNs(start_lsn, &end_lsn);
807 6 : GetWalStats(fcinfo, start_lsn, end_lsn, stats_per_record);
808 :
809 4 : PG_RETURN_VOID();
810 : }
811 :
812 : /*
813 : * The following functions have been removed in newer versions in 1.1, but
814 : * they are kept around for compatibility.
815 : */
816 : Datum
817 4 : pg_get_wal_records_info_till_end_of_wal(PG_FUNCTION_ARGS)
818 : {
819 4 : XLogRecPtr start_lsn = PG_GETARG_LSN(0);
820 4 : XLogRecPtr end_lsn = GetCurrentLSN();
821 :
822 4 : if (start_lsn > end_lsn)
823 2 : ereport(ERROR,
824 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
825 : errmsg("WAL start LSN must be less than current LSN"),
826 : errdetail("Current WAL LSN on the database system is at %X/%X.",
827 : LSN_FORMAT_ARGS(end_lsn))));
828 :
829 2 : GetWALRecordsInfo(fcinfo, start_lsn, end_lsn);
830 :
831 2 : PG_RETURN_VOID();
832 : }
833 :
834 : Datum
835 4 : pg_get_wal_stats_till_end_of_wal(PG_FUNCTION_ARGS)
836 : {
837 4 : XLogRecPtr start_lsn = PG_GETARG_LSN(0);
838 4 : XLogRecPtr end_lsn = GetCurrentLSN();
839 4 : bool stats_per_record = PG_GETARG_BOOL(1);
840 :
841 4 : if (start_lsn > end_lsn)
842 2 : ereport(ERROR,
843 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
844 : errmsg("WAL start LSN must be less than current LSN"),
845 : errdetail("Current WAL LSN on the database system is at %X/%X.",
846 : LSN_FORMAT_ARGS(end_lsn))));
847 :
848 2 : GetWalStats(fcinfo, start_lsn, end_lsn, stats_per_record);
849 :
850 2 : PG_RETURN_VOID();
851 : }
|