LCOV - code coverage report
Current view: top level - contrib/test_decoding - test_decoding.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13devel Lines: 194 214 90.7 %
Date: 2019-08-24 15:07:19 Functions: 14 14 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * test_decoding.c
       4             :  *        example logical decoding output plugin
       5             :  *
       6             :  * Copyright (c) 2012-2019, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *        contrib/test_decoding/test_decoding.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #include "catalog/pg_type.h"
      16             : 
      17             : #include "replication/logical.h"
      18             : #include "replication/origin.h"
      19             : 
      20             : #include "utils/builtins.h"
      21             : #include "utils/lsyscache.h"
      22             : #include "utils/memutils.h"
      23             : #include "utils/rel.h"
      24             : 
      25          86 : PG_MODULE_MAGIC;
      26             : 
      27             : /*
                     :  * These must be available to dlsym(), so they are declared extern here and
                     :  * defined without static further down, unlike the pg_decode_* callbacks.
                     :  */
      28             : extern void _PG_init(void);
      29             : extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
      30             : /*
                     :  * Private state of this plugin.  It is allocated in pg_decode_startup()
                     :  * and reached by the callbacks through ctx->output_plugin_private.
                     :  */
      31             : typedef struct
      32             : {
      33             :     MemoryContext context;      /* scratch context; pg_decode_change() and pg_decode_truncate() switch to it and reset it */
      34             :     bool        include_xids;   /* append the xid to BEGIN and COMMIT lines; startup default is true */
      35             :     bool        include_timestamp;  /* append the commit time to COMMIT lines; startup default is false */
      36             :     bool        skip_empty_xacts;   /* delay BEGIN until the first change and drop COMMIT if there was none; default false */
      37             :     bool        xact_wrote_changes; /* cleared in pg_decode_begin_txn(), set by pg_decode_change() and pg_decode_truncate() */
      38             :     bool        only_local;     /* makes pg_decode_filter() return true for origin_id != InvalidRepOriginId; default false */
      39             : } TestDecodingData;
      40             : /*
                     :  * Forward declarations of the callbacks and of pg_output_begin(), all of
                     :  * which are defined later in this file.
                     :  *
                     :  * Parameter names differ from the definitions in two places:
                     :  * pg_decode_change() is defined with `relation` rather than `rel`, and
                     :  * pg_decode_message() is defined with `lsn` rather than `message_lsn`.
                     :  */
      41             : static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
      42             :                               bool is_init);
      43             : static void pg_decode_shutdown(LogicalDecodingContext *ctx);
      44             : static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
      45             :                                 ReorderBufferTXN *txn);
      46             : static void pg_output_begin(LogicalDecodingContext *ctx,
      47             :                             TestDecodingData *data,
      48             :                             ReorderBufferTXN *txn,
      49             :                             bool last_write);
      50             : static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
      51             :                                  ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
      52             : static void pg_decode_change(LogicalDecodingContext *ctx,
      53             :                              ReorderBufferTXN *txn, Relation rel,
      54             :                              ReorderBufferChange *change);
      55             : static void pg_decode_truncate(LogicalDecodingContext *ctx,
      56             :                                ReorderBufferTXN *txn,
      57             :                                int nrelations, Relation relations[],
      58             :                                ReorderBufferChange *change);
      59             : static bool pg_decode_filter(LogicalDecodingContext *ctx,
      60             :                              RepOriginId origin_id);
      61             : static void pg_decode_message(LogicalDecodingContext *ctx,
      62             :                               ReorderBufferTXN *txn, XLogRecPtr message_lsn,
      63             :                               bool transactional, const char *prefix,
      64             :                               Size sz, const char *message);
      65             : /*
                     :  * Does nothing in this plugin; the body is only a placeholder.
                     :  *
                     :  * NOTE(review): presumably called once when the library is loaded; that
                     :  * is not visible in this file - confirm against the PostgreSQL docs.
                     :  */
      66             : void
      67          86 : _PG_init(void)
      68             : {
      69             :     /* other plugins can perform things here */
      70          86 : }
      71             : 
      72             : /*
                     :  * Specify output plugin callbacks.
                     :  *
                     :  * Fills *cb with the startup, begin, change, truncate, commit,
                     :  * filter-by-origin, shutdown and message callbacks defined in this file.
                     :  * No other field of *cb is written here.
                     :  */
      73             : void
      74         390 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
      75             : {
      76             :     AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
      77             : 
      78         390 :     cb->startup_cb = pg_decode_startup;
      79         390 :     cb->begin_cb = pg_decode_begin_txn;
      80         390 :     cb->change_cb = pg_decode_change;
      81         390 :     cb->truncate_cb = pg_decode_truncate;
      82         390 :     cb->commit_cb = pg_decode_commit_txn;
      83         390 :     cb->filter_by_origin_cb = pg_decode_filter;
      84         390 :     cb->shutdown_cb = pg_decode_shutdown;
      85         390 :     cb->message_cb = pg_decode_message;
      86         390 : }
      87             : 
      88             : 
      89             : /*
                     :  * Initialize this plugin.
                     :  *
                     :  * Allocates a TestDecodingData with palloc0(), creates its scratch memory
                     :  * context via AllocSetContextCreate(ctx->context, ...), stores the struct
                     :  * in ctx->output_plugin_private and then applies every entry of
                     :  * ctx->output_plugin_options.
                     :  *
                     :  * Defaults set before the options are read: include_xids is true, the
                     :  * other TestDecodingData flags are false, opt->output_type is
                     :  * OUTPUT_PLUGIN_TEXTUAL_OUTPUT and opt->receive_rewrites is false.
                     :  *
                     :  * Recognized option names: include-xids, include-timestamp, force-binary,
                     :  * skip-empty-xacts, only-local and include-rewrites.  Each value is parsed
                     :  * with parse_bool().  An option given without a value sets the flag to
                     :  * true, except for force-binary and include-rewrites, where a missing
                     :  * value leaves the current setting untouched.
                     :  *
                     :  * A value that parse_bool() rejects, or an unknown option name, is
                     :  * reported with ereport(ERROR, ERRCODE_INVALID_PARAMETER_VALUE).
                     :  *
                     :  * is_init is not used by this function.
                     :  */
      90             : static void
      91         390 : pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
      92             :                   bool is_init)
      93             : {
      94             :     ListCell   *option;
      95             :     TestDecodingData *data;
      96             : 
      97         390 :     data = palloc0(sizeof(TestDecodingData));
      98         390 :     data->context = AllocSetContextCreate(ctx->context,
      99             :                                           "text conversion context",
     100             :                                           ALLOCSET_DEFAULT_SIZES);
     101         390 :     data->include_xids = true;
     102         390 :     data->include_timestamp = false;
     103         390 :     data->skip_empty_xacts = false;
     104         390 :     data->only_local = false;
     105             : 
     106         390 :     ctx->output_plugin_private = data;
     107             : 
     108         390 :     opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
     109         390 :     opt->receive_rewrites = false;
     110             : 
     111         796 :     foreach(option, ctx->output_plugin_options)
     112             :     {
     113         412 :         DefElem    *elem = lfirst(option);
     114             : 
     115             :         Assert(elem->arg == NULL || IsA(elem->arg, String));
     116             : 
     117         412 :         if (strcmp(elem->defname, "include-xids") == 0)
     118             :         {
     119             :             /* if option does not provide a value, it means its value is true */
     120         200 :             if (elem->arg == NULL)
     121           0 :                 data->include_xids = true;
     122         200 :             else if (!parse_bool(strVal(elem->arg), &data->include_xids))
     123           4 :                 ereport(ERROR,
     124             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     125             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     126             :                                 strVal(elem->arg), elem->defname)));
     127             :         }
     128         212 :         else if (strcmp(elem->defname, "include-timestamp") == 0)
     129             :         {
     130           2 :             if (elem->arg == NULL)
     131           0 :                 data->include_timestamp = true;
     132           2 :             else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
     133           0 :                 ereport(ERROR,
     134             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     135             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     136             :                                 strVal(elem->arg), elem->defname)));
     137             :         }
     138         210 :         else if (strcmp(elem->defname, "force-binary") == 0)
     139             :         {
     140             :             bool        force_binary;
     141             : 
     142          12 :             if (elem->arg == NULL)
     143           0 :                 continue;   /* no value given: leave opt->output_type as it is */
     144          12 :             else if (!parse_bool(strVal(elem->arg), &force_binary))
     145           0 :                 ereport(ERROR,
     146             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     147             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     148             :                                 strVal(elem->arg), elem->defname)));
     149             : 
     150          12 :             if (force_binary)
     151           4 :                 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
     152             :         }
     153         198 :         else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
     154             :         {
     155             : 
     156         188 :             if (elem->arg == NULL)
     157           0 :                 data->skip_empty_xacts = true;
     158         188 :             else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
     159           0 :                 ereport(ERROR,
     160             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     161             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     162             :                                 strVal(elem->arg), elem->defname)));
     163             :         }
     164          10 :         else if (strcmp(elem->defname, "only-local") == 0)
     165             :         {
     166             : 
     167           6 :             if (elem->arg == NULL)
     168           0 :                 data->only_local = true;
     169           6 :             else if (!parse_bool(strVal(elem->arg), &data->only_local))
     170           0 :                 ereport(ERROR,
     171             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     172             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     173             :                                 strVal(elem->arg), elem->defname)));
     174             :         }
     175           4 :         else if (strcmp(elem->defname, "include-rewrites") == 0)
     176             :         {
     177             : 
     178           2 :             if (elem->arg == NULL)
     179           0 :                 continue;   /* no value given: leave opt->receive_rewrites as it is */
     180           2 :             else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
     181           0 :                 ereport(ERROR,
     182             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     183             :                          errmsg("could not parse value \"%s\" for parameter \"%s\"",
     184             :                                 strVal(elem->arg), elem->defname)));
     185             :         }
     186             :         else
     187             :         {
     188           2 :             ereport(ERROR,
     189             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     190             :                      errmsg("option \"%s\" = \"%s\" is unknown",
     191             :                             elem->defname,
     192             :                             elem->arg ? strVal(elem->arg) : "(null)")));
     193             :         }
     194             :     }
     195         384 : }
     196             : 
     197             : /*
                     :  * Cleanup this plugin's resources.
                     :  *
                     :  * Deletes the scratch memory context created in pg_decode_startup().
                     :  * The TestDecodingData struct itself is not freed here.
                     :  * NOTE(review): presumably it is released together with ctx->context,
                     :  * which is not visible in this file - confirm.
                     :  */
     198             : static void
     199         372 : pg_decode_shutdown(LogicalDecodingContext *ctx)
     200             : {
     201         372 :     TestDecodingData *data = ctx->output_plugin_private;
     202             : 
     203             :     /* cleanup our own resources by deleting our memory context */
     204         372 :     MemoryContextDelete(data->context);
     205         372 : }
     206             : 
     207             : /*
                     :  * BEGIN callback
                     :  *
                     :  * Clears xact_wrote_changes for the new transaction.  When
                     :  * skip_empty_xacts is set nothing is written here; pg_decode_change() and
                     :  * pg_decode_truncate() emit the BEGIN line ahead of their first output.
                     :  * Otherwise the BEGIN line is written right away with last_write = true.
                     :  */
     208             : static void
     209         668 : pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     210             : {
     211         668 :     TestDecodingData *data = ctx->output_plugin_private;
     212             : 
     213         668 :     data->xact_wrote_changes = false;
     214         668 :     if (data->skip_empty_xacts)
     215         602 :         return;
     216             : 
     217          66 :     pg_output_begin(ctx, data, txn, true);
     218             : }
     219             : /*
                     :  * Write the BEGIN line for txn.
                     :  *
                     :  * The line is "BEGIN <xid>" when data->include_xids is set and plain
                     :  * "BEGIN" otherwise.  last_write is handed unchanged to both
                     :  * OutputPluginPrepareWrite() and OutputPluginWrite(); callers in this file
                     :  * pass true from pg_decode_begin_txn() and false when BEGIN is emitted
                     :  * lazily ahead of a change or truncate.
                     :  */
     220             : static void
     221         392 : pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
     222             : {
     223         392 :     OutputPluginPrepareWrite(ctx, last_write);
     224         392 :     if (data->include_xids)
     225          48 :         appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
     226             :     else
     227         344 :         appendStringInfoString(ctx->out, "BEGIN");
     228         392 :     OutputPluginWrite(ctx, last_write);
     229         392 : }
     230             : 
     231             : /*
                     :  * COMMIT callback
                     :  *
                     :  * Writes nothing when skip_empty_xacts is set and neither
                     :  * pg_decode_change() nor pg_decode_truncate() ran for this transaction.
                     :  * Otherwise writes "COMMIT", followed by the xid when include_xids is set
                     :  * and by " (at <commit time>)" when include_timestamp is set.
                     :  *
                     :  * commit_lsn is not used by this function.
                     :  */
     232             : static void
     233         668 : pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     234             :                      XLogRecPtr commit_lsn)
     235             : {
     236         668 :     TestDecodingData *data = ctx->output_plugin_private;
     237             : 
     238         668 :     if (data->skip_empty_xacts && !data->xact_wrote_changes)
     239         276 :         return;
     240             : 
     241         392 :     OutputPluginPrepareWrite(ctx, true);
     242         392 :     if (data->include_xids)
     243          48 :         appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
     244             :     else
     245         344 :         appendStringInfoString(ctx->out, "COMMIT");
     246             : 
     247         392 :     if (data->include_timestamp)
     248           2 :         appendStringInfo(ctx->out, " (at %s)",
     249             :                          timestamptz_to_str(txn->commit_time));
     250             : 
     251         392 :     OutputPluginWrite(ctx, true);
     252             : }
     253             : /*
                     :  * Origin filter; installed as filter_by_origin_cb.
                     :  *
                     :  * Returns true only when only_local is set and origin_id is something
                     :  * other than InvalidRepOriginId; returns false in every other case.
                     :  * NOTE(review): presumably a true result tells the caller to skip the
                     :  * data from that origin - confirm against the output plugin docs.
                     :  */
     254             : static bool
     255     2350268 : pg_decode_filter(LogicalDecodingContext *ctx,
     256             :                  RepOriginId origin_id)
     257             : {
     258     2350268 :     TestDecodingData *data = ctx->output_plugin_private;
     259             : 
     260     2350268 :     if (data->only_local && origin_id != InvalidRepOriginId)
     261          18 :         return true;
     262     2350250 :     return false;
     263             : }
     264             : 
     265             : /*
     266             :  * Print literal `outputstr' already represented as string of type `typid'
     267             :  * into stringbuf `s'.
     268             :  *
     269             :  * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
     270             :  * if standard_conforming_strings were enabled.
                     :  *
                     :  * By type: int2, int4, int8, oid, float4, float8 and numeric are appended
                     :  * verbatim; bit and varbit are wrapped as B'...'; bool prints true when
                     :  * outputstr is exactly "t" and false for anything else; every other type
                     :  * is wrapped in single quotes, and each character for which
                     :  * SQL_STR_DOUBLE(ch, false) holds is written twice.
     271             :  */
     272             : static void
     273      341026 : print_literal(StringInfo s, Oid typid, char *outputstr)
     274             : {
     275             :     const char *valptr;
     276             : 
     277      341026 :     switch (typid)
     278             :     {
     279             :         case INT2OID:
     280             :         case INT4OID:
     281             :         case INT8OID:
     282             :         case OIDOID:
     283             :         case FLOAT4OID:
     284             :         case FLOAT8OID:
     285             :         case NUMERICOID:
     286             :             /* NB: We don't care about Inf, NaN et al. */
     287      119790 :             appendStringInfoString(s, outputstr);
     288      119790 :             break;
     289             : 
     290             :         case BITOID:
     291             :         case VARBITOID:
     292           0 :             appendStringInfo(s, "B'%s'", outputstr);
     293           0 :             break;
     294             : 
     295             :         case BOOLOID:
     296           0 :             if (strcmp(outputstr, "t") == 0)
     297           0 :                 appendStringInfoString(s, "true");
     298             :             else
     299           0 :                 appendStringInfoString(s, "false");
     300           0 :             break;
     301             : 
     302             :         default:
     303      221236 :             appendStringInfoChar(s, '\'');
     304    10577776 :             for (valptr = outputstr; *valptr; valptr++)
     305             :             {
     306    10356540 :                 char        ch = *valptr;
     307             : 
     308    10356540 :                 if (SQL_STR_DOUBLE(ch, false))
     309         128 :                     appendStringInfoChar(s, ch);    /* extra copy; the unconditional append below adds the second */
     310    10356540 :                 appendStringInfoChar(s, ch);
     311             :             }
     312      221236 :             appendStringInfoChar(s, '\'');
     313      221236 :             break;
     314             :     }
     315      341026 : }
     316             : 
     317             : /*
                     :  * Print the tuple 'tuple' into the StringInfo s.
                     :  *
                     :  * Each printed attribute is appended as " name[type]:value", where name
                     :  * goes through quote_identifier() and type through format_type_be().
                     :  * Dropped attributes and attributes with attnum < 0 are skipped.  A null
                     :  * attribute is skipped when skip_nulls is true and printed as null
                     :  * otherwise.  A varlena value for which VARATT_IS_EXTERNAL_ONDISK() holds
                     :  * is printed as unchanged-toast-datum; any other varlena value is
                     :  * detoasted before the type output function is called on it.
                     :  */
     318             : static void
     319      280488 : tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
     320             : {
     321             :     int         natt;
     322             : 
     323             :     /* print all columns individually */
     324      672830 :     for (natt = 0; natt < tupdesc->natts; natt++)
     325             :     {
     326             :         Form_pg_attribute attr; /* the attribute itself */
     327             :         Oid         typid;      /* type of current attribute */
     328             :         Oid         typoutput;  /* output function */
     329             :         bool        typisvarlena;
     330             :         Datum       origval;    /* possibly toasted Datum */
     331             :         bool        isnull;     /* column is null? */
     332             : 
     333      392342 :         attr = TupleDescAttr(tupdesc, natt);
     334             : 
     335             :         /*
     336             :          * don't print dropped columns, we can't be sure everything is
     337             :          * available for them
     338             :          */
     339      392342 :         if (attr->attisdropped)
     340       10246 :             continue;
     341             : 
     342             :         /*
     343             :          * Don't print system columns, oid will already have been printed if
     344             :          * present.
                     :          *
                     :          * NOTE(review): nothing visible in this file prints an oid, so the
                     :          * second half of this remark may be stale - confirm.
     345             :          */
     346      392250 :         if (attr->attnum < 0)
     347           0 :             continue;
     348             : 
     349      392250 :         typid = attr->atttypid;
     350             : 
     351             :         /* get Datum from tuple */
     352      392250 :         origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
     353             : 
     354      392250 :         if (isnull && skip_nulls)
     355       10062 :             continue;
     356             : 
     357             :         /* print attribute name */
     358      382188 :         appendStringInfoChar(s, ' ');
     359      382188 :         appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
     360             : 
     361             :         /* print attribute type */
     362      382188 :         appendStringInfoChar(s, '[');
     363      382188 :         appendStringInfoString(s, format_type_be(typid));
     364      382188 :         appendStringInfoChar(s, ']');
     365             : 
     366             :         /* query output function */
     367      382188 :         getTypeOutputInfo(typid,
     368             :                           &typoutput, &typisvarlena);
     369             : 
     370             :         /* print separator */
     371      382188 :         appendStringInfoChar(s, ':');
     372             : 
     373             :         /* print data */
     374      382188 :         if (isnull)
     375       41138 :             appendStringInfoString(s, "null");
     376      341050 :         else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
     377          24 :             appendStringInfoString(s, "unchanged-toast-datum");
     378      341026 :         else if (!typisvarlena)
     379      119798 :             print_literal(s, typid,
     380             :                           OidOutputFunctionCall(typoutput, origval));
     381             :         else
     382             :         {
     383             :             Datum       val;    /* definitely detoasted Datum */
     384             : 
     385      221228 :             val = PointerGetDatum(PG_DETOAST_DATUM(origval));
     386      221228 :             print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
     387             :         }
     388             :     }
     389      280488 : }
     390             : 
     391             : /*
     392             :  * callback for individual changed tuples
                     :  *
                     :  * Writes one line of the form "table <schema>.<name>: <ACTION>:" followed
                     :  * by the tuple data.  When skip_empty_xacts is set and this is the first
                     :  * output of the transaction, the BEGIN line is written first with
                     :  * last_write = false.  xact_wrote_changes is then set.
                     :  *
                     :  * The relation name comes from get_rel_name(class_form->relrewrite) when
                     :  * relrewrite is nonzero, and from class_form->relname otherwise.
                     :  *
                     :  * INSERT prints the new tuple including nulls.  UPDATE prints the old
                     :  * tuple as " old-key:" (nulls skipped) followed by " new-tuple:" when an
                     :  * old tuple is present, then the new tuple including nulls.  DELETE
                     :  * prints the old tuple with nulls skipped.  A missing tuple is shown as
                     :  * " (no-tuple-data)".
                     :  *
                     :  * The text is built while data->context is the current memory context;
                     :  * that context is reset just before OutputPluginWrite() is called.
     393             :  */
     394             : static void
     395      290460 : pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     396             :                  Relation relation, ReorderBufferChange *change)
     397             : {
     398             :     TestDecodingData *data;
     399             :     Form_pg_class class_form;
     400             :     TupleDesc   tupdesc;
     401             :     MemoryContext old;
     402             : 
     403      290460 :     data = ctx->output_plugin_private;
     404             : 
     405             :     /* output BEGIN if we haven't yet */
     406      290460 :     if (data->skip_empty_xacts && !data->xact_wrote_changes)
     407             :     {
     408         320 :         pg_output_begin(ctx, data, txn, false);
     409             :     }
     410      290460 :     data->xact_wrote_changes = true;
     411             : 
     412      290460 :     class_form = RelationGetForm(relation);
     413      290460 :     tupdesc = RelationGetDescr(relation);
     414             : 
     415             :     /* Avoid leaking memory by using and resetting our own context */
     416      290460 :     old = MemoryContextSwitchTo(data->context);
     417             : 
     418      290460 :     OutputPluginPrepareWrite(ctx, true);
     419             : 
     420      290460 :     appendStringInfoString(ctx->out, "table ");
     421      290460 :     appendStringInfoString(ctx->out,
     422      580922 :                            quote_qualified_identifier(
     423      290460 :                                                       get_namespace_name(
     424             :                                                                          get_rel_namespace(RelationGetRelid(relation))),
     425      290460 :                                                       class_form->relrewrite ?
     426           2 :                                                       get_rel_name(class_form->relrewrite) :
     427             :                                                       NameStr(class_form->relname)));
     428      290460 :     appendStringInfoChar(ctx->out, ':');
     429             : 
     430      290460 :     switch (change->action)
     431             :     {
     432             :         case REORDER_BUFFER_CHANGE_INSERT:
     433      255406 :             appendStringInfoString(ctx->out, " INSERT:");
     434      255406 :             if (change->data.tp.newtuple == NULL)
     435           0 :                 appendStringInfoString(ctx->out, " (no-tuple-data)");
     436             :             else
     437      255406 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     438      255406 :                                     &change->data.tp.newtuple->tuple,
     439             :                                     false);
     440      255406 :             break;
     441             :         case REORDER_BUFFER_CHANGE_UPDATE:
     442       15030 :             appendStringInfoString(ctx->out, " UPDATE:");
     443       15030 :             if (change->data.tp.oldtuple != NULL)
     444             :             {
     445          34 :                 appendStringInfoString(ctx->out, " old-key:");
     446          34 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     447          34 :                                     &change->data.tp.oldtuple->tuple,
     448             :                                     true);
     449          34 :                 appendStringInfoString(ctx->out, " new-tuple:");
     450             :             }
     451             : 
     452       15030 :             if (change->data.tp.newtuple == NULL)
     453           0 :                 appendStringInfoString(ctx->out, " (no-tuple-data)");
     454             :             else
     455       15030 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     456       15030 :                                     &change->data.tp.newtuple->tuple,
     457             :                                     false);
     458       15030 :             break;
     459             :         case REORDER_BUFFER_CHANGE_DELETE:
     460       20024 :             appendStringInfoString(ctx->out, " DELETE:");
     461             : 
     462             :             /* if there was no PK, we only know that a delete happened */
     463       20024 :             if (change->data.tp.oldtuple == NULL)
     464       10006 :                 appendStringInfoString(ctx->out, " (no-tuple-data)");
     465             :             /* In DELETE, only the replica identity is present; display that */
     466             :             else
     467       10018 :                 tuple_to_stringinfo(ctx->out, tupdesc,
     468       10018 :                                     &change->data.tp.oldtuple->tuple,
     469             :                                     true);
     470       20024 :             break;
     471             :         default:
     472             :             Assert(false);
     473             :     }
     474             : 
     475      290460 :     MemoryContextSwitchTo(old);
     476      290460 :     MemoryContextReset(data->context);
     477             : 
     478      290460 :     OutputPluginWrite(ctx, true);
     479      290460 : }
     480             : /*
                     :  * TRUNCATE callback; installed as truncate_cb.
                     :  *
                     :  * Writes a single line: "table " plus the schema-qualified names of all
                     :  * nrelations relations separated by ", ", then ": TRUNCATE:", then
                     :  * " restart_seqs" and/or " cascade" for the flags that are set, or
                     :  * " (no-flags)" when neither is set.
                     :  *
                     :  * As in pg_decode_change(), a pending BEGIN line is written first when
                     :  * skip_empty_xacts is set, xact_wrote_changes is set, and the text is
                     :  * built in data->context, which is reset before OutputPluginWrite().
                     :  * Unlike pg_decode_change(), relrewrite is not consulted for the names.
                     :  */
     481             : static void
     482           6 : pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     483             :                    int nrelations, Relation relations[], ReorderBufferChange *change)
     484             : {
     485             :     TestDecodingData *data;
     486             :     MemoryContext old;
     487             :     int         i;
     488             : 
     489           6 :     data = ctx->output_plugin_private;
     490             : 
     491             :     /* output BEGIN if we haven't yet */
     492           6 :     if (data->skip_empty_xacts && !data->xact_wrote_changes)
     493             :     {
     494           6 :         pg_output_begin(ctx, data, txn, false);
     495             :     }
     496           6 :     data->xact_wrote_changes = true;
     497             : 
     498             :     /* Avoid leaking memory by using and resetting our own context */
     499           6 :     old = MemoryContextSwitchTo(data->context);
     500             : 
     501           6 :     OutputPluginPrepareWrite(ctx, true);
     502             : 
     503           6 :     appendStringInfoString(ctx->out, "table ");
     504             : 
     505          14 :     for (i = 0; i < nrelations; i++)
     506             :     {
     507           8 :         if (i > 0)
     508           2 :             appendStringInfoString(ctx->out, ", ");
     509             : 
     510           8 :         appendStringInfoString(ctx->out,
     511           8 :                                quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
     512           8 :                                                           NameStr(relations[i]->rd_rel->relname)));
     513             :     }
     514             : 
     515           6 :     appendStringInfoString(ctx->out, ": TRUNCATE:");
     516             : 
     517           6 :     if (change->data.truncate.restart_seqs
     518           4 :         || change->data.truncate.cascade)
     519             :     {
     520           2 :         if (change->data.truncate.restart_seqs)
     521           2 :             appendStringInfoString(ctx->out, " restart_seqs");
     522           4 :         if (change->data.truncate.cascade)
     523           2 :             appendStringInfoString(ctx->out, " cascade");
     524             :     }
     525             :     else
     526           4 :         appendStringInfoString(ctx->out, " (no-flags)");
     527             : 
     528           6 :     MemoryContextSwitchTo(old);
     529           6 :     MemoryContextReset(data->context);
     530             : 
     531           6 :     OutputPluginWrite(ctx, true);
     532           6 : }
     533             : /*
                     :  * Message callback; installed as message_cb.
                     :  *
                     :  * Writes "message: transactional: <flag> prefix: <prefix>, sz: <sz>
                     :  * content:" followed by exactly sz bytes copied from message with
                     :  * appendBinaryStringInfo().  The transactional flag is printed with %d.
                     :  *
                     :  * txn and lsn are not used.  This function neither reads skip_empty_xacts
                     :  * nor sets xact_wrote_changes, so it never emits a BEGIN line itself.
                     :  */
     534             : static void
     535          16 : pg_decode_message(LogicalDecodingContext *ctx,
     536             :                   ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
     537             :                   const char *prefix, Size sz, const char *message)
     538             : {
     539          16 :     OutputPluginPrepareWrite(ctx, true);
     540          16 :     appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
     541             :                      transactional, prefix, sz);
     542          16 :     appendBinaryStringInfo(ctx->out, message, sz);
     543          16 :     OutputPluginWrite(ctx, true);
     544          16 : }

Generated by: LCOV version 1.13