Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * logical.c
3 : * PostgreSQL logical decoding coordination
4 : *
5 : * Copyright (c) 2012-2025, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/logical.c
9 : *
10 : * NOTES
11 : * This file coordinates interaction between the various modules that
12 : * together provide logical decoding, primarily by providing so
13 : * called LogicalDecodingContexts. The goal is to encapsulate most of the
14 : * internal complexity for consumers of logical decoding, so they can
15 : * create and consume a changestream with a low amount of code. Builtin
16 : * consumers are the walsender and SQL SRF interface, but it's possible to
17 : * add further ones without changing core code, e.g. to consume changes in
18 : * a bgworker.
19 : *
20 : * The idea is that a consumer provides three callbacks, one to read WAL,
21 : * one to prepare a data write, and a final one for actually writing since
22 : * their implementation depends on the type of consumer. Check
23 : * logicalfuncs.c for an example implementation of a fairly simple consumer
24 : * and an implementation of a WAL reading callback that's suitable for
25 : * simple consumers.
26 : *-------------------------------------------------------------------------
27 : */
28 :
29 : #include "postgres.h"
30 :
31 : #include "access/xact.h"
32 : #include "access/xlogutils.h"
33 : #include "fmgr.h"
34 : #include "miscadmin.h"
35 : #include "pgstat.h"
36 : #include "replication/decode.h"
37 : #include "replication/logical.h"
38 : #include "replication/reorderbuffer.h"
39 : #include "replication/slotsync.h"
40 : #include "replication/snapbuild.h"
41 : #include "storage/proc.h"
42 : #include "storage/procarray.h"
43 : #include "utils/builtins.h"
44 : #include "utils/inval.h"
45 : #include "utils/memutils.h"
46 :
47 : /* data for errcontext callback */
48 : typedef struct LogicalErrorCallbackState
49 : {
50 : LogicalDecodingContext *ctx;
51 : const char *callback_name;
52 : XLogRecPtr report_location;
53 : } LogicalErrorCallbackState;
54 :
55 : /* wrappers around output plugin callbacks */
56 : static void output_plugin_error_callback(void *arg);
57 : static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
58 : bool is_init);
59 : static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
60 : static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
61 : static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
62 : XLogRecPtr commit_lsn);
63 : static void begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
64 : static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
65 : XLogRecPtr prepare_lsn);
66 : static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
67 : XLogRecPtr commit_lsn);
68 : static void rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
69 : XLogRecPtr prepare_end_lsn, TimestampTz prepare_time);
70 : static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
71 : Relation relation, ReorderBufferChange *change);
72 : static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
73 : int nrelations, Relation relations[], ReorderBufferChange *change);
74 : static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
75 : XLogRecPtr message_lsn, bool transactional,
76 : const char *prefix, Size message_size, const char *message);
77 :
78 : /* streaming callbacks */
79 : static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
80 : XLogRecPtr first_lsn);
81 : static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
82 : XLogRecPtr last_lsn);
83 : static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
84 : XLogRecPtr abort_lsn);
85 : static void stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
86 : XLogRecPtr prepare_lsn);
87 : static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
88 : XLogRecPtr commit_lsn);
89 : static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
90 : Relation relation, ReorderBufferChange *change);
91 : static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
92 : XLogRecPtr message_lsn, bool transactional,
93 : const char *prefix, Size message_size, const char *message);
94 : static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
95 : int nrelations, Relation relations[], ReorderBufferChange *change);
96 :
97 : /* callback to update txn's progress */
98 : static void update_progress_txn_cb_wrapper(ReorderBuffer *cache,
99 : ReorderBufferTXN *txn,
100 : XLogRecPtr lsn);
101 :
102 : static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin);
103 :
104 : /*
105 : * Make sure the current settings & environment are capable of doing logical
106 : * decoding.
107 : */
108 : void
109 2970 : CheckLogicalDecodingRequirements(void)
110 : {
111 2970 : CheckSlotRequirements();
112 :
113 : /*
114 : * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
115 : * needs the same check.
116 : */
117 :
118 2970 : if (wal_level < WAL_LEVEL_LOGICAL)
119 0 : ereport(ERROR,
120 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
121 : errmsg("logical decoding requires \"wal_level\" >= \"logical\"")));
122 :
123 2970 : if (MyDatabaseId == InvalidOid)
124 2 : ereport(ERROR,
125 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
126 : errmsg("logical decoding requires a database connection")));
127 :
128 2968 : if (RecoveryInProgress())
129 : {
130 : /*
131 : * This check may have race conditions, but whenever
132 : * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we
133 : * verify that there are no existing logical replication slots. And to
134 : * avoid races around creating a new slot,
135 : * CheckLogicalDecodingRequirements() is called once before creating
136 : * the slot, and once when logical decoding is initially starting up.
137 : */
138 138 : if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL)
139 2 : ereport(ERROR,
140 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
141 : errmsg("logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary")));
142 : }
143 2966 : }
144 :
145 : /*
146 : * Helper function for CreateInitDecodingContext() and
147 : * CreateDecodingContext() performing common tasks.
148 : */
149 : static LogicalDecodingContext *
150 2074 : StartupDecodingContext(List *output_plugin_options,
151 : XLogRecPtr start_lsn,
152 : TransactionId xmin_horizon,
153 : bool need_full_snapshot,
154 : bool fast_forward,
155 : bool in_create,
156 : XLogReaderRoutine *xl_routine,
157 : LogicalOutputPluginWriterPrepareWrite prepare_write,
158 : LogicalOutputPluginWriterWrite do_write,
159 : LogicalOutputPluginWriterUpdateProgress update_progress)
160 : {
161 : ReplicationSlot *slot;
162 : MemoryContext context,
163 : old_context;
164 : LogicalDecodingContext *ctx;
165 :
166 : /* shorter lines... */
167 2074 : slot = MyReplicationSlot;
168 :
169 2074 : context = AllocSetContextCreate(CurrentMemoryContext,
170 : "Logical decoding context",
171 : ALLOCSET_DEFAULT_SIZES);
172 2074 : old_context = MemoryContextSwitchTo(context);
173 2074 : ctx = palloc0(sizeof(LogicalDecodingContext));
174 :
175 2074 : ctx->context = context;
176 :
177 : /*
178 : * (re-)load output plugins, so we detect a bad (removed) output plugin
179 : * now.
180 : */
181 2074 : if (!fast_forward)
182 2036 : LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
183 :
184 : /*
185 : * Now that the slot's xmin has been set, we can announce ourselves as a
186 : * logical decoding backend which doesn't need to be checked individually
187 : * when computing the xmin horizon because the xmin is enforced via
188 : * replication slots.
189 : *
190 : * We can only do so if we're outside of a transaction (i.e. the case when
191 : * streaming changes via walsender), otherwise an already setup
192 : * snapshot/xid would end up being ignored. That's not a particularly
193 : * bothersome restriction since the SQL interface can't be used for
194 : * streaming anyway.
195 : */
196 2072 : if (!IsTransactionOrTransactionBlock())
197 : {
198 1004 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
199 1004 : MyProc->statusFlags |= PROC_IN_LOGICAL_DECODING;
200 1004 : ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
201 1004 : LWLockRelease(ProcArrayLock);
202 : }
203 :
204 2072 : ctx->slot = slot;
205 :
206 2072 : ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx);
207 2072 : if (!ctx->reader)
208 0 : ereport(ERROR,
209 : (errcode(ERRCODE_OUT_OF_MEMORY),
210 : errmsg("out of memory"),
211 : errdetail("Failed while allocating a WAL reading processor.")));
212 :
213 2072 : ctx->reorder = ReorderBufferAllocate();
214 2072 : ctx->snapshot_builder =
215 2072 : AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
216 : need_full_snapshot, in_create, slot->data.two_phase_at);
217 :
218 2072 : ctx->reorder->private_data = ctx;
219 :
220 : /* wrap output plugin callbacks, so we can add error context information */
221 2072 : ctx->reorder->begin = begin_cb_wrapper;
222 2072 : ctx->reorder->apply_change = change_cb_wrapper;
223 2072 : ctx->reorder->apply_truncate = truncate_cb_wrapper;
224 2072 : ctx->reorder->commit = commit_cb_wrapper;
225 2072 : ctx->reorder->message = message_cb_wrapper;
226 :
227 : /*
228 : * To support streaming, we require start/stop/abort/commit/change
229 : * callbacks. The message and truncate callbacks are optional, similar to
230 : * regular output plugins. We however enable streaming when at least one
231 : * of the methods is enabled so that we can easily identify missing
232 : * methods.
233 : *
234 : * We decide it here, but only check it later in the wrappers.
235 : */
236 4182 : ctx->streaming = (ctx->callbacks.stream_start_cb != NULL) ||
237 38 : (ctx->callbacks.stream_stop_cb != NULL) ||
238 38 : (ctx->callbacks.stream_abort_cb != NULL) ||
239 38 : (ctx->callbacks.stream_commit_cb != NULL) ||
240 38 : (ctx->callbacks.stream_change_cb != NULL) ||
241 2148 : (ctx->callbacks.stream_message_cb != NULL) ||
242 38 : (ctx->callbacks.stream_truncate_cb != NULL);
243 :
244 : /*
245 : * streaming callbacks
246 : *
247 : * stream_message and stream_truncate callbacks are optional, so we do not
248 : * fail with ERROR when missing, but the wrappers simply do nothing. We
249 : * must set the ReorderBuffer callbacks to something, otherwise the calls
250 : * from there will crash (we don't want to move the checks there).
251 : */
252 2072 : ctx->reorder->stream_start = stream_start_cb_wrapper;
253 2072 : ctx->reorder->stream_stop = stream_stop_cb_wrapper;
254 2072 : ctx->reorder->stream_abort = stream_abort_cb_wrapper;
255 2072 : ctx->reorder->stream_prepare = stream_prepare_cb_wrapper;
256 2072 : ctx->reorder->stream_commit = stream_commit_cb_wrapper;
257 2072 : ctx->reorder->stream_change = stream_change_cb_wrapper;
258 2072 : ctx->reorder->stream_message = stream_message_cb_wrapper;
259 2072 : ctx->reorder->stream_truncate = stream_truncate_cb_wrapper;
260 :
261 :
262 : /*
263 : * To support two-phase logical decoding, we require
264 : * begin_prepare/prepare/commit-prepare/abort-prepare callbacks. The
265 : * filter_prepare callback is optional. We however enable two-phase
266 : * logical decoding when at least one of the methods is enabled so that we
267 : * can easily identify missing methods.
268 : *
269 : * We decide it here, but only check it later in the wrappers.
270 : */
271 4182 : ctx->twophase = (ctx->callbacks.begin_prepare_cb != NULL) ||
272 38 : (ctx->callbacks.prepare_cb != NULL) ||
273 38 : (ctx->callbacks.commit_prepared_cb != NULL) ||
274 38 : (ctx->callbacks.rollback_prepared_cb != NULL) ||
275 2148 : (ctx->callbacks.stream_prepare_cb != NULL) ||
276 38 : (ctx->callbacks.filter_prepare_cb != NULL);
277 :
278 : /*
279 : * Callback to support decoding at prepare time.
280 : */
281 2072 : ctx->reorder->begin_prepare = begin_prepare_cb_wrapper;
282 2072 : ctx->reorder->prepare = prepare_cb_wrapper;
283 2072 : ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
284 2072 : ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper;
285 :
286 : /*
287 : * Callback to support updating progress during sending data of a
288 : * transaction (and its subtransactions) to the output plugin.
289 : */
290 2072 : ctx->reorder->update_progress_txn = update_progress_txn_cb_wrapper;
291 :
292 2072 : ctx->out = makeStringInfo();
293 2072 : ctx->prepare_write = prepare_write;
294 2072 : ctx->write = do_write;
295 2072 : ctx->update_progress = update_progress;
296 :
297 2072 : ctx->output_plugin_options = output_plugin_options;
298 :
299 2072 : ctx->fast_forward = fast_forward;
300 :
301 2072 : MemoryContextSwitchTo(old_context);
302 :
303 2072 : return ctx;
304 : }
305 :
306 : /*
307 : * Create a new decoding context, for a new logical slot.
308 : *
309 : * plugin -- contains the name of the output plugin
310 : * output_plugin_options -- contains options passed to the output plugin
311 : * need_full_snapshot -- if true, must obtain a snapshot able to read all
312 : * tables; if false, one that can read only catalogs is acceptable.
313 : * restart_lsn -- if given as invalid, it's this routine's responsibility to
314 : * mark WAL as reserved by setting a convenient restart_lsn for the slot.
315 : * Otherwise, we set for decoding to start from the given LSN without
316 : * marking WAL reserved beforehand. In that scenario, it's up to the
317 : * caller to guarantee that WAL remains available.
318 : * xl_routine -- XLogReaderRoutine for underlying XLogReader
319 : * prepare_write, do_write, update_progress --
320 : * callbacks that perform the use-case dependent, actual, work.
321 : *
322 : * Needs to be called while in a memory context that's at least as long lived
323 : * as the decoding context because further memory contexts will be created
324 : * inside it.
325 : *
326 : * Returns an initialized decoding context after calling the output plugin's
327 : * startup function.
328 : */
329 : LogicalDecodingContext *
330 890 : CreateInitDecodingContext(const char *plugin,
331 : List *output_plugin_options,
332 : bool need_full_snapshot,
333 : XLogRecPtr restart_lsn,
334 : XLogReaderRoutine *xl_routine,
335 : LogicalOutputPluginWriterPrepareWrite prepare_write,
336 : LogicalOutputPluginWriterWrite do_write,
337 : LogicalOutputPluginWriterUpdateProgress update_progress)
338 : {
339 890 : TransactionId xmin_horizon = InvalidTransactionId;
340 : ReplicationSlot *slot;
341 : NameData plugin_name;
342 : LogicalDecodingContext *ctx;
343 : MemoryContext old_context;
344 :
345 : /*
346 : * On a standby, this check is also required while creating the slot.
347 : * Check the comments in the function.
348 : */
349 890 : CheckLogicalDecodingRequirements();
350 :
351 : /* shorter lines... */
352 890 : slot = MyReplicationSlot;
353 :
354 : /* first some sanity checks that are unlikely to be violated */
355 890 : if (slot == NULL)
356 0 : elog(ERROR, "cannot perform logical decoding without an acquired slot");
357 :
358 890 : if (plugin == NULL)
359 0 : elog(ERROR, "cannot initialize logical decoding without a specified plugin");
360 :
361 : /* Make sure the passed slot is suitable. These are user facing errors. */
362 890 : if (SlotIsPhysical(slot))
363 0 : ereport(ERROR,
364 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
365 : errmsg("cannot use physical replication slot for logical decoding")));
366 :
367 890 : if (slot->data.database != MyDatabaseId)
368 0 : ereport(ERROR,
369 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
370 : errmsg("replication slot \"%s\" was not created in this database",
371 : NameStr(slot->data.name))));
372 :
373 1514 : if (IsTransactionState() &&
374 624 : GetTopTransactionIdIfAny() != InvalidTransactionId)
375 4 : ereport(ERROR,
376 : (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
377 : errmsg("cannot create logical replication slot in transaction that has performed writes")));
378 :
379 : /*
380 : * Register output plugin name with slot. We need the mutex to avoid
381 : * concurrent reading of a partially copied string. But we don't want any
382 : * complicated code while holding a spinlock, so do namestrcpy() outside.
383 : */
384 886 : namestrcpy(&plugin_name, plugin);
385 886 : SpinLockAcquire(&slot->mutex);
386 886 : slot->data.plugin = plugin_name;
387 886 : SpinLockRelease(&slot->mutex);
388 :
389 886 : if (XLogRecPtrIsInvalid(restart_lsn))
390 874 : ReplicationSlotReserveWal();
391 : else
392 : {
393 12 : SpinLockAcquire(&slot->mutex);
394 12 : slot->data.restart_lsn = restart_lsn;
395 12 : SpinLockRelease(&slot->mutex);
396 : }
397 :
398 : /* ----
399 : * This is a bit tricky: We need to determine a safe xmin horizon to start
400 : * decoding from, to avoid starting from a running xacts record referring
401 : * to xids whose rows have been vacuumed or pruned
402 : * already. GetOldestSafeDecodingTransactionId() returns such a value, but
403 : * without further interlock its return value might immediately be out of
404 : * date.
405 : *
406 : * So we have to acquire the ProcArrayLock to prevent computation of new
407 : * xmin horizons by other backends, get the safe decoding xid, and inform
408 : * the slot machinery about the new limit. Once that's done the
409 : * ProcArrayLock can be released as the slot machinery now is
410 : * protecting against vacuum.
411 : *
412 : * Note that, temporarily, the data, not just the catalog, xmin has to be
413 : * reserved if a data snapshot is to be exported. Otherwise the initial
414 : * data snapshot created here is not guaranteed to be valid. After that
415 : * the data xmin doesn't need to be managed anymore and the global xmin
416 : * should be recomputed. As we are fine with losing the pegged data xmin
417 : * after crash - no chance a snapshot would get exported anymore - we can
418 : * get away with just setting the slot's
419 : * effective_xmin. ReplicationSlotRelease will reset it again.
420 : *
421 : * ----
422 : */
423 886 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
424 :
425 886 : xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
426 :
427 886 : SpinLockAcquire(&slot->mutex);
428 886 : slot->effective_catalog_xmin = xmin_horizon;
429 886 : slot->data.catalog_xmin = xmin_horizon;
430 886 : if (need_full_snapshot)
431 382 : slot->effective_xmin = xmin_horizon;
432 886 : SpinLockRelease(&slot->mutex);
433 :
434 886 : ReplicationSlotsComputeRequiredXmin(true);
435 :
436 886 : LWLockRelease(ProcArrayLock);
437 :
438 886 : ReplicationSlotMarkDirty();
439 886 : ReplicationSlotSave();
440 :
441 886 : ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
442 : need_full_snapshot, false, true,
443 : xl_routine, prepare_write, do_write,
444 : update_progress);
445 :
446 : /* call output plugin initialization callback */
447 884 : old_context = MemoryContextSwitchTo(ctx->context);
448 884 : if (ctx->callbacks.startup_cb != NULL)
449 884 : startup_cb_wrapper(ctx, &ctx->options, true);
450 884 : MemoryContextSwitchTo(old_context);
451 :
452 : /*
453 : * We allow decoding of prepared transactions when the two_phase is
454 : * enabled at the time of slot creation, or when the two_phase option is
455 : * given at the streaming start, provided the plugin supports all the
456 : * callbacks for two-phase.
457 : */
458 884 : ctx->twophase &= slot->data.two_phase;
459 :
460 884 : ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
461 :
462 884 : return ctx;
463 : }
464 :
465 : /*
466 : * Create a new decoding context, for a logical slot that has previously been
467 : * used already.
468 : *
469 : * start_lsn
470 : * The LSN at which to start decoding. If InvalidXLogRecPtr, restart
471 : * from the slot's confirmed_flush; otherwise, start from the specified
472 : * location (but move it forwards to confirmed_flush if it's older than
473 : * that, see below).
474 : *
475 : * output_plugin_options
476 : * options passed to the output plugin.
477 : *
478 : * fast_forward
479 : * bypass the generation of logical changes.
480 : *
481 : * xl_routine
482 : * XLogReaderRoutine used by underlying xlogreader
483 : *
484 : * prepare_write, do_write, update_progress
485 : * callbacks that have to be filled to perform the use-case dependent,
486 : * actual work.
487 : *
488 : * Needs to be called while in a memory context that's at least as long lived
489 : * as the decoding context because further memory contexts will be created
490 : * inside it.
491 : *
492 : * Returns an initialized decoding context after calling the output plugin's
493 : * startup function.
494 : */
495 : LogicalDecodingContext *
496 1198 : CreateDecodingContext(XLogRecPtr start_lsn,
497 : List *output_plugin_options,
498 : bool fast_forward,
499 : XLogReaderRoutine *xl_routine,
500 : LogicalOutputPluginWriterPrepareWrite prepare_write,
501 : LogicalOutputPluginWriterWrite do_write,
502 : LogicalOutputPluginWriterUpdateProgress update_progress)
503 : {
504 : LogicalDecodingContext *ctx;
505 : ReplicationSlot *slot;
506 : MemoryContext old_context;
507 :
508 : /* shorter lines... */
509 1198 : slot = MyReplicationSlot;
510 :
511 : /* first some sanity checks that are unlikely to be violated */
512 1198 : if (slot == NULL)
513 0 : elog(ERROR, "cannot perform logical decoding without an acquired slot");
514 :
515 : /* make sure the passed slot is suitable, these are user facing errors */
516 1198 : if (SlotIsPhysical(slot))
517 2 : ereport(ERROR,
518 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
519 : errmsg("cannot use physical replication slot for logical decoding")));
520 :
521 : /*
522 : * We need to access the system tables during decoding to build the
523 : * logical changes unless we are in fast_forward mode where no changes are
524 : * generated.
525 : */
526 1196 : if (slot->data.database != MyDatabaseId && !fast_forward)
527 6 : ereport(ERROR,
528 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
529 : errmsg("replication slot \"%s\" was not created in this database",
530 : NameStr(slot->data.name))));
531 :
532 : /*
533 : * The slots being synced from the primary can't be used for decoding as
534 : * they are used after failover. However, we do allow advancing the LSNs
535 : * during the synchronization of slots. See update_local_synced_slot.
536 : */
537 1190 : if (RecoveryInProgress() && slot->data.synced && !IsSyncingReplicationSlots())
538 2 : ereport(ERROR,
539 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
540 : errmsg("cannot use replication slot \"%s\" for logical decoding",
541 : NameStr(slot->data.name)),
542 : errdetail("This replication slot is being synchronized from the primary server."),
543 : errhint("Specify another replication slot."));
544 :
545 : /* slot must be valid to allow decoding */
546 : Assert(slot->data.invalidated == RS_INVAL_NONE);
547 : Assert(slot->data.restart_lsn != InvalidXLogRecPtr);
548 :
549 1188 : if (start_lsn == InvalidXLogRecPtr)
550 : {
551 : /* continue from last position */
552 710 : start_lsn = slot->data.confirmed_flush;
553 : }
554 478 : else if (start_lsn < slot->data.confirmed_flush)
555 : {
556 : /*
557 : * It might seem like we should error out in this case, but it's
558 : * pretty common for a client to acknowledge a LSN it doesn't have to
559 : * do anything for, and thus didn't store persistently, because the
560 : * xlog records didn't result in anything relevant for logical
561 : * decoding. Clients have to be able to do that to support synchronous
562 : * replication.
563 : *
564 : * Starting at a different LSN than requested might not catch certain
565 : * kinds of client errors; so the client may wish to check that
566 : * confirmed_flush_lsn matches its expectations.
567 : */
568 36 : elog(LOG, "%X/%X has been already streamed, forwarding to %X/%X",
569 : LSN_FORMAT_ARGS(start_lsn),
570 : LSN_FORMAT_ARGS(slot->data.confirmed_flush));
571 :
572 36 : start_lsn = slot->data.confirmed_flush;
573 : }
574 :
575 1188 : ctx = StartupDecodingContext(output_plugin_options,
576 : start_lsn, InvalidTransactionId, false,
577 : fast_forward, false, xl_routine, prepare_write,
578 : do_write, update_progress);
579 :
580 : /* call output plugin initialization callback */
581 1188 : old_context = MemoryContextSwitchTo(ctx->context);
582 1188 : if (ctx->callbacks.startup_cb != NULL)
583 1150 : startup_cb_wrapper(ctx, &ctx->options, false);
584 1182 : MemoryContextSwitchTo(old_context);
585 :
586 : /*
587 : * We allow decoding of prepared transactions when the two_phase is
588 : * enabled at the time of slot creation, or when the two_phase option is
589 : * given at the streaming start, provided the plugin supports all the
590 : * callbacks for two-phase.
591 : */
592 1182 : ctx->twophase &= (slot->data.two_phase || ctx->twophase_opt_given);
593 :
594 : /* Mark slot to allow two_phase decoding if not already marked */
595 1182 : if (ctx->twophase && !slot->data.two_phase)
596 : {
597 16 : SpinLockAcquire(&slot->mutex);
598 16 : slot->data.two_phase = true;
599 16 : slot->data.two_phase_at = start_lsn;
600 16 : SpinLockRelease(&slot->mutex);
601 16 : ReplicationSlotMarkDirty();
602 16 : ReplicationSlotSave();
603 16 : SnapBuildSetTwoPhaseAt(ctx->snapshot_builder, start_lsn);
604 : }
605 :
606 1182 : ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
607 :
608 1182 : ereport(LOG,
609 : (errmsg("starting logical decoding for slot \"%s\"",
610 : NameStr(slot->data.name)),
611 : errdetail("Streaming transactions committing after %X/%X, reading WAL from %X/%X.",
612 : LSN_FORMAT_ARGS(slot->data.confirmed_flush),
613 : LSN_FORMAT_ARGS(slot->data.restart_lsn))));
614 :
615 1182 : return ctx;
616 : }
617 :
618 : /*
619 : * Returns true if a consistent initial decoding snapshot has been built.
620 : */
621 : bool
622 956 : DecodingContextReady(LogicalDecodingContext *ctx)
623 : {
624 956 : return SnapBuildCurrentState(ctx->snapshot_builder) == SNAPBUILD_CONSISTENT;
625 : }
626 :
627 : /*
628 : * Read from the decoding slot, until it is ready to start extracting changes.
629 : */
630 : void
631 872 : DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
632 : {
633 872 : ReplicationSlot *slot = ctx->slot;
634 :
635 : /* Initialize from where to start reading WAL. */
636 872 : XLogBeginRead(ctx->reader, slot->data.restart_lsn);
637 :
638 872 : elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
639 : LSN_FORMAT_ARGS(slot->data.restart_lsn));
640 :
641 : /* Wait for a consistent starting point */
642 : for (;;)
643 78 : {
644 : XLogRecord *record;
645 950 : char *err = NULL;
646 :
647 : /* the read_page callback waits for new WAL */
648 950 : record = XLogReadRecord(ctx->reader, &err);
649 950 : if (err)
650 0 : elog(ERROR, "could not find logical decoding starting point: %s", err);
651 950 : if (!record)
652 0 : elog(ERROR, "could not find logical decoding starting point");
653 :
654 950 : LogicalDecodingProcessRecord(ctx, ctx->reader);
655 :
656 : /* only continue till we found a consistent spot */
657 946 : if (DecodingContextReady(ctx))
658 868 : break;
659 :
660 78 : CHECK_FOR_INTERRUPTS();
661 : }
662 :
663 868 : SpinLockAcquire(&slot->mutex);
664 868 : slot->data.confirmed_flush = ctx->reader->EndRecPtr;
665 868 : if (slot->data.two_phase)
666 18 : slot->data.two_phase_at = ctx->reader->EndRecPtr;
667 868 : SpinLockRelease(&slot->mutex);
668 868 : }
669 :
670 : /*
671 : * Free a previously allocated decoding context, invoking the shutdown
672 : * callback if necessary.
673 : */
674 : void
675 1698 : FreeDecodingContext(LogicalDecodingContext *ctx)
676 : {
677 1698 : if (ctx->callbacks.shutdown_cb != NULL)
678 1660 : shutdown_cb_wrapper(ctx);
679 :
680 1698 : ReorderBufferFree(ctx->reorder);
681 1698 : FreeSnapshotBuilder(ctx->snapshot_builder);
682 1698 : XLogReaderFree(ctx->reader);
683 1698 : MemoryContextDelete(ctx->context);
684 1698 : }
685 :
686 : /*
687 : * Prepare a write using the context's output routine.
688 : */
689 : void
690 672244 : OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
691 : {
692 672244 : if (!ctx->accept_writes)
693 0 : elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
694 :
695 672244 : ctx->prepare_write(ctx, ctx->write_location, ctx->write_xid, last_write);
696 672244 : ctx->prepared_write = true;
697 672244 : }
698 :
699 : /*
700 : * Perform a write using the context's output routine.
701 : */
702 : void
703 672244 : OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
704 : {
705 672244 : if (!ctx->prepared_write)
706 0 : elog(ERROR, "OutputPluginPrepareWrite needs to be called before OutputPluginWrite");
707 :
708 672244 : ctx->write(ctx, ctx->write_location, ctx->write_xid, last_write);
709 672238 : ctx->prepared_write = false;
710 672238 : }
711 :
712 : /*
713 : * Update progress tracking (if supported).
714 : */
715 : void
716 7920 : OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx,
717 : bool skipped_xact)
718 : {
719 7920 : if (!ctx->update_progress)
720 3172 : return;
721 :
722 4748 : ctx->update_progress(ctx, ctx->write_location, ctx->write_xid,
723 : skipped_xact);
724 : }
725 :
726 : /*
727 : * Load the output plugin, lookup its output plugin init function, and check
728 : * that it provides the required callbacks.
729 : */
730 : static void
731 2036 : LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin)
732 : {
733 : LogicalOutputPluginInit plugin_init;
734 :
735 2034 : plugin_init = (LogicalOutputPluginInit)
736 2036 : load_external_function(plugin, "_PG_output_plugin_init", false, NULL);
737 :
738 2034 : if (plugin_init == NULL)
739 0 : elog(ERROR, "output plugins have to declare the _PG_output_plugin_init symbol");
740 :
741 : /* ask the output plugin to fill the callback struct */
742 2034 : plugin_init(callbacks);
743 :
744 2034 : if (callbacks->begin_cb == NULL)
745 0 : elog(ERROR, "output plugins have to register a begin callback");
746 2034 : if (callbacks->change_cb == NULL)
747 0 : elog(ERROR, "output plugins have to register a change callback");
748 2034 : if (callbacks->commit_cb == NULL)
749 0 : elog(ERROR, "output plugins have to register a commit callback");
750 2034 : }
751 :
752 : static void
753 20 : output_plugin_error_callback(void *arg)
754 : {
755 20 : LogicalErrorCallbackState *state = (LogicalErrorCallbackState *) arg;
756 :
757 : /* not all callbacks have an associated LSN */
758 20 : if (state->report_location != InvalidXLogRecPtr)
759 14 : errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%X",
760 14 : NameStr(state->ctx->slot->data.name),
761 14 : NameStr(state->ctx->slot->data.plugin),
762 : state->callback_name,
763 14 : LSN_FORMAT_ARGS(state->report_location));
764 : else
765 6 : errcontext("slot \"%s\", output plugin \"%s\", in the %s callback",
766 6 : NameStr(state->ctx->slot->data.name),
767 6 : NameStr(state->ctx->slot->data.plugin),
768 : state->callback_name);
769 20 : }
770 :
771 : static void
772 2034 : startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
773 : {
774 : LogicalErrorCallbackState state;
775 : ErrorContextCallback errcallback;
776 :
777 : Assert(!ctx->fast_forward);
778 :
779 : /* Push callback + info on the error context stack */
780 2034 : state.ctx = ctx;
781 2034 : state.callback_name = "startup";
782 2034 : state.report_location = InvalidXLogRecPtr;
783 2034 : errcallback.callback = output_plugin_error_callback;
784 2034 : errcallback.arg = &state;
785 2034 : errcallback.previous = error_context_stack;
786 2034 : error_context_stack = &errcallback;
787 :
788 : /* set output state */
789 2034 : ctx->accept_writes = false;
790 2034 : ctx->end_xact = false;
791 :
792 : /* do the actual work: call callback */
793 2034 : ctx->callbacks.startup_cb(ctx, opt, is_init);
794 :
795 : /* Pop the error context stack */
796 2028 : error_context_stack = errcallback.previous;
797 2028 : }
798 :
799 : static void
800 1660 : shutdown_cb_wrapper(LogicalDecodingContext *ctx)
801 : {
802 : LogicalErrorCallbackState state;
803 : ErrorContextCallback errcallback;
804 :
805 : Assert(!ctx->fast_forward);
806 :
807 : /* Push callback + info on the error context stack */
808 1660 : state.ctx = ctx;
809 1660 : state.callback_name = "shutdown";
810 1660 : state.report_location = InvalidXLogRecPtr;
811 1660 : errcallback.callback = output_plugin_error_callback;
812 1660 : errcallback.arg = &state;
813 1660 : errcallback.previous = error_context_stack;
814 1660 : error_context_stack = &errcallback;
815 :
816 : /* set output state */
817 1660 : ctx->accept_writes = false;
818 1660 : ctx->end_xact = false;
819 :
820 : /* do the actual work: call callback */
821 1660 : ctx->callbacks.shutdown_cb(ctx);
822 :
823 : /* Pop the error context stack */
824 1660 : error_context_stack = errcallback.previous;
825 1660 : }
826 :
827 :
828 : /*
829 : * Callbacks for ReorderBuffer which add in some more information and then call
830 : * output_plugin.h plugins.
831 : */
832 : static void
833 2378 : begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
834 : {
835 2378 : LogicalDecodingContext *ctx = cache->private_data;
836 : LogicalErrorCallbackState state;
837 : ErrorContextCallback errcallback;
838 :
839 : Assert(!ctx->fast_forward);
840 :
841 : /* Push callback + info on the error context stack */
842 2378 : state.ctx = ctx;
843 2378 : state.callback_name = "begin";
844 2378 : state.report_location = txn->first_lsn;
845 2378 : errcallback.callback = output_plugin_error_callback;
846 2378 : errcallback.arg = &state;
847 2378 : errcallback.previous = error_context_stack;
848 2378 : error_context_stack = &errcallback;
849 :
850 : /* set output state */
851 2378 : ctx->accept_writes = true;
852 2378 : ctx->write_xid = txn->xid;
853 2378 : ctx->write_location = txn->first_lsn;
854 2378 : ctx->end_xact = false;
855 :
856 : /* do the actual work: call callback */
857 2378 : ctx->callbacks.begin_cb(ctx, txn);
858 :
859 : /* Pop the error context stack */
860 2378 : error_context_stack = errcallback.previous;
861 2378 : }
862 :
863 : static void
864 2372 : commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
865 : XLogRecPtr commit_lsn)
866 : {
867 2372 : LogicalDecodingContext *ctx = cache->private_data;
868 : LogicalErrorCallbackState state;
869 : ErrorContextCallback errcallback;
870 :
871 : Assert(!ctx->fast_forward);
872 :
873 : /* Push callback + info on the error context stack */
874 2372 : state.ctx = ctx;
875 2372 : state.callback_name = "commit";
876 2372 : state.report_location = txn->final_lsn; /* beginning of commit record */
877 2372 : errcallback.callback = output_plugin_error_callback;
878 2372 : errcallback.arg = &state;
879 2372 : errcallback.previous = error_context_stack;
880 2372 : error_context_stack = &errcallback;
881 :
882 : /* set output state */
883 2372 : ctx->accept_writes = true;
884 2372 : ctx->write_xid = txn->xid;
885 2372 : ctx->write_location = txn->end_lsn; /* points to the end of the record */
886 2372 : ctx->end_xact = true;
887 :
888 : /* do the actual work: call callback */
889 2372 : ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
890 :
891 : /* Pop the error context stack */
892 2370 : error_context_stack = errcallback.previous;
893 2370 : }
894 :
895 : /*
896 : * The functionality of begin_prepare is quite similar to begin with the
897 : * exception that this will have gid (global transaction id) information which
898 : * can be used by plugin. Now, we thought about extending the existing begin
899 : * but that would break the replication protocol and additionally this looks
900 : * cleaner.
901 : */
902 : static void
903 60 : begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
904 : {
905 60 : LogicalDecodingContext *ctx = cache->private_data;
906 : LogicalErrorCallbackState state;
907 : ErrorContextCallback errcallback;
908 :
909 : Assert(!ctx->fast_forward);
910 :
911 : /* We're only supposed to call this when two-phase commits are supported */
912 : Assert(ctx->twophase);
913 :
914 : /* Push callback + info on the error context stack */
915 60 : state.ctx = ctx;
916 60 : state.callback_name = "begin_prepare";
917 60 : state.report_location = txn->first_lsn;
918 60 : errcallback.callback = output_plugin_error_callback;
919 60 : errcallback.arg = &state;
920 60 : errcallback.previous = error_context_stack;
921 60 : error_context_stack = &errcallback;
922 :
923 : /* set output state */
924 60 : ctx->accept_writes = true;
925 60 : ctx->write_xid = txn->xid;
926 60 : ctx->write_location = txn->first_lsn;
927 60 : ctx->end_xact = false;
928 :
929 : /*
930 : * If the plugin supports two-phase commits then begin prepare callback is
931 : * mandatory
932 : */
933 60 : if (ctx->callbacks.begin_prepare_cb == NULL)
934 0 : ereport(ERROR,
935 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
936 : errmsg("logical replication at prepare time requires a %s callback",
937 : "begin_prepare_cb")));
938 :
939 : /* do the actual work: call callback */
940 60 : ctx->callbacks.begin_prepare_cb(ctx, txn);
941 :
942 : /* Pop the error context stack */
943 60 : error_context_stack = errcallback.previous;
944 60 : }
945 :
946 : static void
947 60 : prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
948 : XLogRecPtr prepare_lsn)
949 : {
950 60 : LogicalDecodingContext *ctx = cache->private_data;
951 : LogicalErrorCallbackState state;
952 : ErrorContextCallback errcallback;
953 :
954 : Assert(!ctx->fast_forward);
955 :
956 : /* We're only supposed to call this when two-phase commits are supported */
957 : Assert(ctx->twophase);
958 :
959 : /* Push callback + info on the error context stack */
960 60 : state.ctx = ctx;
961 60 : state.callback_name = "prepare";
962 60 : state.report_location = txn->final_lsn; /* beginning of prepare record */
963 60 : errcallback.callback = output_plugin_error_callback;
964 60 : errcallback.arg = &state;
965 60 : errcallback.previous = error_context_stack;
966 60 : error_context_stack = &errcallback;
967 :
968 : /* set output state */
969 60 : ctx->accept_writes = true;
970 60 : ctx->write_xid = txn->xid;
971 60 : ctx->write_location = txn->end_lsn; /* points to the end of the record */
972 60 : ctx->end_xact = true;
973 :
974 : /*
975 : * If the plugin supports two-phase commits then prepare callback is
976 : * mandatory
977 : */
978 60 : if (ctx->callbacks.prepare_cb == NULL)
979 0 : ereport(ERROR,
980 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
981 : errmsg("logical replication at prepare time requires a %s callback",
982 : "prepare_cb")));
983 :
984 : /* do the actual work: call callback */
985 60 : ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn);
986 :
987 : /* Pop the error context stack */
988 60 : error_context_stack = errcallback.previous;
989 60 : }
990 :
991 : static void
992 64 : commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
993 : XLogRecPtr commit_lsn)
994 : {
995 64 : LogicalDecodingContext *ctx = cache->private_data;
996 : LogicalErrorCallbackState state;
997 : ErrorContextCallback errcallback;
998 :
999 : Assert(!ctx->fast_forward);
1000 :
1001 : /* We're only supposed to call this when two-phase commits are supported */
1002 : Assert(ctx->twophase);
1003 :
1004 : /* Push callback + info on the error context stack */
1005 64 : state.ctx = ctx;
1006 64 : state.callback_name = "commit_prepared";
1007 64 : state.report_location = txn->final_lsn; /* beginning of commit record */
1008 64 : errcallback.callback = output_plugin_error_callback;
1009 64 : errcallback.arg = &state;
1010 64 : errcallback.previous = error_context_stack;
1011 64 : error_context_stack = &errcallback;
1012 :
1013 : /* set output state */
1014 64 : ctx->accept_writes = true;
1015 64 : ctx->write_xid = txn->xid;
1016 64 : ctx->write_location = txn->end_lsn; /* points to the end of the record */
1017 64 : ctx->end_xact = true;
1018 :
1019 : /*
1020 : * If the plugin support two-phase commits then commit prepared callback
1021 : * is mandatory
1022 : */
1023 64 : if (ctx->callbacks.commit_prepared_cb == NULL)
1024 0 : ereport(ERROR,
1025 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1026 : errmsg("logical replication at prepare time requires a %s callback",
1027 : "commit_prepared_cb")));
1028 :
1029 : /* do the actual work: call callback */
1030 64 : ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn);
1031 :
1032 : /* Pop the error context stack */
1033 64 : error_context_stack = errcallback.previous;
1034 64 : }
1035 :
1036 : static void
1037 22 : rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1038 : XLogRecPtr prepare_end_lsn,
1039 : TimestampTz prepare_time)
1040 : {
1041 22 : LogicalDecodingContext *ctx = cache->private_data;
1042 : LogicalErrorCallbackState state;
1043 : ErrorContextCallback errcallback;
1044 :
1045 : Assert(!ctx->fast_forward);
1046 :
1047 : /* We're only supposed to call this when two-phase commits are supported */
1048 : Assert(ctx->twophase);
1049 :
1050 : /* Push callback + info on the error context stack */
1051 22 : state.ctx = ctx;
1052 22 : state.callback_name = "rollback_prepared";
1053 22 : state.report_location = txn->final_lsn; /* beginning of commit record */
1054 22 : errcallback.callback = output_plugin_error_callback;
1055 22 : errcallback.arg = &state;
1056 22 : errcallback.previous = error_context_stack;
1057 22 : error_context_stack = &errcallback;
1058 :
1059 : /* set output state */
1060 22 : ctx->accept_writes = true;
1061 22 : ctx->write_xid = txn->xid;
1062 22 : ctx->write_location = txn->end_lsn; /* points to the end of the record */
1063 22 : ctx->end_xact = true;
1064 :
1065 : /*
1066 : * If the plugin support two-phase commits then rollback prepared callback
1067 : * is mandatory
1068 : */
1069 22 : if (ctx->callbacks.rollback_prepared_cb == NULL)
1070 0 : ereport(ERROR,
1071 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1072 : errmsg("logical replication at prepare time requires a %s callback",
1073 : "rollback_prepared_cb")));
1074 :
1075 : /* do the actual work: call callback */
1076 22 : ctx->callbacks.rollback_prepared_cb(ctx, txn, prepare_end_lsn,
1077 : prepare_time);
1078 :
1079 : /* Pop the error context stack */
1080 22 : error_context_stack = errcallback.previous;
1081 22 : }
1082 :
1083 : static void
1084 315994 : change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1085 : Relation relation, ReorderBufferChange *change)
1086 : {
1087 315994 : LogicalDecodingContext *ctx = cache->private_data;
1088 : LogicalErrorCallbackState state;
1089 : ErrorContextCallback errcallback;
1090 :
1091 : Assert(!ctx->fast_forward);
1092 :
1093 : /* Push callback + info on the error context stack */
1094 315994 : state.ctx = ctx;
1095 315994 : state.callback_name = "change";
1096 315994 : state.report_location = change->lsn;
1097 315994 : errcallback.callback = output_plugin_error_callback;
1098 315994 : errcallback.arg = &state;
1099 315994 : errcallback.previous = error_context_stack;
1100 315994 : error_context_stack = &errcallback;
1101 :
1102 : /* set output state */
1103 315994 : ctx->accept_writes = true;
1104 315994 : ctx->write_xid = txn->xid;
1105 :
1106 : /*
1107 : * Report this change's lsn so replies from clients can give an up-to-date
1108 : * answer. This won't ever be enough (and shouldn't be!) to confirm
1109 : * receipt of this transaction, but it might allow another transaction's
1110 : * commit to be confirmed with one message.
1111 : */
1112 315994 : ctx->write_location = change->lsn;
1113 :
1114 315994 : ctx->end_xact = false;
1115 :
1116 315994 : ctx->callbacks.change_cb(ctx, txn, relation, change);
1117 :
1118 : /* Pop the error context stack */
1119 315988 : error_context_stack = errcallback.previous;
1120 315988 : }
1121 :
1122 : static void
1123 44 : truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1124 : int nrelations, Relation relations[], ReorderBufferChange *change)
1125 : {
1126 44 : LogicalDecodingContext *ctx = cache->private_data;
1127 : LogicalErrorCallbackState state;
1128 : ErrorContextCallback errcallback;
1129 :
1130 : Assert(!ctx->fast_forward);
1131 :
1132 44 : if (!ctx->callbacks.truncate_cb)
1133 0 : return;
1134 :
1135 : /* Push callback + info on the error context stack */
1136 44 : state.ctx = ctx;
1137 44 : state.callback_name = "truncate";
1138 44 : state.report_location = change->lsn;
1139 44 : errcallback.callback = output_plugin_error_callback;
1140 44 : errcallback.arg = &state;
1141 44 : errcallback.previous = error_context_stack;
1142 44 : error_context_stack = &errcallback;
1143 :
1144 : /* set output state */
1145 44 : ctx->accept_writes = true;
1146 44 : ctx->write_xid = txn->xid;
1147 :
1148 : /*
1149 : * Report this change's lsn so replies from clients can give an up-to-date
1150 : * answer. This won't ever be enough (and shouldn't be!) to confirm
1151 : * receipt of this transaction, but it might allow another transaction's
1152 : * commit to be confirmed with one message.
1153 : */
1154 44 : ctx->write_location = change->lsn;
1155 :
1156 44 : ctx->end_xact = false;
1157 :
1158 44 : ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
1159 :
1160 : /* Pop the error context stack */
1161 44 : error_context_stack = errcallback.previous;
1162 : }
1163 :
1164 : bool
1165 296 : filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
1166 : const char *gid)
1167 : {
1168 : LogicalErrorCallbackState state;
1169 : ErrorContextCallback errcallback;
1170 : bool ret;
1171 :
1172 : Assert(!ctx->fast_forward);
1173 :
1174 : /* Push callback + info on the error context stack */
1175 296 : state.ctx = ctx;
1176 296 : state.callback_name = "filter_prepare";
1177 296 : state.report_location = InvalidXLogRecPtr;
1178 296 : errcallback.callback = output_plugin_error_callback;
1179 296 : errcallback.arg = &state;
1180 296 : errcallback.previous = error_context_stack;
1181 296 : error_context_stack = &errcallback;
1182 :
1183 : /* set output state */
1184 296 : ctx->accept_writes = false;
1185 296 : ctx->end_xact = false;
1186 :
1187 : /* do the actual work: call callback */
1188 296 : ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
1189 :
1190 : /* Pop the error context stack */
1191 296 : error_context_stack = errcallback.previous;
1192 :
1193 296 : return ret;
1194 : }
1195 :
1196 : bool
1197 3383146 : filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
1198 : {
1199 : LogicalErrorCallbackState state;
1200 : ErrorContextCallback errcallback;
1201 : bool ret;
1202 :
1203 : Assert(!ctx->fast_forward);
1204 :
1205 : /* Push callback + info on the error context stack */
1206 3383146 : state.ctx = ctx;
1207 3383146 : state.callback_name = "filter_by_origin";
1208 3383146 : state.report_location = InvalidXLogRecPtr;
1209 3383146 : errcallback.callback = output_plugin_error_callback;
1210 3383146 : errcallback.arg = &state;
1211 3383146 : errcallback.previous = error_context_stack;
1212 3383146 : error_context_stack = &errcallback;
1213 :
1214 : /* set output state */
1215 3383146 : ctx->accept_writes = false;
1216 3383146 : ctx->end_xact = false;
1217 :
1218 : /* do the actual work: call callback */
1219 3383146 : ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
1220 :
1221 : /* Pop the error context stack */
1222 3383146 : error_context_stack = errcallback.previous;
1223 :
1224 3383146 : return ret;
1225 : }
1226 :
1227 : static void
1228 32 : message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1229 : XLogRecPtr message_lsn, bool transactional,
1230 : const char *prefix, Size message_size, const char *message)
1231 : {
1232 32 : LogicalDecodingContext *ctx = cache->private_data;
1233 : LogicalErrorCallbackState state;
1234 : ErrorContextCallback errcallback;
1235 :
1236 : Assert(!ctx->fast_forward);
1237 :
1238 32 : if (ctx->callbacks.message_cb == NULL)
1239 0 : return;
1240 :
1241 : /* Push callback + info on the error context stack */
1242 32 : state.ctx = ctx;
1243 32 : state.callback_name = "message";
1244 32 : state.report_location = message_lsn;
1245 32 : errcallback.callback = output_plugin_error_callback;
1246 32 : errcallback.arg = &state;
1247 32 : errcallback.previous = error_context_stack;
1248 32 : error_context_stack = &errcallback;
1249 :
1250 : /* set output state */
1251 32 : ctx->accept_writes = true;
1252 32 : ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
1253 32 : ctx->write_location = message_lsn;
1254 32 : ctx->end_xact = false;
1255 :
1256 : /* do the actual work: call callback */
1257 32 : ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
1258 : message_size, message);
1259 :
1260 : /* Pop the error context stack */
1261 32 : error_context_stack = errcallback.previous;
1262 : }
1263 :
1264 : static void
1265 1372 : stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1266 : XLogRecPtr first_lsn)
1267 : {
1268 1372 : LogicalDecodingContext *ctx = cache->private_data;
1269 : LogicalErrorCallbackState state;
1270 : ErrorContextCallback errcallback;
1271 :
1272 : Assert(!ctx->fast_forward);
1273 :
1274 : /* We're only supposed to call this when streaming is supported. */
1275 : Assert(ctx->streaming);
1276 :
1277 : /* Push callback + info on the error context stack */
1278 1372 : state.ctx = ctx;
1279 1372 : state.callback_name = "stream_start";
1280 1372 : state.report_location = first_lsn;
1281 1372 : errcallback.callback = output_plugin_error_callback;
1282 1372 : errcallback.arg = &state;
1283 1372 : errcallback.previous = error_context_stack;
1284 1372 : error_context_stack = &errcallback;
1285 :
1286 : /* set output state */
1287 1372 : ctx->accept_writes = true;
1288 1372 : ctx->write_xid = txn->xid;
1289 :
1290 : /*
1291 : * Report this message's lsn so replies from clients can give an
1292 : * up-to-date answer. This won't ever be enough (and shouldn't be!) to
1293 : * confirm receipt of this transaction, but it might allow another
1294 : * transaction's commit to be confirmed with one message.
1295 : */
1296 1372 : ctx->write_location = first_lsn;
1297 :
1298 1372 : ctx->end_xact = false;
1299 :
1300 : /* in streaming mode, stream_start_cb is required */
1301 1372 : if (ctx->callbacks.stream_start_cb == NULL)
1302 0 : ereport(ERROR,
1303 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1304 : errmsg("logical streaming requires a %s callback",
1305 : "stream_start_cb")));
1306 :
1307 1372 : ctx->callbacks.stream_start_cb(ctx, txn);
1308 :
1309 : /* Pop the error context stack */
1310 1372 : error_context_stack = errcallback.previous;
1311 1372 : }
1312 :
1313 : static void
1314 1372 : stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1315 : XLogRecPtr last_lsn)
1316 : {
1317 1372 : LogicalDecodingContext *ctx = cache->private_data;
1318 : LogicalErrorCallbackState state;
1319 : ErrorContextCallback errcallback;
1320 :
1321 : Assert(!ctx->fast_forward);
1322 :
1323 : /* We're only supposed to call this when streaming is supported. */
1324 : Assert(ctx->streaming);
1325 :
1326 : /* Push callback + info on the error context stack */
1327 1372 : state.ctx = ctx;
1328 1372 : state.callback_name = "stream_stop";
1329 1372 : state.report_location = last_lsn;
1330 1372 : errcallback.callback = output_plugin_error_callback;
1331 1372 : errcallback.arg = &state;
1332 1372 : errcallback.previous = error_context_stack;
1333 1372 : error_context_stack = &errcallback;
1334 :
1335 : /* set output state */
1336 1372 : ctx->accept_writes = true;
1337 1372 : ctx->write_xid = txn->xid;
1338 :
1339 : /*
1340 : * Report this message's lsn so replies from clients can give an
1341 : * up-to-date answer. This won't ever be enough (and shouldn't be!) to
1342 : * confirm receipt of this transaction, but it might allow another
1343 : * transaction's commit to be confirmed with one message.
1344 : */
1345 1372 : ctx->write_location = last_lsn;
1346 :
1347 1372 : ctx->end_xact = false;
1348 :
1349 : /* in streaming mode, stream_stop_cb is required */
1350 1372 : if (ctx->callbacks.stream_stop_cb == NULL)
1351 0 : ereport(ERROR,
1352 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1353 : errmsg("logical streaming requires a %s callback",
1354 : "stream_stop_cb")));
1355 :
1356 1372 : ctx->callbacks.stream_stop_cb(ctx, txn);
1357 :
1358 : /* Pop the error context stack */
1359 1372 : error_context_stack = errcallback.previous;
1360 1372 : }
1361 :
1362 : static void
1363 60 : stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1364 : XLogRecPtr abort_lsn)
1365 : {
1366 60 : LogicalDecodingContext *ctx = cache->private_data;
1367 : LogicalErrorCallbackState state;
1368 : ErrorContextCallback errcallback;
1369 :
1370 : Assert(!ctx->fast_forward);
1371 :
1372 : /* We're only supposed to call this when streaming is supported. */
1373 : Assert(ctx->streaming);
1374 :
1375 : /* Push callback + info on the error context stack */
1376 60 : state.ctx = ctx;
1377 60 : state.callback_name = "stream_abort";
1378 60 : state.report_location = abort_lsn;
1379 60 : errcallback.callback = output_plugin_error_callback;
1380 60 : errcallback.arg = &state;
1381 60 : errcallback.previous = error_context_stack;
1382 60 : error_context_stack = &errcallback;
1383 :
1384 : /* set output state */
1385 60 : ctx->accept_writes = true;
1386 60 : ctx->write_xid = txn->xid;
1387 60 : ctx->write_location = abort_lsn;
1388 60 : ctx->end_xact = true;
1389 :
1390 : /* in streaming mode, stream_abort_cb is required */
1391 60 : if (ctx->callbacks.stream_abort_cb == NULL)
1392 0 : ereport(ERROR,
1393 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1394 : errmsg("logical streaming requires a %s callback",
1395 : "stream_abort_cb")));
1396 :
1397 60 : ctx->callbacks.stream_abort_cb(ctx, txn, abort_lsn);
1398 :
1399 : /* Pop the error context stack */
1400 60 : error_context_stack = errcallback.previous;
1401 60 : }
1402 :
1403 : static void
1404 30 : stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1405 : XLogRecPtr prepare_lsn)
1406 : {
1407 30 : LogicalDecodingContext *ctx = cache->private_data;
1408 : LogicalErrorCallbackState state;
1409 : ErrorContextCallback errcallback;
1410 :
1411 : Assert(!ctx->fast_forward);
1412 :
1413 : /*
1414 : * We're only supposed to call this when streaming and two-phase commits
1415 : * are supported.
1416 : */
1417 : Assert(ctx->streaming);
1418 : Assert(ctx->twophase);
1419 :
1420 : /* Push callback + info on the error context stack */
1421 30 : state.ctx = ctx;
1422 30 : state.callback_name = "stream_prepare";
1423 30 : state.report_location = txn->final_lsn;
1424 30 : errcallback.callback = output_plugin_error_callback;
1425 30 : errcallback.arg = &state;
1426 30 : errcallback.previous = error_context_stack;
1427 30 : error_context_stack = &errcallback;
1428 :
1429 : /* set output state */
1430 30 : ctx->accept_writes = true;
1431 30 : ctx->write_xid = txn->xid;
1432 30 : ctx->write_location = txn->end_lsn;
1433 30 : ctx->end_xact = true;
1434 :
1435 : /* in streaming mode with two-phase commits, stream_prepare_cb is required */
1436 30 : if (ctx->callbacks.stream_prepare_cb == NULL)
1437 0 : ereport(ERROR,
1438 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1439 : errmsg("logical streaming at prepare time requires a %s callback",
1440 : "stream_prepare_cb")));
1441 :
1442 30 : ctx->callbacks.stream_prepare_cb(ctx, txn, prepare_lsn);
1443 :
1444 : /* Pop the error context stack */
1445 30 : error_context_stack = errcallback.previous;
1446 30 : }
1447 :
1448 : static void
1449 102 : stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1450 : XLogRecPtr commit_lsn)
1451 : {
1452 102 : LogicalDecodingContext *ctx = cache->private_data;
1453 : LogicalErrorCallbackState state;
1454 : ErrorContextCallback errcallback;
1455 :
1456 : Assert(!ctx->fast_forward);
1457 :
1458 : /* We're only supposed to call this when streaming is supported. */
1459 : Assert(ctx->streaming);
1460 :
1461 : /* Push callback + info on the error context stack */
1462 102 : state.ctx = ctx;
1463 102 : state.callback_name = "stream_commit";
1464 102 : state.report_location = txn->final_lsn;
1465 102 : errcallback.callback = output_plugin_error_callback;
1466 102 : errcallback.arg = &state;
1467 102 : errcallback.previous = error_context_stack;
1468 102 : error_context_stack = &errcallback;
1469 :
1470 : /* set output state */
1471 102 : ctx->accept_writes = true;
1472 102 : ctx->write_xid = txn->xid;
1473 102 : ctx->write_location = txn->end_lsn;
1474 102 : ctx->end_xact = true;
1475 :
1476 : /* in streaming mode, stream_commit_cb is required */
1477 102 : if (ctx->callbacks.stream_commit_cb == NULL)
1478 0 : ereport(ERROR,
1479 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1480 : errmsg("logical streaming requires a %s callback",
1481 : "stream_commit_cb")));
1482 :
1483 102 : ctx->callbacks.stream_commit_cb(ctx, txn, commit_lsn);
1484 :
1485 : /* Pop the error context stack */
1486 102 : error_context_stack = errcallback.previous;
1487 102 : }
1488 :
1489 : static void
1490 352012 : stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1491 : Relation relation, ReorderBufferChange *change)
1492 : {
1493 352012 : LogicalDecodingContext *ctx = cache->private_data;
1494 : LogicalErrorCallbackState state;
1495 : ErrorContextCallback errcallback;
1496 :
1497 : Assert(!ctx->fast_forward);
1498 :
1499 : /* We're only supposed to call this when streaming is supported. */
1500 : Assert(ctx->streaming);
1501 :
1502 : /* Push callback + info on the error context stack */
1503 352012 : state.ctx = ctx;
1504 352012 : state.callback_name = "stream_change";
1505 352012 : state.report_location = change->lsn;
1506 352012 : errcallback.callback = output_plugin_error_callback;
1507 352012 : errcallback.arg = &state;
1508 352012 : errcallback.previous = error_context_stack;
1509 352012 : error_context_stack = &errcallback;
1510 :
1511 : /* set output state */
1512 352012 : ctx->accept_writes = true;
1513 352012 : ctx->write_xid = txn->xid;
1514 :
1515 : /*
1516 : * Report this change's lsn so replies from clients can give an up-to-date
1517 : * answer. This won't ever be enough (and shouldn't be!) to confirm
1518 : * receipt of this transaction, but it might allow another transaction's
1519 : * commit to be confirmed with one message.
1520 : */
1521 352012 : ctx->write_location = change->lsn;
1522 :
1523 352012 : ctx->end_xact = false;
1524 :
1525 : /* in streaming mode, stream_change_cb is required */
1526 352012 : if (ctx->callbacks.stream_change_cb == NULL)
1527 0 : ereport(ERROR,
1528 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1529 : errmsg("logical streaming requires a %s callback",
1530 : "stream_change_cb")));
1531 :
1532 352012 : ctx->callbacks.stream_change_cb(ctx, txn, relation, change);
1533 :
1534 : /* Pop the error context stack */
1535 352012 : error_context_stack = errcallback.previous;
1536 352012 : }
1537 :
1538 : static void
1539 6 : stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1540 : XLogRecPtr message_lsn, bool transactional,
1541 : const char *prefix, Size message_size, const char *message)
1542 : {
1543 6 : LogicalDecodingContext *ctx = cache->private_data;
1544 : LogicalErrorCallbackState state;
1545 : ErrorContextCallback errcallback;
1546 :
1547 : Assert(!ctx->fast_forward);
1548 :
1549 : /* We're only supposed to call this when streaming is supported. */
1550 : Assert(ctx->streaming);
1551 :
1552 : /* this callback is optional */
1553 6 : if (ctx->callbacks.stream_message_cb == NULL)
1554 0 : return;
1555 :
1556 : /* Push callback + info on the error context stack */
1557 6 : state.ctx = ctx;
1558 6 : state.callback_name = "stream_message";
1559 6 : state.report_location = message_lsn;
1560 6 : errcallback.callback = output_plugin_error_callback;
1561 6 : errcallback.arg = &state;
1562 6 : errcallback.previous = error_context_stack;
1563 6 : error_context_stack = &errcallback;
1564 :
1565 : /* set output state */
1566 6 : ctx->accept_writes = true;
1567 6 : ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
1568 6 : ctx->write_location = message_lsn;
1569 6 : ctx->end_xact = false;
1570 :
1571 : /* do the actual work: call callback */
1572 6 : ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix,
1573 : message_size, message);
1574 :
1575 : /* Pop the error context stack */
1576 6 : error_context_stack = errcallback.previous;
1577 : }
1578 :
1579 : static void
1580 0 : stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1581 : int nrelations, Relation relations[],
1582 : ReorderBufferChange *change)
1583 : {
1584 0 : LogicalDecodingContext *ctx = cache->private_data;
1585 : LogicalErrorCallbackState state;
1586 : ErrorContextCallback errcallback;
1587 :
1588 : Assert(!ctx->fast_forward);
1589 :
1590 : /* We're only supposed to call this when streaming is supported. */
1591 : Assert(ctx->streaming);
1592 :
1593 : /* this callback is optional */
1594 0 : if (!ctx->callbacks.stream_truncate_cb)
1595 0 : return;
1596 :
1597 : /* Push callback + info on the error context stack */
1598 0 : state.ctx = ctx;
1599 0 : state.callback_name = "stream_truncate";
1600 0 : state.report_location = change->lsn;
1601 0 : errcallback.callback = output_plugin_error_callback;
1602 0 : errcallback.arg = &state;
1603 0 : errcallback.previous = error_context_stack;
1604 0 : error_context_stack = &errcallback;
1605 :
1606 : /* set output state */
1607 0 : ctx->accept_writes = true;
1608 0 : ctx->write_xid = txn->xid;
1609 :
1610 : /*
1611 : * Report this change's lsn so replies from clients can give an up-to-date
1612 : * answer. This won't ever be enough (and shouldn't be!) to confirm
1613 : * receipt of this transaction, but it might allow another transaction's
1614 : * commit to be confirmed with one message.
1615 : */
1616 0 : ctx->write_location = change->lsn;
1617 :
1618 0 : ctx->end_xact = false;
1619 :
1620 0 : ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change);
1621 :
1622 : /* Pop the error context stack */
1623 0 : error_context_stack = errcallback.previous;
1624 : }
1625 :
1626 : static void
1627 6198 : update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1628 : XLogRecPtr lsn)
1629 : {
1630 6198 : LogicalDecodingContext *ctx = cache->private_data;
1631 : LogicalErrorCallbackState state;
1632 : ErrorContextCallback errcallback;
1633 :
1634 : Assert(!ctx->fast_forward);
1635 :
1636 : /* Push callback + info on the error context stack */
1637 6198 : state.ctx = ctx;
1638 6198 : state.callback_name = "update_progress_txn";
1639 6198 : state.report_location = lsn;
1640 6198 : errcallback.callback = output_plugin_error_callback;
1641 6198 : errcallback.arg = &state;
1642 6198 : errcallback.previous = error_context_stack;
1643 6198 : error_context_stack = &errcallback;
1644 :
1645 : /* set output state */
1646 6198 : ctx->accept_writes = false;
1647 6198 : ctx->write_xid = txn->xid;
1648 :
1649 : /*
1650 : * Report this change's lsn so replies from clients can give an up-to-date
1651 : * answer. This won't ever be enough (and shouldn't be!) to confirm
1652 : * receipt of this transaction, but it might allow another transaction's
1653 : * commit to be confirmed with one message.
1654 : */
1655 6198 : ctx->write_location = lsn;
1656 :
1657 6198 : ctx->end_xact = false;
1658 :
1659 6198 : OutputPluginUpdateProgress(ctx, false);
1660 :
1661 : /* Pop the error context stack */
1662 6198 : error_context_stack = errcallback.previous;
1663 6198 : }
1664 :
1665 : /*
1666 : * Set the required catalog xmin horizon for historic snapshots in the current
1667 : * replication slot.
1668 : *
1669 : * Note that in the most cases, we won't be able to immediately use the xmin
1670 : * to increase the xmin horizon: we need to wait till the client has confirmed
1671 : * receiving current_lsn with LogicalConfirmReceivedLocation().
1672 : */
1673 : void
1674 722 : LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
1675 : {
1676 722 : bool updated_xmin = false;
1677 : ReplicationSlot *slot;
1678 722 : bool got_new_xmin = false;
1679 :
1680 722 : slot = MyReplicationSlot;
1681 :
1682 : Assert(slot != NULL);
1683 :
1684 722 : SpinLockAcquire(&slot->mutex);
1685 :
1686 : /*
1687 : * don't overwrite if we already have a newer xmin. This can happen if we
1688 : * restart decoding in a slot.
1689 : */
1690 722 : if (TransactionIdPrecedesOrEquals(xmin, slot->data.catalog_xmin))
1691 : {
1692 : }
1693 :
1694 : /*
1695 : * If the client has already confirmed up to this lsn, we directly can
1696 : * mark this as accepted. This can happen if we restart decoding in a
1697 : * slot.
1698 : */
1699 180 : else if (current_lsn <= slot->data.confirmed_flush)
1700 : {
1701 88 : slot->candidate_catalog_xmin = xmin;
1702 88 : slot->candidate_xmin_lsn = current_lsn;
1703 :
1704 : /* our candidate can directly be used */
1705 88 : updated_xmin = true;
1706 : }
1707 :
1708 : /*
1709 : * Only increase if the previous values have been applied, otherwise we
1710 : * might never end up updating if the receiver acks too slowly.
1711 : */
1712 92 : else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
1713 : {
1714 44 : slot->candidate_catalog_xmin = xmin;
1715 44 : slot->candidate_xmin_lsn = current_lsn;
1716 :
1717 : /*
1718 : * Log new xmin at an appropriate log level after releasing the
1719 : * spinlock.
1720 : */
1721 44 : got_new_xmin = true;
1722 : }
1723 722 : SpinLockRelease(&slot->mutex);
1724 :
1725 722 : if (got_new_xmin)
1726 44 : elog(DEBUG1, "got new catalog xmin %u at %X/%X", xmin,
1727 : LSN_FORMAT_ARGS(current_lsn));
1728 :
1729 : /* candidate already valid with the current flush position, apply */
1730 722 : if (updated_xmin)
1731 88 : LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
1732 722 : }
1733 :
1734 : /*
1735 : * Mark the minimal LSN (restart_lsn) we need to read to replay all
1736 : * transactions that have not yet committed at current_lsn.
1737 : *
1738 : * Just like LogicalIncreaseXminForSlot this only takes effect when the
1739 : * client has confirmed to have received current_lsn.
1740 : */
1741 : void
1742 628 : LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
1743 : {
1744 628 : bool updated_lsn = false;
1745 : ReplicationSlot *slot;
1746 :
1747 628 : slot = MyReplicationSlot;
1748 :
1749 : Assert(slot != NULL);
1750 : Assert(restart_lsn != InvalidXLogRecPtr);
1751 : Assert(current_lsn != InvalidXLogRecPtr);
1752 :
1753 628 : SpinLockAcquire(&slot->mutex);
1754 :
1755 : /* don't overwrite if have a newer restart lsn */
1756 628 : if (restart_lsn <= slot->data.restart_lsn)
1757 : {
1758 18 : SpinLockRelease(&slot->mutex);
1759 : }
1760 :
1761 : /*
1762 : * We might have already flushed far enough to directly accept this lsn,
1763 : * in this case there is no need to check for existing candidate LSNs
1764 : */
1765 610 : else if (current_lsn <= slot->data.confirmed_flush)
1766 : {
1767 476 : slot->candidate_restart_valid = current_lsn;
1768 476 : slot->candidate_restart_lsn = restart_lsn;
1769 476 : SpinLockRelease(&slot->mutex);
1770 :
1771 : /* our candidate can directly be used */
1772 476 : updated_lsn = true;
1773 : }
1774 :
1775 : /*
1776 : * Only increase if the previous values have been applied, otherwise we
1777 : * might never end up updating if the receiver acks too slowly. A missed
1778 : * value here will just cause some extra effort after reconnecting.
1779 : */
1780 134 : else if (slot->candidate_restart_valid == InvalidXLogRecPtr)
1781 : {
1782 70 : slot->candidate_restart_valid = current_lsn;
1783 70 : slot->candidate_restart_lsn = restart_lsn;
1784 70 : SpinLockRelease(&slot->mutex);
1785 :
1786 70 : elog(DEBUG1, "got new restart lsn %X/%X at %X/%X",
1787 : LSN_FORMAT_ARGS(restart_lsn),
1788 : LSN_FORMAT_ARGS(current_lsn));
1789 : }
1790 : else
1791 : {
1792 : XLogRecPtr candidate_restart_lsn;
1793 : XLogRecPtr candidate_restart_valid;
1794 : XLogRecPtr confirmed_flush;
1795 :
1796 64 : candidate_restart_lsn = slot->candidate_restart_lsn;
1797 64 : candidate_restart_valid = slot->candidate_restart_valid;
1798 64 : confirmed_flush = slot->data.confirmed_flush;
1799 64 : SpinLockRelease(&slot->mutex);
1800 :
1801 64 : elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after %X/%X, current candidate %X/%X, current after %X/%X, flushed up to %X/%X",
1802 : LSN_FORMAT_ARGS(restart_lsn),
1803 : LSN_FORMAT_ARGS(current_lsn),
1804 : LSN_FORMAT_ARGS(candidate_restart_lsn),
1805 : LSN_FORMAT_ARGS(candidate_restart_valid),
1806 : LSN_FORMAT_ARGS(confirmed_flush));
1807 : }
1808 :
1809 : /* candidates are already valid with the current flush position, apply */
1810 628 : if (updated_lsn)
1811 476 : LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
1812 628 : }
1813 :
1814 : /*
1815 : * Handle a consumer's confirmation having received all changes up to lsn.
1816 : */
1817 : void
1818 48298 : LogicalConfirmReceivedLocation(XLogRecPtr lsn)
1819 : {
1820 : Assert(lsn != InvalidXLogRecPtr);
1821 :
1822 : /* Do an unlocked check for candidate_lsn first. */
1823 48298 : if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr ||
1824 48172 : MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr)
1825 632 : {
1826 632 : bool updated_xmin = false;
1827 632 : bool updated_restart = false;
1828 :
1829 632 : SpinLockAcquire(&MyReplicationSlot->mutex);
1830 :
1831 632 : MyReplicationSlot->data.confirmed_flush = lsn;
1832 :
1833 : /* if we're past the location required for bumping xmin, do so */
1834 632 : if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr &&
1835 126 : MyReplicationSlot->candidate_xmin_lsn <= lsn)
1836 : {
1837 : /*
1838 : * We have to write the changed xmin to disk *before* we change
1839 : * the in-memory value, otherwise after a crash we wouldn't know
1840 : * that some catalog tuples might have been removed already.
1841 : *
1842 : * Ensure that by first writing to ->xmin and only update
1843 : * ->effective_xmin once the new state is synced to disk. After a
1844 : * crash ->effective_xmin is set to ->xmin.
1845 : */
1846 122 : if (TransactionIdIsValid(MyReplicationSlot->candidate_catalog_xmin) &&
1847 122 : MyReplicationSlot->data.catalog_xmin != MyReplicationSlot->candidate_catalog_xmin)
1848 : {
1849 122 : MyReplicationSlot->data.catalog_xmin = MyReplicationSlot->candidate_catalog_xmin;
1850 122 : MyReplicationSlot->candidate_catalog_xmin = InvalidTransactionId;
1851 122 : MyReplicationSlot->candidate_xmin_lsn = InvalidXLogRecPtr;
1852 122 : updated_xmin = true;
1853 : }
1854 : }
1855 :
1856 632 : if (MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr &&
1857 544 : MyReplicationSlot->candidate_restart_valid <= lsn)
1858 : {
1859 : Assert(MyReplicationSlot->candidate_restart_lsn != InvalidXLogRecPtr);
1860 :
1861 538 : MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
1862 538 : MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
1863 538 : MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
1864 538 : updated_restart = true;
1865 : }
1866 :
1867 632 : SpinLockRelease(&MyReplicationSlot->mutex);
1868 :
1869 : /* first write new xmin to disk, so we know what's up after a crash */
1870 632 : if (updated_xmin || updated_restart)
1871 : {
1872 626 : ReplicationSlotMarkDirty();
1873 626 : ReplicationSlotSave();
1874 626 : elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
1875 : }
1876 :
1877 : /*
1878 : * Now the new xmin is safely on disk, we can let the global value
1879 : * advance. We do not take ProcArrayLock or similar since we only
1880 : * advance xmin here and there's not much harm done by a concurrent
1881 : * computation missing that.
1882 : */
1883 632 : if (updated_xmin)
1884 : {
1885 122 : SpinLockAcquire(&MyReplicationSlot->mutex);
1886 122 : MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
1887 122 : SpinLockRelease(&MyReplicationSlot->mutex);
1888 :
1889 122 : ReplicationSlotsComputeRequiredXmin(false);
1890 122 : ReplicationSlotsComputeRequiredLSN();
1891 : }
1892 : }
1893 : else
1894 : {
1895 47666 : SpinLockAcquire(&MyReplicationSlot->mutex);
1896 47666 : MyReplicationSlot->data.confirmed_flush = lsn;
1897 47666 : SpinLockRelease(&MyReplicationSlot->mutex);
1898 : }
1899 48298 : }
1900 :
1901 : /*
1902 : * Clear logical streaming state during (sub)transaction abort.
1903 : */
1904 : void
1905 58198 : ResetLogicalStreamingState(void)
1906 : {
1907 58198 : CheckXidAlive = InvalidTransactionId;
1908 58198 : bsysscan = false;
1909 58198 : }
1910 :
1911 : /*
1912 : * Report stats for a slot.
1913 : */
1914 : void
1915 12760 : UpdateDecodingStats(LogicalDecodingContext *ctx)
1916 : {
1917 12760 : ReorderBuffer *rb = ctx->reorder;
1918 : PgStat_StatReplSlotEntry repSlotStat;
1919 :
1920 : /* Nothing to do if we don't have any replication stats to be sent. */
1921 12760 : if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0)
1922 508 : return;
1923 :
1924 12252 : elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64,
1925 : rb,
1926 : rb->spillTxns,
1927 : rb->spillCount,
1928 : rb->spillBytes,
1929 : rb->streamTxns,
1930 : rb->streamCount,
1931 : rb->streamBytes,
1932 : rb->totalTxns,
1933 : rb->totalBytes);
1934 :
1935 12252 : repSlotStat.spill_txns = rb->spillTxns;
1936 12252 : repSlotStat.spill_count = rb->spillCount;
1937 12252 : repSlotStat.spill_bytes = rb->spillBytes;
1938 12252 : repSlotStat.stream_txns = rb->streamTxns;
1939 12252 : repSlotStat.stream_count = rb->streamCount;
1940 12252 : repSlotStat.stream_bytes = rb->streamBytes;
1941 12252 : repSlotStat.total_txns = rb->totalTxns;
1942 12252 : repSlotStat.total_bytes = rb->totalBytes;
1943 :
1944 12252 : pgstat_report_replslot(ctx->slot, &repSlotStat);
1945 :
1946 12252 : rb->spillTxns = 0;
1947 12252 : rb->spillCount = 0;
1948 12252 : rb->spillBytes = 0;
1949 12252 : rb->streamTxns = 0;
1950 12252 : rb->streamCount = 0;
1951 12252 : rb->streamBytes = 0;
1952 12252 : rb->totalTxns = 0;
1953 12252 : rb->totalBytes = 0;
1954 : }
1955 :
1956 : /*
1957 : * Read up to the end of WAL starting from the decoding slot's restart_lsn.
1958 : * Return true if any meaningful/decodable WAL records are encountered,
1959 : * otherwise false.
1960 : */
1961 : bool
1962 10 : LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
1963 : {
1964 10 : bool has_pending_wal = false;
1965 :
1966 : Assert(MyReplicationSlot);
1967 :
1968 10 : PG_TRY();
1969 : {
1970 : LogicalDecodingContext *ctx;
1971 :
1972 : /*
1973 : * Create our decoding context in fast_forward mode, passing start_lsn
1974 : * as InvalidXLogRecPtr, so that we start processing from the slot's
1975 : * confirmed_flush.
1976 : */
1977 20 : ctx = CreateDecodingContext(InvalidXLogRecPtr,
1978 : NIL,
1979 : true, /* fast_forward */
1980 10 : XL_ROUTINE(.page_read = read_local_xlog_page,
1981 : .segment_open = wal_segment_open,
1982 : .segment_close = wal_segment_close),
1983 : NULL, NULL, NULL);
1984 :
1985 : /*
1986 : * Start reading at the slot's restart_lsn, which we know points to a
1987 : * valid record.
1988 : */
1989 10 : XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
1990 :
1991 : /* Invalidate non-timetravel entries */
1992 10 : InvalidateSystemCaches();
1993 :
1994 : /* Loop until the end of WAL or some changes are processed */
1995 300 : while (!has_pending_wal && ctx->reader->EndRecPtr < end_of_wal)
1996 : {
1997 : XLogRecord *record;
1998 290 : char *errm = NULL;
1999 :
2000 290 : record = XLogReadRecord(ctx->reader, &errm);
2001 :
2002 290 : if (errm)
2003 0 : elog(ERROR, "could not find record for logical decoding: %s", errm);
2004 :
2005 290 : if (record != NULL)
2006 290 : LogicalDecodingProcessRecord(ctx, ctx->reader);
2007 :
2008 290 : has_pending_wal = ctx->processing_required;
2009 :
2010 290 : CHECK_FOR_INTERRUPTS();
2011 : }
2012 :
2013 : /* Clean up */
2014 10 : FreeDecodingContext(ctx);
2015 10 : InvalidateSystemCaches();
2016 : }
2017 0 : PG_CATCH();
2018 : {
2019 : /* clear all timetravel entries */
2020 0 : InvalidateSystemCaches();
2021 :
2022 0 : PG_RE_THROW();
2023 : }
2024 10 : PG_END_TRY();
2025 :
2026 10 : return has_pending_wal;
2027 : }
2028 :
2029 : /*
2030 : * Helper function for advancing our logical replication slot forward.
2031 : *
2032 : * The slot's restart_lsn is used as start point for reading records, while
2033 : * confirmed_flush is used as base point for the decoding context.
2034 : *
2035 : * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
2036 : * because we need to digest WAL to advance restart_lsn allowing to recycle
2037 : * WAL and removal of old catalog tuples. As decoding is done in fast_forward
2038 : * mode, no changes are generated anyway.
2039 : *
2040 : * *found_consistent_snapshot will be true if the initial decoding snapshot has
2041 : * been built; Otherwise, it will be false.
2042 : */
2043 : XLogRecPtr
2044 28 : LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto,
2045 : bool *found_consistent_snapshot)
2046 : {
2047 : LogicalDecodingContext *ctx;
2048 28 : ResourceOwner old_resowner = CurrentResourceOwner;
2049 : XLogRecPtr retlsn;
2050 :
2051 : Assert(moveto != InvalidXLogRecPtr);
2052 :
2053 28 : if (found_consistent_snapshot)
2054 10 : *found_consistent_snapshot = false;
2055 :
2056 28 : PG_TRY();
2057 : {
2058 : /*
2059 : * Create our decoding context in fast_forward mode, passing start_lsn
2060 : * as InvalidXLogRecPtr, so that we start processing from my slot's
2061 : * confirmed_flush.
2062 : */
2063 56 : ctx = CreateDecodingContext(InvalidXLogRecPtr,
2064 : NIL,
2065 : true, /* fast_forward */
2066 28 : XL_ROUTINE(.page_read = read_local_xlog_page,
2067 : .segment_open = wal_segment_open,
2068 : .segment_close = wal_segment_close),
2069 : NULL, NULL, NULL);
2070 :
2071 : /*
2072 : * Wait for specified streaming replication standby servers (if any)
2073 : * to confirm receipt of WAL up to moveto lsn.
2074 : */
2075 28 : WaitForStandbyConfirmation(moveto);
2076 :
2077 : /*
2078 : * Start reading at the slot's restart_lsn, which we know to point to
2079 : * a valid record.
2080 : */
2081 28 : XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
2082 :
2083 : /* invalidate non-timetravel entries */
2084 28 : InvalidateSystemCaches();
2085 :
2086 : /* Decode records until we reach the requested target */
2087 3424 : while (ctx->reader->EndRecPtr < moveto)
2088 : {
2089 3396 : char *errm = NULL;
2090 : XLogRecord *record;
2091 :
2092 : /*
2093 : * Read records. No changes are generated in fast_forward mode,
2094 : * but snapbuilder/slot statuses are updated properly.
2095 : */
2096 3396 : record = XLogReadRecord(ctx->reader, &errm);
2097 3396 : if (errm)
2098 0 : elog(ERROR, "could not find record while advancing replication slot: %s",
2099 : errm);
2100 :
2101 : /*
2102 : * Process the record. Storage-level changes are ignored in
2103 : * fast_forward mode, but other modules (such as snapbuilder)
2104 : * might still have critical updates to do.
2105 : */
2106 3396 : if (record)
2107 3396 : LogicalDecodingProcessRecord(ctx, ctx->reader);
2108 :
2109 3396 : CHECK_FOR_INTERRUPTS();
2110 : }
2111 :
2112 28 : if (found_consistent_snapshot && DecodingContextReady(ctx))
2113 10 : *found_consistent_snapshot = true;
2114 :
2115 : /*
2116 : * Logical decoding could have clobbered CurrentResourceOwner during
2117 : * transaction management, so restore the executor's value. (This is
2118 : * a kluge, but it's not worth cleaning up right now.)
2119 : */
2120 28 : CurrentResourceOwner = old_resowner;
2121 :
2122 28 : if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
2123 : {
2124 28 : LogicalConfirmReceivedLocation(moveto);
2125 :
2126 : /*
2127 : * If only the confirmed_flush LSN has changed the slot won't get
2128 : * marked as dirty by the above. Callers on the walsender
2129 : * interface are expected to keep track of their own progress and
2130 : * don't need it written out. But SQL-interface users cannot
2131 : * specify their own start positions and it's harder for them to
2132 : * keep track of their progress, so we should make more of an
2133 : * effort to save it for them.
2134 : *
2135 : * Dirty the slot so it is written out at the next checkpoint. The
2136 : * LSN position advanced to may still be lost on a crash but this
2137 : * makes the data consistent after a clean shutdown.
2138 : */
2139 28 : ReplicationSlotMarkDirty();
2140 : }
2141 :
2142 28 : retlsn = MyReplicationSlot->data.confirmed_flush;
2143 :
2144 : /* free context, call shutdown callback */
2145 28 : FreeDecodingContext(ctx);
2146 :
2147 28 : InvalidateSystemCaches();
2148 : }
2149 0 : PG_CATCH();
2150 : {
2151 : /* clear all timetravel entries */
2152 0 : InvalidateSystemCaches();
2153 :
2154 0 : PG_RE_THROW();
2155 : }
2156 28 : PG_END_TRY();
2157 :
2158 28 : return retlsn;
2159 : }
|