Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pgoutput.c
4 : * Logical Replication output plugin
5 : *
6 : * Copyright (c) 2012-2025, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/pgoutput/pgoutput.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "access/tupconvert.h"
16 : #include "catalog/partition.h"
17 : #include "catalog/pg_publication.h"
18 : #include "catalog/pg_publication_rel.h"
19 : #include "catalog/pg_subscription.h"
20 : #include "commands/defrem.h"
21 : #include "commands/subscriptioncmds.h"
22 : #include "executor/executor.h"
23 : #include "fmgr.h"
24 : #include "nodes/makefuncs.h"
25 : #include "parser/parse_relation.h"
26 : #include "replication/logical.h"
27 : #include "replication/logicalproto.h"
28 : #include "replication/origin.h"
29 : #include "replication/pgoutput.h"
30 : #include "rewrite/rewriteHandler.h"
31 : #include "utils/builtins.h"
32 : #include "utils/inval.h"
33 : #include "utils/lsyscache.h"
34 : #include "utils/memutils.h"
35 : #include "utils/rel.h"
36 : #include "utils/syscache.h"
37 : #include "utils/varlena.h"
38 :
39 954 : PG_MODULE_MAGIC;
40 :
41 : static void pgoutput_startup(LogicalDecodingContext *ctx,
42 : OutputPluginOptions *opt, bool is_init);
43 : static void pgoutput_shutdown(LogicalDecodingContext *ctx);
44 : static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
45 : ReorderBufferTXN *txn);
46 : static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
47 : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
48 : static void pgoutput_change(LogicalDecodingContext *ctx,
49 : ReorderBufferTXN *txn, Relation relation,
50 : ReorderBufferChange *change);
51 : static void pgoutput_truncate(LogicalDecodingContext *ctx,
52 : ReorderBufferTXN *txn, int nrelations, Relation relations[],
53 : ReorderBufferChange *change);
54 : static void pgoutput_message(LogicalDecodingContext *ctx,
55 : ReorderBufferTXN *txn, XLogRecPtr message_lsn,
56 : bool transactional, const char *prefix,
57 : Size sz, const char *message);
58 : static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
59 : RepOriginId origin_id);
60 : static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
61 : ReorderBufferTXN *txn);
62 : static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
63 : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
64 : static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
65 : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
66 : static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
67 : ReorderBufferTXN *txn,
68 : XLogRecPtr prepare_end_lsn,
69 : TimestampTz prepare_time);
70 : static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
71 : ReorderBufferTXN *txn);
72 : static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
73 : ReorderBufferTXN *txn);
74 : static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
75 : ReorderBufferTXN *txn,
76 : XLogRecPtr abort_lsn);
77 : static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
78 : ReorderBufferTXN *txn,
79 : XLogRecPtr commit_lsn);
80 : static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
81 : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
82 :
83 : static bool publications_valid;
84 :
85 : static List *LoadPublications(List *pubnames);
86 : static void publication_invalidation_cb(Datum arg, int cacheid,
87 : uint32 hashvalue);
88 : static void send_repl_origin(LogicalDecodingContext *ctx,
89 : RepOriginId origin_id, XLogRecPtr origin_lsn,
90 : bool send_origin);
91 :
92 : /*
93 : * Only 3 publication actions are used for row filtering ("insert", "update",
94 : * "delete"). See RelationSyncEntry.exprstate[].
95 : */
96 : enum RowFilterPubAction
97 : {
98 : PUBACTION_INSERT,
99 : PUBACTION_UPDATE,
100 : PUBACTION_DELETE,
101 : };
102 :
103 : #define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
104 :
105 : /*
106 : * Entry in the map used to remember which relation schemas we sent.
107 : *
108 : * The schema_sent flag determines if the current schema record for the
109 : * relation (and for its ancestor if publish_as_relid is set) was already
110 : * sent to the subscriber (in which case we don't need to send it again).
111 : *
112 : * The schema cache on the downstream is, however, updated only at commit
113 : * time, and with streamed transactions the commit order may differ from the
114 : * order in which the transactions are sent. Also, the (sub) transactions
115 : * might get aborted, so we need to send the schema for each (sub) transaction
116 : * so that we don't lose the schema information on abort. To handle this,
117 : * we maintain a list of xids (streamed_txns) for which we have already sent
118 : * the schema.
119 : *
120 : * For partitions, 'pubactions' considers not only the table's own
121 : * publications, but also those of all of its ancestors.
122 : */
123 : typedef struct RelationSyncEntry
124 : {
125 : Oid relid; /* relation oid */
126 :
127 : bool replicate_valid; /* overall validity flag for entry */
128 :
129 : bool schema_sent;
130 :
131 : /*
132 : * This will be PUBLISH_GENCOLS_STORED if the relation contains generated
133 : * columns and the 'publish_generated_columns' parameter is set to
134 : * PUBLISH_GENCOLS_STORED. Otherwise, it will be PUBLISH_GENCOLS_NONE,
135 : * indicating that no generated columns should be published, unless
136 : * explicitly specified in the column list.
137 : */
138 : PublishGencolsType include_gencols_type;
139 : List *streamed_txns; /* streamed toplevel transactions with this
140 : * schema */
141 :
142 : /* are we publishing this rel? */
143 : PublicationActions pubactions;
144 :
145 : /*
146 : * ExprState array for the row filter. The expressions for different
147 : * publication actions cannot always be combined into one, because updates
148 : * and deletes require the columns referenced in the expression to be part
149 : * of the replica identity index, whereas inserts have no such restriction;
150 : * hence there is one ExprState per publication action.
151 : */
152 : ExprState *exprstate[NUM_ROWFILTER_PUBACTIONS];
153 : EState *estate; /* executor state used for row filter */
154 : TupleTableSlot *new_slot; /* slot for storing new tuple */
155 : TupleTableSlot *old_slot; /* slot for storing old tuple */
156 :
157 : /*
158 : * OID of the relation to publish changes as. For a partition, this may
159 : * be set to one of its ancestors whose schema will be used when
160 : * replicating changes, if publish_via_partition_root is set for the
161 : * publication.
162 : */
163 : Oid publish_as_relid;
164 :
165 : /*
166 : * Map used when replicating using an ancestor's schema to convert tuples
167 : * from the partition's type to the ancestor's; NULL if publish_as_relid is
168 : * the same as 'relid' or if unnecessary because the partition and the
169 : * ancestor have identical TupleDescs.
170 : */
171 : AttrMap *attrmap;
172 :
173 : /*
174 : * Columns included in the publication, or NULL if all columns are
175 : * included implicitly. Note that the attnums in this bitmap are not
176 : * shifted by FirstLowInvalidHeapAttributeNumber.
177 : */
178 : Bitmapset *columns;
179 :
180 : /*
181 : * Private context to store additional data for this entry - state for the
182 : * row filter expressions, column list, etc.
183 : */
184 : MemoryContext entry_cxt;
185 : } RelationSyncEntry;
186 :
187 : /*
188 : * Maintain a per-transaction level variable to track whether the transaction
189 : * has sent BEGIN. BEGIN is only sent when the first change in a transaction
190 : * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
191 : * messages for empty transactions, which saves network bandwidth.
192 : *
193 : * This optimization is not used for prepared transactions because, if the
194 : * WALSender restarts after the prepare of a transaction and before the commit
195 : * prepared of the same transaction, we won't be able to figure out whether we
196 : * skipped sending BEGIN/PREPARE because the transaction was empty. This is
197 : * because we would have lost the in-memory txndata information that was
198 : * present prior to the restart. That would result in sending a spurious
199 : * COMMIT PREPARED without a corresponding prepared transaction at the
200 : * downstream, which would lead to an error when it tries to process it.
201 : *
202 : * XXX We could achieve this optimization by changing protocol to send
203 : * additional information so that downstream can detect that the corresponding
204 : * prepare has not been sent. However, adding such a check for every
205 : * transaction in the downstream could be costly so we might want to do it
206 : * optionally.
207 : *
208 : * We also don't have this optimization for streamed transactions because
209 : * they can contain prepared transactions.
210 : */
211 : typedef struct PGOutputTxnData
212 : {
213 : bool sent_begin_txn; /* flag indicating whether BEGIN has been sent */
214 : } PGOutputTxnData;
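/*
 * Illustrative sketch (hypothetical object names): if a publication is
 * created with CREATE PUBLICATION pub FOR TABLE t1, then a transaction that
 * modifies only some other table t2 is "empty" from pgoutput's point of
 * view.  With this optimization nothing at all is written to the wire for
 * it; pgoutput_commit_txn() below merely emits the DEBUG1 message about a
 * skipped empty transaction.
 */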
215 :
216 : /* Map used to remember which relation schemas we sent. */
217 : static HTAB *RelationSyncCache = NULL;
218 :
219 : static void init_rel_sync_cache(MemoryContext cachectx);
220 : static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
221 : static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
222 : Relation relation);
223 : static void send_relation_and_attrs(Relation relation, TransactionId xid,
224 : LogicalDecodingContext *ctx,
225 : RelationSyncEntry *relentry);
226 : static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
227 : static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
228 : uint32 hashvalue);
229 : static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
230 : TransactionId xid);
231 : static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
232 : TransactionId xid);
233 : static void init_tuple_slot(PGOutputData *data, Relation relation,
234 : RelationSyncEntry *entry);
235 :
236 : /* row filter routines */
237 : static EState *create_estate_for_relation(Relation rel);
238 : static void pgoutput_row_filter_init(PGOutputData *data,
239 : List *publications,
240 : RelationSyncEntry *entry);
241 : static bool pgoutput_row_filter_exec_expr(ExprState *state,
242 : ExprContext *econtext);
243 : static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
244 : TupleTableSlot **new_slot_ptr,
245 : RelationSyncEntry *entry,
246 : ReorderBufferChangeType *action);
247 :
248 : /* column list routines */
249 : static void pgoutput_column_list_init(PGOutputData *data,
250 : List *publications,
251 : RelationSyncEntry *entry);
252 :
253 : /*
254 : * Specify output plugin callbacks
255 : */
256 : void
257 1314 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
258 : {
259 1314 : cb->startup_cb = pgoutput_startup;
260 1314 : cb->begin_cb = pgoutput_begin_txn;
261 1314 : cb->change_cb = pgoutput_change;
262 1314 : cb->truncate_cb = pgoutput_truncate;
263 1314 : cb->message_cb = pgoutput_message;
264 1314 : cb->commit_cb = pgoutput_commit_txn;
265 :
266 1314 : cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
267 1314 : cb->prepare_cb = pgoutput_prepare_txn;
268 1314 : cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
269 1314 : cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
270 1314 : cb->filter_by_origin_cb = pgoutput_origin_filter;
271 1314 : cb->shutdown_cb = pgoutput_shutdown;
272 :
273 : /* transaction streaming */
274 1314 : cb->stream_start_cb = pgoutput_stream_start;
275 1314 : cb->stream_stop_cb = pgoutput_stream_stop;
276 1314 : cb->stream_abort_cb = pgoutput_stream_abort;
277 1314 : cb->stream_commit_cb = pgoutput_stream_commit;
278 1314 : cb->stream_change_cb = pgoutput_change;
279 1314 : cb->stream_message_cb = pgoutput_message;
280 1314 : cb->stream_truncate_cb = pgoutput_truncate;
281 : /* transaction streaming - two-phase commit */
282 1314 : cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
283 1314 : }
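/*
 * For context (hypothetical slot name): pgoutput is typically selected by
 * name when a logical replication slot is created, for example
 *
 *   SELECT pg_create_logical_replication_slot('myslot', 'pgoutput');
 *
 * or implicitly by CREATE SUBSCRIPTION on the subscriber.  Logical decoding
 * then loads this module and calls _PG_output_plugin_init() above to obtain
 * the callback table.
 */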
284 :
285 : static void
286 704 : parse_output_parameters(List *options, PGOutputData *data)
287 : {
288 : ListCell *lc;
289 704 : bool protocol_version_given = false;
290 704 : bool publication_names_given = false;
291 704 : bool binary_option_given = false;
292 704 : bool messages_option_given = false;
293 704 : bool streaming_given = false;
294 704 : bool two_phase_option_given = false;
295 704 : bool origin_option_given = false;
296 :
297 704 : data->binary = false;
298 704 : data->streaming = LOGICALREP_STREAM_OFF;
299 704 : data->messages = false;
300 704 : data->two_phase = false;
301 :
302 3516 : foreach(lc, options)
303 : {
304 2812 : DefElem *defel = (DefElem *) lfirst(lc);
305 :
306 : Assert(defel->arg == NULL || IsA(defel->arg, String));
307 :
308 : /* Check each param, whether or not we recognize it */
309 2812 : if (strcmp(defel->defname, "proto_version") == 0)
310 : {
311 : unsigned long parsed;
312 : char *endptr;
313 :
314 704 : if (protocol_version_given)
315 0 : ereport(ERROR,
316 : (errcode(ERRCODE_SYNTAX_ERROR),
317 : errmsg("conflicting or redundant options")));
318 704 : protocol_version_given = true;
319 :
320 704 : errno = 0;
321 704 : parsed = strtoul(strVal(defel->arg), &endptr, 10);
322 704 : if (errno != 0 || *endptr != '\0')
323 0 : ereport(ERROR,
324 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
325 : errmsg("invalid proto_version")));
326 :
327 704 : if (parsed > PG_UINT32_MAX)
328 0 : ereport(ERROR,
329 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
330 : errmsg("proto_version \"%s\" out of range",
331 : strVal(defel->arg))));
332 :
333 704 : data->protocol_version = (uint32) parsed;
334 : }
335 2108 : else if (strcmp(defel->defname, "publication_names") == 0)
336 : {
337 704 : if (publication_names_given)
338 0 : ereport(ERROR,
339 : (errcode(ERRCODE_SYNTAX_ERROR),
340 : errmsg("conflicting or redundant options")));
341 704 : publication_names_given = true;
342 :
343 704 : if (!SplitIdentifierString(strVal(defel->arg), ',',
344 : &data->publication_names))
345 0 : ereport(ERROR,
346 : (errcode(ERRCODE_INVALID_NAME),
347 : errmsg("invalid publication_names syntax")));
348 : }
349 1404 : else if (strcmp(defel->defname, "binary") == 0)
350 : {
351 20 : if (binary_option_given)
352 0 : ereport(ERROR,
353 : (errcode(ERRCODE_SYNTAX_ERROR),
354 : errmsg("conflicting or redundant options")));
355 20 : binary_option_given = true;
356 :
357 20 : data->binary = defGetBoolean(defel);
358 : }
359 1384 : else if (strcmp(defel->defname, "messages") == 0)
360 : {
361 8 : if (messages_option_given)
362 0 : ereport(ERROR,
363 : (errcode(ERRCODE_SYNTAX_ERROR),
364 : errmsg("conflicting or redundant options")));
365 8 : messages_option_given = true;
366 :
367 8 : data->messages = defGetBoolean(defel);
368 : }
369 1376 : else if (strcmp(defel->defname, "streaming") == 0)
370 : {
371 672 : if (streaming_given)
372 0 : ereport(ERROR,
373 : (errcode(ERRCODE_SYNTAX_ERROR),
374 : errmsg("conflicting or redundant options")));
375 672 : streaming_given = true;
376 :
377 672 : data->streaming = defGetStreamingMode(defel);
378 : }
379 704 : else if (strcmp(defel->defname, "two_phase") == 0)
380 : {
381 14 : if (two_phase_option_given)
382 0 : ereport(ERROR,
383 : (errcode(ERRCODE_SYNTAX_ERROR),
384 : errmsg("conflicting or redundant options")));
385 14 : two_phase_option_given = true;
386 :
387 14 : data->two_phase = defGetBoolean(defel);
388 : }
389 690 : else if (strcmp(defel->defname, "origin") == 0)
390 : {
391 : char *origin;
392 :
393 690 : if (origin_option_given)
394 0 : ereport(ERROR,
395 : errcode(ERRCODE_SYNTAX_ERROR),
396 : errmsg("conflicting or redundant options"));
397 690 : origin_option_given = true;
398 :
399 690 : origin = defGetString(defel);
400 690 : if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
401 26 : data->publish_no_origin = true;
402 664 : else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
403 664 : data->publish_no_origin = false;
404 : else
405 0 : ereport(ERROR,
406 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
407 : errmsg("unrecognized origin value: \"%s\"", origin));
408 : }
409 : else
410 0 : elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
411 : }
412 :
413 : /* Check required options */
414 704 : if (!protocol_version_given)
415 0 : ereport(ERROR,
416 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
417 : errmsg("option \"%s\" missing", "proto_version"));
418 704 : if (!publication_names_given)
419 0 : ereport(ERROR,
420 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
421 : errmsg("option \"%s\" missing", "publication_names"));
422 704 : }
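/*
 * For illustration (hypothetical slot and publication names): these options
 * normally arrive via the walsender's START_REPLICATION command, which a
 * subscriber might issue roughly as
 *
 *   START_REPLICATION SLOT "sub1" LOGICAL 0/0
 *       (proto_version '4', publication_names '"pub1","pub2"',
 *        binary 'false', streaming 'parallel', origin 'any')
 *
 * and are handed to parse_output_parameters() above as the
 * ctx->output_plugin_options list of DefElems.
 */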
423 :
424 : /*
425 : * Initialize this plugin
426 : */
427 : static void
428 1314 : pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
429 : bool is_init)
430 : {
431 1314 : PGOutputData *data = palloc0(sizeof(PGOutputData));
432 : static bool publication_callback_registered = false;
433 :
434 : /* Create our memory context for private allocations. */
435 1314 : data->context = AllocSetContextCreate(ctx->context,
436 : "logical replication output context",
437 : ALLOCSET_DEFAULT_SIZES);
438 :
439 1314 : data->cachectx = AllocSetContextCreate(ctx->context,
440 : "logical replication cache context",
441 : ALLOCSET_DEFAULT_SIZES);
442 :
443 1314 : data->pubctx = AllocSetContextCreate(ctx->context,
444 : "logical replication publication list context",
445 : ALLOCSET_SMALL_SIZES);
446 :
447 1314 : ctx->output_plugin_private = data;
448 :
449 : /* This plugin uses binary protocol. */
450 1314 : opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
451 :
452 : /*
453 : * This is replication start and not slot initialization.
454 : *
455 : * Parse and validate options passed by the client.
456 : */
457 1314 : if (!is_init)
458 : {
459 : /* Parse the params and ERROR if we see any we don't recognize */
460 704 : parse_output_parameters(ctx->output_plugin_options, data);
461 :
462 : /* Check if we support requested protocol */
463 704 : if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
464 0 : ereport(ERROR,
465 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
466 : errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
467 : data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
468 :
469 704 : if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
470 0 : ereport(ERROR,
471 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
472 : errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
473 : data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
474 :
475 : /*
476 : * Decide whether to enable streaming. It is disabled by default, in
477 : * which case we just update the flag in decoding context. Otherwise
478 : * we only allow it with sufficient version of the protocol, and when
479 : * the output plugin supports it.
480 : */
481 704 : if (data->streaming == LOGICALREP_STREAM_OFF)
482 32 : ctx->streaming = false;
483 672 : else if (data->streaming == LOGICALREP_STREAM_ON &&
484 54 : data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
485 0 : ereport(ERROR,
486 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
487 : errmsg("requested proto_version=%d does not support streaming, need %d or higher",
488 : data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
489 672 : else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
490 618 : data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
491 0 : ereport(ERROR,
492 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
493 : errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
494 : data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
495 672 : else if (!ctx->streaming)
496 0 : ereport(ERROR,
497 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
498 : errmsg("streaming requested, but not supported by output plugin")));
499 :
500 : /*
501 : * Here, we just check whether the two-phase option is passed by
502 : * plugin and decide whether to enable it at later point of time. It
503 : * remains enabled if the previous start-up has done so. But we only
504 : * allow the option to be passed in with sufficient version of the
505 : * protocol, and when the output plugin supports it.
506 : */
507 704 : if (!data->two_phase)
508 690 : ctx->twophase_opt_given = false;
509 14 : else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
510 0 : ereport(ERROR,
511 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
512 : errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
513 : data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
514 14 : else if (!ctx->twophase)
515 0 : ereport(ERROR,
516 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
517 : errmsg("two-phase commit requested, but not supported by output plugin")));
518 : else
519 14 : ctx->twophase_opt_given = true;
520 :
521 : /* Init publication state. */
522 704 : data->publications = NIL;
523 704 : publications_valid = false;
524 :
525 : /*
526 : * Register callback for pg_publication if we didn't already do that
527 : * during some previous call in this process.
528 : */
529 704 : if (!publication_callback_registered)
530 : {
531 702 : CacheRegisterSyscacheCallback(PUBLICATIONOID,
532 : publication_invalidation_cb,
533 : (Datum) 0);
534 702 : publication_callback_registered = true;
535 : }
536 :
537 : /* Initialize relation schema cache. */
538 704 : init_rel_sync_cache(CacheMemoryContext);
539 : }
540 : else
541 : {
542 : /*
543 : * Disable the streaming and prepared transactions during the slot
544 : * initialization mode.
545 : */
546 610 : ctx->streaming = false;
547 610 : ctx->twophase = false;
548 : }
549 1314 : }
550 :
551 : /*
552 : * BEGIN callback.
553 : *
554 : * Don't send the BEGIN message here; instead, postpone it until the first
555 : * change. In logical replication, a common scenario is to replicate a set of
556 : * tables (instead of all tables), and transactions whose changes are only on
557 : * unpublished tables will produce empty transactions. Such empty
558 : * transactions would send BEGIN and COMMIT messages to subscribers,
559 : * wasting bandwidth on something of little or no use for logical replication.
560 : */
561 : static void
562 1446 : pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
563 : {
564 1446 : PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
565 : sizeof(PGOutputTxnData));
566 :
567 1446 : txn->output_plugin_private = txndata;
568 1446 : }
569 :
570 : /*
571 : * Send BEGIN.
572 : *
573 : * This is called while processing the first change of the transaction.
574 : */
575 : static void
576 820 : pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
577 : {
578 820 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
579 820 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
580 :
581 : Assert(txndata);
582 : Assert(!txndata->sent_begin_txn);
583 :
584 820 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
585 820 : logicalrep_write_begin(ctx->out, txn);
586 820 : txndata->sent_begin_txn = true;
587 :
588 820 : send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
589 : send_replication_origin);
590 :
591 820 : OutputPluginWrite(ctx, true);
592 820 : }
593 :
594 : /*
595 : * COMMIT callback
596 : */
597 : static void
598 1440 : pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
599 : XLogRecPtr commit_lsn)
600 : {
601 1440 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
602 : bool sent_begin_txn;
603 :
604 : Assert(txndata);
605 :
606 : /*
607 : * We don't need to send the commit message unless some relevant change
608 : * from this transaction has been sent to the downstream.
609 : */
610 1440 : sent_begin_txn = txndata->sent_begin_txn;
611 1440 : OutputPluginUpdateProgress(ctx, !sent_begin_txn);
612 1440 : pfree(txndata);
613 1440 : txn->output_plugin_private = NULL;
614 :
615 1440 : if (!sent_begin_txn)
616 : {
617 622 : elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
618 622 : return;
619 : }
620 :
621 818 : OutputPluginPrepareWrite(ctx, true);
622 818 : logicalrep_write_commit(ctx->out, txn, commit_lsn);
623 818 : OutputPluginWrite(ctx, true);
624 : }
625 :
626 : /*
627 : * BEGIN PREPARE callback
628 : */
629 : static void
630 40 : pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
631 : {
632 40 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
633 :
634 40 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
635 40 : logicalrep_write_begin_prepare(ctx->out, txn);
636 :
637 40 : send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
638 : send_replication_origin);
639 :
640 40 : OutputPluginWrite(ctx, true);
641 40 : }
642 :
643 : /*
644 : * PREPARE callback
645 : */
646 : static void
647 40 : pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
648 : XLogRecPtr prepare_lsn)
649 : {
650 40 : OutputPluginUpdateProgress(ctx, false);
651 :
652 40 : OutputPluginPrepareWrite(ctx, true);
653 40 : logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
654 40 : OutputPluginWrite(ctx, true);
655 40 : }
656 :
657 : /*
658 : * COMMIT PREPARED callback
659 : */
660 : static void
661 48 : pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
662 : XLogRecPtr commit_lsn)
663 : {
664 48 : OutputPluginUpdateProgress(ctx, false);
665 :
666 48 : OutputPluginPrepareWrite(ctx, true);
667 48 : logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
668 48 : OutputPluginWrite(ctx, true);
669 48 : }
670 :
671 : /*
672 : * ROLLBACK PREPARED callback
673 : */
674 : static void
675 18 : pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
676 : ReorderBufferTXN *txn,
677 : XLogRecPtr prepare_end_lsn,
678 : TimestampTz prepare_time)
679 : {
680 18 : OutputPluginUpdateProgress(ctx, false);
681 :
682 18 : OutputPluginPrepareWrite(ctx, true);
683 18 : logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
684 : prepare_time);
685 18 : OutputPluginWrite(ctx, true);
686 18 : }
687 :
688 : /*
689 : * Write the current schema of the relation and its ancestor (if any) if not
690 : * done yet.
691 : */
692 : static void
693 364468 : maybe_send_schema(LogicalDecodingContext *ctx,
694 : ReorderBufferChange *change,
695 : Relation relation, RelationSyncEntry *relentry)
696 : {
697 364468 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
698 : bool schema_sent;
699 364468 : TransactionId xid = InvalidTransactionId;
700 364468 : TransactionId topxid = InvalidTransactionId;
701 :
702 : /*
703 : * Remember the XID of the (sub)transaction for the change. We don't care
704 : * whether it's a top-level transaction or not (we have already sent that
705 : * XID at the start of the current streaming block).
706 : *
707 : * If we're not in a streaming block, just use InvalidTransactionId and
708 : * the write methods will not include it.
709 : */
710 364468 : if (data->in_streaming)
711 351882 : xid = change->txn->xid;
712 :
713 364468 : if (rbtxn_is_subtxn(change->txn))
714 20338 : topxid = rbtxn_get_toptxn(change->txn)->xid;
715 : else
716 344130 : topxid = xid;
717 :
718 : /*
719 : * Do we need to send the schema? We do track streamed transactions
720 : * separately, because those may be applied later (and the regular
721 : * transactions won't see their effects until then) and in an order that
722 : * we don't know at this point.
723 : *
724 : * XXX There is a scope of optimization here. Currently, we always send
725 : * the schema first time in a streaming transaction but we can probably
726 : * avoid that by checking 'relentry->schema_sent' flag. However, before
727 : * doing that we need to study its impact on the case where we have a mix
728 : * of streaming and non-streaming transactions.
729 : */
730 364468 : if (data->in_streaming)
731 351882 : schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
732 : else
733 12586 : schema_sent = relentry->schema_sent;
734 :
735 : /* Nothing to do if we already sent the schema. */
736 364468 : if (schema_sent)
737 363850 : return;
738 :
739 : /*
740 : * Send the schema. If the changes will be published using an ancestor's
741 : * schema, not the relation's own, send that ancestor's schema before
742 : * sending relation's own (XXX - maybe sending only the former suffices?).
743 : */
744 618 : if (relentry->publish_as_relid != RelationGetRelid(relation))
745 : {
746 64 : Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
747 :
748 64 : send_relation_and_attrs(ancestor, xid, ctx, relentry);
749 62 : RelationClose(ancestor);
750 : }
751 :
752 616 : send_relation_and_attrs(relation, xid, ctx, relentry);
753 :
754 616 : if (data->in_streaming)
755 142 : set_schema_sent_in_streamed_txn(relentry, topxid);
756 : else
757 474 : relentry->schema_sent = true;
758 : }
759 :
760 : /*
761 : * Sends a relation
762 : */
763 : static void
764 680 : send_relation_and_attrs(Relation relation, TransactionId xid,
765 : LogicalDecodingContext *ctx,
766 : RelationSyncEntry *relentry)
767 : {
768 680 : TupleDesc desc = RelationGetDescr(relation);
769 680 : Bitmapset *columns = relentry->columns;
770 680 : PublishGencolsType include_gencols_type = relentry->include_gencols_type;
771 : int i;
772 :
773 : /*
774 : * Write out type info if needed. We do that only for user-created types.
775 : * We use FirstGenbkiObjectId as the cutoff, so that we only consider
776 : * objects with hand-assigned OIDs to be "built in", not for instance any
777 : * function or type defined in the information_schema. This is important
778 : * because only hand-assigned OIDs can be expected to remain stable across
779 : * major versions.
780 : */
781 2076 : for (i = 0; i < desc->natts; i++)
782 : {
783 1396 : Form_pg_attribute att = TupleDescAttr(desc, i);
784 :
785 1396 : if (!logicalrep_should_publish_column(att, columns,
786 : include_gencols_type))
787 142 : continue;
788 :
789 1254 : if (att->atttypid < FirstGenbkiObjectId)
790 1218 : continue;
791 :
792 36 : OutputPluginPrepareWrite(ctx, false);
793 36 : logicalrep_write_typ(ctx->out, xid, att->atttypid);
794 36 : OutputPluginWrite(ctx, false);
795 : }
796 :
797 680 : OutputPluginPrepareWrite(ctx, false);
798 680 : logicalrep_write_rel(ctx->out, xid, relation, columns,
799 : include_gencols_type);
800 680 : OutputPluginWrite(ctx, false);
801 678 : }
802 :
803 : /*
804 : * Executor state preparation for evaluation of row filter expressions for the
805 : * specified relation.
806 : */
807 : static EState *
808 34 : create_estate_for_relation(Relation rel)
809 : {
810 : EState *estate;
811 : RangeTblEntry *rte;
812 34 : List *perminfos = NIL;
813 :
814 34 : estate = CreateExecutorState();
815 :
816 34 : rte = makeNode(RangeTblEntry);
817 34 : rte->rtekind = RTE_RELATION;
818 34 : rte->relid = RelationGetRelid(rel);
819 34 : rte->relkind = rel->rd_rel->relkind;
820 34 : rte->rellockmode = AccessShareLock;
821 :
822 34 : addRTEPermissionInfo(&perminfos, rte);
823 :
824 34 : ExecInitRangeTable(estate, list_make1(rte), perminfos,
825 : bms_make_singleton(1));
826 :
827 34 : estate->es_output_cid = GetCurrentCommandId(false);
828 :
829 34 : return estate;
830 : }
831 :
832 : /*
833 : * Evaluates row filter.
834 : *
835 : * If the row filter evaluates to NULL, it is taken as false i.e. the change
836 : * isn't replicated.
837 : */
838 : static bool
839 76 : pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
840 : {
841 : Datum ret;
842 : bool isnull;
843 :
844 : Assert(state != NULL);
845 :
846 76 : ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
847 :
848 76 : elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
849 : isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
850 : isnull ? "true" : "false");
851 :
852 76 : if (isnull)
853 2 : return false;
854 :
855 74 : return DatumGetBool(ret);
856 : }
857 :
858 : /*
859 : * Make sure the per-entry memory context exists.
860 : */
861 : static void
862 576 : pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
863 : {
864 : Relation relation;
865 :
866 : /* The context may already exist, in which case bail out. */
867 576 : if (entry->entry_cxt)
868 34 : return;
869 :
870 542 : relation = RelationIdGetRelation(entry->publish_as_relid);
871 :
872 542 : entry->entry_cxt = AllocSetContextCreate(data->cachectx,
873 : "entry private context",
874 : ALLOCSET_SMALL_SIZES);
875 :
876 542 : MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
877 : RelationGetRelationName(relation));
878 : }
879 :
880 : /*
881 : * Initialize the row filter.
882 : */
883 : static void
884 542 : pgoutput_row_filter_init(PGOutputData *data, List *publications,
885 : RelationSyncEntry *entry)
886 : {
887 : ListCell *lc;
888 542 : List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */
889 542 : bool no_filter[] = {false, false, false}; /* One per pubaction */
890 : MemoryContext oldctx;
891 : int idx;
892 542 : bool has_filter = true;
893 542 : Oid schemaid = get_rel_namespace(entry->publish_as_relid);
894 :
895 : /*
896 : * Find if there are any row filters for this relation. If there are, then
897 : * prepare the necessary ExprState and cache it in entry->exprstate. To
898 : * build an expression state, we need to ensure the following:
899 : *
900 : * All the given publication-table mappings must be checked.
901 : *
902 : * Multiple publications might have multiple row filters for this
903 : * relation. Since row filter usage depends on the DML operation, there
904 : * are multiple lists (one for each operation) to which row filters will
905 : * be appended.
906 : *
907 : * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row filter
908 : * expression" so it takes precedence.
909 : */
910 584 : foreach(lc, publications)
911 : {
912 550 : Publication *pub = lfirst(lc);
913 550 : HeapTuple rftuple = NULL;
914 550 : Datum rfdatum = 0;
915 550 : bool pub_no_filter = true;
916 :
917 : /*
918 : * If the publication is FOR ALL TABLES, or the publication includes a
919 : * FOR TABLES IN SCHEMA where the table belongs to the referred
920 : * schema, then it is treated the same as if there are no row filters
921 : * (even if other publications have a row filter).
922 : */
923 550 : if (!pub->alltables &&
924 392 : !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
925 : ObjectIdGetDatum(schemaid),
926 : ObjectIdGetDatum(pub->oid)))
927 : {
928 : /*
929 : * Check for the presence of a row filter in this publication.
930 : */
931 380 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
932 : ObjectIdGetDatum(entry->publish_as_relid),
933 : ObjectIdGetDatum(pub->oid));
934 :
935 380 : if (HeapTupleIsValid(rftuple))
936 : {
937 : /* Null indicates no filter. */
938 356 : rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
939 : Anum_pg_publication_rel_prqual,
940 : &pub_no_filter);
941 : }
942 : }
943 :
944 550 : if (pub_no_filter)
945 : {
946 522 : if (rftuple)
947 328 : ReleaseSysCache(rftuple);
948 :
949 522 : no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
950 522 : no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
951 522 : no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
952 :
953 : /*
954 : * Quick exit if all the DML actions are published via this
955 : * publication.
956 : */
957 522 : if (no_filter[PUBACTION_INSERT] &&
958 522 : no_filter[PUBACTION_UPDATE] &&
959 508 : no_filter[PUBACTION_DELETE])
960 : {
961 508 : has_filter = false;
962 508 : break;
963 : }
964 :
965 : /* No additional work for this publication. Next one. */
966 14 : continue;
967 : }
968 :
969 : /* Form the per pubaction row filter lists. */
970 28 : if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
971 28 : rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
972 28 : TextDatumGetCString(rfdatum));
973 28 : if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
974 28 : rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
975 28 : TextDatumGetCString(rfdatum));
976 28 : if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
977 28 : rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
978 28 : TextDatumGetCString(rfdatum));
979 :
980 28 : ReleaseSysCache(rftuple);
981 : } /* loop all subscribed publications */
982 :
983 : /* Clean the row filter */
984 2168 : for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
985 : {
986 1626 : if (no_filter[idx])
987 : {
988 1542 : list_free_deep(rfnodes[idx]);
989 1542 : rfnodes[idx] = NIL;
990 : }
991 : }
992 :
993 542 : if (has_filter)
994 : {
995 34 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
996 :
997 34 : pgoutput_ensure_entry_cxt(data, entry);
998 :
999 : /*
1000 : * Now all the filters for all pubactions are known. Combine them when
1001 : * their pubactions are the same.
1002 : */
1003 34 : oldctx = MemoryContextSwitchTo(entry->entry_cxt);
1004 34 : entry->estate = create_estate_for_relation(relation);
1005 136 : for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
1006 : {
1007 102 : List *filters = NIL;
1008 : Expr *rfnode;
1009 :
1010 102 : if (rfnodes[idx] == NIL)
1011 42 : continue;
1012 :
1013 126 : foreach(lc, rfnodes[idx])
1014 66 : filters = lappend(filters, expand_generated_columns_in_expr(stringToNode((char *) lfirst(lc)), relation, 1));
1015 :
1016 : /* combine the row filter and cache the ExprState */
1017 60 : rfnode = make_orclause(filters);
1018 60 : entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
1019 : } /* for each pubaction */
1020 34 : MemoryContextSwitchTo(oldctx);
1021 :
1022 34 : RelationClose(relation);
1023 : }
1024 542 : }
1025 :
1026 : /*
1027 : * If the table contains a generated column, check for any conflicting
1028 : * values of 'publish_generated_columns' parameter in the publications.
1029 : */
1030 : static void
1031 542 : check_and_init_gencol(PGOutputData *data, List *publications,
1032 : RelationSyncEntry *entry)
1033 : {
1034 542 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1035 542 : TupleDesc desc = RelationGetDescr(relation);
1036 542 : bool gencolpresent = false;
1037 542 : bool first = true;
1038 :
1039 : /* Check if there is any generated column present. */
1040 1628 : for (int i = 0; i < desc->natts; i++)
1041 : {
1042 1100 : Form_pg_attribute att = TupleDescAttr(desc, i);
1043 :
1044 1100 : if (att->attgenerated)
1045 : {
1046 14 : gencolpresent = true;
1047 14 : break;
1048 : }
1049 : }
1050 :
1051 : /* There are no generated columns to be published. */
1052 542 : if (!gencolpresent)
1053 : {
1054 528 : entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
1055 528 : return;
1056 : }
1057 :
1058 : /*
1059 : * There may be a conflicting value for 'publish_generated_columns'
1060 : * parameter in the publications.
1061 : */
1062 44 : foreach_ptr(Publication, pub, publications)
1063 : {
1064 : /*
1065 : * The column list takes precedence over the
1066 : * 'publish_generated_columns' parameter. Those will be checked later,
1067 : * see pgoutput_column_list_init.
1068 : */
1069 16 : if (check_and_fetch_column_list(pub, entry->publish_as_relid, NULL, NULL))
1070 6 : continue;
1071 :
1072 10 : if (first)
1073 : {
1074 10 : entry->include_gencols_type = pub->pubgencols_type;
1075 10 : first = false;
1076 : }
1077 0 : else if (entry->include_gencols_type != pub->pubgencols_type)
1078 0 : ereport(ERROR,
1079 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1080 : errmsg("cannot use different values of publish_generated_columns for table \"%s.%s\" in different publications",
1081 : get_namespace_name(RelationGetNamespace(relation)),
1082 : RelationGetRelationName(relation)));
1083 : }
1084 : }
1085 :
1086 : /*
1087 : * Initialize the column list.
1088 : */
1089 : static void
1090 542 : pgoutput_column_list_init(PGOutputData *data, List *publications,
1091 : RelationSyncEntry *entry)
1092 : {
1093 : ListCell *lc;
1094 542 : bool first = true;
1095 542 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1096 542 : bool found_pub_collist = false;
1097 542 : Bitmapset *relcols = NULL;
1098 :
1099 542 : pgoutput_ensure_entry_cxt(data, entry);
1100 :
1101 : /*
1102 : * Find if there are any column lists for this relation. If there are,
1103 : * build a bitmap using the column lists.
1104 : *
1105 : * Multiple publications might have multiple column lists for this
1106 : * relation.
1107 : *
1108 : * Note that we don't support the case where the column list is different
1109 : * for the same table when combining publications. See comments atop
1110 : * fetch_table_list. But one can later change the publication so we still
1111 : * need to check all the given publication-table mappings and report an
1112 : * error if any publications have a different column list.
1113 : */
1114 1104 : foreach(lc, publications)
1115 : {
1116 564 : Publication *pub = lfirst(lc);
1117 564 : Bitmapset *cols = NULL;
1118 :
1119 : /* Retrieve the bitmap of columns for a column list publication. */
1120 564 : found_pub_collist |= check_and_fetch_column_list(pub,
1121 : entry->publish_as_relid,
1122 : entry->entry_cxt, &cols);
1123 :
1124 : /*
1125 : * For non-column list publications — e.g. TABLE (without a column
1126 : * list), ALL TABLES, or ALL TABLES IN SCHEMA, we consider all columns
1127 : * of the table (including generated columns when
1128 : * 'publish_generated_columns' parameter is true).
1129 : */
1130 564 : if (!cols)
1131 : {
1132 : /*
1133 : * Cache the table columns for the first publication with no
1134 : * specified column list to detect publication with a different
1135 : * column list.
1136 : */
1137 486 : if (!relcols && (list_length(publications) > 1))
1138 : {
1139 18 : MemoryContext oldcxt = MemoryContextSwitchTo(entry->entry_cxt);
1140 :
1141 18 : relcols = pub_form_cols_map(relation,
1142 : entry->include_gencols_type);
1143 18 : MemoryContextSwitchTo(oldcxt);
1144 : }
1145 :
1146 486 : cols = relcols;
1147 : }
1148 :
1149 564 : if (first)
1150 : {
1151 542 : entry->columns = cols;
1152 542 : first = false;
1153 : }
1154 22 : else if (!bms_equal(entry->columns, cols))
1155 2 : ereport(ERROR,
1156 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1157 : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
1158 : get_namespace_name(RelationGetNamespace(relation)),
1159 : RelationGetRelationName(relation)));
1160 : } /* loop all subscribed publications */
1161 :
1162 : /*
1163 : * If no column list publications exist, columns to be published will be
1164 : * computed later according to the 'publish_generated_columns' parameter.
1165 : */
1166 540 : if (!found_pub_collist)
1167 468 : entry->columns = NULL;
1168 :
1169 540 : RelationClose(relation);
1170 540 : }
1171 :
1172 : /*
1173 : * Initialize the slot for storing new and old tuples, and build the map that
1174 : * will be used to convert the relation's tuples into the ancestor's format.
1175 : */
1176 : static void
1177 542 : init_tuple_slot(PGOutputData *data, Relation relation,
1178 : RelationSyncEntry *entry)
1179 : {
1180 : MemoryContext oldctx;
1181 : TupleDesc oldtupdesc;
1182 : TupleDesc newtupdesc;
1183 :
1184 542 : oldctx = MemoryContextSwitchTo(data->cachectx);
1185 :
1186 : /*
1187 : * Create tuple table slots. Create a copy of the TupleDesc as it needs to
1188 : * live as long as the cache remains.
1189 : */
1190 542 : oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1191 542 : newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1192 :
1193 542 : entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
1194 542 : entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
1195 :
1196 542 : MemoryContextSwitchTo(oldctx);
1197 :
1198 : /*
1199 : * Cache the map that will be used to convert the relation's tuples into
1200 : * the ancestor's format, if needed.
1201 : */
1202 542 : if (entry->publish_as_relid != RelationGetRelid(relation))
1203 : {
1204 72 : Relation ancestor = RelationIdGetRelation(entry->publish_as_relid);
1205 72 : TupleDesc indesc = RelationGetDescr(relation);
1206 72 : TupleDesc outdesc = RelationGetDescr(ancestor);
1207 :
1208 : /* Map must live as long as the logical decoding context. */
1209 72 : oldctx = MemoryContextSwitchTo(data->cachectx);
1210 :
1211 72 : entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
1212 :
1213 72 : MemoryContextSwitchTo(oldctx);
1214 72 : RelationClose(ancestor);
1215 : }
1216 542 : }
1217 :
1218 : /*
1219 : * Change is checked against the row filter if any.
1220 : *
1221 : * Returns true if the change is to be replicated, else false.
1222 : *
1223 : * For inserts, evaluate the row filter for new tuple.
1224 : * For deletes, evaluate the row filter for old tuple.
1225 : * For updates, evaluate the row filter for old and new tuple.
1226 : *
1227 : * For updates, if both evaluations are true, we send the UPDATE, and if
1228 : * both evaluations are false, we don't replicate the UPDATE. Now, if
1229 : * only one of the tuples matches the row filter expression, we transform
1230 : * UPDATE to DELETE or INSERT to avoid any data inconsistency based on the
1231 : * following rules:
1232 : *
1233 : * Case 1: old-row (no match) new-row (no match) -> (drop change)
1234 : * Case 2: old-row (no match) new row (match) -> INSERT
1235 : * Case 3: old-row (match) new-row (no match) -> DELETE
1236 : * Case 4: old-row (match) new row (match) -> UPDATE
1237 : *
1238 : * The new action is updated in the action parameter.
1239 : *
1240 : * The new slot could be updated when transforming the UPDATE into INSERT,
1241 : * because the original new tuple might not have column values from the replica
1242 : * identity.
1243 : *
1244 : * Examples:
1245 : * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
1246 : * Since the old tuple satisfies, the initial table synchronization copied this
1247 : * row (or another method was used to guarantee that there is data
1248 : * consistency). However, after the UPDATE the new tuple doesn't satisfy the
1249 : * row filter, so from a data consistency perspective, that row should be
1250 : * removed on the subscriber. The UPDATE should be transformed into a DELETE
1251 : * statement and be sent to the subscriber. Keeping this row on the subscriber
1252 : * is undesirable because it doesn't reflect what was defined in the row filter
1253 : * expression on the publisher. This row on the subscriber would likely not be
1254 : * modified by replication again. If someone inserted a new row with the same
1255 : * old identifier, replication could stop due to a constraint violation.
1256 : *
1257 : * Let's say the old tuple doesn't match the row filter but the new tuple does.
1258 : * Since the old tuple doesn't satisfy, the initial table synchronization
1259 : * probably didn't copy this row. However, after the UPDATE the new tuple does
1260 : * satisfy the row filter, so from a data consistency perspective, that row
1261 : * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
1262 : * statements have no effect (it matches no row -- see
1263 : * apply_handle_update_internal()). So, the UPDATE should be transformed into an
1264 : * INSERT statement and be sent to the subscriber. However, this might surprise
1265 : * someone who expects the data set to satisfy the row filter expression on the
1266 : * provider.
1267 : */
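/*
 * A concrete (hypothetical) example of the above: with a row filter of
 * WHERE (price > 100), an UPDATE that changes price from 50 to 200 is
 * Case 2 and is published as an INSERT, while an UPDATE that changes
 * price from 200 to 50 is Case 3 and is published as a DELETE.
 */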
1268 : static bool
1269 364460 : pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
1270 : TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
1271 : ReorderBufferChangeType *action)
1272 : {
1273 : TupleDesc desc;
1274 : int i;
1275 : bool old_matched,
1276 : new_matched,
1277 : result;
1278 : TupleTableSlot *tmp_new_slot;
1279 364460 : TupleTableSlot *new_slot = *new_slot_ptr;
1280 : ExprContext *ecxt;
1281 : ExprState *filter_exprstate;
1282 :
1283 : /*
1284 : * We need this map to avoid relying on ReorderBufferChangeType enums
1285 : * having specific values.
1286 : */
1287 : static const int map_changetype_pubaction[] = {
1288 : [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
1289 : [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
1290 : [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
1291 : };
1292 :
1293 : Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
1294 : *action == REORDER_BUFFER_CHANGE_UPDATE ||
1295 : *action == REORDER_BUFFER_CHANGE_DELETE);
1296 :
1297 : Assert(new_slot || old_slot);
1298 :
1299 : /* Get the corresponding row filter */
1300 364460 : filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
1301 :
1302 : /* Bail out if there is no row filter */
1303 364460 : if (!filter_exprstate)
1304 364392 : return true;
1305 :
1306 68 : elog(DEBUG3, "table \"%s.%s\" has row filter",
1307 : get_namespace_name(RelationGetNamespace(relation)),
1308 : RelationGetRelationName(relation));
1309 :
1310 68 : ResetPerTupleExprContext(entry->estate);
1311 :
1312 68 : ecxt = GetPerTupleExprContext(entry->estate);
1313 :
1314 : /*
1315 : * For the following occasions where there is only one tuple, we can
1316 : * evaluate the row filter for that tuple and return.
1317 : *
1318 : * For inserts, we only have the new tuple.
1319 : *
1320 : * For updates, we can have only a new tuple when none of the replica
1321 : * identity columns changed and none of those columns have external data
1322 : * but we still need to evaluate the row filter for the new tuple as the
1323 : * existing values of those columns might not match the filter. Also,
1324 : * users can use constant expressions in the row filter, so we anyway need
1325 : * to evaluate it for the new tuple.
1326 : *
1327 : * For deletes, we only have the old tuple.
1328 : */
1329 68 : if (!new_slot || !old_slot)
1330 : {
1331 60 : ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
1332 60 : result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1333 :
1334 60 : return result;
1335 : }
1336 :
1337 : /*
1338 : * Both the old and new tuples must be valid only for updates and need to
1339 : * be checked against the row filter.
1340 : */
1341 : Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
1342 :
1343 8 : slot_getallattrs(new_slot);
1344 8 : slot_getallattrs(old_slot);
1345 :
1346 8 : tmp_new_slot = NULL;
1347 8 : desc = RelationGetDescr(relation);
1348 :
1349 : /*
1350 : * The new tuple might not have all the replica identity columns, in which
1351 : * case they need to be copied over from the old tuple.
1352 : */
1353 24 : for (i = 0; i < desc->natts; i++)
1354 : {
1355 16 : CompactAttribute *att = TupleDescCompactAttr(desc, i);
1356 :
1357 : /*
1358 : * if the column in the new tuple or old tuple is null, nothing to do
1359 : */
1360 16 : if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
1361 2 : continue;
1362 :
1363 : /*
1364 : * Unchanged toasted replica identity columns are only logged in the
1365 : * old tuple. Copy this over to the new tuple. The changed (or WAL
1366 : * Logged) toast values are always assembled in memory and set as
1367 : * VARTAG_INDIRECT. See ReorderBufferToastReplace.
1368 : */
1369 14 : if (att->attlen == -1 &&
1370 8 : VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
1371 2 : !VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
1372 : {
1373 2 : if (!tmp_new_slot)
1374 : {
1375 2 : tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
1376 2 : ExecClearTuple(tmp_new_slot);
1377 :
1378 2 : memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
1379 2 : desc->natts * sizeof(Datum));
1380 2 : memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
1381 2 : desc->natts * sizeof(bool));
1382 : }
1383 :
1384 2 : tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
1385 2 : tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
1386 : }
1387 : }
1388 :
1389 8 : ecxt->ecxt_scantuple = old_slot;
1390 8 : old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1391 :
1392 8 : if (tmp_new_slot)
1393 : {
1394 2 : ExecStoreVirtualTuple(tmp_new_slot);
1395 2 : ecxt->ecxt_scantuple = tmp_new_slot;
1396 : }
1397 : else
1398 6 : ecxt->ecxt_scantuple = new_slot;
1399 :
1400 8 : new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1401 :
1402 : /*
1403 : * Case 1: if both tuples don't match the row filter, bailout. Send
1404 : * nothing.
1405 : */
1406 8 : if (!old_matched && !new_matched)
1407 0 : return false;
1408 :
1409 : /*
1410 : * Case 2: if the old tuple doesn't satisfy the row filter but the new
1411 : * tuple does, transform the UPDATE into INSERT.
1412 : *
1413 : * Use the newly transformed tuple that must contain the column values for
1414 : * all the replica identity columns. This is required to ensure that,
1415 : * while inserting the tuple on the downstream node, we have all the
1416 : * required column values.
1417 : */
1418 8 : if (!old_matched && new_matched)
1419 : {
1420 4 : *action = REORDER_BUFFER_CHANGE_INSERT;
1421 :
1422 4 : if (tmp_new_slot)
1423 2 : *new_slot_ptr = tmp_new_slot;
1424 : }
1425 :
1426 : /*
1427 : * Case 3: if the old tuple satisfies the row filter but the new tuple
1428 : * doesn't, transform the UPDATE into DELETE.
1429 : *
1430 : * This transformation does not require another tuple. The Old tuple will
1431 : * be used for DELETE.
1432 : */
1433 4 : else if (old_matched && !new_matched)
1434 2 : *action = REORDER_BUFFER_CHANGE_DELETE;
1435 :
1436 : /*
1437 : * Case 4: if both tuples match the row filter, transformation isn't
1438 : * required. (*action is default UPDATE).
1439 : */
1440 :
1441 8 : return true;
1442 : }
1443 :
1444 : /*
1445 : * Sends the decoded DML over wire.
1446 : *
1447 : * This is called both in streaming and non-streaming modes.
1448 : */
1449 : static void
1450 366740 : pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1451 : Relation relation, ReorderBufferChange *change)
1452 : {
1453 366740 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1454 366740 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1455 : MemoryContext old;
1456 : RelationSyncEntry *relentry;
1457 366740 : TransactionId xid = InvalidTransactionId;
1458 366740 : Relation ancestor = NULL;
1459 366740 : Relation targetrel = relation;
1460 366740 : ReorderBufferChangeType action = change->action;
1461 366740 : TupleTableSlot *old_slot = NULL;
1462 366740 : TupleTableSlot *new_slot = NULL;
1463 :
1464 366740 : if (!is_publishable_relation(relation))
1465 2276 : return;
1466 :
1467 : /*
1468 : * Remember the xid for the change in streaming mode. We need to send the
1469 : * xid with each change in streaming mode so that the subscriber can
1470 : * associate the change with the transaction and, on abort, discard the
1471 : * corresponding changes.
1472 : */
1473 366740 : if (data->in_streaming)
1474 351882 : xid = change->txn->xid;
1475 :
1476 366740 : relentry = get_rel_sync_entry(data, relation);
1477 :
1478 : /* First check the table filter */
1479 366736 : switch (action)
1480 : {
1481 211918 : case REORDER_BUFFER_CHANGE_INSERT:
1482 211918 : if (!relentry->pubactions.pubinsert)
1483 106 : return;
1484 211812 : break;
1485 68972 : case REORDER_BUFFER_CHANGE_UPDATE:
1486 68972 : if (!relentry->pubactions.pubupdate)
1487 88 : return;
1488 68884 : break;
1489 85846 : case REORDER_BUFFER_CHANGE_DELETE:
1490 85846 : if (!relentry->pubactions.pubdelete)
1491 2082 : return;
1492 :
1493 : /*
1494 : * This is only possible if deletes are allowed even when replica
1495 : * identity is not defined for a table. Since the DELETE action
1496 : * can't be published, we simply return.
1497 : */
1498 83764 : if (!change->data.tp.oldtuple)
1499 : {
1500 0 : elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
1501 0 : return;
1502 : }
1503 83764 : break;
1504 364460 : default:
1505 : Assert(false);
1506 : }
1507 :
1508 : /* Avoid leaking memory by using and resetting our own context */
1509 364460 : old = MemoryContextSwitchTo(data->context);
1510 :
1511 : /* Switch relation if publishing via root. */
1512 364460 : if (relentry->publish_as_relid != RelationGetRelid(relation))
1513 : {
1514 : Assert(relation->rd_rel->relispartition);
1515 138 : ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1516 138 : targetrel = ancestor;
1517 : }
1518 :
1519 364460 : if (change->data.tp.oldtuple)
1520 : {
1521 84024 : old_slot = relentry->old_slot;
1522 84024 : ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false);
1523 :
1524 : /* Convert tuple if needed. */
1525 84024 : if (relentry->attrmap)
1526 : {
1527 10 : TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
1528 : &TTSOpsVirtual);
1529 :
1530 10 : old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
1531 : }
1532 : }
1533 :
1534 364460 : if (change->data.tp.newtuple)
1535 : {
1536 280696 : new_slot = relentry->new_slot;
1537 280696 : ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false);
1538 :
1539 : /* Convert tuple if needed. */
1540 280696 : if (relentry->attrmap)
1541 : {
1542 38 : TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
1543 : &TTSOpsVirtual);
1544 :
1545 38 : new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
1546 : }
1547 : }
1548 :
1549 : /*
1550 : * Check row filter.
1551 : *
1552 : * Updates could be transformed to inserts or deletes based on the results
1553 : * of the row filter for old and new tuple.
1554 : */
1555 364460 : if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
1556 24 : goto cleanup;
1557 :
1558 : /*
1559 : * Send BEGIN if we haven't yet.
1560 : *
1561 : * We send the BEGIN message after ensuring that we will actually send the
1562 : * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
1563 : * transactions.
1564 : */
1565 364436 : if (txndata && !txndata->sent_begin_txn)
1566 796 : pgoutput_send_begin(ctx, txn);
1567 :
1568 : /*
1569 : * The schema should be sent using the original relation because that also
1570 : * sends the ancestor's relation (when publishing via the root).
1571 : */
1572 364436 : maybe_send_schema(ctx, change, relation, relentry);
1573 :
1574 364434 : OutputPluginPrepareWrite(ctx, true);
1575 :
1576 : /* Send the data */
1577 364434 : switch (action)
1578 : {
1579 211790 : case REORDER_BUFFER_CHANGE_INSERT:
1580 211790 : logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
1581 211790 : data->binary, relentry->columns,
1582 : relentry->include_gencols_type);
1583 211790 : break;
1584 68878 : case REORDER_BUFFER_CHANGE_UPDATE:
1585 68878 : logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
1586 68878 : new_slot, data->binary, relentry->columns,
1587 : relentry->include_gencols_type);
1588 68878 : break;
1589 83766 : case REORDER_BUFFER_CHANGE_DELETE:
1590 83766 : logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
1591 83766 : data->binary, relentry->columns,
1592 : relentry->include_gencols_type);
1593 83766 : break;
1594 364434 : default:
1595 : Assert(false);
1596 : }
1597 :
1598 364434 : OutputPluginWrite(ctx, true);
1599 :
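 : /*
 : * Common exit, reached both on the normal path and, via the goto above,
 : * when the row filter suppressed the change.
 : */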
1600 364458 : cleanup:
1601 364458 : if (RelationIsValid(ancestor))
1602 : {
1603 136 : RelationClose(ancestor);
1604 136 : ancestor = NULL;
1605 : }
1606 :
1607 : /* Drop the new slots that were used to store the converted tuples. */
1608 364458 : if (relentry->attrmap)
1609 : {
1610 48 : if (old_slot)
1611 10 : ExecDropSingleTupleTableSlot(old_slot);
1612 :
1613 48 : if (new_slot)
1614 38 : ExecDropSingleTupleTableSlot(new_slot);
1615 : }
1616 :
1617 364458 : MemoryContextSwitchTo(old);
1618 364458 : MemoryContextReset(data->context);
1619 : }
1620 :
1621 : static void
1622 28 : pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1623 : int nrelations, Relation relations[], ReorderBufferChange *change)
1624 : {
1625 28 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1626 28 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1627 : MemoryContext old;
1628 : RelationSyncEntry *relentry;
1629 : int i;
1630 : int nrelids;
1631 : Oid *relids;
1632 28 : TransactionId xid = InvalidTransactionId;
1633 :
1634 : /* Remember the xid for the change in streaming mode. See pgoutput_change. */
1635 28 : if (data->in_streaming)
1636 0 : xid = change->txn->xid;
1637 :
1638 28 : old = MemoryContextSwitchTo(data->context);
1639 :
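 : /* Collect the OIDs of the relations whose TRUNCATE will be published. */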
1640 28 : relids = palloc0(nrelations * sizeof(Oid));
1641 28 : nrelids = 0;
1642 :
1643 94 : for (i = 0; i < nrelations; i++)
1644 : {
1645 66 : Relation relation = relations[i];
1646 66 : Oid relid = RelationGetRelid(relation);
1647 :
1648 66 : if (!is_publishable_relation(relation))
1649 0 : continue;
1650 :
1651 66 : relentry = get_rel_sync_entry(data, relation);
1652 :
1653 66 : if (!relentry->pubactions.pubtruncate)
1654 28 : continue;
1655 :
1656 : /*
1657 : * Don't send partitions if the publication publishes only their
1658 : * root tables.
1659 : */
1660 38 : if (relation->rd_rel->relispartition &&
1661 30 : relentry->publish_as_relid != relid)
1662 6 : continue;
1663 :
1664 32 : relids[nrelids++] = relid;
1665 :
1666 : /* Send BEGIN if we haven't yet */
1667 32 : if (txndata && !txndata->sent_begin_txn)
1668 20 : pgoutput_send_begin(ctx, txn);
1669 :
1670 32 : maybe_send_schema(ctx, change, relation, relentry);
1671 : }
1672 :
1673 28 : if (nrelids > 0)
1674 : {
1675 20 : OutputPluginPrepareWrite(ctx, true);
1676 20 : logicalrep_write_truncate(ctx->out,
1677 : xid,
1678 : nrelids,
1679 : relids,
1680 20 : change->data.truncate.cascade,
1681 20 : change->data.truncate.restart_seqs);
1682 20 : OutputPluginWrite(ctx, true);
1683 : }
1684 :
1685 28 : MemoryContextSwitchTo(old);
1686 28 : MemoryContextReset(data->context);
1687 28 : }
1688 :
1689 : static void
1690 14 : pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1691 : XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
1692 : const char *message)
1693 : {
1694 14 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1695 14 : TransactionId xid = InvalidTransactionId;
1696 :
1697 14 : if (!data->messages)
1698 4 : return;
1699 :
1700 : /*
1701 : * Remember the xid for the message in streaming mode. See
1702 : * pgoutput_change.
1703 : */
1704 10 : if (data->in_streaming)
1705 0 : xid = txn->xid;
1706 :
1707 : /*
1708 : * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
1709 : */
1710 10 : if (transactional)
1711 : {
1712 4 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1713 :
1714 : /* Send BEGIN if we haven't yet */
1715 4 : if (txndata && !txndata->sent_begin_txn)
1716 4 : pgoutput_send_begin(ctx, txn);
1717 : }
1718 :
1719 10 : OutputPluginPrepareWrite(ctx, true);
1720 10 : logicalrep_write_message(ctx->out,
1721 : xid,
1722 : message_lsn,
1723 : transactional,
1724 : prefix,
1725 : sz,
1726 : message);
1727 10 : OutputPluginWrite(ctx, true);
1728 : }
1729 :
1730 : /*
1731 : * Return true if the change is associated with an origin and the user has
1732 : * requested only changes that have no origin; return false otherwise.
1733 : */
1734 : static bool
1735 982216 : pgoutput_origin_filter(LogicalDecodingContext *ctx,
1736 : RepOriginId origin_id)
1737 : {
1738 982216 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1739 :
1740 982216 : if (data->publish_no_origin && origin_id != InvalidRepOriginId)
1741 140 : return true;
1742 :
1743 982076 : return false;
1744 : }
1745 :
1746 : /*
1747 : * Shutdown the output plugin.
1748 : *
1749 : * Note that we don't need to clean up data->context, data->cachectx, and
1750 : * data->pubctx: they are child contexts of ctx->context, so they will be
1751 : * cleaned up by the logical decoding machinery.
1752 : */
1753 : static void
1754 978 : pgoutput_shutdown(LogicalDecodingContext *ctx)
1755 : {
1756 978 : if (RelationSyncCache)
1757 : {
1758 368 : hash_destroy(RelationSyncCache);
1759 368 : RelationSyncCache = NULL;
1760 : }
1761 978 : }
1762 :
1763 : /*
1764 : * Load publications from the list of publication names.
1765 : */
1766 : static List *
1767 312 : LoadPublications(List *pubnames)
1768 : {
1769 312 : List *result = NIL;
1770 : ListCell *lc;
1771 :
1772 712 : foreach(lc, pubnames)
1773 : {
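 : /*
 : * Passing missing_ok = false makes GetPublicationByName() error out if
 : * a requested publication does not exist.
 : */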
1774 402 : char *pubname = (char *) lfirst(lc);
1775 402 : Publication *pub = GetPublicationByName(pubname, false);
1776 :
1777 400 : result = lappend(result, pub);
1778 : }
1779 :
1780 310 : return result;
1781 : }
1782 :
1783 : /*
1784 : * Publication syscache invalidation callback.
1785 : *
1786 : * Called for invalidations on pg_publication.
1787 : */
1788 : static void
1789 460 : publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
1790 : {
1791 460 : publications_valid = false;
1792 :
1793 : /*
1794 : * Also invalidate per-relation cache so that next time the filtering info
1795 : * is checked it will be updated with the new publication settings.
1796 : */
1797 460 : rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
1798 460 : }
1799 :
1800 : /*
1801 : * START STREAM callback
1802 : */
1803 : static void
1804 1294 : pgoutput_stream_start(struct LogicalDecodingContext *ctx,
1805 : ReorderBufferTXN *txn)
1806 : {
1807 1294 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1808 1294 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
1809 :
1810 : /* we can't nest streaming of transactions */
1811 : Assert(!data->in_streaming);
1812 :
1813 : /*
1814 : * If we already sent the first stream for this transaction then don't
1815 : * send the origin id in the subsequent streams.
1816 : */
1817 1294 : if (rbtxn_is_streamed(txn))
1818 1168 : send_replication_origin = false;
1819 :
1820 1294 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
1821 1294 : logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
1822 :
1823 1294 : send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
1824 : send_replication_origin);
1825 :
1826 1294 : OutputPluginWrite(ctx, true);
1827 :
1828 : /* we're streaming a chunk of transaction now */
1829 1294 : data->in_streaming = true;
1830 1294 : }
1831 :
1832 : /*
1833 : * STOP STREAM callback
1834 : */
1835 : static void
1836 1294 : pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
1837 : ReorderBufferTXN *txn)
1838 : {
1839 1294 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1840 :
1841 : /* we should be streaming a transaction */
1842 : Assert(data->in_streaming);
1843 :
1844 1294 : OutputPluginPrepareWrite(ctx, true);
1845 1294 : logicalrep_write_stream_stop(ctx->out);
1846 1294 : OutputPluginWrite(ctx, true);
1847 :
1848 : /* we've stopped streaming a transaction */
1849 1294 : data->in_streaming = false;
1850 1294 : }
1851 :
1852 : /*
1853 : * Notify downstream to discard the streamed transaction (along with all
1854 : * its subtransactions, if it's a toplevel transaction).
1855 : */
1856 : static void
1857 52 : pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
1858 : ReorderBufferTXN *txn,
1859 : XLogRecPtr abort_lsn)
1860 : {
1861 : ReorderBufferTXN *toptxn;
1862 52 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1863 52 : bool write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
1864 :
1865 : /*
1866 : * The abort should happen outside a streaming block, even for streamed
1867 : * transactions. The transaction has to be marked as streamed, though.
1868 : */
1869 : Assert(!data->in_streaming);
1870 :
1871 : /* determine the toplevel transaction */
1872 52 : toptxn = rbtxn_get_toptxn(txn);
1873 :
1874 : Assert(rbtxn_is_streamed(toptxn));
1875 :
1876 52 : OutputPluginPrepareWrite(ctx, true);
1877 52 : logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
1878 : txn->xact_time.abort_time, write_abort_info);
1879 :
1880 52 : OutputPluginWrite(ctx, true);
1881 :
1882 52 : cleanup_rel_sync_cache(toptxn->xid, false);
1883 52 : }
1884 :
1885 : /*
1886 : * Notify downstream to apply the streamed transaction (along with all
1887 : * its subtransactions).
1888 : */
1889 : static void
1890 92 : pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
1891 : ReorderBufferTXN *txn,
1892 : XLogRecPtr commit_lsn)
1893 : {
1894 92 : PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private;
1895 :
1896 : /*
1897 : * The commit should happen outside a streaming block, even for streamed
1898 : * transactions. The transaction has to be marked as streamed, though.
1899 : */
1900 : Assert(!data->in_streaming);
1901 : Assert(rbtxn_is_streamed(txn));
1902 :
1903 92 : OutputPluginUpdateProgress(ctx, false);
1904 :
1905 92 : OutputPluginPrepareWrite(ctx, true);
1906 92 : logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
1907 92 : OutputPluginWrite(ctx, true);
1908 :
1909 92 : cleanup_rel_sync_cache(txn->xid, true);
1910 92 : }
1911 :
1912 : /*
1913 : * PREPARE callback (for streaming two-phase commit).
1914 : *
1915 : * Notify the downstream to prepare the transaction.
1916 : */
1917 : static void
1918 28 : pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
1919 : ReorderBufferTXN *txn,
1920 : XLogRecPtr prepare_lsn)
1921 : {
1922 : Assert(rbtxn_is_streamed(txn));
1923 :
1924 28 : OutputPluginUpdateProgress(ctx, false);
1925 28 : OutputPluginPrepareWrite(ctx, true);
1926 28 : logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
1927 28 : OutputPluginWrite(ctx, true);
1928 28 : }
1929 :
1930 : /*
1931 : * Initialize the relation schema sync cache for a decoding session.
1932 : *
1933 : * The hash table is destroyed at the end of a decoding session. While the
1934 : * relcache invalidation callbacks remain registered and will still be
1935 : * invoked, they will just see the NULL hash table global and take no action.
1936 : */
1937 : static void
1938 704 : init_rel_sync_cache(MemoryContext cachectx)
1939 : {
1940 : HASHCTL ctl;
1941 : static bool relation_callbacks_registered = false;
1942 :
1943 : /* Nothing to do if hash table already exists */
1944 704 : if (RelationSyncCache != NULL)
1945 2 : return;
1946 :
1947 : /* Make a new hash table for the cache */
1948 704 : ctl.keysize = sizeof(Oid);
1949 704 : ctl.entrysize = sizeof(RelationSyncEntry);
1950 704 : ctl.hcxt = cachectx;
1951 :
1952 704 : RelationSyncCache = hash_create("logical replication output relation cache",
1953 : 128, &ctl,
1954 : HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
1955 :
1956 : Assert(RelationSyncCache != NULL);
1957 :
1958 : /* No more to do if we already registered callbacks */
1959 704 : if (relation_callbacks_registered)
1960 2 : return;
1961 :
1962 : /* We must update the cache entry for a relation after a relcache flush */
1963 702 : CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
1964 :
1965 : /*
1966 : * Flush all cache entries after a pg_namespace change, in case it was a
1967 : * schema rename affecting a relation being replicated.
1968 : */
1969 702 : CacheRegisterSyscacheCallback(NAMESPACEOID,
1970 : rel_sync_cache_publication_cb,
1971 : (Datum) 0);
1972 :
1973 : /*
1974 : * Flush all cache entries after any publication changes. (We need no
1975 : * callback entry for pg_publication, because publication_invalidation_cb
1976 : * will take care of it.)
1977 : */
1978 702 : CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
1979 : rel_sync_cache_publication_cb,
1980 : (Datum) 0);
1981 702 : CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
1982 : rel_sync_cache_publication_cb,
1983 : (Datum) 0);
1984 :
1985 702 : relation_callbacks_registered = true;
1986 : }
1987 :
1988 : /*
1989 : * We expect a relatively small number of streamed transactions, so a simple list suffices.
1990 : */
1991 : static bool
1992 351882 : get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
1993 : {
1994 351882 : return list_member_xid(entry->streamed_txns, xid);
1995 : }
1996 :
1997 : /*
1998 : * Add the xid to the rel sync entry for which we have already sent the
1999 : * schema of the relation.
2000 : */
2001 : static void
2002 142 : set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
2003 : {
2004 : MemoryContext oldctx;
2005 :
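 : /* Allocate the list in CacheMemoryContext so it survives across callbacks. */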
2006 142 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
2007 :
2008 142 : entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
2009 :
2010 142 : MemoryContextSwitchTo(oldctx);
2011 142 : }
2012 :
2013 : /*
2014 : * Find or create entry in the relation schema cache.
2015 : *
2016 : * This looks up publications that the given relation is directly or
2017 : * indirectly part of (the latter if it's really the relation's ancestor that
2018 : * is part of a publication) and fills in the found entry with information
2019 : * about which operations to publish and whether to use an ancestor's schema
2020 : * when publishing.
2021 : */
2022 : static RelationSyncEntry *
2023 366806 : get_rel_sync_entry(PGOutputData *data, Relation relation)
2024 : {
2025 : RelationSyncEntry *entry;
2026 : bool found;
2027 : MemoryContext oldctx;
2028 366806 : Oid relid = RelationGetRelid(relation);
2029 :
2030 : Assert(RelationSyncCache != NULL);
2031 :
2032 : /* Find cached relation info, creating if not found */
2033 366806 : entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
2034 : &relid,
2035 : HASH_ENTER, &found);
2036 : Assert(entry != NULL);
2037 :
2038 : /* initialize entry, if it's new */
2039 366806 : if (!found)
2040 : {
2041 508 : entry->replicate_valid = false;
2042 508 : entry->schema_sent = false;
2043 508 : entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
2044 508 : entry->streamed_txns = NIL;
2045 508 : entry->pubactions.pubinsert = entry->pubactions.pubupdate =
2046 508 : entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
2047 508 : entry->new_slot = NULL;
2048 508 : entry->old_slot = NULL;
2049 508 : memset(entry->exprstate, 0, sizeof(entry->exprstate));
2050 508 : entry->entry_cxt = NULL;
2051 508 : entry->publish_as_relid = InvalidOid;
2052 508 : entry->columns = NULL;
2053 508 : entry->attrmap = NULL;
2054 : }
2055 :
2056 : /* Validate the entry */
2057 366806 : if (!entry->replicate_valid)
2058 : {
2059 640 : Oid schemaId = get_rel_namespace(relid);
2060 640 : List *pubids = GetRelationPublications(relid);
2061 :
2062 : /*
2063 : * We don't acquire a lock on the namespace system table as we build
2064 : * the cache entry using a historic snapshot and all the later changes
2065 : * are absorbed while decoding WAL.
2066 : */
2067 640 : List *schemaPubids = GetSchemaPublications(schemaId);
2068 : ListCell *lc;
2069 640 : Oid publish_as_relid = relid;
2070 640 : int publish_ancestor_level = 0;
2071 640 : bool am_partition = get_rel_relispartition(relid);
2072 640 : char relkind = get_rel_relkind(relid);
2073 640 : List *rel_publications = NIL;
2074 :
2075 : /* Reload publications if needed before use. */
2076 640 : if (!publications_valid)
2077 : {
2078 312 : MemoryContextReset(data->pubctx);
2079 :
2080 312 : oldctx = MemoryContextSwitchTo(data->pubctx);
2081 312 : data->publications = LoadPublications(data->publication_names);
2082 310 : MemoryContextSwitchTo(oldctx);
2083 310 : publications_valid = true;
2084 : }
2085 :
2086 : /*
2087 : * Reset schema_sent status as the relation definition may have
2088 : * changed. Also reset pubactions to empty in case rel was dropped
2089 : * from a publication. Also free any objects that depended on the
2090 : * earlier definition.
2091 : */
2092 638 : entry->schema_sent = false;
2093 638 : entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
2094 638 : list_free(entry->streamed_txns);
2095 638 : entry->streamed_txns = NIL;
2096 638 : bms_free(entry->columns);
2097 638 : entry->columns = NULL;
2098 638 : entry->pubactions.pubinsert = false;
2099 638 : entry->pubactions.pubupdate = false;
2100 638 : entry->pubactions.pubdelete = false;
2101 638 : entry->pubactions.pubtruncate = false;
2102 :
2103 : /*
2104 : * Tuple slot cleanup. (The slots will be rebuilt later if needed.)
2105 : */
2106 638 : if (entry->old_slot)
2107 : {
2108 112 : TupleDesc desc = entry->old_slot->tts_tupleDescriptor;
2109 :
2110 : Assert(desc->tdrefcount == -1);
2111 :
2112 112 : ExecDropSingleTupleTableSlot(entry->old_slot);
2113 :
2114 : /*
2115 : * ExecDropSingleTupleTableSlot() does not free the TupleDesc, so
2116 : * free it now to avoid any leaks.
2117 : */
2118 112 : FreeTupleDesc(desc);
2119 : }
2120 638 : if (entry->new_slot)
2121 : {
2122 112 : TupleDesc desc = entry->new_slot->tts_tupleDescriptor;
2123 :
2124 : Assert(desc->tdrefcount == -1);
2125 :
2126 112 : ExecDropSingleTupleTableSlot(entry->new_slot);
2127 :
2128 : /*
2129 : * ExecDropSingleTupleTableSlot() does not free the TupleDesc, so
2130 : * free it now to avoid any leaks.
2131 : */
2132 112 : FreeTupleDesc(desc);
2133 : }
2134 :
2135 638 : entry->old_slot = NULL;
2136 638 : entry->new_slot = NULL;
2137 :
2138 638 : if (entry->attrmap)
2139 6 : free_attrmap(entry->attrmap);
2140 638 : entry->attrmap = NULL;
2141 :
2142 : /*
2143 : * Row filter cache cleanup.
2144 : */
2145 638 : if (entry->entry_cxt)
2146 112 : MemoryContextDelete(entry->entry_cxt);
2147 :
2148 638 : entry->entry_cxt = NULL;
2149 638 : entry->estate = NULL;
2150 638 : memset(entry->exprstate, 0, sizeof(entry->exprstate));
2151 :
2152 : /*
2153 : * Build publication cache. We can't use one provided by relcache as
2154 : * relcache considers all publications that the given relation is in,
2155 : * but here we only need to consider ones that the subscriber
2156 : * requested.
2157 : */
2158 1616 : foreach(lc, data->publications)
2159 : {
2160 978 : Publication *pub = lfirst(lc);
2161 978 : bool publish = false;
2162 :
2163 : /*
2164 : * Under what relid should we publish changes in this publication?
2165 : * We'll use the top-most relid across all publications. Also
2166 : * track the ancestor level for this publication.
2167 : */
2168 978 : Oid pub_relid = relid;
2169 978 : int ancestor_level = 0;
2170 :
2171 : /*
2172 : * If this is a FOR ALL TABLES publication, pick the partition
2173 : * root and set the ancestor level accordingly.
2174 : */
2175 978 : if (pub->alltables)
2176 : {
2177 162 : publish = true;
2178 162 : if (pub->pubviaroot && am_partition)
2179 : {
2180 24 : List *ancestors = get_partition_ancestors(relid);
2181 :
2182 24 : pub_relid = llast_oid(ancestors);
2183 24 : ancestor_level = list_length(ancestors);
2184 : }
2185 : }
2186 :
2187 978 : if (!publish)
2188 : {
2189 816 : bool ancestor_published = false;
2190 :
2191 : /*
2192 : * For a partition, check if any of the ancestors are
2193 : * published. If so, note down the topmost ancestor that is
2194 : * published via this publication, which will be used as the
2195 : * relation via which to publish the partition's changes.
2196 : */
2197 816 : if (am_partition)
2198 : {
2199 : Oid ancestor;
2200 : int level;
2201 242 : List *ancestors = get_partition_ancestors(relid);
2202 :
2203 242 : ancestor = GetTopMostAncestorInPublication(pub->oid,
2204 : ancestors,
2205 : &level);
2206 :
2207 242 : if (ancestor != InvalidOid)
2208 : {
2209 96 : ancestor_published = true;
2210 96 : if (pub->pubviaroot)
2211 : {
2212 50 : pub_relid = ancestor;
2213 50 : ancestor_level = level;
2214 : }
2215 : }
2216 : }
2217 :
2218 1286 : if (list_member_oid(pubids, pub->oid) ||
2219 928 : list_member_oid(schemaPubids, pub->oid) ||
2220 : ancestor_published)
2221 414 : publish = true;
2222 : }
2223 :
2224 : /*
2225 : * If the relation is to be published, determine the actions to
2226 : * publish and the list of columns, if appropriate.
2227 : *
2228 : * Don't publish changes for partitioned tables, because publishing
2229 : * the changes of their partitions suffices, unless the partitions'
2230 : * changes won't be published because pubviaroot is set.
2231 : */
2232 978 : if (publish &&
2233 8 : (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
2234 : {
2235 570 : entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
2236 570 : entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
2237 570 : entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
2238 570 : entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
2239 :
2240 : /*
2241 : * We want to publish the changes as the top-most ancestor
2242 : * across all publications. So we need to check if the already
2243 : * calculated level is higher than the new one. If yes, we can
2244 : * ignore the new value (as it's a child). Otherwise the new
2245 : * value is an ancestor, so we keep it.
2246 : */
2247 570 : if (publish_ancestor_level > ancestor_level)
2248 2 : continue;
2249 :
2250 : /*
2251 : * If we found an ancestor higher up in the tree, discard the
2252 : * list of publications through which we replicate it, and use
2253 : * the new ancestor.
2254 : */
2255 568 : if (publish_ancestor_level < ancestor_level)
2256 : {
2257 74 : publish_as_relid = pub_relid;
2258 74 : publish_ancestor_level = ancestor_level;
2259 :
2260 : /* reset the publication list for this relation */
2261 74 : rel_publications = NIL;
2262 : }
2263 : else
2264 : {
2265 : /* Same ancestor level, has to be the same OID. */
2266 : Assert(publish_as_relid == pub_relid);
2267 : }
2268 :
2269 : /* Track publications for this ancestor. */
2270 568 : rel_publications = lappend(rel_publications, pub);
2271 : }
2272 : }
2273 :
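 : /* Record the relid under which this relation's changes will be published. */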
2274 638 : entry->publish_as_relid = publish_as_relid;
2275 :
2276 : /*
2277 : * Initialize the tuple slot, map, and row filter. These are only used
2278 : * when publishing inserts, updates, or deletes.
2279 : */
2280 638 : if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
2281 96 : entry->pubactions.pubdelete)
2282 : {
2283 : /* Initialize the tuple slot and map */
2284 542 : init_tuple_slot(data, relation, entry);
2285 :
2286 : /* Initialize the row filter */
2287 542 : pgoutput_row_filter_init(data, rel_publications, entry);
2288 :
2289 : /* Check whether to publish generated columns. */
2290 542 : check_and_init_gencol(data, rel_publications, entry);
2291 :
2292 : /* Initialize the column list */
2293 542 : pgoutput_column_list_init(data, rel_publications, entry);
2294 : }
2295 :
2296 636 : list_free(pubids);
2297 636 : list_free(schemaPubids);
2298 636 : list_free(rel_publications);
2299 :
2300 636 : entry->replicate_valid = true;
2301 : }
2302 :
2303 366802 : return entry;
2304 : }
2305 :
2306 : /*
2307 : * Cleanup list of streamed transactions and update the schema_sent flag.
2308 : *
2309 : * When a streamed transaction commits or aborts, we need to remove the
2310 : * toplevel XID from the schema cache. If the transaction aborted, the
2311 : * subscriber will simply throw away the schema records we streamed, so
2312 : * we don't need to do anything else.
2313 : *
2314 : * If the transaction is committed, the subscriber will update the relation
2315 : * cache - so tweak the schema_sent flag accordingly.
2316 : */
2317 : static void
2318 144 : cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
2319 : {
2320 : HASH_SEQ_STATUS hash_seq;
2321 : RelationSyncEntry *entry;
2322 :
2323 : Assert(RelationSyncCache != NULL);
2324 :
2325 144 : hash_seq_init(&hash_seq, RelationSyncCache);
2326 294 : while ((entry = hash_seq_search(&hash_seq)) != NULL)
2327 : {
2328 : /*
2329 : * We can set the schema_sent flag for an entry whose list contains the
2330 : * committed xid, as that ensures the subscriber already has the
2331 : * corresponding schema; we don't need to send it again unless the
2332 : * relation is invalidated.
2333 : */
2334 340 : foreach_xid(streamed_txn, entry->streamed_txns)
2335 : {
2336 148 : if (xid == streamed_txn)
2337 : {
2338 108 : if (is_commit)
2339 86 : entry->schema_sent = true;
2340 :
2341 108 : entry->streamed_txns =
2342 108 : foreach_delete_current(entry->streamed_txns, streamed_txn);
2343 108 : break;
2344 : }
2345 : }
2346 : }
2347 144 : }
2348 :
2349 : /*
2350 : * Relcache invalidation callback
2351 : */
2352 : static void
2353 6912 : rel_sync_cache_relation_cb(Datum arg, Oid relid)
2354 : {
2355 : RelationSyncEntry *entry;
2356 :
2357 : /*
2358 : * We can get here if the plugin was used via the SQL interface, since the
2359 : * RelationSyncCache is destroyed when decoding finishes but there is
2360 : * no way to unregister the relcache invalidation callback.
2361 : */
2362 6912 : if (RelationSyncCache == NULL)
2363 18 : return;
2364 :
2365 : /*
2366 : * Nobody keeps pointers to entries in this hash table around outside
2367 : * logical decoding callback calls - but invalidation events can come in
2368 : * *during* a callback if we do any syscache access in the callback.
2369 : * Because of that we must mark the cache entry as invalid but not damage
2370 : * any of its substructure here. The next get_rel_sync_entry() call will
2371 : * rebuild it all.
2372 : */
2373 6894 : if (OidIsValid(relid))
2374 : {
2375 : /*
2376 : * Getting invalidations for relations that aren't in the table is
2377 : * entirely normal. So we don't care if it's found or not.
2378 : */
2379 6832 : entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
2380 : HASH_FIND, NULL);
2381 6832 : if (entry != NULL)
2382 1204 : entry->replicate_valid = false;
2383 : }
2384 : else
2385 : {
2386 : /* Whole cache must be flushed. */
2387 : HASH_SEQ_STATUS status;
2388 :
2389 62 : hash_seq_init(&status, RelationSyncCache);
2390 166 : while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2391 : {
2392 104 : entry->replicate_valid = false;
2393 : }
2394 : }
2395 : }
2396 :
2397 : /*
2398 : * Publication relation/schema map syscache invalidation callback
2399 : *
2400 : * Called for invalidations on pg_publication, pg_publication_rel,
2401 : * pg_publication_namespace, and pg_namespace.
2402 : */
2403 : static void
2404 1226 : rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
2405 : {
2406 : HASH_SEQ_STATUS status;
2407 : RelationSyncEntry *entry;
2408 :
2409 : /*
2410 : * We can get here if the plugin was used via the SQL interface, since the
2411 : * RelationSyncCache is destroyed when decoding finishes but there is
2412 : * no way to unregister the invalidation callbacks.
2413 : */
2414 1226 : if (RelationSyncCache == NULL)
2415 68 : return;
2416 :
2417 : /*
2418 : * We have no easy way to identify which cache entries this invalidation
2419 : * event might have affected, so just mark them all invalid.
2420 : */
2421 1158 : hash_seq_init(&status, RelationSyncCache);
2422 3658 : while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2423 : {
2424 2500 : entry->replicate_valid = false;
2425 : }
2426 : }
2427 :
2428 : /* Send Replication origin */
2429 : static void
2430 2154 : send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
2431 : XLogRecPtr origin_lsn, bool send_origin)
2432 : {
2433 2154 : if (send_origin)
2434 : {
2435 : char *origin;
2436 :
2437 : /*----------
2438 : * XXX: which behaviour do we want here?
2439 : *
2440 : * Alternatives:
2441 : * - don't send origin message if origin name not found
2442 : * (that's what we do now)
2443 : * - throw error - that will break replication, not good
2444 : * - send some special "unknown" origin
2445 : *----------
2446 : */
2447 20 : if (replorigin_by_oid(origin_id, true, &origin))
2448 : {
2449 : /* Message boundary */
2450 20 : OutputPluginWrite(ctx, false);
2451 20 : OutputPluginPrepareWrite(ctx, true);
2452 :
2453 20 : logicalrep_write_origin(ctx->out, origin, origin_lsn);
2454 : }
2455 : }
2456 2154 : }
|