Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pgoutput.c
4 : * Logical Replication output plugin
5 : *
6 : * Copyright (c) 2012-2025, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/pgoutput/pgoutput.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "access/tupconvert.h"
16 : #include "catalog/partition.h"
17 : #include "catalog/pg_publication.h"
18 : #include "catalog/pg_publication_rel.h"
19 : #include "catalog/pg_subscription.h"
20 : #include "commands/defrem.h"
21 : #include "commands/subscriptioncmds.h"
22 : #include "executor/executor.h"
23 : #include "fmgr.h"
24 : #include "nodes/makefuncs.h"
25 : #include "parser/parse_relation.h"
26 : #include "replication/logical.h"
27 : #include "replication/logicalproto.h"
28 : #include "replication/origin.h"
29 : #include "replication/pgoutput.h"
30 : #include "utils/builtins.h"
31 : #include "utils/inval.h"
32 : #include "utils/lsyscache.h"
33 : #include "utils/memutils.h"
34 : #include "utils/rel.h"
35 : #include "utils/syscache.h"
36 : #include "utils/varlena.h"
37 :
PG_MODULE_MAGIC;

/*
 * Output plugin API callbacks.  They are installed into the
 * OutputPluginCallbacks struct by _PG_output_plugin_init().
 */
static void pgoutput_startup(LogicalDecodingContext *ctx,
							 OutputPluginOptions *opt, bool is_init);
static void pgoutput_shutdown(LogicalDecodingContext *ctx);
static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
							   ReorderBufferTXN *txn);
static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
								ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
static void pgoutput_change(LogicalDecodingContext *ctx,
							ReorderBufferTXN *txn, Relation relation,
							ReorderBufferChange *change);
static void pgoutput_truncate(LogicalDecodingContext *ctx,
							  ReorderBufferTXN *txn, int nrelations, Relation relations[],
							  ReorderBufferChange *change);
static void pgoutput_message(LogicalDecodingContext *ctx,
							 ReorderBufferTXN *txn, XLogRecPtr message_lsn,
							 bool transactional, const char *prefix,
							 Size sz, const char *message);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
								   RepOriginId origin_id);
/* Two-phase commit (prepared transaction) callbacks. */
static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
									   ReorderBufferTXN *txn);
static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
								 ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
										 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
										   ReorderBufferTXN *txn,
										   XLogRecPtr prepare_end_lsn,
										   TimestampTz prepare_time);
/* Streaming (in-progress transaction) callbacks. */
static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
								  ReorderBufferTXN *txn);
static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
								 ReorderBufferTXN *txn);
static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
								  ReorderBufferTXN *txn,
								  XLogRecPtr abort_lsn);
static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
								   ReorderBufferTXN *txn,
								   XLogRecPtr commit_lsn);
static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
										ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);

/*
 * Validity flag for the cached publication list.  It is reset to false in
 * pgoutput_startup(); presumably publication_invalidation_cb() also clears
 * it when pg_publication changes (its body is not shown here -- confirm).
 */
static bool publications_valid;

static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
										uint32 hashvalue);
static void send_repl_origin(LogicalDecodingContext *ctx,
							 RepOriginId origin_id, XLogRecPtr origin_lsn,
							 bool send_origin);
90 :
/*
 * Only 3 publication actions are used for row filtering ("insert", "update",
 * "delete"). See RelationSyncEntry.exprstate[].
 *
 * The enumerators double as indexes into that array, so they must stay
 * dense and start at zero.
 */
enum RowFilterPubAction
{
	PUBACTION_INSERT,
	PUBACTION_UPDATE,
	PUBACTION_DELETE,
};

/* Number of RowFilterPubAction values; sizes RelationSyncEntry.exprstate[]. */
#define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
103 :
/*
 * Entry in the map used to remember which relation schemas we sent.
 *
 * The schema_sent flag determines if the current schema record for the
 * relation (and for its ancestor if publish_as_relid is set) was already
 * sent to the subscriber (in which case we don't need to send it again).
 *
 * The schema cache on downstream is however updated only at commit time,
 * and with streamed transactions the commit order may be different from
 * the order the transactions are sent in. Also, the (sub) transactions
 * might get aborted so we need to send the schema for each (sub) transaction
 * so that we don't lose the schema information on abort. For handling this,
 * we maintain the list of xids (streamed_txns) for which we have already sent
 * the schema.
 *
 * For partitions, 'pubactions' considers not only the table's own
 * publications, but also those of all of its ancestors.
 */
typedef struct RelationSyncEntry
{
	Oid			relid;			/* relation oid */

	bool		replicate_valid;	/* overall validity flag for entry */

	/* schema already sent outside of streaming mode (see header comment) */
	bool		schema_sent;

	/*
	 * This is set if the 'publish_generated_columns' parameter is true, and
	 * the relation contains generated columns.
	 */
	bool		include_gencols;
	List	   *streamed_txns;	/* streamed toplevel transactions with this
								 * schema */

	/* are we publishing this rel? */
	PublicationActions pubactions;

	/*
	 * ExprState array for row filter. Different publication actions don't
	 * allow multiple expressions to always be combined into one, because
	 * updates or deletes restrict the column in expression to be part of the
	 * replica identity index whereas inserts do not have this restriction, so
	 * there is one ExprState per publication action (indexed by
	 * RowFilterPubAction).
	 */
	ExprState  *exprstate[NUM_ROWFILTER_PUBACTIONS];
	EState	   *estate;			/* executor state used for row filter */
	TupleTableSlot *new_slot;	/* slot for storing new tuple */
	TupleTableSlot *old_slot;	/* slot for storing old tuple */

	/*
	 * OID of the relation to publish changes as. For a partition, this may
	 * be set to one of its ancestors whose schema will be used when
	 * replicating changes, if publish_via_partition_root is set for the
	 * publication.
	 */
	Oid			publish_as_relid;

	/*
	 * Map used when replicating using an ancestor's schema to convert tuples
	 * from partition's type to the ancestor's; NULL if publish_as_relid is
	 * same as 'relid' or if unnecessary due to partition and the ancestor
	 * having identical TupleDesc.
	 */
	AttrMap    *attrmap;

	/*
	 * Columns included in the publication, or NULL if all columns are
	 * included implicitly. Note that the attnums in this bitmap are not
	 * shifted by FirstLowInvalidHeapAttributeNumber.
	 */
	Bitmapset  *columns;

	/*
	 * Private context to store additional data for this entry - state for the
	 * row filter expressions, column list, etc.  Created lazily by
	 * pgoutput_ensure_entry_cxt() as a child of PGOutputData.cachectx.
	 */
	MemoryContext entry_cxt;
} RelationSyncEntry;
182 :
/*
 * Maintain a per-transaction level variable to track whether the transaction
 * has sent BEGIN. BEGIN is only sent when the first change in a transaction
 * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
 * messages for empty transactions which saves network bandwidth.
 *
 * This optimization is not used for prepared transactions because if the
 * WALSender restarts after prepare of a transaction and before commit prepared
 * of the same transaction then we won't be able to figure out if we have
 * skipped sending BEGIN/PREPARE of a transaction as it was empty. This is
 * because we would have lost the in-memory txndata information that was
 * present prior to the restart. This will result in sending a spurious
 * COMMIT PREPARED without a corresponding prepared transaction at the
 * downstream which would lead to an error when it tries to process it.
 *
 * XXX We could achieve this optimization by changing protocol to send
 * additional information so that downstream can detect that the corresponding
 * prepare has not been sent. However, adding such a check for every
 * transaction in the downstream could be costly so we might want to do it
 * optionally.
 *
 * We also don't have this optimization for streamed transactions because
 * they can contain prepared transactions.
 *
 * Allocated (zeroed) in pgoutput_begin_txn() and stored in
 * ReorderBufferTXN.output_plugin_private; freed in pgoutput_commit_txn().
 */
typedef struct PGOutputTxnData
{
	bool		sent_begin_txn; /* flag indicating whether BEGIN has been sent */
} PGOutputTxnData;
211 :
/*
 * Map used to remember which relation schemas we sent.  Entries are
 * RelationSyncEntry; the hash key is presumably the relation OID (the
 * struct's leading 'relid' field) -- confirm in init_rel_sync_cache().
 */
static HTAB *RelationSyncCache = NULL;

/* relation sync cache maintenance */
static void init_rel_sync_cache(MemoryContext cachectx);
static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
											 Relation relation);
static void send_relation_and_attrs(Relation relation, TransactionId xid,
									LogicalDecodingContext *ctx,
									RelationSyncEntry *relentry);
static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
										  uint32 hashvalue);
/* per-(streamed)-transaction "schema already sent" bookkeeping */
static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
											TransactionId xid);
static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
											TransactionId xid);
static void init_tuple_slot(PGOutputData *data, Relation relation,
							RelationSyncEntry *entry);

/* row filter routines */
static EState *create_estate_for_relation(Relation rel);
static void pgoutput_row_filter_init(PGOutputData *data,
									 List *publications,
									 RelationSyncEntry *entry);
static bool pgoutput_row_filter_exec_expr(ExprState *state,
										  ExprContext *econtext);
static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
								TupleTableSlot **new_slot_ptr,
								RelationSyncEntry *entry,
								ReorderBufferChangeType *action);

/* column list routines */
static void pgoutput_column_list_init(PGOutputData *data,
									  List *publications,
									  RelationSyncEntry *entry);
248 :
249 : /*
250 : * Specify output plugin callbacks
251 : */
252 : void
253 1304 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
254 : {
255 1304 : cb->startup_cb = pgoutput_startup;
256 1304 : cb->begin_cb = pgoutput_begin_txn;
257 1304 : cb->change_cb = pgoutput_change;
258 1304 : cb->truncate_cb = pgoutput_truncate;
259 1304 : cb->message_cb = pgoutput_message;
260 1304 : cb->commit_cb = pgoutput_commit_txn;
261 :
262 1304 : cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
263 1304 : cb->prepare_cb = pgoutput_prepare_txn;
264 1304 : cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
265 1304 : cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
266 1304 : cb->filter_by_origin_cb = pgoutput_origin_filter;
267 1304 : cb->shutdown_cb = pgoutput_shutdown;
268 :
269 : /* transaction streaming */
270 1304 : cb->stream_start_cb = pgoutput_stream_start;
271 1304 : cb->stream_stop_cb = pgoutput_stream_stop;
272 1304 : cb->stream_abort_cb = pgoutput_stream_abort;
273 1304 : cb->stream_commit_cb = pgoutput_stream_commit;
274 1304 : cb->stream_change_cb = pgoutput_change;
275 1304 : cb->stream_message_cb = pgoutput_message;
276 1304 : cb->stream_truncate_cb = pgoutput_truncate;
277 : /* transaction streaming - two-phase commit */
278 1304 : cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
279 1304 : }
280 :
281 : static void
282 702 : parse_output_parameters(List *options, PGOutputData *data)
283 : {
284 : ListCell *lc;
285 702 : bool protocol_version_given = false;
286 702 : bool publication_names_given = false;
287 702 : bool binary_option_given = false;
288 702 : bool messages_option_given = false;
289 702 : bool streaming_given = false;
290 702 : bool two_phase_option_given = false;
291 702 : bool origin_option_given = false;
292 :
293 702 : data->binary = false;
294 702 : data->streaming = LOGICALREP_STREAM_OFF;
295 702 : data->messages = false;
296 702 : data->two_phase = false;
297 :
298 3508 : foreach(lc, options)
299 : {
300 2806 : DefElem *defel = (DefElem *) lfirst(lc);
301 :
302 : Assert(defel->arg == NULL || IsA(defel->arg, String));
303 :
304 : /* Check each param, whether or not we recognize it */
305 2806 : if (strcmp(defel->defname, "proto_version") == 0)
306 : {
307 : unsigned long parsed;
308 : char *endptr;
309 :
310 702 : if (protocol_version_given)
311 0 : ereport(ERROR,
312 : (errcode(ERRCODE_SYNTAX_ERROR),
313 : errmsg("conflicting or redundant options")));
314 702 : protocol_version_given = true;
315 :
316 702 : errno = 0;
317 702 : parsed = strtoul(strVal(defel->arg), &endptr, 10);
318 702 : if (errno != 0 || *endptr != '\0')
319 0 : ereport(ERROR,
320 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
321 : errmsg("invalid proto_version")));
322 :
323 702 : if (parsed > PG_UINT32_MAX)
324 0 : ereport(ERROR,
325 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
326 : errmsg("proto_version \"%s\" out of range",
327 : strVal(defel->arg))));
328 :
329 702 : data->protocol_version = (uint32) parsed;
330 : }
331 2104 : else if (strcmp(defel->defname, "publication_names") == 0)
332 : {
333 702 : if (publication_names_given)
334 0 : ereport(ERROR,
335 : (errcode(ERRCODE_SYNTAX_ERROR),
336 : errmsg("conflicting or redundant options")));
337 702 : publication_names_given = true;
338 :
339 702 : if (!SplitIdentifierString(strVal(defel->arg), ',',
340 : &data->publication_names))
341 0 : ereport(ERROR,
342 : (errcode(ERRCODE_INVALID_NAME),
343 : errmsg("invalid publication_names syntax")));
344 : }
345 1402 : else if (strcmp(defel->defname, "binary") == 0)
346 : {
347 22 : if (binary_option_given)
348 0 : ereport(ERROR,
349 : (errcode(ERRCODE_SYNTAX_ERROR),
350 : errmsg("conflicting or redundant options")));
351 22 : binary_option_given = true;
352 :
353 22 : data->binary = defGetBoolean(defel);
354 : }
355 1380 : else if (strcmp(defel->defname, "messages") == 0)
356 : {
357 8 : if (messages_option_given)
358 0 : ereport(ERROR,
359 : (errcode(ERRCODE_SYNTAX_ERROR),
360 : errmsg("conflicting or redundant options")));
361 8 : messages_option_given = true;
362 :
363 8 : data->messages = defGetBoolean(defel);
364 : }
365 1372 : else if (strcmp(defel->defname, "streaming") == 0)
366 : {
367 670 : if (streaming_given)
368 0 : ereport(ERROR,
369 : (errcode(ERRCODE_SYNTAX_ERROR),
370 : errmsg("conflicting or redundant options")));
371 670 : streaming_given = true;
372 :
373 670 : data->streaming = defGetStreamingMode(defel);
374 : }
375 702 : else if (strcmp(defel->defname, "two_phase") == 0)
376 : {
377 14 : if (two_phase_option_given)
378 0 : ereport(ERROR,
379 : (errcode(ERRCODE_SYNTAX_ERROR),
380 : errmsg("conflicting or redundant options")));
381 14 : two_phase_option_given = true;
382 :
383 14 : data->two_phase = defGetBoolean(defel);
384 : }
385 688 : else if (strcmp(defel->defname, "origin") == 0)
386 : {
387 : char *origin;
388 :
389 688 : if (origin_option_given)
390 0 : ereport(ERROR,
391 : errcode(ERRCODE_SYNTAX_ERROR),
392 : errmsg("conflicting or redundant options"));
393 688 : origin_option_given = true;
394 :
395 688 : origin = defGetString(defel);
396 688 : if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
397 26 : data->publish_no_origin = true;
398 662 : else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
399 662 : data->publish_no_origin = false;
400 : else
401 0 : ereport(ERROR,
402 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
403 : errmsg("unrecognized origin value: \"%s\"", origin));
404 : }
405 : else
406 0 : elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
407 : }
408 :
409 : /* Check required options */
410 702 : if (!protocol_version_given)
411 0 : ereport(ERROR,
412 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
413 : errmsg("option \"%s\" missing", "proto_version"));
414 702 : if (!publication_names_given)
415 0 : ereport(ERROR,
416 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
417 : errmsg("option \"%s\" missing", "publication_names"));
418 702 : }
419 :
420 : /*
421 : * Initialize this plugin
422 : */
423 : static void
424 1304 : pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
425 : bool is_init)
426 : {
427 1304 : PGOutputData *data = palloc0(sizeof(PGOutputData));
428 : static bool publication_callback_registered = false;
429 :
430 : /* Create our memory context for private allocations. */
431 1304 : data->context = AllocSetContextCreate(ctx->context,
432 : "logical replication output context",
433 : ALLOCSET_DEFAULT_SIZES);
434 :
435 1304 : data->cachectx = AllocSetContextCreate(ctx->context,
436 : "logical replication cache context",
437 : ALLOCSET_DEFAULT_SIZES);
438 :
439 1304 : data->pubctx = AllocSetContextCreate(ctx->context,
440 : "logical replication publication list context",
441 : ALLOCSET_SMALL_SIZES);
442 :
443 1304 : ctx->output_plugin_private = data;
444 :
445 : /* This plugin uses binary protocol. */
446 1304 : opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
447 :
448 : /*
449 : * This is replication start and not slot initialization.
450 : *
451 : * Parse and validate options passed by the client.
452 : */
453 1304 : if (!is_init)
454 : {
455 : /* Parse the params and ERROR if we see any we don't recognize */
456 702 : parse_output_parameters(ctx->output_plugin_options, data);
457 :
458 : /* Check if we support requested protocol */
459 702 : if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
460 0 : ereport(ERROR,
461 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
462 : errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
463 : data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
464 :
465 702 : if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
466 0 : ereport(ERROR,
467 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
468 : errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
469 : data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
470 :
471 : /*
472 : * Decide whether to enable streaming. It is disabled by default, in
473 : * which case we just update the flag in decoding context. Otherwise
474 : * we only allow it with sufficient version of the protocol, and when
475 : * the output plugin supports it.
476 : */
477 702 : if (data->streaming == LOGICALREP_STREAM_OFF)
478 32 : ctx->streaming = false;
479 670 : else if (data->streaming == LOGICALREP_STREAM_ON &&
480 54 : data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
481 0 : ereport(ERROR,
482 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
483 : errmsg("requested proto_version=%d does not support streaming, need %d or higher",
484 : data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
485 670 : else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
486 616 : data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
487 0 : ereport(ERROR,
488 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
489 : errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
490 : data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
491 670 : else if (!ctx->streaming)
492 0 : ereport(ERROR,
493 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
494 : errmsg("streaming requested, but not supported by output plugin")));
495 :
496 : /*
497 : * Here, we just check whether the two-phase option is passed by
498 : * plugin and decide whether to enable it at later point of time. It
499 : * remains enabled if the previous start-up has done so. But we only
500 : * allow the option to be passed in with sufficient version of the
501 : * protocol, and when the output plugin supports it.
502 : */
503 702 : if (!data->two_phase)
504 688 : ctx->twophase_opt_given = false;
505 14 : else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
506 0 : ereport(ERROR,
507 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
508 : errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
509 : data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
510 14 : else if (!ctx->twophase)
511 0 : ereport(ERROR,
512 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
513 : errmsg("two-phase commit requested, but not supported by output plugin")));
514 : else
515 14 : ctx->twophase_opt_given = true;
516 :
517 : /* Init publication state. */
518 702 : data->publications = NIL;
519 702 : publications_valid = false;
520 :
521 : /*
522 : * Register callback for pg_publication if we didn't already do that
523 : * during some previous call in this process.
524 : */
525 702 : if (!publication_callback_registered)
526 : {
527 700 : CacheRegisterSyscacheCallback(PUBLICATIONOID,
528 : publication_invalidation_cb,
529 : (Datum) 0);
530 700 : publication_callback_registered = true;
531 : }
532 :
533 : /* Initialize relation schema cache. */
534 702 : init_rel_sync_cache(CacheMemoryContext);
535 : }
536 : else
537 : {
538 : /*
539 : * Disable the streaming and prepared transactions during the slot
540 : * initialization mode.
541 : */
542 602 : ctx->streaming = false;
543 602 : ctx->twophase = false;
544 : }
545 1304 : }
546 :
547 : /*
548 : * BEGIN callback.
549 : *
550 : * Don't send the BEGIN message here instead postpone it until the first
551 : * change. In logical replication, a common scenario is to replicate a set of
552 : * tables (instead of all tables) and transactions whose changes were on
553 : * the table(s) that are not published will produce empty transactions. These
554 : * empty transactions will send BEGIN and COMMIT messages to subscribers,
555 : * using bandwidth on something with little/no use for logical replication.
556 : */
557 : static void
558 1430 : pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
559 : {
560 1430 : PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
561 : sizeof(PGOutputTxnData));
562 :
563 1430 : txn->output_plugin_private = txndata;
564 1430 : }
565 :
566 : /*
567 : * Send BEGIN.
568 : *
569 : * This is called while processing the first change of the transaction.
570 : */
571 : static void
572 820 : pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
573 : {
574 820 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
575 820 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
576 :
577 : Assert(txndata);
578 : Assert(!txndata->sent_begin_txn);
579 :
580 820 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
581 820 : logicalrep_write_begin(ctx->out, txn);
582 820 : txndata->sent_begin_txn = true;
583 :
584 820 : send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
585 : send_replication_origin);
586 :
587 820 : OutputPluginWrite(ctx, true);
588 818 : }
589 :
590 : /*
591 : * COMMIT callback
592 : */
593 : static void
594 1422 : pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
595 : XLogRecPtr commit_lsn)
596 : {
597 1422 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
598 : bool sent_begin_txn;
599 :
600 : Assert(txndata);
601 :
602 : /*
603 : * We don't need to send the commit message unless some relevant change
604 : * from this transaction has been sent to the downstream.
605 : */
606 1422 : sent_begin_txn = txndata->sent_begin_txn;
607 1422 : OutputPluginUpdateProgress(ctx, !sent_begin_txn);
608 1422 : pfree(txndata);
609 1422 : txn->output_plugin_private = NULL;
610 :
611 1422 : if (!sent_begin_txn)
612 : {
613 604 : elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
614 604 : return;
615 : }
616 :
617 818 : OutputPluginPrepareWrite(ctx, true);
618 818 : logicalrep_write_commit(ctx->out, txn, commit_lsn);
619 818 : OutputPluginWrite(ctx, true);
620 : }
621 :
622 : /*
623 : * BEGIN PREPARE callback
624 : */
625 : static void
626 40 : pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
627 : {
628 40 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
629 :
630 40 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
631 40 : logicalrep_write_begin_prepare(ctx->out, txn);
632 :
633 40 : send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
634 : send_replication_origin);
635 :
636 40 : OutputPluginWrite(ctx, true);
637 40 : }
638 :
639 : /*
640 : * PREPARE callback
641 : */
642 : static void
643 40 : pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
644 : XLogRecPtr prepare_lsn)
645 : {
646 40 : OutputPluginUpdateProgress(ctx, false);
647 :
648 40 : OutputPluginPrepareWrite(ctx, true);
649 40 : logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
650 40 : OutputPluginWrite(ctx, true);
651 40 : }
652 :
653 : /*
654 : * COMMIT PREPARED callback
655 : */
656 : static void
657 48 : pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
658 : XLogRecPtr commit_lsn)
659 : {
660 48 : OutputPluginUpdateProgress(ctx, false);
661 :
662 48 : OutputPluginPrepareWrite(ctx, true);
663 48 : logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
664 48 : OutputPluginWrite(ctx, true);
665 48 : }
666 :
667 : /*
668 : * ROLLBACK PREPARED callback
669 : */
670 : static void
671 18 : pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
672 : ReorderBufferTXN *txn,
673 : XLogRecPtr prepare_end_lsn,
674 : TimestampTz prepare_time)
675 : {
676 18 : OutputPluginUpdateProgress(ctx, false);
677 :
678 18 : OutputPluginPrepareWrite(ctx, true);
679 18 : logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
680 : prepare_time);
681 18 : OutputPluginWrite(ctx, true);
682 18 : }
683 :
684 : /*
685 : * Write the current schema of the relation and its ancestor (if any) if not
686 : * done yet.
687 : */
688 : static void
689 364450 : maybe_send_schema(LogicalDecodingContext *ctx,
690 : ReorderBufferChange *change,
691 : Relation relation, RelationSyncEntry *relentry)
692 : {
693 364450 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
694 : bool schema_sent;
695 364450 : TransactionId xid = InvalidTransactionId;
696 364450 : TransactionId topxid = InvalidTransactionId;
697 :
698 : /*
699 : * Remember XID of the (sub)transaction for the change. We don't care if
700 : * it's top-level transaction or not (we have already sent that XID in
701 : * start of the current streaming block).
702 : *
703 : * If we're not in a streaming block, just use InvalidTransactionId and
704 : * the write methods will not include it.
705 : */
706 364450 : if (data->in_streaming)
707 351882 : xid = change->txn->xid;
708 :
709 364450 : if (rbtxn_is_subtxn(change->txn))
710 20338 : topxid = rbtxn_get_toptxn(change->txn)->xid;
711 : else
712 344112 : topxid = xid;
713 :
714 : /*
715 : * Do we need to send the schema? We do track streamed transactions
716 : * separately, because those may be applied later (and the regular
717 : * transactions won't see their effects until then) and in an order that
718 : * we don't know at this point.
719 : *
720 : * XXX There is a scope of optimization here. Currently, we always send
721 : * the schema first time in a streaming transaction but we can probably
722 : * avoid that by checking 'relentry->schema_sent' flag. However, before
723 : * doing that we need to study its impact on the case where we have a mix
724 : * of streaming and non-streaming transactions.
725 : */
726 364450 : if (data->in_streaming)
727 351882 : schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
728 : else
729 12568 : schema_sent = relentry->schema_sent;
730 :
731 : /* Nothing to do if we already sent the schema. */
732 364450 : if (schema_sent)
733 363836 : return;
734 :
735 : /*
736 : * Send the schema. If the changes will be published using an ancestor's
737 : * schema, not the relation's own, send that ancestor's schema before
738 : * sending relation's own (XXX - maybe sending only the former suffices?).
739 : */
740 614 : if (relentry->publish_as_relid != RelationGetRelid(relation))
741 : {
742 62 : Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
743 :
744 62 : send_relation_and_attrs(ancestor, xid, ctx, relentry);
745 62 : RelationClose(ancestor);
746 : }
747 :
748 614 : send_relation_and_attrs(relation, xid, ctx, relentry);
749 :
750 614 : if (data->in_streaming)
751 142 : set_schema_sent_in_streamed_txn(relentry, topxid);
752 : else
753 472 : relentry->schema_sent = true;
754 : }
755 :
756 : /*
757 : * Sends a relation
758 : */
759 : static void
760 676 : send_relation_and_attrs(Relation relation, TransactionId xid,
761 : LogicalDecodingContext *ctx,
762 : RelationSyncEntry *relentry)
763 : {
764 676 : TupleDesc desc = RelationGetDescr(relation);
765 676 : Bitmapset *columns = relentry->columns;
766 676 : bool include_gencols = relentry->include_gencols;
767 : int i;
768 :
769 : /*
770 : * Write out type info if needed. We do that only for user-created types.
771 : * We use FirstGenbkiObjectId as the cutoff, so that we only consider
772 : * objects with hand-assigned OIDs to be "built in", not for instance any
773 : * function or type defined in the information_schema. This is important
774 : * because only hand-assigned OIDs can be expected to remain stable across
775 : * major versions.
776 : */
777 2060 : for (i = 0; i < desc->natts; i++)
778 : {
779 1384 : Form_pg_attribute att = TupleDescAttr(desc, i);
780 :
781 1384 : if (!logicalrep_should_publish_column(att, columns, include_gencols))
782 136 : continue;
783 :
784 1248 : if (att->atttypid < FirstGenbkiObjectId)
785 1212 : continue;
786 :
787 36 : OutputPluginPrepareWrite(ctx, false);
788 36 : logicalrep_write_typ(ctx->out, xid, att->atttypid);
789 36 : OutputPluginWrite(ctx, false);
790 : }
791 :
792 676 : OutputPluginPrepareWrite(ctx, false);
793 676 : logicalrep_write_rel(ctx->out, xid, relation, columns, include_gencols);
794 676 : OutputPluginWrite(ctx, false);
795 676 : }
796 :
797 : /*
798 : * Executor state preparation for evaluation of row filter expressions for the
799 : * specified relation.
800 : */
801 : static EState *
802 32 : create_estate_for_relation(Relation rel)
803 : {
804 : EState *estate;
805 : RangeTblEntry *rte;
806 32 : List *perminfos = NIL;
807 :
808 32 : estate = CreateExecutorState();
809 :
810 32 : rte = makeNode(RangeTblEntry);
811 32 : rte->rtekind = RTE_RELATION;
812 32 : rte->relid = RelationGetRelid(rel);
813 32 : rte->relkind = rel->rd_rel->relkind;
814 32 : rte->rellockmode = AccessShareLock;
815 :
816 32 : addRTEPermissionInfo(&perminfos, rte);
817 :
818 32 : ExecInitRangeTable(estate, list_make1(rte), perminfos);
819 :
820 32 : estate->es_output_cid = GetCurrentCommandId(false);
821 :
822 32 : return estate;
823 : }
824 :
825 : /*
826 : * Evaluates row filter.
827 : *
828 : * If the row filter evaluates to NULL, it is taken as false i.e. the change
829 : * isn't replicated.
830 : */
831 : static bool
832 72 : pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
833 : {
834 : Datum ret;
835 : bool isnull;
836 :
837 : Assert(state != NULL);
838 :
839 72 : ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
840 :
841 72 : elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
842 : isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
843 : isnull ? "true" : "false");
844 :
845 72 : if (isnull)
846 2 : return false;
847 :
848 70 : return DatumGetBool(ret);
849 : }
850 :
851 : /*
852 : * Make sure the per-entry memory context exists.
853 : */
854 : static void
855 572 : pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
856 : {
857 : Relation relation;
858 :
859 : /* The context may already exist, in which case bail out. */
860 572 : if (entry->entry_cxt)
861 32 : return;
862 :
863 540 : relation = RelationIdGetRelation(entry->publish_as_relid);
864 :
865 540 : entry->entry_cxt = AllocSetContextCreate(data->cachectx,
866 : "entry private context",
867 : ALLOCSET_SMALL_SIZES);
868 :
869 540 : MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
870 : RelationGetRelationName(relation));
871 : }
872 :
873 : /*
874 : * Initialize the row filter.
875 : */
876 : static void
877 540 : pgoutput_row_filter_init(PGOutputData *data, List *publications,
878 : RelationSyncEntry *entry)
879 : {
880 : ListCell *lc;
881 540 : List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */
882 540 : bool no_filter[] = {false, false, false}; /* One per pubaction */
883 : MemoryContext oldctx;
884 : int idx;
885 540 : bool has_filter = true;
886 540 : Oid schemaid = get_rel_namespace(entry->publish_as_relid);
887 :
888 : /*
889 : * Find if there are any row filters for this relation. If there are, then
890 : * prepare the necessary ExprState and cache it in entry->exprstate. To
891 : * build an expression state, we need to ensure the following:
892 : *
893 : * All the given publication-table mappings must be checked.
894 : *
895 : * Multiple publications might have multiple row filters for this
896 : * relation. Since row filter usage depends on the DML operation, there
897 : * are multiple lists (one for each operation) to which row filters will
898 : * be appended.
899 : *
900 : * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row filter
901 : * expression" so it takes precedence.
902 : */
903 580 : foreach(lc, publications)
904 : {
905 548 : Publication *pub = lfirst(lc);
906 548 : HeapTuple rftuple = NULL;
907 548 : Datum rfdatum = 0;
908 548 : bool pub_no_filter = true;
909 :
910 : /*
911 : * If the publication is FOR ALL TABLES, or the publication includes a
912 : * FOR TABLES IN SCHEMA where the table belongs to the referred
913 : * schema, then it is treated the same as if there are no row filters
914 : * (even if other publications have a row filter).
915 : */
916 548 : if (!pub->alltables &&
917 388 : !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
918 : ObjectIdGetDatum(schemaid),
919 : ObjectIdGetDatum(pub->oid)))
920 : {
921 : /*
922 : * Check for the presence of a row filter in this publication.
923 : */
924 376 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
925 : ObjectIdGetDatum(entry->publish_as_relid),
926 : ObjectIdGetDatum(pub->oid));
927 :
928 376 : if (HeapTupleIsValid(rftuple))
929 : {
930 : /* Null indicates no filter. */
931 352 : rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
932 : Anum_pg_publication_rel_prqual,
933 : &pub_no_filter);
934 : }
935 : }
936 :
937 548 : if (pub_no_filter)
938 : {
939 522 : if (rftuple)
940 326 : ReleaseSysCache(rftuple);
941 :
942 522 : no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
943 522 : no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
944 522 : no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
945 :
946 : /*
947 : * Quick exit if all the DML actions are publicized via this
948 : * publication.
949 : */
950 522 : if (no_filter[PUBACTION_INSERT] &&
951 522 : no_filter[PUBACTION_UPDATE] &&
952 508 : no_filter[PUBACTION_DELETE])
953 : {
954 508 : has_filter = false;
955 508 : break;
956 : }
957 :
958 : /* No additional work for this publication. Next one. */
959 14 : continue;
960 : }
961 :
962 : /* Form the per pubaction row filter lists. */
963 26 : if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
964 26 : rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
965 26 : TextDatumGetCString(rfdatum));
966 26 : if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
967 26 : rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
968 26 : TextDatumGetCString(rfdatum));
969 26 : if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
970 26 : rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
971 26 : TextDatumGetCString(rfdatum));
972 :
973 26 : ReleaseSysCache(rftuple);
974 : } /* loop all subscribed publications */
975 :
976 : /* Clean the row filter */
977 2160 : for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
978 : {
979 1620 : if (no_filter[idx])
980 : {
981 1542 : list_free_deep(rfnodes[idx]);
982 1542 : rfnodes[idx] = NIL;
983 : }
984 : }
985 :
986 540 : if (has_filter)
987 : {
988 32 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
989 :
990 32 : pgoutput_ensure_entry_cxt(data, entry);
991 :
992 : /*
993 : * Now all the filters for all pubactions are known. Combine them when
994 : * their pubactions are the same.
995 : */
996 32 : oldctx = MemoryContextSwitchTo(entry->entry_cxt);
997 32 : entry->estate = create_estate_for_relation(relation);
998 128 : for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
999 : {
1000 96 : List *filters = NIL;
1001 : Expr *rfnode;
1002 :
1003 96 : if (rfnodes[idx] == NIL)
1004 42 : continue;
1005 :
1006 114 : foreach(lc, rfnodes[idx])
1007 60 : filters = lappend(filters, stringToNode((char *) lfirst(lc)));
1008 :
1009 : /* combine the row filter and cache the ExprState */
1010 54 : rfnode = make_orclause(filters);
1011 54 : entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
1012 : } /* for each pubaction */
1013 32 : MemoryContextSwitchTo(oldctx);
1014 :
1015 32 : RelationClose(relation);
1016 : }
1017 540 : }
1018 :
1019 : /*
1020 : * If the table contains a generated column, check for any conflicting
1021 : * values of 'publish_generated_columns' parameter in the publications.
1022 : */
1023 : static void
1024 540 : check_and_init_gencol(PGOutputData *data, List *publications,
1025 : RelationSyncEntry *entry)
1026 : {
1027 540 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1028 540 : TupleDesc desc = RelationGetDescr(relation);
1029 540 : bool gencolpresent = false;
1030 540 : bool first = true;
1031 :
1032 : /* Check if there is any generated column present. */
1033 1622 : for (int i = 0; i < desc->natts; i++)
1034 : {
1035 1094 : Form_pg_attribute att = TupleDescAttr(desc, i);
1036 :
1037 1094 : if (att->attgenerated)
1038 : {
1039 12 : gencolpresent = true;
1040 12 : break;
1041 : }
1042 : }
1043 :
1044 : /* There are no generated columns to be published. */
1045 540 : if (!gencolpresent)
1046 : {
1047 528 : entry->include_gencols = false;
1048 528 : return;
1049 : }
1050 :
1051 : /*
1052 : * There may be a conflicting value for 'publish_generated_columns'
1053 : * parameter in the publications.
1054 : */
1055 38 : foreach_ptr(Publication, pub, publications)
1056 : {
1057 : /*
1058 : * The column list takes precedence over the
1059 : * 'publish_generated_columns' parameter. Those will be checked later,
1060 : * see pgoutput_column_list_init.
1061 : */
1062 14 : if (check_and_fetch_column_list(pub, entry->publish_as_relid, NULL, NULL))
1063 6 : continue;
1064 :
1065 8 : if (first)
1066 : {
1067 8 : entry->include_gencols = pub->pubgencols;
1068 8 : first = false;
1069 : }
1070 0 : else if (entry->include_gencols != pub->pubgencols)
1071 0 : ereport(ERROR,
1072 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1073 : errmsg("cannot use different values of publish_generated_columns for table \"%s.%s\" in different publications",
1074 : get_namespace_name(RelationGetNamespace(relation)),
1075 : RelationGetRelationName(relation)));
1076 : }
1077 : }
1078 :
1079 : /*
1080 : * Initialize the column list.
1081 : */
1082 : static void
1083 540 : pgoutput_column_list_init(PGOutputData *data, List *publications,
1084 : RelationSyncEntry *entry)
1085 : {
1086 : ListCell *lc;
1087 540 : bool first = true;
1088 540 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1089 540 : bool found_pub_collist = false;
1090 540 : Bitmapset *relcols = NULL;
1091 :
1092 540 : pgoutput_ensure_entry_cxt(data, entry);
1093 :
1094 : /*
1095 : * Find if there are any column lists for this relation. If there are,
1096 : * build a bitmap using the column lists.
1097 : *
1098 : * Multiple publications might have multiple column lists for this
1099 : * relation.
1100 : *
1101 : * Note that we don't support the case where the column list is different
1102 : * for the same table when combining publications. See comments atop
1103 : * fetch_table_list. But one can later change the publication so we still
1104 : * need to check all the given publication-table mappings and report an
1105 : * error if any publications have a different column list.
1106 : */
1107 1100 : foreach(lc, publications)
1108 : {
1109 562 : Publication *pub = lfirst(lc);
1110 562 : Bitmapset *cols = NULL;
1111 :
1112 : /* Retrieve the bitmap of columns for a column list publication. */
1113 562 : found_pub_collist |= check_and_fetch_column_list(pub,
1114 : entry->publish_as_relid,
1115 : entry->entry_cxt, &cols);
1116 :
1117 : /*
1118 : * For non-column list publications — e.g. TABLE (without a column
1119 : * list), ALL TABLES, or ALL TABLES IN SCHEMA, we consider all columns
1120 : * of the table (including generated columns when
1121 : * 'publish_generated_columns' parameter is true).
1122 : */
1123 562 : if (!cols)
1124 : {
1125 : /*
1126 : * Cache the table columns for the first publication with no
1127 : * specified column list to detect publication with a different
1128 : * column list.
1129 : */
1130 484 : if (!relcols && (list_length(publications) > 1))
1131 : {
1132 18 : MemoryContext oldcxt = MemoryContextSwitchTo(entry->entry_cxt);
1133 :
1134 18 : relcols = pub_form_cols_map(relation, entry->include_gencols);
1135 18 : MemoryContextSwitchTo(oldcxt);
1136 : }
1137 :
1138 484 : cols = relcols;
1139 : }
1140 :
1141 562 : if (first)
1142 : {
1143 540 : entry->columns = cols;
1144 540 : first = false;
1145 : }
1146 22 : else if (!bms_equal(entry->columns, cols))
1147 2 : ereport(ERROR,
1148 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1149 : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
1150 : get_namespace_name(RelationGetNamespace(relation)),
1151 : RelationGetRelationName(relation)));
1152 : } /* loop all subscribed publications */
1153 :
1154 : /*
1155 : * If no column list publications exist, columns to be published will be
1156 : * computed later according to the 'publish_generated_columns' parameter.
1157 : */
1158 538 : if (!found_pub_collist)
1159 466 : entry->columns = NULL;
1160 :
1161 538 : RelationClose(relation);
1162 538 : }
1163 :
1164 : /*
1165 : * Initialize the slot for storing new and old tuples, and build the map that
1166 : * will be used to convert the relation's tuples into the ancestor's format.
1167 : */
1168 : static void
1169 540 : init_tuple_slot(PGOutputData *data, Relation relation,
1170 : RelationSyncEntry *entry)
1171 : {
1172 : MemoryContext oldctx;
1173 : TupleDesc oldtupdesc;
1174 : TupleDesc newtupdesc;
1175 :
1176 540 : oldctx = MemoryContextSwitchTo(data->cachectx);
1177 :
1178 : /*
1179 : * Create tuple table slots. Create a copy of the TupleDesc as it needs to
1180 : * live as long as the cache remains.
1181 : */
1182 540 : oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1183 540 : newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1184 :
1185 540 : entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
1186 540 : entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
1187 :
1188 540 : MemoryContextSwitchTo(oldctx);
1189 :
1190 : /*
1191 : * Cache the map that will be used to convert the relation's tuples into
1192 : * the ancestor's format, if needed.
1193 : */
1194 540 : if (entry->publish_as_relid != RelationGetRelid(relation))
1195 : {
1196 72 : Relation ancestor = RelationIdGetRelation(entry->publish_as_relid);
1197 72 : TupleDesc indesc = RelationGetDescr(relation);
1198 72 : TupleDesc outdesc = RelationGetDescr(ancestor);
1199 :
1200 : /* Map must live as long as the logical decoding context. */
1201 72 : oldctx = MemoryContextSwitchTo(data->cachectx);
1202 :
1203 72 : entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
1204 :
1205 72 : MemoryContextSwitchTo(oldctx);
1206 72 : RelationClose(ancestor);
1207 : }
1208 540 : }
1209 :
1210 : /*
1211 : * Change is checked against the row filter if any.
1212 : *
1213 : * Returns true if the change is to be replicated, else false.
1214 : *
1215 : * For inserts, evaluate the row filter for new tuple.
1216 : * For deletes, evaluate the row filter for old tuple.
1217 : * For updates, evaluate the row filter for old and new tuple.
1218 : *
1219 : * For updates, if both evaluations are true, we allow sending the UPDATE and
1220 : * if both the evaluations are false, it doesn't replicate the UPDATE. Now, if
1221 : * only one of the tuples matches the row filter expression, we transform
1222 : * UPDATE to DELETE or INSERT to avoid any data inconsistency based on the
1223 : * following rules:
1224 : *
1225 : * Case 1: old-row (no match) new-row (no match) -> (drop change)
1226 : * Case 2: old-row (no match) new row (match) -> INSERT
1227 : * Case 3: old-row (match) new-row (no match) -> DELETE
1228 : * Case 4: old-row (match) new row (match) -> UPDATE
1229 : *
1230 : * The new action is updated in the action parameter.
1231 : *
1232 : * The new slot could be updated when transforming the UPDATE into INSERT,
1233 : * because the original new tuple might not have column values from the replica
1234 : * identity.
1235 : *
1236 : * Examples:
1237 : * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
1238 : * Since the old tuple satisfies, the initial table synchronization copied this
1239 : * row (or another method was used to guarantee that there is data
1240 : * consistency). However, after the UPDATE the new tuple doesn't satisfy the
1241 : * row filter, so from a data consistency perspective, that row should be
1242 : * removed on the subscriber. The UPDATE should be transformed into a DELETE
1243 : * statement and be sent to the subscriber. Keeping this row on the subscriber
1244 : * is undesirable because it doesn't reflect what was defined in the row filter
1245 : * expression on the publisher. This row on the subscriber would likely not be
1246 : * modified by replication again. If someone inserted a new row with the same
1247 : * old identifier, replication could stop due to a constraint violation.
1248 : *
1249 : * Let's say the old tuple doesn't match the row filter but the new tuple does.
1250 : * Since the old tuple doesn't satisfy, the initial table synchronization
1251 : * probably didn't copy this row. However, after the UPDATE the new tuple does
1252 : * satisfy the row filter, so from a data consistency perspective, that row
1253 : * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
1254 : * statements have no effect (it matches no row -- see
1255 : * apply_handle_update_internal()). So, the UPDATE should be transformed into a
1256 : * INSERT statement and be sent to the subscriber. However, this might surprise
1257 : * someone who expects the data set to satisfy the row filter expression on the
1258 : * provider.
1259 : */
1260 : static bool
1261 364442 : pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
1262 : TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
1263 : ReorderBufferChangeType *action)
1264 : {
1265 : TupleDesc desc;
1266 : int i;
1267 : bool old_matched,
1268 : new_matched,
1269 : result;
1270 : TupleTableSlot *tmp_new_slot;
1271 364442 : TupleTableSlot *new_slot = *new_slot_ptr;
1272 : ExprContext *ecxt;
1273 : ExprState *filter_exprstate;
1274 :
1275 : /*
1276 : * We need this map to avoid relying on ReorderBufferChangeType enums
1277 : * having specific values.
1278 : */
1279 : static const int map_changetype_pubaction[] = {
1280 : [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
1281 : [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
1282 : [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
1283 : };
1284 :
1285 : Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
1286 : *action == REORDER_BUFFER_CHANGE_UPDATE ||
1287 : *action == REORDER_BUFFER_CHANGE_DELETE);
1288 :
1289 : Assert(new_slot || old_slot);
1290 :
1291 : /* Get the corresponding row filter */
1292 364442 : filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
1293 :
1294 : /* Bail out if there is no row filter */
1295 364442 : if (!filter_exprstate)
1296 364378 : return true;
1297 :
1298 64 : elog(DEBUG3, "table \"%s.%s\" has row filter",
1299 : get_namespace_name(RelationGetNamespace(relation)),
1300 : RelationGetRelationName(relation));
1301 :
1302 64 : ResetPerTupleExprContext(entry->estate);
1303 :
1304 64 : ecxt = GetPerTupleExprContext(entry->estate);
1305 :
1306 : /*
1307 : * For the following occasions where there is only one tuple, we can
1308 : * evaluate the row filter for that tuple and return.
1309 : *
1310 : * For inserts, we only have the new tuple.
1311 : *
1312 : * For updates, we can have only a new tuple when none of the replica
1313 : * identity columns changed and none of those columns have external data
1314 : * but we still need to evaluate the row filter for the new tuple as the
1315 : * existing values of those columns might not match the filter. Also,
1316 : * users can use constant expressions in the row filter, so we anyway need
1317 : * to evaluate it for the new tuple.
1318 : *
1319 : * For deletes, we only have the old tuple.
1320 : */
1321 64 : if (!new_slot || !old_slot)
1322 : {
1323 56 : ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
1324 56 : result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1325 :
1326 56 : return result;
1327 : }
1328 :
1329 : /*
1330 : * Both the old and new tuples must be valid only for updates and need to
1331 : * be checked against the row filter.
1332 : */
1333 : Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
1334 :
1335 8 : slot_getallattrs(new_slot);
1336 8 : slot_getallattrs(old_slot);
1337 :
1338 8 : tmp_new_slot = NULL;
1339 8 : desc = RelationGetDescr(relation);
1340 :
1341 : /*
1342 : * The new tuple might not have all the replica identity columns, in which
1343 : * case it needs to be copied over from the old tuple.
1344 : */
1345 24 : for (i = 0; i < desc->natts; i++)
1346 : {
1347 16 : CompactAttribute *att = TupleDescCompactAttr(desc, i);
1348 :
1349 : /*
1350 : * if the column in the new tuple or old tuple is null, nothing to do
1351 : */
1352 16 : if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
1353 2 : continue;
1354 :
1355 : /*
1356 : * Unchanged toasted replica identity columns are only logged in the
1357 : * old tuple. Copy this over to the new tuple. The changed (or WAL
1358 : * Logged) toast values are always assembled in memory and set as
1359 : * VARTAG_INDIRECT. See ReorderBufferToastReplace.
1360 : */
1361 14 : if (att->attlen == -1 &&
1362 8 : VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
1363 2 : !VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
1364 : {
1365 2 : if (!tmp_new_slot)
1366 : {
1367 2 : tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
1368 2 : ExecClearTuple(tmp_new_slot);
1369 :
1370 2 : memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
1371 2 : desc->natts * sizeof(Datum));
1372 2 : memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
1373 2 : desc->natts * sizeof(bool));
1374 : }
1375 :
1376 2 : tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
1377 2 : tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
1378 : }
1379 : }
1380 :
1381 8 : ecxt->ecxt_scantuple = old_slot;
1382 8 : old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1383 :
1384 8 : if (tmp_new_slot)
1385 : {
1386 2 : ExecStoreVirtualTuple(tmp_new_slot);
1387 2 : ecxt->ecxt_scantuple = tmp_new_slot;
1388 : }
1389 : else
1390 6 : ecxt->ecxt_scantuple = new_slot;
1391 :
1392 8 : new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1393 :
1394 : /*
1395 : * Case 1: if both tuples don't match the row filter, bailout. Send
1396 : * nothing.
1397 : */
1398 8 : if (!old_matched && !new_matched)
1399 0 : return false;
1400 :
1401 : /*
1402 : * Case 2: if the old tuple doesn't satisfy the row filter but the new
1403 : * tuple does, transform the UPDATE into INSERT.
1404 : *
1405 : * Use the newly transformed tuple that must contain the column values for
1406 : * all the replica identity columns. This is required to ensure that the
1407 : * while inserting the tuple in the downstream node, we have all the
1408 : * required column values.
1409 : */
1410 8 : if (!old_matched && new_matched)
1411 : {
1412 4 : *action = REORDER_BUFFER_CHANGE_INSERT;
1413 :
1414 4 : if (tmp_new_slot)
1415 2 : *new_slot_ptr = tmp_new_slot;
1416 : }
1417 :
1418 : /*
1419 : * Case 3: if the old tuple satisfies the row filter but the new tuple
1420 : * doesn't, transform the UPDATE into DELETE.
1421 : *
1422 : * This transformation does not require another tuple. The Old tuple will
1423 : * be used for DELETE.
1424 : */
1425 4 : else if (old_matched && !new_matched)
1426 2 : *action = REORDER_BUFFER_CHANGE_DELETE;
1427 :
1428 : /*
1429 : * Case 4: if both tuples match the row filter, transformation isn't
1430 : * required. (*action is default UPDATE).
1431 : */
1432 :
1433 8 : return true;
1434 : }
1435 :
1436 : /*
1437 : * Sends the decoded DML over wire.
1438 : *
1439 : * This is called both in streaming and non-streaming modes.
1440 : */
1441 : static void
1442 366724 : pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1443 : Relation relation, ReorderBufferChange *change)
1444 : {
1445 366724 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1446 366724 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1447 : MemoryContext old;
1448 : RelationSyncEntry *relentry;
1449 366724 : TransactionId xid = InvalidTransactionId;
1450 366724 : Relation ancestor = NULL;
1451 366724 : Relation targetrel = relation;
1452 366724 : ReorderBufferChangeType action = change->action;
1453 366724 : TupleTableSlot *old_slot = NULL;
1454 366724 : TupleTableSlot *new_slot = NULL;
1455 :
1456 366724 : if (!is_publishable_relation(relation))
1457 2276 : return;
1458 :
1459 : /*
1460 : * Remember the xid for the change in streaming mode. We need to send xid
1461 : * with each change in the streaming mode so that subscriber can make
1462 : * their association and on aborts, it can discard the corresponding
1463 : * changes.
1464 : */
1465 366724 : if (data->in_streaming)
1466 351882 : xid = change->txn->xid;
1467 :
1468 366724 : relentry = get_rel_sync_entry(data, relation);
1469 :
1470 : /* First check the table filter */
1471 366718 : switch (action)
1472 : {
1473 211900 : case REORDER_BUFFER_CHANGE_INSERT:
1474 211900 : if (!relentry->pubactions.pubinsert)
1475 106 : return;
1476 211794 : break;
1477 68972 : case REORDER_BUFFER_CHANGE_UPDATE:
1478 68972 : if (!relentry->pubactions.pubupdate)
1479 88 : return;
1480 68884 : break;
1481 85846 : case REORDER_BUFFER_CHANGE_DELETE:
1482 85846 : if (!relentry->pubactions.pubdelete)
1483 2082 : return;
1484 :
1485 : /*
1486 : * This is only possible if deletes are allowed even when replica
1487 : * identity is not defined for a table. Since the DELETE action
1488 : * can't be published, we simply return.
1489 : */
1490 83764 : if (!change->data.tp.oldtuple)
1491 : {
1492 0 : elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
1493 0 : return;
1494 : }
1495 83764 : break;
1496 364442 : default:
1497 : Assert(false);
1498 : }
1499 :
1500 : /* Avoid leaking memory by using and resetting our own context */
1501 364442 : old = MemoryContextSwitchTo(data->context);
1502 :
1503 : /* Switch relation if publishing via root. */
1504 364442 : if (relentry->publish_as_relid != RelationGetRelid(relation))
1505 : {
1506 : Assert(relation->rd_rel->relispartition);
1507 138 : ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1508 138 : targetrel = ancestor;
1509 : }
1510 :
1511 364442 : if (change->data.tp.oldtuple)
1512 : {
1513 84024 : old_slot = relentry->old_slot;
1514 84024 : ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false);
1515 :
1516 : /* Convert tuple if needed. */
1517 84024 : if (relentry->attrmap)
1518 : {
1519 10 : TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
1520 : &TTSOpsVirtual);
1521 :
1522 10 : old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
1523 : }
1524 : }
1525 :
1526 364442 : if (change->data.tp.newtuple)
1527 : {
1528 280678 : new_slot = relentry->new_slot;
1529 280678 : ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false);
1530 :
1531 : /* Convert tuple if needed. */
1532 280678 : if (relentry->attrmap)
1533 : {
1534 38 : TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
1535 : &TTSOpsVirtual);
1536 :
1537 38 : new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
1538 : }
1539 : }
1540 :
1541 : /*
1542 : * Check row filter.
1543 : *
1544 : * Updates could be transformed to inserts or deletes based on the results
1545 : * of the row filter for old and new tuple.
1546 : */
1547 364442 : if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
1548 22 : goto cleanup;
1549 :
1550 : /*
1551 : * Send BEGIN if we haven't yet.
1552 : *
1553 : * We send the BEGIN message after ensuring that we will actually send the
1554 : * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
1555 : * transactions.
1556 : */
1557 364420 : if (txndata && !txndata->sent_begin_txn)
1558 796 : pgoutput_send_begin(ctx, txn);
1559 :
1560 : /*
1561 : * Schema should be sent using the original relation because it also sends
1562 : * the ancestor's relation.
1563 : */
1564 364418 : maybe_send_schema(ctx, change, relation, relentry);
1565 :
1566 364418 : OutputPluginPrepareWrite(ctx, true);
1567 :
1568 : /* Send the data */
1569 364418 : switch (action)
1570 : {
1571 211774 : case REORDER_BUFFER_CHANGE_INSERT:
1572 211774 : logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
1573 211774 : data->binary, relentry->columns,
1574 211774 : relentry->include_gencols);
1575 211774 : break;
1576 68878 : case REORDER_BUFFER_CHANGE_UPDATE:
1577 68878 : logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
1578 68878 : new_slot, data->binary, relentry->columns,
1579 68878 : relentry->include_gencols);
1580 68878 : break;
1581 83766 : case REORDER_BUFFER_CHANGE_DELETE:
1582 83766 : logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
1583 83766 : data->binary, relentry->columns,
1584 83766 : relentry->include_gencols);
1585 83766 : break;
1586 364418 : default:
1587 : Assert(false);
1588 : }
1589 :
1590 364418 : OutputPluginWrite(ctx, true);
1591 :
1592 364440 : cleanup:
1593 364440 : if (RelationIsValid(ancestor))
1594 : {
1595 136 : RelationClose(ancestor);
1596 136 : ancestor = NULL;
1597 : }
1598 :
1599 : /* Drop the new slots that were used to store the converted tuples. */
1600 364440 : if (relentry->attrmap)
1601 : {
1602 48 : if (old_slot)
1603 10 : ExecDropSingleTupleTableSlot(old_slot);
1604 :
1605 48 : if (new_slot)
1606 38 : ExecDropSingleTupleTableSlot(new_slot);
1607 : }
1608 :
1609 364440 : MemoryContextSwitchTo(old);
1610 364440 : MemoryContextReset(data->context);
1611 : }
1612 :
1613 : static void
1614 28 : pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1615 : int nrelations, Relation relations[], ReorderBufferChange *change)
1616 : {
1617 28 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1618 28 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1619 : MemoryContext old;
1620 : RelationSyncEntry *relentry;
1621 : int i;
1622 : int nrelids;
1623 : Oid *relids;
1624 28 : TransactionId xid = InvalidTransactionId;
1625 :
1626 : /* Remember the xid for the change in streaming mode. See pgoutput_change. */
1627 28 : if (data->in_streaming)
1628 0 : xid = change->txn->xid;
1629 :
1630 28 : old = MemoryContextSwitchTo(data->context);
1631 :
1632 28 : relids = palloc0(nrelations * sizeof(Oid));
1633 28 : nrelids = 0;
1634 :
1635 94 : for (i = 0; i < nrelations; i++)
1636 : {
1637 66 : Relation relation = relations[i];
1638 66 : Oid relid = RelationGetRelid(relation);
1639 :
1640 66 : if (!is_publishable_relation(relation))
1641 0 : continue;
1642 :
1643 66 : relentry = get_rel_sync_entry(data, relation);
1644 :
1645 66 : if (!relentry->pubactions.pubtruncate)
1646 28 : continue;
1647 :
1648 : /*
1649 : * Don't send partitions if the publication wants to send only the
1650 : * root tables through it.
1651 : */
1652 38 : if (relation->rd_rel->relispartition &&
1653 30 : relentry->publish_as_relid != relid)
1654 6 : continue;
1655 :
1656 32 : relids[nrelids++] = relid;
1657 :
1658 : /* Send BEGIN if we haven't yet */
1659 32 : if (txndata && !txndata->sent_begin_txn)
1660 20 : pgoutput_send_begin(ctx, txn);
1661 :
1662 32 : maybe_send_schema(ctx, change, relation, relentry);
1663 : }
1664 :
1665 28 : if (nrelids > 0)
1666 : {
1667 20 : OutputPluginPrepareWrite(ctx, true);
1668 20 : logicalrep_write_truncate(ctx->out,
1669 : xid,
1670 : nrelids,
1671 : relids,
1672 20 : change->data.truncate.cascade,
1673 20 : change->data.truncate.restart_seqs);
1674 20 : OutputPluginWrite(ctx, true);
1675 : }
1676 :
1677 28 : MemoryContextSwitchTo(old);
1678 28 : MemoryContextReset(data->context);
1679 28 : }
1680 :
1681 : static void
1682 14 : pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1683 : XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
1684 : const char *message)
1685 : {
1686 14 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1687 14 : TransactionId xid = InvalidTransactionId;
1688 :
1689 14 : if (!data->messages)
1690 4 : return;
1691 :
1692 : /*
1693 : * Remember the xid for the message in streaming mode. See
1694 : * pgoutput_change.
1695 : */
1696 10 : if (data->in_streaming)
1697 0 : xid = txn->xid;
1698 :
1699 : /*
1700 : * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
1701 : */
1702 10 : if (transactional)
1703 : {
1704 4 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1705 :
1706 : /* Send BEGIN if we haven't yet */
1707 4 : if (txndata && !txndata->sent_begin_txn)
1708 4 : pgoutput_send_begin(ctx, txn);
1709 : }
1710 :
1711 10 : OutputPluginPrepareWrite(ctx, true);
1712 10 : logicalrep_write_message(ctx->out,
1713 : xid,
1714 : message_lsn,
1715 : transactional,
1716 : prefix,
1717 : sz,
1718 : message);
1719 10 : OutputPluginWrite(ctx, true);
1720 : }
1721 :
1722 : /*
1723 : * Return true if the data is associated with an origin and the user has
1724 : * requested the changes that don't have an origin, false otherwise.
1725 : */
1726 : static bool
1727 982186 : pgoutput_origin_filter(LogicalDecodingContext *ctx,
1728 : RepOriginId origin_id)
1729 : {
1730 982186 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1731 :
1732 982186 : if (data->publish_no_origin && origin_id != InvalidRepOriginId)
1733 140 : return true;
1734 :
1735 982046 : return false;
1736 : }
1737 :
1738 : /*
1739 : * Shutdown the output plugin.
1740 : *
1741 : * Note, we don't need to clean the data->context, data->cachectx, and
1742 : * data->pubctx as they are child contexts of the ctx->context so they
1743 : * will be cleaned up by logical decoding machinery.
1744 : */
1745 : static void
1746 966 : pgoutput_shutdown(LogicalDecodingContext *ctx)
1747 : {
1748 966 : if (RelationSyncCache)
1749 : {
1750 364 : hash_destroy(RelationSyncCache);
1751 364 : RelationSyncCache = NULL;
1752 : }
1753 966 : }
1754 :
1755 : /*
1756 : * Load publications from the list of publication names.
1757 : */
1758 : static List *
1759 316 : LoadPublications(List *pubnames)
1760 : {
1761 316 : List *result = NIL;
1762 : ListCell *lc;
1763 :
1764 714 : foreach(lc, pubnames)
1765 : {
1766 402 : char *pubname = (char *) lfirst(lc);
1767 402 : Publication *pub = GetPublicationByName(pubname, false);
1768 :
1769 398 : result = lappend(result, pub);
1770 : }
1771 :
1772 312 : return result;
1773 : }
1774 :
1775 : /*
1776 : * Publication syscache invalidation callback.
1777 : *
1778 : * Called for invalidations on pg_publication.
1779 : */
1780 : static void
1781 458 : publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
1782 : {
1783 458 : publications_valid = false;
1784 :
1785 : /*
1786 : * Also invalidate per-relation cache so that next time the filtering info
1787 : * is checked it will be updated with the new publication settings.
1788 : */
1789 458 : rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
1790 458 : }
1791 :
1792 : /*
1793 : * START STREAM callback
1794 : */
1795 : static void
1796 1274 : pgoutput_stream_start(struct LogicalDecodingContext *ctx,
1797 : ReorderBufferTXN *txn)
1798 : {
1799 1274 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1800 1274 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
1801 :
1802 : /* we can't nest streaming of transactions */
1803 : Assert(!data->in_streaming);
1804 :
1805 : /*
1806 : * If we already sent the first stream for this transaction then don't
1807 : * send the origin id in the subsequent streams.
1808 : */
1809 1274 : if (rbtxn_is_streamed(txn))
1810 1148 : send_replication_origin = false;
1811 :
1812 1274 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
1813 1274 : logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
1814 :
1815 1274 : send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
1816 : send_replication_origin);
1817 :
1818 1274 : OutputPluginWrite(ctx, true);
1819 :
1820 : /* we're streaming a chunk of transaction now */
1821 1274 : data->in_streaming = true;
1822 1274 : }
1823 :
1824 : /*
1825 : * STOP STREAM callback
1826 : */
1827 : static void
1828 1274 : pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
1829 : ReorderBufferTXN *txn)
1830 : {
1831 1274 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1832 :
1833 : /* we should be streaming a transaction */
1834 : Assert(data->in_streaming);
1835 :
1836 1274 : OutputPluginPrepareWrite(ctx, true);
1837 1274 : logicalrep_write_stream_stop(ctx->out);
1838 1274 : OutputPluginWrite(ctx, true);
1839 :
1840 : /* we've stopped streaming a transaction */
1841 1274 : data->in_streaming = false;
1842 1274 : }
1843 :
1844 : /*
1845 : * Notify downstream to discard the streamed transaction (along with all
1846 : * it's subtransactions, if it's a toplevel transaction).
1847 : */
1848 : static void
1849 52 : pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
1850 : ReorderBufferTXN *txn,
1851 : XLogRecPtr abort_lsn)
1852 : {
1853 : ReorderBufferTXN *toptxn;
1854 52 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1855 52 : bool write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
1856 :
1857 : /*
1858 : * The abort should happen outside streaming block, even for streamed
1859 : * transactions. The transaction has to be marked as streamed, though.
1860 : */
1861 : Assert(!data->in_streaming);
1862 :
1863 : /* determine the toplevel transaction */
1864 52 : toptxn = rbtxn_get_toptxn(txn);
1865 :
1866 : Assert(rbtxn_is_streamed(toptxn));
1867 :
1868 52 : OutputPluginPrepareWrite(ctx, true);
1869 52 : logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
1870 : txn->xact_time.abort_time, write_abort_info);
1871 :
1872 52 : OutputPluginWrite(ctx, true);
1873 :
1874 52 : cleanup_rel_sync_cache(toptxn->xid, false);
1875 52 : }
1876 :
1877 : /*
1878 : * Notify downstream to apply the streamed transaction (along with all
1879 : * it's subtransactions).
1880 : */
1881 : static void
1882 92 : pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
1883 : ReorderBufferTXN *txn,
1884 : XLogRecPtr commit_lsn)
1885 : {
1886 92 : PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private;
1887 :
1888 : /*
1889 : * The commit should happen outside streaming block, even for streamed
1890 : * transactions. The transaction has to be marked as streamed, though.
1891 : */
1892 : Assert(!data->in_streaming);
1893 : Assert(rbtxn_is_streamed(txn));
1894 :
1895 92 : OutputPluginUpdateProgress(ctx, false);
1896 :
1897 92 : OutputPluginPrepareWrite(ctx, true);
1898 92 : logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
1899 92 : OutputPluginWrite(ctx, true);
1900 :
1901 92 : cleanup_rel_sync_cache(txn->xid, true);
1902 92 : }
1903 :
1904 : /*
1905 : * PREPARE callback (for streaming two-phase commit).
1906 : *
1907 : * Notify the downstream to prepare the transaction.
1908 : */
1909 : static void
1910 28 : pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
1911 : ReorderBufferTXN *txn,
1912 : XLogRecPtr prepare_lsn)
1913 : {
1914 : Assert(rbtxn_is_streamed(txn));
1915 :
1916 28 : OutputPluginUpdateProgress(ctx, false);
1917 28 : OutputPluginPrepareWrite(ctx, true);
1918 28 : logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
1919 28 : OutputPluginWrite(ctx, true);
1920 28 : }
1921 :
1922 : /*
1923 : * Initialize the relation schema sync cache for a decoding session.
1924 : *
1925 : * The hash table is destroyed at the end of a decoding session. While
1926 : * relcache invalidations still exist and will still be invoked, they
1927 : * will just see the null hash table global and take no action.
1928 : */
1929 : static void
1930 702 : init_rel_sync_cache(MemoryContext cachectx)
1931 : {
1932 : HASHCTL ctl;
1933 : static bool relation_callbacks_registered = false;
1934 :
1935 : /* Nothing to do if hash table already exists */
1936 702 : if (RelationSyncCache != NULL)
1937 2 : return;
1938 :
1939 : /* Make a new hash table for the cache */
1940 702 : ctl.keysize = sizeof(Oid);
1941 702 : ctl.entrysize = sizeof(RelationSyncEntry);
1942 702 : ctl.hcxt = cachectx;
1943 :
1944 702 : RelationSyncCache = hash_create("logical replication output relation cache",
1945 : 128, &ctl,
1946 : HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
1947 :
1948 : Assert(RelationSyncCache != NULL);
1949 :
1950 : /* No more to do if we already registered callbacks */
1951 702 : if (relation_callbacks_registered)
1952 2 : return;
1953 :
1954 : /* We must update the cache entry for a relation after a relcache flush */
1955 700 : CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
1956 :
1957 : /*
1958 : * Flush all cache entries after a pg_namespace change, in case it was a
1959 : * schema rename affecting a relation being replicated.
1960 : */
1961 700 : CacheRegisterSyscacheCallback(NAMESPACEOID,
1962 : rel_sync_cache_publication_cb,
1963 : (Datum) 0);
1964 :
1965 : /*
1966 : * Flush all cache entries after any publication changes. (We need no
1967 : * callback entry for pg_publication, because publication_invalidation_cb
1968 : * will take care of it.)
1969 : */
1970 700 : CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
1971 : rel_sync_cache_publication_cb,
1972 : (Datum) 0);
1973 700 : CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
1974 : rel_sync_cache_publication_cb,
1975 : (Datum) 0);
1976 :
1977 700 : relation_callbacks_registered = true;
1978 : }
1979 :
1980 : /*
1981 : * We expect relatively small number of streamed transactions.
1982 : */
1983 : static bool
1984 351882 : get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
1985 : {
1986 351882 : return list_member_xid(entry->streamed_txns, xid);
1987 : }
1988 :
1989 : /*
1990 : * Add the xid in the rel sync entry for which we have already sent the schema
1991 : * of the relation.
1992 : */
1993 : static void
1994 142 : set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
1995 : {
1996 : MemoryContext oldctx;
1997 :
1998 142 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1999 :
2000 142 : entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
2001 :
2002 142 : MemoryContextSwitchTo(oldctx);
2003 142 : }
2004 :
2005 : /*
2006 : * Find or create entry in the relation schema cache.
2007 : *
2008 : * This looks up publications that the given relation is directly or
2009 : * indirectly part of (the latter if it's really the relation's ancestor that
2010 : * is part of a publication) and fills up the found entry with the information
2011 : * about which operations to publish and whether to use an ancestor's schema
2012 : * when publishing.
2013 : */
2014 : static RelationSyncEntry *
2015 366790 : get_rel_sync_entry(PGOutputData *data, Relation relation)
2016 : {
2017 : RelationSyncEntry *entry;
2018 : bool found;
2019 : MemoryContext oldctx;
2020 366790 : Oid relid = RelationGetRelid(relation);
2021 :
2022 : Assert(RelationSyncCache != NULL);
2023 :
2024 : /* Find cached relation info, creating if not found */
2025 366790 : entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
2026 : &relid,
2027 : HASH_ENTER, &found);
2028 : Assert(entry != NULL);
2029 :
2030 : /* initialize entry, if it's new */
2031 366790 : if (!found)
2032 : {
2033 508 : entry->replicate_valid = false;
2034 508 : entry->schema_sent = false;
2035 508 : entry->include_gencols = false;
2036 508 : entry->streamed_txns = NIL;
2037 508 : entry->pubactions.pubinsert = entry->pubactions.pubupdate =
2038 508 : entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
2039 508 : entry->new_slot = NULL;
2040 508 : entry->old_slot = NULL;
2041 508 : memset(entry->exprstate, 0, sizeof(entry->exprstate));
2042 508 : entry->entry_cxt = NULL;
2043 508 : entry->publish_as_relid = InvalidOid;
2044 508 : entry->columns = NULL;
2045 508 : entry->attrmap = NULL;
2046 : }
2047 :
2048 : /* Validate the entry */
2049 366790 : if (!entry->replicate_valid)
2050 : {
2051 640 : Oid schemaId = get_rel_namespace(relid);
2052 640 : List *pubids = GetRelationPublications(relid);
2053 :
2054 : /*
2055 : * We don't acquire a lock on the namespace system table as we build
2056 : * the cache entry using a historic snapshot and all the later changes
2057 : * are absorbed while decoding WAL.
2058 : */
2059 640 : List *schemaPubids = GetSchemaPublications(schemaId);
2060 : ListCell *lc;
2061 640 : Oid publish_as_relid = relid;
2062 640 : int publish_ancestor_level = 0;
2063 640 : bool am_partition = get_rel_relispartition(relid);
2064 640 : char relkind = get_rel_relkind(relid);
2065 640 : List *rel_publications = NIL;
2066 :
2067 : /* Reload publications if needed before use. */
2068 640 : if (!publications_valid)
2069 : {
2070 316 : MemoryContextReset(data->pubctx);
2071 :
2072 316 : oldctx = MemoryContextSwitchTo(data->pubctx);
2073 316 : data->publications = LoadPublications(data->publication_names);
2074 312 : MemoryContextSwitchTo(oldctx);
2075 312 : publications_valid = true;
2076 : }
2077 :
2078 : /*
2079 : * Reset schema_sent status as the relation definition may have
2080 : * changed. Also reset pubactions to empty in case rel was dropped
2081 : * from a publication. Also free any objects that depended on the
2082 : * earlier definition.
2083 : */
2084 636 : entry->schema_sent = false;
2085 636 : entry->include_gencols = false;
2086 636 : list_free(entry->streamed_txns);
2087 636 : entry->streamed_txns = NIL;
2088 636 : bms_free(entry->columns);
2089 636 : entry->columns = NULL;
2090 636 : entry->pubactions.pubinsert = false;
2091 636 : entry->pubactions.pubupdate = false;
2092 636 : entry->pubactions.pubdelete = false;
2093 636 : entry->pubactions.pubtruncate = false;
2094 :
2095 : /*
2096 : * Tuple slots cleanups. (Will be rebuilt later if needed).
2097 : */
2098 636 : if (entry->old_slot)
2099 : {
2100 112 : TupleDesc desc = entry->old_slot->tts_tupleDescriptor;
2101 :
2102 : Assert(desc->tdrefcount == -1);
2103 :
2104 112 : ExecDropSingleTupleTableSlot(entry->old_slot);
2105 :
2106 : /*
2107 : * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
2108 : * do it now to avoid any leaks.
2109 : */
2110 112 : FreeTupleDesc(desc);
2111 : }
2112 636 : if (entry->new_slot)
2113 : {
2114 112 : TupleDesc desc = entry->new_slot->tts_tupleDescriptor;
2115 :
2116 : Assert(desc->tdrefcount == -1);
2117 :
2118 112 : ExecDropSingleTupleTableSlot(entry->new_slot);
2119 :
2120 : /*
2121 : * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
2122 : * do it now to avoid any leaks.
2123 : */
2124 112 : FreeTupleDesc(desc);
2125 : }
2126 :
2127 636 : entry->old_slot = NULL;
2128 636 : entry->new_slot = NULL;
2129 :
2130 636 : if (entry->attrmap)
2131 6 : free_attrmap(entry->attrmap);
2132 636 : entry->attrmap = NULL;
2133 :
2134 : /*
2135 : * Row filter cache cleanups.
2136 : */
2137 636 : if (entry->entry_cxt)
2138 112 : MemoryContextDelete(entry->entry_cxt);
2139 :
2140 636 : entry->entry_cxt = NULL;
2141 636 : entry->estate = NULL;
2142 636 : memset(entry->exprstate, 0, sizeof(entry->exprstate));
2143 :
2144 : /*
2145 : * Build publication cache. We can't use one provided by relcache as
2146 : * relcache considers all publications that the given relation is in,
2147 : * but here we only need to consider ones that the subscriber
2148 : * requested.
2149 : */
2150 1568 : foreach(lc, data->publications)
2151 : {
2152 932 : Publication *pub = lfirst(lc);
2153 932 : bool publish = false;
2154 :
2155 : /*
2156 : * Under what relid should we publish changes in this publication?
2157 : * We'll use the top-most relid across all publications. Also
2158 : * track the ancestor level for this publication.
2159 : */
2160 932 : Oid pub_relid = relid;
2161 932 : int ancestor_level = 0;
2162 :
2163 : /*
2164 : * If this is a FOR ALL TABLES publication, pick the partition
2165 : * root and set the ancestor level accordingly.
2166 : */
2167 932 : if (pub->alltables)
2168 : {
2169 164 : publish = true;
2170 164 : if (pub->pubviaroot && am_partition)
2171 : {
2172 24 : List *ancestors = get_partition_ancestors(relid);
2173 :
2174 24 : pub_relid = llast_oid(ancestors);
2175 24 : ancestor_level = list_length(ancestors);
2176 : }
2177 : }
2178 :
2179 932 : if (!publish)
2180 : {
2181 768 : bool ancestor_published = false;
2182 :
2183 : /*
2184 : * For a partition, check if any of the ancestors are
2185 : * published. If so, note down the topmost ancestor that is
2186 : * published via this publication, which will be used as the
2187 : * relation via which to publish the partition's changes.
2188 : */
2189 768 : if (am_partition)
2190 : {
2191 : Oid ancestor;
2192 : int level;
2193 236 : List *ancestors = get_partition_ancestors(relid);
2194 :
2195 236 : ancestor = GetTopMostAncestorInPublication(pub->oid,
2196 : ancestors,
2197 : &level);
2198 :
2199 236 : if (ancestor != InvalidOid)
2200 : {
2201 96 : ancestor_published = true;
2202 96 : if (pub->pubviaroot)
2203 : {
2204 50 : pub_relid = ancestor;
2205 50 : ancestor_level = level;
2206 : }
2207 : }
2208 : }
2209 :
2210 1194 : if (list_member_oid(pubids, pub->oid) ||
2211 840 : list_member_oid(schemaPubids, pub->oid) ||
2212 : ancestor_published)
2213 410 : publish = true;
2214 : }
2215 :
2216 : /*
2217 : * If the relation is to be published, determine actions to
2218 : * publish, and list of columns, if appropriate.
2219 : *
2220 : * Don't publish changes for partitioned tables, because
2221 : * publishing those of its partitions suffices, unless partition
2222 : * changes won't be published due to pubviaroot being set.
2223 : */
2224 932 : if (publish &&
2225 8 : (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
2226 : {
2227 568 : entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
2228 568 : entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
2229 568 : entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
2230 568 : entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
2231 :
2232 : /*
2233 : * We want to publish the changes as the top-most ancestor
2234 : * across all publications. So we need to check if the already
2235 : * calculated level is higher than the new one. If yes, we can
2236 : * ignore the new value (as it's a child). Otherwise the new
2237 : * value is an ancestor, so we keep it.
2238 : */
2239 568 : if (publish_ancestor_level > ancestor_level)
2240 2 : continue;
2241 :
2242 : /*
2243 : * If we found an ancestor higher up in the tree, discard the
2244 : * list of publications through which we replicate it, and use
2245 : * the new ancestor.
2246 : */
2247 566 : if (publish_ancestor_level < ancestor_level)
2248 : {
2249 74 : publish_as_relid = pub_relid;
2250 74 : publish_ancestor_level = ancestor_level;
2251 :
2252 : /* reset the publication list for this relation */
2253 74 : rel_publications = NIL;
2254 : }
2255 : else
2256 : {
2257 : /* Same ancestor level, has to be the same OID. */
2258 : Assert(publish_as_relid == pub_relid);
2259 : }
2260 :
2261 : /* Track publications for this ancestor. */
2262 566 : rel_publications = lappend(rel_publications, pub);
2263 : }
2264 : }
2265 :
2266 636 : entry->publish_as_relid = publish_as_relid;
2267 :
2268 : /*
2269 : * Initialize the tuple slot, map, and row filter. These are only used
2270 : * when publishing inserts, updates, or deletes.
2271 : */
2272 636 : if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
2273 96 : entry->pubactions.pubdelete)
2274 : {
2275 : /* Initialize the tuple slot and map */
2276 540 : init_tuple_slot(data, relation, entry);
2277 :
2278 : /* Initialize the row filter */
2279 540 : pgoutput_row_filter_init(data, rel_publications, entry);
2280 :
2281 : /* Check whether to publish generated columns. */
2282 540 : check_and_init_gencol(data, rel_publications, entry);
2283 :
2284 : /* Initialize the column list */
2285 540 : pgoutput_column_list_init(data, rel_publications, entry);
2286 : }
2287 :
2288 634 : list_free(pubids);
2289 634 : list_free(schemaPubids);
2290 634 : list_free(rel_publications);
2291 :
2292 634 : entry->replicate_valid = true;
2293 : }
2294 :
2295 366784 : return entry;
2296 : }
2297 :
2298 : /*
2299 : * Cleanup list of streamed transactions and update the schema_sent flag.
2300 : *
2301 : * When a streamed transaction commits or aborts, we need to remove the
2302 : * toplevel XID from the schema cache. If the transaction aborted, the
2303 : * subscriber will simply throw away the schema records we streamed, so
2304 : * we don't need to do anything else.
2305 : *
2306 : * If the transaction is committed, the subscriber will update the relation
2307 : * cache - so tweak the schema_sent flag accordingly.
2308 : */
2309 : static void
2310 144 : cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
2311 : {
2312 : HASH_SEQ_STATUS hash_seq;
2313 : RelationSyncEntry *entry;
2314 :
2315 : Assert(RelationSyncCache != NULL);
2316 :
2317 144 : hash_seq_init(&hash_seq, RelationSyncCache);
2318 294 : while ((entry = hash_seq_search(&hash_seq)) != NULL)
2319 : {
2320 : /*
2321 : * We can set the schema_sent flag for an entry that has committed xid
2322 : * in the list as that ensures that the subscriber would have the
2323 : * corresponding schema and we don't need to send it unless there is
2324 : * any invalidation for that relation.
2325 : */
2326 340 : foreach_xid(streamed_txn, entry->streamed_txns)
2327 : {
2328 148 : if (xid == streamed_txn)
2329 : {
2330 108 : if (is_commit)
2331 86 : entry->schema_sent = true;
2332 :
2333 108 : entry->streamed_txns =
2334 108 : foreach_delete_current(entry->streamed_txns, streamed_txn);
2335 108 : break;
2336 : }
2337 : }
2338 : }
2339 144 : }
2340 :
2341 : /*
2342 : * Relcache invalidation callback
2343 : */
2344 : static void
2345 7168 : rel_sync_cache_relation_cb(Datum arg, Oid relid)
2346 : {
2347 : RelationSyncEntry *entry;
2348 :
2349 : /*
2350 : * We can get here if the plugin was used in SQL interface as the
2351 : * RelationSyncCache is destroyed when the decoding finishes, but there is
2352 : * no way to unregister the relcache invalidation callback.
2353 : */
2354 7168 : if (RelationSyncCache == NULL)
2355 18 : return;
2356 :
2357 : /*
2358 : * Nobody keeps pointers to entries in this hash table around outside
2359 : * logical decoding callback calls - but invalidation events can come in
2360 : * *during* a callback if we do any syscache access in the callback.
2361 : * Because of that we must mark the cache entry as invalid but not damage
2362 : * any of its substructure here. The next get_rel_sync_entry() call will
2363 : * rebuild it all.
2364 : */
2365 7150 : if (OidIsValid(relid))
2366 : {
2367 : /*
2368 : * Getting invalidations for relations that aren't in the table is
2369 : * entirely normal. So we don't care if it's found or not.
2370 : */
2371 7084 : entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
2372 : HASH_FIND, NULL);
2373 7084 : if (entry != NULL)
2374 1212 : entry->replicate_valid = false;
2375 : }
2376 : else
2377 : {
2378 : /* Whole cache must be flushed. */
2379 : HASH_SEQ_STATUS status;
2380 :
2381 66 : hash_seq_init(&status, RelationSyncCache);
2382 176 : while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2383 : {
2384 110 : entry->replicate_valid = false;
2385 : }
2386 : }
2387 : }
2388 :
2389 : /*
2390 : * Publication relation/schema map syscache invalidation callback
2391 : *
2392 : * Called for invalidations on pg_publication, pg_publication_rel,
2393 : * pg_publication_namespace, and pg_namespace.
2394 : */
2395 : static void
2396 1238 : rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
2397 : {
2398 : HASH_SEQ_STATUS status;
2399 : RelationSyncEntry *entry;
2400 :
2401 : /*
2402 : * We can get here if the plugin was used in SQL interface as the
2403 : * RelationSyncCache is destroyed when the decoding finishes, but there is
2404 : * no way to unregister the invalidation callbacks.
2405 : */
2406 1238 : if (RelationSyncCache == NULL)
2407 68 : return;
2408 :
2409 : /*
2410 : * We have no easy way to identify which cache entries this invalidation
2411 : * event might have affected, so just mark them all invalid.
2412 : */
2413 1170 : hash_seq_init(&status, RelationSyncCache);
2414 3658 : while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2415 : {
2416 2488 : entry->replicate_valid = false;
2417 : }
2418 : }
2419 :
2420 : /* Send Replication origin */
2421 : static void
2422 2134 : send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
2423 : XLogRecPtr origin_lsn, bool send_origin)
2424 : {
2425 2134 : if (send_origin)
2426 : {
2427 : char *origin;
2428 :
2429 : /*----------
2430 : * XXX: which behaviour do we want here?
2431 : *
2432 : * Alternatives:
2433 : * - don't send origin message if origin name not found
2434 : * (that's what we do now)
2435 : * - throw error - that will break replication, not good
2436 : * - send some special "unknown" origin
2437 : *----------
2438 : */
2439 18 : if (replorigin_by_oid(origin_id, true, &origin))
2440 : {
2441 : /* Message boundary */
2442 18 : OutputPluginWrite(ctx, false);
2443 18 : OutputPluginPrepareWrite(ctx, true);
2444 :
2445 18 : logicalrep_write_origin(ctx->out, origin, origin_lsn);
2446 : }
2447 : }
2448 2134 : }
|