Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pgoutput.c
4 : * Logical Replication output plugin
5 : *
6 : * Copyright (c) 2012-2024, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/pgoutput/pgoutput.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "access/tupconvert.h"
16 : #include "catalog/partition.h"
17 : #include "catalog/pg_publication.h"
18 : #include "catalog/pg_publication_rel.h"
19 : #include "catalog/pg_subscription.h"
20 : #include "commands/defrem.h"
21 : #include "commands/subscriptioncmds.h"
22 : #include "executor/executor.h"
23 : #include "fmgr.h"
24 : #include "nodes/makefuncs.h"
25 : #include "parser/parse_relation.h"
26 : #include "replication/logical.h"
27 : #include "replication/logicalproto.h"
28 : #include "replication/origin.h"
29 : #include "replication/pgoutput.h"
30 : #include "utils/builtins.h"
31 : #include "utils/inval.h"
32 : #include "utils/lsyscache.h"
33 : #include "utils/memutils.h"
34 : #include "utils/rel.h"
35 : #include "utils/syscache.h"
36 : #include "utils/varlena.h"
37 :
38 862 : PG_MODULE_MAGIC;
39 :
40 : static void pgoutput_startup(LogicalDecodingContext *ctx,
41 : OutputPluginOptions *opt, bool is_init);
42 : static void pgoutput_shutdown(LogicalDecodingContext *ctx);
43 : static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
44 : ReorderBufferTXN *txn);
45 : static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
46 : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
47 : static void pgoutput_change(LogicalDecodingContext *ctx,
48 : ReorderBufferTXN *txn, Relation relation,
49 : ReorderBufferChange *change);
50 : static void pgoutput_truncate(LogicalDecodingContext *ctx,
51 : ReorderBufferTXN *txn, int nrelations, Relation relations[],
52 : ReorderBufferChange *change);
53 : static void pgoutput_message(LogicalDecodingContext *ctx,
54 : ReorderBufferTXN *txn, XLogRecPtr message_lsn,
55 : bool transactional, const char *prefix,
56 : Size sz, const char *message);
57 : static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
58 : RepOriginId origin_id);
59 : static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
60 : ReorderBufferTXN *txn);
61 : static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
62 : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
63 : static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
64 : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
65 : static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
66 : ReorderBufferTXN *txn,
67 : XLogRecPtr prepare_end_lsn,
68 : TimestampTz prepare_time);
69 : static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
70 : ReorderBufferTXN *txn);
71 : static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
72 : ReorderBufferTXN *txn);
73 : static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
74 : ReorderBufferTXN *txn,
75 : XLogRecPtr abort_lsn);
76 : static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
77 : ReorderBufferTXN *txn,
78 : XLogRecPtr commit_lsn);
79 : static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
80 : ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
81 :
82 : static bool publications_valid;
83 :
84 : static List *LoadPublications(List *pubnames);
85 : static void publication_invalidation_cb(Datum arg, int cacheid,
86 : uint32 hashvalue);
87 : static void send_relation_and_attrs(Relation relation, TransactionId xid,
88 : LogicalDecodingContext *ctx,
89 : Bitmapset *columns);
90 : static void send_repl_origin(LogicalDecodingContext *ctx,
91 : RepOriginId origin_id, XLogRecPtr origin_lsn,
92 : bool send_origin);
93 :
/*
 * Only 3 publication actions are used for row filtering ("insert", "update",
 * "delete"). See RelationSyncEntry.exprstate[].
 *
 * These values index the per-action row-filter ExprState array; TRUNCATE
 * has no row filter, so it needs no slot here.
 */
enum RowFilterPubAction
{
	PUBACTION_INSERT,
	PUBACTION_UPDATE,
	PUBACTION_DELETE,
};

/* Size of RelationSyncEntry.exprstate[] (one entry per action above). */
#define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
106 :
/*
 * Entry in the map used to remember which relation schemas we sent.
 *
 * The schema_sent flag determines if the current schema record for the
 * relation (and for its ancestor if publish_as_relid is set) was already
 * sent to the subscriber (in which case we don't need to send it again).
 *
 * The schema cache on downstream is however updated only at commit time,
 * and with streamed transactions the commit order may be different from
 * the order the transactions are sent in. Also, the (sub) transactions
 * might get aborted so we need to send the schema for each (sub) transaction
 * so that we don't lose the schema information on abort. For handling this,
 * we maintain the list of xids (streamed_txns) for those we have already sent
 * the schema.
 *
 * For partitions, 'pubactions' considers not only the table's own
 * publications, but also those of all of its ancestors.
 */
typedef struct RelationSyncEntry
{
	Oid			relid;			/* relation oid */

	bool		replicate_valid;	/* overall validity flag for entry */

	bool		schema_sent;	/* schema sent for non-streamed txns? */
	List	   *streamed_txns;	/* streamed toplevel transactions with this
								 * schema */

	/* are we publishing this rel? */
	PublicationActions pubactions;

	/*
	 * ExprState array for row filter. Different publication actions don't
	 * allow multiple expressions to always be combined into one, because
	 * updates or deletes restrict the column in expression to be part of the
	 * replica identity index whereas inserts do not have this restriction, so
	 * there is one ExprState per publication action.
	 */
	ExprState  *exprstate[NUM_ROWFILTER_PUBACTIONS];
	EState	   *estate;			/* executor state used for row filter */
	TupleTableSlot *new_slot;	/* slot for storing new tuple */
	TupleTableSlot *old_slot;	/* slot for storing old tuple */

	/*
	 * OID of the relation to publish changes as. For a partition, this may
	 * be set to one of its ancestors whose schema will be used when
	 * replicating changes, if publish_via_partition_root is set for the
	 * publication.
	 */
	Oid			publish_as_relid;

	/*
	 * Map used when replicating using an ancestor's schema to convert tuples
	 * from partition's type to the ancestor's; NULL if publish_as_relid is
	 * same as 'relid' or if unnecessary due to partition and the ancestor
	 * having identical TupleDesc.
	 */
	AttrMap    *attrmap;

	/*
	 * Columns included in the publication, or NULL if all columns are
	 * included implicitly. Note that the attnums in this bitmap are not
	 * shifted by FirstLowInvalidHeapAttributeNumber.
	 */
	Bitmapset  *columns;

	/*
	 * Private context to store additional data for this entry - state for the
	 * row filter expressions, column list, etc.
	 */
	MemoryContext entry_cxt;
} RelationSyncEntry;
179 :
/*
 * Maintain a per-transaction level variable to track whether the transaction
 * has sent BEGIN. BEGIN is only sent when the first change in a transaction
 * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
 * messages for empty transactions which saves network bandwidth.
 *
 * This optimization is not used for prepared transactions because if the
 * WALSender restarts after prepare of a transaction and before commit prepared
 * of the same transaction then we won't be able to figure out if we have
 * skipped sending BEGIN/PREPARE of a transaction as it was empty. This is
 * because we would have lost the in-memory txndata information that was
 * present prior to the restart. This will result in sending a spurious
 * COMMIT PREPARED without a corresponding prepared transaction at the
 * downstream which would lead to an error when it tries to process it.
 *
 * XXX We could achieve this optimization by changing protocol to send
 * additional information so that downstream can detect that the corresponding
 * prepare has not been sent. However, adding such a check for every
 * transaction in the downstream could be costly so we might want to do it
 * optionally.
 *
 * We also don't have this optimization for streamed transactions because
 * they can contain prepared transactions.
 */
typedef struct PGOutputTxnData
{
	/* Allocated zeroed in pgoutput_begin_txn, so this starts out false. */
	bool		sent_begin_txn; /* flag indicating whether BEGIN has been sent */
} PGOutputTxnData;
208 :
209 : /* Map used to remember which relation schemas we sent. */
210 : static HTAB *RelationSyncCache = NULL;
211 :
212 : static void init_rel_sync_cache(MemoryContext cachectx);
213 : static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
214 : static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
215 : Relation relation);
216 : static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
217 : static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
218 : uint32 hashvalue);
219 : static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
220 : TransactionId xid);
221 : static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
222 : TransactionId xid);
223 : static void init_tuple_slot(PGOutputData *data, Relation relation,
224 : RelationSyncEntry *entry);
225 :
226 : /* row filter routines */
227 : static EState *create_estate_for_relation(Relation rel);
228 : static void pgoutput_row_filter_init(PGOutputData *data,
229 : List *publications,
230 : RelationSyncEntry *entry);
231 : static bool pgoutput_row_filter_exec_expr(ExprState *state,
232 : ExprContext *econtext);
233 : static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
234 : TupleTableSlot **new_slot_ptr,
235 : RelationSyncEntry *entry,
236 : ReorderBufferChangeType *action);
237 :
238 : /* column list routines */
239 : static void pgoutput_column_list_init(PGOutputData *data,
240 : List *publications,
241 : RelationSyncEntry *entry);
242 :
243 : /*
244 : * Specify output plugin callbacks
245 : */
246 : void
247 1190 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
248 : {
249 1190 : cb->startup_cb = pgoutput_startup;
250 1190 : cb->begin_cb = pgoutput_begin_txn;
251 1190 : cb->change_cb = pgoutput_change;
252 1190 : cb->truncate_cb = pgoutput_truncate;
253 1190 : cb->message_cb = pgoutput_message;
254 1190 : cb->commit_cb = pgoutput_commit_txn;
255 :
256 1190 : cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
257 1190 : cb->prepare_cb = pgoutput_prepare_txn;
258 1190 : cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
259 1190 : cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
260 1190 : cb->filter_by_origin_cb = pgoutput_origin_filter;
261 1190 : cb->shutdown_cb = pgoutput_shutdown;
262 :
263 : /* transaction streaming */
264 1190 : cb->stream_start_cb = pgoutput_stream_start;
265 1190 : cb->stream_stop_cb = pgoutput_stream_stop;
266 1190 : cb->stream_abort_cb = pgoutput_stream_abort;
267 1190 : cb->stream_commit_cb = pgoutput_stream_commit;
268 1190 : cb->stream_change_cb = pgoutput_change;
269 1190 : cb->stream_message_cb = pgoutput_message;
270 1190 : cb->stream_truncate_cb = pgoutput_truncate;
271 : /* transaction streaming - two-phase commit */
272 1190 : cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
273 1190 : }
274 :
/*
 * Parse the options passed by the client at START_REPLICATION time and fill
 * in *data.  Unknown, duplicate, or malformed options raise an ERROR.
 * "proto_version" and "publication_names" are mandatory; the remaining
 * options default to off/false below.
 */
static void
parse_output_parameters(List *options, PGOutputData *data)
{
	ListCell   *lc;
	bool		protocol_version_given = false;
	bool		publication_names_given = false;
	bool		binary_option_given = false;
	bool		messages_option_given = false;
	bool		streaming_given = false;
	bool		two_phase_option_given = false;
	bool		origin_option_given = false;

	/* Defaults for the optional parameters. */
	data->binary = false;
	data->streaming = LOGICALREP_STREAM_OFF;
	data->messages = false;
	data->two_phase = false;

	foreach(lc, options)
	{
		DefElem    *defel = (DefElem *) lfirst(lc);

		Assert(defel->arg == NULL || IsA(defel->arg, String));

		/* Check each param, whether or not we recognize it */
		if (strcmp(defel->defname, "proto_version") == 0)
		{
			unsigned long parsed;
			char	   *endptr;

			if (protocol_version_given)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
			protocol_version_given = true;

			/* Use strtoul so syntax and range errors are both detectable. */
			errno = 0;
			parsed = strtoul(strVal(defel->arg), &endptr, 10);
			if (errno != 0 || *endptr != '\0')
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						 errmsg("invalid proto_version")));

			/* unsigned long may be wider than uint32 on this platform */
			if (parsed > PG_UINT32_MAX)
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						 errmsg("proto_version \"%s\" out of range",
								strVal(defel->arg))));

			data->protocol_version = (uint32) parsed;
		}
		else if (strcmp(defel->defname, "publication_names") == 0)
		{
			if (publication_names_given)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
			publication_names_given = true;

			/* Comma-separated list of publication names. */
			if (!SplitIdentifierString(strVal(defel->arg), ',',
									   &data->publication_names))
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_NAME),
						 errmsg("invalid publication_names syntax")));
		}
		else if (strcmp(defel->defname, "binary") == 0)
		{
			if (binary_option_given)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
			binary_option_given = true;

			data->binary = defGetBoolean(defel);
		}
		else if (strcmp(defel->defname, "messages") == 0)
		{
			if (messages_option_given)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
			messages_option_given = true;

			data->messages = defGetBoolean(defel);
		}
		else if (strcmp(defel->defname, "streaming") == 0)
		{
			if (streaming_given)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
			streaming_given = true;

			data->streaming = defGetStreamingMode(defel);
		}
		else if (strcmp(defel->defname, "two_phase") == 0)
		{
			if (two_phase_option_given)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
			two_phase_option_given = true;

			data->two_phase = defGetBoolean(defel);
		}
		else if (strcmp(defel->defname, "origin") == 0)
		{
			char	   *origin;

			if (origin_option_given)
				ereport(ERROR,
						errcode(ERRCODE_SYNTAX_ERROR),
						errmsg("conflicting or redundant options"));
			origin_option_given = true;

			/* only "none" and "any" are recognized origin values */
			origin = defGetString(defel);
			if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
				data->publish_no_origin = true;
			else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
				data->publish_no_origin = false;
			else
				ereport(ERROR,
						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						errmsg("unrecognized origin value: \"%s\"", origin));
		}
		else
			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
	}

	/* Check required options */
	if (!protocol_version_given)
		ereport(ERROR,
				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				errmsg("proto_version option missing"));
	if (!publication_names_given)
		ereport(ERROR,
				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				errmsg("publication_names option missing"));
}
413 :
/*
 * Initialize this plugin
 *
 * Called once per decoding session (and once at slot creation with
 * is_init = true).  For a real replication start, parses and validates
 * the client options and cross-checks them against the protocol version
 * and the decoding context's capabilities.
 */
static void
pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
				 bool is_init)
{
	PGOutputData *data = palloc0(sizeof(PGOutputData));
	static bool publication_callback_registered = false;

	/* Create our memory context for private allocations. */
	data->context = AllocSetContextCreate(ctx->context,
										  "logical replication output context",
										  ALLOCSET_DEFAULT_SIZES);

	data->cachectx = AllocSetContextCreate(ctx->context,
										   "logical replication cache context",
										   ALLOCSET_DEFAULT_SIZES);

	ctx->output_plugin_private = data;

	/* This plugin uses binary protocol. */
	opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;

	/*
	 * This is replication start and not slot initialization.
	 *
	 * Parse and validate options passed by the client.
	 */
	if (!is_init)
	{
		/* Parse the params and ERROR if we see any we don't recognize */
		parse_output_parameters(ctx->output_plugin_options, data);

		/* Check if we support requested protocol */
		if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
							data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));

		if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
							data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));

		/*
		 * Decide whether to enable streaming. It is disabled by default, in
		 * which case we just update the flag in decoding context. Otherwise
		 * we only allow it with sufficient version of the protocol, and when
		 * the output plugin supports it.
		 */
		if (data->streaming == LOGICALREP_STREAM_OFF)
			ctx->streaming = false;
		else if (data->streaming == LOGICALREP_STREAM_ON &&
				 data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("requested proto_version=%d does not support streaming, need %d or higher",
							data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
		else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
				 data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
							data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
		else if (!ctx->streaming)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("streaming requested, but not supported by output plugin")));

		/*
		 * Here, we just check whether the two-phase option is passed by
		 * plugin and decide whether to enable it at later point of time. It
		 * remains enabled if the previous start-up has done so. But we only
		 * allow the option to be passed in with sufficient version of the
		 * protocol, and when the output plugin supports it.
		 */
		if (!data->two_phase)
			ctx->twophase_opt_given = false;
		else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
							data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
		else if (!ctx->twophase)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("two-phase commit requested, but not supported by output plugin")));
		else
			ctx->twophase_opt_given = true;

		/* Init publication state. */
		data->publications = NIL;
		publications_valid = false;

		/*
		 * Register callback for pg_publication if we didn't already do that
		 * during some previous call in this process.
		 */
		if (!publication_callback_registered)
		{
			CacheRegisterSyscacheCallback(PUBLICATIONOID,
										  publication_invalidation_cb,
										  (Datum) 0);
			publication_callback_registered = true;
		}

		/* Initialize relation schema cache. */
		init_rel_sync_cache(CacheMemoryContext);
	}
	else
	{
		/*
		 * Disable the streaming and prepared transactions during the slot
		 * initialization mode.
		 */
		ctx->streaming = false;
		ctx->twophase = false;
	}
}
536 :
537 : /*
538 : * BEGIN callback.
539 : *
540 : * Don't send the BEGIN message here instead postpone it until the first
541 : * change. In logical replication, a common scenario is to replicate a set of
542 : * tables (instead of all tables) and transactions whose changes were on
543 : * the table(s) that are not published will produce empty transactions. These
544 : * empty transactions will send BEGIN and COMMIT messages to subscribers,
545 : * using bandwidth on something with little/no use for logical replication.
546 : */
547 : static void
548 1262 : pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
549 : {
550 1262 : PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
551 : sizeof(PGOutputTxnData));
552 :
553 1262 : txn->output_plugin_private = txndata;
554 1262 : }
555 :
556 : /*
557 : * Send BEGIN.
558 : *
559 : * This is called while processing the first change of the transaction.
560 : */
561 : static void
562 708 : pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
563 : {
564 708 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
565 708 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
566 :
567 : Assert(txndata);
568 : Assert(!txndata->sent_begin_txn);
569 :
570 708 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
571 708 : logicalrep_write_begin(ctx->out, txn);
572 708 : txndata->sent_begin_txn = true;
573 :
574 708 : send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
575 : send_replication_origin);
576 :
577 708 : OutputPluginWrite(ctx, true);
578 708 : }
579 :
580 : /*
581 : * COMMIT callback
582 : */
583 : static void
584 1252 : pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
585 : XLogRecPtr commit_lsn)
586 : {
587 1252 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
588 : bool sent_begin_txn;
589 :
590 : Assert(txndata);
591 :
592 : /*
593 : * We don't need to send the commit message unless some relevant change
594 : * from this transaction has been sent to the downstream.
595 : */
596 1252 : sent_begin_txn = txndata->sent_begin_txn;
597 1252 : OutputPluginUpdateProgress(ctx, !sent_begin_txn);
598 1252 : pfree(txndata);
599 1252 : txn->output_plugin_private = NULL;
600 :
601 1252 : if (!sent_begin_txn)
602 : {
603 548 : elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
604 548 : return;
605 : }
606 :
607 704 : OutputPluginPrepareWrite(ctx, true);
608 704 : logicalrep_write_commit(ctx->out, txn, commit_lsn);
609 704 : OutputPluginWrite(ctx, true);
610 : }
611 :
612 : /*
613 : * BEGIN PREPARE callback
614 : */
615 : static void
616 36 : pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
617 : {
618 36 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
619 :
620 36 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
621 36 : logicalrep_write_begin_prepare(ctx->out, txn);
622 :
623 36 : send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
624 : send_replication_origin);
625 :
626 36 : OutputPluginWrite(ctx, true);
627 36 : }
628 :
/*
 * PREPARE callback
 *
 * Straight-line emission of the PREPARE protocol message; progress is
 * updated first (never skipped — see the PGOutputTxnData comments for why
 * prepared transactions don't use the empty-transaction optimization).
 */
static void
pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
					 XLogRecPtr prepare_lsn)
{
	OutputPluginUpdateProgress(ctx, false);

	OutputPluginPrepareWrite(ctx, true);
	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
	OutputPluginWrite(ctx, true);
}
642 :
/*
 * COMMIT PREPARED callback
 *
 * Always sent; we cannot skip it even for an empty transaction because a
 * WALSender restart may have discarded the knowledge of whether PREPARE
 * was sent (see PGOutputTxnData comments).
 */
static void
pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
							 XLogRecPtr commit_lsn)
{
	OutputPluginUpdateProgress(ctx, false);

	OutputPluginPrepareWrite(ctx, true);
	logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
	OutputPluginWrite(ctx, true);
}
656 :
/*
 * ROLLBACK PREPARED callback
 *
 * Emits the rollback of a previously prepared transaction, carrying both
 * the end LSN of the prepare record and its timestamp so the subscriber
 * can locate the matching prepared transaction.
 */
static void
pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
							   ReorderBufferTXN *txn,
							   XLogRecPtr prepare_end_lsn,
							   TimestampTz prepare_time)
{
	OutputPluginUpdateProgress(ctx, false);

	OutputPluginPrepareWrite(ctx, true);
	logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
									   prepare_time);
	OutputPluginWrite(ctx, true);
}
673 :
/*
 * Write the current schema of the relation and its ancestor (if any) if not
 * done yet.
 *
 * Tracking is split: for non-streamed transactions a single schema_sent
 * flag suffices; for streamed transactions we record the toplevel xid in
 * relentry->streamed_txns, since streamed transactions may commit (or
 * abort) in a different order than they were sent.
 */
static void
maybe_send_schema(LogicalDecodingContext *ctx,
				  ReorderBufferChange *change,
				  Relation relation, RelationSyncEntry *relentry)
{
	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
	bool		schema_sent;
	TransactionId xid = InvalidTransactionId;
	TransactionId topxid = InvalidTransactionId;

	/*
	 * Remember XID of the (sub)transaction for the change. We don't care if
	 * it's top-level transaction or not (we have already sent that XID in
	 * start of the current streaming block).
	 *
	 * If we're not in a streaming block, just use InvalidTransactionId and
	 * the write methods will not include it.
	 */
	if (data->in_streaming)
		xid = change->txn->xid;

	if (rbtxn_is_subtxn(change->txn))
		topxid = rbtxn_get_toptxn(change->txn)->xid;
	else
		topxid = xid;

	/*
	 * Do we need to send the schema? We do track streamed transactions
	 * separately, because those may be applied later (and the regular
	 * transactions won't see their effects until then) and in an order that
	 * we don't know at this point.
	 *
	 * XXX There is a scope of optimization here. Currently, we always send
	 * the schema first time in a streaming transaction but we can probably
	 * avoid that by checking 'relentry->schema_sent' flag. However, before
	 * doing that we need to study its impact on the case where we have a mix
	 * of streaming and non-streaming transactions.
	 */
	if (data->in_streaming)
		schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
	else
		schema_sent = relentry->schema_sent;

	/* Nothing to do if we already sent the schema. */
	if (schema_sent)
		return;

	/*
	 * Send the schema. If the changes will be published using an ancestor's
	 * schema, not the relation's own, send that ancestor's schema before
	 * sending relation's own (XXX - maybe sending only the former suffices?).
	 */
	if (relentry->publish_as_relid != RelationGetRelid(relation))
	{
		Relation	ancestor = RelationIdGetRelation(relentry->publish_as_relid);

		send_relation_and_attrs(ancestor, xid, ctx, relentry->columns);
		RelationClose(ancestor);
	}

	send_relation_and_attrs(relation, xid, ctx, relentry->columns);

	/* Record the send under the appropriate tracking scheme. */
	if (data->in_streaming)
		set_schema_sent_in_streamed_txn(relentry, topxid);
	else
		relentry->schema_sent = true;
}
745 :
746 : /*
747 : * Sends a relation
748 : */
749 : static void
750 618 : send_relation_and_attrs(Relation relation, TransactionId xid,
751 : LogicalDecodingContext *ctx,
752 : Bitmapset *columns)
753 : {
754 618 : TupleDesc desc = RelationGetDescr(relation);
755 : int i;
756 :
757 : /*
758 : * Write out type info if needed. We do that only for user-created types.
759 : * We use FirstGenbkiObjectId as the cutoff, so that we only consider
760 : * objects with hand-assigned OIDs to be "built in", not for instance any
761 : * function or type defined in the information_schema. This is important
762 : * because only hand-assigned OIDs can be expected to remain stable across
763 : * major versions.
764 : */
765 1842 : for (i = 0; i < desc->natts; i++)
766 : {
767 1224 : Form_pg_attribute att = TupleDescAttr(desc, i);
768 :
769 1224 : if (att->attisdropped || att->attgenerated)
770 10 : continue;
771 :
772 1214 : if (att->atttypid < FirstGenbkiObjectId)
773 1178 : continue;
774 :
775 : /* Skip this attribute if it's not present in the column list */
776 36 : if (columns != NULL && !bms_is_member(att->attnum, columns))
777 0 : continue;
778 :
779 36 : OutputPluginPrepareWrite(ctx, false);
780 36 : logicalrep_write_typ(ctx->out, xid, att->atttypid);
781 36 : OutputPluginWrite(ctx, false);
782 : }
783 :
784 618 : OutputPluginPrepareWrite(ctx, false);
785 618 : logicalrep_write_rel(ctx->out, xid, relation, columns);
786 618 : OutputPluginWrite(ctx, false);
787 616 : }
788 :
789 : /*
790 : * Executor state preparation for evaluation of row filter expressions for the
791 : * specified relation.
792 : */
793 : static EState *
794 32 : create_estate_for_relation(Relation rel)
795 : {
796 : EState *estate;
797 : RangeTblEntry *rte;
798 32 : List *perminfos = NIL;
799 :
800 32 : estate = CreateExecutorState();
801 :
802 32 : rte = makeNode(RangeTblEntry);
803 32 : rte->rtekind = RTE_RELATION;
804 32 : rte->relid = RelationGetRelid(rel);
805 32 : rte->relkind = rel->rd_rel->relkind;
806 32 : rte->rellockmode = AccessShareLock;
807 :
808 32 : addRTEPermissionInfo(&perminfos, rte);
809 :
810 32 : ExecInitRangeTable(estate, list_make1(rte), perminfos);
811 :
812 32 : estate->es_output_cid = GetCurrentCommandId(false);
813 :
814 32 : return estate;
815 : }
816 :
817 : /*
818 : * Evaluates row filter.
819 : *
820 : * If the row filter evaluates to NULL, it is taken as false i.e. the change
821 : * isn't replicated.
822 : */
823 : static bool
824 72 : pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
825 : {
826 : Datum ret;
827 : bool isnull;
828 :
829 : Assert(state != NULL);
830 :
831 72 : ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
832 :
833 72 : elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
834 : isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
835 : isnull ? "true" : "false");
836 :
837 72 : if (isnull)
838 2 : return false;
839 :
840 70 : return DatumGetBool(ret);
841 : }
842 :
843 : /*
844 : * Make sure the per-entry memory context exists.
845 : */
846 : static void
847 106 : pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
848 : {
849 : Relation relation;
850 :
851 : /* The context may already exist, in which case bail out. */
852 106 : if (entry->entry_cxt)
853 4 : return;
854 :
855 102 : relation = RelationIdGetRelation(entry->publish_as_relid);
856 :
857 102 : entry->entry_cxt = AllocSetContextCreate(data->cachectx,
858 : "entry private context",
859 : ALLOCSET_SMALL_SIZES);
860 :
861 102 : MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
862 : RelationGetRelationName(relation));
863 : }
864 :
865 : /*
866 : * Initialize the row filter.
867 : */
static void
pgoutput_row_filter_init(PGOutputData *data, List *publications,
						 RelationSyncEntry *entry)
{
	ListCell   *lc;
	List	   *rfnodes[] = {NIL, NIL, NIL};	/* One per pubaction */
	bool		no_filter[] = {false, false, false};	/* One per pubaction */
	MemoryContext oldctx;
	int			idx;
	bool		has_filter = true;
	Oid			schemaid = get_rel_namespace(entry->publish_as_relid);

	/*
	 * Find if there are any row filters for this relation. If there are, then
	 * prepare the necessary ExprState and cache it in entry->exprstate. To
	 * build an expression state, we need to ensure the following:
	 *
	 * All the given publication-table mappings must be checked.
	 *
	 * Multiple publications might have multiple row filters for this
	 * relation. Since row filter usage depends on the DML operation, there
	 * are multiple lists (one for each operation) to which row filters will
	 * be appended.
	 *
	 * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row filter
	 * expression" so it takes precedence.
	 */
	foreach(lc, publications)
	{
		Publication *pub = lfirst(lc);
		HeapTuple	rftuple = NULL;
		Datum		rfdatum = 0;
		bool		pub_no_filter = true;

		/*
		 * If the publication is FOR ALL TABLES, or the publication includes a
		 * FOR TABLES IN SCHEMA where the table belongs to the referred
		 * schema, then it is treated the same as if there are no row filters
		 * (even if other publications have a row filter).
		 */
		if (!pub->alltables &&
			!SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
								   ObjectIdGetDatum(schemaid),
								   ObjectIdGetDatum(pub->oid)))
		{
			/*
			 * Check for the presence of a row filter in this publication.
			 */
			rftuple = SearchSysCache2(PUBLICATIONRELMAP,
									  ObjectIdGetDatum(entry->publish_as_relid),
									  ObjectIdGetDatum(pub->oid));

			if (HeapTupleIsValid(rftuple))
			{
				/* Null indicates no filter. */
				rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
										  Anum_pg_publication_rel_prqual,
										  &pub_no_filter);
			}
		}

		if (pub_no_filter)
		{
			/* Done with the syscache tuple, if we fetched one above. */
			if (rftuple)
				ReleaseSysCache(rftuple);

			/* This publication needs no filter for the actions it covers. */
			no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
			no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
			no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;

			/*
			 * Quick exit if all the DML actions are publicized via this
			 * publication.
			 */
			if (no_filter[PUBACTION_INSERT] &&
				no_filter[PUBACTION_UPDATE] &&
				no_filter[PUBACTION_DELETE])
			{
				has_filter = false;
				break;
			}

			/* No additional work for this publication. Next one. */
			continue;
		}

		/*
		 * Form the per pubaction row filter lists.  The serialized (text)
		 * form is stored here; it is deserialized with stringToNode below,
		 * which keeps list_free_deep usable for cleanup.
		 */
		if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
			rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
												TextDatumGetCString(rfdatum));
		if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
			rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
												TextDatumGetCString(rfdatum));
		if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
			rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
												TextDatumGetCString(rfdatum));

		ReleaseSysCache(rftuple);
	}							/* loop all subscribed publications */

	/* Clean the row filter */
	for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
	{
		if (no_filter[idx])
		{
			list_free_deep(rfnodes[idx]);
			rfnodes[idx] = NIL;
		}
	}

	if (has_filter)
	{
		Relation	relation = RelationIdGetRelation(entry->publish_as_relid);

		pgoutput_ensure_entry_cxt(data, entry);

		/*
		 * Now all the filters for all pubactions are known. Combine them when
		 * their pubactions are the same.
		 */
		oldctx = MemoryContextSwitchTo(entry->entry_cxt);
		entry->estate = create_estate_for_relation(relation);
		for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
		{
			List	   *filters = NIL;
			Expr	   *rfnode;

			if (rfnodes[idx] == NIL)
				continue;

			/* Deserialize each stored filter expression for this action. */
			foreach(lc, rfnodes[idx])
				filters = lappend(filters, stringToNode((char *) lfirst(lc)));

			/* combine the row filter and cache the ExprState */
			rfnode = make_orclause(filters);
			entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
		}						/* for each pubaction */
		MemoryContextSwitchTo(oldctx);

		RelationClose(relation);
	}
}
1010 :
1011 : /*
1012 : * Initialize the column list.
1013 : */
static void
pgoutput_column_list_init(PGOutputData *data, List *publications,
						  RelationSyncEntry *entry)
{
	ListCell   *lc;
	bool		first = true;
	Relation	relation = RelationIdGetRelation(entry->publish_as_relid);

	/*
	 * Find if there are any column lists for this relation. If there are,
	 * build a bitmap using the column lists.
	 *
	 * Multiple publications might have multiple column lists for this
	 * relation.
	 *
	 * Note that we don't support the case where the column list is different
	 * for the same table when combining publications. See comments atop
	 * fetch_table_list. But one can later change the publication so we still
	 * need to check all the given publication-table mappings and report an
	 * error if any publications have a different column list.
	 *
	 * FOR ALL TABLES and FOR TABLES IN SCHEMA imply "don't use column list".
	 */
	foreach(lc, publications)
	{
		Publication *pub = lfirst(lc);
		HeapTuple	cftuple = NULL;
		Datum		cfdatum = 0;
		Bitmapset  *cols = NULL;

		/*
		 * If the publication is FOR ALL TABLES then it is treated the same as
		 * if there are no column lists (even if other publications have a
		 * list).
		 */
		if (!pub->alltables)
		{
			bool		pub_no_list = true;

			/*
			 * Check for the presence of a column list in this publication.
			 *
			 * Note: If we find no pg_publication_rel row, it's a publication
			 * defined for a whole schema, so it can't have a column list,
			 * just like a FOR ALL TABLES publication.
			 */
			cftuple = SearchSysCache2(PUBLICATIONRELMAP,
									  ObjectIdGetDatum(entry->publish_as_relid),
									  ObjectIdGetDatum(pub->oid));

			if (HeapTupleIsValid(cftuple))
			{
				/* Lookup the column list attribute. */
				cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
										  Anum_pg_publication_rel_prattrs,
										  &pub_no_list);

				/* Build the column list bitmap in the per-entry context. */
				if (!pub_no_list)	/* when not null */
				{
					int			i;
					int			nliveatts = 0;
					TupleDesc	desc = RelationGetDescr(relation);

					pgoutput_ensure_entry_cxt(data, entry);

					cols = pub_collist_to_bitmapset(cols, cfdatum,
													entry->entry_cxt);

					/* Get the number of live attributes. */
					for (i = 0; i < desc->natts; i++)
					{
						Form_pg_attribute att = TupleDescAttr(desc, i);

						/* Skip dropped and generated columns. */
						if (att->attisdropped || att->attgenerated)
							continue;

						nliveatts++;
					}

					/*
					 * If column list includes all the columns of the table,
					 * set it to NULL.
					 */
					if (bms_num_members(cols) == nliveatts)
					{
						bms_free(cols);
						cols = NULL;
					}
				}

				ReleaseSysCache(cftuple);
			}
		}

		/*
		 * First publication seen: adopt its column list as-is.  Any later
		 * publication must agree exactly, otherwise we error out.
		 */
		if (first)
		{
			entry->columns = cols;
			first = false;
		}
		else if (!bms_equal(entry->columns, cols))
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
						   get_namespace_name(RelationGetNamespace(relation)),
						   RelationGetRelationName(relation)));
	}							/* loop all subscribed publications */

	RelationClose(relation);
}
1124 :
1125 : /*
1126 : * Initialize the slot for storing new and old tuples, and build the map that
1127 : * will be used to convert the relation's tuples into the ancestor's format.
1128 : */
1129 : static void
1130 482 : init_tuple_slot(PGOutputData *data, Relation relation,
1131 : RelationSyncEntry *entry)
1132 : {
1133 : MemoryContext oldctx;
1134 : TupleDesc oldtupdesc;
1135 : TupleDesc newtupdesc;
1136 :
1137 482 : oldctx = MemoryContextSwitchTo(data->cachectx);
1138 :
1139 : /*
1140 : * Create tuple table slots. Create a copy of the TupleDesc as it needs to
1141 : * live as long as the cache remains.
1142 : */
1143 482 : oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1144 482 : newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1145 :
1146 482 : entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
1147 482 : entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
1148 :
1149 482 : MemoryContextSwitchTo(oldctx);
1150 :
1151 : /*
1152 : * Cache the map that will be used to convert the relation's tuples into
1153 : * the ancestor's format, if needed.
1154 : */
1155 482 : if (entry->publish_as_relid != RelationGetRelid(relation))
1156 : {
1157 58 : Relation ancestor = RelationIdGetRelation(entry->publish_as_relid);
1158 58 : TupleDesc indesc = RelationGetDescr(relation);
1159 58 : TupleDesc outdesc = RelationGetDescr(ancestor);
1160 :
1161 : /* Map must live as long as the session does. */
1162 58 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1163 :
1164 58 : entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
1165 :
1166 58 : MemoryContextSwitchTo(oldctx);
1167 58 : RelationClose(ancestor);
1168 : }
1169 482 : }
1170 :
1171 : /*
1172 : * Change is checked against the row filter if any.
1173 : *
1174 : * Returns true if the change is to be replicated, else false.
1175 : *
1176 : * For inserts, evaluate the row filter for new tuple.
1177 : * For deletes, evaluate the row filter for old tuple.
1178 : * For updates, evaluate the row filter for old and new tuple.
1179 : *
1180 : * For updates, if both evaluations are true, we allow sending the UPDATE and
1181 : * if both the evaluations are false, it doesn't replicate the UPDATE. Now, if
1182 : * only one of the tuples matches the row filter expression, we transform
1183 : * UPDATE to DELETE or INSERT to avoid any data inconsistency based on the
1184 : * following rules:
1185 : *
1186 : * Case 1: old-row (no match) new-row (no match) -> (drop change)
1187 : * Case 2: old-row (no match) new row (match) -> INSERT
1188 : * Case 3: old-row (match) new-row (no match) -> DELETE
1189 : * Case 4: old-row (match) new row (match) -> UPDATE
1190 : *
1191 : * The new action is updated in the action parameter.
1192 : *
1193 : * The new slot could be updated when transforming the UPDATE into INSERT,
1194 : * because the original new tuple might not have column values from the replica
1195 : * identity.
1196 : *
1197 : * Examples:
1198 : * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
1199 : * Since the old tuple satisfies, the initial table synchronization copied this
1200 : * row (or another method was used to guarantee that there is data
1201 : * consistency). However, after the UPDATE the new tuple doesn't satisfy the
1202 : * row filter, so from a data consistency perspective, that row should be
1203 : * removed on the subscriber. The UPDATE should be transformed into a DELETE
1204 : * statement and be sent to the subscriber. Keeping this row on the subscriber
1205 : * is undesirable because it doesn't reflect what was defined in the row filter
1206 : * expression on the publisher. This row on the subscriber would likely not be
1207 : * modified by replication again. If someone inserted a new row with the same
1208 : * old identifier, replication could stop due to a constraint violation.
1209 : *
1210 : * Let's say the old tuple doesn't match the row filter but the new tuple does.
1211 : * Since the old tuple doesn't satisfy, the initial table synchronization
1212 : * probably didn't copy this row. However, after the UPDATE the new tuple does
1213 : * satisfy the row filter, so from a data consistency perspective, that row
1214 : * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
1215 : * statements have no effect (it matches no row -- see
1216 : * apply_handle_update_internal()). So, the UPDATE should be transformed into a
1217 : * INSERT statement and be sent to the subscriber. However, this might surprise
1218 : * someone who expects the data set to satisfy the row filter expression on the
1219 : * provider.
1220 : */
static bool
pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
					TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
					ReorderBufferChangeType *action)
{
	TupleDesc	desc;
	int			i;
	bool		old_matched,
				new_matched,
				result;
	TupleTableSlot *tmp_new_slot;
	TupleTableSlot *new_slot = *new_slot_ptr;
	ExprContext *ecxt;
	ExprState  *filter_exprstate;

	/*
	 * We need this map to avoid relying on ReorderBufferChangeType enums
	 * having specific values.
	 */
	static const int map_changetype_pubaction[] = {
		[REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
		[REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
		[REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
	};

	Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
		   *action == REORDER_BUFFER_CHANGE_UPDATE ||
		   *action == REORDER_BUFFER_CHANGE_DELETE);

	Assert(new_slot || old_slot);

	/* Get the corresponding row filter */
	filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];

	/* Bail out if there is no row filter */
	if (!filter_exprstate)
		return true;

	elog(DEBUG3, "table \"%s.%s\" has row filter",
		 get_namespace_name(RelationGetNamespace(relation)),
		 RelationGetRelationName(relation));

	ResetPerTupleExprContext(entry->estate);

	ecxt = GetPerTupleExprContext(entry->estate);

	/*
	 * For the following occasions where there is only one tuple, we can
	 * evaluate the row filter for that tuple and return.
	 *
	 * For inserts, we only have the new tuple.
	 *
	 * For updates, we can have only a new tuple when none of the replica
	 * identity columns changed and none of those columns have external data
	 * but we still need to evaluate the row filter for the new tuple as the
	 * existing values of those columns might not match the filter. Also,
	 * users can use constant expressions in the row filter, so we anyway need
	 * to evaluate it for the new tuple.
	 *
	 * For deletes, we only have the old tuple.
	 */
	if (!new_slot || !old_slot)
	{
		ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
		result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);

		return result;
	}

	/*
	 * Both the old and new tuples must be valid only for updates and need to
	 * be checked against the row filter.
	 */
	Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);

	/* Deform both tuples so individual attributes can be inspected below. */
	slot_getallattrs(new_slot);
	slot_getallattrs(old_slot);

	tmp_new_slot = NULL;
	desc = RelationGetDescr(relation);

	/*
	 * The new tuple might not have all the replica identity columns, in which
	 * case it needs to be copied over from the old tuple.
	 */
	for (i = 0; i < desc->natts; i++)
	{
		Form_pg_attribute att = TupleDescAttr(desc, i);

		/*
		 * if the column in the new tuple or old tuple is null, nothing to do
		 */
		if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
			continue;

		/*
		 * Unchanged toasted replica identity columns are only logged in the
		 * old tuple. Copy this over to the new tuple. The changed (or WAL
		 * Logged) toast values are always assembled in memory and set as
		 * VARTAG_INDIRECT. See ReorderBufferToastReplace.
		 */
		if (att->attlen == -1 &&
			VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
			!VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
		{
			/* Lazily create a virtual copy of the new tuple on first use. */
			if (!tmp_new_slot)
			{
				tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
				ExecClearTuple(tmp_new_slot);

				memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
					   desc->natts * sizeof(Datum));
				memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
					   desc->natts * sizeof(bool));
			}

			tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
			tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
		}
	}

	ecxt->ecxt_scantuple = old_slot;
	old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);

	/* Evaluate the filter against the (possibly patched-up) new tuple. */
	if (tmp_new_slot)
	{
		ExecStoreVirtualTuple(tmp_new_slot);
		ecxt->ecxt_scantuple = tmp_new_slot;
	}
	else
		ecxt->ecxt_scantuple = new_slot;

	new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);

	/*
	 * Case 1: if both tuples don't match the row filter, bailout. Send
	 * nothing.
	 */
	if (!old_matched && !new_matched)
		return false;

	/*
	 * Case 2: if the old tuple doesn't satisfy the row filter but the new
	 * tuple does, transform the UPDATE into INSERT.
	 *
	 * Use the newly transformed tuple that must contain the column values for
	 * all the replica identity columns. This is required to ensure that the
	 * while inserting the tuple in the downstream node, we have all the
	 * required column values.
	 */
	if (!old_matched && new_matched)
	{
		*action = REORDER_BUFFER_CHANGE_INSERT;

		if (tmp_new_slot)
			*new_slot_ptr = tmp_new_slot;
	}

	/*
	 * Case 3: if the old tuple satisfies the row filter but the new tuple
	 * doesn't, transform the UPDATE into DELETE.
	 *
	 * This transformation does not require another tuple. The Old tuple will
	 * be used for DELETE.
	 */
	else if (old_matched && !new_matched)
		*action = REORDER_BUFFER_CHANGE_DELETE;

	/*
	 * Case 4: if both tuples match the row filter, transformation isn't
	 * required. (*action is default UPDATE).
	 */

	return true;
}
1396 :
1397 : /*
1398 : * Sends the decoded DML over wire.
1399 : *
1400 : * This is called both in streaming and non-streaming modes.
1401 : */
static void
pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
				Relation relation, ReorderBufferChange *change)
{
	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
	PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
	MemoryContext old;
	RelationSyncEntry *relentry;
	TransactionId xid = InvalidTransactionId;
	Relation	ancestor = NULL;
	Relation	targetrel = relation;
	ReorderBufferChangeType action = change->action;
	TupleTableSlot *old_slot = NULL;
	TupleTableSlot *new_slot = NULL;

	if (!is_publishable_relation(relation))
		return;

	/*
	 * Remember the xid for the change in streaming mode. We need to send xid
	 * with each change in the streaming mode so that subscriber can make
	 * their association and on aborts, it can discard the corresponding
	 * changes.
	 */
	if (data->in_streaming)
		xid = change->txn->xid;

	relentry = get_rel_sync_entry(data, relation);

	/* First check the table filter */
	switch (action)
	{
		case REORDER_BUFFER_CHANGE_INSERT:
			if (!relentry->pubactions.pubinsert)
				return;
			break;
		case REORDER_BUFFER_CHANGE_UPDATE:
			if (!relentry->pubactions.pubupdate)
				return;
			break;
		case REORDER_BUFFER_CHANGE_DELETE:
			if (!relentry->pubactions.pubdelete)
				return;

			/*
			 * This is only possible if deletes are allowed even when replica
			 * identity is not defined for a table. Since the DELETE action
			 * can't be published, we simply return.
			 */
			if (!change->data.tp.oldtuple)
			{
				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
				return;
			}
			break;
		default:
			Assert(false);
	}

	/* Avoid leaking memory by using and resetting our own context */
	old = MemoryContextSwitchTo(data->context);

	/* Switch relation if publishing via root. */
	if (relentry->publish_as_relid != RelationGetRelid(relation))
	{
		Assert(relation->rd_rel->relispartition);
		ancestor = RelationIdGetRelation(relentry->publish_as_relid);
		targetrel = ancestor;
	}

	/* Load the old tuple (if any) into its cached slot. */
	if (change->data.tp.oldtuple)
	{
		old_slot = relentry->old_slot;
		ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false);

		/* Convert tuple if needed. */
		if (relentry->attrmap)
		{
			TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
													  &TTSOpsVirtual);

			old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
		}
	}

	/* Load the new tuple (if any) into its cached slot. */
	if (change->data.tp.newtuple)
	{
		new_slot = relentry->new_slot;
		ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false);

		/* Convert tuple if needed. */
		if (relentry->attrmap)
		{
			TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
													  &TTSOpsVirtual);

			new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
		}
	}

	/*
	 * Check row filter.
	 *
	 * Updates could be transformed to inserts or deletes based on the results
	 * of the row filter for old and new tuple.
	 */
	if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
		goto cleanup;

	/*
	 * Send BEGIN if we haven't yet.
	 *
	 * We send the BEGIN message after ensuring that we will actually send the
	 * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
	 * transactions.
	 */
	if (txndata && !txndata->sent_begin_txn)
		pgoutput_send_begin(ctx, txn);

	/*
	 * Schema should be sent using the original relation because it also sends
	 * the ancestor's relation.
	 */
	maybe_send_schema(ctx, change, relation, relentry);

	OutputPluginPrepareWrite(ctx, true);

	/* Send the data */
	switch (action)
	{
		case REORDER_BUFFER_CHANGE_INSERT:
			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
									data->binary, relentry->columns);
			break;
		case REORDER_BUFFER_CHANGE_UPDATE:
			logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
									new_slot, data->binary, relentry->columns);
			break;
		case REORDER_BUFFER_CHANGE_DELETE:
			logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
									data->binary, relentry->columns);
			break;
		default:
			Assert(false);
	}

	OutputPluginWrite(ctx, true);

	/* Release the ancestor relcache ref (if any) and reset our context. */
cleanup:
	if (RelationIsValid(ancestor))
	{
		RelationClose(ancestor);
		ancestor = NULL;
	}

	MemoryContextSwitchTo(old);
	MemoryContextReset(data->context);
}
1560 :
/*
 * Sends a TRUNCATE message for the publishable subset of the given relations.
 */
static void
pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
				  int nrelations, Relation relations[], ReorderBufferChange *change)
{
	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
	PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
	MemoryContext old;
	RelationSyncEntry *relentry;
	int			i;
	int			nrelids;
	Oid		   *relids;
	TransactionId xid = InvalidTransactionId;

	/* Remember the xid for the change in streaming mode. See pgoutput_change. */
	if (data->in_streaming)
		xid = change->txn->xid;

	old = MemoryContextSwitchTo(data->context);

	/* Collect the OIDs of the relations that actually get published. */
	relids = palloc0(nrelations * sizeof(Oid));
	nrelids = 0;

	for (i = 0; i < nrelations; i++)
	{
		Relation	relation = relations[i];
		Oid			relid = RelationGetRelid(relation);

		if (!is_publishable_relation(relation))
			continue;

		relentry = get_rel_sync_entry(data, relation);

		if (!relentry->pubactions.pubtruncate)
			continue;

		/*
		 * Don't send partitions if the publication wants to send only the
		 * root tables through it.
		 */
		if (relation->rd_rel->relispartition &&
			relentry->publish_as_relid != relid)
			continue;

		relids[nrelids++] = relid;

		/* Send BEGIN if we haven't yet */
		if (txndata && !txndata->sent_begin_txn)
			pgoutput_send_begin(ctx, txn);

		maybe_send_schema(ctx, change, relation, relentry);
	}

	/* Only emit a message if at least one relation passed the filters. */
	if (nrelids > 0)
	{
		OutputPluginPrepareWrite(ctx, true);
		logicalrep_write_truncate(ctx->out,
								  xid,
								  nrelids,
								  relids,
								  change->data.truncate.cascade,
								  change->data.truncate.restart_seqs);
		OutputPluginWrite(ctx, true);
	}

	MemoryContextSwitchTo(old);
	MemoryContextReset(data->context);
}
1628 :
1629 : static void
1630 14 : pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1631 : XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
1632 : const char *message)
1633 : {
1634 14 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1635 14 : TransactionId xid = InvalidTransactionId;
1636 :
1637 14 : if (!data->messages)
1638 4 : return;
1639 :
1640 : /*
1641 : * Remember the xid for the message in streaming mode. See
1642 : * pgoutput_change.
1643 : */
1644 10 : if (data->in_streaming)
1645 0 : xid = txn->xid;
1646 :
1647 : /*
1648 : * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
1649 : */
1650 10 : if (transactional)
1651 : {
1652 4 : PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1653 :
1654 : /* Send BEGIN if we haven't yet */
1655 4 : if (txndata && !txndata->sent_begin_txn)
1656 4 : pgoutput_send_begin(ctx, txn);
1657 : }
1658 :
1659 10 : OutputPluginPrepareWrite(ctx, true);
1660 10 : logicalrep_write_message(ctx->out,
1661 : xid,
1662 : message_lsn,
1663 : transactional,
1664 : prefix,
1665 : sz,
1666 : message);
1667 10 : OutputPluginWrite(ctx, true);
1668 : }
1669 :
1670 : /*
1671 : * Return true if the data is associated with an origin and the user has
1672 : * requested the changes that don't have an origin, false otherwise.
1673 : */
1674 : static bool
1675 980460 : pgoutput_origin_filter(LogicalDecodingContext *ctx,
1676 : RepOriginId origin_id)
1677 : {
1678 980460 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1679 :
1680 980460 : if (data->publish_no_origin && origin_id != InvalidRepOriginId)
1681 96 : return true;
1682 :
1683 980364 : return false;
1684 : }
1685 :
1686 : /*
1687 : * Shutdown the output plugin.
1688 : *
1689 : * Note, we don't need to clean the data->context and data->cachectx as
1690 : * they are child contexts of the ctx->context so they will be cleaned up by
1691 : * logical decoding machinery.
1692 : */
1693 : static void
1694 886 : pgoutput_shutdown(LogicalDecodingContext *ctx)
1695 : {
1696 886 : if (RelationSyncCache)
1697 : {
1698 336 : hash_destroy(RelationSyncCache);
1699 336 : RelationSyncCache = NULL;
1700 : }
1701 886 : }
1702 :
1703 : /*
1704 : * Load publications from the list of publication names.
1705 : */
1706 : static List *
1707 288 : LoadPublications(List *pubnames)
1708 : {
1709 288 : List *result = NIL;
1710 : ListCell *lc;
1711 :
1712 658 : foreach(lc, pubnames)
1713 : {
1714 374 : char *pubname = (char *) lfirst(lc);
1715 374 : Publication *pub = GetPublicationByName(pubname, false);
1716 :
1717 370 : result = lappend(result, pub);
1718 : }
1719 :
1720 284 : return result;
1721 : }
1722 :
1723 : /*
1724 : * Publication syscache invalidation callback.
1725 : *
1726 : * Called for invalidations on pg_publication.
1727 : */
static void
publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
{
	/* Force the publication list to be re-read on next use. */
	publications_valid = false;

	/*
	 * Also invalidate per-relation cache so that next time the filtering info
	 * is checked it will be updated with the new publication settings.
	 */
	rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
}
1739 :
1740 : /*
1741 : * START STREAM callback
1742 : */
1743 : static void
1744 1274 : pgoutput_stream_start(struct LogicalDecodingContext *ctx,
1745 : ReorderBufferTXN *txn)
1746 : {
1747 1274 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1748 1274 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
1749 :
1750 : /* we can't nest streaming of transactions */
1751 : Assert(!data->in_streaming);
1752 :
1753 : /*
1754 : * If we already sent the first stream for this transaction then don't
1755 : * send the origin id in the subsequent streams.
1756 : */
1757 1274 : if (rbtxn_is_streamed(txn))
1758 1148 : send_replication_origin = false;
1759 :
1760 1274 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
1761 1274 : logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
1762 :
1763 1274 : send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
1764 : send_replication_origin);
1765 :
1766 1274 : OutputPluginWrite(ctx, true);
1767 :
1768 : /* we're streaming a chunk of transaction now */
1769 1274 : data->in_streaming = true;
1770 1274 : }
1771 :
1772 : /*
1773 : * STOP STREAM callback
1774 : */
static void
pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
					 ReorderBufferTXN *txn)
{
	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;

	/* we should be streaming a transaction */
	Assert(data->in_streaming);

	/* Emit the protocol message closing this stream segment. */
	OutputPluginPrepareWrite(ctx, true);
	logicalrep_write_stream_stop(ctx->out);
	OutputPluginWrite(ctx, true);

	/* we've stopped streaming a transaction */
	data->in_streaming = false;
}
1791 :
1792 : /*
1793 : * Notify downstream to discard the streamed transaction (along with all
1794 : * it's subtransactions, if it's a toplevel transaction).
1795 : */
1796 : static void
1797 52 : pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
1798 : ReorderBufferTXN *txn,
1799 : XLogRecPtr abort_lsn)
1800 : {
1801 : ReorderBufferTXN *toptxn;
1802 52 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1803 52 : bool write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
1804 :
1805 : /*
1806 : * The abort should happen outside streaming block, even for streamed
1807 : * transactions. The transaction has to be marked as streamed, though.
1808 : */
1809 : Assert(!data->in_streaming);
1810 :
1811 : /* determine the toplevel transaction */
1812 52 : toptxn = rbtxn_get_toptxn(txn);
1813 :
1814 : Assert(rbtxn_is_streamed(toptxn));
1815 :
1816 52 : OutputPluginPrepareWrite(ctx, true);
1817 52 : logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
1818 : txn->xact_time.abort_time, write_abort_info);
1819 :
1820 52 : OutputPluginWrite(ctx, true);
1821 :
1822 52 : cleanup_rel_sync_cache(toptxn->xid, false);
1823 52 : }
1824 :
1825 : /*
1826 : * Notify downstream to apply the streamed transaction (along with all
1827 : * it's subtransactions).
1828 : */
1829 : static void
1830 92 : pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
1831 : ReorderBufferTXN *txn,
1832 : XLogRecPtr commit_lsn)
1833 : {
1834 92 : PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private;
1835 :
1836 : /*
1837 : * The commit should happen outside streaming block, even for streamed
1838 : * transactions. The transaction has to be marked as streamed, though.
1839 : */
1840 : Assert(!data->in_streaming);
1841 : Assert(rbtxn_is_streamed(txn));
1842 :
1843 92 : OutputPluginUpdateProgress(ctx, false);
1844 :
1845 92 : OutputPluginPrepareWrite(ctx, true);
1846 92 : logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
1847 92 : OutputPluginWrite(ctx, true);
1848 :
1849 92 : cleanup_rel_sync_cache(txn->xid, true);
1850 92 : }
1851 :
1852 : /*
1853 : * PREPARE callback (for streaming two-phase commit).
1854 : *
1855 : * Notify the downstream to prepare the transaction.
1856 : */
1857 : static void
1858 28 : pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
1859 : ReorderBufferTXN *txn,
1860 : XLogRecPtr prepare_lsn)
1861 : {
1862 : Assert(rbtxn_is_streamed(txn));
1863 :
1864 28 : OutputPluginUpdateProgress(ctx, false);
1865 28 : OutputPluginPrepareWrite(ctx, true);
1866 28 : logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
1867 28 : OutputPluginWrite(ctx, true);
1868 28 : }
1869 :
1870 : /*
1871 : * Initialize the relation schema sync cache for a decoding session.
1872 : *
1873 : * The hash table is destroyed at the end of a decoding session. While
1874 : * relcache invalidations still exist and will still be invoked, they
1875 : * will just see the null hash table global and take no action.
1876 : */
1877 : static void
1878 640 : init_rel_sync_cache(MemoryContext cachectx)
1879 : {
1880 : HASHCTL ctl;
1881 : static bool relation_callbacks_registered = false;
1882 :
1883 : /* Nothing to do if hash table already exists */
1884 640 : if (RelationSyncCache != NULL)
1885 2 : return;
1886 :
1887 : /* Make a new hash table for the cache */
1888 640 : ctl.keysize = sizeof(Oid);
1889 640 : ctl.entrysize = sizeof(RelationSyncEntry);
1890 640 : ctl.hcxt = cachectx;
1891 :
1892 640 : RelationSyncCache = hash_create("logical replication output relation cache",
1893 : 128, &ctl,
1894 : HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
1895 :
1896 : Assert(RelationSyncCache != NULL);
1897 :
1898 : /* No more to do if we already registered callbacks */
1899 640 : if (relation_callbacks_registered)
1900 2 : return;
1901 :
1902 : /* We must update the cache entry for a relation after a relcache flush */
1903 638 : CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
1904 :
1905 : /*
1906 : * Flush all cache entries after a pg_namespace change, in case it was a
1907 : * schema rename affecting a relation being replicated.
1908 : */
1909 638 : CacheRegisterSyscacheCallback(NAMESPACEOID,
1910 : rel_sync_cache_publication_cb,
1911 : (Datum) 0);
1912 :
1913 : /*
1914 : * Flush all cache entries after any publication changes. (We need no
1915 : * callback entry for pg_publication, because publication_invalidation_cb
1916 : * will take care of it.)
1917 : */
1918 638 : CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
1919 : rel_sync_cache_publication_cb,
1920 : (Datum) 0);
1921 638 : CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
1922 : rel_sync_cache_publication_cb,
1923 : (Datum) 0);
1924 :
1925 638 : relation_callbacks_registered = true;
1926 : }
1927 :
1928 : /*
1929 : * We expect relatively small number of streamed transactions.
1930 : */
1931 : static bool
1932 351882 : get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
1933 : {
1934 351882 : return list_member_xid(entry->streamed_txns, xid);
1935 : }
1936 :
1937 : /*
1938 : * Add the xid in the rel sync entry for which we have already sent the schema
1939 : * of the relation.
1940 : */
1941 : static void
1942 144 : set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
1943 : {
1944 : MemoryContext oldctx;
1945 :
1946 144 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1947 :
1948 144 : entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
1949 :
1950 144 : MemoryContextSwitchTo(oldctx);
1951 144 : }
1952 :
/*
 * Find or create entry in the relation schema cache.
 *
 * This looks up publications that the given relation is directly or
 * indirectly part of (the latter if it's really the relation's ancestor that
 * is part of a publication) and fills up the found entry with the information
 * about which operations to publish and whether to use an ancestor's schema
 * when publishing.
 *
 * The returned entry is owned by RelationSyncCache; callers must not free it.
 */
static RelationSyncEntry *
get_rel_sync_entry(PGOutputData *data, Relation relation)
{
	RelationSyncEntry *entry;
	bool		found;
	MemoryContext oldctx;
	Oid			relid = RelationGetRelid(relation);

	Assert(RelationSyncCache != NULL);

	/* Find cached relation info, creating if not found */
	entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
											  &relid,
											  HASH_ENTER, &found);
	Assert(entry != NULL);

	/* initialize entry, if it's new */
	if (!found)
	{
		entry->replicate_valid = false;
		entry->schema_sent = false;
		entry->streamed_txns = NIL;
		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
		entry->new_slot = NULL;
		entry->old_slot = NULL;
		memset(entry->exprstate, 0, sizeof(entry->exprstate));
		entry->entry_cxt = NULL;
		entry->publish_as_relid = InvalidOid;
		entry->columns = NULL;
		entry->attrmap = NULL;
	}

	/* Validate the entry (rebuild it if an invalidation marked it stale) */
	if (!entry->replicate_valid)
	{
		Oid			schemaId = get_rel_namespace(relid);
		List	   *pubids = GetRelationPublications(relid);

		/*
		 * We don't acquire a lock on the namespace system table as we build
		 * the cache entry using a historic snapshot and all the later changes
		 * are absorbed while decoding WAL.
		 */
		List	   *schemaPubids = GetSchemaPublications(schemaId);
		ListCell   *lc;
		Oid			publish_as_relid = relid;
		int			publish_ancestor_level = 0;
		bool		am_partition = get_rel_relispartition(relid);
		char		relkind = get_rel_relkind(relid);
		List	   *rel_publications = NIL;

		/* Reload publications if needed before use. */
		if (!publications_valid)
		{
			/* Long-lived list: build it in cache memory, freeing the old one. */
			oldctx = MemoryContextSwitchTo(CacheMemoryContext);
			if (data->publications)
			{
				list_free_deep(data->publications);
				data->publications = NIL;
			}
			data->publications = LoadPublications(data->publication_names);
			MemoryContextSwitchTo(oldctx);
			publications_valid = true;
		}

		/*
		 * Reset schema_sent status as the relation definition may have
		 * changed. Also reset pubactions to empty in case rel was dropped
		 * from a publication. Also free any objects that depended on the
		 * earlier definition.
		 */
		entry->schema_sent = false;
		list_free(entry->streamed_txns);
		entry->streamed_txns = NIL;
		bms_free(entry->columns);
		entry->columns = NULL;
		entry->pubactions.pubinsert = false;
		entry->pubactions.pubupdate = false;
		entry->pubactions.pubdelete = false;
		entry->pubactions.pubtruncate = false;

		/*
		 * Tuple slots cleanups. (Will be rebuilt later if needed).
		 */
		if (entry->old_slot)
			ExecDropSingleTupleTableSlot(entry->old_slot);
		if (entry->new_slot)
			ExecDropSingleTupleTableSlot(entry->new_slot);

		entry->old_slot = NULL;
		entry->new_slot = NULL;

		if (entry->attrmap)
			free_attrmap(entry->attrmap);
		entry->attrmap = NULL;

		/*
		 * Row filter cache cleanups.  Deleting entry_cxt releases everything
		 * allocated in it, so the estate/exprstate pointers below would
		 * dangle; clear them too.
		 */
		if (entry->entry_cxt)
			MemoryContextDelete(entry->entry_cxt);

		entry->entry_cxt = NULL;
		entry->estate = NULL;
		memset(entry->exprstate, 0, sizeof(entry->exprstate));

		/*
		 * Build publication cache. We can't use one provided by relcache as
		 * relcache considers all publications that the given relation is in,
		 * but here we only need to consider ones that the subscriber
		 * requested.
		 */
		foreach(lc, data->publications)
		{
			Publication *pub = lfirst(lc);
			bool		publish = false;

			/*
			 * Under what relid should we publish changes in this publication?
			 * We'll use the top-most relid across all publications. Also
			 * track the ancestor level for this publication.
			 */
			Oid			pub_relid = relid;
			int			ancestor_level = 0;

			/*
			 * If this is a FOR ALL TABLES publication, pick the partition
			 * root and set the ancestor level accordingly.
			 */
			if (pub->alltables)
			{
				publish = true;
				if (pub->pubviaroot && am_partition)
				{
					List	   *ancestors = get_partition_ancestors(relid);

					pub_relid = llast_oid(ancestors);
					ancestor_level = list_length(ancestors);
				}
			}

			if (!publish)
			{
				bool		ancestor_published = false;

				/*
				 * For a partition, check if any of the ancestors are
				 * published. If so, note down the topmost ancestor that is
				 * published via this publication, which will be used as the
				 * relation via which to publish the partition's changes.
				 */
				if (am_partition)
				{
					Oid			ancestor;
					int			level;
					List	   *ancestors = get_partition_ancestors(relid);

					ancestor = GetTopMostAncestorInPublication(pub->oid,
															   ancestors,
															   &level);

					if (ancestor != InvalidOid)
					{
						ancestor_published = true;
						if (pub->pubviaroot)
						{
							pub_relid = ancestor;
							ancestor_level = level;
						}
					}
				}

				if (list_member_oid(pubids, pub->oid) ||
					list_member_oid(schemaPubids, pub->oid) ||
					ancestor_published)
					publish = true;
			}

			/*
			 * If the relation is to be published, determine actions to
			 * publish, and list of columns, if appropriate.
			 *
			 * Don't publish changes for partitioned tables, because
			 * publishing those of its partitions suffices, unless partition
			 * changes won't be published due to pubviaroot being set.
			 */
			if (publish &&
				(relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
			{
				entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
				entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
				entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;

				/*
				 * We want to publish the changes as the top-most ancestor
				 * across all publications. So we need to check if the already
				 * calculated level is higher than the new one. If yes, we can
				 * ignore the new value (as it's a child). Otherwise the new
				 * value is an ancestor, so we keep it.
				 */
				if (publish_ancestor_level > ancestor_level)
					continue;

				/*
				 * If we found an ancestor higher up in the tree, discard the
				 * list of publications through which we replicate it, and use
				 * the new ancestor.
				 */
				if (publish_ancestor_level < ancestor_level)
				{
					publish_as_relid = pub_relid;
					publish_ancestor_level = ancestor_level;

					/* reset the publication list for this relation */
					rel_publications = NIL;
				}
				else
				{
					/* Same ancestor level, has to be the same OID. */
					Assert(publish_as_relid == pub_relid);
				}

				/* Track publications for this ancestor. */
				rel_publications = lappend(rel_publications, pub);
			}
		}

		entry->publish_as_relid = publish_as_relid;

		/*
		 * Initialize the tuple slot, map, and row filter. These are only used
		 * when publishing inserts, updates, or deletes.
		 */
		if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
			entry->pubactions.pubdelete)
		{
			/* Initialize the tuple slot and map */
			init_tuple_slot(data, relation, entry);

			/* Initialize the row filter */
			pgoutput_row_filter_init(data, rel_publications, entry);

			/* Initialize the column list */
			pgoutput_column_list_init(data, rel_publications, entry);
		}

		list_free(pubids);
		list_free(schemaPubids);
		list_free(rel_publications);

		entry->replicate_valid = true;
	}

	return entry;
}
2219 :
/*
 * Cleanup list of streamed transactions and update the schema_sent flag.
 *
 * When a streamed transaction commits or aborts, we need to remove the
 * toplevel XID from the schema cache. If the transaction aborted, the
 * subscriber will simply throw away the schema records we streamed, so
 * we don't need to do anything else.
 *
 * If the transaction is committed, the subscriber will update the relation
 * cache - so tweak the schema_sent flag accordingly.
 */
static void
cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
{
	HASH_SEQ_STATUS hash_seq;
	RelationSyncEntry *entry;

	Assert(RelationSyncCache != NULL);

	/* Walk every cache entry looking for the finished transaction's XID. */
	hash_seq_init(&hash_seq, RelationSyncCache);
	while ((entry = hash_seq_search(&hash_seq)) != NULL)
	{
		/*
		 * We can set the schema_sent flag for an entry that has committed xid
		 * in the list as that ensures that the subscriber would have the
		 * corresponding schema and we don't need to send it unless there is
		 * any invalidation for that relation.
		 */
		foreach_xid(streamed_txn, entry->streamed_txns)
		{
			if (xid == streamed_txn)
			{
				if (is_commit)
					entry->schema_sent = true;

				/*
				 * Remove the XID; each XID appears at most once per entry, so
				 * we can stop scanning this list immediately.
				 */
				entry->streamed_txns =
					foreach_delete_current(entry->streamed_txns, streamed_txn);
				break;
			}
		}
	}
}
2262 :
2263 : /*
2264 : * Relcache invalidation callback
2265 : */
2266 : static void
2267 6252 : rel_sync_cache_relation_cb(Datum arg, Oid relid)
2268 : {
2269 : RelationSyncEntry *entry;
2270 :
2271 : /*
2272 : * We can get here if the plugin was used in SQL interface as the
2273 : * RelationSyncCache is destroyed when the decoding finishes, but there is
2274 : * no way to unregister the relcache invalidation callback.
2275 : */
2276 6252 : if (RelationSyncCache == NULL)
2277 18 : return;
2278 :
2279 : /*
2280 : * Nobody keeps pointers to entries in this hash table around outside
2281 : * logical decoding callback calls - but invalidation events can come in
2282 : * *during* a callback if we do any syscache access in the callback.
2283 : * Because of that we must mark the cache entry as invalid but not damage
2284 : * any of its substructure here. The next get_rel_sync_entry() call will
2285 : * rebuild it all.
2286 : */
2287 6234 : if (OidIsValid(relid))
2288 : {
2289 : /*
2290 : * Getting invalidations for relations that aren't in the table is
2291 : * entirely normal. So we don't care if it's found or not.
2292 : */
2293 6182 : entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
2294 : HASH_FIND, NULL);
2295 6182 : if (entry != NULL)
2296 966 : entry->replicate_valid = false;
2297 : }
2298 : else
2299 : {
2300 : /* Whole cache must be flushed. */
2301 : HASH_SEQ_STATUS status;
2302 :
2303 52 : hash_seq_init(&status, RelationSyncCache);
2304 112 : while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2305 : {
2306 60 : entry->replicate_valid = false;
2307 : }
2308 : }
2309 : }
2310 :
2311 : /*
2312 : * Publication relation/schema map syscache invalidation callback
2313 : *
2314 : * Called for invalidations on pg_publication, pg_publication_rel,
2315 : * pg_publication_namespace, and pg_namespace.
2316 : */
2317 : static void
2318 1246 : rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
2319 : {
2320 : HASH_SEQ_STATUS status;
2321 : RelationSyncEntry *entry;
2322 :
2323 : /*
2324 : * We can get here if the plugin was used in SQL interface as the
2325 : * RelationSyncCache is destroyed when the decoding finishes, but there is
2326 : * no way to unregister the invalidation callbacks.
2327 : */
2328 1246 : if (RelationSyncCache == NULL)
2329 68 : return;
2330 :
2331 : /*
2332 : * We have no easy way to identify which cache entries this invalidation
2333 : * event might have affected, so just mark them all invalid.
2334 : */
2335 1178 : hash_seq_init(&status, RelationSyncCache);
2336 3660 : while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2337 : {
2338 2482 : entry->replicate_valid = false;
2339 : }
2340 : }
2341 :
2342 : /* Send Replication origin */
2343 : static void
2344 2018 : send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
2345 : XLogRecPtr origin_lsn, bool send_origin)
2346 : {
2347 2018 : if (send_origin)
2348 : {
2349 : char *origin;
2350 :
2351 : /*----------
2352 : * XXX: which behaviour do we want here?
2353 : *
2354 : * Alternatives:
2355 : * - don't send origin message if origin name not found
2356 : * (that's what we do now)
2357 : * - throw error - that will break replication, not good
2358 : * - send some special "unknown" origin
2359 : *----------
2360 : */
2361 16 : if (replorigin_by_oid(origin_id, true, &origin))
2362 : {
2363 : /* Message boundary */
2364 16 : OutputPluginWrite(ctx, false);
2365 16 : OutputPluginPrepareWrite(ctx, true);
2366 :
2367 16 : logicalrep_write_origin(ctx->out, origin, origin_lsn);
2368 : }
2369 : }
2370 2018 : }
|