LCOV - code coverage report
Current view: top level - src/backend/replication/pgrepack - pgrepack.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 92.0 % 88 81
Test Date: 2026-04-07 14:16:30 Functions: 100.0 % 8 8
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * pgrepack.c
       4              :  *      Logical Replication output plugin for REPACK command
       5              :  *
       6              :  * Copyright (c) 2026, PostgreSQL Global Development Group
       7              :  *
       8              :  * IDENTIFICATION
       9              :  *        src/backend/replication/pgrepack/pgrepack.c
      10              :  *
      11              :  *-------------------------------------------------------------------------
      12              :  */
      13              : #include "postgres.h"
      14              : 
      15              : #include "access/detoast.h"
      16              : #include "commands/repack_internal.h"
      17              : #include "replication/snapbuild.h"
      18              : #include "utils/memutils.h"
      19              : 
      20            2 : PG_MODULE_MAGIC;
      21              : 
      22              : static void repack_startup(LogicalDecodingContext *ctx,
      23              :                            OutputPluginOptions *opt, bool is_init);
      24              : static void repack_shutdown(LogicalDecodingContext *ctx);
      25              : static void repack_begin_txn(LogicalDecodingContext *ctx,
      26              :                              ReorderBufferTXN *txn);
      27              : static void repack_commit_txn(LogicalDecodingContext *ctx,
      28              :                               ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
      29              : static void repack_process_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
      30              :                                   Relation rel, ReorderBufferChange *change);
      31              : static void repack_store_change(LogicalDecodingContext *ctx, Relation relation,
      32              :                                 ConcurrentChangeKind kind, HeapTuple tuple);
      33              : 
      34              : void
      35            2 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
      36              : {
      37            2 :     cb->startup_cb = repack_startup;
      38            2 :     cb->begin_cb = repack_begin_txn;
      39            2 :     cb->change_cb = repack_process_change;
      40            2 :     cb->commit_cb = repack_commit_txn;
      41            2 :     cb->shutdown_cb = repack_shutdown;
      42            2 : }
      43              : 
      44              : 
      45              : /* initialize this plugin */
      46              : static void
      47            2 : repack_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
      48              :                bool is_init)
      49              : {
      50            2 :     ctx->output_plugin_private = NULL;
      51              : 
      52              :     /* Probably unnecessary, as we don't use the SQL interface ... */
      53            2 :     opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
      54              : 
      55              :     /*
      56              :      * REPACK doesn't need access to shared catalogs, so we can speed up the
      57              :      * historic snapshot creation by setting this flag.  We'll only have to
      58              :      * wait for transactions in our database.
      59              :      */
      60            2 :     opt->need_shared_catalogs = false;
      61              : 
      62            2 :     if (ctx->output_plugin_options != NIL)
      63              :     {
      64            0 :         ereport(ERROR,
      65              :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      66              :                 errmsg("this plugin does not expect any options"));
      67              :     }
      68            2 : }
      69              : 
      70              : static void
      71            2 : repack_shutdown(LogicalDecodingContext *ctx)
      72              : {
      73            2 : }
      74              : 
      75              : /*
      76              :  * As we don't release the slot while processing a particular table, there's
      77              :  * no room for the SQL interface, even for debugging purposes. Therefore we
      78              :  * need neither OutputPluginPrepareWrite() nor OutputPluginWrite() in the
      79              :  * plugin callbacks. (Although we might want to write custom callbacks, this
      80              :  * API seems to be unnecessarily generic for our purposes.)
      81              :  */
      82              : 
      83              : /* BEGIN callback */
      84              : static void
      85            7 : repack_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
      86              : {
      87            7 : }
      88              : 
      89              : /* COMMIT callback */
      90              : static void
      91            7 : repack_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
      92              :                   XLogRecPtr commit_lsn)
      93              : {
      94            7 : }
      95              : 
      96              : /*
      97              :  * Callback for individual changed tuples
      98              :  */
      99              : static void
     100           20 : repack_process_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     101              :                       Relation relation, ReorderBufferChange *change)
     102              : {
     103           20 :     RepackDecodingState *private PG_USED_FOR_ASSERTS_ONLY =
     104              :         (RepackDecodingState *) ctx->output_writer_private;
     105              : 
     106              :     /* Changes to other relations should not have been decoded. */
     107              :     Assert(RelationGetRelid(relation) == private->relid);
     108              : 
     109              :     /* Decode entry depending on its type */
     110           20 :     switch (change->action)
     111              :     {
     112            7 :         case REORDER_BUFFER_CHANGE_INSERT:
     113              :             {
     114              :                 HeapTuple   newtuple;
     115              : 
     116            7 :                 newtuple = change->data.tp.newtuple;
     117              : 
     118              :                 /*
     119              :                  * Identity checks in the main function should have made this
     120              :                  * impossible.
     121              :                  */
     122            7 :                 if (newtuple == NULL)
     123            0 :                     elog(ERROR, "incomplete insert info");
     124              : 
     125            7 :                 repack_store_change(ctx, relation, CHANGE_INSERT, newtuple);
     126              :             }
     127            7 :             break;
     128           10 :         case REORDER_BUFFER_CHANGE_UPDATE:
     129              :             {
     130              :                 HeapTuple   oldtuple,
     131              :                             newtuple;
     132              : 
     133           10 :                 oldtuple = change->data.tp.oldtuple;
     134           10 :                 newtuple = change->data.tp.newtuple;
     135              : 
     136           10 :                 if (newtuple == NULL)
     137            0 :                     elog(ERROR, "incomplete update info");
     138              : 
     139           10 :                 if (oldtuple != NULL)
     140            7 :                     repack_store_change(ctx, relation, CHANGE_UPDATE_OLD, oldtuple);
     141              : 
     142           10 :                 repack_store_change(ctx, relation, CHANGE_UPDATE_NEW, newtuple);
     143              :             }
     144           10 :             break;
     145            3 :         case REORDER_BUFFER_CHANGE_DELETE:
     146              :             {
     147              :                 HeapTuple   oldtuple;
     148              : 
     149            3 :                 oldtuple = change->data.tp.oldtuple;
     150              : 
     151            3 :                 if (oldtuple == NULL)
     152            0 :                     elog(ERROR, "incomplete delete info");
     153              : 
     154            3 :                 repack_store_change(ctx, relation, CHANGE_DELETE, oldtuple);
     155              :             }
     156            3 :             break;
     157            0 :         default:
     158              : 
     159              :             /*
     160              :              * Should not come here. This includes TRUNCATE of the table being
     161              :              * processed. heap_decode() cannot check the file locator easily,
     162              :              * but we assume that TRUNCATE uses AccessExclusiveLock on the
     163              :              * table so it should not occur during REPACK (CONCURRENTLY).
     164              :              */
     165              :             Assert(false);
     166            0 :             break;
     167              :     }
     168           20 : }
     169              : 
     170              : /*
     171              :  * Write the given tuple, with the given change kind, to the repack spill
     172              :  * file.  Later, the repack decoding worker can read these and replay
     173              :  * the operations on the new copy of the table.
     174              :  *
     175              :  * For each change affecting the table being repacked, we store enough
     176              :  * information about each tuple in it, so that it can be replayed in the
     177              :  * new copy of the table.
     178              :  */
     179              : static void
     180           27 : repack_store_change(LogicalDecodingContext *ctx, Relation relation,
     181              :                     ConcurrentChangeKind kind, HeapTuple tuple)
     182              : {
     183              :     RepackDecodingState *dstate;
     184              :     MemoryContext oldcxt;
     185              :     BufFile    *file;
     186           27 :     List       *attrs_ext = NIL;
     187              :     int         natt_ext;
     188              : 
     189           27 :     dstate = (RepackDecodingState *) ctx->output_writer_private;
     190           27 :     file = dstate->file;
     191              : 
     192              :     /* Store the change kind. */
     193           27 :     BufFileWrite(file, &kind, 1);
     194              : 
     195              :     /* Use a frequently-reset context to avoid dealing with leaks manually */
     196           27 :     oldcxt = MemoryContextSwitchTo(dstate->change_cxt);
     197              : 
     198              :     /*
     199              :      * If the tuple contains "external indirect" attributes, we need to write
     200              :      * the contents to the file because we have no control over that memory.
     201              :      */
     202           27 :     if (HeapTupleHasExternal(tuple))
     203              :     {
     204            3 :         TupleDesc   desc = RelationGetDescr(relation);
     205              :         TupleTableSlot *slot;
     206              : 
     207              :         /* Initialize the slot, if not done already */
     208            3 :         if (dstate->slot == NULL)
     209              :         {
     210              :             ResourceOwner saveResourceOwner;
     211              : 
     212            1 :             MemoryContextSwitchTo(dstate->worker_cxt);
     213            1 :             saveResourceOwner = CurrentResourceOwner;
     214            1 :             CurrentResourceOwner = dstate->worker_resowner;
     215            1 :             dstate->slot = MakeSingleTupleTableSlot(desc, &TTSOpsHeapTuple);
     216            1 :             MemoryContextSwitchTo(dstate->change_cxt);
     217            1 :             CurrentResourceOwner = saveResourceOwner;
     218              :         }
     219              : 
     220            3 :         slot = dstate->slot;
     221            3 :         ExecStoreHeapTuple(tuple, slot, false);
     222              : 
     223              :         /*
     224              :          * Loop over all attributes, and find out which ones we need to spill
     225              :          * separately, to wit: each one that's a non-null varlena and stored
     226              :          * out of line.
     227              :          */
     228            9 :         for (int i = 0; i < desc->natts; i++)
     229              :         {
     230            6 :             CompactAttribute *attr = TupleDescCompactAttr(desc, i);
     231              :             varlena    *varlen;
     232              : 
     233            9 :             if (attr->attisdropped || attr->attlen != -1 ||
     234            3 :                 slot_attisnull(slot, i + 1))
     235            3 :                 continue;
     236              : 
     237            3 :             slot_getsomeattrs(slot, i + 1);
     238              : 
     239              :             /*
     240              :              * This is a non-null varlena datum, but we only care if it's
     241              :              * out-of-line
     242              :              */
     243            3 :             varlen = (varlena *) DatumGetPointer(slot->tts_values[i]);
     244            3 :             if (!VARATT_IS_EXTERNAL(varlen))
     245            0 :                 continue;
     246              : 
     247              :             /*
     248              :              * We spill any indirect-external attributes separately from the
     249              :              * heap tuple.  Anything else is written as is.
     250              :              */
     251            3 :             if (VARATT_IS_EXTERNAL_INDIRECT(varlen))
     252            2 :                 attrs_ext = lappend(attrs_ext, varlen);
     253              :             else
     254              :             {
     255              :                 /*
     256              :                  * Logical decoding should not produce "external expanded"
     257              :                  * attributes (those actually should never appear on disk), so
     258              :                  * only TOASTed attributes can be seen here.
     259              :                  *
     260              :                  * We get here if the table has external values but only
     261              :                  * in-line values are being updated now.
     262              :                  */
     263              :                 Assert(VARATT_IS_EXTERNAL_ONDISK(varlen));
     264              :             }
     265              :         }
     266              : 
     267            3 :         ExecClearTuple(slot);
     268              :     }
     269              : 
     270              :     /*
     271              :      * First, write the original heap tuple, prefixed by its length.  Note
     272              :      * that the external-toast tag for each toasted attribute will be present
     273              :      * in what we write, so that we know where to restore each one later.
     274              :      */
     275           27 :     BufFileWrite(file, &tuple->t_len, sizeof(tuple->t_len));
     276           27 :     BufFileWrite(file, tuple->t_data, tuple->t_len);
     277              : 
     278              :     /* Then, write the number of external attributes we found. */
     279           27 :     natt_ext = list_length(attrs_ext);
     280           27 :     BufFileWrite(file, &natt_ext, sizeof(natt_ext));
     281              : 
     282              :     /* Finally, the attributes themselves, if any */
     283           56 :     foreach_ptr(varlena, attr_val, attrs_ext)
     284              :     {
     285            2 :         attr_val = detoast_external_attr(attr_val);
     286            2 :         BufFileWrite(file, attr_val, VARSIZE_ANY(attr_val));
     287              :         /* These attributes could be large, so free them right away */
     288            2 :         pfree(attr_val);
     289              :     }
     290              : 
     291              :     /* Cleanup. */
     292           27 :     MemoryContextSwitchTo(oldcxt);
     293           27 :     MemoryContextReset(dstate->change_cxt);
     294           27 : }
        

Generated by: LCOV version 2.0-1