LCOV - code coverage report
Current view: top level - src/backend/commands - repack_worker.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 94.3 % 159 150
Test Date: 2026-05-02 10:16:34 Functions: 100.0 % 8 8
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * repack_worker.c
       4              :  *    Implementation of the background worker for ad-hoc logical decoding
       5              :  *    during REPACK (CONCURRENTLY).
       6              :  *
       7              :  *
       8              :  * Copyright (c) 2026, PostgreSQL Global Development Group
       9              :  *
      10              :  *
      11              :  * IDENTIFICATION
      12              :  *    src/backend/commands/repack_worker.c
      13              :  *
      14              :  *-------------------------------------------------------------------------
      15              :  */
      16              : #include "postgres.h"
      17              : 
      18              : #include "access/table.h"
      19              : #include "access/xlog_internal.h"
      20              : #include "access/xlogutils.h"
      21              : #include "access/xlogwait.h"
      22              : #include "commands/repack.h"
      23              : #include "commands/repack_internal.h"
      24              : #include "libpq/pqmq.h"
      25              : #include "replication/snapbuild.h"
      26              : #include "storage/ipc.h"
      27              : #include "storage/proc.h"
      28              : #include "tcop/tcopprot.h"
      29              : #include "utils/memutils.h"
      30              : 
      31              : #define REPL_PLUGIN_NAME   "pgrepack"
      32              : 
      33              : static void RepackWorkerShutdown(int code, Datum arg);
      34              : static LogicalDecodingContext *repack_setup_logical_decoding(Oid relid);
      35              : static void repack_cleanup_logical_decoding(LogicalDecodingContext *ctx);
      36              : static void export_initial_snapshot(Snapshot snapshot,
      37              :                                     DecodingWorkerShared *shared);
      38              : static bool decode_concurrent_changes(LogicalDecodingContext *ctx,
      39              :                                       DecodingWorkerShared *shared);
      40              : 
      41              : /* Is this process a REPACK worker? */
      42              : static bool am_repack_worker = false;
      43              : 
      44              : /* The WAL segment being decoded. */
      45              : static XLogSegNo repack_current_segment = 0;
      46              : 
      47              : /* Our DSM segment, for shutting down */
      48              : static dsm_segment *worker_dsm_segment = NULL;
      49              : 
      50              : /*
      51              :  * Keep track of the table we're processing, to skip logical decoding of data
      52              :  * from other relations.
      53              :  */
      54              : static RelFileLocator repacked_rel_locator = {.relNumber = InvalidOid};
      55              : static RelFileLocator repacked_rel_toast_locator = {.relNumber = InvalidOid};
      56              : 
      57              : 
      58              : /* REPACK decoding worker entry point */
      59              : void
      60            3 : RepackWorkerMain(Datum main_arg)
      61              : {
      62              :     dsm_segment *seg;
      63              :     DecodingWorkerShared *shared;
      64              :     shm_mq     *mq;
      65              :     shm_mq_handle *mqh;
      66              :     LogicalDecodingContext *decoding_ctx;
      67              :     SharedFileSet *sfs;
      68              :     Snapshot    snapshot;
      69              : 
      70            3 :     am_repack_worker = true;
      71              : 
      72              :     /*
      73              :      * Override the default bgworker_die() with die() so we can use
      74              :      * CHECK_FOR_INTERRUPTS().
      75              :      */
      76            3 :     pqsignal(SIGTERM, die);
      77            3 :     BackgroundWorkerUnblockSignals();
      78              : 
      79            3 :     seg = dsm_attach(DatumGetUInt32(main_arg));
      80            3 :     if (seg == NULL)
      81            0 :         ereport(ERROR,
      82              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
      83              :                 errmsg("could not map dynamic shared memory segment"));
      84            3 :     worker_dsm_segment = seg;
      85              : 
      86            3 :     shared = (DecodingWorkerShared *) dsm_segment_address(seg);
      87              : 
      88              :     /* Arrange to signal the leader if we exit. */
      89            3 :     before_shmem_exit(RepackWorkerShutdown, PointerGetDatum(shared));
      90              : 
      91              :     /*
      92              :      * Join the locking group - see the comments around the call of
      93              :      * start_repack_decoding_worker().
      94              :      */
      95            3 :     if (!BecomeLockGroupMember(shared->backend_proc, shared->backend_pid))
      96            0 :         return;                 /* The leader is not running anymore. */
      97              : 
      98              :     /*
      99              :      * Set up a queue to send error messages to the backend that launched this
     100              :      * worker.
     101              :      */
     102            3 :     mq = (shm_mq *) (char *) BUFFERALIGN(shared->error_queue);
     103            3 :     shm_mq_set_sender(mq, MyProc);
     104            3 :     mqh = shm_mq_attach(mq, seg, NULL);
     105            3 :     pq_redirect_to_shm_mq(seg, mqh);
     106            3 :     pq_set_parallel_leader(shared->backend_pid,
     107              :                            shared->backend_proc_number);
     108              : 
     109              :     /* Connect to the database. LOGIN is not required. */
     110            3 :     BackgroundWorkerInitializeConnectionByOid(shared->dbid, shared->roleid,
     111              :                                               BGWORKER_BYPASS_ROLELOGINCHECK);
     112              : 
     113              :     /*
     114              :      * A transaction is needed to open the relation, and it also provides us
     115              :      * with a resource owner.
     116              :      */
     117            3 :     StartTransactionCommand();
     118              : 
     119            3 :     shared = (DecodingWorkerShared *) dsm_segment_address(seg);
     120              : 
     121              :     /*
     122              :      * Not sure the spinlock is needed here - the backend should not change
     123              :      * anything in the shared memory until we have serialized the snapshot.
     124              :      */
     125            3 :     SpinLockAcquire(&shared->mutex);
     126              :     Assert(!XLogRecPtrIsValid(shared->lsn_upto));
     127            3 :     sfs = &shared->sfs;
     128            3 :     SpinLockRelease(&shared->mutex);
     129              : 
     130            3 :     SharedFileSetAttach(sfs, seg);
     131              : 
     132              :     /*
     133              :      * Prepare to capture the concurrent data changes ourselves.
     134              :      */
     135            3 :     decoding_ctx = repack_setup_logical_decoding(shared->relid);
     136              : 
     137              :     /* Announce that we're ready. */
     138            3 :     SpinLockAcquire(&shared->mutex);
     139            3 :     shared->initialized = true;
     140            3 :     SpinLockRelease(&shared->mutex);
     141            3 :     ConditionVariableSignal(&shared->cv);
     142              : 
     143              :     /* There doesn't seem to be a nice API to set these */
     144            3 :     XactIsoLevel = XACT_REPEATABLE_READ;
     145            3 :     XactReadOnly = true;
     146              : 
     147              :     /* Build the initial snapshot and export it. */
     148            3 :     snapshot = SnapBuildInitialSnapshot(decoding_ctx->snapshot_builder);
     149            3 :     export_initial_snapshot(snapshot, shared);
     150              : 
     151              :     /*
     152              :      * Only historic snapshots should be used now. Do not let us restrict the
     153              :      * progress of the xmin horizon.
     154              :      */
     155            3 :     InvalidateCatalogSnapshot();
     156              : 
     157              :     for (;;)
     158            3 :     {
     159            6 :         bool        stop = decode_concurrent_changes(decoding_ctx, shared);
     160              : 
     161            6 :         if (stop)
     162            3 :             break;
     163              : 
     164              :     }
     165              : 
     166              :     /* Cleanup. */
     167            3 :     repack_cleanup_logical_decoding(decoding_ctx);
     168            3 :     CommitTransactionCommand();
     169              : }
     170              : 
     171              : /*
     172              :  * See ParallelWorkerShutdown for details.
     173              :  */
     174              : static void
     175            3 : RepackWorkerShutdown(int code, Datum arg)
     176              : {
     177            3 :     DecodingWorkerShared *shared = (DecodingWorkerShared *) DatumGetPointer(arg);
     178              : 
     179            3 :     SendProcSignal(shared->backend_pid,
     180              :                    PROCSIG_REPACK_MESSAGE,
     181              :                    shared->backend_proc_number);
     182              : 
     183            3 :     dsm_detach(worker_dsm_segment);
     184            3 : }
     185              : 
     186              : bool
     187         2028 : AmRepackWorker(void)
     188              : {
     189         2028 :     return am_repack_worker;
     190              : }
     191              : 
     192              : /*
     193              :  * This function is much like pg_create_logical_replication_slot() except that
     194              :  * the new slot is neither released (if anyone else could read changes from
     195              :  * our slot, we could miss changes other backends make while we copy the
     196              :  * existing data into a temporary table), nor persisted (it's easier to handle
     197              :  * a crash by restarting all the work from scratch).
     198              :  */
     199              : static LogicalDecodingContext *
     200            3 : repack_setup_logical_decoding(Oid relid)
     201              : {
     202              :     Relation    rel;
     203              :     Oid         toastrelid;
     204              :     LogicalDecodingContext *ctx;
     205              :     NameData    slotname;
     206              :     RepackDecodingState *dstate;
     207              :     MemoryContext oldcxt;
     208              : 
     209              :     /*
     210              :      * REPACK CONCURRENTLY is not allowed in a transaction block, so this
     211              :      * should never fire.
     212              :      */
     213              :     Assert(!TransactionIdIsValid(GetTopTransactionIdIfAny()));
     214              : 
     215              :     /*
     216              :      * Make sure we can use logical decoding.
     217              :      */
     218            3 :     CheckLogicalDecodingRequirements(true);
     219              : 
     220              :     /*
     221              :      * A single backend should not execute multiple REPACK commands at a time,
     222              :      * so use PID to make the slot unique.
     223              :      *
     224              :      * RS_TEMPORARY so that the slot gets cleaned up on ERROR.
     225              :      */
     226            3 :     snprintf(NameStr(slotname), NAMEDATALEN, "repack_%d", MyProcPid);
     227            3 :     ReplicationSlotCreate(NameStr(slotname), true, RS_TEMPORARY, false, true,
     228              :                           false, false);
     229              : 
     230            3 :     EnsureLogicalDecodingEnabled();
     231              : 
     232              :     /*
     233              :      * Neither prepare_write nor do_write callback nor update_progress is
     234              :      * useful for us.
     235              :      */
     236            3 :     ctx = CreateInitDecodingContext(REPL_PLUGIN_NAME,
     237              :                                     NIL,
     238              :                                     true,
     239              :                                     true,
     240              :                                     InvalidXLogRecPtr,
     241            3 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
     242              :                                                .segment_open = wal_segment_open,
     243              :                                                .segment_close = wal_segment_close),
     244              :                                     NULL, NULL, NULL);
     245              : 
     246              :     /*
     247              :      * We don't have control over setting fast_forward, so at least check it.
     248              :      */
     249              :     Assert(!ctx->fast_forward);
     250              : 
     251              :     /* Avoid logical decoding of other relations. */
     252            3 :     rel = table_open(relid, AccessShareLock);
     253            3 :     repacked_rel_locator = rel->rd_locator;
     254            3 :     toastrelid = rel->rd_rel->reltoastrelid;
     255            3 :     if (OidIsValid(toastrelid))
     256              :     {
     257              :         Relation    toastrel;
     258              : 
     259              :         /* Avoid logical decoding of other TOAST relations. */
     260            1 :         toastrel = table_open(toastrelid, AccessShareLock);
     261            1 :         repacked_rel_toast_locator = toastrel->rd_locator;
     262            1 :         table_close(toastrel, AccessShareLock);
     263              :     }
     264            3 :     table_close(rel, AccessShareLock);
     265              : 
     266            3 :     DecodingContextFindStartpoint(ctx);
     267              : 
     268              :     /*
     269              :      * decode_concurrent_changes() needs a non-blocking callback.
     270              :      */
     271            3 :     ctx->reader->routine.page_read = read_local_xlog_page_no_wait;
     272              : 
     273              :     /* Some WAL records should have been read. */
     274              :     Assert(XLogRecPtrIsValid(ctx->reader->EndRecPtr));
     275              : 
     276              :     /*
     277              :      * Initialize repack_current_segment so that we can notice WAL segment
     278              :      * boundaries.
     279              :      */
     280            3 :     XLByteToSeg(ctx->reader->EndRecPtr, repack_current_segment,
     281              :                 wal_segment_size);
     282              : 
     283              :     /* Our private state belongs to the decoding context. */
     284            3 :     oldcxt = MemoryContextSwitchTo(ctx->context);
     285              : 
     286              :     /*
     287              :      * read_local_xlog_page_no_wait() needs to be able to indicate the end of
     288              :      * WAL.
     289              :      */
     290            3 :     ctx->reader->private_data = palloc0_object(ReadLocalXLogPageNoWaitPrivate);
     291            3 :     dstate = palloc0_object(RepackDecodingState);
     292            3 :     MemoryContextSwitchTo(oldcxt);
     293              : 
     294              : #ifdef  USE_ASSERT_CHECKING
     295              :     dstate->relid = relid;
     296              : #endif
     297              : 
     298            3 :     dstate->change_cxt = AllocSetContextCreate(ctx->context,
     299              :                                                "REPACK - change",
     300              :                                                ALLOCSET_DEFAULT_SIZES);
     301              : 
     302              :     /* The file will be set as soon as we have it opened. */
     303            3 :     dstate->file = NULL;
     304              : 
     305              :     /*
     306              :      * Memory context and resource owner for long-lived resources.
     307              :      */
     308            3 :     dstate->worker_cxt = CurrentMemoryContext;
     309            3 :     dstate->worker_resowner = CurrentResourceOwner;
     310              : 
     311            3 :     ctx->output_writer_private = dstate;
     312              : 
     313            3 :     return ctx;
     314              : }
     315              : 
     316              : static void
     317            3 : repack_cleanup_logical_decoding(LogicalDecodingContext *ctx)
     318              : {
     319              :     RepackDecodingState *dstate;
     320              : 
     321            3 :     dstate = (RepackDecodingState *) ctx->output_writer_private;
     322            3 :     if (dstate->slot)
     323            1 :         ExecDropSingleTupleTableSlot(dstate->slot);
     324              : 
     325            3 :     FreeDecodingContext(ctx);
     326            3 :     ReplicationSlotDropAcquired();
     327            3 : }
     328              : 
     329              : /*
     330              :  * Make snapshot available to the backend that launched the decoding worker.
     331              :  */
     332              : static void
     333            3 : export_initial_snapshot(Snapshot snapshot, DecodingWorkerShared *shared)
     334              : {
     335              :     char        fname[MAXPGPATH];
     336              :     BufFile    *file;
     337              :     Size        snap_size;
     338              :     char       *snap_space;
     339              : 
     340            3 :     snap_size = EstimateSnapshotSpace(snapshot);
     341            3 :     snap_space = (char *) palloc(snap_size);
     342            3 :     SerializeSnapshot(snapshot, snap_space);
     343              : 
     344            3 :     DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
     345            3 :     file = BufFileCreateFileSet(&shared->sfs.fs, fname);
     346              :     /* To make restoration easier, write the snapshot size first. */
     347            3 :     BufFileWrite(file, &snap_size, sizeof(snap_size));
     348            3 :     BufFileWrite(file, snap_space, snap_size);
     349            3 :     BufFileClose(file);
     350            3 :     pfree(snap_space);
     351              : 
     352              :     /* Increase the counter to tell the backend that the file is available. */
     353            3 :     SpinLockAcquire(&shared->mutex);
     354            3 :     shared->last_exported++;
     355            3 :     SpinLockRelease(&shared->mutex);
     356            3 :     ConditionVariableSignal(&shared->cv);
     357            3 : }
     358              : 
     359              : /*
     360              :  * Decode logical changes from the WAL sequence and store them to a file.
     361              :  *
     362              :  * If true is returned, there is no more work for the worker.
     363              :  */
     364              : static bool
     365            6 : decode_concurrent_changes(LogicalDecodingContext *ctx,
     366              :                           DecodingWorkerShared *shared)
     367              : {
     368              :     RepackDecodingState *dstate;
     369              :     XLogRecPtr  lsn_upto;
     370              :     bool        done;
     371              :     char        fname[MAXPGPATH];
     372              : 
     373            6 :     dstate = (RepackDecodingState *) ctx->output_writer_private;
     374              : 
     375              :     /* Open the output file. */
     376            6 :     DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
     377            6 :     dstate->file = BufFileCreateFileSet(&shared->sfs.fs, fname);
     378              : 
     379            6 :     SpinLockAcquire(&shared->mutex);
     380            6 :     lsn_upto = shared->lsn_upto;
     381            6 :     done = shared->done;
     382            6 :     SpinLockRelease(&shared->mutex);
     383              : 
     384              :     while (true)
     385          875 :     {
     386              :         XLogRecord *record;
     387              :         XLogSegNo   segno_new;
     388          881 :         char       *errm = NULL;
     389              :         XLogRecPtr  end_lsn;
     390              : 
     391          881 :         CHECK_FOR_INTERRUPTS();
     392              : 
     393          881 :         record = XLogReadRecord(ctx->reader, &errm);
     394          881 :         if (record)
     395              :         {
     396          863 :             LogicalDecodingProcessRecord(ctx, ctx->reader);
     397              : 
     398              :             /*
     399              :              * If a WAL segment boundary has been crossed, inform the decoding
     400              :              * system that the catalog_xmin can advance.
     401              :              */
     402          863 :             end_lsn = ctx->reader->EndRecPtr;
     403          863 :             XLByteToSeg(end_lsn, segno_new, wal_segment_size);
     404          863 :             if (segno_new != repack_current_segment)
     405              :             {
     406            0 :                 LogicalConfirmReceivedLocation(end_lsn);
     407            0 :                 elog(DEBUG1, "REPACK: confirmed receive location %X/%X",
     408              :                      (uint32) (end_lsn >> 32), (uint32) end_lsn);
     409            0 :                 repack_current_segment = segno_new;
     410              :             }
     411              :         }
     412              :         else
     413              :         {
     414              :             ReadLocalXLogPageNoWaitPrivate *priv;
     415              : 
     416           18 :             if (errm)
     417            0 :                 ereport(ERROR,
     418              :                         errmsg("%s", errm));
     419              : 
     420              :             /*
     421              :              * In the decoding loop we do not want to get blocked when there
     422              :              * is no more WAL available, otherwise the loop would become
     423              :              * uninterruptible.
     424              :              */
     425           18 :             priv = (ReadLocalXLogPageNoWaitPrivate *) ctx->reader->private_data;
     426           18 :             if (priv->end_of_wal)
     427              :                 /* Do not miss the end of WAL condition next time. */
     428           18 :                 priv->end_of_wal = false;
     429              :             else
     430            0 :                 ereport(ERROR,
     431              :                         errmsg("could not read WAL record"));
     432              :         }
     433              : 
     434              :         /*
     435              :          * Whether we could read a new record or not, keep checking if
     436              :          * 'lsn_upto' was specified.
     437              :          */
     438          881 :         if (!XLogRecPtrIsValid(lsn_upto))
     439              :         {
     440          753 :             SpinLockAcquire(&shared->mutex);
     441          753 :             lsn_upto = shared->lsn_upto;
     442              :             /* 'done' should be set at the same time as 'lsn_upto' */
     443          753 :             done = shared->done;
     444          753 :             SpinLockRelease(&shared->mutex);
     445              :         }
     446          881 :         if (XLogRecPtrIsValid(lsn_upto) &&
     447          134 :             ctx->reader->EndRecPtr >= lsn_upto)
     448            6 :             break;
     449              : 
     450          875 :         if (record == NULL)
     451              :         {
     452           15 :             int64       timeout = 0;
     453              :             WaitLSNResult res;
     454              : 
     455              :             /*
     456              :              * Before we retry reading, wait until new WAL is flushed.
     457              :              *
     458              :              * There is a race condition such that the backend executing
     459              :              * REPACK determines 'lsn_upto', but before it sets the shared
     460              :              * variable, we reach the end of WAL. In that case we'd need to
     461              :              * wait until the next WAL flush (unrelated to REPACK). Although
     462              :              * that should not be a problem in a busy system, it might be
     463              :              * noticeable in other cases, including regression tests (which
     464              :              * are not necessarily executed in parallel). Therefore it makes
     465              :              * sense to use a timeout.
     466              :              *
     467              :              * If lsn_upto is valid, WAL records having LSN lower than that
     468              :              * should already have been flushed to disk.
     469              :              */
     470           15 :             if (!XLogRecPtrIsValid(lsn_upto))
     471           15 :                 timeout = 100L;
     472           15 :             res = WaitForLSN(WAIT_LSN_TYPE_PRIMARY_FLUSH,
     473           15 :                              ctx->reader->EndRecPtr + 1,
     474              :                              timeout);
     475           15 :             if (res != WAIT_LSN_RESULT_SUCCESS &&
     476              :                 res != WAIT_LSN_RESULT_TIMEOUT)
     477            0 :                 ereport(ERROR,
     478              :                         errmsg("waiting for WAL failed"));
     479              :         }
     480              :     }
     481              : 
     482              :     /*
     483              :      * Close the file so we can make it available to the backend.
     484              :      */
     485            6 :     BufFileClose(dstate->file);
     486            6 :     dstate->file = NULL;
     487            6 :     SpinLockAcquire(&shared->mutex);
     488            6 :     shared->lsn_upto = InvalidXLogRecPtr;
     489            6 :     shared->last_exported++;
     490            6 :     SpinLockRelease(&shared->mutex);
     491            6 :     ConditionVariableSignal(&shared->cv);
     492              : 
     493            6 :     return done;
     494              : }
     495              : 
     496              : /*
     497              :  * Does the WAL record contain a data change that this backend does not need
     498              :  * to decode on behalf of REPACK (CONCURRENTLY)?
     499              :  */
     500              : bool
     501      1492817 : change_useless_for_repack(XLogRecordBuffer *buf)
     502              : {
     503      1492817 :     XLogReaderState *r = buf->record;
     504              :     RelFileLocator locator;
     505              : 
     506              :     /* TOAST locator should not be set unless the main is. */
     507              :     Assert(!OidIsValid(repacked_rel_toast_locator.relNumber) ||
     508              :            OidIsValid(repacked_rel_locator.relNumber));
     509              : 
     510              :     /*
     511              :      * Backends not involved in REPACK (CONCURRENTLY) should not do the
     512              :      * filtering.
     513              :      */
     514      1492817 :     if (!OidIsValid(repacked_rel_locator.relNumber))
     515      1492481 :         return false;
     516              : 
     517              :     /*
     518              :      * If the record does not contain block 0, it's probably not INSERT /
     519              :      * UPDATE / DELETE. In any case, we do not have enough information to
     520              :      * filter the change out.
     521              :      */
     522          336 :     if (!XLogRecGetBlockTagExtended(r, 0, &locator, NULL, NULL, NULL))
     523            0 :         return false;
     524              : 
     525              :     /*
     526              :      * Decode the change if it belongs to the table we are repacking, or if it
     527              :      * belongs to its TOAST relation.
     528              :      */
     529          336 :     if (RelFileLocatorEquals(locator, repacked_rel_locator))
     530           31 :         return false;
     531          305 :     if (OidIsValid(repacked_rel_toast_locator.relNumber) &&
     532          240 :         RelFileLocatorEquals(locator, repacked_rel_toast_locator))
     533           44 :         return false;
     534              : 
     535              :     /* Filter out changes of other tables. */
     536          261 :     return true;
     537              : }
        

Generated by: LCOV version 2.0-1