/*-------------------------------------------------------------------------
 *
 * bulk_write.c
 *	  Efficiently and reliably populate a new relation
 *
 * The assumption is that no other backends access the relation while we are
 * loading it, so we can take some shortcuts.  Do not mix operations through
 * the regular buffer manager and the bulk loading interface!
 *
 * We bypass the buffer manager to avoid the locking overhead, and call
 * smgrextend() directly.  A downside is that the pages will need to be
 * re-read into shared buffers on first use after the build finishes.  That's
 * usually a good tradeoff for large relations, and for small relations, the
 * overhead isn't very significant compared to creating the relation in the
 * first place.
 *
 * The pages are WAL-logged if needed.  To save on WAL header overhead, we
 * WAL-log several pages in one record.
 *
 * One tricky point is that because we bypass the buffer manager, we need to
 * register the relation for fsyncing at the next checkpoint ourselves, and
 * make sure that the relation is correctly fsync'd by us or the checkpointer
 * even if a checkpoint happens concurrently.
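 *
 * A typical caller looks roughly like this (a sketch only: 'rel', 'nblocks'
 * and the page-filling step stand in for the caller's own logic, and error
 * handling is omitted):
 *
 *		BulkWriteState *bulkstate = smgr_bulk_start_rel(rel, MAIN_FORKNUM);
 *
 *		for (BlockNumber blkno = 0; blkno < nblocks; blkno++)
 *		{
 *			BulkWriteBuffer buf = smgr_bulk_get_buf(bulkstate);
 *
 *			... fill buf->data with the new page's contents ...
 *			smgr_bulk_write(bulkstate, blkno, buf, true);
 *		}
 *		smgr_bulk_finish(bulkstate);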
 *
 *
 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/storage/smgr/bulk_write.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/xloginsert.h"
#include "access/xlogrecord.h"
#include "storage/bufpage.h"
#include "storage/bulk_write.h"
#include "storage/proc.h"
#include "storage/smgr.h"
#include "utils/rel.h"

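/*
 * Size of the write queue.  Capping it at XLR_MAX_BLOCK_ID lets a full
 * queue of pages be WAL-logged as one batch by the log_newpages() call in
 * smgr_bulk_flush().
 */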
#define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID

static const PGIOAlignedBlock zero_buffer = {{0}};	/* worth BLCKSZ */

typedef struct PendingWrite
{
	BulkWriteBuffer buf;
	BlockNumber blkno;
	bool		page_std;
} PendingWrite;

/*
 * Bulk writer state for one relation fork.
 */
struct BulkWriteState
{
	/* Information about the target relation we're writing */
	SMgrRelation smgr;
	ForkNumber	forknum;
	bool		use_wal;

	/* We keep several writes queued, and WAL-log them in batches */
	int			npending;
	PendingWrite pending_writes[MAX_PENDING_WRITES];

	/* Current size of the relation */
	BlockNumber pages_written;

	/* The RedoRecPtr at the time that the bulk operation started */
	XLogRecPtr	start_RedoRecPtr;

	MemoryContext memcxt;
};

static void smgr_bulk_flush(BulkWriteState *bulkstate);

/*
 * Start a bulk write operation on a relation fork.
 */
BulkWriteState *
smgr_bulk_start_rel(Relation rel, ForkNumber forknum)
{
	return smgr_bulk_start_smgr(RelationGetSmgr(rel),
								forknum,
								RelationNeedsWAL(rel) || forknum == INIT_FORKNUM);
}

/*
 * Start a bulk write operation on a relation fork.
 *
 * This is like smgr_bulk_start_rel, but can be used without a relcache entry.
 */
BulkWriteState *
smgr_bulk_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal)
{
	BulkWriteState *state;

	state = palloc(sizeof(BulkWriteState));
	state->smgr = smgr;
	state->forknum = forknum;
	state->use_wal = use_wal;

	state->npending = 0;
	state->pages_written = 0;

	state->start_RedoRecPtr = GetRedoRecPtr();

	/*
	 * Remember the memory context.  We will use it to allocate all the
	 * buffers later.
	 */
	state->memcxt = CurrentMemoryContext;

	return state;
}

/*
 * Finish bulk write operation.
 *
 * This WAL-logs and flushes any remaining pending writes to disk, and fsyncs
 * the relation if needed.
 */
void
smgr_bulk_finish(BulkWriteState *bulkstate)
{
	/* WAL-log and flush any remaining pages */
	smgr_bulk_flush(bulkstate);

	/*
	 * Fsync the relation, or register it for the next checkpoint, if
	 * necessary.
	 */
	if (SmgrIsTemp(bulkstate->smgr))
	{
		/* Temporary relations don't need to be fsync'd, ever */
	}
	else if (!bulkstate->use_wal)
	{
		/*----------
		 * This is either an unlogged relation, or a permanent relation for
		 * which we skipped WAL-logging because wal_level=minimal:
		 *
		 * A) Unlogged relation
		 *
		 * Unlogged relations will go away on crash, but they need to be
		 * fsync'd on a clean shutdown.  It's sufficient to call
		 * smgrregistersync(); that ensures that the checkpointer will flush
		 * the relation at the shutdown checkpoint.  (It will flush it at the
		 * next online checkpoint too, which is not strictly necessary.)
		 *
		 * Note that the init-fork of an unlogged relation is not considered
		 * unlogged for our purposes.  It's treated like a regular permanent
		 * relation.  The callers will pass use_wal=true for the init fork.
		 *
		 * B) Permanent relation, WAL-logging skipped because wal_level=minimal
		 *
		 * This is a new relation, and we didn't WAL-log the pages as we
		 * wrote them, but they need to be fsync'd before commit.
		 *
		 * We don't need to do that here, however.  The fsync() is done at
		 * commit, by smgrDoPendingSyncs() (*).
		 *
		 * (*) smgrDoPendingSyncs() might decide to WAL-log the whole
		 * relation at commit instead of fsyncing it, if the relation is very
		 * small, but either way it's smgrDoPendingSyncs()'s responsibility.
		 *
		 * We cannot distinguish the two cases here, so we conservatively
		 * assume it's an unlogged relation.  A permanent relation with
		 * wal_level=minimal requires no action at all, as explained above,
		 * so the extra smgrregistersync() is harmless.
		 */
		smgrregistersync(bulkstate->smgr, bulkstate->forknum);
	}
	else
	{
		/*
		 * Permanent relation, WAL-logged normally.
		 *
		 * We already WAL-logged all the pages, so they will be replayed from
		 * WAL on crash.  However, when we wrote out the pages, we passed
		 * skipFsync=true to avoid the overhead of registering all the writes
		 * with the checkpointer.  Register the whole relation now.
		 *
		 * There is one hole in that idea: if a checkpoint occurred while we
		 * were writing the pages, it would have missed fsyncing the pages we
		 * had written before it started.  A crash later on would replay the
		 * WAL starting from that checkpoint, and therefore would not replay
		 * our earlier WAL records.  So if a checkpoint started after the
		 * bulk write began, fsync the files ourselves now.
		 */

		/*
		 * Prevent a checkpoint from starting between the GetRedoRecPtr() and
		 * smgrregistersync() calls.
		 */
		Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
		MyProc->delayChkptFlags |= DELAY_CHKPT_START;

		if (bulkstate->start_RedoRecPtr != GetRedoRecPtr())
		{
			/*
			 * A checkpoint occurred and it didn't know about our writes, so
			 * fsync() the relation ourselves.
			 */
			MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
			smgrimmedsync(bulkstate->smgr, bulkstate->forknum);
			elog(DEBUG1, "flushed relation because a checkpoint occurred concurrently");
		}
		else
		{
			smgrregistersync(bulkstate->smgr, bulkstate->forknum);
			MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
		}
	}
}

static int
buffer_cmp(const void *a, const void *b)
{
	const PendingWrite *bufa = (const PendingWrite *) a;
	const PendingWrite *bufb = (const PendingWrite *) b;

	/* We should not see duplicated writes for the same block */
	Assert(bufa->blkno != bufb->blkno);
	if (bufa->blkno > bufb->blkno)
		return 1;
	else
		return -1;
}

/*
 * Finish all the pending writes.
 */
static void
smgr_bulk_flush(BulkWriteState *bulkstate)
{
	int			npending = bulkstate->npending;
	PendingWrite *pending_writes = bulkstate->pending_writes;

	if (npending == 0)
		return;

	if (npending > 1)
		qsort(pending_writes, npending, sizeof(PendingWrite), buffer_cmp);

	if (bulkstate->use_wal)
	{
		BlockNumber blknos[MAX_PENDING_WRITES];
		Page		pages[MAX_PENDING_WRITES];
		bool		page_std = true;

		for (int i = 0; i < npending; i++)
		{
			blknos[i] = pending_writes[i].blkno;
			pages[i] = pending_writes[i].buf->data;

			/*
			 * If any of the pages is marked !page_std, i.e. has a
			 * non-standard page layout, we log them all as such.  That's a
			 * bit wasteful, but in practice a mix of standard and
			 * non-standard page layouts is rare.  None of the built-in AMs
			 * produce such a mix.
			 */
			if (!pending_writes[i].page_std)
				page_std = false;
		}
		log_newpages(&bulkstate->smgr->smgr_rlocator.locator, bulkstate->forknum,
					 npending, blknos, pages, page_std);
	}

	for (int i = 0; i < npending; i++)
	{
		BlockNumber blkno = pending_writes[i].blkno;
		Page		page = pending_writes[i].buf->data;

		PageSetChecksumInplace(page, blkno);

		if (blkno >= bulkstate->pages_written)
		{
			/*
			 * If we have to write pages nonsequentially, fill in the space
			 * with zeroes until we come back and overwrite.  This is not
			 * logically necessary on standard Unix filesystems (unwritten
			 * space will read as zeroes anyway), but it should help to avoid
			 * fragmentation.  The dummy pages aren't WAL-logged, though.
			 */
			while (blkno > bulkstate->pages_written)
			{
				/* don't set checksum for all-zero page */
				smgrextend(bulkstate->smgr, bulkstate->forknum,
						   bulkstate->pages_written++,
						   &zero_buffer,
						   true);
			}

			smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
			bulkstate->pages_written = pending_writes[i].blkno + 1;
		}
		else
			smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
		pfree(page);
	}

	bulkstate->npending = 0;
}

/*
 * Queue write of 'buf'.
 *
 * NB: this takes ownership of 'buf'!
 *
 * You are only allowed to write a given block once as part of one bulk write
 * operation.
 */
void
smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std)
{
	PendingWrite *w;

	w = &bulkstate->pending_writes[bulkstate->npending++];
	w->buf = buf;
	w->blkno = blocknum;
	w->page_std = page_std;

	if (bulkstate->npending == MAX_PENDING_WRITES)
		smgr_bulk_flush(bulkstate);
}

/*
 * Allocate a new buffer which can later be written with smgr_bulk_write().
 *
 * There is no function to free the buffer.  When you pass it to
 * smgr_bulk_write(), it takes ownership and frees it when it's no longer
 * needed.
 *
 * This is currently implemented as a simple palloc, but it could be
 * implemented using a ring buffer or larger chunks in the future, so don't
 * rely on the buffer being a separately palloc'd chunk.
 */
BulkWriteBuffer
smgr_bulk_get_buf(BulkWriteState *bulkstate)
{
	return MemoryContextAllocAligned(bulkstate->memcxt, BLCKSZ, PG_IO_ALIGN_SIZE, 0);
}