/*-------------------------------------------------------------------------
 *
 * bulk_write.c
 *	  Efficiently and reliably populate a new relation
 *
 * The assumption is that no other backends access the relation while we are
 * loading it, so we can take some shortcuts.  Pages already present in the
 * indicated fork when the bulk write operation is started are not modified
 * unless explicitly written to.  Do not mix operations through the regular
 * buffer manager and the bulk loading interface!
 *
 * We bypass the buffer manager to avoid the locking overhead, and call
 * smgrextend() directly.  A downside is that the pages will need to be
 * re-read into shared buffers on first use after the build finishes.  That's
 * usually a good tradeoff for large relations, and for small relations, the
 * overhead isn't very significant compared to creating the relation in the
 * first place.
 *
 * The pages are WAL-logged if needed.  To save on WAL header overhead, we
 * WAL-log several pages in one record.
 *
 * One tricky point is that because we bypass the buffer manager, we need to
 * register the relation for fsyncing at the next checkpoint ourselves, and
 * make sure that the relation is correctly fsync'd by us or the checkpointer
 * even if a checkpoint happens concurrently.
 *
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/storage/smgr/bulk_write.c
 *
 *-------------------------------------------------------------------------
 */
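
/*
 * A minimal sketch of typical usage, assuming a caller that fills every
 * block of a new relation in order ("rel" and "nblocks" are hypothetical
 * placeholders, not part of this API):
 *
 *    BulkWriteState *bulkstate = smgr_bulk_start_rel(rel, MAIN_FORKNUM);
 *
 *    for (BlockNumber blkno = 0; blkno < nblocks; blkno++)
 *    {
 *        BulkWriteBuffer buf = smgr_bulk_get_buf(bulkstate);
 *
 *        ... fill buf->data with the new page's contents ...
 *
 *        smgr_bulk_write(bulkstate, blkno, buf, true);
 *    }
 *    smgr_bulk_finish(bulkstate);
 */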
#include "postgres.h"

#include "access/xloginsert.h"
#include "access/xlogrecord.h"
#include "storage/bufpage.h"
#include "storage/bulk_write.h"
#include "storage/proc.h"
#include "storage/smgr.h"
#include "utils/rel.h"

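/*
 * Pending writes are WAL-logged in a single record per batch (see
 * smgr_bulk_flush), so the batch size is capped by the number of block
 * references that fit in one WAL record.
 */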
#define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID

static const PGIOAlignedBlock zero_buffer = {{0}};	/* worth BLCKSZ */

typedef struct PendingWrite
{
    BulkWriteBuffer buf;
    BlockNumber blkno;
    bool        page_std;
} PendingWrite;

/*
 * Bulk writer state for one relation fork.
 */
struct BulkWriteState
{
    /* Information about the target relation we're writing */
    SMgrRelation smgr;
    ForkNumber  forknum;
    bool        use_wal;

    /* We keep several writes queued, and WAL-log them in batches */
    int         npending;
    PendingWrite pending_writes[MAX_PENDING_WRITES];

    /* Current size of the relation */
    BlockNumber relsize;

    /* The RedoRecPtr at the time that the bulk operation started */
    XLogRecPtr  start_RedoRecPtr;

    MemoryContext memcxt;
};

static void smgr_bulk_flush(BulkWriteState *bulkstate);

/*
 * Start a bulk write operation on a relation fork.
 */
BulkWriteState *
smgr_bulk_start_rel(Relation rel, ForkNumber forknum)
{
    return smgr_bulk_start_smgr(RelationGetSmgr(rel),
                                forknum,
                                RelationNeedsWAL(rel) || forknum == INIT_FORKNUM);
}

/*
 * Start a bulk write operation on a relation fork.
 *
 * This is like smgr_bulk_start_rel, but can be used without a relcache entry.
 */
BulkWriteState *
smgr_bulk_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal)
{
    BulkWriteState *state;

    state = palloc(sizeof(BulkWriteState));
    state->smgr = smgr;
    state->forknum = forknum;
    state->use_wal = use_wal;

    state->npending = 0;
    state->relsize = smgrnblocks(smgr, forknum);

    state->start_RedoRecPtr = GetRedoRecPtr();

    /*
     * Remember the memory context.  We will use it to allocate all the
     * buffers later.
     */
    state->memcxt = CurrentMemoryContext;

    return state;
}

/*
 * Finish a bulk write operation.
 *
 * This WAL-logs and flushes any remaining pending writes to disk, and fsyncs
 * the relation if needed.
 */
void
smgr_bulk_finish(BulkWriteState *bulkstate)
{
    /* WAL-log and flush any remaining pages */
    smgr_bulk_flush(bulkstate);

    /*
     * Fsync the relation, or register it for the next checkpoint, if
     * necessary.
     */
    if (SmgrIsTemp(bulkstate->smgr))
    {
        /* Temporary relations don't need to be fsync'd, ever */
    }
    else if (!bulkstate->use_wal)
    {
        /*----------
         * This is either an unlogged relation, or a permanent relation but
         * we skipped WAL-logging because wal_level=minimal:
         *
         * A) Unlogged relation
         *
         *    Unlogged relations will go away on crash, but they need to be
         *    fsync'd on a clean shutdown.  It's sufficient to call
         *    smgrregistersync(), which ensures that the checkpointer will
         *    flush the relation at the shutdown checkpoint.  (It will flush
         *    it at the next online checkpoint too, which is not strictly
         *    necessary.)
         *
         *    Note that the init fork of an unlogged relation is not
         *    considered unlogged for our purposes.  It's treated like a
         *    regular permanent relation; the callers pass use_wal=true for
         *    the init fork.
         *
         * B) Permanent relation, WAL-logging skipped because
         *    wal_level=minimal
         *
         *    This is a new relation, and we didn't WAL-log the pages as we
         *    wrote them, but they need to be fsync'd before commit.
         *
         *    We don't need to do that here, however.  The fsync() is done
         *    at commit, by smgrDoPendingSyncs() (*).
         *
         *    (*) smgrDoPendingSyncs() might decide to WAL-log the whole
         *    relation at commit instead of fsyncing it, if the relation was
         *    very small, but either way it is smgrDoPendingSyncs()'s
         *    responsibility.
         *
         * We cannot distinguish the two cases here, so we conservatively
         * assume it's an unlogged relation.  That is correct for case A, and
         * harmless for case B: as explained above, a permanent relation with
         * wal_level=minimal requires no action here.
         */
        smgrregistersync(bulkstate->smgr, bulkstate->forknum);
    }
    else
    {
        /*
         * Permanent relation, WAL-logged normally.
         *
         * We already WAL-logged all the pages, so they will be replayed from
         * WAL on crash.  However, when we wrote out the pages, we passed
         * skipFsync=true to avoid the overhead of registering all the writes
         * with the checkpointer.  Register the whole relation now.
         *
         * There is one hole in that idea: if a checkpoint occurred while we
         * were writing the pages, it missed fsyncing the pages we had
         * written before it started.  A crash later on would replay the WAL
         * starting from that checkpoint, and therefore wouldn't replay our
         * earlier WAL records.  So if a checkpoint has started since the
         * bulk write began, fsync the files ourselves now.
         */

        /*
         * Prevent a checkpoint from starting between the GetRedoRecPtr() and
         * smgrregistersync() calls.
         */
        Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
        MyProc->delayChkptFlags |= DELAY_CHKPT_START;

        if (bulkstate->start_RedoRecPtr != GetRedoRecPtr())
        {
            /*
             * A checkpoint occurred and it didn't know about our writes, so
             * fsync() the relation ourselves.
             */
            MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
            smgrimmedsync(bulkstate->smgr, bulkstate->forknum);
            elog(DEBUG1, "flushed relation because a checkpoint occurred concurrently");
        }
        else
        {
            smgrregistersync(bulkstate->smgr, bulkstate->forknum);
            MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
        }
    }
}

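/*
 * qsort comparator: order pending writes by ascending block number, so the
 * flush writes the pages out sequentially.
 */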
static int
buffer_cmp(const void *a, const void *b)
{
    const PendingWrite *bufa = (const PendingWrite *) a;
    const PendingWrite *bufb = (const PendingWrite *) b;

    /* We should not see duplicated writes for the same block */
    Assert(bufa->blkno != bufb->blkno);
    if (bufa->blkno > bufb->blkno)
        return 1;
    else
        return -1;
}

/*
 * Finish all the pending writes.
 */
static void
smgr_bulk_flush(BulkWriteState *bulkstate)
{
    int         npending = bulkstate->npending;
    PendingWrite *pending_writes = bulkstate->pending_writes;

    if (npending == 0)
        return;

    if (npending > 1)
        qsort(pending_writes, npending, sizeof(PendingWrite), buffer_cmp);

    if (bulkstate->use_wal)
    {
        BlockNumber blknos[MAX_PENDING_WRITES];
        Page        pages[MAX_PENDING_WRITES];
        bool        page_std = true;

        for (int i = 0; i < npending; i++)
        {
            blknos[i] = pending_writes[i].blkno;
            pages[i] = pending_writes[i].buf->data;

            /*
             * If any of the pages use !page_std, we log them all as such.
             * That's a bit wasteful, but in practice, a mix of standard and
             * non-standard page layout is rare.  None of the built-in AMs do
             * that.
             */
            if (!pending_writes[i].page_std)
                page_std = false;
        }
        log_newpages(&bulkstate->smgr->smgr_rlocator.locator, bulkstate->forknum,
                     npending, blknos, pages, page_std);
    }

    for (int i = 0; i < npending; i++)
    {
        BlockNumber blkno = pending_writes[i].blkno;
        Page        page = pending_writes[i].buf->data;

        PageSetChecksumInplace(page, blkno);

        if (blkno >= bulkstate->relsize)
        {
            /*
             * If we have to write pages nonsequentially, fill in the space
             * with zeroes until we come back and overwrite.  This is not
             * logically necessary on standard Unix filesystems (unwritten
             * space will read as zeroes anyway), but it should help to avoid
             * fragmentation.  The dummy pages aren't WAL-logged though.
             */
            while (blkno > bulkstate->relsize)
            {
                /* don't set checksum for all-zero page */
                smgrextend(bulkstate->smgr, bulkstate->forknum,
                           bulkstate->relsize,
                           &zero_buffer,
                           true);
                bulkstate->relsize++;
            }

            smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
            bulkstate->relsize++;
        }
        else
            smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
        pfree(page);
    }

    bulkstate->npending = 0;
}

/*
 * Queue write of 'buf'.
 *
 * NB: this takes ownership of 'buf'!
 *
 * You are only allowed to write a given block once as part of one bulk write
 * operation.
 */
void
smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std)
{
    PendingWrite *w;

    w = &bulkstate->pending_writes[bulkstate->npending++];
    w->buf = buf;
    w->blkno = blocknum;
    w->page_std = page_std;

    if (bulkstate->npending == MAX_PENDING_WRITES)
        smgr_bulk_flush(bulkstate);
}

/*
 * Allocate a new buffer which can later be written with smgr_bulk_write().
 *
 * There is no function to free the buffer.  When you pass it to
 * smgr_bulk_write(), it takes ownership and frees it when it's no longer
 * needed.
 *
 * This is currently implemented as a simple palloc, but it could be
 * implemented with a ring buffer or larger chunks in the future, so don't
 * rely on how the buffer is allocated.
 */
BulkWriteBuffer
smgr_bulk_get_buf(BulkWriteState *bulkstate)
{
    return MemoryContextAllocAligned(bulkstate->memcxt, BLCKSZ, PG_IO_ALIGN_SIZE, 0);
}