LCOV - code coverage report
Current view: top level - src/backend/replication/pgoutput - pgoutput.c (source / functions)
Test: PostgreSQL 16devel                     Date: 2022-08-17 05:10:35

                 Hit    Total    Coverage
Lines:           724      761      95.1 %
Functions:        41       41     100.0 %

Legend: Lines: hit / not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pgoutput.c
       4             :  *      Logical Replication output plugin
       5             :  *
       6             :  * Copyright (c) 2012-2022, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *        src/backend/replication/pgoutput/pgoutput.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #include "access/tupconvert.h"
      16             : #include "catalog/partition.h"
      17             : #include "catalog/pg_publication.h"
      18             : #include "catalog/pg_publication_rel.h"
      19             : #include "catalog/pg_subscription.h"
      20             : #include "commands/defrem.h"
      21             : #include "executor/executor.h"
      22             : #include "fmgr.h"
      23             : #include "nodes/makefuncs.h"
      24             : #include "optimizer/optimizer.h"
      25             : #include "replication/logical.h"
      26             : #include "replication/logicalproto.h"
      27             : #include "replication/origin.h"
      28             : #include "replication/pgoutput.h"
      29             : #include "utils/builtins.h"
      30             : #include "utils/inval.h"
      31             : #include "utils/lsyscache.h"
      32             : #include "utils/memutils.h"
      33             : #include "utils/rel.h"
      34             : #include "utils/syscache.h"
      35             : #include "utils/varlena.h"
      36             : 
      37         594 : PG_MODULE_MAGIC;
      38             : 
      39             : static void pgoutput_startup(LogicalDecodingContext *ctx,
      40             :                              OutputPluginOptions *opt, bool is_init);
      41             : static void pgoutput_shutdown(LogicalDecodingContext *ctx);
      42             : static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
      43             :                                ReorderBufferTXN *txn);
      44             : static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
      45             :                                 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
      46             : static void pgoutput_change(LogicalDecodingContext *ctx,
      47             :                             ReorderBufferTXN *txn, Relation rel,
      48             :                             ReorderBufferChange *change);
      49             : static void pgoutput_truncate(LogicalDecodingContext *ctx,
      50             :                               ReorderBufferTXN *txn, int nrelations, Relation relations[],
      51             :                               ReorderBufferChange *change);
      52             : static void pgoutput_message(LogicalDecodingContext *ctx,
      53             :                              ReorderBufferTXN *txn, XLogRecPtr message_lsn,
      54             :                              bool transactional, const char *prefix,
      55             :                              Size sz, const char *message);
      56             : static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
      57             :                                    RepOriginId origin_id);
      58             : static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
      59             :                                        ReorderBufferTXN *txn);
      60             : static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
      61             :                                  ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
      62             : static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
      63             :                                          ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
      64             : static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
      65             :                                            ReorderBufferTXN *txn,
      66             :                                            XLogRecPtr prepare_end_lsn,
      67             :                                            TimestampTz prepare_time);
      68             : static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
      69             :                                   ReorderBufferTXN *txn);
      70             : static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
      71             :                                  ReorderBufferTXN *txn);
      72             : static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
      73             :                                   ReorderBufferTXN *txn,
      74             :                                   XLogRecPtr abort_lsn);
      75             : static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
      76             :                                    ReorderBufferTXN *txn,
      77             :                                    XLogRecPtr commit_lsn);
      78             : static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
      79             :                                         ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
      80             : 
      81             : static bool publications_valid;
      82             : static bool in_streaming;
      83             : static bool publish_no_origin;
      84             : 
      85             : static List *LoadPublications(List *pubnames);
      86             : static void publication_invalidation_cb(Datum arg, int cacheid,
      87             :                                         uint32 hashvalue);
      88             : static void send_relation_and_attrs(Relation relation, TransactionId xid,
      89             :                                     LogicalDecodingContext *ctx,
      90             :                                     Bitmapset *columns);
      91             : static void send_repl_origin(LogicalDecodingContext *ctx,
      92             :                              RepOriginId origin_id, XLogRecPtr origin_lsn,
      93             :                              bool send_origin);
      94             : static void update_replication_progress(LogicalDecodingContext *ctx,
      95             :                                         bool skipped_xact);
      96             : 
      97             : /*
      98             :  * Only 3 publication actions are used for row filtering ("insert", "update",
      99             :  * "delete"). See RelationSyncEntry.exprstate[].
     100             :  */
     101             : enum RowFilterPubAction
     102             : {
     103             :     PUBACTION_INSERT,
     104             :     PUBACTION_UPDATE,
     105             :     PUBACTION_DELETE
     106             : };
     107             : 
     108             : #define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
     109             : 
     110             : /*
     111             :  * Entry in the map used to remember which relation schemas we sent.
     112             :  *
     113             :  * The schema_sent flag determines if the current schema record for the
     114             :  * relation (and for its ancestor if publish_as_relid is set) was already
     115             :  * sent to the subscriber (in which case we don't need to send it again).
     116             :  *
      117             :  * The schema cache on the downstream side, however, is updated only at
      118             :  * commit time, and with streamed transactions the commit order may differ
      119             :  * from the order in which the transactions are sent. Also, the (sub)
      120             :  * transactions might get aborted, so we need to send the schema for each
      121             :  * (sub) transaction so that the schema information is not lost on abort.
      122             :  * To handle this, we maintain the list of xids (streamed_txns) for which
      123             :  * we have already sent the schema.
     124             :  *
     125             :  * For partitions, 'pubactions' considers not only the table's own
     126             :  * publications, but also those of all of its ancestors.
     127             :  */
     128             : typedef struct RelationSyncEntry
     129             : {
     130             :     Oid         relid;          /* relation oid */
     131             : 
     132             :     bool        replicate_valid;    /* overall validity flag for entry */
     133             : 
     134             :     bool        schema_sent;
     135             :     List       *streamed_txns;  /* streamed toplevel transactions with this
     136             :                                  * schema */
     137             : 
     138             :     /* are we publishing this rel? */
     139             :     PublicationActions pubactions;
     140             : 
      141             :     /*
      142             :      * ExprState array for the row filter. The filter expressions cannot
      143             :      * always be combined into one across publication actions, because
      144             :      * updates and deletes require the columns referenced in the expression
      145             :      * to be part of the replica identity index, whereas inserts have no
      146             :      * such restriction; hence there is one ExprState per publication action.
      147             :      */
     148             :     ExprState  *exprstate[NUM_ROWFILTER_PUBACTIONS];
     149             :     EState     *estate;         /* executor state used for row filter */
     150             :     TupleTableSlot *new_slot;   /* slot for storing new tuple */
     151             :     TupleTableSlot *old_slot;   /* slot for storing old tuple */
     152             : 
     153             :     /*
     154             :      * OID of the relation to publish changes as.  For a partition, this may
     155             :      * be set to one of its ancestors whose schema will be used when
     156             :      * replicating changes, if publish_via_partition_root is set for the
     157             :      * publication.
     158             :      */
     159             :     Oid         publish_as_relid;
     160             : 
     161             :     /*
     162             :      * Map used when replicating using an ancestor's schema to convert tuples
     163             :      * from partition's type to the ancestor's; NULL if publish_as_relid is
     164             :      * same as 'relid' or if unnecessary due to partition and the ancestor
     165             :      * having identical TupleDesc.
     166             :      */
     167             :     AttrMap    *attrmap;
     168             : 
     169             :     /*
     170             :      * Columns included in the publication, or NULL if all columns are
     171             :      * included implicitly.  Note that the attnums in this bitmap are not
     172             :      * shifted by FirstLowInvalidHeapAttributeNumber.
     173             :      */
     174             :     Bitmapset  *columns;
     175             : 
     176             :     /*
     177             :      * Private context to store additional data for this entry - state for the
     178             :      * row filter expressions, column list, etc.
     179             :      */
     180             :     MemoryContext entry_cxt;
     181             : } RelationSyncEntry;
     182             : 
     183             : /*
     184             :  * Maintain a per-transaction level variable to track whether the transaction
     185             :  * has sent BEGIN. BEGIN is only sent when the first change in a transaction
     186             :  * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
     187             :  * messages for empty transactions which saves network bandwidth.
     188             :  *
      189             :  * This optimization is not used for prepared transactions because, if the
      190             :  * WALSender restarts after a transaction is prepared but before it is
      191             :  * committed, we cannot tell whether we skipped sending BEGIN/PREPARE for
      192             :  * that transaction because it was empty: the in-memory txndata information
      193             :  * that was present prior to the restart has been lost. We would then send
      194             :  * a spurious COMMIT PREPARED without a corresponding prepared transaction
      195             :  * on the downstream, which would lead to an error when it tries to
      196             :  * process it.
     197             :  *
     198             :  * XXX We could achieve this optimization by changing protocol to send
     199             :  * additional information so that downstream can detect that the corresponding
     200             :  * prepare has not been sent. However, adding such a check for every
     201             :  * transaction in the downstream could be costly so we might want to do it
     202             :  * optionally.
     203             :  *
     204             :  * We also don't have this optimization for streamed transactions because
     205             :  * they can contain prepared transactions.
     206             :  */
     207             : typedef struct PGOutputTxnData
     208             : {
     209             :     bool        sent_begin_txn; /* flag indicating whether BEGIN has been sent */
     210             : } PGOutputTxnData;
     211             : 
     212             : /* Map used to remember which relation schemas we sent. */
     213             : static HTAB *RelationSyncCache = NULL;
     214             : 
     215             : static void init_rel_sync_cache(MemoryContext decoding_context);
     216             : static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
     217             : static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
     218             :                                              Relation relation);
     219             : static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
     220             : static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
     221             :                                           uint32 hashvalue);
     222             : static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
     223             :                                             TransactionId xid);
     224             : static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
     225             :                                             TransactionId xid);
     226             : static void init_tuple_slot(PGOutputData *data, Relation relation,
     227             :                             RelationSyncEntry *entry);
     228             : 
     229             : /* row filter routines */
     230             : static EState *create_estate_for_relation(Relation rel);
     231             : static void pgoutput_row_filter_init(PGOutputData *data,
     232             :                                      List *publications,
     233             :                                      RelationSyncEntry *entry);
     234             : static bool pgoutput_row_filter_exec_expr(ExprState *state,
     235             :                                           ExprContext *econtext);
     236             : static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
     237             :                                 TupleTableSlot **new_slot_ptr,
     238             :                                 RelationSyncEntry *entry,
     239             :                                 ReorderBufferChangeType *action);
     240             : 
     241             : /* column list routines */
     242             : static void pgoutput_column_list_init(PGOutputData *data,
     243             :                                       List *publications,
     244             :                                       RelationSyncEntry *entry);
     245             : 
     246             : /*
     247             :  * Specify output plugin callbacks
     248             :  */
     249             : void
     250         856 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
     251             : {
     252             :     AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
     253             : 
     254         856 :     cb->startup_cb = pgoutput_startup;
     255         856 :     cb->begin_cb = pgoutput_begin_txn;
     256         856 :     cb->change_cb = pgoutput_change;
     257         856 :     cb->truncate_cb = pgoutput_truncate;
     258         856 :     cb->message_cb = pgoutput_message;
     259         856 :     cb->commit_cb = pgoutput_commit_txn;
     260             : 
     261         856 :     cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
     262         856 :     cb->prepare_cb = pgoutput_prepare_txn;
     263         856 :     cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
     264         856 :     cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
     265         856 :     cb->filter_by_origin_cb = pgoutput_origin_filter;
     266         856 :     cb->shutdown_cb = pgoutput_shutdown;
     267             : 
     268             :     /* transaction streaming */
     269         856 :     cb->stream_start_cb = pgoutput_stream_start;
     270         856 :     cb->stream_stop_cb = pgoutput_stream_stop;
     271         856 :     cb->stream_abort_cb = pgoutput_stream_abort;
     272         856 :     cb->stream_commit_cb = pgoutput_stream_commit;
     273         856 :     cb->stream_change_cb = pgoutput_change;
     274         856 :     cb->stream_message_cb = pgoutput_message;
     275         856 :     cb->stream_truncate_cb = pgoutput_truncate;
     276             :     /* transaction streaming - two-phase commit */
     277         856 :     cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
     278         856 : }
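
For context, here is an editor's minimal sketch (not part of pgoutput.c) of what an output plugin's init function must register: per the logical decoding output plugin interface, only the begin, change and commit callbacks are mandatory, so everything pgoutput registers beyond those (streaming, two-phase, messages, origin filtering, truncate) is optional. The my_* callbacks below are hypothetical and assumed to be defined in another translation unit.

/* Editor's sketch, not part of pgoutput.c: a hypothetical plugin registering
 * only the mandatory callbacks.  my_begin_txn, my_change and my_commit_txn
 * are assumed to be defined elsewhere. */
#include "postgres.h"

#include "fmgr.h"
#include "replication/logical.h"

PG_MODULE_MAGIC;

extern void my_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
extern void my_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                      Relation rel, ReorderBufferChange *change);
extern void my_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                          XLogRecPtr commit_lsn);

void
_PG_output_plugin_init(OutputPluginCallbacks *cb)
{
    AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);

    /* Only these three callbacks are required by logical decoding. */
    cb->begin_cb = my_begin_txn;
    cb->change_cb = my_change;
    cb->commit_cb = my_commit_txn;
}
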
     279             : 
     280             : static void
     281         462 : parse_output_parameters(List *options, PGOutputData *data)
     282             : {
     283             :     ListCell   *lc;
     284         462 :     bool        protocol_version_given = false;
     285         462 :     bool        publication_names_given = false;
     286         462 :     bool        binary_option_given = false;
     287         462 :     bool        messages_option_given = false;
     288         462 :     bool        streaming_given = false;
     289         462 :     bool        two_phase_option_given = false;
     290         462 :     bool        origin_option_given = false;
     291             : 
     292         462 :     data->binary = false;
     293         462 :     data->streaming = false;
     294         462 :     data->messages = false;
     295         462 :     data->two_phase = false;
     296             : 
     297        1912 :     foreach(lc, options)
     298             :     {
     299        1450 :         DefElem    *defel = (DefElem *) lfirst(lc);
     300             : 
     301             :         Assert(defel->arg == NULL || IsA(defel->arg, String));
     302             : 
     303             :         /* Check each param, whether or not we recognize it */
     304        1450 :         if (strcmp(defel->defname, "proto_version") == 0)
     305             :         {
     306             :             unsigned long parsed;
     307             :             char       *endptr;
     308             : 
     309         462 :             if (protocol_version_given)
     310           0 :                 ereport(ERROR,
     311             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     312             :                          errmsg("conflicting or redundant options")));
     313         462 :             protocol_version_given = true;
     314             : 
     315         462 :             errno = 0;
     316         462 :             parsed = strtoul(strVal(defel->arg), &endptr, 10);
     317         462 :             if (errno != 0 || *endptr != '\0')
     318           0 :                 ereport(ERROR,
     319             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     320             :                          errmsg("invalid proto_version")));
     321             : 
     322         462 :             if (parsed > PG_UINT32_MAX)
     323           0 :                 ereport(ERROR,
     324             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     325             :                          errmsg("proto_version \"%s\" out of range",
     326             :                                 strVal(defel->arg))));
     327             : 
     328         462 :             data->protocol_version = (uint32) parsed;
     329             :         }
     330         988 :         else if (strcmp(defel->defname, "publication_names") == 0)
     331             :         {
     332         462 :             if (publication_names_given)
     333           0 :                 ereport(ERROR,
     334             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     335             :                          errmsg("conflicting or redundant options")));
     336         462 :             publication_names_given = true;
     337             : 
     338         462 :             if (!SplitIdentifierString(strVal(defel->arg), ',',
     339             :                                        &data->publication_names))
     340           0 :                 ereport(ERROR,
     341             :                         (errcode(ERRCODE_INVALID_NAME),
     342             :                          errmsg("invalid publication_names syntax")));
     343             :         }
     344         526 :         else if (strcmp(defel->defname, "binary") == 0)
     345             :         {
     346          10 :             if (binary_option_given)
     347           0 :                 ereport(ERROR,
     348             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     349             :                          errmsg("conflicting or redundant options")));
     350          10 :             binary_option_given = true;
     351             : 
     352          10 :             data->binary = defGetBoolean(defel);
     353             :         }
     354         516 :         else if (strcmp(defel->defname, "messages") == 0)
     355             :         {
     356           8 :             if (messages_option_given)
     357           0 :                 ereport(ERROR,
     358             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     359             :                          errmsg("conflicting or redundant options")));
     360           8 :             messages_option_given = true;
     361             : 
     362           8 :             data->messages = defGetBoolean(defel);
     363             :         }
     364         508 :         else if (strcmp(defel->defname, "streaming") == 0)
     365             :         {
     366          46 :             if (streaming_given)
     367           0 :                 ereport(ERROR,
     368             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     369             :                          errmsg("conflicting or redundant options")));
     370          46 :             streaming_given = true;
     371             : 
     372          46 :             data->streaming = defGetBoolean(defel);
     373             :         }
     374         462 :         else if (strcmp(defel->defname, "two_phase") == 0)
     375             :         {
     376          10 :             if (two_phase_option_given)
     377           0 :                 ereport(ERROR,
     378             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     379             :                          errmsg("conflicting or redundant options")));
     380          10 :             two_phase_option_given = true;
     381             : 
     382          10 :             data->two_phase = defGetBoolean(defel);
     383             :         }
     384         452 :         else if (strcmp(defel->defname, "origin") == 0)
     385             :         {
     386         452 :             if (origin_option_given)
     387           0 :                 ereport(ERROR,
     388             :                         errcode(ERRCODE_SYNTAX_ERROR),
     389             :                         errmsg("conflicting or redundant options"));
     390         452 :             origin_option_given = true;
     391             : 
     392         452 :             data->origin = defGetString(defel);
     393         452 :             if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_NONE) == 0)
     394          10 :                 publish_no_origin = true;
     395         442 :             else if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_ANY) == 0)
     396         442 :                 publish_no_origin = false;
     397             :             else
     398           0 :                 ereport(ERROR,
     399             :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     400             :                         errmsg("unrecognized origin value: \"%s\"", data->origin));
     401             :         }
     402             :         else
     403           0 :             elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
     404             :     }
     405         462 : }
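
As an aside, the following editor's sketch (not from pgoutput.c) illustrates the shape of the option list that parse_output_parameters() consumes: each option arrives as a DefElem whose argument is a String node. The option names match the ones parsed above; the values ("1", "mypub", "false") are purely illustrative.

/* Editor's sketch, not part of pgoutput.c: hand-building an option list of
 * the form parse_output_parameters() expects.  Values are illustrative only. */
#include "postgres.h"

#include "nodes/makefuncs.h"
#include "nodes/pg_list.h"
#include "nodes/value.h"

static List *
make_sample_pgoutput_options(void)
{
    List       *options = NIL;

    /* proto_version and publication_names are validated in pgoutput_startup() */
    options = lappend(options,
                      makeDefElem("proto_version",
                                  (Node *) makeString(pstrdup("1")), -1));
    options = lappend(options,
                      makeDefElem("publication_names",
                                  (Node *) makeString(pstrdup("mypub")), -1));

    /* optional boolean options default to false when not given */
    options = lappend(options,
                      makeDefElem("binary",
                                  (Node *) makeString(pstrdup("false")), -1));

    return options;
}
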
     406             : 
     407             : /*
     408             :  * Initialize this plugin
     409             :  */
     410             : static void
     411         856 : pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
     412             :                  bool is_init)
     413             : {
     414         856 :     PGOutputData *data = palloc0(sizeof(PGOutputData));
     415             : 
     416             :     /* Create our memory context for private allocations. */
     417         856 :     data->context = AllocSetContextCreate(ctx->context,
     418             :                                           "logical replication output context",
     419             :                                           ALLOCSET_DEFAULT_SIZES);
     420             : 
     421         856 :     data->cachectx = AllocSetContextCreate(ctx->context,
     422             :                                            "logical replication cache context",
     423             :                                            ALLOCSET_DEFAULT_SIZES);
     424             : 
     425         856 :     ctx->output_plugin_private = data;
     426             : 
     427             :     /* This plugin uses binary protocol. */
     428         856 :     opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
     429             : 
     430             :     /*
     431             :      * This is replication start and not slot initialization.
     432             :      *
     433             :      * Parse and validate options passed by the client.
     434             :      */
     435         856 :     if (!is_init)
     436             :     {
     437             :         /* Parse the params and ERROR if we see any we don't recognize */
     438         462 :         parse_output_parameters(ctx->output_plugin_options, data);
     439             : 
     440             :         /* Check if we support requested protocol */
     441         462 :         if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
     442           0 :             ereport(ERROR,
     443             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     444             :                      errmsg("client sent proto_version=%d but we only support protocol %d or lower",
     445             :                             data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
     446             : 
     447         462 :         if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
     448           0 :             ereport(ERROR,
     449             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     450             :                      errmsg("client sent proto_version=%d but we only support protocol %d or higher",
     451             :                             data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
     452             : 
     453         462 :         if (list_length(data->publication_names) < 1)
     454           0 :             ereport(ERROR,
     455             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     456             :                      errmsg("publication_names parameter missing")));
     457             : 
     458             :         /*
     459             :          * Decide whether to enable streaming. It is disabled by default, in
     460             :          * which case we just update the flag in decoding context. Otherwise
     461             :          * we only allow it with sufficient version of the protocol, and when
     462             :          * the output plugin supports it.
     463             :          */
     464         462 :         if (!data->streaming)
     465         416 :             ctx->streaming = false;
     466          46 :         else if (data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
     467           0 :             ereport(ERROR,
     468             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     469             :                      errmsg("requested proto_version=%d does not support streaming, need %d or higher",
     470             :                             data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
     471          46 :         else if (!ctx->streaming)
     472           0 :             ereport(ERROR,
     473             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     474             :                      errmsg("streaming requested, but not supported by output plugin")));
     475             : 
     476             :         /* Also remember we're currently not streaming any transaction. */
     477         462 :         in_streaming = false;
     478             : 
     479             :         /*
      480             :          * Here, we just check whether the two-phase option was passed in,
      481             :          * and decide whether to enable it at a later point in time. It
      482             :          * remains enabled if the previous start-up enabled it. But we only
      483             :          * allow the option to be passed in with a sufficient version of the
      484             :          * protocol, and when the output plugin supports it.
     485             :          */
     486         462 :         if (!data->two_phase)
     487         452 :             ctx->twophase_opt_given = false;
     488          10 :         else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
     489           0 :             ereport(ERROR,
     490             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     491             :                      errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
     492             :                             data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
     493          10 :         else if (!ctx->twophase)
     494           0 :             ereport(ERROR,
     495             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     496             :                      errmsg("two-phase commit requested, but not supported by output plugin")));
     497             :         else
     498          10 :             ctx->twophase_opt_given = true;
     499             : 
     500             :         /* Init publication state. */
     501         462 :         data->publications = NIL;
     502         462 :         publications_valid = false;
     503         462 :         CacheRegisterSyscacheCallback(PUBLICATIONOID,
     504             :                                       publication_invalidation_cb,
     505             :                                       (Datum) 0);
     506             : 
     507             :         /* Initialize relation schema cache. */
     508         462 :         init_rel_sync_cache(CacheMemoryContext);
     509             :     }
     510             :     else
     511             :     {
     512             :         /*
     513             :          * Disable the streaming and prepared transactions during the slot
     514             :          * initialization mode.
     515             :          */
     516         394 :         ctx->streaming = false;
     517         394 :         ctx->twophase = false;
     518             :     }
     519         856 : }
     520             : 
     521             : /*
     522             :  * BEGIN callback.
     523             :  *
      524             :  * Don't send the BEGIN message here; instead, postpone it until the first
      525             :  * change. In logical replication, a common scenario is to replicate only a
      526             :  * subset of tables (instead of all tables), and transactions whose changes
      527             :  * touch only unpublished tables produce empty transactions. Such empty
      528             :  * transactions would otherwise send BEGIN and COMMIT messages to
      529             :  * subscribers, using bandwidth for something of little or no use.
     530             :  */
     531             : static void
     532         988 : pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     533             : {
     534         988 :     PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
     535             :                                                       sizeof(PGOutputTxnData));
     536             : 
     537         988 :     txn->output_plugin_private = txndata;
     538         988 : }
     539             : 
     540             : /*
     541             :  * Send BEGIN.
     542             :  *
     543             :  * This is called while processing the first change of the transaction.
     544             :  */
     545             : static void
     546         592 : pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     547             : {
     548         592 :     bool        send_replication_origin = txn->origin_id != InvalidRepOriginId;
     549         592 :     PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
     550             : 
     551             :     Assert(txndata);
     552             :     Assert(!txndata->sent_begin_txn);
     553             : 
     554         592 :     OutputPluginPrepareWrite(ctx, !send_replication_origin);
     555         592 :     logicalrep_write_begin(ctx->out, txn);
     556         592 :     txndata->sent_begin_txn = true;
     557             : 
     558         592 :     send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
     559             :                      send_replication_origin);
     560             : 
     561         592 :     OutputPluginWrite(ctx, true);
     562         592 : }
     563             : 
     564             : /*
     565             :  * COMMIT callback
     566             :  */
     567             : static void
     568         984 : pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     569             :                     XLogRecPtr commit_lsn)
     570             : {
     571         984 :     PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
     572             :     bool        sent_begin_txn;
     573             : 
     574             :     Assert(txndata);
     575             : 
     576             :     /*
     577             :      * We don't need to send the commit message unless some relevant change
     578             :      * from this transaction has been sent to the downstream.
     579             :      */
     580         984 :     sent_begin_txn = txndata->sent_begin_txn;
     581         984 :     update_replication_progress(ctx, !sent_begin_txn);
     582         984 :     pfree(txndata);
     583         984 :     txn->output_plugin_private = NULL;
     584             : 
     585         984 :     if (!sent_begin_txn)
     586             :     {
     587         392 :         elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
     588         392 :         return;
     589             :     }
     590             : 
     591         592 :     OutputPluginPrepareWrite(ctx, true);
     592         592 :     logicalrep_write_commit(ctx->out, txn, commit_lsn);
     593         592 :     OutputPluginWrite(ctx, true);
     594             : }
     595             : 
     596             : /*
     597             :  * BEGIN PREPARE callback
     598             :  */
     599             : static void
     600          36 : pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     601             : {
     602          36 :     bool        send_replication_origin = txn->origin_id != InvalidRepOriginId;
     603             : 
     604          36 :     OutputPluginPrepareWrite(ctx, !send_replication_origin);
     605          36 :     logicalrep_write_begin_prepare(ctx->out, txn);
     606             : 
     607          36 :     send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
     608             :                      send_replication_origin);
     609             : 
     610          36 :     OutputPluginWrite(ctx, true);
     611          36 : }
     612             : 
     613             : /*
     614             :  * PREPARE callback
     615             :  */
     616             : static void
     617          36 : pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     618             :                      XLogRecPtr prepare_lsn)
     619             : {
     620          36 :     update_replication_progress(ctx, false);
     621             : 
     622          36 :     OutputPluginPrepareWrite(ctx, true);
     623          36 :     logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
     624          36 :     OutputPluginWrite(ctx, true);
     625          36 : }
     626             : 
     627             : /*
     628             :  * COMMIT PREPARED callback
     629             :  */
     630             : static void
     631          36 : pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     632             :                              XLogRecPtr commit_lsn)
     633             : {
     634          36 :     update_replication_progress(ctx, false);
     635             : 
     636          36 :     OutputPluginPrepareWrite(ctx, true);
     637          36 :     logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
     638          36 :     OutputPluginWrite(ctx, true);
     639          36 : }
     640             : 
     641             : /*
     642             :  * ROLLBACK PREPARED callback
     643             :  */
     644             : static void
     645          14 : pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
     646             :                                ReorderBufferTXN *txn,
     647             :                                XLogRecPtr prepare_end_lsn,
     648             :                                TimestampTz prepare_time)
     649             : {
     650          14 :     update_replication_progress(ctx, false);
     651             : 
     652          14 :     OutputPluginPrepareWrite(ctx, true);
     653          14 :     logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
     654             :                                        prepare_time);
     655          14 :     OutputPluginWrite(ctx, true);
     656          14 : }
     657             : 
     658             : /*
     659             :  * Write the current schema of the relation and its ancestor (if any) if not
     660             :  * done yet.
     661             :  */
     662             : static void
     663      343742 : maybe_send_schema(LogicalDecodingContext *ctx,
     664             :                   ReorderBufferChange *change,
     665             :                   Relation relation, RelationSyncEntry *relentry)
     666             : {
     667             :     bool        schema_sent;
     668      343742 :     TransactionId xid = InvalidTransactionId;
     669      343742 :     TransactionId topxid = InvalidTransactionId;
     670             : 
     671             :     /*
     672             :      * Remember XID of the (sub)transaction for the change. We don't care if
     673             :      * it's top-level transaction or not (we have already sent that XID in
     674             :      * start of the current streaming block).
     675             :      *
     676             :      * If we're not in a streaming block, just use InvalidTransactionId and
     677             :      * the write methods will not include it.
     678             :      */
     679      343742 :     if (in_streaming)
     680      341742 :         xid = change->txn->xid;
     681             : 
     682      343742 :     if (change->txn->toptxn)
     683       43752 :         topxid = change->txn->toptxn->xid;
     684             :     else
     685      299990 :         topxid = xid;
     686             : 
     687             :     /*
     688             :      * Do we need to send the schema? We do track streamed transactions
     689             :      * separately, because those may be applied later (and the regular
     690             :      * transactions won't see their effects until then) and in an order that
     691             :      * we don't know at this point.
     692             :      *
     693             :      * XXX There is a scope of optimization here. Currently, we always send
     694             :      * the schema first time in a streaming transaction but we can probably
     695             :      * avoid that by checking 'relentry->schema_sent' flag. However, before
     696             :      * doing that we need to study its impact on the case where we have a mix
     697             :      * of streaming and non-streaming transactions.
     698             :      */
     699      343742 :     if (in_streaming)
     700      341742 :         schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
     701             :     else
     702        2000 :         schema_sent = relentry->schema_sent;
     703             : 
     704             :     /* Nothing to do if we already sent the schema. */
     705      343742 :     if (schema_sent)
     706      343340 :         return;
     707             : 
     708             :     /*
     709             :      * Send the schema.  If the changes will be published using an ancestor's
     710             :      * schema, not the relation's own, send that ancestor's schema before
      711             :      * sending the relation's own (XXX - maybe sending only the former suffices?).
     712             :      */
     713         402 :     if (relentry->publish_as_relid != RelationGetRelid(relation))
     714             :     {
     715          48 :         Relation    ancestor = RelationIdGetRelation(relentry->publish_as_relid);
     716             : 
     717          48 :         send_relation_and_attrs(ancestor, xid, ctx, relentry->columns);
     718          48 :         RelationClose(ancestor);
     719             :     }
     720             : 
     721         402 :     send_relation_and_attrs(relation, xid, ctx, relentry->columns);
     722             : 
     723         402 :     if (in_streaming)
     724          70 :         set_schema_sent_in_streamed_txn(relentry, topxid);
     725             :     else
     726         332 :         relentry->schema_sent = true;
     727             : }
     728             : 
     729             : /*
     730             :  * Sends a relation
     731             :  */
     732             : static void
     733         450 : send_relation_and_attrs(Relation relation, TransactionId xid,
     734             :                         LogicalDecodingContext *ctx,
     735             :                         Bitmapset *columns)
     736             : {
     737         450 :     TupleDesc   desc = RelationGetDescr(relation);
     738             :     int         i;
     739             : 
     740             :     /*
     741             :      * Write out type info if needed.  We do that only for user-created types.
     742             :      * We use FirstGenbkiObjectId as the cutoff, so that we only consider
     743             :      * objects with hand-assigned OIDs to be "built in", not for instance any
     744             :      * function or type defined in the information_schema. This is important
     745             :      * because only hand-assigned OIDs can be expected to remain stable across
     746             :      * major versions.
     747             :      */
     748        1406 :     for (i = 0; i < desc->natts; i++)
     749             :     {
     750         956 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     751             : 
     752         956 :         if (att->attisdropped || att->attgenerated)
     753           2 :             continue;
     754             : 
     755         954 :         if (att->atttypid < FirstGenbkiObjectId)
     756         920 :             continue;
     757             : 
     758             :         /* Skip this attribute if it's not present in the column list */
     759          34 :         if (columns != NULL && !bms_is_member(att->attnum, columns))
     760           0 :             continue;
     761             : 
     762          34 :         OutputPluginPrepareWrite(ctx, false);
     763          34 :         logicalrep_write_typ(ctx->out, xid, att->atttypid);
     764          34 :         OutputPluginWrite(ctx, false);
     765             :     }
     766             : 
     767         450 :     OutputPluginPrepareWrite(ctx, false);
     768         450 :     logicalrep_write_rel(ctx->out, xid, relation, columns);
     769         450 :     OutputPluginWrite(ctx, false);
     770         450 : }
     771             : 
     772             : /*
     773             :  * Executor state preparation for evaluation of row filter expressions for the
     774             :  * specified relation.
     775             :  */
     776             : static EState *
     777          32 : create_estate_for_relation(Relation rel)
     778             : {
     779             :     EState     *estate;
     780             :     RangeTblEntry *rte;
     781             : 
     782          32 :     estate = CreateExecutorState();
     783             : 
     784          32 :     rte = makeNode(RangeTblEntry);
     785          32 :     rte->rtekind = RTE_RELATION;
     786          32 :     rte->relid = RelationGetRelid(rel);
     787          32 :     rte->relkind = rel->rd_rel->relkind;
     788          32 :     rte->rellockmode = AccessShareLock;
     789          32 :     ExecInitRangeTable(estate, list_make1(rte));
     790             : 
     791          32 :     estate->es_output_cid = GetCurrentCommandId(false);
     792             : 
     793          32 :     return estate;
     794             : }
     795             : 
     796             : /*
     797             :  * Evaluates row filter.
     798             :  *
      799             :  * If the row filter evaluates to NULL, it is taken as false, i.e. the
      800             :  * change isn't replicated.
     801             :  */
     802             : static bool
     803          72 : pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
     804             : {
     805             :     Datum       ret;
     806             :     bool        isnull;
     807             : 
     808             :     Assert(state != NULL);
     809             : 
     810          72 :     ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
     811             : 
     812          72 :     elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
     813             :          isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
     814             :          isnull ? "true" : "false");
     815             : 
     816          72 :     if (isnull)
     817           2 :         return false;
     818             : 
     819          70 :     return DatumGetBool(ret);
     820             : }
     821             : 
     822             : /*
     823             :  * Make sure the per-entry memory context exists.
     824             :  */
     825             : static void
     826         102 : pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
     827             : {
     828             :     Relation    relation;
     829             : 
     830             :     /* The context may already exist, in which case bail out. */
     831         102 :     if (entry->entry_cxt)
     832           4 :         return;
     833             : 
     834          98 :     relation = RelationIdGetRelation(entry->publish_as_relid);
     835             : 
     836          98 :     entry->entry_cxt = AllocSetContextCreate(data->cachectx,
     837             :                                              "entry private context",
     838             :                                              ALLOCSET_SMALL_SIZES);
     839             : 
     840          98 :     MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
     841             :                                       RelationGetRelationName(relation));
     842             : }
     843             : 
     844             : /*
     845             :  * Initialize the row filter.
     846             :  */
     847             : static void
     848         374 : pgoutput_row_filter_init(PGOutputData *data, List *publications,
     849             :                          RelationSyncEntry *entry)
     850             : {
     851             :     ListCell   *lc;
     852         374 :     List       *rfnodes[] = {NIL, NIL, NIL};    /* One per pubaction */
     853         374 :     bool        no_filter[] = {false, false, false};    /* One per pubaction */
     854             :     MemoryContext oldctx;
     855             :     int         idx;
     856         374 :     bool        has_filter = true;
     857             : 
     858             :     /*
     859             :      * Find if there are any row filters for this relation. If there are, then
     860             :      * prepare the necessary ExprState and cache it in entry->exprstate. To
     861             :      * build an expression state, we need to ensure the following:
     862             :      *
     863             :      * All the given publication-table mappings must be checked.
     864             :      *
     865             :      * Multiple publications might have multiple row filters for this
     866             :      * relation. Since row filter usage depends on the DML operation, there
     867             :      * are multiple lists (one for each operation) to which row filters will
     868             :      * be appended.
     869             :      *
     870             :      * FOR ALL TABLES implies "don't use row filter expression" so it takes
     871             :      * precedence.
     872             :      */
     873         414 :     foreach(lc, publications)
     874             :     {
     875         382 :         Publication *pub = lfirst(lc);
     876         382 :         HeapTuple   rftuple = NULL;
     877         382 :         Datum       rfdatum = 0;
     878         382 :         bool        pub_no_filter = false;
     879             : 
     880         382 :         if (pub->alltables)
     881             :         {
     882             :             /*
     883             :              * If the publication is FOR ALL TABLES then it is treated the
     884             :              * same as if this table has no row filters (even if for other
     885             :              * publications it does).
     886             :              */
     887         106 :             pub_no_filter = true;
     888             :         }
     889             :         else
     890             :         {
     891             :             /*
     892             :              * Check for the presence of a row filter in this publication.
     893             :              */
     894         276 :             rftuple = SearchSysCache2(PUBLICATIONRELMAP,
     895         276 :                                       ObjectIdGetDatum(entry->publish_as_relid),
     896         276 :                                       ObjectIdGetDatum(pub->oid));
     897             : 
     898         276 :             if (HeapTupleIsValid(rftuple))
     899             :             {
     900             :                 /* Null indicates no filter. */
     901         244 :                 rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
     902             :                                           Anum_pg_publication_rel_prqual,
     903             :                                           &pub_no_filter);
     904             :             }
     905             :             else
     906             :             {
     907          32 :                 pub_no_filter = true;
     908             :             }
     909             :         }
     910             : 
     911         382 :         if (pub_no_filter)
     912             :         {
     913         356 :             if (rftuple)
     914         218 :                 ReleaseSysCache(rftuple);
     915             : 
     916         356 :             no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
     917         356 :             no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
     918         356 :             no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
     919             : 
     920             :             /*
      921             :              * Quick exit if all the DML actions are published via this
     922             :              * publication.
     923             :              */
     924         356 :             if (no_filter[PUBACTION_INSERT] &&
     925         356 :                 no_filter[PUBACTION_UPDATE] &&
     926         342 :                 no_filter[PUBACTION_DELETE])
     927             :             {
     928         342 :                 has_filter = false;
     929         342 :                 break;
     930             :             }
     931             : 
     932             :             /* No additional work for this publication. Next one. */
     933          14 :             continue;
     934             :         }
     935             : 
     936             :         /* Form the per pubaction row filter lists. */
     937          26 :         if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
     938          26 :             rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
     939          26 :                                                 TextDatumGetCString(rfdatum));
     940          26 :         if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
     941          26 :             rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
     942          26 :                                                 TextDatumGetCString(rfdatum));
     943          26 :         if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
     944          26 :             rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
     945          26 :                                                 TextDatumGetCString(rfdatum));
     946             : 
     947          26 :         ReleaseSysCache(rftuple);
     948             :     }                           /* loop all subscribed publications */
     949             : 
     950             :     /* Clean the row filter */
     951        1496 :     for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
     952             :     {
     953        1122 :         if (no_filter[idx])
     954             :         {
     955        1044 :             list_free_deep(rfnodes[idx]);
     956        1044 :             rfnodes[idx] = NIL;
     957             :         }
     958             :     }
     959             : 
     960         374 :     if (has_filter)
     961             :     {
     962          32 :         Relation    relation = RelationIdGetRelation(entry->publish_as_relid);
     963             : 
     964          32 :         pgoutput_ensure_entry_cxt(data, entry);
     965             : 
     966             :         /*
     967             :          * Now all the filters for all pubactions are known. Combine them when
     968             :          * their pubactions are the same.
     969             :          */
     970          32 :         oldctx = MemoryContextSwitchTo(entry->entry_cxt);
     971          32 :         entry->estate = create_estate_for_relation(relation);
     972         128 :         for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
     973             :         {
     974          96 :             List       *filters = NIL;
     975             :             Expr       *rfnode;
     976             : 
     977          96 :             if (rfnodes[idx] == NIL)
     978          42 :                 continue;
     979             : 
     980         114 :             foreach(lc, rfnodes[idx])
     981          60 :                 filters = lappend(filters, stringToNode((char *) lfirst(lc)));
     982             : 
     983             :             /* combine the row filter and cache the ExprState */
     984          54 :             rfnode = make_orclause(filters);
     985          54 :             entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
     986             :         }                       /* for each pubaction */
     987          32 :         MemoryContextSwitchTo(oldctx);
     988             : 
     989          32 :         RelationClose(relation);
     990             :     }
     991         374 : }
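/*
 * Editor's note: the block below is an illustrative sketch, not part of
 * pgoutput.c and not covered by this report.  Assuming an ExprState has been
 * prepared and cached as above, it shows the general shape of evaluating such
 * a row filter against one tuple (compare pgoutput_row_filter_exec_expr(),
 * used further down): point the per-tuple ExprContext at the slot, evaluate
 * the expression, and treat a NULL result as "no match".  The helper name is
 * hypothetical.
 */
static bool
example_eval_row_filter(ExprState *state, ExprContext *econtext,
                        TupleTableSlot *slot)
{
    Datum       ret;
    bool        isnull;

    /* Make the tuple visible to Var evaluation in the filter expression. */
    econtext->ecxt_scantuple = slot;

    ret = ExecEvalExprSwitchContext(state, econtext, &isnull);

    /* A NULL filter result means the row is not published. */
    if (isnull)
        return false;

    return DatumGetBool(ret);
}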
     992             : 
     993             : /*
     994             :  * Initialize the column list.
     995             :  */
     996             : static void
     997         374 : pgoutput_column_list_init(PGOutputData *data, List *publications,
     998             :                           RelationSyncEntry *entry)
     999             : {
    1000             :     ListCell   *lc;
    1001         374 :     bool        first = true;
    1002         374 :     Relation    relation = RelationIdGetRelation(entry->publish_as_relid);
    1003             : 
    1004             :     /*
    1005             :      * Find if there are any column lists for this relation. If there are,
    1006             :      * build a bitmap using the column lists.
    1007             :      *
    1008             :      * Multiple publications might have multiple column lists for this
    1009             :      * relation.
    1010             :      *
    1011             :      * Note that we don't support the case where the column list is different
    1012             :      * for the same table when combining publications. See comments atop
    1013             :      * fetch_table_list. But one can later change the publication so we still
    1014             :      * need to check all the given publication-table mappings and report an
    1015             :      * error if any publications have a different column list.
    1016             :      *
     1017             :      * FOR ALL TABLES and FOR ALL TABLES IN SCHEMA imply "don't use column
    1018             :      * list".
    1019             :      */
    1020         766 :     foreach(lc, publications)
    1021             :     {
    1022         394 :         Publication *pub = lfirst(lc);
    1023         394 :         HeapTuple   cftuple = NULL;
    1024         394 :         Datum       cfdatum = 0;
    1025         394 :         Bitmapset  *cols = NULL;
    1026             : 
    1027             :         /*
    1028             :          * If the publication is FOR ALL TABLES then it is treated the same as
    1029             :          * if there are no column lists (even if other publications have a
    1030             :          * list).
    1031             :          */
    1032         394 :         if (!pub->alltables)
    1033             :         {
    1034         286 :             bool        pub_no_list = true;
    1035             : 
    1036             :             /*
    1037             :              * Check for the presence of a column list in this publication.
    1038             :              *
    1039             :              * Note: If we find no pg_publication_rel row, it's a publication
    1040             :              * defined for a whole schema, so it can't have a column list,
    1041             :              * just like a FOR ALL TABLES publication.
    1042             :              */
    1043         286 :             cftuple = SearchSysCache2(PUBLICATIONRELMAP,
    1044         286 :                                       ObjectIdGetDatum(entry->publish_as_relid),
    1045         286 :                                       ObjectIdGetDatum(pub->oid));
    1046             : 
    1047         286 :             if (HeapTupleIsValid(cftuple))
    1048             :             {
    1049             :                 /* Lookup the column list attribute. */
    1050         250 :                 cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
    1051             :                                           Anum_pg_publication_rel_prattrs,
    1052             :                                           &pub_no_list);
    1053             : 
    1054             :                 /* Build the column list bitmap in the per-entry context. */
    1055         250 :                 if (!pub_no_list)   /* when not null */
    1056             :                 {
    1057          70 :                     pgoutput_ensure_entry_cxt(data, entry);
    1058             : 
    1059          70 :                     cols = pub_collist_to_bitmapset(cols, cfdatum,
    1060             :                                                     entry->entry_cxt);
    1061             : 
    1062             :                     /*
    1063             :                      * If column list includes all the columns of the table,
    1064             :                      * set it to NULL.
    1065             :                      */
    1066          70 :                     if (bms_num_members(cols) == RelationGetNumberOfAttributes(relation))
    1067             :                     {
    1068          12 :                         bms_free(cols);
    1069          12 :                         cols = NULL;
    1070             :                     }
    1071             :                 }
    1072             : 
    1073         250 :                 ReleaseSysCache(cftuple);
    1074             :             }
    1075             :         }
    1076             : 
    1077         394 :         if (first)
    1078             :         {
    1079         374 :             entry->columns = cols;
    1080         374 :             first = false;
    1081             :         }
    1082          20 :         else if (!bms_equal(entry->columns, cols))
    1083           2 :             ereport(ERROR,
    1084             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1085             :                     errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
    1086             :                            get_namespace_name(RelationGetNamespace(relation)),
    1087             :                            RelationGetRelationName(relation)));
    1088             :     }                           /* loop all subscribed publications */
    1089             : 
    1090         372 :     RelationClose(relation);
    1091         372 : }
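/*
 * Editor's note: illustrative sketch only, not part of pgoutput.c and not
 * covered by this report.  Given the entry->columns bitmap built above (NULL
 * meaning "publish all columns"), a sender would typically decide per
 * attribute whether to emit it, roughly as below; in the real code the column
 * list is applied via the relentry->columns argument passed to the
 * logicalrep_write_insert()/update() calls further down.  The helper name is
 * hypothetical.
 */
static bool
example_column_is_published(TupleDesc desc, int i, Bitmapset *columns)
{
    Form_pg_attribute att = TupleDescAttr(desc, i);

    /* Dropped columns are never sent. */
    if (att->attisdropped)
        return false;

    /* A NULL column list publishes every remaining column. */
    if (columns == NULL)
        return true;

    return bms_is_member(att->attnum, columns);
}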
    1092             : 
    1093             : /*
     1094             :  * Initialize the slots for storing new and old tuples, and build the map that
    1095             :  * will be used to convert the relation's tuples into the ancestor's format.
    1096             :  */
    1097             : static void
    1098         374 : init_tuple_slot(PGOutputData *data, Relation relation,
    1099             :                 RelationSyncEntry *entry)
    1100             : {
    1101             :     MemoryContext oldctx;
    1102             :     TupleDesc   oldtupdesc;
    1103             :     TupleDesc   newtupdesc;
    1104             : 
    1105         374 :     oldctx = MemoryContextSwitchTo(data->cachectx);
    1106             : 
    1107             :     /*
    1108             :      * Create tuple table slots. Create a copy of the TupleDesc as it needs to
    1109             :      * live as long as the cache remains.
    1110             :      */
    1111         374 :     oldtupdesc = CreateTupleDescCopy(RelationGetDescr(relation));
    1112         374 :     newtupdesc = CreateTupleDescCopy(RelationGetDescr(relation));
    1113             : 
    1114         374 :     entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
    1115         374 :     entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
    1116             : 
    1117         374 :     MemoryContextSwitchTo(oldctx);
    1118             : 
    1119             :     /*
    1120             :      * Cache the map that will be used to convert the relation's tuples into
    1121             :      * the ancestor's format, if needed.
    1122             :      */
    1123         374 :     if (entry->publish_as_relid != RelationGetRelid(relation))
    1124             :     {
    1125          50 :         Relation    ancestor = RelationIdGetRelation(entry->publish_as_relid);
    1126          50 :         TupleDesc   indesc = RelationGetDescr(relation);
    1127          50 :         TupleDesc   outdesc = RelationGetDescr(ancestor);
    1128             : 
    1129             :         /* Map must live as long as the session does. */
    1130          50 :         oldctx = MemoryContextSwitchTo(CacheMemoryContext);
    1131             : 
    1132          50 :         entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc);
    1133             : 
    1134          50 :         MemoryContextSwitchTo(oldctx);
    1135          50 :         RelationClose(ancestor);
    1136             :     }
    1137         374 : }
    1138             : 
    1139             : /*
    1140             :  * Change is checked against the row filter if any.
    1141             :  *
    1142             :  * Returns true if the change is to be replicated, else false.
    1143             :  *
    1144             :  * For inserts, evaluate the row filter for new tuple.
    1145             :  * For deletes, evaluate the row filter for old tuple.
    1146             :  * For updates, evaluate the row filter for old and new tuple.
    1147             :  *
     1148             :  * For updates, if both evaluations are true, we send the UPDATE; if both
     1149             :  * evaluations are false, we don't replicate the UPDATE. If only one of the
     1150             :  * tuples matches the row filter expression, we transform the UPDATE into a
     1151             :  * DELETE or INSERT to avoid data inconsistency, based on the following
     1152             :  * rules:
    1153             :  *
    1154             :  * Case 1: old-row (no match)    new-row (no match)  -> (drop change)
    1155             :  * Case 2: old-row (no match)    new row (match)     -> INSERT
    1156             :  * Case 3: old-row (match)       new-row (no match)  -> DELETE
    1157             :  * Case 4: old-row (match)       new row (match)     -> UPDATE
    1158             :  *
    1159             :  * The new action is updated in the action parameter.
    1160             :  *
    1161             :  * The new slot could be updated when transforming the UPDATE into INSERT,
    1162             :  * because the original new tuple might not have column values from the replica
    1163             :  * identity.
    1164             :  *
    1165             :  * Examples:
    1166             :  * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
    1167             :  * Since the old tuple satisfies, the initial table synchronization copied this
    1168             :  * row (or another method was used to guarantee that there is data
    1169             :  * consistency).  However, after the UPDATE the new tuple doesn't satisfy the
    1170             :  * row filter, so from a data consistency perspective, that row should be
    1171             :  * removed on the subscriber. The UPDATE should be transformed into a DELETE
    1172             :  * statement and be sent to the subscriber. Keeping this row on the subscriber
    1173             :  * is undesirable because it doesn't reflect what was defined in the row filter
    1174             :  * expression on the publisher. This row on the subscriber would likely not be
    1175             :  * modified by replication again. If someone inserted a new row with the same
    1176             :  * old identifier, replication could stop due to a constraint violation.
    1177             :  *
    1178             :  * Let's say the old tuple doesn't match the row filter but the new tuple does.
    1179             :  * Since the old tuple doesn't satisfy, the initial table synchronization
    1180             :  * probably didn't copy this row. However, after the UPDATE the new tuple does
    1181             :  * satisfy the row filter, so from a data consistency perspective, that row
     1182             :  * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
     1183             :  * statements have no effect (they match no row -- see
     1184             :  * apply_handle_update_internal()). So, the UPDATE should be transformed into
     1185             :  * an INSERT statement and be sent to the subscriber. However, this might surprise
    1186             :  * someone who expects the data set to satisfy the row filter expression on the
    1187             :  * provider.
    1188             :  */
    1189             : static bool
    1190      343742 : pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
    1191             :                     TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
    1192             :                     ReorderBufferChangeType *action)
    1193             : {
    1194             :     TupleDesc   desc;
    1195             :     int         i;
    1196             :     bool        old_matched,
    1197             :                 new_matched,
    1198             :                 result;
    1199             :     TupleTableSlot *tmp_new_slot;
    1200      343742 :     TupleTableSlot *new_slot = *new_slot_ptr;
    1201             :     ExprContext *ecxt;
    1202             :     ExprState  *filter_exprstate;
    1203             : 
    1204             :     /*
    1205             :      * We need this map to avoid relying on ReorderBufferChangeType enums
    1206             :      * having specific values.
    1207             :      */
    1208             :     static const int map_changetype_pubaction[] = {
    1209             :         [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
    1210             :         [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
    1211             :         [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
    1212             :     };
    1213             : 
    1214             :     Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
    1215             :            *action == REORDER_BUFFER_CHANGE_UPDATE ||
    1216             :            *action == REORDER_BUFFER_CHANGE_DELETE);
    1217             : 
    1218             :     Assert(new_slot || old_slot);
    1219             : 
    1220             :     /* Get the corresponding row filter */
    1221      343742 :     filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
    1222             : 
    1223             :     /* Bail out if there is no row filter */
    1224      343742 :     if (!filter_exprstate)
    1225      343678 :         return true;
    1226             : 
    1227          64 :     elog(DEBUG3, "table \"%s.%s\" has row filter",
    1228             :          get_namespace_name(RelationGetNamespace(relation)),
    1229             :          RelationGetRelationName(relation));
    1230             : 
    1231          64 :     ResetPerTupleExprContext(entry->estate);
    1232             : 
    1233          64 :     ecxt = GetPerTupleExprContext(entry->estate);
    1234             : 
    1235             :     /*
    1236             :      * For the following occasions where there is only one tuple, we can
    1237             :      * evaluate the row filter for that tuple and return.
    1238             :      *
    1239             :      * For inserts, we only have the new tuple.
    1240             :      *
     1241             :      * For updates, we can have only a new tuple when none of the replica
     1242             :      * identity columns changed and none of those columns have external data,
     1243             :      * but we still need to evaluate the row filter for the new tuple as the
     1244             :      * existing values of those columns might not match the filter. Also,
     1245             :      * users can use constant expressions in the row filter, so we need to
     1246             :      * evaluate it for the new tuple anyway.
    1247             :      *
    1248             :      * For deletes, we only have the old tuple.
    1249             :      */
    1250          64 :     if (!new_slot || !old_slot)
    1251             :     {
    1252          56 :         ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
    1253          56 :         result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
    1254             : 
    1255          56 :         return result;
    1256             :     }
    1257             : 
    1258             :     /*
    1259             :      * Both the old and new tuples must be valid only for updates and need to
    1260             :      * be checked against the row filter.
    1261             :      */
    1262             :     Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
    1263             : 
    1264           8 :     slot_getallattrs(new_slot);
    1265           8 :     slot_getallattrs(old_slot);
    1266             : 
    1267           8 :     tmp_new_slot = NULL;
    1268           8 :     desc = RelationGetDescr(relation);
    1269             : 
    1270             :     /*
    1271             :      * The new tuple might not have all the replica identity columns, in which
     1272             :      * case they need to be copied over from the old tuple.
    1273             :      */
    1274          24 :     for (i = 0; i < desc->natts; i++)
    1275             :     {
    1276          16 :         Form_pg_attribute att = TupleDescAttr(desc, i);
    1277             : 
    1278             :         /*
    1279             :          * if the column in the new tuple or old tuple is null, nothing to do
    1280             :          */
    1281          16 :         if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
    1282           2 :             continue;
    1283             : 
    1284             :         /*
    1285             :          * Unchanged toasted replica identity columns are only logged in the
     1286             :          * old tuple. Copy them over to the new tuple. The changed (or
     1287             :          * WAL-logged) toast values are always assembled in memory and set as
    1288             :          * VARTAG_INDIRECT. See ReorderBufferToastReplace.
    1289             :          */
    1290          14 :         if (att->attlen == -1 &&
    1291           8 :             VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
    1292           2 :             !VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
    1293             :         {
    1294           2 :             if (!tmp_new_slot)
    1295             :             {
    1296           2 :                 tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
    1297           2 :                 ExecClearTuple(tmp_new_slot);
    1298             : 
    1299           2 :                 memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
    1300           2 :                        desc->natts * sizeof(Datum));
    1301           2 :                 memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
    1302           2 :                        desc->natts * sizeof(bool));
    1303             :             }
    1304             : 
    1305           2 :             tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
    1306           2 :             tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
    1307             :         }
    1308             :     }
    1309             : 
    1310           8 :     ecxt->ecxt_scantuple = old_slot;
    1311           8 :     old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
    1312             : 
    1313           8 :     if (tmp_new_slot)
    1314             :     {
    1315           2 :         ExecStoreVirtualTuple(tmp_new_slot);
    1316           2 :         ecxt->ecxt_scantuple = tmp_new_slot;
    1317             :     }
    1318             :     else
    1319           6 :         ecxt->ecxt_scantuple = new_slot;
    1320             : 
    1321           8 :     new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
    1322             : 
    1323             :     /*
    1324             :      * Case 1: if both tuples don't match the row filter, bailout. Send
    1325             :      * nothing.
    1326             :      */
    1327           8 :     if (!old_matched && !new_matched)
    1328           0 :         return false;
    1329             : 
    1330             :     /*
    1331             :      * Case 2: if the old tuple doesn't satisfy the row filter but the new
    1332             :      * tuple does, transform the UPDATE into INSERT.
    1333             :      *
    1334             :      * Use the newly transformed tuple that must contain the column values for
     1335             :      * all the replica identity columns. This is required to ensure that,
     1336             :      * while inserting the tuple in the downstream node, we have all the
    1337             :      * required column values.
    1338             :      */
    1339           8 :     if (!old_matched && new_matched)
    1340             :     {
    1341           4 :         *action = REORDER_BUFFER_CHANGE_INSERT;
    1342             : 
    1343           4 :         if (tmp_new_slot)
    1344           2 :             *new_slot_ptr = tmp_new_slot;
    1345             :     }
    1346             : 
    1347             :     /*
    1348             :      * Case 3: if the old tuple satisfies the row filter but the new tuple
    1349             :      * doesn't, transform the UPDATE into DELETE.
    1350             :      *
     1351             :      * This transformation does not require another tuple. The old tuple will
    1352             :      * be used for DELETE.
    1353             :      */
    1354           4 :     else if (old_matched && !new_matched)
    1355           2 :         *action = REORDER_BUFFER_CHANGE_DELETE;
    1356             : 
    1357             :     /*
    1358             :      * Case 4: if both tuples match the row filter, transformation isn't
    1359             :      * required. (*action is default UPDATE).
    1360             :      */
    1361             : 
    1362           8 :     return true;
    1363             : }
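/*
 * Editor's note: illustrative sketch only, not part of pgoutput.c and not
 * covered by this report.  It restates the Case 1-4 table documented above
 * pgoutput_row_filter() as a standalone helper: given whether the old and new
 * tuples matched the row filter, it reports whether the change is published
 * at all and, if so, which action to publish it as.  The helper name is
 * hypothetical.
 */
static bool
example_transform_update_action(bool old_matched, bool new_matched,
                                ReorderBufferChangeType *action)
{
    if (!old_matched && !new_matched)
        return false;                               /* Case 1: drop change */

    if (!old_matched && new_matched)
        *action = REORDER_BUFFER_CHANGE_INSERT;     /* Case 2: publish INSERT */
    else if (old_matched && !new_matched)
        *action = REORDER_BUFFER_CHANGE_DELETE;     /* Case 3: publish DELETE */
    else
        *action = REORDER_BUFFER_CHANGE_UPDATE;     /* Case 4: publish UPDATE */

    return true;
}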
    1364             : 
    1365             : /*
     1366             :  * Sends the decoded DML over the wire.
    1367             :  *
    1368             :  * This is called both in streaming and non-streaming modes.
    1369             :  */
    1370             : static void
    1371      346014 : pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    1372             :                 Relation relation, ReorderBufferChange *change)
    1373             : {
    1374      346014 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1375      346014 :     PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
    1376             :     MemoryContext old;
    1377             :     RelationSyncEntry *relentry;
    1378      346014 :     TransactionId xid = InvalidTransactionId;
    1379      346014 :     Relation    ancestor = NULL;
    1380      346014 :     Relation    targetrel = relation;
    1381      346014 :     ReorderBufferChangeType action = change->action;
    1382      346014 :     TupleTableSlot *old_slot = NULL;
    1383      346014 :     TupleTableSlot *new_slot = NULL;
    1384             : 
    1385      346014 :     update_replication_progress(ctx, false);
    1386             : 
    1387      346014 :     if (!is_publishable_relation(relation))
    1388        2268 :         return;
    1389             : 
    1390             :     /*
     1391             :      * Remember the xid for the change in streaming mode. We need to send the
     1392             :      * xid with each change in streaming mode so that the subscriber can
     1393             :      * associate the changes with the transaction and, on abort, discard the
     1394             :      * corresponding changes.
    1395             :      */
    1396      346010 :     if (in_streaming)
    1397      341742 :         xid = change->txn->xid;
    1398             : 
    1399      346010 :     relentry = get_rel_sync_entry(data, relation);
    1400             : 
    1401             :     /* First check the table filter */
    1402      346006 :     switch (action)
    1403             :     {
    1404      193286 :         case REORDER_BUFFER_CHANGE_INSERT:
    1405      193286 :             if (!relentry->pubactions.pubinsert)
    1406          78 :                 return;
    1407      193208 :             break;
    1408       77918 :         case REORDER_BUFFER_CHANGE_UPDATE:
    1409       77918 :             if (!relentry->pubactions.pubupdate)
    1410          80 :                 return;
    1411       77838 :             break;
    1412       74802 :         case REORDER_BUFFER_CHANGE_DELETE:
    1413       74802 :             if (!relentry->pubactions.pubdelete)
    1414        2106 :                 return;
    1415       72696 :             break;
    1416      343742 :         default:
    1417             :             Assert(false);
    1418             :     }
    1419             : 
    1420             :     /* Avoid leaking memory by using and resetting our own context */
    1421      343742 :     old = MemoryContextSwitchTo(data->context);
    1422             : 
    1423             :     /* Send the data */
    1424      343742 :     switch (action)
    1425             :     {
    1426      193208 :         case REORDER_BUFFER_CHANGE_INSERT:
    1427      193208 :             new_slot = relentry->new_slot;
    1428      193208 :             ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
    1429             :                                new_slot, false);
    1430             : 
    1431             :             /* Switch relation if publishing via root. */
    1432      193208 :             if (relentry->publish_as_relid != RelationGetRelid(relation))
    1433             :             {
    1434             :                 Assert(relation->rd_rel->relispartition);
    1435          84 :                 ancestor = RelationIdGetRelation(relentry->publish_as_relid);
    1436          84 :                 targetrel = ancestor;
    1437             :                 /* Convert tuple if needed. */
    1438          84 :                 if (relentry->attrmap)
    1439             :                 {
    1440          14 :                     TupleDesc   tupdesc = RelationGetDescr(targetrel);
    1441             : 
    1442          14 :                     new_slot = execute_attr_map_slot(relentry->attrmap,
    1443             :                                                      new_slot,
    1444             :                                                      MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
    1445             :                 }
    1446             :             }
    1447             : 
    1448             :             /* Check row filter */
    1449      193208 :             if (!pgoutput_row_filter(targetrel, NULL, &new_slot, relentry,
    1450             :                                      &action))
    1451          22 :                 break;
    1452             : 
    1453             :             /*
    1454             :              * Send BEGIN if we haven't yet.
    1455             :              *
    1456             :              * We send the BEGIN message after ensuring that we will actually
    1457             :              * send the change. This avoids sending a pair of BEGIN/COMMIT
    1458             :              * messages for empty transactions.
    1459             :              */
    1460      193186 :             if (txndata && !txndata->sent_begin_txn)
    1461         308 :                 pgoutput_send_begin(ctx, txn);
    1462             : 
    1463             :             /*
    1464             :              * Schema should be sent using the original relation because it
    1465             :              * also sends the ancestor's relation.
    1466             :              */
    1467      193186 :             maybe_send_schema(ctx, change, relation, relentry);
    1468             : 
    1469      193186 :             OutputPluginPrepareWrite(ctx, true);
    1470      193186 :             logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
    1471      193186 :                                     data->binary, relentry->columns);
    1472      193186 :             OutputPluginWrite(ctx, true);
    1473      193184 :             break;
    1474       77838 :         case REORDER_BUFFER_CHANGE_UPDATE:
    1475       77838 :             if (change->data.tp.oldtuple)
    1476             :             {
    1477         190 :                 old_slot = relentry->old_slot;
    1478         190 :                 ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
    1479             :                                    old_slot, false);
    1480             :             }
    1481             : 
    1482       77838 :             new_slot = relentry->new_slot;
    1483       77838 :             ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
    1484             :                                new_slot, false);
    1485             : 
    1486             :             /* Switch relation if publishing via root. */
    1487       77838 :             if (relentry->publish_as_relid != RelationGetRelid(relation))
    1488             :             {
    1489             :                 Assert(relation->rd_rel->relispartition);
    1490           2 :                 ancestor = RelationIdGetRelation(relentry->publish_as_relid);
    1491           2 :                 targetrel = ancestor;
    1492             :                 /* Convert tuples if needed. */
    1493           2 :                 if (relentry->attrmap)
    1494             :                 {
    1495           0 :                     TupleDesc   tupdesc = RelationGetDescr(targetrel);
    1496             : 
    1497           0 :                     if (old_slot)
    1498           0 :                         old_slot = execute_attr_map_slot(relentry->attrmap,
    1499             :                                                          old_slot,
    1500             :                                                          MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
    1501             : 
    1502           0 :                     new_slot = execute_attr_map_slot(relentry->attrmap,
    1503             :                                                      new_slot,
    1504             :                                                      MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
    1505             :                 }
    1506             :             }
    1507             : 
    1508             :             /* Check row filter */
    1509       77838 :             if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
    1510             :                                      relentry, &action))
    1511           0 :                 break;
    1512             : 
    1513             :             /* Send BEGIN if we haven't yet */
    1514       77838 :             if (txndata && !txndata->sent_begin_txn)
    1515         164 :                 pgoutput_send_begin(ctx, txn);
    1516             : 
    1517       77838 :             maybe_send_schema(ctx, change, relation, relentry);
    1518             : 
    1519       77838 :             OutputPluginPrepareWrite(ctx, true);
    1520             : 
    1521             :             /*
    1522             :              * Updates could be transformed to inserts or deletes based on the
    1523             :              * results of the row filter for old and new tuple.
    1524             :              */
    1525       77838 :             switch (action)
    1526             :             {
    1527           4 :                 case REORDER_BUFFER_CHANGE_INSERT:
    1528           4 :                     logicalrep_write_insert(ctx->out, xid, targetrel,
    1529           4 :                                             new_slot, data->binary,
    1530             :                                             relentry->columns);
    1531           4 :                     break;
    1532       77832 :                 case REORDER_BUFFER_CHANGE_UPDATE:
    1533       77832 :                     logicalrep_write_update(ctx->out, xid, targetrel,
    1534       77832 :                                             old_slot, new_slot, data->binary,
    1535             :                                             relentry->columns);
    1536       77832 :                     break;
    1537           2 :                 case REORDER_BUFFER_CHANGE_DELETE:
    1538           2 :                     logicalrep_write_delete(ctx->out, xid, targetrel,
    1539           2 :                                             old_slot, data->binary);
    1540           2 :                     break;
    1541       77838 :                 default:
    1542             :                     Assert(false);
    1543             :             }
    1544             : 
    1545       77838 :             OutputPluginWrite(ctx, true);
    1546       77838 :             break;
    1547       72696 :         case REORDER_BUFFER_CHANGE_DELETE:
    1548       72696 :             if (change->data.tp.oldtuple)
    1549             :             {
    1550       72696 :                 old_slot = relentry->old_slot;
    1551             : 
    1552       72696 :                 ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
    1553             :                                    old_slot, false);
    1554             : 
    1555             :                 /* Switch relation if publishing via root. */
    1556       72696 :                 if (relentry->publish_as_relid != RelationGetRelid(relation))
    1557             :                 {
    1558             :                     Assert(relation->rd_rel->relispartition);
    1559           6 :                     ancestor = RelationIdGetRelation(relentry->publish_as_relid);
    1560           6 :                     targetrel = ancestor;
    1561             :                     /* Convert tuple if needed. */
    1562           6 :                     if (relentry->attrmap)
    1563             :                     {
    1564           0 :                         TupleDesc   tupdesc = RelationGetDescr(targetrel);
    1565             : 
    1566           0 :                         old_slot = execute_attr_map_slot(relentry->attrmap,
    1567             :                                                          old_slot,
    1568             :                                                          MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
    1569             :                     }
    1570             :                 }
    1571             : 
    1572             :                 /* Check row filter */
    1573       72696 :                 if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
    1574             :                                          relentry, &action))
    1575           0 :                     break;
    1576             : 
    1577             :                 /* Send BEGIN if we haven't yet */
    1578       72696 :                 if (txndata && !txndata->sent_begin_txn)
    1579         104 :                     pgoutput_send_begin(ctx, txn);
    1580             : 
    1581       72696 :                 maybe_send_schema(ctx, change, relation, relentry);
    1582             : 
    1583       72696 :                 OutputPluginPrepareWrite(ctx, true);
    1584       72696 :                 logicalrep_write_delete(ctx->out, xid, targetrel,
    1585       72696 :                                         old_slot, data->binary);
    1586       72696 :                 OutputPluginWrite(ctx, true);
    1587             :             }
    1588             :             else
    1589           0 :                 elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
    1590       72694 :             break;
    1591      343738 :         default:
    1592             :             Assert(false);
    1593             :     }
    1594             : 
    1595      343738 :     if (RelationIsValid(ancestor))
    1596             :     {
    1597          92 :         RelationClose(ancestor);
    1598          92 :         ancestor = NULL;
    1599             :     }
    1600             : 
    1601             :     /* Cleanup */
    1602      343738 :     MemoryContextSwitchTo(old);
    1603      343738 :     MemoryContextReset(data->context);
    1604             : }
    1605             : 
    1606             : static void
    1607          20 : pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    1608             :                   int nrelations, Relation relations[], ReorderBufferChange *change)
    1609             : {
    1610          20 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1611          20 :     PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
    1612             :     MemoryContext old;
    1613             :     RelationSyncEntry *relentry;
    1614             :     int         i;
    1615             :     int         nrelids;
    1616             :     Oid        *relids;
    1617          20 :     TransactionId xid = InvalidTransactionId;
    1618             : 
    1619          20 :     update_replication_progress(ctx, false);
    1620             : 
    1621             :     /* Remember the xid for the change in streaming mode. See pgoutput_change. */
    1622          20 :     if (in_streaming)
    1623           0 :         xid = change->txn->xid;
    1624             : 
    1625          20 :     old = MemoryContextSwitchTo(data->context);
    1626             : 
    1627          20 :     relids = palloc0(nrelations * sizeof(Oid));
    1628          20 :     nrelids = 0;
    1629             : 
    1630          58 :     for (i = 0; i < nrelations; i++)
    1631             :     {
    1632          38 :         Relation    relation = relations[i];
    1633          38 :         Oid         relid = RelationGetRelid(relation);
    1634             : 
    1635          38 :         if (!is_publishable_relation(relation))
    1636           0 :             continue;
    1637             : 
    1638          38 :         relentry = get_rel_sync_entry(data, relation);
    1639             : 
    1640          38 :         if (!relentry->pubactions.pubtruncate)
    1641          16 :             continue;
    1642             : 
    1643             :         /*
    1644             :          * Don't send partitions if the publication wants to send only the
    1645             :          * root tables through it.
    1646             :          */
    1647          22 :         if (relation->rd_rel->relispartition &&
    1648          20 :             relentry->publish_as_relid != relid)
    1649           0 :             continue;
    1650             : 
    1651          22 :         relids[nrelids++] = relid;
    1652             : 
    1653             :         /* Send BEGIN if we haven't yet */
    1654          22 :         if (txndata && !txndata->sent_begin_txn)
    1655          12 :             pgoutput_send_begin(ctx, txn);
    1656             : 
    1657          22 :         maybe_send_schema(ctx, change, relation, relentry);
    1658             :     }
    1659             : 
    1660          20 :     if (nrelids > 0)
    1661             :     {
    1662          12 :         OutputPluginPrepareWrite(ctx, true);
    1663          12 :         logicalrep_write_truncate(ctx->out,
    1664             :                                   xid,
    1665             :                                   nrelids,
    1666             :                                   relids,
    1667          12 :                                   change->data.truncate.cascade,
    1668          12 :                                   change->data.truncate.restart_seqs);
    1669          12 :         OutputPluginWrite(ctx, true);
    1670             :     }
    1671             : 
    1672          20 :     MemoryContextSwitchTo(old);
    1673          20 :     MemoryContextReset(data->context);
    1674          20 : }
    1675             : 
    1676             : static void
    1677          12 : pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    1678             :                  XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
    1679             :                  const char *message)
    1680             : {
    1681          12 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1682          12 :     TransactionId xid = InvalidTransactionId;
    1683             : 
    1684          12 :     update_replication_progress(ctx, false);
    1685             : 
    1686          12 :     if (!data->messages)
    1687           2 :         return;
    1688             : 
    1689             :     /*
    1690             :      * Remember the xid for the message in streaming mode. See
    1691             :      * pgoutput_change.
    1692             :      */
    1693          10 :     if (in_streaming)
    1694           0 :         xid = txn->xid;
    1695             : 
    1696             :     /*
    1697             :      * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
    1698             :      */
    1699          10 :     if (transactional)
    1700             :     {
    1701           4 :         PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
    1702             : 
    1703             :         /* Send BEGIN if we haven't yet */
    1704           4 :         if (txndata && !txndata->sent_begin_txn)
    1705           4 :             pgoutput_send_begin(ctx, txn);
    1706             :     }
    1707             : 
    1708          10 :     OutputPluginPrepareWrite(ctx, true);
    1709          10 :     logicalrep_write_message(ctx->out,
    1710             :                              xid,
    1711             :                              message_lsn,
    1712             :                              transactional,
    1713             :                              prefix,
    1714             :                              sz,
    1715             :                              message);
    1716          10 :     OutputPluginWrite(ctx, true);
    1717             : }
    1718             : 
    1719             : /*
    1720             :  * Return true if the data is associated with an origin and the user has
    1721             :  * requested the changes that don't have an origin, false otherwise.
    1722             :  */
    1723             : static bool
    1724      440158 : pgoutput_origin_filter(LogicalDecodingContext *ctx,
    1725             :                        RepOriginId origin_id)
    1726             : {
    1727      440158 :     if (publish_no_origin && origin_id != InvalidRepOriginId)
    1728          42 :         return true;
    1729             : 
    1730      440116 :     return false;
    1731             : }
    1732             : 
    1733             : /*
    1734             :  * Shutdown the output plugin.
    1735             :  *
    1736             :  * Note, we don't need to clean the data->context and data->cachectx as
    1737             :  * they are child context of the ctx->context so it will be cleaned up by
    1738             :  * logical decoding machinery.
    1739             :  */
    1740             : static void
    1741         660 : pgoutput_shutdown(LogicalDecodingContext *ctx)
    1742             : {
    1743         660 :     if (RelationSyncCache)
    1744             :     {
    1745         266 :         hash_destroy(RelationSyncCache);
    1746         266 :         RelationSyncCache = NULL;
    1747             :     }
    1748         660 : }
    1749             : 
    1750             : /*
    1751             :  * Load publications from the list of publication names.
    1752             :  */
    1753             : static List *
    1754         200 : LoadPublications(List *pubnames)
    1755             : {
    1756         200 :     List       *result = NIL;
    1757             :     ListCell   *lc;
    1758             : 
    1759         470 :     foreach(lc, pubnames)
    1760             :     {
    1761         272 :         char       *pubname = (char *) lfirst(lc);
    1762         272 :         Publication *pub = GetPublicationByName(pubname, false);
    1763             : 
    1764         270 :         result = lappend(result, pub);
    1765             :     }
    1766             : 
    1767         198 :     return result;
    1768             : }
    1769             : 
    1770             : /*
    1771             :  * Publication syscache invalidation callback.
    1772             :  *
    1773             :  * Called for invalidations on pg_publication.
    1774             :  */
    1775             : static void
    1776         386 : publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
    1777             : {
    1778         386 :     publications_valid = false;
    1779             : 
    1780             :     /*
    1781             :      * Also invalidate per-relation cache so that next time the filtering info
    1782             :      * is checked it will be updated with the new publication settings.
    1783             :      */
    1784         386 :     rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
    1785         386 : }
    1786             : 
    1787             : /*
    1788             :  * START STREAM callback
    1789             :  */
    1790             : static void
    1791         846 : pgoutput_stream_start(struct LogicalDecodingContext *ctx,
    1792             :                       ReorderBufferTXN *txn)
    1793             : {
    1794         846 :     bool        send_replication_origin = txn->origin_id != InvalidRepOriginId;
    1795             : 
    1796             :     /* we can't nest streaming of transactions */
    1797             :     Assert(!in_streaming);
    1798             : 
    1799             :     /*
    1800             :      * If we already sent the first stream for this transaction then don't
    1801             :      * send the origin id in the subsequent streams.
    1802             :      */
    1803         846 :     if (rbtxn_is_streamed(txn))
    1804         792 :         send_replication_origin = false;
    1805             : 
    1806         846 :     OutputPluginPrepareWrite(ctx, !send_replication_origin);
    1807         846 :     logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
    1808             : 
    1809         846 :     send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
    1810             :                      send_replication_origin);
    1811             : 
    1812         846 :     OutputPluginWrite(ctx, true);
    1813             : 
    1814             :     /* we're streaming a chunk of transaction now */
    1815         846 :     in_streaming = true;
    1816         846 : }
    1817             : 
    1818             : /*
    1819             :  * STOP STREAM callback
    1820             :  */
    1821             : static void
    1822         842 : pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
    1823             :                      ReorderBufferTXN *txn)
    1824             : {
     1825             :     /* we should be streaming a transaction */
    1826             :     Assert(in_streaming);
    1827             : 
    1828         842 :     OutputPluginPrepareWrite(ctx, true);
    1829         842 :     logicalrep_write_stream_stop(ctx->out);
    1830         842 :     OutputPluginWrite(ctx, true);
    1831             : 
    1832             :     /* we've stopped streaming a transaction */
    1833         842 :     in_streaming = false;
    1834         842 : }
    1835             : 
    1836             : /*
    1837             :  * Notify downstream to discard the streamed transaction (along with all
     1838             :  * its subtransactions, if it's a toplevel transaction).
    1839             :  */
    1840             : static void
    1841          28 : pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
    1842             :                       ReorderBufferTXN *txn,
    1843             :                       XLogRecPtr abort_lsn)
    1844             : {
    1845             :     ReorderBufferTXN *toptxn;
    1846             : 
    1847             :     /*
     1848             :      * The abort should happen outside a streaming block, even for streamed
    1849             :      * transactions. The transaction has to be marked as streamed, though.
    1850             :      */
    1851             :     Assert(!in_streaming);
    1852             : 
    1853             :     /* determine the toplevel transaction */
    1854          28 :     toptxn = (txn->toptxn) ? txn->toptxn : txn;
    1855             : 
    1856             :     Assert(rbtxn_is_streamed(toptxn));
    1857             : 
    1858          28 :     OutputPluginPrepareWrite(ctx, true);
    1859          28 :     logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid);
    1860          28 :     OutputPluginWrite(ctx, true);
    1861             : 
    1862          28 :     cleanup_rel_sync_cache(toptxn->xid, false);
    1863          28 : }
    1864             : 
    1865             : /*
    1866             :  * Notify downstream to apply the streamed transaction (along with all
     1867             :  * its subtransactions).
    1868             :  */
    1869             : static void
    1870          32 : pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
    1871             :                        ReorderBufferTXN *txn,
    1872             :                        XLogRecPtr commit_lsn)
    1873             : {
    1874             :     /*
     1875             :      * The commit should happen outside a streaming block, even for streamed
    1876             :      * transactions. The transaction has to be marked as streamed, though.
    1877             :      */
    1878             :     Assert(!in_streaming);
    1879             :     Assert(rbtxn_is_streamed(txn));
    1880             : 
    1881          32 :     update_replication_progress(ctx, false);
    1882             : 
    1883          32 :     OutputPluginPrepareWrite(ctx, true);
    1884          32 :     logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
    1885          32 :     OutputPluginWrite(ctx, true);
    1886             : 
    1887          32 :     cleanup_rel_sync_cache(txn->xid, true);
    1888          32 : }
    1889             : 
    1890             : /*
    1891             :  * PREPARE callback (for streaming two-phase commit).
    1892             :  *
    1893             :  * Notify the downstream to prepare the transaction.
    1894             :  */
    1895             : static void
    1896          16 : pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
    1897             :                             ReorderBufferTXN *txn,
    1898             :                             XLogRecPtr prepare_lsn)
    1899             : {
    1900             :     Assert(rbtxn_is_streamed(txn));
    1901             : 
    1902          16 :     update_replication_progress(ctx, false);
    1903          16 :     OutputPluginPrepareWrite(ctx, true);
    1904          16 :     logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
    1905          16 :     OutputPluginWrite(ctx, true);
    1906          16 : }
    1907             : 
    1908             : /*
    1909             :  * Initialize the relation schema sync cache for a decoding session.
    1910             :  *
    1911             :  * The hash table is destroyed at the end of a decoding session. While
     1912             :  * relcache invalidation callbacks remain registered and will still be
     1913             :  * invoked, they just see the NULL hash table global and take no action.
    1914             :  */
    1915             : static void
    1916         462 : init_rel_sync_cache(MemoryContext cachectx)
    1917             : {
    1918             :     HASHCTL     ctl;
    1919             : 
    1920         462 :     if (RelationSyncCache != NULL)
    1921           0 :         return;
    1922             : 
    1923             :     /* Make a new hash table for the cache */
    1924         462 :     ctl.keysize = sizeof(Oid);
    1925         462 :     ctl.entrysize = sizeof(RelationSyncEntry);
    1926         462 :     ctl.hcxt = cachectx;
    1927             : 
    1928         462 :     RelationSyncCache = hash_create("logical replication output relation cache",
    1929             :                                     128, &ctl,
    1930             :                                     HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
    1931             : 
    1932             :     Assert(RelationSyncCache != NULL);
    1933             : 
    1934         462 :     CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
    1935         462 :     CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
    1936             :                                   rel_sync_cache_publication_cb,
    1937             :                                   (Datum) 0);
    1938         462 :     CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
    1939             :                                   rel_sync_cache_publication_cb,
    1940             :                                   (Datum) 0);
    1941             : }
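
For readers less familiar with the backend's dynahash API, the pattern used above (an Oid-keyed cache created in a caller-supplied memory context and probed with hash_search()) boils down to the sketch below. DemoCacheEntry, demo_cache_init() and demo_cache_lookup() are hypothetical names used only for illustration, and the code assumes the normal backend environment rather than standalone compilation.

    #include "postgres.h"

    #include "utils/hsearch.h"
    #include "utils/memutils.h"

    /* Hypothetical cache entry; the hash key (an Oid) must come first. */
    typedef struct DemoCacheEntry
    {
        Oid         relid;          /* hash key */
        bool        valid;          /* does the rest of the entry hold data? */
    } DemoCacheEntry;

    static HTAB *DemoCache = NULL;

    static void
    demo_cache_init(MemoryContext cachectx)
    {
        HASHCTL     ctl;

        ctl.keysize = sizeof(Oid);
        ctl.entrysize = sizeof(DemoCacheEntry);
        ctl.hcxt = cachectx;

        /* HASH_BLOBS: compare the key as raw bytes, which suits an Oid. */
        DemoCache = hash_create("demo cache", 128, &ctl,
                                HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
    }

    static DemoCacheEntry *
    demo_cache_lookup(Oid relid)
    {
        bool        found;
        DemoCacheEntry *entry;

        /* HASH_ENTER creates the entry if it is not there yet. */
        entry = (DemoCacheEntry *) hash_search(DemoCache, &relid,
                                               HASH_ENTER, &found);
        if (!found)
            entry->valid = false;   /* caller fills in the payload lazily */

        return entry;
    }
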
    1942             : 
    1943             : /*
    1944             :  * We expect a relatively small number of streamed transactions.
    1945             :  */
    1946             : static bool
    1947      341742 : get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
    1948             : {
    1949      341742 :     return list_member_xid(entry->streamed_txns, xid);
    1950             : }
    1951             : 
    1952             : /*
    1953             :  * Add the xid to the rel sync entry for which we have already sent the
    1954             :  * schema of the relation.
    1955             :  */
    1956             : static void
    1957          70 : set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
    1958             : {
    1959             :     MemoryContext oldctx;
    1960             : 
    1961          70 :     oldctx = MemoryContextSwitchTo(CacheMemoryContext);
    1962             : 
    1963          70 :     entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
    1964             : 
    1965          70 :     MemoryContextSwitchTo(oldctx);
    1966          70 : }
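
The MemoryContextSwitchTo() dance above is the standard way to make a list allocation outlive whatever short-lived context happens to be current; a minimal sketch of the idiom follows. append_xid_long_lived() is a hypothetical helper, and the sketch assumes the backend environment.

    #include "postgres.h"

    #include "nodes/pg_list.h"
    #include "utils/memutils.h"

    /*
     * Append an xid to a list that must survive beyond the current (typically
     * per-change) memory context by allocating the new cell in
     * CacheMemoryContext, as done above for entry->streamed_txns.
     */
    static List *
    append_xid_long_lived(List *xids, TransactionId xid)
    {
        MemoryContext oldctx = MemoryContextSwitchTo(CacheMemoryContext);

        xids = lappend_xid(xids, xid);
        MemoryContextSwitchTo(oldctx);

        return xids;
    }
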
    1967             : 
    1968             : /*
    1969             :  * Find or create an entry in the relation schema cache.
    1970             :  *
    1971             :  * This looks up the publications that the given relation is directly or
    1972             :  * indirectly part of (the latter if it's really the relation's ancestor that
    1973             :  * is part of a publication) and fills in the entry with information about
    1974             :  * which operations to publish and whether to use an ancestor's schema when
    1975             :  * publishing.
    1976             :  */
    1977             : static RelationSyncEntry *
    1978      346048 : get_rel_sync_entry(PGOutputData *data, Relation relation)
    1979             : {
    1980             :     RelationSyncEntry *entry;
    1981             :     bool        found;
    1982             :     MemoryContext oldctx;
    1983      346048 :     Oid         relid = RelationGetRelid(relation);
    1984             : 
    1985             :     Assert(RelationSyncCache != NULL);
    1986             : 
    1987             :     /* Find cached relation info, creating if not found */
    1988      346048 :     entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
    1989             :                                               (void *) &relid,
    1990             :                                               HASH_ENTER, &found);
    1991             :     Assert(entry != NULL);
    1992             : 
    1993             :     /* initialize entry, if it's new */
    1994      346048 :     if (!found)
    1995             :     {
    1996         344 :         entry->replicate_valid = false;
    1997         344 :         entry->schema_sent = false;
    1998         344 :         entry->streamed_txns = NIL;
    1999         344 :         entry->pubactions.pubinsert = entry->pubactions.pubupdate =
    2000         344 :             entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
    2001         344 :         entry->new_slot = NULL;
    2002         344 :         entry->old_slot = NULL;
    2003         344 :         memset(entry->exprstate, 0, sizeof(entry->exprstate));
    2004         344 :         entry->entry_cxt = NULL;
    2005         344 :         entry->publish_as_relid = InvalidOid;
    2006         344 :         entry->columns = NULL;
    2007         344 :         entry->attrmap = NULL;
    2008             :     }
    2009             : 
    2010             :     /* Validate the entry */
    2011      346048 :     if (!entry->replicate_valid)
    2012             :     {
    2013         444 :         Oid         schemaId = get_rel_namespace(relid);
    2014         444 :         List       *pubids = GetRelationPublications(relid);
    2015             : 
    2016             :         /*
    2017             :          * We don't acquire a lock on the namespace system table as we build
    2018             :          * the cache entry using a historic snapshot and all the later changes
    2019             :          * are absorbed while decoding WAL.
    2020             :          */
    2021         444 :         List       *schemaPubids = GetSchemaPublications(schemaId);
    2022             :         ListCell   *lc;
    2023         444 :         Oid         publish_as_relid = relid;
    2024         444 :         int         publish_ancestor_level = 0;
    2025         444 :         bool        am_partition = get_rel_relispartition(relid);
    2026         444 :         char        relkind = get_rel_relkind(relid);
    2027         444 :         List       *rel_publications = NIL;
    2028             : 
    2029             :         /* Reload publications if needed before use. */
    2030         444 :         if (!publications_valid)
    2031             :         {
    2032         200 :             oldctx = MemoryContextSwitchTo(CacheMemoryContext);
    2033         200 :             if (data->publications)
    2034             :             {
    2035          34 :                 list_free_deep(data->publications);
    2036          34 :                 data->publications = NIL;
    2037             :             }
    2038         200 :             data->publications = LoadPublications(data->publication_names);
    2039         198 :             MemoryContextSwitchTo(oldctx);
    2040         198 :             publications_valid = true;
    2041             :         }
    2042             : 
    2043             :         /*
    2044             :          * Reset the schema_sent status as the relation definition may have
    2045             :          * changed.  Also reset pubactions to empty in case the relation was
    2046             :          * dropped from a publication, and free any objects that depended on
    2047             :          * the earlier definition.
    2048             :          */
    2049         442 :         entry->schema_sent = false;
    2050         442 :         list_free(entry->streamed_txns);
    2051         442 :         entry->streamed_txns = NIL;
    2052         442 :         bms_free(entry->columns);
    2053         442 :         entry->columns = NULL;
    2054         442 :         entry->pubactions.pubinsert = false;
    2055         442 :         entry->pubactions.pubupdate = false;
    2056         442 :         entry->pubactions.pubdelete = false;
    2057         442 :         entry->pubactions.pubtruncate = false;
    2058             : 
    2059             :         /*
    2060             :          * Tuple slot cleanup.  (The slots will be rebuilt later if needed.)
    2061             :          */
    2062         442 :         if (entry->old_slot)
    2063          92 :             ExecDropSingleTupleTableSlot(entry->old_slot);
    2064         442 :         if (entry->new_slot)
    2065          92 :             ExecDropSingleTupleTableSlot(entry->new_slot);
    2066             : 
    2067         442 :         entry->old_slot = NULL;
    2068         442 :         entry->new_slot = NULL;
    2069             : 
    2070         442 :         if (entry->attrmap)
    2071           0 :             free_attrmap(entry->attrmap);
    2072         442 :         entry->attrmap = NULL;
    2073             : 
    2074             :         /*
    2075             :          * Row filter cache cleanup.
    2076             :          */
    2077         442 :         if (entry->entry_cxt)
    2078          20 :             MemoryContextDelete(entry->entry_cxt);
    2079             : 
    2080         442 :         entry->entry_cxt = NULL;
    2081         442 :         entry->estate = NULL;
    2082         442 :         memset(entry->exprstate, 0, sizeof(entry->exprstate));
    2083             : 
    2084             :         /*
    2085             :          * Build the publication cache.  We can't use the one provided by the
    2086             :          * relcache, as the relcache considers all publications that the given
    2087             :          * relation is in, but here we only need to consider the ones that the
    2088             :          * subscriber requested.
    2089             :          */
    2090        1134 :         foreach(lc, data->publications)
    2091             :         {
    2092         692 :             Publication *pub = lfirst(lc);
    2093         692 :             bool        publish = false;
    2094             : 
    2095             :             /*
    2096             :              * Under what relid should we publish changes in this publication?
    2097             :              * We'll use the top-most relid across all publications. Also
    2098             :              * track the ancestor level for this publication.
    2099             :              */
    2100         692 :             Oid         pub_relid = relid;
    2101         692 :             int         ancestor_level = 0;
    2102             : 
    2103             :             /*
    2104             :              * If this is a FOR ALL TABLES publication, pick the partition
    2105             :              * root and set the ancestor level accordingly.
    2106             :              */
    2107         692 :             if (pub->alltables)
    2108             :             {
    2109         110 :                 publish = true;
    2110         110 :                 if (pub->pubviaroot && am_partition)
    2111             :                 {
    2112          18 :                     List       *ancestors = get_partition_ancestors(relid);
    2113             : 
    2114          18 :                     pub_relid = llast_oid(ancestors);
    2115          18 :                     ancestor_level = list_length(ancestors);
    2116             :                 }
    2117             :             }
    2118             : 
    2119         692 :             if (!publish)
    2120             :             {
    2121         582 :                 bool        ancestor_published = false;
    2122             : 
    2123             :                 /*
    2124             :                  * For a partition, check if any of the ancestors are
    2125             :                  * published.  If so, note down the topmost ancestor that is
    2126             :                  * published via this publication, which will be used as the
    2127             :                  * relation via which to publish the partition's changes.
    2128             :                  */
    2129         582 :                 if (am_partition)
    2130             :                 {
    2131             :                     Oid         ancestor;
    2132             :                     int         level;
    2133         176 :                     List       *ancestors = get_partition_ancestors(relid);
    2134             : 
    2135         176 :                     ancestor = GetTopMostAncestorInPublication(pub->oid,
    2136             :                                                                ancestors,
    2137             :                                                                &level);
    2138             : 
    2139         176 :                     if (ancestor != InvalidOid)
    2140             :                     {
    2141          76 :                         ancestor_published = true;
    2142          76 :                         if (pub->pubviaroot)
    2143             :                         {
    2144          34 :                             pub_relid = ancestor;
    2145          34 :                             ancestor_level = level;
    2146             :                         }
    2147             :                     }
    2148             :                 }
    2149             : 
    2150         930 :                 if (list_member_oid(pubids, pub->oid) ||
    2151         682 :                     list_member_oid(schemaPubids, pub->oid) ||
    2152             :                     ancestor_published)
    2153         294 :                     publish = true;
    2154             :             }
    2155             : 
    2156             :             /*
    2157             :              * If the relation is to be published, determine the actions to
    2158             :              * publish and the list of columns, if appropriate.
    2159             :              *
    2160             :              * Don't publish changes for a partitioned table, because
    2161             :              * publishing changes of its partitions suffices, unless those
    2162             :              * changes won't be published due to pubviaroot being set.
    2163             :              */
    2164         692 :             if (publish &&
    2165           6 :                 (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
    2166             :             {
    2167         398 :                 entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
    2168         398 :                 entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
    2169         398 :                 entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
    2170         398 :                 entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
    2171             : 
    2172             :                 /*
    2173             :                  * We want to publish the changes as the top-most ancestor
    2174             :                  * across all publications. So we need to check if the already
    2175             :                  * calculated level is higher than the new one. If yes, we can
    2176             :                  * ignore the new value (as it's a child). Otherwise the new
    2177             :                  * value is an ancestor, so we keep it.
    2178             :                  */
    2179         398 :                 if (publish_ancestor_level > ancestor_level)
    2180           0 :                     continue;
    2181             : 
    2182             :                 /*
    2183             :                  * If we found an ancestor higher up in the tree, discard the
    2184             :                  * list of publications through which we replicate it, and use
    2185             :                  * the new ancestor.
    2186             :                  */
    2187         398 :                 if (publish_ancestor_level < ancestor_level)
    2188             :                 {
    2189          52 :                     publish_as_relid = pub_relid;
    2190          52 :                     publish_ancestor_level = ancestor_level;
    2191             : 
    2192             :                     /* reset the publication list for this relation */
    2193          52 :                     rel_publications = NIL;
    2194             :                 }
    2195             :                 else
    2196             :                 {
    2197             :                     /* Same ancestor level, has to be the same OID. */
    2198             :                     Assert(publish_as_relid == pub_relid);
    2199             :                 }
    2200             : 
    2201             :                 /* Track publications for this ancestor. */
    2202         398 :                 rel_publications = lappend(rel_publications, pub);
    2203             :             }
    2204             :         }
    2205             : 
    2206         442 :         entry->publish_as_relid = publish_as_relid;
    2207             : 
    2208             :         /*
    2209             :          * Initialize the tuple slot, map, and row filter. These are only used
    2210             :          * when publishing inserts, updates, or deletes.
    2211             :          */
    2212         442 :         if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
    2213          68 :             entry->pubactions.pubdelete)
    2214             :         {
    2215             :             /* Initialize the tuple slot and map */
    2216         374 :             init_tuple_slot(data, relation, entry);
    2217             : 
    2218             :             /* Initialize the row filter */
    2219         374 :             pgoutput_row_filter_init(data, rel_publications, entry);
    2220             : 
    2221             :             /* Initialize the column list */
    2222         374 :             pgoutput_column_list_init(data, rel_publications, entry);
    2223             :         }
    2224             : 
    2225         440 :         list_free(pubids);
    2226         440 :         list_free(schemaPubids);
    2227         440 :         list_free(rel_publications);
    2228             : 
    2229         440 :         entry->replicate_valid = true;
    2230             :     }
    2231             : 
    2232      346044 :     return entry;
    2233             : }
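
The ancestor-level bookkeeping in the loop above reduces to "keep the candidate with the highest ancestor level; equal levels must name the same relation". The standalone sketch below isolates that rule; PubCandidate and choose_publish_as() are hypothetical names used only for illustration.

    #include <assert.h>
    #include <stdio.h>

    typedef struct PubCandidate
    {
        unsigned    relid;          /* relation to publish the change as */
        int         ancestor_level; /* 0 = the changed relation itself */
    } PubCandidate;

    static unsigned
    choose_publish_as(unsigned relid, const PubCandidate *cands, int ncands)
    {
        unsigned    publish_as_relid = relid;
        int         publish_ancestor_level = 0;

        for (int i = 0; i < ncands; i++)
        {
            if (publish_ancestor_level > cands[i].ancestor_level)
                continue;       /* already publishing via a higher ancestor */

            if (publish_ancestor_level < cands[i].ancestor_level)
            {
                /* found a higher ancestor: switch to it */
                publish_as_relid = cands[i].relid;
                publish_ancestor_level = cands[i].ancestor_level;
            }
            else
            {
                /* same level must mean the same relation */
                assert(publish_as_relid == cands[i].relid);
            }
        }

        return publish_as_relid;
    }

    int
    main(void)
    {
        PubCandidate cands[] = {{1001, 0}, {1003, 2}, {1002, 1}};

        /* prints 1003, the highest ancestor across all candidates */
        printf("%u\n", choose_publish_as(1001, cands, 3));
        return 0;
    }
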
    2234             : 
    2235             : /*
    2236             :  * Clean up the streamed transaction list and update the schema_sent flag.
    2237             :  *
    2238             :  * When a streamed transaction commits or aborts, we need to remove the
    2239             :  * toplevel XID from the schema cache. If the transaction aborted, the
    2240             :  * subscriber will simply throw away the schema records we streamed, so
    2241             :  * we don't need to do anything else.
    2242             :  *
    2243             :  * If the transaction is committed, the subscriber will update the relation
    2244             :  * cache - so tweak the schema_sent flag accordingly.
    2245             :  */
    2246             : static void
    2247          60 : cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
    2248             : {
    2249             :     HASH_SEQ_STATUS hash_seq;
    2250             :     RelationSyncEntry *entry;
    2251             :     ListCell   *lc;
    2252             : 
    2253             :     Assert(RelationSyncCache != NULL);
    2254             : 
    2255          60 :     hash_seq_init(&hash_seq, RelationSyncCache);
    2256         120 :     while ((entry = hash_seq_search(&hash_seq)) != NULL)
    2257             :     {
    2258             :         /*
    2259             :          * We can set the schema_sent flag for an entry that has the committed
    2260             :          * xid in its list, as that ensures the subscriber already has the
    2261             :          * corresponding schema and we don't need to send it again unless
    2262             :          * there is an invalidation for that relation.
    2263             :          */
    2264          72 :         foreach(lc, entry->streamed_txns)
    2265             :         {
    2266          54 :             if (xid == lfirst_xid(lc))
    2267             :             {
    2268          42 :                 if (is_commit)
    2269          30 :                     entry->schema_sent = true;
    2270             : 
    2271          42 :                 entry->streamed_txns =
    2272          42 :                     foreach_delete_current(entry->streamed_txns, lc);
    2273          42 :                 break;
    2274             :             }
    2275             :         }
    2276             :     }
    2277          60 : }
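
Deleting the matched cell while iterating relies on the foreach_delete_current() idiom from pg_list.h, which keeps the foreach() iterator valid after the removal. The hypothetical helper below isolates just that idiom, assuming the backend environment.

    #include "postgres.h"

    #include "nodes/pg_list.h"

    /* Remove at most one occurrence of xid from a list of TransactionIds. */
    static List *
    remove_xid_once(List *xids, TransactionId xid)
    {
        ListCell   *lc;

        foreach(lc, xids)
        {
            if (lfirst_xid(lc) == xid)
            {
                /* safe to delete the current cell inside foreach() */
                xids = foreach_delete_current(xids, lc);
                break;
            }
        }

        return xids;
    }
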
    2278             : 
    2279             : /*
    2280             :  * Relcache invalidation callback
    2281             :  */
    2282             : static void
    2283        5280 : rel_sync_cache_relation_cb(Datum arg, Oid relid)
    2284             : {
    2285             :     RelationSyncEntry *entry;
    2286             : 
    2287             :     /*
    2288             :      * We can get here if the plugin was used in SQL interface as the
    2289             :      * RelSchemaSyncCache is destroyed when the decoding finishes, but there
    2290             :      * is no way to unregister the relcache invalidation callback.
    2291             :      */
    2292        5280 :     if (RelationSyncCache == NULL)
    2293          10 :         return;
    2294             : 
    2295             :     /*
    2296             :      * Nobody keeps pointers to entries in this hash table around outside
    2297             :      * logical decoding callback calls - but invalidation events can come in
    2298             :      * *during* a callback if we do any syscache access in the callback.
    2299             :      * Because of that we must mark the cache entry as invalid but not damage
    2300             :      * any of its substructure here.  The next get_rel_sync_entry() call will
    2301             :      * rebuild it all.
    2302             :      */
    2303        5270 :     if (OidIsValid(relid))
    2304             :     {
    2305             :         /*
    2306             :          * Getting invalidations for relations that aren't in the table is
    2307             :          * entirely normal.  So we don't care if it's found or not.
    2308             :          */
    2309        5230 :         entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
    2310             :                                                   HASH_FIND, NULL);
    2311        5230 :         if (entry != NULL)
    2312         734 :             entry->replicate_valid = false;
    2313             :     }
    2314             :     else
    2315             :     {
    2316             :         /* Whole cache must be flushed. */
    2317             :         HASH_SEQ_STATUS status;
    2318             : 
    2319          40 :         hash_seq_init(&status, RelationSyncCache);
    2320         100 :         while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
    2321             :         {
    2322          60 :             entry->replicate_valid = false;
    2323             :         }
    2324             :     }
    2325             : }
    2326             : 
    2327             : /*
    2328             :  * Publication relation/schema map syscache invalidation callback
    2329             :  *
    2330             :  * Called for invalidations on pg_publication, pg_publication_rel, and
    2331             :  * pg_publication_namespace.
    2332             :  */
    2333             : static void
    2334        1040 : rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
    2335             : {
    2336             :     HASH_SEQ_STATUS status;
    2337             :     RelationSyncEntry *entry;
    2338             : 
    2339             :     /*
    2340             :      * We can get here if the plugin was used in the SQL interface, as the
    2341             :      * RelationSyncCache is destroyed when the decoding finishes, but there
    2342             :      * is no way to unregister the syscache invalidation callback.
    2343             :      */
    2344        1040 :     if (RelationSyncCache == NULL)
    2345          30 :         return;
    2346             : 
    2347             :     /*
    2348             :      * There is no way to find which entry in our cache the hash value belongs
    2349             :      * to, so mark the whole cache as invalid.
    2350             :      */
    2351        1010 :     hash_seq_init(&status, RelationSyncCache);
    2352        3198 :     while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
    2353             :     {
    2354        2188 :         entry->replicate_valid = false;
    2355             :     }
    2356             : }
    2357             : 
    2358             : /* Send the replication origin */
    2359             : static void
    2360        1474 : send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
    2361             :                  XLogRecPtr origin_lsn, bool send_origin)
    2362             : {
    2363        1474 :     if (send_origin)
    2364             :     {
    2365             :         char       *origin;
    2366             : 
    2367             :         /*----------
    2368             :          * XXX: which behaviour do we want here?
    2369             :          *
    2370             :          * Alternatives:
    2371             :          *  - don't send origin message if origin name not found
    2372             :          *    (that's what we do now)
    2373             :          *  - throw error - that will break replication, not good
    2374             :          *  - send some special "unknown" origin
    2375             :          *----------
    2376             :          */
    2377          14 :         if (replorigin_by_oid(origin_id, true, &origin))
    2378             :         {
    2379             :             /* Message boundary */
    2380          14 :             OutputPluginWrite(ctx, false);
    2381          14 :             OutputPluginPrepareWrite(ctx, true);
    2382             : 
    2383          14 :             logicalrep_write_origin(ctx->out, origin, origin_lsn);
    2384             :         }
    2385             :     }
    2386        1474 : }
    2387             : 
    2388             : /*
    2389             :  * Try to update progress and send a keepalive message if too many changes were
    2390             :  * processed.
    2391             :  *
    2392             :  * For a large transaction, if we don't send any change to the downstream for
    2393             :  * a long time (exceeding the wal_receiver_timeout of the standby), it can
    2394             :  * time out.  This can happen when all or most of the changes are either not
    2395             :  * published or get filtered out.
    2396             :  */
    2397             : static void
    2398      347164 : update_replication_progress(LogicalDecodingContext *ctx, bool skipped_xact)
    2399             : {
    2400             :     static int  changes_count = 0;
    2401             : 
    2402             :     /*
    2403             :      * We don't want to try sending a keepalive message after processing each
    2404             :      * change as that can have overhead. Tests revealed that there is no
    2405             :      * noticeable overhead in doing it after continuously processing 100 or so
    2406             :      * changes.
    2407             :      */
    2408             : #define CHANGES_THRESHOLD 100
    2409             : 
    2410             :     /*
    2411             :      * If we are at the end-of-transaction LSN, update progress tracking.
    2412             :      * Otherwise, after continuously processing CHANGES_THRESHOLD changes, we
    2413             :      * try to send a keepalive message if required.
    2414             :      */
    2415      347164 :     if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD)
    2416             :     {
    2417        4526 :         OutputPluginUpdateProgress(ctx, skipped_xact);
    2418        4526 :         changes_count = 0;
    2419             :     }
    2420      347164 : }
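
The control flow above is a generic "do the expensive thing only at transaction end or every N cheap events" pattern. The standalone sketch below demonstrates it with a hypothetical report_progress() callback and the same CHANGES_THRESHOLD of 100.

    #include <stdbool.h>
    #include <stdio.h>

    #define CHANGES_THRESHOLD 100

    static void
    report_progress(bool skipped)
    {
        printf("progress update (skipped=%d)\n", skipped);
    }

    static void
    maybe_report_progress(bool end_of_xact, bool skipped)
    {
        static int  changes_count = 0;

        /* report at end of transaction, or after every CHANGES_THRESHOLD calls */
        if (end_of_xact || ++changes_count >= CHANGES_THRESHOLD)
        {
            report_progress(skipped);
            changes_count = 0;      /* restart the counting window */
        }
    }

    int
    main(void)
    {
        /* 250 changes in one transaction: reports at 100, 200, and at the end */
        for (int i = 0; i < 250; i++)
            maybe_report_progress(i == 249, false);
        return 0;
    }
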

Generated by: LCOV version 1.14