Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pgoutput.c
4 : * Logical Replication output plugin
5 : *
6 : * Copyright (c) 2012-2024, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/pgoutput/pgoutput.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "access/tupconvert.h"
16 : #include "catalog/partition.h"
17 : #include "catalog/pg_publication.h"
18 : #include "catalog/pg_publication_rel.h"
19 : #include "catalog/pg_subscription.h"
20 : #include "commands/defrem.h"
21 : #include "commands/subscriptioncmds.h"
22 : #include "executor/executor.h"
23 : #include "fmgr.h"
24 : #include "nodes/makefuncs.h"
25 : #include "parser/parse_relation.h"
26 : #include "replication/logical.h"
27 : #include "replication/logicalproto.h"
28 : #include "replication/origin.h"
29 : #include "replication/pgoutput.h"
30 : #include "utils/builtins.h"
31 : #include "utils/inval.h"
32 : #include "utils/lsyscache.h"
33 : #include "utils/memutils.h"
34 : #include "utils/rel.h"
35 : #include "utils/syscache.h"
36 : #include "utils/varlena.h"
37 :
38 898 : PG_MODULE_MAGIC;
39 :
40 : static void pgoutput_startup(LogicalDecodingContext *ctx,
41 : OutputPluginOptions *opt, bool is_init);
42 : static void pgoutput_shutdown(LogicalDecodingContext *ctx);
43 : static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
44 : ReorderBufferTXN *txn);
45 : static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
46 : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
47 : static void pgoutput_change(LogicalDecodingContext *ctx,
48 : ReorderBufferTXN *txn, Relation relation,
49 : ReorderBufferChange *change);
50 : static void pgoutput_truncate(LogicalDecodingContext *ctx,
51 : ReorderBufferTXN *txn, int nrelations, Relation relations[],
52 : ReorderBufferChange *change);
53 : static void pgoutput_message(LogicalDecodingContext *ctx,
54 : ReorderBufferTXN *txn, XLogRecPtr message_lsn,
55 : bool transactional, const char *prefix,
56 : Size sz, const char *message);
57 : static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
58 : RepOriginId origin_id);
59 : static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
60 : ReorderBufferTXN *txn);
61 : static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
62 : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
63 : static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
64 : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
65 : static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
66 : ReorderBufferTXN *txn,
67 : XLogRecPtr prepare_end_lsn,
68 : TimestampTz prepare_time);
69 : static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
70 : ReorderBufferTXN *txn);
71 : static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
72 : ReorderBufferTXN *txn);
73 : static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
74 : ReorderBufferTXN *txn,
75 : XLogRecPtr abort_lsn);
76 : static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
77 : ReorderBufferTXN *txn,
78 : XLogRecPtr commit_lsn);
79 : static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
80 : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
81 :
82 : static bool publications_valid;
83 :
84 : static List *LoadPublications(List *pubnames);
85 : static void publication_invalidation_cb(Datum arg, int cacheid,
86 : uint32 hashvalue);
87 : static void send_repl_origin(LogicalDecodingContext *ctx,
88 : RepOriginId origin_id, XLogRecPtr origin_lsn,
89 : bool send_origin);
90 :
91 : /*
92 : * Only 3 publication actions are used for row filtering ("insert", "update",
93 : * "delete"). See RelationSyncEntry.exprstate[].
94 : */
95 : enum RowFilterPubAction
96 : {
97 : PUBACTION_INSERT,
98 : PUBACTION_UPDATE,
99 : PUBACTION_DELETE,
100 : };
101 :
102 : #define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
103 :
104 : /*
105 : * Entry in the map used to remember which relation schemas we sent.
106 : *
107 : * The schema_sent flag determines if the current schema record for the
108 : * relation (and for its ancestor if publish_as_relid is set) was already
109 : * sent to the subscriber (in which case we don't need to send it again).
110 : *
111 : * The schema cache on the downstream is, however, updated only at commit
112 : * time, and with streamed transactions the commit order may differ from the
113 : * order in which the transactions are sent. Also, the (sub)transactions
114 : * might get aborted, so we need to send the schema for each (sub)transaction
115 : * so that we don't lose the schema information on abort. To handle this,
116 : * we maintain a list of the xids (streamed_txns) for which we have already
117 : * sent the schema.
118 : *
119 : * For partitions, 'pubactions' considers not only the table's own
120 : * publications, but also those of all of its ancestors.
121 : */
122 : typedef struct RelationSyncEntry
123 : {
124 : Oid relid; /* relation oid */
125 :
126 : bool replicate_valid; /* overall validity flag for entry */
127 :
128 : bool schema_sent;
129 :
130 : /*
131 : * This is set if the 'publish_generated_columns' parameter is true, and
132 : * the relation contains generated columns.
133 : */
134 : bool include_gencols;
135 : List *streamed_txns; /* streamed toplevel transactions with this
136 : * schema */
137 :
138 : /* are we publishing this rel? */
139 : PublicationActions pubactions;
140 :
141 : /*
142 : * ExprState array for the row filter. The expressions cannot always be
143 : * combined into a single one across publication actions, because updates
144 : * and deletes restrict the columns in the expression to be part of the
145 : * replica identity index whereas inserts do not have this restriction, so
146 : * there is one ExprState per publication action.
147 : */
148 : ExprState *exprstate[NUM_ROWFILTER_PUBACTIONS];
149 : EState *estate; /* executor state used for row filter */
150 : TupleTableSlot *new_slot; /* slot for storing new tuple */
151 : TupleTableSlot *old_slot; /* slot for storing old tuple */
152 :
153 : /*
154 : * OID of the relation to publish changes as. For a partition, this may
155 : * be set to one of its ancestors whose schema will be used when
156 : * replicating changes, if publish_via_partition_root is set for the
157 : * publication.
158 : */
159 : Oid publish_as_relid;
160 :
161 : /*
162 : * Map used when replicating using an ancestor's schema to convert tuples
163 : * from the partition's type to the ancestor's; NULL if publish_as_relid is
164 : * the same as 'relid' or if unnecessary because the partition and the
165 : * ancestor have identical TupleDescs.
166 : */
167 : AttrMap *attrmap;
168 :
169 : /*
170 : * Columns included in the publication, or NULL if all columns are
171 : * included implicitly. Note that the attnums in this bitmap are not
172 : * shifted by FirstLowInvalidHeapAttributeNumber.
173 : */
174 : Bitmapset *columns;
175 :
176 : /*
177 : * Private context to store additional data for this entry - state for the
178 : * row filter expressions, column list, etc.
179 : */
180 : MemoryContext entry_cxt;
181 : } RelationSyncEntry;
182 :
183 : /*
184 : * Maintain a per-transaction level variable to track whether the transaction
185 : * has sent BEGIN. BEGIN is only sent when the first change in a transaction
186 : * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
187 : * messages for empty transactions, which saves network bandwidth.
188 : *
189 : * This optimization is not used for prepared transactions because, if the
190 : * walsender restarts after the prepare of a transaction and before the commit
191 : * prepared of the same transaction, we won't be able to figure out whether we
192 : * skipped sending BEGIN/PREPARE of a transaction because it was empty. This is
193 : * because we would have lost the in-memory txndata information that was
194 : * present prior to the restart. This would result in sending a spurious
195 : * COMMIT PREPARED without a corresponding prepared transaction at the
196 : * downstream, which would lead to an error when it tries to process it.
197 : *
198 : * XXX We could achieve this optimization by changing the protocol to send
199 : * additional information so that the downstream can detect that the
200 : * corresponding prepare has not been sent. However, adding such a check for
201 : * every transaction in the downstream could be costly, so we might want to
202 : * do it optionally.
203 : *
204 : * We also don't have this optimization for streamed transactions because
205 : * they can contain prepared transactions.
206 : */
207 : typedef struct PGOutputTxnData
208 : {
209 : bool sent_begin_txn; /* flag indicating whether BEGIN has been sent */
210 : } PGOutputTxnData;
211 :
212 : /* Map used to remember which relation schemas we sent. */
213 : static HTAB *RelationSyncCache = NULL;
214 :
215 : static void init_rel_sync_cache(MemoryContext cachectx);
216 : static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
217 : static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
218 : Relation relation);
219 : static void send_relation_and_attrs(Relation relation, TransactionId xid,
220 : LogicalDecodingContext *ctx,
221 : RelationSyncEntry *relentry);
222 : static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
223 : static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
224 : uint32 hashvalue);
225 : static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
226 : TransactionId xid);
227 : static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
228 : TransactionId xid);
229 : static void init_tuple_slot(PGOutputData *data, Relation relation,
230 : RelationSyncEntry *entry);
231 :
232 : /* row filter routines */
233 : static EState *create_estate_for_relation(Relation rel);
234 : static void pgoutput_row_filter_init(PGOutputData *data,
235 : List *publications,
236 : RelationSyncEntry *entry);
237 : static bool pgoutput_row_filter_exec_expr(ExprState *state,
238 : ExprContext *econtext);
239 : static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
240 : TupleTableSlot **new_slot_ptr,
241 : RelationSyncEntry *entry,
242 : ReorderBufferChangeType *action);
243 :
244 : /* column list routines */
245 : static void pgoutput_column_list_init(PGOutputData *data,
246 : List *publications,
247 : RelationSyncEntry *entry);
248 :
249 : /*
250 : * Specify output plugin callbacks
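 : *
 : * As an illustration, the logical decoding framework invokes these callbacks
 : * once a replication slot is created or started with this plugin, e.g.
 : * (hypothetical slot name):
 : *
 : * SELECT pg_create_logical_replication_slot('mysub_slot', 'pgoutput');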
251 : */
252 : void
253 1234 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
254 : {
255 1234 : cb->startup_cb = pgoutput_startup;
256 1234 : cb->begin_cb = pgoutput_begin_txn;
257 1234 : cb->change_cb = pgoutput_change;
258 1234 : cb->truncate_cb = pgoutput_truncate;
259 1234 : cb->message_cb = pgoutput_message;
260 1234 : cb->commit_cb = pgoutput_commit_txn;
261 :
262 1234 : cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
263 1234 : cb->prepare_cb = pgoutput_prepare_txn;
264 1234 : cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
265 1234 : cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
266 1234 : cb->filter_by_origin_cb = pgoutput_origin_filter;
267 1234 : cb->shutdown_cb = pgoutput_shutdown;
268 :
269 : /* transaction streaming */
270 1234 : cb->stream_start_cb = pgoutput_stream_start;
271 1234 : cb->stream_stop_cb = pgoutput_stream_stop;
272 1234 : cb->stream_abort_cb = pgoutput_stream_abort;
273 1234 : cb->stream_commit_cb = pgoutput_stream_commit;
274 1234 : cb->stream_change_cb = pgoutput_change;
275 1234 : cb->stream_message_cb = pgoutput_message;
276 1234 : cb->stream_truncate_cb = pgoutput_truncate;
277 : /* transaction streaming - two-phase commit */
278 1234 : cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
279 1234 : }
280 :
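 : /*
 : * Parse the plugin options supplied by the subscriber.
 : *
 : * The options handled below are proto_version, publication_names, binary,
 : * messages, streaming, two_phase and origin; proto_version and
 : * publication_names are required. As a hypothetical illustration (slot,
 : * publication and version values are made up), a subscriber might start
 : * decoding with:
 : *
 : * START_REPLICATION SLOT "mysub_slot" LOGICAL 0/0
 : * (proto_version '4', publication_names '"mypub"', streaming 'parallel')
 : */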
281 : static void
282 666 : parse_output_parameters(List *options, PGOutputData *data)
283 : {
284 : ListCell *lc;
285 666 : bool protocol_version_given = false;
286 666 : bool publication_names_given = false;
287 666 : bool binary_option_given = false;
288 666 : bool messages_option_given = false;
289 666 : bool streaming_given = false;
290 666 : bool two_phase_option_given = false;
291 666 : bool origin_option_given = false;
292 :
293 666 : data->binary = false;
294 666 : data->streaming = LOGICALREP_STREAM_OFF;
295 666 : data->messages = false;
296 666 : data->two_phase = false;
297 :
298 3326 : foreach(lc, options)
299 : {
300 2660 : DefElem *defel = (DefElem *) lfirst(lc);
301 :
302 : Assert(defel->arg == NULL || IsA(defel->arg, String));
303 :
304 : /* Check each param, whether or not we recognize it */
305 2660 : if (strcmp(defel->defname, "proto_version") == 0)
306 : {
307 : unsigned long parsed;
308 : char *endptr;
309 :
310 666 : if (protocol_version_given)
311 0 : ereport(ERROR,
312 : (errcode(ERRCODE_SYNTAX_ERROR),
313 : errmsg("conflicting or redundant options")));
314 666 : protocol_version_given = true;
315 :
316 666 : errno = 0;
317 666 : parsed = strtoul(strVal(defel->arg), &endptr, 10);
318 666 : if (errno != 0 || *endptr != '\0')
319 0 : ereport(ERROR,
320 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
321 : errmsg("invalid proto_version")));
322 :
323 666 : if (parsed > PG_UINT32_MAX)
324 0 : ereport(ERROR,
325 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
326 : errmsg("proto_version \"%s\" out of range",
327 : strVal(defel->arg))));
328 :
329 666 : data->protocol_version = (uint32) parsed;
330 : }
331 1994 : else if (strcmp(defel->defname, "publication_names") == 0)
332 : {
333 666 : if (publication_names_given)
334 0 : ereport(ERROR,
335 : (errcode(ERRCODE_SYNTAX_ERROR),
336 : errmsg("conflicting or redundant options")));
337 666 : publication_names_given = true;
338 :
339 666 : if (!SplitIdentifierString(strVal(defel->arg), ',',
340 : &data->publication_names))
341 0 : ereport(ERROR,
342 : (errcode(ERRCODE_INVALID_NAME),
343 : errmsg("invalid publication_names syntax")));
344 : }
345 1328 : else if (strcmp(defel->defname, "binary") == 0)
346 : {
347 20 : if (binary_option_given)
348 0 : ereport(ERROR,
349 : (errcode(ERRCODE_SYNTAX_ERROR),
350 : errmsg("conflicting or redundant options")));
351 20 : binary_option_given = true;
352 :
353 20 : data->binary = defGetBoolean(defel);
354 : }
355 1308 : else if (strcmp(defel->defname, "messages") == 0)
356 : {
357 8 : if (messages_option_given)
358 0 : ereport(ERROR,
359 : (errcode(ERRCODE_SYNTAX_ERROR),
360 : errmsg("conflicting or redundant options")));
361 8 : messages_option_given = true;
362 :
363 8 : data->messages = defGetBoolean(defel);
364 : }
365 1300 : else if (strcmp(defel->defname, "streaming") == 0)
366 : {
367 634 : if (streaming_given)
368 0 : ereport(ERROR,
369 : (errcode(ERRCODE_SYNTAX_ERROR),
370 : errmsg("conflicting or redundant options")));
371 634 : streaming_given = true;
372 :
373 634 : data->streaming = defGetStreamingMode(defel);
374 : }
375 666 : else if (strcmp(defel->defname, "two_phase") == 0)
376 : {
377 14 : if (two_phase_option_given)
378 0 : ereport(ERROR,
379 : (errcode(ERRCODE_SYNTAX_ERROR),
380 : errmsg("conflicting or redundant options")));
381 14 : two_phase_option_given = true;
382 :
383 14 : data->two_phase = defGetBoolean(defel);
384 : }
385 652 : else if (strcmp(defel->defname, "origin") == 0)
386 : {
387 : char *origin;
388 :
389 652 : if (origin_option_given)
390 0 : ereport(ERROR,
391 : errcode(ERRCODE_SYNTAX_ERROR),
392 : errmsg("conflicting or redundant options"));
393 652 : origin_option_given = true;
394 :
395 652 : origin = defGetString(defel);
396 652 : if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
397 26 : data->publish_no_origin = true;
398 626 : else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
399 626 : data->publish_no_origin = false;
400 : else
401 0 : ereport(ERROR,
402 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
403 : errmsg("unrecognized origin value: \"%s\"", origin));
404 : }
405 : else
406 0 : elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
407 : }
408 :
409 : /* Check required options */
410 666 : if (!protocol_version_given)
411 0 : ereport(ERROR,
412 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
413 : errmsg("option \"%s\" missing", "proto_version"));
414 666 : if (!publication_names_given)
415 0 : ereport(ERROR,
416 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
417 : errmsg("option \"%s\" missing", "publication_names"));
418 666 : }
419 :
420 : /*
421 : * Initialize this plugin
422 : */
423 : static void
424 1234 : pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
425 : bool is_init)
426 : {
427 1234 : PGOutputData *data = palloc0(sizeof(PGOutputData));
428 : static bool publication_callback_registered = false;
429 :
430 : /* Create our memory context for private allocations. */
431 1234 : data->context = AllocSetContextCreate(ctx->context,
432 : "logical replication output context",
433 : ALLOCSET_DEFAULT_SIZES);
434 :
435 1234 : data->cachectx = AllocSetContextCreate(ctx->context,
436 : "logical replication cache context",
437 : ALLOCSET_DEFAULT_SIZES);
438 :
439 1234 : ctx->output_plugin_private = data;
440 :
441 : /* This plugin uses binary protocol. */
442 1234 : opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
443 :
444 : /*
445 : * This is replication start and not slot initialization.
446 : *
447 : * Parse and validate options passed by the client.
448 : */
449 1234 : if (!is_init)
450 : {
451 : /* Parse the params and ERROR if we see any we don't recognize */
452 666 : parse_output_parameters(ctx->output_plugin_options, data);
453 :
454 : /* Check if we support requested protocol */
455 666 : if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
456 0 : ereport(ERROR,
457 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
458 : errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
459 : data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
460 :
461 666 : if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
462 0 : ereport(ERROR,
463 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
464 : errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
465 : data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
466 :
467 : /*
468 : * Decide whether to enable streaming. It is disabled by default, in
469 : * which case we just update the flag in the decoding context. Otherwise
470 : * we only allow it with a sufficient version of the protocol, and when
471 : * the output plugin supports it.
472 : */
473 666 : if (data->streaming == LOGICALREP_STREAM_OFF)
474 32 : ctx->streaming = false;
475 634 : else if (data->streaming == LOGICALREP_STREAM_ON &&
476 54 : data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
477 0 : ereport(ERROR,
478 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
479 : errmsg("requested proto_version=%d does not support streaming, need %d or higher",
480 : data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
481 634 : else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
482 580 : data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
483 0 : ereport(ERROR,
484 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
485 : errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
486 : data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
487 634 : else if (!ctx->streaming)
488 0 : ereport(ERROR,
489 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
490 : errmsg("streaming requested, but not supported by output plugin")));
491 :
492 : /*
493 : * Here, we just check whether the two-phase option was passed in and
494 : * decide whether to enable it at a later point in time. It remains
495 : * enabled if the previous start-up has done so. But we only allow the
496 : * option to be passed in with a sufficient version of the protocol,
497 : * and when the output plugin supports it.
498 : */
499 666 : if (!data->two_phase)
500 652 : ctx->twophase_opt_given = false;
501 14 : else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
502 0 : ereport(ERROR,
503 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
504 : errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
505 : data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
506 14 : else if (!ctx->twophase)
507 0 : ereport(ERROR,
508 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
509 : errmsg("two-phase commit requested, but not supported by output plugin")));
510 : else
511 14 : ctx->twophase_opt_given = true;
512 :
513 : /* Init publication state. */
514 666 : data->publications = NIL;
515 666 : publications_valid = false;
516 :
517 : /*
518 : * Register callback for pg_publication if we didn't already do that
519 : * during some previous call in this process.
520 : */
521 666 : if (!publication_callback_registered)
522 : {
523 664 : CacheRegisterSyscacheCallback(PUBLICATIONOID,
524 : publication_invalidation_cb,
525 : (Datum) 0);
526 664 : publication_callback_registered = true;
527 : }
528 :
529 : /* Initialize relation schema cache. */
530 666 : init_rel_sync_cache(CacheMemoryContext);
531 : }
532 : else
533 : {
534 : /*
535 : * Disable streaming and prepared transactions during slot
536 : * initialization.
537 : */
538 568 : ctx->streaming = false;
539 568 : ctx->twophase = false;
540 : }
541 1234 : }
542 :
543 : /*
544 : * BEGIN callback.
545 : *
546 : * Don't send the BEGIN message here; instead, postpone it until the first
547 : * change. In logical replication, a common scenario is to replicate a set of
548 : * tables (instead of all tables), and transactions whose changes were only on
549 : * tables that are not published produce empty transactions. These empty
550 : * transactions would send BEGIN and COMMIT messages to subscribers, using
551 : * bandwidth on something of little or no use for logical replication.
552 : */
553 : static void
554 1344 : pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
555 : {
556 1344 : PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
557 : sizeof(PGOutputTxnData));
558 :
559 1344 : txn->output_plugin_private = txndata;
560 1344 : }
561 :
562 : /*
563 : * Send BEGIN.
564 : *
565 : * This is called while processing the first change of the transaction.
566 : */
567 : static void
568 770 : pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
569 : {
570 770 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
571 770 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
572 :
573 : Assert(txndata);
574 : Assert(!txndata->sent_begin_txn);
575 :
576 770 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
577 770 : logicalrep_write_begin(ctx->out, txn);
578 770 : txndata->sent_begin_txn = true;
579 :
580 770 : send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
581 : send_replication_origin);
582 :
583 770 : OutputPluginWrite(ctx, true);
584 770 : }
585 :
586 : /*
587 : * COMMIT callback
588 : */
589 : static void
590 1338 : pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
591 : XLogRecPtr commit_lsn)
592 : {
593 1338 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
594 : bool sent_begin_txn;
595 :
596 : Assert(txndata);
597 :
598 : /*
599 : * We don't need to send the commit message unless some relevant change
600 : * from this transaction has been sent to the downstream.
601 : */
602 1338 : sent_begin_txn = txndata->sent_begin_txn;
603 1338 : OutputPluginUpdateProgress(ctx, !sent_begin_txn);
604 1338 : pfree(txndata);
605 1338 : txn->output_plugin_private = NULL;
606 :
607 1338 : if (!sent_begin_txn)
608 : {
609 570 : elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
610 570 : return;
611 : }
612 :
613 768 : OutputPluginPrepareWrite(ctx, true);
614 768 : logicalrep_write_commit(ctx->out, txn, commit_lsn);
615 768 : OutputPluginWrite(ctx, true);
616 : }
617 :
618 : /*
619 : * BEGIN PREPARE callback
620 : */
621 : static void
622 40 : pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
623 : {
624 40 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
625 :
626 40 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
627 40 : logicalrep_write_begin_prepare(ctx->out, txn);
628 :
629 40 : send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
630 : send_replication_origin);
631 :
632 40 : OutputPluginWrite(ctx, true);
633 40 : }
634 :
635 : /*
636 : * PREPARE callback
637 : */
638 : static void
639 40 : pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
640 : XLogRecPtr prepare_lsn)
641 : {
642 40 : OutputPluginUpdateProgress(ctx, false);
643 :
644 40 : OutputPluginPrepareWrite(ctx, true);
645 40 : logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
646 40 : OutputPluginWrite(ctx, true);
647 40 : }
648 :
649 : /*
650 : * COMMIT PREPARED callback
651 : */
652 : static void
653 48 : pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
654 : XLogRecPtr commit_lsn)
655 : {
656 48 : OutputPluginUpdateProgress(ctx, false);
657 :
658 48 : OutputPluginPrepareWrite(ctx, true);
659 48 : logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
660 48 : OutputPluginWrite(ctx, true);
661 48 : }
662 :
663 : /*
664 : * ROLLBACK PREPARED callback
665 : */
666 : static void
667 18 : pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
668 : ReorderBufferTXN *txn,
669 : XLogRecPtr prepare_end_lsn,
670 : TimestampTz prepare_time)
671 : {
672 18 : OutputPluginUpdateProgress(ctx, false);
673 :
674 18 : OutputPluginPrepareWrite(ctx, true);
675 18 : logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
676 : prepare_time);
677 18 : OutputPluginWrite(ctx, true);
678 18 : }
679 :
680 : /*
681 : * Write the current schema of the relation and its ancestor (if any) if not
682 : * done yet.
683 : */
684 : static void
685 364356 : maybe_send_schema(LogicalDecodingContext *ctx,
686 : ReorderBufferChange *change,
687 : Relation relation, RelationSyncEntry *relentry)
688 : {
689 364356 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
690 : bool schema_sent;
691 364356 : TransactionId xid = InvalidTransactionId;
692 364356 : TransactionId topxid = InvalidTransactionId;
693 :
694 : /*
695 : * Remember XID of the (sub)transaction for the change. We don't care if
696 : * it's a top-level transaction or not (we have already sent that XID at
697 : * the start of the current streaming block).
698 : *
699 : * If we're not in a streaming block, just use InvalidTransactionId and
700 : * the write methods will not include it.
701 : */
702 364356 : if (data->in_streaming)
703 351882 : xid = change->txn->xid;
704 :
705 364356 : if (rbtxn_is_subtxn(change->txn))
706 20338 : topxid = rbtxn_get_toptxn(change->txn)->xid;
707 : else
708 344018 : topxid = xid;
709 :
710 : /*
711 : * Do we need to send the schema? We do track streamed transactions
712 : * separately, because those may be applied later (and the regular
713 : * transactions won't see their effects until then) and in an order that
714 : * we don't know at this point.
715 : *
716 : * XXX There is scope for optimization here. Currently, we always send
717 : * the schema the first time in a streaming transaction, but we can probably
718 : * avoid that by checking the 'relentry->schema_sent' flag. However, before
719 : * doing that we need to study its impact on the case where we have a mix
720 : * of streaming and non-streaming transactions.
721 : */
722 364356 : if (data->in_streaming)
723 351882 : schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
724 : else
725 12474 : schema_sent = relentry->schema_sent;
726 :
727 : /* Nothing to do if we already sent the schema. */
728 364356 : if (schema_sent)
729 363762 : return;
730 :
731 : /*
732 : * Send the schema. If the changes will be published using an ancestor's
733 : * schema, not the relation's own, send that ancestor's schema before
734 : * sending the relation's own (XXX - maybe sending only the former suffices?).
735 : */
736 594 : if (relentry->publish_as_relid != RelationGetRelid(relation))
737 : {
738 64 : Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
739 :
740 64 : send_relation_and_attrs(ancestor, xid, ctx, relentry);
741 62 : RelationClose(ancestor);
742 : }
743 :
744 592 : send_relation_and_attrs(relation, xid, ctx, relentry);
745 :
746 592 : if (data->in_streaming)
747 142 : set_schema_sent_in_streamed_txn(relentry, topxid);
748 : else
749 450 : relentry->schema_sent = true;
750 : }
751 :
752 : /*
753 : * Send the relation's schema (and type info for user-defined column types).
754 : */
755 : static void
756 656 : send_relation_and_attrs(Relation relation, TransactionId xid,
757 : LogicalDecodingContext *ctx,
758 : RelationSyncEntry *relentry)
759 : {
760 656 : TupleDesc desc = RelationGetDescr(relation);
761 656 : Bitmapset *columns = relentry->columns;
762 656 : bool include_gencols = relentry->include_gencols;
763 : int i;
764 :
765 : /*
766 : * Write out type info if needed. We do that only for user-created types.
767 : * We use FirstGenbkiObjectId as the cutoff, so that we only consider
768 : * objects with hand-assigned OIDs to be "built in", not for instance any
769 : * function or type defined in the information_schema. This is important
770 : * because only hand-assigned OIDs can be expected to remain stable across
771 : * major versions.
772 : */
773 1980 : for (i = 0; i < desc->natts; i++)
774 : {
775 1324 : Form_pg_attribute att = TupleDescAttr(desc, i);
776 :
777 1324 : if (!logicalrep_should_publish_column(att, columns, include_gencols))
778 138 : continue;
779 :
780 1186 : if (att->atttypid < FirstGenbkiObjectId)
781 1150 : continue;
782 :
783 36 : OutputPluginPrepareWrite(ctx, false);
784 36 : logicalrep_write_typ(ctx->out, xid, att->atttypid);
785 36 : OutputPluginWrite(ctx, false);
786 : }
787 :
788 656 : OutputPluginPrepareWrite(ctx, false);
789 656 : logicalrep_write_rel(ctx->out, xid, relation, columns, include_gencols);
790 656 : OutputPluginWrite(ctx, false);
791 654 : }
792 :
793 : /*
794 : * Executor state preparation for evaluation of row filter expressions for the
795 : * specified relation.
796 : */
797 : static EState *
798 32 : create_estate_for_relation(Relation rel)
799 : {
800 : EState *estate;
801 : RangeTblEntry *rte;
802 32 : List *perminfos = NIL;
803 :
804 32 : estate = CreateExecutorState();
805 :
806 32 : rte = makeNode(RangeTblEntry);
807 32 : rte->rtekind = RTE_RELATION;
808 32 : rte->relid = RelationGetRelid(rel);
809 32 : rte->relkind = rel->rd_rel->relkind;
810 32 : rte->rellockmode = AccessShareLock;
811 :
812 32 : addRTEPermissionInfo(&perminfos, rte);
813 :
814 32 : ExecInitRangeTable(estate, list_make1(rte), perminfos);
815 :
816 32 : estate->es_output_cid = GetCurrentCommandId(false);
817 :
818 32 : return estate;
819 : }
820 :
821 : /*
822 : * Evaluates row filter.
823 : *
824 : * If the row filter evaluates to NULL, it is taken as false i.e. the change
825 : * isn't replicated.
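 : *
 : * For example, a hypothetical row filter WHERE (a > 10) evaluated on a row
 : * where a IS NULL yields NULL, so the change is not replicated.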
826 : */
827 : static bool
828 72 : pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
829 : {
830 : Datum ret;
831 : bool isnull;
832 :
833 : Assert(state != NULL);
834 :
835 72 : ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
836 :
837 72 : elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
838 : isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
839 : isnull ? "true" : "false");
840 :
841 72 : if (isnull)
842 2 : return false;
843 :
844 70 : return DatumGetBool(ret);
845 : }
846 :
847 : /*
848 : * Make sure the per-entry memory context exists.
849 : */
850 : static void
851 550 : pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
852 : {
853 : Relation relation;
854 :
855 : /* The context may already exist, in which case bail out. */
856 550 : if (entry->entry_cxt)
857 32 : return;
858 :
859 518 : relation = RelationIdGetRelation(entry->publish_as_relid);
860 :
861 518 : entry->entry_cxt = AllocSetContextCreate(data->cachectx,
862 : "entry private context",
863 : ALLOCSET_SMALL_SIZES);
864 :
865 518 : MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
866 : RelationGetRelationName(relation));
867 : }
868 :
869 : /*
870 : * Initialize the row filter.
871 : */
872 : static void
873 518 : pgoutput_row_filter_init(PGOutputData *data, List *publications,
874 : RelationSyncEntry *entry)
875 : {
876 : ListCell *lc;
877 518 : List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */
878 518 : bool no_filter[] = {false, false, false}; /* One per pubaction */
879 : MemoryContext oldctx;
880 : int idx;
881 518 : bool has_filter = true;
882 518 : Oid schemaid = get_rel_namespace(entry->publish_as_relid);
883 :
884 : /*
885 : * Find if there are any row filters for this relation. If there are, then
886 : * prepare the necessary ExprState and cache it in entry->exprstate. To
887 : * build an expression state, we need to ensure the following:
888 : *
889 : * All the given publication-table mappings must be checked.
890 : *
891 : * Multiple publications might have multiple row filters for this
892 : * relation. Since row filter usage depends on the DML operation, there
893 : * are multiple lists (one for each operation) to which row filters will
894 : * be appended.
895 : *
896 : * FOR ALL TABLES and FOR TABLES IN SCHEMA imply "don't use row filter
897 : * expression", so they take precedence.
898 : */
899 558 : foreach(lc, publications)
900 : {
901 526 : Publication *pub = lfirst(lc);
902 526 : HeapTuple rftuple = NULL;
903 526 : Datum rfdatum = 0;
904 526 : bool pub_no_filter = true;
905 :
906 : /*
907 : * If the publication is FOR ALL TABLES, or the publication includes a
908 : * FOR TABLES IN SCHEMA where the table belongs to the referred
909 : * schema, then it is treated the same as if there are no row filters
910 : * (even if other publications have a row filter).
911 : */
912 526 : if (!pub->alltables &&
913 390 : !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
914 : ObjectIdGetDatum(schemaid),
915 : ObjectIdGetDatum(pub->oid)))
916 : {
917 : /*
918 : * Check for the presence of a row filter in this publication.
919 : */
920 378 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
921 : ObjectIdGetDatum(entry->publish_as_relid),
922 : ObjectIdGetDatum(pub->oid));
923 :
924 378 : if (HeapTupleIsValid(rftuple))
925 : {
926 : /* Null indicates no filter. */
927 354 : rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
928 : Anum_pg_publication_rel_prqual,
929 : &pub_no_filter);
930 : }
931 : }
932 :
933 526 : if (pub_no_filter)
934 : {
935 500 : if (rftuple)
936 328 : ReleaseSysCache(rftuple);
937 :
938 500 : no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
939 500 : no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
940 500 : no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
941 :
942 : /*
943 : * Quick exit if all the DML actions are published via this
944 : * publication.
945 : */
946 500 : if (no_filter[PUBACTION_INSERT] &&
947 500 : no_filter[PUBACTION_UPDATE] &&
948 486 : no_filter[PUBACTION_DELETE])
949 : {
950 486 : has_filter = false;
951 486 : break;
952 : }
953 :
954 : /* No additional work for this publication. Next one. */
955 14 : continue;
956 : }
957 :
958 : /* Form the per pubaction row filter lists. */
959 26 : if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
960 26 : rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
961 26 : TextDatumGetCString(rfdatum));
962 26 : if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
963 26 : rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
964 26 : TextDatumGetCString(rfdatum));
965 26 : if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
966 26 : rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
967 26 : TextDatumGetCString(rfdatum));
968 :
969 26 : ReleaseSysCache(rftuple);
970 : } /* loop all subscribed publications */
971 :
972 : /* Clean the row filter */
973 2072 : for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
974 : {
975 1554 : if (no_filter[idx])
976 : {
977 1476 : list_free_deep(rfnodes[idx]);
978 1476 : rfnodes[idx] = NIL;
979 : }
980 : }
981 :
982 518 : if (has_filter)
983 : {
984 32 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
985 :
986 32 : pgoutput_ensure_entry_cxt(data, entry);
987 :
988 : /*
989 : * Now all the filters for all pubactions are known. Combine them when
990 : * their pubactions are the same.
991 : */
992 32 : oldctx = MemoryContextSwitchTo(entry->entry_cxt);
993 32 : entry->estate = create_estate_for_relation(relation);
994 128 : for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
995 : {
996 96 : List *filters = NIL;
997 : Expr *rfnode;
998 :
999 96 : if (rfnodes[idx] == NIL)
1000 42 : continue;
1001 :
1002 114 : foreach(lc, rfnodes[idx])
1003 60 : filters = lappend(filters, stringToNode((char *) lfirst(lc)));
1004 :
1005 : /* combine the row filter and cache the ExprState */
1006 54 : rfnode = make_orclause(filters);
1007 54 : entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
1008 : } /* for each pubaction */
1009 32 : MemoryContextSwitchTo(oldctx);
1010 :
1011 32 : RelationClose(relation);
1012 : }
1013 518 : }
1014 :
1015 : /*
1016 : * If the table contains a generated column, check for any conflicting
1017 : * values of 'publish_generated_columns' parameter in the publications.
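 : *
 : * For example (hypothetical publication names): if pub_a was created with
 : * publish_generated_columns = true and pub_b with false, and both publish
 : * this table without a column list, the error below is raised.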
1018 : */
1019 : static void
1020 518 : check_and_init_gencol(PGOutputData *data, List *publications,
1021 : RelationSyncEntry *entry)
1022 : {
1023 518 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1024 518 : TupleDesc desc = RelationGetDescr(relation);
1025 518 : bool gencolpresent = false;
1026 518 : bool first = true;
1027 :
1028 : /* Check if there is any generated column present. */
1029 1534 : for (int i = 0; i < desc->natts; i++)
1030 : {
1031 1030 : Form_pg_attribute att = TupleDescAttr(desc, i);
1032 :
1033 1030 : if (att->attgenerated)
1034 : {
1035 14 : gencolpresent = true;
1036 14 : break;
1037 : }
1038 : }
1039 :
1040 : /* There are no generated columns to be published. */
1041 518 : if (!gencolpresent)
1042 : {
1043 504 : entry->include_gencols = false;
1044 504 : return;
1045 : }
1046 :
1047 : /*
1048 : * There may be a conflicting value for 'publish_generated_columns'
1049 : * parameter in the publications.
1050 : */
1051 44 : foreach_ptr(Publication, pub, publications)
1052 : {
1053 : /*
1054 : * The column list takes precedence over the
1055 : * 'publish_generated_columns' parameter. Column lists are checked later;
1056 : * see pgoutput_column_list_init.
1057 : */
1058 16 : if (check_and_fetch_column_list(pub, entry->publish_as_relid, NULL, NULL))
1059 6 : continue;
1060 :
1061 10 : if (first)
1062 : {
1063 10 : entry->include_gencols = pub->pubgencols;
1064 10 : first = false;
1065 : }
1066 0 : else if (entry->include_gencols != pub->pubgencols)
1067 0 : ereport(ERROR,
1068 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1069 : errmsg("cannot use different values of publish_generated_columns for table \"%s.%s\" in different publications",
1070 : get_namespace_name(RelationGetNamespace(relation)),
1071 : RelationGetRelationName(relation)));
1072 : }
1073 : }
1074 :
1075 : /*
1076 : * Initialize the column list.
1077 : */
1078 : static void
1079 518 : pgoutput_column_list_init(PGOutputData *data, List *publications,
1080 : RelationSyncEntry *entry)
1081 : {
1082 : ListCell *lc;
1083 518 : bool first = true;
1084 518 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1085 518 : bool found_pub_collist = false;
1086 518 : Bitmapset *relcols = NULL;
1087 :
1088 518 : pgoutput_ensure_entry_cxt(data, entry);
1089 :
1090 : /*
1091 : * Find if there are any column lists for this relation. If there are,
1092 : * build a bitmap using the column lists.
1093 : *
1094 : * Multiple publications might have multiple column lists for this
1095 : * relation.
1096 : *
1097 : * Note that we don't support the case where the column list is different
1098 : * for the same table when combining publications. See comments atop
1099 : * fetch_table_list. But one can later change the publication so we still
1100 : * need to check all the given publication-table mappings and report an
1101 : * error if any publications have a different column list.
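 : *
 : * For example (hypothetical names): if pub_a publishes t (a, b) and pub_b
 : * publishes t (a, c), combining both in one subscription raises the
 : * "cannot use different column lists" error below.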
1102 : */
1103 1056 : foreach(lc, publications)
1104 : {
1105 540 : Publication *pub = lfirst(lc);
1106 540 : Bitmapset *cols = NULL;
1107 :
1108 : /* Retrieve the bitmap of columns for a column list publication. */
1109 540 : found_pub_collist |= check_and_fetch_column_list(pub,
1110 : entry->publish_as_relid,
1111 : entry->entry_cxt, &cols);
1112 :
1113 : /*
1114 : * For non-column list publications — e.g. TABLE (without a column
1115 : * list), ALL TABLES, or ALL TABLES IN SCHEMA, we consider all columns
1116 : * of the table (including generated columns when
1117 : * 'publish_generated_columns' parameter is true).
1118 : */
1119 540 : if (!cols)
1120 : {
1121 : /*
1122 : * Cache the table columns for the first publication with no
1123 : * specified column list, to detect a publication with a different
1124 : * column list.
1125 : */
1126 462 : if (!relcols && (list_length(publications) > 1))
1127 : {
1128 18 : MemoryContext oldcxt = MemoryContextSwitchTo(entry->entry_cxt);
1129 :
1130 18 : relcols = pub_form_cols_map(relation, entry->include_gencols);
1131 18 : MemoryContextSwitchTo(oldcxt);
1132 : }
1133 :
1134 462 : cols = relcols;
1135 : }
1136 :
1137 540 : if (first)
1138 : {
1139 518 : entry->columns = cols;
1140 518 : first = false;
1141 : }
1142 22 : else if (!bms_equal(entry->columns, cols))
1143 2 : ereport(ERROR,
1144 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1145 : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
1146 : get_namespace_name(RelationGetNamespace(relation)),
1147 : RelationGetRelationName(relation)));
1148 : } /* loop all subscribed publications */
1149 :
1150 : /*
1151 : * If no column list publications exist, columns to be published will be
1152 : * computed later according to the 'publish_generated_columns' parameter.
1153 : */
1154 516 : if (!found_pub_collist)
1155 444 : entry->columns = NULL;
1156 :
1157 516 : RelationClose(relation);
1158 516 : }
1159 :
1160 : /*
1161 : * Initialize the slots for storing the new and old tuples, and build the map
1162 : * that will be used to convert the relation's tuples into the ancestor's format.
1163 : */
1164 : static void
1165 518 : init_tuple_slot(PGOutputData *data, Relation relation,
1166 : RelationSyncEntry *entry)
1167 : {
1168 : MemoryContext oldctx;
1169 : TupleDesc oldtupdesc;
1170 : TupleDesc newtupdesc;
1171 :
1172 518 : oldctx = MemoryContextSwitchTo(data->cachectx);
1173 :
1174 : /*
1175 : * Create tuple table slots. Create a copy of the TupleDesc as it needs to
1176 : * live as long as the cache remains.
1177 : */
1178 518 : oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1179 518 : newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1180 :
1181 518 : entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
1182 518 : entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
1183 :
1184 518 : MemoryContextSwitchTo(oldctx);
1185 :
1186 : /*
1187 : * Cache the map that will be used to convert the relation's tuples into
1188 : * the ancestor's format, if needed.
1189 : */
1190 518 : if (entry->publish_as_relid != RelationGetRelid(relation))
1191 : {
1192 72 : Relation ancestor = RelationIdGetRelation(entry->publish_as_relid);
1193 72 : TupleDesc indesc = RelationGetDescr(relation);
1194 72 : TupleDesc outdesc = RelationGetDescr(ancestor);
1195 :
1196 : /* Map must live as long as the session does. */
1197 72 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1198 :
1199 72 : entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
1200 :
1201 72 : MemoryContextSwitchTo(oldctx);
1202 72 : RelationClose(ancestor);
1203 : }
1204 518 : }
1205 :
1206 : /*
1207 : * Change is checked against the row filter if any.
1208 : *
1209 : * Returns true if the change is to be replicated, else false.
1210 : *
1211 : * For inserts, evaluate the row filter for new tuple.
1212 : * For deletes, evaluate the row filter for old tuple.
1213 : * For updates, evaluate the row filter for old and new tuple.
1214 : *
1215 : * For updates, if both evaluations are true, we send the UPDATE; if both
1216 : * evaluations are false, we don't replicate the UPDATE. If only one of the
1217 : * tuples matches the row filter expression, we transform the UPDATE into a
1218 : * DELETE or an INSERT to avoid data inconsistency, based on the
1219 : * following rules:
1220 : *
1221 : * Case 1: old-row (no match) new-row (no match) -> (drop change)
1222 : * Case 2: old-row (no match) new row (match) -> INSERT
1223 : * Case 3: old-row (match) new-row (no match) -> DELETE
1224 : * Case 4: old-row (match) new row (match) -> UPDATE
1225 : *
1226 : * The new action is updated in the action parameter.
1227 : *
1228 : * The new slot could be updated when transforming the UPDATE into INSERT,
1229 : * because the original new tuple might not have column values from the replica
1230 : * identity.
1231 : *
1232 : * Examples:
1233 : * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
1234 : * Since the old tuple satisfies, the initial table synchronization copied this
1235 : * row (or another method was used to guarantee that there is data
1236 : * consistency). However, after the UPDATE the new tuple doesn't satisfy the
1237 : * row filter, so from a data consistency perspective, that row should be
1238 : * removed on the subscriber. The UPDATE should be transformed into a DELETE
1239 : * statement and be sent to the subscriber. Keeping this row on the subscriber
1240 : * is undesirable because it doesn't reflect what was defined in the row filter
1241 : * expression on the publisher. This row on the subscriber would likely not be
1242 : * modified by replication again. If someone inserted a new row with the same
1243 : * old identifier, replication could stop due to a constraint violation.
1244 : *
1245 : * Let's say the old tuple doesn't match the row filter but the new tuple does.
1246 : * Since the old tuple doesn't satisfy, the initial table synchronization
1247 : * probably didn't copy this row. However, after the UPDATE the new tuple does
1248 : * satisfy the row filter, so from a data consistency perspective, that row
1249 : * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
1250 : * statements have no effect (it matches no row -- see
1251 : * apply_handle_update_internal()). So, the UPDATE should be transformed into
1252 : * an INSERT statement and be sent to the subscriber. However, this might surprise
1253 : * someone who expects the data set to satisfy the row filter expression on the
1254 : * provider.
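 : *
 : * As a hypothetical illustration: with a publication defined as
 : *
 : * CREATE PUBLICATION p FOR TABLE t WHERE (a > 10);
 : *
 : * an UPDATE changing a from 5 to 15 is sent as an INSERT (Case 2), an
 : * UPDATE changing a from 15 to 5 is sent as a DELETE (Case 3), and an
 : * UPDATE changing a from 5 to 7 is not sent at all (Case 1).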
1255 : */
1256 : static bool
1257 364346 : pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
1258 : TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
1259 : ReorderBufferChangeType *action)
1260 : {
1261 : TupleDesc desc;
1262 : int i;
1263 : bool old_matched,
1264 : new_matched,
1265 : result;
1266 : TupleTableSlot *tmp_new_slot;
1267 364346 : TupleTableSlot *new_slot = *new_slot_ptr;
1268 : ExprContext *ecxt;
1269 : ExprState *filter_exprstate;
1270 :
1271 : /*
1272 : * We need this map to avoid relying on ReorderBufferChangeType enums
1273 : * having specific values.
1274 : */
1275 : static const int map_changetype_pubaction[] = {
1276 : [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
1277 : [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
1278 : [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
1279 : };
1280 :
1281 : Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
1282 : *action == REORDER_BUFFER_CHANGE_UPDATE ||
1283 : *action == REORDER_BUFFER_CHANGE_DELETE);
1284 :
1285 : Assert(new_slot || old_slot);
1286 :
1287 : /* Get the corresponding row filter */
1288 364346 : filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
1289 :
1290 : /* Bail out if there is no row filter */
1291 364346 : if (!filter_exprstate)
1292 364282 : return true;
1293 :
1294 64 : elog(DEBUG3, "table \"%s.%s\" has row filter",
1295 : get_namespace_name(RelationGetNamespace(relation)),
1296 : RelationGetRelationName(relation));
1297 :
1298 64 : ResetPerTupleExprContext(entry->estate);
1299 :
1300 64 : ecxt = GetPerTupleExprContext(entry->estate);
1301 :
1302 : /*
1303 : * For the following cases, where there is only one tuple, we can
1304 : * evaluate the row filter for that tuple and return.
1305 : *
1306 : * For inserts, we only have the new tuple.
1307 : *
1308 : * For updates, we can have only a new tuple when none of the replica
1309 : * identity columns changed and none of those columns have external data
1310 : * but we still need to evaluate the row filter for the new tuple as the
1311 : * existing values of those columns might not match the filter. Also,
1312 : * users can use constant expressions in the row filter, so we anyway need
1313 : * to evaluate it for the new tuple.
1314 : *
1315 : * For deletes, we only have the old tuple.
1316 : */
1317 64 : if (!new_slot || !old_slot)
1318 : {
1319 56 : ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
1320 56 : result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1321 :
1322 56 : return result;
1323 : }
1324 :
1325 : /*
1326 : * Only updates can have both old and new tuples; both need to be checked
1327 : * against the row filter.
1328 : */
1329 : Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
1330 :
1331 8 : slot_getallattrs(new_slot);
1332 8 : slot_getallattrs(old_slot);
1333 :
1334 8 : tmp_new_slot = NULL;
1335 8 : desc = RelationGetDescr(relation);
1336 :
1337 : /*
1338 : * The new tuple might not have all the replica identity columns, in which
1339 : * case they need to be copied over from the old tuple.
1340 : */
1341 24 : for (i = 0; i < desc->natts; i++)
1342 : {
1343 16 : Form_pg_attribute att = TupleDescAttr(desc, i);
1344 :
1345 : /*
1346 : * if the column in the new tuple or old tuple is null, nothing to do
1347 : */
1348 16 : if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
1349 2 : continue;
1350 :
1351 : /*
1352 : * Unchanged toasted replica identity columns are only logged in the
1353 : * old tuple. Copy them over to the new tuple. The changed (or
1354 : * WAL-logged) toast values are always assembled in memory and set as
1355 : * VARTAG_INDIRECT. See ReorderBufferToastReplace.
1356 : */
1357 14 : if (att->attlen == -1 &&
1358 8 : VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
1359 2 : !VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
1360 : {
1361 2 : if (!tmp_new_slot)
1362 : {
1363 2 : tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
1364 2 : ExecClearTuple(tmp_new_slot);
1365 :
1366 2 : memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
1367 2 : desc->natts * sizeof(Datum));
1368 2 : memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
1369 2 : desc->natts * sizeof(bool));
1370 : }
1371 :
1372 2 : tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
1373 2 : tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
1374 : }
1375 : }
1376 :
1377 8 : ecxt->ecxt_scantuple = old_slot;
1378 8 : old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1379 :
1380 8 : if (tmp_new_slot)
1381 : {
1382 2 : ExecStoreVirtualTuple(tmp_new_slot);
1383 2 : ecxt->ecxt_scantuple = tmp_new_slot;
1384 : }
1385 : else
1386 6 : ecxt->ecxt_scantuple = new_slot;
1387 :
1388 8 : new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1389 :
1390 : /*
1391 : * Case 1: if both tuples don't match the row filter, bail out. Send
1392 : * nothing.
1393 : */
1394 8 : if (!old_matched && !new_matched)
1395 0 : return false;
1396 :
1397 : /*
1398 : * Case 2: if the old tuple doesn't satisfy the row filter but the new
1399 : * tuple does, transform the UPDATE into INSERT.
1400 : *
1401 : * Use the newly transformed tuple that must contain the column values for
1402 : * all the replica identity columns. This is required to ensure that,
1403 : * while inserting the tuple in the downstream node, we have all the
1404 : * required column values.
1405 : */
1406 8 : if (!old_matched && new_matched)
1407 : {
1408 4 : *action = REORDER_BUFFER_CHANGE_INSERT;
1409 :
1410 4 : if (tmp_new_slot)
1411 2 : *new_slot_ptr = tmp_new_slot;
1412 : }
1413 :
1414 : /*
1415 : * Case 3: if the old tuple satisfies the row filter but the new tuple
1416 : * doesn't, transform the UPDATE into DELETE.
1417 : *
1418 : * This transformation does not require another tuple. The old tuple will
1419 : * be used for the DELETE.
1420 : */
1421 4 : else if (old_matched && !new_matched)
1422 2 : *action = REORDER_BUFFER_CHANGE_DELETE;
1423 :
1424 : /*
1425 : * Case 4: if both tuples match the row filter, transformation isn't
1426 : * required. (*action is default UPDATE).
1427 : */
1428 :
1429 8 : return true;
1430 : }
1431 :
1432 : /*
1433 : * Sends the decoded DML over the wire.
1434 : *
1435 : * This is called both in streaming and non-streaming modes.
1436 : */
1437 : static void
1438 366626 : pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1439 : Relation relation, ReorderBufferChange *change)
1440 : {
1441 366626 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1442 366626 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1443 : MemoryContext old;
1444 : RelationSyncEntry *relentry;
1445 366626 : TransactionId xid = InvalidTransactionId;
1446 366626 : Relation ancestor = NULL;
1447 366626 : Relation targetrel = relation;
1448 366626 : ReorderBufferChangeType action = change->action;
1449 366626 : TupleTableSlot *old_slot = NULL;
1450 366626 : TupleTableSlot *new_slot = NULL;
1451 :
1452 366626 : if (!is_publishable_relation(relation))
1453 2276 : return;
1454 :
1455 : /*
1456 : * Remember the xid for the change in streaming mode. We need to send the
1457 : * xid with each change in streaming mode so that the subscriber can
1458 : * associate the change with a transaction and, on abort, discard the
1459 : * corresponding changes.
1460 : */
1461 366626 : if (data->in_streaming)
1462 351882 : xid = change->txn->xid;
1463 :
1464 366626 : relentry = get_rel_sync_entry(data, relation);
1465 :
1466 : /* First check the table filter */
1467 366622 : switch (action)
1468 : {
1469 211826 : case REORDER_BUFFER_CHANGE_INSERT:
1470 211826 : if (!relentry->pubactions.pubinsert)
1471 106 : return;
1472 211720 : break;
1473 68962 : case REORDER_BUFFER_CHANGE_UPDATE:
1474 68962 : if (!relentry->pubactions.pubupdate)
1475 88 : return;
1476 68874 : break;
1477 85834 : case REORDER_BUFFER_CHANGE_DELETE:
1478 85834 : if (!relentry->pubactions.pubdelete)
1479 2082 : return;
1480 :
1481 : /*
1482 : * This is only possible if deletes are allowed even when replica
1483 : * identity is not defined for a table. Since the DELETE action
1484 : * can't be published, we simply return.
1485 : */
1486 83752 : if (!change->data.tp.oldtuple)
1487 : {
1488 0 : elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
1489 0 : return;
1490 : }
1491 83752 : break;
1492 364346 : default:
1493 : Assert(false);
1494 : }
1495 :
1496 : /* Avoid leaking memory by using and resetting our own context */
1497 364346 : old = MemoryContextSwitchTo(data->context);
1498 :
1499 : /* Switch relation if publishing via root. */
1500 364346 : if (relentry->publish_as_relid != RelationGetRelid(relation))
1501 : {
1502 : Assert(relation->rd_rel->relispartition);
1503 138 : ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1504 138 : targetrel = ancestor;
1505 : }
1506 :
1507 364346 : if (change->data.tp.oldtuple)
1508 : {
1509 84008 : old_slot = relentry->old_slot;
1510 84008 : ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false);
1511 :
1512 : /* Convert tuple if needed. */
1513 84008 : if (relentry->attrmap)
1514 : {
1515 10 : TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
1516 : &TTSOpsVirtual);
1517 :
1518 10 : old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
1519 : }
1520 : }
1521 :
1522 364346 : if (change->data.tp.newtuple)
1523 : {
1524 280594 : new_slot = relentry->new_slot;
1525 280594 : ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false);
1526 :
1527 : /* Convert tuple if needed. */
1528 280594 : if (relentry->attrmap)
1529 : {
1530 38 : TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
1531 : &TTSOpsVirtual);
1532 :
1533 38 : new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
1534 : }
1535 : }
1536 :
1537 : /*
1538 : * Check row filter.
1539 : *
1540 : * Updates may be transformed into inserts or deletes based on the row
1541 : * filter results for the old and new tuples (see the summary after this function).
1542 : */
1543 364346 : if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
1544 22 : goto cleanup;
1545 :
1546 : /*
1547 : * Send BEGIN if we haven't yet.
1548 : *
1549 : * We send the BEGIN message after ensuring that we will actually send the
1550 : * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
1551 : * transactions.
1552 : */
1553 364324 : if (txndata && !txndata->sent_begin_txn)
1554 746 : pgoutput_send_begin(ctx, txn);
1555 :
1556 : /*
1557 : * Send the schema using the original relation: maybe_send_schema() uses it
1558 : * to also send the ancestor's schema when publishing via an ancestor.
1559 : */
1560 364324 : maybe_send_schema(ctx, change, relation, relentry);
1561 :
1562 364322 : OutputPluginPrepareWrite(ctx, true);
1563 :
1564 : /* Send the data */
1565 364322 : switch (action)
1566 : {
1567 211700 : case REORDER_BUFFER_CHANGE_INSERT:
1568 211700 : logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
1569 211700 : data->binary, relentry->columns,
1570 211700 : relentry->include_gencols);
1571 211700 : break;
1572 68868 : case REORDER_BUFFER_CHANGE_UPDATE:
1573 68868 : logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
1574 68868 : new_slot, data->binary, relentry->columns,
1575 68868 : relentry->include_gencols);
1576 68868 : break;
1577 83754 : case REORDER_BUFFER_CHANGE_DELETE:
1578 83754 : logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
1579 83754 : data->binary, relentry->columns,
1580 83754 : relentry->include_gencols);
1581 83754 : break;
1582 364322 : default:
1583 : Assert(false);
1584 : }
1585 :
1586 364322 : OutputPluginWrite(ctx, true);
1587 :
1588 364344 : cleanup:
1589 364344 : if (RelationIsValid(ancestor))
1590 : {
1591 136 : RelationClose(ancestor);
1592 136 : ancestor = NULL;
1593 : }
1594 :
1595 : /* Drop the new slots that were used to store the converted tuples. */
1596 364344 : if (relentry->attrmap)
1597 : {
1598 48 : if (old_slot)
1599 10 : ExecDropSingleTupleTableSlot(old_slot);
1600 :
1601 48 : if (new_slot)
1602 38 : ExecDropSingleTupleTableSlot(new_slot);
1603 : }
1604 :
1605 364344 : MemoryContextSwitchTo(old);
1606 364344 : MemoryContextReset(data->context);
1607 : }
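
/*
 * Summary of the UPDATE transformation mentioned above (this restates the
 * behaviour of pgoutput_row_filter(); it is not additional logic in this
 * file).  When a row filter is defined and both tuples are available, the
 * filter results for the old and new tuple decide what is published:
 *
 *   old matches | new matches | published as
 *  -------------+-------------+--------------
 *   no          | no          | nothing
 *   yes         | no          | DELETE
 *   no          | yes         | INSERT
 *   yes         | yes         | UPDATE
 */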
1608 :
1609 : static void
1610 28 : pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1611 : int nrelations, Relation relations[], ReorderBufferChange *change)
1612 : {
1613 28 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1614 28 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1615 : MemoryContext old;
1616 : RelationSyncEntry *relentry;
1617 : int i;
1618 : int nrelids;
1619 : Oid *relids;
1620 28 : TransactionId xid = InvalidTransactionId;
1621 :
1622 : /* Remember the xid for the change in streaming mode. See pgoutput_change. */
1623 28 : if (data->in_streaming)
1624 0 : xid = change->txn->xid;
1625 :
1626 28 : old = MemoryContextSwitchTo(data->context);
1627 :
1628 28 : relids = palloc0(nrelations * sizeof(Oid));
1629 28 : nrelids = 0;
1630 :
1631 94 : for (i = 0; i < nrelations; i++)
1632 : {
1633 66 : Relation relation = relations[i];
1634 66 : Oid relid = RelationGetRelid(relation);
1635 :
1636 66 : if (!is_publishable_relation(relation))
1637 0 : continue;
1638 :
1639 66 : relentry = get_rel_sync_entry(data, relation);
1640 :
1641 66 : if (!relentry->pubactions.pubtruncate)
1642 28 : continue;
1643 :
1644 : /*
1645 : * Don't send partitions if the publication publishes changes only via
1646 : * the root tables.
1647 : */
1648 38 : if (relation->rd_rel->relispartition &&
1649 30 : relentry->publish_as_relid != relid)
1650 6 : continue;
1651 :
1652 32 : relids[nrelids++] = relid;
1653 :
1654 : /* Send BEGIN if we haven't yet */
1655 32 : if (txndata && !txndata->sent_begin_txn)
1656 20 : pgoutput_send_begin(ctx, txn);
1657 :
1658 32 : maybe_send_schema(ctx, change, relation, relentry);
1659 : }
1660 :
1661 28 : if (nrelids > 0)
1662 : {
1663 20 : OutputPluginPrepareWrite(ctx, true);
1664 20 : logicalrep_write_truncate(ctx->out,
1665 : xid,
1666 : nrelids,
1667 : relids,
1668 20 : change->data.truncate.cascade,
1669 20 : change->data.truncate.restart_seqs);
1670 20 : OutputPluginWrite(ctx, true);
1671 : }
1672 :
1673 28 : MemoryContextSwitchTo(old);
1674 28 : MemoryContextReset(data->context);
1675 28 : }
1676 :
1677 : static void
1678 14 : pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1679 : XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
1680 : const char *message)
1681 : {
1682 14 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1683 14 : TransactionId xid = InvalidTransactionId;
1684 :
1685 14 : if (!data->messages)
1686 4 : return;
1687 :
1688 : /*
1689 : * Remember the xid for the message in streaming mode. See
1690 : * pgoutput_change.
1691 : */
1692 10 : if (data->in_streaming)
1693 0 : xid = txn->xid;
1694 :
1695 : /*
1696 : * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
1697 : */
1698 10 : if (transactional)
1699 : {
1700 4 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1701 :
1702 : /* Send BEGIN if we haven't yet */
1703 4 : if (txndata && !txndata->sent_begin_txn)
1704 4 : pgoutput_send_begin(ctx, txn);
1705 : }
1706 :
1707 10 : OutputPluginPrepareWrite(ctx, true);
1708 10 : logicalrep_write_message(ctx->out,
1709 : xid,
1710 : message_lsn,
1711 : transactional,
1712 : prefix,
1713 : sz,
1714 : message);
1715 10 : OutputPluginWrite(ctx, true);
1716 : }
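
/*
 * For context: these messages originate from pg_logical_emit_message(),
 * e.g. (illustrative call only):
 *
 *   SELECT pg_logical_emit_message(true, 'my_prefix', 'payload');
 *
 * A non-transactional message (first argument false) is decoded on its own,
 * outside any transaction, which is why no BEGIN is sent for it above.
 */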
1717 :
1718 : /*
1719 : * Return true if the data is associated with an origin and the user has
1720 : * requested only changes that have no origin, false otherwise.
1721 : */
1722 : static bool
1723 980938 : pgoutput_origin_filter(LogicalDecodingContext *ctx,
1724 : RepOriginId origin_id)
1725 : {
1726 980938 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1727 :
1728 980938 : if (data->publish_no_origin && origin_id != InvalidRepOriginId)
1729 140 : return true;
1730 :
1731 980798 : return false;
1732 : }
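
/*
 * For context: publish_no_origin is set when the subscriber passes the
 * "origin" option with the value "none" (for instance, a subscription
 * created with CREATE SUBSCRIPTION ... WITH (origin = none)).  Returning
 * true here makes the decoding machinery skip the change, so changes that
 * were themselves applied by logical replication are not republished, which
 * is how bidirectional setups avoid replication loops.
 */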
1733 :
1734 : /*
1735 : * Shutdown the output plugin.
1736 : *
1737 : * Note that we don't need to clean up data->context and data->cachectx:
1738 : * they are child contexts of ctx->context, so they will be cleaned up by
1739 : * the logical decoding machinery.
1740 : */
1741 : static void
1742 912 : pgoutput_shutdown(LogicalDecodingContext *ctx)
1743 : {
1744 912 : if (RelationSyncCache)
1745 : {
1746 344 : hash_destroy(RelationSyncCache);
1747 344 : RelationSyncCache = NULL;
1748 : }
1749 912 : }
1750 :
1751 : /*
1752 : * Load publications from the list of publication names.
1753 : */
1754 : static List *
1755 304 : LoadPublications(List *pubnames)
1756 : {
1757 304 : List *result = NIL;
1758 : ListCell *lc;
1759 :
1760 692 : foreach(lc, pubnames)
1761 : {
1762 390 : char *pubname = (char *) lfirst(lc);
1763 390 : Publication *pub = GetPublicationByName(pubname, false);
1764 :
1765 388 : result = lappend(result, pub);
1766 : }
1767 :
1768 302 : return result;
1769 : }
1770 :
1771 : /*
1772 : * Publication syscache invalidation callback.
1773 : *
1774 : * Called for invalidations on pg_publication.
1775 : */
1776 : static void
1777 418 : publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
1778 : {
1779 418 : publications_valid = false;
1780 :
1781 : /*
1782 : * Also invalidate the per-relation cache so that the next time the filtering
1783 : * info is checked it will be rebuilt with the new publication settings.
1784 : */
1785 418 : rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
1786 418 : }
1787 :
1788 : /*
1789 : * START STREAM callback
1790 : */
1791 : static void
1792 1274 : pgoutput_stream_start(struct LogicalDecodingContext *ctx,
1793 : ReorderBufferTXN *txn)
1794 : {
1795 1274 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1796 1274 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
1797 :
1798 : /* we can't nest streaming of transactions */
1799 : Assert(!data->in_streaming);
1800 :
1801 : /*
1802 : * If we already sent the first stream for this transaction then don't
1803 : * send the origin id in the subsequent streams.
1804 : */
1805 1274 : if (rbtxn_is_streamed(txn))
1806 1148 : send_replication_origin = false;
1807 :
1808 1274 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
1809 1274 : logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
1810 :
1811 1274 : send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
1812 : send_replication_origin);
1813 :
1814 1274 : OutputPluginWrite(ctx, true);
1815 :
1816 : /* we're streaming a chunk of transaction now */
1817 1274 : data->in_streaming = true;
1818 1274 : }
1819 :
1820 : /*
1821 : * STOP STREAM callback
1822 : */
1823 : static void
1824 1274 : pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
1825 : ReorderBufferTXN *txn)
1826 : {
1827 1274 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1828 :
1829 : /* we should be streaming a transaction */
1830 : Assert(data->in_streaming);
1831 :
1832 1274 : OutputPluginPrepareWrite(ctx, true);
1833 1274 : logicalrep_write_stream_stop(ctx->out);
1834 1274 : OutputPluginWrite(ctx, true);
1835 :
1836 : /* we've stopped streaming a transaction */
1837 1274 : data->in_streaming = false;
1838 1274 : }
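
/*
 * For context: a streamed (in-progress) transaction arrives as one or more
 * stream_start ... stream_stop chunks produced by the two callbacks above,
 * and is then finished by exactly one of pgoutput_stream_commit,
 * pgoutput_stream_abort, or pgoutput_stream_prepare_txn below.
 */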
1839 :
1840 : /*
1841 : * Notify downstream to discard the streamed transaction (along with all
1842 : * its subtransactions, if it's a toplevel transaction).
1843 : */
1844 : static void
1845 52 : pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
1846 : ReorderBufferTXN *txn,
1847 : XLogRecPtr abort_lsn)
1848 : {
1849 : ReorderBufferTXN *toptxn;
1850 52 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1851 52 : bool write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
1852 :
1853 : /*
1854 : * The abort should happen outside a streaming block, even for streamed
1855 : * transactions. The transaction has to be marked as streamed, though.
1856 : */
1857 : Assert(!data->in_streaming);
1858 :
1859 : /* determine the toplevel transaction */
1860 52 : toptxn = rbtxn_get_toptxn(txn);
1861 :
1862 : Assert(rbtxn_is_streamed(toptxn));
1863 :
1864 52 : OutputPluginPrepareWrite(ctx, true);
1865 52 : logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
1866 : txn->xact_time.abort_time, write_abort_info);
1867 :
1868 52 : OutputPluginWrite(ctx, true);
1869 :
1870 52 : cleanup_rel_sync_cache(toptxn->xid, false);
1871 52 : }
1872 :
1873 : /*
1874 : * Notify downstream to apply the streamed transaction (along with all
1875 : * its subtransactions).
1876 : */
1877 : static void
1878 92 : pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
1879 : ReorderBufferTXN *txn,
1880 : XLogRecPtr commit_lsn)
1881 : {
1882 92 : PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private;
1883 :
1884 : /*
1885 : * The commit should happen outside a streaming block, even for streamed
1886 : * transactions. The transaction has to be marked as streamed, though.
1887 : */
1888 : Assert(!data->in_streaming);
1889 : Assert(rbtxn_is_streamed(txn));
1890 :
1891 92 : OutputPluginUpdateProgress(ctx, false);
1892 :
1893 92 : OutputPluginPrepareWrite(ctx, true);
1894 92 : logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
1895 92 : OutputPluginWrite(ctx, true);
1896 :
1897 92 : cleanup_rel_sync_cache(txn->xid, true);
1898 92 : }
1899 :
1900 : /*
1901 : * PREPARE callback (for streaming two-phase commit).
1902 : *
1903 : * Notify the downstream to prepare the transaction.
1904 : */
1905 : static void
1906 28 : pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
1907 : ReorderBufferTXN *txn,
1908 : XLogRecPtr prepare_lsn)
1909 : {
1910 : Assert(rbtxn_is_streamed(txn));
1911 :
1912 28 : OutputPluginUpdateProgress(ctx, false);
1913 28 : OutputPluginPrepareWrite(ctx, true);
1914 28 : logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
1915 28 : OutputPluginWrite(ctx, true);
1916 28 : }
1917 :
1918 : /*
1919 : * Initialize the relation schema sync cache for a decoding session.
1920 : *
1921 : * The hash table is destroyed at the end of a decoding session. While the
1922 : * relcache invalidation callbacks remain registered and will still be
1923 : * invoked, they will just see the NULL hash table global and take no action.
1924 : */
1925 : static void
1926 666 : init_rel_sync_cache(MemoryContext cachectx)
1927 : {
1928 : HASHCTL ctl;
1929 : static bool relation_callbacks_registered = false;
1930 :
1931 : /* Nothing to do if hash table already exists */
1932 666 : if (RelationSyncCache != NULL)
1933 2 : return;
1934 :
1935 : /* Make a new hash table for the cache */
1936 666 : ctl.keysize = sizeof(Oid);
1937 666 : ctl.entrysize = sizeof(RelationSyncEntry);
1938 666 : ctl.hcxt = cachectx;
1939 :
1940 666 : RelationSyncCache = hash_create("logical replication output relation cache",
1941 : 128, &ctl,
1942 : HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
1943 :
1944 : Assert(RelationSyncCache != NULL);
1945 :
1946 : /* No more to do if we already registered callbacks */
1947 666 : if (relation_callbacks_registered)
1948 2 : return;
1949 :
1950 : /* We must update the cache entry for a relation after a relcache flush */
1951 664 : CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
1952 :
1953 : /*
1954 : * Flush all cache entries after a pg_namespace change, in case it was a
1955 : * schema rename affecting a relation being replicated.
1956 : */
1957 664 : CacheRegisterSyscacheCallback(NAMESPACEOID,
1958 : rel_sync_cache_publication_cb,
1959 : (Datum) 0);
1960 :
1961 : /*
1962 : * Flush all cache entries after any publication changes. (We need no
1963 : * callback entry for pg_publication, because publication_invalidation_cb
1964 : * will take care of it.)
1965 : */
1966 664 : CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
1967 : rel_sync_cache_publication_cb,
1968 : (Datum) 0);
1969 664 : CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
1970 : rel_sync_cache_publication_cb,
1971 : (Datum) 0);
1972 :
1973 664 : relation_callbacks_registered = true;
1974 : }
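
/*
 * Note on lifetimes: the invalidation callbacks registered above stay
 * registered for the rest of the backend's life (there is no way to
 * unregister them), whereas RelationSyncCache is destroyed when decoding
 * finishes.  That is why rel_sync_cache_relation_cb and
 * rel_sync_cache_publication_cb below must tolerate a NULL
 * RelationSyncCache.
 */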
1975 :
1976 : /*
1977 : * We expect a relatively small number of streamed transactions, so a plain list search is sufficient.
1978 : */
1979 : static bool
1980 351882 : get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
1981 : {
1982 351882 : return list_member_xid(entry->streamed_txns, xid);
1983 : }
1984 :
1985 : /*
1986 : * Record in the relation sync entry that the schema of the relation has
1987 : * already been sent for the given (streamed) xid.
1988 : */
1989 : static void
1990 142 : set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
1991 : {
1992 : MemoryContext oldctx;
1993 :
1994 142 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1995 :
1996 142 : entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
1997 :
1998 142 : MemoryContextSwitchTo(oldctx);
1999 142 : }
2000 :
2001 : /*
2002 : * Find or create entry in the relation schema cache.
2003 : *
2004 : * This looks up publications that the given relation is directly or
2005 : * indirectly part of (the latter if it's really the relation's ancestor that
2006 : * is part of a publication) and fills in the found entry with the information
2007 : * about which operations to publish and whether to use an ancestor's schema
2008 : * when publishing.
2009 : */
2010 : static RelationSyncEntry *
2011 366692 : get_rel_sync_entry(PGOutputData *data, Relation relation)
2012 : {
2013 : RelationSyncEntry *entry;
2014 : bool found;
2015 : MemoryContext oldctx;
2016 366692 : Oid relid = RelationGetRelid(relation);
2017 :
2018 : Assert(RelationSyncCache != NULL);
2019 :
2020 : /* Find cached relation info, creating if not found */
2021 366692 : entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
2022 : &relid,
2023 : HASH_ENTER, &found);
2024 : Assert(entry != NULL);
2025 :
2026 : /* initialize entry, if it's new */
2027 366692 : if (!found)
2028 : {
2029 484 : entry->replicate_valid = false;
2030 484 : entry->schema_sent = false;
2031 484 : entry->include_gencols = false;
2032 484 : entry->streamed_txns = NIL;
2033 484 : entry->pubactions.pubinsert = entry->pubactions.pubupdate =
2034 484 : entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
2035 484 : entry->new_slot = NULL;
2036 484 : entry->old_slot = NULL;
2037 484 : memset(entry->exprstate, 0, sizeof(entry->exprstate));
2038 484 : entry->entry_cxt = NULL;
2039 484 : entry->publish_as_relid = InvalidOid;
2040 484 : entry->columns = NULL;
2041 484 : entry->attrmap = NULL;
2042 : }
2043 :
2044 : /* Validate the entry */
2045 366692 : if (!entry->replicate_valid)
2046 : {
2047 616 : Oid schemaId = get_rel_namespace(relid);
2048 616 : List *pubids = GetRelationPublications(relid);
2049 :
2050 : /*
2051 : * We don't acquire a lock on the namespace system table as we build
2052 : * the cache entry using a historic snapshot and all the later changes
2053 : * are absorbed while decoding WAL.
2054 : */
2055 616 : List *schemaPubids = GetSchemaPublications(schemaId);
2056 : ListCell *lc;
2057 616 : Oid publish_as_relid = relid;
2058 616 : int publish_ancestor_level = 0;
2059 616 : bool am_partition = get_rel_relispartition(relid);
2060 616 : char relkind = get_rel_relkind(relid);
2061 616 : List *rel_publications = NIL;
2062 :
2063 : /* Reload publications if needed before use. */
2064 616 : if (!publications_valid)
2065 : {
2066 304 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
2067 304 : if (data->publications)
2068 : {
2069 38 : list_free_deep(data->publications);
2070 38 : data->publications = NIL;
2071 : }
2072 304 : data->publications = LoadPublications(data->publication_names);
2073 302 : MemoryContextSwitchTo(oldctx);
2074 302 : publications_valid = true;
2075 : }
2076 :
2077 : /*
2078 : * Reset the schema_sent status as the relation definition may have
2079 : * changed. Also reset pubactions to empty in case the relation was
2080 : * dropped from a publication, and free any objects that depended on
2081 : * the earlier definition.
2082 : */
2083 614 : entry->schema_sent = false;
2084 614 : entry->include_gencols = false;
2085 614 : list_free(entry->streamed_txns);
2086 614 : entry->streamed_txns = NIL;
2087 614 : bms_free(entry->columns);
2088 614 : entry->columns = NULL;
2089 614 : entry->pubactions.pubinsert = false;
2090 614 : entry->pubactions.pubupdate = false;
2091 614 : entry->pubactions.pubdelete = false;
2092 614 : entry->pubactions.pubtruncate = false;
2093 :
2094 : /*
2095 : * Tuple slot cleanup. (The slots will be rebuilt later if needed.)
2096 : */
2097 614 : if (entry->old_slot)
2098 : {
2099 112 : TupleDesc desc = entry->old_slot->tts_tupleDescriptor;
2100 :
2101 : Assert(desc->tdrefcount == -1);
2102 :
2103 112 : ExecDropSingleTupleTableSlot(entry->old_slot);
2104 :
2105 : /*
2106 : * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
2107 : * do it now to avoid any leaks.
2108 : */
2109 112 : FreeTupleDesc(desc);
2110 : }
2111 614 : if (entry->new_slot)
2112 : {
2113 112 : TupleDesc desc = entry->new_slot->tts_tupleDescriptor;
2114 :
2115 : Assert(desc->tdrefcount == -1);
2116 :
2117 112 : ExecDropSingleTupleTableSlot(entry->new_slot);
2118 :
2119 : /*
2120 : * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
2121 : * do it now to avoid any leaks.
2122 : */
2123 112 : FreeTupleDesc(desc);
2124 : }
2125 :
2126 614 : entry->old_slot = NULL;
2127 614 : entry->new_slot = NULL;
2128 :
2129 614 : if (entry->attrmap)
2130 6 : free_attrmap(entry->attrmap);
2131 614 : entry->attrmap = NULL;
2132 :
2133 : /*
2134 : * Row filter cache cleanup.
2135 : */
2136 614 : if (entry->entry_cxt)
2137 112 : MemoryContextDelete(entry->entry_cxt);
2138 :
2139 614 : entry->entry_cxt = NULL;
2140 614 : entry->estate = NULL;
2141 614 : memset(entry->exprstate, 0, sizeof(entry->exprstate));
2142 :
2143 : /*
2144 : * Build the publication cache. We can't use the one provided by the
2145 : * relcache, as the relcache considers all publications that the given
2146 : * relation is in, but here we only need to consider the ones that the
2147 : * subscriber requested.
2148 : */
2149 1524 : foreach(lc, data->publications)
2150 : {
2151 910 : Publication *pub = lfirst(lc);
2152 910 : bool publish = false;
2153 :
2154 : /*
2155 : * Under what relid should we publish changes in this publication?
2156 : * We'll use the top-most relid across all publications. Also
2157 : * track the ancestor level for this publication.
2158 : */
2159 910 : Oid pub_relid = relid;
2160 910 : int ancestor_level = 0;
2161 :
2162 : /*
2163 : * If this is a FOR ALL TABLES publication, pick the partition
2164 : * root and set the ancestor level accordingly.
2165 : */
2166 910 : if (pub->alltables)
2167 : {
2168 140 : publish = true;
2169 140 : if (pub->pubviaroot && am_partition)
2170 : {
2171 24 : List *ancestors = get_partition_ancestors(relid);
2172 :
2173 24 : pub_relid = llast_oid(ancestors);
2174 24 : ancestor_level = list_length(ancestors);
2175 : }
2176 : }
2177 :
2178 910 : if (!publish)
2179 : {
2180 770 : bool ancestor_published = false;
2181 :
2182 : /*
2183 : * For a partition, check if any of the ancestors are
2184 : * published. If so, note down the topmost ancestor that is
2185 : * published via this publication, which will be used as the
2186 : * relation via which to publish the partition's changes.
2187 : */
2188 770 : if (am_partition)
2189 : {
2190 : Oid ancestor;
2191 : int level;
2192 236 : List *ancestors = get_partition_ancestors(relid);
2193 :
2194 236 : ancestor = GetTopMostAncestorInPublication(pub->oid,
2195 : ancestors,
2196 : &level);
2197 :
2198 236 : if (ancestor != InvalidOid)
2199 : {
2200 96 : ancestor_published = true;
2201 96 : if (pub->pubviaroot)
2202 : {
2203 50 : pub_relid = ancestor;
2204 50 : ancestor_level = level;
2205 : }
2206 : }
2207 : }
2208 :
2209 1196 : if (list_member_oid(pubids, pub->oid) ||
2210 840 : list_member_oid(schemaPubids, pub->oid) ||
2211 : ancestor_published)
2212 412 : publish = true;
2213 : }
2214 :
2215 : /*
2216 : * If the relation is to be published, determine the actions to
2217 : * publish and the list of columns, if appropriate.
2218 : *
2219 : * Don't publish changes for partitioned tables, because publishing
2220 : * the changes of their partitions suffices, unless the partition
2221 : * changes won't be published due to pubviaroot being set.
2222 : */
2223 910 : if (publish &&
2224 8 : (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
2225 : {
2226 546 : entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
2227 546 : entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
2228 546 : entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
2229 546 : entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
2230 :
2231 : /*
2232 : * We want to publish the changes via the top-most ancestor
2233 : * across all publications. So check whether the already-calculated
2234 : * level is higher than the new one; if it is, ignore the new value
2235 : * (it is a descendant). Otherwise the new value is an ancestor,
2236 : * so keep it.
2237 : */
2238 546 : if (publish_ancestor_level > ancestor_level)
2239 2 : continue;
2240 :
2241 : /*
2242 : * If we found an ancestor higher up in the tree, discard the
2243 : * list of publications through which we replicate it, and use
2244 : * the new ancestor.
2245 : */
2246 544 : if (publish_ancestor_level < ancestor_level)
2247 : {
2248 74 : publish_as_relid = pub_relid;
2249 74 : publish_ancestor_level = ancestor_level;
2250 :
2251 : /* reset the publication list for this relation */
2252 74 : rel_publications = NIL;
2253 : }
2254 : else
2255 : {
2256 : /* Same ancestor level, has to be the same OID. */
2257 : Assert(publish_as_relid == pub_relid);
2258 : }
2259 :
2260 : /* Track publications for this ancestor. */
2261 544 : rel_publications = lappend(rel_publications, pub);
2262 : }
2263 : }
2264 :
2265 614 : entry->publish_as_relid = publish_as_relid;
2266 :
2267 : /*
2268 : * Initialize the tuple slot, map, and row filter. These are only used
2269 : * when publishing inserts, updates, or deletes.
2270 : */
2271 614 : if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
2272 96 : entry->pubactions.pubdelete)
2273 : {
2274 : /* Initialize the tuple slot and map */
2275 518 : init_tuple_slot(data, relation, entry);
2276 :
2277 : /* Initialize the row filter */
2278 518 : pgoutput_row_filter_init(data, rel_publications, entry);
2279 :
2280 : /* Check whether to publish generated columns. */
2281 518 : check_and_init_gencol(data, rel_publications, entry);
2282 :
2283 : /* Initialize the column list */
2284 518 : pgoutput_column_list_init(data, rel_publications, entry);
2285 : }
2286 :
2287 612 : list_free(pubids);
2288 612 : list_free(schemaPubids);
2289 612 : list_free(rel_publications);
2290 :
2291 612 : entry->replicate_valid = true;
2292 : }
2293 :
2294 366688 : return entry;
2295 : }
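
/*
 * Worked example (illustrative names only): given a partitioned table
 * "meas" with partition "meas_2024" and a publication created as
 *
 *   CREATE PUBLICATION pub FOR TABLE meas
 *       WITH (publish_via_partition_root = true);
 *
 * a change decoded for "meas_2024" gets publish_as_relid set to the OID of
 * "meas" (publish_ancestor_level = 1), so pgoutput_change() converts the
 * tuples to the root's row type and sends them using the root's schema.
 */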
2296 :
2297 : /*
2298 : * Cleanup list of streamed transactions and update the schema_sent flag.
2299 : * Clean up the list of streamed transactions and update the schema_sent flag.
2300 : * When a streamed transaction commits or aborts, we need to remove the
2301 : * toplevel XID from the schema cache. If the transaction aborted, the
2302 : * subscriber will simply throw away the schema records we streamed, so
2303 : * we don't need to do anything else.
2304 : *
2305 : * If the transaction is committed, the subscriber will update the relation
2306 : * cache - so tweak the schema_sent flag accordingly.
2307 : */
2308 : static void
2309 144 : cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
2310 : {
2311 : HASH_SEQ_STATUS hash_seq;
2312 : RelationSyncEntry *entry;
2313 :
2314 : Assert(RelationSyncCache != NULL);
2315 :
2316 144 : hash_seq_init(&hash_seq, RelationSyncCache);
2317 294 : while ((entry = hash_seq_search(&hash_seq)) != NULL)
2318 : {
2319 : /*
2320 : * We can set the schema_sent flag for an entry whose list contains the
2321 : * committed xid, as that ensures that the subscriber already has the
2322 : * corresponding schema and we don't need to send it again unless there
2323 : * is an invalidation for that relation.
2324 : */
2325 340 : foreach_xid(streamed_txn, entry->streamed_txns)
2326 : {
2327 148 : if (xid == streamed_txn)
2328 : {
2329 108 : if (is_commit)
2330 86 : entry->schema_sent = true;
2331 :
2332 108 : entry->streamed_txns =
2333 108 : foreach_delete_current(entry->streamed_txns, streamed_txn);
2334 108 : break;
2335 : }
2336 : }
2337 : }
2338 144 : }
2339 :
2340 : /*
2341 : * Relcache invalidation callback
2342 : */
2343 : static void
2344 6504 : rel_sync_cache_relation_cb(Datum arg, Oid relid)
2345 : {
2346 : RelationSyncEntry *entry;
2347 :
2348 : /*
2349 : * We can get here if the plugin was used in the SQL interface, as the
2350 : * RelationSyncCache is destroyed when the decoding finishes, but there is
2351 : * no way to unregister the relcache invalidation callback.
2352 : */
2353 6504 : if (RelationSyncCache == NULL)
2354 18 : return;
2355 :
2356 : /*
2357 : * Nobody keeps pointers to entries in this hash table around outside
2358 : * logical decoding callback calls - but invalidation events can come in
2359 : * *during* a callback if we do any syscache access in the callback.
2360 : * Because of that we must mark the cache entry as invalid but not damage
2361 : * any of its substructure here. The next get_rel_sync_entry() call will
2362 : * rebuild it all.
2363 : */
2364 6486 : if (OidIsValid(relid))
2365 : {
2366 : /*
2367 : * Getting invalidations for relations that aren't in the table is
2368 : * entirely normal. So we don't care if it's found or not.
2369 : */
2370 6440 : entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
2371 : HASH_FIND, NULL);
2372 6440 : if (entry != NULL)
2373 1094 : entry->replicate_valid = false;
2374 : }
2375 : else
2376 : {
2377 : /* Whole cache must be flushed. */
2378 : HASH_SEQ_STATUS status;
2379 :
2380 46 : hash_seq_init(&status, RelationSyncCache);
2381 106 : while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2382 : {
2383 60 : entry->replicate_valid = false;
2384 : }
2385 : }
2386 : }
2387 :
2388 : /*
2389 : * Publication relation/schema map syscache invalidation callback
2390 : *
2391 : * Called for invalidations on pg_publication, pg_publication_rel,
2392 : * pg_publication_namespace, and pg_namespace.
2393 : */
2394 : static void
2395 1178 : rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
2396 : {
2397 : HASH_SEQ_STATUS status;
2398 : RelationSyncEntry *entry;
2399 :
2400 : /*
2401 : * We can get here if the plugin was used in the SQL interface, as the
2402 : * RelationSyncCache is destroyed when the decoding finishes, but there is
2403 : * no way to unregister the invalidation callbacks.
2404 : */
2405 1178 : if (RelationSyncCache == NULL)
2406 68 : return;
2407 :
2408 : /*
2409 : * We have no easy way to identify which cache entries this invalidation
2410 : * event might have affected, so just mark them all invalid.
2411 : */
2412 1110 : hash_seq_init(&status, RelationSyncCache);
2413 3544 : while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2414 : {
2415 2434 : entry->replicate_valid = false;
2416 : }
2417 : }
2418 :
2419 : /* Send Replication origin */
2420 : static void
2421 2084 : send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
2422 : XLogRecPtr origin_lsn, bool send_origin)
2423 : {
2424 2084 : if (send_origin)
2425 : {
2426 : char *origin;
2427 :
2428 : /*----------
2429 : * XXX: which behaviour do we want here?
2430 : *
2431 : * Alternatives:
2432 : * - don't send origin message if origin name not found
2433 : * (that's what we do now)
2434 : * - throw error - that will break replication, not good
2435 : * - send some special "unknown" origin
2436 : *----------
2437 : */
2438 18 : if (replorigin_by_oid(origin_id, true, &origin))
2439 : {
2440 : /* Message boundary */
2441 18 : OutputPluginWrite(ctx, false);
2442 18 : OutputPluginPrepareWrite(ctx, true);
2443 :
2444 18 : logicalrep_write_origin(ctx->out, origin, origin_lsn);
2445 : }
2446 : }
2447 2084 : }
|