Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * basic_archive.c
4 : *
5 : * This file demonstrates a basic archive library implementation that is
6 : * roughly equivalent to the following shell command:
7 : *
8 : * test ! -f /path/to/dest && cp /path/to/src /path/to/dest
9 : *
10 : * One notable difference between this module and the shell command above
11 : * is that this module first copies the file to a temporary destination,
12 : * syncs it to disk, and then durably moves it to the final destination.
13 : *
14 : * Another notable difference is that if /path/to/dest already exists
15 : * but has contents identical to /path/to/src, archiving will succeed,
16 : * whereas the command shown above would fail. This prevents problems if
17 : * a file is successfully archived and then the system crashes before
18 : * a durable record of the success has been made.
19 : *
20 : * Copyright (c) 2022-2023, PostgreSQL Global Development Group
21 : *
22 : * IDENTIFICATION
23 : * contrib/basic_archive/basic_archive.c
24 : *
25 : *-------------------------------------------------------------------------
26 : */
27 : #include "postgres.h"
28 :
29 : #include <sys/stat.h>
30 : #include <sys/time.h>
31 : #include <unistd.h>
32 :
33 : #include "archive/archive_module.h"
34 : #include "common/int.h"
35 : #include "miscadmin.h"
36 : #include "storage/copydir.h"
37 : #include "storage/fd.h"
38 : #include "utils/guc.h"
39 : #include "utils/memutils.h"
40 :
41 2 : PG_MODULE_MAGIC;
42 :
43 : typedef struct BasicArchiveData
44 : {
45 : MemoryContext context;
46 : } BasicArchiveData;
47 :
48 : static char *archive_directory = NULL;
49 :
50 : static void basic_archive_startup(ArchiveModuleState *state);
51 : static bool basic_archive_configured(ArchiveModuleState *state);
52 : static bool basic_archive_file(ArchiveModuleState *state, const char *file, const char *path);
53 : static void basic_archive_file_internal(const char *file, const char *path);
54 : static bool check_archive_directory(char **newval, void **extra, GucSource source);
55 : static bool compare_files(const char *file1, const char *file2);
56 : static void basic_archive_shutdown(ArchiveModuleState *state);
57 :
58 : static const ArchiveModuleCallbacks basic_archive_callbacks = {
59 : .startup_cb = basic_archive_startup,
60 : .check_configured_cb = basic_archive_configured,
61 : .archive_file_cb = basic_archive_file,
62 : .shutdown_cb = basic_archive_shutdown
63 : };
64 :
65 : /*
66 : * _PG_init
67 : *
68 : * Defines the module's GUC.
69 : */
70 : void
71 2 : _PG_init(void)
72 : {
73 2 : DefineCustomStringVariable("basic_archive.archive_directory",
74 : gettext_noop("Archive file destination directory."),
75 : NULL,
76 : &archive_directory,
77 : "",
78 : PGC_SIGHUP,
79 : 0,
80 : check_archive_directory, NULL, NULL);
81 :
82 2 : MarkGUCPrefixReserved("basic_archive");
83 2 : }
84 :
85 : /*
86 : * _PG_archive_module_init
87 : *
88 : * Returns the module's archiving callbacks.
89 : */
90 : const ArchiveModuleCallbacks *
91 2 : _PG_archive_module_init(void)
92 : {
93 2 : return &basic_archive_callbacks;
94 : }
95 :
96 : /*
97 : * basic_archive_startup
98 : *
99 : * Creates the module's memory context.
100 : */
101 : void
102 2 : basic_archive_startup(ArchiveModuleState *state)
103 : {
104 : BasicArchiveData *data;
105 :
106 2 : data = (BasicArchiveData *) MemoryContextAllocZero(TopMemoryContext,
107 : sizeof(BasicArchiveData));
108 2 : data->context = AllocSetContextCreate(TopMemoryContext,
109 : "basic_archive",
110 : ALLOCSET_DEFAULT_SIZES);
111 2 : state->private_data = (void *) data;
112 2 : }
113 :
114 : /*
115 : * check_archive_directory
116 : *
117 : * Checks that the provided archive directory exists.
118 : */
119 : static bool
120 4 : check_archive_directory(char **newval, void **extra, GucSource source)
121 : {
122 : struct stat st;
123 :
124 : /*
125 : * The default value is an empty string, so we have to accept that value.
126 : * Our check_configured callback also checks for this and prevents
127 : * archiving from proceeding if it is still empty.
128 : */
129 4 : if (*newval == NULL || *newval[0] == '\0')
130 2 : return true;
131 :
132 : /*
133 : * Make sure the file paths won't be too long. The docs indicate that the
134 : * file names to be archived can be up to 64 characters long.
135 : */
136 2 : if (strlen(*newval) + 64 + 2 >= MAXPGPATH)
137 : {
138 0 : GUC_check_errdetail("Archive directory too long.");
139 0 : return false;
140 : }
141 :
142 : /*
143 : * Do a basic sanity check that the specified archive directory exists. It
144 : * could be removed at some point in the future, so we still need to be
145 : * prepared for it not to exist in the actual archiving logic.
146 : */
147 2 : if (stat(*newval, &st) != 0 || !S_ISDIR(st.st_mode))
148 : {
149 0 : GUC_check_errdetail("Specified archive directory does not exist.");
150 0 : return false;
151 : }
152 :
153 2 : return true;
154 : }
155 :
156 : /*
157 : * basic_archive_configured
158 : *
159 : * Checks that archive_directory is not blank.
160 : */
161 : static bool
162 4 : basic_archive_configured(ArchiveModuleState *state)
163 : {
164 4 : return archive_directory != NULL && archive_directory[0] != '\0';
165 : }
166 :
/*
 * basic_archive_file
 *
 * Archives one file (archive_file_cb).
 *
 * Runs basic_archive_file_internal(file, path) inside this module's private
 * memory context, with a local error handler installed.  Returns true on
 * success.  If basic_archive_file_internal() raises an ERROR, the handler
 * reports it, closes any transient files left open, resets the private
 * memory context, and returns false so that the archiver retries the file.
 * On both paths the caller's memory context is restored and the private
 * context is reset before returning.
 */
static bool
basic_archive_file(ArchiveModuleState *state, const char *file, const char *path)
{
	sigjmp_buf	local_sigjmp_buf;
	MemoryContext oldcontext;
	BasicArchiveData *data = (BasicArchiveData *) state->private_data;
	MemoryContext basic_archive_context = data->context;

	/*
	 * We run basic_archive_file_internal() in our own memory context so that
	 * we can easily reset it during error recovery (thus avoiding memory
	 * leaks).
	 */
	oldcontext = MemoryContextSwitchTo(basic_archive_context);

	/*
	 * Since the archiver operates at the bottom of the exception stack,
	 * ERRORs turn into FATALs and cause the archiver process to restart.
	 * However, using ereport(ERROR, ...) when there are problems is easy to
	 * code and maintain. Therefore, we create our own exception handler to
	 * catch ERRORs and return false instead of restarting the archiver
	 * whenever there is a failure.
	 *
	 * oldcontext and basic_archive_context are both assigned before
	 * sigsetjmp() and never modified afterwards, so their values are still
	 * reliable when control comes back here via longjmp; they do not need to
	 * be declared volatile.
	 */
	if (sigsetjmp(local_sigjmp_buf, 1) != 0)
	{
		/* Since not using PG_TRY, must reset error stack by hand */
		error_context_stack = NULL;

		/* Prevent interrupts while cleaning up */
		HOLD_INTERRUPTS();

		/* Report the error and clear ErrorContext for next time */
		EmitErrorReport();
		FlushErrorState();

		/* Close any files left open by copy_file() or compare_files() */
		AtEOSubXact_Files(false, InvalidSubTransactionId, InvalidSubTransactionId);

		/* Reset our memory context and switch back to the original one */
		MemoryContextSwitchTo(oldcontext);
		MemoryContextReset(basic_archive_context);

		/*
		 * Remove our exception handler.  Per the comment above, there is no
		 * outer handler to restore, so NULL is written back rather than a
		 * saved previous value.
		 */
		PG_exception_stack = NULL;

		/* Now we can allow interrupts again */
		RESUME_INTERRUPTS();

		/* Report failure so that the archiver retries this file */
		return false;
	}

	/* Enable our exception handler */
	PG_exception_stack = &local_sigjmp_buf;

	/* Archive the file! */
	basic_archive_file_internal(file, path);

	/* Remove our exception handler */
	PG_exception_stack = NULL;

	/* Reset our memory context and switch back to the original one */
	MemoryContextSwitchTo(oldcontext);
	MemoryContextReset(basic_archive_context);

	return true;
}
239 :
240 : static void
241 4 : basic_archive_file_internal(const char *file, const char *path)
242 : {
243 : char destination[MAXPGPATH];
244 : char temp[MAXPGPATH + 256];
245 : struct stat st;
246 : struct timeval tv;
247 : uint64 epoch; /* milliseconds */
248 :
249 4 : ereport(DEBUG3,
250 : (errmsg("archiving \"%s\" via basic_archive", file)));
251 :
252 4 : snprintf(destination, MAXPGPATH, "%s/%s", archive_directory, file);
253 :
254 : /*
255 : * First, check if the file has already been archived. If it already
256 : * exists and has the same contents as the file we're trying to archive,
257 : * we can return success (after ensuring the file is persisted to disk).
258 : * This scenario is possible if the server crashed after archiving the
259 : * file but before renaming its .ready file to .done.
260 : *
261 : * If the archive file already exists but has different contents,
262 : * something might be wrong, so we just fail.
263 : */
264 4 : if (stat(destination, &st) == 0)
265 : {
266 0 : if (compare_files(path, destination))
267 : {
268 0 : ereport(DEBUG3,
269 : (errmsg("archive file \"%s\" already exists with identical contents",
270 : destination)));
271 :
272 0 : fsync_fname(destination, false);
273 0 : fsync_fname(archive_directory, true);
274 :
275 0 : return;
276 : }
277 :
278 0 : ereport(ERROR,
279 : (errmsg("archive file \"%s\" already exists", destination)));
280 : }
281 4 : else if (errno != ENOENT)
282 0 : ereport(ERROR,
283 : (errcode_for_file_access(),
284 : errmsg("could not stat file \"%s\": %m", destination)));
285 :
286 : /*
287 : * Pick a sufficiently unique name for the temporary file so that a
288 : * collision is unlikely. This helps avoid problems in case a temporary
289 : * file was left around after a crash or another server happens to be
290 : * archiving to the same directory.
291 : */
292 4 : gettimeofday(&tv, NULL);
293 8 : if (pg_mul_u64_overflow((uint64) 1000, (uint64) tv.tv_sec, &epoch) ||
294 4 : pg_add_u64_overflow(epoch, (uint64) (tv.tv_usec / 1000), &epoch))
295 0 : elog(ERROR, "could not generate temporary file name for archiving");
296 :
297 4 : snprintf(temp, sizeof(temp), "%s/%s.%s.%d." UINT64_FORMAT,
298 : archive_directory, "archtemp", file, MyProcPid, epoch);
299 :
300 : /*
301 : * Copy the file to its temporary destination. Note that this will fail
302 : * if temp already exists.
303 : */
304 4 : copy_file(path, temp);
305 :
306 : /*
307 : * Sync the temporary file to disk and move it to its final destination.
308 : * Note that this will overwrite any existing file, but this is only
309 : * possible if someone else created the file since the stat() above.
310 : */
311 4 : (void) durable_rename(temp, destination, ERROR);
312 :
313 4 : ereport(DEBUG1,
314 : (errmsg("archived \"%s\" via basic_archive", file)));
315 : }
316 :
317 : /*
318 : * compare_files
319 : *
320 : * Returns whether the contents of the files are the same.
321 : */
322 : static bool
323 0 : compare_files(const char *file1, const char *file2)
324 : {
325 : #define CMP_BUF_SIZE (4096)
326 : char buf1[CMP_BUF_SIZE];
327 : char buf2[CMP_BUF_SIZE];
328 : int fd1;
329 : int fd2;
330 0 : bool ret = true;
331 :
332 0 : fd1 = OpenTransientFile(file1, O_RDONLY | PG_BINARY);
333 0 : if (fd1 < 0)
334 0 : ereport(ERROR,
335 : (errcode_for_file_access(),
336 : errmsg("could not open file \"%s\": %m", file1)));
337 :
338 0 : fd2 = OpenTransientFile(file2, O_RDONLY | PG_BINARY);
339 0 : if (fd2 < 0)
340 0 : ereport(ERROR,
341 : (errcode_for_file_access(),
342 : errmsg("could not open file \"%s\": %m", file2)));
343 :
344 : for (;;)
345 0 : {
346 0 : int nbytes = 0;
347 0 : int buf1_len = 0;
348 0 : int buf2_len = 0;
349 :
350 0 : while (buf1_len < CMP_BUF_SIZE)
351 : {
352 0 : nbytes = read(fd1, buf1 + buf1_len, CMP_BUF_SIZE - buf1_len);
353 0 : if (nbytes < 0)
354 0 : ereport(ERROR,
355 : (errcode_for_file_access(),
356 : errmsg("could not read file \"%s\": %m", file1)));
357 0 : else if (nbytes == 0)
358 0 : break;
359 :
360 0 : buf1_len += nbytes;
361 : }
362 :
363 0 : while (buf2_len < CMP_BUF_SIZE)
364 : {
365 0 : nbytes = read(fd2, buf2 + buf2_len, CMP_BUF_SIZE - buf2_len);
366 0 : if (nbytes < 0)
367 0 : ereport(ERROR,
368 : (errcode_for_file_access(),
369 : errmsg("could not read file \"%s\": %m", file2)));
370 0 : else if (nbytes == 0)
371 0 : break;
372 :
373 0 : buf2_len += nbytes;
374 : }
375 :
376 0 : if (buf1_len != buf2_len || memcmp(buf1, buf2, buf1_len) != 0)
377 : {
378 0 : ret = false;
379 0 : break;
380 : }
381 0 : else if (buf1_len == 0)
382 0 : break;
383 : }
384 :
385 0 : if (CloseTransientFile(fd1) != 0)
386 0 : ereport(ERROR,
387 : (errcode_for_file_access(),
388 : errmsg("could not close file \"%s\": %m", file1)));
389 :
390 0 : if (CloseTransientFile(fd2) != 0)
391 0 : ereport(ERROR,
392 : (errcode_for_file_access(),
393 : errmsg("could not close file \"%s\": %m", file2)));
394 :
395 0 : return ret;
396 : }
397 :
398 : /*
399 : * basic_archive_shutdown
400 : *
401 : * Frees our allocated state.
402 : */
403 : static void
404 2 : basic_archive_shutdown(ArchiveModuleState *state)
405 : {
406 2 : BasicArchiveData *data = (BasicArchiveData *) state->private_data;
407 : MemoryContext basic_archive_context;
408 :
409 : /*
410 : * If we didn't get to storing the pointer to our allocated state, we don't
411 : * have anything to clean up.
412 : */
413 2 : if (data == NULL)
414 0 : return;
415 :
416 2 : basic_archive_context = data->context;
417 : Assert(CurrentMemoryContext != basic_archive_context);
418 :
419 2 : if (MemoryContextIsValid(basic_archive_context))
420 2 : MemoryContextDelete(basic_archive_context);
421 2 : data->context = NULL;
422 :
423 : /*
424 : * Finally, free the state.
425 : */
426 2 : pfree(data);
427 2 : state->private_data = NULL;
428 : }
|