Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * test_decoding.c
4 : * example logical decoding output plugin
5 : *
6 : * Copyright (c) 2012-2023, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * contrib/test_decoding/test_decoding.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "catalog/pg_type.h"
16 :
17 : #include "replication/logical.h"
18 : #include "replication/origin.h"
19 :
20 : #include "utils/builtins.h"
21 : #include "utils/lsyscache.h"
22 : #include "utils/memutils.h"
23 : #include "utils/rel.h"
24 :
25 192 : PG_MODULE_MAGIC;
26 :
27 : typedef struct
28 : {
29 : MemoryContext context;
30 : bool include_xids;
31 : bool include_timestamp;
32 : bool skip_empty_xacts;
33 : bool only_local;
34 : } TestDecodingData;
35 :
/*
 * Per-transaction state, tracking whether the transaction and/or the current
 * stream has written any changes.
 *
 * In streaming mode a transaction can be decoded as several streams, so
 * besides remembering whether the transaction as a whole has written
 * anything we also remember whether the current stream has.  That lets us
 * skip an empty stream when the user asked to skip empty transactions, even
 * though the transaction itself has already written some changes.
 */
typedef struct
{
	bool		xact_wrote_changes;		/* any change seen in this xact? */
	bool		stream_wrote_changes;	/* any change seen in this stream? */
} TestDecodingTxnData;
50 :
51 : static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
52 : bool is_init);
53 : static void pg_decode_shutdown(LogicalDecodingContext *ctx);
54 : static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
55 : ReorderBufferTXN *txn);
56 : static void pg_output_begin(LogicalDecodingContext *ctx,
57 : TestDecodingData *data,
58 : ReorderBufferTXN *txn,
59 : bool last_write);
60 : static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
61 : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
62 : static void pg_decode_change(LogicalDecodingContext *ctx,
63 : ReorderBufferTXN *txn, Relation relation,
64 : ReorderBufferChange *change);
65 : static void pg_decode_truncate(LogicalDecodingContext *ctx,
66 : ReorderBufferTXN *txn,
67 : int nrelations, Relation relations[],
68 : ReorderBufferChange *change);
69 : static bool pg_decode_filter(LogicalDecodingContext *ctx,
70 : RepOriginId origin_id);
71 : static void pg_decode_message(LogicalDecodingContext *ctx,
72 : ReorderBufferTXN *txn, XLogRecPtr lsn,
73 : bool transactional, const char *prefix,
74 : Size sz, const char *message);
75 : static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
76 : TransactionId xid,
77 : const char *gid);
78 : static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
79 : ReorderBufferTXN *txn);
80 : static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
81 : ReorderBufferTXN *txn,
82 : XLogRecPtr prepare_lsn);
83 : static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
84 : ReorderBufferTXN *txn,
85 : XLogRecPtr commit_lsn);
86 : static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
87 : ReorderBufferTXN *txn,
88 : XLogRecPtr prepare_end_lsn,
89 : TimestampTz prepare_time);
90 : static void pg_decode_stream_start(LogicalDecodingContext *ctx,
91 : ReorderBufferTXN *txn);
92 : static void pg_output_stream_start(LogicalDecodingContext *ctx,
93 : TestDecodingData *data,
94 : ReorderBufferTXN *txn,
95 : bool last_write);
96 : static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
97 : ReorderBufferTXN *txn);
98 : static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
99 : ReorderBufferTXN *txn,
100 : XLogRecPtr abort_lsn);
101 : static void pg_decode_stream_prepare(LogicalDecodingContext *ctx,
102 : ReorderBufferTXN *txn,
103 : XLogRecPtr prepare_lsn);
104 : static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
105 : ReorderBufferTXN *txn,
106 : XLogRecPtr commit_lsn);
107 : static void pg_decode_stream_change(LogicalDecodingContext *ctx,
108 : ReorderBufferTXN *txn,
109 : Relation relation,
110 : ReorderBufferChange *change);
111 : static void pg_decode_stream_message(LogicalDecodingContext *ctx,
112 : ReorderBufferTXN *txn, XLogRecPtr lsn,
113 : bool transactional, const char *prefix,
114 : Size sz, const char *message);
115 : static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
116 : ReorderBufferTXN *txn,
117 : int nrelations, Relation relations[],
118 : ReorderBufferChange *change);
119 :
/*
 * Module load hook.  This plugin needs no load-time setup; other plugins
 * can perform things here.
 */
void
_PG_init(void)
{
}
125 :
126 : /* specify output plugin callbacks */
127 : void
128 616 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
129 : {
130 616 : cb->startup_cb = pg_decode_startup;
131 616 : cb->begin_cb = pg_decode_begin_txn;
132 616 : cb->change_cb = pg_decode_change;
133 616 : cb->truncate_cb = pg_decode_truncate;
134 616 : cb->commit_cb = pg_decode_commit_txn;
135 616 : cb->filter_by_origin_cb = pg_decode_filter;
136 616 : cb->shutdown_cb = pg_decode_shutdown;
137 616 : cb->message_cb = pg_decode_message;
138 616 : cb->filter_prepare_cb = pg_decode_filter_prepare;
139 616 : cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
140 616 : cb->prepare_cb = pg_decode_prepare_txn;
141 616 : cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
142 616 : cb->rollback_prepared_cb = pg_decode_rollback_prepared_txn;
143 616 : cb->stream_start_cb = pg_decode_stream_start;
144 616 : cb->stream_stop_cb = pg_decode_stream_stop;
145 616 : cb->stream_abort_cb = pg_decode_stream_abort;
146 616 : cb->stream_prepare_cb = pg_decode_stream_prepare;
147 616 : cb->stream_commit_cb = pg_decode_stream_commit;
148 616 : cb->stream_change_cb = pg_decode_stream_change;
149 616 : cb->stream_message_cb = pg_decode_stream_message;
150 616 : cb->stream_truncate_cb = pg_decode_stream_truncate;
151 616 : }
152 :
153 :
154 : /* initialize this plugin */
155 : static void
156 616 : pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
157 : bool is_init)
158 : {
159 : ListCell *option;
160 : TestDecodingData *data;
161 616 : bool enable_streaming = false;
162 :
163 616 : data = palloc0(sizeof(TestDecodingData));
164 616 : data->context = AllocSetContextCreate(ctx->context,
165 : "text conversion context",
166 : ALLOCSET_DEFAULT_SIZES);
167 616 : data->include_xids = true;
168 616 : data->include_timestamp = false;
169 616 : data->skip_empty_xacts = false;
170 616 : data->only_local = false;
171 :
172 616 : ctx->output_plugin_private = data;
173 :
174 616 : opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
175 616 : opt->receive_rewrites = false;
176 :
177 1292 : foreach(option, ctx->output_plugin_options)
178 : {
179 682 : DefElem *elem = lfirst(option);
180 :
181 : Assert(elem->arg == NULL || IsA(elem->arg, String));
182 :
183 682 : if (strcmp(elem->defname, "include-xids") == 0)
184 : {
185 : /* if option does not provide a value, it means its value is true */
186 324 : if (elem->arg == NULL)
187 0 : data->include_xids = true;
188 324 : else if (!parse_bool(strVal(elem->arg), &data->include_xids))
189 4 : ereport(ERROR,
190 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
191 : errmsg("could not parse value \"%s\" for parameter \"%s\"",
192 : strVal(elem->arg), elem->defname)));
193 : }
194 358 : else if (strcmp(elem->defname, "include-timestamp") == 0)
195 : {
196 2 : if (elem->arg == NULL)
197 0 : data->include_timestamp = true;
198 2 : else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
199 0 : ereport(ERROR,
200 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
201 : errmsg("could not parse value \"%s\" for parameter \"%s\"",
202 : strVal(elem->arg), elem->defname)));
203 : }
204 356 : else if (strcmp(elem->defname, "force-binary") == 0)
205 : {
206 : bool force_binary;
207 :
208 12 : if (elem->arg == NULL)
209 0 : continue;
210 12 : else if (!parse_bool(strVal(elem->arg), &force_binary))
211 0 : ereport(ERROR,
212 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
213 : errmsg("could not parse value \"%s\" for parameter \"%s\"",
214 : strVal(elem->arg), elem->defname)));
215 :
216 12 : if (force_binary)
217 4 : opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
218 : }
219 344 : else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
220 : {
221 :
222 318 : if (elem->arg == NULL)
223 0 : data->skip_empty_xacts = true;
224 318 : else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
225 0 : ereport(ERROR,
226 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
227 : errmsg("could not parse value \"%s\" for parameter \"%s\"",
228 : strVal(elem->arg), elem->defname)));
229 : }
230 26 : else if (strcmp(elem->defname, "only-local") == 0)
231 : {
232 :
233 6 : if (elem->arg == NULL)
234 0 : data->only_local = true;
235 6 : else if (!parse_bool(strVal(elem->arg), &data->only_local))
236 0 : ereport(ERROR,
237 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
238 : errmsg("could not parse value \"%s\" for parameter \"%s\"",
239 : strVal(elem->arg), elem->defname)));
240 : }
241 20 : else if (strcmp(elem->defname, "include-rewrites") == 0)
242 : {
243 :
244 2 : if (elem->arg == NULL)
245 0 : continue;
246 2 : else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
247 0 : ereport(ERROR,
248 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
249 : errmsg("could not parse value \"%s\" for parameter \"%s\"",
250 : strVal(elem->arg), elem->defname)));
251 : }
252 18 : else if (strcmp(elem->defname, "stream-changes") == 0)
253 : {
254 16 : if (elem->arg == NULL)
255 0 : continue;
256 16 : else if (!parse_bool(strVal(elem->arg), &enable_streaming))
257 0 : ereport(ERROR,
258 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
259 : errmsg("could not parse value \"%s\" for parameter \"%s\"",
260 : strVal(elem->arg), elem->defname)));
261 : }
262 : else
263 : {
264 2 : ereport(ERROR,
265 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
266 : errmsg("option \"%s\" = \"%s\" is unknown",
267 : elem->defname,
268 : elem->arg ? strVal(elem->arg) : "(null)")));
269 : }
270 : }
271 :
272 610 : ctx->streaming &= enable_streaming;
273 610 : }
274 :
275 : /* cleanup this plugin's resources */
276 : static void
277 590 : pg_decode_shutdown(LogicalDecodingContext *ctx)
278 : {
279 590 : TestDecodingData *data = ctx->output_plugin_private;
280 :
281 : /* cleanup our own resources via memory context reset */
282 590 : MemoryContextDelete(data->context);
283 590 : }
284 :
285 : /* BEGIN callback */
286 : static void
287 844 : pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
288 : {
289 844 : TestDecodingData *data = ctx->output_plugin_private;
290 : TestDecodingTxnData *txndata =
291 844 : MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
292 :
293 844 : txndata->xact_wrote_changes = false;
294 844 : txn->output_plugin_private = txndata;
295 :
296 : /*
297 : * If asked to skip empty transactions, we'll emit BEGIN at the point
298 : * where the first operation is received for this transaction.
299 : */
300 844 : if (data->skip_empty_xacts)
301 772 : return;
302 :
303 72 : pg_output_begin(ctx, data, txn, true);
304 : }
305 :
306 : static void
307 516 : pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
308 : {
309 516 : OutputPluginPrepareWrite(ctx, last_write);
310 516 : if (data->include_xids)
311 64 : appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
312 : else
313 452 : appendStringInfoString(ctx->out, "BEGIN");
314 516 : OutputPluginWrite(ctx, last_write);
315 516 : }
316 :
317 : /* COMMIT callback */
318 : static void
319 844 : pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
320 : XLogRecPtr commit_lsn)
321 : {
322 844 : TestDecodingData *data = ctx->output_plugin_private;
323 844 : TestDecodingTxnData *txndata = txn->output_plugin_private;
324 844 : bool xact_wrote_changes = txndata->xact_wrote_changes;
325 :
326 844 : pfree(txndata);
327 844 : txn->output_plugin_private = NULL;
328 :
329 844 : if (data->skip_empty_xacts && !xact_wrote_changes)
330 342 : return;
331 :
332 502 : OutputPluginPrepareWrite(ctx, true);
333 502 : if (data->include_xids)
334 62 : appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
335 : else
336 440 : appendStringInfoString(ctx->out, "COMMIT");
337 :
338 502 : if (data->include_timestamp)
339 2 : appendStringInfo(ctx->out, " (at %s)",
340 : timestamptz_to_str(txn->xact_time.commit_time));
341 :
342 502 : OutputPluginWrite(ctx, true);
343 : }
344 :
345 : /* BEGIN PREPARE callback */
346 : static void
347 14 : pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
348 : {
349 14 : TestDecodingData *data = ctx->output_plugin_private;
350 : TestDecodingTxnData *txndata =
351 14 : MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
352 :
353 14 : txndata->xact_wrote_changes = false;
354 14 : txn->output_plugin_private = txndata;
355 :
356 : /*
357 : * If asked to skip empty transactions, we'll emit BEGIN at the point
358 : * where the first operation is received for this transaction.
359 : */
360 14 : if (data->skip_empty_xacts)
361 12 : return;
362 :
363 2 : pg_output_begin(ctx, data, txn, true);
364 : }
365 :
366 : /* PREPARE callback */
367 : static void
368 14 : pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
369 : XLogRecPtr prepare_lsn)
370 : {
371 14 : TestDecodingData *data = ctx->output_plugin_private;
372 14 : TestDecodingTxnData *txndata = txn->output_plugin_private;
373 :
374 : /*
375 : * If asked to skip empty transactions, we'll emit PREPARE at the point
376 : * where the first operation is received for this transaction.
377 : */
378 14 : if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
379 0 : return;
380 :
381 14 : OutputPluginPrepareWrite(ctx, true);
382 :
383 14 : appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
384 14 : quote_literal_cstr(txn->gid));
385 :
386 14 : if (data->include_xids)
387 2 : appendStringInfo(ctx->out, ", txid %u", txn->xid);
388 :
389 14 : if (data->include_timestamp)
390 0 : appendStringInfo(ctx->out, " (at %s)",
391 : timestamptz_to_str(txn->xact_time.prepare_time));
392 :
393 14 : OutputPluginWrite(ctx, true);
394 : }
395 :
396 : /* COMMIT PREPARED callback */
397 : static void
398 14 : pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
399 : XLogRecPtr commit_lsn)
400 : {
401 14 : TestDecodingData *data = ctx->output_plugin_private;
402 :
403 14 : OutputPluginPrepareWrite(ctx, true);
404 :
405 14 : appendStringInfo(ctx->out, "COMMIT PREPARED %s",
406 14 : quote_literal_cstr(txn->gid));
407 :
408 14 : if (data->include_xids)
409 2 : appendStringInfo(ctx->out, ", txid %u", txn->xid);
410 :
411 14 : if (data->include_timestamp)
412 0 : appendStringInfo(ctx->out, " (at %s)",
413 : timestamptz_to_str(txn->xact_time.commit_time));
414 :
415 14 : OutputPluginWrite(ctx, true);
416 14 : }
417 :
418 : /* ROLLBACK PREPARED callback */
419 : static void
420 2 : pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
421 : ReorderBufferTXN *txn,
422 : XLogRecPtr prepare_end_lsn,
423 : TimestampTz prepare_time)
424 : {
425 2 : TestDecodingData *data = ctx->output_plugin_private;
426 :
427 2 : OutputPluginPrepareWrite(ctx, true);
428 :
429 2 : appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
430 2 : quote_literal_cstr(txn->gid));
431 :
432 2 : if (data->include_xids)
433 0 : appendStringInfo(ctx->out, ", txid %u", txn->xid);
434 :
435 2 : if (data->include_timestamp)
436 0 : appendStringInfo(ctx->out, " (at %s)",
437 : timestamptz_to_str(txn->xact_time.commit_time));
438 :
439 2 : OutputPluginWrite(ctx, true);
440 2 : }
441 :
442 : /*
443 : * Filter out two-phase transactions.
444 : *
445 : * Each plugin can implement its own filtering logic. Here we demonstrate a
446 : * simple logic by checking the GID. If the GID contains the "_nodecode"
447 : * substring, then we filter it out.
448 : */
449 : static bool
450 234 : pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
451 : const char *gid)
452 : {
453 234 : if (strstr(gid, "_nodecode") != NULL)
454 16 : return true;
455 :
456 218 : return false;
457 : }
458 :
459 : static bool
460 2069800 : pg_decode_filter(LogicalDecodingContext *ctx,
461 : RepOriginId origin_id)
462 : {
463 2069800 : TestDecodingData *data = ctx->output_plugin_private;
464 :
465 2069800 : if (data->only_local && origin_id != InvalidRepOriginId)
466 18 : return true;
467 2069782 : return false;
468 : }
469 :
470 : /*
471 : * Print literal `outputstr' already represented as string of type `typid'
472 : * into stringbuf `s'.
473 : *
474 : * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
475 : * if standard_conforming_strings were enabled.
476 : */
477 : static void
478 351822 : print_literal(StringInfo s, Oid typid, char *outputstr)
479 : {
480 : const char *valptr;
481 :
482 351822 : switch (typid)
483 : {
484 120436 : case INT2OID:
485 : case INT4OID:
486 : case INT8OID:
487 : case OIDOID:
488 : case FLOAT4OID:
489 : case FLOAT8OID:
490 : case NUMERICOID:
491 : /* NB: We don't care about Inf, NaN et al. */
492 120436 : appendStringInfoString(s, outputstr);
493 120436 : break;
494 :
495 0 : case BITOID:
496 : case VARBITOID:
497 0 : appendStringInfo(s, "B'%s'", outputstr);
498 0 : break;
499 :
500 0 : case BOOLOID:
501 0 : if (strcmp(outputstr, "t") == 0)
502 0 : appendStringInfoString(s, "true");
503 : else
504 0 : appendStringInfoString(s, "false");
505 0 : break;
506 :
507 231386 : default:
508 231386 : appendStringInfoChar(s, '\'');
509 10854334 : for (valptr = outputstr; *valptr; valptr++)
510 : {
511 10622948 : char ch = *valptr;
512 :
513 10622948 : if (SQL_STR_DOUBLE(ch, false))
514 128 : appendStringInfoChar(s, ch);
515 10622948 : appendStringInfoChar(s, ch);
516 : }
517 231386 : appendStringInfoChar(s, '\'');
518 231386 : break;
519 : }
520 351822 : }
521 :
522 : /* print the tuple 'tuple' into the StringInfo s */
523 : static void
524 291050 : tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
525 : {
526 : int natt;
527 :
528 : /* print all columns individually */
529 694386 : for (natt = 0; natt < tupdesc->natts; natt++)
530 : {
531 : Form_pg_attribute attr; /* the attribute itself */
532 : Oid typid; /* type of current attribute */
533 : Oid typoutput; /* output function */
534 : bool typisvarlena;
535 : Datum origval; /* possibly toasted Datum */
536 : bool isnull; /* column is null? */
537 :
538 403336 : attr = TupleDescAttr(tupdesc, natt);
539 :
540 : /*
541 : * don't print dropped columns, we can't be sure everything is
542 : * available for them
543 : */
544 403336 : if (attr->attisdropped)
545 10260 : continue;
546 :
547 : /*
548 : * Don't print system columns, oid will already have been printed if
549 : * present.
550 : */
551 403192 : if (attr->attnum < 0)
552 0 : continue;
553 :
554 403192 : typid = attr->atttypid;
555 :
556 : /* get Datum from tuple */
557 403192 : origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
558 :
559 403192 : if (isnull && skip_nulls)
560 10116 : continue;
561 :
562 : /* print attribute name */
563 393076 : appendStringInfoChar(s, ' ');
564 393076 : appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
565 :
566 : /* print attribute type */
567 393076 : appendStringInfoChar(s, '[');
568 393076 : appendStringInfoString(s, format_type_be(typid));
569 393076 : appendStringInfoChar(s, ']');
570 :
571 : /* query output function */
572 393076 : getTypeOutputInfo(typid,
573 : &typoutput, &typisvarlena);
574 :
575 : /* print separator */
576 393076 : appendStringInfoChar(s, ':');
577 :
578 : /* print data */
579 393076 : if (isnull)
580 41230 : appendStringInfoString(s, "null");
581 351846 : else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
582 24 : appendStringInfoString(s, "unchanged-toast-datum");
583 351822 : else if (!typisvarlena)
584 120444 : print_literal(s, typid,
585 : OidOutputFunctionCall(typoutput, origval));
586 : else
587 : {
588 : Datum val; /* definitely detoasted Datum */
589 :
590 231378 : val = PointerGetDatum(PG_DETOAST_DATUM(origval));
591 231378 : print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
592 : }
593 : }
594 291050 : }
595 :
596 : /*
597 : * callback for individual changed tuples
598 : */
599 : static void
600 301028 : pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
601 : Relation relation, ReorderBufferChange *change)
602 : {
603 : TestDecodingData *data;
604 : TestDecodingTxnData *txndata;
605 : Form_pg_class class_form;
606 : TupleDesc tupdesc;
607 : MemoryContext old;
608 :
609 301028 : data = ctx->output_plugin_private;
610 301028 : txndata = txn->output_plugin_private;
611 :
612 : /* output BEGIN if we haven't yet */
613 301028 : if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
614 : {
615 424 : pg_output_begin(ctx, data, txn, false);
616 : }
617 301028 : txndata->xact_wrote_changes = true;
618 :
619 301028 : class_form = RelationGetForm(relation);
620 301028 : tupdesc = RelationGetDescr(relation);
621 :
622 : /* Avoid leaking memory by using and resetting our own context */
623 301028 : old = MemoryContextSwitchTo(data->context);
624 :
625 301028 : OutputPluginPrepareWrite(ctx, true);
626 :
627 301028 : appendStringInfoString(ctx->out, "table ");
628 301028 : appendStringInfoString(ctx->out,
629 301028 : quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
630 301028 : class_form->relrewrite ?
631 2 : get_rel_name(class_form->relrewrite) :
632 : NameStr(class_form->relname)));
633 301028 : appendStringInfoChar(ctx->out, ':');
634 :
635 301028 : switch (change->action)
636 : {
637 265898 : case REORDER_BUFFER_CHANGE_INSERT:
638 265898 : appendStringInfoString(ctx->out, " INSERT:");
639 265898 : if (change->data.tp.newtuple == NULL)
640 0 : appendStringInfoString(ctx->out, " (no-tuple-data)");
641 : else
642 265898 : tuple_to_stringinfo(ctx->out, tupdesc,
643 265898 : &change->data.tp.newtuple->tuple,
644 : false);
645 265898 : break;
646 15086 : case REORDER_BUFFER_CHANGE_UPDATE:
647 15086 : appendStringInfoString(ctx->out, " UPDATE:");
648 15086 : if (change->data.tp.oldtuple != NULL)
649 : {
650 36 : appendStringInfoString(ctx->out, " old-key:");
651 36 : tuple_to_stringinfo(ctx->out, tupdesc,
652 36 : &change->data.tp.oldtuple->tuple,
653 : true);
654 36 : appendStringInfoString(ctx->out, " new-tuple:");
655 : }
656 :
657 15086 : if (change->data.tp.newtuple == NULL)
658 0 : appendStringInfoString(ctx->out, " (no-tuple-data)");
659 : else
660 15086 : tuple_to_stringinfo(ctx->out, tupdesc,
661 15086 : &change->data.tp.newtuple->tuple,
662 : false);
663 15086 : break;
664 20044 : case REORDER_BUFFER_CHANGE_DELETE:
665 20044 : appendStringInfoString(ctx->out, " DELETE:");
666 :
667 : /* if there was no PK, we only know that a delete happened */
668 20044 : if (change->data.tp.oldtuple == NULL)
669 10014 : appendStringInfoString(ctx->out, " (no-tuple-data)");
670 : /* In DELETE, only the replica identity is present; display that */
671 : else
672 10030 : tuple_to_stringinfo(ctx->out, tupdesc,
673 10030 : &change->data.tp.oldtuple->tuple,
674 : true);
675 20044 : break;
676 301028 : default:
677 : Assert(false);
678 : }
679 :
680 301028 : MemoryContextSwitchTo(old);
681 301028 : MemoryContextReset(data->context);
682 :
683 301028 : OutputPluginWrite(ctx, true);
684 301028 : }
685 :
686 : static void
687 12 : pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
688 : int nrelations, Relation relations[], ReorderBufferChange *change)
689 : {
690 : TestDecodingData *data;
691 : TestDecodingTxnData *txndata;
692 : MemoryContext old;
693 : int i;
694 :
695 12 : data = ctx->output_plugin_private;
696 12 : txndata = txn->output_plugin_private;
697 :
698 : /* output BEGIN if we haven't yet */
699 12 : if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
700 : {
701 12 : pg_output_begin(ctx, data, txn, false);
702 : }
703 12 : txndata->xact_wrote_changes = true;
704 :
705 : /* Avoid leaking memory by using and resetting our own context */
706 12 : old = MemoryContextSwitchTo(data->context);
707 :
708 12 : OutputPluginPrepareWrite(ctx, true);
709 :
710 12 : appendStringInfoString(ctx->out, "table ");
711 :
712 26 : for (i = 0; i < nrelations; i++)
713 : {
714 14 : if (i > 0)
715 2 : appendStringInfoString(ctx->out, ", ");
716 :
717 14 : appendStringInfoString(ctx->out,
718 14 : quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
719 14 : NameStr(relations[i]->rd_rel->relname)));
720 : }
721 :
722 12 : appendStringInfoString(ctx->out, ": TRUNCATE:");
723 :
724 12 : if (change->data.truncate.restart_seqs
725 10 : || change->data.truncate.cascade)
726 : {
727 2 : if (change->data.truncate.restart_seqs)
728 2 : appendStringInfoString(ctx->out, " restart_seqs");
729 2 : if (change->data.truncate.cascade)
730 2 : appendStringInfoString(ctx->out, " cascade");
731 : }
732 : else
733 10 : appendStringInfoString(ctx->out, " (no-flags)");
734 :
735 12 : MemoryContextSwitchTo(old);
736 12 : MemoryContextReset(data->context);
737 :
738 12 : OutputPluginWrite(ctx, true);
739 12 : }
740 :
741 : static void
742 16 : pg_decode_message(LogicalDecodingContext *ctx,
743 : ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
744 : const char *prefix, Size sz, const char *message)
745 : {
746 16 : TestDecodingData *data = ctx->output_plugin_private;
747 : TestDecodingTxnData *txndata;
748 :
749 16 : txndata = transactional ? txn->output_plugin_private : NULL;
750 :
751 : /* output BEGIN if we haven't yet for transactional messages */
752 16 : if (transactional && data->skip_empty_xacts && !txndata->xact_wrote_changes)
753 6 : pg_output_begin(ctx, data, txn, false);
754 :
755 16 : if (transactional)
756 10 : txndata->xact_wrote_changes = true;
757 :
758 16 : OutputPluginPrepareWrite(ctx, true);
759 16 : appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
760 : transactional, prefix, sz);
761 16 : appendBinaryStringInfo(ctx->out, message, sz);
762 16 : OutputPluginWrite(ctx, true);
763 16 : }
764 :
765 : static void
766 22 : pg_decode_stream_start(LogicalDecodingContext *ctx,
767 : ReorderBufferTXN *txn)
768 : {
769 22 : TestDecodingData *data = ctx->output_plugin_private;
770 22 : TestDecodingTxnData *txndata = txn->output_plugin_private;
771 :
772 : /*
773 : * Allocate the txn plugin data for the first stream in the transaction.
774 : */
775 22 : if (txndata == NULL)
776 : {
777 : txndata =
778 14 : MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
779 14 : txndata->xact_wrote_changes = false;
780 14 : txn->output_plugin_private = txndata;
781 : }
782 :
783 22 : txndata->stream_wrote_changes = false;
784 22 : if (data->skip_empty_xacts)
785 22 : return;
786 0 : pg_output_stream_start(ctx, data, txn, true);
787 : }
788 :
789 : static void
790 18 : pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
791 : {
792 18 : OutputPluginPrepareWrite(ctx, last_write);
793 18 : if (data->include_xids)
794 0 : appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
795 : else
796 18 : appendStringInfoString(ctx->out, "opening a streamed block for transaction");
797 18 : OutputPluginWrite(ctx, last_write);
798 18 : }
799 :
800 : static void
801 22 : pg_decode_stream_stop(LogicalDecodingContext *ctx,
802 : ReorderBufferTXN *txn)
803 : {
804 22 : TestDecodingData *data = ctx->output_plugin_private;
805 22 : TestDecodingTxnData *txndata = txn->output_plugin_private;
806 :
807 22 : if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
808 4 : return;
809 :
810 18 : OutputPluginPrepareWrite(ctx, true);
811 18 : if (data->include_xids)
812 0 : appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
813 : else
814 18 : appendStringInfoString(ctx->out, "closing a streamed block for transaction");
815 18 : OutputPluginWrite(ctx, true);
816 : }
817 :
818 : static void
819 6 : pg_decode_stream_abort(LogicalDecodingContext *ctx,
820 : ReorderBufferTXN *txn,
821 : XLogRecPtr abort_lsn)
822 : {
823 6 : TestDecodingData *data = ctx->output_plugin_private;
824 :
825 : /*
826 : * stream abort can be sent for an individual subtransaction but we
827 : * maintain the output_plugin_private only under the toptxn so if this is
828 : * not the toptxn then fetch the toptxn.
829 : */
830 6 : ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
831 6 : TestDecodingTxnData *txndata = toptxn->output_plugin_private;
832 6 : bool xact_wrote_changes = txndata->xact_wrote_changes;
833 :
834 6 : if (rbtxn_is_toptxn(txn))
835 : {
836 : Assert(txn->output_plugin_private != NULL);
837 0 : pfree(txndata);
838 0 : txn->output_plugin_private = NULL;
839 : }
840 :
841 6 : if (data->skip_empty_xacts && !xact_wrote_changes)
842 0 : return;
843 :
844 6 : OutputPluginPrepareWrite(ctx, true);
845 6 : if (data->include_xids)
846 0 : appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
847 : else
848 6 : appendStringInfoString(ctx->out, "aborting streamed (sub)transaction");
849 6 : OutputPluginWrite(ctx, true);
850 : }
851 :
852 : static void
853 2 : pg_decode_stream_prepare(LogicalDecodingContext *ctx,
854 : ReorderBufferTXN *txn,
855 : XLogRecPtr prepare_lsn)
856 : {
857 2 : TestDecodingData *data = ctx->output_plugin_private;
858 2 : TestDecodingTxnData *txndata = txn->output_plugin_private;
859 :
860 2 : if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
861 0 : return;
862 :
863 2 : OutputPluginPrepareWrite(ctx, true);
864 :
865 2 : if (data->include_xids)
866 0 : appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u",
867 0 : quote_literal_cstr(txn->gid), txn->xid);
868 : else
869 2 : appendStringInfo(ctx->out, "preparing streamed transaction %s",
870 2 : quote_literal_cstr(txn->gid));
871 :
872 2 : if (data->include_timestamp)
873 0 : appendStringInfo(ctx->out, " (at %s)",
874 : timestamptz_to_str(txn->xact_time.prepare_time));
875 :
876 2 : OutputPluginWrite(ctx, true);
877 : }
878 :
879 : static void
880 8 : pg_decode_stream_commit(LogicalDecodingContext *ctx,
881 : ReorderBufferTXN *txn,
882 : XLogRecPtr commit_lsn)
883 : {
884 8 : TestDecodingData *data = ctx->output_plugin_private;
885 8 : TestDecodingTxnData *txndata = txn->output_plugin_private;
886 8 : bool xact_wrote_changes = txndata->xact_wrote_changes;
887 :
888 8 : pfree(txndata);
889 8 : txn->output_plugin_private = NULL;
890 :
891 8 : if (data->skip_empty_xacts && !xact_wrote_changes)
892 0 : return;
893 :
894 8 : OutputPluginPrepareWrite(ctx, true);
895 :
896 8 : if (data->include_xids)
897 0 : appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
898 : else
899 8 : appendStringInfoString(ctx->out, "committing streamed transaction");
900 :
901 8 : if (data->include_timestamp)
902 0 : appendStringInfo(ctx->out, " (at %s)",
903 : timestamptz_to_str(txn->xact_time.commit_time));
904 :
905 8 : OutputPluginWrite(ctx, true);
906 : }
907 :
908 : /*
909 : * In streaming mode, we don't display the changes as the transaction can abort
910 : * at a later point in time. We don't want users to see the changes until the
911 : * transaction is committed.
912 : */
913 : static void
914 126 : pg_decode_stream_change(LogicalDecodingContext *ctx,
915 : ReorderBufferTXN *txn,
916 : Relation relation,
917 : ReorderBufferChange *change)
918 : {
919 126 : TestDecodingData *data = ctx->output_plugin_private;
920 126 : TestDecodingTxnData *txndata = txn->output_plugin_private;
921 :
922 : /* output stream start if we haven't yet */
923 126 : if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
924 : {
925 12 : pg_output_stream_start(ctx, data, txn, false);
926 : }
927 126 : txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
928 :
929 126 : OutputPluginPrepareWrite(ctx, true);
930 126 : if (data->include_xids)
931 0 : appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
932 : else
933 126 : appendStringInfoString(ctx->out, "streaming change for transaction");
934 126 : OutputPluginWrite(ctx, true);
935 126 : }
936 :
937 : /*
938 : * In streaming mode, we don't display the contents for transactional messages
939 : * as the transaction can abort at a later point in time. We don't want users to
940 : * see the message contents until the transaction is committed.
941 : */
942 : static void
943 6 : pg_decode_stream_message(LogicalDecodingContext *ctx,
944 : ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
945 : const char *prefix, Size sz, const char *message)
946 : {
947 : /* Output stream start if we haven't yet for transactional messages. */
948 6 : if (transactional)
949 : {
950 6 : TestDecodingData *data = ctx->output_plugin_private;
951 6 : TestDecodingTxnData *txndata = txn->output_plugin_private;
952 :
953 6 : if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
954 : {
955 6 : pg_output_stream_start(ctx, data, txn, false);
956 : }
957 6 : txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
958 : }
959 :
960 6 : OutputPluginPrepareWrite(ctx, true);
961 :
962 6 : if (transactional)
963 : {
964 6 : appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
965 : transactional, prefix, sz);
966 : }
967 : else
968 : {
969 0 : appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
970 : transactional, prefix, sz);
971 0 : appendBinaryStringInfo(ctx->out, message, sz);
972 : }
973 :
974 6 : OutputPluginWrite(ctx, true);
975 6 : }
976 :
977 : /*
978 : * In streaming mode, we don't display the detailed information of Truncate.
979 : * See pg_decode_stream_change.
980 : */
981 : static void
982 0 : pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
983 : int nrelations, Relation relations[],
984 : ReorderBufferChange *change)
985 : {
986 0 : TestDecodingData *data = ctx->output_plugin_private;
987 0 : TestDecodingTxnData *txndata = txn->output_plugin_private;
988 :
989 0 : if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
990 : {
991 0 : pg_output_stream_start(ctx, data, txn, false);
992 : }
993 0 : txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
994 :
995 0 : OutputPluginPrepareWrite(ctx, true);
996 0 : if (data->include_xids)
997 0 : appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
998 : else
999 0 : appendStringInfoString(ctx->out, "streaming truncate for transaction");
1000 0 : OutputPluginWrite(ctx, true);
1001 0 : }
|