Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * walmethods.c - implementations of different ways to write received wal
4 : *
5 : * NOTE! The caller must ensure that only one method is instantiated in
6 : * any given program, and that it's only instantiated once!
7 : *
8 : * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
9 : *
10 : * IDENTIFICATION
11 : * src/bin/pg_basebackup/walmethods.c
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres_fe.h"
16 :
17 : #include <sys/stat.h>
18 : #include <time.h>
19 : #include <unistd.h>
20 : #ifdef HAVE_LIBZ
21 : #include <zlib.h>
22 : #endif
23 :
24 : #include "common/file_perm.h"
25 : #include "common/file_utils.h"
26 : #include "pgtar.h"
27 : #include "receivelog.h"
28 : #include "streamutil.h"
29 :
30 : /* Size of zlib buffer for .tar.gz */
31 : #define ZLIB_OUT_SIZE 4096
32 :
33 : /*-------------------------------------------------------------------------
34 : * WalDirectoryMethod - write wal to a directory looking like pg_wal
35 : *-------------------------------------------------------------------------
36 : */
37 :
38 : /*
39 : * Global static data for this method
40 : */
41 : typedef struct DirectoryMethodData
42 : {
43 : char *basedir;
44 : int compression;
45 : bool sync;
46 : } DirectoryMethodData;
47 : static DirectoryMethodData *dir_data = NULL;
48 :
49 : /*
50 : * Local file handle
51 : */
52 : typedef struct DirectoryMethodFile
53 : {
54 : int fd;
55 : off_t currpos;
56 : char *pathname;
57 : char *fullpath;
58 : char *temp_suffix;
59 : #ifdef HAVE_LIBZ
60 : gzFile gzfp;
61 : #endif
62 : } DirectoryMethodFile;
63 :
64 : static const char *
65 0 : dir_getlasterror(void)
66 : {
67 : /* Directory method always sets errno, so just use strerror */
68 0 : return strerror(errno);
69 : }
70 :
71 : static Walfile
72 154 : dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
73 : {
74 : static char tmppath[MAXPGPATH];
75 : int fd;
76 : DirectoryMethodFile *f;
77 : #ifdef HAVE_LIBZ
78 154 : gzFile gzfp = NULL;
79 : #endif
80 :
81 308 : snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
82 154 : dir_data->basedir, pathname,
83 154 : dir_data->compression > 0 ? ".gz" : "",
84 : temp_suffix ? temp_suffix : "");
85 :
86 : /*
87 : * Open a file for non-compressed as well as compressed files. Tracking
88 : * the file descriptor is important for dir_sync() method as gzflush()
89 : * does not do any system calls to fsync() to make changes permanent on
90 : * disk.
91 : */
92 154 : fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, pg_file_create_mode);
93 154 : if (fd < 0)
94 0 : return NULL;
95 :
96 : #ifdef HAVE_LIBZ
97 154 : if (dir_data->compression > 0)
98 : {
99 0 : gzfp = gzdopen(fd, "wb");
100 0 : if (gzfp == NULL)
101 : {
102 0 : close(fd);
103 0 : return NULL;
104 : }
105 :
106 0 : if (gzsetparams(gzfp, dir_data->compression,
107 : Z_DEFAULT_STRATEGY) != Z_OK)
108 : {
109 0 : gzclose(gzfp);
110 0 : return NULL;
111 : }
112 : }
113 : #endif
114 :
115 : /* Do pre-padding on non-compressed files */
116 154 : if (pad_to_size && dir_data->compression == 0)
117 : {
118 : PGAlignedXLogBlock zerobuf;
119 : int bytes;
120 :
121 130 : memset(zerobuf.data, 0, XLOG_BLCKSZ);
122 262530 : for (bytes = 0; bytes < pad_to_size; bytes += XLOG_BLCKSZ)
123 : {
124 262400 : errno = 0;
125 262400 : if (write(fd, zerobuf.data, XLOG_BLCKSZ) != XLOG_BLCKSZ)
126 : {
127 0 : int save_errno = errno;
128 :
129 0 : close(fd);
130 :
131 : /*
132 : * If write didn't set errno, assume problem is no disk space.
133 : */
134 0 : errno = save_errno ? save_errno : ENOSPC;
135 0 : return NULL;
136 : }
137 : }
138 :
139 130 : if (lseek(fd, 0, SEEK_SET) != 0)
140 : {
141 0 : int save_errno = errno;
142 :
143 0 : close(fd);
144 0 : errno = save_errno;
145 0 : return NULL;
146 : }
147 : }
148 :
149 : /*
150 : * fsync WAL file and containing directory, to ensure the file is
151 : * persistently created and zeroed (if padded). That's particularly
152 : * important when using synchronous mode, where the file is modified and
153 : * fsynced in-place, without a directory fsync.
154 : */
155 154 : if (dir_data->sync)
156 : {
157 4 : if (fsync_fname(tmppath, false) != 0 ||
158 2 : fsync_parent_path(tmppath) != 0)
159 : {
160 : #ifdef HAVE_LIBZ
161 0 : if (dir_data->compression > 0)
162 0 : gzclose(gzfp);
163 : else
164 : #endif
165 0 : close(fd);
166 0 : return NULL;
167 : }
168 : }
169 :
170 154 : f = pg_malloc0(sizeof(DirectoryMethodFile));
171 : #ifdef HAVE_LIBZ
172 154 : if (dir_data->compression > 0)
173 0 : f->gzfp = gzfp;
174 : #endif
175 154 : f->fd = fd;
176 154 : f->currpos = 0;
177 154 : f->pathname = pg_strdup(pathname);
178 154 : f->fullpath = pg_strdup(tmppath);
179 154 : if (temp_suffix)
180 2 : f->temp_suffix = pg_strdup(temp_suffix);
181 :
182 154 : return f;
183 : }
184 :
185 : static ssize_t
186 4450 : dir_write(Walfile f, const void *buf, size_t count)
187 : {
188 : ssize_t r;
189 4450 : DirectoryMethodFile *df = (DirectoryMethodFile *) f;
190 :
191 : Assert(f != NULL);
192 :
193 : #ifdef HAVE_LIBZ
194 4450 : if (dir_data->compression > 0)
195 0 : r = (ssize_t) gzwrite(df->gzfp, buf, count);
196 : else
197 : #endif
198 4450 : r = write(df->fd, buf, count);
199 4450 : if (r > 0)
200 4450 : df->currpos += r;
201 4450 : return r;
202 : }
203 :
204 : static off_t
205 4450 : dir_get_current_pos(Walfile f)
206 : {
207 : Assert(f != NULL);
208 :
209 : /* Use a cached value to prevent lots of reseeks */
210 4450 : return ((DirectoryMethodFile *) f)->currpos;
211 : }
212 :
213 : static int
214 154 : dir_close(Walfile f, WalCloseMethod method)
215 : {
216 : int r;
217 154 : DirectoryMethodFile *df = (DirectoryMethodFile *) f;
218 : static char tmppath[MAXPGPATH];
219 : static char tmppath2[MAXPGPATH];
220 :
221 : Assert(f != NULL);
222 :
223 : #ifdef HAVE_LIBZ
224 154 : if (dir_data->compression > 0)
225 0 : r = gzclose(df->gzfp);
226 : else
227 : #endif
228 154 : r = close(df->fd);
229 :
230 154 : if (r == 0)
231 : {
232 : /* Build path to the current version of the file */
233 154 : if (method == CLOSE_NORMAL && df->temp_suffix)
234 : {
235 : /*
236 : * If we have a temp prefix, normal operation is to rename the
237 : * file.
238 : */
239 0 : snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
240 0 : dir_data->basedir, df->pathname,
241 0 : dir_data->compression > 0 ? ".gz" : "",
242 : df->temp_suffix);
243 0 : snprintf(tmppath2, sizeof(tmppath2), "%s/%s%s",
244 0 : dir_data->basedir, df->pathname,
245 0 : dir_data->compression > 0 ? ".gz" : "");
246 0 : r = durable_rename(tmppath, tmppath2);
247 : }
248 154 : else if (method == CLOSE_UNLINK)
249 : {
250 : /* Unlink the file once it's closed */
251 0 : snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
252 0 : dir_data->basedir, df->pathname,
253 0 : dir_data->compression > 0 ? ".gz" : "",
254 0 : df->temp_suffix ? df->temp_suffix : "");
255 0 : r = unlink(tmppath);
256 : }
257 : else
258 : {
259 : /*
260 : * Else either CLOSE_NORMAL and no temp suffix, or
261 : * CLOSE_NO_RENAME. In this case, fsync the file and containing
262 : * directory if sync mode is requested.
263 : */
264 154 : if (dir_data->sync)
265 : {
266 2 : r = fsync_fname(df->fullpath, false);
267 2 : if (r == 0)
268 2 : r = fsync_parent_path(df->fullpath);
269 : }
270 : }
271 : }
272 :
273 154 : pg_free(df->pathname);
274 154 : pg_free(df->fullpath);
275 154 : if (df->temp_suffix)
276 2 : pg_free(df->temp_suffix);
277 154 : pg_free(df);
278 :
279 154 : return r;
280 : }
281 :
282 : static int
283 0 : dir_sync(Walfile f)
284 : {
285 : Assert(f != NULL);
286 :
287 0 : if (!dir_data->sync)
288 0 : return 0;
289 :
290 : #ifdef HAVE_LIBZ
291 0 : if (dir_data->compression > 0)
292 : {
293 0 : if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK)
294 0 : return -1;
295 : }
296 : #endif
297 :
298 0 : return fsync(((DirectoryMethodFile *) f)->fd);
299 : }
300 :
301 : static ssize_t
302 0 : dir_get_file_size(const char *pathname)
303 : {
304 : struct stat statbuf;
305 : static char tmppath[MAXPGPATH];
306 :
307 0 : snprintf(tmppath, sizeof(tmppath), "%s/%s",
308 0 : dir_data->basedir, pathname);
309 :
310 0 : if (stat(tmppath, &statbuf) != 0)
311 0 : return -1;
312 :
313 0 : return statbuf.st_size;
314 : }
315 :
316 : static bool
317 130 : dir_existsfile(const char *pathname)
318 : {
319 : static char tmppath[MAXPGPATH];
320 : int fd;
321 :
322 130 : snprintf(tmppath, sizeof(tmppath), "%s/%s",
323 130 : dir_data->basedir, pathname);
324 :
325 130 : fd = open(tmppath, O_RDONLY | PG_BINARY, 0);
326 130 : if (fd < 0)
327 130 : return false;
328 0 : close(fd);
329 0 : return true;
330 : }
331 :
332 : static bool
333 130 : dir_finish(void)
334 : {
335 130 : if (dir_data->sync)
336 : {
337 : /*
338 : * Files are fsynced when they are closed, but we need to fsync the
339 : * directory entry here as well.
340 : */
341 2 : if (fsync_fname(dir_data->basedir, true) != 0)
342 0 : return false;
343 : }
344 130 : return true;
345 : }
346 :
347 :
348 : WalWriteMethod *
349 132 : CreateWalDirectoryMethod(const char *basedir, int compression, bool sync)
350 : {
351 : WalWriteMethod *method;
352 :
353 132 : method = pg_malloc0(sizeof(WalWriteMethod));
354 132 : method->open_for_write = dir_open_for_write;
355 132 : method->write = dir_write;
356 132 : method->get_current_pos = dir_get_current_pos;
357 132 : method->get_file_size = dir_get_file_size;
358 132 : method->close = dir_close;
359 132 : method->sync = dir_sync;
360 132 : method->existsfile = dir_existsfile;
361 132 : method->finish = dir_finish;
362 132 : method->getlasterror = dir_getlasterror;
363 :
364 132 : dir_data = pg_malloc0(sizeof(DirectoryMethodData));
365 132 : dir_data->compression = compression;
366 132 : dir_data->basedir = pg_strdup(basedir);
367 132 : dir_data->sync = sync;
368 :
369 132 : return method;
370 : }
371 :
372 : void
373 130 : FreeWalDirectoryMethod(void)
374 : {
375 130 : pg_free(dir_data->basedir);
376 130 : pg_free(dir_data);
377 130 : }
378 :
379 :
380 : /*-------------------------------------------------------------------------
381 : * WalTarMethod - write wal to a tar file containing pg_wal contents
382 : *-------------------------------------------------------------------------
383 : */
384 :
385 : typedef struct TarMethodFile
386 : {
387 : off_t ofs_start; /* Where does the *header* for this file start */
388 : off_t currpos;
389 : char header[TAR_BLOCK_SIZE];
390 : char *pathname;
391 : size_t pad_to_size;
392 : } TarMethodFile;
393 :
394 : typedef struct TarMethodData
395 : {
396 : char *tarfilename;
397 : int fd;
398 : int compression;
399 : bool sync;
400 : TarMethodFile *currentfile;
401 : char lasterror[1024];
402 : #ifdef HAVE_LIBZ
403 : z_streamp zp;
404 : void *zlibOut;
405 : #endif
406 : } TarMethodData;
407 : static TarMethodData *tar_data = NULL;
408 :
409 : #define tar_clear_error() tar_data->lasterror[0] = '\0'
410 : #define tar_set_error(msg) strlcpy(tar_data->lasterror, _(msg), sizeof(tar_data->lasterror))
411 :
412 : static const char *
413 0 : tar_getlasterror(void)
414 : {
415 : /*
416 : * If a custom error is set, return that one. Otherwise, assume errno is
417 : * set and return that one.
418 : */
419 0 : if (tar_data->lasterror[0])
420 0 : return tar_data->lasterror;
421 0 : return strerror(errno);
422 : }
423 :
424 : #ifdef HAVE_LIBZ
425 : static bool
426 0 : tar_write_compressed_data(void *buf, size_t count, bool flush)
427 : {
428 0 : tar_data->zp->next_in = buf;
429 0 : tar_data->zp->avail_in = count;
430 :
431 0 : while (tar_data->zp->avail_in || flush)
432 : {
433 : int r;
434 :
435 0 : r = deflate(tar_data->zp, flush ? Z_FINISH : Z_NO_FLUSH);
436 0 : if (r == Z_STREAM_ERROR)
437 : {
438 0 : tar_set_error("could not compress data");
439 0 : return false;
440 : }
441 :
442 0 : if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
443 : {
444 0 : size_t len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
445 :
446 0 : errno = 0;
447 0 : if (write(tar_data->fd, tar_data->zlibOut, len) != len)
448 : {
449 : /*
450 : * If write didn't set errno, assume problem is no disk space.
451 : */
452 0 : if (errno == 0)
453 0 : errno = ENOSPC;
454 0 : return false;
455 : }
456 :
457 0 : tar_data->zp->next_out = tar_data->zlibOut;
458 0 : tar_data->zp->avail_out = ZLIB_OUT_SIZE;
459 : }
460 :
461 0 : if (r == Z_STREAM_END)
462 0 : break;
463 : }
464 :
465 0 : if (flush)
466 : {
467 : /* Reset the stream for writing */
468 0 : if (deflateReset(tar_data->zp) != Z_OK)
469 : {
470 0 : tar_set_error("could not reset compression stream");
471 0 : return false;
472 : }
473 : }
474 :
475 0 : return true;
476 : }
477 : #endif
478 :
479 : static ssize_t
480 16718 : tar_write(Walfile f, const void *buf, size_t count)
481 : {
482 : ssize_t r;
483 :
484 : Assert(f != NULL);
485 16718 : tar_clear_error();
486 :
487 : /* Tarfile will always be positioned at the end */
488 16718 : if (!tar_data->compression)
489 : {
490 16718 : r = write(tar_data->fd, buf, count);
491 16718 : if (r > 0)
492 16718 : ((TarMethodFile *) f)->currpos += r;
493 16718 : return r;
494 : }
495 : #ifdef HAVE_LIBZ
496 : else
497 : {
498 0 : if (!tar_write_compressed_data(unconstify(void *, buf), count, false))
499 0 : return -1;
500 0 : ((TarMethodFile *) f)->currpos += count;
501 0 : return count;
502 : }
503 : #else
504 : else
505 : /* Can't happen - compression enabled with no libz */
506 : return -1;
507 : #endif
508 : }
509 :
510 : static bool
511 8 : tar_write_padding_data(TarMethodFile *f, size_t bytes)
512 : {
513 : PGAlignedXLogBlock zerobuf;
514 8 : size_t bytesleft = bytes;
515 :
516 8 : memset(zerobuf.data, 0, XLOG_BLCKSZ);
517 16392 : while (bytesleft)
518 : {
519 16384 : size_t bytestowrite = Min(bytesleft, XLOG_BLCKSZ);
520 16384 : ssize_t r = tar_write(f, zerobuf.data, bytestowrite);
521 :
522 16384 : if (r < 0)
523 0 : return false;
524 16384 : bytesleft -= r;
525 : }
526 :
527 8 : return true;
528 : }
529 :
530 : static Walfile
531 8 : tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
532 : {
533 : int save_errno;
534 : static char tmppath[MAXPGPATH];
535 :
536 8 : tar_clear_error();
537 :
538 8 : if (tar_data->fd < 0)
539 : {
540 : /*
541 : * We open the tar file only when we first try to write to it.
542 : */
543 8 : tar_data->fd = open(tar_data->tarfilename,
544 : O_WRONLY | O_CREAT | PG_BINARY,
545 : pg_file_create_mode);
546 8 : if (tar_data->fd < 0)
547 0 : return NULL;
548 :
549 : #ifdef HAVE_LIBZ
550 8 : if (tar_data->compression)
551 : {
552 0 : tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream));
553 0 : tar_data->zp->zalloc = Z_NULL;
554 0 : tar_data->zp->zfree = Z_NULL;
555 0 : tar_data->zp->opaque = Z_NULL;
556 0 : tar_data->zp->next_out = tar_data->zlibOut;
557 0 : tar_data->zp->avail_out = ZLIB_OUT_SIZE;
558 :
559 : /*
560 : * Initialize deflation library. Adding the magic value 16 to the
561 : * default 15 for the windowBits parameter makes the output be
562 : * gzip instead of zlib.
563 : */
564 0 : if (deflateInit2(tar_data->zp, tar_data->compression, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK)
565 : {
566 0 : pg_free(tar_data->zp);
567 0 : tar_data->zp = NULL;
568 0 : tar_set_error("could not initialize compression library");
569 0 : return NULL;
570 : }
571 : }
572 : #endif
573 :
574 : /* There's no tar header itself, the file starts with regular files */
575 : }
576 :
577 : Assert(tar_data->currentfile == NULL);
578 8 : if (tar_data->currentfile != NULL)
579 : {
580 0 : tar_set_error("implementation error: tar files can't have more than one open file");
581 0 : return NULL;
582 : }
583 :
584 8 : tar_data->currentfile = pg_malloc0(sizeof(TarMethodFile));
585 :
586 8 : snprintf(tmppath, sizeof(tmppath), "%s%s",
587 : pathname, temp_suffix ? temp_suffix : "");
588 :
589 : /* Create a header with size set to 0 - we will fill out the size on close */
590 8 : if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK)
591 : {
592 0 : pg_free(tar_data->currentfile);
593 0 : tar_data->currentfile = NULL;
594 0 : tar_set_error("could not create tar header");
595 0 : return NULL;
596 : }
597 :
598 : #ifdef HAVE_LIBZ
599 8 : if (tar_data->compression)
600 : {
601 : /* Flush existing data */
602 0 : if (!tar_write_compressed_data(NULL, 0, true))
603 0 : return NULL;
604 :
605 : /* Turn off compression for header */
606 0 : if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
607 : {
608 0 : tar_set_error("could not change compression parameters");
609 0 : return NULL;
610 : }
611 : }
612 : #endif
613 :
614 8 : tar_data->currentfile->ofs_start = lseek(tar_data->fd, 0, SEEK_CUR);
615 8 : if (tar_data->currentfile->ofs_start == -1)
616 : {
617 0 : save_errno = errno;
618 0 : pg_free(tar_data->currentfile);
619 0 : tar_data->currentfile = NULL;
620 0 : errno = save_errno;
621 0 : return NULL;
622 : }
623 8 : tar_data->currentfile->currpos = 0;
624 :
625 8 : if (!tar_data->compression)
626 : {
627 8 : errno = 0;
628 8 : if (write(tar_data->fd, tar_data->currentfile->header,
629 : TAR_BLOCK_SIZE) != TAR_BLOCK_SIZE)
630 : {
631 0 : save_errno = errno;
632 0 : pg_free(tar_data->currentfile);
633 0 : tar_data->currentfile = NULL;
634 : /* if write didn't set errno, assume problem is no disk space */
635 0 : errno = save_errno ? save_errno : ENOSPC;
636 0 : return NULL;
637 : }
638 : }
639 : #ifdef HAVE_LIBZ
640 : else
641 : {
642 : /* Write header through the zlib APIs but with no compression */
643 0 : if (!tar_write_compressed_data(tar_data->currentfile->header,
644 : TAR_BLOCK_SIZE, true))
645 0 : return NULL;
646 :
647 : /* Re-enable compression for the rest of the file */
648 0 : if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK)
649 : {
650 0 : tar_set_error("could not change compression parameters");
651 0 : return NULL;
652 : }
653 : }
654 : #endif
655 :
656 8 : tar_data->currentfile->pathname = pg_strdup(pathname);
657 :
658 : /*
659 : * Uncompressed files are padded on creation, but for compression we can't
660 : * do that
661 : */
662 8 : if (pad_to_size)
663 : {
664 8 : tar_data->currentfile->pad_to_size = pad_to_size;
665 8 : if (!tar_data->compression)
666 : {
667 : /* Uncompressed, so pad now */
668 8 : tar_write_padding_data(tar_data->currentfile, pad_to_size);
669 : /* Seek back to start */
670 16 : if (lseek(tar_data->fd,
671 8 : tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE,
672 8 : SEEK_SET) != tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE)
673 0 : return NULL;
674 :
675 8 : tar_data->currentfile->currpos = 0;
676 : }
677 : }
678 :
679 8 : return tar_data->currentfile;
680 : }
681 :
682 : static ssize_t
683 0 : tar_get_file_size(const char *pathname)
684 : {
685 0 : tar_clear_error();
686 :
687 : /* Currently not used, so not supported */
688 0 : errno = ENOSYS;
689 0 : return -1;
690 : }
691 :
692 : static off_t
693 342 : tar_get_current_pos(Walfile f)
694 : {
695 : Assert(f != NULL);
696 342 : tar_clear_error();
697 :
698 342 : return ((TarMethodFile *) f)->currpos;
699 : }
700 :
701 : static int
702 8 : tar_sync(Walfile f)
703 : {
704 : Assert(f != NULL);
705 8 : tar_clear_error();
706 :
707 8 : if (!tar_data->sync)
708 8 : return 0;
709 :
710 : /*
711 : * Always sync the whole tarfile, because that's all we can do. This makes
712 : * no sense on compressed files, so just ignore those.
713 : */
714 0 : if (tar_data->compression)
715 0 : return 0;
716 :
717 0 : return fsync(tar_data->fd);
718 : }
719 :
720 : static int
721 8 : tar_close(Walfile f, WalCloseMethod method)
722 : {
723 : ssize_t filesize;
724 : int padding;
725 8 : TarMethodFile *tf = (TarMethodFile *) f;
726 :
727 : Assert(f != NULL);
728 8 : tar_clear_error();
729 :
730 8 : if (method == CLOSE_UNLINK)
731 : {
732 0 : if (tar_data->compression)
733 : {
734 0 : tar_set_error("unlink not supported with compression");
735 0 : return -1;
736 : }
737 :
738 : /*
739 : * Unlink the file that we just wrote to the tar. We do this by
740 : * truncating it to the start of the header. This is safe as we only
741 : * allow writing of the very last file.
742 : */
743 0 : if (ftruncate(tar_data->fd, tf->ofs_start) != 0)
744 0 : return -1;
745 :
746 0 : pg_free(tf->pathname);
747 0 : pg_free(tf);
748 0 : tar_data->currentfile = NULL;
749 :
750 0 : return 0;
751 : }
752 :
753 : /*
754 : * Pad the file itself with zeroes if necessary. Note that this is
755 : * different from the tar format padding -- this is the padding we asked
756 : * for when the file was opened.
757 : */
758 8 : if (tf->pad_to_size)
759 : {
760 8 : if (tar_data->compression)
761 : {
762 : /*
763 : * A compressed tarfile is padded on close since we cannot know
764 : * the size of the compressed output until the end.
765 : */
766 0 : size_t sizeleft = tf->pad_to_size - tf->currpos;
767 :
768 0 : if (sizeleft)
769 : {
770 0 : if (!tar_write_padding_data(tf, sizeleft))
771 0 : return -1;
772 : }
773 : }
774 : else
775 : {
776 : /*
777 : * An uncompressed tarfile was padded on creation, so just adjust
778 : * the current position as if we seeked to the end.
779 : */
780 8 : tf->currpos = tf->pad_to_size;
781 : }
782 : }
783 :
784 : /*
785 : * Get the size of the file, and pad out to a multiple of the tar block
786 : * size.
787 : */
788 8 : filesize = tar_get_current_pos(f);
789 8 : padding = tarPaddingBytesRequired(filesize);
790 8 : if (padding)
791 : {
792 : char zerobuf[TAR_BLOCK_SIZE];
793 :
794 0 : MemSet(zerobuf, 0, padding);
795 0 : if (tar_write(f, zerobuf, padding) != padding)
796 0 : return -1;
797 : }
798 :
799 :
800 : #ifdef HAVE_LIBZ
801 8 : if (tar_data->compression)
802 : {
803 : /* Flush the current buffer */
804 0 : if (!tar_write_compressed_data(NULL, 0, true))
805 : {
806 0 : errno = EINVAL;
807 0 : return -1;
808 : }
809 : }
810 : #endif
811 :
812 : /*
813 : * Now go back and update the header with the correct filesize and
814 : * possibly also renaming the file. We overwrite the entire current header
815 : * when done, including the checksum.
816 : */
817 8 : print_tar_number(&(tf->header[124]), 12, filesize);
818 :
819 8 : if (method == CLOSE_NORMAL)
820 :
821 : /*
822 : * We overwrite it with what it was before if we have no tempname,
823 : * since we're going to write the buffer anyway.
824 : */
825 8 : strlcpy(&(tf->header[0]), tf->pathname, 100);
826 :
827 8 : print_tar_number(&(tf->header[148]), 8, tarChecksum(((TarMethodFile *) f)->header));
828 8 : if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start)
829 0 : return -1;
830 8 : if (!tar_data->compression)
831 : {
832 8 : errno = 0;
833 8 : if (write(tar_data->fd, tf->header, TAR_BLOCK_SIZE) != TAR_BLOCK_SIZE)
834 : {
835 : /* if write didn't set errno, assume problem is no disk space */
836 0 : if (errno == 0)
837 0 : errno = ENOSPC;
838 0 : return -1;
839 : }
840 : }
841 : #ifdef HAVE_LIBZ
842 : else
843 : {
844 : /* Turn off compression */
845 0 : if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
846 : {
847 0 : tar_set_error("could not change compression parameters");
848 0 : return -1;
849 : }
850 :
851 : /* Overwrite the header, assuming the size will be the same */
852 0 : if (!tar_write_compressed_data(tar_data->currentfile->header,
853 : TAR_BLOCK_SIZE, true))
854 0 : return -1;
855 :
856 : /* Turn compression back on */
857 0 : if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK)
858 : {
859 0 : tar_set_error("could not change compression parameters");
860 0 : return -1;
861 : }
862 : }
863 : #endif
864 :
865 : /* Move file pointer back down to end, so we can write the next file */
866 8 : if (lseek(tar_data->fd, 0, SEEK_END) < 0)
867 0 : return -1;
868 :
869 : /* Always fsync on close, so the padding gets fsynced */
870 8 : if (tar_sync(f) < 0)
871 0 : exit(1);
872 :
873 : /* Clean up and done */
874 8 : pg_free(tf->pathname);
875 8 : pg_free(tf);
876 8 : tar_data->currentfile = NULL;
877 :
878 8 : return 0;
879 : }
880 :
881 : static bool
882 8 : tar_existsfile(const char *pathname)
883 : {
884 8 : tar_clear_error();
885 : /* We only deal with new tarfiles, so nothing externally created exists */
886 8 : return false;
887 : }
888 :
889 : static bool
890 8 : tar_finish(void)
891 : {
892 : char zerobuf[1024];
893 :
894 8 : tar_clear_error();
895 :
896 8 : if (tar_data->currentfile)
897 : {
898 0 : if (tar_close(tar_data->currentfile, CLOSE_NORMAL) != 0)
899 0 : return false;
900 : }
901 :
902 : /* A tarfile always ends with two empty blocks */
903 1032 : MemSet(zerobuf, 0, sizeof(zerobuf));
904 8 : if (!tar_data->compression)
905 : {
906 8 : errno = 0;
907 8 : if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf))
908 : {
909 : /* if write didn't set errno, assume problem is no disk space */
910 0 : if (errno == 0)
911 0 : errno = ENOSPC;
912 0 : return false;
913 : }
914 : }
915 : #ifdef HAVE_LIBZ
916 : else
917 : {
918 0 : if (!tar_write_compressed_data(zerobuf, sizeof(zerobuf), false))
919 0 : return false;
920 :
921 : /* Also flush all data to make sure the gzip stream is finished */
922 0 : tar_data->zp->next_in = NULL;
923 0 : tar_data->zp->avail_in = 0;
924 : while (true)
925 0 : {
926 : int r;
927 :
928 0 : r = deflate(tar_data->zp, Z_FINISH);
929 :
930 0 : if (r == Z_STREAM_ERROR)
931 : {
932 0 : tar_set_error("could not compress data");
933 0 : return false;
934 : }
935 0 : if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
936 : {
937 0 : size_t len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
938 :
939 0 : errno = 0;
940 0 : if (write(tar_data->fd, tar_data->zlibOut, len) != len)
941 : {
942 : /*
943 : * If write didn't set errno, assume problem is no disk
944 : * space.
945 : */
946 0 : if (errno == 0)
947 0 : errno = ENOSPC;
948 0 : return false;
949 : }
950 : }
951 0 : if (r == Z_STREAM_END)
952 0 : break;
953 : }
954 :
955 0 : if (deflateEnd(tar_data->zp) != Z_OK)
956 : {
957 0 : tar_set_error("could not close compression stream");
958 0 : return false;
959 : }
960 : }
961 : #endif
962 :
963 : /* sync the empty blocks as well, since they're after the last file */
964 8 : if (tar_data->sync)
965 : {
966 0 : if (fsync(tar_data->fd) != 0)
967 0 : return false;
968 : }
969 :
970 8 : if (close(tar_data->fd) != 0)
971 0 : return false;
972 :
973 8 : tar_data->fd = -1;
974 :
975 8 : if (tar_data->sync)
976 : {
977 0 : if (fsync_fname(tar_data->tarfilename, false) != 0)
978 0 : return false;
979 0 : if (fsync_parent_path(tar_data->tarfilename) != 0)
980 0 : return false;
981 : }
982 :
983 8 : return true;
984 : }
985 :
986 : WalWriteMethod *
987 8 : CreateWalTarMethod(const char *tarbase, int compression, bool sync)
988 : {
989 : WalWriteMethod *method;
990 8 : const char *suffix = (compression != 0) ? ".tar.gz" : ".tar";
991 :
992 8 : method = pg_malloc0(sizeof(WalWriteMethod));
993 8 : method->open_for_write = tar_open_for_write;
994 8 : method->write = tar_write;
995 8 : method->get_current_pos = tar_get_current_pos;
996 8 : method->get_file_size = tar_get_file_size;
997 8 : method->close = tar_close;
998 8 : method->sync = tar_sync;
999 8 : method->existsfile = tar_existsfile;
1000 8 : method->finish = tar_finish;
1001 8 : method->getlasterror = tar_getlasterror;
1002 :
1003 8 : tar_data = pg_malloc0(sizeof(TarMethodData));
1004 8 : tar_data->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1);
1005 8 : sprintf(tar_data->tarfilename, "%s%s", tarbase, suffix);
1006 8 : tar_data->fd = -1;
1007 8 : tar_data->compression = compression;
1008 8 : tar_data->sync = sync;
1009 : #ifdef HAVE_LIBZ
1010 8 : if (compression)
1011 0 : tar_data->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
1012 : #endif
1013 :
1014 8 : return method;
1015 : }
1016 :
1017 : void
1018 8 : FreeWalTarMethod(void)
1019 : {
1020 8 : pg_free(tar_data->tarfilename);
1021 : #ifdef HAVE_LIBZ
1022 8 : if (tar_data->compression)
1023 0 : pg_free(tar_data->zlibOut);
1024 : #endif
1025 8 : pg_free(tar_data);
1026 8 : }
|