Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * local_source.c
4 : * Functions for using a local data directory as the source.
5 : *
6 : * Portions Copyright (c) 2013-2024, PostgreSQL Global Development Group
7 : *
8 : *-------------------------------------------------------------------------
9 : */
10 : #include "postgres_fe.h"
11 :
12 : #include <fcntl.h>
13 : #include <unistd.h>
14 :
15 : #include "common/logging.h"
16 : #include "file_ops.h"
17 : #include "rewind_source.h"
18 :
19 : typedef struct
20 : {
21 : rewind_source common; /* common interface functions */
22 :
23 : const char *datadir; /* path to the source data directory */
24 : } local_source;
25 :
26 : static void local_traverse_files(rewind_source *source,
27 : process_file_callback_t callback);
28 : static char *local_fetch_file(rewind_source *source, const char *path,
29 : size_t *filesize);
30 : static void local_queue_fetch_file(rewind_source *source, const char *path,
31 : size_t len);
32 : static void local_queue_fetch_range(rewind_source *source, const char *path,
33 : off_t off, size_t len);
34 : static void local_finish_fetch(rewind_source *source);
35 : static void local_destroy(rewind_source *source);
36 :
37 : rewind_source *
38 24 : init_local_source(const char *datadir)
39 : {
40 : local_source *src;
41 :
42 24 : src = pg_malloc0(sizeof(local_source));
43 :
44 24 : src->common.traverse_files = local_traverse_files;
45 24 : src->common.fetch_file = local_fetch_file;
46 24 : src->common.queue_fetch_file = local_queue_fetch_file;
47 24 : src->common.queue_fetch_range = local_queue_fetch_range;
48 24 : src->common.finish_fetch = local_finish_fetch;
49 24 : src->common.get_current_wal_insert_lsn = NULL;
50 24 : src->common.destroy = local_destroy;
51 :
52 24 : src->datadir = datadir;
53 :
54 24 : return &src->common;
55 : }
56 :
57 : static void
58 16 : local_traverse_files(rewind_source *source, process_file_callback_t callback)
59 : {
60 16 : traverse_datadir(((local_source *) source)->datadir, callback);
61 16 : }
62 :
63 : static char *
64 52 : local_fetch_file(rewind_source *source, const char *path, size_t *filesize)
65 : {
66 52 : return slurpFile(((local_source *) source)->datadir, path, filesize);
67 : }
68 :
69 : /*
70 : * Copy a file from source to target.
71 : *
72 : * 'len' is the expected length of the file.
73 : */
74 : static void
75 4954 : local_queue_fetch_file(rewind_source *source, const char *path, size_t len)
76 : {
77 4954 : const char *datadir = ((local_source *) source)->datadir;
78 : PGIOAlignedBlock buf;
79 : char srcpath[MAXPGPATH];
80 : int srcfd;
81 : size_t written_len;
82 :
83 4954 : snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir, path);
84 :
85 : /* Open source file for reading */
86 4954 : srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0);
87 4954 : if (srcfd < 0)
88 0 : pg_fatal("could not open source file \"%s\": %m",
89 : srcpath);
90 :
91 : /* Truncate and open the target file for writing */
92 4954 : open_target_file(path, true);
93 :
94 4954 : written_len = 0;
95 : for (;;)
96 112656 : {
97 : ssize_t read_len;
98 :
99 117610 : read_len = read(srcfd, buf.data, sizeof(buf));
100 :
101 117610 : if (read_len < 0)
102 0 : pg_fatal("could not read file \"%s\": %m", srcpath);
103 117610 : else if (read_len == 0)
104 4954 : break; /* EOF reached */
105 :
106 112656 : write_target_range(buf.data, written_len, read_len);
107 112656 : written_len += read_len;
108 : }
109 :
110 : /*
111 : * A local source is not expected to change while we're rewinding, so
112 : * check that the size of the file matches our earlier expectation.
113 : */
114 4954 : if (written_len != len)
115 2 : pg_fatal("size of source file \"%s\" changed concurrently: %d bytes expected, %d copied",
116 : srcpath, (int) len, (int) written_len);
117 :
118 4952 : if (close(srcfd) != 0)
119 0 : pg_fatal("could not close file \"%s\": %m", srcpath);
120 4952 : }
121 :
122 : /*
123 : * Copy a file from source to target, starting at 'off', for 'len' bytes.
124 : */
125 : static void
126 1712 : local_queue_fetch_range(rewind_source *source, const char *path, off_t off,
127 : size_t len)
128 : {
129 1712 : const char *datadir = ((local_source *) source)->datadir;
130 : PGIOAlignedBlock buf;
131 : char srcpath[MAXPGPATH];
132 : int srcfd;
133 1712 : off_t begin = off;
134 1712 : off_t end = off + len;
135 :
136 1712 : snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir, path);
137 :
138 1712 : srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0);
139 1712 : if (srcfd < 0)
140 0 : pg_fatal("could not open source file \"%s\": %m",
141 : srcpath);
142 :
143 1712 : if (lseek(srcfd, begin, SEEK_SET) == -1)
144 0 : pg_fatal("could not seek in source file: %m");
145 :
146 1712 : open_target_file(path, false);
147 :
148 3916 : while (end - begin > 0)
149 : {
150 : ssize_t readlen;
151 : size_t thislen;
152 :
153 2204 : if (end - begin > sizeof(buf))
154 492 : thislen = sizeof(buf);
155 : else
156 1712 : thislen = end - begin;
157 :
158 2204 : readlen = read(srcfd, buf.data, thislen);
159 :
160 2204 : if (readlen < 0)
161 0 : pg_fatal("could not read file \"%s\": %m", srcpath);
162 2204 : else if (readlen == 0)
163 0 : pg_fatal("unexpected EOF while reading file \"%s\"", srcpath);
164 :
165 2204 : write_target_range(buf.data, begin, readlen);
166 2204 : begin += readlen;
167 : }
168 :
169 1712 : if (close(srcfd) != 0)
170 0 : pg_fatal("could not close file \"%s\": %m", srcpath);
171 1712 : }
172 :
173 : static void
174 14 : local_finish_fetch(rewind_source *source)
175 : {
176 : /*
177 : * Nothing to do, local_queue_fetch_range() copies the ranges immediately.
178 : */
179 14 : }
180 :
181 : static void
182 14 : local_destroy(rewind_source *source)
183 : {
184 14 : pfree(source);
185 14 : }
|