Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * origin.c
4 : * Logical replication progress tracking support.
5 : *
6 : * Copyright (c) 2013-2025, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/logical/origin.c
10 : *
11 : * NOTES
12 : *
13 : * This file provides the following:
14 : * * An infrastructure to name nodes in a replication setup
15 : * * A facility to store and persist replication progress in an
16 : * efficient and durable manner.
17 : *
18 : * A replication origin consists of a descriptive, user defined, external
19 : * name and a short, thus space efficient, internal 2 byte one. This split
20 : * exists because replication origins have to be stored in WAL and shared
21 : * memory and long descriptors would be inefficient. For now only use 2 bytes
22 : * for the internal id of a replication origin as it seems unlikely that there
23 : * soon will be more than 65k nodes in one replication setup; and using only
24 : * two bytes allows us to be more space efficient.
25 : *
26 : * Replication progress is tracked in a shared memory table
27 : * (ReplicationState) that's dumped to disk every checkpoint. Entries
28 : * ('slots') in this table are identified by the internal id. That's the case
29 : * because it allows to increase replication progress during crash
30 : * recovery. To allow doing so we store the original LSN (from the originating
31 : * system) of a transaction in the commit record. That allows to recover the
32 : * precise replayed state after crash recovery; without requiring synchronous
33 : * commits. Allowing logical replication to use asynchronous commit is
34 : * generally good for performance, but especially important as it allows a
35 : * single threaded replay process to keep up with a source that has multiple
36 : * backends generating changes concurrently. For efficiency and simplicity
37 : * reasons a backend can setup one replication origin that's from then used as
38 : * the source of changes produced by the backend, until reset again.
39 : *
40 : * This infrastructure is intended to be used in cooperation with logical
41 : * decoding. When replaying from a remote system the configured origin is
42 : * provided to output plugins, allowing prevention of replication loops and
43 : * other filtering.
44 : *
45 : * There are several levels of locking at work:
46 : *
47 : * * To create and drop replication origins an exclusive lock on
48 : * pg_replication_origin is required for the duration. That allows us to
49 : * safely and conflict free assign new origins using a dirty snapshot.
50 : *
51 : * * When creating an in-memory replication progress slot the ReplicationOrigin
52 : * LWLock has to be held exclusively; when iterating over the replication
53 : * progress a shared lock has to be held, the same when advancing the
54 : * replication progress of an individual backend that has not setup as the
55 : * session's replication origin.
56 : *
57 : * * When manipulating or looking at the remote_lsn and local_lsn fields of a
58 : * replication progress slot that slot's lwlock has to be held. That's
59 : * primarily because we do not assume 8 byte writes (the LSN) is atomic on
60 : * all our platforms, but it also simplifies memory ordering concerns
61 : * between the remote and local lsn. We use a lwlock instead of a spinlock
62 : * so it's less harmful to hold the lock over a WAL write
63 : * (cf. AdvanceReplicationProgress).
64 : *
65 : * ---------------------------------------------------------------------------
66 : */
67 :
68 : #include "postgres.h"
69 :
70 : #include <unistd.h>
71 : #include <sys/stat.h>
72 :
73 : #include "access/genam.h"
74 : #include "access/htup_details.h"
75 : #include "access/table.h"
76 : #include "access/xact.h"
77 : #include "access/xloginsert.h"
78 : #include "catalog/catalog.h"
79 : #include "catalog/indexing.h"
80 : #include "catalog/pg_subscription.h"
81 : #include "funcapi.h"
82 : #include "miscadmin.h"
83 : #include "nodes/execnodes.h"
84 : #include "pgstat.h"
85 : #include "replication/origin.h"
86 : #include "replication/slot.h"
87 : #include "storage/condition_variable.h"
88 : #include "storage/fd.h"
89 : #include "storage/ipc.h"
90 : #include "storage/lmgr.h"
91 : #include "utils/builtins.h"
92 : #include "utils/fmgroids.h"
93 : #include "utils/guc.h"
94 : #include "utils/pg_lsn.h"
95 : #include "utils/rel.h"
96 : #include "utils/snapmgr.h"
97 : #include "utils/syscache.h"
98 :
99 : /* paths for replication origin checkpoint files */
100 : #define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
101 : #define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
102 :
/*
 * GUC variables
 *
 * max_active_replication_origins is the length of the shared
 * replication_states[] array.  When it is 0, no shared memory is reserved
 * for origins and the shmem-init, checkpoint and startup routines in this
 * file return without doing anything.
 */
int			max_active_replication_origins = 10;
105 :
/*
 * Replay progress of a single remote node.
 *
 * One of these exists per element of the shared replication_states[] array.
 * An element whose roident is InvalidRepOriginId is treated as unused by the
 * code that searches the array.
 */
typedef struct ReplicationState
{
	/*
	 * Local identifier for the remote node.
	 */
	RepOriginId roident;

	/*
	 * Location of the latest commit from the remote side.
	 */
	XLogRecPtr	remote_lsn;

	/*
	 * Remember the local lsn of the commit record so we can XLogFlush() to it
	 * during a checkpoint so we know the commit record actually is safe on
	 * disk.
	 */
	XLogRecPtr	local_lsn;

	/*
	 * PID of backend that's acquired slot, or 0 if none.
	 */
	int			acquired_by;

	/*
	 * Condition variable that's signaled when acquired_by changes.  A
	 * backend that wants to drop an origin that is in use sleeps on it.
	 */
	ConditionVariable origin_cv;

	/*
	 * Lock protecting remote_lsn and local_lsn.
	 */
	LWLock		lock;
} ReplicationState;
143 :
/*
 * On disk version of ReplicationState.
 *
 * Only the origin id and the remote LSN are persisted.  The checkpoint code
 * zeroes the struct before filling it so that padding bytes written to disk
 * (and fed into the CRC) are deterministic.
 */
typedef struct ReplicationStateOnDisk
{
	RepOriginId roident;
	XLogRecPtr	remote_lsn;
} ReplicationStateOnDisk;
152 :
153 :
/*
 * Layout of the shared memory block holding all replication origin state:
 * a small fixed header followed by the per-origin array.
 */
typedef struct ReplicationStateCtl
{
	/* Tranche to use for per-origin LWLocks */
	int			tranche_id;
	/* Array of length max_active_replication_origins */
	ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
} ReplicationStateCtl;
161 :
162 : /* external variables */
163 : RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
164 : XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr;
165 : TimestampTz replorigin_session_origin_timestamp = 0;
166 :
167 : /*
168 : * Base address into a shared memory array of replication states of size
169 : * max_active_replication_origins.
170 : */
171 : static ReplicationState *replication_states;
172 :
173 : /*
174 : * Actual shared memory block (replication_states[] is now part of this).
175 : */
176 : static ReplicationStateCtl *replication_states_ctl;
177 :
178 : /*
179 : * We keep a pointer to this backend's ReplicationState to avoid having to
180 : * search the replication_states array in replorigin_session_advance for each
181 : * remote commit. (Ownership of a backend's own entry can only be changed by
182 : * that backend.)
183 : */
184 : static ReplicationState *session_replication_state = NULL;
185 :
186 : /* Magic for on disk files. */
187 : #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
188 :
189 : static void
190 84 : replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
191 : {
192 84 : if (check_origins && max_active_replication_origins == 0)
193 0 : ereport(ERROR,
194 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
195 : errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
196 :
197 84 : if (!recoveryOK && RecoveryInProgress())
198 0 : ereport(ERROR,
199 : (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
200 : errmsg("cannot manipulate replication origins during recovery")));
201 84 : }
202 :
203 :
204 : /*
205 : * IsReservedOriginName
206 : * True iff name is either "none" or "any".
207 : */
208 : static bool
209 16 : IsReservedOriginName(const char *name)
210 : {
211 30 : return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
212 14 : (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
213 : }
214 :
215 : /* ---------------------------------------------------------------------------
216 : * Functions for working with replication origins themselves.
217 : * ---------------------------------------------------------------------------
218 : */
219 :
220 : /*
221 : * Check for a persistent replication origin identified by name.
222 : *
223 : * Returns InvalidOid if the node isn't known yet and missing_ok is true.
224 : */
225 : RepOriginId
226 1824 : replorigin_by_name(const char *roname, bool missing_ok)
227 : {
228 : Form_pg_replication_origin ident;
229 1824 : Oid roident = InvalidOid;
230 : HeapTuple tuple;
231 : Datum roname_d;
232 :
233 1824 : roname_d = CStringGetTextDatum(roname);
234 :
235 1824 : tuple = SearchSysCache1(REPLORIGNAME, roname_d);
236 1824 : if (HeapTupleIsValid(tuple))
237 : {
238 1064 : ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
239 1064 : roident = ident->roident;
240 1064 : ReleaseSysCache(tuple);
241 : }
242 760 : else if (!missing_ok)
243 8 : ereport(ERROR,
244 : (errcode(ERRCODE_UNDEFINED_OBJECT),
245 : errmsg("replication origin \"%s\" does not exist",
246 : roname)));
247 :
248 1816 : return roident;
249 : }
250 :
251 : /*
252 : * Create a replication origin.
253 : *
254 : * Needs to be called in a transaction.
255 : */
256 : RepOriginId
257 708 : replorigin_create(const char *roname)
258 : {
259 : Oid roident;
260 708 : HeapTuple tuple = NULL;
261 : Relation rel;
262 : Datum roname_d;
263 : SnapshotData SnapshotDirty;
264 : SysScanDesc scan;
265 : ScanKeyData key;
266 :
267 708 : roname_d = CStringGetTextDatum(roname);
268 :
269 : Assert(IsTransactionState());
270 :
271 : /*
272 : * We need the numeric replication origin to be 16bit wide, so we cannot
273 : * rely on the normal oid allocation. Instead we simply scan
274 : * pg_replication_origin for the first unused id. That's not particularly
275 : * efficient, but this should be a fairly infrequent operation - we can
276 : * easily spend a bit more code on this when it turns out it needs to be
277 : * faster.
278 : *
279 : * We handle concurrency by taking an exclusive lock (allowing reads!)
280 : * over the table for the duration of the search. Because we use a "dirty
281 : * snapshot" we can read rows that other in-progress sessions have
282 : * written, even though they would be invisible with normal snapshots. Due
283 : * to the exclusive lock there's no danger that new rows can appear while
284 : * we're checking.
285 : */
286 708 : InitDirtySnapshot(SnapshotDirty);
287 :
288 708 : rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
289 :
290 1238 : for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
291 : {
292 : bool nulls[Natts_pg_replication_origin];
293 : Datum values[Natts_pg_replication_origin];
294 : bool collides;
295 :
296 1238 : CHECK_FOR_INTERRUPTS();
297 :
298 1238 : ScanKeyInit(&key,
299 : Anum_pg_replication_origin_roident,
300 : BTEqualStrategyNumber, F_OIDEQ,
301 : ObjectIdGetDatum(roident));
302 :
303 1238 : scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
304 : true /* indexOK */ ,
305 : &SnapshotDirty,
306 : 1, &key);
307 :
308 1238 : collides = HeapTupleIsValid(systable_getnext(scan));
309 :
310 1238 : systable_endscan(scan);
311 :
312 1238 : if (!collides)
313 : {
314 : /*
315 : * Ok, found an unused roident, insert the new row and do a CCI,
316 : * so our callers can look it up if they want to.
317 : */
318 708 : memset(&nulls, 0, sizeof(nulls));
319 :
320 708 : values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
321 708 : values[Anum_pg_replication_origin_roname - 1] = roname_d;
322 :
323 708 : tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
324 708 : CatalogTupleInsert(rel, tuple);
325 706 : CommandCounterIncrement();
326 706 : break;
327 : }
328 : }
329 :
330 : /* now release lock again, */
331 706 : table_close(rel, ExclusiveLock);
332 :
333 706 : if (tuple == NULL)
334 0 : ereport(ERROR,
335 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
336 : errmsg("could not find free replication origin ID")));
337 :
338 706 : heap_freetuple(tuple);
339 706 : return roident;
340 : }
341 :
342 : /*
343 : * Helper function to drop a replication origin.
344 : */
345 : static void
346 582 : replorigin_state_clear(RepOriginId roident, bool nowait)
347 : {
348 : int i;
349 :
350 : /*
351 : * Clean up the slot state info, if there is any matching slot.
352 : */
353 582 : restart:
354 582 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
355 :
356 1762 : for (i = 0; i < max_active_replication_origins; i++)
357 : {
358 1688 : ReplicationState *state = &replication_states[i];
359 :
360 1688 : if (state->roident == roident)
361 : {
362 : /* found our slot, is it busy? */
363 508 : if (state->acquired_by != 0)
364 : {
365 : ConditionVariable *cv;
366 :
367 0 : if (nowait)
368 0 : ereport(ERROR,
369 : (errcode(ERRCODE_OBJECT_IN_USE),
370 : errmsg("could not drop replication origin with ID %d, in use by PID %d",
371 : state->roident,
372 : state->acquired_by)));
373 :
374 : /*
375 : * We must wait and then retry. Since we don't know which CV
376 : * to wait on until here, we can't readily use
377 : * ConditionVariablePrepareToSleep (calling it here would be
378 : * wrong, since we could miss the signal if we did so); just
379 : * use ConditionVariableSleep directly.
380 : */
381 0 : cv = &state->origin_cv;
382 :
383 0 : LWLockRelease(ReplicationOriginLock);
384 :
385 0 : ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
386 0 : goto restart;
387 : }
388 :
389 : /* first make a WAL log entry */
390 : {
391 : xl_replorigin_drop xlrec;
392 :
393 508 : xlrec.node_id = roident;
394 508 : XLogBeginInsert();
395 508 : XLogRegisterData(&xlrec, sizeof(xlrec));
396 508 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
397 : }
398 :
399 : /* then clear the in-memory slot */
400 508 : state->roident = InvalidRepOriginId;
401 508 : state->remote_lsn = InvalidXLogRecPtr;
402 508 : state->local_lsn = InvalidXLogRecPtr;
403 508 : break;
404 : }
405 : }
406 582 : LWLockRelease(ReplicationOriginLock);
407 582 : ConditionVariableCancelSleep();
408 582 : }
409 :
410 : /*
411 : * Drop replication origin (by name).
412 : *
413 : * Needs to be called in a transaction.
414 : */
415 : void
416 954 : replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
417 : {
418 : RepOriginId roident;
419 : Relation rel;
420 : HeapTuple tuple;
421 :
422 : Assert(IsTransactionState());
423 :
424 954 : rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
425 :
426 954 : roident = replorigin_by_name(name, missing_ok);
427 :
428 : /* Lock the origin to prevent concurrent drops. */
429 952 : LockSharedObject(ReplicationOriginRelationId, roident, 0,
430 : AccessExclusiveLock);
431 :
432 952 : tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
433 952 : if (!HeapTupleIsValid(tuple))
434 : {
435 370 : if (!missing_ok)
436 0 : elog(ERROR, "cache lookup failed for replication origin with ID %d",
437 : roident);
438 :
439 : /*
440 : * We don't need to retain the locks if the origin is already dropped.
441 : */
442 370 : UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
443 : AccessExclusiveLock);
444 370 : table_close(rel, RowExclusiveLock);
445 370 : return;
446 : }
447 :
448 582 : replorigin_state_clear(roident, nowait);
449 :
450 : /*
451 : * Now, we can delete the catalog entry.
452 : */
453 582 : CatalogTupleDelete(rel, &tuple->t_self);
454 582 : ReleaseSysCache(tuple);
455 :
456 582 : CommandCounterIncrement();
457 :
458 : /* We keep the lock on pg_replication_origin until commit */
459 582 : table_close(rel, NoLock);
460 : }
461 :
462 : /*
463 : * Lookup replication origin via its oid and return the name.
464 : *
465 : * The external name is palloc'd in the calling context.
466 : *
467 : * Returns true if the origin is known, false otherwise.
468 : */
469 : bool
470 40 : replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
471 : {
472 : HeapTuple tuple;
473 : Form_pg_replication_origin ric;
474 :
475 : Assert(OidIsValid((Oid) roident));
476 : Assert(roident != InvalidRepOriginId);
477 : Assert(roident != DoNotReplicateId);
478 :
479 40 : tuple = SearchSysCache1(REPLORIGIDENT,
480 : ObjectIdGetDatum((Oid) roident));
481 :
482 40 : if (HeapTupleIsValid(tuple))
483 : {
484 34 : ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
485 34 : *roname = text_to_cstring(&ric->roname);
486 34 : ReleaseSysCache(tuple);
487 :
488 34 : return true;
489 : }
490 : else
491 : {
492 6 : *roname = NULL;
493 :
494 6 : if (!missing_ok)
495 0 : ereport(ERROR,
496 : (errcode(ERRCODE_UNDEFINED_OBJECT),
497 : errmsg("replication origin with ID %d does not exist",
498 : roident)));
499 :
500 6 : return false;
501 : }
502 : }
503 :
504 :
505 : /* ---------------------------------------------------------------------------
506 : * Functions for handling replication progress.
507 : * ---------------------------------------------------------------------------
508 : */
509 :
510 : Size
511 8044 : ReplicationOriginShmemSize(void)
512 : {
513 8044 : Size size = 0;
514 :
515 8044 : if (max_active_replication_origins == 0)
516 4 : return size;
517 :
518 8040 : size = add_size(size, offsetof(ReplicationStateCtl, states));
519 :
520 8040 : size = add_size(size,
521 : mul_size(max_active_replication_origins, sizeof(ReplicationState)));
522 8040 : return size;
523 : }
524 :
525 : void
526 2084 : ReplicationOriginShmemInit(void)
527 : {
528 : bool found;
529 :
530 2084 : if (max_active_replication_origins == 0)
531 2 : return;
532 :
533 2082 : replication_states_ctl = (ReplicationStateCtl *)
534 2082 : ShmemInitStruct("ReplicationOriginState",
535 : ReplicationOriginShmemSize(),
536 : &found);
537 2082 : replication_states = replication_states_ctl->states;
538 :
539 2082 : if (!found)
540 : {
541 : int i;
542 :
543 149778 : MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
544 :
545 2082 : replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
546 :
547 22884 : for (i = 0; i < max_active_replication_origins; i++)
548 : {
549 20802 : LWLockInitialize(&replication_states[i].lock,
550 20802 : replication_states_ctl->tranche_id);
551 20802 : ConditionVariableInit(&replication_states[i].origin_cv);
552 : }
553 : }
554 : }
555 :
556 : /* ---------------------------------------------------------------------------
557 : * Perform a checkpoint of each replication origin's progress with respect to
558 : * the replayed remote_lsn. Make sure that all transactions we refer to in the
559 : * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
560 : * if the transactions were originally committed asynchronously.
561 : *
562 : * We store checkpoints in the following format:
563 : * +-------+------------------------+------------------+-----+--------+
564 : * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
565 : * +-------+------------------------+------------------+-----+--------+
566 : *
567 : * So its just the magic, followed by the statically sized
568 : * ReplicationStateOnDisk structs. Note that the maximum number of
569 : * ReplicationState is determined by max_active_replication_origins.
570 : * ---------------------------------------------------------------------------
571 : */
572 : void
573 2658 : CheckPointReplicationOrigin(void)
574 : {
575 2658 : const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
576 2658 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
577 : int tmpfd;
578 : int i;
579 2658 : uint32 magic = REPLICATION_STATE_MAGIC;
580 : pg_crc32c crc;
581 :
582 2658 : if (max_active_replication_origins == 0)
583 2 : return;
584 :
585 2656 : INIT_CRC32C(crc);
586 :
587 : /* make sure no old temp file is remaining */
588 2656 : if (unlink(tmppath) < 0 && errno != ENOENT)
589 0 : ereport(PANIC,
590 : (errcode_for_file_access(),
591 : errmsg("could not remove file \"%s\": %m",
592 : tmppath)));
593 :
594 : /*
595 : * no other backend can perform this at the same time; only one checkpoint
596 : * can happen at a time.
597 : */
598 2656 : tmpfd = OpenTransientFile(tmppath,
599 : O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
600 2656 : if (tmpfd < 0)
601 0 : ereport(PANIC,
602 : (errcode_for_file_access(),
603 : errmsg("could not create file \"%s\": %m",
604 : tmppath)));
605 :
606 : /* write magic */
607 2656 : errno = 0;
608 2656 : if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
609 : {
610 : /* if write didn't set errno, assume problem is no disk space */
611 0 : if (errno == 0)
612 0 : errno = ENOSPC;
613 0 : ereport(PANIC,
614 : (errcode_for_file_access(),
615 : errmsg("could not write to file \"%s\": %m",
616 : tmppath)));
617 : }
618 2656 : COMP_CRC32C(crc, &magic, sizeof(magic));
619 :
620 : /* prevent concurrent creations/drops */
621 2656 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
622 :
623 : /* write actual data */
624 29216 : for (i = 0; i < max_active_replication_origins; i++)
625 : {
626 : ReplicationStateOnDisk disk_state;
627 26560 : ReplicationState *curstate = &replication_states[i];
628 : XLogRecPtr local_lsn;
629 :
630 26560 : if (curstate->roident == InvalidRepOriginId)
631 26474 : continue;
632 :
633 : /* zero, to avoid uninitialized padding bytes */
634 86 : memset(&disk_state, 0, sizeof(disk_state));
635 :
636 86 : LWLockAcquire(&curstate->lock, LW_SHARED);
637 :
638 86 : disk_state.roident = curstate->roident;
639 :
640 86 : disk_state.remote_lsn = curstate->remote_lsn;
641 86 : local_lsn = curstate->local_lsn;
642 :
643 86 : LWLockRelease(&curstate->lock);
644 :
645 : /* make sure we only write out a commit that's persistent */
646 86 : XLogFlush(local_lsn);
647 :
648 86 : errno = 0;
649 86 : if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
650 : sizeof(disk_state))
651 : {
652 : /* if write didn't set errno, assume problem is no disk space */
653 0 : if (errno == 0)
654 0 : errno = ENOSPC;
655 0 : ereport(PANIC,
656 : (errcode_for_file_access(),
657 : errmsg("could not write to file \"%s\": %m",
658 : tmppath)));
659 : }
660 :
661 86 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
662 : }
663 :
664 2656 : LWLockRelease(ReplicationOriginLock);
665 :
666 : /* write out the CRC */
667 2656 : FIN_CRC32C(crc);
668 2656 : errno = 0;
669 2656 : if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
670 : {
671 : /* if write didn't set errno, assume problem is no disk space */
672 0 : if (errno == 0)
673 0 : errno = ENOSPC;
674 0 : ereport(PANIC,
675 : (errcode_for_file_access(),
676 : errmsg("could not write to file \"%s\": %m",
677 : tmppath)));
678 : }
679 :
680 2656 : if (CloseTransientFile(tmpfd) != 0)
681 0 : ereport(PANIC,
682 : (errcode_for_file_access(),
683 : errmsg("could not close file \"%s\": %m",
684 : tmppath)));
685 :
686 : /* fsync, rename to permanent file, fsync file and directory */
687 2656 : durable_rename(tmppath, path, PANIC);
688 : }
689 :
690 : /*
691 : * Recover replication replay status from checkpoint data saved earlier by
692 : * CheckPointReplicationOrigin.
693 : *
694 : * This only needs to be called at startup and *not* during every checkpoint
695 : * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
696 : * state thereafter can be recovered by looking at commit records.
697 : */
698 : void
699 1800 : StartupReplicationOrigin(void)
700 : {
701 1800 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
702 : int fd;
703 : int readBytes;
704 1800 : uint32 magic = REPLICATION_STATE_MAGIC;
705 1800 : int last_state = 0;
706 : pg_crc32c file_crc;
707 : pg_crc32c crc;
708 :
709 : /* don't want to overwrite already existing state */
710 : #ifdef USE_ASSERT_CHECKING
711 : static bool already_started = false;
712 :
713 : Assert(!already_started);
714 : already_started = true;
715 : #endif
716 :
717 1800 : if (max_active_replication_origins == 0)
718 98 : return;
719 :
720 1798 : INIT_CRC32C(crc);
721 :
722 1798 : elog(DEBUG2, "starting up replication origin progress state");
723 :
724 1798 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
725 :
726 : /*
727 : * might have had max_active_replication_origins == 0 last run, or we just
728 : * brought up a standby.
729 : */
730 1798 : if (fd < 0 && errno == ENOENT)
731 96 : return;
732 1702 : else if (fd < 0)
733 0 : ereport(PANIC,
734 : (errcode_for_file_access(),
735 : errmsg("could not open file \"%s\": %m",
736 : path)));
737 :
738 : /* verify magic, that is written even if nothing was active */
739 1702 : readBytes = read(fd, &magic, sizeof(magic));
740 1702 : if (readBytes != sizeof(magic))
741 : {
742 0 : if (readBytes < 0)
743 0 : ereport(PANIC,
744 : (errcode_for_file_access(),
745 : errmsg("could not read file \"%s\": %m",
746 : path)));
747 : else
748 0 : ereport(PANIC,
749 : (errcode(ERRCODE_DATA_CORRUPTED),
750 : errmsg("could not read file \"%s\": read %d of %zu",
751 : path, readBytes, sizeof(magic))));
752 : }
753 1702 : COMP_CRC32C(crc, &magic, sizeof(magic));
754 :
755 1702 : if (magic != REPLICATION_STATE_MAGIC)
756 0 : ereport(PANIC,
757 : (errmsg("replication checkpoint has wrong magic %u instead of %u",
758 : magic, REPLICATION_STATE_MAGIC)));
759 :
760 : /* we can skip locking here, no other access is possible */
761 :
762 : /* recover individual states, until there are no more to be found */
763 : while (true)
764 38 : {
765 : ReplicationStateOnDisk disk_state;
766 :
767 1740 : readBytes = read(fd, &disk_state, sizeof(disk_state));
768 :
769 : /* no further data */
770 1740 : if (readBytes == sizeof(crc))
771 : {
772 : /* not pretty, but simple ... */
773 1702 : file_crc = *(pg_crc32c *) &disk_state;
774 1702 : break;
775 : }
776 :
777 38 : if (readBytes < 0)
778 : {
779 0 : ereport(PANIC,
780 : (errcode_for_file_access(),
781 : errmsg("could not read file \"%s\": %m",
782 : path)));
783 : }
784 :
785 38 : if (readBytes != sizeof(disk_state))
786 : {
787 0 : ereport(PANIC,
788 : (errcode_for_file_access(),
789 : errmsg("could not read file \"%s\": read %d of %zu",
790 : path, readBytes, sizeof(disk_state))));
791 : }
792 :
793 38 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
794 :
795 38 : if (last_state == max_active_replication_origins)
796 0 : ereport(PANIC,
797 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
798 : errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
799 :
800 : /* copy data to shared memory */
801 38 : replication_states[last_state].roident = disk_state.roident;
802 38 : replication_states[last_state].remote_lsn = disk_state.remote_lsn;
803 38 : last_state++;
804 :
805 38 : ereport(LOG,
806 : (errmsg("recovered replication state of node %d to %X/%X",
807 : disk_state.roident,
808 : LSN_FORMAT_ARGS(disk_state.remote_lsn))));
809 : }
810 :
811 : /* now check checksum */
812 1702 : FIN_CRC32C(crc);
813 1702 : if (file_crc != crc)
814 0 : ereport(PANIC,
815 : (errcode(ERRCODE_DATA_CORRUPTED),
816 : errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
817 : crc, file_crc)));
818 :
819 1702 : if (CloseTransientFile(fd) != 0)
820 0 : ereport(PANIC,
821 : (errcode_for_file_access(),
822 : errmsg("could not close file \"%s\": %m",
823 : path)));
824 : }
825 :
826 : void
827 8 : replorigin_redo(XLogReaderState *record)
828 : {
829 8 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
830 :
831 8 : switch (info)
832 : {
833 4 : case XLOG_REPLORIGIN_SET:
834 : {
835 4 : xl_replorigin_set *xlrec =
836 4 : (xl_replorigin_set *) XLogRecGetData(record);
837 :
838 4 : replorigin_advance(xlrec->node_id,
839 : xlrec->remote_lsn, record->EndRecPtr,
840 4 : xlrec->force /* backward */ ,
841 : false /* WAL log */ );
842 4 : break;
843 : }
844 4 : case XLOG_REPLORIGIN_DROP:
845 : {
846 : xl_replorigin_drop *xlrec;
847 : int i;
848 :
849 4 : xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
850 :
851 6 : for (i = 0; i < max_active_replication_origins; i++)
852 : {
853 6 : ReplicationState *state = &replication_states[i];
854 :
855 : /* found our slot */
856 6 : if (state->roident == xlrec->node_id)
857 : {
858 : /* reset entry */
859 4 : state->roident = InvalidRepOriginId;
860 4 : state->remote_lsn = InvalidXLogRecPtr;
861 4 : state->local_lsn = InvalidXLogRecPtr;
862 4 : break;
863 : }
864 : }
865 4 : break;
866 : }
867 0 : default:
868 0 : elog(PANIC, "replorigin_redo: unknown op code %u", info);
869 : }
870 8 : }
871 :
872 :
/*
 * Tell the replication origin progress machinery that a commit from 'node'
 * that originated at the LSN remote_commit on the remote node was replayed
 * successfully and that we don't need to do so again. In combination with
 * setting up replorigin_session_origin_lsn and replorigin_session_origin
 * that ensures we won't lose knowledge about that after a crash if the
 * transaction had a persistent effect (think of asynchronous commits).
 *
 * local_commit needs to be a local LSN of the commit so that we can make sure
 * upon a checkpoint that enough WAL has been persisted to disk.
 *
 * go_backward allows the stored LSNs to be overwritten with older values;
 * otherwise they only ever move forward. wal_log requests that the change be
 * WAL-logged as an XLOG_REPLORIGIN_SET record.
 *
 * Raises an error if the origin's entry is currently acquired by a backend,
 * or if the origin has no entry yet and no free entry is left.
 *
 * Needs to be called with a RowExclusiveLock on pg_replication_origin,
 * unless running in recovery.
 */
void
replorigin_advance(RepOriginId node,
				   XLogRecPtr remote_commit, XLogRecPtr local_commit,
				   bool go_backward, bool wal_log)
{
	int			i;
	ReplicationState *replication_state = NULL;
	ReplicationState *free_state = NULL;

	Assert(node != InvalidRepOriginId);

	/* we don't track DoNotReplicateId */
	if (node == DoNotReplicateId)
		return;

	/*
	 * XXX: For the case where this is called by WAL replay, it'd be more
	 * efficient to restore into a backend local hashtable and only dump into
	 * shmem after recovery is finished. Let's wait with implementing that
	 * till it's shown to be a measurable expense
	 */

	/* Lock exclusively, as we may have to create a new table entry. */
	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);

	/*
	 * Search for either an existing slot for the origin, or a free one we can
	 * use.
	 */
	for (i = 0; i < max_active_replication_origins; i++)
	{
		ReplicationState *curstate = &replication_states[i];

		/* remember where to insert if necessary (first free slot only) */
		if (curstate->roident == InvalidRepOriginId &&
			free_state == NULL)
		{
			free_state = curstate;
			continue;
		}

		/* not our slot */
		if (curstate->roident != node)
		{
			continue;
		}

		/* ok, found slot */
		replication_state = curstate;

		/* the per-slot lock stays held until the LSNs have been updated */
		LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);

		/* Make sure it's not used by somebody else */
		if (replication_state->acquired_by != 0)
		{
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_IN_USE),
					 errmsg("replication origin with ID %d is already active for PID %d",
							replication_state->roident,
							replication_state->acquired_by)));
		}

		break;
	}

	/* neither an existing slot nor a free one: the array is full */
	if (replication_state == NULL && free_state == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("could not find free replication state slot for replication origin with ID %d",
						node),
				 errhint("Increase \"max_active_replication_origins\" and try again.")));

	if (replication_state == NULL)
	{
		/* initialize new slot */
		LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
		replication_state = free_state;
		Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
		Assert(replication_state->local_lsn == InvalidXLogRecPtr);
		replication_state->roident = node;
	}

	Assert(replication_state->roident != InvalidRepOriginId);

	/*
	 * If somebody "forcefully" sets this slot, WAL log it, so it's durable
	 * and the standby gets the message. Primarily this will be called during
	 * WAL replay (of commit records) where no WAL logging is necessary.
	 */
	if (wal_log)
	{
		xl_replorigin_set xlrec;

		xlrec.remote_lsn = remote_commit;
		xlrec.node_id = node;
		xlrec.force = go_backward;

		XLogBeginInsert();
		XLogRegisterData(&xlrec, sizeof(xlrec));

		XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
	}

	/*
	 * Due to - harmless - race conditions during a checkpoint we could see
	 * values here that are older than the ones we already have in memory. We
	 * could also see older values for prepared transactions when the prepare
	 * is sent at a later point of time along with commit prepared and there
	 * are other transactions commits between prepare and commit prepared. See
	 * ReorderBufferFinishPrepared. Don't overwrite those.
	 */
	if (go_backward || replication_state->remote_lsn < remote_commit)
		replication_state->remote_lsn = remote_commit;
	/* an invalid local_commit never replaces the stored local LSN */
	if (local_commit != InvalidXLogRecPtr &&
		(go_backward || replication_state->local_lsn < local_commit))
		replication_state->local_lsn = local_commit;
	LWLockRelease(&replication_state->lock);

	/*
	 * Release *after* changing the LSNs, slot isn't acquired and thus could
	 * otherwise be dropped anytime.
	 */
	LWLockRelease(ReplicationOriginLock);
}
1011 :
1012 :
1013 : XLogRecPtr
1014 16 : replorigin_get_progress(RepOriginId node, bool flush)
1015 : {
1016 : int i;
1017 16 : XLogRecPtr local_lsn = InvalidXLogRecPtr;
1018 16 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1019 :
1020 : /* prevent slots from being concurrently dropped */
1021 16 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1022 :
1023 76 : for (i = 0; i < max_active_replication_origins; i++)
1024 : {
1025 : ReplicationState *state;
1026 :
1027 70 : state = &replication_states[i];
1028 :
1029 70 : if (state->roident == node)
1030 : {
1031 10 : LWLockAcquire(&state->lock, LW_SHARED);
1032 :
1033 10 : remote_lsn = state->remote_lsn;
1034 10 : local_lsn = state->local_lsn;
1035 :
1036 10 : LWLockRelease(&state->lock);
1037 :
1038 10 : break;
1039 : }
1040 : }
1041 :
1042 16 : LWLockRelease(ReplicationOriginLock);
1043 :
1044 16 : if (flush && local_lsn != InvalidXLogRecPtr)
1045 2 : XLogFlush(local_lsn);
1046 :
1047 16 : return remote_lsn;
1048 : }
1049 :
1050 : /*
1051 : * Tear down a (possibly) configured session replication origin during process
1052 : * exit.
1053 : */
1054 : static void
1055 836 : ReplicationOriginExitCleanup(int code, Datum arg)
1056 : {
1057 836 : ConditionVariable *cv = NULL;
1058 :
1059 836 : if (session_replication_state == NULL)
1060 366 : return;
1061 :
1062 470 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1063 :
1064 470 : if (session_replication_state->acquired_by == MyProcPid)
1065 : {
1066 450 : cv = &session_replication_state->origin_cv;
1067 :
1068 450 : session_replication_state->acquired_by = 0;
1069 450 : session_replication_state = NULL;
1070 : }
1071 :
1072 470 : LWLockRelease(ReplicationOriginLock);
1073 :
1074 470 : if (cv)
1075 450 : ConditionVariableBroadcast(cv);
1076 : }
1077 :
1078 : /*
1079 : * Setup a replication origin in the shared memory struct if it doesn't
1080 : * already exist and cache access to the specific ReplicationSlot so the
1081 : * array doesn't have to be searched when calling
1082 : * replorigin_session_advance().
1083 : *
1084 : * Normally only one such cached origin can exist per process so the cached
1085 : * value can only be set again after the previous value is torn down with
1086 : * replorigin_session_reset(). For this normal case pass acquired_by = 0
1087 : * (meaning the slot is not allowed to be already acquired by another process).
1088 : *
1089 : * However, sometimes multiple processes can safely re-use the same origin slot
1090 : * (for example, multiple parallel apply processes can safely use the same
1091 : * origin, provided they maintain commit order by allowing only one process to
1092 : * commit at a time). For this case the first process must pass acquired_by =
1093 : * 0, and then the other processes sharing that same origin can pass
1094 : * acquired_by = PID of the first process.
1095 : */
1096 : void
1097 842 : replorigin_session_setup(RepOriginId node, int acquired_by)
1098 : {
1099 : static bool registered_cleanup;
1100 : int i;
1101 842 : int free_slot = -1;
1102 :
1103 842 : if (!registered_cleanup)
1104 : {
1105 836 : on_shmem_exit(ReplicationOriginExitCleanup, 0);
1106 836 : registered_cleanup = true;
1107 : }
1108 :
1109 : Assert(max_active_replication_origins > 0);
1110 :
1111 842 : if (session_replication_state != NULL)
1112 2 : ereport(ERROR,
1113 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1114 : errmsg("cannot setup replication origin when one is already setup")));
1115 :
1116 : /* Lock exclusively, as we may have to create a new table entry. */
1117 840 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1118 :
1119 : /*
1120 : * Search for either an existing slot for the origin, or a free one we can
1121 : * use.
1122 : */
1123 3556 : for (i = 0; i < max_active_replication_origins; i++)
1124 : {
1125 3332 : ReplicationState *curstate = &replication_states[i];
1126 :
1127 : /* remember where to insert if necessary */
1128 3332 : if (curstate->roident == InvalidRepOriginId &&
1129 : free_slot == -1)
1130 : {
1131 228 : free_slot = i;
1132 228 : continue;
1133 : }
1134 :
1135 : /* not our slot */
1136 3104 : if (curstate->roident != node)
1137 2488 : continue;
1138 :
1139 616 : else if (curstate->acquired_by != 0 && acquired_by == 0)
1140 : {
1141 0 : ereport(ERROR,
1142 : (errcode(ERRCODE_OBJECT_IN_USE),
1143 : errmsg("replication origin with ID %d is already active for PID %d",
1144 : curstate->roident, curstate->acquired_by)));
1145 : }
1146 :
1147 : /* ok, found slot */
1148 616 : session_replication_state = curstate;
1149 616 : break;
1150 : }
1151 :
1152 :
1153 840 : if (session_replication_state == NULL && free_slot == -1)
1154 0 : ereport(ERROR,
1155 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1156 : errmsg("could not find free replication state slot for replication origin with ID %d",
1157 : node),
1158 : errhint("Increase \"max_active_replication_origins\" and try again.")));
1159 840 : else if (session_replication_state == NULL)
1160 : {
1161 : /* initialize new slot */
1162 224 : session_replication_state = &replication_states[free_slot];
1163 : Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
1164 : Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
1165 224 : session_replication_state->roident = node;
1166 : }
1167 :
1168 :
1169 : Assert(session_replication_state->roident != InvalidRepOriginId);
1170 :
1171 840 : if (acquired_by == 0)
1172 820 : session_replication_state->acquired_by = MyProcPid;
1173 20 : else if (session_replication_state->acquired_by != acquired_by)
1174 0 : elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
1175 : node, acquired_by);
1176 :
1177 840 : LWLockRelease(ReplicationOriginLock);
1178 :
1179 : /* probably this one is pointless */
1180 840 : ConditionVariableBroadcast(&session_replication_state->origin_cv);
1181 840 : }
1182 :
1183 : /*
1184 : * Reset replay state previously setup in this session.
1185 : *
1186 : * This function may only be called if an origin was setup with
1187 : * replorigin_session_setup().
1188 : */
1189 : void
1190 372 : replorigin_session_reset(void)
1191 : {
1192 : ConditionVariable *cv;
1193 :
1194 : Assert(max_active_replication_origins != 0);
1195 :
1196 372 : if (session_replication_state == NULL)
1197 2 : ereport(ERROR,
1198 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1199 : errmsg("no replication origin is configured")));
1200 :
1201 370 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1202 :
1203 370 : session_replication_state->acquired_by = 0;
1204 370 : cv = &session_replication_state->origin_cv;
1205 370 : session_replication_state = NULL;
1206 :
1207 370 : LWLockRelease(ReplicationOriginLock);
1208 :
1209 370 : ConditionVariableBroadcast(cv);
1210 370 : }
1211 :
1212 : /*
1213 : * Do the same work replorigin_advance() does, just on the session's
1214 : * configured origin.
1215 : *
1216 : * This is noticeably cheaper than using replorigin_advance().
1217 : */
1218 : void
1219 2148 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
1220 : {
1221 : Assert(session_replication_state != NULL);
1222 : Assert(session_replication_state->roident != InvalidRepOriginId);
1223 :
1224 2148 : LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1225 2148 : if (session_replication_state->local_lsn < local_commit)
1226 2148 : session_replication_state->local_lsn = local_commit;
1227 2148 : if (session_replication_state->remote_lsn < remote_commit)
1228 1014 : session_replication_state->remote_lsn = remote_commit;
1229 2148 : LWLockRelease(&session_replication_state->lock);
1230 2148 : }
1231 :
1232 : /*
1233 : * Ask the machinery about the point up to which we successfully replayed
1234 : * changes from an already setup replication origin.
1235 : */
1236 : XLogRecPtr
1237 434 : replorigin_session_get_progress(bool flush)
1238 : {
1239 : XLogRecPtr remote_lsn;
1240 : XLogRecPtr local_lsn;
1241 :
1242 : Assert(session_replication_state != NULL);
1243 :
1244 434 : LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1245 434 : remote_lsn = session_replication_state->remote_lsn;
1246 434 : local_lsn = session_replication_state->local_lsn;
1247 434 : LWLockRelease(&session_replication_state->lock);
1248 :
1249 434 : if (flush && local_lsn != InvalidXLogRecPtr)
1250 2 : XLogFlush(local_lsn);
1251 :
1252 434 : return remote_lsn;
1253 : }
1254 :
1255 :
1256 :
1257 : /* ---------------------------------------------------------------------------
1258 : * SQL functions for working with replication origin.
1259 : *
1260 : * These mostly should be fairly short wrappers around more generic functions.
1261 : * ---------------------------------------------------------------------------
1262 : */
1263 :
1264 : /*
1265 : * Create replication origin for the passed in name, and return the assigned
1266 : * oid.
1267 : */
1268 : Datum
1269 18 : pg_replication_origin_create(PG_FUNCTION_ARGS)
1270 : {
1271 : char *name;
1272 : RepOriginId roident;
1273 :
1274 18 : replorigin_check_prerequisites(false, false);
1275 :
1276 18 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1277 :
1278 : /*
1279 : * Replication origins "any and "none" are reserved for system options.
1280 : * The origins "pg_xxx" are reserved for internal use.
1281 : */
1282 18 : if (IsReservedName(name) || IsReservedOriginName(name))
1283 6 : ereport(ERROR,
1284 : (errcode(ERRCODE_RESERVED_NAME),
1285 : errmsg("replication origin name \"%s\" is reserved",
1286 : name),
1287 : errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1288 : LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
1289 :
1290 : /*
1291 : * If built with appropriate switch, whine when regression-testing
1292 : * conventions for replication origin names are violated.
1293 : */
1294 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1295 : if (strncmp(name, "regress_", 8) != 0)
1296 : elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1297 : #endif
1298 :
1299 12 : roident = replorigin_create(name);
1300 :
1301 10 : pfree(name);
1302 :
1303 10 : PG_RETURN_OID(roident);
1304 : }
1305 :
1306 : /*
1307 : * Drop replication origin.
1308 : */
1309 : Datum
1310 14 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
1311 : {
1312 : char *name;
1313 :
1314 14 : replorigin_check_prerequisites(false, false);
1315 :
1316 14 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1317 :
1318 14 : replorigin_drop_by_name(name, false, true);
1319 :
1320 12 : pfree(name);
1321 :
1322 12 : PG_RETURN_VOID();
1323 : }
1324 :
1325 : /*
1326 : * Return oid of a replication origin.
1327 : */
1328 : Datum
1329 0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
1330 : {
1331 : char *name;
1332 : RepOriginId roident;
1333 :
1334 0 : replorigin_check_prerequisites(false, false);
1335 :
1336 0 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1337 0 : roident = replorigin_by_name(name, true);
1338 :
1339 0 : pfree(name);
1340 :
1341 0 : if (OidIsValid(roident))
1342 0 : PG_RETURN_OID(roident);
1343 0 : PG_RETURN_NULL();
1344 : }
1345 :
1346 : /*
1347 : * Setup a replication origin for this session.
1348 : */
1349 : Datum
1350 12 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
1351 : {
1352 : char *name;
1353 : RepOriginId origin;
1354 :
1355 12 : replorigin_check_prerequisites(true, false);
1356 :
1357 12 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1358 12 : origin = replorigin_by_name(name, false);
1359 10 : replorigin_session_setup(origin, 0);
1360 :
1361 8 : replorigin_session_origin = origin;
1362 :
1363 8 : pfree(name);
1364 :
1365 8 : PG_RETURN_VOID();
1366 : }
1367 :
1368 : /*
1369 : * Reset previously setup origin in this session
1370 : */
1371 : Datum
1372 10 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
1373 : {
1374 10 : replorigin_check_prerequisites(true, false);
1375 :
1376 10 : replorigin_session_reset();
1377 :
1378 8 : replorigin_session_origin = InvalidRepOriginId;
1379 8 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
1380 8 : replorigin_session_origin_timestamp = 0;
1381 :
1382 8 : PG_RETURN_VOID();
1383 : }
1384 :
1385 : /*
1386 : * Has a replication origin been setup for this session.
1387 : */
1388 : Datum
1389 0 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
1390 : {
1391 0 : replorigin_check_prerequisites(false, false);
1392 :
1393 0 : PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
1394 : }
1395 :
1396 :
1397 : /*
1398 : * Return the replication progress for origin setup in the current session.
1399 : *
1400 : * If 'flush' is set to true it is ensured that the returned value corresponds
1401 : * to a local transaction that has been flushed. This is useful if asynchronous
1402 : * commits are used when replaying replicated transactions.
1403 : */
1404 : Datum
1405 4 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
1406 : {
1407 4 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1408 4 : bool flush = PG_GETARG_BOOL(0);
1409 :
1410 4 : replorigin_check_prerequisites(true, false);
1411 :
1412 4 : if (session_replication_state == NULL)
1413 0 : ereport(ERROR,
1414 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1415 : errmsg("no replication origin is configured")));
1416 :
1417 4 : remote_lsn = replorigin_session_get_progress(flush);
1418 :
1419 4 : if (remote_lsn == InvalidXLogRecPtr)
1420 0 : PG_RETURN_NULL();
1421 :
1422 4 : PG_RETURN_LSN(remote_lsn);
1423 : }
1424 :
1425 : Datum
1426 2 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
1427 : {
1428 2 : XLogRecPtr location = PG_GETARG_LSN(0);
1429 :
1430 2 : replorigin_check_prerequisites(true, false);
1431 :
1432 2 : if (session_replication_state == NULL)
1433 0 : ereport(ERROR,
1434 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1435 : errmsg("no replication origin is configured")));
1436 :
1437 2 : replorigin_session_origin_lsn = location;
1438 2 : replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
1439 :
1440 2 : PG_RETURN_VOID();
1441 : }
1442 :
1443 : Datum
1444 0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
1445 : {
1446 0 : replorigin_check_prerequisites(true, false);
1447 :
1448 0 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
1449 0 : replorigin_session_origin_timestamp = 0;
1450 :
1451 0 : PG_RETURN_VOID();
1452 : }
1453 :
1454 :
1455 : Datum
1456 6 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
1457 : {
1458 6 : text *name = PG_GETARG_TEXT_PP(0);
1459 6 : XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1460 : RepOriginId node;
1461 :
1462 6 : replorigin_check_prerequisites(true, false);
1463 :
1464 : /* lock to prevent the replication origin from vanishing */
1465 6 : LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1466 :
1467 6 : node = replorigin_by_name(text_to_cstring(name), false);
1468 :
1469 : /*
1470 : * Can't sensibly pass a local commit to be flushed at checkpoint - this
1471 : * xact hasn't committed yet. This is why this function should be used to
1472 : * set up the initial replication state, but not for replay.
1473 : */
1474 4 : replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1475 : true /* go backward */ , true /* WAL log */ );
1476 :
1477 4 : UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1478 :
1479 4 : PG_RETURN_VOID();
1480 : }
1481 :
1482 :
1483 : /*
1484 : * Return the replication progress for an individual replication origin.
1485 : *
1486 : * If 'flush' is set to true it is ensured that the returned value corresponds
1487 : * to a local transaction that has been flushed. This is useful if asynchronous
1488 : * commits are used when replaying replicated transactions.
1489 : */
1490 : Datum
1491 6 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
1492 : {
1493 : char *name;
1494 : bool flush;
1495 : RepOriginId roident;
1496 6 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1497 :
1498 6 : replorigin_check_prerequisites(true, true);
1499 :
1500 6 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1501 6 : flush = PG_GETARG_BOOL(1);
1502 :
1503 6 : roident = replorigin_by_name(name, false);
1504 : Assert(OidIsValid(roident));
1505 :
1506 4 : remote_lsn = replorigin_get_progress(roident, flush);
1507 :
1508 4 : if (remote_lsn == InvalidXLogRecPtr)
1509 0 : PG_RETURN_NULL();
1510 :
1511 4 : PG_RETURN_LSN(remote_lsn);
1512 : }
1513 :
1514 :
1515 : Datum
1516 12 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
1517 : {
1518 12 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1519 : int i;
1520 : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
1521 :
1522 : /* we want to return 0 rows if slot is set to zero */
1523 12 : replorigin_check_prerequisites(false, true);
1524 :
1525 12 : InitMaterializedSRF(fcinfo, 0);
1526 :
1527 : /* prevent slots from being concurrently dropped */
1528 12 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1529 :
1530 : /*
1531 : * Iterate through all possible replication_states, display if they are
1532 : * filled. Note that we do not take any locks, so slightly corrupted/out
1533 : * of date values are a possibility.
1534 : */
1535 132 : for (i = 0; i < max_active_replication_origins; i++)
1536 : {
1537 : ReplicationState *state;
1538 : Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];
1539 : bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1540 : char *roname;
1541 :
1542 120 : state = &replication_states[i];
1543 :
1544 : /* unused slot, nothing to display */
1545 120 : if (state->roident == InvalidRepOriginId)
1546 110 : continue;
1547 :
1548 10 : memset(values, 0, sizeof(values));
1549 10 : memset(nulls, 1, sizeof(nulls));
1550 :
1551 10 : values[0] = ObjectIdGetDatum(state->roident);
1552 10 : nulls[0] = false;
1553 :
1554 : /*
1555 : * We're not preventing the origin to be dropped concurrently, so
1556 : * silently accept that it might be gone.
1557 : */
1558 10 : if (replorigin_by_oid(state->roident, true,
1559 : &roname))
1560 : {
1561 10 : values[1] = CStringGetTextDatum(roname);
1562 10 : nulls[1] = false;
1563 : }
1564 :
1565 10 : LWLockAcquire(&state->lock, LW_SHARED);
1566 :
1567 10 : values[2] = LSNGetDatum(state->remote_lsn);
1568 10 : nulls[2] = false;
1569 :
1570 10 : values[3] = LSNGetDatum(state->local_lsn);
1571 10 : nulls[3] = false;
1572 :
1573 10 : LWLockRelease(&state->lock);
1574 :
1575 10 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1576 : values, nulls);
1577 : }
1578 :
1579 12 : LWLockRelease(ReplicationOriginLock);
1580 :
1581 : #undef REPLICATION_ORIGIN_PROGRESS_COLS
1582 :
1583 12 : return (Datum) 0;
1584 : }
|