Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * local_source.c
4 : * Functions for using a local data directory as the source.
5 : *
6 : * Portions Copyright (c) 2013-2024, PostgreSQL Global Development Group
7 : *
8 : *-------------------------------------------------------------------------
9 : */
10 : #include "postgres_fe.h"
11 :
12 : #include <fcntl.h>
13 : #include <unistd.h>
14 :
15 : #include "datapagemap.h"
16 : #include "file_ops.h"
17 : #include "filemap.h"
18 : #include "pg_rewind.h"
19 : #include "rewind_source.h"
20 :
21 : typedef struct
22 : {
23 : rewind_source common; /* common interface functions */
24 :
25 : const char *datadir; /* path to the source data directory */
26 : } local_source;
27 :
28 : static void local_traverse_files(rewind_source *source,
29 : process_file_callback_t callback);
30 : static char *local_fetch_file(rewind_source *source, const char *path,
31 : size_t *filesize);
32 : static void local_queue_fetch_file(rewind_source *source, const char *path,
33 : size_t len);
34 : static void local_queue_fetch_range(rewind_source *source, const char *path,
35 : off_t off, size_t len);
36 : static void local_finish_fetch(rewind_source *source);
37 : static void local_destroy(rewind_source *source);
38 :
39 : rewind_source *
40 22 : init_local_source(const char *datadir)
41 : {
42 : local_source *src;
43 :
44 22 : src = pg_malloc0(sizeof(local_source));
45 :
46 22 : src->common.traverse_files = local_traverse_files;
47 22 : src->common.fetch_file = local_fetch_file;
48 22 : src->common.queue_fetch_file = local_queue_fetch_file;
49 22 : src->common.queue_fetch_range = local_queue_fetch_range;
50 22 : src->common.finish_fetch = local_finish_fetch;
51 22 : src->common.get_current_wal_insert_lsn = NULL;
52 22 : src->common.destroy = local_destroy;
53 :
54 22 : src->datadir = datadir;
55 :
56 22 : return &src->common;
57 : }
58 :
59 : static void
60 14 : local_traverse_files(rewind_source *source, process_file_callback_t callback)
61 : {
62 14 : traverse_datadir(((local_source *) source)->datadir, callback);
63 14 : }
64 :
65 : static char *
66 46 : local_fetch_file(rewind_source *source, const char *path, size_t *filesize)
67 : {
68 46 : return slurpFile(((local_source *) source)->datadir, path, filesize);
69 : }
70 :
71 : /*
72 : * Copy a file from source to target.
73 : *
74 : * 'len' is the expected length of the file.
75 : */
76 : static void
77 4438 : local_queue_fetch_file(rewind_source *source, const char *path, size_t len)
78 : {
79 4438 : const char *datadir = ((local_source *) source)->datadir;
80 : PGIOAlignedBlock buf;
81 : char srcpath[MAXPGPATH];
82 : int srcfd;
83 : size_t written_len;
84 :
85 4438 : snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir, path);
86 :
87 : /* Open source file for reading */
88 4438 : srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0);
89 4438 : if (srcfd < 0)
90 0 : pg_fatal("could not open source file \"%s\": %m",
91 : srcpath);
92 :
93 : /* Truncate and open the target file for writing */
94 4438 : open_target_file(path, true);
95 :
96 4438 : written_len = 0;
97 : for (;;)
98 95276 : {
99 : ssize_t read_len;
100 :
101 99714 : read_len = read(srcfd, buf.data, sizeof(buf));
102 :
103 99714 : if (read_len < 0)
104 0 : pg_fatal("could not read file \"%s\": %m", srcpath);
105 99714 : else if (read_len == 0)
106 4438 : break; /* EOF reached */
107 :
108 95276 : write_target_range(buf.data, written_len, read_len);
109 95276 : written_len += read_len;
110 : }
111 :
112 : /*
113 : * A local source is not expected to change while we're rewinding, so
114 : * check that the size of the file matches our earlier expectation.
115 : */
116 4438 : if (written_len != len)
117 2 : pg_fatal("size of source file \"%s\" changed concurrently: %d bytes expected, %d copied",
118 : srcpath, (int) len, (int) written_len);
119 :
120 4436 : if (close(srcfd) != 0)
121 0 : pg_fatal("could not close file \"%s\": %m", srcpath);
122 4436 : }
123 :
124 : /*
125 : * Copy a file from source to target, starting at 'off', for 'len' bytes.
126 : */
127 : static void
128 1644 : local_queue_fetch_range(rewind_source *source, const char *path, off_t off,
129 : size_t len)
130 : {
131 1644 : const char *datadir = ((local_source *) source)->datadir;
132 : PGIOAlignedBlock buf;
133 : char srcpath[MAXPGPATH];
134 : int srcfd;
135 1644 : off_t begin = off;
136 1644 : off_t end = off + len;
137 :
138 1644 : snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir, path);
139 :
140 1644 : srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0);
141 1644 : if (srcfd < 0)
142 0 : pg_fatal("could not open source file \"%s\": %m",
143 : srcpath);
144 :
145 1644 : if (lseek(srcfd, begin, SEEK_SET) == -1)
146 0 : pg_fatal("could not seek in source file: %m");
147 :
148 1644 : open_target_file(path, false);
149 :
150 3780 : while (end - begin > 0)
151 : {
152 : ssize_t readlen;
153 : size_t thislen;
154 :
155 2136 : if (end - begin > sizeof(buf))
156 492 : thislen = sizeof(buf);
157 : else
158 1644 : thislen = end - begin;
159 :
160 2136 : readlen = read(srcfd, buf.data, thislen);
161 :
162 2136 : if (readlen < 0)
163 0 : pg_fatal("could not read file \"%s\": %m", srcpath);
164 2136 : else if (readlen == 0)
165 0 : pg_fatal("unexpected EOF while reading file \"%s\"", srcpath);
166 :
167 2136 : write_target_range(buf.data, begin, readlen);
168 2136 : begin += readlen;
169 : }
170 :
171 1644 : if (close(srcfd) != 0)
172 0 : pg_fatal("could not close file \"%s\": %m", srcpath);
173 1644 : }
174 :
175 : static void
176 12 : local_finish_fetch(rewind_source *source)
177 : {
178 : /*
179 : * Nothing to do, local_queue_fetch_range() copies the ranges immediately.
180 : */
181 12 : }
182 :
183 : static void
184 12 : local_destroy(rewind_source *source)
185 : {
186 12 : pfree(source);
187 12 : }
|