Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pgoutput.c
4 : * Logical Replication output plugin
5 : *
6 : * Copyright (c) 2012-2025, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/pgoutput/pgoutput.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "access/tupconvert.h"
16 : #include "catalog/partition.h"
17 : #include "catalog/pg_publication.h"
18 : #include "catalog/pg_publication_rel.h"
19 : #include "catalog/pg_subscription.h"
20 : #include "commands/defrem.h"
21 : #include "commands/subscriptioncmds.h"
22 : #include "executor/executor.h"
23 : #include "fmgr.h"
24 : #include "nodes/makefuncs.h"
25 : #include "parser/parse_relation.h"
26 : #include "replication/logical.h"
27 : #include "replication/logicalproto.h"
28 : #include "replication/origin.h"
29 : #include "replication/pgoutput.h"
30 : #include "rewrite/rewriteHandler.h"
31 : #include "utils/builtins.h"
32 : #include "utils/inval.h"
33 : #include "utils/lsyscache.h"
34 : #include "utils/memutils.h"
35 : #include "utils/rel.h"
36 : #include "utils/syscache.h"
37 : #include "utils/varlena.h"
38 :
39 1062 : PG_MODULE_MAGIC_EXT(
40 : .name = "pgoutput",
41 : .version = PG_VERSION
42 : );
43 :
44 : static void pgoutput_startup(LogicalDecodingContext *ctx,
45 : OutputPluginOptions *opt, bool is_init);
46 : static void pgoutput_shutdown(LogicalDecodingContext *ctx);
47 : static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
48 : ReorderBufferTXN *txn);
49 : static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
50 : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
51 : static void pgoutput_change(LogicalDecodingContext *ctx,
52 : ReorderBufferTXN *txn, Relation relation,
53 : ReorderBufferChange *change);
54 : static void pgoutput_truncate(LogicalDecodingContext *ctx,
55 : ReorderBufferTXN *txn, int nrelations, Relation relations[],
56 : ReorderBufferChange *change);
57 : static void pgoutput_message(LogicalDecodingContext *ctx,
58 : ReorderBufferTXN *txn, XLogRecPtr message_lsn,
59 : bool transactional, const char *prefix,
60 : Size sz, const char *message);
61 : static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
62 : RepOriginId origin_id);
63 : static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
64 : ReorderBufferTXN *txn);
65 : static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
66 : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
67 : static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
68 : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
69 : static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
70 : ReorderBufferTXN *txn,
71 : XLogRecPtr prepare_end_lsn,
72 : TimestampTz prepare_time);
73 : static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
74 : ReorderBufferTXN *txn);
75 : static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
76 : ReorderBufferTXN *txn);
77 : static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
78 : ReorderBufferTXN *txn,
79 : XLogRecPtr abort_lsn);
80 : static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
81 : ReorderBufferTXN *txn,
82 : XLogRecPtr commit_lsn);
83 : static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
84 : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
85 :
86 : static bool publications_valid;
87 :
88 : static List *LoadPublications(List *pubnames);
89 : static void publication_invalidation_cb(Datum arg, int cacheid,
90 : uint32 hashvalue);
91 : static void send_repl_origin(LogicalDecodingContext *ctx,
92 : RepOriginId origin_id, XLogRecPtr origin_lsn,
93 : bool send_origin);
94 :
95 : /*
96 : * Only 3 publication actions are used for row filtering ("insert", "update",
97 : * "delete"). See RelationSyncEntry.exprstate[].
98 : */
99 : enum RowFilterPubAction
100 : {
101 : PUBACTION_INSERT,
102 : PUBACTION_UPDATE,
103 : PUBACTION_DELETE,
104 : };
105 :
106 : #define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
107 :
108 : /*
109 : * Entry in the map used to remember which relation schemas we sent.
110 : *
111 : * The schema_sent flag determines if the current schema record for the
112 : * relation (and for its ancestor if publish_as_relid is set) was already
113 : * sent to the subscriber (in which case we don't need to send it again).
114 : *
 115 : * The schema cache on the downstream is, however, updated only at commit
 116 : * time, and with streamed transactions the commit order may differ from
 117 : * the order in which the transactions are sent. Also, the (sub)transactions
 118 : * might get aborted, so we need to send the schema for each (sub)transaction
 119 : * so that we don't lose the schema information on abort. To handle this,
 120 : * we maintain a list of xids (streamed_txns) for which we have already sent
 121 : * the schema.
122 : *
123 : * For partitions, 'pubactions' considers not only the table's own
124 : * publications, but also those of all of its ancestors.
125 : */
126 : typedef struct RelationSyncEntry
127 : {
128 : Oid relid; /* relation oid */
129 :
130 : bool replicate_valid; /* overall validity flag for entry */
131 :
132 : bool schema_sent;
133 :
134 : /*
135 : * This will be PUBLISH_GENCOLS_STORED if the relation contains generated
136 : * columns and the 'publish_generated_columns' parameter is set to
137 : * PUBLISH_GENCOLS_STORED. Otherwise, it will be PUBLISH_GENCOLS_NONE,
138 : * indicating that no generated columns should be published, unless
139 : * explicitly specified in the column list.
140 : */
141 : PublishGencolsType include_gencols_type;
142 : List *streamed_txns; /* streamed toplevel transactions with this
143 : * schema */
144 :
145 : /* are we publishing this rel? */
146 : PublicationActions pubactions;
147 :
148 : /*
 149 : * ExprState array for row filter. The filter expressions cannot always be
 150 : * combined into a single one across publication actions, because updates
 151 : * and deletes restrict the columns in the expression to be part of the
 152 : * replica identity index whereas inserts do not have this restriction, so
 153 : * there is one ExprState per publication action.
154 : */
155 : ExprState *exprstate[NUM_ROWFILTER_PUBACTIONS];
156 : EState *estate; /* executor state used for row filter */
157 : TupleTableSlot *new_slot; /* slot for storing new tuple */
158 : TupleTableSlot *old_slot; /* slot for storing old tuple */
159 :
160 : /*
161 : * OID of the relation to publish changes as. For a partition, this may
162 : * be set to one of its ancestors whose schema will be used when
163 : * replicating changes, if publish_via_partition_root is set for the
164 : * publication.
165 : */
166 : Oid publish_as_relid;
167 :
168 : /*
169 : * Map used when replicating using an ancestor's schema to convert tuples
170 : * from partition's type to the ancestor's; NULL if publish_as_relid is
171 : * same as 'relid' or if unnecessary due to partition and the ancestor
172 : * having identical TupleDesc.
173 : */
174 : AttrMap *attrmap;
175 :
176 : /*
177 : * Columns included in the publication, or NULL if all columns are
178 : * included implicitly. Note that the attnums in this bitmap are not
179 : * shifted by FirstLowInvalidHeapAttributeNumber.
180 : */
181 : Bitmapset *columns;
182 :
183 : /*
184 : * Private context to store additional data for this entry - state for the
185 : * row filter expressions, column list, etc.
186 : */
187 : MemoryContext entry_cxt;
188 : } RelationSyncEntry;
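/*
 * Illustrative note (added for this listing, not part of pgoutput.c): for a
 * hypothetical partition "p1" published through its root "parent" with
 * publish_via_partition_root enabled, the entry keyed by p1's OID would have
 * publish_as_relid set to parent's OID, attrmap converting p1 tuples into
 * parent's row type when the two tuple descriptors differ, and pubactions
 * combining p1's own publications with those of its ancestors, as described
 * above.
 */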
189 :
190 : /*
191 : * Maintain a per-transaction level variable to track whether the transaction
192 : * has sent BEGIN. BEGIN is only sent when the first change in a transaction
193 : * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
 194 : * messages for empty transactions, which saves network bandwidth.
 195 : *
 196 : * This optimization is not used for prepared transactions, because if the
 197 : * WALSender restarts after the prepare of a transaction and before the commit
 198 : * prepared of the same transaction, we won't be able to figure out whether we
 199 : * skipped sending BEGIN/PREPARE of a transaction because it was empty. This is
 200 : * because we would have lost the in-memory txndata information that was
 201 : * present prior to the restart. This would result in sending a spurious
 202 : * COMMIT PREPARED without a corresponding prepared transaction at the
 203 : * downstream, which would lead to an error when it tries to process it.
204 : *
205 : * XXX We could achieve this optimization by changing protocol to send
206 : * additional information so that downstream can detect that the corresponding
207 : * prepare has not been sent. However, adding such a check for every
208 : * transaction in the downstream could be costly so we might want to do it
209 : * optionally.
210 : *
211 : * We also don't have this optimization for streamed transactions because
212 : * they can contain prepared transactions.
213 : */
214 : typedef struct PGOutputTxnData
215 : {
216 : bool sent_begin_txn; /* flag indicating whether BEGIN has been sent */
217 : } PGOutputTxnData;
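/*
 * Illustrative sketch (not part of pgoutput.c): callback sequence for a
 * transaction whose only change is an INSERT into an unpublished table,
 * showing how the empty-transaction optimization avoids any network traffic:
 *
 *   pgoutput_begin_txn()   -> allocate PGOutputTxnData, sent_begin_txn=false
 *   pgoutput_change()      -> relentry->pubactions says don't publish, so we
 *                             return before pgoutput_send_begin() is called
 *   pgoutput_commit_txn()  -> sent_begin_txn is still false, so neither BEGIN
 *                             nor COMMIT is written to the output stream
 */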
218 :
219 : /* Map used to remember which relation schemas we sent. */
220 : static HTAB *RelationSyncCache = NULL;
221 :
222 : static void init_rel_sync_cache(MemoryContext cachectx);
223 : static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
224 : static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
225 : Relation relation);
226 : static void send_relation_and_attrs(Relation relation, TransactionId xid,
227 : LogicalDecodingContext *ctx,
228 : RelationSyncEntry *relentry);
229 : static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
230 : static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
231 : uint32 hashvalue);
232 : static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
233 : TransactionId xid);
234 : static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
235 : TransactionId xid);
236 : static void init_tuple_slot(PGOutputData *data, Relation relation,
237 : RelationSyncEntry *entry);
238 : static void pgoutput_memory_context_reset(void *arg);
239 :
240 : /* row filter routines */
241 : static EState *create_estate_for_relation(Relation rel);
242 : static void pgoutput_row_filter_init(PGOutputData *data,
243 : List *publications,
244 : RelationSyncEntry *entry);
245 : static bool pgoutput_row_filter_exec_expr(ExprState *state,
246 : ExprContext *econtext);
247 : static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
248 : TupleTableSlot **new_slot_ptr,
249 : RelationSyncEntry *entry,
250 : ReorderBufferChangeType *action);
251 :
252 : /* column list routines */
253 : static void pgoutput_column_list_init(PGOutputData *data,
254 : List *publications,
255 : RelationSyncEntry *entry);
256 :
257 : /*
258 : * Specify output plugin callbacks
259 : */
260 : void
261 1440 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
262 : {
263 1440 : cb->startup_cb = pgoutput_startup;
264 1440 : cb->begin_cb = pgoutput_begin_txn;
265 1440 : cb->change_cb = pgoutput_change;
266 1440 : cb->truncate_cb = pgoutput_truncate;
267 1440 : cb->message_cb = pgoutput_message;
268 1440 : cb->commit_cb = pgoutput_commit_txn;
269 :
270 1440 : cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
271 1440 : cb->prepare_cb = pgoutput_prepare_txn;
272 1440 : cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
273 1440 : cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
274 1440 : cb->filter_by_origin_cb = pgoutput_origin_filter;
275 1440 : cb->shutdown_cb = pgoutput_shutdown;
276 :
277 : /* transaction streaming */
278 1440 : cb->stream_start_cb = pgoutput_stream_start;
279 1440 : cb->stream_stop_cb = pgoutput_stream_stop;
280 1440 : cb->stream_abort_cb = pgoutput_stream_abort;
281 1440 : cb->stream_commit_cb = pgoutput_stream_commit;
282 1440 : cb->stream_change_cb = pgoutput_change;
283 1440 : cb->stream_message_cb = pgoutput_message;
284 1440 : cb->stream_truncate_cb = pgoutput_truncate;
285 : /* transaction streaming - two-phase commit */
286 1440 : cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
287 1440 : }
288 :
289 : static void
290 792 : parse_output_parameters(List *options, PGOutputData *data)
291 : {
292 : ListCell *lc;
293 792 : bool protocol_version_given = false;
294 792 : bool publication_names_given = false;
295 792 : bool binary_option_given = false;
296 792 : bool messages_option_given = false;
297 792 : bool streaming_given = false;
298 792 : bool two_phase_option_given = false;
299 792 : bool origin_option_given = false;
300 :
301 : /* Initialize optional parameters to defaults */
302 792 : data->binary = false;
303 792 : data->streaming = LOGICALREP_STREAM_OFF;
304 792 : data->messages = false;
305 792 : data->two_phase = false;
306 792 : data->publish_no_origin = false;
307 :
308 3952 : foreach(lc, options)
309 : {
310 3160 : DefElem *defel = (DefElem *) lfirst(lc);
311 :
312 : Assert(defel->arg == NULL || IsA(defel->arg, String));
313 :
314 : /* Check each param, whether or not we recognize it */
315 3160 : if (strcmp(defel->defname, "proto_version") == 0)
316 : {
317 : unsigned long parsed;
318 : char *endptr;
319 :
320 792 : if (protocol_version_given)
321 0 : ereport(ERROR,
322 : (errcode(ERRCODE_SYNTAX_ERROR),
323 : errmsg("conflicting or redundant options")));
324 792 : protocol_version_given = true;
325 :
326 792 : errno = 0;
327 792 : parsed = strtoul(strVal(defel->arg), &endptr, 10);
328 792 : if (errno != 0 || *endptr != '\0')
329 0 : ereport(ERROR,
330 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
331 : errmsg("invalid proto_version")));
332 :
333 792 : if (parsed > PG_UINT32_MAX)
334 0 : ereport(ERROR,
335 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
336 : errmsg("proto_version \"%s\" out of range",
337 : strVal(defel->arg))));
338 :
339 792 : data->protocol_version = (uint32) parsed;
340 : }
341 2368 : else if (strcmp(defel->defname, "publication_names") == 0)
342 : {
343 792 : if (publication_names_given)
344 0 : ereport(ERROR,
345 : (errcode(ERRCODE_SYNTAX_ERROR),
346 : errmsg("conflicting or redundant options")));
347 792 : publication_names_given = true;
348 :
349 792 : if (!SplitIdentifierString(strVal(defel->arg), ',',
350 : &data->publication_names))
351 0 : ereport(ERROR,
352 : (errcode(ERRCODE_INVALID_NAME),
353 : errmsg("invalid publication_names syntax")));
354 : }
355 1576 : else if (strcmp(defel->defname, "binary") == 0)
356 : {
357 22 : if (binary_option_given)
358 0 : ereport(ERROR,
359 : (errcode(ERRCODE_SYNTAX_ERROR),
360 : errmsg("conflicting or redundant options")));
361 22 : binary_option_given = true;
362 :
363 22 : data->binary = defGetBoolean(defel);
364 : }
365 1554 : else if (strcmp(defel->defname, "messages") == 0)
366 : {
367 8 : if (messages_option_given)
368 0 : ereport(ERROR,
369 : (errcode(ERRCODE_SYNTAX_ERROR),
370 : errmsg("conflicting or redundant options")));
371 8 : messages_option_given = true;
372 :
373 8 : data->messages = defGetBoolean(defel);
374 : }
375 1546 : else if (strcmp(defel->defname, "streaming") == 0)
376 : {
377 756 : if (streaming_given)
378 0 : ereport(ERROR,
379 : (errcode(ERRCODE_SYNTAX_ERROR),
380 : errmsg("conflicting or redundant options")));
381 756 : streaming_given = true;
382 :
383 756 : data->streaming = defGetStreamingMode(defel);
384 : }
385 790 : else if (strcmp(defel->defname, "two_phase") == 0)
386 : {
387 16 : if (two_phase_option_given)
388 0 : ereport(ERROR,
389 : (errcode(ERRCODE_SYNTAX_ERROR),
390 : errmsg("conflicting or redundant options")));
391 16 : two_phase_option_given = true;
392 :
393 16 : data->two_phase = defGetBoolean(defel);
394 : }
395 774 : else if (strcmp(defel->defname, "origin") == 0)
396 : {
397 : char *origin;
398 :
399 774 : if (origin_option_given)
400 0 : ereport(ERROR,
401 : errcode(ERRCODE_SYNTAX_ERROR),
402 : errmsg("conflicting or redundant options"));
403 774 : origin_option_given = true;
404 :
405 774 : origin = defGetString(defel);
406 774 : if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
407 52 : data->publish_no_origin = true;
408 722 : else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
409 722 : data->publish_no_origin = false;
410 : else
411 0 : ereport(ERROR,
412 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
413 : errmsg("unrecognized origin value: \"%s\"", origin));
414 : }
415 : else
416 0 : elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
417 : }
418 :
419 : /* Check required options */
420 792 : if (!protocol_version_given)
421 0 : ereport(ERROR,
422 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
423 : errmsg("option \"%s\" missing", "proto_version"));
424 792 : if (!publication_names_given)
425 0 : ereport(ERROR,
426 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
427 : errmsg("option \"%s\" missing", "publication_names"));
428 792 : }
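/*
 * Example (illustration only; slot and publication names are made up): a
 * subscriber's apply worker normally supplies these options as part of the
 * replication command, roughly like
 *
 *   START_REPLICATION SLOT "sub1" LOGICAL 0/0 (
 *       proto_version '4',
 *       publication_names '"pub1","pub2"',
 *       streaming 'parallel',
 *       origin 'any'
 *   );
 *
 * The option keywords and value formats correspond to what
 * parse_output_parameters() above accepts; the exact proto_version depends
 * on the server and subscriber versions.
 */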
429 :
430 : /*
431 : * Memory context reset callback of PGOutputData->context.
432 : */
433 : static void
434 2068 : pgoutput_memory_context_reset(void *arg)
435 : {
436 2068 : if (RelationSyncCache)
437 : {
438 386 : hash_destroy(RelationSyncCache);
439 386 : RelationSyncCache = NULL;
440 : }
441 2068 : }
442 :
443 : /*
444 : * Initialize this plugin
445 : */
446 : static void
447 1440 : pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
448 : bool is_init)
449 : {
450 1440 : PGOutputData *data = palloc0(sizeof(PGOutputData));
451 : static bool publication_callback_registered = false;
452 : MemoryContextCallback *mcallback;
453 :
454 : /* Create our memory context for private allocations. */
455 1440 : data->context = AllocSetContextCreate(ctx->context,
456 : "logical replication output context",
457 : ALLOCSET_DEFAULT_SIZES);
458 :
459 1440 : data->cachectx = AllocSetContextCreate(ctx->context,
460 : "logical replication cache context",
461 : ALLOCSET_DEFAULT_SIZES);
462 :
463 1440 : data->pubctx = AllocSetContextCreate(ctx->context,
464 : "logical replication publication list context",
465 : ALLOCSET_SMALL_SIZES);
466 :
467 : /*
 468 : * Ensure that the RelationSyncCache is cleaned up even when logical
 469 : * decoding invoked via the SQL interface ends up with an error.
470 : */
471 1440 : mcallback = palloc0(sizeof(MemoryContextCallback));
472 1440 : mcallback->func = pgoutput_memory_context_reset;
473 1440 : MemoryContextRegisterResetCallback(ctx->context, mcallback);
474 :
475 1440 : ctx->output_plugin_private = data;
476 :
477 : /* This plugin uses binary protocol. */
478 1440 : opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
479 :
480 : /*
481 : * This is replication start and not slot initialization.
482 : *
483 : * Parse and validate options passed by the client.
484 : */
485 1440 : if (!is_init)
486 : {
487 : /* Parse the params and ERROR if we see any we don't recognize */
488 792 : parse_output_parameters(ctx->output_plugin_options, data);
489 :
490 : /* Check if we support requested protocol */
491 792 : if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
492 0 : ereport(ERROR,
493 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
494 : errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
495 : data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
496 :
497 792 : if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
498 0 : ereport(ERROR,
499 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
500 : errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
501 : data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
502 :
503 : /*
504 : * Decide whether to enable streaming. It is disabled by default, in
 505 : * which case we just update the flag in the decoding context. Otherwise
 506 : * we only allow it with a sufficient version of the protocol, and when
 507 : * the output plugin supports it.
508 : */
509 792 : if (data->streaming == LOGICALREP_STREAM_OFF)
510 36 : ctx->streaming = false;
511 756 : else if (data->streaming == LOGICALREP_STREAM_ON &&
512 54 : data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
513 0 : ereport(ERROR,
514 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
515 : errmsg("requested proto_version=%d does not support streaming, need %d or higher",
516 : data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
517 756 : else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
518 702 : data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
519 0 : ereport(ERROR,
520 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
521 : errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
522 : data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
523 756 : else if (!ctx->streaming)
524 0 : ereport(ERROR,
525 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
526 : errmsg("streaming requested, but not supported by output plugin")));
527 :
528 : /*
 529 : * Here, we just check whether the two-phase option is passed by the
 530 : * plugin and decide whether to enable it at a later point in time. It
 531 : * remains enabled if the previous start-up has done so. But we only
 532 : * allow the option to be passed in with a sufficient version of the
 533 : * protocol, and when the output plugin supports it.
534 : */
535 792 : if (!data->two_phase)
536 776 : ctx->twophase_opt_given = false;
537 16 : else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
538 0 : ereport(ERROR,
539 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
540 : errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
541 : data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
542 16 : else if (!ctx->twophase)
543 0 : ereport(ERROR,
544 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
545 : errmsg("two-phase commit requested, but not supported by output plugin")));
546 : else
547 16 : ctx->twophase_opt_given = true;
548 :
549 : /* Init publication state. */
550 792 : data->publications = NIL;
551 792 : publications_valid = false;
552 :
553 : /*
554 : * Register callback for pg_publication if we didn't already do that
555 : * during some previous call in this process.
556 : */
557 792 : if (!publication_callback_registered)
558 : {
559 788 : CacheRegisterSyscacheCallback(PUBLICATIONOID,
560 : publication_invalidation_cb,
561 : (Datum) 0);
562 788 : CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb,
563 : (Datum) 0);
564 788 : publication_callback_registered = true;
565 : }
566 :
567 : /* Initialize relation schema cache. */
568 792 : init_rel_sync_cache(CacheMemoryContext);
569 : }
570 : else
571 : {
572 : /*
573 : * Disable the streaming and prepared transactions during the slot
574 : * initialization mode.
575 : */
576 648 : ctx->streaming = false;
577 648 : ctx->twophase = false;
578 : }
579 1440 : }
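/*
 * Example (illustration only; slot and publication names are made up): the
 * plugin can also be exercised through the SQL interface mentioned above,
 * for instance
 *
 *   SELECT * FROM pg_create_logical_replication_slot('s1', 'pgoutput');
 *   SELECT * FROM pg_logical_slot_peek_binary_changes('s1', NULL, NULL,
 *              'proto_version', '4', 'publication_names', 'pub1');
 *
 * The binary variant of the function is needed because pgoutput declares
 * OUTPUT_PLUGIN_BINARY_OUTPUT in pgoutput_startup().
 */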
580 :
581 : /*
582 : * BEGIN callback.
583 : *
584 : * Don't send the BEGIN message here instead postpone it until the first
585 : * change. In logical replication, a common scenario is to replicate a set of
586 : * tables (instead of all tables) and transactions whose changes were on
587 : * the table(s) that are not published will produce empty transactions. These
588 : * empty transactions will send BEGIN and COMMIT messages to subscribers,
589 : * using bandwidth on something with little/no use for logical replication.
590 : */
591 : static void
592 1648 : pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
593 : {
594 1648 : PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
595 : sizeof(PGOutputTxnData));
596 :
597 1648 : txn->output_plugin_private = txndata;
598 1648 : }
599 :
600 : /*
601 : * Send BEGIN.
602 : *
603 : * This is called while processing the first change of the transaction.
604 : */
605 : static void
606 882 : pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
607 : {
608 882 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
609 882 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
610 :
611 : Assert(txndata);
612 : Assert(!txndata->sent_begin_txn);
613 :
614 882 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
615 882 : logicalrep_write_begin(ctx->out, txn);
616 882 : txndata->sent_begin_txn = true;
617 :
618 882 : send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
619 : send_replication_origin);
620 :
621 882 : OutputPluginWrite(ctx, true);
622 880 : }
623 :
624 : /*
625 : * COMMIT callback
626 : */
627 : static void
628 1642 : pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
629 : XLogRecPtr commit_lsn)
630 : {
631 1642 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
632 : bool sent_begin_txn;
633 :
634 : Assert(txndata);
635 :
636 : /*
637 : * We don't need to send the commit message unless some relevant change
638 : * from this transaction has been sent to the downstream.
639 : */
640 1642 : sent_begin_txn = txndata->sent_begin_txn;
641 1642 : OutputPluginUpdateProgress(ctx, !sent_begin_txn);
642 1642 : pfree(txndata);
643 1642 : txn->output_plugin_private = NULL;
644 :
645 1642 : if (!sent_begin_txn)
646 : {
647 764 : elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
648 764 : return;
649 : }
650 :
651 878 : OutputPluginPrepareWrite(ctx, true);
652 878 : logicalrep_write_commit(ctx->out, txn, commit_lsn);
653 878 : OutputPluginWrite(ctx, true);
654 : }
655 :
656 : /*
657 : * BEGIN PREPARE callback
658 : */
659 : static void
660 42 : pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
661 : {
662 42 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
663 :
664 42 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
665 42 : logicalrep_write_begin_prepare(ctx->out, txn);
666 :
667 42 : send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
668 : send_replication_origin);
669 :
670 42 : OutputPluginWrite(ctx, true);
671 42 : }
672 :
673 : /*
674 : * PREPARE callback
675 : */
676 : static void
677 42 : pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
678 : XLogRecPtr prepare_lsn)
679 : {
680 42 : OutputPluginUpdateProgress(ctx, false);
681 :
682 42 : OutputPluginPrepareWrite(ctx, true);
683 42 : logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
684 42 : OutputPluginWrite(ctx, true);
685 42 : }
686 :
687 : /*
688 : * COMMIT PREPARED callback
689 : */
690 : static void
691 48 : pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
692 : XLogRecPtr commit_lsn)
693 : {
694 48 : OutputPluginUpdateProgress(ctx, false);
695 :
696 48 : OutputPluginPrepareWrite(ctx, true);
697 48 : logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
698 48 : OutputPluginWrite(ctx, true);
699 48 : }
700 :
701 : /*
702 : * ROLLBACK PREPARED callback
703 : */
704 : static void
705 16 : pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
706 : ReorderBufferTXN *txn,
707 : XLogRecPtr prepare_end_lsn,
708 : TimestampTz prepare_time)
709 : {
710 16 : OutputPluginUpdateProgress(ctx, false);
711 :
712 16 : OutputPluginPrepareWrite(ctx, true);
713 16 : logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
714 : prepare_time);
715 16 : OutputPluginWrite(ctx, true);
716 16 : }
717 :
718 : /*
719 : * Write the current schema of the relation and its ancestor (if any) if not
720 : * done yet.
721 : */
722 : static void
723 364462 : maybe_send_schema(LogicalDecodingContext *ctx,
724 : ReorderBufferChange *change,
725 : Relation relation, RelationSyncEntry *relentry)
726 : {
727 364462 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
728 : bool schema_sent;
729 364462 : TransactionId xid = InvalidTransactionId;
730 364462 : TransactionId topxid = InvalidTransactionId;
731 :
732 : /*
733 : * Remember XID of the (sub)transaction for the change. We don't care if
734 : * it's top-level transaction or not (we have already sent that XID in
735 : * start of the current streaming block).
736 : *
737 : * If we're not in a streaming block, just use InvalidTransactionId and
738 : * the write methods will not include it.
739 : */
740 364462 : if (data->in_streaming)
741 351842 : xid = change->txn->xid;
742 :
743 364462 : if (rbtxn_is_subtxn(change->txn))
744 20338 : topxid = rbtxn_get_toptxn(change->txn)->xid;
745 : else
746 344124 : topxid = xid;
747 :
748 : /*
749 : * Do we need to send the schema? We do track streamed transactions
750 : * separately, because those may be applied later (and the regular
751 : * transactions won't see their effects until then) and in an order that
752 : * we don't know at this point.
753 : *
754 : * XXX There is a scope of optimization here. Currently, we always send
755 : * the schema first time in a streaming transaction but we can probably
756 : * avoid that by checking 'relentry->schema_sent' flag. However, before
757 : * doing that we need to study its impact on the case where we have a mix
758 : * of streaming and non-streaming transactions.
759 : */
760 364462 : if (data->in_streaming)
761 351842 : schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
762 : else
763 12620 : schema_sent = relentry->schema_sent;
764 :
765 : /* Nothing to do if we already sent the schema. */
766 364462 : if (schema_sent)
767 363806 : return;
768 :
769 : /*
770 : * Send the schema. If the changes will be published using an ancestor's
771 : * schema, not the relation's own, send that ancestor's schema before
 772 : * sending the relation's own (XXX - maybe sending only the former suffices?).
773 : */
774 656 : if (relentry->publish_as_relid != RelationGetRelid(relation))
775 : {
776 60 : Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
777 :
778 60 : send_relation_and_attrs(ancestor, xid, ctx, relentry);
779 60 : RelationClose(ancestor);
780 : }
781 :
782 656 : send_relation_and_attrs(relation, xid, ctx, relentry);
783 :
784 656 : if (data->in_streaming)
785 134 : set_schema_sent_in_streamed_txn(relentry, topxid);
786 : else
787 522 : relentry->schema_sent = true;
788 : }
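/*
 * Illustrative example (xid values made up): with two interleaved streamed
 * transactions touching the same table, the schema is tracked per top-level
 * xid rather than once per cache entry:
 *
 *   stream block of xid 1001 -> first change -> schema sent, 1001 added to
 *                               relentry->streamed_txns
 *   stream block of xid 1002 -> first change -> schema sent again for 1002
 *   later block of xid 1001  -> schema not re-sent (1001 already recorded)
 */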
789 :
790 : /*
791 : * Sends a relation
792 : */
793 : static void
794 716 : send_relation_and_attrs(Relation relation, TransactionId xid,
795 : LogicalDecodingContext *ctx,
796 : RelationSyncEntry *relentry)
797 : {
798 716 : TupleDesc desc = RelationGetDescr(relation);
799 716 : Bitmapset *columns = relentry->columns;
800 716 : PublishGencolsType include_gencols_type = relentry->include_gencols_type;
801 : int i;
802 :
803 : /*
804 : * Write out type info if needed. We do that only for user-created types.
805 : * We use FirstGenbkiObjectId as the cutoff, so that we only consider
806 : * objects with hand-assigned OIDs to be "built in", not for instance any
807 : * function or type defined in the information_schema. This is important
808 : * because only hand-assigned OIDs can be expected to remain stable across
809 : * major versions.
810 : */
811 2208 : for (i = 0; i < desc->natts; i++)
812 : {
813 1492 : Form_pg_attribute att = TupleDescAttr(desc, i);
814 :
815 1492 : if (!logicalrep_should_publish_column(att, columns,
816 : include_gencols_type))
817 142 : continue;
818 :
819 1350 : if (att->atttypid < FirstGenbkiObjectId)
820 1314 : continue;
821 :
822 36 : OutputPluginPrepareWrite(ctx, false);
823 36 : logicalrep_write_typ(ctx->out, xid, att->atttypid);
824 36 : OutputPluginWrite(ctx, false);
825 : }
826 :
827 716 : OutputPluginPrepareWrite(ctx, false);
828 716 : logicalrep_write_rel(ctx->out, xid, relation, columns,
829 : include_gencols_type);
830 716 : OutputPluginWrite(ctx, false);
831 716 : }
832 :
833 : /*
834 : * Executor state preparation for evaluation of row filter expressions for the
835 : * specified relation.
836 : */
837 : static EState *
838 34 : create_estate_for_relation(Relation rel)
839 : {
840 : EState *estate;
841 : RangeTblEntry *rte;
842 34 : List *perminfos = NIL;
843 :
844 34 : estate = CreateExecutorState();
845 :
846 34 : rte = makeNode(RangeTblEntry);
847 34 : rte->rtekind = RTE_RELATION;
848 34 : rte->relid = RelationGetRelid(rel);
849 34 : rte->relkind = rel->rd_rel->relkind;
850 34 : rte->rellockmode = AccessShareLock;
851 :
852 34 : addRTEPermissionInfo(&perminfos, rte);
853 :
854 34 : ExecInitRangeTable(estate, list_make1(rte), perminfos,
855 : bms_make_singleton(1));
856 :
857 34 : estate->es_output_cid = GetCurrentCommandId(false);
858 :
859 34 : return estate;
860 : }
861 :
862 : /*
863 : * Evaluates row filter.
864 : *
 865 : * If the row filter evaluates to NULL, it is taken as false, i.e. the change
866 : * isn't replicated.
867 : */
868 : static bool
869 76 : pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
870 : {
871 : Datum ret;
872 : bool isnull;
873 :
874 : Assert(state != NULL);
875 :
876 76 : ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
877 :
878 76 : elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
879 : isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
880 : isnull ? "true" : "false");
881 :
882 76 : if (isnull)
883 2 : return false;
884 :
885 74 : return DatumGetBool(ret);
886 : }
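/*
 * Example (hypothetical row filter, not from the source): with a publication
 * defined using WHERE (price > 100), a row whose price column is NULL makes
 * the expression evaluate to NULL, which the function above reports as
 * "does not match", so that change is not replicated.
 */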
887 :
888 : /*
889 : * Make sure the per-entry memory context exists.
890 : */
891 : static void
892 622 : pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
893 : {
894 : Relation relation;
895 :
896 : /* The context may already exist, in which case bail out. */
897 622 : if (entry->entry_cxt)
898 34 : return;
899 :
900 588 : relation = RelationIdGetRelation(entry->publish_as_relid);
901 :
902 588 : entry->entry_cxt = AllocSetContextCreate(data->cachectx,
903 : "entry private context",
904 : ALLOCSET_SMALL_SIZES);
905 :
906 588 : MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
907 : RelationGetRelationName(relation));
908 : }
909 :
910 : /*
911 : * Initialize the row filter.
912 : */
913 : static void
914 588 : pgoutput_row_filter_init(PGOutputData *data, List *publications,
915 : RelationSyncEntry *entry)
916 : {
917 : ListCell *lc;
918 588 : List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */
919 588 : bool no_filter[] = {false, false, false}; /* One per pubaction */
920 : MemoryContext oldctx;
921 : int idx;
922 588 : bool has_filter = true;
923 588 : Oid schemaid = get_rel_namespace(entry->publish_as_relid);
924 :
925 : /*
926 : * Find if there are any row filters for this relation. If there are, then
927 : * prepare the necessary ExprState and cache it in entry->exprstate. To
928 : * build an expression state, we need to ensure the following:
929 : *
930 : * All the given publication-table mappings must be checked.
931 : *
932 : * Multiple publications might have multiple row filters for this
933 : * relation. Since row filter usage depends on the DML operation, there
934 : * are multiple lists (one for each operation) to which row filters will
935 : * be appended.
936 : *
937 : * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row filter
938 : * expression" so it takes precedence.
939 : */
940 630 : foreach(lc, publications)
941 : {
942 596 : Publication *pub = lfirst(lc);
943 596 : HeapTuple rftuple = NULL;
944 596 : Datum rfdatum = 0;
945 596 : bool pub_no_filter = true;
946 :
947 : /*
948 : * If the publication is FOR ALL TABLES, or the publication includes a
949 : * FOR TABLES IN SCHEMA where the table belongs to the referred
950 : * schema, then it is treated the same as if there are no row filters
951 : * (even if other publications have a row filter).
952 : */
953 596 : if (!pub->alltables &&
954 442 : !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
955 : ObjectIdGetDatum(schemaid),
956 : ObjectIdGetDatum(pub->oid)))
957 : {
958 : /*
959 : * Check for the presence of a row filter in this publication.
960 : */
961 430 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
962 : ObjectIdGetDatum(entry->publish_as_relid),
963 : ObjectIdGetDatum(pub->oid));
964 :
965 430 : if (HeapTupleIsValid(rftuple))
966 : {
967 : /* Null indicates no filter. */
968 406 : rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
969 : Anum_pg_publication_rel_prqual,
970 : &pub_no_filter);
971 : }
972 : }
973 :
974 596 : if (pub_no_filter)
975 : {
976 568 : if (rftuple)
977 378 : ReleaseSysCache(rftuple);
978 :
979 568 : no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
980 568 : no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
981 568 : no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
982 :
983 : /*
 984 : * Quick exit if all the DML actions are published via this
985 : * publication.
986 : */
987 568 : if (no_filter[PUBACTION_INSERT] &&
988 568 : no_filter[PUBACTION_UPDATE] &&
989 554 : no_filter[PUBACTION_DELETE])
990 : {
991 554 : has_filter = false;
992 554 : break;
993 : }
994 :
995 : /* No additional work for this publication. Next one. */
996 14 : continue;
997 : }
998 :
999 : /* Form the per pubaction row filter lists. */
1000 28 : if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
1001 28 : rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
1002 28 : TextDatumGetCString(rfdatum));
1003 28 : if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
1004 28 : rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
1005 28 : TextDatumGetCString(rfdatum));
1006 28 : if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
1007 28 : rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
1008 28 : TextDatumGetCString(rfdatum));
1009 :
1010 28 : ReleaseSysCache(rftuple);
1011 : } /* loop all subscribed publications */
1012 :
1013 : /* Clean the row filter */
1014 2352 : for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
1015 : {
1016 1764 : if (no_filter[idx])
1017 : {
1018 1680 : list_free_deep(rfnodes[idx]);
1019 1680 : rfnodes[idx] = NIL;
1020 : }
1021 : }
1022 :
1023 588 : if (has_filter)
1024 : {
1025 34 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1026 :
1027 34 : pgoutput_ensure_entry_cxt(data, entry);
1028 :
1029 : /*
1030 : * Now all the filters for all pubactions are known. Combine them when
1031 : * their pubactions are the same.
1032 : */
1033 34 : oldctx = MemoryContextSwitchTo(entry->entry_cxt);
1034 34 : entry->estate = create_estate_for_relation(relation);
1035 136 : for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
1036 : {
1037 102 : List *filters = NIL;
1038 : Expr *rfnode;
1039 :
1040 102 : if (rfnodes[idx] == NIL)
1041 42 : continue;
1042 :
1043 126 : foreach(lc, rfnodes[idx])
1044 66 : filters = lappend(filters, expand_generated_columns_in_expr(stringToNode((char *) lfirst(lc)), relation, 1));
1045 :
1046 : /* combine the row filter and cache the ExprState */
1047 60 : rfnode = make_orclause(filters);
1048 60 : entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
1049 : } /* for each pubaction */
1050 34 : MemoryContextSwitchTo(oldctx);
1051 :
1052 34 : RelationClose(relation);
1053 : }
1054 588 : }
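/*
 * Illustration (made-up publications): if "pub_a" publishes the table with
 * WHERE (a > 10) and "pub_b" with WHERE (b = 0), and both publish inserts,
 * the two quals end up in rfnodes[PUBACTION_INSERT] and are OR-ed together
 * by make_orclause(), so a row satisfying either filter is replicated.  If
 * a third publication covered the table without any filter (or via FOR ALL
 * TABLES), no_filter[] would be set for its pubactions and the collected
 * filters for those actions would be dropped entirely.
 */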
1055 :
1056 : /*
1057 : * If the table contains a generated column, check for any conflicting
1058 : * values of 'publish_generated_columns' parameter in the publications.
1059 : */
1060 : static void
1061 588 : check_and_init_gencol(PGOutputData *data, List *publications,
1062 : RelationSyncEntry *entry)
1063 : {
1064 588 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1065 588 : TupleDesc desc = RelationGetDescr(relation);
1066 588 : bool gencolpresent = false;
1067 588 : bool first = true;
1068 :
1069 : /* Check if there is any generated column present. */
1070 1786 : for (int i = 0; i < desc->natts; i++)
1071 : {
1072 1212 : Form_pg_attribute att = TupleDescAttr(desc, i);
1073 :
1074 1212 : if (att->attgenerated)
1075 : {
1076 14 : gencolpresent = true;
1077 14 : break;
1078 : }
1079 : }
1080 :
1081 : /* There are no generated columns to be published. */
1082 588 : if (!gencolpresent)
1083 : {
1084 574 : entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
1085 574 : return;
1086 : }
1087 :
1088 : /*
1089 : * There may be a conflicting value for 'publish_generated_columns'
1090 : * parameter in the publications.
1091 : */
1092 44 : foreach_ptr(Publication, pub, publications)
1093 : {
1094 : /*
1095 : * The column list takes precedence over the
1096 : * 'publish_generated_columns' parameter. Those will be checked later,
1097 : * see pgoutput_column_list_init.
1098 : */
1099 16 : if (check_and_fetch_column_list(pub, entry->publish_as_relid, NULL, NULL))
1100 6 : continue;
1101 :
1102 10 : if (first)
1103 : {
1104 10 : entry->include_gencols_type = pub->pubgencols_type;
1105 10 : first = false;
1106 : }
1107 0 : else if (entry->include_gencols_type != pub->pubgencols_type)
1108 0 : ereport(ERROR,
1109 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1110 : errmsg("cannot use different values of publish_generated_columns for table \"%s.%s\" in different publications",
1111 : get_namespace_name(RelationGetNamespace(relation)),
1112 : RelationGetRelationName(relation)));
1113 : }
1114 : }
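/*
 * Example (made-up publications): if a table with a stored generated column
 * is part of "pub_s" (publish_generated_columns = stored) and also of
 * "pub_n" (the default, none), and neither publication attaches a column
 * list to the table, the loop above detects the mismatch and raises the
 * "cannot use different values of publish_generated_columns" error.
 */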
1115 :
1116 : /*
1117 : * Initialize the column list.
1118 : */
1119 : static void
1120 588 : pgoutput_column_list_init(PGOutputData *data, List *publications,
1121 : RelationSyncEntry *entry)
1122 : {
1123 : ListCell *lc;
1124 588 : bool first = true;
1125 588 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1126 588 : bool found_pub_collist = false;
1127 588 : Bitmapset *relcols = NULL;
1128 :
1129 588 : pgoutput_ensure_entry_cxt(data, entry);
1130 :
1131 : /*
1132 : * Find if there are any column lists for this relation. If there are,
1133 : * build a bitmap using the column lists.
1134 : *
1135 : * Multiple publications might have multiple column lists for this
1136 : * relation.
1137 : *
1138 : * Note that we don't support the case where the column list is different
1139 : * for the same table when combining publications. See comments atop
1140 : * fetch_table_list. But one can later change the publication so we still
1141 : * need to check all the given publication-table mappings and report an
1142 : * error if any publications have a different column list.
1143 : */
1144 1196 : foreach(lc, publications)
1145 : {
1146 610 : Publication *pub = lfirst(lc);
1147 610 : Bitmapset *cols = NULL;
1148 :
1149 : /* Retrieve the bitmap of columns for a column list publication. */
1150 610 : found_pub_collist |= check_and_fetch_column_list(pub,
1151 : entry->publish_as_relid,
1152 : entry->entry_cxt, &cols);
1153 :
1154 : /*
 1155 : * For non-column-list publications (e.g. TABLE without a column
 1156 : * list, ALL TABLES, or ALL TABLES IN SCHEMA), we consider all columns
 1157 : * of the table (including generated columns when the
 1158 : * 'publish_generated_columns' parameter is true).
1159 : */
1160 610 : if (!cols)
1161 : {
1162 : /*
1163 : * Cache the table columns for the first publication with no
1164 : * specified column list to detect publication with a different
1165 : * column list.
1166 : */
1167 532 : if (!relcols && (list_length(publications) > 1))
1168 : {
1169 18 : MemoryContext oldcxt = MemoryContextSwitchTo(entry->entry_cxt);
1170 :
1171 18 : relcols = pub_form_cols_map(relation,
1172 : entry->include_gencols_type);
1173 18 : MemoryContextSwitchTo(oldcxt);
1174 : }
1175 :
1176 532 : cols = relcols;
1177 : }
1178 :
1179 610 : if (first)
1180 : {
1181 588 : entry->columns = cols;
1182 588 : first = false;
1183 : }
1184 22 : else if (!bms_equal(entry->columns, cols))
1185 2 : ereport(ERROR,
1186 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1187 : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
1188 : get_namespace_name(RelationGetNamespace(relation)),
1189 : RelationGetRelationName(relation)));
1190 : } /* loop all subscribed publications */
1191 :
1192 : /*
1193 : * If no column list publications exist, columns to be published will be
1194 : * computed later according to the 'publish_generated_columns' parameter.
1195 : */
1196 586 : if (!found_pub_collist)
1197 514 : entry->columns = NULL;
1198 :
1199 586 : RelationClose(relation);
1200 586 : }
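/*
 * Illustration (made-up publications): if "pub_x" publishes t (a, b) and
 * "pub_y" publishes t (a, c), combining them in one subscription makes the
 * bms_equal() check above fail and raises the "cannot use different column
 * lists" error.  Publishing t without a column list in one publication and
 * t (a, b) in another fails the same way, because the implicit all-columns
 * bitmap differs from {a, b}.
 */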
1201 :
1202 : /*
1203 : * Initialize the slot for storing new and old tuples, and build the map that
1204 : * will be used to convert the relation's tuples into the ancestor's format.
1205 : */
1206 : static void
1207 588 : init_tuple_slot(PGOutputData *data, Relation relation,
1208 : RelationSyncEntry *entry)
1209 : {
1210 : MemoryContext oldctx;
1211 : TupleDesc oldtupdesc;
1212 : TupleDesc newtupdesc;
1213 :
1214 588 : oldctx = MemoryContextSwitchTo(data->cachectx);
1215 :
1216 : /*
1217 : * Create tuple table slots. Create a copy of the TupleDesc as it needs to
1218 : * live as long as the cache remains.
1219 : */
1220 588 : oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1221 588 : newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1222 :
1223 588 : entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
1224 588 : entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
1225 :
1226 588 : MemoryContextSwitchTo(oldctx);
1227 :
1228 : /*
1229 : * Cache the map that will be used to convert the relation's tuples into
1230 : * the ancestor's format, if needed.
1231 : */
1232 588 : if (entry->publish_as_relid != RelationGetRelid(relation))
1233 : {
1234 70 : Relation ancestor = RelationIdGetRelation(entry->publish_as_relid);
1235 70 : TupleDesc indesc = RelationGetDescr(relation);
1236 70 : TupleDesc outdesc = RelationGetDescr(ancestor);
1237 :
1238 : /* Map must live as long as the logical decoding context. */
1239 70 : oldctx = MemoryContextSwitchTo(data->cachectx);
1240 :
1241 70 : entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
1242 :
1243 70 : MemoryContextSwitchTo(oldctx);
1244 70 : RelationClose(ancestor);
1245 : }
1246 588 : }
1247 :
1248 : /*
1249 : * Change is checked against the row filter if any.
1250 : *
1251 : * Returns true if the change is to be replicated, else false.
1252 : *
1253 : * For inserts, evaluate the row filter for new tuple.
1254 : * For deletes, evaluate the row filter for old tuple.
1255 : * For updates, evaluate the row filter for old and new tuple.
1256 : *
 1257 : * For updates, if both evaluations are true, we send the UPDATE; if both
 1258 : * evaluations are false, we don't replicate the UPDATE. If only one of the
 1259 : * tuples matches the row filter expression, we transform the UPDATE into a
 1260 : * DELETE or INSERT to avoid any data inconsistency, based on the
1261 : * following rules:
1262 : *
1263 : * Case 1: old-row (no match) new-row (no match) -> (drop change)
1264 : * Case 2: old-row (no match) new row (match) -> INSERT
1265 : * Case 3: old-row (match) new-row (no match) -> DELETE
1266 : * Case 4: old-row (match) new row (match) -> UPDATE
1267 : *
1268 : * The new action is updated in the action parameter.
1269 : *
1270 : * The new slot could be updated when transforming the UPDATE into INSERT,
1271 : * because the original new tuple might not have column values from the replica
1272 : * identity.
1273 : *
1274 : * Examples:
1275 : * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
1276 : * Since the old tuple satisfies, the initial table synchronization copied this
1277 : * row (or another method was used to guarantee that there is data
1278 : * consistency). However, after the UPDATE the new tuple doesn't satisfy the
1279 : * row filter, so from a data consistency perspective, that row should be
1280 : * removed on the subscriber. The UPDATE should be transformed into a DELETE
1281 : * statement and be sent to the subscriber. Keeping this row on the subscriber
1282 : * is undesirable because it doesn't reflect what was defined in the row filter
1283 : * expression on the publisher. This row on the subscriber would likely not be
1284 : * modified by replication again. If someone inserted a new row with the same
1285 : * old identifier, replication could stop due to a constraint violation.
1286 : *
1287 : * Let's say the old tuple doesn't match the row filter but the new tuple does.
1288 : * Since the old tuple doesn't satisfy, the initial table synchronization
1289 : * probably didn't copy this row. However, after the UPDATE the new tuple does
1290 : * satisfy the row filter, so from a data consistency perspective, that row
1291 : * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
1292 : * statements have no effect (it matches no row -- see
1293 : * apply_handle_update_internal()). So, the UPDATE should be transformed into a
1294 : * INSERT statement and be sent to the subscriber. However, this might surprise
1295 : * someone who expects the data set to satisfy the row filter expression on the
1296 : * provider.
1297 : */
1298 : static bool
1299 364454 : pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
1300 : TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
1301 : ReorderBufferChangeType *action)
1302 : {
1303 : TupleDesc desc;
1304 : int i;
1305 : bool old_matched,
1306 : new_matched,
1307 : result;
1308 : TupleTableSlot *tmp_new_slot;
1309 364454 : TupleTableSlot *new_slot = *new_slot_ptr;
1310 : ExprContext *ecxt;
1311 : ExprState *filter_exprstate;
1312 :
1313 : /*
1314 : * We need this map to avoid relying on ReorderBufferChangeType enums
1315 : * having specific values.
1316 : */
1317 : static const int map_changetype_pubaction[] = {
1318 : [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
1319 : [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
1320 : [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
1321 : };
1322 :
1323 : Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
1324 : *action == REORDER_BUFFER_CHANGE_UPDATE ||
1325 : *action == REORDER_BUFFER_CHANGE_DELETE);
1326 :
1327 : Assert(new_slot || old_slot);
1328 :
1329 : /* Get the corresponding row filter */
1330 364454 : filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
1331 :
1332 : /* Bail out if there is no row filter */
1333 364454 : if (!filter_exprstate)
1334 364386 : return true;
1335 :
1336 68 : elog(DEBUG3, "table \"%s.%s\" has row filter",
1337 : get_namespace_name(RelationGetNamespace(relation)),
1338 : RelationGetRelationName(relation));
1339 :
1340 68 : ResetPerTupleExprContext(entry->estate);
1341 :
1342 68 : ecxt = GetPerTupleExprContext(entry->estate);
1343 :
1344 : /*
1345 : * For the following occasions where there is only one tuple, we can
1346 : * evaluate the row filter for that tuple and return.
1347 : *
1348 : * For inserts, we only have the new tuple.
1349 : *
1350 : * For updates, we can have only a new tuple when none of the replica
1351 : * identity columns changed and none of those columns have external data
1352 : * but we still need to evaluate the row filter for the new tuple as the
1353 : * existing values of those columns might not match the filter. Also,
1354 : * users can use constant expressions in the row filter, so we anyway need
1355 : * to evaluate it for the new tuple.
1356 : *
1357 : * For deletes, we only have the old tuple.
1358 : */
1359 68 : if (!new_slot || !old_slot)
1360 : {
1361 60 : ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
1362 60 : result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1363 :
1364 60 : return result;
1365 : }
1366 :
1367 : /*
1368 : * Both the old and new tuples must be valid only for updates and need to
1369 : * be checked against the row filter.
1370 : */
1371 : Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
1372 :
1373 8 : slot_getallattrs(new_slot);
1374 8 : slot_getallattrs(old_slot);
1375 :
1376 8 : tmp_new_slot = NULL;
1377 8 : desc = RelationGetDescr(relation);
1378 :
1379 : /*
1380 : * The new tuple might not have all the replica identity columns, in which
1381 : * case it needs to be copied over from the old tuple.
1382 : */
1383 24 : for (i = 0; i < desc->natts; i++)
1384 : {
1385 16 : CompactAttribute *att = TupleDescCompactAttr(desc, i);
1386 :
1387 : /*
1388 : * if the column in the new tuple or old tuple is null, nothing to do
1389 : */
1390 16 : if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
1391 2 : continue;
1392 :
1393 : /*
1394 : * Unchanged toasted replica identity columns are only logged in the
1395 : * old tuple. Copy this over to the new tuple. The changed (or WAL
1396 : * Logged) toast values are always assembled in memory and set as
1397 : * VARTAG_INDIRECT. See ReorderBufferToastReplace.
1398 : */
1399 22 : if (att->attlen == -1 &&
1400 8 : VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(new_slot->tts_values[i])) &&
1401 2 : !VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(old_slot->tts_values[i])))
1402 : {
1403 2 : if (!tmp_new_slot)
1404 : {
1405 2 : tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
1406 2 : ExecClearTuple(tmp_new_slot);
1407 :
1408 2 : memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
1409 2 : desc->natts * sizeof(Datum));
1410 2 : memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
1411 2 : desc->natts * sizeof(bool));
1412 : }
1413 :
1414 2 : tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
1415 2 : tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
1416 : }
1417 : }
1418 :
1419 8 : ecxt->ecxt_scantuple = old_slot;
1420 8 : old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1421 :
1422 8 : if (tmp_new_slot)
1423 : {
1424 2 : ExecStoreVirtualTuple(tmp_new_slot);
1425 2 : ecxt->ecxt_scantuple = tmp_new_slot;
1426 : }
1427 : else
1428 6 : ecxt->ecxt_scantuple = new_slot;
1429 :
1430 8 : new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1431 :
1432 : /*
1433 : * Case 1: if both tuples don't match the row filter, bailout. Send
1434 : * nothing.
1435 : */
1436 8 : if (!old_matched && !new_matched)
1437 0 : return false;
1438 :
1439 : /*
1440 : * Case 2: if the old tuple doesn't satisfy the row filter but the new
1441 : * tuple does, transform the UPDATE into INSERT.
1442 : *
 1443 : * Use the newly transformed tuple, which must contain the column values for
 1444 : * all the replica identity columns. This is required to ensure that, while
 1445 : * inserting the tuple in the downstream node, we have all the
1446 : * required column values.
1447 : */
1448 8 : if (!old_matched && new_matched)
1449 : {
1450 4 : *action = REORDER_BUFFER_CHANGE_INSERT;
1451 :
1452 4 : if (tmp_new_slot)
1453 2 : *new_slot_ptr = tmp_new_slot;
1454 : }
1455 :
1456 : /*
1457 : * Case 3: if the old tuple satisfies the row filter but the new tuple
1458 : * doesn't, transform the UPDATE into DELETE.
1459 : *
1460 : * This transformation does not require another tuple. The Old tuple will
1461 : * be used for DELETE.
1462 : */
1463 4 : else if (old_matched && !new_matched)
1464 2 : *action = REORDER_BUFFER_CHANGE_DELETE;
1465 :
1466 : /*
1467 : * Case 4: if both tuples match the row filter, transformation isn't
1468 : * required. (*action is default UPDATE).
1469 : */
1470 :
1471 8 : return true;
1472 : }
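/*
 * Worked example (hypothetical row filter WHERE (price > 100), not taken
 * from the source): an UPDATE changing price from 150 to 50 has an old row
 * that matches and a new row that does not, i.e. Case 3 above, so *action
 * becomes REORDER_BUFFER_CHANGE_DELETE and the subscriber removes the row.
 * An UPDATE changing price from 50 to 150 is Case 2, sent as an INSERT
 * built from the (possibly completed) new tuple.
 */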
1473 :
1474 : /*
1475 : * Sends the decoded DML over wire.
1476 : *
1477 : * This is called both in streaming and non-streaming modes.
1478 : */
1479 : static void
1480 366760 : pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1481 : Relation relation, ReorderBufferChange *change)
1482 : {
1483 366760 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1484 366760 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1485 : MemoryContext old;
1486 : RelationSyncEntry *relentry;
1487 366760 : TransactionId xid = InvalidTransactionId;
1488 366760 : Relation ancestor = NULL;
1489 366760 : Relation targetrel = relation;
1490 366760 : ReorderBufferChangeType action = change->action;
1491 366760 : TupleTableSlot *old_slot = NULL;
1492 366760 : TupleTableSlot *new_slot = NULL;
1493 :
1494 366760 : if (!is_publishable_relation(relation))
1495 2304 : return;
1496 :
1497 : /*
1498 : * Remember the xid for the change in streaming mode. We need to send xid
1499 : * with each change in the streaming mode so that subscriber can make
1500 : * their association and on aborts, it can discard the corresponding
1501 : * changes.
1502 : */
1503 366760 : if (data->in_streaming)
1504 351842 : xid = change->txn->xid;
1505 :
1506 366760 : relentry = get_rel_sync_entry(data, relation);
1507 :
1508 : /* First check the table filter */
1509 366758 : switch (action)
1510 : {
1511 211946 : case REORDER_BUFFER_CHANGE_INSERT:
1512 211946 : if (!relentry->pubactions.pubinsert)
1513 132 : return;
1514 211814 : break;
1515 68970 : case REORDER_BUFFER_CHANGE_UPDATE:
1516 68970 : if (!relentry->pubactions.pubupdate)
1517 88 : return;
1518 68882 : break;
1519 85842 : case REORDER_BUFFER_CHANGE_DELETE:
1520 85842 : if (!relentry->pubactions.pubdelete)
1521 2084 : return;
1522 :
1523 : /*
1524 : * This is only possible if deletes are allowed even when replica
1525 : * identity is not defined for a table. Since the DELETE action
1526 : * can't be published, we simply return.
1527 : */
1528 83758 : if (!change->data.tp.oldtuple)
1529 : {
1530 0 : elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
1531 0 : return;
1532 : }
1533 83758 : break;
1534 364454 : default:
1535 : Assert(false);
1536 : }
1537 :
1538 : /* Avoid leaking memory by using and resetting our own context */
1539 364454 : old = MemoryContextSwitchTo(data->context);
1540 :
1541 : /* Switch relation if publishing via root. */
1542 364454 : if (relentry->publish_as_relid != RelationGetRelid(relation))
1543 : {
1544 : Assert(relation->rd_rel->relispartition);
1545 136 : ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1546 136 : targetrel = ancestor;
1547 : }
1548 :
1549 364454 : if (change->data.tp.oldtuple)
1550 : {
1551 84026 : old_slot = relentry->old_slot;
1552 84026 : ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false);
1553 :
1554 : /* Convert tuple if needed. */
1555 84026 : if (relentry->attrmap)
1556 : {
1557 10 : TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
1558 : &TTSOpsVirtual);
1559 :
1560 10 : old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
1561 : }
1562 : }
1563 :
1564 364454 : if (change->data.tp.newtuple)
1565 : {
1566 280696 : new_slot = relentry->new_slot;
1567 280696 : ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false);
1568 :
1569 : /* Convert tuple if needed. */
1570 280696 : if (relentry->attrmap)
1571 : {
1572 38 : TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
1573 : &TTSOpsVirtual);
1574 :
1575 38 : new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
1576 : }
1577 : }
1578 :
1579 : /*
1580 : * Check row filter.
1581 : *
1582 : * Updates could be transformed into inserts or deletes based on the
1583 : * results of the row filter for the old and new tuples.
1584 : */
1585 364454 : if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
1586 24 : goto cleanup;
1587 :
1588 : /*
1589 : * Send BEGIN if we haven't yet.
1590 : *
1591 : * We send the BEGIN message after ensuring that we will actually send the
1592 : * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
1593 : * transactions.
1594 : */
1595 364430 : if (txndata && !txndata->sent_begin_txn)
1596 856 : pgoutput_send_begin(ctx, txn);
1597 :
1598 : /*
1599 : * The schema should be sent using the original relation because
1600 : * maybe_send_schema() also sends the ancestor's schema when needed.
1601 : */
1602 364428 : maybe_send_schema(ctx, change, relation, relentry);
1603 :
1604 364428 : OutputPluginPrepareWrite(ctx, true);
1605 :
1606 : /* Send the data */
1607 364428 : switch (action)
1608 : {
1609 211792 : case REORDER_BUFFER_CHANGE_INSERT:
1610 211792 : logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
1611 211792 : data->binary, relentry->columns,
1612 : relentry->include_gencols_type);
1613 211792 : break;
1614 68876 : case REORDER_BUFFER_CHANGE_UPDATE:
1615 68876 : logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
1616 68876 : new_slot, data->binary, relentry->columns,
1617 : relentry->include_gencols_type);
1618 68876 : break;
1619 83760 : case REORDER_BUFFER_CHANGE_DELETE:
1620 83760 : logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
1621 83760 : data->binary, relentry->columns,
1622 : relentry->include_gencols_type);
1623 83760 : break;
1624 364428 : default:
1625 : Assert(false);
1626 : }
1627 :
1628 364428 : OutputPluginWrite(ctx, true);
1629 :
1630 364450 : cleanup:
1631 364450 : if (RelationIsValid(ancestor))
1632 : {
1633 134 : RelationClose(ancestor);
1634 134 : ancestor = NULL;
1635 : }
1636 :
1637 : /* Drop the new slots that were used to store the converted tuples. */
1638 364450 : if (relentry->attrmap)
1639 : {
1640 48 : if (old_slot)
1641 10 : ExecDropSingleTupleTableSlot(old_slot);
1642 :
1643 48 : if (new_slot)
1644 38 : ExecDropSingleTupleTableSlot(new_slot);
1645 : }
1646 :
1647 364450 : MemoryContextSwitchTo(old);
1648 364450 : MemoryContextReset(data->context);
1649 : }
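/*
 * A sketch of the deferred-BEGIN pattern used by pgoutput_change() and the
 * other change callbacks, assuming hypothetical names that are not part of
 * pgoutput.c.  BEGIN is emitted lazily, just before the first change that is
 * actually published, so a transaction whose changes are all filtered out
 * produces no BEGIN/COMMIT pair at all.
 */
#include <stdbool.h>
#include <stdio.h>

typedef struct
{
	bool		sent_begin;		/* has BEGIN been emitted for this txn? */
} SketchTxnState;

static void
sketch_emit_change(SketchTxnState *txn, const char *change)
{
	if (!txn->sent_begin)
	{
		puts("BEGIN");
		txn->sent_begin = true;
	}
	printf("CHANGE: %s\n", change);
}

static void
sketch_emit_commit(SketchTxnState *txn)
{
	/* an empty transaction produces no output at all */
	if (txn->sent_begin)
		puts("COMMIT");
}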
1650 :
1651 : static void
1652 30 : pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1653 : int nrelations, Relation relations[], ReorderBufferChange *change)
1654 : {
1655 30 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1656 30 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1657 : MemoryContext old;
1658 : RelationSyncEntry *relentry;
1659 : int i;
1660 : int nrelids;
1661 : Oid *relids;
1662 30 : TransactionId xid = InvalidTransactionId;
1663 :
1664 : /* Remember the xid for the change in streaming mode. See pgoutput_change. */
1665 30 : if (data->in_streaming)
1666 0 : xid = change->txn->xid;
1667 :
1668 30 : old = MemoryContextSwitchTo(data->context);
1669 :
1670 30 : relids = palloc0(nrelations * sizeof(Oid));
1671 30 : nrelids = 0;
1672 :
1673 98 : for (i = 0; i < nrelations; i++)
1674 : {
1675 68 : Relation relation = relations[i];
1676 68 : Oid relid = RelationGetRelid(relation);
1677 :
1678 68 : if (!is_publishable_relation(relation))
1679 0 : continue;
1680 :
1681 68 : relentry = get_rel_sync_entry(data, relation);
1682 :
1683 68 : if (!relentry->pubactions.pubtruncate)
1684 28 : continue;
1685 :
1686 : /*
1687 : * Don't send partitions if the publication wants to send only the
1688 : * root tables through it.
1689 : */
1690 40 : if (relation->rd_rel->relispartition &&
1691 30 : relentry->publish_as_relid != relid)
1692 6 : continue;
1693 :
1694 34 : relids[nrelids++] = relid;
1695 :
1696 : /* Send BEGIN if we haven't yet */
1697 34 : if (txndata && !txndata->sent_begin_txn)
1698 22 : pgoutput_send_begin(ctx, txn);
1699 :
1700 34 : maybe_send_schema(ctx, change, relation, relentry);
1701 : }
1702 :
1703 30 : if (nrelids > 0)
1704 : {
1705 22 : OutputPluginPrepareWrite(ctx, true);
1706 22 : logicalrep_write_truncate(ctx->out,
1707 : xid,
1708 : nrelids,
1709 : relids,
1710 22 : change->data.truncate.cascade,
1711 22 : change->data.truncate.restart_seqs);
1712 22 : OutputPluginWrite(ctx, true);
1713 : }
1714 :
1715 30 : MemoryContextSwitchTo(old);
1716 30 : MemoryContextReset(data->context);
1717 30 : }
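/*
 * A sketch of the partition check in the truncate loop above, assuming a
 * hypothetical function name that is not part of pgoutput.c.  When a
 * publication routes changes through the root table, a partition's own OID
 * is skipped because publish_as_relid already points at the published
 * ancestor.
 */
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t SketchOid;

static bool
sketch_should_send_truncate_for(bool is_partition, SketchOid relid,
								SketchOid publish_as_relid)
{
	return !(is_partition && publish_as_relid != relid);
}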
1718 :
1719 : static void
1720 14 : pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1721 : XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
1722 : const char *message)
1723 : {
1724 14 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1725 14 : TransactionId xid = InvalidTransactionId;
1726 :
1727 14 : if (!data->messages)
1728 4 : return;
1729 :
1730 : /*
1731 : * Remember the xid for the message in streaming mode. See
1732 : * pgoutput_change.
1733 : */
1734 10 : if (data->in_streaming)
1735 0 : xid = txn->xid;
1736 :
1737 : /*
1738 : * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
1739 : */
1740 10 : if (transactional)
1741 : {
1742 4 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1743 :
1744 : /* Send BEGIN if we haven't yet */
1745 4 : if (txndata && !txndata->sent_begin_txn)
1746 4 : pgoutput_send_begin(ctx, txn);
1747 : }
1748 :
1749 10 : OutputPluginPrepareWrite(ctx, true);
1750 10 : logicalrep_write_message(ctx->out,
1751 : xid,
1752 : message_lsn,
1753 : transactional,
1754 : prefix,
1755 : sz,
1756 : message);
1757 10 : OutputPluginWrite(ctx, true);
1758 : }
1759 :
1760 : /*
1761 : * Return true if the change is associated with an origin and the user has
1762 : * requested only changes that have no origin, false otherwise.
1763 : */
1764 : static bool
1765 983050 : pgoutput_origin_filter(LogicalDecodingContext *ctx,
1766 : RepOriginId origin_id)
1767 : {
1768 983050 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1769 :
1770 983050 : if (data->publish_no_origin && origin_id != InvalidRepOriginId)
1771 296 : return true;
1772 :
1773 982754 : return false;
1774 : }
1775 :
1776 : /*
1777 : * Shutdown the output plugin.
1778 : *
1779 : * Note that we don't need to clean up data->context, data->cachectx, and
1780 : * data->pubctx, as they are child contexts of ctx->context and so will be
1781 : * cleaned up by the logical decoding machinery.
1782 : */
1783 : static void
1784 1034 : pgoutput_shutdown(LogicalDecodingContext *ctx)
1785 : {
1786 1034 : pgoutput_memory_context_reset(NULL);
1787 1034 : }
1788 :
1789 : /*
1790 : * Load publications from the list of publication names.
1791 : *
1792 : * Here, we skip the publications that don't exist yet. This allows
1793 : * replication to continue silently even when a specified publication is
1794 : * missing, which is required because users are allowed to create a
1795 : * publication after having specified it at the time replication started.
1796 : */
1797 : static List *
1798 376 : LoadPublications(List *pubnames)
1799 : {
1800 376 : List *result = NIL;
1801 : ListCell *lc;
1802 :
1803 842 : foreach(lc, pubnames)
1804 : {
1805 466 : char *pubname = (char *) lfirst(lc);
1806 466 : Publication *pub = GetPublicationByName(pubname, true);
1807 :
1808 466 : if (pub)
1809 462 : result = lappend(result, pub);
1810 : else
1811 4 : ereport(WARNING,
1812 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1813 : errmsg("skipped loading publication \"%s\"", pubname),
1814 : errdetail("The publication does not exist at this point in the WAL."),
1815 : errhint("Create the publication if it does not exist."));
1816 : }
1817 :
1818 376 : return result;
1819 : }
1820 :
1821 : /*
1822 : * Publication syscache invalidation callback.
1823 : *
1824 : * Called for invalidations on pg_publication.
1825 : */
1826 : static void
1827 616 : publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
1828 : {
1829 616 : publications_valid = false;
1830 616 : }
1831 :
1832 : /*
1833 : * START STREAM callback
1834 : */
1835 : static void
1836 1234 : pgoutput_stream_start(struct LogicalDecodingContext *ctx,
1837 : ReorderBufferTXN *txn)
1838 : {
1839 1234 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1840 1234 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
1841 :
1842 : /* we can't nest streaming of transactions */
1843 : Assert(!data->in_streaming);
1844 :
1845 : /*
1846 : * If we already sent the first stream for this transaction then don't
1847 : * send the origin id in the subsequent streams.
1848 : */
1849 1234 : if (rbtxn_is_streamed(txn))
1850 1116 : send_replication_origin = false;
1851 :
1852 1234 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
1853 1234 : logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
1854 :
1855 1234 : send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
1856 : send_replication_origin);
1857 :
1858 1234 : OutputPluginWrite(ctx, true);
1859 :
1860 : /* we're streaming a chunk of transaction now */
1861 1234 : data->in_streaming = true;
1862 1234 : }
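/*
 * A sketch of the origin handling above, assuming a hypothetical function
 * name that is not part of pgoutput.c.  The replication origin is attached
 * only to the first stream chunk of a transaction; once the transaction is
 * marked as streamed, later chunks skip it because the subscriber has
 * already made the association.
 */
#include <stdbool.h>
#include <stdio.h>

static void
sketch_stream_start(bool already_streamed, bool txn_has_origin)
{
	bool		send_origin = txn_has_origin && !already_streamed;

	puts("STREAM START");
	if (send_origin)
		puts("ORIGIN");
}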
1863 :
1864 : /*
1865 : * STOP STREAM callback
1866 : */
1867 : static void
1868 1234 : pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
1869 : ReorderBufferTXN *txn)
1870 : {
1871 1234 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1872 :
1873 : /* we should be streaming a transaction */
1874 : Assert(data->in_streaming);
1875 :
1876 1234 : OutputPluginPrepareWrite(ctx, true);
1877 1234 : logicalrep_write_stream_stop(ctx->out);
1878 1234 : OutputPluginWrite(ctx, true);
1879 :
1880 : /* we've stopped streaming a transaction */
1881 1234 : data->in_streaming = false;
1882 1234 : }
1883 :
1884 : /*
1885 : * Notify downstream to discard the streamed transaction (along with all
1886 : * its subtransactions, if it's a toplevel transaction).
1887 : */
1888 : static void
1889 52 : pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
1890 : ReorderBufferTXN *txn,
1891 : XLogRecPtr abort_lsn)
1892 : {
1893 : ReorderBufferTXN *toptxn;
1894 52 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1895 52 : bool write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
1896 :
1897 : /*
1898 : * The abort should happen outside a streaming block, even for streamed
1899 : * transactions. The transaction has to be marked as streamed, though.
1900 : */
1901 : Assert(!data->in_streaming);
1902 :
1903 : /* determine the toplevel transaction */
1904 52 : toptxn = rbtxn_get_toptxn(txn);
1905 :
1906 : Assert(rbtxn_is_streamed(toptxn));
1907 :
1908 52 : OutputPluginPrepareWrite(ctx, true);
1909 52 : logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
1910 : txn->abort_time, write_abort_info);
1911 :
1912 52 : OutputPluginWrite(ctx, true);
1913 :
1914 52 : cleanup_rel_sync_cache(toptxn->xid, false);
1915 52 : }
1916 :
1917 : /*
1918 : * Notify downstream to apply the streamed transaction (along with all
1919 : * its subtransactions).
1920 : */
1921 : static void
1922 90 : pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
1923 : ReorderBufferTXN *txn,
1924 : XLogRecPtr commit_lsn)
1925 : {
1926 90 : PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private;
1927 :
1928 : /*
1929 : * The commit should happen outside a streaming block, even for streamed
1930 : * transactions. The transaction has to be marked as streamed, though.
1931 : */
1932 : Assert(!data->in_streaming);
1933 : Assert(rbtxn_is_streamed(txn));
1934 :
1935 90 : OutputPluginUpdateProgress(ctx, false);
1936 :
1937 90 : OutputPluginPrepareWrite(ctx, true);
1938 90 : logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
1939 90 : OutputPluginWrite(ctx, true);
1940 :
1941 90 : cleanup_rel_sync_cache(txn->xid, true);
1942 90 : }
1943 :
1944 : /*
1945 : * PREPARE callback (for streaming two-phase commit).
1946 : *
1947 : * Notify the downstream to prepare the transaction.
1948 : */
1949 : static void
1950 22 : pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
1951 : ReorderBufferTXN *txn,
1952 : XLogRecPtr prepare_lsn)
1953 : {
1954 : Assert(rbtxn_is_streamed(txn));
1955 :
1956 22 : OutputPluginUpdateProgress(ctx, false);
1957 22 : OutputPluginPrepareWrite(ctx, true);
1958 22 : logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
1959 22 : OutputPluginWrite(ctx, true);
1960 22 : }
1961 :
1962 : /*
1963 : * Initialize the relation schema sync cache for a decoding session.
1964 : *
1965 : * The hash table is destroyed at the end of a decoding session. While the
1966 : * relcache invalidation callbacks remain registered and will still be
1967 : * invoked, they will just see the NULL hash table global and take no action.
1968 : */
1969 : static void
1970 792 : init_rel_sync_cache(MemoryContext cachectx)
1971 : {
1972 : HASHCTL ctl;
1973 : static bool relation_callbacks_registered = false;
1974 :
1975 : /* Nothing to do if hash table already exists */
1976 792 : if (RelationSyncCache != NULL)
1977 4 : return;
1978 :
1979 : /* Make a new hash table for the cache */
1980 792 : ctl.keysize = sizeof(Oid);
1981 792 : ctl.entrysize = sizeof(RelationSyncEntry);
1982 792 : ctl.hcxt = cachectx;
1983 :
1984 792 : RelationSyncCache = hash_create("logical replication output relation cache",
1985 : 128, &ctl,
1986 : HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
1987 :
1988 : Assert(RelationSyncCache != NULL);
1989 :
1990 : /* No more to do if we already registered callbacks */
1991 792 : if (relation_callbacks_registered)
1992 4 : return;
1993 :
1994 : /* We must update the cache entry for a relation after a relcache flush */
1995 788 : CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
1996 :
1997 : /*
1998 : * Flush all cache entries after a pg_namespace change, in case it was a
1999 : * schema rename affecting a relation being replicated.
2000 : *
2001 : * XXX: It is not a good idea to invalidate all the relation entries in
2002 : * RelationSyncCache on schema rename. We can optimize it to invalidate
2003 : * only the required relations by either having a specific invalidation
2004 : * message containing impacted relations or by having schema information
2005 : * in each RelationSyncCache entry and using hashvalue of pg_namespace.oid
2006 : * passed to the callback.
2007 : */
2008 788 : CacheRegisterSyscacheCallback(NAMESPACEOID,
2009 : rel_sync_cache_publication_cb,
2010 : (Datum) 0);
2011 :
2012 788 : relation_callbacks_registered = true;
2013 : }
2014 :
2015 : /*
2016 : * We expect a relatively small number of streamed transactions.
2017 : */
2018 : static bool
2019 351842 : get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
2020 : {
2021 351842 : return list_member_xid(entry->streamed_txns, xid);
2022 : }
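/*
 * A sketch of why a plain list lookup is enough here, assuming hypothetical
 * names that are not part of pgoutput.c.  Each cache entry only tracks the
 * handful of in-progress streamed transactions that have already received
 * its schema, so a linear membership test is simpler and cheaper than a
 * per-entry hash table.
 */
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t SketchXid;

static bool
sketch_xid_list_member(const SketchXid *xids, size_t nxids, SketchXid xid)
{
	for (size_t i = 0; i < nxids; i++)
	{
		if (xids[i] == xid)
			return true;
	}
	return false;
}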
2023 :
2024 : /*
2025 : * Add the xid to the rel sync entry to record that the relation's schema
2026 : * has already been sent within that streamed transaction.
2027 : */
2028 : static void
2029 134 : set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
2030 : {
2031 : MemoryContext oldctx;
2032 :
2033 134 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
2034 :
2035 134 : entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
2036 :
2037 134 : MemoryContextSwitchTo(oldctx);
2038 134 : }
2039 :
2040 : /*
2041 : * Find or create entry in the relation schema cache.
2042 : *
2043 : * This looks up publications that the given relation is directly or
2044 : * indirectly part of (the latter if it's really the relation's ancestor that
2045 : * is part of a publication) and fills in the found entry with information
2046 : * about which operations to publish and whether to use an ancestor's schema
2047 : * when publishing.
2048 : */
2049 : static RelationSyncEntry *
2050 366828 : get_rel_sync_entry(PGOutputData *data, Relation relation)
2051 : {
2052 : RelationSyncEntry *entry;
2053 : bool found;
2054 : MemoryContext oldctx;
2055 366828 : Oid relid = RelationGetRelid(relation);
2056 :
2057 : Assert(RelationSyncCache != NULL);
2058 :
2059 : /* Find cached relation info, creating if not found */
2060 366828 : entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
2061 : &relid,
2062 : HASH_ENTER, &found);
2063 : Assert(entry != NULL);
2064 :
2065 : /* initialize entry, if it's new */
2066 366828 : if (!found)
2067 : {
2068 562 : entry->replicate_valid = false;
2069 562 : entry->schema_sent = false;
2070 562 : entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
2071 562 : entry->streamed_txns = NIL;
2072 562 : entry->pubactions.pubinsert = entry->pubactions.pubupdate =
2073 562 : entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
2074 562 : entry->new_slot = NULL;
2075 562 : entry->old_slot = NULL;
2076 562 : memset(entry->exprstate, 0, sizeof(entry->exprstate));
2077 562 : entry->entry_cxt = NULL;
2078 562 : entry->publish_as_relid = InvalidOid;
2079 562 : entry->columns = NULL;
2080 562 : entry->attrmap = NULL;
2081 : }
2082 :
2083 : /* Validate the entry */
2084 366828 : if (!entry->replicate_valid)
2085 : {
2086 708 : Oid schemaId = get_rel_namespace(relid);
2087 708 : List *pubids = GetRelationPublications(relid);
2088 :
2089 : /*
2090 : * We don't acquire a lock on the namespace system table as we build
2091 : * the cache entry using a historic snapshot and all the later changes
2092 : * are absorbed while decoding WAL.
2093 : */
2094 708 : List *schemaPubids = GetSchemaPublications(schemaId);
2095 : ListCell *lc;
2096 708 : Oid publish_as_relid = relid;
2097 708 : int publish_ancestor_level = 0;
2098 708 : bool am_partition = get_rel_relispartition(relid);
2099 708 : char relkind = get_rel_relkind(relid);
2100 708 : List *rel_publications = NIL;
2101 :
2102 : /* Reload publications if needed before use. */
2103 708 : if (!publications_valid)
2104 : {
2105 376 : MemoryContextReset(data->pubctx);
2106 :
2107 376 : oldctx = MemoryContextSwitchTo(data->pubctx);
2108 376 : data->publications = LoadPublications(data->publication_names);
2109 376 : MemoryContextSwitchTo(oldctx);
2110 376 : publications_valid = true;
2111 : }
2112 :
2113 : /*
2114 : * Reset schema_sent status as the relation definition may have
2115 : * changed. Also reset pubactions to empty in case the relation was
2116 : * dropped from a publication, and free any objects that depended on
2117 : * the earlier definition.
2118 : */
2119 708 : entry->schema_sent = false;
2120 708 : entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
2121 708 : list_free(entry->streamed_txns);
2122 708 : entry->streamed_txns = NIL;
2123 708 : bms_free(entry->columns);
2124 708 : entry->columns = NULL;
2125 708 : entry->pubactions.pubinsert = false;
2126 708 : entry->pubactions.pubupdate = false;
2127 708 : entry->pubactions.pubdelete = false;
2128 708 : entry->pubactions.pubtruncate = false;
2129 :
2130 : /*
2131 : * Tuple slot cleanup. (The slots will be rebuilt later if needed.)
2132 : */
2133 708 : if (entry->old_slot)
2134 : {
2135 118 : TupleDesc desc = entry->old_slot->tts_tupleDescriptor;
2136 :
2137 : Assert(desc->tdrefcount == -1);
2138 :
2139 118 : ExecDropSingleTupleTableSlot(entry->old_slot);
2140 :
2141 : /*
2142 : * ExecDropSingleTupleTableSlot() does not free the TupleDesc, so
2143 : * do it now to avoid any leaks.
2144 : */
2145 118 : FreeTupleDesc(desc);
2146 : }
2147 708 : if (entry->new_slot)
2148 : {
2149 118 : TupleDesc desc = entry->new_slot->tts_tupleDescriptor;
2150 :
2151 : Assert(desc->tdrefcount == -1);
2152 :
2153 118 : ExecDropSingleTupleTableSlot(entry->new_slot);
2154 :
2155 : /*
2156 : * ExecDropSingleTupleTableSlot() does not free the TupleDesc, so
2157 : * do it now to avoid any leaks.
2158 : */
2159 118 : FreeTupleDesc(desc);
2160 : }
2161 :
2162 708 : entry->old_slot = NULL;
2163 708 : entry->new_slot = NULL;
2164 :
2165 708 : if (entry->attrmap)
2166 6 : free_attrmap(entry->attrmap);
2167 708 : entry->attrmap = NULL;
2168 :
2169 : /*
2170 : * Row filter cache cleanups.
2171 : */
2172 708 : if (entry->entry_cxt)
2173 118 : MemoryContextDelete(entry->entry_cxt);
2174 :
2175 708 : entry->entry_cxt = NULL;
2176 708 : entry->estate = NULL;
2177 708 : memset(entry->exprstate, 0, sizeof(entry->exprstate));
2178 :
2179 : /*
2180 : * Build publication cache. We can't use one provided by relcache as
2181 : * relcache considers all publications that the given relation is in,
2182 : * but here we only need to consider ones that the subscriber
2183 : * requested.
2184 : */
2185 1746 : foreach(lc, data->publications)
2186 : {
2187 1038 : Publication *pub = lfirst(lc);
2188 1038 : bool publish = false;
2189 :
2190 : /*
2191 : * Under what relid should we publish changes in this publication?
2192 : * We'll use the top-most relid across all publications. Also
2193 : * track the ancestor level for this publication.
2194 : */
2195 1038 : Oid pub_relid = relid;
2196 1038 : int ancestor_level = 0;
2197 :
2198 : /*
2199 : * If this is a FOR ALL TABLES publication, pick the partition
2200 : * root and set the ancestor level accordingly.
2201 : */
2202 1038 : if (pub->alltables)
2203 : {
2204 158 : publish = true;
2205 158 : if (pub->pubviaroot && am_partition)
2206 : {
2207 22 : List *ancestors = get_partition_ancestors(relid);
2208 :
2209 22 : pub_relid = llast_oid(ancestors);
2210 22 : ancestor_level = list_length(ancestors);
2211 : }
2212 : }
2213 :
2214 1038 : if (!publish)
2215 : {
2216 880 : bool ancestor_published = false;
2217 :
2218 : /*
2219 : * For a partition, check if any of the ancestors are
2220 : * published. If so, note down the topmost ancestor that is
2221 : * published via this publication, which will be used as the
2222 : * relation via which to publish the partition's changes.
2223 : */
2224 880 : if (am_partition)
2225 : {
2226 : Oid ancestor;
2227 : int level;
2228 242 : List *ancestors = get_partition_ancestors(relid);
2229 :
2230 242 : ancestor = GetTopMostAncestorInPublication(pub->oid,
2231 : ancestors,
2232 : &level);
2233 :
2234 242 : if (ancestor != InvalidOid)
2235 : {
2236 96 : ancestor_published = true;
2237 96 : if (pub->pubviaroot)
2238 : {
2239 50 : pub_relid = ancestor;
2240 50 : ancestor_level = level;
2241 : }
2242 : }
2243 : }
2244 :
2245 1364 : if (list_member_oid(pubids, pub->oid) ||
2246 956 : list_member_oid(schemaPubids, pub->oid) ||
2247 : ancestor_published)
2248 464 : publish = true;
2249 : }
2250 :
2251 : /*
2252 : * If the relation is to be published, determine actions to
2253 : * publish, and list of columns, if appropriate.
2254 : *
2255 : * Don't publish changes for a partitioned table, because
2256 : * publishing those of its partitions suffices, unless partition
2257 : * changes won't be published due to pubviaroot being set.
2258 : */
2259 1038 : if (publish &&
2260 8 : (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
2261 : {
2262 616 : entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
2263 616 : entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
2264 616 : entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
2265 616 : entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
2266 :
2267 : /*
2268 : * We want to publish the changes as the top-most ancestor
2269 : * across all publications. So we need to check if the already
2270 : * calculated level is higher than the new one. If yes, we can
2271 : * ignore the new value (as it's a child). Otherwise the new
2272 : * value is an ancestor, so we keep it.
2273 : */
2274 616 : if (publish_ancestor_level > ancestor_level)
2275 2 : continue;
2276 :
2277 : /*
2278 : * If we found an ancestor higher up in the tree, discard the
2279 : * list of publications through which we replicate it, and use
2280 : * the new ancestor.
2281 : */
2282 614 : if (publish_ancestor_level < ancestor_level)
2283 : {
2284 72 : publish_as_relid = pub_relid;
2285 72 : publish_ancestor_level = ancestor_level;
2286 :
2287 : /* reset the publication list for this relation */
2288 72 : rel_publications = NIL;
2289 : }
2290 : else
2291 : {
2292 : /* Same ancestor level, has to be the same OID. */
2293 : Assert(publish_as_relid == pub_relid);
2294 : }
2295 :
2296 : /* Track publications for this ancestor. */
2297 614 : rel_publications = lappend(rel_publications, pub);
2298 : }
2299 : }
2300 :
2301 708 : entry->publish_as_relid = publish_as_relid;
2302 :
2303 : /*
2304 : * Initialize the tuple slot, map, and row filter. These are only used
2305 : * when publishing inserts, updates, or deletes.
2306 : */
2307 708 : if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
2308 120 : entry->pubactions.pubdelete)
2309 : {
2310 : /* Initialize the tuple slot and map */
2311 588 : init_tuple_slot(data, relation, entry);
2312 :
2313 : /* Initialize the row filter */
2314 588 : pgoutput_row_filter_init(data, rel_publications, entry);
2315 :
2316 : /* Check whether to publish generated columns. */
2317 588 : check_and_init_gencol(data, rel_publications, entry);
2318 :
2319 : /* Initialize the column list */
2320 588 : pgoutput_column_list_init(data, rel_publications, entry);
2321 : }
2322 :
2323 706 : list_free(pubids);
2324 706 : list_free(schemaPubids);
2325 706 : list_free(rel_publications);
2326 :
2327 706 : entry->replicate_valid = true;
2328 : }
2329 :
2330 366826 : return entry;
2331 : }
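/*
 * A sketch of the "publish as the top-most ancestor" rule applied in the
 * publication loop above, assuming hypothetical struct and function names
 * that are not part of pgoutput.c.  Each publication proposes a (relid,
 * ancestor level) pair; the candidate with the highest level wins, falling
 * back to the relation itself when no ancestor is published via root.
 */
#include <stddef.h>
#include <stdint.h>

typedef uint32_t SketchOid;

typedef struct
{
	SketchOid	relid;			/* relation this publication would publish as */
	int			level;			/* 0 = the relation itself, >0 = an ancestor */
} SketchCandidate;

static SketchOid
sketch_pick_publish_as_relid(const SketchCandidate *cands, size_t ncands,
							 SketchOid fallback_relid)
{
	SketchOid	best = fallback_relid;
	int			best_level = 0;

	for (size_t i = 0; i < ncands; i++)
	{
		if (cands[i].level > best_level)
		{
			best = cands[i].relid;
			best_level = cands[i].level;
		}
	}
	return best;
}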
2332 :
2333 : /*
2334 : * Cleanup list of streamed transactions and update the schema_sent flag.
2335 : *
2336 : * When a streamed transaction commits or aborts, we need to remove the
2337 : * toplevel XID from the schema cache. If the transaction aborted, the
2338 : * subscriber will simply throw away the schema records we streamed, so
2339 : * we don't need to do anything else.
2340 : *
2341 : * If the transaction is committed, the subscriber will update the relation
2342 : * cache - so tweak the schema_sent flag accordingly.
2343 : */
2344 : static void
2345 142 : cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
2346 : {
2347 : HASH_SEQ_STATUS hash_seq;
2348 : RelationSyncEntry *entry;
2349 :
2350 : Assert(RelationSyncCache != NULL);
2351 :
2352 142 : hash_seq_init(&hash_seq, RelationSyncCache);
2353 290 : while ((entry = hash_seq_search(&hash_seq)) != NULL)
2354 : {
2355 : /*
2356 : * We can set the schema_sent flag for an entry whose list contains the
2357 : * committed xid, as that ensures the subscriber already has the
2358 : * corresponding schema, and we don't need to send it again unless there
2359 : * is an invalidation for that relation.
2360 : */
2361 334 : foreach_xid(streamed_txn, entry->streamed_txns)
2362 : {
2363 144 : if (xid == streamed_txn)
2364 : {
2365 106 : if (is_commit)
2366 84 : entry->schema_sent = true;
2367 :
2368 106 : entry->streamed_txns =
2369 106 : foreach_delete_current(entry->streamed_txns, streamed_txn);
2370 106 : break;
2371 : }
2372 : }
2373 : }
2374 142 : }
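/*
 * A sketch of the per-entry bookkeeping above, assuming hypothetical struct
 * and function names that are not part of pgoutput.c.  On commit the
 * subscriber has applied the streamed schema, so schema_sent becomes true
 * for the entry; on abort only the per-transaction record is dropped, since
 * the subscriber discards everything it received for that transaction.
 */
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t SketchXid;

typedef struct
{
	bool		schema_sent;	/* schema known to be on the subscriber */
	SketchXid	streamed_txns[8];
	size_t		nstreamed;
} SketchRelEntry;

static void
sketch_cleanup_entry(SketchRelEntry *entry, SketchXid xid, bool is_commit)
{
	for (size_t i = 0; i < entry->nstreamed; i++)
	{
		if (entry->streamed_txns[i] == xid)
		{
			if (is_commit)
				entry->schema_sent = true;
			/* remove the xid by swapping in the last element */
			entry->streamed_txns[i] = entry->streamed_txns[--entry->nstreamed];
			break;
		}
	}
}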
2375 :
2376 : /*
2377 : * Relcache invalidation callback
2378 : */
2379 : static void
2380 7498 : rel_sync_cache_relation_cb(Datum arg, Oid relid)
2381 : {
2382 : RelationSyncEntry *entry;
2383 :
2384 : /*
2385 : * We can get here if the plugin was used via the SQL interface, as the
2386 : * RelationSyncCache is destroyed when the decoding finishes, but there is
2387 : * no way to unregister the relcache invalidation callback.
2388 : */
2389 7498 : if (RelationSyncCache == NULL)
2390 52 : return;
2391 :
2392 : /*
2393 : * Nobody keeps pointers to entries in this hash table around outside
2394 : * logical decoding callback calls - but invalidation events can come in
2395 : * *during* a callback if we do any syscache access in the callback.
2396 : * Because of that we must mark the cache entry as invalid but not damage
2397 : * any of its substructure here. The next get_rel_sync_entry() call will
2398 : * rebuild it all.
2399 : */
2400 7446 : if (OidIsValid(relid))
2401 : {
2402 : /*
2403 : * Getting invalidations for relations that aren't in the table is
2404 : * entirely normal. So we don't care if it's found or not.
2405 : */
2406 7334 : entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
2407 : HASH_FIND, NULL);
2408 7334 : if (entry != NULL)
2409 1270 : entry->replicate_valid = false;
2410 : }
2411 : else
2412 : {
2413 : /* Whole cache must be flushed. */
2414 : HASH_SEQ_STATUS status;
2415 :
2416 112 : hash_seq_init(&status, RelationSyncCache);
2417 228 : while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2418 : {
2419 116 : entry->replicate_valid = false;
2420 : }
2421 : }
2422 : }
2423 :
2424 : /*
2425 : * Publication relation/schema map syscache invalidation callback
2426 : *
2427 : * Called for invalidations on pg_namespace.
2428 : */
2429 : static void
2430 70 : rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
2431 : {
2432 : HASH_SEQ_STATUS status;
2433 : RelationSyncEntry *entry;
2434 :
2435 : /*
2436 : * We can get here if the plugin was used via the SQL interface, as the
2437 : * RelationSyncCache is destroyed when the decoding finishes, but there is
2438 : * no way to unregister the invalidation callbacks.
2439 : */
2440 70 : if (RelationSyncCache == NULL)
2441 20 : return;
2442 :
2443 : /*
2444 : * We have no easy way to identify which cache entries this invalidation
2445 : * event might have affected, so just mark them all invalid.
2446 : */
2447 50 : hash_seq_init(&status, RelationSyncCache);
2448 92 : while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2449 : {
2450 42 : entry->replicate_valid = false;
2451 : }
2452 : }
2453 :
2454 : /* Send Replication origin */
2455 : static void
2456 2158 : send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
2457 : XLogRecPtr origin_lsn, bool send_origin)
2458 : {
2459 2158 : if (send_origin)
2460 : {
2461 : char *origin;
2462 :
2463 : /*----------
2464 : * XXX: which behaviour do we want here?
2465 : *
2466 : * Alternatives:
2467 : * - don't send origin message if origin name not found
2468 : * (that's what we do now)
2469 : * - throw error - that will break replication, not good
2470 : * - send some special "unknown" origin
2471 : *----------
2472 : */
2473 16 : if (replorigin_by_oid(origin_id, true, &origin))
2474 : {
2475 : /* Message boundary */
2476 16 : OutputPluginWrite(ctx, false);
2477 16 : OutputPluginPrepareWrite(ctx, true);
2478 :
2479 16 : logicalrep_write_origin(ctx->out, origin, origin_lsn);
2480 : }
2481 : }
2482 2158 : }
|