Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * repack_worker.c
4 : : * Implementation of the background worker for ad-hoc logical decoding
5 : : * during REPACK (CONCURRENTLY).
6 : : *
7 : : *
8 : : * Copyright (c) 2026, PostgreSQL Global Development Group
9 : : *
10 : : *
11 : : * IDENTIFICATION
12 : : * src/backend/commands/repack_worker.c
13 : : *
14 : : *-------------------------------------------------------------------------
15 : : */
16 : : #include "postgres.h"
17 : :
18 : : #include "access/table.h"
19 : : #include "access/xlog_internal.h"
20 : : #include "access/xlogutils.h"
21 : : #include "access/xlogwait.h"
22 : : #include "commands/repack.h"
23 : : #include "commands/repack_internal.h"
24 : : #include "libpq/pqmq.h"
25 : : #include "replication/snapbuild.h"
26 : : #include "storage/ipc.h"
27 : : #include "storage/proc.h"
28 : : #include "tcop/tcopprot.h"
29 : : #include "utils/memutils.h"
30 : :
31 : : #define PGREPACK_PLUGIN "pgrepack"
32 : :
33 : : static void RepackWorkerShutdown(int code, Datum arg);
34 : : static LogicalDecodingContext *repack_setup_logical_decoding(Oid relid);
35 : : static void repack_cleanup_logical_decoding(LogicalDecodingContext *ctx);
36 : : static void export_initial_snapshot(Snapshot snapshot,
37 : : DecodingWorkerShared *shared);
38 : : static bool decode_concurrent_changes(LogicalDecodingContext *ctx,
39 : : DecodingWorkerShared *shared);
40 : :
41 : : /* Is this process a REPACK worker? */
42 : : static bool am_repack_worker = false;
43 : :
44 : : /* The WAL segment being decoded. */
45 : : static XLogSegNo repack_current_segment = 0;
46 : :
47 : : /* Our DSM segment, for shutting down */
48 : : static dsm_segment *worker_dsm_segment = NULL;
49 : :
50 : : /*
51 : : * Keep track of the table we're processing, to skip logical decoding of data
52 : : * from other relations.
53 : : */
54 : : static RelFileLocator repacked_rel_locator = {.relNumber = InvalidOid};
55 : : static RelFileLocator repacked_rel_toast_locator = {.relNumber = InvalidOid};
56 : :
57 : :
58 : : /* REPACK decoding worker entry point */
59 : : void
85 alvherre@kurilemu.de 60 :GNC 7 : RepackWorkerMain(Datum main_arg)
61 : : {
62 : : dsm_segment *seg;
63 : : DecodingWorkerShared *shared;
64 : : shm_mq *mq;
65 : : shm_mq_handle *mqh;
66 : : LogicalDecodingContext *decoding_ctx;
67 : : SharedFileSet *sfs;
68 : : Snapshot snapshot;
69 : :
70 : 7 : am_repack_worker = true;
71 : :
72 : 7 : BackgroundWorkerUnblockSignals();
73 : :
74 : 7 : seg = dsm_attach(DatumGetUInt32(main_arg));
75 [ - + ]: 7 : if (seg == NULL)
85 alvherre@kurilemu.de 76 [ # # ]:UNC 0 : ereport(ERROR,
77 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
78 : : errmsg("could not map dynamic shared memory segment"));
75 alvherre@kurilemu.de 79 :GNC 7 : worker_dsm_segment = seg;
80 : :
85 81 : 7 : shared = (DecodingWorkerShared *) dsm_segment_address(seg);
82 : :
83 : : /* Arrange to signal the leader if we exit. */
84 : 7 : before_shmem_exit(RepackWorkerShutdown, PointerGetDatum(shared));
85 : :
86 : : /*
87 : : * Join locking group - see the comments around the call of
88 : : * start_repack_decoding_worker().
89 : : */
90 [ - + ]: 7 : if (!BecomeLockGroupMember(shared->backend_proc, shared->backend_pid))
85 alvherre@kurilemu.de 91 :UNC 0 : return; /* The leader is not running anymore. */
92 : :
93 : : /*
94 : : * Setup a queue to send error messages to the backend that launched this
95 : : * worker.
96 : : */
85 alvherre@kurilemu.de 97 :GNC 7 : mq = (shm_mq *) (char *) BUFFERALIGN(shared->error_queue);
98 : 7 : shm_mq_set_sender(mq, MyProc);
99 : 7 : mqh = shm_mq_attach(mq, seg, NULL);
100 : 7 : pq_redirect_to_shm_mq(seg, mqh);
101 : 7 : pq_set_parallel_leader(shared->backend_pid,
102 : : shared->backend_proc_number);
103 : :
104 : : /* Connect to the database. LOGIN is not required. */
71 105 : 7 : BackgroundWorkerInitializeConnectionByOid(shared->dbid, shared->roleid,
106 : : BGWORKER_BYPASS_ROLELOGINCHECK);
107 : :
108 : : /*
109 : : * Transaction is needed to open relation, and it also provides us with a
110 : : * resource owner.
111 : : */
85 112 : 7 : StartTransactionCommand();
113 : :
114 : 7 : shared = (DecodingWorkerShared *) dsm_segment_address(seg);
115 : :
116 : : /*
117 : : * Not sure the spinlock is needed here - the backend should not change
118 : : * anything in the shared memory until we have serialized the snapshot.
119 : : */
120 : 7 : SpinLockAcquire(&shared->mutex);
121 [ - + ]: 7 : Assert(!XLogRecPtrIsValid(shared->lsn_upto));
122 : 7 : sfs = &shared->sfs;
123 : 7 : SpinLockRelease(&shared->mutex);
124 : :
125 : 7 : SharedFileSetAttach(sfs, seg);
126 : :
127 : : /*
128 : : * Prepare to capture the concurrent data changes ourselves.
129 : : */
130 : 7 : decoding_ctx = repack_setup_logical_decoding(shared->relid);
131 : :
132 : : /* Announce that we're ready. */
133 : 7 : SpinLockAcquire(&shared->mutex);
134 : 7 : shared->initialized = true;
135 : 7 : SpinLockRelease(&shared->mutex);
136 : 7 : ConditionVariableSignal(&shared->cv);
137 : :
138 : : /* There doesn't seem to a nice API to set these */
139 : 7 : XactIsoLevel = XACT_REPEATABLE_READ;
140 : 7 : XactReadOnly = true;
141 : :
142 : : /* Build the initial snapshot and export it. */
143 : 7 : snapshot = SnapBuildInitialSnapshot(decoding_ctx->snapshot_builder);
144 : 7 : export_initial_snapshot(snapshot, shared);
145 : :
146 : : /*
147 : : * Only historic snapshots should be used now. Do not let us restrict the
148 : : * progress of xmin horizon.
149 : : */
150 : 7 : InvalidateCatalogSnapshot();
151 : :
152 : : for (;;)
153 : 7 : {
154 : 14 : bool stop = decode_concurrent_changes(decoding_ctx, shared);
155 : :
156 [ + + ]: 14 : if (stop)
157 : 7 : break;
158 : :
159 : : }
160 : :
161 : : /* Cleanup. */
162 : 7 : repack_cleanup_logical_decoding(decoding_ctx);
163 : 7 : CommitTransactionCommand();
164 : : }
165 : :
166 : : /*
167 : : * See ParallelWorkerShutdown for details.
168 : : */
169 : : static void
170 : 7 : RepackWorkerShutdown(int code, Datum arg)
171 : : {
172 : 7 : DecodingWorkerShared *shared = (DecodingWorkerShared *) DatumGetPointer(arg);
173 : :
174 : 7 : SendProcSignal(shared->backend_pid,
175 : : PROCSIG_REPACK_MESSAGE,
176 : : shared->backend_proc_number);
177 : :
75 178 : 7 : dsm_detach(worker_dsm_segment);
85 179 : 7 : }
180 : :
181 : : bool
182 : 2029 : AmRepackWorker(void)
183 : : {
184 : 2029 : return am_repack_worker;
185 : : }
186 : :
187 : : /*
188 : : * This function is much like pg_create_logical_replication_slot() except that
189 : : * the new slot is neither released (if anyone else could read changes from
190 : : * our slot, we could miss changes other backends do while we copy the
191 : : * existing data into temporary table), nor persisted (it's easier to handle
192 : : * crash by restarting all the work from scratch).
193 : : */
194 : : static LogicalDecodingContext *
195 : 7 : repack_setup_logical_decoding(Oid relid)
196 : : {
197 : : Relation rel;
198 : : Oid toastrelid;
199 : : LogicalDecodingContext *ctx;
200 : : char slotname[NAMEDATALEN];
201 : : RepackDecodingState *dstate;
202 : : MemoryContext oldcxt;
203 : :
204 : : /*
205 : : * REPACK CONCURRENTLY is not allowed in a transaction block, so this
206 : : * should never fire.
207 : : */
208 [ - + ]: 7 : Assert(!TransactionIdIsValid(GetTopTransactionIdIfAny()));
209 : :
210 : : /* Make sure we can use logical decoding */
84 211 : 7 : CheckLogicalDecodingRequirements(true);
212 : :
213 : : /*
214 : : * Create the replication slot we'll use, and enable logical decoding in
215 : : * case it isn't already on.
216 : : *
217 : : * Make the slot RS_TEMPORARY so that it's removed on ERROR. A backend
218 : : * cannot execute multiple REPACK commands at a time, so the PID is enough
219 : : * to make the slot name unique.
220 : : */
21 221 : 7 : snprintf(slotname, NAMEDATALEN, "pg_repack_%d", MyProcPid);
222 : 7 : ReplicationSlotCreate(slotname, true, RS_TEMPORARY, false, true,
223 : : false, false);
85 224 : 7 : EnsureLogicalDecodingEnabled();
225 : :
226 : : /*
227 : : * Set up repacked_rel_locator and repacked_rel_toast_locator, which we
228 : : * use to skip decoding of unrelated relations.
229 : : */
230 : 7 : rel = table_open(relid, AccessShareLock);
231 : 7 : repacked_rel_locator = rel->rd_locator;
232 : 7 : toastrelid = rel->rd_rel->reltoastrelid;
233 [ + + ]: 7 : if (OidIsValid(toastrelid))
234 : : {
235 : : Relation toastrel;
236 : :
237 : : /* Avoid logical decoding of other TOAST relations. */
238 : 3 : toastrel = table_open(toastrelid, AccessShareLock);
239 : 3 : repacked_rel_toast_locator = toastrel->rd_locator;
240 : 3 : table_close(toastrel, AccessShareLock);
241 : : }
242 : 7 : table_close(rel, AccessShareLock);
243 : :
244 : : /*
245 : : * Set up our logical decoding context. We initially use the blocking
246 : : * read_local_xlog_page until we find the start point, and switch to the
247 : : * non-blocking interface afterwards.
248 : : */
21 249 : 7 : ctx = CreateInitDecodingContext(PGREPACK_PLUGIN,
250 : : NIL,
251 : : true,
252 : : true,
253 : : InvalidXLogRecPtr,
254 : 7 : XL_ROUTINE(.page_read = read_local_xlog_page,
255 : : .segment_open = wal_segment_open,
256 : : .segment_close = wal_segment_close),
257 : : NULL, NULL, NULL);
258 : :
259 : : /* Complete setup of output_writer_private */
260 : 7 : dstate = (RepackDecodingState *) ctx->output_writer_private;
261 : 7 : dstate->relid = relid;
262 : 7 : dstate->worker_cxt = CurrentMemoryContext;
263 : 7 : dstate->worker_resowner = CurrentResourceOwner;
264 : :
265 : : /* We don't have control on fast_forward, but verify it's sane */
266 [ - + ]: 7 : Assert(!ctx->fast_forward);
267 : :
268 : : /* Find our decoding starting point. */
269 : 7 : DecodingContextFindStartpoint(ctx);
270 : :
271 : : /* From this point on, we need non-blocking WAL reads */
272 : 7 : ctx->reader->routine.page_read = read_local_xlog_page_no_wait;
273 : :
274 : : /*
275 : : * Initialize repack_current_segment so that we can notice WAL segment
276 : : * boundaries.
277 : : */
85 278 : 7 : XLByteToSeg(ctx->reader->EndRecPtr, repack_current_segment,
279 : : wal_segment_size);
280 : :
281 : : /*
282 : : * Set up our reader private state to let the page-read callback notify
283 : : * when end-of-WAL has been reached. This lives in the same context as
284 : : * the logical decoding itself.
285 : : */
21 286 : 7 : oldcxt = MemoryContextSwitchTo(ctx->context);
85 287 : 7 : ctx->reader->private_data = palloc0_object(ReadLocalXLogPageNoWaitPrivate);
288 : 7 : MemoryContextSwitchTo(oldcxt);
289 : :
290 : 7 : return ctx;
291 : : }
292 : :
293 : : static void
294 : 7 : repack_cleanup_logical_decoding(LogicalDecodingContext *ctx)
295 : : {
296 : : RepackDecodingState *dstate;
297 : :
298 : 7 : dstate = (RepackDecodingState *) ctx->output_writer_private;
299 [ + + ]: 7 : if (dstate->slot)
300 : 1 : ExecDropSingleTupleTableSlot(dstate->slot);
301 : :
302 : 7 : FreeDecodingContext(ctx);
34 303 : 7 : ReplicationSlotDropAcquired(true);
85 304 : 7 : }
305 : :
306 : : /*
307 : : * Make snapshot available to the backend that launched the decoding worker.
308 : : */
309 : : static void
310 : 7 : export_initial_snapshot(Snapshot snapshot, DecodingWorkerShared *shared)
311 : : {
312 : : char fname[MAXPGPATH];
313 : : BufFile *file;
314 : : Size snap_size;
315 : : char *snap_space;
316 : :
317 : 7 : snap_size = EstimateSnapshotSpace(snapshot);
318 : 7 : snap_space = (char *) palloc(snap_size);
319 : 7 : SerializeSnapshot(snapshot, snap_space);
320 : :
321 : 7 : DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
322 : 7 : file = BufFileCreateFileSet(&shared->sfs.fs, fname);
323 : : /* To make restoration easier, write the snapshot size first. */
324 : 7 : BufFileWrite(file, &snap_size, sizeof(snap_size));
325 : 7 : BufFileWrite(file, snap_space, snap_size);
326 : 7 : BufFileClose(file);
84 327 : 7 : pfree(snap_space);
328 : :
329 : : /* Increase the counter to tell the backend that the file is available. */
85 330 : 7 : SpinLockAcquire(&shared->mutex);
331 : 7 : shared->last_exported++;
332 : 7 : SpinLockRelease(&shared->mutex);
333 : 7 : ConditionVariableSignal(&shared->cv);
334 : 7 : }
335 : :
336 : : /*
337 : : * Decode logical changes from the WAL sequence and store them to a file.
338 : : *
339 : : * If true is returned, there is no more work for the worker.
340 : : */
341 : : static bool
342 : 14 : decode_concurrent_changes(LogicalDecodingContext *ctx,
343 : : DecodingWorkerShared *shared)
344 : : {
345 : : RepackDecodingState *dstate;
346 : : XLogRecPtr lsn_upto;
347 : : bool done;
348 : : char fname[MAXPGPATH];
349 : :
350 : 14 : dstate = (RepackDecodingState *) ctx->output_writer_private;
351 : :
352 : : /* Open the output file. */
353 : 14 : DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
354 : 14 : dstate->file = BufFileCreateFileSet(&shared->sfs.fs, fname);
355 : :
356 : 14 : SpinLockAcquire(&shared->mutex);
357 : 14 : lsn_upto = shared->lsn_upto;
358 : 14 : done = shared->done;
359 : 14 : SpinLockRelease(&shared->mutex);
360 : :
361 : : while (true)
362 : 1480 : {
363 : : XLogRecord *record;
364 : : XLogSegNo segno_new;
365 : 1494 : char *errm = NULL;
366 : : XLogRecPtr end_lsn;
367 : :
368 [ - + ]: 1494 : CHECK_FOR_INTERRUPTS();
369 : :
370 : 1494 : record = XLogReadRecord(ctx->reader, &errm);
371 [ + + ]: 1494 : if (record)
372 : : {
373 : 1445 : LogicalDecodingProcessRecord(ctx, ctx->reader);
374 : :
375 : : /*
376 : : * We want to allow WAL to be recycled while REPACK is running.
377 : : *
378 : : * In normal usage of a replication slot, we need to be very
379 : : * careful not to advance the LSN until it's been confirmed as
380 : : * received by the remote. In REPACK's case, this is not needed:
381 : : * REPACK will never try to replay the same WAL after a crash, and
382 : : * if there _is_ a crash, the whole REPACK has to be started from
383 : : * scratch anyway.
384 : : *
385 : : * So here we disregard the careful LSN tracking and just move the
386 : : * LSN locations forward to what we've processed. Note that it
387 : : * would be bogus to move the xmin forward, though, so we don't
388 : : * touch that.
389 : : *
390 : : * This can be done on whatever schedule is convenient, but in
391 : : * order not to cause unnecessary load, we only do it as we cross
392 : : * each WAL segment boundary.
393 : : */
394 : 1445 : end_lsn = ctx->reader->EndRecPtr;
395 : 1445 : XLByteToSeg(end_lsn, segno_new, wal_segment_size);
396 [ - + ]: 1445 : if (segno_new != repack_current_segment)
397 : : {
31 alvherre@kurilemu.de 398 :UNC 0 : LogicalIncreaseRestartDecodingForSlot(end_lsn, end_lsn);
85 399 : 0 : LogicalConfirmReceivedLocation(end_lsn);
400 [ # # ]: 0 : elog(DEBUG1, "REPACK: confirmed receive location %X/%X",
401 : : (uint32) (end_lsn >> 32), (uint32) end_lsn);
402 : 0 : repack_current_segment = segno_new;
403 : : }
404 : : }
405 : : else
406 : : {
407 : : ReadLocalXLogPageNoWaitPrivate *priv;
408 : :
85 alvherre@kurilemu.de 409 [ - + ]:GNC 49 : if (errm)
85 alvherre@kurilemu.de 410 [ # # ]:UNC 0 : ereport(ERROR,
411 : : errcode_for_file_access(),
412 : : errmsg("could not read WAL from timeline %u at %X/%08X: %s",
413 : : ctx->reader->currTLI,
414 : : LSN_FORMAT_ARGS(ctx->reader->EndRecPtr),
415 : : errm));
416 : :
417 : : /*
418 : : * In the decoding loop we do not want to get blocked when there
419 : : * is no more WAL available, otherwise the loop would become
420 : : * uninterruptible.
421 : : */
85 alvherre@kurilemu.de 422 :GNC 49 : priv = (ReadLocalXLogPageNoWaitPrivate *) ctx->reader->private_data;
423 [ + - ]: 49 : if (priv->end_of_wal)
424 : : /* Do not miss the end of WAL condition next time. */
425 : 49 : priv->end_of_wal = false;
426 : : else
85 alvherre@kurilemu.de 427 [ # # ]:UNC 0 : ereport(ERROR,
428 : : errcode(ERRCODE_DATA_CORRUPTED),
429 : : errmsg("could not read WAL record"));
430 : : }
431 : :
432 : : /*
433 : : * Whether we could read new record or not, keep checking if
434 : : * 'lsn_upto' was specified.
435 : : */
85 alvherre@kurilemu.de 436 [ + + ]:GNC 1494 : if (!XLogRecPtrIsValid(lsn_upto))
437 : : {
438 : 1270 : SpinLockAcquire(&shared->mutex);
439 : 1270 : lsn_upto = shared->lsn_upto;
440 : : /* 'done' should be set at the same time as 'lsn_upto' */
441 : 1270 : done = shared->done;
442 : 1270 : SpinLockRelease(&shared->mutex);
443 : : }
444 [ + + ]: 1494 : if (XLogRecPtrIsValid(lsn_upto) &&
445 [ + + ]: 237 : ctx->reader->EndRecPtr >= lsn_upto)
446 : 14 : break;
447 : :
448 [ + + ]: 1480 : if (record == NULL)
449 : : {
450 : 41 : int64 timeout = 0;
451 : : WaitLSNResult res;
452 : :
453 : : /*
454 : : * Before we retry reading, wait until new WAL is flushed.
455 : : *
456 : : * There is a race condition such that the backend executing
457 : : * REPACK determines 'lsn_upto', but before it sets the shared
458 : : * variable, we reach the end of WAL. In that case we'd need to
459 : : * wait until the next WAL flush (unrelated to REPACK). Although
460 : : * that should not be a problem in a busy system, it might be
461 : : * noticeable in other cases, including regression tests (which
462 : : * are not necessarily executed in parallel). Therefore it makes
463 : : * sense to use timeout.
464 : : *
465 : : * If lsn_upto is valid, WAL records having LSN lower than that
466 : : * should already have been flushed to disk.
467 : : */
468 [ + - ]: 41 : if (!XLogRecPtrIsValid(lsn_upto))
469 : 41 : timeout = 100L;
470 : 41 : res = WaitForLSN(WAIT_LSN_TYPE_PRIMARY_FLUSH,
471 : 41 : ctx->reader->EndRecPtr + 1,
472 : : timeout);
473 [ + + - + ]: 41 : if (res != WAIT_LSN_RESULT_SUCCESS &&
474 : : res != WAIT_LSN_RESULT_TIMEOUT)
85 alvherre@kurilemu.de 475 [ # # ]:UNC 0 : ereport(ERROR,
476 : : errcode(ERRCODE_INTERNAL_ERROR),
477 : : errmsg("waiting for WAL failed"));
478 : : }
479 : : }
480 : :
481 : : /*
482 : : * Close the file so we can make it available to the backend.
483 : : */
85 alvherre@kurilemu.de 484 :GNC 14 : BufFileClose(dstate->file);
485 : 14 : dstate->file = NULL;
486 : 14 : SpinLockAcquire(&shared->mutex);
487 : 14 : shared->lsn_upto = InvalidXLogRecPtr;
488 : 14 : shared->last_exported++;
489 : 14 : SpinLockRelease(&shared->mutex);
490 : 14 : ConditionVariableSignal(&shared->cv);
491 : :
492 : 14 : return done;
493 : : }
494 : :
495 : : /*
496 : : * Does the WAL record contain a data change that this backend does not need
497 : : * to decode on behalf of REPACK (CONCURRENTLY)?
498 : : */
499 : : bool
500 : 1416899 : change_useless_for_repack(XLogRecordBuffer *buf)
501 : : {
502 : 1416899 : XLogReaderState *r = buf->record;
503 : : RelFileLocator locator;
504 : :
505 : : /* TOAST locator should not be set unless the main is. */
506 [ + + - + ]: 1416899 : Assert(!OidIsValid(repacked_rel_toast_locator.relNumber) ||
507 : : OidIsValid(repacked_rel_locator.relNumber));
508 : :
509 : : /*
510 : : * Backends not involved in REPACK (CONCURRENTLY) should not do the
511 : : * filtering.
512 : : */
513 [ + + ]: 1416899 : if (!OidIsValid(repacked_rel_locator.relNumber))
514 : 1416469 : return false;
515 : :
516 : : /*
517 : : * If the record does not contain the block 0, it's probably not INSERT /
518 : : * UPDATE / DELETE. In any case, we do not have enough information to
519 : : * filter the change out.
520 : : */
521 [ - + ]: 430 : if (!XLogRecGetBlockTagExtended(r, 0, &locator, NULL, NULL, NULL))
85 alvherre@kurilemu.de 522 :UNC 0 : return false;
523 : :
524 : : /*
525 : : * Decode the change if it belongs to the table we are repacking, or if it
526 : : * belongs to its TOAST relation.
527 : : */
85 alvherre@kurilemu.de 528 [ + + + - :GNC 430 : if (RelFileLocatorEquals(locator, repacked_rel_locator))
+ - ]
529 : 33 : return false;
530 [ + + ]: 397 : if (OidIsValid(repacked_rel_toast_locator.relNumber) &&
531 [ + + + - : 299 : RelFileLocatorEquals(locator, repacked_rel_toast_locator))
+ - ]
532 : 44 : return false;
533 : :
534 : : /* Filter out changes of other tables. */
535 : 353 : return true;
536 : : }
|