LCOV - code coverage report
Current view: top level - src/backend/replication/pgoutput - pgoutput.c (source / functions)
Test:         PostgreSQL 18devel
Date:         2025-01-18 04:15:08
Coverage:     Lines: 735 of 764 (96.2 %)    Functions: 41 of 41 (100.0 %)
Legend:       Lines: hit / not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pgoutput.c
       4             :  *      Logical Replication output plugin
       5             :  *
       6             :  * Copyright (c) 2012-2025, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *        src/backend/replication/pgoutput/pgoutput.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #include "access/tupconvert.h"
      16             : #include "catalog/partition.h"
      17             : #include "catalog/pg_publication.h"
      18             : #include "catalog/pg_publication_rel.h"
      19             : #include "catalog/pg_subscription.h"
      20             : #include "commands/defrem.h"
      21             : #include "commands/subscriptioncmds.h"
      22             : #include "executor/executor.h"
      23             : #include "fmgr.h"
      24             : #include "nodes/makefuncs.h"
      25             : #include "parser/parse_relation.h"
      26             : #include "replication/logical.h"
      27             : #include "replication/logicalproto.h"
      28             : #include "replication/origin.h"
      29             : #include "replication/pgoutput.h"
      30             : #include "utils/builtins.h"
      31             : #include "utils/inval.h"
      32             : #include "utils/lsyscache.h"
      33             : #include "utils/memutils.h"
      34             : #include "utils/rel.h"
      35             : #include "utils/syscache.h"
      36             : #include "utils/varlena.h"
      37             : 
      38         948 : PG_MODULE_MAGIC;
      39             : 
      40             : static void pgoutput_startup(LogicalDecodingContext *ctx,
      41             :                              OutputPluginOptions *opt, bool is_init);
      42             : static void pgoutput_shutdown(LogicalDecodingContext *ctx);
      43             : static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
      44             :                                ReorderBufferTXN *txn);
      45             : static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
      46             :                                 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
      47             : static void pgoutput_change(LogicalDecodingContext *ctx,
      48             :                             ReorderBufferTXN *txn, Relation relation,
      49             :                             ReorderBufferChange *change);
      50             : static void pgoutput_truncate(LogicalDecodingContext *ctx,
      51             :                               ReorderBufferTXN *txn, int nrelations, Relation relations[],
      52             :                               ReorderBufferChange *change);
      53             : static void pgoutput_message(LogicalDecodingContext *ctx,
      54             :                              ReorderBufferTXN *txn, XLogRecPtr message_lsn,
      55             :                              bool transactional, const char *prefix,
      56             :                              Size sz, const char *message);
      57             : static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
      58             :                                    RepOriginId origin_id);
      59             : static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
      60             :                                        ReorderBufferTXN *txn);
      61             : static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
      62             :                                  ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
      63             : static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
      64             :                                          ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
      65             : static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
      66             :                                            ReorderBufferTXN *txn,
      67             :                                            XLogRecPtr prepare_end_lsn,
      68             :                                            TimestampTz prepare_time);
      69             : static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
      70             :                                   ReorderBufferTXN *txn);
      71             : static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
      72             :                                  ReorderBufferTXN *txn);
      73             : static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
      74             :                                   ReorderBufferTXN *txn,
      75             :                                   XLogRecPtr abort_lsn);
      76             : static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
      77             :                                    ReorderBufferTXN *txn,
      78             :                                    XLogRecPtr commit_lsn);
      79             : static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
      80             :                                         ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
      81             : 
      82             : static bool publications_valid;
      83             : 
      84             : static List *LoadPublications(List *pubnames);
      85             : static void publication_invalidation_cb(Datum arg, int cacheid,
      86             :                                         uint32 hashvalue);
      87             : static void send_repl_origin(LogicalDecodingContext *ctx,
      88             :                              RepOriginId origin_id, XLogRecPtr origin_lsn,
      89             :                              bool send_origin);
      90             : 
      91             : /*
      92             :  * Only 3 publication actions are used for row filtering ("insert", "update",
      93             :  * "delete"). See RelationSyncEntry.exprstate[].
      94             :  */
      95             : enum RowFilterPubAction
      96             : {
      97             :     PUBACTION_INSERT,
      98             :     PUBACTION_UPDATE,
      99             :     PUBACTION_DELETE,
     100             : };
     101             : 
     102             : #define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
     103             : 
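As an aside (this is an illustration, not part of the listed source), the PUBACTION_* values above are used as indexes into RelationSyncEntry.exprstate[] further down. A hedged sketch of the indexing convention, assuming the ReorderBufferChangeType constants from replication/reorderbuffer.h and a hypothetical helper name:

/* Hedged sketch: translate a decoded change type into the index used for
 * RelationSyncEntry.exprstate[]; hypothetical helper, not from this file. */
static int
rowfilter_pubaction_for_change(ReorderBufferChangeType action)
{
    switch (action)
    {
        case REORDER_BUFFER_CHANGE_INSERT:
            return PUBACTION_INSERT;
        case REORDER_BUFFER_CHANGE_UPDATE:
            return PUBACTION_UPDATE;
        case REORDER_BUFFER_CHANGE_DELETE:
            return PUBACTION_DELETE;
        default:
            elog(ERROR, "unexpected change type %d", (int) action);
            return -1;          /* keep the compiler quiet */
    }
}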
     104             : /*
     105             :  * Entry in the map used to remember which relation schemas we sent.
     106             :  *
     107             :  * The schema_sent flag determines if the current schema record for the
     108             :  * relation (and for its ancestor if publish_as_relid is set) was already
     109             :  * sent to the subscriber (in which case we don't need to send it again).
     110             :  *
      111             :  * The schema cache on the downstream is, however, updated only at commit
      112             :  * time, and with streamed transactions the commit order may differ from the
      113             :  * order in which the transactions are sent. Also, the (sub)transactions
      114             :  * might get aborted, so we need to send the schema for each (sub)transaction
      115             :  * to avoid losing the schema information on abort. To handle this, we
      116             :  * maintain the list of xids (streamed_txns) for which we have already sent
      117             :  * the schema.
     118             :  *
     119             :  * For partitions, 'pubactions' considers not only the table's own
     120             :  * publications, but also those of all of its ancestors.
     121             :  */
     122             : typedef struct RelationSyncEntry
     123             : {
     124             :     Oid         relid;          /* relation oid */
     125             : 
     126             :     bool        replicate_valid;    /* overall validity flag for entry */
     127             : 
     128             :     bool        schema_sent;
     129             : 
     130             :     /*
     131             :      * This is set if the 'publish_generated_columns' parameter is true, and
     132             :      * the relation contains generated columns.
     133             :      */
     134             :     bool        include_gencols;
     135             :     List       *streamed_txns;  /* streamed toplevel transactions with this
     136             :                                  * schema */
     137             : 
     138             :     /* are we publishing this rel? */
     139             :     PublicationActions pubactions;
     140             : 
     141             :     /*
      142             :      * ExprState array for the row filter. The expressions for different
      143             :      * publication actions cannot always be combined into one, because
      144             :      * updates and deletes require the columns used in the expression to be
      145             :      * part of the replica identity index, whereas inserts do not, so there
      146             :      * is one ExprState per publication action.
     147             :      */
     148             :     ExprState  *exprstate[NUM_ROWFILTER_PUBACTIONS];
     149             :     EState     *estate;         /* executor state used for row filter */
     150             :     TupleTableSlot *new_slot;   /* slot for storing new tuple */
     151             :     TupleTableSlot *old_slot;   /* slot for storing old tuple */
     152             : 
     153             :     /*
     154             :      * OID of the relation to publish changes as.  For a partition, this may
     155             :      * be set to one of its ancestors whose schema will be used when
     156             :      * replicating changes, if publish_via_partition_root is set for the
     157             :      * publication.
     158             :      */
     159             :     Oid         publish_as_relid;
     160             : 
     161             :     /*
      162             :      * Map used when replicating via an ancestor's schema to convert tuples
      163             :      * from the partition's type to the ancestor's; NULL if publish_as_relid
      164             :      * is the same as 'relid', or if it is unnecessary because the partition
      165             :      * and the ancestor have identical TupleDescs.
     166             :      */
     167             :     AttrMap    *attrmap;
     168             : 
     169             :     /*
     170             :      * Columns included in the publication, or NULL if all columns are
     171             :      * included implicitly.  Note that the attnums in this bitmap are not
     172             :      * shifted by FirstLowInvalidHeapAttributeNumber.
     173             :      */
     174             :     Bitmapset  *columns;
     175             : 
     176             :     /*
     177             :      * Private context to store additional data for this entry - state for the
     178             :      * row filter expressions, column list, etc.
     179             :      */
     180             :     MemoryContext entry_cxt;
     181             : } RelationSyncEntry;
     182             : 
     183             : /*
     184             :  * Maintain a per-transaction level variable to track whether the transaction
     185             :  * has sent BEGIN. BEGIN is only sent when the first change in a transaction
     186             :  * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
     187             :  * messages for empty transactions which saves network bandwidth.
     188             :  *
      189             :  * This optimization is not used for prepared transactions, because if the
      190             :  * WALSender restarts after the prepare of a transaction and before its
      191             :  * commit prepared, we won't be able to figure out whether we skipped sending
      192             :  * BEGIN/PREPARE because the transaction was empty, having lost the in-memory
      193             :  * txndata information that was present prior to the restart. This would
      194             :  * result in sending a spurious COMMIT PREPARED without a corresponding
      195             :  * prepared transaction at the downstream, which would lead to an error when
      196             :  * it tries to process it.
     197             :  *
     198             :  * XXX We could achieve this optimization by changing protocol to send
     199             :  * additional information so that downstream can detect that the corresponding
     200             :  * prepare has not been sent. However, adding such a check for every
     201             :  * transaction in the downstream could be costly so we might want to do it
     202             :  * optionally.
     203             :  *
     204             :  * We also don't have this optimization for streamed transactions because
     205             :  * they can contain prepared transactions.
     206             :  */
     207             : typedef struct PGOutputTxnData
     208             : {
     209             :     bool        sent_begin_txn; /* flag indicating whether BEGIN has been sent */
     210             : } PGOutputTxnData;
     211             : 
     212             : /* Map used to remember which relation schemas we sent. */
     213             : static HTAB *RelationSyncCache = NULL;
     214             : 
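For context (an illustration, not part of the listed source), entries of this map are obtained with the standard dynahash API, keyed by relation OID; the file's real lookup lives in get_rel_sync_entry(), declared just below. A hedged sketch, assuming hash_search() from utils/hsearch.h:

/* Hedged illustration only: fetch-or-create a cache entry keyed by relid. */
static RelationSyncEntry *
rel_sync_cache_lookup_sketch(Oid relid)
{
    RelationSyncEntry *entry;
    bool        found;

    Assert(RelationSyncCache != NULL);

    entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
                                              HASH_ENTER, &found);
    if (!found)
    {
        /* Freshly created entry: force (re)validation before first use. */
        entry->replicate_valid = false;
        entry->schema_sent = false;
        entry->streamed_txns = NIL;
    }

    return entry;
}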
     215             : static void init_rel_sync_cache(MemoryContext cachectx);
     216             : static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
     217             : static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
     218             :                                              Relation relation);
     219             : static void send_relation_and_attrs(Relation relation, TransactionId xid,
     220             :                                     LogicalDecodingContext *ctx,
     221             :                                     RelationSyncEntry *relentry);
     222             : static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
     223             : static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
     224             :                                           uint32 hashvalue);
     225             : static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
     226             :                                             TransactionId xid);
     227             : static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
     228             :                                             TransactionId xid);
     229             : static void init_tuple_slot(PGOutputData *data, Relation relation,
     230             :                             RelationSyncEntry *entry);
     231             : 
     232             : /* row filter routines */
     233             : static EState *create_estate_for_relation(Relation rel);
     234             : static void pgoutput_row_filter_init(PGOutputData *data,
     235             :                                      List *publications,
     236             :                                      RelationSyncEntry *entry);
     237             : static bool pgoutput_row_filter_exec_expr(ExprState *state,
     238             :                                           ExprContext *econtext);
     239             : static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
     240             :                                 TupleTableSlot **new_slot_ptr,
     241             :                                 RelationSyncEntry *entry,
     242             :                                 ReorderBufferChangeType *action);
     243             : 
     244             : /* column list routines */
     245             : static void pgoutput_column_list_init(PGOutputData *data,
     246             :                                       List *publications,
     247             :                                       RelationSyncEntry *entry);
     248             : 
     249             : /*
     250             :  * Specify output plugin callbacks
     251             :  */
     252             : void
     253        1304 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
     254             : {
     255        1304 :     cb->startup_cb = pgoutput_startup;
     256        1304 :     cb->begin_cb = pgoutput_begin_txn;
     257        1304 :     cb->change_cb = pgoutput_change;
     258        1304 :     cb->truncate_cb = pgoutput_truncate;
     259        1304 :     cb->message_cb = pgoutput_message;
     260        1304 :     cb->commit_cb = pgoutput_commit_txn;
     261             : 
     262        1304 :     cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
     263        1304 :     cb->prepare_cb = pgoutput_prepare_txn;
     264        1304 :     cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
     265        1304 :     cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
     266        1304 :     cb->filter_by_origin_cb = pgoutput_origin_filter;
     267        1304 :     cb->shutdown_cb = pgoutput_shutdown;
     268             : 
     269             :     /* transaction streaming */
     270        1304 :     cb->stream_start_cb = pgoutput_stream_start;
     271        1304 :     cb->stream_stop_cb = pgoutput_stream_stop;
     272        1304 :     cb->stream_abort_cb = pgoutput_stream_abort;
     273        1304 :     cb->stream_commit_cb = pgoutput_stream_commit;
     274        1304 :     cb->stream_change_cb = pgoutput_change;
     275        1304 :     cb->stream_message_cb = pgoutput_message;
     276        1304 :     cb->stream_truncate_cb = pgoutput_truncate;
     277             :     /* transaction streaming - two-phase commit */
     278        1304 :     cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
     279        1304 : }
     280             : 
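For comparison (a sketch, not pgoutput code), only begin_cb, change_cb and commit_cb are mandatory for an output plugin; everything else pgoutput registers above (streaming, two-phase, origin filtering, messages) is optional. A hedged minimal skeleton, assuming the standard replication/output_plugin.h API and hypothetical callback names:

/* Hypothetical minimal output plugin skeleton (illustration only). */
#include "postgres.h"
#include "replication/logical.h"
#include "replication/output_plugin.h"

PG_MODULE_MAGIC;

static void my_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) {}
static void my_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                      Relation relation, ReorderBufferChange *change) {}
static void my_commit(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                      XLogRecPtr commit_lsn) {}

void
_PG_output_plugin_init(OutputPluginCallbacks *cb)
{
    /* begin_cb, change_cb and commit_cb are the required callbacks. */
    cb->begin_cb = my_begin;
    cb->change_cb = my_change;
    cb->commit_cb = my_commit;
}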
     281             : static void
     282         702 : parse_output_parameters(List *options, PGOutputData *data)
     283             : {
     284             :     ListCell   *lc;
     285         702 :     bool        protocol_version_given = false;
     286         702 :     bool        publication_names_given = false;
     287         702 :     bool        binary_option_given = false;
     288         702 :     bool        messages_option_given = false;
     289         702 :     bool        streaming_given = false;
     290         702 :     bool        two_phase_option_given = false;
     291         702 :     bool        origin_option_given = false;
     292             : 
     293         702 :     data->binary = false;
     294         702 :     data->streaming = LOGICALREP_STREAM_OFF;
     295         702 :     data->messages = false;
     296         702 :     data->two_phase = false;
     297             : 
     298        3508 :     foreach(lc, options)
     299             :     {
     300        2806 :         DefElem    *defel = (DefElem *) lfirst(lc);
     301             : 
     302             :         Assert(defel->arg == NULL || IsA(defel->arg, String));
     303             : 
     304             :         /* Check each param, whether or not we recognize it */
     305        2806 :         if (strcmp(defel->defname, "proto_version") == 0)
     306             :         {
     307             :             unsigned long parsed;
     308             :             char       *endptr;
     309             : 
     310         702 :             if (protocol_version_given)
     311           0 :                 ereport(ERROR,
     312             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     313             :                          errmsg("conflicting or redundant options")));
     314         702 :             protocol_version_given = true;
     315             : 
     316         702 :             errno = 0;
     317         702 :             parsed = strtoul(strVal(defel->arg), &endptr, 10);
     318         702 :             if (errno != 0 || *endptr != '\0')
     319           0 :                 ereport(ERROR,
     320             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     321             :                          errmsg("invalid proto_version")));
     322             : 
     323         702 :             if (parsed > PG_UINT32_MAX)
     324           0 :                 ereport(ERROR,
     325             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     326             :                          errmsg("proto_version \"%s\" out of range",
     327             :                                 strVal(defel->arg))));
     328             : 
     329         702 :             data->protocol_version = (uint32) parsed;
     330             :         }
     331        2104 :         else if (strcmp(defel->defname, "publication_names") == 0)
     332             :         {
     333         702 :             if (publication_names_given)
     334           0 :                 ereport(ERROR,
     335             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     336             :                          errmsg("conflicting or redundant options")));
     337         702 :             publication_names_given = true;
     338             : 
     339         702 :             if (!SplitIdentifierString(strVal(defel->arg), ',',
     340             :                                        &data->publication_names))
     341           0 :                 ereport(ERROR,
     342             :                         (errcode(ERRCODE_INVALID_NAME),
     343             :                          errmsg("invalid publication_names syntax")));
     344             :         }
     345        1402 :         else if (strcmp(defel->defname, "binary") == 0)
     346             :         {
     347          22 :             if (binary_option_given)
     348           0 :                 ereport(ERROR,
     349             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     350             :                          errmsg("conflicting or redundant options")));
     351          22 :             binary_option_given = true;
     352             : 
     353          22 :             data->binary = defGetBoolean(defel);
     354             :         }
     355        1380 :         else if (strcmp(defel->defname, "messages") == 0)
     356             :         {
     357           8 :             if (messages_option_given)
     358           0 :                 ereport(ERROR,
     359             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     360             :                          errmsg("conflicting or redundant options")));
     361           8 :             messages_option_given = true;
     362             : 
     363           8 :             data->messages = defGetBoolean(defel);
     364             :         }
     365        1372 :         else if (strcmp(defel->defname, "streaming") == 0)
     366             :         {
     367         670 :             if (streaming_given)
     368           0 :                 ereport(ERROR,
     369             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     370             :                          errmsg("conflicting or redundant options")));
     371         670 :             streaming_given = true;
     372             : 
     373         670 :             data->streaming = defGetStreamingMode(defel);
     374             :         }
     375         702 :         else if (strcmp(defel->defname, "two_phase") == 0)
     376             :         {
     377          14 :             if (two_phase_option_given)
     378           0 :                 ereport(ERROR,
     379             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     380             :                          errmsg("conflicting or redundant options")));
     381          14 :             two_phase_option_given = true;
     382             : 
     383          14 :             data->two_phase = defGetBoolean(defel);
     384             :         }
     385         688 :         else if (strcmp(defel->defname, "origin") == 0)
     386             :         {
     387             :             char       *origin;
     388             : 
     389         688 :             if (origin_option_given)
     390           0 :                 ereport(ERROR,
     391             :                         errcode(ERRCODE_SYNTAX_ERROR),
     392             :                         errmsg("conflicting or redundant options"));
     393         688 :             origin_option_given = true;
     394             : 
     395         688 :             origin = defGetString(defel);
     396         688 :             if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
     397          26 :                 data->publish_no_origin = true;
     398         662 :             else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
     399         662 :                 data->publish_no_origin = false;
     400             :             else
     401           0 :                 ereport(ERROR,
     402             :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     403             :                         errmsg("unrecognized origin value: \"%s\"", origin));
     404             :         }
     405             :         else
     406           0 :             elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
     407             :     }
     408             : 
     409             :     /* Check required options */
     410         702 :     if (!protocol_version_given)
     411           0 :         ereport(ERROR,
     412             :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     413             :                 errmsg("option \"%s\" missing", "proto_version"));
     414         702 :     if (!publication_names_given)
     415           0 :         ereport(ERROR,
     416             :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     417             :                 errmsg("option \"%s\" missing", "publication_names"));
     418         702 : }
     419             : 
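For orientation, the options parsed above arrive through the walsender's START_REPLICATION command. The hedged libpq sketch below shows a client driving pgoutput this way; the slot name "mysub_slot" and publication "mypub" are hypothetical, and error handling is abbreviated:

/* Hedged sketch (not part of pgoutput.c): a libpq client starting logical
 * replication with pgoutput options. */
#include <stdio.h>
#include <libpq-fe.h>

int
main(void)
{
    /* A replication connection is required for START_REPLICATION. */
    PGconn     *conn = PQconnectdb("dbname=postgres replication=database");
    PGresult   *res;
    char       *buf;

    res = PQexec(conn,
                 "START_REPLICATION SLOT \"mysub_slot\" LOGICAL 0/0 "
                 "(proto_version '4', publication_names '\"mypub\"', "
                 "streaming 'parallel', messages 'true')");
    if (PQresultStatus(res) != PGRES_COPY_BOTH)
        fprintf(stderr, "START_REPLICATION failed: %s", PQerrorMessage(conn));
    PQclear(res);

    /* Each copy-data message carries a logical replication protocol message. */
    while (PQgetCopyData(conn, &buf, 0) > 0)
        PQfreemem(buf);

    PQfinish(conn);
    return 0;
}

A subscription's apply worker sends essentially the same option list internally, which is what lands in parse_output_parameters() above.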
     420             : /*
     421             :  * Initialize this plugin
     422             :  */
     423             : static void
     424        1304 : pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
     425             :                  bool is_init)
     426             : {
     427        1304 :     PGOutputData *data = palloc0(sizeof(PGOutputData));
     428             :     static bool publication_callback_registered = false;
     429             : 
     430             :     /* Create our memory context for private allocations. */
     431        1304 :     data->context = AllocSetContextCreate(ctx->context,
     432             :                                           "logical replication output context",
     433             :                                           ALLOCSET_DEFAULT_SIZES);
     434             : 
     435        1304 :     data->cachectx = AllocSetContextCreate(ctx->context,
     436             :                                            "logical replication cache context",
     437             :                                            ALLOCSET_DEFAULT_SIZES);
     438             : 
     439        1304 :     data->pubctx = AllocSetContextCreate(ctx->context,
     440             :                                          "logical replication publication list context",
     441             :                                          ALLOCSET_SMALL_SIZES);
     442             : 
     443        1304 :     ctx->output_plugin_private = data;
     444             : 
     445             :     /* This plugin uses binary protocol. */
     446        1304 :     opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
     447             : 
     448             :     /*
     449             :      * This is replication start and not slot initialization.
     450             :      *
     451             :      * Parse and validate options passed by the client.
     452             :      */
     453        1304 :     if (!is_init)
     454             :     {
     455             :         /* Parse the params and ERROR if we see any we don't recognize */
     456         702 :         parse_output_parameters(ctx->output_plugin_options, data);
     457             : 
     458             :         /* Check if we support requested protocol */
     459         702 :         if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
     460           0 :             ereport(ERROR,
     461             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     462             :                      errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
     463             :                             data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
     464             : 
     465         702 :         if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
     466           0 :             ereport(ERROR,
     467             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     468             :                      errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
     469             :                             data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
     470             : 
     471             :         /*
     472             :          * Decide whether to enable streaming. It is disabled by default, in
     473             :          * which case we just update the flag in decoding context. Otherwise
     474             :          * we only allow it with sufficient version of the protocol, and when
     475             :          * the output plugin supports it.
     476             :          */
     477         702 :         if (data->streaming == LOGICALREP_STREAM_OFF)
     478          32 :             ctx->streaming = false;
     479         670 :         else if (data->streaming == LOGICALREP_STREAM_ON &&
     480          54 :                  data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
     481           0 :             ereport(ERROR,
     482             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     483             :                      errmsg("requested proto_version=%d does not support streaming, need %d or higher",
     484             :                             data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
     485         670 :         else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
     486         616 :                  data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
     487           0 :             ereport(ERROR,
     488             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     489             :                      errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
     490             :                             data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
     491         670 :         else if (!ctx->streaming)
     492           0 :             ereport(ERROR,
     493             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     494             :                      errmsg("streaming requested, but not supported by output plugin")));
     495             : 
     496             :         /*
     497             :          * Here, we just check whether the two-phase option is passed by
     498             :          * plugin and decide whether to enable it at later point of time. It
     499             :          * remains enabled if the previous start-up has done so. But we only
     500             :          * allow the option to be passed in with sufficient version of the
     501             :          * protocol, and when the output plugin supports it.
     502             :          */
     503         702 :         if (!data->two_phase)
     504         688 :             ctx->twophase_opt_given = false;
     505          14 :         else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
     506           0 :             ereport(ERROR,
     507             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     508             :                      errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
     509             :                             data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
     510          14 :         else if (!ctx->twophase)
     511           0 :             ereport(ERROR,
     512             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     513             :                      errmsg("two-phase commit requested, but not supported by output plugin")));
     514             :         else
     515          14 :             ctx->twophase_opt_given = true;
     516             : 
     517             :         /* Init publication state. */
     518         702 :         data->publications = NIL;
     519         702 :         publications_valid = false;
     520             : 
     521             :         /*
     522             :          * Register callback for pg_publication if we didn't already do that
     523             :          * during some previous call in this process.
     524             :          */
     525         702 :         if (!publication_callback_registered)
     526             :         {
     527         700 :             CacheRegisterSyscacheCallback(PUBLICATIONOID,
     528             :                                           publication_invalidation_cb,
     529             :                                           (Datum) 0);
     530         700 :             publication_callback_registered = true;
     531             :         }
     532             : 
     533             :         /* Initialize relation schema cache. */
     534         702 :         init_rel_sync_cache(CacheMemoryContext);
     535             :     }
     536             :     else
     537             :     {
     538             :         /*
     539             :          * Disable the streaming and prepared transactions during the slot
     540             :          * initialization mode.
     541             :          */
     542         602 :         ctx->streaming = false;
     543         602 :         ctx->twophase = false;
     544             :     }
     545        1304 : }
     546             : 
     547             : /*
     548             :  * BEGIN callback.
     549             :  *
      550             :  * Don't send the BEGIN message here; instead, postpone it until the first
      551             :  * change. In logical replication, a common scenario is to replicate a set of
      552             :  * tables (instead of all tables), and transactions whose changes touch only
      553             :  * unpublished tables produce empty transactions. Such empty transactions
      554             :  * would send BEGIN and COMMIT messages to subscribers, using bandwidth on
      555             :  * something of little or no use for logical replication.
     556             :  */
     557             : static void
     558        1430 : pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     559             : {
     560        1430 :     PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
     561             :                                                       sizeof(PGOutputTxnData));
     562             : 
     563        1430 :     txn->output_plugin_private = txndata;
     564        1430 : }
     565             : 
     566             : /*
     567             :  * Send BEGIN.
     568             :  *
     569             :  * This is called while processing the first change of the transaction.
     570             :  */
     571             : static void
     572         820 : pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     573             : {
     574         820 :     bool        send_replication_origin = txn->origin_id != InvalidRepOriginId;
     575         820 :     PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
     576             : 
     577             :     Assert(txndata);
     578             :     Assert(!txndata->sent_begin_txn);
     579             : 
     580         820 :     OutputPluginPrepareWrite(ctx, !send_replication_origin);
     581         820 :     logicalrep_write_begin(ctx->out, txn);
     582         820 :     txndata->sent_begin_txn = true;
     583             : 
     584         820 :     send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
     585             :                      send_replication_origin);
     586             : 
     587         820 :     OutputPluginWrite(ctx, true);
     588         818 : }
     589             : 
     590             : /*
     591             :  * COMMIT callback
     592             :  */
     593             : static void
     594        1422 : pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     595             :                     XLogRecPtr commit_lsn)
     596             : {
     597        1422 :     PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
     598             :     bool        sent_begin_txn;
     599             : 
     600             :     Assert(txndata);
     601             : 
     602             :     /*
     603             :      * We don't need to send the commit message unless some relevant change
     604             :      * from this transaction has been sent to the downstream.
     605             :      */
     606        1422 :     sent_begin_txn = txndata->sent_begin_txn;
     607        1422 :     OutputPluginUpdateProgress(ctx, !sent_begin_txn);
     608        1422 :     pfree(txndata);
     609        1422 :     txn->output_plugin_private = NULL;
     610             : 
     611        1422 :     if (!sent_begin_txn)
     612             :     {
     613         604 :         elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
     614         604 :         return;
     615             :     }
     616             : 
     617         818 :     OutputPluginPrepareWrite(ctx, true);
     618         818 :     logicalrep_write_commit(ctx->out, txn, commit_lsn);
     619         818 :     OutputPluginWrite(ctx, true);
     620             : }
     621             : 
     622             : /*
     623             :  * BEGIN PREPARE callback
     624             :  */
     625             : static void
     626          40 : pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     627             : {
     628          40 :     bool        send_replication_origin = txn->origin_id != InvalidRepOriginId;
     629             : 
     630          40 :     OutputPluginPrepareWrite(ctx, !send_replication_origin);
     631          40 :     logicalrep_write_begin_prepare(ctx->out, txn);
     632             : 
     633          40 :     send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
     634             :                      send_replication_origin);
     635             : 
     636          40 :     OutputPluginWrite(ctx, true);
     637          40 : }
     638             : 
     639             : /*
     640             :  * PREPARE callback
     641             :  */
     642             : static void
     643          40 : pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     644             :                      XLogRecPtr prepare_lsn)
     645             : {
     646          40 :     OutputPluginUpdateProgress(ctx, false);
     647             : 
     648          40 :     OutputPluginPrepareWrite(ctx, true);
     649          40 :     logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
     650          40 :     OutputPluginWrite(ctx, true);
     651          40 : }
     652             : 
     653             : /*
     654             :  * COMMIT PREPARED callback
     655             :  */
     656             : static void
     657          48 : pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     658             :                              XLogRecPtr commit_lsn)
     659             : {
     660          48 :     OutputPluginUpdateProgress(ctx, false);
     661             : 
     662          48 :     OutputPluginPrepareWrite(ctx, true);
     663          48 :     logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
     664          48 :     OutputPluginWrite(ctx, true);
     665          48 : }
     666             : 
     667             : /*
     668             :  * ROLLBACK PREPARED callback
     669             :  */
     670             : static void
     671          18 : pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
     672             :                                ReorderBufferTXN *txn,
     673             :                                XLogRecPtr prepare_end_lsn,
     674             :                                TimestampTz prepare_time)
     675             : {
     676          18 :     OutputPluginUpdateProgress(ctx, false);
     677             : 
     678          18 :     OutputPluginPrepareWrite(ctx, true);
     679          18 :     logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
     680             :                                        prepare_time);
     681          18 :     OutputPluginWrite(ctx, true);
     682          18 : }
     683             : 
     684             : /*
     685             :  * Write the current schema of the relation and its ancestor (if any) if not
     686             :  * done yet.
     687             :  */
     688             : static void
     689      364450 : maybe_send_schema(LogicalDecodingContext *ctx,
     690             :                   ReorderBufferChange *change,
     691             :                   Relation relation, RelationSyncEntry *relentry)
     692             : {
     693      364450 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
     694             :     bool        schema_sent;
     695      364450 :     TransactionId xid = InvalidTransactionId;
     696      364450 :     TransactionId topxid = InvalidTransactionId;
     697             : 
     698             :     /*
     699             :      * Remember XID of the (sub)transaction for the change. We don't care if
     700             :      * it's top-level transaction or not (we have already sent that XID in
     701             :      * start of the current streaming block).
     702             :      *
     703             :      * If we're not in a streaming block, just use InvalidTransactionId and
     704             :      * the write methods will not include it.
     705             :      */
     706      364450 :     if (data->in_streaming)
     707      351882 :         xid = change->txn->xid;
     708             : 
     709      364450 :     if (rbtxn_is_subtxn(change->txn))
     710       20338 :         topxid = rbtxn_get_toptxn(change->txn)->xid;
     711             :     else
     712      344112 :         topxid = xid;
     713             : 
     714             :     /*
     715             :      * Do we need to send the schema? We do track streamed transactions
     716             :      * separately, because those may be applied later (and the regular
     717             :      * transactions won't see their effects until then) and in an order that
     718             :      * we don't know at this point.
     719             :      *
     720             :      * XXX There is a scope of optimization here. Currently, we always send
     721             :      * the schema first time in a streaming transaction but we can probably
     722             :      * avoid that by checking 'relentry->schema_sent' flag. However, before
     723             :      * doing that we need to study its impact on the case where we have a mix
     724             :      * of streaming and non-streaming transactions.
     725             :      */
     726      364450 :     if (data->in_streaming)
     727      351882 :         schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
     728             :     else
     729       12568 :         schema_sent = relentry->schema_sent;
     730             : 
     731             :     /* Nothing to do if we already sent the schema. */
     732      364450 :     if (schema_sent)
     733      363836 :         return;
     734             : 
     735             :     /*
     736             :      * Send the schema.  If the changes will be published using an ancestor's
     737             :      * schema, not the relation's own, send that ancestor's schema before
      738             :      * sending the relation's own (XXX - maybe sending only the former suffices?).
     739             :      */
     740         614 :     if (relentry->publish_as_relid != RelationGetRelid(relation))
     741             :     {
     742          62 :         Relation    ancestor = RelationIdGetRelation(relentry->publish_as_relid);
     743             : 
     744          62 :         send_relation_and_attrs(ancestor, xid, ctx, relentry);
     745          62 :         RelationClose(ancestor);
     746             :     }
     747             : 
     748         614 :     send_relation_and_attrs(relation, xid, ctx, relentry);
     749             : 
     750         614 :     if (data->in_streaming)
     751         142 :         set_schema_sent_in_streamed_txn(relentry, topxid);
     752             :     else
     753         472 :         relentry->schema_sent = true;
     754             : }
     755             : 
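The two xid-list helpers used above are only declared earlier in this excerpt; their definitions appear later in the file. As a hedged sketch of the bookkeeping they perform, assuming the lappend_xid/list_member_xid helpers from pg_list.h and glossing over details of the real definitions:

/* Sketch only: remember/check which streamed toplevel xids got the schema. */
static bool
get_schema_sent_in_streamed_txn_sketch(RelationSyncEntry *entry,
                                       TransactionId xid)
{
    return list_member_xid(entry->streamed_txns, xid);
}

static void
set_schema_sent_in_streamed_txn_sketch(RelationSyncEntry *entry,
                                       TransactionId xid)
{
    /* The list must outlive the decoding of this change. */
    MemoryContext oldctx = MemoryContextSwitchTo(CacheMemoryContext);

    entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
    MemoryContextSwitchTo(oldctx);
}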
     756             : /*
     757             :  * Sends a relation
     758             :  */
     759             : static void
     760         676 : send_relation_and_attrs(Relation relation, TransactionId xid,
     761             :                         LogicalDecodingContext *ctx,
     762             :                         RelationSyncEntry *relentry)
     763             : {
     764         676 :     TupleDesc   desc = RelationGetDescr(relation);
     765         676 :     Bitmapset  *columns = relentry->columns;
     766         676 :     bool        include_gencols = relentry->include_gencols;
     767             :     int         i;
     768             : 
     769             :     /*
     770             :      * Write out type info if needed.  We do that only for user-created types.
     771             :      * We use FirstGenbkiObjectId as the cutoff, so that we only consider
     772             :      * objects with hand-assigned OIDs to be "built in", not for instance any
     773             :      * function or type defined in the information_schema. This is important
     774             :      * because only hand-assigned OIDs can be expected to remain stable across
     775             :      * major versions.
     776             :      */
     777        2060 :     for (i = 0; i < desc->natts; i++)
     778             :     {
     779        1384 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     780             : 
     781        1384 :         if (!logicalrep_should_publish_column(att, columns, include_gencols))
     782         136 :             continue;
     783             : 
     784        1248 :         if (att->atttypid < FirstGenbkiObjectId)
     785        1212 :             continue;
     786             : 
     787          36 :         OutputPluginPrepareWrite(ctx, false);
     788          36 :         logicalrep_write_typ(ctx->out, xid, att->atttypid);
     789          36 :         OutputPluginWrite(ctx, false);
     790             :     }
     791             : 
     792         676 :     OutputPluginPrepareWrite(ctx, false);
     793         676 :     logicalrep_write_rel(ctx->out, xid, relation, columns, include_gencols);
     794         676 :     OutputPluginWrite(ctx, false);
     795         676 : }
     796             : 
     797             : /*
     798             :  * Executor state preparation for evaluation of row filter expressions for the
     799             :  * specified relation.
     800             :  */
     801             : static EState *
     802          32 : create_estate_for_relation(Relation rel)
     803             : {
     804             :     EState     *estate;
     805             :     RangeTblEntry *rte;
     806          32 :     List       *perminfos = NIL;
     807             : 
     808          32 :     estate = CreateExecutorState();
     809             : 
     810          32 :     rte = makeNode(RangeTblEntry);
     811          32 :     rte->rtekind = RTE_RELATION;
     812          32 :     rte->relid = RelationGetRelid(rel);
     813          32 :     rte->relkind = rel->rd_rel->relkind;
     814          32 :     rte->rellockmode = AccessShareLock;
     815             : 
     816          32 :     addRTEPermissionInfo(&perminfos, rte);
     817             : 
     818          32 :     ExecInitRangeTable(estate, list_make1(rte), perminfos);
     819             : 
     820          32 :     estate->es_output_cid = GetCurrentCommandId(false);
     821             : 
     822          32 :     return estate;
     823             : }
     824             : 
     825             : /*
     826             :  * Evaluates row filter.
     827             :  *
     828             :  * If the row filter evaluates to NULL, it is taken as false i.e. the change
     829             :  * isn't replicated.
     830             :  */
     831             : static bool
     832          72 : pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
     833             : {
     834             :     Datum       ret;
     835             :     bool        isnull;
     836             : 
     837             :     Assert(state != NULL);
     838             : 
     839          72 :     ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
     840             : 
     841          72 :     elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
     842             :          isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
     843             :          isnull ? "true" : "false");
     844             : 
     845          72 :     if (isnull)
     846           2 :         return false;
     847             : 
     848          70 :     return DatumGetBool(ret);
     849             : }
     850             : 
     851             : /*
     852             :  * Make sure the per-entry memory context exists.
     853             :  */
     854             : static void
     855         572 : pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
     856             : {
     857             :     Relation    relation;
     858             : 
     859             :     /* The context may already exist, in which case bail out. */
     860         572 :     if (entry->entry_cxt)
     861          32 :         return;
     862             : 
     863         540 :     relation = RelationIdGetRelation(entry->publish_as_relid);
     864             : 
     865         540 :     entry->entry_cxt = AllocSetContextCreate(data->cachectx,
     866             :                                              "entry private context",
     867             :                                              ALLOCSET_SMALL_SIZES);
     868             : 
     869         540 :     MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
     870             :                                       RelationGetRelationName(relation));
     871             : }
     872             : 
     873             : /*
     874             :  * Initialize the row filter.
     875             :  */
     876             : static void
     877         540 : pgoutput_row_filter_init(PGOutputData *data, List *publications,
     878             :                          RelationSyncEntry *entry)
     879             : {
     880             :     ListCell   *lc;
     881         540 :     List       *rfnodes[] = {NIL, NIL, NIL};    /* One per pubaction */
     882         540 :     bool        no_filter[] = {false, false, false};    /* One per pubaction */
     883             :     MemoryContext oldctx;
     884             :     int         idx;
     885         540 :     bool        has_filter = true;
     886         540 :     Oid         schemaid = get_rel_namespace(entry->publish_as_relid);
     887             : 
     888             :     /*
     889             :      * Find if there are any row filters for this relation. If there are, then
     890             :      * prepare the necessary ExprState and cache it in entry->exprstate. To
     891             :      * build an expression state, we need to ensure the following:
     892             :      *
     893             :      * All the given publication-table mappings must be checked.
     894             :      *
     895             :      * Multiple publications might have multiple row filters for this
     896             :      * relation. Since row filter usage depends on the DML operation, there
     897             :      * are multiple lists (one for each operation) to which row filters will
     898             :      * be appended.
     899             :      *
      900             :      * FOR ALL TABLES and FOR TABLES IN SCHEMA imply "don't use row filter
      901             :      * expression", so they take precedence.
     902             :      */
     903         580 :     foreach(lc, publications)
     904             :     {
     905         548 :         Publication *pub = lfirst(lc);
     906         548 :         HeapTuple   rftuple = NULL;
     907         548 :         Datum       rfdatum = 0;
     908         548 :         bool        pub_no_filter = true;
     909             : 
     910             :         /*
     911             :          * If the publication is FOR ALL TABLES, or the publication includes a
     912             :          * FOR TABLES IN SCHEMA where the table belongs to the referred
     913             :          * schema, then it is treated the same as if there are no row filters
     914             :          * (even if other publications have a row filter).
     915             :          */
     916         548 :         if (!pub->alltables &&
     917         388 :             !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
     918             :                                    ObjectIdGetDatum(schemaid),
     919             :                                    ObjectIdGetDatum(pub->oid)))
     920             :         {
     921             :             /*
     922             :              * Check for the presence of a row filter in this publication.
     923             :              */
     924         376 :             rftuple = SearchSysCache2(PUBLICATIONRELMAP,
     925             :                                       ObjectIdGetDatum(entry->publish_as_relid),
     926             :                                       ObjectIdGetDatum(pub->oid));
     927             : 
     928         376 :             if (HeapTupleIsValid(rftuple))
     929             :             {
     930             :                 /* Null indicates no filter. */
     931         352 :                 rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
     932             :                                           Anum_pg_publication_rel_prqual,
     933             :                                           &pub_no_filter);
     934             :             }
     935             :         }
     936             : 
     937         548 :         if (pub_no_filter)
     938             :         {
     939         522 :             if (rftuple)
     940         326 :                 ReleaseSysCache(rftuple);
     941             : 
     942         522 :             no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
     943         522 :             no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
     944         522 :             no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
     945             : 
     946             :             /*
      947             :              * Quick exit if all the DML actions are published via this
      948             :              * publication.
     949             :              */
     950         522 :             if (no_filter[PUBACTION_INSERT] &&
     951         522 :                 no_filter[PUBACTION_UPDATE] &&
     952         508 :                 no_filter[PUBACTION_DELETE])
     953             :             {
     954         508 :                 has_filter = false;
     955         508 :                 break;
     956             :             }
     957             : 
     958             :             /* No additional work for this publication. Next one. */
     959          14 :             continue;
     960             :         }
     961             : 
     962             :         /* Form the per pubaction row filter lists. */
     963          26 :         if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
     964          26 :             rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
     965          26 :                                                 TextDatumGetCString(rfdatum));
     966          26 :         if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
     967          26 :             rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
     968          26 :                                                 TextDatumGetCString(rfdatum));
     969          26 :         if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
     970          26 :             rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
     971          26 :                                                 TextDatumGetCString(rfdatum));
     972             : 
     973          26 :         ReleaseSysCache(rftuple);
     974             :     }                           /* loop all subscribed publications */
     975             : 
     976             :     /* Clean the row filter */
     977        2160 :     for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
     978             :     {
     979        1620 :         if (no_filter[idx])
     980             :         {
     981        1542 :             list_free_deep(rfnodes[idx]);
     982        1542 :             rfnodes[idx] = NIL;
     983             :         }
     984             :     }
     985             : 
     986         540 :     if (has_filter)
     987             :     {
     988          32 :         Relation    relation = RelationIdGetRelation(entry->publish_as_relid);
     989             : 
     990          32 :         pgoutput_ensure_entry_cxt(data, entry);
     991             : 
     992             :         /*
     993             :          * Now all the filters for all pubactions are known. Combine them when
     994             :          * their pubactions are the same.
     995             :          */
     996          32 :         oldctx = MemoryContextSwitchTo(entry->entry_cxt);
     997          32 :         entry->estate = create_estate_for_relation(relation);
     998         128 :         for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
     999             :         {
    1000          96 :             List       *filters = NIL;
    1001             :             Expr       *rfnode;
    1002             : 
    1003          96 :             if (rfnodes[idx] == NIL)
    1004          42 :                 continue;
    1005             : 
    1006         114 :             foreach(lc, rfnodes[idx])
    1007          60 :                 filters = lappend(filters, stringToNode((char *) lfirst(lc)));
    1008             : 
    1009             :             /* combine the row filter and cache the ExprState */
    1010          54 :             rfnode = make_orclause(filters);
    1011          54 :             entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
    1012             :         }                       /* for each pubaction */
    1013          32 :         MemoryContextSwitchTo(oldctx);
    1014             : 
    1015          32 :         RelationClose(relation);
    1016             :     }
    1017         540 : }
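
The combination rule implemented above can be summarized per pubaction: if any publication publishes the action without a row filter, filtering for that action is disabled entirely; otherwise the individual filters are OR'ed together. The following standalone sketch illustrates only that precedence logic; SketchPub, combine_filters, and the plain C types are hypothetical stand-ins, not the server's structures.

    #include <stdbool.h>
    #include <stdio.h>

    /* Hypothetical, simplified stand-ins for the per-publication state. */
    enum { ACT_INSERT, ACT_UPDATE, ACT_DELETE, NUM_ACTIONS };

    typedef struct SketchPub
    {
        bool        pubactions[NUM_ACTIONS];    /* which actions it publishes */
        const char *filter[NUM_ACTIONS];        /* row filter text, or NULL */
    } SketchPub;

    /*
     * For each action, "no filter" from any publication wins; otherwise the
     * filters of the remaining publications are combined with OR.
     */
    static void
    combine_filters(const SketchPub *pubs, int npubs)
    {
        bool        no_filter[NUM_ACTIONS] = {false, false, false};

        for (int p = 0; p < npubs; p++)
            for (int a = 0; a < NUM_ACTIONS; a++)
                if (pubs[p].pubactions[a] && pubs[p].filter[a] == NULL)
                    no_filter[a] = true;    /* unfiltered publication wins */

        for (int a = 0; a < NUM_ACTIONS; a++)
        {
            if (no_filter[a])
            {
                printf("action %d: no row filter applied\n", a);
                continue;
            }
            printf("action %d: OR of:", a);
            for (int p = 0; p < npubs; p++)
                if (pubs[p].pubactions[a] && pubs[p].filter[a])
                    printf(" (%s)", pubs[p].filter[a]);
            printf("\n");
        }
    }

    int
    main(void)
    {
        SketchPub   pubs[] = {
            {{true, true, true}, {"a > 10", "a > 10", "a > 10"}},
            {{true, false, false}, {NULL, NULL, NULL}},     /* INSERT unfiltered */
        };

        combine_filters(pubs, 2);
        return 0;
    }
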
    1018             : 
    1019             : /*
     1020             :  * If the table contains a generated column, check for any conflicting
     1021             :  * values of the 'publish_generated_columns' parameter across the publications.
    1022             :  */
    1023             : static void
    1024         540 : check_and_init_gencol(PGOutputData *data, List *publications,
    1025             :                       RelationSyncEntry *entry)
    1026             : {
    1027         540 :     Relation    relation = RelationIdGetRelation(entry->publish_as_relid);
    1028         540 :     TupleDesc   desc = RelationGetDescr(relation);
    1029         540 :     bool        gencolpresent = false;
    1030         540 :     bool        first = true;
    1031             : 
    1032             :     /* Check if there is any generated column present. */
    1033        1622 :     for (int i = 0; i < desc->natts; i++)
    1034             :     {
    1035        1094 :         Form_pg_attribute att = TupleDescAttr(desc, i);
    1036             : 
    1037        1094 :         if (att->attgenerated)
    1038             :         {
    1039          12 :             gencolpresent = true;
    1040          12 :             break;
    1041             :         }
    1042             :     }
    1043             : 
    1044             :     /* There are no generated columns to be published. */
    1045         540 :     if (!gencolpresent)
    1046             :     {
    1047         528 :         entry->include_gencols = false;
    1048         528 :         return;
    1049             :     }
    1050             : 
    1051             :     /*
     1052             :      * There may be conflicting values of the 'publish_generated_columns'
     1053             :      * parameter across the publications.
    1054             :      */
    1055          38 :     foreach_ptr(Publication, pub, publications)
    1056             :     {
    1057             :         /*
    1058             :          * The column list takes precedence over the
    1059             :          * 'publish_generated_columns' parameter. Those will be checked later,
    1060             :          * see pgoutput_column_list_init.
    1061             :          */
    1062          14 :         if (check_and_fetch_column_list(pub, entry->publish_as_relid, NULL, NULL))
    1063           6 :             continue;
    1064             : 
    1065           8 :         if (first)
    1066             :         {
    1067           8 :             entry->include_gencols = pub->pubgencols;
    1068           8 :             first = false;
    1069             :         }
    1070           0 :         else if (entry->include_gencols != pub->pubgencols)
    1071           0 :             ereport(ERROR,
    1072             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1073             :                     errmsg("cannot use different values of publish_generated_columns for table \"%s.%s\" in different publications",
    1074             :                            get_namespace_name(RelationGetNamespace(relation)),
    1075             :                            RelationGetRelationName(relation)));
    1076             :     }
    1077             : }
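
The rule above is that only publications which actually rely on 'publish_generated_columns' for this table (i.e. have no column list for it) must agree on the setting; a column list overrides the parameter. Here is a minimal sketch of that check, assuming hypothetical names (SketchPub, resolve_gencols) rather than the plugin's real structs.

    #include <stdbool.h>
    #include <stdio.h>
    #include <stdlib.h>

    /* Hypothetical, simplified publication descriptor. */
    typedef struct SketchPub
    {
        const char *name;
        bool        has_column_list;    /* a column list takes precedence */
        bool        pubgencols;         /* publish_generated_columns setting */
    } SketchPub;

    /*
     * Return the effective publish_generated_columns value, or exit with an
     * error if two publications without a column list disagree.
     */
    static bool
    resolve_gencols(const SketchPub *pubs, int npubs)
    {
        bool        include_gencols = false;
        bool        first = true;

        for (int i = 0; i < npubs; i++)
        {
            if (pubs[i].has_column_list)
                continue;           /* decided later by the column list */

            if (first)
            {
                include_gencols = pubs[i].pubgencols;
                first = false;
            }
            else if (include_gencols != pubs[i].pubgencols)
            {
                fprintf(stderr,
                        "conflicting publish_generated_columns values (\"%s\")\n",
                        pubs[i].name);
                exit(1);
            }
        }
        return include_gencols;
    }

    int
    main(void)
    {
        SketchPub   pubs[] = {
            {"pub_a", false, true},
            {"pub_b", true, false},     /* ignored: column list wins */
        };

        printf("include generated columns: %d\n", resolve_gencols(pubs, 2));
        return 0;
    }
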
    1078             : 
    1079             : /*
    1080             :  * Initialize the column list.
    1081             :  */
    1082             : static void
    1083         540 : pgoutput_column_list_init(PGOutputData *data, List *publications,
    1084             :                           RelationSyncEntry *entry)
    1085             : {
    1086             :     ListCell   *lc;
    1087         540 :     bool        first = true;
    1088         540 :     Relation    relation = RelationIdGetRelation(entry->publish_as_relid);
    1089         540 :     bool        found_pub_collist = false;
    1090         540 :     Bitmapset  *relcols = NULL;
    1091             : 
    1092         540 :     pgoutput_ensure_entry_cxt(data, entry);
    1093             : 
    1094             :     /*
    1095             :      * Find if there are any column lists for this relation. If there are,
    1096             :      * build a bitmap using the column lists.
    1097             :      *
    1098             :      * Multiple publications might have multiple column lists for this
    1099             :      * relation.
    1100             :      *
    1101             :      * Note that we don't support the case where the column list is different
    1102             :      * for the same table when combining publications. See comments atop
    1103             :      * fetch_table_list. But one can later change the publication so we still
    1104             :      * need to check all the given publication-table mappings and report an
    1105             :      * error if any publications have a different column list.
    1106             :      */
    1107        1100 :     foreach(lc, publications)
    1108             :     {
    1109         562 :         Publication *pub = lfirst(lc);
    1110         562 :         Bitmapset  *cols = NULL;
    1111             : 
    1112             :         /* Retrieve the bitmap of columns for a column list publication. */
    1113         562 :         found_pub_collist |= check_and_fetch_column_list(pub,
    1114             :                                                          entry->publish_as_relid,
    1115             :                                                          entry->entry_cxt, &cols);
    1116             : 
    1117             :         /*
     1118             :          * For non-column-list publications, e.g. TABLE (without a column
     1119             :          * list), ALL TABLES, or ALL TABLES IN SCHEMA, we consider all columns
     1120             :          * of the table (including generated columns when the
     1121             :          * 'publish_generated_columns' parameter is true).
    1122             :          */
    1123         562 :         if (!cols)
    1124             :         {
    1125             :             /*
     1126             :              * Cache the table columns for the first publication with no
     1127             :              * specified column list, to detect any publication with a
     1128             :              * different column list.
    1129             :              */
    1130         484 :             if (!relcols && (list_length(publications) > 1))
    1131             :             {
    1132          18 :                 MemoryContext oldcxt = MemoryContextSwitchTo(entry->entry_cxt);
    1133             : 
    1134          18 :                 relcols = pub_form_cols_map(relation, entry->include_gencols);
    1135          18 :                 MemoryContextSwitchTo(oldcxt);
    1136             :             }
    1137             : 
    1138         484 :             cols = relcols;
    1139             :         }
    1140             : 
    1141         562 :         if (first)
    1142             :         {
    1143         540 :             entry->columns = cols;
    1144         540 :             first = false;
    1145             :         }
    1146          22 :         else if (!bms_equal(entry->columns, cols))
    1147           2 :             ereport(ERROR,
    1148             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1149             :                     errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
    1150             :                            get_namespace_name(RelationGetNamespace(relation)),
    1151             :                            RelationGetRelationName(relation)));
    1152             :     }                           /* loop all subscribed publications */
    1153             : 
    1154             :     /*
    1155             :      * If no column list publications exist, columns to be published will be
    1156             :      * computed later according to the 'publish_generated_columns' parameter.
    1157             :      */
    1158         538 :     if (!found_pub_collist)
    1159         466 :         entry->columns = NULL;
    1160             : 
    1161         538 :     RelationClose(relation);
    1162         538 : }
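
In effect, every publication must yield the same set of columns for the table (publications without a column list contribute the full attribute set), otherwise an error is raised. A compact sketch of that equality check, using a plain bitmask in place of a Bitmapset; the types and the columns_compatible helper are assumptions for illustration only.

    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>

    /* Column lists as bitmasks over attribute numbers 0..63 (sketch only). */
    static bool
    columns_compatible(const uint64_t *collists, int npubs, uint64_t all_columns)
    {
        uint64_t    first = 0;

        for (int i = 0; i < npubs; i++)
        {
            /* 0 means "no column list": treat it as all columns of the table */
            uint64_t    cols = collists[i] ? collists[i] : all_columns;

            if (i == 0)
                first = cols;
            else if (cols != first)
                return false;       /* different column lists: not allowed */
        }
        return true;
    }

    int
    main(void)
    {
        uint64_t    lists[] = {0x07, 0x00};     /* {a,b,c} vs. no column list */

        printf("compatible: %d\n", columns_compatible(lists, 2, 0x0F));
        return 0;
    }
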
    1163             : 
    1164             : /*
    1165             :  * Initialize the slot for storing new and old tuples, and build the map that
    1166             :  * will be used to convert the relation's tuples into the ancestor's format.
    1167             :  */
    1168             : static void
    1169         540 : init_tuple_slot(PGOutputData *data, Relation relation,
    1170             :                 RelationSyncEntry *entry)
    1171             : {
    1172             :     MemoryContext oldctx;
    1173             :     TupleDesc   oldtupdesc;
    1174             :     TupleDesc   newtupdesc;
    1175             : 
    1176         540 :     oldctx = MemoryContextSwitchTo(data->cachectx);
    1177             : 
    1178             :     /*
    1179             :      * Create tuple table slots. Create a copy of the TupleDesc as it needs to
    1180             :      * live as long as the cache remains.
    1181             :      */
    1182         540 :     oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
    1183         540 :     newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
    1184             : 
    1185         540 :     entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
    1186         540 :     entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
    1187             : 
    1188         540 :     MemoryContextSwitchTo(oldctx);
    1189             : 
    1190             :     /*
    1191             :      * Cache the map that will be used to convert the relation's tuples into
    1192             :      * the ancestor's format, if needed.
    1193             :      */
    1194         540 :     if (entry->publish_as_relid != RelationGetRelid(relation))
    1195             :     {
    1196          72 :         Relation    ancestor = RelationIdGetRelation(entry->publish_as_relid);
    1197          72 :         TupleDesc   indesc = RelationGetDescr(relation);
    1198          72 :         TupleDesc   outdesc = RelationGetDescr(ancestor);
    1199             : 
    1200             :         /* Map must live as long as the logical decoding context. */
    1201          72 :         oldctx = MemoryContextSwitchTo(data->cachectx);
    1202             : 
    1203          72 :         entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
    1204             : 
    1205          72 :         MemoryContextSwitchTo(oldctx);
    1206          72 :         RelationClose(ancestor);
    1207             :     }
    1208         540 : }
    1209             : 
    1210             : /*
    1211             :  * Change is checked against the row filter if any.
    1212             :  *
    1213             :  * Returns true if the change is to be replicated, else false.
    1214             :  *
    1215             :  * For inserts, evaluate the row filter for new tuple.
    1216             :  * For deletes, evaluate the row filter for old tuple.
    1217             :  * For updates, evaluate the row filter for old and new tuple.
    1218             :  *
     1219             :  * For updates, if both evaluations are true, we send the UPDATE; if both
     1220             :  * evaluations are false, we don't replicate the UPDATE. If only one of the
     1221             :  * tuples matches the row filter expression, we transform the UPDATE into a
     1222             :  * DELETE or an INSERT to avoid any data inconsistency, based on the
     1223             :  * following rules:
    1224             :  *
    1225             :  * Case 1: old-row (no match)    new-row (no match)  -> (drop change)
    1226             :  * Case 2: old-row (no match)    new row (match)     -> INSERT
    1227             :  * Case 3: old-row (match)       new-row (no match)  -> DELETE
    1228             :  * Case 4: old-row (match)       new row (match)     -> UPDATE
    1229             :  *
    1230             :  * The new action is updated in the action parameter.
    1231             :  *
    1232             :  * The new slot could be updated when transforming the UPDATE into INSERT,
    1233             :  * because the original new tuple might not have column values from the replica
    1234             :  * identity.
    1235             :  *
    1236             :  * Examples:
    1237             :  * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
    1238             :  * Since the old tuple satisfies, the initial table synchronization copied this
    1239             :  * row (or another method was used to guarantee that there is data
    1240             :  * consistency).  However, after the UPDATE the new tuple doesn't satisfy the
    1241             :  * row filter, so from a data consistency perspective, that row should be
    1242             :  * removed on the subscriber. The UPDATE should be transformed into a DELETE
    1243             :  * statement and be sent to the subscriber. Keeping this row on the subscriber
    1244             :  * is undesirable because it doesn't reflect what was defined in the row filter
    1245             :  * expression on the publisher. This row on the subscriber would likely not be
    1246             :  * modified by replication again. If someone inserted a new row with the same
    1247             :  * old identifier, replication could stop due to a constraint violation.
    1248             :  *
    1249             :  * Let's say the old tuple doesn't match the row filter but the new tuple does.
    1250             :  * Since the old tuple doesn't satisfy, the initial table synchronization
    1251             :  * probably didn't copy this row. However, after the UPDATE the new tuple does
    1252             :  * satisfy the row filter, so from a data consistency perspective, that row
    1253             :  * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
    1254             :  * statements have no effect (it matches no row -- see
     1255             :  * statements have no effect (they match no row -- see
     1256             :  * apply_handle_update_internal()). So, the UPDATE should be transformed into an
     1257             :  * INSERT statement and be sent to the subscriber. However, this might surprise
    1258             :  * provider.
    1259             :  */
    1260             : static bool
    1261      364442 : pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
    1262             :                     TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
    1263             :                     ReorderBufferChangeType *action)
    1264             : {
    1265             :     TupleDesc   desc;
    1266             :     int         i;
    1267             :     bool        old_matched,
    1268             :                 new_matched,
    1269             :                 result;
    1270             :     TupleTableSlot *tmp_new_slot;
    1271      364442 :     TupleTableSlot *new_slot = *new_slot_ptr;
    1272             :     ExprContext *ecxt;
    1273             :     ExprState  *filter_exprstate;
    1274             : 
    1275             :     /*
    1276             :      * We need this map to avoid relying on ReorderBufferChangeType enums
    1277             :      * having specific values.
    1278             :      */
    1279             :     static const int map_changetype_pubaction[] = {
    1280             :         [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
    1281             :         [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
    1282             :         [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
    1283             :     };
    1284             : 
    1285             :     Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
    1286             :            *action == REORDER_BUFFER_CHANGE_UPDATE ||
    1287             :            *action == REORDER_BUFFER_CHANGE_DELETE);
    1288             : 
    1289             :     Assert(new_slot || old_slot);
    1290             : 
    1291             :     /* Get the corresponding row filter */
    1292      364442 :     filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
    1293             : 
    1294             :     /* Bail out if there is no row filter */
    1295      364442 :     if (!filter_exprstate)
    1296      364378 :         return true;
    1297             : 
    1298          64 :     elog(DEBUG3, "table \"%s.%s\" has row filter",
    1299             :          get_namespace_name(RelationGetNamespace(relation)),
    1300             :          RelationGetRelationName(relation));
    1301             : 
    1302          64 :     ResetPerTupleExprContext(entry->estate);
    1303             : 
    1304          64 :     ecxt = GetPerTupleExprContext(entry->estate);
    1305             : 
    1306             :     /*
     1307             :      * For the following cases, where there is only one tuple, we can
     1308             :      * evaluate the row filter for that tuple and return.
    1309             :      *
    1310             :      * For inserts, we only have the new tuple.
    1311             :      *
     1312             :      * For updates, we can have only a new tuple when none of the replica
     1313             :      * identity columns changed and none of those columns have external data,
     1314             :      * but we still need to evaluate the row filter for the new tuple because
     1315             :      * the existing values of those columns might not match the filter. Also,
     1316             :      * users can use constant expressions in the row filter, so we need to
     1317             :      * evaluate it for the new tuple anyway.
    1318             :      *
    1319             :      * For deletes, we only have the old tuple.
    1320             :      */
    1321          64 :     if (!new_slot || !old_slot)
    1322             :     {
    1323          56 :         ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
    1324          56 :         result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
    1325             : 
    1326          56 :         return result;
    1327             :     }
    1328             : 
    1329             :     /*
    1330             :      * Both the old and new tuples must be valid only for updates and need to
    1331             :      * be checked against the row filter.
    1332             :      */
    1333             :     Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
    1334             : 
    1335           8 :     slot_getallattrs(new_slot);
    1336           8 :     slot_getallattrs(old_slot);
    1337             : 
    1338           8 :     tmp_new_slot = NULL;
    1339           8 :     desc = RelationGetDescr(relation);
    1340             : 
    1341             :     /*
     1342             :      * The new tuple might not have all the replica identity columns, in which
     1343             :      * case those values need to be copied over from the old tuple.
    1344             :      */
    1345          24 :     for (i = 0; i < desc->natts; i++)
    1346             :     {
    1347          16 :         CompactAttribute *att = TupleDescCompactAttr(desc, i);
    1348             : 
    1349             :         /*
     1350             :          * If the column is null in either the new or the old tuple, do nothing.
    1351             :          */
    1352          16 :         if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
    1353           2 :             continue;
    1354             : 
    1355             :         /*
    1356             :          * Unchanged toasted replica identity columns are only logged in the
    1357             :          * old tuple. Copy this over to the new tuple. The changed (or WAL
    1358             :          * Logged) toast values are always assembled in memory and set as
    1359             :          * VARTAG_INDIRECT. See ReorderBufferToastReplace.
    1360             :          */
    1361          14 :         if (att->attlen == -1 &&
    1362           8 :             VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
    1363           2 :             !VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
    1364             :         {
    1365           2 :             if (!tmp_new_slot)
    1366             :             {
    1367           2 :                 tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
    1368           2 :                 ExecClearTuple(tmp_new_slot);
    1369             : 
    1370           2 :                 memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
    1371           2 :                        desc->natts * sizeof(Datum));
    1372           2 :                 memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
    1373           2 :                        desc->natts * sizeof(bool));
    1374             :             }
    1375             : 
    1376           2 :             tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
    1377           2 :             tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
    1378             :         }
    1379             :     }
    1380             : 
    1381           8 :     ecxt->ecxt_scantuple = old_slot;
    1382           8 :     old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
    1383             : 
    1384           8 :     if (tmp_new_slot)
    1385             :     {
    1386           2 :         ExecStoreVirtualTuple(tmp_new_slot);
    1387           2 :         ecxt->ecxt_scantuple = tmp_new_slot;
    1388             :     }
    1389             :     else
    1390           6 :         ecxt->ecxt_scantuple = new_slot;
    1391             : 
    1392           8 :     new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
    1393             : 
    1394             :     /*
     1395             :      * Case 1: if neither tuple matches the row filter, bail out. Send
     1396             :      * nothing.
    1397             :      */
    1398           8 :     if (!old_matched && !new_matched)
    1399           0 :         return false;
    1400             : 
    1401             :     /*
    1402             :      * Case 2: if the old tuple doesn't satisfy the row filter but the new
    1403             :      * tuple does, transform the UPDATE into INSERT.
    1404             :      *
    1405             :      * Use the newly transformed tuple that must contain the column values for
     1406             :      * all the replica identity columns. This is required to ensure that,
     1407             :      * while inserting the tuple in the downstream node, we have all the
     1408             :      * required column values.
    1409             :      */
    1410           8 :     if (!old_matched && new_matched)
    1411             :     {
    1412           4 :         *action = REORDER_BUFFER_CHANGE_INSERT;
    1413             : 
    1414           4 :         if (tmp_new_slot)
    1415           2 :             *new_slot_ptr = tmp_new_slot;
    1416             :     }
    1417             : 
    1418             :     /*
    1419             :      * Case 3: if the old tuple satisfies the row filter but the new tuple
    1420             :      * doesn't, transform the UPDATE into DELETE.
    1421             :      *
     1422             :      * This transformation does not require another tuple. The old tuple will
     1423             :      * be used for the DELETE.
    1424             :      */
    1425           4 :     else if (old_matched && !new_matched)
    1426           2 :         *action = REORDER_BUFFER_CHANGE_DELETE;
    1427             : 
    1428             :     /*
    1429             :      * Case 4: if both tuples match the row filter, transformation isn't
    1430             :      * required. (*action is default UPDATE).
    1431             :      */
    1432             : 
    1433           8 :     return true;
    1434             : }
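
The four cases documented above reduce to a small decision table on (old_matched, new_matched). The following standalone sketch shows just that mapping; the SketchAction enum and transform_update are hypothetical names, not the reorder buffer's API.

    #include <stdbool.h>
    #include <stdio.h>

    typedef enum
    {
        CHANGE_INSERT, CHANGE_UPDATE, CHANGE_DELETE, CHANGE_DROP
    } SketchAction;

    /* Map the row-filter results for an UPDATE to the action actually sent. */
    static SketchAction
    transform_update(bool old_matched, bool new_matched)
    {
        if (!old_matched && !new_matched)
            return CHANGE_DROP;     /* Case 1: send nothing */
        if (!old_matched && new_matched)
            return CHANGE_INSERT;   /* Case 2: row appears on the subscriber */
        if (old_matched && !new_matched)
            return CHANGE_DELETE;   /* Case 3: row disappears from the subscriber */
        return CHANGE_UPDATE;       /* Case 4: plain UPDATE */
    }

    int
    main(void)
    {
        for (int o = 0; o <= 1; o++)
            for (int n = 0; n <= 1; n++)
                printf("old=%d new=%d -> %d\n", o, n, transform_update(o, n));
        return 0;
    }
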
    1435             : 
    1436             : /*
    1437             :  * Sends the decoded DML over wire.
    1438             :  *
    1439             :  * This is called both in streaming and non-streaming modes.
    1440             :  */
    1441             : static void
    1442      366724 : pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    1443             :                 Relation relation, ReorderBufferChange *change)
    1444             : {
    1445      366724 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1446      366724 :     PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
    1447             :     MemoryContext old;
    1448             :     RelationSyncEntry *relentry;
    1449      366724 :     TransactionId xid = InvalidTransactionId;
    1450      366724 :     Relation    ancestor = NULL;
    1451      366724 :     Relation    targetrel = relation;
    1452      366724 :     ReorderBufferChangeType action = change->action;
    1453      366724 :     TupleTableSlot *old_slot = NULL;
    1454      366724 :     TupleTableSlot *new_slot = NULL;
    1455             : 
    1456      366724 :     if (!is_publishable_relation(relation))
    1457        2276 :         return;
    1458             : 
    1459             :     /*
     1460             :      * Remember the xid for the change in streaming mode. We need to send the
     1461             :      * xid with each change in streaming mode so that the subscriber can
     1462             :      * associate the changes with the transaction and, on abort, discard the
     1463             :      * corresponding changes.
    1464             :      */
    1465      366724 :     if (data->in_streaming)
    1466      351882 :         xid = change->txn->xid;
    1467             : 
    1468      366724 :     relentry = get_rel_sync_entry(data, relation);
    1469             : 
    1470             :     /* First check the table filter */
    1471      366718 :     switch (action)
    1472             :     {
    1473      211900 :         case REORDER_BUFFER_CHANGE_INSERT:
    1474      211900 :             if (!relentry->pubactions.pubinsert)
    1475         106 :                 return;
    1476      211794 :             break;
    1477       68972 :         case REORDER_BUFFER_CHANGE_UPDATE:
    1478       68972 :             if (!relentry->pubactions.pubupdate)
    1479          88 :                 return;
    1480       68884 :             break;
    1481       85846 :         case REORDER_BUFFER_CHANGE_DELETE:
    1482       85846 :             if (!relentry->pubactions.pubdelete)
    1483        2082 :                 return;
    1484             : 
    1485             :             /*
     1486             :              * This is only possible if deletes are allowed even when replica
     1487             :              * identity is not defined for the table. Since the DELETE action
     1488             :              * can't be published without the old tuple, simply return.
    1489             :              */
    1490       83764 :             if (!change->data.tp.oldtuple)
    1491             :             {
    1492           0 :                 elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
    1493           0 :                 return;
    1494             :             }
    1495       83764 :             break;
    1496      364442 :         default:
    1497             :             Assert(false);
    1498             :     }
    1499             : 
    1500             :     /* Avoid leaking memory by using and resetting our own context */
    1501      364442 :     old = MemoryContextSwitchTo(data->context);
    1502             : 
    1503             :     /* Switch relation if publishing via root. */
    1504      364442 :     if (relentry->publish_as_relid != RelationGetRelid(relation))
    1505             :     {
    1506             :         Assert(relation->rd_rel->relispartition);
    1507         138 :         ancestor = RelationIdGetRelation(relentry->publish_as_relid);
    1508         138 :         targetrel = ancestor;
    1509             :     }
    1510             : 
    1511      364442 :     if (change->data.tp.oldtuple)
    1512             :     {
    1513       84024 :         old_slot = relentry->old_slot;
    1514       84024 :         ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false);
    1515             : 
    1516             :         /* Convert tuple if needed. */
    1517       84024 :         if (relentry->attrmap)
    1518             :         {
    1519          10 :             TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
    1520             :                                                       &TTSOpsVirtual);
    1521             : 
    1522          10 :             old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
    1523             :         }
    1524             :     }
    1525             : 
    1526      364442 :     if (change->data.tp.newtuple)
    1527             :     {
    1528      280678 :         new_slot = relentry->new_slot;
    1529      280678 :         ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false);
    1530             : 
    1531             :         /* Convert tuple if needed. */
    1532      280678 :         if (relentry->attrmap)
    1533             :         {
    1534          38 :             TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
    1535             :                                                       &TTSOpsVirtual);
    1536             : 
    1537          38 :             new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
    1538             :         }
    1539             :     }
    1540             : 
    1541             :     /*
    1542             :      * Check row filter.
    1543             :      *
    1544             :      * Updates could be transformed to inserts or deletes based on the results
    1545             :      * of the row filter for old and new tuple.
    1546             :      */
    1547      364442 :     if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
    1548          22 :         goto cleanup;
    1549             : 
    1550             :     /*
    1551             :      * Send BEGIN if we haven't yet.
    1552             :      *
    1553             :      * We send the BEGIN message after ensuring that we will actually send the
    1554             :      * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
    1555             :      * transactions.
    1556             :      */
    1557      364420 :     if (txndata && !txndata->sent_begin_txn)
    1558         796 :         pgoutput_send_begin(ctx, txn);
    1559             : 
    1560             :     /*
    1561             :      * Schema should be sent using the original relation because it also sends
    1562             :      * the ancestor's relation.
    1563             :      */
    1564      364418 :     maybe_send_schema(ctx, change, relation, relentry);
    1565             : 
    1566      364418 :     OutputPluginPrepareWrite(ctx, true);
    1567             : 
    1568             :     /* Send the data */
    1569      364418 :     switch (action)
    1570             :     {
    1571      211774 :         case REORDER_BUFFER_CHANGE_INSERT:
    1572      211774 :             logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
    1573      211774 :                                     data->binary, relentry->columns,
    1574      211774 :                                     relentry->include_gencols);
    1575      211774 :             break;
    1576       68878 :         case REORDER_BUFFER_CHANGE_UPDATE:
    1577       68878 :             logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
    1578       68878 :                                     new_slot, data->binary, relentry->columns,
    1579       68878 :                                     relentry->include_gencols);
    1580       68878 :             break;
    1581       83766 :         case REORDER_BUFFER_CHANGE_DELETE:
    1582       83766 :             logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
    1583       83766 :                                     data->binary, relentry->columns,
    1584       83766 :                                     relentry->include_gencols);
    1585       83766 :             break;
    1586      364418 :         default:
    1587             :             Assert(false);
    1588             :     }
    1589             : 
    1590      364418 :     OutputPluginWrite(ctx, true);
    1591             : 
    1592      364440 : cleanup:
    1593      364440 :     if (RelationIsValid(ancestor))
    1594             :     {
    1595         136 :         RelationClose(ancestor);
    1596         136 :         ancestor = NULL;
    1597             :     }
    1598             : 
    1599             :     /* Drop the new slots that were used to store the converted tuples. */
    1600      364440 :     if (relentry->attrmap)
    1601             :     {
    1602          48 :         if (old_slot)
    1603          10 :             ExecDropSingleTupleTableSlot(old_slot);
    1604             : 
    1605          48 :         if (new_slot)
    1606          38 :             ExecDropSingleTupleTableSlot(new_slot);
    1607             :     }
    1608             : 
    1609      364440 :     MemoryContextSwitchTo(old);
    1610      364440 :     MemoryContextReset(data->context);
    1611             : }
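
Note how the BEGIN message is deferred until it is certain a change will actually be sent, so transactions whose changes are all filtered out never produce an empty BEGIN/COMMIT pair on the wire (the commit side only emits COMMIT if a BEGIN was sent). A minimal sketch of that lazy-BEGIN pattern, assuming a hypothetical SketchTxn state rather than the plugin's PGOutputTxnData:

    #include <stdbool.h>
    #include <stdio.h>

    /* Hypothetical per-transaction output state. */
    typedef struct SketchTxn
    {
        bool        sent_begin_txn;
    } SketchTxn;

    static void
    send_change_if_published(SketchTxn *txn, bool published)
    {
        if (!published)
            return;                 /* filtered out: nothing emitted at all */

        if (!txn->sent_begin_txn)
        {
            printf("BEGIN\n");      /* sent only once, before the first change */
            txn->sent_begin_txn = true;
        }
        printf("CHANGE\n");
    }

    int
    main(void)
    {
        SketchTxn   txn = {false};

        send_change_if_published(&txn, false);  /* no output */
        send_change_if_published(&txn, true);   /* BEGIN + CHANGE */
        send_change_if_published(&txn, true);   /* CHANGE only */
        return 0;
    }
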
    1612             : 
    1613             : static void
    1614          28 : pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    1615             :                   int nrelations, Relation relations[], ReorderBufferChange *change)
    1616             : {
    1617          28 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1618          28 :     PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
    1619             :     MemoryContext old;
    1620             :     RelationSyncEntry *relentry;
    1621             :     int         i;
    1622             :     int         nrelids;
    1623             :     Oid        *relids;
    1624          28 :     TransactionId xid = InvalidTransactionId;
    1625             : 
    1626             :     /* Remember the xid for the change in streaming mode. See pgoutput_change. */
    1627          28 :     if (data->in_streaming)
    1628           0 :         xid = change->txn->xid;
    1629             : 
    1630          28 :     old = MemoryContextSwitchTo(data->context);
    1631             : 
    1632          28 :     relids = palloc0(nrelations * sizeof(Oid));
    1633          28 :     nrelids = 0;
    1634             : 
    1635          94 :     for (i = 0; i < nrelations; i++)
    1636             :     {
    1637          66 :         Relation    relation = relations[i];
    1638          66 :         Oid         relid = RelationGetRelid(relation);
    1639             : 
    1640          66 :         if (!is_publishable_relation(relation))
    1641           0 :             continue;
    1642             : 
    1643          66 :         relentry = get_rel_sync_entry(data, relation);
    1644             : 
    1645          66 :         if (!relentry->pubactions.pubtruncate)
    1646          28 :             continue;
    1647             : 
    1648             :         /*
    1649             :          * Don't send partitions if the publication wants to send only the
    1650             :          * root tables through it.
    1651             :          */
    1652          38 :         if (relation->rd_rel->relispartition &&
    1653          30 :             relentry->publish_as_relid != relid)
    1654           6 :             continue;
    1655             : 
    1656          32 :         relids[nrelids++] = relid;
    1657             : 
    1658             :         /* Send BEGIN if we haven't yet */
    1659          32 :         if (txndata && !txndata->sent_begin_txn)
    1660          20 :             pgoutput_send_begin(ctx, txn);
    1661             : 
    1662          32 :         maybe_send_schema(ctx, change, relation, relentry);
    1663             :     }
    1664             : 
    1665          28 :     if (nrelids > 0)
    1666             :     {
    1667          20 :         OutputPluginPrepareWrite(ctx, true);
    1668          20 :         logicalrep_write_truncate(ctx->out,
    1669             :                                   xid,
    1670             :                                   nrelids,
    1671             :                                   relids,
    1672          20 :                                   change->data.truncate.cascade,
    1673          20 :                                   change->data.truncate.restart_seqs);
    1674          20 :         OutputPluginWrite(ctx, true);
    1675             :     }
    1676             : 
    1677          28 :     MemoryContextSwitchTo(old);
    1678          28 :     MemoryContextReset(data->context);
    1679          28 : }
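
When a partition is published via its root partitioned table, the TRUNCATE message carries only the root's relid and the individual partitions are skipped. A small sketch of that filtering step; SketchRel and collect_truncate_relids are illustrative stand-ins, not the catalog structs.

    #include <stdbool.h>
    #include <stdio.h>

    /* Hypothetical relation descriptor for the sketch. */
    typedef struct SketchRel
    {
        unsigned    relid;
        bool        is_partition;
        unsigned    publish_as_relid;   /* root's relid when publishing via root */
    } SketchRel;

    /* Collect the relids that should appear in the TRUNCATE message. */
    static int
    collect_truncate_relids(const SketchRel *rels, int nrels, unsigned *out)
    {
        int         n = 0;

        for (int i = 0; i < nrels; i++)
        {
            /* Skip partitions that are published via their root table. */
            if (rels[i].is_partition && rels[i].publish_as_relid != rels[i].relid)
                continue;
            out[n++] = rels[i].relid;
        }
        return n;
    }

    int
    main(void)
    {
        SketchRel   rels[] = {
            {1000, false, 1000},    /* root table */
            {1001, true, 1000},     /* partition published via root: skipped */
        };
        unsigned    relids[2];
        int         n = collect_truncate_relids(rels, 2, relids);

        for (int i = 0; i < n; i++)
            printf("truncate relid %u\n", relids[i]);
        return 0;
    }
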
    1680             : 
    1681             : static void
    1682          14 : pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    1683             :                  XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
    1684             :                  const char *message)
    1685             : {
    1686          14 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1687          14 :     TransactionId xid = InvalidTransactionId;
    1688             : 
    1689          14 :     if (!data->messages)
    1690           4 :         return;
    1691             : 
    1692             :     /*
    1693             :      * Remember the xid for the message in streaming mode. See
    1694             :      * pgoutput_change.
    1695             :      */
    1696          10 :     if (data->in_streaming)
    1697           0 :         xid = txn->xid;
    1698             : 
    1699             :     /*
    1700             :      * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
    1701             :      */
    1702          10 :     if (transactional)
    1703             :     {
    1704           4 :         PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
    1705             : 
    1706             :         /* Send BEGIN if we haven't yet */
    1707           4 :         if (txndata && !txndata->sent_begin_txn)
    1708           4 :             pgoutput_send_begin(ctx, txn);
    1709             :     }
    1710             : 
    1711          10 :     OutputPluginPrepareWrite(ctx, true);
    1712          10 :     logicalrep_write_message(ctx->out,
    1713             :                              xid,
    1714             :                              message_lsn,
    1715             :                              transactional,
    1716             :                              prefix,
    1717             :                              sz,
    1718             :                              message);
    1719          10 :     OutputPluginWrite(ctx, true);
    1720             : }
    1721             : 
    1722             : /*
     1723             :  * Return true if the data is associated with an origin and the user has
     1724             :  * requested only the changes that don't have an origin, false otherwise.
    1725             :  */
    1726             : static bool
    1727      982186 : pgoutput_origin_filter(LogicalDecodingContext *ctx,
    1728             :                        RepOriginId origin_id)
    1729             : {
    1730      982186 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1731             : 
    1732      982186 :     if (data->publish_no_origin && origin_id != InvalidRepOriginId)
    1733         140 :         return true;
    1734             : 
    1735      982046 :     return false;
    1736             : }
    1737             : 
    1738             : /*
     1739             :  * Shut down the output plugin.
    1740             :  *
    1741             :  * Note, we don't need to clean the data->context, data->cachectx, and
    1742             :  * data->pubctx as they are child contexts of the ctx->context so they
    1743             :  * will be cleaned up by logical decoding machinery.
    1744             :  */
    1745             : static void
    1746         966 : pgoutput_shutdown(LogicalDecodingContext *ctx)
    1747             : {
    1748         966 :     if (RelationSyncCache)
    1749             :     {
    1750         364 :         hash_destroy(RelationSyncCache);
    1751         364 :         RelationSyncCache = NULL;
    1752             :     }
    1753         966 : }
    1754             : 
    1755             : /*
    1756             :  * Load publications from the list of publication names.
    1757             :  */
    1758             : static List *
    1759         316 : LoadPublications(List *pubnames)
    1760             : {
    1761         316 :     List       *result = NIL;
    1762             :     ListCell   *lc;
    1763             : 
    1764         714 :     foreach(lc, pubnames)
    1765             :     {
    1766         402 :         char       *pubname = (char *) lfirst(lc);
    1767         402 :         Publication *pub = GetPublicationByName(pubname, false);
    1768             : 
    1769         398 :         result = lappend(result, pub);
    1770             :     }
    1771             : 
    1772         312 :     return result;
    1773             : }
    1774             : 
    1775             : /*
    1776             :  * Publication syscache invalidation callback.
    1777             :  *
    1778             :  * Called for invalidations on pg_publication.
    1779             :  */
    1780             : static void
    1781         458 : publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
    1782             : {
    1783         458 :     publications_valid = false;
    1784             : 
    1785             :     /*
    1786             :      * Also invalidate per-relation cache so that next time the filtering info
    1787             :      * is checked it will be updated with the new publication settings.
    1788             :      */
    1789         458 :     rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
    1790         458 : }
    1791             : 
    1792             : /*
    1793             :  * START STREAM callback
    1794             :  */
    1795             : static void
    1796        1274 : pgoutput_stream_start(struct LogicalDecodingContext *ctx,
    1797             :                       ReorderBufferTXN *txn)
    1798             : {
    1799        1274 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1800        1274 :     bool        send_replication_origin = txn->origin_id != InvalidRepOriginId;
    1801             : 
    1802             :     /* we can't nest streaming of transactions */
    1803             :     Assert(!data->in_streaming);
    1804             : 
    1805             :     /*
    1806             :      * If we already sent the first stream for this transaction then don't
    1807             :      * send the origin id in the subsequent streams.
    1808             :      */
    1809        1274 :     if (rbtxn_is_streamed(txn))
    1810        1148 :         send_replication_origin = false;
    1811             : 
    1812        1274 :     OutputPluginPrepareWrite(ctx, !send_replication_origin);
    1813        1274 :     logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
    1814             : 
    1815        1274 :     send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
    1816             :                      send_replication_origin);
    1817             : 
    1818        1274 :     OutputPluginWrite(ctx, true);
    1819             : 
    1820             :     /* we're streaming a chunk of transaction now */
    1821        1274 :     data->in_streaming = true;
    1822        1274 : }
    1823             : 
    1824             : /*
    1825             :  * STOP STREAM callback
    1826             :  */
    1827             : static void
    1828        1274 : pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
    1829             :                      ReorderBufferTXN *txn)
    1830             : {
    1831        1274 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1832             : 
    1833             :     /* we should be streaming a transaction */
    1834             :     Assert(data->in_streaming);
    1835             : 
    1836        1274 :     OutputPluginPrepareWrite(ctx, true);
    1837        1274 :     logicalrep_write_stream_stop(ctx->out);
    1838        1274 :     OutputPluginWrite(ctx, true);
    1839             : 
    1840             :     /* we've stopped streaming a transaction */
    1841        1274 :     data->in_streaming = false;
    1842        1274 : }
    1843             : 
    1844             : /*
    1845             :  * Notify downstream to discard the streamed transaction (along with all
     1846             :  * its subtransactions, if it's a toplevel transaction).
    1847             :  */
    1848             : static void
    1849          52 : pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
    1850             :                       ReorderBufferTXN *txn,
    1851             :                       XLogRecPtr abort_lsn)
    1852             : {
    1853             :     ReorderBufferTXN *toptxn;
    1854          52 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1855          52 :     bool        write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
    1856             : 
    1857             :     /*
     1858             :      * The abort should happen outside a streaming block, even for streamed
    1859             :      * transactions. The transaction has to be marked as streamed, though.
    1860             :      */
    1861             :     Assert(!data->in_streaming);
    1862             : 
    1863             :     /* determine the toplevel transaction */
    1864          52 :     toptxn = rbtxn_get_toptxn(txn);
    1865             : 
    1866             :     Assert(rbtxn_is_streamed(toptxn));
    1867             : 
    1868          52 :     OutputPluginPrepareWrite(ctx, true);
    1869          52 :     logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
    1870             :                                   txn->xact_time.abort_time, write_abort_info);
    1871             : 
    1872          52 :     OutputPluginWrite(ctx, true);
    1873             : 
    1874          52 :     cleanup_rel_sync_cache(toptxn->xid, false);
    1875          52 : }
    1876             : 
    1877             : /*
    1878             :  * Notify downstream to apply the streamed transaction (along with all
     1879             :  * its subtransactions).
    1880             :  */
    1881             : static void
    1882          92 : pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
    1883             :                        ReorderBufferTXN *txn,
    1884             :                        XLogRecPtr commit_lsn)
    1885             : {
    1886          92 :     PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private;
    1887             : 
    1888             :     /*
     1889             :      * The commit should happen outside a streaming block, even for streamed
    1890             :      * transactions. The transaction has to be marked as streamed, though.
    1891             :      */
    1892             :     Assert(!data->in_streaming);
    1893             :     Assert(rbtxn_is_streamed(txn));
    1894             : 
    1895          92 :     OutputPluginUpdateProgress(ctx, false);
    1896             : 
    1897          92 :     OutputPluginPrepareWrite(ctx, true);
    1898          92 :     logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
    1899          92 :     OutputPluginWrite(ctx, true);
    1900             : 
    1901          92 :     cleanup_rel_sync_cache(txn->xid, true);
    1902          92 : }
    1903             : 
    1904             : /*
    1905             :  * PREPARE callback (for streaming two-phase commit).
    1906             :  *
    1907             :  * Notify the downstream to prepare the transaction.
    1908             :  */
    1909             : static void
    1910          28 : pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
    1911             :                             ReorderBufferTXN *txn,
    1912             :                             XLogRecPtr prepare_lsn)
    1913             : {
    1914             :     Assert(rbtxn_is_streamed(txn));
    1915             : 
    1916          28 :     OutputPluginUpdateProgress(ctx, false);
    1917          28 :     OutputPluginPrepareWrite(ctx, true);
    1918          28 :     logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
    1919          28 :     OutputPluginWrite(ctx, true);
    1920          28 : }
    1921             : 
    1922             : /*
    1923             :  * Initialize the relation schema sync cache for a decoding session.
    1924             :  *
    1925             :  * The hash table is destroyed at the end of a decoding session. While
    1926             :  * relcache invalidations still exist and will still be invoked, they
    1927             :  * will just see the null hash table global and take no action.
    1928             :  */
    1929             : static void
    1930         702 : init_rel_sync_cache(MemoryContext cachectx)
    1931             : {
    1932             :     HASHCTL     ctl;
    1933             :     static bool relation_callbacks_registered = false;
    1934             : 
    1935             :     /* Nothing to do if hash table already exists */
    1936         702 :     if (RelationSyncCache != NULL)
    1937           2 :         return;
    1938             : 
    1939             :     /* Make a new hash table for the cache */
    1940         702 :     ctl.keysize = sizeof(Oid);
    1941         702 :     ctl.entrysize = sizeof(RelationSyncEntry);
    1942         702 :     ctl.hcxt = cachectx;
    1943             : 
    1944         702 :     RelationSyncCache = hash_create("logical replication output relation cache",
    1945             :                                     128, &ctl,
    1946             :                                     HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
    1947             : 
    1948             :     Assert(RelationSyncCache != NULL);
    1949             : 
    1950             :     /* No more to do if we already registered callbacks */
    1951         702 :     if (relation_callbacks_registered)
    1952           2 :         return;
    1953             : 
    1954             :     /* We must update the cache entry for a relation after a relcache flush */
    1955         700 :     CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
    1956             : 
    1957             :     /*
    1958             :      * Flush all cache entries after a pg_namespace change, in case it was a
    1959             :      * schema rename affecting a relation being replicated.
    1960             :      */
    1961         700 :     CacheRegisterSyscacheCallback(NAMESPACEOID,
    1962             :                                   rel_sync_cache_publication_cb,
    1963             :                                   (Datum) 0);
    1964             : 
    1965             :     /*
    1966             :      * Flush all cache entries after any publication changes.  (We need no
    1967             :      * callback entry for pg_publication, because publication_invalidation_cb
    1968             :      * will take care of it.)
    1969             :      */
    1970         700 :     CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
    1971             :                                   rel_sync_cache_publication_cb,
    1972             :                                   (Datum) 0);
    1973         700 :     CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
    1974             :                                   rel_sync_cache_publication_cb,
    1975             :                                   (Datum) 0);
    1976             : 
    1977         700 :     relation_callbacks_registered = true;
    1978             : }
    1979             : 
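The callback registrations above persist for the lifetime of the backend and
cannot be undone, which is why a static flag guards them. Below is a minimal
sketch of the same pattern for some other backend-local cache; the dynahash
and inval.h calls are the real APIs, while MyExtensionCache, MyCacheEntry, and
the my_* functions are hypothetical:

    #include "postgres.h"

    #include "utils/hsearch.h"
    #include "utils/inval.h"

    typedef struct MyCacheEntry
    {
        Oid         relid;          /* hash key */
        bool        valid;          /* needs rebuild on next lookup? */
    } MyCacheEntry;

    static HTAB *MyExtensionCache = NULL;

    /* Relcache invalidation callback: mark entries stale, never free them. */
    static void
    my_relcache_cb(Datum arg, Oid relid)
    {
        MyCacheEntry *entry;

        /* Tolerate being invoked after the cache has been destroyed. */
        if (MyExtensionCache == NULL)
            return;

        /* (The relid == InvalidOid "flush everything" case is omitted.) */
        if (OidIsValid(relid))
        {
            entry = hash_search(MyExtensionCache, &relid, HASH_FIND, NULL);
            if (entry != NULL)
                entry->valid = false;   /* rebuild lazily on next lookup */
        }
    }

    static void
    my_cache_init(void)
    {
        static bool callbacks_registered = false;
        HASHCTL     ctl;

        if (MyExtensionCache != NULL)
            return;

        ctl.keysize = sizeof(Oid);
        ctl.entrysize = sizeof(MyCacheEntry);
        MyExtensionCache = hash_create("my extension cache", 64, &ctl,
                                       HASH_ELEM | HASH_BLOBS);

        /* Callbacks cannot be unregistered, so register them at most once. */
        if (!callbacks_registered)
        {
            CacheRegisterRelcacheCallback(my_relcache_cb, (Datum) 0);
            callbacks_registered = true;
        }
    }
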
    1980             : /*
    1981             :  * We expect a relatively small number of streamed transactions.
    1982             :  */
    1983             : static bool
    1984      351882 : get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
    1985             : {
    1986      351882 :     return list_member_xid(entry->streamed_txns, xid);
    1987             : }
    1988             : 
    1989             : /*
    1990             :  * Record the xid in the rel sync entry, noting that we have already sent the
    1991             :  * schema of the relation within that streamed transaction.
    1992             :  */
    1993             : static void
    1994         142 : set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
    1995             : {
    1996             :     MemoryContext oldctx;
    1997             : 
    1998         142 :     oldctx = MemoryContextSwitchTo(CacheMemoryContext);
    1999             : 
    2000         142 :     entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
    2001             : 
    2002         142 :     MemoryContextSwitchTo(oldctx);
    2003         142 : }
    2004             : 
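Note the switch to CacheMemoryContext above: the list cells must outlive the
short-lived per-change context that is current while decoding, so they are
allocated in a long-lived context instead. A generic sketch of that pattern
(remember_xid is a hypothetical helper; the memory-context and list APIs are
real):

    #include "postgres.h"

    #include "nodes/pg_list.h"
    #include "utils/memutils.h"

    /*
     * Append 'xid' to '*xids', allocating the new list cell in
     * CacheMemoryContext so that it survives resets of whatever short-lived
     * context happens to be current.
     */
    static void
    remember_xid(List **xids, TransactionId xid)
    {
        MemoryContext oldctx = MemoryContextSwitchTo(CacheMemoryContext);

        *xids = lappend_xid(*xids, xid);

        MemoryContextSwitchTo(oldctx);
    }
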
    2005             : /*
    2006             :  * Find or create entry in the relation schema cache.
    2007             :  *
    2008             :  * This looks up publications that the given relation is directly or
    2009             :  * indirectly part of (the latter if it's really the relation's ancestor that
    2010             :  * is part of a publication) and fills in the found entry with information
    2011             :  * about which operations to publish and whether to use an ancestor's schema
    2012             :  * when publishing.
    2013             :  */
    2014             : static RelationSyncEntry *
    2015      366790 : get_rel_sync_entry(PGOutputData *data, Relation relation)
    2016             : {
    2017             :     RelationSyncEntry *entry;
    2018             :     bool        found;
    2019             :     MemoryContext oldctx;
    2020      366790 :     Oid         relid = RelationGetRelid(relation);
    2021             : 
    2022             :     Assert(RelationSyncCache != NULL);
    2023             : 
    2024             :     /* Find cached relation info, creating if not found */
    2025      366790 :     entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
    2026             :                                               &relid,
    2027             :                                               HASH_ENTER, &found);
    2028             :     Assert(entry != NULL);
    2029             : 
    2030             :     /* initialize entry, if it's new */
    2031      366790 :     if (!found)
    2032             :     {
    2033         508 :         entry->replicate_valid = false;
    2034         508 :         entry->schema_sent = false;
    2035         508 :         entry->include_gencols = false;
    2036         508 :         entry->streamed_txns = NIL;
    2037         508 :         entry->pubactions.pubinsert = entry->pubactions.pubupdate =
    2038         508 :             entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
    2039         508 :         entry->new_slot = NULL;
    2040         508 :         entry->old_slot = NULL;
    2041         508 :         memset(entry->exprstate, 0, sizeof(entry->exprstate));
    2042         508 :         entry->entry_cxt = NULL;
    2043         508 :         entry->publish_as_relid = InvalidOid;
    2044         508 :         entry->columns = NULL;
    2045         508 :         entry->attrmap = NULL;
    2046             :     }
    2047             : 
    2048             :     /* Validate the entry */
    2049      366790 :     if (!entry->replicate_valid)
    2050             :     {
    2051         640 :         Oid         schemaId = get_rel_namespace(relid);
    2052         640 :         List       *pubids = GetRelationPublications(relid);
    2053             : 
    2054             :         /*
    2055             :          * We don't acquire a lock on the namespace system table as we build
    2056             :          * the cache entry using a historic snapshot and all the later changes
    2057             :          * are absorbed while decoding WAL.
    2058             :          */
    2059         640 :         List       *schemaPubids = GetSchemaPublications(schemaId);
    2060             :         ListCell   *lc;
    2061         640 :         Oid         publish_as_relid = relid;
    2062         640 :         int         publish_ancestor_level = 0;
    2063         640 :         bool        am_partition = get_rel_relispartition(relid);
    2064         640 :         char        relkind = get_rel_relkind(relid);
    2065         640 :         List       *rel_publications = NIL;
    2066             : 
    2067             :         /* Reload publications if needed before use. */
    2068         640 :         if (!publications_valid)
    2069             :         {
    2070         316 :             MemoryContextReset(data->pubctx);
    2071             : 
    2072         316 :             oldctx = MemoryContextSwitchTo(data->pubctx);
    2073         316 :             data->publications = LoadPublications(data->publication_names);
    2074         312 :             MemoryContextSwitchTo(oldctx);
    2075         312 :             publications_valid = true;
    2076             :         }
    2077             : 
    2078             :         /*
    2079             :          * Reset the schema_sent status as the relation definition may have
    2080             :          * changed.  Also reset pubactions to empty in case the relation was
    2081             :          * dropped from a publication, and free any objects that depended on
    2082             :          * the earlier definition.
    2083             :          */
    2084         636 :         entry->schema_sent = false;
    2085         636 :         entry->include_gencols = false;
    2086         636 :         list_free(entry->streamed_txns);
    2087         636 :         entry->streamed_txns = NIL;
    2088         636 :         bms_free(entry->columns);
    2089         636 :         entry->columns = NULL;
    2090         636 :         entry->pubactions.pubinsert = false;
    2091         636 :         entry->pubactions.pubupdate = false;
    2092         636 :         entry->pubactions.pubdelete = false;
    2093         636 :         entry->pubactions.pubtruncate = false;
    2094             : 
    2095             :         /*
    2096             :          * Tuple slot cleanup.  (The slots will be rebuilt later if needed.)
    2097             :          */
    2098         636 :         if (entry->old_slot)
    2099             :         {
    2100         112 :             TupleDesc   desc = entry->old_slot->tts_tupleDescriptor;
    2101             : 
    2102             :             Assert(desc->tdrefcount == -1);
    2103             : 
    2104         112 :             ExecDropSingleTupleTableSlot(entry->old_slot);
    2105             : 
    2106             :             /*
    2107             :              * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
    2108             :              * do it now to avoid any leaks.
    2109             :              */
    2110         112 :             FreeTupleDesc(desc);
    2111             :         }
    2112         636 :         if (entry->new_slot)
    2113             :         {
    2114         112 :             TupleDesc   desc = entry->new_slot->tts_tupleDescriptor;
    2115             : 
    2116             :             Assert(desc->tdrefcount == -1);
    2117             : 
    2118         112 :             ExecDropSingleTupleTableSlot(entry->new_slot);
    2119             : 
    2120             :             /*
    2121             :              * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
    2122             :              * do it now to avoid any leaks.
    2123             :              */
    2124         112 :             FreeTupleDesc(desc);
    2125             :         }
    2126             : 
    2127         636 :         entry->old_slot = NULL;
    2128         636 :         entry->new_slot = NULL;
    2129             : 
    2130         636 :         if (entry->attrmap)
    2131           6 :             free_attrmap(entry->attrmap);
    2132         636 :         entry->attrmap = NULL;
    2133             : 
    2134             :         /*
    2135             :          * Row filter cache cleanup.
    2136             :          */
    2137         636 :         if (entry->entry_cxt)
    2138         112 :             MemoryContextDelete(entry->entry_cxt);
    2139             : 
    2140         636 :         entry->entry_cxt = NULL;
    2141         636 :         entry->estate = NULL;
    2142         636 :         memset(entry->exprstate, 0, sizeof(entry->exprstate));
    2143             : 
    2144             :         /*
    2145             :          * Build the publication cache.  We can't use the one provided by the
    2146             :          * relcache, as the relcache considers all publications that the given
    2147             :          * relation is in, but here we only need to consider the ones that the
    2148             :          * subscriber requested.
    2149             :          */
    2150        1568 :         foreach(lc, data->publications)
    2151             :         {
    2152         932 :             Publication *pub = lfirst(lc);
    2153         932 :             bool        publish = false;
    2154             : 
    2155             :             /*
    2156             :              * Under what relid should we publish changes in this publication?
    2157             :              * We'll use the top-most relid across all publications. Also
    2158             :              * track the ancestor level for this publication.
    2159             :              */
    2160         932 :             Oid         pub_relid = relid;
    2161         932 :             int         ancestor_level = 0;
    2162             : 
    2163             :             /*
    2164             :              * If this is a FOR ALL TABLES publication, pick the partition
    2165             :              * root and set the ancestor level accordingly.
    2166             :              */
    2167         932 :             if (pub->alltables)
    2168             :             {
    2169         164 :                 publish = true;
    2170         164 :                 if (pub->pubviaroot && am_partition)
    2171             :                 {
    2172          24 :                     List       *ancestors = get_partition_ancestors(relid);
    2173             : 
    2174          24 :                     pub_relid = llast_oid(ancestors);
    2175          24 :                     ancestor_level = list_length(ancestors);
    2176             :                 }
    2177             :             }
    2178             : 
    2179         932 :             if (!publish)
    2180             :             {
    2181         768 :                 bool        ancestor_published = false;
    2182             : 
    2183             :                 /*
    2184             :                  * For a partition, check if any of the ancestors are
    2185             :                  * published.  If so, note down the topmost ancestor that is
    2186             :                  * published via this publication, which will be used as the
    2187             :                  * relation via which to publish the partition's changes.
    2188             :                  */
    2189         768 :                 if (am_partition)
    2190             :                 {
    2191             :                     Oid         ancestor;
    2192             :                     int         level;
    2193         236 :                     List       *ancestors = get_partition_ancestors(relid);
    2194             : 
    2195         236 :                     ancestor = GetTopMostAncestorInPublication(pub->oid,
    2196             :                                                                ancestors,
    2197             :                                                                &level);
    2198             : 
    2199         236 :                     if (ancestor != InvalidOid)
    2200             :                     {
    2201          96 :                         ancestor_published = true;
    2202          96 :                         if (pub->pubviaroot)
    2203             :                         {
    2204          50 :                             pub_relid = ancestor;
    2205          50 :                             ancestor_level = level;
    2206             :                         }
    2207             :                     }
    2208             :                 }
    2209             : 
    2210        1194 :                 if (list_member_oid(pubids, pub->oid) ||
    2211         840 :                     list_member_oid(schemaPubids, pub->oid) ||
    2212             :                     ancestor_published)
    2213         410 :                     publish = true;
    2214             :             }
    2215             : 
    2216             :             /*
    2217             :              * If the relation is to be published, determine actions to
    2218             :              * publish, and list of columns, if appropriate.
    2219             :              *
    2220             :              * Don't publish changes for a partitioned table, because
    2221             :              * publishing those of its partitions suffices, unless partition
    2222             :              * changes won't be published due to pubviaroot being set.
    2223             :              */
    2224         932 :             if (publish &&
    2225           8 :                 (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
    2226             :             {
    2227         568 :                 entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
    2228         568 :                 entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
    2229         568 :                 entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
    2230         568 :                 entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
    2231             : 
    2232             :                 /*
    2233             :                  * We want to publish the changes as the top-most ancestor
    2234             :                  * across all publications. So we need to check if the already
    2235             :                  * calculated level is higher than the new one. If yes, we can
    2236             :                  * ignore the new value (as it's a child). Otherwise the new
    2237             :                  * value is an ancestor, so we keep it.
    2238             :                  */
    2239         568 :                 if (publish_ancestor_level > ancestor_level)
    2240           2 :                     continue;
    2241             : 
    2242             :                 /*
    2243             :                  * If we found an ancestor higher up in the tree, discard the
    2244             :                  * list of publications through which we replicate it, and use
    2245             :                  * the new ancestor.
    2246             :                  */
    2247         566 :                 if (publish_ancestor_level < ancestor_level)
    2248             :                 {
    2249          74 :                     publish_as_relid = pub_relid;
    2250          74 :                     publish_ancestor_level = ancestor_level;
    2251             : 
    2252             :                     /* reset the publication list for this relation */
    2253          74 :                     rel_publications = NIL;
    2254             :                 }
    2255             :                 else
    2256             :                 {
    2257             :                     /* Same ancestor level, has to be the same OID. */
    2258             :                     Assert(publish_as_relid == pub_relid);
    2259             :                 }
    2260             : 
    2261             :                 /* Track publications for this ancestor. */
    2262         566 :                 rel_publications = lappend(rel_publications, pub);
    2263             :             }
    2264             :         }
    2265             : 
    2266         636 :         entry->publish_as_relid = publish_as_relid;
    2267             : 
    2268             :         /*
    2269             :          * Initialize the tuple slot, map, and row filter. These are only used
    2270             :          * when publishing inserts, updates, or deletes.
    2271             :          */
    2272         636 :         if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
    2273          96 :             entry->pubactions.pubdelete)
    2274             :         {
    2275             :             /* Initialize the tuple slot and map */
    2276         540 :             init_tuple_slot(data, relation, entry);
    2277             : 
    2278             :             /* Initialize the row filter */
    2279         540 :             pgoutput_row_filter_init(data, rel_publications, entry);
    2280             : 
    2281             :             /* Check whether to publish generated columns. */
    2282         540 :             check_and_init_gencol(data, rel_publications, entry);
    2283             : 
    2284             :             /* Initialize the column list */
    2285         540 :             pgoutput_column_list_init(data, rel_publications, entry);
    2286             :         }
    2287             : 
    2288         634 :         list_free(pubids);
    2289         634 :         list_free(schemaPubids);
    2290         634 :         list_free(rel_publications);
    2291             : 
    2292         634 :         entry->replicate_valid = true;
    2293             :     }
    2294             : 
    2295      366784 :     return entry;
    2296             : }
    2297             : 
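For context, a simplified, hypothetical sketch of how a change callback
consumes the entry returned by get_rel_sync_entry(), assuming the file-local
PGOutputData and RelationSyncEntry types used above; the real pgoutput_change(),
earlier in this file, additionally applies the row filter, the column list,
and tuple conversion when publishing via an ancestor:

    /*
     * Hypothetical helper: decide whether an INSERT on 'relation' should be
     * published at all, based on the cached publication actions.
     */
    static bool
    should_publish_insert(PGOutputData *data, Relation relation)
    {
        RelationSyncEntry *entry = get_rel_sync_entry(data, relation);

        /* No subscribed publication replicates INSERTs for this relation. */
        if (!entry->pubactions.pubinsert)
            return false;

        /*
         * If publish_via_partition_root redirected the change to an ancestor,
         * entry->publish_as_relid differs from RelationGetRelid(relation) and
         * the tuple must be converted (entry->attrmap) before it is sent.
         */
        return true;
    }
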
    2298             : /*
    2299             :  * Clean up the list of streamed transactions and update the schema_sent flag.
    2300             :  *
    2301             :  * When a streamed transaction commits or aborts, we need to remove the
    2302             :  * toplevel XID from the schema cache. If the transaction aborted, the
    2303             :  * subscriber will simply throw away the schema records we streamed, so
    2304             :  * we don't need to do anything else.
    2305             :  *
    2306             :  * If the transaction committed, the subscriber will update its relation
    2307             :  * cache, so tweak the schema_sent flag accordingly.
    2308             :  */
    2309             : static void
    2310         144 : cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
    2311             : {
    2312             :     HASH_SEQ_STATUS hash_seq;
    2313             :     RelationSyncEntry *entry;
    2314             : 
    2315             :     Assert(RelationSyncCache != NULL);
    2316             : 
    2317         144 :     hash_seq_init(&hash_seq, RelationSyncCache);
    2318         294 :     while ((entry = hash_seq_search(&hash_seq)) != NULL)
    2319             :     {
    2320             :         /*
    2321             :          * We can set the schema_sent flag for an entry whose list contains
    2322             :          * the committed xid, as that guarantees the subscriber already has
    2323             :          * the corresponding schema; we don't need to send it again unless
    2324             :          * there is an invalidation for that relation.
    2325             :          */
    2326         340 :         foreach_xid(streamed_txn, entry->streamed_txns)
    2327             :         {
    2328         148 :             if (xid == streamed_txn)
    2329             :             {
    2330         108 :                 if (is_commit)
    2331          86 :                     entry->schema_sent = true;
    2332             : 
    2333         108 :                 entry->streamed_txns =
    2334         108 :                     foreach_delete_current(entry->streamed_txns, streamed_txn);
    2335         108 :                 break;
    2336             :             }
    2337             :         }
    2338             :     }
    2339         144 : }
    2340             : 
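The loop above deletes list cells while iterating, which is exactly what
foreach_delete_current() is designed for. A generic sketch of the same idiom
(remove_xid_from_list is a hypothetical helper; the list macros are real):

    #include "postgres.h"

    #include "nodes/pg_list.h"

    /*
     * Remove every occurrence of 'xid' from the list, deleting cells safely
     * during iteration with the foreach_xid/foreach_delete_current idiom used
     * by cleanup_rel_sync_cache() above.
     */
    static List *
    remove_xid_from_list(List *xids, TransactionId xid)
    {
        foreach_xid(cur, xids)
        {
            if (cur == xid)
                xids = foreach_delete_current(xids, cur);
        }

        return xids;
    }
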
    2341             : /*
    2342             :  * Relcache invalidation callback
    2343             :  */
    2344             : static void
    2345        7168 : rel_sync_cache_relation_cb(Datum arg, Oid relid)
    2346             : {
    2347             :     RelationSyncEntry *entry;
    2348             : 
    2349             :     /*
    2350             :      * We can get here if the plugin was used via the SQL interface, as the
    2351             :      * RelationSyncCache is destroyed when the decoding finishes, but there is
    2352             :      * no way to unregister the relcache invalidation callback.
    2353             :      */
    2354        7168 :     if (RelationSyncCache == NULL)
    2355          18 :         return;
    2356             : 
    2357             :     /*
    2358             :      * Nobody keeps pointers to entries in this hash table around outside
    2359             :      * logical decoding callback calls - but invalidation events can come in
    2360             :      * *during* a callback if we do any syscache access in the callback.
    2361             :      * Because of that we must mark the cache entry as invalid but not damage
    2362             :      * any of its substructure here.  The next get_rel_sync_entry() call will
    2363             :      * rebuild it all.
    2364             :      */
    2365        7150 :     if (OidIsValid(relid))
    2366             :     {
    2367             :         /*
    2368             :          * Getting invalidations for relations that aren't in the table is
    2369             :          * entirely normal.  So we don't care if it's found or not.
    2370             :          */
    2371        7084 :         entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
    2372             :                                                   HASH_FIND, NULL);
    2373        7084 :         if (entry != NULL)
    2374        1212 :             entry->replicate_valid = false;
    2375             :     }
    2376             :     else
    2377             :     {
    2378             :         /* Whole cache must be flushed. */
    2379             :         HASH_SEQ_STATUS status;
    2380             : 
    2381          66 :         hash_seq_init(&status, RelationSyncCache);
    2382         176 :         while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
    2383             :         {
    2384         110 :             entry->replicate_valid = false;
    2385             :         }
    2386             :     }
    2387             : }
    2388             : 
    2389             : /*
    2390             :  * Publication relation/schema map syscache invalidation callback
    2391             :  *
    2392             :  * Called for invalidations on pg_publication, pg_publication_rel,
    2393             :  * pg_publication_namespace, and pg_namespace.
    2394             :  */
    2395             : static void
    2396        1238 : rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
    2397             : {
    2398             :     HASH_SEQ_STATUS status;
    2399             :     RelationSyncEntry *entry;
    2400             : 
    2401             :     /*
    2402             :      * We can get here if the plugin was used via the SQL interface, as the
    2403             :      * RelationSyncCache is destroyed when the decoding finishes, but there is
    2404             :      * no way to unregister the invalidation callbacks.
    2405             :      */
    2406        1238 :     if (RelationSyncCache == NULL)
    2407          68 :         return;
    2408             : 
    2409             :     /*
    2410             :      * We have no easy way to identify which cache entries this invalidation
    2411             :      * event might have affected, so just mark them all invalid.
    2412             :      */
    2413        1170 :     hash_seq_init(&status, RelationSyncCache);
    2414        3658 :     while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
    2415             :     {
    2416        2488 :         entry->replicate_valid = false;
    2417             :     }
    2418             : }
    2419             : 
    2420             : /* Send the replication origin, if requested */
    2421             : static void
    2422        2134 : send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
    2423             :                  XLogRecPtr origin_lsn, bool send_origin)
    2424             : {
    2425        2134 :     if (send_origin)
    2426             :     {
    2427             :         char       *origin;
    2428             : 
    2429             :         /*----------
    2430             :          * XXX: which behaviour do we want here?
    2431             :          *
    2432             :          * Alternatives:
    2433             :          *  - don't send origin message if origin name not found
    2434             :          *    (that's what we do now)
    2435             :          *  - throw error - that will break replication, not good
    2436             :          *  - send some special "unknown" origin
    2437             :          *----------
    2438             :          */
    2439          18 :         if (replorigin_by_oid(origin_id, true, &origin))
    2440             :         {
    2441             :             /* Message boundary */
    2442          18 :             OutputPluginWrite(ctx, false);
    2443          18 :             OutputPluginPrepareWrite(ctx, true);
    2444             : 
    2445          18 :             logicalrep_write_origin(ctx->out, origin, origin_lsn);
    2446             :         }
    2447             :     }
    2448        2134 : }

Generated by: LCOV version 1.14