Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * archive_waldump.c
4 : * A generic facility for reading WAL data from tar archives via archive
5 : * streamer.
6 : *
7 : * Portions Copyright (c) 2026, PostgreSQL Global Development Group
8 : *
9 : * IDENTIFICATION
10 : * src/bin/pg_waldump/archive_waldump.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres_fe.h"
16 :
17 : #include <unistd.h>
18 :
19 : #include "access/xlog_internal.h"
20 : #include "common/file_perm.h"
21 : #include "common/hashfn.h"
22 : #include "common/logging.h"
23 : #include "fe_utils/simple_list.h"
24 : #include "pg_waldump.h"
25 :
/*
 * Size, in bytes, of each read request issued against the archive file.
 */
#define READ_CHUNK_SIZE				(128 * 1024)

/*
 * Directory that receives spilled WAL segment files.  It stays NULL until
 * the first spill creates it.
 */
char	   *TmpWalSegDir = NULL;

/*
 * True while start_segno is still zero, which signals that any WAL file
 * should be read rather than only those inside a known segment range.
 */
#define READ_ANY_WAL(privateInfo)	((privateInfo)->start_segno == 0)
39 :
40 : /*
41 : * Hash entry representing a WAL segment retrieved from the archive.
42 : *
43 : * While WAL segments are typically read sequentially, individual entries
44 : * maintain their own buffers for the following reasons:
45 : *
46 : * 1. Boundary Handling: The archive streamer provides a continuous byte
47 : * stream. A single streaming chunk may contain the end of one WAL segment
48 : * and the start of the next. Separate buffers allow us to easily
49 : * partition and track these bytes by their respective segments.
50 : *
51 : * 2. Out-of-Order Support: Dedicated buffers simplify logic when segments
52 : * are archived or retrieved out of sequence.
53 : *
54 : * To minimize the memory footprint, entries and their associated buffers are
55 : * freed once consumed. Since pg_waldump does not request the same bytes
56 : * twice (after it's located the point at which it should start decoding),
57 : * a segment can be discarded as soon as pg_waldump moves past it. Moreover,
58 : * if we read a segment that won't be needed till later, we spill its data to
59 : * a temporary file instead of retaining it in memory. This ensures that
60 : * pg_waldump can process even very large tar archives without needing more
61 : * than a few WAL segments' worth of memory space.
62 : */
63 : typedef struct ArchivedWALFile
64 : {
65 : uint32 status; /* hash status */
66 : const char *fname; /* hash key: WAL segment name */
67 :
68 : StringInfo buf; /* holds WAL bytes read from archive */
69 : bool spilled; /* true if the WAL data was spilled to a
70 : * temporary file */
71 :
72 : int read_len; /* total bytes received from archive for this
73 : * segment (same as buf->len, unless we have
74 : * spilled the data to a temp file) */
75 : } ArchivedWALFile;
76 :
77 : static uint32 hash_string_pointer(const char *s);
78 : #define SH_PREFIX ArchivedWAL
79 : #define SH_ELEMENT_TYPE ArchivedWALFile
80 : #define SH_KEY_TYPE const char *
81 : #define SH_KEY fname
82 : #define SH_HASH_KEY(tb, key) hash_string_pointer(key)
83 : #define SH_EQUAL(tb, a, b) (strcmp(a, b) == 0)
84 : #define SH_SCOPE static inline
85 : #define SH_RAW_ALLOCATOR pg_malloc0
86 : #define SH_DECLARE
87 : #define SH_DEFINE
88 : #include "lib/simplehash.h"
89 :
90 : typedef struct astreamer_waldump
91 : {
92 : astreamer base;
93 : XLogDumpPrivate *privateInfo;
94 : } astreamer_waldump;
95 :
96 : static ArchivedWALFile *get_archive_wal_entry(const char *fname,
97 : XLogDumpPrivate *privateInfo);
98 : static bool read_archive_file(XLogDumpPrivate *privateInfo);
99 : static void setup_tmpwal_dir(const char *waldir);
100 :
101 : static FILE *prepare_tmp_write(const char *fname, XLogDumpPrivate *privateInfo);
102 : static void perform_tmp_write(const char *fname, StringInfo buf, FILE *file);
103 :
104 : static astreamer *astreamer_waldump_new(XLogDumpPrivate *privateInfo);
105 : static void astreamer_waldump_content(astreamer *streamer,
106 : astreamer_member *member,
107 : const char *data, int len,
108 : astreamer_archive_context context);
109 : static void astreamer_waldump_finalize(astreamer *streamer);
110 : static void astreamer_waldump_free(astreamer *streamer);
111 :
112 : static bool member_is_wal_file(astreamer_waldump *mystreamer,
113 : astreamer_member *member,
114 : char **fname);
115 :
116 : static const astreamer_ops astreamer_waldump_ops = {
117 : .content = astreamer_waldump_content,
118 : .finalize = astreamer_waldump_finalize,
119 : .free = astreamer_waldump_free
120 : };
121 :
122 : /*
123 : * Initializes the tar archive reader: opens the archive, builds a hash table
124 : * for WAL entries, reads ahead until a full WAL page header is available to
125 : * determine the WAL segment size, and computes start/end segment numbers for
126 : * filtering.
127 : */
128 : void
129 52 : init_archive_reader(XLogDumpPrivate *privateInfo,
130 : pg_compress_algorithm compression)
131 : {
132 : int fd;
133 : astreamer *streamer;
134 52 : ArchivedWALFile *entry = NULL;
135 : XLogLongPageHeader longhdr;
136 : ArchivedWAL_iterator iter;
137 :
138 : /* Open tar archive and store its file descriptor */
139 52 : fd = open_file_in_directory(privateInfo->archive_dir,
140 52 : privateInfo->archive_name);
141 :
142 52 : if (fd < 0)
143 0 : pg_fatal("could not open file \"%s\"", privateInfo->archive_name);
144 :
145 52 : privateInfo->archive_fd = fd;
146 52 : privateInfo->archive_fd_eof = false;
147 :
148 52 : streamer = astreamer_waldump_new(privateInfo);
149 :
150 : /* We must first parse the tar archive. */
151 52 : streamer = astreamer_tar_parser_new(streamer);
152 :
153 : /* If the archive is compressed, decompress before parsing. */
154 52 : if (compression == PG_COMPRESSION_GZIP)
155 17 : streamer = astreamer_gzip_decompressor_new(streamer);
156 35 : else if (compression == PG_COMPRESSION_LZ4)
157 4 : streamer = astreamer_lz4_decompressor_new(streamer);
158 31 : else if (compression == PG_COMPRESSION_ZSTD)
159 0 : streamer = astreamer_zstd_decompressor_new(streamer);
160 :
161 52 : privateInfo->archive_streamer = streamer;
162 :
163 : /*
164 : * Allocate a buffer for reading the archive file to begin content
165 : * decoding.
166 : */
167 52 : privateInfo->archive_read_buf = pg_malloc(READ_CHUNK_SIZE);
168 52 : privateInfo->archive_read_buf_size = READ_CHUNK_SIZE;
169 :
170 : /*
171 : * Hash table storing WAL entries read from the archive with an arbitrary
172 : * initial size.
173 : */
174 52 : privateInfo->archive_wal_htab = ArchivedWAL_create(8, NULL);
175 :
176 : /*
177 : * Read until we have at least one WAL segment with enough data to extract
178 : * the WAL segment size from the long page header.
179 : *
180 : * We must not rely on cur_file here, because it can become NULL if a
181 : * member trailer is processed during a read_archive_file() call. Instead,
182 : * scan the hash table after each read to find any entry with sufficient
183 : * data.
184 : */
185 692 : while (entry == NULL)
186 : {
187 640 : if (!read_archive_file(privateInfo))
188 0 : pg_fatal("could not find WAL in archive \"%s\"",
189 : privateInfo->archive_name);
190 :
191 640 : ArchivedWAL_start_iterate(privateInfo->archive_wal_htab, &iter);
192 640 : while ((entry = ArchivedWAL_iterate(privateInfo->archive_wal_htab,
193 640 : &iter)) != NULL)
194 : {
195 52 : if (entry->read_len >= sizeof(XLogLongPageHeaderData))
196 52 : break;
197 : }
198 : }
199 :
200 : /* Extract the WAL segment size from the long page header */
201 52 : longhdr = (XLogLongPageHeader) entry->buf->data;
202 :
203 52 : if (!IsValidWalSegSize(longhdr->xlp_seg_size))
204 : {
205 0 : pg_log_error(ngettext("invalid WAL segment size in WAL file from archive \"%s\" (%d byte)",
206 : "invalid WAL segment size in WAL file from archive \"%s\" (%d bytes)",
207 : longhdr->xlp_seg_size),
208 : privateInfo->archive_name, longhdr->xlp_seg_size);
209 0 : pg_log_error_detail("The WAL segment size must be a power of two between 1 MB and 1 GB.");
210 0 : exit(1);
211 : }
212 :
213 52 : privateInfo->segsize = longhdr->xlp_seg_size;
214 :
215 : /*
216 : * With the WAL segment size available, we can now initialize the
217 : * dependent start and end segment numbers.
218 : */
219 : Assert(XLogRecPtrIsValid(privateInfo->startptr));
220 52 : XLByteToSeg(privateInfo->startptr, privateInfo->start_segno,
221 : privateInfo->segsize);
222 :
223 52 : if (XLogRecPtrIsValid(privateInfo->endptr))
224 48 : XLByteToSeg(privateInfo->endptr, privateInfo->end_segno,
225 : privateInfo->segsize);
226 :
227 : /*
228 : * Now that we have initialized the filtering parameters (start_segno and
229 : * end_segno), we can discard any already-loaded WAL hash table entries
230 : * for segments we don't actually need. Subsequent WAL will be filtered
231 : * automatically by the archive streamer using the updated start_segno and
232 : * end_segno values.
233 : */
234 52 : ArchivedWAL_start_iterate(privateInfo->archive_wal_htab, &iter);
235 104 : while ((entry = ArchivedWAL_iterate(privateInfo->archive_wal_htab,
236 104 : &iter)) != NULL)
237 : {
238 : XLogSegNo segno;
239 : TimeLineID timeline;
240 :
241 52 : XLogFromFileName(entry->fname, &timeline, &segno, privateInfo->segsize);
242 52 : if (privateInfo->timeline != timeline ||
243 52 : privateInfo->start_segno > segno ||
244 52 : privateInfo->end_segno < segno)
245 13 : free_archive_wal_entry(entry->fname, privateInfo);
246 : }
247 52 : }
248 :
249 : /*
250 : * Release the archive streamer chain and close the archive file.
251 : */
252 : void
253 48 : free_archive_reader(XLogDumpPrivate *privateInfo)
254 : {
255 : /*
256 : * NB: Normally, astreamer_finalize() is called before astreamer_free() to
257 : * flush any remaining buffered data or to ensure the end of the tar
258 : * archive is reached. read_archive_file() may have done so. However,
259 : * when decoding WAL we can stop once we hit the end LSN, so we may never
260 : * have read all of the input file. In that case any remaining buffered
261 : * data or unread portion of the archive can be safely ignored.
262 : */
263 48 : astreamer_free(privateInfo->archive_streamer);
264 :
265 : /* Free any remaining hash table entries and their buffers. */
266 48 : if (privateInfo->archive_wal_htab != NULL)
267 : {
268 : ArchivedWAL_iterator iter;
269 : ArchivedWALFile *entry;
270 :
271 48 : ArchivedWAL_start_iterate(privateInfo->archive_wal_htab, &iter);
272 146 : while ((entry = ArchivedWAL_iterate(privateInfo->archive_wal_htab,
273 98 : &iter)) != NULL)
274 : {
275 50 : if (entry->buf != NULL)
276 50 : destroyStringInfo(entry->buf);
277 : }
278 48 : ArchivedWAL_destroy(privateInfo->archive_wal_htab);
279 48 : privateInfo->archive_wal_htab = NULL;
280 : }
281 :
282 : /* Free the reusable read buffer. */
283 48 : if (privateInfo->archive_read_buf != NULL)
284 : {
285 48 : pg_free(privateInfo->archive_read_buf);
286 48 : privateInfo->archive_read_buf = NULL;
287 : }
288 :
289 : /* Close the file. */
290 48 : if (close(privateInfo->archive_fd) != 0)
291 0 : pg_log_error("could not close file \"%s\": %m",
292 : privateInfo->archive_name);
293 48 : }
294 :
295 : /*
296 : * Copies the requested WAL data from the hash entry's buffer into readBuff.
297 : * If the buffer does not yet contain the needed bytes, fetches more data from
298 : * the tar archive via the archive streamer.
299 : */
300 : int
301 28452 : read_archive_wal_page(XLogDumpPrivate *privateInfo, XLogRecPtr targetPagePtr,
302 : size_t count, char *readBuff)
303 : {
304 28452 : char *p = readBuff;
305 28452 : size_t nbytes = count;
306 28452 : XLogRecPtr recptr = targetPagePtr;
307 28452 : int segsize = privateInfo->segsize;
308 : XLogSegNo segno;
309 : char fname[MAXFNAMELEN];
310 : ArchivedWALFile *entry;
311 :
312 : /* Identify the segment and locate its entry in the archive hash */
313 28452 : XLByteToSeg(targetPagePtr, segno, segsize);
314 28452 : XLogFileName(fname, privateInfo->timeline, segno, segsize);
315 28452 : entry = get_archive_wal_entry(fname, privateInfo);
316 : Assert(!entry->spilled);
317 :
318 60217 : while (nbytes > 0)
319 : {
320 31765 : char *buf = entry->buf->data;
321 31765 : int bufLen = entry->buf->len;
322 : XLogRecPtr endPtr;
323 : XLogRecPtr startPtr;
324 :
325 : /*
326 : * Calculate the LSN range currently residing in the buffer.
327 : *
328 : * read_len tracks total bytes received for this segment, so endPtr is
329 : * the LSN just past the last buffered byte, and startPtr is the LSN
330 : * of the first buffered byte.
331 : */
332 31765 : XLogSegNoOffsetToRecPtr(segno, entry->read_len, segsize, endPtr);
333 31765 : startPtr = endPtr - bufLen;
334 :
335 : /*
336 : * Copy the requested WAL record if it exists in the buffer.
337 : */
338 31765 : if (bufLen > 0 && startPtr <= recptr && recptr < endPtr)
339 29496 : {
340 : int copyBytes;
341 29496 : int offset = recptr - startPtr;
342 :
343 : /*
344 : * Given startPtr <= recptr < endPtr and a total buffer size
345 : * 'bufLen', the offset (recptr - startPtr) will always be less
346 : * than 'bufLen'.
347 : */
348 : Assert(offset < bufLen);
349 :
350 29496 : copyBytes = Min(nbytes, bufLen - offset);
351 29496 : memcpy(p, buf + offset, copyBytes);
352 :
353 : /* Update state for read */
354 29496 : recptr += copyBytes;
355 29496 : nbytes -= copyBytes;
356 29496 : p += copyBytes;
357 : }
358 : else
359 : {
360 : /*
361 : * We evidently need to fetch more data. Raise an error if the
362 : * archive streamer has moved past our segment (meaning the WAL
363 : * file in the archive is shorter than expected) or if reading the
364 : * archive reached EOF.
365 : */
366 2269 : if (privateInfo->cur_file != entry)
367 0 : pg_fatal("WAL segment \"%s\" in archive \"%s\" is too short: read %zu of %zu bytes",
368 : fname, privateInfo->archive_name,
369 : (count - nbytes), count);
370 2269 : if (!read_archive_file(privateInfo))
371 0 : pg_fatal("unexpected end of archive \"%s\" while reading \"%s\": read %zu of %zu bytes",
372 : privateInfo->archive_name, fname,
373 : (count - nbytes), count);
374 :
375 : /*
376 : * Loading more data may have moved hash table entries, so we must
377 : * re-look-up the one we are reading from.
378 : */
379 2269 : entry = ArchivedWAL_lookup(privateInfo->archive_wal_htab, fname);
380 : /* ... it had better still be there */
381 : Assert(entry != NULL);
382 : }
383 : }
384 :
385 : /*
386 : * Should have successfully read all the requested bytes or reported a
387 : * failure before this point.
388 : */
389 : Assert(nbytes == 0);
390 :
391 : /*
392 : * Return count unchanged; the caller expects this convention, matching
393 : * the routine that reads WAL pages from physical files.
394 : */
395 28452 : return count;
396 : }
397 :
398 : /*
399 : * Releases the buffer of a WAL entry that is no longer needed, preventing the
400 : * accumulation of irrelevant WAL data. Also removes any associated temporary
401 : * file and clears privateInfo->cur_file if it points to this entry, so the
402 : * archive streamer skips subsequent data for it.
403 : */
404 : void
405 43 : free_archive_wal_entry(const char *fname, XLogDumpPrivate *privateInfo)
406 : {
407 : ArchivedWALFile *entry;
408 : const char *oldfname;
409 :
410 43 : entry = ArchivedWAL_lookup(privateInfo->archive_wal_htab, fname);
411 :
412 43 : if (entry == NULL)
413 0 : return;
414 :
415 : /* Destroy the buffer */
416 43 : destroyStringInfo(entry->buf);
417 43 : entry->buf = NULL;
418 :
419 : /* Remove temporary file if any */
420 43 : if (entry->spilled)
421 : {
422 : char fpath[MAXPGPATH];
423 :
424 0 : snprintf(fpath, MAXPGPATH, "%s/%s", TmpWalSegDir, fname);
425 :
426 0 : if (unlink(fpath) == 0)
427 0 : pg_log_debug("removed file \"%s\"", fpath);
428 : }
429 :
430 : /* Clear cur_file if it points to the entry being freed */
431 43 : if (privateInfo->cur_file == entry)
432 15 : privateInfo->cur_file = NULL;
433 :
434 : /*
435 : * ArchivedWAL_delete_item may cause other hash table entries to move.
436 : * Therefore, if cur_file isn't NULL now, we have to be prepared to look
437 : * that entry up again after the deletion. Fortunately, the entry's fname
438 : * string won't move.
439 : */
440 43 : oldfname = privateInfo->cur_file ? privateInfo->cur_file->fname : NULL;
441 :
442 43 : ArchivedWAL_delete_item(privateInfo->archive_wal_htab, entry);
443 :
444 43 : if (oldfname)
445 : {
446 27 : privateInfo->cur_file = ArchivedWAL_lookup(privateInfo->archive_wal_htab,
447 : oldfname);
448 : /* ... it had better still be there */
449 : Assert(privateInfo->cur_file != NULL);
450 : }
451 : }
452 :
453 : /*
454 : * Returns the archived WAL entry from the hash table if it already exists.
455 : * Otherwise, reads more data from the archive until the requested entry is
456 : * found. If the archive streamer reads a WAL file from the archive that
457 : * is not currently needed, that data is spilled to a temporary file for later
458 : * retrieval.
459 : *
460 : * Note that the returned entry might not have been completely read from
461 : * the archive yet.
462 : */
463 : static ArchivedWALFile *
464 28452 : get_archive_wal_entry(const char *fname, XLogDumpPrivate *privateInfo)
465 : {
466 : while (1)
467 1920 : {
468 : ArchivedWALFile *entry;
469 : ArchivedWAL_iterator iter;
470 :
471 : /*
472 : * Search the hash table first. If the entry is found, return it.
473 : * Otherwise, the requested WAL entry hasn't been read from the
474 : * archive yet; we must invoke the archive streamer to fetch it.
475 : */
476 30372 : entry = ArchivedWAL_lookup(privateInfo->archive_wal_htab, fname);
477 :
478 30372 : if (entry != NULL)
479 28452 : return entry;
480 :
481 : /*
482 : * Before loading more data, scan the hash table to see if we have
483 : * loaded any files we don't need yet. If so, spill their data to
484 : * disk to conserve memory space. But don't try to spill a
485 : * partially-read file; it's not worth the complication.
486 : */
487 1920 : ArchivedWAL_start_iterate(privateInfo->archive_wal_htab, &iter);
488 2176 : while ((entry = ArchivedWAL_iterate(privateInfo->archive_wal_htab,
489 2176 : &iter)) != NULL)
490 : {
491 : FILE *write_fp;
492 :
493 : /* OK to spill? */
494 256 : if (entry->spilled)
495 0 : continue; /* already spilled */
496 256 : if (entry == privateInfo->cur_file)
497 256 : continue; /* still being read */
498 :
499 : /* Write out the completed WAL file contents to a temp file. */
500 0 : write_fp = prepare_tmp_write(entry->fname, privateInfo);
501 0 : perform_tmp_write(entry->fname, entry->buf, write_fp);
502 0 : if (fclose(write_fp) != 0)
503 0 : pg_fatal("could not close file \"%s/%s\": %m",
504 : TmpWalSegDir, entry->fname);
505 :
506 : /* resetStringInfo won't release storage, so delete/recreate. */
507 0 : destroyStringInfo(entry->buf);
508 0 : entry->buf = makeStringInfo();
509 0 : entry->spilled = true;
510 : }
511 :
512 : /*
513 : * Read more data. If we reach EOF, the desired file is not present.
514 : */
515 1920 : if (!read_archive_file(privateInfo))
516 0 : pg_fatal("could not find WAL \"%s\" in archive \"%s\"",
517 : fname, privateInfo->archive_name);
518 : }
519 : }
520 :
521 : /*
522 : * Reads a chunk from the archive file and passes it through the streamer
523 : * pipeline for decompression (if needed) and tar member extraction.
524 : *
525 : * Returns true if successful, false if there is no more data.
526 : *
527 : * Callers must be aware that a single call may trigger multiple callbacks
528 : * in astreamer_waldump_content, so privateInfo->cur_file can change value
529 : * (or become NULL) during a call. In particular, cur_file is set to NULL
530 : * when the ASTREAMER_MEMBER_TRAILER callback fires at the end of a tar
531 : * member; it is then set to a new entry when the next WAL member's
532 : * ASTREAMER_MEMBER_HEADER callback fires, which may or may not happen
533 : * within the same call.
534 : */
535 : static bool
536 4829 : read_archive_file(XLogDumpPrivate *privateInfo)
537 : {
538 : int rc;
539 :
540 : /* Fail if we already reached EOF in a prior call. */
541 4829 : if (privateInfo->archive_fd_eof)
542 0 : return false;
543 :
544 : /* Try to read some more data. */
545 4829 : rc = read(privateInfo->archive_fd, privateInfo->archive_read_buf,
546 : privateInfo->archive_read_buf_size);
547 4829 : if (rc < 0)
548 0 : pg_fatal("could not read file \"%s\": %m",
549 : privateInfo->archive_name);
550 :
551 : /*
552 : * Decompress (if required), and then parse the previously read contents
553 : * of the tar file.
554 : */
555 4829 : if (rc > 0)
556 4829 : astreamer_content(privateInfo->archive_streamer, NULL,
557 4829 : privateInfo->archive_read_buf, rc,
558 : ASTREAMER_UNKNOWN);
559 : else
560 : {
561 : /*
562 : * We reached EOF, but there is probably still data queued in the
563 : * astreamer pipeline's buffers. Flush it out to ensure that we
564 : * process everything.
565 : */
566 0 : astreamer_finalize(privateInfo->archive_streamer);
567 : /* Set flag to ensure we don't finalize more than once. */
568 0 : privateInfo->archive_fd_eof = true;
569 : }
570 :
571 4829 : return true;
572 : }
573 :
574 : /*
575 : * Set up a temporary directory to temporarily store WAL segments.
576 : */
577 : static void
578 0 : setup_tmpwal_dir(const char *waldir)
579 : {
580 0 : const char *tmpdir = getenv("TMPDIR");
581 : char *template;
582 :
583 : Assert(TmpWalSegDir == NULL);
584 :
585 : /*
586 : * Use the directory specified by the TMPDIR environment variable. If it's
587 : * not set, fall back to the provided WAL directory to store WAL files
588 : * temporarily.
589 : */
590 0 : template = psprintf("%s/waldump_tmp-XXXXXX",
591 : tmpdir ? tmpdir : waldir);
592 0 : TmpWalSegDir = mkdtemp(template);
593 :
594 0 : if (TmpWalSegDir == NULL)
595 0 : pg_fatal("could not create directory \"%s\": %m", template);
596 :
597 0 : canonicalize_path(TmpWalSegDir);
598 :
599 0 : pg_log_debug("created directory \"%s\"", TmpWalSegDir);
600 0 : }
601 :
602 : /*
603 : * Open a file in the temporary spill directory for writing an out-of-order
604 : * WAL segment, creating the directory if not already done.
605 : * Returns the open file handle.
606 : */
607 : static FILE *
608 0 : prepare_tmp_write(const char *fname, XLogDumpPrivate *privateInfo)
609 : {
610 : char fpath[MAXPGPATH];
611 : FILE *file;
612 :
613 : /* Setup temporary directory to store WAL segments, if we didn't already */
614 0 : if (unlikely(TmpWalSegDir == NULL))
615 0 : setup_tmpwal_dir(privateInfo->archive_dir);
616 :
617 0 : snprintf(fpath, MAXPGPATH, "%s/%s", TmpWalSegDir, fname);
618 :
619 : /* Open the spill file for writing */
620 0 : file = fopen(fpath, PG_BINARY_W);
621 0 : if (file == NULL)
622 0 : pg_fatal("could not create file \"%s\": %m", fpath);
623 :
624 : #ifndef WIN32
625 0 : if (chmod(fpath, pg_file_create_mode))
626 0 : pg_fatal("could not set permissions on file \"%s\": %m",
627 : fpath);
628 : #endif
629 :
630 0 : pg_log_debug("spilling to temporary file \"%s\"", fpath);
631 :
632 0 : return file;
633 : }
634 :
635 : /*
636 : * Write buffer data to the given file handle.
637 : */
638 : static void
639 0 : perform_tmp_write(const char *fname, StringInfo buf, FILE *file)
640 : {
641 : Assert(file);
642 :
643 0 : errno = 0;
644 0 : if (buf->len > 0 && fwrite(buf->data, buf->len, 1, file) != 1)
645 : {
646 : /*
647 : * If write didn't set errno, assume problem is no disk space
648 : */
649 0 : if (errno == 0)
650 0 : errno = ENOSPC;
651 0 : pg_fatal("could not write to file \"%s/%s\": %m", TmpWalSegDir, fname);
652 : }
653 0 : }
654 :
655 : /*
656 : * Create an astreamer that can read WAL from tar file.
657 : */
658 : static astreamer *
659 52 : astreamer_waldump_new(XLogDumpPrivate *privateInfo)
660 : {
661 : astreamer_waldump *streamer;
662 :
663 52 : streamer = palloc0_object(astreamer_waldump);
664 52 : *((const astreamer_ops **) &streamer->base.bbs_ops) =
665 : &astreamer_waldump_ops;
666 :
667 52 : streamer->privateInfo = privateInfo;
668 :
669 52 : return &streamer->base;
670 : }
671 :
672 : /*
673 : * Main entry point of the archive streamer for reading WAL data from a tar
674 : * file. If a member is identified as a valid WAL file, a hash entry is created
675 : * for it, and its contents are copied into that entry's buffer, making them
676 : * accessible to the decoding routine.
677 : */
678 : static void
679 31830 : astreamer_waldump_content(astreamer *streamer, astreamer_member *member,
680 : const char *data, int len,
681 : astreamer_archive_context context)
682 : {
683 31830 : astreamer_waldump *mystreamer = (astreamer_waldump *) streamer;
684 31830 : XLogDumpPrivate *privateInfo = mystreamer->privateInfo;
685 :
686 : Assert(context != ASTREAMER_UNKNOWN);
687 :
688 31830 : switch (context)
689 : {
690 8598 : case ASTREAMER_MEMBER_HEADER:
691 : {
692 8598 : char *fname = NULL;
693 : ArchivedWALFile *entry;
694 : bool found;
695 :
696 : /* Shouldn't see MEMBER_HEADER in the middle of a file */
697 : Assert(privateInfo->cur_file == NULL);
698 :
699 8598 : pg_log_debug("reading \"%s\"", member->pathname);
700 :
701 8598 : if (!member_is_wal_file(mystreamer, member, &fname))
702 8501 : break;
703 :
704 : /*
705 : * Skip range filtering during initial startup, before the WAL
706 : * segment size and segment number bounds are known.
707 : */
708 109 : if (!READ_ANY_WAL(privateInfo))
709 : {
710 : XLogSegNo segno;
711 : TimeLineID timeline;
712 :
713 : /*
714 : * Skip the segment if the timeline does not match, if it
715 : * falls outside the caller-specified range.
716 : */
717 57 : XLogFromFileName(fname, &timeline, &segno, privateInfo->segsize);
718 57 : if (privateInfo->timeline != timeline ||
719 57 : privateInfo->start_segno > segno ||
720 57 : privateInfo->end_segno < segno)
721 : {
722 12 : pfree(fname);
723 12 : break;
724 : }
725 : }
726 :
727 : /*
728 : * Note: ArchivedWAL_insert may cause existing hash table
729 : * entries to move. While cur_file is known to be NULL right
730 : * now, read_archive_wal_page may have a live hash entry
731 : * pointer, which it needs to take care to update after
732 : * read_archive_file completes.
733 : */
734 97 : entry = ArchivedWAL_insert(privateInfo->archive_wal_htab,
735 : fname, &found);
736 :
737 : /*
738 : * Shouldn't happen, but if it does, simply ignore the
739 : * duplicate WAL file.
740 : */
741 97 : if (found)
742 : {
743 0 : pg_log_warning("ignoring duplicate WAL \"%s\" found in archive \"%s\"",
744 : member->pathname, privateInfo->archive_name);
745 0 : pfree(fname);
746 0 : break;
747 : }
748 :
749 97 : entry->buf = makeStringInfo();
750 97 : entry->spilled = false;
751 97 : entry->read_len = 0;
752 97 : privateInfo->cur_file = entry;
753 : }
754 97 : break;
755 :
756 14686 : case ASTREAMER_MEMBER_CONTENTS:
757 14686 : if (privateInfo->cur_file)
758 : {
759 4587 : appendBinaryStringInfo(privateInfo->cur_file->buf, data, len);
760 4587 : privateInfo->cur_file->read_len += len;
761 : }
762 14686 : break;
763 :
764 8546 : case ASTREAMER_MEMBER_TRAILER:
765 :
766 : /*
767 : * End of this tar member; mark cur_file NULL so subsequent
768 : * content callbacks (if any) know no WAL file is currently
769 : * active.
770 : */
771 8546 : privateInfo->cur_file = NULL;
772 8546 : break;
773 :
774 0 : case ASTREAMER_ARCHIVE_TRAILER:
775 0 : break;
776 :
777 0 : default:
778 : /* Shouldn't happen. */
779 0 : pg_fatal("unexpected state while parsing tar file");
780 : }
781 31830 : }
782 :
783 : /*
784 : * End-of-stream processing for an astreamer_waldump stream. This is a
785 : * terminal streamer so it must have no successor.
786 : */
787 : static void
788 0 : astreamer_waldump_finalize(astreamer *streamer)
789 : {
790 : Assert(streamer->bbs_next == NULL);
791 0 : }
792 :
793 : /*
794 : * Free memory associated with an astreamer_waldump stream.
795 : */
796 : static void
797 48 : astreamer_waldump_free(astreamer *streamer)
798 : {
799 : Assert(streamer->bbs_next == NULL);
800 48 : pfree(streamer);
801 48 : }
802 :
803 : /*
804 : * Returns true if the archive member name matches the WAL naming format. If
805 : * successful, it also outputs the WAL segment name.
806 : */
807 : static bool
808 8598 : member_is_wal_file(astreamer_waldump *mystreamer, astreamer_member *member,
809 : char **fname)
810 : {
811 : int pathlen;
812 : char pathname[MAXPGPATH];
813 : char *filename;
814 :
815 : /* We are only interested in normal files */
816 8598 : if (!member->is_regular)
817 239 : return false;
818 :
819 8359 : if (strlen(member->pathname) < XLOG_FNAME_LEN)
820 8225 : return false;
821 :
822 : /*
823 : * For a correct comparison, we must remove any '.' or '..' components
824 : * from the member pathname. Similar to member_verify_header(), we prepend
825 : * './' to the path so that canonicalize_path() can properly resolve and
826 : * strip these references from the tar member name.
827 : */
828 134 : snprintf(pathname, MAXPGPATH, "./%s", member->pathname);
829 134 : canonicalize_path(pathname);
830 134 : pathlen = strlen(pathname);
831 :
832 : /* Skip files in subdirectories other than pg_wal/ */
833 134 : if (pathlen > XLOG_FNAME_LEN &&
834 33 : strncmp(pathname, XLOGDIR, strlen(XLOGDIR)) != 0)
835 25 : return false;
836 :
837 : /* WAL file may appear with a full path (e.g., pg_wal/<name>) */
838 109 : filename = pathname + (pathlen - XLOG_FNAME_LEN);
839 109 : if (!IsXLogFileName(filename))
840 0 : return false;
841 :
842 109 : *fname = pnstrdup(filename, XLOG_FNAME_LEN);
843 :
844 109 : return true;
845 : }
846 :
847 : /*
848 : * Helper function for WAL file hash table.
849 : */
850 : static uint32
851 32851 : hash_string_pointer(const char *s)
852 : {
853 32851 : const unsigned char *ss = (const unsigned char *) s;
854 :
855 32851 : return hash_bytes(ss, strlen(s));
856 : }
|