LCOV - code coverage report
Current view: top level - contrib/test_decoding - test_decoding.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 347 408 85.0 %
Date: 2025-01-18 04:15:08 Functions: 27 28 96.4 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * test_decoding.c
       4             :  *        example logical decoding output plugin
       5             :  *
       6             :  * Copyright (c) 2012-2025, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *        contrib/test_decoding/test_decoding.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #include "catalog/pg_type.h"
      16             : 
      17             : #include "replication/logical.h"
      18             : #include "replication/origin.h"
      19             : 
      20             : #include "utils/builtins.h"
      21             : #include "utils/lsyscache.h"
      22             : #include "utils/memutils.h"
      23             : #include "utils/rel.h"
      24             : 
      25         216 : PG_MODULE_MAGIC;
      26             : 
      27             : typedef struct
      28             : {
      29             :     MemoryContext context;
      30             :     bool        include_xids;
      31             :     bool        include_timestamp;
      32             :     bool        skip_empty_xacts;
      33             :     bool        only_local;
      34             : } TestDecodingData;
      35             : 
      36             : /*
      37             :  * Maintain the per-transaction level variables to track whether the
      38             :  * transaction and or streams have written any changes. In streaming mode the
      39             :  * transaction can be decoded in streams so along with maintaining whether the
      40             :  * transaction has written any changes, we also need to track whether the
      41             :  * current stream has written any changes. This is required so that if user
      42             :  * has requested to skip the empty transactions we can skip the empty streams
      43             :  * even though the transaction has written some changes.
      44             :  */
      45             : typedef struct
      46             : {
      47             :     bool        xact_wrote_changes;
      48             :     bool        stream_wrote_changes;
      49             : } TestDecodingTxnData;
      50             : 
      51             : static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
      52             :                               bool is_init);
      53             : static void pg_decode_shutdown(LogicalDecodingContext *ctx);
      54             : static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
      55             :                                 ReorderBufferTXN *txn);
      56             : static void pg_output_begin(LogicalDecodingContext *ctx,
      57             :                             TestDecodingData *data,
      58             :                             ReorderBufferTXN *txn,
      59             :                             bool last_write);
      60             : static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
      61             :                                  ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
      62             : static void pg_decode_change(LogicalDecodingContext *ctx,
      63             :                              ReorderBufferTXN *txn, Relation relation,
      64             :                              ReorderBufferChange *change);
      65             : static void pg_decode_truncate(LogicalDecodingContext *ctx,
      66             :                                ReorderBufferTXN *txn,
      67             :                                int nrelations, Relation relations[],
      68             :                                ReorderBufferChange *change);
      69             : static bool pg_decode_filter(LogicalDecodingContext *ctx,
      70             :                              RepOriginId origin_id);
      71             : static void pg_decode_message(LogicalDecodingContext *ctx,
      72             :                               ReorderBufferTXN *txn, XLogRecPtr lsn,
      73             :                               bool transactional, const char *prefix,
      74             :                               Size sz, const char *message);
      75             : static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
      76             :                                      TransactionId xid,
      77             :                                      const char *gid);
      78             : static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
      79             :                                         ReorderBufferTXN *txn);
      80             : static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
      81             :                                   ReorderBufferTXN *txn,
      82             :                                   XLogRecPtr prepare_lsn);
      83             : static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
      84             :                                           ReorderBufferTXN *txn,
      85             :                                           XLogRecPtr commit_lsn);
      86             : static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
      87             :                                             ReorderBufferTXN *txn,
      88             :                                             XLogRecPtr prepare_end_lsn,
      89             :                                             TimestampTz prepare_time);
      90             : static void pg_decode_stream_start(LogicalDecodingContext *ctx,
      91             :                                    ReorderBufferTXN *txn);
      92             : static void pg_output_stream_start(LogicalDecodingContext *ctx,
      93             :                                    TestDecodingData *data,
      94             :                                    ReorderBufferTXN *txn,
      95             :                                    bool last_write);
      96             : static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
      97             :                                   ReorderBufferTXN *txn);
      98             : static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
      99             :                                    ReorderBufferTXN *txn,
     100             :                                    XLogRecPtr abort_lsn);
     101             : static void pg_decode_stream_prepare(LogicalDecodingContext *ctx,
     102             :                                      ReorderBufferTXN *txn,
     103             :                                      XLogRecPtr prepare_lsn);
     104             : static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
     105             :                                     ReorderBufferTXN *txn,
     106             :                                     XLogRecPtr commit_lsn);
     107             : static void pg_decode_stream_change(LogicalDecodingContext *ctx,
     108             :                                     ReorderBufferTXN *txn,
     109             :                                     Relation relation,
     110             :                                     ReorderBufferChange *change);
     111             : static void pg_decode_stream_message(LogicalDecodingContext *ctx,
     112             :                                      ReorderBufferTXN *txn, XLogRecPtr lsn,
     113             :                                      bool transactional, const char *prefix,
     114             :                                      Size sz, const char *message);
     115             : static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
     116             :                                       ReorderBufferTXN *txn,
     117             :                                       int nrelations, Relation relations[],
     118             :                                       ReorderBufferChange *change);
     119             : 
     120             : void
     121         216 : _PG_init(void)
     122             : {
     123             :     /* other plugins can perform things here */
     124         216 : }
     125             : 
     126             : /* specify output plugin callbacks */
     127             : void
     128         670 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
     129             : {
     130         670 :     cb->startup_cb = pg_decode_startup;
     131         670 :     cb->begin_cb = pg_decode_begin_txn;
     132         670 :     cb->change_cb = pg_decode_change;
     133         670 :     cb->truncate_cb = pg_decode_truncate;
     134         670 :     cb->commit_cb = pg_decode_commit_txn;
     135         670 :     cb->filter_by_origin_cb = pg_decode_filter;
     136         670 :     cb->shutdown_cb = pg_decode_shutdown;
     137         670 :     cb->message_cb = pg_decode_message;
     138         670 :     cb->filter_prepare_cb = pg_decode_filter_prepare;
     139         670 :     cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
     140         670 :     cb->prepare_cb = pg_decode_prepare_txn;
     141         670 :     cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
     142         670 :     cb->rollback_prepared_cb = pg_decode_rollback_prepared_txn;
     143         670 :     cb->stream_start_cb = pg_decode_stream_start;
     144         670 :     cb->stream_stop_cb = pg_decode_stream_stop;
     145         670 :     cb->stream_abort_cb = pg_decode_stream_abort;
     146         670 :     cb->stream_prepare_cb = pg_decode_stream_prepare;
     147         670 :     cb->stream_commit_cb = pg_decode_stream_commit;
     148         670 :     cb->stream_change_cb = pg_decode_stream_change;
     149         670 :     cb->stream_message_cb = pg_decode_stream_message;
     150         670 :     cb->stream_truncate_cb = pg_decode_stream_truncate;
     151         670 : }
     152             : 
     153             : 
     154             : /* initialize this plugin */
     155             : static void
     156         670 : pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
     157             :                   bool is_init)
     158             : {
     159             :     ListCell   *option;
     160             :     TestDecodingData *data;
     161         670 :     bool        enable_streaming = false;
     162             : 
     163         670 :     data = palloc0(sizeof(TestDecodingData));
     164         670 :     data->context = AllocSetContextCreate(ctx->context,
     165             :                                           "text conversion context",
     166             :                                           ALLOCSET_DEFAULT_SIZES);
     167         670 :     data->include_xids = true;
     168         670 :     data->include_timestamp = false;
     169         670 :     data->skip_empty_xacts = false;
     170         670 :     data->only_local = false;
     171             : 
     172         670 :     ctx->output_plugin_private = data;
     173             : 
     174         670 :     opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
     175         670 :     opt->receive_rewrites = false;
     176             : 
     177        1406 :     foreach(option, ctx->output_plugin_options)
     178             :     {
     179         742 :         DefElem    *elem = lfirst(option);
     180             : 
     181             :         Assert(elem->arg == NULL || IsA(elem->arg, String));
     182             : 
     183         742 :         if (strcmp(elem->defname, "include-xids") == 0)
     184             :         {
     185             :             /* if option does not provide a value, it means its value is true */
     186         350 :             if (elem->arg == NULL)
     187           0 :                 data->include_xids = true;
     188         350 :             else if (!parse_bool(strVal(elem->arg), &data->include_xids))
     189           4 :                 ereport(ERROR,
     190             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     191             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     192             :                                 strVal(elem->arg), elem->defname)));
     193             :         }
     194         392 :         else if (strcmp(elem->defname, "include-timestamp") == 0)
     195             :         {
     196           2 :             if (elem->arg == NULL)
     197           0 :                 data->include_timestamp = true;
     198           2 :             else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
     199           0 :                 ereport(ERROR,
     200             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     201             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     202             :                                 strVal(elem->arg), elem->defname)));
     203             :         }
     204         390 :         else if (strcmp(elem->defname, "force-binary") == 0)
     205             :         {
     206             :             bool        force_binary;
     207             : 
     208          12 :             if (elem->arg == NULL)
     209           0 :                 continue;
     210          12 :             else if (!parse_bool(strVal(elem->arg), &force_binary))
     211           0 :                 ereport(ERROR,
     212             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     213             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     214             :                                 strVal(elem->arg), elem->defname)));
     215             : 
     216          12 :             if (force_binary)
     217           4 :                 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
     218             :         }
     219         378 :         else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
     220             :         {
     221             : 
     222         344 :             if (elem->arg == NULL)
     223           0 :                 data->skip_empty_xacts = true;
     224         344 :             else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
     225           0 :                 ereport(ERROR,
     226             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     227             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     228             :                                 strVal(elem->arg), elem->defname)));
     229             :         }
     230          34 :         else if (strcmp(elem->defname, "only-local") == 0)
     231             :         {
     232             : 
     233           6 :             if (elem->arg == NULL)
     234           0 :                 data->only_local = true;
     235           6 :             else if (!parse_bool(strVal(elem->arg), &data->only_local))
     236           0 :                 ereport(ERROR,
     237             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     238             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     239             :                                 strVal(elem->arg), elem->defname)));
     240             :         }
     241          28 :         else if (strcmp(elem->defname, "include-rewrites") == 0)
     242             :         {
     243             : 
     244           2 :             if (elem->arg == NULL)
     245           0 :                 continue;
     246           2 :             else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
     247           0 :                 ereport(ERROR,
     248             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     249             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     250             :                                 strVal(elem->arg), elem->defname)));
     251             :         }
     252          26 :         else if (strcmp(elem->defname, "stream-changes") == 0)
     253             :         {
     254          24 :             if (elem->arg == NULL)
     255           0 :                 continue;
     256          24 :             else if (!parse_bool(strVal(elem->arg), &enable_streaming))
     257           0 :                 ereport(ERROR,
     258             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     259             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     260             :                                 strVal(elem->arg), elem->defname)));
     261             :         }
     262             :         else
     263             :         {
     264           2 :             ereport(ERROR,
     265             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     266             :                      errmsg("option \"%s\" = \"%s\" is unknown",
     267             :                             elem->defname,
     268             :                             elem->arg ? strVal(elem->arg) : "(null)")));
     269             :         }
     270             :     }
     271             : 
     272         664 :     ctx->streaming &= enable_streaming;
     273         664 : }
     274             : 
     275             : /* cleanup this plugin's resources */
     276             : static void
     277         644 : pg_decode_shutdown(LogicalDecodingContext *ctx)
     278             : {
     279         644 :     TestDecodingData *data = ctx->output_plugin_private;
     280             : 
     281             :     /* cleanup our own resources via memory context reset */
     282         644 :     MemoryContextDelete(data->context);
     283         644 : }
     284             : 
     285             : /* BEGIN callback */
     286             : static void
     287         878 : pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     288             : {
     289         878 :     TestDecodingData *data = ctx->output_plugin_private;
     290             :     TestDecodingTxnData *txndata =
     291         878 :         MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
     292             : 
     293         878 :     txndata->xact_wrote_changes = false;
     294         878 :     txn->output_plugin_private = txndata;
     295             : 
     296             :     /*
     297             :      * If asked to skip empty transactions, we'll emit BEGIN at the point
     298             :      * where the first operation is received for this transaction.
     299             :      */
     300         878 :     if (data->skip_empty_xacts)
     301         790 :         return;
     302             : 
     303          88 :     pg_output_begin(ctx, data, txn, true);
     304             : }
     305             : 
     306             : static void
     307         544 : pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
     308             : {
     309         544 :     OutputPluginPrepareWrite(ctx, last_write);
     310         544 :     if (data->include_xids)
     311          78 :         appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
     312             :     else
     313         466 :         appendStringInfoString(ctx->out, "BEGIN");
     314         544 :     OutputPluginWrite(ctx, last_write);
     315         544 : }
     316             : 
     317             : /* COMMIT callback */
     318             : static void
     319         878 : pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     320             :                      XLogRecPtr commit_lsn)
     321             : {
     322         878 :     TestDecodingData *data = ctx->output_plugin_private;
     323         878 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     324         878 :     bool        xact_wrote_changes = txndata->xact_wrote_changes;
     325             : 
     326         878 :     pfree(txndata);
     327         878 :     txn->output_plugin_private = NULL;
     328             : 
     329         878 :     if (data->skip_empty_xacts && !xact_wrote_changes)
     330         348 :         return;
     331             : 
     332         530 :     OutputPluginPrepareWrite(ctx, true);
     333         530 :     if (data->include_xids)
     334          78 :         appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
     335             :     else
     336         452 :         appendStringInfoString(ctx->out, "COMMIT");
     337             : 
     338         530 :     if (data->include_timestamp)
     339           2 :         appendStringInfo(ctx->out, " (at %s)",
     340             :                          timestamptz_to_str(txn->xact_time.commit_time));
     341             : 
     342         530 :     OutputPluginWrite(ctx, true);
     343             : }
     344             : 
     345             : /* BEGIN PREPARE callback */
     346             : static void
     347          14 : pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     348             : {
     349          14 :     TestDecodingData *data = ctx->output_plugin_private;
     350             :     TestDecodingTxnData *txndata =
     351          14 :         MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
     352             : 
     353          14 :     txndata->xact_wrote_changes = false;
     354          14 :     txn->output_plugin_private = txndata;
     355             : 
     356             :     /*
     357             :      * If asked to skip empty transactions, we'll emit BEGIN at the point
     358             :      * where the first operation is received for this transaction.
     359             :      */
     360          14 :     if (data->skip_empty_xacts)
     361          14 :         return;
     362             : 
     363           0 :     pg_output_begin(ctx, data, txn, true);
     364             : }
     365             : 
     366             : /* PREPARE callback */
     367             : static void
     368          14 : pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     369             :                       XLogRecPtr prepare_lsn)
     370             : {
     371          14 :     TestDecodingData *data = ctx->output_plugin_private;
     372          14 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     373             : 
     374             :     /*
     375             :      * If asked to skip empty transactions, we'll emit PREPARE at the point
     376             :      * where the first operation is received for this transaction.
     377             :      */
     378          14 :     if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
     379           0 :         return;
     380             : 
     381          14 :     OutputPluginPrepareWrite(ctx, true);
     382             : 
     383          14 :     appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
     384          14 :                      quote_literal_cstr(txn->gid));
     385             : 
     386          14 :     if (data->include_xids)
     387           0 :         appendStringInfo(ctx->out, ", txid %u", txn->xid);
     388             : 
     389          14 :     if (data->include_timestamp)
     390           0 :         appendStringInfo(ctx->out, " (at %s)",
     391             :                          timestamptz_to_str(txn->xact_time.prepare_time));
     392             : 
     393          14 :     OutputPluginWrite(ctx, true);
     394             : }
     395             : 
     396             : /* COMMIT PREPARED callback */
     397             : static void
     398          14 : pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     399             :                               XLogRecPtr commit_lsn)
     400             : {
     401          14 :     TestDecodingData *data = ctx->output_plugin_private;
     402             : 
     403          14 :     OutputPluginPrepareWrite(ctx, true);
     404             : 
     405          14 :     appendStringInfo(ctx->out, "COMMIT PREPARED %s",
     406          14 :                      quote_literal_cstr(txn->gid));
     407             : 
     408          14 :     if (data->include_xids)
     409           0 :         appendStringInfo(ctx->out, ", txid %u", txn->xid);
     410             : 
     411          14 :     if (data->include_timestamp)
     412           0 :         appendStringInfo(ctx->out, " (at %s)",
     413             :                          timestamptz_to_str(txn->xact_time.commit_time));
     414             : 
     415          14 :     OutputPluginWrite(ctx, true);
     416          14 : }
     417             : 
     418             : /* ROLLBACK PREPARED callback */
     419             : static void
     420           2 : pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
     421             :                                 ReorderBufferTXN *txn,
     422             :                                 XLogRecPtr prepare_end_lsn,
     423             :                                 TimestampTz prepare_time)
     424             : {
     425           2 :     TestDecodingData *data = ctx->output_plugin_private;
     426             : 
     427           2 :     OutputPluginPrepareWrite(ctx, true);
     428             : 
     429           2 :     appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
     430           2 :                      quote_literal_cstr(txn->gid));
     431             : 
     432           2 :     if (data->include_xids)
     433           0 :         appendStringInfo(ctx->out, ", txid %u", txn->xid);
     434             : 
     435           2 :     if (data->include_timestamp)
     436           0 :         appendStringInfo(ctx->out, " (at %s)",
     437             :                          timestamptz_to_str(txn->xact_time.commit_time));
     438             : 
     439           2 :     OutputPluginWrite(ctx, true);
     440           2 : }
     441             : 
     442             : /*
     443             :  * Filter out two-phase transactions.
     444             :  *
     445             :  * Each plugin can implement its own filtering logic. Here we demonstrate a
     446             :  * simple logic by checking the GID. If the GID contains the "_nodecode"
     447             :  * substring, then we filter it out.
     448             :  */
     449             : static bool
     450         288 : pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
     451             :                          const char *gid)
     452             : {
     453         288 :     if (strstr(gid, "_nodecode") != NULL)
     454          24 :         return true;
     455             : 
     456         264 :     return false;
     457             : }
     458             : 
     459             : static bool
     460     2390982 : pg_decode_filter(LogicalDecodingContext *ctx,
     461             :                  RepOriginId origin_id)
     462             : {
     463     2390982 :     TestDecodingData *data = ctx->output_plugin_private;
     464             : 
     465     2390982 :     if (data->only_local && origin_id != InvalidRepOriginId)
     466          18 :         return true;
     467     2390964 :     return false;
     468             : }
     469             : 
     470             : /*
     471             :  * Print literal `outputstr' already represented as string of type `typid'
     472             :  * into stringbuf `s'.
     473             :  *
     474             :  * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
     475             :  * if standard_conforming_strings were enabled.
     476             :  */
     477             : static void
     478      351916 : print_literal(StringInfo s, Oid typid, char *outputstr)
     479             : {
     480             :     const char *valptr;
     481             : 
     482      351916 :     switch (typid)
     483             :     {
     484      120526 :         case INT2OID:
     485             :         case INT4OID:
     486             :         case INT8OID:
     487             :         case OIDOID:
     488             :         case FLOAT4OID:
     489             :         case FLOAT8OID:
     490             :         case NUMERICOID:
     491             :             /* NB: We don't care about Inf, NaN et al. */
     492      120526 :             appendStringInfoString(s, outputstr);
     493      120526 :             break;
     494             : 
     495           0 :         case BITOID:
     496             :         case VARBITOID:
     497           0 :             appendStringInfo(s, "B'%s'", outputstr);
     498           0 :             break;
     499             : 
     500           0 :         case BOOLOID:
     501           0 :             if (strcmp(outputstr, "t") == 0)
     502           0 :                 appendStringInfoString(s, "true");
     503             :             else
     504           0 :                 appendStringInfoString(s, "false");
     505           0 :             break;
     506             : 
     507      231390 :         default:
     508      231390 :             appendStringInfoChar(s, '\'');
     509    10854352 :             for (valptr = outputstr; *valptr; valptr++)
     510             :             {
     511    10622962 :                 char        ch = *valptr;
     512             : 
     513    10622962 :                 if (SQL_STR_DOUBLE(ch, false))
     514         128 :                     appendStringInfoChar(s, ch);
     515    10622962 :                 appendStringInfoChar(s, ch);
     516             :             }
     517      231390 :             appendStringInfoChar(s, '\'');
     518      231390 :             break;
     519             :     }
     520      351916 : }
     521             : 
     522             : /* print the tuple 'tuple' into the StringInfo s */
     523             : static void
     524      291142 : tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
     525             : {
     526             :     int         natt;
     527             : 
     528             :     /* print all columns individually */
     529      694574 :     for (natt = 0; natt < tupdesc->natts; natt++)
     530             :     {
     531             :         Form_pg_attribute attr; /* the attribute itself */
     532             :         Oid         typid;      /* type of current attribute */
     533             :         Oid         typoutput;  /* output function */
     534             :         bool        typisvarlena;
     535             :         Datum       origval;    /* possibly toasted Datum */
     536             :         bool        isnull;     /* column is null? */
     537             : 
     538      403432 :         attr = TupleDescAttr(tupdesc, natt);
     539             : 
     540             :         /*
     541             :          * don't print dropped columns, we can't be sure everything is
     542             :          * available for them
     543             :          */
     544      403432 :         if (attr->attisdropped)
     545       10260 :             continue;
     546             : 
     547             :         /*
     548             :          * Don't print system columns, oid will already have been printed if
     549             :          * present.
     550             :          */
     551      403288 :         if (attr->attnum < 0)
     552           0 :             continue;
     553             : 
     554      403288 :         typid = attr->atttypid;
     555             : 
     556             :         /* get Datum from tuple */
     557      403288 :         origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
     558             : 
     559      403288 :         if (isnull && skip_nulls)
     560       10116 :             continue;
     561             : 
     562             :         /* print attribute name */
     563      393172 :         appendStringInfoChar(s, ' ');
     564      393172 :         appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
     565             : 
     566             :         /* print attribute type */
     567      393172 :         appendStringInfoChar(s, '[');
     568      393172 :         appendStringInfoString(s, format_type_be(typid));
     569      393172 :         appendStringInfoChar(s, ']');
     570             : 
     571             :         /* query output function */
     572      393172 :         getTypeOutputInfo(typid,
     573             :                           &typoutput, &typisvarlena);
     574             : 
     575             :         /* print separator */
     576      393172 :         appendStringInfoChar(s, ':');
     577             : 
     578             :         /* print data */
     579      393172 :         if (isnull)
     580       41232 :             appendStringInfoString(s, "null");
     581      351940 :         else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
     582          24 :             appendStringInfoString(s, "unchanged-toast-datum");
     583      351916 :         else if (!typisvarlena)
     584      120534 :             print_literal(s, typid,
     585             :                           OidOutputFunctionCall(typoutput, origval));
     586             :         else
     587             :         {
     588             :             Datum       val;    /* definitely detoasted Datum */
     589             : 
     590      231382 :             val = PointerGetDatum(PG_DETOAST_DATUM(origval));
     591      231382 :             print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
     592             :         }
     593             :     }
     594      291142 : }
     595             : 
     596             : /*
     597             :  * callback for individual changed tuples
     598             :  */
     599             : static void
     600      301120 : pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     601             :                  Relation relation, ReorderBufferChange *change)
     602             : {
     603             :     TestDecodingData *data;
     604             :     TestDecodingTxnData *txndata;
     605             :     Form_pg_class class_form;
     606             :     TupleDesc   tupdesc;
     607             :     MemoryContext old;
     608             : 
     609      301120 :     data = ctx->output_plugin_private;
     610      301120 :     txndata = txn->output_plugin_private;
     611             : 
     612             :     /* output BEGIN if we haven't yet */
     613      301120 :     if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
     614             :     {
     615         434 :         pg_output_begin(ctx, data, txn, false);
     616             :     }
     617      301120 :     txndata->xact_wrote_changes = true;
     618             : 
     619      301120 :     class_form = RelationGetForm(relation);
     620      301120 :     tupdesc = RelationGetDescr(relation);
     621             : 
     622             :     /* Avoid leaking memory by using and resetting our own context */
     623      301120 :     old = MemoryContextSwitchTo(data->context);
     624             : 
     625      301120 :     OutputPluginPrepareWrite(ctx, true);
     626             : 
     627      301120 :     appendStringInfoString(ctx->out, "table ");
     628      301120 :     appendStringInfoString(ctx->out,
     629      301120 :                            quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
     630      301120 :                                                       class_form->relrewrite ?
     631           2 :                                                       get_rel_name(class_form->relrewrite) :
     632             :                                                       NameStr(class_form->relname)));
     633      301120 :     appendStringInfoChar(ctx->out, ':');
     634             : 
     635      301120 :     switch (change->action)
     636             :     {
     637      265990 :         case REORDER_BUFFER_CHANGE_INSERT:
     638      265990 :             appendStringInfoString(ctx->out, " INSERT:");
     639      265990 :             if (change->data.tp.newtuple == NULL)
     640           0 :                 appendStringInfoString(ctx->out, " (no-tuple-data)");
     641             :             else
     642      265990 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     643             :                                     change->data.tp.newtuple,
     644             :                                     false);
     645      265990 :             break;
     646       15086 :         case REORDER_BUFFER_CHANGE_UPDATE:
     647       15086 :             appendStringInfoString(ctx->out, " UPDATE:");
     648       15086 :             if (change->data.tp.oldtuple != NULL)
     649             :             {
     650          36 :                 appendStringInfoString(ctx->out, " old-key:");
     651          36 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     652             :                                     change->data.tp.oldtuple,
     653             :                                     true);
     654          36 :                 appendStringInfoString(ctx->out, " new-tuple:");
     655             :             }
     656             : 
     657       15086 :             if (change->data.tp.newtuple == NULL)
     658           0 :                 appendStringInfoString(ctx->out, " (no-tuple-data)");
     659             :             else
     660       15086 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     661             :                                     change->data.tp.newtuple,
     662             :                                     false);
     663       15086 :             break;
     664       20044 :         case REORDER_BUFFER_CHANGE_DELETE:
     665       20044 :             appendStringInfoString(ctx->out, " DELETE:");
     666             : 
     667             :             /* if there was no PK, we only know that a delete happened */
     668       20044 :             if (change->data.tp.oldtuple == NULL)
     669       10014 :                 appendStringInfoString(ctx->out, " (no-tuple-data)");
     670             :             /* In DELETE, only the replica identity is present; display that */
     671             :             else
     672       10030 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     673             :                                     change->data.tp.oldtuple,
     674             :                                     true);
     675       20044 :             break;
     676      301120 :         default:
     677             :             Assert(false);
     678             :     }
     679             : 
     680      301120 :     MemoryContextSwitchTo(old);
     681      301120 :     MemoryContextReset(data->context);
     682             : 
     683      301120 :     OutputPluginWrite(ctx, true);
     684      301120 : }
     685             : 
     686             : static void
     687          18 : pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     688             :                    int nrelations, Relation relations[], ReorderBufferChange *change)
     689             : {
     690             :     TestDecodingData *data;
     691             :     TestDecodingTxnData *txndata;
     692             :     MemoryContext old;
     693             :     int         i;
     694             : 
     695          18 :     data = ctx->output_plugin_private;
     696          18 :     txndata = txn->output_plugin_private;
     697             : 
     698             :     /* output BEGIN if we haven't yet */
     699          18 :     if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
     700             :     {
     701          16 :         pg_output_begin(ctx, data, txn, false);
     702             :     }
     703          18 :     txndata->xact_wrote_changes = true;
     704             : 
     705             :     /* Avoid leaking memory by using and resetting our own context */
     706          18 :     old = MemoryContextSwitchTo(data->context);
     707             : 
     708          18 :     OutputPluginPrepareWrite(ctx, true);
     709             : 
     710          18 :     appendStringInfoString(ctx->out, "table ");
     711             : 
     712          38 :     for (i = 0; i < nrelations; i++)
     713             :     {
     714          20 :         if (i > 0)
     715           2 :             appendStringInfoString(ctx->out, ", ");
     716             : 
     717          20 :         appendStringInfoString(ctx->out,
     718          20 :                                quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
     719          20 :                                                           NameStr(relations[i]->rd_rel->relname)));
     720             :     }
     721             : 
     722          18 :     appendStringInfoString(ctx->out, ": TRUNCATE:");
     723             : 
     724          18 :     if (change->data.truncate.restart_seqs
     725          16 :         || change->data.truncate.cascade)
     726             :     {
     727           2 :         if (change->data.truncate.restart_seqs)
     728           2 :             appendStringInfoString(ctx->out, " restart_seqs");
     729           2 :         if (change->data.truncate.cascade)
     730           2 :             appendStringInfoString(ctx->out, " cascade");
     731             :     }
     732             :     else
     733          16 :         appendStringInfoString(ctx->out, " (no-flags)");
     734             : 
     735          18 :     MemoryContextSwitchTo(old);
     736          18 :     MemoryContextReset(data->context);
     737             : 
     738          18 :     OutputPluginWrite(ctx, true);
     739          18 : }
     740             : 
     741             : static void
     742          18 : pg_decode_message(LogicalDecodingContext *ctx,
     743             :                   ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
     744             :                   const char *prefix, Size sz, const char *message)
     745             : {
     746          18 :     TestDecodingData *data = ctx->output_plugin_private;
     747             :     TestDecodingTxnData *txndata;
     748             : 
     749          18 :     txndata = transactional ? txn->output_plugin_private : NULL;
     750             : 
     751             :     /* output BEGIN if we haven't yet for transactional messages */
     752          18 :     if (transactional && data->skip_empty_xacts && !txndata->xact_wrote_changes)
     753           6 :         pg_output_begin(ctx, data, txn, false);
     754             : 
     755          18 :     if (transactional)
     756          10 :         txndata->xact_wrote_changes = true;
     757             : 
     758          18 :     OutputPluginPrepareWrite(ctx, true);
     759          18 :     appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
     760             :                      transactional, prefix, sz);
     761          18 :     appendBinaryStringInfo(ctx->out, message, sz);
     762          18 :     OutputPluginWrite(ctx, true);
     763          18 : }
     764             : 
     765             : static void
     766          70 : pg_decode_stream_start(LogicalDecodingContext *ctx,
     767             :                        ReorderBufferTXN *txn)
     768             : {
     769          70 :     TestDecodingData *data = ctx->output_plugin_private;
     770          70 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     771             : 
     772             :     /*
     773             :      * Allocate the txn plugin data for the first stream in the transaction.
     774             :      */
     775          70 :     if (txndata == NULL)
     776             :     {
     777             :         txndata =
     778          18 :             MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
     779          18 :         txndata->xact_wrote_changes = false;
     780          18 :         txn->output_plugin_private = txndata;
     781             :     }
     782             : 
     783          70 :     txndata->stream_wrote_changes = false;
     784          70 :     if (data->skip_empty_xacts)
     785          70 :         return;
     786           0 :     pg_output_stream_start(ctx, data, txn, true);
     787             : }
     788             : 
     789             : static void
     790          22 : pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
     791             : {
     792          22 :     OutputPluginPrepareWrite(ctx, last_write);
     793          22 :     if (data->include_xids)
     794           0 :         appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
     795             :     else
     796          22 :         appendStringInfoString(ctx->out, "opening a streamed block for transaction");
     797          22 :     OutputPluginWrite(ctx, last_write);
     798          22 : }
     799             : 
     800             : static void
     801          70 : pg_decode_stream_stop(LogicalDecodingContext *ctx,
     802             :                       ReorderBufferTXN *txn)
     803             : {
     804          70 :     TestDecodingData *data = ctx->output_plugin_private;
     805          70 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     806             : 
     807          70 :     if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
     808          48 :         return;
     809             : 
     810          22 :     OutputPluginPrepareWrite(ctx, true);
     811          22 :     if (data->include_xids)
     812           0 :         appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
     813             :     else
     814          22 :         appendStringInfoString(ctx->out, "closing a streamed block for transaction");
     815          22 :     OutputPluginWrite(ctx, true);
     816             : }
     817             : 
     818             : static void
     819           8 : pg_decode_stream_abort(LogicalDecodingContext *ctx,
     820             :                        ReorderBufferTXN *txn,
     821             :                        XLogRecPtr abort_lsn)
     822             : {
     823           8 :     TestDecodingData *data = ctx->output_plugin_private;
     824             : 
     825             :     /*
     826             :      * stream abort can be sent for an individual subtransaction but we
     827             :      * maintain the output_plugin_private only under the toptxn so if this is
     828             :      * not the toptxn then fetch the toptxn.
     829             :      */
     830           8 :     ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
     831           8 :     TestDecodingTxnData *txndata = toptxn->output_plugin_private;
     832           8 :     bool        xact_wrote_changes = txndata->xact_wrote_changes;
     833             : 
     834           8 :     if (rbtxn_is_toptxn(txn))
     835             :     {
     836             :         Assert(txn->output_plugin_private != NULL);
     837           0 :         pfree(txndata);
     838           0 :         txn->output_plugin_private = NULL;
     839             :     }
     840             : 
     841           8 :     if (data->skip_empty_xacts && !xact_wrote_changes)
     842           0 :         return;
     843             : 
     844           8 :     OutputPluginPrepareWrite(ctx, true);
     845           8 :     if (data->include_xids)
     846           0 :         appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
     847             :     else
     848           8 :         appendStringInfoString(ctx->out, "aborting streamed (sub)transaction");
     849           8 :     OutputPluginWrite(ctx, true);
     850             : }
     851             : 
     852             : static void
     853           2 : pg_decode_stream_prepare(LogicalDecodingContext *ctx,
     854             :                          ReorderBufferTXN *txn,
     855             :                          XLogRecPtr prepare_lsn)
     856             : {
     857           2 :     TestDecodingData *data = ctx->output_plugin_private;
     858           2 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     859             : 
     860           2 :     if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
     861           0 :         return;
     862             : 
     863           2 :     OutputPluginPrepareWrite(ctx, true);
     864             : 
     865           2 :     if (data->include_xids)
     866           0 :         appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u",
     867           0 :                          quote_literal_cstr(txn->gid), txn->xid);
     868             :     else
     869           2 :         appendStringInfo(ctx->out, "preparing streamed transaction %s",
     870           2 :                          quote_literal_cstr(txn->gid));
     871             : 
     872           2 :     if (data->include_timestamp)
     873           0 :         appendStringInfo(ctx->out, " (at %s)",
     874             :                          timestamptz_to_str(txn->xact_time.prepare_time));
     875             : 
     876           2 :     OutputPluginWrite(ctx, true);
     877             : }
     878             : 
     879             : static void
     880          10 : pg_decode_stream_commit(LogicalDecodingContext *ctx,
     881             :                         ReorderBufferTXN *txn,
     882             :                         XLogRecPtr commit_lsn)
     883             : {
     884          10 :     TestDecodingData *data = ctx->output_plugin_private;
     885          10 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     886          10 :     bool        xact_wrote_changes = txndata->xact_wrote_changes;
     887             : 
     888          10 :     pfree(txndata);
     889          10 :     txn->output_plugin_private = NULL;
     890             : 
     891          10 :     if (data->skip_empty_xacts && !xact_wrote_changes)
     892           0 :         return;
     893             : 
     894          10 :     OutputPluginPrepareWrite(ctx, true);
     895             : 
     896          10 :     if (data->include_xids)
     897           0 :         appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
     898             :     else
     899          10 :         appendStringInfoString(ctx->out, "committing streamed transaction");
     900             : 
     901          10 :     if (data->include_timestamp)
     902           0 :         appendStringInfo(ctx->out, " (at %s)",
     903             :                          timestamptz_to_str(txn->xact_time.commit_time));
     904             : 
     905          10 :     OutputPluginWrite(ctx, true);
     906             : }
     907             : 
     908             : /*
     909             :  * In streaming mode, we don't display the changes as the transaction can abort
     910             :  * at a later point in time.  We don't want users to see the changes until the
     911             :  * transaction is committed.
     912             :  */
     913             : static void
     914         130 : pg_decode_stream_change(LogicalDecodingContext *ctx,
     915             :                         ReorderBufferTXN *txn,
     916             :                         Relation relation,
     917             :                         ReorderBufferChange *change)
     918             : {
     919         130 :     TestDecodingData *data = ctx->output_plugin_private;
     920         130 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     921             : 
     922             :     /* output stream start if we haven't yet */
     923         130 :     if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
     924             :     {
     925          16 :         pg_output_stream_start(ctx, data, txn, false);
     926             :     }
     927         130 :     txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
     928             : 
     929         130 :     OutputPluginPrepareWrite(ctx, true);
     930         130 :     if (data->include_xids)
     931           0 :         appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
     932             :     else
     933         130 :         appendStringInfoString(ctx->out, "streaming change for transaction");
     934         130 :     OutputPluginWrite(ctx, true);
     935         130 : }
     936             : 
     937             : /*
     938             :  * In streaming mode, we don't display the contents for transactional messages
     939             :  * as the transaction can abort at a later point in time.  We don't want users to
     940             :  * see the message contents until the transaction is committed.
     941             :  */
     942             : static void
     943           6 : pg_decode_stream_message(LogicalDecodingContext *ctx,
     944             :                          ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
     945             :                          const char *prefix, Size sz, const char *message)
     946             : {
     947             :     /* Output stream start if we haven't yet for transactional messages. */
     948           6 :     if (transactional)
     949             :     {
     950           6 :         TestDecodingData *data = ctx->output_plugin_private;
     951           6 :         TestDecodingTxnData *txndata = txn->output_plugin_private;
     952             : 
     953           6 :         if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
     954             :         {
     955           6 :             pg_output_stream_start(ctx, data, txn, false);
     956             :         }
     957           6 :         txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
     958             :     }
     959             : 
     960           6 :     OutputPluginPrepareWrite(ctx, true);
     961             : 
     962           6 :     if (transactional)
     963             :     {
     964           6 :         appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
     965             :                          transactional, prefix, sz);
     966             :     }
     967             :     else
     968             :     {
     969           0 :         appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
     970             :                          transactional, prefix, sz);
     971           0 :         appendBinaryStringInfo(ctx->out, message, sz);
     972             :     }
     973             : 
     974           6 :     OutputPluginWrite(ctx, true);
     975           6 : }
     976             : 
     977             : /*
     978             :  * In streaming mode, we don't display the detailed information of Truncate.
     979             :  * See pg_decode_stream_change.
     980             :  */
     981             : static void
     982           0 : pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     983             :                           int nrelations, Relation relations[],
     984             :                           ReorderBufferChange *change)
     985             : {
     986           0 :     TestDecodingData *data = ctx->output_plugin_private;
     987           0 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     988             : 
     989           0 :     if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
     990             :     {
     991           0 :         pg_output_stream_start(ctx, data, txn, false);
     992             :     }
     993           0 :     txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
     994             : 
     995           0 :     OutputPluginPrepareWrite(ctx, true);
     996           0 :     if (data->include_xids)
     997           0 :         appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
     998             :     else
     999           0 :         appendStringInfoString(ctx->out, "streaming truncate for transaction");
    1000           0 :     OutputPluginWrite(ctx, true);
    1001           0 : }

Generated by: LCOV version 1.14