Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pgarch.c
4 : *
5 : * PostgreSQL WAL archiver
6 : *
7 : * All functions relating to archiver are included here
8 : *
9 : * - All functions executed by archiver process
10 : *
11 : * - archiver is forked from postmaster, and the two
12 : * processes then communicate using signals. All functions
13 : * executed by postmaster are included in this file.
14 : *
15 : * Initial author: Simon Riggs simon@2ndquadrant.com
16 : *
17 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
18 : * Portions Copyright (c) 1994, Regents of the University of California
19 : *
20 : *
21 : * IDENTIFICATION
22 : * src/backend/postmaster/pgarch.c
23 : *
24 : *-------------------------------------------------------------------------
25 : */
26 : #include "postgres.h"
27 :
28 : #include <time.h>
29 : #include <sys/stat.h>
30 : #include <unistd.h>
31 :
32 : #include "access/xlog.h"
33 : #include "access/xlog_internal.h"
34 : #include "archive/archive_module.h"
35 : #include "archive/shell_archive.h"
36 : #include "lib/binaryheap.h"
37 : #include "libpq/pqsignal.h"
38 : #include "pgstat.h"
39 : #include "postmaster/auxprocess.h"
40 : #include "postmaster/interrupt.h"
41 : #include "postmaster/pgarch.h"
42 : #include "storage/condition_variable.h"
43 : #include "storage/fd.h"
44 : #include "storage/ipc.h"
45 : #include "storage/latch.h"
46 : #include "storage/pmsignal.h"
47 : #include "storage/proc.h"
48 : #include "storage/procsignal.h"
49 : #include "storage/shmem.h"
50 : #include "utils/guc.h"
51 : #include "utils/memutils.h"
52 : #include "utils/ps_status.h"
53 : #include "utils/resowner.h"
54 : #include "utils/timeout.h"
55 :
56 :
/* ----------
 * Timer definitions.
 * ----------
 */
#define PGARCH_AUTOWAKE_INTERVAL 60	/* How often to force a poll of the
									 * archive status directory; in seconds.
									 * Used as the WaitLatch() timeout in
									 * pgarch_MainLoop(). */
#define PGARCH_RESTART_INTERVAL 10	/* How often to attempt to restart a
									 * failed archiver; in seconds.  Checked
									 * by PgArchCanRestart(). */

/*
 * Maximum number of consecutive failed attempts to archive a single WAL
 * file within one pgarch_ArchiverCopyLoop() pass.  Once reached, the copy
 * loop logs a WARNING and gives up until the next cycle.
 */
#define NUM_ARCHIVE_RETRIES 3

/*
 * Maximum number of failed attempts to unlink an orphan .ready status file
 * (one whose WAL segment no longer exists) before the copy loop gives up
 * until the next cycle.
 */
#define NUM_ORPHAN_CLEANUP_RETRIES 3

/*
 * Maximum number of .ready files to gather per directory scan.  This is the
 * capacity of both the selection max-heap and the arch_files[] array.
 */
#define NUM_FILES_PER_DIRECTORY_SCAN 64
82 :
/*
 * Shared memory area for archiver process.
 *
 * Allocated and initialized by PgArchShmemInit().
 */
typedef struct PgArchData
{
	int			pgprocno;		/* proc number of archiver process; set by
								 * the archiver at startup and reset to
								 * INVALID_PROC_NUMBER on exit.  Read by
								 * PgArchWakeup() to find the latch. */

	/*
	 * Forces a directory scan in pgarch_readyXlog().  Set to 1 by
	 * PgArchForceDirScan() and atomically swapped back to 0 when
	 * pgarch_readyXlog() consumes the request.
	 */
	pg_atomic_uint32 force_dir_scan;
} PgArchData;
93 :
/*
 * Value of the archive_library setting.  An empty string means that the
 * built-in shell archiving (driven by archive_command) is used; see
 * LoadArchiveLibrary().
 */
char	   *XLogArchiveLibrary = "";

/*
 * Optional detail message an archive module's check_configured_cb may set.
 * It is reset to NULL before each check and, if set afterwards, is attached
 * to the "not configured" WARNING.
 */
char	   *arch_module_check_errdetail_string;


/* ----------
 * Local data
 * ----------
 */
/* Time at which the main loop first noticed a pending SIGTERM; 0 if none */
static time_t last_sigterm_time = 0;
/* Pointer to the shared-memory area set up by PgArchShmemInit() */
static PgArchData *PgArch = NULL;
/* Callbacks returned by the archive module's init function */
static const ArchiveModuleCallbacks *ArchiveCallbacks;
/* Per-process state handed to every archive module callback */
static ArchiveModuleState *archive_module_state;
/* Context the archive callback runs in; reset after every file */
static MemoryContext archive_context;
107 :
108 :
/*
 * Stuff for tracking multiple files to archive from each scan of
 * archive_status.  Minimizing the number of directory scans when there are
 * many files to archive can significantly improve archival rate.
 *
 * arch_heap is a max-heap (ordered by ready_file_comparator) used during the
 * directory scan to keep the highest-priority files seen so far; its root is
 * the lowest-priority file currently kept.  After the directory scan
 * completes, the file names are stored in ascending order of priority in
 * arch_files, so the highest-priority file is at the end.
 * pgarch_readyXlog() pops files from the end of arch_files until it is
 * empty, at which point another directory scan must be performed.
 *
 * We only need this data in the archiver process, so make it a palloc'd
 * struct rather than a bunch of static arrays.
 */
struct arch_files_state
{
	binaryheap *arch_heap;
	int			arch_files_size;	/* number of live entries in arch_files[] */
	char	   *arch_files[NUM_FILES_PER_DIRECTORY_SCAN];
	/* buffers underlying heap, and later arch_files[], entries: */
	char		arch_filenames[NUM_FILES_PER_DIRECTORY_SCAN][MAX_XFN_CHARS + 1];
};

/* Workspace for pgarch_readyXlog(); allocated in PgArchiverMain() */
static struct arch_files_state *arch_files = NULL;
133 :
/*
 * Flags set by interrupt handlers for later service in the main loop.
 *
 * ready_to_stop is set by the SIGUSR2 handler (pgarch_waken_stop); the main
 * loop then runs one final archiving cycle and exits.
 */
static volatile sig_atomic_t ready_to_stop = false;

/* ----------
 * Local function forward declarations
 * ----------
 */
static void pgarch_waken_stop(SIGNAL_ARGS);
static void pgarch_MainLoop(void);
static void pgarch_ArchiverCopyLoop(void);
static bool pgarch_archiveXlog(char *xlog);
static bool pgarch_readyXlog(char *xlog);
static void pgarch_archiveDone(char *xlog);
static void pgarch_die(int code, Datum arg);
static void HandlePgArchInterrupts(void);
static int	ready_file_comparator(Datum a, Datum b, void *arg);
static void LoadArchiveLibrary(void);
static void pgarch_call_module_shutdown_cb(int code, Datum arg);
154 :
155 : /* Report shared memory space needed by PgArchShmemInit */
156 : Size
157 7002 : PgArchShmemSize(void)
158 : {
159 7002 : Size size = 0;
160 :
161 7002 : size = add_size(size, sizeof(PgArchData));
162 :
163 7002 : return size;
164 : }
165 :
166 : /* Allocate and initialize archiver-related shared memory */
167 : void
168 1810 : PgArchShmemInit(void)
169 : {
170 : bool found;
171 :
172 1810 : PgArch = (PgArchData *)
173 1810 : ShmemInitStruct("Archiver Data", PgArchShmemSize(), &found);
174 :
175 1810 : if (!found)
176 : {
177 : /* First time through, so initialize */
178 3620 : MemSet(PgArch, 0, PgArchShmemSize());
179 1810 : PgArch->pgprocno = INVALID_PROC_NUMBER;
180 1810 : pg_atomic_init_u32(&PgArch->force_dir_scan, 0);
181 : }
182 1810 : }
183 :
184 : /*
185 : * PgArchCanRestart
186 : *
187 : * Return true and archiver is allowed to restart if enough time has
188 : * passed since it was launched last to reach PGARCH_RESTART_INTERVAL.
189 : * Otherwise return false.
190 : *
191 : * This is a safety valve to protect against continuous respawn attempts if the
192 : * archiver is dying immediately at launch. Note that since we will retry to
193 : * launch the archiver from the postmaster main loop, we will get another
194 : * chance later.
195 : */
196 : bool
197 90 : PgArchCanRestart(void)
198 : {
199 : static time_t last_pgarch_start_time = 0;
200 90 : time_t curtime = time(NULL);
201 :
202 : /*
203 : * Return false and don't restart archiver if too soon since last archiver
204 : * start.
205 : */
206 90 : if ((unsigned int) (curtime - last_pgarch_start_time) <
207 : (unsigned int) PGARCH_RESTART_INTERVAL)
208 0 : return false;
209 :
210 90 : last_pgarch_start_time = curtime;
211 90 : return true;
212 : }
213 :
214 :
215 : /* Main entry point for archiver process */
216 : void
217 24 : PgArchiverMain(char *startup_data, size_t startup_data_len)
218 : {
219 : Assert(startup_data_len == 0);
220 :
221 24 : MyBackendType = B_ARCHIVER;
222 24 : AuxiliaryProcessMainCommon();
223 :
224 : /*
225 : * Ignore all signals usually bound to some action in the postmaster,
226 : * except for SIGHUP, SIGTERM, SIGUSR1, SIGUSR2, and SIGQUIT.
227 : */
228 24 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
229 24 : pqsignal(SIGINT, SIG_IGN);
230 24 : pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
231 : /* SIGQUIT handler was already set up by InitPostmasterChild */
232 24 : pqsignal(SIGALRM, SIG_IGN);
233 24 : pqsignal(SIGPIPE, SIG_IGN);
234 24 : pqsignal(SIGUSR1, procsignal_sigusr1_handler);
235 24 : pqsignal(SIGUSR2, pgarch_waken_stop);
236 :
237 : /* Reset some signals that are accepted by postmaster but not here */
238 24 : pqsignal(SIGCHLD, SIG_DFL);
239 :
240 : /* Unblock signals (they were blocked when the postmaster forked us) */
241 24 : sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
242 :
243 : /* We shouldn't be launched unnecessarily. */
244 : Assert(XLogArchivingActive());
245 :
246 : /* Arrange to clean up at archiver exit */
247 24 : on_shmem_exit(pgarch_die, 0);
248 :
249 : /*
250 : * Advertise our proc number so that backends can use our latch to wake us
251 : * up while we're sleeping.
252 : */
253 24 : PgArch->pgprocno = MyProcNumber;
254 :
255 : /* Create workspace for pgarch_readyXlog() */
256 24 : arch_files = palloc(sizeof(struct arch_files_state));
257 24 : arch_files->arch_files_size = 0;
258 :
259 : /* Initialize our max-heap for prioritizing files to archive. */
260 24 : arch_files->arch_heap = binaryheap_allocate(NUM_FILES_PER_DIRECTORY_SCAN,
261 : ready_file_comparator, NULL);
262 :
263 : /* Initialize our memory context. */
264 24 : archive_context = AllocSetContextCreate(TopMemoryContext,
265 : "archiver",
266 : ALLOCSET_DEFAULT_SIZES);
267 :
268 : /* Load the archive_library. */
269 24 : LoadArchiveLibrary();
270 :
271 24 : pgarch_MainLoop();
272 :
273 24 : proc_exit(0);
274 : }
275 :
276 : /*
277 : * Wake up the archiver
278 : */
279 : void
280 230 : PgArchWakeup(void)
281 : {
282 230 : int arch_pgprocno = PgArch->pgprocno;
283 :
284 : /*
285 : * We don't acquire ProcArrayLock here. It's actually fine because
286 : * procLatch isn't ever freed, so we just can potentially set the wrong
287 : * process' (or no process') latch. Even in that case the archiver will
288 : * be relaunched shortly and will start archiving.
289 : */
290 230 : if (arch_pgprocno != INVALID_PROC_NUMBER)
291 198 : SetLatch(&ProcGlobal->allProcs[arch_pgprocno].procLatch);
292 230 : }
293 :
294 :
/*
 * SIGUSR2 signal handler for archiver process.
 *
 * Records that the archiver should stop and wakes the main loop.  The flag
 * is set before the latch so that the woken main loop observes it.
 */
static void
pgarch_waken_stop(SIGNAL_ARGS)
{
	/* set flag to do a final cycle and shut down afterwards */
	ready_to_stop = true;
	SetLatch(MyLatch);
}
303 :
304 : /*
305 : * pgarch_MainLoop
306 : *
307 : * Main loop for archiver
308 : */
309 : static void
310 106 : pgarch_MainLoop(void)
311 : {
312 : bool time_to_stop;
313 :
314 : /*
315 : * There shouldn't be anything for the archiver to do except to wait for a
316 : * signal ... however, the archiver exists to protect our data, so it
317 : * wakes up occasionally to allow itself to be proactive.
318 : */
319 : do
320 : {
321 106 : ResetLatch(MyLatch);
322 :
323 : /* When we get SIGUSR2, we do one more archive cycle, then exit */
324 106 : time_to_stop = ready_to_stop;
325 :
326 : /* Check for barrier events and config update */
327 106 : HandlePgArchInterrupts();
328 :
329 : /*
330 : * If we've gotten SIGTERM, we normally just sit and do nothing until
331 : * SIGUSR2 arrives. However, that means a random SIGTERM would
332 : * disable archiving indefinitely, which doesn't seem like a good
333 : * idea. If more than 60 seconds pass since SIGTERM, exit anyway, so
334 : * that the postmaster can start a new archiver if needed.
335 : */
336 106 : if (ShutdownRequestPending)
337 : {
338 0 : time_t curtime = time(NULL);
339 :
340 0 : if (last_sigterm_time == 0)
341 0 : last_sigterm_time = curtime;
342 0 : else if ((unsigned int) (curtime - last_sigterm_time) >=
343 : (unsigned int) 60)
344 0 : break;
345 : }
346 :
347 : /* Do what we're here for */
348 106 : pgarch_ArchiverCopyLoop();
349 :
350 : /*
351 : * Sleep until a signal is received, or until a poll is forced by
352 : * PGARCH_AUTOWAKE_INTERVAL, or until postmaster dies.
353 : */
354 106 : if (!time_to_stop) /* Don't wait during last iteration */
355 : {
356 : int rc;
357 :
358 82 : rc = WaitLatch(MyLatch,
359 : WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
360 : PGARCH_AUTOWAKE_INTERVAL * 1000L,
361 : WAIT_EVENT_ARCHIVER_MAIN);
362 82 : if (rc & WL_POSTMASTER_DEATH)
363 0 : time_to_stop = true;
364 : }
365 :
366 : /*
367 : * The archiver quits either when the postmaster dies (not expected)
368 : * or after completing one more archiving cycle after receiving
369 : * SIGUSR2.
370 : */
371 106 : } while (!time_to_stop);
372 24 : }
373 :
/*
 * pgarch_ArchiverCopyLoop
 *
 * Archives all outstanding xlogs then returns.
 *
 * For each file handed out by pgarch_readyXlog():
 *   - stop immediately on a pending SIGTERM or if the postmaster is gone;
 *   - stop (with a WARNING) if the archive module reports it is not
 *     configured;
 *   - if the WAL segment itself no longer exists, try to unlink the orphan
 *     .ready file, retrying up to NUM_ORPHAN_CLEANUP_RETRIES times one
 *     second apart before giving up for this pass;
 *   - otherwise archive it, retrying up to NUM_ARCHIVE_RETRIES times one
 *     second apart before giving up for this pass.
 * Each archive attempt is reported to the cumulative stats system.
 */
static void
pgarch_ArchiverCopyLoop(void)
{
	char		xlog[MAX_XFN_CHARS + 1];

	/* force directory scan in the first call to pgarch_readyXlog() */
	arch_files->arch_files_size = 0;

	/*
	 * loop through all xlogs with archive_status of .ready and archive
	 * them...mostly we expect this to be a single file, though it is possible
	 * some backend will add files onto the list of those that need archiving
	 * while we are still copying earlier archives
	 */
	while (pgarch_readyXlog(xlog))
	{
		/* per-file counters; reset for every file pgarch_readyXlog returns */
		int			failures = 0;
		int			failures_orphan = 0;

		/* retry loop for the current file; exits via break or return */
		for (;;)
		{
			struct stat stat_buf;
			char		pathname[MAXPGPATH];

			/*
			 * Do not initiate any more archive commands after receiving
			 * SIGTERM, nor after the postmaster has died unexpectedly. The
			 * first condition is to try to keep from having init SIGKILL the
			 * command, and the second is to avoid conflicts with another
			 * archiver spawned by a newer postmaster.
			 */
			if (ShutdownRequestPending || !PostmasterIsAlive())
				return;

			/*
			 * Check for barrier events and config update.  This is so that
			 * we'll adopt a new setting for archive_command as soon as
			 * possible, even if there is a backlog of files to be archived.
			 */
			HandlePgArchInterrupts();

			/* Reset variables that might be set by the callback */
			arch_module_check_errdetail_string = NULL;

			/* can't do anything if not configured ... */
			if (ArchiveCallbacks->check_configured_cb != NULL &&
				!ArchiveCallbacks->check_configured_cb(archive_module_state))
			{
				ereport(WARNING,
						(errmsg("\"archive_mode\" enabled, yet archiving is not configured"),
						 arch_module_check_errdetail_string ?
						 errdetail_internal("%s", arch_module_check_errdetail_string) : 0));
				return;
			}

			/*
			 * Since archive status files are not removed in a durable manner,
			 * a system crash could leave behind .ready files for WAL segments
			 * that have already been recycled or removed.  In this case,
			 * simply remove the orphan status file and move on.  unlink() is
			 * used here as even on subsequent crashes the same orphan files
			 * would get removed, so there is no need to worry about
			 * durability.  Only ENOENT from stat() counts as "orphan"; any
			 * other stat() failure falls through to a normal archive attempt.
			 */
			snprintf(pathname, MAXPGPATH, XLOGDIR "/%s", xlog);
			if (stat(pathname, &stat_buf) != 0 && errno == ENOENT)
			{
				char		xlogready[MAXPGPATH];

				StatusFilePath(xlogready, xlog, ".ready");
				if (unlink(xlogready) == 0)
				{
					ereport(WARNING,
							(errmsg("removed orphan archive status file \"%s\"",
									xlogready)));

					/* leave loop and move to the next status file */
					break;
				}

				if (++failures_orphan >= NUM_ORPHAN_CLEANUP_RETRIES)
				{
					ereport(WARNING,
							(errmsg("removal of orphan archive status file \"%s\" failed too many times, will try again later",
									xlogready)));

					/* give up cleanup of orphan status files */
					return;
				}

				/* wait a bit before retrying */
				pg_usleep(1000000L);
				continue;
			}

			if (pgarch_archiveXlog(xlog))
			{
				/* successful: rename NNN.ready to NNN.done */
				pgarch_archiveDone(xlog);

				/*
				 * Tell the cumulative stats system about the WAL file that we
				 * successfully archived
				 */
				pgstat_report_archiver(xlog, false);

				break;			/* out of inner retry loop */
			}
			else
			{
				/*
				 * Tell the cumulative stats system about the WAL file that we
				 * failed to archive
				 */
				pgstat_report_archiver(xlog, true);

				if (++failures >= NUM_ARCHIVE_RETRIES)
				{
					ereport(WARNING,
							(errmsg("archiving write-ahead log file \"%s\" failed too many times, will try again later",
									xlog)));
					return;		/* give up archiving for now */
				}
				pg_usleep(1000000L);	/* wait a bit before retrying */
			}
		}
	}
}
507 :
/*
 * pgarch_archiveXlog
 *
 * Invokes archive_file_cb to copy one archive file to wherever it should go.
 *
 * xlog is the bare WAL file name; the callback receives it together with
 * its path under XLOGDIR.  The callback runs in archive_context, which is
 * reset afterwards whether or not it succeeded.  The PS display is updated
 * before ("archiving ...") and after ("last was ..." / "failed on ...").
 *
 * Returns true if successful.  Returns false if the callback returned false
 * or raised an ERROR; in the latter case the error is written to the server
 * log and local state is cleaned up before returning.
 */
static bool
pgarch_archiveXlog(char *xlog)
{
	sigjmp_buf	local_sigjmp_buf;
	MemoryContext oldcontext;	/* assigned before sigsetjmp() and never
								 * modified after it, so it is still valid
								 * when siglongjmp() lands in the error path */
	char		pathname[MAXPGPATH];
	char		activitymsg[MAXFNAMELEN + 16];
	bool		ret;

	snprintf(pathname, MAXPGPATH, XLOGDIR "/%s", xlog);

	/* Report archive activity in PS display */
	snprintf(activitymsg, sizeof(activitymsg), "archiving %s", xlog);
	set_ps_display(activitymsg);

	oldcontext = MemoryContextSwitchTo(archive_context);

	/*
	 * Since the archiver operates at the bottom of the exception stack,
	 * ERRORs turn into FATALs and cause the archiver process to restart.
	 * However, using ereport(ERROR, ...) when there are problems is easy to
	 * code and maintain.  Therefore, we create our own exception handler to
	 * catch ERRORs and return false instead of restarting the archiver
	 * whenever there is a failure.
	 *
	 * We assume ERRORs from the archiving callback are the most common
	 * exceptions experienced by the archiver, so we opt to handle exceptions
	 * here instead of PgArchiverMain() to avoid reinitializing the archiver
	 * too frequently.  We could instead add a sigsetjmp() block to
	 * PgArchiverMain() and use PG_TRY/PG_CATCH here, but the extra code to
	 * avoid the odd archiver restart doesn't seem worth it.
	 */
	if (sigsetjmp(local_sigjmp_buf, 1) != 0)
	{
		/* Since not using PG_TRY, must reset error stack by hand */
		error_context_stack = NULL;

		/* Prevent interrupts while cleaning up */
		HOLD_INTERRUPTS();

		/* Report the error to the server log. */
		EmitErrorReport();

		/*
		 * Try to clean up anything the archive module left behind.  We try to
		 * cover anything that an archive module could conceivably have left
		 * behind, but it is of course possible that modules could be doing
		 * unexpected things that require additional cleanup.  Module authors
		 * should be sure to do any extra required cleanup in a PG_CATCH block
		 * within the archiving callback, and they are encouraged to notify
		 * the pgsql-hackers mailing list so that we can add it here.
		 */
		disable_all_timeouts(false);
		LWLockReleaseAll();
		ConditionVariableCancelSleep();
		pgstat_report_wait_end();
		ReleaseAuxProcessResources(false);
		AtEOXact_Files(false);
		AtEOXact_HashTables(false);

		/*
		 * Return to the original memory context and clear ErrorContext for
		 * next time.
		 */
		MemoryContextSwitchTo(oldcontext);
		FlushErrorState();

		/* Flush any leaked data */
		MemoryContextReset(archive_context);

		/* Remove our exception handler */
		PG_exception_stack = NULL;

		/* Now we can allow interrupts again */
		RESUME_INTERRUPTS();

		/* Report failure so that the archiver retries this file */
		ret = false;
	}
	else
	{
		/* Enable our exception handler */
		PG_exception_stack = &local_sigjmp_buf;

		/* Archive the file! */
		ret = ArchiveCallbacks->archive_file_cb(archive_module_state,
												xlog, pathname);

		/* Remove our exception handler */
		PG_exception_stack = NULL;

		/* Reset our memory context and switch back to the original one */
		MemoryContextSwitchTo(oldcontext);
		MemoryContextReset(archive_context);
	}

	if (ret)
		snprintf(activitymsg, sizeof(activitymsg), "last was %s", xlog);
	else
		snprintf(activitymsg, sizeof(activitymsg), "failed on %s", xlog);
	set_ps_display(activitymsg);

	return ret;
}
619 :
620 : /*
621 : * pgarch_readyXlog
622 : *
623 : * Return name of the oldest xlog file that has not yet been archived.
624 : * No notification is set that file archiving is now in progress, so
625 : * this would need to be extended if multiple concurrent archival
626 : * tasks were created. If a failure occurs, we will completely
627 : * re-copy the file at the next available opportunity.
628 : *
629 : * It is important that we return the oldest, so that we archive xlogs
630 : * in order that they were written, for two reasons:
631 : * 1) to maintain the sequential chain of xlogs required for recovery
632 : * 2) because the oldest ones will sooner become candidates for
633 : * recycling at time of checkpoint
634 : *
635 : * NOTE: the "oldest" comparison will consider any .history file to be older
636 : * than any other file except another .history file. Segments on a timeline
637 : * with a smaller ID will be older than all segments on a timeline with a
638 : * larger ID; the net result being that past timelines are given higher
639 : * priority for archiving. This seems okay, or at least not obviously worth
640 : * changing.
641 : */
642 : static bool
643 164 : pgarch_readyXlog(char *xlog)
644 : {
645 : char XLogArchiveStatusDir[MAXPGPATH];
646 : DIR *rldir;
647 : struct dirent *rlde;
648 :
649 : /*
650 : * If a directory scan was requested, clear the stored file names and
651 : * proceed.
652 : */
653 164 : if (pg_atomic_exchange_u32(&PgArch->force_dir_scan, 0) == 1)
654 4 : arch_files->arch_files_size = 0;
655 :
656 : /*
657 : * If we still have stored file names from the previous directory scan,
658 : * try to return one of those. We check to make sure the status file is
659 : * still present, as the archive_command for a previous file may have
660 : * already marked it done.
661 : */
662 164 : while (arch_files->arch_files_size > 0)
663 : {
664 : struct stat st;
665 : char status_file[MAXPGPATH];
666 : char *arch_file;
667 :
668 2 : arch_files->arch_files_size--;
669 2 : arch_file = arch_files->arch_files[arch_files->arch_files_size];
670 2 : StatusFilePath(status_file, arch_file, ".ready");
671 :
672 2 : if (stat(status_file, &st) == 0)
673 : {
674 2 : strcpy(xlog, arch_file);
675 2 : return true;
676 : }
677 0 : else if (errno != ENOENT)
678 0 : ereport(ERROR,
679 : (errcode_for_file_access(),
680 : errmsg("could not stat file \"%s\": %m", status_file)));
681 : }
682 :
683 : /* arch_heap is probably empty, but let's make sure */
684 162 : binaryheap_reset(arch_files->arch_heap);
685 :
686 : /*
687 : * Open the archive status directory and read through the list of files
688 : * with the .ready suffix, looking for the earliest files.
689 : */
690 162 : snprintf(XLogArchiveStatusDir, MAXPGPATH, XLOGDIR "/archive_status");
691 162 : rldir = AllocateDir(XLogArchiveStatusDir);
692 :
693 802 : while ((rlde = ReadDir(rldir, XLogArchiveStatusDir)) != NULL)
694 : {
695 640 : int basenamelen = (int) strlen(rlde->d_name) - 6;
696 : char basename[MAX_XFN_CHARS + 1];
697 : char *arch_file;
698 :
699 : /* Ignore entries with unexpected number of characters */
700 640 : if (basenamelen < MIN_XFN_CHARS ||
701 : basenamelen > MAX_XFN_CHARS)
702 578 : continue;
703 :
704 : /* Ignore entries with unexpected characters */
705 280 : if (strspn(rlde->d_name, VALID_XFN_CHARS) < basenamelen)
706 0 : continue;
707 :
708 : /* Ignore anything not suffixed with .ready */
709 280 : if (strcmp(rlde->d_name + basenamelen, ".ready") != 0)
710 218 : continue;
711 :
712 : /* Truncate off the .ready */
713 62 : memcpy(basename, rlde->d_name, basenamelen);
714 62 : basename[basenamelen] = '\0';
715 :
716 : /*
717 : * Store the file in our max-heap if it has a high enough priority.
718 : */
719 62 : if (arch_files->arch_heap->bh_size < NUM_FILES_PER_DIRECTORY_SCAN)
720 : {
721 : /* If the heap isn't full yet, quickly add it. */
722 62 : arch_file = arch_files->arch_filenames[arch_files->arch_heap->bh_size];
723 62 : strcpy(arch_file, basename);
724 62 : binaryheap_add_unordered(arch_files->arch_heap, CStringGetDatum(arch_file));
725 :
726 : /* If we just filled the heap, make it a valid one. */
727 62 : if (arch_files->arch_heap->bh_size == NUM_FILES_PER_DIRECTORY_SCAN)
728 0 : binaryheap_build(arch_files->arch_heap);
729 : }
730 0 : else if (ready_file_comparator(binaryheap_first(arch_files->arch_heap),
731 : CStringGetDatum(basename), NULL) > 0)
732 : {
733 : /*
734 : * Remove the lowest priority file and add the current one to the
735 : * heap.
736 : */
737 0 : arch_file = DatumGetCString(binaryheap_remove_first(arch_files->arch_heap));
738 0 : strcpy(arch_file, basename);
739 0 : binaryheap_add(arch_files->arch_heap, CStringGetDatum(arch_file));
740 : }
741 : }
742 162 : FreeDir(rldir);
743 :
744 : /* If no files were found, simply return. */
745 162 : if (arch_files->arch_heap->bh_size == 0)
746 102 : return false;
747 :
748 : /*
749 : * If we didn't fill the heap, we didn't make it a valid one. Do that
750 : * now.
751 : */
752 60 : if (arch_files->arch_heap->bh_size < NUM_FILES_PER_DIRECTORY_SCAN)
753 60 : binaryheap_build(arch_files->arch_heap);
754 :
755 : /*
756 : * Fill arch_files array with the files to archive in ascending order of
757 : * priority.
758 : */
759 60 : arch_files->arch_files_size = arch_files->arch_heap->bh_size;
760 122 : for (int i = 0; i < arch_files->arch_files_size; i++)
761 62 : arch_files->arch_files[i] = DatumGetCString(binaryheap_remove_first(arch_files->arch_heap));
762 :
763 : /* Return the highest priority file. */
764 60 : arch_files->arch_files_size--;
765 60 : strcpy(xlog, arch_files->arch_files[arch_files->arch_files_size]);
766 :
767 60 : return true;
768 : }
769 :
770 : /*
771 : * ready_file_comparator
772 : *
773 : * Compares the archival priority of the given files to archive. If "a"
774 : * has a higher priority than "b", a negative value will be returned. If
775 : * "b" has a higher priority than "a", a positive value will be returned.
776 : * If "a" and "b" have equivalent values, 0 will be returned.
777 : */
778 : static int
779 2 : ready_file_comparator(Datum a, Datum b, void *arg)
780 : {
781 2 : char *a_str = DatumGetCString(a);
782 2 : char *b_str = DatumGetCString(b);
783 2 : bool a_history = IsTLHistoryFileName(a_str);
784 2 : bool b_history = IsTLHistoryFileName(b_str);
785 :
786 : /* Timeline history files always have the highest priority. */
787 2 : if (a_history != b_history)
788 0 : return a_history ? -1 : 1;
789 :
790 : /* Priority is given to older files. */
791 2 : return strcmp(a_str, b_str);
792 : }
793 :
/*
 * PgArchForceDirScan
 *
 * When called, the next call to pgarch_readyXlog() will perform a
 * directory scan instead of handing out names remembered from an earlier
 * scan.  This is useful for ensuring that important files such as timeline
 * history files are archived as quickly as possible.
 *
 * Only sets the shared flag (with a full memory barrier); it does not wake
 * the archiver.
 */
void
PgArchForceDirScan(void)
{
	pg_atomic_write_membarrier_u32(&PgArch->force_dir_scan, 1);
}
806 :
807 : /*
808 : * pgarch_archiveDone
809 : *
810 : * Emit notification that an xlog file has been successfully archived.
811 : * We do this by renaming the status file from NNN.ready to NNN.done.
812 : * Eventually, a checkpoint process will notice this and delete both the
813 : * NNN.done file and the xlog file itself.
814 : */
815 : static void
816 58 : pgarch_archiveDone(char *xlog)
817 : {
818 : char rlogready[MAXPGPATH];
819 : char rlogdone[MAXPGPATH];
820 :
821 58 : StatusFilePath(rlogready, xlog, ".ready");
822 58 : StatusFilePath(rlogdone, xlog, ".done");
823 :
824 : /*
825 : * To avoid extra overhead, we don't durably rename the .ready file to
826 : * .done. Archive commands and libraries must gracefully handle attempts
827 : * to re-archive files (e.g., if the server crashes just before this
828 : * function is called), so it should be okay if the .ready file reappears
829 : * after a crash.
830 : */
831 58 : if (rename(rlogready, rlogdone) < 0)
832 0 : ereport(WARNING,
833 : (errcode_for_file_access(),
834 : errmsg("could not rename file \"%s\" to \"%s\": %m",
835 : rlogready, rlogdone)));
836 58 : }
837 :
838 :
/*
 * pgarch_die
 *
 * Exit-time cleanup handler (registered with on_shmem_exit()).
 *
 * Withdraws our advertised proc number so that PgArchWakeup() stops
 * targeting this process's latch.  code and arg are unused.
 */
static void
pgarch_die(int code, Datum arg)
{
	PgArch->pgprocno = INVALID_PROC_NUMBER;
}
849 :
850 : /*
851 : * Interrupt handler for WAL archiver process.
852 : *
853 : * This is called in the loops pgarch_MainLoop and pgarch_ArchiverCopyLoop.
854 : * It checks for barrier events, config update and request for logging of
855 : * memory contexts, but not shutdown request because how to handle
856 : * shutdown request is different between those loops.
857 : */
858 : static void
859 170 : HandlePgArchInterrupts(void)
860 : {
861 170 : if (ProcSignalBarrierPending)
862 2 : ProcessProcSignalBarrier();
863 :
864 : /* Perform logging of memory contexts of this process */
865 170 : if (LogMemoryContextPending)
866 0 : ProcessLogMemoryContextInterrupt();
867 :
868 170 : if (ConfigReloadPending)
869 : {
870 4 : char *archiveLib = pstrdup(XLogArchiveLibrary);
871 : bool archiveLibChanged;
872 :
873 4 : ConfigReloadPending = false;
874 4 : ProcessConfigFile(PGC_SIGHUP);
875 :
876 4 : if (XLogArchiveLibrary[0] != '\0' && XLogArchiveCommand[0] != '\0')
877 0 : ereport(ERROR,
878 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
879 : errmsg("both \"archive_command\" and \"archive_library\" set"),
880 : errdetail("Only one of \"archive_command\", \"archive_library\" may be set.")));
881 :
882 4 : archiveLibChanged = strcmp(XLogArchiveLibrary, archiveLib) != 0;
883 4 : pfree(archiveLib);
884 :
885 4 : if (archiveLibChanged)
886 : {
887 : /*
888 : * Ideally, we would simply unload the previous archive module and
889 : * load the new one, but there is presently no mechanism for
890 : * unloading a library (see the comment above
891 : * internal_load_library()). To deal with this, we simply restart
892 : * the archiver. The new archive module will be loaded when the
893 : * new archiver process starts up. Note that this triggers the
894 : * module's shutdown callback, if defined.
895 : */
896 0 : ereport(LOG,
897 : (errmsg("restarting archiver process because value of "
898 : "\"archive_library\" was changed")));
899 :
900 0 : proc_exit(0);
901 : }
902 : }
903 170 : }
904 :
905 : /*
906 : * LoadArchiveLibrary
907 : *
908 : * Loads the archiving callbacks into our local ArchiveCallbacks.
909 : */
910 : static void
911 24 : LoadArchiveLibrary(void)
912 : {
913 : ArchiveModuleInit archive_init;
914 :
915 24 : if (XLogArchiveLibrary[0] != '\0' && XLogArchiveCommand[0] != '\0')
916 0 : ereport(ERROR,
917 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
918 : errmsg("both \"archive_command\" and \"archive_library\" set"),
919 : errdetail("Only one of \"archive_command\", \"archive_library\" may be set.")));
920 :
921 : /*
922 : * If shell archiving is enabled, use our special initialization function.
923 : * Otherwise, load the library and call its _PG_archive_module_init().
924 : */
925 24 : if (XLogArchiveLibrary[0] == '\0')
926 22 : archive_init = shell_archive_init;
927 : else
928 2 : archive_init = (ArchiveModuleInit)
929 2 : load_external_function(XLogArchiveLibrary,
930 : "_PG_archive_module_init", false, NULL);
931 :
932 24 : if (archive_init == NULL)
933 0 : ereport(ERROR,
934 : (errmsg("archive modules have to define the symbol %s", "_PG_archive_module_init")));
935 :
936 24 : ArchiveCallbacks = (*archive_init) ();
937 :
938 24 : if (ArchiveCallbacks->archive_file_cb == NULL)
939 0 : ereport(ERROR,
940 : (errmsg("archive modules must register an archive callback")));
941 :
942 24 : archive_module_state = (ArchiveModuleState *) palloc0(sizeof(ArchiveModuleState));
943 24 : if (ArchiveCallbacks->startup_cb != NULL)
944 0 : ArchiveCallbacks->startup_cb(archive_module_state);
945 :
946 24 : before_shmem_exit(pgarch_call_module_shutdown_cb, 0);
947 24 : }
948 :
949 : /*
950 : * Call the shutdown callback of the loaded archive module, if defined.
951 : */
952 : static void
953 24 : pgarch_call_module_shutdown_cb(int code, Datum arg)
954 : {
955 24 : if (ArchiveCallbacks->shutdown_cb != NULL)
956 22 : ArchiveCallbacks->shutdown_cb(archive_module_state);
957 24 : }
|