Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * repack_worker.c
4 : * Implementation of the background worker for ad-hoc logical decoding
5 : * during REPACK (CONCURRENTLY).
6 : *
7 : *
8 : * Copyright (c) 2026, PostgreSQL Global Development Group
9 : *
10 : *
11 : * IDENTIFICATION
12 : * src/backend/commands/repack_worker.c
13 : *
14 : *-------------------------------------------------------------------------
15 : */
16 : #include "postgres.h"
17 :
18 : #include "access/table.h"
19 : #include "access/xlog_internal.h"
20 : #include "access/xlogutils.h"
21 : #include "access/xlogwait.h"
22 : #include "commands/repack.h"
23 : #include "commands/repack_internal.h"
24 : #include "libpq/pqmq.h"
25 : #include "replication/snapbuild.h"
26 : #include "storage/ipc.h"
27 : #include "storage/proc.h"
28 : #include "tcop/tcopprot.h"
29 : #include "utils/memutils.h"
30 :
/* Name of the output plugin passed to CreateInitDecodingContext(). */
#define REPL_PLUGIN_NAME "pgrepack"

static void RepackWorkerShutdown(int code, Datum arg);
static LogicalDecodingContext *repack_setup_logical_decoding(Oid relid);
static void repack_cleanup_logical_decoding(LogicalDecodingContext *ctx);
static void export_initial_snapshot(Snapshot snapshot,
									DecodingWorkerShared *shared);
static bool decode_concurrent_changes(LogicalDecodingContext *ctx,
									  DecodingWorkerShared *shared);

/*
 * Is this process a REPACK worker?  Set at the start of RepackWorkerMain()
 * and reported by AmRepackWorker().
 */
static bool am_repack_worker = false;

/*
 * The WAL segment being decoded.  Initialized from the reader's EndRecPtr
 * once the decoding start point is found; when decoding crosses into another
 * segment, the received location is confirmed so that catalog_xmin can
 * advance.
 */
static XLogSegNo repack_current_segment = 0;

/* Our DSM segment, for shutting down (detached by RepackWorkerShutdown()). */
static dsm_segment *worker_dsm_segment = NULL;

/*
 * Keep track of the table we're processing, to skip logical decoding of data
 * from other relations.  Both locators stay invalid in processes that never
 * set up REPACK decoding, in which case change_useless_for_repack() does no
 * filtering.  The TOAST locator stays invalid if the table has no TOAST
 * relation.
 */
static RelFileLocator repacked_rel_locator = {.relNumber = InvalidOid};
static RelFileLocator repacked_rel_toast_locator = {.relNumber = InvalidOid};
56 :
57 :
58 : /* REPACK decoding worker entry point */
59 : void
60 6 : RepackWorkerMain(Datum main_arg)
61 : {
62 : dsm_segment *seg;
63 : DecodingWorkerShared *shared;
64 : shm_mq *mq;
65 : shm_mq_handle *mqh;
66 : LogicalDecodingContext *decoding_ctx;
67 : SharedFileSet *sfs;
68 : Snapshot snapshot;
69 :
70 6 : am_repack_worker = true;
71 :
72 : /*
73 : * Override the default bgworker_die() with die() so we can use
74 : * CHECK_FOR_INTERRUPTS().
75 : */
76 6 : pqsignal(SIGTERM, die);
77 6 : BackgroundWorkerUnblockSignals();
78 :
79 6 : seg = dsm_attach(DatumGetUInt32(main_arg));
80 6 : if (seg == NULL)
81 0 : ereport(ERROR,
82 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
83 : errmsg("could not map dynamic shared memory segment"));
84 6 : worker_dsm_segment = seg;
85 :
86 6 : shared = (DecodingWorkerShared *) dsm_segment_address(seg);
87 :
88 : /* Arrange to signal the leader if we exit. */
89 6 : before_shmem_exit(RepackWorkerShutdown, PointerGetDatum(shared));
90 :
91 : /*
92 : * Join locking group - see the comments around the call of
93 : * start_repack_decoding_worker().
94 : */
95 6 : if (!BecomeLockGroupMember(shared->backend_proc, shared->backend_pid))
96 0 : return; /* The leader is not running anymore. */
97 :
98 : /*
99 : * Setup a queue to send error messages to the backend that launched this
100 : * worker.
101 : */
102 6 : mq = (shm_mq *) (char *) BUFFERALIGN(shared->error_queue);
103 6 : shm_mq_set_sender(mq, MyProc);
104 6 : mqh = shm_mq_attach(mq, seg, NULL);
105 6 : pq_redirect_to_shm_mq(seg, mqh);
106 6 : pq_set_parallel_leader(shared->backend_pid,
107 : shared->backend_proc_number);
108 :
109 : /* Connect to the database. LOGIN is not required. */
110 6 : BackgroundWorkerInitializeConnectionByOid(shared->dbid, shared->roleid,
111 : BGWORKER_BYPASS_ROLELOGINCHECK);
112 :
113 : /*
114 : * Transaction is needed to open relation, and it also provides us with a
115 : * resource owner.
116 : */
117 6 : StartTransactionCommand();
118 :
119 6 : shared = (DecodingWorkerShared *) dsm_segment_address(seg);
120 :
121 : /*
122 : * Not sure the spinlock is needed here - the backend should not change
123 : * anything in the shared memory until we have serialized the snapshot.
124 : */
125 6 : SpinLockAcquire(&shared->mutex);
126 : Assert(!XLogRecPtrIsValid(shared->lsn_upto));
127 6 : sfs = &shared->sfs;
128 6 : SpinLockRelease(&shared->mutex);
129 :
130 6 : SharedFileSetAttach(sfs, seg);
131 :
132 : /*
133 : * Prepare to capture the concurrent data changes ourselves.
134 : */
135 6 : decoding_ctx = repack_setup_logical_decoding(shared->relid);
136 :
137 : /* Announce that we're ready. */
138 6 : SpinLockAcquire(&shared->mutex);
139 6 : shared->initialized = true;
140 6 : SpinLockRelease(&shared->mutex);
141 6 : ConditionVariableSignal(&shared->cv);
142 :
143 : /* There doesn't seem to a nice API to set these */
144 6 : XactIsoLevel = XACT_REPEATABLE_READ;
145 6 : XactReadOnly = true;
146 :
147 : /* Build the initial snapshot and export it. */
148 6 : snapshot = SnapBuildInitialSnapshot(decoding_ctx->snapshot_builder);
149 6 : export_initial_snapshot(snapshot, shared);
150 :
151 : /*
152 : * Only historic snapshots should be used now. Do not let us restrict the
153 : * progress of xmin horizon.
154 : */
155 6 : InvalidateCatalogSnapshot();
156 :
157 : for (;;)
158 6 : {
159 12 : bool stop = decode_concurrent_changes(decoding_ctx, shared);
160 :
161 12 : if (stop)
162 6 : break;
163 :
164 : }
165 :
166 : /* Cleanup. */
167 6 : repack_cleanup_logical_decoding(decoding_ctx);
168 6 : CommitTransactionCommand();
169 : }
170 :
171 : /*
172 : * See ParallelWorkerShutdown for details.
173 : */
174 : static void
175 6 : RepackWorkerShutdown(int code, Datum arg)
176 : {
177 6 : DecodingWorkerShared *shared = (DecodingWorkerShared *) DatumGetPointer(arg);
178 :
179 6 : SendProcSignal(shared->backend_pid,
180 : PROCSIG_REPACK_MESSAGE,
181 : shared->backend_proc_number);
182 :
183 6 : dsm_detach(worker_dsm_segment);
184 6 : }
185 :
186 : bool
187 2014 : AmRepackWorker(void)
188 : {
189 2014 : return am_repack_worker;
190 : }
191 :
192 : /*
193 : * This function is much like pg_create_logical_replication_slot() except that
194 : * the new slot is neither released (if anyone else could read changes from
195 : * our slot, we could miss changes other backends do while we copy the
196 : * existing data into temporary table), nor persisted (it's easier to handle
197 : * crash by restarting all the work from scratch).
198 : */
199 : static LogicalDecodingContext *
200 6 : repack_setup_logical_decoding(Oid relid)
201 : {
202 : Relation rel;
203 : Oid toastrelid;
204 : LogicalDecodingContext *ctx;
205 : NameData slotname;
206 : RepackDecodingState *dstate;
207 : MemoryContext oldcxt;
208 :
209 : /*
210 : * REPACK CONCURRENTLY is not allowed in a transaction block, so this
211 : * should never fire.
212 : */
213 : Assert(!TransactionIdIsValid(GetTopTransactionIdIfAny()));
214 :
215 : /*
216 : * Make sure we can use logical decoding.
217 : */
218 6 : CheckLogicalDecodingRequirements(true);
219 :
220 : /*
221 : * A single backend should not execute multiple REPACK commands at a time,
222 : * so use PID to make the slot unique.
223 : *
224 : * RS_TEMPORARY so that the slot gets cleaned up on ERROR.
225 : */
226 6 : snprintf(NameStr(slotname), NAMEDATALEN, "repack_%d", MyProcPid);
227 6 : ReplicationSlotCreate(NameStr(slotname), true, RS_TEMPORARY, false, true,
228 : false, false);
229 :
230 6 : EnsureLogicalDecodingEnabled();
231 :
232 : /*
233 : * Neither prepare_write nor do_write callback nor update_progress is
234 : * useful for us.
235 : */
236 6 : ctx = CreateInitDecodingContext(REPL_PLUGIN_NAME,
237 : NIL,
238 : true,
239 : true,
240 : InvalidXLogRecPtr,
241 6 : XL_ROUTINE(.page_read = read_local_xlog_page,
242 : .segment_open = wal_segment_open,
243 : .segment_close = wal_segment_close),
244 : NULL, NULL, NULL);
245 :
246 : /*
247 : * We don't have control on setting fast_forward, so at least check it.
248 : */
249 : Assert(!ctx->fast_forward);
250 :
251 : /* Avoid logical decoding of other relations. */
252 6 : rel = table_open(relid, AccessShareLock);
253 6 : repacked_rel_locator = rel->rd_locator;
254 6 : toastrelid = rel->rd_rel->reltoastrelid;
255 6 : if (OidIsValid(toastrelid))
256 : {
257 : Relation toastrel;
258 :
259 : /* Avoid logical decoding of other TOAST relations. */
260 3 : toastrel = table_open(toastrelid, AccessShareLock);
261 3 : repacked_rel_toast_locator = toastrel->rd_locator;
262 3 : table_close(toastrel, AccessShareLock);
263 : }
264 6 : table_close(rel, AccessShareLock);
265 :
266 6 : DecodingContextFindStartpoint(ctx);
267 :
268 : /*
269 : * decode_concurrent_changes() needs non-blocking callback.
270 : */
271 6 : ctx->reader->routine.page_read = read_local_xlog_page_no_wait;
272 :
273 : /* Some WAL records should have been read. */
274 : Assert(XLogRecPtrIsValid(ctx->reader->EndRecPtr));
275 :
276 : /*
277 : * Initialize repack_current_segment so that we can notice WAL segment
278 : * boundaries.
279 : */
280 6 : XLByteToSeg(ctx->reader->EndRecPtr, repack_current_segment,
281 : wal_segment_size);
282 :
283 : /* Our private state belongs to the decoding context. */
284 6 : oldcxt = MemoryContextSwitchTo(ctx->context);
285 :
286 : /*
287 : * read_local_xlog_page_no_wait() needs to be able to indicate the end of
288 : * WAL.
289 : */
290 6 : ctx->reader->private_data = palloc0_object(ReadLocalXLogPageNoWaitPrivate);
291 6 : dstate = palloc0_object(RepackDecodingState);
292 6 : MemoryContextSwitchTo(oldcxt);
293 :
294 : #ifdef USE_ASSERT_CHECKING
295 : dstate->relid = relid;
296 : #endif
297 :
298 6 : dstate->change_cxt = AllocSetContextCreate(ctx->context,
299 : "REPACK - change",
300 : ALLOCSET_DEFAULT_SIZES);
301 :
302 : /* The file will be set as soon as we have it opened. */
303 6 : dstate->file = NULL;
304 :
305 : /*
306 : * Memory context and resource owner for long-lived resources.
307 : */
308 6 : dstate->worker_cxt = CurrentMemoryContext;
309 6 : dstate->worker_resowner = CurrentResourceOwner;
310 :
311 6 : ctx->output_writer_private = dstate;
312 :
313 6 : return ctx;
314 : }
315 :
316 : static void
317 6 : repack_cleanup_logical_decoding(LogicalDecodingContext *ctx)
318 : {
319 : RepackDecodingState *dstate;
320 :
321 6 : dstate = (RepackDecodingState *) ctx->output_writer_private;
322 6 : if (dstate->slot)
323 1 : ExecDropSingleTupleTableSlot(dstate->slot);
324 :
325 6 : FreeDecodingContext(ctx);
326 6 : ReplicationSlotDropAcquired();
327 6 : }
328 :
329 : /*
330 : * Make snapshot available to the backend that launched the decoding worker.
331 : */
332 : static void
333 6 : export_initial_snapshot(Snapshot snapshot, DecodingWorkerShared *shared)
334 : {
335 : char fname[MAXPGPATH];
336 : BufFile *file;
337 : Size snap_size;
338 : char *snap_space;
339 :
340 6 : snap_size = EstimateSnapshotSpace(snapshot);
341 6 : snap_space = (char *) palloc(snap_size);
342 6 : SerializeSnapshot(snapshot, snap_space);
343 :
344 6 : DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
345 6 : file = BufFileCreateFileSet(&shared->sfs.fs, fname);
346 : /* To make restoration easier, write the snapshot size first. */
347 6 : BufFileWrite(file, &snap_size, sizeof(snap_size));
348 6 : BufFileWrite(file, snap_space, snap_size);
349 6 : BufFileClose(file);
350 6 : pfree(snap_space);
351 :
352 : /* Increase the counter to tell the backend that the file is available. */
353 6 : SpinLockAcquire(&shared->mutex);
354 6 : shared->last_exported++;
355 6 : SpinLockRelease(&shared->mutex);
356 6 : ConditionVariableSignal(&shared->cv);
357 6 : }
358 :
359 : /*
360 : * Decode logical changes from the WAL sequence and store them to a file.
361 : *
362 : * If true is returned, there is no more work for the worker.
363 : */
364 : static bool
365 12 : decode_concurrent_changes(LogicalDecodingContext *ctx,
366 : DecodingWorkerShared *shared)
367 : {
368 : RepackDecodingState *dstate;
369 : XLogRecPtr lsn_upto;
370 : bool done;
371 : char fname[MAXPGPATH];
372 :
373 12 : dstate = (RepackDecodingState *) ctx->output_writer_private;
374 :
375 : /* Open the output file. */
376 12 : DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
377 12 : dstate->file = BufFileCreateFileSet(&shared->sfs.fs, fname);
378 :
379 12 : SpinLockAcquire(&shared->mutex);
380 12 : lsn_upto = shared->lsn_upto;
381 12 : done = shared->done;
382 12 : SpinLockRelease(&shared->mutex);
383 :
384 : while (true)
385 1370 : {
386 : XLogRecord *record;
387 : XLogSegNo segno_new;
388 1382 : char *errm = NULL;
389 : XLogRecPtr end_lsn;
390 :
391 1382 : CHECK_FOR_INTERRUPTS();
392 :
393 1382 : record = XLogReadRecord(ctx->reader, &errm);
394 1382 : if (record)
395 : {
396 1352 : LogicalDecodingProcessRecord(ctx, ctx->reader);
397 :
398 : /*
399 : * If WAL segment boundary has been crossed, inform the decoding
400 : * system that the catalog_xmin can advance.
401 : */
402 1352 : end_lsn = ctx->reader->EndRecPtr;
403 1352 : XLByteToSeg(end_lsn, segno_new, wal_segment_size);
404 1352 : if (segno_new != repack_current_segment)
405 : {
406 0 : LogicalConfirmReceivedLocation(end_lsn);
407 0 : elog(DEBUG1, "REPACK: confirmed receive location %X/%X",
408 : (uint32) (end_lsn >> 32), (uint32) end_lsn);
409 0 : repack_current_segment = segno_new;
410 : }
411 : }
412 : else
413 : {
414 : ReadLocalXLogPageNoWaitPrivate *priv;
415 :
416 30 : if (errm)
417 0 : ereport(ERROR,
418 : errcode_for_file_access(),
419 : errmsg("could not read WAL from timeline %u at %X/%08X: %s",
420 : ctx->reader->currTLI,
421 : LSN_FORMAT_ARGS(ctx->reader->EndRecPtr),
422 : errm));
423 :
424 : /*
425 : * In the decoding loop we do not want to get blocked when there
426 : * is no more WAL available, otherwise the loop would become
427 : * uninterruptible.
428 : */
429 30 : priv = (ReadLocalXLogPageNoWaitPrivate *) ctx->reader->private_data;
430 30 : if (priv->end_of_wal)
431 : /* Do not miss the end of WAL condition next time. */
432 30 : priv->end_of_wal = false;
433 : else
434 0 : ereport(ERROR,
435 : errmsg("could not read WAL record"));
436 : }
437 :
438 : /*
439 : * Whether we could read new record or not, keep checking if
440 : * 'lsn_upto' was specified.
441 : */
442 1382 : if (!XLogRecPtrIsValid(lsn_upto))
443 : {
444 1034 : SpinLockAcquire(&shared->mutex);
445 1034 : lsn_upto = shared->lsn_upto;
446 : /* 'done' should be set at the same time as 'lsn_upto' */
447 1034 : done = shared->done;
448 1034 : SpinLockRelease(&shared->mutex);
449 : }
450 1382 : if (XLogRecPtrIsValid(lsn_upto) &&
451 360 : ctx->reader->EndRecPtr >= lsn_upto)
452 12 : break;
453 :
454 1370 : if (record == NULL)
455 : {
456 25 : int64 timeout = 0;
457 : WaitLSNResult res;
458 :
459 : /*
460 : * Before we retry reading, wait until new WAL is flushed.
461 : *
462 : * There is a race condition such that the backend executing
463 : * REPACK determines 'lsn_upto', but before it sets the shared
464 : * variable, we reach the end of WAL. In that case we'd need to
465 : * wait until the next WAL flush (unrelated to REPACK). Although
466 : * that should not be a problem in a busy system, it might be
467 : * noticeable in other cases, including regression tests (which
468 : * are not necessarily executed in parallel). Therefore it makes
469 : * sense to use timeout.
470 : *
471 : * If lsn_upto is valid, WAL records having LSN lower than that
472 : * should already have been flushed to disk.
473 : */
474 25 : if (!XLogRecPtrIsValid(lsn_upto))
475 25 : timeout = 100L;
476 25 : res = WaitForLSN(WAIT_LSN_TYPE_PRIMARY_FLUSH,
477 25 : ctx->reader->EndRecPtr + 1,
478 : timeout);
479 25 : if (res != WAIT_LSN_RESULT_SUCCESS &&
480 : res != WAIT_LSN_RESULT_TIMEOUT)
481 0 : ereport(ERROR,
482 : errmsg("waiting for WAL failed"));
483 : }
484 : }
485 :
486 : /*
487 : * Close the file so we can make it available to the backend.
488 : */
489 12 : BufFileClose(dstate->file);
490 12 : dstate->file = NULL;
491 12 : SpinLockAcquire(&shared->mutex);
492 12 : shared->lsn_upto = InvalidXLogRecPtr;
493 12 : shared->last_exported++;
494 12 : SpinLockRelease(&shared->mutex);
495 12 : ConditionVariableSignal(&shared->cv);
496 :
497 12 : return done;
498 : }
499 :
500 : /*
501 : * Does the WAL record contain a data change that this backend does not need
502 : * to decode on behalf of REPACK (CONCURRENTLY)?
503 : */
504 : bool
505 1586953 : change_useless_for_repack(XLogRecordBuffer *buf)
506 : {
507 1586953 : XLogReaderState *r = buf->record;
508 : RelFileLocator locator;
509 :
510 : /* TOAST locator should not be set unless the main is. */
511 : Assert(!OidIsValid(repacked_rel_toast_locator.relNumber) ||
512 : OidIsValid(repacked_rel_locator.relNumber));
513 :
514 : /*
515 : * Backends not involved in REPACK (CONCURRENTLY) should not do the
516 : * filtering.
517 : */
518 1586953 : if (!OidIsValid(repacked_rel_locator.relNumber))
519 1586538 : return false;
520 :
521 : /*
522 : * If the record does not contain the block 0, it's probably not INSERT /
523 : * UPDATE / DELETE. In any case, we do not have enough information to
524 : * filter the change out.
525 : */
526 415 : if (!XLogRecGetBlockTagExtended(r, 0, &locator, NULL, NULL, NULL))
527 0 : return false;
528 :
529 : /*
530 : * Decode the change if it belongs to the table we are repacking, or if it
531 : * belongs to its TOAST relation.
532 : */
533 415 : if (RelFileLocatorEquals(locator, repacked_rel_locator))
534 33 : return false;
535 382 : if (OidIsValid(repacked_rel_toast_locator.relNumber) &&
536 299 : RelFileLocatorEquals(locator, repacked_rel_toast_locator))
537 44 : return false;
538 :
539 : /* Filter out changes of other tables. */
540 338 : return true;
541 : }
|