Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * origin.c
4 : * Logical replication progress tracking support.
5 : *
6 : * Copyright (c) 2013-2025, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/logical/origin.c
10 : *
11 : * NOTES
12 : *
13 : * This file provides the following:
14 : * * An infrastructure to name nodes in a replication setup
15 : * * A facility to store and persist replication progress in an
16 : * efficient and durable manner.
17 : *
18 : * Replication origins consist of a descriptive, user defined, external
19 : * name and a short, thus space efficient, internal 2 byte one. This split
20 : * exists because replication origins have to be stored in WAL and shared
21 : * memory and long descriptors would be inefficient. For now we only use 2
22 : * bytes for the internal id of a replication origin as it seems unlikely that
23 : * there soon will be more than 65k nodes in one replication setup; and using
24 : * only two bytes allows us to be more space efficient.
25 : *
26 : * Replication progress is tracked in a shared memory table
27 : * (ReplicationState) that's dumped to disk every checkpoint. Entries
28 : * ('slots') in this table are identified by the internal id. That's the case
29 : * because it allows us to increase replication progress during crash
30 : * recovery. To allow doing so we store the original LSN (from the originating
31 : * system) of a transaction in the commit record. That allows us to recover the
32 : * precise replayed state after crash recovery, without requiring synchronous
33 : * commits. Allowing logical replication to use asynchronous commit is
34 : * generally good for performance, but especially important as it allows a
35 : * single threaded replay process to keep up with a source that has multiple
36 : * backends generating changes concurrently. For efficiency and simplicity
37 : * reasons a backend can set up one replication origin that's from then used as
38 : * the source of changes produced by the backend, until reset again.
39 : *
40 : * This infrastructure is intended to be used in cooperation with logical
41 : * decoding. When replaying from a remote system the configured origin is
42 : * provided to output plugins, allowing prevention of replication loops and
43 : * other filtering.
44 : *
45 : * There are several levels of locking at work:
46 : *
47 : * * To create replication origins an exclusive lock on pg_replication_origin
48 : * is held for the duration, which allows us to safely and conflict free
49 : * assign new origins using a dirty snapshot; drops lock the single origin.
50 : *
51 : * * When creating an in-memory replication progress slot the ReplicationOrigin
52 : * LWLock has to be held exclusively; when iterating over the replication
53 : * progress a shared lock has to be held, the same when advancing the
54 : * replication progress of an individual origin that has not been set up as the
55 : * session's replication origin.
56 : *
57 : * * When manipulating or looking at the remote_lsn and local_lsn fields of a
58 : * replication progress slot that slot's lwlock has to be held. That's
59 : * primarily because we do not assume 8 byte writes (the LSN) are atomic on
60 : * all our platforms, but it also simplifies memory ordering concerns
61 : * between the remote and local lsn. We use a lwlock instead of a spinlock
62 : * so it's less harmful to hold the lock over a WAL write
63 : * (cf. replorigin_advance).
64 : *
65 : * ---------------------------------------------------------------------------
66 : */
67 :
68 : #include "postgres.h"
69 :
70 : #include <unistd.h>
71 : #include <sys/stat.h>
72 :
73 : #include "access/genam.h"
74 : #include "access/htup_details.h"
75 : #include "access/table.h"
76 : #include "access/xact.h"
77 : #include "access/xloginsert.h"
78 : #include "catalog/catalog.h"
79 : #include "catalog/indexing.h"
80 : #include "catalog/pg_subscription.h"
81 : #include "funcapi.h"
82 : #include "miscadmin.h"
83 : #include "nodes/execnodes.h"
84 : #include "pgstat.h"
85 : #include "replication/origin.h"
86 : #include "replication/slot.h"
87 : #include "storage/condition_variable.h"
88 : #include "storage/fd.h"
89 : #include "storage/ipc.h"
90 : #include "storage/lmgr.h"
91 : #include "utils/builtins.h"
92 : #include "utils/fmgroids.h"
93 : #include "utils/pg_lsn.h"
94 : #include "utils/rel.h"
95 : #include "utils/snapmgr.h"
96 : #include "utils/syscache.h"
97 :
/*
 * Paths for the replication origin checkpoint file.  CheckPointReplicationOrigin
 * writes the temporary file first and then durably renames it over the
 * permanent one; StartupReplicationOrigin reads only the permanent file.
 */
#define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"

/*
 * Replay progress of a single remote node, as kept in shared memory.  One
 * entry ("slot") exists per max_replication_slots.
 */
typedef struct ReplicationState
{
	/*
	 * Local identifier for the remote node.  InvalidRepOriginId marks an
	 * unused slot.
	 */
	RepOriginId roident;

	/*
	 * Location of the latest commit from the remote side.
	 */
	XLogRecPtr	remote_lsn;

	/*
	 * Remember the local lsn of the commit record so we can XLogFlush() to it
	 * during a checkpoint so we know the commit record actually is safe on
	 * disk.
	 */
	XLogRecPtr	local_lsn;

	/*
	 * PID of backend that's acquired slot, or 0 if none.  While this is
	 * nonzero, replorigin_advance() refuses to touch the slot, and dropping
	 * the origin has to wait for it to become 0 (or fails in nowait mode).
	 */
	int			acquired_by;

	/*
	 * Condition variable that's signaled when acquired_by changes; a drop
	 * waiting for the slot to be released sleeps on it.
	 */
	ConditionVariable origin_cv;

	/*
	 * Lock protecting remote_lsn and local_lsn.
	 */
	LWLock		lock;
} ReplicationState;

/*
 * On disk version of ReplicationState.  Entries are written verbatim (with
 * padding zeroed) into the checkpoint file, so changing this struct changes
 * the on-disk format.
 */
typedef struct ReplicationStateOnDisk
{
	RepOriginId roident;
	XLogRecPtr	remote_lsn;
} ReplicationStateOnDisk;


/*
 * Layout of the replication origin shared memory area: a small header
 * followed by the per-origin progress slots.
 */
typedef struct ReplicationStateCtl
{
	/* Tranche to use for per-origin LWLocks */
	int			tranche_id;
	/* Array of length max_replication_slots */
	ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
} ReplicationStateCtl;
157 :
/*
 * external variables
 *
 * replorigin_session_origin is the origin this session's changes are
 * attributed to; InvalidRepOriginId means none is set up.
 * replorigin_session_origin_lsn and replorigin_session_origin_timestamp
 * describe the remote transaction being replayed.  NOTE(review): these are
 * set by code outside this excerpt; confirm exact semantics there.
 */
RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
XLogRecPtr	replorigin_session_origin_lsn = InvalidXLogRecPtr;
TimestampTz replorigin_session_origin_timestamp = 0;

/*
 * Base address into a shared memory array of replication states of size
 * max_replication_slots.  Left unset when max_replication_slots is 0, since
 * no shared memory is allocated in that case.
 *
 * XXX: Should we use a separate variable to size this rather than
 * max_replication_slots?
 */
static ReplicationState *replication_states;

/*
 * Actual shared memory block (replication_states[] is now part of this).
 */
static ReplicationStateCtl *replication_states_ctl;

/*
 * We keep a pointer to this backend's ReplicationState to avoid having to
 * search the replication_states array in replorigin_session_advance for each
 * remote commit. (Ownership of a backend's own entry can only be changed by
 * that backend.)
 */
static ReplicationState *session_replication_state = NULL;

/*
 * Magic for on disk files.  Written first into the checkpoint file by
 * CheckPointReplicationOrigin and verified by StartupReplicationOrigin.
 */
#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
187 :
188 : static void
189 84 : replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
190 : {
191 84 : if (check_slots && max_replication_slots == 0)
192 0 : ereport(ERROR,
193 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
194 : errmsg("cannot query or manipulate replication origin when \"max_replication_slots\" is 0")));
195 :
196 84 : if (!recoveryOK && RecoveryInProgress())
197 0 : ereport(ERROR,
198 : (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
199 : errmsg("cannot manipulate replication origins during recovery")));
200 84 : }
201 :
202 :
203 : /*
204 : * IsReservedOriginName
205 : * True iff name is either "none" or "any".
206 : */
207 : static bool
208 16 : IsReservedOriginName(const char *name)
209 : {
210 30 : return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
211 14 : (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
212 : }
213 :
214 : /* ---------------------------------------------------------------------------
215 : * Functions for working with replication origins themselves.
216 : * ---------------------------------------------------------------------------
217 : */
218 :
219 : /*
220 : * Check for a persistent replication origin identified by name.
221 : *
222 : * Returns InvalidOid if the node isn't known yet and missing_ok is true.
223 : */
224 : RepOriginId
225 1742 : replorigin_by_name(const char *roname, bool missing_ok)
226 : {
227 : Form_pg_replication_origin ident;
228 1742 : Oid roident = InvalidOid;
229 : HeapTuple tuple;
230 : Datum roname_d;
231 :
232 1742 : roname_d = CStringGetTextDatum(roname);
233 :
234 1742 : tuple = SearchSysCache1(REPLORIGNAME, roname_d);
235 1742 : if (HeapTupleIsValid(tuple))
236 : {
237 1014 : ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
238 1014 : roident = ident->roident;
239 1014 : ReleaseSysCache(tuple);
240 : }
241 728 : else if (!missing_ok)
242 8 : ereport(ERROR,
243 : (errcode(ERRCODE_UNDEFINED_OBJECT),
244 : errmsg("replication origin \"%s\" does not exist",
245 : roname)));
246 :
247 1734 : return roident;
248 : }
249 :
250 : /*
251 : * Create a replication origin.
252 : *
253 : * Needs to be called in a transaction.
254 : */
255 : RepOriginId
256 684 : replorigin_create(const char *roname)
257 : {
258 : Oid roident;
259 684 : HeapTuple tuple = NULL;
260 : Relation rel;
261 : Datum roname_d;
262 : SnapshotData SnapshotDirty;
263 : SysScanDesc scan;
264 : ScanKeyData key;
265 :
266 684 : roname_d = CStringGetTextDatum(roname);
267 :
268 : Assert(IsTransactionState());
269 :
270 : /*
271 : * We need the numeric replication origin to be 16bit wide, so we cannot
272 : * rely on the normal oid allocation. Instead we simply scan
273 : * pg_replication_origin for the first unused id. That's not particularly
274 : * efficient, but this should be a fairly infrequent operation - we can
275 : * easily spend a bit more code on this when it turns out it needs to be
276 : * faster.
277 : *
278 : * We handle concurrency by taking an exclusive lock (allowing reads!)
279 : * over the table for the duration of the search. Because we use a "dirty
280 : * snapshot" we can read rows that other in-progress sessions have
281 : * written, even though they would be invisible with normal snapshots. Due
282 : * to the exclusive lock there's no danger that new rows can appear while
283 : * we're checking.
284 : */
285 684 : InitDirtySnapshot(SnapshotDirty);
286 :
287 684 : rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
288 :
289 1200 : for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
290 : {
291 : bool nulls[Natts_pg_replication_origin];
292 : Datum values[Natts_pg_replication_origin];
293 : bool collides;
294 :
295 1200 : CHECK_FOR_INTERRUPTS();
296 :
297 1200 : ScanKeyInit(&key,
298 : Anum_pg_replication_origin_roident,
299 : BTEqualStrategyNumber, F_OIDEQ,
300 : ObjectIdGetDatum(roident));
301 :
302 1200 : scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
303 : true /* indexOK */ ,
304 : &SnapshotDirty,
305 : 1, &key);
306 :
307 1200 : collides = HeapTupleIsValid(systable_getnext(scan));
308 :
309 1200 : systable_endscan(scan);
310 :
311 1200 : if (!collides)
312 : {
313 : /*
314 : * Ok, found an unused roident, insert the new row and do a CCI,
315 : * so our callers can look it up if they want to.
316 : */
317 684 : memset(&nulls, 0, sizeof(nulls));
318 :
319 684 : values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
320 684 : values[Anum_pg_replication_origin_roname - 1] = roname_d;
321 :
322 684 : tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
323 684 : CatalogTupleInsert(rel, tuple);
324 682 : CommandCounterIncrement();
325 682 : break;
326 : }
327 : }
328 :
329 : /* now release lock again, */
330 682 : table_close(rel, ExclusiveLock);
331 :
332 682 : if (tuple == NULL)
333 0 : ereport(ERROR,
334 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
335 : errmsg("could not find free replication origin ID")));
336 :
337 682 : heap_freetuple(tuple);
338 682 : return roident;
339 : }
340 :
341 : /*
342 : * Helper function to drop a replication origin.
343 : */
344 : static void
345 554 : replorigin_state_clear(RepOriginId roident, bool nowait)
346 : {
347 : int i;
348 :
349 : /*
350 : * Clean up the slot state info, if there is any matching slot.
351 : */
352 554 : restart:
353 554 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
354 :
355 1708 : for (i = 0; i < max_replication_slots; i++)
356 : {
357 1634 : ReplicationState *state = &replication_states[i];
358 :
359 1634 : if (state->roident == roident)
360 : {
361 : /* found our slot, is it busy? */
362 480 : if (state->acquired_by != 0)
363 : {
364 : ConditionVariable *cv;
365 :
366 0 : if (nowait)
367 0 : ereport(ERROR,
368 : (errcode(ERRCODE_OBJECT_IN_USE),
369 : errmsg("could not drop replication origin with ID %d, in use by PID %d",
370 : state->roident,
371 : state->acquired_by)));
372 :
373 : /*
374 : * We must wait and then retry. Since we don't know which CV
375 : * to wait on until here, we can't readily use
376 : * ConditionVariablePrepareToSleep (calling it here would be
377 : * wrong, since we could miss the signal if we did so); just
378 : * use ConditionVariableSleep directly.
379 : */
380 0 : cv = &state->origin_cv;
381 :
382 0 : LWLockRelease(ReplicationOriginLock);
383 :
384 0 : ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
385 0 : goto restart;
386 : }
387 :
388 : /* first make a WAL log entry */
389 : {
390 : xl_replorigin_drop xlrec;
391 :
392 480 : xlrec.node_id = roident;
393 480 : XLogBeginInsert();
394 480 : XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
395 480 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
396 : }
397 :
398 : /* then clear the in-memory slot */
399 480 : state->roident = InvalidRepOriginId;
400 480 : state->remote_lsn = InvalidXLogRecPtr;
401 480 : state->local_lsn = InvalidXLogRecPtr;
402 480 : break;
403 : }
404 : }
405 554 : LWLockRelease(ReplicationOriginLock);
406 554 : ConditionVariableCancelSleep();
407 554 : }
408 :
409 : /*
410 : * Drop replication origin (by name).
411 : *
412 : * Needs to be called in a transaction.
413 : */
414 : void
415 908 : replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
416 : {
417 : RepOriginId roident;
418 : Relation rel;
419 : HeapTuple tuple;
420 :
421 : Assert(IsTransactionState());
422 :
423 908 : rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
424 :
425 908 : roident = replorigin_by_name(name, missing_ok);
426 :
427 : /* Lock the origin to prevent concurrent drops. */
428 906 : LockSharedObject(ReplicationOriginRelationId, roident, 0,
429 : AccessExclusiveLock);
430 :
431 906 : tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
432 906 : if (!HeapTupleIsValid(tuple))
433 : {
434 352 : if (!missing_ok)
435 0 : elog(ERROR, "cache lookup failed for replication origin with ID %d",
436 : roident);
437 :
438 : /*
439 : * We don't need to retain the locks if the origin is already dropped.
440 : */
441 352 : UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
442 : AccessExclusiveLock);
443 352 : table_close(rel, RowExclusiveLock);
444 352 : return;
445 : }
446 :
447 554 : replorigin_state_clear(roident, nowait);
448 :
449 : /*
450 : * Now, we can delete the catalog entry.
451 : */
452 554 : CatalogTupleDelete(rel, &tuple->t_self);
453 554 : ReleaseSysCache(tuple);
454 :
455 554 : CommandCounterIncrement();
456 :
457 : /* We keep the lock on pg_replication_origin until commit */
458 554 : table_close(rel, NoLock);
459 : }
460 :
461 : /*
462 : * Lookup replication origin via its oid and return the name.
463 : *
464 : * The external name is palloc'd in the calling context.
465 : *
466 : * Returns true if the origin is known, false otherwise.
467 : */
468 : bool
469 40 : replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
470 : {
471 : HeapTuple tuple;
472 : Form_pg_replication_origin ric;
473 :
474 : Assert(OidIsValid((Oid) roident));
475 : Assert(roident != InvalidRepOriginId);
476 : Assert(roident != DoNotReplicateId);
477 :
478 40 : tuple = SearchSysCache1(REPLORIGIDENT,
479 : ObjectIdGetDatum((Oid) roident));
480 :
481 40 : if (HeapTupleIsValid(tuple))
482 : {
483 34 : ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
484 34 : *roname = text_to_cstring(&ric->roname);
485 34 : ReleaseSysCache(tuple);
486 :
487 34 : return true;
488 : }
489 : else
490 : {
491 6 : *roname = NULL;
492 :
493 6 : if (!missing_ok)
494 0 : ereport(ERROR,
495 : (errcode(ERRCODE_UNDEFINED_OBJECT),
496 : errmsg("replication origin with ID %d does not exist",
497 : roident)));
498 :
499 6 : return false;
500 : }
501 : }
502 :
503 :
504 : /* ---------------------------------------------------------------------------
505 : * Functions for handling replication progress.
506 : * ---------------------------------------------------------------------------
507 : */
508 :
509 : Size
510 7398 : ReplicationOriginShmemSize(void)
511 : {
512 7398 : Size size = 0;
513 :
514 : /*
515 : * XXX: max_replication_slots is arguably the wrong thing to use, as here
516 : * we keep the replay state of *remote* transactions. But for now it seems
517 : * sufficient to reuse it, rather than introduce a separate GUC.
518 : */
519 7398 : if (max_replication_slots == 0)
520 4 : return size;
521 :
522 7394 : size = add_size(size, offsetof(ReplicationStateCtl, states));
523 :
524 7394 : size = add_size(size,
525 : mul_size(max_replication_slots, sizeof(ReplicationState)));
526 7394 : return size;
527 : }
528 :
529 : void
530 1918 : ReplicationOriginShmemInit(void)
531 : {
532 : bool found;
533 :
534 1918 : if (max_replication_slots == 0)
535 2 : return;
536 :
537 1916 : replication_states_ctl = (ReplicationStateCtl *)
538 1916 : ShmemInitStruct("ReplicationOriginState",
539 : ReplicationOriginShmemSize(),
540 : &found);
541 1916 : replication_states = replication_states_ctl->states;
542 :
543 1916 : if (!found)
544 : {
545 : int i;
546 :
547 135208 : MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
548 :
549 1916 : replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
550 :
551 20684 : for (i = 0; i < max_replication_slots; i++)
552 : {
553 18768 : LWLockInitialize(&replication_states[i].lock,
554 18768 : replication_states_ctl->tranche_id);
555 18768 : ConditionVariableInit(&replication_states[i].origin_cv);
556 : }
557 : }
558 : }
559 :
560 : /* ---------------------------------------------------------------------------
561 : * Perform a checkpoint of each replication origin's progress with respect to
562 : * the replayed remote_lsn. Make sure that all transactions we refer to in the
563 : * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
564 : * if the transactions were originally committed asynchronously.
565 : *
566 : * We store checkpoints in the following format:
567 : * +-------+------------------------+------------------+-----+--------+
568 : * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
569 : * +-------+------------------------+------------------+-----+--------+
570 : *
571 : * So its just the magic, followed by the statically sized
572 : * ReplicationStateOnDisk structs. Note that the maximum number of
573 : * ReplicationState is determined by max_replication_slots.
574 : * ---------------------------------------------------------------------------
575 : */
576 : void
577 2476 : CheckPointReplicationOrigin(void)
578 : {
579 2476 : const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
580 2476 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
581 : int tmpfd;
582 : int i;
583 2476 : uint32 magic = REPLICATION_STATE_MAGIC;
584 : pg_crc32c crc;
585 :
586 2476 : if (max_replication_slots == 0)
587 2 : return;
588 :
589 2474 : INIT_CRC32C(crc);
590 :
591 : /* make sure no old temp file is remaining */
592 2474 : if (unlink(tmppath) < 0 && errno != ENOENT)
593 0 : ereport(PANIC,
594 : (errcode_for_file_access(),
595 : errmsg("could not remove file \"%s\": %m",
596 : tmppath)));
597 :
598 : /*
599 : * no other backend can perform this at the same time; only one checkpoint
600 : * can happen at a time.
601 : */
602 2474 : tmpfd = OpenTransientFile(tmppath,
603 : O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
604 2474 : if (tmpfd < 0)
605 0 : ereport(PANIC,
606 : (errcode_for_file_access(),
607 : errmsg("could not create file \"%s\": %m",
608 : tmppath)));
609 :
610 : /* write magic */
611 2474 : errno = 0;
612 2474 : if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
613 : {
614 : /* if write didn't set errno, assume problem is no disk space */
615 0 : if (errno == 0)
616 0 : errno = ENOSPC;
617 0 : ereport(PANIC,
618 : (errcode_for_file_access(),
619 : errmsg("could not write to file \"%s\": %m",
620 : tmppath)));
621 : }
622 2474 : COMP_CRC32C(crc, &magic, sizeof(magic));
623 :
624 : /* prevent concurrent creations/drops */
625 2474 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
626 :
627 : /* write actual data */
628 26680 : for (i = 0; i < max_replication_slots; i++)
629 : {
630 : ReplicationStateOnDisk disk_state;
631 24206 : ReplicationState *curstate = &replication_states[i];
632 : XLogRecPtr local_lsn;
633 :
634 24206 : if (curstate->roident == InvalidRepOriginId)
635 24116 : continue;
636 :
637 : /* zero, to avoid uninitialized padding bytes */
638 90 : memset(&disk_state, 0, sizeof(disk_state));
639 :
640 90 : LWLockAcquire(&curstate->lock, LW_SHARED);
641 :
642 90 : disk_state.roident = curstate->roident;
643 :
644 90 : disk_state.remote_lsn = curstate->remote_lsn;
645 90 : local_lsn = curstate->local_lsn;
646 :
647 90 : LWLockRelease(&curstate->lock);
648 :
649 : /* make sure we only write out a commit that's persistent */
650 90 : XLogFlush(local_lsn);
651 :
652 90 : errno = 0;
653 90 : if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
654 : sizeof(disk_state))
655 : {
656 : /* if write didn't set errno, assume problem is no disk space */
657 0 : if (errno == 0)
658 0 : errno = ENOSPC;
659 0 : ereport(PANIC,
660 : (errcode_for_file_access(),
661 : errmsg("could not write to file \"%s\": %m",
662 : tmppath)));
663 : }
664 :
665 90 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
666 : }
667 :
668 2474 : LWLockRelease(ReplicationOriginLock);
669 :
670 : /* write out the CRC */
671 2474 : FIN_CRC32C(crc);
672 2474 : errno = 0;
673 2474 : if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
674 : {
675 : /* if write didn't set errno, assume problem is no disk space */
676 0 : if (errno == 0)
677 0 : errno = ENOSPC;
678 0 : ereport(PANIC,
679 : (errcode_for_file_access(),
680 : errmsg("could not write to file \"%s\": %m",
681 : tmppath)));
682 : }
683 :
684 2474 : if (CloseTransientFile(tmpfd) != 0)
685 0 : ereport(PANIC,
686 : (errcode_for_file_access(),
687 : errmsg("could not close file \"%s\": %m",
688 : tmppath)));
689 :
690 : /* fsync, rename to permanent file, fsync file and directory */
691 2474 : durable_rename(tmppath, path, PANIC);
692 : }
693 :
694 : /*
695 : * Recover replication replay status from checkpoint data saved earlier by
696 : * CheckPointReplicationOrigin.
697 : *
698 : * This only needs to be called at startup and *not* during every checkpoint
699 : * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
700 : * state thereafter can be recovered by looking at commit records.
701 : */
702 : void
703 1650 : StartupReplicationOrigin(void)
704 : {
705 1650 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
706 : int fd;
707 : int readBytes;
708 1650 : uint32 magic = REPLICATION_STATE_MAGIC;
709 1650 : int last_state = 0;
710 : pg_crc32c file_crc;
711 : pg_crc32c crc;
712 :
713 : /* don't want to overwrite already existing state */
714 : #ifdef USE_ASSERT_CHECKING
715 : static bool already_started = false;
716 :
717 : Assert(!already_started);
718 : already_started = true;
719 : #endif
720 :
721 1650 : if (max_replication_slots == 0)
722 92 : return;
723 :
724 1648 : INIT_CRC32C(crc);
725 :
726 1648 : elog(DEBUG2, "starting up replication origin progress state");
727 :
728 1648 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
729 :
730 : /*
731 : * might have had max_replication_slots == 0 last run, or we just brought
732 : * up a standby.
733 : */
734 1648 : if (fd < 0 && errno == ENOENT)
735 90 : return;
736 1558 : else if (fd < 0)
737 0 : ereport(PANIC,
738 : (errcode_for_file_access(),
739 : errmsg("could not open file \"%s\": %m",
740 : path)));
741 :
742 : /* verify magic, that is written even if nothing was active */
743 1558 : readBytes = read(fd, &magic, sizeof(magic));
744 1558 : if (readBytes != sizeof(magic))
745 : {
746 0 : if (readBytes < 0)
747 0 : ereport(PANIC,
748 : (errcode_for_file_access(),
749 : errmsg("could not read file \"%s\": %m",
750 : path)));
751 : else
752 0 : ereport(PANIC,
753 : (errcode(ERRCODE_DATA_CORRUPTED),
754 : errmsg("could not read file \"%s\": read %d of %zu",
755 : path, readBytes, sizeof(magic))));
756 : }
757 1558 : COMP_CRC32C(crc, &magic, sizeof(magic));
758 :
759 1558 : if (magic != REPLICATION_STATE_MAGIC)
760 0 : ereport(PANIC,
761 : (errmsg("replication checkpoint has wrong magic %u instead of %u",
762 : magic, REPLICATION_STATE_MAGIC)));
763 :
764 : /* we can skip locking here, no other access is possible */
765 :
766 : /* recover individual states, until there are no more to be found */
767 : while (true)
768 38 : {
769 : ReplicationStateOnDisk disk_state;
770 :
771 1596 : readBytes = read(fd, &disk_state, sizeof(disk_state));
772 :
773 : /* no further data */
774 1596 : if (readBytes == sizeof(crc))
775 : {
776 : /* not pretty, but simple ... */
777 1558 : file_crc = *(pg_crc32c *) &disk_state;
778 1558 : break;
779 : }
780 :
781 38 : if (readBytes < 0)
782 : {
783 0 : ereport(PANIC,
784 : (errcode_for_file_access(),
785 : errmsg("could not read file \"%s\": %m",
786 : path)));
787 : }
788 :
789 38 : if (readBytes != sizeof(disk_state))
790 : {
791 0 : ereport(PANIC,
792 : (errcode_for_file_access(),
793 : errmsg("could not read file \"%s\": read %d of %zu",
794 : path, readBytes, sizeof(disk_state))));
795 : }
796 :
797 38 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
798 :
799 38 : if (last_state == max_replication_slots)
800 0 : ereport(PANIC,
801 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
802 : errmsg("could not find free replication state, increase \"max_replication_slots\"")));
803 :
804 : /* copy data to shared memory */
805 38 : replication_states[last_state].roident = disk_state.roident;
806 38 : replication_states[last_state].remote_lsn = disk_state.remote_lsn;
807 38 : last_state++;
808 :
809 38 : ereport(LOG,
810 : (errmsg("recovered replication state of node %d to %X/%X",
811 : disk_state.roident,
812 : LSN_FORMAT_ARGS(disk_state.remote_lsn))));
813 : }
814 :
815 : /* now check checksum */
816 1558 : FIN_CRC32C(crc);
817 1558 : if (file_crc != crc)
818 0 : ereport(PANIC,
819 : (errcode(ERRCODE_DATA_CORRUPTED),
820 : errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
821 : crc, file_crc)));
822 :
823 1558 : if (CloseTransientFile(fd) != 0)
824 0 : ereport(PANIC,
825 : (errcode_for_file_access(),
826 : errmsg("could not close file \"%s\": %m",
827 : path)));
828 : }
829 :
830 : void
831 8 : replorigin_redo(XLogReaderState *record)
832 : {
833 8 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
834 :
835 8 : switch (info)
836 : {
837 4 : case XLOG_REPLORIGIN_SET:
838 : {
839 4 : xl_replorigin_set *xlrec =
840 4 : (xl_replorigin_set *) XLogRecGetData(record);
841 :
842 4 : replorigin_advance(xlrec->node_id,
843 : xlrec->remote_lsn, record->EndRecPtr,
844 4 : xlrec->force /* backward */ ,
845 : false /* WAL log */ );
846 4 : break;
847 : }
848 4 : case XLOG_REPLORIGIN_DROP:
849 : {
850 : xl_replorigin_drop *xlrec;
851 : int i;
852 :
853 4 : xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
854 :
855 6 : for (i = 0; i < max_replication_slots; i++)
856 : {
857 6 : ReplicationState *state = &replication_states[i];
858 :
859 : /* found our slot */
860 6 : if (state->roident == xlrec->node_id)
861 : {
862 : /* reset entry */
863 4 : state->roident = InvalidRepOriginId;
864 4 : state->remote_lsn = InvalidXLogRecPtr;
865 4 : state->local_lsn = InvalidXLogRecPtr;
866 4 : break;
867 : }
868 : }
869 4 : break;
870 : }
871 0 : default:
872 0 : elog(PANIC, "replorigin_redo: unknown op code %u", info);
873 : }
874 8 : }
875 :
876 :
877 : /*
878 : * Tell the replication origin progress machinery that a commit from 'node'
879 : * that originated at the LSN remote_commit on the remote node was replayed
880 : * successfully and that we don't need to do so again. In combination with
881 : * setting up replorigin_session_origin_lsn and replorigin_session_origin
882 : * that ensures we won't lose knowledge about that after a crash if the
883 : * transaction had a persistent effect (think of asynchronous commits).
884 : *
885 : * local_commit needs to be a local LSN of the commit so that we can make sure
886 : * upon a checkpoint that enough WAL has been persisted to disk.
887 : *
888 : * Needs to be called with a RowExclusiveLock on pg_replication_origin,
889 : * unless running in recovery.
890 : */
891 : void
892 454 : replorigin_advance(RepOriginId node,
893 : XLogRecPtr remote_commit, XLogRecPtr local_commit,
894 : bool go_backward, bool wal_log)
895 : {
896 : int i;
897 454 : ReplicationState *replication_state = NULL;
898 454 : ReplicationState *free_state = NULL;
899 :
900 : Assert(node != InvalidRepOriginId);
901 :
902 : /* we don't track DoNotReplicateId */
903 454 : if (node == DoNotReplicateId)
904 0 : return;
905 :
906 : /*
907 : * XXX: For the case where this is called by WAL replay, it'd be more
908 : * efficient to restore into a backend local hashtable and only dump into
909 : * shmem after recovery is finished. Let's wait with implementing that
910 : * till it's shown to be a measurable expense
911 : */
912 :
913 : /* Lock exclusively, as we may have to create a new table entry. */
914 454 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
915 :
916 : /*
917 : * Search for either an existing slot for the origin, or a free one we can
918 : * use.
919 : */
920 4134 : for (i = 0; i < max_replication_slots; i++)
921 : {
922 3768 : ReplicationState *curstate = &replication_states[i];
923 :
924 : /* remember where to insert if necessary */
925 3768 : if (curstate->roident == InvalidRepOriginId &&
926 : free_state == NULL)
927 : {
928 366 : free_state = curstate;
929 366 : continue;
930 : }
931 :
932 : /* not our slot */
933 3402 : if (curstate->roident != node)
934 : {
935 3314 : continue;
936 : }
937 :
938 : /* ok, found slot */
939 88 : replication_state = curstate;
940 :
941 88 : LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
942 :
943 : /* Make sure it's not used by somebody else */
944 88 : if (replication_state->acquired_by != 0)
945 : {
946 0 : ereport(ERROR,
947 : (errcode(ERRCODE_OBJECT_IN_USE),
948 : errmsg("replication origin with ID %d is already active for PID %d",
949 : replication_state->roident,
950 : replication_state->acquired_by)));
951 : }
952 :
953 88 : break;
954 : }
955 :
956 454 : if (replication_state == NULL && free_state == NULL)
957 0 : ereport(ERROR,
958 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
959 : errmsg("could not find free replication state slot for replication origin with ID %d",
960 : node),
961 : errhint("Increase \"max_replication_slots\" and try again.")));
962 :
963 454 : if (replication_state == NULL)
964 : {
965 : /* initialize new slot */
966 366 : LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
967 366 : replication_state = free_state;
968 : Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
969 : Assert(replication_state->local_lsn == InvalidXLogRecPtr);
970 366 : replication_state->roident = node;
971 : }
972 :
973 : Assert(replication_state->roident != InvalidRepOriginId);
974 :
975 : /*
976 : * If somebody "forcefully" sets this slot, WAL log it, so it's durable
977 : * and the standby gets the message. Primarily this will be called during
978 : * WAL replay (of commit records) where no WAL logging is necessary.
979 : */
980 454 : if (wal_log)
981 : {
982 : xl_replorigin_set xlrec;
983 :
984 372 : xlrec.remote_lsn = remote_commit;
985 372 : xlrec.node_id = node;
986 372 : xlrec.force = go_backward;
987 :
988 372 : XLogBeginInsert();
989 372 : XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
990 :
991 372 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
992 : }
993 :
994 : /*
995 : * Due to - harmless - race conditions during a checkpoint we could see
996 : * values here that are older than the ones we already have in memory. We
997 : * could also see older values for prepared transactions when the prepare
998 : * is sent at a later point of time along with commit prepared and there
999 : * are other transactions commits between prepare and commit prepared. See
1000 : * ReorderBufferFinishPrepared. Don't overwrite those.
1001 : */
1002 454 : if (go_backward || replication_state->remote_lsn < remote_commit)
1003 440 : replication_state->remote_lsn = remote_commit;
1004 454 : if (local_commit != InvalidXLogRecPtr &&
1005 76 : (go_backward || replication_state->local_lsn < local_commit))
1006 80 : replication_state->local_lsn = local_commit;
1007 454 : LWLockRelease(&replication_state->lock);
1008 :
1009 : /*
1010 : * Release *after* changing the LSNs, slot isn't acquired and thus could
1011 : * otherwise be dropped anytime.
1012 : */
1013 454 : LWLockRelease(ReplicationOriginLock);
1014 : }
1015 :
1016 :
1017 : XLogRecPtr
1018 16 : replorigin_get_progress(RepOriginId node, bool flush)
1019 : {
1020 : int i;
1021 16 : XLogRecPtr local_lsn = InvalidXLogRecPtr;
1022 16 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1023 :
1024 : /* prevent slots from being concurrently dropped */
1025 16 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1026 :
1027 76 : for (i = 0; i < max_replication_slots; i++)
1028 : {
1029 : ReplicationState *state;
1030 :
1031 70 : state = &replication_states[i];
1032 :
1033 70 : if (state->roident == node)
1034 : {
1035 10 : LWLockAcquire(&state->lock, LW_SHARED);
1036 :
1037 10 : remote_lsn = state->remote_lsn;
1038 10 : local_lsn = state->local_lsn;
1039 :
1040 10 : LWLockRelease(&state->lock);
1041 :
1042 10 : break;
1043 : }
1044 : }
1045 :
1046 16 : LWLockRelease(ReplicationOriginLock);
1047 :
1048 16 : if (flush && local_lsn != InvalidXLogRecPtr)
1049 2 : XLogFlush(local_lsn);
1050 :
1051 16 : return remote_lsn;
1052 : }
1053 :
1054 : /*
1055 : * Tear down a (possibly) configured session replication origin during process
1056 : * exit.
1057 : */
1058 : static void
1059 800 : ReplicationOriginExitCleanup(int code, Datum arg)
1060 : {
1061 800 : ConditionVariable *cv = NULL;
1062 :
1063 800 : if (session_replication_state == NULL)
1064 352 : return;
1065 :
1066 448 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1067 :
1068 448 : if (session_replication_state->acquired_by == MyProcPid)
1069 : {
1070 428 : cv = &session_replication_state->origin_cv;
1071 :
1072 428 : session_replication_state->acquired_by = 0;
1073 428 : session_replication_state = NULL;
1074 : }
1075 :
1076 448 : LWLockRelease(ReplicationOriginLock);
1077 :
1078 448 : if (cv)
1079 428 : ConditionVariableBroadcast(cv);
1080 : }
1081 :
1082 : /*
1083 : * Setup a replication origin in the shared memory struct if it doesn't
1084 : * already exist and cache access to the specific ReplicationSlot so the
1085 : * array doesn't have to be searched when calling
1086 : * replorigin_session_advance().
1087 : *
1088 : * Normally only one such cached origin can exist per process so the cached
1089 : * value can only be set again after the previous value is torn down with
1090 : * replorigin_session_reset(). For this normal case pass acquired_by = 0
1091 : * (meaning the slot is not allowed to be already acquired by another process).
1092 : *
1093 : * However, sometimes multiple processes can safely re-use the same origin slot
1094 : * (for example, multiple parallel apply processes can safely use the same
1095 : * origin, provided they maintain commit order by allowing only one process to
1096 : * commit at a time). For this case the first process must pass acquired_by =
1097 : * 0, and then the other processes sharing that same origin can pass
1098 : * acquired_by = PID of the first process.
1099 : */
1100 : void
1101 806 : replorigin_session_setup(RepOriginId node, int acquired_by)
1102 : {
1103 : static bool registered_cleanup;
1104 : int i;
1105 806 : int free_slot = -1;
1106 :
1107 806 : if (!registered_cleanup)
1108 : {
1109 800 : on_shmem_exit(ReplicationOriginExitCleanup, 0);
1110 800 : registered_cleanup = true;
1111 : }
1112 :
1113 : Assert(max_replication_slots > 0);
1114 :
1115 806 : if (session_replication_state != NULL)
1116 2 : ereport(ERROR,
1117 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1118 : errmsg("cannot setup replication origin when one is already setup")));
1119 :
1120 : /* Lock exclusively, as we may have to create a new table entry. */
1121 804 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1122 :
1123 : /*
1124 : * Search for either an existing slot for the origin, or a free one we can
1125 : * use.
1126 : */
1127 3378 : for (i = 0; i < max_replication_slots; i++)
1128 : {
1129 3164 : ReplicationState *curstate = &replication_states[i];
1130 :
1131 : /* remember where to insert if necessary */
1132 3164 : if (curstate->roident == InvalidRepOriginId &&
1133 : free_slot == -1)
1134 : {
1135 220 : free_slot = i;
1136 220 : continue;
1137 : }
1138 :
1139 : /* not our slot */
1140 2944 : if (curstate->roident != node)
1141 2354 : continue;
1142 :
1143 590 : else if (curstate->acquired_by != 0 && acquired_by == 0)
1144 : {
1145 0 : ereport(ERROR,
1146 : (errcode(ERRCODE_OBJECT_IN_USE),
1147 : errmsg("replication origin with ID %d is already active for PID %d",
1148 : curstate->roident, curstate->acquired_by)));
1149 : }
1150 :
1151 : /* ok, found slot */
1152 590 : session_replication_state = curstate;
1153 590 : break;
1154 : }
1155 :
1156 :
1157 804 : if (session_replication_state == NULL && free_slot == -1)
1158 0 : ereport(ERROR,
1159 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1160 : errmsg("could not find free replication state slot for replication origin with ID %d",
1161 : node),
1162 : errhint("Increase \"max_replication_slots\" and try again.")));
1163 804 : else if (session_replication_state == NULL)
1164 : {
1165 : /* initialize new slot */
1166 214 : session_replication_state = &replication_states[free_slot];
1167 : Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
1168 : Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
1169 214 : session_replication_state->roident = node;
1170 : }
1171 :
1172 :
1173 : Assert(session_replication_state->roident != InvalidRepOriginId);
1174 :
1175 804 : if (acquired_by == 0)
1176 784 : session_replication_state->acquired_by = MyProcPid;
1177 20 : else if (session_replication_state->acquired_by != acquired_by)
1178 0 : elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
1179 : node, acquired_by);
1180 :
1181 804 : LWLockRelease(ReplicationOriginLock);
1182 :
1183 : /* probably this one is pointless */
1184 804 : ConditionVariableBroadcast(&session_replication_state->origin_cv);
1185 804 : }
1186 :
1187 : /*
1188 : * Reset replay state previously setup in this session.
1189 : *
1190 : * This function may only be called if an origin was setup with
1191 : * replorigin_session_setup().
1192 : */
1193 : void
1194 358 : replorigin_session_reset(void)
1195 : {
1196 : ConditionVariable *cv;
1197 :
1198 : Assert(max_replication_slots != 0);
1199 :
1200 358 : if (session_replication_state == NULL)
1201 2 : ereport(ERROR,
1202 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1203 : errmsg("no replication origin is configured")));
1204 :
1205 356 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1206 :
1207 356 : session_replication_state->acquired_by = 0;
1208 356 : cv = &session_replication_state->origin_cv;
1209 356 : session_replication_state = NULL;
1210 :
1211 356 : LWLockRelease(ReplicationOriginLock);
1212 :
1213 356 : ConditionVariableBroadcast(cv);
1214 356 : }
1215 :
1216 : /*
1217 : * Do the same work replorigin_advance() does, just on the session's
1218 : * configured origin.
1219 : *
1220 : * This is noticeably cheaper than using replorigin_advance().
1221 : */
1222 : void
1223 2082 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
1224 : {
1225 : Assert(session_replication_state != NULL);
1226 : Assert(session_replication_state->roident != InvalidRepOriginId);
1227 :
1228 2082 : LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1229 2082 : if (session_replication_state->local_lsn < local_commit)
1230 2082 : session_replication_state->local_lsn = local_commit;
1231 2082 : if (session_replication_state->remote_lsn < remote_commit)
1232 1000 : session_replication_state->remote_lsn = remote_commit;
1233 2082 : LWLockRelease(&session_replication_state->lock);
1234 2082 : }
1235 :
1236 : /*
1237 : * Ask the machinery about the point up to which we successfully replayed
1238 : * changes from an already setup replication origin.
1239 : */
1240 : XLogRecPtr
1241 412 : replorigin_session_get_progress(bool flush)
1242 : {
1243 : XLogRecPtr remote_lsn;
1244 : XLogRecPtr local_lsn;
1245 :
1246 : Assert(session_replication_state != NULL);
1247 :
1248 412 : LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1249 412 : remote_lsn = session_replication_state->remote_lsn;
1250 412 : local_lsn = session_replication_state->local_lsn;
1251 412 : LWLockRelease(&session_replication_state->lock);
1252 :
1253 412 : if (flush && local_lsn != InvalidXLogRecPtr)
1254 2 : XLogFlush(local_lsn);
1255 :
1256 412 : return remote_lsn;
1257 : }
1258 :
1259 :
1260 :
1261 : /* ---------------------------------------------------------------------------
1262 : * SQL functions for working with replication origin.
1263 : *
1264 : * These mostly should be fairly short wrappers around more generic functions.
1265 : * ---------------------------------------------------------------------------
1266 : */
1267 :
1268 : /*
1269 : * Create replication origin for the passed in name, and return the assigned
1270 : * oid.
1271 : */
1272 : Datum
1273 18 : pg_replication_origin_create(PG_FUNCTION_ARGS)
1274 : {
1275 : char *name;
1276 : RepOriginId roident;
1277 :
1278 18 : replorigin_check_prerequisites(false, false);
1279 :
1280 18 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1281 :
1282 : /*
1283 : * Replication origins "any and "none" are reserved for system options.
1284 : * The origins "pg_xxx" are reserved for internal use.
1285 : */
1286 18 : if (IsReservedName(name) || IsReservedOriginName(name))
1287 6 : ereport(ERROR,
1288 : (errcode(ERRCODE_RESERVED_NAME),
1289 : errmsg("replication origin name \"%s\" is reserved",
1290 : name),
1291 : errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1292 : LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
1293 :
1294 : /*
1295 : * If built with appropriate switch, whine when regression-testing
1296 : * conventions for replication origin names are violated.
1297 : */
1298 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1299 : if (strncmp(name, "regress_", 8) != 0)
1300 : elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1301 : #endif
1302 :
1303 12 : roident = replorigin_create(name);
1304 :
1305 10 : pfree(name);
1306 :
1307 10 : PG_RETURN_OID(roident);
1308 : }
1309 :
1310 : /*
1311 : * Drop replication origin.
1312 : */
1313 : Datum
1314 14 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
1315 : {
1316 : char *name;
1317 :
1318 14 : replorigin_check_prerequisites(false, false);
1319 :
1320 14 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1321 :
1322 14 : replorigin_drop_by_name(name, false, true);
1323 :
1324 12 : pfree(name);
1325 :
1326 12 : PG_RETURN_VOID();
1327 : }
1328 :
1329 : /*
1330 : * Return oid of a replication origin.
1331 : */
1332 : Datum
1333 0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
1334 : {
1335 : char *name;
1336 : RepOriginId roident;
1337 :
1338 0 : replorigin_check_prerequisites(false, false);
1339 :
1340 0 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1341 0 : roident = replorigin_by_name(name, true);
1342 :
1343 0 : pfree(name);
1344 :
1345 0 : if (OidIsValid(roident))
1346 0 : PG_RETURN_OID(roident);
1347 0 : PG_RETURN_NULL();
1348 : }
1349 :
1350 : /*
1351 : * Setup a replication origin for this session.
1352 : */
1353 : Datum
1354 12 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
1355 : {
1356 : char *name;
1357 : RepOriginId origin;
1358 :
1359 12 : replorigin_check_prerequisites(true, false);
1360 :
1361 12 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1362 12 : origin = replorigin_by_name(name, false);
1363 10 : replorigin_session_setup(origin, 0);
1364 :
1365 8 : replorigin_session_origin = origin;
1366 :
1367 8 : pfree(name);
1368 :
1369 8 : PG_RETURN_VOID();
1370 : }
1371 :
1372 : /*
1373 : * Reset previously setup origin in this session
1374 : */
1375 : Datum
1376 10 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
1377 : {
1378 10 : replorigin_check_prerequisites(true, false);
1379 :
1380 10 : replorigin_session_reset();
1381 :
1382 8 : replorigin_session_origin = InvalidRepOriginId;
1383 8 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
1384 8 : replorigin_session_origin_timestamp = 0;
1385 :
1386 8 : PG_RETURN_VOID();
1387 : }
1388 :
1389 : /*
1390 : * Has a replication origin been setup for this session.
1391 : */
1392 : Datum
1393 0 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
1394 : {
1395 0 : replorigin_check_prerequisites(false, false);
1396 :
1397 0 : PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
1398 : }
1399 :
1400 :
1401 : /*
1402 : * Return the replication progress for origin setup in the current session.
1403 : *
1404 : * If 'flush' is set to true it is ensured that the returned value corresponds
1405 : * to a local transaction that has been flushed. This is useful if asynchronous
1406 : * commits are used when replaying replicated transactions.
1407 : */
1408 : Datum
1409 4 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
1410 : {
1411 4 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1412 4 : bool flush = PG_GETARG_BOOL(0);
1413 :
1414 4 : replorigin_check_prerequisites(true, false);
1415 :
1416 4 : if (session_replication_state == NULL)
1417 0 : ereport(ERROR,
1418 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1419 : errmsg("no replication origin is configured")));
1420 :
1421 4 : remote_lsn = replorigin_session_get_progress(flush);
1422 :
1423 4 : if (remote_lsn == InvalidXLogRecPtr)
1424 0 : PG_RETURN_NULL();
1425 :
1426 4 : PG_RETURN_LSN(remote_lsn);
1427 : }
1428 :
1429 : Datum
1430 2 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
1431 : {
1432 2 : XLogRecPtr location = PG_GETARG_LSN(0);
1433 :
1434 2 : replorigin_check_prerequisites(true, false);
1435 :
1436 2 : if (session_replication_state == NULL)
1437 0 : ereport(ERROR,
1438 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1439 : errmsg("no replication origin is configured")));
1440 :
1441 2 : replorigin_session_origin_lsn = location;
1442 2 : replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
1443 :
1444 2 : PG_RETURN_VOID();
1445 : }
1446 :
1447 : Datum
1448 0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
1449 : {
1450 0 : replorigin_check_prerequisites(true, false);
1451 :
1452 0 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
1453 0 : replorigin_session_origin_timestamp = 0;
1454 :
1455 0 : PG_RETURN_VOID();
1456 : }
1457 :
1458 :
1459 : Datum
1460 6 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
1461 : {
1462 6 : text *name = PG_GETARG_TEXT_PP(0);
1463 6 : XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1464 : RepOriginId node;
1465 :
1466 6 : replorigin_check_prerequisites(true, false);
1467 :
1468 : /* lock to prevent the replication origin from vanishing */
1469 6 : LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1470 :
1471 6 : node = replorigin_by_name(text_to_cstring(name), false);
1472 :
1473 : /*
1474 : * Can't sensibly pass a local commit to be flushed at checkpoint - this
1475 : * xact hasn't committed yet. This is why this function should be used to
1476 : * set up the initial replication state, but not for replay.
1477 : */
1478 4 : replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1479 : true /* go backward */ , true /* WAL log */ );
1480 :
1481 4 : UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1482 :
1483 4 : PG_RETURN_VOID();
1484 : }
1485 :
1486 :
1487 : /*
1488 : * Return the replication progress for an individual replication origin.
1489 : *
1490 : * If 'flush' is set to true it is ensured that the returned value corresponds
1491 : * to a local transaction that has been flushed. This is useful if asynchronous
1492 : * commits are used when replaying replicated transactions.
1493 : */
1494 : Datum
1495 6 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
1496 : {
1497 : char *name;
1498 : bool flush;
1499 : RepOriginId roident;
1500 6 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1501 :
1502 6 : replorigin_check_prerequisites(true, true);
1503 :
1504 6 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1505 6 : flush = PG_GETARG_BOOL(1);
1506 :
1507 6 : roident = replorigin_by_name(name, false);
1508 : Assert(OidIsValid(roident));
1509 :
1510 4 : remote_lsn = replorigin_get_progress(roident, flush);
1511 :
1512 4 : if (remote_lsn == InvalidXLogRecPtr)
1513 0 : PG_RETURN_NULL();
1514 :
1515 4 : PG_RETURN_LSN(remote_lsn);
1516 : }
1517 :
1518 :
1519 : Datum
1520 12 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
1521 : {
1522 12 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1523 : int i;
1524 : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
1525 :
1526 : /* we want to return 0 rows if slot is set to zero */
1527 12 : replorigin_check_prerequisites(false, true);
1528 :
1529 12 : InitMaterializedSRF(fcinfo, 0);
1530 :
1531 : /* prevent slots from being concurrently dropped */
1532 12 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1533 :
1534 : /*
1535 : * Iterate through all possible replication_states, display if they are
1536 : * filled. Note that we do not take any locks, so slightly corrupted/out
1537 : * of date values are a possibility.
1538 : */
1539 108 : for (i = 0; i < max_replication_slots; i++)
1540 : {
1541 : ReplicationState *state;
1542 : Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];
1543 : bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1544 : char *roname;
1545 :
1546 96 : state = &replication_states[i];
1547 :
1548 : /* unused slot, nothing to display */
1549 96 : if (state->roident == InvalidRepOriginId)
1550 86 : continue;
1551 :
1552 10 : memset(values, 0, sizeof(values));
1553 10 : memset(nulls, 1, sizeof(nulls));
1554 :
1555 10 : values[0] = ObjectIdGetDatum(state->roident);
1556 10 : nulls[0] = false;
1557 :
1558 : /*
1559 : * We're not preventing the origin to be dropped concurrently, so
1560 : * silently accept that it might be gone.
1561 : */
1562 10 : if (replorigin_by_oid(state->roident, true,
1563 : &roname))
1564 : {
1565 10 : values[1] = CStringGetTextDatum(roname);
1566 10 : nulls[1] = false;
1567 : }
1568 :
1569 10 : LWLockAcquire(&state->lock, LW_SHARED);
1570 :
1571 10 : values[2] = LSNGetDatum(state->remote_lsn);
1572 10 : nulls[2] = false;
1573 :
1574 10 : values[3] = LSNGetDatum(state->local_lsn);
1575 10 : nulls[3] = false;
1576 :
1577 10 : LWLockRelease(&state->lock);
1578 :
1579 10 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1580 : values, nulls);
1581 : }
1582 :
1583 12 : LWLockRelease(ReplicationOriginLock);
1584 :
1585 : #undef REPLICATION_ORIGIN_PROGRESS_COLS
1586 :
1587 12 : return (Datum) 0;
1588 : }
|