LCOV - code coverage report
Current view: top level - contrib/test_decoding - test_decoding.c (source / functions) Hit Total Coverage
Test: PostgreSQL 14devel Lines: 281 333 84.4 %
Date: 2020-11-27 11:06:40 Functions: 21 22 95.5 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * test_decoding.c
       4             :  *        example logical decoding output plugin
       5             :  *
       6             :  * Copyright (c) 2012-2020, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *        contrib/test_decoding/test_decoding.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #include "catalog/pg_type.h"
      16             : 
      17             : #include "replication/logical.h"
      18             : #include "replication/origin.h"
      19             : 
      20             : #include "utils/builtins.h"
      21             : #include "utils/lsyscache.h"
      22             : #include "utils/memutils.h"
      23             : #include "utils/rel.h"
      24             : 
      25          98 : PG_MODULE_MAGIC;
      26             : 
      27             : /* These must be available to dlsym() */
      28             : extern void _PG_init(void);
      29             : extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
      30             : 
      31             : typedef struct
      32             : {
      33             :     MemoryContext context;
      34             :     bool        include_xids;
      35             :     bool        include_timestamp;
      36             :     bool        skip_empty_xacts;
      37             :     bool        only_local;
      38             : } TestDecodingData;
      39             : 
      40             : /*
      41             :  * Maintain the per-transaction level variables to track whether the
      42             :  * transaction and/or its streams have written any changes. In streaming mode the
      43             :  * transaction can be decoded in streams so along with maintaining whether the
      44             :  * transaction has written any changes, we also need to track whether the
      45             :  * current stream has written any changes. This is required so that if user
      46             :  * has requested to skip the empty transactions we can skip the empty streams
      47             :  * even though the transaction has written some changes.
      48             :  */
      49             : typedef struct
      50             : {
      51             :     bool        xact_wrote_changes;
      52             :     bool        stream_wrote_changes;
      53             : } TestDecodingTxnData;
      54             : 
      55             : static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
      56             :                               bool is_init);
      57             : static void pg_decode_shutdown(LogicalDecodingContext *ctx);
      58             : static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
      59             :                                 ReorderBufferTXN *txn);
      60             : static void pg_output_begin(LogicalDecodingContext *ctx,
      61             :                             TestDecodingData *data,
      62             :                             ReorderBufferTXN *txn,
      63             :                             bool last_write);
      64             : static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
      65             :                                  ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
      66             : static void pg_decode_change(LogicalDecodingContext *ctx,
      67             :                              ReorderBufferTXN *txn, Relation rel,
      68             :                              ReorderBufferChange *change);
      69             : static void pg_decode_truncate(LogicalDecodingContext *ctx,
      70             :                                ReorderBufferTXN *txn,
      71             :                                int nrelations, Relation relations[],
      72             :                                ReorderBufferChange *change);
      73             : static bool pg_decode_filter(LogicalDecodingContext *ctx,
      74             :                              RepOriginId origin_id);
      75             : static void pg_decode_message(LogicalDecodingContext *ctx,
      76             :                               ReorderBufferTXN *txn, XLogRecPtr message_lsn,
      77             :                               bool transactional, const char *prefix,
      78             :                               Size sz, const char *message);
      79             : static void pg_decode_stream_start(LogicalDecodingContext *ctx,
      80             :                                    ReorderBufferTXN *txn);
      81             : static void pg_output_stream_start(LogicalDecodingContext *ctx,
      82             :                                    TestDecodingData *data,
      83             :                                    ReorderBufferTXN *txn,
      84             :                                    bool last_write);
      85             : static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
      86             :                                   ReorderBufferTXN *txn);
      87             : static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
      88             :                                    ReorderBufferTXN *txn,
      89             :                                    XLogRecPtr abort_lsn);
      90             : static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
      91             :                                     ReorderBufferTXN *txn,
      92             :                                     XLogRecPtr commit_lsn);
      93             : static void pg_decode_stream_change(LogicalDecodingContext *ctx,
      94             :                                     ReorderBufferTXN *txn,
      95             :                                     Relation relation,
      96             :                                     ReorderBufferChange *change);
      97             : static void pg_decode_stream_message(LogicalDecodingContext *ctx,
      98             :                                      ReorderBufferTXN *txn, XLogRecPtr message_lsn,
      99             :                                      bool transactional, const char *prefix,
     100             :                                      Size sz, const char *message);
     101             : static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
     102             :                                       ReorderBufferTXN *txn,
     103             :                                       int nrelations, Relation relations[],
     104             :                                       ReorderBufferChange *change);
     105             : 
     106             : void
     107          98 : _PG_init(void)
     108             : {
     109             :     /* other plugins can perform things here */
     110          98 : }
     111             : 
     112             : /* specify output plugin callbacks */
     113             : void
     114         420 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
     115             : {
     116             :     AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
     117             : 
     118         420 :     cb->startup_cb = pg_decode_startup;
     119         420 :     cb->begin_cb = pg_decode_begin_txn;
     120         420 :     cb->change_cb = pg_decode_change;
     121         420 :     cb->truncate_cb = pg_decode_truncate;
     122         420 :     cb->commit_cb = pg_decode_commit_txn;
     123         420 :     cb->filter_by_origin_cb = pg_decode_filter;
     124         420 :     cb->shutdown_cb = pg_decode_shutdown;
     125         420 :     cb->message_cb = pg_decode_message;
     126         420 :     cb->stream_start_cb = pg_decode_stream_start;
     127         420 :     cb->stream_stop_cb = pg_decode_stream_stop;
     128         420 :     cb->stream_abort_cb = pg_decode_stream_abort;
     129         420 :     cb->stream_commit_cb = pg_decode_stream_commit;
     130         420 :     cb->stream_change_cb = pg_decode_stream_change;
     131         420 :     cb->stream_message_cb = pg_decode_stream_message;
     132         420 :     cb->stream_truncate_cb = pg_decode_stream_truncate;
     133         420 : }
     134             : 
     135             : 
     136             : /* initialize this plugin */
     137             : static void
     138         420 : pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
     139             :                   bool is_init)
     140             : {
     141             :     ListCell   *option;
     142             :     TestDecodingData *data;
     143         420 :     bool        enable_streaming = false;
     144             : 
     145         420 :     data = palloc0(sizeof(TestDecodingData));
     146         420 :     data->context = AllocSetContextCreate(ctx->context,
     147             :                                           "text conversion context",
     148             :                                           ALLOCSET_DEFAULT_SIZES);
     149         420 :     data->include_xids = true;
     150         420 :     data->include_timestamp = false;
     151         420 :     data->skip_empty_xacts = false;
     152         420 :     data->only_local = false;
     153             : 
     154         420 :     ctx->output_plugin_private = data;
     155             : 
     156         420 :     opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
     157         420 :     opt->receive_rewrites = false;
     158             : 
     159         868 :     foreach(option, ctx->output_plugin_options)
     160             :     {
     161         454 :         DefElem    *elem = lfirst(option);
     162             : 
     163             :         Assert(elem->arg == NULL || IsA(elem->arg, String));
     164             : 
     165         454 :         if (strcmp(elem->defname, "include-xids") == 0)
     166             :         {
     167             :             /* if option does not provide a value, it means its value is true */
     168         216 :             if (elem->arg == NULL)
     169           0 :                 data->include_xids = true;
     170         216 :             else if (!parse_bool(strVal(elem->arg), &data->include_xids))
     171           4 :                 ereport(ERROR,
     172             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     173             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     174             :                                 strVal(elem->arg), elem->defname)));
     175             :         }
     176         238 :         else if (strcmp(elem->defname, "include-timestamp") == 0)
     177             :         {
     178           2 :             if (elem->arg == NULL)
     179           0 :                 data->include_timestamp = true;
     180           2 :             else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
     181           0 :                 ereport(ERROR,
     182             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     183             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     184             :                                 strVal(elem->arg), elem->defname)));
     185             :         }
     186         236 :         else if (strcmp(elem->defname, "force-binary") == 0)
     187             :         {
     188             :             bool        force_binary;
     189             : 
     190          12 :             if (elem->arg == NULL)
     191           0 :                 continue;
     192          12 :             else if (!parse_bool(strVal(elem->arg), &force_binary))
     193           0 :                 ereport(ERROR,
     194             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     195             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     196             :                                 strVal(elem->arg), elem->defname)));
     197             : 
     198          12 :             if (force_binary)
     199           4 :                 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
     200             :         }
     201         224 :         else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
     202             :         {
     203             : 
     204         208 :             if (elem->arg == NULL)
     205           0 :                 data->skip_empty_xacts = true;
     206         208 :             else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
     207           0 :                 ereport(ERROR,
     208             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     209             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     210             :                                 strVal(elem->arg), elem->defname)));
     211             :         }
     212          16 :         else if (strcmp(elem->defname, "only-local") == 0)
     213             :         {
     214             : 
     215           6 :             if (elem->arg == NULL)
     216           0 :                 data->only_local = true;
     217           6 :             else if (!parse_bool(strVal(elem->arg), &data->only_local))
     218           0 :                 ereport(ERROR,
     219             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     220             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     221             :                                 strVal(elem->arg), elem->defname)));
     222             :         }
     223          10 :         else if (strcmp(elem->defname, "include-rewrites") == 0)
     224             :         {
     225             : 
     226           2 :             if (elem->arg == NULL)
     227           0 :                 continue;
     228           2 :             else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
     229           0 :                 ereport(ERROR,
     230             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     231             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     232             :                                 strVal(elem->arg), elem->defname)));
     233             :         }
     234           8 :         else if (strcmp(elem->defname, "stream-changes") == 0)
     235             :         {
     236           6 :             if (elem->arg == NULL)
     237           0 :                 continue;
     238           6 :             else if (!parse_bool(strVal(elem->arg), &enable_streaming))
     239           0 :                 ereport(ERROR,
     240             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     241             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     242             :                                 strVal(elem->arg), elem->defname)));
     243             :         }
     244             :         else
     245             :         {
     246           2 :             ereport(ERROR,
     247             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     248             :                      errmsg("option \"%s\" = \"%s\" is unknown",
     249             :                             elem->defname,
     250             :                             elem->arg ? strVal(elem->arg) : "(null)")));
     251             :         }
     252             :     }
     253             : 
     254         414 :     ctx->streaming &= enable_streaming;
     255         414 : }
     256             : 
     257             : /* cleanup this plugin's resources */
     258             : static void
     259         410 : pg_decode_shutdown(LogicalDecodingContext *ctx)
     260             : {
     261         410 :     TestDecodingData *data = ctx->output_plugin_private;
     262             : 
     263             :     /* cleanup our own resources by deleting our private memory context */
     264         410 :     MemoryContextDelete(data->context);
     265         410 : }
     266             : 
     267             : /* BEGIN callback */
     268             : static void
     269         732 : pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     270             : {
     271         732 :     TestDecodingData *data = ctx->output_plugin_private;
     272             :     TestDecodingTxnData *txndata =
     273         732 :     MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
     274             : 
     275         732 :     txndata->xact_wrote_changes = false;
     276         732 :     txn->output_plugin_private = txndata;
     277             : 
     278         732 :     if (data->skip_empty_xacts)
     279         666 :         return;
     280             : 
     281          66 :     pg_output_begin(ctx, data, txn, true);
     282             : }
     283             : 
     284             : static void
     285         418 : pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
     286             : {
     287         418 :     OutputPluginPrepareWrite(ctx, last_write);
     288         418 :     if (data->include_xids)
     289          52 :         appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
     290             :     else
     291         366 :         appendStringInfoString(ctx->out, "BEGIN");
     292         418 :     OutputPluginWrite(ctx, last_write);
     293         418 : }
     294             : 
     295             : /* COMMIT callback */
     296             : static void
     297         732 : pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     298             :                      XLogRecPtr commit_lsn)
     299             : {
     300         732 :     TestDecodingData *data = ctx->output_plugin_private;
     301         732 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     302         732 :     bool        xact_wrote_changes = txndata->xact_wrote_changes;
     303             : 
     304         732 :     pfree(txndata);
     305         732 :     txn->output_plugin_private = NULL;
     306             : 
     307         732 :     if (data->skip_empty_xacts && !xact_wrote_changes)
     308         314 :         return;
     309             : 
     310         418 :     OutputPluginPrepareWrite(ctx, true);
     311         418 :     if (data->include_xids)
     312          52 :         appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
     313             :     else
     314         366 :         appendStringInfoString(ctx->out, "COMMIT");
     315             : 
     316         418 :     if (data->include_timestamp)
     317           2 :         appendStringInfo(ctx->out, " (at %s)",
     318             :                          timestamptz_to_str(txn->commit_time));
     319             : 
     320         418 :     OutputPluginWrite(ctx, true);
     321             : }
     322             : 
     323             : static bool
     324     2374474 : pg_decode_filter(LogicalDecodingContext *ctx,
     325             :                  RepOriginId origin_id)
     326             : {
     327     2374474 :     TestDecodingData *data = ctx->output_plugin_private;
     328             : 
     329     2374474 :     if (data->only_local && origin_id != InvalidRepOriginId)
     330          18 :         return true;
     331     2374456 :     return false;
     332             : }
     333             : 
     334             : /*
     335             :  * Print literal `outputstr' already represented as string of type `typid'
     336             :  * into stringbuf `s'.
     337             :  *
     338             :  * Some builtin types aren't quoted; the rest are quoted. Escaping is done as
     339             :  * if standard_conforming_strings were enabled.
     340             :  */
     341             : static void
     342      361352 : print_literal(StringInfo s, Oid typid, char *outputstr)
     343             : {
     344             :     const char *valptr;
     345             : 
     346      361352 :     switch (typid)
     347             :     {
     348      120122 :         case INT2OID:
     349             :         case INT4OID:
     350             :         case INT8OID:
     351             :         case OIDOID:
     352             :         case FLOAT4OID:
     353             :         case FLOAT8OID:
     354             :         case NUMERICOID:
     355             :             /* NB: We don't care about Inf, NaN et al. */
     356      120122 :             appendStringInfoString(s, outputstr);
     357      120122 :             break;
     358             : 
     359           0 :         case BITOID:
     360             :         case VARBITOID:
     361           0 :             appendStringInfo(s, "B'%s'", outputstr);
     362           0 :             break;
     363             : 
     364           0 :         case BOOLOID:
     365           0 :             if (strcmp(outputstr, "t") == 0)
     366           0 :                 appendStringInfoString(s, "true");
     367             :             else
     368           0 :                 appendStringInfoString(s, "false");
     369           0 :             break;
     370             : 
     371      241230 :         default:
     372      241230 :             appendStringInfoChar(s, '\'');
     373    11073284 :             for (valptr = outputstr; *valptr; valptr++)
     374             :             {
     375    10832054 :                 char        ch = *valptr;
     376             : 
     377    10832054 :                 if (SQL_STR_DOUBLE(ch, false))
     378         128 :                     appendStringInfoChar(s, ch);
     379    10832054 :                 appendStringInfoChar(s, ch);
     380             :             }
     381      241230 :             appendStringInfoChar(s, '\'');
     382      241230 :             break;
     383             :     }
     384      361352 : }
     385             : 
     386             : /* print the tuple 'tuple' into the StringInfo s */
     387             : static void
     388      300766 : tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
     389             : {
     390             :     int         natt;
     391             : 
     392             :     /* print all columns individually */
     393      713434 :     for (natt = 0; natt < tupdesc->natts; natt++)
     394             :     {
     395             :         Form_pg_attribute attr; /* the attribute itself */
     396             :         Oid         typid;      /* type of current attribute */
     397             :         Oid         typoutput;  /* output function */
     398             :         bool        typisvarlena;
     399             :         Datum       origval;    /* possibly toasted Datum */
     400             :         bool        isnull;     /* column is null? */
     401             : 
     402      412668 :         attr = TupleDescAttr(tupdesc, natt);
     403             : 
     404             :         /*
     405             :          * don't print dropped columns, we can't be sure everything is
     406             :          * available for them
     407             :          */
     408      412668 :         if (attr->attisdropped)
     409       10154 :             continue;
     410             : 
     411             :         /*
     412             :          * Don't print system columns, oid will already have been printed if
     413             :          * present.
     414             :          */
     415      412576 :         if (attr->attnum < 0)
     416           0 :             continue;
     417             : 
     418      412576 :         typid = attr->atttypid;
     419             : 
     420             :         /* get Datum from tuple */
     421      412576 :         origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
     422             : 
     423      412576 :         if (isnull && skip_nulls)
     424       10062 :             continue;
     425             : 
     426             :         /* print attribute name */
     427      402514 :         appendStringInfoChar(s, ' ');
     428      402514 :         appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
     429             : 
     430             :         /* print attribute type */
     431      402514 :         appendStringInfoChar(s, '[');
     432      402514 :         appendStringInfoString(s, format_type_be(typid));
     433      402514 :         appendStringInfoChar(s, ']');
     434             : 
     435             :         /* query output function */
     436      402514 :         getTypeOutputInfo(typid,
     437             :                           &typoutput, &typisvarlena);
     438             : 
     439             :         /* print separator */
     440      402514 :         appendStringInfoChar(s, ':');
     441             : 
     442             :         /* print data */
     443      402514 :         if (isnull)
     444       41138 :             appendStringInfoString(s, "null");
     445      361376 :         else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
     446          24 :             appendStringInfoString(s, "unchanged-toast-datum");
     447      361352 :         else if (!typisvarlena)
     448      120130 :             print_literal(s, typid,
     449             :                           OidOutputFunctionCall(typoutput, origval));
     450             :         else
     451             :         {
     452             :             Datum       val;    /* definitely detoasted Datum */
     453             : 
     454      241222 :             val = PointerGetDatum(PG_DETOAST_DATUM(origval));
     455      241222 :             print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
     456             :         }
     457             :     }
     458      300766 : }
     459             : 
     460             : /*
     461             :  * callback for individual changed tuples
     462             :  */
     463             : static void
     464      310746 : pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     465             :                  Relation relation, ReorderBufferChange *change)
     466             : {
     467             :     TestDecodingData *data;
     468             :     TestDecodingTxnData *txndata;
     469             :     Form_pg_class class_form;
     470             :     TupleDesc   tupdesc;
     471             :     MemoryContext old;
     472             : 
     473      310746 :     data = ctx->output_plugin_private;
     474      310746 :     txndata = txn->output_plugin_private;
     475             : 
     476             :     /* output BEGIN if we haven't yet */
     477      310746 :     if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
     478             :     {
     479         346 :         pg_output_begin(ctx, data, txn, false);
     480             :     }
     481      310746 :     txndata->xact_wrote_changes = true;
     482             : 
     483      310746 :     class_form = RelationGetForm(relation);
     484      310746 :     tupdesc = RelationGetDescr(relation);
     485             : 
     486             :     /* Avoid leaking memory by using and resetting our own context */
     487      310746 :     old = MemoryContextSwitchTo(data->context);
     488             : 
     489      310746 :     OutputPluginPrepareWrite(ctx, true);
     490             : 
     491      310746 :     appendStringInfoString(ctx->out, "table ");
     492      310746 :     appendStringInfoString(ctx->out,
     493      310746 :                            quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
     494      310746 :                                                       class_form->relrewrite ?
     495           2 :                                                       get_rel_name(class_form->relrewrite) :
     496             :                                                       NameStr(class_form->relname)));
     497      310746 :     appendStringInfoChar(ctx->out, ':');
     498             : 
     499      310746 :     switch (change->action)
     500             :     {
     501      275672 :         case REORDER_BUFFER_CHANGE_INSERT:
     502      275672 :             appendStringInfoString(ctx->out, " INSERT:");
     503      275672 :             if (change->data.tp.newtuple == NULL)
     504           0 :                 appendStringInfoString(ctx->out, " (no-tuple-data)");
     505             :             else
     506      275672 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     507      275672 :                                     &change->data.tp.newtuple->tuple,
     508             :                                     false);
     509      275672 :             break;
     510       15042 :         case REORDER_BUFFER_CHANGE_UPDATE:
     511       15042 :             appendStringInfoString(ctx->out, " UPDATE:");
     512       15042 :             if (change->data.tp.oldtuple != NULL)
     513             :             {
     514          34 :                 appendStringInfoString(ctx->out, " old-key:");
     515          34 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     516          34 :                                     &change->data.tp.oldtuple->tuple,
     517             :                                     true);
     518          34 :                 appendStringInfoString(ctx->out, " new-tuple:");
     519             :             }
     520             : 
     521       15042 :             if (change->data.tp.newtuple == NULL)
     522           0 :                 appendStringInfoString(ctx->out, " (no-tuple-data)");
     523             :             else
     524       15042 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     525       15042 :                                     &change->data.tp.newtuple->tuple,
     526             :                                     false);
     527       15042 :             break;
     528       20032 :         case REORDER_BUFFER_CHANGE_DELETE:
     529       20032 :             appendStringInfoString(ctx->out, " DELETE:");
     530             : 
     531             :             /* if there was no PK, we only know that a delete happened */
     532       20032 :             if (change->data.tp.oldtuple == NULL)
     533       10014 :                 appendStringInfoString(ctx->out, " (no-tuple-data)");
     534             :             /* In DELETE, only the replica identity is present; display that */
     535             :             else
     536       10018 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     537       10018 :                                     &change->data.tp.oldtuple->tuple,
     538             :                                     true);
     539       20032 :             break;
     540      310746 :         default:
     541             :             Assert(false);
     542             :     }
     543             : 
     544      310746 :     MemoryContextSwitchTo(old);
     545      310746 :     MemoryContextReset(data->context);
     546             : 
     547      310746 :     OutputPluginWrite(ctx, true);
     548      310746 : }
     549             : 
     550             : static void
     551           6 : pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     552             :                    int nrelations, Relation relations[], ReorderBufferChange *change)
     553             : {
     554             :     TestDecodingData *data;
     555             :     TestDecodingTxnData *txndata;
     556             :     MemoryContext old;
     557             :     int         i;
     558             : 
     559           6 :     data = ctx->output_plugin_private;
     560           6 :     txndata = txn->output_plugin_private;
     561             : 
     562             :     /* output BEGIN if we haven't yet */
     563           6 :     if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
     564             :     {
     565           6 :         pg_output_begin(ctx, data, txn, false);
     566             :     }
     567           6 :     txndata->xact_wrote_changes = true;
     568             : 
     569             :     /* Avoid leaking memory by using and resetting our own context */
     570           6 :     old = MemoryContextSwitchTo(data->context);
     571             : 
     572           6 :     OutputPluginPrepareWrite(ctx, true);
     573             : 
     574           6 :     appendStringInfoString(ctx->out, "table ");
     575             : 
     576          14 :     for (i = 0; i < nrelations; i++)
     577             :     {
     578           8 :         if (i > 0)
     579           2 :             appendStringInfoString(ctx->out, ", ");
     580             : 
     581           8 :         appendStringInfoString(ctx->out,
     582           8 :                                quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
     583           8 :                                                           NameStr(relations[i]->rd_rel->relname)));
     584             :     }
     585             : 
     586           6 :     appendStringInfoString(ctx->out, ": TRUNCATE:");
     587             : 
     588           6 :     if (change->data.truncate.restart_seqs
     589           4 :         || change->data.truncate.cascade)
     590             :     {
     591           2 :         if (change->data.truncate.restart_seqs)
     592           2 :             appendStringInfoString(ctx->out, " restart_seqs");
     593           4 :         if (change->data.truncate.cascade)
     594           2 :             appendStringInfoString(ctx->out, " cascade");
     595             :     }
     596             :     else
     597           4 :         appendStringInfoString(ctx->out, " (no-flags)");
     598             : 
     599           6 :     MemoryContextSwitchTo(old);
     600           6 :     MemoryContextReset(data->context);
     601             : 
     602           6 :     OutputPluginWrite(ctx, true);
     603           6 : }
     604             : 
     605             : static void
     606          16 : pg_decode_message(LogicalDecodingContext *ctx,
     607             :                   ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
     608             :                   const char *prefix, Size sz, const char *message)
     609             : {
     610          16 :     OutputPluginPrepareWrite(ctx, true);
     611          16 :     appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
     612             :                      transactional, prefix, sz);
     613          16 :     appendBinaryStringInfo(ctx->out, message, sz);
     614          16 :     OutputPluginWrite(ctx, true);
     615          16 : }
     616             : 
     617             : static void
     618          12 : pg_decode_stream_start(LogicalDecodingContext *ctx,
     619             :                        ReorderBufferTXN *txn)
     620             : {
     621          12 :     TestDecodingData *data = ctx->output_plugin_private;
     622          12 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     623             : 
     624             :     /*
     625             :      * Allocate the txn plugin data for the first stream in the transaction.
     626             :      */
     627          12 :     if (txndata == NULL)
     628             :     {
     629             :         txndata =
     630           8 :             MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
     631           8 :         txndata->xact_wrote_changes = false;
     632           8 :         txn->output_plugin_private = txndata;
     633             :     }
     634             : 
     635          12 :     txndata->stream_wrote_changes = false;
     636          12 :     if (data->skip_empty_xacts)
     637          12 :         return;
     638           0 :     pg_output_stream_start(ctx, data, txn, true);
     639             : }
     640             : 
     641             : static void
     642           6 : pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
     643             : {
     644           6 :     OutputPluginPrepareWrite(ctx, last_write);
     645           6 :     if (data->include_xids)
     646           0 :         appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
     647             :     else
     648           6 :         appendStringInfoString(ctx->out, "opening a streamed block for transaction");
     649           6 :     OutputPluginWrite(ctx, last_write);
     650           6 : }
     651             : 
     652             : static void
     653          12 : pg_decode_stream_stop(LogicalDecodingContext *ctx,
     654             :                       ReorderBufferTXN *txn)
     655             : {
     656          12 :     TestDecodingData *data = ctx->output_plugin_private;
     657          12 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     658             : 
     659          12 :     if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
     660           6 :         return;
     661             : 
     662           6 :     OutputPluginPrepareWrite(ctx, true);
     663           6 :     if (data->include_xids)
     664           0 :         appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
     665             :     else
     666           6 :         appendStringInfoString(ctx->out, "closing a streamed block for transaction");
     667           6 :     OutputPluginWrite(ctx, true);
     668             : }
     669             : 
     670             : static void
     671           2 : pg_decode_stream_abort(LogicalDecodingContext *ctx,
     672             :                        ReorderBufferTXN *txn,
     673             :                        XLogRecPtr abort_lsn)
     674             : {
     675           2 :     TestDecodingData *data = ctx->output_plugin_private;
     676             : 
     677             :     /*
     678             :      * stream abort can be sent for an individual subtransaction but we
     679             :      * maintain the output_plugin_private only under the toptxn so if this is
     680             :      * not the toptxn then fetch the toptxn.
     681             :      */
     682           2 :     ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn;
     683           2 :     TestDecodingTxnData *txndata = toptxn->output_plugin_private;
     684           2 :     bool        xact_wrote_changes = txndata->xact_wrote_changes;
     685             : 
     686           2 :     if (txn->toptxn == NULL)
     687             :     {
     688             :         Assert(txn->output_plugin_private != NULL);
     689           0 :         pfree(txndata);
     690           0 :         txn->output_plugin_private = NULL;
     691             :     }
     692             : 
     693           2 :     if (data->skip_empty_xacts && !xact_wrote_changes)
     694           2 :         return;
     695             : 
     696           0 :     OutputPluginPrepareWrite(ctx, true);
     697           0 :     if (data->include_xids)
     698           0 :         appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
     699             :     else
     700           0 :         appendStringInfoString(ctx->out, "aborting streamed (sub)transaction");
     701           0 :     OutputPluginWrite(ctx, true);
     702             : }
     703             : 
     704             : static void
     705           6 : pg_decode_stream_commit(LogicalDecodingContext *ctx,
     706             :                         ReorderBufferTXN *txn,
     707             :                         XLogRecPtr commit_lsn)
     708             : {
     709           6 :     TestDecodingData *data = ctx->output_plugin_private;
     710           6 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     711           6 :     bool        xact_wrote_changes = txndata->xact_wrote_changes;
     712             : 
     713           6 :     pfree(txndata);
     714           6 :     txn->output_plugin_private = NULL;
     715             : 
     716           6 :     if (data->skip_empty_xacts && !xact_wrote_changes)
     717           0 :         return;
     718             : 
     719           6 :     OutputPluginPrepareWrite(ctx, true);
     720             : 
     721           6 :     if (data->include_xids)
     722           0 :         appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
     723             :     else
     724           6 :         appendStringInfoString(ctx->out, "committing streamed transaction");
     725             : 
     726           6 :     if (data->include_timestamp)
     727           0 :         appendStringInfo(ctx->out, " (at %s)",
     728             :                          timestamptz_to_str(txn->commit_time));
     729             : 
     730           6 :     OutputPluginWrite(ctx, true);
     731             : }
     732             : 
     733             : /*
     734             :  * In streaming mode, we don't display the changes as the transaction can abort
     735             :  * at a later point in time.  We don't want users to see the changes until the
     736             :  * transaction is committed.
     737             :  */
     738             : static void
     739          62 : pg_decode_stream_change(LogicalDecodingContext *ctx,
     740             :                         ReorderBufferTXN *txn,
     741             :                         Relation relation,
     742             :                         ReorderBufferChange *change)
     743             : {
     744          62 :     TestDecodingData *data = ctx->output_plugin_private;
     745          62 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     746             : 
     747             :     /* output stream start if we haven't yet */
     748          62 :     if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
     749             :     {
     750           6 :         pg_output_stream_start(ctx, data, txn, false);
     751             :     }
     752          62 :     txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
     753             : 
     754          62 :     OutputPluginPrepareWrite(ctx, true);
     755          62 :     if (data->include_xids)
     756           0 :         appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
     757             :     else
     758          62 :         appendStringInfoString(ctx->out, "streaming change for transaction");
     759          62 :     OutputPluginWrite(ctx, true);
     760          62 : }
     761             : 
     762             : /*
     763             :  * In streaming mode, we don't display the contents for transactional messages
     764             :  * as the transaction can abort at a later point in time.  We don't want users to
     765             :  * see the message contents until the transaction is committed.
     766             :  */
     767             : static void
     768           2 : pg_decode_stream_message(LogicalDecodingContext *ctx,
     769             :                          ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
     770             :                          const char *prefix, Size sz, const char *message)
     771             : {
     772           2 :     OutputPluginPrepareWrite(ctx, true);
     773             : 
     774           2 :     if (transactional)
     775             :     {
     776           2 :         appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
     777             :                          transactional, prefix, sz);
     778             :     }
     779             :     else
     780             :     {
     781           0 :         appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
     782             :                          transactional, prefix, sz);
     783           0 :         appendBinaryStringInfo(ctx->out, message, sz);
     784             :     }
     785             : 
     786           2 :     OutputPluginWrite(ctx, true);
     787           2 : }
     788             : 
     789             : /*
     790             :  * In streaming mode, we don't display the detailed information of Truncate.
     791             :  * See pg_decode_stream_change.
     792             :  */
     793             : static void
     794           0 : pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     795             :                           int nrelations, Relation relations[],
     796             :                           ReorderBufferChange *change)
     797             : {
     798           0 :     TestDecodingData *data = ctx->output_plugin_private;
     799           0 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     800             : 
     801           0 :     if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
     802             :     {
     803           0 :         pg_output_stream_start(ctx, data, txn, false);
     804             :     }
     805           0 :     txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
     806             : 
     807           0 :     OutputPluginPrepareWrite(ctx, true);
     808           0 :     if (data->include_xids)
     809           0 :         appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
     810             :     else
     811           0 :         appendStringInfoString(ctx->out, "streaming truncate for transaction");
     812           0 :     OutputPluginWrite(ctx, true);
     813           0 : }

Generated by: LCOV version 1.13