Line data Source code
1 : /* -------------------------------------------------------------------------
2 : *
3 : * decode.c
4 : * This module decodes WAL records read using xlogreader.h's APIs for the
5 : * purpose of logical decoding by passing information to the
6 : * reorderbuffer module (containing the actual changes) and to the
7 : * snapbuild module to build a fitting catalog snapshot (to be able to
8 : * properly decode the changes in the reorderbuffer).
9 : *
10 : * NOTE:
11 : * This basically tries to handle all low level xlog stuff for
12 : * reorderbuffer.c and snapbuild.c. There's some minor leakage where a
13 : * specific record's struct is used to pass data along, but those just
14 : * happen to contain the right amount of data in a convenient
15 : * format. There isn't and shouldn't be much intelligence about the
16 : * contents of records in here except turning them into a more usable
17 : * format.
18 : *
19 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
20 : * Portions Copyright (c) 1994, Regents of the University of California
21 : *
22 : * IDENTIFICATION
23 : * src/backend/replication/logical/decode.c
24 : *
25 : * -------------------------------------------------------------------------
26 : */
27 : #include "postgres.h"
28 :
29 : #include "access/heapam_xlog.h"
30 : #include "access/transam.h"
31 : #include "access/xact.h"
32 : #include "access/xlog_internal.h"
33 : #include "access/xlogreader.h"
34 : #include "access/xlogrecord.h"
35 : #include "catalog/pg_control.h"
36 : #include "commands/repack.h"
37 : #include "replication/decode.h"
38 : #include "replication/logical.h"
39 : #include "replication/message.h"
40 : #include "replication/reorderbuffer.h"
41 : #include "replication/snapbuild.h"
42 : #include "storage/standbydefs.h"
43 :
44 : /* individual record(group)'s handlers */
45 : static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
46 : static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
47 : static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
48 : static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
49 : static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
50 : static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
51 :
52 : static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
53 : xl_xact_parsed_commit *parsed, TransactionId xid,
54 : bool two_phase);
55 : static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
56 : xl_xact_parsed_abort *parsed, TransactionId xid,
57 : bool two_phase);
58 : static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
59 : xl_xact_parsed_prepare *parsed);
60 :
61 :
62 : /* common function to decode tuples */
63 : static void DecodeXLogTuple(char *data, Size len, HeapTuple tuple);
64 :
65 : /* helper functions for decoding transactions */
66 : static inline bool FilterPrepare(LogicalDecodingContext *ctx,
67 : TransactionId xid, const char *gid);
68 : static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
69 : XLogRecordBuffer *buf, Oid txn_dbid,
70 : ReplOriginId origin_id);
71 :
72 : /*
73 : * Take every XLogReadRecord()ed record and perform the actions required to
74 : * decode it using the output plugin already setup in the logical decoding
75 : * context.
76 : *
77 : * NB: Note that every record's xid needs to be processed by reorderbuffer
78 : * (xids contained in the content of records are not relevant for this rule).
79 : * That means that for records which'd otherwise not go through the
80 : * reorderbuffer ReorderBufferProcessXid() has to be called. We don't want to
81 : * call ReorderBufferProcessXid for each record type by default, because
82 : * e.g. empty xacts can be handled more efficiently if there's no previous
83 : * state for them.
84 : *
85 : * We also support the ability to fast forward thru records, skipping some
86 : * record types completely - see individual record types for details.
87 : */
88 : void
89 2358143 : LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
90 : {
91 : XLogRecordBuffer buf;
92 : TransactionId txid;
93 : RmgrData rmgr;
94 :
95 2358143 : buf.origptr = ctx->reader->ReadRecPtr;
96 2358143 : buf.endptr = ctx->reader->EndRecPtr;
97 2358143 : buf.record = record;
98 :
99 2358143 : txid = XLogRecGetTopXid(record);
100 :
101 : /*
102 : * If the top-level xid is valid, we need to assign the subxact to the
103 : * top-level xact. We need to do this for all records, hence we do it
104 : * before the switch.
105 : */
106 2358143 : if (TransactionIdIsValid(txid))
107 : {
108 671 : ReorderBufferAssignChild(ctx->reorder,
109 : txid,
110 671 : XLogRecGetXid(record),
111 : buf.origptr);
112 : }
113 :
114 2358143 : rmgr = GetRmgr(XLogRecGetRmid(record));
115 :
116 2358143 : if (rmgr.rm_decode != NULL)
117 1801858 : rmgr.rm_decode(ctx, &buf);
118 : else
119 : {
120 : /* just deal with xid, and done */
121 556285 : ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
122 : buf.origptr);
123 : }
124 2358128 : }
125 :
126 : /*
127 : * Handle rmgr XLOG_ID records for LogicalDecodingProcessRecord().
128 : */
129 : void
130 7562 : xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
131 : {
132 7562 : SnapBuild *builder = ctx->snapshot_builder;
133 7562 : uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
134 :
135 7562 : ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record),
136 : buf->origptr);
137 :
138 7562 : switch (info)
139 : {
140 : /* this is also used in END_OF_RECOVERY checkpoints */
141 102 : case XLOG_CHECKPOINT_SHUTDOWN:
142 : case XLOG_END_OF_RECOVERY:
143 102 : SnapBuildSerializationPoint(builder, buf->origptr);
144 :
145 102 : break;
146 85 : case XLOG_CHECKPOINT_ONLINE:
147 :
148 : /*
149 : * a RUNNING_XACTS record will have been logged near to this, we
150 : * can restart from there.
151 : */
152 85 : break;
153 0 : case XLOG_LOGICAL_DECODING_STATUS_CHANGE:
154 : {
155 : bool logical_decoding;
156 :
157 0 : memcpy(&logical_decoding, XLogRecGetData(buf->record), sizeof(bool));
158 :
159 : /*
160 : * Error out as we should not decode this WAL record.
161 : *
162 : * Logical decoding is disabled, and existing logical slots on
163 : * the standby are invalidated when this WAL record is
164 : * replayed. No logical decoder can process this WAL record
165 : * until replay completes, and by then the slots are already
166 : * invalidated. Furthermore, no new logical slots can be
167 : * created while logical decoding is disabled. This cannot
168 : * occur even on primary either, since it will not restart
169 : * with wal_level < replica if any logical slots exist.
170 : */
171 0 : elog(ERROR, "unexpected logical decoding status change %d",
172 : logical_decoding);
173 :
174 : break;
175 : }
176 7375 : case XLOG_NOOP:
177 : case XLOG_NEXTOID:
178 : case XLOG_SWITCH:
179 : case XLOG_BACKUP_END:
180 : case XLOG_PARAMETER_CHANGE:
181 : case XLOG_RESTORE_POINT:
182 : case XLOG_FPW_CHANGE:
183 : case XLOG_FPI_FOR_HINT:
184 : case XLOG_FPI:
185 : case XLOG_OVERWRITE_CONTRECORD:
186 : case XLOG_CHECKPOINT_REDO:
187 7375 : break;
188 0 : default:
189 0 : elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info);
190 : }
191 7562 : }
192 :
193 : void
194 0 : xlog2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
195 : {
196 0 : uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
197 :
198 0 : ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record), buf->origptr);
199 :
200 0 : switch (info)
201 : {
202 0 : case XLOG2_CHECKSUMS:
203 0 : break;
204 0 : default:
205 0 : elog(ERROR, "unexpected RM_XLOG2_ID record type: %u", info);
206 : }
207 0 : }
208 :
209 : /*
210 : * Handle rmgr XACT_ID records for LogicalDecodingProcessRecord().
211 : */
212 : void
213 10688 : xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
214 : {
215 10688 : SnapBuild *builder = ctx->snapshot_builder;
216 10688 : ReorderBuffer *reorder = ctx->reorder;
217 10688 : XLogReaderState *r = buf->record;
218 10688 : uint8 info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
219 :
220 : /*
221 : * If the snapshot isn't yet fully built, we cannot decode anything, so
222 : * bail out.
223 : */
224 10688 : if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
225 13 : return;
226 :
227 10675 : switch (info)
228 : {
229 3999 : case XLOG_XACT_COMMIT:
230 : case XLOG_XACT_COMMIT_PREPARED:
231 : {
232 : xl_xact_commit *xlrec;
233 : xl_xact_parsed_commit parsed;
234 : TransactionId xid;
235 3999 : bool two_phase = false;
236 :
237 3999 : xlrec = (xl_xact_commit *) XLogRecGetData(r);
238 3999 : ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
239 :
240 3999 : if (!TransactionIdIsValid(parsed.twophase_xid))
241 3871 : xid = XLogRecGetXid(r);
242 : else
243 128 : xid = parsed.twophase_xid;
244 :
245 : /*
246 : * We would like to process the transaction in a two-phase
247 : * manner iff output plugin supports two-phase commits and
248 : * doesn't filter the transaction at prepare time.
249 : */
250 3999 : if (info == XLOG_XACT_COMMIT_PREPARED)
251 128 : two_phase = !(FilterPrepare(ctx, xid,
252 128 : parsed.twophase_gid));
253 :
254 3999 : DecodeCommit(ctx, buf, &parsed, xid, two_phase);
255 3986 : break;
256 : }
257 295 : case XLOG_XACT_ABORT:
258 : case XLOG_XACT_ABORT_PREPARED:
259 : {
260 : xl_xact_abort *xlrec;
261 : xl_xact_parsed_abort parsed;
262 : TransactionId xid;
263 295 : bool two_phase = false;
264 :
265 295 : xlrec = (xl_xact_abort *) XLogRecGetData(r);
266 295 : ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
267 :
268 295 : if (!TransactionIdIsValid(parsed.twophase_xid))
269 244 : xid = XLogRecGetXid(r);
270 : else
271 51 : xid = parsed.twophase_xid;
272 :
273 : /*
274 : * We would like to process the transaction in a two-phase
275 : * manner iff output plugin supports two-phase commits and
276 : * doesn't filter the transaction at prepare time.
277 : */
278 295 : if (info == XLOG_XACT_ABORT_PREPARED)
279 51 : two_phase = !(FilterPrepare(ctx, xid,
280 51 : parsed.twophase_gid));
281 :
282 295 : DecodeAbort(ctx, buf, &parsed, xid, two_phase);
283 295 : break;
284 : }
285 124 : case XLOG_XACT_ASSIGNMENT:
286 :
287 : /*
288 : * We assign subxact to the toplevel xact while processing each
289 : * record if required. So, we don't need to do anything here. See
290 : * LogicalDecodingProcessRecord.
291 : */
292 124 : break;
293 6054 : case XLOG_XACT_INVALIDATIONS:
294 : {
295 : TransactionId xid;
296 : xl_xact_invals *invals;
297 :
298 6054 : xid = XLogRecGetXid(r);
299 6054 : invals = (xl_xact_invals *) XLogRecGetData(r);
300 :
301 : /*
302 : * Execute the invalidations for xid-less transactions,
303 : * otherwise, accumulate them so that they can be processed at
304 : * the commit time.
305 : */
306 6054 : if (TransactionIdIsValid(xid))
307 : {
308 6020 : if (!ctx->fast_forward)
309 5945 : ReorderBufferAddInvalidations(reorder, xid,
310 : buf->origptr,
311 5945 : invals->nmsgs,
312 5945 : invals->msgs);
313 6020 : ReorderBufferXidSetCatalogChanges(ctx->reorder, xid,
314 : buf->origptr);
315 : }
316 34 : else if (!ctx->fast_forward)
317 34 : ReorderBufferImmediateInvalidation(ctx->reorder,
318 34 : invals->nmsgs,
319 34 : invals->msgs);
320 :
321 6054 : break;
322 : }
323 203 : case XLOG_XACT_PREPARE:
324 : {
325 : xl_xact_parsed_prepare parsed;
326 : xl_xact_prepare *xlrec;
327 :
328 : /* ok, parse it */
329 203 : xlrec = (xl_xact_prepare *) XLogRecGetData(r);
330 203 : ParsePrepareRecord(XLogRecGetInfo(buf->record),
331 : xlrec, &parsed);
332 :
333 : /*
334 : * We would like to process the transaction in a two-phase
335 : * manner iff output plugin supports two-phase commits and
336 : * doesn't filter the transaction at prepare time.
337 : */
338 203 : if (FilterPrepare(ctx, parsed.twophase_xid,
339 : parsed.twophase_gid))
340 : {
341 20 : ReorderBufferProcessXid(reorder, parsed.twophase_xid,
342 : buf->origptr);
343 20 : break;
344 : }
345 :
346 : /*
347 : * Note that if the prepared transaction has locked [user]
348 : * catalog tables exclusively then decoding prepare can block
349 : * till the main transaction is committed because it needs to
350 : * lock the catalog tables.
351 : *
352 : * XXX Now, this can even lead to a deadlock if the prepare
353 : * transaction is waiting to get it logically replicated for
354 : * distributed 2PC. This can be avoided by disallowing
355 : * preparing transactions that have locked [user] catalog
356 : * tables exclusively but as of now, we ask users not to do
357 : * such an operation.
358 : */
359 183 : DecodePrepare(ctx, buf, &parsed);
360 183 : break;
361 : }
362 0 : default:
363 0 : elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
364 : }
365 : }
366 :
367 : /*
368 : * Handle rmgr STANDBY_ID records for LogicalDecodingProcessRecord().
369 : */
370 : void
371 4411 : standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
372 : {
373 4411 : SnapBuild *builder = ctx->snapshot_builder;
374 4411 : XLogReaderState *r = buf->record;
375 4411 : uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
376 :
377 4411 : ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
378 :
379 4411 : switch (info)
380 : {
381 1742 : case XLOG_RUNNING_XACTS:
382 : {
383 1742 : xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r);
384 :
385 : /*
386 : * Update this decoder's idea of transactions currently
387 : * running. In doing so we will determine whether we have
388 : * reached consistent status.
389 : *
390 : * If the output plugin doesn't need access to shared
391 : * catalogs, we can ignore transactions in other databases.
392 : */
393 1742 : SnapBuildProcessRunningXacts(builder, buf->origptr, running,
394 1742 : !ctx->options.need_shared_catalogs);
395 :
396 : /*
397 : * Abort all transactions that we keep track of, that are
398 : * older than the record's oldestRunningXid. This is the most
399 : * convenient spot for doing so since, in contrast to shutdown
400 : * or end-of-recovery checkpoints, we have information about
401 : * all running transactions which includes prepared ones,
402 : * while shutdown checkpoints just know that no non-prepared
403 : * transactions are in progress.
404 : *
405 : * The database-specific records might work here too, but it's
406 : * not their purpose.
407 : */
408 1740 : if (!OidIsValid(running->dbid))
409 1737 : ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid);
410 : }
411 1740 : break;
412 2635 : case XLOG_STANDBY_LOCK:
413 2635 : break;
414 34 : case XLOG_INVALIDATIONS:
415 :
416 : /*
417 : * We are processing the invalidations at the command level via
418 : * XLOG_XACT_INVALIDATIONS. So we don't need to do anything here.
419 : */
420 34 : break;
421 0 : default:
422 0 : elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
423 : }
424 4409 : }
425 :
426 : /*
427 : * Handle rmgr HEAP2_ID records for LogicalDecodingProcessRecord().
428 : */
429 : void
430 37040 : heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
431 : {
432 37040 : uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
433 37040 : TransactionId xid = XLogRecGetXid(buf->record);
434 37040 : SnapBuild *builder = ctx->snapshot_builder;
435 :
436 37040 : ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
437 :
438 : /*
439 : * If we don't have snapshot or we are just fast-forwarding, there is no
440 : * point in decoding data changes. However, it's crucial to build the base
441 : * snapshot during fast-forward mode (as is done in
442 : * SnapBuildProcessChange()) because we require the snapshot's xmin when
443 : * determining the candidate catalog_xmin for the replication slot. See
444 : * SnapBuildProcessRunningXacts().
445 : */
446 37040 : if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
447 8 : return;
448 :
449 37032 : switch (info)
450 : {
451 7020 : case XLOG_HEAP2_MULTI_INSERT:
452 7020 : if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
453 7020 : !ctx->fast_forward &&
454 6917 : !change_useless_for_repack(buf))
455 6885 : DecodeMultiInsert(ctx, buf);
456 7020 : break;
457 28067 : case XLOG_HEAP2_NEW_CID:
458 28067 : if (!ctx->fast_forward)
459 : {
460 : xl_heap_new_cid *xlrec;
461 :
462 27684 : xlrec = (xl_heap_new_cid *) XLogRecGetData(buf->record);
463 27684 : SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec);
464 : }
465 28067 : break;
466 90 : case XLOG_HEAP2_REWRITE:
467 :
468 : /*
469 : * Although these records only exist to serve the needs of logical
470 : * decoding, all the work happens as part of crash or archive
471 : * recovery, so we don't need to do anything here.
472 : */
473 90 : break;
474 :
475 : /*
476 : * Everything else here is just low level physical stuff we're not
477 : * interested in.
478 : */
479 1855 : case XLOG_HEAP2_PRUNE_ON_ACCESS:
480 : case XLOG_HEAP2_PRUNE_VACUUM_SCAN:
481 : case XLOG_HEAP2_PRUNE_VACUUM_CLEANUP:
482 : case XLOG_HEAP2_LOCK_UPDATED:
483 1855 : break;
484 0 : default:
485 0 : elog(ERROR, "unexpected RM_HEAP2_ID record type: %u", info);
486 : }
487 : }
488 :
489 : /*
490 : * Handle rmgr HEAP_ID records for LogicalDecodingProcessRecord().
491 : */
492 : void
493 1742059 : heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
494 : {
495 1742059 : uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
496 1742059 : TransactionId xid = XLogRecGetXid(buf->record);
497 1742059 : SnapBuild *builder = ctx->snapshot_builder;
498 :
499 1742059 : ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
500 :
501 : /*
502 : * If we don't have snapshot or we are just fast-forwarding, there is no
503 : * point in decoding data changes. However, it's crucial to build the base
504 : * snapshot during fast-forward mode (as is done in
505 : * SnapBuildProcessChange()) because we require the snapshot's xmin when
506 : * determining the candidate catalog_xmin for the replication slot. See
507 : * SnapBuildProcessRunningXacts().
508 : */
509 1742059 : if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
510 684 : return;
511 :
512 1741375 : switch (info)
513 : {
514 1127494 : case XLOG_HEAP_INSERT:
515 1127494 : if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
516 1127494 : !ctx->fast_forward &&
517 1127287 : !change_useless_for_repack(buf))
518 1127219 : DecodeInsert(ctx, buf);
519 1127494 : break;
520 :
521 : /*
522 : * Treat HOT update as normal updates. There is no useful
523 : * information in the fact that we could make it a HOT update
524 : * locally and the WAL layout is compatible.
525 : */
526 181057 : case XLOG_HEAP_HOT_UPDATE:
527 : case XLOG_HEAP_UPDATE:
528 181057 : if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
529 181057 : !ctx->fast_forward &&
530 181044 : !change_useless_for_repack(buf))
531 181029 : DecodeUpdate(ctx, buf);
532 181057 : break;
533 :
534 238069 : case XLOG_HEAP_DELETE:
535 238069 : if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
536 238069 : !ctx->fast_forward &&
537 237975 : !change_useless_for_repack(buf))
538 237968 : DecodeDelete(ctx, buf);
539 238069 : break;
540 :
541 62 : case XLOG_HEAP_TRUNCATE:
542 62 : if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
543 62 : !ctx->fast_forward &&
544 60 : !change_useless_for_repack(buf))
545 60 : DecodeTruncate(ctx, buf);
546 62 : break;
547 :
548 1235 : case XLOG_HEAP_INPLACE:
549 :
550 : /*
551 : * Inplace updates are only ever performed on catalog tuples and
552 : * can, per definition, not change tuple visibility. Since we
553 : * also don't decode catalog tuples, we're not interested in the
554 : * record's contents.
555 : */
556 1235 : break;
557 :
558 17964 : case XLOG_HEAP_CONFIRM:
559 17964 : if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
560 17964 : !ctx->fast_forward &&
561 17964 : !change_useless_for_repack(buf))
562 17964 : DecodeSpecConfirm(ctx, buf);
563 17964 : break;
564 :
565 175494 : case XLOG_HEAP_LOCK:
566 : /* we don't care about row level locks for now */
567 175494 : break;
568 :
569 0 : default:
570 0 : elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info);
571 : break;
572 : }
573 : }
574 :
575 : /*
576 : * Ask output plugin whether we want to skip this PREPARE and send
577 : * this transaction as a regular commit later.
578 : */
579 : static inline bool
580 382 : FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
581 : const char *gid)
582 : {
583 : /*
584 : * Skip if decoding of two-phase transactions at PREPARE time is not
585 : * enabled. In that case, all two-phase transactions are considered
586 : * filtered out and will be applied as regular transactions at COMMIT
587 : * PREPARED.
588 : */
589 382 : if (!ctx->twophase)
590 23 : return true;
591 :
592 : /*
593 : * The filter_prepare callback is optional. When not supplied, all
594 : * prepared transactions should go through.
595 : */
596 359 : if (ctx->callbacks.filter_prepare_cb == NULL)
597 211 : return false;
598 :
599 148 : return filter_prepare_cb_wrapper(ctx, xid, gid);
600 : }
601 :
602 : static inline bool
603 1561617 : FilterByOrigin(LogicalDecodingContext *ctx, ReplOriginId origin_id)
604 : {
605 1561617 : if (ctx->callbacks.filter_by_origin_cb == NULL)
606 79 : return false;
607 :
608 1561538 : return filter_by_origin_cb_wrapper(ctx, origin_id);
609 : }
610 :
611 : /*
612 : * Handle rmgr LOGICALMSG_ID records for LogicalDecodingProcessRecord().
613 : */
614 : void
615 98 : logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
616 : {
617 98 : SnapBuild *builder = ctx->snapshot_builder;
618 98 : XLogReaderState *r = buf->record;
619 98 : TransactionId xid = XLogRecGetXid(r);
620 98 : uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
621 98 : ReplOriginId origin_id = XLogRecGetOrigin(r);
622 98 : Snapshot snapshot = NULL;
623 : xl_logical_message *message;
624 :
625 98 : if (info != XLOG_LOGICAL_MESSAGE)
626 0 : elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
627 :
628 98 : ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
629 :
630 : /* If we don't have snapshot, there is no point in decoding messages */
631 98 : if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
632 0 : return;
633 :
634 98 : message = (xl_logical_message *) XLogRecGetData(r);
635 :
636 194 : if (message->dbId != ctx->slot->data.database ||
637 96 : FilterByOrigin(ctx, origin_id))
638 4 : return;
639 :
640 94 : if (message->transactional &&
641 39 : !SnapBuildProcessChange(builder, xid, buf->origptr))
642 0 : return;
643 149 : else if (!message->transactional &&
644 110 : (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
645 55 : SnapBuildXactNeedsSkip(builder, buf->origptr)))
646 44 : return;
647 :
648 : /*
649 : * We also skip decoding in fast_forward mode. This check must be last
650 : * because we don't want to set the processing_required flag unless we
651 : * have a decodable message.
652 : */
653 50 : if (ctx->fast_forward)
654 : {
655 : /*
656 : * We need to set processing_required flag to notify the message's
657 : * existence to the caller. Usually, the flag is set when either the
658 : * COMMIT or ABORT records are decoded, but this must be turned on
659 : * here because the non-transactional logical message is decoded
660 : * without waiting for these records.
661 : */
662 3 : if (!message->transactional)
663 3 : ctx->processing_required = true;
664 :
665 3 : return;
666 : }
667 :
668 : /*
669 : * If this is a non-transactional change, get the snapshot we're expected
670 : * to use. We only get here when the snapshot is consistent, and the
671 : * change is not meant to be skipped.
672 : *
673 : * For transactional changes we don't need a snapshot, we'll use the
674 : * regular snapshot maintained by ReorderBuffer. We just leave it NULL.
675 : */
676 47 : if (!message->transactional)
677 8 : snapshot = SnapBuildGetOrBuildSnapshot(builder);
678 :
679 47 : ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr,
680 47 : message->transactional,
681 47 : message->message, /* first part of message is
682 : * prefix */
683 : message->message_size,
684 47 : message->message + message->prefix_size);
685 : }
686 :
687 : /*
688 : * Consolidated commit record handling between the different form of commit
689 : * records.
690 : *
691 : * 'two_phase' indicates that caller wants to process the transaction in two
692 : * phases, first process prepare if not already done and then process
693 : * commit_prepared.
694 : */
695 : static void
696 3999 : DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
697 : xl_xact_parsed_commit *parsed, TransactionId xid,
698 : bool two_phase)
699 : {
700 3999 : XLogRecPtr origin_lsn = InvalidXLogRecPtr;
701 3999 : TimestampTz commit_time = parsed->xact_time;
702 3999 : ReplOriginId origin_id = XLogRecGetOrigin(buf->record);
703 : int i;
704 :
705 3999 : if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
706 : {
707 106 : origin_lsn = parsed->origin_lsn;
708 106 : commit_time = parsed->origin_timestamp;
709 : }
710 :
711 3999 : SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
712 : parsed->nsubxacts, parsed->subxacts,
713 : parsed->xinfo);
714 :
715 : /* ----
716 : * Check whether we are interested in this specific transaction, and tell
717 : * the reorderbuffer to forget the content of the (sub-)transactions
718 : * if not.
719 : *
720 : * We can't just use ReorderBufferAbort() here, because we need to execute
721 : * the transaction's invalidations. This currently won't be needed if
722 : * we're just skipping over the transaction because currently we only do
723 : * so during startup, to get to the first transaction the client needs. As
724 : * we have reset the catalog caches before starting to read WAL, and we
725 : * haven't yet touched any catalogs, there can't be anything to invalidate.
726 : * But if we're "forgetting" this commit because it happened in another
727 : * database, the invalidations might be important, because they could be
728 : * for shared catalogs and we might have loaded data into the relevant
729 : * syscaches.
730 : * ---
731 : */
732 3999 : if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
733 : {
734 2972 : for (i = 0; i < parsed->nsubxacts; i++)
735 : {
736 967 : ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
737 : }
738 2005 : ReorderBufferForget(ctx->reorder, xid, buf->origptr);
739 :
740 2005 : return;
741 : }
742 :
743 : /* tell the reorderbuffer about the surviving subtransactions */
744 2263 : for (i = 0; i < parsed->nsubxacts; i++)
745 : {
746 269 : ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
747 : buf->origptr, buf->endptr);
748 : }
749 :
750 : /*
751 : * Send the final commit record if the transaction data is already
752 : * decoded, otherwise, process the entire transaction.
753 : */
754 1994 : if (two_phase)
755 : {
756 35 : ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
757 35 : SnapBuildGetTwoPhaseAt(ctx->snapshot_builder),
758 : commit_time, origin_id, origin_lsn,
759 35 : parsed->twophase_gid, true);
760 : }
761 : else
762 : {
763 1959 : ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
764 : commit_time, origin_id, origin_lsn);
765 : }
766 :
767 : /*
768 : * Update the decoding stats at transaction prepare/commit/abort.
769 : * Additionally we send the stats when we spill or stream the changes to
770 : * avoid losing them in case the decoding is interrupted. It is not clear
771 : * that sending more or less frequently than this would be better.
772 : */
773 1981 : UpdateDecodingStats(ctx);
774 : }
775 :
776 : /*
777 : * Decode PREPARE record. Similar logic as in DecodeCommit.
778 : *
779 : * Note that we don't skip prepare even if have detected concurrent abort
780 : * because it is quite possible that we had already sent some changes before we
781 : * detect abort in which case we need to abort those changes in the subscriber.
782 : * To abort such changes, we do send the prepare and then the rollback prepared
783 : * which is what happened on the publisher-side as well. Now, we can invent a
784 : * new abort API wherein in such cases we send abort and skip sending prepared
785 : * and rollback prepared but then it is not that straightforward because we
786 : * might have streamed this transaction by that time in which case it is
787 : * handled when the rollback is encountered. It is not impossible to optimize
788 : * the concurrent abort case but it can introduce design complexity w.r.t
789 : * handling different cases so leaving it for now as it doesn't seem worth it.
790 : */
791 : static void
792 183 : DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
793 : xl_xact_parsed_prepare *parsed)
794 : {
795 183 : SnapBuild *builder = ctx->snapshot_builder;
796 183 : XLogRecPtr origin_lsn = parsed->origin_lsn;
797 183 : TimestampTz prepare_time = parsed->xact_time;
798 183 : ReplOriginId origin_id = XLogRecGetOrigin(buf->record);
799 : int i;
800 183 : TransactionId xid = parsed->twophase_xid;
801 :
802 183 : if (parsed->origin_timestamp != 0)
803 8 : prepare_time = parsed->origin_timestamp;
804 :
805 : /*
806 : * Remember the prepare info for a txn so that it can be used later in
807 : * commit prepared if required. See ReorderBufferFinishPrepared.
808 : */
809 183 : if (!ReorderBufferRememberPrepareInfo(ctx->reorder, xid, buf->origptr,
810 : buf->endptr, prepare_time, origin_id,
811 : origin_lsn))
812 0 : return;
813 :
814 : /* We can't start streaming unless a consistent state is reached. */
815 183 : if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
816 : {
817 3 : ReorderBufferSkipPrepare(ctx->reorder, xid);
818 3 : return;
819 : }
820 :
821 : /*
822 : * Check whether we need to process this transaction. See
823 : * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
824 : * transaction.
825 : *
826 : * We can't call ReorderBufferForget as we did in DecodeCommit as the txn
827 : * hasn't yet been committed, removing this txn before a commit might
828 : * result in the computation of an incorrect restart_lsn. See
829 : * SnapBuildProcessRunningXacts. But we need to process cache
830 : * invalidations if there are any for the reasons mentioned in
831 : * DecodeCommit.
832 : */
833 180 : if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
834 : {
835 134 : ReorderBufferSkipPrepare(ctx->reorder, xid);
836 134 : ReorderBufferInvalidate(ctx->reorder, xid, buf->origptr);
837 134 : return;
838 : }
839 :
840 : /* Tell the reorderbuffer about the surviving subtransactions. */
841 47 : for (i = 0; i < parsed->nsubxacts; i++)
842 : {
843 1 : ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
844 : buf->origptr, buf->endptr);
845 : }
846 :
847 : /* replay actions of all transaction + subtransactions in order */
848 46 : ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid);
849 :
850 : /*
851 : * Update the decoding stats at transaction prepare/commit/abort.
852 : * Additionally we send the stats when we spill or stream the changes to
853 : * avoid losing them in case the decoding is interrupted. It is not clear
854 : * that sending more or less frequently than this would be better.
855 : */
856 46 : UpdateDecodingStats(ctx);
857 : }
858 :
859 :
860 : /*
861 : * Get the data from the various forms of abort records and pass it on to
862 : * snapbuild.c and reorderbuffer.c.
863 : *
864 : * 'two_phase' indicates to finish prepared transaction.
865 : */
866 : static void
867 295 : DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
868 : xl_xact_parsed_abort *parsed, TransactionId xid,
869 : bool two_phase)
870 : {
871 : int i;
872 295 : XLogRecPtr origin_lsn = InvalidXLogRecPtr;
873 295 : TimestampTz abort_time = parsed->xact_time;
874 295 : ReplOriginId origin_id = XLogRecGetOrigin(buf->record);
875 : bool skip_xact;
876 :
877 295 : if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
878 : {
879 4 : origin_lsn = parsed->origin_lsn;
880 4 : abort_time = parsed->origin_timestamp;
881 : }
882 :
883 : /*
884 : * Check whether we need to process this transaction. See
885 : * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
886 : * transaction.
887 : */
888 295 : skip_xact = DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id);
889 :
890 : /*
891 : * Send the final rollback record for a prepared transaction unless we
892 : * need to skip it. For non-two-phase xacts, simply forget the xact.
893 : */
894 295 : if (two_phase && !skip_xact)
895 : {
896 11 : ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
897 : InvalidXLogRecPtr,
898 : abort_time, origin_id, origin_lsn,
899 11 : parsed->twophase_gid, false);
900 : }
901 : else
902 : {
903 290 : for (i = 0; i < parsed->nsubxacts; i++)
904 : {
905 6 : ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
906 6 : buf->record->EndRecPtr, abort_time);
907 : }
908 :
909 284 : ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr,
910 : abort_time);
911 : }
912 :
913 : /* update the decoding stats */
914 295 : UpdateDecodingStats(ctx);
915 295 : }
916 :
917 : /*
918 : * Parse XLOG_HEAP_INSERT (not MULTI_INSERT!) records into tuplebufs.
919 : *
920 : * Inserts can contain the new tuple.
921 : */
922 : static void
923 1127219 : DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
924 : {
925 : Size datalen;
926 : char *tupledata;
927 : Size tuplelen;
928 1127219 : XLogReaderState *r = buf->record;
929 : xl_heap_insert *xlrec;
930 : ReorderBufferChange *change;
931 : RelFileLocator target_locator;
932 :
933 1127219 : xlrec = (xl_heap_insert *) XLogRecGetData(r);
934 :
935 : /*
936 : * Ignore insert records without new tuples (this does happen when
937 : * raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
938 : */
939 1127219 : if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
940 4818 : return;
941 :
942 : /* only interested in our database */
943 1122594 : XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
944 1122594 : if (target_locator.dbOid != ctx->slot->data.database)
945 0 : return;
946 :
947 : /* output plugin doesn't look for this origin, no need to queue */
948 1122594 : if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
949 193 : return;
950 :
951 1122401 : change = ReorderBufferAllocChange(ctx->reorder);
952 1122401 : if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
953 1104437 : change->action = REORDER_BUFFER_CHANGE_INSERT;
954 : else
955 17964 : change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
956 1122401 : change->origin_id = XLogRecGetOrigin(r);
957 :
958 1122401 : memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
959 :
960 1122401 : tupledata = XLogRecGetBlockData(r, 0, &datalen);
961 1122401 : tuplelen = datalen - SizeOfHeapHeader;
962 :
963 1122401 : change->data.tp.newtuple =
964 1122401 : ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
965 :
966 1122401 : DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);
967 :
968 1122401 : change->data.tp.clear_toast_afterwards = true;
969 :
970 1122401 : ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
971 : change,
972 1122401 : xlrec->flags & XLH_INSERT_ON_TOAST_RELATION);
973 : }
974 :
975 : /*
976 : * Parse XLOG_HEAP_UPDATE and XLOG_HEAP_HOT_UPDATE, which have the same layout
977 : * in the record, from wal into proper tuplebufs.
978 : *
979 : * Updates can possibly contain a new tuple and the old primary key.
980 : */
981 : static void
982 181029 : DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
983 : {
984 181029 : XLogReaderState *r = buf->record;
985 : xl_heap_update *xlrec;
986 : ReorderBufferChange *change;
987 : char *data;
988 : RelFileLocator target_locator;
989 :
990 181029 : xlrec = (xl_heap_update *) XLogRecGetData(r);
991 :
992 : /* only interested in our database */
993 181029 : XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
994 181029 : if (target_locator.dbOid != ctx->slot->data.database)
995 316 : return;
996 :
997 : /* output plugin doesn't look for this origin, no need to queue */
998 180746 : if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
999 33 : return;
1000 :
1001 180713 : change = ReorderBufferAllocChange(ctx->reorder);
1002 180713 : change->action = REORDER_BUFFER_CHANGE_UPDATE;
1003 180713 : change->origin_id = XLogRecGetOrigin(r);
1004 180713 : memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
1005 :
1006 180713 : if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE)
1007 : {
1008 : Size datalen;
1009 : Size tuplelen;
1010 :
1011 178921 : data = XLogRecGetBlockData(r, 0, &datalen);
1012 :
1013 178921 : tuplelen = datalen - SizeOfHeapHeader;
1014 :
1015 178921 : change->data.tp.newtuple =
1016 178921 : ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
1017 :
1018 178921 : DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
1019 : }
1020 :
1021 180713 : if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD)
1022 : {
1023 : Size datalen;
1024 : Size tuplelen;
1025 :
1026 : /* caution, remaining data in record is not aligned */
1027 352 : data = XLogRecGetData(r) + SizeOfHeapUpdate;
1028 352 : datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate;
1029 352 : tuplelen = datalen - SizeOfHeapHeader;
1030 :
1031 352 : change->data.tp.oldtuple =
1032 352 : ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
1033 :
1034 352 : DecodeXLogTuple(data, datalen, change->data.tp.oldtuple);
1035 : }
1036 :
1037 180713 : change->data.tp.clear_toast_afterwards = true;
1038 :
1039 180713 : ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
1040 : change, false);
1041 : }
1042 :
1043 : /*
1044 : * Parse XLOG_HEAP_DELETE from wal into proper tuplebufs.
1045 : *
1046 : * Deletes can possibly contain the old primary key.
1047 : */
1048 : static void
1049 237968 : DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
1050 : {
1051 237968 : XLogReaderState *r = buf->record;
1052 : xl_heap_delete *xlrec;
1053 : ReorderBufferChange *change;
1054 : RelFileLocator target_locator;
1055 :
1056 237968 : xlrec = (xl_heap_delete *) XLogRecGetData(r);
1057 :
1058 : /*
1059 : * Skip changes that were marked as ignorable at origin.
1060 : *
1061 : * (This is used for changes that affect relations not visible to other
1062 : * transactions, such as the transient table during concurrent repack.)
1063 : */
1064 237968 : if (xlrec->flags & XLH_DELETE_NO_LOGICAL)
1065 51 : return;
1066 :
1067 : /* only interested in our database */
1068 237968 : XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
1069 237968 : if (target_locator.dbOid != ctx->slot->data.database)
1070 33 : return;
1071 :
1072 : /* output plugin doesn't look for this origin, no need to queue */
1073 237935 : if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
1074 18 : return;
1075 :
1076 237917 : change = ReorderBufferAllocChange(ctx->reorder);
1077 :
1078 237917 : if (xlrec->flags & XLH_DELETE_IS_SUPER)
1079 0 : change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT;
1080 : else
1081 237917 : change->action = REORDER_BUFFER_CHANGE_DELETE;
1082 :
1083 237917 : change->origin_id = XLogRecGetOrigin(r);
1084 :
1085 237917 : memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
1086 :
1087 : /* old primary key stored */
1088 237917 : if (xlrec->flags & XLH_DELETE_CONTAINS_OLD)
1089 : {
1090 175892 : Size datalen = XLogRecGetDataLen(r) - SizeOfHeapDelete;
1091 175892 : Size tuplelen = datalen - SizeOfHeapHeader;
1092 :
1093 : Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader));
1094 :
1095 175892 : change->data.tp.oldtuple =
1096 175892 : ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
1097 :
1098 175892 : DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
1099 : datalen, change->data.tp.oldtuple);
1100 : }
1101 :
1102 237917 : change->data.tp.clear_toast_afterwards = true;
1103 :
1104 237917 : ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
1105 : change, false);
1106 : }
1107 :
1108 : /*
1109 : * Parse XLOG_HEAP_TRUNCATE from wal
1110 : */
1111 : static void
1112 60 : DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
1113 : {
1114 60 : XLogReaderState *r = buf->record;
1115 : xl_heap_truncate *xlrec;
1116 : ReorderBufferChange *change;
1117 :
1118 60 : xlrec = (xl_heap_truncate *) XLogRecGetData(r);
1119 :
1120 : /* only interested in our database */
1121 60 : if (xlrec->dbId != ctx->slot->data.database)
1122 0 : return;
1123 :
1124 : /* output plugin doesn't look for this origin, no need to queue */
1125 60 : if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
1126 1 : return;
1127 :
1128 59 : change = ReorderBufferAllocChange(ctx->reorder);
1129 59 : change->action = REORDER_BUFFER_CHANGE_TRUNCATE;
1130 59 : change->origin_id = XLogRecGetOrigin(r);
1131 59 : if (xlrec->flags & XLH_TRUNCATE_CASCADE)
1132 1 : change->data.truncate.cascade = true;
1133 59 : if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
1134 2 : change->data.truncate.restart_seqs = true;
1135 59 : change->data.truncate.nrelids = xlrec->nrelids;
1136 118 : change->data.truncate.relids = ReorderBufferAllocRelids(ctx->reorder,
1137 59 : xlrec->nrelids);
1138 59 : memcpy(change->data.truncate.relids, xlrec->relids,
1139 59 : xlrec->nrelids * sizeof(Oid));
1140 59 : ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
1141 : buf->origptr, change, false);
1142 : }
1143 :
/*
 * Decode an XLOG_HEAP2_MULTI_INSERT record into one insert change per tuple.
 *
 * Tuple data is present only when XLH_INSERT_CONTAINS_NEW_TUPLE is set;
 * records without it are ignored.  When present, block 0's data holds
 * xlrec->ntuples consecutive entries.  Each entry is an xl_multi_insert_tuple
 * header at a SHORTALIGN'ed position, followed by xlhdr->datalen bytes of
 * tuple data.  DecodeXLogTuple does not handle this layout (it is only for
 * heap_insert/heap_update/heap_delete records), so each tuple is rebuilt
 * inline here.
 */
static void
DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
	XLogReaderState *r = buf->record;
	xl_heap_multi_insert *xlrec;
	int			i;
	char	   *data;			/* cursor over the per-tuple entries */
	char	   *tupledata;		/* start of block 0's data */
	Size		tuplelen;		/* total length of block 0's data */
	RelFileLocator rlocator;

	xlrec = (xl_heap_multi_insert *) XLogRecGetData(r);

	/*
	 * Ignore insert records without new tuples. This happens when a
	 * multi_insert is done on a catalog or on a non-persistent relation.
	 */
	if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
		return;

	/* only interested in our database */
	XLogRecGetBlockTag(r, 0, &rlocator, NULL, NULL);
	if (rlocator.dbOid != ctx->slot->data.database)
		return;

	/* output plugin doesn't look for this origin, no need to queue */
	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
		return;

	/*
	 * We know that this multi_insert isn't for a catalog, so the block should
	 * always have data even if a full-page write of it is taken.
	 */
	tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
	Assert(tupledata != NULL);

	data = tupledata;
	for (i = 0; i < xlrec->ntuples; i++)
	{
		ReorderBufferChange *change;
		xl_multi_insert_tuple *xlhdr;
		int			datalen;
		HeapTuple	tuple;
		HeapTupleHeader header;

		change = ReorderBufferAllocChange(ctx->reorder);
		change->action = REORDER_BUFFER_CHANGE_INSERT;
		change->origin_id = XLogRecGetOrigin(r);

		memcpy(&change->data.tp.rlocator, &rlocator, sizeof(RelFileLocator));

		/*
		 * Each entry's xl_multi_insert_tuple header starts at a SHORTALIGN'ed
		 * position; the tuple data (datalen bytes) immediately follows it.
		 */
		xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
		data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
		datalen = xlhdr->datalen;

		change->data.tp.newtuple =
			ReorderBufferAllocTupleBuf(ctx->reorder, datalen);

		tuple = change->data.tp.newtuple;
		header = tuple->t_data;

		/* not a disk based tuple */
		ItemPointerSetInvalid(&tuple->t_self);

		/*
		 * We can only figure this out after reassembling the transactions.
		 */
		tuple->t_tableOid = InvalidOid;

		tuple->t_len = datalen + SizeofHeapTupleHeader;

		/*
		 * Rebuild the tuple: zero the fixed header, copy the logged data in
		 * after it, then restore the header fields that were WAL-logged.
		 */
		memset(header, 0, SizeofHeapTupleHeader);

		memcpy((char *) tuple->t_data + SizeofHeapTupleHeader, data, datalen);
		header->t_infomask = xlhdr->t_infomask;
		header->t_infomask2 = xlhdr->t_infomask2;
		header->t_hoff = xlhdr->t_hoff;

		/*
		 * Reset toast reassembly state only after the last row in the last
		 * xl_multi_insert_tuple record emitted by one heap_multi_insert()
		 * call.
		 */
		if (xlrec->flags & XLH_INSERT_LAST_IN_MULTI &&
			(i + 1) == xlrec->ntuples)
			change->data.tp.clear_toast_afterwards = true;
		else
			change->data.tp.clear_toast_afterwards = false;

		ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
								 buf->origptr, change, false);

		/* move to the next xl_multi_insert_tuple entry */
		data += datalen;
	}
	/* the entries must exactly cover block 0's data */
	Assert(data == tupledata + tuplelen);
}
1246 :
1247 : /*
1248 : * Parse XLOG_HEAP_CONFIRM from wal into a confirmation change.
1249 : *
1250 : * This is pretty trivial, all the state essentially already setup by the
1251 : * speculative insertion.
1252 : */
1253 : static void
1254 17964 : DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
1255 : {
1256 17964 : XLogReaderState *r = buf->record;
1257 : ReorderBufferChange *change;
1258 : RelFileLocator target_locator;
1259 :
1260 : /* only interested in our database */
1261 17964 : XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
1262 17964 : if (target_locator.dbOid != ctx->slot->data.database)
1263 0 : return;
1264 :
1265 : /* output plugin doesn't look for this origin, no need to queue */
1266 17964 : if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
1267 0 : return;
1268 :
1269 17964 : change = ReorderBufferAllocChange(ctx->reorder);
1270 17964 : change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM;
1271 17964 : change->origin_id = XLogRecGetOrigin(r);
1272 :
1273 17964 : memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
1274 :
1275 17964 : change->data.tp.clear_toast_afterwards = true;
1276 :
1277 17964 : ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
1278 : change, false);
1279 : }
1280 :
1281 :
1282 : /*
1283 : * Read a HeapTuple as WAL logged by heap_insert, heap_update and heap_delete
1284 : * (but not by heap_multi_insert) into a tuplebuf.
1285 : *
1286 : * The size 'len' and the pointer 'data' in the record need to be
1287 : * computed outside as they are record specific.
1288 : */
1289 : static void
1290 1477566 : DecodeXLogTuple(char *data, Size len, HeapTuple tuple)
1291 : {
1292 : xl_heap_header xlhdr;
1293 1477566 : int datalen = len - SizeOfHeapHeader;
1294 : HeapTupleHeader header;
1295 :
1296 : Assert(datalen >= 0);
1297 :
1298 1477566 : tuple->t_len = datalen + SizeofHeapTupleHeader;
1299 1477566 : header = tuple->t_data;
1300 :
1301 : /* not a disk based tuple */
1302 1477566 : ItemPointerSetInvalid(&tuple->t_self);
1303 :
1304 : /* we can only figure this out after reassembling the transactions */
1305 1477566 : tuple->t_tableOid = InvalidOid;
1306 :
1307 : /* data is not stored aligned, copy to aligned storage */
1308 1477566 : memcpy(&xlhdr, data, SizeOfHeapHeader);
1309 :
1310 1477566 : memset(header, 0, SizeofHeapTupleHeader);
1311 :
1312 1477566 : memcpy(((char *) tuple->t_data) + SizeofHeapTupleHeader,
1313 1477566 : data + SizeOfHeapHeader,
1314 : datalen);
1315 :
1316 1477566 : header->t_infomask = xlhdr.t_infomask;
1317 1477566 : header->t_infomask2 = xlhdr.t_infomask2;
1318 1477566 : header->t_hoff = xlhdr.t_hoff;
1319 1477566 : }
1320 :
1321 : /*
1322 : * Check whether we are interested in this specific transaction.
1323 : *
1324 : * There can be several reasons we might not be interested in this
1325 : * transaction:
1326 : * 1) We might not be interested in decoding transactions up to this
1327 : * LSN. This can happen because we previously decoded it and now just
1328 : * are restarting or if we haven't assembled a consistent snapshot yet.
1329 : * 2) The transaction happened in another database.
1330 : * 3) The output plugin is not interested in the origin.
1331 : * 4) We are doing fast-forwarding
1332 : */
1333 : static bool
1334 4474 : DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
1335 : Oid txn_dbid, ReplOriginId origin_id)
1336 : {
1337 4474 : if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
1338 4358 : (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
1339 2208 : FilterByOrigin(ctx, origin_id))
1340 2304 : return true;
1341 :
1342 : /*
1343 : * We also skip decoding in fast_forward mode. In passing set the
1344 : * processing_required flag to indicate that if it were not for
1345 : * fast_forward mode, processing would have been required.
1346 : */
1347 2170 : if (ctx->fast_forward)
1348 : {
1349 39 : ctx->processing_required = true;
1350 39 : return true;
1351 : }
1352 :
1353 2131 : return false;
1354 : }
|