LCOV - code coverage report
Current view: top level - contrib/test_decoding - test_decoding.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 86.0 % 408 351
Test Date: 2026-03-03 13:15:30 Functions: 96.4 % 28 27
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * test_decoding.c
       4              :  *        example logical decoding output plugin
       5              :  *
       6              :  * Copyright (c) 2012-2026, PostgreSQL Global Development Group
       7              :  *
       8              :  * IDENTIFICATION
       9              :  *        contrib/test_decoding/test_decoding.c
      10              :  *
      11              :  *-------------------------------------------------------------------------
      12              :  */
      13              : #include "postgres.h"
      14              : 
      15              : #include "catalog/pg_type.h"
      16              : 
      17              : #include "replication/logical.h"
      18              : #include "replication/origin.h"
      19              : 
      20              : #include "utils/builtins.h"
      21              : #include "utils/lsyscache.h"
      22              : #include "utils/memutils.h"
      23              : #include "utils/rel.h"
      24              : 
      25          127 : PG_MODULE_MAGIC_EXT(    /* module magic block; also records this module's name and version */
      26              :                     .name = "test_decoding",
      27              :                     .version = PG_VERSION
      28              : );
      29              : 
      30              : typedef struct  /* plugin-wide state; pg_decode_startup stores it in ctx->output_plugin_private */
      31              : {
      32              :     MemoryContext context;  /* child of ctx->context named "text conversion context"; deleted in pg_decode_shutdown */
      33              :     bool        include_xids;   /* "include-xids" option, default true: add the xid to BEGIN/COMMIT/PREPARE output */
      34              :     bool        include_timestamp;  /* "include-timestamp" option, default false: append " (at <time>)" to commit/prepare output */
      35              :     bool        skip_empty_xacts;   /* "skip-empty-xacts" option, default false: defer BEGIN and drop COMMIT/PREPARE for transactions that wrote nothing */
      36              :     bool        only_local; /* "only-local" option, default false: pg_decode_filter rejects any origin other than InvalidReplOriginId */
      37              : } TestDecodingData;
      38              : 
      39              : /*
      40              :  * Maintain the per-transaction level variables to track whether the
      41              :  * transaction and/or streams have written any changes. In streaming mode the
      42              :  * transaction can be decoded in streams so along with maintaining whether the
      43              :  * transaction has written any changes, we also need to track whether the
      44              :  * current stream has written any changes. This is required so that if the user
      45              :  * has requested to skip the empty transactions we can skip the empty streams
      46              :  * even though the transaction has written some changes.
      47              :  */
      48              : typedef struct  /* allocated by the begin callbacks and hung off txn->output_plugin_private */
      49              : {
      50              :     bool        xact_wrote_changes; /* read at COMMIT and PREPARE to decide whether output is suppressed under skip_empty_xacts */
      51              :     bool        stream_wrote_changes;   /* per-stream counterpart described in the comment above */
      52              : } TestDecodingTxnData;
      53              : 
      54              : static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
      55              :                               bool is_init);
      56              : static void pg_decode_shutdown(LogicalDecodingContext *ctx);
      57              : static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
      58              :                                 ReorderBufferTXN *txn);
      59              : static void pg_output_begin(LogicalDecodingContext *ctx,
      60              :                             TestDecodingData *data,
      61              :                             ReorderBufferTXN *txn,
      62              :                             bool last_write);   /* helper called by the begin callbacks; not itself registered in _PG_output_plugin_init */
      63              : static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
      64              :                                  ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
      65              : static void pg_decode_change(LogicalDecodingContext *ctx,
      66              :                              ReorderBufferTXN *txn, Relation relation,
      67              :                              ReorderBufferChange *change);
      68              : static void pg_decode_truncate(LogicalDecodingContext *ctx,
      69              :                                ReorderBufferTXN *txn,
      70              :                                int nrelations, Relation relations[],
      71              :                                ReorderBufferChange *change);
      72              : static bool pg_decode_filter(LogicalDecodingContext *ctx,
      73              :                              ReplOriginId origin_id);
      74              : static void pg_decode_message(LogicalDecodingContext *ctx,
      75              :                               ReorderBufferTXN *txn, XLogRecPtr lsn,
      76              :                               bool transactional, const char *prefix,
      77              :                               Size sz, const char *message);
      78              : static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
      79              :                                      TransactionId xid,
      80              :                                      const char *gid);
      81              : static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
      82              :                                         ReorderBufferTXN *txn);
      83              : static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
      84              :                                   ReorderBufferTXN *txn,
      85              :                                   XLogRecPtr prepare_lsn);
      86              : static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
      87              :                                           ReorderBufferTXN *txn,
      88              :                                           XLogRecPtr commit_lsn);
      89              : static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
      90              :                                             ReorderBufferTXN *txn,
      91              :                                             XLogRecPtr prepare_end_lsn,
      92              :                                             TimestampTz prepare_time);
      93              : static void pg_decode_stream_start(LogicalDecodingContext *ctx,    /* streaming-mode declarations start here */
      94              :                                    ReorderBufferTXN *txn);
      95              : static void pg_output_stream_start(LogicalDecodingContext *ctx,
      96              :                                    TestDecodingData *data,
      97              :                                    ReorderBufferTXN *txn,
      98              :                                    bool last_write);    /* helper; not registered as a callback in _PG_output_plugin_init */
      99              : static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
     100              :                                   ReorderBufferTXN *txn);
     101              : static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
     102              :                                    ReorderBufferTXN *txn,
     103              :                                    XLogRecPtr abort_lsn);
     104              : static void pg_decode_stream_prepare(LogicalDecodingContext *ctx,
     105              :                                      ReorderBufferTXN *txn,
     106              :                                      XLogRecPtr prepare_lsn);
     107              : static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
     108              :                                     ReorderBufferTXN *txn,
     109              :                                     XLogRecPtr commit_lsn);
     110              : static void pg_decode_stream_change(LogicalDecodingContext *ctx,
     111              :                                     ReorderBufferTXN *txn,
     112              :                                     Relation relation,
     113              :                                     ReorderBufferChange *change);
     114              : static void pg_decode_stream_message(LogicalDecodingContext *ctx,
     115              :                                      ReorderBufferTXN *txn, XLogRecPtr lsn,
     116              :                                      bool transactional, const char *prefix,
     117              :                                      Size sz, const char *message);
     118              : static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
     119              :                                       ReorderBufferTXN *txn,
     120              :                                       int nrelations, Relation relations[],
     121              :                                       ReorderBufferChange *change);
     122              : 
     123              : void    /* _PG_init: deliberately empty in this plugin; nothing is set up here */
     124          127 : _PG_init(void)
     125              : {
     126              :     /* other plugins can perform things here */
     127          127 : }
     128              : 
     129              : /* specify output plugin callbacks: fill every slot of *cb with this file's static pg_decode_* functions */
     130              : void
     131          357 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
     132              : {
     133          357 :     cb->startup_cb = pg_decode_startup; /* plugin lifecycle and ordinary (non-streamed) transaction callbacks */
     134          357 :     cb->begin_cb = pg_decode_begin_txn;
     135          357 :     cb->change_cb = pg_decode_change;
     136          357 :     cb->truncate_cb = pg_decode_truncate;
     137          357 :     cb->commit_cb = pg_decode_commit_txn;
     138          357 :     cb->filter_by_origin_cb = pg_decode_filter;
     139          357 :     cb->shutdown_cb = pg_decode_shutdown;
     140          357 :     cb->message_cb = pg_decode_message;
     141          357 :     cb->filter_prepare_cb = pg_decode_filter_prepare;   /* two-phase (prepared transaction) callbacks */
     142          357 :     cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
     143          357 :     cb->prepare_cb = pg_decode_prepare_txn;
     144          357 :     cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
     145          357 :     cb->rollback_prepared_cb = pg_decode_rollback_prepared_txn;
     146          357 :     cb->stream_start_cb = pg_decode_stream_start;   /* streaming callbacks */
     147          357 :     cb->stream_stop_cb = pg_decode_stream_stop;
     148          357 :     cb->stream_abort_cb = pg_decode_stream_abort;
     149          357 :     cb->stream_prepare_cb = pg_decode_stream_prepare;
     150          357 :     cb->stream_commit_cb = pg_decode_stream_commit;
     151          357 :     cb->stream_change_cb = pg_decode_stream_change;
     152          357 :     cb->stream_message_cb = pg_decode_stream_message;
     153          357 :     cb->stream_truncate_cb = pg_decode_stream_truncate;
     154          357 : }
     155              : 
     156              : 
     157              : /* initialize this plugin: allocate TestDecodingData, set defaults, then apply ctx->output_plugin_options */
     158              : static void
     159          357 : pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
     160              :                   bool is_init) /* is_init is not consulted in this function */
     161              : {
     162              :     ListCell   *option;
     163              :     TestDecodingData *data;
     164          357 :     bool        enable_streaming = false;   /* only the "stream-changes" option can turn this on */
     165              : 
     166          357 :     data = palloc0_object(TestDecodingData);
     167          357 :     data->context = AllocSetContextCreate(ctx->context,
     168              :                                           "text conversion context",
     169              :                                           ALLOCSET_DEFAULT_SIZES);
     170          357 :     data->include_xids = true;  /* defaults; each may be overridden by the option loop below */
     171          357 :     data->include_timestamp = false;
     172          357 :     data->skip_empty_xacts = false;
     173          357 :     data->only_local = false;
     174              : 
     175          357 :     ctx->output_plugin_private = data;
     176              : 
     177          357 :     opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;    /* "force-binary" may switch this to binary below */
     178          357 :     opt->receive_rewrites = false;  /* "include-rewrites" may change this below */
     179              : 
     180          727 :     foreach(option, ctx->output_plugin_options)
     181              :     {
     182          373 :         DefElem    *elem = lfirst(option);
     183              : 
     184              :         Assert(elem->arg == NULL || IsA(elem->arg, String));    /* every option value is either absent or a String node */
     185              : 
     186          373 :         if (strcmp(elem->defname, "include-xids") == 0)
     187              :         {
     188              :             /* if option does not provide a value, it means its value is true */
     189          176 :             if (elem->arg == NULL)
     190            0 :                 data->include_xids = true;
     191          176 :             else if (!parse_bool(strVal(elem->arg), &data->include_xids))
     192            2 :                 ereport(ERROR,
     193              :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     194              :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     195              :                                 strVal(elem->arg), elem->defname)));
     196              :         }
     197          197 :         else if (strcmp(elem->defname, "include-timestamp") == 0)
     198              :         {
     199            1 :             if (elem->arg == NULL)
     200            0 :                 data->include_timestamp = true;
     201            1 :             else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
     202            0 :                 ereport(ERROR,
     203              :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     204              :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     205              :                                 strVal(elem->arg), elem->defname)));
     206              :         }
     207          196 :         else if (strcmp(elem->defname, "force-binary") == 0)
     208              :         {
     209              :             bool        force_binary;
     210              : 
     211            6 :             if (elem->arg == NULL)
     212            0 :                 continue;   /* no value given: option is ignored, and force_binary is never read uninitialized */
     213            6 :             else if (!parse_bool(strVal(elem->arg), &force_binary))
     214            0 :                 ereport(ERROR,
     215              :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     216              :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     217              :                                 strVal(elem->arg), elem->defname)));
     218              : 
     219            6 :             if (force_binary)
     220            2 :                 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
     221              :         }
     222          190 :         else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
     223              :         {
     224              : 
     225          173 :             if (elem->arg == NULL)
     226            0 :                 data->skip_empty_xacts = true;
     227          173 :             else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
     228            0 :                 ereport(ERROR,
     229              :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     230              :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     231              :                                 strVal(elem->arg), elem->defname)));
     232              :         }
     233           17 :         else if (strcmp(elem->defname, "only-local") == 0)
     234              :         {
     235              : 
     236            3 :             if (elem->arg == NULL)
     237            0 :                 data->only_local = true;
     238            3 :             else if (!parse_bool(strVal(elem->arg), &data->only_local))
     239            0 :                 ereport(ERROR,
     240              :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     241              :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     242              :                                 strVal(elem->arg), elem->defname)));
     243              :         }
     244           14 :         else if (strcmp(elem->defname, "include-rewrites") == 0)
     245              :         {
     246              : 
     247            1 :             if (elem->arg == NULL)
     248            0 :                 continue;   /* no value given: receive_rewrites keeps its default */
     249            1 :             else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
     250            0 :                 ereport(ERROR,
     251              :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     252              :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     253              :                                 strVal(elem->arg), elem->defname)));
     254              :         }
     255           13 :         else if (strcmp(elem->defname, "stream-changes") == 0)
     256              :         {
     257           12 :             if (elem->arg == NULL)
     258            0 :                 continue;   /* no value given: enable_streaming stays false */
     259           12 :             else if (!parse_bool(strVal(elem->arg), &enable_streaming))
     260            0 :                 ereport(ERROR,
     261              :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     262              :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     263              :                                 strVal(elem->arg), elem->defname)));
     264              :         }
     265              :         else    /* any option name not matched above is rejected */
     266              :         {
     267            1 :             ereport(ERROR,
     268              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     269              :                      errmsg("option \"%s\" = \"%s\" is unknown",
     270              :                             elem->defname,
     271              :                             elem->arg ? strVal(elem->arg) : "(null)")));
     272              :         }
     273              :     }
     274              : 
     275          354 :     ctx->streaming &= enable_streaming; /* streaming survives only if it was already set and "stream-changes" asked for it */
     276          354 : }
     277              : 
     278              : /* cleanup this plugin's resources */
     279              : static void
     280          343 : pg_decode_shutdown(LogicalDecodingContext *ctx)
     281              : {
     282          343 :     TestDecodingData *data = ctx->output_plugin_private;
     283              : 
     284              :     /* cleanup our own resources by deleting our private memory context; the data struct itself is not freed here */
     285          343 :     MemoryContextDelete(data->context);
     286          343 : }
     287              : 
     288              : /* BEGIN callback: attach fresh per-transaction state to txn and, unless skip_empty_xacts is set, emit BEGIN now */
     289              : static void
     290          454 : pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     291              : {
     292          454 :     TestDecodingData *data = ctx->output_plugin_private;
     293              :     TestDecodingTxnData *txndata =
     294          454 :         MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));  /* freed in pg_decode_commit_txn */
     295              : 
     296          454 :     txndata->xact_wrote_changes = false;    /* explicit, although the allocation above is a zeroing one */
     297          454 :     txn->output_plugin_private = txndata;
     298              : 
     299              :     /*
     300              :      * If asked to skip empty transactions, we'll emit BEGIN at the point
     301              :      * where the first operation is received for this transaction.
     302              :      */
     303          454 :     if (data->skip_empty_xacts)
     304          406 :         return;
     305              : 
     306           48 :     pg_output_begin(ctx, data, txn, true);
     307              : }
     308              : 
     309              : static void /* write one message: "BEGIN <xid>" when include_xids is set, otherwise plain "BEGIN" */
     310          283 : pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
     311              : {
     312          283 :     OutputPluginPrepareWrite(ctx, last_write);  /* last_write is passed unchanged to both the prepare and the write call */
     313          283 :     if (data->include_xids)
     314           44 :         appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
     315              :     else
     316          239 :         appendStringInfoString(ctx->out, "BEGIN");
     317          283 :     OutputPluginWrite(ctx, last_write);
     318          283 : }
     319              : 
     320              : /* COMMIT callback: free the per-transaction state, then emit COMMIT unless the transaction is being skipped as empty */
     321              : static void
     322          454 : pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     323              :                      XLogRecPtr commit_lsn) /* commit_lsn is not used in this function */
     324              : {
     325          454 :     TestDecodingData *data = ctx->output_plugin_private;
     326          454 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     327          454 :     bool        xact_wrote_changes = txndata->xact_wrote_changes;   /* copied out before txndata is freed */
     328              : 
     329          454 :     pfree(txndata);
     330          454 :     txn->output_plugin_private = NULL;  /* do not leave a dangling pointer on the txn */
     331              : 
     332          454 :     if (data->skip_empty_xacts && !xact_wrote_changes)
     333          179 :         return;
     334              : 
     335          275 :     OutputPluginPrepareWrite(ctx, true);
     336          275 :     if (data->include_xids)
     337           43 :         appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
     338              :     else
     339          232 :         appendStringInfoString(ctx->out, "COMMIT");
     340              : 
     341          275 :     if (data->include_timestamp)
     342            1 :         appendStringInfo(ctx->out, " (at %s)",
     343              :                          timestamptz_to_str(txn->commit_time));
     344              : 
     345          275 :     OutputPluginWrite(ctx, true);
     346              : }
     347              : 
     348              : /* BEGIN PREPARE callback: same steps as pg_decode_begin_txn, and the text emitted is likewise "BEGIN" */
     349              : static void
     350            9 : pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     351              : {
     352            9 :     TestDecodingData *data = ctx->output_plugin_private;
     353              :     TestDecodingTxnData *txndata =
     354            9 :         MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
     355              : 
     356            9 :     txndata->xact_wrote_changes = false;    /* explicit, although the allocation above is a zeroing one */
     357            9 :     txn->output_plugin_private = txndata;
     358              : 
     359              :     /*
     360              :      * If asked to skip empty transactions, we'll emit BEGIN at the point
     361              :      * where the first operation is received for this transaction.
     362              :      */
     363            9 :     if (data->skip_empty_xacts)
     364            8 :         return;
     365              : 
     366            1 :     pg_output_begin(ctx, data, txn, true);
     367              : }
     368              : 
     369              : /* PREPARE callback: emit "PREPARE TRANSACTION <quoted gid>" with optional xid and timestamp suffixes */
     370              : static void
     371            9 : pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     372              :                       XLogRecPtr prepare_lsn)   /* prepare_lsn is not used in this function */
     373              : {
     374            9 :     TestDecodingData *data = ctx->output_plugin_private;
     375            9 :     TestDecodingTxnData *txndata = txn->output_plugin_private;  /* read only; not freed in this function */
     376              : 
     377              :     /*
     378              :      * If asked to skip empty transactions, emit nothing for PREPARE when
     379              :      * this transaction has not written any changes.
     380              :      */
     381            9 :     if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
     382            1 :         return;
     383              : 
     384            8 :     OutputPluginPrepareWrite(ctx, true);
     385              : 
     386            8 :     appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
     387            8 :                      quote_literal_cstr(txn->gid));
     388              : 
     389            8 :     if (data->include_xids)
     390            1 :         appendStringInfo(ctx->out, ", txid %u", txn->xid);
     391              : 
     392            8 :     if (data->include_timestamp)
     393            0 :         appendStringInfo(ctx->out, " (at %s)",
     394              :                          timestamptz_to_str(txn->prepare_time));
     395              : 
     396            8 :     OutputPluginWrite(ctx, true);
     397              : }
     398              : 
     399              : /* COMMIT PREPARED callback: always emits "COMMIT PREPARED <quoted gid>"; there is no skip_empty_xacts check here */
     400              : static void
     401            8 : pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     402              :                               XLogRecPtr commit_lsn)    /* commit_lsn is not used in this function */
     403              : {
     404            8 :     TestDecodingData *data = ctx->output_plugin_private;
     405              : 
     406            8 :     OutputPluginPrepareWrite(ctx, true);
     407              : 
     408            8 :     appendStringInfo(ctx->out, "COMMIT PREPARED %s",
     409            8 :                      quote_literal_cstr(txn->gid));
     410              : 
     411            8 :     if (data->include_xids)
     412            1 :         appendStringInfo(ctx->out, ", txid %u", txn->xid);
     413              : 
     414            8 :     if (data->include_timestamp)
     415            0 :         appendStringInfo(ctx->out, " (at %s)",
     416              :                          timestamptz_to_str(txn->commit_time));
     417              : 
     418            8 :     OutputPluginWrite(ctx, true);
     419            8 : }
     420              : 
     421              : /* ROLLBACK PREPARED callback: always emits "ROLLBACK PREPARED <quoted gid>" with optional xid and timestamp suffixes */
     422              : static void
     423            2 : pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
     424              :                                 ReorderBufferTXN *txn,
     425              :                                 XLogRecPtr prepare_end_lsn,
     426              :                                 TimestampTz prepare_time)   /* neither prepare_end_lsn nor prepare_time is used in the body */
     427              : {
     428            2 :     TestDecodingData *data = ctx->output_plugin_private;
     429              : 
     430            2 :     OutputPluginPrepareWrite(ctx, true);
     431              : 
     432            2 :     appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
     433            2 :                      quote_literal_cstr(txn->gid));
     434              : 
     435            2 :     if (data->include_xids)
     436            0 :         appendStringInfo(ctx->out, ", txid %u", txn->xid);
     437              : 
     438            2 :     if (data->include_timestamp)
     439            0 :         appendStringInfo(ctx->out, " (at %s)",
     440              :                          timestamptz_to_str(txn->commit_time)); /* prints txn->commit_time, not the prepare_time parameter */
     441              : 
     442            2 :     OutputPluginWrite(ctx, true);
     443            2 : }
     444              : 
     445              : /*
     446              :  * Filter out two-phase transactions.
     447              :  *
     448              :  * Each plugin can implement its own filtering logic. Here we demonstrate a
     449              :  * simple logic by checking the GID. If the GID contains the "_nodecode"
     450              :  * substring, then we filter it out (return true); otherwise return false.
     451              :  */
     452              : static bool
     453          148 : pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
     454              :                          const char *gid)   /* ctx and xid are not consulted; the decision depends on gid alone */
     455              : {
     456          148 :     if (strstr(gid, "_nodecode") != NULL)
     457           12 :         return true;
     458              : 
     459          136 :     return false;
     460              : }
     461              : 
     462              : static bool /* origin filter: returns true only when only_local is set and origin_id is not InvalidReplOriginId */
     463       990765 : pg_decode_filter(LogicalDecodingContext *ctx,
     464              :                  ReplOriginId origin_id)
     465              : {
     466       990765 :     TestDecodingData *data = ctx->output_plugin_private;
     467              : 
     468       990765 :     if (data->only_local && origin_id != InvalidReplOriginId)
     469            9 :         return true;
     470       990756 :     return false;
     471              : }
     472              : 
     473              : /*
     474              :  * Print literal `outputstr' already represented as string of type `typid'
     475              :  * into stringbuf `s'.
     476              :  *
     477              :  * Some builtin types aren't quoted, the rest are quoted. Escaping is done
     478              :  * per standard SQL rules.
     479              :  */
     480              : static void
     481       176334 : print_literal(StringInfo s, Oid typid, char *outputstr)
     482              : {
     483              :     const char *valptr;
     484              : 
     485       176334 :     switch (typid)
     486              :     {
     487        60451 :         case INT2OID:   /* numeric types: the text is appended verbatim, without quotes */
     488              :         case INT4OID:
     489              :         case INT8OID:
     490              :         case OIDOID:
     491              :         case FLOAT4OID:
     492              :         case FLOAT8OID:
     493              :         case NUMERICOID:
     494              :             /* NB: We don't care about Inf, NaN et al. */
     495        60451 :             appendStringInfoString(s, outputstr);
     496        60451 :             break;
     497              : 
     498            0 :         case BITOID:    /* bit strings: wrapped as B'...' */
     499              :         case VARBITOID:
     500            0 :             appendStringInfo(s, "B'%s'", outputstr);
     501            0 :             break;
     502              : 
     503            0 :         case BOOLOID:   /* "t" prints as true; any other text prints as false */
     504            0 :             if (strcmp(outputstr, "t") == 0)
     505            0 :                 appendStringInfoString(s, "true");
     506              :             else
     507            0 :                 appendStringInfoString(s, "false");
     508            0 :             break;
     509              : 
     510       115883 :         default:    /* everything else: wrap in single quotes and copy character by character */
     511       115883 :             appendStringInfoChar(s, '\'');
     512      5427748 :             for (valptr = outputstr; *valptr; valptr++)
     513              :             {
     514      5311865 :                 char        ch = *valptr;
     515              : 
     516      5311865 :                 if (SQL_STR_DOUBLE(ch, false))  /* characters matched by this macro are emitted twice */
     517           64 :                     appendStringInfoChar(s, ch);
     518      5311865 :                 appendStringInfoChar(s, ch);
     519              :             }
     520       115883 :             appendStringInfoChar(s, '\'');
     521       115883 :             break;
     522              :     }
     523       176334 : }
     524              : 
     525              : /* print the tuple 'tuple' into the StringInfo s */
     526              : static void
     527       145763 : tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
     528              : {
     529              :     int         natt;
     530              : 
     531              :     /* print all columns individually */
     532       347854 :     for (natt = 0; natt < tupdesc->natts; natt++)
     533              :     {
     534              :         Form_pg_attribute attr; /* the attribute itself */
     535              :         Oid         typid;      /* type of current attribute */
     536              :         Oid         typoutput;  /* output function */
     537              :         bool        typisvarlena;
     538              :         Datum       origval;    /* possibly toasted Datum */
     539              :         bool        isnull;     /* column is null? */
     540              : 
     541       202091 :         attr = TupleDescAttr(tupdesc, natt);
     542              : 
     543              :         /*
     544              :          * don't print dropped columns, we can't be sure everything is
     545              :          * available for them
     546              :          */
     547       202091 :         if (attr->attisdropped)
     548         5130 :             continue;
     549              : 
     550              :         /*
     551              :          * Don't print system columns, oid will already have been printed if
     552              :          * present.
     553              :          */
     554       202019 :         if (attr->attnum < 0)
     555            0 :             continue;
     556              : 
     557       202019 :         typid = attr->atttypid;
     558              : 
     559              :         /* get Datum from tuple */
     560       202019 :         origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
     561              : 
     562       202019 :         if (isnull && skip_nulls)
     563         5058 :             continue;
     564              : 
     565              :         /* print attribute name */
     566       196961 :         appendStringInfoChar(s, ' ');
     567       196961 :         appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
     568              : 
     569              :         /* print attribute type */
     570       196961 :         appendStringInfoChar(s, '[');
     571       196961 :         appendStringInfoString(s, format_type_be(typid));
     572       196961 :         appendStringInfoChar(s, ']');
     573              : 
     574              :         /* query output function */
     575       196961 :         getTypeOutputInfo(typid,
     576              :                           &typoutput, &typisvarlena);
     577              : 
     578              :         /* print separator */
     579       196961 :         appendStringInfoChar(s, ':');
     580              : 
     581              :         /* print data */
     582       196961 :         if (isnull)
     583        20615 :             appendStringInfoString(s, "null");
     584       176346 :         else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(origval)))
     585           12 :             appendStringInfoString(s, "unchanged-toast-datum");
     586       176334 :         else if (!typisvarlena)
     587        60455 :             print_literal(s, typid,
     588              :                           OidOutputFunctionCall(typoutput, origval));
     589              :         else
     590              :         {
     591              :             Datum       val;    /* definitely detoasted Datum */
     592              : 
     593       115879 :             val = PointerGetDatum(PG_DETOAST_DATUM(origval));
     594       115879 :             print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
     595              :         }
     596              :     }
     597       145763 : }
     598              : 
     599              : /*
     600              :  * callback for individual changed tuples
     601              :  */
     602              : static void
     603       150752 : pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     604              :                  Relation relation, ReorderBufferChange *change)
     605              : {
     606              :     TestDecodingData *data;
     607              :     TestDecodingTxnData *txndata;
     608              :     Form_pg_class class_form;
     609              :     TupleDesc   tupdesc;
     610              :     MemoryContext old;
     611              : 
     612       150752 :     data = ctx->output_plugin_private;
     613       150752 :     txndata = txn->output_plugin_private;
     614              : 
     615              :     /* output BEGIN if we haven't yet */
     616       150752 :     if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
     617              :     {
     618          224 :         pg_output_begin(ctx, data, txn, false);
     619              :     }
     620       150752 :     txndata->xact_wrote_changes = true;
     621              : 
     622       150752 :     class_form = RelationGetForm(relation);
     623       150752 :     tupdesc = RelationGetDescr(relation);
     624              : 
     625              :     /* Avoid leaking memory by using and resetting our own context */
     626       150752 :     old = MemoryContextSwitchTo(data->context);
     627              : 
     628       150752 :     OutputPluginPrepareWrite(ctx, true);
     629              : 
     630       150752 :     appendStringInfoString(ctx->out, "table ");
     631       150752 :     appendStringInfoString(ctx->out,
     632       150752 :                            quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
     633       150752 :                                                       class_form->relrewrite ?
     634            1 :                                                       get_rel_name(class_form->relrewrite) :
     635              :                                                       NameStr(class_form->relname)));
     636       150752 :     appendStringInfoChar(ctx->out, ':');
     637              : 
     638       150752 :     switch (change->action)
     639              :     {
     640       133187 :         case REORDER_BUFFER_CHANGE_INSERT:
     641       133187 :             appendStringInfoString(ctx->out, " INSERT:");
     642       133187 :             if (change->data.tp.newtuple == NULL)
     643            0 :                 appendStringInfoString(ctx->out, " (no-tuple-data)");
     644              :             else
     645       133187 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     646              :                                     change->data.tp.newtuple,
     647              :                                     false);
     648       133187 :             break;
     649         7543 :         case REORDER_BUFFER_CHANGE_UPDATE:
     650         7543 :             appendStringInfoString(ctx->out, " UPDATE:");
     651         7543 :             if (change->data.tp.oldtuple != NULL)
     652              :             {
     653           18 :                 appendStringInfoString(ctx->out, " old-key:");
     654           18 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     655              :                                     change->data.tp.oldtuple,
     656              :                                     true);
     657           18 :                 appendStringInfoString(ctx->out, " new-tuple:");
     658              :             }
     659              : 
     660         7543 :             if (change->data.tp.newtuple == NULL)
     661            0 :                 appendStringInfoString(ctx->out, " (no-tuple-data)");
     662              :             else
     663         7543 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     664              :                                     change->data.tp.newtuple,
     665              :                                     false);
     666         7543 :             break;
     667        10022 :         case REORDER_BUFFER_CHANGE_DELETE:
     668        10022 :             appendStringInfoString(ctx->out, " DELETE:");
     669              : 
     670              :             /* if there was no PK, we only know that a delete happened */
     671        10022 :             if (change->data.tp.oldtuple == NULL)
     672         5007 :                 appendStringInfoString(ctx->out, " (no-tuple-data)");
     673              :             /* In DELETE, only the replica identity is present; display that */
     674              :             else
     675         5015 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     676              :                                     change->data.tp.oldtuple,
     677              :                                     true);
     678        10022 :             break;
     679       150752 :         default:
     680              :             Assert(false);
     681              :     }
     682              : 
     683       150752 :     MemoryContextSwitchTo(old);
     684       150752 :     MemoryContextReset(data->context);
     685              : 
     686       150752 :     OutputPluginWrite(ctx, true);
     687       150752 : }
     688              : 
     689              : static void
     690            8 : pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     691              :                    int nrelations, Relation relations[], ReorderBufferChange *change)
     692              : {
     693              :     TestDecodingData *data;
     694              :     TestDecodingTxnData *txndata;
     695              :     MemoryContext old;
     696              :     int         i;
     697              : 
     698            8 :     data = ctx->output_plugin_private;
     699            8 :     txndata = txn->output_plugin_private;
     700              : 
     701              :     /* output BEGIN if we haven't yet */
     702            8 :     if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
     703              :     {
     704            7 :         pg_output_begin(ctx, data, txn, false);
     705              :     }
     706            8 :     txndata->xact_wrote_changes = true;
     707              : 
     708              :     /* Avoid leaking memory by using and resetting our own context */
     709            8 :     old = MemoryContextSwitchTo(data->context);
     710              : 
     711            8 :     OutputPluginPrepareWrite(ctx, true);
     712              : 
     713            8 :     appendStringInfoString(ctx->out, "table ");
     714              : 
     715           17 :     for (i = 0; i < nrelations; i++)
     716              :     {
     717            9 :         if (i > 0)
     718            1 :             appendStringInfoString(ctx->out, ", ");
     719              : 
     720            9 :         appendStringInfoString(ctx->out,
     721            9 :                                quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
     722            9 :                                                           NameStr(relations[i]->rd_rel->relname)));
     723              :     }
     724              : 
     725            8 :     appendStringInfoString(ctx->out, ": TRUNCATE:");
     726              : 
     727            8 :     if (change->data.truncate.restart_seqs
     728            7 :         || change->data.truncate.cascade)
     729              :     {
     730            1 :         if (change->data.truncate.restart_seqs)
     731            1 :             appendStringInfoString(ctx->out, " restart_seqs");
     732            1 :         if (change->data.truncate.cascade)
     733            1 :             appendStringInfoString(ctx->out, " cascade");
     734              :     }
     735              :     else
     736            7 :         appendStringInfoString(ctx->out, " (no-flags)");
     737              : 
     738            8 :     MemoryContextSwitchTo(old);
     739            8 :     MemoryContextReset(data->context);
     740              : 
     741            8 :     OutputPluginWrite(ctx, true);
     742            8 : }
     743              : 
     744              : static void
     745           10 : pg_decode_message(LogicalDecodingContext *ctx,
     746              :                   ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
     747              :                   const char *prefix, Size sz, const char *message)
     748              : {
     749           10 :     TestDecodingData *data = ctx->output_plugin_private;
     750              :     TestDecodingTxnData *txndata;
     751              : 
     752           10 :     txndata = transactional ? txn->output_plugin_private : NULL;
     753              : 
     754              :     /* output BEGIN if we haven't yet for transactional messages */
     755           10 :     if (transactional && data->skip_empty_xacts && !txndata->xact_wrote_changes)
     756            3 :         pg_output_begin(ctx, data, txn, false);
     757              : 
     758           10 :     if (transactional)
     759            5 :         txndata->xact_wrote_changes = true;
     760              : 
     761           10 :     OutputPluginPrepareWrite(ctx, true);
     762           10 :     appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
     763              :                      transactional, prefix, sz);
     764           10 :     appendBinaryStringInfo(ctx->out, message, sz);
     765           10 :     OutputPluginWrite(ctx, true);
     766           10 : }
     767              : 
     768              : static void
     769           40 : pg_decode_stream_start(LogicalDecodingContext *ctx,
     770              :                        ReorderBufferTXN *txn)
     771              : {
     772           40 :     TestDecodingData *data = ctx->output_plugin_private;
     773           40 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     774              : 
     775              :     /*
     776              :      * Allocate the txn plugin data for the first stream in the transaction.
     777              :      */
     778           40 :     if (txndata == NULL)
     779              :     {
     780              :         txndata =
     781            9 :             MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
     782            9 :         txndata->xact_wrote_changes = false;
     783            9 :         txn->output_plugin_private = txndata;
     784              :     }
     785              : 
     786           40 :     txndata->stream_wrote_changes = false;
     787           40 :     if (data->skip_empty_xacts)
     788           40 :         return;
     789            0 :     pg_output_stream_start(ctx, data, txn, true);
     790              : }
     791              : 
     792              : static void
     793           11 : pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
     794              : {
     795           11 :     OutputPluginPrepareWrite(ctx, last_write);
     796           11 :     if (data->include_xids)
     797            0 :         appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
     798              :     else
     799           11 :         appendStringInfoString(ctx->out, "opening a streamed block for transaction");
     800           11 :     OutputPluginWrite(ctx, last_write);
     801           11 : }
     802              : 
     803              : static void
     804           40 : pg_decode_stream_stop(LogicalDecodingContext *ctx,
     805              :                       ReorderBufferTXN *txn)
     806              : {
     807           40 :     TestDecodingData *data = ctx->output_plugin_private;
     808           40 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     809              : 
     810           40 :     if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
     811           29 :         return;
     812              : 
     813           11 :     OutputPluginPrepareWrite(ctx, true);
     814           11 :     if (data->include_xids)
     815            0 :         appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
     816              :     else
     817           11 :         appendStringInfoString(ctx->out, "closing a streamed block for transaction");
     818           11 :     OutputPluginWrite(ctx, true);
     819              : }
     820              : 
     821              : static void
     822            4 : pg_decode_stream_abort(LogicalDecodingContext *ctx,
     823              :                        ReorderBufferTXN *txn,
     824              :                        XLogRecPtr abort_lsn)
     825              : {
     826            4 :     TestDecodingData *data = ctx->output_plugin_private;
     827              : 
     828              :     /*
     829              :      * stream abort can be sent for an individual subtransaction but we
     830              :      * maintain the output_plugin_private only under the toptxn so if this is
     831              :      * not the toptxn then fetch the toptxn.
     832              :      */
     833            4 :     ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
     834            4 :     TestDecodingTxnData *txndata = toptxn->output_plugin_private;
     835            4 :     bool        xact_wrote_changes = txndata->xact_wrote_changes;
     836              : 
     837            4 :     if (rbtxn_is_toptxn(txn))
     838              :     {
     839              :         Assert(txn->output_plugin_private != NULL);
     840            0 :         pfree(txndata);
     841            0 :         txn->output_plugin_private = NULL;
     842              :     }
     843              : 
     844            4 :     if (data->skip_empty_xacts && !xact_wrote_changes)
     845            0 :         return;
     846              : 
     847            4 :     OutputPluginPrepareWrite(ctx, true);
     848            4 :     if (data->include_xids)
     849            0 :         appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
     850              :     else
     851            4 :         appendStringInfoString(ctx->out, "aborting streamed (sub)transaction");
     852            4 :     OutputPluginWrite(ctx, true);
     853              : }
     854              : 
     855              : static void
     856            1 : pg_decode_stream_prepare(LogicalDecodingContext *ctx,
     857              :                          ReorderBufferTXN *txn,
     858              :                          XLogRecPtr prepare_lsn)
     859              : {
     860            1 :     TestDecodingData *data = ctx->output_plugin_private;
     861            1 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     862              : 
     863            1 :     if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
     864            0 :         return;
     865              : 
     866            1 :     OutputPluginPrepareWrite(ctx, true);
     867              : 
     868            1 :     if (data->include_xids)
     869            0 :         appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u",
     870            0 :                          quote_literal_cstr(txn->gid), txn->xid);
     871              :     else
     872            1 :         appendStringInfo(ctx->out, "preparing streamed transaction %s",
     873            1 :                          quote_literal_cstr(txn->gid));
     874              : 
     875            1 :     if (data->include_timestamp)
     876            0 :         appendStringInfo(ctx->out, " (at %s)",
     877              :                          timestamptz_to_str(txn->prepare_time));
     878              : 
     879            1 :     OutputPluginWrite(ctx, true);
     880              : }
     881              : 
     882              : static void
     883            5 : pg_decode_stream_commit(LogicalDecodingContext *ctx,
     884              :                         ReorderBufferTXN *txn,
     885              :                         XLogRecPtr commit_lsn)
     886              : {
     887            5 :     TestDecodingData *data = ctx->output_plugin_private;
     888            5 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     889            5 :     bool        xact_wrote_changes = txndata->xact_wrote_changes;
     890              : 
     891            5 :     pfree(txndata);
     892            5 :     txn->output_plugin_private = NULL;
     893              : 
     894            5 :     if (data->skip_empty_xacts && !xact_wrote_changes)
     895            0 :         return;
     896              : 
     897            5 :     OutputPluginPrepareWrite(ctx, true);
     898              : 
     899            5 :     if (data->include_xids)
     900            0 :         appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
     901              :     else
     902            5 :         appendStringInfoString(ctx->out, "committing streamed transaction");
     903              : 
     904            5 :     if (data->include_timestamp)
     905            0 :         appendStringInfo(ctx->out, " (at %s)",
     906              :                          timestamptz_to_str(txn->commit_time));
     907              : 
     908            5 :     OutputPluginWrite(ctx, true);
     909              : }
     910              : 
     911              : /*
     912              :  * In streaming mode, we don't display the changes as the transaction can abort
     913              :  * at a later point in time.  We don't want users to see the changes until the
     914              :  * transaction is committed.
     915              :  */
     916              : static void
     917           65 : pg_decode_stream_change(LogicalDecodingContext *ctx,
     918              :                         ReorderBufferTXN *txn,
     919              :                         Relation relation,
     920              :                         ReorderBufferChange *change)
     921              : {
     922           65 :     TestDecodingData *data = ctx->output_plugin_private;
     923           65 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     924              : 
     925              :     /* output stream start if we haven't yet */
     926           65 :     if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
     927              :     {
     928            8 :         pg_output_stream_start(ctx, data, txn, false);
     929              :     }
     930           65 :     txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
     931              : 
     932           65 :     OutputPluginPrepareWrite(ctx, true);
     933           65 :     if (data->include_xids)
     934            0 :         appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
     935              :     else
     936           65 :         appendStringInfoString(ctx->out, "streaming change for transaction");
     937           65 :     OutputPluginWrite(ctx, true);
     938           65 : }
     939              : 
     940              : /*
     941              :  * In streaming mode, we don't display the contents for transactional messages
     942              :  * as the transaction can abort at a later point in time.  We don't want users to
     943              :  * see the message contents until the transaction is committed.
     944              :  */
     945              : static void
     946            3 : pg_decode_stream_message(LogicalDecodingContext *ctx,
     947              :                          ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
     948              :                          const char *prefix, Size sz, const char *message)
     949              : {
     950              :     /* Output stream start if we haven't yet for transactional messages. */
     951            3 :     if (transactional)
     952              :     {
     953            3 :         TestDecodingData *data = ctx->output_plugin_private;
     954            3 :         TestDecodingTxnData *txndata = txn->output_plugin_private;
     955              : 
     956            3 :         if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
     957              :         {
     958            3 :             pg_output_stream_start(ctx, data, txn, false);
     959              :         }
     960            3 :         txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
     961              :     }
     962              : 
     963            3 :     OutputPluginPrepareWrite(ctx, true);
     964              : 
     965            3 :     if (transactional)
     966              :     {
     967            3 :         appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
     968              :                          transactional, prefix, sz);
     969              :     }
     970              :     else
     971              :     {
     972            0 :         appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
     973              :                          transactional, prefix, sz);
     974            0 :         appendBinaryStringInfo(ctx->out, message, sz);
     975              :     }
     976              : 
     977            3 :     OutputPluginWrite(ctx, true);
     978            3 : }
     979              : 
     980              : /*
     981              :  * In streaming mode, we don't display the detailed information of Truncate.
     982              :  * See pg_decode_stream_change.
     983              :  */
     984              : static void
     985            0 : pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     986              :                           int nrelations, Relation relations[],
     987              :                           ReorderBufferChange *change)
     988              : {
     989            0 :     TestDecodingData *data = ctx->output_plugin_private;
     990            0 :     TestDecodingTxnData *txndata = txn->output_plugin_private;
     991              : 
     992            0 :     if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
     993              :     {
     994            0 :         pg_output_stream_start(ctx, data, txn, false);
     995              :     }
     996            0 :     txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
     997              : 
     998            0 :     OutputPluginPrepareWrite(ctx, true);
     999            0 :     if (data->include_xids)
    1000            0 :         appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
    1001              :     else
    1002            0 :         appendStringInfoString(ctx->out, "streaming truncate for transaction");
    1003            0 :     OutputPluginWrite(ctx, true);
    1004            0 : }
        

Generated by: LCOV version 2.0-1