Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * origin.c
4 : * Logical replication progress tracking support.
5 : *
6 : * Copyright (c) 2013-2026, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/logical/origin.c
10 : *
11 : * NOTES
12 : *
13 : * This file provides the following:
14 : * * An infrastructure to name nodes in a replication setup
15 : * * A facility to store and persist replication progress in an
16 : * efficient and durable manner.
17 : *
18 : * Replication origin consists of a descriptive, user defined, external
19 : * name and a short, thus space efficient, internal 2 byte one. This split
20 : * exists because replication origins have to be stored in WAL and shared
21 : * memory and long descriptors would be inefficient. For now we only use 2 bytes
22 : * for the internal id of a replication origin as it seems unlikely that there
23 : * soon will be more than 65k nodes in one replication setup; and using only
24 : * two bytes allow us to be more space efficient.
25 : *
26 : * Replication progress is tracked in a shared memory table
27 : * (ReplicationState) that's dumped to disk every checkpoint. Entries
28 : * ('slots') in this table are identified by the internal id. That's the case
29 : * because that allows replication progress to be advanced during crash
30 : * recovery. To allow doing so we store the original LSN (from the originating
31 : * system) of a transaction in the commit record. That allows to recover the
32 : * precise replayed state after crash recovery; without requiring synchronous
33 : * commits. Allowing logical replication to use asynchronous commit is
34 : * generally good for performance, but especially important as it allows a
35 : * single threaded replay process to keep up with a source that has multiple
36 : * backends generating changes concurrently. For efficiency and simplicity
37 : * reasons a backend can set up one replication origin that's from then used as
38 : * the source of changes produced by the backend, until reset again.
39 : *
40 : * This infrastructure is intended to be used in cooperation with logical
41 : * decoding. When replaying from a remote system the configured origin is
42 : * provided to output plugins, allowing prevention of replication loops and
43 : * other filtering.
44 : *
45 : * There are several levels of locking at work:
46 : *
47 : * * To create and drop replication origins an exclusive lock on
48 : * pg_replication_origin is required for the duration. That allows us to
49 : * safely and conflict free assign new origins using a dirty snapshot.
50 : *
51 : * * When creating an in-memory replication progress slot the ReplicationOrigin
52 : * LWLock has to be held exclusively; when iterating over the replication
53 : * progress a shared lock has to be held, the same when advancing the
54 : * replication progress of an individual origin that has not been set up as
55 : * the session's replication origin.
56 : *
57 : * * When manipulating or looking at the remote_lsn and local_lsn fields of a
58 : * replication progress slot that slot's lwlock has to be held. That's
59 : * primarily because we do not assume 8 byte writes (the LSN) are atomic on
60 : * all our platforms, but it also simplifies memory ordering concerns
61 : * between the remote and local lsn. We use a lwlock instead of a spinlock
62 : * so it's less harmful to hold the lock over a WAL write
63 : * (cf. AdvanceReplicationProgress).
64 : *
65 : * ---------------------------------------------------------------------------
66 : */
67 :
68 : #include "postgres.h"
69 :
70 : #include <unistd.h>
71 : #include <sys/stat.h>
72 :
73 : #include "access/genam.h"
74 : #include "access/htup_details.h"
75 : #include "access/table.h"
76 : #include "access/xact.h"
77 : #include "access/xloginsert.h"
78 : #include "catalog/catalog.h"
79 : #include "catalog/indexing.h"
80 : #include "catalog/pg_subscription.h"
81 : #include "funcapi.h"
82 : #include "miscadmin.h"
83 : #include "nodes/execnodes.h"
84 : #include "pgstat.h"
85 : #include "replication/origin.h"
86 : #include "replication/slot.h"
87 : #include "storage/condition_variable.h"
88 : #include "storage/fd.h"
89 : #include "storage/ipc.h"
90 : #include "storage/lmgr.h"
91 : #include "utils/builtins.h"
92 : #include "utils/fmgroids.h"
93 : #include "utils/guc.h"
94 : #include "utils/pg_lsn.h"
95 : #include "utils/rel.h"
96 : #include "utils/snapmgr.h"
97 : #include "utils/syscache.h"
98 :
99 : /* paths for replication origin checkpoint files */
100 : #define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
101 : #define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
102 :
103 : /* GUC variables */
104 : int max_active_replication_origins = 10;
105 :
106 : /*
107 : * Replay progress of a single remote node.
108 : */
109 : typedef struct ReplicationState
110 : {
111 : /*
112 : * Local identifier for the remote node.
113 : */
114 : ReplOriginId roident;
115 :
116 : /*
117 : * Location of the latest commit from the remote side.
118 : */
119 : XLogRecPtr remote_lsn;
120 :
121 : /*
122 : * Remember the local lsn of the commit record so we can XLogFlush() to it
123 : * during a checkpoint so we know the commit record actually is safe on
124 : * disk.
125 : */
126 : XLogRecPtr local_lsn;
127 :
128 : /*
129 : * PID of backend that's acquired slot, or 0 if none.
130 : */
131 : int acquired_by;
132 :
133 : /* Count of processes that are currently using this origin. */
134 : int refcount;
135 :
136 : /*
137 : * Condition variable that's signaled when acquired_by changes.
138 : */
139 : ConditionVariable origin_cv;
140 :
141 : /*
142 : * Lock protecting remote_lsn and local_lsn.
143 : */
144 : LWLock lock;
145 : } ReplicationState;
146 :
147 : /*
148 : * On disk version of ReplicationState.
149 : */
150 : typedef struct ReplicationStateOnDisk
151 : {
152 : ReplOriginId roident;
153 : XLogRecPtr remote_lsn;
154 : } ReplicationStateOnDisk;
155 :
156 :
157 : typedef struct ReplicationStateCtl
158 : {
159 : /* Tranche to use for per-origin LWLocks */
160 : int tranche_id;
161 : /* Array of length max_active_replication_origins */
162 : ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
163 : } ReplicationStateCtl;
164 :
165 : /* Global variable for per-transaction replication origin state */
166 : ReplOriginXactState replorigin_xact_state = {
167 : .origin = InvalidReplOriginId, /* assumed identity */
168 : .origin_lsn = InvalidXLogRecPtr,
169 : .origin_timestamp = 0
170 : };
171 :
172 : /*
173 : * Base address into a shared memory array of replication states of size
174 : * max_active_replication_origins.
175 : */
176 : static ReplicationState *replication_states;
177 :
178 : /*
179 : * Actual shared memory block (replication_states[] is now part of this).
180 : */
181 : static ReplicationStateCtl *replication_states_ctl;
182 :
183 : /*
184 : * We keep a pointer to this backend's ReplicationState to avoid having to
185 : * search the replication_states array in replorigin_session_advance for each
186 : * remote commit. (Ownership of a backend's own entry can only be changed by
187 : * that backend.)
188 : */
189 : static ReplicationState *session_replication_state = NULL;
190 :
191 : /* Magic for on disk files. */
192 : #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
193 :
194 : static void
195 136 : replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
196 : {
197 136 : if (check_origins && max_active_replication_origins == 0)
198 0 : ereport(ERROR,
199 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
200 : errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
201 :
202 136 : if (!recoveryOK && RecoveryInProgress())
203 0 : ereport(ERROR,
204 : (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
205 : errmsg("cannot manipulate replication origins during recovery")));
206 136 : }
207 :
208 :
209 : /*
210 : * IsReservedOriginName
211 : * True iff name is either "none" or "any".
212 : */
213 : static bool
214 26 : IsReservedOriginName(const char *name)
215 : {
216 50 : return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
217 24 : (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
218 : }
219 :
220 : /* ---------------------------------------------------------------------------
221 : * Functions for working with replication origins themselves.
222 : * ---------------------------------------------------------------------------
223 : */
224 :
225 : /*
226 : * Check for a persistent replication origin identified by name.
227 : *
228 : * Returns InvalidOid if the node isn't known yet and missing_ok is true.
229 : */
230 : ReplOriginId
231 2024 : replorigin_by_name(const char *roname, bool missing_ok)
232 : {
233 : Form_pg_replication_origin ident;
234 2024 : Oid roident = InvalidOid;
235 : HeapTuple tuple;
236 : Datum roname_d;
237 :
238 2024 : roname_d = CStringGetTextDatum(roname);
239 :
240 2024 : tuple = SearchSysCache1(REPLORIGNAME, roname_d);
241 2024 : if (HeapTupleIsValid(tuple))
242 : {
243 1264 : ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
244 1264 : roident = ident->roident;
245 1264 : ReleaseSysCache(tuple);
246 : }
247 760 : else if (!missing_ok)
248 8 : ereport(ERROR,
249 : (errcode(ERRCODE_UNDEFINED_OBJECT),
250 : errmsg("replication origin \"%s\" does not exist",
251 : roname)));
252 :
253 2016 : return roident;
254 : }
255 :
256 : /*
257 : * Create a replication origin.
258 : *
259 : * Needs to be called in a transaction.
260 : */
261 : ReplOriginId
262 742 : replorigin_create(const char *roname)
263 : {
264 : Oid roident;
265 742 : HeapTuple tuple = NULL;
266 : Relation rel;
267 : Datum roname_d;
268 : SnapshotData SnapshotDirty;
269 : SysScanDesc scan;
270 : ScanKeyData key;
271 :
272 : /*
273 : * To avoid needing a TOAST table for pg_replication_origin, we limit
274 : * replication origin names to 512 bytes. This should be more than enough
275 : * for all practical use.
276 : */
277 742 : if (strlen(roname) > MAX_RONAME_LEN)
278 6 : ereport(ERROR,
279 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
280 : errmsg("replication origin name is too long"),
281 : errdetail("Replication origin names must be no longer than %d bytes.",
282 : MAX_RONAME_LEN)));
283 :
284 736 : roname_d = CStringGetTextDatum(roname);
285 :
286 : Assert(IsTransactionState());
287 :
288 : /*
289 : * We need the numeric replication origin to be 16bit wide, so we cannot
290 : * rely on the normal oid allocation. Instead we simply scan
291 : * pg_replication_origin for the first unused id. That's not particularly
292 : * efficient, but this should be a fairly infrequent operation - we can
293 : * easily spend a bit more code on this when it turns out it needs to be
294 : * faster.
295 : *
296 : * We handle concurrency by taking an exclusive lock (allowing reads!)
297 : * over the table for the duration of the search. Because we use a "dirty
298 : * snapshot" we can read rows that other in-progress sessions have
299 : * written, even though they would be invisible with normal snapshots. Due
300 : * to the exclusive lock there's no danger that new rows can appear while
301 : * we're checking.
302 : */
303 736 : InitDirtySnapshot(SnapshotDirty);
304 :
305 736 : rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
306 :
307 : /*
308 : * We want to be able to access pg_replication_origin without setting up a
309 : * snapshot. To make that safe, it needs to not have a TOAST table, since
310 : * TOASTed data cannot be fetched without a snapshot. As of this writing,
311 : * its only varlena column is roname, which we limit to 512 bytes to avoid
312 : * needing out-of-line storage. If you add a TOAST table to this catalog,
313 : * be sure to set up a snapshot everywhere it might be needed. For more
314 : * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
315 : */
316 : Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
317 :
318 1332 : for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
319 : {
320 : bool nulls[Natts_pg_replication_origin];
321 : Datum values[Natts_pg_replication_origin];
322 : bool collides;
323 :
324 1332 : CHECK_FOR_INTERRUPTS();
325 :
326 1332 : ScanKeyInit(&key,
327 : Anum_pg_replication_origin_roident,
328 : BTEqualStrategyNumber, F_OIDEQ,
329 : ObjectIdGetDatum(roident));
330 :
331 1332 : scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
332 : true /* indexOK */ ,
333 : &SnapshotDirty,
334 : 1, &key);
335 :
336 1332 : collides = HeapTupleIsValid(systable_getnext(scan));
337 :
338 1332 : systable_endscan(scan);
339 :
340 1332 : if (!collides)
341 : {
342 : /*
343 : * Ok, found an unused roident, insert the new row and do a CCI,
344 : * so our callers can look it up if they want to.
345 : */
346 736 : memset(&nulls, 0, sizeof(nulls));
347 :
348 736 : values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
349 736 : values[Anum_pg_replication_origin_roname - 1] = roname_d;
350 :
351 736 : tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
352 736 : CatalogTupleInsert(rel, tuple);
353 734 : CommandCounterIncrement();
354 734 : break;
355 : }
356 : }
357 :
358 : /* now release lock again, */
359 734 : table_close(rel, ExclusiveLock);
360 :
361 734 : if (tuple == NULL)
362 0 : ereport(ERROR,
363 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
364 : errmsg("could not find free replication origin ID")));
365 :
366 734 : heap_freetuple(tuple);
367 734 : return roident;
368 : }
369 :
370 : /*
371 : * Helper function to drop a replication origin.
372 : */
373 : static void
374 626 : replorigin_state_clear(ReplOriginId roident, bool nowait)
375 : {
376 : int i;
377 :
378 : /*
379 : * Clean up the slot state info, if there is any matching slot.
380 : */
381 626 : restart:
382 626 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
383 :
384 2066 : for (i = 0; i < max_active_replication_origins; i++)
385 : {
386 1974 : ReplicationState *state = &replication_states[i];
387 :
388 1974 : if (state->roident == roident)
389 : {
390 : /* found our slot, is it busy? */
391 534 : if (state->refcount > 0)
392 : {
393 : ConditionVariable *cv;
394 :
395 0 : if (nowait)
396 0 : ereport(ERROR,
397 : (errcode(ERRCODE_OBJECT_IN_USE),
398 : (state->acquired_by != 0)
399 : ? errmsg("could not drop replication origin with ID %d, in use by PID %d",
400 : state->roident,
401 : state->acquired_by)
402 : : errmsg("could not drop replication origin with ID %d, in use by another process",
403 : state->roident)));
404 :
405 : /*
406 : * We must wait and then retry. Since we don't know which CV
407 : * to wait on until here, we can't readily use
408 : * ConditionVariablePrepareToSleep (calling it here would be
409 : * wrong, since we could miss the signal if we did so); just
410 : * use ConditionVariableSleep directly.
411 : */
412 0 : cv = &state->origin_cv;
413 :
414 0 : LWLockRelease(ReplicationOriginLock);
415 :
416 0 : ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
417 0 : goto restart;
418 : }
419 :
420 : /* first make a WAL log entry */
421 : {
422 : xl_replorigin_drop xlrec;
423 :
424 534 : xlrec.node_id = roident;
425 534 : XLogBeginInsert();
426 534 : XLogRegisterData(&xlrec, sizeof(xlrec));
427 534 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
428 : }
429 :
430 : /* then clear the in-memory slot */
431 534 : state->roident = InvalidReplOriginId;
432 534 : state->remote_lsn = InvalidXLogRecPtr;
433 534 : state->local_lsn = InvalidXLogRecPtr;
434 534 : break;
435 : }
436 : }
437 626 : LWLockRelease(ReplicationOriginLock);
438 626 : ConditionVariableCancelSleep();
439 626 : }
440 :
441 : /*
442 : * Drop replication origin (by name).
443 : *
444 : * Needs to be called in a transaction.
445 : */
446 : void
447 1000 : replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
448 : {
449 : ReplOriginId roident;
450 : Relation rel;
451 : HeapTuple tuple;
452 :
453 : Assert(IsTransactionState());
454 :
455 1000 : rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
456 :
457 1000 : roident = replorigin_by_name(name, missing_ok);
458 :
459 : /* Lock the origin to prevent concurrent drops. */
460 998 : LockSharedObject(ReplicationOriginRelationId, roident, 0,
461 : AccessExclusiveLock);
462 :
463 998 : tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
464 998 : if (!HeapTupleIsValid(tuple))
465 : {
466 372 : if (!missing_ok)
467 0 : elog(ERROR, "cache lookup failed for replication origin with ID %d",
468 : roident);
469 :
470 : /*
471 : * We don't need to retain the locks if the origin is already dropped.
472 : */
473 372 : UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
474 : AccessExclusiveLock);
475 372 : table_close(rel, RowExclusiveLock);
476 372 : return;
477 : }
478 :
479 626 : replorigin_state_clear(roident, nowait);
480 :
481 : /*
482 : * Now, we can delete the catalog entry.
483 : */
484 626 : CatalogTupleDelete(rel, &tuple->t_self);
485 626 : ReleaseSysCache(tuple);
486 :
487 626 : CommandCounterIncrement();
488 :
489 : /* We keep the lock on pg_replication_origin until commit */
490 626 : table_close(rel, NoLock);
491 : }
492 :
493 : /*
494 : * Lookup replication origin via its oid and return the name.
495 : *
496 : * The external name is palloc'd in the calling context.
497 : *
498 : * Returns true if the origin is known, false otherwise.
499 : */
500 : bool
501 56 : replorigin_by_oid(ReplOriginId roident, bool missing_ok, char **roname)
502 : {
503 : HeapTuple tuple;
504 : Form_pg_replication_origin ric;
505 :
506 : Assert(OidIsValid((Oid) roident));
507 : Assert(roident != InvalidReplOriginId);
508 : Assert(roident != DoNotReplicateId);
509 :
510 56 : tuple = SearchSysCache1(REPLORIGIDENT,
511 : ObjectIdGetDatum((Oid) roident));
512 :
513 56 : if (HeapTupleIsValid(tuple))
514 : {
515 50 : ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
516 50 : *roname = text_to_cstring(&ric->roname);
517 50 : ReleaseSysCache(tuple);
518 :
519 50 : return true;
520 : }
521 : else
522 : {
523 6 : *roname = NULL;
524 :
525 6 : if (!missing_ok)
526 0 : ereport(ERROR,
527 : (errcode(ERRCODE_UNDEFINED_OBJECT),
528 : errmsg("replication origin with ID %d does not exist",
529 : roident)));
530 :
531 6 : return false;
532 : }
533 : }
534 :
535 :
536 : /* ---------------------------------------------------------------------------
537 : * Functions for handling replication progress.
538 : * ---------------------------------------------------------------------------
539 : */
540 :
541 : Size
542 8810 : ReplicationOriginShmemSize(void)
543 : {
544 8810 : Size size = 0;
545 :
546 8810 : if (max_active_replication_origins == 0)
547 4 : return size;
548 :
549 8806 : size = add_size(size, offsetof(ReplicationStateCtl, states));
550 :
551 8806 : size = add_size(size,
552 : mul_size(max_active_replication_origins, sizeof(ReplicationState)));
553 8806 : return size;
554 : }
555 :
556 : void
557 2280 : ReplicationOriginShmemInit(void)
558 : {
559 : bool found;
560 :
561 2280 : if (max_active_replication_origins == 0)
562 2 : return;
563 :
564 2278 : replication_states_ctl = (ReplicationStateCtl *)
565 2278 : ShmemInitStruct("ReplicationOriginState",
566 : ReplicationOriginShmemSize(),
567 : &found);
568 2278 : replication_states = replication_states_ctl->states;
569 :
570 2278 : if (!found)
571 : {
572 : int i;
573 :
574 186652 : MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
575 :
576 2278 : replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
577 :
578 25040 : for (i = 0; i < max_active_replication_origins; i++)
579 : {
580 22762 : LWLockInitialize(&replication_states[i].lock,
581 22762 : replication_states_ctl->tranche_id);
582 22762 : ConditionVariableInit(&replication_states[i].origin_cv);
583 : }
584 : }
585 : }
586 :
587 : /* ---------------------------------------------------------------------------
588 : * Perform a checkpoint of each replication origin's progress with respect to
589 : * the replayed remote_lsn. Make sure that all transactions we refer to in the
590 : * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
591 : * if the transactions were originally committed asynchronously.
592 : *
593 : * We store checkpoints in the following format:
594 : * +-------+------------------------+------------------+-----+--------+
595 : * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
596 : * +-------+------------------------+------------------+-----+--------+
597 : *
598 : * So its just the magic, followed by the statically sized
599 : * ReplicationStateOnDisk structs. Note that the maximum number of
600 : * ReplicationState is determined by max_active_replication_origins.
601 : * ---------------------------------------------------------------------------
602 : */
603 : void
604 3576 : CheckPointReplicationOrigin(void)
605 : {
606 3576 : const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
607 3576 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
608 : int tmpfd;
609 : int i;
610 3576 : uint32 magic = REPLICATION_STATE_MAGIC;
611 : pg_crc32c crc;
612 :
613 3576 : if (max_active_replication_origins == 0)
614 2 : return;
615 :
616 3574 : INIT_CRC32C(crc);
617 :
618 : /* make sure no old temp file is remaining */
619 3574 : if (unlink(tmppath) < 0 && errno != ENOENT)
620 0 : ereport(PANIC,
621 : (errcode_for_file_access(),
622 : errmsg("could not remove file \"%s\": %m",
623 : tmppath)));
624 :
625 : /*
626 : * no other backend can perform this at the same time; only one checkpoint
627 : * can happen at a time.
628 : */
629 3574 : tmpfd = OpenTransientFile(tmppath,
630 : O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
631 3574 : if (tmpfd < 0)
632 0 : ereport(PANIC,
633 : (errcode_for_file_access(),
634 : errmsg("could not create file \"%s\": %m",
635 : tmppath)));
636 :
637 : /* write magic */
638 3574 : errno = 0;
639 3574 : if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
640 : {
641 : /* if write didn't set errno, assume problem is no disk space */
642 0 : if (errno == 0)
643 0 : errno = ENOSPC;
644 0 : ereport(PANIC,
645 : (errcode_for_file_access(),
646 : errmsg("could not write to file \"%s\": %m",
647 : tmppath)));
648 : }
649 3574 : COMP_CRC32C(crc, &magic, sizeof(magic));
650 :
651 : /* prevent concurrent creations/drops */
652 3574 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
653 :
654 : /* write actual data */
655 39314 : for (i = 0; i < max_active_replication_origins; i++)
656 : {
657 : ReplicationStateOnDisk disk_state;
658 35740 : ReplicationState *curstate = &replication_states[i];
659 : XLogRecPtr local_lsn;
660 :
661 35740 : if (curstate->roident == InvalidReplOriginId)
662 35634 : continue;
663 :
664 : /* zero, to avoid uninitialized padding bytes */
665 106 : memset(&disk_state, 0, sizeof(disk_state));
666 :
667 106 : LWLockAcquire(&curstate->lock, LW_SHARED);
668 :
669 106 : disk_state.roident = curstate->roident;
670 :
671 106 : disk_state.remote_lsn = curstate->remote_lsn;
672 106 : local_lsn = curstate->local_lsn;
673 :
674 106 : LWLockRelease(&curstate->lock);
675 :
676 : /* make sure we only write out a commit that's persistent */
677 106 : XLogFlush(local_lsn);
678 :
679 106 : errno = 0;
680 106 : if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
681 : sizeof(disk_state))
682 : {
683 : /* if write didn't set errno, assume problem is no disk space */
684 0 : if (errno == 0)
685 0 : errno = ENOSPC;
686 0 : ereport(PANIC,
687 : (errcode_for_file_access(),
688 : errmsg("could not write to file \"%s\": %m",
689 : tmppath)));
690 : }
691 :
692 106 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
693 : }
694 :
695 3574 : LWLockRelease(ReplicationOriginLock);
696 :
697 : /* write out the CRC */
698 3574 : FIN_CRC32C(crc);
699 3574 : errno = 0;
700 3574 : if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
701 : {
702 : /* if write didn't set errno, assume problem is no disk space */
703 0 : if (errno == 0)
704 0 : errno = ENOSPC;
705 0 : ereport(PANIC,
706 : (errcode_for_file_access(),
707 : errmsg("could not write to file \"%s\": %m",
708 : tmppath)));
709 : }
710 :
711 3574 : if (CloseTransientFile(tmpfd) != 0)
712 0 : ereport(PANIC,
713 : (errcode_for_file_access(),
714 : errmsg("could not close file \"%s\": %m",
715 : tmppath)));
716 :
717 : /* fsync, rename to permanent file, fsync file and directory */
718 3574 : durable_rename(tmppath, path, PANIC);
719 : }
720 :
721 : /*
722 : * Recover replication replay status from checkpoint data saved earlier by
723 : * CheckPointReplicationOrigin.
724 : *
725 : * This only needs to be called at startup and *not* during every checkpoint
726 : * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
727 : * state thereafter can be recovered by looking at commit records.
728 : */
729 : void
730 1984 : StartupReplicationOrigin(void)
731 : {
732 1984 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
733 : int fd;
734 : int readBytes;
735 1984 : uint32 magic = REPLICATION_STATE_MAGIC;
736 1984 : int last_state = 0;
737 : pg_crc32c file_crc;
738 : pg_crc32c crc;
739 :
740 : /* don't want to overwrite already existing state */
741 : #ifdef USE_ASSERT_CHECKING
742 : static bool already_started = false;
743 :
744 : Assert(!already_started);
745 : already_started = true;
746 : #endif
747 :
748 1984 : if (max_active_replication_origins == 0)
749 104 : return;
750 :
751 1982 : INIT_CRC32C(crc);
752 :
753 1982 : elog(DEBUG2, "starting up replication origin progress state");
754 :
755 1982 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
756 :
757 : /*
758 : * might have had max_active_replication_origins == 0 last run, or we just
759 : * brought up a standby.
760 : */
761 1982 : if (fd < 0 && errno == ENOENT)
762 102 : return;
763 1880 : else if (fd < 0)
764 0 : ereport(PANIC,
765 : (errcode_for_file_access(),
766 : errmsg("could not open file \"%s\": %m",
767 : path)));
768 :
769 : /* verify magic, that is written even if nothing was active */
770 1880 : readBytes = read(fd, &magic, sizeof(magic));
771 1880 : if (readBytes != sizeof(magic))
772 : {
773 0 : if (readBytes < 0)
774 0 : ereport(PANIC,
775 : (errcode_for_file_access(),
776 : errmsg("could not read file \"%s\": %m",
777 : path)));
778 : else
779 0 : ereport(PANIC,
780 : (errcode(ERRCODE_DATA_CORRUPTED),
781 : errmsg("could not read file \"%s\": read %d of %zu",
782 : path, readBytes, sizeof(magic))));
783 : }
784 1880 : COMP_CRC32C(crc, &magic, sizeof(magic));
785 :
786 1880 : if (magic != REPLICATION_STATE_MAGIC)
787 0 : ereport(PANIC,
788 : (errmsg("replication checkpoint has wrong magic %u instead of %u",
789 : magic, REPLICATION_STATE_MAGIC)));
790 :
791 : /* we can skip locking here, no other access is possible */
792 :
793 : /* recover individual states, until there are no more to be found */
794 : while (true)
795 62 : {
796 : ReplicationStateOnDisk disk_state;
797 :
798 1942 : readBytes = read(fd, &disk_state, sizeof(disk_state));
799 :
800 1942 : if (readBytes < 0)
801 : {
802 0 : ereport(PANIC,
803 : (errcode_for_file_access(),
804 : errmsg("could not read file \"%s\": %m",
805 : path)));
806 : }
807 :
808 : /* no further data */
809 1942 : if (readBytes == sizeof(crc))
810 : {
811 1880 : memcpy(&file_crc, &disk_state, sizeof(file_crc));
812 1880 : break;
813 : }
814 :
815 62 : if (readBytes != sizeof(disk_state))
816 : {
817 0 : ereport(PANIC,
818 : (errcode_for_file_access(),
819 : errmsg("could not read file \"%s\": read %d of %zu",
820 : path, readBytes, sizeof(disk_state))));
821 : }
822 :
823 62 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
824 :
825 62 : if (last_state == max_active_replication_origins)
826 0 : ereport(PANIC,
827 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
828 : errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
829 :
830 : /* copy data to shared memory */
831 62 : replication_states[last_state].roident = disk_state.roident;
832 62 : replication_states[last_state].remote_lsn = disk_state.remote_lsn;
833 62 : last_state++;
834 :
835 62 : ereport(LOG,
836 : errmsg("recovered replication state of node %d to %X/%08X",
837 : disk_state.roident,
838 : LSN_FORMAT_ARGS(disk_state.remote_lsn)));
839 : }
840 :
841 : /* now check checksum */
842 1880 : FIN_CRC32C(crc);
843 1880 : if (file_crc != crc)
844 0 : ereport(PANIC,
845 : (errcode(ERRCODE_DATA_CORRUPTED),
846 : errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
847 : crc, file_crc)));
848 :
849 1880 : if (CloseTransientFile(fd) != 0)
850 0 : ereport(PANIC,
851 : (errcode_for_file_access(),
852 : errmsg("could not close file \"%s\": %m",
853 : path)));
854 : }
855 :
856 : void
857 8 : replorigin_redo(XLogReaderState *record)
858 : {
859 8 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
860 :
861 8 : switch (info)
862 : {
863 4 : case XLOG_REPLORIGIN_SET:
864 : {
865 4 : xl_replorigin_set *xlrec =
866 4 : (xl_replorigin_set *) XLogRecGetData(record);
867 :
868 4 : replorigin_advance(xlrec->node_id,
869 : xlrec->remote_lsn, record->EndRecPtr,
870 4 : xlrec->force /* backward */ ,
871 : false /* WAL log */ );
872 4 : break;
873 : }
874 4 : case XLOG_REPLORIGIN_DROP:
875 : {
876 : xl_replorigin_drop *xlrec;
877 : int i;
878 :
879 4 : xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
880 :
881 6 : for (i = 0; i < max_active_replication_origins; i++)
882 : {
883 6 : ReplicationState *state = &replication_states[i];
884 :
885 : /* found our slot */
886 6 : if (state->roident == xlrec->node_id)
887 : {
888 : /* reset entry */
889 4 : state->roident = InvalidReplOriginId;
890 4 : state->remote_lsn = InvalidXLogRecPtr;
891 4 : state->local_lsn = InvalidXLogRecPtr;
892 4 : break;
893 : }
894 : }
895 4 : break;
896 : }
897 0 : default:
898 0 : elog(PANIC, "replorigin_redo: unknown op code %u", info);
899 : }
900 8 : }
901 :
902 :
903 : /*
904 : * Tell the replication origin progress machinery that a commit from 'node'
905 : * that originated at the LSN remote_commit on the remote node was replayed
906 : * successfully and that we don't need to do so again. In combination with
907 : * setting up replorigin_xact_state {.origin_lsn, .origin_timestamp}
908 : * that ensures we won't lose knowledge about that after a crash if the
909 : * transaction had a persistent effect (think of asynchronous commits).
910 : *
911 : * local_commit needs to be a local LSN of the commit so that we can make sure
912 : * upon a checkpoint that enough WAL has been persisted to disk.
913 : *
914 : * Needs to be called with a RowExclusiveLock on pg_replication_origin,
915 : * unless running in recovery.
916 : */
917 : void
918 484 : replorigin_advance(ReplOriginId node,
919 : XLogRecPtr remote_commit, XLogRecPtr local_commit,
920 : bool go_backward, bool wal_log)
921 : {
922 : int i;
923 484 : ReplicationState *replication_state = NULL;
924 484 : ReplicationState *free_state = NULL;
925 :
926 : Assert(node != InvalidReplOriginId);
927 :
928 : /* we don't track DoNotReplicateId */
929 484 : if (node == DoNotReplicateId)
930 0 : return;
931 :
932 : /*
933 : * XXX: For the case where this is called by WAL replay, it'd be more
934 : * efficient to restore into a backend local hashtable and only dump into
935 : * shmem after recovery is finished. Let's wait with implementing that
936 : * till it's shown to be a measurable expense
937 : */
938 :
939 : /* Lock exclusively, as we may have to create a new table entry. */
940 484 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
941 :
942 : /*
943 : * Search for either an existing slot for the origin, or a free one we can
944 : * use.
945 : */
946 4430 : for (i = 0; i < max_active_replication_origins; i++)
947 : {
948 4038 : ReplicationState *curstate = &replication_states[i];
949 :
950 : /* remember where to insert if necessary */
951 4038 : if (curstate->roident == InvalidReplOriginId &&
952 : free_state == NULL)
953 : {
954 392 : free_state = curstate;
955 392 : continue;
956 : }
957 :
958 : /* not our slot */
959 3646 : if (curstate->roident != node)
960 : {
961 3554 : continue;
962 : }
963 :
964 : /* ok, found slot */
965 92 : replication_state = curstate;
966 :
967 92 : LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
968 :
969 : /* Make sure it's not used by somebody else */
970 92 : if (replication_state->refcount > 0)
971 : {
972 0 : ereport(ERROR,
973 : (errcode(ERRCODE_OBJECT_IN_USE),
974 : (replication_state->acquired_by != 0)
975 : ? errmsg("replication origin with ID %d is already active for PID %d",
976 : replication_state->roident,
977 : replication_state->acquired_by)
978 : : errmsg("replication origin with ID %d is already active in another process",
979 : replication_state->roident)));
980 : }
981 :
982 92 : break;
983 : }
984 :
985 484 : if (replication_state == NULL && free_state == NULL)
986 0 : ereport(ERROR,
987 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
988 : errmsg("could not find free replication state slot for replication origin with ID %d",
989 : node),
990 : errhint("Increase \"max_active_replication_origins\" and try again.")));
991 :
992 484 : if (replication_state == NULL)
993 : {
994 : /* initialize new slot */
995 392 : LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
996 392 : replication_state = free_state;
997 : Assert(!XLogRecPtrIsValid(replication_state->remote_lsn));
998 : Assert(!XLogRecPtrIsValid(replication_state->local_lsn));
999 392 : replication_state->roident = node;
1000 : }
1001 :
1002 : Assert(replication_state->roident != InvalidReplOriginId);
1003 :
1004 : /*
1005 : * If somebody "forcefully" sets this slot, WAL log it, so it's durable
1006 : * and the standby gets the message. Primarily this will be called during
1007 : * WAL replay (of commit records) where no WAL logging is necessary.
1008 : */
1009 484 : if (wal_log)
1010 : {
1011 : xl_replorigin_set xlrec;
1012 :
1013 402 : xlrec.remote_lsn = remote_commit;
1014 402 : xlrec.node_id = node;
1015 402 : xlrec.force = go_backward;
1016 :
1017 402 : XLogBeginInsert();
1018 402 : XLogRegisterData(&xlrec, sizeof(xlrec));
1019 :
1020 402 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
1021 : }
1022 :
1023 : /*
1024 : * Due to - harmless - race conditions during a checkpoint we could see
1025 : * values here that are older than the ones we already have in memory. We
1026 : * could also see older values for prepared transactions when the prepare
1027 : * is sent at a later point of time along with commit prepared and there
1028 : * are other transactions commits between prepare and commit prepared. See
1029 : * ReorderBufferFinishPrepared. Don't overwrite those.
1030 : */
1031 484 : if (go_backward || replication_state->remote_lsn < remote_commit)
1032 470 : replication_state->remote_lsn = remote_commit;
1033 484 : if (XLogRecPtrIsValid(local_commit) &&
1034 76 : (go_backward || replication_state->local_lsn < local_commit))
1035 80 : replication_state->local_lsn = local_commit;
1036 484 : LWLockRelease(&replication_state->lock);
1037 :
1038 : /*
1039 : * Release *after* changing the LSNs, slot isn't acquired and thus could
1040 : * otherwise be dropped anytime.
1041 : */
1042 484 : LWLockRelease(ReplicationOriginLock);
1043 : }
1044 :
1045 :
1046 : XLogRecPtr
1047 16 : replorigin_get_progress(ReplOriginId node, bool flush)
1048 : {
1049 : int i;
1050 16 : XLogRecPtr local_lsn = InvalidXLogRecPtr;
1051 16 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1052 :
1053 : /* prevent slots from being concurrently dropped */
1054 16 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1055 :
1056 76 : for (i = 0; i < max_active_replication_origins; i++)
1057 : {
1058 : ReplicationState *state;
1059 :
1060 70 : state = &replication_states[i];
1061 :
1062 70 : if (state->roident == node)
1063 : {
1064 10 : LWLockAcquire(&state->lock, LW_SHARED);
1065 :
1066 10 : remote_lsn = state->remote_lsn;
1067 10 : local_lsn = state->local_lsn;
1068 :
1069 10 : LWLockRelease(&state->lock);
1070 :
1071 10 : break;
1072 : }
1073 : }
1074 :
1075 16 : LWLockRelease(ReplicationOriginLock);
1076 :
1077 16 : if (flush && XLogRecPtrIsValid(local_lsn))
1078 2 : XLogFlush(local_lsn);
1079 :
1080 16 : return remote_lsn;
1081 : }
1082 :
1083 : /* Helper function to reset the session replication origin */
1084 : static void
1085 992 : replorigin_session_reset_internal(void)
1086 : {
1087 : ConditionVariable *cv;
1088 :
1089 : Assert(session_replication_state != NULL);
1090 :
1091 992 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1092 :
1093 : /* The origin must be held by at least one process at this point. */
1094 : Assert(session_replication_state->refcount > 0);
1095 :
1096 : /*
1097 : * Reset the PID only if the current session is the first to set up this
1098 : * origin. This avoids clearing the first process's PID when any other
1099 : * session releases the origin.
1100 : */
1101 992 : if (session_replication_state->acquired_by == MyProcPid)
1102 964 : session_replication_state->acquired_by = 0;
1103 :
1104 992 : session_replication_state->refcount--;
1105 :
1106 992 : cv = &session_replication_state->origin_cv;
1107 992 : session_replication_state = NULL;
1108 :
1109 992 : LWLockRelease(ReplicationOriginLock);
1110 :
1111 992 : ConditionVariableBroadcast(cv);
1112 992 : }
1113 :
1114 : /*
1115 : * Tear down a (possibly) configured session replication origin during process
1116 : * exit.
1117 : */
1118 : static void
1119 984 : ReplicationOriginExitCleanup(int code, Datum arg)
1120 : {
1121 984 : if (session_replication_state == NULL)
1122 380 : return;
1123 :
1124 604 : replorigin_session_reset_internal();
1125 : }
1126 :
1127 : /*
1128 : * Setup a replication origin in the shared memory struct if it doesn't
1129 : * already exist and cache access to the specific ReplicationSlot so the
1130 : * array doesn't have to be searched when calling
1131 : * replorigin_session_advance().
1132 : *
1133 : * Normally only one such cached origin can exist per process so the cached
1134 : * value can only be set again after the previous value is torn down with
1135 : * replorigin_session_reset(). For this normal case pass acquired_by = 0
1136 : * (meaning the slot is not allowed to be already acquired by another process).
1137 : *
1138 : * However, sometimes multiple processes can safely re-use the same origin slot
1139 : * (for example, multiple parallel apply processes can safely use the same
1140 : * origin, provided they maintain commit order by allowing only one process to
1141 : * commit at a time). For this case the first process must pass acquired_by =
1142 : * 0, and then the other processes sharing that same origin can pass
1143 : * acquired_by = PID of the first process.
1144 : */
1145 : void
1146 996 : replorigin_session_setup(ReplOriginId node, int acquired_by)
1147 : {
1148 : static bool registered_cleanup;
1149 : int i;
1150 996 : int free_slot = -1;
1151 :
1152 996 : if (!registered_cleanup)
1153 : {
1154 984 : on_shmem_exit(ReplicationOriginExitCleanup, 0);
1155 984 : registered_cleanup = true;
1156 : }
1157 :
1158 : Assert(max_active_replication_origins > 0);
1159 :
1160 996 : if (session_replication_state != NULL)
1161 2 : ereport(ERROR,
1162 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1163 : errmsg("cannot setup replication origin when one is already setup")));
1164 :
1165 : /* Lock exclusively, as we may have to create a new table entry. */
1166 994 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1167 :
1168 : /*
1169 : * Search for either an existing slot for the origin, or a free one we can
1170 : * use.
1171 : */
1172 3972 : for (i = 0; i < max_active_replication_origins; i++)
1173 : {
1174 3732 : ReplicationState *curstate = &replication_states[i];
1175 :
1176 : /* remember where to insert if necessary */
1177 3732 : if (curstate->roident == InvalidReplOriginId &&
1178 : free_slot == -1)
1179 : {
1180 244 : free_slot = i;
1181 244 : continue;
1182 : }
1183 :
1184 : /* not our slot */
1185 3488 : if (curstate->roident != node)
1186 2734 : continue;
1187 :
1188 754 : else if (curstate->acquired_by != 0 && acquired_by == 0)
1189 : {
1190 0 : ereport(ERROR,
1191 : (errcode(ERRCODE_OBJECT_IN_USE),
1192 : errmsg("replication origin with ID %d is already active for PID %d",
1193 : curstate->roident, curstate->acquired_by)));
1194 : }
1195 :
1196 754 : else if (curstate->acquired_by != acquired_by)
1197 : {
1198 0 : ereport(ERROR,
1199 : (errcode(ERRCODE_OBJECT_IN_USE),
1200 : errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
1201 : node, acquired_by)));
1202 : }
1203 :
1204 : /*
1205 : * The origin is in use, but PID is not recorded. This can happen if
1206 : * the process that originally acquired the origin exited without
1207 : * releasing it. To ensure correctness, other processes cannot acquire
1208 : * the origin until all processes currently using it have released it.
1209 : */
1210 754 : else if (curstate->acquired_by == 0 && curstate->refcount > 0)
1211 0 : ereport(ERROR,
1212 : (errcode(ERRCODE_OBJECT_IN_USE),
1213 : errmsg("replication origin with ID %d is already active in another process",
1214 : curstate->roident)));
1215 :
1216 : /* ok, found slot */
1217 754 : session_replication_state = curstate;
1218 754 : break;
1219 : }
1220 :
1221 :
1222 994 : if (session_replication_state == NULL && free_slot == -1)
1223 0 : ereport(ERROR,
1224 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1225 : errmsg("could not find free replication state slot for replication origin with ID %d",
1226 : node),
1227 : errhint("Increase \"max_active_replication_origins\" and try again.")));
1228 994 : else if (session_replication_state == NULL)
1229 : {
1230 240 : if (acquired_by)
1231 2 : ereport(ERROR,
1232 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1233 : errmsg("cannot use PID %d for inactive replication origin with ID %d",
1234 : acquired_by, node)));
1235 :
1236 : /* initialize new slot */
1237 238 : session_replication_state = &replication_states[free_slot];
1238 : Assert(!XLogRecPtrIsValid(session_replication_state->remote_lsn));
1239 : Assert(!XLogRecPtrIsValid(session_replication_state->local_lsn));
1240 238 : session_replication_state->roident = node;
1241 : }
1242 :
1243 :
1244 : Assert(session_replication_state->roident != InvalidReplOriginId);
1245 :
1246 992 : if (acquired_by == 0)
1247 : {
1248 964 : session_replication_state->acquired_by = MyProcPid;
1249 : Assert(session_replication_state->refcount == 0);
1250 : }
1251 : else
1252 : {
1253 : /*
1254 : * Sanity check: the origin must already be acquired by the process
1255 : * passed as input, and at least one process must be using it.
1256 : */
1257 : Assert(session_replication_state->acquired_by == acquired_by);
1258 : Assert(session_replication_state->refcount > 0);
1259 : }
1260 :
1261 992 : session_replication_state->refcount++;
1262 :
1263 992 : LWLockRelease(ReplicationOriginLock);
1264 :
1265 : /* probably this one is pointless */
1266 992 : ConditionVariableBroadcast(&session_replication_state->origin_cv);
1267 992 : }
1268 :
1269 : /*
1270 : * Reset replay state previously setup in this session.
1271 : *
1272 : * This function may only be called if an origin was setup with
1273 : * replorigin_session_setup().
1274 : */
1275 : void
1276 392 : replorigin_session_reset(void)
1277 : {
1278 : Assert(max_active_replication_origins != 0);
1279 :
1280 392 : if (session_replication_state == NULL)
1281 2 : ereport(ERROR,
1282 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1283 : errmsg("no replication origin is configured")));
1284 :
1285 : /*
1286 : * Restrict explicit resetting of the replication origin if it was first
1287 : * acquired by this process and others are still using it. While the
1288 : * system handles this safely (as happens if the first session exits
1289 : * without calling reset), it is best to avoid doing so.
1290 : */
1291 390 : if (session_replication_state->acquired_by == MyProcPid &&
1292 386 : session_replication_state->refcount > 1)
1293 2 : ereport(ERROR,
1294 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1295 : errmsg("cannot reset replication origin with ID %d because it is still in use by other processes",
1296 : session_replication_state->roident),
1297 : errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."),
1298 : errhint("Reset the replication origin in all other processes before retrying.")));
1299 :
1300 388 : replorigin_session_reset_internal();
1301 388 : }
1302 :
1303 : /*
1304 : * Do the same work replorigin_advance() does, just on the session's
1305 : * configured origin.
1306 : *
1307 : * This is noticeably cheaper than using replorigin_advance().
1308 : */
1309 : void
1310 2198 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
1311 : {
1312 : Assert(session_replication_state != NULL);
1313 : Assert(session_replication_state->roident != InvalidReplOriginId);
1314 :
1315 2198 : LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1316 2198 : if (session_replication_state->local_lsn < local_commit)
1317 2198 : session_replication_state->local_lsn = local_commit;
1318 2198 : if (session_replication_state->remote_lsn < remote_commit)
1319 1034 : session_replication_state->remote_lsn = remote_commit;
1320 2198 : LWLockRelease(&session_replication_state->lock);
1321 2198 : }
1322 :
1323 : /*
1324 : * Ask the machinery about the point up to which we successfully replayed
1325 : * changes from an already setup replication origin.
1326 : */
1327 : XLogRecPtr
1328 558 : replorigin_session_get_progress(bool flush)
1329 : {
1330 : XLogRecPtr remote_lsn;
1331 : XLogRecPtr local_lsn;
1332 :
1333 : Assert(session_replication_state != NULL);
1334 :
1335 558 : LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1336 558 : remote_lsn = session_replication_state->remote_lsn;
1337 558 : local_lsn = session_replication_state->local_lsn;
1338 558 : LWLockRelease(&session_replication_state->lock);
1339 :
1340 558 : if (flush && XLogRecPtrIsValid(local_lsn))
1341 2 : XLogFlush(local_lsn);
1342 :
1343 558 : return remote_lsn;
1344 : }
1345 :
1346 : /*
1347 : * Clear the per-transaction replication origin state.
1348 : *
1349 : * replorigin_session_origin is also cleared if clear_origin is set.
1350 : */
1351 : void
1352 1550 : replorigin_xact_clear(bool clear_origin)
1353 : {
1354 1550 : replorigin_xact_state.origin_lsn = InvalidXLogRecPtr;
1355 1550 : replorigin_xact_state.origin_timestamp = 0;
1356 1550 : if (clear_origin)
1357 1550 : replorigin_xact_state.origin = InvalidReplOriginId;
1358 1550 : }
1359 :
1360 :
1361 : /* ---------------------------------------------------------------------------
1362 : * SQL functions for working with replication origin.
1363 : *
1364 : * These mostly should be fairly short wrappers around more generic functions.
1365 : * ---------------------------------------------------------------------------
1366 : */
1367 :
1368 : /*
1369 : * Create replication origin for the passed in name, and return the assigned
1370 : * oid.
1371 : */
1372 : Datum
1373 28 : pg_replication_origin_create(PG_FUNCTION_ARGS)
1374 : {
1375 : char *name;
1376 : ReplOriginId roident;
1377 :
1378 28 : replorigin_check_prerequisites(false, false);
1379 :
1380 28 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1381 :
1382 : /*
1383 : * Replication origins "any and "none" are reserved for system options.
1384 : * The origins "pg_xxx" are reserved for internal use.
1385 : */
1386 28 : if (IsReservedName(name) || IsReservedOriginName(name))
1387 6 : ereport(ERROR,
1388 : (errcode(ERRCODE_RESERVED_NAME),
1389 : errmsg("replication origin name \"%s\" is reserved",
1390 : name),
1391 : errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1392 : LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
1393 :
1394 : /*
1395 : * If built with appropriate switch, whine when regression-testing
1396 : * conventions for replication origin names are violated.
1397 : */
1398 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1399 : if (strncmp(name, "regress_", 8) != 0)
1400 : elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1401 : #endif
1402 :
1403 22 : roident = replorigin_create(name);
1404 :
1405 14 : pfree(name);
1406 :
1407 14 : PG_RETURN_OID(roident);
1408 : }
1409 :
1410 : /*
1411 : * Drop replication origin.
1412 : */
1413 : Datum
1414 18 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
1415 : {
1416 : char *name;
1417 :
1418 18 : replorigin_check_prerequisites(false, false);
1419 :
1420 18 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1421 :
1422 18 : replorigin_drop_by_name(name, false, true);
1423 :
1424 16 : pfree(name);
1425 :
1426 16 : PG_RETURN_VOID();
1427 : }
1428 :
1429 : /*
1430 : * Return oid of a replication origin.
1431 : */
1432 : Datum
1433 0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
1434 : {
1435 : char *name;
1436 : ReplOriginId roident;
1437 :
1438 0 : replorigin_check_prerequisites(false, false);
1439 :
1440 0 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1441 0 : roident = replorigin_by_name(name, true);
1442 :
1443 0 : pfree(name);
1444 :
1445 0 : if (OidIsValid(roident))
1446 0 : PG_RETURN_OID(roident);
1447 0 : PG_RETURN_NULL();
1448 : }
1449 :
1450 : /*
1451 : * Setup a replication origin for this session.
1452 : */
1453 : Datum
1454 22 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
1455 : {
1456 : char *name;
1457 : ReplOriginId origin;
1458 : int pid;
1459 :
1460 22 : replorigin_check_prerequisites(true, false);
1461 :
1462 22 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1463 22 : origin = replorigin_by_name(name, false);
1464 20 : pid = PG_GETARG_INT32(1);
1465 20 : replorigin_session_setup(origin, pid);
1466 :
1467 16 : replorigin_xact_state.origin = origin;
1468 :
1469 16 : pfree(name);
1470 :
1471 16 : PG_RETURN_VOID();
1472 : }
1473 :
1474 : /*
1475 : * Reset previously setup origin in this session
1476 : */
1477 : Datum
1478 20 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
1479 : {
1480 20 : replorigin_check_prerequisites(true, false);
1481 :
1482 20 : replorigin_session_reset();
1483 :
1484 16 : replorigin_xact_clear(true);
1485 :
1486 16 : PG_RETURN_VOID();
1487 : }
1488 :
1489 : /*
1490 : * Has a replication origin been setup for this session.
1491 : */
1492 : Datum
1493 8 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
1494 : {
1495 8 : replorigin_check_prerequisites(false, false);
1496 :
1497 8 : PG_RETURN_BOOL(replorigin_xact_state.origin != InvalidReplOriginId);
1498 : }
1499 :
1500 :
1501 : /*
1502 : * Return the replication progress for origin setup in the current session.
1503 : *
1504 : * If 'flush' is set to true it is ensured that the returned value corresponds
1505 : * to a local transaction that has been flushed. This is useful if asynchronous
1506 : * commits are used when replaying replicated transactions.
1507 : */
1508 : Datum
1509 4 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
1510 : {
1511 4 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1512 4 : bool flush = PG_GETARG_BOOL(0);
1513 :
1514 4 : replorigin_check_prerequisites(true, false);
1515 :
1516 4 : if (session_replication_state == NULL)
1517 0 : ereport(ERROR,
1518 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1519 : errmsg("no replication origin is configured")));
1520 :
1521 4 : remote_lsn = replorigin_session_get_progress(flush);
1522 :
1523 4 : if (!XLogRecPtrIsValid(remote_lsn))
1524 0 : PG_RETURN_NULL();
1525 :
1526 4 : PG_RETURN_LSN(remote_lsn);
1527 : }
1528 :
1529 : Datum
1530 2 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
1531 : {
1532 2 : XLogRecPtr location = PG_GETARG_LSN(0);
1533 :
1534 2 : replorigin_check_prerequisites(true, false);
1535 :
1536 2 : if (session_replication_state == NULL)
1537 0 : ereport(ERROR,
1538 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1539 : errmsg("no replication origin is configured")));
1540 :
1541 2 : replorigin_xact_state.origin_lsn = location;
1542 2 : replorigin_xact_state.origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
1543 :
1544 2 : PG_RETURN_VOID();
1545 : }
1546 :
1547 : Datum
1548 0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
1549 : {
1550 0 : replorigin_check_prerequisites(true, false);
1551 :
1552 : /* Do not clear the session origin */
1553 0 : replorigin_xact_clear(false);
1554 :
1555 0 : PG_RETURN_VOID();
1556 : }
1557 :
1558 :
1559 : Datum
1560 6 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
1561 : {
1562 6 : text *name = PG_GETARG_TEXT_PP(0);
1563 6 : XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1564 : ReplOriginId node;
1565 :
1566 6 : replorigin_check_prerequisites(true, false);
1567 :
1568 : /* lock to prevent the replication origin from vanishing */
1569 6 : LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1570 :
1571 6 : node = replorigin_by_name(text_to_cstring(name), false);
1572 :
1573 : /*
1574 : * Can't sensibly pass a local commit to be flushed at checkpoint - this
1575 : * xact hasn't committed yet. This is why this function should be used to
1576 : * set up the initial replication state, but not for replay.
1577 : */
1578 4 : replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1579 : true /* go backward */ , true /* WAL log */ );
1580 :
1581 4 : UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1582 :
1583 4 : PG_RETURN_VOID();
1584 : }
1585 :
1586 :
1587 : /*
1588 : * Return the replication progress for an individual replication origin.
1589 : *
1590 : * If 'flush' is set to true it is ensured that the returned value corresponds
1591 : * to a local transaction that has been flushed. This is useful if asynchronous
1592 : * commits are used when replaying replicated transactions.
1593 : */
1594 : Datum
1595 6 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
1596 : {
1597 : char *name;
1598 : bool flush;
1599 : ReplOriginId roident;
1600 6 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1601 :
1602 6 : replorigin_check_prerequisites(true, true);
1603 :
1604 6 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1605 6 : flush = PG_GETARG_BOOL(1);
1606 :
1607 6 : roident = replorigin_by_name(name, false);
1608 : Assert(OidIsValid(roident));
1609 :
1610 4 : remote_lsn = replorigin_get_progress(roident, flush);
1611 :
1612 4 : if (!XLogRecPtrIsValid(remote_lsn))
1613 0 : PG_RETURN_NULL();
1614 :
1615 4 : PG_RETURN_LSN(remote_lsn);
1616 : }
1617 :
1618 :
1619 : Datum
1620 22 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
1621 : {
1622 22 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1623 : int i;
1624 : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
1625 :
1626 : /* we want to return 0 rows if slot is set to zero */
1627 22 : replorigin_check_prerequisites(false, true);
1628 :
1629 22 : InitMaterializedSRF(fcinfo, 0);
1630 :
1631 : /* prevent slots from being concurrently dropped */
1632 22 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1633 :
1634 : /*
1635 : * Iterate through all possible replication_states, display if they are
1636 : * filled. Note that we do not take any locks, so slightly corrupted/out
1637 : * of date values are a possibility.
1638 : */
1639 242 : for (i = 0; i < max_active_replication_origins; i++)
1640 : {
1641 : ReplicationState *state;
1642 : Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];
1643 : bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1644 : char *roname;
1645 :
1646 220 : state = &replication_states[i];
1647 :
1648 : /* unused slot, nothing to display */
1649 220 : if (state->roident == InvalidReplOriginId)
1650 194 : continue;
1651 :
1652 26 : memset(values, 0, sizeof(values));
1653 26 : memset(nulls, 1, sizeof(nulls));
1654 :
1655 26 : values[0] = ObjectIdGetDatum(state->roident);
1656 26 : nulls[0] = false;
1657 :
1658 : /*
1659 : * We're not preventing the origin to be dropped concurrently, so
1660 : * silently accept that it might be gone.
1661 : */
1662 26 : if (replorigin_by_oid(state->roident, true,
1663 : &roname))
1664 : {
1665 26 : values[1] = CStringGetTextDatum(roname);
1666 26 : nulls[1] = false;
1667 : }
1668 :
1669 26 : LWLockAcquire(&state->lock, LW_SHARED);
1670 :
1671 26 : values[2] = LSNGetDatum(state->remote_lsn);
1672 26 : nulls[2] = false;
1673 :
1674 26 : values[3] = LSNGetDatum(state->local_lsn);
1675 26 : nulls[3] = false;
1676 :
1677 26 : LWLockRelease(&state->lock);
1678 :
1679 26 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1680 : values, nulls);
1681 : }
1682 :
1683 22 : LWLockRelease(ReplicationOriginLock);
1684 :
1685 : #undef REPLICATION_ORIGIN_PROGRESS_COLS
1686 :
1687 22 : return (Datum) 0;
1688 : }
|