Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pgoutput.c
4 : * Logical Replication output plugin
5 : *
6 : * Copyright (c) 2012-2025, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/pgoutput/pgoutput.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "access/tupconvert.h"
16 : #include "catalog/partition.h"
17 : #include "catalog/pg_publication.h"
18 : #include "catalog/pg_publication_rel.h"
19 : #include "catalog/pg_subscription.h"
20 : #include "commands/defrem.h"
21 : #include "commands/subscriptioncmds.h"
22 : #include "executor/executor.h"
23 : #include "fmgr.h"
24 : #include "nodes/makefuncs.h"
25 : #include "parser/parse_relation.h"
26 : #include "replication/logical.h"
27 : #include "replication/logicalproto.h"
28 : #include "replication/origin.h"
29 : #include "replication/pgoutput.h"
30 : #include "rewrite/rewriteHandler.h"
31 : #include "utils/builtins.h"
32 : #include "utils/inval.h"
33 : #include "utils/lsyscache.h"
34 : #include "utils/memutils.h"
35 : #include "utils/rel.h"
36 : #include "utils/syscache.h"
37 : #include "utils/varlena.h"
38 :
39 980 : PG_MODULE_MAGIC_EXT(
40 : .name = "pgoutput",
41 : .version = PG_VERSION
42 : );
43 :
44 : static void pgoutput_startup(LogicalDecodingContext *ctx,
45 : OutputPluginOptions *opt, bool is_init);
46 : static void pgoutput_shutdown(LogicalDecodingContext *ctx);
47 : static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
48 : ReorderBufferTXN *txn);
49 : static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
50 : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
51 : static void pgoutput_change(LogicalDecodingContext *ctx,
52 : ReorderBufferTXN *txn, Relation relation,
53 : ReorderBufferChange *change);
54 : static void pgoutput_truncate(LogicalDecodingContext *ctx,
55 : ReorderBufferTXN *txn, int nrelations, Relation relations[],
56 : ReorderBufferChange *change);
57 : static void pgoutput_message(LogicalDecodingContext *ctx,
58 : ReorderBufferTXN *txn, XLogRecPtr message_lsn,
59 : bool transactional, const char *prefix,
60 : Size sz, const char *message);
61 : static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
62 : RepOriginId origin_id);
63 : static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
64 : ReorderBufferTXN *txn);
65 : static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
66 : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
67 : static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
68 : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
69 : static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
70 : ReorderBufferTXN *txn,
71 : XLogRecPtr prepare_end_lsn,
72 : TimestampTz prepare_time);
73 : static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
74 : ReorderBufferTXN *txn);
75 : static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
76 : ReorderBufferTXN *txn);
77 : static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
78 : ReorderBufferTXN *txn,
79 : XLogRecPtr abort_lsn);
80 : static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
81 : ReorderBufferTXN *txn,
82 : XLogRecPtr commit_lsn);
83 : static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
84 : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
85 :
86 : static bool publications_valid;
87 :
88 : static List *LoadPublications(List *pubnames);
89 : static void publication_invalidation_cb(Datum arg, int cacheid,
90 : uint32 hashvalue);
91 : static void send_repl_origin(LogicalDecodingContext *ctx,
92 : RepOriginId origin_id, XLogRecPtr origin_lsn,
93 : bool send_origin);
94 :
95 : /*
96 : * Only 3 publication actions are used for row filtering ("insert", "update",
97 : * "delete"). See RelationSyncEntry.exprstate[].
98 : */
99 : enum RowFilterPubAction
100 : {
101 : PUBACTION_INSERT,
102 : PUBACTION_UPDATE,
103 : PUBACTION_DELETE,
104 : };
105 :
106 : #define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
107 :
108 : /*
109 : * Entry in the map used to remember which relation schemas we sent.
110 : *
111 : * The schema_sent flag determines if the current schema record for the
112 : * relation (and for its ancestor if publish_as_relid is set) was already
113 : * sent to the subscriber (in which case we don't need to send it again).
114 : *
115 : * The schema cache on the downstream is, however, updated only at commit
116 : * time, and with streamed transactions the commit order may differ from the
117 : * order in which the transactions are sent. Also, the (sub)transactions
118 : * might get aborted, so we need to send the schema for each (sub)transaction
119 : * so that we don't lose the schema information on abort. To handle this,
120 : * we maintain a list of xids (streamed_txns) for which we have already sent
121 : * the schema.
122 : *
123 : * For partitions, 'pubactions' considers not only the table's own
124 : * publications, but also those of all of its ancestors.
125 : */
126 : typedef struct RelationSyncEntry
127 : {
128 : Oid relid; /* relation oid */
129 :
130 : bool replicate_valid; /* overall validity flag for entry */
131 :
132 : bool schema_sent;
133 :
134 : /*
135 : * This will be PUBLISH_GENCOLS_STORED if the relation contains generated
136 : * columns and the 'publish_generated_columns' parameter is set to
137 : * PUBLISH_GENCOLS_STORED. Otherwise, it will be PUBLISH_GENCOLS_NONE,
138 : * indicating that no generated columns should be published, unless
139 : * explicitly specified in the column list.
140 : */
141 : PublishGencolsType include_gencols_type;
142 : List *streamed_txns; /* streamed toplevel transactions with this
143 : * schema */
144 :
145 : /* are we publishing this rel? */
146 : PublicationActions pubactions;
147 :
148 : /*
149 : * ExprState array for row filters. The expressions for different
150 : * publication actions cannot always be combined into one, because for
151 : * updates and deletes the filter may only reference columns covered by the
152 : * replica identity index, whereas inserts have no such restriction; so we
153 : * keep one ExprState per publication action.
154 : */
155 : ExprState *exprstate[NUM_ROWFILTER_PUBACTIONS];
156 : EState *estate; /* executor state used for row filter */
157 : TupleTableSlot *new_slot; /* slot for storing new tuple */
158 : TupleTableSlot *old_slot; /* slot for storing old tuple */
159 :
160 : /*
161 : * OID of the relation to publish changes as. For a partition, this may
162 : * be set to one of its ancestors whose schema will be used when
163 : * replicating changes, if publish_via_partition_root is set for the
164 : * publication.
165 : */
166 : Oid publish_as_relid;
167 :
168 : /*
169 : * Map used when replicating using an ancestor's schema to convert tuples
170 : * from partition's type to the ancestor's; NULL if publish_as_relid is
171 : * same as 'relid' or if unnecessary due to partition and the ancestor
172 : * having identical TupleDesc.
173 : */
174 : AttrMap *attrmap;
175 :
176 : /*
177 : * Columns included in the publication, or NULL if all columns are
178 : * included implicitly. Note that the attnums in this bitmap are not
179 : * shifted by FirstLowInvalidHeapAttributeNumber.
180 : */
181 : Bitmapset *columns;
182 :
183 : /*
184 : * Private context to store additional data for this entry - state for the
185 : * row filter expressions, column list, etc.
186 : */
187 : MemoryContext entry_cxt;
188 : } RelationSyncEntry;
189 :
190 : /*
191 : * Maintain a per-transaction level variable to track whether the transaction
192 : * has sent BEGIN. BEGIN is only sent when the first change in a transaction
193 : * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
194 : * messages for empty transactions which saves network bandwidth.
195 : *
196 : * This optimization is not used for prepared transactions because if the
197 : * WALSender restarts after prepare of a transaction and before commit prepared
198 : * of the same transaction then we won't be able to figure out if we have
199 : * skipped sending BEGIN/PREPARE of a transaction as it was empty. This is
200 : * because we would have lost the in-memory txndata information that was
201 : * present prior to the restart. This will result in sending a spurious
202 : * COMMIT PREPARED without a corresponding prepared transaction at the
203 : * downstream which would lead to an error when it tries to process it.
204 : *
205 : * XXX We could achieve this optimization by changing protocol to send
206 : * additional information so that downstream can detect that the corresponding
207 : * prepare has not been sent. However, adding such a check for every
208 : * transaction in the downstream could be costly so we might want to do it
209 : * optionally.
210 : *
211 : * We also don't have this optimization for streamed transactions because
212 : * they can contain prepared transactions.
213 : */
214 : typedef struct PGOutputTxnData
215 : {
216 : bool sent_begin_txn; /* flag indicating whether BEGIN has been sent */
217 : } PGOutputTxnData;
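
The comment above describes the deferred-BEGIN optimization; as a reading aid, here is a minimal sketch (not part of pgoutput.c, and assuming this file's headers and the PGOutputTxnData struct just defined) of how the flag drives the begin and commit callbacks. pgoutput's actual implementation is pgoutput_begin_txn, pgoutput_send_begin and pgoutput_commit_txn later in this file; the callback names below are illustrative only.

static void
example_begin_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
	/* Allocate the per-transaction flag; BEGIN is deliberately not sent yet. */
	PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
													  sizeof(PGOutputTxnData));

	txn->output_plugin_private = txndata;
}

static void
example_commit_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
				  XLogRecPtr commit_lsn)
{
	PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;

	/*
	 * The change callback sends BEGIN lazily before its first published
	 * change and sets sent_begin_txn.  If that never happened, the
	 * transaction was empty and COMMIT is skipped as well.
	 */
	if (txndata->sent_begin_txn)
	{
		OutputPluginPrepareWrite(ctx, true);
		logicalrep_write_commit(ctx->out, txn, commit_lsn);
		OutputPluginWrite(ctx, true);
	}

	pfree(txndata);
	txn->output_plugin_private = NULL;
}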
218 :
219 : /* Map used to remember which relation schemas we sent. */
220 : static HTAB *RelationSyncCache = NULL;
221 :
222 : static void init_rel_sync_cache(MemoryContext cachectx);
223 : static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
224 : static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
225 : Relation relation);
226 : static void send_relation_and_attrs(Relation relation, TransactionId xid,
227 : LogicalDecodingContext *ctx,
228 : RelationSyncEntry *relentry);
229 : static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
230 : static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
231 : uint32 hashvalue);
232 : static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
233 : TransactionId xid);
234 : static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
235 : TransactionId xid);
236 : static void init_tuple_slot(PGOutputData *data, Relation relation,
237 : RelationSyncEntry *entry);
238 :
239 : /* row filter routines */
240 : static EState *create_estate_for_relation(Relation rel);
241 : static void pgoutput_row_filter_init(PGOutputData *data,
242 : List *publications,
243 : RelationSyncEntry *entry);
244 : static bool pgoutput_row_filter_exec_expr(ExprState *state,
245 : ExprContext *econtext);
246 : static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
247 : TupleTableSlot **new_slot_ptr,
248 : RelationSyncEntry *entry,
249 : ReorderBufferChangeType *action);
250 :
251 : /* column list routines */
252 : static void pgoutput_column_list_init(PGOutputData *data,
253 : List *publications,
254 : RelationSyncEntry *entry);
255 :
256 : /*
257 : * Specify output plugin callbacks
258 : */
259 : void
260 1348 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
261 : {
262 1348 : cb->startup_cb = pgoutput_startup;
263 1348 : cb->begin_cb = pgoutput_begin_txn;
264 1348 : cb->change_cb = pgoutput_change;
265 1348 : cb->truncate_cb = pgoutput_truncate;
266 1348 : cb->message_cb = pgoutput_message;
267 1348 : cb->commit_cb = pgoutput_commit_txn;
268 :
269 1348 : cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
270 1348 : cb->prepare_cb = pgoutput_prepare_txn;
271 1348 : cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
272 1348 : cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
273 1348 : cb->filter_by_origin_cb = pgoutput_origin_filter;
274 1348 : cb->shutdown_cb = pgoutput_shutdown;
275 :
276 : /* transaction streaming */
277 1348 : cb->stream_start_cb = pgoutput_stream_start;
278 1348 : cb->stream_stop_cb = pgoutput_stream_stop;
279 1348 : cb->stream_abort_cb = pgoutput_stream_abort;
280 1348 : cb->stream_commit_cb = pgoutput_stream_commit;
281 1348 : cb->stream_change_cb = pgoutput_change;
282 1348 : cb->stream_message_cb = pgoutput_message;
283 1348 : cb->stream_truncate_cb = pgoutput_truncate;
284 : /* transaction streaming - two-phase commit */
285 1348 : cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
286 1348 : }
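
For context, below is a hedged sketch of the smallest plugin a logical decoding consumer could load through this same API: begin, change and commit callbacks are mandatory, while the rest (startup/shutdown and the streaming and two-phase callbacks registered above) are optional. This is a standalone illustration with invented names, not part of pgoutput.

#include "postgres.h"

#include "fmgr.h"
#include "lib/stringinfo.h"
#include "replication/logical.h"
#include "replication/output_plugin.h"
#include "utils/rel.h"

PG_MODULE_MAGIC;

static void
skel_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
			 bool is_init)
{
	/* Produce text so pg_logical_slot_get_changes() can display it. */
	opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
}

static void
skel_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
}

static void
skel_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
			Relation relation, ReorderBufferChange *change)
{
	/* Emit one line per change, naming the affected relation. */
	OutputPluginPrepareWrite(ctx, true);
	appendStringInfoString(ctx->out, RelationGetRelationName(relation));
	OutputPluginWrite(ctx, true);
}

static void
skel_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
				XLogRecPtr commit_lsn)
{
}

void
_PG_output_plugin_init(OutputPluginCallbacks *cb)
{
	cb->startup_cb = skel_startup;
	cb->begin_cb = skel_begin_txn;
	cb->change_cb = skel_change;
	cb->commit_cb = skel_commit_txn;
}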
287 :
288 : static void
289 724 : parse_output_parameters(List *options, PGOutputData *data)
290 : {
291 : ListCell *lc;
292 724 : bool protocol_version_given = false;
293 724 : bool publication_names_given = false;
294 724 : bool binary_option_given = false;
295 724 : bool messages_option_given = false;
296 724 : bool streaming_given = false;
297 724 : bool two_phase_option_given = false;
298 724 : bool origin_option_given = false;
299 :
300 724 : data->binary = false;
301 724 : data->streaming = LOGICALREP_STREAM_OFF;
302 724 : data->messages = false;
303 724 : data->two_phase = false;
304 :
305 3618 : foreach(lc, options)
306 : {
307 2894 : DefElem *defel = (DefElem *) lfirst(lc);
308 :
309 : Assert(defel->arg == NULL || IsA(defel->arg, String));
310 :
311 : /* Check each param, whether or not we recognize it */
312 2894 : if (strcmp(defel->defname, "proto_version") == 0)
313 : {
314 : unsigned long parsed;
315 : char *endptr;
316 :
317 724 : if (protocol_version_given)
318 0 : ereport(ERROR,
319 : (errcode(ERRCODE_SYNTAX_ERROR),
320 : errmsg("conflicting or redundant options")));
321 724 : protocol_version_given = true;
322 :
323 724 : errno = 0;
324 724 : parsed = strtoul(strVal(defel->arg), &endptr, 10);
325 724 : if (errno != 0 || *endptr != '\0')
326 0 : ereport(ERROR,
327 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
328 : errmsg("invalid proto_version")));
329 :
330 724 : if (parsed > PG_UINT32_MAX)
331 0 : ereport(ERROR,
332 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
333 : errmsg("proto_version \"%s\" out of range",
334 : strVal(defel->arg))));
335 :
336 724 : data->protocol_version = (uint32) parsed;
337 : }
338 2170 : else if (strcmp(defel->defname, "publication_names") == 0)
339 : {
340 724 : if (publication_names_given)
341 0 : ereport(ERROR,
342 : (errcode(ERRCODE_SYNTAX_ERROR),
343 : errmsg("conflicting or redundant options")));
344 724 : publication_names_given = true;
345 :
346 724 : if (!SplitIdentifierString(strVal(defel->arg), ',',
347 : &data->publication_names))
348 0 : ereport(ERROR,
349 : (errcode(ERRCODE_INVALID_NAME),
350 : errmsg("invalid publication_names syntax")));
351 : }
352 1446 : else if (strcmp(defel->defname, "binary") == 0)
353 : {
354 22 : if (binary_option_given)
355 0 : ereport(ERROR,
356 : (errcode(ERRCODE_SYNTAX_ERROR),
357 : errmsg("conflicting or redundant options")));
358 22 : binary_option_given = true;
359 :
360 22 : data->binary = defGetBoolean(defel);
361 : }
362 1424 : else if (strcmp(defel->defname, "messages") == 0)
363 : {
364 8 : if (messages_option_given)
365 0 : ereport(ERROR,
366 : (errcode(ERRCODE_SYNTAX_ERROR),
367 : errmsg("conflicting or redundant options")));
368 8 : messages_option_given = true;
369 :
370 8 : data->messages = defGetBoolean(defel);
371 : }
372 1416 : else if (strcmp(defel->defname, "streaming") == 0)
373 : {
374 692 : if (streaming_given)
375 0 : ereport(ERROR,
376 : (errcode(ERRCODE_SYNTAX_ERROR),
377 : errmsg("conflicting or redundant options")));
378 692 : streaming_given = true;
379 :
380 692 : data->streaming = defGetStreamingMode(defel);
381 : }
382 724 : else if (strcmp(defel->defname, "two_phase") == 0)
383 : {
384 14 : if (two_phase_option_given)
385 0 : ereport(ERROR,
386 : (errcode(ERRCODE_SYNTAX_ERROR),
387 : errmsg("conflicting or redundant options")));
388 14 : two_phase_option_given = true;
389 :
390 14 : data->two_phase = defGetBoolean(defel);
391 : }
392 710 : else if (strcmp(defel->defname, "origin") == 0)
393 : {
394 : char *origin;
395 :
396 710 : if (origin_option_given)
397 0 : ereport(ERROR,
398 : errcode(ERRCODE_SYNTAX_ERROR),
399 : errmsg("conflicting or redundant options"));
400 710 : origin_option_given = true;
401 :
402 710 : origin = defGetString(defel);
403 710 : if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
404 26 : data->publish_no_origin = true;
405 684 : else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
406 684 : data->publish_no_origin = false;
407 : else
408 0 : ereport(ERROR,
409 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
410 : errmsg("unrecognized origin value: \"%s\"", origin));
411 : }
412 : else
413 0 : elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
414 : }
415 :
416 : /* Check required options */
417 724 : if (!protocol_version_given)
418 0 : ereport(ERROR,
419 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
420 : errmsg("option \"%s\" missing", "proto_version"));
421 724 : if (!publication_names_given)
422 0 : ereport(ERROR,
423 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
424 : errmsg("option \"%s\" missing", "publication_names"));
425 724 : }
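
For reference, here is a hedged sketch of how a client might hand these options to the walsender over a replication connection; it is not part of pgoutput, and the connection string, slot name, publication name and chosen option values are assumptions. Only proto_version and publication_names are mandatory, as enforced at the end of parse_output_parameters().

#include <stdio.h>

#include <libpq-fe.h>

int
main(void)
{
	/* A replication connection is required for START_REPLICATION. */
	PGconn	   *conn = PQconnectdb("dbname=postgres replication=database");
	PGresult   *res;

	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
		return 1;
	}

	/*
	 * Start streaming from an existing pgoutput slot.  The option list in
	 * parentheses is what parse_output_parameters() receives.
	 */
	res = PQexec(conn,
				 "START_REPLICATION SLOT \"example_slot\" LOGICAL 0/0 "
				 "(proto_version '4', publication_names '\"example_pub\"', "
				 "streaming 'on', messages 'true')");

	if (PQresultStatus(res) != PGRES_COPY_BOTH)
		fprintf(stderr, "START_REPLICATION failed: %s", PQerrorMessage(conn));
	else
	{
		char	   *buf;
		int			len = PQgetCopyData(conn, &buf, 0);

		/* 'w' frames carry logical replication messages, 'k' keepalives. */
		if (len > 0)
		{
			printf("got %d bytes, message type '%c'\n", len, buf[0]);
			PQfreemem(buf);
		}
	}

	PQclear(res);
	PQfinish(conn);
	return 0;
}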
426 :
427 : /*
428 : * Initialize this plugin
429 : */
430 : static void
431 1348 : pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
432 : bool is_init)
433 : {
434 1348 : PGOutputData *data = palloc0(sizeof(PGOutputData));
435 : static bool publication_callback_registered = false;
436 :
437 : /* Create our memory context for private allocations. */
438 1348 : data->context = AllocSetContextCreate(ctx->context,
439 : "logical replication output context",
440 : ALLOCSET_DEFAULT_SIZES);
441 :
442 1348 : data->cachectx = AllocSetContextCreate(ctx->context,
443 : "logical replication cache context",
444 : ALLOCSET_DEFAULT_SIZES);
445 :
446 1348 : data->pubctx = AllocSetContextCreate(ctx->context,
447 : "logical replication publication list context",
448 : ALLOCSET_SMALL_SIZES);
449 :
450 1348 : ctx->output_plugin_private = data;
451 :
452 : /* This plugin uses binary protocol. */
453 1348 : opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
454 :
455 : /*
456 : * This is replication start and not slot initialization.
457 : *
458 : * Parse and validate options passed by the client.
459 : */
460 1348 : if (!is_init)
461 : {
462 : /* Parse the params and ERROR if we see any we don't recognize */
463 724 : parse_output_parameters(ctx->output_plugin_options, data);
464 :
465 : /* Check if we support requested protocol */
466 724 : if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
467 0 : ereport(ERROR,
468 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
469 : errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
470 : data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
471 :
472 724 : if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
473 0 : ereport(ERROR,
474 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
475 : errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
476 : data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
477 :
478 : /*
479 : * Decide whether to enable streaming. It is disabled by default, in
480 : * which case we just update the flag in the decoding context. Otherwise
481 : * we only allow it with a sufficient protocol version, and when the
482 : * output plugin supports it.
483 : */
484 724 : if (data->streaming == LOGICALREP_STREAM_OFF)
485 32 : ctx->streaming = false;
486 692 : else if (data->streaming == LOGICALREP_STREAM_ON &&
487 54 : data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
488 0 : ereport(ERROR,
489 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
490 : errmsg("requested proto_version=%d does not support streaming, need %d or higher",
491 : data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
492 692 : else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
493 638 : data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
494 0 : ereport(ERROR,
495 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
496 : errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
497 : data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
498 692 : else if (!ctx->streaming)
499 0 : ereport(ERROR,
500 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
501 : errmsg("streaming requested, but not supported by output plugin")));
502 :
503 : /*
504 : * Here, we just check whether the two-phase option was passed by the
505 : * client and decide whether to enable it at a later point in time. It
506 : * remains enabled if a previous start-up has enabled it. But we only
507 : * allow the option to be passed in with a sufficient protocol
508 : * version, and when the output plugin supports it.
509 : */
510 724 : if (!data->two_phase)
511 710 : ctx->twophase_opt_given = false;
512 14 : else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
513 0 : ereport(ERROR,
514 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
515 : errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
516 : data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
517 14 : else if (!ctx->twophase)
518 0 : ereport(ERROR,
519 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
520 : errmsg("two-phase commit requested, but not supported by output plugin")));
521 : else
522 14 : ctx->twophase_opt_given = true;
523 :
524 : /* Init publication state. */
525 724 : data->publications = NIL;
526 724 : publications_valid = false;
527 :
528 : /*
529 : * Register callback for pg_publication if we didn't already do that
530 : * during some previous call in this process.
531 : */
532 724 : if (!publication_callback_registered)
533 : {
534 722 : CacheRegisterSyscacheCallback(PUBLICATIONOID,
535 : publication_invalidation_cb,
536 : (Datum) 0);
537 722 : CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb,
538 : (Datum) 0);
539 722 : publication_callback_registered = true;
540 : }
541 :
542 : /* Initialize relation schema cache. */
543 724 : init_rel_sync_cache(CacheMemoryContext);
544 : }
545 : else
546 : {
547 : /*
548 : * Disable the streaming and prepared transactions during the slot
549 : * initialization mode.
550 : */
551 624 : ctx->streaming = false;
552 624 : ctx->twophase = false;
553 : }
554 1348 : }
555 :
556 : /*
557 : * BEGIN callback.
558 : *
559 : * Don't send the BEGIN message here; instead, postpone it until the first
560 : * change. In logical replication, a common scenario is to replicate a set of
561 : * tables (instead of all tables), and transactions whose changes only touch
562 : * unpublished tables produce empty transactions. These empty transactions
563 : * would still send BEGIN and COMMIT messages to subscribers, wasting
564 : * bandwidth on something of little or no use for logical replication.
565 : */
566 : static void
567 1498 : pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
568 : {
569 1498 : PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
570 : sizeof(PGOutputTxnData));
571 :
572 1498 : txn->output_plugin_private = txndata;
573 1498 : }
574 :
575 : /*
576 : * Send BEGIN.
577 : *
578 : * This is called while processing the first change of the transaction.
579 : */
580 : static void
581 836 : pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
582 : {
583 836 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
584 836 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
585 :
586 : Assert(txndata);
587 : Assert(!txndata->sent_begin_txn);
588 :
589 836 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
590 836 : logicalrep_write_begin(ctx->out, txn);
591 836 : txndata->sent_begin_txn = true;
592 :
593 836 : send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
594 : send_replication_origin);
595 :
596 836 : OutputPluginWrite(ctx, true);
597 834 : }
598 :
599 : /*
600 : * COMMIT callback
601 : */
602 : static void
603 1494 : pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
604 : XLogRecPtr commit_lsn)
605 : {
606 1494 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
607 : bool sent_begin_txn;
608 :
609 : Assert(txndata);
610 :
611 : /*
612 : * We don't need to send the commit message unless some relevant change
613 : * from this transaction has been sent to the downstream.
614 : */
615 1494 : sent_begin_txn = txndata->sent_begin_txn;
616 1494 : OutputPluginUpdateProgress(ctx, !sent_begin_txn);
617 1494 : pfree(txndata);
618 1494 : txn->output_plugin_private = NULL;
619 :
620 1494 : if (!sent_begin_txn)
621 : {
622 660 : elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
623 660 : return;
624 : }
625 :
626 834 : OutputPluginPrepareWrite(ctx, true);
627 834 : logicalrep_write_commit(ctx->out, txn, commit_lsn);
628 834 : OutputPluginWrite(ctx, true);
629 : }
630 :
631 : /*
632 : * BEGIN PREPARE callback
633 : */
634 : static void
635 40 : pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
636 : {
637 40 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
638 :
639 40 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
640 40 : logicalrep_write_begin_prepare(ctx->out, txn);
641 :
642 40 : send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
643 : send_replication_origin);
644 :
645 40 : OutputPluginWrite(ctx, true);
646 40 : }
647 :
648 : /*
649 : * PREPARE callback
650 : */
651 : static void
652 40 : pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
653 : XLogRecPtr prepare_lsn)
654 : {
655 40 : OutputPluginUpdateProgress(ctx, false);
656 :
657 40 : OutputPluginPrepareWrite(ctx, true);
658 40 : logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
659 40 : OutputPluginWrite(ctx, true);
660 40 : }
661 :
662 : /*
663 : * COMMIT PREPARED callback
664 : */
665 : static void
666 48 : pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
667 : XLogRecPtr commit_lsn)
668 : {
669 48 : OutputPluginUpdateProgress(ctx, false);
670 :
671 48 : OutputPluginPrepareWrite(ctx, true);
672 48 : logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
673 48 : OutputPluginWrite(ctx, true);
674 48 : }
675 :
676 : /*
677 : * ROLLBACK PREPARED callback
678 : */
679 : static void
680 18 : pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
681 : ReorderBufferTXN *txn,
682 : XLogRecPtr prepare_end_lsn,
683 : TimestampTz prepare_time)
684 : {
685 18 : OutputPluginUpdateProgress(ctx, false);
686 :
687 18 : OutputPluginPrepareWrite(ctx, true);
688 18 : logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
689 : prepare_time);
690 18 : OutputPluginWrite(ctx, true);
691 18 : }
692 :
693 : /*
694 : * Write the current schema of the relation and its ancestor (if any) if not
695 : * done yet.
696 : */
697 : static void
698 364484 : maybe_send_schema(LogicalDecodingContext *ctx,
699 : ReorderBufferChange *change,
700 : Relation relation, RelationSyncEntry *relentry)
701 : {
702 364484 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
703 : bool schema_sent;
704 364484 : TransactionId xid = InvalidTransactionId;
705 364484 : TransactionId topxid = InvalidTransactionId;
706 :
707 : /*
708 : * Remember the XID of the (sub)transaction for the change. We don't care
709 : * whether it's a top-level transaction or not (we have already sent that
710 : * XID at the start of the current streaming block).
711 : *
712 : * If we're not in a streaming block, just use InvalidTransactionId and
713 : * the write methods will not include it.
714 : */
715 364484 : if (data->in_streaming)
716 351882 : xid = change->txn->xid;
717 :
718 364484 : if (rbtxn_is_subtxn(change->txn))
719 20338 : topxid = rbtxn_get_toptxn(change->txn)->xid;
720 : else
721 344146 : topxid = xid;
722 :
723 : /*
724 : * Do we need to send the schema? We do track streamed transactions
725 : * separately, because those may be applied later (and the regular
726 : * transactions won't see their effects until then) and in an order that
727 : * we don't know at this point.
728 : *
729 : * XXX There is a scope of optimization here. Currently, we always send
730 : * the schema first time in a streaming transaction but we can probably
731 : * avoid that by checking 'relentry->schema_sent' flag. However, before
732 : * doing that we need to study its impact on the case where we have a mix
733 : * of streaming and non-streaming transactions.
734 : */
735 364484 : if (data->in_streaming)
736 351882 : schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
737 : else
738 12602 : schema_sent = relentry->schema_sent;
739 :
740 : /* Nothing to do if we already sent the schema. */
741 364484 : if (schema_sent)
742 363862 : return;
743 :
744 : /*
745 : * Send the schema. If the changes will be published using an ancestor's
746 : * schema, not the relation's own, send that ancestor's schema before
747 : * sending relation's own (XXX - maybe sending only the former suffices?).
748 : */
749 622 : if (relentry->publish_as_relid != RelationGetRelid(relation))
750 : {
751 60 : Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
752 :
753 60 : send_relation_and_attrs(ancestor, xid, ctx, relentry);
754 60 : RelationClose(ancestor);
755 : }
756 :
757 622 : send_relation_and_attrs(relation, xid, ctx, relentry);
758 :
759 622 : if (data->in_streaming)
760 142 : set_schema_sent_in_streamed_txn(relentry, topxid);
761 : else
762 480 : relentry->schema_sent = true;
763 : }
764 :
765 : /*
766 : * Sends a relation
767 : */
768 : static void
769 682 : send_relation_and_attrs(Relation relation, TransactionId xid,
770 : LogicalDecodingContext *ctx,
771 : RelationSyncEntry *relentry)
772 : {
773 682 : TupleDesc desc = RelationGetDescr(relation);
774 682 : Bitmapset *columns = relentry->columns;
775 682 : PublishGencolsType include_gencols_type = relentry->include_gencols_type;
776 : int i;
777 :
778 : /*
779 : * Write out type info if needed. We do that only for user-created types.
780 : * We use FirstGenbkiObjectId as the cutoff, so that we only consider
781 : * objects with hand-assigned OIDs to be "built in", not for instance any
782 : * function or type defined in the information_schema. This is important
783 : * because only hand-assigned OIDs can be expected to remain stable across
784 : * major versions.
785 : */
786 2086 : for (i = 0; i < desc->natts; i++)
787 : {
788 1404 : Form_pg_attribute att = TupleDescAttr(desc, i);
789 :
790 1404 : if (!logicalrep_should_publish_column(att, columns,
791 : include_gencols_type))
792 142 : continue;
793 :
794 1262 : if (att->atttypid < FirstGenbkiObjectId)
795 1226 : continue;
796 :
797 36 : OutputPluginPrepareWrite(ctx, false);
798 36 : logicalrep_write_typ(ctx->out, xid, att->atttypid);
799 36 : OutputPluginWrite(ctx, false);
800 : }
801 :
802 682 : OutputPluginPrepareWrite(ctx, false);
803 682 : logicalrep_write_rel(ctx->out, xid, relation, columns,
804 : include_gencols_type);
805 682 : OutputPluginWrite(ctx, false);
806 682 : }
807 :
808 : /*
809 : * Executor state preparation for evaluation of row filter expressions for the
810 : * specified relation.
811 : */
812 : static EState *
813 34 : create_estate_for_relation(Relation rel)
814 : {
815 : EState *estate;
816 : RangeTblEntry *rte;
817 34 : List *perminfos = NIL;
818 :
819 34 : estate = CreateExecutorState();
820 :
821 34 : rte = makeNode(RangeTblEntry);
822 34 : rte->rtekind = RTE_RELATION;
823 34 : rte->relid = RelationGetRelid(rel);
824 34 : rte->relkind = rel->rd_rel->relkind;
825 34 : rte->rellockmode = AccessShareLock;
826 :
827 34 : addRTEPermissionInfo(&perminfos, rte);
828 :
829 34 : ExecInitRangeTable(estate, list_make1(rte), perminfos,
830 : bms_make_singleton(1));
831 :
832 34 : estate->es_output_cid = GetCurrentCommandId(false);
833 :
834 34 : return estate;
835 : }
836 :
837 : /*
838 : * Evaluates row filter.
839 : *
840 : * If the row filter evaluates to NULL, it is taken as false, i.e., the change
841 : * isn't replicated.
842 : */
843 : static bool
844 76 : pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
845 : {
846 : Datum ret;
847 : bool isnull;
848 :
849 : Assert(state != NULL);
850 :
851 76 : ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
852 :
853 76 : elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
854 : isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
855 : isnull ? "true" : "false");
856 :
857 76 : if (isnull)
858 2 : return false;
859 :
860 74 : return DatumGetBool(ret);
861 : }
862 :
863 : /*
864 : * Make sure the per-entry memory context exists.
865 : */
866 : static void
867 582 : pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
868 : {
869 : Relation relation;
870 :
871 : /* The context may already exist, in which case bail out. */
872 582 : if (entry->entry_cxt)
873 34 : return;
874 :
875 548 : relation = RelationIdGetRelation(entry->publish_as_relid);
876 :
877 548 : entry->entry_cxt = AllocSetContextCreate(data->cachectx,
878 : "entry private context",
879 : ALLOCSET_SMALL_SIZES);
880 :
881 548 : MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
882 : RelationGetRelationName(relation));
883 : }
884 :
885 : /*
886 : * Initialize the row filter.
887 : */
888 : static void
889 548 : pgoutput_row_filter_init(PGOutputData *data, List *publications,
890 : RelationSyncEntry *entry)
891 : {
892 : ListCell *lc;
893 548 : List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */
894 548 : bool no_filter[] = {false, false, false}; /* One per pubaction */
895 : MemoryContext oldctx;
896 : int idx;
897 548 : bool has_filter = true;
898 548 : Oid schemaid = get_rel_namespace(entry->publish_as_relid);
899 :
900 : /*
901 : * Find if there are any row filters for this relation. If there are, then
902 : * prepare the necessary ExprState and cache it in entry->exprstate. To
903 : * build an expression state, we need to ensure the following:
904 : *
905 : * All the given publication-table mappings must be checked.
906 : *
907 : * Multiple publications might have multiple row filters for this
908 : * relation. Since row filter usage depends on the DML operation, there
909 : * are multiple lists (one for each operation) to which row filters will
910 : * be appended.
911 : *
912 : * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row filter
913 : * expression" so it takes precedence.
914 : */
915 590 : foreach(lc, publications)
916 : {
917 556 : Publication *pub = lfirst(lc);
918 556 : HeapTuple rftuple = NULL;
919 556 : Datum rfdatum = 0;
920 556 : bool pub_no_filter = true;
921 :
922 : /*
923 : * If the publication is FOR ALL TABLES, or the publication includes a
924 : * FOR TABLES IN SCHEMA where the table belongs to the referred
925 : * schema, then it is treated the same as if there are no row filters
926 : * (even if other publications have a row filter).
927 : */
928 556 : if (!pub->alltables &&
929 396 : !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
930 : ObjectIdGetDatum(schemaid),
931 : ObjectIdGetDatum(pub->oid)))
932 : {
933 : /*
934 : * Check for the presence of a row filter in this publication.
935 : */
936 384 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
937 : ObjectIdGetDatum(entry->publish_as_relid),
938 : ObjectIdGetDatum(pub->oid));
939 :
940 384 : if (HeapTupleIsValid(rftuple))
941 : {
942 : /* Null indicates no filter. */
943 360 : rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
944 : Anum_pg_publication_rel_prqual,
945 : &pub_no_filter);
946 : }
947 : }
948 :
949 556 : if (pub_no_filter)
950 : {
951 528 : if (rftuple)
952 332 : ReleaseSysCache(rftuple);
953 :
954 528 : no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
955 528 : no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
956 528 : no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
957 :
958 : /*
959 : * Quick exit if all the DML actions are published via this
960 : * publication.
961 : */
962 528 : if (no_filter[PUBACTION_INSERT] &&
963 528 : no_filter[PUBACTION_UPDATE] &&
964 514 : no_filter[PUBACTION_DELETE])
965 : {
966 514 : has_filter = false;
967 514 : break;
968 : }
969 :
970 : /* No additional work for this publication. Next one. */
971 14 : continue;
972 : }
973 :
974 : /* Form the per pubaction row filter lists. */
975 28 : if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
976 28 : rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
977 28 : TextDatumGetCString(rfdatum));
978 28 : if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
979 28 : rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
980 28 : TextDatumGetCString(rfdatum));
981 28 : if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
982 28 : rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
983 28 : TextDatumGetCString(rfdatum));
984 :
985 28 : ReleaseSysCache(rftuple);
986 : } /* loop all subscribed publications */
987 :
988 : /* Clean the row filter */
989 2192 : for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
990 : {
991 1644 : if (no_filter[idx])
992 : {
993 1560 : list_free_deep(rfnodes[idx]);
994 1560 : rfnodes[idx] = NIL;
995 : }
996 : }
997 :
998 548 : if (has_filter)
999 : {
1000 34 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1001 :
1002 34 : pgoutput_ensure_entry_cxt(data, entry);
1003 :
1004 : /*
1005 : * Now all the filters for all pubactions are known. Combine them when
1006 : * their pubactions are the same.
1007 : */
1008 34 : oldctx = MemoryContextSwitchTo(entry->entry_cxt);
1009 34 : entry->estate = create_estate_for_relation(relation);
1010 136 : for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
1011 : {
1012 102 : List *filters = NIL;
1013 : Expr *rfnode;
1014 :
1015 102 : if (rfnodes[idx] == NIL)
1016 42 : continue;
1017 :
1018 126 : foreach(lc, rfnodes[idx])
1019 66 : filters = lappend(filters, expand_generated_columns_in_expr(stringToNode((char *) lfirst(lc)), relation, 1));
1020 :
1021 : /* combine the row filter and cache the ExprState */
1022 60 : rfnode = make_orclause(filters);
1023 60 : entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
1024 : } /* for each pubaction */
1025 34 : MemoryContextSwitchTo(oldctx);
1026 :
1027 34 : RelationClose(relation);
1028 : }
1029 548 : }
1030 :
1031 : /*
1032 : * If the table contains a generated column, check for any conflicting
1033 : * values of 'publish_generated_columns' parameter in the publications.
1034 : */
1035 : static void
1036 548 : check_and_init_gencol(PGOutputData *data, List *publications,
1037 : RelationSyncEntry *entry)
1038 : {
1039 548 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1040 548 : TupleDesc desc = RelationGetDescr(relation);
1041 548 : bool gencolpresent = false;
1042 548 : bool first = true;
1043 :
1044 : /* Check if there is any generated column present. */
1045 1646 : for (int i = 0; i < desc->natts; i++)
1046 : {
1047 1112 : Form_pg_attribute att = TupleDescAttr(desc, i);
1048 :
1049 1112 : if (att->attgenerated)
1050 : {
1051 14 : gencolpresent = true;
1052 14 : break;
1053 : }
1054 : }
1055 :
1056 : /* There are no generated columns to be published. */
1057 548 : if (!gencolpresent)
1058 : {
1059 534 : entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
1060 534 : return;
1061 : }
1062 :
1063 : /*
1064 : * There may be a conflicting value for 'publish_generated_columns'
1065 : * parameter in the publications.
1066 : */
1067 44 : foreach_ptr(Publication, pub, publications)
1068 : {
1069 : /*
1070 : * The column list takes precedence over the
1071 : * 'publish_generated_columns' parameter. Those will be checked later,
1072 : * see pgoutput_column_list_init.
1073 : */
1074 16 : if (check_and_fetch_column_list(pub, entry->publish_as_relid, NULL, NULL))
1075 6 : continue;
1076 :
1077 10 : if (first)
1078 : {
1079 10 : entry->include_gencols_type = pub->pubgencols_type;
1080 10 : first = false;
1081 : }
1082 0 : else if (entry->include_gencols_type != pub->pubgencols_type)
1083 0 : ereport(ERROR,
1084 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1085 : errmsg("cannot use different values of publish_generated_columns for table \"%s.%s\" in different publications",
1086 : get_namespace_name(RelationGetNamespace(relation)),
1087 : RelationGetRelationName(relation)));
1088 : }
1089 : }
1090 :
1091 : /*
1092 : * Initialize the column list.
1093 : */
1094 : static void
1095 548 : pgoutput_column_list_init(PGOutputData *data, List *publications,
1096 : RelationSyncEntry *entry)
1097 : {
1098 : ListCell *lc;
1099 548 : bool first = true;
1100 548 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1101 548 : bool found_pub_collist = false;
1102 548 : Bitmapset *relcols = NULL;
1103 :
1104 548 : pgoutput_ensure_entry_cxt(data, entry);
1105 :
1106 : /*
1107 : * Find if there are any column lists for this relation. If there are,
1108 : * build a bitmap using the column lists.
1109 : *
1110 : * Multiple publications might have multiple column lists for this
1111 : * relation.
1112 : *
1113 : * Note that we don't support the case where the column list is different
1114 : * for the same table when combining publications. See comments atop
1115 : * fetch_table_list. But one can later change the publication so we still
1116 : * need to check all the given publication-table mappings and report an
1117 : * error if any publications have a different column list.
1118 : */
1119 1116 : foreach(lc, publications)
1120 : {
1121 570 : Publication *pub = lfirst(lc);
1122 570 : Bitmapset *cols = NULL;
1123 :
1124 : /* Retrieve the bitmap of columns for a column list publication. */
1125 570 : found_pub_collist |= check_and_fetch_column_list(pub,
1126 : entry->publish_as_relid,
1127 : entry->entry_cxt, &cols);
1128 :
1129 : /*
1130 : * For non-column list publications — e.g. TABLE (without a column
1131 : * list), ALL TABLES, or ALL TABLES IN SCHEMA, we consider all columns
1132 : * of the table (including generated columns when
1133 : * 'publish_generated_columns' parameter is true).
1134 : */
1135 570 : if (!cols)
1136 : {
1137 : /*
1138 : * Cache the table columns for the first publication with no
1139 : * specified column list to detect a publication with a different
1140 : * column list.
1141 : */
1142 492 : if (!relcols && (list_length(publications) > 1))
1143 : {
1144 18 : MemoryContext oldcxt = MemoryContextSwitchTo(entry->entry_cxt);
1145 :
1146 18 : relcols = pub_form_cols_map(relation,
1147 : entry->include_gencols_type);
1148 18 : MemoryContextSwitchTo(oldcxt);
1149 : }
1150 :
1151 492 : cols = relcols;
1152 : }
1153 :
1154 570 : if (first)
1155 : {
1156 548 : entry->columns = cols;
1157 548 : first = false;
1158 : }
1159 22 : else if (!bms_equal(entry->columns, cols))
1160 2 : ereport(ERROR,
1161 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1162 : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
1163 : get_namespace_name(RelationGetNamespace(relation)),
1164 : RelationGetRelationName(relation)));
1165 : } /* loop all subscribed publications */
1166 :
1167 : /*
1168 : * If no column list publications exist, columns to be published will be
1169 : * computed later according to the 'publish_generated_columns' parameter.
1170 : */
1171 546 : if (!found_pub_collist)
1172 474 : entry->columns = NULL;
1173 :
1174 546 : RelationClose(relation);
1175 546 : }
1176 :
1177 : /*
1178 : * Initialize the slot for storing new and old tuples, and build the map that
1179 : * will be used to convert the relation's tuples into the ancestor's format.
1180 : */
1181 : static void
1182 548 : init_tuple_slot(PGOutputData *data, Relation relation,
1183 : RelationSyncEntry *entry)
1184 : {
1185 : MemoryContext oldctx;
1186 : TupleDesc oldtupdesc;
1187 : TupleDesc newtupdesc;
1188 :
1189 548 : oldctx = MemoryContextSwitchTo(data->cachectx);
1190 :
1191 : /*
1192 : * Create tuple table slots. Create a copy of the TupleDesc as it needs to
1193 : * live as long as the cache remains.
1194 : */
1195 548 : oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1196 548 : newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1197 :
1198 548 : entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
1199 548 : entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
1200 :
1201 548 : MemoryContextSwitchTo(oldctx);
1202 :
1203 : /*
1204 : * Cache the map that will be used to convert the relation's tuples into
1205 : * the ancestor's format, if needed.
1206 : */
1207 548 : if (entry->publish_as_relid != RelationGetRelid(relation))
1208 : {
1209 70 : Relation ancestor = RelationIdGetRelation(entry->publish_as_relid);
1210 70 : TupleDesc indesc = RelationGetDescr(relation);
1211 70 : TupleDesc outdesc = RelationGetDescr(ancestor);
1212 :
1213 : /* Map must live as long as the logical decoding context. */
1214 70 : oldctx = MemoryContextSwitchTo(data->cachectx);
1215 :
1216 70 : entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
1217 :
1218 70 : MemoryContextSwitchTo(oldctx);
1219 70 : RelationClose(ancestor);
1220 : }
1221 548 : }
1222 :
1223 : /*
1224 : * Change is checked against the row filter if any.
1225 : *
1226 : * Returns true if the change is to be replicated, else false.
1227 : *
1228 : * For inserts, evaluate the row filter for new tuple.
1229 : * For deletes, evaluate the row filter for old tuple.
1230 : * For updates, evaluate the row filter for old and new tuple.
1231 : *
1232 : * For updates, if both evaluations are true, we send the UPDATE; if both
1233 : * evaluations are false, the UPDATE is not replicated. If only one of the
1234 : * tuples matches the row filter expression, we transform the UPDATE into a
1235 : * DELETE or INSERT to avoid data inconsistency, based on the following
1236 : * rules:
1237 : *
1238 : * Case 1: old-row (no match) new-row (no match) -> (drop change)
1239 : * Case 2: old-row (no match) new row (match) -> INSERT
1240 : * Case 3: old-row (match) new-row (no match) -> DELETE
1241 : * Case 4: old-row (match) new row (match) -> UPDATE
1242 : *
1243 : * The new action is updated in the action parameter.
1244 : *
1245 : * The new slot could be updated when transforming the UPDATE into INSERT,
1246 : * because the original new tuple might not have column values from the replica
1247 : * identity.
1248 : *
1249 : * Examples:
1250 : * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
1251 : * Since the old tuple satisfies, the initial table synchronization copied this
1252 : * row (or another method was used to guarantee that there is data
1253 : * consistency). However, after the UPDATE the new tuple doesn't satisfy the
1254 : * row filter, so from a data consistency perspective, that row should be
1255 : * removed on the subscriber. The UPDATE should be transformed into a DELETE
1256 : * statement and be sent to the subscriber. Keeping this row on the subscriber
1257 : * is undesirable because it doesn't reflect what was defined in the row filter
1258 : * expression on the publisher. This row on the subscriber would likely not be
1259 : * modified by replication again. If someone inserted a new row with the same
1260 : * old identifier, replication could stop due to a constraint violation.
1261 : *
1262 : * Let's say the old tuple doesn't match the row filter but the new tuple does.
1263 : * Since the old tuple doesn't satisfy, the initial table synchronization
1264 : * probably didn't copy this row. However, after the UPDATE the new tuple does
1265 : * satisfy the row filter, so from a data consistency perspective, that row
1266 : * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
1267 : * statements have no effect (it matches no row -- see
1268 : * apply_handle_update_internal()). So, the UPDATE should be transformed into an
1269 : * INSERT statement and sent to the subscriber. However, this might surprise
1270 : * someone who expects the data set to satisfy the row filter expression on the
1271 : * provider.
1272 : */
1273 : static bool
1274 364478 : pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
1275 : TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
1276 : ReorderBufferChangeType *action)
1277 : {
1278 : TupleDesc desc;
1279 : int i;
1280 : bool old_matched,
1281 : new_matched,
1282 : result;
1283 : TupleTableSlot *tmp_new_slot;
1284 364478 : TupleTableSlot *new_slot = *new_slot_ptr;
1285 : ExprContext *ecxt;
1286 : ExprState *filter_exprstate;
1287 :
1288 : /*
1289 : * We need this map to avoid relying on ReorderBufferChangeType enums
1290 : * having specific values.
1291 : */
1292 : static const int map_changetype_pubaction[] = {
1293 : [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
1294 : [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
1295 : [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
1296 : };
1297 :
1298 : Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
1299 : *action == REORDER_BUFFER_CHANGE_UPDATE ||
1300 : *action == REORDER_BUFFER_CHANGE_DELETE);
1301 :
1302 : Assert(new_slot || old_slot);
1303 :
1304 : /* Get the corresponding row filter */
1305 364478 : filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
1306 :
1307 : /* Bail out if there is no row filter */
1308 364478 : if (!filter_exprstate)
1309 364410 : return true;
1310 :
1311 68 : elog(DEBUG3, "table \"%s.%s\" has row filter",
1312 : get_namespace_name(RelationGetNamespace(relation)),
1313 : RelationGetRelationName(relation));
1314 :
1315 68 : ResetPerTupleExprContext(entry->estate);
1316 :
1317 68 : ecxt = GetPerTupleExprContext(entry->estate);
1318 :
1319 : /*
1320 : * For the following occasions where there is only one tuple, we can
1321 : * evaluate the row filter for that tuple and return.
1322 : *
1323 : * For inserts, we only have the new tuple.
1324 : *
1325 : * For updates, we can have only a new tuple when none of the replica
1326 : * identity columns changed and none of those columns have external data
1327 : * but we still need to evaluate the row filter for the new tuple as the
1328 : * existing values of those columns might not match the filter. Also,
1329 : * users can use constant expressions in the row filter, so we anyway need
1330 : * to evaluate it for the new tuple.
1331 : *
1332 : * For deletes, we only have the old tuple.
1333 : */
1334 68 : if (!new_slot || !old_slot)
1335 : {
1336 60 : ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
1337 60 : result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1338 :
1339 60 : return result;
1340 : }
1341 :
1342 : /*
1343 : * Both the old and new tuples must be valid only for updates and need to
1344 : * be checked against the row filter.
1345 : */
1346 : Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
1347 :
1348 8 : slot_getallattrs(new_slot);
1349 8 : slot_getallattrs(old_slot);
1350 :
1351 8 : tmp_new_slot = NULL;
1352 8 : desc = RelationGetDescr(relation);
1353 :
1354 : /*
1355 : * The new tuple might not have all the replica identity columns, in which
1356 : * case it needs to be copied over from the old tuple.
1357 : */
1358 24 : for (i = 0; i < desc->natts; i++)
1359 : {
1360 16 : CompactAttribute *att = TupleDescCompactAttr(desc, i);
1361 :
1362 : /*
1363 : * if the column in the new tuple or old tuple is null, nothing to do
1364 : */
1365 16 : if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
1366 2 : continue;
1367 :
1368 : /*
1369 : * Unchanged toasted replica identity columns are only logged in the
1370 : * old tuple. Copy this over to the new tuple. The changed (or WAL
1371 : * Logged) toast values are always assembled in memory and set as
1372 : * VARTAG_INDIRECT. See ReorderBufferToastReplace.
1373 : */
1374 14 : if (att->attlen == -1 &&
1375 8 : VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
1376 2 : !VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
1377 : {
1378 2 : if (!tmp_new_slot)
1379 : {
1380 2 : tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
1381 2 : ExecClearTuple(tmp_new_slot);
1382 :
1383 2 : memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
1384 2 : desc->natts * sizeof(Datum));
1385 2 : memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
1386 2 : desc->natts * sizeof(bool));
1387 : }
1388 :
1389 2 : tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
1390 2 : tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
1391 : }
1392 : }
1393 :
1394 8 : ecxt->ecxt_scantuple = old_slot;
1395 8 : old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1396 :
1397 8 : if (tmp_new_slot)
1398 : {
1399 2 : ExecStoreVirtualTuple(tmp_new_slot);
1400 2 : ecxt->ecxt_scantuple = tmp_new_slot;
1401 : }
1402 : else
1403 6 : ecxt->ecxt_scantuple = new_slot;
1404 :
1405 8 : new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1406 :
1407 : /*
1408 : * Case 1: if both tuples don't match the row filter, bailout. Send
1409 : * nothing.
1410 : */
1411 8 : if (!old_matched && !new_matched)
1412 0 : return false;
1413 :
1414 : /*
1415 : * Case 2: if the old tuple doesn't satisfy the row filter but the new
1416 : * tuple does, transform the UPDATE into INSERT.
1417 : *
1418 : * Use the newly transformed tuple that must contain the column values for
1419 : * all the replica identity columns. This is required to ensure that,
1420 : * while inserting the tuple in the downstream node, we have all the
1421 : * required column values.
1422 : */
1423 8 : if (!old_matched && new_matched)
1424 : {
1425 4 : *action = REORDER_BUFFER_CHANGE_INSERT;
1426 :
1427 4 : if (tmp_new_slot)
1428 2 : *new_slot_ptr = tmp_new_slot;
1429 : }
1430 :
1431 : /*
1432 : * Case 3: if the old tuple satisfies the row filter but the new tuple
1433 : * doesn't, transform the UPDATE into DELETE.
1434 : *
1435 : * This transformation does not require another tuple. The old tuple will
1436 : * be used for DELETE.
1437 : */
1438 4 : else if (old_matched && !new_matched)
1439 2 : *action = REORDER_BUFFER_CHANGE_DELETE;
1440 :
1441 : /*
1442 : * Case 4: if both tuples match the row filter, transformation isn't
1443 : * required. (*action is default UPDATE).
1444 : */
1445 :
1446 8 : return true;
1447 : }
1448 :
1449 : /*
1450 : * Sends the decoded DML over wire.
1451 : *
1452 : * This is called both in streaming and non-streaming modes.
1453 : */
1454 : static void
1455 366766 : pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1456 : Relation relation, ReorderBufferChange *change)
1457 : {
1458 366766 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1459 366766 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1460 : MemoryContext old;
1461 : RelationSyncEntry *relentry;
1462 366766 : TransactionId xid = InvalidTransactionId;
1463 366766 : Relation ancestor = NULL;
1464 366766 : Relation targetrel = relation;
1465 366766 : ReorderBufferChangeType action = change->action;
1466 366766 : TupleTableSlot *old_slot = NULL;
1467 366766 : TupleTableSlot *new_slot = NULL;
1468 :
1469 366766 : if (!is_publishable_relation(relation))
1470 2286 : return;
1471 :
1472 : /*
1473 : * Remember the xid for the change in streaming mode. We need to send the
1474 : * xid with each change in streaming mode so that the subscriber can
1475 : * associate the changes with the transaction and, on abort, discard the
1476 : * corresponding changes.
1477 : */
1478 366766 : if (data->in_streaming)
1479 351882 : xid = change->txn->xid;
1480 :
1481 366766 : relentry = get_rel_sync_entry(data, relation);
1482 :
1483 : /* First check the table filter */
1484 366764 : switch (action)
1485 : {
1486 211946 : case REORDER_BUFFER_CHANGE_INSERT:
1487 211946 : if (!relentry->pubactions.pubinsert)
1488 116 : return;
1489 211830 : break;
1490 68974 : case REORDER_BUFFER_CHANGE_UPDATE:
1491 68974 : if (!relentry->pubactions.pubupdate)
1492 88 : return;
1493 68886 : break;
1494 85844 : case REORDER_BUFFER_CHANGE_DELETE:
1495 85844 : if (!relentry->pubactions.pubdelete)
1496 2082 : return;
1497 :
1498 : /*
1499 : * This is only possible if deletes are allowed even when replica
1500 : * identity is not defined for a table. Since the DELETE action
1501 : * can't be published, we simply return.
1502 : */
1503 83762 : if (!change->data.tp.oldtuple)
1504 : {
1505 0 : elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
1506 0 : return;
1507 : }
1508 83762 : break;
1509 364478 : default:
1510 : Assert(false);
1511 : }
1512 :
1513 : /* Avoid leaking memory by using and resetting our own context */
1514 364478 : old = MemoryContextSwitchTo(data->context);
1515 :
1516 : /* Switch relation if publishing via root. */
1517 364478 : if (relentry->publish_as_relid != RelationGetRelid(relation))
1518 : {
1519 : Assert(relation->rd_rel->relispartition);
1520 136 : ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1521 136 : targetrel = ancestor;
1522 : }
1523 :
1524 364478 : if (change->data.tp.oldtuple)
1525 : {
1526 84024 : old_slot = relentry->old_slot;
1527 84024 : ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false);
1528 :
1529 : /* Convert tuple if needed. */
1530 84024 : if (relentry->attrmap)
1531 : {
1532 10 : TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
1533 : &TTSOpsVirtual);
1534 :
1535 10 : old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
1536 : }
1537 : }
1538 :
1539 364478 : if (change->data.tp.newtuple)
1540 : {
1541 280716 : new_slot = relentry->new_slot;
1542 280716 : ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false);
1543 :
1544 : /* Convert tuple if needed. */
1545 280716 : if (relentry->attrmap)
1546 : {
1547 38 : TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
1548 : &TTSOpsVirtual);
1549 :
1550 38 : new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
1551 : }
1552 : }
1553 :
1554 : /*
1555 : * Check row filter.
1556 : *
1557 : * Updates could be transformed to inserts or deletes based on the results
1558 : * of the row filter for old and new tuple.
1559 : */
1560 364478 : if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
1561 24 : goto cleanup;
1562 :
1563 : /*
1564 : * Send BEGIN if we haven't yet.
1565 : *
1566 : * We send the BEGIN message after ensuring that we will actually send the
1567 : * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
1568 : * transactions.
1569 : */
1570 364454 : if (txndata && !txndata->sent_begin_txn)
1571 812 : pgoutput_send_begin(ctx, txn);
1572 :
1573 : /*
 1574              : * Send the schema using the original relation, because maybe_send_schema()
 1575              : * also sends the ancestor's schema when publishing via the root.
1576 : */
1577 364452 : maybe_send_schema(ctx, change, relation, relentry);
1578 :
1579 364452 : OutputPluginPrepareWrite(ctx, true);
1580 :
1581 : /* Send the data */
1582 364452 : switch (action)
1583 : {
1584 211808 : case REORDER_BUFFER_CHANGE_INSERT:
1585 211808 : logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
1586 211808 : data->binary, relentry->columns,
1587 : relentry->include_gencols_type);
1588 211808 : break;
1589 68880 : case REORDER_BUFFER_CHANGE_UPDATE:
1590 68880 : logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
1591 68880 : new_slot, data->binary, relentry->columns,
1592 : relentry->include_gencols_type);
1593 68880 : break;
1594 83764 : case REORDER_BUFFER_CHANGE_DELETE:
1595 83764 : logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
1596 83764 : data->binary, relentry->columns,
1597 : relentry->include_gencols_type);
1598 83764 : break;
1599 364452 : default:
1600 : Assert(false);
1601 : }
1602 :
1603 364452 : OutputPluginWrite(ctx, true);
1604 :
1605 364476 : cleanup:
1606 364476 : if (RelationIsValid(ancestor))
1607 : {
1608 134 : RelationClose(ancestor);
1609 134 : ancestor = NULL;
1610 : }
1611 :
 1612              : /* Drop the slots that were created to hold the converted tuples. */
1613 364476 : if (relentry->attrmap)
1614 : {
1615 48 : if (old_slot)
1616 10 : ExecDropSingleTupleTableSlot(old_slot);
1617 :
1618 48 : if (new_slot)
1619 38 : ExecDropSingleTupleTableSlot(new_slot);
1620 : }
1621 :
1622 364476 : MemoryContextSwitchTo(old);
1623 364476 : MemoryContextReset(data->context);
1624 : }
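/*
 * Illustrative sketch (not taken from the PostgreSQL sources): the "lazy
 * BEGIN" used in pgoutput_change() above.  BEGIN is emitted only when the
 * first change actually survives filtering, so transactions whose changes
 * are all filtered out never produce an empty BEGIN/COMMIT pair.  The
 * struct and emit callback below are hypothetical stand-ins for
 * PGOutputTxnData and the protocol writers.
 */
typedef struct SketchTxn
{
    bool        sent_begin;
} SketchTxn;

static void
sketch_emit_change(SketchTxn *txn, void (*emit) (const char *),
                   const char *change)
{
    if (!txn->sent_begin)
    {
        emit("BEGIN");          /* deferred until a change is really sent */
        txn->sent_begin = true;
    }
    emit(change);
}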
1625 :
1626 : static void
1627 28 : pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1628 : int nrelations, Relation relations[], ReorderBufferChange *change)
1629 : {
1630 28 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1631 28 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1632 : MemoryContext old;
1633 : RelationSyncEntry *relentry;
1634 : int i;
1635 : int nrelids;
1636 : Oid *relids;
1637 28 : TransactionId xid = InvalidTransactionId;
1638 :
1639 : /* Remember the xid for the change in streaming mode. See pgoutput_change. */
1640 28 : if (data->in_streaming)
1641 0 : xid = change->txn->xid;
1642 :
1643 28 : old = MemoryContextSwitchTo(data->context);
1644 :
1645 28 : relids = palloc0(nrelations * sizeof(Oid));
1646 28 : nrelids = 0;
1647 :
1648 94 : for (i = 0; i < nrelations; i++)
1649 : {
1650 66 : Relation relation = relations[i];
1651 66 : Oid relid = RelationGetRelid(relation);
1652 :
1653 66 : if (!is_publishable_relation(relation))
1654 0 : continue;
1655 :
1656 66 : relentry = get_rel_sync_entry(data, relation);
1657 :
1658 66 : if (!relentry->pubactions.pubtruncate)
1659 28 : continue;
1660 :
1661 : /*
1662 : * Don't send partitions if the publication wants to send only the
1663 : * root tables through it.
1664 : */
1665 38 : if (relation->rd_rel->relispartition &&
1666 30 : relentry->publish_as_relid != relid)
1667 6 : continue;
1668 :
1669 32 : relids[nrelids++] = relid;
1670 :
1671 : /* Send BEGIN if we haven't yet */
1672 32 : if (txndata && !txndata->sent_begin_txn)
1673 20 : pgoutput_send_begin(ctx, txn);
1674 :
1675 32 : maybe_send_schema(ctx, change, relation, relentry);
1676 : }
1677 :
1678 28 : if (nrelids > 0)
1679 : {
1680 20 : OutputPluginPrepareWrite(ctx, true);
1681 20 : logicalrep_write_truncate(ctx->out,
1682 : xid,
1683 : nrelids,
1684 : relids,
1685 20 : change->data.truncate.cascade,
1686 20 : change->data.truncate.restart_seqs);
1687 20 : OutputPluginWrite(ctx, true);
1688 : }
1689 :
1690 28 : MemoryContextSwitchTo(old);
1691 28 : MemoryContextReset(data->context);
1692 28 : }
1693 :
1694 : static void
1695 14 : pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1696 : XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
1697 : const char *message)
1698 : {
1699 14 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1700 14 : TransactionId xid = InvalidTransactionId;
1701 :
1702 14 : if (!data->messages)
1703 4 : return;
1704 :
1705 : /*
1706 : * Remember the xid for the message in streaming mode. See
1707 : * pgoutput_change.
1708 : */
1709 10 : if (data->in_streaming)
1710 0 : xid = txn->xid;
1711 :
1712 : /*
1713 : * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
1714 : */
1715 10 : if (transactional)
1716 : {
1717 4 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1718 :
1719 : /* Send BEGIN if we haven't yet */
1720 4 : if (txndata && !txndata->sent_begin_txn)
1721 4 : pgoutput_send_begin(ctx, txn);
1722 : }
1723 :
1724 10 : OutputPluginPrepareWrite(ctx, true);
1725 10 : logicalrep_write_message(ctx->out,
1726 : xid,
1727 : message_lsn,
1728 : transactional,
1729 : prefix,
1730 : sz,
1731 : message);
1732 10 : OutputPluginWrite(ctx, true);
1733 : }
1734 :
1735 : /*
 1736              : * Return true if the data is associated with an origin and the user has
 1737              : * requested only changes that have no origin; return false otherwise.
1738 : */
1739 : static bool
1740 982144 : pgoutput_origin_filter(LogicalDecodingContext *ctx,
1741 : RepOriginId origin_id)
1742 : {
1743 982144 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1744 :
1745 982144 : if (data->publish_no_origin && origin_id != InvalidRepOriginId)
1746 140 : return true;
1747 :
1748 982004 : return false;
1749 : }
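/*
 * Illustrative sketch (not taken from the PostgreSQL sources): the filter
 * above corresponds to the subscription option origin = none.  Any change
 * that carries a remote origin id is skipped, which prevents changes from
 * being echoed back and forth between nodes that replicate to each other.
 * Reduced to its essentials, with 0 standing in for InvalidRepOriginId:
 */
static bool
sketch_filter_by_origin(bool publish_no_origin, unsigned int origin_id)
{
    return publish_no_origin && origin_id != 0;
}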
1750 :
1751 : /*
1752 : * Shutdown the output plugin.
1753 : *
1754 : * Note, we don't need to clean the data->context, data->cachectx, and
1755 : * data->pubctx as they are child contexts of the ctx->context so they
1756 : * will be cleaned up by logical decoding machinery.
1757 : */
1758 : static void
1759 1000 : pgoutput_shutdown(LogicalDecodingContext *ctx)
1760 : {
1761 1000 : if (RelationSyncCache)
1762 : {
1763 376 : hash_destroy(RelationSyncCache);
1764 376 : RelationSyncCache = NULL;
1765 : }
1766 1000 : }
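/*
 * Illustrative sketch (not taken from the PostgreSQL sources): why the
 * shutdown callback need not free data->context, data->cachectx, or
 * data->pubctx.  Deleting a memory context recursively deletes its
 * children, and those contexts are children of ctx->context, which the
 * logical decoding machinery deletes for us.
 */
static void
sketch_context_teardown(void)
{
    MemoryContext parent = AllocSetContextCreate(CurrentMemoryContext,
                                                 "sketch parent",
                                                 ALLOCSET_DEFAULT_SIZES);
    MemoryContext child = AllocSetContextCreate(parent,
                                                "sketch child",
                                                ALLOCSET_DEFAULT_SIZES);

    (void) child;               /* no explicit free needed for the child */
    MemoryContextDelete(parent);    /* deletes "sketch child" as well */
}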
1767 :
1768 : /*
1769 : * Load publications from the list of publication names.
1770 : *
 1771              : * Here, we skip publications that don't exist yet, which allows replication
 1772              : * to continue silently even when a specified publication is missing. This is
 1773              : * required because users are allowed to create a publication after having
 1774              : * specified it at the time replication was started.
1775 : */
1776 : static List *
1777 330 : LoadPublications(List *pubnames)
1778 : {
1779 330 : List *result = NIL;
1780 : ListCell *lc;
1781 :
1782 750 : foreach(lc, pubnames)
1783 : {
1784 420 : char *pubname = (char *) lfirst(lc);
1785 420 : Publication *pub = GetPublicationByName(pubname, true);
1786 :
1787 420 : if (pub)
1788 416 : result = lappend(result, pub);
1789 : else
1790 4 : ereport(WARNING,
1791 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1792 : errmsg("skipped loading publication: %s", pubname),
1793 : errdetail("The publication does not exist at this point in the WAL."),
1794 : errhint("Create the publication if it does not exist."));
1795 : }
1796 :
1797 330 : return result;
1798 : }
1799 :
1800 : /*
1801 : * Publication syscache invalidation callback.
1802 : *
1803 : * Called for invalidations on pg_publication.
1804 : */
1805 : static void
1806 556 : publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
1807 : {
1808 556 : publications_valid = false;
1809 556 : }
1810 :
1811 : /*
1812 : * START STREAM callback
1813 : */
1814 : static void
1815 1276 : pgoutput_stream_start(struct LogicalDecodingContext *ctx,
1816 : ReorderBufferTXN *txn)
1817 : {
1818 1276 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1819 1276 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
1820 :
1821 : /* we can't nest streaming of transactions */
1822 : Assert(!data->in_streaming);
1823 :
1824 : /*
1825 : * If we already sent the first stream for this transaction then don't
1826 : * send the origin id in the subsequent streams.
1827 : */
1828 1276 : if (rbtxn_is_streamed(txn))
1829 1150 : send_replication_origin = false;
1830 :
1831 1276 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
1832 1276 : logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
1833 :
1834 1276 : send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
1835 : send_replication_origin);
1836 :
1837 1276 : OutputPluginWrite(ctx, true);
1838 :
1839 : /* we're streaming a chunk of transaction now */
1840 1276 : data->in_streaming = true;
1841 1276 : }
1842 :
1843 : /*
1844 : * STOP STREAM callback
1845 : */
1846 : static void
1847 1276 : pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
1848 : ReorderBufferTXN *txn)
1849 : {
1850 1276 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1851 :
1852 : /* we should be streaming a transaction */
1853 : Assert(data->in_streaming);
1854 :
1855 1276 : OutputPluginPrepareWrite(ctx, true);
1856 1276 : logicalrep_write_stream_stop(ctx->out);
1857 1276 : OutputPluginWrite(ctx, true);
1858 :
1859 : /* we've stopped streaming a transaction */
1860 1276 : data->in_streaming = false;
1861 1276 : }
1862 :
1863 : /*
1864 : * Notify downstream to discard the streamed transaction (along with all
1865 : * its subtransactions, if it's a toplevel transaction).
1866 : */
1867 : static void
1868 52 : pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
1869 : ReorderBufferTXN *txn,
1870 : XLogRecPtr abort_lsn)
1871 : {
1872 : ReorderBufferTXN *toptxn;
1873 52 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1874 52 : bool write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
1875 :
1876 : /*
 1877              : * The abort should happen outside a streaming block, even for streamed
1878 : * transactions. The transaction has to be marked as streamed, though.
1879 : */
1880 : Assert(!data->in_streaming);
1881 :
1882 : /* determine the toplevel transaction */
1883 52 : toptxn = rbtxn_get_toptxn(txn);
1884 :
1885 : Assert(rbtxn_is_streamed(toptxn));
1886 :
1887 52 : OutputPluginPrepareWrite(ctx, true);
1888 52 : logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
1889 : txn->xact_time.abort_time, write_abort_info);
1890 :
1891 52 : OutputPluginWrite(ctx, true);
1892 :
1893 52 : cleanup_rel_sync_cache(toptxn->xid, false);
1894 52 : }
1895 :
1896 : /*
1897 : * Notify downstream to apply the streamed transaction (along with all
1898 : * its subtransactions).
1899 : */
1900 : static void
1901 92 : pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
1902 : ReorderBufferTXN *txn,
1903 : XLogRecPtr commit_lsn)
1904 : {
1905 92 : PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private;
1906 :
1907 : /*
 1908              : * The commit should happen outside a streaming block, even for streamed
1909 : * transactions. The transaction has to be marked as streamed, though.
1910 : */
1911 : Assert(!data->in_streaming);
1912 : Assert(rbtxn_is_streamed(txn));
1913 :
1914 92 : OutputPluginUpdateProgress(ctx, false);
1915 :
1916 92 : OutputPluginPrepareWrite(ctx, true);
1917 92 : logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
1918 92 : OutputPluginWrite(ctx, true);
1919 :
1920 92 : cleanup_rel_sync_cache(txn->xid, true);
1921 92 : }
1922 :
1923 : /*
1924 : * PREPARE callback (for streaming two-phase commit).
1925 : *
1926 : * Notify the downstream to prepare the transaction.
1927 : */
1928 : static void
1929 28 : pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
1930 : ReorderBufferTXN *txn,
1931 : XLogRecPtr prepare_lsn)
1932 : {
1933 : Assert(rbtxn_is_streamed(txn));
1934 :
1935 28 : OutputPluginUpdateProgress(ctx, false);
1936 28 : OutputPluginPrepareWrite(ctx, true);
1937 28 : logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
1938 28 : OutputPluginWrite(ctx, true);
1939 28 : }
1940 :
1941 : /*
1942 : * Initialize the relation schema sync cache for a decoding session.
1943 : *
1944 : * The hash table is destroyed at the end of a decoding session. While
 1945              : * the relcache invalidation callbacks remain registered and will still be
 1946              : * invoked, they will just see the NULL hash table global and take no action.
1947 : */
1948 : static void
1949 724 : init_rel_sync_cache(MemoryContext cachectx)
1950 : {
1951 : HASHCTL ctl;
1952 : static bool relation_callbacks_registered = false;
1953 :
1954 : /* Nothing to do if hash table already exists */
1955 724 : if (RelationSyncCache != NULL)
1956 2 : return;
1957 :
1958 : /* Make a new hash table for the cache */
1959 724 : ctl.keysize = sizeof(Oid);
1960 724 : ctl.entrysize = sizeof(RelationSyncEntry);
1961 724 : ctl.hcxt = cachectx;
1962 :
1963 724 : RelationSyncCache = hash_create("logical replication output relation cache",
1964 : 128, &ctl,
1965 : HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
1966 :
1967 : Assert(RelationSyncCache != NULL);
1968 :
1969 : /* No more to do if we already registered callbacks */
1970 724 : if (relation_callbacks_registered)
1971 2 : return;
1972 :
1973 : /* We must update the cache entry for a relation after a relcache flush */
1974 722 : CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
1975 :
1976 : /*
1977 : * Flush all cache entries after a pg_namespace change, in case it was a
1978 : * schema rename affecting a relation being replicated.
1979 : *
1980 : * XXX: It is not a good idea to invalidate all the relation entries in
1981 : * RelationSyncCache on schema rename. We can optimize it to invalidate
1982 : * only the required relations by either having a specific invalidation
1983 : * message containing impacted relations or by having schema information
1984 : * in each RelationSyncCache entry and using hashvalue of pg_namespace.oid
1985 : * passed to the callback.
1986 : */
1987 722 : CacheRegisterSyscacheCallback(NAMESPACEOID,
1988 : rel_sync_cache_publication_cb,
1989 : (Datum) 0);
1990 :
1991 722 : relation_callbacks_registered = true;
1992 : }
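/*
 * Illustrative sketch (not taken from the PostgreSQL sources): the
 * register-once idiom used by init_rel_sync_cache() above.  The hash table
 * is per decoding session, but invalidation callbacks are per backend and
 * cannot be unregistered, so a static flag guards their registration.  The
 * registration call below is a hypothetical placeholder.
 */
static void
sketch_session_init(void)
{
    static bool callbacks_registered = false;

    /* ... (re)create the per-session cache here ... */

    if (callbacks_registered)
        return;

    /* register_backend_lifetime_callbacks();   hypothetical */
    callbacks_registered = true;
}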
1993 :
1994 : /*
 1995              : * We expect a relatively small number of streamed transactions.
1996 : */
1997 : static bool
1998 351882 : get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
1999 : {
2000 351882 : return list_member_xid(entry->streamed_txns, xid);
2001 : }
2002 :
2003 : /*
 2004              : * Record in the rel sync entry that the relation's schema has already been
 2005              : * sent within the given streamed transaction (xid).
2006 : */
2007 : static void
2008 142 : set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
2009 : {
2010 : MemoryContext oldctx;
2011 :
2012 142 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
2013 :
2014 142 : entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
2015 :
2016 142 : MemoryContextSwitchTo(oldctx);
2017 142 : }
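/*
 * Note on the context switch above (an observation, not from the original
 * comments): entry->streamed_txns hangs off a RelationSyncCache entry that
 * outlives data->context, which is reset after every change, so the list
 * cells are allocated in CacheMemoryContext to keep them valid for the
 * lifetime of the cache entry.
 */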
2018 :
2019 : /*
2020 : * Find or create entry in the relation schema cache.
2021 : *
 2022              : * This looks up the publications that the given relation is directly or
 2023              : * indirectly part of (the latter if it's really the relation's ancestor that
 2024              : * is part of a publication) and fills the entry with information about
 2025              : * which operations to publish and whether to use an ancestor's schema
 2026              : * when publishing.
2027 : */
2028 : static RelationSyncEntry *
2029 366832 : get_rel_sync_entry(PGOutputData *data, Relation relation)
2030 : {
2031 : RelationSyncEntry *entry;
2032 : bool found;
2033 : MemoryContext oldctx;
2034 366832 : Oid relid = RelationGetRelid(relation);
2035 :
2036 : Assert(RelationSyncCache != NULL);
2037 :
2038 : /* Find cached relation info, creating if not found */
2039 366832 : entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
2040 : &relid,
2041 : HASH_ENTER, &found);
2042 : Assert(entry != NULL);
2043 :
2044 : /* initialize entry, if it's new */
2045 366832 : if (!found)
2046 : {
2047 520 : entry->replicate_valid = false;
2048 520 : entry->schema_sent = false;
2049 520 : entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
2050 520 : entry->streamed_txns = NIL;
2051 520 : entry->pubactions.pubinsert = entry->pubactions.pubupdate =
2052 520 : entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
2053 520 : entry->new_slot = NULL;
2054 520 : entry->old_slot = NULL;
2055 520 : memset(entry->exprstate, 0, sizeof(entry->exprstate));
2056 520 : entry->entry_cxt = NULL;
2057 520 : entry->publish_as_relid = InvalidOid;
2058 520 : entry->columns = NULL;
2059 520 : entry->attrmap = NULL;
2060 : }
2061 :
2062 : /* Validate the entry */
2063 366832 : if (!entry->replicate_valid)
2064 : {
2065 654 : Oid schemaId = get_rel_namespace(relid);
2066 654 : List *pubids = GetRelationPublications(relid);
2067 :
2068 : /*
 2069              : * We don't acquire a lock on the namespace system table, as we build
 2070              : * the cache entry using a historic snapshot and any later changes
 2071              : * are absorbed while decoding the WAL.
2072 : */
2073 654 : List *schemaPubids = GetSchemaPublications(schemaId);
2074 : ListCell *lc;
2075 654 : Oid publish_as_relid = relid;
2076 654 : int publish_ancestor_level = 0;
2077 654 : bool am_partition = get_rel_relispartition(relid);
2078 654 : char relkind = get_rel_relkind(relid);
2079 654 : List *rel_publications = NIL;
2080 :
2081 : /* Reload publications if needed before use. */
2082 654 : if (!publications_valid)
2083 : {
2084 330 : MemoryContextReset(data->pubctx);
2085 :
2086 330 : oldctx = MemoryContextSwitchTo(data->pubctx);
2087 330 : data->publications = LoadPublications(data->publication_names);
2088 330 : MemoryContextSwitchTo(oldctx);
2089 330 : publications_valid = true;
2090 : }
2091 :
2092 : /*
 2093              : * Reset the schema_sent status since the relation definition may have
 2094              : * changed, reset pubactions to empty in case the relation was dropped
 2095              : * from a publication, and free any objects that depended on the
 2096              : * earlier definition.
2097 : */
2098 654 : entry->schema_sent = false;
2099 654 : entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
2100 654 : list_free(entry->streamed_txns);
2101 654 : entry->streamed_txns = NIL;
2102 654 : bms_free(entry->columns);
2103 654 : entry->columns = NULL;
2104 654 : entry->pubactions.pubinsert = false;
2105 654 : entry->pubactions.pubupdate = false;
2106 654 : entry->pubactions.pubdelete = false;
2107 654 : entry->pubactions.pubtruncate = false;
2108 :
2109 : /*
 2110              : * Clean up the tuple slots. (They will be rebuilt later if needed.)
2111 : */
2112 654 : if (entry->old_slot)
2113 : {
2114 112 : TupleDesc desc = entry->old_slot->tts_tupleDescriptor;
2115 :
2116 : Assert(desc->tdrefcount == -1);
2117 :
2118 112 : ExecDropSingleTupleTableSlot(entry->old_slot);
2119 :
2120 : /*
2121 : * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
2122 : * do it now to avoid any leaks.
2123 : */
2124 112 : FreeTupleDesc(desc);
2125 : }
2126 654 : if (entry->new_slot)
2127 : {
2128 112 : TupleDesc desc = entry->new_slot->tts_tupleDescriptor;
2129 :
2130 : Assert(desc->tdrefcount == -1);
2131 :
2132 112 : ExecDropSingleTupleTableSlot(entry->new_slot);
2133 :
2134 : /*
2135 : * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
2136 : * do it now to avoid any leaks.
2137 : */
2138 112 : FreeTupleDesc(desc);
2139 : }
2140 :
2141 654 : entry->old_slot = NULL;
2142 654 : entry->new_slot = NULL;
2143 :
2144 654 : if (entry->attrmap)
2145 6 : free_attrmap(entry->attrmap);
2146 654 : entry->attrmap = NULL;
2147 :
2148 : /*
2149 : * Row filter cache cleanups.
2150 : */
2151 654 : if (entry->entry_cxt)
2152 112 : MemoryContextDelete(entry->entry_cxt);
2153 :
2154 654 : entry->entry_cxt = NULL;
2155 654 : entry->estate = NULL;
2156 654 : memset(entry->exprstate, 0, sizeof(entry->exprstate));
2157 :
2158 : /*
 2159              : * Build the publication cache. We can't use the one provided by the
 2160              : * relcache, as the relcache considers all publications that the given
 2161              : * relation is in, but here we only need to consider the ones that the
 2162              : * subscriber requested.
2163 : */
2164 1638 : foreach(lc, data->publications)
2165 : {
2166 984 : Publication *pub = lfirst(lc);
2167 984 : bool publish = false;
2168 :
2169 : /*
2170 : * Under what relid should we publish changes in this publication?
2171 : * We'll use the top-most relid across all publications. Also
2172 : * track the ancestor level for this publication.
2173 : */
2174 984 : Oid pub_relid = relid;
2175 984 : int ancestor_level = 0;
2176 :
2177 : /*
2178 : * If this is a FOR ALL TABLES publication, pick the partition
2179 : * root and set the ancestor level accordingly.
2180 : */
2181 984 : if (pub->alltables)
2182 : {
2183 164 : publish = true;
2184 164 : if (pub->pubviaroot && am_partition)
2185 : {
2186 22 : List *ancestors = get_partition_ancestors(relid);
2187 :
2188 22 : pub_relid = llast_oid(ancestors);
2189 22 : ancestor_level = list_length(ancestors);
2190 : }
2191 : }
2192 :
2193 984 : if (!publish)
2194 : {
2195 820 : bool ancestor_published = false;
2196 :
2197 : /*
2198 : * For a partition, check if any of the ancestors are
2199 : * published. If so, note down the topmost ancestor that is
2200 : * published via this publication, which will be used as the
2201 : * relation via which to publish the partition's changes.
2202 : */
2203 820 : if (am_partition)
2204 : {
2205 : Oid ancestor;
2206 : int level;
2207 242 : List *ancestors = get_partition_ancestors(relid);
2208 :
2209 242 : ancestor = GetTopMostAncestorInPublication(pub->oid,
2210 : ancestors,
2211 : &level);
2212 :
2213 242 : if (ancestor != InvalidOid)
2214 : {
2215 96 : ancestor_published = true;
2216 96 : if (pub->pubviaroot)
2217 : {
2218 50 : pub_relid = ancestor;
2219 50 : ancestor_level = level;
2220 : }
2221 : }
2222 : }
2223 :
2224 1290 : if (list_member_oid(pubids, pub->oid) ||
2225 928 : list_member_oid(schemaPubids, pub->oid) ||
2226 : ancestor_published)
2227 418 : publish = true;
2228 : }
2229 :
2230 : /*
 2231              : * If the relation is to be published, determine the actions to
 2232              : * publish and the list of columns, if appropriate.
 2233              : *
 2234              : * Don't publish changes for a partitioned table, because publishing
 2235              : * the changes of its partitions suffices, unless the partition
 2236              : * changes won't be published due to pubviaroot being set.
2237 : */
2238 984 : if (publish &&
2239 8 : (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
2240 : {
2241 576 : entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
2242 576 : entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
2243 576 : entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
2244 576 : entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
2245 :
2246 : /*
2247 : * We want to publish the changes as the top-most ancestor
2248 : * across all publications. So we need to check if the already
2249 : * calculated level is higher than the new one. If yes, we can
2250 : * ignore the new value (as it's a child). Otherwise the new
2251 : * value is an ancestor, so we keep it.
2252 : */
2253 576 : if (publish_ancestor_level > ancestor_level)
2254 2 : continue;
2255 :
2256 : /*
2257 : * If we found an ancestor higher up in the tree, discard the
2258 : * list of publications through which we replicate it, and use
2259 : * the new ancestor.
2260 : */
2261 574 : if (publish_ancestor_level < ancestor_level)
2262 : {
2263 72 : publish_as_relid = pub_relid;
2264 72 : publish_ancestor_level = ancestor_level;
2265 :
2266 : /* reset the publication list for this relation */
2267 72 : rel_publications = NIL;
2268 : }
2269 : else
2270 : {
2271 : /* Same ancestor level, has to be the same OID. */
2272 : Assert(publish_as_relid == pub_relid);
2273 : }
2274 :
2275 : /* Track publications for this ancestor. */
2276 574 : rel_publications = lappend(rel_publications, pub);
2277 : }
2278 : }
2279 :
2280 654 : entry->publish_as_relid = publish_as_relid;
2281 :
2282 : /*
2283 : * Initialize the tuple slot, map, and row filter. These are only used
2284 : * when publishing inserts, updates, or deletes.
2285 : */
2286 654 : if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
2287 106 : entry->pubactions.pubdelete)
2288 : {
2289 : /* Initialize the tuple slot and map */
2290 548 : init_tuple_slot(data, relation, entry);
2291 :
2292 : /* Initialize the row filter */
2293 548 : pgoutput_row_filter_init(data, rel_publications, entry);
2294 :
2295 : /* Check whether to publish generated columns. */
2296 548 : check_and_init_gencol(data, rel_publications, entry);
2297 :
2298 : /* Initialize the column list */
2299 548 : pgoutput_column_list_init(data, rel_publications, entry);
2300 : }
2301 :
2302 652 : list_free(pubids);
2303 652 : list_free(schemaPubids);
2304 652 : list_free(rel_publications);
2305 :
2306 652 : entry->replicate_valid = true;
2307 : }
2308 :
2309 366830 : return entry;
2310 : }
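/*
 * Illustrative sketch (not taken from the PostgreSQL sources): the cache
 * protocol shared by get_rel_sync_entry() and the invalidation callbacks
 * below.  Callbacks only flip replicate_valid to false; the next lookup
 * rebuilds the entry from the catalogs.  The simplified types here are
 * hypothetical.
 */
typedef struct SketchCacheEntry
{
    bool        valid;
    int         payload;        /* stands in for pubactions, columns, ... */
} SketchCacheEntry;

static void
sketch_cache_lookup(SketchCacheEntry *entry)
{
    if (!entry->valid)
    {
        entry->payload = 0;     /* rebuild from authoritative state */
        entry->valid = true;
    }
}

static void
sketch_cache_invalidate(SketchCacheEntry *entry)
{
    /* mark stale only; invalidations can arrive during a callback */
    entry->valid = false;
}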
2311 :
2312 : /*
2313 : * Cleanup list of streamed transactions and update the schema_sent flag.
2314 : *
2315 : * When a streamed transaction commits or aborts, we need to remove the
2316 : * toplevel XID from the schema cache. If the transaction aborted, the
2317 : * subscriber will simply throw away the schema records we streamed, so
2318 : * we don't need to do anything else.
2319 : *
 2320              : * If the transaction is committed, the subscriber will update its relation
 2321              : * cache, so tweak the schema_sent flag accordingly.
2322 : */
2323 : static void
2324 144 : cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
2325 : {
2326 : HASH_SEQ_STATUS hash_seq;
2327 : RelationSyncEntry *entry;
2328 :
2329 : Assert(RelationSyncCache != NULL);
2330 :
2331 144 : hash_seq_init(&hash_seq, RelationSyncCache);
2332 294 : while ((entry = hash_seq_search(&hash_seq)) != NULL)
2333 : {
2334 : /*
 2335              : * We can set the schema_sent flag for an entry whose list contains the
 2336              : * committed xid, as that ensures the subscriber already has the
 2337              : * corresponding schema; we don't need to send it again unless there is
 2338              : * an invalidation for that relation.
2339 : */
2340 340 : foreach_xid(streamed_txn, entry->streamed_txns)
2341 : {
2342 148 : if (xid == streamed_txn)
2343 : {
2344 108 : if (is_commit)
2345 86 : entry->schema_sent = true;
2346 :
2347 108 : entry->streamed_txns =
2348 108 : foreach_delete_current(entry->streamed_txns, streamed_txn);
2349 108 : break;
2350 : }
2351 : }
2352 : }
2353 144 : }
2354 :
2355 : /*
2356 : * Relcache invalidation callback
2357 : */
2358 : static void
2359 7062 : rel_sync_cache_relation_cb(Datum arg, Oid relid)
2360 : {
2361 : RelationSyncEntry *entry;
2362 :
2363 : /*
 2364              : * We can get here if the plugin was used via the SQL interface: the
 2365              : * RelationSyncCache is destroyed when decoding finishes, but there is
 2366              : * no way to unregister the relcache invalidation callback.
2367 : */
2368 7062 : if (RelationSyncCache == NULL)
2369 34 : return;
2370 :
2371 : /*
2372 : * Nobody keeps pointers to entries in this hash table around outside
 2373              : * logical decoding callback calls, but invalidation events can come in
2374 : * *during* a callback if we do any syscache access in the callback.
2375 : * Because of that we must mark the cache entry as invalid but not damage
2376 : * any of its substructure here. The next get_rel_sync_entry() call will
2377 : * rebuild it all.
2378 : */
2379 7028 : if (OidIsValid(relid))
2380 : {
2381 : /*
2382 : * Getting invalidations for relations that aren't in the table is
 2383              : * entirely normal, so we don't care whether the entry is found or not.
2384 : */
2385 6920 : entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
2386 : HASH_FIND, NULL);
2387 6920 : if (entry != NULL)
2388 1258 : entry->replicate_valid = false;
2389 : }
2390 : else
2391 : {
2392 : /* Whole cache must be flushed. */
2393 : HASH_SEQ_STATUS status;
2394 :
2395 108 : hash_seq_init(&status, RelationSyncCache);
2396 242 : while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2397 : {
2398 134 : entry->replicate_valid = false;
2399 : }
2400 : }
2401 : }
2402 :
2403 : /*
2404 : * Publication relation/schema map syscache invalidation callback
2405 : *
2406 : * Called for invalidations on pg_namespace.
2407 : */
2408 : static void
2409 62 : rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
2410 : {
2411 : HASH_SEQ_STATUS status;
2412 : RelationSyncEntry *entry;
2413 :
2414 : /*
 2415              : * We can get here if the plugin was used via the SQL interface: the
 2416              : * RelationSyncCache is destroyed when decoding finishes, but there is
 2417              : * no way to unregister the invalidation callbacks.
2418 : */
2419 62 : if (RelationSyncCache == NULL)
2420 16 : return;
2421 :
2422 : /*
2423 : * We have no easy way to identify which cache entries this invalidation
2424 : * event might have affected, so just mark them all invalid.
2425 : */
2426 46 : hash_seq_init(&status, RelationSyncCache);
2427 88 : while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2428 : {
2429 42 : entry->replicate_valid = false;
2430 : }
2431 : }
2432 :
2433 : /* Send Replication origin */
2434 : static void
2435 2152 : send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
2436 : XLogRecPtr origin_lsn, bool send_origin)
2437 : {
2438 2152 : if (send_origin)
2439 : {
2440 : char *origin;
2441 :
2442 : /*----------
2443 : * XXX: which behaviour do we want here?
2444 : *
2445 : * Alternatives:
2446 : * - don't send origin message if origin name not found
2447 : * (that's what we do now)
2448 : * - throw error - that will break replication, not good
2449 : * - send some special "unknown" origin
2450 : *----------
2451 : */
2452 20 : if (replorigin_by_oid(origin_id, true, &origin))
2453 : {
2454 : /* Message boundary */
2455 20 : OutputPluginWrite(ctx, false);
2456 20 : OutputPluginPrepareWrite(ctx, true);
2457 :
2458 20 : logicalrep_write_origin(ctx->out, origin, origin_lsn);
2459 : }
2460 : }
2461 2152 : }