Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * bulk_write.c
4 : * Efficiently and reliably populate a new relation
5 : *
6 : * The assumption is that no other backends access the relation while we are
7 : * loading it, so we can take some shortcuts. Pages already present in the
8 : * indicated fork when the bulk write operation is started are not modified
9 : * unless explicitly written to. Do not mix operations through the regular
10 : * buffer manager and the bulk loading interface!
11 : *
12 : * We bypass the buffer manager to avoid the locking overhead, and call
13 : * smgrextend() directly. A downside is that the pages will need to be
14 : * re-read into shared buffers on first use after the build finishes. That's
15 : * usually a good tradeoff for large relations, and for small relations, the
16 : * overhead isn't very significant compared to creating the relation in the
17 : * first place.
18 : *
19 : * The pages are WAL-logged if needed. To save on WAL header overhead, we
20 : * WAL-log several pages in one record.
21 : *
22 : * One tricky point is that because we bypass the buffer manager, we need to
23 : * register the relation for fsyncing at the next checkpoint ourselves, and
24 : * make sure that the relation is correctly fsync'd by us or the checkpointer
25 : * even if a checkpoint happens concurrently.
26 : *
27 : *
28 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
29 : * Portions Copyright (c) 1994, Regents of the University of California
30 : *
31 : *
32 : * IDENTIFICATION
33 : * src/backend/storage/smgr/bulk_write.c
34 : *
35 : *-------------------------------------------------------------------------
36 : */
37 : #include "postgres.h"
38 :
39 : #include "access/xloginsert.h"
40 : #include "access/xlogrecord.h"
41 : #include "storage/bufpage.h"
42 : #include "storage/bulk_write.h"
43 : #include "storage/proc.h"
44 : #include "storage/smgr.h"
45 : #include "utils/rel.h"
46 :
47 : #define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID
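/*
 * Why XLR_MAX_BLOCK_ID: log_newpages() registers each page as one block
 * reference in a WAL record, and it can register at most XLR_MAX_BLOCK_ID
 * pages per record. Capping the queue at that size lets smgr_bulk_flush()
 * WAL-log each batch as a single record. (This rationale is inferred from
 * the define; the limit itself comes from xlogrecord.h.)
 */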
48 :
49 : static const PGIOAlignedBlock zero_buffer = {{0}}; /* worth BLCKSZ */
50 :
51 : typedef struct PendingWrite
52 : {
53 : BulkWriteBuffer buf;
54 : BlockNumber blkno;
55 : bool page_std;
56 : } PendingWrite;
57 :
58 : /*
59 : * Bulk writer state for one relation fork.
60 : */
61 : struct BulkWriteState
62 : {
63 : /* Information about the target relation we're writing */
64 : SMgrRelation smgr;
65 : ForkNumber forknum;
66 : bool use_wal;
67 :
68 : /* We keep several writes queued, and WAL-log them in batches */
69 : int npending;
70 : PendingWrite pending_writes[MAX_PENDING_WRITES];
71 :
72 : /* Current size of the relation */
73 : BlockNumber relsize;
74 :
75 : /* The RedoRecPtr at the time that the bulk operation started */
76 : XLogRecPtr start_RedoRecPtr;
77 :
78 : MemoryContext memcxt;
79 : };
80 :
81 : static void smgr_bulk_flush(BulkWriteState *bulkstate);
82 :
83 : /*
84 : * Start a bulk write operation on a relation fork.
85 : */
86 : BulkWriteState *
87 49028 : smgr_bulk_start_rel(Relation rel, ForkNumber forknum)
88 : {
89 49028 : return smgr_bulk_start_smgr(RelationGetSmgr(rel),
90 : forknum,
91 49028 : RelationNeedsWAL(rel) || forknum == INIT_FORKNUM);
92 : }
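/*
 * Note the INIT_FORKNUM check above: the init fork of an unlogged relation
 * is always WAL-logged, because it must survive a crash so that recovery
 * can reset the relation to its empty state.
 */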
93 :
94 : /*
95 : * Start a bulk write operation on a relation fork.
96 : *
97 : * This is like smgr_bulk_start_rel, but can be used without a relcache entry.
98 : */
99 : BulkWriteState *
100 49206 : smgr_bulk_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal)
101 : {
102 : BulkWriteState *state;
103 :
104 49206 : state = palloc(sizeof(BulkWriteState));
105 49206 : state->smgr = smgr;
106 49206 : state->forknum = forknum;
107 49206 : state->use_wal = use_wal;
108 :
109 49206 : state->npending = 0;
110 49206 : state->relsize = smgrnblocks(smgr, forknum);
111 :
112 49206 : state->start_RedoRecPtr = GetRedoRecPtr();
113 :
114 : /*
115 : * Remember the memory context. We will use it to allocate all the
116 : * buffers later.
117 : */
118 49206 : state->memcxt = CurrentMemoryContext;
119 :
120 49206 : return state;
121 : }
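/*
 * A hedged sketch of relcache-free use (hypothetical helper, not part of
 * this file; the RelFileLocator must come from the caller). Since init
 * forks are always WAL-logged, use_wal is passed as true here.
 */
#ifdef NOT_USED
static BulkWriteState *
example_start_init_fork(RelFileLocator rlocator)
{
	SMgrRelation srel = smgropen(rlocator, INVALID_PROC_NUMBER);

	return smgr_bulk_start_smgr(srel, INIT_FORKNUM, true);
}
#endif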
122 :
123 : /*
124 : * Finish bulk write operation.
125 : *
126 : * This WAL-logs and flushes any remaining pending writes to disk, and fsyncs
127 : * the relation if needed.
128 : */
129 : void
130 49206 : smgr_bulk_finish(BulkWriteState *bulkstate)
131 : {
132 : /* WAL-log and flush any remaining pages */
133 49206 : smgr_bulk_flush(bulkstate);
134 :
135 : /*
136 : * Fsync the relation, or register it for the next checkpoint, if
137 : * necessary.
138 : */
139 49206 : if (SmgrIsTemp(bulkstate->smgr))
140 : {
141 : /* Temporary relations don't need to be fsync'd, ever */
142 : }
143 46860 : else if (!bulkstate->use_wal)
144 : {
145 : /*----------
146 : * This is either an unlogged relation, or a permanent relation but we
147 : * skipped WAL-logging because wal_level=minimal:
148 : *
149 : * A) Unlogged relation
150 : *
151 : * Unlogged relations will go away on crash, but they need to be
152 : * fsync'd on a clean shutdown. It's sufficient to call
153 : * smgrregistersync(), which ensures that the checkpointer will
154 : * flush it at the shutdown checkpoint. (It will flush it on the
155 : * next online checkpoint too, which is not strictly necessary.)
156 : *
157 : * Note that the init-fork of an unlogged relation is not
158 : * considered unlogged for our purposes. It's treated like a
159 : * regular permanent relation. The callers will pass use_wal=true
160 : * for the init fork.
161 : *
162 : * B) Permanent relation, WAL-logging skipped because wal_level=minimal
163 : *
164 : * This is a new relation, and we didn't WAL-log the pages as we
165 : * wrote, but they need to be fsync'd before commit.
166 : *
167 : * We don't need to do that here, however. The fsync() is done at
168 : * commit, by smgrDoPendingSyncs() (*).
169 : *
170 : * (*) smgrDoPendingSyncs() might decide to WAL-log the whole
171 : * relation at commit instead of fsyncing it, if the relation was
172 : * very small, but it is smgrDoPendingSyncs()'s responsibility in any
173 : * case.
174 : *
175 : * We cannot distinguish the two here, so conservatively assume it's
176 : * an unlogged relation. A permanent relation with wal_level=minimal
177 : * would require no actions, see above.
178 : */
179 11294 : smgrregistersync(bulkstate->smgr, bulkstate->forknum);
180 : }
181 : else
182 : {
183 : /*
184 : * Permanent relation, WAL-logged normally.
185 : *
186 : * We already WAL-logged all the pages, so they will be replayed from
187 : * WAL on crash. However, when we wrote out the pages, we passed
188 : * skipFsync=true to avoid the overhead of registering all the writes
189 : * with the checkpointer. Register the whole relation now.
190 : *
191 : * There is one hole in that idea: If a checkpoint occurred while we
192 : * were writing the pages, it already missed fsyncing the pages we had
193 : * written before the checkpoint started. A crash later on would
194 : * replay the WAL starting from the checkpoint, so it wouldn't
195 : * replay our earlier WAL records. Hence, if a checkpoint started
196 : * after the bulk write began, fsync the files now.
197 : */
198 :
199 : /*
200 : * Prevent a checkpoint from starting between the GetRedoRecPtr() and
201 : * smgrregistersync() calls.
202 : */
203 : Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
204 35566 : MyProc->delayChkptFlags |= DELAY_CHKPT_START;
205 :
206 35566 : if (bulkstate->start_RedoRecPtr != GetRedoRecPtr())
207 : {
208 : /*
209 : * A checkpoint occurred and it didn't know about our writes, so
210 : * fsync() the relation ourselves.
211 : */
212 2 : MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
213 2 : smgrimmedsync(bulkstate->smgr, bulkstate->forknum);
214 2 : elog(DEBUG1, "flushed relation because a checkpoint occurred concurrently");
215 : }
216 : else
217 : {
218 35564 : smgrregistersync(bulkstate->smgr, bulkstate->forknum);
219 35564 : MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
220 : }
221 : }
222 49206 : }
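/*
 * Summary of the durability cases handled above:
 *
 *   temporary relation  -> no fsync, ever
 *   use_wal == false    -> smgrregistersync() (covers unlogged relations;
 *                          wal_level=minimal relations are synced at commit
 *                          by smgrDoPendingSyncs())
 *   use_wal == true     -> smgrregistersync(), or smgrimmedsync() if a
 *                          checkpoint started concurrently
 */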
223 :
224 : static int
225 223144 : buffer_cmp(const void *a, const void *b)
226 : {
227 223144 : const PendingWrite *bufa = (const PendingWrite *) a;
228 223144 : const PendingWrite *bufb = (const PendingWrite *) b;
229 :
230 : /* We should not see duplicate writes of the same block */
231 : Assert(bufa->blkno != bufb->blkno);
232 223144 : if (bufa->blkno > bufb->blkno)
233 104498 : return 1;
234 : else
235 118646 : return -1;
236 : }
237 :
238 : /*
239 : * Finish all the pending writes.
240 : */
241 : static void
242 50344 : smgr_bulk_flush(BulkWriteState *bulkstate)
243 : {
244 50344 : int npending = bulkstate->npending;
245 50344 : PendingWrite *pending_writes = bulkstate->pending_writes;
246 :
247 50344 : if (npending == 0)
248 206 : return;
249 :
250 50138 : if (npending > 1)
251 10798 : qsort(pending_writes, npending, sizeof(PendingWrite), buffer_cmp);
252 :
253 50138 : if (bulkstate->use_wal)
254 : {
255 : BlockNumber blknos[MAX_PENDING_WRITES];
256 : Page pages[MAX_PENDING_WRITES];
257 36042 : bool page_std = true;
258 :
259 107536 : for (int i = 0; i < npending; i++)
260 : {
261 71494 : blknos[i] = pending_writes[i].blkno;
262 71494 : pages[i] = pending_writes[i].buf->data;
263 :
264 : /*
265 : * If any of the pages use !page_std, we log them all as such.
266 : * That's a bit wasteful, but in practice, a mix of standard and
267 : * non-standard page layout is rare. None of the built-in AMs do
268 : * that.
269 : */
270 71494 : if (!pending_writes[i].page_std)
271 104 : page_std = false;
272 : }
273 36042 : log_newpages(&bulkstate->smgr->smgr_rlocator.locator, bulkstate->forknum,
274 : npending, blknos, pages, page_std);
275 : }
276 :
277 163248 : for (int i = 0; i < npending; i++)
278 : {
279 113110 : BlockNumber blkno = pending_writes[i].blkno;
280 113110 : Page page = pending_writes[i].buf->data;
281 :
282 113110 : PageSetChecksumInplace(page, blkno);
283 :
284 113110 : if (blkno >= bulkstate->relsize)
285 : {
286 : /*
287 : * If we have to write pages nonsequentially, fill in the space
288 : * with zeroes until we come back and overwrite. This is not
289 : * logically necessary on standard Unix filesystems (unwritten
290 : * space will read as zeroes anyway), but it should help to avoid
291 : * fragmentation. The dummy pages aren't WAL-logged though.
292 : */
293 113110 : while (blkno > bulkstate->relsize)
294 : {
295 : /* don't set checksum for all-zero page */
296 538 : smgrextend(bulkstate->smgr, bulkstate->forknum,
297 : bulkstate->relsize,
298 : &zero_buffer,
299 : true);
300 538 : bulkstate->relsize++;
301 : }
302 :
303 112572 : smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
304 112572 : bulkstate->relsize++;
305 : }
306 : else
307 538 : smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
308 113110 : pfree(page);
309 : }
310 :
311 50138 : bulkstate->npending = 0;
312 : }
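/*
 * Illustrative sketch of the zero-filling above (hypothetical caller, not
 * part of this file): if the only page queued into a new, empty fork is
 * block 3, the flush first extends the fork with zero-filled blocks 0-2 and
 * then writes block 3, leaving the fork 4 blocks long.
 */
#ifdef NOT_USED
static void
example_sparse_write(BulkWriteState *bulkstate)
{
	BulkWriteBuffer buf = smgr_bulk_get_buf(bulkstate);

	PageInit((Page) buf->data, BLCKSZ, 0);
	smgr_bulk_write(bulkstate, 3, buf, true);	/* queue only block 3 */
	smgr_bulk_finish(bulkstate);	/* blocks 0-2 are zero-filled */
}
#endif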
313 :
314 : /*
315 : * Queue write of 'buf'.
316 : *
317 : * NB: this takes ownership of 'buf'!
318 : *
319 : * You are only allowed to write a given block once as part of one bulk write
320 : * operation.
321 : */
322 : void
323 113110 : smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std)
324 : {
325 : PendingWrite *w;
326 :
327 113110 : w = &bulkstate->pending_writes[bulkstate->npending++];
328 113110 : w->buf = buf;
329 113110 : w->blkno = blocknum;
330 113110 : w->page_std = page_std;
331 :
332 113110 : if (bulkstate->npending == MAX_PENDING_WRITES)
333 1138 : smgr_bulk_flush(bulkstate);
334 113110 : }
335 :
336 : /*
337 : * Allocate a new buffer which can later be written with smgr_bulk_write().
338 : *
339 : * There is no function to free the buffer. When you pass it to
340 : * smgr_bulk_write(), it takes ownership and frees it when it's no longer
341 : * needed.
342 : *
343 : * This is currently implemented as a simple palloc, but could be implemented
344 : * using a ring buffer or larger chunks in the future, so don't rely on the allocation details.
345 : */
346 : BulkWriteBuffer
347 113110 : smgr_bulk_get_buf(BulkWriteState *bulkstate)
348 : {
349 113110 : return MemoryContextAllocAligned(bulkstate->memcxt, BLCKSZ, PG_IO_ALIGN_SIZE, 0);
350 : }
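/*
 * Putting the interface together: a hedged end-to-end sketch of the usage
 * pattern described in the file header comment (hypothetical caller, not
 * part of this file). smgr_bulk_write() takes ownership of each buffer and
 * frees it once the page has been written out.
 */
#ifdef NOT_USED
static void
example_bulk_load(Relation rel, BlockNumber nblocks)
{
	BulkWriteState *bulkstate = smgr_bulk_start_rel(rel, MAIN_FORKNUM);

	for (BlockNumber blkno = 0; blkno < nblocks; blkno++)
	{
		BulkWriteBuffer buf = smgr_bulk_get_buf(bulkstate);

		PageInit((Page) buf->data, BLCKSZ, 0);
		/* ... fill the page with data here ... */
		smgr_bulk_write(bulkstate, blkno, buf, true);
	}

	/* WAL-logs and flushes any remaining queued pages, and handles fsync */
	smgr_bulk_finish(bulkstate);
}
#endif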