Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * bbstreamer_zstd.c
4 : *
5 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/bin/pg_basebackup/bbstreamer_zstd.c
9 : *-------------------------------------------------------------------------
10 : */
11 :
12 : #include "postgres_fe.h"
13 :
14 : #include <unistd.h>
15 :
16 : #ifdef USE_ZSTD
17 : #include <zstd.h>
18 : #endif
19 :
20 : #include "bbstreamer.h"
21 : #include "common/logging.h"
22 :
23 : #ifdef USE_ZSTD
24 :
25 : typedef struct bbstreamer_zstd_frame
26 : {
27 : bbstreamer base;
28 :
29 : ZSTD_CCtx *cctx;
30 : ZSTD_DCtx *dctx;
31 : ZSTD_outBuffer zstd_outBuf;
32 : } bbstreamer_zstd_frame;
33 :
34 : static void bbstreamer_zstd_compressor_content(bbstreamer *streamer,
35 : bbstreamer_member *member,
36 : const char *data, int len,
37 : bbstreamer_archive_context context);
38 : static void bbstreamer_zstd_compressor_finalize(bbstreamer *streamer);
39 : static void bbstreamer_zstd_compressor_free(bbstreamer *streamer);
40 :
41 : const bbstreamer_ops bbstreamer_zstd_compressor_ops = {
42 : .content = bbstreamer_zstd_compressor_content,
43 : .finalize = bbstreamer_zstd_compressor_finalize,
44 : .free = bbstreamer_zstd_compressor_free
45 : };
46 :
47 : static void bbstreamer_zstd_decompressor_content(bbstreamer *streamer,
48 : bbstreamer_member *member,
49 : const char *data, int len,
50 : bbstreamer_archive_context context);
51 : static void bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer);
52 : static void bbstreamer_zstd_decompressor_free(bbstreamer *streamer);
53 :
54 : const bbstreamer_ops bbstreamer_zstd_decompressor_ops = {
55 : .content = bbstreamer_zstd_decompressor_content,
56 : .finalize = bbstreamer_zstd_decompressor_finalize,
57 : .free = bbstreamer_zstd_decompressor_free
58 : };
59 : #endif
60 :
61 : /*
62 : * Create a new base backup streamer that performs zstd compression of tar
63 : * blocks.
64 : */
65 : bbstreamer *
66 0 : bbstreamer_zstd_compressor_new(bbstreamer *next, pg_compress_specification *compress)
67 : {
68 : #ifdef USE_ZSTD
69 : bbstreamer_zstd_frame *streamer;
70 : size_t ret;
71 :
72 : Assert(next != NULL);
73 :
74 : streamer = palloc0(sizeof(bbstreamer_zstd_frame));
75 :
76 : *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
77 : &bbstreamer_zstd_compressor_ops;
78 :
79 : streamer->base.bbs_next = next;
80 : initStringInfo(&streamer->base.bbs_buffer);
81 : enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
82 :
83 : streamer->cctx = ZSTD_createCCtx();
84 : if (!streamer->cctx)
85 : pg_fatal("could not create zstd compression context");
86 :
87 : /* Set compression level */
88 : ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
89 : compress->level);
90 : if (ZSTD_isError(ret))
91 : pg_fatal("could not set zstd compression level to %d: %s",
92 : compress->level, ZSTD_getErrorName(ret));
93 :
94 : /* Set # of workers, if specified */
95 : if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
96 : {
97 : /*
98 : * On older versions of libzstd, this option does not exist, and
99 : * trying to set it will fail. Similarly for newer versions if they
100 : * are compiled without threading support.
101 : */
102 : ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
103 : compress->workers);
104 : if (ZSTD_isError(ret))
105 : pg_fatal("could not set compression worker count to %d: %s",
106 : compress->workers, ZSTD_getErrorName(ret));
107 : }
108 :
109 : if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
110 : {
111 : ret = ZSTD_CCtx_setParameter(streamer->cctx,
112 : ZSTD_c_enableLongDistanceMatching,
113 : compress->long_distance);
114 : if (ZSTD_isError(ret))
115 : {
116 : pg_log_error("could not enable long-distance mode: %s",
117 : ZSTD_getErrorName(ret));
118 : exit(1);
119 : }
120 : }
121 :
122 : /* Initialize the ZSTD output buffer. */
123 : streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
124 : streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
125 : streamer->zstd_outBuf.pos = 0;
126 :
127 : return &streamer->base;
128 : #else
129 0 : pg_fatal("this build does not support compression with %s", "ZSTD");
130 : return NULL; /* keep compiler quiet */
131 : #endif
132 : }
133 :
134 : #ifdef USE_ZSTD
135 : /*
136 : * Compress the input data to output buffer.
137 : *
138 : * Find out the compression bound based on input data length for each
139 : * invocation to make sure that output buffer has enough capacity to
140 : * accommodate the compressed data. In case if the output buffer
141 : * capacity falls short of compression bound then forward the content
142 : * of output buffer to next streamer and empty the buffer.
143 : */
144 : static void
145 : bbstreamer_zstd_compressor_content(bbstreamer *streamer,
146 : bbstreamer_member *member,
147 : const char *data, int len,
148 : bbstreamer_archive_context context)
149 : {
150 : bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
151 : ZSTD_inBuffer inBuf = {data, len, 0};
152 :
153 : while (inBuf.pos < inBuf.size)
154 : {
155 : size_t yet_to_flush;
156 : size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
157 :
158 : /*
159 : * If the output buffer is not left with enough space, send the
160 : * compressed bytes to the next streamer, and empty the buffer.
161 : */
162 : if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
163 : max_needed)
164 : {
165 : bbstreamer_content(mystreamer->base.bbs_next, member,
166 : mystreamer->zstd_outBuf.dst,
167 : mystreamer->zstd_outBuf.pos,
168 : context);
169 :
170 : /* Reset the ZSTD output buffer. */
171 : mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
172 : mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
173 : mystreamer->zstd_outBuf.pos = 0;
174 : }
175 :
176 : yet_to_flush =
177 : ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf,
178 : &inBuf, ZSTD_e_continue);
179 :
180 : if (ZSTD_isError(yet_to_flush))
181 : pg_log_error("could not compress data: %s",
182 : ZSTD_getErrorName(yet_to_flush));
183 : }
184 : }
185 :
186 : /*
187 : * End-of-stream processing.
188 : */
189 : static void
190 : bbstreamer_zstd_compressor_finalize(bbstreamer *streamer)
191 : {
192 : bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
193 : size_t yet_to_flush;
194 :
195 : do
196 : {
197 : ZSTD_inBuffer in = {NULL, 0, 0};
198 : size_t max_needed = ZSTD_compressBound(0);
199 :
200 : /*
201 : * If the output buffer is not left with enough space, send the
202 : * compressed bytes to the next streamer, and empty the buffer.
203 : */
204 : if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
205 : max_needed)
206 : {
207 : bbstreamer_content(mystreamer->base.bbs_next, NULL,
208 : mystreamer->zstd_outBuf.dst,
209 : mystreamer->zstd_outBuf.pos,
210 : BBSTREAMER_UNKNOWN);
211 :
212 : /* Reset the ZSTD output buffer. */
213 : mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
214 : mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
215 : mystreamer->zstd_outBuf.pos = 0;
216 : }
217 :
218 : yet_to_flush = ZSTD_compressStream2(mystreamer->cctx,
219 : &mystreamer->zstd_outBuf,
220 : &in, ZSTD_e_end);
221 :
222 : if (ZSTD_isError(yet_to_flush))
223 : pg_log_error("could not compress data: %s",
224 : ZSTD_getErrorName(yet_to_flush));
225 :
226 : } while (yet_to_flush > 0);
227 :
228 : /* Make sure to pass any remaining bytes to the next streamer. */
229 : if (mystreamer->zstd_outBuf.pos > 0)
230 : bbstreamer_content(mystreamer->base.bbs_next, NULL,
231 : mystreamer->zstd_outBuf.dst,
232 : mystreamer->zstd_outBuf.pos,
233 : BBSTREAMER_UNKNOWN);
234 :
235 : bbstreamer_finalize(mystreamer->base.bbs_next);
236 : }
237 :
238 : /*
239 : * Free memory.
240 : */
241 : static void
242 : bbstreamer_zstd_compressor_free(bbstreamer *streamer)
243 : {
244 : bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
245 :
246 : bbstreamer_free(streamer->bbs_next);
247 : ZSTD_freeCCtx(mystreamer->cctx);
248 : pfree(streamer->bbs_buffer.data);
249 : pfree(streamer);
250 : }
251 : #endif
252 :
253 : /*
254 : * Create a new base backup streamer that performs decompression of zstd
255 : * compressed blocks.
256 : */
257 : bbstreamer *
258 0 : bbstreamer_zstd_decompressor_new(bbstreamer *next)
259 : {
260 : #ifdef USE_ZSTD
261 : bbstreamer_zstd_frame *streamer;
262 :
263 : Assert(next != NULL);
264 :
265 : streamer = palloc0(sizeof(bbstreamer_zstd_frame));
266 : *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
267 : &bbstreamer_zstd_decompressor_ops;
268 :
269 : streamer->base.bbs_next = next;
270 : initStringInfo(&streamer->base.bbs_buffer);
271 : enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
272 :
273 : streamer->dctx = ZSTD_createDCtx();
274 : if (!streamer->dctx)
275 : pg_fatal("could not create zstd decompression context");
276 :
277 : /* Initialize the ZSTD output buffer. */
278 : streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
279 : streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
280 : streamer->zstd_outBuf.pos = 0;
281 :
282 : return &streamer->base;
283 : #else
284 0 : pg_fatal("this build does not support compression with %s", "ZSTD");
285 : return NULL; /* keep compiler quiet */
286 : #endif
287 : }
288 :
289 : #ifdef USE_ZSTD
290 : /*
291 : * Decompress the input data to output buffer until we run out of input
292 : * data. Each time the output buffer is full, pass on the decompressed data
293 : * to the next streamer.
294 : */
295 : static void
296 : bbstreamer_zstd_decompressor_content(bbstreamer *streamer,
297 : bbstreamer_member *member,
298 : const char *data, int len,
299 : bbstreamer_archive_context context)
300 : {
301 : bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
302 : ZSTD_inBuffer inBuf = {data, len, 0};
303 :
304 : while (inBuf.pos < inBuf.size)
305 : {
306 : size_t ret;
307 :
308 : /*
309 : * If output buffer is full then forward the content to next streamer
310 : * and update the output buffer.
311 : */
312 : if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
313 : {
314 : bbstreamer_content(mystreamer->base.bbs_next, member,
315 : mystreamer->zstd_outBuf.dst,
316 : mystreamer->zstd_outBuf.pos,
317 : context);
318 :
319 : /* Reset the ZSTD output buffer. */
320 : mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
321 : mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
322 : mystreamer->zstd_outBuf.pos = 0;
323 : }
324 :
325 : ret = ZSTD_decompressStream(mystreamer->dctx,
326 : &mystreamer->zstd_outBuf, &inBuf);
327 :
328 : if (ZSTD_isError(ret))
329 : pg_log_error("could not decompress data: %s",
330 : ZSTD_getErrorName(ret));
331 : }
332 : }
333 :
334 : /*
335 : * End-of-stream processing.
336 : */
337 : static void
338 : bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer)
339 : {
340 : bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
341 :
342 : /*
343 : * End of the stream, if there is some pending data in output buffers then
344 : * we must forward it to next streamer.
345 : */
346 : if (mystreamer->zstd_outBuf.pos > 0)
347 : bbstreamer_content(mystreamer->base.bbs_next, NULL,
348 : mystreamer->base.bbs_buffer.data,
349 : mystreamer->base.bbs_buffer.maxlen,
350 : BBSTREAMER_UNKNOWN);
351 :
352 : bbstreamer_finalize(mystreamer->base.bbs_next);
353 : }
354 :
355 : /*
356 : * Free memory.
357 : */
358 : static void
359 : bbstreamer_zstd_decompressor_free(bbstreamer *streamer)
360 : {
361 : bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
362 :
363 : bbstreamer_free(streamer->bbs_next);
364 : ZSTD_freeDCtx(mystreamer->dctx);
365 : pfree(streamer->bbs_buffer.data);
366 : pfree(streamer);
367 : }
368 : #endif
|