LCOV - code coverage report
Current view: top level - src/bin/pg_waldump - archive_waldump.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 73.0 % 233 170
Test Date: 2026-04-18 20:16:25 Functions: 73.3 % 15 11
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * archive_waldump.c
       4              :  *      A generic facility for reading WAL data from tar archives via archive
       5              :  *      streamer.
       6              :  *
       7              :  * Portions Copyright (c) 2026, PostgreSQL Global Development Group
       8              :  *
       9              :  * IDENTIFICATION
      10              :  *      src/bin/pg_waldump/archive_waldump.c
      11              :  *
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : 
      15              : #include "postgres_fe.h"
      16              : 
      17              : #include <unistd.h>
      18              : 
      19              : #include "access/xlog_internal.h"
      20              : #include "common/file_perm.h"
      21              : #include "common/hashfn.h"
      22              : #include "common/logging.h"
      23              : #include "fe_utils/simple_list.h"
      24              : #include "pg_waldump.h"
      25              : 
      26              : /*
      27              :  * Bytes requested per read() of the archive; also the read buffer's size.
      28              :  */
      29              : #define READ_CHUNK_SIZE             (128 * 1024)
      30              : 
      31              : /* Temporary directory for spilled WAL segment files */
      32              : char       *TmpWalSegDir = NULL;
      33              : 
      34              : /*
      35              :  * True when the start segment number is zero, which indicates a request to
      36              :  * read any WAL file.
      37              :  */
      38              : #define READ_ANY_WAL(privateInfo)   ((privateInfo)->start_segno == 0)
      39              : 
      40              : /*
      41              :  * Hash entry representing a WAL segment retrieved from the archive.
      42              :  *
      43              :  * While WAL segments are typically read sequentially, individual entries
      44              :  * maintain their own buffers for the following reasons:
      45              :  *
      46              :  * 1. Boundary Handling: The archive streamer provides a continuous byte
      47              :  * stream. A single streaming chunk may contain the end of one WAL segment
      48              :  * and the start of the next. Separate buffers allow us to easily
      49              :  * partition and track these bytes by their respective segments.
      50              :  *
      51              :  * 2. Out-of-Order Support: Dedicated buffers simplify logic when segments
      52              :  * are archived or retrieved out of sequence.
      53              :  *
      54              :  * To minimize the memory footprint, entries and their associated buffers are
      55              :  * freed once consumed.  Since pg_waldump does not request the same bytes
      56              :  * twice (after it has located the point at which it should start decoding),
      57              :  * a segment can be discarded as soon as pg_waldump moves past it.  Moreover,
      58              :  * if we read a segment that won't be needed till later, we spill its data to
      59              :  * a temporary file instead of retaining it in memory.  This ensures that
      60              :  * pg_waldump can process even very large tar archives without needing more
      61              :  * than a few WAL segments' worth of memory space.
      62              :  */
      63              : typedef struct ArchivedWALFile
      64              : {
      65              :     uint32      status;         /* hash status, for the simplehash table */
      66              :     const char *fname;          /* hash key: WAL segment name */
      67              : 
      68              :     StringInfo  buf;            /* in-memory WAL bytes; emptied on spill */
      69              :     bool        spilled;        /* true if the WAL data was spilled to a
      70              :                                  * temporary file */
      71              : 
      72              :     int         read_len;       /* total bytes received from archive for this
      73              :                                  * segment (same as buf->len, unless we have
      74              :                                  * spilled the data to a temp file) */
      75              : } ArchivedWALFile;
      76              : 
      77              : static uint32 hash_string_pointer(const char *s);
      78              : #define SH_PREFIX               ArchivedWAL
      79              : #define SH_ELEMENT_TYPE         ArchivedWALFile
      80              : #define SH_KEY_TYPE             const char *
      81              : #define SH_KEY                  fname
      82              : #define SH_HASH_KEY(tb, key)    hash_string_pointer(key)
      83              : #define SH_EQUAL(tb, a, b)      (strcmp(a, b) == 0)
      84              : #define SH_SCOPE                static inline
      85              : #define SH_RAW_ALLOCATOR        pg_malloc0
      86              : #define SH_DECLARE
      87              : #define SH_DEFINE
      88              : #include "lib/simplehash.h"
      89              : 
      90              : typedef struct astreamer_waldump
      91              : {
      92              :     astreamer   base;
      93              :     XLogDumpPrivate *privateInfo;
      94              : } astreamer_waldump;
      95              : 
      96              : static ArchivedWALFile *get_archive_wal_entry(const char *fname,
      97              :                                               XLogDumpPrivate *privateInfo);
      98              : static bool read_archive_file(XLogDumpPrivate *privateInfo);
      99              : static void setup_tmpwal_dir(const char *waldir);
     100              : 
     101              : static FILE *prepare_tmp_write(const char *fname, XLogDumpPrivate *privateInfo);
     102              : static void perform_tmp_write(const char *fname, StringInfo buf, FILE *file);
     103              : 
     104              : static astreamer *astreamer_waldump_new(XLogDumpPrivate *privateInfo);
     105              : static void astreamer_waldump_content(astreamer *streamer,
     106              :                                       astreamer_member *member,
     107              :                                       const char *data, int len,
     108              :                                       astreamer_archive_context context);
     109              : static void astreamer_waldump_finalize(astreamer *streamer);
     110              : static void astreamer_waldump_free(astreamer *streamer);
     111              : 
     112              : static bool member_is_wal_file(astreamer_waldump *mystreamer,
     113              :                                astreamer_member *member,
     114              :                                char **fname);
     115              : 
     116              : static const astreamer_ops astreamer_waldump_ops = {
     117              :     .content = astreamer_waldump_content,
     118              :     .finalize = astreamer_waldump_finalize,
     119              :     .free = astreamer_waldump_free
     120              : };
     121              : 
     122              : /*
     123              :  * Initializes the tar archive reader: opens the archive, builds the streamer
     124              :  * chain and the hash table for WAL entries, reads ahead until a full WAL long
     125              :  * page header is available to determine the WAL segment size, and computes
     126              :  * start/end segment numbers, discarding loaded segments that are not needed.
     127              :  */
     128              : void
     129           52 : init_archive_reader(XLogDumpPrivate *privateInfo,
     130              :                     pg_compress_algorithm compression)
     131              : {
     132              :     int         fd;
     133              :     astreamer  *streamer;
     134           52 :     ArchivedWALFile *entry = NULL;
     135              :     XLogLongPageHeader longhdr;
     136              :     ArchivedWAL_iterator iter;
     137              : 
     138              :     /* Open tar archive and store its file descriptor */
     139           52 :     fd = open_file_in_directory(privateInfo->archive_dir,
     140           52 :                                 privateInfo->archive_name);
     141              : 
     142           52 :     if (fd < 0)
     143            0 :         pg_fatal("could not open file \"%s\"", privateInfo->archive_name);
     144              : 
     145           52 :     privateInfo->archive_fd = fd;
     146           52 :     privateInfo->archive_fd_eof = false;
     147              : 
     148           52 :     streamer = astreamer_waldump_new(privateInfo);
     149              : 
     150              :     /* We must first parse the tar archive. */
     151           52 :     streamer = astreamer_tar_parser_new(streamer);
     152              : 
     153              :     /* If the archive is compressed, decompress before parsing. */
     154           52 :     if (compression == PG_COMPRESSION_GZIP)
     155           17 :         streamer = astreamer_gzip_decompressor_new(streamer);
     156           35 :     else if (compression == PG_COMPRESSION_LZ4)
     157            4 :         streamer = astreamer_lz4_decompressor_new(streamer);
     158           31 :     else if (compression == PG_COMPRESSION_ZSTD)
     159            0 :         streamer = astreamer_zstd_decompressor_new(streamer);
     160              : 
     161           52 :     privateInfo->archive_streamer = streamer;
     162              : 
     163              :     /*
     164              :      * Allocate the reusable buffer that receives raw bytes read from the
     165              :      * archive file.
     166              :      */
     167           52 :     privateInfo->archive_read_buf = pg_malloc(READ_CHUNK_SIZE);
     168           52 :     privateInfo->archive_read_buf_size = READ_CHUNK_SIZE;
     169              : 
     170              :     /*
     171              :      * Create the hash table that stores WAL entries read from the archive;
     172              :      * the initial size is arbitrary.
     173              :      */
     174           52 :     privateInfo->archive_wal_htab = ArchivedWAL_create(8, NULL);
     175              : 
     176              :     /*
     177              :      * Read until we have at least one WAL segment with enough data to extract
     178              :      * the WAL segment size from the long page header.
     179              :      *
     180              :      * We must not rely on cur_file here, because it can become NULL if a
     181              :      * member trailer is processed during a read_archive_file() call. Instead,
     182              :      * scan the hash table after each read to find any entry with sufficient
     183              :      * data.
     184              :      */
     185          692 :     while (entry == NULL)
     186              :     {
     187          640 :         if (!read_archive_file(privateInfo))
     188            0 :             pg_fatal("could not find WAL in archive \"%s\"",
     189              :                      privateInfo->archive_name);
     190              : 
     191          640 :         ArchivedWAL_start_iterate(privateInfo->archive_wal_htab, &iter);
     192          640 :         while ((entry = ArchivedWAL_iterate(privateInfo->archive_wal_htab,
     193          640 :                                             &iter)) != NULL)
     194              :         {
     195           52 :             if (entry->read_len >= sizeof(XLogLongPageHeaderData))
     196           52 :                 break;
     197              :         }
     198              :     }
     199              : 
     200              :     /* Extract the WAL segment size from the long page header */
     201           52 :     longhdr = (XLogLongPageHeader) entry->buf->data;
     202              : 
     203           52 :     if (!IsValidWalSegSize(longhdr->xlp_seg_size))
     204              :     {
     205            0 :         pg_log_error(ngettext("invalid WAL segment size in WAL file from archive \"%s\" (%d byte)",
     206              :                               "invalid WAL segment size in WAL file from archive \"%s\" (%d bytes)",
     207              :                               longhdr->xlp_seg_size),
     208              :                      privateInfo->archive_name, longhdr->xlp_seg_size);
     209            0 :         pg_log_error_detail("The WAL segment size must be a power of two between 1 MB and 1 GB.");
     210            0 :         exit(1);
     211              :     }
     212              : 
     213           52 :     privateInfo->segsize = longhdr->xlp_seg_size;
     214              : 
     215              :     /*
     216              :      * With the WAL segment size available, we can now initialize the
     217              :      * dependent start and end segment numbers.
     218              :      */
     219              :     Assert(XLogRecPtrIsValid(privateInfo->startptr));
     220           52 :     XLByteToSeg(privateInfo->startptr, privateInfo->start_segno,
     221              :                 privateInfo->segsize);
     222              : 
     223           52 :     if (XLogRecPtrIsValid(privateInfo->endptr))
     224           48 :         XLByteToSeg(privateInfo->endptr, privateInfo->end_segno,
     225              :                     privateInfo->segsize);
     226              : 
     227              :     /*
     228              :      * Now that we have initialized the filtering parameters (start_segno and
     229              :      * end_segno), we can discard any already-loaded WAL hash table entries
     230              :      * for segments we don't actually need.  Subsequent WAL will be filtered
     231              :      * automatically by the archive streamer using the updated start_segno and
     232              :      * end_segno values.
     233              :      */
     234           52 :     ArchivedWAL_start_iterate(privateInfo->archive_wal_htab, &iter);
     235          104 :     while ((entry = ArchivedWAL_iterate(privateInfo->archive_wal_htab,
     236          104 :                                         &iter)) != NULL)
     237              :     {
     238              :         XLogSegNo   segno;
     239              :         TimeLineID  timeline;
     240              : 
     241           52 :         XLogFromFileName(entry->fname, &timeline, &segno, privateInfo->segsize);
     242           52 :         if (privateInfo->timeline != timeline ||
     243           52 :             privateInfo->start_segno > segno ||
     244           52 :             privateInfo->end_segno < segno)
     245           13 :             free_archive_wal_entry(entry->fname, privateInfo);
     246              :     }
     247           52 : }
     248              : 
     249              : /*
     250              :  * Free the streamer chain, WAL hash table and read buffer; close the archive.
     251              :  */
     252              : void
     253           48 : free_archive_reader(XLogDumpPrivate *privateInfo)
     254              : {
     255              :     /*
     256              :      * NB: Normally, astreamer_finalize() is called before astreamer_free() to
     257              :      * flush any remaining buffered data or to ensure the end of the tar
     258              :      * archive is reached.  read_archive_file() may have done so.  However,
     259              :      * when decoding WAL we can stop once we hit the end LSN, so we may never
     260              :      * have read all of the input file.  In that case any remaining buffered
     261              :      * data or unread portion of the archive can be safely ignored.
     262              :      */
     263           48 :     astreamer_free(privateInfo->archive_streamer);
     264              : 
     265              :     /* Free remaining entries' buffers and the table; temp files are not removed here. */
     266           48 :     if (privateInfo->archive_wal_htab != NULL)
     267              :     {
     268              :         ArchivedWAL_iterator iter;
     269              :         ArchivedWALFile *entry;
     270              : 
     271           48 :         ArchivedWAL_start_iterate(privateInfo->archive_wal_htab, &iter);
     272          146 :         while ((entry = ArchivedWAL_iterate(privateInfo->archive_wal_htab,
     273           98 :                                             &iter)) != NULL)
     274              :         {
     275           50 :             if (entry->buf != NULL)
     276           50 :                 destroyStringInfo(entry->buf);
     277              :         }
     278           48 :         ArchivedWAL_destroy(privateInfo->archive_wal_htab);
     279           48 :         privateInfo->archive_wal_htab = NULL;
     280              :     }
     281              : 
     282              :     /* Free the reusable read buffer. */
     283           48 :     if (privateInfo->archive_read_buf != NULL)
     284              :     {
     285           48 :         pg_free(privateInfo->archive_read_buf);
     286           48 :         privateInfo->archive_read_buf = NULL;
     287              :     }
     288              : 
     289              :     /* Close the archive file; a failure is logged but is not fatal. */
     290           48 :     if (close(privateInfo->archive_fd) != 0)
     291            0 :         pg_log_error("could not close file \"%s\": %m",
     292              :                      privateInfo->archive_name);
     293           48 : }
     294              : 
     295              : /*
     296              :  * Copies 'count' bytes of WAL starting at targetPagePtr from the segment's hash
     297              :  * entry buffer into readBuff, reading more of the tar archive as needed.  Exits
     298              :  * via pg_fatal() if the segment is too short or the archive ends prematurely.
     299              :  */
     300              : int
     301        28452 : read_archive_wal_page(XLogDumpPrivate *privateInfo, XLogRecPtr targetPagePtr,
     302              :                       size_t count, char *readBuff)
     303              : {
     304        28452 :     char       *p = readBuff;
     305        28452 :     size_t      nbytes = count;
     306        28452 :     XLogRecPtr  recptr = targetPagePtr;
     307        28452 :     int         segsize = privateInfo->segsize;
     308              :     XLogSegNo   segno;
     309              :     char        fname[MAXFNAMELEN];
     310              :     ArchivedWALFile *entry;
     311              : 
     312              :     /* Identify the segment and locate its entry in the archive hash */
     313        28452 :     XLByteToSeg(targetPagePtr, segno, segsize);
     314        28452 :     XLogFileName(fname, privateInfo->timeline, segno, segsize);
     315        28452 :     entry = get_archive_wal_entry(fname, privateInfo);
     316              :     Assert(!entry->spilled);
     317              : 
     318        60217 :     while (nbytes > 0)
     319              :     {
     320        31765 :         char       *buf = entry->buf->data;
     321        31765 :         int         bufLen = entry->buf->len;
     322              :         XLogRecPtr  endPtr;
     323              :         XLogRecPtr  startPtr;
     324              : 
     325              :         /*
     326              :          * Calculate the LSN range currently residing in the buffer.
     327              :          *
     328              :          * read_len tracks total bytes received for this segment, so endPtr is
     329              :          * the LSN just past the last buffered byte, and startPtr is the LSN
     330              :          * of the first buffered byte.
     331              :          */
     332        31765 :         XLogSegNoOffsetToRecPtr(segno, entry->read_len, segsize, endPtr);
     333        31765 :         startPtr = endPtr - bufLen;
     334              : 
     335              :         /*
     336              :          * Copy as much of the requested range as is present in the buffer.
     337              :          */
     338        31765 :         if (bufLen > 0 && startPtr <= recptr && recptr < endPtr)
     339        29496 :         {
     340              :             int         copyBytes;
     341        29496 :             int         offset = recptr - startPtr;
     342              : 
     343              :             /*
     344              :              * Given startPtr <= recptr < endPtr and a total buffer size
     345              :              * 'bufLen', the offset (recptr - startPtr) will always be less
     346              :              * than 'bufLen'.
     347              :              */
     348              :             Assert(offset < bufLen);
     349              : 
     350        29496 :             copyBytes = Min(nbytes, bufLen - offset);
     351        29496 :             memcpy(p, buf + offset, copyBytes);
     352              : 
     353              :             /* Advance the read position by the number of bytes copied */
     354        29496 :             recptr += copyBytes;
     355        29496 :             nbytes -= copyBytes;
     356        29496 :             p += copyBytes;
     357              :         }
     358              :         else
     359              :         {
     360              :             /*
     361              :              * We evidently need to fetch more data.  Raise an error if the
     362              :              * archive streamer has moved past our segment (meaning the WAL
     363              :              * file in the archive is shorter than expected) or if reading the
     364              :              * archive reached EOF.
     365              :              */
     366         2269 :             if (privateInfo->cur_file != entry)
     367            0 :                 pg_fatal("WAL segment \"%s\" in archive \"%s\" is too short: read %zu of %zu bytes",
     368              :                          fname, privateInfo->archive_name,
     369              :                          (count - nbytes), count);
     370         2269 :             if (!read_archive_file(privateInfo))
     371            0 :                 pg_fatal("unexpected end of archive \"%s\" while reading \"%s\": read %zu of %zu bytes",
     372              :                          privateInfo->archive_name, fname,
     373              :                          (count - nbytes), count);
     374              : 
     375              :             /*
     376              :              * Loading more data may have moved hash table entries, so we must
     377              :              * re-look-up the one we are reading from.
     378              :              */
     379         2269 :             entry = ArchivedWAL_lookup(privateInfo->archive_wal_htab, fname);
     380              :             /* ... it had better still be there */
     381              :             Assert(entry != NULL);
     382              :         }
     383              :     }
     384              : 
     385              :     /*
     386              :      * Should have successfully read all the requested bytes or reported a
     387              :      * failure before this point.
     388              :      */
     389              :     Assert(nbytes == 0);
     390              : 
     391              :     /*
     392              :      * Return count unchanged; the caller expects this convention, matching
     393              :      * the routine that reads WAL pages from physical files.
     394              :      */
     395        28452 :     return count;
     396              : }
     397              : 
     398              : /*
     399              :  * Discards a WAL entry that is no longer needed: frees its buffer, removes any
     400              :  * associated temporary file, and deletes it from the hash table.  Clears
     401              :  * privateInfo->cur_file if it points to this entry, so the archive streamer
     402              :  * skips subsequent data for it.  Does nothing if no entry exists for fname.
     403              :  */
     404              : void
     405           43 : free_archive_wal_entry(const char *fname, XLogDumpPrivate *privateInfo)
     406              : {
     407              :     ArchivedWALFile *entry;
     408              :     const char *oldfname;
     409              : 
     410           43 :     entry = ArchivedWAL_lookup(privateInfo->archive_wal_htab, fname);
     411              : 
     412           43 :     if (entry == NULL)
     413            0 :         return;
     414              : 
     415              :     /* Destroy the buffer */
     416           43 :     destroyStringInfo(entry->buf);
     417           43 :     entry->buf = NULL;
     418              : 
     419              :     /* Remove temporary file if any; an unlink failure is silently ignored */
     420           43 :     if (entry->spilled)
     421              :     {
     422              :         char        fpath[MAXPGPATH];
     423              : 
     424            0 :         snprintf(fpath, MAXPGPATH, "%s/%s", TmpWalSegDir, fname);
     425              : 
     426            0 :         if (unlink(fpath) == 0)
     427            0 :             pg_log_debug("removed file \"%s\"", fpath);
     428              :     }
     429              : 
     430              :     /* Clear cur_file if it points to the entry being freed */
     431           43 :     if (privateInfo->cur_file == entry)
     432           15 :         privateInfo->cur_file = NULL;
     433              : 
     434              :     /*
     435              :      * ArchivedWAL_delete_item may cause other hash table entries to move.
     436              :      * Therefore, if cur_file isn't NULL now, we have to be prepared to look
     437              :      * that entry up again after the deletion.  Fortunately, the entry's fname
     438              :      * string won't move.
     439              :      */
     440           43 :     oldfname = privateInfo->cur_file ? privateInfo->cur_file->fname : NULL;
     441              : 
     442           43 :     ArchivedWAL_delete_item(privateInfo->archive_wal_htab, entry);
     443              : 
     444           43 :     if (oldfname)
     445              :     {
     446           27 :         privateInfo->cur_file = ArchivedWAL_lookup(privateInfo->archive_wal_htab,
     447              :                                                    oldfname);
     448              :         /* ... it had better still be there */
     449              :         Assert(privateInfo->cur_file != NULL);
     450              :     }
     451              : }
     452              : 
     453              : /*
     454              :  * Returns the archived WAL entry from the hash table if it already exists.
     455              :  * Otherwise, reads more data from the archive until the requested entry is
     456              :  * found, exiting via pg_fatal() if the archive ends first.  Before each read,
     457              :  * any unspilled entry other than the one currently being read from the
     458              :  * archive is spilled to a temporary file for later retrieval.
     459              :  *
     460              :  * Note that the returned entry might not have been completely read from
     461              :  * the archive yet.
     462              :  */
     463              : static ArchivedWALFile *
     464        28452 : get_archive_wal_entry(const char *fname, XLogDumpPrivate *privateInfo)
     465              : {
     466              :     while (1)
     467         1920 :     {
     468              :         ArchivedWALFile *entry;
     469              :         ArchivedWAL_iterator iter;
     470              : 
     471              :         /*
     472              :          * Search the hash table first.  If the entry is found, return it.
     473              :          * Otherwise, the requested WAL entry hasn't been read from the
     474              :          * archive yet; we must invoke the archive streamer to fetch it.
     475              :          */
     476        30372 :         entry = ArchivedWAL_lookup(privateInfo->archive_wal_htab, fname);
     477              : 
     478        30372 :         if (entry != NULL)
     479        28452 :             return entry;
     480              : 
     481              :         /*
     482              :          * Before loading more data, scan the hash table to see if we have
     483              :          * loaded any files we don't need yet.  If so, spill their data to
     484              :          * disk to conserve memory space.  But don't try to spill a
     485              :          * partially-read file; it's not worth the complication.
     486              :          */
     487         1920 :         ArchivedWAL_start_iterate(privateInfo->archive_wal_htab, &iter);
     488         2176 :         while ((entry = ArchivedWAL_iterate(privateInfo->archive_wal_htab,
     489         2176 :                                             &iter)) != NULL)
     490              :         {
     491              :             FILE       *write_fp;
     492              : 
     493              :             /* OK to spill? */
     494          256 :             if (entry->spilled)
     495            0 :                 continue;       /* already spilled */
     496          256 :             if (entry == privateInfo->cur_file)
     497          256 :                 continue;       /* still being read */
     498              : 
     499              :             /* Write out the completed WAL file contents to a temp file. */
     500            0 :             write_fp = prepare_tmp_write(entry->fname, privateInfo);
     501            0 :             perform_tmp_write(entry->fname, entry->buf, write_fp);
     502            0 :             if (fclose(write_fp) != 0)
     503            0 :                 pg_fatal("could not close file \"%s/%s\": %m",
     504              :                          TmpWalSegDir, entry->fname);
     505              : 
     506              :             /* resetStringInfo won't release storage, so delete/recreate. */
     507            0 :             destroyStringInfo(entry->buf);
     508            0 :             entry->buf = makeStringInfo();
     509            0 :             entry->spilled = true;
     510              :         }
     511              : 
     512              :         /*
     513              :          * Read more data.  If we reach EOF, the desired file is not present.
     514              :          */
     515         1920 :         if (!read_archive_file(privateInfo))
     516            0 :             pg_fatal("could not find WAL \"%s\" in archive \"%s\"",
     517              :                      fname, privateInfo->archive_name);
     518              :     }
     519              : }
     520              : 
     521              : /*
     522              :  * Reads a chunk from the archive file and passes it through the streamer
     523              :  * pipeline for decompression (if needed) and tar member extraction.
     524              :  *
     525              :  * Returns true if successful, false if EOF was already reached (no more data).
     526              :  *
     527              :  * Callers must be aware that a single call may trigger multiple callbacks
     528              :  * in astreamer_waldump_content, so privateInfo->cur_file can change value
     529              :  * (or become NULL) during a call.  In particular, cur_file is set to NULL
     530              :  * when the ASTREAMER_MEMBER_TRAILER callback fires at the end of a tar
     531              :  * member; it is then set to a new entry when the next WAL member's
     532              :  * ASTREAMER_MEMBER_HEADER callback fires, which may or may not happen
     533              :  * within the same call.
     534              :  */
     535              : static bool
     536         4829 : read_archive_file(XLogDumpPrivate *privateInfo)
     537              : {
     538              :     int         rc;
     539              : 
     540              :     /* Fail if we already reached EOF in a prior call. */
     541         4829 :     if (privateInfo->archive_fd_eof)
     542            0 :         return false;
     543              : 
     544              :     /* Try to read some more data. */
     545         4829 :     rc = read(privateInfo->archive_fd, privateInfo->archive_read_buf,
     546              :               privateInfo->archive_read_buf_size);
     547         4829 :     if (rc < 0)
     548            0 :         pg_fatal("could not read file \"%s\": %m",
     549              :                  privateInfo->archive_name);
     550              : 
     551              :     /*
     552              :      * Decompress (if required), and then parse the previously read contents
     553              :      * of the tar file.
     554              :      */
     555         4829 :     if (rc > 0)
     556         4829 :         astreamer_content(privateInfo->archive_streamer, NULL,
     557         4829 :                           privateInfo->archive_read_buf, rc,
     558              :                           ASTREAMER_UNKNOWN);
     559              :     else
     560              :     {
     561              :         /*
     562              :          * We reached EOF, but there is probably still data queued in the
     563              :          * astreamer pipeline's buffers.  Flush it out to ensure that we
     564              :          * process everything.
     565              :          */
     566            0 :         astreamer_finalize(privateInfo->archive_streamer);
     567              :         /* Set flag to ensure we don't finalize more than once. */
     568            0 :         privateInfo->archive_fd_eof = true;
     569              :     }
     570              : 
     571         4829 :     return true;
     572              : }
     573              : 
     574              : /*
     575              :  * Set up a temporary directory in which to store spilled WAL segments.
     576              :  */
     577              : static void
     578            0 : setup_tmpwal_dir(const char *waldir)
     579              : {
     580            0 :     const char *tmpdir = getenv("TMPDIR");
     581              :     char       *template;
     582              : 
     583              :     Assert(TmpWalSegDir == NULL);
     584              : 
     585              :     /*
     586              :      * Use the directory specified by the TMPDIR environment variable. If it's
     587              :      * not set, fall back to the provided WAL directory to store WAL files
     588              :      * temporarily.
     589              :      */
     590            0 :     template = psprintf("%s/waldump_tmp-XXXXXX",
     591              :                         tmpdir ? tmpdir : waldir);
     592            0 :     TmpWalSegDir = mkdtemp(template);
     593              : 
     594            0 :     if (TmpWalSegDir == NULL)
     595            0 :         pg_fatal("could not create directory \"%s\": %m", template);
     596              : 
     597            0 :     canonicalize_path(TmpWalSegDir);
     598              : 
     599            0 :     pg_log_debug("created directory \"%s\"", TmpWalSegDir);
     600            0 : }
     601              : 
     602              : /*
     603              :  * Open a file in the temporary spill directory for writing an out-of-order
     604              :  * WAL segment, creating the directory if not already done.
     605              :  * Returns the open file handle.
     606              :  */
     607              : static FILE *
     608            0 : prepare_tmp_write(const char *fname, XLogDumpPrivate *privateInfo)
     609              : {
     610              :     char        fpath[MAXPGPATH];
     611              :     FILE       *file;
     612              : 
     613              :     /* Set up temporary directory to store WAL segments, if we didn't already */
     614            0 :     if (unlikely(TmpWalSegDir == NULL))
     615            0 :         setup_tmpwal_dir(privateInfo->archive_dir);
     616              : 
     617            0 :     snprintf(fpath, MAXPGPATH, "%s/%s", TmpWalSegDir, fname);
     618              : 
     619              :     /* Open the spill file for writing */
     620            0 :     file = fopen(fpath, PG_BINARY_W);
     621            0 :     if (file == NULL)
     622            0 :         pg_fatal("could not create file \"%s\": %m", fpath);
     623              : 
     624              : #ifndef WIN32
     625            0 :     if (chmod(fpath, pg_file_create_mode))
     626            0 :         pg_fatal("could not set permissions on file \"%s\": %m",
     627              :                  fpath);
     628              : #endif
     629              : 
     630            0 :     pg_log_debug("spilling to temporary file \"%s\"", fpath);
     631              : 
     632            0 :     return file;
     633              : }
     634              : 
     635              : /*
     636              :  * Write buffer data to the given file handle.
     637              :  */
     638              : static void
     639            0 : perform_tmp_write(const char *fname, StringInfo buf, FILE *file)
     640              : {
     641              :     Assert(file);
     642              : 
     643            0 :     errno = 0;
     644            0 :     if (buf->len > 0 && fwrite(buf->data, buf->len, 1, file) != 1)
     645              :     {
     646              :         /*
     647              :          * If write didn't set errno, assume problem is no disk space
     648              :          */
     649            0 :         if (errno == 0)
     650            0 :             errno = ENOSPC;
     651            0 :         pg_fatal("could not write to file \"%s/%s\": %m", TmpWalSegDir, fname);
     652              :     }
     653            0 : }
     654              : 
     655              : /*
     656              :  * Create an astreamer that can read WAL from a tar file.
     657              :  */
     658              : static astreamer *
     659           52 : astreamer_waldump_new(XLogDumpPrivate *privateInfo)
     660              : {
     661              :     astreamer_waldump *streamer;
     662              : 
     663           52 :     streamer = palloc0_object(astreamer_waldump);
     664           52 :     *((const astreamer_ops **) &streamer->base.bbs_ops) =
     665              :         &astreamer_waldump_ops;
     666              : 
     667           52 :     streamer->privateInfo = privateInfo;
     668              : 
     669           52 :     return &streamer->base;
     670              : }
     671              : 
     672              : /*
     673              :  * Main entry point of the archive streamer for reading WAL data from a tar
     674              :  * file. If a member is identified as a valid WAL file, a hash entry is created
     675              :  * for it, and its contents are copied into that entry's buffer, making them
     676              :  * accessible to the decoding routine.
     677              :  */
     678              : static void
     679        31830 : astreamer_waldump_content(astreamer *streamer, astreamer_member *member,
     680              :                           const char *data, int len,
     681              :                           astreamer_archive_context context)
     682              : {
     683        31830 :     astreamer_waldump *mystreamer = (astreamer_waldump *) streamer;
     684        31830 :     XLogDumpPrivate *privateInfo = mystreamer->privateInfo;
     685              : 
     686              :     Assert(context != ASTREAMER_UNKNOWN);
     687              : 
     688        31830 :     switch (context)
     689              :     {
     690         8598 :         case ASTREAMER_MEMBER_HEADER:
     691              :             {
     692         8598 :                 char       *fname = NULL;
     693              :                 ArchivedWALFile *entry;
     694              :                 bool        found;
     695              : 
     696              :                 /* Shouldn't see MEMBER_HEADER in the middle of a file */
     697              :                 Assert(privateInfo->cur_file == NULL);
     698              : 
     699         8598 :                 pg_log_debug("reading \"%s\"", member->pathname);
     700              : 
     701         8598 :                 if (!member_is_wal_file(mystreamer, member, &fname))
     702         8501 :                     break;
     703              : 
     704              :                 /*
     705              :                  * Skip range filtering during initial startup, before the WAL
     706              :                  * segment size and segment number bounds are known.
     707              :                  */
     708          109 :                 if (!READ_ANY_WAL(privateInfo))
     709              :                 {
     710              :                     XLogSegNo   segno;
     711              :                     TimeLineID  timeline;
     712              : 
     713              :                     /*
     714              :                      * Skip the segment if the timeline does not match, or if
     715              :                      * it falls outside the caller-specified range.
     716              :                      */
     717           57 :                     XLogFromFileName(fname, &timeline, &segno, privateInfo->segsize);
     718           57 :                     if (privateInfo->timeline != timeline ||
     719           57 :                         privateInfo->start_segno > segno ||
     720           57 :                         privateInfo->end_segno < segno)
     721              :                     {
     722           12 :                         pfree(fname);
     723           12 :                         break;
     724              :                     }
     725              :                 }
     726              : 
     727              :                 /*
     728              :                  * Note: ArchivedWAL_insert may cause existing hash table
     729              :                  * entries to move.  While cur_file is known to be NULL right
     730              :                  * now, read_archive_wal_page may have a live hash entry
     731              :                  * pointer, which it needs to take care to update after
     732              :                  * read_archive_file completes.
     733              :                  */
     734           97 :                 entry = ArchivedWAL_insert(privateInfo->archive_wal_htab,
     735              :                                            fname, &found);
     736              : 
     737              :                 /*
     738              :                  * Shouldn't happen, but if it does, simply ignore the
     739              :                  * duplicate WAL file.
     740              :                  */
     741           97 :                 if (found)
     742              :                 {
     743            0 :                     pg_log_warning("ignoring duplicate WAL \"%s\" found in archive \"%s\"",
     744              :                                    member->pathname, privateInfo->archive_name);
     745            0 :                     pfree(fname);
     746            0 :                     break;
     747              :                 }
     748              : 
     749           97 :                 entry->buf = makeStringInfo();
     750           97 :                 entry->spilled = false;
     751           97 :                 entry->read_len = 0;
     752           97 :                 privateInfo->cur_file = entry;
     753              :             }
     754           97 :             break;
     755              : 
     756        14686 :         case ASTREAMER_MEMBER_CONTENTS:
     757        14686 :             if (privateInfo->cur_file)
     758              :             {
     759         4587 :                 appendBinaryStringInfo(privateInfo->cur_file->buf, data, len);
     760         4587 :                 privateInfo->cur_file->read_len += len;
     761              :             }
     762        14686 :             break;
     763              : 
     764         8546 :         case ASTREAMER_MEMBER_TRAILER:
     765              : 
     766              :             /*
     767              :              * End of this tar member; mark cur_file NULL so subsequent
     768              :              * content callbacks (if any) know no WAL file is currently
     769              :              * active.
     770              :              */
     771         8546 :             privateInfo->cur_file = NULL;
     772         8546 :             break;
     773              : 
     774            0 :         case ASTREAMER_ARCHIVE_TRAILER:
     775            0 :             break;
     776              : 
     777            0 :         default:
     778              :             /* Shouldn't happen. */
     779            0 :             pg_fatal("unexpected state while parsing tar file");
     780              :     }
     781        31830 : }
     782              : 
     783              : /*
     784              :  * End-of-stream processing for an astreamer_waldump stream.  This is a
     785              :  * terminal streamer so it must have no successor.
     786              :  */
     787              : static void
     788            0 : astreamer_waldump_finalize(astreamer *streamer)
     789              : {
     790              :     Assert(streamer->bbs_next == NULL);
     791            0 : }
     792              : 
     793              : /*
     794              :  * Free memory associated with an astreamer_waldump stream.
     795              :  */
     796              : static void
     797           48 : astreamer_waldump_free(astreamer *streamer)
     798              : {
     799              :     Assert(streamer->bbs_next == NULL);
     800           48 :     pfree(streamer);
     801           48 : }
     802              : 
     803              : /*
     804              :  * Returns true if the archive member name matches the WAL naming format. If
     805              :  * successful, it also outputs the WAL segment name.
     806              :  */
     807              : static bool
     808         8598 : member_is_wal_file(astreamer_waldump *mystreamer, astreamer_member *member,
     809              :                    char **fname)
     810              : {
     811              :     int         pathlen;
     812              :     char        pathname[MAXPGPATH];
     813              :     char       *filename;
     814              : 
     815              :     /* We are only interested in normal files */
     816         8598 :     if (!member->is_regular)
     817          239 :         return false;
     818              : 
     819         8359 :     if (strlen(member->pathname) < XLOG_FNAME_LEN)
     820         8225 :         return false;
     821              : 
     822              :     /*
     823              :      * For a correct comparison, we must remove any '.' or '..' components
     824              :      * from the member pathname. Similar to member_verify_header(), we prepend
     825              :      * './' to the path so that canonicalize_path() can properly resolve and
     826              :      * strip these references from the tar member name.
     827              :      */
     828          134 :     snprintf(pathname, MAXPGPATH, "./%s", member->pathname);
     829          134 :     canonicalize_path(pathname);
     830          134 :     pathlen = strlen(pathname);
     831              : 
     832              :     /* Skip files in subdirectories other than pg_wal/ */
     833          134 :     if (pathlen > XLOG_FNAME_LEN &&
     834           33 :         strncmp(pathname, XLOGDIR, strlen(XLOGDIR)) != 0)
     835           25 :         return false;
     836              : 
     837              :     /* WAL file may appear with a full path (e.g., pg_wal/<name>) */
     838          109 :     filename = pathname + (pathlen - XLOG_FNAME_LEN);
     839          109 :     if (!IsXLogFileName(filename))
     840            0 :         return false;
     841              : 
     842          109 :     *fname = pnstrdup(filename, XLOG_FNAME_LEN);
     843              : 
     844          109 :     return true;
     845              : }
     846              : 
     847              : /*
     848              :  * Helper function for WAL file hash table.
     849              :  */
     850              : static uint32
     851        32851 : hash_string_pointer(const char *s)
     852              : {
     853        32851 :     const unsigned char *ss = (const unsigned char *) s;
     854              : 
     855        32851 :     return hash_bytes(ss, strlen(s));
     856              : }
        

Generated by: LCOV version 2.0-1