LCOV - code coverage report
Current view: top level - src/backend/replication/pgrepack - pgrepack.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19beta1 Lines: 93.4 % 91 85
Test Date: 2026-06-15 18:16:44 Functions: 100.0 % 8 8
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * pgrepack.c
       4              :  *      Logical Replication output plugin for REPACK command
       5              :  *
       6              :  * Copyright (c) 2026, PostgreSQL Global Development Group
       7              :  *
       8              :  * IDENTIFICATION
       9              :  *        src/backend/replication/pgrepack/pgrepack.c
      10              :  *
      11              :  *-------------------------------------------------------------------------
      12              :  */
      13              : #include "postgres.h"
      14              : 
      15              : #include "access/detoast.h"
      16              : #include "commands/repack.h"
      17              : #include "commands/repack_internal.h"
      18              : #include "replication/snapbuild.h"
      19              : #include "utils/memutils.h"
      20              : 
      21            8 : PG_MODULE_MAGIC;
      22              : 
      23              : static void repack_startup(LogicalDecodingContext *ctx,
      24              :                            OutputPluginOptions *opt, bool is_init);
      25              : static void repack_shutdown(LogicalDecodingContext *ctx);
      26              : static void repack_begin_txn(LogicalDecodingContext *ctx,
      27              :                              ReorderBufferTXN *txn);
      28              : static void repack_commit_txn(LogicalDecodingContext *ctx,
      29              :                               ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
      30              : static void repack_process_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
      31              :                                   Relation relation, ReorderBufferChange *change);
      32              : static void repack_store_change(LogicalDecodingContext *ctx, Relation relation,
      33              :                                 ConcurrentChangeKind kind, HeapTuple tuple);
      34              : 
      35              : void
      36            8 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
      37              : {
      38            8 :     cb->startup_cb = repack_startup;
      39            8 :     cb->begin_cb = repack_begin_txn;
      40            8 :     cb->change_cb = repack_process_change;
      41            8 :     cb->commit_cb = repack_commit_txn;
      42            8 :     cb->shutdown_cb = repack_shutdown;
      43            8 : }
      44              : 
      45              : 
      46              : /* initialize this plugin */
      47              : static void
      48            8 : repack_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
      49              :                bool is_init)
      50              : {
      51              :     RepackDecodingState *dstate;
      52              : 
      53            8 :     if (!AmRepackWorker())
      54            1 :         ereport(ERROR,
      55              :                 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
      56              :                 errmsg("unsupported use of logical decoding plugin \"%s\"",
      57              :                        "pgrepack"),
      58              :                 errdetail("This plugin can only be used by %s.",
      59              :                           "REPACK (CONCURRENTLY)"));
      60              : 
      61              :     /* Initial setup of our private state */
      62              :     Assert(CurrentMemoryContext == ctx->context);
      63            7 :     dstate = palloc0_object(RepackDecodingState);
      64            7 :     dstate->change_cxt = AllocSetContextCreate(ctx->context,
      65              :                                                "REPACK - change",
      66              :                                                ALLOCSET_DEFAULT_SIZES);
      67              :     /* repack_setup_logical_decoding fills in the rest */
      68            7 :     ctx->output_writer_private = dstate;
      69              : 
      70              :     /* Probably unnecessary, as we don't use the SQL interface ... */
      71            7 :     opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
      72              : 
      73            7 :     if (ctx->output_plugin_options != NIL)
      74              :     {
      75            0 :         ereport(ERROR,
      76              :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      77              :                 errmsg("this plugin does not expect any options"));
      78              :     }
      79            7 : }
      80              : 
      81              : static void
      82            7 : repack_shutdown(LogicalDecodingContext *ctx)
      83              : {
      84            7 : }
      85              : 
      86              : /*
      87              :  * As we don't release the slot during processing of particular table, there's
      88              :  * no room for SQL interface, even for debugging purposes. Therefore we need
      89              :  * neither OutputPluginPrepareWrite() nor OutputPluginWrite() in the plugin
      90              :  * callbacks. (Although we might want to write custom callbacks, this API
      91              :  * seems to be unnecessarily generic for our purposes.)
      92              :  */
      93              : 
      94              : /* BEGIN callback */
      95              : static void
      96           11 : repack_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
      97              : {
      98           11 : }
      99              : 
     100              : /* COMMIT callback */
     101              : static void
     102           11 : repack_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     103              :                   XLogRecPtr commit_lsn)
     104              : {
     105           11 : }
     106              : 
     107              : /*
     108              :  * Callback for individual changed tuples
     109              :  */
     110              : static void
     111           32 : repack_process_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     112              :                       Relation relation, ReorderBufferChange *change)
     113              : {
     114           32 :     RepackDecodingState *private PG_USED_FOR_ASSERTS_ONLY =
     115              :         (RepackDecodingState *) ctx->output_writer_private;
     116              : 
     117              :     /* Changes of other relation should not have been decoded. */
     118              :     Assert(RelationGetRelid(relation) == private->relid);
     119              : 
     120              :     /* Decode entry depending on its type */
     121           32 :     switch (change->action)
     122              :     {
     123            7 :         case REORDER_BUFFER_CHANGE_INSERT:
     124              :             {
     125              :                 HeapTuple   newtuple;
     126              : 
     127            7 :                 newtuple = change->data.tp.newtuple;
     128              : 
     129              :                 /*
     130              :                  * Identity checks in the main function should have made this
     131              :                  * impossible.
     132              :                  */
     133            7 :                 if (newtuple == NULL)
     134            0 :                     elog(ERROR, "incomplete insert info");
     135              : 
     136            7 :                 repack_store_change(ctx, relation, CHANGE_INSERT, newtuple);
     137              :             }
     138            7 :             break;
     139           22 :         case REORDER_BUFFER_CHANGE_UPDATE:
     140              :             {
     141              :                 HeapTuple   oldtuple,
     142              :                             newtuple;
     143              : 
     144           22 :                 oldtuple = change->data.tp.oldtuple;
     145           22 :                 newtuple = change->data.tp.newtuple;
     146              : 
     147           22 :                 if (newtuple == NULL)
     148            0 :                     elog(ERROR, "incomplete update info");
     149              : 
     150           22 :                 if (oldtuple != NULL)
     151            8 :                     repack_store_change(ctx, relation, CHANGE_UPDATE_OLD, oldtuple);
     152              : 
     153           22 :                 repack_store_change(ctx, relation, CHANGE_UPDATE_NEW, newtuple);
     154              :             }
     155           22 :             break;
     156            3 :         case REORDER_BUFFER_CHANGE_DELETE:
     157              :             {
     158              :                 HeapTuple   oldtuple;
     159              : 
     160            3 :                 oldtuple = change->data.tp.oldtuple;
     161              : 
     162            3 :                 if (oldtuple == NULL)
     163            0 :                     elog(ERROR, "incomplete delete info");
     164              : 
     165            3 :                 repack_store_change(ctx, relation, CHANGE_DELETE, oldtuple);
     166              :             }
     167            3 :             break;
     168            0 :         default:
     169              : 
     170              :             /*
     171              :              * Should not come here. This includes TRUNCATE of the table being
     172              :              * processed. heap_decode() cannot check the file locator easily,
     173              :              * but we assume that TRUNCATE uses AccessExclusiveLock on the
     174              :              * table so it should not occur during REPACK (CONCURRENTLY).
     175              :              */
     176              :             Assert(false);
     177            0 :             break;
     178              :     }
     179           32 : }
     180              : 
     181              : /*
     182              :  * Write the given tuple, with the given change kind, to the repack spill
     183              :  * file.  Later, the repack decoding worker can read these and replay
     184              :  * the operations on the new copy of the table.
     185              :  *
     186              :  * For each change affecting the table being repacked, we store enough
     187              :  * information about each tuple in it, so that it can be replayed in the
     188              :  * new copy of the table.
     189              :  */
     190              : static void
     191           40 : repack_store_change(LogicalDecodingContext *ctx, Relation relation,
     192              :                     ConcurrentChangeKind kind, HeapTuple tuple)
     193              : {
     194              :     RepackDecodingState *dstate;
     195              :     MemoryContext oldcxt;
     196              :     BufFile    *file;
     197           40 :     List       *attrs_ext = NIL;
     198              :     int         natt_ext;
     199              : 
     200           40 :     dstate = (RepackDecodingState *) ctx->output_writer_private;
     201           40 :     file = dstate->file;
     202              : 
     203              :     /* Store the change kind. */
     204           40 :     BufFileWrite(file, &kind, 1);
     205              : 
     206              :     /* Use a frequently-reset context to avoid dealing with leaks manually */
     207           40 :     oldcxt = MemoryContextSwitchTo(dstate->change_cxt);
     208              : 
     209              :     /*
     210              :      * If the tuple contains "external indirect" attributes, we need to write
     211              :      * the contents to the file because we have no control over that memory.
     212              :      */
     213           40 :     if (HeapTupleHasExternal(tuple))
     214              :     {
     215           13 :         TupleDesc   desc = RelationGetDescr(relation);
     216              :         TupleTableSlot *slot;
     217              : 
     218              :         /* Initialize the slot, if not done already */
     219           13 :         if (dstate->slot == NULL)
     220              :         {
     221              :             ResourceOwner saveResourceOwner;
     222              : 
     223            1 :             MemoryContextSwitchTo(dstate->worker_cxt);
     224            1 :             saveResourceOwner = CurrentResourceOwner;
     225            1 :             CurrentResourceOwner = dstate->worker_resowner;
     226            1 :             dstate->slot = MakeSingleTupleTableSlot(desc, &TTSOpsHeapTuple);
     227            1 :             MemoryContextSwitchTo(dstate->change_cxt);
     228            1 :             CurrentResourceOwner = saveResourceOwner;
     229              :         }
     230              : 
     231           13 :         slot = dstate->slot;
     232           13 :         ExecStoreHeapTuple(tuple, slot, false);
     233              : 
     234              :         /*
     235              :          * Loop over all attributes, and find out which ones we need to spill
     236              :          * separately, to wit: each one that's a non-null varlena and stored
     237              :          * out of line.
     238              :          */
     239           78 :         for (int i = 0; i < desc->natts; i++)
     240              :         {
     241           65 :             CompactAttribute *attr = TupleDescCompactAttr(desc, i);
     242              :             varlena    *varlen;
     243              : 
     244           91 :             if (attr->attisdropped || attr->attlen != -1 ||
     245           26 :                 slot_attisnull(slot, i + 1))
     246           39 :                 continue;
     247              : 
     248           26 :             slot_getsomeattrs(slot, i + 1);
     249              : 
     250              :             /*
     251              :              * This is a non-null varlena datum, but we only care if it's
     252              :              * out-of-line
     253              :              */
     254           26 :             varlen = (varlena *) DatumGetPointer(slot->tts_values[i]);
     255           26 :             if (!VARATT_IS_EXTERNAL(varlen))
     256            9 :                 continue;
     257              : 
     258              :             /*
     259              :              * We spill any indirect-external attributes separately from the
     260              :              * heap tuple.  Anything else is written as is.
     261              :              */
     262           17 :             if (VARATT_IS_EXTERNAL_INDIRECT(varlen))
     263           15 :                 attrs_ext = lappend(attrs_ext, varlen);
     264              :             else
     265              :             {
     266              :                 /*
     267              :                  * Logical decoding should not produce "external expanded"
     268              :                  * attributes (those actually should never appear on disk), so
     269              :                  * only TOASTed attribute can be seen here.
     270              :                  *
     271              :                  * We get here if the table has external values but only
     272              :                  * in-line values are being updated now.
     273              :                  */
     274              :                 Assert(VARATT_IS_EXTERNAL_ONDISK(varlen));
     275              :             }
     276              :         }
     277              : 
     278           13 :         ExecClearTuple(slot);
     279              :     }
     280              : 
     281              :     /*
     282              :      * First, write the original heap tuple, prefixed by its length.  Note
     283              :      * that the external-toast tag for each toasted attribute will be present
     284              :      * in what we write, so that we know where to restore each one later.
     285              :      */
     286           40 :     BufFileWrite(file, &tuple->t_len, sizeof(tuple->t_len));
     287           40 :     BufFileWrite(file, tuple->t_data, tuple->t_len);
     288              : 
     289              :     /* Then, write the number of external attributes we found. */
     290           40 :     natt_ext = list_length(attrs_ext);
     291           40 :     BufFileWrite(file, &natt_ext, sizeof(natt_ext));
     292              : 
     293              :     /* Finally, the attributes themselves, if any */
     294           95 :     foreach_ptr(varlena, attr_val, attrs_ext)
     295              :     {
     296           15 :         attr_val = detoast_external_attr(attr_val);
     297           15 :         BufFileWrite(file, attr_val, VARSIZE_ANY(attr_val));
     298              :         /* These attributes could be large, so free them right away */
     299           15 :         pfree(attr_val);
     300              :     }
     301              : 
     302              :     /* Cleanup. */
     303           40 :     MemoryContextSwitchTo(oldcxt);
     304           40 :     MemoryContextReset(dstate->change_cxt);
     305           40 : }
        

Generated by: LCOV version 2.0-1