Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * archive_waldump.c
4 : * A generic facility for reading WAL data from tar archives via archive
5 : * streamer.
6 : *
7 : * Portions Copyright (c) 2026, PostgreSQL Global Development Group
8 : *
9 : * IDENTIFICATION
10 : * src/bin/pg_waldump/archive_waldump.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres_fe.h"
16 :
17 : #include <unistd.h>
18 :
19 : #include "access/xlog_internal.h"
20 : #include "common/file_perm.h"
21 : #include "common/hashfn.h"
22 : #include "common/logging.h"
23 : #include "fe_utils/simple_list.h"
24 : #include "pg_waldump.h"
25 :
/*
 * How many bytes should we try to read from a file at once?
 *
 * This is also the size of the reusable archive read buffer allocated by
 * init_archive_reader(), so no single read request may exceed it.
 */
#define READ_CHUNK_SIZE (128 * 1024)

/*
 * Temporary directory for spilled WAL segment files.  NULL until the first
 * spill creates it; reset to NULL again by the atexit cleanup callback.
 */
char *TmpWalSegDir = NULL;

/*
 * Check if the start segment number is zero; this indicates a request to read
 * any WAL file.  The archive streamer uses this to skip range filtering while
 * the start/end segment numbers have not been computed yet.
 */
#define READ_ANY_WAL(privateInfo) ((privateInfo)->start_segno == 0)
39 :
/*
 * Hash entry representing a WAL segment retrieved from the archive.
 *
 * While WAL segments are typically read sequentially, individual entries
 * maintain their own buffers for the following reasons:
 *
 * 1. Boundary Handling: The archive streamer provides a continuous byte
 * stream. A single streaming chunk may contain the end of one WAL segment
 * and the start of the next. Separate buffers allow us to easily
 * partition and track these bytes by their respective segments.
 *
 * 2. Out-of-Order Support: Dedicated buffers simplify logic when segments
 * are archived or retrieved out of sequence.
 *
 * To minimize the memory footprint, entries and their associated buffers are
 * freed immediately once consumed. Since pg_waldump does not request the same
 * bytes twice, a segment is discarded as soon as pg_waldump moves past it.
 *
 * NOTE(review): these structs are the simplehash element type, i.e. they are
 * stored by value inside the table.  Confirm against simplehash.h whether
 * pointers to entries (such as privateInfo->cur_file) stay valid across
 * inserts and deletes.
 */
typedef struct ArchivedWALFile
{
	uint32 status; /* hash status */
	const char *fname; /* hash key: WAL segment name (allocated copy made
						* when the member header is seen) */

	StringInfo buf; /* holds WAL bytes read from archive; set to NULL
					 * once the entry has been released */
	bool spilled; /* true if the WAL data was spilled to a
				   * temporary file */

	int read_len; /* total bytes received from archive for this
				   * segment, including already-consumed data */
} ArchivedWALFile;
70 :
/*
 * Instantiate a simplehash table ("ArchivedWAL_*" functions) keyed by the WAL
 * segment file name.  Keys are compared with strcmp() and hashed with
 * hash_string_pointer(); table memory comes from pg_malloc0().
 */
static uint32 hash_string_pointer(const char *s);
#define SH_PREFIX ArchivedWAL
#define SH_ELEMENT_TYPE ArchivedWALFile
#define SH_KEY_TYPE const char *
#define SH_KEY fname
#define SH_HASH_KEY(tb, key) hash_string_pointer(key)
#define SH_EQUAL(tb, a, b) (strcmp(a, b) == 0)
#define SH_SCOPE static inline
#define SH_RAW_ALLOCATOR pg_malloc0
#define SH_DECLARE
#define SH_DEFINE
#include "lib/simplehash.h"
83 :
/*
 * Terminal archive streamer that collects WAL file contents into the hash
 * table owned by privateInfo.
 */
typedef struct astreamer_waldump
{
	astreamer base; /* common streamer header; must be first so the
					 * struct can be cast to/from astreamer */
	XLogDumpPrivate *privateInfo; /* pg_waldump state updated by the callbacks */
} astreamer_waldump;
89 :
/* Hash table access and raw archive reading */
static ArchivedWALFile *get_archive_wal_entry(const char *fname,
											  XLogDumpPrivate *privateInfo);
static int read_archive_file(XLogDumpPrivate *privateInfo, Size count);

/* Temporary spill directory management */
static void setup_tmpwal_dir(const char *waldir);
static void cleanup_tmpwal_dir_atexit(void);

/* Spilling out-of-order segments to temporary files */
static FILE *prepare_tmp_write(const char *fname, XLogDumpPrivate *privateInfo);
static void perform_tmp_write(const char *fname, StringInfo buf, FILE *file);

/* Archive streamer callbacks */
static astreamer *astreamer_waldump_new(XLogDumpPrivate *privateInfo);
static void astreamer_waldump_content(astreamer *streamer,
									  astreamer_member *member,
									  const char *data, int len,
									  astreamer_archive_context context);
static void astreamer_waldump_finalize(astreamer *streamer);
static void astreamer_waldump_free(astreamer *streamer);

/* Tar member classification */
static bool member_is_wal_file(astreamer_waldump *mystreamer,
							   astreamer_member *member,
							   char **fname);
110 :
/* Callback table installed into every astreamer_waldump instance. */
static const astreamer_ops astreamer_waldump_ops = {
	.content = astreamer_waldump_content,
	.finalize = astreamer_waldump_finalize,
	.free = astreamer_waldump_free
};
116 :
117 : /*
118 : * Initializes the tar archive reader: opens the archive, builds a hash table
119 : * for WAL entries, reads ahead until a full WAL page header is available to
120 : * determine the WAL segment size, and computes start/end segment numbers for
121 : * filtering.
122 : */
123 : void
124 52 : init_archive_reader(XLogDumpPrivate *privateInfo,
125 : pg_compress_algorithm compression)
126 : {
127 : int fd;
128 : astreamer *streamer;
129 52 : ArchivedWALFile *entry = NULL;
130 : XLogLongPageHeader longhdr;
131 : XLogSegNo segno;
132 : TimeLineID timeline;
133 :
134 : /* Open tar archive and store its file descriptor */
135 52 : fd = open_file_in_directory(privateInfo->archive_dir,
136 52 : privateInfo->archive_name);
137 :
138 52 : if (fd < 0)
139 0 : pg_fatal("could not open file \"%s\"", privateInfo->archive_name);
140 :
141 52 : privateInfo->archive_fd = fd;
142 :
143 52 : streamer = astreamer_waldump_new(privateInfo);
144 :
145 : /* We must first parse the tar archive. */
146 52 : streamer = astreamer_tar_parser_new(streamer);
147 :
148 : /* If the archive is compressed, decompress before parsing. */
149 52 : if (compression == PG_COMPRESSION_GZIP)
150 17 : streamer = astreamer_gzip_decompressor_new(streamer);
151 35 : else if (compression == PG_COMPRESSION_LZ4)
152 4 : streamer = astreamer_lz4_decompressor_new(streamer);
153 31 : else if (compression == PG_COMPRESSION_ZSTD)
154 0 : streamer = astreamer_zstd_decompressor_new(streamer);
155 :
156 52 : privateInfo->archive_streamer = streamer;
157 :
158 : /*
159 : * Allocate a buffer for reading the archive file to facilitate content
160 : * decoding; read requests must not exceed the allocated buffer size.
161 : */
162 52 : privateInfo->archive_read_buf = pg_malloc(READ_CHUNK_SIZE);
163 :
164 : #ifdef USE_ASSERT_CHECKING
165 : privateInfo->archive_read_buf_size = READ_CHUNK_SIZE;
166 : #endif
167 :
168 : /*
169 : * Hash table storing WAL entries read from the archive with an arbitrary
170 : * initial size.
171 : */
172 52 : privateInfo->archive_wal_htab = ArchivedWAL_create(8, NULL);
173 :
174 : /*
175 : * Read until we have at least one full WAL page (XLOG_BLCKSZ bytes) from
176 : * the first WAL segment in the archive so we can extract the WAL segment
177 : * size from the long page header.
178 : */
179 9590 : while (entry == NULL || entry->buf->len < XLOG_BLCKSZ)
180 : {
181 9538 : if (read_archive_file(privateInfo, XLOG_BLCKSZ) == 0)
182 0 : pg_fatal("could not find WAL in archive \"%s\"",
183 : privateInfo->archive_name);
184 :
185 9538 : entry = privateInfo->cur_file;
186 : }
187 :
188 : /* Extract the WAL segment size from the long page header */
189 52 : longhdr = (XLogLongPageHeader) entry->buf->data;
190 :
191 52 : if (!IsValidWalSegSize(longhdr->xlp_seg_size))
192 : {
193 0 : pg_log_error(ngettext("invalid WAL segment size in WAL file from archive \"%s\" (%d byte)",
194 : "invalid WAL segment size in WAL file from archive \"%s\" (%d bytes)",
195 : longhdr->xlp_seg_size),
196 : privateInfo->archive_name, longhdr->xlp_seg_size);
197 0 : pg_log_error_detail("The WAL segment size must be a power of two between 1 MB and 1 GB.");
198 0 : exit(1);
199 : }
200 :
201 52 : privateInfo->segsize = longhdr->xlp_seg_size;
202 :
203 : /*
204 : * With the WAL segment size available, we can now initialize the
205 : * dependent start and end segment numbers.
206 : */
207 : Assert(!XLogRecPtrIsInvalid(privateInfo->startptr));
208 52 : XLByteToSeg(privateInfo->startptr, privateInfo->start_segno,
209 : privateInfo->segsize);
210 :
211 52 : if (!XLogRecPtrIsInvalid(privateInfo->endptr))
212 48 : XLByteToSeg(privateInfo->endptr, privateInfo->end_segno,
213 : privateInfo->segsize);
214 :
215 : /*
216 : * This WAL record was fetched before the filtering parameters
217 : * (start_segno and end_segno) were fully initialized. Perform the
218 : * relevance check against the user-provided range now; if the WAL falls
219 : * outside this range, remove it from the hash table. Subsequent WAL will
220 : * be filtered automatically by the archive streamer using the updated
221 : * start_segno and end_segno values.
222 : */
223 52 : XLogFromFileName(entry->fname, &timeline, &segno, privateInfo->segsize);
224 52 : if (privateInfo->timeline != timeline ||
225 52 : privateInfo->start_segno > segno ||
226 52 : privateInfo->end_segno < segno)
227 13 : free_archive_wal_entry(entry->fname, privateInfo);
228 52 : }
229 :
230 : /*
231 : * Release the archive streamer chain and close the archive file.
232 : */
233 : void
234 48 : free_archive_reader(XLogDumpPrivate *privateInfo)
235 : {
236 : /*
237 : * NB: Normally, astreamer_finalize() is called before astreamer_free() to
238 : * flush any remaining buffered data or to ensure the end of the tar
239 : * archive is reached. However, when decoding WAL, once we hit the end
240 : * LSN, any remaining buffered data or unread portion of the archive can
241 : * be safely ignored.
242 : */
243 48 : astreamer_free(privateInfo->archive_streamer);
244 :
245 : /* Free any remaining hash table entries and their buffers. */
246 48 : if (privateInfo->archive_wal_htab != NULL)
247 : {
248 : ArchivedWAL_iterator iter;
249 : ArchivedWALFile *entry;
250 :
251 48 : ArchivedWAL_start_iterate(privateInfo->archive_wal_htab, &iter);
252 147 : while ((entry = ArchivedWAL_iterate(privateInfo->archive_wal_htab,
253 99 : &iter)) != NULL)
254 : {
255 51 : if (entry->buf != NULL)
256 51 : destroyStringInfo(entry->buf);
257 : }
258 48 : ArchivedWAL_destroy(privateInfo->archive_wal_htab);
259 48 : privateInfo->archive_wal_htab = NULL;
260 : }
261 :
262 : /* Free the reusable read buffer. */
263 48 : if (privateInfo->archive_read_buf != NULL)
264 : {
265 48 : pg_free(privateInfo->archive_read_buf);
266 48 : privateInfo->archive_read_buf = NULL;
267 : }
268 :
269 : /* Close the file. */
270 48 : if (close(privateInfo->archive_fd) != 0)
271 0 : pg_log_error("could not close file \"%s\": %m",
272 : privateInfo->archive_name);
273 48 : }
274 :
275 : /*
276 : * Copies the requested WAL data from the hash entry's buffer into readBuff.
277 : * If the buffer does not yet contain the needed bytes, fetches more data from
278 : * the tar archive via the archive streamer.
279 : */
280 : int
281 27342 : read_archive_wal_page(XLogDumpPrivate *privateInfo, XLogRecPtr targetPagePtr,
282 : Size count, char *readBuff)
283 : {
284 27342 : char *p = readBuff;
285 27342 : Size nbytes = count;
286 27342 : XLogRecPtr recptr = targetPagePtr;
287 27342 : int segsize = privateInfo->segsize;
288 : XLogSegNo segno;
289 : char fname[MAXFNAMELEN];
290 : ArchivedWALFile *entry;
291 :
292 : /* Identify the segment and locate its entry in the archive hash */
293 27342 : XLByteToSeg(targetPagePtr, segno, segsize);
294 27342 : XLogFileName(fname, privateInfo->timeline, segno, segsize);
295 27342 : entry = get_archive_wal_entry(fname, privateInfo);
296 :
297 57887 : while (nbytes > 0)
298 : {
299 30545 : char *buf = entry->buf->data;
300 30545 : int bufLen = entry->buf->len;
301 : XLogRecPtr endPtr;
302 : XLogRecPtr startPtr;
303 :
304 : /*
305 : * Calculate the LSN range currently residing in the buffer.
306 : *
307 : * read_len tracks total bytes received for this segment (including
308 : * already-discarded data), so endPtr is the LSN just past the last
309 : * buffered byte, and startPtr is the LSN of the first buffered byte.
310 : */
311 30545 : XLogSegNoOffsetToRecPtr(segno, entry->read_len, segsize, endPtr);
312 30545 : startPtr = endPtr - bufLen;
313 :
314 : /*
315 : * Copy the requested WAL record if it exists in the buffer.
316 : */
317 30545 : if (bufLen > 0 && startPtr <= recptr && recptr < endPtr)
318 28316 : {
319 : int copyBytes;
320 28316 : int offset = recptr - startPtr;
321 :
322 : /*
323 : * Given startPtr <= recptr < endPtr and a total buffer size
324 : * 'bufLen', the offset (recptr - startPtr) will always be less
325 : * than 'bufLen'.
326 : */
327 : Assert(offset < bufLen);
328 :
329 28316 : copyBytes = Min(nbytes, bufLen - offset);
330 28316 : memcpy(p, buf + offset, copyBytes);
331 :
332 : /* Update state for read */
333 28316 : recptr += copyBytes;
334 28316 : nbytes -= copyBytes;
335 28316 : p += copyBytes;
336 : }
337 : else
338 : {
339 : /*
340 : * Before starting the actual decoding loop, pg_waldump tries to
341 : * locate the first valid record from the user-specified start
342 : * position, which might not be the start of a WAL record and
343 : * could fall in the middle of a record that spans multiple pages.
344 : * Consequently, the valid start position the decoder is looking
345 : * for could be far away from that initial position.
346 : *
347 : * This may involve reading across multiple pages, and this
348 : * pre-reading fetches data in multiple rounds from the archive
349 : * streamer; normally, we would throw away existing buffer
350 : * contents to fetch the next set of data, but that existing data
351 : * might be needed once the main loop starts. Because previously
352 : * read data cannot be re-read by the archive streamer, we delay
353 : * resetting the buffer until the main decoding loop is entered.
354 : *
355 : * Once pg_waldump has entered the main loop, it may re-read the
356 : * currently active page, but never an older one; therefore, any
357 : * fully consumed WAL data preceding the current page can then be
358 : * safely discarded.
359 : */
360 2229 : if (privateInfo->decoding_started)
361 : {
362 972 : resetStringInfo(entry->buf);
363 :
364 : /*
365 : * Push back the partial page data for the current page to the
366 : * buffer, ensuring a full page remains available for
367 : * re-reading if requested.
368 : */
369 972 : if (p > readBuff)
370 : {
371 : Assert((count - nbytes) > 0);
372 972 : appendBinaryStringInfo(entry->buf, readBuff, count - nbytes);
373 : }
374 : }
375 :
376 : /*
377 : * Now, fetch more data. Raise an error if the archive streamer
378 : * has moved past our segment (meaning the WAL file in the archive
379 : * is shorter than expected) or if reading the archive reached
380 : * EOF.
381 : */
382 2229 : if (privateInfo->cur_file != entry)
383 0 : pg_fatal("WAL segment \"%s\" in archive \"%s\" is too short: read %lld of %lld bytes",
384 : fname, privateInfo->archive_name,
385 : (long long int) (count - nbytes),
386 : (long long int) count);
387 2229 : if (read_archive_file(privateInfo, READ_CHUNK_SIZE) == 0)
388 0 : pg_fatal("unexpected end of archive \"%s\" while reading \"%s\": read %lld of %lld bytes",
389 : privateInfo->archive_name, fname,
390 : (long long int) (count - nbytes),
391 : (long long int) count);
392 : }
393 : }
394 :
395 : /*
396 : * Should have successfully read all the requested bytes or reported a
397 : * failure before this point.
398 : */
399 : Assert(nbytes == 0);
400 :
401 : /*
402 : * Return count unchanged; the caller expects this convention, matching
403 : * the routine that reads WAL pages from physical files.
404 : */
405 27342 : return count;
406 : }
407 :
408 : /*
409 : * Releases the buffer of a WAL entry that is no longer needed, preventing the
410 : * accumulation of irrelevant WAL data. Also removes any associated temporary
411 : * file and clears privateInfo->cur_file if it points to this entry, so the
412 : * archive streamer skips subsequent data for it.
413 : */
414 : void
415 43 : free_archive_wal_entry(const char *fname, XLogDumpPrivate *privateInfo)
416 : {
417 : ArchivedWALFile *entry;
418 :
419 43 : entry = ArchivedWAL_lookup(privateInfo->archive_wal_htab, fname);
420 :
421 43 : if (entry == NULL)
422 0 : return;
423 :
424 : /* Destroy the buffer */
425 43 : destroyStringInfo(entry->buf);
426 43 : entry->buf = NULL;
427 :
428 : /* Remove temporary file if any */
429 43 : if (entry->spilled)
430 : {
431 : char fpath[MAXPGPATH];
432 :
433 2 : snprintf(fpath, MAXPGPATH, "%s/%s", TmpWalSegDir, fname);
434 :
435 2 : if (unlink(fpath) == 0)
436 2 : pg_log_debug("removed file \"%s\"", fpath);
437 : }
438 :
439 : /* Clear cur_file if it points to the entry being freed */
440 43 : if (privateInfo->cur_file == entry)
441 13 : privateInfo->cur_file = NULL;
442 :
443 43 : ArchivedWAL_delete_item(privateInfo->archive_wal_htab, entry);
444 : }
445 :
446 : /*
447 : * Returns the archived WAL entry from the hash table if it already exists.
448 : * Otherwise, reads more data from the archive until the requested entry is
449 : * found. If the archive streamer is reading a WAL file from the archive that
450 : * is not currently needed, that data is spilled to a temporary file for later
451 : * retrieval.
452 : */
453 : static ArchivedWALFile *
454 27342 : get_archive_wal_entry(const char *fname, XLogDumpPrivate *privateInfo)
455 : {
456 27342 : ArchivedWALFile *entry = NULL;
457 27342 : FILE *write_fp = NULL;
458 :
459 : /*
460 : * Search the hash table first. If the entry is found, return it.
461 : * Otherwise, the requested WAL entry hasn't been read from the archive
462 : * yet; invoke the archive streamer to fetch it.
463 : */
464 : while (1)
465 : {
466 : /*
467 : * Search hash table.
468 : *
469 : * We perform the search inside the loop because a single iteration of
470 : * the archive reader may decompress and extract multiple files into
471 : * the hash table. One of these newly added files could be the one we
472 : * are seeking.
473 : */
474 31199 : entry = ArchivedWAL_lookup(privateInfo->archive_wal_htab, fname);
475 :
476 31199 : if (entry != NULL)
477 27342 : return entry;
478 :
479 : /*
480 : * Capture the current entry before calling read_archive_file(),
481 : * because cur_file may advance to a new segment during streaming. We
482 : * hold this reference so we can flush any remaining buffer data and
483 : * close the write handle once we detect that cur_file has moved on.
484 : */
485 3857 : entry = privateInfo->cur_file;
486 :
487 : /*
488 : * Fetch more data either when no current file is being tracked or
489 : * when its buffer has been fully flushed to the temporary file.
490 : */
491 3857 : if (entry == NULL || entry->buf->len == 0)
492 : {
493 3840 : if (read_archive_file(privateInfo, READ_CHUNK_SIZE) == 0)
494 0 : break; /* archive file ended */
495 : }
496 :
497 : /*
498 : * Archive streamer is reading a non-WAL file or an irrelevant WAL
499 : * file.
500 : */
501 3857 : if (entry == NULL)
502 1664 : continue;
503 :
504 : /*
505 : * The streamer is producing a WAL segment that isn't the one asked
506 : * for; it must be arriving out of order. Spill its data to disk so
507 : * it can be read back when needed.
508 : */
509 : Assert(strcmp(fname, entry->fname) != 0);
510 :
511 : /* Create a temporary file if one does not already exist */
512 2193 : if (!entry->spilled)
513 : {
514 17 : write_fp = prepare_tmp_write(entry->fname, privateInfo);
515 17 : entry->spilled = true;
516 : }
517 :
518 : /* Flush data from the buffer to the file */
519 2193 : perform_tmp_write(entry->fname, entry->buf, write_fp);
520 2193 : resetStringInfo(entry->buf);
521 :
522 : /*
523 : * If cur_file changed since we captured entry above, the archive
524 : * streamer has finished this segment and moved on. Close its spill
525 : * file handle so data is flushed to disk before the next segment
526 : * starts writing to a different handle.
527 : */
528 2193 : if (entry != privateInfo->cur_file && write_fp != NULL)
529 : {
530 17 : fclose(write_fp);
531 17 : write_fp = NULL;
532 : }
533 : }
534 :
535 : /* Requested WAL segment not found */
536 0 : pg_fatal("could not find WAL \"%s\" in archive \"%s\"",
537 : fname, privateInfo->archive_name);
538 : }
539 :
540 : /*
541 : * Reads a chunk from the archive file and passes it through the streamer
542 : * pipeline for decompression (if needed) and tar member extraction.
543 : */
544 : static int
545 15607 : read_archive_file(XLogDumpPrivate *privateInfo, Size count)
546 : {
547 : int rc;
548 :
549 : /* The read request must not exceed the allocated buffer size. */
550 : Assert(privateInfo->archive_read_buf_size >= count);
551 :
552 15607 : rc = read(privateInfo->archive_fd, privateInfo->archive_read_buf, count);
553 15607 : if (rc < 0)
554 0 : pg_fatal("could not read file \"%s\": %m",
555 : privateInfo->archive_name);
556 :
557 : /*
558 : * Decompress (if required), and then parse the previously read contents
559 : * of the tar file.
560 : */
561 15607 : if (rc > 0)
562 15607 : astreamer_content(privateInfo->archive_streamer, NULL,
563 15607 : privateInfo->archive_read_buf, rc,
564 : ASTREAMER_UNKNOWN);
565 :
566 15607 : return rc;
567 : }
568 :
569 : /*
570 : * Set up a temporary directory to temporarily store WAL segments.
571 : */
572 : static void
573 15 : setup_tmpwal_dir(const char *waldir)
574 : {
575 15 : const char *tmpdir = getenv("TMPDIR");
576 : char *template;
577 :
578 : Assert(TmpWalSegDir == NULL);
579 :
580 : /*
581 : * Use the directory specified by the TMPDIR environment variable. If it's
582 : * not set, fall back to the provided WAL directory to store WAL files
583 : * temporarily.
584 : */
585 15 : template = psprintf("%s/waldump_tmp-XXXXXX",
586 : tmpdir ? tmpdir : waldir);
587 15 : TmpWalSegDir = mkdtemp(template);
588 :
589 15 : if (TmpWalSegDir == NULL)
590 0 : pg_fatal("could not create directory \"%s\": %m", template);
591 :
592 15 : canonicalize_path(TmpWalSegDir);
593 :
594 15 : pg_log_debug("created directory \"%s\"", TmpWalSegDir);
595 15 : }
596 :
597 : /*
598 : * Remove temporary directory at exit, if any.
599 : */
600 : static void
601 15 : cleanup_tmpwal_dir_atexit(void)
602 : {
603 : Assert(TmpWalSegDir != NULL);
604 :
605 15 : rmtree(TmpWalSegDir, true);
606 :
607 15 : TmpWalSegDir = NULL;
608 15 : }
609 :
610 : /*
611 : * Open a file in the temporary spill directory for writing an out-of-order
612 : * WAL segment, creating the directory and registering the cleanup callback
613 : * if not already done. Returns the open file handle.
614 : */
615 : static FILE *
616 17 : prepare_tmp_write(const char *fname, XLogDumpPrivate *privateInfo)
617 : {
618 : char fpath[MAXPGPATH];
619 : FILE *file;
620 :
621 : /*
622 : * Setup temporary directory to store WAL segments and set up an exit
623 : * callback to remove it upon completion if not already.
624 : */
625 17 : if (unlikely(TmpWalSegDir == NULL))
626 : {
627 15 : setup_tmpwal_dir(privateInfo->archive_dir);
628 15 : atexit(cleanup_tmpwal_dir_atexit);
629 : }
630 :
631 17 : snprintf(fpath, MAXPGPATH, "%s/%s", TmpWalSegDir, fname);
632 :
633 : /* Open the spill file for writing */
634 17 : file = fopen(fpath, PG_BINARY_W);
635 17 : if (file == NULL)
636 0 : pg_fatal("could not create file \"%s\": %m", fpath);
637 :
638 : #ifndef WIN32
639 17 : if (chmod(fpath, pg_file_create_mode))
640 0 : pg_fatal("could not set permissions on file \"%s\": %m",
641 : fpath);
642 : #endif
643 :
644 17 : pg_log_debug("spilling to temporary file \"%s\"", fpath);
645 :
646 17 : return file;
647 : }
648 :
649 : /*
650 : * Write buffer data to the given file handle.
651 : */
652 : static void
653 2193 : perform_tmp_write(const char *fname, StringInfo buf, FILE *file)
654 : {
655 : Assert(file);
656 :
657 2193 : errno = 0;
658 2193 : if (buf->len > 0 && fwrite(buf->data, buf->len, 1, file) != 1)
659 : {
660 : /*
661 : * If write didn't set errno, assume problem is no disk space
662 : */
663 0 : if (errno == 0)
664 0 : errno = ENOSPC;
665 0 : pg_fatal("could not write to file \"%s/%s\": %m", TmpWalSegDir, fname);
666 : }
667 2193 : }
668 :
669 : /*
670 : * Create an astreamer that can read WAL from tar file.
671 : */
672 : static astreamer *
673 52 : astreamer_waldump_new(XLogDumpPrivate *privateInfo)
674 : {
675 : astreamer_waldump *streamer;
676 :
677 52 : streamer = palloc0_object(astreamer_waldump);
678 52 : *((const astreamer_ops **) &streamer->base.bbs_ops) =
679 : &astreamer_waldump_ops;
680 :
681 52 : streamer->privateInfo = privateInfo;
682 :
683 52 : return &streamer->base;
684 : }
685 :
686 : /*
687 : * Main entry point of the archive streamer for reading WAL data from a tar
688 : * file. If a member is identified as a valid WAL file, a hash entry is created
689 : * for it, and its contents are copied into that entry's buffer, making them
690 : * accessible to the decoding routine.
691 : */
692 : static void
693 889154 : astreamer_waldump_content(astreamer *streamer, astreamer_member *member,
694 : const char *data, int len,
695 : astreamer_archive_context context)
696 : {
697 889154 : astreamer_waldump *mystreamer = (astreamer_waldump *) streamer;
698 889154 : XLogDumpPrivate *privateInfo = mystreamer->privateInfo;
699 :
700 : Assert(context != ASTREAMER_UNKNOWN);
701 :
702 889154 : switch (context)
703 : {
704 8699 : case ASTREAMER_MEMBER_HEADER:
705 : {
706 8699 : char *fname = NULL;
707 : ArchivedWALFile *entry;
708 : bool found;
709 :
710 8699 : pg_log_debug("reading \"%s\"", member->pathname);
711 :
712 8699 : if (!member_is_wal_file(mystreamer, member, &fname))
713 8601 : break;
714 :
715 : /*
716 : * Skip range filtering during initial startup, before the WAL
717 : * segment size and segment number bounds are known.
718 : */
719 110 : if (!READ_ANY_WAL(privateInfo))
720 : {
721 : XLogSegNo segno;
722 : TimeLineID timeline;
723 :
724 : /*
725 : * Skip the segment if the timeline does not match, if it
726 : * falls outside the caller-specified range.
727 : */
728 58 : XLogFromFileName(fname, &timeline, &segno, privateInfo->segsize);
729 58 : if (privateInfo->timeline != timeline ||
730 58 : privateInfo->start_segno > segno ||
731 58 : privateInfo->end_segno < segno)
732 : {
733 12 : pfree(fname);
734 12 : break;
735 : }
736 : }
737 :
738 98 : entry = ArchivedWAL_insert(privateInfo->archive_wal_htab,
739 : fname, &found);
740 :
741 : /*
742 : * Shouldn't happen, but if it does, simply ignore the
743 : * duplicate WAL file.
744 : */
745 98 : if (found)
746 : {
747 0 : pg_log_warning("ignoring duplicate WAL \"%s\" found in archive \"%s\"",
748 : member->pathname, privateInfo->archive_name);
749 0 : pfree(fname);
750 0 : break;
751 : }
752 :
753 98 : entry->buf = makeStringInfo();
754 98 : entry->spilled = false;
755 98 : entry->read_len = 0;
756 98 : privateInfo->cur_file = entry;
757 : }
758 98 : break;
759 :
760 871780 : case ASTREAMER_MEMBER_CONTENTS:
761 871780 : if (privateInfo->cur_file)
762 : {
763 516869 : appendBinaryStringInfo(privateInfo->cur_file->buf, data, len);
764 516869 : privateInfo->cur_file->read_len += len;
765 : }
766 871780 : break;
767 :
768 8675 : case ASTREAMER_MEMBER_TRAILER:
769 :
770 : /*
771 : * End of this tar member; mark cur_file NULL so subsequent
772 : * content callbacks (if any) know no WAL file is currently
773 : * active.
774 : */
775 8675 : privateInfo->cur_file = NULL;
776 8675 : break;
777 :
778 0 : case ASTREAMER_ARCHIVE_TRAILER:
779 0 : break;
780 :
781 0 : default:
782 : /* Shouldn't happen. */
783 0 : pg_fatal("unexpected state while parsing tar file");
784 : }
785 889154 : }
786 :
787 : /*
788 : * End-of-stream processing for an astreamer_waldump stream. This is a
789 : * terminal streamer so it must have no successor.
790 : */
791 : static void
792 0 : astreamer_waldump_finalize(astreamer *streamer)
793 : {
794 : Assert(streamer->bbs_next == NULL);
795 0 : }
796 :
797 : /*
798 : * Free memory associated with an astreamer_waldump stream.
799 : */
800 : static void
801 48 : astreamer_waldump_free(astreamer *streamer)
802 : {
803 : Assert(streamer->bbs_next == NULL);
804 48 : pfree(streamer);
805 48 : }
806 :
807 : /*
808 : * Returns true if the archive member name matches the WAL naming format. If
809 : * successful, it also outputs the WAL segment name.
810 : */
811 : static bool
812 8699 : member_is_wal_file(astreamer_waldump *mystreamer, astreamer_member *member,
813 : char **fname)
814 : {
815 : int pathlen;
816 : char pathname[MAXPGPATH];
817 : char *filename;
818 :
819 : /* We are only interested in normal files */
820 8699 : if (member->is_directory || member->is_link)
821 281 : return false;
822 :
823 8418 : if (strlen(member->pathname) < XLOG_FNAME_LEN)
824 8225 : return false;
825 :
826 : /*
827 : * For a correct comparison, we must remove any '.' or '..' components
828 : * from the member pathname. Similar to member_verify_header(), we prepend
829 : * './' to the path so that canonicalize_path() can properly resolve and
830 : * strip these references from the tar member name.
831 : */
832 193 : snprintf(pathname, MAXPGPATH, "./%s", member->pathname);
833 193 : canonicalize_path(pathname);
834 193 : pathlen = strlen(pathname);
835 :
836 : /* Skip files in subdirectories other than pg_wal/ */
837 193 : if (pathlen > XLOG_FNAME_LEN &&
838 91 : strncmp(pathname, XLOGDIR, strlen(XLOGDIR)) != 0)
839 83 : return false;
840 :
841 : /* WAL file may appear with a full path (e.g., pg_wal/<name>) */
842 110 : filename = pathname + (pathlen - XLOG_FNAME_LEN);
843 110 : if (!IsXLogFileName(filename))
844 0 : return false;
845 :
846 110 : *fname = pnstrdup(filename, XLOG_FNAME_LEN);
847 :
848 110 : return true;
849 : }
850 :
851 : /*
852 : * Helper function for WAL file hash table.
853 : */
854 : static uint32
855 31383 : hash_string_pointer(const char *s)
856 : {
857 31383 : unsigned char *ss = (unsigned char *) s;
858 :
859 31383 : return hash_bytes(ss, strlen(s));
860 : }
|