Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * test_decoding.c
4 : * example logical decoding output plugin
5 : *
6 : * Copyright (c) 2012-2025, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * contrib/test_decoding/test_decoding.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "catalog/pg_type.h"
16 :
17 : #include "replication/logical.h"
18 : #include "replication/origin.h"
19 :
20 : #include "utils/builtins.h"
21 : #include "utils/lsyscache.h"
22 : #include "utils/memutils.h"
23 : #include "utils/rel.h"
24 :
/* Module magic block, declaring this module's name and version. */
PG_MODULE_MAGIC_EXT(
					.name = "test_decoding",
					.version = PG_VERSION
);
29 :
/*
 * Per-decoding-session plugin state, stored in
 * LogicalDecodingContext->output_plugin_private by pg_decode_startup().
 */
typedef struct
{
	MemoryContext context;		/* scratch context, reset after each change */
	bool		include_xids;	/* add the transaction id to the output */
	bool		include_timestamp;	/* add " (at ...)" to commit/prepare lines */
	bool		skip_empty_xacts;	/* emit nothing for xacts without changes */
	bool		only_local;		/* filter out changes with a valid origin id */
} TestDecodingData;
38 :
/*
 * Per-transaction variables tracking whether the transaction and/or the
 * current stream have written any changes.  In streaming mode a transaction
 * can be decoded in several streams, so besides tracking whether the
 * transaction as a whole has written any changes, we also need to track
 * whether the current stream has.  This is required so that, if the user has
 * requested to skip empty transactions, we can also skip empty streams even
 * though the transaction has written some changes.
 */
typedef struct
{
	bool		xact_wrote_changes; /* any change seen in this transaction */
	bool		stream_wrote_changes;	/* any change seen in current stream */
} TestDecodingTxnData;
53 :
54 : static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
55 : bool is_init);
56 : static void pg_decode_shutdown(LogicalDecodingContext *ctx);
57 : static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
58 : ReorderBufferTXN *txn);
59 : static void pg_output_begin(LogicalDecodingContext *ctx,
60 : TestDecodingData *data,
61 : ReorderBufferTXN *txn,
62 : bool last_write);
63 : static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
64 : ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
65 : static void pg_decode_change(LogicalDecodingContext *ctx,
66 : ReorderBufferTXN *txn, Relation relation,
67 : ReorderBufferChange *change);
68 : static void pg_decode_truncate(LogicalDecodingContext *ctx,
69 : ReorderBufferTXN *txn,
70 : int nrelations, Relation relations[],
71 : ReorderBufferChange *change);
72 : static bool pg_decode_filter(LogicalDecodingContext *ctx,
73 : RepOriginId origin_id);
74 : static void pg_decode_message(LogicalDecodingContext *ctx,
75 : ReorderBufferTXN *txn, XLogRecPtr lsn,
76 : bool transactional, const char *prefix,
77 : Size sz, const char *message);
78 : static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
79 : TransactionId xid,
80 : const char *gid);
81 : static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
82 : ReorderBufferTXN *txn);
83 : static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
84 : ReorderBufferTXN *txn,
85 : XLogRecPtr prepare_lsn);
86 : static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
87 : ReorderBufferTXN *txn,
88 : XLogRecPtr commit_lsn);
89 : static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
90 : ReorderBufferTXN *txn,
91 : XLogRecPtr prepare_end_lsn,
92 : TimestampTz prepare_time);
93 : static void pg_decode_stream_start(LogicalDecodingContext *ctx,
94 : ReorderBufferTXN *txn);
95 : static void pg_output_stream_start(LogicalDecodingContext *ctx,
96 : TestDecodingData *data,
97 : ReorderBufferTXN *txn,
98 : bool last_write);
99 : static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
100 : ReorderBufferTXN *txn);
101 : static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
102 : ReorderBufferTXN *txn,
103 : XLogRecPtr abort_lsn);
104 : static void pg_decode_stream_prepare(LogicalDecodingContext *ctx,
105 : ReorderBufferTXN *txn,
106 : XLogRecPtr prepare_lsn);
107 : static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
108 : ReorderBufferTXN *txn,
109 : XLogRecPtr commit_lsn);
110 : static void pg_decode_stream_change(LogicalDecodingContext *ctx,
111 : ReorderBufferTXN *txn,
112 : Relation relation,
113 : ReorderBufferChange *change);
114 : static void pg_decode_stream_message(LogicalDecodingContext *ctx,
115 : ReorderBufferTXN *txn, XLogRecPtr lsn,
116 : bool transactional, const char *prefix,
117 : Size sz, const char *message);
118 : static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
119 : ReorderBufferTXN *txn,
120 : int nrelations, Relation relations[],
121 : ReorderBufferChange *change);
122 :
/*
 * Module load hook.  This plugin has nothing to set up here, so the body is
 * intentionally empty.
 */
void
_PG_init(void)
{
	/* other plugins can perform things here */
}
128 :
129 : /* specify output plugin callbacks */
130 : void
131 676 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
132 : {
133 676 : cb->startup_cb = pg_decode_startup;
134 676 : cb->begin_cb = pg_decode_begin_txn;
135 676 : cb->change_cb = pg_decode_change;
136 676 : cb->truncate_cb = pg_decode_truncate;
137 676 : cb->commit_cb = pg_decode_commit_txn;
138 676 : cb->filter_by_origin_cb = pg_decode_filter;
139 676 : cb->shutdown_cb = pg_decode_shutdown;
140 676 : cb->message_cb = pg_decode_message;
141 676 : cb->filter_prepare_cb = pg_decode_filter_prepare;
142 676 : cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
143 676 : cb->prepare_cb = pg_decode_prepare_txn;
144 676 : cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
145 676 : cb->rollback_prepared_cb = pg_decode_rollback_prepared_txn;
146 676 : cb->stream_start_cb = pg_decode_stream_start;
147 676 : cb->stream_stop_cb = pg_decode_stream_stop;
148 676 : cb->stream_abort_cb = pg_decode_stream_abort;
149 676 : cb->stream_prepare_cb = pg_decode_stream_prepare;
150 676 : cb->stream_commit_cb = pg_decode_stream_commit;
151 676 : cb->stream_change_cb = pg_decode_stream_change;
152 676 : cb->stream_message_cb = pg_decode_stream_message;
153 676 : cb->stream_truncate_cb = pg_decode_stream_truncate;
154 676 : }
155 :
156 :
157 : /* initialize this plugin */
158 : static void
159 676 : pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
160 : bool is_init)
161 : {
162 : ListCell *option;
163 : TestDecodingData *data;
164 676 : bool enable_streaming = false;
165 :
166 676 : data = palloc0(sizeof(TestDecodingData));
167 676 : data->context = AllocSetContextCreate(ctx->context,
168 : "text conversion context",
169 : ALLOCSET_DEFAULT_SIZES);
170 676 : data->include_xids = true;
171 676 : data->include_timestamp = false;
172 676 : data->skip_empty_xacts = false;
173 676 : data->only_local = false;
174 :
175 676 : ctx->output_plugin_private = data;
176 :
177 676 : opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
178 676 : opt->receive_rewrites = false;
179 :
180 1408 : foreach(option, ctx->output_plugin_options)
181 : {
182 738 : DefElem *elem = lfirst(option);
183 :
184 : Assert(elem->arg == NULL || IsA(elem->arg, String));
185 :
186 738 : if (strcmp(elem->defname, "include-xids") == 0)
187 : {
188 : /* if option does not provide a value, it means its value is true */
189 348 : if (elem->arg == NULL)
190 0 : data->include_xids = true;
191 348 : else if (!parse_bool(strVal(elem->arg), &data->include_xids))
192 4 : ereport(ERROR,
193 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
194 : errmsg("could not parse value \"%s\" for parameter \"%s\"",
195 : strVal(elem->arg), elem->defname)));
196 : }
197 390 : else if (strcmp(elem->defname, "include-timestamp") == 0)
198 : {
199 2 : if (elem->arg == NULL)
200 0 : data->include_timestamp = true;
201 2 : else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
202 0 : ereport(ERROR,
203 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
204 : errmsg("could not parse value \"%s\" for parameter \"%s\"",
205 : strVal(elem->arg), elem->defname)));
206 : }
207 388 : else if (strcmp(elem->defname, "force-binary") == 0)
208 : {
209 : bool force_binary;
210 :
211 12 : if (elem->arg == NULL)
212 0 : continue;
213 12 : else if (!parse_bool(strVal(elem->arg), &force_binary))
214 0 : ereport(ERROR,
215 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
216 : errmsg("could not parse value \"%s\" for parameter \"%s\"",
217 : strVal(elem->arg), elem->defname)));
218 :
219 12 : if (force_binary)
220 4 : opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
221 : }
222 376 : else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
223 : {
224 :
225 342 : if (elem->arg == NULL)
226 0 : data->skip_empty_xacts = true;
227 342 : else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
228 0 : ereport(ERROR,
229 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
230 : errmsg("could not parse value \"%s\" for parameter \"%s\"",
231 : strVal(elem->arg), elem->defname)));
232 : }
233 34 : else if (strcmp(elem->defname, "only-local") == 0)
234 : {
235 :
236 6 : if (elem->arg == NULL)
237 0 : data->only_local = true;
238 6 : else if (!parse_bool(strVal(elem->arg), &data->only_local))
239 0 : ereport(ERROR,
240 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
241 : errmsg("could not parse value \"%s\" for parameter \"%s\"",
242 : strVal(elem->arg), elem->defname)));
243 : }
244 28 : else if (strcmp(elem->defname, "include-rewrites") == 0)
245 : {
246 :
247 2 : if (elem->arg == NULL)
248 0 : continue;
249 2 : else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
250 0 : ereport(ERROR,
251 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
252 : errmsg("could not parse value \"%s\" for parameter \"%s\"",
253 : strVal(elem->arg), elem->defname)));
254 : }
255 26 : else if (strcmp(elem->defname, "stream-changes") == 0)
256 : {
257 24 : if (elem->arg == NULL)
258 0 : continue;
259 24 : else if (!parse_bool(strVal(elem->arg), &enable_streaming))
260 0 : ereport(ERROR,
261 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
262 : errmsg("could not parse value \"%s\" for parameter \"%s\"",
263 : strVal(elem->arg), elem->defname)));
264 : }
265 : else
266 : {
267 2 : ereport(ERROR,
268 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
269 : errmsg("option \"%s\" = \"%s\" is unknown",
270 : elem->defname,
271 : elem->arg ? strVal(elem->arg) : "(null)")));
272 : }
273 : }
274 :
275 670 : ctx->streaming &= enable_streaming;
276 670 : }
277 :
/*
 * Shutdown callback: release the resources this plugin created in
 * pg_decode_startup().
 */
static void
pg_decode_shutdown(LogicalDecodingContext *ctx)
{
	TestDecodingData *data = ctx->output_plugin_private;

	/* cleanup our own resources by deleting our private memory context */
	MemoryContextDelete(data->context);
}
287 :
288 : /* BEGIN callback */
289 : static void
290 876 : pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
291 : {
292 876 : TestDecodingData *data = ctx->output_plugin_private;
293 : TestDecodingTxnData *txndata =
294 876 : MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
295 :
296 876 : txndata->xact_wrote_changes = false;
297 876 : txn->output_plugin_private = txndata;
298 :
299 : /*
300 : * If asked to skip empty transactions, we'll emit BEGIN at the point
301 : * where the first operation is received for this transaction.
302 : */
303 876 : if (data->skip_empty_xacts)
304 788 : return;
305 :
306 88 : pg_output_begin(ctx, data, txn, true);
307 : }
308 :
309 : static void
310 542 : pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
311 : {
312 542 : OutputPluginPrepareWrite(ctx, last_write);
313 542 : if (data->include_xids)
314 80 : appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
315 : else
316 462 : appendStringInfoString(ctx->out, "BEGIN");
317 542 : OutputPluginWrite(ctx, last_write);
318 542 : }
319 :
320 : /* COMMIT callback */
321 : static void
322 876 : pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
323 : XLogRecPtr commit_lsn)
324 : {
325 876 : TestDecodingData *data = ctx->output_plugin_private;
326 876 : TestDecodingTxnData *txndata = txn->output_plugin_private;
327 876 : bool xact_wrote_changes = txndata->xact_wrote_changes;
328 :
329 876 : pfree(txndata);
330 876 : txn->output_plugin_private = NULL;
331 :
332 876 : if (data->skip_empty_xacts && !xact_wrote_changes)
333 350 : return;
334 :
335 526 : OutputPluginPrepareWrite(ctx, true);
336 526 : if (data->include_xids)
337 78 : appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
338 : else
339 448 : appendStringInfoString(ctx->out, "COMMIT");
340 :
341 526 : if (data->include_timestamp)
342 2 : appendStringInfo(ctx->out, " (at %s)",
343 : timestamptz_to_str(txn->xact_time.commit_time));
344 :
345 526 : OutputPluginWrite(ctx, true);
346 : }
347 :
348 : /* BEGIN PREPARE callback */
349 : static void
350 18 : pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
351 : {
352 18 : TestDecodingData *data = ctx->output_plugin_private;
353 : TestDecodingTxnData *txndata =
354 18 : MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
355 :
356 18 : txndata->xact_wrote_changes = false;
357 18 : txn->output_plugin_private = txndata;
358 :
359 : /*
360 : * If asked to skip empty transactions, we'll emit BEGIN at the point
361 : * where the first operation is received for this transaction.
362 : */
363 18 : if (data->skip_empty_xacts)
364 16 : return;
365 :
366 2 : pg_output_begin(ctx, data, txn, true);
367 : }
368 :
369 : /* PREPARE callback */
370 : static void
371 18 : pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
372 : XLogRecPtr prepare_lsn)
373 : {
374 18 : TestDecodingData *data = ctx->output_plugin_private;
375 18 : TestDecodingTxnData *txndata = txn->output_plugin_private;
376 :
377 : /*
378 : * If asked to skip empty transactions, we'll emit PREPARE at the point
379 : * where the first operation is received for this transaction.
380 : */
381 18 : if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
382 2 : return;
383 :
384 16 : OutputPluginPrepareWrite(ctx, true);
385 :
386 16 : appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
387 16 : quote_literal_cstr(txn->gid));
388 :
389 16 : if (data->include_xids)
390 2 : appendStringInfo(ctx->out, ", txid %u", txn->xid);
391 :
392 16 : if (data->include_timestamp)
393 0 : appendStringInfo(ctx->out, " (at %s)",
394 : timestamptz_to_str(txn->xact_time.prepare_time));
395 :
396 16 : OutputPluginWrite(ctx, true);
397 : }
398 :
399 : /* COMMIT PREPARED callback */
400 : static void
401 16 : pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
402 : XLogRecPtr commit_lsn)
403 : {
404 16 : TestDecodingData *data = ctx->output_plugin_private;
405 :
406 16 : OutputPluginPrepareWrite(ctx, true);
407 :
408 16 : appendStringInfo(ctx->out, "COMMIT PREPARED %s",
409 16 : quote_literal_cstr(txn->gid));
410 :
411 16 : if (data->include_xids)
412 2 : appendStringInfo(ctx->out, ", txid %u", txn->xid);
413 :
414 16 : if (data->include_timestamp)
415 0 : appendStringInfo(ctx->out, " (at %s)",
416 : timestamptz_to_str(txn->xact_time.commit_time));
417 :
418 16 : OutputPluginWrite(ctx, true);
419 16 : }
420 :
421 : /* ROLLBACK PREPARED callback */
422 : static void
423 4 : pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
424 : ReorderBufferTXN *txn,
425 : XLogRecPtr prepare_end_lsn,
426 : TimestampTz prepare_time)
427 : {
428 4 : TestDecodingData *data = ctx->output_plugin_private;
429 :
430 4 : OutputPluginPrepareWrite(ctx, true);
431 :
432 4 : appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
433 4 : quote_literal_cstr(txn->gid));
434 :
435 4 : if (data->include_xids)
436 0 : appendStringInfo(ctx->out, ", txid %u", txn->xid);
437 :
438 4 : if (data->include_timestamp)
439 0 : appendStringInfo(ctx->out, " (at %s)",
440 : timestamptz_to_str(txn->xact_time.commit_time));
441 :
442 4 : OutputPluginWrite(ctx, true);
443 4 : }
444 :
445 : /*
446 : * Filter out two-phase transactions.
447 : *
448 : * Each plugin can implement its own filtering logic. Here we demonstrate a
449 : * simple logic by checking the GID. If the GID contains the "_nodecode"
450 : * substring, then we filter it out.
451 : */
452 : static bool
453 296 : pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
454 : const char *gid)
455 : {
456 296 : if (strstr(gid, "_nodecode") != NULL)
457 24 : return true;
458 :
459 272 : return false;
460 : }
461 :
462 : static bool
463 2400996 : pg_decode_filter(LogicalDecodingContext *ctx,
464 : RepOriginId origin_id)
465 : {
466 2400996 : TestDecodingData *data = ctx->output_plugin_private;
467 :
468 2400996 : if (data->only_local && origin_id != InvalidRepOriginId)
469 18 : return true;
470 2400978 : return false;
471 : }
472 :
473 : /*
474 : * Print literal `outputstr' already represented as string of type `typid'
475 : * into stringbuf `s'.
476 : *
477 : * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
478 : * if standard_conforming_strings were enabled.
479 : */
480 : static void
481 351916 : print_literal(StringInfo s, Oid typid, char *outputstr)
482 : {
483 : const char *valptr;
484 :
485 351916 : switch (typid)
486 : {
487 120526 : case INT2OID:
488 : case INT4OID:
489 : case INT8OID:
490 : case OIDOID:
491 : case FLOAT4OID:
492 : case FLOAT8OID:
493 : case NUMERICOID:
494 : /* NB: We don't care about Inf, NaN et al. */
495 120526 : appendStringInfoString(s, outputstr);
496 120526 : break;
497 :
498 0 : case BITOID:
499 : case VARBITOID:
500 0 : appendStringInfo(s, "B'%s'", outputstr);
501 0 : break;
502 :
503 0 : case BOOLOID:
504 0 : if (strcmp(outputstr, "t") == 0)
505 0 : appendStringInfoString(s, "true");
506 : else
507 0 : appendStringInfoString(s, "false");
508 0 : break;
509 :
510 231390 : default:
511 231390 : appendStringInfoChar(s, '\'');
512 10854352 : for (valptr = outputstr; *valptr; valptr++)
513 : {
514 10622962 : char ch = *valptr;
515 :
516 10622962 : if (SQL_STR_DOUBLE(ch, false))
517 128 : appendStringInfoChar(s, ch);
518 10622962 : appendStringInfoChar(s, ch);
519 : }
520 231390 : appendStringInfoChar(s, '\'');
521 231390 : break;
522 : }
523 351916 : }
524 :
525 : /* print the tuple 'tuple' into the StringInfo s */
526 : static void
527 291142 : tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
528 : {
529 : int natt;
530 :
531 : /* print all columns individually */
532 694572 : for (natt = 0; natt < tupdesc->natts; natt++)
533 : {
534 : Form_pg_attribute attr; /* the attribute itself */
535 : Oid typid; /* type of current attribute */
536 : Oid typoutput; /* output function */
537 : bool typisvarlena;
538 : Datum origval; /* possibly toasted Datum */
539 : bool isnull; /* column is null? */
540 :
541 403430 : attr = TupleDescAttr(tupdesc, natt);
542 :
543 : /*
544 : * don't print dropped columns, we can't be sure everything is
545 : * available for them
546 : */
547 403430 : if (attr->attisdropped)
548 10260 : continue;
549 :
550 : /*
551 : * Don't print system columns, oid will already have been printed if
552 : * present.
553 : */
554 403286 : if (attr->attnum < 0)
555 0 : continue;
556 :
557 403286 : typid = attr->atttypid;
558 :
559 : /* get Datum from tuple */
560 403286 : origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
561 :
562 403286 : if (isnull && skip_nulls)
563 10116 : continue;
564 :
565 : /* print attribute name */
566 393170 : appendStringInfoChar(s, ' ');
567 393170 : appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
568 :
569 : /* print attribute type */
570 393170 : appendStringInfoChar(s, '[');
571 393170 : appendStringInfoString(s, format_type_be(typid));
572 393170 : appendStringInfoChar(s, ']');
573 :
574 : /* query output function */
575 393170 : getTypeOutputInfo(typid,
576 : &typoutput, &typisvarlena);
577 :
578 : /* print separator */
579 393170 : appendStringInfoChar(s, ':');
580 :
581 : /* print data */
582 393170 : if (isnull)
583 41230 : appendStringInfoString(s, "null");
584 351940 : else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval))
585 24 : appendStringInfoString(s, "unchanged-toast-datum");
586 351916 : else if (!typisvarlena)
587 120534 : print_literal(s, typid,
588 : OidOutputFunctionCall(typoutput, origval));
589 : else
590 : {
591 : Datum val; /* definitely detoasted Datum */
592 :
593 231382 : val = PointerGetDatum(PG_DETOAST_DATUM(origval));
594 231382 : print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
595 : }
596 : }
597 291142 : }
598 :
599 : /*
600 : * callback for individual changed tuples
601 : */
602 : static void
603 301120 : pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
604 : Relation relation, ReorderBufferChange *change)
605 : {
606 : TestDecodingData *data;
607 : TestDecodingTxnData *txndata;
608 : Form_pg_class class_form;
609 : TupleDesc tupdesc;
610 : MemoryContext old;
611 :
612 301120 : data = ctx->output_plugin_private;
613 301120 : txndata = txn->output_plugin_private;
614 :
615 : /* output BEGIN if we haven't yet */
616 301120 : if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
617 : {
618 432 : pg_output_begin(ctx, data, txn, false);
619 : }
620 301120 : txndata->xact_wrote_changes = true;
621 :
622 301120 : class_form = RelationGetForm(relation);
623 301120 : tupdesc = RelationGetDescr(relation);
624 :
625 : /* Avoid leaking memory by using and resetting our own context */
626 301120 : old = MemoryContextSwitchTo(data->context);
627 :
628 301120 : OutputPluginPrepareWrite(ctx, true);
629 :
630 301120 : appendStringInfoString(ctx->out, "table ");
631 301120 : appendStringInfoString(ctx->out,
632 301120 : quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
633 301120 : class_form->relrewrite ?
634 2 : get_rel_name(class_form->relrewrite) :
635 : NameStr(class_form->relname)));
636 301120 : appendStringInfoChar(ctx->out, ':');
637 :
638 301120 : switch (change->action)
639 : {
640 265990 : case REORDER_BUFFER_CHANGE_INSERT:
641 265990 : appendStringInfoString(ctx->out, " INSERT:");
642 265990 : if (change->data.tp.newtuple == NULL)
643 0 : appendStringInfoString(ctx->out, " (no-tuple-data)");
644 : else
645 265990 : tuple_to_stringinfo(ctx->out, tupdesc,
646 : change->data.tp.newtuple,
647 : false);
648 265990 : break;
649 15086 : case REORDER_BUFFER_CHANGE_UPDATE:
650 15086 : appendStringInfoString(ctx->out, " UPDATE:");
651 15086 : if (change->data.tp.oldtuple != NULL)
652 : {
653 36 : appendStringInfoString(ctx->out, " old-key:");
654 36 : tuple_to_stringinfo(ctx->out, tupdesc,
655 : change->data.tp.oldtuple,
656 : true);
657 36 : appendStringInfoString(ctx->out, " new-tuple:");
658 : }
659 :
660 15086 : if (change->data.tp.newtuple == NULL)
661 0 : appendStringInfoString(ctx->out, " (no-tuple-data)");
662 : else
663 15086 : tuple_to_stringinfo(ctx->out, tupdesc,
664 : change->data.tp.newtuple,
665 : false);
666 15086 : break;
667 20044 : case REORDER_BUFFER_CHANGE_DELETE:
668 20044 : appendStringInfoString(ctx->out, " DELETE:");
669 :
670 : /* if there was no PK, we only know that a delete happened */
671 20044 : if (change->data.tp.oldtuple == NULL)
672 10014 : appendStringInfoString(ctx->out, " (no-tuple-data)");
673 : /* In DELETE, only the replica identity is present; display that */
674 : else
675 10030 : tuple_to_stringinfo(ctx->out, tupdesc,
676 : change->data.tp.oldtuple,
677 : true);
678 20044 : break;
679 301120 : default:
680 : Assert(false);
681 : }
682 :
683 301120 : MemoryContextSwitchTo(old);
684 301120 : MemoryContextReset(data->context);
685 :
686 301120 : OutputPluginWrite(ctx, true);
687 301120 : }
688 :
689 : static void
690 16 : pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
691 : int nrelations, Relation relations[], ReorderBufferChange *change)
692 : {
693 : TestDecodingData *data;
694 : TestDecodingTxnData *txndata;
695 : MemoryContext old;
696 : int i;
697 :
698 16 : data = ctx->output_plugin_private;
699 16 : txndata = txn->output_plugin_private;
700 :
701 : /* output BEGIN if we haven't yet */
702 16 : if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
703 : {
704 14 : pg_output_begin(ctx, data, txn, false);
705 : }
706 16 : txndata->xact_wrote_changes = true;
707 :
708 : /* Avoid leaking memory by using and resetting our own context */
709 16 : old = MemoryContextSwitchTo(data->context);
710 :
711 16 : OutputPluginPrepareWrite(ctx, true);
712 :
713 16 : appendStringInfoString(ctx->out, "table ");
714 :
715 34 : for (i = 0; i < nrelations; i++)
716 : {
717 18 : if (i > 0)
718 2 : appendStringInfoString(ctx->out, ", ");
719 :
720 18 : appendStringInfoString(ctx->out,
721 18 : quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
722 18 : NameStr(relations[i]->rd_rel->relname)));
723 : }
724 :
725 16 : appendStringInfoString(ctx->out, ": TRUNCATE:");
726 :
727 16 : if (change->data.truncate.restart_seqs
728 14 : || change->data.truncate.cascade)
729 : {
730 2 : if (change->data.truncate.restart_seqs)
731 2 : appendStringInfoString(ctx->out, " restart_seqs");
732 2 : if (change->data.truncate.cascade)
733 2 : appendStringInfoString(ctx->out, " cascade");
734 : }
735 : else
736 14 : appendStringInfoString(ctx->out, " (no-flags)");
737 :
738 16 : MemoryContextSwitchTo(old);
739 16 : MemoryContextReset(data->context);
740 :
741 16 : OutputPluginWrite(ctx, true);
742 16 : }
743 :
744 : static void
745 18 : pg_decode_message(LogicalDecodingContext *ctx,
746 : ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
747 : const char *prefix, Size sz, const char *message)
748 : {
749 18 : TestDecodingData *data = ctx->output_plugin_private;
750 : TestDecodingTxnData *txndata;
751 :
752 18 : txndata = transactional ? txn->output_plugin_private : NULL;
753 :
754 : /* output BEGIN if we haven't yet for transactional messages */
755 18 : if (transactional && data->skip_empty_xacts && !txndata->xact_wrote_changes)
756 6 : pg_output_begin(ctx, data, txn, false);
757 :
758 18 : if (transactional)
759 10 : txndata->xact_wrote_changes = true;
760 :
761 18 : OutputPluginPrepareWrite(ctx, true);
762 18 : appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
763 : transactional, prefix, sz);
764 18 : appendBinaryStringInfo(ctx->out, message, sz);
765 18 : OutputPluginWrite(ctx, true);
766 18 : }
767 :
768 : static void
769 80 : pg_decode_stream_start(LogicalDecodingContext *ctx,
770 : ReorderBufferTXN *txn)
771 : {
772 80 : TestDecodingData *data = ctx->output_plugin_private;
773 80 : TestDecodingTxnData *txndata = txn->output_plugin_private;
774 :
775 : /*
776 : * Allocate the txn plugin data for the first stream in the transaction.
777 : */
778 80 : if (txndata == NULL)
779 : {
780 : txndata =
781 18 : MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
782 18 : txndata->xact_wrote_changes = false;
783 18 : txn->output_plugin_private = txndata;
784 : }
785 :
786 80 : txndata->stream_wrote_changes = false;
787 80 : if (data->skip_empty_xacts)
788 80 : return;
789 0 : pg_output_stream_start(ctx, data, txn, true);
790 : }
791 :
792 : static void
793 22 : pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
794 : {
795 22 : OutputPluginPrepareWrite(ctx, last_write);
796 22 : if (data->include_xids)
797 0 : appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
798 : else
799 22 : appendStringInfoString(ctx->out, "opening a streamed block for transaction");
800 22 : OutputPluginWrite(ctx, last_write);
801 22 : }
802 :
803 : static void
804 80 : pg_decode_stream_stop(LogicalDecodingContext *ctx,
805 : ReorderBufferTXN *txn)
806 : {
807 80 : TestDecodingData *data = ctx->output_plugin_private;
808 80 : TestDecodingTxnData *txndata = txn->output_plugin_private;
809 :
810 80 : if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
811 58 : return;
812 :
813 22 : OutputPluginPrepareWrite(ctx, true);
814 22 : if (data->include_xids)
815 0 : appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
816 : else
817 22 : appendStringInfoString(ctx->out, "closing a streamed block for transaction");
818 22 : OutputPluginWrite(ctx, true);
819 : }
820 :
821 : static void
822 8 : pg_decode_stream_abort(LogicalDecodingContext *ctx,
823 : ReorderBufferTXN *txn,
824 : XLogRecPtr abort_lsn)
825 : {
826 8 : TestDecodingData *data = ctx->output_plugin_private;
827 :
828 : /*
829 : * stream abort can be sent for an individual subtransaction but we
830 : * maintain the output_plugin_private only under the toptxn so if this is
831 : * not the toptxn then fetch the toptxn.
832 : */
833 8 : ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
834 8 : TestDecodingTxnData *txndata = toptxn->output_plugin_private;
835 8 : bool xact_wrote_changes = txndata->xact_wrote_changes;
836 :
837 8 : if (rbtxn_is_toptxn(txn))
838 : {
839 : Assert(txn->output_plugin_private != NULL);
840 0 : pfree(txndata);
841 0 : txn->output_plugin_private = NULL;
842 : }
843 :
844 8 : if (data->skip_empty_xacts && !xact_wrote_changes)
845 0 : return;
846 :
847 8 : OutputPluginPrepareWrite(ctx, true);
848 8 : if (data->include_xids)
849 0 : appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
850 : else
851 8 : appendStringInfoString(ctx->out, "aborting streamed (sub)transaction");
852 8 : OutputPluginWrite(ctx, true);
853 : }
854 :
855 : static void
856 2 : pg_decode_stream_prepare(LogicalDecodingContext *ctx,
857 : ReorderBufferTXN *txn,
858 : XLogRecPtr prepare_lsn)
859 : {
860 2 : TestDecodingData *data = ctx->output_plugin_private;
861 2 : TestDecodingTxnData *txndata = txn->output_plugin_private;
862 :
863 2 : if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
864 0 : return;
865 :
866 2 : OutputPluginPrepareWrite(ctx, true);
867 :
868 2 : if (data->include_xids)
869 0 : appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u",
870 0 : quote_literal_cstr(txn->gid), txn->xid);
871 : else
872 2 : appendStringInfo(ctx->out, "preparing streamed transaction %s",
873 2 : quote_literal_cstr(txn->gid));
874 :
875 2 : if (data->include_timestamp)
876 0 : appendStringInfo(ctx->out, " (at %s)",
877 : timestamptz_to_str(txn->xact_time.prepare_time));
878 :
879 2 : OutputPluginWrite(ctx, true);
880 : }
881 :
882 : static void
883 10 : pg_decode_stream_commit(LogicalDecodingContext *ctx,
884 : ReorderBufferTXN *txn,
885 : XLogRecPtr commit_lsn)
886 : {
887 10 : TestDecodingData *data = ctx->output_plugin_private;
888 10 : TestDecodingTxnData *txndata = txn->output_plugin_private;
889 10 : bool xact_wrote_changes = txndata->xact_wrote_changes;
890 :
891 10 : pfree(txndata);
892 10 : txn->output_plugin_private = NULL;
893 :
894 10 : if (data->skip_empty_xacts && !xact_wrote_changes)
895 0 : return;
896 :
897 10 : OutputPluginPrepareWrite(ctx, true);
898 :
899 10 : if (data->include_xids)
900 0 : appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
901 : else
902 10 : appendStringInfoString(ctx->out, "committing streamed transaction");
903 :
904 10 : if (data->include_timestamp)
905 0 : appendStringInfo(ctx->out, " (at %s)",
906 : timestamptz_to_str(txn->xact_time.commit_time));
907 :
908 10 : OutputPluginWrite(ctx, true);
909 : }
910 :
911 : /*
912 : * In streaming mode, we don't display the changes as the transaction can abort
913 : * at a later point in time. We don't want users to see the changes until the
914 : * transaction is committed.
915 : */
916 : static void
917 130 : pg_decode_stream_change(LogicalDecodingContext *ctx,
918 : ReorderBufferTXN *txn,
919 : Relation relation,
920 : ReorderBufferChange *change)
921 : {
922 130 : TestDecodingData *data = ctx->output_plugin_private;
923 130 : TestDecodingTxnData *txndata = txn->output_plugin_private;
924 :
925 : /* output stream start if we haven't yet */
926 130 : if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
927 : {
928 16 : pg_output_stream_start(ctx, data, txn, false);
929 : }
930 130 : txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
931 :
932 130 : OutputPluginPrepareWrite(ctx, true);
933 130 : if (data->include_xids)
934 0 : appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
935 : else
936 130 : appendStringInfoString(ctx->out, "streaming change for transaction");
937 130 : OutputPluginWrite(ctx, true);
938 130 : }
939 :
940 : /*
941 : * In streaming mode, we don't display the contents for transactional messages
942 : * as the transaction can abort at a later point in time. We don't want users to
943 : * see the message contents until the transaction is committed.
944 : */
945 : static void
946 6 : pg_decode_stream_message(LogicalDecodingContext *ctx,
947 : ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
948 : const char *prefix, Size sz, const char *message)
949 : {
950 : /* Output stream start if we haven't yet for transactional messages. */
951 6 : if (transactional)
952 : {
953 6 : TestDecodingData *data = ctx->output_plugin_private;
954 6 : TestDecodingTxnData *txndata = txn->output_plugin_private;
955 :
956 6 : if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
957 : {
958 6 : pg_output_stream_start(ctx, data, txn, false);
959 : }
960 6 : txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
961 : }
962 :
963 6 : OutputPluginPrepareWrite(ctx, true);
964 :
965 6 : if (transactional)
966 : {
967 6 : appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
968 : transactional, prefix, sz);
969 : }
970 : else
971 : {
972 0 : appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
973 : transactional, prefix, sz);
974 0 : appendBinaryStringInfo(ctx->out, message, sz);
975 : }
976 :
977 6 : OutputPluginWrite(ctx, true);
978 6 : }
979 :
980 : /*
981 : * In streaming mode, we don't display the detailed information of Truncate.
982 : * See pg_decode_stream_change.
983 : */
984 : static void
985 0 : pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
986 : int nrelations, Relation relations[],
987 : ReorderBufferChange *change)
988 : {
989 0 : TestDecodingData *data = ctx->output_plugin_private;
990 0 : TestDecodingTxnData *txndata = txn->output_plugin_private;
991 :
992 0 : if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
993 : {
994 0 : pg_output_stream_start(ctx, data, txn, false);
995 : }
996 0 : txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
997 :
998 0 : OutputPluginPrepareWrite(ctx, true);
999 0 : if (data->include_xids)
1000 0 : appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
1001 : else
1002 0 : appendStringInfoString(ctx->out, "streaming truncate for transaction");
1003 0 : OutputPluginWrite(ctx, true);
1004 0 : }
|