Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pgarch.c
4 : *
5 : * PostgreSQL WAL archiver
6 : *
7 : * All functions relating to archiver are included here
8 : *
9 : * - All functions executed by archiver process
10 : *
11 : * - archiver is forked from postmaster, and the two
12 : * processes then communicate using signals. All functions
13 : * executed by postmaster are included in this file.
14 : *
15 : * Initial author: Simon Riggs simon@2ndquadrant.com
16 : *
17 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
18 : * Portions Copyright (c) 1994, Regents of the University of California
19 : *
20 : *
21 : * IDENTIFICATION
22 : * src/backend/postmaster/pgarch.c
23 : *
24 : *-------------------------------------------------------------------------
25 : */
26 : #include "postgres.h"
27 :
28 : #include <time.h>
29 : #include <sys/stat.h>
30 : #include <unistd.h>
31 :
32 : #include "access/xlog.h"
33 : #include "access/xlog_internal.h"
34 : #include "archive/archive_module.h"
35 : #include "archive/shell_archive.h"
36 : #include "lib/binaryheap.h"
37 : #include "libpq/pqsignal.h"
38 : #include "pgstat.h"
39 : #include "postmaster/auxprocess.h"
40 : #include "postmaster/interrupt.h"
41 : #include "postmaster/pgarch.h"
42 : #include "storage/condition_variable.h"
43 : #include "storage/aio_subsys.h"
44 : #include "storage/fd.h"
45 : #include "storage/ipc.h"
46 : #include "storage/latch.h"
47 : #include "storage/pmsignal.h"
48 : #include "storage/proc.h"
49 : #include "storage/procsignal.h"
50 : #include "storage/shmem.h"
51 : #include "utils/guc.h"
52 : #include "utils/memutils.h"
53 : #include "utils/ps_status.h"
54 : #include "utils/resowner.h"
55 : #include "utils/timeout.h"
56 :
57 :
/* ----------
 * Timing constants (both expressed in seconds).
 * ----------
 */

/* Period after which the archiver rescans archive_status even if not woken. */
#define PGARCH_AUTOWAKE_INTERVAL 60

/* Minimum spacing between two attempts to (re)launch the archiver. */
#define PGARCH_RESTART_INTERVAL 10

/*
 * Number of consecutive failed attempts to archive one WAL file that are
 * tolerated before the archiver gives up on it until a later cycle.
 */
#define NUM_ARCHIVE_RETRIES 3

/*
 * Number of consecutive failed attempts to unlink one orphaned .ready
 * status file that are tolerated before giving up for the current cycle.
 */
#define NUM_ORPHAN_CLEANUP_RETRIES 3

/*
 * Upper bound on how many .ready file names are remembered from a single
 * scan of the archive_status directory.
 */
#define NUM_FILES_PER_DIRECTORY_SCAN 64
83 :
84 : /* Shared memory area for archiver process */
85 : typedef struct PgArchData
86 : {
87 : int pgprocno; /* proc number of archiver process */
88 :
89 : /*
90 : * Forces a directory scan in pgarch_readyXlog().
91 : */
92 : pg_atomic_uint32 force_dir_scan;
93 : } PgArchData;
94 :
95 : char *XLogArchiveLibrary = "";
96 : char *arch_module_check_errdetail_string;
97 :
98 :
99 : /* ----------
100 : * Local data
101 : * ----------
102 : */
103 : static time_t last_sigterm_time = 0;
104 : static PgArchData *PgArch = NULL;
105 : static const ArchiveModuleCallbacks *ArchiveCallbacks;
106 : static ArchiveModuleState *archive_module_state;
107 : static MemoryContext archive_context;
108 :
109 :
110 : /*
111 : * Stuff for tracking multiple files to archive from each scan of
112 : * archive_status. Minimizing the number of directory scans when there are
113 : * many files to archive can significantly improve archival rate.
114 : *
115 : * arch_heap is a max-heap that is used during the directory scan to track
116 : * the highest-priority files to archive. After the directory scan
117 : * completes, the file names are stored in ascending order of priority in
118 : * arch_files. pgarch_readyXlog() returns files from arch_files until it
119 : * is empty, at which point another directory scan must be performed.
120 : *
121 : * We only need this data in the archiver process, so make it a palloc'd
122 : * struct rather than a bunch of static arrays.
123 : */
124 : struct arch_files_state
125 : {
126 : binaryheap *arch_heap;
127 : int arch_files_size; /* number of live entries in arch_files[] */
128 : char *arch_files[NUM_FILES_PER_DIRECTORY_SCAN];
129 : /* buffers underlying heap, and later arch_files[], entries: */
130 : char arch_filenames[NUM_FILES_PER_DIRECTORY_SCAN][MAX_XFN_CHARS + 1];
131 : };
132 :
133 : static struct arch_files_state *arch_files = NULL;
134 :
135 : /*
136 : * Flags set by interrupt handlers for later service in the main loop.
137 : */
138 : static volatile sig_atomic_t ready_to_stop = false;
139 :
140 : /* ----------
141 : * Local function forward declarations
142 : * ----------
143 : */
144 : static void pgarch_waken_stop(SIGNAL_ARGS);
145 : static void pgarch_MainLoop(void);
146 : static void pgarch_ArchiverCopyLoop(void);
147 : static bool pgarch_archiveXlog(char *xlog);
148 : static bool pgarch_readyXlog(char *xlog);
149 : static void pgarch_archiveDone(char *xlog);
150 : static void pgarch_die(int code, Datum arg);
151 : static void ProcessPgArchInterrupts(void);
152 : static int ready_file_comparator(Datum a, Datum b, void *arg);
153 : static void LoadArchiveLibrary(void);
154 : static void pgarch_call_module_shutdown_cb(int code, Datum arg);
155 :
156 : /* Report shared memory space needed by PgArchShmemInit */
157 : Size
158 8814 : PgArchShmemSize(void)
159 : {
160 8814 : Size size = 0;
161 :
162 8814 : size = add_size(size, sizeof(PgArchData));
163 :
164 8814 : return size;
165 : }
166 :
167 : /* Allocate and initialize archiver-related shared memory */
168 : void
169 2280 : PgArchShmemInit(void)
170 : {
171 : bool found;
172 :
173 2280 : PgArch = (PgArchData *)
174 2280 : ShmemInitStruct("Archiver Data", PgArchShmemSize(), &found);
175 :
176 2280 : if (!found)
177 : {
178 : /* First time through, so initialize */
179 4560 : MemSet(PgArch, 0, PgArchShmemSize());
180 2280 : PgArch->pgprocno = INVALID_PROC_NUMBER;
181 2280 : pg_atomic_init_u32(&PgArch->force_dir_scan, 0);
182 : }
183 2280 : }
184 :
185 : /*
186 : * PgArchCanRestart
187 : *
188 : * Return true, indicating archiver is allowed to restart, if enough time has
189 : * passed since it was last launched to reach PGARCH_RESTART_INTERVAL.
190 : * Otherwise return false.
191 : *
192 : * This is a safety valve to protect against continuous respawn attempts if the
193 : * archiver is dying immediately at launch. Note that since we will retry to
194 : * launch the archiver from the postmaster main loop, we will get another
195 : * chance later.
196 : */
197 : bool
198 108 : PgArchCanRestart(void)
199 : {
200 : static time_t last_pgarch_start_time = 0;
201 108 : time_t curtime = time(NULL);
202 :
203 : /*
204 : * If first time through, or time somehow went backwards, always update
205 : * last_pgarch_start_time to match the current clock and allow archiver
206 : * start. Otherwise allow it only once enough time has elapsed.
207 : */
208 108 : if (last_pgarch_start_time == 0 ||
209 0 : curtime < last_pgarch_start_time ||
210 0 : curtime - last_pgarch_start_time >= PGARCH_RESTART_INTERVAL)
211 : {
212 108 : last_pgarch_start_time = curtime;
213 108 : return true;
214 : }
215 0 : return false;
216 : }
217 :
218 :
219 : /* Main entry point for archiver process */
220 : void
221 34 : PgArchiverMain(const void *startup_data, size_t startup_data_len)
222 : {
223 : Assert(startup_data_len == 0);
224 :
225 34 : AuxiliaryProcessMainCommon();
226 :
227 : /*
228 : * Ignore all signals usually bound to some action in the postmaster,
229 : * except for SIGHUP, SIGTERM, SIGUSR1, SIGUSR2, and SIGQUIT.
230 : */
231 34 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
232 34 : pqsignal(SIGINT, SIG_IGN);
233 34 : pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
234 : /* SIGQUIT handler was already set up by InitPostmasterChild */
235 34 : pqsignal(SIGALRM, SIG_IGN);
236 34 : pqsignal(SIGPIPE, SIG_IGN);
237 34 : pqsignal(SIGUSR1, procsignal_sigusr1_handler);
238 34 : pqsignal(SIGUSR2, pgarch_waken_stop);
239 :
240 : /* Reset some signals that are accepted by postmaster but not here */
241 34 : pqsignal(SIGCHLD, SIG_DFL);
242 :
243 : /* Unblock signals (they were blocked when the postmaster forked us) */
244 34 : sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
245 :
246 : /* We shouldn't be launched unnecessarily. */
247 : Assert(XLogArchivingActive());
248 :
249 : /* Arrange to clean up at archiver exit */
250 34 : on_shmem_exit(pgarch_die, 0);
251 :
252 : /*
253 : * Advertise our proc number so that backends can use our latch to wake us
254 : * up while we're sleeping.
255 : */
256 34 : PgArch->pgprocno = MyProcNumber;
257 :
258 : /* Create workspace for pgarch_readyXlog() */
259 34 : arch_files = palloc_object(struct arch_files_state);
260 34 : arch_files->arch_files_size = 0;
261 :
262 : /* Initialize our max-heap for prioritizing files to archive. */
263 34 : arch_files->arch_heap = binaryheap_allocate(NUM_FILES_PER_DIRECTORY_SCAN,
264 : ready_file_comparator, NULL);
265 :
266 : /* Initialize our memory context. */
267 34 : archive_context = AllocSetContextCreate(TopMemoryContext,
268 : "archiver",
269 : ALLOCSET_DEFAULT_SIZES);
270 :
271 : /* Load the archive_library. */
272 34 : LoadArchiveLibrary();
273 :
274 34 : pgarch_MainLoop();
275 :
276 34 : proc_exit(0);
277 : }
278 :
279 : /*
280 : * Wake up the archiver
281 : */
282 : void
283 942 : PgArchWakeup(void)
284 : {
285 942 : int arch_pgprocno = PgArch->pgprocno;
286 :
287 : /*
288 : * We don't acquire ProcArrayLock here. It's actually fine because
289 : * procLatch isn't ever freed, so we just can potentially set the wrong
290 : * process' (or no process') latch. Even in that case the archiver will
291 : * be relaunched shortly and will start archiving.
292 : */
293 942 : if (arch_pgprocno != INVALID_PROC_NUMBER)
294 902 : SetLatch(&GetPGProcByNumber(arch_pgprocno)->procLatch);
295 942 : }
296 :
297 :
298 : /* SIGUSR2 signal handler for archiver process */
299 : static void
300 34 : pgarch_waken_stop(SIGNAL_ARGS)
301 : {
302 : /* set flag to do a final cycle and shut down afterwards */
303 34 : ready_to_stop = true;
304 34 : SetLatch(MyLatch);
305 34 : }
306 :
307 : /*
308 : * pgarch_MainLoop
309 : *
310 : * Main loop for archiver
311 : */
312 : static void
313 34 : pgarch_MainLoop(void)
314 : {
315 : bool time_to_stop;
316 :
317 : /*
318 : * There shouldn't be anything for the archiver to do except to wait for a
319 : * signal ... however, the archiver exists to protect our data, so it
320 : * wakes up occasionally to allow itself to be proactive.
321 : */
322 : do
323 : {
324 156 : ResetLatch(MyLatch);
325 :
326 : /* When we get SIGUSR2, we do one more archive cycle, then exit */
327 156 : time_to_stop = ready_to_stop;
328 :
329 : /* Check for barrier events and config update */
330 156 : ProcessPgArchInterrupts();
331 :
332 : /*
333 : * If we've gotten SIGTERM, we normally just sit and do nothing until
334 : * SIGUSR2 arrives. However, that means a random SIGTERM would
335 : * disable archiving indefinitely, which doesn't seem like a good
336 : * idea. If more than 60 seconds pass since SIGTERM, exit anyway, so
337 : * that the postmaster can start a new archiver if needed. Also exit
338 : * if time unexpectedly goes backward.
339 : */
340 156 : if (ShutdownRequestPending)
341 : {
342 0 : time_t curtime = time(NULL);
343 :
344 0 : if (last_sigterm_time == 0)
345 0 : last_sigterm_time = curtime;
346 0 : else if (curtime < last_sigterm_time ||
347 0 : curtime - last_sigterm_time >= 60)
348 : break;
349 : }
350 :
351 : /* Do what we're here for */
352 156 : pgarch_ArchiverCopyLoop();
353 :
354 : /*
355 : * Sleep until a signal is received, or until a poll is forced by
356 : * PGARCH_AUTOWAKE_INTERVAL, or until postmaster dies.
357 : */
358 156 : if (!time_to_stop) /* Don't wait during last iteration */
359 : {
360 : int rc;
361 :
362 122 : rc = WaitLatch(MyLatch,
363 : WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
364 : PGARCH_AUTOWAKE_INTERVAL * 1000L,
365 : WAIT_EVENT_ARCHIVER_MAIN);
366 122 : if (rc & WL_POSTMASTER_DEATH)
367 0 : time_to_stop = true;
368 : }
369 :
370 : /*
371 : * The archiver quits either when the postmaster dies (not expected)
372 : * or after completing one more archiving cycle after receiving
373 : * SIGUSR2.
374 : */
375 156 : } while (!time_to_stop);
376 34 : }
377 :
378 : /*
379 : * pgarch_ArchiverCopyLoop
380 : *
381 : * Archives all outstanding xlogs then returns
382 : */
383 : static void
384 156 : pgarch_ArchiverCopyLoop(void)
385 : {
386 : char xlog[MAX_XFN_CHARS + 1];
387 :
388 : /* force directory scan in the first call to pgarch_readyXlog() */
389 156 : arch_files->arch_files_size = 0;
390 :
391 : /*
392 : * loop through all xlogs with archive_status of .ready and archive
393 : * them...mostly we expect this to be a single file, though it is possible
394 : * some backend will add files onto the list of those that need archiving
395 : * while we are still copying earlier archives
396 : */
397 890 : while (pgarch_readyXlog(xlog))
398 : {
399 744 : int failures = 0;
400 744 : int failures_orphan = 0;
401 :
402 : for (;;)
403 14 : {
404 : struct stat stat_buf;
405 : char pathname[MAXPGPATH];
406 :
407 : /*
408 : * Do not initiate any more archive commands after receiving
409 : * SIGTERM, nor after the postmaster has died unexpectedly. The
410 : * first condition is to try to keep from having init SIGKILL the
411 : * command, and the second is to avoid conflicts with another
412 : * archiver spawned by a newer postmaster.
413 : */
414 758 : if (ShutdownRequestPending || !PostmasterIsAlive())
415 10 : return;
416 :
417 : /*
418 : * Check for barrier events and config update. This is so that
419 : * we'll adopt a new setting for archive_command as soon as
420 : * possible, even if there is a backlog of files to be archived.
421 : */
422 758 : ProcessPgArchInterrupts();
423 :
424 : /* Reset variables that might be set by the callback */
425 758 : arch_module_check_errdetail_string = NULL;
426 :
427 : /* can't do anything if not configured ... */
428 758 : if (ArchiveCallbacks->check_configured_cb != NULL &&
429 758 : !ArchiveCallbacks->check_configured_cb(archive_module_state))
430 : {
431 4 : ereport(WARNING,
432 : (errmsg("\"archive_mode\" enabled, yet archiving is not configured"),
433 : arch_module_check_errdetail_string ?
434 : errdetail_internal("%s", arch_module_check_errdetail_string) : 0));
435 4 : return;
436 : }
437 :
438 : /*
439 : * Since archive status files are not removed in a durable manner,
440 : * a system crash could leave behind .ready files for WAL segments
441 : * that have already been recycled or removed. In this case,
442 : * simply remove the orphan status file and move on. unlink() is
443 : * used here as even on subsequent crashes the same orphan files
444 : * would get removed, so there is no need to worry about
445 : * durability.
446 : */
447 754 : snprintf(pathname, MAXPGPATH, XLOGDIR "/%s", xlog);
448 754 : if (stat(pathname, &stat_buf) != 0 && errno == ENOENT)
449 0 : {
450 : char xlogready[MAXPGPATH];
451 :
452 0 : StatusFilePath(xlogready, xlog, ".ready");
453 0 : if (unlink(xlogready) == 0)
454 : {
455 0 : ereport(WARNING,
456 : (errmsg("removed orphan archive status file \"%s\"",
457 : xlogready)));
458 :
459 : /* leave loop and move to the next status file */
460 0 : break;
461 : }
462 :
463 0 : if (++failures_orphan >= NUM_ORPHAN_CLEANUP_RETRIES)
464 : {
465 0 : ereport(WARNING,
466 : (errmsg("removal of orphan archive status file \"%s\" failed too many times, will try again later",
467 : xlogready)));
468 :
469 : /* give up cleanup of orphan status files */
470 0 : return;
471 : }
472 :
473 : /* wait a bit before retrying */
474 0 : pg_usleep(1000000L);
475 0 : continue;
476 : }
477 :
478 754 : if (pgarch_archiveXlog(xlog))
479 : {
480 : /* successful */
481 734 : pgarch_archiveDone(xlog);
482 :
483 : /*
484 : * Tell the cumulative stats system about the WAL file that we
485 : * successfully archived
486 : */
487 734 : pgstat_report_archiver(xlog, false);
488 :
489 734 : break; /* out of inner retry loop */
490 : }
491 : else
492 : {
493 : /*
494 : * Tell the cumulative stats system about the WAL file that we
495 : * failed to archive
496 : */
497 20 : pgstat_report_archiver(xlog, true);
498 :
499 20 : if (++failures >= NUM_ARCHIVE_RETRIES)
500 : {
501 6 : ereport(WARNING,
502 : (errmsg("archiving write-ahead log file \"%s\" failed too many times, will try again later",
503 : xlog)));
504 6 : return; /* give up archiving for now */
505 : }
506 14 : pg_usleep(1000000L); /* wait a bit before retrying */
507 : }
508 : }
509 : }
510 : }
511 :
512 : /*
513 : * pgarch_archiveXlog
514 : *
515 : * Invokes archive_file_cb to copy one archive file to wherever it should go
516 : *
517 : * Returns true if successful
518 : */
519 : static bool
520 754 : pgarch_archiveXlog(char *xlog)
521 : {
522 : sigjmp_buf local_sigjmp_buf;
523 : MemoryContext oldcontext;
524 : char pathname[MAXPGPATH];
525 : char activitymsg[MAXFNAMELEN + 16];
526 : bool ret;
527 :
528 754 : snprintf(pathname, MAXPGPATH, XLOGDIR "/%s", xlog);
529 :
530 : /* Report archive activity in PS display */
531 754 : snprintf(activitymsg, sizeof(activitymsg), "archiving %s", xlog);
532 754 : set_ps_display(activitymsg);
533 :
534 754 : oldcontext = MemoryContextSwitchTo(archive_context);
535 :
536 : /*
537 : * Since the archiver operates at the bottom of the exception stack,
538 : * ERRORs turn into FATALs and cause the archiver process to restart.
539 : * However, using ereport(ERROR, ...) when there are problems is easy to
540 : * code and maintain. Therefore, we create our own exception handler to
541 : * catch ERRORs and return false instead of restarting the archiver
542 : * whenever there is a failure.
543 : *
544 : * We assume ERRORs from the archiving callback are the most common
545 : * exceptions experienced by the archiver, so we opt to handle exceptions
546 : * here instead of PgArchiverMain() to avoid reinitializing the archiver
547 : * too frequently. We could instead add a sigsetjmp() block to
548 : * PgArchiverMain() and use PG_TRY/PG_CATCH here, but the extra code to
549 : * avoid the odd archiver restart doesn't seem worth it.
550 : */
551 754 : if (sigsetjmp(local_sigjmp_buf, 1) != 0)
552 : {
553 : /* Since not using PG_TRY, must reset error stack by hand */
554 0 : error_context_stack = NULL;
555 :
556 : /* Prevent interrupts while cleaning up */
557 0 : HOLD_INTERRUPTS();
558 :
559 : /* Report the error to the server log. */
560 0 : EmitErrorReport();
561 :
562 : /*
563 : * Try to clean up anything the archive module left behind. We try to
564 : * cover anything that an archive module could conceivably have left
565 : * behind, but it is of course possible that modules could be doing
566 : * unexpected things that require additional cleanup. Module authors
567 : * should be sure to do any extra required cleanup in a PG_CATCH block
568 : * within the archiving callback, and they are encouraged to notify
569 : * the pgsql-hackers mailing list so that we can add it here.
570 : */
571 0 : disable_all_timeouts(false);
572 0 : LWLockReleaseAll();
573 0 : ConditionVariableCancelSleep();
574 0 : pgstat_report_wait_end();
575 0 : pgaio_error_cleanup();
576 0 : ReleaseAuxProcessResources(false);
577 0 : AtEOXact_Files(false);
578 0 : AtEOXact_HashTables(false);
579 :
580 : /*
581 : * Return to the original memory context and clear ErrorContext for
582 : * next time.
583 : */
584 0 : MemoryContextSwitchTo(oldcontext);
585 0 : FlushErrorState();
586 :
587 : /* Flush any leaked data */
588 0 : MemoryContextReset(archive_context);
589 :
590 : /* Remove our exception handler */
591 0 : PG_exception_stack = NULL;
592 :
593 : /* Now we can allow interrupts again */
594 0 : RESUME_INTERRUPTS();
595 :
596 : /* Report failure so that the archiver retries this file */
597 0 : ret = false;
598 : }
599 : else
600 : {
601 : /* Enable our exception handler */
602 754 : PG_exception_stack = &local_sigjmp_buf;
603 :
604 : /* Archive the file! */
605 754 : ret = ArchiveCallbacks->archive_file_cb(archive_module_state,
606 : xlog, pathname);
607 :
608 : /* Remove our exception handler */
609 754 : PG_exception_stack = NULL;
610 :
611 : /* Reset our memory context and switch back to the original one */
612 754 : MemoryContextSwitchTo(oldcontext);
613 754 : MemoryContextReset(archive_context);
614 : }
615 :
616 754 : if (ret)
617 734 : snprintf(activitymsg, sizeof(activitymsg), "last was %s", xlog);
618 : else
619 20 : snprintf(activitymsg, sizeof(activitymsg), "failed on %s", xlog);
620 754 : set_ps_display(activitymsg);
621 :
622 754 : return ret;
623 : }
624 :
625 : /*
626 : * pgarch_readyXlog
627 : *
628 : * Return name of the oldest xlog file that has not yet been archived.
629 : * No notification is set that file archiving is now in progress, so
630 : * this would need to be extended if multiple concurrent archival
631 : * tasks were created. If a failure occurs, we will completely
632 : * re-copy the file at the next available opportunity.
633 : *
634 : * It is important that we return the oldest, so that we archive xlogs
635 : * in order that they were written, for two reasons:
636 : * 1) to maintain the sequential chain of xlogs required for recovery
637 : * 2) because the oldest ones will sooner become candidates for
638 : * recycling at time of checkpoint
639 : *
640 : * NOTE: the "oldest" comparison will consider any .history file to be older
641 : * than any other file except another .history file. Segments on a timeline
642 : * with a smaller ID will be older than all segments on a timeline with a
643 : * larger ID; the net result being that past timelines are given higher
644 : * priority for archiving. This seems okay, or at least not obviously worth
645 : * changing.
646 : */
647 : static bool
648 890 : pgarch_readyXlog(char *xlog)
649 : {
650 : char XLogArchiveStatusDir[MAXPGPATH];
651 : DIR *rldir;
652 : struct dirent *rlde;
653 :
654 : /*
655 : * If a directory scan was requested, clear the stored file names and
656 : * proceed.
657 : */
658 890 : if (pg_atomic_exchange_u32(&PgArch->force_dir_scan, 0) == 1)
659 6 : arch_files->arch_files_size = 0;
660 :
661 : /*
662 : * If we still have stored file names from the previous directory scan,
663 : * try to return one of those. We check to make sure the status file is
664 : * still present, as the archive_command for a previous file may have
665 : * already marked it done.
666 : */
667 890 : while (arch_files->arch_files_size > 0)
668 : {
669 : struct stat st;
670 : char status_file[MAXPGPATH];
671 : char *arch_file;
672 :
673 624 : arch_files->arch_files_size--;
674 624 : arch_file = arch_files->arch_files[arch_files->arch_files_size];
675 624 : StatusFilePath(status_file, arch_file, ".ready");
676 :
677 624 : if (stat(status_file, &st) == 0)
678 : {
679 624 : strcpy(xlog, arch_file);
680 624 : return true;
681 : }
682 0 : else if (errno != ENOENT)
683 0 : ereport(ERROR,
684 : (errcode_for_file_access(),
685 : errmsg("could not stat file \"%s\": %m", status_file)));
686 : }
687 :
688 : /* arch_heap is probably empty, but let's make sure */
689 266 : binaryheap_reset(arch_files->arch_heap);
690 :
691 : /*
692 : * Open the archive status directory and read through the list of files
693 : * with the .ready suffix, looking for the earliest files.
694 : */
695 266 : snprintf(XLogArchiveStatusDir, MAXPGPATH, XLOGDIR "/archive_status");
696 266 : rldir = AllocateDir(XLogArchiveStatusDir);
697 :
698 2762 : while ((rlde = ReadDir(rldir, XLogArchiveStatusDir)) != NULL)
699 : {
700 2496 : int basenamelen = (int) strlen(rlde->d_name) - 6;
701 : char basename[MAX_XFN_CHARS + 1];
702 : char *arch_file;
703 :
704 : /* Ignore entries with unexpected number of characters */
705 2496 : if (basenamelen < MIN_XFN_CHARS ||
706 : basenamelen > MAX_XFN_CHARS)
707 1610 : continue;
708 :
709 : /* Ignore entries with unexpected characters */
710 1928 : if (strspn(rlde->d_name, VALID_XFN_CHARS) < basenamelen)
711 0 : continue;
712 :
713 : /* Ignore anything not suffixed with .ready */
714 1928 : if (strcmp(rlde->d_name + basenamelen, ".ready") != 0)
715 1042 : continue;
716 :
717 : /* Truncate off the .ready */
718 886 : memcpy(basename, rlde->d_name, basenamelen);
719 886 : basename[basenamelen] = '\0';
720 :
721 : /*
722 : * Store the file in our max-heap if it has a high enough priority.
723 : */
724 886 : if (binaryheap_size(arch_files->arch_heap) < NUM_FILES_PER_DIRECTORY_SCAN)
725 : {
726 : /* If the heap isn't full yet, quickly add it. */
727 748 : arch_file = arch_files->arch_filenames[binaryheap_size(arch_files->arch_heap)];
728 748 : strcpy(arch_file, basename);
729 748 : binaryheap_add_unordered(arch_files->arch_heap, CStringGetDatum(arch_file));
730 :
731 : /* If we just filled the heap, make it a valid one. */
732 748 : if (binaryheap_size(arch_files->arch_heap) == NUM_FILES_PER_DIRECTORY_SCAN)
733 4 : binaryheap_build(arch_files->arch_heap);
734 : }
735 138 : else if (ready_file_comparator(binaryheap_first(arch_files->arch_heap),
736 : CStringGetDatum(basename), NULL) > 0)
737 : {
738 : /*
739 : * Remove the lowest priority file and add the current one to the
740 : * heap.
741 : */
742 98 : arch_file = DatumGetCString(binaryheap_remove_first(arch_files->arch_heap));
743 98 : strcpy(arch_file, basename);
744 98 : binaryheap_add(arch_files->arch_heap, CStringGetDatum(arch_file));
745 : }
746 : }
747 266 : FreeDir(rldir);
748 :
749 : /* If no files were found, simply return. */
750 266 : if (binaryheap_empty(arch_files->arch_heap))
751 146 : return false;
752 :
753 : /*
754 : * If we didn't fill the heap, we didn't make it a valid one. Do that
755 : * now.
756 : */
757 120 : if (binaryheap_size(arch_files->arch_heap) < NUM_FILES_PER_DIRECTORY_SCAN)
758 116 : binaryheap_build(arch_files->arch_heap);
759 :
760 : /*
761 : * Fill arch_files array with the files to archive in ascending order of
762 : * priority.
763 : */
764 120 : arch_files->arch_files_size = binaryheap_size(arch_files->arch_heap);
765 868 : for (int i = 0; i < arch_files->arch_files_size; i++)
766 748 : arch_files->arch_files[i] = DatumGetCString(binaryheap_remove_first(arch_files->arch_heap));
767 :
768 : /* Return the highest priority file. */
769 120 : arch_files->arch_files_size--;
770 120 : strcpy(xlog, arch_files->arch_files[arch_files->arch_files_size]);
771 :
772 120 : return true;
773 : }
774 :
775 : /*
776 : * ready_file_comparator
777 : *
778 : * Compares the archival priority of the given files to archive. If "a"
779 : * has a higher priority than "b", a negative value will be returned. If
780 : * "b" has a higher priority than "a", a positive value will be returned.
781 : * If "a" and "b" have equivalent values, 0 will be returned.
782 : */
783 : static int
784 6206 : ready_file_comparator(Datum a, Datum b, void *arg)
785 : {
786 6206 : char *a_str = DatumGetCString(a);
787 6206 : char *b_str = DatumGetCString(b);
788 6206 : bool a_history = IsTLHistoryFileName(a_str);
789 6206 : bool b_history = IsTLHistoryFileName(b_str);
790 :
791 : /* Timeline history files always have the highest priority. */
792 6206 : if (a_history != b_history)
793 4 : return a_history ? -1 : 1;
794 :
795 : /* Priority is given to older files. */
796 6202 : return strcmp(a_str, b_str);
797 : }
798 :
799 : /*
800 : * PgArchForceDirScan
801 : *
802 : * When called, the next call to pgarch_readyXlog() will perform a
803 : * directory scan. This is useful for ensuring that important files such
804 : * as timeline history files are archived as quickly as possible.
805 : */
806 : void
807 28 : PgArchForceDirScan(void)
808 : {
809 28 : pg_atomic_write_membarrier_u32(&PgArch->force_dir_scan, 1);
810 28 : }
811 :
812 : /*
813 : * pgarch_archiveDone
814 : *
815 : * Emit notification that an xlog file has been successfully archived.
816 : * We do this by renaming the status file from NNN.ready to NNN.done.
817 : * Eventually, a checkpoint process will notice this and delete both the
818 : * NNN.done file and the xlog file itself.
819 : */
820 : static void
821 734 : pgarch_archiveDone(char *xlog)
822 : {
823 : char rlogready[MAXPGPATH];
824 : char rlogdone[MAXPGPATH];
825 :
826 734 : StatusFilePath(rlogready, xlog, ".ready");
827 734 : StatusFilePath(rlogdone, xlog, ".done");
828 :
829 : /*
830 : * To avoid extra overhead, we don't durably rename the .ready file to
831 : * .done. Archive commands and libraries must gracefully handle attempts
832 : * to re-archive files (e.g., if the server crashes just before this
833 : * function is called), so it should be okay if the .ready file reappears
834 : * after a crash.
835 : */
836 734 : if (rename(rlogready, rlogdone) < 0)
837 0 : ereport(WARNING,
838 : (errcode_for_file_access(),
839 : errmsg("could not rename file \"%s\" to \"%s\": %m",
840 : rlogready, rlogdone)));
841 734 : }
842 :
843 :
844 : /*
845 : * pgarch_die
846 : *
847 : * Exit-time cleanup handler
848 : */
849 : static void
850 34 : pgarch_die(int code, Datum arg)
851 : {
852 34 : PgArch->pgprocno = INVALID_PROC_NUMBER;
853 34 : }
854 :
855 : /*
856 : * Interrupt handler for WAL archiver process.
857 : *
858 : * This is called in the loops pgarch_MainLoop and pgarch_ArchiverCopyLoop.
859 : * It checks for barrier events, config update and request for logging of
860 : * memory contexts, but not shutdown request because how to handle
861 : * shutdown request is different between those loops.
862 : */
863 : static void
864 914 : ProcessPgArchInterrupts(void)
865 : {
866 914 : if (ProcSignalBarrierPending)
867 4 : ProcessProcSignalBarrier();
868 :
869 : /* Perform logging of memory contexts of this process */
870 914 : if (LogMemoryContextPending)
871 0 : ProcessLogMemoryContextInterrupt();
872 :
873 914 : if (ConfigReloadPending)
874 : {
875 6 : char *archiveLib = pstrdup(XLogArchiveLibrary);
876 : bool archiveLibChanged;
877 :
878 6 : ConfigReloadPending = false;
879 6 : ProcessConfigFile(PGC_SIGHUP);
880 :
881 6 : if (XLogArchiveLibrary[0] != '\0' && XLogArchiveCommand[0] != '\0')
882 0 : ereport(ERROR,
883 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
884 : errmsg("both \"archive_command\" and \"archive_library\" set"),
885 : errdetail("Only one of \"archive_command\", \"archive_library\" may be set.")));
886 :
887 6 : archiveLibChanged = strcmp(XLogArchiveLibrary, archiveLib) != 0;
888 6 : pfree(archiveLib);
889 :
890 6 : if (archiveLibChanged)
891 : {
892 : /*
893 : * Ideally, we would simply unload the previous archive module and
894 : * load the new one, but there is presently no mechanism for
895 : * unloading a library (see the comment above
896 : * internal_load_library()). To deal with this, we simply restart
897 : * the archiver. The new archive module will be loaded when the
898 : * new archiver process starts up. Note that this triggers the
899 : * module's shutdown callback, if defined.
900 : */
901 0 : ereport(LOG,
902 : (errmsg("restarting archiver process because value of "
903 : "\"archive_library\" was changed")));
904 :
905 0 : proc_exit(0);
906 : }
907 : }
908 914 : }
909 :
910 : /*
911 : * LoadArchiveLibrary
912 : *
913 : * Loads the archiving callbacks into our local ArchiveCallbacks.
914 : */
915 : static void
916 34 : LoadArchiveLibrary(void)
917 : {
918 : ArchiveModuleInit archive_init;
919 :
920 34 : if (XLogArchiveLibrary[0] != '\0' && XLogArchiveCommand[0] != '\0')
921 0 : ereport(ERROR,
922 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
923 : errmsg("both \"archive_command\" and \"archive_library\" set"),
924 : errdetail("Only one of \"archive_command\", \"archive_library\" may be set.")));
925 :
926 : /*
927 : * If shell archiving is enabled, use our special initialization function.
928 : * Otherwise, load the library and call its _PG_archive_module_init().
929 : */
930 34 : if (XLogArchiveLibrary[0] == '\0')
931 32 : archive_init = shell_archive_init;
932 : else
933 2 : archive_init = (ArchiveModuleInit)
934 2 : load_external_function(XLogArchiveLibrary,
935 : "_PG_archive_module_init", false, NULL);
936 :
937 34 : if (archive_init == NULL)
938 0 : ereport(ERROR,
939 : (errmsg("archive modules have to define the symbol %s", "_PG_archive_module_init")));
940 :
941 34 : ArchiveCallbacks = (*archive_init) ();
942 :
943 34 : if (ArchiveCallbacks->archive_file_cb == NULL)
944 0 : ereport(ERROR,
945 : (errmsg("archive modules must register an archive callback")));
946 :
947 34 : archive_module_state = palloc0_object(ArchiveModuleState);
948 34 : if (ArchiveCallbacks->startup_cb != NULL)
949 0 : ArchiveCallbacks->startup_cb(archive_module_state);
950 :
951 34 : before_shmem_exit(pgarch_call_module_shutdown_cb, 0);
952 34 : }
953 :
954 : /*
955 : * Call the shutdown callback of the loaded archive module, if defined.
956 : */
957 : static void
958 34 : pgarch_call_module_shutdown_cb(int code, Datum arg)
959 : {
960 34 : if (ArchiveCallbacks->shutdown_cb != NULL)
961 32 : ArchiveCallbacks->shutdown_cb(archive_module_state);
962 34 : }
|