LCOV - code coverage report
Current view: top level - src/backend/replication/pgoutput - pgoutput.c (source / functions)
Test: PostgreSQL 19devel
Date: 2025-10-10 18:17:38
Coverage summary:      Hit    Total    Coverage
  Lines:               739    768      96.2 %
  Functions:            42     42     100.0 %

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pgoutput.c
       4             :  *      Logical Replication output plugin
       5             :  *
       6             :  * Copyright (c) 2012-2025, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *        src/backend/replication/pgoutput/pgoutput.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #include "access/tupconvert.h"
      16             : #include "catalog/partition.h"
      17             : #include "catalog/pg_publication.h"
      18             : #include "catalog/pg_publication_rel.h"
      19             : #include "catalog/pg_subscription.h"
      20             : #include "commands/defrem.h"
      21             : #include "commands/subscriptioncmds.h"
      22             : #include "executor/executor.h"
      23             : #include "fmgr.h"
      24             : #include "nodes/makefuncs.h"
      25             : #include "parser/parse_relation.h"
      26             : #include "replication/logical.h"
      27             : #include "replication/logicalproto.h"
      28             : #include "replication/origin.h"
      29             : #include "replication/pgoutput.h"
      30             : #include "rewrite/rewriteHandler.h"
      31             : #include "utils/builtins.h"
      32             : #include "utils/inval.h"
      33             : #include "utils/lsyscache.h"
      34             : #include "utils/memutils.h"
      35             : #include "utils/rel.h"
      36             : #include "utils/syscache.h"
      37             : #include "utils/varlena.h"
      38             : 
      39        1062 : PG_MODULE_MAGIC_EXT(
      40             :                     .name = "pgoutput",
      41             :                     .version = PG_VERSION
      42             : );
      43             : 
      44             : static void pgoutput_startup(LogicalDecodingContext *ctx,
      45             :                              OutputPluginOptions *opt, bool is_init);
      46             : static void pgoutput_shutdown(LogicalDecodingContext *ctx);
      47             : static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
      48             :                                ReorderBufferTXN *txn);
      49             : static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
      50             :                                 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
      51             : static void pgoutput_change(LogicalDecodingContext *ctx,
      52             :                             ReorderBufferTXN *txn, Relation relation,
      53             :                             ReorderBufferChange *change);
      54             : static void pgoutput_truncate(LogicalDecodingContext *ctx,
      55             :                               ReorderBufferTXN *txn, int nrelations, Relation relations[],
      56             :                               ReorderBufferChange *change);
      57             : static void pgoutput_message(LogicalDecodingContext *ctx,
      58             :                              ReorderBufferTXN *txn, XLogRecPtr message_lsn,
      59             :                              bool transactional, const char *prefix,
      60             :                              Size sz, const char *message);
      61             : static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
      62             :                                    RepOriginId origin_id);
      63             : static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
      64             :                                        ReorderBufferTXN *txn);
      65             : static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
      66             :                                  ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
      67             : static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
      68             :                                          ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
      69             : static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
      70             :                                            ReorderBufferTXN *txn,
      71             :                                            XLogRecPtr prepare_end_lsn,
      72             :                                            TimestampTz prepare_time);
      73             : static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
      74             :                                   ReorderBufferTXN *txn);
      75             : static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
      76             :                                  ReorderBufferTXN *txn);
      77             : static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
      78             :                                   ReorderBufferTXN *txn,
      79             :                                   XLogRecPtr abort_lsn);
      80             : static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
      81             :                                    ReorderBufferTXN *txn,
      82             :                                    XLogRecPtr commit_lsn);
      83             : static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
      84             :                                         ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
      85             : 
      86             : static bool publications_valid;
      87             : 
      88             : static List *LoadPublications(List *pubnames);
      89             : static void publication_invalidation_cb(Datum arg, int cacheid,
      90             :                                         uint32 hashvalue);
      91             : static void send_repl_origin(LogicalDecodingContext *ctx,
      92             :                              RepOriginId origin_id, XLogRecPtr origin_lsn,
      93             :                              bool send_origin);
      94             : 
      95             : /*
      96             :  * Only 3 publication actions are used for row filtering ("insert", "update",
      97             :  * "delete"). See RelationSyncEntry.exprstate[].
      98             :  */
      99             : enum RowFilterPubAction
     100             : {
     101             :     PUBACTION_INSERT,
     102             :     PUBACTION_UPDATE,
     103             :     PUBACTION_DELETE,
     104             : };
     105             : 
     106             : #define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
     107             : 
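
The enum above doubles as an index into the per-action row-filter state kept in RelationSyncEntry.exprstate[] (declared next). For illustration only, a reorder-buffer change type could be mapped onto that index with a small helper like the one below; the helper name is hypothetical and not part of pgoutput:

    /* Hypothetical helper: map a reorder-buffer change type to the
     * corresponding row-filter slot defined by RowFilterPubAction. */
    static int
    change_type_to_pubaction(ReorderBufferChangeType action)
    {
        switch (action)
        {
            case REORDER_BUFFER_CHANGE_INSERT:
                return PUBACTION_INSERT;
            case REORDER_BUFFER_CHANGE_UPDATE:
                return PUBACTION_UPDATE;
            case REORDER_BUFFER_CHANGE_DELETE:
                return PUBACTION_DELETE;
            default:
                elog(ERROR, "unexpected change type %d", action);
                return -1;      /* keep compiler quiet */
        }
    }
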
     108             : /*
     109             :  * Entry in the map used to remember which relation schemas we sent.
     110             :  *
      111             :  * The schema_sent flag indicates whether the current schema record for the
      112             :  * relation (and for its ancestor, if publish_as_relid is set) has already
      113             :  * been sent to the subscriber (in which case we don't need to send it again).
      114             :  *
      115             :  * The schema cache on the downstream, however, is updated only at commit
      116             :  * time, and with streamed transactions the commit order may differ from the
      117             :  * order in which the transactions are sent. Also, the (sub)transactions
      118             :  * might get aborted, so we need to send the schema for each (sub)transaction
      119             :  * to avoid losing the schema information on abort. To handle this, we
      120             :  * maintain a list of xids (streamed_txns) for which we have already sent
      121             :  * the schema.
     122             :  *
     123             :  * For partitions, 'pubactions' considers not only the table's own
     124             :  * publications, but also those of all of its ancestors.
     125             :  */
     126             : typedef struct RelationSyncEntry
     127             : {
     128             :     Oid         relid;          /* relation oid */
     129             : 
     130             :     bool        replicate_valid;    /* overall validity flag for entry */
     131             : 
     132             :     bool        schema_sent;
     133             : 
     134             :     /*
     135             :      * This will be PUBLISH_GENCOLS_STORED if the relation contains generated
     136             :      * columns and the 'publish_generated_columns' parameter is set to
     137             :      * PUBLISH_GENCOLS_STORED. Otherwise, it will be PUBLISH_GENCOLS_NONE,
     138             :      * indicating that no generated columns should be published, unless
     139             :      * explicitly specified in the column list.
     140             :      */
     141             :     PublishGencolsType include_gencols_type;
     142             :     List       *streamed_txns;  /* streamed toplevel transactions with this
     143             :                                  * schema */
     144             : 
     145             :     /* are we publishing this rel? */
     146             :     PublicationActions pubactions;
     147             : 
     148             :     /*
      149             :      * ExprState array for row filters. The filter expressions cannot always
      150             :      * be combined into a single expression across publication actions,
      151             :      * because updates and deletes require the columns referenced in the
      152             :      * expression to be part of the replica identity index, whereas inserts
      153             :      * have no such restriction; hence there is one ExprState per action.
     154             :      */
     155             :     ExprState  *exprstate[NUM_ROWFILTER_PUBACTIONS];
     156             :     EState     *estate;         /* executor state used for row filter */
     157             :     TupleTableSlot *new_slot;   /* slot for storing new tuple */
     158             :     TupleTableSlot *old_slot;   /* slot for storing old tuple */
     159             : 
     160             :     /*
     161             :      * OID of the relation to publish changes as.  For a partition, this may
     162             :      * be set to one of its ancestors whose schema will be used when
     163             :      * replicating changes, if publish_via_partition_root is set for the
     164             :      * publication.
     165             :      */
     166             :     Oid         publish_as_relid;
     167             : 
     168             :     /*
      169             :      * Map used when replicating via an ancestor's schema to convert tuples
      170             :      * from the partition's type to the ancestor's; NULL if publish_as_relid
      171             :      * is the same as 'relid', or if the map is unnecessary because the
      172             :      * partition and the ancestor have identical TupleDescs.
     173             :      */
     174             :     AttrMap    *attrmap;
     175             : 
     176             :     /*
     177             :      * Columns included in the publication, or NULL if all columns are
     178             :      * included implicitly.  Note that the attnums in this bitmap are not
     179             :      * shifted by FirstLowInvalidHeapAttributeNumber.
     180             :      */
     181             :     Bitmapset  *columns;
     182             : 
     183             :     /*
     184             :      * Private context to store additional data for this entry - state for the
     185             :      * row filter expressions, column list, etc.
     186             :      */
     187             :     MemoryContext entry_cxt;
     188             : } RelationSyncEntry;
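
Entries of this shape are stored in the RelationSyncCache hash table declared a little further down, keyed by relid. A minimal sketch of the lookup-or-create pattern, assuming the cache already exists (simplified; get_rel_sync_entry additionally revalidates entries whose replicate_valid flag was cleared by invalidation callbacks):

    /* Sketch only: look up (or create) the sync entry for a relation. */
    Oid         relid = RelationGetRelid(relation);
    bool        found;
    RelationSyncEntry *entry;

    entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
                                              &relid, HASH_ENTER, &found);
    if (!found)
    {
        /* New entries start out invalid and are filled in lazily. */
        entry->replicate_valid = false;
        entry->schema_sent = false;
        entry->streamed_txns = NIL;
        entry->attrmap = NULL;
        entry->columns = NULL;
        entry->entry_cxt = NULL;
    }
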
     189             : 
     190             : /*
      191             :  * Maintain a per-transaction variable to track whether the transaction has
      192             :  * sent BEGIN. BEGIN is sent only when the first change in a transaction is
      193             :  * processed. This makes it possible to skip sending a BEGIN/COMMIT pair for
      194             :  * empty transactions, which saves network bandwidth.
      195             :  *
      196             :  * This optimization is not used for prepared transactions, because if the
      197             :  * WALSender restarts after the prepare of a transaction but before its
      198             :  * commit prepared, we cannot tell whether we skipped sending BEGIN/PREPARE
      199             :  * because the transaction was empty. This is because the in-memory txndata
      200             :  * information that was present before the restart has been lost. The result
      201             :  * would be a spurious COMMIT PREPARED without a corresponding prepared
      202             :  * transaction on the downstream, which would lead to an error when it tries
      203             :  * to process it.
     204             :  *
     205             :  * XXX We could achieve this optimization by changing protocol to send
     206             :  * additional information so that downstream can detect that the corresponding
     207             :  * prepare has not been sent. However, adding such a check for every
     208             :  * transaction in the downstream could be costly so we might want to do it
     209             :  * optionally.
     210             :  *
     211             :  * We also don't have this optimization for streamed transactions because
     212             :  * they can contain prepared transactions.
     213             :  */
     214             : typedef struct PGOutputTxnData
     215             : {
     216             :     bool        sent_begin_txn; /* flag indicating whether BEGIN has been sent */
     217             : } PGOutputTxnData;
     218             : 
     219             : /* Map used to remember which relation schemas we sent. */
     220             : static HTAB *RelationSyncCache = NULL;
     221             : 
     222             : static void init_rel_sync_cache(MemoryContext cachectx);
     223             : static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
     224             : static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
     225             :                                              Relation relation);
     226             : static void send_relation_and_attrs(Relation relation, TransactionId xid,
     227             :                                     LogicalDecodingContext *ctx,
     228             :                                     RelationSyncEntry *relentry);
     229             : static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
     230             : static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
     231             :                                           uint32 hashvalue);
     232             : static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
     233             :                                             TransactionId xid);
     234             : static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
     235             :                                             TransactionId xid);
     236             : static void init_tuple_slot(PGOutputData *data, Relation relation,
     237             :                             RelationSyncEntry *entry);
     238             : static void pgoutput_memory_context_reset(void *arg);
     239             : 
     240             : /* row filter routines */
     241             : static EState *create_estate_for_relation(Relation rel);
     242             : static void pgoutput_row_filter_init(PGOutputData *data,
     243             :                                      List *publications,
     244             :                                      RelationSyncEntry *entry);
     245             : static bool pgoutput_row_filter_exec_expr(ExprState *state,
     246             :                                           ExprContext *econtext);
     247             : static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
     248             :                                 TupleTableSlot **new_slot_ptr,
     249             :                                 RelationSyncEntry *entry,
     250             :                                 ReorderBufferChangeType *action);
     251             : 
     252             : /* column list routines */
     253             : static void pgoutput_column_list_init(PGOutputData *data,
     254             :                                       List *publications,
     255             :                                       RelationSyncEntry *entry);
     256             : 
     257             : /*
     258             :  * Specify output plugin callbacks
     259             :  */
     260             : void
     261        1440 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
     262             : {
     263        1440 :     cb->startup_cb = pgoutput_startup;
     264        1440 :     cb->begin_cb = pgoutput_begin_txn;
     265        1440 :     cb->change_cb = pgoutput_change;
     266        1440 :     cb->truncate_cb = pgoutput_truncate;
     267        1440 :     cb->message_cb = pgoutput_message;
     268        1440 :     cb->commit_cb = pgoutput_commit_txn;
     269             : 
     270        1440 :     cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
     271        1440 :     cb->prepare_cb = pgoutput_prepare_txn;
     272        1440 :     cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
     273        1440 :     cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
     274        1440 :     cb->filter_by_origin_cb = pgoutput_origin_filter;
     275        1440 :     cb->shutdown_cb = pgoutput_shutdown;
     276             : 
     277             :     /* transaction streaming */
     278        1440 :     cb->stream_start_cb = pgoutput_stream_start;
     279        1440 :     cb->stream_stop_cb = pgoutput_stream_stop;
     280        1440 :     cb->stream_abort_cb = pgoutput_stream_abort;
     281        1440 :     cb->stream_commit_cb = pgoutput_stream_commit;
     282        1440 :     cb->stream_change_cb = pgoutput_change;
     283        1440 :     cb->stream_message_cb = pgoutput_message;
     284        1440 :     cb->stream_truncate_cb = pgoutput_truncate;
     285             :     /* transaction streaming - two-phase commit */
     286        1440 :     cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
     287        1440 : }
     288             : 
     289             : static void
     290         792 : parse_output_parameters(List *options, PGOutputData *data)
     291             : {
     292             :     ListCell   *lc;
     293         792 :     bool        protocol_version_given = false;
     294         792 :     bool        publication_names_given = false;
     295         792 :     bool        binary_option_given = false;
     296         792 :     bool        messages_option_given = false;
     297         792 :     bool        streaming_given = false;
     298         792 :     bool        two_phase_option_given = false;
     299         792 :     bool        origin_option_given = false;
     300             : 
     301             :     /* Initialize optional parameters to defaults */
     302         792 :     data->binary = false;
     303         792 :     data->streaming = LOGICALREP_STREAM_OFF;
     304         792 :     data->messages = false;
     305         792 :     data->two_phase = false;
     306         792 :     data->publish_no_origin = false;
     307             : 
     308        3952 :     foreach(lc, options)
     309             :     {
     310        3160 :         DefElem    *defel = (DefElem *) lfirst(lc);
     311             : 
     312             :         Assert(defel->arg == NULL || IsA(defel->arg, String));
     313             : 
     314             :         /* Check each param, whether or not we recognize it */
     315        3160 :         if (strcmp(defel->defname, "proto_version") == 0)
     316             :         {
     317             :             unsigned long parsed;
     318             :             char       *endptr;
     319             : 
     320         792 :             if (protocol_version_given)
     321           0 :                 ereport(ERROR,
     322             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     323             :                          errmsg("conflicting or redundant options")));
     324         792 :             protocol_version_given = true;
     325             : 
     326         792 :             errno = 0;
     327         792 :             parsed = strtoul(strVal(defel->arg), &endptr, 10);
     328         792 :             if (errno != 0 || *endptr != '\0')
     329           0 :                 ereport(ERROR,
     330             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     331             :                          errmsg("invalid proto_version")));
     332             : 
     333         792 :             if (parsed > PG_UINT32_MAX)
     334           0 :                 ereport(ERROR,
     335             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     336             :                          errmsg("proto_version \"%s\" out of range",
     337             :                                 strVal(defel->arg))));
     338             : 
     339         792 :             data->protocol_version = (uint32) parsed;
     340             :         }
     341        2368 :         else if (strcmp(defel->defname, "publication_names") == 0)
     342             :         {
     343         792 :             if (publication_names_given)
     344           0 :                 ereport(ERROR,
     345             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     346             :                          errmsg("conflicting or redundant options")));
     347         792 :             publication_names_given = true;
     348             : 
     349         792 :             if (!SplitIdentifierString(strVal(defel->arg), ',',
     350             :                                        &data->publication_names))
     351           0 :                 ereport(ERROR,
     352             :                         (errcode(ERRCODE_INVALID_NAME),
     353             :                          errmsg("invalid publication_names syntax")));
     354             :         }
     355        1576 :         else if (strcmp(defel->defname, "binary") == 0)
     356             :         {
     357          22 :             if (binary_option_given)
     358           0 :                 ereport(ERROR,
     359             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     360             :                          errmsg("conflicting or redundant options")));
     361          22 :             binary_option_given = true;
     362             : 
     363          22 :             data->binary = defGetBoolean(defel);
     364             :         }
     365        1554 :         else if (strcmp(defel->defname, "messages") == 0)
     366             :         {
     367           8 :             if (messages_option_given)
     368           0 :                 ereport(ERROR,
     369             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     370             :                          errmsg("conflicting or redundant options")));
     371           8 :             messages_option_given = true;
     372             : 
     373           8 :             data->messages = defGetBoolean(defel);
     374             :         }
     375        1546 :         else if (strcmp(defel->defname, "streaming") == 0)
     376             :         {
     377         756 :             if (streaming_given)
     378           0 :                 ereport(ERROR,
     379             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     380             :                          errmsg("conflicting or redundant options")));
     381         756 :             streaming_given = true;
     382             : 
     383         756 :             data->streaming = defGetStreamingMode(defel);
     384             :         }
     385         790 :         else if (strcmp(defel->defname, "two_phase") == 0)
     386             :         {
     387          16 :             if (two_phase_option_given)
     388           0 :                 ereport(ERROR,
     389             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     390             :                          errmsg("conflicting or redundant options")));
     391          16 :             two_phase_option_given = true;
     392             : 
     393          16 :             data->two_phase = defGetBoolean(defel);
     394             :         }
     395         774 :         else if (strcmp(defel->defname, "origin") == 0)
     396             :         {
     397             :             char       *origin;
     398             : 
     399         774 :             if (origin_option_given)
     400           0 :                 ereport(ERROR,
     401             :                         errcode(ERRCODE_SYNTAX_ERROR),
     402             :                         errmsg("conflicting or redundant options"));
     403         774 :             origin_option_given = true;
     404             : 
     405         774 :             origin = defGetString(defel);
     406         774 :             if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
     407          52 :                 data->publish_no_origin = true;
     408         722 :             else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
     409         722 :                 data->publish_no_origin = false;
     410             :             else
     411           0 :                 ereport(ERROR,
     412             :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     413             :                         errmsg("unrecognized origin value: \"%s\"", origin));
     414             :         }
     415             :         else
     416           0 :             elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
     417             :     }
     418             : 
     419             :     /* Check required options */
     420         792 :     if (!protocol_version_given)
     421           0 :         ereport(ERROR,
     422             :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     423             :                 errmsg("option \"%s\" missing", "proto_version"));
     424         792 :     if (!publication_names_given)
     425           0 :         ereport(ERROR,
     426             :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     427             :                 errmsg("option \"%s\" missing", "publication_names"));
     428         792 : }
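
For illustration, the options parsed above arrive as an ordinary List of DefElem nodes whose arguments are String values. In the walsender they come from the START_REPLICATION command, but an equivalent list could be built programmatically as sketched below (the literal values are examples, not defaults):

    /* Hypothetical construction of an option list for parse_output_parameters. */
    PGOutputData *data = palloc0(sizeof(PGOutputData));
    List       *options = NIL;

    options = lappend(options, makeDefElem("proto_version",
                                           (Node *) makeString("4"), -1));
    options = lappend(options, makeDefElem("publication_names",
                                           (Node *) makeString("mypub"), -1));
    options = lappend(options, makeDefElem("streaming",
                                           (Node *) makeString("parallel"), -1));
    options = lappend(options, makeDefElem("origin",
                                           (Node *) makeString("any"), -1));

    parse_output_parameters(options, data);
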
     429             : 
     430             : /*
     431             :  * Memory context reset callback of PGOutputData->context.
     432             :  */
     433             : static void
     434        2068 : pgoutput_memory_context_reset(void *arg)
     435             : {
     436        2068 :     if (RelationSyncCache)
     437             :     {
     438         386 :         hash_destroy(RelationSyncCache);
     439         386 :         RelationSyncCache = NULL;
     440             :     }
     441        2068 : }
     442             : 
     443             : /*
     444             :  * Initialize this plugin
     445             :  */
     446             : static void
     447        1440 : pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
     448             :                  bool is_init)
     449             : {
     450        1440 :     PGOutputData *data = palloc0(sizeof(PGOutputData));
     451             :     static bool publication_callback_registered = false;
     452             :     MemoryContextCallback *mcallback;
     453             : 
     454             :     /* Create our memory context for private allocations. */
     455        1440 :     data->context = AllocSetContextCreate(ctx->context,
     456             :                                           "logical replication output context",
     457             :                                           ALLOCSET_DEFAULT_SIZES);
     458             : 
     459        1440 :     data->cachectx = AllocSetContextCreate(ctx->context,
     460             :                                            "logical replication cache context",
     461             :                                            ALLOCSET_DEFAULT_SIZES);
     462             : 
     463        1440 :     data->pubctx = AllocSetContextCreate(ctx->context,
     464             :                                          "logical replication publication list context",
     465             :                                          ALLOCSET_SMALL_SIZES);
     466             : 
     467             :     /*
     468             :      * Ensure to cleanup RelationSyncCache even when logical decoding invoked
     469             :      * via SQL interface ends up with an error.
     470             :      */
     471        1440 :     mcallback = palloc0(sizeof(MemoryContextCallback));
     472        1440 :     mcallback->func = pgoutput_memory_context_reset;
     473        1440 :     MemoryContextRegisterResetCallback(ctx->context, mcallback);
     474             : 
     475        1440 :     ctx->output_plugin_private = data;
     476             : 
     477             :     /* This plugin uses binary protocol. */
     478        1440 :     opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
     479             : 
     480             :     /*
     481             :      * This is replication start and not slot initialization.
     482             :      *
     483             :      * Parse and validate options passed by the client.
     484             :      */
     485        1440 :     if (!is_init)
     486             :     {
     487             :         /* Parse the params and ERROR if we see any we don't recognize */
     488         792 :         parse_output_parameters(ctx->output_plugin_options, data);
     489             : 
     490             :         /* Check if we support requested protocol */
     491         792 :         if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
     492           0 :             ereport(ERROR,
     493             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     494             :                      errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
     495             :                             data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
     496             : 
     497         792 :         if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
     498           0 :             ereport(ERROR,
     499             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     500             :                      errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
     501             :                             data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
     502             : 
     503             :         /*
      504             :          * Decide whether to enable streaming. It is disabled by default, in
      505             :          * which case we just update the flag in the decoding context.
      506             :          * Otherwise we only allow it with a sufficient protocol version, and
      507             :          * when the output plugin supports it.
     508             :          */
     509         792 :         if (data->streaming == LOGICALREP_STREAM_OFF)
     510          36 :             ctx->streaming = false;
     511         756 :         else if (data->streaming == LOGICALREP_STREAM_ON &&
     512          54 :                  data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
     513           0 :             ereport(ERROR,
     514             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     515             :                      errmsg("requested proto_version=%d does not support streaming, need %d or higher",
     516             :                             data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
     517         756 :         else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
     518         702 :                  data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
     519           0 :             ereport(ERROR,
     520             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     521             :                      errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
     522             :                             data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
     523         756 :         else if (!ctx->streaming)
     524           0 :             ereport(ERROR,
     525             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     526             :                      errmsg("streaming requested, but not supported by output plugin")));
     527             : 
     528             :         /*
      529             :          * Here, we just check whether the two-phase option was passed in and
      530             :          * decide whether to enable it at a later point in time. It remains
      531             :          * enabled if the previous start-up enabled it. But we only allow the
      532             :          * option to be passed in with a sufficient protocol version, and when
      533             :          * the output plugin supports it.
     534             :          */
     535         792 :         if (!data->two_phase)
     536         776 :             ctx->twophase_opt_given = false;
     537          16 :         else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
     538           0 :             ereport(ERROR,
     539             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     540             :                      errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
     541             :                             data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
     542          16 :         else if (!ctx->twophase)
     543           0 :             ereport(ERROR,
     544             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     545             :                      errmsg("two-phase commit requested, but not supported by output plugin")));
     546             :         else
     547          16 :             ctx->twophase_opt_given = true;
     548             : 
     549             :         /* Init publication state. */
     550         792 :         data->publications = NIL;
     551         792 :         publications_valid = false;
     552             : 
     553             :         /*
     554             :          * Register callback for pg_publication if we didn't already do that
     555             :          * during some previous call in this process.
     556             :          */
     557         792 :         if (!publication_callback_registered)
     558             :         {
     559         788 :             CacheRegisterSyscacheCallback(PUBLICATIONOID,
     560             :                                           publication_invalidation_cb,
     561             :                                           (Datum) 0);
     562         788 :             CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb,
     563             :                                          (Datum) 0);
     564         788 :             publication_callback_registered = true;
     565             :         }
     566             : 
     567             :         /* Initialize relation schema cache. */
     568         792 :         init_rel_sync_cache(CacheMemoryContext);
     569             :     }
     570             :     else
     571             :     {
     572             :         /*
     573             :          * Disable the streaming and prepared transactions during the slot
     574             :          * initialization mode.
     575             :          */
     576         648 :         ctx->streaming = false;
     577         648 :         ctx->twophase = false;
     578             :     }
     579        1440 : }
     580             : 
     581             : /*
     582             :  * BEGIN callback.
     583             :  *
      584             :  * Don't send the BEGIN message here; instead, postpone it until the first
      585             :  * change. In logical replication, a common scenario is to replicate only a
      586             :  * subset of tables (instead of all tables), and transactions that touch only
      587             :  * unpublished tables produce empty transactions. Such empty transactions
      588             :  * would send BEGIN and COMMIT messages to subscribers, wasting bandwidth on
      589             :  * something of little or no use for logical replication.
     590             :  */
     591             : static void
     592        1648 : pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     593             : {
     594        1648 :     PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
     595             :                                                       sizeof(PGOutputTxnData));
     596             : 
     597        1648 :     txn->output_plugin_private = txndata;
     598        1648 : }
     599             : 
     600             : /*
     601             :  * Send BEGIN.
     602             :  *
     603             :  * This is called while processing the first change of the transaction.
     604             :  */
     605             : static void
     606         882 : pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     607             : {
     608         882 :     bool        send_replication_origin = txn->origin_id != InvalidRepOriginId;
     609         882 :     PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
     610             : 
     611             :     Assert(txndata);
     612             :     Assert(!txndata->sent_begin_txn);
     613             : 
     614         882 :     OutputPluginPrepareWrite(ctx, !send_replication_origin);
     615         882 :     logicalrep_write_begin(ctx->out, txn);
     616         882 :     txndata->sent_begin_txn = true;
     617             : 
     618         882 :     send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
     619             :                      send_replication_origin);
     620             : 
     621         882 :     OutputPluginWrite(ctx, true);
     622         880 : }
     623             : 
     624             : /*
     625             :  * COMMIT callback
     626             :  */
     627             : static void
     628        1642 : pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     629             :                     XLogRecPtr commit_lsn)
     630             : {
     631        1642 :     PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
     632             :     bool        sent_begin_txn;
     633             : 
     634             :     Assert(txndata);
     635             : 
     636             :     /*
     637             :      * We don't need to send the commit message unless some relevant change
     638             :      * from this transaction has been sent to the downstream.
     639             :      */
     640        1642 :     sent_begin_txn = txndata->sent_begin_txn;
     641        1642 :     OutputPluginUpdateProgress(ctx, !sent_begin_txn);
     642        1642 :     pfree(txndata);
     643        1642 :     txn->output_plugin_private = NULL;
     644             : 
     645        1642 :     if (!sent_begin_txn)
     646             :     {
     647         764 :         elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
     648         764 :         return;
     649             :     }
     650             : 
     651         878 :     OutputPluginPrepareWrite(ctx, true);
     652         878 :     logicalrep_write_commit(ctx->out, txn, commit_lsn);
     653         878 :     OutputPluginWrite(ctx, true);
     654             : }
     655             : 
     656             : /*
     657             :  * BEGIN PREPARE callback
     658             :  */
     659             : static void
     660          42 : pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     661             : {
     662          42 :     bool        send_replication_origin = txn->origin_id != InvalidRepOriginId;
     663             : 
     664          42 :     OutputPluginPrepareWrite(ctx, !send_replication_origin);
     665          42 :     logicalrep_write_begin_prepare(ctx->out, txn);
     666             : 
     667          42 :     send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
     668             :                      send_replication_origin);
     669             : 
     670          42 :     OutputPluginWrite(ctx, true);
     671          42 : }
     672             : 
     673             : /*
     674             :  * PREPARE callback
     675             :  */
     676             : static void
     677          42 : pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     678             :                      XLogRecPtr prepare_lsn)
     679             : {
     680          42 :     OutputPluginUpdateProgress(ctx, false);
     681             : 
     682          42 :     OutputPluginPrepareWrite(ctx, true);
     683          42 :     logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
     684          42 :     OutputPluginWrite(ctx, true);
     685          42 : }
     686             : 
     687             : /*
     688             :  * COMMIT PREPARED callback
     689             :  */
     690             : static void
     691          48 : pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     692             :                              XLogRecPtr commit_lsn)
     693             : {
     694          48 :     OutputPluginUpdateProgress(ctx, false);
     695             : 
     696          48 :     OutputPluginPrepareWrite(ctx, true);
     697          48 :     logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
     698          48 :     OutputPluginWrite(ctx, true);
     699          48 : }
     700             : 
     701             : /*
     702             :  * ROLLBACK PREPARED callback
     703             :  */
     704             : static void
     705          16 : pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
     706             :                                ReorderBufferTXN *txn,
     707             :                                XLogRecPtr prepare_end_lsn,
     708             :                                TimestampTz prepare_time)
     709             : {
     710          16 :     OutputPluginUpdateProgress(ctx, false);
     711             : 
     712          16 :     OutputPluginPrepareWrite(ctx, true);
     713          16 :     logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
     714             :                                        prepare_time);
     715          16 :     OutputPluginWrite(ctx, true);
     716          16 : }
     717             : 
     718             : /*
     719             :  * Write the current schema of the relation and its ancestor (if any) if not
     720             :  * done yet.
     721             :  */
     722             : static void
     723      364462 : maybe_send_schema(LogicalDecodingContext *ctx,
     724             :                   ReorderBufferChange *change,
     725             :                   Relation relation, RelationSyncEntry *relentry)
     726             : {
     727      364462 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
     728             :     bool        schema_sent;
     729      364462 :     TransactionId xid = InvalidTransactionId;
     730      364462 :     TransactionId topxid = InvalidTransactionId;
     731             : 
     732             :     /*
     733             :      * Remember XID of the (sub)transaction for the change. We don't care if
     734             :      * it's top-level transaction or not (we have already sent that XID in
     735             :      * start of the current streaming block).
     736             :      *
     737             :      * If we're not in a streaming block, just use InvalidTransactionId and
     738             :      * the write methods will not include it.
     739             :      */
     740      364462 :     if (data->in_streaming)
     741      351842 :         xid = change->txn->xid;
     742             : 
     743      364462 :     if (rbtxn_is_subtxn(change->txn))
     744       20338 :         topxid = rbtxn_get_toptxn(change->txn)->xid;
     745             :     else
     746      344124 :         topxid = xid;
     747             : 
     748             :     /*
     749             :      * Do we need to send the schema? We do track streamed transactions
     750             :      * separately, because those may be applied later (and the regular
     751             :      * transactions won't see their effects until then) and in an order that
     752             :      * we don't know at this point.
     753             :      *
     754             :      * XXX There is a scope of optimization here. Currently, we always send
     755             :      * the schema first time in a streaming transaction but we can probably
     756             :      * avoid that by checking 'relentry->schema_sent' flag. However, before
     757             :      * doing that we need to study its impact on the case where we have a mix
     758             :      * of streaming and non-streaming transactions.
     759             :      */
     760      364462 :     if (data->in_streaming)
     761      351842 :         schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
     762             :     else
     763       12620 :         schema_sent = relentry->schema_sent;
     764             : 
     765             :     /* Nothing to do if we already sent the schema. */
     766      364462 :     if (schema_sent)
     767      363806 :         return;
     768             : 
     769             :     /*
     770             :      * Send the schema.  If the changes will be published using an ancestor's
     771             :      * schema, not the relation's own, send that ancestor's schema before
     772             :      * sending relation's own (XXX - maybe sending only the former suffices?).
     773             :      */
     774         656 :     if (relentry->publish_as_relid != RelationGetRelid(relation))
     775             :     {
     776          60 :         Relation    ancestor = RelationIdGetRelation(relentry->publish_as_relid);
     777             : 
     778          60 :         send_relation_and_attrs(ancestor, xid, ctx, relentry);
     779          60 :         RelationClose(ancestor);
     780             :     }
     781             : 
     782         656 :     send_relation_and_attrs(relation, xid, ctx, relentry);
     783             : 
     784         656 :     if (data->in_streaming)
     785         134 :         set_schema_sent_in_streamed_txn(relentry, topxid);
     786             :     else
     787         522 :         relentry->schema_sent = true;
     788             : }
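
The two streamed-transaction helpers used above only track per-toplevel-xid state in entry->streamed_txns. A minimal sketch of how they might look; the actual implementations appear later in the file and may differ in detail (for instance, in which memory context the list is kept):

    /* Sketch only: remember that this streamed toplevel xid got the schema. */
    static void
    set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
    {
        MemoryContext oldctx = MemoryContextSwitchTo(CacheMemoryContext);

        entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
        MemoryContextSwitchTo(oldctx);
    }

    /* Sketch only: was the schema already sent within this streamed xid? */
    static bool
    get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
    {
        return list_member_xid(entry->streamed_txns, xid);
    }
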
     789             : 
     790             : /*
      791             :  * Send the relation's schema, preceded by type info for any user-defined
      792             :  * column types.
     792             :  */
     793             : static void
     794         716 : send_relation_and_attrs(Relation relation, TransactionId xid,
     795             :                         LogicalDecodingContext *ctx,
     796             :                         RelationSyncEntry *relentry)
     797             : {
     798         716 :     TupleDesc   desc = RelationGetDescr(relation);
     799         716 :     Bitmapset  *columns = relentry->columns;
     800         716 :     PublishGencolsType include_gencols_type = relentry->include_gencols_type;
     801             :     int         i;
     802             : 
     803             :     /*
     804             :      * Write out type info if needed.  We do that only for user-created types.
     805             :      * We use FirstGenbkiObjectId as the cutoff, so that we only consider
     806             :      * objects with hand-assigned OIDs to be "built in", not for instance any
     807             :      * function or type defined in the information_schema. This is important
     808             :      * because only hand-assigned OIDs can be expected to remain stable across
     809             :      * major versions.
     810             :      */
     811        2208 :     for (i = 0; i < desc->natts; i++)
     812             :     {
     813        1492 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     814             : 
     815        1492 :         if (!logicalrep_should_publish_column(att, columns,
     816             :                                               include_gencols_type))
     817         142 :             continue;
     818             : 
     819        1350 :         if (att->atttypid < FirstGenbkiObjectId)
     820        1314 :             continue;
     821             : 
     822          36 :         OutputPluginPrepareWrite(ctx, false);
     823          36 :         logicalrep_write_typ(ctx->out, xid, att->atttypid);
     824          36 :         OutputPluginWrite(ctx, false);
     825             :     }
     826             : 
     827         716 :     OutputPluginPrepareWrite(ctx, false);
     828         716 :     logicalrep_write_rel(ctx->out, xid, relation, columns,
     829             :                          include_gencols_type);
     830         716 :     OutputPluginWrite(ctx, false);
     831         716 : }
     832             : 
     833             : /*
     834             :  * Executor state preparation for evaluation of row filter expressions for the
     835             :  * specified relation.
     836             :  */
     837             : static EState *
     838          34 : create_estate_for_relation(Relation rel)
     839             : {
     840             :     EState     *estate;
     841             :     RangeTblEntry *rte;
     842          34 :     List       *perminfos = NIL;
     843             : 
     844          34 :     estate = CreateExecutorState();
     845             : 
     846          34 :     rte = makeNode(RangeTblEntry);
     847          34 :     rte->rtekind = RTE_RELATION;
     848          34 :     rte->relid = RelationGetRelid(rel);
     849          34 :     rte->relkind = rel->rd_rel->relkind;
     850          34 :     rte->rellockmode = AccessShareLock;
     851             : 
     852          34 :     addRTEPermissionInfo(&perminfos, rte);
     853             : 
     854          34 :     ExecInitRangeTable(estate, list_make1(rte), perminfos,
     855             :                        bms_make_singleton(1));
     856             : 
     857          34 :     estate->es_output_cid = GetCurrentCommandId(false);
     858             : 
     859          34 :     return estate;
     860             : }
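
When a filter is evaluated, this EState supplies a per-tuple ExprContext into which the tuple slot is plugged. Roughly, the calling pattern looks like the sketch below; 'entry' stands for a RelationSyncEntry and 'new_slot' for the slot holding the tuple being filtered (the real logic lives in pgoutput_row_filter, beyond this excerpt):

    /* Sketch: evaluate one cached row filter against the tuple in new_slot. */
    ExprContext *ecxt = GetPerTupleExprContext(entry->estate);
    bool        result;

    ecxt->ecxt_scantuple = new_slot;    /* expose the tuple to the expression */
    result = pgoutput_row_filter_exec_expr(entry->exprstate[PUBACTION_INSERT],
                                           ecxt);
    ResetPerTupleExprContext(entry->estate);    /* drop per-tuple allocations */
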
     861             : 
     862             : /*
     863             :  * Evaluates row filter.
     864             :  *
     865             :  * If the row filter evaluates to NULL, it is taken as false i.e. the change
     866             :  * isn't replicated.
     867             :  */
     868             : static bool
     869          76 : pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
     870             : {
     871             :     Datum       ret;
     872             :     bool        isnull;
     873             : 
     874             :     Assert(state != NULL);
     875             : 
     876          76 :     ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
     877             : 
     878          76 :     elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
     879             :          isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
     880             :          isnull ? "true" : "false");
     881             : 
     882          76 :     if (isnull)
     883           2 :         return false;
     884             : 
     885          74 :     return DatumGetBool(ret);
     886             : }
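
The ExprState evaluated above is built once per cache entry from the publication's stored row-filter expression (pg_publication_rel.prqual). Below is a condensed, hypothetical sketch of that preparation; the full logic, including combining filters from multiple publications per action, is in pgoutput_row_filter_init further down:

    /* Hypothetical condensed form of the per-action filter preparation. */
    static ExprState *
    prepare_row_filter(char *rfexpr_text, EState *estate)
    {
        Expr       *expr;

        /* Deserialize the stored node tree, then simplify it (constant
         * folding, inlining of SQL functions) via the planner. */
        expr = (Expr *) stringToNode(rfexpr_text);
        expr = expression_planner(expr);

        /* Build the executable state consumed by
         * pgoutput_row_filter_exec_expr(). */
        return ExecPrepareExpr(expr, estate);
    }
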
     887             : 
     888             : /*
     889             :  * Make sure the per-entry memory context exists.
     890             :  */
     891             : static void
     892         622 : pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
     893             : {
     894             :     Relation    relation;
     895             : 
     896             :     /* The context may already exist, in which case bail out. */
     897         622 :     if (entry->entry_cxt)
     898          34 :         return;
     899             : 
     900         588 :     relation = RelationIdGetRelation(entry->publish_as_relid);
     901             : 
     902         588 :     entry->entry_cxt = AllocSetContextCreate(data->cachectx,
     903             :                                              "entry private context",
     904             :                                              ALLOCSET_SMALL_SIZES);
     905             : 
     906         588 :     MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
     907             :                                       RelationGetRelationName(relation));
     908             : }
     909             : 
     910             : /*
     911             :  * Initialize the row filter.
     912             :  */
     913             : static void
     914         588 : pgoutput_row_filter_init(PGOutputData *data, List *publications,
     915             :                          RelationSyncEntry *entry)
     916             : {
     917             :     ListCell   *lc;
     918         588 :     List       *rfnodes[] = {NIL, NIL, NIL};    /* One per pubaction */
     919         588 :     bool        no_filter[] = {false, false, false};    /* One per pubaction */
     920             :     MemoryContext oldctx;
     921             :     int         idx;
     922         588 :     bool        has_filter = true;
     923         588 :     Oid         schemaid = get_rel_namespace(entry->publish_as_relid);
     924             : 
     925             :     /*
     926             :      * Find if there are any row filters for this relation. If there are, then
     927             :      * prepare the necessary ExprState and cache it in entry->exprstate. To
     928             :      * build an expression state, we need to ensure the following:
     929             :      *
     930             :      * All the given publication-table mappings must be checked.
     931             :      *
     932             :      * Multiple publications might have multiple row filters for this
     933             :      * relation. Since row filter usage depends on the DML operation, there
     934             :      * are multiple lists (one for each operation) to which row filters will
     935             :      * be appended.
     936             :      *
      937             :      * FOR ALL TABLES and FOR TABLES IN SCHEMA imply "don't use row filter
      938             :      * expression", so they take precedence.
     939             :      */
     940         630 :     foreach(lc, publications)
     941             :     {
     942         596 :         Publication *pub = lfirst(lc);
     943         596 :         HeapTuple   rftuple = NULL;
     944         596 :         Datum       rfdatum = 0;
     945         596 :         bool        pub_no_filter = true;
     946             : 
     947             :         /*
     948             :          * If the publication is FOR ALL TABLES, or the publication includes a
     949             :          * FOR TABLES IN SCHEMA where the table belongs to the referred
     950             :          * schema, then it is treated the same as if there are no row filters
     951             :          * (even if other publications have a row filter).
     952             :          */
     953         596 :         if (!pub->alltables &&
     954         442 :             !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
     955             :                                    ObjectIdGetDatum(schemaid),
     956             :                                    ObjectIdGetDatum(pub->oid)))
     957             :         {
     958             :             /*
     959             :              * Check for the presence of a row filter in this publication.
     960             :              */
     961         430 :             rftuple = SearchSysCache2(PUBLICATIONRELMAP,
     962             :                                       ObjectIdGetDatum(entry->publish_as_relid),
     963             :                                       ObjectIdGetDatum(pub->oid));
     964             : 
     965         430 :             if (HeapTupleIsValid(rftuple))
     966             :             {
     967             :                 /* Null indicates no filter. */
     968         406 :                 rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
     969             :                                           Anum_pg_publication_rel_prqual,
     970             :                                           &pub_no_filter);
     971             :             }
     972             :         }
     973             : 
     974         596 :         if (pub_no_filter)
     975             :         {
     976         568 :             if (rftuple)
     977         378 :                 ReleaseSysCache(rftuple);
     978             : 
     979         568 :             no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
     980         568 :             no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
     981         568 :             no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
     982             : 
     983             :             /*
     984             :              * Quick exit if all the DML actions are publicized via this
     985             :              * publication.
     986             :              */
     987         568 :             if (no_filter[PUBACTION_INSERT] &&
     988         568 :                 no_filter[PUBACTION_UPDATE] &&
     989         554 :                 no_filter[PUBACTION_DELETE])
     990             :             {
     991         554 :                 has_filter = false;
     992         554 :                 break;
     993             :             }
     994             : 
     995             :             /* No additional work for this publication. Next one. */
     996          14 :             continue;
     997             :         }
     998             : 
     999             :         /* Form the per pubaction row filter lists. */
    1000          28 :         if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
    1001          28 :             rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
    1002          28 :                                                 TextDatumGetCString(rfdatum));
    1003          28 :         if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
    1004          28 :             rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
    1005          28 :                                                 TextDatumGetCString(rfdatum));
    1006          28 :         if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
    1007          28 :             rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
    1008          28 :                                                 TextDatumGetCString(rfdatum));
    1009             : 
    1010          28 :         ReleaseSysCache(rftuple);
    1011             :     }                           /* loop all subscribed publications */
    1012             : 
    1013             :     /* Clean the row filter */
    1014        2352 :     for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
    1015             :     {
    1016        1764 :         if (no_filter[idx])
    1017             :         {
    1018        1680 :             list_free_deep(rfnodes[idx]);
    1019        1680 :             rfnodes[idx] = NIL;
    1020             :         }
    1021             :     }
    1022             : 
    1023         588 :     if (has_filter)
    1024             :     {
    1025          34 :         Relation    relation = RelationIdGetRelation(entry->publish_as_relid);
    1026             : 
    1027          34 :         pgoutput_ensure_entry_cxt(data, entry);
    1028             : 
    1029             :         /*
    1030             :          * Now all the filters for all pubactions are known. Combine them when
    1031             :          * their pubactions are the same.
    1032             :          */
    1033          34 :         oldctx = MemoryContextSwitchTo(entry->entry_cxt);
    1034          34 :         entry->estate = create_estate_for_relation(relation);
    1035         136 :         for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
    1036             :         {
    1037         102 :             List       *filters = NIL;
    1038             :             Expr       *rfnode;
    1039             : 
    1040         102 :             if (rfnodes[idx] == NIL)
    1041          42 :                 continue;
    1042             : 
    1043         126 :             foreach(lc, rfnodes[idx])
    1044          66 :                 filters = lappend(filters, expand_generated_columns_in_expr(stringToNode((char *) lfirst(lc)), relation, 1));
    1045             : 
    1046             :             /* combine the row filter and cache the ExprState */
    1047          60 :             rfnode = make_orclause(filters);
    1048          60 :             entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
    1049             :         }                       /* for each pubaction */
    1050          34 :         MemoryContextSwitchTo(oldctx);
    1051             : 
    1052          34 :         RelationClose(relation);
    1053             :     }
    1054         588 : }
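                      : /*
                      :  * Editorial illustration (hypothetical names, not part of the original
                      :  * source): when one subscription combines several publications that each
                      :  * define a row filter for the same table, the filters gathered above are
                      :  * OR'ed per pubaction.  For example, with
                      :  *
                      :  *     CREATE PUBLICATION pub_a FOR TABLE t WHERE (a > 0);
                      :  *     CREATE PUBLICATION pub_b FOR TABLE t WHERE (b < 10);
                      :  *
                      :  * the cached ExprState behaves like WHERE (a > 0) OR (b < 10), while adding
                      :  * a FOR ALL TABLES publication (or one with no filter for t) makes the
                      :  * affected pubactions unfiltered altogether.
                      :  */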
    1055             : 
    1056             : /*
    1057             :  * If the table contains a generated column, check for any conflicting
    1058             :  * values of 'publish_generated_columns' parameter in the publications.
    1059             :  */
    1060             : static void
    1061         588 : check_and_init_gencol(PGOutputData *data, List *publications,
    1062             :                       RelationSyncEntry *entry)
    1063             : {
    1064         588 :     Relation    relation = RelationIdGetRelation(entry->publish_as_relid);
    1065         588 :     TupleDesc   desc = RelationGetDescr(relation);
    1066         588 :     bool        gencolpresent = false;
    1067         588 :     bool        first = true;
    1068             : 
    1069             :     /* Check if there is any generated column present. */
    1070        1786 :     for (int i = 0; i < desc->natts; i++)
    1071             :     {
    1072        1212 :         Form_pg_attribute att = TupleDescAttr(desc, i);
    1073             : 
    1074        1212 :         if (att->attgenerated)
    1075             :         {
    1076          14 :             gencolpresent = true;
    1077          14 :             break;
    1078             :         }
    1079             :     }
    1080             : 
    1081             :     /* There are no generated columns to be published. */
    1082         588 :     if (!gencolpresent)
    1083             :     {
    1084         574 :         entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
    1085         574 :         return;
    1086             :     }
    1087             : 
    1088             :     /*
    1089             :      * There may be a conflicting value for 'publish_generated_columns'
    1090             :      * parameter in the publications.
    1091             :      */
    1092          44 :     foreach_ptr(Publication, pub, publications)
    1093             :     {
    1094             :         /*
    1095             :          * The column list takes precedence over the
    1096             :          * 'publish_generated_columns' parameter. Those will be checked later,
    1097             :          * see pgoutput_column_list_init.
    1098             :          */
    1099          16 :         if (check_and_fetch_column_list(pub, entry->publish_as_relid, NULL, NULL))
    1100           6 :             continue;
    1101             : 
    1102          10 :         if (first)
    1103             :         {
    1104          10 :             entry->include_gencols_type = pub->pubgencols_type;
    1105          10 :             first = false;
    1106             :         }
    1107           0 :         else if (entry->include_gencols_type != pub->pubgencols_type)
    1108           0 :             ereport(ERROR,
    1109             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1110             :                     errmsg("cannot use different values of publish_generated_columns for table \"%s.%s\" in different publications",
    1111             :                            get_namespace_name(RelationGetNamespace(relation)),
    1112             :                            RelationGetRelationName(relation)));
    1113             :     }
    1114             : }
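                      : /*
                      :  * Editorial illustration (hypothetical names, not part of the original
                      :  * source): the error above is reached when a table with a generated column
                      :  * is subscribed through publications that disagree on the parameter, e.g.
                      :  *
                      :  *     CREATE PUBLICATION pub_gen  FOR TABLE t WITH (publish_generated_columns = stored);
                      :  *     CREATE PUBLICATION pub_none FOR TABLE t WITH (publish_generated_columns = none);
                      :  *
                      :  * Publications that define a column list for the table are skipped by this
                      :  * check because the column list decides which columns are sent.
                      :  */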
    1115             : 
    1116             : /*
    1117             :  * Initialize the column list.
    1118             :  */
    1119             : static void
    1120         588 : pgoutput_column_list_init(PGOutputData *data, List *publications,
    1121             :                           RelationSyncEntry *entry)
    1122             : {
    1123             :     ListCell   *lc;
    1124         588 :     bool        first = true;
    1125         588 :     Relation    relation = RelationIdGetRelation(entry->publish_as_relid);
    1126         588 :     bool        found_pub_collist = false;
    1127         588 :     Bitmapset  *relcols = NULL;
    1128             : 
    1129         588 :     pgoutput_ensure_entry_cxt(data, entry);
    1130             : 
    1131             :     /*
    1132             :      * Find if there are any column lists for this relation. If there are,
    1133             :      * build a bitmap using the column lists.
    1134             :      *
    1135             :      * Multiple publications might have multiple column lists for this
    1136             :      * relation.
    1137             :      *
    1138             :      * Note that we don't support the case where the column list is different
    1139             :      * for the same table when combining publications. See comments atop
    1140             :      * fetch_table_list. But one can later change the publication so we still
    1141             :      * need to check all the given publication-table mappings and report an
    1142             :      * error if any publications have a different column list.
    1143             :      */
    1144        1196 :     foreach(lc, publications)
    1145             :     {
    1146         610 :         Publication *pub = lfirst(lc);
    1147         610 :         Bitmapset  *cols = NULL;
    1148             : 
    1149             :         /* Retrieve the bitmap of columns for a column list publication. */
    1150         610 :         found_pub_collist |= check_and_fetch_column_list(pub,
    1151             :                                                          entry->publish_as_relid,
    1152             :                                                          entry->entry_cxt, &cols);
    1153             : 
    1154             :         /*
     1155             :      * For non-column list publications, e.g. TABLE (without a column
    1156             :          * list), ALL TABLES, or ALL TABLES IN SCHEMA, we consider all columns
    1157             :          * of the table (including generated columns when
    1158             :          * 'publish_generated_columns' parameter is true).
    1159             :          */
    1160         610 :         if (!cols)
    1161             :         {
    1162             :             /*
    1163             :              * Cache the table columns for the first publication with no
    1164             :              * specified column list to detect publication with a different
    1165             :              * column list.
    1166             :              */
    1167         532 :             if (!relcols && (list_length(publications) > 1))
    1168             :             {
    1169          18 :                 MemoryContext oldcxt = MemoryContextSwitchTo(entry->entry_cxt);
    1170             : 
    1171          18 :                 relcols = pub_form_cols_map(relation,
    1172             :                                             entry->include_gencols_type);
    1173          18 :                 MemoryContextSwitchTo(oldcxt);
    1174             :             }
    1175             : 
    1176         532 :             cols = relcols;
    1177             :         }
    1178             : 
    1179         610 :         if (first)
    1180             :         {
    1181         588 :             entry->columns = cols;
    1182         588 :             first = false;
    1183             :         }
    1184          22 :         else if (!bms_equal(entry->columns, cols))
    1185           2 :             ereport(ERROR,
    1186             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1187             :                     errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
    1188             :                            get_namespace_name(RelationGetNamespace(relation)),
    1189             :                            RelationGetRelationName(relation)));
    1190             :     }                           /* loop all subscribed publications */
    1191             : 
    1192             :     /*
    1193             :      * If no column list publications exist, columns to be published will be
    1194             :      * computed later according to the 'publish_generated_columns' parameter.
    1195             :      */
    1196         586 :     if (!found_pub_collist)
    1197         514 :         entry->columns = NULL;
    1198             : 
    1199         586 :     RelationClose(relation);
    1200         586 : }
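                      : /*
                      :  * Editorial illustration (hypothetical names, not part of the original
                      :  * source): combining publications is only allowed when they agree on the
                      :  * column list.  A subscription using both
                      :  *
                      :  *     CREATE PUBLICATION pub_ab FOR TABLE t (a, b);
                      :  *     CREATE PUBLICATION pub_ac FOR TABLE t (a, c);
                      :  *
                      :  * fails with the "cannot use different column lists" error above.  Mixing a
                      :  * column-list publication with one that has no list for t likewise fails
                      :  * unless the list happens to cover exactly the columns published for the
                      :  * whole table.
                      :  */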
    1201             : 
    1202             : /*
    1203             :  * Initialize the slot for storing new and old tuples, and build the map that
    1204             :  * will be used to convert the relation's tuples into the ancestor's format.
    1205             :  */
    1206             : static void
    1207         588 : init_tuple_slot(PGOutputData *data, Relation relation,
    1208             :                 RelationSyncEntry *entry)
    1209             : {
    1210             :     MemoryContext oldctx;
    1211             :     TupleDesc   oldtupdesc;
    1212             :     TupleDesc   newtupdesc;
    1213             : 
    1214         588 :     oldctx = MemoryContextSwitchTo(data->cachectx);
    1215             : 
    1216             :     /*
    1217             :      * Create tuple table slots. Create a copy of the TupleDesc as it needs to
    1218             :      * live as long as the cache remains.
    1219             :      */
    1220         588 :     oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
    1221         588 :     newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
    1222             : 
    1223         588 :     entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
    1224         588 :     entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
    1225             : 
    1226         588 :     MemoryContextSwitchTo(oldctx);
    1227             : 
    1228             :     /*
    1229             :      * Cache the map that will be used to convert the relation's tuples into
    1230             :      * the ancestor's format, if needed.
    1231             :      */
    1232         588 :     if (entry->publish_as_relid != RelationGetRelid(relation))
    1233             :     {
    1234          70 :         Relation    ancestor = RelationIdGetRelation(entry->publish_as_relid);
    1235          70 :         TupleDesc   indesc = RelationGetDescr(relation);
    1236          70 :         TupleDesc   outdesc = RelationGetDescr(ancestor);
    1237             : 
    1238             :         /* Map must live as long as the logical decoding context. */
    1239          70 :         oldctx = MemoryContextSwitchTo(data->cachectx);
    1240             : 
    1241          70 :         entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
    1242             : 
    1243          70 :         MemoryContextSwitchTo(oldctx);
    1244          70 :         RelationClose(ancestor);
    1245             :     }
    1246         588 : }
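                      : /*
                      :  * Editorial note (illustrative): the attribute map built above only matters
                      :  * when a partition is published via its root (publish_via_partition_root =
                      :  * true) and the partition's physical column order differs from the root's,
                      :  * e.g. after columns were dropped or the partition was attached later.
                      :  * build_attrmap_by_name_if_req() returns NULL when the descriptors already
                      :  * match, so no per-tuple conversion happens in the common case.
                      :  */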
    1247             : 
    1248             : /*
    1249             :  * Change is checked against the row filter if any.
    1250             :  *
    1251             :  * Returns true if the change is to be replicated, else false.
    1252             :  *
    1253             :  * For inserts, evaluate the row filter for new tuple.
    1254             :  * For deletes, evaluate the row filter for old tuple.
    1255             :  * For updates, evaluate the row filter for old and new tuple.
    1256             :  *
    1257             :  * For updates, if both evaluations are true, we allow sending the UPDATE and
    1258             :  * if both the evaluations are false, it doesn't replicate the UPDATE. Now, if
    1259             :  * only one of the tuples matches the row filter expression, we transform
    1260             :  * UPDATE to DELETE or INSERT to avoid any data inconsistency based on the
    1261             :  * following rules:
    1262             :  *
    1263             :  * Case 1: old-row (no match)    new-row (no match)  -> (drop change)
     1264             :  * Case 2: old-row (no match)    new-row (match)     -> INSERT
     1265             :  * Case 3: old-row (match)       new-row (no match)  -> DELETE
     1266             :  * Case 4: old-row (match)       new-row (match)     -> UPDATE
    1267             :  *
    1268             :  * The new action is updated in the action parameter.
    1269             :  *
    1270             :  * The new slot could be updated when transforming the UPDATE into INSERT,
    1271             :  * because the original new tuple might not have column values from the replica
    1272             :  * identity.
    1273             :  *
    1274             :  * Examples:
    1275             :  * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
    1276             :  * Since the old tuple satisfies, the initial table synchronization copied this
    1277             :  * row (or another method was used to guarantee that there is data
    1278             :  * consistency).  However, after the UPDATE the new tuple doesn't satisfy the
    1279             :  * row filter, so from a data consistency perspective, that row should be
    1280             :  * removed on the subscriber. The UPDATE should be transformed into a DELETE
    1281             :  * statement and be sent to the subscriber. Keeping this row on the subscriber
    1282             :  * is undesirable because it doesn't reflect what was defined in the row filter
    1283             :  * expression on the publisher. This row on the subscriber would likely not be
    1284             :  * modified by replication again. If someone inserted a new row with the same
    1285             :  * old identifier, replication could stop due to a constraint violation.
    1286             :  *
    1287             :  * Let's say the old tuple doesn't match the row filter but the new tuple does.
    1288             :  * Since the old tuple doesn't satisfy, the initial table synchronization
    1289             :  * probably didn't copy this row. However, after the UPDATE the new tuple does
    1290             :  * satisfy the row filter, so from a data consistency perspective, that row
    1291             :  * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
    1292             :  * statements have no effect (it matches no row -- see
     1293             :  * apply_handle_update_internal()). So, the UPDATE should be transformed into an
    1294             :  * INSERT statement and be sent to the subscriber. However, this might surprise
    1295             :  * someone who expects the data set to satisfy the row filter expression on the
    1296             :  * provider.
    1297             :  */
    1298             : static bool
    1299      364454 : pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
    1300             :                     TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
    1301             :                     ReorderBufferChangeType *action)
    1302             : {
    1303             :     TupleDesc   desc;
    1304             :     int         i;
    1305             :     bool        old_matched,
    1306             :                 new_matched,
    1307             :                 result;
    1308             :     TupleTableSlot *tmp_new_slot;
    1309      364454 :     TupleTableSlot *new_slot = *new_slot_ptr;
    1310             :     ExprContext *ecxt;
    1311             :     ExprState  *filter_exprstate;
    1312             : 
    1313             :     /*
    1314             :      * We need this map to avoid relying on ReorderBufferChangeType enums
    1315             :      * having specific values.
    1316             :      */
    1317             :     static const int map_changetype_pubaction[] = {
    1318             :         [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
    1319             :         [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
    1320             :         [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
    1321             :     };
    1322             : 
    1323             :     Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
    1324             :            *action == REORDER_BUFFER_CHANGE_UPDATE ||
    1325             :            *action == REORDER_BUFFER_CHANGE_DELETE);
    1326             : 
    1327             :     Assert(new_slot || old_slot);
    1328             : 
    1329             :     /* Get the corresponding row filter */
    1330      364454 :     filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
    1331             : 
    1332             :     /* Bail out if there is no row filter */
    1333      364454 :     if (!filter_exprstate)
    1334      364386 :         return true;
    1335             : 
    1336          68 :     elog(DEBUG3, "table \"%s.%s\" has row filter",
    1337             :          get_namespace_name(RelationGetNamespace(relation)),
    1338             :          RelationGetRelationName(relation));
    1339             : 
    1340          68 :     ResetPerTupleExprContext(entry->estate);
    1341             : 
    1342          68 :     ecxt = GetPerTupleExprContext(entry->estate);
    1343             : 
    1344             :     /*
    1345             :      * For the following occasions where there is only one tuple, we can
    1346             :      * evaluate the row filter for that tuple and return.
    1347             :      *
    1348             :      * For inserts, we only have the new tuple.
    1349             :      *
    1350             :      * For updates, we can have only a new tuple when none of the replica
    1351             :      * identity columns changed and none of those columns have external data
    1352             :      * but we still need to evaluate the row filter for the new tuple as the
    1353             :      * existing values of those columns might not match the filter. Also,
    1354             :      * users can use constant expressions in the row filter, so we anyway need
    1355             :      * to evaluate it for the new tuple.
    1356             :      *
    1357             :      * For deletes, we only have the old tuple.
    1358             :      */
    1359          68 :     if (!new_slot || !old_slot)
    1360             :     {
    1361          60 :         ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
    1362          60 :         result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
    1363             : 
    1364          60 :         return result;
    1365             :     }
    1366             : 
    1367             :     /*
    1368             :      * Both the old and new tuples must be valid only for updates and need to
    1369             :      * be checked against the row filter.
    1370             :      */
    1371             :     Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
    1372             : 
    1373           8 :     slot_getallattrs(new_slot);
    1374           8 :     slot_getallattrs(old_slot);
    1375             : 
    1376           8 :     tmp_new_slot = NULL;
    1377           8 :     desc = RelationGetDescr(relation);
    1378             : 
    1379             :     /*
    1380             :      * The new tuple might not have all the replica identity columns, in which
    1381             :      * case it needs to be copied over from the old tuple.
    1382             :      */
    1383          24 :     for (i = 0; i < desc->natts; i++)
    1384             :     {
    1385          16 :         CompactAttribute *att = TupleDescCompactAttr(desc, i);
    1386             : 
    1387             :         /*
    1388             :          * if the column in the new tuple or old tuple is null, nothing to do
    1389             :          */
    1390          16 :         if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
    1391           2 :             continue;
    1392             : 
    1393             :         /*
    1394             :          * Unchanged toasted replica identity columns are only logged in the
    1395             :          * old tuple. Copy this over to the new tuple. The changed (or WAL
    1396             :          * Logged) toast values are always assembled in memory and set as
    1397             :          * VARTAG_INDIRECT. See ReorderBufferToastReplace.
    1398             :          */
    1399          22 :         if (att->attlen == -1 &&
    1400           8 :             VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(new_slot->tts_values[i])) &&
    1401           2 :             !VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(old_slot->tts_values[i])))
    1402             :         {
    1403           2 :             if (!tmp_new_slot)
    1404             :             {
    1405           2 :                 tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
    1406           2 :                 ExecClearTuple(tmp_new_slot);
    1407             : 
    1408           2 :                 memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
    1409           2 :                        desc->natts * sizeof(Datum));
    1410           2 :                 memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
    1411           2 :                        desc->natts * sizeof(bool));
    1412             :             }
    1413             : 
    1414           2 :             tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
    1415           2 :             tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
    1416             :         }
    1417             :     }
    1418             : 
    1419           8 :     ecxt->ecxt_scantuple = old_slot;
    1420           8 :     old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
    1421             : 
    1422           8 :     if (tmp_new_slot)
    1423             :     {
    1424           2 :         ExecStoreVirtualTuple(tmp_new_slot);
    1425           2 :         ecxt->ecxt_scantuple = tmp_new_slot;
    1426             :     }
    1427             :     else
    1428           6 :         ecxt->ecxt_scantuple = new_slot;
    1429             : 
    1430           8 :     new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
    1431             : 
    1432             :     /*
     1433             :      * Case 1: if both tuples don't match the row filter, bail out. Send
    1434             :      * nothing.
    1435             :      */
    1436           8 :     if (!old_matched && !new_matched)
    1437           0 :         return false;
    1438             : 
    1439             :     /*
    1440             :      * Case 2: if the old tuple doesn't satisfy the row filter but the new
    1441             :      * tuple does, transform the UPDATE into INSERT.
    1442             :      *
    1443             :      * Use the newly transformed tuple that must contain the column values for
     1444             :      * all the replica identity columns. This is required to ensure that
    1445             :      * while inserting the tuple in the downstream node, we have all the
    1446             :      * required column values.
    1447             :      */
    1448           8 :     if (!old_matched && new_matched)
    1449             :     {
    1450           4 :         *action = REORDER_BUFFER_CHANGE_INSERT;
    1451             : 
    1452           4 :         if (tmp_new_slot)
    1453           2 :             *new_slot_ptr = tmp_new_slot;
    1454             :     }
    1455             : 
    1456             :     /*
    1457             :      * Case 3: if the old tuple satisfies the row filter but the new tuple
    1458             :      * doesn't, transform the UPDATE into DELETE.
    1459             :      *
     1460             :      * This transformation does not require another tuple. The old tuple will
    1461             :      * be used for DELETE.
    1462             :      */
    1463           4 :     else if (old_matched && !new_matched)
    1464           2 :         *action = REORDER_BUFFER_CHANGE_DELETE;
    1465             : 
    1466             :     /*
    1467             :      * Case 4: if both tuples match the row filter, transformation isn't
    1468             :      * required. (*action is default UPDATE).
    1469             :      */
    1470             : 
    1471           8 :     return true;
    1472             : }
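                      : /*
                      :  * Editorial illustration (hypothetical names, not part of the original
                      :  * source): a worked example of the transformation above, given
                      :  *
                      :  *     CREATE PUBLICATION pub_big FOR TABLE t WHERE (x > 10);
                      :  *
                      :  *     UPDATE t SET x = 5  WHERE x = 20;  -- old matches, new doesn't: DELETE (Case 3)
                      :  *     UPDATE t SET x = 20 WHERE x = 5;   -- old doesn't, new matches: INSERT (Case 2)
                      :  *     UPDATE t SET x = 30 WHERE x = 20;  -- both match: plain UPDATE   (Case 4)
                      :  *     UPDATE t SET x = 6  WHERE x = 5;   -- neither matches: change dropped (Case 1)
                      :  */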
    1473             : 
    1474             : /*
    1475             :  * Sends the decoded DML over wire.
    1476             :  *
    1477             :  * This is called both in streaming and non-streaming modes.
    1478             :  */
    1479             : static void
    1480      366760 : pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    1481             :                 Relation relation, ReorderBufferChange *change)
    1482             : {
    1483      366760 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1484      366760 :     PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
    1485             :     MemoryContext old;
    1486             :     RelationSyncEntry *relentry;
    1487      366760 :     TransactionId xid = InvalidTransactionId;
    1488      366760 :     Relation    ancestor = NULL;
    1489      366760 :     Relation    targetrel = relation;
    1490      366760 :     ReorderBufferChangeType action = change->action;
    1491      366760 :     TupleTableSlot *old_slot = NULL;
    1492      366760 :     TupleTableSlot *new_slot = NULL;
    1493             : 
    1494      366760 :     if (!is_publishable_relation(relation))
    1495        2304 :         return;
    1496             : 
    1497             :     /*
     1498             :      * Remember the xid for the change in streaming mode. We need to send the
     1499             :      * xid with each change in streaming mode so that the subscriber can
     1500             :      * associate the changes with the transaction and, on abort, discard the
     1501             :      * corresponding changes.
    1502             :      */
    1503      366760 :     if (data->in_streaming)
    1504      351842 :         xid = change->txn->xid;
    1505             : 
    1506      366760 :     relentry = get_rel_sync_entry(data, relation);
    1507             : 
    1508             :     /* First check the table filter */
    1509      366758 :     switch (action)
    1510             :     {
    1511      211946 :         case REORDER_BUFFER_CHANGE_INSERT:
    1512      211946 :             if (!relentry->pubactions.pubinsert)
    1513         132 :                 return;
    1514      211814 :             break;
    1515       68970 :         case REORDER_BUFFER_CHANGE_UPDATE:
    1516       68970 :             if (!relentry->pubactions.pubupdate)
    1517          88 :                 return;
    1518       68882 :             break;
    1519       85842 :         case REORDER_BUFFER_CHANGE_DELETE:
    1520       85842 :             if (!relentry->pubactions.pubdelete)
    1521        2084 :                 return;
    1522             : 
    1523             :             /*
    1524             :              * This is only possible if deletes are allowed even when replica
    1525             :              * identity is not defined for a table. Since the DELETE action
    1526             :              * can't be published, we simply return.
    1527             :              */
    1528       83758 :             if (!change->data.tp.oldtuple)
    1529             :             {
    1530           0 :                 elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
    1531           0 :                 return;
    1532             :             }
    1533       83758 :             break;
    1534      364454 :         default:
    1535             :             Assert(false);
    1536             :     }
    1537             : 
    1538             :     /* Avoid leaking memory by using and resetting our own context */
    1539      364454 :     old = MemoryContextSwitchTo(data->context);
    1540             : 
    1541             :     /* Switch relation if publishing via root. */
    1542      364454 :     if (relentry->publish_as_relid != RelationGetRelid(relation))
    1543             :     {
    1544             :         Assert(relation->rd_rel->relispartition);
    1545         136 :         ancestor = RelationIdGetRelation(relentry->publish_as_relid);
    1546         136 :         targetrel = ancestor;
    1547             :     }
    1548             : 
    1549      364454 :     if (change->data.tp.oldtuple)
    1550             :     {
    1551       84026 :         old_slot = relentry->old_slot;
    1552       84026 :         ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false);
    1553             : 
    1554             :         /* Convert tuple if needed. */
    1555       84026 :         if (relentry->attrmap)
    1556             :         {
    1557          10 :             TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
    1558             :                                                       &TTSOpsVirtual);
    1559             : 
    1560          10 :             old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
    1561             :         }
    1562             :     }
    1563             : 
    1564      364454 :     if (change->data.tp.newtuple)
    1565             :     {
    1566      280696 :         new_slot = relentry->new_slot;
    1567      280696 :         ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false);
    1568             : 
    1569             :         /* Convert tuple if needed. */
    1570      280696 :         if (relentry->attrmap)
    1571             :         {
    1572          38 :             TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
    1573             :                                                       &TTSOpsVirtual);
    1574             : 
    1575          38 :             new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
    1576             :         }
    1577             :     }
    1578             : 
    1579             :     /*
    1580             :      * Check row filter.
    1581             :      *
    1582             :      * Updates could be transformed to inserts or deletes based on the results
    1583             :      * of the row filter for old and new tuple.
    1584             :      */
    1585      364454 :     if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
    1586          24 :         goto cleanup;
    1587             : 
    1588             :     /*
    1589             :      * Send BEGIN if we haven't yet.
    1590             :      *
    1591             :      * We send the BEGIN message after ensuring that we will actually send the
    1592             :      * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
    1593             :      * transactions.
    1594             :      */
    1595      364430 :     if (txndata && !txndata->sent_begin_txn)
    1596         856 :         pgoutput_send_begin(ctx, txn);
    1597             : 
    1598             :     /*
    1599             :      * Schema should be sent using the original relation because it also sends
    1600             :      * the ancestor's relation.
    1601             :      */
    1602      364428 :     maybe_send_schema(ctx, change, relation, relentry);
    1603             : 
    1604      364428 :     OutputPluginPrepareWrite(ctx, true);
    1605             : 
    1606             :     /* Send the data */
    1607      364428 :     switch (action)
    1608             :     {
    1609      211792 :         case REORDER_BUFFER_CHANGE_INSERT:
    1610      211792 :             logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
    1611      211792 :                                     data->binary, relentry->columns,
    1612             :                                     relentry->include_gencols_type);
    1613      211792 :             break;
    1614       68876 :         case REORDER_BUFFER_CHANGE_UPDATE:
    1615       68876 :             logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
    1616       68876 :                                     new_slot, data->binary, relentry->columns,
    1617             :                                     relentry->include_gencols_type);
    1618       68876 :             break;
    1619       83760 :         case REORDER_BUFFER_CHANGE_DELETE:
    1620       83760 :             logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
    1621       83760 :                                     data->binary, relentry->columns,
    1622             :                                     relentry->include_gencols_type);
    1623       83760 :             break;
    1624      364428 :         default:
    1625             :             Assert(false);
    1626             :     }
    1627             : 
    1628      364428 :     OutputPluginWrite(ctx, true);
    1629             : 
    1630      364450 : cleanup:
    1631      364450 :     if (RelationIsValid(ancestor))
    1632             :     {
    1633         134 :         RelationClose(ancestor);
    1634         134 :         ancestor = NULL;
    1635             :     }
    1636             : 
    1637             :     /* Drop the new slots that were used to store the converted tuples. */
    1638      364450 :     if (relentry->attrmap)
    1639             :     {
    1640          48 :         if (old_slot)
    1641          10 :             ExecDropSingleTupleTableSlot(old_slot);
    1642             : 
    1643          48 :         if (new_slot)
    1644          38 :             ExecDropSingleTupleTableSlot(new_slot);
    1645             :     }
    1646             : 
    1647      364450 :     MemoryContextSwitchTo(old);
    1648      364450 :     MemoryContextReset(data->context);
    1649             : }
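                      : /*
                      :  * Editorial note (illustrative): because BEGIN is only emitted once a change
                      :  * has survived the publishable-relation, pubaction and row-filter checks
                      :  * above, a transaction whose changes are all filtered out never produces a
                      :  * BEGIN/COMMIT pair on the wire; the commit callback simply skips it.
                      :  */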
    1650             : 
    1651             : static void
    1652          30 : pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    1653             :                   int nrelations, Relation relations[], ReorderBufferChange *change)
    1654             : {
    1655          30 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1656          30 :     PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
    1657             :     MemoryContext old;
    1658             :     RelationSyncEntry *relentry;
    1659             :     int         i;
    1660             :     int         nrelids;
    1661             :     Oid        *relids;
    1662          30 :     TransactionId xid = InvalidTransactionId;
    1663             : 
    1664             :     /* Remember the xid for the change in streaming mode. See pgoutput_change. */
    1665          30 :     if (data->in_streaming)
    1666           0 :         xid = change->txn->xid;
    1667             : 
    1668          30 :     old = MemoryContextSwitchTo(data->context);
    1669             : 
    1670          30 :     relids = palloc0(nrelations * sizeof(Oid));
    1671          30 :     nrelids = 0;
    1672             : 
    1673          98 :     for (i = 0; i < nrelations; i++)
    1674             :     {
    1675          68 :         Relation    relation = relations[i];
    1676          68 :         Oid         relid = RelationGetRelid(relation);
    1677             : 
    1678          68 :         if (!is_publishable_relation(relation))
    1679           0 :             continue;
    1680             : 
    1681          68 :         relentry = get_rel_sync_entry(data, relation);
    1682             : 
    1683          68 :         if (!relentry->pubactions.pubtruncate)
    1684          28 :             continue;
    1685             : 
    1686             :         /*
    1687             :          * Don't send partitions if the publication wants to send only the
    1688             :          * root tables through it.
    1689             :          */
    1690          40 :         if (relation->rd_rel->relispartition &&
    1691          30 :             relentry->publish_as_relid != relid)
    1692           6 :             continue;
    1693             : 
    1694          34 :         relids[nrelids++] = relid;
    1695             : 
    1696             :         /* Send BEGIN if we haven't yet */
    1697          34 :         if (txndata && !txndata->sent_begin_txn)
    1698          22 :             pgoutput_send_begin(ctx, txn);
    1699             : 
    1700          34 :         maybe_send_schema(ctx, change, relation, relentry);
    1701             :     }
    1702             : 
    1703          30 :     if (nrelids > 0)
    1704             :     {
    1705          22 :         OutputPluginPrepareWrite(ctx, true);
    1706          22 :         logicalrep_write_truncate(ctx->out,
    1707             :                                   xid,
    1708             :                                   nrelids,
    1709             :                                   relids,
    1710          22 :                                   change->data.truncate.cascade,
    1711          22 :                                   change->data.truncate.restart_seqs);
    1712          22 :         OutputPluginWrite(ctx, true);
    1713             :     }
    1714             : 
    1715          30 :     MemoryContextSwitchTo(old);
    1716          30 :     MemoryContextReset(data->context);
    1717          30 : }
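                      : /*
                      :  * Editorial note (illustrative): with publish_via_partition_root = true, a
                      :  * TRUNCATE that cascades into leaf partitions is reported with only the root
                      :  * table's OID; the leaves are filtered out by the relispartition check above,
                      :  * so the subscriber truncates its own copy of the root table.
                      :  */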
    1718             : 
    1719             : static void
    1720          14 : pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    1721             :                  XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
    1722             :                  const char *message)
    1723             : {
    1724          14 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1725          14 :     TransactionId xid = InvalidTransactionId;
    1726             : 
    1727          14 :     if (!data->messages)
    1728           4 :         return;
    1729             : 
    1730             :     /*
    1731             :      * Remember the xid for the message in streaming mode. See
    1732             :      * pgoutput_change.
    1733             :      */
    1734          10 :     if (data->in_streaming)
    1735           0 :         xid = txn->xid;
    1736             : 
    1737             :     /*
    1738             :      * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
    1739             :      */
    1740          10 :     if (transactional)
    1741             :     {
    1742           4 :         PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
    1743             : 
    1744             :         /* Send BEGIN if we haven't yet */
    1745           4 :         if (txndata && !txndata->sent_begin_txn)
    1746           4 :             pgoutput_send_begin(ctx, txn);
    1747             :     }
    1748             : 
    1749          10 :     OutputPluginPrepareWrite(ctx, true);
    1750          10 :     logicalrep_write_message(ctx->out,
    1751             :                              xid,
    1752             :                              message_lsn,
    1753             :                              transactional,
    1754             :                              prefix,
    1755             :                              sz,
    1756             :                              message);
    1757          10 :     OutputPluginWrite(ctx, true);
    1758             : }
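                      : /*
                      :  * Editorial illustration (hypothetical prefix/payload, not part of the
                      :  * original source): decoding messages originate from
                      :  * pg_logical_emit_message(), e.g.
                      :  *
                      :  *     SELECT pg_logical_emit_message(true,  'myapp', 'transactional payload');
                      :  *     SELECT pg_logical_emit_message(false, 'myapp', 'immediate payload');
                      :  *
                      :  * They are only forwarded when the subscriber enabled the 'messages' output
                      :  * plugin option checked above; a non-transactional message is sent right
                      :  * away and never causes a BEGIN to be emitted.
                      :  */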
    1759             : 
    1760             : /*
    1761             :  * Return true if the data is associated with an origin and the user has
    1762             :  * requested the changes that don't have an origin, false otherwise.
    1763             :  */
    1764             : static bool
    1765      983050 : pgoutput_origin_filter(LogicalDecodingContext *ctx,
    1766             :                        RepOriginId origin_id)
    1767             : {
    1768      983050 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1769             : 
    1770      983050 :     if (data->publish_no_origin && origin_id != InvalidRepOriginId)
    1771         296 :         return true;
    1772             : 
    1773      982754 :     return false;
    1774             : }
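                      : /*
                      :  * Editorial illustration (hypothetical names, not part of the original
                      :  * source): publish_no_origin is set when the subscription requests only
                      :  * locally originated changes, e.g.
                      :  *
                      :  *     CREATE SUBSCRIPTION sub CONNECTION '...' PUBLICATION pub WITH (origin = none);
                      :  *
                      :  * Returning true here filters out changes that carry a replication origin,
                      :  * which is how bidirectional setups avoid re-publishing rows they have just
                      :  * applied from the other node.
                      :  */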
    1775             : 
    1776             : /*
    1777             :  * Shutdown the output plugin.
    1778             :  *
    1779             :  * Note, we don't need to clean the data->context, data->cachectx, and
    1780             :  * data->pubctx as they are child contexts of the ctx->context so they
    1781             :  * will be cleaned up by logical decoding machinery.
    1782             :  */
    1783             : static void
    1784        1034 : pgoutput_shutdown(LogicalDecodingContext *ctx)
    1785             : {
    1786        1034 :     pgoutput_memory_context_reset(NULL);
    1787        1034 : }
    1788             : 
    1789             : /*
    1790             :  * Load publications from the list of publication names.
    1791             :  *
    1792             :  * Here, we skip the publications that don't exist yet. This will allow us
     1793             :  * to silently continue the replication even if a specified publication is missing.
    1794             :  * This is required because we allow the users to create publications after they
    1795             :  * have specified the required publications at the time of replication start.
    1796             :  */
    1797             : static List *
    1798         376 : LoadPublications(List *pubnames)
    1799             : {
    1800         376 :     List       *result = NIL;
    1801             :     ListCell   *lc;
    1802             : 
    1803         842 :     foreach(lc, pubnames)
    1804             :     {
    1805         466 :         char       *pubname = (char *) lfirst(lc);
    1806         466 :         Publication *pub = GetPublicationByName(pubname, true);
    1807             : 
    1808         466 :         if (pub)
    1809         462 :             result = lappend(result, pub);
    1810             :         else
    1811           4 :             ereport(WARNING,
    1812             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1813             :                     errmsg("skipped loading publication \"%s\"", pubname),
    1814             :                     errdetail("The publication does not exist at this point in the WAL."),
    1815             :                     errhint("Create the publication if it does not exist."));
    1816             :     }
    1817             : 
    1818         376 :     return result;
    1819             : }
    1820             : 
    1821             : /*
    1822             :  * Publication syscache invalidation callback.
    1823             :  *
    1824             :  * Called for invalidations on pg_publication.
    1825             :  */
    1826             : static void
    1827         616 : publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
    1828             : {
    1829         616 :     publications_valid = false;
    1830         616 : }
    1831             : 
    1832             : /*
    1833             :  * START STREAM callback
    1834             :  */
    1835             : static void
    1836        1234 : pgoutput_stream_start(struct LogicalDecodingContext *ctx,
    1837             :                       ReorderBufferTXN *txn)
    1838             : {
    1839        1234 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1840        1234 :     bool        send_replication_origin = txn->origin_id != InvalidRepOriginId;
    1841             : 
    1842             :     /* we can't nest streaming of transactions */
    1843             :     Assert(!data->in_streaming);
    1844             : 
    1845             :     /*
    1846             :      * If we already sent the first stream for this transaction then don't
    1847             :      * send the origin id in the subsequent streams.
    1848             :      */
    1849        1234 :     if (rbtxn_is_streamed(txn))
    1850        1116 :         send_replication_origin = false;
    1851             : 
    1852        1234 :     OutputPluginPrepareWrite(ctx, !send_replication_origin);
    1853        1234 :     logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
    1854             : 
    1855        1234 :     send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
    1856             :                      send_replication_origin);
    1857             : 
    1858        1234 :     OutputPluginWrite(ctx, true);
    1859             : 
    1860             :     /* we're streaming a chunk of transaction now */
    1861        1234 :     data->in_streaming = true;
    1862        1234 : }
    1863             : 
    1864             : /*
    1865             :  * STOP STREAM callback
    1866             :  */
    1867             : static void
    1868        1234 : pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
    1869             :                      ReorderBufferTXN *txn)
    1870             : {
    1871        1234 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1872             : 
    1873             :     /* we should be streaming a transaction */
    1874             :     Assert(data->in_streaming);
    1875             : 
    1876        1234 :     OutputPluginPrepareWrite(ctx, true);
    1877        1234 :     logicalrep_write_stream_stop(ctx->out);
    1878        1234 :     OutputPluginWrite(ctx, true);
    1879             : 
    1880             :     /* we've stopped streaming a transaction */
    1881        1234 :     data->in_streaming = false;
    1882        1234 : }
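                      : /*
                      :  * Editorial note (illustrative): with streaming enabled on the subscription
                      :  * (e.g. ALTER SUBSCRIPTION sub SET (streaming = parallel)), a large
                      :  * in-progress transaction reaches the wire as one or more chunks framed by
                      :  * the stream start/stop messages above, and is finally resolved by
                      :  * stream_commit, stream_abort, or stream_prepare below.
                      :  */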
    1883             : 
    1884             : /*
    1885             :  * Notify downstream to discard the streamed transaction (along with all
    1886             :  * its subtransactions, if it's a toplevel transaction).
    1887             :  */
    1888             : static void
    1889          52 : pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
    1890             :                       ReorderBufferTXN *txn,
    1891             :                       XLogRecPtr abort_lsn)
    1892             : {
    1893             :     ReorderBufferTXN *toptxn;
    1894          52 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1895          52 :     bool        write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
    1896             : 
    1897             :     /*
    1898             :      * The abort should happen outside streaming block, even for streamed
    1899             :      * transactions. The transaction has to be marked as streamed, though.
    1900             :      */
    1901             :     Assert(!data->in_streaming);
    1902             : 
    1903             :     /* determine the toplevel transaction */
    1904          52 :     toptxn = rbtxn_get_toptxn(txn);
    1905             : 
    1906             :     Assert(rbtxn_is_streamed(toptxn));
    1907             : 
    1908          52 :     OutputPluginPrepareWrite(ctx, true);
    1909          52 :     logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
    1910             :                                   txn->abort_time, write_abort_info);
    1911             : 
    1912          52 :     OutputPluginWrite(ctx, true);
    1913             : 
    1914          52 :     cleanup_rel_sync_cache(toptxn->xid, false);
    1915          52 : }
    1916             : 
    1917             : /*
    1918             :  * Notify downstream to apply the streamed transaction (along with all
    1919             :  * its subtransactions).
    1920             :  */
    1921             : static void
    1922          90 : pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
    1923             :                        ReorderBufferTXN *txn,
    1924             :                        XLogRecPtr commit_lsn)
    1925             : {
    1926          90 :     PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private;
    1927             : 
    1928             :     /*
    1929             :      * The commit should happen outside streaming block, even for streamed
    1930             :      * transactions. The transaction has to be marked as streamed, though.
    1931             :      */
    1932             :     Assert(!data->in_streaming);
    1933             :     Assert(rbtxn_is_streamed(txn));
    1934             : 
    1935          90 :     OutputPluginUpdateProgress(ctx, false);
    1936             : 
    1937          90 :     OutputPluginPrepareWrite(ctx, true);
    1938          90 :     logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
    1939          90 :     OutputPluginWrite(ctx, true);
    1940             : 
    1941          90 :     cleanup_rel_sync_cache(txn->xid, true);
    1942          90 : }
    1943             : 
    1944             : /*
    1945             :  * PREPARE callback (for streaming two-phase commit).
    1946             :  *
    1947             :  * Notify the downstream to prepare the transaction.
    1948             :  */
    1949             : static void
    1950          22 : pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
    1951             :                             ReorderBufferTXN *txn,
    1952             :                             XLogRecPtr prepare_lsn)
    1953             : {
    1954             :     Assert(rbtxn_is_streamed(txn));
    1955             : 
    1956          22 :     OutputPluginUpdateProgress(ctx, false);
    1957          22 :     OutputPluginPrepareWrite(ctx, true);
    1958          22 :     logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
    1959          22 :     OutputPluginWrite(ctx, true);
    1960          22 : }
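
Each of the streaming callbacks above emits exactly one protocol message, bracketed by OutputPluginPrepareWrite() and OutputPluginWrite() with last_write = true. A minimal sketch of that pattern for a hypothetical plugin message (emit_plugin_message and its msg argument are illustrative, not functions in this file):

    static void
    emit_plugin_message(LogicalDecodingContext *ctx, const char *msg)
    {
        /* Let the output layer reserve space for its message header. */
        OutputPluginPrepareWrite(ctx, true);

        /* ctx->out is a StringInfo; append the message body to it. */
        appendStringInfoString(ctx->out, msg);

        /* Hand the completed message to the walsender / SQL consumer. */
        OutputPluginWrite(ctx, true);
    }
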
    1961             : 
    1962             : /*
    1963             :  * Initialize the relation schema sync cache for a decoding session.
    1964             :  *
    1965             :  * The hash table is destroyed at the end of a decoding session. While
    1966             :  * relcache invalidation callbacks remain registered and will still be
    1967             :  * invoked, they will just see the NULL hash table global and take no action.
    1968             :  */
    1969             : static void
    1970         792 : init_rel_sync_cache(MemoryContext cachectx)
    1971             : {
    1972             :     HASHCTL     ctl;
    1973             :     static bool relation_callbacks_registered = false;
    1974             : 
    1975             :     /* Nothing to do if hash table already exists */
    1976         792 :     if (RelationSyncCache != NULL)
    1977           4 :         return;
    1978             : 
    1979             :     /* Make a new hash table for the cache */
    1980         792 :     ctl.keysize = sizeof(Oid);
    1981         792 :     ctl.entrysize = sizeof(RelationSyncEntry);
    1982         792 :     ctl.hcxt = cachectx;
    1983             : 
    1984         792 :     RelationSyncCache = hash_create("logical replication output relation cache",
    1985             :                                     128, &ctl,
    1986             :                                     HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
    1987             : 
    1988             :     Assert(RelationSyncCache != NULL);
    1989             : 
    1990             :     /* No more to do if we already registered callbacks */
    1991         792 :     if (relation_callbacks_registered)
    1992           4 :         return;
    1993             : 
    1994             :     /* We must update the cache entry for a relation after a relcache flush */
    1995         788 :     CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
    1996             : 
    1997             :     /*
    1998             :      * Flush all cache entries after a pg_namespace change, in case it was a
    1999             :      * schema rename affecting a relation being replicated.
    2000             :      *
    2001             :      * XXX: It is not a good idea to invalidate all the relation entries in
    2002             :      * RelationSyncCache on schema rename. We can optimize it to invalidate
    2003             :      * only the required relations by either having a specific invalidation
    2004             :      * message containing impacted relations or by having schema information
    2005             :      * in each RelationSyncCache entry and using hashvalue of pg_namespace.oid
    2006             :      * passed to the callback.
    2007             :      */
    2008         788 :     CacheRegisterSyscacheCallback(NAMESPACEOID,
    2009             :                                   rel_sync_cache_publication_cb,
    2010             :                                   (Datum) 0);
    2011             : 
    2012         788 :     relation_callbacks_registered = true;
    2013             : }
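
The cache above is an ordinary backend dynahash keyed by relation OID. For reference, a minimal sketch of the same creation pattern for a hypothetical OID-keyed cache (MyCacheEntry, my_cache and init_my_cache are illustrative names, not part of this file):

    typedef struct MyCacheEntry
    {
        Oid         relid;      /* hash key; must be the first field */
        bool        valid;      /* cached state, recomputed after invalidation */
    } MyCacheEntry;

    static HTAB *my_cache = NULL;

    static void
    init_my_cache(MemoryContext cachectx)
    {
        HASHCTL     ctl;

        ctl.keysize = sizeof(Oid);
        ctl.entrysize = sizeof(MyCacheEntry);
        ctl.hcxt = cachectx;    /* entries live in the caller-supplied context */

        /* HASH_BLOBS: the key is a fixed-size binary value, not a C string */
        my_cache = hash_create("my relation cache", 128, &ctl,
                               HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
    }
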
    2014             : 
    2015             : /*
    2016             :  * We expect a relatively small number of streamed transactions.
    2017             :  */
    2018             : static bool
    2019      351842 : get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
    2020             : {
    2021      351842 :     return list_member_xid(entry->streamed_txns, xid);
    2022             : }
    2023             : 
    2024             : /*
    2025             :  * Add the xid to the rel sync entry for which we have already sent the schema
    2026             :  * of the relation.
    2027             :  */
    2028             : static void
    2029         134 : set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
    2030             : {
    2031             :     MemoryContext oldctx;
    2032             : 
    2033         134 :     oldctx = MemoryContextSwitchTo(CacheMemoryContext);
    2034             : 
    2035         134 :     entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
    2036             : 
    2037         134 :     MemoryContextSwitchTo(oldctx);
    2038         134 : }
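
These two helpers are used as a pair: before a change is sent, the plugin checks whether the relation's schema has already gone out for the current streamed transaction, and records the xid once it has. A rough sketch of that call pattern (send_relation_schema is a hypothetical stand-in for the schema-sending code elsewhere in this file):

    /* Sketch: decide whether to (re)send the schema for a change that is
     * part of streamed transaction "xid". */
    if (data->in_streaming)
    {
        if (!get_schema_sent_in_streamed_txn(relentry, xid))
        {
            send_relation_schema(ctx, relation, relentry);  /* hypothetical */
            set_schema_sent_in_streamed_txn(relentry, xid);
        }
    }
    else if (!relentry->schema_sent)
    {
        send_relation_schema(ctx, relation, relentry);      /* hypothetical */
        relentry->schema_sent = true;
    }
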
    2039             : 
    2040             : /*
    2041             :  * Find or create entry in the relation schema cache.
    2042             :  *
    2043             :  * This looks up publications that the given relation is directly or
    2044             :  * indirectly part of (the latter if it's really the relation's ancestor that
    2045             :  * is part of a publication) and fills in the found entry with the information
    2046             :  * about which operations to publish and whether to use an ancestor's schema
    2047             :  * when publishing.
    2048             :  */
    2049             : static RelationSyncEntry *
    2050      366828 : get_rel_sync_entry(PGOutputData *data, Relation relation)
    2051             : {
    2052             :     RelationSyncEntry *entry;
    2053             :     bool        found;
    2054             :     MemoryContext oldctx;
    2055      366828 :     Oid         relid = RelationGetRelid(relation);
    2056             : 
    2057             :     Assert(RelationSyncCache != NULL);
    2058             : 
    2059             :     /* Find cached relation info, creating if not found */
    2060      366828 :     entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
    2061             :                                               &relid,
    2062             :                                               HASH_ENTER, &found);
    2063             :     Assert(entry != NULL);
    2064             : 
    2065             :     /* initialize entry, if it's new */
    2066      366828 :     if (!found)
    2067             :     {
    2068         562 :         entry->replicate_valid = false;
    2069         562 :         entry->schema_sent = false;
    2070         562 :         entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
    2071         562 :         entry->streamed_txns = NIL;
    2072         562 :         entry->pubactions.pubinsert = entry->pubactions.pubupdate =
    2073         562 :             entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
    2074         562 :         entry->new_slot = NULL;
    2075         562 :         entry->old_slot = NULL;
    2076         562 :         memset(entry->exprstate, 0, sizeof(entry->exprstate));
    2077         562 :         entry->entry_cxt = NULL;
    2078         562 :         entry->publish_as_relid = InvalidOid;
    2079         562 :         entry->columns = NULL;
    2080         562 :         entry->attrmap = NULL;
    2081             :     }
    2082             : 
    2083             :     /* Validate the entry */
    2084      366828 :     if (!entry->replicate_valid)
    2085             :     {
    2086         708 :         Oid         schemaId = get_rel_namespace(relid);
    2087         708 :         List       *pubids = GetRelationPublications(relid);
    2088             : 
    2089             :         /*
    2090             :          * We don't acquire a lock on the namespace system table as we build
    2091             :          * the cache entry using a historic snapshot and all the later changes
    2092             :          * are absorbed while decoding WAL.
    2093             :          */
    2094         708 :         List       *schemaPubids = GetSchemaPublications(schemaId);
    2095             :         ListCell   *lc;
    2096         708 :         Oid         publish_as_relid = relid;
    2097         708 :         int         publish_ancestor_level = 0;
    2098         708 :         bool        am_partition = get_rel_relispartition(relid);
    2099         708 :         char        relkind = get_rel_relkind(relid);
    2100         708 :         List       *rel_publications = NIL;
    2101             : 
    2102             :         /* Reload publications if needed before use. */
    2103         708 :         if (!publications_valid)
    2104             :         {
    2105         376 :             MemoryContextReset(data->pubctx);
    2106             : 
    2107         376 :             oldctx = MemoryContextSwitchTo(data->pubctx);
    2108         376 :             data->publications = LoadPublications(data->publication_names);
    2109         376 :             MemoryContextSwitchTo(oldctx);
    2110         376 :             publications_valid = true;
    2111             :         }
    2112             : 
    2113             :         /*
    2114             :          * Reset schema_sent status as the relation definition may have
    2115             :          * changed.  Also reset pubactions to empty in case rel was dropped
    2116             :          * from a publication.  Also free any objects that depended on the
    2117             :          * earlier definition.
    2118             :          */
    2119         708 :         entry->schema_sent = false;
    2120         708 :         entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
    2121         708 :         list_free(entry->streamed_txns);
    2122         708 :         entry->streamed_txns = NIL;
    2123         708 :         bms_free(entry->columns);
    2124         708 :         entry->columns = NULL;
    2125         708 :         entry->pubactions.pubinsert = false;
    2126         708 :         entry->pubactions.pubupdate = false;
    2127         708 :         entry->pubactions.pubdelete = false;
    2128         708 :         entry->pubactions.pubtruncate = false;
    2129             : 
    2130             :         /*
    2131             :          * Tuple slot cleanup. (The slots will be rebuilt later if needed.)
    2132             :          */
    2133         708 :         if (entry->old_slot)
    2134             :         {
    2135         118 :             TupleDesc   desc = entry->old_slot->tts_tupleDescriptor;
    2136             : 
    2137             :             Assert(desc->tdrefcount == -1);
    2138             : 
    2139         118 :             ExecDropSingleTupleTableSlot(entry->old_slot);
    2140             : 
    2141             :             /*
    2142             :              * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
    2143             :              * do it now to avoid any leaks.
    2144             :              */
    2145         118 :             FreeTupleDesc(desc);
    2146             :         }
    2147         708 :         if (entry->new_slot)
    2148             :         {
    2149         118 :             TupleDesc   desc = entry->new_slot->tts_tupleDescriptor;
    2150             : 
    2151             :             Assert(desc->tdrefcount == -1);
    2152             : 
    2153         118 :             ExecDropSingleTupleTableSlot(entry->new_slot);
    2154             : 
    2155             :             /*
    2156             :              * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
    2157             :              * do it now to avoid any leaks.
    2158             :              */
    2159         118 :             FreeTupleDesc(desc);
    2160             :         }
    2161             : 
    2162         708 :         entry->old_slot = NULL;
    2163         708 :         entry->new_slot = NULL;
    2164             : 
    2165         708 :         if (entry->attrmap)
    2166           6 :             free_attrmap(entry->attrmap);
    2167         708 :         entry->attrmap = NULL;
    2168             : 
    2169             :         /*
    2170             :          * Row filter cache cleanups.
    2171             :          */
    2172         708 :         if (entry->entry_cxt)
    2173         118 :             MemoryContextDelete(entry->entry_cxt);
    2174             : 
    2175         708 :         entry->entry_cxt = NULL;
    2176         708 :         entry->estate = NULL;
    2177         708 :         memset(entry->exprstate, 0, sizeof(entry->exprstate));
    2178             : 
    2179             :         /*
    2180             :          * Build publication cache. We can't use one provided by relcache as
    2181             :          * relcache considers all publications that the given relation is in,
    2182             :          * but here we only need to consider ones that the subscriber
    2183             :          * requested.
    2184             :          */
    2185        1746 :         foreach(lc, data->publications)
    2186             :         {
    2187        1038 :             Publication *pub = lfirst(lc);
    2188        1038 :             bool        publish = false;
    2189             : 
    2190             :             /*
    2191             :              * Under what relid should we publish changes in this publication?
    2192             :              * We'll use the top-most relid across all publications. Also
    2193             :              * track the ancestor level for this publication.
    2194             :              */
    2195        1038 :             Oid         pub_relid = relid;
    2196        1038 :             int         ancestor_level = 0;
    2197             : 
    2198             :             /*
    2199             :              * If this is a FOR ALL TABLES publication, pick the partition
    2200             :              * root and set the ancestor level accordingly.
    2201             :              */
    2202        1038 :             if (pub->alltables)
    2203             :             {
    2204         158 :                 publish = true;
    2205         158 :                 if (pub->pubviaroot && am_partition)
    2206             :                 {
    2207          22 :                     List       *ancestors = get_partition_ancestors(relid);
    2208             : 
    2209          22 :                     pub_relid = llast_oid(ancestors);
    2210          22 :                     ancestor_level = list_length(ancestors);
    2211             :                 }
    2212             :             }
    2213             : 
    2214        1038 :             if (!publish)
    2215             :             {
    2216         880 :                 bool        ancestor_published = false;
    2217             : 
    2218             :                 /*
    2219             :                  * For a partition, check if any of the ancestors are
    2220             :                  * published.  If so, note down the topmost ancestor that is
    2221             :                  * published via this publication, which will be used as the
    2222             :                  * relation via which to publish the partition's changes.
    2223             :                  */
    2224         880 :                 if (am_partition)
    2225             :                 {
    2226             :                     Oid         ancestor;
    2227             :                     int         level;
    2228         242 :                     List       *ancestors = get_partition_ancestors(relid);
    2229             : 
    2230         242 :                     ancestor = GetTopMostAncestorInPublication(pub->oid,
    2231             :                                                                ancestors,
    2232             :                                                                &level);
    2233             : 
    2234         242 :                     if (ancestor != InvalidOid)
    2235             :                     {
    2236          96 :                         ancestor_published = true;
    2237          96 :                         if (pub->pubviaroot)
    2238             :                         {
    2239          50 :                             pub_relid = ancestor;
    2240          50 :                             ancestor_level = level;
    2241             :                         }
    2242             :                     }
    2243             :                 }
    2244             : 
    2245        1364 :                 if (list_member_oid(pubids, pub->oid) ||
    2246         956 :                     list_member_oid(schemaPubids, pub->oid) ||
    2247             :                     ancestor_published)
    2248         464 :                     publish = true;
    2249             :             }
    2250             : 
    2251             :             /*
    2252             :              * If the relation is to be published, determine actions to
    2253             :              * publish, and list of columns, if appropriate.
    2254             :              *
    2255             :              * Don't publish changes for partitioned tables, because
    2256             :              * publishing those of their partitions suffices, unless partition
    2257             :              * changes won't be published due to pubviaroot being set.
    2258             :              */
    2259        1038 :             if (publish &&
    2260           8 :                 (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
    2261             :             {
    2262         616 :                 entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
    2263         616 :                 entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
    2264         616 :                 entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
    2265         616 :                 entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
    2266             : 
    2267             :                 /*
    2268             :                  * We want to publish the changes as the top-most ancestor
    2269             :                  * across all publications. So we need to check if the already
    2270             :                  * calculated level is higher than the new one. If yes, we can
    2271             :                  * ignore the new value (as it's a child). Otherwise the new
    2272             :                  * value is an ancestor, so we keep it.
    2273             :                  */
    2274         616 :                 if (publish_ancestor_level > ancestor_level)
    2275           2 :                     continue;
    2276             : 
    2277             :                 /*
    2278             :                  * If we found an ancestor higher up in the tree, discard the
    2279             :                  * list of publications through which we replicate it, and use
    2280             :                  * the new ancestor.
    2281             :                  */
    2282         614 :                 if (publish_ancestor_level < ancestor_level)
    2283             :                 {
    2284          72 :                     publish_as_relid = pub_relid;
    2285          72 :                     publish_ancestor_level = ancestor_level;
    2286             : 
    2287             :                     /* reset the publication list for this relation */
    2288          72 :                     rel_publications = NIL;
    2289             :                 }
    2290             :                 else
    2291             :                 {
    2292             :                     /* Same ancestor level, has to be the same OID. */
    2293             :                     Assert(publish_as_relid == pub_relid);
    2294             :                 }
    2295             : 
    2296             :                 /* Track publications for this ancestor. */
    2297         614 :                 rel_publications = lappend(rel_publications, pub);
    2298             :             }
    2299             :         }
    2300             : 
    2301         708 :         entry->publish_as_relid = publish_as_relid;
    2302             : 
    2303             :         /*
    2304             :          * Initialize the tuple slot, map, and row filter. These are only used
    2305             :          * when publishing inserts, updates, or deletes.
    2306             :          */
    2307         708 :         if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
    2308         120 :             entry->pubactions.pubdelete)
    2309             :         {
    2310             :             /* Initialize the tuple slot and map */
    2311         588 :             init_tuple_slot(data, relation, entry);
    2312             : 
    2313             :             /* Initialize the row filter */
    2314         588 :             pgoutput_row_filter_init(data, rel_publications, entry);
    2315             : 
    2316             :             /* Check whether to publish generated columns. */
    2317         588 :             check_and_init_gencol(data, rel_publications, entry);
    2318             : 
    2319             :             /* Initialize the column list */
    2320         588 :             pgoutput_column_list_init(data, rel_publications, entry);
    2321             :         }
    2322             : 
    2323         706 :         list_free(pubids);
    2324         706 :         list_free(schemaPubids);
    2325         706 :         list_free(rel_publications);
    2326             : 
    2327         706 :         entry->replicate_valid = true;
    2328             :     }
    2329             : 
    2330      366826 :     return entry;
    2331             : }
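
A typical caller, such as the INSERT path of the change callback, mainly consults the pubactions and publish_as_relid fields of the returned entry. A rough usage sketch (the surrounding variables are assumed to be in scope as they would be in a change callback):

    RelationSyncEntry *relentry;

    relentry = get_rel_sync_entry(data, relation);

    /* Nothing to do if no subscribed publication publishes INSERTs here. */
    if (!relentry->pubactions.pubinsert)
        return;

    /*
     * If an ancestor was chosen (pubviaroot), the change is published under
     * relentry->publish_as_relid, and the tuple is remapped through
     * relentry->attrmap before being written out.
     */
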
    2332             : 
    2333             : /*
    2334             :  * Cleanup list of streamed transactions and update the schema_sent flag.
    2335             :  *
    2336             :  * When a streamed transaction commits or aborts, we need to remove the
    2337             :  * toplevel XID from the schema cache. If the transaction aborted, the
    2338             :  * subscriber will simply throw away the schema records we streamed, so
    2339             :  * we don't need to do anything else.
    2340             :  *
    2341             :  * If the transaction is committed, the subscriber will update the relation
    2342             :  * cache - so tweak the schema_sent flag accordingly.
    2343             :  */
    2344             : static void
    2345         142 : cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
    2346             : {
    2347             :     HASH_SEQ_STATUS hash_seq;
    2348             :     RelationSyncEntry *entry;
    2349             : 
    2350             :     Assert(RelationSyncCache != NULL);
    2351             : 
    2352         142 :     hash_seq_init(&hash_seq, RelationSyncCache);
    2353         290 :     while ((entry = hash_seq_search(&hash_seq)) != NULL)
    2354             :     {
    2355             :         /*
    2356             :          * We can set the schema_sent flag for an entry whose list contains the
    2357             :          * committed xid, as that ensures the subscriber already has the
    2358             :          * corresponding schema and we don't need to send it again unless the
    2359             :          * relation is later invalidated.
    2360             :          */
    2361         334 :         foreach_xid(streamed_txn, entry->streamed_txns)
    2362             :         {
    2363         144 :             if (xid == streamed_txn)
    2364             :             {
    2365         106 :                 if (is_commit)
    2366          84 :                     entry->schema_sent = true;
    2367             : 
    2368         106 :                 entry->streamed_txns =
    2369         106 :                     foreach_delete_current(entry->streamed_txns, streamed_txn);
    2370         106 :                 break;
    2371             :             }
    2372             :         }
    2373             :     }
    2374         142 : }
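
The loop above relies on foreach_delete_current(), which removes the current cell while keeping the surrounding foreach_xid() iteration valid. A self-contained sketch of the idiom with hypothetical contents:

    List       *xids = NIL;

    xids = lappend_xid(xids, (TransactionId) 100);
    xids = lappend_xid(xids, (TransactionId) 200);

    foreach_xid(x, xids)
    {
        /* Drop one matching xid; the iterator stays consistent. */
        if (x == (TransactionId) 200)
            xids = foreach_delete_current(xids, x);
    }
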
    2375             : 
    2376             : /*
    2377             :  * Relcache invalidation callback
    2378             :  */
    2379             : static void
    2380        7498 : rel_sync_cache_relation_cb(Datum arg, Oid relid)
    2381             : {
    2382             :     RelationSyncEntry *entry;
    2383             : 
    2384             :     /*
    2385             :      * We can get here if the plugin was used in SQL interface as the
    2386             :      * RelationSyncCache is destroyed when the decoding finishes, but there is
    2387             :      * no way to unregister the relcache invalidation callback.
    2388             :      */
    2389        7498 :     if (RelationSyncCache == NULL)
    2390          52 :         return;
    2391             : 
    2392             :     /*
    2393             :      * Nobody keeps pointers to entries in this hash table around outside
    2394             :      * logical decoding callback calls - but invalidation events can come in
    2395             :      * *during* a callback if we do any syscache access in the callback.
    2396             :      * Because of that we must mark the cache entry as invalid but not damage
    2397             :      * any of its substructure here.  The next get_rel_sync_entry() call will
    2398             :      * rebuild it all.
    2399             :      */
    2400        7446 :     if (OidIsValid(relid))
    2401             :     {
    2402             :         /*
    2403             :          * Getting invalidations for relations that aren't in the table is
    2404             :          * entirely normal.  So we don't care if it's found or not.
    2405             :          */
    2406        7334 :         entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
    2407             :                                                   HASH_FIND, NULL);
    2408        7334 :         if (entry != NULL)
    2409        1270 :             entry->replicate_valid = false;
    2410             :     }
    2411             :     else
    2412             :     {
    2413             :         /* Whole cache must be flushed. */
    2414             :         HASH_SEQ_STATUS status;
    2415             : 
    2416         112 :         hash_seq_init(&status, RelationSyncCache);
    2417         228 :         while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
    2418             :         {
    2419         116 :             entry->replicate_valid = false;
    2420             :         }
    2421             :     }
    2422             : }
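
Because invalidation callbacks cannot be unregistered, they outlive the decoding session that created the cache, so any such callback must tolerate a NULL cache pointer. A minimal sketch of that contract, reusing the hypothetical my_cache and MyCacheEntry from the earlier sketch (it omits the whole-cache flush for InvalidOid handled above):

    static void
    my_cache_relation_cb(Datum arg, Oid relid)
    {
        MyCacheEntry *entry;

        /* The decoding session may already have destroyed the cache. */
        if (my_cache == NULL)
            return;

        /* Mark the entry stale; it is rebuilt lazily on the next lookup. */
        entry = (MyCacheEntry *) hash_search(my_cache, &relid,
                                             HASH_FIND, NULL);
        if (entry != NULL)
            entry->valid = false;
    }
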
    2423             : 
    2424             : /*
    2425             :  * Publication relation/schema map syscache invalidation callback
    2426             :  *
    2427             :  * Called for invalidations on pg_namespace.
    2428             :  */
    2429             : static void
    2430          70 : rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
    2431             : {
    2432             :     HASH_SEQ_STATUS status;
    2433             :     RelationSyncEntry *entry;
    2434             : 
    2435             :     /*
    2436             :      * We can get here if the plugin was used in SQL interface as the
    2437             :      * RelationSyncCache is destroyed when the decoding finishes, but there is
    2438             :      * no way to unregister the invalidation callbacks.
    2439             :      */
    2440          70 :     if (RelationSyncCache == NULL)
    2441          20 :         return;
    2442             : 
    2443             :     /*
    2444             :      * We have no easy way to identify which cache entries this invalidation
    2445             :      * event might have affected, so just mark them all invalid.
    2446             :      */
    2447          50 :     hash_seq_init(&status, RelationSyncCache);
    2448          92 :     while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
    2449             :     {
    2450          42 :         entry->replicate_valid = false;
    2451             :     }
    2452             : }
    2453             : 
    2454             : /* Send Replication origin */
    2455             : static void
    2456        2158 : send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
    2457             :                  XLogRecPtr origin_lsn, bool send_origin)
    2458             : {
    2459        2158 :     if (send_origin)
    2460             :     {
    2461             :         char       *origin;
    2462             : 
    2463             :         /*----------
    2464             :          * XXX: which behaviour do we want here?
    2465             :          *
    2466             :          * Alternatives:
    2467             :          *  - don't send origin message if origin name not found
    2468             :          *    (that's what we do now)
    2469             :          *  - throw error - that will break replication, not good
    2470             :          *  - send some special "unknown" origin
    2471             :          *----------
    2472             :          */
    2473          16 :         if (replorigin_by_oid(origin_id, true, &origin))
    2474             :         {
    2475             :             /* Message boundary */
    2476          16 :             OutputPluginWrite(ctx, false);
    2477          16 :             OutputPluginPrepareWrite(ctx, true);
    2478             : 
    2479          16 :             logicalrep_write_origin(ctx->out, origin, origin_lsn);
    2480             :         }
    2481             :     }
    2482        2158 : }

Generated by: LCOV version 1.16