Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * basebackup_zstd.c
4 : * Basebackup sink implementing zstd compression.
5 : *
6 : * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/backup/basebackup_zstd.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #ifdef USE_ZSTD
16 : #include <zstd.h>
17 : #endif
18 :
19 : #include "backup/basebackup_sink.h"
20 :
21 : #ifdef USE_ZSTD
22 :
23 : typedef struct bbsink_zstd
24 : {
25 : /* Common information for all types of sink. */
26 : bbsink base;
27 :
28 : /* Compression options */
29 : pg_compress_specification *compress;
30 :
31 : ZSTD_CCtx *cctx;
32 : ZSTD_outBuffer zstd_outBuf;
33 : } bbsink_zstd;
34 :
35 : static void bbsink_zstd_begin_backup(bbsink *sink);
36 : static void bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name);
37 : static void bbsink_zstd_archive_contents(bbsink *sink, size_t avail_in);
38 : static void bbsink_zstd_manifest_contents(bbsink *sink, size_t len);
39 : static void bbsink_zstd_end_archive(bbsink *sink);
40 : static void bbsink_zstd_cleanup(bbsink *sink);
41 : static void bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
42 : TimeLineID endtli);
43 :
44 : static const bbsink_ops bbsink_zstd_ops = {
45 : .begin_backup = bbsink_zstd_begin_backup,
46 : .begin_archive = bbsink_zstd_begin_archive,
47 : .archive_contents = bbsink_zstd_archive_contents,
48 : .end_archive = bbsink_zstd_end_archive,
49 : .begin_manifest = bbsink_forward_begin_manifest,
50 : .manifest_contents = bbsink_zstd_manifest_contents,
51 : .end_manifest = bbsink_forward_end_manifest,
52 : .end_backup = bbsink_zstd_end_backup,
53 : .cleanup = bbsink_zstd_cleanup
54 : };
55 : #endif
56 :
57 : /*
58 : * Create a new basebackup sink that performs zstd compression.
59 : */
60 : bbsink *
61 0 : bbsink_zstd_new(bbsink *next, pg_compress_specification *compress)
62 : {
63 : #ifndef USE_ZSTD
64 0 : ereport(ERROR,
65 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
66 : errmsg("zstd compression is not supported by this build")));
67 : return NULL; /* keep compiler quiet */
68 : #else
69 : bbsink_zstd *sink;
70 :
71 : Assert(next != NULL);
72 :
73 : sink = palloc0(sizeof(bbsink_zstd));
74 : *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;
75 : sink->base.bbs_next = next;
76 : sink->compress = compress;
77 :
78 : return &sink->base;
79 : #endif
80 : }
81 :
82 : #ifdef USE_ZSTD
83 :
84 : /*
85 : * Begin backup.
86 : */
87 : static void
88 : bbsink_zstd_begin_backup(bbsink *sink)
89 : {
90 : bbsink_zstd *mysink = (bbsink_zstd *) sink;
91 : size_t output_buffer_bound;
92 : size_t ret;
93 : pg_compress_specification *compress = mysink->compress;
94 :
95 : mysink->cctx = ZSTD_createCCtx();
96 : if (!mysink->cctx)
97 : elog(ERROR, "could not create zstd compression context");
98 :
99 : ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
100 : compress->level);
101 : if (ZSTD_isError(ret))
102 : elog(ERROR, "could not set zstd compression level to %d: %s",
103 : compress->level, ZSTD_getErrorName(ret));
104 :
105 : if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
106 : {
107 : /*
108 : * On older versions of libzstd, this option does not exist, and
109 : * trying to set it will fail. Similarly for newer versions if they
110 : * are compiled without threading support.
111 : */
112 : ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers,
113 : compress->workers);
114 : if (ZSTD_isError(ret))
115 : ereport(ERROR,
116 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
117 : errmsg("could not set compression worker count to %d: %s",
118 : compress->workers, ZSTD_getErrorName(ret)));
119 : }
120 :
121 : /*
122 : * We need our own buffer, because we're going to pass different data to
123 : * the next sink than what gets passed to us.
124 : */
125 : mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length);
126 :
127 : /*
128 : * Make sure that the next sink's bbs_buffer is big enough to accommodate
129 : * the compressed input buffer.
130 : */
131 : output_buffer_bound = ZSTD_compressBound(mysink->base.bbs_buffer_length);
132 :
133 : /*
134 : * The buffer length is expected to be a multiple of BLCKSZ, so round up.
135 : */
136 : output_buffer_bound = output_buffer_bound + BLCKSZ -
137 : (output_buffer_bound % BLCKSZ);
138 :
139 : bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound);
140 : }
141 :
142 : /*
143 : * Prepare to compress the next archive.
144 : */
145 : static void
146 : bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name)
147 : {
148 : bbsink_zstd *mysink = (bbsink_zstd *) sink;
149 : char *zstd_archive_name;
150 :
151 : /*
152 : * At the start of each archive we reset the state to start a new
153 : * compression operation. The parameters are sticky and they will stick
154 : * around as we are resetting with option ZSTD_reset_session_only.
155 : */
156 : ZSTD_CCtx_reset(mysink->cctx, ZSTD_reset_session_only);
157 :
158 : mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
159 : mysink->zstd_outBuf.size = mysink->base.bbs_next->bbs_buffer_length;
160 : mysink->zstd_outBuf.pos = 0;
161 :
162 : /* Add ".zst" to the archive name. */
163 : zstd_archive_name = psprintf("%s.zst", archive_name);
164 : Assert(sink->bbs_next != NULL);
165 : bbsink_begin_archive(sink->bbs_next, zstd_archive_name);
166 : pfree(zstd_archive_name);
167 : }
168 :
169 : /*
170 : * Compress the input data to the output buffer until we run out of input
171 : * data. Each time the output buffer falls below the compression bound for
172 : * the input buffer, invoke the archive_contents() method for the next sink.
173 : *
174 : * Note that since we're compressing the input, it may very commonly happen
175 : * that we consume all the input data without filling the output buffer. In
176 : * that case, the compressed representation of the current input data won't
177 : * actually be sent to the next bbsink until a later call to this function,
178 : * or perhaps even not until bbsink_zstd_end_archive() is invoked.
179 : */
180 : static void
181 : bbsink_zstd_archive_contents(bbsink *sink, size_t len)
182 : {
183 : bbsink_zstd *mysink = (bbsink_zstd *) sink;
184 : ZSTD_inBuffer inBuf = {mysink->base.bbs_buffer, len, 0};
185 :
186 : while (inBuf.pos < inBuf.size)
187 : {
188 : size_t yet_to_flush;
189 : size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
190 :
191 : /*
192 : * If the out buffer is not left with enough space, send the output
193 : * buffer to the next sink, and reset it.
194 : */
195 : if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
196 : {
197 : bbsink_archive_contents(mysink->base.bbs_next,
198 : mysink->zstd_outBuf.pos);
199 : mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
200 : mysink->zstd_outBuf.size =
201 : mysink->base.bbs_next->bbs_buffer_length;
202 : mysink->zstd_outBuf.pos = 0;
203 : }
204 :
205 : yet_to_flush = ZSTD_compressStream2(mysink->cctx, &mysink->zstd_outBuf,
206 : &inBuf, ZSTD_e_continue);
207 :
208 : if (ZSTD_isError(yet_to_flush))
209 : elog(ERROR,
210 : "could not compress data: %s",
211 : ZSTD_getErrorName(yet_to_flush));
212 : }
213 : }
214 :
215 : /*
216 : * There might be some data inside zstd's internal buffers; we need to get that
217 : * flushed out, also end the zstd frame and then get that forwarded to the
218 : * successor sink as archive content.
219 : *
220 : * Then we can end processing for this archive.
221 : */
222 : static void
223 : bbsink_zstd_end_archive(bbsink *sink)
224 : {
225 : bbsink_zstd *mysink = (bbsink_zstd *) sink;
226 : size_t yet_to_flush;
227 :
228 : do
229 : {
230 : ZSTD_inBuffer in = {NULL, 0, 0};
231 : size_t max_needed = ZSTD_compressBound(0);
232 :
233 : /*
234 : * If the out buffer is not left with enough space, send the output
235 : * buffer to the next sink, and reset it.
236 : */
237 : if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
238 : {
239 : bbsink_archive_contents(mysink->base.bbs_next,
240 : mysink->zstd_outBuf.pos);
241 : mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
242 : mysink->zstd_outBuf.size =
243 : mysink->base.bbs_next->bbs_buffer_length;
244 : mysink->zstd_outBuf.pos = 0;
245 : }
246 :
247 : yet_to_flush = ZSTD_compressStream2(mysink->cctx,
248 : &mysink->zstd_outBuf,
249 : &in, ZSTD_e_end);
250 :
251 : if (ZSTD_isError(yet_to_flush))
252 : elog(ERROR, "could not compress data: %s",
253 : ZSTD_getErrorName(yet_to_flush));
254 :
255 : } while (yet_to_flush > 0);
256 :
257 : /* Make sure to pass any remaining bytes to the next sink. */
258 : if (mysink->zstd_outBuf.pos > 0)
259 : bbsink_archive_contents(mysink->base.bbs_next,
260 : mysink->zstd_outBuf.pos);
261 :
262 : /* Pass on the information that this archive has ended. */
263 : bbsink_forward_end_archive(sink);
264 : }
265 :
266 : /*
267 : * Free the resources and context.
268 : */
269 : static void
270 : bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
271 : TimeLineID endtli)
272 : {
273 : bbsink_zstd *mysink = (bbsink_zstd *) sink;
274 :
275 : /* Release the context. */
276 : if (mysink->cctx)
277 : {
278 : ZSTD_freeCCtx(mysink->cctx);
279 : mysink->cctx = NULL;
280 : }
281 :
282 : bbsink_forward_end_backup(sink, endptr, endtli);
283 : }
284 :
285 : /*
286 : * Manifest contents are not compressed, but we do need to copy them into
287 : * the successor sink's buffer, because we have our own.
288 : */
289 : static void
290 : bbsink_zstd_manifest_contents(bbsink *sink, size_t len)
291 : {
292 : memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
293 : bbsink_manifest_contents(sink->bbs_next, len);
294 : }
295 :
296 : /*
297 : * In case the backup fails, make sure we free any compression context that
298 : * got allocated, so that we don't leak memory.
299 : */
300 : static void
301 : bbsink_zstd_cleanup(bbsink *sink)
302 : {
303 : bbsink_zstd *mysink = (bbsink_zstd *) sink;
304 :
305 : /* Release the context if not already released. */
306 : if (mysink->cctx)
307 : {
308 : ZSTD_freeCCtx(mysink->cctx);
309 : mysink->cctx = NULL;
310 : }
311 : }
312 :
313 : #endif
|