Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * repack_worker.c
4 : * Implementation of the background worker for ad-hoc logical decoding
5 : * during REPACK (CONCURRENTLY).
6 : *
7 : *
8 : * Copyright (c) 2026, PostgreSQL Global Development Group
9 : *
10 : *
11 : * IDENTIFICATION
12 : * src/backend/commands/repack_worker.c
13 : *
14 : *-------------------------------------------------------------------------
15 : */
16 : #include "postgres.h"
17 :
18 : #include "access/table.h"
19 : #include "access/xlog_internal.h"
20 : #include "access/xlogutils.h"
21 : #include "access/xlogwait.h"
22 : #include "commands/repack.h"
23 : #include "commands/repack_internal.h"
24 : #include "libpq/pqmq.h"
25 : #include "replication/snapbuild.h"
26 : #include "storage/ipc.h"
27 : #include "storage/proc.h"
28 : #include "tcop/tcopprot.h"
29 : #include "utils/memutils.h"
30 :
31 : #define REPL_PLUGIN_NAME "pgrepack"
32 :
33 : static void RepackWorkerShutdown(int code, Datum arg);
34 : static LogicalDecodingContext *repack_setup_logical_decoding(Oid relid);
35 : static void repack_cleanup_logical_decoding(LogicalDecodingContext *ctx);
36 : static void export_initial_snapshot(Snapshot snapshot,
37 : DecodingWorkerShared *shared);
38 : static bool decode_concurrent_changes(LogicalDecodingContext *ctx,
39 : DecodingWorkerShared *shared);
40 :
41 : /* Is this process a REPACK worker? */
42 : static bool am_repack_worker = false;
43 :
44 : /* The WAL segment being decoded. */
45 : static XLogSegNo repack_current_segment = 0;
46 :
47 : /*
48 : * Keep track of the table we're processing, to skip logical decoding of data
49 : * from other relations.
50 : */
51 : static RelFileLocator repacked_rel_locator = {.relNumber = InvalidOid};
52 : static RelFileLocator repacked_rel_toast_locator = {.relNumber = InvalidOid};
53 :
54 :
55 : /* REPACK decoding worker entry point */
56 : void
57 2 : RepackWorkerMain(Datum main_arg)
58 : {
59 : dsm_segment *seg;
60 : DecodingWorkerShared *shared;
61 : shm_mq *mq;
62 : shm_mq_handle *mqh;
63 : LogicalDecodingContext *decoding_ctx;
64 : SharedFileSet *sfs;
65 : Snapshot snapshot;
66 :
67 2 : am_repack_worker = true;
68 :
69 : /*
70 : * Override the default bgworker_die() with die() so we can use
71 : * CHECK_FOR_INTERRUPTS().
72 : */
73 2 : pqsignal(SIGTERM, die);
74 2 : BackgroundWorkerUnblockSignals();
75 :
76 2 : seg = dsm_attach(DatumGetUInt32(main_arg));
77 2 : if (seg == NULL)
78 0 : ereport(ERROR,
79 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
80 : errmsg("could not map dynamic shared memory segment"));
81 :
82 2 : shared = (DecodingWorkerShared *) dsm_segment_address(seg);
83 2 : shared->dsm_seg = seg;
84 :
85 : /* Arrange to signal the leader if we exit. */
86 2 : before_shmem_exit(RepackWorkerShutdown, PointerGetDatum(shared));
87 :
88 : /*
89 : * Join locking group - see the comments around the call of
90 : * start_repack_decoding_worker().
91 : */
92 2 : if (!BecomeLockGroupMember(shared->backend_proc, shared->backend_pid))
93 0 : return; /* The leader is not running anymore. */
94 :
95 : /*
96 : * Setup a queue to send error messages to the backend that launched this
97 : * worker.
98 : */
99 2 : mq = (shm_mq *) (char *) BUFFERALIGN(shared->error_queue);
100 2 : shm_mq_set_sender(mq, MyProc);
101 2 : mqh = shm_mq_attach(mq, seg, NULL);
102 2 : pq_redirect_to_shm_mq(seg, mqh);
103 2 : pq_set_parallel_leader(shared->backend_pid,
104 : shared->backend_proc_number);
105 :
106 : /* Connect to the database. */
107 2 : BackgroundWorkerInitializeConnectionByOid(shared->dbid, shared->roleid, 0);
108 :
109 : /*
110 : * Transaction is needed to open relation, and it also provides us with a
111 : * resource owner.
112 : */
113 2 : StartTransactionCommand();
114 :
115 2 : shared = (DecodingWorkerShared *) dsm_segment_address(seg);
116 :
117 : /*
118 : * Not sure the spinlock is needed here - the backend should not change
119 : * anything in the shared memory until we have serialized the snapshot.
120 : */
121 2 : SpinLockAcquire(&shared->mutex);
122 : Assert(!XLogRecPtrIsValid(shared->lsn_upto));
123 2 : sfs = &shared->sfs;
124 2 : SpinLockRelease(&shared->mutex);
125 :
126 2 : SharedFileSetAttach(sfs, seg);
127 :
128 : /*
129 : * Prepare to capture the concurrent data changes ourselves.
130 : */
131 2 : decoding_ctx = repack_setup_logical_decoding(shared->relid);
132 :
133 : /* Announce that we're ready. */
134 2 : SpinLockAcquire(&shared->mutex);
135 2 : shared->initialized = true;
136 2 : SpinLockRelease(&shared->mutex);
137 2 : ConditionVariableSignal(&shared->cv);
138 :
139 : /* There doesn't seem to a nice API to set these */
140 2 : XactIsoLevel = XACT_REPEATABLE_READ;
141 2 : XactReadOnly = true;
142 :
143 : /* Build the initial snapshot and export it. */
144 2 : snapshot = SnapBuildInitialSnapshot(decoding_ctx->snapshot_builder);
145 2 : export_initial_snapshot(snapshot, shared);
146 :
147 : /*
148 : * Only historic snapshots should be used now. Do not let us restrict the
149 : * progress of xmin horizon.
150 : */
151 2 : InvalidateCatalogSnapshot();
152 :
153 : for (;;)
154 2 : {
155 4 : bool stop = decode_concurrent_changes(decoding_ctx, shared);
156 :
157 4 : if (stop)
158 2 : break;
159 :
160 : }
161 :
162 : /* Cleanup. */
163 2 : repack_cleanup_logical_decoding(decoding_ctx);
164 2 : CommitTransactionCommand();
165 : }
166 :
167 : /*
168 : * See ParallelWorkerShutdown for details.
169 : */
170 : static void
171 2 : RepackWorkerShutdown(int code, Datum arg)
172 : {
173 2 : DecodingWorkerShared *shared = (DecodingWorkerShared *) DatumGetPointer(arg);
174 :
175 2 : SendProcSignal(shared->backend_pid,
176 : PROCSIG_REPACK_MESSAGE,
177 : shared->backend_proc_number);
178 :
179 2 : dsm_detach(shared->dsm_seg);
180 2 : }
181 :
182 : bool
183 2020 : AmRepackWorker(void)
184 : {
185 2020 : return am_repack_worker;
186 : }
187 :
188 : /*
189 : * This function is much like pg_create_logical_replication_slot() except that
190 : * the new slot is neither released (if anyone else could read changes from
191 : * our slot, we could miss changes other backends do while we copy the
192 : * existing data into temporary table), nor persisted (it's easier to handle
193 : * crash by restarting all the work from scratch).
194 : */
195 : static LogicalDecodingContext *
196 2 : repack_setup_logical_decoding(Oid relid)
197 : {
198 : Relation rel;
199 : Oid toastrelid;
200 : LogicalDecodingContext *ctx;
201 : NameData slotname;
202 : RepackDecodingState *dstate;
203 : MemoryContext oldcxt;
204 :
205 : /*
206 : * REPACK CONCURRENTLY is not allowed in a transaction block, so this
207 : * should never fire.
208 : */
209 : Assert(!TransactionIdIsValid(GetTopTransactionIdIfAny()));
210 :
211 : /*
212 : * Make sure we can use logical decoding.
213 : */
214 2 : CheckSlotPermissions();
215 2 : CheckLogicalDecodingRequirements();
216 :
217 : /*
218 : * A single backend should not execute multiple REPACK commands at a time,
219 : * so use PID to make the slot unique.
220 : *
221 : * RS_TEMPORARY so that the slot gets cleaned up on ERROR.
222 : */
223 2 : snprintf(NameStr(slotname), NAMEDATALEN, "repack_%d", MyProcPid);
224 2 : ReplicationSlotCreate(NameStr(slotname), true, RS_TEMPORARY, false, false,
225 : false);
226 :
227 2 : EnsureLogicalDecodingEnabled();
228 :
229 : /*
230 : * Neither prepare_write nor do_write callback nor update_progress is
231 : * useful for us.
232 : */
233 2 : ctx = CreateInitDecodingContext(REPL_PLUGIN_NAME,
234 : NIL,
235 : true,
236 : InvalidXLogRecPtr,
237 2 : XL_ROUTINE(.page_read = read_local_xlog_page,
238 : .segment_open = wal_segment_open,
239 : .segment_close = wal_segment_close),
240 : NULL, NULL, NULL);
241 :
242 : /*
243 : * We don't have control on setting fast_forward, so at least check it.
244 : */
245 : Assert(!ctx->fast_forward);
246 :
247 : /* Avoid logical decoding of other relations. */
248 2 : rel = table_open(relid, AccessShareLock);
249 2 : repacked_rel_locator = rel->rd_locator;
250 2 : toastrelid = rel->rd_rel->reltoastrelid;
251 2 : if (OidIsValid(toastrelid))
252 : {
253 : Relation toastrel;
254 :
255 : /* Avoid logical decoding of other TOAST relations. */
256 1 : toastrel = table_open(toastrelid, AccessShareLock);
257 1 : repacked_rel_toast_locator = toastrel->rd_locator;
258 1 : table_close(toastrel, AccessShareLock);
259 : }
260 2 : table_close(rel, AccessShareLock);
261 :
262 2 : DecodingContextFindStartpoint(ctx);
263 :
264 : /*
265 : * decode_concurrent_changes() needs non-blocking callback.
266 : */
267 2 : ctx->reader->routine.page_read = read_local_xlog_page_no_wait;
268 :
269 : /* Some WAL records should have been read. */
270 : Assert(ctx->reader->EndRecPtr != InvalidXLogRecPtr);
271 :
272 : /*
273 : * Initialize repack_current_segment so that we can notice WAL segment
274 : * boundaries.
275 : */
276 2 : XLByteToSeg(ctx->reader->EndRecPtr, repack_current_segment,
277 : wal_segment_size);
278 :
279 : /* Our private state belongs to the decoding context. */
280 2 : oldcxt = MemoryContextSwitchTo(ctx->context);
281 :
282 : /*
283 : * read_local_xlog_page_no_wait() needs to be able to indicate the end of
284 : * WAL.
285 : */
286 2 : ctx->reader->private_data = palloc0_object(ReadLocalXLogPageNoWaitPrivate);
287 2 : dstate = palloc0_object(RepackDecodingState);
288 2 : MemoryContextSwitchTo(oldcxt);
289 :
290 : #ifdef USE_ASSERT_CHECKING
291 : dstate->relid = relid;
292 : #endif
293 :
294 2 : dstate->change_cxt = AllocSetContextCreate(ctx->context,
295 : "REPACK - change",
296 : ALLOCSET_DEFAULT_SIZES);
297 :
298 : /* The file will be set as soon as we have it opened. */
299 2 : dstate->file = NULL;
300 :
301 : /*
302 : * Memory context and resource owner for long-lived resources.
303 : */
304 2 : dstate->worker_cxt = CurrentMemoryContext;
305 2 : dstate->worker_resowner = CurrentResourceOwner;
306 :
307 2 : ctx->output_writer_private = dstate;
308 :
309 2 : return ctx;
310 : }
311 :
312 : static void
313 2 : repack_cleanup_logical_decoding(LogicalDecodingContext *ctx)
314 : {
315 : RepackDecodingState *dstate;
316 :
317 2 : dstate = (RepackDecodingState *) ctx->output_writer_private;
318 2 : if (dstate->slot)
319 1 : ExecDropSingleTupleTableSlot(dstate->slot);
320 :
321 2 : FreeDecodingContext(ctx);
322 2 : ReplicationSlotDropAcquired();
323 2 : }
324 :
325 : /*
326 : * Make snapshot available to the backend that launched the decoding worker.
327 : */
328 : static void
329 2 : export_initial_snapshot(Snapshot snapshot, DecodingWorkerShared *shared)
330 : {
331 : char fname[MAXPGPATH];
332 : BufFile *file;
333 : Size snap_size;
334 : char *snap_space;
335 :
336 2 : snap_size = EstimateSnapshotSpace(snapshot);
337 2 : snap_space = (char *) palloc(snap_size);
338 2 : SerializeSnapshot(snapshot, snap_space);
339 :
340 2 : DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
341 2 : file = BufFileCreateFileSet(&shared->sfs.fs, fname);
342 : /* To make restoration easier, write the snapshot size first. */
343 2 : BufFileWrite(file, &snap_size, sizeof(snap_size));
344 2 : BufFileWrite(file, snap_space, snap_size);
345 2 : BufFileClose(file);
346 2 : pfree(snap_space);
347 :
348 : /* Increase the counter to tell the backend that the file is available. */
349 2 : SpinLockAcquire(&shared->mutex);
350 2 : shared->last_exported++;
351 2 : SpinLockRelease(&shared->mutex);
352 2 : ConditionVariableSignal(&shared->cv);
353 2 : }
354 :
355 : /*
356 : * Decode logical changes from the WAL sequence and store them to a file.
357 : *
358 : * If true is returned, there is no more work for the worker.
359 : */
360 : static bool
361 4 : decode_concurrent_changes(LogicalDecodingContext *ctx,
362 : DecodingWorkerShared *shared)
363 : {
364 : RepackDecodingState *dstate;
365 : XLogRecPtr lsn_upto;
366 : bool done;
367 : char fname[MAXPGPATH];
368 :
369 4 : dstate = (RepackDecodingState *) ctx->output_writer_private;
370 :
371 : /* Open the output file. */
372 4 : DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
373 4 : dstate->file = BufFileCreateFileSet(&shared->sfs.fs, fname);
374 :
375 4 : SpinLockAcquire(&shared->mutex);
376 4 : lsn_upto = shared->lsn_upto;
377 4 : done = shared->done;
378 4 : SpinLockRelease(&shared->mutex);
379 :
380 : while (true)
381 440 : {
382 : XLogRecord *record;
383 : XLogSegNo segno_new;
384 444 : char *errm = NULL;
385 : XLogRecPtr end_lsn;
386 :
387 444 : CHECK_FOR_INTERRUPTS();
388 :
389 444 : record = XLogReadRecord(ctx->reader, &errm);
390 444 : if (record)
391 : {
392 429 : LogicalDecodingProcessRecord(ctx, ctx->reader);
393 :
394 : /*
395 : * If WAL segment boundary has been crossed, inform the decoding
396 : * system that the catalog_xmin can advance.
397 : */
398 429 : end_lsn = ctx->reader->EndRecPtr;
399 429 : XLByteToSeg(end_lsn, segno_new, wal_segment_size);
400 429 : if (segno_new != repack_current_segment)
401 : {
402 0 : LogicalConfirmReceivedLocation(end_lsn);
403 0 : elog(DEBUG1, "REPACK: confirmed receive location %X/%X",
404 : (uint32) (end_lsn >> 32), (uint32) end_lsn);
405 0 : repack_current_segment = segno_new;
406 : }
407 : }
408 : else
409 : {
410 : ReadLocalXLogPageNoWaitPrivate *priv;
411 :
412 15 : if (errm)
413 0 : ereport(ERROR,
414 : errmsg("%s", errm));
415 :
416 : /*
417 : * In the decoding loop we do not want to get blocked when there
418 : * is no more WAL available, otherwise the loop would become
419 : * uninterruptible.
420 : */
421 15 : priv = (ReadLocalXLogPageNoWaitPrivate *) ctx->reader->private_data;
422 15 : if (priv->end_of_wal)
423 : /* Do not miss the end of WAL condition next time. */
424 15 : priv->end_of_wal = false;
425 : else
426 0 : ereport(ERROR,
427 : errmsg("could not read WAL record"));
428 : }
429 :
430 : /*
431 : * Whether we could read new record or not, keep checking if
432 : * 'lsn_upto' was specified.
433 : */
434 444 : if (!XLogRecPtrIsValid(lsn_upto))
435 : {
436 421 : SpinLockAcquire(&shared->mutex);
437 421 : lsn_upto = shared->lsn_upto;
438 : /* 'done' should be set at the same time as 'lsn_upto' */
439 421 : done = shared->done;
440 421 : SpinLockRelease(&shared->mutex);
441 : }
442 444 : if (XLogRecPtrIsValid(lsn_upto) &&
443 27 : ctx->reader->EndRecPtr >= lsn_upto)
444 4 : break;
445 :
446 440 : if (record == NULL)
447 : {
448 12 : int64 timeout = 0;
449 : WaitLSNResult res;
450 :
451 : /*
452 : * Before we retry reading, wait until new WAL is flushed.
453 : *
454 : * There is a race condition such that the backend executing
455 : * REPACK determines 'lsn_upto', but before it sets the shared
456 : * variable, we reach the end of WAL. In that case we'd need to
457 : * wait until the next WAL flush (unrelated to REPACK). Although
458 : * that should not be a problem in a busy system, it might be
459 : * noticeable in other cases, including regression tests (which
460 : * are not necessarily executed in parallel). Therefore it makes
461 : * sense to use timeout.
462 : *
463 : * If lsn_upto is valid, WAL records having LSN lower than that
464 : * should already have been flushed to disk.
465 : */
466 12 : if (!XLogRecPtrIsValid(lsn_upto))
467 12 : timeout = 100L;
468 12 : res = WaitForLSN(WAIT_LSN_TYPE_PRIMARY_FLUSH,
469 12 : ctx->reader->EndRecPtr + 1,
470 : timeout);
471 12 : if (res != WAIT_LSN_RESULT_SUCCESS &&
472 : res != WAIT_LSN_RESULT_TIMEOUT)
473 0 : ereport(ERROR,
474 : errmsg("waiting for WAL failed"));
475 : }
476 : }
477 :
478 : /*
479 : * Close the file so we can make it available to the backend.
480 : */
481 4 : BufFileClose(dstate->file);
482 4 : dstate->file = NULL;
483 4 : SpinLockAcquire(&shared->mutex);
484 4 : shared->lsn_upto = InvalidXLogRecPtr;
485 4 : shared->last_exported++;
486 4 : SpinLockRelease(&shared->mutex);
487 4 : ConditionVariableSignal(&shared->cv);
488 :
489 4 : return done;
490 : }
491 :
492 : /*
493 : * Does the WAL record contain a data change that this backend does not need
494 : * to decode on behalf of REPACK (CONCURRENTLY)?
495 : */
496 : bool
497 1494449 : change_useless_for_repack(XLogRecordBuffer *buf)
498 : {
499 1494449 : XLogReaderState *r = buf->record;
500 : RelFileLocator locator;
501 :
502 : /* TOAST locator should not be set unless the main is. */
503 : Assert(!OidIsValid(repacked_rel_toast_locator.relNumber) ||
504 : OidIsValid(repacked_rel_locator.relNumber));
505 :
506 : /*
507 : * Backends not involved in REPACK (CONCURRENTLY) should not do the
508 : * filtering.
509 : */
510 1494449 : if (!OidIsValid(repacked_rel_locator.relNumber))
511 1494318 : return false;
512 :
513 : /*
514 : * If the record does not contain the block 0, it's probably not INSERT /
515 : * UPDATE / DELETE. In any case, we do not have enough information to
516 : * filter the change out.
517 : */
518 131 : if (!XLogRecGetBlockTagExtended(r, 0, &locator, NULL, NULL, NULL))
519 0 : return false;
520 :
521 : /*
522 : * Decode the change if it belongs to the table we are repacking, or if it
523 : * belongs to its TOAST relation.
524 : */
525 131 : if (RelFileLocatorEquals(locator, repacked_rel_locator))
526 21 : return false;
527 110 : if (OidIsValid(repacked_rel_toast_locator.relNumber) &&
528 65 : RelFileLocatorEquals(locator, repacked_rel_toast_locator))
529 8 : return false;
530 :
531 : /* Filter out changes of other tables. */
532 102 : return true;
533 : }
|