LCOV - code coverage report
Current view: top level - contrib/test_decoding - test_decoding.c (source / functions)
Test: PostgreSQL 15devel | Lines: 336 hit of 400 total (84.0 % coverage)
Date: 2021-09-17 15:07:27 | Functions: 27 hit of 28 total (96.4 % coverage)
Legend: Lines: hit / not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * test_decoding.c
       4             :  *        example logical decoding output plugin
       5             :  *
       6             :  * Copyright (c) 2012-2021, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *        contrib/test_decoding/test_decoding.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #include "catalog/pg_type.h"
      16             : 
      17             : #include "replication/logical.h"
      18             : #include "replication/origin.h"
      19             : 
      20             : #include "utils/builtins.h"
      21             : #include "utils/lsyscache.h"
      22             : #include "utils/memutils.h"
      23             : #include "utils/rel.h"
      24             : 
      25         110 : PG_MODULE_MAGIC;
      26             : 
      27             : /* These must be available to dlsym() */
      28             : extern void _PG_init(void);
      29             : extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
      30             : 
      31             : typedef struct
      32             : {
      33             :     MemoryContext context;
      34             :     bool        include_xids;
      35             :     bool        include_timestamp;
      36             :     bool        skip_empty_xacts;
      37             :     bool        only_local;
      38             : } TestDecodingData;
      39             : 
      40             : /*
      41             :  * Per-transaction state tracking whether the transaction and/or its
      42             :  * streams have written any changes. In streaming mode a transaction can be
      43             :  * decoded in several streams, so in addition to tracking whether the
      44             :  * transaction has written any changes, we also need to track whether the
      45             :  * current stream has written any changes. This is required so that, if the
      46             :  * user has requested to skip empty transactions, we can also skip empty
      47             :  * streams even though the transaction as a whole has written some changes.
      48             :  */
      49             : typedef struct
      50             : {
      51             :     bool        xact_wrote_changes;
      52             :     bool        stream_wrote_changes;
      53             : } TestDecodingTxnData;
      54             : 
      55             : static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
      56             :                               bool is_init);
      57             : static void pg_decode_shutdown(LogicalDecodingContext *ctx);
      58             : static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
      59             :                                 ReorderBufferTXN *txn);
      60             : static void pg_output_begin(LogicalDecodingContext *ctx,
      61             :                             TestDecodingData *data,
      62             :                             ReorderBufferTXN *txn,
      63             :                             bool last_write);
      64             : static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
      65             :                                  ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
      66             : static void pg_decode_change(LogicalDecodingContext *ctx,
      67             :                              ReorderBufferTXN *txn, Relation rel,
      68             :                              ReorderBufferChange *change);
      69             : static void pg_decode_truncate(LogicalDecodingContext *ctx,
      70             :                                ReorderBufferTXN *txn,
      71             :                                int nrelations, Relation relations[],
      72             :                                ReorderBufferChange *change);
      73             : static bool pg_decode_filter(LogicalDecodingContext *ctx,
      74             :                              RepOriginId origin_id);
      75             : static void pg_decode_message(LogicalDecodingContext *ctx,
      76             :                               ReorderBufferTXN *txn, XLogRecPtr message_lsn,
      77             :                               bool transactional, const char *prefix,
      78             :                               Size sz, const char *message);
      79             : static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
      80             :                                      TransactionId xid,
      81             :                                      const char *gid);
      82             : static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
      83             :                                         ReorderBufferTXN *txn);
      84             : static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
      85             :                                   ReorderBufferTXN *txn,
      86             :                                   XLogRecPtr prepare_lsn);
      87             : static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
      88             :                                           ReorderBufferTXN *txn,
      89             :                                           XLogRecPtr commit_lsn);
      90             : static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
      91             :                                             ReorderBufferTXN *txn,
      92             :                                             XLogRecPtr prepare_end_lsn,
      93             :                                             TimestampTz prepare_time);
      94             : static void pg_decode_stream_start(LogicalDecodingContext *ctx,
      95             :                                    ReorderBufferTXN *txn);
      96             : static void pg_output_stream_start(LogicalDecodingContext *ctx,
      97             :                                    TestDecodingData *data,
      98             :                                    ReorderBufferTXN *txn,
      99             :                                    bool last_write);
     100             : static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
     101             :                                   ReorderBufferTXN *txn);
     102             : static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
     103             :                                    ReorderBufferTXN *txn,
     104             :                                    XLogRecPtr abort_lsn);
     105             : static void pg_decode_stream_prepare(LogicalDecodingContext *ctx,
     106             :                                      ReorderBufferTXN *txn,
     107             :                                      XLogRecPtr prepare_lsn);
     108             : static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
     109             :                                     ReorderBufferTXN *txn,
     110             :                                     XLogRecPtr commit_lsn);
     111             : static void pg_decode_stream_change(LogicalDecodingContext *ctx,
     112             :                                     ReorderBufferTXN *txn,
     113             :                                     Relation relation,
     114             :                                     ReorderBufferChange *change);
     115             : static void pg_decode_stream_message(LogicalDecodingContext *ctx,
     116             :                                      ReorderBufferTXN *txn, XLogRecPtr message_lsn,
     117             :                                      bool transactional, const char *prefix,
     118             :                                      Size sz, const char *message);
     119             : static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
     120             :                                       ReorderBufferTXN *txn,
     121             :                                       int nrelations, Relation relations[],
     122             :                                       ReorderBufferChange *change);
     123             : 
     124             : void
     125         110 : _PG_init(void)
     126             : {
     127             :     /* nothing to do for this plugin; other plugins can initialize here */
     128         110 : }
     129             : 
     130             : /* specify output plugin callbacks */
     131             : void
     132         494 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
     133             : {
     134             :     AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
     135             : 
     136         494 :     cb->startup_cb = pg_decode_startup;
     137         494 :     cb->begin_cb = pg_decode_begin_txn;
     138         494 :     cb->change_cb = pg_decode_change;
     139         494 :     cb->truncate_cb = pg_decode_truncate;
     140         494 :     cb->commit_cb = pg_decode_commit_txn;
     141         494 :     cb->filter_by_origin_cb = pg_decode_filter;
     142         494 :     cb->shutdown_cb = pg_decode_shutdown;
     143         494 :     cb->message_cb = pg_decode_message;
     144         494 :     cb->filter_prepare_cb = pg_decode_filter_prepare;
     145         494 :     cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
     146         494 :     cb->prepare_cb = pg_decode_prepare_txn;
     147         494 :     cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
     148         494 :     cb->rollback_prepared_cb = pg_decode_rollback_prepared_txn;
     149         494 :     cb->stream_start_cb = pg_decode_stream_start;
     150         494 :     cb->stream_stop_cb = pg_decode_stream_stop;
     151         494 :     cb->stream_abort_cb = pg_decode_stream_abort;
     152         494 :     cb->stream_prepare_cb = pg_decode_stream_prepare;
     153         494 :     cb->stream_commit_cb = pg_decode_stream_commit;
     154         494 :     cb->stream_change_cb = pg_decode_stream_change;
     155         494 :     cb->stream_message_cb = pg_decode_stream_message;
     156         494 :     cb->stream_truncate_cb = pg_decode_stream_truncate;
     157         494 : }
     158             : 
     159             : 
     160             : /* initialize this plugin */
     161             : static void
     162         494 : pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
     163             :                   bool is_init)
     164             : {
     165             :     ListCell   *option;
     166             :     TestDecodingData *data;
     167         494 :     bool        enable_streaming = false;
     168             : 
     169         494 :     data = palloc0(sizeof(TestDecodingData));
     170         494 :     data->context = AllocSetContextCreate(ctx->context,
     171             :                                           "text conversion context",
     172             :                                           ALLOCSET_DEFAULT_SIZES);
     173         494 :     data->include_xids = true;
     174         494 :     data->include_timestamp = false;
     175         494 :     data->skip_empty_xacts = false;
     176         494 :     data->only_local = false;
     177             : 
     178         494 :     ctx->output_plugin_private = data;
     179             : 
     180         494 :     opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
     181         494 :     opt->receive_rewrites = false;
     182             : 
     183        1068 :     foreach(option, ctx->output_plugin_options)
     184             :     {
     185         580 :         DefElem    *elem = lfirst(option);
     186             : 
     187             :         Assert(elem->arg == NULL || IsA(elem->arg, String));
     188             : 
     189         580 :         if (strcmp(elem->defname, "include-xids") == 0)
     190             :         {
     191             :             /* specifying this option without a value means true */
     192         274 :             if (elem->arg == NULL)
     193           0 :                 data->include_xids = true;
     194         274 :             else if (!parse_bool(strVal(elem->arg), &data->include_xids))
     195           4 :                 ereport(ERROR,
     196             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     197             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     198             :                                 strVal(elem->arg), elem->defname)));
     199             :         }
     200         306 :         else if (strcmp(elem->defname, "include-timestamp") == 0)
     201             :         {
     202           2 :             if (elem->arg == NULL)
     203           0 :                 data->include_timestamp = true;
     204           2 :             else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
     205           0 :                 ereport(ERROR,
     206             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     207             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     208             :                                 strVal(elem->arg), elem->defname)));
     209             :         }
     210         304 :         else if (strcmp(elem->defname, "force-binary") == 0)
     211             :         {
     212             :             bool        force_binary;
     213             : 
     214          12 :             if (elem->arg == NULL)
     215           0 :                 continue;
     216          12 :             else if (!parse_bool(strVal(elem->arg), &force_binary))
     217           0 :                 ereport(ERROR,
     218             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     219             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     220             :                                 strVal(elem->arg), elem->defname)));
     221             : 
     222          12 :             if (force_binary)
     223           4 :                 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
     224             :         }
     225         292 :         else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
     226             :         {
     227             : 
     228         266 :             if (elem->arg == NULL)
     229           0 :                 data->skip_empty_xacts = true;
     230         266 :             else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
     231           0 :                 ereport(ERROR,
     232             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     233             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     234             :                                 strVal(elem->arg), elem->defname)));
     235             :         }
     236          26 :         else if (strcmp(elem->defname, "only-local") == 0)
     237             :         {
     238             : 
     239           6 :             if (elem->arg == NULL)
     240           0 :                 data->only_local = true;
     241           6 :             else if (!parse_bool(strVal(elem->arg), &data->only_local))
     242           0 :                 ereport(ERROR,
     243             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     244             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     245             :                                 strVal(elem->arg), elem->defname)));
     246             :         }
     247          20 :         else if (strcmp(elem->defname, "include-rewrites") == 0)
     248             :         {
     249             : 
     250           2 :             if (elem->arg == NULL)
     251           0 :                 continue;
     252           2 :             else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
     253           0 :                 ereport(ERROR,
     254             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     255             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     256             :                                 strVal(elem->arg), elem->defname)));
     257             :         }
     258          18 :         else if (strcmp(elem->defname, "stream-changes") == 0)
     259             :         {
     260          16 :             if (elem->arg == NULL)
     261           0 :                 continue;
     262          16 :             else if (!parse_bool(strVal(elem->arg), &enable_streaming))
     263           0 :                 ereport(ERROR,
     264             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     265             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     266             :                                 strVal(elem->arg), elem->defname)));
     267             :         }
     268             :         else
     269             :         {
     270           2 :             ereport(ERROR,
     271             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     272             :                      errmsg("option \"%s\" = \"%s\" is unknown",
     273             :                             elem->defname,
     274             :                             elem->arg ? strVal(elem->arg) : "(null)")));
     275             :         }
     276             :     }
     277             : 
     278         488 :     ctx->streaming &= enable_streaming;
     279         488 : }
     280             : 
     281             : /* cleanup this plugin's resources */
     282             : static void
     283         484 : pg_decode_shutdown(LogicalDecodingContext *ctx)
     284             : {
     285         484 :     TestDecodingData *data = ctx->output_plugin_private;
     286             : 
     287             :     /* clean up our own resources by deleting our private memory context */
     288         484 :     MemoryContextDelete(data->context);
     289         484 : }
     290             : 
     291             : /* BEGIN callback */
     292             : static void
     293         766 : pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     294             : {
     295         766 :     TestDecodingData *data = ctx->output_plugin_private;
     296             :     TestDecodingTxnData *txndata =
     297         766 :     MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
     298             : 
     299         766 :     txndata->xact_wrote_changes = false;
     300         766 :     txn->output_plugin_private = txndata;
     301             : 
     302         766 :     if (data->skip_empty_xacts)
     303         700 :         return;
     304             : 
     305          66 :     pg_output_begin(ctx, data, txn, true);
     306             : }
     307             : 
     308             : static void
     309         452 : pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
     310             : {
     311         452 :     OutputPluginPrepareWrite(ctx, last_write);
     312         452 :     if (data->include_xids)
     313          52 :         appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
     314             :     else
     315         400 :         appendStringInfoString(ctx->out, "BEGIN");
     316         452 :     OutputPluginWrite(ctx, last_write);
     317         452 : }
     318             : 
     319             : /* COMMIT callback */
     320             : static void
     321         766 : pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     322             :                      XLogRecPtr commit_lsn)
     323             : {
     324         766 :     TestDecodingData *data = ctx->output_plugin_private;
     325         766 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     326         766 :     bool        xact_wrote_changes = txndata->xact_wrote_changes;
     327             : 
     328         766 :     pfree(txndata);
     329         766 :     txn->output_plugin_private = NULL;
     330             : 
     331         766 :     if (data->skip_empty_xacts && !xact_wrote_changes)
     332         326 :         return;
     333             : 
     334         440 :     OutputPluginPrepareWrite(ctx, true);
     335         440 :     if (data->include_xids)
     336          52 :         appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
     337             :     else
     338         388 :         appendStringInfoString(ctx->out, "COMMIT");
     339             : 
     340         440 :     if (data->include_timestamp)
     341           2 :         appendStringInfo(ctx->out, " (at %s)",
     342             :                          timestamptz_to_str(txn->xact_time.commit_time));
     343             : 
     344         440 :     OutputPluginWrite(ctx, true);
     345             : }
     346             : 
     347             : /* BEGIN PREPARE callback */
     348             : static void
     349          12 : pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     350             : {
     351          12 :     TestDecodingData *data = ctx->output_plugin_private;
     352             :     TestDecodingTxnData *txndata =
     353          12 :     MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
     354             : 
     355          12 :     txndata->xact_wrote_changes = false;
     356          12 :     txn->output_plugin_private = txndata;
     357             : 
     358          12 :     if (data->skip_empty_xacts)
     359          12 :         return;
     360             : 
     361           0 :     pg_output_begin(ctx, data, txn, true);
     362             : }
     363             : 
     364             : /* PREPARE callback */
     365             : static void
     366          12 : pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     367             :                       XLogRecPtr prepare_lsn)
     368             : {
     369          12 :     TestDecodingData *data = ctx->output_plugin_private;
     370          12 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     371             : 
     372          12 :     if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
     373           0 :         return;
     374             : 
     375          12 :     OutputPluginPrepareWrite(ctx, true);
     376             : 
     377          12 :     appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
     378          12 :                      quote_literal_cstr(txn->gid));
     379             : 
     380          12 :     if (data->include_xids)
     381           0 :         appendStringInfo(ctx->out, ", txid %u", txn->xid);
     382             : 
     383          12 :     if (data->include_timestamp)
     384           0 :         appendStringInfo(ctx->out, " (at %s)",
     385             :                          timestamptz_to_str(txn->xact_time.prepare_time));
     386             : 
     387          12 :     OutputPluginWrite(ctx, true);
     388             : }
     389             : 
     390             : /* COMMIT PREPARED callback */
     391             : static void
     392          12 : pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     393             :                               XLogRecPtr commit_lsn)
     394             : {
     395          12 :     TestDecodingData *data = ctx->output_plugin_private;
     396             : 
     397          12 :     OutputPluginPrepareWrite(ctx, true);
     398             : 
     399          12 :     appendStringInfo(ctx->out, "COMMIT PREPARED %s",
     400          12 :                      quote_literal_cstr(txn->gid));
     401             : 
     402          12 :     if (data->include_xids)
     403           0 :         appendStringInfo(ctx->out, ", txid %u", txn->xid);
     404             : 
     405          12 :     if (data->include_timestamp)
     406           0 :         appendStringInfo(ctx->out, " (at %s)",
     407             :                          timestamptz_to_str(txn->xact_time.commit_time));
     408             : 
     409          12 :     OutputPluginWrite(ctx, true);
     410          12 : }
     411             : 
     412             : /* ROLLBACK PREPARED callback */
     413             : static void
     414           2 : pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
     415             :                                 ReorderBufferTXN *txn,
     416             :                                 XLogRecPtr prepare_end_lsn,
     417             :                                 TimestampTz prepare_time)
     418             : {
     419           2 :     TestDecodingData *data = ctx->output_plugin_private;
     420             : 
     421           2 :     OutputPluginPrepareWrite(ctx, true);
     422             : 
     423           2 :     appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
     424           2 :                      quote_literal_cstr(txn->gid));
     425             : 
     426           2 :     if (data->include_xids)
     427           0 :         appendStringInfo(ctx->out, ", txid %u", txn->xid);
     428             : 
     429           2 :     if (data->include_timestamp)
     430           0 :         appendStringInfo(ctx->out, " (at %s)",
     431             :                          timestamptz_to_str(txn->xact_time.commit_time));
     432             : 
     433           2 :     OutputPluginWrite(ctx, true);
     434           2 : }
     435             : 
     436             : /*
     437             :  * Filter out two-phase transactions.
     438             :  *
     439             :  * Each plugin can implement its own filtering logic. Here we demonstrate a
     440             :  * simple logic by checking the GID. If the GID contains the "_nodecode"
     441             :  * substring, then we filter it out.
     442             :  */
     443             : static bool
     444         230 : pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
     445             :                          const char *gid)
     446             : {
     447         230 :     if (strstr(gid, "_nodecode") != NULL)
     448          16 :         return true;
     449             : 
     450         214 :     return false;
     451             : }
     452             : 
     453             : static bool
     454     2388300 : pg_decode_filter(LogicalDecodingContext *ctx,
     455             :                  RepOriginId origin_id)
     456             : {
     457     2388300 :     TestDecodingData *data = ctx->output_plugin_private;
     458             : 
     459     2388300 :     if (data->only_local && origin_id != InvalidRepOriginId)
     460          18 :         return true;
     461     2388282 :     return false;
     462             : }
     463             : 
     464             : /*
     465             :  * Append literal `outputstr', the string form of a value of type `typid',
     466             :  * to stringbuf `s'.
     467             :  *
     468             :  * Numeric and OID values are emitted unquoted, bit strings as B'...', and
     469             :  * booleans as true/false; everything else is single-quoted, with escaping
     470             :  * done as if standard_conforming_strings were enabled.
     470             :  */
     471             : static void
     472      351472 : print_literal(StringInfo s, Oid typid, char *outputstr)
     473             : {
     474             :     const char *valptr;
     475             : 
     476      351472 :     switch (typid)
     477             :     {
     478      120192 :         case INT2OID:
     479             :         case INT4OID:
     480             :         case INT8OID:
     481             :         case OIDOID:
     482             :         case FLOAT4OID:
     483             :         case FLOAT8OID:
     484             :         case NUMERICOID:
     485             :             /* NB: We don't care about Inf, NaN et al. */
     486      120192 :             appendStringInfoString(s, outputstr);
     487      120192 :             break;
     488             : 
     489           0 :         case BITOID:
     490             :         case VARBITOID:
     491           0 :             appendStringInfo(s, "B'%s'", outputstr);
     492           0 :             break;
     493             : 
     494           0 :         case BOOLOID:
     495           0 :             if (strcmp(outputstr, "t") == 0)
     496           0 :                 appendStringInfoString(s, "true");
     497             :             else
     498           0 :                 appendStringInfoString(s, "false");
     499           0 :             break;
     500             : 
     501      231280 :         default:
     502      231280 :             appendStringInfoChar(s, '\'');
     503    10834060 :             for (valptr = outputstr; *valptr; valptr++)
     504             :             {
     505    10602780 :                 char        ch = *valptr;
     506             : 
     507    10602780 :                 if (SQL_STR_DOUBLE(ch, false))
     508         128 :                     appendStringInfoChar(s, ch);
     509    10602780 :                 appendStringInfoChar(s, ch);
     510             :             }
     511      231280 :             appendStringInfoChar(s, '\'');
     512      231280 :             break;
     513             :     }
     514      351472 : }
     515             : 
     516             : /* print the tuple 'tuple' into the StringInfo s */
     517             : static void
     518      290878 : tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
     519             : {
     520             :     int         natt;
     521             : 
     522             :     /* print all columns individually */
     523      693670 :     for (natt = 0; natt < tupdesc->natts; natt++)
     524             :     {
     525             :         Form_pg_attribute attr; /* the attribute itself */
     526             :         Oid         typid;      /* type of current attribute */
     527             :         Oid         typoutput;  /* output function */
     528             :         bool        typisvarlena;
     529             :         Datum       origval;    /* possibly toasted Datum */
     530             :         bool        isnull;     /* column is null? */
     531             : 
     532      402792 :         attr = TupleDescAttr(tupdesc, natt);
     533             : 
     534             :         /*
     535             :          * don't print dropped columns, we can't be sure everything is
     536             :          * available for them
     537             :          */
     538      402792 :         if (attr->attisdropped)
     539       10154 :             continue;
     540             : 
     541             :         /*
     542             :          * Don't print system columns, oid will already have been printed if
     543             :          * present.
     544             :          */
     545      402700 :         if (attr->attnum < 0)
     546           0 :             continue;
     547             : 
     548      402700 :         typid = attr->atttypid;
     549             : 
     550             :         /* get Datum from tuple */
     551      402700 :         origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
     552             : 
     553      402700 :         if (isnull && skip_nulls)
     554       10062 :             continue;
     555             : 
     556             :         /* print attribute name */
     557      392638 :         appendStringInfoChar(s, ' ');
     558      392638 :         appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
     559             : 
     560             :         /* print attribute type */
     561      392638 :         appendStringInfoChar(s, '[');
     562      392638 :         appendStringInfoString(s, format_type_be(typid));
     563      392638 :         appendStringInfoChar(s, ']');
     564             : 
     565             :         /* query output function */
     566      392638 :         getTypeOutputInfo(typid,
     567             :                           &typoutput, &typisvarlena);
     568             : 
     569             :         /* print separator */
     570      392638 :         appendStringInfoChar(s, ':');
     571             : 
     572             :         /* print data */
     573      392638 :         if (isnull)
     574       41142 :             appendStringInfoString(s, "null");
     575      351496 :         else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
     576          24 :             appendStringInfoString(s, "unchanged-toast-datum");
     577      351472 :         else if (!typisvarlena)
     578      120200 :             print_literal(s, typid,
     579             :                           OidOutputFunctionCall(typoutput, origval));
     580             :         else
     581             :         {
     582             :             Datum       val;    /* definitely detoasted Datum */
     583             : 
     584      231272 :             val = PointerGetDatum(PG_DETOAST_DATUM(origval));
     585      231272 :             print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
     586             :         }
     587             :     }
     588      290878 : }
     589             : 
     590             : /*
     591             :  * callback for individual changed tuples
     592             :  */
     593             : static void
     594      300858 : pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     595             :                  Relation relation, ReorderBufferChange *change)
     596             : {
     597             :     TestDecodingData *data;
     598             :     TestDecodingTxnData *txndata;
     599             :     Form_pg_class class_form;
     600             :     TupleDesc   tupdesc;
     601             :     MemoryContext old;
     602             : 
     603      300858 :     data = ctx->output_plugin_private;
     604      300858 :     txndata = txn->output_plugin_private;
     605             : 
     606             :     /* output BEGIN if we haven't yet */
     607      300858 :     if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
     608             :     {
     609         380 :         pg_output_begin(ctx, data, txn, false);
     610             :     }
     611      300858 :     txndata->xact_wrote_changes = true;
     612             : 
     613      300858 :     class_form = RelationGetForm(relation);
     614      300858 :     tupdesc = RelationGetDescr(relation);
     615             : 
     616             :     /* Avoid leaking memory by using and resetting our own context */
     617      300858 :     old = MemoryContextSwitchTo(data->context);
     618             : 
     619      300858 :     OutputPluginPrepareWrite(ctx, true);
     620             : 
     621      300858 :     appendStringInfoString(ctx->out, "table ");
     622      300858 :     appendStringInfoString(ctx->out,
     623      300858 :                            quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
     624      300858 :                                                       class_form->relrewrite ?
     625           2 :                                                       get_rel_name(class_form->relrewrite) :
     626             :                                                       NameStr(class_form->relname)));
     627      300858 :     appendStringInfoChar(ctx->out, ':');
     628             : 
     629      300858 :     switch (change->action)
     630             :     {
     631      265784 :         case REORDER_BUFFER_CHANGE_INSERT:
     632      265784 :             appendStringInfoString(ctx->out, " INSERT:");
     633      265784 :             if (change->data.tp.newtuple == NULL)
     634           0 :                 appendStringInfoString(ctx->out, " (no-tuple-data)");
     635             :             else
     636      265784 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     637      265784 :                                     &change->data.tp.newtuple->tuple,
     638             :                                     false);
     639      265784 :             break;
     640       15042 :         case REORDER_BUFFER_CHANGE_UPDATE:
     641       15042 :             appendStringInfoString(ctx->out, " UPDATE:");
     642       15042 :             if (change->data.tp.oldtuple != NULL)
     643             :             {
     644          34 :                 appendStringInfoString(ctx->out, " old-key:");
     645          34 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     646          34 :                                     &change->data.tp.oldtuple->tuple,
     647             :                                     true);
     648          34 :                 appendStringInfoString(ctx->out, " new-tuple:");
     649             :             }
     650             : 
     651       15042 :             if (change->data.tp.newtuple == NULL)
     652           0 :                 appendStringInfoString(ctx->out, " (no-tuple-data)");
     653             :             else
     654       15042 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     655       15042 :                                     &change->data.tp.newtuple->tuple,
     656             :                                     false);
     657       15042 :             break;
     658       20032 :         case REORDER_BUFFER_CHANGE_DELETE:
     659       20032 :             appendStringInfoString(ctx->out, " DELETE:");
     660             : 
     661             :             /* if there was no PK, we only know that a delete happened */
     662       20032 :             if (change->data.tp.oldtuple == NULL)
     663       10014 :                 appendStringInfoString(ctx->out, " (no-tuple-data)");
     664             :             /* In DELETE, only the replica identity is present; display that */
     665             :             else
     666       10018 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     667       10018 :                                     &change->data.tp.oldtuple->tuple,
     668             :                                     true);
     669       20032 :             break;
     670      300858 :         default:
     671             :             Assert(false);
     672             :     }
     673             : 
     674      300858 :     MemoryContextSwitchTo(old);
     675      300858 :     MemoryContextReset(data->context);
     676             : 
     677      300858 :     OutputPluginWrite(ctx, true);
     678      300858 : }
     679             : 
     680             : static void
     681           6 : pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     682             :                    int nrelations, Relation relations[], ReorderBufferChange *change)
     683             : {
     684             :     TestDecodingData *data;
     685             :     TestDecodingTxnData *txndata;
     686             :     MemoryContext old;
     687             :     int         i;
     688             : 
     689           6 :     data = ctx->output_plugin_private;
     690           6 :     txndata = txn->output_plugin_private;
     691             : 
     692             :     /* output BEGIN if we haven't yet */
     693           6 :     if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
     694             :     {
     695           6 :         pg_output_begin(ctx, data, txn, false);
     696             :     }
     697           6 :     txndata->xact_wrote_changes = true;
     698             : 
     699             :     /* Avoid leaking memory by using and resetting our own context */
     700           6 :     old = MemoryContextSwitchTo(data->context);
     701             : 
     702           6 :     OutputPluginPrepareWrite(ctx, true);
     703             : 
     704           6 :     appendStringInfoString(ctx->out, "table ");
     705             : 
     706          14 :     for (i = 0; i < nrelations; i++)
     707             :     {
     708           8 :         if (i > 0)
     709           2 :             appendStringInfoString(ctx->out, ", ");
     710             : 
     711           8 :         appendStringInfoString(ctx->out,
     712           8 :                                quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
     713           8 :                                                           NameStr(relations[i]->rd_rel->relname)));
     714             :     }
     715             : 
     716           6 :     appendStringInfoString(ctx->out, ": TRUNCATE:");
     717             : 
     718           6 :     if (change->data.truncate.restart_seqs
     719           4 :         || change->data.truncate.cascade)
     720             :     {
     721           2 :         if (change->data.truncate.restart_seqs)
     722           2 :             appendStringInfoString(ctx->out, " restart_seqs");
     723           4 :         if (change->data.truncate.cascade)
     724           2 :             appendStringInfoString(ctx->out, " cascade");
     725             :     }
     726             :     else
     727           4 :         appendStringInfoString(ctx->out, " (no-flags)");
     728             : 
     729           6 :     MemoryContextSwitchTo(old);
     730           6 :     MemoryContextReset(data->context);
     731             : 
     732           6 :     OutputPluginWrite(ctx, true);
     733           6 : }
     734             : 
     735             : static void
     736          16 : pg_decode_message(LogicalDecodingContext *ctx,
     737             :                   ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
     738             :                   const char *prefix, Size sz, const char *message)
     739             : {
     740          16 :     OutputPluginPrepareWrite(ctx, true);
     741          16 :     appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
     742             :                      transactional, prefix, sz);
     743          16 :     appendBinaryStringInfo(ctx->out, message, sz);
     744          16 :     OutputPluginWrite(ctx, true);
     745          16 : }
     746             : 
     747             : static void
     748          22 : pg_decode_stream_start(LogicalDecodingContext *ctx,
     749             :                        ReorderBufferTXN *txn)
     750             : {
     751          22 :     TestDecodingData *data = ctx->output_plugin_private;
     752          22 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     753             : 
     754             :     /*
     755             :      * Allocate the txn plugin data for the first stream in the transaction.
     756             :      */
     757          22 :     if (txndata == NULL)
     758             :     {
     759             :         txndata =
     760          14 :             MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
     761          14 :         txndata->xact_wrote_changes = false;
     762          14 :         txn->output_plugin_private = txndata;
     763             :     }
     764             : 
     765          22 :     txndata->stream_wrote_changes = false;
     766          22 :     if (data->skip_empty_xacts)
     767          22 :         return;
     768           0 :     pg_output_stream_start(ctx, data, txn, true);
     769             : }
     770             : 
     771             : static void
     772          12 : pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
     773             : {
     774          12 :     OutputPluginPrepareWrite(ctx, last_write);
     775          12 :     if (data->include_xids)
     776           0 :         appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
     777             :     else
     778          12 :         appendStringInfoString(ctx->out, "opening a streamed block for transaction");
     779          12 :     OutputPluginWrite(ctx, last_write);
     780          12 : }
     781             : 
     782             : static void
     783          22 : pg_decode_stream_stop(LogicalDecodingContext *ctx,
     784             :                       ReorderBufferTXN *txn)
     785             : {
     786          22 :     TestDecodingData *data = ctx->output_plugin_private;
     787          22 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     788             : 
     789          22 :     if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
     790          10 :         return;
     791             : 
     792          12 :     OutputPluginPrepareWrite(ctx, true);
     793          12 :     if (data->include_xids)
     794           0 :         appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
     795             :     else
     796          12 :         appendStringInfoString(ctx->out, "closing a streamed block for transaction");
     797          12 :     OutputPluginWrite(ctx, true);
     798             : }
     799             : 
     800             : static void
     801           6 : pg_decode_stream_abort(LogicalDecodingContext *ctx,
     802             :                        ReorderBufferTXN *txn,
     803             :                        XLogRecPtr abort_lsn)
     804             : {
     805           6 :     TestDecodingData *data = ctx->output_plugin_private;
     806             : 
     807             :     /*
     808             :      * stream abort can be sent for an individual subtransaction but we
     809             :      * maintain the output_plugin_private only under the toptxn so if this is
     810             :      * not the toptxn then fetch the toptxn.
     811             :      */
     812           6 :     ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn;
     813           6 :     TestDecodingTxnData *txndata = toptxn->output_plugin_private;
     814           6 :     bool        xact_wrote_changes = txndata->xact_wrote_changes;
     815             : 
     816           6 :     if (txn->toptxn == NULL)
     817             :     {
     818             :         Assert(txn->output_plugin_private != NULL);
     819           0 :         pfree(txndata);
     820           0 :         txn->output_plugin_private = NULL;
     821             :     }
     822             : 
     823           6 :     if (data->skip_empty_xacts && !xact_wrote_changes)
     824           6 :         return;
     825             : 
     826           0 :     OutputPluginPrepareWrite(ctx, true);
     827           0 :     if (data->include_xids)
     828           0 :         appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
     829             :     else
     830           0 :         appendStringInfoString(ctx->out, "aborting streamed (sub)transaction");
     831           0 :     OutputPluginWrite(ctx, true);
     832             : }
     833             : 
     834             : static void
     835           2 : pg_decode_stream_prepare(LogicalDecodingContext *ctx,
     836             :                          ReorderBufferTXN *txn,
     837             :                          XLogRecPtr prepare_lsn)
     838             : {
     839           2 :     TestDecodingData *data = ctx->output_plugin_private;
     840           2 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     841             : 
     842           2 :     if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
     843           0 :         return;
     844             : 
     845           2 :     OutputPluginPrepareWrite(ctx, true);
     846             : 
     847           2 :     if (data->include_xids)
     848           0 :         appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u",
     849           0 :                          quote_literal_cstr(txn->gid), txn->xid);
     850             :     else
     851           2 :         appendStringInfo(ctx->out, "preparing streamed transaction %s",
     852           2 :                          quote_literal_cstr(txn->gid));
     853             : 
     854           2 :     if (data->include_timestamp)
     855           0 :         appendStringInfo(ctx->out, " (at %s)",
     856             :                          timestamptz_to_str(txn->xact_time.prepare_time));
     857             : 
     858           2 :     OutputPluginWrite(ctx, true);
     859             : }
     860             : 
     861             : static void
     862           8 : pg_decode_stream_commit(LogicalDecodingContext *ctx,
     863             :                         ReorderBufferTXN *txn,
     864             :                         XLogRecPtr commit_lsn)
     865             : {
     866           8 :     TestDecodingData *data = ctx->output_plugin_private;
     867           8 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     868           8 :     bool        xact_wrote_changes = txndata->xact_wrote_changes;
     869             : 
     870           8 :     pfree(txndata);
     871           8 :     txn->output_plugin_private = NULL;
     872             : 
     873           8 :     if (data->skip_empty_xacts && !xact_wrote_changes)
     874           0 :         return;
     875             : 
     876           8 :     OutputPluginPrepareWrite(ctx, true);
     877             : 
     878           8 :     if (data->include_xids)
     879           0 :         appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
     880             :     else
     881           8 :         appendStringInfoString(ctx->out, "committing streamed transaction");
     882             : 
     883           8 :     if (data->include_timestamp)
     884           0 :         appendStringInfo(ctx->out, " (at %s)",
     885             :                          timestamptz_to_str(txn->xact_time.commit_time));
     886             : 
     887           8 :     OutputPluginWrite(ctx, true);
     888             : }
     889             : 
     890             : /*
     891             :  * In streaming mode, we don't display the changes as the transaction can abort
     892             :  * at a later point in time.  We don't want users to see the changes until the
     893             :  * transaction is committed.
     894             :  */
     895             : static void
     896         126 : pg_decode_stream_change(LogicalDecodingContext *ctx,
     897             :                         ReorderBufferTXN *txn,
     898             :                         Relation relation,
     899             :                         ReorderBufferChange *change)
     900             : {
     901         126 :     TestDecodingData *data = ctx->output_plugin_private;
     902         126 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     903             : 
     904             :     /* output stream start if we haven't yet */
     905         126 :     if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
     906             :     {
     907          12 :         pg_output_stream_start(ctx, data, txn, false);
     908             :     }
     909         126 :     txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
     910             : 
     911         126 :     OutputPluginPrepareWrite(ctx, true);
     912         126 :     if (data->include_xids)
     913           0 :         appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
     914             :     else
     915         126 :         appendStringInfoString(ctx->out, "streaming change for transaction");
     916         126 :     OutputPluginWrite(ctx, true);
     917         126 : }
     918             : 
     919             : /*
     920             :  * In streaming mode, we don't display the contents for transactional messages
     921             :  * as the transaction can abort at a later point in time.  We don't want users to
     922             :  * see the message contents until the transaction is committed.
     923             :  */
     924             : static void
     925           6 : pg_decode_stream_message(LogicalDecodingContext *ctx,
     926             :                          ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
     927             :                          const char *prefix, Size sz, const char *message)
     928             : {
     929           6 :     OutputPluginPrepareWrite(ctx, true);
     930             : 
     931           6 :     if (transactional)
     932             :     {
     933           6 :         appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
     934             :                          transactional, prefix, sz);
     935             :     }
     936             :     else
     937             :     {
     938           0 :         appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
     939             :                          transactional, prefix, sz);
     940           0 :         appendBinaryStringInfo(ctx->out, message, sz);
     941             :     }
     942             : 
     943           6 :     OutputPluginWrite(ctx, true);
     944           6 : }
     945             : 
     946             : /*
     947             :  * In streaming mode, we don't display the detailed information of Truncate.
     948             :  * See pg_decode_stream_change.
     949             :  */
     950             : static void
     951           0 : pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     952             :                           int nrelations, Relation relations[],
     953             :                           ReorderBufferChange *change)
     954             : {
     955           0 :     TestDecodingData *data = ctx->output_plugin_private;
     956           0 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     957             : 
     958           0 :     if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
     959             :     {
     960           0 :         pg_output_stream_start(ctx, data, txn, false);
     961             :     }
     962           0 :     txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
     963             : 
     964           0 :     OutputPluginPrepareWrite(ctx, true);
     965           0 :     if (data->include_xids)
     966           0 :         appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
     967             :     else
     968           0 :         appendStringInfoString(ctx->out, "streaming truncate for transaction");
     969           0 :     OutputPluginWrite(ctx, true);
     970           0 : }

Generated by: LCOV version 1.13