Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * origin.c
4 : * Logical replication progress tracking support.
5 : *
6 : * Copyright (c) 2013-2025, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/logical/origin.c
10 : *
11 : * NOTES
12 : *
13 : * This file provides the following:
14 : * * An infrastructure to name nodes in a replication setup
15 : * * A facility to efficiently store and persist replication progress in an
16 : * efficient and durable manner.
17 : *
18 : * Replication origins consist of a descriptive, user-defined, external
19 : * name and a short, thus space-efficient, internal 2-byte one. This split
20 : * exists because replication origins have to be stored in WAL and shared
21 : * memory and long descriptors would be inefficient. For now only use 2 bytes
22 : * for the internal id of a replication origin as it seems unlikely that there
23 : * soon will be more than 65k nodes in one replication setup; and using only
24 : * two bytes allow us to be more space efficient.
25 : *
26 : * Replication progress is tracked in a shared memory table
27 : * (ReplicationState) that's dumped to disk every checkpoint. Entries
28 : * ('slots') in this table are identified by the internal id. That's the case
29 : * because it allows to increase replication progress during crash
30 : * recovery. To allow doing so we store the original LSN (from the originating
31 : * system) of a transaction in the commit record. That allows to recover the
32 : * precise replayed state after crash recovery; without requiring synchronous
33 : * commits. Allowing logical replication to use asynchronous commit is
34 : * generally good for performance, but especially important as it allows a
35 : * single threaded replay process to keep up with a source that has multiple
36 : * backends generating changes concurrently. For efficiency and simplicity
37 : * reasons a backend can setup one replication origin that's from then used as
38 : * the source of changes produced by the backend, until reset again.
39 : *
40 : * This infrastructure is intended to be used in cooperation with logical
41 : * decoding. When replaying from a remote system the configured origin is
42 : * provided to output plugins, allowing prevention of replication loops and
43 : * other filtering.
44 : *
45 : * There are several levels of locking at work:
46 : *
47 : * * To create and drop replication origins an exclusive lock on
48 : * pg_replication_origin is required for the duration. That allows us to
49 : * safely and conflict free assign new origins using a dirty snapshot.
50 : *
51 : * * When creating an in-memory replication progress slot the ReplicationOrigin
52 : * LWLock has to be held exclusively; when iterating over the replication
53 : * progress a shared lock has to be held, the same when advancing the
54 : * replication progress of an individual backend that has not setup as the
55 : * session's replication origin.
56 : *
57 : * * When manipulating or looking at the remote_lsn and local_lsn fields of a
58 : * replication progress slot that slot's lwlock has to be held. That's
59 : * primarily because we do not assume 8-byte writes (the LSN) are atomic on
60 : * all our platforms, but it also simplifies memory ordering concerns
61 : * between the remote and local lsn. We use a lwlock instead of a spinlock
62 : * so it's less harmful to hold the lock over a WAL write
63 : * (cf. AdvanceReplicationProgress).
64 : *
65 : * ---------------------------------------------------------------------------
66 : */
67 :
68 : #include "postgres.h"
69 :
70 : #include <unistd.h>
71 : #include <sys/stat.h>
72 :
73 : #include "access/genam.h"
74 : #include "access/htup_details.h"
75 : #include "access/table.h"
76 : #include "access/xact.h"
77 : #include "access/xloginsert.h"
78 : #include "catalog/catalog.h"
79 : #include "catalog/indexing.h"
80 : #include "catalog/pg_subscription.h"
81 : #include "funcapi.h"
82 : #include "miscadmin.h"
83 : #include "nodes/execnodes.h"
84 : #include "pgstat.h"
85 : #include "replication/origin.h"
86 : #include "replication/slot.h"
87 : #include "storage/condition_variable.h"
88 : #include "storage/fd.h"
89 : #include "storage/ipc.h"
90 : #include "storage/lmgr.h"
91 : #include "utils/builtins.h"
92 : #include "utils/fmgroids.h"
93 : #include "utils/guc.h"
94 : #include "utils/pg_lsn.h"
95 : #include "utils/rel.h"
96 : #include "utils/snapmgr.h"
97 : #include "utils/syscache.h"
98 :
99 : /* paths for replication origin checkpoint files */
100 : #define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
101 : #define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
102 :
103 : /* GUC variables */
104 : int max_active_replication_origins = 10;
105 :
106 : /*
107 : * Replay progress of a single remote node.
108 : */
109 : typedef struct ReplicationState
110 : {
111 : /*
112 : * Local identifier for the remote node.
113 : */
114 : RepOriginId roident;
115 :
116 : /*
117 : * Location of the latest commit from the remote side.
118 : */
119 : XLogRecPtr remote_lsn;
120 :
121 : /*
122 : * Remember the local lsn of the commit record so we can XLogFlush() to it
123 : * during a checkpoint so we know the commit record actually is safe on
124 : * disk.
125 : */
126 : XLogRecPtr local_lsn;
127 :
128 : /*
129 : * PID of backend that's acquired slot, or 0 if none.
130 : */
131 : int acquired_by;
132 :
133 : /*
134 : * Condition variable that's signaled when acquired_by changes.
135 : */
136 : ConditionVariable origin_cv;
137 :
138 : /*
139 : * Lock protecting remote_lsn and local_lsn.
140 : */
141 : LWLock lock;
142 : } ReplicationState;
143 :
144 : /*
145 : * On disk version of ReplicationState.
146 : */
147 : typedef struct ReplicationStateOnDisk
148 : {
149 : RepOriginId roident;
150 : XLogRecPtr remote_lsn;
151 : } ReplicationStateOnDisk;
152 :
153 :
154 : typedef struct ReplicationStateCtl
155 : {
156 : /* Tranche to use for per-origin LWLocks */
157 : int tranche_id;
158 : /* Array of length max_active_replication_origins */
159 : ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
160 : } ReplicationStateCtl;
161 :
162 : /* external variables */
163 : RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
164 : XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr;
165 : TimestampTz replorigin_session_origin_timestamp = 0;
166 :
167 : /*
168 : * Base address into a shared memory array of replication states of size
169 : * max_active_replication_origins.
170 : */
171 : static ReplicationState *replication_states;
172 :
173 : /*
174 : * Actual shared memory block (replication_states[] is now part of this).
175 : */
176 : static ReplicationStateCtl *replication_states_ctl;
177 :
178 : /*
179 : * We keep a pointer to this backend's ReplicationState to avoid having to
180 : * search the replication_states array in replorigin_session_advance for each
181 : * remote commit. (Ownership of a backend's own entry can only be changed by
182 : * that backend.)
183 : */
184 : static ReplicationState *session_replication_state = NULL;
185 :
186 : /* Magic for on disk files. */
187 : #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
188 :
189 : static void
190 94 : replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
191 : {
192 94 : if (check_origins && max_active_replication_origins == 0)
193 0 : ereport(ERROR,
194 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
195 : errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
196 :
197 94 : if (!recoveryOK && RecoveryInProgress())
198 0 : ereport(ERROR,
199 : (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
200 : errmsg("cannot manipulate replication origins during recovery")));
201 94 : }
202 :
203 :
204 : /*
205 : * IsReservedOriginName
206 : * True iff name is either "none" or "any".
207 : */
208 : static bool
209 22 : IsReservedOriginName(const char *name)
210 : {
211 42 : return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
212 20 : (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
213 : }
214 :
215 : /* ---------------------------------------------------------------------------
216 : * Functions for working with replication origins themselves.
217 : * ---------------------------------------------------------------------------
218 : */
219 :
220 : /*
221 : * Check for a persistent replication origin identified by name.
222 : *
223 : * Returns InvalidOid if the node isn't known yet and missing_ok is true.
224 : */
225 : RepOriginId
226 1842 : replorigin_by_name(const char *roname, bool missing_ok)
227 : {
228 : Form_pg_replication_origin ident;
229 1842 : Oid roident = InvalidOid;
230 : HeapTuple tuple;
231 : Datum roname_d;
232 :
233 1842 : roname_d = CStringGetTextDatum(roname);
234 :
235 1842 : tuple = SearchSysCache1(REPLORIGNAME, roname_d);
236 1842 : if (HeapTupleIsValid(tuple))
237 : {
238 1076 : ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
239 1076 : roident = ident->roident;
240 1076 : ReleaseSysCache(tuple);
241 : }
242 766 : else if (!missing_ok)
243 8 : ereport(ERROR,
244 : (errcode(ERRCODE_UNDEFINED_OBJECT),
245 : errmsg("replication origin \"%s\" does not exist",
246 : roname)));
247 :
248 1834 : return roident;
249 : }
250 :
251 : /*
252 : * Create a replication origin.
253 : *
254 : * Needs to be called in a transaction.
255 : */
256 : RepOriginId
257 720 : replorigin_create(const char *roname)
258 : {
259 : Oid roident;
260 720 : HeapTuple tuple = NULL;
261 : Relation rel;
262 : Datum roname_d;
263 : SnapshotData SnapshotDirty;
264 : SysScanDesc scan;
265 : ScanKeyData key;
266 :
267 : /*
268 : * To avoid needing a TOAST table for pg_replication_origin, we limit
269 : * replication origin names to 512 bytes. This should be more than enough
270 : * for all practical use.
271 : */
272 720 : if (strlen(roname) > MAX_RONAME_LEN)
273 6 : ereport(ERROR,
274 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
275 : errmsg("replication origin name is too long"),
276 : errdetail("Replication origin names must be no longer than %d bytes.",
277 : MAX_RONAME_LEN)));
278 :
279 714 : roname_d = CStringGetTextDatum(roname);
280 :
281 : Assert(IsTransactionState());
282 :
283 : /*
284 : * We need the numeric replication origin to be 16bit wide, so we cannot
285 : * rely on the normal oid allocation. Instead we simply scan
286 : * pg_replication_origin for the first unused id. That's not particularly
287 : * efficient, but this should be a fairly infrequent operation - we can
288 : * easily spend a bit more code on this when it turns out it needs to be
289 : * faster.
290 : *
291 : * We handle concurrency by taking an exclusive lock (allowing reads!)
292 : * over the table for the duration of the search. Because we use a "dirty
293 : * snapshot" we can read rows that other in-progress sessions have
294 : * written, even though they would be invisible with normal snapshots. Due
295 : * to the exclusive lock there's no danger that new rows can appear while
296 : * we're checking.
297 : */
298 714 : InitDirtySnapshot(SnapshotDirty);
299 :
300 714 : rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
301 :
302 : /*
303 : * We want to be able to access pg_replication_origin without setting up a
304 : * snapshot. To make that safe, it needs to not have a TOAST table, since
305 : * TOASTed data cannot be fetched without a snapshot. As of this writing,
306 : * its only varlena column is roname, which we limit to 512 bytes to avoid
307 : * needing out-of-line storage. If you add a TOAST table to this catalog,
308 : * be sure to set up a snapshot everywhere it might be needed. For more
309 : * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
310 : */
311 : Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
312 :
313 1256 : for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
314 : {
315 : bool nulls[Natts_pg_replication_origin];
316 : Datum values[Natts_pg_replication_origin];
317 : bool collides;
318 :
319 1256 : CHECK_FOR_INTERRUPTS();
320 :
321 1256 : ScanKeyInit(&key,
322 : Anum_pg_replication_origin_roident,
323 : BTEqualStrategyNumber, F_OIDEQ,
324 : ObjectIdGetDatum(roident));
325 :
326 1256 : scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
327 : true /* indexOK */ ,
328 : &SnapshotDirty,
329 : 1, &key);
330 :
331 1256 : collides = HeapTupleIsValid(systable_getnext(scan));
332 :
333 1256 : systable_endscan(scan);
334 :
335 1256 : if (!collides)
336 : {
337 : /*
338 : * Ok, found an unused roident, insert the new row and do a CCI,
339 : * so our callers can look it up if they want to.
340 : */
341 714 : memset(&nulls, 0, sizeof(nulls));
342 :
343 714 : values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
344 714 : values[Anum_pg_replication_origin_roname - 1] = roname_d;
345 :
346 714 : tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
347 714 : CatalogTupleInsert(rel, tuple);
348 712 : CommandCounterIncrement();
349 712 : break;
350 : }
351 : }
352 :
353 : /* now release lock again, */
354 712 : table_close(rel, ExclusiveLock);
355 :
356 712 : if (tuple == NULL)
357 0 : ereport(ERROR,
358 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
359 : errmsg("could not find free replication origin ID")));
360 :
361 712 : heap_freetuple(tuple);
362 712 : return roident;
363 : }
364 :
365 : /*
366 : * Helper function to drop a replication origin.
367 : */
368 : static void
369 582 : replorigin_state_clear(RepOriginId roident, bool nowait)
370 : {
371 : int i;
372 :
373 : /*
374 : * Clean up the slot state info, if there is any matching slot.
375 : */
376 582 : restart:
377 582 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
378 :
379 1768 : for (i = 0; i < max_active_replication_origins; i++)
380 : {
381 1694 : ReplicationState *state = &replication_states[i];
382 :
383 1694 : if (state->roident == roident)
384 : {
385 : /* found our slot, is it busy? */
386 508 : if (state->acquired_by != 0)
387 : {
388 : ConditionVariable *cv;
389 :
390 0 : if (nowait)
391 0 : ereport(ERROR,
392 : (errcode(ERRCODE_OBJECT_IN_USE),
393 : errmsg("could not drop replication origin with ID %d, in use by PID %d",
394 : state->roident,
395 : state->acquired_by)));
396 :
397 : /*
398 : * We must wait and then retry. Since we don't know which CV
399 : * to wait on until here, we can't readily use
400 : * ConditionVariablePrepareToSleep (calling it here would be
401 : * wrong, since we could miss the signal if we did so); just
402 : * use ConditionVariableSleep directly.
403 : */
404 0 : cv = &state->origin_cv;
405 :
406 0 : LWLockRelease(ReplicationOriginLock);
407 :
408 0 : ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
409 0 : goto restart;
410 : }
411 :
412 : /* first make a WAL log entry */
413 : {
414 : xl_replorigin_drop xlrec;
415 :
416 508 : xlrec.node_id = roident;
417 508 : XLogBeginInsert();
418 508 : XLogRegisterData(&xlrec, sizeof(xlrec));
419 508 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
420 : }
421 :
422 : /* then clear the in-memory slot */
423 508 : state->roident = InvalidRepOriginId;
424 508 : state->remote_lsn = InvalidXLogRecPtr;
425 508 : state->local_lsn = InvalidXLogRecPtr;
426 508 : break;
427 : }
428 : }
429 582 : LWLockRelease(ReplicationOriginLock);
430 582 : ConditionVariableCancelSleep();
431 582 : }
432 :
433 : /*
434 : * Drop replication origin (by name).
435 : *
436 : * Needs to be called in a transaction.
437 : */
438 : void
439 956 : replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
440 : {
441 : RepOriginId roident;
442 : Relation rel;
443 : HeapTuple tuple;
444 :
445 : Assert(IsTransactionState());
446 :
447 956 : rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
448 :
449 956 : roident = replorigin_by_name(name, missing_ok);
450 :
451 : /* Lock the origin to prevent concurrent drops. */
452 954 : LockSharedObject(ReplicationOriginRelationId, roident, 0,
453 : AccessExclusiveLock);
454 :
455 954 : tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
456 954 : if (!HeapTupleIsValid(tuple))
457 : {
458 372 : if (!missing_ok)
459 0 : elog(ERROR, "cache lookup failed for replication origin with ID %d",
460 : roident);
461 :
462 : /*
463 : * We don't need to retain the locks if the origin is already dropped.
464 : */
465 372 : UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
466 : AccessExclusiveLock);
467 372 : table_close(rel, RowExclusiveLock);
468 372 : return;
469 : }
470 :
471 582 : replorigin_state_clear(roident, nowait);
472 :
473 : /*
474 : * Now, we can delete the catalog entry.
475 : */
476 582 : CatalogTupleDelete(rel, &tuple->t_self);
477 582 : ReleaseSysCache(tuple);
478 :
479 582 : CommandCounterIncrement();
480 :
481 : /* We keep the lock on pg_replication_origin until commit */
482 582 : table_close(rel, NoLock);
483 : }
484 :
485 : /*
486 : * Lookup replication origin via its oid and return the name.
487 : *
488 : * The external name is palloc'd in the calling context.
489 : *
490 : * Returns true if the origin is known, false otherwise.
491 : */
492 : bool
493 54 : replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
494 : {
495 : HeapTuple tuple;
496 : Form_pg_replication_origin ric;
497 :
498 : Assert(OidIsValid((Oid) roident));
499 : Assert(roident != InvalidRepOriginId);
500 : Assert(roident != DoNotReplicateId);
501 :
502 54 : tuple = SearchSysCache1(REPLORIGIDENT,
503 : ObjectIdGetDatum((Oid) roident));
504 :
505 54 : if (HeapTupleIsValid(tuple))
506 : {
507 48 : ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
508 48 : *roname = text_to_cstring(&ric->roname);
509 48 : ReleaseSysCache(tuple);
510 :
511 48 : return true;
512 : }
513 : else
514 : {
515 6 : *roname = NULL;
516 :
517 6 : if (!missing_ok)
518 0 : ereport(ERROR,
519 : (errcode(ERRCODE_UNDEFINED_OBJECT),
520 : errmsg("replication origin with ID %d does not exist",
521 : roident)));
522 :
523 6 : return false;
524 : }
525 : }
526 :
527 :
528 : /* ---------------------------------------------------------------------------
529 : * Functions for handling replication progress.
530 : * ---------------------------------------------------------------------------
531 : */
532 :
533 : Size
534 8102 : ReplicationOriginShmemSize(void)
535 : {
536 8102 : Size size = 0;
537 :
538 8102 : if (max_active_replication_origins == 0)
539 4 : return size;
540 :
541 8098 : size = add_size(size, offsetof(ReplicationStateCtl, states));
542 :
543 8098 : size = add_size(size,
544 : mul_size(max_active_replication_origins, sizeof(ReplicationState)));
545 8098 : return size;
546 : }
547 :
548 : void
549 2100 : ReplicationOriginShmemInit(void)
550 : {
551 : bool found;
552 :
553 2100 : if (max_active_replication_origins == 0)
554 2 : return;
555 :
556 2098 : replication_states_ctl = (ReplicationStateCtl *)
557 2098 : ShmemInitStruct("ReplicationOriginState",
558 : ReplicationOriginShmemSize(),
559 : &found);
560 2098 : replication_states = replication_states_ctl->states;
561 :
562 2098 : if (!found)
563 : {
564 : int i;
565 :
566 150930 : MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
567 :
568 2098 : replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
569 :
570 23060 : for (i = 0; i < max_active_replication_origins; i++)
571 : {
572 20962 : LWLockInitialize(&replication_states[i].lock,
573 20962 : replication_states_ctl->tranche_id);
574 20962 : ConditionVariableInit(&replication_states[i].origin_cv);
575 : }
576 : }
577 : }
578 :
579 : /* ---------------------------------------------------------------------------
580 : * Perform a checkpoint of each replication origin's progress with respect to
581 : * the replayed remote_lsn. Make sure that all transactions we refer to in the
582 : * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
583 : * if the transactions were originally committed asynchronously.
584 : *
585 : * We store checkpoints in the following format:
586 : * +-------+------------------------+------------------+-----+--------+
587 : * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
588 : * +-------+------------------------+------------------+-----+--------+
589 : *
590 : * So its just the magic, followed by the statically sized
591 : * ReplicationStateOnDisk structs. Note that the maximum number of
592 : * ReplicationState is determined by max_active_replication_origins.
593 : * ---------------------------------------------------------------------------
594 : */
595 : void
596 3318 : CheckPointReplicationOrigin(void)
597 : {
598 3318 : const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
599 3318 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
600 : int tmpfd;
601 : int i;
602 3318 : uint32 magic = REPLICATION_STATE_MAGIC;
603 : pg_crc32c crc;
604 :
605 3318 : if (max_active_replication_origins == 0)
606 2 : return;
607 :
608 3316 : INIT_CRC32C(crc);
609 :
610 : /* make sure no old temp file is remaining */
611 3316 : if (unlink(tmppath) < 0 && errno != ENOENT)
612 0 : ereport(PANIC,
613 : (errcode_for_file_access(),
614 : errmsg("could not remove file \"%s\": %m",
615 : tmppath)));
616 :
617 : /*
618 : * no other backend can perform this at the same time; only one checkpoint
619 : * can happen at a time.
620 : */
621 3316 : tmpfd = OpenTransientFile(tmppath,
622 : O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
623 3316 : if (tmpfd < 0)
624 0 : ereport(PANIC,
625 : (errcode_for_file_access(),
626 : errmsg("could not create file \"%s\": %m",
627 : tmppath)));
628 :
629 : /* write magic */
630 3316 : errno = 0;
631 3316 : if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
632 : {
633 : /* if write didn't set errno, assume problem is no disk space */
634 0 : if (errno == 0)
635 0 : errno = ENOSPC;
636 0 : ereport(PANIC,
637 : (errcode_for_file_access(),
638 : errmsg("could not write to file \"%s\": %m",
639 : tmppath)));
640 : }
641 3316 : COMP_CRC32C(crc, &magic, sizeof(magic));
642 :
643 : /* prevent concurrent creations/drops */
644 3316 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
645 :
646 : /* write actual data */
647 36476 : for (i = 0; i < max_active_replication_origins; i++)
648 : {
649 : ReplicationStateOnDisk disk_state;
650 33160 : ReplicationState *curstate = &replication_states[i];
651 : XLogRecPtr local_lsn;
652 :
653 33160 : if (curstate->roident == InvalidRepOriginId)
654 33068 : continue;
655 :
656 : /* zero, to avoid uninitialized padding bytes */
657 92 : memset(&disk_state, 0, sizeof(disk_state));
658 :
659 92 : LWLockAcquire(&curstate->lock, LW_SHARED);
660 :
661 92 : disk_state.roident = curstate->roident;
662 :
663 92 : disk_state.remote_lsn = curstate->remote_lsn;
664 92 : local_lsn = curstate->local_lsn;
665 :
666 92 : LWLockRelease(&curstate->lock);
667 :
668 : /* make sure we only write out a commit that's persistent */
669 92 : XLogFlush(local_lsn);
670 :
671 92 : errno = 0;
672 92 : if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
673 : sizeof(disk_state))
674 : {
675 : /* if write didn't set errno, assume problem is no disk space */
676 0 : if (errno == 0)
677 0 : errno = ENOSPC;
678 0 : ereport(PANIC,
679 : (errcode_for_file_access(),
680 : errmsg("could not write to file \"%s\": %m",
681 : tmppath)));
682 : }
683 :
684 92 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
685 : }
686 :
687 3316 : LWLockRelease(ReplicationOriginLock);
688 :
689 : /* write out the CRC */
690 3316 : FIN_CRC32C(crc);
691 3316 : errno = 0;
692 3316 : if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
693 : {
694 : /* if write didn't set errno, assume problem is no disk space */
695 0 : if (errno == 0)
696 0 : errno = ENOSPC;
697 0 : ereport(PANIC,
698 : (errcode_for_file_access(),
699 : errmsg("could not write to file \"%s\": %m",
700 : tmppath)));
701 : }
702 :
703 3316 : if (CloseTransientFile(tmpfd) != 0)
704 0 : ereport(PANIC,
705 : (errcode_for_file_access(),
706 : errmsg("could not close file \"%s\": %m",
707 : tmppath)));
708 :
709 : /* fsync, rename to permanent file, fsync file and directory */
710 3316 : durable_rename(tmppath, path, PANIC);
711 : }
712 :
713 : /*
714 : * Recover replication replay status from checkpoint data saved earlier by
715 : * CheckPointReplicationOrigin.
716 : *
717 : * This only needs to be called at startup and *not* during every checkpoint
718 : * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
719 : * state thereafter can be recovered by looking at commit records.
720 : */
721 : void
722 1812 : StartupReplicationOrigin(void)
723 : {
724 1812 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
725 : int fd;
726 : int readBytes;
727 1812 : uint32 magic = REPLICATION_STATE_MAGIC;
728 1812 : int last_state = 0;
729 : pg_crc32c file_crc;
730 : pg_crc32c crc;
731 :
732 : /* don't want to overwrite already existing state */
733 : #ifdef USE_ASSERT_CHECKING
734 : static bool already_started = false;
735 :
736 : Assert(!already_started);
737 : already_started = true;
738 : #endif
739 :
740 1812 : if (max_active_replication_origins == 0)
741 100 : return;
742 :
743 1810 : INIT_CRC32C(crc);
744 :
745 1810 : elog(DEBUG2, "starting up replication origin progress state");
746 :
747 1810 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
748 :
749 : /*
750 : * might have had max_active_replication_origins == 0 last run, or we just
751 : * brought up a standby.
752 : */
753 1810 : if (fd < 0 && errno == ENOENT)
754 98 : return;
755 1712 : else if (fd < 0)
756 0 : ereport(PANIC,
757 : (errcode_for_file_access(),
758 : errmsg("could not open file \"%s\": %m",
759 : path)));
760 :
761 : /* verify magic, that is written even if nothing was active */
762 1712 : readBytes = read(fd, &magic, sizeof(magic));
763 1712 : if (readBytes != sizeof(magic))
764 : {
765 0 : if (readBytes < 0)
766 0 : ereport(PANIC,
767 : (errcode_for_file_access(),
768 : errmsg("could not read file \"%s\": %m",
769 : path)));
770 : else
771 0 : ereport(PANIC,
772 : (errcode(ERRCODE_DATA_CORRUPTED),
773 : errmsg("could not read file \"%s\": read %d of %zu",
774 : path, readBytes, sizeof(magic))));
775 : }
776 1712 : COMP_CRC32C(crc, &magic, sizeof(magic));
777 :
778 1712 : if (magic != REPLICATION_STATE_MAGIC)
779 0 : ereport(PANIC,
780 : (errmsg("replication checkpoint has wrong magic %u instead of %u",
781 : magic, REPLICATION_STATE_MAGIC)));
782 :
783 : /* we can skip locking here, no other access is possible */
784 :
785 : /* recover individual states, until there are no more to be found */
786 : while (true)
787 42 : {
788 : ReplicationStateOnDisk disk_state;
789 :
790 1754 : readBytes = read(fd, &disk_state, sizeof(disk_state));
791 :
792 : /* no further data */
793 1754 : if (readBytes == sizeof(crc))
794 : {
795 : /* not pretty, but simple ... */
796 1712 : file_crc = *(pg_crc32c *) &disk_state;
797 1712 : break;
798 : }
799 :
800 42 : if (readBytes < 0)
801 : {
802 0 : ereport(PANIC,
803 : (errcode_for_file_access(),
804 : errmsg("could not read file \"%s\": %m",
805 : path)));
806 : }
807 :
808 42 : if (readBytes != sizeof(disk_state))
809 : {
810 0 : ereport(PANIC,
811 : (errcode_for_file_access(),
812 : errmsg("could not read file \"%s\": read %d of %zu",
813 : path, readBytes, sizeof(disk_state))));
814 : }
815 :
816 42 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
817 :
818 42 : if (last_state == max_active_replication_origins)
819 0 : ereport(PANIC,
820 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
821 : errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
822 :
823 : /* copy data to shared memory */
824 42 : replication_states[last_state].roident = disk_state.roident;
825 42 : replication_states[last_state].remote_lsn = disk_state.remote_lsn;
826 42 : last_state++;
827 :
828 42 : ereport(LOG,
829 : (errmsg("recovered replication state of node %d to %X/%X",
830 : disk_state.roident,
831 : LSN_FORMAT_ARGS(disk_state.remote_lsn))));
832 : }
833 :
834 : /* now check checksum */
835 1712 : FIN_CRC32C(crc);
836 1712 : if (file_crc != crc)
837 0 : ereport(PANIC,
838 : (errcode(ERRCODE_DATA_CORRUPTED),
839 : errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
840 : crc, file_crc)));
841 :
842 1712 : if (CloseTransientFile(fd) != 0)
843 0 : ereport(PANIC,
844 : (errcode_for_file_access(),
845 : errmsg("could not close file \"%s\": %m",
846 : path)));
847 : }
848 :
849 : void
850 8 : replorigin_redo(XLogReaderState *record)
851 : {
852 8 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
853 :
854 8 : switch (info)
855 : {
856 4 : case XLOG_REPLORIGIN_SET:
857 : {
858 4 : xl_replorigin_set *xlrec =
859 4 : (xl_replorigin_set *) XLogRecGetData(record);
860 :
861 4 : replorigin_advance(xlrec->node_id,
862 : xlrec->remote_lsn, record->EndRecPtr,
863 4 : xlrec->force /* backward */ ,
864 : false /* WAL log */ );
865 4 : break;
866 : }
867 4 : case XLOG_REPLORIGIN_DROP:
868 : {
869 : xl_replorigin_drop *xlrec;
870 : int i;
871 :
872 4 : xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
873 :
874 6 : for (i = 0; i < max_active_replication_origins; i++)
875 : {
876 6 : ReplicationState *state = &replication_states[i];
877 :
878 : /* found our slot */
879 6 : if (state->roident == xlrec->node_id)
880 : {
881 : /* reset entry */
882 4 : state->roident = InvalidRepOriginId;
883 4 : state->remote_lsn = InvalidXLogRecPtr;
884 4 : state->local_lsn = InvalidXLogRecPtr;
885 4 : break;
886 : }
887 : }
888 4 : break;
889 : }
890 0 : default:
891 0 : elog(PANIC, "replorigin_redo: unknown op code %u", info);
892 : }
893 8 : }
894 :
895 :
896 : /*
897 : * Tell the replication origin progress machinery that a commit from 'node'
898 : * that originated at the LSN remote_commit on the remote node was replayed
899 : * successfully and that we don't need to do so again. In combination with
900 : * setting up replorigin_session_origin_lsn and replorigin_session_origin
901 : * that ensures we won't lose knowledge about that after a crash if the
902 : * transaction had a persistent effect (think of asynchronous commits).
903 : *
904 : * local_commit needs to be a local LSN of the commit so that we can make sure
905 : * upon a checkpoint that enough WAL has been persisted to disk.
906 : *
907 : * Needs to be called with a RowExclusiveLock on pg_replication_origin,
908 : * unless running in recovery.
909 : */
910 : void
911 472 : replorigin_advance(RepOriginId node,
912 : XLogRecPtr remote_commit, XLogRecPtr local_commit,
913 : bool go_backward, bool wal_log)
914 : {
915 : int i;
916 472 : ReplicationState *replication_state = NULL;
917 472 : ReplicationState *free_state = NULL;
918 :
919 : Assert(node != InvalidRepOriginId);
920 :
921 : /* we don't track DoNotReplicateId */
922 472 : if (node == DoNotReplicateId)
923 0 : return;
924 :
925 : /*
926 : * XXX: For the case where this is called by WAL replay, it'd be more
927 : * efficient to restore into a backend local hashtable and only dump into
928 : * shmem after recovery is finished. Let's wait with implementing that
929 : * till it's shown to be a measurable expense
930 : */
931 :
932 : /* Lock exclusively, as we may have to create a new table entry. */
933 472 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
934 :
935 : /*
936 : * Search for either an existing slot for the origin, or a free one we can
937 : * use.
938 : */
939 4314 : for (i = 0; i < max_active_replication_origins; i++)
940 : {
941 3932 : ReplicationState *curstate = &replication_states[i];
942 :
943 : /* remember where to insert if necessary */
944 3932 : if (curstate->roident == InvalidRepOriginId &&
945 : free_state == NULL)
946 : {
947 382 : free_state = curstate;
948 382 : continue;
949 : }
950 :
951 : /* not our slot */
952 3550 : if (curstate->roident != node)
953 : {
954 3460 : continue;
955 : }
956 :
957 : /* ok, found slot */
958 90 : replication_state = curstate;
959 :
960 90 : LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
961 :
962 : /* Make sure it's not used by somebody else */
963 90 : if (replication_state->acquired_by != 0)
964 : {
965 0 : ereport(ERROR,
966 : (errcode(ERRCODE_OBJECT_IN_USE),
967 : errmsg("replication origin with ID %d is already active for PID %d",
968 : replication_state->roident,
969 : replication_state->acquired_by)));
970 : }
971 :
972 90 : break;
973 : }
974 :
975 472 : if (replication_state == NULL && free_state == NULL)
976 0 : ereport(ERROR,
977 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
978 : errmsg("could not find free replication state slot for replication origin with ID %d",
979 : node),
980 : errhint("Increase \"max_active_replication_origins\" and try again.")));
981 :
982 472 : if (replication_state == NULL)
983 : {
984 : /* initialize new slot */
985 382 : LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
986 382 : replication_state = free_state;
987 : Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
988 : Assert(replication_state->local_lsn == InvalidXLogRecPtr);
989 382 : replication_state->roident = node;
990 : }
991 :
992 : Assert(replication_state->roident != InvalidRepOriginId);
993 :
994 : /*
995 : * If somebody "forcefully" sets this slot, WAL log it, so it's durable
996 : * and the standby gets the message. Primarily this will be called during
997 : * WAL replay (of commit records) where no WAL logging is necessary.
998 : */
999 472 : if (wal_log)
1000 : {
1001 : xl_replorigin_set xlrec;
1002 :
1003 390 : xlrec.remote_lsn = remote_commit;
1004 390 : xlrec.node_id = node;
1005 390 : xlrec.force = go_backward;
1006 :
1007 390 : XLogBeginInsert();
1008 390 : XLogRegisterData(&xlrec, sizeof(xlrec));
1009 :
1010 390 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
1011 : }
1012 :
1013 : /*
1014 : * Due to - harmless - race conditions during a checkpoint we could see
1015 : * values here that are older than the ones we already have in memory. We
1016 : * could also see older values for prepared transactions when the prepare
1017 : * is sent at a later point of time along with commit prepared and there
1018 : * are other transactions commits between prepare and commit prepared. See
1019 : * ReorderBufferFinishPrepared. Don't overwrite those.
1020 : */
1021 472 : if (go_backward || replication_state->remote_lsn < remote_commit)
1022 458 : replication_state->remote_lsn = remote_commit;
1023 472 : if (local_commit != InvalidXLogRecPtr &&
1024 76 : (go_backward || replication_state->local_lsn < local_commit))
1025 80 : replication_state->local_lsn = local_commit;
1026 472 : LWLockRelease(&replication_state->lock);
1027 :
1028 : /*
1029 : * Release *after* changing the LSNs, slot isn't acquired and thus could
1030 : * otherwise be dropped anytime.
1031 : */
1032 472 : LWLockRelease(ReplicationOriginLock);
1033 : }
1034 :
1035 :
1036 : XLogRecPtr
1037 16 : replorigin_get_progress(RepOriginId node, bool flush)
1038 : {
1039 : int i;
1040 16 : XLogRecPtr local_lsn = InvalidXLogRecPtr;
1041 16 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1042 :
1043 : /* prevent slots from being concurrently dropped */
1044 16 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1045 :
1046 76 : for (i = 0; i < max_active_replication_origins; i++)
1047 : {
1048 : ReplicationState *state;
1049 :
1050 70 : state = &replication_states[i];
1051 :
1052 70 : if (state->roident == node)
1053 : {
1054 10 : LWLockAcquire(&state->lock, LW_SHARED);
1055 :
1056 10 : remote_lsn = state->remote_lsn;
1057 10 : local_lsn = state->local_lsn;
1058 :
1059 10 : LWLockRelease(&state->lock);
1060 :
1061 10 : break;
1062 : }
1063 : }
1064 :
1065 16 : LWLockRelease(ReplicationOriginLock);
1066 :
1067 16 : if (flush && local_lsn != InvalidXLogRecPtr)
1068 2 : XLogFlush(local_lsn);
1069 :
1070 16 : return remote_lsn;
1071 : }
1072 :
1073 : /*
1074 : * Tear down a (possibly) configured session replication origin during process
1075 : * exit.
1076 : */
1077 : static void
1078 852 : ReplicationOriginExitCleanup(int code, Datum arg)
1079 : {
1080 852 : ConditionVariable *cv = NULL;
1081 :
1082 852 : if (session_replication_state == NULL)
1083 366 : return;
1084 :
1085 486 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1086 :
1087 486 : if (session_replication_state->acquired_by == MyProcPid)
1088 : {
1089 466 : cv = &session_replication_state->origin_cv;
1090 :
1091 466 : session_replication_state->acquired_by = 0;
1092 466 : session_replication_state = NULL;
1093 : }
1094 :
1095 486 : LWLockRelease(ReplicationOriginLock);
1096 :
1097 486 : if (cv)
1098 466 : ConditionVariableBroadcast(cv);
1099 : }
1100 :
1101 : /*
1102 : * Setup a replication origin in the shared memory struct if it doesn't
1103 : * already exist and cache access to the specific ReplicationSlot so the
1104 : * array doesn't have to be searched when calling
1105 : * replorigin_session_advance().
1106 : *
1107 : * Normally only one such cached origin can exist per process so the cached
1108 : * value can only be set again after the previous value is torn down with
1109 : * replorigin_session_reset(). For this normal case pass acquired_by = 0
1110 : * (meaning the slot is not allowed to be already acquired by another process).
1111 : *
1112 : * However, sometimes multiple processes can safely re-use the same origin slot
1113 : * (for example, multiple parallel apply processes can safely use the same
1114 : * origin, provided they maintain commit order by allowing only one process to
1115 : * commit at a time). For this case the first process must pass acquired_by =
1116 : * 0, and then the other processes sharing that same origin can pass
1117 : * acquired_by = PID of the first process.
1118 : */
1119 : void
1120 858 : replorigin_session_setup(RepOriginId node, int acquired_by)
1121 : {
1122 : static bool registered_cleanup;
1123 : int i;
1124 858 : int free_slot = -1;
1125 :
1126 858 : if (!registered_cleanup)
1127 : {
1128 852 : on_shmem_exit(ReplicationOriginExitCleanup, 0);
1129 852 : registered_cleanup = true;
1130 : }
1131 :
1132 : Assert(max_active_replication_origins > 0);
1133 :
1134 858 : if (session_replication_state != NULL)
1135 2 : ereport(ERROR,
1136 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1137 : errmsg("cannot setup replication origin when one is already setup")));
1138 :
1139 : /* Lock exclusively, as we may have to create a new table entry. */
1140 856 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1141 :
1142 : /*
1143 : * Search for either an existing slot for the origin, or a free one we can
1144 : * use.
1145 : */
1146 3612 : for (i = 0; i < max_active_replication_origins; i++)
1147 : {
1148 3386 : ReplicationState *curstate = &replication_states[i];
1149 :
1150 : /* remember where to insert if necessary */
1151 3386 : if (curstate->roident == InvalidRepOriginId &&
1152 : free_slot == -1)
1153 : {
1154 230 : free_slot = i;
1155 230 : continue;
1156 : }
1157 :
1158 : /* not our slot */
1159 3156 : if (curstate->roident != node)
1160 2526 : continue;
1161 :
1162 630 : else if (curstate->acquired_by != 0 && acquired_by == 0)
1163 : {
1164 0 : ereport(ERROR,
1165 : (errcode(ERRCODE_OBJECT_IN_USE),
1166 : errmsg("replication origin with ID %d is already active for PID %d",
1167 : curstate->roident, curstate->acquired_by)));
1168 : }
1169 :
1170 : /* ok, found slot */
1171 630 : session_replication_state = curstate;
1172 630 : break;
1173 : }
1174 :
1175 :
1176 856 : if (session_replication_state == NULL && free_slot == -1)
1177 0 : ereport(ERROR,
1178 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1179 : errmsg("could not find free replication state slot for replication origin with ID %d",
1180 : node),
1181 : errhint("Increase \"max_active_replication_origins\" and try again.")));
1182 856 : else if (session_replication_state == NULL)
1183 : {
1184 : /* initialize new slot */
1185 226 : session_replication_state = &replication_states[free_slot];
1186 : Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
1187 : Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
1188 226 : session_replication_state->roident = node;
1189 : }
1190 :
1191 :
1192 : Assert(session_replication_state->roident != InvalidRepOriginId);
1193 :
1194 856 : if (acquired_by == 0)
1195 836 : session_replication_state->acquired_by = MyProcPid;
1196 20 : else if (session_replication_state->acquired_by != acquired_by)
1197 0 : elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
1198 : node, acquired_by);
1199 :
1200 856 : LWLockRelease(ReplicationOriginLock);
1201 :
1202 : /* probably this one is pointless */
1203 856 : ConditionVariableBroadcast(&session_replication_state->origin_cv);
1204 856 : }
1205 :
1206 : /*
1207 : * Reset replay state previously setup in this session.
1208 : *
1209 : * This function may only be called if an origin was setup with
1210 : * replorigin_session_setup().
1211 : */
1212 : void
1213 372 : replorigin_session_reset(void)
1214 : {
1215 : ConditionVariable *cv;
1216 :
1217 : Assert(max_active_replication_origins != 0);
1218 :
1219 372 : if (session_replication_state == NULL)
1220 2 : ereport(ERROR,
1221 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1222 : errmsg("no replication origin is configured")));
1223 :
1224 370 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1225 :
1226 370 : session_replication_state->acquired_by = 0;
1227 370 : cv = &session_replication_state->origin_cv;
1228 370 : session_replication_state = NULL;
1229 :
1230 370 : LWLockRelease(ReplicationOriginLock);
1231 :
1232 370 : ConditionVariableBroadcast(cv);
1233 370 : }
1234 :
1235 : /*
1236 : * Do the same work replorigin_advance() does, just on the session's
1237 : * configured origin.
1238 : *
1239 : * This is noticeably cheaper than using replorigin_advance().
1240 : */
1241 : void
1242 2152 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
1243 : {
1244 : Assert(session_replication_state != NULL);
1245 : Assert(session_replication_state->roident != InvalidRepOriginId);
1246 :
1247 2152 : LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1248 2152 : if (session_replication_state->local_lsn < local_commit)
1249 2152 : session_replication_state->local_lsn = local_commit;
1250 2152 : if (session_replication_state->remote_lsn < remote_commit)
1251 1018 : session_replication_state->remote_lsn = remote_commit;
1252 2152 : LWLockRelease(&session_replication_state->lock);
1253 2152 : }
1254 :
1255 : /*
1256 : * Ask the machinery about the point up to which we successfully replayed
1257 : * changes from an already setup replication origin.
1258 : */
1259 : XLogRecPtr
1260 446 : replorigin_session_get_progress(bool flush)
1261 : {
1262 : XLogRecPtr remote_lsn;
1263 : XLogRecPtr local_lsn;
1264 :
1265 : Assert(session_replication_state != NULL);
1266 :
1267 446 : LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1268 446 : remote_lsn = session_replication_state->remote_lsn;
1269 446 : local_lsn = session_replication_state->local_lsn;
1270 446 : LWLockRelease(&session_replication_state->lock);
1271 :
1272 446 : if (flush && local_lsn != InvalidXLogRecPtr)
1273 2 : XLogFlush(local_lsn);
1274 :
1275 446 : return remote_lsn;
1276 : }
1277 :
1278 :
1279 :
1280 : /* ---------------------------------------------------------------------------
1281 : * SQL functions for working with replication origin.
1282 : *
1283 : * These mostly should be fairly short wrappers around more generic functions.
1284 : * ---------------------------------------------------------------------------
1285 : */
1286 :
1287 : /*
1288 : * Create replication origin for the passed in name, and return the assigned
1289 : * oid.
1290 : */
1291 : Datum
1292 24 : pg_replication_origin_create(PG_FUNCTION_ARGS)
1293 : {
1294 : char *name;
1295 : RepOriginId roident;
1296 :
1297 24 : replorigin_check_prerequisites(false, false);
1298 :
1299 24 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1300 :
1301 : /*
1302 : * Replication origins "any and "none" are reserved for system options.
1303 : * The origins "pg_xxx" are reserved for internal use.
1304 : */
1305 24 : if (IsReservedName(name) || IsReservedOriginName(name))
1306 6 : ereport(ERROR,
1307 : (errcode(ERRCODE_RESERVED_NAME),
1308 : errmsg("replication origin name \"%s\" is reserved",
1309 : name),
1310 : errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1311 : LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
1312 :
1313 : /*
1314 : * If built with appropriate switch, whine when regression-testing
1315 : * conventions for replication origin names are violated.
1316 : */
1317 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1318 : if (strncmp(name, "regress_", 8) != 0)
1319 : elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1320 : #endif
1321 :
1322 18 : roident = replorigin_create(name);
1323 :
1324 10 : pfree(name);
1325 :
1326 10 : PG_RETURN_OID(roident);
1327 : }
1328 :
1329 : /*
1330 : * Drop replication origin.
1331 : */
1332 : Datum
1333 14 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
1334 : {
1335 : char *name;
1336 :
1337 14 : replorigin_check_prerequisites(false, false);
1338 :
1339 14 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1340 :
1341 14 : replorigin_drop_by_name(name, false, true);
1342 :
1343 12 : pfree(name);
1344 :
1345 12 : PG_RETURN_VOID();
1346 : }
1347 :
1348 : /*
1349 : * Return oid of a replication origin.
1350 : */
1351 : Datum
1352 0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
1353 : {
1354 : char *name;
1355 : RepOriginId roident;
1356 :
1357 0 : replorigin_check_prerequisites(false, false);
1358 :
1359 0 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1360 0 : roident = replorigin_by_name(name, true);
1361 :
1362 0 : pfree(name);
1363 :
1364 0 : if (OidIsValid(roident))
1365 0 : PG_RETURN_OID(roident);
1366 0 : PG_RETURN_NULL();
1367 : }
1368 :
1369 : /*
1370 : * Setup a replication origin for this session.
1371 : */
1372 : Datum
1373 12 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
1374 : {
1375 : char *name;
1376 : RepOriginId origin;
1377 :
1378 12 : replorigin_check_prerequisites(true, false);
1379 :
1380 12 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1381 12 : origin = replorigin_by_name(name, false);
1382 10 : replorigin_session_setup(origin, 0);
1383 :
1384 8 : replorigin_session_origin = origin;
1385 :
1386 8 : pfree(name);
1387 :
1388 8 : PG_RETURN_VOID();
1389 : }
1390 :
1391 : /*
1392 : * Reset previously setup origin in this session
1393 : */
1394 : Datum
1395 10 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
1396 : {
1397 10 : replorigin_check_prerequisites(true, false);
1398 :
1399 10 : replorigin_session_reset();
1400 :
1401 8 : replorigin_session_origin = InvalidRepOriginId;
1402 8 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
1403 8 : replorigin_session_origin_timestamp = 0;
1404 :
1405 8 : PG_RETURN_VOID();
1406 : }
1407 :
1408 : /*
1409 : * Has a replication origin been setup for this session.
1410 : */
1411 : Datum
1412 0 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
1413 : {
1414 0 : replorigin_check_prerequisites(false, false);
1415 :
1416 0 : PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
1417 : }
1418 :
1419 :
1420 : /*
1421 : * Return the replication progress for origin setup in the current session.
1422 : *
1423 : * If 'flush' is set to true it is ensured that the returned value corresponds
1424 : * to a local transaction that has been flushed. This is useful if asynchronous
1425 : * commits are used when replaying replicated transactions.
1426 : */
1427 : Datum
1428 4 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
1429 : {
1430 4 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1431 4 : bool flush = PG_GETARG_BOOL(0);
1432 :
1433 4 : replorigin_check_prerequisites(true, false);
1434 :
1435 4 : if (session_replication_state == NULL)
1436 0 : ereport(ERROR,
1437 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1438 : errmsg("no replication origin is configured")));
1439 :
1440 4 : remote_lsn = replorigin_session_get_progress(flush);
1441 :
1442 4 : if (remote_lsn == InvalidXLogRecPtr)
1443 0 : PG_RETURN_NULL();
1444 :
1445 4 : PG_RETURN_LSN(remote_lsn);
1446 : }
1447 :
1448 : Datum
1449 2 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
1450 : {
1451 2 : XLogRecPtr location = PG_GETARG_LSN(0);
1452 :
1453 2 : replorigin_check_prerequisites(true, false);
1454 :
1455 2 : if (session_replication_state == NULL)
1456 0 : ereport(ERROR,
1457 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1458 : errmsg("no replication origin is configured")));
1459 :
1460 2 : replorigin_session_origin_lsn = location;
1461 2 : replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
1462 :
1463 2 : PG_RETURN_VOID();
1464 : }
1465 :
1466 : Datum
1467 0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
1468 : {
1469 0 : replorigin_check_prerequisites(true, false);
1470 :
1471 0 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
1472 0 : replorigin_session_origin_timestamp = 0;
1473 :
1474 0 : PG_RETURN_VOID();
1475 : }
1476 :
1477 :
1478 : Datum
1479 6 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
1480 : {
1481 6 : text *name = PG_GETARG_TEXT_PP(0);
1482 6 : XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1483 : RepOriginId node;
1484 :
1485 6 : replorigin_check_prerequisites(true, false);
1486 :
1487 : /* lock to prevent the replication origin from vanishing */
1488 6 : LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1489 :
1490 6 : node = replorigin_by_name(text_to_cstring(name), false);
1491 :
1492 : /*
1493 : * Can't sensibly pass a local commit to be flushed at checkpoint - this
1494 : * xact hasn't committed yet. This is why this function should be used to
1495 : * set up the initial replication state, but not for replay.
1496 : */
1497 4 : replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1498 : true /* go backward */ , true /* WAL log */ );
1499 :
1500 4 : UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1501 :
1502 4 : PG_RETURN_VOID();
1503 : }
1504 :
1505 :
1506 : /*
1507 : * Return the replication progress for an individual replication origin.
1508 : *
1509 : * If 'flush' is set to true it is ensured that the returned value corresponds
1510 : * to a local transaction that has been flushed. This is useful if asynchronous
1511 : * commits are used when replaying replicated transactions.
1512 : */
1513 : Datum
1514 6 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
1515 : {
1516 : char *name;
1517 : bool flush;
1518 : RepOriginId roident;
1519 6 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1520 :
1521 6 : replorigin_check_prerequisites(true, true);
1522 :
1523 6 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1524 6 : flush = PG_GETARG_BOOL(1);
1525 :
1526 6 : roident = replorigin_by_name(name, false);
1527 : Assert(OidIsValid(roident));
1528 :
1529 4 : remote_lsn = replorigin_get_progress(roident, flush);
1530 :
1531 4 : if (remote_lsn == InvalidXLogRecPtr)
1532 0 : PG_RETURN_NULL();
1533 :
1534 4 : PG_RETURN_LSN(remote_lsn);
1535 : }
1536 :
1537 :
1538 : Datum
1539 16 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
1540 : {
1541 16 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1542 : int i;
1543 : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
1544 :
1545 : /* we want to return 0 rows if slot is set to zero */
1546 16 : replorigin_check_prerequisites(false, true);
1547 :
1548 16 : InitMaterializedSRF(fcinfo, 0);
1549 :
1550 : /* prevent slots from being concurrently dropped */
1551 16 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1552 :
1553 : /*
1554 : * Iterate through all possible replication_states, display if they are
1555 : * filled. Note that we do not take any locks, so slightly corrupted/out
1556 : * of date values are a possibility.
1557 : */
1558 176 : for (i = 0; i < max_active_replication_origins; i++)
1559 : {
1560 : ReplicationState *state;
1561 : Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];
1562 : bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1563 : char *roname;
1564 :
1565 160 : state = &replication_states[i];
1566 :
1567 : /* unused slot, nothing to display */
1568 160 : if (state->roident == InvalidRepOriginId)
1569 138 : continue;
1570 :
1571 22 : memset(values, 0, sizeof(values));
1572 22 : memset(nulls, 1, sizeof(nulls));
1573 :
1574 22 : values[0] = ObjectIdGetDatum(state->roident);
1575 22 : nulls[0] = false;
1576 :
1577 : /*
1578 : * We're not preventing the origin to be dropped concurrently, so
1579 : * silently accept that it might be gone.
1580 : */
1581 22 : if (replorigin_by_oid(state->roident, true,
1582 : &roname))
1583 : {
1584 22 : values[1] = CStringGetTextDatum(roname);
1585 22 : nulls[1] = false;
1586 : }
1587 :
1588 22 : LWLockAcquire(&state->lock, LW_SHARED);
1589 :
1590 22 : values[2] = LSNGetDatum(state->remote_lsn);
1591 22 : nulls[2] = false;
1592 :
1593 22 : values[3] = LSNGetDatum(state->local_lsn);
1594 22 : nulls[3] = false;
1595 :
1596 22 : LWLockRelease(&state->lock);
1597 :
1598 22 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1599 : values, nulls);
1600 : }
1601 :
1602 16 : LWLockRelease(ReplicationOriginLock);
1603 :
1604 : #undef REPLICATION_ORIGIN_PROGRESS_COLS
1605 :
1606 16 : return (Datum) 0;
1607 : }
|