Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * astreamer_gzip.c
4 : *
5 : * Archive streamers that deal with data compressed using gzip.
6 : * astreamer_gzip_writer applies gzip compression to the input data
7 : * and writes the result to a file. astreamer_gzip_decompressor assumes
8 : * that the input stream is compressed using gzip and decompresses it.
9 : *
10 : * Note that the code in this file is asymmetric with what we do for
11 : * other compression types: for lz4 and zstd, there is a compressor and
12 : * a decompressor, rather than a writer and a decompressor. The approach
13 : * taken here is less flexible, because a writer can only write to a file,
14 : * while a compressor can write to a subsequent astreamer which is free
15 : * to do whatever it likes. The reason it's like this is because this
16 : * code was adapted from old, less-modular pg_basebackup code that used
17 : * the same APIs that astreamer_gzip_writer now uses, and it didn't seem
18 : * necessary to change anything at the time.
19 : *
20 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
21 : *
22 : * IDENTIFICATION
23 : * src/bin/pg_basebackup/astreamer_gzip.c
24 : *-------------------------------------------------------------------------
25 : */
26 :
27 : #include "postgres_fe.h"
28 :
29 : #include <unistd.h>
30 :
31 : #ifdef HAVE_LIBZ
32 : #include <zlib.h>
33 : #endif
34 :
35 : #include "common/logging.h"
36 : #include "fe_utils/astreamer.h"
37 :
38 : #ifdef HAVE_LIBZ
39 : typedef struct astreamer_gzip_writer
40 : {
41 : astreamer base;
42 : char *pathname;
43 : gzFile gzfile;
44 : } astreamer_gzip_writer;
45 :
46 : typedef struct astreamer_gzip_decompressor
47 : {
48 : astreamer base;
49 : z_stream zstream;
50 : size_t bytes_written;
51 : } astreamer_gzip_decompressor;
52 :
53 : static void astreamer_gzip_writer_content(astreamer *streamer,
54 : astreamer_member *member,
55 : const char *data, int len,
56 : astreamer_archive_context context);
57 : static void astreamer_gzip_writer_finalize(astreamer *streamer);
58 : static void astreamer_gzip_writer_free(astreamer *streamer);
59 : static const char *get_gz_error(gzFile gzf);
60 :
61 : static const astreamer_ops astreamer_gzip_writer_ops = {
62 : .content = astreamer_gzip_writer_content,
63 : .finalize = astreamer_gzip_writer_finalize,
64 : .free = astreamer_gzip_writer_free
65 : };
66 :
67 : static void astreamer_gzip_decompressor_content(astreamer *streamer,
68 : astreamer_member *member,
69 : const char *data, int len,
70 : astreamer_archive_context context);
71 : static void astreamer_gzip_decompressor_finalize(astreamer *streamer);
72 : static void astreamer_gzip_decompressor_free(astreamer *streamer);
73 : static void *gzip_palloc(void *opaque, unsigned items, unsigned size);
74 : static void gzip_pfree(void *opaque, void *address);
75 :
76 : static const astreamer_ops astreamer_gzip_decompressor_ops = {
77 : .content = astreamer_gzip_decompressor_content,
78 : .finalize = astreamer_gzip_decompressor_finalize,
79 : .free = astreamer_gzip_decompressor_free
80 : };
81 : #endif
82 :
83 : /*
84 : * Create a astreamer that just compresses data using gzip, and then writes
85 : * it to a file.
86 : *
87 : * The caller must specify a pathname and may specify a file. The pathname is
88 : * used for error-reporting purposes either way. If file is NULL, the pathname
89 : * also identifies the file to which the data should be written: it is opened
90 : * for writing and closed when done. If file is not NULL, the data is written
91 : * there.
92 : *
93 : * Note that zlib does not use the FILE interface, but operates directly on
94 : * a duplicate of the underlying fd. Hence, callers must take care if they
95 : * plan to write any other data to the same FILE, either before or after using
96 : * this.
97 : */
98 : astreamer *
99 8 : astreamer_gzip_writer_new(char *pathname, FILE *file,
100 : pg_compress_specification *compress)
101 : {
102 : #ifdef HAVE_LIBZ
103 : astreamer_gzip_writer *streamer;
104 :
105 8 : streamer = palloc0(sizeof(astreamer_gzip_writer));
106 8 : *((const astreamer_ops **) &streamer->base.bbs_ops) =
107 : &astreamer_gzip_writer_ops;
108 :
109 8 : streamer->pathname = pstrdup(pathname);
110 :
111 8 : if (file == NULL)
112 : {
113 8 : streamer->gzfile = gzopen(pathname, "wb");
114 8 : if (streamer->gzfile == NULL)
115 0 : pg_fatal("could not create compressed file \"%s\": %m",
116 : pathname);
117 : }
118 : else
119 : {
120 : /*
121 : * We must dup the file handle so that gzclose doesn't break the
122 : * caller's FILE. See comment for astreamer_gzip_writer_finalize.
123 : */
124 0 : int fd = dup(fileno(file));
125 :
126 0 : if (fd < 0)
127 0 : pg_fatal("could not duplicate stdout: %m");
128 :
129 0 : streamer->gzfile = gzdopen(fd, "wb");
130 0 : if (streamer->gzfile == NULL)
131 0 : pg_fatal("could not open output file: %m");
132 : }
133 :
134 8 : if (gzsetparams(streamer->gzfile, compress->level, Z_DEFAULT_STRATEGY) != Z_OK)
135 0 : pg_fatal("could not set compression level %d: %s",
136 : compress->level, get_gz_error(streamer->gzfile));
137 :
138 8 : return &streamer->base;
139 : #else
140 : pg_fatal("this build does not support compression with %s", "gzip");
141 : return NULL; /* keep compiler quiet */
142 : #endif
143 : }
144 :
145 : #ifdef HAVE_LIBZ
146 : /*
147 : * Write archive content to gzip file.
148 : */
149 : static void
150 18572 : astreamer_gzip_writer_content(astreamer *streamer,
151 : astreamer_member *member, const char *data,
152 : int len, astreamer_archive_context context)
153 : {
154 : astreamer_gzip_writer *mystreamer;
155 :
156 18572 : mystreamer = (astreamer_gzip_writer *) streamer;
157 :
158 18572 : if (len == 0)
159 0 : return;
160 :
161 18572 : errno = 0;
162 18572 : if (gzwrite(mystreamer->gzfile, data, len) != len)
163 : {
164 : /* if write didn't set errno, assume problem is no disk space */
165 0 : if (errno == 0)
166 0 : errno = ENOSPC;
167 0 : pg_fatal("could not write to compressed file \"%s\": %s",
168 : mystreamer->pathname, get_gz_error(mystreamer->gzfile));
169 : }
170 : }
171 :
172 : /*
173 : * End-of-archive processing when writing to a gzip file consists of just
174 : * calling gzclose.
175 : *
176 : * It makes no difference whether we opened the file or the caller did it,
177 : * because libz provides no way of avoiding a close on the underlying file
178 : * handle. Notice, however, that astreamer_gzip_writer_new() uses dup() to
179 : * work around this issue, so that the behavior from the caller's viewpoint
180 : * is the same as for astreamer_plain_writer.
181 : */
182 : static void
183 8 : astreamer_gzip_writer_finalize(astreamer *streamer)
184 : {
185 : astreamer_gzip_writer *mystreamer;
186 :
187 8 : mystreamer = (astreamer_gzip_writer *) streamer;
188 :
189 8 : errno = 0; /* in case gzclose() doesn't set it */
190 8 : if (gzclose(mystreamer->gzfile) != 0)
191 0 : pg_fatal("could not close compressed file \"%s\": %m",
192 : mystreamer->pathname);
193 :
194 8 : mystreamer->gzfile = NULL;
195 8 : }
196 :
197 : /*
198 : * Free memory associated with this astreamer.
199 : */
200 : static void
201 8 : astreamer_gzip_writer_free(astreamer *streamer)
202 : {
203 : astreamer_gzip_writer *mystreamer;
204 :
205 8 : mystreamer = (astreamer_gzip_writer *) streamer;
206 :
207 : Assert(mystreamer->base.bbs_next == NULL);
208 : Assert(mystreamer->gzfile == NULL);
209 :
210 8 : pfree(mystreamer->pathname);
211 8 : pfree(mystreamer);
212 8 : }
213 :
214 : /*
215 : * Helper function for libz error reporting.
216 : */
217 : static const char *
218 0 : get_gz_error(gzFile gzf)
219 : {
220 : int errnum;
221 : const char *errmsg;
222 :
223 0 : errmsg = gzerror(gzf, &errnum);
224 0 : if (errnum == Z_ERRNO)
225 0 : return strerror(errno);
226 : else
227 0 : return errmsg;
228 : }
229 : #endif
230 :
231 : /*
232 : * Create a new base backup streamer that performs decompression of gzip
233 : * compressed blocks.
234 : */
235 : astreamer *
236 8 : astreamer_gzip_decompressor_new(astreamer *next)
237 : {
238 : #ifdef HAVE_LIBZ
239 : astreamer_gzip_decompressor *streamer;
240 : z_stream *zs;
241 :
242 : Assert(next != NULL);
243 :
244 8 : streamer = palloc0(sizeof(astreamer_gzip_decompressor));
245 8 : *((const astreamer_ops **) &streamer->base.bbs_ops) =
246 : &astreamer_gzip_decompressor_ops;
247 :
248 8 : streamer->base.bbs_next = next;
249 8 : initStringInfo(&streamer->base.bbs_buffer);
250 :
251 : /* Initialize internal stream state for decompression */
252 8 : zs = &streamer->zstream;
253 8 : zs->zalloc = gzip_palloc;
254 8 : zs->zfree = gzip_pfree;
255 8 : zs->next_out = (uint8 *) streamer->base.bbs_buffer.data;
256 8 : zs->avail_out = streamer->base.bbs_buffer.maxlen;
257 :
258 : /*
259 : * Data compression was initialized using deflateInit2 to request a gzip
260 : * header. Similarly, we are using inflateInit2 to initialize data
261 : * decompression.
262 : *
263 : * Per the documentation for inflateInit2, the second argument is
264 : * "windowBits" and its value must be greater than or equal to the value
265 : * provided while compressing the data, so we are using the maximum
266 : * possible value for safety.
267 : */
268 8 : if (inflateInit2(zs, 15 + 16) != Z_OK)
269 0 : pg_fatal("could not initialize compression library");
270 :
271 8 : return &streamer->base;
272 : #else
273 : pg_fatal("this build does not support compression with %s", "gzip");
274 : return NULL; /* keep compiler quiet */
275 : #endif
276 : }
277 :
278 : #ifdef HAVE_LIBZ
279 : /*
280 : * Decompress the input data to output buffer until we run out of input
281 : * data. Each time the output buffer is full, pass on the decompressed data
282 : * to the next streamer.
283 : */
284 : static void
285 294 : astreamer_gzip_decompressor_content(astreamer *streamer,
286 : astreamer_member *member,
287 : const char *data, int len,
288 : astreamer_archive_context context)
289 : {
290 : astreamer_gzip_decompressor *mystreamer;
291 : z_stream *zs;
292 :
293 294 : mystreamer = (astreamer_gzip_decompressor *) streamer;
294 :
295 294 : zs = &mystreamer->zstream;
296 294 : zs->next_in = (const uint8 *) data;
297 294 : zs->avail_in = len;
298 :
299 : /* Process the current chunk */
300 239288 : while (zs->avail_in > 0)
301 : {
302 : int res;
303 :
304 : Assert(mystreamer->bytes_written < mystreamer->base.bbs_buffer.maxlen);
305 :
306 238994 : zs->next_out = (uint8 *)
307 238994 : mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
308 238994 : zs->avail_out =
309 238994 : mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
310 :
311 : /*
312 : * This call decompresses data starting at zs->next_in and updates
313 : * zs->next_in * and zs->avail_in. It generates output data starting
314 : * at zs->next_out and updates zs->next_out and zs->avail_out
315 : * accordingly.
316 : */
317 238994 : res = inflate(zs, Z_NO_FLUSH);
318 :
319 238994 : if (res == Z_STREAM_ERROR)
320 0 : pg_log_error("could not decompress data: %s", zs->msg);
321 :
322 238994 : mystreamer->bytes_written =
323 238994 : mystreamer->base.bbs_buffer.maxlen - zs->avail_out;
324 :
325 : /* If output buffer is full then pass data to next streamer */
326 238994 : if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
327 : {
328 238712 : astreamer_content(mystreamer->base.bbs_next, member,
329 238712 : mystreamer->base.bbs_buffer.data,
330 : mystreamer->base.bbs_buffer.maxlen, context);
331 238712 : mystreamer->bytes_written = 0;
332 : }
333 : }
334 294 : }
335 :
336 : /*
337 : * End-of-stream processing.
338 : */
339 : static void
340 8 : astreamer_gzip_decompressor_finalize(astreamer *streamer)
341 : {
342 : astreamer_gzip_decompressor *mystreamer;
343 :
344 8 : mystreamer = (astreamer_gzip_decompressor *) streamer;
345 :
346 : /*
347 : * End of the stream, if there is some pending data in output buffers then
348 : * we must forward it to next streamer.
349 : */
350 8 : astreamer_content(mystreamer->base.bbs_next, NULL,
351 8 : mystreamer->base.bbs_buffer.data,
352 : mystreamer->base.bbs_buffer.maxlen,
353 : ASTREAMER_UNKNOWN);
354 :
355 8 : astreamer_finalize(mystreamer->base.bbs_next);
356 8 : }
357 :
358 : /*
359 : * Free memory.
360 : */
361 : static void
362 8 : astreamer_gzip_decompressor_free(astreamer *streamer)
363 : {
364 8 : astreamer_free(streamer->bbs_next);
365 8 : pfree(streamer->bbs_buffer.data);
366 8 : pfree(streamer);
367 8 : }
368 :
/*
 * Wrapper function to adjust the signature of palloc to match what libz
 * expects.  "opaque" is unused.
 *
 * The product is computed in size_t so that it cannot wrap around in
 * unsigned arithmetic and yield an allocation smaller than zlib asked for.
 */
static void *
gzip_palloc(void *opaque, unsigned items, unsigned size)
{
	return palloc((size_t) items * (size_t) size);
}
378 :
/*
 * Deallocation hook for libz: a thin adapter giving pfree the signature
 * libz expects.  "opaque" is not used.
 */
static void
gzip_pfree(void *opaque, void *address)
{
	(void) opaque;

	pfree(address);
}
388 : #endif
|