LCOV - code coverage report
Current view: top level - src/backend/replication/pgoutput - pgoutput.c (source / functions) Hit Total Coverage
Test: PostgreSQL 14devel Lines: 339 392 86.5 %
Date: 2021-01-26 22:06:52 Functions: 25 25 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pgoutput.c
       4             :  *      Logical Replication output plugin
       5             :  *
       6             :  * Copyright (c) 2012-2021, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *        src/backend/replication/pgoutput/pgoutput.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #include "access/tupconvert.h"
      16             : #include "catalog/partition.h"
      17             : #include "catalog/pg_publication.h"
      18             : #include "commands/defrem.h"
      19             : #include "fmgr.h"
      20             : #include "replication/logical.h"
      21             : #include "replication/logicalproto.h"
      22             : #include "replication/origin.h"
      23             : #include "replication/pgoutput.h"
      24             : #include "utils/int8.h"
      25             : #include "utils/inval.h"
      26             : #include "utils/lsyscache.h"
      27             : #include "utils/memutils.h"
      28             : #include "utils/syscache.h"
      29             : #include "utils/varlena.h"
      30             : 
      31         242 : PG_MODULE_MAGIC;
      32             : 
      33             : extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
      34             : 
      35             : static void pgoutput_startup(LogicalDecodingContext *ctx,
      36             :                              OutputPluginOptions *opt, bool is_init);
      37             : static void pgoutput_shutdown(LogicalDecodingContext *ctx);
      38             : static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
      39             :                                ReorderBufferTXN *txn);
      40             : static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
      41             :                                 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
      42             : static void pgoutput_change(LogicalDecodingContext *ctx,
      43             :                             ReorderBufferTXN *txn, Relation rel,
      44             :                             ReorderBufferChange *change);
      45             : static void pgoutput_truncate(LogicalDecodingContext *ctx,
      46             :                               ReorderBufferTXN *txn, int nrelations, Relation relations[],
      47             :                               ReorderBufferChange *change);
      48             : static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
      49             :                                    RepOriginId origin_id);
      50             : static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
      51             :                                   ReorderBufferTXN *txn);
      52             : static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
      53             :                                  ReorderBufferTXN *txn);
      54             : static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
      55             :                                   ReorderBufferTXN *txn,
      56             :                                   XLogRecPtr abort_lsn);
      57             : static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
      58             :                                    ReorderBufferTXN *txn,
      59             :                                    XLogRecPtr commit_lsn);
      60             : 
      61             : static bool publications_valid;
      62             : static bool in_streaming;
      63             : 
      64             : static List *LoadPublications(List *pubnames);
      65             : static void publication_invalidation_cb(Datum arg, int cacheid,
      66             :                                         uint32 hashvalue);
      67             : static void send_relation_and_attrs(Relation relation, TransactionId xid,
      68             :                                     LogicalDecodingContext *ctx);
      69             : 
      70             : /*
      71             :  * Entry in the map used to remember which relation schemas we sent.
      72             :  *
      73             :  * The schema_sent flag determines if the current schema record was already
      74             :  * sent to the subscriber (in which case we don't need to send it again).
      75             :  *
      76             :  * The schema cache on downstream is however updated only at commit time,
      77             :  * and with streamed transactions the commit order may be different from
      78             :  * the order the transactions are sent in. Also, the (sub) transactions
      79             :  * might get aborted so we need to send the schema for each (sub) transaction
      80             :  * so that we don't lose the schema information on abort. For handling this,
      81             :  * we maintain the list of xids (streamed_txns) for those we have already sent
      82             :  * the schema.
      83             :  *
      84             :  * For partitions, 'pubactions' considers not only the table's own
      85             :  * publications, but also those of all of its ancestors.
      86             :  */
      87             : typedef struct RelationSyncEntry
      88             : {
      89             :     Oid         relid;          /* relation oid */
      90             : 
      91             :     /*
      92             :      * Did we send the schema?  If ancestor relid is set, its schema must also
      93             :      * have been sent for this to be true.
      94             :      */
      95             :     bool        schema_sent;
      96             :     List       *streamed_txns;  /* streamed toplevel transactions with this
      97             :                                  * schema */
      98             : 
      99             :     bool        replicate_valid;
     100             :     PublicationActions pubactions;
     101             : 
     102             :     /*
     103             :      * OID of the relation to publish changes as.  For a partition, this may
     104             :      * be set to one of its ancestors whose schema will be used when
     105             :      * replicating changes, if publish_via_partition_root is set for the
     106             :      * publication.
     107             :      */
     108             :     Oid         publish_as_relid;
     109             : 
     110             :     /*
     111             :      * Map used when replicating using an ancestor's schema to convert tuples
     112             :      * from partition's type to the ancestor's; NULL if publish_as_relid is
     113             :      * same as 'relid' or if unnecessary due to partition and the ancestor
     114             :      * having identical TupleDesc.
     115             :      */
     116             :     TupleConversionMap *map;
     117             : } RelationSyncEntry;
     118             : 
     119             : /* Map used to remember which relation schemas we sent. */
     120             : static HTAB *RelationSyncCache = NULL;
     121             : 
     122             : static void init_rel_sync_cache(MemoryContext decoding_context);
     123             : static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
     124             : static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
     125             : static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
     126             : static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
     127             :                                           uint32 hashvalue);
     128             : static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
     129             :                                             TransactionId xid);
     130             : static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
     131             :                                             TransactionId xid);
     132             : 
     133             : /*
     134             :  * Specify output plugin callbacks
     135             :  */
     136             : void
     137         368 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
     138             : {
     139             :     AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
     140             : 
     141         368 :     cb->startup_cb = pgoutput_startup;
     142         368 :     cb->begin_cb = pgoutput_begin_txn;
     143         368 :     cb->change_cb = pgoutput_change;
     144         368 :     cb->truncate_cb = pgoutput_truncate;
     145         368 :     cb->commit_cb = pgoutput_commit_txn;
     146         368 :     cb->filter_by_origin_cb = pgoutput_origin_filter;
     147         368 :     cb->shutdown_cb = pgoutput_shutdown;
     148             : 
     149             :     /* transaction streaming */
     150         368 :     cb->stream_start_cb = pgoutput_stream_start;
     151         368 :     cb->stream_stop_cb = pgoutput_stream_stop;
     152         368 :     cb->stream_abort_cb = pgoutput_stream_abort;
     153         368 :     cb->stream_commit_cb = pgoutput_stream_commit;
     154         368 :     cb->stream_change_cb = pgoutput_change;
     155         368 :     cb->stream_truncate_cb = pgoutput_truncate;
     156         368 : }
     157             : 
     158             : static void
     159         180 : parse_output_parameters(List *options, uint32 *protocol_version,
     160             :                         List **publication_names, bool *binary,
     161             :                         bool *enable_streaming)
     162             : {
     163             :     ListCell   *lc;
     164         180 :     bool        protocol_version_given = false;
     165         180 :     bool        publication_names_given = false;
     166         180 :     bool        binary_option_given = false;
     167         180 :     bool        streaming_given = false;
     168             : 
     169         180 :     *binary = false;
     170             : 
     171         572 :     foreach(lc, options)
     172             :     {
     173         392 :         DefElem    *defel = (DefElem *) lfirst(lc);
     174             : 
     175             :         Assert(defel->arg == NULL || IsA(defel->arg, String));
     176             : 
     177             :         /* Check each param, whether or not we recognize it */
     178         392 :         if (strcmp(defel->defname, "proto_version") == 0)
     179             :         {
     180             :             int64       parsed;
     181             : 
     182         180 :             if (protocol_version_given)
     183           0 :                 ereport(ERROR,
     184             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     185             :                          errmsg("conflicting or redundant options")));
     186         180 :             protocol_version_given = true;
     187             : 
     188         180 :             if (!scanint8(strVal(defel->arg), true, &parsed))
     189           0 :                 ereport(ERROR,
     190             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     191             :                          errmsg("invalid proto_version")));
     192             : 
     193         180 :             if (parsed > PG_UINT32_MAX || parsed < 0)
     194           0 :                 ereport(ERROR,
     195             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     196             :                          errmsg("proto_version \"%s\" out of range",
     197             :                                 strVal(defel->arg))));
     198             : 
     199         180 :             *protocol_version = (uint32) parsed;
     200             :         }
     201         212 :         else if (strcmp(defel->defname, "publication_names") == 0)
     202             :         {
     203         180 :             if (publication_names_given)
     204           0 :                 ereport(ERROR,
     205             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     206             :                          errmsg("conflicting or redundant options")));
     207         180 :             publication_names_given = true;
     208             : 
     209         180 :             if (!SplitIdentifierString(strVal(defel->arg), ',',
     210             :                                        publication_names))
     211           0 :                 ereport(ERROR,
     212             :                         (errcode(ERRCODE_INVALID_NAME),
     213             :                          errmsg("invalid publication_names syntax")));
     214             :         }
     215          32 :         else if (strcmp(defel->defname, "binary") == 0)
     216             :         {
     217          10 :             if (binary_option_given)
     218           0 :                 ereport(ERROR,
     219             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     220             :                          errmsg("conflicting or redundant options")));
     221          10 :             binary_option_given = true;
     222             : 
     223          10 :             *binary = defGetBoolean(defel);
     224             :         }
     225          22 :         else if (strcmp(defel->defname, "streaming") == 0)
     226             :         {
     227          22 :             if (streaming_given)
     228           0 :                 ereport(ERROR,
     229             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     230             :                          errmsg("conflicting or redundant options")));
     231          22 :             streaming_given = true;
     232             : 
     233          22 :             *enable_streaming = defGetBoolean(defel);
     234             :         }
     235             :         else
     236           0 :             elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
     237             :     }
     238         180 : }
     239             : 
     240             : /*
     241             :  * Initialize this plugin
     242             :  */
     243             : static void
     244         368 : pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
     245             :                  bool is_init)
     246             : {
     247         368 :     bool        enable_streaming = false;
     248         368 :     PGOutputData *data = palloc0(sizeof(PGOutputData));
     249             : 
     250             :     /* Create our memory context for private allocations. */
     251         368 :     data->context = AllocSetContextCreate(ctx->context,
     252             :                                           "logical replication output context",
     253             :                                           ALLOCSET_DEFAULT_SIZES);
     254             : 
     255         368 :     ctx->output_plugin_private = data;
     256             : 
     257             :     /* This plugin uses binary protocol. */
     258         368 :     opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
     259             : 
     260             :     /*
     261             :      * This is replication start and not slot initialization.
     262             :      *
     263             :      * Parse and validate options passed by the client.
     264             :      */
     265         368 :     if (!is_init)
     266             :     {
     267             :         /* Parse the params and ERROR if we see any we don't recognize */
     268         180 :         parse_output_parameters(ctx->output_plugin_options,
     269             :                                 &data->protocol_version,
     270             :                                 &data->publication_names,
     271             :                                 &data->binary,
     272             :                                 &enable_streaming);
     273             : 
     274             :         /* Check if we support requested protocol */
     275         180 :         if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
     276           0 :             ereport(ERROR,
     277             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     278             :                      errmsg("client sent proto_version=%d but we only support protocol %d or lower",
     279             :                             data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
     280             : 
     281         180 :         if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
     282           0 :             ereport(ERROR,
     283             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     284             :                      errmsg("client sent proto_version=%d but we only support protocol %d or higher",
     285             :                             data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
     286             : 
     287         180 :         if (list_length(data->publication_names) < 1)
     288           0 :             ereport(ERROR,
     289             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     290             :                      errmsg("publication_names parameter missing")));
     291             : 
     292             :         /*
     293             :          * Decide whether to enable streaming. It is disabled by default, in
     294             :          * which case we just update the flag in decoding context. Otherwise
     295             :          * we only allow it with sufficient version of the protocol, and when
     296             :          * the output plugin supports it.
     297             :          */
     298         180 :         if (!enable_streaming)
     299         158 :             ctx->streaming = false;
     300          22 :         else if (data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
     301           0 :             ereport(ERROR,
     302             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     303             :                      errmsg("requested proto_version=%d does not support streaming, need %d or higher",
     304             :                             data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
     305          22 :         else if (!ctx->streaming)
     306           0 :             ereport(ERROR,
     307             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     308             :                      errmsg("streaming requested, but not supported by output plugin")));
     309             : 
     310             :         /* Also remember we're currently not streaming any transaction. */
     311         180 :         in_streaming = false;
     312             : 
     313             :         /* Init publication state. */
     314         180 :         data->publications = NIL;
     315         180 :         publications_valid = false;
     316         180 :         CacheRegisterSyscacheCallback(PUBLICATIONOID,
     317             :                                       publication_invalidation_cb,
     318             :                                       (Datum) 0);
     319             : 
     320             :         /* Initialize relation schema cache. */
     321         180 :         init_rel_sync_cache(CacheMemoryContext);
     322             :     }
     323             :     else
     324             :     {
     325             :         /* Disable the streaming during the slot initialization mode. */
     326         188 :         ctx->streaming = false;
     327             :     }
     328         368 : }
     329             : 
     330             : /*
     331             :  * BEGIN callback
     332             :  */
     333             : static void
     334         358 : pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     335             : {
     336         358 :     bool        send_replication_origin = txn->origin_id != InvalidRepOriginId;
     337             : 
     338         358 :     OutputPluginPrepareWrite(ctx, !send_replication_origin);
     339         358 :     logicalrep_write_begin(ctx->out, txn);
     340             : 
     341         358 :     if (send_replication_origin)
     342             :     {
     343             :         char       *origin;
     344             : 
     345             :         /* Message boundary */
     346           0 :         OutputPluginWrite(ctx, false);
     347           0 :         OutputPluginPrepareWrite(ctx, true);
     348             : 
     349             :         /*----------
     350             :          * XXX: which behaviour do we want here?
     351             :          *
     352             :          * Alternatives:
     353             :          *  - don't send origin message if origin name not found
     354             :          *    (that's what we do now)
     355             :          *  - throw error - that will break replication, not good
     356             :          *  - send some special "unknown" origin
     357             :          *----------
     358             :          */
     359           0 :         if (replorigin_by_oid(txn->origin_id, true, &origin))
     360           0 :             logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
     361             :     }
     362             : 
     363         358 :     OutputPluginWrite(ctx, true);
     364         358 : }
     365             : 
     366             : /*
     367             :  * COMMIT callback
     368             :  */
     369             : static void
     370         358 : pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     371             :                     XLogRecPtr commit_lsn)
     372             : {
     373         358 :     OutputPluginUpdateProgress(ctx);
     374             : 
     375         358 :     OutputPluginPrepareWrite(ctx, true);
     376         358 :     logicalrep_write_commit(ctx->out, txn, commit_lsn);
     377         358 :     OutputPluginWrite(ctx, true);
     378         358 : }
     379             : 
     380             : /*
     381             :  * Write the current schema of the relation and its ancestor (if any) if not
     382             :  * done yet.
     383             :  */
     384             : static void
     385      114220 : maybe_send_schema(LogicalDecodingContext *ctx,
     386             :                   ReorderBufferTXN *txn, ReorderBufferChange *change,
     387             :                   Relation relation, RelationSyncEntry *relentry)
     388             : {
     389             :     bool        schema_sent;
     390      114220 :     TransactionId xid = InvalidTransactionId;
     391      114220 :     TransactionId topxid = InvalidTransactionId;
     392             : 
     393             :     /*
     394             :      * Remember XID of the (sub)transaction for the change. We don't care if
     395             :      * it's top-level transaction or not (we have already sent that XID in
     396             :      * start of the current streaming block).
     397             :      *
     398             :      * If we're not in a streaming block, just use InvalidTransactionId and
     399             :      * the write methods will not include it.
     400             :      */
     401      114220 :     if (in_streaming)
     402      112748 :         xid = change->txn->xid;
     403             : 
     404      114220 :     if (change->txn->toptxn)
     405       25618 :         topxid = change->txn->toptxn->xid;
     406             :     else
     407       88602 :         topxid = xid;
     408             : 
     409             :     /*
     410             :      * Do we need to send the schema? We do track streamed transactions
     411             :      * separately, because those may be applied later (and the regular
     412             :      * transactions won't see their effects until then) and in an order that
     413             :      * we don't know at this point.
     414             :      *
     415             :      * XXX There is a scope of optimization here. Currently, we always send
     416             :      * the schema first time in a streaming transaction but we can probably
     417             :      * avoid that by checking 'relentry->schema_sent' flag. However, before
     418             :      * doing that we need to study its impact on the case where we have a mix
     419             :      * of streaming and non-streaming transactions.
     420             :      */
     421      114220 :     if (in_streaming)
     422      112748 :         schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
     423             :     else
     424        1472 :         schema_sent = relentry->schema_sent;
     425             : 
     426      114220 :     if (schema_sent)
     427      114062 :         return;
     428             : 
     429             :     /* If needed, send the ancestor's schema first. */
     430         158 :     if (relentry->publish_as_relid != RelationGetRelid(relation))
     431             :     {
     432           0 :         Relation    ancestor = RelationIdGetRelation(relentry->publish_as_relid);
     433           0 :         TupleDesc   indesc = RelationGetDescr(relation);
     434           0 :         TupleDesc   outdesc = RelationGetDescr(ancestor);
     435             :         MemoryContext oldctx;
     436             : 
     437             :         /* Map must live as long as the session does. */
     438           0 :         oldctx = MemoryContextSwitchTo(CacheMemoryContext);
     439           0 :         relentry->map = convert_tuples_by_name(CreateTupleDescCopy(indesc),
     440             :                                                CreateTupleDescCopy(outdesc));
     441           0 :         MemoryContextSwitchTo(oldctx);
     442           0 :         send_relation_and_attrs(ancestor, xid, ctx);
     443           0 :         RelationClose(ancestor);
     444             :     }
     445             : 
     446         158 :     send_relation_and_attrs(relation, xid, ctx);
     447             : 
     448         158 :     if (in_streaming)
     449          38 :         set_schema_sent_in_streamed_txn(relentry, topxid);
     450             :     else
     451         120 :         relentry->schema_sent = true;
     452             : }
     453             : 
     454             : /*
     455             :  * Sends a relation
     456             :  */
     457             : static void
     458         158 : send_relation_and_attrs(Relation relation, TransactionId xid,
     459             :                         LogicalDecodingContext *ctx)
     460             : {
     461         158 :     TupleDesc   desc = RelationGetDescr(relation);
     462             :     int         i;
     463             : 
     464             :     /*
     465             :      * Write out type info if needed.  We do that only for user-created types.
     466             :      * We use FirstGenbkiObjectId as the cutoff, so that we only consider
     467             :      * objects with hand-assigned OIDs to be "built in", not for instance any
     468             :      * function or type defined in the information_schema. This is important
     469             :      * because only hand-assigned OIDs can be expected to remain stable across
     470             :      * major versions.
     471             :      */
     472         520 :     for (i = 0; i < desc->natts; i++)
     473             :     {
     474         362 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     475             : 
     476         362 :         if (att->attisdropped || att->attgenerated)
     477           2 :             continue;
     478             : 
     479         360 :         if (att->atttypid < FirstGenbkiObjectId)
     480         328 :             continue;
     481             : 
     482          32 :         OutputPluginPrepareWrite(ctx, false);
     483          32 :         logicalrep_write_typ(ctx->out, xid, att->atttypid);
     484          32 :         OutputPluginWrite(ctx, false);
     485             :     }
     486             : 
     487         158 :     OutputPluginPrepareWrite(ctx, false);
     488         158 :     logicalrep_write_rel(ctx->out, xid, relation);
     489         158 :     OutputPluginWrite(ctx, false);
     490         158 : }
     491             : 
     492             : /*
     493             :  * Sends the decoded DML over wire.
     494             :  *
     495             :  * This is called both in streaming and non-streaming modes.
     496             :  */
     497             : static void
     498      116404 : pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     499             :                 Relation relation, ReorderBufferChange *change)
     500             : {
     501      116404 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
     502             :     MemoryContext old;
     503             :     RelationSyncEntry *relentry;
     504      116404 :     TransactionId xid = InvalidTransactionId;
     505      116404 :     Relation    ancestor = NULL;
     506             : 
     507      116404 :     if (!is_publishable_relation(relation))
     508           4 :         return;
     509             : 
     510             :     /*
     511             :      * Remember the xid for the change in streaming mode. We need to send xid
     512             :      * with each change in the streaming mode so that subscriber can make
     513             :      * their association and on aborts, it can discard the corresponding
     514             :      * changes.
     515             :      */
     516      116400 :     if (in_streaming)
     517      112748 :         xid = change->txn->xid;
     518             : 
     519      116400 :     relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
     520             : 
     521             :     /* First check the table filter */
     522      116400 :     switch (change->action)
     523             :     {
     524       62738 :         case REORDER_BUFFER_CHANGE_INSERT:
     525       62738 :             if (!relentry->pubactions.pubinsert)
     526           4 :                 return;
     527       62734 :             break;
     528       32832 :         case REORDER_BUFFER_CHANGE_UPDATE:
     529       32832 :             if (!relentry->pubactions.pubupdate)
     530          80 :                 return;
     531       32752 :             break;
     532       20830 :         case REORDER_BUFFER_CHANGE_DELETE:
     533       20830 :             if (!relentry->pubactions.pubdelete)
     534        2106 :                 return;
     535       18724 :             break;
     536      114210 :         default:
     537             :             Assert(false);
     538             :     }
     539             : 
     540             :     /* Avoid leaking memory by using and resetting our own context */
     541      114210 :     old = MemoryContextSwitchTo(data->context);
     542             : 
     543      114210 :     maybe_send_schema(ctx, txn, change, relation, relentry);
     544             : 
     545             :     /* Send the data */
     546      114210 :     switch (change->action)
     547             :     {
     548       62734 :         case REORDER_BUFFER_CHANGE_INSERT:
     549             :             {
     550       62734 :                 HeapTuple   tuple = &change->data.tp.newtuple->tuple;
     551             : 
     552             :                 /* Switch relation if publishing via root. */
     553       62734 :                 if (relentry->publish_as_relid != RelationGetRelid(relation))
     554             :                 {
     555             :                     Assert(relation->rd_rel->relispartition);
     556           0 :                     ancestor = RelationIdGetRelation(relentry->publish_as_relid);
     557           0 :                     relation = ancestor;
     558             :                     /* Convert tuple if needed. */
     559           0 :                     if (relentry->map)
     560           0 :                         tuple = execute_attr_map_tuple(tuple, relentry->map);
     561             :                 }
     562             : 
     563       62734 :                 OutputPluginPrepareWrite(ctx, true);
     564       62734 :                 logicalrep_write_insert(ctx->out, xid, relation, tuple,
     565       62734 :                                         data->binary);
     566       62734 :                 OutputPluginWrite(ctx, true);
     567       62734 :                 break;
     568             :             }
     569       32752 :         case REORDER_BUFFER_CHANGE_UPDATE:
     570             :             {
     571       65504 :                 HeapTuple   oldtuple = change->data.tp.oldtuple ?
     572       32752 :                 &change->data.tp.oldtuple->tuple : NULL;
     573       32752 :                 HeapTuple   newtuple = &change->data.tp.newtuple->tuple;
     574             : 
     575             :                 /* Switch relation if publishing via root. */
     576       32752 :                 if (relentry->publish_as_relid != RelationGetRelid(relation))
     577             :                 {
     578             :                     Assert(relation->rd_rel->relispartition);
     579           0 :                     ancestor = RelationIdGetRelation(relentry->publish_as_relid);
     580           0 :                     relation = ancestor;
     581             :                     /* Convert tuples if needed. */
     582           0 :                     if (relentry->map)
     583             :                     {
     584           0 :                         oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
     585           0 :                         newtuple = execute_attr_map_tuple(newtuple, relentry->map);
     586             :                     }
     587             :                 }
     588             : 
     589       32752 :                 OutputPluginPrepareWrite(ctx, true);
     590       32752 :                 logicalrep_write_update(ctx->out, xid, relation, oldtuple,
     591       32752 :                                         newtuple, data->binary);
     592       32752 :                 OutputPluginWrite(ctx, true);
     593       32752 :                 break;
     594             :             }
     595       18724 :         case REORDER_BUFFER_CHANGE_DELETE:
     596       18724 :             if (change->data.tp.oldtuple)
     597             :             {
     598       18724 :                 HeapTuple   oldtuple = &change->data.tp.oldtuple->tuple;
     599             : 
     600             :                 /* Switch relation if publishing via root. */
     601       18724 :                 if (relentry->publish_as_relid != RelationGetRelid(relation))
     602             :                 {
     603             :                     Assert(relation->rd_rel->relispartition);
     604           0 :                     ancestor = RelationIdGetRelation(relentry->publish_as_relid);
     605           0 :                     relation = ancestor;
     606             :                     /* Convert tuple if needed. */
     607           0 :                     if (relentry->map)
     608           0 :                         oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
     609             :                 }
     610             : 
     611       18724 :                 OutputPluginPrepareWrite(ctx, true);
     612       18724 :                 logicalrep_write_delete(ctx->out, xid, relation, oldtuple,
     613       18724 :                                         data->binary);
     614       18724 :                 OutputPluginWrite(ctx, true);
     615             :             }
     616             :             else
     617           0 :                 elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
     618       18724 :             break;
     619      114210 :         default:
     620             :             Assert(false);
     621             :     }
     622             : 
     623      114210 :     if (RelationIsValid(ancestor))
     624             :     {
     625           0 :         RelationClose(ancestor);
     626           0 :         ancestor = NULL;
     627             :     }
     628             : 
     629             :     /* Cleanup */
     630      114210 :     MemoryContextSwitchTo(old);
     631      114210 :     MemoryContextReset(data->context);
     632             : }
     633             : 
     634             : static void
     635          14 : pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     636             :                   int nrelations, Relation relations[], ReorderBufferChange *change)
     637             : {
     638          14 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
     639             :     MemoryContext old;
     640             :     RelationSyncEntry *relentry;
     641             :     int         i;
     642             :     int         nrelids;
     643             :     Oid        *relids;
     644          14 :     TransactionId xid = InvalidTransactionId;
     645             : 
     646             :     /* Remember the xid for the change in streaming mode. See pgoutput_change. */
     647          14 :     if (in_streaming)
     648           0 :         xid = change->txn->xid;
     649             : 
     650          14 :     old = MemoryContextSwitchTo(data->context);
     651             : 
     652          14 :     relids = palloc0(nrelations * sizeof(Oid));
     653          14 :     nrelids = 0;
     654             : 
     655          36 :     for (i = 0; i < nrelations; i++)
     656             :     {
     657          22 :         Relation    relation = relations[i];
     658          22 :         Oid         relid = RelationGetRelid(relation);
     659             : 
     660          22 :         if (!is_publishable_relation(relation))
     661           0 :             continue;
     662             : 
     663          22 :         relentry = get_rel_sync_entry(data, relid);
     664             : 
     665          22 :         if (!relentry->pubactions.pubtruncate)
     666          12 :             continue;
     667             : 
     668             :         /*
     669             :          * Don't send partitions if the publication wants to send only the
     670             :          * root tables through it.
     671             :          */
     672          10 :         if (relation->rd_rel->relispartition &&
     673           8 :             relentry->publish_as_relid != relid)
     674           0 :             continue;
     675             : 
     676          10 :         relids[nrelids++] = relid;
     677          10 :         maybe_send_schema(ctx, txn, change, relation, relentry);
     678             :     }
     679             : 
     680          14 :     if (nrelids > 0)
     681             :     {
     682           6 :         OutputPluginPrepareWrite(ctx, true);
     683          12 :         logicalrep_write_truncate(ctx->out,
     684             :                                   xid,
     685             :                                   nrelids,
     686             :                                   relids,
     687           6 :                                   change->data.truncate.cascade,
     688           6 :                                   change->data.truncate.restart_seqs);
     689           6 :         OutputPluginWrite(ctx, true);
     690             :     }
     691             : 
     692          14 :     MemoryContextSwitchTo(old);
     693          14 :     MemoryContextReset(data->context);
     694          14 : }
     695             : 
     696             : /*
     697             :  * Currently we always forward.
     698             :  */
     699             : static bool
     700      160766 : pgoutput_origin_filter(LogicalDecodingContext *ctx,
     701             :                        RepOriginId origin_id)
     702             : {
     703      160766 :     return false;
     704             : }
     705             : 
     706             : /*
     707             :  * Shutdown the output plugin.
     708             :  *
     709             :  * Note, we don't need to clean the data->context as it's child context
     710             :  * of the ctx->context so it will be cleaned up by logical decoding machinery.
     711             :  */
     712             : static void
     713         308 : pgoutput_shutdown(LogicalDecodingContext *ctx)
     714             : {
     715         308 :     if (RelationSyncCache)
     716             :     {
     717         120 :         hash_destroy(RelationSyncCache);
     718         120 :         RelationSyncCache = NULL;
     719             :     }
     720         308 : }
     721             : 
     722             : /*
     723             :  * Load publications from the list of publication names.
     724             :  */
     725             : static List *
     726          56 : LoadPublications(List *pubnames)
     727             : {
     728          56 :     List       *result = NIL;
     729             :     ListCell   *lc;
     730             : 
     731         120 :     foreach(lc, pubnames)
     732             :     {
     733          64 :         char       *pubname = (char *) lfirst(lc);
     734          64 :         Publication *pub = GetPublicationByName(pubname, false);
     735             : 
     736          64 :         result = lappend(result, pub);
     737             :     }
     738             : 
     739          56 :     return result;
     740             : }
     741             : 
     742             : /*
     743             :  * Publication cache invalidation callback.
     744             :  */
     745             : static void
     746          72 : publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
     747             : {
     748          72 :     publications_valid = false;
     749             : 
     750             :     /*
     751             :      * Also invalidate per-relation cache so that next time the filtering info
     752             :      * is checked it will be updated with the new publication settings.
     753             :      */
     754          72 :     rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
     755          72 : }
     756             : 
     757             : /*
     758             :  * START STREAM callback
     759             :  */
     760             : static void
     761         602 : pgoutput_stream_start(struct LogicalDecodingContext *ctx,
     762             :                       ReorderBufferTXN *txn)
     763             : {
     764         602 :     bool        send_replication_origin = txn->origin_id != InvalidRepOriginId;
     765             : 
     766             :     /* we can't nest streaming of transactions */
     767             :     Assert(!in_streaming);
     768             : 
     769             :     /*
     770             :      * If we already sent the first stream for this transaction then don't
     771             :      * send the origin id in the subsequent streams.
     772             :      */
     773         602 :     if (rbtxn_is_streamed(txn))
     774         580 :         send_replication_origin = false;
     775             : 
     776         602 :     OutputPluginPrepareWrite(ctx, !send_replication_origin);
     777         602 :     logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
     778             : 
     779         602 :     if (send_replication_origin)
     780             :     {
     781             :         char       *origin;
     782             : 
     783             :         /* Message boundary */
     784           0 :         OutputPluginWrite(ctx, false);
     785           0 :         OutputPluginPrepareWrite(ctx, true);
     786             : 
     787           0 :         if (replorigin_by_oid(txn->origin_id, true, &origin))
     788           0 :             logicalrep_write_origin(ctx->out, origin, InvalidXLogRecPtr);
     789             :     }
     790             : 
     791         602 :     OutputPluginWrite(ctx, true);
     792             : 
     793             :     /* we're streaming a chunk of transaction now */
     794         602 :     in_streaming = true;
     795         602 : }
     796             : 
     797             : /*
     798             :  * STOP STREAM callback
     799             :  */
     800             : static void
     801         602 : pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
     802             :                      ReorderBufferTXN *txn)
     803             : {
     804             :     /* we should be streaming a trasanction */
     805             :     Assert(in_streaming);
     806             : 
     807         602 :     OutputPluginPrepareWrite(ctx, true);
     808         602 :     logicalrep_write_stream_stop(ctx->out);
     809         602 :     OutputPluginWrite(ctx, true);
     810             : 
     811             :     /* we've stopped streaming a transaction */
     812         602 :     in_streaming = false;
     813         602 : }
     814             : 
     815             : /*
     816             :  * Notify downstream to discard the streamed transaction (along with all
     817             :  * it's subtransactions, if it's a toplevel transaction).
     818             :  */
     819             : static void
     820          26 : pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
     821             :                       ReorderBufferTXN *txn,
     822             :                       XLogRecPtr abort_lsn)
     823             : {
     824             :     ReorderBufferTXN *toptxn;
     825             : 
     826             :     /*
     827             :      * The abort should happen outside streaming block, even for streamed
     828             :      * transactions. The transaction has to be marked as streamed, though.
     829             :      */
     830             :     Assert(!in_streaming);
     831             : 
     832             :     /* determine the toplevel transaction */
     833          26 :     toptxn = (txn->toptxn) ? txn->toptxn : txn;
     834             : 
     835             :     Assert(rbtxn_is_streamed(toptxn));
     836             : 
     837          26 :     OutputPluginPrepareWrite(ctx, true);
     838          26 :     logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid);
     839          26 :     OutputPluginWrite(ctx, true);
     840             : 
     841          26 :     cleanup_rel_sync_cache(toptxn->xid, false);
     842          26 : }
     843             : 
     844             : /*
     845             :  * Notify downstream to apply the streamed transaction (along with all
     846             :  * it's subtransactions).
     847             :  */
     848             : static void
     849          20 : pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
     850             :                        ReorderBufferTXN *txn,
     851             :                        XLogRecPtr commit_lsn)
     852             : {
     853             :     /*
     854             :      * The commit should happen outside streaming block, even for streamed
     855             :      * transactions. The transaction has to be marked as streamed, though.
     856             :      */
     857             :     Assert(!in_streaming);
     858             :     Assert(rbtxn_is_streamed(txn));
     859             : 
     860          20 :     OutputPluginUpdateProgress(ctx);
     861             : 
     862          20 :     OutputPluginPrepareWrite(ctx, true);
     863          20 :     logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
     864          20 :     OutputPluginWrite(ctx, true);
     865             : 
     866          20 :     cleanup_rel_sync_cache(txn->xid, true);
     867          20 : }
     868             : 
     869             : /*
     870             :  * Initialize the relation schema sync cache for a decoding session.
     871             :  *
     872             :  * The hash table is destroyed at the end of a decoding session. While
     873             :  * relcache invalidations still exist and will still be invoked, they
     874             :  * will just see the null hash table global and take no action.
     875             :  */
     876             : static void
     877         180 : init_rel_sync_cache(MemoryContext cachectx)
     878             : {
     879             :     HASHCTL     ctl;
     880             : 
     881         180 :     if (RelationSyncCache != NULL)
     882           0 :         return;
     883             : 
     884             :     /* Make a new hash table for the cache */
     885         180 :     ctl.keysize = sizeof(Oid);
     886         180 :     ctl.entrysize = sizeof(RelationSyncEntry);
     887         180 :     ctl.hcxt = cachectx;
     888             : 
     889         180 :     RelationSyncCache = hash_create("logical replication output relation cache",
     890             :                                     128, &ctl,
     891             :                                     HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
     892             : 
     893             :     Assert(RelationSyncCache != NULL);
     894             : 
     895         180 :     CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
     896         180 :     CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
     897             :                                   rel_sync_cache_publication_cb,
     898             :                                   (Datum) 0);
     899             : }
     900             : 
     901             : /*
     902             :  * We expect relatively small number of streamed transactions.
     903             :  */
     904             : static bool
     905      112748 : get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
     906             : {
     907             :     ListCell   *lc;
     908             : 
     909      132744 :     foreach(lc, entry->streamed_txns)
     910             :     {
     911      132706 :         if (xid == (uint32) lfirst_int(lc))
     912      112710 :             return true;
     913             :     }
     914             : 
     915          38 :     return false;
     916             : }
     917             : 
     918             : /*
     919             :  * Add the xid in the rel sync entry for which we have already sent the schema
     920             :  * of the relation.
     921             :  */
     922             : static void
     923          38 : set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
     924             : {
     925             :     MemoryContext oldctx;
     926             : 
     927          38 :     oldctx = MemoryContextSwitchTo(CacheMemoryContext);
     928             : 
     929          38 :     entry->streamed_txns = lappend_int(entry->streamed_txns, xid);
     930             : 
     931          38 :     MemoryContextSwitchTo(oldctx);
     932          38 : }
     933             : 
     934             : /*
     935             :  * Find or create entry in the relation schema cache.
     936             :  *
     937             :  * This looks up publications that the given relation is directly or
     938             :  * indirectly part of (the latter if it's really the relation's ancestor that
     939             :  * is part of a publication) and fills up the found entry with the information
     940             :  * about which operations to publish and whether to use an ancestor's schema
     941             :  * when publishing.
     942             :  */
     943             : static RelationSyncEntry *
     944      116422 : get_rel_sync_entry(PGOutputData *data, Oid relid)
     945             : {
     946             :     RelationSyncEntry *entry;
     947      116422 :     bool        am_partition = get_rel_relispartition(relid);
     948      116422 :     char        relkind = get_rel_relkind(relid);
     949             :     bool        found;
     950             :     MemoryContext oldctx;
     951             : 
     952             :     Assert(RelationSyncCache != NULL);
     953             : 
     954             :     /* Find cached relation info, creating if not found */
     955      116422 :     entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
     956             :                                               (void *) &relid,
     957             :                                               HASH_ENTER, &found);
     958             :     Assert(entry != NULL);
     959             : 
     960             :     /* Not found means schema wasn't sent */
     961      116422 :     if (!found)
     962             :     {
     963             :         /* immediately make a new entry valid enough to satisfy callbacks */
     964         118 :         entry->schema_sent = false;
     965         118 :         entry->streamed_txns = NIL;
     966         118 :         entry->replicate_valid = false;
     967         118 :         entry->pubactions.pubinsert = entry->pubactions.pubupdate =
     968         118 :             entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
     969         118 :         entry->publish_as_relid = InvalidOid;
     970             :     }
     971             : 
     972             :     /* Validate the entry */
     973      116422 :     if (!entry->replicate_valid)
     974             :     {
     975         128 :         List       *pubids = GetRelationPublications(relid);
     976             :         ListCell   *lc;
     977         128 :         Oid         publish_as_relid = relid;
     978             : 
     979             :         /* Reload publications if needed before use. */
     980         128 :         if (!publications_valid)
     981             :         {
     982          56 :             oldctx = MemoryContextSwitchTo(CacheMemoryContext);
     983          56 :             if (data->publications)
     984           8 :                 list_free_deep(data->publications);
     985             : 
     986          56 :             data->publications = LoadPublications(data->publication_names);
     987          56 :             MemoryContextSwitchTo(oldctx);
     988          56 :             publications_valid = true;
     989             :         }
     990             : 
     991             :         /*
     992             :          * Build publication cache. We can't use one provided by relcache as
     993             :          * relcache considers all publications given relation is in, but here
     994             :          * we only need to consider ones that the subscriber requested.
     995             :          */
     996         166 :         foreach(lc, data->publications)
     997             :         {
     998         136 :             Publication *pub = lfirst(lc);
     999         136 :             bool        publish = false;
    1000             : 
    1001         136 :             if (pub->alltables)
    1002             :             {
    1003          60 :                 publish = true;
    1004          60 :                 if (pub->pubviaroot && am_partition)
    1005           0 :                     publish_as_relid = llast_oid(get_partition_ancestors(relid));
    1006             :             }
    1007             : 
    1008         136 :             if (!publish)
    1009             :             {
    1010          76 :                 bool        ancestor_published = false;
    1011             : 
    1012             :                 /*
    1013             :                  * For a partition, check if any of the ancestors are
    1014             :                  * published.  If so, note down the topmost ancestor that is
    1015             :                  * published via this publication, which will be used as the
    1016             :                  * relation via which to publish the partition's changes.
    1017             :                  */
    1018          76 :                 if (am_partition)
    1019             :                 {
    1020           6 :                     List       *ancestors = get_partition_ancestors(relid);
    1021             :                     ListCell   *lc2;
    1022             : 
    1023             :                     /*
    1024             :                      * Find the "topmost" ancestor that is in this
    1025             :                      * publication.
    1026             :                      */
    1027          12 :                     foreach(lc2, ancestors)
    1028             :                     {
    1029           6 :                         Oid         ancestor = lfirst_oid(lc2);
    1030             : 
    1031           6 :                         if (list_member_oid(GetRelationPublications(ancestor),
    1032             :                                             pub->oid))
    1033             :                         {
    1034           6 :                             ancestor_published = true;
    1035           6 :                             if (pub->pubviaroot)
    1036           0 :                                 publish_as_relid = ancestor;
    1037             :                         }
    1038             :                     }
    1039             :                 }
    1040             : 
    1041          76 :                 if (list_member_oid(pubids, pub->oid) || ancestor_published)
    1042          54 :                     publish = true;
    1043             :             }
    1044             : 
    1045             :             /*
    1046             :              * Don't publish changes for partitioned tables, because
    1047             :              * publishing those of its partitions suffices, unless partition
    1048             :              * changes won't be published due to pubviaroot being set.
    1049             :              */
    1050         136 :             if (publish &&
    1051           2 :                 (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
    1052             :             {
    1053         112 :                 entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
    1054         112 :                 entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
    1055         112 :                 entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
    1056         112 :                 entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
    1057             :             }
    1058             : 
    1059         136 :             if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
    1060          98 :                 entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
    1061          98 :                 break;
    1062             :         }
    1063             : 
    1064         128 :         list_free(pubids);
    1065             : 
    1066         128 :         entry->publish_as_relid = publish_as_relid;
    1067         128 :         entry->replicate_valid = true;
    1068             :     }
    1069             : 
    1070      116422 :     return entry;
    1071             : }
    1072             : 
    1073             : /*
    1074             :  * Cleanup list of streamed transactions and update the schema_sent flag.
    1075             :  *
    1076             :  * When a streamed transaction commits or aborts, we need to remove the
    1077             :  * toplevel XID from the schema cache. If the transaction aborted, the
    1078             :  * subscriber will simply throw away the schema records we streamed, so
    1079             :  * we don't need to do anything else.
    1080             :  *
    1081             :  * If the transaction is committed, the subscriber will update the relation
    1082             :  * cache - so tweak the schema_sent flag accordingly.
    1083             :  */
    1084             : static void
    1085          46 : cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
    1086             : {
    1087             :     HASH_SEQ_STATUS hash_seq;
    1088             :     RelationSyncEntry *entry;
    1089             :     ListCell   *lc;
    1090             : 
    1091             :     Assert(RelationSyncCache != NULL);
    1092             : 
    1093          46 :     hash_seq_init(&hash_seq, RelationSyncCache);
    1094          92 :     while ((entry = hash_seq_search(&hash_seq)) != NULL)
    1095             :     {
    1096             :         /*
    1097             :          * We can set the schema_sent flag for an entry that has committed xid
    1098             :          * in the list as that ensures that the subscriber would have the
    1099             :          * corresponding schema and we don't need to send it unless there is
    1100             :          * any invalidation for that relation.
    1101             :          */
    1102          48 :         foreach(lc, entry->streamed_txns)
    1103             :         {
    1104          22 :             if (xid == (uint32) lfirst_int(lc))
    1105             :             {
    1106          20 :                 if (is_commit)
    1107          12 :                     entry->schema_sent = true;
    1108             : 
    1109          20 :                 entry->streamed_txns =
    1110          20 :                     foreach_delete_current(entry->streamed_txns, lc);
    1111          20 :                 break;
    1112             :             }
    1113             :         }
    1114             :     }
    1115          46 : }
    1116             : 
    1117             : /*
    1118             :  * Relcache invalidation callback
    1119             :  */
    1120             : static void
    1121        1108 : rel_sync_cache_relation_cb(Datum arg, Oid relid)
    1122             : {
    1123             :     RelationSyncEntry *entry;
    1124             : 
    1125             :     /*
    1126             :      * We can get here if the plugin was used in SQL interface as the
    1127             :      * RelSchemaSyncCache is destroyed when the decoding finishes, but there
    1128             :      * is no way to unregister the relcache invalidation callback.
    1129             :      */
    1130        1108 :     if (RelationSyncCache == NULL)
    1131           0 :         return;
    1132             : 
    1133             :     /*
    1134             :      * Nobody keeps pointers to entries in this hash table around outside
    1135             :      * logical decoding callback calls - but invalidation events can come in
    1136             :      * *during* a callback if we access the relcache in the callback. Because
    1137             :      * of that we must mark the cache entry as invalid but not remove it from
    1138             :      * the hash while it could still be referenced, then prune it at a later
    1139             :      * safe point.
    1140             :      *
    1141             :      * Getting invalidations for relations that aren't in the table is
    1142             :      * entirely normal, since there's no way to unregister for an invalidation
    1143             :      * event. So we don't care if it's found or not.
    1144             :      */
    1145        1108 :     entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
    1146             :                                               HASH_FIND, NULL);
    1147             : 
    1148             :     /*
    1149             :      * Reset schema sent status as the relation definition may have changed.
    1150             :      */
    1151        1108 :     if (entry != NULL)
    1152             :     {
    1153         262 :         entry->schema_sent = false;
    1154         262 :         list_free(entry->streamed_txns);
    1155         262 :         entry->streamed_txns = NIL;
    1156             :     }
    1157             : }
    1158             : 
    1159             : /*
    1160             :  * Publication relation map syscache invalidation callback
    1161             :  */
    1162             : static void
    1163         156 : rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
    1164             : {
    1165             :     HASH_SEQ_STATUS status;
    1166             :     RelationSyncEntry *entry;
    1167             : 
    1168             :     /*
    1169             :      * We can get here if the plugin was used in SQL interface as the
    1170             :      * RelSchemaSyncCache is destroyed when the decoding finishes, but there
    1171             :      * is no way to unregister the relcache invalidation callback.
    1172             :      */
    1173         156 :     if (RelationSyncCache == NULL)
    1174           0 :         return;
    1175             : 
    1176             :     /*
    1177             :      * There is no way to find which entry in our cache the hash belongs to so
    1178             :      * mark the whole cache as invalid.
    1179             :      */
    1180         156 :     hash_seq_init(&status, RelationSyncCache);
    1181         792 :     while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
    1182             :     {
    1183         636 :         entry->replicate_valid = false;
    1184             : 
    1185             :         /*
    1186             :          * There might be some relations dropped from the publication so we
    1187             :          * don't need to publish the changes for them.
    1188             :          */
    1189         636 :         entry->pubactions.pubinsert = false;
    1190         636 :         entry->pubactions.pubupdate = false;
    1191         636 :         entry->pubactions.pubdelete = false;
    1192         636 :         entry->pubactions.pubtruncate = false;
    1193             :     }
    1194             : }

Generated by: LCOV version 1.13