Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pgoutput.c
4 : * Logical Replication output plugin
5 : *
6 : * Copyright (c) 2012-2025, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/pgoutput/pgoutput.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "access/tupconvert.h"
16 : #include "catalog/partition.h"
17 : #include "catalog/pg_publication.h"
18 : #include "catalog/pg_publication_rel.h"
19 : #include "catalog/pg_subscription.h"
20 : #include "commands/defrem.h"
21 : #include "commands/subscriptioncmds.h"
22 : #include "executor/executor.h"
23 : #include "fmgr.h"
24 : #include "nodes/makefuncs.h"
25 : #include "parser/parse_relation.h"
26 : #include "replication/logical.h"
27 : #include "replication/logicalproto.h"
28 : #include "replication/origin.h"
29 : #include "replication/pgoutput.h"
30 : #include "rewrite/rewriteHandler.h"
31 : #include "utils/builtins.h"
32 : #include "utils/inval.h"
33 : #include "utils/lsyscache.h"
34 : #include "utils/memutils.h"
35 : #include "utils/rel.h"
36 : #include "utils/syscache.h"
37 : #include "utils/varlena.h"
38 :
39 1036 : PG_MODULE_MAGIC_EXT(
40 : .name = "pgoutput",
41 : .version = PG_VERSION
42 : );
43 :
44 : static void pgoutput_startup(LogicalDecodingContext *ctx,
45 : OutputPluginOptions *opt, bool is_init);
46 : static void pgoutput_shutdown(LogicalDecodingContext *ctx);
47 : static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
48 : ReorderBufferTXN *txn);
49 : static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
50 : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
51 : static void pgoutput_change(LogicalDecodingContext *ctx,
52 : ReorderBufferTXN *txn, Relation relation,
53 : ReorderBufferChange *change);
54 : static void pgoutput_truncate(LogicalDecodingContext *ctx,
55 : ReorderBufferTXN *txn, int nrelations, Relation relations[],
56 : ReorderBufferChange *change);
57 : static void pgoutput_message(LogicalDecodingContext *ctx,
58 : ReorderBufferTXN *txn, XLogRecPtr message_lsn,
59 : bool transactional, const char *prefix,
60 : Size sz, const char *message);
61 : static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
62 : RepOriginId origin_id);
63 : static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
64 : ReorderBufferTXN *txn);
65 : static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
66 : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
67 : static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
68 : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
69 : static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
70 : ReorderBufferTXN *txn,
71 : XLogRecPtr prepare_end_lsn,
72 : TimestampTz prepare_time);
73 : static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
74 : ReorderBufferTXN *txn);
75 : static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
76 : ReorderBufferTXN *txn);
77 : static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
78 : ReorderBufferTXN *txn,
79 : XLogRecPtr abort_lsn);
80 : static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
81 : ReorderBufferTXN *txn,
82 : XLogRecPtr commit_lsn);
83 : static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
84 : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
85 :
86 : static bool publications_valid;
87 :
88 : static List *LoadPublications(List *pubnames);
89 : static void publication_invalidation_cb(Datum arg, int cacheid,
90 : uint32 hashvalue);
91 : static void send_repl_origin(LogicalDecodingContext *ctx,
92 : RepOriginId origin_id, XLogRecPtr origin_lsn,
93 : bool send_origin);
94 :
/*
 * Only 3 publication actions are used for row filtering ("insert", "update",
 * "delete"). See RelationSyncEntry.exprstate[].
 */
enum RowFilterPubAction
{
	PUBACTION_INSERT,
	PUBACTION_UPDATE,
	PUBACTION_DELETE,
};

/* Number of row-filter publication actions; sizes RelationSyncEntry.exprstate[] */
#define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
107 :
/*
 * Entry in the map used to remember which relation schemas we sent.
 *
 * The schema_sent flag determines if the current schema record for the
 * relation (and for its ancestor if publish_as_relid is set) was already
 * sent to the subscriber (in which case we don't need to send it again).
 *
 * The schema cache on downstream is however updated only at commit time,
 * and with streamed transactions the commit order may be different from
 * the order the transactions are sent in. Also, the (sub) transactions
 * might get aborted so we need to send the schema for each (sub) transaction
 * so that we don't lose the schema information on abort. For handling this,
 * we maintain the list of xids (streamed_txns) for those we have already sent
 * the schema.
 *
 * For partitions, 'pubactions' considers not only the table's own
 * publications, but also those of all of its ancestors.
 */
typedef struct RelationSyncEntry
{
	Oid			relid;			/* relation oid */

	bool		replicate_valid;	/* overall validity flag for entry */

	/* true once the schema message was sent for a non-streamed txn */
	bool		schema_sent;

	/*
	 * This will be PUBLISH_GENCOLS_STORED if the relation contains generated
	 * columns and the 'publish_generated_columns' parameter is set to
	 * PUBLISH_GENCOLS_STORED. Otherwise, it will be PUBLISH_GENCOLS_NONE,
	 * indicating that no generated columns should be published, unless
	 * explicitly specified in the column list.
	 */
	PublishGencolsType include_gencols_type;
	List	   *streamed_txns;	/* streamed toplevel transactions with this
								 * schema */

	/* are we publishing this rel? */
	PublicationActions pubactions;

	/*
	 * ExprState array for row filter. Different publication actions don't
	 * allow multiple expressions to always be combined into one, because
	 * updates or deletes restrict the column in expression to be part of the
	 * replica identity index whereas inserts do not have this restriction, so
	 * there is one ExprState per publication action.
	 */
	ExprState  *exprstate[NUM_ROWFILTER_PUBACTIONS];
	EState	   *estate;			/* executor state used for row filter */
	TupleTableSlot *new_slot;	/* slot for storing new tuple */
	TupleTableSlot *old_slot;	/* slot for storing old tuple */

	/*
	 * OID of the relation to publish changes as.  For a partition, this may
	 * be set to one of its ancestors whose schema will be used when
	 * replicating changes, if publish_via_partition_root is set for the
	 * publication.
	 */
	Oid			publish_as_relid;

	/*
	 * Map used when replicating using an ancestor's schema to convert tuples
	 * from partition's type to the ancestor's; NULL if publish_as_relid is
	 * same as 'relid' or if unnecessary due to partition and the ancestor
	 * having identical TupleDesc.
	 */
	AttrMap    *attrmap;

	/*
	 * Columns included in the publication, or NULL if all columns are
	 * included implicitly.  Note that the attnums in this bitmap are not
	 * shifted by FirstLowInvalidHeapAttributeNumber.
	 */
	Bitmapset  *columns;

	/*
	 * Private context to store additional data for this entry - state for
	 * the row filter expressions, column list, etc.
	 */
	MemoryContext entry_cxt;
} RelationSyncEntry;
189 :
/*
 * Maintain a per-transaction level variable to track whether the transaction
 * has sent BEGIN. BEGIN is only sent when the first change in a transaction
 * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
 * messages for empty transactions which saves network bandwidth.
 *
 * This optimization is not used for prepared transactions because if the
 * WALSender restarts after prepare of a transaction and before commit prepared
 * of the same transaction then we won't be able to figure out if we have
 * skipped sending BEGIN/PREPARE of a transaction as it was empty. This is
 * because we would have lost the in-memory txndata information that was
 * present prior to the restart. This will result in sending a spurious
 * COMMIT PREPARED without a corresponding prepared transaction at the
 * downstream which would lead to an error when it tries to process it.
 *
 * XXX We could achieve this optimization by changing protocol to send
 * additional information so that downstream can detect that the corresponding
 * prepare has not been sent. However, adding such a check for every
 * transaction in the downstream could be costly so we might want to do it
 * optionally.
 *
 * We also don't have this optimization for streamed transactions because
 * they can contain prepared transactions.
 */
typedef struct PGOutputTxnData
{
	/* flag indicating whether BEGIN has been sent for this transaction */
	bool		sent_begin_txn;
} PGOutputTxnData;
218 :
219 : /* Map used to remember which relation schemas we sent. */
220 : static HTAB *RelationSyncCache = NULL;
221 :
222 : static void init_rel_sync_cache(MemoryContext cachectx);
223 : static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
224 : static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
225 : Relation relation);
226 : static void send_relation_and_attrs(Relation relation, TransactionId xid,
227 : LogicalDecodingContext *ctx,
228 : RelationSyncEntry *relentry);
229 : static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
230 : static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
231 : uint32 hashvalue);
232 : static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
233 : TransactionId xid);
234 : static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
235 : TransactionId xid);
236 : static void init_tuple_slot(PGOutputData *data, Relation relation,
237 : RelationSyncEntry *entry);
238 :
239 : /* row filter routines */
240 : static EState *create_estate_for_relation(Relation rel);
241 : static void pgoutput_row_filter_init(PGOutputData *data,
242 : List *publications,
243 : RelationSyncEntry *entry);
244 : static bool pgoutput_row_filter_exec_expr(ExprState *state,
245 : ExprContext *econtext);
246 : static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
247 : TupleTableSlot **new_slot_ptr,
248 : RelationSyncEntry *entry,
249 : ReorderBufferChangeType *action);
250 :
251 : /* column list routines */
252 : static void pgoutput_column_list_init(PGOutputData *data,
253 : List *publications,
254 : RelationSyncEntry *entry);
255 :
/*
 * Specify output plugin callbacks
 *
 * Entry point called by the logical decoding framework when the plugin is
 * loaded; wires every supported callback into the framework's table.
 */
void
_PG_output_plugin_init(OutputPluginCallbacks *cb)
{
	/* basic transaction lifecycle and change callbacks */
	cb->startup_cb = pgoutput_startup;
	cb->begin_cb = pgoutput_begin_txn;
	cb->change_cb = pgoutput_change;
	cb->truncate_cb = pgoutput_truncate;
	cb->message_cb = pgoutput_message;
	cb->commit_cb = pgoutput_commit_txn;

	/* two-phase commit callbacks */
	cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
	cb->prepare_cb = pgoutput_prepare_txn;
	cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
	cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
	cb->filter_by_origin_cb = pgoutput_origin_filter;
	cb->shutdown_cb = pgoutput_shutdown;

	/* transaction streaming */
	cb->stream_start_cb = pgoutput_stream_start;
	cb->stream_stop_cb = pgoutput_stream_stop;
	cb->stream_abort_cb = pgoutput_stream_abort;
	cb->stream_commit_cb = pgoutput_stream_commit;
	cb->stream_change_cb = pgoutput_change;
	cb->stream_message_cb = pgoutput_message;
	cb->stream_truncate_cb = pgoutput_truncate;
	/* transaction streaming - two-phase commit */
	cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
}
287 :
/*
 * Parse and validate the options the client passed at START_REPLICATION
 * time, filling in *data.
 *
 * "proto_version" and "publication_names" are required; all other options
 * get defaults here.  Duplicated or unrecognized options raise an ERROR.
 */
static void
parse_output_parameters(List *options, PGOutputData *data)
{
	ListCell   *lc;
	bool		protocol_version_given = false;
	bool		publication_names_given = false;
	bool		binary_option_given = false;
	bool		messages_option_given = false;
	bool		streaming_given = false;
	bool		two_phase_option_given = false;
	bool		origin_option_given = false;

	/* Initialize optional parameters to defaults */
	data->binary = false;
	data->streaming = LOGICALREP_STREAM_OFF;
	data->messages = false;
	data->two_phase = false;
	data->publish_no_origin = false;

	foreach(lc, options)
	{
		DefElem    *defel = (DefElem *) lfirst(lc);

		Assert(defel->arg == NULL || IsA(defel->arg, String));

		/* Check each param, whether or not we recognize it */
		if (strcmp(defel->defname, "proto_version") == 0)
		{
			unsigned long parsed;
			char	   *endptr;

			if (protocol_version_given)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
			protocol_version_given = true;

			/* strtoul() reports failure via errno and the endptr position */
			errno = 0;
			parsed = strtoul(strVal(defel->arg), &endptr, 10);
			if (errno != 0 || *endptr != '\0')
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						 errmsg("invalid proto_version")));

			/* reject values that don't fit in the protocol's uint32 field */
			if (parsed > PG_UINT32_MAX)
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						 errmsg("proto_version \"%s\" out of range",
								strVal(defel->arg))));

			data->protocol_version = (uint32) parsed;
		}
		else if (strcmp(defel->defname, "publication_names") == 0)
		{
			if (publication_names_given)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
			publication_names_given = true;

			/* comma-separated list of publication names */
			if (!SplitIdentifierString(strVal(defel->arg), ',',
									   &data->publication_names))
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_NAME),
						 errmsg("invalid publication_names syntax")));
		}
		else if (strcmp(defel->defname, "binary") == 0)
		{
			if (binary_option_given)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
			binary_option_given = true;

			data->binary = defGetBoolean(defel);
		}
		else if (strcmp(defel->defname, "messages") == 0)
		{
			if (messages_option_given)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
			messages_option_given = true;

			data->messages = defGetBoolean(defel);
		}
		else if (strcmp(defel->defname, "streaming") == 0)
		{
			if (streaming_given)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
			streaming_given = true;

			data->streaming = defGetStreamingMode(defel);
		}
		else if (strcmp(defel->defname, "two_phase") == 0)
		{
			if (two_phase_option_given)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
			two_phase_option_given = true;

			data->two_phase = defGetBoolean(defel);
		}
		else if (strcmp(defel->defname, "origin") == 0)
		{
			char	   *origin;

			if (origin_option_given)
				ereport(ERROR,
						errcode(ERRCODE_SYNTAX_ERROR),
						errmsg("conflicting or redundant options"));
			origin_option_given = true;

			/* only "none" and "any" are accepted origin values */
			origin = defGetString(defel);
			if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
				data->publish_no_origin = true;
			else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
				data->publish_no_origin = false;
			else
				ereport(ERROR,
						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						errmsg("unrecognized origin value: \"%s\"", origin));
		}
		else
			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
	}

	/* Check required options */
	if (!protocol_version_given)
		ereport(ERROR,
				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				errmsg("option \"%s\" missing", "proto_version"));
	if (!publication_names_given)
		ereport(ERROR,
				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				errmsg("option \"%s\" missing", "publication_names"));
}
428 :
/*
 * Initialize this plugin
 *
 * Called once per decoding session.  Sets up the plugin's private memory
 * contexts and, when starting actual replication (!is_init), parses the
 * client options and validates that the requested protocol version supports
 * the requested streaming / two-phase features.
 */
static void
pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
				 bool is_init)
{
	PGOutputData *data = palloc0(sizeof(PGOutputData));
	static bool publication_callback_registered = false;

	/* Create our memory context for private allocations. */
	data->context = AllocSetContextCreate(ctx->context,
										  "logical replication output context",
										  ALLOCSET_DEFAULT_SIZES);

	data->cachectx = AllocSetContextCreate(ctx->context,
										   "logical replication cache context",
										   ALLOCSET_DEFAULT_SIZES);

	data->pubctx = AllocSetContextCreate(ctx->context,
										 "logical replication publication list context",
										 ALLOCSET_SMALL_SIZES);

	ctx->output_plugin_private = data;

	/* This plugin uses binary protocol. */
	opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;

	/*
	 * This is replication start and not slot initialization.
	 *
	 * Parse and validate options passed by the client.
	 */
	if (!is_init)
	{
		/* Parse the params and ERROR if we see any we don't recognize */
		parse_output_parameters(ctx->output_plugin_options, data);

		/* Check if we support requested protocol */
		if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
							data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));

		if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
							data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));

		/*
		 * Decide whether to enable streaming. It is disabled by default, in
		 * which case we just update the flag in decoding context. Otherwise
		 * we only allow it with sufficient version of the protocol, and when
		 * the output plugin supports it.
		 */
		if (data->streaming == LOGICALREP_STREAM_OFF)
			ctx->streaming = false;
		else if (data->streaming == LOGICALREP_STREAM_ON &&
				 data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("requested proto_version=%d does not support streaming, need %d or higher",
							data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
		else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
				 data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
							data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
		else if (!ctx->streaming)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("streaming requested, but not supported by output plugin")));

		/*
		 * Here, we just check whether the two-phase option is passed by
		 * plugin and decide whether to enable it at later point of time. It
		 * remains enabled if the previous start-up has done so. But we only
		 * allow the option to be passed in with sufficient version of the
		 * protocol, and when the output plugin supports it.
		 */
		if (!data->two_phase)
			ctx->twophase_opt_given = false;
		else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
							data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
		else if (!ctx->twophase)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("two-phase commit requested, but not supported by output plugin")));
		else
			ctx->twophase_opt_given = true;

		/* Init publication state. */
		data->publications = NIL;
		publications_valid = false;

		/*
		 * Register callback for pg_publication if we didn't already do that
		 * during some previous call in this process.
		 */
		if (!publication_callback_registered)
		{
			CacheRegisterSyscacheCallback(PUBLICATIONOID,
										  publication_invalidation_cb,
										  (Datum) 0);
			CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb,
										 (Datum) 0);
			publication_callback_registered = true;
		}

		/* Initialize relation schema cache. */
		init_rel_sync_cache(CacheMemoryContext);
	}
	else
	{
		/*
		 * Disable the streaming and prepared transactions during the slot
		 * initialization mode.
		 */
		ctx->streaming = false;
		ctx->twophase = false;
	}
}
557 :
558 : /*
559 : * BEGIN callback.
560 : *
561 : * Don't send the BEGIN message here instead postpone it until the first
562 : * change. In logical replication, a common scenario is to replicate a set of
563 : * tables (instead of all tables) and transactions whose changes were on
564 : * the table(s) that are not published will produce empty transactions. These
565 : * empty transactions will send BEGIN and COMMIT messages to subscribers,
566 : * using bandwidth on something with little/no use for logical replication.
567 : */
568 : static void
569 1592 : pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
570 : {
571 1592 : PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
572 : sizeof(PGOutputTxnData));
573 :
574 1592 : txn->output_plugin_private = txndata;
575 1592 : }
576 :
577 : /*
578 : * Send BEGIN.
579 : *
580 : * This is called while processing the first change of the transaction.
581 : */
582 : static void
583 858 : pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
584 : {
585 858 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
586 858 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
587 :
588 : Assert(txndata);
589 : Assert(!txndata->sent_begin_txn);
590 :
591 858 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
592 858 : logicalrep_write_begin(ctx->out, txn);
593 858 : txndata->sent_begin_txn = true;
594 :
595 858 : send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
596 : send_replication_origin);
597 :
598 858 : OutputPluginWrite(ctx, true);
599 858 : }
600 :
601 : /*
602 : * COMMIT callback
603 : */
604 : static void
605 1588 : pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
606 : XLogRecPtr commit_lsn)
607 : {
608 1588 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
609 : bool sent_begin_txn;
610 :
611 : Assert(txndata);
612 :
613 : /*
614 : * We don't need to send the commit message unless some relevant change
615 : * from this transaction has been sent to the downstream.
616 : */
617 1588 : sent_begin_txn = txndata->sent_begin_txn;
618 1588 : OutputPluginUpdateProgress(ctx, !sent_begin_txn);
619 1588 : pfree(txndata);
620 1588 : txn->output_plugin_private = NULL;
621 :
622 1588 : if (!sent_begin_txn)
623 : {
624 732 : elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
625 732 : return;
626 : }
627 :
628 856 : OutputPluginPrepareWrite(ctx, true);
629 856 : logicalrep_write_commit(ctx->out, txn, commit_lsn);
630 856 : OutputPluginWrite(ctx, true);
631 : }
632 :
633 : /*
634 : * BEGIN PREPARE callback
635 : */
636 : static void
637 42 : pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
638 : {
639 42 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
640 :
641 42 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
642 42 : logicalrep_write_begin_prepare(ctx->out, txn);
643 :
644 42 : send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
645 : send_replication_origin);
646 :
647 42 : OutputPluginWrite(ctx, true);
648 42 : }
649 :
/*
 * PREPARE callback
 *
 * Writes the PREPARE message for a two-phase transaction; the progress
 * update never skips, since prepared transactions are always sent.
 */
static void
pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
					 XLogRecPtr prepare_lsn)
{
	OutputPluginUpdateProgress(ctx, false);

	OutputPluginPrepareWrite(ctx, true);
	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
	OutputPluginWrite(ctx, true);
}
663 :
/*
 * COMMIT PREPARED callback
 *
 * Writes the COMMIT PREPARED message for a previously prepared two-phase
 * transaction.
 */
static void
pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
							 XLogRecPtr commit_lsn)
{
	OutputPluginUpdateProgress(ctx, false);

	OutputPluginPrepareWrite(ctx, true);
	logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
	OutputPluginWrite(ctx, true);
}
677 :
/*
 * ROLLBACK PREPARED callback
 *
 * Writes the ROLLBACK PREPARED message for a previously prepared two-phase
 * transaction.
 */
static void
pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
							   ReorderBufferTXN *txn,
							   XLogRecPtr prepare_end_lsn,
							   TimestampTz prepare_time)
{
	OutputPluginUpdateProgress(ctx, false);

	OutputPluginPrepareWrite(ctx, true);
	logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
									   prepare_time);
	OutputPluginWrite(ctx, true);
}
694 :
/*
 * Write the current schema of the relation and its ancestor (if any) if not
 * done yet.
 */
static void
maybe_send_schema(LogicalDecodingContext *ctx,
				  ReorderBufferChange *change,
				  Relation relation, RelationSyncEntry *relentry)
{
	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
	bool		schema_sent;
	TransactionId xid = InvalidTransactionId;
	TransactionId topxid = InvalidTransactionId;

	/*
	 * Remember XID of the (sub)transaction for the change. We don't care if
	 * it's top-level transaction or not (we have already sent that XID in
	 * start of the current streaming block).
	 *
	 * If we're not in a streaming block, just use InvalidTransactionId and
	 * the write methods will not include it.
	 */
	if (data->in_streaming)
		xid = change->txn->xid;

	/* Schema-sent tracking for streamed txns is keyed by the toplevel XID. */
	if (rbtxn_is_subtxn(change->txn))
		topxid = rbtxn_get_toptxn(change->txn)->xid;
	else
		topxid = xid;

	/*
	 * Do we need to send the schema? We do track streamed transactions
	 * separately, because those may be applied later (and the regular
	 * transactions won't see their effects until then) and in an order that
	 * we don't know at this point.
	 *
	 * XXX There is a scope of optimization here. Currently, we always send
	 * the schema first time in a streaming transaction but we can probably
	 * avoid that by checking 'relentry->schema_sent' flag. However, before
	 * doing that we need to study its impact on the case where we have a mix
	 * of streaming and non-streaming transactions.
	 */
	if (data->in_streaming)
		schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
	else
		schema_sent = relentry->schema_sent;

	/* Nothing to do if we already sent the schema. */
	if (schema_sent)
		return;

	/*
	 * Send the schema.  If the changes will be published using an ancestor's
	 * schema, not the relation's own, send that ancestor's schema before
	 * sending relation's own (XXX - maybe sending only the former suffices?).
	 */
	if (relentry->publish_as_relid != RelationGetRelid(relation))
	{
		Relation	ancestor = RelationIdGetRelation(relentry->publish_as_relid);

		send_relation_and_attrs(ancestor, xid, ctx, relentry);
		RelationClose(ancestor);
	}

	send_relation_and_attrs(relation, xid, ctx, relentry);

	/* Remember that the schema went out, per-txn for streaming mode. */
	if (data->in_streaming)
		set_schema_sent_in_streamed_txn(relentry, topxid);
	else
		relentry->schema_sent = true;
}
766 :
767 : /*
768 : * Sends a relation
769 : */
770 : static void
771 708 : send_relation_and_attrs(Relation relation, TransactionId xid,
772 : LogicalDecodingContext *ctx,
773 : RelationSyncEntry *relentry)
774 : {
775 708 : TupleDesc desc = RelationGetDescr(relation);
776 708 : Bitmapset *columns = relentry->columns;
777 708 : PublishGencolsType include_gencols_type = relentry->include_gencols_type;
778 : int i;
779 :
780 : /*
781 : * Write out type info if needed. We do that only for user-created types.
782 : * We use FirstGenbkiObjectId as the cutoff, so that we only consider
783 : * objects with hand-assigned OIDs to be "built in", not for instance any
784 : * function or type defined in the information_schema. This is important
785 : * because only hand-assigned OIDs can be expected to remain stable across
786 : * major versions.
787 : */
788 2164 : for (i = 0; i < desc->natts; i++)
789 : {
790 1456 : Form_pg_attribute att = TupleDescAttr(desc, i);
791 :
792 1456 : if (!logicalrep_should_publish_column(att, columns,
793 : include_gencols_type))
794 142 : continue;
795 :
796 1314 : if (att->atttypid < FirstGenbkiObjectId)
797 1278 : continue;
798 :
799 36 : OutputPluginPrepareWrite(ctx, false);
800 36 : logicalrep_write_typ(ctx->out, xid, att->atttypid);
801 36 : OutputPluginWrite(ctx, false);
802 : }
803 :
804 708 : OutputPluginPrepareWrite(ctx, false);
805 708 : logicalrep_write_rel(ctx->out, xid, relation, columns,
806 : include_gencols_type);
807 708 : OutputPluginWrite(ctx, false);
808 708 : }
809 :
810 : /*
811 : * Executor state preparation for evaluation of row filter expressions for the
812 : * specified relation.
813 : */
814 : static EState *
815 34 : create_estate_for_relation(Relation rel)
816 : {
817 : EState *estate;
818 : RangeTblEntry *rte;
819 34 : List *perminfos = NIL;
820 :
821 34 : estate = CreateExecutorState();
822 :
823 34 : rte = makeNode(RangeTblEntry);
824 34 : rte->rtekind = RTE_RELATION;
825 34 : rte->relid = RelationGetRelid(rel);
826 34 : rte->relkind = rel->rd_rel->relkind;
827 34 : rte->rellockmode = AccessShareLock;
828 :
829 34 : addRTEPermissionInfo(&perminfos, rte);
830 :
831 34 : ExecInitRangeTable(estate, list_make1(rte), perminfos,
832 : bms_make_singleton(1));
833 :
834 34 : estate->es_output_cid = GetCurrentCommandId(false);
835 :
836 34 : return estate;
837 : }
838 :
839 : /*
840 : * Evaluates row filter.
841 : *
842 : * If the row filter evaluates to NULL, it is taken as false i.e. the change
843 : * isn't replicated.
844 : */
845 : static bool
846 76 : pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
847 : {
848 : Datum ret;
849 : bool isnull;
850 :
851 : Assert(state != NULL);
852 :
853 76 : ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
854 :
855 76 : elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
856 : isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
857 : isnull ? "true" : "false");
858 :
859 76 : if (isnull)
860 2 : return false;
861 :
862 74 : return DatumGetBool(ret);
863 : }
864 :
865 : /*
866 : * Make sure the per-entry memory context exists.
867 : */
868 : static void
869 602 : pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
870 : {
871 : Relation relation;
872 :
873 : /* The context may already exist, in which case bail out. */
874 602 : if (entry->entry_cxt)
875 34 : return;
876 :
877 568 : relation = RelationIdGetRelation(entry->publish_as_relid);
878 :
879 568 : entry->entry_cxt = AllocSetContextCreate(data->cachectx,
880 : "entry private context",
881 : ALLOCSET_SMALL_SIZES);
882 :
883 568 : MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
884 : RelationGetRelationName(relation));
885 : }
886 :
/*
 * Initialize the row filter.
 *
 * On return, entry->exprstate[] holds one prepared expression per pubaction
 * that has a row filter; slots stay unset for pubactions where some
 * publication publishes the action without any filter (filter lists for
 * those actions are freed below).
 */
static void
pgoutput_row_filter_init(PGOutputData *data, List *publications,
						 RelationSyncEntry *entry)
{
	ListCell   *lc;
	List	   *rfnodes[] = {NIL, NIL, NIL};	/* One per pubaction */
	bool		no_filter[] = {false, false, false};	/* One per pubaction */
	MemoryContext oldctx;
	int			idx;
	bool		has_filter = true;
	Oid			schemaid = get_rel_namespace(entry->publish_as_relid);

	/*
	 * Find if there are any row filters for this relation. If there are, then
	 * prepare the necessary ExprState and cache it in entry->exprstate. To
	 * build an expression state, we need to ensure the following:
	 *
	 * All the given publication-table mappings must be checked.
	 *
	 * Multiple publications might have multiple row filters for this
	 * relation. Since row filter usage depends on the DML operation, there
	 * are multiple lists (one for each operation) to which row filters will
	 * be appended.
	 *
	 * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row filter
	 * expression" so it takes precedence.
	 */
	foreach(lc, publications)
	{
		Publication *pub = lfirst(lc);
		HeapTuple	rftuple = NULL;
		Datum		rfdatum = 0;
		bool		pub_no_filter = true;	/* flipped by SysCacheGetAttr
											 * below iff prqual is non-null */

		/*
		 * If the publication is FOR ALL TABLES, or the publication includes a
		 * FOR TABLES IN SCHEMA where the table belongs to the referred
		 * schema, then it is treated the same as if there are no row filters
		 * (even if other publications have a row filter).
		 */
		if (!pub->alltables &&
			!SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
								   ObjectIdGetDatum(schemaid),
								   ObjectIdGetDatum(pub->oid)))
		{
			/*
			 * Check for the presence of a row filter in this publication.
			 */
			rftuple = SearchSysCache2(PUBLICATIONRELMAP,
									  ObjectIdGetDatum(entry->publish_as_relid),
									  ObjectIdGetDatum(pub->oid));

			if (HeapTupleIsValid(rftuple))
			{
				/* Null indicates no filter. */
				rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
										  Anum_pg_publication_rel_prqual,
										  &pub_no_filter);
			}
		}

		if (pub_no_filter)
		{
			if (rftuple)
				ReleaseSysCache(rftuple);

			/* This publication covers these actions with no filter. */
			no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
			no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
			no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;

			/*
			 * Quick exit if all the DML actions are publicized via this
			 * publication.
			 */
			if (no_filter[PUBACTION_INSERT] &&
				no_filter[PUBACTION_UPDATE] &&
				no_filter[PUBACTION_DELETE])
			{
				has_filter = false;
				break;
			}

			/* No additional work for this publication. Next one. */
			continue;
		}

		/* Form the per pubaction row filter lists. */
		if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
			rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
												TextDatumGetCString(rfdatum));
		if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
			rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
												TextDatumGetCString(rfdatum));
		if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
			rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
												TextDatumGetCString(rfdatum));

		/* Safe: rfdatum was already copied out as C strings above. */
		ReleaseSysCache(rftuple);
	}							/* loop all subscribed publications */

	/* Clean the row filter */
	for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
	{
		if (no_filter[idx])
		{
			list_free_deep(rfnodes[idx]);
			rfnodes[idx] = NIL;
		}
	}

	if (has_filter)
	{
		Relation	relation = RelationIdGetRelation(entry->publish_as_relid);

		pgoutput_ensure_entry_cxt(data, entry);

		/*
		 * Now all the filters for all pubactions are known. Combine them when
		 * their pubactions are the same.
		 */
		oldctx = MemoryContextSwitchTo(entry->entry_cxt);
		entry->estate = create_estate_for_relation(relation);
		for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
		{
			List	   *filters = NIL;
			Expr	   *rfnode;

			if (rfnodes[idx] == NIL)
				continue;

			foreach(lc, rfnodes[idx])
				filters = lappend(filters, expand_generated_columns_in_expr(stringToNode((char *) lfirst(lc)), relation, 1));

			/* combine the row filter and cache the ExprState */
			rfnode = make_orclause(filters);
			entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
		}						/* for each pubaction */
		MemoryContextSwitchTo(oldctx);

		RelationClose(relation);
	}
}
1032 :
1033 : /*
1034 : * If the table contains a generated column, check for any conflicting
1035 : * values of 'publish_generated_columns' parameter in the publications.
1036 : */
1037 : static void
1038 568 : check_and_init_gencol(PGOutputData *data, List *publications,
1039 : RelationSyncEntry *entry)
1040 : {
1041 568 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1042 568 : TupleDesc desc = RelationGetDescr(relation);
1043 568 : bool gencolpresent = false;
1044 568 : bool first = true;
1045 :
1046 : /* Check if there is any generated column present. */
1047 1710 : for (int i = 0; i < desc->natts; i++)
1048 : {
1049 1156 : Form_pg_attribute att = TupleDescAttr(desc, i);
1050 :
1051 1156 : if (att->attgenerated)
1052 : {
1053 14 : gencolpresent = true;
1054 14 : break;
1055 : }
1056 : }
1057 :
1058 : /* There are no generated columns to be published. */
1059 568 : if (!gencolpresent)
1060 : {
1061 554 : entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
1062 554 : return;
1063 : }
1064 :
1065 : /*
1066 : * There may be a conflicting value for 'publish_generated_columns'
1067 : * parameter in the publications.
1068 : */
1069 44 : foreach_ptr(Publication, pub, publications)
1070 : {
1071 : /*
1072 : * The column list takes precedence over the
1073 : * 'publish_generated_columns' parameter. Those will be checked later,
1074 : * see pgoutput_column_list_init.
1075 : */
1076 16 : if (check_and_fetch_column_list(pub, entry->publish_as_relid, NULL, NULL))
1077 6 : continue;
1078 :
1079 10 : if (first)
1080 : {
1081 10 : entry->include_gencols_type = pub->pubgencols_type;
1082 10 : first = false;
1083 : }
1084 0 : else if (entry->include_gencols_type != pub->pubgencols_type)
1085 0 : ereport(ERROR,
1086 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1087 : errmsg("cannot use different values of publish_generated_columns for table \"%s.%s\" in different publications",
1088 : get_namespace_name(RelationGetNamespace(relation)),
1089 : RelationGetRelationName(relation)));
1090 : }
1091 : }
1092 :
/*
 * Initialize the column list.
 *
 * Sets entry->columns to the bitmap of published attribute numbers when at
 * least one subscribed publication has a column list; otherwise leaves it
 * NULL so the set of columns is computed later from
 * 'publish_generated_columns'.  Raises an error if two publications specify
 * different column lists for this relation.
 */
static void
pgoutput_column_list_init(PGOutputData *data, List *publications,
						  RelationSyncEntry *entry)
{
	ListCell   *lc;
	bool		first = true;
	Relation	relation = RelationIdGetRelation(entry->publish_as_relid);
	bool		found_pub_collist = false;
	Bitmapset  *relcols = NULL;

	/* Bitmaps built below must live in the per-entry context. */
	pgoutput_ensure_entry_cxt(data, entry);

	/*
	 * Find if there are any column lists for this relation. If there are,
	 * build a bitmap using the column lists.
	 *
	 * Multiple publications might have multiple column lists for this
	 * relation.
	 *
	 * Note that we don't support the case where the column list is different
	 * for the same table when combining publications. See comments atop
	 * fetch_table_list. But one can later change the publication so we still
	 * need to check all the given publication-table mappings and report an
	 * error if any publications have a different column list.
	 */
	foreach(lc, publications)
	{
		Publication *pub = lfirst(lc);
		Bitmapset  *cols = NULL;

		/* Retrieve the bitmap of columns for a column list publication. */
		found_pub_collist |= check_and_fetch_column_list(pub,
														 entry->publish_as_relid,
														 entry->entry_cxt, &cols);

		/*
		 * For non-column list publications — e.g. TABLE (without a column
		 * list), ALL TABLES, or ALL TABLES IN SCHEMA, we consider all columns
		 * of the table (including generated columns when
		 * 'publish_generated_columns' parameter is true).
		 */
		if (!cols)
		{
			/*
			 * Cache the table columns for the first publication with no
			 * specified column list to detect publication with a different
			 * column list.
			 */
			if (!relcols && (list_length(publications) > 1))
			{
				MemoryContext oldcxt = MemoryContextSwitchTo(entry->entry_cxt);

				relcols = pub_form_cols_map(relation,
											entry->include_gencols_type);
				MemoryContextSwitchTo(oldcxt);
			}

			cols = relcols;
		}

		if (first)
		{
			entry->columns = cols;
			first = false;
		}
		else if (!bms_equal(entry->columns, cols))
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
						   get_namespace_name(RelationGetNamespace(relation)),
						   RelationGetRelationName(relation)));
	}							/* loop all subscribed publications */

	/*
	 * If no column list publications exist, columns to be published will be
	 * computed later according to the 'publish_generated_columns' parameter.
	 */
	if (!found_pub_collist)
		entry->columns = NULL;

	RelationClose(relation);
}
1178 :
1179 : /*
1180 : * Initialize the slot for storing new and old tuples, and build the map that
1181 : * will be used to convert the relation's tuples into the ancestor's format.
1182 : */
1183 : static void
1184 568 : init_tuple_slot(PGOutputData *data, Relation relation,
1185 : RelationSyncEntry *entry)
1186 : {
1187 : MemoryContext oldctx;
1188 : TupleDesc oldtupdesc;
1189 : TupleDesc newtupdesc;
1190 :
1191 568 : oldctx = MemoryContextSwitchTo(data->cachectx);
1192 :
1193 : /*
1194 : * Create tuple table slots. Create a copy of the TupleDesc as it needs to
1195 : * live as long as the cache remains.
1196 : */
1197 568 : oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1198 568 : newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1199 :
1200 568 : entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
1201 568 : entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
1202 :
1203 568 : MemoryContextSwitchTo(oldctx);
1204 :
1205 : /*
1206 : * Cache the map that will be used to convert the relation's tuples into
1207 : * the ancestor's format, if needed.
1208 : */
1209 568 : if (entry->publish_as_relid != RelationGetRelid(relation))
1210 : {
1211 72 : Relation ancestor = RelationIdGetRelation(entry->publish_as_relid);
1212 72 : TupleDesc indesc = RelationGetDescr(relation);
1213 72 : TupleDesc outdesc = RelationGetDescr(ancestor);
1214 :
1215 : /* Map must live as long as the logical decoding context. */
1216 72 : oldctx = MemoryContextSwitchTo(data->cachectx);
1217 :
1218 72 : entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
1219 :
1220 72 : MemoryContextSwitchTo(oldctx);
1221 72 : RelationClose(ancestor);
1222 : }
1223 568 : }
1224 :
/*
 * Change is checked against the row filter if any.
 *
 * Returns true if the change is to be replicated, else false.
 *
 * For inserts, evaluate the row filter for new tuple.
 * For deletes, evaluate the row filter for old tuple.
 * For updates, evaluate the row filter for old and new tuple.
 *
 * For updates, if both evaluations are true, we allow sending the UPDATE and
 * if both the evaluations are false, it doesn't replicate the UPDATE. Now, if
 * only one of the tuples matches the row filter expression, we transform
 * UPDATE to DELETE or INSERT to avoid any data inconsistency based on the
 * following rules:
 *
 * Case 1: old-row (no match)    new-row (no match)  -> (drop change)
 * Case 2: old-row (no match)    new row (match)     -> INSERT
 * Case 3: old-row (match)       new-row (no match)  -> DELETE
 * Case 4: old-row (match)       new row (match)     -> UPDATE
 *
 * The new action is updated in the action parameter.
 *
 * The new slot could be updated when transforming the UPDATE into INSERT,
 * because the original new tuple might not have column values from the replica
 * identity.
 *
 * Examples:
 * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
 * Since the old tuple satisfies, the initial table synchronization copied this
 * row (or another method was used to guarantee that there is data
 * consistency). However, after the UPDATE the new tuple doesn't satisfy the
 * row filter, so from a data consistency perspective, that row should be
 * removed on the subscriber. The UPDATE should be transformed into a DELETE
 * statement and be sent to the subscriber. Keeping this row on the subscriber
 * is undesirable because it doesn't reflect what was defined in the row filter
 * expression on the publisher. This row on the subscriber would likely not be
 * modified by replication again. If someone inserted a new row with the same
 * old identifier, replication could stop due to a constraint violation.
 *
 * Let's say the old tuple doesn't match the row filter but the new tuple does.
 * Since the old tuple doesn't satisfy, the initial table synchronization
 * probably didn't copy this row. However, after the UPDATE the new tuple does
 * satisfy the row filter, so from a data consistency perspective, that row
 * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
 * statements have no effect (it matches no row -- see
 * apply_handle_update_internal()). So, the UPDATE should be transformed into a
 * INSERT statement and be sent to the subscriber. However, this might surprise
 * someone who expects the data set to satisfy the row filter expression on the
 * provider.
 */
static bool
pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
					TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
					ReorderBufferChangeType *action)
{
	TupleDesc	desc;
	int			i;
	bool		old_matched,
				new_matched,
				result;
	TupleTableSlot *tmp_new_slot;
	TupleTableSlot *new_slot = *new_slot_ptr;
	ExprContext *ecxt;
	ExprState  *filter_exprstate;

	/*
	 * We need this map to avoid relying on ReorderBufferChangeType enums
	 * having specific values.
	 */
	static const int map_changetype_pubaction[] = {
		[REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
		[REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
		[REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
	};

	Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
		   *action == REORDER_BUFFER_CHANGE_UPDATE ||
		   *action == REORDER_BUFFER_CHANGE_DELETE);

	Assert(new_slot || old_slot);

	/* Get the corresponding row filter */
	filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];

	/* Bail out if there is no row filter */
	if (!filter_exprstate)
		return true;

	elog(DEBUG3, "table \"%s.%s\" has row filter",
		 get_namespace_name(RelationGetNamespace(relation)),
		 RelationGetRelationName(relation));

	/* Reset per-tuple memory before evaluating the filter expression(s). */
	ResetPerTupleExprContext(entry->estate);

	ecxt = GetPerTupleExprContext(entry->estate);

	/*
	 * For the following occasions where there is only one tuple, we can
	 * evaluate the row filter for that tuple and return.
	 *
	 * For inserts, we only have the new tuple.
	 *
	 * For updates, we can have only a new tuple when none of the replica
	 * identity columns changed and none of those columns have external data
	 * but we still need to evaluate the row filter for the new tuple as the
	 * existing values of those columns might not match the filter. Also,
	 * users can use constant expressions in the row filter, so we anyway need
	 * to evaluate it for the new tuple.
	 *
	 * For deletes, we only have the old tuple.
	 */
	if (!new_slot || !old_slot)
	{
		ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
		result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);

		return result;
	}

	/*
	 * Both the old and new tuples must be valid only for updates and need to
	 * be checked against the row filter.
	 */
	Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);

	/* Deform both tuples so individual attributes can be inspected/copied. */
	slot_getallattrs(new_slot);
	slot_getallattrs(old_slot);

	tmp_new_slot = NULL;
	desc = RelationGetDescr(relation);

	/*
	 * The new tuple might not have all the replica identity columns, in which
	 * case it needs to be copied over from the old tuple.
	 */
	for (i = 0; i < desc->natts; i++)
	{
		CompactAttribute *att = TupleDescCompactAttr(desc, i);

		/*
		 * if the column in the new tuple or old tuple is null, nothing to do
		 */
		if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
			continue;

		/*
		 * Unchanged toasted replica identity columns are only logged in the
		 * old tuple. Copy this over to the new tuple. The changed (or WAL
		 * Logged) toast values are always assembled in memory and set as
		 * VARTAG_INDIRECT. See ReorderBufferToastReplace.
		 */
		if (att->attlen == -1 &&
			VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
			!VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
		{
			/* Lazily materialize a writable copy of the new tuple. */
			if (!tmp_new_slot)
			{
				tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
				ExecClearTuple(tmp_new_slot);

				memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
					   desc->natts * sizeof(Datum));
				memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
					   desc->natts * sizeof(bool));
			}

			tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
			tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
		}
	}

	ecxt->ecxt_scantuple = old_slot;
	old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);

	if (tmp_new_slot)
	{
		ExecStoreVirtualTuple(tmp_new_slot);
		ecxt->ecxt_scantuple = tmp_new_slot;
	}
	else
		ecxt->ecxt_scantuple = new_slot;

	new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);

	/*
	 * Case 1: if both tuples don't match the row filter, bailout. Send
	 * nothing.
	 */
	if (!old_matched && !new_matched)
		return false;

	/*
	 * Case 2: if the old tuple doesn't satisfy the row filter but the new
	 * tuple does, transform the UPDATE into INSERT.
	 *
	 * Use the newly transformed tuple that must contain the column values for
	 * all the replica identity columns. This is required to ensure that the
	 * while inserting the tuple in the downstream node, we have all the
	 * required column values.
	 */
	if (!old_matched && new_matched)
	{
		*action = REORDER_BUFFER_CHANGE_INSERT;

		if (tmp_new_slot)
			*new_slot_ptr = tmp_new_slot;
	}

	/*
	 * Case 3: if the old tuple satisfies the row filter but the new tuple
	 * doesn't, transform the UPDATE into DELETE.
	 *
	 * This transformation does not require another tuple. The Old tuple will
	 * be used for DELETE.
	 */
	else if (old_matched && !new_matched)
		*action = REORDER_BUFFER_CHANGE_DELETE;

	/*
	 * Case 4: if both tuples match the row filter, transformation isn't
	 * required. (*action is default UPDATE).
	 */

	return true;
}
1450 :
/*
 * Sends the decoded DML over wire.
 *
 * This is called both in streaming and non-streaming modes.
 */
static void
pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
				Relation relation, ReorderBufferChange *change)
{
	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
	PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
	MemoryContext old;
	RelationSyncEntry *relentry;
	TransactionId xid = InvalidTransactionId;
	Relation	ancestor = NULL;
	Relation	targetrel = relation;
	ReorderBufferChangeType action = change->action;
	TupleTableSlot *old_slot = NULL;
	TupleTableSlot *new_slot = NULL;

	/* Sequences, catalogs etc. are never published. */
	if (!is_publishable_relation(relation))
		return;

	/*
	 * Remember the xid for the change in streaming mode. We need to send xid
	 * with each change in the streaming mode so that subscriber can make
	 * their association and on aborts, it can discard the corresponding
	 * changes.
	 */
	if (data->in_streaming)
		xid = change->txn->xid;

	relentry = get_rel_sync_entry(data, relation);

	/* First check the table filter */
	switch (action)
	{
		case REORDER_BUFFER_CHANGE_INSERT:
			if (!relentry->pubactions.pubinsert)
				return;
			break;
		case REORDER_BUFFER_CHANGE_UPDATE:
			if (!relentry->pubactions.pubupdate)
				return;
			break;
		case REORDER_BUFFER_CHANGE_DELETE:
			if (!relentry->pubactions.pubdelete)
				return;

			/*
			 * This is only possible if deletes are allowed even when replica
			 * identity is not defined for a table. Since the DELETE action
			 * can't be published, we simply return.
			 */
			if (!change->data.tp.oldtuple)
			{
				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
				return;
			}
			break;
		default:
			Assert(false);
	}

	/* Avoid leaking memory by using and resetting our own context */
	old = MemoryContextSwitchTo(data->context);

	/* Switch relation if publishing via root. */
	if (relentry->publish_as_relid != RelationGetRelid(relation))
	{
		Assert(relation->rd_rel->relispartition);
		ancestor = RelationIdGetRelation(relentry->publish_as_relid);
		targetrel = ancestor;
	}

	if (change->data.tp.oldtuple)
	{
		old_slot = relentry->old_slot;
		ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false);

		/* Convert tuple if needed. */
		if (relentry->attrmap)
		{
			TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
													  &TTSOpsVirtual);

			old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
		}
	}

	if (change->data.tp.newtuple)
	{
		new_slot = relentry->new_slot;
		ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false);

		/* Convert tuple if needed. */
		if (relentry->attrmap)
		{
			TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
													  &TTSOpsVirtual);

			new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
		}
	}

	/*
	 * Check row filter.
	 *
	 * Updates could be transformed to inserts or deletes based on the results
	 * of the row filter for old and new tuple.
	 */
	if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
		goto cleanup;

	/*
	 * Send BEGIN if we haven't yet.
	 *
	 * We send the BEGIN message after ensuring that we will actually send the
	 * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
	 * transactions.
	 */
	if (txndata && !txndata->sent_begin_txn)
		pgoutput_send_begin(ctx, txn);

	/*
	 * Schema should be sent using the original relation because it also sends
	 * the ancestor's relation.
	 */
	maybe_send_schema(ctx, change, relation, relentry);

	OutputPluginPrepareWrite(ctx, true);

	/* Send the data */
	switch (action)
	{
		case REORDER_BUFFER_CHANGE_INSERT:
			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
									data->binary, relentry->columns,
									relentry->include_gencols_type);
			break;
		case REORDER_BUFFER_CHANGE_UPDATE:
			logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
									new_slot, data->binary, relentry->columns,
									relentry->include_gencols_type);
			break;
		case REORDER_BUFFER_CHANGE_DELETE:
			logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
									data->binary, relentry->columns,
									relentry->include_gencols_type);
			break;
		default:
			Assert(false);
	}

	OutputPluginWrite(ctx, true);

cleanup:
	if (RelationIsValid(ancestor))
	{
		RelationClose(ancestor);
		ancestor = NULL;
	}

	/* Drop the new slots that were used to store the converted tuples. */
	if (relentry->attrmap)
	{
		if (old_slot)
			ExecDropSingleTupleTableSlot(old_slot);

		if (new_slot)
			ExecDropSingleTupleTableSlot(new_slot);
	}

	MemoryContextSwitchTo(old);
	MemoryContextReset(data->context);
}
1627 :
/*
 * Send a TRUNCATE message covering the given relations.
 *
 * Relations that are not publishable, whose publications don't publish
 * TRUNCATE, or that are partitions published only via their root table are
 * filtered out; nothing is sent if no relation survives the filtering.
 */
static void
pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
				  int nrelations, Relation relations[], ReorderBufferChange *change)
{
	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
	PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
	MemoryContext old;
	RelationSyncEntry *relentry;
	int			i;
	int			nrelids;
	Oid		   *relids;
	TransactionId xid = InvalidTransactionId;

	/* Remember the xid for the change in streaming mode. See pgoutput_change. */
	if (data->in_streaming)
		xid = change->txn->xid;

	/* Avoid leaking memory by using and resetting our own context. */
	old = MemoryContextSwitchTo(data->context);

	relids = palloc0(nrelations * sizeof(Oid));
	nrelids = 0;

	for (i = 0; i < nrelations; i++)
	{
		Relation	relation = relations[i];
		Oid			relid = RelationGetRelid(relation);

		if (!is_publishable_relation(relation))
			continue;

		relentry = get_rel_sync_entry(data, relation);

		if (!relentry->pubactions.pubtruncate)
			continue;

		/*
		 * Don't send partitions if the publication wants to send only the
		 * root tables through it.
		 */
		if (relation->rd_rel->relispartition &&
			relentry->publish_as_relid != relid)
			continue;

		relids[nrelids++] = relid;

		/* Send BEGIN if we haven't yet */
		if (txndata && !txndata->sent_begin_txn)
			pgoutput_send_begin(ctx, txn);

		maybe_send_schema(ctx, change, relation, relentry);
	}

	if (nrelids > 0)
	{
		OutputPluginPrepareWrite(ctx, true);
		logicalrep_write_truncate(ctx->out,
								  xid,
								  nrelids,
								  relids,
								  change->data.truncate.cascade,
								  change->data.truncate.restart_seqs);
		OutputPluginWrite(ctx, true);
	}

	MemoryContextSwitchTo(old);
	MemoryContextReset(data->context);
}
1695 :
1696 : static void
1697 14 : pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1698 : XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
1699 : const char *message)
1700 : {
1701 14 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1702 14 : TransactionId xid = InvalidTransactionId;
1703 :
1704 14 : if (!data->messages)
1705 4 : return;
1706 :
1707 : /*
1708 : * Remember the xid for the message in streaming mode. See
1709 : * pgoutput_change.
1710 : */
1711 10 : if (data->in_streaming)
1712 0 : xid = txn->xid;
1713 :
1714 : /*
1715 : * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
1716 : */
1717 10 : if (transactional)
1718 : {
1719 4 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1720 :
1721 : /* Send BEGIN if we haven't yet */
1722 4 : if (txndata && !txndata->sent_begin_txn)
1723 4 : pgoutput_send_begin(ctx, txn);
1724 : }
1725 :
1726 10 : OutputPluginPrepareWrite(ctx, true);
1727 10 : logicalrep_write_message(ctx->out,
1728 : xid,
1729 : message_lsn,
1730 : transactional,
1731 : prefix,
1732 : sz,
1733 : message);
1734 10 : OutputPluginWrite(ctx, true);
1735 : }
1736 :
1737 : /*
1738 : * Return true if the data is associated with an origin and the user has
1739 : * requested the changes that don't have an origin, false otherwise.
1740 : */
1741 : static bool
1742 799780 : pgoutput_origin_filter(LogicalDecodingContext *ctx,
1743 : RepOriginId origin_id)
1744 : {
1745 799780 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1746 :
1747 799780 : if (data->publish_no_origin && origin_id != InvalidRepOriginId)
1748 162 : return true;
1749 :
1750 799618 : return false;
1751 : }
1752 :
1753 : /*
1754 : * Shutdown the output plugin.
1755 : *
1756 : * Note, we don't need to clean the data->context, data->cachectx, and
1757 : * data->pubctx as they are child contexts of the ctx->context so they
1758 : * will be cleaned up by logical decoding machinery.
1759 : */
1760 : static void
1761 1032 : pgoutput_shutdown(LogicalDecodingContext *ctx)
1762 : {
1763 1032 : if (RelationSyncCache)
1764 : {
1765 386 : hash_destroy(RelationSyncCache);
1766 386 : RelationSyncCache = NULL;
1767 : }
1768 1032 : }
1769 :
1770 : /*
1771 : * Load publications from the list of publication names.
1772 : *
1773 : * Here, we skip the publications that don't exist yet. This will allow us
1774 : * to silently continue the replication in the absence of a missing publication.
1775 : * This is required because we allow the users to create publications after they
1776 : * have specified the required publications at the time of replication start.
1777 : */
1778 : static List *
1779 356 : LoadPublications(List *pubnames)
1780 : {
1781 356 : List *result = NIL;
1782 : ListCell *lc;
1783 :
1784 802 : foreach(lc, pubnames)
1785 : {
1786 446 : char *pubname = (char *) lfirst(lc);
1787 446 : Publication *pub = GetPublicationByName(pubname, true);
1788 :
1789 446 : if (pub)
1790 442 : result = lappend(result, pub);
1791 : else
1792 4 : ereport(WARNING,
1793 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1794 : errmsg("skipped loading publication \"%s\"", pubname),
1795 : errdetail("The publication does not exist at this point in the WAL."),
1796 : errhint("Create the publication if it does not exist."));
1797 : }
1798 :
1799 356 : return result;
1800 : }
1801 :
/*
 * Publication syscache invalidation callback.
 *
 * Called for invalidations on pg_publication.  We only flip a flag here;
 * the publication list itself is lazily reloaded by get_rel_sync_entry()
 * the next time it is needed (via LoadPublications).
 */
static void
publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
{
	publications_valid = false;
}
1812 :
/*
 * START STREAM callback
 *
 * Opens a streamed chunk of an in-progress transaction.  Emits the stream
 * start message (flagging the first chunk), optionally followed by the
 * replication origin, and marks the session as being inside a stream.
 */
static void
pgoutput_stream_start(struct LogicalDecodingContext *ctx,
					  ReorderBufferTXN *txn)
{
	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
	bool		send_replication_origin = txn->origin_id != InvalidRepOriginId;

	/* we can't nest streaming of transactions */
	Assert(!data->in_streaming);

	/*
	 * If we already sent the first stream for this transaction then don't
	 * send the origin id in the subsequent streams.
	 */
	if (rbtxn_is_streamed(txn))
		send_replication_origin = false;

	/*
	 * The "last" flag to PrepareWrite is inverted: when an origin message
	 * follows, this write is not the final one for the message boundary.
	 */
	OutputPluginPrepareWrite(ctx, !send_replication_origin);
	logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));

	send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
					 send_replication_origin);

	OutputPluginWrite(ctx, true);

	/* we're streaming a chunk of transaction now */
	data->in_streaming = true;
}
1844 :
1845 : /*
1846 : * STOP STREAM callback
1847 : */
1848 : static void
1849 1276 : pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
1850 : ReorderBufferTXN *txn)
1851 : {
1852 1276 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1853 :
1854 : /* we should be streaming a transaction */
1855 : Assert(data->in_streaming);
1856 :
1857 1276 : OutputPluginPrepareWrite(ctx, true);
1858 1276 : logicalrep_write_stream_stop(ctx->out);
1859 1276 : OutputPluginWrite(ctx, true);
1860 :
1861 : /* we've stopped streaming a transaction */
1862 1276 : data->in_streaming = false;
1863 1276 : }
1864 :
/*
 * Notify downstream to discard the streamed transaction (along with all
 * its subtransactions, if it's a toplevel transaction).
 */
static void
pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
					  ReorderBufferTXN *txn,
					  XLogRecPtr abort_lsn)
{
	ReorderBufferTXN *toptxn;
	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
	/* Parallel-apply subscribers need the abort time/LSN in the message. */
	bool		write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);

	/*
	 * The abort should happen outside streaming block, even for streamed
	 * transactions. The transaction has to be marked as streamed, though.
	 */
	Assert(!data->in_streaming);

	/* determine the toplevel transaction */
	toptxn = rbtxn_get_toptxn(txn);

	Assert(rbtxn_is_streamed(toptxn));

	OutputPluginPrepareWrite(ctx, true);
	logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
								  txn->xact_time.abort_time, write_abort_info);

	OutputPluginWrite(ctx, true);

	/* Forget per-transaction schema-sent state; false = not committed. */
	cleanup_rel_sync_cache(toptxn->xid, false);
}
1897 :
/*
 * Notify downstream to apply the streamed transaction (along with all
 * its subtransactions).
 */
static void
pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
					   ReorderBufferTXN *txn,
					   XLogRecPtr commit_lsn)
{
	/* Only referenced by the Asserts below; avoids unused warning. */
	PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private;

	/*
	 * The commit should happen outside streaming block, even for streamed
	 * transactions. The transaction has to be marked as streamed, though.
	 */
	Assert(!data->in_streaming);
	Assert(rbtxn_is_streamed(txn));

	OutputPluginUpdateProgress(ctx, false);

	OutputPluginPrepareWrite(ctx, true);
	logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
	OutputPluginWrite(ctx, true);

	/* Mark streamed schemas as known downstream; true = committed. */
	cleanup_rel_sync_cache(txn->xid, true);
}
1924 :
/*
 * PREPARE callback (for streaming two-phase commit).
 *
 * Notify the downstream to prepare the transaction.
 */
static void
pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
							ReorderBufferTXN *txn,
							XLogRecPtr prepare_lsn)
{
	/* Only transactions that were actually streamed can reach here. */
	Assert(rbtxn_is_streamed(txn));

	OutputPluginUpdateProgress(ctx, false);
	OutputPluginPrepareWrite(ctx, true);
	logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
	OutputPluginWrite(ctx, true);
}
1942 :
/*
 * Initialize the relation schema sync cache for a decoding session.
 *
 * The hash table is destroyed at the end of a decoding session. While
 * relcache invalidations still exist and will still be invoked, they
 * will just see the null hash table global and take no action.
 */
static void
init_rel_sync_cache(MemoryContext cachectx)
{
	HASHCTL		ctl;
	/* Callback registration must survive across sessions in this backend. */
	static bool relation_callbacks_registered = false;

	/* Nothing to do if hash table already exists */
	if (RelationSyncCache != NULL)
		return;

	/* Make a new hash table for the cache */
	ctl.keysize = sizeof(Oid);
	ctl.entrysize = sizeof(RelationSyncEntry);
	ctl.hcxt = cachectx;

	RelationSyncCache = hash_create("logical replication output relation cache",
									128, &ctl,
									HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);

	Assert(RelationSyncCache != NULL);

	/* No more to do if we already registered callbacks */
	if (relation_callbacks_registered)
		return;

	/* We must update the cache entry for a relation after a relcache flush */
	CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);

	/*
	 * Flush all cache entries after a pg_namespace change, in case it was a
	 * schema rename affecting a relation being replicated.
	 *
	 * XXX: It is not a good idea to invalidate all the relation entries in
	 * RelationSyncCache on schema rename. We can optimize it to invalidate
	 * only the required relations by either having a specific invalidation
	 * message containing impacted relations or by having schema information
	 * in each RelationSyncCache entry and using hashvalue of pg_namespace.oid
	 * passed to the callback.
	 */
	CacheRegisterSyscacheCallback(NAMESPACEOID,
								  rel_sync_cache_publication_cb,
								  (Datum) 0);

	relation_callbacks_registered = true;
}
1995 :
1996 : /*
1997 : * We expect relatively small number of streamed transactions.
1998 : */
1999 : static bool
2000 351882 : get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
2001 : {
2002 351882 : return list_member_xid(entry->streamed_txns, xid);
2003 : }
2004 :
2005 : /*
2006 : * Add the xid in the rel sync entry for which we have already sent the schema
2007 : * of the relation.
2008 : */
2009 : static void
2010 142 : set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
2011 : {
2012 : MemoryContext oldctx;
2013 :
2014 142 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
2015 :
2016 142 : entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
2017 :
2018 142 : MemoryContextSwitchTo(oldctx);
2019 142 : }
2020 :
/*
 * Find or create entry in the relation schema cache.
 *
 * This looks up publications that the given relation is directly or
 * indirectly part of (the latter if it's really the relation's ancestor that
 * is part of a publication) and fills up the found entry with the information
 * about which operations to publish and whether to use an ancestor's schema
 * when publishing.
 */
static RelationSyncEntry *
get_rel_sync_entry(PGOutputData *data, Relation relation)
{
	RelationSyncEntry *entry;
	bool		found;
	MemoryContext oldctx;
	Oid			relid = RelationGetRelid(relation);

	Assert(RelationSyncCache != NULL);

	/* Find cached relation info, creating if not found */
	entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
											  &relid,
											  HASH_ENTER, &found);
	Assert(entry != NULL);

	/* initialize entry, if it's new */
	if (!found)
	{
		entry->replicate_valid = false;
		entry->schema_sent = false;
		entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
		entry->streamed_txns = NIL;
		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
		entry->new_slot = NULL;
		entry->old_slot = NULL;
		memset(entry->exprstate, 0, sizeof(entry->exprstate));
		entry->entry_cxt = NULL;
		entry->publish_as_relid = InvalidOid;
		entry->columns = NULL;
		entry->attrmap = NULL;
	}

	/* Validate the entry (rebuild it if an invalidation marked it stale) */
	if (!entry->replicate_valid)
	{
		Oid			schemaId = get_rel_namespace(relid);
		List	   *pubids = GetRelationPublications(relid);

		/*
		 * We don't acquire a lock on the namespace system table as we build
		 * the cache entry using a historic snapshot and all the later changes
		 * are absorbed while decoding WAL.
		 */
		List	   *schemaPubids = GetSchemaPublications(schemaId);
		ListCell   *lc;
		Oid			publish_as_relid = relid;
		int			publish_ancestor_level = 0;
		bool		am_partition = get_rel_relispartition(relid);
		char		relkind = get_rel_relkind(relid);
		List	   *rel_publications = NIL;

		/* Reload publications if needed before use. */
		if (!publications_valid)
		{
			/* Reset first: the old publication list lives in pubctx too. */
			MemoryContextReset(data->pubctx);

			oldctx = MemoryContextSwitchTo(data->pubctx);
			data->publications = LoadPublications(data->publication_names);
			MemoryContextSwitchTo(oldctx);
			publications_valid = true;
		}

		/*
		 * Reset schema_sent status as the relation definition may have
		 * changed. Also reset pubactions to empty in case rel was dropped
		 * from a publication. Also free any objects that depended on the
		 * earlier definition.
		 */
		entry->schema_sent = false;
		entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
		list_free(entry->streamed_txns);
		entry->streamed_txns = NIL;
		bms_free(entry->columns);
		entry->columns = NULL;
		entry->pubactions.pubinsert = false;
		entry->pubactions.pubupdate = false;
		entry->pubactions.pubdelete = false;
		entry->pubactions.pubtruncate = false;

		/*
		 * Tuple slots cleanups. (Will be rebuilt later if needed).
		 */
		if (entry->old_slot)
		{
			TupleDesc	desc = entry->old_slot->tts_tupleDescriptor;

			Assert(desc->tdrefcount == -1);

			ExecDropSingleTupleTableSlot(entry->old_slot);

			/*
			 * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
			 * do it now to avoid any leaks.
			 */
			FreeTupleDesc(desc);
		}
		if (entry->new_slot)
		{
			TupleDesc	desc = entry->new_slot->tts_tupleDescriptor;

			Assert(desc->tdrefcount == -1);

			ExecDropSingleTupleTableSlot(entry->new_slot);

			/*
			 * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
			 * do it now to avoid any leaks.
			 */
			FreeTupleDesc(desc);
		}

		entry->old_slot = NULL;
		entry->new_slot = NULL;

		if (entry->attrmap)
			free_attrmap(entry->attrmap);
		entry->attrmap = NULL;

		/*
		 * Row filter cache cleanups.
		 */
		if (entry->entry_cxt)
			MemoryContextDelete(entry->entry_cxt);

		entry->entry_cxt = NULL;
		entry->estate = NULL;
		memset(entry->exprstate, 0, sizeof(entry->exprstate));

		/*
		 * Build publication cache. We can't use one provided by relcache as
		 * relcache considers all publications that the given relation is in,
		 * but here we only need to consider ones that the subscriber
		 * requested.
		 */
		foreach(lc, data->publications)
		{
			Publication *pub = lfirst(lc);
			bool		publish = false;

			/*
			 * Under what relid should we publish changes in this publication?
			 * We'll use the top-most relid across all publications. Also
			 * track the ancestor level for this publication.
			 */
			Oid			pub_relid = relid;
			int			ancestor_level = 0;

			/*
			 * If this is a FOR ALL TABLES publication, pick the partition
			 * root and set the ancestor level accordingly.
			 */
			if (pub->alltables)
			{
				publish = true;
				if (pub->pubviaroot && am_partition)
				{
					List	   *ancestors = get_partition_ancestors(relid);

					pub_relid = llast_oid(ancestors);
					ancestor_level = list_length(ancestors);
				}
			}

			if (!publish)
			{
				bool		ancestor_published = false;

				/*
				 * For a partition, check if any of the ancestors are
				 * published. If so, note down the topmost ancestor that is
				 * published via this publication, which will be used as the
				 * relation via which to publish the partition's changes.
				 */
				if (am_partition)
				{
					Oid			ancestor;
					int			level;
					List	   *ancestors = get_partition_ancestors(relid);

					ancestor = GetTopMostAncestorInPublication(pub->oid,
															   ancestors,
															   &level);

					if (ancestor != InvalidOid)
					{
						ancestor_published = true;
						if (pub->pubviaroot)
						{
							pub_relid = ancestor;
							ancestor_level = level;
						}
					}
				}

				if (list_member_oid(pubids, pub->oid) ||
					list_member_oid(schemaPubids, pub->oid) ||
					ancestor_published)
					publish = true;
			}

			/*
			 * If the relation is to be published, determine actions to
			 * publish, and list of columns, if appropriate.
			 *
			 * Don't publish changes for partitioned tables, because
			 * publishing those of its partitions suffices, unless partition
			 * changes won't be published due to pubviaroot being set.
			 */
			if (publish &&
				(relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
			{
				entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
				entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
				entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;

				/*
				 * We want to publish the changes as the top-most ancestor
				 * across all publications. So we need to check if the already
				 * calculated level is higher than the new one. If yes, we can
				 * ignore the new value (as it's a child). Otherwise the new
				 * value is an ancestor, so we keep it.
				 */
				if (publish_ancestor_level > ancestor_level)
					continue;

				/*
				 * If we found an ancestor higher up in the tree, discard the
				 * list of publications through which we replicate it, and use
				 * the new ancestor.
				 */
				if (publish_ancestor_level < ancestor_level)
				{
					publish_as_relid = pub_relid;
					publish_ancestor_level = ancestor_level;

					/* reset the publication list for this relation */
					rel_publications = NIL;
				}
				else
				{
					/* Same ancestor level, has to be the same OID. */
					Assert(publish_as_relid == pub_relid);
				}

				/* Track publications for this ancestor. */
				rel_publications = lappend(rel_publications, pub);
			}
		}

		entry->publish_as_relid = publish_as_relid;

		/*
		 * Initialize the tuple slot, map, and row filter. These are only used
		 * when publishing inserts, updates, or deletes.
		 */
		if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
			entry->pubactions.pubdelete)
		{
			/* Initialize the tuple slot and map */
			init_tuple_slot(data, relation, entry);

			/* Initialize the row filter */
			pgoutput_row_filter_init(data, rel_publications, entry);

			/* Check whether to publish generated columns. */
			check_and_init_gencol(data, rel_publications, entry);

			/* Initialize the column list */
			pgoutput_column_list_init(data, rel_publications, entry);
		}

		list_free(pubids);
		list_free(schemaPubids);
		list_free(rel_publications);

		/* Mark valid only after the rebuild fully succeeded. */
		entry->replicate_valid = true;
	}

	return entry;
}
2313 :
/*
 * Cleanup list of streamed transactions and update the schema_sent flag.
 *
 * When a streamed transaction commits or aborts, we need to remove the
 * toplevel XID from the schema cache. If the transaction aborted, the
 * subscriber will simply throw away the schema records we streamed, so
 * we don't need to do anything else.
 *
 * If the transaction is committed, the subscriber will update the relation
 * cache - so tweak the schema_sent flag accordingly.
 */
static void
cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
{
	HASH_SEQ_STATUS hash_seq;
	RelationSyncEntry *entry;

	Assert(RelationSyncCache != NULL);

	hash_seq_init(&hash_seq, RelationSyncCache);
	while ((entry = hash_seq_search(&hash_seq)) != NULL)
	{
		/*
		 * We can set the schema_sent flag for an entry that has committed xid
		 * in the list as that ensures that the subscriber would have the
		 * corresponding schema and we don't need to send it unless there is
		 * any invalidation for that relation.
		 */
		foreach_xid(streamed_txn, entry->streamed_txns)
		{
			if (xid == streamed_txn)
			{
				if (is_commit)
					entry->schema_sent = true;

				/* foreach_delete_current keeps iteration state consistent */
				entry->streamed_txns =
					foreach_delete_current(entry->streamed_txns, streamed_txn);
				/* Each xid appears at most once per entry, so stop here. */
				break;
			}
		}
	}
}
2356 :
2357 : /*
2358 : * Relcache invalidation callback
2359 : */
2360 : static void
2361 7646 : rel_sync_cache_relation_cb(Datum arg, Oid relid)
2362 : {
2363 : RelationSyncEntry *entry;
2364 :
2365 : /*
2366 : * We can get here if the plugin was used in SQL interface as the
2367 : * RelationSyncCache is destroyed when the decoding finishes, but there is
2368 : * no way to unregister the relcache invalidation callback.
2369 : */
2370 7646 : if (RelationSyncCache == NULL)
2371 52 : return;
2372 :
2373 : /*
2374 : * Nobody keeps pointers to entries in this hash table around outside
2375 : * logical decoding callback calls - but invalidation events can come in
2376 : * *during* a callback if we do any syscache access in the callback.
2377 : * Because of that we must mark the cache entry as invalid but not damage
2378 : * any of its substructure here. The next get_rel_sync_entry() call will
2379 : * rebuild it all.
2380 : */
2381 7594 : if (OidIsValid(relid))
2382 : {
2383 : /*
2384 : * Getting invalidations for relations that aren't in the table is
2385 : * entirely normal. So we don't care if it's found or not.
2386 : */
2387 7480 : entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
2388 : HASH_FIND, NULL);
2389 7480 : if (entry != NULL)
2390 1234 : entry->replicate_valid = false;
2391 : }
2392 : else
2393 : {
2394 : /* Whole cache must be flushed. */
2395 : HASH_SEQ_STATUS status;
2396 :
2397 114 : hash_seq_init(&status, RelationSyncCache);
2398 236 : while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2399 : {
2400 122 : entry->replicate_valid = false;
2401 : }
2402 : }
2403 : }
2404 :
2405 : /*
2406 : * Publication relation/schema map syscache invalidation callback
2407 : *
2408 : * Called for invalidations on pg_namespace.
2409 : */
2410 : static void
2411 70 : rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
2412 : {
2413 : HASH_SEQ_STATUS status;
2414 : RelationSyncEntry *entry;
2415 :
2416 : /*
2417 : * We can get here if the plugin was used in SQL interface as the
2418 : * RelationSyncCache is destroyed when the decoding finishes, but there is
2419 : * no way to unregister the invalidation callbacks.
2420 : */
2421 70 : if (RelationSyncCache == NULL)
2422 20 : return;
2423 :
2424 : /*
2425 : * We have no easy way to identify which cache entries this invalidation
2426 : * event might have affected, so just mark them all invalid.
2427 : */
2428 50 : hash_seq_init(&status, RelationSyncCache);
2429 92 : while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2430 : {
2431 42 : entry->replicate_valid = false;
2432 : }
2433 : }
2434 :
/*
 * Send Replication origin, if requested and the origin name can be
 * resolved.  Must be called between OutputPluginPrepareWrite() and the
 * final OutputPluginWrite() of the enclosing message (see
 * pgoutput_stream_start / pgoutput_begin_txn callers).
 */
static void
send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
				 XLogRecPtr origin_lsn, bool send_origin)
{
	if (send_origin)
	{
		char	   *origin;

		/*----------
		 * XXX: which behaviour do we want here?
		 *
		 * Alternatives:
		 *	- don't send origin message if origin name not found
		 *	  (that's what we do now)
		 *	- throw error - that will break replication, not good
		 *	- send some special "unknown" origin
		 *----------
		 */
		if (replorigin_by_oid(origin_id, true, &origin))
		{
			/* Message boundary */
			OutputPluginWrite(ctx, false);
			OutputPluginPrepareWrite(ctx, true);

			logicalrep_write_origin(ctx->out, origin, origin_lsn);
		}
	}
}
|