LCOV - code coverage report
Current view: top level - src/backend/replication/pgoutput - pgoutput.c (source / functions)
Test: PostgreSQL 19devel
Date: 2026-01-09 13:17:31

                  Hit      Total    Coverage
Lines:            739        768      96.2 %
Functions:         42         42     100.0 %

Legend: Lines: hit / not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pgoutput.c
       4             :  *      Logical Replication output plugin
       5             :  *
       6             :  * Copyright (c) 2012-2026, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *        src/backend/replication/pgoutput/pgoutput.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #include "access/tupconvert.h"
      16             : #include "catalog/partition.h"
      17             : #include "catalog/pg_publication.h"
      18             : #include "catalog/pg_publication_rel.h"
      19             : #include "catalog/pg_subscription.h"
      20             : #include "commands/defrem.h"
      21             : #include "commands/subscriptioncmds.h"
      22             : #include "executor/executor.h"
      23             : #include "fmgr.h"
      24             : #include "nodes/makefuncs.h"
      25             : #include "parser/parse_relation.h"
      26             : #include "replication/logical.h"
      27             : #include "replication/logicalproto.h"
      28             : #include "replication/origin.h"
      29             : #include "replication/pgoutput.h"
      30             : #include "rewrite/rewriteHandler.h"
      31             : #include "utils/builtins.h"
      32             : #include "utils/inval.h"
      33             : #include "utils/lsyscache.h"
      34             : #include "utils/memutils.h"
      35             : #include "utils/rel.h"
      36             : #include "utils/syscache.h"
      37             : #include "utils/varlena.h"
      38             : 
      39        1098 : PG_MODULE_MAGIC_EXT(
      40             :                     .name = "pgoutput",
      41             :                     .version = PG_VERSION
      42             : );
      43             : 
      44             : static void pgoutput_startup(LogicalDecodingContext *ctx,
      45             :                              OutputPluginOptions *opt, bool is_init);
      46             : static void pgoutput_shutdown(LogicalDecodingContext *ctx);
      47             : static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
      48             :                                ReorderBufferTXN *txn);
      49             : static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
      50             :                                 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
      51             : static void pgoutput_change(LogicalDecodingContext *ctx,
      52             :                             ReorderBufferTXN *txn, Relation relation,
      53             :                             ReorderBufferChange *change);
      54             : static void pgoutput_truncate(LogicalDecodingContext *ctx,
      55             :                               ReorderBufferTXN *txn, int nrelations, Relation relations[],
      56             :                               ReorderBufferChange *change);
      57             : static void pgoutput_message(LogicalDecodingContext *ctx,
      58             :                              ReorderBufferTXN *txn, XLogRecPtr message_lsn,
      59             :                              bool transactional, const char *prefix,
      60             :                              Size sz, const char *message);
      61             : static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
      62             :                                    RepOriginId origin_id);
      63             : static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
      64             :                                        ReorderBufferTXN *txn);
      65             : static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
      66             :                                  ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
      67             : static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
      68             :                                          ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
      69             : static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
      70             :                                            ReorderBufferTXN *txn,
      71             :                                            XLogRecPtr prepare_end_lsn,
      72             :                                            TimestampTz prepare_time);
      73             : static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
      74             :                                   ReorderBufferTXN *txn);
      75             : static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
      76             :                                  ReorderBufferTXN *txn);
      77             : static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
      78             :                                   ReorderBufferTXN *txn,
      79             :                                   XLogRecPtr abort_lsn);
      80             : static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
      81             :                                    ReorderBufferTXN *txn,
      82             :                                    XLogRecPtr commit_lsn);
      83             : static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
      84             :                                         ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
      85             : 
      86             : static bool publications_valid;
      87             : 
      88             : static List *LoadPublications(List *pubnames);
      89             : static void publication_invalidation_cb(Datum arg, int cacheid,
      90             :                                         uint32 hashvalue);
      91             : static void send_repl_origin(LogicalDecodingContext *ctx,
      92             :                              RepOriginId origin_id, XLogRecPtr origin_lsn,
      93             :                              bool send_origin);
      94             : 
      95             : /*
      96             :  * Only 3 publication actions are used for row filtering ("insert", "update",
      97             :  * "delete"). See RelationSyncEntry.exprstate[].
      98             :  */
      99             : enum RowFilterPubAction
     100             : {
     101             :     PUBACTION_INSERT,
     102             :     PUBACTION_UPDATE,
     103             :     PUBACTION_DELETE,
     104             : };
     105             : 
     106             : #define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
     107             : 
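
The enum above indexes the per-action row filter state kept in
RelationSyncEntry.exprstate[].  As a minimal sketch (the helper name below is
hypothetical; the actual mapping used by the row filter code appears later in
this file), a decoded change type selects its slot roughly like this:

    /* Hypothetical sketch: map a decoded change type to the exprstate[] index. */
    static int
    rowfilter_pubaction_for_change(ReorderBufferChangeType change_type)
    {
        switch (change_type)
        {
            case REORDER_BUFFER_CHANGE_INSERT:
                return PUBACTION_INSERT;
            case REORDER_BUFFER_CHANGE_UPDATE:
                return PUBACTION_UPDATE;
            case REORDER_BUFFER_CHANGE_DELETE:
                return PUBACTION_DELETE;
            default:
                elog(ERROR, "unexpected change type %d", (int) change_type);
                return -1;      /* keep compiler quiet */
        }
    }
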
     108             : /*
     109             :  * Entry in the map used to remember which relation schemas we sent.
     110             :  *
     111             :  * The schema_sent flag determines if the current schema record for the
     112             :  * relation (and for its ancestor if publish_as_relid is set) was already
     113             :  * sent to the subscriber (in which case we don't need to send it again).
     114             :  *
     115             :  * The schema cache on the downstream is, however, updated only at commit
     116             :  * time, and with streamed transactions the commit order may differ from
     117             :  * the order in which the transactions are sent. Also, the (sub)
     118             :  * transactions might get aborted, so we need to send the schema for each
     119             :  * (sub) transaction so that we don't lose the schema information on abort.
     120             :  * To handle this, we maintain a list of xids (streamed_txns) for which we
     121             :  * have already sent the schema.
     122             :  *
     123             :  * For partitions, 'pubactions' considers not only the table's own
     124             :  * publications, but also those of all of its ancestors.
     125             :  */
     126             : typedef struct RelationSyncEntry
     127             : {
     128             :     Oid         relid;          /* relation oid */
     129             : 
     130             :     bool        replicate_valid;    /* overall validity flag for entry */
     131             : 
     132             :     bool        schema_sent;
     133             : 
     134             :     /*
     135             :      * This will be PUBLISH_GENCOLS_STORED if the relation contains generated
     136             :      * columns and the 'publish_generated_columns' parameter is set to
     137             :      * PUBLISH_GENCOLS_STORED. Otherwise, it will be PUBLISH_GENCOLS_NONE,
     138             :      * indicating that no generated columns should be published, unless
     139             :      * explicitly specified in the column list.
     140             :      */
     141             :     PublishGencolsType include_gencols_type;
     142             :     List       *streamed_txns;  /* streamed toplevel transactions with this
     143             :                                  * schema */
     144             : 
     145             :     /* are we publishing this rel? */
     146             :     PublicationActions pubactions;
     147             : 
     148             :     /*
     149             :      * ExprState array for the row filter. The filter expressions cannot
     150             :      * always be combined into one across publication actions, because
     151             :      * updates and deletes require the columns used in the expression to be
     152             :      * part of the replica identity index, whereas inserts have no such
     153             :      * restriction; hence there is one ExprState per publication action.
     154             :      */
     155             :     ExprState  *exprstate[NUM_ROWFILTER_PUBACTIONS];
     156             :     EState     *estate;         /* executor state used for row filter */
     157             :     TupleTableSlot *new_slot;   /* slot for storing new tuple */
     158             :     TupleTableSlot *old_slot;   /* slot for storing old tuple */
     159             : 
     160             :     /*
     161             :      * OID of the relation to publish changes as.  For a partition, this may
     162             :      * be set to one of its ancestors whose schema will be used when
     163             :      * replicating changes, if publish_via_partition_root is set for the
     164             :      * publication.
     165             :      */
     166             :     Oid         publish_as_relid;
     167             : 
     168             :     /*
     169             :      * Map used when replicating using an ancestor's schema to convert
     170             :      * tuples from the partition's type to the ancestor's; NULL if
     171             :      * publish_as_relid is the same as 'relid', or if the partition and the
     172             :      * ancestor have identical TupleDescs, making conversion unnecessary.
     173             :      */
     174             :     AttrMap    *attrmap;
     175             : 
     176             :     /*
     177             :      * Columns included in the publication, or NULL if all columns are
     178             :      * included implicitly.  Note that the attnums in this bitmap are not
     179             :      * shifted by FirstLowInvalidHeapAttributeNumber.
     180             :      */
     181             :     Bitmapset  *columns;
     182             : 
     183             :     /*
     184             :      * Private context to store additional data for this entry - state for the
     185             :      * row filter expressions, column list, etc.
     186             :      */
     187             :     MemoryContext entry_cxt;
     188             : } RelationSyncEntry;
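
For streamed transactions, the streamed_txns list above records the top-level
xids for which the schema has already been sent.  A minimal sketch of that
bookkeeping, assuming an xid list kept in the cache memory context (the real
get_/set_schema_sent_in_streamed_txn helpers are defined later in this file
and may differ in detail):

    /* Sketch only: was the schema already sent within this streamed
     * top-level transaction? */
    static bool
    schema_sent_in_streamed_txn_sketch(RelationSyncEntry *entry, TransactionId xid)
    {
        return list_member_xid(entry->streamed_txns, xid);
    }

    /* Sketch only: remember that the schema was sent for this xid. */
    static void
    set_schema_sent_in_streamed_txn_sketch(RelationSyncEntry *entry,
                                           TransactionId xid)
    {
        MemoryContext oldctx = MemoryContextSwitchTo(CacheMemoryContext);

        entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
        MemoryContextSwitchTo(oldctx);
    }
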
     189             : 
     190             : /*
     191             :  * Maintain a per-transaction variable to track whether the transaction has
     192             :  * sent BEGIN. BEGIN is sent only when the first change in a transaction is
     193             :  * processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
     194             :  * messages for empty transactions, which saves network bandwidth.
     195             :  *
     196             :  * This optimization is not used for prepared transactions: if the WALSender
     197             :  * restarts after the prepare of a transaction but before its commit
     198             :  * prepared, we cannot tell whether we skipped sending BEGIN/PREPARE because
     199             :  * the transaction was empty, since the in-memory txndata information that
     200             :  * existed prior to the restart is lost. We would then send a spurious
     201             :  * COMMIT PREPARED without a corresponding prepared transaction on the
     202             :  * downstream, which would lead to an error when it tries to process it.
     204             :  *
     205             :  * XXX We could achieve this optimization by changing protocol to send
     206             :  * additional information so that downstream can detect that the corresponding
     207             :  * prepare has not been sent. However, adding such a check for every
     208             :  * transaction in the downstream could be costly so we might want to do it
     209             :  * optionally.
     210             :  *
     211             :  * We also don't have this optimization for streamed transactions because
     212             :  * they can contain prepared transactions.
     213             :  */
     214             : typedef struct PGOutputTxnData
     215             : {
     216             :     bool        sent_begin_txn; /* flag indicating whether BEGIN has been sent */
     217             : } PGOutputTxnData;
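
The sent_begin_txn flag implements the deferred BEGIN described above: the
change callbacks send BEGIN lazily, and the commit callback skips the whole
exchange if nothing was ever sent.  A minimal sketch of the check a change
callback performs (assuming the pgoutput_send_begin routine defined further
down; the real call sites are in pgoutput_change and related callbacks):

    /* Sketch only: send BEGIN lazily before the first published change of a
     * non-streamed transaction. */
    static void
    ensure_begin_sent_sketch(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
    {
        PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;

        if (txndata && !txndata->sent_begin_txn)
            pgoutput_send_begin(ctx, txn);
    }
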
     218             : 
     219             : /* Map used to remember which relation schemas we sent. */
     220             : static HTAB *RelationSyncCache = NULL;
     221             : 
     222             : static void init_rel_sync_cache(MemoryContext cachectx);
     223             : static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
     224             : static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
     225             :                                              Relation relation);
     226             : static void send_relation_and_attrs(Relation relation, TransactionId xid,
     227             :                                     LogicalDecodingContext *ctx,
     228             :                                     RelationSyncEntry *relentry);
     229             : static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
     230             : static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
     231             :                                           uint32 hashvalue);
     232             : static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
     233             :                                             TransactionId xid);
     234             : static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
     235             :                                             TransactionId xid);
     236             : static void init_tuple_slot(PGOutputData *data, Relation relation,
     237             :                             RelationSyncEntry *entry);
     238             : static void pgoutput_memory_context_reset(void *arg);
     239             : 
     240             : /* row filter routines */
     241             : static EState *create_estate_for_relation(Relation rel);
     242             : static void pgoutput_row_filter_init(PGOutputData *data,
     243             :                                      List *publications,
     244             :                                      RelationSyncEntry *entry);
     245             : static bool pgoutput_row_filter_exec_expr(ExprState *state,
     246             :                                           ExprContext *econtext);
     247             : static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
     248             :                                 TupleTableSlot **new_slot_ptr,
     249             :                                 RelationSyncEntry *entry,
     250             :                                 ReorderBufferChangeType *action);
     251             : 
     252             : /* column list routines */
     253             : static void pgoutput_column_list_init(PGOutputData *data,
     254             :                                       List *publications,
     255             :                                       RelationSyncEntry *entry);
     256             : 
     257             : /*
     258             :  * Specify output plugin callbacks
     259             :  */
     260             : void
     261        1482 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
     262             : {
     263        1482 :     cb->startup_cb = pgoutput_startup;
     264        1482 :     cb->begin_cb = pgoutput_begin_txn;
     265        1482 :     cb->change_cb = pgoutput_change;
     266        1482 :     cb->truncate_cb = pgoutput_truncate;
     267        1482 :     cb->message_cb = pgoutput_message;
     268        1482 :     cb->commit_cb = pgoutput_commit_txn;
     269             : 
     270        1482 :     cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
     271        1482 :     cb->prepare_cb = pgoutput_prepare_txn;
     272        1482 :     cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
     273        1482 :     cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
     274        1482 :     cb->filter_by_origin_cb = pgoutput_origin_filter;
     275        1482 :     cb->shutdown_cb = pgoutput_shutdown;
     276             : 
     277             :     /* transaction streaming */
     278        1482 :     cb->stream_start_cb = pgoutput_stream_start;
     279        1482 :     cb->stream_stop_cb = pgoutput_stream_stop;
     280        1482 :     cb->stream_abort_cb = pgoutput_stream_abort;
     281        1482 :     cb->stream_commit_cb = pgoutput_stream_commit;
     282        1482 :     cb->stream_change_cb = pgoutput_change;
     283        1482 :     cb->stream_message_cb = pgoutput_message;
     284        1482 :     cb->stream_truncate_cb = pgoutput_truncate;
     285             :     /* transaction streaming - two-phase commit */
     286        1482 :     cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
     287        1482 : }
     288             : 
     289             : static void
     290         810 : parse_output_parameters(List *options, PGOutputData *data)
     291             : {
     292             :     ListCell   *lc;
     293         810 :     bool        protocol_version_given = false;
     294         810 :     bool        publication_names_given = false;
     295         810 :     bool        binary_option_given = false;
     296         810 :     bool        messages_option_given = false;
     297         810 :     bool        streaming_given = false;
     298         810 :     bool        two_phase_option_given = false;
     299         810 :     bool        origin_option_given = false;
     300             : 
     301             :     /* Initialize optional parameters to defaults */
     302         810 :     data->binary = false;
     303         810 :     data->streaming = LOGICALREP_STREAM_OFF;
     304         810 :     data->messages = false;
     305         810 :     data->two_phase = false;
     306         810 :     data->publish_no_origin = false;
     307             : 
     308        4042 :     foreach(lc, options)
     309             :     {
     310        3232 :         DefElem    *defel = (DefElem *) lfirst(lc);
     311             : 
     312             :         Assert(defel->arg == NULL || IsA(defel->arg, String));
     313             : 
     314             :         /* Check each param, whether or not we recognize it */
     315        3232 :         if (strcmp(defel->defname, "proto_version") == 0)
     316             :         {
     317             :             unsigned long parsed;
     318             :             char       *endptr;
     319             : 
     320         810 :             if (protocol_version_given)
     321           0 :                 ereport(ERROR,
     322             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     323             :                          errmsg("conflicting or redundant options")));
     324         810 :             protocol_version_given = true;
     325             : 
     326         810 :             errno = 0;
     327         810 :             parsed = strtoul(strVal(defel->arg), &endptr, 10);
     328         810 :             if (errno != 0 || *endptr != '\0')
     329           0 :                 ereport(ERROR,
     330             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     331             :                          errmsg("invalid proto_version")));
     332             : 
     333         810 :             if (parsed > PG_UINT32_MAX)
     334           0 :                 ereport(ERROR,
     335             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     336             :                          errmsg("proto_version \"%s\" out of range",
     337             :                                 strVal(defel->arg))));
     338             : 
     339         810 :             data->protocol_version = (uint32) parsed;
     340             :         }
     341        2422 :         else if (strcmp(defel->defname, "publication_names") == 0)
     342             :         {
     343         810 :             if (publication_names_given)
     344           0 :                 ereport(ERROR,
     345             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     346             :                          errmsg("conflicting or redundant options")));
     347         810 :             publication_names_given = true;
     348             : 
     349             :             /*
     350             :              * Pass a copy of the DefElem->arg since SplitIdentifierString
     351             :              * modifies its input.
     352             :              */
     353         810 :             if (!SplitIdentifierString(pstrdup(strVal(defel->arg)), ',',
     354             :                                        &data->publication_names))
     355           0 :                 ereport(ERROR,
     356             :                         (errcode(ERRCODE_INVALID_NAME),
     357             :                          errmsg("invalid publication_names syntax")));
     358             :         }
     359        1612 :         else if (strcmp(defel->defname, "binary") == 0)
     360             :         {
     361          22 :             if (binary_option_given)
     362           0 :                 ereport(ERROR,
     363             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     364             :                          errmsg("conflicting or redundant options")));
     365          22 :             binary_option_given = true;
     366             : 
     367          22 :             data->binary = defGetBoolean(defel);
     368             :         }
     369        1590 :         else if (strcmp(defel->defname, "messages") == 0)
     370             :         {
     371           8 :             if (messages_option_given)
     372           0 :                 ereport(ERROR,
     373             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     374             :                          errmsg("conflicting or redundant options")));
     375           8 :             messages_option_given = true;
     376             : 
     377           8 :             data->messages = defGetBoolean(defel);
     378             :         }
     379        1582 :         else if (strcmp(defel->defname, "streaming") == 0)
     380             :         {
     381         774 :             if (streaming_given)
     382           0 :                 ereport(ERROR,
     383             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     384             :                          errmsg("conflicting or redundant options")));
     385         774 :             streaming_given = true;
     386             : 
     387         774 :             data->streaming = defGetStreamingMode(defel);
     388             :         }
     389         808 :         else if (strcmp(defel->defname, "two_phase") == 0)
     390             :         {
     391          16 :             if (two_phase_option_given)
     392           0 :                 ereport(ERROR,
     393             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     394             :                          errmsg("conflicting or redundant options")));
     395          16 :             two_phase_option_given = true;
     396             : 
     397          16 :             data->two_phase = defGetBoolean(defel);
     398             :         }
     399         792 :         else if (strcmp(defel->defname, "origin") == 0)
     400             :         {
     401             :             char       *origin;
     402             : 
     403         792 :             if (origin_option_given)
     404           0 :                 ereport(ERROR,
     405             :                         errcode(ERRCODE_SYNTAX_ERROR),
     406             :                         errmsg("conflicting or redundant options"));
     407         792 :             origin_option_given = true;
     408             : 
     409         792 :             origin = defGetString(defel);
     410         792 :             if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
     411          54 :                 data->publish_no_origin = true;
     412         738 :             else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
     413         738 :                 data->publish_no_origin = false;
     414             :             else
     415           0 :                 ereport(ERROR,
     416             :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     417             :                         errmsg("unrecognized origin value: \"%s\"", origin));
     418             :         }
     419             :         else
     420           0 :             elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
     421             :     }
     422             : 
     423             :     /* Check required options */
     424         810 :     if (!protocol_version_given)
     425           0 :         ereport(ERROR,
     426             :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     427             :                 errmsg("option \"%s\" missing", "proto_version"));
     428         810 :     if (!publication_names_given)
     429           0 :         ereport(ERROR,
     430             :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     431             :                 errmsg("option \"%s\" missing", "publication_names"));
     432         810 : }
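
The option list parsed above normally arrives from the subscriber's
START_REPLICATION command.  Purely as an illustration (the option values here
are made up), an equivalent list can be built by hand and fed to
parse_output_parameters:

    /* Illustration only: hand-built options equivalent to
     * (proto_version '4', publication_names 'mypub', streaming 'parallel'). */
    List       *options = NIL;
    PGOutputData data = {0};

    options = lappend(options, makeDefElem("proto_version",
                                           (Node *) makeString(pstrdup("4")), -1));
    options = lappend(options, makeDefElem("publication_names",
                                           (Node *) makeString(pstrdup("mypub")), -1));
    options = lappend(options, makeDefElem("streaming",
                                           (Node *) makeString(pstrdup("parallel")), -1));

    parse_output_parameters(options, &data);
    Assert(data.protocol_version == 4);
    Assert(data.streaming == LOGICALREP_STREAM_PARALLEL);
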
     433             : 
     434             : /*
     435             :  * Memory context reset callback of PGOutputData->context.
     436             :  */
     437             : static void
     438        2128 : pgoutput_memory_context_reset(void *arg)
     439             : {
     440        2128 :     if (RelationSyncCache)
     441             :     {
     442         392 :         hash_destroy(RelationSyncCache);
     443         392 :         RelationSyncCache = NULL;
     444             :     }
     445        2128 : }
     446             : 
     447             : /*
     448             :  * Initialize this plugin
     449             :  */
     450             : static void
     451        1482 : pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
     452             :                  bool is_init)
     453             : {
     454        1482 :     PGOutputData *data = palloc0_object(PGOutputData);
     455             :     static bool publication_callback_registered = false;
     456             :     MemoryContextCallback *mcallback;
     457             : 
     458             :     /* Create our memory context for private allocations. */
     459        1482 :     data->context = AllocSetContextCreate(ctx->context,
     460             :                                           "logical replication output context",
     461             :                                           ALLOCSET_DEFAULT_SIZES);
     462             : 
     463        1482 :     data->cachectx = AllocSetContextCreate(ctx->context,
     464             :                                            "logical replication cache context",
     465             :                                            ALLOCSET_DEFAULT_SIZES);
     466             : 
     467        1482 :     data->pubctx = AllocSetContextCreate(ctx->context,
     468             :                                          "logical replication publication list context",
     469             :                                          ALLOCSET_SMALL_SIZES);
     470             : 
     471             :     /*
     472             :      * Ensure to cleanup RelationSyncCache even when logical decoding invoked
     473             :      * via SQL interface ends up with an error.
     474             :      */
     475        1482 :     mcallback = palloc0_object(MemoryContextCallback);
     476        1482 :     mcallback->func = pgoutput_memory_context_reset;
     477        1482 :     MemoryContextRegisterResetCallback(ctx->context, mcallback);
     478             : 
     479        1482 :     ctx->output_plugin_private = data;
     480             : 
     481             :     /* This plugin uses binary protocol. */
     482        1482 :     opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
     483             : 
     484             :     /*
     485             :      * This is replication start and not slot initialization.
     486             :      *
     487             :      * Parse and validate options passed by the client.
     488             :      */
     489        1482 :     if (!is_init)
     490             :     {
     491             :         /* Parse the params and ERROR if we see any we don't recognize */
     492         810 :         parse_output_parameters(ctx->output_plugin_options, data);
     493             : 
     494             :         /* Check if we support requested protocol */
     495         810 :         if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
     496           0 :             ereport(ERROR,
     497             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     498             :                      errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
     499             :                             data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
     500             : 
     501         810 :         if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
     502           0 :             ereport(ERROR,
     503             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     504             :                      errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
     505             :                             data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
     506             : 
     507             :         /*
     508             :          * Decide whether to enable streaming. It is disabled by default; in
     509             :          * that case we just update the flag in the decoding context.
     510             :          * Otherwise we only allow it with a sufficient protocol version, and
     511             :          * when the output plugin supports it.
     512             :          */
     513         810 :         if (data->streaming == LOGICALREP_STREAM_OFF)
     514          36 :             ctx->streaming = false;
     515         774 :         else if (data->streaming == LOGICALREP_STREAM_ON &&
     516          54 :                  data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
     517           0 :             ereport(ERROR,
     518             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     519             :                      errmsg("requested proto_version=%d does not support streaming, need %d or higher",
     520             :                             data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
     521         774 :         else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
     522         720 :                  data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
     523           0 :             ereport(ERROR,
     524             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     525             :                      errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
     526             :                             data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
     527         774 :         else if (!ctx->streaming)
     528           0 :             ereport(ERROR,
     529             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     530             :                      errmsg("streaming requested, but not supported by output plugin")));
     531             : 
     532             :         /*
     533             :          * Here, we just check whether the two-phase option was passed in,
     534             :          * and decide whether to enable it at a later point in time. It
     535             :          * remains enabled if the previous start-up enabled it. But we only
     536             :          * allow the option to be passed in with a sufficient protocol
     537             :          * version, and when the output plugin supports it.
     538             :          */
     539         810 :         if (!data->two_phase)
     540         794 :             ctx->twophase_opt_given = false;
     541          16 :         else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
     542           0 :             ereport(ERROR,
     543             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     544             :                      errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
     545             :                             data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
     546          16 :         else if (!ctx->twophase)
     547           0 :             ereport(ERROR,
     548             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     549             :                      errmsg("two-phase commit requested, but not supported by output plugin")));
     550             :         else
     551          16 :             ctx->twophase_opt_given = true;
     552             : 
     553             :         /* Init publication state. */
     554         810 :         data->publications = NIL;
     555         810 :         publications_valid = false;
     556             : 
     557             :         /*
     558             :          * Register callback for pg_publication if we didn't already do that
     559             :          * during some previous call in this process.
     560             :          */
     561         810 :         if (!publication_callback_registered)
     562             :         {
     563         806 :             CacheRegisterSyscacheCallback(PUBLICATIONOID,
     564             :                                           publication_invalidation_cb,
     565             :                                           (Datum) 0);
     566         806 :             CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb,
     567             :                                          (Datum) 0);
     568         806 :             publication_callback_registered = true;
     569             :         }
     570             : 
     571             :         /* Initialize relation schema cache. */
     572         810 :         init_rel_sync_cache(CacheMemoryContext);
     573             :     }
     574             :     else
     575             :     {
     576             :         /*
     577             :          * Disable the streaming and prepared transactions during the slot
     578             :          * initialization mode.
     579             :          */
     580         672 :         ctx->streaming = false;
     581         672 :         ctx->twophase = false;
     582             :     }
     583        1482 : }
     584             : 
     585             : /*
     586             :  * BEGIN callback.
     587             :  *
     588             :  * Don't send the BEGIN message here; instead, postpone it until the first
     589             :  * change. In logical replication, a common scenario is to replicate only a
     590             :  * subset of tables (instead of all tables), and transactions whose changes
     591             :  * touch only unpublished tables produce empty transactions. Such empty
     592             :  * transactions would send BEGIN and COMMIT messages to subscribers, using
     593             :  * bandwidth on something of little or no use for logical replication.
     594             :  */
     595             : static void
     596        1850 : pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     597             : {
     598        1850 :     PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
     599             :                                                       sizeof(PGOutputTxnData));
     600             : 
     601        1850 :     txn->output_plugin_private = txndata;
     602        1850 : }
     603             : 
     604             : /*
     605             :  * Send BEGIN.
     606             :  *
     607             :  * This is called while processing the first change of the transaction.
     608             :  */
     609             : static void
     610         898 : pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     611             : {
     612         898 :     bool        send_replication_origin = txn->origin_id != InvalidRepOriginId;
     613         898 :     PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
     614             : 
     615             :     Assert(txndata);
     616             :     Assert(!txndata->sent_begin_txn);
     617             : 
     618         898 :     OutputPluginPrepareWrite(ctx, !send_replication_origin);
     619         898 :     logicalrep_write_begin(ctx->out, txn);
     620         898 :     txndata->sent_begin_txn = true;
     621             : 
     622         898 :     send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
     623             :                      send_replication_origin);
     624             : 
     625         898 :     OutputPluginWrite(ctx, true);
     626         898 : }
     627             : 
     628             : /*
     629             :  * COMMIT callback
     630             :  */
     631             : static void
     632        1846 : pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     633             :                     XLogRecPtr commit_lsn)
     634             : {
     635        1846 :     PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
     636             :     bool        sent_begin_txn;
     637             : 
     638             :     Assert(txndata);
     639             : 
     640             :     /*
     641             :      * We don't need to send the commit message unless some relevant change
     642             :      * from this transaction has been sent to the downstream.
     643             :      */
     644        1846 :     sent_begin_txn = txndata->sent_begin_txn;
     645        1846 :     OutputPluginUpdateProgress(ctx, !sent_begin_txn);
     646        1846 :     pfree(txndata);
     647        1846 :     txn->output_plugin_private = NULL;
     648             : 
     649        1846 :     if (!sent_begin_txn)
     650             :     {
     651         950 :         elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
     652         950 :         return;
     653             :     }
     654             : 
     655         896 :     OutputPluginPrepareWrite(ctx, true);
     656         896 :     logicalrep_write_commit(ctx->out, txn, commit_lsn);
     657         896 :     OutputPluginWrite(ctx, true);
     658             : }
     659             : 
     660             : /*
     661             :  * BEGIN PREPARE callback
     662             :  */
     663             : static void
     664          42 : pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     665             : {
     666          42 :     bool        send_replication_origin = txn->origin_id != InvalidRepOriginId;
     667             : 
     668          42 :     OutputPluginPrepareWrite(ctx, !send_replication_origin);
     669          42 :     logicalrep_write_begin_prepare(ctx->out, txn);
     670             : 
     671          42 :     send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
     672             :                      send_replication_origin);
     673             : 
     674          42 :     OutputPluginWrite(ctx, true);
     675          42 : }
     676             : 
     677             : /*
     678             :  * PREPARE callback
     679             :  */
     680             : static void
     681          42 : pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     682             :                      XLogRecPtr prepare_lsn)
     683             : {
     684          42 :     OutputPluginUpdateProgress(ctx, false);
     685             : 
     686          42 :     OutputPluginPrepareWrite(ctx, true);
     687          42 :     logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
     688          42 :     OutputPluginWrite(ctx, true);
     689          42 : }
     690             : 
     691             : /*
     692             :  * COMMIT PREPARED callback
     693             :  */
     694             : static void
     695          54 : pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     696             :                              XLogRecPtr commit_lsn)
     697             : {
     698          54 :     OutputPluginUpdateProgress(ctx, false);
     699             : 
     700          54 :     OutputPluginPrepareWrite(ctx, true);
     701          54 :     logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
     702          54 :     OutputPluginWrite(ctx, true);
     703          54 : }
     704             : 
     705             : /*
     706             :  * ROLLBACK PREPARED callback
     707             :  */
     708             : static void
     709          18 : pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
     710             :                                ReorderBufferTXN *txn,
     711             :                                XLogRecPtr prepare_end_lsn,
     712             :                                TimestampTz prepare_time)
     713             : {
     714          18 :     OutputPluginUpdateProgress(ctx, false);
     715             : 
     716          18 :     OutputPluginPrepareWrite(ctx, true);
     717          18 :     logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
     718             :                                        prepare_time);
     719          18 :     OutputPluginWrite(ctx, true);
     720          18 : }
     721             : 
     722             : /*
     723             :  * Write the current schema of the relation and its ancestor (if any) if not
     724             :  * done yet.
     725             :  */
     726             : static void
     727      364580 : maybe_send_schema(LogicalDecodingContext *ctx,
     728             :                   ReorderBufferChange *change,
     729             :                   Relation relation, RelationSyncEntry *relentry)
     730             : {
     731      364580 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
     732             :     bool        schema_sent;
     733      364580 :     TransactionId xid = InvalidTransactionId;
     734      364580 :     TransactionId topxid = InvalidTransactionId;
     735             : 
     736             :     /*
     737             :      * Remember XID of the (sub)transaction for the change. We don't care if
     738             :      * it's top-level transaction or not (we have already sent that XID in
     739             :      * start of the current streaming block).
     740             :      *
     741             :      * If we're not in a streaming block, just use InvalidTransactionId and
     742             :      * the write methods will not include it.
     743             :      */
     744      364580 :     if (data->in_streaming)
     745      351886 :         xid = change->txn->xid;
     746             : 
     747      364580 :     if (rbtxn_is_subtxn(change->txn))
     748       20338 :         topxid = rbtxn_get_toptxn(change->txn)->xid;
     749             :     else
     750      344242 :         topxid = xid;
     751             : 
     752             :     /*
     753             :      * Do we need to send the schema? We do track streamed transactions
     754             :      * separately, because those may be applied later (and the regular
     755             :      * transactions won't see their effects until then) and in an order that
     756             :      * we don't know at this point.
     757             :      *
     758             :      * XXX There is a scope of optimization here. Currently, we always send
     759             :      * the schema first time in a streaming transaction but we can probably
     760             :      * avoid that by checking 'relentry->schema_sent' flag. However, before
     761             :      * doing that we need to study its impact on the case where we have a mix
     762             :      * of streaming and non-streaming transactions.
     763             :      */
     764      364580 :     if (data->in_streaming)
     765      351886 :         schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
     766             :     else
     767       12694 :         schema_sent = relentry->schema_sent;
     768             : 
     769             :     /* Nothing to do if we already sent the schema. */
     770      364580 :     if (schema_sent)
     771      363892 :         return;
     772             : 
     773             :     /*
     774             :      * Send the schema.  If the changes will be published using an ancestor's
     775             :      * schema, not the relation's own, send that ancestor's schema before
     776             :      * sending relation's own (XXX - maybe sending only the former suffices?).
     777             :      */
     778         688 :     if (relentry->publish_as_relid != RelationGetRelid(relation))
     779             :     {
     780          66 :         Relation    ancestor = RelationIdGetRelation(relentry->publish_as_relid);
     781             : 
     782          66 :         send_relation_and_attrs(ancestor, xid, ctx, relentry);
     783          66 :         RelationClose(ancestor);
     784             :     }
     785             : 
     786         688 :     send_relation_and_attrs(relation, xid, ctx, relentry);
     787             : 
     788         688 :     if (data->in_streaming)
     789         148 :         set_schema_sent_in_streamed_txn(relentry, topxid);
     790             :     else
     791         540 :         relentry->schema_sent = true;
     792             : }
     793             : 
     794             : /*
     795             :  * Sends a relation
     796             :  */
     797             : static void
     798         754 : send_relation_and_attrs(Relation relation, TransactionId xid,
     799             :                         LogicalDecodingContext *ctx,
     800             :                         RelationSyncEntry *relentry)
     801             : {
     802         754 :     TupleDesc   desc = RelationGetDescr(relation);
     803         754 :     Bitmapset  *columns = relentry->columns;
     804         754 :     PublishGencolsType include_gencols_type = relentry->include_gencols_type;
     805             :     int         i;
     806             : 
     807             :     /*
     808             :      * Write out type info if needed.  We do that only for user-created types.
     809             :      * We use FirstGenbkiObjectId as the cutoff, so that we only consider
     810             :      * objects with hand-assigned OIDs to be "built in", not for instance any
     811             :      * function or type defined in the information_schema. This is important
     812             :      * because only hand-assigned OIDs can be expected to remain stable across
     813             :      * major versions.
     814             :      */
     815        2318 :     for (i = 0; i < desc->natts; i++)
     816             :     {
     817        1564 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     818             : 
     819        1564 :         if (!logicalrep_should_publish_column(att, columns,
     820             :                                               include_gencols_type))
     821         142 :             continue;
     822             : 
     823        1422 :         if (att->atttypid < FirstGenbkiObjectId)
     824        1386 :             continue;
     825             : 
     826          36 :         OutputPluginPrepareWrite(ctx, false);
     827          36 :         logicalrep_write_typ(ctx->out, xid, att->atttypid);
     828          36 :         OutputPluginWrite(ctx, false);
     829             :     }
     830             : 
     831         754 :     OutputPluginPrepareWrite(ctx, false);
     832         754 :     logicalrep_write_rel(ctx->out, xid, relation, columns,
     833             :                          include_gencols_type);
     834         754 :     OutputPluginWrite(ctx, false);
     835         754 : }
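
The FirstGenbkiObjectId cutoff above means built-in types never get a
separate type message; only types whose OIDs are assigned at runtime (for
example, types created with CREATE TYPE) are written with
logicalrep_write_typ().  A tiny illustrative check, assuming the usual
catalog constants:

    /* Illustration only: int4 has a hand-assigned built-in OID, so it stays
     * below the cutoff and no type message is sent for it. */
    StaticAssertDecl(INT4OID < FirstGenbkiObjectId,
                     "built-in type OIDs stay below FirstGenbkiObjectId");
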
     836             : 
     837             : /*
     838             :  * Executor state preparation for evaluation of row filter expressions for the
     839             :  * specified relation.
     840             :  */
     841             : static EState *
     842          34 : create_estate_for_relation(Relation rel)
     843             : {
     844             :     EState     *estate;
     845             :     RangeTblEntry *rte;
     846          34 :     List       *perminfos = NIL;
     847             : 
     848          34 :     estate = CreateExecutorState();
     849             : 
     850          34 :     rte = makeNode(RangeTblEntry);
     851          34 :     rte->rtekind = RTE_RELATION;
     852          34 :     rte->relid = RelationGetRelid(rel);
     853          34 :     rte->relkind = rel->rd_rel->relkind;
     854          34 :     rte->rellockmode = AccessShareLock;
     855             : 
     856          34 :     addRTEPermissionInfo(&perminfos, rte);
     857             : 
     858          34 :     ExecInitRangeTable(estate, list_make1(rte), perminfos,
     859             :                        bms_make_singleton(1));
     860             : 
     861          34 :     estate->es_output_cid = GetCurrentCommandId(false);
     862             : 
     863          34 :     return estate;
     864             : }
     865             : 
     866             : /*
     867             :  * Evaluates row filter.
     868             :  *
     869             :  * If the row filter evaluates to NULL, it is taken as false i.e. the change
     870             :  * isn't replicated.
     871             :  */
     872             : static bool
     873          76 : pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
     874             : {
     875             :     Datum       ret;
     876             :     bool        isnull;
     877             : 
     878             :     Assert(state != NULL);
     879             : 
     880          76 :     ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
     881             : 
     882          76 :     elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
     883             :          isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
     884             :          isnull ? "true" : "false");
     885             : 
     886          76 :     if (isnull)
     887           2 :         return false;
     888             : 
     889          74 :     return DatumGetBool(ret);
     890             : }
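
A concrete consequence of the NULL-as-false rule above: assuming a
publication defined with a row filter such as WHERE (price > 100), an
inserted row whose price column is NULL makes the expression evaluate to
NULL, so the change is not replicated.  Callers therefore treat the return
value as the final publish/skip decision, roughly (variable names
hypothetical):

    if (!pgoutput_row_filter_exec_expr(filter_exprstate, ecxt))
        return false;           /* filter returned false or NULL: skip change */
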
     891             : 
     892             : /*
     893             :  * Make sure the per-entry memory context exists.
     894             :  */
     895             : static void
     896         646 : pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
     897             : {
     898             :     Relation    relation;
     899             : 
     900             :     /* The context may already exist, in which case bail out. */
     901         646 :     if (entry->entry_cxt)
     902          34 :         return;
     903             : 
     904         612 :     relation = RelationIdGetRelation(entry->publish_as_relid);
     905             : 
     906         612 :     entry->entry_cxt = AllocSetContextCreate(data->cachectx,
     907             :                                              "entry private context",
     908             :                                              ALLOCSET_SMALL_SIZES);
     909             : 
     910         612 :     MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
     911             :                                       RelationGetRelationName(relation));
     912             : }
     913             : 
     914             : /*
     915             :  * Initialize the row filter.
     916             :  */
     917             : static void
     918         612 : pgoutput_row_filter_init(PGOutputData *data, List *publications,
     919             :                          RelationSyncEntry *entry)
     920             : {
     921             :     ListCell   *lc;
     922         612 :     List       *rfnodes[] = {NIL, NIL, NIL};    /* One per pubaction */
     923         612 :     bool        no_filter[] = {false, false, false};    /* One per pubaction */
     924             :     MemoryContext oldctx;
     925             :     int         idx;
     926         612 :     bool        has_filter = true;
     927         612 :     Oid         schemaid = get_rel_namespace(entry->publish_as_relid);
     928             : 
     929             :     /*
     930             :      * Find if there are any row filters for this relation. If there are, then
     931             :      * prepare the necessary ExprState and cache it in entry->exprstate. To
     932             :      * build an expression state, we need to ensure the following:
     933             :      *
     934             :      * All the given publication-table mappings must be checked.
     935             :      *
     936             :      * Multiple publications might have multiple row filters for this
     937             :      * relation. Since row filter usage depends on the DML operation, there
     938             :      * are multiple lists (one for each operation) to which row filters will
     939             :      * be appended.
     940             :      *
      941             :      * FOR ALL TABLES and FOR TABLES IN SCHEMA imply "don't use row filter
      942             :      * expression", so they take precedence.
     943             :      */
     944         654 :     foreach(lc, publications)
     945             :     {
     946         620 :         Publication *pub = lfirst(lc);
     947         620 :         HeapTuple   rftuple = NULL;
     948         620 :         Datum       rfdatum = 0;
     949         620 :         bool        pub_no_filter = true;
     950             : 
     951             :         /*
     952             :          * If the publication is FOR ALL TABLES, or the publication includes a
     953             :          * FOR TABLES IN SCHEMA where the table belongs to the referred
     954             :          * schema, then it is treated the same as if there are no row filters
     955             :          * (even if other publications have a row filter).
     956             :          */
     957         620 :         if (!pub->alltables &&
     958         456 :             !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
     959             :                                    ObjectIdGetDatum(schemaid),
     960             :                                    ObjectIdGetDatum(pub->oid)))
     961             :         {
     962             :             /*
     963             :              * Check for the presence of a row filter in this publication.
     964             :              */
     965         444 :             rftuple = SearchSysCache2(PUBLICATIONRELMAP,
     966             :                                       ObjectIdGetDatum(entry->publish_as_relid),
     967             :                                       ObjectIdGetDatum(pub->oid));
     968             : 
     969         444 :             if (HeapTupleIsValid(rftuple))
     970             :             {
     971             :                 /* Null indicates no filter. */
     972         420 :                 rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
     973             :                                           Anum_pg_publication_rel_prqual,
     974             :                                           &pub_no_filter);
     975             :             }
     976             :         }
     977             : 
     978         620 :         if (pub_no_filter)
     979             :         {
     980         592 :             if (rftuple)
     981         392 :                 ReleaseSysCache(rftuple);
     982             : 
     983         592 :             no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
     984         592 :             no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
     985         592 :             no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
     986             : 
     987             :             /*
      988             :              * Quick exit if all the DML actions are published via this
     989             :              * publication.
     990             :              */
     991         592 :             if (no_filter[PUBACTION_INSERT] &&
     992         592 :                 no_filter[PUBACTION_UPDATE] &&
     993         578 :                 no_filter[PUBACTION_DELETE])
     994             :             {
     995         578 :                 has_filter = false;
     996         578 :                 break;
     997             :             }
     998             : 
     999             :             /* No additional work for this publication. Next one. */
    1000          14 :             continue;
    1001             :         }
    1002             : 
    1003             :         /* Form the per pubaction row filter lists. */
    1004          28 :         if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
    1005          28 :             rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
    1006          28 :                                                 TextDatumGetCString(rfdatum));
    1007          28 :         if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
    1008          28 :             rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
    1009          28 :                                                 TextDatumGetCString(rfdatum));
    1010          28 :         if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
    1011          28 :             rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
    1012          28 :                                                 TextDatumGetCString(rfdatum));
    1013             : 
    1014          28 :         ReleaseSysCache(rftuple);
    1015             :     }                           /* loop all subscribed publications */
    1016             : 
    1017             :     /* Clean the row filter */
    1018        2448 :     for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
    1019             :     {
    1020        1836 :         if (no_filter[idx])
    1021             :         {
    1022        1752 :             list_free_deep(rfnodes[idx]);
    1023        1752 :             rfnodes[idx] = NIL;
    1024             :         }
    1025             :     }
    1026             : 
    1027         612 :     if (has_filter)
    1028             :     {
    1029          34 :         Relation    relation = RelationIdGetRelation(entry->publish_as_relid);
    1030             : 
    1031          34 :         pgoutput_ensure_entry_cxt(data, entry);
    1032             : 
    1033             :         /*
    1034             :          * Now all the filters for all pubactions are known. Combine them when
    1035             :          * their pubactions are the same.
    1036             :          */
    1037          34 :         oldctx = MemoryContextSwitchTo(entry->entry_cxt);
    1038          34 :         entry->estate = create_estate_for_relation(relation);
    1039         136 :         for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
    1040             :         {
    1041         102 :             List       *filters = NIL;
    1042             :             Expr       *rfnode;
    1043             : 
    1044         102 :             if (rfnodes[idx] == NIL)
    1045          42 :                 continue;
    1046             : 
    1047         126 :             foreach(lc, rfnodes[idx])
    1048          66 :                 filters = lappend(filters, expand_generated_columns_in_expr(stringToNode((char *) lfirst(lc)), relation, 1));
    1049             : 
    1050             :             /* combine the row filter and cache the ExprState */
    1051          60 :             rfnode = make_orclause(filters);
    1052          60 :             entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
    1053             :         }                       /* for each pubaction */
    1054          34 :         MemoryContextSwitchTo(oldctx);
    1055             : 
    1056          34 :         RelationClose(relation);
    1057             :     }
    1058         612 : }
    1059             : 
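A hedged sketch of the OR-combination performed above; the quals and variable names are hypothetical, only lappend/stringToNode/make_orclause/ExecPrepareExpr are real APIs. Two publications defining row filters "a > 10" and "b = 5" for the same table and pubaction end up as a single prepared expression:

    List       *filters = NIL;
    Expr       *rfnode;
    ExprState  *exprstate;

    /* qual_a / qual_b are assumed to be serialized quals read from the catalog */
    filters = lappend(filters, stringToNode(qual_a));   /* a > 10 */
    filters = lappend(filters, stringToNode(qual_b));   /* b = 5  */

    rfnode = make_orclause(filters);                     /* (a > 10) OR (b = 5) */
    exprstate = ExecPrepareExpr(rfnode, estate);         /* cached per pubaction */
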
    1060             : /*
    1061             :  * If the table contains a generated column, check for any conflicting
     1062             :  * values of the 'publish_generated_columns' parameter in the publications.
    1063             :  */
    1064             : static void
    1065         612 : check_and_init_gencol(PGOutputData *data, List *publications,
    1066             :                       RelationSyncEntry *entry)
    1067             : {
    1068         612 :     Relation    relation = RelationIdGetRelation(entry->publish_as_relid);
    1069         612 :     TupleDesc   desc = RelationGetDescr(relation);
    1070         612 :     bool        gencolpresent = false;
    1071         612 :     bool        first = true;
    1072             : 
    1073             :     /* Check if there is any generated column present. */
    1074        1858 :     for (int i = 0; i < desc->natts; i++)
    1075             :     {
    1076        1260 :         CompactAttribute *att = TupleDescCompactAttr(desc, i);
    1077             : 
    1078        1260 :         if (att->attgenerated)
    1079             :         {
    1080          14 :             gencolpresent = true;
    1081          14 :             break;
    1082             :         }
    1083             :     }
    1084             : 
    1085             :     /* There are no generated columns to be published. */
    1086         612 :     if (!gencolpresent)
    1087             :     {
    1088         598 :         entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
    1089         598 :         return;
    1090             :     }
    1091             : 
    1092             :     /*
     1093             :      * There may be a conflicting value for the 'publish_generated_columns'
     1094             :      * parameter in the publications.
    1095             :      */
    1096          44 :     foreach_ptr(Publication, pub, publications)
    1097             :     {
    1098             :         /*
    1099             :          * The column list takes precedence over the
    1100             :          * 'publish_generated_columns' parameter. Those will be checked later,
    1101             :          * see pgoutput_column_list_init.
    1102             :          */
    1103          16 :         if (check_and_fetch_column_list(pub, entry->publish_as_relid, NULL, NULL))
    1104           6 :             continue;
    1105             : 
    1106          10 :         if (first)
    1107             :         {
    1108          10 :             entry->include_gencols_type = pub->pubgencols_type;
    1109          10 :             first = false;
    1110             :         }
    1111           0 :         else if (entry->include_gencols_type != pub->pubgencols_type)
    1112           0 :             ereport(ERROR,
    1113             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1114             :                     errmsg("cannot use different values of publish_generated_columns for table \"%s.%s\" in different publications",
    1115             :                            get_namespace_name(RelationGetNamespace(relation)),
    1116             :                            RelationGetRelationName(relation)));
    1117             :     }
    1118             : }
    1119             : 
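A hedged sketch of the configuration this check rejects; the publication and table names are invented, and the option values follow the publish_generated_columns syntax as understood here, not text from this file:

    /*
     * Table t has a stored generated column and is published twice with
     * different settings:
     *
     *   CREATE PUBLICATION p1 FOR TABLE t WITH (publish_generated_columns = stored);
     *   CREATE PUBLICATION p2 FOR TABLE t WITH (publish_generated_columns = none);
     *
     * A subscription that uses both p1 and p2 reaches the ereport(ERROR)
     * above, because entry->include_gencols_type cannot satisfy both.
     */
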
    1120             : /*
    1121             :  * Initialize the column list.
    1122             :  */
    1123             : static void
    1124         612 : pgoutput_column_list_init(PGOutputData *data, List *publications,
    1125             :                           RelationSyncEntry *entry)
    1126             : {
    1127             :     ListCell   *lc;
    1128         612 :     bool        first = true;
    1129         612 :     Relation    relation = RelationIdGetRelation(entry->publish_as_relid);
    1130         612 :     bool        found_pub_collist = false;
    1131         612 :     Bitmapset  *relcols = NULL;
    1132             : 
    1133         612 :     pgoutput_ensure_entry_cxt(data, entry);
    1134             : 
    1135             :     /*
    1136             :      * Find if there are any column lists for this relation. If there are,
    1137             :      * build a bitmap using the column lists.
    1138             :      *
    1139             :      * Multiple publications might have multiple column lists for this
    1140             :      * relation.
    1141             :      *
    1142             :      * Note that we don't support the case where the column list is different
    1143             :      * for the same table when combining publications. See comments atop
    1144             :      * fetch_relation_list. But one can later change the publication so we
    1145             :      * still need to check all the given publication-table mappings and report
    1146             :      * an error if any publications have a different column list.
    1147             :      */
    1148        1244 :     foreach(lc, publications)
    1149             :     {
    1150         634 :         Publication *pub = lfirst(lc);
    1151         634 :         Bitmapset  *cols = NULL;
    1152             : 
    1153             :         /* Retrieve the bitmap of columns for a column list publication. */
    1154         634 :         found_pub_collist |= check_and_fetch_column_list(pub,
    1155             :                                                          entry->publish_as_relid,
    1156             :                                                          entry->entry_cxt, &cols);
    1157             : 
    1158             :         /*
     1159             :          * For non-column-list publications, e.g. TABLE (without a column
     1160             :          * list), ALL TABLES, or ALL TABLES IN SCHEMA, we consider all columns
     1161             :          * of the table (including generated columns when the
     1162             :          * 'publish_generated_columns' parameter requests them).
    1163             :          */
    1164         634 :         if (!cols)
    1165             :         {
    1166             :             /*
    1167             :              * Cache the table columns for the first publication with no
     1168             :              * specified column list to detect a publication with a different
    1169             :              * column list.
    1170             :              */
    1171         556 :             if (!relcols && (list_length(publications) > 1))
    1172             :             {
    1173          18 :                 MemoryContext oldcxt = MemoryContextSwitchTo(entry->entry_cxt);
    1174             : 
    1175          18 :                 relcols = pub_form_cols_map(relation,
    1176             :                                             entry->include_gencols_type);
    1177          18 :                 MemoryContextSwitchTo(oldcxt);
    1178             :             }
    1179             : 
    1180         556 :             cols = relcols;
    1181             :         }
    1182             : 
    1183         634 :         if (first)
    1184             :         {
    1185         612 :             entry->columns = cols;
    1186         612 :             first = false;
    1187             :         }
    1188          22 :         else if (!bms_equal(entry->columns, cols))
    1189           2 :             ereport(ERROR,
    1190             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1191             :                     errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
    1192             :                            get_namespace_name(RelationGetNamespace(relation)),
    1193             :                            RelationGetRelationName(relation)));
    1194             :     }                           /* loop all subscribed publications */
    1195             : 
    1196             :     /*
    1197             :      * If no column list publications exist, columns to be published will be
    1198             :      * computed later according to the 'publish_generated_columns' parameter.
    1199             :      */
    1200         610 :     if (!found_pub_collist)
    1201         538 :         entry->columns = NULL;
    1202             : 
    1203         610 :     RelationClose(relation);
    1204         610 : }
    1205             : 
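A hedged sketch of the mismatch that triggers the error above; the attribute numbers and names are hypothetical, only bms_add_member/bms_equal/ereport are real APIs:

    /* Publication p1 publishes columns (a, b); publication p2 publishes (a, c). */
    Bitmapset  *cols_p1 = bms_add_member(bms_add_member(NULL, 1), 2);
    Bitmapset  *cols_p2 = bms_add_member(bms_add_member(NULL, 1), 3);

    if (!bms_equal(cols_p1, cols_p2))
        ereport(ERROR,
                errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
                       "public", "t"));
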
    1206             : /*
    1207             :  * Initialize the slot for storing new and old tuples, and build the map that
    1208             :  * will be used to convert the relation's tuples into the ancestor's format.
    1209             :  */
    1210             : static void
    1211         612 : init_tuple_slot(PGOutputData *data, Relation relation,
    1212             :                 RelationSyncEntry *entry)
    1213             : {
    1214             :     MemoryContext oldctx;
    1215             :     TupleDesc   oldtupdesc;
    1216             :     TupleDesc   newtupdesc;
    1217             : 
    1218         612 :     oldctx = MemoryContextSwitchTo(data->cachectx);
    1219             : 
    1220             :     /*
    1221             :      * Create tuple table slots. Create a copy of the TupleDesc as it needs to
    1222             :      * live as long as the cache remains.
    1223             :      */
    1224         612 :     oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
    1225         612 :     newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
    1226             : 
    1227         612 :     entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
    1228         612 :     entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
    1229             : 
    1230         612 :     MemoryContextSwitchTo(oldctx);
    1231             : 
    1232             :     /*
    1233             :      * Cache the map that will be used to convert the relation's tuples into
    1234             :      * the ancestor's format, if needed.
    1235             :      */
    1236         612 :     if (entry->publish_as_relid != RelationGetRelid(relation))
    1237             :     {
    1238          74 :         Relation    ancestor = RelationIdGetRelation(entry->publish_as_relid);
    1239          74 :         TupleDesc   indesc = RelationGetDescr(relation);
    1240          74 :         TupleDesc   outdesc = RelationGetDescr(ancestor);
    1241             : 
    1242             :         /* Map must live as long as the logical decoding context. */
    1243          74 :         oldctx = MemoryContextSwitchTo(data->cachectx);
    1244             : 
    1245          74 :         entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
    1246             : 
    1247          74 :         MemoryContextSwitchTo(oldctx);
    1248          74 :         RelationClose(ancestor);
    1249             :     }
    1250         612 : }
    1251             : 
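A hedged sketch of how the cached map is consumed later (see pgoutput_change below); partdesc, rootdesc and partslot are assumed to be the partition's TupleDesc, the root ancestor's TupleDesc, and a slot holding a partition tuple:

    AttrMap    *map = build_attrmap_by_name_if_req(partdesc, rootdesc, false);

    if (map != NULL)
    {
        /* Rewrite the partition tuple into the ancestor's column order. */
        TupleTableSlot *rootslot = MakeTupleTableSlot(rootdesc, &TTSOpsVirtual);

        rootslot = execute_attr_map_slot(map, partslot, rootslot);
    }
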
    1252             : /*
    1253             :  * Change is checked against the row filter if any.
    1254             :  *
    1255             :  * Returns true if the change is to be replicated, else false.
    1256             :  *
    1257             :  * For inserts, evaluate the row filter for new tuple.
    1258             :  * For deletes, evaluate the row filter for old tuple.
    1259             :  * For updates, evaluate the row filter for old and new tuple.
    1260             :  *
     1261             :  * For updates, if both evaluations are true, we send the UPDATE, and if both
     1262             :  * evaluations are false, we don't replicate the UPDATE. Now, if
    1263             :  * only one of the tuples matches the row filter expression, we transform
    1264             :  * UPDATE to DELETE or INSERT to avoid any data inconsistency based on the
    1265             :  * following rules:
    1266             :  *
     1267             :  * Case 1: old-row (no match)    new-row (no match)  -> (drop change)
     1268             :  * Case 2: old-row (no match)    new-row (match)     -> INSERT
     1269             :  * Case 3: old-row (match)       new-row (no match)  -> DELETE
     1270             :  * Case 4: old-row (match)       new-row (match)     -> UPDATE
    1271             :  *
    1272             :  * The new action is updated in the action parameter.
    1273             :  *
    1274             :  * The new slot could be updated when transforming the UPDATE into INSERT,
    1275             :  * because the original new tuple might not have column values from the replica
    1276             :  * identity.
    1277             :  *
    1278             :  * Examples:
    1279             :  * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
    1280             :  * Since the old tuple satisfies, the initial table synchronization copied this
    1281             :  * row (or another method was used to guarantee that there is data
    1282             :  * consistency).  However, after the UPDATE the new tuple doesn't satisfy the
    1283             :  * row filter, so from a data consistency perspective, that row should be
    1284             :  * removed on the subscriber. The UPDATE should be transformed into a DELETE
    1285             :  * statement and be sent to the subscriber. Keeping this row on the subscriber
    1286             :  * is undesirable because it doesn't reflect what was defined in the row filter
    1287             :  * expression on the publisher. This row on the subscriber would likely not be
    1288             :  * modified by replication again. If someone inserted a new row with the same
    1289             :  * old identifier, replication could stop due to a constraint violation.
    1290             :  *
    1291             :  * Let's say the old tuple doesn't match the row filter but the new tuple does.
    1292             :  * Since the old tuple doesn't satisfy, the initial table synchronization
    1293             :  * probably didn't copy this row. However, after the UPDATE the new tuple does
    1294             :  * satisfy the row filter, so from a data consistency perspective, that row
    1295             :  * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
    1296             :  * statements have no effect (it matches no row -- see
     1297             :  * apply_handle_update_internal()). So, the UPDATE should be transformed into an
     1298             :  * INSERT statement and be sent to the subscriber. However, this might surprise
    1299             :  * someone who expects the data set to satisfy the row filter expression on the
    1300             :  * provider.
    1301             :  */
    1302             : static bool
    1303      364568 : pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
    1304             :                     TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
    1305             :                     ReorderBufferChangeType *action)
    1306             : {
    1307             :     TupleDesc   desc;
    1308             :     int         i;
    1309             :     bool        old_matched,
    1310             :                 new_matched,
    1311             :                 result;
    1312             :     TupleTableSlot *tmp_new_slot;
    1313      364568 :     TupleTableSlot *new_slot = *new_slot_ptr;
    1314             :     ExprContext *ecxt;
    1315             :     ExprState  *filter_exprstate;
    1316             : 
    1317             :     /*
    1318             :      * We need this map to avoid relying on ReorderBufferChangeType enums
    1319             :      * having specific values.
    1320             :      */
    1321             :     static const int map_changetype_pubaction[] = {
    1322             :         [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
    1323             :         [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
    1324             :         [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
    1325             :     };
    1326             : 
    1327             :     Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
    1328             :            *action == REORDER_BUFFER_CHANGE_UPDATE ||
    1329             :            *action == REORDER_BUFFER_CHANGE_DELETE);
    1330             : 
    1331             :     Assert(new_slot || old_slot);
    1332             : 
    1333             :     /* Get the corresponding row filter */
    1334      364568 :     filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
    1335             : 
    1336             :     /* Bail out if there is no row filter */
    1337      364568 :     if (!filter_exprstate)
    1338      364500 :         return true;
    1339             : 
    1340          68 :     elog(DEBUG3, "table \"%s.%s\" has row filter",
    1341             :          get_namespace_name(RelationGetNamespace(relation)),
    1342             :          RelationGetRelationName(relation));
    1343             : 
    1344          68 :     ResetPerTupleExprContext(entry->estate);
    1345             : 
    1346          68 :     ecxt = GetPerTupleExprContext(entry->estate);
    1347             : 
    1348             :     /*
    1349             :      * For the following occasions where there is only one tuple, we can
    1350             :      * evaluate the row filter for that tuple and return.
    1351             :      *
    1352             :      * For inserts, we only have the new tuple.
    1353             :      *
    1354             :      * For updates, we can have only a new tuple when none of the replica
    1355             :      * identity columns changed and none of those columns have external data
    1356             :      * but we still need to evaluate the row filter for the new tuple as the
    1357             :      * existing values of those columns might not match the filter. Also,
     1358             :      * users can use constant expressions in the row filter, so we need to
     1359             :      * evaluate it for the new tuple anyway.
    1360             :      *
    1361             :      * For deletes, we only have the old tuple.
    1362             :      */
    1363          68 :     if (!new_slot || !old_slot)
    1364             :     {
    1365          60 :         ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
    1366          60 :         result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
    1367             : 
    1368          60 :         return result;
    1369             :     }
    1370             : 
    1371             :     /*
     1372             :      * Only updates can have both the old and new tuples valid, and both need
     1373             :      * to be checked against the row filter.
    1374             :      */
    1375             :     Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
    1376             : 
    1377           8 :     slot_getallattrs(new_slot);
    1378           8 :     slot_getallattrs(old_slot);
    1379             : 
    1380           8 :     tmp_new_slot = NULL;
    1381           8 :     desc = RelationGetDescr(relation);
    1382             : 
    1383             :     /*
    1384             :      * The new tuple might not have all the replica identity columns, in which
    1385             :      * case it needs to be copied over from the old tuple.
    1386             :      */
    1387          24 :     for (i = 0; i < desc->natts; i++)
    1388             :     {
    1389          16 :         CompactAttribute *att = TupleDescCompactAttr(desc, i);
    1390             : 
    1391             :         /*
    1392             :          * if the column in the new tuple or old tuple is null, nothing to do
    1393             :          */
    1394          16 :         if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
    1395           2 :             continue;
    1396             : 
    1397             :         /*
    1398             :          * Unchanged toasted replica identity columns are only logged in the
     1399             :          * old tuple. Copy this over to the new tuple. The changed (or
     1400             :          * WAL-logged) toast values are always assembled in memory and set as
    1401             :          * VARTAG_INDIRECT. See ReorderBufferToastReplace.
    1402             :          */
    1403          22 :         if (att->attlen == -1 &&
    1404           8 :             VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(new_slot->tts_values[i])) &&
    1405           2 :             !VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(old_slot->tts_values[i])))
    1406             :         {
    1407           2 :             if (!tmp_new_slot)
    1408             :             {
    1409           2 :                 tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
    1410           2 :                 ExecClearTuple(tmp_new_slot);
    1411             : 
    1412           2 :                 memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
    1413           2 :                        desc->natts * sizeof(Datum));
    1414           2 :                 memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
    1415           2 :                        desc->natts * sizeof(bool));
    1416             :             }
    1417             : 
    1418           2 :             tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
    1419           2 :             tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
    1420             :         }
    1421             :     }
    1422             : 
    1423           8 :     ecxt->ecxt_scantuple = old_slot;
    1424           8 :     old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
    1425             : 
    1426           8 :     if (tmp_new_slot)
    1427             :     {
    1428           2 :         ExecStoreVirtualTuple(tmp_new_slot);
    1429           2 :         ecxt->ecxt_scantuple = tmp_new_slot;
    1430             :     }
    1431             :     else
    1432           6 :         ecxt->ecxt_scantuple = new_slot;
    1433             : 
    1434           8 :     new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
    1435             : 
    1436             :     /*
     1437             :      * Case 1: if neither tuple matches the row filter, bail out. Send
    1438             :      * nothing.
    1439             :      */
    1440           8 :     if (!old_matched && !new_matched)
    1441           0 :         return false;
    1442             : 
    1443             :     /*
    1444             :      * Case 2: if the old tuple doesn't satisfy the row filter but the new
    1445             :      * tuple does, transform the UPDATE into INSERT.
    1446             :      *
    1447             :      * Use the newly transformed tuple that must contain the column values for
     1448             :      * all the replica identity columns. This is required to ensure that,
     1449             :      * while inserting the tuple in the downstream node, we have all the
    1450             :      * required column values.
    1451             :      */
    1452           8 :     if (!old_matched && new_matched)
    1453             :     {
    1454           4 :         *action = REORDER_BUFFER_CHANGE_INSERT;
    1455             : 
    1456           4 :         if (tmp_new_slot)
    1457           2 :             *new_slot_ptr = tmp_new_slot;
    1458             :     }
    1459             : 
    1460             :     /*
    1461             :      * Case 3: if the old tuple satisfies the row filter but the new tuple
    1462             :      * doesn't, transform the UPDATE into DELETE.
    1463             :      *
     1464             :      * This transformation does not require another tuple. The old tuple will
    1465             :      * be used for DELETE.
    1466             :      */
    1467           4 :     else if (old_matched && !new_matched)
    1468           2 :         *action = REORDER_BUFFER_CHANGE_DELETE;
    1469             : 
    1470             :     /*
    1471             :      * Case 4: if both tuples match the row filter, transformation isn't
    1472             :      * required. (*action is default UPDATE).
    1473             :      */
    1474             : 
    1475           8 :     return true;
    1476             : }
    1477             : 
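A hedged walk-through of the case table above with a hypothetical row filter "a > 10"; the values and flags are invented for illustration, only the ReorderBufferChangeType constants are real. An UPDATE changing a row from a = 5 to a = 20 matches only with the new tuple, so it is rewritten as an INSERT before being sent:

    ReorderBufferChangeType action = REORDER_BUFFER_CHANGE_UPDATE;
    bool        old_matched = false;    /* a = 5  fails  "a > 10" */
    bool        new_matched = true;     /* a = 20 passes "a > 10" */

    if (!old_matched && !new_matched)
        ;                                           /* Case 1: drop the change */
    else if (!old_matched && new_matched)
        action = REORDER_BUFFER_CHANGE_INSERT;      /* Case 2 */
    else if (old_matched && !new_matched)
        action = REORDER_BUFFER_CHANGE_DELETE;      /* Case 3 */
    /* Case 4: both match, action stays UPDATE */
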
    1478             : /*
     1479             :  * Sends the decoded DML over the wire.
     1480             :  *
     1481             :  * This is called in both streaming and non-streaming modes.
    1482             :  */
    1483             : static void
    1484      366918 : pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    1485             :                 Relation relation, ReorderBufferChange *change)
    1486             : {
    1487      366918 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1488      366918 :     PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
    1489             :     MemoryContext old;
    1490             :     RelationSyncEntry *relentry;
    1491      366918 :     TransactionId xid = InvalidTransactionId;
    1492      366918 :     Relation    ancestor = NULL;
    1493      366918 :     Relation    targetrel = relation;
    1494      366918 :     ReorderBufferChangeType action = change->action;
    1495      366918 :     TupleTableSlot *old_slot = NULL;
    1496      366918 :     TupleTableSlot *new_slot = NULL;
    1497             : 
    1498      366918 :     if (!is_publishable_relation(relation))
    1499        2348 :         return;
    1500             : 
    1501             :     /*
     1502             :      * Remember the xid for the change in streaming mode. We need to send the
     1503             :      * xid with each change in streaming mode so that the subscriber can
     1504             :      * associate the change with its transaction and, on abort, discard the
     1505             :      * corresponding changes.
    1506             :      */
    1507      366918 :     if (data->in_streaming)
    1508      351886 :         xid = change->txn->xid;
    1509             : 
    1510      366918 :     relentry = get_rel_sync_entry(data, relation);
    1511             : 
    1512             :     /* First check the table filter */
    1513      366916 :     switch (action)
    1514             :     {
    1515      212064 :         case REORDER_BUFFER_CHANGE_INSERT:
    1516      212064 :             if (!relentry->pubactions.pubinsert)
    1517         158 :                 return;
    1518      211906 :             break;
    1519       68982 :         case REORDER_BUFFER_CHANGE_UPDATE:
    1520       68982 :             if (!relentry->pubactions.pubupdate)
    1521          88 :                 return;
    1522       68894 :             break;
    1523       85870 :         case REORDER_BUFFER_CHANGE_DELETE:
    1524       85870 :             if (!relentry->pubactions.pubdelete)
    1525        2102 :                 return;
    1526             : 
    1527             :             /*
    1528             :              * This is only possible if deletes are allowed even when replica
    1529             :              * identity is not defined for a table. Since the DELETE action
    1530             :              * can't be published, we simply return.
    1531             :              */
    1532       83768 :             if (!change->data.tp.oldtuple)
    1533             :             {
    1534           0 :                 elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
    1535           0 :                 return;
    1536             :             }
    1537       83768 :             break;
    1538      364568 :         default:
    1539             :             Assert(false);
    1540             :     }
    1541             : 
    1542             :     /* Avoid leaking memory by using and resetting our own context */
    1543      364568 :     old = MemoryContextSwitchTo(data->context);
    1544             : 
    1545             :     /* Switch relation if publishing via root. */
    1546      364568 :     if (relentry->publish_as_relid != RelationGetRelid(relation))
    1547             :     {
    1548             :         Assert(relation->rd_rel->relispartition);
    1549         140 :         ancestor = RelationIdGetRelation(relentry->publish_as_relid);
    1550         140 :         targetrel = ancestor;
    1551             :     }
    1552             : 
    1553      364568 :     if (change->data.tp.oldtuple)
    1554             :     {
    1555       84036 :         old_slot = relentry->old_slot;
    1556       84036 :         ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false);
    1557             : 
    1558             :         /* Convert tuple if needed. */
    1559       84036 :         if (relentry->attrmap)
    1560             :         {
    1561          10 :             TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
    1562             :                                                       &TTSOpsVirtual);
    1563             : 
    1564          10 :             old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
    1565             :         }
    1566             :     }
    1567             : 
    1568      364568 :     if (change->data.tp.newtuple)
    1569             :     {
    1570      280800 :         new_slot = relentry->new_slot;
    1571      280800 :         ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false);
    1572             : 
    1573             :         /* Convert tuple if needed. */
    1574      280800 :         if (relentry->attrmap)
    1575             :         {
    1576          42 :             TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
    1577             :                                                       &TTSOpsVirtual);
    1578             : 
    1579          42 :             new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
    1580             :         }
    1581             :     }
    1582             : 
    1583             :     /*
    1584             :      * Check row filter.
    1585             :      *
    1586             :      * Updates could be transformed to inserts or deletes based on the results
    1587             :      * of the row filter for old and new tuple.
    1588             :      */
    1589      364568 :     if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
    1590          24 :         goto cleanup;
    1591             : 
    1592             :     /*
    1593             :      * Send BEGIN if we haven't yet.
    1594             :      *
    1595             :      * We send the BEGIN message after ensuring that we will actually send the
    1596             :      * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
    1597             :      * transactions.
    1598             :      */
    1599      364544 :     if (txndata && !txndata->sent_begin_txn)
    1600         870 :         pgoutput_send_begin(ctx, txn);
    1601             : 
    1602             :     /*
    1603             :      * Schema should be sent using the original relation because it also sends
    1604             :      * the ancestor's relation.
    1605             :      */
    1606      364544 :     maybe_send_schema(ctx, change, relation, relentry);
    1607             : 
    1608      364544 :     OutputPluginPrepareWrite(ctx, true);
    1609             : 
    1610             :     /* Send the data */
    1611      364544 :     switch (action)
    1612             :     {
    1613      211886 :         case REORDER_BUFFER_CHANGE_INSERT:
    1614      211886 :             logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
    1615      211886 :                                     data->binary, relentry->columns,
    1616             :                                     relentry->include_gencols_type);
    1617      211886 :             break;
    1618       68888 :         case REORDER_BUFFER_CHANGE_UPDATE:
    1619       68888 :             logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
    1620       68888 :                                     new_slot, data->binary, relentry->columns,
    1621             :                                     relentry->include_gencols_type);
    1622       68888 :             break;
    1623       83770 :         case REORDER_BUFFER_CHANGE_DELETE:
    1624       83770 :             logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
    1625       83770 :                                     data->binary, relentry->columns,
    1626             :                                     relentry->include_gencols_type);
    1627       83770 :             break;
    1628      364544 :         default:
    1629             :             Assert(false);
    1630             :     }
    1631             : 
    1632      364544 :     OutputPluginWrite(ctx, true);
    1633             : 
    1634      364566 : cleanup:
    1635      364566 :     if (RelationIsValid(ancestor))
    1636             :     {
    1637         140 :         RelationClose(ancestor);
    1638         140 :         ancestor = NULL;
    1639             :     }
    1640             : 
    1641             :     /* Drop the new slots that were used to store the converted tuples. */
    1642      364566 :     if (relentry->attrmap)
    1643             :     {
    1644          52 :         if (old_slot)
    1645          10 :             ExecDropSingleTupleTableSlot(old_slot);
    1646             : 
    1647          52 :         if (new_slot)
    1648          42 :             ExecDropSingleTupleTableSlot(new_slot);
    1649             :     }
    1650             : 
    1651      364566 :     MemoryContextSwitchTo(old);
    1652      364566 :     MemoryContextReset(data->context);
    1653             : }
    1654             : 
    1655             : static void
    1656          36 : pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    1657             :                   int nrelations, Relation relations[], ReorderBufferChange *change)
    1658             : {
    1659          36 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1660          36 :     PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
    1661             :     MemoryContext old;
    1662             :     RelationSyncEntry *relentry;
    1663             :     int         i;
    1664             :     int         nrelids;
    1665             :     Oid        *relids;
    1666          36 :     TransactionId xid = InvalidTransactionId;
    1667             : 
    1668             :     /* Remember the xid for the change in streaming mode. See pgoutput_change. */
    1669          36 :     if (data->in_streaming)
    1670           0 :         xid = change->txn->xid;
    1671             : 
    1672          36 :     old = MemoryContextSwitchTo(data->context);
    1673             : 
    1674          36 :     relids = palloc0(nrelations * sizeof(Oid));
    1675          36 :     nrelids = 0;
    1676             : 
    1677         110 :     for (i = 0; i < nrelations; i++)
    1678             :     {
    1679          74 :         Relation    relation = relations[i];
    1680          74 :         Oid         relid = RelationGetRelid(relation);
    1681             : 
    1682          74 :         if (!is_publishable_relation(relation))
    1683           0 :             continue;
    1684             : 
    1685          74 :         relentry = get_rel_sync_entry(data, relation);
    1686             : 
    1687          74 :         if (!relentry->pubactions.pubtruncate)
    1688          32 :             continue;
    1689             : 
    1690             :         /*
    1691             :          * Don't send partitions if the publication wants to send only the
    1692             :          * root tables through it.
    1693             :          */
    1694          42 :         if (relation->rd_rel->relispartition &&
    1695          30 :             relentry->publish_as_relid != relid)
    1696           6 :             continue;
    1697             : 
    1698          36 :         relids[nrelids++] = relid;
    1699             : 
    1700             :         /* Send BEGIN if we haven't yet */
    1701          36 :         if (txndata && !txndata->sent_begin_txn)
    1702          24 :             pgoutput_send_begin(ctx, txn);
    1703             : 
    1704          36 :         maybe_send_schema(ctx, change, relation, relentry);
    1705             :     }
    1706             : 
    1707          36 :     if (nrelids > 0)
    1708             :     {
    1709          24 :         OutputPluginPrepareWrite(ctx, true);
    1710          24 :         logicalrep_write_truncate(ctx->out,
    1711             :                                   xid,
    1712             :                                   nrelids,
    1713             :                                   relids,
    1714          24 :                                   change->data.truncate.cascade,
    1715          24 :                                   change->data.truncate.restart_seqs);
    1716          24 :         OutputPluginWrite(ctx, true);
    1717             :     }
    1718             : 
    1719          36 :     MemoryContextSwitchTo(old);
    1720          36 :     MemoryContextReset(data->context);
    1721          36 : }
    1722             : 
    1723             : static void
    1724          14 : pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    1725             :                  XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
    1726             :                  const char *message)
    1727             : {
    1728          14 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1729          14 :     TransactionId xid = InvalidTransactionId;
    1730             : 
    1731          14 :     if (!data->messages)
    1732           4 :         return;
    1733             : 
    1734             :     /*
    1735             :      * Remember the xid for the message in streaming mode. See
    1736             :      * pgoutput_change.
    1737             :      */
    1738          10 :     if (data->in_streaming)
    1739           0 :         xid = txn->xid;
    1740             : 
    1741             :     /*
    1742             :      * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
    1743             :      */
    1744          10 :     if (transactional)
    1745             :     {
    1746           4 :         PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
    1747             : 
    1748             :         /* Send BEGIN if we haven't yet */
    1749           4 :         if (txndata && !txndata->sent_begin_txn)
    1750           4 :             pgoutput_send_begin(ctx, txn);
    1751             :     }
    1752             : 
    1753          10 :     OutputPluginPrepareWrite(ctx, true);
    1754          10 :     logicalrep_write_message(ctx->out,
    1755             :                              xid,
    1756             :                              message_lsn,
    1757             :                              transactional,
    1758             :                              prefix,
    1759             :                              sz,
    1760             :                              message);
    1761          10 :     OutputPluginWrite(ctx, true);
    1762             : }
    1763             : 
    1764             : /*
    1765             :  * Return true if the data is associated with an origin and the user has
    1766             :  * requested the changes that don't have an origin, false otherwise.
    1767             :  */
    1768             : static bool
    1769      609692 : pgoutput_origin_filter(LogicalDecodingContext *ctx,
    1770             :                        RepOriginId origin_id)
    1771             : {
    1772      609692 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1773             : 
    1774      609692 :     if (data->publish_no_origin && origin_id != InvalidRepOriginId)
    1775         310 :         return true;
    1776             : 
    1777      609382 :     return false;
    1778             : }
    1779             : 
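A hedged note on where publish_no_origin typically comes from; the subscription syntax below is stated from general knowledge of CREATE SUBSCRIPTION options, not from this file:

    /*
     * Hypothetical subscriber-side setup:
     *
     *   CREATE SUBSCRIPTION sub CONNECTION '...' PUBLICATION pub
     *       WITH (origin = none);
     *
     * This sets data->publish_no_origin for the walsender serving sub, so
     * changes that were themselves applied from another origin (origin_id !=
     * InvalidRepOriginId) are skipped, which helps avoid loops in
     * bidirectional replication setups.
     */
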
    1780             : /*
    1781             :  * Shutdown the output plugin.
    1782             :  *
    1783             :  * Note, we don't need to clean the data->context, data->cachectx, and
    1784             :  * data->pubctx as they are child contexts of the ctx->context so they
     1785             :  * will be cleaned up by the logical decoding machinery.
    1786             :  */
    1787             : static void
    1788        1064 : pgoutput_shutdown(LogicalDecodingContext *ctx)
    1789             : {
    1790        1064 :     pgoutput_memory_context_reset(NULL);
    1791        1064 : }
    1792             : 
    1793             : /*
    1794             :  * Load publications from the list of publication names.
    1795             :  *
     1796             :  * Here, we skip the publications that don't exist yet. This allows
     1797             :  * replication to continue silently even if a specified publication is missing.
    1798             :  * This is required because we allow the users to create publications after they
    1799             :  * have specified the required publications at the time of replication start.
    1800             :  */
    1801             : static List *
    1802         406 : LoadPublications(List *pubnames)
    1803             : {
    1804         406 :     List       *result = NIL;
    1805             :     ListCell   *lc;
    1806             : 
    1807         916 :     foreach(lc, pubnames)
    1808             :     {
    1809         510 :         char       *pubname = (char *) lfirst(lc);
    1810         510 :         Publication *pub = GetPublicationByName(pubname, true);
    1811             : 
    1812         510 :         if (pub)
    1813         506 :             result = lappend(result, pub);
    1814             :         else
    1815           4 :             ereport(WARNING,
    1816             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1817             :                     errmsg("skipped loading publication \"%s\"", pubname),
    1818             :                     errdetail("The publication does not exist at this point in the WAL."),
    1819             :                     errhint("Create the publication if it does not exist."));
    1820             :     }
    1821             : 
    1822         406 :     return result;
    1823             : }
    1824             : 
    1825             : /*
    1826             :  * Publication syscache invalidation callback.
    1827             :  *
    1828             :  * Called for invalidations on pg_publication.
    1829             :  */
    1830             : static void
    1831         644 : publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
    1832             : {
    1833         644 :     publications_valid = false;
    1834         644 : }
    1835             : 
    1836             : /*
    1837             :  * START STREAM callback
    1838             :  */
    1839             : static void
    1840        1280 : pgoutput_stream_start(struct LogicalDecodingContext *ctx,
    1841             :                       ReorderBufferTXN *txn)
    1842             : {
    1843        1280 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1844        1280 :     bool        send_replication_origin = txn->origin_id != InvalidRepOriginId;
    1845             : 
    1846             :     /* we can't nest streaming of transactions */
    1847             :     Assert(!data->in_streaming);
    1848             : 
    1849             :     /*
    1850             :      * If we already sent the first stream for this transaction then don't
    1851             :      * send the origin id in the subsequent streams.
    1852             :      */
    1853        1280 :     if (rbtxn_is_streamed(txn))
    1854        1150 :         send_replication_origin = false;
    1855             : 
    1856        1280 :     OutputPluginPrepareWrite(ctx, !send_replication_origin);
    1857        1280 :     logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
    1858             : 
    1859        1280 :     send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
    1860             :                      send_replication_origin);
    1861             : 
    1862        1280 :     OutputPluginWrite(ctx, true);
    1863             : 
    1864             :     /* we're streaming a chunk of transaction now */
    1865        1280 :     data->in_streaming = true;
    1866        1280 : }
    1867             : 
    1868             : /*
    1869             :  * STOP STREAM callback
    1870             :  */
    1871             : static void
    1872        1280 : pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
    1873             :                      ReorderBufferTXN *txn)
    1874             : {
    1875        1280 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1876             : 
    1877             :     /* we should be streaming a transaction */
    1878             :     Assert(data->in_streaming);
    1879             : 
    1880        1280 :     OutputPluginPrepareWrite(ctx, true);
    1881        1280 :     logicalrep_write_stream_stop(ctx->out);
    1882        1280 :     OutputPluginWrite(ctx, true);
    1883             : 
    1884             :     /* we've stopped streaming a transaction */
    1885        1280 :     data->in_streaming = false;
    1886        1280 : }
    1887             : 
    1888             : /*
    1889             :  * Notify downstream to discard the streamed transaction (along with all
    1890             :  * its subtransactions, if it's a toplevel transaction).
    1891             :  */
    1892             : static void
    1893          52 : pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
    1894             :                       ReorderBufferTXN *txn,
    1895             :                       XLogRecPtr abort_lsn)
    1896             : {
    1897             :     ReorderBufferTXN *toptxn;
    1898          52 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1899          52 :     bool        write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
    1900             : 
    1901             :     /*
     1902             :      * The abort should happen outside a streaming block, even for streamed
    1903             :      * transactions. The transaction has to be marked as streamed, though.
    1904             :      */
    1905             :     Assert(!data->in_streaming);
    1906             : 
    1907             :     /* determine the toplevel transaction */
    1908          52 :     toptxn = rbtxn_get_toptxn(txn);
    1909             : 
    1910             :     Assert(rbtxn_is_streamed(toptxn));
    1911             : 
    1912          52 :     OutputPluginPrepareWrite(ctx, true);
    1913          52 :     logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
    1914             :                                   txn->abort_time, write_abort_info);
    1915             : 
    1916          52 :     OutputPluginWrite(ctx, true);
    1917             : 
    1918          52 :     cleanup_rel_sync_cache(toptxn->xid, false);
    1919          52 : }
    1920             : 
    1921             : /*
    1922             :  * Notify downstream to apply the streamed transaction (along with all
    1923             :  * its subtransactions).
    1924             :  */
    1925             : static void
    1926          92 : pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
    1927             :                        ReorderBufferTXN *txn,
    1928             :                        XLogRecPtr commit_lsn)
    1929             : {
    1930          92 :     PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private;
    1931             : 
    1932             :     /*
    1933             :      * The commit should happen outside a streaming block, even for streamed
    1934             :      * transactions. The transaction has to be marked as streamed, though.
    1935             :      */
    1936             :     Assert(!data->in_streaming);
    1937             :     Assert(rbtxn_is_streamed(txn));
    1938             : 
    1939          92 :     OutputPluginUpdateProgress(ctx, false);
    1940             : 
    1941          92 :     OutputPluginPrepareWrite(ctx, true);
    1942          92 :     logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
    1943          92 :     OutputPluginWrite(ctx, true);
    1944             : 
    1945          92 :     cleanup_rel_sync_cache(txn->xid, true);
    1946          92 : }
    1947             : 
    1948             : /*
    1949             :  * PREPARE callback (for streaming two-phase commit).
    1950             :  *
    1951             :  * Notify the downstream to prepare the transaction.
    1952             :  */
    1953             : static void
    1954          32 : pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
    1955             :                             ReorderBufferTXN *txn,
    1956             :                             XLogRecPtr prepare_lsn)
    1957             : {
    1958             :     Assert(rbtxn_is_streamed(txn));
    1959             : 
    1960          32 :     OutputPluginUpdateProgress(ctx, false);
    1961          32 :     OutputPluginPrepareWrite(ctx, true);
    1962          32 :     logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
    1963          32 :     OutputPluginWrite(ctx, true);
    1964          32 : }
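/*
 * For orientation: a minimal sketch (not a verbatim excerpt from this file) of
 * how an output plugin registers the stream callbacks defined above.  The
 * field names follow OutputPluginCallbacks in replication/output_plugin.h;
 * pgoutput performs an equivalent registration in _PG_output_plugin_init()
 * earlier in the file.
 */
#include "replication/output_plugin.h"

void
_PG_output_plugin_init(OutputPluginCallbacks *cb)
{
    /* per-transaction callbacks */
    cb->startup_cb = pgoutput_startup;
    cb->begin_cb = pgoutput_begin_txn;
    cb->change_cb = pgoutput_change;
    cb->truncate_cb = pgoutput_truncate;
    cb->commit_cb = pgoutput_commit_txn;
    cb->shutdown_cb = pgoutput_shutdown;

    /* callbacks for streaming large in-progress transactions */
    cb->stream_start_cb = pgoutput_stream_start;
    cb->stream_stop_cb = pgoutput_stream_stop;
    cb->stream_abort_cb = pgoutput_stream_abort;
    cb->stream_commit_cb = pgoutput_stream_commit;
    cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
}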
    1965             : 
    1966             : /*
    1967             :  * Initialize the relation schema sync cache for a decoding session.
    1968             :  *
    1969             :  * The hash table is destroyed at the end of a decoding session. While
    1970             :  * relcache invalidation callbacks remain registered and will still be
    1971             :  * invoked, they will just see the NULL hash table global and take no action.
    1972             :  */
    1973             : static void
    1974         810 : init_rel_sync_cache(MemoryContext cachectx)
    1975             : {
    1976             :     HASHCTL     ctl;
    1977             :     static bool relation_callbacks_registered = false;
    1978             : 
    1979             :     /* Nothing to do if hash table already exists */
    1980         810 :     if (RelationSyncCache != NULL)
    1981           4 :         return;
    1982             : 
    1983             :     /* Make a new hash table for the cache */
    1984         810 :     ctl.keysize = sizeof(Oid);
    1985         810 :     ctl.entrysize = sizeof(RelationSyncEntry);
    1986         810 :     ctl.hcxt = cachectx;
    1987             : 
    1988         810 :     RelationSyncCache = hash_create("logical replication output relation cache",
    1989             :                                     128, &ctl,
    1990             :                                     HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
    1991             : 
    1992             :     Assert(RelationSyncCache != NULL);
    1993             : 
    1994             :     /* No more to do if we already registered callbacks */
    1995         810 :     if (relation_callbacks_registered)
    1996           4 :         return;
    1997             : 
    1998             :     /* We must update the cache entry for a relation after a relcache flush */
    1999         806 :     CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
    2000             : 
    2001             :     /*
    2002             :      * Flush all cache entries after a pg_namespace change, in case it was a
    2003             :      * schema rename affecting a relation being replicated.
    2004             :      *
    2005             :      * XXX: It is not a good idea to invalidate all the relation entries in
    2006             :      * RelationSyncCache on schema rename. We can optimize it to invalidate
    2007             :      * only the required relations by either having a specific invalidation
    2008             :      * message containing impacted relations or by having schema information
    2009             :      * in each RelationSyncCache entry and using hashvalue of pg_namespace.oid
    2010             :      * passed to the callback.
    2011             :      */
    2012         806 :     CacheRegisterSyscacheCallback(NAMESPACEOID,
    2013             :                                   rel_sync_cache_publication_cb,
    2014             :                                   (Datum) 0);
    2015             : 
    2016         806 :     relation_callbacks_registered = true;
    2017             : }
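/*
 * A short aside on the dynahash flags used above (illustrative sketch, not
 * part of pgoutput.c): HASH_ELEM takes keysize/entrysize from the HASHCTL,
 * HASH_BLOBS hashes the raw key bytes (the usual choice for an Oid key), and
 * HASH_CONTEXT allocates the table in ctl.hcxt so it goes away with that
 * memory context.  The function name and lookup key below are invented for
 * illustration.
 */
static void
rel_sync_cache_example(void)
{
    HASHCTL     ctl = {0};
    HTAB       *cache;
    bool        found;
    Oid         relid = 16384;      /* hypothetical relation OID */
    RelationSyncEntry *entry;

    ctl.keysize = sizeof(Oid);
    ctl.entrysize = sizeof(RelationSyncEntry);
    ctl.hcxt = CacheMemoryContext;

    cache = hash_create("example relation cache", 128, &ctl,
                        HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);

    /* HASH_ENTER creates the entry on a miss; 'found' reports whether it existed */
    entry = (RelationSyncEntry *) hash_search(cache, &relid, HASH_ENTER, &found);
    if (!found)
        entry->replicate_valid = false; /* a fresh entry needs full initialization */
}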
    2018             : 
    2019             : /*
    2020             :  * We expect a relatively small number of streamed transactions.
    2021             :  */
    2022             : static bool
    2023      351886 : get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
    2024             : {
    2025      351886 :     return list_member_xid(entry->streamed_txns, xid);
    2026             : }
    2027             : 
    2028             : /*
    2029             :  * Add the xid to the rel sync entry for which we have already sent the schema
    2030             :  * of the relation.
    2031             :  */
    2032             : static void
    2033         148 : set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
    2034             : {
    2035             :     MemoryContext oldctx;
    2036             : 
    2037         148 :     oldctx = MemoryContextSwitchTo(CacheMemoryContext);
    2038             : 
    2039         148 :     entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
    2040             : 
    2041         148 :     MemoryContextSwitchTo(oldctx);
    2042         148 : }
    2043             : 
    2044             : /*
    2045             :  * Find or create entry in the relation schema cache.
    2046             :  *
    2047             :  * This looks up publications that the given relation is directly or
    2048             :  * indirectly part of (the latter if it's really the relation's ancestor that
    2049             :  * is part of a publication) and fills up the found entry with the information
    2050             :  * about which operations to publish and whether to use an ancestor's schema
    2051             :  * when publishing.
    2052             :  */
    2053             : static RelationSyncEntry *
    2054      366992 : get_rel_sync_entry(PGOutputData *data, Relation relation)
    2055             : {
    2056             :     RelationSyncEntry *entry;
    2057             :     bool        found;
    2058             :     MemoryContext oldctx;
    2059      366992 :     Oid         relid = RelationGetRelid(relation);
    2060             : 
    2061             :     Assert(RelationSyncCache != NULL);
    2062             : 
    2063             :     /* Find cached relation info, creating if not found */
    2064      366992 :     entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
    2065             :                                               &relid,
    2066             :                                               HASH_ENTER, &found);
    2067             :     Assert(entry != NULL);
    2068             : 
    2069             :     /* initialize entry, if it's new */
    2070      366992 :     if (!found)
    2071             :     {
    2072         592 :         entry->replicate_valid = false;
    2073         592 :         entry->schema_sent = false;
    2074         592 :         entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
    2075         592 :         entry->streamed_txns = NIL;
    2076         592 :         entry->pubactions.pubinsert = entry->pubactions.pubupdate =
    2077         592 :             entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
    2078         592 :         entry->new_slot = NULL;
    2079         592 :         entry->old_slot = NULL;
    2080         592 :         memset(entry->exprstate, 0, sizeof(entry->exprstate));
    2081         592 :         entry->entry_cxt = NULL;
    2082         592 :         entry->publish_as_relid = InvalidOid;
    2083         592 :         entry->columns = NULL;
    2084         592 :         entry->attrmap = NULL;
    2085             :     }
    2086             : 
    2087             :     /* Validate the entry */
    2088      366992 :     if (!entry->replicate_valid)
    2089             :     {
    2090         758 :         Oid         schemaId = get_rel_namespace(relid);
    2091         758 :         List       *pubids = GetRelationPublications(relid);
    2092             : 
    2093             :         /*
    2094             :          * We don't acquire a lock on the namespace system table as we build
    2095             :          * the cache entry using a historic snapshot and all the later changes
    2096             :          * are absorbed while decoding WAL.
    2097             :          */
    2098         758 :         List       *schemaPubids = GetSchemaPublications(schemaId);
    2099             :         ListCell   *lc;
    2100         758 :         Oid         publish_as_relid = relid;
    2101         758 :         int         publish_ancestor_level = 0;
    2102         758 :         bool        am_partition = get_rel_relispartition(relid);
    2103         758 :         char        relkind = get_rel_relkind(relid);
    2104         758 :         List       *rel_publications = NIL;
    2105             : 
    2106             :         /* Reload publications if needed before use. */
    2107         758 :         if (!publications_valid)
    2108             :         {
    2109         406 :             MemoryContextReset(data->pubctx);
    2110             : 
    2111         406 :             oldctx = MemoryContextSwitchTo(data->pubctx);
    2112         406 :             data->publications = LoadPublications(data->publication_names);
    2113         406 :             MemoryContextSwitchTo(oldctx);
    2114         406 :             publications_valid = true;
    2115             :         }
    2116             : 
    2117             :         /*
    2118             :          * Reset schema_sent status as the relation definition may have
    2119             :          * changed.  Also reset pubactions to empty in case rel was dropped
    2120             :          * from a publication.  Also free any objects that depended on the
    2121             :          * earlier definition.
    2122             :          */
    2123         758 :         entry->schema_sent = false;
    2124         758 :         entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
    2125         758 :         list_free(entry->streamed_txns);
    2126         758 :         entry->streamed_txns = NIL;
    2127         758 :         bms_free(entry->columns);
    2128         758 :         entry->columns = NULL;
    2129         758 :         entry->pubactions.pubinsert = false;
    2130         758 :         entry->pubactions.pubupdate = false;
    2131         758 :         entry->pubactions.pubdelete = false;
    2132         758 :         entry->pubactions.pubtruncate = false;
    2133             : 
    2134             :         /*
    2135             :          * Tuple slot cleanup.  (Slots will be rebuilt later if needed.)
    2136             :          */
    2137         758 :         if (entry->old_slot)
    2138             :         {
    2139         124 :             TupleDesc   desc = entry->old_slot->tts_tupleDescriptor;
    2140             : 
    2141             :             Assert(desc->tdrefcount == -1);
    2142             : 
    2143         124 :             ExecDropSingleTupleTableSlot(entry->old_slot);
    2144             : 
    2145             :             /*
    2146             :              * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
    2147             :              * do it now to avoid any leaks.
    2148             :              */
    2149         124 :             FreeTupleDesc(desc);
    2150             :         }
    2151         758 :         if (entry->new_slot)
    2152             :         {
    2153         124 :             TupleDesc   desc = entry->new_slot->tts_tupleDescriptor;
    2154             : 
    2155             :             Assert(desc->tdrefcount == -1);
    2156             : 
    2157         124 :             ExecDropSingleTupleTableSlot(entry->new_slot);
    2158             : 
    2159             :             /*
    2160             :              * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
    2161             :              * do it now to avoid any leaks.
    2162             :              */
    2163         124 :             FreeTupleDesc(desc);
    2164             :         }
    2165             : 
    2166         758 :         entry->old_slot = NULL;
    2167         758 :         entry->new_slot = NULL;
    2168             : 
    2169         758 :         if (entry->attrmap)
    2170           6 :             free_attrmap(entry->attrmap);
    2171         758 :         entry->attrmap = NULL;
    2172             : 
    2173             :         /*
    2174             :          * Row filter cache cleanups.
    2175             :          */
    2176         758 :         if (entry->entry_cxt)
    2177         124 :             MemoryContextDelete(entry->entry_cxt);
    2178             : 
    2179         758 :         entry->entry_cxt = NULL;
    2180         758 :         entry->estate = NULL;
    2181         758 :         memset(entry->exprstate, 0, sizeof(entry->exprstate));
    2182             : 
    2183             :         /*
    2184             :          * Build publication cache. We can't use one provided by relcache as
    2185             :          * relcache considers all publications that the given relation is in,
    2186             :          * but here we only need to consider ones that the subscriber
    2187             :          * requested.
    2188             :          */
    2189        1862 :         foreach(lc, data->publications)
    2190             :         {
    2191        1104 :             Publication *pub = lfirst(lc);
    2192        1104 :             bool        publish = false;
    2193             : 
    2194             :             /*
    2195             :              * Under what relid should we publish changes in this publication?
    2196             :              * We'll use the top-most relid across all publications. Also
    2197             :              * track the ancestor level for this publication.
    2198             :              */
    2199        1104 :             Oid         pub_relid = relid;
    2200        1104 :             int         ancestor_level = 0;
    2201             : 
    2202             :             /*
    2203             :              * If this is a FOR ALL TABLES publication, pick the partition
    2204             :              * root and set the ancestor level accordingly.
    2205             :              */
    2206        1104 :             if (pub->alltables)
    2207             :             {
    2208         168 :                 publish = true;
    2209         168 :                 if (pub->pubviaroot && am_partition)
    2210             :                 {
    2211          26 :                     List       *ancestors = get_partition_ancestors(relid);
    2212             : 
    2213          26 :                     pub_relid = llast_oid(ancestors);
    2214          26 :                     ancestor_level = list_length(ancestors);
    2215             :                 }
    2216             :             }
    2217             : 
    2218        1104 :             if (!publish)
    2219             :             {
    2220         936 :                 bool        ancestor_published = false;
    2221             : 
    2222             :                 /*
    2223             :                  * For a partition, check if any of the ancestors are
    2224             :                  * published.  If so, note down the topmost ancestor that is
    2225             :                  * published via this publication, which will be used as the
    2226             :                  * relation via which to publish the partition's changes.
    2227             :                  */
    2228         936 :                 if (am_partition)
    2229             :                 {
    2230             :                     Oid         ancestor;
    2231             :                     int         level;
    2232         242 :                     List       *ancestors = get_partition_ancestors(relid);
    2233             : 
    2234         242 :                     ancestor = GetTopMostAncestorInPublication(pub->oid,
    2235             :                                                                ancestors,
    2236             :                                                                &level);
    2237             : 
    2238         242 :                     if (ancestor != InvalidOid)
    2239             :                     {
    2240          96 :                         ancestor_published = true;
    2241          96 :                         if (pub->pubviaroot)
    2242             :                         {
    2243          50 :                             pub_relid = ancestor;
    2244          50 :                             ancestor_level = level;
    2245             :                         }
    2246             :                     }
    2247             :                 }
    2248             : 
    2249        1462 :                 if (list_member_oid(pubids, pub->oid) ||
    2250        1040 :                     list_member_oid(schemaPubids, pub->oid) ||
    2251             :                     ancestor_published)
    2252         478 :                     publish = true;
    2253             :             }
    2254             : 
    2255             :             /*
    2256             :              * If the relation is to be published, determine actions to
    2257             :              * publish, and list of columns, if appropriate.
    2258             :              *
    2259             :              * Don't publish changes for a partitioned table, because
    2260             :              * publishing those of its partitions suffices, unless partition
    2261             :              * changes won't be published due to pubviaroot being set.
    2262             :              */
    2263        1104 :             if (publish &&
    2264           8 :                 (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
    2265             :             {
    2266         640 :                 entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
    2267         640 :                 entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
    2268         640 :                 entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
    2269         640 :                 entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
    2270             : 
    2271             :                 /*
    2272             :                  * We want to publish the changes as the top-most ancestor
    2273             :                  * across all publications. So we need to check if the already
    2274             :                  * calculated level is higher than the new one. If yes, we can
    2275             :                  * ignore the new value (as it's a child). Otherwise the new
    2276             :                  * value is an ancestor, so we keep it.
    2277             :                  */
    2278         640 :                 if (publish_ancestor_level > ancestor_level)
    2279           2 :                     continue;
    2280             : 
    2281             :                 /*
    2282             :                  * If we found an ancestor higher up in the tree, discard the
    2283             :                  * list of publications through which we replicate it, and use
    2284             :                  * the new ancestor.
    2285             :                  */
    2286         638 :                 if (publish_ancestor_level < ancestor_level)
    2287             :                 {
    2288          76 :                     publish_as_relid = pub_relid;
    2289          76 :                     publish_ancestor_level = ancestor_level;
    2290             : 
    2291             :                     /* reset the publication list for this relation */
    2292          76 :                     rel_publications = NIL;
    2293             :                 }
    2294             :                 else
    2295             :                 {
    2296             :                     /* Same ancestor level, has to be the same OID. */
    2297             :                     Assert(publish_as_relid == pub_relid);
    2298             :                 }
    2299             : 
    2300             :                 /* Track publications for this ancestor. */
    2301         638 :                 rel_publications = lappend(rel_publications, pub);
    2302             :             }
    2303             :         }
    2304             : 
    2305         758 :         entry->publish_as_relid = publish_as_relid;
    2306             : 
    2307             :         /*
    2308             :          * Initialize the tuple slot, map, and row filter. These are only used
    2309             :          * when publishing inserts, updates, or deletes.
    2310             :          */
    2311         758 :         if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
    2312         146 :             entry->pubactions.pubdelete)
    2313             :         {
    2314             :             /* Initialize the tuple slot and map */
    2315         612 :             init_tuple_slot(data, relation, entry);
    2316             : 
    2317             :             /* Initialize the row filter */
    2318         612 :             pgoutput_row_filter_init(data, rel_publications, entry);
    2319             : 
    2320             :             /* Check whether to publish generated columns. */
    2321         612 :             check_and_init_gencol(data, rel_publications, entry);
    2322             : 
    2323             :             /* Initialize the column list */
    2324         612 :             pgoutput_column_list_init(data, rel_publications, entry);
    2325             :         }
    2326             : 
    2327         756 :         list_free(pubids);
    2328         756 :         list_free(schemaPubids);
    2329         756 :         list_free(rel_publications);
    2330             : 
    2331         756 :         entry->replicate_valid = true;
    2332             :     }
    2333             : 
    2334      366990 :     return entry;
    2335             : }
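/*
 * A hypothetical walk-through of the ancestor-level bookkeeping above (names
 * invented for illustration): partition "p" has ancestors [parent, root] at
 * levels 1 and 2; publication A publishes "parent" and publication B
 * publishes "root", both with publish_via_partition_root set, and the
 * subscription requests both.
 *
 *   - Processing A: GetTopMostAncestorInPublication() finds "parent" at
 *     level 1 > 0, so publish_as_relid = parent and rel_publications = [A].
 *   - Processing B: "root" is found at level 2 > 1, so the publication list
 *     is reset, giving publish_as_relid = root and rel_publications = [B].
 *   - With the opposite publication order, B sets level 2 first and A
 *     (level 1) is skipped by the publish_ancestor_level > ancestor_level
 *     check, so the outcome is the same: p's changes are published using
 *     the schema of its topmost published ancestor.
 */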
    2336             : 
    2337             : /*
    2338             :  * Clean up the list of streamed transactions and update the schema_sent flag.
    2339             :  *
    2340             :  * When a streamed transaction commits or aborts, we need to remove the
    2341             :  * toplevel XID from the schema cache. If the transaction aborted, the
    2342             :  * subscriber will simply throw away the schema records we streamed, so
    2343             :  * we don't need to do anything else.
    2344             :  *
    2345             :  * If the transaction is committed, the subscriber will update the relation
    2346             :  * cache - so tweak the schema_sent flag accordingly.
    2347             :  */
    2348             : static void
    2349         144 : cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
    2350             : {
    2351             :     HASH_SEQ_STATUS hash_seq;
    2352             :     RelationSyncEntry *entry;
    2353             : 
    2354             :     Assert(RelationSyncCache != NULL);
    2355             : 
    2356         144 :     hash_seq_init(&hash_seq, RelationSyncCache);
    2357         294 :     while ((entry = hash_seq_search(&hash_seq)) != NULL)
    2358             :     {
    2359             :         /*
    2360             :          * We can set the schema_sent flag for an entry whose list contains the
    2361             :          * committed xid, as that ensures the subscriber already has the
    2362             :          * corresponding schema; we then don't need to send it again unless
    2363             :          * there is an invalidation for that relation.
    2364             :          */
    2365         340 :         foreach_xid(streamed_txn, entry->streamed_txns)
    2366             :         {
    2367         148 :             if (xid == streamed_txn)
    2368             :             {
    2369         108 :                 if (is_commit)
    2370          86 :                     entry->schema_sent = true;
    2371             : 
    2372         108 :                 entry->streamed_txns =
    2373         108 :                     foreach_delete_current(entry->streamed_txns, streamed_txn);
    2374         108 :                 break;
    2375             :             }
    2376             :         }
    2377             :     }
    2378         144 : }
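/*
 * Putting get_schema_sent_in_streamed_txn(), set_schema_sent_in_streamed_txn()
 * and the cleanup above together, a hypothetical timeline for one streamed
 * transaction (illustration only, xid value invented):
 *
 *   1. First change for a relation inside streamed xid 1000: the xid is not
 *      yet in entry->streamed_txns, so the schema (RELATION message) is sent
 *      and 1000 is added to the list.
 *   2. Later stream_start/stream_stop cycles for xid 1000 find the xid in
 *      the list and skip re-sending the schema.
 *   3. On stream_commit, cleanup_rel_sync_cache(1000, true) removes the xid
 *      and sets schema_sent = true, since the subscriber applied the schema
 *      along with the committed transaction.
 *   4. On stream_abort instead, the xid is removed but schema_sent stays
 *      false, because the subscriber discarded everything that was streamed.
 */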
    2379             : 
    2380             : /*
    2381             :  * Relcache invalidation callback
    2382             :  */
    2383             : static void
    2384        8052 : rel_sync_cache_relation_cb(Datum arg, Oid relid)
    2385             : {
    2386             :     RelationSyncEntry *entry;
    2387             : 
    2388             :     /*
    2389             :      * We can get here if the plugin was used via the SQL interface, as the
    2390             :      * RelationSyncCache is destroyed when the decoding finishes, but there is
    2391             :      * no way to unregister the relcache invalidation callback.
    2392             :      */
    2393        8052 :     if (RelationSyncCache == NULL)
    2394          52 :         return;
    2395             : 
    2396             :     /*
    2397             :      * Nobody keeps pointers to entries in this hash table around outside
    2398             :      * logical decoding callback calls - but invalidation events can come in
    2399             :      * *during* a callback if we do any syscache access in the callback.
    2400             :      * Because of that we must mark the cache entry as invalid but not damage
    2401             :      * any of its substructure here.  The next get_rel_sync_entry() call will
    2402             :      * rebuild it all.
    2403             :      */
    2404        8000 :     if (OidIsValid(relid))
    2405             :     {
    2406             :         /*
    2407             :          * Getting invalidations for relations that aren't in the table is
    2408             :          * entirely normal.  So we don't care if it's found or not.
    2409             :          */
    2410        7884 :         entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
    2411             :                                                   HASH_FIND, NULL);
    2412        7884 :         if (entry != NULL)
    2413        1330 :             entry->replicate_valid = false;
    2414             :     }
    2415             :     else
    2416             :     {
    2417             :         /* Whole cache must be flushed. */
    2418             :         HASH_SEQ_STATUS status;
    2419             : 
    2420         116 :         hash_seq_init(&status, RelationSyncCache);
    2421         238 :         while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
    2422             :         {
    2423         122 :             entry->replicate_valid = false;
    2424             :         }
    2425             :     }
    2426             : }
    2427             : 
    2428             : /*
    2429             :  * Publication relation/schema map syscache invalidation callback
    2430             :  *
    2431             :  * Called for invalidations on pg_namespace.
    2432             :  */
    2433             : static void
    2434          70 : rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
    2435             : {
    2436             :     HASH_SEQ_STATUS status;
    2437             :     RelationSyncEntry *entry;
    2438             : 
    2439             :     /*
    2440             :      * We can get here if the plugin was used via the SQL interface, as the
    2441             :      * RelationSyncCache is destroyed when the decoding finishes, but there is
    2442             :      * no way to unregister the invalidation callbacks.
    2443             :      */
    2444          70 :     if (RelationSyncCache == NULL)
    2445          20 :         return;
    2446             : 
    2447             :     /*
    2448             :      * We have no easy way to identify which cache entries this invalidation
    2449             :      * event might have affected, so just mark them all invalid.
    2450             :      */
    2451          50 :     hash_seq_init(&status, RelationSyncCache);
    2452          92 :     while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
    2453             :     {
    2454          42 :         entry->replicate_valid = false;
    2455             :     }
    2456             : }
    2457             : 
    2458             : /* Send replication origin */
    2459             : static void
    2460        2220 : send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
    2461             :                  XLogRecPtr origin_lsn, bool send_origin)
    2462             : {
    2463        2220 :     if (send_origin)
    2464             :     {
    2465             :         char       *origin;
    2466             : 
    2467             :         /*----------
    2468             :          * XXX: which behaviour do we want here?
    2469             :          *
    2470             :          * Alternatives:
    2471             :          *  - don't send origin message if origin name not found
    2472             :          *    (that's what we do now)
    2473             :          *  - throw error - that will break replication, not good
    2474             :          *  - send some special "unknown" origin
    2475             :          *----------
    2476             :          */
    2477          20 :         if (replorigin_by_oid(origin_id, true, &origin))
    2478             :         {
    2479             :             /* Message boundary */
    2480          20 :             OutputPluginWrite(ctx, false);
    2481          20 :             OutputPluginPrepareWrite(ctx, true);
    2482             : 
    2483          20 :             logicalrep_write_origin(ctx->out, origin, origin_lsn);
    2484             :         }
    2485             :     }
    2486        2220 : }

Generated by: LCOV version 1.16