LCOV - differential code coverage report
Current view: top level - src/backend/replication/pgrepack - pgrepack.c (source / functions) Coverage Total Hit UNC GNC
Current: d36b728949bf4e37ada1cd23e0f2aaa94f609a70 vs 52e118fe2f7e3381bdaa479816a7f72eda2ae517 Lines: 93.6 % 94 88 6 88
Current Date: 2026-06-29 16:15:13 +0200 Functions: 100.0 % 8 8 8
Baseline: lcov-20260630-baseline Branches: 66.7 % 54 36 18 36
Baseline Date: 2026-06-29 13:01:57 +0200 Line coverage date bins:
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
(7,30] days: 100.0 % 6 6 6
(30,360] days: 93.2 % 88 82 6 82
Function coverage date bins:
(30,360] days: 100.0 % 8 8 8
Branch coverage date bins:
(7,30] days: 66.7 % 6 4 2 4
(30,360] days: 66.7 % 48 32 16 32

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  *
                                  3                 :                :  * pgrepack.c
                                  4                 :                :  *      Logical Replication output plugin for REPACK command
                                  5                 :                :  *
                                  6                 :                :  * Copyright (c) 2026, PostgreSQL Global Development Group
                                  7                 :                :  *
                                  8                 :                :  * IDENTIFICATION
                                  9                 :                :  *        src/backend/replication/pgrepack/pgrepack.c
                                 10                 :                :  *
                                 11                 :                :  *-------------------------------------------------------------------------
                                 12                 :                :  */
                                 13                 :                : #include "postgres.h"
                                 14                 :                : 
                                 15                 :                : #include "access/detoast.h"
                                 16                 :                : #include "commands/repack.h"
                                 17                 :                : #include "commands/repack_internal.h"
                                 18                 :                : #include "replication/snapbuild.h"
                                 19                 :                : #include "utils/memutils.h"
                                 20                 :                : 
   85 alvherre@kurilemu.de       21                 :GNC           8 : PG_MODULE_MAGIC;
                                 22                 :                : 
                                 23                 :                : static void repack_startup(LogicalDecodingContext *ctx,
                                 24                 :                :                            OutputPluginOptions *opt, bool is_init);
                                 25                 :                : static void repack_shutdown(LogicalDecodingContext *ctx);
                                 26                 :                : static void repack_begin_txn(LogicalDecodingContext *ctx,
                                 27                 :                :                              ReorderBufferTXN *txn);
                                 28                 :                : static void repack_commit_txn(LogicalDecodingContext *ctx,
                                 29                 :                :                               ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
                                 30                 :                : static void repack_process_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                                 31                 :                :                                   Relation relation, ReorderBufferChange *change);
                                 32                 :                : static void repack_store_change(LogicalDecodingContext *ctx, Relation relation,
                                 33                 :                :                                 ConcurrentChangeKind kind, HeapTuple tuple);
                                 34                 :                : 
                                 35                 :                : void
                                 36                 :              8 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
                                 37                 :                : {
                                 38                 :              8 :     cb->startup_cb = repack_startup;
                                 39                 :              8 :     cb->begin_cb = repack_begin_txn;
                                 40                 :              8 :     cb->change_cb = repack_process_change;
                                 41                 :              8 :     cb->commit_cb = repack_commit_txn;
                                 42                 :              8 :     cb->shutdown_cb = repack_shutdown;
                                 43                 :              8 : }
                                 44                 :                : 
                                 45                 :                : 
                                 46                 :                : /* initialize this plugin */
                                 47                 :                : static void
                                 48                 :              8 : repack_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
                                 49                 :                :                bool is_init)
                                 50                 :                : {
                                 51                 :                :     RepackDecodingState *dstate;
                                 52                 :                : 
   21                            53         [ +  + ]:              8 :     if (!AmRepackWorker())
                                 54         [ +  - ]:              1 :         ereport(ERROR,
                                 55                 :                :                 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                 56                 :                :                 errmsg("unsupported use of logical decoding plugin \"%s\"",
                                 57                 :                :                        "pgrepack"),
                                 58                 :                :                 errdetail("This plugin can only be used by %s.",
                                 59                 :                :                           "REPACK (CONCURRENTLY)"));
                                 60                 :                : 
                                 61                 :                :     /* Initial setup of our private state */
                                 62         [ -  + ]:              7 :     Assert(CurrentMemoryContext == ctx->context);
                                 63                 :              7 :     dstate = palloc0_object(RepackDecodingState);
                                 64                 :              7 :     dstate->change_cxt = AllocSetContextCreate(ctx->context,
                                 65                 :                :                                                "REPACK - change",
                                 66                 :                :                                                ALLOCSET_DEFAULT_SIZES);
                                 67                 :                :     /* repack_setup_logical_decoding fills in the rest */
                                 68                 :              7 :     ctx->output_writer_private = dstate;
                                 69                 :                : 
                                 70                 :                :     /* Probably unnecessary, as we don't use the SQL interface ... */
   85                            71                 :              7 :     opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
                                 72                 :                : 
                                 73         [ -  + ]:              7 :     if (ctx->output_plugin_options != NIL)
                                 74                 :                :     {
   85 alvherre@kurilemu.de       75         [ #  # ]:UNC           0 :         ereport(ERROR,
                                 76                 :                :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                 77                 :                :                 errmsg("this plugin does not expect any options"));
                                 78                 :                :     }
   85 alvherre@kurilemu.de       79                 :GNC           7 : }
                                 80                 :                : 
                                 81                 :                : static void
                                 82                 :              7 : repack_shutdown(LogicalDecodingContext *ctx)
                                 83                 :                : {
                                 84                 :              7 : }
                                 85                 :                : 
                                 86                 :                : /*
                                 87                 :                :  * As we don't release the slot during processing of particular table, there's
                                 88                 :                :  * no room for SQL interface, even for debugging purposes. Therefore we need
                                 89                 :                :  * neither OutputPluginPrepareWrite() nor OutputPluginWrite() in the plugin
                                 90                 :                :  * callbacks. (Although we might want to write custom callbacks, this API
                                 91                 :                :  * seems to be unnecessarily generic for our purposes.)
                                 92                 :                :  */
                                 93                 :                : 
                                 94                 :                : /* BEGIN callback */
                                 95                 :                : static void
                                 96                 :             11 : repack_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
                                 97                 :                : {
                                 98                 :             11 : }
                                 99                 :                : 
                                100                 :                : /* COMMIT callback */
                                101                 :                : static void
                                102                 :             11 : repack_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                                103                 :                :                   XLogRecPtr commit_lsn)
                                104                 :                : {
                                105                 :             11 : }
                                106                 :                : 
                                107                 :                : /*
                                108                 :                :  * Callback for individual changed tuples
                                109                 :                :  */
                                110                 :                : static void
                                111                 :             32 : repack_process_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                                112                 :                :                       Relation relation, ReorderBufferChange *change)
                                113                 :                : {
                                114                 :             32 :     RepackDecodingState *private PG_USED_FOR_ASSERTS_ONLY =
                                115                 :                :         (RepackDecodingState *) ctx->output_writer_private;
                                116                 :                : 
                                117                 :                :     /* Changes of other relation should not have been decoded. */
                                118         [ -  + ]:             32 :     Assert(RelationGetRelid(relation) == private->relid);
                                119                 :                : 
                                120                 :                :     /* Decode entry depending on its type */
                                121   [ +  +  +  - ]:             32 :     switch (change->action)
                                122                 :                :     {
                                123                 :              7 :         case REORDER_BUFFER_CHANGE_INSERT:
                                124                 :                :             {
                                125                 :                :                 HeapTuple   newtuple;
                                126                 :                : 
                                127                 :              7 :                 newtuple = change->data.tp.newtuple;
                                128                 :                : 
                                129                 :                :                 /*
                                130                 :                :                  * Identity checks in the main function should have made this
                                131                 :                :                  * impossible.
                                132                 :                :                  */
                                133         [ -  + ]:              7 :                 if (newtuple == NULL)
   85 alvherre@kurilemu.de      134         [ #  # ]:UNC           0 :                     elog(ERROR, "incomplete insert info");
                                135                 :                : 
   85 alvherre@kurilemu.de      136                 :GNC           7 :                 repack_store_change(ctx, relation, CHANGE_INSERT, newtuple);
                                137                 :                :             }
                                138                 :              7 :             break;
                                139                 :             22 :         case REORDER_BUFFER_CHANGE_UPDATE:
                                140                 :                :             {
                                141                 :                :                 HeapTuple   oldtuple,
                                142                 :                :                             newtuple;
                                143                 :                : 
                                144                 :             22 :                 oldtuple = change->data.tp.oldtuple;
                                145                 :             22 :                 newtuple = change->data.tp.newtuple;
                                146                 :                : 
                                147         [ -  + ]:             22 :                 if (newtuple == NULL)
   85 alvherre@kurilemu.de      148         [ #  # ]:UNC           0 :                     elog(ERROR, "incomplete update info");
                                149                 :                : 
   85 alvherre@kurilemu.de      150         [ +  + ]:GNC          22 :                 if (oldtuple != NULL)
                                151                 :              8 :                     repack_store_change(ctx, relation, CHANGE_UPDATE_OLD, oldtuple);
                                152                 :                : 
                                153                 :             22 :                 repack_store_change(ctx, relation, CHANGE_UPDATE_NEW, newtuple);
                                154                 :                :             }
                                155                 :             22 :             break;
                                156                 :              3 :         case REORDER_BUFFER_CHANGE_DELETE:
                                157                 :                :             {
                                158                 :                :                 HeapTuple   oldtuple;
                                159                 :                : 
                                160                 :              3 :                 oldtuple = change->data.tp.oldtuple;
                                161                 :                : 
                                162         [ -  + ]:              3 :                 if (oldtuple == NULL)
   85 alvherre@kurilemu.de      163         [ #  # ]:UNC           0 :                     elog(ERROR, "incomplete delete info");
                                164                 :                : 
   85 alvherre@kurilemu.de      165                 :GNC           3 :                 repack_store_change(ctx, relation, CHANGE_DELETE, oldtuple);
                                166                 :                :             }
                                167                 :              3 :             break;
   85 alvherre@kurilemu.de      168                 :UNC           0 :         default:
                                169                 :                : 
                                170                 :                :             /*
                                171                 :                :              * Should not come here. This includes TRUNCATE of the table being
                                172                 :                :              * processed. heap_decode() cannot check the file locator easily,
                                173                 :                :              * but we assume that TRUNCATE uses AccessExclusiveLock on the
                                174                 :                :              * table so it should not occur during REPACK (CONCURRENTLY).
                                175                 :                :              */
                                176                 :              0 :             Assert(false);
                                177                 :                :             break;
                                178                 :                :     }
   85 alvherre@kurilemu.de      179                 :GNC          32 : }
                                180                 :                : 
                                181                 :                : /*
                                182                 :                :  * Write the given tuple, with the given change kind, to the repack spill
                                183                 :                :  * file.  Later, the repack decoding worker can read these and replay
                                184                 :                :  * the operations on the new copy of the table.
                                185                 :                :  *
                                186                 :                :  * For each change affecting the table being repacked, we store enough
                                187                 :                :  * information about each tuple in it, so that it can be replayed in the
                                188                 :                :  * new copy of the table.
                                189                 :                :  */
                                190                 :                : static void
                                191                 :             40 : repack_store_change(LogicalDecodingContext *ctx, Relation relation,
                                192                 :                :                     ConcurrentChangeKind kind, HeapTuple tuple)
                                193                 :                : {
                                194                 :                :     RepackDecodingState *dstate;
                                195                 :                :     MemoryContext oldcxt;
                                196                 :                :     BufFile    *file;
                                197                 :             40 :     List       *attrs_ext = NIL;
                                198                 :                :     int         natt_ext;
                                199                 :                : 
                                200                 :             40 :     dstate = (RepackDecodingState *) ctx->output_writer_private;
                                201                 :             40 :     file = dstate->file;
                                202                 :                : 
                                203                 :                :     /* Store the change kind. */
                                204                 :             40 :     BufFileWrite(file, &kind, 1);
                                205                 :                : 
                                206                 :                :     /* Use a frequently-reset context to avoid dealing with leaks manually */
                                207                 :             40 :     oldcxt = MemoryContextSwitchTo(dstate->change_cxt);
                                208                 :                : 
                                209                 :                :     /*
                                210                 :                :      * If the tuple contains "external indirect" attributes, we need to write
                                211                 :                :      * the contents to the file because we have no control over that memory.
                                212                 :                :      */
                                213         [ +  + ]:             40 :     if (HeapTupleHasExternal(tuple))
                                214                 :                :     {
                                215                 :             13 :         TupleDesc   desc = RelationGetDescr(relation);
                                216                 :                :         TupleTableSlot *slot;
                                217                 :                : 
                                218                 :                :         /* Initialize the slot, if not done already */
                                219         [ +  + ]:             13 :         if (dstate->slot == NULL)
                                220                 :                :         {
                                221                 :                :             ResourceOwner saveResourceOwner;
                                222                 :                : 
                                223                 :              1 :             MemoryContextSwitchTo(dstate->worker_cxt);
                                224                 :              1 :             saveResourceOwner = CurrentResourceOwner;
                                225                 :              1 :             CurrentResourceOwner = dstate->worker_resowner;
                                226                 :              1 :             dstate->slot = MakeSingleTupleTableSlot(desc, &TTSOpsHeapTuple);
                                227                 :              1 :             MemoryContextSwitchTo(dstate->change_cxt);
                                228                 :              1 :             CurrentResourceOwner = saveResourceOwner;
                                229                 :                :         }
                                230                 :                : 
                                231                 :             13 :         slot = dstate->slot;
                                232                 :             13 :         ExecStoreHeapTuple(tuple, slot, false);
                                233                 :                : 
                                234                 :                :         /*
                                235                 :                :          * Loop over all attributes, and find out which ones we need to spill
                                236                 :                :          * separately, to wit: each one that's a non-null varlena and stored
                                237                 :                :          * out of line.
                                238                 :                :          */
                                239         [ +  + ]:             78 :         for (int i = 0; i < desc->natts; i++)
                                240                 :                :         {
                                241                 :             65 :             CompactAttribute *attr = TupleDescCompactAttr(desc, i);
                                242                 :                :             varlena    *varlen;
                                243                 :                : 
                                244   [ +  +  +  +  :             91 :             if (attr->attisdropped || attr->attlen != -1 ||
                                              -  + ]
                                245                 :             26 :                 slot_attisnull(slot, i + 1))
                                246                 :             39 :                 continue;
                                247                 :                : 
                                248                 :             26 :             slot_getsomeattrs(slot, i + 1);
                                249                 :                : 
                                250                 :                :             /*
                                251                 :                :              * This is a non-null varlena datum, but we only care if it's
                                252                 :                :              * out-of-line
                                253                 :                :              */
                                254                 :             26 :             varlen = (varlena *) DatumGetPointer(slot->tts_values[i]);
                                255         [ +  + ]:             26 :             if (!VARATT_IS_EXTERNAL(varlen))
                                256                 :              9 :                 continue;
                                257                 :                : 
                                258                 :                :             /*
                                259                 :                :              * We spill any indirect-external attributes separately from the
                                260                 :                :              * heap tuple.  Anything else is written as is.
                                261                 :                :              */
                                262         [ +  + ]:             17 :             if (VARATT_IS_EXTERNAL_INDIRECT(varlen))
                                263                 :             15 :                 attrs_ext = lappend(attrs_ext, varlen);
                                264                 :                :             else
                                265                 :                :             {
                                266                 :                :                 /*
                                267                 :                :                  * Logical decoding should not produce "external expanded"
                                268                 :                :                  * attributes (those actually should never appear on disk), so
                                269                 :                :                  * only TOASTed attribute can be seen here.
                                270                 :                :                  *
                                271                 :                :                  * We get here if the table has external values but only
                                272                 :                :                  * in-line values are being updated now.
                                273                 :                :                  */
                                274         [ -  + ]:              2 :                 Assert(VARATT_IS_EXTERNAL_ONDISK(varlen));
                                275                 :                :             }
                                276                 :                :         }
                                277                 :                : 
                                278                 :             13 :         ExecClearTuple(slot);
                                279                 :                :     }
                                280                 :                : 
                                281                 :                :     /*
                                282                 :                :      * First, write the original heap tuple, prefixed by its length.  Note
                                283                 :                :      * that the external-toast tag for each toasted attribute will be present
                                284                 :                :      * in what we write, so that we know where to restore each one later.
                                285                 :                :      */
                                286                 :             40 :     BufFileWrite(file, &tuple->t_len, sizeof(tuple->t_len));
                                287                 :             40 :     BufFileWrite(file, tuple->t_data, tuple->t_len);
                                288                 :                : 
                                289                 :                :     /* Then, write the number of external attributes we found. */
                                290                 :             40 :     natt_ext = list_length(attrs_ext);
                                291                 :             40 :     BufFileWrite(file, &natt_ext, sizeof(natt_ext));
                                292                 :                : 
                                293                 :                :     /* Finally, the attributes themselves, if any */
                                294   [ +  +  +  +  :             95 :     foreach_ptr(varlena, attr_val, attrs_ext)
                                              +  + ]
                                295                 :                :     {
                                296                 :             15 :         attr_val = detoast_external_attr(attr_val);
                                297                 :             15 :         BufFileWrite(file, attr_val, VARSIZE_ANY(attr_val));
                                298                 :                :         /* These attributes could be large, so free them right away */
                                299                 :             15 :         pfree(attr_val);
                                300                 :                :     }
                                301                 :                : 
                                302                 :                :     /* Cleanup. */
                                303                 :             40 :     MemoryContextSwitchTo(oldcxt);
                                304                 :             40 :     MemoryContextReset(dstate->change_cxt);
                                305                 :             40 : }
        

Generated by: LCOV version 2.0-1