LCOV - code coverage report
Current view: top level - src/backend/commands - repack_worker.c (source / functions) Coverage Total Hit
Test: PostgreSQL 20devel Lines: 93.6 % 157 147
Test Date: 2026-07-03 19:57:34 Functions: 100.0 % 8 8
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
Branches: 62.9 % 62 39

             Branch data     Line data    Source code
       1                 :             : /*-------------------------------------------------------------------------
       2                 :             :  *
       3                 :             :  * repack_worker.c
       4                 :             :  *    Implementation of the background worker for ad-hoc logical decoding
       5                 :             :  *    during REPACK (CONCURRENTLY).
       6                 :             :  *
       7                 :             :  *
       8                 :             :  * Copyright (c) 2026, PostgreSQL Global Development Group
       9                 :             :  *
      10                 :             :  *
      11                 :             :  * IDENTIFICATION
      12                 :             :  *    src/backend/commands/repack_worker.c
      13                 :             :  *
      14                 :             :  *-------------------------------------------------------------------------
      15                 :             :  */
      16                 :             : #include "postgres.h"
      17                 :             : 
      18                 :             : #include "access/table.h"
      19                 :             : #include "access/xlog_internal.h"
      20                 :             : #include "access/xlogutils.h"
      21                 :             : #include "access/xlogwait.h"
      22                 :             : #include "commands/repack.h"
      23                 :             : #include "commands/repack_internal.h"
      24                 :             : #include "libpq/pqmq.h"
      25                 :             : #include "replication/snapbuild.h"
      26                 :             : #include "storage/ipc.h"
      27                 :             : #include "storage/proc.h"
      28                 :             : #include "tcop/tcopprot.h"
      29                 :             : #include "utils/memutils.h"
      30                 :             : 
      31                 :             : #define PGREPACK_PLUGIN   "pgrepack"
      32                 :             : 
      33                 :             : static void RepackWorkerShutdown(int code, Datum arg);
      34                 :             : static LogicalDecodingContext *repack_setup_logical_decoding(Oid relid);
      35                 :             : static void repack_cleanup_logical_decoding(LogicalDecodingContext *ctx);
      36                 :             : static void export_initial_snapshot(Snapshot snapshot,
      37                 :             :                                     DecodingWorkerShared *shared);
      38                 :             : static bool decode_concurrent_changes(LogicalDecodingContext *ctx,
      39                 :             :                                       DecodingWorkerShared *shared);
      40                 :             : 
      41                 :             : /* Is this process a REPACK worker? */
      42                 :             : static bool am_repack_worker = false;
      43                 :             : 
      44                 :             : /* The WAL segment being decoded. */
      45                 :             : static XLogSegNo repack_current_segment = 0;
      46                 :             : 
      47                 :             : /* Our DSM segment, for shutting down */
      48                 :             : static dsm_segment *worker_dsm_segment = NULL;
      49                 :             : 
      50                 :             : /*
      51                 :             :  * Keep track of the table we're processing, to skip logical decoding of data
      52                 :             :  * from other relations.
      53                 :             :  */
      54                 :             : static RelFileLocator repacked_rel_locator = {.relNumber = InvalidOid};
      55                 :             : static RelFileLocator repacked_rel_toast_locator = {.relNumber = InvalidOid};
      56                 :             : 
      57                 :             : 
      58                 :             : /* REPACK decoding worker entry point */
      59                 :             : void
      60                 :           7 : RepackWorkerMain(Datum main_arg)
      61                 :             : {
      62                 :             :     dsm_segment *seg;
      63                 :             :     DecodingWorkerShared *shared;
      64                 :             :     shm_mq     *mq;
      65                 :             :     shm_mq_handle *mqh;
      66                 :             :     LogicalDecodingContext *decoding_ctx;
      67                 :             :     SharedFileSet *sfs;
      68                 :             :     Snapshot    snapshot;
      69                 :             : 
      70                 :           7 :     am_repack_worker = true;
      71                 :             : 
      72                 :           7 :     BackgroundWorkerUnblockSignals();
      73                 :             : 
      74                 :           7 :     seg = dsm_attach(DatumGetUInt32(main_arg));
      75         [ -  + ]:           7 :     if (seg == NULL)
      76         [ #  # ]:           0 :         ereport(ERROR,
      77                 :             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
      78                 :             :                 errmsg("could not map dynamic shared memory segment"));
      79                 :           7 :     worker_dsm_segment = seg;
      80                 :             : 
      81                 :           7 :     shared = (DecodingWorkerShared *) dsm_segment_address(seg);
      82                 :             : 
      83                 :             :     /* Arrange to signal the leader if we exit. */
      84                 :           7 :     before_shmem_exit(RepackWorkerShutdown, PointerGetDatum(shared));
      85                 :             : 
      86                 :             :     /*
      87                 :             :      * Join locking group - see the comments around the call of
      88                 :             :      * start_repack_decoding_worker().
      89                 :             :      */
      90         [ -  + ]:           7 :     if (!BecomeLockGroupMember(shared->backend_proc, shared->backend_pid))
      91                 :           0 :         return;                 /* The leader is not running anymore. */
      92                 :             : 
      93                 :             :     /*
      94                 :             :      * Setup a queue to send error messages to the backend that launched this
      95                 :             :      * worker.
      96                 :             :      */
      97                 :           7 :     mq = (shm_mq *) (char *) BUFFERALIGN(shared->error_queue);
      98                 :           7 :     shm_mq_set_sender(mq, MyProc);
      99                 :           7 :     mqh = shm_mq_attach(mq, seg, NULL);
     100                 :           7 :     pq_redirect_to_shm_mq(seg, mqh);
     101                 :           7 :     pq_set_parallel_leader(shared->backend_pid,
     102                 :             :                            shared->backend_proc_number);
     103                 :             : 
     104                 :             :     /* Connect to the database. LOGIN is not required. */
     105                 :           7 :     BackgroundWorkerInitializeConnectionByOid(shared->dbid, shared->roleid,
     106                 :             :                                               BGWORKER_BYPASS_ROLELOGINCHECK);
     107                 :             : 
     108                 :             :     /*
     109                 :             :      * Transaction is needed to open relation, and it also provides us with a
     110                 :             :      * resource owner.
     111                 :             :      */
     112                 :           7 :     StartTransactionCommand();
     113                 :             : 
     114                 :           7 :     shared = (DecodingWorkerShared *) dsm_segment_address(seg);
     115                 :             : 
     116                 :             :     /*
     117                 :             :      * Not sure the spinlock is needed here - the backend should not change
     118                 :             :      * anything in the shared memory until we have serialized the snapshot.
     119                 :             :      */
     120                 :           7 :     SpinLockAcquire(&shared->mutex);
     121                 :             :     Assert(!XLogRecPtrIsValid(shared->lsn_upto));
     122                 :           7 :     sfs = &shared->sfs;
     123                 :           7 :     SpinLockRelease(&shared->mutex);
     124                 :             : 
     125                 :           7 :     SharedFileSetAttach(sfs, seg);
     126                 :             : 
     127                 :             :     /*
     128                 :             :      * Prepare to capture the concurrent data changes ourselves.
     129                 :             :      */
     130                 :           7 :     decoding_ctx = repack_setup_logical_decoding(shared->relid);
     131                 :             : 
     132                 :             :     /* Announce that we're ready. */
     133                 :           7 :     SpinLockAcquire(&shared->mutex);
     134                 :           7 :     shared->initialized = true;
     135                 :           7 :     SpinLockRelease(&shared->mutex);
     136                 :           7 :     ConditionVariableSignal(&shared->cv);
     137                 :             : 
     138                 :             :     /* There doesn't seem to be a nice API to set these */
     139                 :           7 :     XactIsoLevel = XACT_REPEATABLE_READ;
     140                 :           7 :     XactReadOnly = true;
     141                 :             : 
     142                 :             :     /* Build the initial snapshot and export it. */
     143                 :           7 :     snapshot = SnapBuildInitialSnapshot(decoding_ctx->snapshot_builder);
     144                 :           7 :     export_initial_snapshot(snapshot, shared);
     145                 :             : 
     146                 :             :     /*
     147                 :             :      * Only historic snapshots should be used now. Do not let us restrict the
     148                 :             :      * progress of the xmin horizon.
     149                 :             :      */
     150                 :           7 :     InvalidateCatalogSnapshot();
     151                 :             : 
     152                 :             :     for (;;)
     153                 :           7 :     {
     154                 :          14 :         bool        stop = decode_concurrent_changes(decoding_ctx, shared);
     155                 :             : 
     156         [ +  + ]:          14 :         if (stop)
     157                 :           7 :             break;
     158                 :             : 
     159                 :             :     }
     160                 :             : 
     161                 :             :     /* Cleanup. */
     162                 :           7 :     repack_cleanup_logical_decoding(decoding_ctx);
     163                 :           7 :     CommitTransactionCommand();
     164                 :             : }
     165                 :             : 
     166                 :             : /*
     167                 :             :  * See ParallelWorkerShutdown for details.
     168                 :             :  */
     169                 :             : static void
     170                 :           7 : RepackWorkerShutdown(int code, Datum arg)
     171                 :             : {
     172                 :           7 :     DecodingWorkerShared *shared = (DecodingWorkerShared *) DatumGetPointer(arg);
     173                 :             : 
     174                 :           7 :     SendProcSignal(shared->backend_pid,
     175                 :             :                    PROCSIG_REPACK_MESSAGE,
     176                 :             :                    shared->backend_proc_number);
     177                 :             : 
     178                 :           7 :     dsm_detach(worker_dsm_segment);
     179                 :           7 : }
     180                 :             : 
     181                 :             : bool
     182                 :        2025 : AmRepackWorker(void)
     183                 :             : {
     184                 :        2025 :     return am_repack_worker;
     185                 :             : }
     186                 :             : 
     187                 :             : /*
     188                 :             :  * This function is much like pg_create_logical_replication_slot() except that
     189                 :             :  * the new slot is neither released (if anyone else could read changes from
     190                 :             :  * our slot, we could miss changes other backends do while we copy the
     191                 :             :  * existing data into a temporary table), nor persisted (it's easier to handle
     192                 :             :  * crash by restarting all the work from scratch).
     193                 :             :  */
     194                 :             : static LogicalDecodingContext *
     195                 :           7 : repack_setup_logical_decoding(Oid relid)
     196                 :             : {
     197                 :             :     Relation    rel;
     198                 :             :     Oid         toastrelid;
     199                 :             :     LogicalDecodingContext *ctx;
     200                 :             :     char        slotname[NAMEDATALEN];
     201                 :             :     RepackDecodingState *dstate;
     202                 :             :     MemoryContext oldcxt;
     203                 :             : 
     204                 :             :     /*
     205                 :             :      * REPACK CONCURRENTLY is not allowed in a transaction block, so this
     206                 :             :      * should never fire.
     207                 :             :      */
     208                 :             :     Assert(!TransactionIdIsValid(GetTopTransactionIdIfAny()));
     209                 :             : 
     210                 :             :     /* Make sure we can use logical decoding */
     211                 :           7 :     CheckLogicalDecodingRequirements(true);
     212                 :             : 
     213                 :             :     /*
     214                 :             :      * Create the replication slot we'll use, and enable logical decoding in
     215                 :             :      * case it isn't already on.
     216                 :             :      *
     217                 :             :      * Make the slot RS_TEMPORARY so that it's removed on ERROR.  A backend
     218                 :             :      * cannot execute multiple REPACK commands at a time, so the PID is enough
     219                 :             :      * to make the slot name unique.
     220                 :             :      */
     221                 :           7 :     snprintf(slotname, NAMEDATALEN, "pg_repack_%d", MyProcPid);
     222                 :           7 :     ReplicationSlotCreate(slotname, true, RS_TEMPORARY, false, true,
     223                 :             :                           false, false);
     224                 :           7 :     EnsureLogicalDecodingEnabled();
     225                 :             : 
     226                 :             :     /*
     227                 :             :      * Set up repacked_rel_locator and repacked_rel_toast_locator, which we
     228                 :             :      * use to skip decoding of unrelated relations.
     229                 :             :      */
     230                 :           7 :     rel = table_open(relid, AccessShareLock);
     231                 :           7 :     repacked_rel_locator = rel->rd_locator;
     232                 :           7 :     toastrelid = rel->rd_rel->reltoastrelid;
     233         [ +  + ]:           7 :     if (OidIsValid(toastrelid))
     234                 :             :     {
     235                 :             :         Relation    toastrel;
     236                 :             : 
     237                 :             :         /* Avoid logical decoding of other TOAST relations. */
     238                 :           3 :         toastrel = table_open(toastrelid, AccessShareLock);
     239                 :           3 :         repacked_rel_toast_locator = toastrel->rd_locator;
     240                 :           3 :         table_close(toastrel, AccessShareLock);
     241                 :             :     }
     242                 :           7 :     table_close(rel, AccessShareLock);
     243                 :             : 
     244                 :             :     /*
     245                 :             :      * Set up our logical decoding context.  We initially use the blocking
     246                 :             :      * read_local_xlog_page until we find the start point, and switch to the
     247                 :             :      * non-blocking interface afterwards.
     248                 :             :      */
     249                 :           7 :     ctx = CreateInitDecodingContext(PGREPACK_PLUGIN,
     250                 :             :                                     NIL,
     251                 :             :                                     true,
     252                 :             :                                     true,
     253                 :             :                                     InvalidXLogRecPtr,
     254                 :           7 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
     255                 :             :                                                .segment_open = wal_segment_open,
     256                 :             :                                                .segment_close = wal_segment_close),
     257                 :             :                                     NULL, NULL, NULL);
     258                 :             : 
     259                 :             :     /* Complete setup of output_writer_private */
     260                 :           7 :     dstate = (RepackDecodingState *) ctx->output_writer_private;
     261                 :           7 :     dstate->relid = relid;
     262                 :           7 :     dstate->worker_cxt = CurrentMemoryContext;
     263                 :           7 :     dstate->worker_resowner = CurrentResourceOwner;
     264                 :             : 
     265                 :             :     /* We don't have control over fast_forward, but verify it's sane */
     266                 :             :     Assert(!ctx->fast_forward);
     267                 :             : 
     268                 :             :     /* Find our decoding starting point. */
     269                 :           7 :     DecodingContextFindStartpoint(ctx);
     270                 :             : 
     271                 :             :     /* From this point on, we need non-blocking WAL reads */
     272                 :           7 :     ctx->reader->routine.page_read = read_local_xlog_page_no_wait;
     273                 :             : 
     274                 :             :     /*
     275                 :             :      * Initialize repack_current_segment so that we can notice WAL segment
     276                 :             :      * boundaries.
     277                 :             :      */
     278                 :           7 :     XLByteToSeg(ctx->reader->EndRecPtr, repack_current_segment,
     279                 :             :                 wal_segment_size);
     280                 :             : 
     281                 :             :     /*
     282                 :             :      * Set up our reader private state to let the page-read callback notify
     283                 :             :      * when end-of-WAL has been reached.  This lives in the same context as
     284                 :             :      * the logical decoding itself.
     285                 :             :      */
     286                 :           7 :     oldcxt = MemoryContextSwitchTo(ctx->context);
     287                 :           7 :     ctx->reader->private_data = palloc0_object(ReadLocalXLogPageNoWaitPrivate);
     288                 :           7 :     MemoryContextSwitchTo(oldcxt);
     289                 :             : 
     290                 :           7 :     return ctx;
     291                 :             : }
     292                 :             : 
     293                 :             : static void
     294                 :           7 : repack_cleanup_logical_decoding(LogicalDecodingContext *ctx)
     295                 :             : {
     296                 :             :     RepackDecodingState *dstate;
     297                 :             : 
     298                 :           7 :     dstate = (RepackDecodingState *) ctx->output_writer_private;
     299         [ +  + ]:           7 :     if (dstate->slot)
     300                 :           1 :         ExecDropSingleTupleTableSlot(dstate->slot);
     301                 :             : 
     302                 :           7 :     FreeDecodingContext(ctx);
     303                 :           7 :     ReplicationSlotDropAcquired(true);
     304                 :           7 : }
     305                 :             : 
     306                 :             : /*
     307                 :             :  * Make snapshot available to the backend that launched the decoding worker.
     308                 :             :  */
     309                 :             : static void
     310                 :           7 : export_initial_snapshot(Snapshot snapshot, DecodingWorkerShared *shared)
     311                 :             : {
     312                 :             :     char        fname[MAXPGPATH];
     313                 :             :     BufFile    *file;
     314                 :             :     Size        snap_size;
     315                 :             :     char       *snap_space;
     316                 :             : 
     317                 :           7 :     snap_size = EstimateSnapshotSpace(snapshot);
     318                 :           7 :     snap_space = (char *) palloc(snap_size);
     319                 :           7 :     SerializeSnapshot(snapshot, snap_space);
     320                 :             : 
     321                 :           7 :     DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
     322                 :           7 :     file = BufFileCreateFileSet(&shared->sfs.fs, fname);
     323                 :             :     /* To make restoration easier, write the snapshot size first. */
     324                 :           7 :     BufFileWrite(file, &snap_size, sizeof(snap_size));
     325                 :           7 :     BufFileWrite(file, snap_space, snap_size);
     326                 :           7 :     BufFileClose(file);
     327                 :           7 :     pfree(snap_space);
     328                 :             : 
     329                 :             :     /* Increase the counter to tell the backend that the file is available. */
     330                 :           7 :     SpinLockAcquire(&shared->mutex);
     331                 :           7 :     shared->last_exported++;
     332                 :           7 :     SpinLockRelease(&shared->mutex);
     333                 :           7 :     ConditionVariableSignal(&shared->cv);
     334                 :           7 : }
     335                 :             : 
     336                 :             : /*
     337                 :             :  * Decode logical changes from the WAL sequence and store them to a file.
     338                 :             :  *
     339                 :             :  * If true is returned, there is no more work for the worker.
     340                 :             :  */
     341                 :             : static bool
     342                 :          14 : decode_concurrent_changes(LogicalDecodingContext *ctx,
     343                 :             :                           DecodingWorkerShared *shared)
     344                 :             : {
     345                 :             :     RepackDecodingState *dstate;
     346                 :             :     XLogRecPtr  lsn_upto;
     347                 :             :     bool        done;
     348                 :             :     char        fname[MAXPGPATH];
     349                 :             : 
     350                 :          14 :     dstate = (RepackDecodingState *) ctx->output_writer_private;
     351                 :             : 
     352                 :             :     /* Open the output file. */
     353                 :          14 :     DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
     354                 :          14 :     dstate->file = BufFileCreateFileSet(&shared->sfs.fs, fname);
     355                 :             : 
     356                 :          14 :     SpinLockAcquire(&shared->mutex);
     357                 :          14 :     lsn_upto = shared->lsn_upto;
     358                 :          14 :     done = shared->done;
     359                 :          14 :     SpinLockRelease(&shared->mutex);
     360                 :             : 
     361                 :             :     while (true)
     362                 :        9381 :     {
     363                 :             :         XLogRecord *record;
     364                 :             :         XLogSegNo   segno_new;
     365                 :        9395 :         char       *errm = NULL;
     366                 :             :         XLogRecPtr  end_lsn;
     367                 :             : 
     368         [ -  + ]:        9395 :         CHECK_FOR_INTERRUPTS();
     369                 :             : 
     370                 :        9395 :         record = XLogReadRecord(ctx->reader, &errm);
     371         [ +  + ]:        9395 :         if (record)
     372                 :             :         {
     373                 :        1473 :             LogicalDecodingProcessRecord(ctx, ctx->reader);
     374                 :             : 
     375                 :             :             /*
     376                 :             :              * We want to allow WAL to be recycled while REPACK is running.
     377                 :             :              *
     378                 :             :              * In normal usage of a replication slot, we need to be very
     379                 :             :              * careful not to advance the LSN until it's been confirmed as
     380                 :             :              * received by the remote.  In REPACK's case, this is not needed:
     381                 :             :              * REPACK will never try to replay the same WAL after a crash, and
     382                 :             :              * if there _is_ a crash, the whole REPACK has to be started from
     383                 :             :              * scratch anyway.
     384                 :             :              *
     385                 :             :              * So here we disregard the careful LSN tracking and just move the
     386                 :             :              * LSN locations forward to what we've processed.  Note that it
     387                 :             :              * would be bogus to move the xmin forward, though, so we don't
     388                 :             :              * touch that.
     389                 :             :              *
     390                 :             :              * This can be done on whatever schedule is convenient, but in
     391                 :             :              * order not to cause unnecessary load, we only do it as we cross
     392                 :             :              * each WAL segment boundary.
     393                 :             :              */
     394                 :        1473 :             end_lsn = ctx->reader->EndRecPtr;
     395                 :        1473 :             XLByteToSeg(end_lsn, segno_new, wal_segment_size);
     396         [ -  + ]:        1473 :             if (segno_new != repack_current_segment)
     397                 :             :             {
     398                 :           0 :                 LogicalIncreaseRestartDecodingForSlot(end_lsn, end_lsn);
     399                 :           0 :                 LogicalConfirmReceivedLocation(end_lsn);
     400         [ #  # ]:           0 :                 elog(DEBUG1, "REPACK: confirmed receive location %X/%X",
     401                 :             :                      (uint32) (end_lsn >> 32), (uint32) end_lsn);
     402                 :           0 :                 repack_current_segment = segno_new;
     403                 :             :             }
     404                 :             :         }
     405                 :             :         else
     406                 :             :         {
     407                 :             :             ReadLocalXLogPageNoWaitPrivate *priv;
     408                 :             : 
     409         [ -  + ]:        7922 :             if (errm)
     410         [ #  # ]:           0 :                 ereport(ERROR,
     411                 :             :                         errcode_for_file_access(),
     412                 :             :                         errmsg("could not read WAL from timeline %u at %X/%08X: %s",
     413                 :             :                                ctx->reader->currTLI,
     414                 :             :                                LSN_FORMAT_ARGS(ctx->reader->EndRecPtr),
     415                 :             :                                errm));
     416                 :             : 
     417                 :             :             /*
     418                 :             :              * In the decoding loop we do not want to get blocked when there
     419                 :             :              * is no more WAL available, otherwise the loop would become
     420                 :             :              * uninterruptible.
     421                 :             :              */
     422                 :        7922 :             priv = (ReadLocalXLogPageNoWaitPrivate *) ctx->reader->private_data;
     423         [ +  - ]:        7922 :             if (priv->end_of_wal)
     424                 :             :                 /* Do not miss the end of WAL condition next time. */
     425                 :        7922 :                 priv->end_of_wal = false;
     426                 :             :             else
     427         [ #  # ]:           0 :                 ereport(ERROR,
     428                 :             :                         errcode(ERRCODE_DATA_CORRUPTED),
     429                 :             :                         errmsg("could not read WAL record"));
     430                 :             :         }
     431                 :             : 
     432                 :             :         /*
     433                 :             :          * Whether we could read new record or not, keep checking if
     434                 :             :          * 'lsn_upto' was specified.
     435                 :             :          */
     436         [ +  + ]:        9395 :         if (!XLogRecPtrIsValid(lsn_upto))
     437                 :             :         {
     438                 :        9034 :             SpinLockAcquire(&shared->mutex);
     439                 :        9034 :             lsn_upto = shared->lsn_upto;
     440                 :             :             /* 'done' should be set at the same time as 'lsn_upto' */
     441                 :        9034 :             done = shared->done;
     442                 :        9034 :             SpinLockRelease(&shared->mutex);
     443                 :             :         }
     444         [ +  + ]:        9395 :         if (XLogRecPtrIsValid(lsn_upto) &&
     445         [ +  + ]:         373 :             ctx->reader->EndRecPtr >= lsn_upto)
     446                 :          14 :             break;
     447                 :             : 
     448         [ +  + ]:        9381 :         if (record == NULL)
     449                 :             :         {
     450                 :        7916 :             int64       timeout = 0;
     451                 :             :             WaitLSNResult res;
     452                 :             : 
     453                 :             :             /*
     454                 :             :              * Before we retry reading, wait until new WAL is flushed.
     455                 :             :              *
     456                 :             :              * There is a race condition such that the backend executing
     457                 :             :              * REPACK determines 'lsn_upto', but before it sets the shared
     458                 :             :              * variable, we reach the end of WAL. In that case we'd need to
     459                 :             :              * wait until the next WAL flush (unrelated to REPACK). Although
     460                 :             :              * that should not be a problem in a busy system, it might be
     461                 :             :              * noticeable in other cases, including regression tests (which
     462                 :             :              * are not necessarily executed in parallel). Therefore it makes
     463                 :             :              * sense to use timeout.
     464                 :             :              *
     465                 :             :              * If lsn_upto is valid, WAL records having LSN lower than that
     466                 :             :              * should already have been flushed to disk.
     467                 :             :              */
     468         [ +  - ]:        7916 :             if (!XLogRecPtrIsValid(lsn_upto))
     469                 :        7916 :                 timeout = 100L;
     470                 :        7916 :             res = WaitForLSN(WAIT_LSN_TYPE_PRIMARY_FLUSH,
     471                 :        7916 :                              ctx->reader->EndRecPtr + 1,
     472                 :             :                              timeout);
     473   [ +  +  -  + ]:        7916 :             if (res != WAIT_LSN_RESULT_SUCCESS &&
     474                 :             :                 res != WAIT_LSN_RESULT_TIMEOUT)
     475         [ #  # ]:           0 :                 ereport(ERROR,
     476                 :             :                         errcode(ERRCODE_INTERNAL_ERROR),
     477                 :             :                         errmsg("waiting for WAL failed"));
     478                 :             :         }
     479                 :             :     }
     480                 :             : 
     481                 :             :     /*
     482                 :             :      * Close the file so we can make it available to the backend.
     483                 :             :      */
     484                 :          14 :     BufFileClose(dstate->file);
     485                 :          14 :     dstate->file = NULL;
     486                 :          14 :     SpinLockAcquire(&shared->mutex);
     487                 :          14 :     shared->lsn_upto = InvalidXLogRecPtr;
     488                 :          14 :     shared->last_exported++;
     489                 :          14 :     SpinLockRelease(&shared->mutex);
     490                 :          14 :     ConditionVariableSignal(&shared->cv);
     491                 :             : 
     492                 :          14 :     return done;
     493                 :             : }
     494                 :             : 
     495                 :             : /*
     496                 :             :  * Does the WAL record contain a data change that this backend does not need
     497                 :             :  * to decode on behalf of REPACK (CONCURRENTLY)?
                         :             :  *
                         :             :  * 'buf' wraps the record being examined; only buf->record is read here.
                         :             :  *
                         :             :  * Returns true only when filtering is active (repacked_rel_locator is set)
                         :             :  * and block 0 of the record references a relation that is neither
                         :             :  * repacked_rel_locator nor repacked_rel_toast_locator. Returns false in
                         :             :  * every other case, i.e. whenever the record must not be filtered out.
                         :             :  *
                         :             :  * Both locators are defined outside this function; it only reads them.
     498                 :             :  */
     499                 :             : bool
     500                 :     1746254 : change_useless_for_repack(XLogRecordBuffer *buf)
     501                 :             : {
     502                 :     1746254 :     XLogReaderState *r = buf->record;
     503                 :             :     RelFileLocator locator;
     504                 :             : 
     505                 :             :     /* TOAST locator should not be set unless the main is. */
     506                 :             :     Assert(!OidIsValid(repacked_rel_toast_locator.relNumber) ||
     507                 :             :            OidIsValid(repacked_rel_locator.relNumber));
     508                 :             : 
     509                 :             :     /*
     510                 :             :      * Backends not involved in REPACK (CONCURRENTLY) should not do the
     511                 :             :      * filtering.
                         :             :      *
                         :             :      * An invalid relNumber in repacked_rel_locator is what identifies such
                         :             :      * a backend here; nothing is filtered out in that case.
     512                 :             :      */
     513         [ +  + ]:     1746254 :     if (!OidIsValid(repacked_rel_locator.relNumber))
     514                 :     1745819 :         return false;
     515                 :             : 
     516                 :             :     /*
     517                 :             :      * If the record does not contain block 0, it's probably not INSERT /
     518                 :             :      * UPDATE / DELETE. In any case, we do not have enough information to
     519                 :             :      * filter the change out.
                         :             :      *
                         :             :      * Only the relation locator of block 0 is requested; the remaining
                         :             :      * output arguments are passed as NULL.
     520                 :             :      */
     521         [ -  + ]:         435 :     if (!XLogRecGetBlockTagExtended(r, 0, &locator, NULL, NULL, NULL))
     522                 :           0 :         return false;
     523                 :             : 
     524                 :             :     /*
     525                 :             :      * Decode the change if it belongs to the table we are repacking, or if it
     526                 :             :      * belongs to its TOAST relation.
                         :             :      *
                         :             :      * The TOAST locator is compared only if it is set.
     527                 :             :      */
     528   [ +  +  +  -  :         435 :     if (RelFileLocatorEquals(locator, repacked_rel_locator))
                   +  - ]
     529                 :          33 :         return false;
     530         [ +  + ]:         402 :     if (OidIsValid(repacked_rel_toast_locator.relNumber) &&
     531   [ +  +  +  -  :         298 :         RelFileLocatorEquals(locator, repacked_rel_toast_locator))
                   +  - ]
     532                 :          44 :         return false;
     533                 :             : 
     534                 :             :     /* Filter out changes of other tables. */
     535                 :         358 :     return true;
     536                 :             : }
        

Generated by: LCOV version 2.0-1