LCOV - code coverage report
Current view: top level - contrib/test_decoding - test_decoding.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 351 408 86.0 %
Date: 2025-04-01 14:15:22 Functions: 27 28 96.4 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * test_decoding.c
       4             :  *        example logical decoding output plugin
       5             :  *
       6             :  * Copyright (c) 2012-2025, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *        contrib/test_decoding/test_decoding.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #include "catalog/pg_type.h"
      16             : 
      17             : #include "replication/logical.h"
      18             : #include "replication/origin.h"
      19             : 
      20             : #include "utils/builtins.h"
      21             : #include "utils/lsyscache.h"
      22             : #include "utils/memutils.h"
      23             : #include "utils/rel.h"
      24             : 
/*
 * Module magic block: identifies this loadable module by name and records
 * PG_VERSION as its version string.
 */
PG_MODULE_MAGIC_EXT(
                    .name = "test_decoding",
                    .version = PG_VERSION
);
      29             : 
/*
 * Per-decoding-session plugin state.  Allocated in pg_decode_startup() and
 * stored in ctx->output_plugin_private.
 */
typedef struct
{
    MemoryContext context;      /* private context ("text conversion
                                 * context"), deleted at shutdown */
    bool        include_xids;   /* append the xid to BEGIN/COMMIT/PREPARE
                                 * lines; defaults to true */
    bool        include_timestamp;  /* append " (at <time>)" to commit and
                                     * prepare lines; defaults to false */
    bool        skip_empty_xacts;   /* suppress output for transactions that
                                     * wrote no changes; defaults to false */
    bool        only_local;     /* filter out changes carrying a valid
                                 * replication origin; defaults to false */
} TestDecodingData;
      38             : 
/*
 * Maintain the per-transaction level variables to track whether the
 * transaction and/or its streams have written any changes. In streaming mode
 * the transaction can be decoded in streams, so along with maintaining
 * whether the transaction has written any changes, we also need to track
 * whether the current stream has written any changes. This is required so
 * that if the user has requested to skip empty transactions we can skip the
 * empty streams even though the transaction has written some changes.
 *
 * One instance is allocated per transaction and hung off
 * txn->output_plugin_private.
 */
typedef struct
{
    bool        xact_wrote_changes; /* transaction emitted at least one
                                     * change */
    bool        stream_wrote_changes;   /* current stream emitted at least
                                         * one change */
} TestDecodingTxnData;
      53             : 
/*
 * Forward declarations of the callbacks registered in
 * _PG_output_plugin_init(), plus the output helpers they share.
 */

/* plugin lifecycle */
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
                              bool is_init);
static void pg_decode_shutdown(LogicalDecodingContext *ctx);

/* regular (non-streamed) transactions */
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
                                ReorderBufferTXN *txn);
static void pg_output_begin(LogicalDecodingContext *ctx,
                            TestDecodingData *data,
                            ReorderBufferTXN *txn,
                            bool last_write);
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
                                 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
static void pg_decode_change(LogicalDecodingContext *ctx,
                             ReorderBufferTXN *txn, Relation relation,
                             ReorderBufferChange *change);
static void pg_decode_truncate(LogicalDecodingContext *ctx,
                               ReorderBufferTXN *txn,
                               int nrelations, Relation relations[],
                               ReorderBufferChange *change);
static bool pg_decode_filter(LogicalDecodingContext *ctx,
                             RepOriginId origin_id);
static void pg_decode_message(LogicalDecodingContext *ctx,
                              ReorderBufferTXN *txn, XLogRecPtr lsn,
                              bool transactional, const char *prefix,
                              Size sz, const char *message);

/* two-phase commit */
static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
                                     TransactionId xid,
                                     const char *gid);
static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn);
static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
                                  ReorderBufferTXN *txn,
                                  XLogRecPtr prepare_lsn);
static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
                                          ReorderBufferTXN *txn,
                                          XLogRecPtr commit_lsn);
static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
                                            ReorderBufferTXN *txn,
                                            XLogRecPtr prepare_end_lsn,
                                            TimestampTz prepare_time);

/* streaming of in-progress transactions */
static void pg_decode_stream_start(LogicalDecodingContext *ctx,
                                   ReorderBufferTXN *txn);
static void pg_output_stream_start(LogicalDecodingContext *ctx,
                                   TestDecodingData *data,
                                   ReorderBufferTXN *txn,
                                   bool last_write);
static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
                                  ReorderBufferTXN *txn);
static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
                                   ReorderBufferTXN *txn,
                                   XLogRecPtr abort_lsn);
static void pg_decode_stream_prepare(LogicalDecodingContext *ctx,
                                     ReorderBufferTXN *txn,
                                     XLogRecPtr prepare_lsn);
static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
                                    ReorderBufferTXN *txn,
                                    XLogRecPtr commit_lsn);
static void pg_decode_stream_change(LogicalDecodingContext *ctx,
                                    ReorderBufferTXN *txn,
                                    Relation relation,
                                    ReorderBufferChange *change);
static void pg_decode_stream_message(LogicalDecodingContext *ctx,
                                     ReorderBufferTXN *txn, XLogRecPtr lsn,
                                     bool transactional, const char *prefix,
                                     Size sz, const char *message);
static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
                                      ReorderBufferTXN *txn,
                                      int nrelations, Relation relations[],
                                      ReorderBufferChange *change);
     122             : 
     123             : void
     124         222 : _PG_init(void)
     125             : {
     126             :     /* other plugins can perform things here */
     127         222 : }
     128             : 
     129             : /* specify output plugin callbacks */
     130             : void
     131         676 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
     132             : {
     133         676 :     cb->startup_cb = pg_decode_startup;
     134         676 :     cb->begin_cb = pg_decode_begin_txn;
     135         676 :     cb->change_cb = pg_decode_change;
     136         676 :     cb->truncate_cb = pg_decode_truncate;
     137         676 :     cb->commit_cb = pg_decode_commit_txn;
     138         676 :     cb->filter_by_origin_cb = pg_decode_filter;
     139         676 :     cb->shutdown_cb = pg_decode_shutdown;
     140         676 :     cb->message_cb = pg_decode_message;
     141         676 :     cb->filter_prepare_cb = pg_decode_filter_prepare;
     142         676 :     cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
     143         676 :     cb->prepare_cb = pg_decode_prepare_txn;
     144         676 :     cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
     145         676 :     cb->rollback_prepared_cb = pg_decode_rollback_prepared_txn;
     146         676 :     cb->stream_start_cb = pg_decode_stream_start;
     147         676 :     cb->stream_stop_cb = pg_decode_stream_stop;
     148         676 :     cb->stream_abort_cb = pg_decode_stream_abort;
     149         676 :     cb->stream_prepare_cb = pg_decode_stream_prepare;
     150         676 :     cb->stream_commit_cb = pg_decode_stream_commit;
     151         676 :     cb->stream_change_cb = pg_decode_stream_change;
     152         676 :     cb->stream_message_cb = pg_decode_stream_message;
     153         676 :     cb->stream_truncate_cb = pg_decode_stream_truncate;
     154         676 : }
     155             : 
     156             : 
     157             : /* initialize this plugin */
     158             : static void
     159         676 : pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
     160             :                   bool is_init)
     161             : {
     162             :     ListCell   *option;
     163             :     TestDecodingData *data;
     164         676 :     bool        enable_streaming = false;
     165             : 
     166         676 :     data = palloc0(sizeof(TestDecodingData));
     167         676 :     data->context = AllocSetContextCreate(ctx->context,
     168             :                                           "text conversion context",
     169             :                                           ALLOCSET_DEFAULT_SIZES);
     170         676 :     data->include_xids = true;
     171         676 :     data->include_timestamp = false;
     172         676 :     data->skip_empty_xacts = false;
     173         676 :     data->only_local = false;
     174             : 
     175         676 :     ctx->output_plugin_private = data;
     176             : 
     177         676 :     opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
     178         676 :     opt->receive_rewrites = false;
     179             : 
     180        1408 :     foreach(option, ctx->output_plugin_options)
     181             :     {
     182         738 :         DefElem    *elem = lfirst(option);
     183             : 
     184             :         Assert(elem->arg == NULL || IsA(elem->arg, String));
     185             : 
     186         738 :         if (strcmp(elem->defname, "include-xids") == 0)
     187             :         {
     188             :             /* if option does not provide a value, it means its value is true */
     189         348 :             if (elem->arg == NULL)
     190           0 :                 data->include_xids = true;
     191         348 :             else if (!parse_bool(strVal(elem->arg), &data->include_xids))
     192           4 :                 ereport(ERROR,
     193             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     194             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     195             :                                 strVal(elem->arg), elem->defname)));
     196             :         }
     197         390 :         else if (strcmp(elem->defname, "include-timestamp") == 0)
     198             :         {
     199           2 :             if (elem->arg == NULL)
     200           0 :                 data->include_timestamp = true;
     201           2 :             else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
     202           0 :                 ereport(ERROR,
     203             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     204             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     205             :                                 strVal(elem->arg), elem->defname)));
     206             :         }
     207         388 :         else if (strcmp(elem->defname, "force-binary") == 0)
     208             :         {
     209             :             bool        force_binary;
     210             : 
     211          12 :             if (elem->arg == NULL)
     212           0 :                 continue;
     213          12 :             else if (!parse_bool(strVal(elem->arg), &force_binary))
     214           0 :                 ereport(ERROR,
     215             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     216             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     217             :                                 strVal(elem->arg), elem->defname)));
     218             : 
     219          12 :             if (force_binary)
     220           4 :                 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
     221             :         }
     222         376 :         else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
     223             :         {
     224             : 
     225         342 :             if (elem->arg == NULL)
     226           0 :                 data->skip_empty_xacts = true;
     227         342 :             else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
     228           0 :                 ereport(ERROR,
     229             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     230             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     231             :                                 strVal(elem->arg), elem->defname)));
     232             :         }
     233          34 :         else if (strcmp(elem->defname, "only-local") == 0)
     234             :         {
     235             : 
     236           6 :             if (elem->arg == NULL)
     237           0 :                 data->only_local = true;
     238           6 :             else if (!parse_bool(strVal(elem->arg), &data->only_local))
     239           0 :                 ereport(ERROR,
     240             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     241             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     242             :                                 strVal(elem->arg), elem->defname)));
     243             :         }
     244          28 :         else if (strcmp(elem->defname, "include-rewrites") == 0)
     245             :         {
     246             : 
     247           2 :             if (elem->arg == NULL)
     248           0 :                 continue;
     249           2 :             else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
     250           0 :                 ereport(ERROR,
     251             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     252             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     253             :                                 strVal(elem->arg), elem->defname)));
     254             :         }
     255          26 :         else if (strcmp(elem->defname, "stream-changes") == 0)
     256             :         {
     257          24 :             if (elem->arg == NULL)
     258           0 :                 continue;
     259          24 :             else if (!parse_bool(strVal(elem->arg), &enable_streaming))
     260           0 :                 ereport(ERROR,
     261             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     262             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     263             :                                 strVal(elem->arg), elem->defname)));
     264             :         }
     265             :         else
     266             :         {
     267           2 :             ereport(ERROR,
     268             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     269             :                      errmsg("option \"%s\" = \"%s\" is unknown",
     270             :                             elem->defname,
     271             :                             elem->arg ? strVal(elem->arg) : "(null)")));
     272             :         }
     273             :     }
     274             : 
     275         670 :     ctx->streaming &= enable_streaming;
     276         670 : }
     277             : 
     278             : /* cleanup this plugin's resources */
     279             : static void
     280         650 : pg_decode_shutdown(LogicalDecodingContext *ctx)
     281             : {
     282         650 :     TestDecodingData *data = ctx->output_plugin_private;
     283             : 
     284             :     /* cleanup our own resources via memory context reset */
     285         650 :     MemoryContextDelete(data->context);
     286         650 : }
     287             : 
     288             : /* BEGIN callback */
     289             : static void
     290         876 : pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     291             : {
     292         876 :     TestDecodingData *data = ctx->output_plugin_private;
     293             :     TestDecodingTxnData *txndata =
     294         876 :         MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
     295             : 
     296         876 :     txndata->xact_wrote_changes = false;
     297         876 :     txn->output_plugin_private = txndata;
     298             : 
     299             :     /*
     300             :      * If asked to skip empty transactions, we'll emit BEGIN at the point
     301             :      * where the first operation is received for this transaction.
     302             :      */
     303         876 :     if (data->skip_empty_xacts)
     304         788 :         return;
     305             : 
     306          88 :     pg_output_begin(ctx, data, txn, true);
     307             : }
     308             : 
     309             : static void
     310         542 : pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
     311             : {
     312         542 :     OutputPluginPrepareWrite(ctx, last_write);
     313         542 :     if (data->include_xids)
     314          80 :         appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
     315             :     else
     316         462 :         appendStringInfoString(ctx->out, "BEGIN");
     317         542 :     OutputPluginWrite(ctx, last_write);
     318         542 : }
     319             : 
     320             : /* COMMIT callback */
     321             : static void
     322         876 : pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     323             :                      XLogRecPtr commit_lsn)
     324             : {
     325         876 :     TestDecodingData *data = ctx->output_plugin_private;
     326         876 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     327         876 :     bool        xact_wrote_changes = txndata->xact_wrote_changes;
     328             : 
     329         876 :     pfree(txndata);
     330         876 :     txn->output_plugin_private = NULL;
     331             : 
     332         876 :     if (data->skip_empty_xacts && !xact_wrote_changes)
     333         350 :         return;
     334             : 
     335         526 :     OutputPluginPrepareWrite(ctx, true);
     336         526 :     if (data->include_xids)
     337          78 :         appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
     338             :     else
     339         448 :         appendStringInfoString(ctx->out, "COMMIT");
     340             : 
     341         526 :     if (data->include_timestamp)
     342           2 :         appendStringInfo(ctx->out, " (at %s)",
     343             :                          timestamptz_to_str(txn->xact_time.commit_time));
     344             : 
     345         526 :     OutputPluginWrite(ctx, true);
     346             : }
     347             : 
     348             : /* BEGIN PREPARE callback */
     349             : static void
     350          18 : pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     351             : {
     352          18 :     TestDecodingData *data = ctx->output_plugin_private;
     353             :     TestDecodingTxnData *txndata =
     354          18 :         MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
     355             : 
     356          18 :     txndata->xact_wrote_changes = false;
     357          18 :     txn->output_plugin_private = txndata;
     358             : 
     359             :     /*
     360             :      * If asked to skip empty transactions, we'll emit BEGIN at the point
     361             :      * where the first operation is received for this transaction.
     362             :      */
     363          18 :     if (data->skip_empty_xacts)
     364          16 :         return;
     365             : 
     366           2 :     pg_output_begin(ctx, data, txn, true);
     367             : }
     368             : 
     369             : /* PREPARE callback */
     370             : static void
     371          18 : pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     372             :                       XLogRecPtr prepare_lsn)
     373             : {
     374          18 :     TestDecodingData *data = ctx->output_plugin_private;
     375          18 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     376             : 
     377             :     /*
     378             :      * If asked to skip empty transactions, we'll emit PREPARE at the point
     379             :      * where the first operation is received for this transaction.
     380             :      */
     381          18 :     if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
     382           2 :         return;
     383             : 
     384          16 :     OutputPluginPrepareWrite(ctx, true);
     385             : 
     386          16 :     appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
     387          16 :                      quote_literal_cstr(txn->gid));
     388             : 
     389          16 :     if (data->include_xids)
     390           2 :         appendStringInfo(ctx->out, ", txid %u", txn->xid);
     391             : 
     392          16 :     if (data->include_timestamp)
     393           0 :         appendStringInfo(ctx->out, " (at %s)",
     394             :                          timestamptz_to_str(txn->xact_time.prepare_time));
     395             : 
     396          16 :     OutputPluginWrite(ctx, true);
     397             : }
     398             : 
     399             : /* COMMIT PREPARED callback */
     400             : static void
     401          16 : pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     402             :                               XLogRecPtr commit_lsn)
     403             : {
     404          16 :     TestDecodingData *data = ctx->output_plugin_private;
     405             : 
     406          16 :     OutputPluginPrepareWrite(ctx, true);
     407             : 
     408          16 :     appendStringInfo(ctx->out, "COMMIT PREPARED %s",
     409          16 :                      quote_literal_cstr(txn->gid));
     410             : 
     411          16 :     if (data->include_xids)
     412           2 :         appendStringInfo(ctx->out, ", txid %u", txn->xid);
     413             : 
     414          16 :     if (data->include_timestamp)
     415           0 :         appendStringInfo(ctx->out, " (at %s)",
     416             :                          timestamptz_to_str(txn->xact_time.commit_time));
     417             : 
     418          16 :     OutputPluginWrite(ctx, true);
     419          16 : }
     420             : 
     421             : /* ROLLBACK PREPARED callback */
     422             : static void
     423           4 : pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
     424             :                                 ReorderBufferTXN *txn,
     425             :                                 XLogRecPtr prepare_end_lsn,
     426             :                                 TimestampTz prepare_time)
     427             : {
     428           4 :     TestDecodingData *data = ctx->output_plugin_private;
     429             : 
     430           4 :     OutputPluginPrepareWrite(ctx, true);
     431             : 
     432           4 :     appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
     433           4 :                      quote_literal_cstr(txn->gid));
     434             : 
     435           4 :     if (data->include_xids)
     436           0 :         appendStringInfo(ctx->out, ", txid %u", txn->xid);
     437             : 
     438           4 :     if (data->include_timestamp)
     439           0 :         appendStringInfo(ctx->out, " (at %s)",
     440             :                          timestamptz_to_str(txn->xact_time.commit_time));
     441             : 
     442           4 :     OutputPluginWrite(ctx, true);
     443           4 : }
     444             : 
     445             : /*
     446             :  * Filter out two-phase transactions.
     447             :  *
     448             :  * Each plugin can implement its own filtering logic. Here we demonstrate a
     449             :  * simple logic by checking the GID. If the GID contains the "_nodecode"
     450             :  * substring, then we filter it out.
     451             :  */
     452             : static bool
     453         296 : pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
     454             :                          const char *gid)
     455             : {
     456         296 :     if (strstr(gid, "_nodecode") != NULL)
     457          24 :         return true;
     458             : 
     459         272 :     return false;
     460             : }
     461             : 
     462             : static bool
     463     2400996 : pg_decode_filter(LogicalDecodingContext *ctx,
     464             :                  RepOriginId origin_id)
     465             : {
     466     2400996 :     TestDecodingData *data = ctx->output_plugin_private;
     467             : 
     468     2400996 :     if (data->only_local && origin_id != InvalidRepOriginId)
     469          18 :         return true;
     470     2400978 :     return false;
     471             : }
     472             : 
     473             : /*
     474             :  * Print literal `outputstr' already represented as string of type `typid'
     475             :  * into stringbuf `s'.
     476             :  *
     477             :  * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
     478             :  * if standard_conforming_strings were enabled.
     479             :  */
     480             : static void
     481      351916 : print_literal(StringInfo s, Oid typid, char *outputstr)
     482             : {
     483             :     const char *valptr;
     484             : 
     485      351916 :     switch (typid)
     486             :     {
     487      120526 :         case INT2OID:
     488             :         case INT4OID:
     489             :         case INT8OID:
     490             :         case OIDOID:
     491             :         case FLOAT4OID:
     492             :         case FLOAT8OID:
     493             :         case NUMERICOID:
     494             :             /* NB: We don't care about Inf, NaN et al. */
     495      120526 :             appendStringInfoString(s, outputstr);
     496      120526 :             break;
     497             : 
     498           0 :         case BITOID:
     499             :         case VARBITOID:
     500           0 :             appendStringInfo(s, "B'%s'", outputstr);
     501           0 :             break;
     502             : 
     503           0 :         case BOOLOID:
     504           0 :             if (strcmp(outputstr, "t") == 0)
     505           0 :                 appendStringInfoString(s, "true");
     506             :             else
     507           0 :                 appendStringInfoString(s, "false");
     508           0 :             break;
     509             : 
     510      231390 :         default:
     511      231390 :             appendStringInfoChar(s, '\'');
     512    10854352 :             for (valptr = outputstr; *valptr; valptr++)
     513             :             {
     514    10622962 :                 char        ch = *valptr;
     515             : 
     516    10622962 :                 if (SQL_STR_DOUBLE(ch, false))
     517         128 :                     appendStringInfoChar(s, ch);
     518    10622962 :                 appendStringInfoChar(s, ch);
     519             :             }
     520      231390 :             appendStringInfoChar(s, '\'');
     521      231390 :             break;
     522             :     }
     523      351916 : }
     524             : 
     525             : /*
                     :  * Print the tuple 'tuple' into the StringInfo s.
                     :  *
                     :  * Every printed column is appended as " name[type]:value", where the name
                     :  * goes through quote_identifier() and the type through format_type_be().
                     :  * Dropped columns and columns with a negative attnum are skipped.  A NULL
                     :  * column is skipped when skip_nulls is true and printed as "null"
                     :  * otherwise.  A varlena value for which VARATT_IS_EXTERNAL_ONDISK() holds
                     :  * is printed as "unchanged-toast-datum"; every other value is run through
                     :  * the type's output function and handed to print_literal().
                     :  */
     526             : static void
     527      291142 : tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
     528             : {
     529             :     int         natt;
     530             : 
     531             :     /* print all columns individually */
     532      694572 :     for (natt = 0; natt < tupdesc->natts; natt++)
     533             :     {
     534             :         Form_pg_attribute attr; /* the attribute itself */
     535             :         Oid         typid;      /* type of current attribute */
     536             :         Oid         typoutput;  /* output function */
     537             :         bool        typisvarlena;
     538             :         Datum       origval;    /* possibly toasted Datum */
     539             :         bool        isnull;     /* column is null? */
     540             : 
     541      403430 :         attr = TupleDescAttr(tupdesc, natt);
     542             : 
     543             :         /*
     544             :          * don't print dropped columns, we can't be sure everything is
     545             :          * available for them
     546             :          */
     547      403430 :         if (attr->attisdropped)
     548       10260 :             continue;
     549             : 
     550             :         /*
     551             :          * Don't print system columns, oid will already have been printed if
     552             :          * present.
     553             :          */
     554      403286 :         if (attr->attnum < 0)
     555           0 :             continue;
     556             : 
     557      403286 :         typid = attr->atttypid;
     558             : 
     559             :         /* get Datum from tuple; note the attribute number is natt + 1 */
     560      403286 :         origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
     561             : 
     562      403286 :         if (isnull && skip_nulls)
     563       10116 :             continue;
     564             : 
     565             :         /* print attribute name */
     566      393170 :         appendStringInfoChar(s, ' ');
     567      393170 :         appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
     568             : 
     569             :         /* print attribute type */
     570      393170 :         appendStringInfoChar(s, '[');
     571      393170 :         appendStringInfoString(s, format_type_be(typid));
     572      393170 :         appendStringInfoChar(s, ']');
     573             : 
     574             :         /* query output function */
     575      393170 :         getTypeOutputInfo(typid,
     576             :                           &typoutput, &typisvarlena);
     577             : 
     578             :         /* print separator */
     579      393170 :         appendStringInfoChar(s, ':');
     580             : 
     581             :         /* print data */
     582      393170 :         if (isnull)
     583       41230 :             appendStringInfoString(s, "null");
     584      351940 :         else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
     585          24 :             appendStringInfoString(s, "unchanged-toast-datum");
     586      351916 :         else if (!typisvarlena)
     587      120534 :             print_literal(s, typid,
     588             :                           OidOutputFunctionCall(typoutput, origval));
     589             :         else
     590             :         {
     591             :             Datum       val;    /* definitely detoasted Datum */
     592             : 
     593      231382 :             val = PointerGetDatum(PG_DETOAST_DATUM(origval));
     594      231382 :             print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
     595             :         }
     596             :     }
     597      291142 : }
     598             : 
     599             : /*
     600             :  * callback for individual changed tuples
                     :  *
                     :  * Writes "table <qualified name>:" followed by " INSERT:", " UPDATE:" or
                     :  * " DELETE:" and the tuple data, or " (no-tuple-data)" when the tuple to
                     :  * be printed is NULL.  For UPDATE, an available old tuple is printed first
                     :  * as " old-key:" (NULL columns skipped), followed by " new-tuple:".  For
                     :  * DELETE the old tuple is printed with NULL columns skipped.  When
                     :  * class_form->relrewrite is set, the name looked up for that relation is
                     :  * printed in place of the relation's own relname.
                     :  *
                     :  * With skip_empty_xacts set, pg_output_begin() is called here if the
                     :  * transaction has not written anything yet.  The output is built while
                     :  * data->context is the current memory context; that context is reset
                     :  * before OutputPluginWrite() is called.
     601             :  */
     602             : static void
     603      301120 : pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     604             :                  Relation relation, ReorderBufferChange *change)
     605             : {
     606             :     TestDecodingData *data;
     607             :     TestDecodingTxnData *txndata;
     608             :     Form_pg_class class_form;
     609             :     TupleDesc   tupdesc;
     610             :     MemoryContext old;
     611             : 
     612      301120 :     data = ctx->output_plugin_private;
     613      301120 :     txndata = txn->output_plugin_private;
     614             : 
     615             :     /* output BEGIN if we haven't yet */
     616      301120 :     if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
     617             :     {
     618         432 :         pg_output_begin(ctx, data, txn, false);
     619             :     }
     620      301120 :     txndata->xact_wrote_changes = true;
     621             : 
     622      301120 :     class_form = RelationGetForm(relation);
     623      301120 :     tupdesc = RelationGetDescr(relation);
     624             : 
     625             :     /* Avoid leaking memory by using and resetting our own context */
     626      301120 :     old = MemoryContextSwitchTo(data->context);
     627             : 
     628      301120 :     OutputPluginPrepareWrite(ctx, true);
     629             : 
     630      301120 :     appendStringInfoString(ctx->out, "table ");
     631      301120 :     appendStringInfoString(ctx->out,
     632      301120 :                            quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
     633      301120 :                                                       class_form->relrewrite ?
     634           2 :                                                       get_rel_name(class_form->relrewrite) :
     635             :                                                       NameStr(class_form->relname)));
     636      301120 :     appendStringInfoChar(ctx->out, ':');
     637             : 
     638      301120 :     switch (change->action)
     639             :     {
     640      265990 :         case REORDER_BUFFER_CHANGE_INSERT:
     641      265990 :             appendStringInfoString(ctx->out, " INSERT:");
     642      265990 :             if (change->data.tp.newtuple == NULL)
     643           0 :                 appendStringInfoString(ctx->out, " (no-tuple-data)");
     644             :             else
     645      265990 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     646             :                                     change->data.tp.newtuple,
     647             :                                     false);
     648      265990 :             break;
     649       15086 :         case REORDER_BUFFER_CHANGE_UPDATE:
     650       15086 :             appendStringInfoString(ctx->out, " UPDATE:");
     651       15086 :             if (change->data.tp.oldtuple != NULL)
     652             :             {
     653          36 :                 appendStringInfoString(ctx->out, " old-key:");
     654          36 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     655             :                                     change->data.tp.oldtuple,
     656             :                                     true);
     657          36 :                 appendStringInfoString(ctx->out, " new-tuple:");
     658             :             }
     659             : 
     660       15086 :             if (change->data.tp.newtuple == NULL)
     661           0 :                 appendStringInfoString(ctx->out, " (no-tuple-data)");
     662             :             else
     663       15086 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     664             :                                     change->data.tp.newtuple,
     665             :                                     false);
     666       15086 :             break;
     667       20044 :         case REORDER_BUFFER_CHANGE_DELETE:
     668       20044 :             appendStringInfoString(ctx->out, " DELETE:");
     669             : 
     670             :             /* if there was no PK, we only know that a delete happened */
     671       20044 :             if (change->data.tp.oldtuple == NULL)
     672       10014 :                 appendStringInfoString(ctx->out, " (no-tuple-data)");
     673             :             /* In DELETE, only the replica identity is present; display that */
     674             :             else
     675       10030 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     676             :                                     change->data.tp.oldtuple,
     677             :                                     true);
     678       20044 :             break;
     679      301120 :         default:
     680             :             Assert(false);
     681             :     }
     682             : 
     683      301120 :     MemoryContextSwitchTo(old);
     684      301120 :     MemoryContextReset(data->context);
     685             : 
     686      301120 :     OutputPluginWrite(ctx, true);
     687      301120 : }
     688             : /*
                     :  * callback for TRUNCATE
                     :  *
                     :  * Writes a single line: "table " followed by the schema-qualified names of
                     :  * all 'nrelations' entries of relations[], separated by ", ", then
                     :  * ": TRUNCATE:" and either the flags that are set (" restart_seqs",
                     :  * " cascade") or " (no-flags)" when neither is set.
                     :  *
                     :  * With skip_empty_xacts set, pg_output_begin() is called first if the
                     :  * transaction has not written anything yet.  The output is built while
                     :  * data->context is current, and that context is reset afterwards.
                     :  */
     689             : static void
     690          16 : pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     691             :                    int nrelations, Relation relations[], ReorderBufferChange *change)
     692             : {
     693             :     TestDecodingData *data;
     694             :     TestDecodingTxnData *txndata;
     695             :     MemoryContext old;
     696             :     int         i;
     697             : 
     698          16 :     data = ctx->output_plugin_private;
     699          16 :     txndata = txn->output_plugin_private;
     700             : 
     701             :     /* output BEGIN if we haven't yet */
     702          16 :     if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
     703             :     {
     704          14 :         pg_output_begin(ctx, data, txn, false);
     705             :     }
     706          16 :     txndata->xact_wrote_changes = true;
     707             : 
     708             :     /* Avoid leaking memory by using and resetting our own context */
     709          16 :     old = MemoryContextSwitchTo(data->context);
     710             : 
     711          16 :     OutputPluginPrepareWrite(ctx, true);
     712             : 
     713          16 :     appendStringInfoString(ctx->out, "table ");
     714             : 
     715          34 :     for (i = 0; i < nrelations; i++)
     716             :     {
     717          18 :         if (i > 0)
     718           2 :             appendStringInfoString(ctx->out, ", ");
     719             : 
     720          18 :         appendStringInfoString(ctx->out,
     721          18 :                                quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
     722          18 :                                                           NameStr(relations[i]->rd_rel->relname)));
     723             :     }
     724             : 
     725          16 :     appendStringInfoString(ctx->out, ": TRUNCATE:");
     726             : 
     727          16 :     if (change->data.truncate.restart_seqs
     728          14 :         || change->data.truncate.cascade)
     729             :     {
     730           2 :         if (change->data.truncate.restart_seqs)
     731           2 :             appendStringInfoString(ctx->out, " restart_seqs");
     732           2 :         if (change->data.truncate.cascade)
     733           2 :             appendStringInfoString(ctx->out, " cascade");
     734             :     }
     735             :     else
     736          14 :         appendStringInfoString(ctx->out, " (no-flags)");
     737             : 
     738          16 :     MemoryContextSwitchTo(old);
     739          16 :     MemoryContextReset(data->context);
     740             : 
     741          16 :     OutputPluginWrite(ctx, true);
     742          16 : }
     743             : /*
                     :  * callback for logical decoding messages
                     :  *
                     :  * Writes "message: transactional: <flag> prefix: <prefix>, sz: <sz>
                     :  * content:" (the flag is printed with %d) followed by the 'sz' raw bytes
                     :  * of 'message'.
                     :  *
                     :  * txn is only dereferenced when 'transactional' is true.  In that case the
                     :  * transaction is marked as having written changes, and with
                     :  * skip_empty_xacts set pg_output_begin() is called first if nothing has
                     :  * been written for the transaction yet.
                     :  */
     744             : static void
     745          18 : pg_decode_message(LogicalDecodingContext *ctx,
     746             :                   ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
     747             :                   const char *prefix, Size sz, const char *message)
     748             : {
     749          18 :     TestDecodingData *data = ctx->output_plugin_private;
     750             :     TestDecodingTxnData *txndata;
     751             : 
     752          18 :     txndata = transactional ? txn->output_plugin_private : NULL;
     753             : 
     754             :     /* output BEGIN if we haven't yet for transactional messages */
     755          18 :     if (transactional && data->skip_empty_xacts && !txndata->xact_wrote_changes)
     756           6 :         pg_output_begin(ctx, data, txn, false);
     757             : 
     758          18 :     if (transactional)
     759          10 :         txndata->xact_wrote_changes = true;
     760             : 
     761          18 :     OutputPluginPrepareWrite(ctx, true);
     762          18 :     appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
     763             :                      transactional, prefix, sz);
     764          18 :     appendBinaryStringInfo(ctx->out, message, sz);
     765          18 :     OutputPluginWrite(ctx, true);
     766          18 : }
     767             : /*
                     :  * callback for the start of a streamed block
                     :  *
                     :  * If the transaction has no plugin-private data yet, a zeroed
                     :  * TestDecodingTxnData is allocated in ctx->context and attached to txn.
                     :  * stream_wrote_changes is cleared on every call.  With skip_empty_xacts
                     :  * set nothing is written here; otherwise the "opening a streamed block"
                     :  * line is written right away through pg_output_stream_start().
                     :  */
     768             : static void
     769          80 : pg_decode_stream_start(LogicalDecodingContext *ctx,
     770             :                        ReorderBufferTXN *txn)
     771             : {
     772          80 :     TestDecodingData *data = ctx->output_plugin_private;
     773          80 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     774             : 
     775             :     /*
     776             :      * Allocate the txn plugin data for the first stream in the transaction.
     777             :      */
     778          80 :     if (txndata == NULL)
     779             :     {
     780             :         txndata =
     781          18 :             MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
     782          18 :         txndata->xact_wrote_changes = false;
     783          18 :         txn->output_plugin_private = txndata;
     784             :     }
     785             : 
     786          80 :     txndata->stream_wrote_changes = false;
     787          80 :     if (data->skip_empty_xacts)
     788          80 :         return;
     789           0 :     pg_output_stream_start(ctx, data, txn, true);
     790             : }
     791             : /*
                     :  * Write the "opening a streamed block for transaction" line, with the
                     :  * transaction's xid appended when include_xids is set.
                     :  *
                     :  * 'last_write' is passed unchanged to both OutputPluginPrepareWrite() and
                     :  * OutputPluginWrite().
                     :  */
     792             : static void
     793          22 : pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
     794             : {
     795          22 :     OutputPluginPrepareWrite(ctx, last_write);
     796          22 :     if (data->include_xids)
     797           0 :         appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
     798             :     else
     799          22 :         appendStringInfoString(ctx->out, "opening a streamed block for transaction");
     800          22 :     OutputPluginWrite(ctx, last_write);
     801          22 : }
     802             : /*
                     :  * callback for the end of a streamed block
                     :  *
                     :  * Writes "closing a streamed block for transaction", with the xid appended
                     :  * when include_xids is set.  Nothing is written when skip_empty_xacts is
                     :  * set and stream_wrote_changes is still false for this transaction.
                     :  */
     803             : static void
     804          80 : pg_decode_stream_stop(LogicalDecodingContext *ctx,
     805             :                       ReorderBufferTXN *txn)
     806             : {
     807          80 :     TestDecodingData *data = ctx->output_plugin_private;
     808          80 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     809             : 
     810          80 :     if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
     811          58 :         return;
     812             : 
     813          22 :     OutputPluginPrepareWrite(ctx, true);
     814          22 :     if (data->include_xids)
     815           0 :         appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
     816             :     else
     817          22 :         appendStringInfoString(ctx->out, "closing a streamed block for transaction");
     818          22 :     OutputPluginWrite(ctx, true);
     819             : }
     820             : /*
                     :  * callback for the abort of a streamed (sub)transaction
                     :  *
                     :  * xact_wrote_changes is read from the plugin-private data of the top-level
                     :  * transaction and copied to a local, because that data is freed (and the
                     :  * pointer cleared) when txn itself is the top-level transaction.  Then
                     :  * "aborting streamed (sub)transaction" is written, with txn's xid appended
                     :  * when include_xids is set, unless skip_empty_xacts is set and no changes
                     :  * were written.
                     :  */
     821             : static void
     822           8 : pg_decode_stream_abort(LogicalDecodingContext *ctx,
     823             :                        ReorderBufferTXN *txn,
     824             :                        XLogRecPtr abort_lsn)
     825             : {
     826           8 :     TestDecodingData *data = ctx->output_plugin_private;
     827             : 
     828             :     /*
     829             :      * stream abort can be sent for an individual subtransaction but we
     830             :      * maintain the output_plugin_private only under the toptxn so if this is
     831             :      * not the toptxn then fetch the toptxn.
     832             :      */
     833           8 :     ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
     834           8 :     TestDecodingTxnData *txndata = toptxn->output_plugin_private;
     835           8 :     bool        xact_wrote_changes = txndata->xact_wrote_changes;
     836             : 
     837           8 :     if (rbtxn_is_toptxn(txn))
     838             :     {
     839             :         Assert(txn->output_plugin_private != NULL);
     840           0 :         pfree(txndata);
     841           0 :         txn->output_plugin_private = NULL;
     842             :     }
     843             : 
     844           8 :     if (data->skip_empty_xacts && !xact_wrote_changes)
     845           0 :         return;
     846             : 
     847           8 :     OutputPluginPrepareWrite(ctx, true);
     848           8 :     if (data->include_xids)
     849           0 :         appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
     850             :     else
     851           8 :         appendStringInfoString(ctx->out, "aborting streamed (sub)transaction");
     852           8 :     OutputPluginWrite(ctx, true);
     853             : }
     854             : /*
                     :  * callback for PREPARE of a streamed transaction
                     :  *
                     :  * Writes "preparing streamed transaction" followed by the transaction's
                     :  * gid quoted with quote_literal_cstr(); the xid is included when
                     :  * include_xids is set and " (at <prepare_time>)" is appended when
                     :  * include_timestamp is set.  Nothing is written when skip_empty_xacts is
                     :  * set and the transaction has not written any changes.  The
                     :  * plugin-private data of txn is not freed here.
                     :  */
     855             : static void
     856           2 : pg_decode_stream_prepare(LogicalDecodingContext *ctx,
     857             :                          ReorderBufferTXN *txn,
     858             :                          XLogRecPtr prepare_lsn)
     859             : {
     860           2 :     TestDecodingData *data = ctx->output_plugin_private;
     861           2 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     862             : 
     863           2 :     if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
     864           0 :         return;
     865             : 
     866           2 :     OutputPluginPrepareWrite(ctx, true);
     867             : 
     868           2 :     if (data->include_xids)
     869           0 :         appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u",
     870           0 :                          quote_literal_cstr(txn->gid), txn->xid);
     871             :     else
     872           2 :         appendStringInfo(ctx->out, "preparing streamed transaction %s",
     873           2 :                          quote_literal_cstr(txn->gid));
     874             : 
     875           2 :     if (data->include_timestamp)
     876           0 :         appendStringInfo(ctx->out, " (at %s)",
     877             :                          timestamptz_to_str(txn->xact_time.prepare_time));
     878             : 
     879           2 :     OutputPluginWrite(ctx, true);
     880             : }
     881             : /*
                     :  * callback for COMMIT of a streamed transaction
                     :  *
                     :  * The transaction's plugin-private data is freed here and the pointer in
                     :  * txn is cleared; xact_wrote_changes is copied to a local first because it
                     :  * is still needed afterwards.  Then "committing streamed transaction" is
                     :  * written, with the xid appended when include_xids is set and
                     :  * " (at <commit_time>)" appended when include_timestamp is set, unless
                     :  * skip_empty_xacts is set and no changes were written.
                     :  */
     882             : static void
     883          10 : pg_decode_stream_commit(LogicalDecodingContext *ctx,
     884             :                         ReorderBufferTXN *txn,
     885             :                         XLogRecPtr commit_lsn)
     886             : {
     887          10 :     TestDecodingData *data = ctx->output_plugin_private;
     888          10 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     889          10 :     bool        xact_wrote_changes = txndata->xact_wrote_changes;
     890             : 
     891          10 :     pfree(txndata);
     892          10 :     txn->output_plugin_private = NULL;
     893             : 
     894          10 :     if (data->skip_empty_xacts && !xact_wrote_changes)
     895           0 :         return;
     896             : 
     897          10 :     OutputPluginPrepareWrite(ctx, true);
     898             : 
     899          10 :     if (data->include_xids)
     900           0 :         appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
     901             :     else
     902          10 :         appendStringInfoString(ctx->out, "committing streamed transaction");
     903             : 
     904          10 :     if (data->include_timestamp)
     905           0 :         appendStringInfo(ctx->out, " (at %s)",
     906             :                          timestamptz_to_str(txn->xact_time.commit_time));
     907             : 
     908          10 :     OutputPluginWrite(ctx, true);
     909             : }
     910             : 
     911             : /*
     912             :  * In streaming mode, we don't display the changes as the transaction can abort
     913             :  * at a later point in time.  We don't want users to see the changes until the
     914             :  * transaction is committed.
                     :  *
                     :  * Only a fixed line is written ("streaming change for TXN <xid>" when
                     :  * include_xids is set, "streaming change for transaction" otherwise);
                     :  * 'relation' and 'change' are not examined.  With skip_empty_xacts set, the
                     :  * stream-start line is written first, via pg_output_stream_start(), if the
                     :  * current stream has not written anything yet.  Both xact_wrote_changes
                     :  * and stream_wrote_changes are set.
     915             :  */
     916             : static void
     917         130 : pg_decode_stream_change(LogicalDecodingContext *ctx,
     918             :                         ReorderBufferTXN *txn,
     919             :                         Relation relation,
     920             :                         ReorderBufferChange *change)
     921             : {
     922         130 :     TestDecodingData *data = ctx->output_plugin_private;
     923         130 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     924             : 
     925             :     /* output stream start if we haven't yet */
     926         130 :     if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
     927             :     {
     928          16 :         pg_output_stream_start(ctx, data, txn, false);
     929             :     }
     930         130 :     txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
     931             : 
     932         130 :     OutputPluginPrepareWrite(ctx, true);
     933         130 :     if (data->include_xids)
     934           0 :         appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
     935             :     else
     936         130 :         appendStringInfoString(ctx->out, "streaming change for transaction");
     937         130 :     OutputPluginWrite(ctx, true);
     938         130 : }
     939             : 
     940             : /*
     941             :  * In streaming mode, we don't display the contents for transactional messages
     942             :  * as the transaction can abort at a later point in time.  We don't want users to
     943             :  * see the message contents until the transaction is committed.
                     :  *
                     :  * For a transactional message only the flag, prefix and size are written.
                     :  * For a non-transactional message the same header is written with a
                     :  * trailing " content:" followed by the 'sz' raw bytes of 'message'.
                     :  *
                     :  * txn is only dereferenced for transactional messages.  For those, both
                     :  * xact_wrote_changes and stream_wrote_changes are set, and with
                     :  * skip_empty_xacts set the stream-start line is written first if the
                     :  * current stream has not written anything yet.
     944             :  */
     945             : static void
     946           6 : pg_decode_stream_message(LogicalDecodingContext *ctx,
     947             :                          ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
     948             :                          const char *prefix, Size sz, const char *message)
     949             : {
     950             :     /* Output stream start if we haven't yet for transactional messages. */
     951           6 :     if (transactional)
     952             :     {
     953           6 :         TestDecodingData *data = ctx->output_plugin_private;
     954           6 :         TestDecodingTxnData *txndata = txn->output_plugin_private;
     955             : 
     956           6 :         if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
     957             :         {
     958           6 :             pg_output_stream_start(ctx, data, txn, false);
     959             :         }
     960           6 :         txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
     961             :     }
     962             : 
     963           6 :     OutputPluginPrepareWrite(ctx, true);
     964             : 
     965           6 :     if (transactional)
     966             :     {
     967           6 :         appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
     968             :                          transactional, prefix, sz);
     969             :     }
     970             :     else
     971             :     {
     972           0 :         appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
     973             :                          transactional, prefix, sz);
     974           0 :         appendBinaryStringInfo(ctx->out, message, sz);
     975             :     }
     976             : 
     977           6 :     OutputPluginWrite(ctx, true);
     978           6 : }
     979             : 
     980             : /*
     981             :  * In streaming mode, we don't display the detailed information of Truncate.
     982             :  * See pg_decode_stream_change.
                     :  *
                     :  * Only a fixed line is written ("streaming truncate for TXN <xid>" when
                     :  * include_xids is set, "streaming truncate for transaction" otherwise);
                     :  * 'nrelations', 'relations' and 'change' are not examined.  With
                     :  * skip_empty_xacts set, the stream-start line is written first if the
                     :  * current stream has not written anything yet.  Both xact_wrote_changes
                     :  * and stream_wrote_changes are set.
     983             :  */
     984             : static void
     985           0 : pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     986             :                           int nrelations, Relation relations[],
     987             :                           ReorderBufferChange *change)
     988             : {
     989           0 :     TestDecodingData *data = ctx->output_plugin_private;
     990           0 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     991             : 
     992           0 :     if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
     993             :     {
     994           0 :         pg_output_stream_start(ctx, data, txn, false);
     995             :     }
     996           0 :     txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
     997             : 
     998           0 :     OutputPluginPrepareWrite(ctx, true);
     999           0 :     if (data->include_xids)
    1000           0 :         appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
    1001             :     else
    1002           0 :         appendStringInfoString(ctx->out, "streaming truncate for transaction");
    1003           0 :     OutputPluginWrite(ctx, true);
    1004           0 : }

Generated by: LCOV version 1.14