Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_walinspect.c
4 : * Functions to inspect contents of PostgreSQL Write-Ahead Log
5 : *
6 : * Copyright (c) 2022-2026, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * contrib/pg_walinspect/pg_walinspect.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "access/htup_details.h"
16 : #include "access/xlog.h"
17 : #include "access/xlog_internal.h"
18 : #include "access/xlogreader.h"
19 : #include "access/xlogrecovery.h"
20 : #include "access/xlogstats.h"
21 : #include "access/xlogutils.h"
22 : #include "funcapi.h"
23 : #include "miscadmin.h"
24 : #include "port/pg_bitutils.h"
25 : #include "utils/array.h"
26 : #include "utils/builtins.h"
27 : #include "utils/pg_lsn.h"
28 : #include "utils/tuplestore.h"
29 :
30 : /*
31 : * NOTE: For any code change or issue fix here, it is highly recommended to
32 : * give a thought about doing the same in pg_waldump tool as well.
33 : */
34 :
35 8 : PG_MODULE_MAGIC_EXT(
36 : .name = "pg_walinspect",
37 : .version = PG_VERSION
38 : );
39 :
40 6 : PG_FUNCTION_INFO_V1(pg_get_wal_block_info);
41 6 : PG_FUNCTION_INFO_V1(pg_get_wal_record_info);
42 9 : PG_FUNCTION_INFO_V1(pg_get_wal_records_info);
43 6 : PG_FUNCTION_INFO_V1(pg_get_wal_records_info_till_end_of_wal);
44 6 : PG_FUNCTION_INFO_V1(pg_get_wal_stats);
45 6 : PG_FUNCTION_INFO_V1(pg_get_wal_stats_till_end_of_wal);
46 :
47 : static void ValidateInputLSNs(XLogRecPtr start_lsn, XLogRecPtr *end_lsn);
48 : static XLogRecPtr GetCurrentLSN(void);
49 : static XLogReaderState *InitXLogReaderState(XLogRecPtr lsn);
50 : static XLogRecord *ReadNextXLogRecord(XLogReaderState *xlogreader);
51 : static void GetWALRecordInfo(XLogReaderState *record, Datum *values,
52 : bool *nulls, uint32 ncols);
53 : static void GetWALRecordsInfo(FunctionCallInfo fcinfo,
54 : XLogRecPtr start_lsn,
55 : XLogRecPtr end_lsn);
56 : static void GetXLogSummaryStats(XLogStats *stats, ReturnSetInfo *rsinfo,
57 : Datum *values, bool *nulls, uint32 ncols,
58 : bool stats_per_record);
59 : static void FillXLogStatsRow(const char *name, uint64 n, uint64 total_count,
60 : uint64 rec_len, uint64 total_rec_len,
61 : uint64 fpi_len, uint64 total_fpi_len,
62 : uint64 tot_len, uint64 total_len,
63 : Datum *values, bool *nulls, uint32 ncols);
64 : static void GetWalStats(FunctionCallInfo fcinfo,
65 : XLogRecPtr start_lsn,
66 : XLogRecPtr end_lsn,
67 : bool stats_per_record);
68 : static void GetWALBlockInfo(FunctionCallInfo fcinfo, XLogReaderState *record,
69 : bool show_data);
70 :
71 : /*
72 : * Return the LSN up to which the server has WAL.
73 : */
74 : static XLogRecPtr
75 30 : GetCurrentLSN(void)
76 : {
77 : XLogRecPtr curr_lsn;
78 :
79 : /*
80 : * We determine the current LSN of the server similar to how page_read
81 : * callback read_local_xlog_page_no_wait does.
82 : */
83 30 : if (!RecoveryInProgress())
84 30 : curr_lsn = GetFlushRecPtr(NULL);
85 : else
86 0 : curr_lsn = GetXLogReplayRecPtr(NULL);
87 :
88 : Assert(XLogRecPtrIsValid(curr_lsn));
89 :
90 30 : return curr_lsn;
91 : }
92 :
93 : /*
94 : * Initialize WAL reader and identify first valid LSN.
95 : */
96 : static XLogReaderState *
97 21 : InitXLogReaderState(XLogRecPtr lsn)
98 : {
99 : XLogReaderState *xlogreader;
100 : ReadLocalXLogPageNoWaitPrivate *private_data;
101 : XLogRecPtr first_valid_record;
102 : char *errormsg;
103 :
104 : /*
105 : * Reading WAL below the first page of the first segments isn't allowed.
106 : * This is a bootstrap WAL page and the page_read callback fails to read
107 : * it.
108 : */
109 21 : if (lsn < XLOG_BLCKSZ)
110 4 : ereport(ERROR,
111 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
112 : errmsg("could not read WAL at LSN %X/%08X",
113 : LSN_FORMAT_ARGS(lsn))));
114 :
115 17 : private_data = palloc0_object(ReadLocalXLogPageNoWaitPrivate);
116 :
117 17 : xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
118 17 : XL_ROUTINE(.page_read = &read_local_xlog_page_no_wait,
119 : .segment_open = &wal_segment_open,
120 : .segment_close = &wal_segment_close),
121 : private_data);
122 :
123 17 : if (xlogreader == NULL)
124 0 : ereport(ERROR,
125 : (errcode(ERRCODE_OUT_OF_MEMORY),
126 : errmsg("out of memory"),
127 : errdetail("Failed while allocating a WAL reading processor.")));
128 :
129 : /* first find a valid recptr to start from */
130 17 : first_valid_record = XLogFindNextRecord(xlogreader, lsn, &errormsg);
131 :
132 17 : if (!XLogRecPtrIsValid(first_valid_record))
133 : {
134 0 : if (errormsg)
135 0 : ereport(ERROR,
136 : errmsg("could not find a valid record after %X/%08X: %s",
137 : LSN_FORMAT_ARGS(lsn), errormsg));
138 : else
139 0 : ereport(ERROR,
140 : errmsg("could not find a valid record after %X/%08X",
141 : LSN_FORMAT_ARGS(lsn)));
142 : }
143 :
144 17 : return xlogreader;
145 : }
146 :
147 : /*
148 : * Read next WAL record.
149 : *
150 : * By design, to be less intrusive in a running system, no slot is allocated
151 : * to reserve the WAL we're about to read. Therefore this function can
152 : * encounter read errors for historical WAL.
153 : *
154 : * We guard against ordinary errors trying to read WAL that hasn't been
155 : * written yet by limiting end_lsn to the flushed WAL, but that can also
156 : * encounter errors if the flush pointer falls in the middle of a record. In
157 : * that case we'll return NULL.
158 : */
159 : static XLogRecord *
160 34391 : ReadNextXLogRecord(XLogReaderState *xlogreader)
161 : {
162 : XLogRecord *record;
163 : char *errormsg;
164 :
165 34391 : record = XLogReadRecord(xlogreader, &errormsg);
166 :
167 34391 : if (record == NULL)
168 : {
169 : ReadLocalXLogPageNoWaitPrivate *private_data;
170 :
171 : /* return NULL, if end of WAL is reached */
172 10 : private_data = (ReadLocalXLogPageNoWaitPrivate *)
173 : xlogreader->private_data;
174 :
175 10 : if (private_data->end_of_wal)
176 10 : return NULL;
177 :
178 0 : if (errormsg)
179 0 : ereport(ERROR,
180 : (errcode_for_file_access(),
181 : errmsg("could not read WAL at %X/%08X: %s",
182 : LSN_FORMAT_ARGS(xlogreader->EndRecPtr), errormsg)));
183 : else
184 0 : ereport(ERROR,
185 : (errcode_for_file_access(),
186 : errmsg("could not read WAL at %X/%08X",
187 : LSN_FORMAT_ARGS(xlogreader->EndRecPtr))));
188 : }
189 :
190 34381 : return record;
191 : }
192 :
193 : /*
194 : * Output values that make up a row describing caller's WAL record.
195 : *
196 : * This function leaks memory. Caller may need to use its own custom memory
197 : * context.
198 : *
199 : * Keep this in sync with GetWALBlockInfo.
200 : */
201 : static void
202 34336 : GetWALRecordInfo(XLogReaderState *record, Datum *values,
203 : bool *nulls, uint32 ncols)
204 : {
205 : const char *record_type;
206 : RmgrData desc;
207 34336 : uint32 fpi_len = 0;
208 : StringInfoData rec_desc;
209 : StringInfoData rec_blk_ref;
210 34336 : int i = 0;
211 :
212 34336 : desc = GetRmgr(XLogRecGetRmid(record));
213 34336 : record_type = desc.rm_identify(XLogRecGetInfo(record));
214 :
215 34336 : if (record_type == NULL)
216 0 : record_type = psprintf("UNKNOWN (%x)", XLogRecGetInfo(record) & ~XLR_INFO_MASK);
217 :
218 34336 : initStringInfo(&rec_desc);
219 34336 : desc.rm_desc(&rec_desc, record);
220 :
221 34336 : if (XLogRecHasAnyBlockRefs(record))
222 : {
223 34315 : initStringInfo(&rec_blk_ref);
224 34315 : XLogRecGetBlockRefInfo(record, false, true, &rec_blk_ref, &fpi_len);
225 : }
226 :
227 34336 : values[i++] = LSNGetDatum(record->ReadRecPtr);
228 34336 : values[i++] = LSNGetDatum(record->EndRecPtr);
229 34336 : values[i++] = LSNGetDatum(XLogRecGetPrev(record));
230 34336 : values[i++] = TransactionIdGetDatum(XLogRecGetXid(record));
231 34336 : values[i++] = CStringGetTextDatum(desc.rm_name);
232 34336 : values[i++] = CStringGetTextDatum(record_type);
233 34336 : values[i++] = UInt32GetDatum(XLogRecGetTotalLen(record));
234 34336 : values[i++] = UInt32GetDatum(XLogRecGetDataLen(record));
235 34336 : values[i++] = UInt32GetDatum(fpi_len);
236 :
237 34336 : if (rec_desc.len > 0)
238 34318 : values[i++] = CStringGetTextDatum(rec_desc.data);
239 : else
240 18 : nulls[i++] = true;
241 :
242 34336 : if (XLogRecHasAnyBlockRefs(record))
243 34315 : values[i++] = CStringGetTextDatum(rec_blk_ref.data);
244 : else
245 21 : nulls[i++] = true;
246 :
247 : Assert(i == ncols);
248 34336 : }
249 :
250 :
251 : /*
252 : * Output one or more rows in rsinfo tuple store, each describing a single
253 : * block reference from caller's WAL record. (Should only be called with
254 : * records that have block references.)
255 : *
256 : * This function leaks memory. Caller may need to use its own custom memory
257 : * context.
258 : *
259 : * Keep this in sync with GetWALRecordInfo.
260 : */
261 : static void
262 18 : GetWALBlockInfo(FunctionCallInfo fcinfo, XLogReaderState *record,
263 : bool show_data)
264 : {
265 : #define PG_GET_WAL_BLOCK_INFO_COLS 20
266 : int block_id;
267 18 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
268 : RmgrData desc;
269 : const char *record_type;
270 : StringInfoData rec_desc;
271 :
272 : Assert(XLogRecHasAnyBlockRefs(record));
273 :
274 18 : desc = GetRmgr(XLogRecGetRmid(record));
275 18 : record_type = desc.rm_identify(XLogRecGetInfo(record));
276 :
277 18 : if (record_type == NULL)
278 0 : record_type = psprintf("UNKNOWN (%x)",
279 0 : XLogRecGetInfo(record) & ~XLR_INFO_MASK);
280 :
281 18 : initStringInfo(&rec_desc);
282 18 : desc.rm_desc(&rec_desc, record);
283 :
284 36 : for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
285 : {
286 : DecodedBkpBlock *blk;
287 : BlockNumber blkno;
288 : RelFileLocator rnode;
289 : ForkNumber forknum;
290 18 : Datum values[PG_GET_WAL_BLOCK_INFO_COLS] = {0};
291 18 : bool nulls[PG_GET_WAL_BLOCK_INFO_COLS] = {0};
292 18 : uint32 block_data_len = 0,
293 18 : block_fpi_len = 0;
294 18 : ArrayType *block_fpi_info = NULL;
295 18 : int i = 0;
296 :
297 18 : if (!XLogRecHasBlockRef(record, block_id))
298 0 : continue;
299 :
300 18 : blk = XLogRecGetBlock(record, block_id);
301 :
302 18 : (void) XLogRecGetBlockTagExtended(record, block_id,
303 : &rnode, &forknum, &blkno, NULL);
304 :
305 : /* Save block_data_len */
306 18 : if (blk->has_data)
307 17 : block_data_len = blk->data_len;
308 :
309 18 : if (blk->has_image)
310 : {
311 : /* Block reference has an FPI, so prepare relevant output */
312 : int bitcnt;
313 1 : int cnt = 0;
314 : Datum *flags;
315 :
316 : /* Save block_fpi_len */
317 1 : block_fpi_len = blk->bimg_len;
318 :
319 : /* Construct and save block_fpi_info */
320 1 : bitcnt = pg_popcount((const char *) &blk->bimg_info,
321 : sizeof(uint8));
322 1 : flags = palloc0_array(Datum, bitcnt);
323 1 : if ((blk->bimg_info & BKPIMAGE_HAS_HOLE) != 0)
324 1 : flags[cnt++] = CStringGetTextDatum("HAS_HOLE");
325 1 : if (blk->apply_image)
326 1 : flags[cnt++] = CStringGetTextDatum("APPLY");
327 1 : if ((blk->bimg_info & BKPIMAGE_COMPRESS_PGLZ) != 0)
328 0 : flags[cnt++] = CStringGetTextDatum("COMPRESS_PGLZ");
329 1 : if ((blk->bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0)
330 0 : flags[cnt++] = CStringGetTextDatum("COMPRESS_LZ4");
331 1 : if ((blk->bimg_info & BKPIMAGE_COMPRESS_ZSTD) != 0)
332 0 : flags[cnt++] = CStringGetTextDatum("COMPRESS_ZSTD");
333 :
334 : Assert(cnt <= bitcnt);
335 1 : block_fpi_info = construct_array_builtin(flags, cnt, TEXTOID);
336 : }
337 :
338 : /* start_lsn, end_lsn, prev_lsn, and blockid outputs */
339 18 : values[i++] = LSNGetDatum(record->ReadRecPtr);
340 18 : values[i++] = LSNGetDatum(record->EndRecPtr);
341 18 : values[i++] = LSNGetDatum(XLogRecGetPrev(record));
342 18 : values[i++] = Int16GetDatum(block_id);
343 :
344 : /* relfile and block related outputs */
345 18 : values[i++] = ObjectIdGetDatum(blk->rlocator.spcOid);
346 18 : values[i++] = ObjectIdGetDatum(blk->rlocator.dbOid);
347 18 : values[i++] = ObjectIdGetDatum(blk->rlocator.relNumber);
348 18 : values[i++] = Int16GetDatum(forknum);
349 18 : values[i++] = Int64GetDatum((int64) blkno);
350 :
351 : /* xid, resource_manager, and record_type outputs */
352 18 : values[i++] = TransactionIdGetDatum(XLogRecGetXid(record));
353 18 : values[i++] = CStringGetTextDatum(desc.rm_name);
354 18 : values[i++] = CStringGetTextDatum(record_type);
355 :
356 : /*
357 : * record_length, main_data_length, block_data_len, and
358 : * block_fpi_length outputs
359 : */
360 18 : values[i++] = UInt32GetDatum(XLogRecGetTotalLen(record));
361 18 : values[i++] = UInt32GetDatum(XLogRecGetDataLen(record));
362 18 : values[i++] = UInt32GetDatum(block_data_len);
363 18 : values[i++] = UInt32GetDatum(block_fpi_len);
364 :
365 : /* block_fpi_info (text array) output */
366 18 : if (block_fpi_info)
367 1 : values[i++] = PointerGetDatum(block_fpi_info);
368 : else
369 17 : nulls[i++] = true;
370 :
371 : /* description output (describes WAL record) */
372 18 : if (rec_desc.len > 0)
373 17 : values[i++] = CStringGetTextDatum(rec_desc.data);
374 : else
375 1 : nulls[i++] = true;
376 :
377 : /* block_data output */
378 18 : if (blk->has_data && show_data)
379 17 : {
380 : bytea *block_data;
381 :
382 17 : block_data = (bytea *) palloc(block_data_len + VARHDRSZ);
383 17 : SET_VARSIZE(block_data, block_data_len + VARHDRSZ);
384 17 : memcpy(VARDATA(block_data), blk->data, block_data_len);
385 17 : values[i++] = PointerGetDatum(block_data);
386 : }
387 : else
388 1 : nulls[i++] = true;
389 :
390 : /* block_fpi_data output */
391 18 : if (blk->has_image && show_data)
392 1 : {
393 : PGAlignedBlock buf;
394 : Page page;
395 : bytea *block_fpi_data;
396 :
397 1 : page = (Page) buf.data;
398 1 : if (!RestoreBlockImage(record, block_id, page))
399 0 : ereport(ERROR,
400 : (errcode(ERRCODE_INTERNAL_ERROR),
401 : errmsg_internal("%s", record->errormsg_buf)));
402 :
403 1 : block_fpi_data = (bytea *) palloc(BLCKSZ + VARHDRSZ);
404 1 : SET_VARSIZE(block_fpi_data, BLCKSZ + VARHDRSZ);
405 1 : memcpy(VARDATA(block_fpi_data), page, BLCKSZ);
406 1 : values[i++] = PointerGetDatum(block_fpi_data);
407 : }
408 : else
409 17 : nulls[i++] = true;
410 :
411 : Assert(i == PG_GET_WAL_BLOCK_INFO_COLS);
412 :
413 : /* Store a tuple for this block reference */
414 18 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
415 : values, nulls);
416 : }
417 :
418 : #undef PG_GET_WAL_BLOCK_INFO_COLS
419 18 : }
420 :
421 : /*
422 : * Get WAL record info, unnested by block reference
423 : */
424 : Datum
425 7 : pg_get_wal_block_info(PG_FUNCTION_ARGS)
426 : {
427 7 : XLogRecPtr start_lsn = PG_GETARG_LSN(0);
428 7 : XLogRecPtr end_lsn = PG_GETARG_LSN(1);
429 7 : bool show_data = PG_GETARG_BOOL(2);
430 : XLogReaderState *xlogreader;
431 : MemoryContext old_cxt;
432 : MemoryContext tmp_cxt;
433 :
434 7 : ValidateInputLSNs(start_lsn, &end_lsn);
435 :
436 5 : InitMaterializedSRF(fcinfo, 0);
437 :
438 5 : xlogreader = InitXLogReaderState(start_lsn);
439 :
440 4 : tmp_cxt = AllocSetContextCreate(CurrentMemoryContext,
441 : "pg_get_wal_block_info temporary cxt",
442 : ALLOCSET_DEFAULT_SIZES);
443 :
444 30 : while (ReadNextXLogRecord(xlogreader) &&
445 27 : xlogreader->EndRecPtr <= end_lsn)
446 : {
447 26 : CHECK_FOR_INTERRUPTS();
448 :
449 26 : if (!XLogRecHasAnyBlockRefs(xlogreader))
450 8 : continue;
451 :
452 : /* Use the tmp context so we can clean up after each tuple is done */
453 18 : old_cxt = MemoryContextSwitchTo(tmp_cxt);
454 :
455 18 : GetWALBlockInfo(fcinfo, xlogreader, show_data);
456 :
457 : /* clean up and switch back */
458 18 : MemoryContextSwitchTo(old_cxt);
459 18 : MemoryContextReset(tmp_cxt);
460 : }
461 :
462 4 : MemoryContextDelete(tmp_cxt);
463 4 : pfree(xlogreader->private_data);
464 4 : XLogReaderFree(xlogreader);
465 :
466 4 : PG_RETURN_VOID();
467 : }
468 :
469 : /*
470 : * Get WAL record info.
471 : */
472 : Datum
473 4 : pg_get_wal_record_info(PG_FUNCTION_ARGS)
474 : {
475 : #define PG_GET_WAL_RECORD_INFO_COLS 11
476 : Datum result;
477 4 : Datum values[PG_GET_WAL_RECORD_INFO_COLS] = {0};
478 4 : bool nulls[PG_GET_WAL_RECORD_INFO_COLS] = {0};
479 : XLogRecPtr lsn;
480 : XLogRecPtr curr_lsn;
481 : XLogReaderState *xlogreader;
482 : TupleDesc tupdesc;
483 : HeapTuple tuple;
484 :
485 4 : lsn = PG_GETARG_LSN(0);
486 4 : curr_lsn = GetCurrentLSN();
487 :
488 4 : if (lsn > curr_lsn)
489 1 : ereport(ERROR,
490 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
491 : errmsg("WAL input LSN must be less than current LSN"),
492 : errdetail("Current WAL LSN on the database system is at %X/%08X.",
493 : LSN_FORMAT_ARGS(curr_lsn))));
494 :
495 : /* Build a tuple descriptor for our result type. */
496 3 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
497 0 : elog(ERROR, "return type must be a row type");
498 :
499 3 : xlogreader = InitXLogReaderState(lsn);
500 :
501 2 : if (!ReadNextXLogRecord(xlogreader))
502 0 : ereport(ERROR,
503 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
504 : errmsg("could not read WAL at %X/%08X",
505 : LSN_FORMAT_ARGS(xlogreader->EndRecPtr))));
506 :
507 2 : GetWALRecordInfo(xlogreader, values, nulls, PG_GET_WAL_RECORD_INFO_COLS);
508 :
509 2 : pfree(xlogreader->private_data);
510 2 : XLogReaderFree(xlogreader);
511 :
512 2 : tuple = heap_form_tuple(tupdesc, values, nulls);
513 2 : result = HeapTupleGetDatum(tuple);
514 :
515 2 : PG_RETURN_DATUM(result);
516 : #undef PG_GET_WAL_RECORD_INFO_COLS
517 : }
518 :
519 : /*
520 : * Validate start and end LSNs coming from the function inputs.
521 : *
522 : * If end_lsn is found to be higher than the current LSN reported by the
523 : * cluster, use the current LSN as the upper bound.
524 : */
525 : static void
526 22 : ValidateInputLSNs(XLogRecPtr start_lsn, XLogRecPtr *end_lsn)
527 : {
528 22 : XLogRecPtr curr_lsn = GetCurrentLSN();
529 :
530 22 : if (start_lsn > curr_lsn)
531 3 : ereport(ERROR,
532 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
533 : errmsg("WAL start LSN must be less than current LSN"),
534 : errdetail("Current WAL LSN on the database system is at %X/%08X.",
535 : LSN_FORMAT_ARGS(curr_lsn))));
536 :
537 19 : if (start_lsn > *end_lsn)
538 3 : ereport(ERROR,
539 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
540 : errmsg("WAL start LSN must be less than end LSN")));
541 :
542 16 : if (*end_lsn > curr_lsn)
543 4 : *end_lsn = curr_lsn;
544 16 : }
545 :
546 : /*
547 : * Get info of all WAL records between start LSN and end LSN.
548 : */
549 : static void
550 9 : GetWALRecordsInfo(FunctionCallInfo fcinfo, XLogRecPtr start_lsn,
551 : XLogRecPtr end_lsn)
552 : {
553 : #define PG_GET_WAL_RECORDS_INFO_COLS 11
554 : XLogReaderState *xlogreader;
555 9 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
556 : MemoryContext old_cxt;
557 : MemoryContext tmp_cxt;
558 :
559 : Assert(start_lsn <= end_lsn);
560 :
561 9 : InitMaterializedSRF(fcinfo, 0);
562 :
563 9 : xlogreader = InitXLogReaderState(start_lsn);
564 :
565 8 : tmp_cxt = AllocSetContextCreate(CurrentMemoryContext,
566 : "GetWALRecordsInfo temporary cxt",
567 : ALLOCSET_DEFAULT_SIZES);
568 :
569 34342 : while (ReadNextXLogRecord(xlogreader) &&
570 34337 : xlogreader->EndRecPtr <= end_lsn)
571 : {
572 34334 : Datum values[PG_GET_WAL_RECORDS_INFO_COLS] = {0};
573 34334 : bool nulls[PG_GET_WAL_RECORDS_INFO_COLS] = {0};
574 :
575 : /* Use the tmp context so we can clean up after each tuple is done */
576 34334 : old_cxt = MemoryContextSwitchTo(tmp_cxt);
577 :
578 34334 : GetWALRecordInfo(xlogreader, values, nulls,
579 : PG_GET_WAL_RECORDS_INFO_COLS);
580 :
581 34334 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
582 : values, nulls);
583 :
584 : /* clean up and switch back */
585 34334 : MemoryContextSwitchTo(old_cxt);
586 34334 : MemoryContextReset(tmp_cxt);
587 :
588 34334 : CHECK_FOR_INTERRUPTS();
589 : }
590 :
591 8 : MemoryContextDelete(tmp_cxt);
592 8 : pfree(xlogreader->private_data);
593 8 : XLogReaderFree(xlogreader);
594 :
595 : #undef PG_GET_WAL_RECORDS_INFO_COLS
596 8 : }
597 :
598 : /*
599 : * Get info of all WAL records between start LSN and end LSN.
600 : */
601 : Datum
602 10 : pg_get_wal_records_info(PG_FUNCTION_ARGS)
603 : {
604 10 : XLogRecPtr start_lsn = PG_GETARG_LSN(0);
605 10 : XLogRecPtr end_lsn = PG_GETARG_LSN(1);
606 :
607 10 : ValidateInputLSNs(start_lsn, &end_lsn);
608 8 : GetWALRecordsInfo(fcinfo, start_lsn, end_lsn);
609 :
610 7 : PG_RETURN_VOID();
611 : }
612 :
613 : /*
614 : * Fill single row of record counts and sizes for an rmgr or record.
615 : */
616 : static void
617 69 : FillXLogStatsRow(const char *name,
618 : uint64 n, uint64 total_count,
619 : uint64 rec_len, uint64 total_rec_len,
620 : uint64 fpi_len, uint64 total_fpi_len,
621 : uint64 tot_len, uint64 total_len,
622 : Datum *values, bool *nulls, uint32 ncols)
623 : {
624 : double n_pct,
625 : rec_len_pct,
626 : fpi_len_pct,
627 : tot_len_pct;
628 69 : int i = 0;
629 :
630 69 : n_pct = 0;
631 69 : if (total_count != 0)
632 69 : n_pct = 100 * (double) n / total_count;
633 :
634 69 : rec_len_pct = 0;
635 69 : if (total_rec_len != 0)
636 69 : rec_len_pct = 100 * (double) rec_len / total_rec_len;
637 :
638 69 : fpi_len_pct = 0;
639 69 : if (total_fpi_len != 0)
640 0 : fpi_len_pct = 100 * (double) fpi_len / total_fpi_len;
641 :
642 69 : tot_len_pct = 0;
643 69 : if (total_len != 0)
644 69 : tot_len_pct = 100 * (double) tot_len / total_len;
645 :
646 69 : values[i++] = CStringGetTextDatum(name);
647 69 : values[i++] = Int64GetDatum(n);
648 69 : values[i++] = Float8GetDatum(n_pct);
649 69 : values[i++] = Int64GetDatum(rec_len);
650 69 : values[i++] = Float8GetDatum(rec_len_pct);
651 69 : values[i++] = Int64GetDatum(fpi_len);
652 69 : values[i++] = Float8GetDatum(fpi_len_pct);
653 69 : values[i++] = Int64GetDatum(tot_len);
654 69 : values[i++] = Float8GetDatum(tot_len_pct);
655 :
656 : Assert(i == ncols);
657 69 : }
658 :
659 : /*
660 : * Get summary statistics about the records seen so far.
661 : */
662 : static void
663 3 : GetXLogSummaryStats(XLogStats *stats, ReturnSetInfo *rsinfo,
664 : Datum *values, bool *nulls, uint32 ncols,
665 : bool stats_per_record)
666 : {
667 : MemoryContext old_cxt;
668 : MemoryContext tmp_cxt;
669 3 : uint64 total_count = 0;
670 3 : uint64 total_rec_len = 0;
671 3 : uint64 total_fpi_len = 0;
672 3 : uint64 total_len = 0;
673 : int ri;
674 :
675 : /*
676 : * Each row shows its percentages of the total, so make a first pass to
677 : * calculate column totals.
678 : */
679 771 : for (ri = 0; ri <= RM_MAX_ID; ri++)
680 : {
681 768 : if (!RmgrIdIsValid(ri))
682 315 : continue;
683 :
684 453 : total_count += stats->rmgr_stats[ri].count;
685 453 : total_rec_len += stats->rmgr_stats[ri].rec_len;
686 453 : total_fpi_len += stats->rmgr_stats[ri].fpi_len;
687 : }
688 3 : total_len = total_rec_len + total_fpi_len;
689 :
690 3 : tmp_cxt = AllocSetContextCreate(CurrentMemoryContext,
691 : "GetXLogSummaryStats temporary cxt",
692 : ALLOCSET_DEFAULT_SIZES);
693 :
694 771 : for (ri = 0; ri <= RM_MAX_ID; ri++)
695 : {
696 : uint64 count;
697 : uint64 rec_len;
698 : uint64 fpi_len;
699 : uint64 tot_len;
700 : RmgrData desc;
701 :
702 768 : if (!RmgrIdIsValid(ri))
703 699 : continue;
704 :
705 453 : if (!RmgrIdExists(ri))
706 384 : continue;
707 :
708 69 : desc = GetRmgr(ri);
709 :
710 69 : if (stats_per_record)
711 : {
712 : int rj;
713 :
714 0 : for (rj = 0; rj < MAX_XLINFO_TYPES; rj++)
715 : {
716 : const char *id;
717 :
718 0 : count = stats->record_stats[ri][rj].count;
719 0 : rec_len = stats->record_stats[ri][rj].rec_len;
720 0 : fpi_len = stats->record_stats[ri][rj].fpi_len;
721 0 : tot_len = rec_len + fpi_len;
722 :
723 : /* Skip undefined combinations and ones that didn't occur */
724 0 : if (count == 0)
725 0 : continue;
726 :
727 0 : old_cxt = MemoryContextSwitchTo(tmp_cxt);
728 :
729 : /* the upper four bits in xl_info are the rmgr's */
730 0 : id = desc.rm_identify(rj << 4);
731 0 : if (id == NULL)
732 0 : id = psprintf("UNKNOWN (%x)", rj << 4);
733 :
734 0 : FillXLogStatsRow(psprintf("%s/%s", desc.rm_name, id), count,
735 : total_count, rec_len, total_rec_len, fpi_len,
736 : total_fpi_len, tot_len, total_len,
737 : values, nulls, ncols);
738 :
739 0 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
740 : values, nulls);
741 :
742 : /* clean up and switch back */
743 0 : MemoryContextSwitchTo(old_cxt);
744 0 : MemoryContextReset(tmp_cxt);
745 : }
746 : }
747 : else
748 : {
749 69 : count = stats->rmgr_stats[ri].count;
750 69 : rec_len = stats->rmgr_stats[ri].rec_len;
751 69 : fpi_len = stats->rmgr_stats[ri].fpi_len;
752 69 : tot_len = rec_len + fpi_len;
753 :
754 69 : old_cxt = MemoryContextSwitchTo(tmp_cxt);
755 :
756 69 : FillXLogStatsRow(desc.rm_name, count, total_count, rec_len,
757 : total_rec_len, fpi_len, total_fpi_len, tot_len,
758 : total_len, values, nulls, ncols);
759 :
760 69 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
761 : values, nulls);
762 :
763 : /* clean up and switch back */
764 69 : MemoryContextSwitchTo(old_cxt);
765 69 : MemoryContextReset(tmp_cxt);
766 : }
767 : }
768 :
769 3 : MemoryContextDelete(tmp_cxt);
770 3 : }
771 :
772 : /*
773 : * Get WAL stats between start LSN and end LSN.
774 : */
775 : static void
776 4 : GetWalStats(FunctionCallInfo fcinfo, XLogRecPtr start_lsn, XLogRecPtr end_lsn,
777 : bool stats_per_record)
778 : {
779 : #define PG_GET_WAL_STATS_COLS 9
780 : XLogReaderState *xlogreader;
781 4 : XLogStats stats = {0};
782 4 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
783 4 : Datum values[PG_GET_WAL_STATS_COLS] = {0};
784 4 : bool nulls[PG_GET_WAL_STATS_COLS] = {0};
785 :
786 : Assert(start_lsn <= end_lsn);
787 :
788 4 : InitMaterializedSRF(fcinfo, 0);
789 :
790 4 : xlogreader = InitXLogReaderState(start_lsn);
791 :
792 20 : while (ReadNextXLogRecord(xlogreader) &&
793 15 : xlogreader->EndRecPtr <= end_lsn)
794 : {
795 14 : XLogRecStoreStats(&stats, xlogreader);
796 :
797 14 : CHECK_FOR_INTERRUPTS();
798 : }
799 :
800 3 : pfree(xlogreader->private_data);
801 3 : XLogReaderFree(xlogreader);
802 :
803 3 : GetXLogSummaryStats(&stats, rsinfo, values, nulls,
804 : PG_GET_WAL_STATS_COLS,
805 : stats_per_record);
806 :
807 : #undef PG_GET_WAL_STATS_COLS
808 3 : }
809 :
810 : /*
811 : * Get stats of all WAL records between start LSN and end LSN.
812 : */
813 : Datum
814 5 : pg_get_wal_stats(PG_FUNCTION_ARGS)
815 : {
816 5 : XLogRecPtr start_lsn = PG_GETARG_LSN(0);
817 5 : XLogRecPtr end_lsn = PG_GETARG_LSN(1);
818 5 : bool stats_per_record = PG_GETARG_BOOL(2);
819 :
820 5 : ValidateInputLSNs(start_lsn, &end_lsn);
821 3 : GetWalStats(fcinfo, start_lsn, end_lsn, stats_per_record);
822 :
823 2 : PG_RETURN_VOID();
824 : }
825 :
826 : /*
827 : * The following functions have been removed in newer versions in 1.1, but
828 : * they are kept around for compatibility.
829 : */
830 : Datum
831 2 : pg_get_wal_records_info_till_end_of_wal(PG_FUNCTION_ARGS)
832 : {
833 2 : XLogRecPtr start_lsn = PG_GETARG_LSN(0);
834 2 : XLogRecPtr end_lsn = GetCurrentLSN();
835 :
836 2 : if (start_lsn > end_lsn)
837 1 : ereport(ERROR,
838 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
839 : errmsg("WAL start LSN must be less than current LSN"),
840 : errdetail("Current WAL LSN on the database system is at %X/%08X.",
841 : LSN_FORMAT_ARGS(end_lsn))));
842 :
843 1 : GetWALRecordsInfo(fcinfo, start_lsn, end_lsn);
844 :
845 1 : PG_RETURN_VOID();
846 : }
847 :
848 : Datum
849 2 : pg_get_wal_stats_till_end_of_wal(PG_FUNCTION_ARGS)
850 : {
851 2 : XLogRecPtr start_lsn = PG_GETARG_LSN(0);
852 2 : XLogRecPtr end_lsn = GetCurrentLSN();
853 2 : bool stats_per_record = PG_GETARG_BOOL(1);
854 :
855 2 : if (start_lsn > end_lsn)
856 1 : ereport(ERROR,
857 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
858 : errmsg("WAL start LSN must be less than current LSN"),
859 : errdetail("Current WAL LSN on the database system is at %X/%08X.",
860 : LSN_FORMAT_ARGS(end_lsn))));
861 :
862 1 : GetWalStats(fcinfo, start_lsn, end_lsn, stats_per_record);
863 :
864 1 : PG_RETURN_VOID();
865 : }
|