Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pgarch.c
4 : *
5 : * PostgreSQL WAL archiver
6 : *
7 : * All functions relating to archiver are included here
8 : *
9 : * - All functions executed by archiver process
10 : *
11 : * - archiver is forked from postmaster, and the two
12 : * processes then communicate using signals. All functions
13 : * executed by postmaster are included in this file.
14 : *
15 : * Initial author: Simon Riggs simon@2ndquadrant.com
16 : *
17 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
18 : * Portions Copyright (c) 1994, Regents of the University of California
19 : *
20 : *
21 : * IDENTIFICATION
22 : * src/backend/postmaster/pgarch.c
23 : *
24 : *-------------------------------------------------------------------------
25 : */
26 : #include "postgres.h"
27 :
28 : #include <time.h>
29 : #include <sys/stat.h>
30 : #include <unistd.h>
31 :
32 : #include "access/xlog.h"
33 : #include "access/xlog_internal.h"
34 : #include "archive/archive_module.h"
35 : #include "archive/shell_archive.h"
36 : #include "lib/binaryheap.h"
37 : #include "libpq/pqsignal.h"
38 : #include "pgstat.h"
39 : #include "postmaster/auxprocess.h"
40 : #include "postmaster/interrupt.h"
41 : #include "postmaster/pgarch.h"
42 : #include "storage/condition_variable.h"
43 : #include "storage/aio_subsys.h"
44 : #include "storage/fd.h"
45 : #include "storage/ipc.h"
46 : #include "storage/latch.h"
47 : #include "storage/pmsignal.h"
48 : #include "storage/proc.h"
49 : #include "storage/procsignal.h"
50 : #include "storage/shmem.h"
51 : #include "utils/guc.h"
52 : #include "utils/memutils.h"
53 : #include "utils/ps_status.h"
54 : #include "utils/resowner.h"
55 : #include "utils/timeout.h"
56 : #include "utils/wait_event.h"
57 :
58 :
/* ----------
 * Timer definitions.
 *
 * Both values are expressed in seconds.
 * ----------
 */
#define PGARCH_AUTOWAKE_INTERVAL 60 /* How often to force a poll of the
									 * archive status directory; in seconds.
									 * Used as the WaitLatch() timeout in
									 * pgarch_MainLoop(). */
#define PGARCH_RESTART_INTERVAL 10	/* How often to attempt to restart a
									 * failed archiver; in seconds.  Checked
									 * by PgArchCanRestart(). */

/*
 * Maximum number of retries allowed when attempting to archive a WAL
 * file.  Once this many consecutive attempts on one file have failed,
 * pgarch_ArchiverCopyLoop() gives up for the current cycle.
 */
#define NUM_ARCHIVE_RETRIES 3

/*
 * Maximum number of retries allowed when attempting to remove an
 * orphan archive status file.  Once this many unlink() attempts have
 * failed, pgarch_ArchiverCopyLoop() gives up for the current cycle.
 */
#define NUM_ORPHAN_CLEANUP_RETRIES 3

/*
 * Maximum number of .ready files to gather per directory scan.  This is
 * also the capacity of the heap and of the file name arrays in
 * struct arch_files_state.
 */
#define NUM_FILES_PER_DIRECTORY_SCAN 64
84 :
/*
 * Shared memory area for archiver process
 *
 * Allocated and initialized by PgArchShmemInit().
 */
typedef struct PgArchData
{
	int			pgprocno;		/* proc number of archiver process, or
								 * INVALID_PROC_NUMBER when no archiver has
								 * advertised itself */

	/*
	 * Forces a directory scan in pgarch_readyXlog().  Set to 1 by
	 * PgArchForceDirScan(); pgarch_readyXlog() atomically swaps it back to 0
	 * and discards its cached file names when it finds 1.
	 */
	pg_atomic_uint32 force_dir_scan;
} PgArchData;
95 :
/*
 * Name of the archive library to load.  An empty string makes
 * LoadArchiveLibrary() use shell_archive_init instead of loading a library.
 */
char	   *XLogArchiveLibrary = "";

/*
 * Reset to NULL before each check_configured_cb call; if non-NULL afterwards
 * it is attached as the detail of the "archiving is not configured" WARNING.
 */
char	   *arch_module_check_errdetail_string;


/* ----------
 * Local data
 * ----------
 */
/* time at which pgarch_MainLoop first saw a shutdown request; 0 = not yet */
static time_t last_sigterm_time = 0;
/* shared-memory state, set up by PgArchShmemInit() */
static PgArchData *PgArch = NULL;
/* callback table returned by the archive module's init function */
static const ArchiveModuleCallbacks *ArchiveCallbacks;
/* state object passed to every archive module callback */
static ArchiveModuleState *archive_module_state;
/* memory context in which archive_file_cb runs; reset after each attempt */
static MemoryContext archive_context;
109 :
110 :
/*
 * Stuff for tracking multiple files to archive from each scan of
 * archive_status.  Minimizing the number of directory scans when there are
 * many files to archive can significantly improve archival rate.
 *
 * arch_heap is a max-heap that is used during the directory scan to track
 * the highest-priority files to archive.  After the directory scan
 * completes, the file names are stored in ascending order of priority in
 * arch_files.  pgarch_readyXlog() returns files from arch_files until it
 * is empty, at which point another directory scan must be performed.
 *
 * Because arch_files is in ascending order of priority, pgarch_readyXlog()
 * consumes it from the end: it decrements arch_files_size and returns the
 * entry at that index.
 *
 * We only need this data in the archiver process, so make it a palloc'd
 * struct rather than a bunch of static arrays.
 */
struct arch_files_state
{
	binaryheap *arch_heap;
	int			arch_files_size;	/* number of live entries in arch_files[] */
	char	   *arch_files[NUM_FILES_PER_DIRECTORY_SCAN];
	/* buffers underlying heap, and later arch_files[], entries: */
	char		arch_filenames[NUM_FILES_PER_DIRECTORY_SCAN][MAX_XFN_CHARS + 1];
};

/* Allocated in PgArchiverMain(); NULL until then */
static struct arch_files_state *arch_files = NULL;
135 :
/*
 * Flags set by interrupt handlers for later service in the main loop.
 *
 * ready_to_stop is set by pgarch_waken_stop() (the SIGUSR2 handler) and read
 * at the top of each pgarch_MainLoop() iteration.
 */
static volatile sig_atomic_t ready_to_stop = false;
140 :
/* ----------
 * Local function forward declarations
 *
 * All of these are private to this file.
 * ----------
 */
/* signal handling and main control loops */
static void pgarch_waken_stop(SIGNAL_ARGS);
static void pgarch_MainLoop(void);
static void pgarch_ArchiverCopyLoop(void);
/* per-file work: archive one file, pick the next file, mark a file done */
static bool pgarch_archiveXlog(char *xlog);
static bool pgarch_readyXlog(char *xlog);
static void pgarch_archiveDone(char *xlog);
/* exit callback, interrupt servicing, heap ordering */
static void pgarch_die(int code, Datum arg);
static void ProcessPgArchInterrupts(void);
static int	ready_file_comparator(Datum a, Datum b, void *arg);
/* archive module loading and shutdown */
static void LoadArchiveLibrary(void);
static void pgarch_call_module_shutdown_cb(int code, Datum arg);
156 :
157 : /* Report shared memory space needed by PgArchShmemInit */
158 : Size
159 4483 : PgArchShmemSize(void)
160 : {
161 4483 : Size size = 0;
162 :
163 4483 : size = add_size(size, sizeof(PgArchData));
164 :
165 4483 : return size;
166 : }
167 :
168 : /* Allocate and initialize archiver-related shared memory */
169 : void
170 1159 : PgArchShmemInit(void)
171 : {
172 : bool found;
173 :
174 1159 : PgArch = (PgArchData *)
175 1159 : ShmemInitStruct("Archiver Data", PgArchShmemSize(), &found);
176 :
177 1159 : if (!found)
178 : {
179 : /* First time through, so initialize */
180 2318 : MemSet(PgArch, 0, PgArchShmemSize());
181 1159 : PgArch->pgprocno = INVALID_PROC_NUMBER;
182 1159 : pg_atomic_init_u32(&PgArch->force_dir_scan, 0);
183 : }
184 1159 : }
185 :
186 : /*
187 : * PgArchCanRestart
188 : *
189 : * Return true, indicating archiver is allowed to restart, if enough time has
190 : * passed since it was last launched to reach PGARCH_RESTART_INTERVAL.
191 : * Otherwise return false.
192 : *
193 : * This is a safety valve to protect against continuous respawn attempts if the
194 : * archiver is dying immediately at launch. Note that since we will retry to
195 : * launch the archiver from the postmaster main loop, we will get another
196 : * chance later.
197 : */
198 : bool
199 54 : PgArchCanRestart(void)
200 : {
201 : static time_t last_pgarch_start_time = 0;
202 54 : time_t curtime = time(NULL);
203 :
204 : /*
205 : * If first time through, or time somehow went backwards, always update
206 : * last_pgarch_start_time to match the current clock and allow archiver
207 : * start. Otherwise allow it only once enough time has elapsed.
208 : */
209 54 : if (last_pgarch_start_time == 0 ||
210 0 : curtime < last_pgarch_start_time ||
211 0 : curtime - last_pgarch_start_time >= PGARCH_RESTART_INTERVAL)
212 : {
213 54 : last_pgarch_start_time = curtime;
214 54 : return true;
215 : }
216 0 : return false;
217 : }
218 :
219 :
220 : /* Main entry point for archiver process */
221 : void
222 17 : PgArchiverMain(const void *startup_data, size_t startup_data_len)
223 : {
224 : Assert(startup_data_len == 0);
225 :
226 17 : AuxiliaryProcessMainCommon();
227 :
228 : /*
229 : * Ignore all signals usually bound to some action in the postmaster,
230 : * except for SIGHUP, SIGTERM, SIGUSR1, SIGUSR2, and SIGQUIT.
231 : */
232 17 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
233 17 : pqsignal(SIGINT, SIG_IGN);
234 17 : pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
235 : /* SIGQUIT handler was already set up by InitPostmasterChild */
236 17 : pqsignal(SIGALRM, SIG_IGN);
237 17 : pqsignal(SIGPIPE, SIG_IGN);
238 17 : pqsignal(SIGUSR1, procsignal_sigusr1_handler);
239 17 : pqsignal(SIGUSR2, pgarch_waken_stop);
240 :
241 : /* Reset some signals that are accepted by postmaster but not here */
242 17 : pqsignal(SIGCHLD, SIG_DFL);
243 :
244 : /* Unblock signals (they were blocked when the postmaster forked us) */
245 17 : sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
246 :
247 : /* We shouldn't be launched unnecessarily. */
248 : Assert(XLogArchivingActive());
249 :
250 : /* Arrange to clean up at archiver exit */
251 17 : on_shmem_exit(pgarch_die, 0);
252 :
253 : /*
254 : * Advertise our proc number so that backends can use our latch to wake us
255 : * up while we're sleeping.
256 : */
257 17 : PgArch->pgprocno = MyProcNumber;
258 :
259 : /* Create workspace for pgarch_readyXlog() */
260 17 : arch_files = palloc_object(struct arch_files_state);
261 17 : arch_files->arch_files_size = 0;
262 :
263 : /* Initialize our max-heap for prioritizing files to archive. */
264 17 : arch_files->arch_heap = binaryheap_allocate(NUM_FILES_PER_DIRECTORY_SCAN,
265 : ready_file_comparator, NULL);
266 :
267 : /* Initialize our memory context. */
268 17 : archive_context = AllocSetContextCreate(TopMemoryContext,
269 : "archiver",
270 : ALLOCSET_DEFAULT_SIZES);
271 :
272 : /* Load the archive_library. */
273 17 : LoadArchiveLibrary();
274 :
275 17 : pgarch_MainLoop();
276 :
277 17 : proc_exit(0);
278 : }
279 :
280 : /*
281 : * Wake up the archiver
282 : */
283 : void
284 469 : PgArchWakeup(void)
285 : {
286 469 : int arch_pgprocno = PgArch->pgprocno;
287 :
288 : /*
289 : * We don't acquire ProcArrayLock here. It's actually fine because
290 : * procLatch isn't ever freed, so we just can potentially set the wrong
291 : * process' (or no process') latch. Even in that case the archiver will
292 : * be relaunched shortly and will start archiving.
293 : */
294 469 : if (arch_pgprocno != INVALID_PROC_NUMBER)
295 450 : SetLatch(&GetPGProcByNumber(arch_pgprocno)->procLatch);
296 469 : }
297 :
298 :
299 : /* SIGUSR2 signal handler for archiver process */
300 : static void
301 17 : pgarch_waken_stop(SIGNAL_ARGS)
302 : {
303 : /* set flag to do a final cycle and shut down afterwards */
304 17 : ready_to_stop = true;
305 17 : SetLatch(MyLatch);
306 17 : }
307 :
308 : /*
309 : * pgarch_MainLoop
310 : *
311 : * Main loop for archiver
312 : */
313 : static void
314 17 : pgarch_MainLoop(void)
315 : {
316 : bool time_to_stop;
317 :
318 : /*
319 : * There shouldn't be anything for the archiver to do except to wait for a
320 : * signal ... however, the archiver exists to protect our data, so it
321 : * wakes up occasionally to allow itself to be proactive.
322 : */
323 : do
324 : {
325 74 : ResetLatch(MyLatch);
326 :
327 : /* When we get SIGUSR2, we do one more archive cycle, then exit */
328 74 : time_to_stop = ready_to_stop;
329 :
330 : /* Check for barrier events and config update */
331 74 : ProcessPgArchInterrupts();
332 :
333 : /*
334 : * If we've gotten SIGTERM, we normally just sit and do nothing until
335 : * SIGUSR2 arrives. However, that means a random SIGTERM would
336 : * disable archiving indefinitely, which doesn't seem like a good
337 : * idea. If more than 60 seconds pass since SIGTERM, exit anyway, so
338 : * that the postmaster can start a new archiver if needed. Also exit
339 : * if time unexpectedly goes backward.
340 : */
341 74 : if (ShutdownRequestPending)
342 : {
343 0 : time_t curtime = time(NULL);
344 :
345 0 : if (last_sigterm_time == 0)
346 0 : last_sigterm_time = curtime;
347 0 : else if (curtime < last_sigterm_time ||
348 0 : curtime - last_sigterm_time >= 60)
349 : break;
350 : }
351 :
352 : /* Do what we're here for */
353 74 : pgarch_ArchiverCopyLoop();
354 :
355 : /*
356 : * Sleep until a signal is received, or until a poll is forced by
357 : * PGARCH_AUTOWAKE_INTERVAL, or until postmaster dies.
358 : */
359 74 : if (!time_to_stop) /* Don't wait during last iteration */
360 : {
361 : int rc;
362 :
363 57 : rc = WaitLatch(MyLatch,
364 : WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
365 : PGARCH_AUTOWAKE_INTERVAL * 1000L,
366 : WAIT_EVENT_ARCHIVER_MAIN);
367 57 : if (rc & WL_POSTMASTER_DEATH)
368 0 : time_to_stop = true;
369 : }
370 :
371 : /*
372 : * The archiver quits either when the postmaster dies (not expected)
373 : * or after completing one more archiving cycle after receiving
374 : * SIGUSR2.
375 : */
376 74 : } while (!time_to_stop);
377 17 : }
378 :
379 : /*
380 : * pgarch_ArchiverCopyLoop
381 : *
382 : * Archives all outstanding xlogs then returns
383 : */
384 : static void
385 74 : pgarch_ArchiverCopyLoop(void)
386 : {
387 : char xlog[MAX_XFN_CHARS + 1];
388 :
389 : /* force directory scan in the first call to pgarch_readyXlog() */
390 74 : arch_files->arch_files_size = 0;
391 :
392 : /*
393 : * loop through all xlogs with archive_status of .ready and archive
394 : * them...mostly we expect this to be a single file, though it is possible
395 : * some backend will add files onto the list of those that need archiving
396 : * while we are still copying earlier archives
397 : */
398 440 : while (pgarch_readyXlog(xlog))
399 : {
400 370 : int failures = 0;
401 370 : int failures_orphan = 0;
402 :
403 : for (;;)
404 5 : {
405 : struct stat stat_buf;
406 : char pathname[MAXPGPATH];
407 :
408 : /*
409 : * Do not initiate any more archive commands after receiving
410 : * SIGTERM, nor after the postmaster has died unexpectedly. The
411 : * first condition is to try to keep from having init SIGKILL the
412 : * command, and the second is to avoid conflicts with another
413 : * archiver spawned by a newer postmaster.
414 : */
415 375 : if (ShutdownRequestPending || !PostmasterIsAlive())
416 4 : return;
417 :
418 : /*
419 : * Check for barrier events and config update. This is so that
420 : * we'll adopt a new setting for archive_command as soon as
421 : * possible, even if there is a backlog of files to be archived.
422 : */
423 375 : ProcessPgArchInterrupts();
424 :
425 : /* Reset variables that might be set by the callback */
426 375 : arch_module_check_errdetail_string = NULL;
427 :
428 : /* can't do anything if not configured ... */
429 375 : if (ArchiveCallbacks->check_configured_cb != NULL &&
430 375 : !ArchiveCallbacks->check_configured_cb(archive_module_state))
431 : {
432 2 : ereport(WARNING,
433 : (errmsg("\"archive_mode\" enabled, yet archiving is not configured"),
434 : arch_module_check_errdetail_string ?
435 : errdetail_internal("%s", arch_module_check_errdetail_string) : 0));
436 2 : return;
437 : }
438 :
439 : /*
440 : * Since archive status files are not removed in a durable manner,
441 : * a system crash could leave behind .ready files for WAL segments
442 : * that have already been recycled or removed. In this case,
443 : * simply remove the orphan status file and move on. unlink() is
444 : * used here as even on subsequent crashes the same orphan files
445 : * would get removed, so there is no need to worry about
446 : * durability.
447 : */
448 373 : snprintf(pathname, MAXPGPATH, XLOGDIR "/%s", xlog);
449 373 : if (stat(pathname, &stat_buf) != 0 && errno == ENOENT)
450 0 : {
451 : char xlogready[MAXPGPATH];
452 :
453 0 : StatusFilePath(xlogready, xlog, ".ready");
454 0 : if (unlink(xlogready) == 0)
455 : {
456 0 : ereport(WARNING,
457 : (errmsg("removed orphan archive status file \"%s\"",
458 : xlogready)));
459 :
460 : /* leave loop and move to the next status file */
461 0 : break;
462 : }
463 :
464 0 : if (++failures_orphan >= NUM_ORPHAN_CLEANUP_RETRIES)
465 : {
466 0 : ereport(WARNING,
467 : (errmsg("removal of orphan archive status file \"%s\" failed too many times, will try again later",
468 : xlogready)));
469 :
470 : /* give up cleanup of orphan status files */
471 0 : return;
472 : }
473 :
474 : /* wait a bit before retrying */
475 0 : pg_usleep(1000000L);
476 0 : continue;
477 : }
478 :
479 373 : if (pgarch_archiveXlog(xlog))
480 : {
481 : /* successful */
482 366 : pgarch_archiveDone(xlog);
483 :
484 : /*
485 : * Tell the cumulative stats system about the WAL file that we
486 : * successfully archived
487 : */
488 366 : pgstat_report_archiver(xlog, false);
489 :
490 366 : break; /* out of inner retry loop */
491 : }
492 : else
493 : {
494 : /*
495 : * Tell the cumulative stats system about the WAL file that we
496 : * failed to archive
497 : */
498 7 : pgstat_report_archiver(xlog, true);
499 :
500 7 : if (++failures >= NUM_ARCHIVE_RETRIES)
501 : {
502 2 : ereport(WARNING,
503 : (errmsg("archiving write-ahead log file \"%s\" failed too many times, will try again later",
504 : xlog)));
505 2 : return; /* give up archiving for now */
506 : }
507 5 : pg_usleep(1000000L); /* wait a bit before retrying */
508 : }
509 : }
510 : }
511 : }
512 :
/*
 * pgarch_archiveXlog
 *
 * Invokes archive_file_cb to copy one archive file to wherever it should go
 *
 * "xlog" is the bare file name.  The path handed to the callback is built
 * here as XLOGDIR "/" followed by that name.
 *
 * The PS display is set to "archiving <file>" before the attempt and to
 * "last was <file>" or "failed on <file>" afterwards.
 *
 * The callback runs with archive_context as the current memory context.
 * The caller's context is restored, and archive_context is reset, on both
 * the normal path and the error path.
 *
 * Returns true if successful.  Returns false if the callback returns false
 * or if an ERROR is caught by the local exception handler.
 */
static bool
pgarch_archiveXlog(char *xlog)
{
	sigjmp_buf	local_sigjmp_buf;
	MemoryContext oldcontext;
	char		pathname[MAXPGPATH];
	char		activitymsg[MAXFNAMELEN + 16];
	bool		ret;

	snprintf(pathname, MAXPGPATH, XLOGDIR "/%s", xlog);

	/* Report archive activity in PS display */
	snprintf(activitymsg, sizeof(activitymsg), "archiving %s", xlog);
	set_ps_display(activitymsg);

	/* Everything the callback allocates goes into archive_context */
	oldcontext = MemoryContextSwitchTo(archive_context);

	/*
	 * Since the archiver operates at the bottom of the exception stack,
	 * ERRORs turn into FATALs and cause the archiver process to restart.
	 * However, using ereport(ERROR, ...) when there are problems is easy to
	 * code and maintain.  Therefore, we create our own exception handler to
	 * catch ERRORs and return false instead of restarting the archiver
	 * whenever there is a failure.
	 *
	 * We assume ERRORs from the archiving callback are the most common
	 * exceptions experienced by the archiver, so we opt to handle exceptions
	 * here instead of PgArchiverMain() to avoid reinitializing the archiver
	 * too frequently.  We could instead add a sigsetjmp() block to
	 * PgArchiverMain() and use PG_TRY/PG_CATCH here, but the extra code to
	 * avoid the odd archiver restart doesn't seem worth it.
	 *
	 * The first branch below is the recovery path, entered when sigsetjmp()
	 * returns nonzero; the else branch is the normal path.
	 */
	if (sigsetjmp(local_sigjmp_buf, 1) != 0)
	{
		/* Since not using PG_TRY, must reset error stack by hand */
		error_context_stack = NULL;

		/* Prevent interrupts while cleaning up */
		HOLD_INTERRUPTS();

		/* Report the error to the server log. */
		EmitErrorReport();

		/*
		 * Try to clean up anything the archive module left behind.  We try to
		 * cover anything that an archive module could conceivably have left
		 * behind, but it is of course possible that modules could be doing
		 * unexpected things that require additional cleanup.  Module authors
		 * should be sure to do any extra required cleanup in a PG_CATCH block
		 * within the archiving callback, and they are encouraged to notify
		 * the pgsql-hackers mailing list so that we can add it here.
		 */
		disable_all_timeouts(false);
		LWLockReleaseAll();
		ConditionVariableCancelSleep();
		pgstat_report_wait_end();
		pgaio_error_cleanup();
		ReleaseAuxProcessResources(false);
		AtEOXact_Files(false);
		AtEOXact_HashTables(false);

		/*
		 * Return to the original memory context and clear ErrorContext for
		 * next time.
		 */
		MemoryContextSwitchTo(oldcontext);
		FlushErrorState();

		/* Flush any leaked data */
		MemoryContextReset(archive_context);

		/* Remove our exception handler */
		PG_exception_stack = NULL;

		/* Now we can allow interrupts again */
		RESUME_INTERRUPTS();

		/* Report failure so that the archiver retries this file */
		ret = false;
	}
	else
	{
		/* Enable our exception handler */
		PG_exception_stack = &local_sigjmp_buf;

		/* Archive the file! */
		ret = ArchiveCallbacks->archive_file_cb(archive_module_state,
												xlog, pathname);

		/* Remove our exception handler */
		PG_exception_stack = NULL;

		/* Reset our memory context and switch back to the original one */
		MemoryContextSwitchTo(oldcontext);
		MemoryContextReset(archive_context);
	}

	/* Show the outcome of this attempt in the PS display */
	if (ret)
		snprintf(activitymsg, sizeof(activitymsg), "last was %s", xlog);
	else
		snprintf(activitymsg, sizeof(activitymsg), "failed on %s", xlog);
	set_ps_display(activitymsg);

	return ret;
}
625 :
626 : /*
627 : * pgarch_readyXlog
628 : *
629 : * Return name of the oldest xlog file that has not yet been archived.
630 : * No notification is set that file archiving is now in progress, so
631 : * this would need to be extended if multiple concurrent archival
632 : * tasks were created. If a failure occurs, we will completely
633 : * re-copy the file at the next available opportunity.
634 : *
635 : * It is important that we return the oldest, so that we archive xlogs
636 : * in order that they were written, for two reasons:
637 : * 1) to maintain the sequential chain of xlogs required for recovery
638 : * 2) because the oldest ones will sooner become candidates for
639 : * recycling at time of checkpoint
640 : *
641 : * NOTE: the "oldest" comparison will consider any .history file to be older
642 : * than any other file except another .history file. Segments on a timeline
643 : * with a smaller ID will be older than all segments on a timeline with a
644 : * larger ID; the net result being that past timelines are given higher
645 : * priority for archiving. This seems okay, or at least not obviously worth
646 : * changing.
647 : */
648 : static bool
649 440 : pgarch_readyXlog(char *xlog)
650 : {
651 : char XLogArchiveStatusDir[MAXPGPATH];
652 : DIR *rldir;
653 : struct dirent *rlde;
654 :
655 : /*
656 : * If a directory scan was requested, clear the stored file names and
657 : * proceed.
658 : */
659 440 : if (pg_atomic_exchange_u32(&PgArch->force_dir_scan, 0) == 1)
660 3 : arch_files->arch_files_size = 0;
661 :
662 : /*
663 : * If we still have stored file names from the previous directory scan,
664 : * try to return one of those. We check to make sure the status file is
665 : * still present, as the archive_command for a previous file may have
666 : * already marked it done.
667 : */
668 440 : while (arch_files->arch_files_size > 0)
669 : {
670 : struct stat st;
671 : char status_file[MAXPGPATH];
672 : char *arch_file;
673 :
674 314 : arch_files->arch_files_size--;
675 314 : arch_file = arch_files->arch_files[arch_files->arch_files_size];
676 314 : StatusFilePath(status_file, arch_file, ".ready");
677 :
678 314 : if (stat(status_file, &st) == 0)
679 : {
680 314 : strcpy(xlog, arch_file);
681 314 : return true;
682 : }
683 0 : else if (errno != ENOENT)
684 0 : ereport(ERROR,
685 : (errcode_for_file_access(),
686 : errmsg("could not stat file \"%s\": %m", status_file)));
687 : }
688 :
689 : /* arch_heap is probably empty, but let's make sure */
690 126 : binaryheap_reset(arch_files->arch_heap);
691 :
692 : /*
693 : * Open the archive status directory and read through the list of files
694 : * with the .ready suffix, looking for the earliest files.
695 : */
696 126 : snprintf(XLogArchiveStatusDir, MAXPGPATH, XLOGDIR "/archive_status");
697 126 : rldir = AllocateDir(XLogArchiveStatusDir);
698 :
699 1299 : while ((rlde = ReadDir(rldir, XLogArchiveStatusDir)) != NULL)
700 : {
701 1173 : int basenamelen = (int) strlen(rlde->d_name) - 6;
702 : char basename[MAX_XFN_CHARS + 1];
703 : char *arch_file;
704 :
705 : /* Ignore entries with unexpected number of characters */
706 1173 : if (basenamelen < MIN_XFN_CHARS ||
707 : basenamelen > MAX_XFN_CHARS)
708 746 : continue;
709 :
710 : /* Ignore entries with unexpected characters */
711 899 : if (strspn(rlde->d_name, VALID_XFN_CHARS) < basenamelen)
712 0 : continue;
713 :
714 : /* Ignore anything not suffixed with .ready */
715 899 : if (strcmp(rlde->d_name + basenamelen, ".ready") != 0)
716 472 : continue;
717 :
718 : /* Truncate off the .ready */
719 427 : memcpy(basename, rlde->d_name, basenamelen);
720 427 : basename[basenamelen] = '\0';
721 :
722 : /*
723 : * Store the file in our max-heap if it has a high enough priority.
724 : */
725 427 : if (binaryheap_size(arch_files->arch_heap) < NUM_FILES_PER_DIRECTORY_SCAN)
726 : {
727 : /* If the heap isn't full yet, quickly add it. */
728 370 : arch_file = arch_files->arch_filenames[binaryheap_size(arch_files->arch_heap)];
729 370 : strcpy(arch_file, basename);
730 370 : binaryheap_add_unordered(arch_files->arch_heap, CStringGetDatum(arch_file));
731 :
732 : /* If we just filled the heap, make it a valid one. */
733 370 : if (binaryheap_size(arch_files->arch_heap) == NUM_FILES_PER_DIRECTORY_SCAN)
734 2 : binaryheap_build(arch_files->arch_heap);
735 : }
736 57 : else if (ready_file_comparator(binaryheap_first(arch_files->arch_heap),
737 : CStringGetDatum(basename), NULL) > 0)
738 : {
739 : /*
740 : * Remove the lowest priority file and add the current one to the
741 : * heap.
742 : */
743 42 : arch_file = DatumGetCString(binaryheap_remove_first(arch_files->arch_heap));
744 42 : strcpy(arch_file, basename);
745 42 : binaryheap_add(arch_files->arch_heap, CStringGetDatum(arch_file));
746 : }
747 : }
748 126 : FreeDir(rldir);
749 :
750 : /* If no files were found, simply return. */
751 126 : if (binaryheap_empty(arch_files->arch_heap))
752 70 : return false;
753 :
754 : /*
755 : * If we didn't fill the heap, we didn't make it a valid one. Do that
756 : * now.
757 : */
758 56 : if (binaryheap_size(arch_files->arch_heap) < NUM_FILES_PER_DIRECTORY_SCAN)
759 54 : binaryheap_build(arch_files->arch_heap);
760 :
761 : /*
762 : * Fill arch_files array with the files to archive in ascending order of
763 : * priority.
764 : */
765 56 : arch_files->arch_files_size = binaryheap_size(arch_files->arch_heap);
766 426 : for (int i = 0; i < arch_files->arch_files_size; i++)
767 370 : arch_files->arch_files[i] = DatumGetCString(binaryheap_remove_first(arch_files->arch_heap));
768 :
769 : /* Return the highest priority file. */
770 56 : arch_files->arch_files_size--;
771 56 : strcpy(xlog, arch_files->arch_files[arch_files->arch_files_size]);
772 :
773 56 : return true;
774 : }
775 :
776 : /*
777 : * ready_file_comparator
778 : *
779 : * Compares the archival priority of the given files to archive. If "a"
780 : * has a higher priority than "b", a negative value will be returned. If
781 : * "b" has a higher priority than "a", a positive value will be returned.
782 : * If "a" and "b" have equivalent values, 0 will be returned.
783 : */
784 : static int
785 3008 : ready_file_comparator(Datum a, Datum b, void *arg)
786 : {
787 3008 : char *a_str = DatumGetCString(a);
788 3008 : char *b_str = DatumGetCString(b);
789 3008 : bool a_history = IsTLHistoryFileName(a_str);
790 3008 : bool b_history = IsTLHistoryFileName(b_str);
791 :
792 : /* Timeline history files always have the highest priority. */
793 3008 : if (a_history != b_history)
794 1 : return a_history ? -1 : 1;
795 :
796 : /* Priority is given to older files. */
797 3007 : return strcmp(a_str, b_str);
798 : }
799 :
800 : /*
801 : * PgArchForceDirScan
802 : *
803 : * When called, the next call to pgarch_readyXlog() will perform a
804 : * directory scan. This is useful for ensuring that important files such
805 : * as timeline history files are archived as quickly as possible.
806 : */
807 : void
808 14 : PgArchForceDirScan(void)
809 : {
810 14 : pg_atomic_write_membarrier_u32(&PgArch->force_dir_scan, 1);
811 14 : }
812 :
813 : /*
814 : * pgarch_archiveDone
815 : *
816 : * Emit notification that an xlog file has been successfully archived.
817 : * We do this by renaming the status file from NNN.ready to NNN.done.
818 : * Eventually, a checkpoint process will notice this and delete both the
819 : * NNN.done file and the xlog file itself.
820 : */
821 : static void
822 366 : pgarch_archiveDone(char *xlog)
823 : {
824 : char rlogready[MAXPGPATH];
825 : char rlogdone[MAXPGPATH];
826 :
827 366 : StatusFilePath(rlogready, xlog, ".ready");
828 366 : StatusFilePath(rlogdone, xlog, ".done");
829 :
830 : /*
831 : * To avoid extra overhead, we don't durably rename the .ready file to
832 : * .done. Archive commands and libraries must gracefully handle attempts
833 : * to re-archive files (e.g., if the server crashes just before this
834 : * function is called), so it should be okay if the .ready file reappears
835 : * after a crash.
836 : */
837 366 : if (rename(rlogready, rlogdone) < 0)
838 0 : ereport(WARNING,
839 : (errcode_for_file_access(),
840 : errmsg("could not rename file \"%s\" to \"%s\": %m",
841 : rlogready, rlogdone)));
842 366 : }
843 :
844 :
845 : /*
846 : * pgarch_die
847 : *
848 : * Exit-time cleanup handler
849 : */
850 : static void
851 17 : pgarch_die(int code, Datum arg)
852 : {
853 17 : PgArch->pgprocno = INVALID_PROC_NUMBER;
854 17 : }
855 :
856 : /*
857 : * Interrupt handler for WAL archiver process.
858 : *
859 : * This is called in the loops pgarch_MainLoop and pgarch_ArchiverCopyLoop.
860 : * It checks for barrier events, config update and request for logging of
861 : * memory contexts, but not shutdown request because how to handle
862 : * shutdown request is different between those loops.
863 : */
864 : static void
865 449 : ProcessPgArchInterrupts(void)
866 : {
867 449 : if (ProcSignalBarrierPending)
868 2 : ProcessProcSignalBarrier();
869 :
870 : /* Perform logging of memory contexts of this process */
871 449 : if (LogMemoryContextPending)
872 0 : ProcessLogMemoryContextInterrupt();
873 :
874 449 : if (ConfigReloadPending)
875 : {
876 3 : char *archiveLib = pstrdup(XLogArchiveLibrary);
877 : bool archiveLibChanged;
878 :
879 3 : ConfigReloadPending = false;
880 3 : ProcessConfigFile(PGC_SIGHUP);
881 :
882 3 : if (XLogArchiveLibrary[0] != '\0' && XLogArchiveCommand[0] != '\0')
883 0 : ereport(ERROR,
884 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
885 : errmsg("both \"archive_command\" and \"archive_library\" set"),
886 : errdetail("Only one of \"archive_command\", \"archive_library\" may be set.")));
887 :
888 3 : archiveLibChanged = strcmp(XLogArchiveLibrary, archiveLib) != 0;
889 3 : pfree(archiveLib);
890 :
891 3 : if (archiveLibChanged)
892 : {
893 : /*
894 : * Ideally, we would simply unload the previous archive module and
895 : * load the new one, but there is presently no mechanism for
896 : * unloading a library (see the comment above
897 : * internal_load_library()). To deal with this, we simply restart
898 : * the archiver. The new archive module will be loaded when the
899 : * new archiver process starts up. Note that this triggers the
900 : * module's shutdown callback, if defined.
901 : */
902 0 : ereport(LOG,
903 : (errmsg("restarting archiver process because value of "
904 : "\"archive_library\" was changed")));
905 :
906 0 : proc_exit(0);
907 : }
908 : }
909 449 : }
910 :
911 : /*
912 : * LoadArchiveLibrary
913 : *
914 : * Loads the archiving callbacks into our local ArchiveCallbacks.
915 : */
916 : static void
917 17 : LoadArchiveLibrary(void)
918 : {
919 : ArchiveModuleInit archive_init;
920 :
921 17 : if (XLogArchiveLibrary[0] != '\0' && XLogArchiveCommand[0] != '\0')
922 0 : ereport(ERROR,
923 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
924 : errmsg("both \"archive_command\" and \"archive_library\" set"),
925 : errdetail("Only one of \"archive_command\", \"archive_library\" may be set.")));
926 :
927 : /*
928 : * If shell archiving is enabled, use our special initialization function.
929 : * Otherwise, load the library and call its _PG_archive_module_init().
930 : */
931 17 : if (XLogArchiveLibrary[0] == '\0')
932 16 : archive_init = shell_archive_init;
933 : else
934 1 : archive_init = (ArchiveModuleInit)
935 1 : load_external_function(XLogArchiveLibrary,
936 : "_PG_archive_module_init", false, NULL);
937 :
938 17 : if (archive_init == NULL)
939 0 : ereport(ERROR,
940 : (errmsg("archive modules have to define the symbol %s", "_PG_archive_module_init")));
941 :
942 17 : ArchiveCallbacks = (*archive_init) ();
943 :
944 17 : if (ArchiveCallbacks->archive_file_cb == NULL)
945 0 : ereport(ERROR,
946 : (errmsg("archive modules must register an archive callback")));
947 :
948 17 : archive_module_state = palloc0_object(ArchiveModuleState);
949 17 : if (ArchiveCallbacks->startup_cb != NULL)
950 0 : ArchiveCallbacks->startup_cb(archive_module_state);
951 :
952 17 : before_shmem_exit(pgarch_call_module_shutdown_cb, 0);
953 17 : }
954 :
955 : /*
956 : * Call the shutdown callback of the loaded archive module, if defined.
957 : */
958 : static void
959 17 : pgarch_call_module_shutdown_cb(int code, Datum arg)
960 : {
961 17 : if (ArchiveCallbacks->shutdown_cb != NULL)
962 16 : ArchiveCallbacks->shutdown_cb(archive_module_state);
963 17 : }
|