LCOV - code coverage report
Current view: top level - src/backend/replication/pgoutput - pgoutput.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 705 736 95.8 %
Date: 2023-12-01 20:11:03 Functions: 40 40 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pgoutput.c
       4             :  *      Logical Replication output plugin
       5             :  *
       6             :  * Copyright (c) 2012-2023, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *        src/backend/replication/pgoutput/pgoutput.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #include "access/tupconvert.h"
      16             : #include "catalog/partition.h"
      17             : #include "catalog/pg_publication.h"
      18             : #include "catalog/pg_publication_rel.h"
      19             : #include "catalog/pg_subscription.h"
      20             : #include "commands/defrem.h"
      21             : #include "commands/subscriptioncmds.h"
      22             : #include "executor/executor.h"
      23             : #include "fmgr.h"
      24             : #include "nodes/makefuncs.h"
      25             : #include "optimizer/optimizer.h"
      26             : #include "parser/parse_relation.h"
      27             : #include "replication/logical.h"
      28             : #include "replication/logicalproto.h"
      29             : #include "replication/origin.h"
      30             : #include "replication/pgoutput.h"
      31             : #include "utils/builtins.h"
      32             : #include "utils/inval.h"
      33             : #include "utils/lsyscache.h"
      34             : #include "utils/memutils.h"
      35             : #include "utils/rel.h"
      36             : #include "utils/syscache.h"
      37             : #include "utils/varlena.h"
      38             : 
      39         782 : PG_MODULE_MAGIC;
      40             : 
      41             : static void pgoutput_startup(LogicalDecodingContext *ctx,
      42             :                              OutputPluginOptions *opt, bool is_init);
      43             : static void pgoutput_shutdown(LogicalDecodingContext *ctx);
      44             : static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
      45             :                                ReorderBufferTXN *txn);
      46             : static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
      47             :                                 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
      48             : static void pgoutput_change(LogicalDecodingContext *ctx,
      49             :                             ReorderBufferTXN *txn, Relation relation,
      50             :                             ReorderBufferChange *change);
      51             : static void pgoutput_truncate(LogicalDecodingContext *ctx,
      52             :                               ReorderBufferTXN *txn, int nrelations, Relation relations[],
      53             :                               ReorderBufferChange *change);
      54             : static void pgoutput_message(LogicalDecodingContext *ctx,
      55             :                              ReorderBufferTXN *txn, XLogRecPtr message_lsn,
      56             :                              bool transactional, const char *prefix,
      57             :                              Size sz, const char *message);
      58             : static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
      59             :                                    RepOriginId origin_id);
      60             : static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
      61             :                                        ReorderBufferTXN *txn);
      62             : static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
      63             :                                  ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
      64             : static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
      65             :                                          ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
      66             : static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
      67             :                                            ReorderBufferTXN *txn,
      68             :                                            XLogRecPtr prepare_end_lsn,
      69             :                                            TimestampTz prepare_time);
      70             : static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
      71             :                                   ReorderBufferTXN *txn);
      72             : static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
      73             :                                  ReorderBufferTXN *txn);
      74             : static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
      75             :                                   ReorderBufferTXN *txn,
      76             :                                   XLogRecPtr abort_lsn);
      77             : static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
      78             :                                    ReorderBufferTXN *txn,
      79             :                                    XLogRecPtr commit_lsn);
      80             : static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
      81             :                                         ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
      82             : 
      83             : static bool publications_valid;
      84             : 
      85             : static List *LoadPublications(List *pubnames);
      86             : static void publication_invalidation_cb(Datum arg, int cacheid,
      87             :                                         uint32 hashvalue);
      88             : static void send_relation_and_attrs(Relation relation, TransactionId xid,
      89             :                                     LogicalDecodingContext *ctx,
      90             :                                     Bitmapset *columns);
      91             : static void send_repl_origin(LogicalDecodingContext *ctx,
      92             :                              RepOriginId origin_id, XLogRecPtr origin_lsn,
      93             :                              bool send_origin);
      94             : 
      95             : /*
      96             :  * Only 3 publication actions are used for row filtering ("insert", "update",
      97             :  * "delete"). See RelationSyncEntry.exprstate[].
      98             :  */
      99             : enum RowFilterPubAction
     100             : {
     101             :     PUBACTION_INSERT,
     102             :     PUBACTION_UPDATE,
     103             :     PUBACTION_DELETE,
     104             : };
     105             : 
     106             : #define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
     107             : 
     108             : /*
     109             :  * Entry in the map used to remember which relation schemas we sent.
     110             :  *
     111             :  * The schema_sent flag determines if the current schema record for the
     112             :  * relation (and for its ancestor if publish_as_relid is set) was already
     113             :  * sent to the subscriber (in which case we don't need to send it again).
     114             :  *
     115             :  * The schema cache on downstream is however updated only at commit time,
     116             :  * and with streamed transactions the commit order may be different from
     117             :  * the order the transactions are sent in. Also, the (sub) transactions
     118             :  * might get aborted so we need to send the schema for each (sub) transaction
     119             :  * so that we don't lose the schema information on abort. For handling this,
     120             :  * we maintain the list of xids (streamed_txns) for those we have already sent
     121             :  * the schema.
     122             :  *
     123             :  * For partitions, 'pubactions' considers not only the table's own
     124             :  * publications, but also those of all of its ancestors.
     125             :  */
     126             : typedef struct RelationSyncEntry
     127             : {
     128             :     Oid         relid;          /* relation oid */
     129             : 
     130             :     bool        replicate_valid;    /* overall validity flag for entry */
     131             : 
     132             :     bool        schema_sent;
     133             :     List       *streamed_txns;  /* streamed toplevel transactions with this
     134             :                                  * schema */
     135             : 
     136             :     /* are we publishing this rel? */
     137             :     PublicationActions pubactions;
     138             : 
     139             :     /*
     140             :      * ExprState array for row filter. Different publication actions don't
     141             :      * allow multiple expressions to always be combined into one, because
     142             :      * updates or deletes restrict the column in expression to be part of the
     143             :      * replica identity index whereas inserts do not have this restriction, so
     144             :      * there is one ExprState per publication action.
     145             :      */
     146             :     ExprState  *exprstate[NUM_ROWFILTER_PUBACTIONS];
     147             :     EState     *estate;         /* executor state used for row filter */
     148             :     TupleTableSlot *new_slot;   /* slot for storing new tuple */
     149             :     TupleTableSlot *old_slot;   /* slot for storing old tuple */
     150             : 
     151             :     /*
     152             :      * OID of the relation to publish changes as.  For a partition, this may
     153             :      * be set to one of its ancestors whose schema will be used when
     154             :      * replicating changes, if publish_via_partition_root is set for the
     155             :      * publication.
     156             :      */
     157             :     Oid         publish_as_relid;
     158             : 
     159             :     /*
     160             :      * Map used when replicating using an ancestor's schema to convert tuples
     161             :      * from partition's type to the ancestor's; NULL if publish_as_relid is
     162             :      * same as 'relid' or if unnecessary due to partition and the ancestor
     163             :      * having identical TupleDesc.
     164             :      */
     165             :     AttrMap    *attrmap;
     166             : 
     167             :     /*
     168             :      * Columns included in the publication, or NULL if all columns are
     169             :      * included implicitly.  Note that the attnums in this bitmap are not
     170             :      * shifted by FirstLowInvalidHeapAttributeNumber.
     171             :      */
     172             :     Bitmapset  *columns;
     173             : 
     174             :     /*
     175             :      * Private context to store additional data for this entry - state for the
     176             :      * row filter expressions, column list, etc.
     177             :      */
     178             :     MemoryContext entry_cxt;
     179             : } RelationSyncEntry;
     180             : 
     181             : /*
     182             :  * Maintain a per-transaction level variable to track whether the transaction
     183             :  * has sent BEGIN. BEGIN is only sent when the first change in a transaction
     184             :  * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
     185             :  * messages for empty transactions which saves network bandwidth.
     186             :  *
     187             :  * This optimization is not used for prepared transactions because if the
     188             :  * WALSender restarts after prepare of a transaction and before commit prepared
     189             :  * of the same transaction then we won't be able to figure out if we have
     190             :  * skipped sending BEGIN/PREPARE of a transaction as it was empty. This is
     191             :  * because we would have lost the in-memory txndata information that was
     192             :  * present prior to the restart. This will result in sending a spurious
     193             :  * COMMIT PREPARED without a corresponding prepared transaction at the
     194             :  * downstream which would lead to an error when it tries to process it.
     195             :  *
     196             :  * XXX We could achieve this optimization by changing protocol to send
     197             :  * additional information so that downstream can detect that the corresponding
     198             :  * prepare has not been sent. However, adding such a check for every
     199             :  * transaction in the downstream could be costly so we might want to do it
     200             :  * optionally.
     201             :  *
     202             :  * We also don't have this optimization for streamed transactions because
     203             :  * they can contain prepared transactions.
     204             :  */
     205             : typedef struct PGOutputTxnData
     206             : {
     207             :     bool        sent_begin_txn; /* flag indicating whether BEGIN has been sent */
     208             : } PGOutputTxnData;
     209             : 
     210             : /* Map used to remember which relation schemas we sent. */
     211             : static HTAB *RelationSyncCache = NULL;
     212             : 
     213             : static void init_rel_sync_cache(MemoryContext cachectx);
     214             : static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
     215             : static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
     216             :                                              Relation relation);
     217             : static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
     218             : static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
     219             :                                           uint32 hashvalue);
     220             : static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
     221             :                                             TransactionId xid);
     222             : static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
     223             :                                             TransactionId xid);
     224             : static void init_tuple_slot(PGOutputData *data, Relation relation,
     225             :                             RelationSyncEntry *entry);
     226             : 
     227             : /* row filter routines */
     228             : static EState *create_estate_for_relation(Relation rel);
     229             : static void pgoutput_row_filter_init(PGOutputData *data,
     230             :                                      List *publications,
     231             :                                      RelationSyncEntry *entry);
     232             : static bool pgoutput_row_filter_exec_expr(ExprState *state,
     233             :                                           ExprContext *econtext);
     234             : static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
     235             :                                 TupleTableSlot **new_slot_ptr,
     236             :                                 RelationSyncEntry *entry,
     237             :                                 ReorderBufferChangeType *action);
     238             : 
     239             : /* column list routines */
     240             : static void pgoutput_column_list_init(PGOutputData *data,
     241             :                                       List *publications,
     242             :                                       RelationSyncEntry *entry);
     243             : 
     244             : /*
     245             :  * Specify output plugin callbacks
     246             :  */
     247             : void
     248        1102 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
     249             : {
     250        1102 :     cb->startup_cb = pgoutput_startup;
     251        1102 :     cb->begin_cb = pgoutput_begin_txn;
     252        1102 :     cb->change_cb = pgoutput_change;
     253        1102 :     cb->truncate_cb = pgoutput_truncate;
     254        1102 :     cb->message_cb = pgoutput_message;
     255        1102 :     cb->commit_cb = pgoutput_commit_txn;
     256             : 
     257        1102 :     cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
     258        1102 :     cb->prepare_cb = pgoutput_prepare_txn;
     259        1102 :     cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
     260        1102 :     cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
     261        1102 :     cb->filter_by_origin_cb = pgoutput_origin_filter;
     262        1102 :     cb->shutdown_cb = pgoutput_shutdown;
     263             : 
     264             :     /* transaction streaming */
     265        1102 :     cb->stream_start_cb = pgoutput_stream_start;
     266        1102 :     cb->stream_stop_cb = pgoutput_stream_stop;
     267        1102 :     cb->stream_abort_cb = pgoutput_stream_abort;
     268        1102 :     cb->stream_commit_cb = pgoutput_stream_commit;
     269        1102 :     cb->stream_change_cb = pgoutput_change;
     270        1102 :     cb->stream_message_cb = pgoutput_message;
     271        1102 :     cb->stream_truncate_cb = pgoutput_truncate;
     272             :     /* transaction streaming - two-phase commit */
     273        1102 :     cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
     274        1102 : }
     275             : 
     276             : static void
     277         608 : parse_output_parameters(List *options, PGOutputData *data)
     278             : {
     279             :     ListCell   *lc;
     280         608 :     bool        protocol_version_given = false;
     281         608 :     bool        publication_names_given = false;
     282         608 :     bool        binary_option_given = false;
     283         608 :     bool        messages_option_given = false;
     284         608 :     bool        streaming_given = false;
     285         608 :     bool        two_phase_option_given = false;
     286         608 :     bool        origin_option_given = false;
     287             : 
     288         608 :     data->binary = false;
     289         608 :     data->streaming = LOGICALREP_STREAM_OFF;
     290         608 :     data->messages = false;
     291         608 :     data->two_phase = false;
     292             : 
     293        2528 :     foreach(lc, options)
     294             :     {
     295        1920 :         DefElem    *defel = (DefElem *) lfirst(lc);
     296             : 
     297             :         Assert(defel->arg == NULL || IsA(defel->arg, String));
     298             : 
     299             :         /* Check each param, whether or not we recognize it */
     300        1920 :         if (strcmp(defel->defname, "proto_version") == 0)
     301             :         {
     302             :             unsigned long parsed;
     303             :             char       *endptr;
     304             : 
     305         608 :             if (protocol_version_given)
     306           0 :                 ereport(ERROR,
     307             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     308             :                          errmsg("conflicting or redundant options")));
     309         608 :             protocol_version_given = true;
     310             : 
     311         608 :             errno = 0;
     312         608 :             parsed = strtoul(strVal(defel->arg), &endptr, 10);
     313         608 :             if (errno != 0 || *endptr != '\0')
     314           0 :                 ereport(ERROR,
     315             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     316             :                          errmsg("invalid proto_version")));
     317             : 
     318         608 :             if (parsed > PG_UINT32_MAX)
     319           0 :                 ereport(ERROR,
     320             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     321             :                          errmsg("proto_version \"%s\" out of range",
     322             :                                 strVal(defel->arg))));
     323             : 
     324         608 :             data->protocol_version = (uint32) parsed;
     325             :         }
     326        1312 :         else if (strcmp(defel->defname, "publication_names") == 0)
     327             :         {
     328         608 :             if (publication_names_given)
     329           0 :                 ereport(ERROR,
     330             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     331             :                          errmsg("conflicting or redundant options")));
     332         608 :             publication_names_given = true;
     333             : 
     334         608 :             if (!SplitIdentifierString(strVal(defel->arg), ',',
     335             :                                        &data->publication_names))
     336           0 :                 ereport(ERROR,
     337             :                         (errcode(ERRCODE_INVALID_NAME),
     338             :                          errmsg("invalid publication_names syntax")));
     339             :         }
     340         704 :         else if (strcmp(defel->defname, "binary") == 0)
     341             :         {
     342          20 :             if (binary_option_given)
     343           0 :                 ereport(ERROR,
     344             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     345             :                          errmsg("conflicting or redundant options")));
     346          20 :             binary_option_given = true;
     347             : 
     348          20 :             data->binary = defGetBoolean(defel);
     349             :         }
     350         684 :         else if (strcmp(defel->defname, "messages") == 0)
     351             :         {
     352           8 :             if (messages_option_given)
     353           0 :                 ereport(ERROR,
     354             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     355             :                          errmsg("conflicting or redundant options")));
     356           8 :             messages_option_given = true;
     357             : 
     358           8 :             data->messages = defGetBoolean(defel);
     359             :         }
     360         676 :         else if (strcmp(defel->defname, "streaming") == 0)
     361             :         {
     362          70 :             if (streaming_given)
     363           0 :                 ereport(ERROR,
     364             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     365             :                          errmsg("conflicting or redundant options")));
     366          70 :             streaming_given = true;
     367             : 
     368          70 :             data->streaming = defGetStreamingMode(defel);
     369             :         }
     370         606 :         else if (strcmp(defel->defname, "two_phase") == 0)
     371             :         {
     372          12 :             if (two_phase_option_given)
     373           0 :                 ereport(ERROR,
     374             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     375             :                          errmsg("conflicting or redundant options")));
     376          12 :             two_phase_option_given = true;
     377             : 
     378          12 :             data->two_phase = defGetBoolean(defel);
     379             :         }
     380         594 :         else if (strcmp(defel->defname, "origin") == 0)
     381             :         {
     382             :             char       *origin;
     383             : 
     384         594 :             if (origin_option_given)
     385           0 :                 ereport(ERROR,
     386             :                         errcode(ERRCODE_SYNTAX_ERROR),
     387             :                         errmsg("conflicting or redundant options"));
     388         594 :             origin_option_given = true;
     389             : 
     390         594 :             origin = defGetString(defel);
     391         594 :             if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
     392          20 :                 data->publish_no_origin = true;
     393         574 :             else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
     394         574 :                 data->publish_no_origin = false;
     395             :             else
     396           0 :                 ereport(ERROR,
     397             :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     398             :                         errmsg("unrecognized origin value: \"%s\"", origin));
     399             :         }
     400             :         else
     401           0 :             elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
     402             :     }
     403         608 : }
     404             : 
     405             : /*
     406             :  * Initialize this plugin
     407             :  */
     408             : static void
     409        1102 : pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
     410             :                  bool is_init)
     411             : {
     412        1102 :     PGOutputData *data = palloc0(sizeof(PGOutputData));
     413             :     static bool publication_callback_registered = false;
     414             : 
     415             :     /* Create our memory context for private allocations. */
     416        1102 :     data->context = AllocSetContextCreate(ctx->context,
     417             :                                           "logical replication output context",
     418             :                                           ALLOCSET_DEFAULT_SIZES);
     419             : 
     420        1102 :     data->cachectx = AllocSetContextCreate(ctx->context,
     421             :                                            "logical replication cache context",
     422             :                                            ALLOCSET_DEFAULT_SIZES);
     423             : 
     424        1102 :     ctx->output_plugin_private = data;
     425             : 
     426             :     /* This plugin uses binary protocol. */
     427        1102 :     opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
     428             : 
     429             :     /*
     430             :      * This is replication start and not slot initialization.
     431             :      *
     432             :      * Parse and validate options passed by the client.
     433             :      */
     434        1102 :     if (!is_init)
     435             :     {
     436             :         /* Parse the params and ERROR if we see any we don't recognize */
     437         608 :         parse_output_parameters(ctx->output_plugin_options, data);
     438             : 
     439             :         /* Check if we support requested protocol */
     440         608 :         if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
     441           0 :             ereport(ERROR,
     442             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     443             :                      errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
     444             :                             data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
     445             : 
     446         608 :         if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
     447           0 :             ereport(ERROR,
     448             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     449             :                      errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
     450             :                             data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
     451             : 
     452         608 :         if (data->publication_names == NIL)
     453           0 :             ereport(ERROR,
     454             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     455             :                      errmsg("publication_names parameter missing")));
     456             : 
     457             :         /*
     458             :          * Decide whether to enable streaming. It is disabled by default, in
     459             :          * which case we just update the flag in decoding context. Otherwise
     460             :          * we only allow it with sufficient version of the protocol, and when
     461             :          * the output plugin supports it.
     462             :          */
     463         608 :         if (data->streaming == LOGICALREP_STREAM_OFF)
     464         538 :             ctx->streaming = false;
     465          70 :         else if (data->streaming == LOGICALREP_STREAM_ON &&
     466          54 :                  data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
     467           0 :             ereport(ERROR,
     468             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     469             :                      errmsg("requested proto_version=%d does not support streaming, need %d or higher",
     470             :                             data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
     471          70 :         else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
     472          16 :                  data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
     473           0 :             ereport(ERROR,
     474             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     475             :                      errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
     476             :                             data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
     477          70 :         else if (!ctx->streaming)
     478           0 :             ereport(ERROR,
     479             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     480             :                      errmsg("streaming requested, but not supported by output plugin")));
     481             : 
     482             :         /*
     483             :          * Here, we just check whether the two-phase option is passed by
     484             :          * plugin and decide whether to enable it at later point of time. It
     485             :          * remains enabled if the previous start-up has done so. But we only
     486             :          * allow the option to be passed in with sufficient version of the
     487             :          * protocol, and when the output plugin supports it.
     488             :          */
     489         608 :         if (!data->two_phase)
     490         596 :             ctx->twophase_opt_given = false;
     491          12 :         else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
     492           0 :             ereport(ERROR,
     493             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     494             :                      errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
     495             :                             data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
     496          12 :         else if (!ctx->twophase)
     497           0 :             ereport(ERROR,
     498             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     499             :                      errmsg("two-phase commit requested, but not supported by output plugin")));
     500             :         else
     501          12 :             ctx->twophase_opt_given = true;
     502             : 
     503             :         /* Init publication state. */
     504         608 :         data->publications = NIL;
     505         608 :         publications_valid = false;
     506             : 
     507             :         /*
     508             :          * Register callback for pg_publication if we didn't already do that
     509             :          * during some previous call in this process.
     510             :          */
     511         608 :         if (!publication_callback_registered)
     512             :         {
     513         606 :             CacheRegisterSyscacheCallback(PUBLICATIONOID,
     514             :                                           publication_invalidation_cb,
     515             :                                           (Datum) 0);
     516         606 :             publication_callback_registered = true;
     517             :         }
     518             : 
     519             :         /* Initialize relation schema cache. */
     520         608 :         init_rel_sync_cache(CacheMemoryContext);
     521             :     }
     522             :     else
     523             :     {
     524             :         /*
     525             :          * Disable the streaming and prepared transactions during the slot
     526             :          * initialization mode.
     527             :          */
     528         494 :         ctx->streaming = false;
     529         494 :         ctx->twophase = false;
     530             :     }
     531        1102 : }
     532             : 
     533             : /*
     534             :  * BEGIN callback.
     535             :  *
     536             :  * Don't send the BEGIN message here instead postpone it until the first
     537             :  * change. In logical replication, a common scenario is to replicate a set of
     538             :  * tables (instead of all tables) and transactions whose changes were on
     539             :  * the table(s) that are not published will produce empty transactions. These
     540             :  * empty transactions will send BEGIN and COMMIT messages to subscribers,
     541             :  * using bandwidth on something with little/no use for logical replication.
     542             :  */
     543             : static void
     544        1232 : pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     545             : {
     546        1232 :     PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
     547             :                                                       sizeof(PGOutputTxnData));
     548             : 
     549        1232 :     txn->output_plugin_private = txndata;
     550        1232 : }
     551             : 
     552             : /*
     553             :  * Send BEGIN.
     554             :  *
     555             :  * This is called while processing the first change of the transaction.
     556             :  */
     557             : static void
     558         694 : pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     559             : {
     560         694 :     bool        send_replication_origin = txn->origin_id != InvalidRepOriginId;
     561         694 :     PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
     562             : 
     563             :     Assert(txndata);
     564             :     Assert(!txndata->sent_begin_txn);
     565             : 
     566         694 :     OutputPluginPrepareWrite(ctx, !send_replication_origin);
     567         694 :     logicalrep_write_begin(ctx->out, txn);
     568         694 :     txndata->sent_begin_txn = true;
     569             : 
     570         694 :     send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
     571             :                      send_replication_origin);
     572             : 
     573         694 :     OutputPluginWrite(ctx, true);
     574         692 : }
     575             : 
     576             : /*
     577             :  * COMMIT callback
     578             :  */
     579             : static void
     580        1220 : pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     581             :                     XLogRecPtr commit_lsn)
     582             : {
     583        1220 :     PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
     584             :     bool        sent_begin_txn;
     585             : 
     586             :     Assert(txndata);
     587             : 
     588             :     /*
     589             :      * We don't need to send the commit message unless some relevant change
     590             :      * from this transaction has been sent to the downstream.
     591             :      */
     592        1220 :     sent_begin_txn = txndata->sent_begin_txn;
     593        1220 :     OutputPluginUpdateProgress(ctx, !sent_begin_txn);
     594        1220 :     pfree(txndata);
     595        1220 :     txn->output_plugin_private = NULL;
     596             : 
     597        1220 :     if (!sent_begin_txn)
     598             :     {
     599         532 :         elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
     600         532 :         return;
     601             :     }
     602             : 
     603         688 :     OutputPluginPrepareWrite(ctx, true);
     604         688 :     logicalrep_write_commit(ctx->out, txn, commit_lsn);
     605         688 :     OutputPluginWrite(ctx, true);
     606             : }
     607             : 
     608             : /*
     609             :  * BEGIN PREPARE callback
     610             :  */
     611             : static void
     612          36 : pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
     613             : {
     614          36 :     bool        send_replication_origin = txn->origin_id != InvalidRepOriginId;
     615             : 
     616          36 :     OutputPluginPrepareWrite(ctx, !send_replication_origin);
     617          36 :     logicalrep_write_begin_prepare(ctx->out, txn);
     618             : 
     619          36 :     send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
     620             :                      send_replication_origin);
     621             : 
     622          36 :     OutputPluginWrite(ctx, true);
     623          36 : }
     624             : 
     625             : /*
     626             :  * PREPARE callback
     627             :  */
     628             : static void
     629          36 : pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     630             :                      XLogRecPtr prepare_lsn)
     631             : {
     632          36 :     OutputPluginUpdateProgress(ctx, false);
     633             : 
     634          36 :     OutputPluginPrepareWrite(ctx, true);
     635          36 :     logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
     636          36 :     OutputPluginWrite(ctx, true);
     637          36 : }
     638             : 
     639             : /*
     640             :  * COMMIT PREPARED callback
     641             :  */
     642             : static void
     643          46 : pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     644             :                              XLogRecPtr commit_lsn)
     645             : {
     646          46 :     OutputPluginUpdateProgress(ctx, false);
     647             : 
     648          46 :     OutputPluginPrepareWrite(ctx, true);
     649          46 :     logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
     650          46 :     OutputPluginWrite(ctx, true);
     651          46 : }
     652             : 
     653             : /*
     654             :  * ROLLBACK PREPARED callback
     655             :  */
     656             : static void
     657          18 : pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
     658             :                                ReorderBufferTXN *txn,
     659             :                                XLogRecPtr prepare_end_lsn,
     660             :                                TimestampTz prepare_time)
     661             : {
     662          18 :     OutputPluginUpdateProgress(ctx, false);
     663             : 
     664          18 :     OutputPluginPrepareWrite(ctx, true);
     665          18 :     logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
     666             :                                        prepare_time);
     667          18 :     OutputPluginWrite(ctx, true);
     668          18 : }
     669             : 
     670             : /*
     671             :  * Write the current schema of the relation and its ancestor (if any) if not
     672             :  * done yet.
     673             :  */
     674             : static void
     675      364042 : maybe_send_schema(LogicalDecodingContext *ctx,
     676             :                   ReorderBufferChange *change,
     677             :                   Relation relation, RelationSyncEntry *relentry)
     678             : {
     679      364042 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
     680             :     bool        schema_sent;
     681      364042 :     TransactionId xid = InvalidTransactionId;
     682      364042 :     TransactionId topxid = InvalidTransactionId;
     683             : 
     684             :     /*
     685             :      * Remember XID of the (sub)transaction for the change. We don't care if
     686             :      * it's top-level transaction or not (we have already sent that XID in
     687             :      * start of the current streaming block).
     688             :      *
     689             :      * If we're not in a streaming block, just use InvalidTransactionId and
     690             :      * the write methods will not include it.
     691             :      */
     692      364042 :     if (data->in_streaming)
     693      351882 :         xid = change->txn->xid;
     694             : 
     695      364042 :     if (rbtxn_is_subtxn(change->txn))
     696       20338 :         topxid = rbtxn_get_toptxn(change->txn)->xid;
     697             :     else
     698      343704 :         topxid = xid;
     699             : 
     700             :     /*
     701             :      * Do we need to send the schema? We do track streamed transactions
     702             :      * separately, because those may be applied later (and the regular
     703             :      * transactions won't see their effects until then) and in an order that
     704             :      * we don't know at this point.
     705             :      *
     706             :      * XXX There is a scope of optimization here. Currently, we always send
     707             :      * the schema first time in a streaming transaction but we can probably
     708             :      * avoid that by checking 'relentry->schema_sent' flag. However, before
     709             :      * doing that we need to study its impact on the case where we have a mix
     710             :      * of streaming and non-streaming transactions.
     711             :      */
     712      364042 :     if (data->in_streaming)
     713      351882 :         schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
     714             :     else
     715       12160 :         schema_sent = relentry->schema_sent;
     716             : 
     717             :     /* Nothing to do if we already sent the schema. */
     718      364042 :     if (schema_sent)
     719      363492 :         return;
     720             : 
     721             :     /*
     722             :      * Send the schema.  If the changes will be published using an ancestor's
     723             :      * schema, not the relation's own, send that ancestor's schema before
     724             :      * sending relation's own (XXX - maybe sending only the former suffices?).
     725             :      */
     726         550 :     if (relentry->publish_as_relid != RelationGetRelid(relation))
     727             :     {
     728          56 :         Relation    ancestor = RelationIdGetRelation(relentry->publish_as_relid);
     729             : 
     730          56 :         send_relation_and_attrs(ancestor, xid, ctx, relentry->columns);
     731          52 :         RelationClose(ancestor);
     732             :     }
     733             : 
     734         546 :     send_relation_and_attrs(relation, xid, ctx, relentry->columns);
     735             : 
     736         546 :     if (data->in_streaming)
     737         144 :         set_schema_sent_in_streamed_txn(relentry, topxid);
     738             :     else
     739         402 :         relentry->schema_sent = true;
     740             : }
     741             : 
     742             : /*
     743             :  * Sends a relation
     744             :  */
     745             : static void
     746         602 : send_relation_and_attrs(Relation relation, TransactionId xid,
     747             :                         LogicalDecodingContext *ctx,
     748             :                         Bitmapset *columns)
     749             : {
     750         602 :     TupleDesc   desc = RelationGetDescr(relation);
     751             :     int         i;
     752             : 
     753             :     /*
     754             :      * Write out type info if needed.  We do that only for user-created types.
     755             :      * We use FirstGenbkiObjectId as the cutoff, so that we only consider
     756             :      * objects with hand-assigned OIDs to be "built in", not for instance any
     757             :      * function or type defined in the information_schema. This is important
     758             :      * because only hand-assigned OIDs can be expected to remain stable across
     759             :      * major versions.
     760             :      */
     761        1810 :     for (i = 0; i < desc->natts; i++)
     762             :     {
     763        1208 :         Form_pg_attribute att = TupleDescAttr(desc, i);
     764             : 
     765        1208 :         if (att->attisdropped || att->attgenerated)
     766          10 :             continue;
     767             : 
     768        1198 :         if (att->atttypid < FirstGenbkiObjectId)
     769        1162 :             continue;
     770             : 
     771             :         /* Skip this attribute if it's not present in the column list */
     772          36 :         if (columns != NULL && !bms_is_member(att->attnum, columns))
     773           0 :             continue;
     774             : 
     775          36 :         OutputPluginPrepareWrite(ctx, false);
     776          36 :         logicalrep_write_typ(ctx->out, xid, att->atttypid);
     777          36 :         OutputPluginWrite(ctx, false);
     778             :     }
     779             : 
     780         602 :     OutputPluginPrepareWrite(ctx, false);
     781         602 :     logicalrep_write_rel(ctx->out, xid, relation, columns);
     782         602 :     OutputPluginWrite(ctx, false);
     783         598 : }
     784             : 
     785             : /*
     786             :  * Executor state preparation for evaluation of row filter expressions for the
     787             :  * specified relation.
     788             :  */
     789             : static EState *
     790          32 : create_estate_for_relation(Relation rel)
     791             : {
     792             :     EState     *estate;
     793             :     RangeTblEntry *rte;
     794          32 :     List       *perminfos = NIL;
     795             : 
     796          32 :     estate = CreateExecutorState();
     797             : 
     798          32 :     rte = makeNode(RangeTblEntry);
     799          32 :     rte->rtekind = RTE_RELATION;
     800          32 :     rte->relid = RelationGetRelid(rel);
     801          32 :     rte->relkind = rel->rd_rel->relkind;
     802          32 :     rte->rellockmode = AccessShareLock;
     803             : 
     804          32 :     addRTEPermissionInfo(&perminfos, rte);
     805             : 
     806          32 :     ExecInitRangeTable(estate, list_make1(rte), perminfos);
     807             : 
     808          32 :     estate->es_output_cid = GetCurrentCommandId(false);
     809             : 
     810          32 :     return estate;
     811             : }
     812             : 
     813             : /*
     814             :  * Evaluates row filter.
     815             :  *
     816             :  * If the row filter evaluates to NULL, it is taken as false i.e. the change
     817             :  * isn't replicated.
     818             :  */
     819             : static bool
     820          72 : pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
     821             : {
     822             :     Datum       ret;
     823             :     bool        isnull;
     824             : 
     825             :     Assert(state != NULL);
     826             : 
     827          72 :     ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
     828             : 
     829          72 :     elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
     830             :          isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
     831             :          isnull ? "true" : "false");
     832             : 
     833          72 :     if (isnull)
     834           2 :         return false;
     835             : 
     836          70 :     return DatumGetBool(ret);
     837             : }
     838             : 
     839             : /*
     840             :  * Make sure the per-entry memory context exists.
     841             :  */
     842             : static void
     843         106 : pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
     844             : {
     845             :     Relation    relation;
     846             : 
     847             :     /* The context may already exist, in which case bail out. */
     848         106 :     if (entry->entry_cxt)
     849           4 :         return;
     850             : 
     851         102 :     relation = RelationIdGetRelation(entry->publish_as_relid);
     852             : 
     853         102 :     entry->entry_cxt = AllocSetContextCreate(data->cachectx,
     854             :                                              "entry private context",
     855             :                                              ALLOCSET_SMALL_SIZES);
     856             : 
     857         102 :     MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
     858             :                                       RelationGetRelationName(relation));
     859             : }
     860             : 
     861             : /*
     862             :  * Initialize the row filter.
     863             :  */
     864             : static void
     865         470 : pgoutput_row_filter_init(PGOutputData *data, List *publications,
     866             :                          RelationSyncEntry *entry)
     867             : {
     868             :     ListCell   *lc;
     869         470 :     List       *rfnodes[] = {NIL, NIL, NIL};    /* One per pubaction */
     870         470 :     bool        no_filter[] = {false, false, false};    /* One per pubaction */
     871             :     MemoryContext oldctx;
     872             :     int         idx;
     873         470 :     bool        has_filter = true;
     874         470 :     Oid         schemaid = get_rel_namespace(entry->publish_as_relid);
     875             : 
     876             :     /*
     877             :      * Find if there are any row filters for this relation. If there are, then
     878             :      * prepare the necessary ExprState and cache it in entry->exprstate. To
     879             :      * build an expression state, we need to ensure the following:
     880             :      *
     881             :      * All the given publication-table mappings must be checked.
     882             :      *
     883             :      * Multiple publications might have multiple row filters for this
     884             :      * relation. Since row filter usage depends on the DML operation, there
     885             :      * are multiple lists (one for each operation) to which row filters will
     886             :      * be appended.
     887             :      *
     888             :      * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row filter
     889             :      * expression" so it takes precedence.
     890             :      */
     891         510 :     foreach(lc, publications)
     892             :     {
     893         478 :         Publication *pub = lfirst(lc);
     894         478 :         HeapTuple   rftuple = NULL;
     895         478 :         Datum       rfdatum = 0;
     896         478 :         bool        pub_no_filter = true;
     897             : 
     898             :         /*
     899             :          * If the publication is FOR ALL TABLES, or the publication includes a
     900             :          * FOR TABLES IN SCHEMA where the table belongs to the referred
     901             :          * schema, then it is treated the same as if there are no row filters
     902             :          * (even if other publications have a row filter).
     903             :          */
     904         478 :         if (!pub->alltables &&
     905         350 :             !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
     906             :                                    ObjectIdGetDatum(schemaid),
     907             :                                    ObjectIdGetDatum(pub->oid)))
     908             :         {
     909             :             /*
     910             :              * Check for the presence of a row filter in this publication.
     911             :              */
     912         338 :             rftuple = SearchSysCache2(PUBLICATIONRELMAP,
     913             :                                       ObjectIdGetDatum(entry->publish_as_relid),
     914             :                                       ObjectIdGetDatum(pub->oid));
     915             : 
     916         338 :             if (HeapTupleIsValid(rftuple))
     917             :             {
     918             :                 /* Null indicates no filter. */
     919         314 :                 rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
     920             :                                           Anum_pg_publication_rel_prqual,
     921             :                                           &pub_no_filter);
     922             :             }
     923             :         }
     924             : 
     925         478 :         if (pub_no_filter)
     926             :         {
     927         452 :             if (rftuple)
     928         288 :                 ReleaseSysCache(rftuple);
     929             : 
     930         452 :             no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
     931         452 :             no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
     932         452 :             no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
     933             : 
     934             :             /*
     935             :              * Quick exit if all the DML actions are publicized via this
     936             :              * publication.
     937             :              */
     938         452 :             if (no_filter[PUBACTION_INSERT] &&
     939         452 :                 no_filter[PUBACTION_UPDATE] &&
     940         438 :                 no_filter[PUBACTION_DELETE])
     941             :             {
     942         438 :                 has_filter = false;
     943         438 :                 break;
     944             :             }
     945             : 
     946             :             /* No additional work for this publication. Next one. */
     947          14 :             continue;
     948             :         }
     949             : 
     950             :         /* Form the per pubaction row filter lists. */
     951          26 :         if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
     952          26 :             rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
     953          26 :                                                 TextDatumGetCString(rfdatum));
     954          26 :         if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
     955          26 :             rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
     956          26 :                                                 TextDatumGetCString(rfdatum));
     957          26 :         if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
     958          26 :             rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
     959          26 :                                                 TextDatumGetCString(rfdatum));
     960             : 
     961          26 :         ReleaseSysCache(rftuple);
     962             :     }                           /* loop all subscribed publications */
     963             : 
     964             :     /* Clean the row filter */
     965        1880 :     for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
     966             :     {
     967        1410 :         if (no_filter[idx])
     968             :         {
     969        1332 :             list_free_deep(rfnodes[idx]);
     970        1332 :             rfnodes[idx] = NIL;
     971             :         }
     972             :     }
     973             : 
     974         470 :     if (has_filter)
     975             :     {
     976          32 :         Relation    relation = RelationIdGetRelation(entry->publish_as_relid);
     977             : 
     978          32 :         pgoutput_ensure_entry_cxt(data, entry);
     979             : 
     980             :         /*
     981             :          * Now all the filters for all pubactions are known. Combine them when
     982             :          * their pubactions are the same.
     983             :          */
     984          32 :         oldctx = MemoryContextSwitchTo(entry->entry_cxt);
     985          32 :         entry->estate = create_estate_for_relation(relation);
     986         128 :         for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
     987             :         {
     988          96 :             List       *filters = NIL;
     989             :             Expr       *rfnode;
     990             : 
     991          96 :             if (rfnodes[idx] == NIL)
     992          42 :                 continue;
     993             : 
     994         114 :             foreach(lc, rfnodes[idx])
     995          60 :                 filters = lappend(filters, stringToNode((char *) lfirst(lc)));
     996             : 
     997             :             /* combine the row filter and cache the ExprState */
     998          54 :             rfnode = make_orclause(filters);
     999          54 :             entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
    1000             :         }                       /* for each pubaction */
    1001          32 :         MemoryContextSwitchTo(oldctx);
    1002             : 
    1003          32 :         RelationClose(relation);
    1004             :     }
    1005         470 : }
    1006             : 
    1007             : /*
    1008             :  * Initialize the column list.
    1009             :  */
    1010             : static void
    1011         470 : pgoutput_column_list_init(PGOutputData *data, List *publications,
    1012             :                           RelationSyncEntry *entry)
    1013             : {
    1014             :     ListCell   *lc;
    1015         470 :     bool        first = true;
    1016         470 :     Relation    relation = RelationIdGetRelation(entry->publish_as_relid);
    1017             : 
    1018             :     /*
    1019             :      * Find if there are any column lists for this relation. If there are,
    1020             :      * build a bitmap using the column lists.
    1021             :      *
    1022             :      * Multiple publications might have multiple column lists for this
    1023             :      * relation.
    1024             :      *
    1025             :      * Note that we don't support the case where the column list is different
    1026             :      * for the same table when combining publications. See comments atop
    1027             :      * fetch_table_list. But one can later change the publication so we still
    1028             :      * need to check all the given publication-table mappings and report an
    1029             :      * error if any publications have a different column list.
    1030             :      *
    1031             :      * FOR ALL TABLES and FOR TABLES IN SCHEMA imply "don't use column list".
    1032             :      */
    1033         960 :     foreach(lc, publications)
    1034             :     {
    1035         492 :         Publication *pub = lfirst(lc);
    1036         492 :         HeapTuple   cftuple = NULL;
    1037         492 :         Datum       cfdatum = 0;
    1038         492 :         Bitmapset  *cols = NULL;
    1039             : 
    1040             :         /*
    1041             :          * If the publication is FOR ALL TABLES then it is treated the same as
    1042             :          * if there are no column lists (even if other publications have a
    1043             :          * list).
    1044             :          */
    1045         492 :         if (!pub->alltables)
    1046             :         {
    1047         362 :             bool        pub_no_list = true;
    1048             : 
    1049             :             /*
    1050             :              * Check for the presence of a column list in this publication.
    1051             :              *
    1052             :              * Note: If we find no pg_publication_rel row, it's a publication
    1053             :              * defined for a whole schema, so it can't have a column list,
    1054             :              * just like a FOR ALL TABLES publication.
    1055             :              */
    1056         362 :             cftuple = SearchSysCache2(PUBLICATIONRELMAP,
    1057             :                                       ObjectIdGetDatum(entry->publish_as_relid),
    1058             :                                       ObjectIdGetDatum(pub->oid));
    1059             : 
    1060         362 :             if (HeapTupleIsValid(cftuple))
    1061             :             {
    1062             :                 /* Lookup the column list attribute. */
    1063         324 :                 cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
    1064             :                                           Anum_pg_publication_rel_prattrs,
    1065             :                                           &pub_no_list);
    1066             : 
    1067             :                 /* Build the column list bitmap in the per-entry context. */
    1068         324 :                 if (!pub_no_list)   /* when not null */
    1069             :                 {
    1070             :                     int         i;
    1071          74 :                     int         nliveatts = 0;
    1072          74 :                     TupleDesc   desc = RelationGetDescr(relation);
    1073             : 
    1074          74 :                     pgoutput_ensure_entry_cxt(data, entry);
    1075             : 
    1076          74 :                     cols = pub_collist_to_bitmapset(cols, cfdatum,
    1077             :                                                     entry->entry_cxt);
    1078             : 
    1079             :                     /* Get the number of live attributes. */
    1080         310 :                     for (i = 0; i < desc->natts; i++)
    1081             :                     {
    1082         236 :                         Form_pg_attribute att = TupleDescAttr(desc, i);
    1083             : 
    1084         236 :                         if (att->attisdropped || att->attgenerated)
    1085           4 :                             continue;
    1086             : 
    1087         232 :                         nliveatts++;
    1088             :                     }
    1089             : 
    1090             :                     /*
    1091             :                      * If column list includes all the columns of the table,
    1092             :                      * set it to NULL.
    1093             :                      */
    1094          74 :                     if (bms_num_members(cols) == nliveatts)
    1095             :                     {
    1096          14 :                         bms_free(cols);
    1097          14 :                         cols = NULL;
    1098             :                     }
    1099             :                 }
    1100             : 
    1101         324 :                 ReleaseSysCache(cftuple);
    1102             :             }
    1103             :         }
    1104             : 
    1105         492 :         if (first)
    1106             :         {
    1107         470 :             entry->columns = cols;
    1108         470 :             first = false;
    1109             :         }
    1110          22 :         else if (!bms_equal(entry->columns, cols))
    1111           2 :             ereport(ERROR,
    1112             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1113             :                     errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
    1114             :                            get_namespace_name(RelationGetNamespace(relation)),
    1115             :                            RelationGetRelationName(relation)));
    1116             :     }                           /* loop all subscribed publications */
    1117             : 
    1118         468 :     RelationClose(relation);
    1119         468 : }
    1120             : 
    1121             : /*
    1122             :  * Initialize the slot for storing new and old tuples, and build the map that
    1123             :  * will be used to convert the relation's tuples into the ancestor's format.
    1124             :  */
    1125             : static void
    1126         470 : init_tuple_slot(PGOutputData *data, Relation relation,
    1127             :                 RelationSyncEntry *entry)
    1128             : {
    1129             :     MemoryContext oldctx;
    1130             :     TupleDesc   oldtupdesc;
    1131             :     TupleDesc   newtupdesc;
    1132             : 
    1133         470 :     oldctx = MemoryContextSwitchTo(data->cachectx);
    1134             : 
    1135             :     /*
    1136             :      * Create tuple table slots. Create a copy of the TupleDesc as it needs to
    1137             :      * live as long as the cache remains.
    1138             :      */
    1139         470 :     oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
    1140         470 :     newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
    1141             : 
    1142         470 :     entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
    1143         470 :     entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
    1144             : 
    1145         470 :     MemoryContextSwitchTo(oldctx);
    1146             : 
    1147             :     /*
    1148             :      * Cache the map that will be used to convert the relation's tuples into
    1149             :      * the ancestor's format, if needed.
    1150             :      */
    1151         470 :     if (entry->publish_as_relid != RelationGetRelid(relation))
    1152             :     {
    1153          60 :         Relation    ancestor = RelationIdGetRelation(entry->publish_as_relid);
    1154          60 :         TupleDesc   indesc = RelationGetDescr(relation);
    1155          60 :         TupleDesc   outdesc = RelationGetDescr(ancestor);
    1156             : 
    1157             :         /* Map must live as long as the session does. */
    1158          60 :         oldctx = MemoryContextSwitchTo(CacheMemoryContext);
    1159             : 
    1160          60 :         entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
    1161             : 
    1162          60 :         MemoryContextSwitchTo(oldctx);
    1163          60 :         RelationClose(ancestor);
    1164             :     }
    1165         470 : }
    1166             : 
/*
 * Change is checked against the row filter if any.
 *
 * Returns true if the change is to be replicated, else false.
 *
 * Parameters:
 *  relation     - relation whose tuple descriptor describes both slots (also
 *                 named in a DEBUG3 message)
 *  old_slot     - slot holding the old tuple, or NULL if there is none
 *  new_slot_ptr - in/out: points at the slot holding the new tuple (which may
 *                 be NULL).  When an UPDATE is transformed into an INSERT, it
 *                 may be redirected to a temporary virtual slot created here.
 *                 That slot is not dropped here; presumably the caller's
 *                 memory context reset reclaims it -- confirm at call sites.
 *  entry        - relation sync entry supplying the per-action row filter
 *                 ExprStates and the EState used to evaluate them
 *  action       - in/out: the change type (INSERT, UPDATE or DELETE only);
 *                 may be rewritten for UPDATEs as described below
 *
 * For inserts, evaluate the row filter for new tuple.
 * For deletes, evaluate the row filter for old tuple.
 * For updates, evaluate the row filter for old and new tuple.
 *
 * For updates, if both evaluations are true, we allow sending the UPDATE and
 * if both the evaluations are false, it doesn't replicate the UPDATE. Now, if
 * only one of the tuples matches the row filter expression, we transform
 * UPDATE to DELETE or INSERT to avoid any data inconsistency based on the
 * following rules:
 *
 * Case 1: old-row (no match)    new-row (no match)  -> (drop change)
 * Case 2: old-row (no match)    new-row (match)     -> INSERT
 * Case 3: old-row (match)       new-row (no match)  -> DELETE
 * Case 4: old-row (match)       new-row (match)     -> UPDATE
 *
 * The new action is updated in the action parameter.
 *
 * The new slot could be updated when transforming the UPDATE into INSERT,
 * because the original new tuple might not have column values from the replica
 * identity.
 *
 * Examples:
 * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
 * Since the old tuple satisfies, the initial table synchronization copied this
 * row (or another method was used to guarantee that there is data
 * consistency).  However, after the UPDATE the new tuple doesn't satisfy the
 * row filter, so from a data consistency perspective, that row should be
 * removed on the subscriber. The UPDATE should be transformed into a DELETE
 * statement and be sent to the subscriber. Keeping this row on the subscriber
 * is undesirable because it doesn't reflect what was defined in the row filter
 * expression on the publisher. This row on the subscriber would likely not be
 * modified by replication again. If someone inserted a new row with the same
 * old identifier, replication could stop due to a constraint violation.
 *
 * Let's say the old tuple doesn't match the row filter but the new tuple does.
 * Since the old tuple doesn't satisfy, the initial table synchronization
 * probably didn't copy this row. However, after the UPDATE the new tuple does
 * satisfy the row filter, so from a data consistency perspective, that row
 * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
 * statements have no effect (it matches no row -- see
 * apply_handle_update_internal()). So, the UPDATE should be transformed into an
 * INSERT statement and be sent to the subscriber. However, this might surprise
 * someone who expects the data set to satisfy the row filter expression on the
 * provider.
 */
static bool
pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
                    TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
                    ReorderBufferChangeType *action)
{
    TupleDesc   desc;
    int         i;
    bool        old_matched,
                new_matched,
                result;
    TupleTableSlot *tmp_new_slot;
    TupleTableSlot *new_slot = *new_slot_ptr;
    ExprContext *ecxt;
    ExprState  *filter_exprstate;

    /*
     * We need this map to avoid relying on ReorderBufferChangeType enums
     * having specific values.
     */
    static const int map_changetype_pubaction[] = {
        [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
        [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
        [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
    };

    /* Only these three actions are valid indexes into the map above. */
    Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
           *action == REORDER_BUFFER_CHANGE_UPDATE ||
           *action == REORDER_BUFFER_CHANGE_DELETE);

    Assert(new_slot || old_slot);

    /* Get the corresponding row filter */
    filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];

    /* Bail out if there is no row filter */
    if (!filter_exprstate)
        return true;

    elog(DEBUG3, "table \"%s.%s\" has row filter",
         get_namespace_name(RelationGetNamespace(relation)),
         RelationGetRelationName(relation));

    /* Discard anything left in the per-tuple context by a previous call. */
    ResetPerTupleExprContext(entry->estate);

    ecxt = GetPerTupleExprContext(entry->estate);

    /*
     * For the following occasions where there is only one tuple, we can
     * evaluate the row filter for that tuple and return.
     *
     * For inserts, we only have the new tuple.
     *
     * For updates, we can have only a new tuple when none of the replica
     * identity columns changed and none of those columns have external data
     * but we still need to evaluate the row filter for the new tuple as the
     * existing values of those columns might not match the filter. Also,
     * users can use constant expressions in the row filter, so we anyway need
     * to evaluate it for the new tuple.
     *
     * For deletes, we only have the old tuple.
     */
    if (!new_slot || !old_slot)
    {
        ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
        result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);

        return result;
    }

    /*
     * Both the old and new tuples must be valid only for updates and need to
     * be checked against the row filter.
     */
    Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);

    /* Deform both tuples so tts_values/tts_isnull can be read directly. */
    slot_getallattrs(new_slot);
    slot_getallattrs(old_slot);

    tmp_new_slot = NULL;
    desc = RelationGetDescr(relation);

    /*
     * The new tuple might not have all the replica identity columns, in which
     * case it needs to be copied over from the old tuple.
     */
    for (i = 0; i < desc->natts; i++)
    {
        Form_pg_attribute att = TupleDescAttr(desc, i);

        /*
         * If the column in the new tuple or old tuple is null, there is
         * nothing to do.
         */
        if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
            continue;

        /*
         * Unchanged toasted replica identity columns are only logged in the
         * old tuple. Copy this over to the new tuple. The changed (or WAL
         * logged) toast values are always assembled in memory and set as
         * VARTAG_INDIRECT. See ReorderBufferToastReplace.
         */
        if (att->attlen == -1 &&
            VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
            !VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
        {
            /*
             * Build the replacement slot lazily, on the first column that
             * needs patching, seeded with all of the new tuple's values.
             */
            if (!tmp_new_slot)
            {
                tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
                ExecClearTuple(tmp_new_slot);

                memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
                       desc->natts * sizeof(Datum));
                memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
                       desc->natts * sizeof(bool));
            }

            tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
            tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
        }
    }

    ecxt->ecxt_scantuple = old_slot;
    old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);

    /* Evaluate the new tuple using the patched copy when one was built. */
    if (tmp_new_slot)
    {
        ExecStoreVirtualTuple(tmp_new_slot);
        ecxt->ecxt_scantuple = tmp_new_slot;
    }
    else
        ecxt->ecxt_scantuple = new_slot;

    new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);

    /*
     * Case 1: if both tuples don't match the row filter, bailout. Send
     * nothing.
     */
    if (!old_matched && !new_matched)
        return false;

    /*
     * Case 2: if the old tuple doesn't satisfy the row filter but the new
     * tuple does, transform the UPDATE into INSERT.
     *
     * Use the newly transformed tuple that must contain the column values for
     * all the replica identity columns. This ensures that, when the tuple is
     * inserted on the downstream node, all the required column values are
     * present.
     */
    if (!old_matched && new_matched)
    {
        *action = REORDER_BUFFER_CHANGE_INSERT;

        if (tmp_new_slot)
            *new_slot_ptr = tmp_new_slot;
    }

    /*
     * Case 3: if the old tuple satisfies the row filter but the new tuple
     * doesn't, transform the UPDATE into DELETE.
     *
     * This transformation does not require another tuple. The old tuple will
     * be used for DELETE.
     */
    else if (old_matched && !new_matched)
        *action = REORDER_BUFFER_CHANGE_DELETE;

    /*
     * Case 4: if both tuples match the row filter, transformation isn't
     * required. (*action is default UPDATE).
     */

    return true;
}
    1392             : 
/*
 * Sends the decoded DML over wire.
 *
 * This is called both in streaming and non-streaming modes.
 *
 * Flow:
 *  1. Return without output if the relation is not publishable, or if the
 *     entry's publication actions exclude this change type.
 *  2. Store the old/new tuples in the entry's slots.  When the change is
 *     published as an ancestor, open that ancestor as the target relation and
 *     convert the tuples with the entry's attribute map, if it has one.
 *  3. Apply the row filter.  It may drop the change, or turn an UPDATE into
 *     an INSERT or DELETE (updating "action" and possibly "new_slot").
 *  4. Send BEGIN if not yet sent, then the schema (if needed), then the change.
 *
 * From step 2 on, allocations happen in data->context, which is reset at the
 * end (the early returns in step 1 occur before switching into it).
 */
static void
pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                Relation relation, ReorderBufferChange *change)
{
    PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
    MemoryContext old;
    RelationSyncEntry *relentry;
    TransactionId xid = InvalidTransactionId;
    Relation    ancestor = NULL;
    Relation    targetrel = relation;
    /* Local copy: the row filter may rewrite it without touching "change". */
    ReorderBufferChangeType action = change->action;
    TupleTableSlot *old_slot = NULL;
    TupleTableSlot *new_slot = NULL;

    if (!is_publishable_relation(relation))
        return;

    /*
     * Remember the xid for the change in streaming mode. We need to send xid
     * with each change in the streaming mode so that subscriber can make
     * their association and on aborts, it can discard the corresponding
     * changes.
     */
    if (data->in_streaming)
        xid = change->txn->xid;

    relentry = get_rel_sync_entry(data, relation);

    /* First check the table filter */
    switch (action)
    {
        case REORDER_BUFFER_CHANGE_INSERT:
            if (!relentry->pubactions.pubinsert)
                return;
            break;
        case REORDER_BUFFER_CHANGE_UPDATE:
            if (!relentry->pubactions.pubupdate)
                return;
            break;
        case REORDER_BUFFER_CHANGE_DELETE:
            if (!relentry->pubactions.pubdelete)
                return;

            /*
             * This is only possible if deletes are allowed even when replica
             * identity is not defined for a table. Since the DELETE action
             * can't be published, we simply return.
             */
            if (!change->data.tp.oldtuple)
            {
                elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
                return;
            }
            break;
        default:
            /* Only INSERT, UPDATE and DELETE are expected here. */
            Assert(false);
    }

    /* Avoid leaking memory by using and resetting our own context */
    old = MemoryContextSwitchTo(data->context);

    /* Switch relation if publishing via root. */
    if (relentry->publish_as_relid != RelationGetRelid(relation))
    {
        Assert(relation->rd_rel->relispartition);
        /* Closed again in the cleanup section below. */
        ancestor = RelationIdGetRelation(relentry->publish_as_relid);
        targetrel = ancestor;
    }

    if (change->data.tp.oldtuple)
    {
        old_slot = relentry->old_slot;
        ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple, old_slot, false);

        /* Convert tuple if needed. */
        if (relentry->attrmap)
        {
            /* Scratch slot in data->context, shaped like the target relation. */
            TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
                                                      &TTSOpsVirtual);

            old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
        }
    }

    if (change->data.tp.newtuple)
    {
        new_slot = relentry->new_slot;
        ExecStoreHeapTuple(&change->data.tp.newtuple->tuple, new_slot, false);

        /* Convert tuple if needed. */
        if (relentry->attrmap)
        {
            /* Scratch slot in data->context, shaped like the target relation. */
            TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
                                                      &TTSOpsVirtual);

            new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
        }
    }

    /*
     * Check row filter.
     *
     * Updates could be transformed to inserts or deletes based on the results
     * of the row filter for old and new tuple.
     */
    if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
        goto cleanup;

    /*
     * Send BEGIN if we haven't yet.
     *
     * We send the BEGIN message after ensuring that we will actually send the
     * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
     * transactions.
     */
    if (txndata && !txndata->sent_begin_txn)
        pgoutput_send_begin(ctx, txn);

    /*
     * Schema should be sent using the original relation because it also sends
     * the ancestor's relation.
     */
    maybe_send_schema(ctx, change, relation, relentry);

    OutputPluginPrepareWrite(ctx, true);

    /* Send the data; "action" reflects any row-filter transformation. */
    switch (action)
    {
        case REORDER_BUFFER_CHANGE_INSERT:
            logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
                                    data->binary, relentry->columns);
            break;
        case REORDER_BUFFER_CHANGE_UPDATE:
            logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
                                    new_slot, data->binary, relentry->columns);
            break;
        case REORDER_BUFFER_CHANGE_DELETE:
            logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
                                    data->binary, relentry->columns);
            break;
        default:
            /* Only INSERT, UPDATE and DELETE are expected here. */
            Assert(false);
    }

    OutputPluginWrite(ctx, true);

cleanup:
    /* Release the ancestor relation opened above, if any. */
    if (RelationIsValid(ancestor))
    {
        RelationClose(ancestor);
        ancestor = NULL;
    }

    MemoryContextSwitchTo(old);
    MemoryContextReset(data->context);
}
    1556             : 
    1557             : static void
    1558          22 : pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    1559             :                   int nrelations, Relation relations[], ReorderBufferChange *change)
    1560             : {
    1561          22 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1562          22 :     PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
    1563             :     MemoryContext old;
    1564             :     RelationSyncEntry *relentry;
    1565             :     int         i;
    1566             :     int         nrelids;
    1567             :     Oid        *relids;
    1568          22 :     TransactionId xid = InvalidTransactionId;
    1569             : 
    1570             :     /* Remember the xid for the change in streaming mode. See pgoutput_change. */
    1571          22 :     if (data->in_streaming)
    1572           0 :         xid = change->txn->xid;
    1573             : 
    1574          22 :     old = MemoryContextSwitchTo(data->context);
    1575             : 
    1576          22 :     relids = palloc0(nrelations * sizeof(Oid));
    1577          22 :     nrelids = 0;
    1578             : 
    1579          62 :     for (i = 0; i < nrelations; i++)
    1580             :     {
    1581          40 :         Relation    relation = relations[i];
    1582          40 :         Oid         relid = RelationGetRelid(relation);
    1583             : 
    1584          40 :         if (!is_publishable_relation(relation))
    1585           0 :             continue;
    1586             : 
    1587          40 :         relentry = get_rel_sync_entry(data, relation);
    1588             : 
    1589          40 :         if (!relentry->pubactions.pubtruncate)
    1590          16 :             continue;
    1591             : 
    1592             :         /*
    1593             :          * Don't send partitions if the publication wants to send only the
    1594             :          * root tables through it.
    1595             :          */
    1596          24 :         if (relation->rd_rel->relispartition &&
    1597          20 :             relentry->publish_as_relid != relid)
    1598           0 :             continue;
    1599             : 
    1600          24 :         relids[nrelids++] = relid;
    1601             : 
    1602             :         /* Send BEGIN if we haven't yet */
    1603          24 :         if (txndata && !txndata->sent_begin_txn)
    1604          14 :             pgoutput_send_begin(ctx, txn);
    1605             : 
    1606          24 :         maybe_send_schema(ctx, change, relation, relentry);
    1607             :     }
    1608             : 
    1609          22 :     if (nrelids > 0)
    1610             :     {
    1611          14 :         OutputPluginPrepareWrite(ctx, true);
    1612          14 :         logicalrep_write_truncate(ctx->out,
    1613             :                                   xid,
    1614             :                                   nrelids,
    1615             :                                   relids,
    1616          14 :                                   change->data.truncate.cascade,
    1617          14 :                                   change->data.truncate.restart_seqs);
    1618          14 :         OutputPluginWrite(ctx, true);
    1619             :     }
    1620             : 
    1621          22 :     MemoryContextSwitchTo(old);
    1622          22 :     MemoryContextReset(data->context);
    1623          22 : }
    1624             : 
    1625             : static void
    1626          12 : pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    1627             :                  XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
    1628             :                  const char *message)
    1629             : {
    1630          12 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1631          12 :     TransactionId xid = InvalidTransactionId;
    1632             : 
    1633          12 :     if (!data->messages)
    1634           2 :         return;
    1635             : 
    1636             :     /*
    1637             :      * Remember the xid for the message in streaming mode. See
    1638             :      * pgoutput_change.
    1639             :      */
    1640          10 :     if (data->in_streaming)
    1641           0 :         xid = txn->xid;
    1642             : 
    1643             :     /*
    1644             :      * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
    1645             :      */
    1646          10 :     if (transactional)
    1647             :     {
    1648           4 :         PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
    1649             : 
    1650             :         /* Send BEGIN if we haven't yet */
    1651           4 :         if (txndata && !txndata->sent_begin_txn)
    1652           4 :             pgoutput_send_begin(ctx, txn);
    1653             :     }
    1654             : 
    1655          10 :     OutputPluginPrepareWrite(ctx, true);
    1656          10 :     logicalrep_write_message(ctx->out,
    1657             :                              xid,
    1658             :                              message_lsn,
    1659             :                              transactional,
    1660             :                              prefix,
    1661             :                              sz,
    1662             :                              message);
    1663          10 :     OutputPluginWrite(ctx, true);
    1664             : }
    1665             : 
    1666             : /*
    1667             :  * Return true if the data is associated with an origin and the user has
    1668             :  * requested the changes that don't have an origin, false otherwise.
    1669             :  */
    1670             : static bool
    1671      981838 : pgoutput_origin_filter(LogicalDecodingContext *ctx,
    1672             :                        RepOriginId origin_id)
    1673             : {
    1674      981838 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1675             : 
    1676      981838 :     if (data->publish_no_origin && origin_id != InvalidRepOriginId)
    1677          96 :         return true;
    1678             : 
    1679      981742 :     return false;
    1680             : }
    1681             : 
    1682             : /*
    1683             :  * Shutdown the output plugin.
    1684             :  *
    1685             :  * Note, we don't need to clean the data->context and data->cachectx as
    1686             :  * they are child contexts of the ctx->context so they will be cleaned up by
    1687             :  * logical decoding machinery.
    1688             :  */
    1689             : static void
    1690         822 : pgoutput_shutdown(LogicalDecodingContext *ctx)
    1691             : {
    1692         822 :     if (RelationSyncCache)
    1693             :     {
    1694         328 :         hash_destroy(RelationSyncCache);
    1695         328 :         RelationSyncCache = NULL;
    1696             :     }
    1697         822 : }
    1698             : 
    1699             : /*
    1700             :  * Load publications from the list of publication names.
    1701             :  */
    1702             : static List *
    1703         276 : LoadPublications(List *pubnames)
    1704             : {
    1705         276 :     List       *result = NIL;
    1706             :     ListCell   *lc;
    1707             : 
    1708         634 :     foreach(lc, pubnames)
    1709             :     {
    1710         362 :         char       *pubname = (char *) lfirst(lc);
    1711         362 :         Publication *pub = GetPublicationByName(pubname, false);
    1712             : 
    1713         358 :         result = lappend(result, pub);
    1714             :     }
    1715             : 
    1716         272 :     return result;
    1717             : }
    1718             : 
    1719             : /*
    1720             :  * Publication syscache invalidation callback.
    1721             :  *
    1722             :  * Called for invalidations on pg_publication.
    1723             :  */
    1724             : static void
    1725         480 : publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
    1726             : {
    1727         480 :     publications_valid = false;
    1728             : 
    1729             :     /*
    1730             :      * Also invalidate per-relation cache so that next time the filtering info
    1731             :      * is checked it will be updated with the new publication settings.
    1732             :      */
    1733         480 :     rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
    1734         480 : }
    1735             : 
    1736             : /*
    1737             :  * START STREAM callback
    1738             :  */
    1739             : static void
    1740        1294 : pgoutput_stream_start(struct LogicalDecodingContext *ctx,
    1741             :                       ReorderBufferTXN *txn)
    1742             : {
    1743        1294 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1744        1294 :     bool        send_replication_origin = txn->origin_id != InvalidRepOriginId;
    1745             : 
    1746             :     /* we can't nest streaming of transactions */
    1747             :     Assert(!data->in_streaming);
    1748             : 
    1749             :     /*
    1750             :      * If we already sent the first stream for this transaction then don't
    1751             :      * send the origin id in the subsequent streams.
    1752             :      */
    1753        1294 :     if (rbtxn_is_streamed(txn))
    1754        1168 :         send_replication_origin = false;
    1755             : 
    1756        1294 :     OutputPluginPrepareWrite(ctx, !send_replication_origin);
    1757        1294 :     logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
    1758             : 
    1759        1294 :     send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
    1760             :                      send_replication_origin);
    1761             : 
    1762        1294 :     OutputPluginWrite(ctx, true);
    1763             : 
    1764             :     /* we're streaming a chunk of transaction now */
    1765        1294 :     data->in_streaming = true;
    1766        1294 : }
    1767             : 
    1768             : /*
    1769             :  * STOP STREAM callback
    1770             :  */
    1771             : static void
    1772        1294 : pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
    1773             :                      ReorderBufferTXN *txn)
    1774             : {
    1775        1294 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1776             : 
    1777             :     /* we should be streaming a transaction */
    1778             :     Assert(data->in_streaming);
    1779             : 
    1780        1294 :     OutputPluginPrepareWrite(ctx, true);
    1781        1294 :     logicalrep_write_stream_stop(ctx->out);
    1782        1294 :     OutputPluginWrite(ctx, true);
    1783             : 
    1784             :     /* we've stopped streaming a transaction */
    1785        1294 :     data->in_streaming = false;
    1786        1294 : }
    1787             : 
    1788             : /*
    1789             :  * Notify downstream to discard the streamed transaction (along with all
    1790             :  * it's subtransactions, if it's a toplevel transaction).
    1791             :  */
    1792             : static void
    1793          52 : pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
    1794             :                       ReorderBufferTXN *txn,
    1795             :                       XLogRecPtr abort_lsn)
    1796             : {
    1797             :     ReorderBufferTXN *toptxn;
    1798          52 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    1799          52 :     bool        write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
    1800             : 
    1801             :     /*
    1802             :      * The abort should happen outside streaming block, even for streamed
    1803             :      * transactions. The transaction has to be marked as streamed, though.
    1804             :      */
    1805             :     Assert(!data->in_streaming);
    1806             : 
    1807             :     /* determine the toplevel transaction */
    1808          52 :     toptxn = rbtxn_get_toptxn(txn);
    1809             : 
    1810             :     Assert(rbtxn_is_streamed(toptxn));
    1811             : 
    1812          52 :     OutputPluginPrepareWrite(ctx, true);
    1813          52 :     logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
    1814             :                                   txn->xact_time.abort_time, write_abort_info);
    1815             : 
    1816          52 :     OutputPluginWrite(ctx, true);
    1817             : 
    1818          52 :     cleanup_rel_sync_cache(toptxn->xid, false);
    1819          52 : }
    1820             : 
    1821             : /*
    1822             :  * Notify downstream to apply the streamed transaction (along with all
    1823             :  * it's subtransactions).
    1824             :  */
    1825             : static void
    1826          92 : pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
    1827             :                        ReorderBufferTXN *txn,
    1828             :                        XLogRecPtr commit_lsn)
    1829             : {
    1830          92 :     PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private;
    1831             : 
    1832             :     /*
    1833             :      * The commit should happen outside streaming block, even for streamed
    1834             :      * transactions. The transaction has to be marked as streamed, though.
    1835             :      */
    1836             :     Assert(!data->in_streaming);
    1837             :     Assert(rbtxn_is_streamed(txn));
    1838             : 
    1839          92 :     OutputPluginUpdateProgress(ctx, false);
    1840             : 
    1841          92 :     OutputPluginPrepareWrite(ctx, true);
    1842          92 :     logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
    1843          92 :     OutputPluginWrite(ctx, true);
    1844             : 
    1845          92 :     cleanup_rel_sync_cache(txn->xid, true);
    1846          92 : }
    1847             : 
    1848             : /*
    1849             :  * PREPARE callback (for streaming two-phase commit).
    1850             :  *
    1851             :  * Notify the downstream to prepare the transaction.
    1852             :  */
    1853             : static void
    1854          28 : pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
    1855             :                             ReorderBufferTXN *txn,
    1856             :                             XLogRecPtr prepare_lsn)
    1857             : {
    1858             :     Assert(rbtxn_is_streamed(txn));
    1859             : 
    1860          28 :     OutputPluginUpdateProgress(ctx, false);
    1861          28 :     OutputPluginPrepareWrite(ctx, true);
    1862          28 :     logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
    1863          28 :     OutputPluginWrite(ctx, true);
    1864          28 : }
    1865             : 
    1866             : /*
    1867             :  * Initialize the relation schema sync cache for a decoding session.
    1868             :  *
    1869             :  * The hash table is destroyed at the end of a decoding session. While
    1870             :  * relcache invalidations still exist and will still be invoked, they
    1871             :  * will just see the null hash table global and take no action.
    1872             :  */
    1873             : static void
    1874         608 : init_rel_sync_cache(MemoryContext cachectx)
    1875             : {
    1876             :     HASHCTL     ctl;
    1877             :     static bool relation_callbacks_registered = false;
    1878             : 
    1879             :     /* Nothing to do if hash table already exists */
    1880         608 :     if (RelationSyncCache != NULL)
    1881           2 :         return;
    1882             : 
    1883             :     /* Make a new hash table for the cache */
    1884         608 :     ctl.keysize = sizeof(Oid);
    1885         608 :     ctl.entrysize = sizeof(RelationSyncEntry);
    1886         608 :     ctl.hcxt = cachectx;
    1887             : 
    1888         608 :     RelationSyncCache = hash_create("logical replication output relation cache",
    1889             :                                     128, &ctl,
    1890             :                                     HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
    1891             : 
    1892             :     Assert(RelationSyncCache != NULL);
    1893             : 
    1894             :     /* No more to do if we already registered callbacks */
    1895         608 :     if (relation_callbacks_registered)
    1896           2 :         return;
    1897             : 
    1898             :     /* We must update the cache entry for a relation after a relcache flush */
    1899         606 :     CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
    1900             : 
    1901             :     /*
    1902             :      * Flush all cache entries after a pg_namespace change, in case it was a
    1903             :      * schema rename affecting a relation being replicated.
    1904             :      */
    1905         606 :     CacheRegisterSyscacheCallback(NAMESPACEOID,
    1906             :                                   rel_sync_cache_publication_cb,
    1907             :                                   (Datum) 0);
    1908             : 
    1909             :     /*
    1910             :      * Flush all cache entries after any publication changes.  (We need no
    1911             :      * callback entry for pg_publication, because publication_invalidation_cb
    1912             :      * will take care of it.)
    1913             :      */
    1914         606 :     CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
    1915             :                                   rel_sync_cache_publication_cb,
    1916             :                                   (Datum) 0);
    1917         606 :     CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
    1918             :                                   rel_sync_cache_publication_cb,
    1919             :                                   (Datum) 0);
    1920             : 
    1921         606 :     relation_callbacks_registered = true;
    1922             : }
    1923             : 
    1924             : /*
    1925             :  * We expect relatively small number of streamed transactions.
    1926             :  */
    1927             : static bool
    1928      351882 : get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
    1929             : {
    1930      351882 :     return list_member_xid(entry->streamed_txns, xid);
    1931             : }
    1932             : 
    1933             : /*
    1934             :  * Add the xid in the rel sync entry for which we have already sent the schema
    1935             :  * of the relation.
    1936             :  */
    1937             : static void
    1938         144 : set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
    1939             : {
    1940             :     MemoryContext oldctx;
    1941             : 
    1942         144 :     oldctx = MemoryContextSwitchTo(CacheMemoryContext);
    1943             : 
    1944         144 :     entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
    1945             : 
    1946         144 :     MemoryContextSwitchTo(oldctx);
    1947         144 : }
    1948             : 
    1949             : /*
    1950             :  * Find or create entry in the relation schema cache.
    1951             :  *
    1952             :  * This looks up publications that the given relation is directly or
    1953             :  * indirectly part of (the latter if it's really the relation's ancestor that
    1954             :  * is part of a publication) and fills up the found entry with the information
    1955             :  * about which operations to publish and whether to use an ancestor's schema
    1956             :  * when publishing.
    1957             :  */
    1958             : static RelationSyncEntry *
    1959      366328 : get_rel_sync_entry(PGOutputData *data, Relation relation)
    1960             : {
    1961             :     RelationSyncEntry *entry;
    1962             :     bool        found;
    1963             :     MemoryContext oldctx;
    1964      366328 :     Oid         relid = RelationGetRelid(relation);
    1965             : 
    1966             :     Assert(RelationSyncCache != NULL);
    1967             : 
    1968             :     /* Find cached relation info, creating if not found */
    1969      366328 :     entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
    1970             :                                               &relid,
    1971             :                                               HASH_ENTER, &found);
    1972             :     Assert(entry != NULL);
    1973             : 
    1974             :     /* initialize entry, if it's new */
    1975      366328 :     if (!found)
    1976             :     {
    1977         440 :         entry->replicate_valid = false;
    1978         440 :         entry->schema_sent = false;
    1979         440 :         entry->streamed_txns = NIL;
    1980         440 :         entry->pubactions.pubinsert = entry->pubactions.pubupdate =
    1981         440 :             entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
    1982         440 :         entry->new_slot = NULL;
    1983         440 :         entry->old_slot = NULL;
    1984         440 :         memset(entry->exprstate, 0, sizeof(entry->exprstate));
    1985         440 :         entry->entry_cxt = NULL;
    1986         440 :         entry->publish_as_relid = InvalidOid;
    1987         440 :         entry->columns = NULL;
    1988         440 :         entry->attrmap = NULL;
    1989             :     }
    1990             : 
    1991             :     /* Validate the entry */
    1992      366328 :     if (!entry->replicate_valid)
    1993             :     {
    1994         550 :         Oid         schemaId = get_rel_namespace(relid);
    1995         550 :         List       *pubids = GetRelationPublications(relid);
    1996             : 
    1997             :         /*
    1998             :          * We don't acquire a lock on the namespace system table as we build
    1999             :          * the cache entry using a historic snapshot and all the later changes
    2000             :          * are absorbed while decoding WAL.
    2001             :          */
    2002         550 :         List       *schemaPubids = GetSchemaPublications(schemaId);
    2003             :         ListCell   *lc;
    2004         550 :         Oid         publish_as_relid = relid;
    2005         550 :         int         publish_ancestor_level = 0;
    2006         550 :         bool        am_partition = get_rel_relispartition(relid);
    2007         550 :         char        relkind = get_rel_relkind(relid);
    2008         550 :         List       *rel_publications = NIL;
    2009             : 
    2010             :         /* Reload publications if needed before use. */
    2011         550 :         if (!publications_valid)
    2012             :         {
    2013         276 :             oldctx = MemoryContextSwitchTo(CacheMemoryContext);
    2014         276 :             if (data->publications)
    2015             :             {
    2016          38 :                 list_free_deep(data->publications);
    2017          38 :                 data->publications = NIL;
    2018             :             }
    2019         276 :             data->publications = LoadPublications(data->publication_names);
    2020         272 :             MemoryContextSwitchTo(oldctx);
    2021         272 :             publications_valid = true;
    2022             :         }
    2023             : 
    2024             :         /*
    2025             :          * Reset schema_sent status as the relation definition may have
    2026             :          * changed.  Also reset pubactions to empty in case rel was dropped
    2027             :          * from a publication.  Also free any objects that depended on the
    2028             :          * earlier definition.
    2029             :          */
    2030         546 :         entry->schema_sent = false;
    2031         546 :         list_free(entry->streamed_txns);
    2032         546 :         entry->streamed_txns = NIL;
    2033         546 :         bms_free(entry->columns);
    2034         546 :         entry->columns = NULL;
    2035         546 :         entry->pubactions.pubinsert = false;
    2036         546 :         entry->pubactions.pubupdate = false;
    2037         546 :         entry->pubactions.pubdelete = false;
    2038         546 :         entry->pubactions.pubtruncate = false;
    2039             : 
    2040             :         /*
    2041             :          * Tuple slots cleanups. (Will be rebuilt later if needed).
    2042             :          */
    2043         546 :         if (entry->old_slot)
    2044          98 :             ExecDropSingleTupleTableSlot(entry->old_slot);
    2045         546 :         if (entry->new_slot)
    2046          98 :             ExecDropSingleTupleTableSlot(entry->new_slot);
    2047             : 
    2048         546 :         entry->old_slot = NULL;
    2049         546 :         entry->new_slot = NULL;
    2050             : 
    2051         546 :         if (entry->attrmap)
    2052           0 :             free_attrmap(entry->attrmap);
    2053         546 :         entry->attrmap = NULL;
    2054             : 
    2055             :         /*
    2056             :          * Row filter cache cleanups.
    2057             :          */
    2058         546 :         if (entry->entry_cxt)
    2059          20 :             MemoryContextDelete(entry->entry_cxt);
    2060             : 
    2061         546 :         entry->entry_cxt = NULL;
    2062         546 :         entry->estate = NULL;
    2063         546 :         memset(entry->exprstate, 0, sizeof(entry->exprstate));
    2064             : 
    2065             :         /*
    2066             :          * Build publication cache. We can't use one provided by relcache as
    2067             :          * relcache considers all publications that the given relation is in,
    2068             :          * but here we only need to consider ones that the subscriber
    2069             :          * requested.
    2070             :          */
    2071        1388 :         foreach(lc, data->publications)
    2072             :         {
    2073         842 :             Publication *pub = lfirst(lc);
    2074         842 :             bool        publish = false;
    2075             : 
    2076             :             /*
    2077             :              * Under what relid should we publish changes in this publication?
    2078             :              * We'll use the top-most relid across all publications. Also
    2079             :              * track the ancestor level for this publication.
    2080             :              */
    2081         842 :             Oid         pub_relid = relid;
    2082         842 :             int         ancestor_level = 0;
    2083             : 
    2084             :             /*
    2085             :              * If this is a FOR ALL TABLES publication, pick the partition
    2086             :              * root and set the ancestor level accordingly.
    2087             :              */
    2088         842 :             if (pub->alltables)
    2089             :             {
    2090         132 :                 publish = true;
    2091         132 :                 if (pub->pubviaroot && am_partition)
    2092             :                 {
    2093          28 :                     List       *ancestors = get_partition_ancestors(relid);
    2094             : 
    2095          28 :                     pub_relid = llast_oid(ancestors);
    2096          28 :                     ancestor_level = list_length(ancestors);
    2097             :                 }
    2098             :             }
    2099             : 
    2100         842 :             if (!publish)
    2101             :             {
    2102         710 :                 bool        ancestor_published = false;
    2103             : 
    2104             :                 /*
    2105             :                  * For a partition, check if any of the ancestors are
    2106             :                  * published.  If so, note down the topmost ancestor that is
    2107             :                  * published via this publication, which will be used as the
    2108             :                  * relation via which to publish the partition's changes.
    2109             :                  */
    2110         710 :                 if (am_partition)
    2111             :                 {
    2112             :                     Oid         ancestor;
    2113             :                     int         level;
    2114         198 :                     List       *ancestors = get_partition_ancestors(relid);
    2115             : 
    2116         198 :                     ancestor = GetTopMostAncestorInPublication(pub->oid,
    2117             :                                                                ancestors,
    2118             :                                                                &level);
    2119             : 
    2120         198 :                     if (ancestor != InvalidOid)
    2121             :                     {
    2122          80 :                         ancestor_published = true;
    2123          80 :                         if (pub->pubviaroot)
    2124             :                         {
    2125          34 :                             pub_relid = ancestor;
    2126          34 :                             ancestor_level = level;
    2127             :                         }
    2128             :                     }
    2129             :                 }
    2130             : 
    2131        1110 :                 if (list_member_oid(pubids, pub->oid) ||
    2132         788 :                     list_member_oid(schemaPubids, pub->oid) ||
    2133             :                     ancestor_published)
    2134         372 :                     publish = true;
    2135             :             }
    2136             : 
    2137             :             /*
    2138             :              * If the relation is to be published, determine actions to
    2139             :              * publish, and list of columns, if appropriate.
    2140             :              *
    2141             :              * Don't publish changes for partitioned tables, because
    2142             :              * publishing those of its partitions suffices, unless partition
    2143             :              * changes won't be published due to pubviaroot being set.
    2144             :              */
    2145         842 :             if (publish &&
    2146           6 :                 (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
    2147             :             {
    2148         498 :                 entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
    2149         498 :                 entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
    2150         498 :                 entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
    2151         498 :                 entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
    2152             : 
    2153             :                 /*
    2154             :                  * We want to publish the changes as the top-most ancestor
    2155             :                  * across all publications. So we need to check if the already
    2156             :                  * calculated level is higher than the new one. If yes, we can
    2157             :                  * ignore the new value (as it's a child). Otherwise the new
    2158             :                  * value is an ancestor, so we keep it.
    2159             :                  */
    2160         498 :                 if (publish_ancestor_level > ancestor_level)
    2161           2 :                     continue;
    2162             : 
    2163             :                 /*
    2164             :                  * If we found an ancestor higher up in the tree, discard the
    2165             :                  * list of publications through which we replicate it, and use
    2166             :                  * the new ancestor.
    2167             :                  */
    2168         496 :                 if (publish_ancestor_level < ancestor_level)
    2169             :                 {
    2170          62 :                     publish_as_relid = pub_relid;
    2171          62 :                     publish_ancestor_level = ancestor_level;
    2172             : 
    2173             :                     /* reset the publication list for this relation */
    2174          62 :                     rel_publications = NIL;
    2175             :                 }
    2176             :                 else
    2177             :                 {
    2178             :                     /* Same ancestor level, has to be the same OID. */
    2179             :                     Assert(publish_as_relid == pub_relid);
    2180             :                 }
    2181             : 
    2182             :                 /* Track publications for this ancestor. */
    2183         496 :                 rel_publications = lappend(rel_publications, pub);
    2184             :             }
    2185             :         }
    2186             : 
    2187         546 :         entry->publish_as_relid = publish_as_relid;
    2188             : 
    2189             :         /*
    2190             :          * Initialize the tuple slot, map, and row filter. These are only used
    2191             :          * when publishing inserts, updates, or deletes.
    2192             :          */
    2193         546 :         if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
    2194          76 :             entry->pubactions.pubdelete)
    2195             :         {
    2196             :             /* Initialize the tuple slot and map */
    2197         470 :             init_tuple_slot(data, relation, entry);
    2198             : 
    2199             :             /* Initialize the row filter */
    2200         470 :             pgoutput_row_filter_init(data, rel_publications, entry);
    2201             : 
    2202             :             /* Initialize the column list */
    2203         470 :             pgoutput_column_list_init(data, rel_publications, entry);
    2204             :         }
    2205             : 
    2206         544 :         list_free(pubids);
    2207         544 :         list_free(schemaPubids);
    2208         544 :         list_free(rel_publications);
    2209             : 
    2210         544 :         entry->replicate_valid = true;
    2211             :     }
    2212             : 
    2213      366322 :     return entry;
    2214             : }
    2215             : 
    2216             : /*
    2217             :  * Cleanup list of streamed transactions and update the schema_sent flag.
    2218             :  *
    2219             :  * When a streamed transaction commits or aborts, we need to remove the
    2220             :  * toplevel XID from the schema cache. If the transaction aborted, the
    2221             :  * subscriber will simply throw away the schema records we streamed, so
    2222             :  * we don't need to do anything else.
    2223             :  *
    2224             :  * If the transaction is committed, the subscriber will update the relation
    2225             :  * cache - so tweak the schema_sent flag accordingly.
    2226             :  */
    2227             : static void
    2228         144 : cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
    2229             : {
    2230             :     HASH_SEQ_STATUS hash_seq;
    2231             :     RelationSyncEntry *entry;
    2232             :     ListCell   *lc;
    2233             : 
    2234             :     Assert(RelationSyncCache != NULL);
    2235             : 
    2236         144 :     hash_seq_init(&hash_seq, RelationSyncCache);
    2237         294 :     while ((entry = hash_seq_search(&hash_seq)) != NULL)
    2238             :     {
    2239             :         /*
    2240             :          * We can set the schema_sent flag for an entry that has committed xid
    2241             :          * in the list as that ensures that the subscriber would have the
    2242             :          * corresponding schema and we don't need to send it unless there is
    2243             :          * any invalidation for that relation.
    2244             :          */
    2245         190 :         foreach(lc, entry->streamed_txns)
    2246             :         {
    2247         148 :             if (xid == lfirst_xid(lc))
    2248             :             {
    2249         108 :                 if (is_commit)
    2250          86 :                     entry->schema_sent = true;
    2251             : 
    2252         108 :                 entry->streamed_txns =
    2253         108 :                     foreach_delete_current(entry->streamed_txns, lc);
    2254         108 :                 break;
    2255             :             }
    2256             :         }
    2257             :     }
    2258         144 : }
    2259             : 
    2260             : /*
    2261             :  * Relcache invalidation callback
    2262             :  */
    2263             : static void
    2264        6588 : rel_sync_cache_relation_cb(Datum arg, Oid relid)
    2265             : {
    2266             :     RelationSyncEntry *entry;
    2267             : 
    2268             :     /*
    2269             :      * We can get here if the plugin was used in SQL interface as the
    2270             :      * RelationSyncCache is destroyed when the decoding finishes, but there is
    2271             :      * no way to unregister the relcache invalidation callback.
    2272             :      */
    2273        6588 :     if (RelationSyncCache == NULL)
    2274          18 :         return;
    2275             : 
    2276             :     /*
    2277             :      * Nobody keeps pointers to entries in this hash table around outside
    2278             :      * logical decoding callback calls - but invalidation events can come in
    2279             :      * *during* a callback if we do any syscache access in the callback.
    2280             :      * Because of that we must mark the cache entry as invalid but not damage
    2281             :      * any of its substructure here.  The next get_rel_sync_entry() call will
    2282             :      * rebuild it all.
    2283             :      */
    2284        6570 :     if (OidIsValid(relid))
    2285             :     {
    2286             :         /*
    2287             :          * Getting invalidations for relations that aren't in the table is
    2288             :          * entirely normal.  So we don't care if it's found or not.
    2289             :          */
    2290        6518 :         entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
    2291             :                                                   HASH_FIND, NULL);
    2292        6518 :         if (entry != NULL)
    2293         916 :             entry->replicate_valid = false;
    2294             :     }
    2295             :     else
    2296             :     {
    2297             :         /* Whole cache must be flushed. */
    2298             :         HASH_SEQ_STATUS status;
    2299             : 
    2300          52 :         hash_seq_init(&status, RelationSyncCache);
    2301         112 :         while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
    2302             :         {
    2303          60 :             entry->replicate_valid = false;
    2304             :         }
    2305             :     }
    2306             : }
    2307             : 
    2308             : /*
    2309             :  * Publication relation/schema map syscache invalidation callback
    2310             :  *
    2311             :  * Called for invalidations on pg_publication, pg_publication_rel,
    2312             :  * pg_publication_namespace, and pg_namespace.
    2313             :  */
    2314             : static void
    2315        1360 : rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
    2316             : {
    2317             :     HASH_SEQ_STATUS status;
    2318             :     RelationSyncEntry *entry;
    2319             : 
    2320             :     /*
    2321             :      * We can get here if the plugin was used in SQL interface as the
    2322             :      * RelationSyncCache is destroyed when the decoding finishes, but there is
    2323             :      * no way to unregister the invalidation callbacks.
    2324             :      */
    2325        1360 :     if (RelationSyncCache == NULL)
    2326          68 :         return;
    2327             : 
    2328             :     /*
    2329             :      * We have no easy way to identify which cache entries this invalidation
    2330             :      * event might have affected, so just mark them all invalid.
    2331             :      */
    2332        1292 :     hash_seq_init(&status, RelationSyncCache);
    2333        3758 :     while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
    2334             :     {
    2335        2466 :         entry->replicate_valid = false;
    2336             :     }
    2337             : }
    2338             : 
    2339             : /* Send Replication origin */
    2340             : static void
    2341        2024 : send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
    2342             :                  XLogRecPtr origin_lsn, bool send_origin)
    2343             : {
    2344        2024 :     if (send_origin)
    2345             :     {
    2346             :         char       *origin;
    2347             : 
    2348             :         /*----------
    2349             :          * XXX: which behaviour do we want here?
    2350             :          *
    2351             :          * Alternatives:
    2352             :          *  - don't send origin message if origin name not found
    2353             :          *    (that's what we do now)
    2354             :          *  - throw error - that will break replication, not good
    2355             :          *  - send some special "unknown" origin
    2356             :          *----------
    2357             :          */
    2358          16 :         if (replorigin_by_oid(origin_id, true, &origin))
    2359             :         {
    2360             :             /* Message boundary */
    2361          16 :             OutputPluginWrite(ctx, false);
    2362          16 :             OutputPluginPrepareWrite(ctx, true);
    2363             : 
    2364          16 :             logicalrep_write_origin(ctx->out, origin, origin_lsn);
    2365             :         }
    2366             :     }
    2367        2024 : }

Generated by: LCOV version 1.14