Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * repack_worker.c
4 : * Implementation of the background worker for ad-hoc logical decoding
5 : * during REPACK (CONCURRENTLY).
6 : *
7 : *
8 : * Copyright (c) 2026, PostgreSQL Global Development Group
9 : *
10 : *
11 : * IDENTIFICATION
12 : * src/backend/commands/repack_worker.c
13 : *
14 : *-------------------------------------------------------------------------
15 : */
16 : #include "postgres.h"
17 :
18 : #include "access/table.h"
19 : #include "access/xlog_internal.h"
20 : #include "access/xlogutils.h"
21 : #include "access/xlogwait.h"
22 : #include "commands/repack.h"
23 : #include "commands/repack_internal.h"
24 : #include "libpq/pqmq.h"
25 : #include "replication/snapbuild.h"
26 : #include "storage/ipc.h"
27 : #include "storage/proc.h"
28 : #include "tcop/tcopprot.h"
29 : #include "utils/memutils.h"
30 :
/* Name of the output plugin handed to CreateInitDecodingContext(). */
#define PGREPACK_PLUGIN "pgrepack"

/* Prototypes of the file-local helpers defined below. */
static void RepackWorkerShutdown(int code, Datum arg);
static LogicalDecodingContext *repack_setup_logical_decoding(Oid relid);
static void repack_cleanup_logical_decoding(LogicalDecodingContext *ctx);
static void export_initial_snapshot(Snapshot snapshot,
									DecodingWorkerShared *shared);
static bool decode_concurrent_changes(LogicalDecodingContext *ctx,
									  DecodingWorkerShared *shared);

/*
 * Is this process a REPACK worker?
 *
 * Set to true at the very start of RepackWorkerMain() and reported by
 * AmRepackWorker().
 */
static bool am_repack_worker = false;

/*
 * The WAL segment being decoded.
 *
 * Initialized by repack_setup_logical_decoding() from the reader's
 * EndRecPtr, and advanced by decode_concurrent_changes() each time decoding
 * crosses into a different segment.
 */
static XLogSegNo repack_current_segment = 0;

/*
 * Our DSM segment, for shutting down.
 *
 * Stored by RepackWorkerMain() right after a successful dsm_attach() and
 * detached by RepackWorkerShutdown().
 */
static dsm_segment *worker_dsm_segment = NULL;

/*
 * Keep track of the table we're processing, to skip logical decoding of data
 * from other relations.
 *
 * Both locators are filled in by repack_setup_logical_decoding() and
 * consulted by change_useless_for_repack(). An invalid relNumber in
 * repacked_rel_locator means that no filtering takes place.
 */
static RelFileLocator repacked_rel_locator = {.relNumber = InvalidOid};
static RelFileLocator repacked_rel_toast_locator = {.relNumber = InvalidOid};
56 :
57 :
58 : /* REPACK decoding worker entry point */
59 : void
60 7 : RepackWorkerMain(Datum main_arg)
61 : {
62 : dsm_segment *seg;
63 : DecodingWorkerShared *shared;
64 : shm_mq *mq;
65 : shm_mq_handle *mqh;
66 : LogicalDecodingContext *decoding_ctx;
67 : SharedFileSet *sfs;
68 : Snapshot snapshot;
69 :
70 7 : am_repack_worker = true;
71 :
72 7 : BackgroundWorkerUnblockSignals();
73 :
74 7 : seg = dsm_attach(DatumGetUInt32(main_arg));
75 7 : if (seg == NULL)
76 0 : ereport(ERROR,
77 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
78 : errmsg("could not map dynamic shared memory segment"));
79 7 : worker_dsm_segment = seg;
80 :
81 7 : shared = (DecodingWorkerShared *) dsm_segment_address(seg);
82 :
83 : /* Arrange to signal the leader if we exit. */
84 7 : before_shmem_exit(RepackWorkerShutdown, PointerGetDatum(shared));
85 :
86 : /*
87 : * Join locking group - see the comments around the call of
88 : * start_repack_decoding_worker().
89 : */
90 7 : if (!BecomeLockGroupMember(shared->backend_proc, shared->backend_pid))
91 0 : return; /* The leader is not running anymore. */
92 :
93 : /*
94 : * Setup a queue to send error messages to the backend that launched this
95 : * worker.
96 : */
97 7 : mq = (shm_mq *) (char *) BUFFERALIGN(shared->error_queue);
98 7 : shm_mq_set_sender(mq, MyProc);
99 7 : mqh = shm_mq_attach(mq, seg, NULL);
100 7 : pq_redirect_to_shm_mq(seg, mqh);
101 7 : pq_set_parallel_leader(shared->backend_pid,
102 : shared->backend_proc_number);
103 :
104 : /* Connect to the database. LOGIN is not required. */
105 7 : BackgroundWorkerInitializeConnectionByOid(shared->dbid, shared->roleid,
106 : BGWORKER_BYPASS_ROLELOGINCHECK);
107 :
108 : /*
109 : * Transaction is needed to open relation, and it also provides us with a
110 : * resource owner.
111 : */
112 7 : StartTransactionCommand();
113 :
114 7 : shared = (DecodingWorkerShared *) dsm_segment_address(seg);
115 :
116 : /*
117 : * Not sure the spinlock is needed here - the backend should not change
118 : * anything in the shared memory until we have serialized the snapshot.
119 : */
120 7 : SpinLockAcquire(&shared->mutex);
121 : Assert(!XLogRecPtrIsValid(shared->lsn_upto));
122 7 : sfs = &shared->sfs;
123 7 : SpinLockRelease(&shared->mutex);
124 :
125 7 : SharedFileSetAttach(sfs, seg);
126 :
127 : /*
128 : * Prepare to capture the concurrent data changes ourselves.
129 : */
130 7 : decoding_ctx = repack_setup_logical_decoding(shared->relid);
131 :
132 : /* Announce that we're ready. */
133 7 : SpinLockAcquire(&shared->mutex);
134 7 : shared->initialized = true;
135 7 : SpinLockRelease(&shared->mutex);
136 7 : ConditionVariableSignal(&shared->cv);
137 :
138 : /* There doesn't seem to a nice API to set these */
139 7 : XactIsoLevel = XACT_REPEATABLE_READ;
140 7 : XactReadOnly = true;
141 :
142 : /* Build the initial snapshot and export it. */
143 7 : snapshot = SnapBuildInitialSnapshot(decoding_ctx->snapshot_builder);
144 7 : export_initial_snapshot(snapshot, shared);
145 :
146 : /*
147 : * Only historic snapshots should be used now. Do not let us restrict the
148 : * progress of xmin horizon.
149 : */
150 7 : InvalidateCatalogSnapshot();
151 :
152 : for (;;)
153 7 : {
154 14 : bool stop = decode_concurrent_changes(decoding_ctx, shared);
155 :
156 14 : if (stop)
157 7 : break;
158 :
159 : }
160 :
161 : /* Cleanup. */
162 7 : repack_cleanup_logical_decoding(decoding_ctx);
163 7 : CommitTransactionCommand();
164 : }
165 :
166 : /*
167 : * See ParallelWorkerShutdown for details.
168 : */
169 : static void
170 7 : RepackWorkerShutdown(int code, Datum arg)
171 : {
172 7 : DecodingWorkerShared *shared = (DecodingWorkerShared *) DatumGetPointer(arg);
173 :
174 7 : SendProcSignal(shared->backend_pid,
175 : PROCSIG_REPACK_MESSAGE,
176 : shared->backend_proc_number);
177 :
178 7 : dsm_detach(worker_dsm_segment);
179 7 : }
180 :
181 : bool
182 2033 : AmRepackWorker(void)
183 : {
184 2033 : return am_repack_worker;
185 : }
186 :
187 : /*
188 : * This function is much like pg_create_logical_replication_slot() except that
189 : * the new slot is neither released (if anyone else could read changes from
190 : * our slot, we could miss changes other backends do while we copy the
191 : * existing data into temporary table), nor persisted (it's easier to handle
192 : * crash by restarting all the work from scratch).
193 : */
194 : static LogicalDecodingContext *
195 7 : repack_setup_logical_decoding(Oid relid)
196 : {
197 : Relation rel;
198 : Oid toastrelid;
199 : LogicalDecodingContext *ctx;
200 : char slotname[NAMEDATALEN];
201 : RepackDecodingState *dstate;
202 : MemoryContext oldcxt;
203 :
204 : /*
205 : * REPACK CONCURRENTLY is not allowed in a transaction block, so this
206 : * should never fire.
207 : */
208 : Assert(!TransactionIdIsValid(GetTopTransactionIdIfAny()));
209 :
210 : /* Make sure we can use logical decoding */
211 7 : CheckLogicalDecodingRequirements(true);
212 :
213 : /*
214 : * Create the replication slot we'll use, and enable logical decoding in
215 : * case it isn't already on.
216 : *
217 : * Make the slot RS_TEMPORARY so that it's removed on ERROR. A backend
218 : * cannot execute multiple REPACK commands at a time, so the PID is enough
219 : * to make the slot name unique.
220 : */
221 7 : snprintf(slotname, NAMEDATALEN, "pg_repack_%d", MyProcPid);
222 7 : ReplicationSlotCreate(slotname, true, RS_TEMPORARY, false, true,
223 : false, false);
224 7 : EnsureLogicalDecodingEnabled();
225 :
226 : /*
227 : * Set up repacked_rel_locator and repacked_rel_toast_locator, which we
228 : * use to skip decoding of unrelated relations.
229 : */
230 7 : rel = table_open(relid, AccessShareLock);
231 7 : repacked_rel_locator = rel->rd_locator;
232 7 : toastrelid = rel->rd_rel->reltoastrelid;
233 7 : if (OidIsValid(toastrelid))
234 : {
235 : Relation toastrel;
236 :
237 : /* Avoid logical decoding of other TOAST relations. */
238 3 : toastrel = table_open(toastrelid, AccessShareLock);
239 3 : repacked_rel_toast_locator = toastrel->rd_locator;
240 3 : table_close(toastrel, AccessShareLock);
241 : }
242 7 : table_close(rel, AccessShareLock);
243 :
244 : /*
245 : * Set up our logical decoding context. We initially use the blocking
246 : * read_local_xlog_page until we find the start point, and switch to the
247 : * non-blocking interface afterwards.
248 : */
249 7 : ctx = CreateInitDecodingContext(PGREPACK_PLUGIN,
250 : NIL,
251 : true,
252 : true,
253 : InvalidXLogRecPtr,
254 7 : XL_ROUTINE(.page_read = read_local_xlog_page,
255 : .segment_open = wal_segment_open,
256 : .segment_close = wal_segment_close),
257 : NULL, NULL, NULL);
258 :
259 : /* Complete setup of output_writer_private */
260 7 : dstate = (RepackDecodingState *) ctx->output_writer_private;
261 7 : dstate->relid = relid;
262 7 : dstate->worker_cxt = CurrentMemoryContext;
263 7 : dstate->worker_resowner = CurrentResourceOwner;
264 :
265 : /* We don't have control on fast_forward, but verify it's sane */
266 : Assert(!ctx->fast_forward);
267 :
268 : /* Find our decoding starting point. */
269 7 : DecodingContextFindStartpoint(ctx);
270 :
271 : /* From this point on, we need non-blocking WAL reads */
272 7 : ctx->reader->routine.page_read = read_local_xlog_page_no_wait;
273 :
274 : /*
275 : * Initialize repack_current_segment so that we can notice WAL segment
276 : * boundaries.
277 : */
278 7 : XLByteToSeg(ctx->reader->EndRecPtr, repack_current_segment,
279 : wal_segment_size);
280 :
281 : /*
282 : * Set up our reader private state to let the page-read callback notify
283 : * when end-of-WAL has been reached. This lives in the same context as
284 : * the logical decoding itself.
285 : */
286 7 : oldcxt = MemoryContextSwitchTo(ctx->context);
287 7 : ctx->reader->private_data = palloc0_object(ReadLocalXLogPageNoWaitPrivate);
288 7 : MemoryContextSwitchTo(oldcxt);
289 :
290 7 : return ctx;
291 : }
292 :
293 : static void
294 7 : repack_cleanup_logical_decoding(LogicalDecodingContext *ctx)
295 : {
296 : RepackDecodingState *dstate;
297 :
298 7 : dstate = (RepackDecodingState *) ctx->output_writer_private;
299 7 : if (dstate->slot)
300 1 : ExecDropSingleTupleTableSlot(dstate->slot);
301 :
302 7 : FreeDecodingContext(ctx);
303 7 : ReplicationSlotDropAcquired(true);
304 7 : }
305 :
306 : /*
307 : * Make snapshot available to the backend that launched the decoding worker.
308 : */
309 : static void
310 7 : export_initial_snapshot(Snapshot snapshot, DecodingWorkerShared *shared)
311 : {
312 : char fname[MAXPGPATH];
313 : BufFile *file;
314 : Size snap_size;
315 : char *snap_space;
316 :
317 7 : snap_size = EstimateSnapshotSpace(snapshot);
318 7 : snap_space = (char *) palloc(snap_size);
319 7 : SerializeSnapshot(snapshot, snap_space);
320 :
321 7 : DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
322 7 : file = BufFileCreateFileSet(&shared->sfs.fs, fname);
323 : /* To make restoration easier, write the snapshot size first. */
324 7 : BufFileWrite(file, &snap_size, sizeof(snap_size));
325 7 : BufFileWrite(file, snap_space, snap_size);
326 7 : BufFileClose(file);
327 7 : pfree(snap_space);
328 :
329 : /* Increase the counter to tell the backend that the file is available. */
330 7 : SpinLockAcquire(&shared->mutex);
331 7 : shared->last_exported++;
332 7 : SpinLockRelease(&shared->mutex);
333 7 : ConditionVariableSignal(&shared->cv);
334 7 : }
335 :
336 : /*
337 : * Decode logical changes from the WAL sequence and store them to a file.
338 : *
339 : * If true is returned, there is no more work for the worker.
340 : */
341 : static bool
342 14 : decode_concurrent_changes(LogicalDecodingContext *ctx,
343 : DecodingWorkerShared *shared)
344 : {
345 : RepackDecodingState *dstate;
346 : XLogRecPtr lsn_upto;
347 : bool done;
348 : char fname[MAXPGPATH];
349 :
350 14 : dstate = (RepackDecodingState *) ctx->output_writer_private;
351 :
352 : /* Open the output file. */
353 14 : DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
354 14 : dstate->file = BufFileCreateFileSet(&shared->sfs.fs, fname);
355 :
356 14 : SpinLockAcquire(&shared->mutex);
357 14 : lsn_upto = shared->lsn_upto;
358 14 : done = shared->done;
359 14 : SpinLockRelease(&shared->mutex);
360 :
361 : while (true)
362 11090 : {
363 : XLogRecord *record;
364 : XLogSegNo segno_new;
365 11104 : char *errm = NULL;
366 : XLogRecPtr end_lsn;
367 :
368 11104 : CHECK_FOR_INTERRUPTS();
369 :
370 11104 : record = XLogReadRecord(ctx->reader, &errm);
371 11104 : if (record)
372 : {
373 1445 : LogicalDecodingProcessRecord(ctx, ctx->reader);
374 :
375 : /*
376 : * We want to allow WAL to be recycled while REPACK is running.
377 : *
378 : * In normal usage of a replication slot, we need to be very
379 : * careful not to advance the LSN until it's been confirmed as
380 : * received by the remote. In REPACK's case, this is not needed:
381 : * REPACK will never try to replay the same WAL after a crash, and
382 : * if there _is_ a crash, the whole REPACK has to be started from
383 : * scratch anyway.
384 : *
385 : * So here we disregard the careful LSN tracking and just move the
386 : * LSN locations forward to what we've processed. Note that it
387 : * would be bogus to move the xmin forward, though, so we don't
388 : * touch that.
389 : *
390 : * This can be done on whatever schedule is convenient, but in
391 : * order not to cause unnecessary load, we only do it as we cross
392 : * each WAL segment boundary.
393 : */
394 1445 : end_lsn = ctx->reader->EndRecPtr;
395 1445 : XLByteToSeg(end_lsn, segno_new, wal_segment_size);
396 1445 : if (segno_new != repack_current_segment)
397 : {
398 0 : LogicalIncreaseRestartDecodingForSlot(end_lsn, end_lsn);
399 0 : LogicalConfirmReceivedLocation(end_lsn);
400 0 : elog(DEBUG1, "REPACK: confirmed receive location %X/%X",
401 : (uint32) (end_lsn >> 32), (uint32) end_lsn);
402 0 : repack_current_segment = segno_new;
403 : }
404 : }
405 : else
406 : {
407 : ReadLocalXLogPageNoWaitPrivate *priv;
408 :
409 9659 : if (errm)
410 0 : ereport(ERROR,
411 : errcode_for_file_access(),
412 : errmsg("could not read WAL from timeline %u at %X/%08X: %s",
413 : ctx->reader->currTLI,
414 : LSN_FORMAT_ARGS(ctx->reader->EndRecPtr),
415 : errm));
416 :
417 : /*
418 : * In the decoding loop we do not want to get blocked when there
419 : * is no more WAL available, otherwise the loop would become
420 : * uninterruptible.
421 : */
422 9659 : priv = (ReadLocalXLogPageNoWaitPrivate *) ctx->reader->private_data;
423 9659 : if (priv->end_of_wal)
424 : /* Do not miss the end of WAL condition next time. */
425 9659 : priv->end_of_wal = false;
426 : else
427 0 : ereport(ERROR,
428 : errcode(ERRCODE_DATA_CORRUPTED),
429 : errmsg("could not read WAL record"));
430 : }
431 :
432 : /*
433 : * Whether we could read new record or not, keep checking if
434 : * 'lsn_upto' was specified.
435 : */
436 11104 : if (!XLogRecPtrIsValid(lsn_upto))
437 : {
438 10693 : SpinLockAcquire(&shared->mutex);
439 10693 : lsn_upto = shared->lsn_upto;
440 : /* 'done' should be set at the same time as 'lsn_upto' */
441 10693 : done = shared->done;
442 10693 : SpinLockRelease(&shared->mutex);
443 : }
444 11104 : if (XLogRecPtrIsValid(lsn_upto) &&
445 425 : ctx->reader->EndRecPtr >= lsn_upto)
446 14 : break;
447 :
448 11090 : if (record == NULL)
449 : {
450 9654 : int64 timeout = 0;
451 : WaitLSNResult res;
452 :
453 : /*
454 : * Before we retry reading, wait until new WAL is flushed.
455 : *
456 : * There is a race condition such that the backend executing
457 : * REPACK determines 'lsn_upto', but before it sets the shared
458 : * variable, we reach the end of WAL. In that case we'd need to
459 : * wait until the next WAL flush (unrelated to REPACK). Although
460 : * that should not be a problem in a busy system, it might be
461 : * noticeable in other cases, including regression tests (which
462 : * are not necessarily executed in parallel). Therefore it makes
463 : * sense to use timeout.
464 : *
465 : * If lsn_upto is valid, WAL records having LSN lower than that
466 : * should already have been flushed to disk.
467 : */
468 9654 : if (!XLogRecPtrIsValid(lsn_upto))
469 9654 : timeout = 100L;
470 9654 : res = WaitForLSN(WAIT_LSN_TYPE_PRIMARY_FLUSH,
471 9654 : ctx->reader->EndRecPtr + 1,
472 : timeout);
473 9654 : if (res != WAIT_LSN_RESULT_SUCCESS &&
474 : res != WAIT_LSN_RESULT_TIMEOUT)
475 0 : ereport(ERROR,
476 : errcode(ERRCODE_INTERNAL_ERROR),
477 : errmsg("waiting for WAL failed"));
478 : }
479 : }
480 :
481 : /*
482 : * Close the file so we can make it available to the backend.
483 : */
484 14 : BufFileClose(dstate->file);
485 14 : dstate->file = NULL;
486 14 : SpinLockAcquire(&shared->mutex);
487 14 : shared->lsn_upto = InvalidXLogRecPtr;
488 14 : shared->last_exported++;
489 14 : SpinLockRelease(&shared->mutex);
490 14 : ConditionVariableSignal(&shared->cv);
491 :
492 14 : return done;
493 : }
494 :
495 : /*
496 : * Does the WAL record contain a data change that this backend does not need
497 : * to decode on behalf of REPACK (CONCURRENTLY)?
498 : */
499 : bool
500 1659051 : change_useless_for_repack(XLogRecordBuffer *buf)
501 : {
502 1659051 : XLogReaderState *r = buf->record;
503 : RelFileLocator locator;
504 :
505 : /* TOAST locator should not be set unless the main is. */
506 : Assert(!OidIsValid(repacked_rel_toast_locator.relNumber) ||
507 : OidIsValid(repacked_rel_locator.relNumber));
508 :
509 : /*
510 : * Backends not involved in REPACK (CONCURRENTLY) should not do the
511 : * filtering.
512 : */
513 1659051 : if (!OidIsValid(repacked_rel_locator.relNumber))
514 1658621 : return false;
515 :
516 : /*
517 : * If the record does not contain the block 0, it's probably not INSERT /
518 : * UPDATE / DELETE. In any case, we do not have enough information to
519 : * filter the change out.
520 : */
521 430 : if (!XLogRecGetBlockTagExtended(r, 0, &locator, NULL, NULL, NULL))
522 0 : return false;
523 :
524 : /*
525 : * Decode the change if it belongs to the table we are repacking, or if it
526 : * belongs to its TOAST relation.
527 : */
528 430 : if (RelFileLocatorEquals(locator, repacked_rel_locator))
529 33 : return false;
530 397 : if (OidIsValid(repacked_rel_toast_locator.relNumber) &&
531 299 : RelFileLocatorEquals(locator, repacked_rel_toast_locator))
532 44 : return false;
533 :
534 : /* Filter out changes of other tables. */
535 353 : return true;
536 : }
|