/*-------------------------------------------------------------------------
 *
 * bulk_write.c
 *	  Efficiently and reliably populate a new relation
 *
 * The assumption is that no other backends access the relation while we are
 * loading it, so we can take some shortcuts. Do not mix operations through
 * the regular buffer manager and the bulk loading interface!
 *
 * We bypass the buffer manager to avoid the locking overhead, and call
 * smgrextend() directly. A downside is that the pages will need to be
 * re-read into shared buffers on first use after the build finishes. That's
 * usually a good tradeoff for large relations, and for small relations, the
 * overhead isn't very significant compared to creating the relation in the
 * first place.
 *
 * The pages are WAL-logged if needed. To save on WAL header overhead, we
 * WAL-log several pages in one record.
 *
 * One tricky point is that because we bypass the buffer manager, we need to
 * register the relation for fsyncing at the next checkpoint ourselves, and
 * make sure that the relation is correctly fsync'd by us or the checkpointer
 * even if a checkpoint happens concurrently.
 *
 *
 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/storage/smgr/bulk_write.c
 *
 *-------------------------------------------------------------------------
 */
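
/*
 * Typical usage, as a sketch only: the function and variable names below are
 * illustrative, not taken from any particular caller, and a real caller
 * would put its own data on each page between PageInit() and
 * smgr_bulk_write().
 *
 *	static void
 *	example_build_fork(Relation rel, BlockNumber nblocks)
 *	{
 *		BulkWriteState *bulkstate;
 *		BlockNumber blkno;
 *
 *		bulkstate = smgr_bulk_start_rel(rel, MAIN_FORKNUM);
 *		for (blkno = 0; blkno < nblocks; blkno++)
 *		{
 *			BulkWriteBuffer buf = smgr_bulk_get_buf(bulkstate);
 *
 *			PageInit((Page) buf->data, BLCKSZ, 0);
 *			smgr_bulk_write(bulkstate, blkno, buf, true);
 *		}
 *		smgr_bulk_finish(bulkstate);
 *	}
 */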
#include "postgres.h"

#include "access/xloginsert.h"
#include "access/xlogrecord.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
#include "storage/bulk_write.h"
#include "storage/proc.h"
#include "storage/smgr.h"
#include "utils/rel.h"

#define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID

static const PGIOAlignedBlock zero_buffer = {{0}};	/* worth BLCKSZ */

typedef struct PendingWrite
{
	BulkWriteBuffer buf;
	BlockNumber blkno;
	bool		page_std;
} PendingWrite;

/*
 * Bulk writer state for one relation fork.
 */
struct BulkWriteState
{
	/* Information about the target relation we're writing */
	SMgrRelation smgr;
	ForkNumber	forknum;
	bool		use_wal;

	/* We keep several writes queued, and WAL-log them in batches */
	int			npending;
	PendingWrite pending_writes[MAX_PENDING_WRITES];

	/* Current size of the relation */
	BlockNumber pages_written;

	/* The RedoRecPtr at the time that the bulk operation started */
	XLogRecPtr	start_RedoRecPtr;

	MemoryContext memcxt;
};

static void smgr_bulk_flush(BulkWriteState *bulkstate);

/*
 * Start a bulk write operation on a relation fork.
 */
BulkWriteState *
smgr_bulk_start_rel(Relation rel, ForkNumber forknum)
{
	return smgr_bulk_start_smgr(RelationGetSmgr(rel),
								forknum,
								RelationNeedsWAL(rel) || forknum == INIT_FORKNUM);
}

/*
 * Start a bulk write operation on a relation fork.
 *
 * This is like smgr_bulk_start_rel, but can be used without a relcache entry.
 */
BulkWriteState *
smgr_bulk_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal)
{
	BulkWriteState *state;

	state = palloc(sizeof(BulkWriteState));
	state->smgr = smgr;
	state->forknum = forknum;
	state->use_wal = use_wal;

	state->npending = 0;
	state->pages_written = 0;

	state->start_RedoRecPtr = GetRedoRecPtr();

	/*
	 * Remember the memory context. We will use it to allocate all the
	 * buffers later.
	 */
	state->memcxt = CurrentMemoryContext;

	return state;
}

/*
 * Finish bulk write operation.
 *
 * This WAL-logs and flushes any remaining pending writes to disk, and fsyncs
 * the relation if needed.
 */
void
smgr_bulk_finish(BulkWriteState *bulkstate)
{
	/* WAL-log and flush any remaining pages */
	smgr_bulk_flush(bulkstate);

	/*
	 * When we wrote out the pages, we passed skipFsync=true to avoid the
	 * overhead of registering all the writes with the checkpointer. Register
	 * the whole relation now.
	 *
	 * There is one hole in that idea: If a checkpoint occurred while we were
	 * writing the pages, it missed fsyncing the pages we had already written
	 * before the checkpoint started. A crash later on would replay the WAL
	 * starting from the checkpoint, so it wouldn't replay our earlier WAL
	 * records. Therefore, if a checkpoint has started since the bulk write
	 * began, fsync the files ourselves now.
	 */
	if (!SmgrIsTemp(bulkstate->smgr))
	{
		/*
		 * Prevent a checkpoint from starting between the GetRedoRecPtr() and
		 * smgrregistersync() calls.
		 */
		Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
		MyProc->delayChkptFlags |= DELAY_CHKPT_START;

		if (bulkstate->start_RedoRecPtr != GetRedoRecPtr())
		{
			/*
			 * A checkpoint occurred and it didn't know about our writes, so
			 * fsync() the relation ourselves.
			 */
			MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
			smgrimmedsync(bulkstate->smgr, bulkstate->forknum);
			elog(DEBUG1, "flushed relation because a checkpoint occurred concurrently");
		}
		else
		{
			smgrregistersync(bulkstate->smgr, bulkstate->forknum);
			MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
		}
	}
}

static int
buffer_cmp(const void *a, const void *b)
{
	const PendingWrite *bufa = (const PendingWrite *) a;
	const PendingWrite *bufb = (const PendingWrite *) b;

	/* We should not see duplicated writes for the same block */
	Assert(bufa->blkno != bufb->blkno);
	if (bufa->blkno > bufb->blkno)
		return 1;
	else
		return -1;
}

/*
 * Finish all the pending writes.
 */
static void
smgr_bulk_flush(BulkWriteState *bulkstate)
{
	int			npending = bulkstate->npending;
	PendingWrite *pending_writes = bulkstate->pending_writes;

	if (npending == 0)
		return;

	if (npending > 1)
		qsort(pending_writes, npending, sizeof(PendingWrite), buffer_cmp);

	if (bulkstate->use_wal)
	{
		BlockNumber blknos[MAX_PENDING_WRITES];
		Page		pages[MAX_PENDING_WRITES];
		bool		page_std = true;

		for (int i = 0; i < npending; i++)
		{
			blknos[i] = pending_writes[i].blkno;
			pages[i] = pending_writes[i].buf->data;

			/*
			 * If any of the pages use !page_std, we log them all as such.
			 * That's a bit wasteful, but in practice, a mix of standard and
			 * non-standard page layout is rare. None of the built-in AMs do
			 * that.
			 */
			if (!pending_writes[i].page_std)
				page_std = false;
		}
		log_newpages(&bulkstate->smgr->smgr_rlocator.locator, bulkstate->forknum,
					 npending, blknos, pages, page_std);
	}

	for (int i = 0; i < npending; i++)
	{
		BlockNumber blkno = pending_writes[i].blkno;
		Page		page = pending_writes[i].buf->data;

		PageSetChecksumInplace(page, blkno);

		if (blkno >= bulkstate->pages_written)
		{
			/*
			 * If we have to write pages nonsequentially, fill in the space
			 * with zeroes until we come back and overwrite. This is not
			 * logically necessary on standard Unix filesystems (unwritten
			 * space will read as zeroes anyway), but it should help to avoid
			 * fragmentation. The dummy pages aren't WAL-logged though.
			 */
			while (blkno > bulkstate->pages_written)
			{
				/* don't set checksum for all-zero page */
				smgrextend(bulkstate->smgr, bulkstate->forknum,
						   bulkstate->pages_written++,
						   &zero_buffer,
						   true);
			}

			smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
			bulkstate->pages_written = pending_writes[i].blkno + 1;
		}
		else
			smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
		pfree(page);
	}

	bulkstate->npending = 0;
}

/*
 * Queue write of 'buf'.
 *
 * NB: this takes ownership of 'buf'!
 *
 * You are only allowed to write a given block once as part of one bulk write
 * operation.
 */
void
smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std)
{
	PendingWrite *w;

	w = &bulkstate->pending_writes[bulkstate->npending++];
	w->buf = buf;
	w->blkno = blocknum;
	w->page_std = page_std;

	if (bulkstate->npending == MAX_PENDING_WRITES)
		smgr_bulk_flush(bulkstate);
}

/*
 * Allocate a new buffer which can later be written with smgr_bulk_write().
 *
 * There is no function to free the buffer. When you pass it to
 * smgr_bulk_write(), it takes ownership and frees it when it's no longer
 * needed.
 *
 * This is currently implemented as a simple palloc, but could be implemented
 * using a ring buffer or larger chunks in the future, so don't rely on the
 * buffer being a separately palloc'd chunk.
 */
BulkWriteBuffer
smgr_bulk_get_buf(BulkWriteState *bulkstate)
{
	return MemoryContextAllocAligned(bulkstate->memcxt, BLCKSZ, PG_IO_ALIGN_SIZE, 0);
}
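
/*
 * Minimal sketch of the smgr-level path, for callers without a relcache
 * entry. "reln" is assumed to be an already-open SMgrRelation for a new,
 * empty fork; whether use_wal should be true follows the same rule that
 * smgr_bulk_start_rel() applies (relation needs WAL, or it's an init fork).
 * The page contents here are illustrative only.
 *
 *	BulkWriteState *bulkstate = smgr_bulk_start_smgr(reln, MAIN_FORKNUM, true);
 *	BulkWriteBuffer buf = smgr_bulk_get_buf(bulkstate);
 *
 *	PageInit((Page) buf->data, BLCKSZ, 0);
 *	smgr_bulk_write(bulkstate, 0, buf, true);
 *	smgr_bulk_finish(bulkstate);
 */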