LCOV - code coverage report
Current view: top level - src/backend/commands - repack_worker.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19beta1 Lines: 93.6 % 157 147
Test Date: 2026-06-12 20:16:36 Functions: 100.0 % 8 8
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * repack_worker.c
       4              :  *    Implementation of the background worker for ad-hoc logical decoding
       5              :  *    during REPACK (CONCURRENTLY).
       6              :  *
       7              :  *
       8              :  * Copyright (c) 2026, PostgreSQL Global Development Group
       9              :  *
      10              :  *
      11              :  * IDENTIFICATION
      12              :  *    src/backend/commands/repack_worker.c
      13              :  *
      14              :  *-------------------------------------------------------------------------
      15              :  */
      16              : #include "postgres.h"
      17              : 
      18              : #include "access/table.h"
      19              : #include "access/xlog_internal.h"
      20              : #include "access/xlogutils.h"
      21              : #include "access/xlogwait.h"
      22              : #include "commands/repack.h"
      23              : #include "commands/repack_internal.h"
      24              : #include "libpq/pqmq.h"
      25              : #include "replication/snapbuild.h"
      26              : #include "storage/ipc.h"
      27              : #include "storage/proc.h"
      28              : #include "tcop/tcopprot.h"
      29              : #include "utils/memutils.h"
      30              : 
      31              : #define PGREPACK_PLUGIN   "pgrepack"
      32              : 
      33              : static void RepackWorkerShutdown(int code, Datum arg);
      34              : static LogicalDecodingContext *repack_setup_logical_decoding(Oid relid);
      35              : static void repack_cleanup_logical_decoding(LogicalDecodingContext *ctx);
      36              : static void export_initial_snapshot(Snapshot snapshot,
      37              :                                     DecodingWorkerShared *shared);
      38              : static bool decode_concurrent_changes(LogicalDecodingContext *ctx,
      39              :                                       DecodingWorkerShared *shared);
      40              : 
      41              : /* Is this process a REPACK worker? */
      42              : static bool am_repack_worker = false;
      43              : 
      44              : /* The WAL segment being decoded. */
      45              : static XLogSegNo repack_current_segment = 0;
      46              : 
      47              : /* Our DSM segment, for shutting down */
      48              : static dsm_segment *worker_dsm_segment = NULL;
      49              : 
      50              : /*
      51              :  * Keep track of the table we're processing, to skip logical decoding of data
      52              :  * from other relations.
      53              :  */
      54              : static RelFileLocator repacked_rel_locator = {.relNumber = InvalidOid};
      55              : static RelFileLocator repacked_rel_toast_locator = {.relNumber = InvalidOid};
      56              : 
      57              : 
      58              : /* REPACK decoding worker entry point */
      59              : void
      60            7 : RepackWorkerMain(Datum main_arg)
      61              : {
      62              :     dsm_segment *seg;
      63              :     DecodingWorkerShared *shared;
      64              :     shm_mq     *mq;
      65              :     shm_mq_handle *mqh;
      66              :     LogicalDecodingContext *decoding_ctx;
      67              :     SharedFileSet *sfs;
      68              :     Snapshot    snapshot;
      69              : 
      70            7 :     am_repack_worker = true;
      71              : 
      72            7 :     BackgroundWorkerUnblockSignals();
      73              : 
      74            7 :     seg = dsm_attach(DatumGetUInt32(main_arg));
      75            7 :     if (seg == NULL)
      76            0 :         ereport(ERROR,
      77              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
      78              :                 errmsg("could not map dynamic shared memory segment"));
      79            7 :     worker_dsm_segment = seg;
      80              : 
      81            7 :     shared = (DecodingWorkerShared *) dsm_segment_address(seg);
      82              : 
      83              :     /* Arrange to signal the leader if we exit. */
      84            7 :     before_shmem_exit(RepackWorkerShutdown, PointerGetDatum(shared));
      85              : 
      86              :     /*
      87              :      * Join locking group - see the comments around the call of
      88              :      * start_repack_decoding_worker().
      89              :      */
      90            7 :     if (!BecomeLockGroupMember(shared->backend_proc, shared->backend_pid))
      91            0 :         return;                 /* The leader is not running anymore. */
      92              : 
      93              :     /*
      94              :      * Setup a queue to send error messages to the backend that launched this
      95              :      * worker.
      96              :      */
      97            7 :     mq = (shm_mq *) (char *) BUFFERALIGN(shared->error_queue);
      98            7 :     shm_mq_set_sender(mq, MyProc);
      99            7 :     mqh = shm_mq_attach(mq, seg, NULL);
     100            7 :     pq_redirect_to_shm_mq(seg, mqh);
     101            7 :     pq_set_parallel_leader(shared->backend_pid,
     102              :                            shared->backend_proc_number);
     103              : 
     104              :     /* Connect to the database. LOGIN is not required. */
     105            7 :     BackgroundWorkerInitializeConnectionByOid(shared->dbid, shared->roleid,
     106              :                                               BGWORKER_BYPASS_ROLELOGINCHECK);
     107              : 
     108              :     /*
     109              :      * Transaction is needed to open relation, and it also provides us with a
     110              :      * resource owner.
     111              :      */
     112            7 :     StartTransactionCommand();
     113              : 
     114            7 :     shared = (DecodingWorkerShared *) dsm_segment_address(seg);
     115              : 
     116              :     /*
     117              :      * Not sure the spinlock is needed here - the backend should not change
     118              :      * anything in the shared memory until we have serialized the snapshot.
     119              :      */
     120            7 :     SpinLockAcquire(&shared->mutex);
     121              :     Assert(!XLogRecPtrIsValid(shared->lsn_upto));
     122            7 :     sfs = &shared->sfs;
     123            7 :     SpinLockRelease(&shared->mutex);
     124              : 
     125            7 :     SharedFileSetAttach(sfs, seg);
     126              : 
     127              :     /*
     128              :      * Prepare to capture the concurrent data changes ourselves.
     129              :      */
     130            7 :     decoding_ctx = repack_setup_logical_decoding(shared->relid);
     131              : 
     132              :     /* Announce that we're ready. */
     133            7 :     SpinLockAcquire(&shared->mutex);
     134            7 :     shared->initialized = true;
     135            7 :     SpinLockRelease(&shared->mutex);
     136            7 :     ConditionVariableSignal(&shared->cv);
     137              : 
     138              :     /* There doesn't seem to a nice API to set these */
     139            7 :     XactIsoLevel = XACT_REPEATABLE_READ;
     140            7 :     XactReadOnly = true;
     141              : 
     142              :     /* Build the initial snapshot and export it. */
     143            7 :     snapshot = SnapBuildInitialSnapshot(decoding_ctx->snapshot_builder);
     144            7 :     export_initial_snapshot(snapshot, shared);
     145              : 
     146              :     /*
     147              :      * Only historic snapshots should be used now. Do not let us restrict the
     148              :      * progress of xmin horizon.
     149              :      */
     150            7 :     InvalidateCatalogSnapshot();
     151              : 
     152              :     for (;;)
     153            7 :     {
     154           14 :         bool        stop = decode_concurrent_changes(decoding_ctx, shared);
     155              : 
     156           14 :         if (stop)
     157            7 :             break;
     158              : 
     159              :     }
     160              : 
     161              :     /* Cleanup. */
     162            7 :     repack_cleanup_logical_decoding(decoding_ctx);
     163            7 :     CommitTransactionCommand();
     164              : }
     165              : 
     166              : /*
     167              :  * See ParallelWorkerShutdown for details.
     168              :  */
     169              : static void
     170            7 : RepackWorkerShutdown(int code, Datum arg)
     171              : {
     172            7 :     DecodingWorkerShared *shared = (DecodingWorkerShared *) DatumGetPointer(arg);
     173              : 
     174            7 :     SendProcSignal(shared->backend_pid,
     175              :                    PROCSIG_REPACK_MESSAGE,
     176              :                    shared->backend_proc_number);
     177              : 
     178            7 :     dsm_detach(worker_dsm_segment);
     179            7 : }
     180              : 
     181              : bool
     182         2033 : AmRepackWorker(void)
     183              : {
     184         2033 :     return am_repack_worker;
     185              : }
     186              : 
     187              : /*
     188              :  * This function is much like pg_create_logical_replication_slot() except that
     189              :  * the new slot is neither released (if anyone else could read changes from
     190              :  * our slot, we could miss changes other backends do while we copy the
     191              :  * existing data into temporary table), nor persisted (it's easier to handle
     192              :  * crash by restarting all the work from scratch).
     193              :  */
     194              : static LogicalDecodingContext *
     195            7 : repack_setup_logical_decoding(Oid relid)
     196              : {
     197              :     Relation    rel;
     198              :     Oid         toastrelid;
     199              :     LogicalDecodingContext *ctx;
     200              :     char        slotname[NAMEDATALEN];
     201              :     RepackDecodingState *dstate;
     202              :     MemoryContext oldcxt;
     203              : 
     204              :     /*
     205              :      * REPACK CONCURRENTLY is not allowed in a transaction block, so this
     206              :      * should never fire.
     207              :      */
     208              :     Assert(!TransactionIdIsValid(GetTopTransactionIdIfAny()));
     209              : 
     210              :     /* Make sure we can use logical decoding */
     211            7 :     CheckLogicalDecodingRequirements(true);
     212              : 
     213              :     /*
     214              :      * Create the replication slot we'll use, and enable logical decoding in
     215              :      * case it isn't already on.
     216              :      *
     217              :      * Make the slot RS_TEMPORARY so that it's removed on ERROR.  A backend
     218              :      * cannot execute multiple REPACK commands at a time, so the PID is enough
     219              :      * to make the slot name unique.
     220              :      */
     221            7 :     snprintf(slotname, NAMEDATALEN, "pg_repack_%d", MyProcPid);
     222            7 :     ReplicationSlotCreate(slotname, true, RS_TEMPORARY, false, true,
     223              :                           false, false);
     224            7 :     EnsureLogicalDecodingEnabled();
     225              : 
     226              :     /*
     227              :      * Set up repacked_rel_locator and repacked_rel_toast_locator, which we
     228              :      * use to skip decoding of unrelated relations.
     229              :      */
     230            7 :     rel = table_open(relid, AccessShareLock);
     231            7 :     repacked_rel_locator = rel->rd_locator;
     232            7 :     toastrelid = rel->rd_rel->reltoastrelid;
     233            7 :     if (OidIsValid(toastrelid))
     234              :     {
     235              :         Relation    toastrel;
     236              : 
     237              :         /* Avoid logical decoding of other TOAST relations. */
     238            3 :         toastrel = table_open(toastrelid, AccessShareLock);
     239            3 :         repacked_rel_toast_locator = toastrel->rd_locator;
     240            3 :         table_close(toastrel, AccessShareLock);
     241              :     }
     242            7 :     table_close(rel, AccessShareLock);
     243              : 
     244              :     /*
     245              :      * Set up our logical decoding context.  We initially use the blocking
     246              :      * read_local_xlog_page until we find the start point, and switch to the
     247              :      * non-blocking interface afterwards.
     248              :      */
     249            7 :     ctx = CreateInitDecodingContext(PGREPACK_PLUGIN,
     250              :                                     NIL,
     251              :                                     true,
     252              :                                     true,
     253              :                                     InvalidXLogRecPtr,
     254            7 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
     255              :                                                .segment_open = wal_segment_open,
     256              :                                                .segment_close = wal_segment_close),
     257              :                                     NULL, NULL, NULL);
     258              : 
     259              :     /* Complete setup of output_writer_private */
     260            7 :     dstate = (RepackDecodingState *) ctx->output_writer_private;
     261            7 :     dstate->relid = relid;
     262            7 :     dstate->worker_cxt = CurrentMemoryContext;
     263            7 :     dstate->worker_resowner = CurrentResourceOwner;
     264              : 
     265              :     /* We don't have control on fast_forward, but verify it's sane */
     266              :     Assert(!ctx->fast_forward);
     267              : 
     268              :     /* Find our decoding starting point. */
     269            7 :     DecodingContextFindStartpoint(ctx);
     270              : 
     271              :     /* From this point on, we need non-blocking WAL reads */
     272            7 :     ctx->reader->routine.page_read = read_local_xlog_page_no_wait;
     273              : 
     274              :     /*
     275              :      * Initialize repack_current_segment so that we can notice WAL segment
     276              :      * boundaries.
     277              :      */
     278            7 :     XLByteToSeg(ctx->reader->EndRecPtr, repack_current_segment,
     279              :                 wal_segment_size);
     280              : 
     281              :     /*
     282              :      * Set up our reader private state to let the page-read callback notify
     283              :      * when end-of-WAL has been reached.  This lives in the same context as
     284              :      * the logical decoding itself.
     285              :      */
     286            7 :     oldcxt = MemoryContextSwitchTo(ctx->context);
     287            7 :     ctx->reader->private_data = palloc0_object(ReadLocalXLogPageNoWaitPrivate);
     288            7 :     MemoryContextSwitchTo(oldcxt);
     289              : 
     290            7 :     return ctx;
     291              : }
     292              : 
     293              : static void
     294            7 : repack_cleanup_logical_decoding(LogicalDecodingContext *ctx)
     295              : {
     296              :     RepackDecodingState *dstate;
     297              : 
     298            7 :     dstate = (RepackDecodingState *) ctx->output_writer_private;
     299            7 :     if (dstate->slot)
     300            1 :         ExecDropSingleTupleTableSlot(dstate->slot);
     301              : 
     302            7 :     FreeDecodingContext(ctx);
     303            7 :     ReplicationSlotDropAcquired(true);
     304            7 : }
     305              : 
     306              : /*
     307              :  * Make snapshot available to the backend that launched the decoding worker.
     308              :  */
     309              : static void
     310            7 : export_initial_snapshot(Snapshot snapshot, DecodingWorkerShared *shared)
     311              : {
     312              :     char        fname[MAXPGPATH];
     313              :     BufFile    *file;
     314              :     Size        snap_size;
     315              :     char       *snap_space;
     316              : 
     317            7 :     snap_size = EstimateSnapshotSpace(snapshot);
     318            7 :     snap_space = (char *) palloc(snap_size);
     319            7 :     SerializeSnapshot(snapshot, snap_space);
     320              : 
     321            7 :     DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
     322            7 :     file = BufFileCreateFileSet(&shared->sfs.fs, fname);
     323              :     /* To make restoration easier, write the snapshot size first. */
     324            7 :     BufFileWrite(file, &snap_size, sizeof(snap_size));
     325            7 :     BufFileWrite(file, snap_space, snap_size);
     326            7 :     BufFileClose(file);
     327            7 :     pfree(snap_space);
     328              : 
     329              :     /* Increase the counter to tell the backend that the file is available. */
     330            7 :     SpinLockAcquire(&shared->mutex);
     331            7 :     shared->last_exported++;
     332            7 :     SpinLockRelease(&shared->mutex);
     333            7 :     ConditionVariableSignal(&shared->cv);
     334            7 : }
     335              : 
     336              : /*
     337              :  * Decode logical changes from the WAL sequence and store them to a file.
     338              :  *
     339              :  * If true is returned, there is no more work for the worker.
     340              :  */
     341              : static bool
     342           14 : decode_concurrent_changes(LogicalDecodingContext *ctx,
     343              :                           DecodingWorkerShared *shared)
     344              : {
     345              :     RepackDecodingState *dstate;
     346              :     XLogRecPtr  lsn_upto;
     347              :     bool        done;
     348              :     char        fname[MAXPGPATH];
     349              : 
     350           14 :     dstate = (RepackDecodingState *) ctx->output_writer_private;
     351              : 
     352              :     /* Open the output file. */
     353           14 :     DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
     354           14 :     dstate->file = BufFileCreateFileSet(&shared->sfs.fs, fname);
     355              : 
     356           14 :     SpinLockAcquire(&shared->mutex);
     357           14 :     lsn_upto = shared->lsn_upto;
     358           14 :     done = shared->done;
     359           14 :     SpinLockRelease(&shared->mutex);
     360              : 
     361              :     while (true)
     362        11090 :     {
     363              :         XLogRecord *record;
     364              :         XLogSegNo   segno_new;
     365        11104 :         char       *errm = NULL;
     366              :         XLogRecPtr  end_lsn;
     367              : 
     368        11104 :         CHECK_FOR_INTERRUPTS();
     369              : 
     370        11104 :         record = XLogReadRecord(ctx->reader, &errm);
     371        11104 :         if (record)
     372              :         {
     373         1445 :             LogicalDecodingProcessRecord(ctx, ctx->reader);
     374              : 
     375              :             /*
     376              :              * We want to allow WAL to be recycled while REPACK is running.
     377              :              *
     378              :              * In normal usage of a replication slot, we need to be very
     379              :              * careful not to advance the LSN until it's been confirmed as
     380              :              * received by the remote.  In REPACK's case, this is not needed:
     381              :              * REPACK will never try to replay the same WAL after a crash, and
     382              :              * if there _is_ a crash, the whole REPACK has to be started from
     383              :              * scratch anyway.
     384              :              *
     385              :              * So here we disregard the careful LSN tracking and just move the
     386              :              * LSN locations forward to what we've processed.  Note that it
     387              :              * would be bogus to move the xmin forward, though, so we don't
     388              :              * touch that.
     389              :              *
     390              :              * This can be done on whatever schedule is convenient, but in
     391              :              * order not to cause unnecessary load, we only do it as we cross
     392              :              * each WAL segment boundary.
     393              :              */
     394         1445 :             end_lsn = ctx->reader->EndRecPtr;
     395         1445 :             XLByteToSeg(end_lsn, segno_new, wal_segment_size);
     396         1445 :             if (segno_new != repack_current_segment)
     397              :             {
     398            0 :                 LogicalIncreaseRestartDecodingForSlot(end_lsn, end_lsn);
     399            0 :                 LogicalConfirmReceivedLocation(end_lsn);
     400            0 :                 elog(DEBUG1, "REPACK: confirmed receive location %X/%X",
     401              :                      (uint32) (end_lsn >> 32), (uint32) end_lsn);
     402            0 :                 repack_current_segment = segno_new;
     403              :             }
     404              :         }
     405              :         else
     406              :         {
     407              :             ReadLocalXLogPageNoWaitPrivate *priv;
     408              : 
     409         9659 :             if (errm)
     410            0 :                 ereport(ERROR,
     411              :                         errcode_for_file_access(),
     412              :                         errmsg("could not read WAL from timeline %u at %X/%08X: %s",
     413              :                                ctx->reader->currTLI,
     414              :                                LSN_FORMAT_ARGS(ctx->reader->EndRecPtr),
     415              :                                errm));
     416              : 
     417              :             /*
     418              :              * In the decoding loop we do not want to get blocked when there
     419              :              * is no more WAL available, otherwise the loop would become
     420              :              * uninterruptible.
     421              :              */
     422         9659 :             priv = (ReadLocalXLogPageNoWaitPrivate *) ctx->reader->private_data;
     423         9659 :             if (priv->end_of_wal)
     424              :                 /* Do not miss the end of WAL condition next time. */
     425         9659 :                 priv->end_of_wal = false;
     426              :             else
     427            0 :                 ereport(ERROR,
     428              :                         errcode(ERRCODE_DATA_CORRUPTED),
     429              :                         errmsg("could not read WAL record"));
     430              :         }
     431              : 
     432              :         /*
     433              :          * Whether we could read new record or not, keep checking if
     434              :          * 'lsn_upto' was specified.
     435              :          */
     436        11104 :         if (!XLogRecPtrIsValid(lsn_upto))
     437              :         {
     438        10693 :             SpinLockAcquire(&shared->mutex);
     439        10693 :             lsn_upto = shared->lsn_upto;
     440              :             /* 'done' should be set at the same time as 'lsn_upto' */
     441        10693 :             done = shared->done;
     442        10693 :             SpinLockRelease(&shared->mutex);
     443              :         }
     444        11104 :         if (XLogRecPtrIsValid(lsn_upto) &&
     445          425 :             ctx->reader->EndRecPtr >= lsn_upto)
     446           14 :             break;
     447              : 
     448        11090 :         if (record == NULL)
     449              :         {
     450         9654 :             int64       timeout = 0;
     451              :             WaitLSNResult res;
     452              : 
     453              :             /*
     454              :              * Before we retry reading, wait until new WAL is flushed.
     455              :              *
     456              :              * There is a race condition such that the backend executing
     457              :              * REPACK determines 'lsn_upto', but before it sets the shared
     458              :              * variable, we reach the end of WAL. In that case we'd need to
     459              :              * wait until the next WAL flush (unrelated to REPACK). Although
     460              :              * that should not be a problem in a busy system, it might be
     461              :              * noticeable in other cases, including regression tests (which
     462              :              * are not necessarily executed in parallel). Therefore it makes
     463              :              * sense to use timeout.
     464              :              *
     465              :              * If lsn_upto is valid, WAL records having LSN lower than that
     466              :              * should already have been flushed to disk.
     467              :              */
     468         9654 :             if (!XLogRecPtrIsValid(lsn_upto))
     469         9654 :                 timeout = 100L;
     470         9654 :             res = WaitForLSN(WAIT_LSN_TYPE_PRIMARY_FLUSH,
     471         9654 :                              ctx->reader->EndRecPtr + 1,
     472              :                              timeout);
     473         9654 :             if (res != WAIT_LSN_RESULT_SUCCESS &&
     474              :                 res != WAIT_LSN_RESULT_TIMEOUT)
     475            0 :                 ereport(ERROR,
     476              :                         errcode(ERRCODE_INTERNAL_ERROR),
     477              :                         errmsg("waiting for WAL failed"));
     478              :         }
     479              :     }
     480              : 
     481              :     /*
     482              :      * Close the file so we can make it available to the backend.
     483              :      */
     484           14 :     BufFileClose(dstate->file);
     485           14 :     dstate->file = NULL;
     486           14 :     SpinLockAcquire(&shared->mutex);
     487           14 :     shared->lsn_upto = InvalidXLogRecPtr;
     488           14 :     shared->last_exported++;
     489           14 :     SpinLockRelease(&shared->mutex);
     490           14 :     ConditionVariableSignal(&shared->cv);
     491              : 
     492           14 :     return done;
     493              : }
     494              : 
     495              : /*
     496              :  * Does the WAL record contain a data change that this backend does not need
     497              :  * to decode on behalf of REPACK (CONCURRENTLY)?
     498              :  */
     499              : bool
     500      1659051 : change_useless_for_repack(XLogRecordBuffer *buf)
     501              : {
     502      1659051 :     XLogReaderState *r = buf->record;
     503              :     RelFileLocator locator;
     504              : 
     505              :     /* TOAST locator should not be set unless the main is. */
     506              :     Assert(!OidIsValid(repacked_rel_toast_locator.relNumber) ||
     507              :            OidIsValid(repacked_rel_locator.relNumber));
     508              : 
     509              :     /*
     510              :      * Backends not involved in REPACK (CONCURRENTLY) should not do the
     511              :      * filtering.
     512              :      */
     513      1659051 :     if (!OidIsValid(repacked_rel_locator.relNumber))
     514      1658621 :         return false;
     515              : 
     516              :     /*
     517              :      * If the record does not contain the block 0, it's probably not INSERT /
     518              :      * UPDATE / DELETE. In any case, we do not have enough information to
     519              :      * filter the change out.
     520              :      */
     521          430 :     if (!XLogRecGetBlockTagExtended(r, 0, &locator, NULL, NULL, NULL))
     522            0 :         return false;
     523              : 
     524              :     /*
     525              :      * Decode the change if it belongs to the table we are repacking, or if it
     526              :      * belongs to its TOAST relation.
     527              :      */
     528          430 :     if (RelFileLocatorEquals(locator, repacked_rel_locator))
     529           33 :         return false;
     530          397 :     if (OidIsValid(repacked_rel_toast_locator.relNumber) &&
     531          299 :         RelFileLocatorEquals(locator, repacked_rel_toast_locator))
     532           44 :         return false;
     533              : 
     534              :     /* Filter out changes of other tables. */
     535          353 :     return true;
     536              : }
        

Generated by: LCOV version 2.0-1