Line data Source code
1 : /* -------------------------------------------------------------------------
2 : *
3 : * decode.c
4 : * This module decodes WAL records read using xlogreader.h's APIs for the
5 : * purpose of logical decoding by passing information to the
6 : * reorderbuffer module (containing the actual changes) and to the
7 : * snapbuild module to build a fitting catalog snapshot (to be able to
8 : * properly decode the changes in the reorderbuffer).
9 : *
10 : * NOTE:
11 : * This basically tries to handle all low level xlog stuff for
12 : * reorderbuffer.c and snapbuild.c. There's some minor leakage where a
13 : * specific record's struct is used to pass data along, but those just
14 : * happen to contain the right amount of data in a convenient
15 : * format. There isn't and shouldn't be much intelligence about the
16 : * contents of records in here except turning them into a more usable
17 : * format.
18 : *
19 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
20 : * Portions Copyright (c) 1994, Regents of the University of California
21 : *
22 : * IDENTIFICATION
23 : * src/backend/replication/logical/decode.c
24 : *
25 : * -------------------------------------------------------------------------
26 : */
27 : #include "postgres.h"
28 :
29 : #include "access/heapam_xlog.h"
30 : #include "access/transam.h"
31 : #include "access/xact.h"
32 : #include "access/xlog_internal.h"
33 : #include "access/xlogreader.h"
34 : #include "access/xlogrecord.h"
35 : #include "catalog/pg_control.h"
36 : #include "commands/repack.h"
37 : #include "replication/decode.h"
38 : #include "replication/logical.h"
39 : #include "replication/message.h"
40 : #include "replication/reorderbuffer.h"
41 : #include "replication/snapbuild.h"
42 : #include "storage/standbydefs.h"
43 :
44 : /* individual record(group)'s handlers */
45 : static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
46 : static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
47 : static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
48 : static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
49 : static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
50 : static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
51 :
52 : static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
53 : xl_xact_parsed_commit *parsed, TransactionId xid,
54 : bool two_phase);
55 : static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
56 : xl_xact_parsed_abort *parsed, TransactionId xid,
57 : bool two_phase);
58 : static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
59 : xl_xact_parsed_prepare *parsed);
60 :
61 :
62 : /* common function to decode tuples */
63 : static void DecodeXLogTuple(char *data, Size len, HeapTuple tuple);
64 :
65 : /* helper functions for decoding transactions */
66 : static inline bool FilterPrepare(LogicalDecodingContext *ctx,
67 : TransactionId xid, const char *gid);
68 : static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
69 : XLogRecordBuffer *buf, Oid txn_dbid,
70 : ReplOriginId origin_id);
71 :
72 : /*
73 : * Take every XLogReadRecord()ed record and perform the actions required to
74 : * decode it using the output plugin already setup in the logical decoding
75 : * context.
76 : *
77 : * NB: Note that every record's xid needs to be processed by reorderbuffer
78 : * (xids contained in the content of records are not relevant for this rule).
79 : * That means that for records which'd otherwise not go through the
80 : * reorderbuffer ReorderBufferProcessXid() has to be called. We don't want to
81 : * call ReorderBufferProcessXid for each record type by default, because
82 : * e.g. empty xacts can be handled more efficiently if there's no previous
83 : * state for them.
84 : *
85 : * We also support the ability to fast forward thru records, skipping some
86 : * record types completely - see individual record types for details.
87 : */
88 : void
89 2593155 : LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
90 : {
91 : XLogRecordBuffer buf;
92 : TransactionId txid;
93 : RmgrData rmgr;
94 :
95 2593155 : buf.origptr = ctx->reader->ReadRecPtr;
96 2593155 : buf.endptr = ctx->reader->EndRecPtr;
97 2593155 : buf.record = record;
98 :
99 2593155 : txid = XLogRecGetTopXid(record);
100 :
101 : /*
102 : * If the top-level xid is valid, we need to assign the subxact to the
103 : * top-level xact. We need to do this for all records, hence we do it
104 : * before the switch.
105 : */
106 2593155 : if (TransactionIdIsValid(txid))
107 : {
108 687 : ReorderBufferAssignChild(ctx->reorder,
109 : txid,
110 687 : XLogRecGetXid(record),
111 : buf.origptr);
112 : }
113 :
114 2593155 : rmgr = GetRmgr(XLogRecGetRmid(record));
115 :
116 2593155 : if (rmgr.rm_decode != NULL)
117 1992592 : rmgr.rm_decode(ctx, &buf);
118 : else
119 : {
120 : /* just deal with xid, and done */
121 600563 : ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
122 : buf.origptr);
123 : }
124 2593140 : }
125 :
126 : /*
127 : * Handle rmgr XLOG_ID records for LogicalDecodingProcessRecord().
128 : */
129 : void
130 6430 : xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
131 : {
132 6430 : SnapBuild *builder = ctx->snapshot_builder;
133 6430 : uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
134 :
135 6430 : ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record),
136 : buf->origptr);
137 :
138 6430 : switch (info)
139 : {
140 : /* this is also used in END_OF_RECOVERY checkpoints */
141 82 : case XLOG_CHECKPOINT_SHUTDOWN:
142 : case XLOG_END_OF_RECOVERY:
143 82 : SnapBuildSerializationPoint(builder, buf->origptr);
144 :
145 82 : break;
146 81 : case XLOG_CHECKPOINT_ONLINE:
147 :
148 : /*
149 : * a RUNNING_XACTS record will have been logged near to this, we
150 : * can restart from there.
151 : */
152 81 : break;
153 0 : case XLOG_LOGICAL_DECODING_STATUS_CHANGE:
154 : {
155 : bool logical_decoding;
156 :
157 0 : memcpy(&logical_decoding, XLogRecGetData(buf->record), sizeof(bool));
158 :
159 : /*
160 : * Error out as we should not decode this WAL record.
161 : *
162 : * Logical decoding is disabled, and existing logical slots on
163 : * the standby are invalidated when this WAL record is
164 : * replayed. No logical decoder can process this WAL record
165 : * until replay completes, and by then the slots are already
166 : * invalidated. Furthermore, no new logical slots can be
167 : * created while logical decoding is disabled. This cannot
168 : * occur even on primary either, since it will not restart
169 : * with wal_level < replica if any logical slots exist.
170 : */
171 0 : elog(ERROR, "unexpected logical decoding status change %d",
172 : logical_decoding);
173 :
174 : break;
175 : }
176 6267 : case XLOG_NOOP:
177 : case XLOG_NEXTOID:
178 : case XLOG_SWITCH:
179 : case XLOG_BACKUP_END:
180 : case XLOG_PARAMETER_CHANGE:
181 : case XLOG_RESTORE_POINT:
182 : case XLOG_FPW_CHANGE:
183 : case XLOG_FPI_FOR_HINT:
184 : case XLOG_FPI:
185 : case XLOG_OVERWRITE_CONTRECORD:
186 : case XLOG_CHECKPOINT_REDO:
187 6267 : break;
188 0 : default:
189 0 : elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info);
190 : }
191 6430 : }
192 :
193 : void
194 0 : xlog2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
195 : {
196 0 : uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
197 :
198 0 : ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record), buf->origptr);
199 :
200 0 : switch (info)
201 : {
202 0 : case XLOG2_CHECKSUMS:
203 0 : break;
204 0 : default:
205 0 : elog(ERROR, "unexpected RM_XLOG2_ID record type: %u", info);
206 : }
207 0 : }
208 :
209 : /*
210 : * Handle rmgr XACT_ID records for LogicalDecodingProcessRecord().
211 : */
212 : void
213 10181 : xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
214 : {
215 10181 : SnapBuild *builder = ctx->snapshot_builder;
216 10181 : ReorderBuffer *reorder = ctx->reorder;
217 10181 : XLogReaderState *r = buf->record;
218 10181 : uint8 info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
219 :
220 : /*
221 : * If the snapshot isn't yet fully built, we cannot decode anything, so
222 : * bail out.
223 : */
224 10181 : if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
225 14 : return;
226 :
227 10167 : switch (info)
228 : {
229 3737 : case XLOG_XACT_COMMIT:
230 : case XLOG_XACT_COMMIT_PREPARED:
231 : {
232 : xl_xact_commit *xlrec;
233 : xl_xact_parsed_commit parsed;
234 : TransactionId xid;
235 3737 : bool two_phase = false;
236 :
237 3737 : xlrec = (xl_xact_commit *) XLogRecGetData(r);
238 3737 : ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
239 :
240 3737 : if (!TransactionIdIsValid(parsed.twophase_xid))
241 3610 : xid = XLogRecGetXid(r);
242 : else
243 127 : xid = parsed.twophase_xid;
244 :
245 : /*
246 : * We would like to process the transaction in a two-phase
247 : * manner iff output plugin supports two-phase commits and
248 : * doesn't filter the transaction at prepare time.
249 : */
250 3737 : if (info == XLOG_XACT_COMMIT_PREPARED)
251 127 : two_phase = !(FilterPrepare(ctx, xid,
252 127 : parsed.twophase_gid));
253 :
254 3737 : DecodeCommit(ctx, buf, &parsed, xid, two_phase);
255 3724 : break;
256 : }
257 218 : case XLOG_XACT_ABORT:
258 : case XLOG_XACT_ABORT_PREPARED:
259 : {
260 : xl_xact_abort *xlrec;
261 : xl_xact_parsed_abort parsed;
262 : TransactionId xid;
263 218 : bool two_phase = false;
264 :
265 218 : xlrec = (xl_xact_abort *) XLogRecGetData(r);
266 218 : ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
267 :
268 218 : if (!TransactionIdIsValid(parsed.twophase_xid))
269 168 : xid = XLogRecGetXid(r);
270 : else
271 50 : xid = parsed.twophase_xid;
272 :
273 : /*
274 : * We would like to process the transaction in a two-phase
275 : * manner iff output plugin supports two-phase commits and
276 : * doesn't filter the transaction at prepare time.
277 : */
278 218 : if (info == XLOG_XACT_ABORT_PREPARED)
279 50 : two_phase = !(FilterPrepare(ctx, xid,
280 50 : parsed.twophase_gid));
281 :
282 218 : DecodeAbort(ctx, buf, &parsed, xid, two_phase);
283 218 : break;
284 : }
285 133 : case XLOG_XACT_ASSIGNMENT:
286 :
287 : /*
288 : * We assign subxact to the toplevel xact while processing each
289 : * record if required. So, we don't need to do anything here. See
290 : * LogicalDecodingProcessRecord.
291 : */
292 133 : break;
293 5879 : case XLOG_XACT_INVALIDATIONS:
294 : {
295 : TransactionId xid;
296 : xl_xact_invals *invals;
297 :
298 5879 : xid = XLogRecGetXid(r);
299 5879 : invals = (xl_xact_invals *) XLogRecGetData(r);
300 :
301 : /*
302 : * Execute the invalidations for xid-less transactions,
303 : * otherwise, accumulate them so that they can be processed at
304 : * the commit time.
305 : */
306 5879 : if (TransactionIdIsValid(xid))
307 : {
308 5856 : if (!ctx->fast_forward)
309 5781 : ReorderBufferAddInvalidations(reorder, xid,
310 : buf->origptr,
311 5781 : invals->nmsgs,
312 5781 : invals->msgs);
313 5856 : ReorderBufferXidSetCatalogChanges(ctx->reorder, xid,
314 : buf->origptr);
315 : }
316 23 : else if (!ctx->fast_forward)
317 23 : ReorderBufferImmediateInvalidation(ctx->reorder,
318 23 : invals->nmsgs,
319 23 : invals->msgs);
320 :
321 5879 : break;
322 : }
323 200 : case XLOG_XACT_PREPARE:
324 : {
325 : xl_xact_parsed_prepare parsed;
326 : xl_xact_prepare *xlrec;
327 :
328 : /* ok, parse it */
329 200 : xlrec = (xl_xact_prepare *) XLogRecGetData(r);
330 200 : ParsePrepareRecord(XLogRecGetInfo(buf->record),
331 : xlrec, &parsed);
332 :
333 : /*
334 : * We would like to process the transaction in a two-phase
335 : * manner iff output plugin supports two-phase commits and
336 : * doesn't filter the transaction at prepare time.
337 : */
338 200 : if (FilterPrepare(ctx, parsed.twophase_xid,
339 : parsed.twophase_gid))
340 : {
341 20 : ReorderBufferProcessXid(reorder, parsed.twophase_xid,
342 : buf->origptr);
343 20 : break;
344 : }
345 :
346 : /*
347 : * Note that if the prepared transaction has locked [user]
348 : * catalog tables exclusively then decoding prepare can block
349 : * till the main transaction is committed because it needs to
350 : * lock the catalog tables.
351 : *
352 : * XXX Now, this can even lead to a deadlock if the prepare
353 : * transaction is waiting to get it logically replicated for
354 : * distributed 2PC. This can be avoided by disallowing
355 : * preparing transactions that have locked [user] catalog
356 : * tables exclusively but as of now, we ask users not to do
357 : * such an operation.
358 : */
359 180 : DecodePrepare(ctx, buf, &parsed);
360 180 : break;
361 : }
362 0 : default:
363 0 : elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
364 : }
365 : }
366 :
367 : /*
368 : * Handle rmgr STANDBY_ID records for LogicalDecodingProcessRecord().
369 : */
370 : void
371 4324 : standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
372 : {
373 4324 : SnapBuild *builder = ctx->snapshot_builder;
374 4324 : XLogReaderState *r = buf->record;
375 4324 : uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
376 :
377 4324 : ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
378 :
379 4324 : switch (info)
380 : {
381 1673 : case XLOG_RUNNING_XACTS:
382 : {
383 1673 : xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r);
384 :
385 : /*
386 : * Update this decoder's idea of transactions currently
387 : * running. In doing so we will determine whether we have
388 : * reached consistent status.
389 : */
390 1673 : SnapBuildProcessRunningXacts(builder, buf->origptr, running);
391 :
392 : /*
393 : * Abort all transactions that we keep track of, that are
394 : * older than the record's oldestRunningXid. This is the most
395 : * convenient spot for doing so since, in contrast to shutdown
396 : * or end-of-recovery checkpoints, we have information about
397 : * all running transactions which includes prepared ones,
398 : * while shutdown checkpoints just know that no non-prepared
399 : * transactions are in progress.
400 : */
401 1671 : ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid);
402 : }
403 1671 : break;
404 2628 : case XLOG_STANDBY_LOCK:
405 2628 : break;
406 23 : case XLOG_INVALIDATIONS:
407 :
408 : /*
409 : * We are processing the invalidations at the command level via
410 : * XLOG_XACT_INVALIDATIONS. So we don't need to do anything here.
411 : */
412 23 : break;
413 0 : default:
414 0 : elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
415 : }
416 4322 : }
417 :
418 : /*
419 : * Handle rmgr HEAP2_ID records for LogicalDecodingProcessRecord().
420 : */
421 : void
422 37277 : heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
423 : {
424 37277 : uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
425 37277 : TransactionId xid = XLogRecGetXid(buf->record);
426 37277 : SnapBuild *builder = ctx->snapshot_builder;
427 :
428 37277 : ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
429 :
430 : /*
431 : * If we don't have snapshot or we are just fast-forwarding, there is no
432 : * point in decoding data changes. However, it's crucial to build the base
433 : * snapshot during fast-forward mode (as is done in
434 : * SnapBuildProcessChange()) because we require the snapshot's xmin when
435 : * determining the candidate catalog_xmin for the replication slot. See
436 : * SnapBuildProcessRunningXacts().
437 : */
438 37277 : if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
439 9 : return;
440 :
441 37268 : switch (info)
442 : {
443 6985 : case XLOG_HEAP2_MULTI_INSERT:
444 6985 : if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
445 6985 : !ctx->fast_forward &&
446 6882 : !change_useless_for_repack(buf))
447 6808 : DecodeMultiInsert(ctx, buf);
448 6985 : break;
449 27828 : case XLOG_HEAP2_NEW_CID:
450 27828 : if (!ctx->fast_forward)
451 : {
452 : xl_heap_new_cid *xlrec;
453 :
454 27445 : xlrec = (xl_heap_new_cid *) XLogRecGetData(buf->record);
455 27445 : SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec);
456 : }
457 27828 : break;
458 90 : case XLOG_HEAP2_REWRITE:
459 :
460 : /*
461 : * Although these records only exist to serve the needs of logical
462 : * decoding, all the work happens as part of crash or archive
463 : * recovery, so we don't need to do anything here.
464 : */
465 90 : break;
466 :
467 : /*
468 : * Everything else here is just low level physical stuff we're not
469 : * interested in.
470 : */
471 2365 : case XLOG_HEAP2_PRUNE_ON_ACCESS:
472 : case XLOG_HEAP2_PRUNE_VACUUM_SCAN:
473 : case XLOG_HEAP2_PRUNE_VACUUM_CLEANUP:
474 : case XLOG_HEAP2_LOCK_UPDATED:
475 2365 : break;
476 0 : default:
477 0 : elog(ERROR, "unexpected RM_HEAP2_ID record type: %u", info);
478 : }
479 : }
480 :
481 : /*
482 : * Handle rmgr HEAP_ID records for LogicalDecodingProcessRecord().
483 : */
484 : void
485 1934282 : heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
486 : {
487 1934282 : uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
488 1934282 : TransactionId xid = XLogRecGetXid(buf->record);
489 1934282 : SnapBuild *builder = ctx->snapshot_builder;
490 :
491 1934282 : ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
492 :
493 : /*
494 : * If we don't have snapshot or we are just fast-forwarding, there is no
495 : * point in decoding data changes. However, it's crucial to build the base
496 : * snapshot during fast-forward mode (as is done in
497 : * SnapBuildProcessChange()) because we require the snapshot's xmin when
498 : * determining the candidate catalog_xmin for the replication slot. See
499 : * SnapBuildProcessRunningXacts().
500 : */
501 1934282 : if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
502 4 : return;
503 :
504 1934278 : switch (info)
505 : {
506 1248758 : case XLOG_HEAP_INSERT:
507 1248758 : if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
508 1248758 : !ctx->fast_forward &&
509 1248560 : !change_useless_for_repack(buf))
510 1248341 : DecodeInsert(ctx, buf);
511 1248758 : break;
512 :
513 : /*
514 : * Treat HOT update as normal updates. There is no useful
515 : * information in the fact that we could make it a HOT update
516 : * locally and the WAL layout is compatible.
517 : */
518 207657 : case XLOG_HEAP_HOT_UPDATE:
519 : case XLOG_HEAP_UPDATE:
520 207657 : if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
521 207657 : !ctx->fast_forward &&
522 207644 : !change_useless_for_repack(buf))
523 207609 : DecodeUpdate(ctx, buf);
524 207657 : break;
525 :
526 268064 : case XLOG_HEAP_DELETE:
527 268064 : if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
528 268064 : !ctx->fast_forward &&
529 267970 : !change_useless_for_repack(buf))
530 267945 : DecodeDelete(ctx, buf);
531 268064 : break;
532 :
533 60 : case XLOG_HEAP_TRUNCATE:
534 60 : if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
535 60 : !ctx->fast_forward &&
536 58 : !change_useless_for_repack(buf))
537 58 : DecodeTruncate(ctx, buf);
538 60 : break;
539 :
540 1217 : case XLOG_HEAP_INPLACE:
541 :
542 : /*
543 : * Inplace updates are only ever performed on catalog tuples and
544 : * can, per definition, not change tuple visibility. Since we
545 : * also don't decode catalog tuples, we're not interested in the
546 : * record's contents.
547 : */
548 1217 : break;
549 :
550 17964 : case XLOG_HEAP_CONFIRM:
551 17964 : if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
552 17964 : !ctx->fast_forward &&
553 17964 : !change_useless_for_repack(buf))
554 17964 : DecodeSpecConfirm(ctx, buf);
555 17964 : break;
556 :
557 190558 : case XLOG_HEAP_LOCK:
558 : /* we don't care about row level locks for now */
559 190558 : break;
560 :
561 0 : default:
562 0 : elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info);
563 : break;
564 : }
565 : }
566 :
567 : /*
568 : * Ask output plugin whether we want to skip this PREPARE and send
569 : * this transaction as a regular commit later.
570 : */
571 : static inline bool
572 377 : FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
573 : const char *gid)
574 : {
575 : /*
576 : * Skip if decoding of two-phase transactions at PREPARE time is not
577 : * enabled. In that case, all two-phase transactions are considered
578 : * filtered out and will be applied as regular transactions at COMMIT
579 : * PREPARED.
580 : */
581 377 : if (!ctx->twophase)
582 23 : return true;
583 :
584 : /*
585 : * The filter_prepare callback is optional. When not supplied, all
586 : * prepared transactions should go through.
587 : */
588 354 : if (ctx->callbacks.filter_prepare_cb == NULL)
589 206 : return false;
590 :
591 148 : return filter_prepare_cb_wrapper(ctx, xid, gid);
592 : }
593 :
594 : static inline bool
595 1739285 : FilterByOrigin(LogicalDecodingContext *ctx, ReplOriginId origin_id)
596 : {
597 1739285 : if (ctx->callbacks.filter_by_origin_cb == NULL)
598 132 : return false;
599 :
600 1739153 : return filter_by_origin_cb_wrapper(ctx, origin_id);
601 : }
602 :
603 : /*
604 : * Handle rmgr LOGICALMSG_ID records for LogicalDecodingProcessRecord().
605 : */
606 : void
607 98 : logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
608 : {
609 98 : SnapBuild *builder = ctx->snapshot_builder;
610 98 : XLogReaderState *r = buf->record;
611 98 : TransactionId xid = XLogRecGetXid(r);
612 98 : uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
613 98 : ReplOriginId origin_id = XLogRecGetOrigin(r);
614 98 : Snapshot snapshot = NULL;
615 : xl_logical_message *message;
616 :
617 98 : if (info != XLOG_LOGICAL_MESSAGE)
618 0 : elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
619 :
620 98 : ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
621 :
622 : /* If we don't have snapshot, there is no point in decoding messages */
623 98 : if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
624 0 : return;
625 :
626 98 : message = (xl_logical_message *) XLogRecGetData(r);
627 :
628 194 : if (message->dbId != ctx->slot->data.database ||
629 96 : FilterByOrigin(ctx, origin_id))
630 4 : return;
631 :
632 94 : if (message->transactional &&
633 39 : !SnapBuildProcessChange(builder, xid, buf->origptr))
634 0 : return;
635 149 : else if (!message->transactional &&
636 110 : (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
637 55 : SnapBuildXactNeedsSkip(builder, buf->origptr)))
638 44 : return;
639 :
640 : /*
641 : * We also skip decoding in fast_forward mode. This check must be last
642 : * because we don't want to set the processing_required flag unless we
643 : * have a decodable message.
644 : */
645 50 : if (ctx->fast_forward)
646 : {
647 : /*
648 : * We need to set processing_required flag to notify the message's
649 : * existence to the caller. Usually, the flag is set when either the
650 : * COMMIT or ABORT records are decoded, but this must be turned on
651 : * here because the non-transactional logical message is decoded
652 : * without waiting for these records.
653 : */
654 3 : if (!message->transactional)
655 3 : ctx->processing_required = true;
656 :
657 3 : return;
658 : }
659 :
660 : /*
661 : * If this is a non-transactional change, get the snapshot we're expected
662 : * to use. We only get here when the snapshot is consistent, and the
663 : * change is not meant to be skipped.
664 : *
665 : * For transactional changes we don't need a snapshot, we'll use the
666 : * regular snapshot maintained by ReorderBuffer. We just leave it NULL.
667 : */
668 47 : if (!message->transactional)
669 8 : snapshot = SnapBuildGetOrBuildSnapshot(builder);
670 :
671 47 : ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr,
672 47 : message->transactional,
673 47 : message->message, /* first part of message is
674 : * prefix */
675 : message->message_size,
676 47 : message->message + message->prefix_size);
677 : }
678 :
679 : /*
680 : * Consolidated commit record handling between the different form of commit
681 : * records.
682 : *
683 : * 'two_phase' indicates that caller wants to process the transaction in two
684 : * phases, first process prepare if not already done and then process
685 : * commit_prepared.
686 : */
687 : static void
688 3737 : DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
689 : xl_xact_parsed_commit *parsed, TransactionId xid,
690 : bool two_phase)
691 : {
692 3737 : XLogRecPtr origin_lsn = InvalidXLogRecPtr;
693 3737 : TimestampTz commit_time = parsed->xact_time;
694 3737 : ReplOriginId origin_id = XLogRecGetOrigin(buf->record);
695 : int i;
696 :
697 3737 : if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
698 : {
699 100 : origin_lsn = parsed->origin_lsn;
700 100 : commit_time = parsed->origin_timestamp;
701 : }
702 :
703 3737 : SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
704 : parsed->nsubxacts, parsed->subxacts,
705 : parsed->xinfo);
706 :
707 : /* ----
708 : * Check whether we are interested in this specific transaction, and tell
709 : * the reorderbuffer to forget the content of the (sub-)transactions
710 : * if not.
711 : *
712 : * We can't just use ReorderBufferAbort() here, because we need to execute
713 : * the transaction's invalidations. This currently won't be needed if
714 : * we're just skipping over the transaction because currently we only do
715 : * so during startup, to get to the first transaction the client needs. As
716 : * we have reset the catalog caches before starting to read WAL, and we
717 : * haven't yet touched any catalogs, there can't be anything to invalidate.
718 : * But if we're "forgetting" this commit because it happened in another
719 : * database, the invalidations might be important, because they could be
720 : * for shared catalogs and we might have loaded data into the relevant
721 : * syscaches.
722 : * ---
723 : */
724 3737 : if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
725 : {
726 2937 : for (i = 0; i < parsed->nsubxacts; i++)
727 : {
728 983 : ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
729 : }
730 1954 : ReorderBufferForget(ctx->reorder, xid, buf->origptr);
731 :
732 1954 : return;
733 : }
734 :
735 : /* tell the reorderbuffer about the surviving subtransactions */
736 2052 : for (i = 0; i < parsed->nsubxacts; i++)
737 : {
738 269 : ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
739 : buf->origptr, buf->endptr);
740 : }
741 :
742 : /*
743 : * Send the final commit record if the transaction data is already
744 : * decoded, otherwise, process the entire transaction.
745 : */
746 1783 : if (two_phase)
747 : {
748 36 : ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
749 36 : SnapBuildGetTwoPhaseAt(ctx->snapshot_builder),
750 : commit_time, origin_id, origin_lsn,
751 36 : parsed->twophase_gid, true);
752 : }
753 : else
754 : {
755 1747 : ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
756 : commit_time, origin_id, origin_lsn);
757 : }
758 :
759 : /*
760 : * Update the decoding stats at transaction prepare/commit/abort.
761 : * Additionally we send the stats when we spill or stream the changes to
762 : * avoid losing them in case the decoding is interrupted. It is not clear
763 : * that sending more or less frequently than this would be better.
764 : */
765 1770 : UpdateDecodingStats(ctx);
766 : }
767 :
768 : /*
769 : * Decode PREPARE record. Similar logic as in DecodeCommit.
770 : *
771 : * Note that we don't skip prepare even if have detected concurrent abort
772 : * because it is quite possible that we had already sent some changes before we
773 : * detect abort in which case we need to abort those changes in the subscriber.
774 : * To abort such changes, we do send the prepare and then the rollback prepared
775 : * which is what happened on the publisher-side as well. Now, we can invent a
776 : * new abort API wherein in such cases we send abort and skip sending prepared
777 : * and rollback prepared but then it is not that straightforward because we
778 : * might have streamed this transaction by that time in which case it is
779 : * handled when the rollback is encountered. It is not impossible to optimize
780 : * the concurrent abort case but it can introduce design complexity w.r.t
781 : * handling different cases so leaving it for now as it doesn't seem worth it.
782 : */
783 : static void
784 180 : DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
785 : xl_xact_parsed_prepare *parsed)
786 : {
787 180 : SnapBuild *builder = ctx->snapshot_builder;
788 180 : XLogRecPtr origin_lsn = parsed->origin_lsn;
789 180 : TimestampTz prepare_time = parsed->xact_time;
790 180 : ReplOriginId origin_id = XLogRecGetOrigin(buf->record);
791 : int i;
792 180 : TransactionId xid = parsed->twophase_xid;
793 :
794 180 : if (parsed->origin_timestamp != 0)
795 5 : prepare_time = parsed->origin_timestamp;
796 :
797 : /*
798 : * Remember the prepare info for a txn so that it can be used later in
799 : * commit prepared if required. See ReorderBufferFinishPrepared.
800 : */
801 180 : if (!ReorderBufferRememberPrepareInfo(ctx->reorder, xid, buf->origptr,
802 : buf->endptr, prepare_time, origin_id,
803 : origin_lsn))
804 0 : return;
805 :
806 : /* We can't start streaming unless a consistent state is reached. */
807 180 : if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
808 : {
809 3 : ReorderBufferSkipPrepare(ctx->reorder, xid);
810 3 : return;
811 : }
812 :
813 : /*
814 : * Check whether we need to process this transaction. See
815 : * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
816 : * transaction.
817 : *
818 : * We can't call ReorderBufferForget as we did in DecodeCommit as the txn
819 : * hasn't yet been committed, removing this txn before a commit might
820 : * result in the computation of an incorrect restart_lsn. See
821 : * SnapBuildProcessRunningXacts. But we need to process cache
822 : * invalidations if there are any for the reasons mentioned in
823 : * DecodeCommit.
824 : */
825 177 : if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
826 : {
827 131 : ReorderBufferSkipPrepare(ctx->reorder, xid);
828 131 : ReorderBufferInvalidate(ctx->reorder, xid, buf->origptr);
829 131 : return;
830 : }
831 :
832 : /* Tell the reorderbuffer about the surviving subtransactions. */
833 47 : for (i = 0; i < parsed->nsubxacts; i++)
834 : {
835 1 : ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
836 : buf->origptr, buf->endptr);
837 : }
838 :
839 : /* replay actions of all transaction + subtransactions in order */
840 46 : ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid);
841 :
842 : /*
843 : * Update the decoding stats at transaction prepare/commit/abort.
844 : * Additionally we send the stats when we spill or stream the changes to
845 : * avoid losing them in case the decoding is interrupted. It is not clear
846 : * that sending more or less frequently than this would be better.
847 : */
848 46 : UpdateDecodingStats(ctx);
849 : }
850 :
851 :
852 : /*
853 : * Get the data from the various forms of abort records and pass it on to
854 : * reorderbuffer.c (snapbuild.c is only consulted, via DecodeTXNNeedSkip, to decide whether to skip).
855 : *
856 : * 'two_phase' indicates that the record finishes a prepared transaction; 'parsed' is the already parsed abort record and 'xid' the aborted transaction.
857 : */
858 : static void
859 218 : DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
860 : xl_xact_parsed_abort *parsed, TransactionId xid,
861 : bool two_phase)
862 : {
863 : int i;
864 218 : XLogRecPtr origin_lsn = InvalidXLogRecPtr; /* stays invalid unless the record carries origin info */
865 218 : TimestampTz abort_time = parsed->xact_time; /* replaced by the origin timestamp below, if present */
866 218 : ReplOriginId origin_id = XLogRecGetOrigin(buf->record);
867 : bool skip_xact;
868 :
869 218 : if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
870 : {
871 3 : origin_lsn = parsed->origin_lsn;
872 3 : abort_time = parsed->origin_timestamp;
873 : }
874 :
875 : /*
876 : * Check whether we need to process this transaction. See
877 : * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
878 : * transaction.
879 : */
880 218 : skip_xact = DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id);
881 :
882 : /*
883 : * Send the final rollback record for a prepared transaction unless we
884 : * need to skip it. For non-two-phase xacts, and for skipped two-phase ones, simply forget the xact.
885 : */
886 218 : if (two_phase && !skip_xact)
887 : {
888 11 : ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
889 : InvalidXLogRecPtr,
890 : abort_time, origin_id, origin_lsn,
891 11 : parsed->twophase_gid, false);
892 : }
893 : else
894 : {
895 213 : for (i = 0; i < parsed->nsubxacts; i++) /* subtransactions first, then the top-level xid below */
896 : {
897 6 : ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
898 6 : buf->record->EndRecPtr, abort_time);
899 : }
900 :
901 207 : ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr,
902 : abort_time);
903 : }
904 :
905 : /* update the decoding stats; this is reached on both of the paths above */
906 218 : UpdateDecodingStats(ctx);
907 218 : }
908 :
909 : /*
910 : * Parse XLOG_HEAP_INSERT (not MULTI_INSERT!) records into tuplebufs and queue the resulting change in the reorderbuffer.
911 : *
912 : * Inserts can contain the new tuple; records without one, for another database, or filtered by origin are ignored.
913 : */
914 : static void
915 1248341 : DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
916 : {
917 : Size datalen;
918 : char *tupledata;
919 : Size tuplelen;
920 1248341 : XLogReaderState *r = buf->record;
921 : xl_heap_insert *xlrec;
922 : ReorderBufferChange *change;
923 : RelFileLocator target_locator;
924 :
925 1248341 : xlrec = (xl_heap_insert *) XLogRecGetData(r);
926 :
927 : /*
928 : * Ignore insert records without new tuples (this does happen when
929 : * raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
930 : */
931 1248341 : if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
932 4625 : return;
933 :
934 : /* only interested in our database */
935 1243835 : XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
936 1243835 : if (target_locator.dbOid != ctx->slot->data.database)
937 0 : return;
938 :
939 : /* output plugin doesn't look for this origin, no need to queue */
940 1243835 : if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
941 119 : return;
942 :
943 1243716 : change = ReorderBufferAllocChange(ctx->reorder);
944 1243716 : if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
945 1225752 : change->action = REORDER_BUFFER_CHANGE_INSERT;
946 : else
947 17964 : change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT; /* speculative insertion gets its own internal action */
948 1243716 : change->origin_id = XLogRecGetOrigin(r);
949 :
950 1243716 : memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
951 :
952 1243716 : tupledata = XLogRecGetBlockData(r, 0, &datalen);
953 1243716 : tuplelen = datalen - SizeOfHeapHeader; /* block data is an xl_heap_header followed by the tuple data */
954 :
955 1243716 : change->data.tp.newtuple =
956 1243716 : ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
957 :
958 1243716 : DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple); /* takes the full length, header included */
959 :
960 1243716 : change->data.tp.clear_toast_afterwards = true;
961 :
962 1243716 : ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
963 : change,
964 1243716 : xlrec->flags & XLH_INSERT_ON_TOAST_RELATION); /* nonzero only when the record is flagged as a TOAST relation insert */
965 : }
966 :
967 : /*
968 : * Parse XLOG_HEAP_UPDATE and XLOG_HEAP_HOT_UPDATE, which have the same layout
969 : * in the record, from wal into proper tuplebufs and queue the resulting change.
970 : *
971 : * Updates can possibly contain a new tuple (in block 0's data) and the old primary key (after the xl_heap_update struct in the main data).
972 : */
973 : static void
974 207609 : DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
975 : {
976 207609 : XLogReaderState *r = buf->record;
977 : xl_heap_update *xlrec;
978 : ReorderBufferChange *change;
979 : char *data;
980 : RelFileLocator target_locator;
981 :
982 207609 : xlrec = (xl_heap_update *) XLogRecGetData(r);
983 :
984 : /* only interested in our database */
985 207609 : XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
986 207609 : if (target_locator.dbOid != ctx->slot->data.database)
987 208 : return;
988 :
989 : /* output plugin doesn't look for this origin, no need to queue */
990 207426 : if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
991 25 : return;
992 :
993 207401 : change = ReorderBufferAllocChange(ctx->reorder);
994 207401 : change->action = REORDER_BUFFER_CHANGE_UPDATE;
995 207401 : change->origin_id = XLogRecGetOrigin(r);
996 207401 : memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
997 :
998 207401 : if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE)
999 : {
1000 : Size datalen;
1001 : Size tuplelen;
1002 :
1003 205585 : data = XLogRecGetBlockData(r, 0, &datalen);
1004 :
1005 205585 : tuplelen = datalen - SizeOfHeapHeader; /* tuple data only, without the xl_heap_header in front */
1006 :
1007 205585 : change->data.tp.newtuple =
1008 205585 : ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
1009 :
1010 205585 : DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
1011 : }
1012 :
1013 207401 : if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD)
1014 : {
1015 : Size datalen;
1016 : Size tuplelen;
1017 :
1018 : /* caution, remaining data in record is not aligned */
1019 332 : data = XLogRecGetData(r) + SizeOfHeapUpdate; /* old tuple starts right after the xl_heap_update struct */
1020 332 : datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate;
1021 332 : tuplelen = datalen - SizeOfHeapHeader;
1022 :
1023 332 : change->data.tp.oldtuple =
1024 332 : ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
1025 :
1026 332 : DecodeXLogTuple(data, datalen, change->data.tp.oldtuple);
1027 : }
1028 :
1029 207401 : change->data.tp.clear_toast_afterwards = true;
1030 :
1031 207401 : ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
1032 : change, false);
1033 : }
1034 :
1035 : /*
1036 : * Parse XLOG_HEAP_DELETE from wal into proper tuplebufs and queue the resulting change.
1037 : *
1038 : * Deletes can possibly contain the old primary key, stored after the xl_heap_delete struct in the record's main data.
1039 : */
1040 : static void
1041 267945 : DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
1042 : {
1043 267945 : XLogReaderState *r = buf->record;
1044 : xl_heap_delete *xlrec;
1045 : ReorderBufferChange *change;
1046 : RelFileLocator target_locator;
1047 :
1048 267945 : xlrec = (xl_heap_delete *) XLogRecGetData(r);
1049 :
1050 : /*
1051 : * Skip changes that were marked as ignorable at origin.
1052 : *
1053 : * (This is used for changes that affect relations not visible to other
1054 : * transactions, such as the transient table during concurrent repack.)
1055 : */
1056 267945 : if (xlrec->flags & XLH_DELETE_NO_LOGICAL)
1057 53 : return;
1058 :
1059 : /* only interested in our database */
1060 267945 : XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
1061 267945 : if (target_locator.dbOid != ctx->slot->data.database)
1062 35 : return;
1063 :
1064 : /* output plugin doesn't look for this origin, no need to queue */
1065 267910 : if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
1066 18 : return;
1067 :
1068 267892 : change = ReorderBufferAllocChange(ctx->reorder);
1069 :
1070 267892 : if (xlrec->flags & XLH_DELETE_IS_SUPER)
1071 0 : change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT; /* a "super" delete is queued as a speculative-insertion abort */
1072 : else
1073 267892 : change->action = REORDER_BUFFER_CHANGE_DELETE;
1074 :
1075 267892 : change->origin_id = XLogRecGetOrigin(r);
1076 :
1077 267892 : memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
1078 :
1079 : /* old primary key stored */
1080 267892 : if (xlrec->flags & XLH_DELETE_CONTAINS_OLD)
1081 : {
1082 205869 : Size datalen = XLogRecGetDataLen(r) - SizeOfHeapDelete; /* xl_heap_header plus tuple data */
1083 205869 : Size tuplelen = datalen - SizeOfHeapHeader; /* tuple data only */
1084 :
1085 : Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader)); /* guards the two unsigned subtractions above */
1086 :
1087 205869 : change->data.tp.oldtuple =
1088 205869 : ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
1089 :
1090 205869 : DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
1091 : datalen, change->data.tp.oldtuple);
1092 : }
1093 :
1094 267892 : change->data.tp.clear_toast_afterwards = true;
1095 :
1096 267892 : ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
1097 : change, false);
1098 : }
1099 :
1100 : /*
1101 : * Parse XLOG_HEAP_TRUNCATE from wal into a truncate change carrying a copy of the relation OID list and the cascade / restart_seqs flags.
1102 : */
1103 : static void
1104 58 : DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
1105 : {
1106 58 : XLogReaderState *r = buf->record;
1107 : xl_heap_truncate *xlrec;
1108 : ReorderBufferChange *change;
1109 :
1110 58 : xlrec = (xl_heap_truncate *) XLogRecGetData(r);
1111 :
1112 : /* only interested in our database (the record itself carries the database id) */
1113 58 : if (xlrec->dbId != ctx->slot->data.database)
1114 0 : return;
1115 :
1116 : /* output plugin doesn't look for this origin, no need to queue */
1117 58 : if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
1118 1 : return;
1119 :
1120 57 : change = ReorderBufferAllocChange(ctx->reorder);
1121 57 : change->action = REORDER_BUFFER_CHANGE_TRUNCATE;
1122 57 : change->origin_id = XLogRecGetOrigin(r);
1123 57 : if (xlrec->flags & XLH_TRUNCATE_CASCADE) /* NOTE(review): flags are only ever set to true here; presumably the allocated change starts zeroed - confirm */
1124 1 : change->data.truncate.cascade = true;
1125 57 : if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
1126 2 : change->data.truncate.restart_seqs = true;
1127 57 : change->data.truncate.nrelids = xlrec->nrelids;
1128 114 : change->data.truncate.relids = ReorderBufferAllocRelids(ctx->reorder,
1129 57 : xlrec->nrelids);
1130 57 : memcpy(change->data.truncate.relids, xlrec->relids, /* copy the OIDs out of the WAL record into the change */
1131 57 : xlrec->nrelids * sizeof(Oid));
1132 57 : ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
1133 : buf->origptr, change, false);
1134 : }
1135 :
1136 : /*
1137 : * Decode XLOG_HEAP2_MULTI_INSERT record into multiple tuplebufs, queueing one INSERT change per contained tuple.
1138 : *
1139 : * Currently MULTI_INSERT will always contain the full tuples.
1140 : */
1141 : static void
1142 6808 : DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
1143 : {
1144 6808 : XLogReaderState *r = buf->record;
1145 : xl_heap_multi_insert *xlrec;
1146 : int i;
1147 : char *data;
1148 : char *tupledata;
1149 : Size tuplelen;
1150 : RelFileLocator rlocator;
1151 :
1152 6808 : xlrec = (xl_heap_multi_insert *) XLogRecGetData(r);
1153 :
1154 : /*
1155 : * Ignore insert records without new tuples. This happens when a
1156 : * multi_insert is done on a catalog or on a non-persistent relation.
1157 : */
1158 6808 : if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
1159 6794 : return;
1160 :
1161 : /* only interested in our database */
1162 49 : XLogRecGetBlockTag(r, 0, &rlocator, NULL, NULL);
1163 49 : if (rlocator.dbOid != ctx->slot->data.database)
1164 35 : return;
1165 :
1166 : /* output plugin doesn't look for this origin, no need to queue */
1167 14 : if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
1168 0 : return;
1169 :
1170 : /*
1171 : * We know that this multi_insert isn't for a catalog, so the block should
1172 : * always have data even if a full-page write of it is taken.
1173 : */
1174 14 : tupledata = XLogRecGetBlockData(r, 0, &tuplelen); /* despite its name, tuplelen is the length of the whole block data */
1175 : Assert(tupledata != NULL);
1176 :
1177 14 : data = tupledata; /* cursor that walks the per-tuple headers and tuple data */
1178 1053 : for (i = 0; i < xlrec->ntuples; i++)
1179 : {
1180 : ReorderBufferChange *change;
1181 : xl_multi_insert_tuple *xlhdr;
1182 : int datalen;
1183 : HeapTuple tuple;
1184 : HeapTupleHeader header;
1185 :
1186 1039 : change = ReorderBufferAllocChange(ctx->reorder);
1187 1039 : change->action = REORDER_BUFFER_CHANGE_INSERT;
1188 1039 : change->origin_id = XLogRecGetOrigin(r);
1189 :
1190 1039 : memcpy(&change->data.tp.rlocator, &rlocator, sizeof(RelFileLocator));
1191 :
1192 1039 : xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data); /* each per-tuple header is read from a SHORTALIGN'd address */
1193 1039 : data = ((char *) xlhdr) + SizeOfMultiInsertTuple; /* tuple data follows the header */
1194 1039 : datalen = xlhdr->datalen;
1195 :
1196 1039 : change->data.tp.newtuple =
1197 1039 : ReorderBufferAllocTupleBuf(ctx->reorder, datalen);
1198 :
1199 1039 : tuple = change->data.tp.newtuple;
1200 1039 : header = tuple->t_data;
1201 :
1202 : /* not a disk based tuple */
1203 1039 : ItemPointerSetInvalid(&tuple->t_self);
1204 :
1205 : /*
1206 : * We can only figure this out after reassembling the transactions.
1207 : */
1208 1039 : tuple->t_tableOid = InvalidOid;
1209 :
1210 1039 : tuple->t_len = datalen + SizeofHeapTupleHeader;
1211 :
1212 1039 : memset(header, 0, SizeofHeapTupleHeader); /* zero the fixed header; only the three fields below are restored */
1213 :
1214 1039 : memcpy((char *) tuple->t_data + SizeofHeapTupleHeader, data, datalen);
1215 1039 : header->t_infomask = xlhdr->t_infomask;
1216 1039 : header->t_infomask2 = xlhdr->t_infomask2;
1217 1039 : header->t_hoff = xlhdr->t_hoff;
1218 :
1219 : /*
1220 : * Reset toast reassembly state only after the last row in the last
1221 : * xl_multi_insert_tuple record emitted by one heap_multi_insert()
1222 : * call.
1223 : */
1224 1039 : if (xlrec->flags & XLH_INSERT_LAST_IN_MULTI &&
1225 179 : (i + 1) == xlrec->ntuples)
1226 9 : change->data.tp.clear_toast_afterwards = true;
1227 : else
1228 1030 : change->data.tp.clear_toast_afterwards = false;
1229 :
1230 1039 : ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
1231 : buf->origptr, change, false);
1232 :
1233 : /* move to the next xl_multi_insert_tuple entry */
1234 1039 : data += datalen;
1235 : }
1236 : Assert(data == tupledata + tuplelen); /* the walk must have consumed exactly the whole block data */
1237 : }
1238 :
1239 : /*
1240 : * Parse XLOG_HEAP_CONFIRM from wal into a confirmation change.
1241 : *
1242 : * This is pretty trivial, as all the state is essentially already set up by the
1243 : * speculative insertion; the change queued here carries only the relation locator and origin, no tuple.
1244 : */
1245 : static void
1246 17964 : DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
1247 : {
1248 17964 : XLogReaderState *r = buf->record;
1249 : ReorderBufferChange *change;
1250 : RelFileLocator target_locator;
1251 :
1252 : /* only interested in our database */
1253 17964 : XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
1254 17964 : if (target_locator.dbOid != ctx->slot->data.database)
1255 0 : return;
1256 :
1257 : /* output plugin doesn't look for this origin, no need to queue */
1258 17964 : if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
1259 0 : return;
1260 :
1261 17964 : change = ReorderBufferAllocChange(ctx->reorder);
1262 17964 : change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM;
1263 17964 : change->origin_id = XLogRecGetOrigin(r);
1264 :
1265 17964 : memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
1266 :
1267 17964 : change->data.tp.clear_toast_afterwards = true;
1268 :
1269 17964 : ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
1270 : change, false);
1271 : }
1272 :
1273 :
1274 : /*
1275 : * Read a HeapTuple as WAL logged by heap_insert, heap_update and heap_delete
1276 : * (but not by heap_multi_insert) into a tuplebuf.
1277 : *
1278 : * The size 'len' and the pointer 'data' in the record need to be
1279 : * computed outside as they are record specific; 'len' includes the leading xl_heap_header, and 'tuple' must have room for len - SizeOfHeapHeader bytes after its tuple header.
1280 : */
1281 : static void
1282 1655502 : DecodeXLogTuple(char *data, Size len, HeapTuple tuple)
1283 : {
1284 : xl_heap_header xlhdr;
1285 1655502 : int datalen = len - SizeOfHeapHeader; /* length of the tuple data alone; note the Size-to-int narrowing */
1286 : HeapTupleHeader header;
1287 :
1288 : Assert(datalen >= 0);
1289 :
1290 1655502 : tuple->t_len = datalen + SizeofHeapTupleHeader;
1291 1655502 : header = tuple->t_data;
1292 :
1293 : /* not a disk based tuple */
1294 1655502 : ItemPointerSetInvalid(&tuple->t_self);
1295 :
1296 : /* we can only figure this out after reassembling the transactions */
1297 1655502 : tuple->t_tableOid = InvalidOid;
1298 :
1299 : /* data is not stored aligned, copy to aligned storage */
1300 1655502 : memcpy(&xlhdr, data, SizeOfHeapHeader);
1301 :
1302 1655502 : memset(header, 0, SizeofHeapTupleHeader); /* zero the fixed header; only the three fields below are restored from WAL */
1303 :
1304 1655502 : memcpy(((char *) tuple->t_data) + SizeofHeapTupleHeader,
1305 1655502 : data + SizeOfHeapHeader,
1306 : datalen);
1307 :
1308 1655502 : header->t_infomask = xlhdr.t_infomask;
1309 1655502 : header->t_infomask2 = xlhdr.t_infomask2;
1310 1655502 : header->t_hoff = xlhdr.t_hoff;
1311 1655502 : }
1312 :
1313 : /*
1314 : * Check whether we are interested in this specific transaction; returns true if it should be skipped.
1315 : *
1316 : * There can be several reasons we might not be interested in this
1317 : * transaction:
1318 : * 1) We might not be interested in decoding transactions up to this
1319 : * LSN. This can happen because we previously decoded it and now just
1320 : * are restarting or if we haven't assembled a consistent snapshot yet.
1321 : * 2) The transaction happened in another database.
1322 : * 3) The output plugin is not interested in the origin.
1323 : * 4) We are doing fast-forwarding.
1324 : */
1325 : static bool
1326 4132 : DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
1327 : Oid txn_dbid, ReplOriginId origin_id)
1328 : {
1329 4132 : if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
1330 3921 : (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) || /* an InvalidOid dbid never counts as another database */
1331 1982 : FilterByOrigin(ctx, origin_id))
1332 2188 : return true;
1333 :
1334 : /*
1335 : * We also skip decoding in fast_forward mode. In passing set the
1336 : * processing_required flag to indicate that if it were not for
1337 : * fast_forward mode, processing would have been required. The flag is not touched when one of the checks above already skipped the transaction.
1338 : */
1339 1944 : if (ctx->fast_forward)
1340 : {
1341 40 : ctx->processing_required = true;
1342 40 : return true;
1343 : }
1344 :
1345 1904 : return false;
1346 : }
|