Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pgoutput.c
4 : * Logical Replication output plugin
5 : *
6 : * Copyright (c) 2012-2021, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/pgoutput/pgoutput.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "access/tupconvert.h"
16 : #include "catalog/partition.h"
17 : #include "catalog/pg_publication.h"
18 : #include "commands/defrem.h"
19 : #include "fmgr.h"
20 : #include "replication/logical.h"
21 : #include "replication/logicalproto.h"
22 : #include "replication/origin.h"
23 : #include "replication/pgoutput.h"
24 : #include "utils/int8.h"
25 : #include "utils/inval.h"
26 : #include "utils/lsyscache.h"
27 : #include "utils/memutils.h"
28 : #include "utils/syscache.h"
29 : #include "utils/varlena.h"
30 :
31 242 : PG_MODULE_MAGIC;
32 :
33 : extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
34 :
35 : static void pgoutput_startup(LogicalDecodingContext *ctx,
36 : OutputPluginOptions *opt, bool is_init);
37 : static void pgoutput_shutdown(LogicalDecodingContext *ctx);
38 : static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
39 : ReorderBufferTXN *txn);
40 : static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
41 : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
42 : static void pgoutput_change(LogicalDecodingContext *ctx,
43 : ReorderBufferTXN *txn, Relation rel,
44 : ReorderBufferChange *change);
45 : static void pgoutput_truncate(LogicalDecodingContext *ctx,
46 : ReorderBufferTXN *txn, int nrelations, Relation relations[],
47 : ReorderBufferChange *change);
48 : static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
49 : RepOriginId origin_id);
50 : static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
51 : ReorderBufferTXN *txn);
52 : static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
53 : ReorderBufferTXN *txn);
54 : static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
55 : ReorderBufferTXN *txn,
56 : XLogRecPtr abort_lsn);
57 : static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
58 : ReorderBufferTXN *txn,
59 : XLogRecPtr commit_lsn);
60 :
61 : static bool publications_valid;
62 : static bool in_streaming;
63 :
64 : static List *LoadPublications(List *pubnames);
65 : static void publication_invalidation_cb(Datum arg, int cacheid,
66 : uint32 hashvalue);
67 : static void send_relation_and_attrs(Relation relation, TransactionId xid,
68 : LogicalDecodingContext *ctx);
69 :
70 : /*
71 : * Entry in the map used to remember which relation schemas we sent.
72 : *
73 : * The schema_sent flag determines if the current schema record was already
74 : * sent to the subscriber (in which case we don't need to send it again).
75 : *
76 : * The schema cache on downstream is however updated only at commit time,
77 : * and with streamed transactions the commit order may be different from
78 : * the order the transactions are sent in. Also, the (sub) transactions
79 : * might get aborted so we need to send the schema for each (sub) transaction
80 : * so that we don't lose the schema information on abort. For handling this,
81 : * we maintain the list of xids (streamed_txns) for those we have already sent
82 : * the schema.
83 : *
84 : * For partitions, 'pubactions' considers not only the table's own
85 : * publications, but also those of all of its ancestors.
86 : */
87 : typedef struct RelationSyncEntry
88 : {
89 : Oid relid; /* relation oid */
90 :
91 : /*
92 : * Did we send the schema? If ancestor relid is set, its schema must also
93 : * have been sent for this to be true.
94 : */
95 : bool schema_sent;
96 : List *streamed_txns; /* streamed toplevel transactions with this
97 : * schema */
98 :
99 : bool replicate_valid;
100 : PublicationActions pubactions;
101 :
102 : /*
103 : * OID of the relation to publish changes as. For a partition, this may
104 : * be set to one of its ancestors whose schema will be used when
105 : * replicating changes, if publish_via_partition_root is set for the
106 : * publication.
107 : */
108 : Oid publish_as_relid;
109 :
110 : /*
111 : * Map used when replicating using an ancestor's schema to convert tuples
112 : * from partition's type to the ancestor's; NULL if publish_as_relid is
113 : * same as 'relid' or if unnecessary due to partition and the ancestor
114 : * having identical TupleDesc.
115 : */
116 : TupleConversionMap *map;
117 : } RelationSyncEntry;
118 :
119 : /* Map used to remember which relation schemas we sent. */
120 : static HTAB *RelationSyncCache = NULL;
121 :
122 : static void init_rel_sync_cache(MemoryContext decoding_context);
123 : static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
124 : static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
125 : static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
126 : static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
127 : uint32 hashvalue);
128 : static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
129 : TransactionId xid);
130 : static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
131 : TransactionId xid);
132 :
133 : /*
134 : * Specify output plugin callbacks
135 : */
136 : void
137 368 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
138 : {
139 : AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
140 :
141 368 : cb->startup_cb = pgoutput_startup;
142 368 : cb->begin_cb = pgoutput_begin_txn;
143 368 : cb->change_cb = pgoutput_change;
144 368 : cb->truncate_cb = pgoutput_truncate;
145 368 : cb->commit_cb = pgoutput_commit_txn;
146 368 : cb->filter_by_origin_cb = pgoutput_origin_filter;
147 368 : cb->shutdown_cb = pgoutput_shutdown;
148 :
149 : /* transaction streaming */
150 368 : cb->stream_start_cb = pgoutput_stream_start;
151 368 : cb->stream_stop_cb = pgoutput_stream_stop;
152 368 : cb->stream_abort_cb = pgoutput_stream_abort;
153 368 : cb->stream_commit_cb = pgoutput_stream_commit;
154 368 : cb->stream_change_cb = pgoutput_change;
155 368 : cb->stream_truncate_cb = pgoutput_truncate;
156 368 : }
157 :
158 : static void
159 180 : parse_output_parameters(List *options, uint32 *protocol_version,
160 : List **publication_names, bool *binary,
161 : bool *enable_streaming)
162 : {
163 : ListCell *lc;
164 180 : bool protocol_version_given = false;
165 180 : bool publication_names_given = false;
166 180 : bool binary_option_given = false;
167 180 : bool streaming_given = false;
168 :
169 180 : *binary = false;
170 :
171 572 : foreach(lc, options)
172 : {
173 392 : DefElem *defel = (DefElem *) lfirst(lc);
174 :
175 : Assert(defel->arg == NULL || IsA(defel->arg, String));
176 :
177 : /* Check each param, whether or not we recognize it */
178 392 : if (strcmp(defel->defname, "proto_version") == 0)
179 : {
180 : int64 parsed;
181 :
182 180 : if (protocol_version_given)
183 0 : ereport(ERROR,
184 : (errcode(ERRCODE_SYNTAX_ERROR),
185 : errmsg("conflicting or redundant options")));
186 180 : protocol_version_given = true;
187 :
188 180 : if (!scanint8(strVal(defel->arg), true, &parsed))
189 0 : ereport(ERROR,
190 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
191 : errmsg("invalid proto_version")));
192 :
193 180 : if (parsed > PG_UINT32_MAX || parsed < 0)
194 0 : ereport(ERROR,
195 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
196 : errmsg("proto_version \"%s\" out of range",
197 : strVal(defel->arg))));
198 :
199 180 : *protocol_version = (uint32) parsed;
200 : }
201 212 : else if (strcmp(defel->defname, "publication_names") == 0)
202 : {
203 180 : if (publication_names_given)
204 0 : ereport(ERROR,
205 : (errcode(ERRCODE_SYNTAX_ERROR),
206 : errmsg("conflicting or redundant options")));
207 180 : publication_names_given = true;
208 :
209 180 : if (!SplitIdentifierString(strVal(defel->arg), ',',
210 : publication_names))
211 0 : ereport(ERROR,
212 : (errcode(ERRCODE_INVALID_NAME),
213 : errmsg("invalid publication_names syntax")));
214 : }
215 32 : else if (strcmp(defel->defname, "binary") == 0)
216 : {
217 10 : if (binary_option_given)
218 0 : ereport(ERROR,
219 : (errcode(ERRCODE_SYNTAX_ERROR),
220 : errmsg("conflicting or redundant options")));
221 10 : binary_option_given = true;
222 :
223 10 : *binary = defGetBoolean(defel);
224 : }
225 22 : else if (strcmp(defel->defname, "streaming") == 0)
226 : {
227 22 : if (streaming_given)
228 0 : ereport(ERROR,
229 : (errcode(ERRCODE_SYNTAX_ERROR),
230 : errmsg("conflicting or redundant options")));
231 22 : streaming_given = true;
232 :
233 22 : *enable_streaming = defGetBoolean(defel);
234 : }
235 : else
236 0 : elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
237 : }
238 180 : }
239 :
240 : /*
241 : * Initialize this plugin
242 : */
243 : static void
244 368 : pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
245 : bool is_init)
246 : {
247 368 : bool enable_streaming = false;
248 368 : PGOutputData *data = palloc0(sizeof(PGOutputData));
249 :
250 : /* Create our memory context for private allocations. */
251 368 : data->context = AllocSetContextCreate(ctx->context,
252 : "logical replication output context",
253 : ALLOCSET_DEFAULT_SIZES);
254 :
255 368 : ctx->output_plugin_private = data;
256 :
257 : /* This plugin uses binary protocol. */
258 368 : opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
259 :
260 : /*
261 : * This is replication start and not slot initialization.
262 : *
263 : * Parse and validate options passed by the client.
264 : */
265 368 : if (!is_init)
266 : {
267 : /* Parse the params and ERROR if we see any we don't recognize */
268 180 : parse_output_parameters(ctx->output_plugin_options,
269 : &data->protocol_version,
270 : &data->publication_names,
271 : &data->binary,
272 : &enable_streaming);
273 :
274 : /* Check if we support requested protocol */
275 180 : if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
276 0 : ereport(ERROR,
277 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
278 : errmsg("client sent proto_version=%d but we only support protocol %d or lower",
279 : data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
280 :
281 180 : if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
282 0 : ereport(ERROR,
283 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
284 : errmsg("client sent proto_version=%d but we only support protocol %d or higher",
285 : data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
286 :
287 180 : if (list_length(data->publication_names) < 1)
288 0 : ereport(ERROR,
289 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
290 : errmsg("publication_names parameter missing")));
291 :
292 : /*
293 : * Decide whether to enable streaming. It is disabled by default, in
294 : * which case we just update the flag in decoding context. Otherwise
295 : * we only allow it with sufficient version of the protocol, and when
296 : * the output plugin supports it.
297 : */
298 180 : if (!enable_streaming)
299 158 : ctx->streaming = false;
300 22 : else if (data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
301 0 : ereport(ERROR,
302 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
303 : errmsg("requested proto_version=%d does not support streaming, need %d or higher",
304 : data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
305 22 : else if (!ctx->streaming)
306 0 : ereport(ERROR,
307 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
308 : errmsg("streaming requested, but not supported by output plugin")));
309 :
310 : /* Also remember we're currently not streaming any transaction. */
311 180 : in_streaming = false;
312 :
313 : /* Init publication state. */
314 180 : data->publications = NIL;
315 180 : publications_valid = false;
316 180 : CacheRegisterSyscacheCallback(PUBLICATIONOID,
317 : publication_invalidation_cb,
318 : (Datum) 0);
319 :
320 : /* Initialize relation schema cache. */
321 180 : init_rel_sync_cache(CacheMemoryContext);
322 : }
323 : else
324 : {
325 : /* Disable the streaming during the slot initialization mode. */
326 188 : ctx->streaming = false;
327 : }
328 368 : }
329 :
330 : /*
331 : * BEGIN callback
332 : */
333 : static void
334 358 : pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
335 : {
336 358 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
337 :
338 358 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
339 358 : logicalrep_write_begin(ctx->out, txn);
340 :
341 358 : if (send_replication_origin)
342 : {
343 : char *origin;
344 :
345 : /* Message boundary */
346 0 : OutputPluginWrite(ctx, false);
347 0 : OutputPluginPrepareWrite(ctx, true);
348 :
349 : /*----------
350 : * XXX: which behaviour do we want here?
351 : *
352 : * Alternatives:
353 : * - don't send origin message if origin name not found
354 : * (that's what we do now)
355 : * - throw error - that will break replication, not good
356 : * - send some special "unknown" origin
357 : *----------
358 : */
359 0 : if (replorigin_by_oid(txn->origin_id, true, &origin))
360 0 : logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
361 : }
362 :
363 358 : OutputPluginWrite(ctx, true);
364 358 : }
365 :
366 : /*
367 : * COMMIT callback
368 : */
369 : static void
370 358 : pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
371 : XLogRecPtr commit_lsn)
372 : {
373 358 : OutputPluginUpdateProgress(ctx);
374 :
375 358 : OutputPluginPrepareWrite(ctx, true);
376 358 : logicalrep_write_commit(ctx->out, txn, commit_lsn);
377 358 : OutputPluginWrite(ctx, true);
378 358 : }
379 :
380 : /*
381 : * Write the current schema of the relation and its ancestor (if any) if not
382 : * done yet.
383 : */
384 : static void
385 114220 : maybe_send_schema(LogicalDecodingContext *ctx,
386 : ReorderBufferTXN *txn, ReorderBufferChange *change,
387 : Relation relation, RelationSyncEntry *relentry)
388 : {
389 : bool schema_sent;
390 114220 : TransactionId xid = InvalidTransactionId;
391 114220 : TransactionId topxid = InvalidTransactionId;
392 :
393 : /*
394 : * Remember XID of the (sub)transaction for the change. We don't care if
395 : * it's top-level transaction or not (we have already sent that XID in
396 : * start of the current streaming block).
397 : *
398 : * If we're not in a streaming block, just use InvalidTransactionId and
399 : * the write methods will not include it.
400 : */
401 114220 : if (in_streaming)
402 112748 : xid = change->txn->xid;
403 :
404 114220 : if (change->txn->toptxn)
405 25618 : topxid = change->txn->toptxn->xid;
406 : else
407 88602 : topxid = xid;
408 :
409 : /*
410 : * Do we need to send the schema? We do track streamed transactions
411 : * separately, because those may be applied later (and the regular
412 : * transactions won't see their effects until then) and in an order that
413 : * we don't know at this point.
414 : *
415 : * XXX There is a scope of optimization here. Currently, we always send
416 : * the schema first time in a streaming transaction but we can probably
417 : * avoid that by checking 'relentry->schema_sent' flag. However, before
418 : * doing that we need to study its impact on the case where we have a mix
419 : * of streaming and non-streaming transactions.
420 : */
421 114220 : if (in_streaming)
422 112748 : schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
423 : else
424 1472 : schema_sent = relentry->schema_sent;
425 :
426 114220 : if (schema_sent)
427 114062 : return;
428 :
429 : /* If needed, send the ancestor's schema first. */
430 158 : if (relentry->publish_as_relid != RelationGetRelid(relation))
431 : {
432 0 : Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
433 0 : TupleDesc indesc = RelationGetDescr(relation);
434 0 : TupleDesc outdesc = RelationGetDescr(ancestor);
435 : MemoryContext oldctx;
436 :
437 : /* Map must live as long as the session does. */
438 0 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
439 0 : relentry->map = convert_tuples_by_name(CreateTupleDescCopy(indesc),
440 : CreateTupleDescCopy(outdesc));
441 0 : MemoryContextSwitchTo(oldctx);
442 0 : send_relation_and_attrs(ancestor, xid, ctx);
443 0 : RelationClose(ancestor);
444 : }
445 :
446 158 : send_relation_and_attrs(relation, xid, ctx);
447 :
448 158 : if (in_streaming)
449 38 : set_schema_sent_in_streamed_txn(relentry, topxid);
450 : else
451 120 : relentry->schema_sent = true;
452 : }
453 :
454 : /*
455 : * Sends a relation
456 : */
457 : static void
458 158 : send_relation_and_attrs(Relation relation, TransactionId xid,
459 : LogicalDecodingContext *ctx)
460 : {
461 158 : TupleDesc desc = RelationGetDescr(relation);
462 : int i;
463 :
464 : /*
465 : * Write out type info if needed. We do that only for user-created types.
466 : * We use FirstGenbkiObjectId as the cutoff, so that we only consider
467 : * objects with hand-assigned OIDs to be "built in", not for instance any
468 : * function or type defined in the information_schema. This is important
469 : * because only hand-assigned OIDs can be expected to remain stable across
470 : * major versions.
471 : */
472 520 : for (i = 0; i < desc->natts; i++)
473 : {
474 362 : Form_pg_attribute att = TupleDescAttr(desc, i);
475 :
476 362 : if (att->attisdropped || att->attgenerated)
477 2 : continue;
478 :
479 360 : if (att->atttypid < FirstGenbkiObjectId)
480 328 : continue;
481 :
482 32 : OutputPluginPrepareWrite(ctx, false);
483 32 : logicalrep_write_typ(ctx->out, xid, att->atttypid);
484 32 : OutputPluginWrite(ctx, false);
485 : }
486 :
487 158 : OutputPluginPrepareWrite(ctx, false);
488 158 : logicalrep_write_rel(ctx->out, xid, relation);
489 158 : OutputPluginWrite(ctx, false);
490 158 : }
491 :
492 : /*
493 : * Sends the decoded DML over wire.
494 : *
495 : * This is called both in streaming and non-streaming modes.
496 : */
497 : static void
498 116404 : pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
499 : Relation relation, ReorderBufferChange *change)
500 : {
501 116404 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
502 : MemoryContext old;
503 : RelationSyncEntry *relentry;
504 116404 : TransactionId xid = InvalidTransactionId;
505 116404 : Relation ancestor = NULL;
506 :
507 116404 : if (!is_publishable_relation(relation))
508 4 : return;
509 :
510 : /*
511 : * Remember the xid for the change in streaming mode. We need to send xid
512 : * with each change in the streaming mode so that subscriber can make
513 : * their association and on aborts, it can discard the corresponding
514 : * changes.
515 : */
516 116400 : if (in_streaming)
517 112748 : xid = change->txn->xid;
518 :
519 116400 : relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
520 :
521 : /* First check the table filter */
522 116400 : switch (change->action)
523 : {
524 62738 : case REORDER_BUFFER_CHANGE_INSERT:
525 62738 : if (!relentry->pubactions.pubinsert)
526 4 : return;
527 62734 : break;
528 32832 : case REORDER_BUFFER_CHANGE_UPDATE:
529 32832 : if (!relentry->pubactions.pubupdate)
530 80 : return;
531 32752 : break;
532 20830 : case REORDER_BUFFER_CHANGE_DELETE:
533 20830 : if (!relentry->pubactions.pubdelete)
534 2106 : return;
535 18724 : break;
536 114210 : default:
537 : Assert(false);
538 : }
539 :
540 : /* Avoid leaking memory by using and resetting our own context */
541 114210 : old = MemoryContextSwitchTo(data->context);
542 :
543 114210 : maybe_send_schema(ctx, txn, change, relation, relentry);
544 :
545 : /* Send the data */
546 114210 : switch (change->action)
547 : {
548 62734 : case REORDER_BUFFER_CHANGE_INSERT:
549 : {
550 62734 : HeapTuple tuple = &change->data.tp.newtuple->tuple;
551 :
552 : /* Switch relation if publishing via root. */
553 62734 : if (relentry->publish_as_relid != RelationGetRelid(relation))
554 : {
555 : Assert(relation->rd_rel->relispartition);
556 0 : ancestor = RelationIdGetRelation(relentry->publish_as_relid);
557 0 : relation = ancestor;
558 : /* Convert tuple if needed. */
559 0 : if (relentry->map)
560 0 : tuple = execute_attr_map_tuple(tuple, relentry->map);
561 : }
562 :
563 62734 : OutputPluginPrepareWrite(ctx, true);
564 62734 : logicalrep_write_insert(ctx->out, xid, relation, tuple,
565 62734 : data->binary);
566 62734 : OutputPluginWrite(ctx, true);
567 62734 : break;
568 : }
569 32752 : case REORDER_BUFFER_CHANGE_UPDATE:
570 : {
571 65504 : HeapTuple oldtuple = change->data.tp.oldtuple ?
572 32752 : &change->data.tp.oldtuple->tuple : NULL;
573 32752 : HeapTuple newtuple = &change->data.tp.newtuple->tuple;
574 :
575 : /* Switch relation if publishing via root. */
576 32752 : if (relentry->publish_as_relid != RelationGetRelid(relation))
577 : {
578 : Assert(relation->rd_rel->relispartition);
579 0 : ancestor = RelationIdGetRelation(relentry->publish_as_relid);
580 0 : relation = ancestor;
581 : /* Convert tuples if needed. */
582 0 : if (relentry->map)
583 : {
584 0 : oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
585 0 : newtuple = execute_attr_map_tuple(newtuple, relentry->map);
586 : }
587 : }
588 :
589 32752 : OutputPluginPrepareWrite(ctx, true);
590 32752 : logicalrep_write_update(ctx->out, xid, relation, oldtuple,
591 32752 : newtuple, data->binary);
592 32752 : OutputPluginWrite(ctx, true);
593 32752 : break;
594 : }
595 18724 : case REORDER_BUFFER_CHANGE_DELETE:
596 18724 : if (change->data.tp.oldtuple)
597 : {
598 18724 : HeapTuple oldtuple = &change->data.tp.oldtuple->tuple;
599 :
600 : /* Switch relation if publishing via root. */
601 18724 : if (relentry->publish_as_relid != RelationGetRelid(relation))
602 : {
603 : Assert(relation->rd_rel->relispartition);
604 0 : ancestor = RelationIdGetRelation(relentry->publish_as_relid);
605 0 : relation = ancestor;
606 : /* Convert tuple if needed. */
607 0 : if (relentry->map)
608 0 : oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
609 : }
610 :
611 18724 : OutputPluginPrepareWrite(ctx, true);
612 18724 : logicalrep_write_delete(ctx->out, xid, relation, oldtuple,
613 18724 : data->binary);
614 18724 : OutputPluginWrite(ctx, true);
615 : }
616 : else
617 0 : elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
618 18724 : break;
619 114210 : default:
620 : Assert(false);
621 : }
622 :
623 114210 : if (RelationIsValid(ancestor))
624 : {
625 0 : RelationClose(ancestor);
626 0 : ancestor = NULL;
627 : }
628 :
629 : /* Cleanup */
630 114210 : MemoryContextSwitchTo(old);
631 114210 : MemoryContextReset(data->context);
632 : }
633 :
634 : static void
635 14 : pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
636 : int nrelations, Relation relations[], ReorderBufferChange *change)
637 : {
638 14 : PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
639 : MemoryContext old;
640 : RelationSyncEntry *relentry;
641 : int i;
642 : int nrelids;
643 : Oid *relids;
644 14 : TransactionId xid = InvalidTransactionId;
645 :
646 : /* Remember the xid for the change in streaming mode. See pgoutput_change. */
647 14 : if (in_streaming)
648 0 : xid = change->txn->xid;
649 :
650 14 : old = MemoryContextSwitchTo(data->context);
651 :
652 14 : relids = palloc0(nrelations * sizeof(Oid));
653 14 : nrelids = 0;
654 :
655 36 : for (i = 0; i < nrelations; i++)
656 : {
657 22 : Relation relation = relations[i];
658 22 : Oid relid = RelationGetRelid(relation);
659 :
660 22 : if (!is_publishable_relation(relation))
661 0 : continue;
662 :
663 22 : relentry = get_rel_sync_entry(data, relid);
664 :
665 22 : if (!relentry->pubactions.pubtruncate)
666 12 : continue;
667 :
668 : /*
669 : * Don't send partitions if the publication wants to send only the
670 : * root tables through it.
671 : */
672 10 : if (relation->rd_rel->relispartition &&
673 8 : relentry->publish_as_relid != relid)
674 0 : continue;
675 :
676 10 : relids[nrelids++] = relid;
677 10 : maybe_send_schema(ctx, txn, change, relation, relentry);
678 : }
679 :
680 14 : if (nrelids > 0)
681 : {
682 6 : OutputPluginPrepareWrite(ctx, true);
683 12 : logicalrep_write_truncate(ctx->out,
684 : xid,
685 : nrelids,
686 : relids,
687 6 : change->data.truncate.cascade,
688 6 : change->data.truncate.restart_seqs);
689 6 : OutputPluginWrite(ctx, true);
690 : }
691 :
692 14 : MemoryContextSwitchTo(old);
693 14 : MemoryContextReset(data->context);
694 14 : }
695 :
696 : /*
697 : * Currently we always forward.
698 : */
699 : static bool
700 160766 : pgoutput_origin_filter(LogicalDecodingContext *ctx,
701 : RepOriginId origin_id)
702 : {
703 160766 : return false;
704 : }
705 :
706 : /*
707 : * Shutdown the output plugin.
708 : *
709 : * Note, we don't need to clean the data->context as it's child context
710 : * of the ctx->context so it will be cleaned up by logical decoding machinery.
711 : */
712 : static void
713 308 : pgoutput_shutdown(LogicalDecodingContext *ctx)
714 : {
715 308 : if (RelationSyncCache)
716 : {
717 120 : hash_destroy(RelationSyncCache);
718 120 : RelationSyncCache = NULL;
719 : }
720 308 : }
721 :
722 : /*
723 : * Load publications from the list of publication names.
724 : */
725 : static List *
726 56 : LoadPublications(List *pubnames)
727 : {
728 56 : List *result = NIL;
729 : ListCell *lc;
730 :
731 120 : foreach(lc, pubnames)
732 : {
733 64 : char *pubname = (char *) lfirst(lc);
734 64 : Publication *pub = GetPublicationByName(pubname, false);
735 :
736 64 : result = lappend(result, pub);
737 : }
738 :
739 56 : return result;
740 : }
741 :
742 : /*
743 : * Publication cache invalidation callback.
744 : */
745 : static void
746 72 : publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
747 : {
748 72 : publications_valid = false;
749 :
750 : /*
751 : * Also invalidate per-relation cache so that next time the filtering info
752 : * is checked it will be updated with the new publication settings.
753 : */
754 72 : rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
755 72 : }
756 :
757 : /*
758 : * START STREAM callback
759 : */
760 : static void
761 602 : pgoutput_stream_start(struct LogicalDecodingContext *ctx,
762 : ReorderBufferTXN *txn)
763 : {
764 602 : bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
765 :
766 : /* we can't nest streaming of transactions */
767 : Assert(!in_streaming);
768 :
769 : /*
770 : * If we already sent the first stream for this transaction then don't
771 : * send the origin id in the subsequent streams.
772 : */
773 602 : if (rbtxn_is_streamed(txn))
774 580 : send_replication_origin = false;
775 :
776 602 : OutputPluginPrepareWrite(ctx, !send_replication_origin);
777 602 : logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
778 :
779 602 : if (send_replication_origin)
780 : {
781 : char *origin;
782 :
783 : /* Message boundary */
784 0 : OutputPluginWrite(ctx, false);
785 0 : OutputPluginPrepareWrite(ctx, true);
786 :
787 0 : if (replorigin_by_oid(txn->origin_id, true, &origin))
788 0 : logicalrep_write_origin(ctx->out, origin, InvalidXLogRecPtr);
789 : }
790 :
791 602 : OutputPluginWrite(ctx, true);
792 :
793 : /* we're streaming a chunk of transaction now */
794 602 : in_streaming = true;
795 602 : }
796 :
797 : /*
798 : * STOP STREAM callback
799 : */
800 : static void
801 602 : pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
802 : ReorderBufferTXN *txn)
803 : {
804 : /* we should be streaming a trasanction */
805 : Assert(in_streaming);
806 :
807 602 : OutputPluginPrepareWrite(ctx, true);
808 602 : logicalrep_write_stream_stop(ctx->out);
809 602 : OutputPluginWrite(ctx, true);
810 :
811 : /* we've stopped streaming a transaction */
812 602 : in_streaming = false;
813 602 : }
814 :
815 : /*
816 : * Notify downstream to discard the streamed transaction (along with all
817 : * it's subtransactions, if it's a toplevel transaction).
818 : */
819 : static void
820 26 : pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
821 : ReorderBufferTXN *txn,
822 : XLogRecPtr abort_lsn)
823 : {
824 : ReorderBufferTXN *toptxn;
825 :
826 : /*
827 : * The abort should happen outside streaming block, even for streamed
828 : * transactions. The transaction has to be marked as streamed, though.
829 : */
830 : Assert(!in_streaming);
831 :
832 : /* determine the toplevel transaction */
833 26 : toptxn = (txn->toptxn) ? txn->toptxn : txn;
834 :
835 : Assert(rbtxn_is_streamed(toptxn));
836 :
837 26 : OutputPluginPrepareWrite(ctx, true);
838 26 : logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid);
839 26 : OutputPluginWrite(ctx, true);
840 :
841 26 : cleanup_rel_sync_cache(toptxn->xid, false);
842 26 : }
843 :
844 : /*
845 : * Notify downstream to apply the streamed transaction (along with all
846 : * it's subtransactions).
847 : */
848 : static void
849 20 : pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
850 : ReorderBufferTXN *txn,
851 : XLogRecPtr commit_lsn)
852 : {
853 : /*
854 : * The commit should happen outside streaming block, even for streamed
855 : * transactions. The transaction has to be marked as streamed, though.
856 : */
857 : Assert(!in_streaming);
858 : Assert(rbtxn_is_streamed(txn));
859 :
860 20 : OutputPluginUpdateProgress(ctx);
861 :
862 20 : OutputPluginPrepareWrite(ctx, true);
863 20 : logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
864 20 : OutputPluginWrite(ctx, true);
865 :
866 20 : cleanup_rel_sync_cache(txn->xid, true);
867 20 : }
868 :
869 : /*
870 : * Initialize the relation schema sync cache for a decoding session.
871 : *
872 : * The hash table is destroyed at the end of a decoding session. While
873 : * relcache invalidations still exist and will still be invoked, they
874 : * will just see the null hash table global and take no action.
875 : */
876 : static void
877 180 : init_rel_sync_cache(MemoryContext cachectx)
878 : {
879 : HASHCTL ctl;
880 :
881 180 : if (RelationSyncCache != NULL)
882 0 : return;
883 :
884 : /* Make a new hash table for the cache */
885 180 : ctl.keysize = sizeof(Oid);
886 180 : ctl.entrysize = sizeof(RelationSyncEntry);
887 180 : ctl.hcxt = cachectx;
888 :
889 180 : RelationSyncCache = hash_create("logical replication output relation cache",
890 : 128, &ctl,
891 : HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
892 :
893 : Assert(RelationSyncCache != NULL);
894 :
895 180 : CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
896 180 : CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
897 : rel_sync_cache_publication_cb,
898 : (Datum) 0);
899 : }
900 :
901 : /*
902 : * We expect relatively small number of streamed transactions.
903 : */
904 : static bool
905 112748 : get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
906 : {
907 : ListCell *lc;
908 :
909 132744 : foreach(lc, entry->streamed_txns)
910 : {
911 132706 : if (xid == (uint32) lfirst_int(lc))
912 112710 : return true;
913 : }
914 :
915 38 : return false;
916 : }
917 :
918 : /*
919 : * Add the xid in the rel sync entry for which we have already sent the schema
920 : * of the relation.
921 : */
922 : static void
923 38 : set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
924 : {
925 : MemoryContext oldctx;
926 :
927 38 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
928 :
929 38 : entry->streamed_txns = lappend_int(entry->streamed_txns, xid);
930 :
931 38 : MemoryContextSwitchTo(oldctx);
932 38 : }
933 :
934 : /*
935 : * Find or create entry in the relation schema cache.
936 : *
937 : * This looks up publications that the given relation is directly or
938 : * indirectly part of (the latter if it's really the relation's ancestor that
939 : * is part of a publication) and fills up the found entry with the information
940 : * about which operations to publish and whether to use an ancestor's schema
941 : * when publishing.
942 : */
943 : static RelationSyncEntry *
944 116422 : get_rel_sync_entry(PGOutputData *data, Oid relid)
945 : {
946 : RelationSyncEntry *entry;
947 116422 : bool am_partition = get_rel_relispartition(relid);
948 116422 : char relkind = get_rel_relkind(relid);
949 : bool found;
950 : MemoryContext oldctx;
951 :
952 : Assert(RelationSyncCache != NULL);
953 :
954 : /* Find cached relation info, creating if not found */
955 116422 : entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
956 : (void *) &relid,
957 : HASH_ENTER, &found);
958 : Assert(entry != NULL);
959 :
960 : /* Not found means schema wasn't sent */
961 116422 : if (!found)
962 : {
963 : /* immediately make a new entry valid enough to satisfy callbacks */
964 118 : entry->schema_sent = false;
965 118 : entry->streamed_txns = NIL;
966 118 : entry->replicate_valid = false;
967 118 : entry->pubactions.pubinsert = entry->pubactions.pubupdate =
968 118 : entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
969 118 : entry->publish_as_relid = InvalidOid;
970 : }
971 :
972 : /* Validate the entry */
973 116422 : if (!entry->replicate_valid)
974 : {
975 128 : List *pubids = GetRelationPublications(relid);
976 : ListCell *lc;
977 128 : Oid publish_as_relid = relid;
978 :
979 : /* Reload publications if needed before use. */
980 128 : if (!publications_valid)
981 : {
982 56 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
983 56 : if (data->publications)
984 8 : list_free_deep(data->publications);
985 :
986 56 : data->publications = LoadPublications(data->publication_names);
987 56 : MemoryContextSwitchTo(oldctx);
988 56 : publications_valid = true;
989 : }
990 :
991 : /*
992 : * Build publication cache. We can't use one provided by relcache as
993 : * relcache considers all publications given relation is in, but here
994 : * we only need to consider ones that the subscriber requested.
995 : */
996 166 : foreach(lc, data->publications)
997 : {
998 136 : Publication *pub = lfirst(lc);
999 136 : bool publish = false;
1000 :
1001 136 : if (pub->alltables)
1002 : {
1003 60 : publish = true;
1004 60 : if (pub->pubviaroot && am_partition)
1005 0 : publish_as_relid = llast_oid(get_partition_ancestors(relid));
1006 : }
1007 :
1008 136 : if (!publish)
1009 : {
1010 76 : bool ancestor_published = false;
1011 :
1012 : /*
1013 : * For a partition, check if any of the ancestors are
1014 : * published. If so, note down the topmost ancestor that is
1015 : * published via this publication, which will be used as the
1016 : * relation via which to publish the partition's changes.
1017 : */
1018 76 : if (am_partition)
1019 : {
1020 6 : List *ancestors = get_partition_ancestors(relid);
1021 : ListCell *lc2;
1022 :
1023 : /*
1024 : * Find the "topmost" ancestor that is in this
1025 : * publication.
1026 : */
1027 12 : foreach(lc2, ancestors)
1028 : {
1029 6 : Oid ancestor = lfirst_oid(lc2);
1030 :
1031 6 : if (list_member_oid(GetRelationPublications(ancestor),
1032 : pub->oid))
1033 : {
1034 6 : ancestor_published = true;
1035 6 : if (pub->pubviaroot)
1036 0 : publish_as_relid = ancestor;
1037 : }
1038 : }
1039 : }
1040 :
1041 76 : if (list_member_oid(pubids, pub->oid) || ancestor_published)
1042 54 : publish = true;
1043 : }
1044 :
1045 : /*
1046 : * Don't publish changes for partitioned tables, because
1047 : * publishing those of its partitions suffices, unless partition
1048 : * changes won't be published due to pubviaroot being set.
1049 : */
1050 136 : if (publish &&
1051 2 : (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
1052 : {
1053 112 : entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
1054 112 : entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
1055 112 : entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
1056 112 : entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
1057 : }
1058 :
1059 136 : if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
1060 98 : entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
1061 98 : break;
1062 : }
1063 :
1064 128 : list_free(pubids);
1065 :
1066 128 : entry->publish_as_relid = publish_as_relid;
1067 128 : entry->replicate_valid = true;
1068 : }
1069 :
1070 116422 : return entry;
1071 : }
1072 :
1073 : /*
1074 : * Cleanup list of streamed transactions and update the schema_sent flag.
1075 : *
1076 : * When a streamed transaction commits or aborts, we need to remove the
1077 : * toplevel XID from the schema cache. If the transaction aborted, the
1078 : * subscriber will simply throw away the schema records we streamed, so
1079 : * we don't need to do anything else.
1080 : *
1081 : * If the transaction is committed, the subscriber will update the relation
1082 : * cache - so tweak the schema_sent flag accordingly.
1083 : */
1084 : static void
1085 46 : cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
1086 : {
1087 : HASH_SEQ_STATUS hash_seq;
1088 : RelationSyncEntry *entry;
1089 : ListCell *lc;
1090 :
1091 : Assert(RelationSyncCache != NULL);
1092 :
1093 46 : hash_seq_init(&hash_seq, RelationSyncCache);
1094 92 : while ((entry = hash_seq_search(&hash_seq)) != NULL)
1095 : {
1096 : /*
1097 : * We can set the schema_sent flag for an entry that has committed xid
1098 : * in the list as that ensures that the subscriber would have the
1099 : * corresponding schema and we don't need to send it unless there is
1100 : * any invalidation for that relation.
1101 : */
1102 48 : foreach(lc, entry->streamed_txns)
1103 : {
1104 22 : if (xid == (uint32) lfirst_int(lc))
1105 : {
1106 20 : if (is_commit)
1107 12 : entry->schema_sent = true;
1108 :
1109 20 : entry->streamed_txns =
1110 20 : foreach_delete_current(entry->streamed_txns, lc);
1111 20 : break;
1112 : }
1113 : }
1114 : }
1115 46 : }
1116 :
1117 : /*
1118 : * Relcache invalidation callback
1119 : */
1120 : static void
1121 1108 : rel_sync_cache_relation_cb(Datum arg, Oid relid)
1122 : {
1123 : RelationSyncEntry *entry;
1124 :
1125 : /*
1126 : * We can get here if the plugin was used in SQL interface as the
1127 : * RelSchemaSyncCache is destroyed when the decoding finishes, but there
1128 : * is no way to unregister the relcache invalidation callback.
1129 : */
1130 1108 : if (RelationSyncCache == NULL)
1131 0 : return;
1132 :
1133 : /*
1134 : * Nobody keeps pointers to entries in this hash table around outside
1135 : * logical decoding callback calls - but invalidation events can come in
1136 : * *during* a callback if we access the relcache in the callback. Because
1137 : * of that we must mark the cache entry as invalid but not remove it from
1138 : * the hash while it could still be referenced, then prune it at a later
1139 : * safe point.
1140 : *
1141 : * Getting invalidations for relations that aren't in the table is
1142 : * entirely normal, since there's no way to unregister for an invalidation
1143 : * event. So we don't care if it's found or not.
1144 : */
1145 1108 : entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
1146 : HASH_FIND, NULL);
1147 :
1148 : /*
1149 : * Reset schema sent status as the relation definition may have changed.
1150 : */
1151 1108 : if (entry != NULL)
1152 : {
1153 262 : entry->schema_sent = false;
1154 262 : list_free(entry->streamed_txns);
1155 262 : entry->streamed_txns = NIL;
1156 : }
1157 : }
1158 :
1159 : /*
1160 : * Publication relation map syscache invalidation callback
1161 : */
1162 : static void
1163 156 : rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
1164 : {
1165 : HASH_SEQ_STATUS status;
1166 : RelationSyncEntry *entry;
1167 :
1168 : /*
1169 : * We can get here if the plugin was used in SQL interface as the
1170 : * RelSchemaSyncCache is destroyed when the decoding finishes, but there
1171 : * is no way to unregister the relcache invalidation callback.
1172 : */
1173 156 : if (RelationSyncCache == NULL)
1174 0 : return;
1175 :
1176 : /*
1177 : * There is no way to find which entry in our cache the hash belongs to so
1178 : * mark the whole cache as invalid.
1179 : */
1180 156 : hash_seq_init(&status, RelationSyncCache);
1181 792 : while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
1182 : {
1183 636 : entry->replicate_valid = false;
1184 :
1185 : /*
1186 : * There might be some relations dropped from the publication so we
1187 : * don't need to publish the changes for them.
1188 : */
1189 636 : entry->pubactions.pubinsert = false;
1190 636 : entry->pubactions.pubupdate = false;
1191 636 : entry->pubactions.pubdelete = false;
1192 636 : entry->pubactions.pubtruncate = false;
1193 : }
1194 : }
|