Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pgoutput.c
4 : * Logical Replication output plugin
5 : *
6 : * Copyright (c) 2012-2023, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/pgoutput/pgoutput.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "access/tupconvert.h"
16 : #include "catalog/partition.h"
17 : #include "catalog/pg_publication.h"
18 : #include "catalog/pg_publication_rel.h"
19 : #include "catalog/pg_subscription.h"
20 : #include "commands/defrem.h"
21 : #include "commands/subscriptioncmds.h"
22 : #include "executor/executor.h"
23 : #include "fmgr.h"
24 : #include "nodes/makefuncs.h"
25 : #include "optimizer/optimizer.h"
26 : #include "parser/parse_relation.h"
27 : #include "replication/logical.h"
28 : #include "replication/logicalproto.h"
29 : #include "replication/origin.h"
30 : #include "replication/pgoutput.h"
31 : #include "utils/builtins.h"
32 : #include "utils/inval.h"
33 : #include "utils/lsyscache.h"
34 : #include "utils/memutils.h"
35 : #include "utils/rel.h"
36 : #include "utils/syscache.h"
37 : #include "utils/varlena.h"
38 :
39 782 : PG_MODULE_MAGIC;
40 :
41 : static void pgoutput_startup(LogicalDecodingContext *ctx,
42 : OutputPluginOptions *opt, bool is_init);
43 : static void pgoutput_shutdown(LogicalDecodingContext *ctx);
44 : static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
45 : ReorderBufferTXN *txn);
46 : static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
47 : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
48 : static void pgoutput_change(LogicalDecodingContext *ctx,
49 : ReorderBufferTXN *txn, Relation relation,
50 : ReorderBufferChange *change);
51 : static void pgoutput_truncate(LogicalDecodingContext *ctx,
52 : ReorderBufferTXN *txn, int nrelations, Relation relations[],
53 : ReorderBufferChange *change);
54 : static void pgoutput_message(LogicalDecodingContext *ctx,
55 : ReorderBufferTXN *txn, XLogRecPtr message_lsn,
56 : bool transactional, const char *prefix,
57 : Size sz, const char *message);
58 : static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
59 : RepOriginId origin_id);
60 : static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
61 : ReorderBufferTXN *txn);
62 : static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
63 : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
64 : static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
65 : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
66 : static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
67 : ReorderBufferTXN *txn,
68 : XLogRecPtr prepare_end_lsn,
69 : TimestampTz prepare_time);
70 : static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
71 : ReorderBufferTXN *txn);
72 : static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
73 : ReorderBufferTXN *txn);
74 : static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
75 : ReorderBufferTXN *txn,
76 : XLogRecPtr abort_lsn);
77 : static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
78 : ReorderBufferTXN *txn,
79 : XLogRecPtr commit_lsn);
80 : static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
81 : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
82 :
83 : static bool publications_valid;
84 :
85 : static List *LoadPublications(List *pubnames);
86 : static void publication_invalidation_cb(Datum arg, int cacheid,
87 : uint32 hashvalue);
88 : static void send_relation_and_attrs(Relation relation, TransactionId xid,
89 : LogicalDecodingContext *ctx,
90 : Bitmapset *columns);
91 : static void send_repl_origin(LogicalDecodingContext *ctx,
92 : RepOriginId origin_id, XLogRecPtr origin_lsn,
93 : bool send_origin);
94 :
95 : /*
96 : * Only 3 publication actions are used for row filtering ("insert", "update",
97 : * "delete"). See RelationSyncEntry.exprstate[].
98 : */
99 : enum RowFilterPubAction
100 : {
101 : PUBACTION_INSERT,
102 : PUBACTION_UPDATE,
103 : PUBACTION_DELETE,
104 : };
105 :
106 : #define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
107 :
108 : /*
109 : * Entry in the map used to remember which relation schemas we sent.
110 : *
111 : * The schema_sent flag determines if the current schema record for the
112 : * relation (and for its ancestor if publish_as_relid is set) was already
113 : * sent to the subscriber (in which case we don't need to send it again).
114 : *
115 : * The schema cache on the downstream is, however, updated only at commit
116 : * time, and with streamed transactions the commit order may differ from
117 : * the order in which the transactions are sent. Also, the (sub)transactions
118 : * might get aborted, so we need to send the schema for each (sub)transaction
119 : * so that we don't lose the schema information on abort. To handle this,
120 : * we maintain the list of xids (streamed_txns) for which we have already
121 : * sent the schema.
122 : *
123 : * For partitions, 'pubactions' considers not only the table's own
124 : * publications, but also those of all of its ancestors.
125 : */
126 : typedef struct RelationSyncEntry
127 : {
128 : Oid relid; /* relation oid */
129 :
130 : bool replicate_valid; /* overall validity flag for entry */
131 :
132 : bool schema_sent;
133 : List *streamed_txns; /* streamed toplevel transactions with this
134 : * schema */
135 :
136 : /* are we publishing this rel? */
137 : PublicationActions pubactions;
138 :
139 : /*
140 : * ExprState array for row filters. The filters for different publication
141 : * actions cannot always be combined into a single expression, because
142 : * updates and deletes restrict the columns in the expression to those of
143 : * the replica identity index whereas inserts have no such restriction, so
144 : * there is one ExprState per publication action.
145 : */
146 : ExprState *exprstate[NUM_ROWFILTER_PUBACTIONS];
147 : EState *estate; /* executor state used for row filter */
148 : TupleTableSlot *new_slot; /* slot for storing new tuple */
149 : TupleTableSlot *old_slot; /* slot for storing old tuple */
150 :
151 : /*
152 : * OID of the relation to publish changes as. For a partition, this may
153 : * be set to one of its ancestors whose schema will be used when
154 : * replicating changes, if publish_via_partition_root is set for the
155 : * publication.
156 : */
157 : Oid publish_as_relid;
158 :
159 : /*
160 : * Map used when replicating using an ancestor's schema to convert tuples
161 : * from the partition's type to the ancestor's; NULL if publish_as_relid is
162 : * the same as 'relid', or if it is unnecessary because the partition and
163 : * the ancestor have identical TupleDescs.
164 : */
165 : AttrMap *attrmap;
166 :
167 : /*
168 : * Columns included in the publication, or NULL if all columns are
169 : * included implicitly. Note that the attnums in this bitmap are not
170 : * shifted by FirstLowInvalidHeapAttributeNumber.
171 : */
172 : Bitmapset *columns;
173 :
174 : /*
175 : * Private context to store additional data for this entry - state for the
176 : * row filter expressions, column list, etc.
177 : */
178 : MemoryContext entry_cxt;
179 : } RelationSyncEntry;
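
/*
 * Illustrative sketch (not part of pgoutput.c): how a sender typically
 * consults entry->columns when deciding whether to emit an attribute.
 * Because the attnums in the bitmap are not shifted by
 * FirstLowInvalidHeapAttributeNumber, the raw attnum can be tested directly;
 * a NULL bitmap means "all columns are published".
 */
static inline bool
column_is_published(RelationSyncEntry *entry, AttrNumber attnum)
{
	return entry->columns == NULL || bms_is_member(attnum, entry->columns);
}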
180 :
181 : /*
182 : * Maintain per-transaction state to track whether BEGIN has been sent for
183 : * that transaction. BEGIN is sent only when the first change of a
184 : * transaction is processed. This makes it possible to skip the BEGIN/COMMIT
185 : * pair for empty transactions, which saves network bandwidth.
186 : *
187 : * This optimization is not used for prepared transactions because, if the
188 : * walsender restarts after the PREPARE of a transaction but before its
189 : * COMMIT PREPARED, we can no longer tell whether we skipped sending
190 : * BEGIN/PREPARE because the transaction was empty: the in-memory txndata
191 : * that recorded this is lost across the restart. We would then send a
192 : * spurious COMMIT PREPARED without a corresponding prepared transaction at
193 : * the downstream, which would lead to an error when the downstream tries
194 : * to process it.
195 : *
196 : * XXX We could achieve this optimization by changing the protocol to send
197 : * additional information so that the downstream can detect that the
198 : * corresponding prepare has not been sent. However, adding such a check
199 : * for every transaction on the downstream could be costly, so we might
200 : * want to make it optional.
201 : *
202 : * We also don't have this optimization for streamed transactions because
203 : * they can contain prepared transactions.
204 : */
205 : typedef struct PGOutputTxnData
206 : {
207 : bool sent_begin_txn; /* flag indicating whether BEGIN has been sent */
208 : } PGOutputTxnData;
209 :
210 : /* Map used to remember which relation schemas we sent. */
211 : static HTAB *RelationSyncCache = NULL;
212 :
213 : static void init_rel_sync_cache(MemoryContext cachectx);
214 : static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
215 : static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
216 : Relation relation);
217 : static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
218 : static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
219 : uint32 hashvalue);
220 : static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
221 : TransactionId xid);
222 : static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
223 : TransactionId xid);
224 : static void init_tuple_slot(PGOutputData *data, Relation relation,
225 : RelationSyncEntry *entry);
226 :
227 : /* row filter routines */
228 : static EState *create_estate_for_relation(Relation rel);
229 : static void pgoutput_row_filter_init(PGOutputData *data,
230 : List *publications,
231 : RelationSyncEntry *entry);
232 : static bool pgoutput_row_filter_exec_expr(ExprState *state,
233 : ExprContext *econtext);
234 : static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
235 : TupleTableSlot **new_slot_ptr,
236 : RelationSyncEntry *entry,
237 : ReorderBufferChangeType *action);
238 :
239 : /* column list routines */
240 : static void pgoutput_column_list_init(PGOutputData *data,
241 : List *publications,
242 : RelationSyncEntry *entry);
243 :
244 : /*
245 : * Specify output plugin callbacks
246 : */
247 : void
248 1102 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
249 : {
250 1102 : cb->startup_cb = pgoutput_startup;
251 1102 : cb->begin_cb = pgoutput_begin_txn;
252 1102 : cb->change_cb = pgoutput_change;
253 1102 : cb->truncate_cb = pgoutput_truncate;
254 1102 : cb->message_cb = pgoutput_message;
255 1102 : cb->commit_cb = pgoutput_commit_txn;
256 :
257 1102 : cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
258 1102 : cb->prepare_cb = pgoutput_prepare_txn;
259 1102 : cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
260 1102 : cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
261 1102 : cb->filter_by_origin_cb = pgoutput_origin_filter;
262 1102 : cb->shutdown_cb = pgoutput_shutdown;
263 :
264 : /* transaction streaming */
265 1102 : cb->stream_start_cb = pgoutput_stream_start;
266 1102 : cb->stream_stop_cb = pgoutput_stream_stop;
267 1102 : cb->stream_abort_cb = pgoutput_stream_abort;
268 1102 : cb->stream_commit_cb = pgoutput_stream_commit;
269 1102 : cb->stream_change_cb = pgoutput_change;
270 1102 : cb->stream_message_cb = pgoutput_message;
271 1102 : cb->stream_truncate_cb = pgoutput_truncate;
272 : /* transaction streaming - two-phase commit */
273 1102 : cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
274 1102 : }
275 :
276 : static void
277 608 : parse_output_parameters(List *options, PGOutputData *data)
278 : {
279 : ListCell *lc;
280 608 : bool protocol_version_given = false;
281 608 : bool publication_names_given = false;
282 608 : bool binary_option_given = false;
283 608 : bool messages_option_given = false;
284 608 : bool streaming_given = false;
285 608 : bool two_phase_option_given = false;
286 608 : bool origin_option_given = false;
287 :
288 608 : data->binary = false;
289 608 : data->streaming = LOGICALREP_STREAM_OFF;
290 608 : data->messages = false;
291 608 : data->two_phase = false;
292 :
293 2528 : foreach(lc, options)
294 : {
295 1920 : DefElem *defel = (DefElem *) lfirst(lc);
296 :
297 : Assert(defel->arg == NULL || IsA(defel->arg, String));
298 :
299 : /* Check each param, whether or not we recognize it */
300 1920 : if (strcmp(defel->defname, "proto_version") == 0)
301 : {
302 : unsigned long parsed;
303 : char *endptr;
304 :
305 608 : if (protocol_version_given)
306 0 : ereport(ERROR,
307 : (errcode(ERRCODE_SYNTAX_ERROR),
308 : errmsg("conflicting or redundant options")));
309 608 : protocol_version_given = true;
310 :
311 608 : errno = 0;
312 608 : parsed = strtoul(strVal(defel->arg), &endptr, 10);
313 608 : if (errno != 0 || *endptr != '\0')
314 0 : ereport(ERROR,
315 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
316 : errmsg("invalid proto_version")));
317 :
318 608 : if (parsed > PG_UINT32_MAX)
319 0 : ereport(ERROR,
320 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
321 : errmsg("proto_version \"%s\" out of range",
322 : strVal(defel->arg))));
323 :
324 608 : data->protocol_version = (uint32) parsed;
325 : }
326 1312 : else if (strcmp(defel->defname, "publication_names") == 0)
327 : {
328 608 : if (publication_names_given)
329 0 : ereport(ERROR,
330 : (errcode(ERRCODE_SYNTAX_ERROR),
331 : errmsg("conflicting or redundant options")));
332 608 : publication_names_given = true;
333 :
334 608 : if (!SplitIdentifierString(strVal(defel->arg), ',',
335 : &data->publication_names))
336 0 : ereport(ERROR,
337 : (errcode(ERRCODE_INVALID_NAME),
338 : errmsg("invalid publication_names syntax")));
339 : }
340 704 : else if (strcmp(defel->defname, "binary") == 0)
341 : {
342 20 : if (binary_option_given)
343 0 : ereport(ERROR,
344 : (errcode(ERRCODE_SYNTAX_ERROR),
345 : errmsg("conflicting or redundant options")));
346 20 : binary_option_given = true;
347 :
348 20 : data->binary = defGetBoolean(defel);
349 : }
350 684 : else if (strcmp(defel->defname, "messages") == 0)
351 : {
352 8 : if (messages_option_given)
353 0 : ereport(ERROR,
354 : (errcode(ERRCODE_SYNTAX_ERROR),
355 : errmsg("conflicting or redundant options")));
356 8 : messages_option_given = true;
357 :
358 8 : data->messages = defGetBoolean(defel);
359 : }
360 676 : else if (strcmp(defel->defname, "streaming") == 0)
361 : {
362 70 : if (streaming_given)
363 0 : ereport(ERROR,
364 : (errcode(ERRCODE_SYNTAX_ERROR),
365 : errmsg("conflicting or redundant options")));
366 70 : streaming_given = true;
367 :
368 70 : data->streaming = defGetStreamingMode(defel);
369 : }
370 606 : else if (strcmp(defel->defname, "two_phase") == 0)
371 : {
372 12 : if (two_phase_option_given)
373 0 : ereport(ERROR,
374 : (errcode(ERRCODE_SYNTAX_ERROR),
375 : errmsg("conflicting or redundant options")));
376 12 : two_phase_option_given = true;
377 :
378 12 : data->two_phase = defGetBoolean(defel);
379 : }
380 594 : else if (strcmp(defel->defname, "origin") == 0)
381 : {
382 : char *origin;
383 :
384 594 : if (origin_option_given)
385 0 : ereport(ERROR,
386 : errcode(ERRCODE_SYNTAX_ERROR),
387 : errmsg("conflicting or redundant options"));
388 594 : origin_option_given = true;
389 :
390 594 : origin = defGetString(defel);
391 594 : if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
392 20 : data->publish_no_origin = true;
393 574 : else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
394 574 : data->publish_no_origin = false;
395 : else
396 0 : ereport(ERROR,
397 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
398 : errmsg("unrecognized origin value: \"%s\"", origin));
399 : }
400 : else
401 0 : elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
402 : }
403 608 : }
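
/*
 * Illustrative sketch (not part of pgoutput.c): the kind of option list
 * parse_output_parameters() expects.  A subscriber supplies these options in
 * the START_REPLICATION command (e.g. proto_version '4', publication_names
 * '"mypub"'), and the walsender turns them into a List of DefElems before
 * the plugin sees them.  The option values below are made-up examples.
 */
static List *
example_pgoutput_options(void)
{
	List	   *options = NIL;

	options = lappend(options,
					  makeDefElem("proto_version", (Node *) makeString("4"), -1));
	options = lappend(options,
					  makeDefElem("publication_names", (Node *) makeString("mypub"), -1));
	options = lappend(options,
					  makeDefElem("binary", (Node *) makeString("false"), -1));

	return options;
}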
404 :
405 : /*
406 : * Initialize this plugin
407 : */
408 : static void
409 1102 : pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
410 : bool is_init)
411 : {
412 1102 : PGOutputData *data = palloc0(sizeof(PGOutputData));
413 : static bool publication_callback_registered = false;
414 :
415 : /* Create our memory context for private allocations. */
416 1102 : data->context = AllocSetContextCreate(ctx->context,
417 : "logical replication output context",
418 : ALLOCSET_DEFAULT_SIZES);
419 :
420 1102 : data->cachectx = AllocSetContextCreate(ctx->context,
421 : "logical replication cache context",
422 : ALLOCSET_DEFAULT_SIZES);
423 :
424 1102 : ctx->output_plugin_private = data;
425 :
426 : /* This plugin uses binary protocol. */
427 1102 : opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
428 :
429 : /*
430 : * This is replication start and not slot initialization.
431 : *
432 : * Parse and validate options passed by the client.
433 : */
434 1102 : if (!is_init)
435 : {
436 : /* Parse the params and ERROR if we see any we don't recognize */
437 608 : parse_output_parameters(ctx->output_plugin_options, data);
438 :
439 : /* Check if we support requested protocol */
440 608 : if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
441 0 : ereport(ERROR,
442 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
443 : errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
444 : data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
445 :
446 608 : if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
447 0 : ereport(ERROR,
448 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
449 : errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
450 : data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
451 :
452 608 : if (data->publication_names == NIL)
453 0 : ereport(ERROR,
454 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
455 : errmsg("publication_names parameter missing")));
456 :
457 : /*
458 : * Decide whether to enable streaming. It is disabled by default, in
459 : * which case we just update the flag in the decoding context. Otherwise
460 : * we only allow it with a sufficient version of the protocol, and when
461 : * the output plugin supports it.
462 : */
463 608 : if (data->streaming == LOGICALREP_STREAM_OFF)
464 538 : ctx->streaming = false;
465 70 : else if (data->streaming == LOGICALREP_STREAM_ON &&
466 54 : data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
467 0 : ereport(ERROR,
468 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
469 : errmsg("requested proto_version=%d does not support streaming, need %d or higher",
470 : data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
471 70 : else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
472 16 : data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
473 0 : ereport(ERROR,
474 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
475 : errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
476 : data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
477 70 : else if (!ctx->streaming)
478 0 : ereport(ERROR,
479 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
480 : errmsg("streaming requested, but not supported by output plugin")));
481 :
482 : /*
483 : * Here, we just check whether the two-phase option was passed in and
484 : * decide whether to enable it at a later point in time. It remains
485 : * enabled if the previous start-up enabled it. But we only allow the
486 : * option to be passed in with a sufficient version of the protocol, and
487 : * when the output plugin supports it.
488 : */
489 608 : if (!data->two_phase)
490 596 : ctx->twophase_opt_given = false;
491 12 : else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
492 0 : ereport(ERROR,
493 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
494 : errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
495 : data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
496 12 : else if (!ctx->twophase)
497 0 : ereport(ERROR,
498 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
499 : errmsg("two-phase commit requested, but not supported by output plugin")));
500 : else
501 12 : ctx->twophase_opt_given = true;
502 :
503 : /* Init publication state. */
504 608 : data->publications = NIL;
505 608 : publications_valid = false;
506 :
507 : /*
508 : * Register callback for pg_publication if we didn't already do that
509 : * during some previous call in this process.
510 : */
511 608 : if (!publication_callback_registered)
512 : {
513 606 : CacheRegisterSyscacheCallback(PUBLICATIONOID,
514 : publication_invalidation_cb,
515 : (Datum) 0);
516 606 : publication_callback_registered = true;
517 : }
518 :
519 : /* Initialize relation schema cache. */
520 608 : init_rel_sync_cache(CacheMemoryContext);
521 : }
522 : else
523 : {
524 : /*
525 : * Disable the streaming and prepared transactions during the slot
526 : * initialization mode.
527 : */
528 494 : ctx->streaming = false;
529 494 : ctx->twophase = false;
530 : }
531 1102 : }
532 :
533 : /*
534 : * BEGIN callback.
535 : *
536 : * Don't send the BEGIN message here; instead, postpone it until the first
537 : * change. In logical replication, a common scenario is to replicate only a
538 : * subset of tables (instead of all tables), so transactions that touch only
539 : * unpublished tables decode to empty transactions. Sending BEGIN and COMMIT
540 : * messages to subscribers for such empty transactions would waste bandwidth
541 : * on something of little or no use for logical replication.
542 : */
543 : static void
544 1232 : pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
545 : {
546 1232 : PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
547 : sizeof(PGOutputTxnData));
548 :
549 1232 : txn->output_plugin_private = txndata;
550 1232 : }
551 :
552 : /*
553 : * Send BEGIN.
554 : *
555 : * This is called while processing the first change of the transaction.
556 : */
557 : static void
558 694 : pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
559 : {
560 694 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
561 694 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
562 :
563 : Assert(txndata);
564 : Assert(!txndata->sent_begin_txn);
565 :
566 694 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
567 694 : logicalrep_write_begin(ctx->out, txn);
568 694 : txndata->sent_begin_txn = true;
569 :
570 694 : send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
571 : send_replication_origin);
572 :
573 694 : OutputPluginWrite(ctx, true);
574 692 : }
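
/*
 * Illustrative sketch (not part of pgoutput.c): the deferred-BEGIN pattern
 * used by the change, truncate and message callbacks below.  BEGIN is
 * emitted lazily, just before the first change that is actually sent, so
 * empty transactions never produce a BEGIN/COMMIT pair on the wire.
 */
static inline void
ensure_begin_sent(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
	PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;

	/* txndata is only set up for non-streamed transactions */
	if (txndata && !txndata->sent_begin_txn)
		pgoutput_send_begin(ctx, txn);
}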
575 :
576 : /*
577 : * COMMIT callback
578 : */
579 : static void
580 1220 : pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
581 : XLogRecPtr commit_lsn)
582 : {
583 1220 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
584 : bool sent_begin_txn;
585 :
586 : Assert(txndata);
587 :
588 : /*
589 : * We don't need to send the commit message unless some relevant change
590 : * from this transaction has been sent to the downstream.
591 : */
592 1220 : sent_begin_txn = txndata->sent_begin_txn;
593 1220 : OutputPluginUpdateProgress(ctx, !sent_begin_txn);
594 1220 : pfree(txndata);
595 1220 : txn->output_plugin_private = NULL;
596 :
597 1220 : if (!sent_begin_txn)
598 : {
599 532 : elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
600 532 : return;
601 : }
602 :
603 688 : OutputPluginPrepareWrite(ctx, true);
604 688 : logicalrep_write_commit(ctx->out, txn, commit_lsn);
605 688 : OutputPluginWrite(ctx, true);
606 : }
607 :
608 : /*
609 : * BEGIN PREPARE callback
610 : */
611 : static void
612 36 : pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
613 : {
614 36 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
615 :
616 36 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
617 36 : logicalrep_write_begin_prepare(ctx->out, txn);
618 :
619 36 : send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
620 : send_replication_origin);
621 :
622 36 : OutputPluginWrite(ctx, true);
623 36 : }
624 :
625 : /*
626 : * PREPARE callback
627 : */
628 : static void
629 36 : pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
630 : XLogRecPtr prepare_lsn)
631 : {
632 36 : OutputPluginUpdateProgress(ctx, false);
633 :
634 36 : OutputPluginPrepareWrite(ctx, true);
635 36 : logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
636 36 : OutputPluginWrite(ctx, true);
637 36 : }
638 :
639 : /*
640 : * COMMIT PREPARED callback
641 : */
642 : static void
643 46 : pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
644 : XLogRecPtr commit_lsn)
645 : {
646 46 : OutputPluginUpdateProgress(ctx, false);
647 :
648 46 : OutputPluginPrepareWrite(ctx, true);
649 46 : logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
650 46 : OutputPluginWrite(ctx, true);
651 46 : }
652 :
653 : /*
654 : * ROLLBACK PREPARED callback
655 : */
656 : static void
657 18 : pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
658 : ReorderBufferTXN *txn,
659 : XLogRecPtr prepare_end_lsn,
660 : TimestampTz prepare_time)
661 : {
662 18 : OutputPluginUpdateProgress(ctx, false);
663 :
664 18 : OutputPluginPrepareWrite(ctx, true);
665 18 : logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
666 : prepare_time);
667 18 : OutputPluginWrite(ctx, true);
668 18 : }
669 :
670 : /*
671 : * Write the current schema of the relation and its ancestor (if any) if not
672 : * done yet.
673 : */
674 : static void
675 364042 : maybe_send_schema(LogicalDecodingContext *ctx,
676 : ReorderBufferChange *change,
677 : Relation relation, RelationSyncEntry *relentry)
678 : {
679 364042 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
680 : bool schema_sent;
681 364042 : TransactionId xid = InvalidTransactionId;
682 364042 : TransactionId topxid = InvalidTransactionId;
683 :
684 : /*
685 : * Remember the XID of the (sub)transaction for the change. We don't care
686 : * whether it's a top-level transaction or not (we have already sent that
687 : * XID at the start of the current streaming block).
688 : *
689 : * If we're not in a streaming block, just use InvalidTransactionId and
690 : * the write methods will not include it.
691 : */
692 364042 : if (data->in_streaming)
693 351882 : xid = change->txn->xid;
694 :
695 364042 : if (rbtxn_is_subtxn(change->txn))
696 20338 : topxid = rbtxn_get_toptxn(change->txn)->xid;
697 : else
698 343704 : topxid = xid;
699 :
700 : /*
701 : * Do we need to send the schema? We do track streamed transactions
702 : * separately, because those may be applied later (and the regular
703 : * transactions won't see their effects until then) and in an order that
704 : * we don't know at this point.
705 : *
706 : * XXX There is scope for optimization here. Currently, we always send the
707 : * schema the first time in a streaming transaction, but we could probably
708 : * avoid that by checking the 'relentry->schema_sent' flag. However, before
709 : * doing that we need to study its impact on the case where we have a mix
710 : * of streaming and non-streaming transactions.
711 : */
712 364042 : if (data->in_streaming)
713 351882 : schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
714 : else
715 12160 : schema_sent = relentry->schema_sent;
716 :
717 : /* Nothing to do if we already sent the schema. */
718 364042 : if (schema_sent)
719 363492 : return;
720 :
721 : /*
722 : * Send the schema. If the changes will be published using an ancestor's
723 : * schema, not the relation's own, send that ancestor's schema before
724 : * sending the relation's own (XXX - maybe sending only the former suffices?).
725 : */
726 550 : if (relentry->publish_as_relid != RelationGetRelid(relation))
727 : {
728 56 : Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
729 :
730 56 : send_relation_and_attrs(ancestor, xid, ctx, relentry->columns);
731 52 : RelationClose(ancestor);
732 : }
733 :
734 546 : send_relation_and_attrs(relation, xid, ctx, relentry->columns);
735 :
736 546 : if (data->in_streaming)
737 144 : set_schema_sent_in_streamed_txn(relentry, topxid);
738 : else
739 402 : relentry->schema_sent = true;
740 : }
741 :
742 : /*
743 : * Sends a relation
744 : */
745 : static void
746 602 : send_relation_and_attrs(Relation relation, TransactionId xid,
747 : LogicalDecodingContext *ctx,
748 : Bitmapset *columns)
749 : {
750 602 : TupleDesc desc = RelationGetDescr(relation);
751 : int i;
752 :
753 : /*
754 : * Write out type info if needed. We do that only for user-created types.
755 : * We use FirstGenbkiObjectId as the cutoff, so that we only consider
756 : * objects with hand-assigned OIDs to be "built in", not for instance any
757 : * function or type defined in the information_schema. This is important
758 : * because only hand-assigned OIDs can be expected to remain stable across
759 : * major versions.
760 : */
761 1810 : for (i = 0; i < desc->natts; i++)
762 : {
763 1208 : Form_pg_attribute att = TupleDescAttr(desc, i);
764 :
765 1208 : if (att->attisdropped || att->attgenerated)
766 10 : continue;
767 :
768 1198 : if (att->atttypid < FirstGenbkiObjectId)
769 1162 : continue;
770 :
771 : /* Skip this attribute if it's not present in the column list */
772 36 : if (columns != NULL && !bms_is_member(att->attnum, columns))
773 0 : continue;
774 :
775 36 : OutputPluginPrepareWrite(ctx, false);
776 36 : logicalrep_write_typ(ctx->out, xid, att->atttypid);
777 36 : OutputPluginWrite(ctx, false);
778 : }
779 :
780 602 : OutputPluginPrepareWrite(ctx, false);
781 602 : logicalrep_write_rel(ctx->out, xid, relation, columns);
782 602 : OutputPluginWrite(ctx, false);
783 598 : }
784 :
785 : /*
786 : * Executor state preparation for evaluation of row filter expressions for the
787 : * specified relation.
788 : */
789 : static EState *
790 32 : create_estate_for_relation(Relation rel)
791 : {
792 : EState *estate;
793 : RangeTblEntry *rte;
794 32 : List *perminfos = NIL;
795 :
796 32 : estate = CreateExecutorState();
797 :
798 32 : rte = makeNode(RangeTblEntry);
799 32 : rte->rtekind = RTE_RELATION;
800 32 : rte->relid = RelationGetRelid(rel);
801 32 : rte->relkind = rel->rd_rel->relkind;
802 32 : rte->rellockmode = AccessShareLock;
803 :
804 32 : addRTEPermissionInfo(&perminfos, rte);
805 :
806 32 : ExecInitRangeTable(estate, list_make1(rte), perminfos);
807 :
808 32 : estate->es_output_cid = GetCurrentCommandId(false);
809 :
810 32 : return estate;
811 : }
812 :
813 : /*
814 : * Evaluates row filter.
815 : *
816 : * If the row filter evaluates to NULL, it is taken as false i.e. the change
817 : * isn't replicated.
818 : */
819 : static bool
820 72 : pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
821 : {
822 : Datum ret;
823 : bool isnull;
824 :
825 : Assert(state != NULL);
826 :
827 72 : ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
828 :
829 72 : elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
830 : isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
831 : isnull ? "true" : "false");
832 :
833 72 : if (isnull)
834 2 : return false;
835 :
836 70 : return DatumGetBool(ret);
837 : }
838 :
839 : /*
840 : * Make sure the per-entry memory context exists.
841 : */
842 : static void
843 106 : pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
844 : {
845 : Relation relation;
846 :
847 : /* The context may already exist, in which case bail out. */
848 106 : if (entry->entry_cxt)
849 4 : return;
850 :
851 102 : relation = RelationIdGetRelation(entry->publish_as_relid);
852 :
853 102 : entry->entry_cxt = AllocSetContextCreate(data->cachectx,
854 : "entry private context",
855 : ALLOCSET_SMALL_SIZES);
856 :
857 102 : MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
858 : RelationGetRelationName(relation));
859 : }
860 :
861 : /*
862 : * Initialize the row filter.
863 : */
864 : static void
865 470 : pgoutput_row_filter_init(PGOutputData *data, List *publications,
866 : RelationSyncEntry *entry)
867 : {
868 : ListCell *lc;
869 470 : List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */
870 470 : bool no_filter[] = {false, false, false}; /* One per pubaction */
871 : MemoryContext oldctx;
872 : int idx;
873 470 : bool has_filter = true;
874 470 : Oid schemaid = get_rel_namespace(entry->publish_as_relid);
875 :
876 : /*
877 : * Find if there are any row filters for this relation. If there are, then
878 : * prepare the necessary ExprState and cache it in entry->exprstate. To
879 : * build an expression state, we need to ensure the following:
880 : *
881 : * All the given publication-table mappings must be checked.
882 : *
883 : * Multiple publications might have multiple row filters for this
884 : * relation. Since row filter usage depends on the DML operation, there
885 : * are multiple lists (one for each operation) to which row filters will
886 : * be appended.
887 : *
888 : * FOR ALL TABLES and FOR TABLES IN SCHEMA imply "don't use a row filter
889 : * expression", so they take precedence.
890 : */
891 510 : foreach(lc, publications)
892 : {
893 478 : Publication *pub = lfirst(lc);
894 478 : HeapTuple rftuple = NULL;
895 478 : Datum rfdatum = 0;
896 478 : bool pub_no_filter = true;
897 :
898 : /*
899 : * If the publication is FOR ALL TABLES, or the publication includes a
900 : * FOR TABLES IN SCHEMA where the table belongs to the referred
901 : * schema, then it is treated the same as if there are no row filters
902 : * (even if other publications have a row filter).
903 : */
904 478 : if (!pub->alltables &&
905 350 : !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
906 : ObjectIdGetDatum(schemaid),
907 : ObjectIdGetDatum(pub->oid)))
908 : {
909 : /*
910 : * Check for the presence of a row filter in this publication.
911 : */
912 338 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
913 : ObjectIdGetDatum(entry->publish_as_relid),
914 : ObjectIdGetDatum(pub->oid));
915 :
916 338 : if (HeapTupleIsValid(rftuple))
917 : {
918 : /* Null indicates no filter. */
919 314 : rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
920 : Anum_pg_publication_rel_prqual,
921 : &pub_no_filter);
922 : }
923 : }
924 :
925 478 : if (pub_no_filter)
926 : {
927 452 : if (rftuple)
928 288 : ReleaseSysCache(rftuple);
929 :
930 452 : no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
931 452 : no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
932 452 : no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
933 :
934 : /*
935 : * Quick exit if all the DML actions are publicized via this
936 : * publication.
937 : */
938 452 : if (no_filter[PUBACTION_INSERT] &&
939 452 : no_filter[PUBACTION_UPDATE] &&
940 438 : no_filter[PUBACTION_DELETE])
941 : {
942 438 : has_filter = false;
943 438 : break;
944 : }
945 :
946 : /* No additional work for this publication. Next one. */
947 14 : continue;
948 : }
949 :
950 : /* Form the per pubaction row filter lists. */
951 26 : if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
952 26 : rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
953 26 : TextDatumGetCString(rfdatum));
954 26 : if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
955 26 : rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
956 26 : TextDatumGetCString(rfdatum));
957 26 : if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
958 26 : rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
959 26 : TextDatumGetCString(rfdatum));
960 :
961 26 : ReleaseSysCache(rftuple);
962 : } /* loop all subscribed publications */
963 :
964 : /* Clean the row filter */
965 1880 : for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
966 : {
967 1410 : if (no_filter[idx])
968 : {
969 1332 : list_free_deep(rfnodes[idx]);
970 1332 : rfnodes[idx] = NIL;
971 : }
972 : }
973 :
974 470 : if (has_filter)
975 : {
976 32 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
977 :
978 32 : pgoutput_ensure_entry_cxt(data, entry);
979 :
980 : /*
981 : * Now all the filters for all pubactions are known. Combine them when
982 : * their pubactions are the same.
983 : */
984 32 : oldctx = MemoryContextSwitchTo(entry->entry_cxt);
985 32 : entry->estate = create_estate_for_relation(relation);
986 128 : for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
987 : {
988 96 : List *filters = NIL;
989 : Expr *rfnode;
990 :
991 96 : if (rfnodes[idx] == NIL)
992 42 : continue;
993 :
994 114 : foreach(lc, rfnodes[idx])
995 60 : filters = lappend(filters, stringToNode((char *) lfirst(lc)));
996 :
997 : /* combine the row filter and cache the ExprState */
998 54 : rfnode = make_orclause(filters);
999 54 : entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
1000 : } /* for each pubaction */
1001 32 : MemoryContextSwitchTo(oldctx);
1002 :
1003 32 : RelationClose(relation);
1004 : }
1005 470 : }
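
/*
 * Illustrative sketch (not part of pgoutput.c): how the row filters of two
 * publications are combined for a single pubaction.  stringToNode() turns
 * the serialized quals (as stored in pg_publication_rel.prqual) back into
 * expression trees and make_orclause() joins them, so a row is replicated
 * if it satisfies the filter of at least one publication.  The 'qual_a' and
 * 'qual_b' parameters are assumed to hold such serialized quals.
 */
static ExprState *
combine_two_row_filters(EState *estate, char *qual_a, char *qual_b)
{
	List	   *filters = NIL;

	filters = lappend(filters, stringToNode(qual_a));
	filters = lappend(filters, stringToNode(qual_b));

	return ExecPrepareExpr(make_orclause(filters), estate);
}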
1006 :
1007 : /*
1008 : * Initialize the column list.
1009 : */
1010 : static void
1011 470 : pgoutput_column_list_init(PGOutputData *data, List *publications,
1012 : RelationSyncEntry *entry)
1013 : {
1014 : ListCell *lc;
1015 470 : bool first = true;
1016 470 : Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1017 :
1018 : /*
1019 : * Find if there are any column lists for this relation. If there are,
1020 : * build a bitmap using the column lists.
1021 : *
1022 : * Multiple publications might have multiple column lists for this
1023 : * relation.
1024 : *
1025 : * Note that we don't support the case where the column list is different
1026 : * for the same table when combining publications. See comments atop
1027 : * fetch_table_list. But one can later change the publication so we still
1028 : * need to check all the given publication-table mappings and report an
1029 : * error if any publications have a different column list.
1030 : *
1031 : * FOR ALL TABLES and FOR TABLES IN SCHEMA imply "don't use column list".
1032 : */
1033 960 : foreach(lc, publications)
1034 : {
1035 492 : Publication *pub = lfirst(lc);
1036 492 : HeapTuple cftuple = NULL;
1037 492 : Datum cfdatum = 0;
1038 492 : Bitmapset *cols = NULL;
1039 :
1040 : /*
1041 : * If the publication is FOR ALL TABLES then it is treated the same as
1042 : * if there are no column lists (even if other publications have a
1043 : * list).
1044 : */
1045 492 : if (!pub->alltables)
1046 : {
1047 362 : bool pub_no_list = true;
1048 :
1049 : /*
1050 : * Check for the presence of a column list in this publication.
1051 : *
1052 : * Note: If we find no pg_publication_rel row, it's a publication
1053 : * defined for a whole schema, so it can't have a column list,
1054 : * just like a FOR ALL TABLES publication.
1055 : */
1056 362 : cftuple = SearchSysCache2(PUBLICATIONRELMAP,
1057 : ObjectIdGetDatum(entry->publish_as_relid),
1058 : ObjectIdGetDatum(pub->oid));
1059 :
1060 362 : if (HeapTupleIsValid(cftuple))
1061 : {
1062 : /* Lookup the column list attribute. */
1063 324 : cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
1064 : Anum_pg_publication_rel_prattrs,
1065 : &pub_no_list);
1066 :
1067 : /* Build the column list bitmap in the per-entry context. */
1068 324 : if (!pub_no_list) /* when not null */
1069 : {
1070 : int i;
1071 74 : int nliveatts = 0;
1072 74 : TupleDesc desc = RelationGetDescr(relation);
1073 :
1074 74 : pgoutput_ensure_entry_cxt(data, entry);
1075 :
1076 74 : cols = pub_collist_to_bitmapset(cols, cfdatum,
1077 : entry->entry_cxt);
1078 :
1079 : /* Get the number of live attributes. */
1080 310 : for (i = 0; i < desc->natts; i++)
1081 : {
1082 236 : Form_pg_attribute att = TupleDescAttr(desc, i);
1083 :
1084 236 : if (att->attisdropped || att->attgenerated)
1085 4 : continue;
1086 :
1087 232 : nliveatts++;
1088 : }
1089 :
1090 : /*
1091 : * If column list includes all the columns of the table,
1092 : * set it to NULL.
1093 : */
1094 74 : if (bms_num_members(cols) == nliveatts)
1095 : {
1096 14 : bms_free(cols);
1097 14 : cols = NULL;
1098 : }
1099 : }
1100 :
1101 324 : ReleaseSysCache(cftuple);
1102 : }
1103 : }
1104 :
1105 492 : if (first)
1106 : {
1107 470 : entry->columns = cols;
1108 470 : first = false;
1109 : }
1110 22 : else if (!bms_equal(entry->columns, cols))
1111 2 : ereport(ERROR,
1112 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1113 : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
1114 : get_namespace_name(RelationGetNamespace(relation)),
1115 : RelationGetRelationName(relation)));
1116 : } /* loop all subscribed publications */
1117 :
1118 468 : RelationClose(relation);
1119 468 : }
1120 :
1121 : /*
1122 : * Initialize the slot for storing new and old tuples, and build the map that
1123 : * will be used to convert the relation's tuples into the ancestor's format.
1124 : */
1125 : static void
1126 470 : init_tuple_slot(PGOutputData *data, Relation relation,
1127 : RelationSyncEntry *entry)
1128 : {
1129 : MemoryContext oldctx;
1130 : TupleDesc oldtupdesc;
1131 : TupleDesc newtupdesc;
1132 :
1133 470 : oldctx = MemoryContextSwitchTo(data->cachectx);
1134 :
1135 : /*
1136 : * Create tuple table slots. Create a copy of the TupleDesc as it needs to
1137 : * live as long as the cache remains.
1138 : */
1139 470 : oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1140 470 : newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1141 :
1142 470 : entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
1143 470 : entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
1144 :
1145 470 : MemoryContextSwitchTo(oldctx);
1146 :
1147 : /*
1148 : * Cache the map that will be used to convert the relation's tuples into
1149 : * the ancestor's format, if needed.
1150 : */
1151 470 : if (entry->publish_as_relid != RelationGetRelid(relation))
1152 : {
1153 60 : Relation ancestor = RelationIdGetRelation(entry->publish_as_relid);
1154 60 : TupleDesc indesc = RelationGetDescr(relation);
1155 60 : TupleDesc outdesc = RelationGetDescr(ancestor);
1156 :
1157 : /* Map must live as long as the session does. */
1158 60 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1159 :
1160 60 : entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
1161 :
1162 60 : MemoryContextSwitchTo(oldctx);
1163 60 : RelationClose(ancestor);
1164 : }
1165 470 : }
1166 :
1167 : /*
1168 : * Change is checked against the row filter if any.
1169 : *
1170 : * Returns true if the change is to be replicated, else false.
1171 : *
1172 : * For inserts, evaluate the row filter for new tuple.
1173 : * For deletes, evaluate the row filter for old tuple.
1174 : * For updates, evaluate the row filter for old and new tuple.
1175 : *
1176 : * For updates, if both evaluations are true, we send the UPDATE; if both
1177 : * are false, the UPDATE is not replicated at all. If only one of the two
1178 : * tuples matches the row filter expression, we transform the UPDATE into a
1179 : * DELETE or an INSERT to avoid data inconsistency, based on the following
1180 : * rules:
1181 : *
1182 : * Case 1: old-row (no match) new-row (no match) -> (drop change)
1183 : * Case 2: old-row (no match) new-row (match) -> INSERT
1184 : * Case 3: old-row (match) new-row (no match) -> DELETE
1185 : * Case 4: old-row (match) new-row (match) -> UPDATE
1186 : *
1187 : * The new action is updated in the action parameter.
1188 : *
1189 : * The new slot could be updated when transforming the UPDATE into INSERT,
1190 : * because the original new tuple might not have column values from the replica
1191 : * identity.
1192 : *
1193 : * Examples:
1194 : * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
1195 : * Since the old tuple satisfies, the initial table synchronization copied this
1196 : * row (or another method was used to guarantee that there is data
1197 : * consistency). However, after the UPDATE the new tuple doesn't satisfy the
1198 : * row filter, so from a data consistency perspective, that row should be
1199 : * removed on the subscriber. The UPDATE should be transformed into a DELETE
1200 : * statement and be sent to the subscriber. Keeping this row on the subscriber
1201 : * is undesirable because it doesn't reflect what was defined in the row filter
1202 : * expression on the publisher. This row on the subscriber would likely not be
1203 : * modified by replication again. If someone inserted a new row with the same
1204 : * old identifier, replication could stop due to a constraint violation.
1205 : *
1206 : * Let's say the old tuple doesn't match the row filter but the new tuple does.
1207 : * Since the old tuple doesn't satisfy, the initial table synchronization
1208 : * probably didn't copy this row. However, after the UPDATE the new tuple does
1209 : * satisfy the row filter, so from a data consistency perspective, that row
1210 : * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
1211 : * statements have no effect (it matches no row -- see
1212 : * apply_handle_update_internal()). So, the UPDATE should be transformed into a
1213 : * INSERT statement and be sent to the subscriber. However, this might surprise
1214 : * someone who expects the data set to satisfy the row filter expression on the
1215 : * provider.
1216 : */
1217 : static bool
1218 364042 : pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
1219 : TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
1220 : ReorderBufferChangeType *action)
1221 : {
1222 : TupleDesc desc;
1223 : int i;
1224 : bool old_matched,
1225 : new_matched,
1226 : result;
1227 : TupleTableSlot *tmp_new_slot;
1228 364042 : TupleTableSlot *new_slot = *new_slot_ptr;
1229 : ExprContext *ecxt;
1230 : ExprState *filter_exprstate;
1231 :
1232 : /*
1233 : * We need this map to avoid relying on ReorderBufferChangeType enums
1234 : * having specific values.
1235 : */
1236 : static const int map_changetype_pubaction[] = {
1237 : [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
1238 : [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
1239 : [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
1240 : };
1241 :
1242 : Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
1243 : *action == REORDER_BUFFER_CHANGE_UPDATE ||
1244 : *action == REORDER_BUFFER_CHANGE_DELETE);
1245 :
1246 : Assert(new_slot || old_slot);
1247 :
1248 : /* Get the corresponding row filter */
1249 364042 : filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
1250 :
1251 : /* Bail out if there is no row filter */
1252 364042 : if (!filter_exprstate)
1253 363978 : return true;
1254 :
1255 64 : elog(DEBUG3, "table \"%s.%s\" has row filter",
1256 : get_namespace_name(RelationGetNamespace(relation)),
1257 : RelationGetRelationName(relation));
1258 :
1259 64 : ResetPerTupleExprContext(entry->estate);
1260 :
1261 64 : ecxt = GetPerTupleExprContext(entry->estate);
1262 :
1263 : /*
1264 : * For the following occasions where there is only one tuple, we can
1265 : * evaluate the row filter for that tuple and return.
1266 : *
1267 : * For inserts, we only have the new tuple.
1268 : *
1269 : * For updates, we can have only a new tuple when none of the replica
1270 : * identity columns changed and none of those columns have external data
1271 : * but we still need to evaluate the row filter for the new tuple as the
1272 : * existing values of those columns might not match the filter. Also,
1273 : * users can use constant expressions in the row filter, so we anyway need
1274 : * to evaluate it for the new tuple.
1275 : *
1276 : * For deletes, we only have the old tuple.
1277 : */
1278 64 : if (!new_slot || !old_slot)
1279 : {
1280 56 : ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
1281 56 : result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1282 :
1283 56 : return result;
1284 : }
1285 :
1286 : /*
1287 : * Only updates have both the old and new tuples valid; both need to be
1288 : * checked against the row filter.
1289 : */
1290 : Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
1291 :
1292 8 : slot_getallattrs(new_slot);
1293 8 : slot_getallattrs(old_slot);
1294 :
1295 8 : tmp_new_slot = NULL;
1296 8 : desc = RelationGetDescr(relation);
1297 :
1298 : /*
1299 : * The new tuple might not have all the replica identity columns, in which
1300 : * case the missing values need to be copied over from the old tuple.
1301 : */
1302 24 : for (i = 0; i < desc->natts; i++)
1303 : {
1304 16 : Form_pg_attribute att = TupleDescAttr(desc, i);
1305 :
1306 : /*
1307 : * if the column in the new tuple or old tuple is null, nothing to do
1308 : */
1309 16 : if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
1310 2 : continue;
1311 :
1312 : /*
1313 : * Unchanged toasted replica identity columns are only logged in the
1314 : * old tuple. Copy this over to the new tuple. The changed (or WAL
1315 : * Logged) toast values are always assembled in memory and set as
1316 : * VARTAG_INDIRECT. See ReorderBufferToastReplace.
1317 : */
1318 14 : if (att->attlen == -1 &&
1319 8 : VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
1320 2 : !VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
1321 : {
1322 2 : if (!tmp_new_slot)
1323 : {
1324 2 : tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
1325 2 : ExecClearTuple(tmp_new_slot);
1326 :
1327 2 : memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
1328 2 : desc->natts * sizeof(Datum));
1329 2 : memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
1330 2 : desc->natts * sizeof(bool));
1331 : }
1332 :
1333 2 : tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
1334 2 : tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
1335 : }
1336 : }
1337 :
1338 8 : ecxt->ecxt_scantuple = old_slot;
1339 8 : old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1340 :
1341 8 : if (tmp_new_slot)
1342 : {
1343 2 : ExecStoreVirtualTuple(tmp_new_slot);
1344 2 : ecxt->ecxt_scantuple = tmp_new_slot;
1345 : }
1346 : else
1347 6 : ecxt->ecxt_scantuple = new_slot;
1348 :
1349 8 : new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1350 :
1351 : /*
1352 : * Case 1: if both tuples don't match the row filter, bailout. Send
1353 : * nothing.
1354 : */
1355 8 : if (!old_matched && !new_matched)
1356 0 : return false;
1357 :
1358 : /*
1359 : * Case 2: if the old tuple doesn't satisfy the row filter but the new
1360 : * tuple does, transform the UPDATE into INSERT.
1361 : *
1362 : * Use the newly transformed tuple, which must contain the column values
1363 : * for all the replica identity columns. This is required to ensure that,
1364 : * while inserting the tuple in the downstream node, we have all the
1365 : * required column values.
1366 : */
1367 8 : if (!old_matched && new_matched)
1368 : {
1369 4 : *action = REORDER_BUFFER_CHANGE_INSERT;
1370 :
1371 4 : if (tmp_new_slot)
1372 2 : *new_slot_ptr = tmp_new_slot;
1373 : }
1374 :
1375 : /*
1376 : * Case 3: if the old tuple satisfies the row filter but the new tuple
1377 : * doesn't, transform the UPDATE into DELETE.
1378 : *
1379 : * This transformation does not require another tuple. The old tuple will
1380 : * be used for DELETE.
1381 : */
1382 4 : else if (old_matched && !new_matched)
1383 2 : *action = REORDER_BUFFER_CHANGE_DELETE;
1384 :
1385 : /*
1386 : * Case 4: if both tuples match the row filter, transformation isn't
1387 : * required. (*action is default UPDATE).
1388 : */
1389 :
1390 8 : return true;
1391 : }
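
/*
 * Illustrative sketch (not part of pgoutput.c): the UPDATE transformation
 * decision table from pgoutput_row_filter() written out as a helper.  For
 * example, with a row filter like (active), an UPDATE that flips active
 * from false to true is published as an INSERT, and one that flips it from
 * true to false is published as a DELETE.  Case 1 (neither tuple matches)
 * corresponds to pgoutput_row_filter() returning false, so the change is
 * dropped before any transformation is needed.
 */
static ReorderBufferChangeType
transform_update_action(bool old_matched, bool new_matched)
{
	if (old_matched && new_matched)
		return REORDER_BUFFER_CHANGE_UPDATE;	/* Case 4: send as-is */
	else if (new_matched)
		return REORDER_BUFFER_CHANGE_INSERT;	/* Case 2: publish as INSERT */
	else
		return REORDER_BUFFER_CHANGE_DELETE;	/* Case 3: publish as DELETE */
}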
1392 :
1393 : /*
1394 : * Sends the decoded DML over wire.
1395 : *
1396 : * This is called both in streaming and non-streaming modes.
1397 : */
1398 : static void
1399 366292 : pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1400 : Relation relation, ReorderBufferChange *change)
1401 : {
1402 366292 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1403 366292 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1404 : MemoryContext old;
1405 : RelationSyncEntry *relentry;
1406 366292 : TransactionId xid = InvalidTransactionId;
1407 366292 : Relation ancestor = NULL;
1408 366292 : Relation targetrel = relation;
1409 366292 : ReorderBufferChangeType action = change->action;
1410 366292 : TupleTableSlot *old_slot = NULL;
1411 366292 : TupleTableSlot *new_slot = NULL;
1412 :
1413 366292 : if (!is_publishable_relation(relation))
1414 2244 : return;
1415 :
1416 : /*
1417 : * Remember the xid for the change in streaming mode. We need to send the
1418 : * xid with each change in streaming mode so that the subscriber can
1419 : * associate the changes with the transaction and, on abort, discard the
1420 : * corresponding changes.
1421 : */
1422 366288 : if (data->in_streaming)
1423 351882 : xid = change->txn->xid;
1424 :
1425 366288 : relentry = get_rel_sync_entry(data, relation);
1426 :
1427 : /* First check the table filter */
1428 366282 : switch (action)
1429 : {
1430 211570 : case REORDER_BUFFER_CHANGE_INSERT:
1431 211570 : if (!relentry->pubactions.pubinsert)
1432 86 : return;
1433 211484 : break;
1434 68926 : case REORDER_BUFFER_CHANGE_UPDATE:
1435 68926 : if (!relentry->pubactions.pubupdate)
1436 86 : return;
1437 68840 : break;
1438 85786 : case REORDER_BUFFER_CHANGE_DELETE:
1439 85786 : if (!relentry->pubactions.pubdelete)
1440 2068 : return;
1441 :
1442 : /*
1443 : * This is only possible if deletes are allowed even when replica
1444 : * identity is not defined for a table. Since the DELETE action
1445 : * can't be published, we simply return.
1446 : */
1447 83718 : if (!change->data.tp.oldtuple)
1448 : {
1449 0 : elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
1450 0 : return;
1451 : }
1452 83718 : break;
1453 364042 : default:
1454 : Assert(false);
1455 : }
1456 :
1457 : /* Avoid leaking memory by using and resetting our own context */
1458 364042 : old = MemoryContextSwitchTo(data->context);
1459 :
1460 : /* Switch relation if publishing via root. */
1461 364042 : if (relentry->publish_as_relid != RelationGetRelid(relation))
1462 : {
1463 : Assert(relation->rd_rel->relispartition);
1464 102 : ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1465 102 : targetrel = ancestor;
1466 : }
1467 :
1468 364042 : if (change->data.tp.oldtuple)
1469 : {
1470 83944 : old_slot = relentry->old_slot;
1471 83944 : ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple, old_slot, false);
1472 :
1473 : /* Convert tuple if needed. */
1474 83944 : if (relentry->attrmap)
1475 : {
1476 0 : TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
1477 : &TTSOpsVirtual);
1478 :
1479 0 : old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
1480 : }
1481 : }
1482 :
1483 364042 : if (change->data.tp.newtuple)
1484 : {
1485 280324 : new_slot = relentry->new_slot;
1486 280324 : ExecStoreHeapTuple(&change->data.tp.newtuple->tuple, new_slot, false);
1487 :
1488 : /* Convert tuple if needed. */
1489 280324 : if (relentry->attrmap)
1490 : {
1491 18 : TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
1492 : &TTSOpsVirtual);
1493 :
1494 18 : new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
1495 : }
1496 : }
1497 :
1498 : /*
1499 : * Check row filter.
1500 : *
1501 : * Updates could be transformed to inserts or deletes based on the results
1502 : * of the row filter for old and new tuple.
1503 : */
1504 364042 : if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
1505 22 : goto cleanup;
1506 :
1507 : /*
1508 : * Send BEGIN if we haven't yet.
1509 : *
1510 : * We send the BEGIN message after ensuring that we will actually send the
1511 : * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
1512 : * transactions.
1513 : */
1514 364020 : if (txndata && !txndata->sent_begin_txn)
1515 676 : pgoutput_send_begin(ctx, txn);
1516 :
1517 : /*
1518 : * The schema should be sent using the original relation because
1519 : * maybe_send_schema() also sends the ancestor's relation.
1520 : */
1521 364018 : maybe_send_schema(ctx, change, relation, relentry);
1522 :
1523 364014 : OutputPluginPrepareWrite(ctx, true);
1524 :
1525 : /* Send the data */
1526 364014 : switch (action)
1527 : {
1528 211460 : case REORDER_BUFFER_CHANGE_INSERT:
1529 211460 : logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
1530 211460 : data->binary, relentry->columns);
1531 211460 : break;
1532 68834 : case REORDER_BUFFER_CHANGE_UPDATE:
1533 68834 : logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
1534 68834 : new_slot, data->binary, relentry->columns);
1535 68834 : break;
1536 83720 : case REORDER_BUFFER_CHANGE_DELETE:
1537 83720 : logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
1538 83720 : data->binary, relentry->columns);
1539 83720 : break;
1540 364014 : default:
1541 : Assert(false);
1542 : }
1543 :
1544 364014 : OutputPluginWrite(ctx, true);
1545 :
1546 364036 : cleanup:
1547 364036 : if (RelationIsValid(ancestor))
1548 : {
1549 96 : RelationClose(ancestor);
1550 96 : ancestor = NULL;
1551 : }
1552 :
1553 364036 : MemoryContextSwitchTo(old);
1554 364036 : MemoryContextReset(data->context);
1555 : }
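/*
 * A minimal standalone sketch (not part of pgoutput.c) of the transformation
 * described in the row-filter comment above: an UPDATE may be published as an
 * INSERT, a DELETE, or skipped, depending on whether the old and new tuple
 * versions satisfy the filter. The enum and function names are made up for
 * illustration; the real decision is made inside pgoutput_row_filter().
 */
#include <stdbool.h>
#include <stdio.h>

enum example_action { EX_SKIP, EX_INSERT, EX_UPDATE, EX_DELETE };

/* Decide what to publish for an UPDATE given the filter results. */
static enum example_action
transform_update(bool old_matches, bool new_matches)
{
	if (old_matches && new_matches)
		return EX_UPDATE;		/* both versions visible: plain UPDATE */
	if (old_matches && !new_matches)
		return EX_DELETE;		/* row leaves the published set */
	if (!old_matches && new_matches)
		return EX_INSERT;		/* row enters the published set */
	return EX_SKIP;				/* never visible to the subscriber */
}

int
main(void)
{
	printf("%d %d %d %d\n",
		   transform_update(true, true),	/* UPDATE */
		   transform_update(true, false),	/* DELETE */
		   transform_update(false, true),	/* INSERT */
		   transform_update(false, false));	/* skip */
	return 0;
}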
1556 :
1557 : static void
1558 22 : pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1559 : int nrelations, Relation relations[], ReorderBufferChange *change)
1560 : {
1561 22 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1562 22 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1563 : MemoryContext old;
1564 : RelationSyncEntry *relentry;
1565 : int i;
1566 : int nrelids;
1567 : Oid *relids;
1568 22 : TransactionId xid = InvalidTransactionId;
1569 :
1570 : /* Remember the xid for the change in streaming mode. See pgoutput_change. */
1571 22 : if (data->in_streaming)
1572 0 : xid = change->txn->xid;
1573 :
1574 22 : old = MemoryContextSwitchTo(data->context);
1575 :
1576 22 : relids = palloc0(nrelations * sizeof(Oid));
1577 22 : nrelids = 0;
1578 :
1579 62 : for (i = 0; i < nrelations; i++)
1580 : {
1581 40 : Relation relation = relations[i];
1582 40 : Oid relid = RelationGetRelid(relation);
1583 :
1584 40 : if (!is_publishable_relation(relation))
1585 0 : continue;
1586 :
1587 40 : relentry = get_rel_sync_entry(data, relation);
1588 :
1589 40 : if (!relentry->pubactions.pubtruncate)
1590 16 : continue;
1591 :
1592 : /*
1593 : * Don't send partitions if the publication wants to send only the
1594 : * root tables through it.
1595 : */
1596 24 : if (relation->rd_rel->relispartition &&
1597 20 : relentry->publish_as_relid != relid)
1598 0 : continue;
1599 :
1600 24 : relids[nrelids++] = relid;
1601 :
1602 : /* Send BEGIN if we haven't yet */
1603 24 : if (txndata && !txndata->sent_begin_txn)
1604 14 : pgoutput_send_begin(ctx, txn);
1605 :
1606 24 : maybe_send_schema(ctx, change, relation, relentry);
1607 : }
1608 :
1609 22 : if (nrelids > 0)
1610 : {
1611 14 : OutputPluginPrepareWrite(ctx, true);
1612 14 : logicalrep_write_truncate(ctx->out,
1613 : xid,
1614 : nrelids,
1615 : relids,
1616 14 : change->data.truncate.cascade,
1617 14 : change->data.truncate.restart_seqs);
1618 14 : OutputPluginWrite(ctx, true);
1619 : }
1620 :
1621 22 : MemoryContextSwitchTo(old);
1622 22 : MemoryContextReset(data->context);
1623 22 : }
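/*
 * Illustrative sketch only, with simplified stand-in types: the pattern used
 * by pgoutput_truncate() of collecting the OIDs that survive the publication
 * checks and emitting a single message only when at least one remains.
 */
#include <stdio.h>

typedef unsigned int ExampleOid;

static int
collect_publishable(const ExampleOid *relids, const int *publishable,
					int nrelations, ExampleOid *out)
{
	int			nout = 0;

	for (int i = 0; i < nrelations; i++)
	{
		if (!publishable[i])
			continue;			/* analogous to the pubtruncate check */
		out[nout++] = relids[i];
	}
	return nout;
}

int
main(void)
{
	ExampleOid	rels[] = {16384, 16385, 16386};
	int			pub[] = {1, 0, 1};
	ExampleOid	out[3];
	int			n = collect_publishable(rels, pub, 3, out);

	if (n > 0)					/* only then would a TRUNCATE message be written */
		printf("would truncate %d relations, first OID %u\n", n, out[0]);
	return 0;
}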
1624 :
1625 : static void
1626 12 : pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1627 : XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
1628 : const char *message)
1629 : {
1630 12 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1631 12 : TransactionId xid = InvalidTransactionId;
1632 :
1633 12 : if (!data->messages)
1634 2 : return;
1635 :
1636 : /*
1637 : * Remember the xid for the message in streaming mode. See
1638 : * pgoutput_change.
1639 : */
1640 10 : if (data->in_streaming)
1641 0 : xid = txn->xid;
1642 :
1643 : /*
1644 : * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
1645 : */
1646 10 : if (transactional)
1647 : {
1648 4 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1649 :
1650 : /* Send BEGIN if we haven't yet */
1651 4 : if (txndata && !txndata->sent_begin_txn)
1652 4 : pgoutput_send_begin(ctx, txn);
1653 : }
1654 :
1655 10 : OutputPluginPrepareWrite(ctx, true);
1656 10 : logicalrep_write_message(ctx->out,
1657 : xid,
1658 : message_lsn,
1659 : transactional,
1660 : prefix,
1661 : sz,
1662 : message);
1663 10 : OutputPluginWrite(ctx, true);
1664 : }
1665 :
1666 : /*
1667 : * Return true if the change is associated with a remote origin and the user
1668 : * has requested only the changes that don't have an origin, false otherwise.
1669 : */
1670 : static bool
1671 981838 : pgoutput_origin_filter(LogicalDecodingContext *ctx,
1672 : RepOriginId origin_id)
1673 : {
1674 981838 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1675 :
1676 981838 : if (data->publish_no_origin && origin_id != InvalidRepOriginId)
1677 96 : return true;
1678 :
1679 981742 : return false;
1680 : }
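/*
 * A minimal standalone sketch (not part of pgoutput.c) of the decision made
 * by pgoutput_origin_filter(): a change is filtered out (true) only when the
 * subscriber requested origin-less changes and the change carries a remote
 * origin id. The constant and types below are simplified stand-ins.
 */
#include <stdbool.h>
#include <stdio.h>

#define EXAMPLE_INVALID_ORIGIN 0	/* stand-in for InvalidRepOriginId */

static bool
filter_by_origin(bool publish_no_origin, unsigned int origin_id)
{
	return publish_no_origin && origin_id != EXAMPLE_INVALID_ORIGIN;
}

int
main(void)
{
	/* locally originated change is never filtered */
	printf("%d\n", filter_by_origin(true, EXAMPLE_INVALID_ORIGIN));	/* 0 */
	/* remotely originated change is filtered when no-origin was requested */
	printf("%d\n", filter_by_origin(true, 42));						/* 1 */
	/* default behaviour forwards everything */
	printf("%d\n", filter_by_origin(false, 42));					/* 0 */
	return 0;
}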
1681 :
1682 : /*
1683 : * Shutdown the output plugin.
1684 : *
1685 : * Note, we don't need to clean the data->context and data->cachectx as
1686 : * they are child contexts of the ctx->context so they will be cleaned up by
1687 : * logical decoding machinery.
1688 : */
1689 : static void
1690 822 : pgoutput_shutdown(LogicalDecodingContext *ctx)
1691 : {
1692 822 : if (RelationSyncCache)
1693 : {
1694 328 : hash_destroy(RelationSyncCache);
1695 328 : RelationSyncCache = NULL;
1696 : }
1697 822 : }
1698 :
1699 : /*
1700 : * Load publications from the list of publication names.
1701 : */
1702 : static List *
1703 276 : LoadPublications(List *pubnames)
1704 : {
1705 276 : List *result = NIL;
1706 : ListCell *lc;
1707 :
1708 634 : foreach(lc, pubnames)
1709 : {
1710 362 : char *pubname = (char *) lfirst(lc);
1711 362 : Publication *pub = GetPublicationByName(pubname, false);
1712 :
1713 358 : result = lappend(result, pub);
1714 : }
1715 :
1716 272 : return result;
1717 : }
1718 :
1719 : /*
1720 : * Publication syscache invalidation callback.
1721 : *
1722 : * Called for invalidations on pg_publication.
1723 : */
1724 : static void
1725 480 : publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
1726 : {
1727 480 : publications_valid = false;
1728 :
1729 : /*
1730 : * Also invalidate per-relation cache so that next time the filtering info
1731 : * is checked it will be updated with the new publication settings.
1732 : */
1733 480 : rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
1734 480 : }
1735 :
1736 : /*
1737 : * START STREAM callback
1738 : */
1739 : static void
1740 1294 : pgoutput_stream_start(struct LogicalDecodingContext *ctx,
1741 : ReorderBufferTXN *txn)
1742 : {
1743 1294 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1744 1294 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
1745 :
1746 : /* we can't nest streaming of transactions */
1747 : Assert(!data->in_streaming);
1748 :
1749 : /*
1750 : * If we have already sent the first stream for this transaction, don't
1751 : * send the origin id in the subsequent streams.
1752 : */
1753 1294 : if (rbtxn_is_streamed(txn))
1754 1168 : send_replication_origin = false;
1755 :
1756 1294 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
1757 1294 : logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
1758 :
1759 1294 : send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
1760 : send_replication_origin);
1761 :
1762 1294 : OutputPluginWrite(ctx, true);
1763 :
1764 : /* we're streaming a chunk of transaction now */
1765 1294 : data->in_streaming = true;
1766 1294 : }
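/*
 * Illustrative sketch only, with simplified stand-in types: origin
 * information accompanies only the first stream chunk of a transaction and
 * is suppressed for later chunks, mirroring the rbtxn_is_streamed() test in
 * pgoutput_stream_start() above.
 */
#include <stdbool.h>
#include <stdio.h>

struct example_txn
{
	bool		already_streamed;	/* stand-in for rbtxn_is_streamed() */
	bool		has_origin;			/* stand-in for origin_id != InvalidRepOriginId */
};

static bool
send_origin_with_stream_start(struct example_txn *txn)
{
	bool		send_origin = txn->has_origin && !txn->already_streamed;

	txn->already_streamed = true;	/* subsequent chunks skip the origin */
	return send_origin;
}

int
main(void)
{
	struct example_txn txn = {false, true};

	printf("%d %d\n",
		   send_origin_with_stream_start(&txn),		/* 1: first chunk */
		   send_origin_with_stream_start(&txn));	/* 0: later chunk */
	return 0;
}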
1767 :
1768 : /*
1769 : * STOP STREAM callback
1770 : */
1771 : static void
1772 1294 : pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
1773 : ReorderBufferTXN *txn)
1774 : {
1775 1294 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1776 :
1777 : /* we should be streaming a transaction */
1778 : Assert(data->in_streaming);
1779 :
1780 1294 : OutputPluginPrepareWrite(ctx, true);
1781 1294 : logicalrep_write_stream_stop(ctx->out);
1782 1294 : OutputPluginWrite(ctx, true);
1783 :
1784 : /* we've stopped streaming a transaction */
1785 1294 : data->in_streaming = false;
1786 1294 : }
1787 :
1788 : /*
1789 : * Notify downstream to discard the streamed transaction (along with all
1790 : * its subtransactions, if it's a toplevel transaction).
1791 : */
1792 : static void
1793 52 : pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
1794 : ReorderBufferTXN *txn,
1795 : XLogRecPtr abort_lsn)
1796 : {
1797 : ReorderBufferTXN *toptxn;
1798 52 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1799 52 : bool write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
1800 :
1801 : /*
1802 : * The abort should happen outside a streaming block, even for streamed
1803 : * transactions. The transaction has to be marked as streamed, though.
1804 : */
1805 : Assert(!data->in_streaming);
1806 :
1807 : /* determine the toplevel transaction */
1808 52 : toptxn = rbtxn_get_toptxn(txn);
1809 :
1810 : Assert(rbtxn_is_streamed(toptxn));
1811 :
1812 52 : OutputPluginPrepareWrite(ctx, true);
1813 52 : logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
1814 : txn->xact_time.abort_time, write_abort_info);
1815 :
1816 52 : OutputPluginWrite(ctx, true);
1817 :
1818 52 : cleanup_rel_sync_cache(toptxn->xid, false);
1819 52 : }
1820 :
1821 : /*
1822 : * Notify downstream to apply the streamed transaction (along with all
1823 : * its subtransactions).
1824 : */
1825 : static void
1826 92 : pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
1827 : ReorderBufferTXN *txn,
1828 : XLogRecPtr commit_lsn)
1829 : {
1830 92 : PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private;
1831 :
1832 : /*
1833 : * The commit should happen outside a streaming block, even for streamed
1834 : * transactions. The transaction has to be marked as streamed, though.
1835 : */
1836 : Assert(!data->in_streaming);
1837 : Assert(rbtxn_is_streamed(txn));
1838 :
1839 92 : OutputPluginUpdateProgress(ctx, false);
1840 :
1841 92 : OutputPluginPrepareWrite(ctx, true);
1842 92 : logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
1843 92 : OutputPluginWrite(ctx, true);
1844 :
1845 92 : cleanup_rel_sync_cache(txn->xid, true);
1846 92 : }
1847 :
1848 : /*
1849 : * PREPARE callback (for streaming two-phase commit).
1850 : *
1851 : * Notify the downstream to prepare the transaction.
1852 : */
1853 : static void
1854 28 : pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
1855 : ReorderBufferTXN *txn,
1856 : XLogRecPtr prepare_lsn)
1857 : {
1858 : Assert(rbtxn_is_streamed(txn));
1859 :
1860 28 : OutputPluginUpdateProgress(ctx, false);
1861 28 : OutputPluginPrepareWrite(ctx, true);
1862 28 : logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
1863 28 : OutputPluginWrite(ctx, true);
1864 28 : }
1865 :
1866 : /*
1867 : * Initialize the relation schema sync cache for a decoding session.
1868 : *
1869 : * The hash table is destroyed at the end of a decoding session. While the
1870 : * relcache invalidation callbacks remain registered and will still be
1871 : * invoked, they will just see the NULL hash table global and take no action.
1872 : */
1873 : static void
1874 608 : init_rel_sync_cache(MemoryContext cachectx)
1875 : {
1876 : HASHCTL ctl;
1877 : static bool relation_callbacks_registered = false;
1878 :
1879 : /* Nothing to do if hash table already exists */
1880 608 : if (RelationSyncCache != NULL)
1881 2 : return;
1882 :
1883 : /* Make a new hash table for the cache */
1884 608 : ctl.keysize = sizeof(Oid);
1885 608 : ctl.entrysize = sizeof(RelationSyncEntry);
1886 608 : ctl.hcxt = cachectx;
1887 :
1888 608 : RelationSyncCache = hash_create("logical replication output relation cache",
1889 : 128, &ctl,
1890 : HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
1891 :
1892 : Assert(RelationSyncCache != NULL);
1893 :
1894 : /* No more to do if we already registered callbacks */
1895 608 : if (relation_callbacks_registered)
1896 2 : return;
1897 :
1898 : /* We must update the cache entry for a relation after a relcache flush */
1899 606 : CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
1900 :
1901 : /*
1902 : * Flush all cache entries after a pg_namespace change, in case it was a
1903 : * schema rename affecting a relation being replicated.
1904 : */
1905 606 : CacheRegisterSyscacheCallback(NAMESPACEOID,
1906 : rel_sync_cache_publication_cb,
1907 : (Datum) 0);
1908 :
1909 : /*
1910 : * Flush all cache entries after any publication changes. (We need no
1911 : * callback entry for pg_publication, because publication_invalidation_cb
1912 : * will take care of it.)
1913 : */
1914 606 : CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
1915 : rel_sync_cache_publication_cb,
1916 : (Datum) 0);
1917 606 : CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
1918 : rel_sync_cache_publication_cb,
1919 : (Datum) 0);
1920 :
1921 606 : relation_callbacks_registered = true;
1922 : }
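/*
 * Illustrative sketch only, with simplified stand-in names: the
 * lazy-initialization pattern used by init_rel_sync_cache(). The cache is
 * created on first use, and callbacks are registered at most once per
 * backend via a static flag that survives cache destruction. A plain
 * pointer stands in for the real dynahash table.
 */
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

static int *example_cache = NULL;
static bool example_callbacks_registered = false;

static void
example_register_callbacks(void)
{
	/* in pgoutput this is CacheRegisterRelcacheCallback() and friends */
	printf("callbacks registered\n");
}

static void
example_init_cache(void)
{
	if (example_cache != NULL)
		return;					/* nothing to do: cache already exists */

	example_cache = calloc(128, sizeof(int));

	if (example_callbacks_registered)
		return;					/* registration persists across cache rebuilds */

	example_register_callbacks();
	example_callbacks_registered = true;
}

int
main(void)
{
	example_init_cache();
	example_init_cache();		/* second call is a no-op */
	return 0;
}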
1923 :
1924 : /*
1925 : * We expect relatively small number of streamed transactions.
1926 : * We expect a relatively small number of streamed transactions.
1927 : static bool
1928 351882 : get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
1929 : {
1930 351882 : return list_member_xid(entry->streamed_txns, xid);
1931 : }
1932 :
1933 : /*
1934 : * Add the xid in the rel sync entry for which we have already sent the schema
1935 : * Add the xid to the rel sync entry's list of streamed transactions for
1936 : * which we have already sent the relation's schema.
1937 : static void
1938 144 : set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
1939 : {
1940 : MemoryContext oldctx;
1941 :
1942 144 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1943 :
1944 144 : entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
1945 :
1946 144 : MemoryContextSwitchTo(oldctx);
1947 144 : }
1948 :
1949 : /*
1950 : * Find or create entry in the relation schema cache.
1951 : *
1952 : * This looks up the publications that the given relation is directly or
1953 : * indirectly part of (indirectly when it is really one of the relation's
1954 : * ancestors that is part of a publication) and fills in the entry with
1955 : * information about which operations to publish and whether to use an
1956 : * ancestor's schema when publishing.
1957 : */
1958 : static RelationSyncEntry *
1959 366328 : get_rel_sync_entry(PGOutputData *data, Relation relation)
1960 : {
1961 : RelationSyncEntry *entry;
1962 : bool found;
1963 : MemoryContext oldctx;
1964 366328 : Oid relid = RelationGetRelid(relation);
1965 :
1966 : Assert(RelationSyncCache != NULL);
1967 :
1968 : /* Find cached relation info, creating if not found */
1969 366328 : entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
1970 : &relid,
1971 : HASH_ENTER, &found);
1972 : Assert(entry != NULL);
1973 :
1974 : /* initialize entry, if it's new */
1975 366328 : if (!found)
1976 : {
1977 440 : entry->replicate_valid = false;
1978 440 : entry->schema_sent = false;
1979 440 : entry->streamed_txns = NIL;
1980 440 : entry->pubactions.pubinsert = entry->pubactions.pubupdate =
1981 440 : entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
1982 440 : entry->new_slot = NULL;
1983 440 : entry->old_slot = NULL;
1984 440 : memset(entry->exprstate, 0, sizeof(entry->exprstate));
1985 440 : entry->entry_cxt = NULL;
1986 440 : entry->publish_as_relid = InvalidOid;
1987 440 : entry->columns = NULL;
1988 440 : entry->attrmap = NULL;
1989 : }
1990 :
1991 : /* Validate the entry */
1992 366328 : if (!entry->replicate_valid)
1993 : {
1994 550 : Oid schemaId = get_rel_namespace(relid);
1995 550 : List *pubids = GetRelationPublications(relid);
1996 :
1997 : /*
1998 : * We don't acquire a lock on the namespace system table as we build
1999 : * the cache entry using a historic snapshot and all the later changes
2000 : * are absorbed while decoding WAL.
2001 : */
2002 550 : List *schemaPubids = GetSchemaPublications(schemaId);
2003 : ListCell *lc;
2004 550 : Oid publish_as_relid = relid;
2005 550 : int publish_ancestor_level = 0;
2006 550 : bool am_partition = get_rel_relispartition(relid);
2007 550 : char relkind = get_rel_relkind(relid);
2008 550 : List *rel_publications = NIL;
2009 :
2010 : /* Reload publications if needed before use. */
2011 550 : if (!publications_valid)
2012 : {
2013 276 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
2014 276 : if (data->publications)
2015 : {
2016 38 : list_free_deep(data->publications);
2017 38 : data->publications = NIL;
2018 : }
2019 276 : data->publications = LoadPublications(data->publication_names);
2020 272 : MemoryContextSwitchTo(oldctx);
2021 272 : publications_valid = true;
2022 : }
2023 :
2024 : /*
2025 : * Reset schema_sent status as the relation definition may have
2026 : * changed. Also reset pubactions to empty in case rel was dropped
2027 : * from a publication. Also free any objects that depended on the
2028 : * earlier definition.
2029 : */
2030 546 : entry->schema_sent = false;
2031 546 : list_free(entry->streamed_txns);
2032 546 : entry->streamed_txns = NIL;
2033 546 : bms_free(entry->columns);
2034 546 : entry->columns = NULL;
2035 546 : entry->pubactions.pubinsert = false;
2036 546 : entry->pubactions.pubupdate = false;
2037 546 : entry->pubactions.pubdelete = false;
2038 546 : entry->pubactions.pubtruncate = false;
2039 :
2040 : /*
2041 : * Tuple slot cleanup. (The slots will be rebuilt later if needed.)
2042 : */
2043 546 : if (entry->old_slot)
2044 98 : ExecDropSingleTupleTableSlot(entry->old_slot);
2045 546 : if (entry->new_slot)
2046 98 : ExecDropSingleTupleTableSlot(entry->new_slot);
2047 :
2048 546 : entry->old_slot = NULL;
2049 546 : entry->new_slot = NULL;
2050 :
2051 546 : if (entry->attrmap)
2052 0 : free_attrmap(entry->attrmap);
2053 546 : entry->attrmap = NULL;
2054 :
2055 : /*
2056 : * Row filter cache cleanups.
2057 : */
2058 546 : if (entry->entry_cxt)
2059 20 : MemoryContextDelete(entry->entry_cxt);
2060 :
2061 546 : entry->entry_cxt = NULL;
2062 546 : entry->estate = NULL;
2063 546 : memset(entry->exprstate, 0, sizeof(entry->exprstate));
2064 :
2065 : /*
2066 : * Build publication cache. We can't use one provided by relcache as
2067 : * relcache considers all publications that the given relation is in,
2068 : * but here we only need to consider ones that the subscriber
2069 : * requested.
2070 : */
2071 1388 : foreach(lc, data->publications)
2072 : {
2073 842 : Publication *pub = lfirst(lc);
2074 842 : bool publish = false;
2075 :
2076 : /*
2077 : * Under what relid should we publish changes in this publication?
2078 : * We'll use the top-most relid across all publications. Also
2079 : * track the ancestor level for this publication.
2080 : */
2081 842 : Oid pub_relid = relid;
2082 842 : int ancestor_level = 0;
2083 :
2084 : /*
2085 : * If this is a FOR ALL TABLES publication, pick the partition
2086 : * root and set the ancestor level accordingly.
2087 : */
2088 842 : if (pub->alltables)
2089 : {
2090 132 : publish = true;
2091 132 : if (pub->pubviaroot && am_partition)
2092 : {
2093 28 : List *ancestors = get_partition_ancestors(relid);
2094 :
2095 28 : pub_relid = llast_oid(ancestors);
2096 28 : ancestor_level = list_length(ancestors);
2097 : }
2098 : }
2099 :
2100 842 : if (!publish)
2101 : {
2102 710 : bool ancestor_published = false;
2103 :
2104 : /*
2105 : * For a partition, check if any of the ancestors are
2106 : * published. If so, note down the topmost ancestor that is
2107 : * published via this publication, which will be used as the
2108 : * relation via which to publish the partition's changes.
2109 : */
2110 710 : if (am_partition)
2111 : {
2112 : Oid ancestor;
2113 : int level;
2114 198 : List *ancestors = get_partition_ancestors(relid);
2115 :
2116 198 : ancestor = GetTopMostAncestorInPublication(pub->oid,
2117 : ancestors,
2118 : &level);
2119 :
2120 198 : if (ancestor != InvalidOid)
2121 : {
2122 80 : ancestor_published = true;
2123 80 : if (pub->pubviaroot)
2124 : {
2125 34 : pub_relid = ancestor;
2126 34 : ancestor_level = level;
2127 : }
2128 : }
2129 : }
2130 :
2131 1110 : if (list_member_oid(pubids, pub->oid) ||
2132 788 : list_member_oid(schemaPubids, pub->oid) ||
2133 : ancestor_published)
2134 372 : publish = true;
2135 : }
2136 :
2137 : /*
2138 : * If the relation is to be published, determine actions to
2139 : * publish, and list of columns, if appropriate.
2140 : *
2141 : * Don't publish changes for partitioned tables, because publishing
2142 : * the changes of their partitions suffices, unless partition changes
2143 : * won't be published due to pubviaroot being set.
2144 : */
2145 842 : if (publish &&
2146 6 : (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
2147 : {
2148 498 : entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
2149 498 : entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
2150 498 : entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
2151 498 : entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
2152 :
2153 : /*
2154 : * We want to publish the changes as the top-most ancestor
2155 : * across all publications. So we need to check if the already
2156 : * calculated level is higher than the new one. If yes, we can
2157 : * ignore the new value (as it's a child). Otherwise the new
2158 : * value is an ancestor, so we keep it.
2159 : */
2160 498 : if (publish_ancestor_level > ancestor_level)
2161 2 : continue;
2162 :
2163 : /*
2164 : * If we found an ancestor higher up in the tree, discard the
2165 : * list of publications through which we replicate it, and use
2166 : * the new ancestor.
2167 : */
2168 496 : if (publish_ancestor_level < ancestor_level)
2169 : {
2170 62 : publish_as_relid = pub_relid;
2171 62 : publish_ancestor_level = ancestor_level;
2172 :
2173 : /* reset the publication list for this relation */
2174 62 : rel_publications = NIL;
2175 : }
2176 : else
2177 : {
2178 : /* Same ancestor level, has to be the same OID. */
2179 : Assert(publish_as_relid == pub_relid);
2180 : }
2181 :
2182 : /* Track publications for this ancestor. */
2183 496 : rel_publications = lappend(rel_publications, pub);
2184 : }
2185 : }
2186 :
2187 546 : entry->publish_as_relid = publish_as_relid;
2188 :
2189 : /*
2190 : * Initialize the tuple slot, map, and row filter. These are only used
2191 : * when publishing inserts, updates, or deletes.
2192 : */
2193 546 : if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
2194 76 : entry->pubactions.pubdelete)
2195 : {
2196 : /* Initialize the tuple slot and map */
2197 470 : init_tuple_slot(data, relation, entry);
2198 :
2199 : /* Initialize the row filter */
2200 470 : pgoutput_row_filter_init(data, rel_publications, entry);
2201 :
2202 : /* Initialize the column list */
2203 470 : pgoutput_column_list_init(data, rel_publications, entry);
2204 : }
2205 :
2206 544 : list_free(pubids);
2207 544 : list_free(schemaPubids);
2208 544 : list_free(rel_publications);
2209 :
2210 544 : entry->replicate_valid = true;
2211 : }
2212 :
2213 366322 : return entry;
2214 : }
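/*
 * Illustrative sketch only, with simplified stand-in types: how
 * get_rel_sync_entry() chooses the relation to publish changes as, namely
 * the topmost ancestor seen across all publications. A higher ancestor
 * level wins, an equal level must name the same relation, and a lower level
 * is ignored.
 */
#include <assert.h>
#include <stdio.h>

struct publish_target
{
	unsigned int relid;			/* relation to publish changes as */
	int			level;			/* 0 = the relation itself */
};

static void
consider_candidate(struct publish_target *best,
				   unsigned int cand_relid, int cand_level)
{
	if (best->level > cand_level)
		return;					/* keep the higher (more ancestral) choice */

	if (best->level < cand_level)
	{
		best->relid = cand_relid;	/* found a higher ancestor: switch */
		best->level = cand_level;
	}
	else
		assert(best->relid == cand_relid);	/* same level => same OID */
}

int
main(void)
{
	struct publish_target best = {16384, 0};	/* start with the relation itself */

	consider_candidate(&best, 16390, 1);	/* a parent wins over the partition */
	consider_candidate(&best, 16384, 0);	/* a lower level is ignored */
	printf("publish as %u (level %d)\n", best.relid, best.level);
	return 0;
}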
2215 :
2216 : /*
2217 : * Cleanup list of streamed transactions and update the schema_sent flag.
2218 : *
2219 : * When a streamed transaction commits or aborts, we need to remove the
2220 : * toplevel XID from the schema cache. If the transaction aborted, the
2221 : * subscriber will simply throw away the schema records we streamed, so
2222 : * we don't need to do anything else.
2223 : *
2224 : * If the transaction is committed, the subscriber will update the relation
2225 : * cache - so tweak the schema_sent flag accordingly.
2226 : */
2227 : static void
2228 144 : cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
2229 : {
2230 : HASH_SEQ_STATUS hash_seq;
2231 : RelationSyncEntry *entry;
2232 : ListCell *lc;
2233 :
2234 : Assert(RelationSyncCache != NULL);
2235 :
2236 144 : hash_seq_init(&hash_seq, RelationSyncCache);
2237 294 : while ((entry = hash_seq_search(&hash_seq)) != NULL)
2238 : {
2239 : /*
2240 : * We can set the schema_sent flag for an entry that has the committed
2241 : * xid in its list, since that ensures the subscriber already has the
2242 : * corresponding schema; we don't need to send it again unless there is
2243 : * an invalidation for that relation.
2244 : */
2245 190 : foreach(lc, entry->streamed_txns)
2246 : {
2247 148 : if (xid == lfirst_xid(lc))
2248 : {
2249 108 : if (is_commit)
2250 86 : entry->schema_sent = true;
2251 :
2252 108 : entry->streamed_txns =
2253 108 : foreach_delete_current(entry->streamed_txns, lc);
2254 108 : break;
2255 : }
2256 : }
2257 : }
2258 144 : }
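/*
 * A minimal standalone sketch (not part of pgoutput.c) of what
 * cleanup_rel_sync_cache() does for a single cache entry, using a plain
 * array instead of a PostgreSQL List. On commit of a streamed transaction
 * whose xid appears in the entry's list, the schema is considered sent; on
 * abort the xid is simply forgotten.
 */
#include <stdbool.h>
#include <stdio.h>

struct example_entry
{
	bool		schema_sent;
	unsigned int streamed_xids[4];
	int			nstreamed;
};

static void
example_cleanup(struct example_entry *entry, unsigned int xid, bool is_commit)
{
	for (int i = 0; i < entry->nstreamed; i++)
	{
		if (entry->streamed_xids[i] != xid)
			continue;

		if (is_commit)
			entry->schema_sent = true;

		/* remove the xid by shifting the tail of the array down */
		for (int j = i; j < entry->nstreamed - 1; j++)
			entry->streamed_xids[j] = entry->streamed_xids[j + 1];
		entry->nstreamed--;
		break;
	}
}

int
main(void)
{
	struct example_entry entry = {false, {731, 740}, 2};

	example_cleanup(&entry, 740, true);
	printf("schema_sent=%d remaining=%d\n", entry.schema_sent, entry.nstreamed);
	return 0;
}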
2259 :
2260 : /*
2261 : * Relcache invalidation callback
2262 : */
2263 : static void
2264 6588 : rel_sync_cache_relation_cb(Datum arg, Oid relid)
2265 : {
2266 : RelationSyncEntry *entry;
2267 :
2268 : /*
2269 : * We can get here if the plugin was used via the SQL interface, since the
2270 : * RelationSyncCache is destroyed when decoding finishes but there is
2271 : * no way to unregister the relcache invalidation callback.
2272 : */
2273 6588 : if (RelationSyncCache == NULL)
2274 18 : return;
2275 :
2276 : /*
2277 : * Nobody keeps pointers to entries in this hash table around outside
2278 : * logical decoding callback calls - but invalidation events can come in
2279 : * *during* a callback if we do any syscache access in the callback.
2280 : * Because of that we must mark the cache entry as invalid but not damage
2281 : * any of its substructure here. The next get_rel_sync_entry() call will
2282 : * rebuild it all.
2283 : */
2284 6570 : if (OidIsValid(relid))
2285 : {
2286 : /*
2287 : * Getting invalidations for relations that aren't in the table is
2288 : * entirely normal. So we don't care if it's found or not.
2289 : */
2290 6518 : entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
2291 : HASH_FIND, NULL);
2292 6518 : if (entry != NULL)
2293 916 : entry->replicate_valid = false;
2294 : }
2295 : else
2296 : {
2297 : /* Whole cache must be flushed. */
2298 : HASH_SEQ_STATUS status;
2299 :
2300 52 : hash_seq_init(&status, RelationSyncCache);
2301 112 : while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2302 : {
2303 60 : entry->replicate_valid = false;
2304 : }
2305 : }
2306 : }
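/*
 * Illustrative sketch only, with a plain array instead of a dynahash table:
 * the invalidation callbacks above never free an entry's substructure; they
 * only clear the validity flag, and the next get_rel_sync_entry() call
 * rebuilds everything. With a valid OID a single entry is invalidated,
 * otherwise the whole cache is flushed.
 */
#include <stdbool.h>
#include <stdio.h>

struct example_entry
{
	unsigned int relid;
	bool		replicate_valid;
};

static void
example_invalidate(struct example_entry *cache, int ncached, unsigned int relid)
{
	for (int i = 0; i < ncached; i++)
	{
		if (relid == 0 || cache[i].relid == relid)	/* 0 = invalidate everything */
			cache[i].replicate_valid = false;
	}
}

int
main(void)
{
	struct example_entry cache[] = {{16384, true}, {16385, true}};

	example_invalidate(cache, 2, 16385);	/* targeted invalidation */
	example_invalidate(cache, 2, 0);		/* flush the whole cache */
	printf("%d %d\n", cache[0].replicate_valid, cache[1].replicate_valid);
	return 0;
}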
2307 :
2308 : /*
2309 : * Publication relation/schema map syscache invalidation callback
2310 : *
2311 : * Called for invalidations on pg_publication, pg_publication_rel,
2312 : * pg_publication_namespace, and pg_namespace.
2313 : */
2314 : static void
2315 1360 : rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
2316 : {
2317 : HASH_SEQ_STATUS status;
2318 : RelationSyncEntry *entry;
2319 :
2320 : /*
2321 : * We can get here if the plugin was used via the SQL interface, since the
2322 : * RelationSyncCache is destroyed when decoding finishes but there is
2323 : * no way to unregister the invalidation callbacks.
2324 : */
2325 1360 : if (RelationSyncCache == NULL)
2326 68 : return;
2327 :
2328 : /*
2329 : * We have no easy way to identify which cache entries this invalidation
2330 : * event might have affected, so just mark them all invalid.
2331 : */
2332 1292 : hash_seq_init(&status, RelationSyncCache);
2333 3758 : while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2334 : {
2335 2466 : entry->replicate_valid = false;
2336 : }
2337 : }
2338 :
2339 : /* Send Replication origin */
2340 : static void
2341 2024 : send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
2342 : XLogRecPtr origin_lsn, bool send_origin)
2343 : {
2344 2024 : if (send_origin)
2345 : {
2346 : char *origin;
2347 :
2348 : /*----------
2349 : * XXX: which behaviour do we want here?
2350 : *
2351 : * Alternatives:
2352 : * - don't send origin message if origin name not found
2353 : * (that's what we do now)
2354 : * - throw error - that will break replication, not good
2355 : * - send some special "unknown" origin
2356 : *----------
2357 : */
2358 16 : if (replorigin_by_oid(origin_id, true, &origin))
2359 : {
2360 : /* Message boundary */
2361 16 : OutputPluginWrite(ctx, false);
2362 16 : OutputPluginPrepareWrite(ctx, true);
2363 :
2364 16 : logicalrep_write_origin(ctx->out, origin, origin_lsn);
2365 : }
2366 : }
2367 2024 : }
|