Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * logical.c
3 : * PostgreSQL logical decoding coordination
4 : *
5 : * Copyright (c) 2012-2025, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/logical.c
9 : *
10 : * NOTES
11 : * This file coordinates interaction between the various modules that
12 : * together provide logical decoding, primarily by providing so
13 : * called LogicalDecodingContexts. The goal is to encapsulate most of the
14 : * internal complexity for consumers of logical decoding, so they can
15 : * create and consume a changestream with a low amount of code. Builtin
16 : * consumers are the walsender and SQL SRF interface, but it's possible to
17 : * add further ones without changing core code, e.g. to consume changes in
18 : * a bgworker.
19 : *
20 : * The idea is that a consumer provides three callbacks, one to read WAL,
21 : * one to prepare a data write, and a final one for actually writing since
22 : * their implementation depends on the type of consumer. Check
23 : * logicalfuncs.c for an example implementation of a fairly simple consumer
24 : * and an implementation of a WAL reading callback that's suitable for
25 : * simple consumers.
26 : *-------------------------------------------------------------------------
27 : */
28 :
29 : #include "postgres.h"
30 :
31 : #include "access/xact.h"
32 : #include "access/xlogutils.h"
33 : #include "fmgr.h"
34 : #include "miscadmin.h"
35 : #include "pgstat.h"
36 : #include "replication/decode.h"
37 : #include "replication/logical.h"
38 : #include "replication/reorderbuffer.h"
39 : #include "replication/slotsync.h"
40 : #include "replication/snapbuild.h"
41 : #include "storage/proc.h"
42 : #include "storage/procarray.h"
43 : #include "utils/builtins.h"
44 : #include "utils/inval.h"
45 : #include "utils/memutils.h"
46 :
47 : /* data for errcontext callback */
48 : typedef struct LogicalErrorCallbackState
49 : {
50 : LogicalDecodingContext *ctx;
51 : const char *callback_name;
52 : XLogRecPtr report_location;
53 : } LogicalErrorCallbackState;
54 :
55 : /* wrappers around output plugin callbacks */
56 : static void output_plugin_error_callback(void *arg);
57 : static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
58 : bool is_init);
59 : static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
60 : static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
61 : static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
62 : XLogRecPtr commit_lsn);
63 : static void begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
64 : static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
65 : XLogRecPtr prepare_lsn);
66 : static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
67 : XLogRecPtr commit_lsn);
68 : static void rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
69 : XLogRecPtr prepare_end_lsn, TimestampTz prepare_time);
70 : static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
71 : Relation relation, ReorderBufferChange *change);
72 : static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
73 : int nrelations, Relation relations[], ReorderBufferChange *change);
74 : static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
75 : XLogRecPtr message_lsn, bool transactional,
76 : const char *prefix, Size message_size, const char *message);
77 :
78 : /* streaming callbacks */
79 : static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
80 : XLogRecPtr first_lsn);
81 : static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
82 : XLogRecPtr last_lsn);
83 : static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
84 : XLogRecPtr abort_lsn);
85 : static void stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
86 : XLogRecPtr prepare_lsn);
87 : static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
88 : XLogRecPtr commit_lsn);
89 : static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
90 : Relation relation, ReorderBufferChange *change);
91 : static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
92 : XLogRecPtr message_lsn, bool transactional,
93 : const char *prefix, Size message_size, const char *message);
94 : static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
95 : int nrelations, Relation relations[], ReorderBufferChange *change);
96 :
97 : /* callback to update txn's progress */
98 : static void update_progress_txn_cb_wrapper(ReorderBuffer *cache,
99 : ReorderBufferTXN *txn,
100 : XLogRecPtr lsn);
101 :
102 : static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin);
103 :
104 : /*
105 : * Make sure the current settings & environment are capable of doing logical
106 : * decoding.
107 : */
108 : void
109 2876 : CheckLogicalDecodingRequirements(void)
110 : {
111 2876 : CheckSlotRequirements();
112 :
113 : /*
114 : * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
115 : * needs the same check.
116 : */
117 :
118 2876 : if (wal_level < WAL_LEVEL_LOGICAL)
119 0 : ereport(ERROR,
120 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
121 : errmsg("logical decoding requires \"wal_level\" >= \"logical\"")));
122 :
123 2876 : if (MyDatabaseId == InvalidOid)
124 2 : ereport(ERROR,
125 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
126 : errmsg("logical decoding requires a database connection")));
127 :
128 2874 : if (RecoveryInProgress())
129 : {
130 : /*
131 : * This check may have race conditions, but whenever
132 : * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we
133 : * verify that there are no existing logical replication slots. And to
134 : * avoid races around creating a new slot,
135 : * CheckLogicalDecodingRequirements() is called once before creating
136 : * the slot, and once when logical decoding is initially starting up.
137 : */
138 132 : if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL)
139 2 : ereport(ERROR,
140 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
141 : errmsg("logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary")));
142 : }
143 2872 : }
144 :
145 : /*
146 : * Helper function for CreateInitDecodingContext() and
147 : * CreateDecodingContext() performing common tasks.
148 : */
149 : static LogicalDecodingContext *
150 2014 : StartupDecodingContext(List *output_plugin_options,
151 : XLogRecPtr start_lsn,
152 : TransactionId xmin_horizon,
153 : bool need_full_snapshot,
154 : bool fast_forward,
155 : bool in_create,
156 : XLogReaderRoutine *xl_routine,
157 : LogicalOutputPluginWriterPrepareWrite prepare_write,
158 : LogicalOutputPluginWriterWrite do_write,
159 : LogicalOutputPluginWriterUpdateProgress update_progress)
160 : {
161 : ReplicationSlot *slot;
162 : MemoryContext context,
163 : old_context;
164 : LogicalDecodingContext *ctx;
165 :
166 : /* shorter lines... */
167 2014 : slot = MyReplicationSlot;
168 :
169 2014 : context = AllocSetContextCreate(CurrentMemoryContext,
170 : "Logical decoding context",
171 : ALLOCSET_DEFAULT_SIZES);
172 2014 : old_context = MemoryContextSwitchTo(context);
173 2014 : ctx = palloc0(sizeof(LogicalDecodingContext));
174 :
175 2014 : ctx->context = context;
176 :
177 : /*
178 : * (re-)load output plugins, so we detect a bad (removed) output plugin
179 : * now.
180 : */
181 2014 : if (!fast_forward)
182 1978 : LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
183 :
184 : /*
185 : * Now that the slot's xmin has been set, we can announce ourselves as a
186 : * logical decoding backend which doesn't need to be checked individually
187 : * when computing the xmin horizon because the xmin is enforced via
188 : * replication slots.
189 : *
190 : * We can only do so if we're outside of a transaction (i.e. the case when
191 : * streaming changes via walsender), otherwise an already setup
192 : * snapshot/xid would end up being ignored. That's not a particularly
193 : * bothersome restriction since the SQL interface can't be used for
194 : * streaming anyway.
195 : */
196 2012 : if (!IsTransactionOrTransactionBlock())
197 : {
198 966 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
199 966 : MyProc->statusFlags |= PROC_IN_LOGICAL_DECODING;
200 966 : ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
201 966 : LWLockRelease(ProcArrayLock);
202 : }
203 :
204 2012 : ctx->slot = slot;
205 :
206 2012 : ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx);
207 2012 : if (!ctx->reader)
208 0 : ereport(ERROR,
209 : (errcode(ERRCODE_OUT_OF_MEMORY),
210 : errmsg("out of memory"),
211 : errdetail("Failed while allocating a WAL reading processor.")));
212 :
213 2012 : ctx->reorder = ReorderBufferAllocate();
214 2012 : ctx->snapshot_builder =
215 2012 : AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
216 : need_full_snapshot, in_create, slot->data.two_phase_at);
217 :
218 2012 : ctx->reorder->private_data = ctx;
219 :
220 : /* wrap output plugin callbacks, so we can add error context information */
221 2012 : ctx->reorder->begin = begin_cb_wrapper;
222 2012 : ctx->reorder->apply_change = change_cb_wrapper;
223 2012 : ctx->reorder->apply_truncate = truncate_cb_wrapper;
224 2012 : ctx->reorder->commit = commit_cb_wrapper;
225 2012 : ctx->reorder->message = message_cb_wrapper;
226 :
227 : /*
228 : * To support streaming, we require start/stop/abort/commit/change
229 : * callbacks. The message and truncate callbacks are optional, similar to
230 : * regular output plugins. We however enable streaming when at least one
231 : * of the methods is enabled so that we can easily identify missing
232 : * methods.
233 : *
234 : * We decide it here, but only check it later in the wrappers.
235 : */
236 4060 : ctx->streaming = (ctx->callbacks.stream_start_cb != NULL) ||
237 36 : (ctx->callbacks.stream_stop_cb != NULL) ||
238 36 : (ctx->callbacks.stream_abort_cb != NULL) ||
239 36 : (ctx->callbacks.stream_commit_cb != NULL) ||
240 36 : (ctx->callbacks.stream_change_cb != NULL) ||
241 2084 : (ctx->callbacks.stream_message_cb != NULL) ||
242 36 : (ctx->callbacks.stream_truncate_cb != NULL);
243 :
244 : /*
245 : * streaming callbacks
246 : *
247 : * stream_message and stream_truncate callbacks are optional, so we do not
248 : * fail with ERROR when missing, but the wrappers simply do nothing. We
249 : * must set the ReorderBuffer callbacks to something, otherwise the calls
250 : * from there will crash (we don't want to move the checks there).
251 : */
252 2012 : ctx->reorder->stream_start = stream_start_cb_wrapper;
253 2012 : ctx->reorder->stream_stop = stream_stop_cb_wrapper;
254 2012 : ctx->reorder->stream_abort = stream_abort_cb_wrapper;
255 2012 : ctx->reorder->stream_prepare = stream_prepare_cb_wrapper;
256 2012 : ctx->reorder->stream_commit = stream_commit_cb_wrapper;
257 2012 : ctx->reorder->stream_change = stream_change_cb_wrapper;
258 2012 : ctx->reorder->stream_message = stream_message_cb_wrapper;
259 2012 : ctx->reorder->stream_truncate = stream_truncate_cb_wrapper;
260 :
261 :
262 : /*
263 : * To support two-phase logical decoding, we require
264 : * begin_prepare/prepare/commit-prepare/abort-prepare callbacks. The
265 : * filter_prepare callback is optional. We however enable two-phase
266 : * logical decoding when at least one of the methods is enabled so that we
267 : * can easily identify missing methods.
268 : *
269 : * We decide it here, but only check it later in the wrappers.
270 : */
271 4060 : ctx->twophase = (ctx->callbacks.begin_prepare_cb != NULL) ||
272 36 : (ctx->callbacks.prepare_cb != NULL) ||
273 36 : (ctx->callbacks.commit_prepared_cb != NULL) ||
274 36 : (ctx->callbacks.rollback_prepared_cb != NULL) ||
275 2084 : (ctx->callbacks.stream_prepare_cb != NULL) ||
276 36 : (ctx->callbacks.filter_prepare_cb != NULL);
277 :
278 : /*
279 : * Callback to support decoding at prepare time.
280 : */
281 2012 : ctx->reorder->begin_prepare = begin_prepare_cb_wrapper;
282 2012 : ctx->reorder->prepare = prepare_cb_wrapper;
283 2012 : ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
284 2012 : ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper;
285 :
286 : /*
287 : * Callback to support updating progress during sending data of a
288 : * transaction (and its subtransactions) to the output plugin.
289 : */
290 2012 : ctx->reorder->update_progress_txn = update_progress_txn_cb_wrapper;
291 :
292 2012 : ctx->out = makeStringInfo();
293 2012 : ctx->prepare_write = prepare_write;
294 2012 : ctx->write = do_write;
295 2012 : ctx->update_progress = update_progress;
296 :
297 2012 : ctx->output_plugin_options = output_plugin_options;
298 :
299 2012 : ctx->fast_forward = fast_forward;
300 :
301 2012 : MemoryContextSwitchTo(old_context);
302 :
303 2012 : return ctx;
304 : }
305 :
306 : /*
307 : * Create a new decoding context, for a new logical slot.
308 : *
309 : * plugin -- contains the name of the output plugin
310 : * output_plugin_options -- contains options passed to the output plugin
311 : * need_full_snapshot -- if true, must obtain a snapshot able to read all
312 : * tables; if false, one that can read only catalogs is acceptable.
313 : * restart_lsn -- if given as invalid, it's this routine's responsibility to
314 : * mark WAL as reserved by setting a convenient restart_lsn for the slot.
315 : * Otherwise, we set for decoding to start from the given LSN without
316 : * marking WAL reserved beforehand. In that scenario, it's up to the
317 : * caller to guarantee that WAL remains available.
318 : * xl_routine -- XLogReaderRoutine for underlying XLogReader
319 : * prepare_write, do_write, update_progress --
320 : * callbacks that perform the use-case dependent, actual, work.
321 : *
322 : * Needs to be called while in a memory context that's at least as long lived
323 : * as the decoding context because further memory contexts will be created
324 : * inside it.
325 : *
326 : * Returns an initialized decoding context after calling the output plugin's
327 : * startup function.
328 : */
329 : LogicalDecodingContext *
330 856 : CreateInitDecodingContext(const char *plugin,
331 : List *output_plugin_options,
332 : bool need_full_snapshot,
333 : XLogRecPtr restart_lsn,
334 : XLogReaderRoutine *xl_routine,
335 : LogicalOutputPluginWriterPrepareWrite prepare_write,
336 : LogicalOutputPluginWriterWrite do_write,
337 : LogicalOutputPluginWriterUpdateProgress update_progress)
338 : {
339 856 : TransactionId xmin_horizon = InvalidTransactionId;
340 : ReplicationSlot *slot;
341 : NameData plugin_name;
342 : LogicalDecodingContext *ctx;
343 : MemoryContext old_context;
344 :
345 : /*
346 : * On a standby, this check is also required while creating the slot.
347 : * Check the comments in the function.
348 : */
349 856 : CheckLogicalDecodingRequirements();
350 :
351 : /* shorter lines... */
352 856 : slot = MyReplicationSlot;
353 :
354 : /* first some sanity checks that are unlikely to be violated */
355 856 : if (slot == NULL)
356 0 : elog(ERROR, "cannot perform logical decoding without an acquired slot");
357 :
358 856 : if (plugin == NULL)
359 0 : elog(ERROR, "cannot initialize logical decoding without a specified plugin");
360 :
361 : /* Make sure the passed slot is suitable. These are user facing errors. */
362 856 : if (SlotIsPhysical(slot))
363 0 : ereport(ERROR,
364 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
365 : errmsg("cannot use physical replication slot for logical decoding")));
366 :
367 856 : if (slot->data.database != MyDatabaseId)
368 0 : ereport(ERROR,
369 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
370 : errmsg("replication slot \"%s\" was not created in this database",
371 : NameStr(slot->data.name))));
372 :
373 1460 : if (IsTransactionState() &&
374 604 : GetTopTransactionIdIfAny() != InvalidTransactionId)
375 4 : ereport(ERROR,
376 : (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
377 : errmsg("cannot create logical replication slot in transaction that has performed writes")));
378 :
379 : /*
380 : * Register output plugin name with slot. We need the mutex to avoid
381 : * concurrent reading of a partially copied string. But we don't want any
382 : * complicated code while holding a spinlock, so do namestrcpy() outside.
383 : */
384 852 : namestrcpy(&plugin_name, plugin);
385 852 : SpinLockAcquire(&slot->mutex);
386 852 : slot->data.plugin = plugin_name;
387 852 : SpinLockRelease(&slot->mutex);
388 :
389 852 : if (XLogRecPtrIsInvalid(restart_lsn))
390 840 : ReplicationSlotReserveWal();
391 : else
392 : {
393 12 : SpinLockAcquire(&slot->mutex);
394 12 : slot->data.restart_lsn = restart_lsn;
395 12 : SpinLockRelease(&slot->mutex);
396 : }
397 :
398 : /* ----
399 : * This is a bit tricky: We need to determine a safe xmin horizon to start
400 : * decoding from, to avoid starting from a running xacts record referring
401 : * to xids whose rows have been vacuumed or pruned
402 : * already. GetOldestSafeDecodingTransactionId() returns such a value, but
403 : * without further interlock its return value might immediately be out of
404 : * date.
405 : *
406 : * So we have to acquire the ProcArrayLock to prevent computation of new
407 : * xmin horizons by other backends, get the safe decoding xid, and inform
408 : * the slot machinery about the new limit. Once that's done the
409 : * ProcArrayLock can be released as the slot machinery now is
410 : * protecting against vacuum.
411 : *
412 : * Note that, temporarily, the data, not just the catalog, xmin has to be
413 : * reserved if a data snapshot is to be exported. Otherwise the initial
414 : * data snapshot created here is not guaranteed to be valid. After that
415 : * the data xmin doesn't need to be managed anymore and the global xmin
416 : * should be recomputed. As we are fine with losing the pegged data xmin
417 : * after crash - no chance a snapshot would get exported anymore - we can
418 : * get away with just setting the slot's
419 : * effective_xmin. ReplicationSlotRelease will reset it again.
420 : *
421 : * ----
422 : */
423 852 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
424 :
425 852 : xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
426 :
427 852 : SpinLockAcquire(&slot->mutex);
428 852 : slot->effective_catalog_xmin = xmin_horizon;
429 852 : slot->data.catalog_xmin = xmin_horizon;
430 852 : if (need_full_snapshot)
431 368 : slot->effective_xmin = xmin_horizon;
432 852 : SpinLockRelease(&slot->mutex);
433 :
434 852 : ReplicationSlotsComputeRequiredXmin(true);
435 :
436 852 : LWLockRelease(ProcArrayLock);
437 :
438 852 : ReplicationSlotMarkDirty();
439 852 : ReplicationSlotSave();
440 :
441 852 : ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
442 : need_full_snapshot, false, true,
443 : xl_routine, prepare_write, do_write,
444 : update_progress);
445 :
446 : /* call output plugin initialization callback */
447 850 : old_context = MemoryContextSwitchTo(ctx->context);
448 850 : if (ctx->callbacks.startup_cb != NULL)
449 850 : startup_cb_wrapper(ctx, &ctx->options, true);
450 850 : MemoryContextSwitchTo(old_context);
451 :
452 : /*
453 : * We allow decoding of prepared transactions when the two_phase is
454 : * enabled at the time of slot creation, or when the two_phase option is
455 : * given at the streaming start, provided the plugin supports all the
456 : * callbacks for two-phase.
457 : */
458 850 : ctx->twophase &= slot->data.two_phase;
459 :
460 850 : ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
461 :
462 850 : return ctx;
463 : }
464 :
465 : /*
466 : * Create a new decoding context, for a logical slot that has previously been
467 : * used already.
468 : *
469 : * start_lsn
470 : * The LSN at which to start decoding. If InvalidXLogRecPtr, restart
471 : * from the slot's confirmed_flush; otherwise, start from the specified
472 : * location (but move it forwards to confirmed_flush if it's older than
473 : * that, see below).
474 : *
475 : * output_plugin_options
476 : * options passed to the output plugin.
477 : *
478 : * fast_forward
479 : * bypass the generation of logical changes.
480 : *
481 : * xl_routine
482 : * XLogReaderRoutine used by underlying xlogreader
483 : *
484 : * prepare_write, do_write, update_progress
485 : * callbacks that have to be filled to perform the use-case dependent,
486 : * actual work.
487 : *
488 : * Needs to be called while in a memory context that's at least as long lived
489 : * as the decoding context because further memory contexts will be created
490 : * inside it.
491 : *
492 : * Returns an initialized decoding context after calling the output plugin's
493 : * startup function.
494 : */
495 : LogicalDecodingContext *
496 1182 : CreateDecodingContext(XLogRecPtr start_lsn,
497 : List *output_plugin_options,
498 : bool fast_forward,
499 : XLogReaderRoutine *xl_routine,
500 : LogicalOutputPluginWriterPrepareWrite prepare_write,
501 : LogicalOutputPluginWriterWrite do_write,
502 : LogicalOutputPluginWriterUpdateProgress update_progress)
503 : {
504 : LogicalDecodingContext *ctx;
505 : ReplicationSlot *slot;
506 : MemoryContext old_context;
507 :
508 : /* shorter lines... */
509 1182 : slot = MyReplicationSlot;
510 :
511 : /* first some sanity checks that are unlikely to be violated */
512 1182 : if (slot == NULL)
513 0 : elog(ERROR, "cannot perform logical decoding without an acquired slot");
514 :
515 : /* make sure the passed slot is suitable, these are user facing errors */
516 1182 : if (SlotIsPhysical(slot))
517 2 : ereport(ERROR,
518 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
519 : errmsg("cannot use physical replication slot for logical decoding")));
520 :
521 : /*
522 : * We need to access the system tables during decoding to build the
523 : * logical changes unless we are in fast_forward mode where no changes are
524 : * generated.
525 : */
526 1180 : if (slot->data.database != MyDatabaseId && !fast_forward)
527 6 : ereport(ERROR,
528 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
529 : errmsg("replication slot \"%s\" was not created in this database",
530 : NameStr(slot->data.name))));
531 :
532 : /*
533 : * The slots being synced from the primary can't be used for decoding as
534 : * they are used after failover. However, we do allow advancing the LSNs
535 : * during the synchronization of slots. See update_local_synced_slot.
536 : */
537 1174 : if (RecoveryInProgress() && slot->data.synced && !IsSyncingReplicationSlots())
538 2 : ereport(ERROR,
539 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
540 : errmsg("cannot use replication slot \"%s\" for logical decoding",
541 : NameStr(slot->data.name)),
542 : errdetail("This replication slot is being synchronized from the primary server."),
543 : errhint("Specify another replication slot."));
544 :
545 : /*
546 : * Check if slot has been invalidated due to max_slot_wal_keep_size. Avoid
547 : * "cannot get changes" wording in this errmsg because that'd be
548 : * confusingly ambiguous about no changes being available when called from
549 : * pg_logical_slot_get_changes_guts().
550 : */
551 1172 : if (MyReplicationSlot->data.invalidated == RS_INVAL_WAL_REMOVED)
552 0 : ereport(ERROR,
553 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
554 : errmsg("can no longer get changes from replication slot \"%s\"",
555 : NameStr(MyReplicationSlot->data.name)),
556 : errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
557 :
558 1172 : if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE)
559 10 : ereport(ERROR,
560 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
561 : errmsg("can no longer get changes from replication slot \"%s\"",
562 : NameStr(MyReplicationSlot->data.name)),
563 : errdetail("This slot has been invalidated because it was conflicting with recovery.")));
564 :
565 : Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);
566 : Assert(MyReplicationSlot->data.restart_lsn != InvalidXLogRecPtr);
567 :
568 1162 : if (start_lsn == InvalidXLogRecPtr)
569 : {
570 : /* continue from last position */
571 702 : start_lsn = slot->data.confirmed_flush;
572 : }
573 460 : else if (start_lsn < slot->data.confirmed_flush)
574 : {
575 : /*
576 : * It might seem like we should error out in this case, but it's
577 : * pretty common for a client to acknowledge a LSN it doesn't have to
578 : * do anything for, and thus didn't store persistently, because the
579 : * xlog records didn't result in anything relevant for logical
580 : * decoding. Clients have to be able to do that to support synchronous
581 : * replication.
582 : *
583 : * Starting at a different LSN than requested might not catch certain
584 : * kinds of client errors; so the client may wish to check that
585 : * confirmed_flush_lsn matches its expectations.
586 : */
587 34 : elog(LOG, "%X/%X has been already streamed, forwarding to %X/%X",
588 : LSN_FORMAT_ARGS(start_lsn),
589 : LSN_FORMAT_ARGS(slot->data.confirmed_flush));
590 :
591 34 : start_lsn = slot->data.confirmed_flush;
592 : }
593 :
594 1162 : ctx = StartupDecodingContext(output_plugin_options,
595 : start_lsn, InvalidTransactionId, false,
596 : fast_forward, false, xl_routine, prepare_write,
597 : do_write, update_progress);
598 :
599 : /* call output plugin initialization callback */
600 1162 : old_context = MemoryContextSwitchTo(ctx->context);
601 1162 : if (ctx->callbacks.startup_cb != NULL)
602 1126 : startup_cb_wrapper(ctx, &ctx->options, false);
603 1156 : MemoryContextSwitchTo(old_context);
604 :
605 : /*
606 : * We allow decoding of prepared transactions when the two_phase is
607 : * enabled at the time of slot creation, or when the two_phase option is
608 : * given at the streaming start, provided the plugin supports all the
609 : * callbacks for two-phase.
610 : */
611 1156 : ctx->twophase &= (slot->data.two_phase || ctx->twophase_opt_given);
612 :
613 : /* Mark slot to allow two_phase decoding if not already marked */
614 1156 : if (ctx->twophase && !slot->data.two_phase)
615 : {
616 14 : SpinLockAcquire(&slot->mutex);
617 14 : slot->data.two_phase = true;
618 14 : slot->data.two_phase_at = start_lsn;
619 14 : SpinLockRelease(&slot->mutex);
620 14 : ReplicationSlotMarkDirty();
621 14 : ReplicationSlotSave();
622 14 : SnapBuildSetTwoPhaseAt(ctx->snapshot_builder, start_lsn);
623 : }
624 :
625 1156 : ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
626 :
627 1156 : ereport(LOG,
628 : (errmsg("starting logical decoding for slot \"%s\"",
629 : NameStr(slot->data.name)),
630 : errdetail("Streaming transactions committing after %X/%X, reading WAL from %X/%X.",
631 : LSN_FORMAT_ARGS(slot->data.confirmed_flush),
632 : LSN_FORMAT_ARGS(slot->data.restart_lsn))));
633 :
634 1156 : return ctx;
635 : }
636 :
637 : /*
638 : * Returns true if a consistent initial decoding snapshot has been built.
639 : */
640 : bool
641 922 : DecodingContextReady(LogicalDecodingContext *ctx)
642 : {
643 922 : return SnapBuildCurrentState(ctx->snapshot_builder) == SNAPBUILD_CONSISTENT;
644 : }
645 :
646 : /*
647 : * Read from the decoding slot, until it is ready to start extracting changes.
648 : */
649 : void
650 838 : DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
651 : {
652 838 : ReplicationSlot *slot = ctx->slot;
653 :
654 : /* Initialize from where to start reading WAL. */
655 838 : XLogBeginRead(ctx->reader, slot->data.restart_lsn);
656 :
657 838 : elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
658 : LSN_FORMAT_ARGS(slot->data.restart_lsn));
659 :
660 : /* Wait for a consistent starting point */
661 : for (;;)
662 78 : {
663 : XLogRecord *record;
664 916 : char *err = NULL;
665 :
666 : /* the read_page callback waits for new WAL */
667 916 : record = XLogReadRecord(ctx->reader, &err);
668 916 : if (err)
669 0 : elog(ERROR, "could not find logical decoding starting point: %s", err);
670 916 : if (!record)
671 0 : elog(ERROR, "could not find logical decoding starting point");
672 :
673 916 : LogicalDecodingProcessRecord(ctx, ctx->reader);
674 :
675 : /* only continue till we found a consistent spot */
676 912 : if (DecodingContextReady(ctx))
677 834 : break;
678 :
679 78 : CHECK_FOR_INTERRUPTS();
680 : }
681 :
682 834 : SpinLockAcquire(&slot->mutex);
683 834 : slot->data.confirmed_flush = ctx->reader->EndRecPtr;
684 834 : if (slot->data.two_phase)
685 12 : slot->data.two_phase_at = ctx->reader->EndRecPtr;
686 834 : SpinLockRelease(&slot->mutex);
687 834 : }
688 :
689 : /*
690 : * Free a previously allocated decoding context, invoking the shutdown
691 : * callback if necessary.
692 : */
693 : void
694 1648 : FreeDecodingContext(LogicalDecodingContext *ctx)
695 : {
696 1648 : if (ctx->callbacks.shutdown_cb != NULL)
697 1612 : shutdown_cb_wrapper(ctx);
698 :
699 1648 : ReorderBufferFree(ctx->reorder);
700 1648 : FreeSnapshotBuilder(ctx->snapshot_builder);
701 1648 : XLogReaderFree(ctx->reader);
702 1648 : MemoryContextDelete(ctx->context);
703 1648 : }
704 :
705 : /*
706 : * Prepare a write using the context's output routine.
707 : */
708 : void
709 672150 : OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
710 : {
711 672150 : if (!ctx->accept_writes)
712 0 : elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
713 :
714 672150 : ctx->prepare_write(ctx, ctx->write_location, ctx->write_xid, last_write);
715 672150 : ctx->prepared_write = true;
716 672150 : }
717 :
718 : /*
719 : * Perform a write using the context's output routine.
720 : */
721 : void
722 672150 : OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
723 : {
724 672150 : if (!ctx->prepared_write)
725 0 : elog(ERROR, "OutputPluginPrepareWrite needs to be called before OutputPluginWrite");
726 :
727 672150 : ctx->write(ctx, ctx->write_location, ctx->write_xid, last_write);
728 672148 : ctx->prepared_write = false;
729 672148 : }
730 :
731 : /*
732 : * Update progress tracking (if supported).
733 : */
734 : void
735 7862 : OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx,
736 : bool skipped_xact)
737 : {
738 7862 : if (!ctx->update_progress)
739 3162 : return;
740 :
741 4700 : ctx->update_progress(ctx, ctx->write_location, ctx->write_xid,
742 : skipped_xact);
743 : }
744 :
745 : /*
746 : * Load the output plugin, lookup its output plugin init function, and check
747 : * that it provides the required callbacks.
748 : */
749 : static void
750 1978 : LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin)
751 : {
752 : LogicalOutputPluginInit plugin_init;
753 :
754 1976 : plugin_init = (LogicalOutputPluginInit)
755 1978 : load_external_function(plugin, "_PG_output_plugin_init", false, NULL);
756 :
757 1976 : if (plugin_init == NULL)
758 0 : elog(ERROR, "output plugins have to declare the _PG_output_plugin_init symbol");
759 :
760 : /* ask the output plugin to fill the callback struct */
761 1976 : plugin_init(callbacks);
762 :
763 1976 : if (callbacks->begin_cb == NULL)
764 0 : elog(ERROR, "output plugins have to register a begin callback");
765 1976 : if (callbacks->change_cb == NULL)
766 0 : elog(ERROR, "output plugins have to register a change callback");
767 1976 : if (callbacks->commit_cb == NULL)
768 0 : elog(ERROR, "output plugins have to register a commit callback");
769 1976 : }
770 :
771 : static void
772 16 : output_plugin_error_callback(void *arg)
773 : {
774 16 : LogicalErrorCallbackState *state = (LogicalErrorCallbackState *) arg;
775 :
776 : /* not all callbacks have an associated LSN */
777 16 : if (state->report_location != InvalidXLogRecPtr)
778 10 : errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%X",
779 10 : NameStr(state->ctx->slot->data.name),
780 10 : NameStr(state->ctx->slot->data.plugin),
781 : state->callback_name,
782 10 : LSN_FORMAT_ARGS(state->report_location));
783 : else
784 6 : errcontext("slot \"%s\", output plugin \"%s\", in the %s callback",
785 6 : NameStr(state->ctx->slot->data.name),
786 6 : NameStr(state->ctx->slot->data.plugin),
787 : state->callback_name);
788 16 : }
789 :
790 : static void
791 1976 : startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
792 : {
793 : LogicalErrorCallbackState state;
794 : ErrorContextCallback errcallback;
795 :
796 : Assert(!ctx->fast_forward);
797 :
798 : /* Push callback + info on the error context stack */
799 1976 : state.ctx = ctx;
800 1976 : state.callback_name = "startup";
801 1976 : state.report_location = InvalidXLogRecPtr;
802 1976 : errcallback.callback = output_plugin_error_callback;
803 1976 : errcallback.arg = &state;
804 1976 : errcallback.previous = error_context_stack;
805 1976 : error_context_stack = &errcallback;
806 :
807 : /* set output state */
808 1976 : ctx->accept_writes = false;
809 1976 : ctx->end_xact = false;
810 :
811 : /* do the actual work: call callback */
812 1976 : ctx->callbacks.startup_cb(ctx, opt, is_init);
813 :
814 : /* Pop the error context stack */
815 1970 : error_context_stack = errcallback.previous;
816 1970 : }
817 :
818 : static void
819 1612 : shutdown_cb_wrapper(LogicalDecodingContext *ctx)
820 : {
821 : LogicalErrorCallbackState state;
822 : ErrorContextCallback errcallback;
823 :
824 : Assert(!ctx->fast_forward);
825 :
826 : /* Push callback + info on the error context stack */
827 1612 : state.ctx = ctx;
828 1612 : state.callback_name = "shutdown";
829 1612 : state.report_location = InvalidXLogRecPtr;
830 1612 : errcallback.callback = output_plugin_error_callback;
831 1612 : errcallback.arg = &state;
832 1612 : errcallback.previous = error_context_stack;
833 1612 : error_context_stack = &errcallback;
834 :
835 : /* set output state */
836 1612 : ctx->accept_writes = false;
837 1612 : ctx->end_xact = false;
838 :
839 : /* do the actual work: call callback */
840 1612 : ctx->callbacks.shutdown_cb(ctx);
841 :
842 : /* Pop the error context stack */
843 1612 : error_context_stack = errcallback.previous;
844 1612 : }
845 :
846 :
847 : /*
848 : * Callbacks for ReorderBuffer which add in some more information and then call
849 : * output_plugin.h plugins.
850 : */
851 : static void
852 2308 : begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
853 : {
854 2308 : LogicalDecodingContext *ctx = cache->private_data;
855 : LogicalErrorCallbackState state;
856 : ErrorContextCallback errcallback;
857 :
858 : Assert(!ctx->fast_forward);
859 :
860 : /* Push callback + info on the error context stack */
861 2308 : state.ctx = ctx;
862 2308 : state.callback_name = "begin";
863 2308 : state.report_location = txn->first_lsn;
864 2308 : errcallback.callback = output_plugin_error_callback;
865 2308 : errcallback.arg = &state;
866 2308 : errcallback.previous = error_context_stack;
867 2308 : error_context_stack = &errcallback;
868 :
869 : /* set output state */
870 2308 : ctx->accept_writes = true;
871 2308 : ctx->write_xid = txn->xid;
872 2308 : ctx->write_location = txn->first_lsn;
873 2308 : ctx->end_xact = false;
874 :
875 : /* do the actual work: call callback */
876 2308 : ctx->callbacks.begin_cb(ctx, txn);
877 :
878 : /* Pop the error context stack */
879 2308 : error_context_stack = errcallback.previous;
880 2308 : }
881 :
882 : static void
883 2300 : commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
884 : XLogRecPtr commit_lsn)
885 : {
886 2300 : LogicalDecodingContext *ctx = cache->private_data;
887 : LogicalErrorCallbackState state;
888 : ErrorContextCallback errcallback;
889 :
890 : Assert(!ctx->fast_forward);
891 :
892 : /* Push callback + info on the error context stack */
893 2300 : state.ctx = ctx;
894 2300 : state.callback_name = "commit";
895 2300 : state.report_location = txn->final_lsn; /* beginning of commit record */
896 2300 : errcallback.callback = output_plugin_error_callback;
897 2300 : errcallback.arg = &state;
898 2300 : errcallback.previous = error_context_stack;
899 2300 : error_context_stack = &errcallback;
900 :
901 : /* set output state */
902 2300 : ctx->accept_writes = true;
903 2300 : ctx->write_xid = txn->xid;
904 2300 : ctx->write_location = txn->end_lsn; /* points to the end of the record */
905 2300 : ctx->end_xact = true;
906 :
907 : /* do the actual work: call callback */
908 2300 : ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
909 :
910 : /* Pop the error context stack */
911 2300 : error_context_stack = errcallback.previous;
912 2300 : }
913 :
914 : /*
915 : * The functionality of begin_prepare is quite similar to begin with the
916 : * exception that this will have gid (global transaction id) information which
917 : * can be used by plugin. Now, we thought about extending the existing begin
918 : * but that would break the replication protocol and additionally this looks
919 : * cleaner.
920 : */
921 : static void
922 56 : begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
923 : {
924 56 : LogicalDecodingContext *ctx = cache->private_data;
925 : LogicalErrorCallbackState state;
926 : ErrorContextCallback errcallback;
927 :
928 : Assert(!ctx->fast_forward);
929 :
930 : /* We're only supposed to call this when two-phase commits are supported */
931 : Assert(ctx->twophase);
932 :
933 : /* Push callback + info on the error context stack */
934 56 : state.ctx = ctx;
935 56 : state.callback_name = "begin_prepare";
936 56 : state.report_location = txn->first_lsn;
937 56 : errcallback.callback = output_plugin_error_callback;
938 56 : errcallback.arg = &state;
939 56 : errcallback.previous = error_context_stack;
940 56 : error_context_stack = &errcallback;
941 :
942 : /* set output state */
943 56 : ctx->accept_writes = true;
944 56 : ctx->write_xid = txn->xid;
945 56 : ctx->write_location = txn->first_lsn;
946 56 : ctx->end_xact = false;
947 :
948 : /*
949 : * If the plugin supports two-phase commits then begin prepare callback is
950 : * mandatory
951 : */
952 56 : if (ctx->callbacks.begin_prepare_cb == NULL)
953 0 : ereport(ERROR,
954 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
955 : errmsg("logical replication at prepare time requires a %s callback",
956 : "begin_prepare_cb")));
957 :
958 : /* do the actual work: call callback */
959 56 : ctx->callbacks.begin_prepare_cb(ctx, txn);
960 :
961 : /* Pop the error context stack */
962 56 : error_context_stack = errcallback.previous;
963 56 : }
964 :
965 : static void
966 56 : prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
967 : XLogRecPtr prepare_lsn)
968 : {
969 56 : LogicalDecodingContext *ctx = cache->private_data;
970 : LogicalErrorCallbackState state;
971 : ErrorContextCallback errcallback;
972 :
973 : Assert(!ctx->fast_forward);
974 :
975 : /* We're only supposed to call this when two-phase commits are supported */
976 : Assert(ctx->twophase);
977 :
978 : /* Push callback + info on the error context stack */
979 56 : state.ctx = ctx;
980 56 : state.callback_name = "prepare";
981 56 : state.report_location = txn->final_lsn; /* beginning of prepare record */
982 56 : errcallback.callback = output_plugin_error_callback;
983 56 : errcallback.arg = &state;
984 56 : errcallback.previous = error_context_stack;
985 56 : error_context_stack = &errcallback;
986 :
987 : /* set output state */
988 56 : ctx->accept_writes = true;
989 56 : ctx->write_xid = txn->xid;
990 56 : ctx->write_location = txn->end_lsn; /* points to the end of the record */
991 56 : ctx->end_xact = true;
992 :
993 : /*
994 : * If the plugin supports two-phase commits then prepare callback is
995 : * mandatory
996 : */
997 56 : if (ctx->callbacks.prepare_cb == NULL)
998 0 : ereport(ERROR,
999 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1000 : errmsg("logical replication at prepare time requires a %s callback",
1001 : "prepare_cb")));
1002 :
1003 : /* do the actual work: call callback */
1004 56 : ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn);
1005 :
1006 : /* Pop the error context stack */
1007 56 : error_context_stack = errcallback.previous;
1008 56 : }
1009 :
1010 : static void
1011 64 : commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1012 : XLogRecPtr commit_lsn)
1013 : {
1014 64 : LogicalDecodingContext *ctx = cache->private_data;
1015 : LogicalErrorCallbackState state;
1016 : ErrorContextCallback errcallback;
1017 :
1018 : Assert(!ctx->fast_forward);
1019 :
1020 : /* We're only supposed to call this when two-phase commits are supported */
1021 : Assert(ctx->twophase);
1022 :
1023 : /* Push callback + info on the error context stack */
1024 64 : state.ctx = ctx;
1025 64 : state.callback_name = "commit_prepared";
1026 64 : state.report_location = txn->final_lsn; /* beginning of commit record */
1027 64 : errcallback.callback = output_plugin_error_callback;
1028 64 : errcallback.arg = &state;
1029 64 : errcallback.previous = error_context_stack;
1030 64 : error_context_stack = &errcallback;
1031 :
1032 : /* set output state */
1033 64 : ctx->accept_writes = true;
1034 64 : ctx->write_xid = txn->xid;
1035 64 : ctx->write_location = txn->end_lsn; /* points to the end of the record */
1036 64 : ctx->end_xact = true;
1037 :
1038 : /*
1039 : * If the plugin support two-phase commits then commit prepared callback
1040 : * is mandatory
1041 : */
1042 64 : if (ctx->callbacks.commit_prepared_cb == NULL)
1043 0 : ereport(ERROR,
1044 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1045 : errmsg("logical replication at prepare time requires a %s callback",
1046 : "commit_prepared_cb")));
1047 :
1048 : /* do the actual work: call callback */
1049 64 : ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn);
1050 :
1051 : /* Pop the error context stack */
1052 64 : error_context_stack = errcallback.previous;
1053 64 : }
1054 :
1055 : static void
1056 20 : rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1057 : XLogRecPtr prepare_end_lsn,
1058 : TimestampTz prepare_time)
1059 : {
1060 20 : LogicalDecodingContext *ctx = cache->private_data;
1061 : LogicalErrorCallbackState state;
1062 : ErrorContextCallback errcallback;
1063 :
1064 : Assert(!ctx->fast_forward);
1065 :
1066 : /* We're only supposed to call this when two-phase commits are supported */
1067 : Assert(ctx->twophase);
1068 :
1069 : /* Push callback + info on the error context stack */
1070 20 : state.ctx = ctx;
1071 20 : state.callback_name = "rollback_prepared";
1072 20 : state.report_location = txn->final_lsn; /* beginning of commit record */
1073 20 : errcallback.callback = output_plugin_error_callback;
1074 20 : errcallback.arg = &state;
1075 20 : errcallback.previous = error_context_stack;
1076 20 : error_context_stack = &errcallback;
1077 :
1078 : /* set output state */
1079 20 : ctx->accept_writes = true;
1080 20 : ctx->write_xid = txn->xid;
1081 20 : ctx->write_location = txn->end_lsn; /* points to the end of the record */
1082 20 : ctx->end_xact = true;
1083 :
1084 : /*
1085 : * If the plugin support two-phase commits then rollback prepared callback
1086 : * is mandatory
1087 : */
1088 20 : if (ctx->callbacks.rollback_prepared_cb == NULL)
1089 0 : ereport(ERROR,
1090 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1091 : errmsg("logical replication at prepare time requires a %s callback",
1092 : "rollback_prepared_cb")));
1093 :
1094 : /* do the actual work: call callback */
1095 20 : ctx->callbacks.rollback_prepared_cb(ctx, txn, prepare_end_lsn,
1096 : prepare_time);
1097 :
1098 : /* Pop the error context stack */
1099 20 : error_context_stack = errcallback.previous;
1100 20 : }
1101 :
1102 : static void
1103 315964 : change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1104 : Relation relation, ReorderBufferChange *change)
1105 : {
1106 315964 : LogicalDecodingContext *ctx = cache->private_data;
1107 : LogicalErrorCallbackState state;
1108 : ErrorContextCallback errcallback;
1109 :
1110 : Assert(!ctx->fast_forward);
1111 :
1112 : /* Push callback + info on the error context stack */
1113 315964 : state.ctx = ctx;
1114 315964 : state.callback_name = "change";
1115 315964 : state.report_location = change->lsn;
1116 315964 : errcallback.callback = output_plugin_error_callback;
1117 315964 : errcallback.arg = &state;
1118 315964 : errcallback.previous = error_context_stack;
1119 315964 : error_context_stack = &errcallback;
1120 :
1121 : /* set output state */
1122 315964 : ctx->accept_writes = true;
1123 315964 : ctx->write_xid = txn->xid;
1124 :
1125 : /*
1126 : * Report this change's lsn so replies from clients can give an up-to-date
1127 : * answer. This won't ever be enough (and shouldn't be!) to confirm
1128 : * receipt of this transaction, but it might allow another transaction's
1129 : * commit to be confirmed with one message.
1130 : */
1131 315964 : ctx->write_location = change->lsn;
1132 :
1133 315964 : ctx->end_xact = false;
1134 :
1135 315964 : ctx->callbacks.change_cb(ctx, txn, relation, change);
1136 :
1137 : /* Pop the error context stack */
1138 315956 : error_context_stack = errcallback.previous;
1139 315956 : }
1140 :
1141 : static void
1142 46 : truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1143 : int nrelations, Relation relations[], ReorderBufferChange *change)
1144 : {
1145 46 : LogicalDecodingContext *ctx = cache->private_data;
1146 : LogicalErrorCallbackState state;
1147 : ErrorContextCallback errcallback;
1148 :
1149 : Assert(!ctx->fast_forward);
1150 :
1151 46 : if (!ctx->callbacks.truncate_cb)
1152 0 : return;
1153 :
1154 : /* Push callback + info on the error context stack */
1155 46 : state.ctx = ctx;
1156 46 : state.callback_name = "truncate";
1157 46 : state.report_location = change->lsn;
1158 46 : errcallback.callback = output_plugin_error_callback;
1159 46 : errcallback.arg = &state;
1160 46 : errcallback.previous = error_context_stack;
1161 46 : error_context_stack = &errcallback;
1162 :
1163 : /* set output state */
1164 46 : ctx->accept_writes = true;
1165 46 : ctx->write_xid = txn->xid;
1166 :
1167 : /*
1168 : * Report this change's lsn so replies from clients can give an up-to-date
1169 : * answer. This won't ever be enough (and shouldn't be!) to confirm
1170 : * receipt of this transaction, but it might allow another transaction's
1171 : * commit to be confirmed with one message.
1172 : */
1173 46 : ctx->write_location = change->lsn;
1174 :
1175 46 : ctx->end_xact = false;
1176 :
1177 46 : ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
1178 :
1179 : /* Pop the error context stack */
1180 46 : error_context_stack = errcallback.previous;
1181 : }
1182 :
1183 : bool
1184 292 : filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
1185 : const char *gid)
1186 : {
1187 : LogicalErrorCallbackState state;
1188 : ErrorContextCallback errcallback;
1189 : bool ret;
1190 :
1191 : Assert(!ctx->fast_forward);
1192 :
1193 : /* Push callback + info on the error context stack */
1194 292 : state.ctx = ctx;
1195 292 : state.callback_name = "filter_prepare";
1196 292 : state.report_location = InvalidXLogRecPtr;
1197 292 : errcallback.callback = output_plugin_error_callback;
1198 292 : errcallback.arg = &state;
1199 292 : errcallback.previous = error_context_stack;
1200 292 : error_context_stack = &errcallback;
1201 :
1202 : /* set output state */
1203 292 : ctx->accept_writes = false;
1204 292 : ctx->end_xact = false;
1205 :
1206 : /* do the actual work: call callback */
1207 292 : ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
1208 :
1209 : /* Pop the error context stack */
1210 292 : error_context_stack = errcallback.previous;
1211 :
1212 292 : return ret;
1213 : }
1214 :
1215 : bool
1216 3373174 : filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
1217 : {
1218 : LogicalErrorCallbackState state;
1219 : ErrorContextCallback errcallback;
1220 : bool ret;
1221 :
1222 : Assert(!ctx->fast_forward);
1223 :
1224 : /* Push callback + info on the error context stack */
1225 3373174 : state.ctx = ctx;
1226 3373174 : state.callback_name = "filter_by_origin";
1227 3373174 : state.report_location = InvalidXLogRecPtr;
1228 3373174 : errcallback.callback = output_plugin_error_callback;
1229 3373174 : errcallback.arg = &state;
1230 3373174 : errcallback.previous = error_context_stack;
1231 3373174 : error_context_stack = &errcallback;
1232 :
1233 : /* set output state */
1234 3373174 : ctx->accept_writes = false;
1235 3373174 : ctx->end_xact = false;
1236 :
1237 : /* do the actual work: call callback */
1238 3373174 : ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
1239 :
1240 : /* Pop the error context stack */
1241 3373174 : error_context_stack = errcallback.previous;
1242 :
1243 3373174 : return ret;
1244 : }
1245 :
1246 : static void
1247 32 : message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1248 : XLogRecPtr message_lsn, bool transactional,
1249 : const char *prefix, Size message_size, const char *message)
1250 : {
1251 32 : LogicalDecodingContext *ctx = cache->private_data;
1252 : LogicalErrorCallbackState state;
1253 : ErrorContextCallback errcallback;
1254 :
1255 : Assert(!ctx->fast_forward);
1256 :
1257 32 : if (ctx->callbacks.message_cb == NULL)
1258 0 : return;
1259 :
1260 : /* Push callback + info on the error context stack */
1261 32 : state.ctx = ctx;
1262 32 : state.callback_name = "message";
1263 32 : state.report_location = message_lsn;
1264 32 : errcallback.callback = output_plugin_error_callback;
1265 32 : errcallback.arg = &state;
1266 32 : errcallback.previous = error_context_stack;
1267 32 : error_context_stack = &errcallback;
1268 :
1269 : /* set output state */
1270 32 : ctx->accept_writes = true;
1271 32 : ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
1272 32 : ctx->write_location = message_lsn;
1273 32 : ctx->end_xact = false;
1274 :
1275 : /* do the actual work: call callback */
1276 32 : ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
1277 : message_size, message);
1278 :
1279 : /* Pop the error context stack */
1280 32 : error_context_stack = errcallback.previous;
1281 : }
1282 :
1283 : static void
1284 1344 : stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1285 : XLogRecPtr first_lsn)
1286 : {
1287 1344 : LogicalDecodingContext *ctx = cache->private_data;
1288 : LogicalErrorCallbackState state;
1289 : ErrorContextCallback errcallback;
1290 :
1291 : Assert(!ctx->fast_forward);
1292 :
1293 : /* We're only supposed to call this when streaming is supported. */
1294 : Assert(ctx->streaming);
1295 :
1296 : /* Push callback + info on the error context stack */
1297 1344 : state.ctx = ctx;
1298 1344 : state.callback_name = "stream_start";
1299 1344 : state.report_location = first_lsn;
1300 1344 : errcallback.callback = output_plugin_error_callback;
1301 1344 : errcallback.arg = &state;
1302 1344 : errcallback.previous = error_context_stack;
1303 1344 : error_context_stack = &errcallback;
1304 :
1305 : /* set output state */
1306 1344 : ctx->accept_writes = true;
1307 1344 : ctx->write_xid = txn->xid;
1308 :
1309 : /*
1310 : * Report this message's lsn so replies from clients can give an
1311 : * up-to-date answer. This won't ever be enough (and shouldn't be!) to
1312 : * confirm receipt of this transaction, but it might allow another
1313 : * transaction's commit to be confirmed with one message.
1314 : */
1315 1344 : ctx->write_location = first_lsn;
1316 :
1317 1344 : ctx->end_xact = false;
1318 :
1319 : /* in streaming mode, stream_start_cb is required */
1320 1344 : if (ctx->callbacks.stream_start_cb == NULL)
1321 0 : ereport(ERROR,
1322 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1323 : errmsg("logical streaming requires a %s callback",
1324 : "stream_start_cb")));
1325 :
1326 1344 : ctx->callbacks.stream_start_cb(ctx, txn);
1327 :
1328 : /* Pop the error context stack */
1329 1344 : error_context_stack = errcallback.previous;
1330 1344 : }
1331 :
1332 : static void
1333 1344 : stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1334 : XLogRecPtr last_lsn)
1335 : {
1336 1344 : LogicalDecodingContext *ctx = cache->private_data;
1337 : LogicalErrorCallbackState state;
1338 : ErrorContextCallback errcallback;
1339 :
1340 : Assert(!ctx->fast_forward);
1341 :
1342 : /* We're only supposed to call this when streaming is supported. */
1343 : Assert(ctx->streaming);
1344 :
1345 : /* Push callback + info on the error context stack */
1346 1344 : state.ctx = ctx;
1347 1344 : state.callback_name = "stream_stop";
1348 1344 : state.report_location = last_lsn;
1349 1344 : errcallback.callback = output_plugin_error_callback;
1350 1344 : errcallback.arg = &state;
1351 1344 : errcallback.previous = error_context_stack;
1352 1344 : error_context_stack = &errcallback;
1353 :
1354 : /* set output state */
1355 1344 : ctx->accept_writes = true;
1356 1344 : ctx->write_xid = txn->xid;
1357 :
1358 : /*
1359 : * Report this message's lsn so replies from clients can give an
1360 : * up-to-date answer. This won't ever be enough (and shouldn't be!) to
1361 : * confirm receipt of this transaction, but it might allow another
1362 : * transaction's commit to be confirmed with one message.
1363 : */
1364 1344 : ctx->write_location = last_lsn;
1365 :
1366 1344 : ctx->end_xact = false;
1367 :
1368 : /* in streaming mode, stream_stop_cb is required */
1369 1344 : if (ctx->callbacks.stream_stop_cb == NULL)
1370 0 : ereport(ERROR,
1371 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1372 : errmsg("logical streaming requires a %s callback",
1373 : "stream_stop_cb")));
1374 :
1375 1344 : ctx->callbacks.stream_stop_cb(ctx, txn);
1376 :
1377 : /* Pop the error context stack */
1378 1344 : error_context_stack = errcallback.previous;
1379 1344 : }
1380 :
1381 : static void
1382 60 : stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1383 : XLogRecPtr abort_lsn)
1384 : {
1385 60 : LogicalDecodingContext *ctx = cache->private_data;
1386 : LogicalErrorCallbackState state;
1387 : ErrorContextCallback errcallback;
1388 :
1389 : Assert(!ctx->fast_forward);
1390 :
1391 : /* We're only supposed to call this when streaming is supported. */
1392 : Assert(ctx->streaming);
1393 :
1394 : /* Push callback + info on the error context stack */
1395 60 : state.ctx = ctx;
1396 60 : state.callback_name = "stream_abort";
1397 60 : state.report_location = abort_lsn;
1398 60 : errcallback.callback = output_plugin_error_callback;
1399 60 : errcallback.arg = &state;
1400 60 : errcallback.previous = error_context_stack;
1401 60 : error_context_stack = &errcallback;
1402 :
1403 : /* set output state */
1404 60 : ctx->accept_writes = true;
1405 60 : ctx->write_xid = txn->xid;
1406 60 : ctx->write_location = abort_lsn;
1407 60 : ctx->end_xact = true;
1408 :
1409 : /* in streaming mode, stream_abort_cb is required */
1410 60 : if (ctx->callbacks.stream_abort_cb == NULL)
1411 0 : ereport(ERROR,
1412 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1413 : errmsg("logical streaming requires a %s callback",
1414 : "stream_abort_cb")));
1415 :
1416 60 : ctx->callbacks.stream_abort_cb(ctx, txn, abort_lsn);
1417 :
1418 : /* Pop the error context stack */
1419 60 : error_context_stack = errcallback.previous;
1420 60 : }
1421 :
1422 : static void
1423 30 : stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1424 : XLogRecPtr prepare_lsn)
1425 : {
1426 30 : LogicalDecodingContext *ctx = cache->private_data;
1427 : LogicalErrorCallbackState state;
1428 : ErrorContextCallback errcallback;
1429 :
1430 : Assert(!ctx->fast_forward);
1431 :
1432 : /*
1433 : * We're only supposed to call this when streaming and two-phase commits
1434 : * are supported.
1435 : */
1436 : Assert(ctx->streaming);
1437 : Assert(ctx->twophase);
1438 :
1439 : /* Push callback + info on the error context stack */
1440 30 : state.ctx = ctx;
1441 30 : state.callback_name = "stream_prepare";
1442 30 : state.report_location = txn->final_lsn;
1443 30 : errcallback.callback = output_plugin_error_callback;
1444 30 : errcallback.arg = &state;
1445 30 : errcallback.previous = error_context_stack;
1446 30 : error_context_stack = &errcallback;
1447 :
1448 : /* set output state */
1449 30 : ctx->accept_writes = true;
1450 30 : ctx->write_xid = txn->xid;
1451 30 : ctx->write_location = txn->end_lsn;
1452 30 : ctx->end_xact = true;
1453 :
1454 : /* in streaming mode with two-phase commits, stream_prepare_cb is required */
1455 30 : if (ctx->callbacks.stream_prepare_cb == NULL)
1456 0 : ereport(ERROR,
1457 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1458 : errmsg("logical streaming at prepare time requires a %s callback",
1459 : "stream_prepare_cb")));
1460 :
1461 30 : ctx->callbacks.stream_prepare_cb(ctx, txn, prepare_lsn);
1462 :
1463 : /* Pop the error context stack */
1464 30 : error_context_stack = errcallback.previous;
1465 30 : }
1466 :
1467 : static void
1468 102 : stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1469 : XLogRecPtr commit_lsn)
1470 : {
1471 102 : LogicalDecodingContext *ctx = cache->private_data;
1472 : LogicalErrorCallbackState state;
1473 : ErrorContextCallback errcallback;
1474 :
1475 : Assert(!ctx->fast_forward);
1476 :
1477 : /* We're only supposed to call this when streaming is supported. */
1478 : Assert(ctx->streaming);
1479 :
1480 : /* Push callback + info on the error context stack */
1481 102 : state.ctx = ctx;
1482 102 : state.callback_name = "stream_commit";
1483 102 : state.report_location = txn->final_lsn;
1484 102 : errcallback.callback = output_plugin_error_callback;
1485 102 : errcallback.arg = &state;
1486 102 : errcallback.previous = error_context_stack;
1487 102 : error_context_stack = &errcallback;
1488 :
1489 : /* set output state */
1490 102 : ctx->accept_writes = true;
1491 102 : ctx->write_xid = txn->xid;
1492 102 : ctx->write_location = txn->end_lsn;
1493 102 : ctx->end_xact = true;
1494 :
1495 : /* in streaming mode, stream_commit_cb is required */
1496 102 : if (ctx->callbacks.stream_commit_cb == NULL)
1497 0 : ereport(ERROR,
1498 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1499 : errmsg("logical streaming requires a %s callback",
1500 : "stream_commit_cb")));
1501 :
1502 102 : ctx->callbacks.stream_commit_cb(ctx, txn, commit_lsn);
1503 :
1504 : /* Pop the error context stack */
1505 102 : error_context_stack = errcallback.previous;
1506 102 : }
1507 :
1508 : static void
1509 352012 : stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1510 : Relation relation, ReorderBufferChange *change)
1511 : {
1512 352012 : LogicalDecodingContext *ctx = cache->private_data;
1513 : LogicalErrorCallbackState state;
1514 : ErrorContextCallback errcallback;
1515 :
1516 : Assert(!ctx->fast_forward);
1517 :
1518 : /* We're only supposed to call this when streaming is supported. */
1519 : Assert(ctx->streaming);
1520 :
1521 : /* Push callback + info on the error context stack */
1522 352012 : state.ctx = ctx;
1523 352012 : state.callback_name = "stream_change";
1524 352012 : state.report_location = change->lsn;
1525 352012 : errcallback.callback = output_plugin_error_callback;
1526 352012 : errcallback.arg = &state;
1527 352012 : errcallback.previous = error_context_stack;
1528 352012 : error_context_stack = &errcallback;
1529 :
1530 : /* set output state */
1531 352012 : ctx->accept_writes = true;
1532 352012 : ctx->write_xid = txn->xid;
1533 :
1534 : /*
1535 : * Report this change's lsn so replies from clients can give an up-to-date
1536 : * answer. This won't ever be enough (and shouldn't be!) to confirm
1537 : * receipt of this transaction, but it might allow another transaction's
1538 : * commit to be confirmed with one message.
1539 : */
1540 352012 : ctx->write_location = change->lsn;
1541 :
1542 352012 : ctx->end_xact = false;
1543 :
1544 : /* in streaming mode, stream_change_cb is required */
1545 352012 : if (ctx->callbacks.stream_change_cb == NULL)
1546 0 : ereport(ERROR,
1547 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1548 : errmsg("logical streaming requires a %s callback",
1549 : "stream_change_cb")));
1550 :
1551 352012 : ctx->callbacks.stream_change_cb(ctx, txn, relation, change);
1552 :
1553 : /* Pop the error context stack */
1554 352012 : error_context_stack = errcallback.previous;
1555 352012 : }
1556 :
1557 : static void
1558 6 : stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1559 : XLogRecPtr message_lsn, bool transactional,
1560 : const char *prefix, Size message_size, const char *message)
1561 : {
1562 6 : LogicalDecodingContext *ctx = cache->private_data;
1563 : LogicalErrorCallbackState state;
1564 : ErrorContextCallback errcallback;
1565 :
1566 : Assert(!ctx->fast_forward);
1567 :
1568 : /* We're only supposed to call this when streaming is supported. */
1569 : Assert(ctx->streaming);
1570 :
1571 : /* this callback is optional */
1572 6 : if (ctx->callbacks.stream_message_cb == NULL)
1573 0 : return;
1574 :
1575 : /* Push callback + info on the error context stack */
1576 6 : state.ctx = ctx;
1577 6 : state.callback_name = "stream_message";
1578 6 : state.report_location = message_lsn;
1579 6 : errcallback.callback = output_plugin_error_callback;
1580 6 : errcallback.arg = &state;
1581 6 : errcallback.previous = error_context_stack;
1582 6 : error_context_stack = &errcallback;
1583 :
1584 : /* set output state */
1585 6 : ctx->accept_writes = true;
1586 6 : ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
1587 6 : ctx->write_location = message_lsn;
1588 6 : ctx->end_xact = false;
1589 :
1590 : /* do the actual work: call callback */
1591 6 : ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix,
1592 : message_size, message);
1593 :
1594 : /* Pop the error context stack */
1595 6 : error_context_stack = errcallback.previous;
1596 : }
1597 :
1598 : static void
1599 0 : stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1600 : int nrelations, Relation relations[],
1601 : ReorderBufferChange *change)
1602 : {
1603 0 : LogicalDecodingContext *ctx = cache->private_data;
1604 : LogicalErrorCallbackState state;
1605 : ErrorContextCallback errcallback;
1606 :
1607 : Assert(!ctx->fast_forward);
1608 :
1609 : /* We're only supposed to call this when streaming is supported. */
1610 : Assert(ctx->streaming);
1611 :
1612 : /* this callback is optional */
1613 0 : if (!ctx->callbacks.stream_truncate_cb)
1614 0 : return;
1615 :
1616 : /* Push callback + info on the error context stack */
1617 0 : state.ctx = ctx;
1618 0 : state.callback_name = "stream_truncate";
1619 0 : state.report_location = change->lsn;
1620 0 : errcallback.callback = output_plugin_error_callback;
1621 0 : errcallback.arg = &state;
1622 0 : errcallback.previous = error_context_stack;
1623 0 : error_context_stack = &errcallback;
1624 :
1625 : /* set output state */
1626 0 : ctx->accept_writes = true;
1627 0 : ctx->write_xid = txn->xid;
1628 :
1629 : /*
1630 : * Report this change's lsn so replies from clients can give an up-to-date
1631 : * answer. This won't ever be enough (and shouldn't be!) to confirm
1632 : * receipt of this transaction, but it might allow another transaction's
1633 : * commit to be confirmed with one message.
1634 : */
1635 0 : ctx->write_location = change->lsn;
1636 :
1637 0 : ctx->end_xact = false;
1638 :
1639 0 : ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change);
1640 :
1641 : /* Pop the error context stack */
1642 0 : error_context_stack = errcallback.previous;
1643 : }
1644 :
1645 : static void
1646 6214 : update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1647 : XLogRecPtr lsn)
1648 : {
1649 6214 : LogicalDecodingContext *ctx = cache->private_data;
1650 : LogicalErrorCallbackState state;
1651 : ErrorContextCallback errcallback;
1652 :
1653 : Assert(!ctx->fast_forward);
1654 :
1655 : /* Push callback + info on the error context stack */
1656 6214 : state.ctx = ctx;
1657 6214 : state.callback_name = "update_progress_txn";
1658 6214 : state.report_location = lsn;
1659 6214 : errcallback.callback = output_plugin_error_callback;
1660 6214 : errcallback.arg = &state;
1661 6214 : errcallback.previous = error_context_stack;
1662 6214 : error_context_stack = &errcallback;
1663 :
1664 : /* set output state */
1665 6214 : ctx->accept_writes = false;
1666 6214 : ctx->write_xid = txn->xid;
1667 :
1668 : /*
1669 : * Report this change's lsn so replies from clients can give an up-to-date
1670 : * answer. This won't ever be enough (and shouldn't be!) to confirm
1671 : * receipt of this transaction, but it might allow another transaction's
1672 : * commit to be confirmed with one message.
1673 : */
1674 6214 : ctx->write_location = lsn;
1675 :
1676 6214 : ctx->end_xact = false;
1677 :
1678 6214 : OutputPluginUpdateProgress(ctx, false);
1679 :
1680 : /* Pop the error context stack */
1681 6214 : error_context_stack = errcallback.previous;
1682 6214 : }
1683 :
1684 : /*
1685 : * Set the required catalog xmin horizon for historic snapshots in the current
1686 : * replication slot.
1687 : *
1688 : * Note that in the most cases, we won't be able to immediately use the xmin
1689 : * to increase the xmin horizon: we need to wait till the client has confirmed
1690 : * receiving current_lsn with LogicalConfirmReceivedLocation().
1691 : */
1692 : void
1693 716 : LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
1694 : {
1695 716 : bool updated_xmin = false;
1696 : ReplicationSlot *slot;
1697 716 : bool got_new_xmin = false;
1698 :
1699 716 : slot = MyReplicationSlot;
1700 :
1701 : Assert(slot != NULL);
1702 :
1703 716 : SpinLockAcquire(&slot->mutex);
1704 :
1705 : /*
1706 : * don't overwrite if we already have a newer xmin. This can happen if we
1707 : * restart decoding in a slot.
1708 : */
1709 716 : if (TransactionIdPrecedesOrEquals(xmin, slot->data.catalog_xmin))
1710 : {
1711 : }
1712 :
1713 : /*
1714 : * If the client has already confirmed up to this lsn, we directly can
1715 : * mark this as accepted. This can happen if we restart decoding in a
1716 : * slot.
1717 : */
1718 178 : else if (current_lsn <= slot->data.confirmed_flush)
1719 : {
1720 84 : slot->candidate_catalog_xmin = xmin;
1721 84 : slot->candidate_xmin_lsn = current_lsn;
1722 :
1723 : /* our candidate can directly be used */
1724 84 : updated_xmin = true;
1725 : }
1726 :
1727 : /*
1728 : * Only increase if the previous values have been applied, otherwise we
1729 : * might never end up updating if the receiver acks too slowly.
1730 : */
1731 94 : else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
1732 : {
1733 36 : slot->candidate_catalog_xmin = xmin;
1734 36 : slot->candidate_xmin_lsn = current_lsn;
1735 :
1736 : /*
1737 : * Log new xmin at an appropriate log level after releasing the
1738 : * spinlock.
1739 : */
1740 36 : got_new_xmin = true;
1741 : }
1742 716 : SpinLockRelease(&slot->mutex);
1743 :
1744 716 : if (got_new_xmin)
1745 36 : elog(DEBUG1, "got new catalog xmin %u at %X/%X", xmin,
1746 : LSN_FORMAT_ARGS(current_lsn));
1747 :
1748 : /* candidate already valid with the current flush position, apply */
1749 716 : if (updated_xmin)
1750 84 : LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
1751 716 : }
1752 :
1753 : /*
1754 : * Mark the minimal LSN (restart_lsn) we need to read to replay all
1755 : * transactions that have not yet committed at current_lsn.
1756 : *
1757 : * Just like LogicalIncreaseXminForSlot this only takes effect when the
1758 : * client has confirmed to have received current_lsn.
1759 : */
1760 : void
1761 620 : LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
1762 : {
1763 620 : bool updated_lsn = false;
1764 : ReplicationSlot *slot;
1765 :
1766 620 : slot = MyReplicationSlot;
1767 :
1768 : Assert(slot != NULL);
1769 : Assert(restart_lsn != InvalidXLogRecPtr);
1770 : Assert(current_lsn != InvalidXLogRecPtr);
1771 :
1772 620 : SpinLockAcquire(&slot->mutex);
1773 :
1774 : /* don't overwrite if have a newer restart lsn */
1775 620 : if (restart_lsn <= slot->data.restart_lsn)
1776 : {
1777 22 : SpinLockRelease(&slot->mutex);
1778 : }
1779 :
1780 : /*
1781 : * We might have already flushed far enough to directly accept this lsn,
1782 : * in this case there is no need to check for existing candidate LSNs
1783 : */
1784 598 : else if (current_lsn <= slot->data.confirmed_flush)
1785 : {
1786 462 : slot->candidate_restart_valid = current_lsn;
1787 462 : slot->candidate_restart_lsn = restart_lsn;
1788 462 : SpinLockRelease(&slot->mutex);
1789 :
1790 : /* our candidate can directly be used */
1791 462 : updated_lsn = true;
1792 : }
1793 :
1794 : /*
1795 : * Only increase if the previous values have been applied, otherwise we
1796 : * might never end up updating if the receiver acks too slowly. A missed
1797 : * value here will just cause some extra effort after reconnecting.
1798 : */
1799 136 : else if (slot->candidate_restart_valid == InvalidXLogRecPtr)
1800 : {
1801 62 : slot->candidate_restart_valid = current_lsn;
1802 62 : slot->candidate_restart_lsn = restart_lsn;
1803 62 : SpinLockRelease(&slot->mutex);
1804 :
1805 62 : elog(DEBUG1, "got new restart lsn %X/%X at %X/%X",
1806 : LSN_FORMAT_ARGS(restart_lsn),
1807 : LSN_FORMAT_ARGS(current_lsn));
1808 : }
1809 : else
1810 : {
1811 : XLogRecPtr candidate_restart_lsn;
1812 : XLogRecPtr candidate_restart_valid;
1813 : XLogRecPtr confirmed_flush;
1814 :
1815 74 : candidate_restart_lsn = slot->candidate_restart_lsn;
1816 74 : candidate_restart_valid = slot->candidate_restart_valid;
1817 74 : confirmed_flush = slot->data.confirmed_flush;
1818 74 : SpinLockRelease(&slot->mutex);
1819 :
1820 74 : elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after %X/%X, current candidate %X/%X, current after %X/%X, flushed up to %X/%X",
1821 : LSN_FORMAT_ARGS(restart_lsn),
1822 : LSN_FORMAT_ARGS(current_lsn),
1823 : LSN_FORMAT_ARGS(candidate_restart_lsn),
1824 : LSN_FORMAT_ARGS(candidate_restart_valid),
1825 : LSN_FORMAT_ARGS(confirmed_flush));
1826 : }
1827 :
1828 : /* candidates are already valid with the current flush position, apply */
1829 620 : if (updated_lsn)
1830 462 : LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
1831 620 : }
1832 :
1833 : /*
1834 : * Handle a consumer's confirmation having received all changes up to lsn.
1835 : */
1836 : void
1837 47754 : LogicalConfirmReceivedLocation(XLogRecPtr lsn)
1838 : {
1839 : Assert(lsn != InvalidXLogRecPtr);
1840 :
1841 : /* Do an unlocked check for candidate_lsn first. */
1842 47754 : if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr ||
1843 47630 : MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr)
1844 618 : {
1845 618 : bool updated_xmin = false;
1846 618 : bool updated_restart = false;
1847 :
1848 618 : SpinLockAcquire(&MyReplicationSlot->mutex);
1849 :
1850 618 : MyReplicationSlot->data.confirmed_flush = lsn;
1851 :
1852 : /* if we're past the location required for bumping xmin, do so */
1853 618 : if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr &&
1854 124 : MyReplicationSlot->candidate_xmin_lsn <= lsn)
1855 : {
1856 : /*
1857 : * We have to write the changed xmin to disk *before* we change
1858 : * the in-memory value, otherwise after a crash we wouldn't know
1859 : * that some catalog tuples might have been removed already.
1860 : *
1861 : * Ensure that by first writing to ->xmin and only update
1862 : * ->effective_xmin once the new state is synced to disk. After a
1863 : * crash ->effective_xmin is set to ->xmin.
1864 : */
1865 110 : if (TransactionIdIsValid(MyReplicationSlot->candidate_catalog_xmin) &&
1866 110 : MyReplicationSlot->data.catalog_xmin != MyReplicationSlot->candidate_catalog_xmin)
1867 : {
1868 110 : MyReplicationSlot->data.catalog_xmin = MyReplicationSlot->candidate_catalog_xmin;
1869 110 : MyReplicationSlot->candidate_catalog_xmin = InvalidTransactionId;
1870 110 : MyReplicationSlot->candidate_xmin_lsn = InvalidXLogRecPtr;
1871 110 : updated_xmin = true;
1872 : }
1873 : }
1874 :
1875 618 : if (MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr &&
1876 534 : MyReplicationSlot->candidate_restart_valid <= lsn)
1877 : {
1878 : Assert(MyReplicationSlot->candidate_restart_lsn != InvalidXLogRecPtr);
1879 :
1880 518 : MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
1881 518 : MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
1882 518 : MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
1883 518 : updated_restart = true;
1884 : }
1885 :
1886 618 : SpinLockRelease(&MyReplicationSlot->mutex);
1887 :
1888 : /* first write new xmin to disk, so we know what's up after a crash */
1889 618 : if (updated_xmin || updated_restart)
1890 : {
1891 602 : ReplicationSlotMarkDirty();
1892 602 : ReplicationSlotSave();
1893 602 : elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
1894 : }
1895 :
1896 : /*
1897 : * Now the new xmin is safely on disk, we can let the global value
1898 : * advance. We do not take ProcArrayLock or similar since we only
1899 : * advance xmin here and there's not much harm done by a concurrent
1900 : * computation missing that.
1901 : */
1902 618 : if (updated_xmin)
1903 : {
1904 110 : SpinLockAcquire(&MyReplicationSlot->mutex);
1905 110 : MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
1906 110 : SpinLockRelease(&MyReplicationSlot->mutex);
1907 :
1908 110 : ReplicationSlotsComputeRequiredXmin(false);
1909 110 : ReplicationSlotsComputeRequiredLSN();
1910 : }
1911 : }
1912 : else
1913 : {
1914 47136 : SpinLockAcquire(&MyReplicationSlot->mutex);
1915 47136 : MyReplicationSlot->data.confirmed_flush = lsn;
1916 47136 : SpinLockRelease(&MyReplicationSlot->mutex);
1917 : }
1918 47754 : }
1919 :
1920 : /*
1921 : * Clear logical streaming state during (sub)transaction abort.
1922 : */
1923 : void
1924 56380 : ResetLogicalStreamingState(void)
1925 : {
1926 56380 : CheckXidAlive = InvalidTransactionId;
1927 56380 : bsysscan = false;
1928 56380 : }
1929 :
1930 : /*
1931 : * Report stats for a slot.
1932 : */
1933 : void
1934 12072 : UpdateDecodingStats(LogicalDecodingContext *ctx)
1935 : {
1936 12072 : ReorderBuffer *rb = ctx->reorder;
1937 : PgStat_StatReplSlotEntry repSlotStat;
1938 :
1939 : /* Nothing to do if we don't have any replication stats to be sent. */
1940 12072 : if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0)
1941 504 : return;
1942 :
1943 11568 : elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld %lld %lld",
1944 : rb,
1945 : (long long) rb->spillTxns,
1946 : (long long) rb->spillCount,
1947 : (long long) rb->spillBytes,
1948 : (long long) rb->streamTxns,
1949 : (long long) rb->streamCount,
1950 : (long long) rb->streamBytes,
1951 : (long long) rb->totalTxns,
1952 : (long long) rb->totalBytes);
1953 :
1954 11568 : repSlotStat.spill_txns = rb->spillTxns;
1955 11568 : repSlotStat.spill_count = rb->spillCount;
1956 11568 : repSlotStat.spill_bytes = rb->spillBytes;
1957 11568 : repSlotStat.stream_txns = rb->streamTxns;
1958 11568 : repSlotStat.stream_count = rb->streamCount;
1959 11568 : repSlotStat.stream_bytes = rb->streamBytes;
1960 11568 : repSlotStat.total_txns = rb->totalTxns;
1961 11568 : repSlotStat.total_bytes = rb->totalBytes;
1962 :
1963 11568 : pgstat_report_replslot(ctx->slot, &repSlotStat);
1964 :
1965 11568 : rb->spillTxns = 0;
1966 11568 : rb->spillCount = 0;
1967 11568 : rb->spillBytes = 0;
1968 11568 : rb->streamTxns = 0;
1969 11568 : rb->streamCount = 0;
1970 11568 : rb->streamBytes = 0;
1971 11568 : rb->totalTxns = 0;
1972 11568 : rb->totalBytes = 0;
1973 : }
1974 :
1975 : /*
1976 : * Read up to the end of WAL starting from the decoding slot's restart_lsn.
1977 : * Return true if any meaningful/decodable WAL records are encountered,
1978 : * otherwise false.
1979 : */
1980 : bool
1981 10 : LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
1982 : {
1983 10 : bool has_pending_wal = false;
1984 :
1985 : Assert(MyReplicationSlot);
1986 :
1987 10 : PG_TRY();
1988 : {
1989 : LogicalDecodingContext *ctx;
1990 :
1991 : /*
1992 : * Create our decoding context in fast_forward mode, passing start_lsn
1993 : * as InvalidXLogRecPtr, so that we start processing from the slot's
1994 : * confirmed_flush.
1995 : */
1996 20 : ctx = CreateDecodingContext(InvalidXLogRecPtr,
1997 : NIL,
1998 : true, /* fast_forward */
1999 10 : XL_ROUTINE(.page_read = read_local_xlog_page,
2000 : .segment_open = wal_segment_open,
2001 : .segment_close = wal_segment_close),
2002 : NULL, NULL, NULL);
2003 :
2004 : /*
2005 : * Start reading at the slot's restart_lsn, which we know points to a
2006 : * valid record.
2007 : */
2008 10 : XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
2009 :
2010 : /* Invalidate non-timetravel entries */
2011 10 : InvalidateSystemCaches();
2012 :
2013 : /* Loop until the end of WAL or some changes are processed */
2014 304 : while (!has_pending_wal && ctx->reader->EndRecPtr < end_of_wal)
2015 : {
2016 : XLogRecord *record;
2017 294 : char *errm = NULL;
2018 :
2019 294 : record = XLogReadRecord(ctx->reader, &errm);
2020 :
2021 294 : if (errm)
2022 0 : elog(ERROR, "could not find record for logical decoding: %s", errm);
2023 :
2024 294 : if (record != NULL)
2025 294 : LogicalDecodingProcessRecord(ctx, ctx->reader);
2026 :
2027 294 : has_pending_wal = ctx->processing_required;
2028 :
2029 294 : CHECK_FOR_INTERRUPTS();
2030 : }
2031 :
2032 : /* Clean up */
2033 10 : FreeDecodingContext(ctx);
2034 10 : InvalidateSystemCaches();
2035 : }
2036 0 : PG_CATCH();
2037 : {
2038 : /* clear all timetravel entries */
2039 0 : InvalidateSystemCaches();
2040 :
2041 0 : PG_RE_THROW();
2042 : }
2043 10 : PG_END_TRY();
2044 :
2045 10 : return has_pending_wal;
2046 : }
2047 :
2048 : /*
2049 : * Helper function for advancing our logical replication slot forward.
2050 : *
2051 : * The slot's restart_lsn is used as start point for reading records, while
2052 : * confirmed_flush is used as base point for the decoding context.
2053 : *
2054 : * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
2055 : * because we need to digest WAL to advance restart_lsn allowing to recycle
2056 : * WAL and removal of old catalog tuples. As decoding is done in fast_forward
2057 : * mode, no changes are generated anyway.
2058 : *
2059 : * *found_consistent_snapshot will be true if the initial decoding snapshot has
2060 : * been built; Otherwise, it will be false.
2061 : */
2062 : XLogRecPtr
2063 26 : LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto,
2064 : bool *found_consistent_snapshot)
2065 : {
2066 : LogicalDecodingContext *ctx;
2067 26 : ResourceOwner old_resowner = CurrentResourceOwner;
2068 : XLogRecPtr retlsn;
2069 :
2070 : Assert(moveto != InvalidXLogRecPtr);
2071 :
2072 26 : if (found_consistent_snapshot)
2073 10 : *found_consistent_snapshot = false;
2074 :
2075 26 : PG_TRY();
2076 : {
2077 : /*
2078 : * Create our decoding context in fast_forward mode, passing start_lsn
2079 : * as InvalidXLogRecPtr, so that we start processing from my slot's
2080 : * confirmed_flush.
2081 : */
2082 52 : ctx = CreateDecodingContext(InvalidXLogRecPtr,
2083 : NIL,
2084 : true, /* fast_forward */
2085 26 : XL_ROUTINE(.page_read = read_local_xlog_page,
2086 : .segment_open = wal_segment_open,
2087 : .segment_close = wal_segment_close),
2088 : NULL, NULL, NULL);
2089 :
2090 : /*
2091 : * Wait for specified streaming replication standby servers (if any)
2092 : * to confirm receipt of WAL up to moveto lsn.
2093 : */
2094 26 : WaitForStandbyConfirmation(moveto);
2095 :
2096 : /*
2097 : * Start reading at the slot's restart_lsn, which we know to point to
2098 : * a valid record.
2099 : */
2100 26 : XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
2101 :
2102 : /* invalidate non-timetravel entries */
2103 26 : InvalidateSystemCaches();
2104 :
2105 : /* Decode records until we reach the requested target */
2106 3262 : while (ctx->reader->EndRecPtr < moveto)
2107 : {
2108 3236 : char *errm = NULL;
2109 : XLogRecord *record;
2110 :
2111 : /*
2112 : * Read records. No changes are generated in fast_forward mode,
2113 : * but snapbuilder/slot statuses are updated properly.
2114 : */
2115 3236 : record = XLogReadRecord(ctx->reader, &errm);
2116 3236 : if (errm)
2117 0 : elog(ERROR, "could not find record while advancing replication slot: %s",
2118 : errm);
2119 :
2120 : /*
2121 : * Process the record. Storage-level changes are ignored in
2122 : * fast_forward mode, but other modules (such as snapbuilder)
2123 : * might still have critical updates to do.
2124 : */
2125 3236 : if (record)
2126 3236 : LogicalDecodingProcessRecord(ctx, ctx->reader);
2127 :
2128 3236 : CHECK_FOR_INTERRUPTS();
2129 : }
2130 :
2131 26 : if (found_consistent_snapshot && DecodingContextReady(ctx))
2132 10 : *found_consistent_snapshot = true;
2133 :
2134 : /*
2135 : * Logical decoding could have clobbered CurrentResourceOwner during
2136 : * transaction management, so restore the executor's value. (This is
2137 : * a kluge, but it's not worth cleaning up right now.)
2138 : */
2139 26 : CurrentResourceOwner = old_resowner;
2140 :
2141 26 : if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
2142 : {
2143 26 : LogicalConfirmReceivedLocation(moveto);
2144 :
2145 : /*
2146 : * If only the confirmed_flush LSN has changed the slot won't get
2147 : * marked as dirty by the above. Callers on the walsender
2148 : * interface are expected to keep track of their own progress and
2149 : * don't need it written out. But SQL-interface users cannot
2150 : * specify their own start positions and it's harder for them to
2151 : * keep track of their progress, so we should make more of an
2152 : * effort to save it for them.
2153 : *
2154 : * Dirty the slot so it is written out at the next checkpoint. The
2155 : * LSN position advanced to may still be lost on a crash but this
2156 : * makes the data consistent after a clean shutdown.
2157 : */
2158 26 : ReplicationSlotMarkDirty();
2159 : }
2160 :
2161 26 : retlsn = MyReplicationSlot->data.confirmed_flush;
2162 :
2163 : /* free context, call shutdown callback */
2164 26 : FreeDecodingContext(ctx);
2165 :
2166 26 : InvalidateSystemCaches();
2167 : }
2168 0 : PG_CATCH();
2169 : {
2170 : /* clear all timetravel entries */
2171 0 : InvalidateSystemCaches();
2172 :
2173 0 : PG_RE_THROW();
2174 : }
2175 26 : PG_END_TRY();
2176 :
2177 26 : return retlsn;
2178 : }
|