Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * basebackup_gzip.c
4 : * Basebackup sink implementing gzip compression.
5 : *
6 : * Portions Copyright (c) 2010-2025, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/backup/basebackup_gzip.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #ifdef HAVE_LIBZ
16 : #include <zlib.h>
17 : #endif
18 :
19 : #include "backup/basebackup_sink.h"
20 :
21 : #ifdef HAVE_LIBZ
22 : typedef struct bbsink_gzip
23 : {
24 : /* Common information for all types of sink. */
25 : bbsink base;
26 :
27 : /* Compression level. */
28 : int compresslevel;
29 :
30 : /* Compressed data stream. */
31 : z_stream zstream;
32 :
33 : /* Number of bytes staged in output buffer. */
34 : size_t bytes_written;
35 : } bbsink_gzip;
36 :
37 : static void bbsink_gzip_begin_backup(bbsink *sink);
38 : static void bbsink_gzip_begin_archive(bbsink *sink, const char *archive_name);
39 : static void bbsink_gzip_archive_contents(bbsink *sink, size_t len);
40 : static void bbsink_gzip_manifest_contents(bbsink *sink, size_t len);
41 : static void bbsink_gzip_end_archive(bbsink *sink);
42 : static void *gzip_palloc(void *opaque, unsigned items, unsigned size);
43 : static void gzip_pfree(void *opaque, void *address);
44 :
45 : static const bbsink_ops bbsink_gzip_ops = {
46 : .begin_backup = bbsink_gzip_begin_backup,
47 : .begin_archive = bbsink_gzip_begin_archive,
48 : .archive_contents = bbsink_gzip_archive_contents,
49 : .end_archive = bbsink_gzip_end_archive,
50 : .begin_manifest = bbsink_forward_begin_manifest,
51 : .manifest_contents = bbsink_gzip_manifest_contents,
52 : .end_manifest = bbsink_forward_end_manifest,
53 : .end_backup = bbsink_forward_end_backup,
54 : .cleanup = bbsink_forward_cleanup
55 : };
56 : #endif
57 :
58 : /*
59 : * Create a new basebackup sink that performs gzip compression.
60 : */
61 : bbsink *
62 4 : bbsink_gzip_new(bbsink *next, pg_compress_specification *compress)
63 : {
64 : #ifndef HAVE_LIBZ
65 : ereport(ERROR,
66 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
67 : errmsg("gzip compression is not supported by this build")));
68 : return NULL; /* keep compiler quiet */
69 : #else
70 : bbsink_gzip *sink;
71 : int compresslevel;
72 :
73 : Assert(next != NULL);
74 :
75 4 : compresslevel = compress->level;
76 : Assert((compresslevel >= 1 && compresslevel <= 9) ||
77 : compresslevel == Z_DEFAULT_COMPRESSION);
78 :
79 4 : sink = palloc0(sizeof(bbsink_gzip));
80 4 : *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_gzip_ops;
81 4 : sink->base.bbs_next = next;
82 4 : sink->compresslevel = compresslevel;
83 :
84 4 : return &sink->base;
85 : #endif
86 : }
87 :
88 : #ifdef HAVE_LIBZ
89 :
90 : /*
91 : * Begin backup.
92 : */
93 : static void
94 4 : bbsink_gzip_begin_backup(bbsink *sink)
95 : {
96 : /*
97 : * We need our own buffer, because we're going to pass different data to
98 : * the next sink than what gets passed to us.
99 : */
100 4 : sink->bbs_buffer = palloc(sink->bbs_buffer_length);
101 :
102 : /*
103 : * Since deflate() doesn't require the output buffer to be of any
104 : * particular size, we can just make it the same size as the input buffer.
105 : */
106 4 : bbsink_begin_backup(sink->bbs_next, sink->bbs_state,
107 4 : sink->bbs_buffer_length);
108 4 : }
109 :
110 : /*
111 : * Prepare to compress the next archive.
112 : */
113 : static void
114 6 : bbsink_gzip_begin_archive(bbsink *sink, const char *archive_name)
115 : {
116 6 : bbsink_gzip *mysink = (bbsink_gzip *) sink;
117 : char *gz_archive_name;
118 6 : z_stream *zs = &mysink->zstream;
119 :
120 : /* Initialize compressor object. */
121 6 : memset(zs, 0, sizeof(z_stream));
122 6 : zs->zalloc = gzip_palloc;
123 6 : zs->zfree = gzip_pfree;
124 6 : zs->next_out = (uint8 *) sink->bbs_next->bbs_buffer;
125 6 : zs->avail_out = sink->bbs_next->bbs_buffer_length;
126 :
127 : /*
128 : * We need to use deflateInit2() rather than deflateInit() here so that we
129 : * can request a gzip header rather than a zlib header. Otherwise, we want
130 : * to supply the same values that would have been used by default if we
131 : * had just called deflateInit().
132 : *
133 : * Per the documentation for deflateInit2, the third argument must be
134 : * Z_DEFLATED; the fourth argument is the number of "window bits", by
135 : * default 15, but adding 16 gets you a gzip header rather than a zlib
136 : * header; the fifth argument controls memory usage, and 8 is the default;
137 : * and likewise Z_DEFAULT_STRATEGY is the default for the sixth argument.
138 : */
139 6 : if (deflateInit2(zs, mysink->compresslevel, Z_DEFLATED, 15 + 16, 8,
140 : Z_DEFAULT_STRATEGY) != Z_OK)
141 0 : ereport(ERROR,
142 : errcode(ERRCODE_INTERNAL_ERROR),
143 : errmsg("could not initialize compression library"));
144 :
145 : /*
146 : * Add ".gz" to the archive name. Note that the pg_basebackup -z produces
147 : * archives named ".tar.gz" rather than ".tgz", so we match that here.
148 : */
149 6 : gz_archive_name = psprintf("%s.gz", archive_name);
150 : Assert(sink->bbs_next != NULL);
151 6 : bbsink_begin_archive(sink->bbs_next, gz_archive_name);
152 6 : pfree(gz_archive_name);
153 6 : }
154 :
155 : /*
156 : * Compress the input data to the output buffer until we run out of input
157 : * data. Each time the output buffer fills up, invoke the archive_contents()
158 : * method for then next sink.
159 : *
160 : * Note that since we're compressing the input, it may very commonly happen
161 : * that we consume all the input data without filling the output buffer. In
162 : * that case, the compressed representation of the current input data won't
163 : * actually be sent to the next bbsink until a later call to this function,
164 : * or perhaps even not until bbsink_gzip_end_archive() is invoked.
165 : */
166 : static void
167 10828 : bbsink_gzip_archive_contents(bbsink *sink, size_t len)
168 : {
169 10828 : bbsink_gzip *mysink = (bbsink_gzip *) sink;
170 10828 : z_stream *zs = &mysink->zstream;
171 :
172 : /* Compress data from input buffer. */
173 10828 : zs->next_in = (uint8 *) mysink->base.bbs_buffer;
174 10828 : zs->avail_in = len;
175 :
176 21852 : while (zs->avail_in > 0)
177 : {
178 : int res;
179 :
180 : /* Write output data into unused portion of output buffer. */
181 : Assert(mysink->bytes_written < mysink->base.bbs_next->bbs_buffer_length);
182 11024 : zs->next_out = (uint8 *)
183 11024 : mysink->base.bbs_next->bbs_buffer + mysink->bytes_written;
184 11024 : zs->avail_out =
185 11024 : mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written;
186 :
187 : /*
188 : * Try to compress. Note that this will update zs->next_in and
189 : * zs->avail_in according to how much input data was consumed, and
190 : * zs->next_out and zs->avail_out according to how many output bytes
191 : * were produced.
192 : *
193 : * According to the zlib documentation, Z_STREAM_ERROR should only
194 : * occur if we've made a programming error, or if say there's been a
195 : * memory clobber; we use elog() rather than Assert() here out of an
196 : * abundance of caution.
197 : */
198 11024 : res = deflate(zs, Z_NO_FLUSH);
199 11024 : if (res == Z_STREAM_ERROR)
200 0 : elog(ERROR, "could not compress data: %s", zs->msg);
201 :
202 : /* Update our notion of how many bytes we've written. */
203 11024 : mysink->bytes_written =
204 11024 : mysink->base.bbs_next->bbs_buffer_length - zs->avail_out;
205 :
206 : /*
207 : * If the output buffer is full, it's time for the next sink to
208 : * process the contents.
209 : */
210 11024 : if (mysink->bytes_written >= mysink->base.bbs_next->bbs_buffer_length)
211 : {
212 382 : bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
213 382 : mysink->bytes_written = 0;
214 : }
215 : }
216 10828 : }
217 :
218 : /*
219 : * There might be some data inside zlib's internal buffers; we need to get
220 : * that flushed out and forwarded to the successor sink as archive content.
221 : *
222 : * Then we can end processing for this archive.
223 : */
224 : static void
225 6 : bbsink_gzip_end_archive(bbsink *sink)
226 : {
227 6 : bbsink_gzip *mysink = (bbsink_gzip *) sink;
228 6 : z_stream *zs = &mysink->zstream;
229 :
230 : /* There is no more data available. */
231 6 : zs->next_in = (uint8 *) mysink->base.bbs_buffer;
232 6 : zs->avail_in = 0;
233 :
234 : while (1)
235 6 : {
236 : int res;
237 :
238 : /* Write output data into unused portion of output buffer. */
239 : Assert(mysink->bytes_written < mysink->base.bbs_next->bbs_buffer_length);
240 12 : zs->next_out = (uint8 *)
241 12 : mysink->base.bbs_next->bbs_buffer + mysink->bytes_written;
242 12 : zs->avail_out =
243 12 : mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written;
244 :
245 : /*
246 : * As bbsink_gzip_archive_contents, but pass Z_FINISH since there is
247 : * no more input.
248 : */
249 12 : res = deflate(zs, Z_FINISH);
250 12 : if (res == Z_STREAM_ERROR)
251 0 : elog(ERROR, "could not compress data: %s", zs->msg);
252 :
253 : /* Update our notion of how many bytes we've written. */
254 12 : mysink->bytes_written =
255 12 : mysink->base.bbs_next->bbs_buffer_length - zs->avail_out;
256 :
257 : /*
258 : * Apparently we had no data in the output buffer and deflate() was
259 : * not able to add any. We must be done.
260 : */
261 12 : if (mysink->bytes_written == 0)
262 6 : break;
263 :
264 : /* Send whatever accumulated output bytes we have. */
265 6 : bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
266 6 : mysink->bytes_written = 0;
267 : }
268 :
269 : /* Must also pass on the information that this archive has ended. */
270 6 : bbsink_forward_end_archive(sink);
271 6 : }
272 :
273 : /*
274 : * Manifest contents are not compressed, but we do need to copy them into
275 : * the successor sink's buffer, because we have our own.
276 : */
277 : static void
278 20 : bbsink_gzip_manifest_contents(bbsink *sink, size_t len)
279 : {
280 20 : memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
281 20 : bbsink_manifest_contents(sink->bbs_next, len);
282 20 : }
283 :
/*
 * Wrapper function to adjust the signature of palloc to match what libz
 * expects.
 *
 * libz asks for "items" objects of "size" bytes each.  "opaque" is unused.
 * Returns the allocated memory, or NULL if the total size cannot be
 * represented; libz treats a NULL result from its allocator as an
 * out-of-memory condition.
 */
static void *
gzip_palloc(void *opaque, unsigned items, unsigned size)
{
	/*
	 * Multiply in size_t rather than unsigned, so that the product does not
	 * silently wrap around before it reaches palloc.
	 */
	size_t		nbytes = (size_t) items * (size_t) size;

	/* Still guard against wraparound where size_t is no wider than unsigned. */
	if (size != 0 && nbytes / size != items)
		return NULL;

	return palloc(nbytes);
}
293 :
/*
 * Adapter giving pfree the two-argument signature that libz requires of
 * its free hook.  The "opaque" argument is not used.
 */
static void
gzip_pfree(void *opaque, void *address)
{
	(void) opaque;

	pfree(address);
}
303 :
304 : #endif
|