Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * repack_worker.c
4 : * Implementation of the background worker for ad-hoc logical decoding
5 : * during REPACK (CONCURRENTLY).
6 : *
7 : *
8 : * Copyright (c) 2026, PostgreSQL Global Development Group
9 : *
10 : *
11 : * IDENTIFICATION
12 : * src/backend/commands/repack_worker.c
13 : *
14 : *-------------------------------------------------------------------------
15 : */
16 : #include "postgres.h"
17 :
18 : #include "access/table.h"
19 : #include "access/xlog_internal.h"
20 : #include "access/xlogutils.h"
21 : #include "access/xlogwait.h"
22 : #include "commands/repack.h"
23 : #include "commands/repack_internal.h"
24 : #include "libpq/pqmq.h"
25 : #include "replication/snapbuild.h"
26 : #include "storage/ipc.h"
27 : #include "storage/proc.h"
28 : #include "tcop/tcopprot.h"
29 : #include "utils/memutils.h"
30 :
/* Name of the logical decoding output plugin used by the REPACK worker. */
#define REPL_PLUGIN_NAME "pgrepack"

static void RepackWorkerShutdown(int code, Datum arg);
static LogicalDecodingContext *repack_setup_logical_decoding(Oid relid);
static void repack_cleanup_logical_decoding(LogicalDecodingContext *ctx);
static void export_initial_snapshot(Snapshot snapshot,
									DecodingWorkerShared *shared);
static bool decode_concurrent_changes(LogicalDecodingContext *ctx,
									  DecodingWorkerShared *shared);

/* Is this process a REPACK worker?  Set on entry to RepackWorkerMain(). */
static bool am_repack_worker = false;

/*
 * The WAL segment being decoded.  Initialized from the reader position once
 * the decoding start point is found, and compared against the segment of
 * each decoded record so that crossing a segment boundary can be used to
 * confirm the received location (letting catalog_xmin advance).
 */
static XLogSegNo repack_current_segment = 0;

/* Our DSM segment, detached by RepackWorkerShutdown() when we exit. */
static dsm_segment *worker_dsm_segment = NULL;

/*
 * Keep track of the table we're processing, to skip logical decoding of data
 * from other relations.  An invalid relNumber in repacked_rel_locator means
 * this process is not decoding on behalf of REPACK, in which case
 * change_useless_for_repack() filters nothing.  The TOAST locator stays
 * invalid if the table has no TOAST relation.
 */
static RelFileLocator repacked_rel_locator = {.relNumber = InvalidOid};
static RelFileLocator repacked_rel_toast_locator = {.relNumber = InvalidOid};
56 :
57 :
58 : /* REPACK decoding worker entry point */
59 : void
60 3 : RepackWorkerMain(Datum main_arg)
61 : {
62 : dsm_segment *seg;
63 : DecodingWorkerShared *shared;
64 : shm_mq *mq;
65 : shm_mq_handle *mqh;
66 : LogicalDecodingContext *decoding_ctx;
67 : SharedFileSet *sfs;
68 : Snapshot snapshot;
69 :
70 3 : am_repack_worker = true;
71 :
72 : /*
73 : * Override the default bgworker_die() with die() so we can use
74 : * CHECK_FOR_INTERRUPTS().
75 : */
76 3 : pqsignal(SIGTERM, die);
77 3 : BackgroundWorkerUnblockSignals();
78 :
79 3 : seg = dsm_attach(DatumGetUInt32(main_arg));
80 3 : if (seg == NULL)
81 0 : ereport(ERROR,
82 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
83 : errmsg("could not map dynamic shared memory segment"));
84 3 : worker_dsm_segment = seg;
85 :
86 3 : shared = (DecodingWorkerShared *) dsm_segment_address(seg);
87 :
88 : /* Arrange to signal the leader if we exit. */
89 3 : before_shmem_exit(RepackWorkerShutdown, PointerGetDatum(shared));
90 :
91 : /*
92 : * Join locking group - see the comments around the call of
93 : * start_repack_decoding_worker().
94 : */
95 3 : if (!BecomeLockGroupMember(shared->backend_proc, shared->backend_pid))
96 0 : return; /* The leader is not running anymore. */
97 :
98 : /*
99 : * Setup a queue to send error messages to the backend that launched this
100 : * worker.
101 : */
102 3 : mq = (shm_mq *) (char *) BUFFERALIGN(shared->error_queue);
103 3 : shm_mq_set_sender(mq, MyProc);
104 3 : mqh = shm_mq_attach(mq, seg, NULL);
105 3 : pq_redirect_to_shm_mq(seg, mqh);
106 3 : pq_set_parallel_leader(shared->backend_pid,
107 : shared->backend_proc_number);
108 :
109 : /* Connect to the database. LOGIN is not required. */
110 3 : BackgroundWorkerInitializeConnectionByOid(shared->dbid, shared->roleid,
111 : BGWORKER_BYPASS_ROLELOGINCHECK);
112 :
113 : /*
114 : * Transaction is needed to open relation, and it also provides us with a
115 : * resource owner.
116 : */
117 3 : StartTransactionCommand();
118 :
119 3 : shared = (DecodingWorkerShared *) dsm_segment_address(seg);
120 :
121 : /*
122 : * Not sure the spinlock is needed here - the backend should not change
123 : * anything in the shared memory until we have serialized the snapshot.
124 : */
125 3 : SpinLockAcquire(&shared->mutex);
126 : Assert(!XLogRecPtrIsValid(shared->lsn_upto));
127 3 : sfs = &shared->sfs;
128 3 : SpinLockRelease(&shared->mutex);
129 :
130 3 : SharedFileSetAttach(sfs, seg);
131 :
132 : /*
133 : * Prepare to capture the concurrent data changes ourselves.
134 : */
135 3 : decoding_ctx = repack_setup_logical_decoding(shared->relid);
136 :
137 : /* Announce that we're ready. */
138 3 : SpinLockAcquire(&shared->mutex);
139 3 : shared->initialized = true;
140 3 : SpinLockRelease(&shared->mutex);
141 3 : ConditionVariableSignal(&shared->cv);
142 :
143 : /* There doesn't seem to a nice API to set these */
144 3 : XactIsoLevel = XACT_REPEATABLE_READ;
145 3 : XactReadOnly = true;
146 :
147 : /* Build the initial snapshot and export it. */
148 3 : snapshot = SnapBuildInitialSnapshot(decoding_ctx->snapshot_builder);
149 3 : export_initial_snapshot(snapshot, shared);
150 :
151 : /*
152 : * Only historic snapshots should be used now. Do not let us restrict the
153 : * progress of xmin horizon.
154 : */
155 3 : InvalidateCatalogSnapshot();
156 :
157 : for (;;)
158 3 : {
159 6 : bool stop = decode_concurrent_changes(decoding_ctx, shared);
160 :
161 6 : if (stop)
162 3 : break;
163 :
164 : }
165 :
166 : /* Cleanup. */
167 3 : repack_cleanup_logical_decoding(decoding_ctx);
168 3 : CommitTransactionCommand();
169 : }
170 :
171 : /*
172 : * See ParallelWorkerShutdown for details.
173 : */
174 : static void
175 3 : RepackWorkerShutdown(int code, Datum arg)
176 : {
177 3 : DecodingWorkerShared *shared = (DecodingWorkerShared *) DatumGetPointer(arg);
178 :
179 3 : SendProcSignal(shared->backend_pid,
180 : PROCSIG_REPACK_MESSAGE,
181 : shared->backend_proc_number);
182 :
183 3 : dsm_detach(worker_dsm_segment);
184 3 : }
185 :
186 : bool
187 2028 : AmRepackWorker(void)
188 : {
189 2028 : return am_repack_worker;
190 : }
191 :
192 : /*
193 : * This function is much like pg_create_logical_replication_slot() except that
194 : * the new slot is neither released (if anyone else could read changes from
195 : * our slot, we could miss changes other backends do while we copy the
196 : * existing data into temporary table), nor persisted (it's easier to handle
197 : * crash by restarting all the work from scratch).
198 : */
199 : static LogicalDecodingContext *
200 3 : repack_setup_logical_decoding(Oid relid)
201 : {
202 : Relation rel;
203 : Oid toastrelid;
204 : LogicalDecodingContext *ctx;
205 : NameData slotname;
206 : RepackDecodingState *dstate;
207 : MemoryContext oldcxt;
208 :
209 : /*
210 : * REPACK CONCURRENTLY is not allowed in a transaction block, so this
211 : * should never fire.
212 : */
213 : Assert(!TransactionIdIsValid(GetTopTransactionIdIfAny()));
214 :
215 : /*
216 : * Make sure we can use logical decoding.
217 : */
218 3 : CheckLogicalDecodingRequirements(true);
219 :
220 : /*
221 : * A single backend should not execute multiple REPACK commands at a time,
222 : * so use PID to make the slot unique.
223 : *
224 : * RS_TEMPORARY so that the slot gets cleaned up on ERROR.
225 : */
226 3 : snprintf(NameStr(slotname), NAMEDATALEN, "repack_%d", MyProcPid);
227 3 : ReplicationSlotCreate(NameStr(slotname), true, RS_TEMPORARY, false, true,
228 : false, false);
229 :
230 3 : EnsureLogicalDecodingEnabled();
231 :
232 : /*
233 : * Neither prepare_write nor do_write callback nor update_progress is
234 : * useful for us.
235 : */
236 3 : ctx = CreateInitDecodingContext(REPL_PLUGIN_NAME,
237 : NIL,
238 : true,
239 : true,
240 : InvalidXLogRecPtr,
241 3 : XL_ROUTINE(.page_read = read_local_xlog_page,
242 : .segment_open = wal_segment_open,
243 : .segment_close = wal_segment_close),
244 : NULL, NULL, NULL);
245 :
246 : /*
247 : * We don't have control on setting fast_forward, so at least check it.
248 : */
249 : Assert(!ctx->fast_forward);
250 :
251 : /* Avoid logical decoding of other relations. */
252 3 : rel = table_open(relid, AccessShareLock);
253 3 : repacked_rel_locator = rel->rd_locator;
254 3 : toastrelid = rel->rd_rel->reltoastrelid;
255 3 : if (OidIsValid(toastrelid))
256 : {
257 : Relation toastrel;
258 :
259 : /* Avoid logical decoding of other TOAST relations. */
260 1 : toastrel = table_open(toastrelid, AccessShareLock);
261 1 : repacked_rel_toast_locator = toastrel->rd_locator;
262 1 : table_close(toastrel, AccessShareLock);
263 : }
264 3 : table_close(rel, AccessShareLock);
265 :
266 3 : DecodingContextFindStartpoint(ctx);
267 :
268 : /*
269 : * decode_concurrent_changes() needs non-blocking callback.
270 : */
271 3 : ctx->reader->routine.page_read = read_local_xlog_page_no_wait;
272 :
273 : /* Some WAL records should have been read. */
274 : Assert(XLogRecPtrIsValid(ctx->reader->EndRecPtr));
275 :
276 : /*
277 : * Initialize repack_current_segment so that we can notice WAL segment
278 : * boundaries.
279 : */
280 3 : XLByteToSeg(ctx->reader->EndRecPtr, repack_current_segment,
281 : wal_segment_size);
282 :
283 : /* Our private state belongs to the decoding context. */
284 3 : oldcxt = MemoryContextSwitchTo(ctx->context);
285 :
286 : /*
287 : * read_local_xlog_page_no_wait() needs to be able to indicate the end of
288 : * WAL.
289 : */
290 3 : ctx->reader->private_data = palloc0_object(ReadLocalXLogPageNoWaitPrivate);
291 3 : dstate = palloc0_object(RepackDecodingState);
292 3 : MemoryContextSwitchTo(oldcxt);
293 :
294 : #ifdef USE_ASSERT_CHECKING
295 : dstate->relid = relid;
296 : #endif
297 :
298 3 : dstate->change_cxt = AllocSetContextCreate(ctx->context,
299 : "REPACK - change",
300 : ALLOCSET_DEFAULT_SIZES);
301 :
302 : /* The file will be set as soon as we have it opened. */
303 3 : dstate->file = NULL;
304 :
305 : /*
306 : * Memory context and resource owner for long-lived resources.
307 : */
308 3 : dstate->worker_cxt = CurrentMemoryContext;
309 3 : dstate->worker_resowner = CurrentResourceOwner;
310 :
311 3 : ctx->output_writer_private = dstate;
312 :
313 3 : return ctx;
314 : }
315 :
316 : static void
317 3 : repack_cleanup_logical_decoding(LogicalDecodingContext *ctx)
318 : {
319 : RepackDecodingState *dstate;
320 :
321 3 : dstate = (RepackDecodingState *) ctx->output_writer_private;
322 3 : if (dstate->slot)
323 1 : ExecDropSingleTupleTableSlot(dstate->slot);
324 :
325 3 : FreeDecodingContext(ctx);
326 3 : ReplicationSlotDropAcquired();
327 3 : }
328 :
329 : /*
330 : * Make snapshot available to the backend that launched the decoding worker.
331 : */
332 : static void
333 3 : export_initial_snapshot(Snapshot snapshot, DecodingWorkerShared *shared)
334 : {
335 : char fname[MAXPGPATH];
336 : BufFile *file;
337 : Size snap_size;
338 : char *snap_space;
339 :
340 3 : snap_size = EstimateSnapshotSpace(snapshot);
341 3 : snap_space = (char *) palloc(snap_size);
342 3 : SerializeSnapshot(snapshot, snap_space);
343 :
344 3 : DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
345 3 : file = BufFileCreateFileSet(&shared->sfs.fs, fname);
346 : /* To make restoration easier, write the snapshot size first. */
347 3 : BufFileWrite(file, &snap_size, sizeof(snap_size));
348 3 : BufFileWrite(file, snap_space, snap_size);
349 3 : BufFileClose(file);
350 3 : pfree(snap_space);
351 :
352 : /* Increase the counter to tell the backend that the file is available. */
353 3 : SpinLockAcquire(&shared->mutex);
354 3 : shared->last_exported++;
355 3 : SpinLockRelease(&shared->mutex);
356 3 : ConditionVariableSignal(&shared->cv);
357 3 : }
358 :
359 : /*
360 : * Decode logical changes from the WAL sequence and store them to a file.
361 : *
362 : * If true is returned, there is no more work for the worker.
363 : */
364 : static bool
365 6 : decode_concurrent_changes(LogicalDecodingContext *ctx,
366 : DecodingWorkerShared *shared)
367 : {
368 : RepackDecodingState *dstate;
369 : XLogRecPtr lsn_upto;
370 : bool done;
371 : char fname[MAXPGPATH];
372 :
373 6 : dstate = (RepackDecodingState *) ctx->output_writer_private;
374 :
375 : /* Open the output file. */
376 6 : DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
377 6 : dstate->file = BufFileCreateFileSet(&shared->sfs.fs, fname);
378 :
379 6 : SpinLockAcquire(&shared->mutex);
380 6 : lsn_upto = shared->lsn_upto;
381 6 : done = shared->done;
382 6 : SpinLockRelease(&shared->mutex);
383 :
384 : while (true)
385 875 : {
386 : XLogRecord *record;
387 : XLogSegNo segno_new;
388 881 : char *errm = NULL;
389 : XLogRecPtr end_lsn;
390 :
391 881 : CHECK_FOR_INTERRUPTS();
392 :
393 881 : record = XLogReadRecord(ctx->reader, &errm);
394 881 : if (record)
395 : {
396 863 : LogicalDecodingProcessRecord(ctx, ctx->reader);
397 :
398 : /*
399 : * If WAL segment boundary has been crossed, inform the decoding
400 : * system that the catalog_xmin can advance.
401 : */
402 863 : end_lsn = ctx->reader->EndRecPtr;
403 863 : XLByteToSeg(end_lsn, segno_new, wal_segment_size);
404 863 : if (segno_new != repack_current_segment)
405 : {
406 0 : LogicalConfirmReceivedLocation(end_lsn);
407 0 : elog(DEBUG1, "REPACK: confirmed receive location %X/%X",
408 : (uint32) (end_lsn >> 32), (uint32) end_lsn);
409 0 : repack_current_segment = segno_new;
410 : }
411 : }
412 : else
413 : {
414 : ReadLocalXLogPageNoWaitPrivate *priv;
415 :
416 18 : if (errm)
417 0 : ereport(ERROR,
418 : errmsg("%s", errm));
419 :
420 : /*
421 : * In the decoding loop we do not want to get blocked when there
422 : * is no more WAL available, otherwise the loop would become
423 : * uninterruptible.
424 : */
425 18 : priv = (ReadLocalXLogPageNoWaitPrivate *) ctx->reader->private_data;
426 18 : if (priv->end_of_wal)
427 : /* Do not miss the end of WAL condition next time. */
428 18 : priv->end_of_wal = false;
429 : else
430 0 : ereport(ERROR,
431 : errmsg("could not read WAL record"));
432 : }
433 :
434 : /*
435 : * Whether we could read new record or not, keep checking if
436 : * 'lsn_upto' was specified.
437 : */
438 881 : if (!XLogRecPtrIsValid(lsn_upto))
439 : {
440 753 : SpinLockAcquire(&shared->mutex);
441 753 : lsn_upto = shared->lsn_upto;
442 : /* 'done' should be set at the same time as 'lsn_upto' */
443 753 : done = shared->done;
444 753 : SpinLockRelease(&shared->mutex);
445 : }
446 881 : if (XLogRecPtrIsValid(lsn_upto) &&
447 134 : ctx->reader->EndRecPtr >= lsn_upto)
448 6 : break;
449 :
450 875 : if (record == NULL)
451 : {
452 15 : int64 timeout = 0;
453 : WaitLSNResult res;
454 :
455 : /*
456 : * Before we retry reading, wait until new WAL is flushed.
457 : *
458 : * There is a race condition such that the backend executing
459 : * REPACK determines 'lsn_upto', but before it sets the shared
460 : * variable, we reach the end of WAL. In that case we'd need to
461 : * wait until the next WAL flush (unrelated to REPACK). Although
462 : * that should not be a problem in a busy system, it might be
463 : * noticeable in other cases, including regression tests (which
464 : * are not necessarily executed in parallel). Therefore it makes
465 : * sense to use timeout.
466 : *
467 : * If lsn_upto is valid, WAL records having LSN lower than that
468 : * should already have been flushed to disk.
469 : */
470 15 : if (!XLogRecPtrIsValid(lsn_upto))
471 15 : timeout = 100L;
472 15 : res = WaitForLSN(WAIT_LSN_TYPE_PRIMARY_FLUSH,
473 15 : ctx->reader->EndRecPtr + 1,
474 : timeout);
475 15 : if (res != WAIT_LSN_RESULT_SUCCESS &&
476 : res != WAIT_LSN_RESULT_TIMEOUT)
477 0 : ereport(ERROR,
478 : errmsg("waiting for WAL failed"));
479 : }
480 : }
481 :
482 : /*
483 : * Close the file so we can make it available to the backend.
484 : */
485 6 : BufFileClose(dstate->file);
486 6 : dstate->file = NULL;
487 6 : SpinLockAcquire(&shared->mutex);
488 6 : shared->lsn_upto = InvalidXLogRecPtr;
489 6 : shared->last_exported++;
490 6 : SpinLockRelease(&shared->mutex);
491 6 : ConditionVariableSignal(&shared->cv);
492 :
493 6 : return done;
494 : }
495 :
496 : /*
497 : * Does the WAL record contain a data change that this backend does not need
498 : * to decode on behalf of REPACK (CONCURRENTLY)?
499 : */
500 : bool
501 1492817 : change_useless_for_repack(XLogRecordBuffer *buf)
502 : {
503 1492817 : XLogReaderState *r = buf->record;
504 : RelFileLocator locator;
505 :
506 : /* TOAST locator should not be set unless the main is. */
507 : Assert(!OidIsValid(repacked_rel_toast_locator.relNumber) ||
508 : OidIsValid(repacked_rel_locator.relNumber));
509 :
510 : /*
511 : * Backends not involved in REPACK (CONCURRENTLY) should not do the
512 : * filtering.
513 : */
514 1492817 : if (!OidIsValid(repacked_rel_locator.relNumber))
515 1492481 : return false;
516 :
517 : /*
518 : * If the record does not contain the block 0, it's probably not INSERT /
519 : * UPDATE / DELETE. In any case, we do not have enough information to
520 : * filter the change out.
521 : */
522 336 : if (!XLogRecGetBlockTagExtended(r, 0, &locator, NULL, NULL, NULL))
523 0 : return false;
524 :
525 : /*
526 : * Decode the change if it belongs to the table we are repacking, or if it
527 : * belongs to its TOAST relation.
528 : */
529 336 : if (RelFileLocatorEquals(locator, repacked_rel_locator))
530 31 : return false;
531 305 : if (OidIsValid(repacked_rel_toast_locator.relNumber) &&
532 240 : RelFileLocatorEquals(locator, repacked_rel_toast_locator))
533 44 : return false;
534 :
535 : /* Filter out changes of other tables. */
536 261 : return true;
537 : }
|