LCOV - code coverage report
Current view: top level - src/bin/pg_waldump - archive_waldump.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 87.1 % 225 196
Test Date: 2026-03-22 18:16:46 Functions: 93.8 % 16 15
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * archive_waldump.c
       4              :  *      A generic facility for reading WAL data from tar archives via archive
       5              :  *      streamer.
       6              :  *
       7              :  * Portions Copyright (c) 2026, PostgreSQL Global Development Group
       8              :  *
       9              :  * IDENTIFICATION
      10              :  *      src/bin/pg_waldump/archive_waldump.c
      11              :  *
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : 
      15              : #include "postgres_fe.h"
      16              : 
      17              : #include <unistd.h>
      18              : 
      19              : #include "access/xlog_internal.h"
      20              : #include "common/file_perm.h"
      21              : #include "common/hashfn.h"
      22              : #include "common/logging.h"
      23              : #include "fe_utils/simple_list.h"
      24              : #include "pg_waldump.h"
      25              : 
/*
 * How many bytes should we try to read from a file at once?
 *
 * init_archive_reader() allocates the archive read buffer with exactly this
 * size, so no single read request may ask for more than this.
 */
#define READ_CHUNK_SIZE             (128 * 1024)

/*
 * Temporary directory for spilled WAL segment files.  Starts out NULL;
 * spilled segments are addressed as "<TmpWalSegDir>/<segment name>".
 */
char       *TmpWalSegDir = NULL;

/*
 * Check if the start segment number is zero; this indicates a request to read
 * any WAL file.
 */
#define READ_ANY_WAL(privateInfo)   ((privateInfo)->start_segno == 0)
      39              : 
/*
 * Hash entry representing a WAL segment retrieved from the archive.
 *
 * While WAL segments are typically read sequentially, individual entries
 * maintain their own buffers for the following reasons:
 *
 * 1. Boundary Handling: The archive streamer provides a continuous byte
 * stream. A single streaming chunk may contain the end of one WAL segment
 * and the start of the next. Separate buffers allow us to easily
 * partition and track these bytes by their respective segments.
 *
 * 2. Out-of-Order Support: Dedicated buffers simplify logic when segments
 * are archived or retrieved out of sequence.
 *
 * To minimize the memory footprint, entries and their associated buffers are
 * freed immediately once consumed. Since pg_waldump does not request the same
 * bytes twice, a segment is discarded as soon as pg_waldump moves past it.
 */
typedef struct ArchivedWALFile
{
    uint32      status;         /* hash status */
    const char *fname;          /* hash key: WAL segment name */

    StringInfo  buf;            /* holds WAL bytes read from archive; it is
                                 * destroyed and set to NULL when the entry
                                 * is freed */
    bool        spilled;        /* true if the WAL data was spilled to a
                                 * temporary file under TmpWalSegDir */

    int         read_len;       /* total bytes received from archive for this
                                 * segment, including already-consumed data;
                                 * used to map the buffer contents back to an
                                 * LSN range */
} ArchivedWALFile;
      70              : 
/*
 * Instantiate a simplehash table of ArchivedWALFile entries.  All generated
 * functions carry the "ArchivedWAL_" prefix.  Entries are keyed by their
 * fname member, hashed with hash_string_pointer() and compared with strcmp().
 * Table memory is obtained through pg_malloc0().
 *
 * NOTE(review): code in this file keeps raw ArchivedWALFile pointers
 * (e.g. privateInfo->cur_file) across calls that may add or delete table
 * entries.  Confirm against simplehash.h that such operations cannot
 * relocate a live element.
 */
static uint32 hash_string_pointer(const char *s);
#define SH_PREFIX               ArchivedWAL
#define SH_ELEMENT_TYPE         ArchivedWALFile
#define SH_KEY_TYPE             const char *
#define SH_KEY                  fname
#define SH_HASH_KEY(tb, key)    hash_string_pointer(key)
#define SH_EQUAL(tb, a, b)      (strcmp(a, b) == 0)
#define SH_SCOPE                static inline
#define SH_RAW_ALLOCATOR        pg_malloc0
#define SH_DECLARE
#define SH_DEFINE
#include "lib/simplehash.h"
      83              : 
/*
 * Archive streamer specialization used by pg_waldump.
 *
 * 'base' is the generic streamer state.  'privateInfo' is the pg_waldump
 * reader state; presumably it is the pointer handed to
 * astreamer_waldump_new() -- confirm against that function.
 */
typedef struct astreamer_waldump
{
    astreamer   base;
    XLogDumpPrivate *privateInfo;
} astreamer_waldump;
      89              : 
      90              : static ArchivedWALFile *get_archive_wal_entry(const char *fname,
      91              :                                               XLogDumpPrivate *privateInfo);
      92              : static int  read_archive_file(XLogDumpPrivate *privateInfo, Size count);
      93              : static void setup_tmpwal_dir(const char *waldir);
      94              : static void cleanup_tmpwal_dir_atexit(void);
      95              : 
      96              : static FILE *prepare_tmp_write(const char *fname, XLogDumpPrivate *privateInfo);
      97              : static void perform_tmp_write(const char *fname, StringInfo buf, FILE *file);
      98              : 
      99              : static astreamer *astreamer_waldump_new(XLogDumpPrivate *privateInfo);
     100              : static void astreamer_waldump_content(astreamer *streamer,
     101              :                                       astreamer_member *member,
     102              :                                       const char *data, int len,
     103              :                                       astreamer_archive_context context);
     104              : static void astreamer_waldump_finalize(astreamer *streamer);
     105              : static void astreamer_waldump_free(astreamer *streamer);
     106              : 
     107              : static bool member_is_wal_file(astreamer_waldump *mystreamer,
     108              :                                astreamer_member *member,
     109              :                                char **fname);
     110              : 
/*
 * Callback table for the waldump archive streamer: routes the content,
 * finalize and free operations to the handlers declared above.
 */
static const astreamer_ops astreamer_waldump_ops = {
    .content = astreamer_waldump_content,
    .finalize = astreamer_waldump_finalize,
    .free = astreamer_waldump_free
};
     116              : 
     117              : /*
     118              :  * Initializes the tar archive reader: opens the archive, builds a hash table
     119              :  * for WAL entries, reads ahead until a full WAL page header is available to
     120              :  * determine the WAL segment size, and computes start/end segment numbers for
     121              :  * filtering.
     122              :  */
     123              : void
     124           52 : init_archive_reader(XLogDumpPrivate *privateInfo,
     125              :                     pg_compress_algorithm compression)
     126              : {
     127              :     int         fd;
     128              :     astreamer  *streamer;
     129           52 :     ArchivedWALFile *entry = NULL;
     130              :     XLogLongPageHeader longhdr;
     131              :     XLogSegNo   segno;
     132              :     TimeLineID  timeline;
     133              : 
     134              :     /* Open tar archive and store its file descriptor */
     135           52 :     fd = open_file_in_directory(privateInfo->archive_dir,
     136           52 :                                 privateInfo->archive_name);
     137              : 
     138           52 :     if (fd < 0)
     139            0 :         pg_fatal("could not open file \"%s\"", privateInfo->archive_name);
     140              : 
     141           52 :     privateInfo->archive_fd = fd;
     142              : 
     143           52 :     streamer = astreamer_waldump_new(privateInfo);
     144              : 
     145              :     /* We must first parse the tar archive. */
     146           52 :     streamer = astreamer_tar_parser_new(streamer);
     147              : 
     148              :     /* If the archive is compressed, decompress before parsing. */
     149           52 :     if (compression == PG_COMPRESSION_GZIP)
     150           17 :         streamer = astreamer_gzip_decompressor_new(streamer);
     151           35 :     else if (compression == PG_COMPRESSION_LZ4)
     152            4 :         streamer = astreamer_lz4_decompressor_new(streamer);
     153           31 :     else if (compression == PG_COMPRESSION_ZSTD)
     154            0 :         streamer = astreamer_zstd_decompressor_new(streamer);
     155              : 
     156           52 :     privateInfo->archive_streamer = streamer;
     157              : 
     158              :     /*
     159              :      * Allocate a buffer for reading the archive file to facilitate content
     160              :      * decoding; read requests must not exceed the allocated buffer size.
     161              :      */
     162           52 :     privateInfo->archive_read_buf = pg_malloc(READ_CHUNK_SIZE);
     163              : 
     164              : #ifdef USE_ASSERT_CHECKING
     165              :     privateInfo->archive_read_buf_size = READ_CHUNK_SIZE;
     166              : #endif
     167              : 
     168              :     /*
     169              :      * Hash table storing WAL entries read from the archive with an arbitrary
     170              :      * initial size.
     171              :      */
     172           52 :     privateInfo->archive_wal_htab = ArchivedWAL_create(8, NULL);
     173              : 
     174              :     /*
     175              :      * Read until we have at least one full WAL page (XLOG_BLCKSZ bytes) from
     176              :      * the first WAL segment in the archive so we can extract the WAL segment
     177              :      * size from the long page header.
     178              :      */
     179         9590 :     while (entry == NULL || entry->buf->len < XLOG_BLCKSZ)
     180              :     {
     181         9538 :         if (read_archive_file(privateInfo, XLOG_BLCKSZ) == 0)
     182            0 :             pg_fatal("could not find WAL in archive \"%s\"",
     183              :                      privateInfo->archive_name);
     184              : 
     185         9538 :         entry = privateInfo->cur_file;
     186              :     }
     187              : 
     188              :     /* Extract the WAL segment size from the long page header */
     189           52 :     longhdr = (XLogLongPageHeader) entry->buf->data;
     190              : 
     191           52 :     if (!IsValidWalSegSize(longhdr->xlp_seg_size))
     192              :     {
     193            0 :         pg_log_error(ngettext("invalid WAL segment size in WAL file from archive \"%s\" (%d byte)",
     194              :                               "invalid WAL segment size in WAL file from archive \"%s\" (%d bytes)",
     195              :                               longhdr->xlp_seg_size),
     196              :                      privateInfo->archive_name, longhdr->xlp_seg_size);
     197            0 :         pg_log_error_detail("The WAL segment size must be a power of two between 1 MB and 1 GB.");
     198            0 :         exit(1);
     199              :     }
     200              : 
     201           52 :     privateInfo->segsize = longhdr->xlp_seg_size;
     202              : 
     203              :     /*
     204              :      * With the WAL segment size available, we can now initialize the
     205              :      * dependent start and end segment numbers.
     206              :      */
     207              :     Assert(!XLogRecPtrIsInvalid(privateInfo->startptr));
     208           52 :     XLByteToSeg(privateInfo->startptr, privateInfo->start_segno,
     209              :                 privateInfo->segsize);
     210              : 
     211           52 :     if (!XLogRecPtrIsInvalid(privateInfo->endptr))
     212           48 :         XLByteToSeg(privateInfo->endptr, privateInfo->end_segno,
     213              :                     privateInfo->segsize);
     214              : 
     215              :     /*
     216              :      * This WAL record was fetched before the filtering parameters
     217              :      * (start_segno and end_segno) were fully initialized. Perform the
     218              :      * relevance check against the user-provided range now; if the WAL falls
     219              :      * outside this range, remove it from the hash table. Subsequent WAL will
     220              :      * be filtered automatically by the archive streamer using the updated
     221              :      * start_segno and end_segno values.
     222              :      */
     223           52 :     XLogFromFileName(entry->fname, &timeline, &segno, privateInfo->segsize);
     224           52 :     if (privateInfo->timeline != timeline ||
     225           52 :         privateInfo->start_segno > segno ||
     226           52 :         privateInfo->end_segno < segno)
     227           13 :         free_archive_wal_entry(entry->fname, privateInfo);
     228           52 : }
     229              : 
     230              : /*
     231              :  * Release the archive streamer chain and close the archive file.
     232              :  */
     233              : void
     234           48 : free_archive_reader(XLogDumpPrivate *privateInfo)
     235              : {
     236              :     /*
     237              :      * NB: Normally, astreamer_finalize() is called before astreamer_free() to
     238              :      * flush any remaining buffered data or to ensure the end of the tar
     239              :      * archive is reached.  However, when decoding WAL, once we hit the end
     240              :      * LSN, any remaining buffered data or unread portion of the archive can
     241              :      * be safely ignored.
     242              :      */
     243           48 :     astreamer_free(privateInfo->archive_streamer);
     244              : 
     245              :     /* Free any remaining hash table entries and their buffers. */
     246           48 :     if (privateInfo->archive_wal_htab != NULL)
     247              :     {
     248              :         ArchivedWAL_iterator iter;
     249              :         ArchivedWALFile *entry;
     250              : 
     251           48 :         ArchivedWAL_start_iterate(privateInfo->archive_wal_htab, &iter);
     252          147 :         while ((entry = ArchivedWAL_iterate(privateInfo->archive_wal_htab,
     253           99 :                                             &iter)) != NULL)
     254              :         {
     255           51 :             if (entry->buf != NULL)
     256           51 :                 destroyStringInfo(entry->buf);
     257              :         }
     258           48 :         ArchivedWAL_destroy(privateInfo->archive_wal_htab);
     259           48 :         privateInfo->archive_wal_htab = NULL;
     260              :     }
     261              : 
     262              :     /* Free the reusable read buffer. */
     263           48 :     if (privateInfo->archive_read_buf != NULL)
     264              :     {
     265           48 :         pg_free(privateInfo->archive_read_buf);
     266           48 :         privateInfo->archive_read_buf = NULL;
     267              :     }
     268              : 
     269              :     /* Close the file. */
     270           48 :     if (close(privateInfo->archive_fd) != 0)
     271            0 :         pg_log_error("could not close file \"%s\": %m",
     272              :                      privateInfo->archive_name);
     273           48 : }
     274              : 
     275              : /*
     276              :  * Copies the requested WAL data from the hash entry's buffer into readBuff.
     277              :  * If the buffer does not yet contain the needed bytes, fetches more data from
     278              :  * the tar archive via the archive streamer.
     279              :  */
     280              : int
     281        27342 : read_archive_wal_page(XLogDumpPrivate *privateInfo, XLogRecPtr targetPagePtr,
     282              :                       Size count, char *readBuff)
     283              : {
     284        27342 :     char       *p = readBuff;
     285        27342 :     Size        nbytes = count;
     286        27342 :     XLogRecPtr  recptr = targetPagePtr;
     287        27342 :     int         segsize = privateInfo->segsize;
     288              :     XLogSegNo   segno;
     289              :     char        fname[MAXFNAMELEN];
     290              :     ArchivedWALFile *entry;
     291              : 
     292              :     /* Identify the segment and locate its entry in the archive hash */
     293        27342 :     XLByteToSeg(targetPagePtr, segno, segsize);
     294        27342 :     XLogFileName(fname, privateInfo->timeline, segno, segsize);
     295        27342 :     entry = get_archive_wal_entry(fname, privateInfo);
     296              : 
     297        57887 :     while (nbytes > 0)
     298              :     {
     299        30545 :         char       *buf = entry->buf->data;
     300        30545 :         int         bufLen = entry->buf->len;
     301              :         XLogRecPtr  endPtr;
     302              :         XLogRecPtr  startPtr;
     303              : 
     304              :         /*
     305              :          * Calculate the LSN range currently residing in the buffer.
     306              :          *
     307              :          * read_len tracks total bytes received for this segment (including
     308              :          * already-discarded data), so endPtr is the LSN just past the last
     309              :          * buffered byte, and startPtr is the LSN of the first buffered byte.
     310              :          */
     311        30545 :         XLogSegNoOffsetToRecPtr(segno, entry->read_len, segsize, endPtr);
     312        30545 :         startPtr = endPtr - bufLen;
     313              : 
     314              :         /*
     315              :          * Copy the requested WAL record if it exists in the buffer.
     316              :          */
     317        30545 :         if (bufLen > 0 && startPtr <= recptr && recptr < endPtr)
     318        28316 :         {
     319              :             int         copyBytes;
     320        28316 :             int         offset = recptr - startPtr;
     321              : 
     322              :             /*
     323              :              * Given startPtr <= recptr < endPtr and a total buffer size
     324              :              * 'bufLen', the offset (recptr - startPtr) will always be less
     325              :              * than 'bufLen'.
     326              :              */
     327              :             Assert(offset < bufLen);
     328              : 
     329        28316 :             copyBytes = Min(nbytes, bufLen - offset);
     330        28316 :             memcpy(p, buf + offset, copyBytes);
     331              : 
     332              :             /* Update state for read */
     333        28316 :             recptr += copyBytes;
     334        28316 :             nbytes -= copyBytes;
     335        28316 :             p += copyBytes;
     336              :         }
     337              :         else
     338              :         {
     339              :             /*
     340              :              * Before starting the actual decoding loop, pg_waldump tries to
     341              :              * locate the first valid record from the user-specified start
     342              :              * position, which might not be the start of a WAL record and
     343              :              * could fall in the middle of a record that spans multiple pages.
     344              :              * Consequently, the valid start position the decoder is looking
     345              :              * for could be far away from that initial position.
     346              :              *
     347              :              * This may involve reading across multiple pages, and this
     348              :              * pre-reading fetches data in multiple rounds from the archive
     349              :              * streamer; normally, we would throw away existing buffer
     350              :              * contents to fetch the next set of data, but that existing data
     351              :              * might be needed once the main loop starts. Because previously
     352              :              * read data cannot be re-read by the archive streamer, we delay
     353              :              * resetting the buffer until the main decoding loop is entered.
     354              :              *
     355              :              * Once pg_waldump has entered the main loop, it may re-read the
     356              :              * currently active page, but never an older one; therefore, any
     357              :              * fully consumed WAL data preceding the current page can then be
     358              :              * safely discarded.
     359              :              */
     360         2229 :             if (privateInfo->decoding_started)
     361              :             {
     362          972 :                 resetStringInfo(entry->buf);
     363              : 
     364              :                 /*
     365              :                  * Push back the partial page data for the current page to the
     366              :                  * buffer, ensuring a full page remains available for
     367              :                  * re-reading if requested.
     368              :                  */
     369          972 :                 if (p > readBuff)
     370              :                 {
     371              :                     Assert((count - nbytes) > 0);
     372          972 :                     appendBinaryStringInfo(entry->buf, readBuff, count - nbytes);
     373              :                 }
     374              :             }
     375              : 
     376              :             /*
     377              :              * Now, fetch more data.  Raise an error if the archive streamer
     378              :              * has moved past our segment (meaning the WAL file in the archive
     379              :              * is shorter than expected) or if reading the archive reached
     380              :              * EOF.
     381              :              */
     382         2229 :             if (privateInfo->cur_file != entry)
     383            0 :                 pg_fatal("WAL segment \"%s\" in archive \"%s\" is too short: read %lld of %lld bytes",
     384              :                          fname, privateInfo->archive_name,
     385              :                          (long long int) (count - nbytes),
     386              :                          (long long int) count);
     387         2229 :             if (read_archive_file(privateInfo, READ_CHUNK_SIZE) == 0)
     388            0 :                 pg_fatal("unexpected end of archive \"%s\" while reading \"%s\": read %lld of %lld bytes",
     389              :                          privateInfo->archive_name, fname,
     390              :                          (long long int) (count - nbytes),
     391              :                          (long long int) count);
     392              :         }
     393              :     }
     394              : 
     395              :     /*
     396              :      * Should have successfully read all the requested bytes or reported a
     397              :      * failure before this point.
     398              :      */
     399              :     Assert(nbytes == 0);
     400              : 
     401              :     /*
     402              :      * Return count unchanged; the caller expects this convention, matching
     403              :      * the routine that reads WAL pages from physical files.
     404              :      */
     405        27342 :     return count;
     406              : }
     407              : 
     408              : /*
     409              :  * Releases the buffer of a WAL entry that is no longer needed, preventing the
     410              :  * accumulation of irrelevant WAL data.  Also removes any associated temporary
     411              :  * file and clears privateInfo->cur_file if it points to this entry, so the
     412              :  * archive streamer skips subsequent data for it.
     413              :  */
     414              : void
     415           43 : free_archive_wal_entry(const char *fname, XLogDumpPrivate *privateInfo)
     416              : {
     417              :     ArchivedWALFile *entry;
     418              : 
     419           43 :     entry = ArchivedWAL_lookup(privateInfo->archive_wal_htab, fname);
     420              : 
     421           43 :     if (entry == NULL)
     422            0 :         return;
     423              : 
     424              :     /* Destroy the buffer */
     425           43 :     destroyStringInfo(entry->buf);
     426           43 :     entry->buf = NULL;
     427              : 
     428              :     /* Remove temporary file if any */
     429           43 :     if (entry->spilled)
     430              :     {
     431              :         char        fpath[MAXPGPATH];
     432              : 
     433            2 :         snprintf(fpath, MAXPGPATH, "%s/%s", TmpWalSegDir, fname);
     434              : 
     435            2 :         if (unlink(fpath) == 0)
     436            2 :             pg_log_debug("removed file \"%s\"", fpath);
     437              :     }
     438              : 
     439              :     /* Clear cur_file if it points to the entry being freed */
     440           43 :     if (privateInfo->cur_file == entry)
     441           13 :         privateInfo->cur_file = NULL;
     442              : 
     443           43 :     ArchivedWAL_delete_item(privateInfo->archive_wal_htab, entry);
     444              : }
     445              : 
     446              : /*
     447              :  * Returns the archived WAL entry from the hash table if it already exists.
     448              :  * Otherwise, reads more data from the archive until the requested entry is
     449              :  * found.  If the archive streamer is reading a WAL file from the archive that
     450              :  * is not currently needed, that data is spilled to a temporary file for later
     451              :  * retrieval.
     452              :  */
     453              : static ArchivedWALFile *
     454        27342 : get_archive_wal_entry(const char *fname, XLogDumpPrivate *privateInfo)
     455              : {
     456        27342 :     ArchivedWALFile *entry = NULL;
     457        27342 :     FILE       *write_fp = NULL;
     458              : 
     459              :     /*
     460              :      * Search the hash table first. If the entry is found, return it.
     461              :      * Otherwise, the requested WAL entry hasn't been read from the archive
     462              :      * yet; invoke the archive streamer to fetch it.
     463              :      */
     464              :     while (1)
     465              :     {
     466              :         /*
     467              :          * Search hash table.
     468              :          *
     469              :          * We perform the search inside the loop because a single iteration of
     470              :          * the archive reader may decompress and extract multiple files into
     471              :          * the hash table. One of these newly added files could be the one we
     472              :          * are seeking.
     473              :          */
     474        31199 :         entry = ArchivedWAL_lookup(privateInfo->archive_wal_htab, fname);
     475              : 
     476        31199 :         if (entry != NULL)
     477        27342 :             return entry;
     478              : 
     479              :         /*
     480              :          * Capture the current entry before calling read_archive_file(),
     481              :          * because cur_file may advance to a new segment during streaming. We
     482              :          * hold this reference so we can flush any remaining buffer data and
     483              :          * close the write handle once we detect that cur_file has moved on.
     484              :          */
     485         3857 :         entry = privateInfo->cur_file;
     486              : 
     487              :         /*
     488              :          * Fetch more data either when no current file is being tracked or
     489              :          * when its buffer has been fully flushed to the temporary file.
     490              :          */
     491         3857 :         if (entry == NULL || entry->buf->len == 0)
     492              :         {
     493         3840 :             if (read_archive_file(privateInfo, READ_CHUNK_SIZE) == 0)
     494            0 :                 break;          /* archive file ended */
     495              :         }
     496              : 
     497              :         /*
     498              :          * Archive streamer is reading a non-WAL file or an irrelevant WAL
     499              :          * file.
     500              :          */
     501         3857 :         if (entry == NULL)
     502         1664 :             continue;
     503              : 
     504              :         /*
     505              :          * The streamer is producing a WAL segment that isn't the one asked
     506              :          * for; it must be arriving out of order.  Spill its data to disk so
     507              :          * it can be read back when needed.
     508              :          */
     509              :         Assert(strcmp(fname, entry->fname) != 0);
     510              : 
     511              :         /* Create a temporary file if one does not already exist */
     512         2193 :         if (!entry->spilled)
     513              :         {
     514           17 :             write_fp = prepare_tmp_write(entry->fname, privateInfo);
     515           17 :             entry->spilled = true;
     516              :         }
     517              : 
     518              :         /* Flush data from the buffer to the file */
     519         2193 :         perform_tmp_write(entry->fname, entry->buf, write_fp);
     520         2193 :         resetStringInfo(entry->buf);
     521              : 
     522              :         /*
     523              :          * If cur_file changed since we captured entry above, the archive
     524              :          * streamer has finished this segment and moved on.  Close its spill
     525              :          * file handle so data is flushed to disk before the next segment
     526              :          * starts writing to a different handle.
     527              :          */
     528         2193 :         if (entry != privateInfo->cur_file && write_fp != NULL)
     529              :         {
     530           17 :             fclose(write_fp);
     531           17 :             write_fp = NULL;
     532              :         }
     533              :     }
     534              : 
     535              :     /* Requested WAL segment not found */
     536            0 :     pg_fatal("could not find WAL \"%s\" in archive \"%s\"",
     537              :              fname, privateInfo->archive_name);
     538              : }
     539              : 
     540              : /*
     541              :  * Reads a chunk from the archive file and passes it through the streamer
     542              :  * pipeline for decompression (if needed) and tar member extraction.
     543              :  */
     544              : static int
     545        15607 : read_archive_file(XLogDumpPrivate *privateInfo, Size count)
     546              : {
     547              :     int         rc;
     548              : 
     549              :     /* The read request must not exceed the allocated buffer size. */
     550              :     Assert(privateInfo->archive_read_buf_size >= count);
     551              : 
     552        15607 :     rc = read(privateInfo->archive_fd, privateInfo->archive_read_buf, count);
     553        15607 :     if (rc < 0)
     554            0 :         pg_fatal("could not read file \"%s\": %m",
     555              :                  privateInfo->archive_name);
     556              : 
     557              :     /*
     558              :      * Decompress (if required), and then parse the previously read contents
     559              :      * of the tar file.
     560              :      */
     561        15607 :     if (rc > 0)
     562        15607 :         astreamer_content(privateInfo->archive_streamer, NULL,
     563        15607 :                           privateInfo->archive_read_buf, rc,
     564              :                           ASTREAMER_UNKNOWN);
     565              : 
     566        15607 :     return rc;
     567              : }
     568              : 
     569              : /*
     570              :  * Set up a temporary directory to temporarily store WAL segments.
     571              :  */
     572              : static void
     573           15 : setup_tmpwal_dir(const char *waldir)
     574              : {
     575           15 :     const char *tmpdir = getenv("TMPDIR");
     576              :     char       *template;
     577              : 
     578              :     Assert(TmpWalSegDir == NULL);
     579              : 
     580              :     /*
     581              :      * Use the directory specified by the TMPDIR environment variable. If it's
     582              :      * not set, fall back to the provided WAL directory to store WAL files
     583              :      * temporarily.
     584              :      */
     585           15 :     template = psprintf("%s/waldump_tmp-XXXXXX",
     586              :                         tmpdir ? tmpdir : waldir);
     587           15 :     TmpWalSegDir = mkdtemp(template);
     588              : 
     589           15 :     if (TmpWalSegDir == NULL)
     590            0 :         pg_fatal("could not create directory \"%s\": %m", template);
     591              : 
     592           15 :     canonicalize_path(TmpWalSegDir);
     593              : 
     594           15 :     pg_log_debug("created directory \"%s\"", TmpWalSegDir);
     595           15 : }
     596              : 
     597              : /*
     598              :  * Remove temporary directory at exit, if any.
     599              :  */
     600              : static void
     601           15 : cleanup_tmpwal_dir_atexit(void)
     602              : {
     603              :     Assert(TmpWalSegDir != NULL);
     604              : 
     605           15 :     rmtree(TmpWalSegDir, true);
     606              : 
     607           15 :     TmpWalSegDir = NULL;
     608           15 : }
     609              : 
     610              : /*
     611              :  * Open a file in the temporary spill directory for writing an out-of-order
     612              :  * WAL segment, creating the directory and registering the cleanup callback
     613              :  * if not already done.  Returns the open file handle.
     614              :  */
     615              : static FILE *
     616           17 : prepare_tmp_write(const char *fname, XLogDumpPrivate *privateInfo)
     617              : {
     618              :     char        fpath[MAXPGPATH];
     619              :     FILE       *file;
     620              : 
     621              :     /*
     622              :      * Setup temporary directory to store WAL segments and set up an exit
     623              :      * callback to remove it upon completion if not already.
     624              :      */
     625           17 :     if (unlikely(TmpWalSegDir == NULL))
     626              :     {
     627           15 :         setup_tmpwal_dir(privateInfo->archive_dir);
     628           15 :         atexit(cleanup_tmpwal_dir_atexit);
     629              :     }
     630              : 
     631           17 :     snprintf(fpath, MAXPGPATH, "%s/%s", TmpWalSegDir, fname);
     632              : 
     633              :     /* Open the spill file for writing */
     634           17 :     file = fopen(fpath, PG_BINARY_W);
     635           17 :     if (file == NULL)
     636            0 :         pg_fatal("could not create file \"%s\": %m", fpath);
     637              : 
     638              : #ifndef WIN32
     639           17 :     if (chmod(fpath, pg_file_create_mode))
     640            0 :         pg_fatal("could not set permissions on file \"%s\": %m",
     641              :                  fpath);
     642              : #endif
     643              : 
     644           17 :     pg_log_debug("spilling to temporary file \"%s\"", fpath);
     645              : 
     646           17 :     return file;
     647              : }
     648              : 
     649              : /*
     650              :  * Write buffer data to the given file handle.
     651              :  */
     652              : static void
     653         2193 : perform_tmp_write(const char *fname, StringInfo buf, FILE *file)
     654              : {
     655              :     Assert(file);
     656              : 
     657         2193 :     errno = 0;
     658         2193 :     if (buf->len > 0 && fwrite(buf->data, buf->len, 1, file) != 1)
     659              :     {
     660              :         /*
     661              :          * If write didn't set errno, assume problem is no disk space
     662              :          */
     663            0 :         if (errno == 0)
     664            0 :             errno = ENOSPC;
     665            0 :         pg_fatal("could not write to file \"%s/%s\": %m", TmpWalSegDir, fname);
     666              :     }
     667         2193 : }
     668              : 
     669              : /*
     670              :  * Create an astreamer that can read WAL from tar file.
     671              :  */
     672              : static astreamer *
     673           52 : astreamer_waldump_new(XLogDumpPrivate *privateInfo)
     674              : {
     675              :     astreamer_waldump *streamer;
     676              : 
     677           52 :     streamer = palloc0_object(astreamer_waldump);
     678           52 :     *((const astreamer_ops **) &streamer->base.bbs_ops) =
     679              :         &astreamer_waldump_ops;
     680              : 
     681           52 :     streamer->privateInfo = privateInfo;
     682              : 
     683           52 :     return &streamer->base;
     684              : }
     685              : 
     686              : /*
     687              :  * Main entry point of the archive streamer for reading WAL data from a tar
     688              :  * file. If a member is identified as a valid WAL file, a hash entry is created
     689              :  * for it, and its contents are copied into that entry's buffer, making them
     690              :  * accessible to the decoding routine.
     691              :  */
     692              : static void
     693       889154 : astreamer_waldump_content(astreamer *streamer, astreamer_member *member,
     694              :                           const char *data, int len,
     695              :                           astreamer_archive_context context)
     696              : {
     697       889154 :     astreamer_waldump *mystreamer = (astreamer_waldump *) streamer;
     698       889154 :     XLogDumpPrivate *privateInfo = mystreamer->privateInfo;
     699              : 
     700              :     Assert(context != ASTREAMER_UNKNOWN);
     701              : 
     702       889154 :     switch (context)
     703              :     {
     704         8699 :         case ASTREAMER_MEMBER_HEADER:
     705              :             {
     706         8699 :                 char       *fname = NULL;
     707              :                 ArchivedWALFile *entry;
     708              :                 bool        found;
     709              : 
     710         8699 :                 pg_log_debug("reading \"%s\"", member->pathname);
     711              : 
     712         8699 :                 if (!member_is_wal_file(mystreamer, member, &fname))
     713         8601 :                     break;
     714              : 
     715              :                 /*
     716              :                  * Skip range filtering during initial startup, before the WAL
     717              :                  * segment size and segment number bounds are known.
     718              :                  */
     719          110 :                 if (!READ_ANY_WAL(privateInfo))
     720              :                 {
     721              :                     XLogSegNo   segno;
     722              :                     TimeLineID  timeline;
     723              : 
     724              :                     /*
     725              :                      * Skip the segment if the timeline does not match, if it
     726              :                      * falls outside the caller-specified range.
     727              :                      */
     728           58 :                     XLogFromFileName(fname, &timeline, &segno, privateInfo->segsize);
     729           58 :                     if (privateInfo->timeline != timeline ||
     730           58 :                         privateInfo->start_segno > segno ||
     731           58 :                         privateInfo->end_segno < segno)
     732              :                     {
     733           12 :                         pfree(fname);
     734           12 :                         break;
     735              :                     }
     736              :                 }
     737              : 
     738           98 :                 entry = ArchivedWAL_insert(privateInfo->archive_wal_htab,
     739              :                                            fname, &found);
     740              : 
     741              :                 /*
     742              :                  * Shouldn't happen, but if it does, simply ignore the
     743              :                  * duplicate WAL file.
     744              :                  */
     745           98 :                 if (found)
     746              :                 {
     747            0 :                     pg_log_warning("ignoring duplicate WAL \"%s\" found in archive \"%s\"",
     748              :                                    member->pathname, privateInfo->archive_name);
     749            0 :                     pfree(fname);
     750            0 :                     break;
     751              :                 }
     752              : 
     753           98 :                 entry->buf = makeStringInfo();
     754           98 :                 entry->spilled = false;
     755           98 :                 entry->read_len = 0;
     756           98 :                 privateInfo->cur_file = entry;
     757              :             }
     758           98 :             break;
     759              : 
     760       871780 :         case ASTREAMER_MEMBER_CONTENTS:
     761       871780 :             if (privateInfo->cur_file)
     762              :             {
     763       516869 :                 appendBinaryStringInfo(privateInfo->cur_file->buf, data, len);
     764       516869 :                 privateInfo->cur_file->read_len += len;
     765              :             }
     766       871780 :             break;
     767              : 
     768         8675 :         case ASTREAMER_MEMBER_TRAILER:
     769              : 
     770              :             /*
     771              :              * End of this tar member; mark cur_file NULL so subsequent
     772              :              * content callbacks (if any) know no WAL file is currently
     773              :              * active.
     774              :              */
     775         8675 :             privateInfo->cur_file = NULL;
     776         8675 :             break;
     777              : 
     778            0 :         case ASTREAMER_ARCHIVE_TRAILER:
     779            0 :             break;
     780              : 
     781            0 :         default:
     782              :             /* Shouldn't happen. */
     783            0 :             pg_fatal("unexpected state while parsing tar file");
     784              :     }
     785       889154 : }
     786              : 
     787              : /*
     788              :  * End-of-stream processing for an astreamer_waldump stream.  This is a
     789              :  * terminal streamer so it must have no successor.
     790              :  */
     791              : static void
     792            0 : astreamer_waldump_finalize(astreamer *streamer)
     793              : {
     794              :     Assert(streamer->bbs_next == NULL);
     795            0 : }
     796              : 
     797              : /*
     798              :  * Free memory associated with an astreamer_waldump stream.
     799              :  */
     800              : static void
     801           48 : astreamer_waldump_free(astreamer *streamer)
     802              : {
     803              :     Assert(streamer->bbs_next == NULL);
     804           48 :     pfree(streamer);
     805           48 : }
     806              : 
     807              : /*
     808              :  * Returns true if the archive member name matches the WAL naming format. If
     809              :  * successful, it also outputs the WAL segment name.
     810              :  */
     811              : static bool
     812         8699 : member_is_wal_file(astreamer_waldump *mystreamer, astreamer_member *member,
     813              :                    char **fname)
     814              : {
     815              :     int         pathlen;
     816              :     char        pathname[MAXPGPATH];
     817              :     char       *filename;
     818              : 
     819              :     /* We are only interested in normal files */
     820         8699 :     if (member->is_directory || member->is_link)
     821          281 :         return false;
     822              : 
     823         8418 :     if (strlen(member->pathname) < XLOG_FNAME_LEN)
     824         8225 :         return false;
     825              : 
     826              :     /*
     827              :      * For a correct comparison, we must remove any '.' or '..' components
     828              :      * from the member pathname. Similar to member_verify_header(), we prepend
     829              :      * './' to the path so that canonicalize_path() can properly resolve and
     830              :      * strip these references from the tar member name.
     831              :      */
     832          193 :     snprintf(pathname, MAXPGPATH, "./%s", member->pathname);
     833          193 :     canonicalize_path(pathname);
     834          193 :     pathlen = strlen(pathname);
     835              : 
     836              :     /* Skip files in subdirectories other than pg_wal/ */
     837          193 :     if (pathlen > XLOG_FNAME_LEN &&
     838           91 :         strncmp(pathname, XLOGDIR, strlen(XLOGDIR)) != 0)
     839           83 :         return false;
     840              : 
     841              :     /* WAL file may appear with a full path (e.g., pg_wal/<name>) */
     842          110 :     filename = pathname + (pathlen - XLOG_FNAME_LEN);
     843          110 :     if (!IsXLogFileName(filename))
     844            0 :         return false;
     845              : 
     846          110 :     *fname = pnstrdup(filename, XLOG_FNAME_LEN);
     847              : 
     848          110 :     return true;
     849              : }
     850              : 
     851              : /*
     852              :  * Helper function for WAL file hash table.
     853              :  */
     854              : static uint32
     855        31383 : hash_string_pointer(const char *s)
     856              : {
     857        31383 :     unsigned char *ss = (unsigned char *) s;
     858              : 
     859        31383 :     return hash_bytes(ss, strlen(s));
     860              : }
        

Generated by: LCOV version 2.0-1