Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * origin.c
4 : * Logical replication progress tracking support.
5 : *
6 : * Copyright (c) 2013-2026, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/logical/origin.c
10 : *
11 : * NOTES
12 : *
13 : * This file provides the following:
14 : * * An infrastructure to name nodes in a replication setup
15 : * * A facility to store and persist replication progress in an
16 : * efficient and durable manner.
17 : *
18 : * A replication origin consists of a descriptive, user defined, external
19 : * name and a short, thus space efficient, internal 2 byte one. This split
20 : * exists because replication origins have to be stored in WAL and shared
21 : * memory and long descriptors would be inefficient. For now only use 2 bytes
22 : * for the internal id of a replication origin as it seems unlikely that there
23 : * soon will be more than 65k nodes in one replication setup; and using only
24 : * two bytes allows us to be more space efficient.
25 : *
26 : * Replication progress is tracked in a shared memory table
27 : * (ReplicationState) that's dumped to disk every checkpoint. Entries
28 : * ('slots') in this table are identified by the internal id. That's the case
29 : * because it allows to increase replication progress during crash
30 : * recovery. To allow doing so we store the original LSN (from the originating
31 : * system) of a transaction in the commit record. That allows to recover the
32 : * precise replayed state after crash recovery; without requiring synchronous
33 : * commits. Allowing logical replication to use asynchronous commit is
34 : * generally good for performance, but especially important as it allows a
35 : * single threaded replay process to keep up with a source that has multiple
36 : * backends generating changes concurrently. For efficiency and simplicity
37 : * reasons a backend can setup one replication origin that's from then used as
38 : * the source of changes produced by the backend, until reset again.
39 : *
40 : * This infrastructure is intended to be used in cooperation with logical
41 : * decoding. When replaying from a remote system the configured origin is
42 : * provided to output plugins, allowing prevention of replication loops and
43 : * other filtering.
44 : *
45 : * There are several levels of locking at work:
46 : *
47 : * * To create and drop replication origins an exclusive lock on
48 : * pg_replication_origin is required for the duration. That allows us to
49 : * safely and conflict free assign new origins using a dirty snapshot.
50 : *
51 : * * When creating an in-memory replication progress slot the ReplicationOrigin
52 : * LWLock has to be held exclusively; when iterating over the replication
53 : * progress a shared lock has to be held, the same when advancing the
54 : * replication progress of an individual backend that has not setup as the
55 : * session's replication origin.
56 : *
57 : * * When manipulating or looking at the remote_lsn and local_lsn fields of a
58 : * replication progress slot that slot's lwlock has to be held. That's
59 : * primarily because we do not assume 8 byte writes (the LSN) are atomic on
60 : * all our platforms, but it also simplifies memory ordering concerns
61 : * between the remote and local lsn. We use a lwlock instead of a spinlock
62 : * so it's less harmful to hold the lock over a WAL write
63 : * (cf. replorigin_advance).
64 : *
65 : * ---------------------------------------------------------------------------
66 : */
67 :
68 : #include "postgres.h"
69 :
70 : #include <unistd.h>
71 : #include <sys/stat.h>
72 :
73 : #include "access/genam.h"
74 : #include "access/htup_details.h"
75 : #include "access/table.h"
76 : #include "access/xact.h"
77 : #include "access/xloginsert.h"
78 : #include "catalog/catalog.h"
79 : #include "catalog/indexing.h"
80 : #include "catalog/pg_subscription.h"
81 : #include "funcapi.h"
82 : #include "miscadmin.h"
83 : #include "nodes/execnodes.h"
84 : #include "pgstat.h"
85 : #include "replication/origin.h"
86 : #include "replication/slot.h"
87 : #include "storage/condition_variable.h"
88 : #include "storage/fd.h"
89 : #include "storage/ipc.h"
90 : #include "storage/lmgr.h"
91 : #include "utils/builtins.h"
92 : #include "utils/fmgroids.h"
93 : #include "utils/guc.h"
94 : #include "utils/pg_lsn.h"
95 : #include "utils/rel.h"
96 : #include "utils/snapmgr.h"
97 : #include "utils/syscache.h"
98 : #include "utils/wait_event.h"
99 :
/* Location of the replication origin checkpoint file and of its temp file */
#define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"

/*
 * GUC: number of entries in the shared replication_states array.  When it is
 * 0 the shared state is never allocated and progress tracking is disabled.
 */
int			max_active_replication_origins = 10;
106 :
107 : /*
108 : * Replay progress of a single remote node.
109 : */
110 : typedef struct ReplicationState
111 : {
112 : /*
113 : * Local identifier for the remote node.
114 : */
115 : ReplOriginId roident;
116 :
117 : /*
118 : * Location of the latest commit from the remote side.
119 : */
120 : XLogRecPtr remote_lsn;
121 :
122 : /*
123 : * Remember the local lsn of the commit record so we can XLogFlush() to it
124 : * during a checkpoint so we know the commit record actually is safe on
125 : * disk.
126 : */
127 : XLogRecPtr local_lsn;
128 :
129 : /*
130 : * PID of backend that's acquired slot, or 0 if none.
131 : */
132 : int acquired_by;
133 :
134 : /* Count of processes that are currently using this origin. */
135 : int refcount;
136 :
137 : /*
138 : * Condition variable that's signaled when acquired_by changes.
139 : */
140 : ConditionVariable origin_cv;
141 :
142 : /*
143 : * Lock protecting remote_lsn and local_lsn.
144 : */
145 : LWLock lock;
146 : } ReplicationState;
147 :
148 : /*
149 : * On disk version of ReplicationState.
150 : */
151 : typedef struct ReplicationStateOnDisk
152 : {
153 : ReplOriginId roident;
154 : XLogRecPtr remote_lsn;
155 : } ReplicationStateOnDisk;
156 :
157 :
158 : typedef struct ReplicationStateCtl
159 : {
160 : /* Tranche to use for per-origin LWLocks */
161 : int tranche_id;
162 : /* Array of length max_active_replication_origins */
163 : ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
164 : } ReplicationStateCtl;
165 :
166 : /* Global variable for per-transaction replication origin state */
167 : ReplOriginXactState replorigin_xact_state = {
168 : .origin = InvalidReplOriginId, /* assumed identity */
169 : .origin_lsn = InvalidXLogRecPtr,
170 : .origin_timestamp = 0
171 : };
172 :
173 : /*
174 : * Base address into a shared memory array of replication states of size
175 : * max_active_replication_origins.
176 : */
177 : static ReplicationState *replication_states;
178 :
179 : /*
180 : * Actual shared memory block (replication_states[] is now part of this).
181 : */
182 : static ReplicationStateCtl *replication_states_ctl;
183 :
184 : /*
185 : * We keep a pointer to this backend's ReplicationState to avoid having to
186 : * search the replication_states array in replorigin_session_advance for each
187 : * remote commit. (Ownership of a backend's own entry can only be changed by
188 : * that backend.)
189 : */
190 : static ReplicationState *session_replication_state = NULL;
191 :
192 : /* Magic for on disk files. */
193 : #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
194 :
195 : static void
196 68 : replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
197 : {
198 68 : if (check_origins && max_active_replication_origins == 0)
199 0 : ereport(ERROR,
200 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
201 : errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
202 :
203 68 : if (!recoveryOK && RecoveryInProgress())
204 0 : ereport(ERROR,
205 : (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
206 : errmsg("cannot manipulate replication origins during recovery")));
207 68 : }
208 :
209 :
210 : /*
211 : * IsReservedOriginName
212 : * True iff name is either "none" or "any".
213 : */
214 : static bool
215 13 : IsReservedOriginName(const char *name)
216 : {
217 25 : return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
218 12 : (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
219 : }
220 :
221 : /* ---------------------------------------------------------------------------
222 : * Functions for working with replication origins themselves.
223 : * ---------------------------------------------------------------------------
224 : */
225 :
226 : /*
227 : * Check for a persistent replication origin identified by name.
228 : *
229 : * Returns InvalidOid if the node isn't known yet and missing_ok is true.
230 : */
231 : ReplOriginId
232 1064 : replorigin_by_name(const char *roname, bool missing_ok)
233 : {
234 : Form_pg_replication_origin ident;
235 1064 : Oid roident = InvalidOid;
236 : HeapTuple tuple;
237 : Datum roname_d;
238 :
239 1064 : roname_d = CStringGetTextDatum(roname);
240 :
241 1064 : tuple = SearchSysCache1(REPLORIGNAME, roname_d);
242 1064 : if (HeapTupleIsValid(tuple))
243 : {
244 667 : ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
245 667 : roident = ident->roident;
246 667 : ReleaseSysCache(tuple);
247 : }
248 397 : else if (!missing_ok)
249 4 : ereport(ERROR,
250 : (errcode(ERRCODE_UNDEFINED_OBJECT),
251 : errmsg("replication origin \"%s\" does not exist",
252 : roname)));
253 :
254 1060 : return roident;
255 : }
256 :
257 : /*
258 : * Create a replication origin.
259 : *
260 : * Needs to be called in a transaction.
261 : */
262 : ReplOriginId
263 389 : replorigin_create(const char *roname)
264 : {
265 : Oid roident;
266 389 : HeapTuple tuple = NULL;
267 : Relation rel;
268 : Datum roname_d;
269 : SnapshotData SnapshotDirty;
270 : SysScanDesc scan;
271 : ScanKeyData key;
272 :
273 : /*
274 : * To avoid needing a TOAST table for pg_replication_origin, we limit
275 : * replication origin names to 512 bytes. This should be more than enough
276 : * for all practical use.
277 : */
278 389 : if (strlen(roname) > MAX_RONAME_LEN)
279 3 : ereport(ERROR,
280 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
281 : errmsg("replication origin name is too long"),
282 : errdetail("Replication origin names must be no longer than %d bytes.",
283 : MAX_RONAME_LEN)));
284 :
285 386 : roname_d = CStringGetTextDatum(roname);
286 :
287 : Assert(IsTransactionState());
288 :
289 : /*
290 : * We need the numeric replication origin to be 16bit wide, so we cannot
291 : * rely on the normal oid allocation. Instead we simply scan
292 : * pg_replication_origin for the first unused id. That's not particularly
293 : * efficient, but this should be a fairly infrequent operation - we can
294 : * easily spend a bit more code on this when it turns out it needs to be
295 : * faster.
296 : *
297 : * We handle concurrency by taking an exclusive lock (allowing reads!)
298 : * over the table for the duration of the search. Because we use a "dirty
299 : * snapshot" we can read rows that other in-progress sessions have
300 : * written, even though they would be invisible with normal snapshots. Due
301 : * to the exclusive lock there's no danger that new rows can appear while
302 : * we're checking.
303 : */
304 386 : InitDirtySnapshot(SnapshotDirty);
305 :
306 386 : rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
307 :
308 : /*
309 : * We want to be able to access pg_replication_origin without setting up a
310 : * snapshot. To make that safe, it needs to not have a TOAST table, since
311 : * TOASTed data cannot be fetched without a snapshot. As of this writing,
312 : * its only varlena column is roname, which we limit to 512 bytes to avoid
313 : * needing out-of-line storage. If you add a TOAST table to this catalog,
314 : * be sure to set up a snapshot everywhere it might be needed. For more
315 : * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
316 : */
317 : Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
318 :
319 696 : for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
320 : {
321 : bool nulls[Natts_pg_replication_origin];
322 : Datum values[Natts_pg_replication_origin];
323 : bool collides;
324 :
325 696 : CHECK_FOR_INTERRUPTS();
326 :
327 696 : ScanKeyInit(&key,
328 : Anum_pg_replication_origin_roident,
329 : BTEqualStrategyNumber, F_OIDEQ,
330 : ObjectIdGetDatum(roident));
331 :
332 696 : scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
333 : true /* indexOK */ ,
334 : &SnapshotDirty,
335 : 1, &key);
336 :
337 696 : collides = HeapTupleIsValid(systable_getnext(scan));
338 :
339 696 : systable_endscan(scan);
340 :
341 696 : if (!collides)
342 : {
343 : /*
344 : * Ok, found an unused roident, insert the new row and do a CCI,
345 : * so our callers can look it up if they want to.
346 : */
347 386 : memset(&nulls, 0, sizeof(nulls));
348 :
349 386 : values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
350 386 : values[Anum_pg_replication_origin_roname - 1] = roname_d;
351 :
352 386 : tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
353 386 : CatalogTupleInsert(rel, tuple);
354 385 : CommandCounterIncrement();
355 385 : break;
356 : }
357 : }
358 :
359 : /* now release lock again, */
360 385 : table_close(rel, ExclusiveLock);
361 :
362 385 : if (tuple == NULL)
363 0 : ereport(ERROR,
364 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
365 : errmsg("could not find free replication origin ID")));
366 :
367 385 : heap_freetuple(tuple);
368 385 : return roident;
369 : }
370 :
371 : /*
372 : * Helper function to drop a replication origin.
373 : */
374 : static void
375 333 : replorigin_state_clear(ReplOriginId roident, bool nowait)
376 : {
377 : int i;
378 :
379 : /*
380 : * Clean up the slot state info, if there is any matching slot.
381 : */
382 333 : restart:
383 333 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
384 :
385 1129 : for (i = 0; i < max_active_replication_origins; i++)
386 : {
387 1076 : ReplicationState *state = &replication_states[i];
388 :
389 1076 : if (state->roident == roident)
390 : {
391 : /* found our slot, is it busy? */
392 280 : if (state->refcount > 0)
393 : {
394 : ConditionVariable *cv;
395 :
396 0 : if (nowait)
397 0 : ereport(ERROR,
398 : (errcode(ERRCODE_OBJECT_IN_USE),
399 : (state->acquired_by != 0)
400 : ? errmsg("could not drop replication origin with ID %d, in use by PID %d",
401 : state->roident,
402 : state->acquired_by)
403 : : errmsg("could not drop replication origin with ID %d, in use by another process",
404 : state->roident)));
405 :
406 : /*
407 : * We must wait and then retry. Since we don't know which CV
408 : * to wait on until here, we can't readily use
409 : * ConditionVariablePrepareToSleep (calling it here would be
410 : * wrong, since we could miss the signal if we did so); just
411 : * use ConditionVariableSleep directly.
412 : */
413 0 : cv = &state->origin_cv;
414 :
415 0 : LWLockRelease(ReplicationOriginLock);
416 :
417 0 : ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
418 0 : goto restart;
419 : }
420 :
421 : /* first make a WAL log entry */
422 : {
423 : xl_replorigin_drop xlrec;
424 :
425 280 : xlrec.node_id = roident;
426 280 : XLogBeginInsert();
427 280 : XLogRegisterData(&xlrec, sizeof(xlrec));
428 280 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
429 : }
430 :
431 : /* then clear the in-memory slot */
432 280 : state->roident = InvalidReplOriginId;
433 280 : state->remote_lsn = InvalidXLogRecPtr;
434 280 : state->local_lsn = InvalidXLogRecPtr;
435 280 : break;
436 : }
437 : }
438 333 : LWLockRelease(ReplicationOriginLock);
439 333 : ConditionVariableCancelSleep();
440 333 : }
441 :
442 : /*
443 : * Drop replication origin (by name).
444 : *
445 : * Needs to be called in a transaction.
446 : */
447 : void
448 528 : replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
449 : {
450 : ReplOriginId roident;
451 : Relation rel;
452 : HeapTuple tuple;
453 :
454 : Assert(IsTransactionState());
455 :
456 528 : rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
457 :
458 528 : roident = replorigin_by_name(name, missing_ok);
459 :
460 : /* Lock the origin to prevent concurrent drops. */
461 527 : LockSharedObject(ReplicationOriginRelationId, roident, 0,
462 : AccessExclusiveLock);
463 :
464 527 : tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
465 527 : if (!HeapTupleIsValid(tuple))
466 : {
467 194 : if (!missing_ok)
468 0 : elog(ERROR, "cache lookup failed for replication origin with ID %d",
469 : roident);
470 :
471 : /*
472 : * We don't need to retain the locks if the origin is already dropped.
473 : */
474 194 : UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
475 : AccessExclusiveLock);
476 194 : table_close(rel, RowExclusiveLock);
477 194 : return;
478 : }
479 :
480 333 : replorigin_state_clear(roident, nowait);
481 :
482 : /*
483 : * Now, we can delete the catalog entry.
484 : */
485 333 : CatalogTupleDelete(rel, &tuple->t_self);
486 333 : ReleaseSysCache(tuple);
487 :
488 333 : CommandCounterIncrement();
489 :
490 : /* We keep the lock on pg_replication_origin until commit */
491 333 : table_close(rel, NoLock);
492 : }
493 :
494 : /*
495 : * Lookup replication origin via its oid and return the name.
496 : *
497 : * The external name is palloc'd in the calling context.
498 : *
499 : * Returns true if the origin is known, false otherwise.
500 : */
501 : bool
502 28 : replorigin_by_oid(ReplOriginId roident, bool missing_ok, char **roname)
503 : {
504 : HeapTuple tuple;
505 : Form_pg_replication_origin ric;
506 :
507 : Assert(OidIsValid((Oid) roident));
508 : Assert(roident != InvalidReplOriginId);
509 : Assert(roident != DoNotReplicateId);
510 :
511 28 : tuple = SearchSysCache1(REPLORIGIDENT,
512 : ObjectIdGetDatum((Oid) roident));
513 :
514 28 : if (HeapTupleIsValid(tuple))
515 : {
516 25 : ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
517 25 : *roname = text_to_cstring(&ric->roname);
518 25 : ReleaseSysCache(tuple);
519 :
520 25 : return true;
521 : }
522 : else
523 : {
524 3 : *roname = NULL;
525 :
526 3 : if (!missing_ok)
527 0 : ereport(ERROR,
528 : (errcode(ERRCODE_UNDEFINED_OBJECT),
529 : errmsg("replication origin with ID %d does not exist",
530 : roident)));
531 :
532 3 : return false;
533 : }
534 : }
535 :
536 :
537 : /* ---------------------------------------------------------------------------
538 : * Functions for handling replication progress.
539 : * ---------------------------------------------------------------------------
540 : */
541 :
542 : Size
543 4469 : ReplicationOriginShmemSize(void)
544 : {
545 4469 : Size size = 0;
546 :
547 4469 : if (max_active_replication_origins == 0)
548 2 : return size;
549 :
550 4467 : size = add_size(size, offsetof(ReplicationStateCtl, states));
551 :
552 4467 : size = add_size(size,
553 : mul_size(max_active_replication_origins, sizeof(ReplicationState)));
554 4467 : return size;
555 : }
556 :
557 : void
558 1156 : ReplicationOriginShmemInit(void)
559 : {
560 : bool found;
561 :
562 1156 : if (max_active_replication_origins == 0)
563 1 : return;
564 :
565 1155 : replication_states_ctl = (ReplicationStateCtl *)
566 1155 : ShmemInitStruct("ReplicationOriginState",
567 : ReplicationOriginShmemSize(),
568 : &found);
569 1155 : replication_states = replication_states_ctl->states;
570 :
571 1155 : if (!found)
572 : {
573 : int i;
574 :
575 94638 : MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
576 :
577 1155 : replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
578 :
579 12696 : for (i = 0; i < max_active_replication_origins; i++)
580 : {
581 11541 : LWLockInitialize(&replication_states[i].lock,
582 11541 : replication_states_ctl->tranche_id);
583 11541 : ConditionVariableInit(&replication_states[i].origin_cv);
584 : }
585 : }
586 : }
587 :
588 : /* ---------------------------------------------------------------------------
589 : * Perform a checkpoint of each replication origin's progress with respect to
590 : * the replayed remote_lsn. Make sure that all transactions we refer to in the
591 : * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
592 : * if the transactions were originally committed asynchronously.
593 : *
594 : * We store checkpoints in the following format:
595 : * +-------+------------------------+------------------+-----+--------+
596 : * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
597 : * +-------+------------------------+------------------+-----+--------+
598 : *
599 : * So its just the magic, followed by the statically sized
600 : * ReplicationStateOnDisk structs. Note that the maximum number of
601 : * ReplicationState is determined by max_active_replication_origins.
602 : * ---------------------------------------------------------------------------
603 : */
604 : void
605 1808 : CheckPointReplicationOrigin(void)
606 : {
607 1808 : const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
608 1808 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
609 : int tmpfd;
610 : int i;
611 1808 : uint32 magic = REPLICATION_STATE_MAGIC;
612 : pg_crc32c crc;
613 :
614 1808 : if (max_active_replication_origins == 0)
615 1 : return;
616 :
617 1807 : INIT_CRC32C(crc);
618 :
619 : /* make sure no old temp file is remaining */
620 1807 : if (unlink(tmppath) < 0 && errno != ENOENT)
621 0 : ereport(PANIC,
622 : (errcode_for_file_access(),
623 : errmsg("could not remove file \"%s\": %m",
624 : tmppath)));
625 :
626 : /*
627 : * no other backend can perform this at the same time; only one checkpoint
628 : * can happen at a time.
629 : */
630 1807 : tmpfd = OpenTransientFile(tmppath,
631 : O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
632 1807 : if (tmpfd < 0)
633 0 : ereport(PANIC,
634 : (errcode_for_file_access(),
635 : errmsg("could not create file \"%s\": %m",
636 : tmppath)));
637 :
638 : /* write magic */
639 1807 : errno = 0;
640 1807 : if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
641 : {
642 : /* if write didn't set errno, assume problem is no disk space */
643 0 : if (errno == 0)
644 0 : errno = ENOSPC;
645 0 : ereport(PANIC,
646 : (errcode_for_file_access(),
647 : errmsg("could not write to file \"%s\": %m",
648 : tmppath)));
649 : }
650 1807 : COMP_CRC32C(crc, &magic, sizeof(magic));
651 :
652 : /* prevent concurrent creations/drops */
653 1807 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
654 :
655 : /* write actual data */
656 19877 : for (i = 0; i < max_active_replication_origins; i++)
657 : {
658 : ReplicationStateOnDisk disk_state;
659 18070 : ReplicationState *curstate = &replication_states[i];
660 : XLogRecPtr local_lsn;
661 :
662 18070 : if (curstate->roident == InvalidReplOriginId)
663 18017 : continue;
664 :
665 : /* zero, to avoid uninitialized padding bytes */
666 53 : memset(&disk_state, 0, sizeof(disk_state));
667 :
668 53 : LWLockAcquire(&curstate->lock, LW_SHARED);
669 :
670 53 : disk_state.roident = curstate->roident;
671 :
672 53 : disk_state.remote_lsn = curstate->remote_lsn;
673 53 : local_lsn = curstate->local_lsn;
674 :
675 53 : LWLockRelease(&curstate->lock);
676 :
677 : /* make sure we only write out a commit that's persistent */
678 53 : XLogFlush(local_lsn);
679 :
680 53 : errno = 0;
681 53 : if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
682 : sizeof(disk_state))
683 : {
684 : /* if write didn't set errno, assume problem is no disk space */
685 0 : if (errno == 0)
686 0 : errno = ENOSPC;
687 0 : ereport(PANIC,
688 : (errcode_for_file_access(),
689 : errmsg("could not write to file \"%s\": %m",
690 : tmppath)));
691 : }
692 :
693 53 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
694 : }
695 :
696 1807 : LWLockRelease(ReplicationOriginLock);
697 :
698 : /* write out the CRC */
699 1807 : FIN_CRC32C(crc);
700 1807 : errno = 0;
701 1807 : if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
702 : {
703 : /* if write didn't set errno, assume problem is no disk space */
704 0 : if (errno == 0)
705 0 : errno = ENOSPC;
706 0 : ereport(PANIC,
707 : (errcode_for_file_access(),
708 : errmsg("could not write to file \"%s\": %m",
709 : tmppath)));
710 : }
711 :
712 1807 : if (CloseTransientFile(tmpfd) != 0)
713 0 : ereport(PANIC,
714 : (errcode_for_file_access(),
715 : errmsg("could not close file \"%s\": %m",
716 : tmppath)));
717 :
718 : /* fsync, rename to permanent file, fsync file and directory */
719 1807 : durable_rename(tmppath, path, PANIC);
720 : }
721 :
722 : /*
723 : * Recover replication replay status from checkpoint data saved earlier by
724 : * CheckPointReplicationOrigin.
725 : *
726 : * This only needs to be called at startup and *not* during every checkpoint
727 : * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
728 : * state thereafter can be recovered by looking at commit records.
729 : */
730 : void
731 1008 : StartupReplicationOrigin(void)
732 : {
733 1008 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
734 : int fd;
735 : int readBytes;
736 1008 : uint32 magic = REPLICATION_STATE_MAGIC;
737 1008 : int last_state = 0;
738 : pg_crc32c file_crc;
739 : pg_crc32c crc;
740 :
741 : /* don't want to overwrite already existing state */
742 : #ifdef USE_ASSERT_CHECKING
743 : static bool already_started = false;
744 :
745 : Assert(!already_started);
746 : already_started = true;
747 : #endif
748 :
749 1008 : if (max_active_replication_origins == 0)
750 52 : return;
751 :
752 1007 : INIT_CRC32C(crc);
753 :
754 1007 : elog(DEBUG2, "starting up replication origin progress state");
755 :
756 1007 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
757 :
758 : /*
759 : * might have had max_active_replication_origins == 0 last run, or we just
760 : * brought up a standby.
761 : */
762 1007 : if (fd < 0 && errno == ENOENT)
763 51 : return;
764 956 : else if (fd < 0)
765 0 : ereport(PANIC,
766 : (errcode_for_file_access(),
767 : errmsg("could not open file \"%s\": %m",
768 : path)));
769 :
770 : /* verify magic, that is written even if nothing was active */
771 956 : readBytes = read(fd, &magic, sizeof(magic));
772 956 : if (readBytes != sizeof(magic))
773 : {
774 0 : if (readBytes < 0)
775 0 : ereport(PANIC,
776 : (errcode_for_file_access(),
777 : errmsg("could not read file \"%s\": %m",
778 : path)));
779 : else
780 0 : ereport(PANIC,
781 : (errcode(ERRCODE_DATA_CORRUPTED),
782 : errmsg("could not read file \"%s\": read %d of %zu",
783 : path, readBytes, sizeof(magic))));
784 : }
785 956 : COMP_CRC32C(crc, &magic, sizeof(magic));
786 :
787 956 : if (magic != REPLICATION_STATE_MAGIC)
788 0 : ereport(PANIC,
789 : (errmsg("replication checkpoint has wrong magic %u instead of %u",
790 : magic, REPLICATION_STATE_MAGIC)));
791 :
792 : /* we can skip locking here, no other access is possible */
793 :
794 : /* recover individual states, until there are no more to be found */
795 : while (true)
796 31 : {
797 : ReplicationStateOnDisk disk_state;
798 :
799 987 : readBytes = read(fd, &disk_state, sizeof(disk_state));
800 :
801 987 : if (readBytes < 0)
802 : {
803 0 : ereport(PANIC,
804 : (errcode_for_file_access(),
805 : errmsg("could not read file \"%s\": %m",
806 : path)));
807 : }
808 :
809 : /* no further data */
810 987 : if (readBytes == sizeof(crc))
811 : {
812 956 : memcpy(&file_crc, &disk_state, sizeof(file_crc));
813 956 : break;
814 : }
815 :
816 31 : if (readBytes != sizeof(disk_state))
817 : {
818 0 : ereport(PANIC,
819 : (errcode_for_file_access(),
820 : errmsg("could not read file \"%s\": read %d of %zu",
821 : path, readBytes, sizeof(disk_state))));
822 : }
823 :
824 31 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
825 :
826 31 : if (last_state == max_active_replication_origins)
827 0 : ereport(PANIC,
828 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
829 : errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
830 :
831 : /* copy data to shared memory */
832 31 : replication_states[last_state].roident = disk_state.roident;
833 31 : replication_states[last_state].remote_lsn = disk_state.remote_lsn;
834 31 : last_state++;
835 :
836 31 : ereport(LOG,
837 : errmsg("recovered replication state of node %d to %X/%08X",
838 : disk_state.roident,
839 : LSN_FORMAT_ARGS(disk_state.remote_lsn)));
840 : }
841 :
842 : /* now check checksum */
843 956 : FIN_CRC32C(crc);
844 956 : if (file_crc != crc)
845 0 : ereport(PANIC,
846 : (errcode(ERRCODE_DATA_CORRUPTED),
847 : errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
848 : crc, file_crc)));
849 :
850 956 : if (CloseTransientFile(fd) != 0)
851 0 : ereport(PANIC,
852 : (errcode_for_file_access(),
853 : errmsg("could not close file \"%s\": %m",
854 : path)));
855 : }
856 :
857 : void
858 4 : replorigin_redo(XLogReaderState *record)
859 : {
860 4 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
861 :
862 4 : switch (info)
863 : {
864 2 : case XLOG_REPLORIGIN_SET:
865 : {
866 2 : xl_replorigin_set *xlrec =
867 2 : (xl_replorigin_set *) XLogRecGetData(record);
868 :
869 2 : replorigin_advance(xlrec->node_id,
870 : xlrec->remote_lsn, record->EndRecPtr,
871 2 : xlrec->force /* backward */ ,
872 : false /* WAL log */ );
873 2 : break;
874 : }
875 2 : case XLOG_REPLORIGIN_DROP:
876 : {
877 : xl_replorigin_drop *xlrec;
878 : int i;
879 :
880 2 : xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
881 :
882 3 : for (i = 0; i < max_active_replication_origins; i++)
883 : {
884 3 : ReplicationState *state = &replication_states[i];
885 :
886 : /* found our slot */
887 3 : if (state->roident == xlrec->node_id)
888 : {
889 : /* reset entry */
890 2 : state->roident = InvalidReplOriginId;
891 2 : state->remote_lsn = InvalidXLogRecPtr;
892 2 : state->local_lsn = InvalidXLogRecPtr;
893 2 : break;
894 : }
895 : }
896 2 : break;
897 : }
898 0 : default:
899 0 : elog(PANIC, "replorigin_redo: unknown op code %u", info);
900 : }
901 4 : }
902 :
903 :
904 : /*
905 : * Tell the replication origin progress machinery that a commit from 'node'
906 : * that originated at the LSN remote_commit on the remote node was replayed
907 : * successfully and that we don't need to do so again. In combination with
908 : * setting up replorigin_xact_state {.origin_lsn, .origin_timestamp}
909 : * that ensures we won't lose knowledge about that after a crash if the
910 : * transaction had a persistent effect (think of asynchronous commits).
911 : *
912 : * local_commit needs to be a local LSN of the commit so that we can make sure
913 : * upon a checkpoint that enough WAL has been persisted to disk.
914 : *
915 : * Needs to be called with a RowExclusiveLock on pg_replication_origin,
916 : * unless running in recovery.
917 : */
918 : void
919 253 : replorigin_advance(ReplOriginId node,
920 : XLogRecPtr remote_commit, XLogRecPtr local_commit,
921 : bool go_backward, bool wal_log)
922 : {
923 : int i;
924 253 : ReplicationState *replication_state = NULL;
925 253 : ReplicationState *free_state = NULL;
926 :
927 : Assert(node != InvalidReplOriginId);
928 :
929 : /* we don't track DoNotReplicateId */
930 253 : if (node == DoNotReplicateId)
931 0 : return;
932 :
933 : /*
934 : * XXX: For the case where this is called by WAL replay, it'd be more
935 : * efficient to restore into a backend local hashtable and only dump into
936 : * shmem after recovery is finished. Let's wait with implementing that
937 : * till it's shown to be a measurable expense
938 : */
939 :
940 : /* Lock exclusively, as we may have to create a new table entry. */
941 253 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
942 :
943 : /*
944 : * Search for either an existing slot for the origin, or a free one we can
945 : * use.
946 : */
947 2326 : for (i = 0; i < max_active_replication_origins; i++)
948 : {
949 2121 : ReplicationState *curstate = &replication_states[i];
950 :
951 : /* remember where to insert if necessary */
952 2121 : if (curstate->roident == InvalidReplOriginId &&
953 : free_state == NULL)
954 : {
955 213 : free_state = curstate;
956 213 : continue;
957 : }
958 :
959 : /* not our slot */
960 1908 : if (curstate->roident != node)
961 : {
962 1860 : continue;
963 : }
964 :
965 : /* ok, found slot */
966 48 : replication_state = curstate;
967 :
968 48 : LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
969 :
970 : /* Make sure it's not used by somebody else */
971 48 : if (replication_state->refcount > 0)
972 : {
973 0 : ereport(ERROR,
974 : (errcode(ERRCODE_OBJECT_IN_USE),
975 : (replication_state->acquired_by != 0)
976 : ? errmsg("replication origin with ID %d is already active for PID %d",
977 : replication_state->roident,
978 : replication_state->acquired_by)
979 : : errmsg("replication origin with ID %d is already active in another process",
980 : replication_state->roident)));
981 : }
982 :
983 48 : break;
984 : }
985 :
986 253 : if (replication_state == NULL && free_state == NULL)
987 0 : ereport(ERROR,
988 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
989 : errmsg("could not find free replication state slot for replication origin with ID %d",
990 : node),
991 : errhint("Increase \"max_active_replication_origins\" and try again.")));
992 :
993 253 : if (replication_state == NULL)
994 : {
995 : /* initialize new slot */
996 205 : LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
997 205 : replication_state = free_state;
998 : Assert(!XLogRecPtrIsValid(replication_state->remote_lsn));
999 : Assert(!XLogRecPtrIsValid(replication_state->local_lsn));
1000 205 : replication_state->roident = node;
1001 : }
1002 :
1003 : Assert(replication_state->roident != InvalidReplOriginId);
1004 :
1005 : /*
1006 : * If somebody "forcefully" sets this slot, WAL log it, so it's durable
1007 : * and the standby gets the message. Primarily this will be called during
1008 : * WAL replay (of commit records) where no WAL logging is necessary.
1009 : */
1010 253 : if (wal_log)
1011 : {
1012 : xl_replorigin_set xlrec;
1013 :
1014 212 : xlrec.remote_lsn = remote_commit;
1015 212 : xlrec.node_id = node;
1016 212 : xlrec.force = go_backward;
1017 :
1018 212 : XLogBeginInsert();
1019 212 : XLogRegisterData(&xlrec, sizeof(xlrec));
1020 :
1021 212 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
1022 : }
1023 :
1024 : /*
1025 : * Due to - harmless - race conditions during a checkpoint we could see
1026 : * values here that are older than the ones we already have in memory. We
1027 : * could also see older values for prepared transactions when the prepare
1028 : * is sent at a later point of time along with commit prepared and there
1029 : * are other transactions commits between prepare and commit prepared. See
1030 : * ReorderBufferFinishPrepared. Don't overwrite those.
1031 : */
1032 253 : if (go_backward || replication_state->remote_lsn < remote_commit)
1033 246 : replication_state->remote_lsn = remote_commit;
1034 253 : if (XLogRecPtrIsValid(local_commit) &&
1035 38 : (go_backward || replication_state->local_lsn < local_commit))
1036 40 : replication_state->local_lsn = local_commit;
1037 253 : LWLockRelease(&replication_state->lock);
1038 :
1039 : /*
1040 : * Release *after* changing the LSNs, slot isn't acquired and thus could
1041 : * otherwise be dropped anytime.
1042 : */
1043 253 : LWLockRelease(ReplicationOriginLock);
1044 : }
1045 :
1046 :
1047 : XLogRecPtr
1048 8 : replorigin_get_progress(ReplOriginId node, bool flush)
1049 : {
1050 : int i;
1051 8 : XLogRecPtr local_lsn = InvalidXLogRecPtr;
1052 8 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1053 :
1054 : /* prevent slots from being concurrently dropped */
1055 8 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1056 :
1057 38 : for (i = 0; i < max_active_replication_origins; i++)
1058 : {
1059 : ReplicationState *state;
1060 :
1061 35 : state = &replication_states[i];
1062 :
1063 35 : if (state->roident == node)
1064 : {
1065 5 : LWLockAcquire(&state->lock, LW_SHARED);
1066 :
1067 5 : remote_lsn = state->remote_lsn;
1068 5 : local_lsn = state->local_lsn;
1069 :
1070 5 : LWLockRelease(&state->lock);
1071 :
1072 5 : break;
1073 : }
1074 : }
1075 :
1076 8 : LWLockRelease(ReplicationOriginLock);
1077 :
1078 8 : if (flush && XLogRecPtrIsValid(local_lsn))
1079 1 : XLogFlush(local_lsn);
1080 :
1081 8 : return remote_lsn;
1082 : }
1083 :
1084 : /* Helper function to reset the session replication origin */
1085 : static void
1086 519 : replorigin_session_reset_internal(void)
1087 : {
1088 : ConditionVariable *cv;
1089 :
1090 : Assert(session_replication_state != NULL);
1091 :
1092 519 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1093 :
1094 : /* The origin must be held by at least one process at this point. */
1095 : Assert(session_replication_state->refcount > 0);
1096 :
1097 : /*
1098 : * Reset the PID only if the current session is the first to set up this
1099 : * origin. This avoids clearing the first process's PID when any other
1100 : * session releases the origin.
1101 : */
1102 519 : if (session_replication_state->acquired_by == MyProcPid)
1103 505 : session_replication_state->acquired_by = 0;
1104 :
1105 519 : session_replication_state->refcount--;
1106 :
1107 519 : cv = &session_replication_state->origin_cv;
1108 519 : session_replication_state = NULL;
1109 :
1110 519 : LWLockRelease(ReplicationOriginLock);
1111 :
1112 519 : ConditionVariableBroadcast(cv);
1113 519 : }
1114 :
1115 : /*
1116 : * Tear down a (possibly) configured session replication origin during process
1117 : * exit.
1118 : */
1119 : static void
1120 516 : ReplicationOriginExitCleanup(int code, Datum arg)
1121 : {
1122 516 : if (session_replication_state == NULL)
1123 200 : return;
1124 :
1125 316 : replorigin_session_reset_internal();
1126 : }
1127 :
1128 : /*
1129 : * Setup a replication origin in the shared memory struct if it doesn't
1130 : * already exist and cache access to the specific ReplicationSlot so the
1131 : * array doesn't have to be searched when calling
1132 : * replorigin_session_advance().
1133 : *
1134 : * Normally only one such cached origin can exist per process so the cached
1135 : * value can only be set again after the previous value is torn down with
1136 : * replorigin_session_reset(). For this normal case pass acquired_by = 0
1137 : * (meaning the slot is not allowed to be already acquired by another process).
1138 : *
1139 : * However, sometimes multiple processes can safely re-use the same origin slot
1140 : * (for example, multiple parallel apply processes can safely use the same
1141 : * origin, provided they maintain commit order by allowing only one process to
1142 : * commit at a time). For this case the first process must pass acquired_by =
1143 : * 0, and then the other processes sharing that same origin can pass
1144 : * acquired_by = PID of the first process.
1145 : */
1146 : void
1147 522 : replorigin_session_setup(ReplOriginId node, int acquired_by)
1148 : {
1149 : static bool registered_cleanup;
1150 : int i;
1151 522 : int free_slot = -1;
1152 :
1153 522 : if (!registered_cleanup)
1154 : {
1155 516 : on_shmem_exit(ReplicationOriginExitCleanup, 0);
1156 516 : registered_cleanup = true;
1157 : }
1158 :
1159 : Assert(max_active_replication_origins > 0);
1160 :
1161 522 : if (session_replication_state != NULL)
1162 1 : ereport(ERROR,
1163 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1164 : errmsg("cannot setup replication origin when one is already setup")));
1165 :
1166 : /* Lock exclusively, as we may have to create a new table entry. */
1167 521 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1168 :
1169 : /*
1170 : * Search for either an existing slot for the origin, or a free one we can
1171 : * use.
1172 : */
1173 2074 : for (i = 0; i < max_active_replication_origins; i++)
1174 : {
1175 1949 : ReplicationState *curstate = &replication_states[i];
1176 :
1177 : /* remember where to insert if necessary */
1178 1949 : if (curstate->roident == InvalidReplOriginId &&
1179 : free_slot == -1)
1180 : {
1181 128 : free_slot = i;
1182 128 : continue;
1183 : }
1184 :
1185 : /* not our slot */
1186 1821 : if (curstate->roident != node)
1187 1425 : continue;
1188 :
1189 396 : else if (curstate->acquired_by != 0 && acquired_by == 0)
1190 : {
1191 1 : ereport(ERROR,
1192 : (errcode(ERRCODE_OBJECT_IN_USE),
1193 : errmsg("replication origin with ID %d is already active for PID %d",
1194 : curstate->roident, curstate->acquired_by)));
1195 : }
1196 :
1197 395 : else if (curstate->acquired_by != acquired_by)
1198 : {
1199 0 : ereport(ERROR,
1200 : (errcode(ERRCODE_OBJECT_IN_USE),
1201 : errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
1202 : node, acquired_by)));
1203 : }
1204 :
1205 : /*
1206 : * The origin is in use, but PID is not recorded. This can happen if
1207 : * the process that originally acquired the origin exited without
1208 : * releasing it. To ensure correctness, other processes cannot acquire
1209 : * the origin until all processes currently using it have released it.
1210 : */
1211 395 : else if (curstate->acquired_by == 0 && curstate->refcount > 0)
1212 0 : ereport(ERROR,
1213 : (errcode(ERRCODE_OBJECT_IN_USE),
1214 : errmsg("replication origin with ID %d is already active in another process",
1215 : curstate->roident)));
1216 :
1217 : /* ok, found slot */
1218 395 : session_replication_state = curstate;
1219 395 : break;
1220 : }
1221 :
1222 :
1223 520 : if (session_replication_state == NULL && free_slot == -1)
1224 0 : ereport(ERROR,
1225 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1226 : errmsg("could not find free replication state slot for replication origin with ID %d",
1227 : node),
1228 : errhint("Increase \"max_active_replication_origins\" and try again.")));
1229 520 : else if (session_replication_state == NULL)
1230 : {
1231 125 : if (acquired_by)
1232 1 : ereport(ERROR,
1233 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1234 : errmsg("cannot use PID %d for inactive replication origin with ID %d",
1235 : acquired_by, node)));
1236 :
1237 : /* initialize new slot */
1238 124 : session_replication_state = &replication_states[free_slot];
1239 : Assert(!XLogRecPtrIsValid(session_replication_state->remote_lsn));
1240 : Assert(!XLogRecPtrIsValid(session_replication_state->local_lsn));
1241 124 : session_replication_state->roident = node;
1242 : }
1243 :
1244 :
1245 : Assert(session_replication_state->roident != InvalidReplOriginId);
1246 :
1247 519 : if (acquired_by == 0)
1248 : {
1249 505 : session_replication_state->acquired_by = MyProcPid;
1250 : Assert(session_replication_state->refcount == 0);
1251 : }
1252 : else
1253 : {
1254 : /*
1255 : * Sanity check: the origin must already be acquired by the process
1256 : * passed as input, and at least one process must be using it.
1257 : */
1258 : Assert(session_replication_state->acquired_by == acquired_by);
1259 : Assert(session_replication_state->refcount > 0);
1260 : }
1261 :
1262 519 : session_replication_state->refcount++;
1263 :
1264 519 : LWLockRelease(ReplicationOriginLock);
1265 :
1266 : /* probably this one is pointless */
1267 519 : ConditionVariableBroadcast(&session_replication_state->origin_cv);
1268 519 : }
1269 :
1270 : /*
1271 : * Reset replay state previously setup in this session.
1272 : *
1273 : * This function may only be called if an origin was setup with
1274 : * replorigin_session_setup().
1275 : */
1276 : void
1277 205 : replorigin_session_reset(void)
1278 : {
1279 : Assert(max_active_replication_origins != 0);
1280 :
1281 205 : if (session_replication_state == NULL)
1282 1 : ereport(ERROR,
1283 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1284 : errmsg("no replication origin is configured")));
1285 :
1286 : /*
1287 : * Restrict explicit resetting of the replication origin if it was first
1288 : * acquired by this process and others are still using it. While the
1289 : * system handles this safely (as happens if the first session exits
1290 : * without calling reset), it is best to avoid doing so.
1291 : */
1292 204 : if (session_replication_state->acquired_by == MyProcPid &&
1293 202 : session_replication_state->refcount > 1)
1294 1 : ereport(ERROR,
1295 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1296 : errmsg("cannot reset replication origin with ID %d because it is still in use by other processes",
1297 : session_replication_state->roident),
1298 : errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."),
1299 : errhint("Reset the replication origin in all other processes before retrying.")));
1300 :
1301 203 : replorigin_session_reset_internal();
1302 203 : }
1303 :
1304 : /*
1305 : * Do the same work replorigin_advance() does, just on the session's
1306 : * configured origin.
1307 : *
1308 : * This is noticeably cheaper than using replorigin_advance().
1309 : */
1310 : void
1311 1131 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
1312 : {
1313 : Assert(session_replication_state != NULL);
1314 : Assert(session_replication_state->roident != InvalidReplOriginId);
1315 :
1316 1131 : LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1317 1131 : if (session_replication_state->local_lsn < local_commit)
1318 1131 : session_replication_state->local_lsn = local_commit;
1319 1131 : if (session_replication_state->remote_lsn < remote_commit)
1320 524 : session_replication_state->remote_lsn = remote_commit;
1321 1131 : LWLockRelease(&session_replication_state->lock);
1322 1131 : }
1323 :
1324 : /*
1325 : * Ask the machinery about the point up to which we successfully replayed
1326 : * changes from an already setup replication origin.
1327 : */
1328 : XLogRecPtr
1329 291 : replorigin_session_get_progress(bool flush)
1330 : {
1331 : XLogRecPtr remote_lsn;
1332 : XLogRecPtr local_lsn;
1333 :
1334 : Assert(session_replication_state != NULL);
1335 :
1336 291 : LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1337 291 : remote_lsn = session_replication_state->remote_lsn;
1338 291 : local_lsn = session_replication_state->local_lsn;
1339 291 : LWLockRelease(&session_replication_state->lock);
1340 :
1341 291 : if (flush && XLogRecPtrIsValid(local_lsn))
1342 1 : XLogFlush(local_lsn);
1343 :
1344 291 : return remote_lsn;
1345 : }
1346 :
1347 : /*
1348 : * Clear the per-transaction replication origin state.
1349 : *
1350 : * replorigin_session_origin is also cleared if clear_origin is set.
1351 : */
1352 : void
1353 817 : replorigin_xact_clear(bool clear_origin)
1354 : {
1355 817 : replorigin_xact_state.origin_lsn = InvalidXLogRecPtr;
1356 817 : replorigin_xact_state.origin_timestamp = 0;
1357 817 : if (clear_origin)
1358 817 : replorigin_xact_state.origin = InvalidReplOriginId;
1359 817 : }
1360 :
1361 :
1362 : /* ---------------------------------------------------------------------------
1363 : * SQL functions for working with replication origin.
1364 : *
1365 : * These mostly should be fairly short wrappers around more generic functions.
1366 : * ---------------------------------------------------------------------------
1367 : */
1368 :
1369 : /*
1370 : * Create replication origin for the passed in name, and return the assigned
1371 : * oid.
1372 : */
1373 : Datum
1374 14 : pg_replication_origin_create(PG_FUNCTION_ARGS)
1375 : {
1376 : char *name;
1377 : ReplOriginId roident;
1378 :
1379 14 : replorigin_check_prerequisites(false, false);
1380 :
1381 14 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1382 :
1383 : /*
1384 : * Replication origins "any and "none" are reserved for system options.
1385 : * The origins "pg_xxx" are reserved for internal use.
1386 : */
1387 14 : if (IsReservedName(name) || IsReservedOriginName(name))
1388 3 : ereport(ERROR,
1389 : (errcode(ERRCODE_RESERVED_NAME),
1390 : errmsg("replication origin name \"%s\" is reserved",
1391 : name),
1392 : errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1393 : LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
1394 :
1395 : /*
1396 : * If built with appropriate switch, whine when regression-testing
1397 : * conventions for replication origin names are violated.
1398 : */
1399 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1400 : if (strncmp(name, "regress_", 8) != 0)
1401 : elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1402 : #endif
1403 :
1404 11 : roident = replorigin_create(name);
1405 :
1406 7 : pfree(name);
1407 :
1408 7 : PG_RETURN_OID(roident);
1409 : }
1410 :
1411 : /*
1412 : * Drop replication origin.
1413 : */
1414 : Datum
1415 9 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
1416 : {
1417 : char *name;
1418 :
1419 9 : replorigin_check_prerequisites(false, false);
1420 :
1421 9 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1422 :
1423 9 : replorigin_drop_by_name(name, false, true);
1424 :
1425 8 : pfree(name);
1426 :
1427 8 : PG_RETURN_VOID();
1428 : }
1429 :
1430 : /*
1431 : * Return oid of a replication origin.
1432 : */
1433 : Datum
1434 0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
1435 : {
1436 : char *name;
1437 : ReplOriginId roident;
1438 :
1439 0 : replorigin_check_prerequisites(false, false);
1440 :
1441 0 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1442 0 : roident = replorigin_by_name(name, true);
1443 :
1444 0 : pfree(name);
1445 :
1446 0 : if (OidIsValid(roident))
1447 0 : PG_RETURN_OID(roident);
1448 0 : PG_RETURN_NULL();
1449 : }
1450 :
1451 : /*
1452 : * Setup a replication origin for this session.
1453 : */
1454 : Datum
1455 11 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
1456 : {
1457 : char *name;
1458 : ReplOriginId origin;
1459 : int pid;
1460 :
1461 11 : replorigin_check_prerequisites(true, false);
1462 :
1463 11 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1464 11 : origin = replorigin_by_name(name, false);
1465 10 : pid = PG_GETARG_INT32(1);
1466 10 : replorigin_session_setup(origin, pid);
1467 :
1468 8 : replorigin_xact_state.origin = origin;
1469 :
1470 8 : pfree(name);
1471 :
1472 8 : PG_RETURN_VOID();
1473 : }
1474 :
1475 : /*
1476 : * Reset previously setup origin in this session
1477 : */
1478 : Datum
1479 10 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
1480 : {
1481 10 : replorigin_check_prerequisites(true, false);
1482 :
1483 10 : replorigin_session_reset();
1484 :
1485 8 : replorigin_xact_clear(true);
1486 :
1487 8 : PG_RETURN_VOID();
1488 : }
1489 :
1490 : /*
1491 : * Has a replication origin been setup for this session.
1492 : */
1493 : Datum
1494 4 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
1495 : {
1496 4 : replorigin_check_prerequisites(false, false);
1497 :
1498 4 : PG_RETURN_BOOL(replorigin_xact_state.origin != InvalidReplOriginId);
1499 : }
1500 :
1501 :
1502 : /*
1503 : * Return the replication progress for origin setup in the current session.
1504 : *
1505 : * If 'flush' is set to true it is ensured that the returned value corresponds
1506 : * to a local transaction that has been flushed. This is useful if asynchronous
1507 : * commits are used when replaying replicated transactions.
1508 : */
1509 : Datum
1510 2 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
1511 : {
1512 2 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1513 2 : bool flush = PG_GETARG_BOOL(0);
1514 :
1515 2 : replorigin_check_prerequisites(true, false);
1516 :
1517 2 : if (session_replication_state == NULL)
1518 0 : ereport(ERROR,
1519 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1520 : errmsg("no replication origin is configured")));
1521 :
1522 2 : remote_lsn = replorigin_session_get_progress(flush);
1523 :
1524 2 : if (!XLogRecPtrIsValid(remote_lsn))
1525 0 : PG_RETURN_NULL();
1526 :
1527 2 : PG_RETURN_LSN(remote_lsn);
1528 : }
1529 :
1530 : Datum
1531 1 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
1532 : {
1533 1 : XLogRecPtr location = PG_GETARG_LSN(0);
1534 :
1535 1 : replorigin_check_prerequisites(true, false);
1536 :
1537 1 : if (session_replication_state == NULL)
1538 0 : ereport(ERROR,
1539 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1540 : errmsg("no replication origin is configured")));
1541 :
1542 1 : replorigin_xact_state.origin_lsn = location;
1543 1 : replorigin_xact_state.origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
1544 :
1545 1 : PG_RETURN_VOID();
1546 : }
1547 :
1548 : Datum
1549 0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
1550 : {
1551 0 : replorigin_check_prerequisites(true, false);
1552 :
1553 : /* Do not clear the session origin */
1554 0 : replorigin_xact_clear(false);
1555 :
1556 0 : PG_RETURN_VOID();
1557 : }
1558 :
1559 :
1560 : Datum
1561 3 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
1562 : {
1563 3 : text *name = PG_GETARG_TEXT_PP(0);
1564 3 : XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1565 : ReplOriginId node;
1566 :
1567 3 : replorigin_check_prerequisites(true, false);
1568 :
1569 : /* lock to prevent the replication origin from vanishing */
1570 3 : LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1571 :
1572 3 : node = replorigin_by_name(text_to_cstring(name), false);
1573 :
1574 : /*
1575 : * Can't sensibly pass a local commit to be flushed at checkpoint - this
1576 : * xact hasn't committed yet. This is why this function should be used to
1577 : * set up the initial replication state, but not for replay.
1578 : */
1579 2 : replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1580 : true /* go backward */ , true /* WAL log */ );
1581 :
1582 2 : UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1583 :
1584 2 : PG_RETURN_VOID();
1585 : }
1586 :
1587 :
1588 : /*
1589 : * Return the replication progress for an individual replication origin.
1590 : *
1591 : * If 'flush' is set to true it is ensured that the returned value corresponds
1592 : * to a local transaction that has been flushed. This is useful if asynchronous
1593 : * commits are used when replaying replicated transactions.
1594 : */
1595 : Datum
1596 3 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
1597 : {
1598 : char *name;
1599 : bool flush;
1600 : ReplOriginId roident;
1601 3 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1602 :
1603 3 : replorigin_check_prerequisites(true, true);
1604 :
1605 3 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1606 3 : flush = PG_GETARG_BOOL(1);
1607 :
1608 3 : roident = replorigin_by_name(name, false);
1609 : Assert(OidIsValid(roident));
1610 :
1611 2 : remote_lsn = replorigin_get_progress(roident, flush);
1612 :
1613 2 : if (!XLogRecPtrIsValid(remote_lsn))
1614 0 : PG_RETURN_NULL();
1615 :
1616 2 : PG_RETURN_LSN(remote_lsn);
1617 : }
1618 :
1619 :
1620 : Datum
1621 11 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
1622 : {
1623 11 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1624 : int i;
1625 : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
1626 :
1627 : /* we want to return 0 rows if slot is set to zero */
1628 11 : replorigin_check_prerequisites(false, true);
1629 :
1630 11 : InitMaterializedSRF(fcinfo, 0);
1631 :
1632 : /* prevent slots from being concurrently dropped */
1633 11 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1634 :
1635 : /*
1636 : * Iterate through all possible replication_states, display if they are
1637 : * filled. Note that we do not take any locks, so slightly corrupted/out
1638 : * of date values are a possibility.
1639 : */
1640 121 : for (i = 0; i < max_active_replication_origins; i++)
1641 : {
1642 : ReplicationState *state;
1643 : Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];
1644 : bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1645 : char *roname;
1646 :
1647 110 : state = &replication_states[i];
1648 :
1649 : /* unused slot, nothing to display */
1650 110 : if (state->roident == InvalidReplOriginId)
1651 97 : continue;
1652 :
1653 13 : memset(values, 0, sizeof(values));
1654 13 : memset(nulls, 1, sizeof(nulls));
1655 :
1656 13 : values[0] = ObjectIdGetDatum(state->roident);
1657 13 : nulls[0] = false;
1658 :
1659 : /*
1660 : * We're not preventing the origin to be dropped concurrently, so
1661 : * silently accept that it might be gone.
1662 : */
1663 13 : if (replorigin_by_oid(state->roident, true,
1664 : &roname))
1665 : {
1666 13 : values[1] = CStringGetTextDatum(roname);
1667 13 : nulls[1] = false;
1668 : }
1669 :
1670 13 : LWLockAcquire(&state->lock, LW_SHARED);
1671 :
1672 13 : values[2] = LSNGetDatum(state->remote_lsn);
1673 13 : nulls[2] = false;
1674 :
1675 13 : values[3] = LSNGetDatum(state->local_lsn);
1676 13 : nulls[3] = false;
1677 :
1678 13 : LWLockRelease(&state->lock);
1679 :
1680 13 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1681 : values, nulls);
1682 : }
1683 :
1684 11 : LWLockRelease(ReplicationOriginLock);
1685 :
1686 : #undef REPLICATION_ORIGIN_PROGRESS_COLS
1687 :
1688 11 : return (Datum) 0;
1689 : }
|