LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - walmethods.c (source / functions) Hit Total Coverage
Test: PostgreSQL 14devel Lines: 197 397 49.6 %
Date: 2021-03-02 10:06:42 Functions: 18 24 75.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * walmethods.c - implementations of different ways to write received wal
       4             :  *
       5             :  * NOTE! The caller must ensure that only one method is instantiated in
       6             :  *       any given program, and that it's only instantiated once!
       7             :  *
       8             :  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *        src/bin/pg_basebackup/walmethods.c
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres_fe.h"
      16             : 
      17             : #include <sys/stat.h>
      18             : #include <time.h>
      19             : #include <unistd.h>
      20             : #ifdef HAVE_LIBZ
      21             : #include <zlib.h>
      22             : #endif
      23             : 
      24             : #include "common/file_perm.h"
      25             : #include "common/file_utils.h"
      26             : #include "pgtar.h"
      27             : #include "receivelog.h"
      28             : #include "streamutil.h"
      29             : 
      30             : /* Size of zlib buffer for .tar.gz */
      31             : #define ZLIB_OUT_SIZE 4096
      32             : 
      33             : /*-------------------------------------------------------------------------
      34             :  * WalDirectoryMethod - write wal to a directory looking like pg_wal
      35             :  *-------------------------------------------------------------------------
      36             :  */
      37             : 
      38             : /*
      39             :  * Global static data for this method
      40             :  */
      41             : typedef struct DirectoryMethodData
      42             : {
      43             :     char       *basedir;        /* directory prepended to every file name; private copy made at creation */
      44             :     int         compression;    /* > 0: files are written through gzip (level passed to gzsetparams) and get a ".gz" suffix */
      45             :     bool        sync;           /* true: fsync files and their directory when opening, closing and finishing */
      46             : } DirectoryMethodData;
      47             : static DirectoryMethodData *dir_data = NULL;   /* allocated by CreateWalDirectoryMethod(), released by FreeWalDirectoryMethod() */
      48             : 
      49             : /*
      50             :  * Local file handle
      51             :  */
      52             : typedef struct DirectoryMethodFile
      53             : {
      54             :     int         fd;             /* descriptor returned by open(); also the target of fsync() in dir_sync() */
      55             :     off_t       currpos;        /* running total of bytes accepted by dir_write(); avoids seeking to find the position */
      56             :     char       *pathname;       /* caller-supplied name, without base directory, ".gz" or temp suffix */
      57             :     char       *fullpath;       /* path that was actually opened: basedir/pathname[.gz][temp_suffix] */
      58             :     char       *temp_suffix;    /* copy of the temporary suffix, or NULL when none was given */
      59             : #ifdef HAVE_LIBZ
      60             :     gzFile      gzfp;           /* gzip stream layered over fd; only set when compression > 0 */
      61             : #endif
      62             : } DirectoryMethodFile;
      63             : 
      64             : static const char *
      65           0 : dir_getlasterror(void)
      66             : {
      67             :     /* Directory method always sets errno, so just use strerror */
      68           0 :     return strerror(errno);
      69             : }
      70             : 
      71             : static Walfile
      72         154 : dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
      73             : {
      74             :     static char tmppath[MAXPGPATH];
      75             :     int         fd;
      76             :     DirectoryMethodFile *f;
      77             : #ifdef HAVE_LIBZ
      78         154 :     gzFile      gzfp = NULL;
      79             : #endif
      80             : 
      81         308 :     snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
      82         154 :              dir_data->basedir, pathname,
      83         154 :              dir_data->compression > 0 ? ".gz" : "",
      84             :              temp_suffix ? temp_suffix : "");
      85             : 
      86             :     /*
      87             :      * Open a file for non-compressed as well as compressed files. Tracking
      88             :      * the file descriptor is important for dir_sync() method as gzflush()
      89             :      * does not do any system calls to fsync() to make changes permanent on
      90             :      * disk.
      91             :      */
      92         154 :     fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, pg_file_create_mode);
      93         154 :     if (fd < 0)
      94           0 :         return NULL;
      95             : 
      96             : #ifdef HAVE_LIBZ
      97         154 :     if (dir_data->compression > 0)
      98             :     {
      99           0 :         gzfp = gzdopen(fd, "wb");
     100           0 :         if (gzfp == NULL)
     101             :         {
     102           0 :             close(fd);
     103           0 :             return NULL;
     104             :         }
     105             : 
     106           0 :         if (gzsetparams(gzfp, dir_data->compression,
     107             :                         Z_DEFAULT_STRATEGY) != Z_OK)
     108             :         {
     109           0 :             gzclose(gzfp);
     110           0 :             return NULL;
     111             :         }
     112             :     }
     113             : #endif
     114             : 
     115             :     /* Do pre-padding on non-compressed files */
     116         154 :     if (pad_to_size && dir_data->compression == 0)
     117             :     {
     118             :         PGAlignedXLogBlock zerobuf;
     119             :         int         bytes;
     120             : 
     121         130 :         memset(zerobuf.data, 0, XLOG_BLCKSZ);
     122      262530 :         for (bytes = 0; bytes < pad_to_size; bytes += XLOG_BLCKSZ)
     123             :         {
     124      262400 :             errno = 0;
     125      262400 :             if (write(fd, zerobuf.data, XLOG_BLCKSZ) != XLOG_BLCKSZ)
     126             :             {
     127           0 :                 int         save_errno = errno;
     128             : 
     129           0 :                 close(fd);
     130             : 
     131             :                 /*
     132             :                  * If write didn't set errno, assume problem is no disk space.
     133             :                  */
     134           0 :                 errno = save_errno ? save_errno : ENOSPC;
     135           0 :                 return NULL;
     136             :             }
     137             :         }
     138             : 
     139         130 :         if (lseek(fd, 0, SEEK_SET) != 0)
     140             :         {
     141           0 :             int         save_errno = errno;
     142             : 
     143           0 :             close(fd);
     144           0 :             errno = save_errno;
     145           0 :             return NULL;
     146             :         }
     147             :     }
     148             : 
     149             :     /*
     150             :      * fsync WAL file and containing directory, to ensure the file is
     151             :      * persistently created and zeroed (if padded). That's particularly
     152             :      * important when using synchronous mode, where the file is modified and
     153             :      * fsynced in-place, without a directory fsync.
     154             :      */
     155         154 :     if (dir_data->sync)
     156             :     {
     157           4 :         if (fsync_fname(tmppath, false) != 0 ||
     158           2 :             fsync_parent_path(tmppath) != 0)
     159             :         {
     160             : #ifdef HAVE_LIBZ
     161           0 :             if (dir_data->compression > 0)
     162           0 :                 gzclose(gzfp);
     163             :             else
     164             : #endif
     165           0 :                 close(fd);
     166           0 :             return NULL;
     167             :         }
     168             :     }
     169             : 
     170         154 :     f = pg_malloc0(sizeof(DirectoryMethodFile));
     171             : #ifdef HAVE_LIBZ
     172         154 :     if (dir_data->compression > 0)
     173           0 :         f->gzfp = gzfp;
     174             : #endif
     175         154 :     f->fd = fd;
     176         154 :     f->currpos = 0;
     177         154 :     f->pathname = pg_strdup(pathname);
     178         154 :     f->fullpath = pg_strdup(tmppath);
     179         154 :     if (temp_suffix)
     180           2 :         f->temp_suffix = pg_strdup(temp_suffix);
     181             : 
     182         154 :     return f;
     183             : }
     184             : 
     185             : static ssize_t
     186        4450 : dir_write(Walfile f, const void *buf, size_t count)
     187             : {
     188             :     ssize_t     r;
     189        4450 :     DirectoryMethodFile *df = (DirectoryMethodFile *) f;
     190             : 
     191             :     Assert(f != NULL);
     192             : 
     193             : #ifdef HAVE_LIBZ
     194        4450 :     if (dir_data->compression > 0)
     195           0 :         r = (ssize_t) gzwrite(df->gzfp, buf, count);
     196             :     else
     197             : #endif
     198        4450 :         r = write(df->fd, buf, count);
     199        4450 :     if (r > 0)
     200        4450 :         df->currpos += r;
     201        4450 :     return r;
     202             : }
     203             : 
     204             : static off_t
     205        4450 : dir_get_current_pos(Walfile f)
     206             : {
     207             :     Assert(f != NULL);
     208             : 
     209             :     /* Use a cached value to prevent lots of reseeks */
     210        4450 :     return ((DirectoryMethodFile *) f)->currpos;
     211             : }
     212             : 
     213             : static int
     214         154 : dir_close(Walfile f, WalCloseMethod method)
     215             : {
     216             :     int         r;
     217         154 :     DirectoryMethodFile *df = (DirectoryMethodFile *) f;
     218             :     static char tmppath[MAXPGPATH];
     219             :     static char tmppath2[MAXPGPATH];
     220             : 
     221             :     Assert(f != NULL);
     222             : 
     223             : #ifdef HAVE_LIBZ
     224         154 :     if (dir_data->compression > 0)
     225           0 :         r = gzclose(df->gzfp);
     226             :     else
     227             : #endif
     228         154 :         r = close(df->fd);
     229             : 
     230         154 :     if (r == 0)
     231             :     {
     232             :         /* Build path to the current version of the file */
     233         154 :         if (method == CLOSE_NORMAL && df->temp_suffix)
     234             :         {
     235             :             /*
     236             :              * If we have a temp prefix, normal operation is to rename the
     237             :              * file.
     238             :              */
     239           0 :             snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
     240           0 :                      dir_data->basedir, df->pathname,
     241           0 :                      dir_data->compression > 0 ? ".gz" : "",
     242             :                      df->temp_suffix);
     243           0 :             snprintf(tmppath2, sizeof(tmppath2), "%s/%s%s",
     244           0 :                      dir_data->basedir, df->pathname,
     245           0 :                      dir_data->compression > 0 ? ".gz" : "");
     246           0 :             r = durable_rename(tmppath, tmppath2);
     247             :         }
     248         154 :         else if (method == CLOSE_UNLINK)
     249             :         {
     250             :             /* Unlink the file once it's closed */
     251           0 :             snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
     252           0 :                      dir_data->basedir, df->pathname,
     253           0 :                      dir_data->compression > 0 ? ".gz" : "",
     254           0 :                      df->temp_suffix ? df->temp_suffix : "");
     255           0 :             r = unlink(tmppath);
     256             :         }
     257             :         else
     258             :         {
     259             :             /*
     260             :              * Else either CLOSE_NORMAL and no temp suffix, or
     261             :              * CLOSE_NO_RENAME. In this case, fsync the file and containing
     262             :              * directory if sync mode is requested.
     263             :              */
     264         154 :             if (dir_data->sync)
     265             :             {
     266           2 :                 r = fsync_fname(df->fullpath, false);
     267           2 :                 if (r == 0)
     268           2 :                     r = fsync_parent_path(df->fullpath);
     269             :             }
     270             :         }
     271             :     }
     272             : 
     273         154 :     pg_free(df->pathname);
     274         154 :     pg_free(df->fullpath);
     275         154 :     if (df->temp_suffix)
     276           2 :         pg_free(df->temp_suffix);
     277         154 :     pg_free(df);
     278             : 
     279         154 :     return r;
     280             : }
     281             : 
     282             : static int
     283           0 : dir_sync(Walfile f)
     284             : {
     285             :     Assert(f != NULL);
     286             : 
     287           0 :     if (!dir_data->sync)
     288           0 :         return 0;
     289             : 
     290             : #ifdef HAVE_LIBZ
     291           0 :     if (dir_data->compression > 0)
     292             :     {
     293           0 :         if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK)
     294           0 :             return -1;
     295             :     }
     296             : #endif
     297             : 
     298           0 :     return fsync(((DirectoryMethodFile *) f)->fd);
     299             : }
     300             : 
     301             : static ssize_t
     302           0 : dir_get_file_size(const char *pathname)
     303             : {
     304             :     struct stat statbuf;
     305             :     static char tmppath[MAXPGPATH];
     306             : 
     307           0 :     snprintf(tmppath, sizeof(tmppath), "%s/%s",
     308           0 :              dir_data->basedir, pathname);
     309             : 
     310           0 :     if (stat(tmppath, &statbuf) != 0)
     311           0 :         return -1;
     312             : 
     313           0 :     return statbuf.st_size;
     314             : }
     315             : 
     316             : static bool
     317         130 : dir_existsfile(const char *pathname)
     318             : {
     319             :     static char tmppath[MAXPGPATH];
     320             :     int         fd;
     321             : 
     322         130 :     snprintf(tmppath, sizeof(tmppath), "%s/%s",
     323         130 :              dir_data->basedir, pathname);
     324             : 
     325         130 :     fd = open(tmppath, O_RDONLY | PG_BINARY, 0);
     326         130 :     if (fd < 0)
     327         130 :         return false;
     328           0 :     close(fd);
     329           0 :     return true;
     330             : }
     331             : 
     332             : static bool
     333         130 : dir_finish(void)
     334             : {
     335         130 :     if (dir_data->sync)
     336             :     {
     337             :         /*
     338             :          * Files are fsynced when they are closed, but we need to fsync the
     339             :          * directory entry here as well.
     340             :          */
     341           2 :         if (fsync_fname(dir_data->basedir, true) != 0)
     342           0 :             return false;
     343             :     }
     344         130 :     return true;
     345             : }
     346             : 
     347             : 
     348             : WalWriteMethod *
     349         132 : CreateWalDirectoryMethod(const char *basedir, int compression, bool sync)
     350             : {
     351             :     WalWriteMethod *method;
     352             : 
     353         132 :     method = pg_malloc0(sizeof(WalWriteMethod));
     354         132 :     method->open_for_write = dir_open_for_write;
     355         132 :     method->write = dir_write;
     356         132 :     method->get_current_pos = dir_get_current_pos;
     357         132 :     method->get_file_size = dir_get_file_size;
     358         132 :     method->close = dir_close;
     359         132 :     method->sync = dir_sync;
     360         132 :     method->existsfile = dir_existsfile;
     361         132 :     method->finish = dir_finish;
     362         132 :     method->getlasterror = dir_getlasterror;
     363             : 
     364         132 :     dir_data = pg_malloc0(sizeof(DirectoryMethodData));
     365         132 :     dir_data->compression = compression;
     366         132 :     dir_data->basedir = pg_strdup(basedir);
     367         132 :     dir_data->sync = sync;
     368             : 
     369         132 :     return method;
     370             : }
     371             : 
     372             : void
     373         130 : FreeWalDirectoryMethod(void)
     374             : {
     375         130 :     pg_free(dir_data->basedir);
     376         130 :     pg_free(dir_data);
     377         130 : }
     378             : 
     379             : 
     380             : /*-------------------------------------------------------------------------
     381             :  * WalTarMethod - write wal to a tar file containing pg_wal contents
     382             :  *-------------------------------------------------------------------------
     383             :  */
     384             : 
     385             : typedef struct TarMethodFile
     386             : {
     387             :     off_t       ofs_start;      /* Where does the *header* for this file start */
     388             :     off_t       currpos;        /* bytes of member data written so far; bumped by tar_write(), reset to 0 after pre-padding */
     389             :     char        header[TAR_BLOCK_SIZE];     /* tar header block built by tarCreateHeader() with size 0 */
     390             :     char       *pathname;       /* copy of the member name as passed in, without the temp suffix */
     391             :     size_t      pad_to_size;    /* requested padded size; left 0 when no padding was requested */
     392             : } TarMethodFile;
     393             : 
     394             : typedef struct TarMethodData
     395             : {
     396             :     char       *tarfilename;    /* path handed to open() when the archive is first written */
     397             :     int         fd;             /* archive descriptor; negative until tar_open_for_write() opens it */
     398             :     int         compression;    /* 0: plain write(); nonzero: data goes through zlib at this level */
     399             :     bool        sync;           /* NOTE(review): presumably requests fsync as in the directory method -- not used in the code visible here */
     400             :     TarMethodFile *currentfile; /* the single member currently open, or NULL */
     401             :     char        lasterror[1024];    /* custom error text; empty string means "use errno" */
     402             : #ifdef HAVE_LIBZ
     403             :     z_streamp   zp;             /* deflate stream, allocated on first open when compressing */
     404             :     void       *zlibOut;        /* deflate output buffer, used as ZLIB_OUT_SIZE bytes; allocation is not shown here */
     405             : #endif
     406             : } TarMethodData;
     407             : static TarMethodData *tar_data = NULL;     /* global state of the tar method */
     408             : 
                       /*
                        * tar_clear_error() empties the custom error text, so that
                        * tar_getlasterror() falls back to errno.  tar_set_error() stores
                        * msg, passed through _(), as the custom error text, truncated to
                        * the size of the lasterror buffer.
                        */
     409             : #define tar_clear_error() tar_data->lasterror[0] = '\0'
     410             : #define tar_set_error(msg) strlcpy(tar_data->lasterror, _(msg), sizeof(tar_data->lasterror))
     411             : 
     412             : static const char *
     413           0 : tar_getlasterror(void)
     414             : {
     415             :     /*
     416             :      * If a custom error is set, return that one. Otherwise, assume errno is
     417             :      * set and return that one.
     418             :      */
     419           0 :     if (tar_data->lasterror[0])
     420           0 :         return tar_data->lasterror;
     421           0 :     return strerror(errno);
     422             : }
     423             : 
     424             : #ifdef HAVE_LIBZ
     425             : static bool
     426           0 : tar_write_compressed_data(void *buf, size_t count, bool flush)
     427             : {
     428           0 :     tar_data->zp->next_in = buf;
     429           0 :     tar_data->zp->avail_in = count;
     430             : 
     431           0 :     while (tar_data->zp->avail_in || flush)
     432             :     {
     433             :         int         r;
     434             : 
     435           0 :         r = deflate(tar_data->zp, flush ? Z_FINISH : Z_NO_FLUSH);
     436           0 :         if (r == Z_STREAM_ERROR)
     437             :         {
     438           0 :             tar_set_error("could not compress data");
     439           0 :             return false;
     440             :         }
     441             : 
     442           0 :         if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
     443             :         {
     444           0 :             size_t      len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
     445             : 
     446           0 :             errno = 0;
     447           0 :             if (write(tar_data->fd, tar_data->zlibOut, len) != len)
     448             :             {
     449             :                 /*
     450             :                  * If write didn't set errno, assume problem is no disk space.
     451             :                  */
     452           0 :                 if (errno == 0)
     453           0 :                     errno = ENOSPC;
     454           0 :                 return false;
     455             :             }
     456             : 
     457           0 :             tar_data->zp->next_out = tar_data->zlibOut;
     458           0 :             tar_data->zp->avail_out = ZLIB_OUT_SIZE;
     459             :         }
     460             : 
     461           0 :         if (r == Z_STREAM_END)
     462           0 :             break;
     463             :     }
     464             : 
     465           0 :     if (flush)
     466             :     {
     467             :         /* Reset the stream for writing */
     468           0 :         if (deflateReset(tar_data->zp) != Z_OK)
     469             :         {
     470           0 :             tar_set_error("could not reset compression stream");
     471           0 :             return false;
     472             :         }
     473             :     }
     474             : 
     475           0 :     return true;
     476             : }
     477             : #endif
     478             : 
     479             : static ssize_t
     480       16718 : tar_write(Walfile f, const void *buf, size_t count)
     481             : {
     482             :     ssize_t     r;
     483             : 
     484             :     Assert(f != NULL);
     485       16718 :     tar_clear_error();
     486             : 
     487             :     /* Tarfile will always be positioned at the end */
     488       16718 :     if (!tar_data->compression)
     489             :     {
     490       16718 :         r = write(tar_data->fd, buf, count);
     491       16718 :         if (r > 0)
     492       16718 :             ((TarMethodFile *) f)->currpos += r;
     493       16718 :         return r;
     494             :     }
     495             : #ifdef HAVE_LIBZ
     496             :     else
     497             :     {
     498           0 :         if (!tar_write_compressed_data(unconstify(void *, buf), count, false))
     499           0 :             return -1;
     500           0 :         ((TarMethodFile *) f)->currpos += count;
     501           0 :         return count;
     502             :     }
     503             : #else
     504             :     else
     505             :         /* Can't happen - compression enabled with no libz */
     506             :         return -1;
     507             : #endif
     508             : }
     509             : 
     510             : static bool
     511           8 : tar_write_padding_data(TarMethodFile *f, size_t bytes)
     512             : {
     513             :     PGAlignedXLogBlock zerobuf;
     514           8 :     size_t      bytesleft = bytes;
     515             : 
     516           8 :     memset(zerobuf.data, 0, XLOG_BLCKSZ);
     517       16392 :     while (bytesleft)
     518             :     {
     519       16384 :         size_t      bytestowrite = Min(bytesleft, XLOG_BLCKSZ);
     520       16384 :         ssize_t     r = tar_write(f, zerobuf.data, bytestowrite);
     521             : 
     522       16384 :         if (r < 0)
     523           0 :             return false;
     524       16384 :         bytesleft -= r;
     525             :     }
     526             : 
     527           8 :     return true;
     528             : }
     529             : 
     530             : static Walfile
     531           8 : tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
     532             : {
     533             :     int         save_errno;
     534             :     static char tmppath[MAXPGPATH];
     535             : 
     536           8 :     tar_clear_error();
     537             : 
     538           8 :     if (tar_data->fd < 0)
     539             :     {
     540             :         /*
     541             :          * We open the tar file only when we first try to write to it.
     542             :          */
     543           8 :         tar_data->fd = open(tar_data->tarfilename,
     544             :                             O_WRONLY | O_CREAT | PG_BINARY,
     545             :                             pg_file_create_mode);
     546           8 :         if (tar_data->fd < 0)
     547           0 :             return NULL;
     548             : 
     549             : #ifdef HAVE_LIBZ
     550           8 :         if (tar_data->compression)
     551             :         {
     552           0 :             tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream));
     553           0 :             tar_data->zp->zalloc = Z_NULL;
     554           0 :             tar_data->zp->zfree = Z_NULL;
     555           0 :             tar_data->zp->opaque = Z_NULL;
     556           0 :             tar_data->zp->next_out = tar_data->zlibOut;
     557           0 :             tar_data->zp->avail_out = ZLIB_OUT_SIZE;
     558             : 
     559             :             /*
     560             :              * Initialize deflation library. Adding the magic value 16 to the
     561             :              * default 15 for the windowBits parameter makes the output be
     562             :              * gzip instead of zlib.
     563             :              */
     564           0 :             if (deflateInit2(tar_data->zp, tar_data->compression, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK)
     565             :             {
     566           0 :                 pg_free(tar_data->zp);
     567           0 :                 tar_data->zp = NULL;
     568           0 :                 tar_set_error("could not initialize compression library");
     569           0 :                 return NULL;
     570             :             }
     571             :         }
     572             : #endif
     573             : 
     574             :         /* There's no tar header itself, the file starts with regular files */
     575             :     }
     576             : 
     577             :     Assert(tar_data->currentfile == NULL);
     578           8 :     if (tar_data->currentfile != NULL)
     579             :     {
     580           0 :         tar_set_error("implementation error: tar files can't have more than one open file");
     581           0 :         return NULL;
     582             :     }
     583             : 
     584           8 :     tar_data->currentfile = pg_malloc0(sizeof(TarMethodFile));
     585             : 
     586           8 :     snprintf(tmppath, sizeof(tmppath), "%s%s",
     587             :              pathname, temp_suffix ? temp_suffix : "");
     588             : 
     589             :     /* Create a header with size set to 0 - we will fill out the size on close */
     590           8 :     if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK)
     591             :     {
     592           0 :         pg_free(tar_data->currentfile);
     593           0 :         tar_data->currentfile = NULL;
     594           0 :         tar_set_error("could not create tar header");
     595           0 :         return NULL;
     596             :     }
     597             : 
     598             : #ifdef HAVE_LIBZ
     599           8 :     if (tar_data->compression)
     600             :     {
     601             :         /* Flush existing data */
     602           0 :         if (!tar_write_compressed_data(NULL, 0, true))
     603           0 :             return NULL;
     604             : 
     605             :         /* Turn off compression for header */
     606           0 :         if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
     607             :         {
     608           0 :             tar_set_error("could not change compression parameters");
     609           0 :             return NULL;
     610             :         }
     611             :     }
     612             : #endif
     613             : 
     614           8 :     tar_data->currentfile->ofs_start = lseek(tar_data->fd, 0, SEEK_CUR);
     615           8 :     if (tar_data->currentfile->ofs_start == -1)
     616             :     {
     617           0 :         save_errno = errno;
     618           0 :         pg_free(tar_data->currentfile);
     619           0 :         tar_data->currentfile = NULL;
     620           0 :         errno = save_errno;
     621           0 :         return NULL;
     622             :     }
     623           8 :     tar_data->currentfile->currpos = 0;
     624             : 
     625           8 :     if (!tar_data->compression)
     626             :     {
     627           8 :         errno = 0;
     628           8 :         if (write(tar_data->fd, tar_data->currentfile->header,
     629             :                   TAR_BLOCK_SIZE) != TAR_BLOCK_SIZE)
     630             :         {
     631           0 :             save_errno = errno;
     632           0 :             pg_free(tar_data->currentfile);
     633           0 :             tar_data->currentfile = NULL;
     634             :             /* if write didn't set errno, assume problem is no disk space */
     635           0 :             errno = save_errno ? save_errno : ENOSPC;
     636           0 :             return NULL;
     637             :         }
     638             :     }
     639             : #ifdef HAVE_LIBZ
     640             :     else
     641             :     {
     642             :         /* Write header through the zlib APIs but with no compression */
     643           0 :         if (!tar_write_compressed_data(tar_data->currentfile->header,
     644             :                                        TAR_BLOCK_SIZE, true))
     645           0 :             return NULL;
     646             : 
     647             :         /* Re-enable compression for the rest of the file */
     648           0 :         if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK)
     649             :         {
     650           0 :             tar_set_error("could not change compression parameters");
     651           0 :             return NULL;
     652             :         }
     653             :     }
     654             : #endif
     655             : 
     656           8 :     tar_data->currentfile->pathname = pg_strdup(pathname);
     657             : 
     658             :     /*
     659             :      * Uncompressed files are padded on creation, but for compression we can't
     660             :      * do that
     661             :      */
     662           8 :     if (pad_to_size)
     663             :     {
     664           8 :         tar_data->currentfile->pad_to_size = pad_to_size;
     665           8 :         if (!tar_data->compression)
     666             :         {
     667             :             /* Uncompressed, so pad now */
     668           8 :             tar_write_padding_data(tar_data->currentfile, pad_to_size);
     669             :             /* Seek back to start */
     670          16 :             if (lseek(tar_data->fd,
     671           8 :                       tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE,
     672           8 :                       SEEK_SET) != tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE)
     673           0 :                 return NULL;
     674             : 
     675           8 :             tar_data->currentfile->currpos = 0;
     676             :         }
     677             :     }
     678             : 
     679           8 :     return tar_data->currentfile;
     680             : }
     681             : 
                       /*
                        * tar_get_file_size
                        *
                        * Placeholder installed as the get_file_size callback by
                        * CreateWalTarMethod().  Looking up a file size is not implemented for
                        * the tar method: after calling tar_clear_error() it sets errno to ENOSYS
                        * and returns -1, whatever pathname is.
                        */
     682             : static ssize_t
     683           0 : tar_get_file_size(const char *pathname)
     684             : {
     685           0 :     tar_clear_error();
     686             : 
     687             :     /* Currently not used, so not supported */
     688           0 :     errno = ENOSYS;
     689           0 :     return -1;
     690             : }
     691             : 
                       /*
                        * tar_get_current_pos
                        *
                        * get_current_pos callback of the tar method.  Calls tar_clear_error()
                        * and returns the currpos field of the TarMethodFile behind f, which must
                        * not be NULL (asserted).
                        *
                        * tar_close() in this file uses the returned value as the size of the
                        * member when it rewrites the tar header.
                        */
     692             : static off_t
     693         342 : tar_get_current_pos(Walfile f)
     694             : {
     695             :     Assert(f != NULL);
     696         342 :     tar_clear_error();
     697             : 
     698         342 :     return ((TarMethodFile *) f)->currpos;
     699             : }
     700             : 
                       /*
                        * tar_sync
                        *
                        * sync callback of the tar method.  f must not be NULL (asserted) but is
                        * otherwise unused: the fsync() is issued on the archive's descriptor
                        * (tar_data->fd), not on an individual member.
                        *
                        * Returns 0 without touching the file when syncing is disabled
                        * (tar_data->sync is false) or when the archive is compressed; otherwise
                        * returns the result of fsync().
                        */
     701             : static int
     702           8 : tar_sync(Walfile f)
     703             : {
     704             :     Assert(f != NULL);
     705           8 :     tar_clear_error();
     706             : 
     707           8 :     if (!tar_data->sync)
     708           8 :         return 0;
     709             : 
     710             :     /*
     711             :      * Always sync the whole tarfile, because that's all we can do. This makes
     712             :      * no sense on compressed files, so just ignore those.
     713             :      */
     714           0 :     if (tar_data->compression)
     715           0 :         return 0;
     716             : 
     717           0 :     return fsync(tar_data->fd);
     718             : }
     719             : 
                       /*
                        * tar_close
                        *
                        * close callback of the tar method: finishes (or discards) the tar
                        * member f, which must not be NULL (asserted).
                        *
                        * CLOSE_UNLINK drops the member by truncating the archive back to the
                        * offset where the member's header starts (tf->ofs_start); it is
                        * rejected with an error when compression is enabled.  For every other
                        * method the member is padded as needed, the size and checksum fields of
                        * the saved header are recomputed (for CLOSE_NORMAL the name field is
                        * also set from tf->pathname), the header is written out again, the file
                        * position is moved to the end of the archive and tar_sync() is called.
                        *
                        * Returns 0 on success and -1 on failure.  On the -1 paths tf is not
                        * freed and tar_data->currentfile is left as it was.  A tar_sync()
                        * failure does not return at all: the process exits with status 1.
                        */
     720             : static int
     721           8 : tar_close(Walfile f, WalCloseMethod method)
     722             : {
     723             :     ssize_t     filesize;
     724             :     int         padding;
     725           8 :     TarMethodFile *tf = (TarMethodFile *) f;
     726             : 
     727             :     Assert(f != NULL);
     728           8 :     tar_clear_error();
     729             : 
     730           8 :     if (method == CLOSE_UNLINK)
     731             :     {
     732           0 :         if (tar_data->compression)
     733             :         {
     734           0 :             tar_set_error("unlink not supported with compression");
     735           0 :             return -1;
     736             :         }
     737             : 
     738             :         /*
     739             :          * Unlink the file that we just wrote to the tar. We do this by
     740             :          * truncating it to the start of the header. This is safe as we only
     741             :          * allow writing of the very last file.
     742             :          */
     743           0 :         if (ftruncate(tar_data->fd, tf->ofs_start) != 0)
     744           0 :             return -1;
     745             : 
     746           0 :         pg_free(tf->pathname);
     747           0 :         pg_free(tf);
     748           0 :         tar_data->currentfile = NULL;
     749             : 
     750           0 :         return 0;
     751             :     }
     752             : 
     753             :     /*
     754             :      * Pad the file itself with zeroes if necessary. Note that this is
     755             :      * different from the tar format padding -- this is the padding we asked
     756             :      * for when the file was opened.
     757             :      */
     758           8 :     if (tf->pad_to_size)
     759             :     {
     760           8 :         if (tar_data->compression)
     761             :         {
     762             :             /*
     763             :              * A compressed tarfile is padded on close since we cannot know
     764             :              * the size of the compressed output until the end.
     765             :              */
     766           0 :             size_t      sizeleft = tf->pad_to_size - tf->currpos;
     767             : 
     768           0 :             if (sizeleft)
     769             :             {
     770           0 :                 if (!tar_write_padding_data(tf, sizeleft))
     771           0 :                     return -1;
     772             :             }
     773             :         }
     774             :         else
     775             :         {
     776             :             /*
     777             :              * An uncompressed tarfile was padded on creation, so just adjust
     778             :              * the current position as if we seeked to the end.
     779             :              */
     780           8 :             tf->currpos = tf->pad_to_size;
     781             :         }
     782             :     }
     783             : 
     784             :     /*
     785             :      * Get the size of the file, and pad out to a multiple of the tar block
     786             :      * size.
     787             :      */
     788           8 :     filesize = tar_get_current_pos(f);
     789           8 :     padding = tarPaddingBytesRequired(filesize);
     790           8 :     if (padding)
     791             :     {
     792             :         char        zerobuf[TAR_BLOCK_SIZE];
     793             : 
     794           0 :         MemSet(zerobuf, 0, padding);
     795           0 :         if (tar_write(f, zerobuf, padding) != padding)
     796           0 :             return -1;
     797             :     }
     798             : 
     799             : 
     800             : #ifdef HAVE_LIBZ
     801           8 :     if (tar_data->compression)
     802             :     {
     803             :         /* Flush the current buffer */
     804           0 :         if (!tar_write_compressed_data(NULL, 0, true))
     805             :         {
     806           0 :             errno = EINVAL;
     807           0 :             return -1;
     808             :         }
     809             :     }
     810             : #endif
     811             : 
     812             :     /*
     813             :      * Now go back and update the header with the correct filesize and
     814             :      * possibly also renaming the file. We overwrite the entire current header
     815             :      * when done, including the checksum.
     816             :      */
                           /* The size is stored in the 12 bytes at header offset 124. */
     817           8 :     print_tar_number(&(tf->header[124]), 12, filesize);
     818             : 
     819           8 :     if (method == CLOSE_NORMAL)
     820             : 
     821             :         /*
     822             :          * We overwrite it with what it was before if we have no tempname,
     823             :          * since we're going to write the buffer anyway.
     824             :          */
     825           8 :         strlcpy(&(tf->header[0]), tf->pathname, 100);
     826             : 
                           /*
                            * The checksum (8 bytes at header offset 148) is computed after the
                            * size and name fields above have been updated.
                            */
     827           8 :     print_tar_number(&(tf->header[148]), 8, tarChecksum(((TarMethodFile *) f)->header));
     828           8 :     if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start)
     829           0 :         return -1;
     830           8 :     if (!tar_data->compression)
     831             :     {
     832           8 :         errno = 0;
     833           8 :         if (write(tar_data->fd, tf->header, TAR_BLOCK_SIZE) != TAR_BLOCK_SIZE)
     834             :         {
     835             :             /* if write didn't set errno, assume problem is no disk space */
     836           0 :             if (errno == 0)
     837           0 :                 errno = ENOSPC;
     838           0 :             return -1;
     839             :         }
     840             :     }
     841             : #ifdef HAVE_LIBZ
     842             :     else
     843             :     {
     844             :         /* Turn off compression */
     845           0 :         if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
     846             :         {
     847           0 :             tar_set_error("could not change compression parameters");
     848           0 :             return -1;
     849             :         }
     850             : 
     851             :         /* Overwrite the header, assuming the size will be the same */
     852           0 :         if (!tar_write_compressed_data(tar_data->currentfile->header,
     853             :                                        TAR_BLOCK_SIZE, true))
     854           0 :             return -1;
     855             : 
     856             :         /* Turn compression back on */
     857           0 :         if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK)
     858             :         {
     859           0 :             tar_set_error("could not change compression parameters");
     860           0 :             return -1;
     861             :         }
     862             :     }
     863             : #endif
     864             : 
     865             :     /* Move file pointer back down to end, so we can write the next file */
     866           8 :     if (lseek(tar_data->fd, 0, SEEK_END) < 0)
     867           0 :         return -1;
     868             : 
     869             :     /* Always fsync on close, so the padding gets fsynced */
                           /*
                            * NOTE(review): a failed sync ends the process right here via exit(1);
                            * no error message is emitted by this function before exiting.
                            */
     870           8 :     if (tar_sync(f) < 0)
     871           0 :         exit(1);
     872             : 
     873             :     /* Clean up and done */
     874           8 :     pg_free(tf->pathname);
     875           8 :     pg_free(tf);
     876           8 :     tar_data->currentfile = NULL;
     877             : 
     878           8 :     return 0;
     879             : }
     880             : 
                       /*
                        * tar_existsfile
                        *
                        * existsfile callback of the tar method.  Calls tar_clear_error() and
                        * always returns false; pathname is not examined.
                        */
     881             : static bool
     882           8 : tar_existsfile(const char *pathname)
     883             : {
     884           8 :     tar_clear_error();
     885             :     /* We only deal with new tarfiles, so nothing externally created exists */
     886           8 :     return false;
     887             : }
     888             : 
                       /*
                        * tar_finish
                        *
                        * finish callback of the tar method: completes and closes the archive.
                        *
                        * Steps, in order: close a still-open member with CLOSE_NORMAL; append
                        * 1024 zero bytes as the end-of-archive marker (through zlib when
                        * compression is on, followed by finishing and ending the deflate
                        * stream); fsync the descriptor if tar_data->sync is set; close the
                        * descriptor and set tar_data->fd to -1; and, again only when syncing is
                        * enabled, fsync the archive by name and then its parent path.
                        *
                        * Returns true on success.  Returns false as soon as any step fails, in
                        * which case the remaining steps are skipped (so the descriptor may be
                        * left open).
                        */
     889             : static bool
     890           8 : tar_finish(void)
     891             : {
     892             :     char        zerobuf[1024];
     893             : 
     894           8 :     tar_clear_error();
     895             : 
     896           8 :     if (tar_data->currentfile)
     897             :     {
     898           0 :         if (tar_close(tar_data->currentfile, CLOSE_NORMAL) != 0)
     899           0 :             return false;
     900             :     }
     901             : 
     902             :     /* A tarfile always ends with two empty blocks */
     903        1032 :     MemSet(zerobuf, 0, sizeof(zerobuf));
     904           8 :     if (!tar_data->compression)
     905             :     {
     906           8 :         errno = 0;
     907           8 :         if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf))
     908             :         {
     909             :             /* if write didn't set errno, assume problem is no disk space */
     910           0 :             if (errno == 0)
     911           0 :                 errno = ENOSPC;
     912           0 :             return false;
     913             :         }
     914             :     }
     915             : #ifdef HAVE_LIBZ
     916             :     else
     917             :     {
     918           0 :         if (!tar_write_compressed_data(zerobuf, sizeof(zerobuf), false))
     919           0 :             return false;
     920             : 
     921             :         /* Also flush all data to make sure the gzip stream is finished */
     922           0 :         tar_data->zp->next_in = NULL;
     923           0 :         tar_data->zp->avail_in = 0;
                               /*
                                * NOTE(review): this loop writes zlibOut from its start and never
                                * resets zp->next_out / zp->avail_out after a write.  Presumably
                                * the output buffer is empty on entry and the remaining stream
                                * data fits into it in one pass -- confirm.
                                */
     924             :         while (true)
     925           0 :         {
     926             :             int         r;
     927             : 
     928           0 :             r = deflate(tar_data->zp, Z_FINISH);
     929             : 
     930           0 :             if (r == Z_STREAM_ERROR)
     931             :             {
     932           0 :                 tar_set_error("could not compress data");
     933           0 :                 return false;
     934             :             }
     935           0 :             if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
     936             :             {
     937           0 :                 size_t      len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
     938             : 
     939           0 :                 errno = 0;
     940           0 :                 if (write(tar_data->fd, tar_data->zlibOut, len) != len)
     941             :                 {
     942             :                     /*
     943             :                      * If write didn't set errno, assume problem is no disk
     944             :                      * space.
     945             :                      */
     946           0 :                     if (errno == 0)
     947           0 :                         errno = ENOSPC;
     948           0 :                     return false;
     949             :                 }
     950             :             }
     951           0 :             if (r == Z_STREAM_END)
     952           0 :                 break;
     953             :         }
     954             : 
     955           0 :         if (deflateEnd(tar_data->zp) != Z_OK)
     956             :         {
     957           0 :             tar_set_error("could not close compression stream");
     958           0 :             return false;
     959             :         }
     960             :     }
     961             : #endif
     962             : 
     963             :     /* sync the empty blocks as well, since they're after the last file */
     964           8 :     if (tar_data->sync)
     965             :     {
     966           0 :         if (fsync(tar_data->fd) != 0)
     967           0 :             return false;
     968             :     }
     969             : 
     970           8 :     if (close(tar_data->fd) != 0)
     971           0 :         return false;
     972             : 
     973           8 :     tar_data->fd = -1;
     974             : 
                           /* With syncing enabled, also fsync the archive by name and its parent path */
     975           8 :     if (tar_data->sync)
     976             :     {
     977           0 :         if (fsync_fname(tar_data->tarfilename, false) != 0)
     978           0 :             return false;
     979           0 :         if (fsync_parent_path(tar_data->tarfilename) != 0)
     980           0 :             return false;
     981             :     }
     982             : 
     983           8 :     return true;
     984             : }
     985             : 
                       /*
                        * CreateWalTarMethod
                        *
                        * Allocates a WalWriteMethod whose callbacks are the tar_* functions of
                        * this file, and initializes the file-level tar_data state they share.
                        *
                        * tarbase:     archive path without suffix; ".tar" is appended, or
                        *              ".tar.gz" when compression is non-zero.
                        * compression: stored in tar_data->compression.  Zero means the archive
                        *              is written uncompressed; a non-zero value is handed to
                        *              deflateParams() as the level by the code in this file.
                        * sync:        stored in tar_data->sync; consulted by tar_sync() and
                        *              tar_finish().
                        *
                        * The archive is not opened here (tar_data->fd starts out as -1).
                        *
                        * Returns the newly allocated method struct.  FreeWalTarMethod() releases
                        * tar_data and its buffers but does not free the returned struct.
                        */
     986             : WalWriteMethod *
     987           8 : CreateWalTarMethod(const char *tarbase, int compression, bool sync)
     988             : {
     989             :     WalWriteMethod *method;
     990           8 :     const char *suffix = (compression != 0) ? ".tar.gz" : ".tar";
     991             : 
     992           8 :     method = pg_malloc0(sizeof(WalWriteMethod));
     993           8 :     method->open_for_write = tar_open_for_write;
     994           8 :     method->write = tar_write;
     995           8 :     method->get_current_pos = tar_get_current_pos;
     996           8 :     method->get_file_size = tar_get_file_size;
     997           8 :     method->close = tar_close;
     998           8 :     method->sync = tar_sync;
     999           8 :     method->existsfile = tar_existsfile;
    1000           8 :     method->finish = tar_finish;
    1001           8 :     method->getlasterror = tar_getlasterror;
    1002             : 
    1003           8 :     tar_data = pg_malloc0(sizeof(TarMethodData));
                           /* Buffer is sized for base name + suffix + terminating NUL */
    1004           8 :     tar_data->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1);
    1005           8 :     sprintf(tar_data->tarfilename, "%s%s", tarbase, suffix);
    1006           8 :     tar_data->fd = -1;
    1007           8 :     tar_data->compression = compression;
    1008           8 :     tar_data->sync = sync;
    1009             : #ifdef HAVE_LIBZ
                           /* The zlib output buffer is only needed when compressing */
    1010           8 :     if (compression)
    1011           0 :         tar_data->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
    1012             : #endif
    1013             : 
    1014           8 :     return method;
    1015             : }
    1016             : 
                       /*
                        * FreeWalTarMethod
                        *
                        * Releases the state allocated by CreateWalTarMethod(): the archive file
                        * name, the zlib output buffer (HAVE_LIBZ builds, compression enabled)
                        * and the tar_data struct itself.
                        *
                        * tar_data is not reset to NULL afterwards, and the archive descriptor
                        * is not closed here; tar_finish() is the function that closes it.
                        */
    1017             : void
    1018           8 : FreeWalTarMethod(void)
    1019             : {
    1020           8 :     pg_free(tar_data->tarfilename);
    1021             : #ifdef HAVE_LIBZ
    1022           8 :     if (tar_data->compression)
    1023           0 :         pg_free(tar_data->zlibOut);
    1024             : #endif
    1025           8 :     pg_free(tar_data);
    1026           8 : }

Generated by: LCOV version 1.13