LCOV - code coverage report
Current view: top level - src/backend/commands - repack_worker.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 94.4 % 160 151
Test Date: 2026-04-07 14:16:30 Functions: 100.0 % 8 8
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * repack_worker.c
       4              :  *    Implementation of the background worker for ad-hoc logical decoding
       5              :  *    during REPACK (CONCURRENTLY).
       6              :  *
       7              :  *
       8              :  * Copyright (c) 2026, PostgreSQL Global Development Group
       9              :  *
      10              :  *
      11              :  * IDENTIFICATION
      12              :  *    src/backend/commands/repack_worker.c
      13              :  *
      14              :  *-------------------------------------------------------------------------
      15              :  */
      16              : #include "postgres.h"
      17              : 
      18              : #include "access/table.h"
      19              : #include "access/xlog_internal.h"
      20              : #include "access/xlogutils.h"
      21              : #include "access/xlogwait.h"
      22              : #include "commands/repack.h"
      23              : #include "commands/repack_internal.h"
      24              : #include "libpq/pqmq.h"
      25              : #include "replication/snapbuild.h"
      26              : #include "storage/ipc.h"
      27              : #include "storage/proc.h"
      28              : #include "tcop/tcopprot.h"
      29              : #include "utils/memutils.h"
       30              : /*
                       :  * Name handed to CreateInitDecodingContext() as its first argument when
                       :  * repack_setup_logical_decoding() creates the decoding context.
                       :  */
       31              : #define REPL_PLUGIN_NAME   "pgrepack"
       32              : 
       33              : static void RepackWorkerShutdown(int code, Datum arg);
       34              : static LogicalDecodingContext *repack_setup_logical_decoding(Oid relid);
       35              : static void repack_cleanup_logical_decoding(LogicalDecodingContext *ctx);
       36              : static void export_initial_snapshot(Snapshot snapshot,
       37              :                                     DecodingWorkerShared *shared);
       38              : static bool decode_concurrent_changes(LogicalDecodingContext *ctx,
       39              :                                       DecodingWorkerShared *shared);
       40              : 
       41              : /*
                       :  * Is this process a REPACK worker?  Set to true at the start of
                       :  * RepackWorkerMain() and reported by AmRepackWorker().
                       :  */
       42              : static bool am_repack_worker = false;
       43              : 
       44              : /*
                       :  * The WAL segment being decoded.  Initialized from the reader's EndRecPtr
                       :  * in repack_setup_logical_decoding() and updated by
                       :  * decode_concurrent_changes() whenever a record ends in another segment.
                       :  */
       45              : static XLogSegNo repack_current_segment = 0;
       46              : 
       47              : /*
       48              :  * Keep track of the table we're processing, to skip logical decoding of data
       49              :  * from other relations.
                       :  *
                       :  * Both locators are filled in by repack_setup_logical_decoding() and
                       :  * consulted by change_useless_for_repack().  An invalid relNumber means
                       :  * "not set"; in that case no filtering takes place.
                       :  *
                       :  * NOTE(review): relNumber is initialized with InvalidOid and tested with
                       :  * OidIsValid() in this file; presumably the RelFileNumber-specific
                       :  * spellings would be the matching ones - confirm.
       50              :  */
       51              : static RelFileLocator repacked_rel_locator = {.relNumber = InvalidOid};
       52              : static RelFileLocator repacked_rel_toast_locator = {.relNumber = InvalidOid};
      53              : 
      54              : 
       55              : /*
                       :  * REPACK decoding worker entry point
                       :  *
                       :  * main_arg carries the handle of the dynamic shared memory segment whose
                       :  * start is the DecodingWorkerShared structure.  The worker attaches to the
                       :  * segment, joins the lock group of the launching backend, redirects its
                       :  * messages to the shared error queue, connects to the database and sets up
                       :  * logical decoding.  It then exports the initial snapshot and keeps
                       :  * calling decode_concurrent_changes() until that function returns true.
                       :  *
                       :  * RepackWorkerShutdown() is registered as an exit callback before anything
                       :  * else that can fail after the attach, so the launching backend gets
                       :  * signaled on exit.
                       :  */
       56              : void
       57            2 : RepackWorkerMain(Datum main_arg)
       58              : {
       59              :     dsm_segment *seg;
       60              :     DecodingWorkerShared *shared;
       61              :     shm_mq     *mq;
       62              :     shm_mq_handle *mqh;
       63              :     LogicalDecodingContext *decoding_ctx;
       64              :     SharedFileSet *sfs;
       65              :     Snapshot    snapshot;
       66              : 
       67            2 :     am_repack_worker = true;
       68              : 
       69              :     /*
       70              :      * Override the default bgworker_die() with die() so we can use
       71              :      * CHECK_FOR_INTERRUPTS().
       72              :      */
       73            2 :     pqsignal(SIGTERM, die);
       74            2 :     BackgroundWorkerUnblockSignals();
       75              : 
       76            2 :     seg = dsm_attach(DatumGetUInt32(main_arg));
       77            2 :     if (seg == NULL)
       78            0 :         ereport(ERROR,
       79              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
       80              :                 errmsg("could not map dynamic shared memory segment"));
       81              : 
       82            2 :     shared = (DecodingWorkerShared *) dsm_segment_address(seg);
       83            2 :     shared->dsm_seg = seg;
       84              : 
       85              :     /* Arrange to signal the leader if we exit. */
       86            2 :     before_shmem_exit(RepackWorkerShutdown, PointerGetDatum(shared));
       87              : 
       88              :     /*
       89              :      * Join locking group - see the comments around the call of
       90              :      * start_repack_decoding_worker().
       91              :      */
       92            2 :     if (!BecomeLockGroupMember(shared->backend_proc, shared->backend_pid))
       93            0 :         return;                 /* The leader is not running anymore. */
       94              : 
       95              :     /*
       96              :      * Setup a queue to send error messages to the backend that launched this
       97              :      * worker.
       98              :      */
       99            2 :     mq = (shm_mq *) (char *) BUFFERALIGN(shared->error_queue);
      100            2 :     shm_mq_set_sender(mq, MyProc);
      101            2 :     mqh = shm_mq_attach(mq, seg, NULL);
      102            2 :     pq_redirect_to_shm_mq(seg, mqh);
      103            2 :     pq_set_parallel_leader(shared->backend_pid,
      104              :                            shared->backend_proc_number);
      105              : 
      106              :     /* Connect to the database. */
      107            2 :     BackgroundWorkerInitializeConnectionByOid(shared->dbid, shared->roleid, 0);
      108              : 
      109              :     /*
      110              :      * Transaction is needed to open relation, and it also provides us with a
      111              :      * resource owner.
      112              :      */
      113            2 :     StartTransactionCommand();
      114              :     /* NOTE(review): same segment as above; this re-fetch looks redundant. */
      115            2 :     shared = (DecodingWorkerShared *) dsm_segment_address(seg);
      116              : 
      117              :     /*
      118              :      * Not sure the spinlock is needed here - the backend should not change
      119              :      * anything in the shared memory until we have serialized the snapshot.
                       :      * Only the address of shared->sfs is taken while the lock is held.
      120              :      */
      121            2 :     SpinLockAcquire(&shared->mutex);
      122              :     Assert(!XLogRecPtrIsValid(shared->lsn_upto));
      123            2 :     sfs = &shared->sfs;
      124            2 :     SpinLockRelease(&shared->mutex);
      125              : 
      126            2 :     SharedFileSetAttach(sfs, seg);
      127              : 
      128              :     /*
      129              :      * Prepare to capture the concurrent data changes ourselves.
      130              :      */
      131            2 :     decoding_ctx = repack_setup_logical_decoding(shared->relid);
      132              : 
      133              :     /* Announce that we're ready. */
      134            2 :     SpinLockAcquire(&shared->mutex);
      135            2 :     shared->initialized = true;
      136            2 :     SpinLockRelease(&shared->mutex);
      137            2 :     ConditionVariableSignal(&shared->cv);
      138              : 
      139              :     /* There doesn't seem to be a nice API to set these */
      140            2 :     XactIsoLevel = XACT_REPEATABLE_READ;
      141            2 :     XactReadOnly = true;
      142              : 
      143              :     /* Build the initial snapshot and export it. */
      144            2 :     snapshot = SnapBuildInitialSnapshot(decoding_ctx->snapshot_builder);
      145            2 :     export_initial_snapshot(snapshot, shared);
      146              : 
      147              :     /*
      148              :      * Only historic snapshots should be used now. Do not let us restrict the
      149              :      * progress of xmin horizon.
      150              :      */
      151            2 :     InvalidateCatalogSnapshot();
      152              :     /* Each iteration produces one output file; stop when told we're done. */
      153              :     for (;;)
      154            2 :     {
      155            4 :         bool        stop = decode_concurrent_changes(decoding_ctx, shared);
      156              : 
      157            4 :         if (stop)
      158            2 :             break;
      159              : 
      160              :     }
      161              : 
      162              :     /* Cleanup. */
      163            2 :     repack_cleanup_logical_decoding(decoding_ctx);
      164            2 :     CommitTransactionCommand();
      165              : }
     166              : 
      167              : /*
                       :  * Exit callback registered by RepackWorkerMain() via before_shmem_exit().
                       :  *
                       :  * 'arg' is the pointer to DecodingWorkerShared.  Signal the launching
                       :  * backend with PROCSIG_REPACK_MESSAGE and then detach from the dynamic
                       :  * shared memory segment.  'code' is not used.
                       :  *
      168              :  * See ParallelWorkerShutdown for details.
      169              :  */
      170              : static void
      171            2 : RepackWorkerShutdown(int code, Datum arg)
      172              : {
      173            2 :     DecodingWorkerShared *shared = (DecodingWorkerShared *) DatumGetPointer(arg);
      174              : 
      175            2 :     SendProcSignal(shared->backend_pid,
      176              :                    PROCSIG_REPACK_MESSAGE,
      177              :                    shared->backend_proc_number);
      178              :     /* 'shared' lives in the segment, so detach only after the last access. */
      179            2 :     dsm_detach(shared->dsm_seg);
      180            2 : }
      181              : /*
                       :  * Report whether the current process is a REPACK decoding worker.
                       :  *
                       :  * Returns the file-local am_repack_worker flag, which RepackWorkerMain()
                       :  * sets to true on entry and which is false otherwise.
                       :  */
      182              : bool
      183         2020 : AmRepackWorker(void)
      184              : {
      185         2020 :     return am_repack_worker;
      186              : }
     187              : 
      188              : /*
      189              :  * This function is much like pg_create_logical_replication_slot() except that
      190              :  * the new slot is neither released (if anyone else could read changes from
      191              :  * our slot, we could miss changes other backends do while we copy the
      192              :  * existing data into temporary table), nor persisted (it's easier to handle
      193              :  * crash by restarting all the work from scratch).
                       :  *
                       :  * 'relid' is the OID of the table being repacked.  Its locator and that of
                       :  * its TOAST relation (if it has one) are remembered in file-local variables
                       :  * for change_useless_for_repack().
                       :  *
                       :  * Returns the decoding context.  On return its reader uses
                       :  * read_local_xlog_page_no_wait() with a zeroed
                       :  * ReadLocalXLogPageNoWaitPrivate as private data, and its
                       :  * output_writer_private points to a RepackDecodingState that has
                       :  * change_cxt, worker_cxt and worker_resowner set and file set to NULL.
      194              :  */
      195              : static LogicalDecodingContext *
      196            2 : repack_setup_logical_decoding(Oid relid)
      197              : {
      198              :     Relation    rel;
      199              :     Oid         toastrelid;
      200              :     LogicalDecodingContext *ctx;
      201              :     NameData    slotname;
      202              :     RepackDecodingState *dstate;
      203              :     MemoryContext oldcxt;
      204              : 
      205              :     /*
      206              :      * REPACK CONCURRENTLY is not allowed in a transaction block, so this
      207              :      * should never fire.
      208              :      */
      209              :     Assert(!TransactionIdIsValid(GetTopTransactionIdIfAny()));
      210              : 
      211              :     /*
      212              :      * Make sure we can use logical decoding.
      213              :      */
      214            2 :     CheckSlotPermissions();
      215            2 :     CheckLogicalDecodingRequirements();
      216              : 
      217              :     /*
      218              :      * A single backend should not execute multiple REPACK commands at a time,
      219              :      * so use PID to make the slot unique.
      220              :      *
      221              :      * RS_TEMPORARY so that the slot gets cleaned up on ERROR.
      222              :      */
      223            2 :     snprintf(NameStr(slotname), NAMEDATALEN, "repack_%d", MyProcPid);
      224            2 :     ReplicationSlotCreate(NameStr(slotname), true, RS_TEMPORARY, false, false,
      225              :                           false);
      226              : 
      227            2 :     EnsureLogicalDecodingEnabled();
      228              : 
      229              :     /*
      230              :      * Neither prepare_write nor do_write callback nor update_progress is
      231              :      * useful for us.
      232              :      */
      233            2 :     ctx = CreateInitDecodingContext(REPL_PLUGIN_NAME,
      234              :                                     NIL,
      235              :                                     true,
      236              :                                     InvalidXLogRecPtr,
      237            2 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
      238              :                                                .segment_open = wal_segment_open,
      239              :                                                .segment_close = wal_segment_close),
      240              :                                     NULL, NULL, NULL);
      241              : 
      242              :     /*
      243              :      * We don't have control on setting fast_forward, so at least check it.
      244              :      */
      245              :     Assert(!ctx->fast_forward);
      246              : 
      247              :     /* Avoid logical decoding of other relations. */
      248            2 :     rel = table_open(relid, AccessShareLock);
      249            2 :     repacked_rel_locator = rel->rd_locator;
      250            2 :     toastrelid = rel->rd_rel->reltoastrelid;
      251            2 :     if (OidIsValid(toastrelid))
      252              :     {
      253              :         Relation    toastrel;
      254              : 
      255              :         /* Avoid logical decoding of other TOAST relations. */
      256            1 :         toastrel = table_open(toastrelid, AccessShareLock);
      257            1 :         repacked_rel_toast_locator = toastrel->rd_locator;
      258            1 :         table_close(toastrel, AccessShareLock);
      259              :     }
      260            2 :     table_close(rel, AccessShareLock);
      261              : 
      262            2 :     DecodingContextFindStartpoint(ctx);
      263              : 
      264              :     /*
      265              :      * decode_concurrent_changes() needs non-blocking callback.
                       :      * The blocking read_local_xlog_page() was only used up to this point.
      266              :      */
      267            2 :     ctx->reader->routine.page_read = read_local_xlog_page_no_wait;
      268              : 
      269              :     /* Some WAL records should have been read. */
      270              :     Assert(ctx->reader->EndRecPtr != InvalidXLogRecPtr);
      271              : 
      272              :     /*
      273              :      * Initialize repack_current_segment so that we can notice WAL segment
      274              :      * boundaries.
      275              :      */
      276            2 :     XLByteToSeg(ctx->reader->EndRecPtr, repack_current_segment,
      277              :                 wal_segment_size);
      278              : 
      279              :     /* Our private state belongs to the decoding context. */
      280            2 :     oldcxt = MemoryContextSwitchTo(ctx->context);
      281              : 
      282              :     /*
      283              :      * read_local_xlog_page_no_wait() needs to be able to indicate the end of
      284              :      * WAL.
      285              :      */
      286            2 :     ctx->reader->private_data = palloc0_object(ReadLocalXLogPageNoWaitPrivate);
      287            2 :     dstate = palloc0_object(RepackDecodingState);
      288            2 :     MemoryContextSwitchTo(oldcxt);
      289              :     /* The relid field is only assigned in assert-enabled builds. */
      290              : #ifdef  USE_ASSERT_CHECKING
      291              :     dstate->relid = relid;
      292              : #endif
      293              : 
      294            2 :     dstate->change_cxt = AllocSetContextCreate(ctx->context,
      295              :                                                "REPACK - change",
      296              :                                                ALLOCSET_DEFAULT_SIZES);
      297              : 
      298              :     /* The file will be set as soon as we have it opened. */
      299            2 :     dstate->file = NULL;
      300              : 
      301              :     /*
      302              :      * Memory context and resource owner for long-lived resources.
                       :      * These are whatever is current in the caller at this point.
      303              :      */
      304            2 :     dstate->worker_cxt = CurrentMemoryContext;
      305            2 :     dstate->worker_resowner = CurrentResourceOwner;
      306              : 
      307            2 :     ctx->output_writer_private = dstate;
      308              : 
      309            2 :     return ctx;
      310              : }
      311              : /*
                       :  * Release what repack_setup_logical_decoding() has set up.
                       :  *
                       :  * Drops the tuple table slot stored in the RepackDecodingState if one is
                       :  * set, frees the decoding context 'ctx' and drops the replication slot
                       :  * this process has acquired.
                       :  */
      312              : static void
      313            2 : repack_cleanup_logical_decoding(LogicalDecodingContext *ctx)
      314              : {
      315              :     RepackDecodingState *dstate;
      316              : 
      317            2 :     dstate = (RepackDecodingState *) ctx->output_writer_private;
      318            2 :     if (dstate->slot)
      319            1 :         ExecDropSingleTupleTableSlot(dstate->slot);
      320              :     /* dstate was allocated in ctx->context, so it must not be used below. */
      321            2 :     FreeDecodingContext(ctx);
      322            2 :     ReplicationSlotDropAcquired();
      323            2 : }
     324              : 
      325              : /*
      326              :  * Make snapshot available to the backend that launched the decoding worker.
                       :  *
                       :  * The snapshot is serialized and written, preceded by its size, to a file
                       :  * in the shared file set.  The file name is built by
                       :  * DecodingWorkerFileName() from shared->relid and
                       :  * shared->last_exported + 1.  Once the file has been closed,
                       :  * shared->last_exported is incremented under the spinlock and the
                       :  * condition variable is signaled.
      327              :  */
      328              : static void
      329            2 : export_initial_snapshot(Snapshot snapshot, DecodingWorkerShared *shared)
      330              : {
      331              :     char        fname[MAXPGPATH];
      332              :     BufFile    *file;
      333              :     Size        snap_size;
      334              :     char       *snap_space;
      335              : 
      336            2 :     snap_size = EstimateSnapshotSpace(snapshot);
      337            2 :     snap_space = (char *) palloc(snap_size);
      338            2 :     SerializeSnapshot(snapshot, snap_space);
      339              : 
      340            2 :     DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
      341            2 :     file = BufFileCreateFileSet(&shared->sfs.fs, fname);
      342              :     /* To make restoration easier, write the snapshot size first. */
      343            2 :     BufFileWrite(file, &snap_size, sizeof(snap_size));
      344            2 :     BufFileWrite(file, snap_space, snap_size);
      345            2 :     BufFileClose(file);
      346            2 :     pfree(snap_space);
      347              : 
      348              :     /* Increase the counter to tell the backend that the file is available. */
      349            2 :     SpinLockAcquire(&shared->mutex);
      350            2 :     shared->last_exported++;
      351            2 :     SpinLockRelease(&shared->mutex);
      352            2 :     ConditionVariableSignal(&shared->cv);
      353            2 : }
     354              : 
      355              : /*
      356              :  * Decode logical changes from the WAL sequence and store them to a file.
      357              :  *
                       :  * A new file, named by DecodingWorkerFileName() from shared->relid and
                       :  * shared->last_exported + 1, is created in the shared file set and kept in
                       :  * the RepackDecodingState.  WAL records are then read and passed to
                       :  * LogicalDecodingProcessRecord() until shared->lsn_upto is valid and the
                       :  * reader's EndRecPtr has reached it.  Afterwards the file is closed,
                       :  * shared->lsn_upto is reset, shared->last_exported is incremented and the
                       :  * condition variable is signaled.
                       :  *
      358              :  * If true is returned, there is no more work for the worker.
                       :  * The returned value is the 'done' flag read from the shared state.
      359              :  */
      360              : static bool
      361            4 : decode_concurrent_changes(LogicalDecodingContext *ctx,
      362              :                           DecodingWorkerShared *shared)
      363              : {
      364              :     RepackDecodingState *dstate;
      365              :     XLogRecPtr  lsn_upto;
      366              :     bool        done;
      367              :     char        fname[MAXPGPATH];
      368              : 
      369            4 :     dstate = (RepackDecodingState *) ctx->output_writer_private;
      370              : 
      371              :     /* Open the output file. */
      372            4 :     DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
      373            4 :     dstate->file = BufFileCreateFileSet(&shared->sfs.fs, fname);
      374              :     /* Take a consistent copy of the stop position and the 'done' flag. */
      375            4 :     SpinLockAcquire(&shared->mutex);
      376            4 :     lsn_upto = shared->lsn_upto;
      377            4 :     done = shared->done;
      378            4 :     SpinLockRelease(&shared->mutex);
      379              : 
      380              :     while (true)
      381          440 :     {
      382              :         XLogRecord *record;
      383              :         XLogSegNo   segno_new;
      384          444 :         char       *errm = NULL;
      385              :         XLogRecPtr  end_lsn;
      386              : 
      387          444 :         CHECK_FOR_INTERRUPTS();
      388              : 
      389          444 :         record = XLogReadRecord(ctx->reader, &errm);
      390          444 :         if (record)
      391              :         {
      392          429 :             LogicalDecodingProcessRecord(ctx, ctx->reader);
      393              : 
      394              :             /*
      395              :              * If WAL segment boundary has been crossed, inform the decoding
      396              :              * system that the catalog_xmin can advance.
                       :              *
                       :              * NOTE(review): the DEBUG1 message below prints the LSN with
                       :              * "%X/%X" and manual casts; consider LSN_FORMAT_ARGS() with the
                       :              * project's standard LSN format - confirm against current
                       :              * conventions.
      397              :              */
      398          429 :             end_lsn = ctx->reader->EndRecPtr;
      399          429 :             XLByteToSeg(end_lsn, segno_new, wal_segment_size);
      400          429 :             if (segno_new != repack_current_segment)
      401              :             {
      402            0 :                 LogicalConfirmReceivedLocation(end_lsn);
      403            0 :                 elog(DEBUG1, "REPACK: confirmed receive location %X/%X",
      404              :                      (uint32) (end_lsn >> 32), (uint32) end_lsn);
      405            0 :                 repack_current_segment = segno_new;
      406              :             }
      407              :         }
      408              :         else
      409              :         {
      410              :             ReadLocalXLogPageNoWaitPrivate *priv;
      411              :             /* A NULL record with a message is a genuine read error. */
      412           15 :             if (errm)
      413            0 :                 ereport(ERROR,
      414              :                         errmsg("%s", errm));
      415              : 
      416              :             /*
      417              :              * In the decoding loop we do not want to get blocked when there
      418              :              * is no more WAL available, otherwise the loop would become
      419              :              * uninterruptible.
                       :              *
                       :              * A NULL record without an error message is accepted only if the
                       :              * page-read callback has flagged the end of WAL.
      420              :              */
      421           15 :             priv = (ReadLocalXLogPageNoWaitPrivate *) ctx->reader->private_data;
      422           15 :             if (priv->end_of_wal)
      423              :                 /* Do not miss the end of WAL condition next time. */
      424           15 :                 priv->end_of_wal = false;
      425              :             else
      426            0 :                 ereport(ERROR,
      427              :                         errmsg("could not read WAL record"));
      428              :         }
      429              : 
      430              :         /*
      431              :          * Whether we could read new record or not, keep checking if
      432              :          * 'lsn_upto' was specified.
                       :          * Once a valid value has been seen, shared memory is not re-read.
      433              :          */
      434          444 :         if (!XLogRecPtrIsValid(lsn_upto))
      435              :         {
      436          421 :             SpinLockAcquire(&shared->mutex);
      437          421 :             lsn_upto = shared->lsn_upto;
      438              :             /* 'done' should be set at the same time as 'lsn_upto' */
      439          421 :             done = shared->done;
      440          421 :             SpinLockRelease(&shared->mutex);
      441              :         }
      442          444 :         if (XLogRecPtrIsValid(lsn_upto) &&
      443           27 :             ctx->reader->EndRecPtr >= lsn_upto)
      444            4 :             break;
      445              : 
      446          440 :         if (record == NULL)
      447              :         {
      448           12 :             int64       timeout = 0;
      449              :             WaitLSNResult res;
      450              : 
      451              :             /*
      452              :              * Before we retry reading, wait until new WAL is flushed.
      453              :              *
      454              :              * There is a race condition such that the backend executing
      455              :              * REPACK determines 'lsn_upto', but before it sets the shared
      456              :              * variable, we reach the end of WAL. In that case we'd need to
      457              :              * wait until the next WAL flush (unrelated to REPACK). Although
      458              :              * that should not be a problem in a busy system, it might be
      459              :              * noticeable in other cases, including regression tests (which
      460              :              * are not necessarily executed in parallel). Therefore it makes
      461              :              * sense to use timeout.
      462              :              *
      463              :              * If lsn_upto is valid, WAL records having LSN lower than that
      464              :              * should already have been flushed to disk.
                       :              *
                       :              * Both success and timeout lead to another read attempt; any
                       :              * other result is reported as an error.
      465              :              */
      466           12 :             if (!XLogRecPtrIsValid(lsn_upto))
      467           12 :                 timeout = 100L;
      468           12 :             res = WaitForLSN(WAIT_LSN_TYPE_PRIMARY_FLUSH,
      469           12 :                              ctx->reader->EndRecPtr + 1,
      470              :                              timeout);
      471           12 :             if (res != WAIT_LSN_RESULT_SUCCESS &&
      472              :                 res != WAIT_LSN_RESULT_TIMEOUT)
      473            0 :                 ereport(ERROR,
      474              :                         errmsg("waiting for WAL failed"));
      475              :         }
      476              :     }
      477              : 
      478              :     /*
      479              :      * Close the file so we can make it available to the backend.
                       :      * Resetting lsn_upto and bumping last_exported happen under one lock.
      480              :      */
      481            4 :     BufFileClose(dstate->file);
      482            4 :     dstate->file = NULL;
      483            4 :     SpinLockAcquire(&shared->mutex);
      484            4 :     shared->lsn_upto = InvalidXLogRecPtr;
      485            4 :     shared->last_exported++;
      486            4 :     SpinLockRelease(&shared->mutex);
      487            4 :     ConditionVariableSignal(&shared->cv);
      488              : 
      489            4 :     return done;
      490              : }
     491              : 
      492              : /*
      493              :  * Does the WAL record contain a data change that this backend does not need
      494              :  * to decode on behalf of REPACK (CONCURRENTLY)?
                       :  *
                       :  * 'buf' wraps the record being decoded.  Returns true only if the locator
                       :  * of the repacked table is set, the record references block 0, and that
                       :  * block belongs neither to the repacked table nor to its TOAST relation.
                       :  * In every other case false is returned, i.e. the record is not filtered
                       :  * out.
      495              :  */
      496              : bool
      497      1494449 : change_useless_for_repack(XLogRecordBuffer *buf)
      498              : {
      499      1494449 :     XLogReaderState *r = buf->record;
      500              :     RelFileLocator locator;
      501              : 
      502              :     /* TOAST locator should not be set unless the main is. */
      503              :     Assert(!OidIsValid(repacked_rel_toast_locator.relNumber) ||
      504              :            OidIsValid(repacked_rel_locator.relNumber));
      505              : 
      506              :     /*
      507              :      * Backends not involved in REPACK (CONCURRENTLY) should not do the
      508              :      * filtering.
                       :      * The locator is only set by repack_setup_logical_decoding().
      509              :      */
      510      1494449 :     if (!OidIsValid(repacked_rel_locator.relNumber))
      511      1494318 :         return false;
      512              : 
      513              :     /*
      514              :      * If the record does not contain the block 0, it's probably not INSERT /
      515              :      * UPDATE / DELETE. In any case, we do not have enough information to
      516              :      * filter the change out.
      517              :      */
      518          131 :     if (!XLogRecGetBlockTagExtended(r, 0, &locator, NULL, NULL, NULL))
      519            0 :         return false;
      520              : 
      521              :     /*
      522              :      * Decode the change if it belongs to the table we are repacking, or if it
      523              :      * belongs to its TOAST relation.
      524              :      */
      525          131 :     if (RelFileLocatorEquals(locator, repacked_rel_locator))
      526           21 :         return false;
      527          110 :     if (OidIsValid(repacked_rel_toast_locator.relNumber) &&
      528           65 :         RelFileLocatorEquals(locator, repacked_rel_toast_locator))
      529            8 :         return false;
      530              : 
      531              :     /* Filter out changes of other tables. */
      532          102 :     return true;
      533              : }
        

Generated by: LCOV version 2.0-1