Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * basebackup_gzip.c
4 : * Basebackup sink implementing gzip compression.
5 : *
6 : * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/backup/basebackup_gzip.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #ifdef HAVE_LIBZ
16 : #include <zlib.h>
17 : #endif
18 :
19 : #include "backup/basebackup_sink.h"
20 :
21 : #ifdef HAVE_LIBZ
22 : typedef struct bbsink_gzip
23 : {
24 : /* Common information for all types of sink. */
25 : bbsink base;
26 :
27 : /* Compression level. */
28 : int compresslevel;
29 :
30 : /* Compressed data stream. */
31 : z_stream zstream;
32 :
33 : /* Number of bytes staged in output buffer. */
34 : size_t bytes_written;
35 :
36 : /* Has the zstream been initialized? */
37 : bool zstream_initialized;
38 : } bbsink_gzip;
39 :
40 : static void bbsink_gzip_begin_backup(bbsink *sink);
41 : static void bbsink_gzip_begin_archive(bbsink *sink, const char *archive_name);
42 : static void bbsink_gzip_archive_contents(bbsink *sink, size_t len);
43 : static void bbsink_gzip_manifest_contents(bbsink *sink, size_t len);
44 : static void bbsink_gzip_end_archive(bbsink *sink);
45 : static void bbsink_gzip_cleanup(bbsink *sink);
46 : static void *gzip_palloc(void *opaque, unsigned items, unsigned size);
47 : static void gzip_pfree(void *opaque, void *address);
48 :
49 : static const bbsink_ops bbsink_gzip_ops = {
50 : .begin_backup = bbsink_gzip_begin_backup,
51 : .begin_archive = bbsink_gzip_begin_archive,
52 : .archive_contents = bbsink_gzip_archive_contents,
53 : .end_archive = bbsink_gzip_end_archive,
54 : .begin_manifest = bbsink_forward_begin_manifest,
55 : .manifest_contents = bbsink_gzip_manifest_contents,
56 : .end_manifest = bbsink_forward_end_manifest,
57 : .end_backup = bbsink_forward_end_backup,
58 : .cleanup = bbsink_gzip_cleanup
59 : };
60 : #endif
61 :
62 : /*
63 : * Create a new basebackup sink that performs gzip compression.
64 : */
65 : bbsink *
66 2 : bbsink_gzip_new(bbsink *next, pg_compress_specification *compress)
67 : {
68 : #ifndef HAVE_LIBZ
69 : ereport(ERROR,
70 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
71 : errmsg("gzip compression is not supported by this build")));
72 : return NULL; /* keep compiler quiet */
73 : #else
74 : bbsink_gzip *sink;
75 : int compresslevel;
76 :
77 : Assert(next != NULL);
78 :
79 2 : compresslevel = compress->level;
80 : Assert((compresslevel >= 1 && compresslevel <= 9) ||
81 : compresslevel == Z_DEFAULT_COMPRESSION);
82 :
83 2 : sink = palloc0_object(bbsink_gzip);
84 2 : *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_gzip_ops;
85 2 : sink->base.bbs_next = next;
86 2 : sink->compresslevel = compresslevel;
87 :
88 2 : return &sink->base;
89 : #endif
90 : }
91 :
92 : #ifdef HAVE_LIBZ
93 :
94 : /*
95 : * Begin backup.
96 : */
97 : static void
98 2 : bbsink_gzip_begin_backup(bbsink *sink)
99 : {
100 : /*
101 : * We need our own buffer, because we're going to pass different data to
102 : * the next sink than what gets passed to us.
103 : */
104 2 : sink->bbs_buffer = palloc(sink->bbs_buffer_length);
105 :
106 : /*
107 : * Since deflate() doesn't require the output buffer to be of any
108 : * particular size, we can just make it the same size as the input buffer.
109 : */
110 2 : bbsink_begin_backup(sink->bbs_next, sink->bbs_state,
111 2 : sink->bbs_buffer_length);
112 2 : }
113 :
114 : /*
115 : * Prepare to compress the next archive.
116 : */
117 : static void
118 3 : bbsink_gzip_begin_archive(bbsink *sink, const char *archive_name)
119 : {
120 3 : bbsink_gzip *mysink = (bbsink_gzip *) sink;
121 : char *gz_archive_name;
122 3 : z_stream *zs = &mysink->zstream;
123 :
124 : /* Initialize compressor object. */
125 3 : memset(zs, 0, sizeof(z_stream));
126 3 : zs->zalloc = gzip_palloc;
127 3 : zs->zfree = gzip_pfree;
128 3 : zs->next_out = (uint8 *) sink->bbs_next->bbs_buffer;
129 3 : zs->avail_out = sink->bbs_next->bbs_buffer_length;
130 :
131 : /*
132 : * We need to use deflateInit2() rather than deflateInit() here so that we
133 : * can request a gzip header rather than a zlib header. Otherwise, we want
134 : * to supply the same values that would have been used by default if we
135 : * had just called deflateInit().
136 : *
137 : * Per the documentation for deflateInit2, the third argument must be
138 : * Z_DEFLATED; the fourth argument is the number of "window bits", by
139 : * default 15, but adding 16 gets you a gzip header rather than a zlib
140 : * header; the fifth argument controls memory usage, and 8 is the default;
141 : * and likewise Z_DEFAULT_STRATEGY is the default for the sixth argument.
142 : */
143 3 : if (deflateInit2(zs, mysink->compresslevel, Z_DEFLATED, 15 + 16, 8,
144 : Z_DEFAULT_STRATEGY) != Z_OK)
145 0 : ereport(ERROR,
146 : errcode(ERRCODE_INTERNAL_ERROR),
147 : errmsg("could not initialize compression library"));
148 3 : mysink->zstream_initialized = true;
149 :
150 : /*
151 : * Add ".gz" to the archive name. Note that the pg_basebackup -z produces
152 : * archives named ".tar.gz" rather than ".tgz", so we match that here.
153 : */
154 3 : gz_archive_name = psprintf("%s.gz", archive_name);
155 : Assert(sink->bbs_next != NULL);
156 3 : bbsink_begin_archive(sink->bbs_next, gz_archive_name);
157 3 : pfree(gz_archive_name);
158 3 : }
159 :
160 : /*
161 : * Compress the input data to the output buffer until we run out of input
162 : * data. Each time the output buffer fills up, invoke the archive_contents()
163 : * method for then next sink.
164 : *
165 : * Note that since we're compressing the input, it may very commonly happen
166 : * that we consume all the input data without filling the output buffer. In
167 : * that case, the compressed representation of the current input data won't
168 : * actually be sent to the next bbsink until a later call to this function,
169 : * or perhaps even not until bbsink_gzip_end_archive() is invoked.
170 : */
171 : static void
172 5659 : bbsink_gzip_archive_contents(bbsink *sink, size_t len)
173 : {
174 5659 : bbsink_gzip *mysink = (bbsink_gzip *) sink;
175 5659 : z_stream *zs = &mysink->zstream;
176 :
177 : /* Compress data from input buffer. */
178 5659 : zs->next_in = (uint8 *) mysink->base.bbs_buffer;
179 5659 : zs->avail_in = len;
180 :
181 17081 : while (zs->avail_in > 0)
182 : {
183 : int res;
184 :
185 : /* Write output data into unused portion of output buffer. */
186 : Assert(mysink->bytes_written < mysink->base.bbs_next->bbs_buffer_length);
187 5763 : zs->next_out = (uint8 *)
188 5763 : mysink->base.bbs_next->bbs_buffer + mysink->bytes_written;
189 5763 : zs->avail_out =
190 5763 : mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written;
191 :
192 : /*
193 : * Try to compress. Note that this will update zs->next_in and
194 : * zs->avail_in according to how much input data was consumed, and
195 : * zs->next_out and zs->avail_out according to how many output bytes
196 : * were produced.
197 : *
198 : * According to the zlib documentation, Z_STREAM_ERROR should only
199 : * occur if we've made a programming error, or if say there's been a
200 : * memory clobber; we use elog() rather than Assert() here out of an
201 : * abundance of caution.
202 : */
203 5763 : res = deflate(zs, Z_NO_FLUSH);
204 5763 : if (res == Z_STREAM_ERROR)
205 0 : elog(ERROR, "could not compress data: %s", zs->msg);
206 :
207 : /* Update our notion of how many bytes we've written. */
208 5763 : mysink->bytes_written =
209 5763 : mysink->base.bbs_next->bbs_buffer_length - zs->avail_out;
210 :
211 : /*
212 : * If the output buffer is full, it's time for the next sink to
213 : * process the contents.
214 : */
215 5763 : if (mysink->bytes_written >= mysink->base.bbs_next->bbs_buffer_length)
216 : {
217 206 : bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
218 206 : mysink->bytes_written = 0;
219 : }
220 : }
221 5659 : }
222 :
223 : /*
224 : * There might be some data inside zlib's internal buffers; we need to get
225 : * that flushed out and forwarded to the successor sink as archive content.
226 : *
227 : * Then we can end processing for this archive.
228 : */
229 : static void
230 3 : bbsink_gzip_end_archive(bbsink *sink)
231 : {
232 3 : bbsink_gzip *mysink = (bbsink_gzip *) sink;
233 3 : z_stream *zs = &mysink->zstream;
234 :
235 : /* There is no more data available. */
236 3 : zs->next_in = (uint8 *) mysink->base.bbs_buffer;
237 3 : zs->avail_in = 0;
238 :
239 : while (1)
240 3 : {
241 : int res;
242 :
243 : /* Write output data into unused portion of output buffer. */
244 : Assert(mysink->bytes_written < mysink->base.bbs_next->bbs_buffer_length);
245 6 : zs->next_out = (uint8 *)
246 6 : mysink->base.bbs_next->bbs_buffer + mysink->bytes_written;
247 6 : zs->avail_out =
248 6 : mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written;
249 :
250 : /*
251 : * As bbsink_gzip_archive_contents, but pass Z_FINISH since there is
252 : * no more input.
253 : */
254 6 : res = deflate(zs, Z_FINISH);
255 6 : if (res == Z_STREAM_ERROR)
256 0 : elog(ERROR, "could not compress data: %s", zs->msg);
257 :
258 : /* Update our notion of how many bytes we've written. */
259 6 : mysink->bytes_written =
260 6 : mysink->base.bbs_next->bbs_buffer_length - zs->avail_out;
261 :
262 : /*
263 : * Apparently we had no data in the output buffer and deflate() was
264 : * not able to add any. We must be done.
265 : */
266 6 : if (mysink->bytes_written == 0)
267 3 : break;
268 :
269 : /* Send whatever accumulated output bytes we have. */
270 3 : bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
271 3 : mysink->bytes_written = 0;
272 : }
273 :
274 : /* Release the compression resources. */
275 3 : deflateEnd(zs);
276 3 : mysink->zstream_initialized = false;
277 :
278 : /* Must also pass on the information that this archive has ended. */
279 3 : bbsink_forward_end_archive(sink);
280 3 : }
281 :
282 : /*
283 : * Manifest contents are not compressed, but we do need to copy them into
284 : * the successor sink's buffer, because we have our own.
285 : */
286 : static void
287 10 : bbsink_gzip_manifest_contents(bbsink *sink, size_t len)
288 : {
289 10 : memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
290 10 : bbsink_manifest_contents(sink->bbs_next, len);
291 10 : }
292 :
/*
 * zlib allocation hook: adapts palloc() to the signature zlib expects.
 *
 * zlib requests 'items' objects of 'size' bytes each.  The product is formed
 * in size_t rather than unsigned int so that it cannot silently wrap around
 * and yield an undersized allocation.  If even size_t cannot hold it, return
 * NULL, which zlib treats as an allocation failure (Z_MEM_ERROR).
 * 'opaque' is unused.
 */
static void *
gzip_palloc(void *opaque, unsigned items, unsigned size)
{
	if (size != 0 && items > SIZE_MAX / size)
		return NULL;

	return palloc((size_t) items * size);
}
302 :
/*
 * zlib deallocation hook: adapts pfree() to the signature zlib expects.
 * 'opaque' is not used.
 */
static void
gzip_pfree(void *opaque, void *address)
{
	(void) opaque;
	pfree(address);
}
312 :
313 : /*
314 : * In case the backup fails, make sure we free the compression context by
315 : * calling deflateEnd() if needed to avoid a resource leak.
316 : */
317 : static void
318 2 : bbsink_gzip_cleanup(bbsink *sink)
319 : {
320 2 : bbsink_gzip *mysink = (bbsink_gzip *) sink;
321 :
322 2 : if (mysink->zstream_initialized)
323 : {
324 0 : deflateEnd(&mysink->zstream);
325 0 : mysink->zstream_initialized = false;
326 : }
327 2 : }
328 :
329 : #endif
|