Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pgoutput.c
4 : * Logical Replication output plugin
5 : *
6 : * Copyright (c) 2012-2023, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/pgoutput/pgoutput.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "access/tupconvert.h"
16 : #include "catalog/partition.h"
17 : #include "catalog/pg_publication.h"
18 : #include "catalog/pg_publication_rel.h"
19 : #include "catalog/pg_subscription.h"
20 : #include "commands/defrem.h"
21 : #include "commands/subscriptioncmds.h"
22 : #include "executor/executor.h"
23 : #include "fmgr.h"
24 : #include "nodes/makefuncs.h"
25 : #include "optimizer/optimizer.h"
26 : #include "parser/parse_relation.h"
27 : #include "replication/logical.h"
28 : #include "replication/logicalproto.h"
29 : #include "replication/origin.h"
30 : #include "replication/pgoutput.h"
31 : #include "utils/builtins.h"
32 : #include "utils/inval.h"
33 : #include "utils/lsyscache.h"
34 : #include "utils/memutils.h"
35 : #include "utils/rel.h"
36 : #include "utils/syscache.h"
37 : #include "utils/varlena.h"
38 :
39 724 : PG_MODULE_MAGIC;
40 :
41 : static void pgoutput_startup(LogicalDecodingContext *ctx,
42 : OutputPluginOptions *opt, bool is_init);
43 : static void pgoutput_shutdown(LogicalDecodingContext *ctx);
44 : static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
45 : ReorderBufferTXN *txn);
46 : static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
47 : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
48 : static void pgoutput_change(LogicalDecodingContext *ctx,
49 : ReorderBufferTXN *txn, Relation relation,
50 : ReorderBufferChange *change);
51 : static void pgoutput_truncate(LogicalDecodingContext *ctx,
52 : ReorderBufferTXN *txn, int nrelations, Relation relations[],
53 : ReorderBufferChange *change);
54 : static void pgoutput_message(LogicalDecodingContext *ctx,
55 : ReorderBufferTXN *txn, XLogRecPtr message_lsn,
56 : bool transactional, const char *prefix,
57 : Size sz, const char *message);
58 : static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
59 : RepOriginId origin_id);
60 : static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
61 : ReorderBufferTXN *txn);
62 : static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
63 : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
64 : static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
65 : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
66 : static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
67 : ReorderBufferTXN *txn,
68 : XLogRecPtr prepare_end_lsn,
69 : TimestampTz prepare_time);
70 : static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
71 : ReorderBufferTXN *txn);
72 : static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
73 : ReorderBufferTXN *txn);
74 : static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
75 : ReorderBufferTXN *txn,
76 : XLogRecPtr abort_lsn);
77 : static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
78 : ReorderBufferTXN *txn,
79 : XLogRecPtr commit_lsn);
80 : static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
81 : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
82 :
83 : static bool publications_valid;
84 : static bool in_streaming;
85 : static bool publish_no_origin;
86 :
87 : static List *LoadPublications(List *pubnames);
88 : static void publication_invalidation_cb(Datum arg, int cacheid,
89 : uint32 hashvalue);
90 : static void send_relation_and_attrs(Relation relation, TransactionId xid,
91 : LogicalDecodingContext *ctx,
92 : Bitmapset *columns);
93 : static void send_repl_origin(LogicalDecodingContext *ctx,
94 : RepOriginId origin_id, XLogRecPtr origin_lsn,
95 : bool send_origin);
96 :
97 : /*
98 : * Only 3 publication actions are used for row filtering ("insert", "update",
99 : * "delete"). See RelationSyncEntry.exprstate[].
100 : */
enum RowFilterPubAction
{
	PUBACTION_INSERT,			/* index of the INSERT row filter */
	PUBACTION_UPDATE,			/* index of the UPDATE row filter */
	PUBACTION_DELETE			/* index of the DELETE row filter */
};
107 :
108 : #define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
109 :
110 : /*
111 : * Entry in the map used to remember which relation schemas we sent.
112 : *
113 : * The schema_sent flag determines if the current schema record for the
114 : * relation (and for its ancestor if publish_as_relid is set) was already
115 : * sent to the subscriber (in which case we don't need to send it again).
116 : *
117 : * The schema cache on downstream is however updated only at commit time,
118 : * and with streamed transactions the commit order may be different from
119 : * the order the transactions are sent in. Also, the (sub) transactions
120 : * might get aborted so we need to send the schema for each (sub) transaction
121 : * so that we don't lose the schema information on abort. For handling this,
122 : * we maintain the list of xids (streamed_txns) for those we have already sent
123 : * the schema.
124 : *
125 : * For partitions, 'pubactions' considers not only the table's own
126 : * publications, but also those of all of its ancestors.
127 : */
typedef struct RelationSyncEntry
{
	Oid			relid;			/* relation oid */

	bool		replicate_valid;	/* overall validity flag for entry */

	bool		schema_sent;	/* schema already sent for non-streamed txns? */
	List	   *streamed_txns;	/* streamed toplevel transactions with this
								 * schema */

	/* are we publishing this rel? */
	PublicationActions pubactions;

	/*
	 * ExprState array for row filter. Different publication actions don't
	 * allow multiple expressions to always be combined into one, because
	 * updates or deletes restrict the column in expression to be part of the
	 * replica identity index whereas inserts do not have this restriction, so
	 * there is one ExprState per publication action.
	 */
	ExprState  *exprstate[NUM_ROWFILTER_PUBACTIONS];
	EState	   *estate;			/* executor state used for row filter */
	TupleTableSlot *new_slot;	/* slot for storing new tuple */
	TupleTableSlot *old_slot;	/* slot for storing old tuple */

	/*
	 * OID of the relation to publish changes as.  For a partition, this may
	 * be set to one of its ancestors whose schema will be used when
	 * replicating changes, if publish_via_partition_root is set for the
	 * publication.
	 */
	Oid			publish_as_relid;

	/*
	 * Map used when replicating using an ancestor's schema to convert tuples
	 * from partition's type to the ancestor's; NULL if publish_as_relid is
	 * same as 'relid' or if unnecessary due to partition and the ancestor
	 * having identical TupleDesc.
	 */
	AttrMap    *attrmap;

	/*
	 * Columns included in the publication, or NULL if all columns are
	 * included implicitly.  Note that the attnums in this bitmap are not
	 * shifted by FirstLowInvalidHeapAttributeNumber.
	 */
	Bitmapset  *columns;

	/*
	 * Private context to store additional data for this entry - state for the
	 * row filter expressions, column list, etc.
	 */
	MemoryContext entry_cxt;
} RelationSyncEntry;
182 :
183 : /*
184 : * Maintain a per-transaction level variable to track whether the transaction
185 : * has sent BEGIN. BEGIN is only sent when the first change in a transaction
186 : * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
187 : * messages for empty transactions which saves network bandwidth.
188 : *
189 : * This optimization is not used for prepared transactions because if the
190 : * WALSender restarts after prepare of a transaction and before commit prepared
191 : * of the same transaction then we won't be able to figure out if we have
192 : * skipped sending BEGIN/PREPARE of a transaction as it was empty. This is
193 : * because we would have lost the in-memory txndata information that was
194 : * present prior to the restart. This will result in sending a spurious
195 : * COMMIT PREPARED without a corresponding prepared transaction at the
196 : * downstream which would lead to an error when it tries to process it.
197 : *
198 : * XXX We could achieve this optimization by changing protocol to send
199 : * additional information so that downstream can detect that the corresponding
200 : * prepare has not been sent. However, adding such a check for every
201 : * transaction in the downstream could be costly so we might want to do it
202 : * optionally.
203 : *
204 : * We also don't have this optimization for streamed transactions because
205 : * they can contain prepared transactions.
206 : */
/* Per-transaction plugin-private state; allocated in pgoutput_begin_txn. */
typedef struct PGOutputTxnData
{
	bool		sent_begin_txn; /* flag indicating whether BEGIN has been sent */
} PGOutputTxnData;
211 :
212 : /* Map used to remember which relation schemas we sent. */
213 : static HTAB *RelationSyncCache = NULL;
214 :
215 : static void init_rel_sync_cache(MemoryContext cachectx);
216 : static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
217 : static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
218 : Relation relation);
219 : static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
220 : static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
221 : uint32 hashvalue);
222 : static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
223 : TransactionId xid);
224 : static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
225 : TransactionId xid);
226 : static void init_tuple_slot(PGOutputData *data, Relation relation,
227 : RelationSyncEntry *entry);
228 :
229 : /* row filter routines */
230 : static EState *create_estate_for_relation(Relation rel);
231 : static void pgoutput_row_filter_init(PGOutputData *data,
232 : List *publications,
233 : RelationSyncEntry *entry);
234 : static bool pgoutput_row_filter_exec_expr(ExprState *state,
235 : ExprContext *econtext);
236 : static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
237 : TupleTableSlot **new_slot_ptr,
238 : RelationSyncEntry *entry,
239 : ReorderBufferChangeType *action);
240 :
241 : /* column list routines */
242 : static void pgoutput_column_list_init(PGOutputData *data,
243 : List *publications,
244 : RelationSyncEntry *entry);
245 :
246 : /*
247 : * Specify output plugin callbacks
248 : */
void
_PG_output_plugin_init(OutputPluginCallbacks *cb)
{
	/* Plain (non-streamed, single-phase) transaction callbacks */
	cb->startup_cb = pgoutput_startup;
	cb->begin_cb = pgoutput_begin_txn;
	cb->change_cb = pgoutput_change;
	cb->truncate_cb = pgoutput_truncate;
	cb->message_cb = pgoutput_message;
	cb->commit_cb = pgoutput_commit_txn;

	/* Two-phase commit callbacks */
	cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
	cb->prepare_cb = pgoutput_prepare_txn;
	cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
	cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
	cb->filter_by_origin_cb = pgoutput_origin_filter;
	cb->shutdown_cb = pgoutput_shutdown;

	/* transaction streaming */
	cb->stream_start_cb = pgoutput_stream_start;
	cb->stream_stop_cb = pgoutput_stream_stop;
	cb->stream_abort_cb = pgoutput_stream_abort;
	cb->stream_commit_cb = pgoutput_stream_commit;
	/* change/message/truncate handlers are shared with the non-streamed path */
	cb->stream_change_cb = pgoutput_change;
	cb->stream_message_cb = pgoutput_message;
	cb->stream_truncate_cb = pgoutput_truncate;
	/* transaction streaming - two-phase commit */
	cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
}
277 :
278 : static void
279 564 : parse_output_parameters(List *options, PGOutputData *data)
280 : {
281 : ListCell *lc;
282 564 : bool protocol_version_given = false;
283 564 : bool publication_names_given = false;
284 564 : bool binary_option_given = false;
285 564 : bool messages_option_given = false;
286 564 : bool streaming_given = false;
287 564 : bool two_phase_option_given = false;
288 564 : bool origin_option_given = false;
289 :
290 564 : data->binary = false;
291 564 : data->streaming = LOGICALREP_STREAM_OFF;
292 564 : data->messages = false;
293 564 : data->two_phase = false;
294 :
295 2352 : foreach(lc, options)
296 : {
297 1788 : DefElem *defel = (DefElem *) lfirst(lc);
298 :
299 : Assert(defel->arg == NULL || IsA(defel->arg, String));
300 :
301 : /* Check each param, whether or not we recognize it */
302 1788 : if (strcmp(defel->defname, "proto_version") == 0)
303 : {
304 : unsigned long parsed;
305 : char *endptr;
306 :
307 564 : if (protocol_version_given)
308 0 : ereport(ERROR,
309 : (errcode(ERRCODE_SYNTAX_ERROR),
310 : errmsg("conflicting or redundant options")));
311 564 : protocol_version_given = true;
312 :
313 564 : errno = 0;
314 564 : parsed = strtoul(strVal(defel->arg), &endptr, 10);
315 564 : if (errno != 0 || *endptr != '\0')
316 0 : ereport(ERROR,
317 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
318 : errmsg("invalid proto_version")));
319 :
320 564 : if (parsed > PG_UINT32_MAX)
321 0 : ereport(ERROR,
322 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
323 : errmsg("proto_version \"%s\" out of range",
324 : strVal(defel->arg))));
325 :
326 564 : data->protocol_version = (uint32) parsed;
327 : }
328 1224 : else if (strcmp(defel->defname, "publication_names") == 0)
329 : {
330 564 : if (publication_names_given)
331 0 : ereport(ERROR,
332 : (errcode(ERRCODE_SYNTAX_ERROR),
333 : errmsg("conflicting or redundant options")));
334 564 : publication_names_given = true;
335 :
336 564 : if (!SplitIdentifierString(strVal(defel->arg), ',',
337 : &data->publication_names))
338 0 : ereport(ERROR,
339 : (errcode(ERRCODE_INVALID_NAME),
340 : errmsg("invalid publication_names syntax")));
341 : }
342 660 : else if (strcmp(defel->defname, "binary") == 0)
343 : {
344 20 : if (binary_option_given)
345 0 : ereport(ERROR,
346 : (errcode(ERRCODE_SYNTAX_ERROR),
347 : errmsg("conflicting or redundant options")));
348 20 : binary_option_given = true;
349 :
350 20 : data->binary = defGetBoolean(defel);
351 : }
352 640 : else if (strcmp(defel->defname, "messages") == 0)
353 : {
354 8 : if (messages_option_given)
355 0 : ereport(ERROR,
356 : (errcode(ERRCODE_SYNTAX_ERROR),
357 : errmsg("conflicting or redundant options")));
358 8 : messages_option_given = true;
359 :
360 8 : data->messages = defGetBoolean(defel);
361 : }
362 632 : else if (strcmp(defel->defname, "streaming") == 0)
363 : {
364 70 : if (streaming_given)
365 0 : ereport(ERROR,
366 : (errcode(ERRCODE_SYNTAX_ERROR),
367 : errmsg("conflicting or redundant options")));
368 70 : streaming_given = true;
369 :
370 70 : data->streaming = defGetStreamingMode(defel);
371 : }
372 562 : else if (strcmp(defel->defname, "two_phase") == 0)
373 : {
374 10 : if (two_phase_option_given)
375 0 : ereport(ERROR,
376 : (errcode(ERRCODE_SYNTAX_ERROR),
377 : errmsg("conflicting or redundant options")));
378 10 : two_phase_option_given = true;
379 :
380 10 : data->two_phase = defGetBoolean(defel);
381 : }
382 552 : else if (strcmp(defel->defname, "origin") == 0)
383 : {
384 552 : if (origin_option_given)
385 0 : ereport(ERROR,
386 : errcode(ERRCODE_SYNTAX_ERROR),
387 : errmsg("conflicting or redundant options"));
388 552 : origin_option_given = true;
389 :
390 552 : data->origin = defGetString(defel);
391 552 : if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_NONE) == 0)
392 18 : publish_no_origin = true;
393 534 : else if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_ANY) == 0)
394 534 : publish_no_origin = false;
395 : else
396 0 : ereport(ERROR,
397 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
398 : errmsg("unrecognized origin value: \"%s\"", data->origin));
399 : }
400 : else
401 0 : elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
402 : }
403 564 : }
404 :
405 : /*
406 : * Initialize this plugin
407 : */
static void
pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
				 bool is_init)
{
	PGOutputData *data = palloc0(sizeof(PGOutputData));
	/* Registered once per backend process, not once per decoding context. */
	static bool publication_callback_registered = false;

	/* Create our memory context for private allocations. */
	data->context = AllocSetContextCreate(ctx->context,
										  "logical replication output context",
										  ALLOCSET_DEFAULT_SIZES);

	data->cachectx = AllocSetContextCreate(ctx->context,
										   "logical replication cache context",
										   ALLOCSET_DEFAULT_SIZES);

	ctx->output_plugin_private = data;

	/* This plugin uses binary protocol. */
	opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;

	/*
	 * This is replication start and not slot initialization.
	 *
	 * Parse and validate options passed by the client.
	 */
	if (!is_init)
	{
		/* Parse the params and ERROR if we see any we don't recognize */
		parse_output_parameters(ctx->output_plugin_options, data);

		/* Check if we support requested protocol */
		if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
							data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));

		if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
							data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));

		if (data->publication_names == NIL)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("publication_names parameter missing")));

		/*
		 * Decide whether to enable streaming. It is disabled by default, in
		 * which case we just update the flag in decoding context. Otherwise
		 * we only allow it with sufficient version of the protocol, and when
		 * the output plugin supports it.
		 */
		if (data->streaming == LOGICALREP_STREAM_OFF)
			ctx->streaming = false;
		else if (data->streaming == LOGICALREP_STREAM_ON &&
				 data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("requested proto_version=%d does not support streaming, need %d or higher",
							data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
		else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
				 data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
							data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
		else if (!ctx->streaming)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("streaming requested, but not supported by output plugin")));

		/* Also remember we're currently not streaming any transaction. */
		in_streaming = false;

		/*
		 * Here, we just check whether the two-phase option is passed by
		 * plugin and decide whether to enable it at later point of time. It
		 * remains enabled if the previous start-up has done so. But we only
		 * allow the option to be passed in with sufficient version of the
		 * protocol, and when the output plugin supports it.
		 */
		if (!data->two_phase)
			ctx->twophase_opt_given = false;
		else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
							data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
		else if (!ctx->twophase)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("two-phase commit requested, but not supported by output plugin")));
		else
			ctx->twophase_opt_given = true;

		/* Init publication state. */
		data->publications = NIL;
		publications_valid = false;

		/*
		 * Register callback for pg_publication if we didn't already do that
		 * during some previous call in this process.
		 */
		if (!publication_callback_registered)
		{
			CacheRegisterSyscacheCallback(PUBLICATIONOID,
										  publication_invalidation_cb,
										  (Datum) 0);
			publication_callback_registered = true;
		}

		/* Initialize relation schema cache. */
		init_rel_sync_cache(CacheMemoryContext);
	}
	else
	{
		/*
		 * Disable the streaming and prepared transactions during the slot
		 * initialization mode.
		 */
		ctx->streaming = false;
		ctx->twophase = false;
	}
}
535 :
536 : /*
537 : * BEGIN callback.
538 : *
539 : * Don't send the BEGIN message here instead postpone it until the first
540 : * change. In logical replication, a common scenario is to replicate a set of
541 : * tables (instead of all tables) and transactions whose changes were on
542 : * the table(s) that are not published will produce empty transactions. These
543 : * empty transactions will send BEGIN and COMMIT messages to subscribers,
544 : * using bandwidth on something with little/no use for logical replication.
545 : */
546 : static void
547 1190 : pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
548 : {
549 1190 : PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
550 : sizeof(PGOutputTxnData));
551 :
552 1190 : txn->output_plugin_private = txndata;
553 1190 : }
554 :
555 : /*
556 : * Send BEGIN.
557 : *
558 : * This is called while processing the first change of the transaction.
559 : */
560 : static void
561 674 : pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
562 : {
563 674 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
564 674 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
565 :
566 : Assert(txndata);
567 : Assert(!txndata->sent_begin_txn);
568 :
569 674 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
570 674 : logicalrep_write_begin(ctx->out, txn);
571 674 : txndata->sent_begin_txn = true;
572 :
573 674 : send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
574 : send_replication_origin);
575 :
576 674 : OutputPluginWrite(ctx, true);
577 674 : }
578 :
579 : /*
580 : * COMMIT callback
581 : */
582 : static void
583 1184 : pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
584 : XLogRecPtr commit_lsn)
585 : {
586 1184 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
587 : bool sent_begin_txn;
588 :
589 : Assert(txndata);
590 :
591 : /*
592 : * We don't need to send the commit message unless some relevant change
593 : * from this transaction has been sent to the downstream.
594 : */
595 1184 : sent_begin_txn = txndata->sent_begin_txn;
596 1184 : OutputPluginUpdateProgress(ctx, !sent_begin_txn);
597 1184 : pfree(txndata);
598 1184 : txn->output_plugin_private = NULL;
599 :
600 1184 : if (!sent_begin_txn)
601 : {
602 512 : elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
603 512 : return;
604 : }
605 :
606 672 : OutputPluginPrepareWrite(ctx, true);
607 672 : logicalrep_write_commit(ctx->out, txn, commit_lsn);
608 672 : OutputPluginWrite(ctx, true);
609 : }
610 :
611 : /*
612 : * BEGIN PREPARE callback
613 : */
614 : static void
615 36 : pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
616 : {
617 36 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
618 :
619 36 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
620 36 : logicalrep_write_begin_prepare(ctx->out, txn);
621 :
622 36 : send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
623 : send_replication_origin);
624 :
625 36 : OutputPluginWrite(ctx, true);
626 36 : }
627 :
628 : /*
629 : * PREPARE callback
630 : */
static void
pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
					 XLogRecPtr prepare_lsn)
{
	/*
	 * Second argument is false because the transaction is not skipped
	 * (prepared transactions are always sent; compare pgoutput_commit_txn).
	 */
	OutputPluginUpdateProgress(ctx, false);

	OutputPluginPrepareWrite(ctx, true);
	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
	OutputPluginWrite(ctx, true);
}
641 :
642 : /*
643 : * COMMIT PREPARED callback
644 : */
static void
pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
							 XLogRecPtr commit_lsn)
{
	/* Not a skipped transaction, hence false (compare pgoutput_commit_txn). */
	OutputPluginUpdateProgress(ctx, false);

	OutputPluginPrepareWrite(ctx, true);
	logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
	OutputPluginWrite(ctx, true);
}
655 :
656 : /*
657 : * ROLLBACK PREPARED callback
658 : */
static void
pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
							   ReorderBufferTXN *txn,
							   XLogRecPtr prepare_end_lsn,
							   TimestampTz prepare_time)
{
	/* Not a skipped transaction, hence false (compare pgoutput_commit_txn). */
	OutputPluginUpdateProgress(ctx, false);

	OutputPluginPrepareWrite(ctx, true);
	logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
									   prepare_time);
	OutputPluginWrite(ctx, true);
}
672 :
673 : /*
674 : * Write the current schema of the relation and its ancestor (if any) if not
675 : * done yet.
676 : */
static void
maybe_send_schema(LogicalDecodingContext *ctx,
				  ReorderBufferChange *change,
				  Relation relation, RelationSyncEntry *relentry)
{
	bool		schema_sent;
	TransactionId xid = InvalidTransactionId;
	TransactionId topxid = InvalidTransactionId;

	/*
	 * Remember XID of the (sub)transaction for the change. We don't care if
	 * it's top-level transaction or not (we have already sent that XID in
	 * start of the current streaming block).
	 *
	 * If we're not in a streaming block, just use InvalidTransactionId and
	 * the write methods will not include it.
	 */
	if (in_streaming)
		xid = change->txn->xid;

	/* Track per-toplevel-xid schema state; for a subtxn use its parent. */
	if (rbtxn_is_subtxn(change->txn))
		topxid = rbtxn_get_toptxn(change->txn)->xid;
	else
		topxid = xid;

	/*
	 * Do we need to send the schema? We do track streamed transactions
	 * separately, because those may be applied later (and the regular
	 * transactions won't see their effects until then) and in an order that
	 * we don't know at this point.
	 *
	 * XXX There is a scope of optimization here. Currently, we always send
	 * the schema first time in a streaming transaction but we can probably
	 * avoid that by checking 'relentry->schema_sent' flag. However, before
	 * doing that we need to study its impact on the case where we have a mix
	 * of streaming and non-streaming transactions.
	 */
	if (in_streaming)
		schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
	else
		schema_sent = relentry->schema_sent;

	/* Nothing to do if we already sent the schema. */
	if (schema_sent)
		return;

	/*
	 * Send the schema.  If the changes will be published using an ancestor's
	 * schema, not the relation's own, send that ancestor's schema before
	 * sending relation's own (XXX - maybe sending only the former suffices?).
	 */
	if (relentry->publish_as_relid != RelationGetRelid(relation))
	{
		Relation	ancestor = RelationIdGetRelation(relentry->publish_as_relid);

		send_relation_and_attrs(ancestor, xid, ctx, relentry->columns);
		RelationClose(ancestor);
	}

	send_relation_and_attrs(relation, xid, ctx, relentry->columns);

	/* Mark the schema as sent, in the bookkeeping matching the lookup above. */
	if (in_streaming)
		set_schema_sent_in_streamed_txn(relentry, topxid);
	else
		relentry->schema_sent = true;
}
743 :
744 : /*
745 : * Sends a relation
746 : */
/*
 * Write out any needed type messages for user-defined column types, then the
 * relation message itself, optionally tagged with the streaming xid.
 */
static void
send_relation_and_attrs(Relation relation, TransactionId xid,
						LogicalDecodingContext *ctx,
						Bitmapset *columns)
{
	TupleDesc	desc = RelationGetDescr(relation);
	int			i;

	/*
	 * Write out type info if needed.  We do that only for user-created types.
	 * We use FirstGenbkiObjectId as the cutoff, so that we only consider
	 * objects with hand-assigned OIDs to be "built in", not for instance any
	 * function or type defined in the information_schema. This is important
	 * because only hand-assigned OIDs can be expected to remain stable across
	 * major versions.
	 */
	for (i = 0; i < desc->natts; i++)
	{
		Form_pg_attribute att = TupleDescAttr(desc, i);

		/* Dropped and generated columns are never replicated. */
		if (att->attisdropped || att->attgenerated)
			continue;

		if (att->atttypid < FirstGenbkiObjectId)
			continue;

		/* Skip this attribute if it's not present in the column list */
		if (columns != NULL && !bms_is_member(att->attnum, columns))
			continue;

		OutputPluginPrepareWrite(ctx, false);
		logicalrep_write_typ(ctx->out, xid, att->atttypid);
		OutputPluginWrite(ctx, false);
	}

	OutputPluginPrepareWrite(ctx, false);
	logicalrep_write_rel(ctx->out, xid, relation, columns);
	OutputPluginWrite(ctx, false);
}
786 :
787 : /*
788 : * Executor state preparation for evaluation of row filter expressions for the
789 : * specified relation.
790 : */
static EState *
create_estate_for_relation(Relation rel)
{
	EState	   *estate;
	RangeTblEntry *rte;
	List	   *perminfos = NIL;

	estate = CreateExecutorState();

	/* Build a single-entry range table containing just this relation. */
	rte = makeNode(RangeTblEntry);
	rte->rtekind = RTE_RELATION;
	rte->relid = RelationGetRelid(rel);
	rte->relkind = rel->rd_rel->relkind;
	rte->rellockmode = AccessShareLock;

	addRTEPermissionInfo(&perminfos, rte);

	ExecInitRangeTable(estate, list_make1(rte), perminfos);

	estate->es_output_cid = GetCurrentCommandId(false);

	return estate;
}
814 :
815 : /*
816 : * Evaluates row filter.
817 : *
818 : * If the row filter evaluates to NULL, it is taken as false i.e. the change
819 : * isn't replicated.
820 : */
821 : static bool
822 72 : pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
823 : {
824 : Datum ret;
825 : bool isnull;
826 :
827 : Assert(state != NULL);
828 :
829 72 : ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
830 :
831 72 : elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
832 : isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
833 : isnull ? "true" : "false");
834 :
835 72 : if (isnull)
836 2 : return false;
837 :
838 70 : return DatumGetBool(ret);
839 : }
840 :
841 : /*
842 : * Make sure the per-entry memory context exists.
843 : */
static void
pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
{
	Relation	relation;

	/* The context may already exist, in which case bail out. */
	if (entry->entry_cxt)
		return;

	relation = RelationIdGetRelation(entry->publish_as_relid);

	entry->entry_cxt = AllocSetContextCreate(data->cachectx,
											 "entry private context",
											 ALLOCSET_SMALL_SIZES);

	/* Use the relation name as the context identifier for debugging. */
	MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
									  RelationGetRelationName(relation));
}
862 :
863 : /*
864 : * Initialize the row filter.
865 : */
static void
pgoutput_row_filter_init(PGOutputData *data, List *publications,
						 RelationSyncEntry *entry)
{
	ListCell   *lc;
	List	   *rfnodes[] = {NIL, NIL, NIL};	/* One per pubaction */
	bool		no_filter[] = {false, false, false};	/* One per pubaction */
	MemoryContext oldctx;
	int			idx;
	bool		has_filter = true;
	Oid			schemaid = get_rel_namespace(entry->publish_as_relid);

	/*
	 * Find if there are any row filters for this relation. If there are, then
	 * prepare the necessary ExprState and cache it in entry->exprstate. To
	 * build an expression state, we need to ensure the following:
	 *
	 * All the given publication-table mappings must be checked.
	 *
	 * Multiple publications might have multiple row filters for this
	 * relation. Since row filter usage depends on the DML operation, there
	 * are multiple lists (one for each operation) to which row filters will
	 * be appended.
	 *
	 * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row filter
	 * expression" so it takes precedence.
	 */
	foreach(lc, publications)
	{
		Publication *pub = lfirst(lc);
		HeapTuple	rftuple = NULL;
		Datum		rfdatum = 0;
		/* Stays true unless a non-NULL prqual is found below. */
		bool		pub_no_filter = true;

		/*
		 * If the publication is FOR ALL TABLES, or the publication includes a
		 * FOR TABLES IN SCHEMA where the table belongs to the referred
		 * schema, then it is treated the same as if there are no row filters
		 * (even if other publications have a row filter).
		 */
		if (!pub->alltables &&
			!SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
								   ObjectIdGetDatum(schemaid),
								   ObjectIdGetDatum(pub->oid)))
		{
			/*
			 * Check for the presence of a row filter in this publication.
			 */
			rftuple = SearchSysCache2(PUBLICATIONRELMAP,
									  ObjectIdGetDatum(entry->publish_as_relid),
									  ObjectIdGetDatum(pub->oid));

			if (HeapTupleIsValid(rftuple))
			{
				/* Null indicates no filter. */
				rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
										  Anum_pg_publication_rel_prqual,
										  &pub_no_filter);
			}
		}

		if (pub_no_filter)
		{
			if (rftuple)
				ReleaseSysCache(rftuple);

			/*
			 * Record that each pubaction published by this publication needs
			 * no filter; this overrides filters from other publications.
			 */
			no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
			no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
			no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;

			/*
			 * Quick exit if all the DML actions are publicized via this
			 * publication.
			 */
			if (no_filter[PUBACTION_INSERT] &&
				no_filter[PUBACTION_UPDATE] &&
				no_filter[PUBACTION_DELETE])
			{
				has_filter = false;
				break;
			}

			/* No additional work for this publication. Next one. */
			continue;
		}

		/* Form the per pubaction row filter lists. */
		if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
			rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
												TextDatumGetCString(rfdatum));
		if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
			rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
												TextDatumGetCString(rfdatum));
		if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
			rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
												TextDatumGetCString(rfdatum));

		/* rftuple is necessarily valid here: pub_no_filter was cleared. */
		ReleaseSysCache(rftuple);
	}							/* loop all subscribed publications */

	/* Clean the row filter */
	for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
	{
		/* Discard collected filters for pubactions that need none. */
		if (no_filter[idx])
		{
			list_free_deep(rfnodes[idx]);
			rfnodes[idx] = NIL;
		}
	}

	if (has_filter)
	{
		Relation	relation = RelationIdGetRelation(entry->publish_as_relid);

		pgoutput_ensure_entry_cxt(data, entry);

		/*
		 * Now all the filters for all pubactions are known. Combine them when
		 * their pubactions are the same.
		 */
		oldctx = MemoryContextSwitchTo(entry->entry_cxt);
		entry->estate = create_estate_for_relation(relation);
		for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
		{
			List	   *filters = NIL;
			Expr	   *rfnode;

			if (rfnodes[idx] == NIL)
				continue;

			/* Deserialize each stored expression string back into a node. */
			foreach(lc, rfnodes[idx])
				filters = lappend(filters, stringToNode((char *) lfirst(lc)));

			/* combine the row filter and cache the ExprState */
			rfnode = make_orclause(filters);
			entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
		}						/* for each pubaction */
		MemoryContextSwitchTo(oldctx);

		RelationClose(relation);
	}
}
1008 :
1009 : /*
1010 : * Initialize the column list.
1011 : */
static void
pgoutput_column_list_init(PGOutputData *data, List *publications,
						  RelationSyncEntry *entry)
{
	ListCell   *lc;
	bool		first = true;
	Relation	relation = RelationIdGetRelation(entry->publish_as_relid);

	/*
	 * Find if there are any column lists for this relation. If there are,
	 * build a bitmap using the column lists.
	 *
	 * Multiple publications might have multiple column lists for this
	 * relation.
	 *
	 * Note that we don't support the case where the column list is different
	 * for the same table when combining publications. See comments atop
	 * fetch_table_list. But one can later change the publication so we still
	 * need to check all the given publication-table mappings and report an
	 * error if any publications have a different column list.
	 *
	 * FOR ALL TABLES and FOR TABLES IN SCHEMA imply "don't use column list".
	 */
	foreach(lc, publications)
	{
		Publication *pub = lfirst(lc);
		HeapTuple	cftuple = NULL;
		Datum		cfdatum = 0;
		Bitmapset  *cols = NULL;	/* NULL means "all columns" */

		/*
		 * If the publication is FOR ALL TABLES then it is treated the same as
		 * if there are no column lists (even if other publications have a
		 * list).
		 */
		if (!pub->alltables)
		{
			bool		pub_no_list = true;

			/*
			 * Check for the presence of a column list in this publication.
			 *
			 * Note: If we find no pg_publication_rel row, it's a publication
			 * defined for a whole schema, so it can't have a column list,
			 * just like a FOR ALL TABLES publication.
			 */
			cftuple = SearchSysCache2(PUBLICATIONRELMAP,
									  ObjectIdGetDatum(entry->publish_as_relid),
									  ObjectIdGetDatum(pub->oid));

			if (HeapTupleIsValid(cftuple))
			{
				/* Lookup the column list attribute. */
				cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
										  Anum_pg_publication_rel_prattrs,
										  &pub_no_list);

				/* Build the column list bitmap in the per-entry context. */
				if (!pub_no_list)	/* when not null */
				{
					int			i;
					int			nliveatts = 0;
					TupleDesc	desc = RelationGetDescr(relation);

					pgoutput_ensure_entry_cxt(data, entry);

					cols = pub_collist_to_bitmapset(cols, cfdatum,
													entry->entry_cxt);

					/* Get the number of live attributes. */
					for (i = 0; i < desc->natts; i++)
					{
						Form_pg_attribute att = TupleDescAttr(desc, i);

						/* Dropped and generated columns are never sent. */
						if (att->attisdropped || att->attgenerated)
							continue;

						nliveatts++;
					}

					/*
					 * If column list includes all the columns of the table,
					 * set it to NULL.
					 */
					if (bms_num_members(cols) == nliveatts)
					{
						bms_free(cols);
						cols = NULL;
					}
				}

				ReleaseSysCache(cftuple);
			}
		}

		if (first)
		{
			/* Remember the first publication's list as the reference. */
			entry->columns = cols;
			first = false;
		}
		else if (!bms_equal(entry->columns, cols))
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
						   get_namespace_name(RelationGetNamespace(relation)),
						   RelationGetRelationName(relation)));
	}							/* loop all subscribed publications */

	RelationClose(relation);
}
1122 :
1123 : /*
1124 : * Initialize the slot for storing new and old tuples, and build the map that
1125 : * will be used to convert the relation's tuples into the ancestor's format.
1126 : */
static void
init_tuple_slot(PGOutputData *data, Relation relation,
				RelationSyncEntry *entry)
{
	MemoryContext oldctx;
	TupleDesc	oldtupdesc;
	TupleDesc	newtupdesc;

	/* Allocate in the long-lived cache context, not the per-change one. */
	oldctx = MemoryContextSwitchTo(data->cachectx);

	/*
	 * Create tuple table slots. Create a copy of the TupleDesc as it needs to
	 * live as long as the cache remains.
	 */
	oldtupdesc = CreateTupleDescCopy(RelationGetDescr(relation));
	newtupdesc = CreateTupleDescCopy(RelationGetDescr(relation));

	entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
	entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);

	MemoryContextSwitchTo(oldctx);

	/*
	 * Cache the map that will be used to convert the relation's tuples into
	 * the ancestor's format, if needed.  publish_as_relid differing from the
	 * relation's own OID indicates publishing via a root ancestor.
	 */
	if (entry->publish_as_relid != RelationGetRelid(relation))
	{
		Relation	ancestor = RelationIdGetRelation(entry->publish_as_relid);
		TupleDesc	indesc = RelationGetDescr(relation);
		TupleDesc	outdesc = RelationGetDescr(ancestor);

		/*
		 * Map must live as long as the session does.
		 *
		 * NOTE(review): allocating in CacheMemoryContext means the map is
		 * never freed if the entry is invalidated — confirm whether this is
		 * an accepted one-time cost or should live in entry->entry_cxt.
		 */
		oldctx = MemoryContextSwitchTo(CacheMemoryContext);

		entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);

		MemoryContextSwitchTo(oldctx);
		RelationClose(ancestor);
	}
}
1168 :
1169 : /*
1170 : * Change is checked against the row filter if any.
1171 : *
1172 : * Returns true if the change is to be replicated, else false.
1173 : *
1174 : * For inserts, evaluate the row filter for new tuple.
1175 : * For deletes, evaluate the row filter for old tuple.
1176 : * For updates, evaluate the row filter for old and new tuple.
1177 : *
1178 : * For updates, if both evaluations are true, we allow sending the UPDATE and
1179 : * if both the evaluations are false, it doesn't replicate the UPDATE. Now, if
1180 : * only one of the tuples matches the row filter expression, we transform
1181 : * UPDATE to DELETE or INSERT to avoid any data inconsistency based on the
1182 : * following rules:
1183 : *
1184 : * Case 1: old-row (no match) new-row (no match) -> (drop change)
1185 : * Case 2: old-row (no match) new row (match) -> INSERT
1186 : * Case 3: old-row (match) new-row (no match) -> DELETE
1187 : * Case 4: old-row (match) new row (match) -> UPDATE
1188 : *
1189 : * The new action is updated in the action parameter.
1190 : *
1191 : * The new slot could be updated when transforming the UPDATE into INSERT,
1192 : * because the original new tuple might not have column values from the replica
1193 : * identity.
1194 : *
1195 : * Examples:
1196 : * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
1197 : * Since the old tuple satisfies, the initial table synchronization copied this
1198 : * row (or another method was used to guarantee that there is data
1199 : * consistency). However, after the UPDATE the new tuple doesn't satisfy the
1200 : * row filter, so from a data consistency perspective, that row should be
1201 : * removed on the subscriber. The UPDATE should be transformed into a DELETE
1202 : * statement and be sent to the subscriber. Keeping this row on the subscriber
1203 : * is undesirable because it doesn't reflect what was defined in the row filter
1204 : * expression on the publisher. This row on the subscriber would likely not be
1205 : * modified by replication again. If someone inserted a new row with the same
1206 : * old identifier, replication could stop due to a constraint violation.
1207 : *
1208 : * Let's say the old tuple doesn't match the row filter but the new tuple does.
1209 : * Since the old tuple doesn't satisfy, the initial table synchronization
1210 : * probably didn't copy this row. However, after the UPDATE the new tuple does
1211 : * satisfy the row filter, so from a data consistency perspective, that row
1212 : * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
1213 : * statements have no effect (it matches no row -- see
1214 : * apply_handle_update_internal()). So, the UPDATE should be transformed into a
1215 : * INSERT statement and be sent to the subscriber. However, this might surprise
1216 : * someone who expects the data set to satisfy the row filter expression on the
1217 : * provider.
1218 : */
static bool
pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
					TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
					ReorderBufferChangeType *action)
{
	TupleDesc	desc;
	int			i;
	bool		old_matched,
				new_matched,
				result;
	TupleTableSlot *tmp_new_slot;
	TupleTableSlot *new_slot = *new_slot_ptr;
	ExprContext *ecxt;
	ExprState  *filter_exprstate;

	/*
	 * We need this map to avoid relying on ReorderBufferChangeType enums
	 * having specific values.
	 */
	static const int map_changetype_pubaction[] = {
		[REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
		[REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
		[REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
	};

	Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
		   *action == REORDER_BUFFER_CHANGE_UPDATE ||
		   *action == REORDER_BUFFER_CHANGE_DELETE);

	Assert(new_slot || old_slot);

	/* Get the corresponding row filter */
	filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];

	/* Bail out if there is no row filter */
	if (!filter_exprstate)
		return true;

	elog(DEBUG3, "table \"%s.%s\" has row filter",
		 get_namespace_name(RelationGetNamespace(relation)),
		 RelationGetRelationName(relation));

	/* Free memory from any previous tuple's evaluation. */
	ResetPerTupleExprContext(entry->estate);

	ecxt = GetPerTupleExprContext(entry->estate);

	/*
	 * For the following occasions where there is only one tuple, we can
	 * evaluate the row filter for that tuple and return.
	 *
	 * For inserts, we only have the new tuple.
	 *
	 * For updates, we can have only a new tuple when none of the replica
	 * identity columns changed and none of those columns have external data
	 * but we still need to evaluate the row filter for the new tuple as the
	 * existing values of those columns might not match the filter. Also,
	 * users can use constant expressions in the row filter, so we anyway need
	 * to evaluate it for the new tuple.
	 *
	 * For deletes, we only have the old tuple.
	 */
	if (!new_slot || !old_slot)
	{
		ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
		result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);

		return result;
	}

	/*
	 * Both the old and new tuples must be valid only for updates and need to
	 * be checked against the row filter.
	 */
	Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);

	/* Deform both tuples so individual attributes can be inspected. */
	slot_getallattrs(new_slot);
	slot_getallattrs(old_slot);

	tmp_new_slot = NULL;
	desc = RelationGetDescr(relation);

	/*
	 * The new tuple might not have all the replica identity columns, in which
	 * case it needs to be copied over from the old tuple.
	 */
	for (i = 0; i < desc->natts; i++)
	{
		Form_pg_attribute att = TupleDescAttr(desc, i);

		/*
		 * if the column in the new tuple or old tuple is null, nothing to do
		 */
		if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
			continue;

		/*
		 * Unchanged toasted replica identity columns are only logged in the
		 * old tuple. Copy this over to the new tuple. The changed (or WAL
		 * Logged) toast values are always assembled in memory and set as
		 * VARTAG_INDIRECT. See ReorderBufferToastReplace.
		 */
		if (att->attlen == -1 &&
			VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
			!VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
		{
			/* Lazily create the scratch slot on first copied column. */
			if (!tmp_new_slot)
			{
				tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
				ExecClearTuple(tmp_new_slot);

				memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
					   desc->natts * sizeof(Datum));
				memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
					   desc->natts * sizeof(bool));
			}

			tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
			tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
		}
	}

	/* Evaluate the filter against the old tuple first ... */
	ecxt->ecxt_scantuple = old_slot;
	old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);

	/* ... then against the (possibly patched-up) new tuple. */
	if (tmp_new_slot)
	{
		ExecStoreVirtualTuple(tmp_new_slot);
		ecxt->ecxt_scantuple = tmp_new_slot;
	}
	else
		ecxt->ecxt_scantuple = new_slot;

	new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);

	/*
	 * Case 1: if both tuples don't match the row filter, bailout. Send
	 * nothing.
	 */
	if (!old_matched && !new_matched)
		return false;

	/*
	 * Case 2: if the old tuple doesn't satisfy the row filter but the new
	 * tuple does, transform the UPDATE into INSERT.
	 *
	 * Use the newly transformed tuple that must contain the column values for
	 * all the replica identity columns. This is required to ensure that the
	 * while inserting the tuple in the downstream node, we have all the
	 * required column values.
	 */
	if (!old_matched && new_matched)
	{
		*action = REORDER_BUFFER_CHANGE_INSERT;

		if (tmp_new_slot)
			*new_slot_ptr = tmp_new_slot;
	}

	/*
	 * Case 3: if the old tuple satisfies the row filter but the new tuple
	 * doesn't, transform the UPDATE into DELETE.
	 *
	 * This transformation does not require another tuple. The Old tuple will
	 * be used for DELETE.
	 */
	else if (old_matched && !new_matched)
		*action = REORDER_BUFFER_CHANGE_DELETE;

	/*
	 * Case 4: if both tuples match the row filter, transformation isn't
	 * required. (*action is default UPDATE).
	 */

	return true;
}
1394 :
1395 : /*
1396 : * Sends the decoded DML over wire.
1397 : *
1398 : * This is called both in streaming and non-streaming modes.
1399 : */
static void
pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
				Relation relation, ReorderBufferChange *change)
{
	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
	PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
	MemoryContext old;
	RelationSyncEntry *relentry;
	TransactionId xid = InvalidTransactionId;
	Relation	ancestor = NULL;
	Relation	targetrel = relation;
	ReorderBufferChangeType action = change->action;
	TupleTableSlot *old_slot = NULL;
	TupleTableSlot *new_slot = NULL;

	/* Silently skip relations that can't be published (e.g. catalogs). */
	if (!is_publishable_relation(relation))
		return;

	/*
	 * Remember the xid for the change in streaming mode. We need to send xid
	 * with each change in the streaming mode so that subscriber can make
	 * their association and on aborts, it can discard the corresponding
	 * changes.
	 */
	if (in_streaming)
		xid = change->txn->xid;

	relentry = get_rel_sync_entry(data, relation);

	/* First check the table filter */
	switch (action)
	{
		case REORDER_BUFFER_CHANGE_INSERT:
			if (!relentry->pubactions.pubinsert)
				return;
			break;
		case REORDER_BUFFER_CHANGE_UPDATE:
			if (!relentry->pubactions.pubupdate)
				return;
			break;
		case REORDER_BUFFER_CHANGE_DELETE:
			if (!relentry->pubactions.pubdelete)
				return;

			/*
			 * This is only possible if deletes are allowed even when replica
			 * identity is not defined for a table. Since the DELETE action
			 * can't be published, we simply return.
			 */
			if (!change->data.tp.oldtuple)
			{
				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
				return;
			}
			break;
		default:
			Assert(false);
	}

	/* Avoid leaking memory by using and resetting our own context */
	old = MemoryContextSwitchTo(data->context);

	/* Switch relation if publishing via root. */
	if (relentry->publish_as_relid != RelationGetRelid(relation))
	{
		Assert(relation->rd_rel->relispartition);
		ancestor = RelationIdGetRelation(relentry->publish_as_relid);
		targetrel = ancestor;
	}

	/* Load the old tuple (UPDATE/DELETE) into its slot, if present. */
	if (change->data.tp.oldtuple)
	{
		old_slot = relentry->old_slot;
		ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple, old_slot, false);

		/* Convert tuple if needed. */
		if (relentry->attrmap)
		{
			TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
													  &TTSOpsVirtual);

			old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
		}
	}

	/* Load the new tuple (INSERT/UPDATE) into its slot, if present. */
	if (change->data.tp.newtuple)
	{
		new_slot = relentry->new_slot;
		ExecStoreHeapTuple(&change->data.tp.newtuple->tuple, new_slot, false);

		/* Convert tuple if needed. */
		if (relentry->attrmap)
		{
			TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
													  &TTSOpsVirtual);

			new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
		}
	}

	/*
	 * Check row filter.
	 *
	 * Updates could be transformed to inserts or deletes based on the results
	 * of the row filter for old and new tuple.
	 */
	if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
		goto cleanup;

	/*
	 * Send BEGIN if we haven't yet.
	 *
	 * We send the BEGIN message after ensuring that we will actually send the
	 * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
	 * transactions.
	 */
	if (txndata && !txndata->sent_begin_txn)
		pgoutput_send_begin(ctx, txn);

	/*
	 * Schema should be sent using the original relation because it also sends
	 * the ancestor's relation.
	 */
	maybe_send_schema(ctx, change, relation, relentry);

	OutputPluginPrepareWrite(ctx, true);

	/* Send the data */
	switch (action)
	{
		case REORDER_BUFFER_CHANGE_INSERT:
			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
									data->binary, relentry->columns);
			break;
		case REORDER_BUFFER_CHANGE_UPDATE:
			logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
									new_slot, data->binary, relentry->columns);
			break;
		case REORDER_BUFFER_CHANGE_DELETE:
			logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
									data->binary, relentry->columns);
			break;
		default:
			Assert(false);
	}

	OutputPluginWrite(ctx, true);

cleanup:
	/* Drop the extra relcache reference taken for root publishing. */
	if (RelationIsValid(ancestor))
	{
		RelationClose(ancestor);
		ancestor = NULL;
	}

	MemoryContextSwitchTo(old);
	MemoryContextReset(data->context);
}
1558 :
/*
 * Send a TRUNCATE message covering all publishable relations in the change.
 * Relations filtered out by pubactions or root-publishing rules are dropped
 * from the relid list; nothing is sent if the list ends up empty.
 */
static void
pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
				  int nrelations, Relation relations[], ReorderBufferChange *change)
{
	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
	PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
	MemoryContext old;
	RelationSyncEntry *relentry;
	int			i;
	int			nrelids;
	Oid		   *relids;
	TransactionId xid = InvalidTransactionId;

	/* Remember the xid for the change in streaming mode. See pgoutput_change. */
	if (in_streaming)
		xid = change->txn->xid;

	/* Use the per-change context; reset at the end to avoid leaks. */
	old = MemoryContextSwitchTo(data->context);

	relids = palloc0(nrelations * sizeof(Oid));
	nrelids = 0;

	for (i = 0; i < nrelations; i++)
	{
		Relation	relation = relations[i];
		Oid			relid = RelationGetRelid(relation);

		if (!is_publishable_relation(relation))
			continue;

		relentry = get_rel_sync_entry(data, relation);

		if (!relentry->pubactions.pubtruncate)
			continue;

		/*
		 * Don't send partitions if the publication wants to send only the
		 * root tables through it.
		 */
		if (relation->rd_rel->relispartition &&
			relentry->publish_as_relid != relid)
			continue;

		relids[nrelids++] = relid;

		/* Send BEGIN if we haven't yet */
		if (txndata && !txndata->sent_begin_txn)
			pgoutput_send_begin(ctx, txn);

		maybe_send_schema(ctx, change, relation, relentry);
	}

	/* Only write the message if at least one relation survived filtering. */
	if (nrelids > 0)
	{
		OutputPluginPrepareWrite(ctx, true);
		logicalrep_write_truncate(ctx->out,
								  xid,
								  nrelids,
								  relids,
								  change->data.truncate.cascade,
								  change->data.truncate.restart_seqs);
		OutputPluginWrite(ctx, true);
	}

	MemoryContextSwitchTo(old);
	MemoryContextReset(data->context);
}
1626 :
/*
 * Send a logical decoding message, if the subscriber asked for messages.
 * Transactional messages force a BEGIN first; non-transactional ones are
 * written immediately.
 */
static void
pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
				 XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
				 const char *message)
{
	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
	TransactionId xid = InvalidTransactionId;

	/* Skip entirely when the subscription doesn't want messages. */
	if (!data->messages)
		return;

	/*
	 * Remember the xid for the message in streaming mode. See
	 * pgoutput_change.
	 */
	if (in_streaming)
		xid = txn->xid;

	/*
	 * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
	 */
	if (transactional)
	{
		PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;

		/* Send BEGIN if we haven't yet */
		if (txndata && !txndata->sent_begin_txn)
			pgoutput_send_begin(ctx, txn);
	}

	OutputPluginPrepareWrite(ctx, true);
	logicalrep_write_message(ctx->out,
							 xid,
							 message_lsn,
							 transactional,
							 prefix,
							 sz,
							 message);
	OutputPluginWrite(ctx, true);
}
1667 :
1668 : /*
1669 : * Return true if the data is associated with an origin and the user has
1670 : * requested the changes that don't have an origin, false otherwise.
1671 : */
1672 : static bool
1673 981542 : pgoutput_origin_filter(LogicalDecodingContext *ctx,
1674 : RepOriginId origin_id)
1675 : {
1676 981542 : if (publish_no_origin && origin_id != InvalidRepOriginId)
1677 92 : return true;
1678 :
1679 981450 : return false;
1680 : }
1681 :
1682 : /*
1683 : * Shutdown the output plugin.
1684 : *
1685 : * Note, we don't need to clean the data->context and data->cachectx as
1686 : * they are child contexts of the ctx->context so they will be cleaned up by
1687 : * logical decoding machinery.
1688 : */
1689 : static void
1690 770 : pgoutput_shutdown(LogicalDecodingContext *ctx)
1691 : {
1692 770 : if (RelationSyncCache)
1693 : {
1694 308 : hash_destroy(RelationSyncCache);
1695 308 : RelationSyncCache = NULL;
1696 : }
1697 770 : }
1698 :
1699 : /*
1700 : * Load publications from the list of publication names.
1701 : */
1702 : static List *
1703 258 : LoadPublications(List *pubnames)
1704 : {
1705 258 : List *result = NIL;
1706 : ListCell *lc;
1707 :
1708 600 : foreach(lc, pubnames)
1709 : {
1710 344 : char *pubname = (char *) lfirst(lc);
1711 344 : Publication *pub = GetPublicationByName(pubname, false);
1712 :
1713 342 : result = lappend(result, pub);
1714 : }
1715 :
1716 256 : return result;
1717 : }
1718 :
1719 : /*
1720 : * Publication syscache invalidation callback.
1721 : *
1722 : * Called for invalidations on pg_publication.
1723 : */
static void
publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
{
	/* Force the publication list to be reloaded on next use. */
	publications_valid = false;

	/*
	 * Also invalidate per-relation cache so that next time the filtering info
	 * is checked it will be updated with the new publication settings.
	 */
	rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
}
1735 :
1736 : /*
1737 : * START STREAM callback
1738 : */
static void
pgoutput_stream_start(struct LogicalDecodingContext *ctx,
					  ReorderBufferTXN *txn)
{
	/* Only send origin info when the change actually has an origin. */
	bool		send_replication_origin = txn->origin_id != InvalidRepOriginId;

	/* we can't nest streaming of transactions */
	Assert(!in_streaming);

	/*
	 * If we already sent the first stream for this transaction then don't
	 * send the origin id in the subsequent streams.
	 */
	if (rbtxn_is_streamed(txn))
		send_replication_origin = false;

	OutputPluginPrepareWrite(ctx, !send_replication_origin);
	/* first_segment flag is true only for the transaction's first stream */
	logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));

	send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
					 send_replication_origin);

	OutputPluginWrite(ctx, true);

	/* we're streaming a chunk of transaction now */
	in_streaming = true;
}
1766 :
1767 : /*
1768 : * STOP STREAM callback
1769 : */
static void
pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
					 ReorderBufferTXN *txn)
{
	/* we should be streaming a transaction */
	Assert(in_streaming);

	/* Emit the stream-stop protocol message. */
	OutputPluginPrepareWrite(ctx, true);
	logicalrep_write_stream_stop(ctx->out);
	OutputPluginWrite(ctx, true);

	/* we've stopped streaming a transaction */
	in_streaming = false;
}
1784 :
/*
 * Notify downstream to discard the streamed transaction (along with all
 * its subtransactions, if it's a toplevel transaction).
 */
static void
pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
					  ReorderBufferTXN *txn,
					  XLogRecPtr abort_lsn)
{
	ReorderBufferTXN *toptxn;
	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;

	/*
	 * The abort time and LSN are included in the message only when the
	 * streaming = parallel option is in use.
	 */
	bool		write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);

	/*
	 * The abort should happen outside streaming block, even for streamed
	 * transactions. The transaction has to be marked as streamed, though.
	 */
	Assert(!in_streaming);

	/* determine the toplevel transaction */
	toptxn = rbtxn_get_toptxn(txn);

	Assert(rbtxn_is_streamed(toptxn));

	OutputPluginPrepareWrite(ctx, true);
	logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
								  txn->xact_time.abort_time, write_abort_info);

	OutputPluginWrite(ctx, true);

	/*
	 * Drop the streamed-schema bookkeeping for this xid.  On abort the
	 * subscriber throws away the streamed schema, so schema_sent is left
	 * unchanged (hence is_commit = false).
	 */
	cleanup_rel_sync_cache(toptxn->xid, false);
}
1817 :
/*
 * Notify downstream to apply the streamed transaction (along with all
 * its subtransactions).
 */
static void
pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
					   ReorderBufferTXN *txn,
					   XLogRecPtr commit_lsn)
{
	/*
	 * The commit should happen outside streaming block, even for streamed
	 * transactions. The transaction has to be marked as streamed, though.
	 */
	Assert(!in_streaming);
	Assert(rbtxn_is_streamed(txn));

	OutputPluginUpdateProgress(ctx, false);

	/* Send the stream-commit protocol message. */
	OutputPluginPrepareWrite(ctx, true);
	logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
	OutputPluginWrite(ctx, true);

	/*
	 * On commit the subscriber retains the streamed schemas, so mark them as
	 * sent and forget the xid (see cleanup_rel_sync_cache).
	 */
	cleanup_rel_sync_cache(txn->xid, true);
}
1842 :
1843 : /*
1844 : * PREPARE callback (for streaming two-phase commit).
1845 : *
1846 : * Notify the downstream to prepare the transaction.
1847 : */
static void
pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
							ReorderBufferTXN *txn,
							XLogRecPtr prepare_lsn)
{
	/* Only a previously streamed transaction can be prepared here. */
	Assert(rbtxn_is_streamed(txn));

	OutputPluginUpdateProgress(ctx, false);

	/* Send the stream-prepare protocol message. */
	OutputPluginPrepareWrite(ctx, true);
	logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
	OutputPluginWrite(ctx, true);
}
1860 :
1861 : /*
1862 : * Initialize the relation schema sync cache for a decoding session.
1863 : *
1864 : * The hash table is destroyed at the end of a decoding session. While
1865 : * relcache invalidations still exist and will still be invoked, they
1866 : * will just see the null hash table global and take no action.
1867 : */
static void
init_rel_sync_cache(MemoryContext cachectx)
{
	HASHCTL		ctl;
	static bool relation_callbacks_registered = false;

	/* Nothing to do if hash table already exists */
	if (RelationSyncCache != NULL)
		return;

	/* Make a new hash table for the cache, keyed by relation OID */
	ctl.keysize = sizeof(Oid);
	ctl.entrysize = sizeof(RelationSyncEntry);
	ctl.hcxt = cachectx;

	RelationSyncCache = hash_create("logical replication output relation cache",
									128, &ctl,
									HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);

	Assert(RelationSyncCache != NULL);

	/*
	 * No more to do if we already registered callbacks.  (The static flag
	 * ensures they are registered at most once per backend, even though the
	 * hash table itself is recreated for each decoding session.)
	 */
	if (relation_callbacks_registered)
		return;

	/* We must update the cache entry for a relation after a relcache flush */
	CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);

	/*
	 * Flush all cache entries after a pg_namespace change, in case it was a
	 * schema rename affecting a relation being replicated.
	 */
	CacheRegisterSyscacheCallback(NAMESPACEOID,
								  rel_sync_cache_publication_cb,
								  (Datum) 0);

	/*
	 * Flush all cache entries after any publication changes. (We need no
	 * callback entry for pg_publication, because publication_invalidation_cb
	 * will take care of it.)
	 */
	CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
								  rel_sync_cache_publication_cb,
								  (Datum) 0);
	CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
								  rel_sync_cache_publication_cb,
								  (Datum) 0);

	relation_callbacks_registered = true;
}
1918 :
1919 : /*
1920 : * We expect relatively small number of streamed transactions.
1921 : */
static bool
get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
{
	/*
	 * A linear scan is fine here: per the note above, we expect only a small
	 * number of concurrently streamed transactions per entry.
	 */
	return list_member_xid(entry->streamed_txns, xid);
}
1927 :
1928 : /*
1929 : * Add the xid in the rel sync entry for which we have already sent the schema
1930 : * of the relation.
1931 : */
static void
set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
{
	MemoryContext oldctx;

	/*
	 * The list must survive beyond the current decoding call, so allocate it
	 * in the long-lived CacheMemoryContext rather than a per-call context.
	 */
	oldctx = MemoryContextSwitchTo(CacheMemoryContext);

	entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);

	MemoryContextSwitchTo(oldctx);
}
1943 :
1944 : /*
1945 : * Find or create entry in the relation schema cache.
1946 : *
1947 : * This looks up publications that the given relation is directly or
1948 : * indirectly part of (the latter if it's really the relation's ancestor that
1949 : * is part of a publication) and fills up the found entry with the information
1950 : * about which operations to publish and whether to use an ancestor's schema
1951 : * when publishing.
1952 : */
static RelationSyncEntry *
get_rel_sync_entry(PGOutputData *data, Relation relation)
{
	RelationSyncEntry *entry;
	bool		found;
	MemoryContext oldctx;
	Oid			relid = RelationGetRelid(relation);

	Assert(RelationSyncCache != NULL);

	/* Find cached relation info, creating if not found */
	entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
											  &relid,
											  HASH_ENTER, &found);
	Assert(entry != NULL);

	/* initialize entry, if it's new */
	if (!found)
	{
		entry->replicate_valid = false;
		entry->schema_sent = false;
		entry->streamed_txns = NIL;
		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
		entry->new_slot = NULL;
		entry->old_slot = NULL;
		memset(entry->exprstate, 0, sizeof(entry->exprstate));
		entry->entry_cxt = NULL;
		entry->publish_as_relid = InvalidOid;
		entry->columns = NULL;
		entry->attrmap = NULL;
	}

	/* Validate the entry (i.e. rebuild it if an invalidation marked it stale) */
	if (!entry->replicate_valid)
	{
		Oid			schemaId = get_rel_namespace(relid);
		List	   *pubids = GetRelationPublications(relid);

		/*
		 * We don't acquire a lock on the namespace system table as we build
		 * the cache entry using a historic snapshot and all the later changes
		 * are absorbed while decoding WAL.
		 */
		List	   *schemaPubids = GetSchemaPublications(schemaId);
		ListCell   *lc;
		Oid			publish_as_relid = relid;
		int			publish_ancestor_level = 0;
		bool		am_partition = get_rel_relispartition(relid);
		char		relkind = get_rel_relkind(relid);
		List	   *rel_publications = NIL;

		/* Reload publications if needed before use. */
		if (!publications_valid)
		{
			/* The publication list must outlive this call. */
			oldctx = MemoryContextSwitchTo(CacheMemoryContext);
			if (data->publications)
			{
				list_free_deep(data->publications);
				data->publications = NIL;
			}
			data->publications = LoadPublications(data->publication_names);
			MemoryContextSwitchTo(oldctx);
			publications_valid = true;
		}

		/*
		 * Reset schema_sent status as the relation definition may have
		 * changed. Also reset pubactions to empty in case rel was dropped
		 * from a publication. Also free any objects that depended on the
		 * earlier definition.
		 */
		entry->schema_sent = false;
		list_free(entry->streamed_txns);
		entry->streamed_txns = NIL;
		bms_free(entry->columns);
		entry->columns = NULL;
		entry->pubactions.pubinsert = false;
		entry->pubactions.pubupdate = false;
		entry->pubactions.pubdelete = false;
		entry->pubactions.pubtruncate = false;

		/*
		 * Tuple slots cleanups. (Will be rebuilt later if needed).
		 */
		if (entry->old_slot)
			ExecDropSingleTupleTableSlot(entry->old_slot);
		if (entry->new_slot)
			ExecDropSingleTupleTableSlot(entry->new_slot);

		entry->old_slot = NULL;
		entry->new_slot = NULL;

		if (entry->attrmap)
			free_attrmap(entry->attrmap);
		entry->attrmap = NULL;

		/*
		 * Row filter cache cleanups.
		 */
		if (entry->entry_cxt)
			MemoryContextDelete(entry->entry_cxt);

		entry->entry_cxt = NULL;
		entry->estate = NULL;
		memset(entry->exprstate, 0, sizeof(entry->exprstate));

		/*
		 * Build publication cache. We can't use one provided by relcache as
		 * relcache considers all publications that the given relation is in,
		 * but here we only need to consider ones that the subscriber
		 * requested.
		 */
		foreach(lc, data->publications)
		{
			Publication *pub = lfirst(lc);
			bool		publish = false;

			/*
			 * Under what relid should we publish changes in this publication?
			 * We'll use the top-most relid across all publications. Also
			 * track the ancestor level for this publication.
			 */
			Oid			pub_relid = relid;
			int			ancestor_level = 0;

			/*
			 * If this is a FOR ALL TABLES publication, pick the partition
			 * root and set the ancestor level accordingly.
			 */
			if (pub->alltables)
			{
				publish = true;
				if (pub->pubviaroot && am_partition)
				{
					List	   *ancestors = get_partition_ancestors(relid);

					pub_relid = llast_oid(ancestors);
					ancestor_level = list_length(ancestors);
				}
			}

			if (!publish)
			{
				bool		ancestor_published = false;

				/*
				 * For a partition, check if any of the ancestors are
				 * published. If so, note down the topmost ancestor that is
				 * published via this publication, which will be used as the
				 * relation via which to publish the partition's changes.
				 */
				if (am_partition)
				{
					Oid			ancestor;
					int			level;
					List	   *ancestors = get_partition_ancestors(relid);

					ancestor = GetTopMostAncestorInPublication(pub->oid,
															   ancestors,
															   &level);

					if (ancestor != InvalidOid)
					{
						ancestor_published = true;
						if (pub->pubviaroot)
						{
							pub_relid = ancestor;
							ancestor_level = level;
						}
					}
				}

				if (list_member_oid(pubids, pub->oid) ||
					list_member_oid(schemaPubids, pub->oid) ||
					ancestor_published)
					publish = true;
			}

			/*
			 * If the relation is to be published, determine actions to
			 * publish, and list of columns, if appropriate.
			 *
			 * Don't publish changes for partitioned tables, because
			 * publishing those of its partitions suffices, unless partition
			 * changes won't be published due to pubviaroot being set.
			 */
			if (publish &&
				(relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
			{
				entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
				entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
				entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;

				/*
				 * We want to publish the changes as the top-most ancestor
				 * across all publications. So we need to check if the already
				 * calculated level is higher than the new one. If yes, we can
				 * ignore the new value (as it's a child). Otherwise the new
				 * value is an ancestor, so we keep it.
				 */
				if (publish_ancestor_level > ancestor_level)
					continue;

				/*
				 * If we found an ancestor higher up in the tree, discard the
				 * list of publications through which we replicate it, and use
				 * the new ancestor.
				 */
				if (publish_ancestor_level < ancestor_level)
				{
					publish_as_relid = pub_relid;
					publish_ancestor_level = ancestor_level;

					/* reset the publication list for this relation */
					rel_publications = NIL;
				}
				else
				{
					/* Same ancestor level, has to be the same OID. */
					Assert(publish_as_relid == pub_relid);
				}

				/* Track publications for this ancestor. */
				rel_publications = lappend(rel_publications, pub);
			}
		}

		/* Remember which relid (possibly an ancestor's) to publish changes as. */
		entry->publish_as_relid = publish_as_relid;

		/*
		 * Initialize the tuple slot, map, and row filter. These are only used
		 * when publishing inserts, updates, or deletes.
		 */
		if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
			entry->pubactions.pubdelete)
		{
			/* Initialize the tuple slot and map */
			init_tuple_slot(data, relation, entry);

			/* Initialize the row filter */
			pgoutput_row_filter_init(data, rel_publications, entry);

			/* Initialize the column list */
			pgoutput_column_list_init(data, rel_publications, entry);
		}

		list_free(pubids);
		list_free(schemaPubids);
		list_free(rel_publications);

		/* The entry is now fully rebuilt and usable. */
		entry->replicate_valid = true;
	}

	return entry;
}
2210 :
2211 : /*
2212 : * Cleanup list of streamed transactions and update the schema_sent flag.
2213 : *
2214 : * When a streamed transaction commits or aborts, we need to remove the
2215 : * toplevel XID from the schema cache. If the transaction aborted, the
2216 : * subscriber will simply throw away the schema records we streamed, so
2217 : * we don't need to do anything else.
2218 : *
2219 : * If the transaction is committed, the subscriber will update the relation
2220 : * cache - so tweak the schema_sent flag accordingly.
2221 : */
static void
cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
{
	HASH_SEQ_STATUS hash_seq;
	RelationSyncEntry *entry;
	ListCell   *lc;

	Assert(RelationSyncCache != NULL);

	/* Scan every cached relation for a reference to this toplevel xid. */
	hash_seq_init(&hash_seq, RelationSyncCache);
	while ((entry = hash_seq_search(&hash_seq)) != NULL)
	{
		/*
		 * We can set the schema_sent flag for an entry that has committed xid
		 * in the list as that ensures that the subscriber would have the
		 * corresponding schema and we don't need to send it unless there is
		 * any invalidation for that relation.
		 */
		foreach(lc, entry->streamed_txns)
		{
			if (xid == lfirst_xid(lc))
			{
				if (is_commit)
					entry->schema_sent = true;

				entry->streamed_txns =
					foreach_delete_current(entry->streamed_txns, lc);
				/* The xid is expected at most once per entry, so stop here. */
				break;
			}
		}
	}
}
2254 :
2255 : /*
2256 : * Relcache invalidation callback
2257 : */
2258 : static void
2259 5994 : rel_sync_cache_relation_cb(Datum arg, Oid relid)
2260 : {
2261 : RelationSyncEntry *entry;
2262 :
2263 : /*
2264 : * We can get here if the plugin was used in SQL interface as the
2265 : * RelSchemaSyncCache is destroyed when the decoding finishes, but there
2266 : * is no way to unregister the relcache invalidation callback.
2267 : */
2268 5994 : if (RelationSyncCache == NULL)
2269 12 : return;
2270 :
2271 : /*
2272 : * Nobody keeps pointers to entries in this hash table around outside
2273 : * logical decoding callback calls - but invalidation events can come in
2274 : * *during* a callback if we do any syscache access in the callback.
2275 : * Because of that we must mark the cache entry as invalid but not damage
2276 : * any of its substructure here. The next get_rel_sync_entry() call will
2277 : * rebuild it all.
2278 : */
2279 5982 : if (OidIsValid(relid))
2280 : {
2281 : /*
2282 : * Getting invalidations for relations that aren't in the table is
2283 : * entirely normal. So we don't care if it's found or not.
2284 : */
2285 5938 : entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
2286 : HASH_FIND, NULL);
2287 5938 : if (entry != NULL)
2288 842 : entry->replicate_valid = false;
2289 : }
2290 : else
2291 : {
2292 : /* Whole cache must be flushed. */
2293 : HASH_SEQ_STATUS status;
2294 :
2295 44 : hash_seq_init(&status, RelationSyncCache);
2296 104 : while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2297 : {
2298 60 : entry->replicate_valid = false;
2299 : }
2300 : }
2301 : }
2302 :
2303 : /*
2304 : * Publication relation/schema map syscache invalidation callback
2305 : *
2306 : * Called for invalidations on pg_publication, pg_publication_rel,
2307 : * pg_publication_namespace, and pg_namespace.
2308 : */
2309 : static void
2310 1248 : rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
2311 : {
2312 : HASH_SEQ_STATUS status;
2313 : RelationSyncEntry *entry;
2314 :
2315 : /*
2316 : * We can get here if the plugin was used in SQL interface as the
2317 : * RelSchemaSyncCache is destroyed when the decoding finishes, but there
2318 : * is no way to unregister the invalidation callbacks.
2319 : */
2320 1248 : if (RelationSyncCache == NULL)
2321 48 : return;
2322 :
2323 : /*
2324 : * We have no easy way to identify which cache entries this invalidation
2325 : * event might have affected, so just mark them all invalid.
2326 : */
2327 1200 : hash_seq_init(&status, RelationSyncCache);
2328 3610 : while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2329 : {
2330 2410 : entry->replicate_valid = false;
2331 : }
2332 : }
2333 :
2334 : /* Send Replication origin */
2335 : static void
2336 1980 : send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
2337 : XLogRecPtr origin_lsn, bool send_origin)
2338 : {
2339 1980 : if (send_origin)
2340 : {
2341 : char *origin;
2342 :
2343 : /*----------
2344 : * XXX: which behaviour do we want here?
2345 : *
2346 : * Alternatives:
2347 : * - don't send origin message if origin name not found
2348 : * (that's what we do now)
2349 : * - throw error - that will break replication, not good
2350 : * - send some special "unknown" origin
2351 : *----------
2352 : */
2353 16 : if (replorigin_by_oid(origin_id, true, &origin))
2354 : {
2355 : /* Message boundary */
2356 16 : OutputPluginWrite(ctx, false);
2357 16 : OutputPluginPrepareWrite(ctx, true);
2358 :
2359 16 : logicalrep_write_origin(ctx->out, origin, origin_lsn);
2360 : }
2361 : }
2362 1980 : }
|