LCOV - differential code coverage report
Current view: top level - src/backend/commands - repack_worker.c (source / functions) Coverage Total Hit UNC GNC
Current: d36b728949bf4e37ada1cd23e0f2aaa94f609a70 vs 52e118fe2f7e3381bdaa479816a7f72eda2ae517 Lines: 93.8 % 161 151 10 151
Current Date: 2026-06-29 16:15:13 +0200 Functions: 100.0 % 8 8 8
Baseline: lcov-20260630-baseline Branches: 62.5 % 72 45 27 45
Baseline Date: 2026-06-29 13:01:57 +0200 Line coverage date bins:
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
(7,30] days: 100.0 % 12 12 12
(30,360] days: 93.3 % 149 139 10 139
Function coverage date bins:
(30,360] days: 100.0 % 8 8 8
Branch coverage date bins:
(7,30] days: 50.0 % 2 1 1 1
(30,360] days: 62.9 % 70 44 26 44

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  *
                                  3                 :                :  * repack_worker.c
                                  4                 :                :  *    Implementation of the background worker for ad-hoc logical decoding
                                  5                 :                :  *    during REPACK (CONCURRENTLY).
                                  6                 :                :  *
                                  7                 :                :  *
                                  8                 :                :  * Copyright (c) 2026, PostgreSQL Global Development Group
                                  9                 :                :  *
                                 10                 :                :  *
                                 11                 :                :  * IDENTIFICATION
                                 12                 :                :  *    src/backend/commands/repack_worker.c
                                 13                 :                :  *
                                 14                 :                :  *-------------------------------------------------------------------------
                                 15                 :                :  */
                                 16                 :                : #include "postgres.h"
                                 17                 :                : 
                                 18                 :                : #include "access/table.h"
                                 19                 :                : #include "access/xlog_internal.h"
                                 20                 :                : #include "access/xlogutils.h"
                                 21                 :                : #include "access/xlogwait.h"
                                 22                 :                : #include "commands/repack.h"
                                 23                 :                : #include "commands/repack_internal.h"
                                 24                 :                : #include "libpq/pqmq.h"
                                 25                 :                : #include "replication/snapbuild.h"
                                 26                 :                : #include "storage/ipc.h"
                                 27                 :                : #include "storage/proc.h"
                                 28                 :                : #include "tcop/tcopprot.h"
                                 29                 :                : #include "utils/memutils.h"
                                 30                 :                : 
                                 31                 :                : #define PGREPACK_PLUGIN   "pgrepack"
                                 32                 :                : 
                                 33                 :                : static void RepackWorkerShutdown(int code, Datum arg);
                                 34                 :                : static LogicalDecodingContext *repack_setup_logical_decoding(Oid relid);
                                 35                 :                : static void repack_cleanup_logical_decoding(LogicalDecodingContext *ctx);
                                 36                 :                : static void export_initial_snapshot(Snapshot snapshot,
                                 37                 :                :                                     DecodingWorkerShared *shared);
                                 38                 :                : static bool decode_concurrent_changes(LogicalDecodingContext *ctx,
                                 39                 :                :                                       DecodingWorkerShared *shared);
                                 40                 :                : 
                                 41                 :                : /* Is this process a REPACK worker? */
                                 42                 :                : static bool am_repack_worker = false;
                                 43                 :                : 
                                 44                 :                : /* The WAL segment being decoded. */
                                 45                 :                : static XLogSegNo repack_current_segment = 0;
                                 46                 :                : 
                                 47                 :                : /* Our DSM segment, for shutting down */
                                 48                 :                : static dsm_segment *worker_dsm_segment = NULL;
                                 49                 :                : 
                                 50                 :                : /*
                                 51                 :                :  * Keep track of the table we're processing, to skip logical decoding of data
                                 52                 :                :  * from other relations.
                                 53                 :                :  */
                                 54                 :                : static RelFileLocator repacked_rel_locator = {.relNumber = InvalidOid};
                                 55                 :                : static RelFileLocator repacked_rel_toast_locator = {.relNumber = InvalidOid};
                                 56                 :                : 
                                 57                 :                : 
                                 58                 :                : /* REPACK decoding worker entry point */
                                 59                 :                : void
   85 alvherre@kurilemu.de       60                 :GNC           7 : RepackWorkerMain(Datum main_arg)
                                 61                 :                : {
                                 62                 :                :     dsm_segment *seg;
                                 63                 :                :     DecodingWorkerShared *shared;
                                 64                 :                :     shm_mq     *mq;
                                 65                 :                :     shm_mq_handle *mqh;
                                 66                 :                :     LogicalDecodingContext *decoding_ctx;
                                 67                 :                :     SharedFileSet *sfs;
                                 68                 :                :     Snapshot    snapshot;
                                 69                 :                : 
                                 70                 :              7 :     am_repack_worker = true;
                                 71                 :                : 
                                 72                 :              7 :     BackgroundWorkerUnblockSignals();
                                 73                 :                : 
                                 74                 :              7 :     seg = dsm_attach(DatumGetUInt32(main_arg));
                                 75         [ -  + ]:              7 :     if (seg == NULL)
   85 alvherre@kurilemu.de       76         [ #  # ]:UNC           0 :         ereport(ERROR,
                                 77                 :                :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                 78                 :                :                 errmsg("could not map dynamic shared memory segment"));
   75 alvherre@kurilemu.de       79                 :GNC           7 :     worker_dsm_segment = seg;
                                 80                 :                : 
   85                            81                 :              7 :     shared = (DecodingWorkerShared *) dsm_segment_address(seg);
                                 82                 :                : 
                                 83                 :                :     /* Arrange to signal the leader if we exit. */
                                 84                 :              7 :     before_shmem_exit(RepackWorkerShutdown, PointerGetDatum(shared));
                                 85                 :                : 
                                 86                 :                :     /*
                                 87                 :                :      * Join locking group - see the comments around the call of
                                 88                 :                :      * start_repack_decoding_worker().
                                 89                 :                :      */
                                 90         [ -  + ]:              7 :     if (!BecomeLockGroupMember(shared->backend_proc, shared->backend_pid))
   85 alvherre@kurilemu.de       91                 :UNC           0 :         return;                 /* The leader is not running anymore. */
                                 92                 :                : 
                                 93                 :                :     /*
                                 94                 :                :      * Setup a queue to send error messages to the backend that launched this
                                 95                 :                :      * worker.
                                 96                 :                :      */
   85 alvherre@kurilemu.de       97                 :GNC           7 :     mq = (shm_mq *) (char *) BUFFERALIGN(shared->error_queue);
                                 98                 :              7 :     shm_mq_set_sender(mq, MyProc);
                                 99                 :              7 :     mqh = shm_mq_attach(mq, seg, NULL);
                                100                 :              7 :     pq_redirect_to_shm_mq(seg, mqh);
                                101                 :              7 :     pq_set_parallel_leader(shared->backend_pid,
                                102                 :                :                            shared->backend_proc_number);
                                103                 :                : 
                                104                 :                :     /* Connect to the database. LOGIN is not required. */
   71                           105                 :              7 :     BackgroundWorkerInitializeConnectionByOid(shared->dbid, shared->roleid,
                                106                 :                :                                               BGWORKER_BYPASS_ROLELOGINCHECK);
                                107                 :                : 
                                108                 :                :     /*
                                109                 :                :      * Transaction is needed to open relation, and it also provides us with a
                                110                 :                :      * resource owner.
                                111                 :                :      */
   85                           112                 :              7 :     StartTransactionCommand();
                                113                 :                : 
                                114                 :              7 :     shared = (DecodingWorkerShared *) dsm_segment_address(seg);
                                115                 :                : 
                                116                 :                :     /*
                                117                 :                :      * Not sure the spinlock is needed here - the backend should not change
                                118                 :                :      * anything in the shared memory until we have serialized the snapshot.
                                119                 :                :      */
                                120                 :              7 :     SpinLockAcquire(&shared->mutex);
                                121         [ -  + ]:              7 :     Assert(!XLogRecPtrIsValid(shared->lsn_upto));
                                122                 :              7 :     sfs = &shared->sfs;
                                123                 :              7 :     SpinLockRelease(&shared->mutex);
                                124                 :                : 
                                125                 :              7 :     SharedFileSetAttach(sfs, seg);
                                126                 :                : 
                                127                 :                :     /*
                                128                 :                :      * Prepare to capture the concurrent data changes ourselves.
                                129                 :                :      */
                                130                 :              7 :     decoding_ctx = repack_setup_logical_decoding(shared->relid);
                                131                 :                : 
                                132                 :                :     /* Announce that we're ready. */
                                133                 :              7 :     SpinLockAcquire(&shared->mutex);
                                134                 :              7 :     shared->initialized = true;
                                135                 :              7 :     SpinLockRelease(&shared->mutex);
                                136                 :              7 :     ConditionVariableSignal(&shared->cv);
                                137                 :                : 
                                138                 :                :     /* There doesn't seem to a nice API to set these */
                                139                 :              7 :     XactIsoLevel = XACT_REPEATABLE_READ;
                                140                 :              7 :     XactReadOnly = true;
                                141                 :                : 
                                142                 :                :     /* Build the initial snapshot and export it. */
                                143                 :              7 :     snapshot = SnapBuildInitialSnapshot(decoding_ctx->snapshot_builder);
                                144                 :              7 :     export_initial_snapshot(snapshot, shared);
                                145                 :                : 
                                146                 :                :     /*
                                147                 :                :      * Only historic snapshots should be used now. Do not let us restrict the
                                148                 :                :      * progress of xmin horizon.
                                149                 :                :      */
                                150                 :              7 :     InvalidateCatalogSnapshot();
                                151                 :                : 
                                152                 :                :     for (;;)
                                153                 :              7 :     {
                                154                 :             14 :         bool        stop = decode_concurrent_changes(decoding_ctx, shared);
                                155                 :                : 
                                156         [ +  + ]:             14 :         if (stop)
                                157                 :              7 :             break;
                                158                 :                : 
                                159                 :                :     }
                                160                 :                : 
                                161                 :                :     /* Cleanup. */
                                162                 :              7 :     repack_cleanup_logical_decoding(decoding_ctx);
                                163                 :              7 :     CommitTransactionCommand();
                                164                 :                : }
                                165                 :                : 
                                166                 :                : /*
                                167                 :                :  * See ParallelWorkerShutdown for details.
                                168                 :                :  */
                                169                 :                : static void
                                170                 :              7 : RepackWorkerShutdown(int code, Datum arg)
                                171                 :                : {
                                172                 :              7 :     DecodingWorkerShared *shared = (DecodingWorkerShared *) DatumGetPointer(arg);
                                173                 :                : 
                                174                 :              7 :     SendProcSignal(shared->backend_pid,
                                175                 :                :                    PROCSIG_REPACK_MESSAGE,
                                176                 :                :                    shared->backend_proc_number);
                                177                 :                : 
   75                           178                 :              7 :     dsm_detach(worker_dsm_segment);
   85                           179                 :              7 : }
                                180                 :                : 
                                181                 :                : bool
                                182                 :           2029 : AmRepackWorker(void)
                                183                 :                : {
                                184                 :           2029 :     return am_repack_worker;
                                185                 :                : }
                                186                 :                : 
                                187                 :                : /*
                                188                 :                :  * This function is much like pg_create_logical_replication_slot() except that
                                189                 :                :  * the new slot is neither released (if anyone else could read changes from
                                190                 :                :  * our slot, we could miss changes other backends do while we copy the
                                191                 :                :  * existing data into temporary table), nor persisted (it's easier to handle
                                192                 :                :  * crash by restarting all the work from scratch).
                                193                 :                :  */
                                194                 :                : static LogicalDecodingContext *
                                195                 :              7 : repack_setup_logical_decoding(Oid relid)
                                196                 :                : {
                                197                 :                :     Relation    rel;
                                198                 :                :     Oid         toastrelid;
                                199                 :                :     LogicalDecodingContext *ctx;
                                200                 :                :     char        slotname[NAMEDATALEN];
                                201                 :                :     RepackDecodingState *dstate;
                                202                 :                :     MemoryContext oldcxt;
                                203                 :                : 
                                204                 :                :     /*
                                205                 :                :      * REPACK CONCURRENTLY is not allowed in a transaction block, so this
                                206                 :                :      * should never fire.
                                207                 :                :      */
                                208         [ -  + ]:              7 :     Assert(!TransactionIdIsValid(GetTopTransactionIdIfAny()));
                                209                 :                : 
                                210                 :                :     /* Make sure we can use logical decoding */
   84                           211                 :              7 :     CheckLogicalDecodingRequirements(true);
                                212                 :                : 
                                213                 :                :     /*
                                214                 :                :      * Create the replication slot we'll use, and enable logical decoding in
                                215                 :                :      * case it isn't already on.
                                216                 :                :      *
                                217                 :                :      * Make the slot RS_TEMPORARY so that it's removed on ERROR.  A backend
                                218                 :                :      * cannot execute multiple REPACK commands at a time, so the PID is enough
                                219                 :                :      * to make the slot name unique.
                                220                 :                :      */
   21                           221                 :              7 :     snprintf(slotname, NAMEDATALEN, "pg_repack_%d", MyProcPid);
                                222                 :              7 :     ReplicationSlotCreate(slotname, true, RS_TEMPORARY, false, true,
                                223                 :                :                           false, false);
   85                           224                 :              7 :     EnsureLogicalDecodingEnabled();
                                225                 :                : 
                                226                 :                :     /*
                                227                 :                :      * Set up repacked_rel_locator and repacked_rel_toast_locator, which we
                                228                 :                :      * use to skip decoding of unrelated relations.
                                229                 :                :      */
                                230                 :              7 :     rel = table_open(relid, AccessShareLock);
                                231                 :              7 :     repacked_rel_locator = rel->rd_locator;
                                232                 :              7 :     toastrelid = rel->rd_rel->reltoastrelid;
                                233         [ +  + ]:              7 :     if (OidIsValid(toastrelid))
                                234                 :                :     {
                                235                 :                :         Relation    toastrel;
                                236                 :                : 
                                237                 :                :         /* Avoid logical decoding of other TOAST relations. */
                                238                 :              3 :         toastrel = table_open(toastrelid, AccessShareLock);
                                239                 :              3 :         repacked_rel_toast_locator = toastrel->rd_locator;
                                240                 :              3 :         table_close(toastrel, AccessShareLock);
                                241                 :                :     }
                                242                 :              7 :     table_close(rel, AccessShareLock);
                                243                 :                : 
                                244                 :                :     /*
                                245                 :                :      * Set up our logical decoding context.  We initially use the blocking
                                246                 :                :      * read_local_xlog_page until we find the start point, and switch to the
                                247                 :                :      * non-blocking interface afterwards.
                                248                 :                :      */
   21                           249                 :              7 :     ctx = CreateInitDecodingContext(PGREPACK_PLUGIN,
                                250                 :                :                                     NIL,
                                251                 :                :                                     true,
                                252                 :                :                                     true,
                                253                 :                :                                     InvalidXLogRecPtr,
                                254                 :              7 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
                                255                 :                :                                                .segment_open = wal_segment_open,
                                256                 :                :                                                .segment_close = wal_segment_close),
                                257                 :                :                                     NULL, NULL, NULL);
                                258                 :                : 
                                259                 :                :     /* Complete setup of output_writer_private */
                                260                 :              7 :     dstate = (RepackDecodingState *) ctx->output_writer_private;
                                261                 :              7 :     dstate->relid = relid;
                                262                 :              7 :     dstate->worker_cxt = CurrentMemoryContext;
                                263                 :              7 :     dstate->worker_resowner = CurrentResourceOwner;
                                264                 :                : 
                                265                 :                :     /* We don't have control on fast_forward, but verify it's sane */
                                266         [ -  + ]:              7 :     Assert(!ctx->fast_forward);
                                267                 :                : 
                                268                 :                :     /* Find our decoding starting point. */
                                269                 :              7 :     DecodingContextFindStartpoint(ctx);
                                270                 :                : 
                                271                 :                :     /* From this point on, we need non-blocking WAL reads */
                                272                 :              7 :     ctx->reader->routine.page_read = read_local_xlog_page_no_wait;
                                273                 :                : 
                                274                 :                :     /*
                                275                 :                :      * Initialize repack_current_segment so that we can notice WAL segment
                                276                 :                :      * boundaries.
                                277                 :                :      */
   85                           278                 :              7 :     XLByteToSeg(ctx->reader->EndRecPtr, repack_current_segment,
                                279                 :                :                 wal_segment_size);
                                280                 :                : 
                                281                 :                :     /*
                                282                 :                :      * Set up our reader private state to let the page-read callback notify
                                283                 :                :      * when end-of-WAL has been reached.  This lives in the same context as
                                284                 :                :      * the logical decoding itself.
                                285                 :                :      */
   21                           286                 :              7 :     oldcxt = MemoryContextSwitchTo(ctx->context);
   85                           287                 :              7 :     ctx->reader->private_data = palloc0_object(ReadLocalXLogPageNoWaitPrivate);
                                288                 :              7 :     MemoryContextSwitchTo(oldcxt);
                                289                 :                : 
                                290                 :              7 :     return ctx;
                                291                 :                : }
                                292                 :                : 
                                293                 :                : static void
                                294                 :              7 : repack_cleanup_logical_decoding(LogicalDecodingContext *ctx)
                                295                 :                : {
                                296                 :                :     RepackDecodingState *dstate;
                                297                 :                : 
                                298                 :              7 :     dstate = (RepackDecodingState *) ctx->output_writer_private;
                                299         [ +  + ]:              7 :     if (dstate->slot)
                                300                 :              1 :         ExecDropSingleTupleTableSlot(dstate->slot);
                                301                 :                : 
                                302                 :              7 :     FreeDecodingContext(ctx);
   34                           303                 :              7 :     ReplicationSlotDropAcquired(true);
   85                           304                 :              7 : }
                                305                 :                : 
                                306                 :                : /*
                                307                 :                :  * Make snapshot available to the backend that launched the decoding worker.
                                308                 :                :  */
                                309                 :                : static void
                                310                 :              7 : export_initial_snapshot(Snapshot snapshot, DecodingWorkerShared *shared)
                                311                 :                : {
                                312                 :                :     char        fname[MAXPGPATH];
                                313                 :                :     BufFile    *file;
                                314                 :                :     Size        snap_size;
                                315                 :                :     char       *snap_space;
                                316                 :                : 
                                317                 :              7 :     snap_size = EstimateSnapshotSpace(snapshot);
                                318                 :              7 :     snap_space = (char *) palloc(snap_size);
                                319                 :              7 :     SerializeSnapshot(snapshot, snap_space);
                                320                 :                : 
                                321                 :              7 :     DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
                                322                 :              7 :     file = BufFileCreateFileSet(&shared->sfs.fs, fname);
                                323                 :                :     /* To make restoration easier, write the snapshot size first. */
                                324                 :              7 :     BufFileWrite(file, &snap_size, sizeof(snap_size));
                                325                 :              7 :     BufFileWrite(file, snap_space, snap_size);
                                326                 :              7 :     BufFileClose(file);
   84                           327                 :              7 :     pfree(snap_space);
                                328                 :                : 
                                329                 :                :     /* Increase the counter to tell the backend that the file is available. */
   85                           330                 :              7 :     SpinLockAcquire(&shared->mutex);
                                331                 :              7 :     shared->last_exported++;
                                332                 :              7 :     SpinLockRelease(&shared->mutex);
                                333                 :              7 :     ConditionVariableSignal(&shared->cv);
                                334                 :              7 : }
                                335                 :                : 
                                336                 :                : /*
                                337                 :                :  * Decode logical changes from the WAL sequence and store them to a file.
                                338                 :                :  *
                                339                 :                :  * If true is returned, there is no more work for the worker.
                                340                 :                :  */
                                341                 :                : static bool
                                342                 :             14 : decode_concurrent_changes(LogicalDecodingContext *ctx,
                                343                 :                :                           DecodingWorkerShared *shared)
                                344                 :                : {
                                345                 :                :     RepackDecodingState *dstate;
                                346                 :                :     XLogRecPtr  lsn_upto;
                                347                 :                :     bool        done;
                                348                 :                :     char        fname[MAXPGPATH];
                                349                 :                : 
                                350                 :             14 :     dstate = (RepackDecodingState *) ctx->output_writer_private;
                                351                 :                : 
                                352                 :                :     /* Open the output file. */
                                353                 :             14 :     DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
                                354                 :             14 :     dstate->file = BufFileCreateFileSet(&shared->sfs.fs, fname);
                                355                 :                : 
                                356                 :             14 :     SpinLockAcquire(&shared->mutex);
                                357                 :             14 :     lsn_upto = shared->lsn_upto;
                                358                 :             14 :     done = shared->done;
                                359                 :             14 :     SpinLockRelease(&shared->mutex);
                                360                 :                : 
                                361                 :                :     while (true)
                                362                 :           1480 :     {
                                363                 :                :         XLogRecord *record;
                                364                 :                :         XLogSegNo   segno_new;
                                365                 :           1494 :         char       *errm = NULL;
                                366                 :                :         XLogRecPtr  end_lsn;
                                367                 :                : 
                                368         [ -  + ]:           1494 :         CHECK_FOR_INTERRUPTS();
                                369                 :                : 
                                370                 :           1494 :         record = XLogReadRecord(ctx->reader, &errm);
                                371         [ +  + ]:           1494 :         if (record)
                                372                 :                :         {
                                373                 :           1445 :             LogicalDecodingProcessRecord(ctx, ctx->reader);
                                374                 :                : 
                                375                 :                :             /*
                                376                 :                :              * We want to allow WAL to be recycled while REPACK is running.
                                377                 :                :              *
                                378                 :                :              * In normal usage of a replication slot, we need to be very
                                379                 :                :              * careful not to advance the LSN until it's been confirmed as
                                380                 :                :              * received by the remote.  In REPACK's case, this is not needed:
                                381                 :                :              * REPACK will never try to replay the same WAL after a crash, and
                                382                 :                :              * if there _is_ a crash, the whole REPACK has to be started from
                                383                 :                :              * scratch anyway.
                                384                 :                :              *
                                385                 :                :              * So here we disregard the careful LSN tracking and just move the
                                386                 :                :              * LSN locations forward to what we've processed.  Note that it
                                387                 :                :              * would be bogus to move the xmin forward, though, so we don't
                                388                 :                :              * touch that.
                                389                 :                :              *
                                390                 :                :              * This can be done on whatever schedule is convenient, but in
                                391                 :                :              * order not to cause unnecessary load, we only do it as we cross
                                392                 :                :              * each WAL segment boundary.
                                393                 :                :              */
                                394                 :           1445 :             end_lsn = ctx->reader->EndRecPtr;
                                395                 :           1445 :             XLByteToSeg(end_lsn, segno_new, wal_segment_size);
                                396         [ -  + ]:           1445 :             if (segno_new != repack_current_segment)
                                397                 :                :             {
   31 alvherre@kurilemu.de      398                 :UNC           0 :                 LogicalIncreaseRestartDecodingForSlot(end_lsn, end_lsn);
   85                           399                 :              0 :                 LogicalConfirmReceivedLocation(end_lsn);
                                400         [ #  # ]:              0 :                 elog(DEBUG1, "REPACK: confirmed receive location %X/%X",
                                401                 :                :                      (uint32) (end_lsn >> 32), (uint32) end_lsn);
                                402                 :              0 :                 repack_current_segment = segno_new;
                                403                 :                :             }
                                404                 :                :         }
                                405                 :                :         else
                                406                 :                :         {
                                407                 :                :             ReadLocalXLogPageNoWaitPrivate *priv;
                                408                 :                : 
   85 alvherre@kurilemu.de      409         [ -  + ]:GNC          49 :             if (errm)
   85 alvherre@kurilemu.de      410         [ #  # ]:UNC           0 :                 ereport(ERROR,
                                411                 :                :                         errcode_for_file_access(),
                                412                 :                :                         errmsg("could not read WAL from timeline %u at %X/%08X: %s",
                                413                 :                :                                ctx->reader->currTLI,
                                414                 :                :                                LSN_FORMAT_ARGS(ctx->reader->EndRecPtr),
                                415                 :                :                                errm));
                                416                 :                : 
                                417                 :                :             /*
                                418                 :                :              * In the decoding loop we do not want to get blocked when there
                                419                 :                :              * is no more WAL available, otherwise the loop would become
                                420                 :                :              * uninterruptible.
                                421                 :                :              */
   85 alvherre@kurilemu.de      422                 :GNC          49 :             priv = (ReadLocalXLogPageNoWaitPrivate *) ctx->reader->private_data;
                                423         [ +  - ]:             49 :             if (priv->end_of_wal)
                                424                 :                :                 /* Do not miss the end of WAL condition next time. */
                                425                 :             49 :                 priv->end_of_wal = false;
                                426                 :                :             else
   85 alvherre@kurilemu.de      427         [ #  # ]:UNC           0 :                 ereport(ERROR,
                                428                 :                :                         errcode(ERRCODE_DATA_CORRUPTED),
                                429                 :                :                         errmsg("could not read WAL record"));
                                430                 :                :         }
                                431                 :                : 
                                432                 :                :         /*
                                433                 :                :          * Whether we could read new record or not, keep checking if
                                434                 :                :          * 'lsn_upto' was specified.
                                435                 :                :          */
   85 alvherre@kurilemu.de      436         [ +  + ]:GNC        1494 :         if (!XLogRecPtrIsValid(lsn_upto))
                                437                 :                :         {
                                438                 :           1270 :             SpinLockAcquire(&shared->mutex);
                                439                 :           1270 :             lsn_upto = shared->lsn_upto;
                                440                 :                :             /* 'done' should be set at the same time as 'lsn_upto' */
                                441                 :           1270 :             done = shared->done;
                                442                 :           1270 :             SpinLockRelease(&shared->mutex);
                                443                 :                :         }
                                444         [ +  + ]:           1494 :         if (XLogRecPtrIsValid(lsn_upto) &&
                                445         [ +  + ]:            237 :             ctx->reader->EndRecPtr >= lsn_upto)
                                446                 :             14 :             break;
                                447                 :                : 
                                448         [ +  + ]:           1480 :         if (record == NULL)
                                449                 :                :         {
                                450                 :             41 :             int64       timeout = 0;
                                451                 :                :             WaitLSNResult res;
                                452                 :                : 
                                453                 :                :             /*
                                454                 :                :              * Before we retry reading, wait until new WAL is flushed.
                                455                 :                :              *
                                456                 :                :              * There is a race condition such that the backend executing
                                457                 :                :              * REPACK determines 'lsn_upto', but before it sets the shared
                                458                 :                :              * variable, we reach the end of WAL. In that case we'd need to
                                459                 :                :              * wait until the next WAL flush (unrelated to REPACK). Although
                                460                 :                :              * that should not be a problem in a busy system, it might be
                                461                 :                :              * noticeable in other cases, including regression tests (which
                                462                 :                :              * are not necessarily executed in parallel). Therefore it makes
                                463                 :                :              * sense to use timeout.
                                464                 :                :              *
                                465                 :                :              * If lsn_upto is valid, WAL records having LSN lower than that
                                466                 :                :              * should already have been flushed to disk.
                                467                 :                :              */
                                468         [ +  - ]:             41 :             if (!XLogRecPtrIsValid(lsn_upto))
                                469                 :             41 :                 timeout = 100L;
                                470                 :             41 :             res = WaitForLSN(WAIT_LSN_TYPE_PRIMARY_FLUSH,
                                471                 :             41 :                              ctx->reader->EndRecPtr + 1,
                                472                 :                :                              timeout);
                                473   [ +  +  -  + ]:             41 :             if (res != WAIT_LSN_RESULT_SUCCESS &&
                                474                 :                :                 res != WAIT_LSN_RESULT_TIMEOUT)
   85 alvherre@kurilemu.de      475         [ #  # ]:UNC           0 :                 ereport(ERROR,
                                476                 :                :                         errcode(ERRCODE_INTERNAL_ERROR),
                                477                 :                :                         errmsg("waiting for WAL failed"));
                                478                 :                :         }
                                479                 :                :     }
                                480                 :                : 
                                481                 :                :     /*
                                482                 :                :      * Close the file so we can make it available to the backend.
                                483                 :                :      */
   85 alvherre@kurilemu.de      484                 :GNC          14 :     BufFileClose(dstate->file);
                                485                 :             14 :     dstate->file = NULL;
                                486                 :             14 :     SpinLockAcquire(&shared->mutex);
                                487                 :             14 :     shared->lsn_upto = InvalidXLogRecPtr;
                                488                 :             14 :     shared->last_exported++;
                                489                 :             14 :     SpinLockRelease(&shared->mutex);
                                490                 :             14 :     ConditionVariableSignal(&shared->cv);
                                491                 :                : 
                                492                 :             14 :     return done;
                                493                 :                : }
                                494                 :                : 
                                495                 :                : /*
                                496                 :                :  * Does the WAL record contain a data change that this backend does not need
                                497                 :                :  * to decode on behalf of REPACK (CONCURRENTLY)?
                                498                 :                :  */
                                499                 :                : bool
                                500                 :        1416899 : change_useless_for_repack(XLogRecordBuffer *buf)
                                501                 :                : {
                                502                 :        1416899 :     XLogReaderState *r = buf->record;
                                503                 :                :     RelFileLocator locator;
                                504                 :                : 
                                505                 :                :     /* TOAST locator should not be set unless the main is. */
                                506   [ +  +  -  + ]:        1416899 :     Assert(!OidIsValid(repacked_rel_toast_locator.relNumber) ||
                                507                 :                :            OidIsValid(repacked_rel_locator.relNumber));
                                508                 :                : 
                                509                 :                :     /*
                                510                 :                :      * Backends not involved in REPACK (CONCURRENTLY) should not do the
                                511                 :                :      * filtering.
                                512                 :                :      */
                                513         [ +  + ]:        1416899 :     if (!OidIsValid(repacked_rel_locator.relNumber))
                                514                 :        1416469 :         return false;
                                515                 :                : 
                                516                 :                :     /*
                                517                 :                :      * If the record does not contain the block 0, it's probably not INSERT /
                                518                 :                :      * UPDATE / DELETE. In any case, we do not have enough information to
                                519                 :                :      * filter the change out.
                                520                 :                :      */
                                521         [ -  + ]:            430 :     if (!XLogRecGetBlockTagExtended(r, 0, &locator, NULL, NULL, NULL))
   85 alvherre@kurilemu.de      522                 :UNC           0 :         return false;
                                523                 :                : 
                                524                 :                :     /*
                                525                 :                :      * Decode the change if it belongs to the table we are repacking, or if it
                                526                 :                :      * belongs to its TOAST relation.
                                527                 :                :      */
   85 alvherre@kurilemu.de      528   [ +  +  +  -  :GNC         430 :     if (RelFileLocatorEquals(locator, repacked_rel_locator))
                                              +  - ]
                                529                 :             33 :         return false;
                                530         [ +  + ]:            397 :     if (OidIsValid(repacked_rel_toast_locator.relNumber) &&
                                531   [ +  +  +  -  :            299 :         RelFileLocatorEquals(locator, repacked_rel_toast_locator))
                                              +  - ]
                                532                 :             44 :         return false;
                                533                 :                : 
                                534                 :                :     /* Filter out changes of other tables. */
                                535                 :            353 :     return true;
                                536                 :                : }
        

Generated by: LCOV version 2.0-1