Line data Source code
1 : /* -------------------------------------------------------------------------
2 : *
3 : * decode.c
4 : * This module decodes WAL records read using xlogreader.h's APIs for the
5 : * purpose of logical decoding by passing information to the
6 : * reorderbuffer module (containing the actual changes) and to the
7 : * snapbuild module to build a fitting catalog snapshot (to be able to
8 : * properly decode the changes in the reorderbuffer).
9 : *
10 : * NOTE:
11 : * This basically tries to handle all low level xlog stuff for
12 : * reorderbuffer.c and snapbuild.c. There's some minor leakage where a
13 : * specific record's struct is used to pass data along, but those just
14 : * happen to contain the right amount of data in a convenient
15 : * format. There isn't and shouldn't be much intelligence about the
16 : * contents of records in here except turning them into a more usable
17 : * format.
18 : *
19 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
20 : * Portions Copyright (c) 1994, Regents of the University of California
21 : *
22 : * IDENTIFICATION
23 : * src/backend/replication/logical/decode.c
24 : *
25 : * -------------------------------------------------------------------------
26 : */
27 : #include "postgres.h"
28 :
29 : #include "access/heapam_xlog.h"
30 : #include "access/transam.h"
31 : #include "access/xact.h"
32 : #include "access/xlog_internal.h"
33 : #include "access/xlogreader.h"
34 : #include "access/xlogrecord.h"
35 : #include "catalog/pg_control.h"
36 : #include "commands/repack.h"
37 : #include "replication/decode.h"
38 : #include "replication/logical.h"
39 : #include "replication/message.h"
40 : #include "replication/reorderbuffer.h"
41 : #include "replication/snapbuild.h"
42 : #include "storage/standbydefs.h"
43 :
44 : /* individual record(group)'s handlers */
45 : static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
46 : static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
47 : static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
48 : static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
49 : static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
50 : static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
51 :
52 : static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
53 : xl_xact_parsed_commit *parsed, TransactionId xid,
54 : bool two_phase);
55 : static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
56 : xl_xact_parsed_abort *parsed, TransactionId xid,
57 : bool two_phase);
58 : static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
59 : xl_xact_parsed_prepare *parsed);
60 :
61 :
62 : /* common function to decode tuples */
63 : static void DecodeXLogTuple(char *data, Size len, HeapTuple tuple);
64 :
65 : /* helper functions for decoding transactions */
66 : static inline bool FilterPrepare(LogicalDecodingContext *ctx,
67 : TransactionId xid, const char *gid);
68 : static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
69 : XLogRecordBuffer *buf, Oid txn_dbid,
70 : ReplOriginId origin_id);
71 :
72 : /*
73 : * Take every XLogReadRecord()ed record and perform the actions required to
74 : * decode it using the output plugin already setup in the logical decoding
75 : * context.
76 : *
77 : * NB: Note that every record's xid needs to be processed by reorderbuffer
78 : * (xids contained in the content of records are not relevant for this rule).
79 : * That means that for records which'd otherwise not go through the
80 : * reorderbuffer ReorderBufferProcessXid() has to be called. We don't want to
81 : * call ReorderBufferProcessXid for each record type by default, because
82 : * e.g. empty xacts can be handled more efficiently if there's no previous
83 : * state for them.
84 : *
85 : * We also support the ability to fast forward thru records, skipping some
86 : * record types completely - see individual record types for details.
87 : */
88 : void
89 2354285 : LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
90 : {
91 : XLogRecordBuffer buf;
92 : TransactionId txid;
93 : RmgrData rmgr;
94 :
95 2354285 : buf.origptr = ctx->reader->ReadRecPtr;
96 2354285 : buf.endptr = ctx->reader->EndRecPtr;
97 2354285 : buf.record = record;
98 :
99 2354285 : txid = XLogRecGetTopXid(record);
100 :
101 : /*
102 : * If the top-level xid is valid, we need to assign the subxact to the
103 : * top-level xact. We need to do this for all records, hence we do it
104 : * before the switch.
105 : */
106 2354285 : if (TransactionIdIsValid(txid))
107 : {
108 671 : ReorderBufferAssignChild(ctx->reorder,
109 : txid,
110 671 : XLogRecGetXid(record),
111 : buf.origptr);
112 : }
113 :
114 2354285 : rmgr = GetRmgr(XLogRecGetRmid(record));
115 :
116 2354285 : if (rmgr.rm_decode != NULL)
117 1798613 : rmgr.rm_decode(ctx, &buf);
118 : else
119 : {
120 : /* just deal with xid, and done */
121 555672 : ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
122 : buf.origptr);
123 : }
124 2354273 : }
125 :
126 : /*
127 : * Handle rmgr XLOG_ID records for LogicalDecodingProcessRecord().
128 : */
129 : void
130 6530 : xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
131 : {
132 6530 : SnapBuild *builder = ctx->snapshot_builder;
133 6530 : uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
134 :
135 6530 : ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record),
136 : buf->origptr);
137 :
138 6530 : switch (info)
139 : {
140 : /* this is also used in END_OF_RECOVERY checkpoints */
141 93 : case XLOG_CHECKPOINT_SHUTDOWN:
142 : case XLOG_END_OF_RECOVERY:
143 93 : SnapBuildSerializationPoint(builder, buf->origptr);
144 :
145 93 : break;
146 80 : case XLOG_CHECKPOINT_ONLINE:
147 :
148 : /*
149 : * a RUNNING_XACTS record will have been logged near to this, we
150 : * can restart from there.
151 : */
152 80 : break;
153 0 : case XLOG_LOGICAL_DECODING_STATUS_CHANGE:
154 : {
155 : bool logical_decoding;
156 :
157 0 : memcpy(&logical_decoding, XLogRecGetData(buf->record), sizeof(bool));
158 :
159 : /*
160 : * Error out as we should not decode this WAL record.
161 : *
162 : * Logical decoding is disabled, and existing logical slots on
163 : * the standby are invalidated when this WAL record is
164 : * replayed. No logical decoder can process this WAL record
165 : * until replay completes, and by then the slots are already
166 : * invalidated. Furthermore, no new logical slots can be
167 : * created while logical decoding is disabled. This cannot
168 : * occur even on primary either, since it will not restart
169 : * with wal_level < replica if any logical slots exist.
170 : */
171 0 : elog(ERROR, "unexpected logical decoding status change %d",
172 : logical_decoding);
173 :
174 : break;
175 : }
176 6357 : case XLOG_NOOP:
177 : case XLOG_NEXTOID:
178 : case XLOG_SWITCH:
179 : case XLOG_BACKUP_END:
180 : case XLOG_PARAMETER_CHANGE:
181 : case XLOG_RESTORE_POINT:
182 : case XLOG_FPW_CHANGE:
183 : case XLOG_FPI_FOR_HINT:
184 : case XLOG_FPI:
185 : case XLOG_OVERWRITE_CONTRECORD:
186 : case XLOG_CHECKPOINT_REDO:
187 6357 : break;
188 0 : default:
189 0 : elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info);
190 : }
191 6530 : }
192 :
193 : void
194 0 : xlog2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
195 : {
196 0 : uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
197 :
198 0 : ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record), buf->origptr);
199 :
200 0 : switch (info)
201 : {
202 0 : case XLOG2_CHECKSUMS:
203 0 : break;
204 0 : default:
205 0 : elog(ERROR, "unexpected RM_XLOG2_ID record type: %u", info);
206 : }
207 0 : }
208 :
209 : /*
210 : * Handle rmgr XACT_ID records for LogicalDecodingProcessRecord().
211 : */
212 : void
213 10577 : xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
214 : {
215 10577 : SnapBuild *builder = ctx->snapshot_builder;
216 10577 : ReorderBuffer *reorder = ctx->reorder;
217 10577 : XLogReaderState *r = buf->record;
218 10577 : uint8 info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
219 :
220 : /*
221 : * If the snapshot isn't yet fully built, we cannot decode anything, so
222 : * bail out.
223 : */
224 10577 : if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
225 13 : return;
226 :
227 10564 : switch (info)
228 : {
229 3967 : case XLOG_XACT_COMMIT:
230 : case XLOG_XACT_COMMIT_PREPARED:
231 : {
232 : xl_xact_commit *xlrec;
233 : xl_xact_parsed_commit parsed;
234 : TransactionId xid;
235 3967 : bool two_phase = false;
236 :
237 3967 : xlrec = (xl_xact_commit *) XLogRecGetData(r);
238 3967 : ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
239 :
240 3967 : if (!TransactionIdIsValid(parsed.twophase_xid))
241 3839 : xid = XLogRecGetXid(r);
242 : else
243 128 : xid = parsed.twophase_xid;
244 :
245 : /*
246 : * We would like to process the transaction in a two-phase
247 : * manner iff output plugin supports two-phase commits and
248 : * doesn't filter the transaction at prepare time.
249 : */
250 3967 : if (info == XLOG_XACT_COMMIT_PREPARED)
251 128 : two_phase = !(FilterPrepare(ctx, xid,
252 128 : parsed.twophase_gid));
253 :
254 3967 : DecodeCommit(ctx, buf, &parsed, xid, two_phase);
255 3957 : break;
256 : }
257 260 : case XLOG_XACT_ABORT:
258 : case XLOG_XACT_ABORT_PREPARED:
259 : {
260 : xl_xact_abort *xlrec;
261 : xl_xact_parsed_abort parsed;
262 : TransactionId xid;
263 260 : bool two_phase = false;
264 :
265 260 : xlrec = (xl_xact_abort *) XLogRecGetData(r);
266 260 : ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
267 :
268 260 : if (!TransactionIdIsValid(parsed.twophase_xid))
269 209 : xid = XLogRecGetXid(r);
270 : else
271 51 : xid = parsed.twophase_xid;
272 :
273 : /*
274 : * We would like to process the transaction in a two-phase
275 : * manner iff output plugin supports two-phase commits and
276 : * doesn't filter the transaction at prepare time.
277 : */
278 260 : if (info == XLOG_XACT_ABORT_PREPARED)
279 51 : two_phase = !(FilterPrepare(ctx, xid,
280 51 : parsed.twophase_gid));
281 :
282 260 : DecodeAbort(ctx, buf, &parsed, xid, two_phase);
283 260 : break;
284 : }
285 124 : case XLOG_XACT_ASSIGNMENT:
286 :
287 : /*
288 : * We assign subxact to the toplevel xact while processing each
289 : * record if required. So, we don't need to do anything here. See
290 : * LogicalDecodingProcessRecord.
291 : */
292 124 : break;
293 6011 : case XLOG_XACT_INVALIDATIONS:
294 : {
295 : TransactionId xid;
296 : xl_xact_invals *invals;
297 :
298 6011 : xid = XLogRecGetXid(r);
299 6011 : invals = (xl_xact_invals *) XLogRecGetData(r);
300 :
301 : /*
302 : * Execute the invalidations for xid-less transactions,
303 : * otherwise, accumulate them so that they can be processed at
304 : * the commit time.
305 : */
306 6011 : if (TransactionIdIsValid(xid))
307 : {
308 5979 : if (!ctx->fast_forward)
309 5904 : ReorderBufferAddInvalidations(reorder, xid,
310 : buf->origptr,
311 5904 : invals->nmsgs,
312 5904 : invals->msgs);
313 5979 : ReorderBufferXidSetCatalogChanges(ctx->reorder, xid,
314 : buf->origptr);
315 : }
316 32 : else if (!ctx->fast_forward)
317 32 : ReorderBufferImmediateInvalidation(ctx->reorder,
318 32 : invals->nmsgs,
319 32 : invals->msgs);
320 :
321 6011 : break;
322 : }
323 202 : case XLOG_XACT_PREPARE:
324 : {
325 : xl_xact_parsed_prepare parsed;
326 : xl_xact_prepare *xlrec;
327 :
328 : /* ok, parse it */
329 202 : xlrec = (xl_xact_prepare *) XLogRecGetData(r);
330 202 : ParsePrepareRecord(XLogRecGetInfo(buf->record),
331 : xlrec, &parsed);
332 :
333 : /*
334 : * We would like to process the transaction in a two-phase
335 : * manner iff output plugin supports two-phase commits and
336 : * doesn't filter the transaction at prepare time.
337 : */
338 202 : if (FilterPrepare(ctx, parsed.twophase_xid,
339 : parsed.twophase_gid))
340 : {
341 20 : ReorderBufferProcessXid(reorder, parsed.twophase_xid,
342 : buf->origptr);
343 20 : break;
344 : }
345 :
346 : /*
347 : * Note that if the prepared transaction has locked [user]
348 : * catalog tables exclusively then decoding prepare can block
349 : * till the main transaction is committed because it needs to
350 : * lock the catalog tables.
351 : *
352 : * XXX Now, this can even lead to a deadlock if the prepare
353 : * transaction is waiting to get it logically replicated for
354 : * distributed 2PC. This can be avoided by disallowing
355 : * preparing transactions that have locked [user] catalog
356 : * tables exclusively but as of now, we ask users not to do
357 : * such an operation.
358 : */
359 182 : DecodePrepare(ctx, buf, &parsed);
360 182 : break;
361 : }
362 0 : default:
363 0 : elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
364 : }
365 : }
366 :
367 : /*
368 : * Handle rmgr STANDBY_ID records for LogicalDecodingProcessRecord().
369 : */
370 : void
371 4383 : standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
372 : {
373 4383 : SnapBuild *builder = ctx->snapshot_builder;
374 4383 : XLogReaderState *r = buf->record;
375 4383 : uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
376 :
377 4383 : ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
378 :
379 4383 : switch (info)
380 : {
381 1703 : case XLOG_RUNNING_XACTS:
382 : {
383 1703 : xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r);
384 :
385 : /*
386 : * Update this decoder's idea of transactions currently
387 : * running. In doing so we will determine whether we have
388 : * reached consistent status.
389 : */
390 1703 : SnapBuildProcessRunningXacts(builder, buf->origptr, running);
391 :
392 : /*
393 : * Abort all transactions that we keep track of, that are
394 : * older than the record's oldestRunningXid. This is the most
395 : * convenient spot for doing so since, in contrast to shutdown
396 : * or end-of-recovery checkpoints, we have information about
397 : * all running transactions which includes prepared ones,
398 : * while shutdown checkpoints just know that no non-prepared
399 : * transactions are in progress.
400 : */
401 1701 : ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid);
402 : }
403 1701 : break;
404 2648 : case XLOG_STANDBY_LOCK:
405 2648 : break;
406 32 : case XLOG_INVALIDATIONS:
407 :
408 : /*
409 : * We are processing the invalidations at the command level via
410 : * XLOG_XACT_INVALIDATIONS. So we don't need to do anything here.
411 : */
412 32 : break;
413 0 : default:
414 0 : elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
415 : }
416 4381 : }
417 :
418 : /*
419 : * Handle rmgr HEAP2_ID records for LogicalDecodingProcessRecord().
420 : */
421 : void
422 36944 : heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
423 : {
424 36944 : uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
425 36944 : TransactionId xid = XLogRecGetXid(buf->record);
426 36944 : SnapBuild *builder = ctx->snapshot_builder;
427 :
428 36944 : ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
429 :
430 : /*
431 : * If we don't have snapshot or we are just fast-forwarding, there is no
432 : * point in decoding data changes. However, it's crucial to build the base
433 : * snapshot during fast-forward mode (as is done in
434 : * SnapBuildProcessChange()) because we require the snapshot's xmin when
435 : * determining the candidate catalog_xmin for the replication slot. See
436 : * SnapBuildProcessRunningXacts().
437 : */
438 36944 : if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
439 8 : return;
440 :
441 36936 : switch (info)
442 : {
443 6958 : case XLOG_HEAP2_MULTI_INSERT:
444 6958 : if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
445 6958 : !ctx->fast_forward &&
446 6855 : !change_useless_for_repack(buf))
447 6781 : DecodeMultiInsert(ctx, buf);
448 6958 : break;
449 27981 : case XLOG_HEAP2_NEW_CID:
450 27981 : if (!ctx->fast_forward)
451 : {
452 : xl_heap_new_cid *xlrec;
453 :
454 27598 : xlrec = (xl_heap_new_cid *) XLogRecGetData(buf->record);
455 27598 : SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec);
456 : }
457 27981 : break;
458 90 : case XLOG_HEAP2_REWRITE:
459 :
460 : /*
461 : * Although these records only exist to serve the needs of logical
462 : * decoding, all the work happens as part of crash or archive
463 : * recovery, so we don't need to do anything here.
464 : */
465 90 : break;
466 :
467 : /*
468 : * Everything else here is just low level physical stuff we're not
469 : * interested in.
470 : */
471 1907 : case XLOG_HEAP2_PRUNE_ON_ACCESS:
472 : case XLOG_HEAP2_PRUNE_VACUUM_SCAN:
473 : case XLOG_HEAP2_PRUNE_VACUUM_CLEANUP:
474 : case XLOG_HEAP2_LOCK_UPDATED:
475 1907 : break;
476 0 : default:
477 0 : elog(ERROR, "unexpected RM_HEAP2_ID record type: %u", info);
478 : }
479 : }
480 :
481 : /*
482 : * Handle rmgr HEAP_ID records for LogicalDecodingProcessRecord().
483 : */
484 : void
485 1740081 : heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
486 : {
487 1740081 : uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
488 1740081 : TransactionId xid = XLogRecGetXid(buf->record);
489 1740081 : SnapBuild *builder = ctx->snapshot_builder;
490 :
491 1740081 : ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
492 :
493 : /*
494 : * If we don't have snapshot or we are just fast-forwarding, there is no
495 : * point in decoding data changes. However, it's crucial to build the base
496 : * snapshot during fast-forward mode (as is done in
497 : * SnapBuildProcessChange()) because we require the snapshot's xmin when
498 : * determining the candidate catalog_xmin for the replication slot. See
499 : * SnapBuildProcessRunningXacts().
500 : */
501 1740081 : if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
502 1009 : return;
503 :
504 1739072 : switch (info)
505 : {
506 1125138 : case XLOG_HEAP_INSERT:
507 1125138 : if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
508 1125138 : !ctx->fast_forward &&
509 1124940 : !change_useless_for_repack(buf))
510 1124721 : DecodeInsert(ctx, buf);
511 1125138 : break;
512 :
513 : /*
514 : * Treat HOT update as normal updates. There is no useful
515 : * information in the fact that we could make it a HOT update
516 : * locally and the WAL layout is compatible.
517 : */
518 181103 : case XLOG_HEAP_HOT_UPDATE:
519 : case XLOG_HEAP_UPDATE:
520 181103 : if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
521 181103 : !ctx->fast_forward &&
522 181090 : !change_useless_for_repack(buf))
523 181055 : DecodeUpdate(ctx, buf);
524 181103 : break;
525 :
526 238110 : case XLOG_HEAP_DELETE:
527 238110 : if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
528 238110 : !ctx->fast_forward &&
529 238016 : !change_useless_for_repack(buf))
530 237991 : DecodeDelete(ctx, buf);
531 238110 : break;
532 :
533 64 : case XLOG_HEAP_TRUNCATE:
534 64 : if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
535 64 : !ctx->fast_forward &&
536 62 : !change_useless_for_repack(buf))
537 62 : DecodeTruncate(ctx, buf);
538 64 : break;
539 :
540 1228 : case XLOG_HEAP_INPLACE:
541 :
542 : /*
543 : * Inplace updates are only ever performed on catalog tuples and
544 : * can, per definition, not change tuple visibility. Since we
545 : * also don't decode catalog tuples, we're not interested in the
546 : * record's contents.
547 : */
548 1228 : break;
549 :
550 17965 : case XLOG_HEAP_CONFIRM:
551 17965 : if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
552 17965 : !ctx->fast_forward &&
553 17965 : !change_useless_for_repack(buf))
554 17965 : DecodeSpecConfirm(ctx, buf);
555 17965 : break;
556 :
557 175464 : case XLOG_HEAP_LOCK:
558 : /* we don't care about row level locks for now */
559 175464 : break;
560 :
561 0 : default:
562 0 : elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info);
563 : break;
564 : }
565 : }
566 :
567 : /*
568 : * Ask output plugin whether we want to skip this PREPARE and send
569 : * this transaction as a regular commit later.
570 : */
571 : static inline bool
572 381 : FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
573 : const char *gid)
574 : {
575 : /*
576 : * Skip if decoding of two-phase transactions at PREPARE time is not
577 : * enabled. In that case, all two-phase transactions are considered
578 : * filtered out and will be applied as regular transactions at COMMIT
579 : * PREPARED.
580 : */
581 381 : if (!ctx->twophase)
582 23 : return true;
583 :
584 : /*
585 : * The filter_prepare callback is optional. When not supplied, all
586 : * prepared transactions should go through.
587 : */
588 358 : if (ctx->callbacks.filter_prepare_cb == NULL)
589 210 : return false;
590 :
591 148 : return filter_prepare_cb_wrapper(ctx, xid, gid);
592 : }
593 :
594 : static inline bool
595 1559254 : FilterByOrigin(LogicalDecodingContext *ctx, ReplOriginId origin_id)
596 : {
597 1559254 : if (ctx->callbacks.filter_by_origin_cb == NULL)
598 131 : return false;
599 :
600 1559123 : return filter_by_origin_cb_wrapper(ctx, origin_id);
601 : }
602 :
603 : /*
604 : * Handle rmgr LOGICALMSG_ID records for LogicalDecodingProcessRecord().
605 : */
606 : void
607 98 : logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
608 : {
609 98 : SnapBuild *builder = ctx->snapshot_builder;
610 98 : XLogReaderState *r = buf->record;
611 98 : TransactionId xid = XLogRecGetXid(r);
612 98 : uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
613 98 : ReplOriginId origin_id = XLogRecGetOrigin(r);
614 98 : Snapshot snapshot = NULL;
615 : xl_logical_message *message;
616 :
617 98 : if (info != XLOG_LOGICAL_MESSAGE)
618 0 : elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
619 :
620 98 : ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
621 :
622 : /* If we don't have snapshot, there is no point in decoding messages */
623 98 : if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
624 0 : return;
625 :
626 98 : message = (xl_logical_message *) XLogRecGetData(r);
627 :
628 194 : if (message->dbId != ctx->slot->data.database ||
629 96 : FilterByOrigin(ctx, origin_id))
630 4 : return;
631 :
632 94 : if (message->transactional &&
633 39 : !SnapBuildProcessChange(builder, xid, buf->origptr))
634 0 : return;
635 149 : else if (!message->transactional &&
636 110 : (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
637 55 : SnapBuildXactNeedsSkip(builder, buf->origptr)))
638 44 : return;
639 :
640 : /*
641 : * We also skip decoding in fast_forward mode. This check must be last
642 : * because we don't want to set the processing_required flag unless we
643 : * have a decodable message.
644 : */
645 50 : if (ctx->fast_forward)
646 : {
647 : /*
648 : * We need to set processing_required flag to notify the message's
649 : * existence to the caller. Usually, the flag is set when either the
650 : * COMMIT or ABORT records are decoded, but this must be turned on
651 : * here because the non-transactional logical message is decoded
652 : * without waiting for these records.
653 : */
654 3 : if (!message->transactional)
655 3 : ctx->processing_required = true;
656 :
657 3 : return;
658 : }
659 :
660 : /*
661 : * If this is a non-transactional change, get the snapshot we're expected
662 : * to use. We only get here when the snapshot is consistent, and the
663 : * change is not meant to be skipped.
664 : *
665 : * For transactional changes we don't need a snapshot, we'll use the
666 : * regular snapshot maintained by ReorderBuffer. We just leave it NULL.
667 : */
668 47 : if (!message->transactional)
669 8 : snapshot = SnapBuildGetOrBuildSnapshot(builder);
670 :
671 47 : ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr,
672 47 : message->transactional,
673 47 : message->message, /* first part of message is
674 : * prefix */
675 : message->message_size,
676 47 : message->message + message->prefix_size);
677 : }
678 :
679 : /*
680 : * Consolidated commit record handling between the different form of commit
681 : * records.
682 : *
683 : * 'two_phase' indicates that caller wants to process the transaction in two
684 : * phases, first process prepare if not already done and then process
685 : * commit_prepared.
686 : */
687 : static void
688 3967 : DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
689 : xl_xact_parsed_commit *parsed, TransactionId xid,
690 : bool two_phase)
691 : {
692 3967 : XLogRecPtr origin_lsn = InvalidXLogRecPtr;
693 3967 : TimestampTz commit_time = parsed->xact_time;
694 3967 : ReplOriginId origin_id = XLogRecGetOrigin(buf->record);
695 : int i;
696 :
697 3967 : if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
698 : {
699 105 : origin_lsn = parsed->origin_lsn;
700 105 : commit_time = parsed->origin_timestamp;
701 : }
702 :
703 3967 : SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
704 : parsed->nsubxacts, parsed->subxacts,
705 : parsed->xinfo);
706 :
707 : /* ----
708 : * Check whether we are interested in this specific transaction, and tell
709 : * the reorderbuffer to forget the content of the (sub-)transactions
710 : * if not.
711 : *
712 : * We can't just use ReorderBufferAbort() here, because we need to execute
713 : * the transaction's invalidations. This currently won't be needed if
714 : * we're just skipping over the transaction because currently we only do
715 : * so during startup, to get to the first transaction the client needs. As
716 : * we have reset the catalog caches before starting to read WAL, and we
717 : * haven't yet touched any catalogs, there can't be anything to invalidate.
718 : * But if we're "forgetting" this commit because it happened in another
719 : * database, the invalidations might be important, because they could be
720 : * for shared catalogs and we might have loaded data into the relevant
721 : * syscaches.
722 : * ---
723 : */
724 3967 : if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
725 : {
726 2946 : for (i = 0; i < parsed->nsubxacts; i++)
727 : {
728 967 : ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
729 : }
730 1979 : ReorderBufferForget(ctx->reorder, xid, buf->origptr);
731 :
732 1979 : return;
733 : }
734 :
735 : /* tell the reorderbuffer about the surviving subtransactions */
736 2257 : for (i = 0; i < parsed->nsubxacts; i++)
737 : {
738 269 : ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
739 : buf->origptr, buf->endptr);
740 : }
741 :
742 : /*
743 : * Send the final commit record if the transaction data is already
744 : * decoded, otherwise, process the entire transaction.
745 : */
746 1988 : if (two_phase)
747 : {
748 35 : ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
749 35 : SnapBuildGetTwoPhaseAt(ctx->snapshot_builder),
750 : commit_time, origin_id, origin_lsn,
751 35 : parsed->twophase_gid, true);
752 : }
753 : else
754 : {
755 1953 : ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
756 : commit_time, origin_id, origin_lsn);
757 : }
758 :
759 : /*
760 : * Update the decoding stats at transaction prepare/commit/abort.
761 : * Additionally we send the stats when we spill or stream the changes to
762 : * avoid losing them in case the decoding is interrupted. It is not clear
763 : * that sending more or less frequently than this would be better.
764 : */
765 1978 : UpdateDecodingStats(ctx);
766 : }
767 :
768 : /*
769 : * Decode PREPARE record. Similar logic as in DecodeCommit.
770 : *
771 : * Note that we don't skip prepare even if have detected concurrent abort
772 : * because it is quite possible that we had already sent some changes before we
773 : * detect abort in which case we need to abort those changes in the subscriber.
774 : * To abort such changes, we do send the prepare and then the rollback prepared
775 : * which is what happened on the publisher-side as well. Now, we can invent a
776 : * new abort API wherein in such cases we send abort and skip sending prepared
777 : * and rollback prepared but then it is not that straightforward because we
778 : * might have streamed this transaction by that time in which case it is
779 : * handled when the rollback is encountered. It is not impossible to optimize
780 : * the concurrent abort case but it can introduce design complexity w.r.t
781 : * handling different cases so leaving it for now as it doesn't seem worth it.
782 : */
783 : static void
784 182 : DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
785 : xl_xact_parsed_prepare *parsed)
786 : {
787 182 : SnapBuild *builder = ctx->snapshot_builder;
788 182 : XLogRecPtr origin_lsn = parsed->origin_lsn;
789 182 : TimestampTz prepare_time = parsed->xact_time;
790 182 : ReplOriginId origin_id = XLogRecGetOrigin(buf->record);
791 : int i;
792 182 : TransactionId xid = parsed->twophase_xid;
793 :
794 182 : if (parsed->origin_timestamp != 0)
795 8 : prepare_time = parsed->origin_timestamp;
796 :
797 : /*
798 : * Remember the prepare info for a txn so that it can be used later in
799 : * commit prepared if required. See ReorderBufferFinishPrepared.
800 : */
801 182 : if (!ReorderBufferRememberPrepareInfo(ctx->reorder, xid, buf->origptr,
802 : buf->endptr, prepare_time, origin_id,
803 : origin_lsn))
804 0 : return;
805 :
806 : /* We can't start streaming unless a consistent state is reached. */
807 182 : if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
808 : {
809 3 : ReorderBufferSkipPrepare(ctx->reorder, xid);
810 3 : return;
811 : }
812 :
813 : /*
814 : * Check whether we need to process this transaction. See
815 : * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
816 : * transaction.
817 : *
818 : * We can't call ReorderBufferForget as we did in DecodeCommit as the txn
819 : * hasn't yet been committed, removing this txn before a commit might
820 : * result in the computation of an incorrect restart_lsn. See
821 : * SnapBuildProcessRunningXacts. But we need to process cache
822 : * invalidations if there are any for the reasons mentioned in
823 : * DecodeCommit.
824 : */
825 179 : if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
826 : {
827 134 : ReorderBufferSkipPrepare(ctx->reorder, xid);
828 134 : ReorderBufferInvalidate(ctx->reorder, xid, buf->origptr);
829 134 : return;
830 : }
831 :
832 : /* Tell the reorderbuffer about the surviving subtransactions. */
833 46 : for (i = 0; i < parsed->nsubxacts; i++)
834 : {
835 1 : ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
836 : buf->origptr, buf->endptr);
837 : }
838 :
839 : /* replay actions of all transaction + subtransactions in order */
840 45 : ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid);
841 :
842 : /*
843 : * Update the decoding stats at transaction prepare/commit/abort.
844 : * Additionally we send the stats when we spill or stream the changes to
845 : * avoid losing them in case the decoding is interrupted. It is not clear
846 : * that sending more or less frequently than this would be better.
847 : */
848 45 : UpdateDecodingStats(ctx);
849 : }
850 :
851 :
852 : /*
853 : * Get the data from the various forms of abort records and pass it on to
854 : * snapbuild.c and reorderbuffer.c.
855 : *
856 : * 'two_phase' indicates to finish prepared transaction.
857 : */
858 : static void
859 260 : DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
860 : xl_xact_parsed_abort *parsed, TransactionId xid,
861 : bool two_phase)
862 : {
863 : int i;
864 260 : XLogRecPtr origin_lsn = InvalidXLogRecPtr;
865 260 : TimestampTz abort_time = parsed->xact_time;
866 260 : ReplOriginId origin_id = XLogRecGetOrigin(buf->record);
867 : bool skip_xact;
868 :
869 260 : if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
870 : {
871 4 : origin_lsn = parsed->origin_lsn;
872 4 : abort_time = parsed->origin_timestamp;
873 : }
874 :
875 : /*
876 : * Check whether we need to process this transaction. See
877 : * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
878 : * transaction.
879 : */
880 260 : skip_xact = DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id);
881 :
882 : /*
883 : * Send the final rollback record for a prepared transaction unless we
884 : * need to skip it. For non-two-phase xacts, simply forget the xact.
885 : */
886 260 : if (two_phase && !skip_xact)
887 : {
888 11 : ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
889 : InvalidXLogRecPtr,
890 : abort_time, origin_id, origin_lsn,
891 11 : parsed->twophase_gid, false);
892 : }
893 : else
894 : {
895 255 : for (i = 0; i < parsed->nsubxacts; i++)
896 : {
897 6 : ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
898 6 : buf->record->EndRecPtr, abort_time);
899 : }
900 :
901 249 : ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr,
902 : abort_time);
903 : }
904 :
905 : /* update the decoding stats */
906 260 : UpdateDecodingStats(ctx);
907 260 : }
908 :
909 : /*
910 : * Parse XLOG_HEAP_INSERT (not MULTI_INSERT!) records into tuplebufs.
911 : *
912 : * Inserts can contain the new tuple.
913 : */
914 : static void
915 1124721 : DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
916 : {
917 : Size datalen;
918 : char *tupledata;
919 : Size tuplelen;
920 1124721 : XLogReaderState *r = buf->record;
921 : xl_heap_insert *xlrec;
922 : ReorderBufferChange *change;
923 : RelFileLocator target_locator;
924 :
925 1124721 : xlrec = (xl_heap_insert *) XLogRecGetData(r);
926 :
927 : /*
928 : * Ignore insert records without new tuples (this does happen when
929 : * raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
930 : */
931 1124721 : if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
932 4712 : return;
933 :
934 : /* only interested in our database */
935 1120168 : XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
936 1120168 : if (target_locator.dbOid != ctx->slot->data.database)
937 0 : return;
938 :
939 : /* output plugin doesn't look for this origin, no need to queue */
940 1120168 : if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
941 159 : return;
942 :
943 1120009 : change = ReorderBufferAllocChange(ctx->reorder);
944 1120009 : if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
945 1102044 : change->action = REORDER_BUFFER_CHANGE_INSERT;
946 : else
947 17965 : change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
948 1120009 : change->origin_id = XLogRecGetOrigin(r);
949 :
950 1120009 : memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
951 :
952 1120009 : tupledata = XLogRecGetBlockData(r, 0, &datalen);
953 1120009 : tuplelen = datalen - SizeOfHeapHeader;
954 :
955 1120009 : change->data.tp.newtuple =
956 1120009 : ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
957 :
958 1120009 : DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);
959 :
960 1120009 : change->data.tp.clear_toast_afterwards = true;
961 :
962 1120009 : ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
963 : change,
964 1120009 : xlrec->flags & XLH_INSERT_ON_TOAST_RELATION);
965 : }
966 :
967 : /*
968 : * Parse XLOG_HEAP_UPDATE and XLOG_HEAP_HOT_UPDATE, which have the same layout
969 : * in the record, from wal into proper tuplebufs.
970 : *
971 : * Updates can possibly contain a new tuple and the old primary key.
972 : */
973 : static void
974 181055 : DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
975 : {
976 181055 : XLogReaderState *r = buf->record;
977 : xl_heap_update *xlrec;
978 : ReorderBufferChange *change;
979 : char *data;
980 : RelFileLocator target_locator;
981 :
982 181055 : xlrec = (xl_heap_update *) XLogRecGetData(r);
983 :
984 : /* only interested in our database */
985 181055 : XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
986 181055 : if (target_locator.dbOid != ctx->slot->data.database)
987 286 : return;
988 :
989 : /* output plugin doesn't look for this origin, no need to queue */
990 180794 : if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
991 25 : return;
992 :
993 180769 : change = ReorderBufferAllocChange(ctx->reorder);
994 180769 : change->action = REORDER_BUFFER_CHANGE_UPDATE;
995 180769 : change->origin_id = XLogRecGetOrigin(r);
996 180769 : memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
997 :
998 180769 : if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE)
999 : {
1000 : Size datalen;
1001 : Size tuplelen;
1002 :
1003 178966 : data = XLogRecGetBlockData(r, 0, &datalen);
1004 :
1005 178966 : tuplelen = datalen - SizeOfHeapHeader;
1006 :
1007 178966 : change->data.tp.newtuple =
1008 178966 : ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
1009 :
1010 178966 : DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
1011 : }
1012 :
1013 180769 : if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD)
1014 : {
1015 : Size datalen;
1016 : Size tuplelen;
1017 :
1018 : /* caution, remaining data in record is not aligned */
1019 362 : data = XLogRecGetData(r) + SizeOfHeapUpdate;
1020 362 : datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate;
1021 362 : tuplelen = datalen - SizeOfHeapHeader;
1022 :
1023 362 : change->data.tp.oldtuple =
1024 362 : ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
1025 :
1026 362 : DecodeXLogTuple(data, datalen, change->data.tp.oldtuple);
1027 : }
1028 :
1029 180769 : change->data.tp.clear_toast_afterwards = true;
1030 :
1031 180769 : ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
1032 : change, false);
1033 : }
1034 :
1035 : /*
1036 : * Parse XLOG_HEAP_DELETE from wal into proper tuplebufs.
1037 : *
1038 : * Deletes can possibly contain the old primary key.
1039 : */
1040 : static void
1041 237991 : DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
1042 : {
1043 237991 : XLogReaderState *r = buf->record;
1044 : xl_heap_delete *xlrec;
1045 : ReorderBufferChange *change;
1046 : RelFileLocator target_locator;
1047 :
1048 237991 : xlrec = (xl_heap_delete *) XLogRecGetData(r);
1049 :
1050 : /*
1051 : * Skip changes that were marked as ignorable at origin.
1052 : *
1053 : * (This is used for changes that affect relations not visible to other
1054 : * transactions, such as the transient table during concurrent repack.)
1055 : */
1056 237991 : if (xlrec->flags & XLH_DELETE_NO_LOGICAL)
1057 50 : return;
1058 :
1059 : /* only interested in our database */
1060 237991 : XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
1061 237991 : if (target_locator.dbOid != ctx->slot->data.database)
1062 32 : return;
1063 :
1064 : /* output plugin doesn't look for this origin, no need to queue */
1065 237959 : if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
1066 18 : return;
1067 :
1068 237941 : change = ReorderBufferAllocChange(ctx->reorder);
1069 :
1070 237941 : if (xlrec->flags & XLH_DELETE_IS_SUPER)
1071 0 : change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT;
1072 : else
1073 237941 : change->action = REORDER_BUFFER_CHANGE_DELETE;
1074 :
1075 237941 : change->origin_id = XLogRecGetOrigin(r);
1076 :
1077 237941 : memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
1078 :
1079 : /* old primary key stored */
1080 237941 : if (xlrec->flags & XLH_DELETE_CONTAINS_OLD)
1081 : {
1082 175895 : Size datalen = XLogRecGetDataLen(r) - SizeOfHeapDelete;
1083 175895 : Size tuplelen = datalen - SizeOfHeapHeader;
1084 :
1085 : Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader));
1086 :
1087 175895 : change->data.tp.oldtuple =
1088 175895 : ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
1089 :
1090 175895 : DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
1091 : datalen, change->data.tp.oldtuple);
1092 : }
1093 :
1094 237941 : change->data.tp.clear_toast_afterwards = true;
1095 :
1096 237941 : ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
1097 : change, false);
1098 : }
1099 :
1100 : /*
1101 : * Parse XLOG_HEAP_TRUNCATE from wal
1102 : */
1103 : static void
1104 62 : DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
1105 : {
1106 62 : XLogReaderState *r = buf->record;
1107 : xl_heap_truncate *xlrec;
1108 : ReorderBufferChange *change;
1109 :
1110 62 : xlrec = (xl_heap_truncate *) XLogRecGetData(r);
1111 :
1112 : /* only interested in our database */
1113 62 : if (xlrec->dbId != ctx->slot->data.database)
1114 0 : return;
1115 :
1116 : /* output plugin doesn't look for this origin, no need to queue */
1117 62 : if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
1118 1 : return;
1119 :
1120 61 : change = ReorderBufferAllocChange(ctx->reorder);
1121 61 : change->action = REORDER_BUFFER_CHANGE_TRUNCATE;
1122 61 : change->origin_id = XLogRecGetOrigin(r);
1123 61 : if (xlrec->flags & XLH_TRUNCATE_CASCADE)
1124 1 : change->data.truncate.cascade = true;
1125 61 : if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
1126 2 : change->data.truncate.restart_seqs = true;
1127 61 : change->data.truncate.nrelids = xlrec->nrelids;
1128 122 : change->data.truncate.relids = ReorderBufferAllocRelids(ctx->reorder,
1129 61 : xlrec->nrelids);
1130 61 : memcpy(change->data.truncate.relids, xlrec->relids,
1131 61 : xlrec->nrelids * sizeof(Oid));
1132 61 : ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
1133 : buf->origptr, change, false);
1134 : }
1135 :
1136 : /*
1137 : * Decode XLOG_HEAP2_MULTI_INSERT record into multiple tuplebufs.
1138 : *
1139 : * Currently MULTI_INSERT will always contain the full tuples.
1140 : */
1141 : static void
1142 6781 : DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
1143 : {
1144 6781 : XLogReaderState *r = buf->record;
1145 : xl_heap_multi_insert *xlrec;
1146 : int i;
1147 : char *data;
1148 : char *tupledata;
1149 : Size tuplelen;
1150 : RelFileLocator rlocator;
1151 :
1152 6781 : xlrec = (xl_heap_multi_insert *) XLogRecGetData(r);
1153 :
1154 : /*
1155 : * Ignore insert records without new tuples. This happens when a
1156 : * multi_insert is done on a catalog or on a non-persistent relation.
1157 : */
1158 6781 : if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
1159 6766 : return;
1160 :
1161 : /* only interested in our database */
1162 53 : XLogRecGetBlockTag(r, 0, &rlocator, NULL, NULL);
1163 53 : if (rlocator.dbOid != ctx->slot->data.database)
1164 38 : return;
1165 :
1166 : /* output plugin doesn't look for this origin, no need to queue */
1167 15 : if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
1168 0 : return;
1169 :
1170 : /*
1171 : * We know that this multi_insert isn't for a catalog, so the block should
1172 : * always have data even if a full-page write of it is taken.
1173 : */
1174 15 : tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
1175 : Assert(tupledata != NULL);
1176 :
1177 15 : data = tupledata;
1178 1064 : for (i = 0; i < xlrec->ntuples; i++)
1179 : {
1180 : ReorderBufferChange *change;
1181 : xl_multi_insert_tuple *xlhdr;
1182 : int datalen;
1183 : HeapTuple tuple;
1184 : HeapTupleHeader header;
1185 :
1186 1049 : change = ReorderBufferAllocChange(ctx->reorder);
1187 1049 : change->action = REORDER_BUFFER_CHANGE_INSERT;
1188 1049 : change->origin_id = XLogRecGetOrigin(r);
1189 :
1190 1049 : memcpy(&change->data.tp.rlocator, &rlocator, sizeof(RelFileLocator));
1191 :
1192 1049 : xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
1193 1049 : data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
1194 1049 : datalen = xlhdr->datalen;
1195 :
1196 1049 : change->data.tp.newtuple =
1197 1049 : ReorderBufferAllocTupleBuf(ctx->reorder, datalen);
1198 :
1199 1049 : tuple = change->data.tp.newtuple;
1200 1049 : header = tuple->t_data;
1201 :
1202 : /* not a disk based tuple */
1203 1049 : ItemPointerSetInvalid(&tuple->t_self);
1204 :
1205 : /*
1206 : * We can only figure this out after reassembling the transactions.
1207 : */
1208 1049 : tuple->t_tableOid = InvalidOid;
1209 :
1210 1049 : tuple->t_len = datalen + SizeofHeapTupleHeader;
1211 :
1212 1049 : memset(header, 0, SizeofHeapTupleHeader);
1213 :
1214 1049 : memcpy((char *) tuple->t_data + SizeofHeapTupleHeader, data, datalen);
1215 1049 : header->t_infomask = xlhdr->t_infomask;
1216 1049 : header->t_infomask2 = xlhdr->t_infomask2;
1217 1049 : header->t_hoff = xlhdr->t_hoff;
1218 :
1219 : /*
1220 : * Reset toast reassembly state only after the last row in the last
1221 : * xl_multi_insert_tuple record emitted by one heap_multi_insert()
1222 : * call.
1223 : */
1224 1049 : if (xlrec->flags & XLH_INSERT_LAST_IN_MULTI &&
1225 189 : (i + 1) == xlrec->ntuples)
1226 10 : change->data.tp.clear_toast_afterwards = true;
1227 : else
1228 1039 : change->data.tp.clear_toast_afterwards = false;
1229 :
1230 1049 : ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
1231 : buf->origptr, change, false);
1232 :
1233 : /* move to the next xl_multi_insert_tuple entry */
1234 1049 : data += datalen;
1235 : }
1236 : Assert(data == tupledata + tuplelen);
1237 : }
1238 :
1239 : /*
1240 : * Parse XLOG_HEAP_CONFIRM from wal into a confirmation change.
1241 : *
1242 : * This is pretty trivial, all the state essentially already setup by the
1243 : * speculative insertion.
1244 : */
1245 : static void
1246 17965 : DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
1247 : {
1248 17965 : XLogReaderState *r = buf->record;
1249 : ReorderBufferChange *change;
1250 : RelFileLocator target_locator;
1251 :
1252 : /* only interested in our database */
1253 17965 : XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
1254 17965 : if (target_locator.dbOid != ctx->slot->data.database)
1255 0 : return;
1256 :
1257 : /* output plugin doesn't look for this origin, no need to queue */
1258 17965 : if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
1259 0 : return;
1260 :
1261 17965 : change = ReorderBufferAllocChange(ctx->reorder);
1262 17965 : change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM;
1263 17965 : change->origin_id = XLogRecGetOrigin(r);
1264 :
1265 17965 : memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
1266 :
1267 17965 : change->data.tp.clear_toast_afterwards = true;
1268 :
1269 17965 : ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
1270 : change, false);
1271 : }
1272 :
1273 :
1274 : /*
1275 : * Read a HeapTuple as WAL logged by heap_insert, heap_update and heap_delete
1276 : * (but not by heap_multi_insert) into a tuplebuf.
1277 : *
1278 : * The size 'len' and the pointer 'data' in the record need to be
1279 : * computed outside as they are record specific.
1280 : */
1281 : static void
1282 1475232 : DecodeXLogTuple(char *data, Size len, HeapTuple tuple)
1283 : {
1284 : xl_heap_header xlhdr;
1285 1475232 : int datalen = len - SizeOfHeapHeader;
1286 : HeapTupleHeader header;
1287 :
1288 : Assert(datalen >= 0);
1289 :
1290 1475232 : tuple->t_len = datalen + SizeofHeapTupleHeader;
1291 1475232 : header = tuple->t_data;
1292 :
1293 : /* not a disk based tuple */
1294 1475232 : ItemPointerSetInvalid(&tuple->t_self);
1295 :
1296 : /* we can only figure this out after reassembling the transactions */
1297 1475232 : tuple->t_tableOid = InvalidOid;
1298 :
1299 : /* data is not stored aligned, copy to aligned storage */
1300 1475232 : memcpy(&xlhdr, data, SizeOfHeapHeader);
1301 :
1302 1475232 : memset(header, 0, SizeofHeapTupleHeader);
1303 :
1304 1475232 : memcpy(((char *) tuple->t_data) + SizeofHeapTupleHeader,
1305 1475232 : data + SizeOfHeapHeader,
1306 : datalen);
1307 :
1308 1475232 : header->t_infomask = xlhdr.t_infomask;
1309 1475232 : header->t_infomask2 = xlhdr.t_infomask2;
1310 1475232 : header->t_hoff = xlhdr.t_hoff;
1311 1475232 : }
1312 :
1313 : /*
1314 : * Check whether we are interested in this specific transaction.
1315 : *
1316 : * There can be several reasons we might not be interested in this
1317 : * transaction:
1318 : * 1) We might not be interested in decoding transactions up to this
1319 : * LSN. This can happen because we previously decoded it and now just
1320 : * are restarting or if we haven't assembled a consistent snapshot yet.
1321 : * 2) The transaction happened in another database.
1322 : * 3) The output plugin is not interested in the origin.
1323 : * 4) We are doing fast-forwarding
1324 : */
1325 : static bool
1326 4406 : DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
1327 : Oid txn_dbid, ReplOriginId origin_id)
1328 : {
1329 4406 : if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
1330 4337 : (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
1331 2195 : FilterByOrigin(ctx, origin_id))
1332 2249 : return true;
1333 :
1334 : /*
1335 : * We also skip decoding in fast_forward mode. In passing set the
1336 : * processing_required flag to indicate that if it were not for
1337 : * fast_forward mode, processing would have been required.
1338 : */
1339 2157 : if (ctx->fast_forward)
1340 : {
1341 39 : ctx->processing_required = true;
1342 39 : return true;
1343 : }
1344 :
1345 2118 : return false;
1346 : }
|