Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pgoutput.c
4 : * Logical Replication output plugin
5 : *
6 : * Copyright (c) 2012-2026, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/pgoutput/pgoutput.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "access/tupconvert.h"
16 : #include "catalog/partition.h"
17 : #include "catalog/pg_publication.h"
18 : #include "catalog/pg_publication_rel.h"
19 : #include "catalog/pg_subscription.h"
20 : #include "commands/defrem.h"
21 : #include "commands/subscriptioncmds.h"
22 : #include "executor/executor.h"
23 : #include "fmgr.h"
24 : #include "nodes/makefuncs.h"
25 : #include "parser/parse_relation.h"
26 : #include "replication/logical.h"
27 : #include "replication/logicalproto.h"
28 : #include "replication/origin.h"
29 : #include "replication/pgoutput.h"
30 : #include "rewrite/rewriteHandler.h"
31 : #include "utils/builtins.h"
32 : #include "utils/inval.h"
33 : #include "utils/lsyscache.h"
34 : #include "utils/memutils.h"
35 : #include "utils/rel.h"
36 : #include "utils/syscache.h"
37 : #include "utils/varlena.h"
38 :
39 1088 : PG_MODULE_MAGIC_EXT(
40 : .name = "pgoutput",
41 : .version = PG_VERSION
42 : );
43 :
44 : static void pgoutput_startup(LogicalDecodingContext *ctx,
45 : OutputPluginOptions *opt, bool is_init);
46 : static void pgoutput_shutdown(LogicalDecodingContext *ctx);
47 : static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
48 : ReorderBufferTXN *txn);
49 : static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
50 : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
51 : static void pgoutput_change(LogicalDecodingContext *ctx,
52 : ReorderBufferTXN *txn, Relation relation,
53 : ReorderBufferChange *change);
54 : static void pgoutput_truncate(LogicalDecodingContext *ctx,
55 : ReorderBufferTXN *txn, int nrelations, Relation relations[],
56 : ReorderBufferChange *change);
57 : static void pgoutput_message(LogicalDecodingContext *ctx,
58 : ReorderBufferTXN *txn, XLogRecPtr message_lsn,
59 : bool transactional, const char *prefix,
60 : Size sz, const char *message);
61 : static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
62 : RepOriginId origin_id);
63 : static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
64 : ReorderBufferTXN *txn);
65 : static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
66 : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
67 : static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
68 : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
69 : static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
70 : ReorderBufferTXN *txn,
71 : XLogRecPtr prepare_end_lsn,
72 : TimestampTz prepare_time);
73 : static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
74 : ReorderBufferTXN *txn);
75 : static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
76 : ReorderBufferTXN *txn);
77 : static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
78 : ReorderBufferTXN *txn,
79 : XLogRecPtr abort_lsn);
80 : static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
81 : ReorderBufferTXN *txn,
82 : XLogRecPtr commit_lsn);
83 : static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
84 : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
85 :
86 : static bool publications_valid;
87 :
88 : static List *LoadPublications(List *pubnames);
89 : static void publication_invalidation_cb(Datum arg, int cacheid,
90 : uint32 hashvalue);
91 : static void send_repl_origin(LogicalDecodingContext *ctx,
92 : RepOriginId origin_id, XLogRecPtr origin_lsn,
93 : bool send_origin);
94 :
95 : /*
96 : * Only 3 publication actions are used for row filtering ("insert", "update",
97 : * "delete"). See RelationSyncEntry.exprstate[].
98 : */
99 : enum RowFilterPubAction
100 : {
101 : PUBACTION_INSERT,
102 : PUBACTION_UPDATE,
103 : PUBACTION_DELETE,
104 : };
105 :
106 : #define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
107 :
108 : /*
109 : * Entry in the map used to remember which relation schemas we sent.
110 : *
111 : * The schema_sent flag determines if the current schema record for the
112 : * relation (and for its ancestor if publish_as_relid is set) was already
113 : * sent to the subscriber (in which case we don't need to send it again).
114 : *
115 : * The schema cache on downstream is however updated only at commit time,
116 : * and with streamed transactions the commit order may be different from
117 : * the order the transactions are sent in. Also, the (sub) transactions
118 : * might get aborted so we need to send the schema for each (sub) transaction
119 : * so that we don't lose the schema information on abort. For handling this,
120 : * we maintain the list of xids (streamed_txns) for those we have already sent
121 : * the schema.
122 : *
123 : * For partitions, 'pubactions' considers not only the table's own
124 : * publications, but also those of all of its ancestors.
125 : */
126 : typedef struct RelationSyncEntry
127 : {
128 : Oid relid; /* relation oid */
129 :
130 : bool replicate_valid; /* overall validity flag for entry */
131 :
132 : bool schema_sent;
133 :
134 : /*
135 : * This will be PUBLISH_GENCOLS_STORED if the relation contains generated
136 : * columns and the 'publish_generated_columns' parameter is set to
137 : * PUBLISH_GENCOLS_STORED. Otherwise, it will be PUBLISH_GENCOLS_NONE,
138 : * indicating that no generated columns should be published, unless
139 : * explicitly specified in the column list.
140 : */
141 : PublishGencolsType include_gencols_type;
142 : List *streamed_txns; /* streamed toplevel transactions with this
143 : * schema */
144 :
145 : /* are we publishing this rel? */
146 : PublicationActions pubactions;
147 :
148 : /*
149 : * ExprState array for row filter. Different publication actions don't
150 : * allow multiple expressions to always be combined into one, because
151 : * updates or deletes restrict the column in expression to be part of the
152 : * replica identity index whereas inserts do not have this restriction, so
153 : * there is one ExprState per publication action.
154 : */
155 : ExprState *exprstate[NUM_ROWFILTER_PUBACTIONS];
156 : EState *estate; /* executor state used for row filter */
157 : TupleTableSlot *new_slot; /* slot for storing new tuple */
158 : TupleTableSlot *old_slot; /* slot for storing old tuple */
159 :
160 : /*
161 : * OID of the relation to publish changes as. For a partition, this may
162 : * be set to one of its ancestors whose schema will be used when
163 : * replicating changes, if publish_via_partition_root is set for the
164 : * publication.
165 : */
166 : Oid publish_as_relid;
167 :
168 : /*
169 : * Map used when replicating using an ancestor's schema to convert tuples
170 : * from partition's type to the ancestor's; NULL if publish_as_relid is
171 : * same as 'relid' or if unnecessary due to partition and the ancestor
172 : * having identical TupleDesc.
173 : */
174 : AttrMap *attrmap;
175 :
176 : /*
177 : * Columns included in the publication, or NULL if all columns are
178 : * included implicitly. Note that the attnums in this bitmap are not
179 : * shifted by FirstLowInvalidHeapAttributeNumber.
180 : */
181 : Bitmapset *columns;
182 :
183 : /*
184 : * Private context to store additional data for this entry - state for the
185 : * row filter expressions, column list, etc.
186 : */
187 : MemoryContext entry_cxt;
188 : } RelationSyncEntry;
189 :
190 : /*
191 : * Maintain a per-transaction level variable to track whether the transaction
192 : * has sent BEGIN. BEGIN is only sent when the first change in a transaction
193 : * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
194 : * messages for empty transactions which saves network bandwidth.
195 : *
196 : * This optimization is not used for prepared transactions because if the
197 : * WALSender restarts after prepare of a transaction and before commit prepared
198 : * of the same transaction then we won't be able to figure out if we have
199 : * skipped sending BEGIN/PREPARE of a transaction as it was empty. This is
200 : * because we would have lost the in-memory txndata information that was
201 : * present prior to the restart. This will result in sending a spurious
202 : * COMMIT PREPARED without a corresponding prepared transaction at the
203 : * downstream which would lead to an error when it tries to process it.
204 : *
205 : * XXX We could achieve this optimization by changing protocol to send
206 : * additional information so that downstream can detect that the corresponding
207 : * prepare has not been sent. However, adding such a check for every
208 : * transaction in the downstream could be costly so we might want to do it
209 : * optionally.
210 : *
211 : * We also don't have this optimization for streamed transactions because
212 : * they can contain prepared transactions.
213 : */
214 : typedef struct PGOutputTxnData
215 : {
216 : bool sent_begin_txn; /* flag indicating whether BEGIN has been sent */
217 : } PGOutputTxnData;
218 :
219 : /* Map used to remember which relation schemas we sent. */
220 : static HTAB *RelationSyncCache = NULL;
221 :
222 : static void init_rel_sync_cache(MemoryContext cachectx);
223 : static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
224 : static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
225 : Relation relation);
226 : static void send_relation_and_attrs(Relation relation, TransactionId xid,
227 : LogicalDecodingContext *ctx,
228 : RelationSyncEntry *relentry);
229 : static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
230 : static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
231 : uint32 hashvalue);
232 : static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
233 : TransactionId xid);
234 : static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
235 : TransactionId xid);
236 : static void init_tuple_slot(PGOutputData *data, Relation relation,
237 : RelationSyncEntry *entry);
238 : static void pgoutput_memory_context_reset(void *arg);
239 :
240 : /* row filter routines */
241 : static EState *create_estate_for_relation(Relation rel);
242 : static void pgoutput_row_filter_init(PGOutputData *data,
243 : List *publications,
244 : RelationSyncEntry *entry);
245 : static bool pgoutput_row_filter_exec_expr(ExprState *state,
246 : ExprContext *econtext);
247 : static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
248 : TupleTableSlot **new_slot_ptr,
249 : RelationSyncEntry *entry,
250 : ReorderBufferChangeType *action);
251 :
252 : /* column list routines */
253 : static void pgoutput_column_list_init(PGOutputData *data,
254 : List *publications,
255 : RelationSyncEntry *entry);
256 :
257 : /*
258 : * Specify output plugin callbacks
259 : */
260 : void
261 1472 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
262 : {
263 1472 : cb->startup_cb = pgoutput_startup;
264 1472 : cb->begin_cb = pgoutput_begin_txn;
265 1472 : cb->change_cb = pgoutput_change;
266 1472 : cb->truncate_cb = pgoutput_truncate;
267 1472 : cb->message_cb = pgoutput_message;
268 1472 : cb->commit_cb = pgoutput_commit_txn;
269 :
270 1472 : cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
271 1472 : cb->prepare_cb = pgoutput_prepare_txn;
272 1472 : cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
273 1472 : cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
274 1472 : cb->filter_by_origin_cb = pgoutput_origin_filter;
275 1472 : cb->shutdown_cb = pgoutput_shutdown;
276 :
277 : /* transaction streaming */
278 1472 : cb->stream_start_cb = pgoutput_stream_start;
279 1472 : cb->stream_stop_cb = pgoutput_stream_stop;
280 1472 : cb->stream_abort_cb = pgoutput_stream_abort;
281 1472 : cb->stream_commit_cb = pgoutput_stream_commit;
282 1472 : cb->stream_change_cb = pgoutput_change;
283 1472 : cb->stream_message_cb = pgoutput_message;
284 1472 : cb->stream_truncate_cb = pgoutput_truncate;
285 : /* transaction streaming - two-phase commit */
286 1472 : cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
287 1472 : }
288 :
289 : static void
290 798 : parse_output_parameters(List *options, PGOutputData *data)
291 : {
292 : ListCell *lc;
293 798 : bool protocol_version_given = false;
294 798 : bool publication_names_given = false;
295 798 : bool binary_option_given = false;
296 798 : bool messages_option_given = false;
297 798 : bool streaming_given = false;
298 798 : bool two_phase_option_given = false;
299 798 : bool origin_option_given = false;
300 :
301 : /* Initialize optional parameters to defaults */
302 798 : data->binary = false;
303 798 : data->streaming = LOGICALREP_STREAM_OFF;
304 798 : data->messages = false;
305 798 : data->two_phase = false;
306 798 : data->publish_no_origin = false;
307 :
308 3980 : foreach(lc, options)
309 : {
310 3182 : DefElem *defel = (DefElem *) lfirst(lc);
311 :
312 : Assert(defel->arg == NULL || IsA(defel->arg, String));
313 :
314 : /* Check each param, whether or not we recognize it */
315 3182 : if (strcmp(defel->defname, "proto_version") == 0)
316 : {
317 : unsigned long parsed;
318 : char *endptr;
319 :
320 798 : if (protocol_version_given)
321 0 : ereport(ERROR,
322 : (errcode(ERRCODE_SYNTAX_ERROR),
323 : errmsg("conflicting or redundant options")));
324 798 : protocol_version_given = true;
325 :
326 798 : errno = 0;
327 798 : parsed = strtoul(strVal(defel->arg), &endptr, 10);
328 798 : if (errno != 0 || *endptr != '\0')
329 0 : ereport(ERROR,
330 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
331 : errmsg("invalid proto_version")));
332 :
333 798 : if (parsed > PG_UINT32_MAX)
334 0 : ereport(ERROR,
335 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
336 : errmsg("proto_version \"%s\" out of range",
337 : strVal(defel->arg))));
338 :
339 798 : data->protocol_version = (uint32) parsed;
340 : }
341 2384 : else if (strcmp(defel->defname, "publication_names") == 0)
342 : {
343 798 : if (publication_names_given)
344 0 : ereport(ERROR,
345 : (errcode(ERRCODE_SYNTAX_ERROR),
346 : errmsg("conflicting or redundant options")));
347 798 : publication_names_given = true;
348 :
349 : /*
350 : * Pass a copy of the DefElem->arg since SplitIdentifierString
351 : * modifies its input.
352 : */
353 798 : if (!SplitIdentifierString(pstrdup(strVal(defel->arg)), ',',
354 : &data->publication_names))
355 0 : ereport(ERROR,
356 : (errcode(ERRCODE_INVALID_NAME),
357 : errmsg("invalid publication_names syntax")));
358 : }
359 1586 : else if (strcmp(defel->defname, "binary") == 0)
360 : {
361 20 : if (binary_option_given)
362 0 : ereport(ERROR,
363 : (errcode(ERRCODE_SYNTAX_ERROR),
364 : errmsg("conflicting or redundant options")));
365 20 : binary_option_given = true;
366 :
367 20 : data->binary = defGetBoolean(defel);
368 : }
369 1566 : else if (strcmp(defel->defname, "messages") == 0)
370 : {
371 8 : if (messages_option_given)
372 0 : ereport(ERROR,
373 : (errcode(ERRCODE_SYNTAX_ERROR),
374 : errmsg("conflicting or redundant options")));
375 8 : messages_option_given = true;
376 :
377 8 : data->messages = defGetBoolean(defel);
378 : }
379 1558 : else if (strcmp(defel->defname, "streaming") == 0)
380 : {
381 762 : if (streaming_given)
382 0 : ereport(ERROR,
383 : (errcode(ERRCODE_SYNTAX_ERROR),
384 : errmsg("conflicting or redundant options")));
385 762 : streaming_given = true;
386 :
387 762 : data->streaming = defGetStreamingMode(defel);
388 : }
389 796 : else if (strcmp(defel->defname, "two_phase") == 0)
390 : {
391 16 : if (two_phase_option_given)
392 0 : ereport(ERROR,
393 : (errcode(ERRCODE_SYNTAX_ERROR),
394 : errmsg("conflicting or redundant options")));
395 16 : two_phase_option_given = true;
396 :
397 16 : data->two_phase = defGetBoolean(defel);
398 : }
399 780 : else if (strcmp(defel->defname, "origin") == 0)
400 : {
401 : char *origin;
402 :
403 780 : if (origin_option_given)
404 0 : ereport(ERROR,
405 : errcode(ERRCODE_SYNTAX_ERROR),
406 : errmsg("conflicting or redundant options"));
407 780 : origin_option_given = true;
408 :
409 780 : origin = defGetString(defel);
410 780 : if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
411 52 : data->publish_no_origin = true;
412 728 : else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
413 728 : data->publish_no_origin = false;
414 : else
415 0 : ereport(ERROR,
416 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
417 : errmsg("unrecognized origin value: \"%s\"", origin));
418 : }
419 : else
420 0 : elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
421 : }
422 :
423 : /* Check required options */
424 798 : if (!protocol_version_given)
425 0 : ereport(ERROR,
426 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
427 : errmsg("option \"%s\" missing", "proto_version"));
428 798 : if (!publication_names_given)
429 0 : ereport(ERROR,
430 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
431 : errmsg("option \"%s\" missing", "publication_names"));
432 798 : }
433 :
434 : /*
435 : * Memory context reset callback of PGOutputData->context.
436 : */
437 : static void
438 2132 : pgoutput_memory_context_reset(void *arg)
439 : {
440 2132 : if (RelationSyncCache)
441 : {
442 392 : hash_destroy(RelationSyncCache);
443 392 : RelationSyncCache = NULL;
444 : }
445 2132 : }
446 :
447 : /*
448 : * Initialize this plugin
449 : */
450 : static void
451 1472 : pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
452 : bool is_init)
453 : {
454 1472 : PGOutputData *data = palloc0_object(PGOutputData);
455 : static bool publication_callback_registered = false;
456 : MemoryContextCallback *mcallback;
457 :
458 : /* Create our memory context for private allocations. */
459 1472 : data->context = AllocSetContextCreate(ctx->context,
460 : "logical replication output context",
461 : ALLOCSET_DEFAULT_SIZES);
462 :
463 1472 : data->cachectx = AllocSetContextCreate(ctx->context,
464 : "logical replication cache context",
465 : ALLOCSET_DEFAULT_SIZES);
466 :
467 1472 : data->pubctx = AllocSetContextCreate(ctx->context,
468 : "logical replication publication list context",
469 : ALLOCSET_SMALL_SIZES);
470 :
471 : /*
472 : * Ensure to cleanup RelationSyncCache even when logical decoding invoked
473 : * via SQL interface ends up with an error.
474 : */
475 1472 : mcallback = palloc0_object(MemoryContextCallback);
476 1472 : mcallback->func = pgoutput_memory_context_reset;
477 1472 : MemoryContextRegisterResetCallback(ctx->context, mcallback);
478 :
479 1472 : ctx->output_plugin_private = data;
480 :
481 : /* This plugin uses binary protocol. */
482 1472 : opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
483 :
484 : /*
485 : * This is replication start and not slot initialization.
486 : *
487 : * Parse and validate options passed by the client.
488 : */
489 1472 : if (!is_init)
490 : {
491 : /* Parse the params and ERROR if we see any we don't recognize */
492 798 : parse_output_parameters(ctx->output_plugin_options, data);
493 :
494 : /* Check if we support requested protocol */
495 798 : if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
496 0 : ereport(ERROR,
497 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
498 : errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
499 : data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
500 :
501 798 : if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
502 0 : ereport(ERROR,
503 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
504 : errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
505 : data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
506 :
507 : /*
508 : * Decide whether to enable streaming. It is disabled by default, in
509 : * which case we just update the flag in decoding context. Otherwise
510 : * we only allow it with sufficient version of the protocol, and when
511 : * the output plugin supports it.
512 : */
513 798 : if (data->streaming == LOGICALREP_STREAM_OFF)
514 36 : ctx->streaming = false;
515 762 : else if (data->streaming == LOGICALREP_STREAM_ON &&
516 52 : data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
517 0 : ereport(ERROR,
518 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
519 : errmsg("requested proto_version=%d does not support streaming, need %d or higher",
520 : data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
521 762 : else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
522 710 : data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
523 0 : ereport(ERROR,
524 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
525 : errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
526 : data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
527 762 : else if (!ctx->streaming)
528 0 : ereport(ERROR,
529 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
530 : errmsg("streaming requested, but not supported by output plugin")));
531 :
532 : /*
533 : * Here, we just check whether the two-phase option is passed by
534 : * plugin and decide whether to enable it at later point of time. It
535 : * remains enabled if the previous start-up has done so. But we only
536 : * allow the option to be passed in with sufficient version of the
537 : * protocol, and when the output plugin supports it.
538 : */
539 798 : if (!data->two_phase)
540 782 : ctx->twophase_opt_given = false;
541 16 : else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
542 0 : ereport(ERROR,
543 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
544 : errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
545 : data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
546 16 : else if (!ctx->twophase)
547 0 : ereport(ERROR,
548 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
549 : errmsg("two-phase commit requested, but not supported by output plugin")));
550 : else
551 16 : ctx->twophase_opt_given = true;
552 :
553 : /* Init publication state. */
554 798 : data->publications = NIL;
555 798 : publications_valid = false;
556 :
557 : /*
558 : * Register callback for pg_publication if we didn't already do that
559 : * during some previous call in this process.
560 : */
561 798 : if (!publication_callback_registered)
562 : {
563 794 : CacheRegisterSyscacheCallback(PUBLICATIONOID,
564 : publication_invalidation_cb,
565 : (Datum) 0);
566 794 : CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb,
567 : (Datum) 0);
568 794 : publication_callback_registered = true;
569 : }
570 :
571 : /* Initialize relation schema cache. */
572 798 : init_rel_sync_cache(CacheMemoryContext);
573 : }
574 : else
575 : {
576 : /*
577 : * Disable the streaming and prepared transactions during the slot
578 : * initialization mode.
579 : */
580 674 : ctx->streaming = false;
581 674 : ctx->twophase = false;
582 : }
583 1472 : }
584 :
585 : /*
586 : * BEGIN callback.
587 : *
588 : * Don't send the BEGIN message here instead postpone it until the first
589 : * change. In logical replication, a common scenario is to replicate a set of
590 : * tables (instead of all tables) and transactions whose changes were on
591 : * the table(s) that are not published will produce empty transactions. These
592 : * empty transactions will send BEGIN and COMMIT messages to subscribers,
593 : * using bandwidth on something with little/no use for logical replication.
594 : */
595 : static void
596 1858 : pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
597 : {
598 1858 : PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
599 : sizeof(PGOutputTxnData));
600 :
601 1858 : txn->output_plugin_private = txndata;
602 1858 : }
603 :
604 : /*
605 : * Send BEGIN.
606 : *
607 : * This is called while processing the first change of the transaction.
608 : */
609 : static void
610 894 : pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
611 : {
612 894 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
613 894 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
614 :
615 : Assert(txndata);
616 : Assert(!txndata->sent_begin_txn);
617 :
618 894 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
619 894 : logicalrep_write_begin(ctx->out, txn);
620 894 : txndata->sent_begin_txn = true;
621 :
622 894 : send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
623 : send_replication_origin);
624 :
625 894 : OutputPluginWrite(ctx, true);
626 892 : }
627 :
628 : /*
629 : * COMMIT callback
630 : */
631 : static void
632 1854 : pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
633 : XLogRecPtr commit_lsn)
634 : {
635 1854 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
636 : bool sent_begin_txn;
637 :
638 : Assert(txndata);
639 :
640 : /*
641 : * We don't need to send the commit message unless some relevant change
642 : * from this transaction has been sent to the downstream.
643 : */
644 1854 : sent_begin_txn = txndata->sent_begin_txn;
645 1854 : OutputPluginUpdateProgress(ctx, !sent_begin_txn);
646 1854 : pfree(txndata);
647 1854 : txn->output_plugin_private = NULL;
648 :
649 1854 : if (!sent_begin_txn)
650 : {
651 962 : elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
652 962 : return;
653 : }
654 :
655 892 : OutputPluginPrepareWrite(ctx, true);
656 892 : logicalrep_write_commit(ctx->out, txn, commit_lsn);
657 892 : OutputPluginWrite(ctx, true);
658 : }
659 :
660 : /*
661 : * BEGIN PREPARE callback
662 : */
663 : static void
664 40 : pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
665 : {
666 40 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
667 :
668 40 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
669 40 : logicalrep_write_begin_prepare(ctx->out, txn);
670 :
671 40 : send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
672 : send_replication_origin);
673 :
674 40 : OutputPluginWrite(ctx, true);
675 40 : }
676 :
677 : /*
678 : * PREPARE callback
679 : */
680 : static void
681 40 : pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
682 : XLogRecPtr prepare_lsn)
683 : {
684 40 : OutputPluginUpdateProgress(ctx, false);
685 :
686 40 : OutputPluginPrepareWrite(ctx, true);
687 40 : logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
688 40 : OutputPluginWrite(ctx, true);
689 40 : }
690 :
691 : /*
692 : * COMMIT PREPARED callback
693 : */
694 : static void
695 50 : pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
696 : XLogRecPtr commit_lsn)
697 : {
698 50 : OutputPluginUpdateProgress(ctx, false);
699 :
700 50 : OutputPluginPrepareWrite(ctx, true);
701 50 : logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
702 50 : OutputPluginWrite(ctx, true);
703 50 : }
704 :
705 : /*
706 : * ROLLBACK PREPARED callback
707 : */
708 : static void
709 14 : pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
710 : ReorderBufferTXN *txn,
711 : XLogRecPtr prepare_end_lsn,
712 : TimestampTz prepare_time)
713 : {
714 14 : OutputPluginUpdateProgress(ctx, false);
715 :
716 14 : OutputPluginPrepareWrite(ctx, true);
717 14 : logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
718 : prepare_time);
719 14 : OutputPluginWrite(ctx, true);
720 14 : }
721 :
722 : /*
723 : * Write the current schema of the relation and its ancestor (if any) if not
724 : * done yet.
725 : */
726 : static void
727 364524 : maybe_send_schema(LogicalDecodingContext *ctx,
728 : ReorderBufferChange *change,
729 : Relation relation, RelationSyncEntry *relentry)
730 : {
731 364524 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
732 : bool schema_sent;
733 364524 : TransactionId xid = InvalidTransactionId;
734 364524 : TransactionId topxid = InvalidTransactionId;
735 :
736 : /*
737 : * Remember XID of the (sub)transaction for the change. We don't care if
738 : * it's top-level transaction or not (we have already sent that XID in
739 : * start of the current streaming block).
740 : *
741 : * If we're not in a streaming block, just use InvalidTransactionId and
742 : * the write methods will not include it.
743 : */
744 364524 : if (data->in_streaming)
745 351846 : xid = change->txn->xid;
746 :
747 364524 : if (rbtxn_is_subtxn(change->txn))
748 20338 : topxid = rbtxn_get_toptxn(change->txn)->xid;
749 : else
750 344186 : topxid = xid;
751 :
752 : /*
753 : * Do we need to send the schema? We do track streamed transactions
754 : * separately, because those may be applied later (and the regular
755 : * transactions won't see their effects until then) and in an order that
756 : * we don't know at this point.
757 : *
758 : * XXX There is a scope of optimization here. Currently, we always send
759 : * the schema first time in a streaming transaction but we can probably
760 : * avoid that by checking 'relentry->schema_sent' flag. However, before
761 : * doing that we need to study its impact on the case where we have a mix
762 : * of streaming and non-streaming transactions.
763 : */
764 364524 : if (data->in_streaming)
765 351846 : schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
766 : else
767 12678 : schema_sent = relentry->schema_sent;
768 :
769 : /* Nothing to do if we already sent the schema. */
770 364524 : if (schema_sent)
771 363856 : return;
772 :
773 : /*
774 : * Send the schema. If the changes will be published using an ancestor's
775 : * schema, not the relation's own, send that ancestor's schema before
776 : * sending relation's own (XXX - maybe sending only the former suffices?).
777 : */
778 668 : if (relentry->publish_as_relid != RelationGetRelid(relation))
779 : {
780 66 : Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
781 :
782 66 : send_relation_and_attrs(ancestor, xid, ctx, relentry);
783 66 : RelationClose(ancestor);
784 : }
785 :
786 668 : send_relation_and_attrs(relation, xid, ctx, relentry);
787 :
788 668 : if (data->in_streaming)
789 138 : set_schema_sent_in_streamed_txn(relentry, topxid);
790 : else
791 530 : relentry->schema_sent = true;
792 : }
793 :
794 : /*
795 : * Sends a relation
796 : */
797 : static void
798 734 : send_relation_and_attrs(Relation relation, TransactionId xid,
799 : LogicalDecodingContext *ctx,
800 : RelationSyncEntry *relentry)
801 : {
802 734 : TupleDesc desc = RelationGetDescr(relation);
803 734 : Bitmapset *columns = relentry->columns;
804 734 : PublishGencolsType include_gencols_type = relentry->include_gencols_type;
805 : int i;
806 :
807 : /*
808 : * Write out type info if needed. We do that only for user-created types.
809 : * We use FirstGenbkiObjectId as the cutoff, so that we only consider
810 : * objects with hand-assigned OIDs to be "built in", not for instance any
811 : * function or type defined in the information_schema. This is important
812 : * because only hand-assigned OIDs can be expected to remain stable across
813 : * major versions.
814 : */
815 2250 : for (i = 0; i < desc->natts; i++)
816 : {
817 1516 : Form_pg_attribute att = TupleDescAttr(desc, i);
818 :
819 1516 : if (!logicalrep_should_publish_column(att, columns,
820 : include_gencols_type))
821 142 : continue;
822 :
823 1374 : if (att->atttypid < FirstGenbkiObjectId)
824 1338 : continue;
825 :
826 36 : OutputPluginPrepareWrite(ctx, false);
827 36 : logicalrep_write_typ(ctx->out, xid, att->atttypid);
828 36 : OutputPluginWrite(ctx, false);
829 : }
830 :
831 734 : OutputPluginPrepareWrite(ctx, false);
832 734 : logicalrep_write_rel(ctx->out, xid, relation, columns,
833 : include_gencols_type);
834 734 : OutputPluginWrite(ctx, false);
835 734 : }
836 :
837 : /*
838 : * Executor state preparation for evaluation of row filter expressions for the
839 : * specified relation.
840 : */
841 : static EState *
842 34 : create_estate_for_relation(Relation rel)
843 : {
844 : EState *estate;
845 : RangeTblEntry *rte;
846 34 : List *perminfos = NIL;
847 :
848 34 : estate = CreateExecutorState();
849 :
850 34 : rte = makeNode(RangeTblEntry);
851 34 : rte->rtekind = RTE_RELATION;
852 34 : rte->relid = RelationGetRelid(rel);
853 34 : rte->relkind = rel->rd_rel->relkind;
854 34 : rte->rellockmode = AccessShareLock;
855 :
856 34 : addRTEPermissionInfo(&perminfos, rte);
857 :
858 34 : ExecInitRangeTable(estate, list_make1(rte), perminfos,
859 : bms_make_singleton(1));
860 :
861 34 : estate->es_output_cid = GetCurrentCommandId(false);
862 :
863 34 : return estate;
864 : }
865 :
866 : /*
867 : * Evaluates row filter.
868 : *
869 : * If the row filter evaluates to NULL, it is taken as false i.e. the change
870 : * isn't replicated.
871 : */
872 : static bool
873 76 : pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
874 : {
875 : Datum ret;
876 : bool isnull;
877 :
878 : Assert(state != NULL);
879 :
880 76 : ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
881 :
882 76 : elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
883 : isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
884 : isnull ? "true" : "false");
885 :
886 76 : if (isnull)
887 2 : return false;
888 :
889 74 : return DatumGetBool(ret);
890 : }
891 :
892 : /*
893 : * Make sure the per-entry memory context exists.
894 : */
895 : static void
896 634 : pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
897 : {
898 : Relation relation;
899 :
900 : /* The context may already exist, in which case bail out. */
901 634 : if (entry->entry_cxt)
902 34 : return;
903 :
904 600 : relation = RelationIdGetRelation(entry->publish_as_relid);
905 :
906 600 : entry->entry_cxt = AllocSetContextCreate(data->cachectx,
907 : "entry private context",
908 : ALLOCSET_SMALL_SIZES);
909 :
910 600 : MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
911 : RelationGetRelationName(relation));
912 : }
913 :
914 : /*
915 : * Initialize the row filter.
916 : */
917 : static void
918 600 : pgoutput_row_filter_init(PGOutputData *data, List *publications,
919 : RelationSyncEntry *entry)
920 : {
921 : ListCell *lc;
922 600 : List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */
923 600 : bool no_filter[] = {false, false, false}; /* One per pubaction */
924 : MemoryContext oldctx;
925 : int idx;
926 600 : bool has_filter = true;
927 600 : Oid schemaid = get_rel_namespace(entry->publish_as_relid);
928 :
929 : /*
930 : * Find if there are any row filters for this relation. If there are, then
931 : * prepare the necessary ExprState and cache it in entry->exprstate. To
932 : * build an expression state, we need to ensure the following:
933 : *
934 : * All the given publication-table mappings must be checked.
935 : *
936 : * Multiple publications might have multiple row filters for this
937 : * relation. Since row filter usage depends on the DML operation, there
938 : * are multiple lists (one for each operation) to which row filters will
939 : * be appended.
940 : *
941 : * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row filter
942 : * expression" so it takes precedence.
943 : */
944 642 : foreach(lc, publications)
945 : {
946 608 : Publication *pub = lfirst(lc);
947 608 : HeapTuple rftuple = NULL;
948 608 : Datum rfdatum = 0;
949 608 : bool pub_no_filter = true;
950 :
951 : /*
952 : * If the publication is FOR ALL TABLES, or the publication includes a
953 : * FOR TABLES IN SCHEMA where the table belongs to the referred
954 : * schema, then it is treated the same as if there are no row filters
955 : * (even if other publications have a row filter).
956 : */
957 608 : if (!pub->alltables &&
958 446 : !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
959 : ObjectIdGetDatum(schemaid),
960 : ObjectIdGetDatum(pub->oid)))
961 : {
962 : /*
963 : * Check for the presence of a row filter in this publication.
964 : */
965 434 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
966 : ObjectIdGetDatum(entry->publish_as_relid),
967 : ObjectIdGetDatum(pub->oid));
968 :
969 434 : if (HeapTupleIsValid(rftuple))
970 : {
971 : /* Null indicates no filter. */
972 410 : rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
973 : Anum_pg_publication_rel_prqual,
974 : &pub_no_filter);
975 : }
976 : }
977 :
978 608 : if (pub_no_filter)
979 : {
980 580 : if (rftuple)
981 382 : ReleaseSysCache(rftuple);
982 :
983 580 : no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
984 580 : no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
985 580 : no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
986 :
987 : /*
988 : * Quick exit if all the DML actions are publicized via this
989 : * publication.
990 : */
991 580 : if (no_filter[PUBACTION_INSERT] &&
992 580 : no_filter[PUBACTION_UPDATE] &&
993 566 : no_filter[PUBACTION_DELETE])
994 : {
995 566 : has_filter = false;
996 566 : break;
997 : }
998 :
999 : /* No additional work for this publication. Next one. */
1000 14 : continue;
1001 : }
1002 :
1003 : /* Form the per pubaction row filter lists. */
1004 28 : if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
1005 28 : rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
1006 28 : TextDatumGetCString(rfdatum));
1007 28 : if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
1008 28 : rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
1009 28 : TextDatumGetCString(rfdatum));
1010 28 : if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
1011 28 : rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
1012 28 : TextDatumGetCString(rfdatum));
1013 :
1014 28 : ReleaseSysCache(rftuple);
1015 : } /* loop all subscribed publications */
1016 :
1017 : /* Clean the row filter */
1018 2400 : for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
1019 : {
1020 1800 : if (no_filter[idx])
1021 : {
1022 1716 : list_free_deep(rfnodes[idx]);
1023 1716 : rfnodes[idx] = NIL;
1024 : }
1025 : }
1026 :
1027 600 : if (has_filter)
1028 : {
1029 34 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1030 :
1031 34 : pgoutput_ensure_entry_cxt(data, entry);
1032 :
1033 : /*
1034 : * Now all the filters for all pubactions are known. Combine them when
1035 : * their pubactions are the same.
1036 : */
1037 34 : oldctx = MemoryContextSwitchTo(entry->entry_cxt);
1038 34 : entry->estate = create_estate_for_relation(relation);
1039 136 : for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
1040 : {
1041 102 : List *filters = NIL;
1042 : Expr *rfnode;
1043 :
1044 102 : if (rfnodes[idx] == NIL)
1045 42 : continue;
1046 :
1047 126 : foreach(lc, rfnodes[idx])
1048 66 : filters = lappend(filters, expand_generated_columns_in_expr(stringToNode((char *) lfirst(lc)), relation, 1));
1049 :
1050 : /* combine the row filter and cache the ExprState */
1051 60 : rfnode = make_orclause(filters);
1052 60 : entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
1053 : } /* for each pubaction */
1054 34 : MemoryContextSwitchTo(oldctx);
1055 :
1056 34 : RelationClose(relation);
1057 : }
1058 600 : }
1059 :
1060 : /*
1061 : * If the table contains a generated column, check for any conflicting
1062 : * values of 'publish_generated_columns' parameter in the publications.
1063 : */
1064 : static void
1065 600 : check_and_init_gencol(PGOutputData *data, List *publications,
1066 : RelationSyncEntry *entry)
1067 : {
1068 600 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1069 600 : TupleDesc desc = RelationGetDescr(relation);
1070 600 : bool gencolpresent = false;
1071 600 : bool first = true;
1072 :
1073 : /* Check if there is any generated column present. */
1074 1812 : for (int i = 0; i < desc->natts; i++)
1075 : {
1076 1226 : CompactAttribute *att = TupleDescCompactAttr(desc, i);
1077 :
1078 1226 : if (att->attgenerated)
1079 : {
1080 14 : gencolpresent = true;
1081 14 : break;
1082 : }
1083 : }
1084 :
1085 : /* There are no generated columns to be published. */
1086 600 : if (!gencolpresent)
1087 : {
1088 586 : entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
1089 586 : return;
1090 : }
1091 :
1092 : /*
1093 : * There may be a conflicting value for 'publish_generated_columns'
1094 : * parameter in the publications.
1095 : */
1096 44 : foreach_ptr(Publication, pub, publications)
1097 : {
1098 : /*
1099 : * The column list takes precedence over the
1100 : * 'publish_generated_columns' parameter. Those will be checked later,
1101 : * see pgoutput_column_list_init.
1102 : */
1103 16 : if (check_and_fetch_column_list(pub, entry->publish_as_relid, NULL, NULL))
1104 6 : continue;
1105 :
1106 10 : if (first)
1107 : {
1108 10 : entry->include_gencols_type = pub->pubgencols_type;
1109 10 : first = false;
1110 : }
1111 0 : else if (entry->include_gencols_type != pub->pubgencols_type)
1112 0 : ereport(ERROR,
1113 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1114 : errmsg("cannot use different values of publish_generated_columns for table \"%s.%s\" in different publications",
1115 : get_namespace_name(RelationGetNamespace(relation)),
1116 : RelationGetRelationName(relation)));
1117 : }
1118 : }
1119 :
1120 : /*
1121 : * Initialize the column list.
1122 : */
1123 : static void
1124 600 : pgoutput_column_list_init(PGOutputData *data, List *publications,
1125 : RelationSyncEntry *entry)
1126 : {
1127 : ListCell *lc;
1128 600 : bool first = true;
1129 600 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1130 600 : bool found_pub_collist = false;
1131 600 : Bitmapset *relcols = NULL;
1132 :
1133 600 : pgoutput_ensure_entry_cxt(data, entry);
1134 :
1135 : /*
1136 : * Find if there are any column lists for this relation. If there are,
1137 : * build a bitmap using the column lists.
1138 : *
1139 : * Multiple publications might have multiple column lists for this
1140 : * relation.
1141 : *
1142 : * Note that we don't support the case where the column list is different
1143 : * for the same table when combining publications. See comments atop
1144 : * fetch_relation_list. But one can later change the publication so we
1145 : * still need to check all the given publication-table mappings and report
1146 : * an error if any publications have a different column list.
1147 : */
1148 1220 : foreach(lc, publications)
1149 : {
1150 622 : Publication *pub = lfirst(lc);
1151 622 : Bitmapset *cols = NULL;
1152 :
1153 : /* Retrieve the bitmap of columns for a column list publication. */
1154 622 : found_pub_collist |= check_and_fetch_column_list(pub,
1155 : entry->publish_as_relid,
1156 : entry->entry_cxt, &cols);
1157 :
1158 : /*
1159 : * For non-column list publications — e.g. TABLE (without a column
1160 : * list), ALL TABLES, or ALL TABLES IN SCHEMA, we consider all columns
1161 : * of the table (including generated columns when
1162 : * 'publish_generated_columns' parameter is true).
1163 : */
1164 622 : if (!cols)
1165 : {
1166 : /*
1167 : * Cache the table columns for the first publication with no
1168 : * specified column list to detect publication with a different
1169 : * column list.
1170 : */
1171 544 : if (!relcols && (list_length(publications) > 1))
1172 : {
1173 18 : MemoryContext oldcxt = MemoryContextSwitchTo(entry->entry_cxt);
1174 :
1175 18 : relcols = pub_form_cols_map(relation,
1176 : entry->include_gencols_type);
1177 18 : MemoryContextSwitchTo(oldcxt);
1178 : }
1179 :
1180 544 : cols = relcols;
1181 : }
1182 :
1183 622 : if (first)
1184 : {
1185 600 : entry->columns = cols;
1186 600 : first = false;
1187 : }
1188 22 : else if (!bms_equal(entry->columns, cols))
1189 2 : ereport(ERROR,
1190 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1191 : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
1192 : get_namespace_name(RelationGetNamespace(relation)),
1193 : RelationGetRelationName(relation)));
1194 : } /* loop all subscribed publications */
1195 :
1196 : /*
1197 : * If no column list publications exist, columns to be published will be
1198 : * computed later according to the 'publish_generated_columns' parameter.
1199 : */
1200 598 : if (!found_pub_collist)
1201 526 : entry->columns = NULL;
1202 :
1203 598 : RelationClose(relation);
1204 598 : }
1205 :
1206 : /*
1207 : * Initialize the slot for storing new and old tuples, and build the map that
1208 : * will be used to convert the relation's tuples into the ancestor's format.
1209 : */
1210 : static void
1211 600 : init_tuple_slot(PGOutputData *data, Relation relation,
1212 : RelationSyncEntry *entry)
1213 : {
1214 : MemoryContext oldctx;
1215 : TupleDesc oldtupdesc;
1216 : TupleDesc newtupdesc;
1217 :
1218 600 : oldctx = MemoryContextSwitchTo(data->cachectx);
1219 :
1220 : /*
1221 : * Create tuple table slots. Create a copy of the TupleDesc as it needs to
1222 : * live as long as the cache remains.
1223 : */
1224 600 : oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1225 600 : newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1226 :
1227 600 : entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
1228 600 : entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
1229 :
1230 600 : MemoryContextSwitchTo(oldctx);
1231 :
1232 : /*
1233 : * Cache the map that will be used to convert the relation's tuples into
1234 : * the ancestor's format, if needed.
1235 : */
1236 600 : if (entry->publish_as_relid != RelationGetRelid(relation))
1237 : {
1238 76 : Relation ancestor = RelationIdGetRelation(entry->publish_as_relid);
1239 76 : TupleDesc indesc = RelationGetDescr(relation);
1240 76 : TupleDesc outdesc = RelationGetDescr(ancestor);
1241 :
1242 : /* Map must live as long as the logical decoding context. */
1243 76 : oldctx = MemoryContextSwitchTo(data->cachectx);
1244 :
1245 76 : entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
1246 :
1247 76 : MemoryContextSwitchTo(oldctx);
1248 76 : RelationClose(ancestor);
1249 : }
1250 600 : }
1251 :
1252 : /*
1253 : * Change is checked against the row filter if any.
1254 : *
1255 : * Returns true if the change is to be replicated, else false.
1256 : *
1257 : * For inserts, evaluate the row filter for new tuple.
1258 : * For deletes, evaluate the row filter for old tuple.
1259 : * For updates, evaluate the row filter for old and new tuple.
1260 : *
1261 : * For updates, if both evaluations are true, we allow sending the UPDATE and
1262 : * if both the evaluations are false, it doesn't replicate the UPDATE. Now, if
1263 : * only one of the tuples matches the row filter expression, we transform
1264 : * UPDATE to DELETE or INSERT to avoid any data inconsistency based on the
1265 : * following rules:
1266 : *
1267 : * Case 1: old-row (no match) new-row (no match) -> (drop change)
1268 : * Case 2: old-row (no match) new row (match) -> INSERT
1269 : * Case 3: old-row (match) new-row (no match) -> DELETE
1270 : * Case 4: old-row (match) new row (match) -> UPDATE
1271 : *
1272 : * The new action is updated in the action parameter.
1273 : *
1274 : * The new slot could be updated when transforming the UPDATE into INSERT,
1275 : * because the original new tuple might not have column values from the replica
1276 : * identity.
1277 : *
1278 : * Examples:
1279 : * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
1280 : * Since the old tuple satisfies, the initial table synchronization copied this
1281 : * row (or another method was used to guarantee that there is data
1282 : * consistency). However, after the UPDATE the new tuple doesn't satisfy the
1283 : * row filter, so from a data consistency perspective, that row should be
1284 : * removed on the subscriber. The UPDATE should be transformed into a DELETE
1285 : * statement and be sent to the subscriber. Keeping this row on the subscriber
1286 : * is undesirable because it doesn't reflect what was defined in the row filter
1287 : * expression on the publisher. This row on the subscriber would likely not be
1288 : * modified by replication again. If someone inserted a new row with the same
1289 : * old identifier, replication could stop due to a constraint violation.
1290 : *
1291 : * Let's say the old tuple doesn't match the row filter but the new tuple does.
1292 : * Since the old tuple doesn't satisfy, the initial table synchronization
1293 : * probably didn't copy this row. However, after the UPDATE the new tuple does
1294 : * satisfy the row filter, so from a data consistency perspective, that row
1295 : * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
1296 : * statements have no effect (it matches no row -- see
1297 : * apply_handle_update_internal()). So, the UPDATE should be transformed into a
1298 : * INSERT statement and be sent to the subscriber. However, this might surprise
1299 : * someone who expects the data set to satisfy the row filter expression on the
1300 : * provider.
1301 : */
1302 : static bool
1303 364514 : pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
1304 : TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
1305 : ReorderBufferChangeType *action)
1306 : {
1307 : TupleDesc desc;
1308 : int i;
1309 : bool old_matched,
1310 : new_matched,
1311 : result;
1312 : TupleTableSlot *tmp_new_slot;
1313 364514 : TupleTableSlot *new_slot = *new_slot_ptr;
1314 : ExprContext *ecxt;
1315 : ExprState *filter_exprstate;
1316 :
1317 : /*
1318 : * We need this map to avoid relying on ReorderBufferChangeType enums
1319 : * having specific values.
1320 : */
1321 : static const int map_changetype_pubaction[] = {
1322 : [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
1323 : [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
1324 : [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
1325 : };
1326 :
1327 : Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
1328 : *action == REORDER_BUFFER_CHANGE_UPDATE ||
1329 : *action == REORDER_BUFFER_CHANGE_DELETE);
1330 :
1331 : Assert(new_slot || old_slot);
1332 :
1333 : /* Get the corresponding row filter */
1334 364514 : filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
1335 :
1336 : /* Bail out if there is no row filter */
1337 364514 : if (!filter_exprstate)
1338 364446 : return true;
1339 :
1340 68 : elog(DEBUG3, "table \"%s.%s\" has row filter",
1341 : get_namespace_name(RelationGetNamespace(relation)),
1342 : RelationGetRelationName(relation));
1343 :
1344 68 : ResetPerTupleExprContext(entry->estate);
1345 :
1346 68 : ecxt = GetPerTupleExprContext(entry->estate);
1347 :
1348 : /*
1349 : * For the following occasions where there is only one tuple, we can
1350 : * evaluate the row filter for that tuple and return.
1351 : *
1352 : * For inserts, we only have the new tuple.
1353 : *
1354 : * For updates, we can have only a new tuple when none of the replica
1355 : * identity columns changed and none of those columns have external data
1356 : * but we still need to evaluate the row filter for the new tuple as the
1357 : * existing values of those columns might not match the filter. Also,
1358 : * users can use constant expressions in the row filter, so we anyway need
1359 : * to evaluate it for the new tuple.
1360 : *
1361 : * For deletes, we only have the old tuple.
1362 : */
1363 68 : if (!new_slot || !old_slot)
1364 : {
1365 60 : ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
1366 60 : result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1367 :
1368 60 : return result;
1369 : }
1370 :
1371 : /*
1372 : * Both the old and new tuples must be valid only for updates and need to
1373 : * be checked against the row filter.
1374 : */
1375 : Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
1376 :
1377 8 : slot_getallattrs(new_slot);
1378 8 : slot_getallattrs(old_slot);
1379 :
1380 8 : tmp_new_slot = NULL;
1381 8 : desc = RelationGetDescr(relation);
1382 :
1383 : /*
1384 : * The new tuple might not have all the replica identity columns, in which
1385 : * case it needs to be copied over from the old tuple.
1386 : */
1387 24 : for (i = 0; i < desc->natts; i++)
1388 : {
1389 16 : CompactAttribute *att = TupleDescCompactAttr(desc, i);
1390 :
1391 : /*
1392 : * if the column in the new tuple or old tuple is null, nothing to do
1393 : */
1394 16 : if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
1395 2 : continue;
1396 :
1397 : /*
1398 : * Unchanged toasted replica identity columns are only logged in the
1399 : * old tuple. Copy this over to the new tuple. The changed (or WAL
1400 : * Logged) toast values are always assembled in memory and set as
1401 : * VARTAG_INDIRECT. See ReorderBufferToastReplace.
1402 : */
1403 22 : if (att->attlen == -1 &&
1404 8 : VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(new_slot->tts_values[i])) &&
1405 2 : !VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(old_slot->tts_values[i])))
1406 : {
1407 2 : if (!tmp_new_slot)
1408 : {
1409 2 : tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
1410 2 : ExecClearTuple(tmp_new_slot);
1411 :
1412 2 : memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
1413 2 : desc->natts * sizeof(Datum));
1414 2 : memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
1415 2 : desc->natts * sizeof(bool));
1416 : }
1417 :
1418 2 : tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
1419 2 : tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
1420 : }
1421 : }
1422 :
1423 8 : ecxt->ecxt_scantuple = old_slot;
1424 8 : old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1425 :
1426 8 : if (tmp_new_slot)
1427 : {
1428 2 : ExecStoreVirtualTuple(tmp_new_slot);
1429 2 : ecxt->ecxt_scantuple = tmp_new_slot;
1430 : }
1431 : else
1432 6 : ecxt->ecxt_scantuple = new_slot;
1433 :
1434 8 : new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1435 :
1436 : /*
1437 : * Case 1: if both tuples don't match the row filter, bailout. Send
1438 : * nothing.
1439 : */
1440 8 : if (!old_matched && !new_matched)
1441 0 : return false;
1442 :
1443 : /*
1444 : * Case 2: if the old tuple doesn't satisfy the row filter but the new
1445 : * tuple does, transform the UPDATE into INSERT.
1446 : *
1447 : * Use the newly transformed tuple that must contain the column values for
1448 : * all the replica identity columns. This is required to ensure that the
1449 : * while inserting the tuple in the downstream node, we have all the
1450 : * required column values.
1451 : */
1452 8 : if (!old_matched && new_matched)
1453 : {
1454 4 : *action = REORDER_BUFFER_CHANGE_INSERT;
1455 :
1456 4 : if (tmp_new_slot)
1457 2 : *new_slot_ptr = tmp_new_slot;
1458 : }
1459 :
1460 : /*
1461 : * Case 3: if the old tuple satisfies the row filter but the new tuple
1462 : * doesn't, transform the UPDATE into DELETE.
1463 : *
1464 : * This transformation does not require another tuple. The Old tuple will
1465 : * be used for DELETE.
1466 : */
1467 4 : else if (old_matched && !new_matched)
1468 2 : *action = REORDER_BUFFER_CHANGE_DELETE;
1469 :
1470 : /*
1471 : * Case 4: if both tuples match the row filter, transformation isn't
1472 : * required. (*action is default UPDATE).
1473 : */
1474 :
1475 8 : return true;
1476 : }
1477 :
1478 : /*
1479 : * Sends the decoded DML over wire.
1480 : *
1481 : * This is called both in streaming and non-streaming modes.
1482 : */
1483 : static void
1484 366866 : pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1485 : Relation relation, ReorderBufferChange *change)
1486 : {
1487 366866 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1488 366866 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1489 : MemoryContext old;
1490 : RelationSyncEntry *relentry;
1491 366866 : TransactionId xid = InvalidTransactionId;
1492 366866 : Relation ancestor = NULL;
1493 366866 : Relation targetrel = relation;
1494 366866 : ReorderBufferChangeType action = change->action;
1495 366866 : TupleTableSlot *old_slot = NULL;
1496 366866 : TupleTableSlot *new_slot = NULL;
1497 :
1498 366866 : if (!is_publishable_relation(relation))
1499 2350 : return;
1500 :
1501 : /*
1502 : * Remember the xid for the change in streaming mode. We need to send xid
1503 : * with each change in the streaming mode so that subscriber can make
1504 : * their association and on aborts, it can discard the corresponding
1505 : * changes.
1506 : */
1507 366866 : if (data->in_streaming)
1508 351846 : xid = change->txn->xid;
1509 :
1510 366866 : relentry = get_rel_sync_entry(data, relation);
1511 :
1512 : /* First check the table filter */
1513 366864 : switch (action)
1514 : {
1515 212028 : case REORDER_BUFFER_CHANGE_INSERT:
1516 212028 : if (!relentry->pubactions.pubinsert)
1517 156 : return;
1518 211872 : break;
1519 68970 : case REORDER_BUFFER_CHANGE_UPDATE:
1520 68970 : if (!relentry->pubactions.pubupdate)
1521 88 : return;
1522 68882 : break;
1523 85866 : case REORDER_BUFFER_CHANGE_DELETE:
1524 85866 : if (!relentry->pubactions.pubdelete)
1525 2106 : return;
1526 :
1527 : /*
1528 : * This is only possible if deletes are allowed even when replica
1529 : * identity is not defined for a table. Since the DELETE action
1530 : * can't be published, we simply return.
1531 : */
1532 83760 : if (!change->data.tp.oldtuple)
1533 : {
1534 0 : elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
1535 0 : return;
1536 : }
1537 83760 : break;
1538 364514 : default:
1539 : Assert(false);
1540 : }
1541 :
1542 : /* Avoid leaking memory by using and resetting our own context */
1543 364514 : old = MemoryContextSwitchTo(data->context);
1544 :
1545 : /* Switch relation if publishing via root. */
1546 364514 : if (relentry->publish_as_relid != RelationGetRelid(relation))
1547 : {
1548 : Assert(relation->rd_rel->relispartition);
1549 142 : ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1550 142 : targetrel = ancestor;
1551 : }
1552 :
1553 364514 : if (change->data.tp.oldtuple)
1554 : {
1555 84028 : old_slot = relentry->old_slot;
1556 84028 : ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false);
1557 :
1558 : /* Convert tuple if needed. */
1559 84028 : if (relentry->attrmap)
1560 : {
1561 10 : TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
1562 : &TTSOpsVirtual);
1563 :
1564 10 : old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
1565 : }
1566 : }
1567 :
1568 364514 : if (change->data.tp.newtuple)
1569 : {
1570 280754 : new_slot = relentry->new_slot;
1571 280754 : ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false);
1572 :
1573 : /* Convert tuple if needed. */
1574 280754 : if (relentry->attrmap)
1575 : {
1576 42 : TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
1577 : &TTSOpsVirtual);
1578 :
1579 42 : new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
1580 : }
1581 : }
1582 :
1583 : /*
1584 : * Check row filter.
1585 : *
1586 : * Updates could be transformed to inserts or deletes based on the results
1587 : * of the row filter for old and new tuple.
1588 : */
1589 364514 : if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
1590 24 : goto cleanup;
1591 :
1592 : /*
1593 : * Send BEGIN if we haven't yet.
1594 : *
1595 : * We send the BEGIN message after ensuring that we will actually send the
1596 : * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
1597 : * transactions.
1598 : */
1599 364490 : if (txndata && !txndata->sent_begin_txn)
1600 866 : pgoutput_send_begin(ctx, txn);
1601 :
1602 : /*
1603 : * Schema should be sent using the original relation because it also sends
1604 : * the ancestor's relation.
1605 : */
1606 364488 : maybe_send_schema(ctx, change, relation, relentry);
1607 :
1608 364488 : OutputPluginPrepareWrite(ctx, true);
1609 :
1610 : /* Send the data */
1611 364488 : switch (action)
1612 : {
1613 211850 : case REORDER_BUFFER_CHANGE_INSERT:
1614 211850 : logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
1615 211850 : data->binary, relentry->columns,
1616 : relentry->include_gencols_type);
1617 211850 : break;
1618 68876 : case REORDER_BUFFER_CHANGE_UPDATE:
1619 68876 : logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
1620 68876 : new_slot, data->binary, relentry->columns,
1621 : relentry->include_gencols_type);
1622 68876 : break;
1623 83762 : case REORDER_BUFFER_CHANGE_DELETE:
1624 83762 : logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
1625 83762 : data->binary, relentry->columns,
1626 : relentry->include_gencols_type);
1627 83762 : break;
1628 364488 : default:
1629 : Assert(false);
1630 : }
1631 :
1632 364488 : OutputPluginWrite(ctx, true);
1633 :
1634 364512 : cleanup:
1635 364512 : if (RelationIsValid(ancestor))
1636 : {
1637 140 : RelationClose(ancestor);
1638 140 : ancestor = NULL;
1639 : }
1640 :
1641 : /* Drop the new slots that were used to store the converted tuples. */
1642 364512 : if (relentry->attrmap)
1643 : {
1644 52 : if (old_slot)
1645 10 : ExecDropSingleTupleTableSlot(old_slot);
1646 :
1647 52 : if (new_slot)
1648 42 : ExecDropSingleTupleTableSlot(new_slot);
1649 : }
1650 :
1651 364512 : MemoryContextSwitchTo(old);
1652 364512 : MemoryContextReset(data->context);
1653 : }
1654 :
1655 : static void
1656 36 : pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1657 : int nrelations, Relation relations[], ReorderBufferChange *change)
1658 : {
1659 36 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1660 36 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1661 : MemoryContext old;
1662 : RelationSyncEntry *relentry;
1663 : int i;
1664 : int nrelids;
1665 : Oid *relids;
1666 36 : TransactionId xid = InvalidTransactionId;
1667 :
1668 : /* Remember the xid for the change in streaming mode. See pgoutput_change. */
1669 36 : if (data->in_streaming)
1670 0 : xid = change->txn->xid;
1671 :
1672 36 : old = MemoryContextSwitchTo(data->context);
1673 :
1674 36 : relids = palloc0(nrelations * sizeof(Oid));
1675 36 : nrelids = 0;
1676 :
1677 110 : for (i = 0; i < nrelations; i++)
1678 : {
1679 74 : Relation relation = relations[i];
1680 74 : Oid relid = RelationGetRelid(relation);
1681 :
1682 74 : if (!is_publishable_relation(relation))
1683 0 : continue;
1684 :
1685 74 : relentry = get_rel_sync_entry(data, relation);
1686 :
1687 74 : if (!relentry->pubactions.pubtruncate)
1688 32 : continue;
1689 :
1690 : /*
1691 : * Don't send partitions if the publication wants to send only the
1692 : * root tables through it.
1693 : */
1694 42 : if (relation->rd_rel->relispartition &&
1695 30 : relentry->publish_as_relid != relid)
1696 6 : continue;
1697 :
1698 36 : relids[nrelids++] = relid;
1699 :
1700 : /* Send BEGIN if we haven't yet */
1701 36 : if (txndata && !txndata->sent_begin_txn)
1702 24 : pgoutput_send_begin(ctx, txn);
1703 :
1704 36 : maybe_send_schema(ctx, change, relation, relentry);
1705 : }
1706 :
1707 36 : if (nrelids > 0)
1708 : {
1709 24 : OutputPluginPrepareWrite(ctx, true);
1710 24 : logicalrep_write_truncate(ctx->out,
1711 : xid,
1712 : nrelids,
1713 : relids,
1714 24 : change->data.truncate.cascade,
1715 24 : change->data.truncate.restart_seqs);
1716 24 : OutputPluginWrite(ctx, true);
1717 : }
1718 :
1719 36 : MemoryContextSwitchTo(old);
1720 36 : MemoryContextReset(data->context);
1721 36 : }
1722 :
1723 : static void
1724 14 : pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1725 : XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
1726 : const char *message)
1727 : {
1728 14 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1729 14 : TransactionId xid = InvalidTransactionId;
1730 :
1731 14 : if (!data->messages)
1732 4 : return;
1733 :
1734 : /*
1735 : * Remember the xid for the message in streaming mode. See
1736 : * pgoutput_change.
1737 : */
1738 10 : if (data->in_streaming)
1739 0 : xid = txn->xid;
1740 :
1741 : /*
1742 : * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
1743 : */
1744 10 : if (transactional)
1745 : {
1746 4 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1747 :
1748 : /* Send BEGIN if we haven't yet */
1749 4 : if (txndata && !txndata->sent_begin_txn)
1750 4 : pgoutput_send_begin(ctx, txn);
1751 : }
1752 :
1753 10 : OutputPluginPrepareWrite(ctx, true);
1754 10 : logicalrep_write_message(ctx->out,
1755 : xid,
1756 : message_lsn,
1757 : transactional,
1758 : prefix,
1759 : sz,
1760 : message);
1761 10 : OutputPluginWrite(ctx, true);
1762 : }
1763 :
1764 : /*
1765 : * Return true if the data is associated with an origin and the user has
1766 : * requested the changes that don't have an origin, false otherwise.
1767 : */
1768 : static bool
1769 984098 : pgoutput_origin_filter(LogicalDecodingContext *ctx,
1770 : RepOriginId origin_id)
1771 : {
1772 984098 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1773 :
1774 984098 : if (data->publish_no_origin && origin_id != InvalidRepOriginId)
1775 328 : return true;
1776 :
1777 983770 : return false;
1778 : }
1779 :
1780 : /*
1781 : * Shutdown the output plugin.
1782 : *
1783 : * Note, we don't need to clean the data->context, data->cachectx, and
1784 : * data->pubctx as they are child contexts of the ctx->context so they
1785 : * will be cleaned up by logical decoding machinery.
1786 : */
1787 : static void
1788 1066 : pgoutput_shutdown(LogicalDecodingContext *ctx)
1789 : {
1790 1066 : pgoutput_memory_context_reset(NULL);
1791 1066 : }
1792 :
1793 : /*
1794 : * Load publications from the list of publication names.
1795 : *
1796 : * Here, we skip the publications that don't exist yet. This will allow us
1797 : * to silently continue the replication in the absence of a missing publication.
1798 : * This is required because we allow the users to create publications after they
1799 : * have specified the required publications at the time of replication start.
1800 : */
1801 : static List *
1802 392 : LoadPublications(List *pubnames)
1803 : {
1804 392 : List *result = NIL;
1805 : ListCell *lc;
1806 :
1807 888 : foreach(lc, pubnames)
1808 : {
1809 496 : char *pubname = (char *) lfirst(lc);
1810 496 : Publication *pub = GetPublicationByName(pubname, true);
1811 :
1812 496 : if (pub)
1813 492 : result = lappend(result, pub);
1814 : else
1815 4 : ereport(WARNING,
1816 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1817 : errmsg("skipped loading publication \"%s\"", pubname),
1818 : errdetail("The publication does not exist at this point in the WAL."),
1819 : errhint("Create the publication if it does not exist."));
1820 : }
1821 :
1822 392 : return result;
1823 : }
1824 :
1825 : /*
1826 : * Publication syscache invalidation callback.
1827 : *
1828 : * Called for invalidations on pg_publication.
1829 : */
1830 : static void
1831 654 : publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
1832 : {
1833 654 : publications_valid = false;
1834 654 : }
1835 :
1836 : /*
1837 : * START STREAM callback
1838 : */
1839 : static void
1840 1238 : pgoutput_stream_start(struct LogicalDecodingContext *ctx,
1841 : ReorderBufferTXN *txn)
1842 : {
1843 1238 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1844 1238 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
1845 :
1846 : /* we can't nest streaming of transactions */
1847 : Assert(!data->in_streaming);
1848 :
1849 : /*
1850 : * If we already sent the first stream for this transaction then don't
1851 : * send the origin id in the subsequent streams.
1852 : */
1853 1238 : if (rbtxn_is_streamed(txn))
1854 1116 : send_replication_origin = false;
1855 :
1856 1238 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
1857 1238 : logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
1858 :
1859 1238 : send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
1860 : send_replication_origin);
1861 :
1862 1238 : OutputPluginWrite(ctx, true);
1863 :
1864 : /* we're streaming a chunk of transaction now */
1865 1238 : data->in_streaming = true;
1866 1238 : }
1867 :
1868 : /*
1869 : * STOP STREAM callback
1870 : */
1871 : static void
1872 1238 : pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
1873 : ReorderBufferTXN *txn)
1874 : {
1875 1238 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1876 :
1877 : /* we should be streaming a transaction */
1878 : Assert(data->in_streaming);
1879 :
1880 1238 : OutputPluginPrepareWrite(ctx, true);
1881 1238 : logicalrep_write_stream_stop(ctx->out);
1882 1238 : OutputPluginWrite(ctx, true);
1883 :
1884 : /* we've stopped streaming a transaction */
1885 1238 : data->in_streaming = false;
1886 1238 : }
1887 :
1888 : /*
1889 : * Notify downstream to discard the streamed transaction (along with all
1890 : * its subtransactions, if it's a toplevel transaction).
1891 : */
1892 : static void
1893 52 : pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
1894 : ReorderBufferTXN *txn,
1895 : XLogRecPtr abort_lsn)
1896 : {
1897 : ReorderBufferTXN *toptxn;
1898 52 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1899 52 : bool write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
1900 :
1901 : /*
1902 : * The abort should happen outside streaming block, even for streamed
1903 : * transactions. The transaction has to be marked as streamed, though.
1904 : */
1905 : Assert(!data->in_streaming);
1906 :
1907 : /* determine the toplevel transaction */
1908 52 : toptxn = rbtxn_get_toptxn(txn);
1909 :
1910 : Assert(rbtxn_is_streamed(toptxn));
1911 :
1912 52 : OutputPluginPrepareWrite(ctx, true);
1913 52 : logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
1914 : txn->abort_time, write_abort_info);
1915 :
1916 52 : OutputPluginWrite(ctx, true);
1917 :
1918 52 : cleanup_rel_sync_cache(toptxn->xid, false);
1919 52 : }
1920 :
1921 : /*
1922 : * Notify downstream to apply the streamed transaction (along with all
1923 : * its subtransactions).
1924 : */
1925 : static void
1926 90 : pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
1927 : ReorderBufferTXN *txn,
1928 : XLogRecPtr commit_lsn)
1929 : {
1930 90 : PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private;
1931 :
1932 : /*
1933 : * The commit should happen outside streaming block, even for streamed
1934 : * transactions. The transaction has to be marked as streamed, though.
1935 : */
1936 : Assert(!data->in_streaming);
1937 : Assert(rbtxn_is_streamed(txn));
1938 :
1939 90 : OutputPluginUpdateProgress(ctx, false);
1940 :
1941 90 : OutputPluginPrepareWrite(ctx, true);
1942 90 : logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
1943 90 : OutputPluginWrite(ctx, true);
1944 :
1945 90 : cleanup_rel_sync_cache(txn->xid, true);
1946 90 : }
1947 :
1948 : /*
1949 : * PREPARE callback (for streaming two-phase commit).
1950 : *
1951 : * Notify the downstream to prepare the transaction.
1952 : */
1953 : static void
1954 26 : pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
1955 : ReorderBufferTXN *txn,
1956 : XLogRecPtr prepare_lsn)
1957 : {
1958 : Assert(rbtxn_is_streamed(txn));
1959 :
1960 26 : OutputPluginUpdateProgress(ctx, false);
1961 26 : OutputPluginPrepareWrite(ctx, true);
1962 26 : logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
1963 26 : OutputPluginWrite(ctx, true);
1964 26 : }
1965 :
1966 : /*
1967 : * Initialize the relation schema sync cache for a decoding session.
1968 : *
1969 : * The hash table is destroyed at the end of a decoding session. While
1970 : * relcache invalidations still exist and will still be invoked, they
1971 : * will just see the null hash table global and take no action.
1972 : */
1973 : static void
1974 798 : init_rel_sync_cache(MemoryContext cachectx)
1975 : {
1976 : HASHCTL ctl;
1977 : static bool relation_callbacks_registered = false;
1978 :
1979 : /* Nothing to do if hash table already exists */
1980 798 : if (RelationSyncCache != NULL)
1981 4 : return;
1982 :
1983 : /* Make a new hash table for the cache */
1984 798 : ctl.keysize = sizeof(Oid);
1985 798 : ctl.entrysize = sizeof(RelationSyncEntry);
1986 798 : ctl.hcxt = cachectx;
1987 :
1988 798 : RelationSyncCache = hash_create("logical replication output relation cache",
1989 : 128, &ctl,
1990 : HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
1991 :
1992 : Assert(RelationSyncCache != NULL);
1993 :
1994 : /* No more to do if we already registered callbacks */
1995 798 : if (relation_callbacks_registered)
1996 4 : return;
1997 :
1998 : /* We must update the cache entry for a relation after a relcache flush */
1999 794 : CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
2000 :
2001 : /*
2002 : * Flush all cache entries after a pg_namespace change, in case it was a
2003 : * schema rename affecting a relation being replicated.
2004 : *
2005 : * XXX: It is not a good idea to invalidate all the relation entries in
2006 : * RelationSyncCache on schema rename. We can optimize it to invalidate
2007 : * only the required relations by either having a specific invalidation
2008 : * message containing impacted relations or by having schema information
2009 : * in each RelationSyncCache entry and using hashvalue of pg_namespace.oid
2010 : * passed to the callback.
2011 : */
2012 794 : CacheRegisterSyscacheCallback(NAMESPACEOID,
2013 : rel_sync_cache_publication_cb,
2014 : (Datum) 0);
2015 :
2016 794 : relation_callbacks_registered = true;
2017 : }
2018 :
2019 : /*
2020 : * We expect relatively small number of streamed transactions.
2021 : */
2022 : static bool
2023 351846 : get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
2024 : {
2025 351846 : return list_member_xid(entry->streamed_txns, xid);
2026 : }
2027 :
2028 : /*
2029 : * Add the xid in the rel sync entry for which we have already sent the schema
2030 : * of the relation.
2031 : */
2032 : static void
2033 138 : set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
2034 : {
2035 : MemoryContext oldctx;
2036 :
2037 138 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
2038 :
2039 138 : entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
2040 :
2041 138 : MemoryContextSwitchTo(oldctx);
2042 138 : }
2043 :
2044 : /*
2045 : * Find or create entry in the relation schema cache.
2046 : *
2047 : * This looks up publications that the given relation is directly or
2048 : * indirectly part of (the latter if it's really the relation's ancestor that
2049 : * is part of a publication) and fills up the found entry with the information
2050 : * about which operations to publish and whether to use an ancestor's schema
2051 : * when publishing.
2052 : */
2053 : static RelationSyncEntry *
2054 366940 : get_rel_sync_entry(PGOutputData *data, Relation relation)
2055 : {
2056 : RelationSyncEntry *entry;
2057 : bool found;
2058 : MemoryContext oldctx;
2059 366940 : Oid relid = RelationGetRelid(relation);
2060 :
2061 : Assert(RelationSyncCache != NULL);
2062 :
2063 : /* Find cached relation info, creating if not found */
2064 366940 : entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
2065 : &relid,
2066 : HASH_ENTER, &found);
2067 : Assert(entry != NULL);
2068 :
2069 : /* initialize entry, if it's new */
2070 366940 : if (!found)
2071 : {
2072 580 : entry->replicate_valid = false;
2073 580 : entry->schema_sent = false;
2074 580 : entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
2075 580 : entry->streamed_txns = NIL;
2076 580 : entry->pubactions.pubinsert = entry->pubactions.pubupdate =
2077 580 : entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
2078 580 : entry->new_slot = NULL;
2079 580 : entry->old_slot = NULL;
2080 580 : memset(entry->exprstate, 0, sizeof(entry->exprstate));
2081 580 : entry->entry_cxt = NULL;
2082 580 : entry->publish_as_relid = InvalidOid;
2083 580 : entry->columns = NULL;
2084 580 : entry->attrmap = NULL;
2085 : }
2086 :
2087 : /* Validate the entry */
2088 366940 : if (!entry->replicate_valid)
2089 : {
2090 746 : Oid schemaId = get_rel_namespace(relid);
2091 746 : List *pubids = GetRelationPublications(relid);
2092 :
2093 : /*
2094 : * We don't acquire a lock on the namespace system table as we build
2095 : * the cache entry using a historic snapshot and all the later changes
2096 : * are absorbed while decoding WAL.
2097 : */
2098 746 : List *schemaPubids = GetSchemaPublications(schemaId);
2099 : ListCell *lc;
2100 746 : Oid publish_as_relid = relid;
2101 746 : int publish_ancestor_level = 0;
2102 746 : bool am_partition = get_rel_relispartition(relid);
2103 746 : char relkind = get_rel_relkind(relid);
2104 746 : List *rel_publications = NIL;
2105 :
2106 : /* Reload publications if needed before use. */
2107 746 : if (!publications_valid)
2108 : {
2109 392 : MemoryContextReset(data->pubctx);
2110 :
2111 392 : oldctx = MemoryContextSwitchTo(data->pubctx);
2112 392 : data->publications = LoadPublications(data->publication_names);
2113 392 : MemoryContextSwitchTo(oldctx);
2114 392 : publications_valid = true;
2115 : }
2116 :
2117 : /*
2118 : * Reset schema_sent status as the relation definition may have
2119 : * changed. Also reset pubactions to empty in case rel was dropped
2120 : * from a publication. Also free any objects that depended on the
2121 : * earlier definition.
2122 : */
2123 746 : entry->schema_sent = false;
2124 746 : entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
2125 746 : list_free(entry->streamed_txns);
2126 746 : entry->streamed_txns = NIL;
2127 746 : bms_free(entry->columns);
2128 746 : entry->columns = NULL;
2129 746 : entry->pubactions.pubinsert = false;
2130 746 : entry->pubactions.pubupdate = false;
2131 746 : entry->pubactions.pubdelete = false;
2132 746 : entry->pubactions.pubtruncate = false;
2133 :
2134 : /*
2135 : * Tuple slots cleanups. (Will be rebuilt later if needed).
2136 : */
2137 746 : if (entry->old_slot)
2138 : {
2139 122 : TupleDesc desc = entry->old_slot->tts_tupleDescriptor;
2140 :
2141 : Assert(desc->tdrefcount == -1);
2142 :
2143 122 : ExecDropSingleTupleTableSlot(entry->old_slot);
2144 :
2145 : /*
2146 : * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
2147 : * do it now to avoid any leaks.
2148 : */
2149 122 : FreeTupleDesc(desc);
2150 : }
2151 746 : if (entry->new_slot)
2152 : {
2153 122 : TupleDesc desc = entry->new_slot->tts_tupleDescriptor;
2154 :
2155 : Assert(desc->tdrefcount == -1);
2156 :
2157 122 : ExecDropSingleTupleTableSlot(entry->new_slot);
2158 :
2159 : /*
2160 : * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
2161 : * do it now to avoid any leaks.
2162 : */
2163 122 : FreeTupleDesc(desc);
2164 : }
2165 :
2166 746 : entry->old_slot = NULL;
2167 746 : entry->new_slot = NULL;
2168 :
2169 746 : if (entry->attrmap)
2170 6 : free_attrmap(entry->attrmap);
2171 746 : entry->attrmap = NULL;
2172 :
2173 : /*
2174 : * Row filter cache cleanups.
2175 : */
2176 746 : if (entry->entry_cxt)
2177 122 : MemoryContextDelete(entry->entry_cxt);
2178 :
2179 746 : entry->entry_cxt = NULL;
2180 746 : entry->estate = NULL;
2181 746 : memset(entry->exprstate, 0, sizeof(entry->exprstate));
2182 :
2183 : /*
2184 : * Build publication cache. We can't use one provided by relcache as
2185 : * relcache considers all publications that the given relation is in,
2186 : * but here we only need to consider ones that the subscriber
2187 : * requested.
2188 : */
2189 1838 : foreach(lc, data->publications)
2190 : {
2191 1092 : Publication *pub = lfirst(lc);
2192 1092 : bool publish = false;
2193 :
2194 : /*
2195 : * Under what relid should we publish changes in this publication?
2196 : * We'll use the top-most relid across all publications. Also
2197 : * track the ancestor level for this publication.
2198 : */
2199 1092 : Oid pub_relid = relid;
2200 1092 : int ancestor_level = 0;
2201 :
2202 : /*
2203 : * If this is a FOR ALL TABLES publication, pick the partition
2204 : * root and set the ancestor level accordingly.
2205 : */
2206 1092 : if (pub->alltables)
2207 : {
2208 166 : publish = true;
2209 166 : if (pub->pubviaroot && am_partition)
2210 : {
2211 28 : List *ancestors = get_partition_ancestors(relid);
2212 :
2213 28 : pub_relid = llast_oid(ancestors);
2214 28 : ancestor_level = list_length(ancestors);
2215 : }
2216 : }
2217 :
2218 1092 : if (!publish)
2219 : {
2220 926 : bool ancestor_published = false;
2221 :
2222 : /*
2223 : * For a partition, check if any of the ancestors are
2224 : * published. If so, note down the topmost ancestor that is
2225 : * published via this publication, which will be used as the
2226 : * relation via which to publish the partition's changes.
2227 : */
2228 926 : if (am_partition)
2229 : {
2230 : Oid ancestor;
2231 : int level;
2232 242 : List *ancestors = get_partition_ancestors(relid);
2233 :
2234 242 : ancestor = GetTopMostAncestorInPublication(pub->oid,
2235 : ancestors,
2236 : &level);
2237 :
2238 242 : if (ancestor != InvalidOid)
2239 : {
2240 96 : ancestor_published = true;
2241 96 : if (pub->pubviaroot)
2242 : {
2243 50 : pub_relid = ancestor;
2244 50 : ancestor_level = level;
2245 : }
2246 : }
2247 : }
2248 :
2249 1452 : if (list_member_oid(pubids, pub->oid) ||
2250 1040 : list_member_oid(schemaPubids, pub->oid) ||
2251 : ancestor_published)
2252 468 : publish = true;
2253 : }
2254 :
2255 : /*
2256 : * If the relation is to be published, determine actions to
2257 : * publish, and list of columns, if appropriate.
2258 : *
2259 : * Don't publish changes for partitioned tables, because
2260 : * publishing those of its partitions suffices, unless partition
2261 : * changes won't be published due to pubviaroot being set.
2262 : */
2263 1092 : if (publish &&
2264 8 : (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
2265 : {
2266 628 : entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
2267 628 : entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
2268 628 : entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
2269 628 : entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
2270 :
2271 : /*
2272 : * We want to publish the changes as the top-most ancestor
2273 : * across all publications. So we need to check if the already
2274 : * calculated level is higher than the new one. If yes, we can
2275 : * ignore the new value (as it's a child). Otherwise the new
2276 : * value is an ancestor, so we keep it.
2277 : */
2278 628 : if (publish_ancestor_level > ancestor_level)
2279 2 : continue;
2280 :
2281 : /*
2282 : * If we found an ancestor higher up in the tree, discard the
2283 : * list of publications through which we replicate it, and use
2284 : * the new ancestor.
2285 : */
2286 626 : if (publish_ancestor_level < ancestor_level)
2287 : {
2288 78 : publish_as_relid = pub_relid;
2289 78 : publish_ancestor_level = ancestor_level;
2290 :
2291 : /* reset the publication list for this relation */
2292 78 : rel_publications = NIL;
2293 : }
2294 : else
2295 : {
2296 : /* Same ancestor level, has to be the same OID. */
2297 : Assert(publish_as_relid == pub_relid);
2298 : }
2299 :
2300 : /* Track publications for this ancestor. */
2301 626 : rel_publications = lappend(rel_publications, pub);
2302 : }
2303 : }
2304 :
2305 746 : entry->publish_as_relid = publish_as_relid;
2306 :
2307 : /*
2308 : * Initialize the tuple slot, map, and row filter. These are only used
2309 : * when publishing inserts, updates, or deletes.
2310 : */
2311 746 : if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
2312 146 : entry->pubactions.pubdelete)
2313 : {
2314 : /* Initialize the tuple slot and map */
2315 600 : init_tuple_slot(data, relation, entry);
2316 :
2317 : /* Initialize the row filter */
2318 600 : pgoutput_row_filter_init(data, rel_publications, entry);
2319 :
2320 : /* Check whether to publish generated columns. */
2321 600 : check_and_init_gencol(data, rel_publications, entry);
2322 :
2323 : /* Initialize the column list */
2324 600 : pgoutput_column_list_init(data, rel_publications, entry);
2325 : }
2326 :
2327 744 : list_free(pubids);
2328 744 : list_free(schemaPubids);
2329 744 : list_free(rel_publications);
2330 :
2331 744 : entry->replicate_valid = true;
2332 : }
2333 :
2334 366938 : return entry;
2335 : }
2336 :
2337 : /*
2338 : * Cleanup list of streamed transactions and update the schema_sent flag.
2339 : *
2340 : * When a streamed transaction commits or aborts, we need to remove the
2341 : * toplevel XID from the schema cache. If the transaction aborted, the
2342 : * subscriber will simply throw away the schema records we streamed, so
2343 : * we don't need to do anything else.
2344 : *
2345 : * If the transaction is committed, the subscriber will update the relation
2346 : * cache - so tweak the schema_sent flag accordingly.
2347 : */
2348 : static void
2349 142 : cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
2350 : {
2351 : HASH_SEQ_STATUS hash_seq;
2352 : RelationSyncEntry *entry;
2353 :
2354 : Assert(RelationSyncCache != NULL);
2355 :
2356 142 : hash_seq_init(&hash_seq, RelationSyncCache);
2357 290 : while ((entry = hash_seq_search(&hash_seq)) != NULL)
2358 : {
2359 : /*
2360 : * We can set the schema_sent flag for an entry that has committed xid
2361 : * in the list as that ensures that the subscriber would have the
2362 : * corresponding schema and we don't need to send it unless there is
2363 : * any invalidation for that relation.
2364 : */
2365 334 : foreach_xid(streamed_txn, entry->streamed_txns)
2366 : {
2367 144 : if (xid == streamed_txn)
2368 : {
2369 106 : if (is_commit)
2370 84 : entry->schema_sent = true;
2371 :
2372 106 : entry->streamed_txns =
2373 106 : foreach_delete_current(entry->streamed_txns, streamed_txn);
2374 106 : break;
2375 : }
2376 : }
2377 : }
2378 142 : }
2379 :
2380 : /*
2381 : * Relcache invalidation callback
2382 : */
2383 : static void
2384 8070 : rel_sync_cache_relation_cb(Datum arg, Oid relid)
2385 : {
2386 : RelationSyncEntry *entry;
2387 :
2388 : /*
2389 : * We can get here if the plugin was used in SQL interface as the
2390 : * RelationSyncCache is destroyed when the decoding finishes, but there is
2391 : * no way to unregister the relcache invalidation callback.
2392 : */
2393 8070 : if (RelationSyncCache == NULL)
2394 52 : return;
2395 :
2396 : /*
2397 : * Nobody keeps pointers to entries in this hash table around outside
2398 : * logical decoding callback calls - but invalidation events can come in
2399 : * *during* a callback if we do any syscache access in the callback.
2400 : * Because of that we must mark the cache entry as invalid but not damage
2401 : * any of its substructure here. The next get_rel_sync_entry() call will
2402 : * rebuild it all.
2403 : */
2404 8018 : if (OidIsValid(relid))
2405 : {
2406 : /*
2407 : * Getting invalidations for relations that aren't in the table is
2408 : * entirely normal. So we don't care if it's found or not.
2409 : */
2410 7900 : entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
2411 : HASH_FIND, NULL);
2412 7900 : if (entry != NULL)
2413 1332 : entry->replicate_valid = false;
2414 : }
2415 : else
2416 : {
2417 : /* Whole cache must be flushed. */
2418 : HASH_SEQ_STATUS status;
2419 :
2420 118 : hash_seq_init(&status, RelationSyncCache);
2421 240 : while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2422 : {
2423 122 : entry->replicate_valid = false;
2424 : }
2425 : }
2426 : }
2427 :
2428 : /*
2429 : * Publication relation/schema map syscache invalidation callback
2430 : *
2431 : * Called for invalidations on pg_namespace.
2432 : */
2433 : static void
2434 68 : rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
2435 : {
2436 : HASH_SEQ_STATUS status;
2437 : RelationSyncEntry *entry;
2438 :
2439 : /*
2440 : * We can get here if the plugin was used in SQL interface as the
2441 : * RelationSyncCache is destroyed when the decoding finishes, but there is
2442 : * no way to unregister the invalidation callbacks.
2443 : */
2444 68 : if (RelationSyncCache == NULL)
2445 20 : return;
2446 :
2447 : /*
2448 : * We have no easy way to identify which cache entries this invalidation
2449 : * event might have affected, so just mark them all invalid.
2450 : */
2451 48 : hash_seq_init(&status, RelationSyncCache);
2452 86 : while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2453 : {
2454 38 : entry->replicate_valid = false;
2455 : }
2456 : }
2457 :
2458 : /* Send Replication origin */
2459 : static void
2460 2172 : send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
2461 : XLogRecPtr origin_lsn, bool send_origin)
2462 : {
2463 2172 : if (send_origin)
2464 : {
2465 : char *origin;
2466 :
2467 : /*----------
2468 : * XXX: which behaviour do we want here?
2469 : *
2470 : * Alternatives:
2471 : * - don't send origin message if origin name not found
2472 : * (that's what we do now)
2473 : * - throw error - that will break replication, not good
2474 : * - send some special "unknown" origin
2475 : *----------
2476 : */
2477 18 : if (replorigin_by_oid(origin_id, true, &origin))
2478 : {
2479 : /* Message boundary */
2480 18 : OutputPluginWrite(ctx, false);
2481 18 : OutputPluginPrepareWrite(ctx, true);
2482 :
2483 18 : logicalrep_write_origin(ctx->out, origin, origin_lsn);
2484 : }
2485 : }
2486 2172 : }
|