Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * origin.c
4 : * Logical replication progress tracking support.
5 : *
6 : * Copyright (c) 2013-2026, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/logical/origin.c
10 : *
11 : * NOTES
12 : *
13 : * This file provides the following:
14 : * * An infrastructure to name nodes in a replication setup
15 : * * A facility to store and persist replication progress in an
16 : * efficient and durable manner.
17 : *
18 : * A replication origin consists of a descriptive, user defined, external
19 : * name and a short, thus space efficient, internal 2 byte one. This split
20 : * exists because replication origins have to be stored in WAL and shared
21 : * memory and long descriptors would be inefficient. For now only use 2 bytes
22 : * for the internal id of a replication origin as it seems unlikely that there
23 : * soon will be more than 65k nodes in one replication setup; and using only
24 : * two bytes allows us to be more space efficient.
25 : *
26 : * Replication progress is tracked in a shared memory table
27 : * (ReplicationState) that's dumped to disk every checkpoint. Entries
28 : * ('slots') in this table are identified by the internal id. That's the case
29 : * because it allows to increase replication progress during crash
30 : * recovery. To allow doing so we store the original LSN (from the originating
31 : * system) of a transaction in the commit record. That allows to recover the
32 : * precise replayed state after crash recovery; without requiring synchronous
33 : * commits. Allowing logical replication to use asynchronous commit is
34 : * generally good for performance, but especially important as it allows a
35 : * single threaded replay process to keep up with a source that has multiple
36 : * backends generating changes concurrently. For efficiency and simplicity
37 : * reasons a backend can setup one replication origin that's from then used as
38 : * the source of changes produced by the backend, until reset again.
39 : *
40 : * This infrastructure is intended to be used in cooperation with logical
41 : * decoding. When replaying from a remote system the configured origin is
42 : * provided to output plugins, allowing prevention of replication loops and
43 : * other filtering.
44 : *
45 : * There are several levels of locking at work:
46 : *
47 : * * To create and drop replication origins an exclusive lock on
48 : * pg_replication_origin is required for the duration. That allows us to
49 : * safely and conflict free assign new origins using a dirty snapshot.
50 : *
51 : * * When creating an in-memory replication progress slot the ReplicationOrigin
52 : * LWLock has to be held exclusively; when iterating over the replication
53 : * progress a shared lock has to be held, the same when advancing the
54 : * replication progress of an individual backend that has not setup as the
55 : * session's replication origin.
56 : *
57 : * * When manipulating or looking at the remote_lsn and local_lsn fields of a
58 : * replication progress slot that slot's lwlock has to be held. That's
59 : * primarily because we do not assume 8 byte writes (the LSN) are atomic on
60 : * all our platforms, but it also simplifies memory ordering concerns
61 : * between the remote and local lsn. We use a lwlock instead of a spinlock
62 : * so it's less harmful to hold the lock over a WAL write
63 : * (cf. replorigin_advance).
64 : *
65 : * ---------------------------------------------------------------------------
66 : */
67 :
68 : #include "postgres.h"
69 :
70 : #include <unistd.h>
71 : #include <sys/stat.h>
72 :
73 : #include "access/genam.h"
74 : #include "access/htup_details.h"
75 : #include "access/table.h"
76 : #include "access/xact.h"
77 : #include "access/xloginsert.h"
78 : #include "catalog/catalog.h"
79 : #include "catalog/indexing.h"
80 : #include "catalog/pg_subscription.h"
81 : #include "funcapi.h"
82 : #include "miscadmin.h"
83 : #include "nodes/execnodes.h"
84 : #include "pgstat.h"
85 : #include "replication/origin.h"
86 : #include "replication/slot.h"
87 : #include "storage/condition_variable.h"
88 : #include "storage/fd.h"
89 : #include "storage/ipc.h"
90 : #include "storage/lmgr.h"
91 : #include "utils/builtins.h"
92 : #include "utils/fmgroids.h"
93 : #include "utils/guc.h"
94 : #include "utils/pg_lsn.h"
95 : #include "utils/rel.h"
96 : #include "utils/snapmgr.h"
97 : #include "utils/syscache.h"
98 :
/*
 * Paths for the replication origin checkpoint files.
 *
 * CheckPointReplicationOrigin() writes the temporary file first and then
 * renames it over the permanent one; StartupReplicationOrigin() reads the
 * permanent file.
 */
#define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"

/*
 * GUC variables
 *
 * max_active_replication_origins is the length of the shared
 * replication_states[] array.  When it is 0, the shared memory sizing/setup,
 * checkpoint and startup routines in this file return without doing
 * anything.
 */
int			max_active_replication_origins = 10;
105 :
/*
 * Replay progress of a single remote node.
 *
 * One entry of the shared replication_states[] array.  An entry whose
 * roident is InvalidRepOriginId is unused.
 */
typedef struct ReplicationState
{
	/*
	 * Local identifier for the remote node; InvalidRepOriginId marks a free
	 * entry.
	 */
	RepOriginId roident;

	/*
	 * Location of the latest commit from the remote side.
	 */
	XLogRecPtr	remote_lsn;

	/*
	 * Remember the local lsn of the commit record so we can XLogFlush() to it
	 * during a checkpoint so we know the commit record actually is safe on
	 * disk.  This value is not written to the checkpoint file.
	 */
	XLogRecPtr	local_lsn;

	/*
	 * PID of backend that's acquired slot, or 0 if none.
	 */
	int			acquired_by;

	/*
	 * Count of processes that are currently using this origin.  The code in
	 * this file treats the entry as busy whenever this is greater than zero.
	 */
	int			refcount;

	/*
	 * Condition variable that's signaled when acquired_by changes.
	 * replorigin_state_clear() sleeps on it while waiting for a busy entry.
	 */
	ConditionVariable origin_cv;

	/*
	 * Lock protecting remote_lsn and local_lsn.
	 */
	LWLock		lock;
} ReplicationState;
146 :
/*
 * On disk version of ReplicationState.
 *
 * This is the record format written by CheckPointReplicationOrigin() and
 * read back by StartupReplicationOrigin().  Only the origin id and the
 * remote LSN are persisted.
 */
typedef struct ReplicationStateOnDisk
{
	RepOriginId roident;		/* copied from ReplicationState.roident */
	XLogRecPtr	remote_lsn;		/* copied from ReplicationState.remote_lsn */
} ReplicationStateOnDisk;
155 :
156 :
/*
 * Layout of the shared memory block holding all replication progress state.
 *
 * Sized by ReplicationOriginShmemSize() and set up by
 * ReplicationOriginShmemInit().
 */
typedef struct ReplicationStateCtl
{
	/* Tranche to use for per-origin LWLocks */
	int			tranche_id;
	/* Array of length max_active_replication_origins */
	ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
} ReplicationStateCtl;
164 :
/* external variables (defined here, visible to other files) */
RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
XLogRecPtr	replorigin_session_origin_lsn = InvalidXLogRecPtr;
TimestampTz replorigin_session_origin_timestamp = 0;

/*
 * Base address into a shared memory array of replication states of size
 * max_active_replication_origins.  Set by ReplicationOriginShmemInit() to
 * point at replication_states_ctl->states; it is left unset when
 * max_active_replication_origins is 0.
 */
static ReplicationState *replication_states;

/*
 * Actual shared memory block (replication_states[] is part of this).
 */
static ReplicationStateCtl *replication_states_ctl;

/*
 * We keep a pointer to this backend's ReplicationState to avoid having to
 * search the replication_states array in replorigin_session_advance for each
 * remote commit. (Ownership of a backend's own entry can only be changed by
 * that backend.)
 */
static ReplicationState *session_replication_state = NULL;

/*
 * Magic for on disk files.  Written as the first word of the checkpoint file
 * and verified when the file is read back at startup.
 */
#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
191 :
192 : static void
193 136 : replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
194 : {
195 136 : if (check_origins && max_active_replication_origins == 0)
196 0 : ereport(ERROR,
197 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
198 : errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
199 :
200 136 : if (!recoveryOK && RecoveryInProgress())
201 0 : ereport(ERROR,
202 : (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
203 : errmsg("cannot manipulate replication origins during recovery")));
204 136 : }
205 :
206 :
207 : /*
208 : * IsReservedOriginName
209 : * True iff name is either "none" or "any".
210 : */
211 : static bool
212 26 : IsReservedOriginName(const char *name)
213 : {
214 50 : return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
215 24 : (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
216 : }
217 :
218 : /* ---------------------------------------------------------------------------
219 : * Functions for working with replication origins themselves.
220 : * ---------------------------------------------------------------------------
221 : */
222 :
223 : /*
224 : * Check for a persistent replication origin identified by name.
225 : *
226 : * Returns InvalidOid if the node isn't known yet and missing_ok is true.
227 : */
228 : RepOriginId
229 1974 : replorigin_by_name(const char *roname, bool missing_ok)
230 : {
231 : Form_pg_replication_origin ident;
232 1974 : Oid roident = InvalidOid;
233 : HeapTuple tuple;
234 : Datum roname_d;
235 :
236 1974 : roname_d = CStringGetTextDatum(roname);
237 :
238 1974 : tuple = SearchSysCache1(REPLORIGNAME, roname_d);
239 1974 : if (HeapTupleIsValid(tuple))
240 : {
241 1214 : ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
242 1214 : roident = ident->roident;
243 1214 : ReleaseSysCache(tuple);
244 : }
245 760 : else if (!missing_ok)
246 8 : ereport(ERROR,
247 : (errcode(ERRCODE_UNDEFINED_OBJECT),
248 : errmsg("replication origin \"%s\" does not exist",
249 : roname)));
250 :
251 1966 : return roident;
252 : }
253 :
254 : /*
255 : * Create a replication origin.
256 : *
257 : * Needs to be called in a transaction.
258 : */
259 : RepOriginId
260 742 : replorigin_create(const char *roname)
261 : {
262 : Oid roident;
263 742 : HeapTuple tuple = NULL;
264 : Relation rel;
265 : Datum roname_d;
266 : SnapshotData SnapshotDirty;
267 : SysScanDesc scan;
268 : ScanKeyData key;
269 :
270 : /*
271 : * To avoid needing a TOAST table for pg_replication_origin, we limit
272 : * replication origin names to 512 bytes. This should be more than enough
273 : * for all practical use.
274 : */
275 742 : if (strlen(roname) > MAX_RONAME_LEN)
276 6 : ereport(ERROR,
277 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
278 : errmsg("replication origin name is too long"),
279 : errdetail("Replication origin names must be no longer than %d bytes.",
280 : MAX_RONAME_LEN)));
281 :
282 736 : roname_d = CStringGetTextDatum(roname);
283 :
284 : Assert(IsTransactionState());
285 :
286 : /*
287 : * We need the numeric replication origin to be 16bit wide, so we cannot
288 : * rely on the normal oid allocation. Instead we simply scan
289 : * pg_replication_origin for the first unused id. That's not particularly
290 : * efficient, but this should be a fairly infrequent operation - we can
291 : * easily spend a bit more code on this when it turns out it needs to be
292 : * faster.
293 : *
294 : * We handle concurrency by taking an exclusive lock (allowing reads!)
295 : * over the table for the duration of the search. Because we use a "dirty
296 : * snapshot" we can read rows that other in-progress sessions have
297 : * written, even though they would be invisible with normal snapshots. Due
298 : * to the exclusive lock there's no danger that new rows can appear while
299 : * we're checking.
300 : */
301 736 : InitDirtySnapshot(SnapshotDirty);
302 :
303 736 : rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
304 :
305 : /*
306 : * We want to be able to access pg_replication_origin without setting up a
307 : * snapshot. To make that safe, it needs to not have a TOAST table, since
308 : * TOASTed data cannot be fetched without a snapshot. As of this writing,
309 : * its only varlena column is roname, which we limit to 512 bytes to avoid
310 : * needing out-of-line storage. If you add a TOAST table to this catalog,
311 : * be sure to set up a snapshot everywhere it might be needed. For more
312 : * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
313 : */
314 : Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
315 :
316 1332 : for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
317 : {
318 : bool nulls[Natts_pg_replication_origin];
319 : Datum values[Natts_pg_replication_origin];
320 : bool collides;
321 :
322 1332 : CHECK_FOR_INTERRUPTS();
323 :
324 1332 : ScanKeyInit(&key,
325 : Anum_pg_replication_origin_roident,
326 : BTEqualStrategyNumber, F_OIDEQ,
327 : ObjectIdGetDatum(roident));
328 :
329 1332 : scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
330 : true /* indexOK */ ,
331 : &SnapshotDirty,
332 : 1, &key);
333 :
334 1332 : collides = HeapTupleIsValid(systable_getnext(scan));
335 :
336 1332 : systable_endscan(scan);
337 :
338 1332 : if (!collides)
339 : {
340 : /*
341 : * Ok, found an unused roident, insert the new row and do a CCI,
342 : * so our callers can look it up if they want to.
343 : */
344 736 : memset(&nulls, 0, sizeof(nulls));
345 :
346 736 : values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
347 736 : values[Anum_pg_replication_origin_roname - 1] = roname_d;
348 :
349 736 : tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
350 736 : CatalogTupleInsert(rel, tuple);
351 734 : CommandCounterIncrement();
352 734 : break;
353 : }
354 : }
355 :
356 : /* now release lock again, */
357 734 : table_close(rel, ExclusiveLock);
358 :
359 734 : if (tuple == NULL)
360 0 : ereport(ERROR,
361 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
362 : errmsg("could not find free replication origin ID")));
363 :
364 734 : heap_freetuple(tuple);
365 734 : return roident;
366 : }
367 :
368 : /*
369 : * Helper function to drop a replication origin.
370 : */
371 : static void
372 626 : replorigin_state_clear(RepOriginId roident, bool nowait)
373 : {
374 : int i;
375 :
376 : /*
377 : * Clean up the slot state info, if there is any matching slot.
378 : */
379 630 : restart:
380 630 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
381 :
382 2064 : for (i = 0; i < max_active_replication_origins; i++)
383 : {
384 1972 : ReplicationState *state = &replication_states[i];
385 :
386 1972 : if (state->roident == roident)
387 : {
388 : /* found our slot, is it busy? */
389 538 : if (state->refcount > 0)
390 : {
391 : ConditionVariable *cv;
392 :
393 4 : if (nowait)
394 0 : ereport(ERROR,
395 : (errcode(ERRCODE_OBJECT_IN_USE),
396 : (state->acquired_by != 0)
397 : ? errmsg("could not drop replication origin with ID %d, in use by PID %d",
398 : state->roident,
399 : state->acquired_by)
400 : : errmsg("could not drop replication origin with ID %d, in use by another process",
401 : state->roident)));
402 :
403 : /*
404 : * We must wait and then retry. Since we don't know which CV
405 : * to wait on until here, we can't readily use
406 : * ConditionVariablePrepareToSleep (calling it here would be
407 : * wrong, since we could miss the signal if we did so); just
408 : * use ConditionVariableSleep directly.
409 : */
410 4 : cv = &state->origin_cv;
411 :
412 4 : LWLockRelease(ReplicationOriginLock);
413 :
414 4 : ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
415 4 : goto restart;
416 : }
417 :
418 : /* first make a WAL log entry */
419 : {
420 : xl_replorigin_drop xlrec;
421 :
422 534 : xlrec.node_id = roident;
423 534 : XLogBeginInsert();
424 534 : XLogRegisterData(&xlrec, sizeof(xlrec));
425 534 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
426 : }
427 :
428 : /* then clear the in-memory slot */
429 534 : state->roident = InvalidRepOriginId;
430 534 : state->remote_lsn = InvalidXLogRecPtr;
431 534 : state->local_lsn = InvalidXLogRecPtr;
432 534 : break;
433 : }
434 : }
435 626 : LWLockRelease(ReplicationOriginLock);
436 626 : ConditionVariableCancelSleep();
437 626 : }
438 :
439 : /*
440 : * Drop replication origin (by name).
441 : *
442 : * Needs to be called in a transaction.
443 : */
444 : void
445 1000 : replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
446 : {
447 : RepOriginId roident;
448 : Relation rel;
449 : HeapTuple tuple;
450 :
451 : Assert(IsTransactionState());
452 :
453 1000 : rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
454 :
455 1000 : roident = replorigin_by_name(name, missing_ok);
456 :
457 : /* Lock the origin to prevent concurrent drops. */
458 998 : LockSharedObject(ReplicationOriginRelationId, roident, 0,
459 : AccessExclusiveLock);
460 :
461 998 : tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
462 998 : if (!HeapTupleIsValid(tuple))
463 : {
464 372 : if (!missing_ok)
465 0 : elog(ERROR, "cache lookup failed for replication origin with ID %d",
466 : roident);
467 :
468 : /*
469 : * We don't need to retain the locks if the origin is already dropped.
470 : */
471 372 : UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
472 : AccessExclusiveLock);
473 372 : table_close(rel, RowExclusiveLock);
474 372 : return;
475 : }
476 :
477 626 : replorigin_state_clear(roident, nowait);
478 :
479 : /*
480 : * Now, we can delete the catalog entry.
481 : */
482 626 : CatalogTupleDelete(rel, &tuple->t_self);
483 626 : ReleaseSysCache(tuple);
484 :
485 626 : CommandCounterIncrement();
486 :
487 : /* We keep the lock on pg_replication_origin until commit */
488 626 : table_close(rel, NoLock);
489 : }
490 :
491 : /*
492 : * Lookup replication origin via its oid and return the name.
493 : *
494 : * The external name is palloc'd in the calling context.
495 : *
496 : * Returns true if the origin is known, false otherwise.
497 : */
498 : bool
499 58 : replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
500 : {
501 : HeapTuple tuple;
502 : Form_pg_replication_origin ric;
503 :
504 : Assert(OidIsValid((Oid) roident));
505 : Assert(roident != InvalidRepOriginId);
506 : Assert(roident != DoNotReplicateId);
507 :
508 58 : tuple = SearchSysCache1(REPLORIGIDENT,
509 : ObjectIdGetDatum((Oid) roident));
510 :
511 58 : if (HeapTupleIsValid(tuple))
512 : {
513 52 : ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
514 52 : *roname = text_to_cstring(&ric->roname);
515 52 : ReleaseSysCache(tuple);
516 :
517 52 : return true;
518 : }
519 : else
520 : {
521 6 : *roname = NULL;
522 :
523 6 : if (!missing_ok)
524 0 : ereport(ERROR,
525 : (errcode(ERRCODE_UNDEFINED_OBJECT),
526 : errmsg("replication origin with ID %d does not exist",
527 : roident)));
528 :
529 6 : return false;
530 : }
531 : }
532 :
533 :
534 : /* ---------------------------------------------------------------------------
535 : * Functions for handling replication progress.
536 : * ---------------------------------------------------------------------------
537 : */
538 :
539 : Size
540 8802 : ReplicationOriginShmemSize(void)
541 : {
542 8802 : Size size = 0;
543 :
544 8802 : if (max_active_replication_origins == 0)
545 4 : return size;
546 :
547 8798 : size = add_size(size, offsetof(ReplicationStateCtl, states));
548 :
549 8798 : size = add_size(size,
550 : mul_size(max_active_replication_origins, sizeof(ReplicationState)));
551 8798 : return size;
552 : }
553 :
554 : void
555 2278 : ReplicationOriginShmemInit(void)
556 : {
557 : bool found;
558 :
559 2278 : if (max_active_replication_origins == 0)
560 2 : return;
561 :
562 2276 : replication_states_ctl = (ReplicationStateCtl *)
563 2276 : ShmemInitStruct("ReplicationOriginState",
564 : ReplicationOriginShmemSize(),
565 : &found);
566 2276 : replication_states = replication_states_ctl->states;
567 :
568 2276 : if (!found)
569 : {
570 : int i;
571 :
572 186488 : MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
573 :
574 2276 : replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
575 :
576 25018 : for (i = 0; i < max_active_replication_origins; i++)
577 : {
578 22742 : LWLockInitialize(&replication_states[i].lock,
579 22742 : replication_states_ctl->tranche_id);
580 22742 : ConditionVariableInit(&replication_states[i].origin_cv);
581 : }
582 : }
583 : }
584 :
585 : /* ---------------------------------------------------------------------------
586 : * Perform a checkpoint of each replication origin's progress with respect to
587 : * the replayed remote_lsn. Make sure that all transactions we refer to in the
588 : * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
589 : * if the transactions were originally committed asynchronously.
590 : *
591 : * We store checkpoints in the following format:
592 : * +-------+------------------------+------------------+-----+--------+
593 : * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
594 : * +-------+------------------------+------------------+-----+--------+
595 : *
596 : * So its just the magic, followed by the statically sized
597 : * ReplicationStateOnDisk structs. Note that the maximum number of
598 : * ReplicationState is determined by max_active_replication_origins.
599 : * ---------------------------------------------------------------------------
600 : */
601 : void
602 3558 : CheckPointReplicationOrigin(void)
603 : {
604 3558 : const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
605 3558 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
606 : int tmpfd;
607 : int i;
608 3558 : uint32 magic = REPLICATION_STATE_MAGIC;
609 : pg_crc32c crc;
610 :
611 3558 : if (max_active_replication_origins == 0)
612 2 : return;
613 :
614 3556 : INIT_CRC32C(crc);
615 :
616 : /* make sure no old temp file is remaining */
617 3556 : if (unlink(tmppath) < 0 && errno != ENOENT)
618 0 : ereport(PANIC,
619 : (errcode_for_file_access(),
620 : errmsg("could not remove file \"%s\": %m",
621 : tmppath)));
622 :
623 : /*
624 : * no other backend can perform this at the same time; only one checkpoint
625 : * can happen at a time.
626 : */
627 3556 : tmpfd = OpenTransientFile(tmppath,
628 : O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
629 3556 : if (tmpfd < 0)
630 0 : ereport(PANIC,
631 : (errcode_for_file_access(),
632 : errmsg("could not create file \"%s\": %m",
633 : tmppath)));
634 :
635 : /* write magic */
636 3556 : errno = 0;
637 3556 : if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
638 : {
639 : /* if write didn't set errno, assume problem is no disk space */
640 0 : if (errno == 0)
641 0 : errno = ENOSPC;
642 0 : ereport(PANIC,
643 : (errcode_for_file_access(),
644 : errmsg("could not write to file \"%s\": %m",
645 : tmppath)));
646 : }
647 3556 : COMP_CRC32C(crc, &magic, sizeof(magic));
648 :
649 : /* prevent concurrent creations/drops */
650 3556 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
651 :
652 : /* write actual data */
653 39116 : for (i = 0; i < max_active_replication_origins; i++)
654 : {
655 : ReplicationStateOnDisk disk_state;
656 35560 : ReplicationState *curstate = &replication_states[i];
657 : XLogRecPtr local_lsn;
658 :
659 35560 : if (curstate->roident == InvalidRepOriginId)
660 35454 : continue;
661 :
662 : /* zero, to avoid uninitialized padding bytes */
663 106 : memset(&disk_state, 0, sizeof(disk_state));
664 :
665 106 : LWLockAcquire(&curstate->lock, LW_SHARED);
666 :
667 106 : disk_state.roident = curstate->roident;
668 :
669 106 : disk_state.remote_lsn = curstate->remote_lsn;
670 106 : local_lsn = curstate->local_lsn;
671 :
672 106 : LWLockRelease(&curstate->lock);
673 :
674 : /* make sure we only write out a commit that's persistent */
675 106 : XLogFlush(local_lsn);
676 :
677 106 : errno = 0;
678 106 : if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
679 : sizeof(disk_state))
680 : {
681 : /* if write didn't set errno, assume problem is no disk space */
682 0 : if (errno == 0)
683 0 : errno = ENOSPC;
684 0 : ereport(PANIC,
685 : (errcode_for_file_access(),
686 : errmsg("could not write to file \"%s\": %m",
687 : tmppath)));
688 : }
689 :
690 106 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
691 : }
692 :
693 3556 : LWLockRelease(ReplicationOriginLock);
694 :
695 : /* write out the CRC */
696 3556 : FIN_CRC32C(crc);
697 3556 : errno = 0;
698 3556 : if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
699 : {
700 : /* if write didn't set errno, assume problem is no disk space */
701 0 : if (errno == 0)
702 0 : errno = ENOSPC;
703 0 : ereport(PANIC,
704 : (errcode_for_file_access(),
705 : errmsg("could not write to file \"%s\": %m",
706 : tmppath)));
707 : }
708 :
709 3556 : if (CloseTransientFile(tmpfd) != 0)
710 0 : ereport(PANIC,
711 : (errcode_for_file_access(),
712 : errmsg("could not close file \"%s\": %m",
713 : tmppath)));
714 :
715 : /* fsync, rename to permanent file, fsync file and directory */
716 3556 : durable_rename(tmppath, path, PANIC);
717 : }
718 :
719 : /*
720 : * Recover replication replay status from checkpoint data saved earlier by
721 : * CheckPointReplicationOrigin.
722 : *
723 : * This only needs to be called at startup and *not* during every checkpoint
724 : * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
725 : * state thereafter can be recovered by looking at commit records.
726 : */
727 : void
728 1986 : StartupReplicationOrigin(void)
729 : {
730 1986 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
731 : int fd;
732 : int readBytes;
733 1986 : uint32 magic = REPLICATION_STATE_MAGIC;
734 1986 : int last_state = 0;
735 : pg_crc32c file_crc;
736 : pg_crc32c crc;
737 :
738 : /* don't want to overwrite already existing state */
739 : #ifdef USE_ASSERT_CHECKING
740 : static bool already_started = false;
741 :
742 : Assert(!already_started);
743 : already_started = true;
744 : #endif
745 :
746 1986 : if (max_active_replication_origins == 0)
747 104 : return;
748 :
749 1984 : INIT_CRC32C(crc);
750 :
751 1984 : elog(DEBUG2, "starting up replication origin progress state");
752 :
753 1984 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
754 :
755 : /*
756 : * might have had max_active_replication_origins == 0 last run, or we just
757 : * brought up a standby.
758 : */
759 1984 : if (fd < 0 && errno == ENOENT)
760 102 : return;
761 1882 : else if (fd < 0)
762 0 : ereport(PANIC,
763 : (errcode_for_file_access(),
764 : errmsg("could not open file \"%s\": %m",
765 : path)));
766 :
767 : /* verify magic, that is written even if nothing was active */
768 1882 : readBytes = read(fd, &magic, sizeof(magic));
769 1882 : if (readBytes != sizeof(magic))
770 : {
771 0 : if (readBytes < 0)
772 0 : ereport(PANIC,
773 : (errcode_for_file_access(),
774 : errmsg("could not read file \"%s\": %m",
775 : path)));
776 : else
777 0 : ereport(PANIC,
778 : (errcode(ERRCODE_DATA_CORRUPTED),
779 : errmsg("could not read file \"%s\": read %d of %zu",
780 : path, readBytes, sizeof(magic))));
781 : }
782 1882 : COMP_CRC32C(crc, &magic, sizeof(magic));
783 :
784 1882 : if (magic != REPLICATION_STATE_MAGIC)
785 0 : ereport(PANIC,
786 : (errmsg("replication checkpoint has wrong magic %u instead of %u",
787 : magic, REPLICATION_STATE_MAGIC)));
788 :
789 : /* we can skip locking here, no other access is possible */
790 :
791 : /* recover individual states, until there are no more to be found */
792 : while (true)
793 62 : {
794 : ReplicationStateOnDisk disk_state;
795 :
796 1944 : readBytes = read(fd, &disk_state, sizeof(disk_state));
797 :
798 1944 : if (readBytes < 0)
799 : {
800 0 : ereport(PANIC,
801 : (errcode_for_file_access(),
802 : errmsg("could not read file \"%s\": %m",
803 : path)));
804 : }
805 :
806 : /* no further data */
807 1944 : if (readBytes == sizeof(crc))
808 : {
809 1882 : memcpy(&file_crc, &disk_state, sizeof(file_crc));
810 1882 : break;
811 : }
812 :
813 62 : if (readBytes != sizeof(disk_state))
814 : {
815 0 : ereport(PANIC,
816 : (errcode_for_file_access(),
817 : errmsg("could not read file \"%s\": read %d of %zu",
818 : path, readBytes, sizeof(disk_state))));
819 : }
820 :
821 62 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
822 :
823 62 : if (last_state == max_active_replication_origins)
824 0 : ereport(PANIC,
825 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
826 : errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
827 :
828 : /* copy data to shared memory */
829 62 : replication_states[last_state].roident = disk_state.roident;
830 62 : replication_states[last_state].remote_lsn = disk_state.remote_lsn;
831 62 : last_state++;
832 :
833 62 : ereport(LOG,
834 : errmsg("recovered replication state of node %d to %X/%08X",
835 : disk_state.roident,
836 : LSN_FORMAT_ARGS(disk_state.remote_lsn)));
837 : }
838 :
839 : /* now check checksum */
840 1882 : FIN_CRC32C(crc);
841 1882 : if (file_crc != crc)
842 0 : ereport(PANIC,
843 : (errcode(ERRCODE_DATA_CORRUPTED),
844 : errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
845 : crc, file_crc)));
846 :
847 1882 : if (CloseTransientFile(fd) != 0)
848 0 : ereport(PANIC,
849 : (errcode_for_file_access(),
850 : errmsg("could not close file \"%s\": %m",
851 : path)));
852 : }
853 :
854 : void
855 8 : replorigin_redo(XLogReaderState *record)
856 : {
857 8 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
858 :
859 8 : switch (info)
860 : {
861 4 : case XLOG_REPLORIGIN_SET:
862 : {
863 4 : xl_replorigin_set *xlrec =
864 4 : (xl_replorigin_set *) XLogRecGetData(record);
865 :
866 4 : replorigin_advance(xlrec->node_id,
867 : xlrec->remote_lsn, record->EndRecPtr,
868 4 : xlrec->force /* backward */ ,
869 : false /* WAL log */ );
870 4 : break;
871 : }
872 4 : case XLOG_REPLORIGIN_DROP:
873 : {
874 : xl_replorigin_drop *xlrec;
875 : int i;
876 :
877 4 : xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
878 :
879 6 : for (i = 0; i < max_active_replication_origins; i++)
880 : {
881 6 : ReplicationState *state = &replication_states[i];
882 :
883 : /* found our slot */
884 6 : if (state->roident == xlrec->node_id)
885 : {
886 : /* reset entry */
887 4 : state->roident = InvalidRepOriginId;
888 4 : state->remote_lsn = InvalidXLogRecPtr;
889 4 : state->local_lsn = InvalidXLogRecPtr;
890 4 : break;
891 : }
892 : }
893 4 : break;
894 : }
895 0 : default:
896 0 : elog(PANIC, "replorigin_redo: unknown op code %u", info);
897 : }
898 8 : }
899 :
900 :
901 : /*
902 : * Tell the replication origin progress machinery that a commit from 'node'
903 : * that originated at the LSN remote_commit on the remote node was replayed
904 : * successfully and that we don't need to do so again. In combination with
905 : * setting up replorigin_session_origin_lsn and replorigin_session_origin
906 : * that ensures we won't lose knowledge about that after a crash if the
907 : * transaction had a persistent effect (think of asynchronous commits).
908 : *
909 : * local_commit needs to be a local LSN of the commit so that we can make sure
910 : * upon a checkpoint that enough WAL has been persisted to disk.
911 : *
912 : * Needs to be called with a RowExclusiveLock on pg_replication_origin,
913 : * unless running in recovery.
914 : */
915 : void
916 484 : replorigin_advance(RepOriginId node,
917 : XLogRecPtr remote_commit, XLogRecPtr local_commit,
918 : bool go_backward, bool wal_log)
919 : {
920 : int i;
921 484 : ReplicationState *replication_state = NULL;
922 484 : ReplicationState *free_state = NULL;
923 :
924 : Assert(node != InvalidRepOriginId);
925 :
926 : /* we don't track DoNotReplicateId */
927 484 : if (node == DoNotReplicateId)
928 0 : return;
929 :
930 : /*
931 : * XXX: For the case where this is called by WAL replay, it'd be more
932 : * efficient to restore into a backend local hashtable and only dump into
933 : * shmem after recovery is finished. Let's wait with implementing that
934 : * till it's shown to be a measurable expense
935 : */
936 :
937 : /* Lock exclusively, as we may have to create a new table entry. */
938 484 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
939 :
940 : /*
941 : * Search for either an existing slot for the origin, or a free one we can
942 : * use.
943 : */
944 4430 : for (i = 0; i < max_active_replication_origins; i++)
945 : {
946 4038 : ReplicationState *curstate = &replication_states[i];
947 :
948 : /* remember where to insert if necessary */
949 4038 : if (curstate->roident == InvalidRepOriginId &&
950 : free_state == NULL)
951 : {
952 392 : free_state = curstate;
953 392 : continue;
954 : }
955 :
956 : /* not our slot */
957 3646 : if (curstate->roident != node)
958 : {
959 3554 : continue;
960 : }
961 :
962 : /* ok, found slot */
963 92 : replication_state = curstate;
964 :
965 92 : LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
966 :
967 : /* Make sure it's not used by somebody else */
968 92 : if (replication_state->refcount > 0)
969 : {
970 0 : ereport(ERROR,
971 : (errcode(ERRCODE_OBJECT_IN_USE),
972 : (replication_state->acquired_by != 0)
973 : ? errmsg("replication origin with ID %d is already active for PID %d",
974 : replication_state->roident,
975 : replication_state->acquired_by)
976 : : errmsg("replication origin with ID %d is already active in another process",
977 : replication_state->roident)));
978 : }
979 :
980 92 : break;
981 : }
982 :
983 484 : if (replication_state == NULL && free_state == NULL)
984 0 : ereport(ERROR,
985 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
986 : errmsg("could not find free replication state slot for replication origin with ID %d",
987 : node),
988 : errhint("Increase \"max_active_replication_origins\" and try again.")));
989 :
990 484 : if (replication_state == NULL)
991 : {
992 : /* initialize new slot */
993 392 : LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
994 392 : replication_state = free_state;
995 : Assert(!XLogRecPtrIsValid(replication_state->remote_lsn));
996 : Assert(!XLogRecPtrIsValid(replication_state->local_lsn));
997 392 : replication_state->roident = node;
998 : }
999 :
1000 : Assert(replication_state->roident != InvalidRepOriginId);
1001 :
1002 : /*
1003 : * If somebody "forcefully" sets this slot, WAL log it, so it's durable
1004 : * and the standby gets the message. Primarily this will be called during
1005 : * WAL replay (of commit records) where no WAL logging is necessary.
1006 : */
1007 484 : if (wal_log)
1008 : {
1009 : xl_replorigin_set xlrec;
1010 :
1011 402 : xlrec.remote_lsn = remote_commit;
1012 402 : xlrec.node_id = node;
1013 402 : xlrec.force = go_backward;
1014 :
1015 402 : XLogBeginInsert();
1016 402 : XLogRegisterData(&xlrec, sizeof(xlrec));
1017 :
1018 402 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
1019 : }
1020 :
1021 : /*
1022 : * Due to - harmless - race conditions during a checkpoint we could see
1023 : * values here that are older than the ones we already have in memory. We
1024 : * could also see older values for prepared transactions when the prepare
1025 : * is sent at a later point of time along with commit prepared and there
1026 : * are other transactions commits between prepare and commit prepared. See
1027 : * ReorderBufferFinishPrepared. Don't overwrite those.
1028 : */
1029 484 : if (go_backward || replication_state->remote_lsn < remote_commit)
1030 470 : replication_state->remote_lsn = remote_commit;
1031 484 : if (XLogRecPtrIsValid(local_commit) &&
1032 76 : (go_backward || replication_state->local_lsn < local_commit))
1033 80 : replication_state->local_lsn = local_commit;
1034 484 : LWLockRelease(&replication_state->lock);
1035 :
1036 : /*
1037 : * Release *after* changing the LSNs, slot isn't acquired and thus could
1038 : * otherwise be dropped anytime.
1039 : */
1040 484 : LWLockRelease(ReplicationOriginLock);
1041 : }
1042 :
1043 :
1044 : XLogRecPtr
1045 16 : replorigin_get_progress(RepOriginId node, bool flush)
1046 : {
1047 : int i;
1048 16 : XLogRecPtr local_lsn = InvalidXLogRecPtr;
1049 16 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1050 :
1051 : /* prevent slots from being concurrently dropped */
1052 16 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1053 :
1054 76 : for (i = 0; i < max_active_replication_origins; i++)
1055 : {
1056 : ReplicationState *state;
1057 :
1058 70 : state = &replication_states[i];
1059 :
1060 70 : if (state->roident == node)
1061 : {
1062 10 : LWLockAcquire(&state->lock, LW_SHARED);
1063 :
1064 10 : remote_lsn = state->remote_lsn;
1065 10 : local_lsn = state->local_lsn;
1066 :
1067 10 : LWLockRelease(&state->lock);
1068 :
1069 10 : break;
1070 : }
1071 : }
1072 :
1073 16 : LWLockRelease(ReplicationOriginLock);
1074 :
1075 16 : if (flush && XLogRecPtrIsValid(local_lsn))
1076 2 : XLogFlush(local_lsn);
1077 :
1078 16 : return remote_lsn;
1079 : }
1080 :
1081 : /* Helper function to reset the session replication origin */
1082 : static void
1083 942 : replorigin_session_reset_internal(void)
1084 : {
1085 : ConditionVariable *cv;
1086 :
1087 : Assert(session_replication_state != NULL);
1088 :
1089 942 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1090 :
1091 : /* The origin must be held by at least one process at this point. */
1092 : Assert(session_replication_state->refcount > 0);
1093 :
1094 : /*
1095 : * Reset the PID only if the current session is the first to set up this
1096 : * origin. This avoids clearing the first process's PID when any other
1097 : * session releases the origin.
1098 : */
1099 942 : if (session_replication_state->acquired_by == MyProcPid)
1100 914 : session_replication_state->acquired_by = 0;
1101 :
1102 942 : session_replication_state->refcount--;
1103 :
1104 942 : cv = &session_replication_state->origin_cv;
1105 942 : session_replication_state = NULL;
1106 :
1107 942 : LWLockRelease(ReplicationOriginLock);
1108 :
1109 942 : ConditionVariableBroadcast(cv);
1110 942 : }
1111 :
1112 : /*
1113 : * Tear down a (possibly) configured session replication origin during process
1114 : * exit.
1115 : */
1116 : static void
1117 934 : ReplicationOriginExitCleanup(int code, Datum arg)
1118 : {
1119 934 : if (session_replication_state == NULL)
1120 380 : return;
1121 :
1122 554 : replorigin_session_reset_internal();
1123 : }
1124 :
1125 : /*
1126 : * Setup a replication origin in the shared memory struct if it doesn't
1127 : * already exist and cache access to the specific ReplicationSlot so the
1128 : * array doesn't have to be searched when calling
1129 : * replorigin_session_advance().
1130 : *
1131 : * Normally only one such cached origin can exist per process so the cached
1132 : * value can only be set again after the previous value is torn down with
1133 : * replorigin_session_reset(). For this normal case pass acquired_by = 0
1134 : * (meaning the slot is not allowed to be already acquired by another process).
1135 : *
1136 : * However, sometimes multiple processes can safely re-use the same origin slot
1137 : * (for example, multiple parallel apply processes can safely use the same
1138 : * origin, provided they maintain commit order by allowing only one process to
1139 : * commit at a time). For this case the first process must pass acquired_by =
1140 : * 0, and then the other processes sharing that same origin can pass
1141 : * acquired_by = PID of the first process.
1142 : */
1143 : void
1144 946 : replorigin_session_setup(RepOriginId node, int acquired_by)
1145 : {
1146 : static bool registered_cleanup;
1147 : int i;
1148 946 : int free_slot = -1;
1149 :
1150 946 : if (!registered_cleanup)
1151 : {
1152 934 : on_shmem_exit(ReplicationOriginExitCleanup, 0);
1153 934 : registered_cleanup = true;
1154 : }
1155 :
1156 : Assert(max_active_replication_origins > 0);
1157 :
1158 946 : if (session_replication_state != NULL)
1159 2 : ereport(ERROR,
1160 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1161 : errmsg("cannot setup replication origin when one is already setup")));
1162 :
1163 : /* Lock exclusively, as we may have to create a new table entry. */
1164 944 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1165 :
1166 : /*
1167 : * Search for either an existing slot for the origin, or a free one we can
1168 : * use.
1169 : */
1170 3908 : for (i = 0; i < max_active_replication_origins; i++)
1171 : {
1172 3668 : ReplicationState *curstate = &replication_states[i];
1173 :
1174 : /* remember where to insert if necessary */
1175 3668 : if (curstate->roident == InvalidRepOriginId &&
1176 : free_slot == -1)
1177 : {
1178 244 : free_slot = i;
1179 244 : continue;
1180 : }
1181 :
1182 : /* not our slot */
1183 3424 : if (curstate->roident != node)
1184 2720 : continue;
1185 :
1186 704 : else if (curstate->acquired_by != 0 && acquired_by == 0)
1187 : {
1188 0 : ereport(ERROR,
1189 : (errcode(ERRCODE_OBJECT_IN_USE),
1190 : errmsg("replication origin with ID %d is already active for PID %d",
1191 : curstate->roident, curstate->acquired_by)));
1192 : }
1193 :
1194 704 : else if (curstate->acquired_by != acquired_by)
1195 : {
1196 0 : ereport(ERROR,
1197 : (errcode(ERRCODE_OBJECT_IN_USE),
1198 : errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
1199 : node, acquired_by)));
1200 : }
1201 :
1202 : /*
1203 : * The origin is in use, but PID is not recorded. This can happen if
1204 : * the process that originally acquired the origin exited without
1205 : * releasing it. To ensure correctness, other processes cannot acquire
1206 : * the origin until all processes currently using it have released it.
1207 : */
1208 704 : else if (curstate->acquired_by == 0 && curstate->refcount > 0)
1209 0 : ereport(ERROR,
1210 : (errcode(ERRCODE_OBJECT_IN_USE),
1211 : errmsg("replication origin with ID %d is already active in another process",
1212 : curstate->roident)));
1213 :
1214 : /* ok, found slot */
1215 704 : session_replication_state = curstate;
1216 704 : break;
1217 : }
1218 :
1219 :
1220 944 : if (session_replication_state == NULL && free_slot == -1)
1221 0 : ereport(ERROR,
1222 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1223 : errmsg("could not find free replication state slot for replication origin with ID %d",
1224 : node),
1225 : errhint("Increase \"max_active_replication_origins\" and try again.")));
1226 944 : else if (session_replication_state == NULL)
1227 : {
1228 240 : if (acquired_by)
1229 2 : ereport(ERROR,
1230 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1231 : errmsg("cannot use PID %d for inactive replication origin with ID %d",
1232 : acquired_by, node)));
1233 :
1234 : /* initialize new slot */
1235 238 : session_replication_state = &replication_states[free_slot];
1236 : Assert(!XLogRecPtrIsValid(session_replication_state->remote_lsn));
1237 : Assert(!XLogRecPtrIsValid(session_replication_state->local_lsn));
1238 238 : session_replication_state->roident = node;
1239 : }
1240 :
1241 :
1242 : Assert(session_replication_state->roident != InvalidRepOriginId);
1243 :
1244 942 : if (acquired_by == 0)
1245 : {
1246 914 : session_replication_state->acquired_by = MyProcPid;
1247 : Assert(session_replication_state->refcount == 0);
1248 : }
1249 : else
1250 : {
1251 : /*
1252 : * Sanity check: the origin must already be acquired by the process
1253 : * passed as input, and at least one process must be using it.
1254 : */
1255 : Assert(session_replication_state->acquired_by == acquired_by);
1256 : Assert(session_replication_state->refcount > 0);
1257 : }
1258 :
1259 942 : session_replication_state->refcount++;
1260 :
1261 942 : LWLockRelease(ReplicationOriginLock);
1262 :
1263 : /* probably this one is pointless */
1264 942 : ConditionVariableBroadcast(&session_replication_state->origin_cv);
1265 942 : }
1266 :
1267 : /*
1268 : * Reset replay state previously setup in this session.
1269 : *
1270 : * This function may only be called if an origin was setup with
1271 : * replorigin_session_setup().
1272 : */
1273 : void
1274 392 : replorigin_session_reset(void)
1275 : {
1276 : Assert(max_active_replication_origins != 0);
1277 :
1278 392 : if (session_replication_state == NULL)
1279 2 : ereport(ERROR,
1280 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1281 : errmsg("no replication origin is configured")));
1282 :
1283 : /*
1284 : * Restrict explicit resetting of the replication origin if it was first
1285 : * acquired by this process and others are still using it. While the
1286 : * system handles this safely (as happens if the first session exits
1287 : * without calling reset), it is best to avoid doing so.
1288 : */
1289 390 : if (session_replication_state->acquired_by == MyProcPid &&
1290 386 : session_replication_state->refcount > 1)
1291 2 : ereport(ERROR,
1292 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1293 : errmsg("cannot reset replication origin with ID %d because it is still in use by other processes",
1294 : session_replication_state->roident),
1295 : errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."),
1296 : errhint("Reset the replication origin in all other processes before retrying.")));
1297 :
1298 388 : replorigin_session_reset_internal();
1299 388 : }
1300 :
1301 : /*
1302 : * Do the same work replorigin_advance() does, just on the session's
1303 : * configured origin.
1304 : *
1305 : * This is noticeably cheaper than using replorigin_advance().
1306 : */
1307 : void
1308 2196 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
1309 : {
1310 : Assert(session_replication_state != NULL);
1311 : Assert(session_replication_state->roident != InvalidRepOriginId);
1312 :
1313 2196 : LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1314 2196 : if (session_replication_state->local_lsn < local_commit)
1315 2196 : session_replication_state->local_lsn = local_commit;
1316 2196 : if (session_replication_state->remote_lsn < remote_commit)
1317 1040 : session_replication_state->remote_lsn = remote_commit;
1318 2196 : LWLockRelease(&session_replication_state->lock);
1319 2196 : }
1320 :
1321 : /*
1322 : * Ask the machinery about the point up to which we successfully replayed
1323 : * changes from an already setup replication origin.
1324 : */
1325 : XLogRecPtr
1326 508 : replorigin_session_get_progress(bool flush)
1327 : {
1328 : XLogRecPtr remote_lsn;
1329 : XLogRecPtr local_lsn;
1330 :
1331 : Assert(session_replication_state != NULL);
1332 :
1333 508 : LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1334 508 : remote_lsn = session_replication_state->remote_lsn;
1335 508 : local_lsn = session_replication_state->local_lsn;
1336 508 : LWLockRelease(&session_replication_state->lock);
1337 :
1338 508 : if (flush && XLogRecPtrIsValid(local_lsn))
1339 2 : XLogFlush(local_lsn);
1340 :
1341 508 : return remote_lsn;
1342 : }
1343 :
1344 :
1345 :
1346 : /* ---------------------------------------------------------------------------
1347 : * SQL functions for working with replication origin.
1348 : *
1349 : * These mostly should be fairly short wrappers around more generic functions.
1350 : * ---------------------------------------------------------------------------
1351 : */
1352 :
1353 : /*
1354 : * Create replication origin for the passed in name, and return the assigned
1355 : * oid.
1356 : */
1357 : Datum
1358 28 : pg_replication_origin_create(PG_FUNCTION_ARGS)
1359 : {
1360 : char *name;
1361 : RepOriginId roident;
1362 :
1363 28 : replorigin_check_prerequisites(false, false);
1364 :
1365 28 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1366 :
1367 : /*
1368 : * Replication origins "any and "none" are reserved for system options.
1369 : * The origins "pg_xxx" are reserved for internal use.
1370 : */
1371 28 : if (IsReservedName(name) || IsReservedOriginName(name))
1372 6 : ereport(ERROR,
1373 : (errcode(ERRCODE_RESERVED_NAME),
1374 : errmsg("replication origin name \"%s\" is reserved",
1375 : name),
1376 : errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1377 : LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
1378 :
1379 : /*
1380 : * If built with appropriate switch, whine when regression-testing
1381 : * conventions for replication origin names are violated.
1382 : */
1383 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1384 : if (strncmp(name, "regress_", 8) != 0)
1385 : elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1386 : #endif
1387 :
1388 22 : roident = replorigin_create(name);
1389 :
1390 14 : pfree(name);
1391 :
1392 14 : PG_RETURN_OID(roident);
1393 : }
1394 :
1395 : /*
1396 : * Drop replication origin.
1397 : */
1398 : Datum
1399 18 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
1400 : {
1401 : char *name;
1402 :
1403 18 : replorigin_check_prerequisites(false, false);
1404 :
1405 18 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1406 :
1407 18 : replorigin_drop_by_name(name, false, true);
1408 :
1409 16 : pfree(name);
1410 :
1411 16 : PG_RETURN_VOID();
1412 : }
1413 :
1414 : /*
1415 : * Return oid of a replication origin.
1416 : */
1417 : Datum
1418 0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
1419 : {
1420 : char *name;
1421 : RepOriginId roident;
1422 :
1423 0 : replorigin_check_prerequisites(false, false);
1424 :
1425 0 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1426 0 : roident = replorigin_by_name(name, true);
1427 :
1428 0 : pfree(name);
1429 :
1430 0 : if (OidIsValid(roident))
1431 0 : PG_RETURN_OID(roident);
1432 0 : PG_RETURN_NULL();
1433 : }
1434 :
1435 : /*
1436 : * Setup a replication origin for this session.
1437 : */
1438 : Datum
1439 22 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
1440 : {
1441 : char *name;
1442 : RepOriginId origin;
1443 : int pid;
1444 :
1445 22 : replorigin_check_prerequisites(true, false);
1446 :
1447 22 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1448 22 : origin = replorigin_by_name(name, false);
1449 20 : pid = PG_GETARG_INT32(1);
1450 20 : replorigin_session_setup(origin, pid);
1451 :
1452 16 : replorigin_session_origin = origin;
1453 :
1454 16 : pfree(name);
1455 :
1456 16 : PG_RETURN_VOID();
1457 : }
1458 :
1459 : /*
1460 : * Reset previously setup origin in this session
1461 : */
1462 : Datum
1463 20 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
1464 : {
1465 20 : replorigin_check_prerequisites(true, false);
1466 :
1467 20 : replorigin_session_reset();
1468 :
1469 16 : replorigin_session_origin = InvalidRepOriginId;
1470 16 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
1471 16 : replorigin_session_origin_timestamp = 0;
1472 :
1473 16 : PG_RETURN_VOID();
1474 : }
1475 :
1476 : /*
1477 : * Has a replication origin been setup for this session.
1478 : */
1479 : Datum
1480 8 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
1481 : {
1482 8 : replorigin_check_prerequisites(false, false);
1483 :
1484 8 : PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
1485 : }
1486 :
1487 :
1488 : /*
1489 : * Return the replication progress for origin setup in the current session.
1490 : *
1491 : * If 'flush' is set to true it is ensured that the returned value corresponds
1492 : * to a local transaction that has been flushed. This is useful if asynchronous
1493 : * commits are used when replaying replicated transactions.
1494 : */
1495 : Datum
1496 4 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
1497 : {
1498 4 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1499 4 : bool flush = PG_GETARG_BOOL(0);
1500 :
1501 4 : replorigin_check_prerequisites(true, false);
1502 :
1503 4 : if (session_replication_state == NULL)
1504 0 : ereport(ERROR,
1505 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1506 : errmsg("no replication origin is configured")));
1507 :
1508 4 : remote_lsn = replorigin_session_get_progress(flush);
1509 :
1510 4 : if (!XLogRecPtrIsValid(remote_lsn))
1511 0 : PG_RETURN_NULL();
1512 :
1513 4 : PG_RETURN_LSN(remote_lsn);
1514 : }
1515 :
1516 : Datum
1517 2 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
1518 : {
1519 2 : XLogRecPtr location = PG_GETARG_LSN(0);
1520 :
1521 2 : replorigin_check_prerequisites(true, false);
1522 :
1523 2 : if (session_replication_state == NULL)
1524 0 : ereport(ERROR,
1525 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1526 : errmsg("no replication origin is configured")));
1527 :
1528 2 : replorigin_session_origin_lsn = location;
1529 2 : replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
1530 :
1531 2 : PG_RETURN_VOID();
1532 : }
1533 :
1534 : Datum
1535 0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
1536 : {
1537 0 : replorigin_check_prerequisites(true, false);
1538 :
1539 0 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
1540 0 : replorigin_session_origin_timestamp = 0;
1541 :
1542 0 : PG_RETURN_VOID();
1543 : }
1544 :
1545 :
1546 : Datum
1547 6 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
1548 : {
1549 6 : text *name = PG_GETARG_TEXT_PP(0);
1550 6 : XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1551 : RepOriginId node;
1552 :
1553 6 : replorigin_check_prerequisites(true, false);
1554 :
1555 : /* lock to prevent the replication origin from vanishing */
1556 6 : LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1557 :
1558 6 : node = replorigin_by_name(text_to_cstring(name), false);
1559 :
1560 : /*
1561 : * Can't sensibly pass a local commit to be flushed at checkpoint - this
1562 : * xact hasn't committed yet. This is why this function should be used to
1563 : * set up the initial replication state, but not for replay.
1564 : */
1565 4 : replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1566 : true /* go backward */ , true /* WAL log */ );
1567 :
1568 4 : UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1569 :
1570 4 : PG_RETURN_VOID();
1571 : }
1572 :
1573 :
1574 : /*
1575 : * Return the replication progress for an individual replication origin.
1576 : *
1577 : * If 'flush' is set to true it is ensured that the returned value corresponds
1578 : * to a local transaction that has been flushed. This is useful if asynchronous
1579 : * commits are used when replaying replicated transactions.
1580 : */
1581 : Datum
1582 6 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
1583 : {
1584 : char *name;
1585 : bool flush;
1586 : RepOriginId roident;
1587 6 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1588 :
1589 6 : replorigin_check_prerequisites(true, true);
1590 :
1591 6 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1592 6 : flush = PG_GETARG_BOOL(1);
1593 :
1594 6 : roident = replorigin_by_name(name, false);
1595 : Assert(OidIsValid(roident));
1596 :
1597 4 : remote_lsn = replorigin_get_progress(roident, flush);
1598 :
1599 4 : if (!XLogRecPtrIsValid(remote_lsn))
1600 0 : PG_RETURN_NULL();
1601 :
1602 4 : PG_RETURN_LSN(remote_lsn);
1603 : }
1604 :
1605 :
1606 : Datum
1607 22 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
1608 : {
1609 22 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1610 : int i;
1611 : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
1612 :
1613 : /* we want to return 0 rows if slot is set to zero */
1614 22 : replorigin_check_prerequisites(false, true);
1615 :
1616 22 : InitMaterializedSRF(fcinfo, 0);
1617 :
1618 : /* prevent slots from being concurrently dropped */
1619 22 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1620 :
1621 : /*
1622 : * Iterate through all possible replication_states, display if they are
1623 : * filled. Note that we do not take any locks, so slightly corrupted/out
1624 : * of date values are a possibility.
1625 : */
1626 242 : for (i = 0; i < max_active_replication_origins; i++)
1627 : {
1628 : ReplicationState *state;
1629 : Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];
1630 : bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1631 : char *roname;
1632 :
1633 220 : state = &replication_states[i];
1634 :
1635 : /* unused slot, nothing to display */
1636 220 : if (state->roident == InvalidRepOriginId)
1637 194 : continue;
1638 :
1639 26 : memset(values, 0, sizeof(values));
1640 26 : memset(nulls, 1, sizeof(nulls));
1641 :
1642 26 : values[0] = ObjectIdGetDatum(state->roident);
1643 26 : nulls[0] = false;
1644 :
1645 : /*
1646 : * We're not preventing the origin to be dropped concurrently, so
1647 : * silently accept that it might be gone.
1648 : */
1649 26 : if (replorigin_by_oid(state->roident, true,
1650 : &roname))
1651 : {
1652 26 : values[1] = CStringGetTextDatum(roname);
1653 26 : nulls[1] = false;
1654 : }
1655 :
1656 26 : LWLockAcquire(&state->lock, LW_SHARED);
1657 :
1658 26 : values[2] = LSNGetDatum(state->remote_lsn);
1659 26 : nulls[2] = false;
1660 :
1661 26 : values[3] = LSNGetDatum(state->local_lsn);
1662 26 : nulls[3] = false;
1663 :
1664 26 : LWLockRelease(&state->lock);
1665 :
1666 26 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1667 : values, nulls);
1668 : }
1669 :
1670 22 : LWLockRelease(ReplicationOriginLock);
1671 :
1672 : #undef REPLICATION_ORIGIN_PROGRESS_COLS
1673 :
1674 22 : return (Datum) 0;
1675 : }
|