Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * origin.c
4 : * Logical replication progress tracking support.
5 : *
6 : * Copyright (c) 2013-2025, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/logical/origin.c
10 : *
11 : * NOTES
12 : *
13 : * This file provides the following:
14 : * * An infrastructure to name nodes in a replication setup
 * * A facility to store and persist replication progress in an efficient
 *	  and durable manner.
17 : *
 * Replication origins consist of a descriptive, user-defined, external
 * name and a short, thus space-efficient, internal 2-byte one. This split
 * exists because replication origins have to be stored in WAL and shared
 * memory, and long descriptors would be inefficient. For now we use only
 * 2 bytes for the internal ID of a replication origin, as it seems unlikely
 * that there will soon be more than 65k nodes in one replication setup; using
 * only two bytes allows us to be more space efficient.
25 : *
 * Replication progress is tracked in a shared memory table
 * (ReplicationState) that's dumped to disk every checkpoint. Entries
 * ('slots') in this table are identified by the internal id. That's the case
 * because it allows replication progress to be advanced during crash
 * recovery. To allow doing so we store the original LSN (from the originating
 * system) of a transaction in the commit record. That allows recovering the
 * precise replayed state after crash recovery without requiring synchronous
 * commits. Allowing logical replication to use asynchronous commit is
 * generally good for performance, but especially important as it allows a
 * single threaded replay process to keep up with a source that has multiple
 * backends generating changes concurrently. For efficiency and simplicity
 * reasons a backend can set up one replication origin that is from then on
 * used as the source of changes produced by the backend, until it is reset.
39 : *
40 : * This infrastructure is intended to be used in cooperation with logical
41 : * decoding. When replaying from a remote system the configured origin is
42 : * provided to output plugins, allowing prevention of replication loops and
43 : * other filtering.
44 : *
45 : * There are several levels of locking at work:
46 : *
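 * * To create a replication origin an exclusive lock on pg_replication_origin
 *	  is held while searching for a free ID and inserting the new row. That
 *	  allows us to assign new origin IDs safely and free of conflicts using a
 *	  dirty snapshot. Dropping an origin takes a row exclusive lock on
 *	  pg_replication_origin plus an access exclusive lock on the origin object
 *	  itself.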
50 : *
 * * When creating an in-memory replication progress slot the ReplicationOrigin
 *	  LWLock has to be held exclusively; when iterating over the replication
 *	  progress a shared lock has to be held. Advancing the replication progress
 *	  of an origin that has not been set up as the session's replication origin
 *	  also takes this lock (exclusively, as a new slot may have to be created).
56 : *
 * * When manipulating or looking at the remote_lsn and local_lsn fields of a
 *	  replication progress slot that slot's lwlock has to be held. That's
 *	  primarily because we do not assume 8-byte writes (the LSN) are atomic on
 *	  all our platforms, but it also simplifies memory ordering concerns
 *	  between the remote and local lsn. We use an lwlock instead of a spinlock
 *	  so it's less harmful to hold the lock over a WAL write
 *	  (cf. replorigin_advance).
64 : *
65 : * ---------------------------------------------------------------------------
66 : */
67 :
68 : #include "postgres.h"
69 :
70 : #include <unistd.h>
71 : #include <sys/stat.h>
72 :
73 : #include "access/genam.h"
74 : #include "access/htup_details.h"
75 : #include "access/table.h"
76 : #include "access/xact.h"
77 : #include "access/xloginsert.h"
78 : #include "catalog/catalog.h"
79 : #include "catalog/indexing.h"
80 : #include "catalog/pg_subscription.h"
81 : #include "funcapi.h"
82 : #include "miscadmin.h"
83 : #include "nodes/execnodes.h"
84 : #include "pgstat.h"
85 : #include "replication/origin.h"
86 : #include "replication/slot.h"
87 : #include "storage/condition_variable.h"
88 : #include "storage/fd.h"
89 : #include "storage/ipc.h"
90 : #include "storage/lmgr.h"
91 : #include "utils/builtins.h"
92 : #include "utils/fmgroids.h"
93 : #include "utils/guc.h"
94 : #include "utils/pg_lsn.h"
95 : #include "utils/rel.h"
96 : #include "utils/snapmgr.h"
97 : #include "utils/syscache.h"
98 :
/*
 * Paths of the replication origin checkpoint file and of the temporary file
 * it is first written to; CheckPointReplicationOrigin() durably renames the
 * temporary file over the permanent one.
 */
#define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"

/*
 * GUC variables.
 *
 * max_active_replication_origins is the number of entries in the shared
 * replication progress array.  When it is 0 no shared memory is reserved and
 * the checkpoint/startup routines in this file return immediately.
 */
int			max_active_replication_origins = 10;
105 :
/*
 * Replay progress of a single remote node.
 *
 * One such entry exists per element of the shared replication progress
 * array; entries whose roident is InvalidRepOriginId are unused.
 */
typedef struct ReplicationState
{
	/*
	 * Local identifier for the remote node; InvalidRepOriginId marks a free
	 * entry.
	 */
	RepOriginId roident;

	/*
	 * Location of the latest commit from the remote side.
	 */
	XLogRecPtr	remote_lsn;

	/*
	 * Remember the local lsn of the commit record so we can XLogFlush() to it
	 * during a checkpoint so we know the commit record actually is safe on
	 * disk.
	 */
	XLogRecPtr	local_lsn;

	/*
	 * PID of backend that's acquired slot, or 0 if none.
	 */
	int			acquired_by;

	/*
	 * Condition variable that's signaled when acquired_by changes; dropping
	 * an origin sleeps on it while the slot is in use.
	 */
	ConditionVariable origin_cv;

	/*
	 * Lock protecting remote_lsn and local_lsn.
	 */
	LWLock		lock;
} ReplicationState;
143 :
/*
 * On disk version of ReplicationState.
 *
 * Written to the checkpoint file as a raw struct (after being zeroed, so that
 * padding bytes are deterministic and do not disturb the CRC), so changing
 * this layout changes the on-disk format.
 */
typedef struct ReplicationStateOnDisk
{
	RepOriginId roident;
	XLogRecPtr	remote_lsn;
} ReplicationStateOnDisk;


/*
 * Layout of the shared memory block holding all replication progress
 * entries: a small header followed by the entries themselves.
 */
typedef struct ReplicationStateCtl
{
	/* Tranche to use for per-origin LWLocks */
	int			tranche_id;
	/* Array of length max_active_replication_origins */
	ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
} ReplicationStateCtl;
161 :
/*
 * External variables: the replication origin this session's changes are
 * attributed to (InvalidRepOriginId when none), plus the remote LSN and
 * timestamp of the change being replayed.  NOTE(review): presumably these are
 * recorded in the commit record the session writes -- confirm in xact.c.
 */
RepOriginId replorigin_session_origin = InvalidRepOriginId;	/* assumed identity */
XLogRecPtr	replorigin_session_origin_lsn = InvalidXLogRecPtr;
TimestampTz replorigin_session_origin_timestamp = 0;

/*
 * Base address into a shared memory array of replication states of size
 * max_active_replication_origins.
 */
static ReplicationState *replication_states;

/*
 * Actual shared memory block (replication_states[] is now part of this).
 */
static ReplicationStateCtl *replication_states_ctl;

/*
 * We keep a pointer to this backend's ReplicationState to avoid having to
 * search the replication_states array in replorigin_session_advance for each
 * remote commit. (Ownership of a backend's own entry can only be changed by
 * that backend.)
 */
static ReplicationState *session_replication_state = NULL;

/*
 * Magic for on disk files: written first into the replication origin
 * checkpoint file and verified when it is read back at startup.
 */
#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
188 :
189 : static void
190 116 : replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
191 : {
192 116 : if (check_origins && max_active_replication_origins == 0)
193 0 : ereport(ERROR,
194 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
195 : errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
196 :
197 116 : if (!recoveryOK && RecoveryInProgress())
198 0 : ereport(ERROR,
199 : (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
200 : errmsg("cannot manipulate replication origins during recovery")));
201 116 : }
202 :
203 :
204 : /*
205 : * IsReservedOriginName
206 : * True iff name is either "none" or "any".
207 : */
208 : static bool
209 24 : IsReservedOriginName(const char *name)
210 : {
211 46 : return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
212 22 : (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
213 : }
214 :
215 : /* ---------------------------------------------------------------------------
216 : * Functions for working with replication origins themselves.
217 : * ---------------------------------------------------------------------------
218 : */
219 :
220 : /*
221 : * Check for a persistent replication origin identified by name.
222 : *
223 : * Returns InvalidOid if the node isn't known yet and missing_ok is true.
224 : */
225 : RepOriginId
226 1944 : replorigin_by_name(const char *roname, bool missing_ok)
227 : {
228 : Form_pg_replication_origin ident;
229 1944 : Oid roident = InvalidOid;
230 : HeapTuple tuple;
231 : Datum roname_d;
232 :
233 1944 : roname_d = CStringGetTextDatum(roname);
234 :
235 1944 : tuple = SearchSysCache1(REPLORIGNAME, roname_d);
236 1944 : if (HeapTupleIsValid(tuple))
237 : {
238 1172 : ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
239 1172 : roident = ident->roident;
240 1172 : ReleaseSysCache(tuple);
241 : }
242 772 : else if (!missing_ok)
243 8 : ereport(ERROR,
244 : (errcode(ERRCODE_UNDEFINED_OBJECT),
245 : errmsg("replication origin \"%s\" does not exist",
246 : roname)));
247 :
248 1936 : return roident;
249 : }
250 :
/*
 * Create a replication origin.
 *
 * Inserts a new pg_replication_origin row named roname, using the lowest
 * internal ID not yet present in the catalog, and returns that ID.  A
 * CommandCounterIncrement() makes the new row visible to subsequent lookups
 * in the same transaction.
 *
 * Raises an ERROR if roname is longer than MAX_RONAME_LEN bytes, or if every
 * candidate ID is already in use.
 *
 * Needs to be called in a transaction.
 */
RepOriginId
replorigin_create(const char *roname)
{
	Oid			roident;
	HeapTuple	tuple = NULL;
	Relation	rel;
	Datum		roname_d;
	SnapshotData SnapshotDirty;
	SysScanDesc scan;
	ScanKeyData key;

	/*
	 * To avoid needing a TOAST table for pg_replication_origin, we limit
	 * replication origin names to 512 bytes. This should be more than enough
	 * for all practical use.
	 */
	if (strlen(roname) > MAX_RONAME_LEN)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("replication origin name is too long"),
				 errdetail("Replication origin names must be no longer than %d bytes.",
						   MAX_RONAME_LEN)));

	roname_d = CStringGetTextDatum(roname);

	Assert(IsTransactionState());

	/*
	 * We need the numeric replication origin to be 16bit wide, so we cannot
	 * rely on the normal oid allocation. Instead we simply scan
	 * pg_replication_origin for the first unused id. That's not particularly
	 * efficient, but this should be a fairly infrequent operation - we can
	 * easily spend a bit more code on this when it turns out it needs to be
	 * faster.
	 *
	 * We handle concurrency by taking an exclusive lock (allowing reads!)
	 * over the table for the duration of the search. Because we use a "dirty
	 * snapshot" we can read rows that other in-progress sessions have
	 * written, even though they would be invisible with normal snapshots. Due
	 * to the exclusive lock there's no danger that new rows can appear while
	 * we're checking.
	 */
	InitDirtySnapshot(SnapshotDirty);

	rel = table_open(ReplicationOriginRelationId, ExclusiveLock);

	/*
	 * We want to be able to access pg_replication_origin without setting up a
	 * snapshot. To make that safe, it needs to not have a TOAST table, since
	 * TOASTed data cannot be fetched without a snapshot. As of this writing,
	 * its only varlena column is roname, which we limit to 512 bytes to avoid
	 * needing out-of-line storage. If you add a TOAST table to this catalog,
	 * be sure to set up a snapshot everywhere it might be needed. For more
	 * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
	 */
	Assert(!OidIsValid(rel->rd_rel->reltoastrelid));

	/*
	 * Probe candidate IDs in ascending order, starting just above InvalidOid.
	 * The upper bound is exclusive, so PG_UINT16_MAX itself is never assigned
	 * (NOTE(review): presumably because it is reserved, e.g. as
	 * DoNotReplicateId -- confirm in origin.h).
	 */
	for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
	{
		bool		nulls[Natts_pg_replication_origin];
		Datum		values[Natts_pg_replication_origin];
		bool		collides;

		CHECK_FOR_INTERRUPTS();

		ScanKeyInit(&key,
					Anum_pg_replication_origin_roident,
					BTEqualStrategyNumber, F_OIDEQ,
					ObjectIdGetDatum(roident));

		/* the dirty snapshot also sees rows of in-progress transactions */
		scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
								  true /* indexOK */ ,
								  &SnapshotDirty,
								  1, &key);

		collides = HeapTupleIsValid(systable_getnext(scan));

		systable_endscan(scan);

		if (!collides)
		{
			/*
			 * Ok, found an unused roident, insert the new row and do a CCI,
			 * so our callers can look it up if they want to.
			 */
			memset(&nulls, 0, sizeof(nulls));

			values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
			values[Anum_pg_replication_origin_roname - 1] = roname_d;

			tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
			CatalogTupleInsert(rel, tuple);
			CommandCounterIncrement();
			break;
		}
	}

	/* now release the table lock again */
	table_close(rel, ExclusiveLock);

	/* tuple is still NULL only if the loop ran out of candidate IDs */
	if (tuple == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("could not find free replication origin ID")));

	heap_freetuple(tuple);
	return roident;
}
364 :
/*
 * Helper function to drop a replication origin.
 *
 * Releases the in-memory progress slot holding roident, if there is one: an
 * XLOG_REPLORIGIN_DROP record is written and the slot is reset to the unused
 * state.  If no slot holds roident, nothing is logged or changed.
 *
 * If the slot is currently acquired by a backend, either raise an ERROR
 * (nowait) or sleep on the slot's condition variable and retry the whole
 * search.
 */
static void
replorigin_state_clear(RepOriginId roident, bool nowait)
{
	int			i;

	/*
	 * Clean up the slot state info, if there is any matching slot.
	 */
restart:
	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);

	for (i = 0; i < max_active_replication_origins; i++)
	{
		ReplicationState *state = &replication_states[i];

		if (state->roident == roident)
		{
			/* found our slot, is it busy? */
			if (state->acquired_by != 0)
			{
				ConditionVariable *cv;

				if (nowait)
					ereport(ERROR,
							(errcode(ERRCODE_OBJECT_IN_USE),
							 errmsg("could not drop replication origin with ID %d, in use by PID %d",
									state->roident,
									state->acquired_by)));

				/*
				 * We must wait and then retry. Since we don't know which CV
				 * to wait on until here, we can't readily use
				 * ConditionVariablePrepareToSleep (calling it here would be
				 * wrong, since we could miss the signal if we did so); just
				 * use ConditionVariableSleep directly.
				 */
				cv = &state->origin_cv;

				LWLockRelease(ReplicationOriginLock);

				ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);

				/*
				 * The array may have changed while the lock was released, so
				 * rescan from the beginning.
				 */
				goto restart;
			}

			/* first make a WAL log entry */
			{
				xl_replorigin_drop xlrec;

				xlrec.node_id = roident;
				XLogBeginInsert();
				XLogRegisterData(&xlrec, sizeof(xlrec));
				XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
			}

			/* then clear the in-memory slot */
			state->roident = InvalidRepOriginId;
			state->remote_lsn = InvalidXLogRecPtr;
			state->local_lsn = InvalidXLogRecPtr;
			break;
		}
	}
	LWLockRelease(ReplicationOriginLock);

	/*
	 * NOTE(review): presumably cleans up after a ConditionVariableSleep() in
	 * the retry path above; expected to be harmless when we never slept.
	 */
	ConditionVariableCancelSleep();
}
432 :
/*
 * Drop replication origin (by name).
 *
 * Looks up the origin's internal ID, takes an AccessExclusiveLock on the
 * origin object to serialize against concurrent drops, clears its in-memory
 * progress slot via replorigin_state_clear() and deletes its catalog row.
 *
 * missing_ok: if true, a nonexistent origin is silently ignored; otherwise an
 * ERROR is raised.
 * nowait: passed to replorigin_state_clear(); if true, raise an ERROR instead
 * of waiting when the origin is in use by a backend.
 *
 * Needs to be called in a transaction.
 */
void
replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
{
	RepOriginId roident;
	Relation	rel;
	HeapTuple	tuple;

	Assert(IsTransactionState());

	rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);

	/* InvalidOid if the origin doesn't exist and missing_ok is true */
	roident = replorigin_by_name(name, missing_ok);

	/* Lock the origin to prevent concurrent drops. */
	LockSharedObject(ReplicationOriginRelationId, roident, 0,
					 AccessExclusiveLock);

	/*
	 * Look the origin up again now that we hold the lock; it may not exist
	 * (missing_ok case) or, presumably, may have been dropped concurrently
	 * before we obtained the lock.
	 */
	tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
	if (!HeapTupleIsValid(tuple))
	{
		if (!missing_ok)
			elog(ERROR, "cache lookup failed for replication origin with ID %d",
				 roident);

		/*
		 * We don't need to retain the locks if the origin is already dropped.
		 */
		UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
						   AccessExclusiveLock);
		table_close(rel, RowExclusiveLock);
		return;
	}

	replorigin_state_clear(roident, nowait);

	/*
	 * Now, we can delete the catalog entry.
	 */
	CatalogTupleDelete(rel, &tuple->t_self);
	ReleaseSysCache(tuple);

	CommandCounterIncrement();

	/* We keep the lock on pg_replication_origin until commit */
	table_close(rel, NoLock);
}
484 :
485 : /*
486 : * Lookup replication origin via its oid and return the name.
487 : *
488 : * The external name is palloc'd in the calling context.
489 : *
490 : * Returns true if the origin is known, false otherwise.
491 : */
492 : bool
493 54 : replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
494 : {
495 : HeapTuple tuple;
496 : Form_pg_replication_origin ric;
497 :
498 : Assert(OidIsValid((Oid) roident));
499 : Assert(roident != InvalidRepOriginId);
500 : Assert(roident != DoNotReplicateId);
501 :
502 54 : tuple = SearchSysCache1(REPLORIGIDENT,
503 : ObjectIdGetDatum((Oid) roident));
504 :
505 54 : if (HeapTupleIsValid(tuple))
506 : {
507 48 : ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
508 48 : *roname = text_to_cstring(&ric->roname);
509 48 : ReleaseSysCache(tuple);
510 :
511 48 : return true;
512 : }
513 : else
514 : {
515 6 : *roname = NULL;
516 :
517 6 : if (!missing_ok)
518 0 : ereport(ERROR,
519 : (errcode(ERRCODE_UNDEFINED_OBJECT),
520 : errmsg("replication origin with ID %d does not exist",
521 : roident)));
522 :
523 6 : return false;
524 : }
525 : }
526 :
527 :
528 : /* ---------------------------------------------------------------------------
529 : * Functions for handling replication progress.
530 : * ---------------------------------------------------------------------------
531 : */
532 :
533 : Size
534 8416 : ReplicationOriginShmemSize(void)
535 : {
536 8416 : Size size = 0;
537 :
538 8416 : if (max_active_replication_origins == 0)
539 4 : return size;
540 :
541 8412 : size = add_size(size, offsetof(ReplicationStateCtl, states));
542 :
543 8412 : size = add_size(size,
544 : mul_size(max_active_replication_origins, sizeof(ReplicationState)));
545 8412 : return size;
546 : }
547 :
548 : void
549 2180 : ReplicationOriginShmemInit(void)
550 : {
551 : bool found;
552 :
553 2180 : if (max_active_replication_origins == 0)
554 2 : return;
555 :
556 2178 : replication_states_ctl = (ReplicationStateCtl *)
557 2178 : ShmemInitStruct("ReplicationOriginState",
558 : ReplicationOriginShmemSize(),
559 : &found);
560 2178 : replication_states = replication_states_ctl->states;
561 :
562 2178 : if (!found)
563 : {
564 : int i;
565 :
566 156690 : MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
567 :
568 2178 : replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
569 :
570 23940 : for (i = 0; i < max_active_replication_origins; i++)
571 : {
572 21762 : LWLockInitialize(&replication_states[i].lock,
573 21762 : replication_states_ctl->tranche_id);
574 21762 : ConditionVariableInit(&replication_states[i].origin_cv);
575 : }
576 : }
577 : }
578 :
579 : /* ---------------------------------------------------------------------------
580 : * Perform a checkpoint of each replication origin's progress with respect to
581 : * the replayed remote_lsn. Make sure that all transactions we refer to in the
582 : * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
583 : * if the transactions were originally committed asynchronously.
584 : *
585 : * We store checkpoints in the following format:
586 : * +-------+------------------------+------------------+-----+--------+
587 : * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
588 : * +-------+------------------------+------------------+-----+--------+
589 : *
590 : * So its just the magic, followed by the statically sized
591 : * ReplicationStateOnDisk structs. Note that the maximum number of
592 : * ReplicationState is determined by max_active_replication_origins.
593 : * ---------------------------------------------------------------------------
594 : */
595 : void
596 3430 : CheckPointReplicationOrigin(void)
597 : {
598 3430 : const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
599 3430 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
600 : int tmpfd;
601 : int i;
602 3430 : uint32 magic = REPLICATION_STATE_MAGIC;
603 : pg_crc32c crc;
604 :
605 3430 : if (max_active_replication_origins == 0)
606 2 : return;
607 :
608 3428 : INIT_CRC32C(crc);
609 :
610 : /* make sure no old temp file is remaining */
611 3428 : if (unlink(tmppath) < 0 && errno != ENOENT)
612 0 : ereport(PANIC,
613 : (errcode_for_file_access(),
614 : errmsg("could not remove file \"%s\": %m",
615 : tmppath)));
616 :
617 : /*
618 : * no other backend can perform this at the same time; only one checkpoint
619 : * can happen at a time.
620 : */
621 3428 : tmpfd = OpenTransientFile(tmppath,
622 : O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
623 3428 : if (tmpfd < 0)
624 0 : ereport(PANIC,
625 : (errcode_for_file_access(),
626 : errmsg("could not create file \"%s\": %m",
627 : tmppath)));
628 :
629 : /* write magic */
630 3428 : errno = 0;
631 3428 : if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
632 : {
633 : /* if write didn't set errno, assume problem is no disk space */
634 0 : if (errno == 0)
635 0 : errno = ENOSPC;
636 0 : ereport(PANIC,
637 : (errcode_for_file_access(),
638 : errmsg("could not write to file \"%s\": %m",
639 : tmppath)));
640 : }
641 3428 : COMP_CRC32C(crc, &magic, sizeof(magic));
642 :
643 : /* prevent concurrent creations/drops */
644 3428 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
645 :
646 : /* write actual data */
647 37708 : for (i = 0; i < max_active_replication_origins; i++)
648 : {
649 : ReplicationStateOnDisk disk_state;
650 34280 : ReplicationState *curstate = &replication_states[i];
651 : XLogRecPtr local_lsn;
652 :
653 34280 : if (curstate->roident == InvalidRepOriginId)
654 34180 : continue;
655 :
656 : /* zero, to avoid uninitialized padding bytes */
657 100 : memset(&disk_state, 0, sizeof(disk_state));
658 :
659 100 : LWLockAcquire(&curstate->lock, LW_SHARED);
660 :
661 100 : disk_state.roident = curstate->roident;
662 :
663 100 : disk_state.remote_lsn = curstate->remote_lsn;
664 100 : local_lsn = curstate->local_lsn;
665 :
666 100 : LWLockRelease(&curstate->lock);
667 :
668 : /* make sure we only write out a commit that's persistent */
669 100 : XLogFlush(local_lsn);
670 :
671 100 : errno = 0;
672 100 : if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
673 : sizeof(disk_state))
674 : {
675 : /* if write didn't set errno, assume problem is no disk space */
676 0 : if (errno == 0)
677 0 : errno = ENOSPC;
678 0 : ereport(PANIC,
679 : (errcode_for_file_access(),
680 : errmsg("could not write to file \"%s\": %m",
681 : tmppath)));
682 : }
683 :
684 100 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
685 : }
686 :
687 3428 : LWLockRelease(ReplicationOriginLock);
688 :
689 : /* write out the CRC */
690 3428 : FIN_CRC32C(crc);
691 3428 : errno = 0;
692 3428 : if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
693 : {
694 : /* if write didn't set errno, assume problem is no disk space */
695 0 : if (errno == 0)
696 0 : errno = ENOSPC;
697 0 : ereport(PANIC,
698 : (errcode_for_file_access(),
699 : errmsg("could not write to file \"%s\": %m",
700 : tmppath)));
701 : }
702 :
703 3428 : if (CloseTransientFile(tmpfd) != 0)
704 0 : ereport(PANIC,
705 : (errcode_for_file_access(),
706 : errmsg("could not close file \"%s\": %m",
707 : tmppath)));
708 :
709 : /* fsync, rename to permanent file, fsync file and directory */
710 3428 : durable_rename(tmppath, path, PANIC);
711 : }
712 :
713 : /*
714 : * Recover replication replay status from checkpoint data saved earlier by
715 : * CheckPointReplicationOrigin.
716 : *
717 : * This only needs to be called at startup and *not* during every checkpoint
718 : * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
719 : * state thereafter can be recovered by looking at commit records.
720 : */
721 : void
722 1894 : StartupReplicationOrigin(void)
723 : {
724 1894 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
725 : int fd;
726 : int readBytes;
727 1894 : uint32 magic = REPLICATION_STATE_MAGIC;
728 1894 : int last_state = 0;
729 : pg_crc32c file_crc;
730 : pg_crc32c crc;
731 :
732 : /* don't want to overwrite already existing state */
733 : #ifdef USE_ASSERT_CHECKING
734 : static bool already_started = false;
735 :
736 : Assert(!already_started);
737 : already_started = true;
738 : #endif
739 :
740 1894 : if (max_active_replication_origins == 0)
741 102 : return;
742 :
743 1892 : INIT_CRC32C(crc);
744 :
745 1892 : elog(DEBUG2, "starting up replication origin progress state");
746 :
747 1892 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
748 :
749 : /*
750 : * might have had max_active_replication_origins == 0 last run, or we just
751 : * brought up a standby.
752 : */
753 1892 : if (fd < 0 && errno == ENOENT)
754 100 : return;
755 1792 : else if (fd < 0)
756 0 : ereport(PANIC,
757 : (errcode_for_file_access(),
758 : errmsg("could not open file \"%s\": %m",
759 : path)));
760 :
761 : /* verify magic, that is written even if nothing was active */
762 1792 : readBytes = read(fd, &magic, sizeof(magic));
763 1792 : if (readBytes != sizeof(magic))
764 : {
765 0 : if (readBytes < 0)
766 0 : ereport(PANIC,
767 : (errcode_for_file_access(),
768 : errmsg("could not read file \"%s\": %m",
769 : path)));
770 : else
771 0 : ereport(PANIC,
772 : (errcode(ERRCODE_DATA_CORRUPTED),
773 : errmsg("could not read file \"%s\": read %d of %zu",
774 : path, readBytes, sizeof(magic))));
775 : }
776 1792 : COMP_CRC32C(crc, &magic, sizeof(magic));
777 :
778 1792 : if (magic != REPLICATION_STATE_MAGIC)
779 0 : ereport(PANIC,
780 : (errmsg("replication checkpoint has wrong magic %u instead of %u",
781 : magic, REPLICATION_STATE_MAGIC)));
782 :
783 : /* we can skip locking here, no other access is possible */
784 :
785 : /* recover individual states, until there are no more to be found */
786 : while (true)
787 50 : {
788 : ReplicationStateOnDisk disk_state;
789 :
790 1842 : readBytes = read(fd, &disk_state, sizeof(disk_state));
791 :
792 : /* no further data */
793 1842 : if (readBytes == sizeof(crc))
794 : {
795 : /* not pretty, but simple ... */
796 1792 : file_crc = *(pg_crc32c *) &disk_state;
797 1792 : break;
798 : }
799 :
800 50 : if (readBytes < 0)
801 : {
802 0 : ereport(PANIC,
803 : (errcode_for_file_access(),
804 : errmsg("could not read file \"%s\": %m",
805 : path)));
806 : }
807 :
808 50 : if (readBytes != sizeof(disk_state))
809 : {
810 0 : ereport(PANIC,
811 : (errcode_for_file_access(),
812 : errmsg("could not read file \"%s\": read %d of %zu",
813 : path, readBytes, sizeof(disk_state))));
814 : }
815 :
816 50 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
817 :
818 50 : if (last_state == max_active_replication_origins)
819 0 : ereport(PANIC,
820 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
821 : errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
822 :
823 : /* copy data to shared memory */
824 50 : replication_states[last_state].roident = disk_state.roident;
825 50 : replication_states[last_state].remote_lsn = disk_state.remote_lsn;
826 50 : last_state++;
827 :
828 50 : ereport(LOG,
829 : errmsg("recovered replication state of node %d to %X/%08X",
830 : disk_state.roident,
831 : LSN_FORMAT_ARGS(disk_state.remote_lsn)));
832 : }
833 :
834 : /* now check checksum */
835 1792 : FIN_CRC32C(crc);
836 1792 : if (file_crc != crc)
837 0 : ereport(PANIC,
838 : (errcode(ERRCODE_DATA_CORRUPTED),
839 : errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
840 : crc, file_crc)));
841 :
842 1792 : if (CloseTransientFile(fd) != 0)
843 0 : ereport(PANIC,
844 : (errcode_for_file_access(),
845 : errmsg("could not close file \"%s\": %m",
846 : path)));
847 : }
848 :
849 : void
850 8 : replorigin_redo(XLogReaderState *record)
851 : {
852 8 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
853 :
854 8 : switch (info)
855 : {
856 4 : case XLOG_REPLORIGIN_SET:
857 : {
858 4 : xl_replorigin_set *xlrec =
859 4 : (xl_replorigin_set *) XLogRecGetData(record);
860 :
861 4 : replorigin_advance(xlrec->node_id,
862 : xlrec->remote_lsn, record->EndRecPtr,
863 4 : xlrec->force /* backward */ ,
864 : false /* WAL log */ );
865 4 : break;
866 : }
867 4 : case XLOG_REPLORIGIN_DROP:
868 : {
869 : xl_replorigin_drop *xlrec;
870 : int i;
871 :
872 4 : xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
873 :
874 6 : for (i = 0; i < max_active_replication_origins; i++)
875 : {
876 6 : ReplicationState *state = &replication_states[i];
877 :
878 : /* found our slot */
879 6 : if (state->roident == xlrec->node_id)
880 : {
881 : /* reset entry */
882 4 : state->roident = InvalidRepOriginId;
883 4 : state->remote_lsn = InvalidXLogRecPtr;
884 4 : state->local_lsn = InvalidXLogRecPtr;
885 4 : break;
886 : }
887 : }
888 4 : break;
889 : }
890 0 : default:
891 0 : elog(PANIC, "replorigin_redo: unknown op code %u", info);
892 : }
893 8 : }
894 :
895 :
896 : /*
897 : * Tell the replication origin progress machinery that a commit from 'node'
898 : * that originated at the LSN remote_commit on the remote node was replayed
899 : * successfully and that we don't need to do so again. In combination with
900 : * setting up replorigin_session_origin_lsn and replorigin_session_origin
901 : * that ensures we won't lose knowledge about that after a crash if the
902 : * transaction had a persistent effect (think of asynchronous commits).
903 : *
904 : * local_commit needs to be a local LSN of the commit so that we can make sure
905 : * upon a checkpoint that enough WAL has been persisted to disk.
906 : *
907 : * Needs to be called with a RowExclusiveLock on pg_replication_origin,
908 : * unless running in recovery.
909 : */
910 : void
911 476 : replorigin_advance(RepOriginId node,
912 : XLogRecPtr remote_commit, XLogRecPtr local_commit,
913 : bool go_backward, bool wal_log)
914 : {
915 : int i;
916 476 : ReplicationState *replication_state = NULL;
917 476 : ReplicationState *free_state = NULL;
918 :
919 : Assert(node != InvalidRepOriginId);
920 :
921 : /* we don't track DoNotReplicateId */
922 476 : if (node == DoNotReplicateId)
923 0 : return;
924 :
925 : /*
926 : * XXX: For the case where this is called by WAL replay, it'd be more
927 : * efficient to restore into a backend local hashtable and only dump into
928 : * shmem after recovery is finished. Let's wait with implementing that
929 : * till it's shown to be a measurable expense
930 : */
931 :
932 : /* Lock exclusively, as we may have to create a new table entry. */
933 476 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
934 :
935 : /*
936 : * Search for either an existing slot for the origin, or a free one we can
937 : * use.
938 : */
939 4376 : for (i = 0; i < max_active_replication_origins; i++)
940 : {
941 3988 : ReplicationState *curstate = &replication_states[i];
942 :
943 : /* remember where to insert if necessary */
944 3988 : if (curstate->roident == InvalidRepOriginId &&
945 : free_state == NULL)
946 : {
947 388 : free_state = curstate;
948 388 : continue;
949 : }
950 :
951 : /* not our slot */
952 3600 : if (curstate->roident != node)
953 : {
954 3512 : continue;
955 : }
956 :
957 : /* ok, found slot */
958 88 : replication_state = curstate;
959 :
960 88 : LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
961 :
962 : /* Make sure it's not used by somebody else */
963 88 : if (replication_state->acquired_by != 0)
964 : {
965 0 : ereport(ERROR,
966 : (errcode(ERRCODE_OBJECT_IN_USE),
967 : errmsg("replication origin with ID %d is already active for PID %d",
968 : replication_state->roident,
969 : replication_state->acquired_by)));
970 : }
971 :
972 88 : break;
973 : }
974 :
975 476 : if (replication_state == NULL && free_state == NULL)
976 0 : ereport(ERROR,
977 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
978 : errmsg("could not find free replication state slot for replication origin with ID %d",
979 : node),
980 : errhint("Increase \"max_active_replication_origins\" and try again.")));
981 :
982 476 : if (replication_state == NULL)
983 : {
984 : /* initialize new slot */
985 388 : LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
986 388 : replication_state = free_state;
987 : Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
988 : Assert(replication_state->local_lsn == InvalidXLogRecPtr);
989 388 : replication_state->roident = node;
990 : }
991 :
992 : Assert(replication_state->roident != InvalidRepOriginId);
993 :
994 : /*
995 : * If somebody "forcefully" sets this slot, WAL log it, so it's durable
996 : * and the standby gets the message. Primarily this will be called during
997 : * WAL replay (of commit records) where no WAL logging is necessary.
998 : */
999 476 : if (wal_log)
1000 : {
1001 : xl_replorigin_set xlrec;
1002 :
1003 394 : xlrec.remote_lsn = remote_commit;
1004 394 : xlrec.node_id = node;
1005 394 : xlrec.force = go_backward;
1006 :
1007 394 : XLogBeginInsert();
1008 394 : XLogRegisterData(&xlrec, sizeof(xlrec));
1009 :
1010 394 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
1011 : }
1012 :
1013 : /*
1014 : * Due to - harmless - race conditions during a checkpoint we could see
1015 : * values here that are older than the ones we already have in memory. We
1016 : * could also see older values for prepared transactions when the prepare
1017 : * is sent at a later point of time along with commit prepared and there
1018 : * are other transactions commits between prepare and commit prepared. See
1019 : * ReorderBufferFinishPrepared. Don't overwrite those.
1020 : */
1021 476 : if (go_backward || replication_state->remote_lsn < remote_commit)
1022 462 : replication_state->remote_lsn = remote_commit;
1023 476 : if (local_commit != InvalidXLogRecPtr &&
1024 76 : (go_backward || replication_state->local_lsn < local_commit))
1025 80 : replication_state->local_lsn = local_commit;
1026 476 : LWLockRelease(&replication_state->lock);
1027 :
1028 : /*
1029 : * Release *after* changing the LSNs, slot isn't acquired and thus could
1030 : * otherwise be dropped anytime.
1031 : */
1032 476 : LWLockRelease(ReplicationOriginLock);
1033 : }
1034 :
1035 :
1036 : XLogRecPtr
1037 16 : replorigin_get_progress(RepOriginId node, bool flush)
1038 : {
1039 : int i;
1040 16 : XLogRecPtr local_lsn = InvalidXLogRecPtr;
1041 16 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1042 :
1043 : /* prevent slots from being concurrently dropped */
1044 16 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1045 :
1046 76 : for (i = 0; i < max_active_replication_origins; i++)
1047 : {
1048 : ReplicationState *state;
1049 :
1050 70 : state = &replication_states[i];
1051 :
1052 70 : if (state->roident == node)
1053 : {
1054 10 : LWLockAcquire(&state->lock, LW_SHARED);
1055 :
1056 10 : remote_lsn = state->remote_lsn;
1057 10 : local_lsn = state->local_lsn;
1058 :
1059 10 : LWLockRelease(&state->lock);
1060 :
1061 10 : break;
1062 : }
1063 : }
1064 :
1065 16 : LWLockRelease(ReplicationOriginLock);
1066 :
1067 16 : if (flush && local_lsn != InvalidXLogRecPtr)
1068 2 : XLogFlush(local_lsn);
1069 :
1070 16 : return remote_lsn;
1071 : }
1072 :
1073 : /*
1074 : * Tear down a (possibly) configured session replication origin during process
1075 : * exit.
1076 : */
1077 : static void
1078 922 : ReplicationOriginExitCleanup(int code, Datum arg)
1079 : {
1080 922 : ConditionVariable *cv = NULL;
1081 :
1082 922 : if (session_replication_state == NULL)
1083 376 : return;
1084 :
1085 546 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1086 :
1087 546 : if (session_replication_state->acquired_by == MyProcPid)
1088 : {
1089 526 : cv = &session_replication_state->origin_cv;
1090 :
1091 526 : session_replication_state->acquired_by = 0;
1092 526 : session_replication_state = NULL;
1093 : }
1094 :
1095 546 : LWLockRelease(ReplicationOriginLock);
1096 :
1097 546 : if (cv)
1098 526 : ConditionVariableBroadcast(cv);
1099 : }
1100 :
1101 : /*
1102 : * Setup a replication origin in the shared memory struct if it doesn't
1103 : * already exist and cache access to the specific ReplicationSlot so the
1104 : * array doesn't have to be searched when calling
1105 : * replorigin_session_advance().
1106 : *
1107 : * Normally only one such cached origin can exist per process so the cached
1108 : * value can only be set again after the previous value is torn down with
1109 : * replorigin_session_reset(). For this normal case pass acquired_by = 0
1110 : * (meaning the slot is not allowed to be already acquired by another process).
1111 : *
1112 : * However, sometimes multiple processes can safely re-use the same origin slot
1113 : * (for example, multiple parallel apply processes can safely use the same
1114 : * origin, provided they maintain commit order by allowing only one process to
1115 : * commit at a time). For this case the first process must pass acquired_by =
1116 : * 0, and then the other processes sharing that same origin can pass
1117 : * acquired_by = PID of the first process.
1118 : */
1119 : void
1120 930 : replorigin_session_setup(RepOriginId node, int acquired_by)
1121 : {
1122 : static bool registered_cleanup;
1123 : int i;
1124 930 : int free_slot = -1;
1125 :
1126 930 : if (!registered_cleanup)
1127 : {
1128 922 : on_shmem_exit(ReplicationOriginExitCleanup, 0);
1129 922 : registered_cleanup = true;
1130 : }
1131 :
1132 : Assert(max_active_replication_origins > 0);
1133 :
1134 930 : if (session_replication_state != NULL)
1135 2 : ereport(ERROR,
1136 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1137 : errmsg("cannot setup replication origin when one is already setup")));
1138 :
1139 : /* Lock exclusively, as we may have to create a new table entry. */
1140 928 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1141 :
1142 : /*
1143 : * Search for either an existing slot for the origin, or a free one we can
1144 : * use.
1145 : */
1146 3818 : for (i = 0; i < max_active_replication_origins; i++)
1147 : {
1148 3584 : ReplicationState *curstate = &replication_states[i];
1149 :
1150 : /* remember where to insert if necessary */
1151 3584 : if (curstate->roident == InvalidRepOriginId &&
1152 : free_slot == -1)
1153 : {
1154 240 : free_slot = i;
1155 240 : continue;
1156 : }
1157 :
1158 : /* not our slot */
1159 3344 : if (curstate->roident != node)
1160 2650 : continue;
1161 :
1162 694 : else if (curstate->acquired_by != 0 && acquired_by == 0)
1163 : {
1164 0 : ereport(ERROR,
1165 : (errcode(ERRCODE_OBJECT_IN_USE),
1166 : errmsg("replication origin with ID %d is already active for PID %d",
1167 : curstate->roident, curstate->acquired_by)));
1168 : }
1169 :
1170 694 : else if (curstate->acquired_by != acquired_by)
1171 : {
1172 0 : ereport(ERROR,
1173 : (errcode(ERRCODE_OBJECT_IN_USE),
1174 : errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
1175 : node, acquired_by)));
1176 : }
1177 :
1178 : /* ok, found slot */
1179 694 : session_replication_state = curstate;
1180 694 : break;
1181 : }
1182 :
1183 :
1184 928 : if (session_replication_state == NULL && free_slot == -1)
1185 0 : ereport(ERROR,
1186 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1187 : errmsg("could not find free replication state slot for replication origin with ID %d",
1188 : node),
1189 : errhint("Increase \"max_active_replication_origins\" and try again.")));
1190 928 : else if (session_replication_state == NULL)
1191 : {
1192 234 : if (acquired_by)
1193 2 : ereport(ERROR,
1194 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1195 : errmsg("cannot use PID %d for inactive replication origin with ID %d",
1196 : acquired_by, node)));
1197 :
1198 : /* initialize new slot */
1199 232 : session_replication_state = &replication_states[free_slot];
1200 : Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
1201 : Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
1202 232 : session_replication_state->roident = node;
1203 : }
1204 :
1205 :
1206 : Assert(session_replication_state->roident != InvalidRepOriginId);
1207 :
1208 926 : if (acquired_by == 0)
1209 904 : session_replication_state->acquired_by = MyProcPid;
1210 : else
1211 : Assert(session_replication_state->acquired_by == acquired_by);
1212 :
1213 926 : LWLockRelease(ReplicationOriginLock);
1214 :
1215 : /* probably this one is pointless */
1216 926 : ConditionVariableBroadcast(&session_replication_state->origin_cv);
1217 926 : }
1218 :
1219 : /*
1220 : * Reset replay state previously setup in this session.
1221 : *
1222 : * This function may only be called if an origin was setup with
1223 : * replorigin_session_setup().
1224 : */
1225 : void
1226 382 : replorigin_session_reset(void)
1227 : {
1228 : ConditionVariable *cv;
1229 :
1230 : Assert(max_active_replication_origins != 0);
1231 :
1232 382 : if (session_replication_state == NULL)
1233 2 : ereport(ERROR,
1234 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1235 : errmsg("no replication origin is configured")));
1236 :
1237 380 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1238 :
1239 380 : session_replication_state->acquired_by = 0;
1240 380 : cv = &session_replication_state->origin_cv;
1241 380 : session_replication_state = NULL;
1242 :
1243 380 : LWLockRelease(ReplicationOriginLock);
1244 :
1245 380 : ConditionVariableBroadcast(cv);
1246 380 : }
1247 :
1248 : /*
1249 : * Do the same work replorigin_advance() does, just on the session's
1250 : * configured origin.
1251 : *
1252 : * This is noticeably cheaper than using replorigin_advance().
1253 : */
1254 : void
1255 2184 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
1256 : {
1257 : Assert(session_replication_state != NULL);
1258 : Assert(session_replication_state->roident != InvalidRepOriginId);
1259 :
1260 2184 : LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1261 2184 : if (session_replication_state->local_lsn < local_commit)
1262 2184 : session_replication_state->local_lsn = local_commit;
1263 2184 : if (session_replication_state->remote_lsn < remote_commit)
1264 1022 : session_replication_state->remote_lsn = remote_commit;
1265 2184 : LWLockRelease(&session_replication_state->lock);
1266 2184 : }
1267 :
1268 : /*
1269 : * Ask the machinery about the point up to which we successfully replayed
1270 : * changes from an already setup replication origin.
1271 : */
1272 : XLogRecPtr
1273 508 : replorigin_session_get_progress(bool flush)
1274 : {
1275 : XLogRecPtr remote_lsn;
1276 : XLogRecPtr local_lsn;
1277 :
1278 : Assert(session_replication_state != NULL);
1279 :
1280 508 : LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1281 508 : remote_lsn = session_replication_state->remote_lsn;
1282 508 : local_lsn = session_replication_state->local_lsn;
1283 508 : LWLockRelease(&session_replication_state->lock);
1284 :
1285 508 : if (flush && local_lsn != InvalidXLogRecPtr)
1286 2 : XLogFlush(local_lsn);
1287 :
1288 508 : return remote_lsn;
1289 : }
1290 :
1291 :
1292 :
1293 : /* ---------------------------------------------------------------------------
1294 : * SQL functions for working with replication origin.
1295 : *
1296 : * These mostly should be fairly short wrappers around more generic functions.
1297 : * ---------------------------------------------------------------------------
1298 : */
1299 :
1300 : /*
1301 : * Create replication origin for the passed in name, and return the assigned
1302 : * oid.
1303 : */
1304 : Datum
1305 26 : pg_replication_origin_create(PG_FUNCTION_ARGS)
1306 : {
1307 : char *name;
1308 : RepOriginId roident;
1309 :
1310 26 : replorigin_check_prerequisites(false, false);
1311 :
1312 26 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1313 :
1314 : /*
1315 : * Replication origins "any and "none" are reserved for system options.
1316 : * The origins "pg_xxx" are reserved for internal use.
1317 : */
1318 26 : if (IsReservedName(name) || IsReservedOriginName(name))
1319 6 : ereport(ERROR,
1320 : (errcode(ERRCODE_RESERVED_NAME),
1321 : errmsg("replication origin name \"%s\" is reserved",
1322 : name),
1323 : errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1324 : LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
1325 :
1326 : /*
1327 : * If built with appropriate switch, whine when regression-testing
1328 : * conventions for replication origin names are violated.
1329 : */
1330 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1331 : if (strncmp(name, "regress_", 8) != 0)
1332 : elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1333 : #endif
1334 :
1335 20 : roident = replorigin_create(name);
1336 :
1337 12 : pfree(name);
1338 :
1339 12 : PG_RETURN_OID(roident);
1340 : }
1341 :
1342 : /*
1343 : * Drop replication origin.
1344 : */
1345 : Datum
1346 16 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
1347 : {
1348 : char *name;
1349 :
1350 16 : replorigin_check_prerequisites(false, false);
1351 :
1352 16 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1353 :
1354 16 : replorigin_drop_by_name(name, false, true);
1355 :
1356 14 : pfree(name);
1357 :
1358 14 : PG_RETURN_VOID();
1359 : }
1360 :
1361 : /*
1362 : * Return oid of a replication origin.
1363 : */
1364 : Datum
1365 0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
1366 : {
1367 : char *name;
1368 : RepOriginId roident;
1369 :
1370 0 : replorigin_check_prerequisites(false, false);
1371 :
1372 0 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1373 0 : roident = replorigin_by_name(name, true);
1374 :
1375 0 : pfree(name);
1376 :
1377 0 : if (OidIsValid(roident))
1378 0 : PG_RETURN_OID(roident);
1379 0 : PG_RETURN_NULL();
1380 : }
1381 :
1382 : /*
1383 : * Setup a replication origin for this session.
1384 : */
1385 : Datum
1386 18 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
1387 : {
1388 : char *name;
1389 : RepOriginId origin;
1390 : int pid;
1391 :
1392 18 : replorigin_check_prerequisites(true, false);
1393 :
1394 18 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1395 18 : origin = replorigin_by_name(name, false);
1396 16 : pid = PG_GETARG_INT32(1);
1397 16 : replorigin_session_setup(origin, pid);
1398 :
1399 12 : replorigin_session_origin = origin;
1400 :
1401 12 : pfree(name);
1402 :
1403 12 : PG_RETURN_VOID();
1404 : }
1405 :
1406 : /*
1407 : * Reset previously setup origin in this session
1408 : */
1409 : Datum
1410 14 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
1411 : {
1412 14 : replorigin_check_prerequisites(true, false);
1413 :
1414 14 : replorigin_session_reset();
1415 :
1416 12 : replorigin_session_origin = InvalidRepOriginId;
1417 12 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
1418 12 : replorigin_session_origin_timestamp = 0;
1419 :
1420 12 : PG_RETURN_VOID();
1421 : }
1422 :
1423 : /*
1424 : * Has a replication origin been setup for this session.
1425 : */
1426 : Datum
1427 4 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
1428 : {
1429 4 : replorigin_check_prerequisites(false, false);
1430 :
1431 4 : PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
1432 : }
1433 :
1434 :
1435 : /*
1436 : * Return the replication progress for origin setup in the current session.
1437 : *
1438 : * If 'flush' is set to true it is ensured that the returned value corresponds
1439 : * to a local transaction that has been flushed. This is useful if asynchronous
1440 : * commits are used when replaying replicated transactions.
1441 : */
1442 : Datum
1443 4 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
1444 : {
1445 4 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1446 4 : bool flush = PG_GETARG_BOOL(0);
1447 :
1448 4 : replorigin_check_prerequisites(true, false);
1449 :
1450 4 : if (session_replication_state == NULL)
1451 0 : ereport(ERROR,
1452 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1453 : errmsg("no replication origin is configured")));
1454 :
1455 4 : remote_lsn = replorigin_session_get_progress(flush);
1456 :
1457 4 : if (remote_lsn == InvalidXLogRecPtr)
1458 0 : PG_RETURN_NULL();
1459 :
1460 4 : PG_RETURN_LSN(remote_lsn);
1461 : }
1462 :
1463 : Datum
1464 2 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
1465 : {
1466 2 : XLogRecPtr location = PG_GETARG_LSN(0);
1467 :
1468 2 : replorigin_check_prerequisites(true, false);
1469 :
1470 2 : if (session_replication_state == NULL)
1471 0 : ereport(ERROR,
1472 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1473 : errmsg("no replication origin is configured")));
1474 :
1475 2 : replorigin_session_origin_lsn = location;
1476 2 : replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
1477 :
1478 2 : PG_RETURN_VOID();
1479 : }
1480 :
1481 : Datum
1482 0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
1483 : {
1484 0 : replorigin_check_prerequisites(true, false);
1485 :
1486 0 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
1487 0 : replorigin_session_origin_timestamp = 0;
1488 :
1489 0 : PG_RETURN_VOID();
1490 : }
1491 :
1492 :
1493 : Datum
1494 6 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
1495 : {
1496 6 : text *name = PG_GETARG_TEXT_PP(0);
1497 6 : XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1498 : RepOriginId node;
1499 :
1500 6 : replorigin_check_prerequisites(true, false);
1501 :
1502 : /* lock to prevent the replication origin from vanishing */
1503 6 : LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1504 :
1505 6 : node = replorigin_by_name(text_to_cstring(name), false);
1506 :
1507 : /*
1508 : * Can't sensibly pass a local commit to be flushed at checkpoint - this
1509 : * xact hasn't committed yet. This is why this function should be used to
1510 : * set up the initial replication state, but not for replay.
1511 : */
1512 4 : replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1513 : true /* go backward */ , true /* WAL log */ );
1514 :
1515 4 : UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1516 :
1517 4 : PG_RETURN_VOID();
1518 : }
1519 :
1520 :
1521 : /*
1522 : * Return the replication progress for an individual replication origin.
1523 : *
1524 : * If 'flush' is set to true it is ensured that the returned value corresponds
1525 : * to a local transaction that has been flushed. This is useful if asynchronous
1526 : * commits are used when replaying replicated transactions.
1527 : */
1528 : Datum
1529 6 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
1530 : {
1531 : char *name;
1532 : bool flush;
1533 : RepOriginId roident;
1534 6 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1535 :
1536 6 : replorigin_check_prerequisites(true, true);
1537 :
1538 6 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1539 6 : flush = PG_GETARG_BOOL(1);
1540 :
1541 6 : roident = replorigin_by_name(name, false);
1542 : Assert(OidIsValid(roident));
1543 :
1544 4 : remote_lsn = replorigin_get_progress(roident, flush);
1545 :
1546 4 : if (remote_lsn == InvalidXLogRecPtr)
1547 0 : PG_RETURN_NULL();
1548 :
1549 4 : PG_RETURN_LSN(remote_lsn);
1550 : }
1551 :
1552 :
1553 : Datum
1554 20 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
1555 : {
1556 20 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1557 : int i;
1558 : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
1559 :
1560 : /* we want to return 0 rows if slot is set to zero */
1561 20 : replorigin_check_prerequisites(false, true);
1562 :
1563 20 : InitMaterializedSRF(fcinfo, 0);
1564 :
1565 : /* prevent slots from being concurrently dropped */
1566 20 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1567 :
1568 : /*
1569 : * Iterate through all possible replication_states, display if they are
1570 : * filled. Note that we do not take any locks, so slightly corrupted/out
1571 : * of date values are a possibility.
1572 : */
1573 220 : for (i = 0; i < max_active_replication_origins; i++)
1574 : {
1575 : ReplicationState *state;
1576 : Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];
1577 : bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1578 : char *roname;
1579 :
1580 200 : state = &replication_states[i];
1581 :
1582 : /* unused slot, nothing to display */
1583 200 : if (state->roident == InvalidRepOriginId)
1584 174 : continue;
1585 :
1586 26 : memset(values, 0, sizeof(values));
1587 26 : memset(nulls, 1, sizeof(nulls));
1588 :
1589 26 : values[0] = ObjectIdGetDatum(state->roident);
1590 26 : nulls[0] = false;
1591 :
1592 : /*
1593 : * We're not preventing the origin to be dropped concurrently, so
1594 : * silently accept that it might be gone.
1595 : */
1596 26 : if (replorigin_by_oid(state->roident, true,
1597 : &roname))
1598 : {
1599 26 : values[1] = CStringGetTextDatum(roname);
1600 26 : nulls[1] = false;
1601 : }
1602 :
1603 26 : LWLockAcquire(&state->lock, LW_SHARED);
1604 :
1605 26 : values[2] = LSNGetDatum(state->remote_lsn);
1606 26 : nulls[2] = false;
1607 :
1608 26 : values[3] = LSNGetDatum(state->local_lsn);
1609 26 : nulls[3] = false;
1610 :
1611 26 : LWLockRelease(&state->lock);
1612 :
1613 26 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1614 : values, nulls);
1615 : }
1616 :
1617 20 : LWLockRelease(ReplicationOriginLock);
1618 :
1619 : #undef REPLICATION_ORIGIN_PROGRESS_COLS
1620 :
1621 20 : return (Datum) 0;
1622 : }
|