Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * origin.c
4 : * Logical replication progress tracking support.
5 : *
6 : * Copyright (c) 2013-2025, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/logical/origin.c
10 : *
11 : * NOTES
12 : *
13 : * This file provides the following:
14 : * * An infrastructure to name nodes in a replication setup
15 : * * A facility to efficiently store and persist replication progress in an
16 : * efficient and durable manner.
17 : *
 * Replication origins consist of a descriptive, user-defined, external
 * name and a short, thus space-efficient, internal 2-byte one. This split
 * exists because replication origins have to be stored in WAL and shared
 * memory, where long descriptors would be inefficient. For now we only use
 * 2 bytes for the internal id of a replication origin, as it seems unlikely
 * that there will soon be more than 65k nodes in one replication setup, and
 * using only two bytes allows us to be more space efficient.
25 : *
26 : * Replication progress is tracked in a shared memory table
27 : * (ReplicationState) that's dumped to disk every checkpoint. Entries
 * ('slots') in this table are identified by the internal id. That's the case
 * because it allows replication progress to be advanced during crash
 * recovery. To allow doing so, we store the original LSN (from the originating
 * system) of a transaction in the commit record. That allows the precise
 * replayed state to be recovered after a crash, without requiring synchronous
 * commits. Allowing logical replication to use asynchronous commit is
 * generally good for performance, but especially important as it allows a
 * single-threaded replay process to keep up with a source that has multiple
 * backends generating changes concurrently. For efficiency and simplicity,
 * a backend can set up one replication origin that is then used as the
 * source of changes produced by that backend until it is reset.
39 : *
40 : * This infrastructure is intended to be used in cooperation with logical
41 : * decoding. When replaying from a remote system the configured origin is
42 : * provided to output plugins, allowing prevention of replication loops and
43 : * other filtering.
44 : *
45 : * There are several levels of locking at work:
46 : *
 * * Creating a replication origin holds an ExclusiveLock on
 * pg_replication_origin for the duration. That allows us to safely and
 * conflict-free assign new origin ids using a dirty snapshot. Dropping an
 * origin takes a RowExclusiveLock on that catalog plus an AccessExclusiveLock
 * on the origin itself, which prevents concurrent drops.
 *
 * * When creating an in-memory replication progress slot the ReplicationOrigin
 * LWLock has to be held exclusively; when iterating over the replication
 * progress a shared lock has to be held. (replorigin_advance takes it
 * exclusively, since it may have to create a new slot.)
56 : *
 * * When manipulating or looking at the remote_lsn and local_lsn fields of a
 * replication progress slot, that slot's lwlock has to be held. That's
 * primarily because we do not assume that 8-byte writes (of the LSN) are
 * atomic on all our platforms, but it also simplifies memory ordering
 * concerns between the remote and local lsn. We use an lwlock instead of a
 * spinlock so it's less harmful to hold the lock over a WAL write
 * (cf. replorigin_advance).
64 : *
65 : * ---------------------------------------------------------------------------
66 : */
67 :
68 : #include "postgres.h"
69 :
70 : #include <unistd.h>
71 : #include <sys/stat.h>
72 :
73 : #include "access/genam.h"
74 : #include "access/htup_details.h"
75 : #include "access/table.h"
76 : #include "access/xact.h"
77 : #include "access/xloginsert.h"
78 : #include "catalog/catalog.h"
79 : #include "catalog/indexing.h"
80 : #include "catalog/pg_subscription.h"
81 : #include "funcapi.h"
82 : #include "miscadmin.h"
83 : #include "nodes/execnodes.h"
84 : #include "pgstat.h"
85 : #include "replication/origin.h"
86 : #include "replication/slot.h"
87 : #include "storage/condition_variable.h"
88 : #include "storage/fd.h"
89 : #include "storage/ipc.h"
90 : #include "storage/lmgr.h"
91 : #include "utils/builtins.h"
92 : #include "utils/fmgroids.h"
93 : #include "utils/guc.h"
94 : #include "utils/pg_lsn.h"
95 : #include "utils/rel.h"
96 : #include "utils/snapmgr.h"
97 : #include "utils/syscache.h"
98 :
/*
 * Paths for replication origin checkpoint files.  CheckPointReplicationOrigin
 * writes the temporary file first and then renames it over the final name
 * with durable_rename(); StartupReplicationOrigin reads the final name.
 */
#define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"

/*
 * GUC variables
 *
 * max_active_replication_origins is the length of the shared
 * replication_states array.  When it is 0, no shared memory is reserved and
 * checkpoint/startup processing of origin progress is skipped.
 */
int			max_active_replication_origins = 10;
105 :
/*
 * Replay progress of a single remote node.
 *
 * One element of the shared-memory replication_states array, whose length is
 * max_active_replication_origins.  An element whose roident is
 * InvalidRepOriginId is unused and may be claimed for a new origin.
 */
typedef struct ReplicationState
{
	/*
	 * Local identifier for the remote node; InvalidRepOriginId marks a free
	 * slot.
	 */
	RepOriginId roident;

	/*
	 * Location of the latest commit from the remote side.
	 */
	XLogRecPtr	remote_lsn;

	/*
	 * Remember the local lsn of the commit record so we can XLogFlush() to it
	 * during a checkpoint so we know the commit record actually is safe on
	 * disk.
	 */
	XLogRecPtr	local_lsn;

	/*
	 * PID of backend that's acquired slot, or 0 if none.
	 */
	int			acquired_by;

	/*
	 * Condition variable that's signaled when acquired_by changes.  A backend
	 * that wants to drop a busy slot sleeps on it (see
	 * replorigin_state_clear).
	 */
	ConditionVariable origin_cv;

	/*
	 * Lock protecting remote_lsn and local_lsn.
	 */
	LWLock		lock;
} ReplicationState;
143 :
/*
 * On disk version of ReplicationState.
 *
 * This struct is written verbatim (including any padding, which the writer
 * zeroes first) by CheckPointReplicationOrigin and read back by
 * StartupReplicationOrigin, so its layout is part of the on-disk format.
 * local_lsn is not stored: the checkpointer XLogFlush()es up to it before
 * writing the entry.
 */
typedef struct ReplicationStateOnDisk
{
	RepOriginId roident;
	XLogRecPtr	remote_lsn;
} ReplicationStateOnDisk;
152 :
153 :
/*
 * Shared-memory control block for replication progress tracking; allocated
 * and initialized by ReplicationOriginShmemInit().
 */
typedef struct ReplicationStateCtl
{
	/* Tranche to use for per-origin LWLocks */
	int			tranche_id;
	/* Array of length max_active_replication_origins */
	ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
} ReplicationStateCtl;
161 :
/*
 * external variables
 *
 * replorigin_session_origin is the origin this backend's changes are
 * attributed to, or InvalidRepOriginId when none has been set up.  The LSN
 * and timestamp presumably describe the remote transaction currently being
 * replayed -- NOTE(review): confirm against the commit-record code that
 * consumes them.
 */
RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
XLogRecPtr	replorigin_session_origin_lsn = InvalidXLogRecPtr;
TimestampTz replorigin_session_origin_timestamp = 0;

/*
 * Base address into a shared memory array of replication states of size
 * max_active_replication_origins.
 */
static ReplicationState *replication_states;

/*
 * Actual shared memory block (replication_states[] is now part of this).
 */
static ReplicationStateCtl *replication_states_ctl;

/*
 * We keep a pointer to this backend's ReplicationState to avoid having to
 * search the replication_states array in replorigin_session_advance for each
 * remote commit. (Ownership of a backend's own entry can only be changed by
 * that backend.)
 */
static ReplicationState *session_replication_state = NULL;

/*
 * Magic for on disk files.  Written first in the checkpoint file and checked
 * again when the file is read back at startup.
 */
#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
188 :
189 : static void
190 116 : replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
191 : {
192 116 : if (check_origins && max_active_replication_origins == 0)
193 0 : ereport(ERROR,
194 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
195 : errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
196 :
197 116 : if (!recoveryOK && RecoveryInProgress())
198 0 : ereport(ERROR,
199 : (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
200 : errmsg("cannot manipulate replication origins during recovery")));
201 116 : }
202 :
203 :
204 : /*
205 : * IsReservedOriginName
206 : * True iff name is either "none" or "any".
207 : */
208 : static bool
209 24 : IsReservedOriginName(const char *name)
210 : {
211 46 : return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
212 22 : (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
213 : }
214 :
215 : /* ---------------------------------------------------------------------------
216 : * Functions for working with replication origins themselves.
217 : * ---------------------------------------------------------------------------
218 : */
219 :
220 : /*
221 : * Check for a persistent replication origin identified by name.
222 : *
223 : * Returns InvalidOid if the node isn't known yet and missing_ok is true.
224 : */
225 : RepOriginId
226 1988 : replorigin_by_name(const char *roname, bool missing_ok)
227 : {
228 : Form_pg_replication_origin ident;
229 1988 : Oid roident = InvalidOid;
230 : HeapTuple tuple;
231 : Datum roname_d;
232 :
233 1988 : roname_d = CStringGetTextDatum(roname);
234 :
235 1988 : tuple = SearchSysCache1(REPLORIGNAME, roname_d);
236 1988 : if (HeapTupleIsValid(tuple))
237 : {
238 1212 : ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
239 1212 : roident = ident->roident;
240 1212 : ReleaseSysCache(tuple);
241 : }
242 776 : else if (!missing_ok)
243 8 : ereport(ERROR,
244 : (errcode(ERRCODE_UNDEFINED_OBJECT),
245 : errmsg("replication origin \"%s\" does not exist",
246 : roname)));
247 :
248 1980 : return roident;
249 : }
250 :
251 : /*
252 : * Create a replication origin.
253 : *
254 : * Needs to be called in a transaction.
255 : */
256 : RepOriginId
257 754 : replorigin_create(const char *roname)
258 : {
259 : Oid roident;
260 754 : HeapTuple tuple = NULL;
261 : Relation rel;
262 : Datum roname_d;
263 : SnapshotData SnapshotDirty;
264 : SysScanDesc scan;
265 : ScanKeyData key;
266 :
267 : /*
268 : * To avoid needing a TOAST table for pg_replication_origin, we limit
269 : * replication origin names to 512 bytes. This should be more than enough
270 : * for all practical use.
271 : */
272 754 : if (strlen(roname) > MAX_RONAME_LEN)
273 6 : ereport(ERROR,
274 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
275 : errmsg("replication origin name is too long"),
276 : errdetail("Replication origin names must be no longer than %d bytes.",
277 : MAX_RONAME_LEN)));
278 :
279 748 : roname_d = CStringGetTextDatum(roname);
280 :
281 : Assert(IsTransactionState());
282 :
283 : /*
284 : * We need the numeric replication origin to be 16bit wide, so we cannot
285 : * rely on the normal oid allocation. Instead we simply scan
286 : * pg_replication_origin for the first unused id. That's not particularly
287 : * efficient, but this should be a fairly infrequent operation - we can
288 : * easily spend a bit more code on this when it turns out it needs to be
289 : * faster.
290 : *
291 : * We handle concurrency by taking an exclusive lock (allowing reads!)
292 : * over the table for the duration of the search. Because we use a "dirty
293 : * snapshot" we can read rows that other in-progress sessions have
294 : * written, even though they would be invisible with normal snapshots. Due
295 : * to the exclusive lock there's no danger that new rows can appear while
296 : * we're checking.
297 : */
298 748 : InitDirtySnapshot(SnapshotDirty);
299 :
300 748 : rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
301 :
302 : /*
303 : * We want to be able to access pg_replication_origin without setting up a
304 : * snapshot. To make that safe, it needs to not have a TOAST table, since
305 : * TOASTed data cannot be fetched without a snapshot. As of this writing,
306 : * its only varlena column is roname, which we limit to 512 bytes to avoid
307 : * needing out-of-line storage. If you add a TOAST table to this catalog,
308 : * be sure to set up a snapshot everywhere it might be needed. For more
309 : * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
310 : */
311 : Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
312 :
313 1346 : for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
314 : {
315 : bool nulls[Natts_pg_replication_origin];
316 : Datum values[Natts_pg_replication_origin];
317 : bool collides;
318 :
319 1346 : CHECK_FOR_INTERRUPTS();
320 :
321 1346 : ScanKeyInit(&key,
322 : Anum_pg_replication_origin_roident,
323 : BTEqualStrategyNumber, F_OIDEQ,
324 : ObjectIdGetDatum(roident));
325 :
326 1346 : scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
327 : true /* indexOK */ ,
328 : &SnapshotDirty,
329 : 1, &key);
330 :
331 1346 : collides = HeapTupleIsValid(systable_getnext(scan));
332 :
333 1346 : systable_endscan(scan);
334 :
335 1346 : if (!collides)
336 : {
337 : /*
338 : * Ok, found an unused roident, insert the new row and do a CCI,
339 : * so our callers can look it up if they want to.
340 : */
341 748 : memset(&nulls, 0, sizeof(nulls));
342 :
343 748 : values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
344 748 : values[Anum_pg_replication_origin_roname - 1] = roname_d;
345 :
346 748 : tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
347 748 : CatalogTupleInsert(rel, tuple);
348 746 : CommandCounterIncrement();
349 746 : break;
350 : }
351 : }
352 :
353 : /* now release lock again, */
354 746 : table_close(rel, ExclusiveLock);
355 :
356 746 : if (tuple == NULL)
357 0 : ereport(ERROR,
358 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
359 : errmsg("could not find free replication origin ID")));
360 :
361 746 : heap_freetuple(tuple);
362 746 : return roident;
363 : }
364 :
/*
 * Helper function to drop a replication origin.
 *
 * Clears the in-memory progress slot tracking roident, if there is one: the
 * drop is WAL-logged (XLOG_REPLORIGIN_DROP) and the slot's roident,
 * remote_lsn and local_lsn are reset.  If no slot matches, nothing is done.
 *
 * If the matching slot is acquired by a backend (acquired_by != 0), we raise
 * an ERROR when nowait is true; otherwise we sleep on the slot's condition
 * variable and rescan from the beginning.
 *
 * The catalog row is not touched here; replorigin_drop_by_name deletes it.
 */
static void
replorigin_state_clear(RepOriginId roident, bool nowait)
{
	int			i;

	/*
	 * Clean up the slot state info, if there is any matching slot.
	 */
restart:
	/* Exclusive, because we may reset a slot's roident below. */
	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);

	for (i = 0; i < max_active_replication_origins; i++)
	{
		ReplicationState *state = &replication_states[i];

		if (state->roident == roident)
		{
			/* found our slot, is it busy? */
			if (state->acquired_by != 0)
			{
				ConditionVariable *cv;

				if (nowait)
					ereport(ERROR,
							(errcode(ERRCODE_OBJECT_IN_USE),
							 errmsg("could not drop replication origin with ID %d, in use by PID %d",
									state->roident,
									state->acquired_by)));

				/*
				 * We must wait and then retry. Since we don't know which CV
				 * to wait on until here, we can't readily use
				 * ConditionVariablePrepareToSleep (calling it here would be
				 * wrong, since we could miss the signal if we did so); just
				 * use ConditionVariableSleep directly.
				 */
				cv = &state->origin_cv;

				/*
				 * Drop the array lock before sleeping; after waking we rescan
				 * from scratch because the array may have changed meanwhile.
				 */
				LWLockRelease(ReplicationOriginLock);

				ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
				goto restart;
			}

			/* first make a WAL log entry */
			{
				xl_replorigin_drop xlrec;

				xlrec.node_id = roident;
				XLogBeginInsert();
				XLogRegisterData(&xlrec, sizeof(xlrec));
				XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
			}

			/* then clear the in-memory slot */
			state->roident = InvalidRepOriginId;
			state->remote_lsn = InvalidXLogRecPtr;
			state->local_lsn = InvalidXLogRecPtr;
			break;
		}
	}
	LWLockRelease(ReplicationOriginLock);

	/*
	 * End any condition-variable sleep begun above; presumably a no-op if we
	 * never slept -- NOTE(review): confirm in condition_variable.c.
	 */
	ConditionVariableCancelSleep();
}
432 :
/*
 * Drop replication origin (by name).
 *
 * Needs to be called in a transaction.
 *
 * name:       external name of the origin to drop.
 * missing_ok: when true, return quietly if no such origin exists; otherwise
 *             an ERROR is raised.
 * nowait:     forwarded to replorigin_state_clear(); when true, ERROR out
 *             instead of waiting if the origin's progress slot is in use.
 *
 * Clears the origin's in-memory progress slot, if any, via
 * replorigin_state_clear(), then deletes its pg_replication_origin row.  The
 * lock on pg_replication_origin is kept until commit.
 */
void
replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
{
	RepOriginId roident;
	Relation	rel;
	HeapTuple	tuple;

	Assert(IsTransactionState());

	rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);

	/*
	 * ERRORs if the name is unknown and !missing_ok.  With missing_ok, an
	 * unknown name yields InvalidOid, which (presumably) the syscache lookup
	 * below will not find, so we take the early-return path.
	 */
	roident = replorigin_by_name(name, missing_ok);

	/* Lock the origin to prevent concurrent drops. */
	LockSharedObject(ReplicationOriginRelationId, roident, 0,
					 AccessExclusiveLock);

	/*
	 * Re-check now that we hold the lock: a concurrent drop may have removed
	 * the origin after the name lookup above.
	 */
	tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
	if (!HeapTupleIsValid(tuple))
	{
		if (!missing_ok)
			elog(ERROR, "cache lookup failed for replication origin with ID %d",
				 roident);

		/*
		 * We don't need to retain the locks if the origin is already dropped.
		 */
		UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
						   AccessExclusiveLock);
		table_close(rel, RowExclusiveLock);
		return;
	}

	/* WAL-log the drop and reset the in-memory progress slot, if any. */
	replorigin_state_clear(roident, nowait);

	/*
	 * Now, we can delete the catalog entry.
	 */
	CatalogTupleDelete(rel, &tuple->t_self);
	ReleaseSysCache(tuple);

	CommandCounterIncrement();

	/* We keep the lock on pg_replication_origin until commit */
	table_close(rel, NoLock);
}
484 :
485 : /*
486 : * Lookup replication origin via its oid and return the name.
487 : *
488 : * The external name is palloc'd in the calling context.
489 : *
490 : * Returns true if the origin is known, false otherwise.
491 : */
492 : bool
493 60 : replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
494 : {
495 : HeapTuple tuple;
496 : Form_pg_replication_origin ric;
497 :
498 : Assert(OidIsValid((Oid) roident));
499 : Assert(roident != InvalidRepOriginId);
500 : Assert(roident != DoNotReplicateId);
501 :
502 60 : tuple = SearchSysCache1(REPLORIGIDENT,
503 : ObjectIdGetDatum((Oid) roident));
504 :
505 60 : if (HeapTupleIsValid(tuple))
506 : {
507 50 : ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
508 50 : *roname = text_to_cstring(&ric->roname);
509 50 : ReleaseSysCache(tuple);
510 :
511 50 : return true;
512 : }
513 : else
514 : {
515 10 : *roname = NULL;
516 :
517 10 : if (!missing_ok)
518 0 : ereport(ERROR,
519 : (errcode(ERRCODE_UNDEFINED_OBJECT),
520 : errmsg("replication origin with ID %d does not exist",
521 : roident)));
522 :
523 10 : return false;
524 : }
525 : }
526 :
527 :
528 : /* ---------------------------------------------------------------------------
529 : * Functions for handling replication progress.
530 : * ---------------------------------------------------------------------------
531 : */
532 :
533 : Size
534 8520 : ReplicationOriginShmemSize(void)
535 : {
536 8520 : Size size = 0;
537 :
538 8520 : if (max_active_replication_origins == 0)
539 4 : return size;
540 :
541 8516 : size = add_size(size, offsetof(ReplicationStateCtl, states));
542 :
543 8516 : size = add_size(size,
544 : mul_size(max_active_replication_origins, sizeof(ReplicationState)));
545 8516 : return size;
546 : }
547 :
548 : void
549 2206 : ReplicationOriginShmemInit(void)
550 : {
551 : bool found;
552 :
553 2206 : if (max_active_replication_origins == 0)
554 2 : return;
555 :
556 2204 : replication_states_ctl = (ReplicationStateCtl *)
557 2204 : ShmemInitStruct("ReplicationOriginState",
558 : ReplicationOriginShmemSize(),
559 : &found);
560 2204 : replication_states = replication_states_ctl->states;
561 :
562 2204 : if (!found)
563 : {
564 : int i;
565 :
566 158562 : MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
567 :
568 2204 : replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
569 :
570 24226 : for (i = 0; i < max_active_replication_origins; i++)
571 : {
572 22022 : LWLockInitialize(&replication_states[i].lock,
573 22022 : replication_states_ctl->tranche_id);
574 22022 : ConditionVariableInit(&replication_states[i].origin_cv);
575 : }
576 : }
577 : }
578 :
579 : /* ---------------------------------------------------------------------------
580 : * Perform a checkpoint of each replication origin's progress with respect to
581 : * the replayed remote_lsn. Make sure that all transactions we refer to in the
582 : * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
583 : * if the transactions were originally committed asynchronously.
584 : *
585 : * We store checkpoints in the following format:
586 : * +-------+------------------------+------------------+-----+--------+
587 : * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
588 : * +-------+------------------------+------------------+-----+--------+
589 : *
590 : * So its just the magic, followed by the statically sized
591 : * ReplicationStateOnDisk structs. Note that the maximum number of
592 : * ReplicationState is determined by max_active_replication_origins.
593 : * ---------------------------------------------------------------------------
594 : */
595 : void
596 3464 : CheckPointReplicationOrigin(void)
597 : {
598 3464 : const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
599 3464 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
600 : int tmpfd;
601 : int i;
602 3464 : uint32 magic = REPLICATION_STATE_MAGIC;
603 : pg_crc32c crc;
604 :
605 3464 : if (max_active_replication_origins == 0)
606 2 : return;
607 :
608 3462 : INIT_CRC32C(crc);
609 :
610 : /* make sure no old temp file is remaining */
611 3462 : if (unlink(tmppath) < 0 && errno != ENOENT)
612 0 : ereport(PANIC,
613 : (errcode_for_file_access(),
614 : errmsg("could not remove file \"%s\": %m",
615 : tmppath)));
616 :
617 : /*
618 : * no other backend can perform this at the same time; only one checkpoint
619 : * can happen at a time.
620 : */
621 3462 : tmpfd = OpenTransientFile(tmppath,
622 : O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
623 3462 : if (tmpfd < 0)
624 0 : ereport(PANIC,
625 : (errcode_for_file_access(),
626 : errmsg("could not create file \"%s\": %m",
627 : tmppath)));
628 :
629 : /* write magic */
630 3462 : errno = 0;
631 3462 : if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
632 : {
633 : /* if write didn't set errno, assume problem is no disk space */
634 0 : if (errno == 0)
635 0 : errno = ENOSPC;
636 0 : ereport(PANIC,
637 : (errcode_for_file_access(),
638 : errmsg("could not write to file \"%s\": %m",
639 : tmppath)));
640 : }
641 3462 : COMP_CRC32C(crc, &magic, sizeof(magic));
642 :
643 : /* prevent concurrent creations/drops */
644 3462 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
645 :
646 : /* write actual data */
647 38082 : for (i = 0; i < max_active_replication_origins; i++)
648 : {
649 : ReplicationStateOnDisk disk_state;
650 34620 : ReplicationState *curstate = &replication_states[i];
651 : XLogRecPtr local_lsn;
652 :
653 34620 : if (curstate->roident == InvalidRepOriginId)
654 34510 : continue;
655 :
656 : /* zero, to avoid uninitialized padding bytes */
657 110 : memset(&disk_state, 0, sizeof(disk_state));
658 :
659 110 : LWLockAcquire(&curstate->lock, LW_SHARED);
660 :
661 110 : disk_state.roident = curstate->roident;
662 :
663 110 : disk_state.remote_lsn = curstate->remote_lsn;
664 110 : local_lsn = curstate->local_lsn;
665 :
666 110 : LWLockRelease(&curstate->lock);
667 :
668 : /* make sure we only write out a commit that's persistent */
669 110 : XLogFlush(local_lsn);
670 :
671 110 : errno = 0;
672 110 : if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
673 : sizeof(disk_state))
674 : {
675 : /* if write didn't set errno, assume problem is no disk space */
676 0 : if (errno == 0)
677 0 : errno = ENOSPC;
678 0 : ereport(PANIC,
679 : (errcode_for_file_access(),
680 : errmsg("could not write to file \"%s\": %m",
681 : tmppath)));
682 : }
683 :
684 110 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
685 : }
686 :
687 3462 : LWLockRelease(ReplicationOriginLock);
688 :
689 : /* write out the CRC */
690 3462 : FIN_CRC32C(crc);
691 3462 : errno = 0;
692 3462 : if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
693 : {
694 : /* if write didn't set errno, assume problem is no disk space */
695 0 : if (errno == 0)
696 0 : errno = ENOSPC;
697 0 : ereport(PANIC,
698 : (errcode_for_file_access(),
699 : errmsg("could not write to file \"%s\": %m",
700 : tmppath)));
701 : }
702 :
703 3462 : if (CloseTransientFile(tmpfd) != 0)
704 0 : ereport(PANIC,
705 : (errcode_for_file_access(),
706 : errmsg("could not close file \"%s\": %m",
707 : tmppath)));
708 :
709 : /* fsync, rename to permanent file, fsync file and directory */
710 3462 : durable_rename(tmppath, path, PANIC);
711 : }
712 :
713 : /*
714 : * Recover replication replay status from checkpoint data saved earlier by
715 : * CheckPointReplicationOrigin.
716 : *
717 : * This only needs to be called at startup and *not* during every checkpoint
718 : * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
719 : * state thereafter can be recovered by looking at commit records.
720 : */
721 : void
722 1922 : StartupReplicationOrigin(void)
723 : {
724 1922 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
725 : int fd;
726 : int readBytes;
727 1922 : uint32 magic = REPLICATION_STATE_MAGIC;
728 1922 : int last_state = 0;
729 : pg_crc32c file_crc;
730 : pg_crc32c crc;
731 :
732 : /* don't want to overwrite already existing state */
733 : #ifdef USE_ASSERT_CHECKING
734 : static bool already_started = false;
735 :
736 : Assert(!already_started);
737 : already_started = true;
738 : #endif
739 :
740 1922 : if (max_active_replication_origins == 0)
741 102 : return;
742 :
743 1920 : INIT_CRC32C(crc);
744 :
745 1920 : elog(DEBUG2, "starting up replication origin progress state");
746 :
747 1920 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
748 :
749 : /*
750 : * might have had max_active_replication_origins == 0 last run, or we just
751 : * brought up a standby.
752 : */
753 1920 : if (fd < 0 && errno == ENOENT)
754 100 : return;
755 1820 : else if (fd < 0)
756 0 : ereport(PANIC,
757 : (errcode_for_file_access(),
758 : errmsg("could not open file \"%s\": %m",
759 : path)));
760 :
761 : /* verify magic, that is written even if nothing was active */
762 1820 : readBytes = read(fd, &magic, sizeof(magic));
763 1820 : if (readBytes != sizeof(magic))
764 : {
765 0 : if (readBytes < 0)
766 0 : ereport(PANIC,
767 : (errcode_for_file_access(),
768 : errmsg("could not read file \"%s\": %m",
769 : path)));
770 : else
771 0 : ereport(PANIC,
772 : (errcode(ERRCODE_DATA_CORRUPTED),
773 : errmsg("could not read file \"%s\": read %d of %zu",
774 : path, readBytes, sizeof(magic))));
775 : }
776 1820 : COMP_CRC32C(crc, &magic, sizeof(magic));
777 :
778 1820 : if (magic != REPLICATION_STATE_MAGIC)
779 0 : ereport(PANIC,
780 : (errmsg("replication checkpoint has wrong magic %u instead of %u",
781 : magic, REPLICATION_STATE_MAGIC)));
782 :
783 : /* we can skip locking here, no other access is possible */
784 :
785 : /* recover individual states, until there are no more to be found */
786 : while (true)
787 58 : {
788 : ReplicationStateOnDisk disk_state;
789 :
790 1878 : readBytes = read(fd, &disk_state, sizeof(disk_state));
791 :
792 1878 : if (readBytes < 0)
793 : {
794 0 : ereport(PANIC,
795 : (errcode_for_file_access(),
796 : errmsg("could not read file \"%s\": %m",
797 : path)));
798 : }
799 :
800 : /* no further data */
801 1878 : if (readBytes == sizeof(crc))
802 : {
803 1820 : memcpy(&file_crc, &disk_state, sizeof(file_crc));
804 1820 : break;
805 : }
806 :
807 58 : if (readBytes != sizeof(disk_state))
808 : {
809 0 : ereport(PANIC,
810 : (errcode_for_file_access(),
811 : errmsg("could not read file \"%s\": read %d of %zu",
812 : path, readBytes, sizeof(disk_state))));
813 : }
814 :
815 58 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
816 :
817 58 : if (last_state == max_active_replication_origins)
818 0 : ereport(PANIC,
819 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
820 : errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
821 :
822 : /* copy data to shared memory */
823 58 : replication_states[last_state].roident = disk_state.roident;
824 58 : replication_states[last_state].remote_lsn = disk_state.remote_lsn;
825 58 : last_state++;
826 :
827 58 : ereport(LOG,
828 : errmsg("recovered replication state of node %d to %X/%08X",
829 : disk_state.roident,
830 : LSN_FORMAT_ARGS(disk_state.remote_lsn)));
831 : }
832 :
833 : /* now check checksum */
834 1820 : FIN_CRC32C(crc);
835 1820 : if (file_crc != crc)
836 0 : ereport(PANIC,
837 : (errcode(ERRCODE_DATA_CORRUPTED),
838 : errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
839 : crc, file_crc)));
840 :
841 1820 : if (CloseTransientFile(fd) != 0)
842 0 : ereport(PANIC,
843 : (errcode_for_file_access(),
844 : errmsg("could not close file \"%s\": %m",
845 : path)));
846 : }
847 :
848 : void
849 8 : replorigin_redo(XLogReaderState *record)
850 : {
851 8 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
852 :
853 8 : switch (info)
854 : {
855 4 : case XLOG_REPLORIGIN_SET:
856 : {
857 4 : xl_replorigin_set *xlrec =
858 4 : (xl_replorigin_set *) XLogRecGetData(record);
859 :
860 4 : replorigin_advance(xlrec->node_id,
861 : xlrec->remote_lsn, record->EndRecPtr,
862 4 : xlrec->force /* backward */ ,
863 : false /* WAL log */ );
864 4 : break;
865 : }
866 4 : case XLOG_REPLORIGIN_DROP:
867 : {
868 : xl_replorigin_drop *xlrec;
869 : int i;
870 :
871 4 : xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
872 :
873 6 : for (i = 0; i < max_active_replication_origins; i++)
874 : {
875 6 : ReplicationState *state = &replication_states[i];
876 :
877 : /* found our slot */
878 6 : if (state->roident == xlrec->node_id)
879 : {
880 : /* reset entry */
881 4 : state->roident = InvalidRepOriginId;
882 4 : state->remote_lsn = InvalidXLogRecPtr;
883 4 : state->local_lsn = InvalidXLogRecPtr;
884 4 : break;
885 : }
886 : }
887 4 : break;
888 : }
889 0 : default:
890 0 : elog(PANIC, "replorigin_redo: unknown op code %u", info);
891 : }
892 8 : }
893 :
894 :
895 : /*
896 : * Tell the replication origin progress machinery that a commit from 'node'
897 : * that originated at the LSN remote_commit on the remote node was replayed
898 : * successfully and that we don't need to do so again. In combination with
899 : * setting up replorigin_session_origin_lsn and replorigin_session_origin
900 : * that ensures we won't lose knowledge about that after a crash if the
901 : * transaction had a persistent effect (think of asynchronous commits).
902 : *
903 : * local_commit needs to be a local LSN of the commit so that we can make sure
904 : * upon a checkpoint that enough WAL has been persisted to disk.
905 : *
906 : * Needs to be called with a RowExclusiveLock on pg_replication_origin,
907 : * unless running in recovery.
908 : */
909 : void
910 482 : replorigin_advance(RepOriginId node,
911 : XLogRecPtr remote_commit, XLogRecPtr local_commit,
912 : bool go_backward, bool wal_log)
913 : {
914 : int i;
915 482 : ReplicationState *replication_state = NULL;
916 482 : ReplicationState *free_state = NULL;
917 :
918 : Assert(node != InvalidRepOriginId);
919 :
920 : /* we don't track DoNotReplicateId */
921 482 : if (node == DoNotReplicateId)
922 0 : return;
923 :
924 : /*
925 : * XXX: For the case where this is called by WAL replay, it'd be more
926 : * efficient to restore into a backend local hashtable and only dump into
927 : * shmem after recovery is finished. Let's wait with implementing that
928 : * till it's shown to be a measurable expense
929 : */
930 :
931 : /* Lock exclusively, as we may have to create a new table entry. */
932 482 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
933 :
934 : /*
935 : * Search for either an existing slot for the origin, or a free one we can
936 : * use.
937 : */
938 4410 : for (i = 0; i < max_active_replication_origins; i++)
939 : {
940 4020 : ReplicationState *curstate = &replication_states[i];
941 :
942 : /* remember where to insert if necessary */
943 4020 : if (curstate->roident == InvalidRepOriginId &&
944 : free_state == NULL)
945 : {
946 390 : free_state = curstate;
947 390 : continue;
948 : }
949 :
950 : /* not our slot */
951 3630 : if (curstate->roident != node)
952 : {
953 3538 : continue;
954 : }
955 :
956 : /* ok, found slot */
957 92 : replication_state = curstate;
958 :
959 92 : LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
960 :
961 : /* Make sure it's not used by somebody else */
962 92 : if (replication_state->acquired_by != 0)
963 : {
964 0 : ereport(ERROR,
965 : (errcode(ERRCODE_OBJECT_IN_USE),
966 : errmsg("replication origin with ID %d is already active for PID %d",
967 : replication_state->roident,
968 : replication_state->acquired_by)));
969 : }
970 :
971 92 : break;
972 : }
973 :
974 482 : if (replication_state == NULL && free_state == NULL)
975 0 : ereport(ERROR,
976 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
977 : errmsg("could not find free replication state slot for replication origin with ID %d",
978 : node),
979 : errhint("Increase \"max_active_replication_origins\" and try again.")));
980 :
981 482 : if (replication_state == NULL)
982 : {
983 : /* initialize new slot */
984 390 : LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
985 390 : replication_state = free_state;
986 : Assert(!XLogRecPtrIsValid(replication_state->remote_lsn));
987 : Assert(!XLogRecPtrIsValid(replication_state->local_lsn));
988 390 : replication_state->roident = node;
989 : }
990 :
991 : Assert(replication_state->roident != InvalidRepOriginId);
992 :
993 : /*
994 : * If somebody "forcefully" sets this slot, WAL log it, so it's durable
995 : * and the standby gets the message. Primarily this will be called during
996 : * WAL replay (of commit records) where no WAL logging is necessary.
997 : */
998 482 : if (wal_log)
999 : {
1000 : xl_replorigin_set xlrec;
1001 :
1002 400 : xlrec.remote_lsn = remote_commit;
1003 400 : xlrec.node_id = node;
1004 400 : xlrec.force = go_backward;
1005 :
1006 400 : XLogBeginInsert();
1007 400 : XLogRegisterData(&xlrec, sizeof(xlrec));
1008 :
1009 400 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
1010 : }
1011 :
1012 : /*
1013 : * Due to - harmless - race conditions during a checkpoint we could see
1014 : * values here that are older than the ones we already have in memory. We
1015 : * could also see older values for prepared transactions when the prepare
1016 : * is sent at a later point of time along with commit prepared and there
1017 : * are other transactions commits between prepare and commit prepared. See
1018 : * ReorderBufferFinishPrepared. Don't overwrite those.
1019 : */
1020 482 : if (go_backward || replication_state->remote_lsn < remote_commit)
1021 468 : replication_state->remote_lsn = remote_commit;
1022 482 : if (XLogRecPtrIsValid(local_commit) &&
1023 76 : (go_backward || replication_state->local_lsn < local_commit))
1024 80 : replication_state->local_lsn = local_commit;
1025 482 : LWLockRelease(&replication_state->lock);
1026 :
1027 : /*
1028 : * Release *after* changing the LSNs, slot isn't acquired and thus could
1029 : * otherwise be dropped anytime.
1030 : */
1031 482 : LWLockRelease(ReplicationOriginLock);
1032 : }
1033 :
1034 :
1035 : XLogRecPtr
1036 16 : replorigin_get_progress(RepOriginId node, bool flush)
1037 : {
1038 : int i;
1039 16 : XLogRecPtr local_lsn = InvalidXLogRecPtr;
1040 16 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1041 :
1042 : /* prevent slots from being concurrently dropped */
1043 16 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1044 :
1045 76 : for (i = 0; i < max_active_replication_origins; i++)
1046 : {
1047 : ReplicationState *state;
1048 :
1049 70 : state = &replication_states[i];
1050 :
1051 70 : if (state->roident == node)
1052 : {
1053 10 : LWLockAcquire(&state->lock, LW_SHARED);
1054 :
1055 10 : remote_lsn = state->remote_lsn;
1056 10 : local_lsn = state->local_lsn;
1057 :
1058 10 : LWLockRelease(&state->lock);
1059 :
1060 10 : break;
1061 : }
1062 : }
1063 :
1064 16 : LWLockRelease(ReplicationOriginLock);
1065 :
1066 16 : if (flush && XLogRecPtrIsValid(local_lsn))
1067 2 : XLogFlush(local_lsn);
1068 :
1069 16 : return remote_lsn;
1070 : }
1071 :
1072 : /*
1073 : * Tear down a (possibly) configured session replication origin during process
1074 : * exit.
1075 : */
1076 : static void
1077 966 : ReplicationOriginExitCleanup(int code, Datum arg)
1078 : {
1079 966 : ConditionVariable *cv = NULL;
1080 :
1081 966 : if (session_replication_state == NULL)
1082 376 : return;
1083 :
1084 590 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1085 :
1086 590 : if (session_replication_state->acquired_by == MyProcPid)
1087 : {
1088 570 : cv = &session_replication_state->origin_cv;
1089 :
1090 570 : session_replication_state->acquired_by = 0;
1091 570 : session_replication_state = NULL;
1092 : }
1093 :
1094 590 : LWLockRelease(ReplicationOriginLock);
1095 :
1096 590 : if (cv)
1097 570 : ConditionVariableBroadcast(cv);
1098 : }
1099 :
1100 : /*
1101 : * Setup a replication origin in the shared memory struct if it doesn't
1102 : * already exist and cache access to the specific ReplicationSlot so the
1103 : * array doesn't have to be searched when calling
1104 : * replorigin_session_advance().
1105 : *
1106 : * Normally only one such cached origin can exist per process so the cached
1107 : * value can only be set again after the previous value is torn down with
1108 : * replorigin_session_reset(). For this normal case pass acquired_by = 0
1109 : * (meaning the slot is not allowed to be already acquired by another process).
1110 : *
1111 : * However, sometimes multiple processes can safely re-use the same origin slot
1112 : * (for example, multiple parallel apply processes can safely use the same
1113 : * origin, provided they maintain commit order by allowing only one process to
1114 : * commit at a time). For this case the first process must pass acquired_by =
1115 : * 0, and then the other processes sharing that same origin can pass
1116 : * acquired_by = PID of the first process.
1117 : */
1118 : void
1119 974 : replorigin_session_setup(RepOriginId node, int acquired_by)
1120 : {
1121 : static bool registered_cleanup;
1122 : int i;
1123 974 : int free_slot = -1;
1124 :
1125 974 : if (!registered_cleanup)
1126 : {
1127 966 : on_shmem_exit(ReplicationOriginExitCleanup, 0);
1128 966 : registered_cleanup = true;
1129 : }
1130 :
1131 : Assert(max_active_replication_origins > 0);
1132 :
1133 974 : if (session_replication_state != NULL)
1134 2 : ereport(ERROR,
1135 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1136 : errmsg("cannot setup replication origin when one is already setup")));
1137 :
1138 : /* Lock exclusively, as we may have to create a new table entry. */
1139 972 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1140 :
1141 : /*
1142 : * Search for either an existing slot for the origin, or a free one we can
1143 : * use.
1144 : */
1145 3900 : for (i = 0; i < max_active_replication_origins; i++)
1146 : {
1147 3664 : ReplicationState *curstate = &replication_states[i];
1148 :
1149 : /* remember where to insert if necessary */
1150 3664 : if (curstate->roident == InvalidRepOriginId &&
1151 : free_slot == -1)
1152 : {
1153 240 : free_slot = i;
1154 240 : continue;
1155 : }
1156 :
1157 : /* not our slot */
1158 3424 : if (curstate->roident != node)
1159 2688 : continue;
1160 :
1161 736 : else if (curstate->acquired_by != 0 && acquired_by == 0)
1162 : {
1163 0 : ereport(ERROR,
1164 : (errcode(ERRCODE_OBJECT_IN_USE),
1165 : errmsg("replication origin with ID %d is already active for PID %d",
1166 : curstate->roident, curstate->acquired_by)));
1167 : }
1168 :
1169 736 : else if (curstate->acquired_by != acquired_by)
1170 : {
1171 0 : ereport(ERROR,
1172 : (errcode(ERRCODE_OBJECT_IN_USE),
1173 : errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
1174 : node, acquired_by)));
1175 : }
1176 :
1177 : /* ok, found slot */
1178 736 : session_replication_state = curstate;
1179 736 : break;
1180 : }
1181 :
1182 :
1183 972 : if (session_replication_state == NULL && free_slot == -1)
1184 0 : ereport(ERROR,
1185 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1186 : errmsg("could not find free replication state slot for replication origin with ID %d",
1187 : node),
1188 : errhint("Increase \"max_active_replication_origins\" and try again.")));
1189 972 : else if (session_replication_state == NULL)
1190 : {
1191 236 : if (acquired_by)
1192 2 : ereport(ERROR,
1193 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1194 : errmsg("cannot use PID %d for inactive replication origin with ID %d",
1195 : acquired_by, node)));
1196 :
1197 : /* initialize new slot */
1198 234 : session_replication_state = &replication_states[free_slot];
1199 : Assert(!XLogRecPtrIsValid(session_replication_state->remote_lsn));
1200 : Assert(!XLogRecPtrIsValid(session_replication_state->local_lsn));
1201 234 : session_replication_state->roident = node;
1202 : }
1203 :
1204 :
1205 : Assert(session_replication_state->roident != InvalidRepOriginId);
1206 :
1207 970 : if (acquired_by == 0)
1208 948 : session_replication_state->acquired_by = MyProcPid;
1209 : else
1210 : Assert(session_replication_state->acquired_by == acquired_by);
1211 :
1212 970 : LWLockRelease(ReplicationOriginLock);
1213 :
1214 : /* probably this one is pointless */
1215 970 : ConditionVariableBroadcast(&session_replication_state->origin_cv);
1216 970 : }
1217 :
1218 : /*
1219 : * Reset replay state previously setup in this session.
1220 : *
1221 : * This function may only be called if an origin was setup with
1222 : * replorigin_session_setup().
1223 : */
1224 : void
1225 382 : replorigin_session_reset(void)
1226 : {
1227 : ConditionVariable *cv;
1228 :
1229 : Assert(max_active_replication_origins != 0);
1230 :
1231 382 : if (session_replication_state == NULL)
1232 2 : ereport(ERROR,
1233 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1234 : errmsg("no replication origin is configured")));
1235 :
1236 380 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1237 :
1238 380 : session_replication_state->acquired_by = 0;
1239 380 : cv = &session_replication_state->origin_cv;
1240 380 : session_replication_state = NULL;
1241 :
1242 380 : LWLockRelease(ReplicationOriginLock);
1243 :
1244 380 : ConditionVariableBroadcast(cv);
1245 380 : }
1246 :
1247 : /*
1248 : * Do the same work replorigin_advance() does, just on the session's
1249 : * configured origin.
1250 : *
1251 : * This is noticeably cheaper than using replorigin_advance().
1252 : */
1253 : void
1254 2190 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
1255 : {
1256 : Assert(session_replication_state != NULL);
1257 : Assert(session_replication_state->roident != InvalidRepOriginId);
1258 :
1259 2190 : LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1260 2190 : if (session_replication_state->local_lsn < local_commit)
1261 2190 : session_replication_state->local_lsn = local_commit;
1262 2190 : if (session_replication_state->remote_lsn < remote_commit)
1263 1026 : session_replication_state->remote_lsn = remote_commit;
1264 2190 : LWLockRelease(&session_replication_state->lock);
1265 2190 : }
1266 :
1267 : /*
1268 : * Ask the machinery about the point up to which we successfully replayed
1269 : * changes from an already setup replication origin.
1270 : */
1271 : XLogRecPtr
1272 546 : replorigin_session_get_progress(bool flush)
1273 : {
1274 : XLogRecPtr remote_lsn;
1275 : XLogRecPtr local_lsn;
1276 :
1277 : Assert(session_replication_state != NULL);
1278 :
1279 546 : LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1280 546 : remote_lsn = session_replication_state->remote_lsn;
1281 546 : local_lsn = session_replication_state->local_lsn;
1282 546 : LWLockRelease(&session_replication_state->lock);
1283 :
1284 546 : if (flush && XLogRecPtrIsValid(local_lsn))
1285 2 : XLogFlush(local_lsn);
1286 :
1287 546 : return remote_lsn;
1288 : }
1289 :
1290 :
1291 :
1292 : /* ---------------------------------------------------------------------------
1293 : * SQL functions for working with replication origin.
1294 : *
1295 : * These mostly should be fairly short wrappers around more generic functions.
1296 : * ---------------------------------------------------------------------------
1297 : */
1298 :
1299 : /*
1300 : * Create replication origin for the passed in name, and return the assigned
1301 : * oid.
1302 : */
1303 : Datum
1304 26 : pg_replication_origin_create(PG_FUNCTION_ARGS)
1305 : {
1306 : char *name;
1307 : RepOriginId roident;
1308 :
1309 26 : replorigin_check_prerequisites(false, false);
1310 :
1311 26 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1312 :
1313 : /*
1314 : * Replication origins "any and "none" are reserved for system options.
1315 : * The origins "pg_xxx" are reserved for internal use.
1316 : */
1317 26 : if (IsReservedName(name) || IsReservedOriginName(name))
1318 6 : ereport(ERROR,
1319 : (errcode(ERRCODE_RESERVED_NAME),
1320 : errmsg("replication origin name \"%s\" is reserved",
1321 : name),
1322 : errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1323 : LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
1324 :
1325 : /*
1326 : * If built with appropriate switch, whine when regression-testing
1327 : * conventions for replication origin names are violated.
1328 : */
1329 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1330 : if (strncmp(name, "regress_", 8) != 0)
1331 : elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1332 : #endif
1333 :
1334 20 : roident = replorigin_create(name);
1335 :
1336 12 : pfree(name);
1337 :
1338 12 : PG_RETURN_OID(roident);
1339 : }
1340 :
1341 : /*
1342 : * Drop replication origin.
1343 : */
1344 : Datum
1345 16 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
1346 : {
1347 : char *name;
1348 :
1349 16 : replorigin_check_prerequisites(false, false);
1350 :
1351 16 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1352 :
1353 16 : replorigin_drop_by_name(name, false, true);
1354 :
1355 14 : pfree(name);
1356 :
1357 14 : PG_RETURN_VOID();
1358 : }
1359 :
1360 : /*
1361 : * Return oid of a replication origin.
1362 : */
1363 : Datum
1364 0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
1365 : {
1366 : char *name;
1367 : RepOriginId roident;
1368 :
1369 0 : replorigin_check_prerequisites(false, false);
1370 :
1371 0 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1372 0 : roident = replorigin_by_name(name, true);
1373 :
1374 0 : pfree(name);
1375 :
1376 0 : if (OidIsValid(roident))
1377 0 : PG_RETURN_OID(roident);
1378 0 : PG_RETURN_NULL();
1379 : }
1380 :
1381 : /*
1382 : * Setup a replication origin for this session.
1383 : */
1384 : Datum
1385 18 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
1386 : {
1387 : char *name;
1388 : RepOriginId origin;
1389 : int pid;
1390 :
1391 18 : replorigin_check_prerequisites(true, false);
1392 :
1393 18 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1394 18 : origin = replorigin_by_name(name, false);
1395 16 : pid = PG_GETARG_INT32(1);
1396 16 : replorigin_session_setup(origin, pid);
1397 :
1398 12 : replorigin_session_origin = origin;
1399 :
1400 12 : pfree(name);
1401 :
1402 12 : PG_RETURN_VOID();
1403 : }
1404 :
1405 : /*
1406 : * Reset previously setup origin in this session
1407 : */
1408 : Datum
1409 14 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
1410 : {
1411 14 : replorigin_check_prerequisites(true, false);
1412 :
1413 14 : replorigin_session_reset();
1414 :
1415 12 : replorigin_session_origin = InvalidRepOriginId;
1416 12 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
1417 12 : replorigin_session_origin_timestamp = 0;
1418 :
1419 12 : PG_RETURN_VOID();
1420 : }
1421 :
1422 : /*
1423 : * Has a replication origin been setup for this session.
1424 : */
1425 : Datum
1426 4 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
1427 : {
1428 4 : replorigin_check_prerequisites(false, false);
1429 :
1430 4 : PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
1431 : }
1432 :
1433 :
1434 : /*
1435 : * Return the replication progress for origin setup in the current session.
1436 : *
1437 : * If 'flush' is set to true it is ensured that the returned value corresponds
1438 : * to a local transaction that has been flushed. This is useful if asynchronous
1439 : * commits are used when replaying replicated transactions.
1440 : */
1441 : Datum
1442 4 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
1443 : {
1444 4 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1445 4 : bool flush = PG_GETARG_BOOL(0);
1446 :
1447 4 : replorigin_check_prerequisites(true, false);
1448 :
1449 4 : if (session_replication_state == NULL)
1450 0 : ereport(ERROR,
1451 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1452 : errmsg("no replication origin is configured")));
1453 :
1454 4 : remote_lsn = replorigin_session_get_progress(flush);
1455 :
1456 4 : if (!XLogRecPtrIsValid(remote_lsn))
1457 0 : PG_RETURN_NULL();
1458 :
1459 4 : PG_RETURN_LSN(remote_lsn);
1460 : }
1461 :
1462 : Datum
1463 2 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
1464 : {
1465 2 : XLogRecPtr location = PG_GETARG_LSN(0);
1466 :
1467 2 : replorigin_check_prerequisites(true, false);
1468 :
1469 2 : if (session_replication_state == NULL)
1470 0 : ereport(ERROR,
1471 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1472 : errmsg("no replication origin is configured")));
1473 :
1474 2 : replorigin_session_origin_lsn = location;
1475 2 : replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
1476 :
1477 2 : PG_RETURN_VOID();
1478 : }
1479 :
1480 : Datum
1481 0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
1482 : {
1483 0 : replorigin_check_prerequisites(true, false);
1484 :
1485 0 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
1486 0 : replorigin_session_origin_timestamp = 0;
1487 :
1488 0 : PG_RETURN_VOID();
1489 : }
1490 :
1491 :
1492 : Datum
1493 6 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
1494 : {
1495 6 : text *name = PG_GETARG_TEXT_PP(0);
1496 6 : XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1497 : RepOriginId node;
1498 :
1499 6 : replorigin_check_prerequisites(true, false);
1500 :
1501 : /* lock to prevent the replication origin from vanishing */
1502 6 : LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1503 :
1504 6 : node = replorigin_by_name(text_to_cstring(name), false);
1505 :
1506 : /*
1507 : * Can't sensibly pass a local commit to be flushed at checkpoint - this
1508 : * xact hasn't committed yet. This is why this function should be used to
1509 : * set up the initial replication state, but not for replay.
1510 : */
1511 4 : replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1512 : true /* go backward */ , true /* WAL log */ );
1513 :
1514 4 : UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1515 :
1516 4 : PG_RETURN_VOID();
1517 : }
1518 :
1519 :
1520 : /*
1521 : * Return the replication progress for an individual replication origin.
1522 : *
1523 : * If 'flush' is set to true it is ensured that the returned value corresponds
1524 : * to a local transaction that has been flushed. This is useful if asynchronous
1525 : * commits are used when replaying replicated transactions.
1526 : */
1527 : Datum
1528 6 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
1529 : {
1530 : char *name;
1531 : bool flush;
1532 : RepOriginId roident;
1533 6 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1534 :
1535 6 : replorigin_check_prerequisites(true, true);
1536 :
1537 6 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1538 6 : flush = PG_GETARG_BOOL(1);
1539 :
1540 6 : roident = replorigin_by_name(name, false);
1541 : Assert(OidIsValid(roident));
1542 :
1543 4 : remote_lsn = replorigin_get_progress(roident, flush);
1544 :
1545 4 : if (!XLogRecPtrIsValid(remote_lsn))
1546 0 : PG_RETURN_NULL();
1547 :
1548 4 : PG_RETURN_LSN(remote_lsn);
1549 : }
1550 :
1551 :
1552 : Datum
1553 20 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
1554 : {
1555 20 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1556 : int i;
1557 : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
1558 :
1559 : /* we want to return 0 rows if slot is set to zero */
1560 20 : replorigin_check_prerequisites(false, true);
1561 :
1562 20 : InitMaterializedSRF(fcinfo, 0);
1563 :
1564 : /* prevent slots from being concurrently dropped */
1565 20 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1566 :
1567 : /*
1568 : * Iterate through all possible replication_states, display if they are
1569 : * filled. Note that we do not take any locks, so slightly corrupted/out
1570 : * of date values are a possibility.
1571 : */
1572 220 : for (i = 0; i < max_active_replication_origins; i++)
1573 : {
1574 : ReplicationState *state;
1575 : Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];
1576 : bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1577 : char *roname;
1578 :
1579 200 : state = &replication_states[i];
1580 :
1581 : /* unused slot, nothing to display */
1582 200 : if (state->roident == InvalidRepOriginId)
1583 170 : continue;
1584 :
1585 30 : memset(values, 0, sizeof(values));
1586 30 : memset(nulls, 1, sizeof(nulls));
1587 :
1588 30 : values[0] = ObjectIdGetDatum(state->roident);
1589 30 : nulls[0] = false;
1590 :
1591 : /*
1592 : * We're not preventing the origin to be dropped concurrently, so
1593 : * silently accept that it might be gone.
1594 : */
1595 30 : if (replorigin_by_oid(state->roident, true,
1596 : &roname))
1597 : {
1598 26 : values[1] = CStringGetTextDatum(roname);
1599 26 : nulls[1] = false;
1600 : }
1601 :
1602 30 : LWLockAcquire(&state->lock, LW_SHARED);
1603 :
1604 30 : values[2] = LSNGetDatum(state->remote_lsn);
1605 30 : nulls[2] = false;
1606 :
1607 30 : values[3] = LSNGetDatum(state->local_lsn);
1608 30 : nulls[3] = false;
1609 :
1610 30 : LWLockRelease(&state->lock);
1611 :
1612 30 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1613 : values, nulls);
1614 : }
1615 :
1616 20 : LWLockRelease(ReplicationOriginLock);
1617 :
1618 : #undef REPLICATION_ORIGIN_PROGRESS_COLS
1619 :
1620 20 : return (Datum) 0;
1621 : }
|