LCOV - code coverage report
Current view: top level - contrib/test_decoding - test_decoding.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13beta1 Lines: 199 221 90.0 %
Date: 2020-06-01 09:07:10 Functions: 14 14 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * test_decoding.c
       4             :  *        example logical decoding output plugin
       5             :  *
       6             :  * Copyright (c) 2012-2020, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *        contrib/test_decoding/test_decoding.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #include "catalog/pg_type.h"
      16             : 
      17             : #include "replication/logical.h"
      18             : #include "replication/origin.h"
      19             : 
      20             : #include "utils/builtins.h"
      21             : #include "utils/lsyscache.h"
      22             : #include "utils/memutils.h"
      23             : #include "utils/rel.h"
      24             : 
      25          90 : PG_MODULE_MAGIC;
      26             : 
      27             : /* These must be available to dlsym() */
      28             : extern void _PG_init(void);
      29             : extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
      30             : 
      31             : typedef struct
      32             : {
      33             :     MemoryContext context;
      34             :     bool        include_xids;
      35             :     bool        include_timestamp;
      36             :     bool        skip_empty_xacts;
      37             :     bool        xact_wrote_changes;
      38             :     bool        only_local;
      39             : } TestDecodingData;
      40             : 
      41             : static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
      42             :                               bool is_init);
      43             : static void pg_decode_shutdown(LogicalDecodingContext *ctx);
      44             : static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
      45             :                                 ReorderBufferTXN *txn);
      46             : static void pg_output_begin(LogicalDecodingContext *ctx,
      47             :                             TestDecodingData *data,
      48             :                             ReorderBufferTXN *txn,
      49             :                             bool last_write);
      50             : static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
      51             :                                  ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
      52             : static void pg_decode_change(LogicalDecodingContext *ctx,
      53             :                              ReorderBufferTXN *txn, Relation rel,
      54             :                              ReorderBufferChange *change);
      55             : static void pg_decode_truncate(LogicalDecodingContext *ctx,
      56             :                                ReorderBufferTXN *txn,
      57             :                                int nrelations, Relation relations[],
      58             :                                ReorderBufferChange *change);
      59             : static bool pg_decode_filter(LogicalDecodingContext *ctx,
      60             :                              RepOriginId origin_id);
      61             : static void pg_decode_message(LogicalDecodingContext *ctx,
      62             :                               ReorderBufferTXN *txn, XLogRecPtr message_lsn,
      63             :                               bool transactional, const char *prefix,
      64             :                               Size sz, const char *message);
      65             : 
      66             : void
      67          90 : _PG_init(void)
      68             : {
      69             :     /* other plugins can perform things here */
      70          90 : }
      71             : 
      72             : /* specify output plugin callbacks */
      73             : void
      74         398 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
      75             : {
      76             :     AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
      77             : 
      78         398 :     cb->startup_cb = pg_decode_startup;
      79         398 :     cb->begin_cb = pg_decode_begin_txn;
      80         398 :     cb->change_cb = pg_decode_change;
      81         398 :     cb->truncate_cb = pg_decode_truncate;
      82         398 :     cb->commit_cb = pg_decode_commit_txn;
      83         398 :     cb->filter_by_origin_cb = pg_decode_filter;
      84         398 :     cb->shutdown_cb = pg_decode_shutdown;
      85         398 :     cb->message_cb = pg_decode_message;
      86         398 : }
      87             : 
      88             : 
      89             : /* initialize this plugin */
      90             : static void
      91         398 : pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
      92             :                   bool is_init)
      93             : {
      94             :     ListCell   *option;
      95             :     TestDecodingData *data;
      96             : 
      97         398 :     data = palloc0(sizeof(TestDecodingData));
      98         398 :     data->context = AllocSetContextCreate(ctx->context,
      99             :                                           "text conversion context",
     100             :                                           ALLOCSET_DEFAULT_SIZES);
     101         398 :     data->include_xids = true;
     102         398 :     data->include_timestamp = false;
     103         398 :     data->skip_empty_xacts = false;
     104         398 :     data->only_local = false;
     105             : 
     106         398 :     ctx->output_plugin_private = data;
     107             : 
     108         398 :     opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
     109         398 :     opt->receive_rewrites = false;
     110             : 
     111         812 :     foreach(option, ctx->output_plugin_options)
     112             :     {
     113         420 :         DefElem    *elem = lfirst(option);
     114             : 
     115             :         Assert(elem->arg == NULL || IsA(elem->arg, String));
     116             : 
     117         420 :         if (strcmp(elem->defname, "include-xids") == 0)
     118             :         {
     119             :             /* if option does not provide a value, it means its value is true */
     120         204 :             if (elem->arg == NULL)
     121           0 :                 data->include_xids = true;
     122         204 :             else if (!parse_bool(strVal(elem->arg), &data->include_xids))
     123           4 :                 ereport(ERROR,
     124             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     125             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     126             :                                 strVal(elem->arg), elem->defname)));
     127             :         }
     128         216 :         else if (strcmp(elem->defname, "include-timestamp") == 0)
     129             :         {
     130           2 :             if (elem->arg == NULL)
     131           0 :                 data->include_timestamp = true;
     132           2 :             else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
     133           0 :                 ereport(ERROR,
     134             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     135             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     136             :                                 strVal(elem->arg), elem->defname)));
     137             :         }
     138         214 :         else if (strcmp(elem->defname, "force-binary") == 0)
     139             :         {
     140             :             bool        force_binary;
     141             : 
     142          12 :             if (elem->arg == NULL)
     143           0 :                 continue;
     144          12 :             else if (!parse_bool(strVal(elem->arg), &force_binary))
     145           0 :                 ereport(ERROR,
     146             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     147             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     148             :                                 strVal(elem->arg), elem->defname)));
     149             : 
     150          12 :             if (force_binary)
     151           4 :                 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
     152             :         }
     153         202 :         else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
     154             :         {
     155             : 
     156         192 :             if (elem->arg == NULL)
     157           0 :                 data->skip_empty_xacts = true;
     158         192 :             else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
     159           0 :                 ereport(ERROR,
     160             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     161             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     162             :                                 strVal(elem->arg), elem->defname)));
     163             :         }
     164          10 :         else if (strcmp(elem->defname, "only-local") == 0)
     165             :         {
     166             : 
     167           6 :             if (elem->arg == NULL)
     168           0 :                 data->only_local = true;
     169           6 :             else if (!parse_bool(strVal(elem->arg), &data->only_local))
     170           0 :                 ereport(ERROR,
     171             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     172             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     173             :                                 strVal(elem->arg), elem->defname)));
     174             :         }
     175           4 :         else if (strcmp(elem->defname, "include-rewrites") == 0)
     176             :         {
     177             : 
     178           2 :             if (elem->arg == NULL)
     179           0 :                 continue;
     180           2 :             else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
     181           0 :                 ereport(ERROR,
     182             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     183             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     184             :                                 strVal(elem->arg), elem->defname)));
     185             :         }
     186             :         else
     187             :         {
     188           2 :             ereport(ERROR,
     189             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     190             :                      errmsg("option \"%s\" = \"%s\" is unknown",
     191             :                             elem->defname,
     192             :                             elem->arg ? strVal(elem->arg) : "(null)")));
     193             :         }
     194             :     }
     195         392 : }
     196             : 
     197             : /* cleanup this plugin's resources */
     198             : static void
     199         388 : pg_decode_shutdown(LogicalDecodingContext *ctx)
     200             : {
     201         388 :     TestDecodingData *data = ctx->output_plugin_private;
     202             : 
     203             :     /* cleanup our own resources via memory context reset */
     204         388 :     MemoryContextDelete(data->context);
     205         388 : }
     206             : 
     207             : /* BEGIN callback */
     208             : static void
     209         666 : pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     210             : {
     211         666 :     TestDecodingData *data = ctx->output_plugin_private;
     212             : 
     213         666 :     data->xact_wrote_changes = false;
     214         666 :     if (data->skip_empty_xacts)
     215         600 :         return;
     216             : 
     217          66 :     pg_output_begin(ctx, data, txn, true);
     218             : }
     219             : 
     220             : static void
     221         390 : pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
     222             : {
     223         390 :     OutputPluginPrepareWrite(ctx, last_write);
     224         390 :     if (data->include_xids)
     225          48 :         appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
     226             :     else
     227         342 :         appendStringInfoString(ctx->out, "BEGIN");
     228         390 :     OutputPluginWrite(ctx, last_write);
     229         390 : }
     230             : 
     231             : /* COMMIT callback */
     232             : static void
     233         666 : pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     234             :                      XLogRecPtr commit_lsn)
     235             : {
     236         666 :     TestDecodingData *data = ctx->output_plugin_private;
     237             : 
     238         666 :     if (data->skip_empty_xacts && !data->xact_wrote_changes)
     239         276 :         return;
     240             : 
     241         390 :     OutputPluginPrepareWrite(ctx, true);
     242         390 :     if (data->include_xids)
     243          48 :         appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
     244             :     else
     245         342 :         appendStringInfoString(ctx->out, "COMMIT");
     246             : 
     247         390 :     if (data->include_timestamp)
     248           2 :         appendStringInfo(ctx->out, " (at %s)",
     249             :                          timestamptz_to_str(txn->commit_time));
     250             : 
     251         390 :     OutputPluginWrite(ctx, true);
     252             : }
     253             : 
     254             : static bool
     255     1831024 : pg_decode_filter(LogicalDecodingContext *ctx,
     256             :                  RepOriginId origin_id)
     257             : {
     258     1831024 :     TestDecodingData *data = ctx->output_plugin_private;
     259             : 
     260     1831024 :     if (data->only_local && origin_id != InvalidRepOriginId)
     261          18 :         return true;
     262     1831006 :     return false;
     263             : }
     264             : 
     265             : /*
     266             :  * Print literal `outputstr' already represented as string of type `typid'
     267             :  * into stringbuf `s'.
     268             :  *
     269             :  * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
     270             :  * if standard_conforming_strings were enabled.
     271             :  */
     272             : static void
     273      341280 : print_literal(StringInfo s, Oid typid, char *outputstr)
     274             : {
     275             :     const char *valptr;
     276             : 
     277      341280 :     switch (typid)
     278             :     {
     279      120050 :         case INT2OID:
     280             :         case INT4OID:
     281             :         case INT8OID:
     282             :         case OIDOID:
     283             :         case FLOAT4OID:
     284             :         case FLOAT8OID:
     285             :         case NUMERICOID:
     286             :             /* NB: We don't care about Inf, NaN et al. */
     287      120050 :             appendStringInfoString(s, outputstr);
     288      120050 :             break;
     289             : 
     290           0 :         case BITOID:
     291             :         case VARBITOID:
     292           0 :             appendStringInfo(s, "B'%s'", outputstr);
     293           0 :             break;
     294             : 
     295           0 :         case BOOLOID:
     296           0 :             if (strcmp(outputstr, "t") == 0)
     297           0 :                 appendStringInfoString(s, "true");
     298             :             else
     299           0 :                 appendStringInfoString(s, "false");
     300           0 :             break;
     301             : 
     302      221230 :         default:
     303      221230 :             appendStringInfoChar(s, '\'');
     304    10577712 :             for (valptr = outputstr; *valptr; valptr++)
     305             :             {
     306    10356482 :                 char        ch = *valptr;
     307             : 
     308    10356482 :                 if (SQL_STR_DOUBLE(ch, false))
     309         128 :                     appendStringInfoChar(s, ch);
     310    10356482 :                 appendStringInfoChar(s, ch);
     311             :             }
     312      221230 :             appendStringInfoChar(s, '\'');
     313      221230 :             break;
     314             :     }
     315      341280 : }
     316             : 
     317             : /* print the tuple 'tuple' into the StringInfo s */
     318             : static void
     319      280742 : tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
     320             : {
     321             :     int         natt;
     322             : 
     323             :     /* print all columns individually */
     324      673338 :     for (natt = 0; natt < tupdesc->natts; natt++)
     325             :     {
     326             :         Form_pg_attribute attr; /* the attribute itself */
     327             :         Oid         typid;      /* type of current attribute */
     328             :         Oid         typoutput;  /* output function */
     329             :         bool        typisvarlena;
     330             :         Datum       origval;    /* possibly toasted Datum */
     331             :         bool        isnull;     /* column is null? */
     332             : 
     333      392596 :         attr = TupleDescAttr(tupdesc, natt);
     334             : 
     335             :         /*
     336             :          * don't print dropped columns, we can't be sure everything is
     337             :          * available for them
     338             :          */
     339      392596 :         if (attr->attisdropped)
     340       10154 :             continue;
     341             : 
     342             :         /*
     343             :          * Don't print system columns, oid will already have been printed if
     344             :          * present.
     345             :          */
     346      392504 :         if (attr->attnum < 0)
     347           0 :             continue;
     348             : 
     349      392504 :         typid = attr->atttypid;
     350             : 
     351             :         /* get Datum from tuple */
     352      392504 :         origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
     353             : 
     354      392504 :         if (isnull && skip_nulls)
     355       10062 :             continue;
     356             : 
     357             :         /* print attribute name */
     358      382442 :         appendStringInfoChar(s, ' ');
     359      382442 :         appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
     360             : 
     361             :         /* print attribute type */
     362      382442 :         appendStringInfoChar(s, '[');
     363      382442 :         appendStringInfoString(s, format_type_be(typid));
     364      382442 :         appendStringInfoChar(s, ']');
     365             : 
     366             :         /* query output function */
     367      382442 :         getTypeOutputInfo(typid,
     368             :                           &typoutput, &typisvarlena);
     369             : 
     370             :         /* print separator */
     371      382442 :         appendStringInfoChar(s, ':');
     372             : 
     373             :         /* print data */
     374      382442 :         if (isnull)
     375       41138 :             appendStringInfoString(s, "null");
     376      341304 :         else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
     377          24 :             appendStringInfoString(s, "unchanged-toast-datum");
     378      341280 :         else if (!typisvarlena)
     379      120058 :             print_literal(s, typid,
     380             :                           OidOutputFunctionCall(typoutput, origval));
     381             :         else
     382             :         {
     383             :             Datum       val;    /* definitely detoasted Datum */
     384             : 
     385      221222 :             val = PointerGetDatum(PG_DETOAST_DATUM(origval));
     386      221222 :             print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
     387             :         }
     388             :     }
     389      280742 : }
     390             : 
     391             : /*
     392             :  * callback for individual changed tuples
     393             :  */
     394             : static void
     395      290714 : pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     396             :                  Relation relation, ReorderBufferChange *change)
     397             : {
     398             :     TestDecodingData *data;
     399             :     Form_pg_class class_form;
     400             :     TupleDesc   tupdesc;
     401             :     MemoryContext old;
     402             : 
     403      290714 :     data = ctx->output_plugin_private;
     404             : 
     405             :     /* output BEGIN if we haven't yet */
     406      290714 :     if (data->skip_empty_xacts && !data->xact_wrote_changes)
     407             :     {
     408         318 :         pg_output_begin(ctx, data, txn, false);
     409             :     }
     410      290714 :     data->xact_wrote_changes = true;
     411             : 
     412      290714 :     class_form = RelationGetForm(relation);
     413      290714 :     tupdesc = RelationGetDescr(relation);
     414             : 
     415             :     /* Avoid leaking memory by using and resetting our own context */
     416      290714 :     old = MemoryContextSwitchTo(data->context);
     417             : 
     418      290714 :     OutputPluginPrepareWrite(ctx, true);
     419             : 
     420      290714 :     appendStringInfoString(ctx->out, "table ");
     421      290714 :     appendStringInfoString(ctx->out,
     422      290714 :                            quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
     423      290714 :                                                       class_form->relrewrite ?
     424           2 :                                                       get_rel_name(class_form->relrewrite) :
     425             :                                                       NameStr(class_form->relname)));
     426      290714 :     appendStringInfoChar(ctx->out, ':');
     427             : 
     428      290714 :     switch (change->action)
     429             :     {
     430      255660 :         case REORDER_BUFFER_CHANGE_INSERT:
     431      255660 :             appendStringInfoString(ctx->out, " INSERT:");
     432      255660 :             if (change->data.tp.newtuple == NULL)
     433           0 :                 appendStringInfoString(ctx->out, " (no-tuple-data)");
     434             :             else
     435      255660 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     436      255660 :                                     &change->data.tp.newtuple->tuple,
     437             :                                     false);
     438      255660 :             break;
     439       15030 :         case REORDER_BUFFER_CHANGE_UPDATE:
     440       15030 :             appendStringInfoString(ctx->out, " UPDATE:");
     441       15030 :             if (change->data.tp.oldtuple != NULL)
     442             :             {
     443          34 :                 appendStringInfoString(ctx->out, " old-key:");
     444          34 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     445          34 :                                     &change->data.tp.oldtuple->tuple,
     446             :                                     true);
     447          34 :                 appendStringInfoString(ctx->out, " new-tuple:");
     448             :             }
     449             : 
     450       15030 :             if (change->data.tp.newtuple == NULL)
     451           0 :                 appendStringInfoString(ctx->out, " (no-tuple-data)");
     452             :             else
     453       15030 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     454       15030 :                                     &change->data.tp.newtuple->tuple,
     455             :                                     false);
     456       15030 :             break;
     457       20024 :         case REORDER_BUFFER_CHANGE_DELETE:
     458       20024 :             appendStringInfoString(ctx->out, " DELETE:");
     459             : 
     460             :             /* if there was no PK, we only know that a delete happened */
     461       20024 :             if (change->data.tp.oldtuple == NULL)
     462       10006 :                 appendStringInfoString(ctx->out, " (no-tuple-data)");
     463             :             /* In DELETE, only the replica identity is present; display that */
     464             :             else
     465       10018 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     466       10018 :                                     &change->data.tp.oldtuple->tuple,
     467             :                                     true);
     468       20024 :             break;
     469      290714 :         default:
     470             :             Assert(false);
     471             :     }
     472             : 
     473      290714 :     MemoryContextSwitchTo(old);
     474      290714 :     MemoryContextReset(data->context);
     475             : 
     476      290714 :     OutputPluginWrite(ctx, true);
     477      290714 : }
     478             : 
     479             : static void
     480           6 : pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     481             :                    int nrelations, Relation relations[], ReorderBufferChange *change)
     482             : {
     483             :     TestDecodingData *data;
     484             :     MemoryContext old;
     485             :     int         i;
     486             : 
     487           6 :     data = ctx->output_plugin_private;
     488             : 
     489             :     /* output BEGIN if we haven't yet */
     490           6 :     if (data->skip_empty_xacts && !data->xact_wrote_changes)
     491             :     {
     492           6 :         pg_output_begin(ctx, data, txn, false);
     493             :     }
     494           6 :     data->xact_wrote_changes = true;
     495             : 
     496             :     /* Avoid leaking memory by using and resetting our own context */
     497           6 :     old = MemoryContextSwitchTo(data->context);
     498             : 
     499           6 :     OutputPluginPrepareWrite(ctx, true);
     500             : 
     501           6 :     appendStringInfoString(ctx->out, "table ");
     502             : 
     503          14 :     for (i = 0; i < nrelations; i++)
     504             :     {
     505           8 :         if (i > 0)
     506           2 :             appendStringInfoString(ctx->out, ", ");
     507             : 
     508           8 :         appendStringInfoString(ctx->out,
     509           8 :                                quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
     510           8 :                                                           NameStr(relations[i]->rd_rel->relname)));
     511             :     }
     512             : 
     513           6 :     appendStringInfoString(ctx->out, ": TRUNCATE:");
     514             : 
     515           6 :     if (change->data.truncate.restart_seqs
     516           4 :         || change->data.truncate.cascade)
     517             :     {
     518           2 :         if (change->data.truncate.restart_seqs)
     519           2 :             appendStringInfoString(ctx->out, " restart_seqs");
     520           4 :         if (change->data.truncate.cascade)
     521           2 :             appendStringInfoString(ctx->out, " cascade");
     522             :     }
     523             :     else
     524           4 :         appendStringInfoString(ctx->out, " (no-flags)");
     525             : 
     526           6 :     MemoryContextSwitchTo(old);
     527           6 :     MemoryContextReset(data->context);
     528             : 
     529           6 :     OutputPluginWrite(ctx, true);
     530           6 : }
     531             : 
     532             : static void
     533          16 : pg_decode_message(LogicalDecodingContext *ctx,
     534             :                   ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
     535             :                   const char *prefix, Size sz, const char *message)
     536             : {
     537          16 :     OutputPluginPrepareWrite(ctx, true);
     538          16 :     appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
     539             :                      transactional, prefix, sz);
     540          16 :     appendBinaryStringInfo(ctx->out, message, sz);
     541          16 :     OutputPluginWrite(ctx, true);
     542          16 : }

Generated by: LCOV version 1.13