Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * origin.c
4 : * Logical replication progress tracking support.
5 : *
6 : * Copyright (c) 2013-2026, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/logical/origin.c
10 : *
11 : * NOTES
12 : *
13 : * This file provides the following:
14 : * * An infrastructure to name nodes in a replication setup
15 : * * A facility to store and persist replication progress in an
16 : * efficient and durable manner.
17 : *
18 : * A replication origin consists of a descriptive, user defined, external
19 : * name and a short, thus space efficient, internal 2 byte one. This split
20 : * exists because replication origins have to be stored in WAL and shared
21 : * memory and long descriptors would be inefficient. For now we only use 2
22 : * bytes for the internal id of a replication origin as it seems unlikely that
23 : * there soon will be more than 65k nodes in one replication setup; and using
24 : * only two bytes allows us to be more space efficient.
25 : *
26 : * Replication progress is tracked in a shared memory table
27 : * (ReplicationState) that's dumped to disk every checkpoint. Entries
28 : * ('slots') in this table are identified by the internal id. That's the case
29 : * because it allows to increase replication progress during crash
30 : * recovery. To allow doing so we store the original LSN (from the originating
31 : * system) of a transaction in the commit record. That allows to recover the
32 : * precise replayed state after crash recovery; without requiring synchronous
33 : * commits. Allowing logical replication to use asynchronous commit is
34 : * generally good for performance, but especially important as it allows a
35 : * single threaded replay process to keep up with a source that has multiple
36 : * backends generating changes concurrently. For efficiency and simplicity
37 : * reasons a backend can setup one replication origin that's from then used as
38 : * the source of changes produced by the backend, until reset again.
39 : *
40 : * This infrastructure is intended to be used in cooperation with logical
41 : * decoding. When replaying from a remote system the configured origin is
42 : * provided to output plugins, allowing prevention of replication loops and
43 : * other filtering.
44 : *
45 : * There are several levels of locking at work:
46 : *
47 : * * To create and drop replication origins an exclusive lock on
48 : * pg_replication_origin is required for the duration. That allows us to
49 : * safely and conflict free assign new origins using a dirty snapshot.
50 : *
51 : * * When creating an in-memory replication progress slot the ReplicationOrigin
52 : * LWLock has to be held exclusively; when iterating over the replication
53 : * progress a shared lock has to be held, the same when advancing the
54 : * replication progress of an individual backend that has not setup as the
55 : * session's replication origin.
56 : *
57 : * * When manipulating or looking at the remote_lsn and local_lsn fields of a
58 : * replication progress slot that slot's lwlock has to be held. That's
59 : * primarily because we do not assume 8 byte writes (the LSN) is atomic on
60 : * all our platforms, but it also simplifies memory ordering concerns
61 : * between the remote and local lsn. We use a lwlock instead of a spinlock
62 : * so it's less harmful to hold the lock over a WAL write
63 : * (cf. AdvanceReplicationProgress).
64 : *
65 : * ---------------------------------------------------------------------------
66 : */
67 :
68 : #include "postgres.h"
69 :
70 : #include <unistd.h>
71 : #include <sys/stat.h>
72 :
73 : #include "access/genam.h"
74 : #include "access/htup_details.h"
75 : #include "access/table.h"
76 : #include "access/xact.h"
77 : #include "access/xloginsert.h"
78 : #include "catalog/catalog.h"
79 : #include "catalog/indexing.h"
80 : #include "catalog/pg_subscription.h"
81 : #include "funcapi.h"
82 : #include "miscadmin.h"
83 : #include "nodes/execnodes.h"
84 : #include "pgstat.h"
85 : #include "replication/origin.h"
86 : #include "replication/slot.h"
87 : #include "storage/condition_variable.h"
88 : #include "storage/fd.h"
89 : #include "storage/ipc.h"
90 : #include "storage/lmgr.h"
91 : #include "utils/builtins.h"
92 : #include "utils/fmgroids.h"
93 : #include "utils/guc.h"
94 : #include "utils/pg_lsn.h"
95 : #include "utils/rel.h"
96 : #include "utils/snapmgr.h"
97 : #include "utils/syscache.h"
98 : #include "utils/wait_event.h"
99 :
100 : /*
 * Paths for replication origin checkpoint files.  CheckPointReplicationOrigin
 * writes the temporary file first and then durably renames it over the
 * permanent one; StartupReplicationOrigin reads the permanent one.
 */
101 : #define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
102 : #define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
103 :
104 : /*
 * GUC variables.  max_active_replication_origins is the number of entries in
 * the shared replication_states array; with a value of 0 no shared state is
 * allocated and checkpoint/startup processing is skipped.
 */
105 : int max_active_replication_origins = 10;
106 :
107 : /*
108 : * Replay progress of a single remote node.
 *
 * This is one entry of the shared replication_states array.  An entry whose
 * roident is InvalidReplOriginId is unused and may be claimed for a new
 * origin.
109 : */
110 : typedef struct ReplicationState
111 : {
112 : /*
113 : * Local identifier for the remote node, or InvalidReplOriginId while the
 * entry is free.
114 : */
115 : ReplOriginId roident;
116 :
117 : /*
118 : * Location of the latest commit from the remote side.
119 : */
120 : XLogRecPtr remote_lsn;
121 :
122 : /*
123 : * Remember the local lsn of the commit record so we can XLogFlush() to it
124 : * during a checkpoint so we know the commit record actually is safe on
125 : * disk.  This field is not part of the on-disk representation.
126 : */
127 : XLogRecPtr local_lsn;
128 :
129 : /*
130 : * PID of backend that's acquired slot, or 0 if none.
131 : */
132 : int acquired_by;
133 :
134 : /*
 * Count of processes that are currently using this origin.  While it is
 * greater than zero, replorigin_state_clear will not drop the entry and
 * replorigin_advance refuses to touch it.
 */
135 : int refcount;
136 :
137 : /*
138 : * Condition variable that's signaled when acquired_by changes.
 * replorigin_state_clear sleeps on it while waiting for a busy entry.
139 : */
140 : ConditionVariable origin_cv;
141 :
142 : /*
143 : * Lock protecting remote_lsn and local_lsn.
144 : */
145 : LWLock lock;
146 : } ReplicationState;
147 :
148 : /*
149 : * On disk version of ReplicationState.
 *
 * Written by CheckPointReplicationOrigin and read back by
 * StartupReplicationOrigin.  Only the origin id and the remote LSN are
 * persisted.  The writer zeroes the struct before filling it in so that any
 * padding bytes that reach the file (and the CRC) are well defined.
150 : */
151 : typedef struct ReplicationStateOnDisk
152 : {
153 : ReplOriginId roident;
154 : XLogRecPtr remote_lsn;
155 : } ReplicationStateOnDisk;
156 :
157 :
/*
 * Layout of the shared memory block allocated by ReplicationOriginShmemInit:
 * a small header followed by the per-origin progress entries.
 */
158 : typedef struct ReplicationStateCtl
159 : {
160 : /* Tranche to use for per-origin LWLocks */
161 : int tranche_id;
162 : /* Array of length max_active_replication_origins */
163 : ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
164 : } ReplicationStateCtl;
165 :
166 : /* Global variable for per-transaction replication origin state */
167 : ReplOriginXactState replorigin_xact_state = {
168 : .origin = InvalidReplOriginId, /* assumed identity */
169 : .origin_lsn = InvalidXLogRecPtr,
170 : .origin_timestamp = 0
171 : };
172 :
173 : /*
174 : * Base address into a shared memory array of replication states of size
175 : * max_active_replication_origins.  Set by ReplicationOriginShmemInit to
 * point at replication_states_ctl->states; it is not set when
 * max_active_replication_origins is 0.
176 : */
177 : static ReplicationState *replication_states;
178 :
179 : /*
180 : * Actual shared memory block (replication_states[] is part of this).
181 : */
182 : static ReplicationStateCtl *replication_states_ctl;
183 :
184 : /*
185 : * We keep a pointer to this backend's ReplicationState to avoid having to
186 : * search the replication_states array in replorigin_session_advance for each
187 : * remote commit. (Ownership of a backend's own entry can only be changed by
188 : * that backend.)
189 : */
190 : static ReplicationState *session_replication_state = NULL;
191 :
192 : /*
 * Magic for on disk files.  It is the first word of the checkpoint file and
 * is verified by StartupReplicationOrigin.
 */
193 : #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
194 :
195 : static void
196 69 : replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
197 : {
198 69 : if (check_origins && max_active_replication_origins == 0)
199 0 : ereport(ERROR,
200 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
201 : errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
202 :
203 69 : if (!recoveryOK && RecoveryInProgress())
204 0 : ereport(ERROR,
205 : (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
206 : errmsg("cannot manipulate replication origins during recovery")));
207 69 : }
208 :
209 :
210 : /*
211 : * IsReservedOriginName
212 : * True iff name is either "none" or "any".
213 : */
214 : static bool
215 14 : IsReservedOriginName(const char *name)
216 : {
217 27 : return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
218 13 : (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
219 : }
220 :
221 : /* ---------------------------------------------------------------------------
222 : * Functions for working with replication origins themselves.
223 : * ---------------------------------------------------------------------------
224 : */
225 :
226 : /*
227 : * Check for a persistent replication origin identified by name.
228 : *
229 : * Returns InvalidOid if the node isn't known yet and missing_ok is true.
230 : */
231 : ReplOriginId
232 1097 : replorigin_by_name(const char *roname, bool missing_ok)
233 : {
234 : Form_pg_replication_origin ident;
235 1097 : Oid roident = InvalidOid;
236 : HeapTuple tuple;
237 : Datum roname_d;
238 :
239 1097 : roname_d = CStringGetTextDatum(roname);
240 :
241 1097 : tuple = SearchSysCache1(REPLORIGNAME, roname_d);
242 1097 : if (HeapTupleIsValid(tuple))
243 : {
244 693 : ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
245 693 : roident = ident->roident;
246 693 : ReleaseSysCache(tuple);
247 : }
248 404 : else if (!missing_ok)
249 4 : ereport(ERROR,
250 : (errcode(ERRCODE_UNDEFINED_OBJECT),
251 : errmsg("replication origin \"%s\" does not exist",
252 : roname)));
253 :
254 1093 : return roident;
255 : }
256 :
257 : /*
258 : * Create a replication origin.
259 : *
260 : * Needs to be called in a transaction.
261 : */
262 : ReplOriginId
263 409 : replorigin_create(const char *roname)
264 : {
265 : Oid roident;
266 409 : HeapTuple tuple = NULL;
267 : Relation rel;
268 : Datum roname_d;
269 : SnapshotData SnapshotDirty;
270 : SysScanDesc scan;
271 : ScanKeyData key;
272 :
273 : /*
274 : * To avoid needing a TOAST table for pg_replication_origin, we limit
275 : * replication origin names to 512 bytes. This should be more than enough
276 : * for all practical use.
277 : */
278 409 : if (strlen(roname) > MAX_RONAME_LEN)
279 4 : ereport(ERROR,
280 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
281 : errmsg("replication origin name is too long"),
282 : errdetail("Replication origin names must be no longer than %d bytes.",
283 : MAX_RONAME_LEN)));
284 :
285 405 : roname_d = CStringGetTextDatum(roname);
286 :
287 : Assert(IsTransactionState());
288 :
289 : /*
290 : * We need the numeric replication origin to be 16bit wide, so we cannot
291 : * rely on the normal oid allocation. Instead we simply scan
292 : * pg_replication_origin for the first unused id. That's not particularly
293 : * efficient, but this should be a fairly infrequent operation - we can
294 : * easily spend a bit more code on this when it turns out it needs to be
295 : * faster.
296 : *
297 : * We handle concurrency by taking an exclusive lock (allowing reads!)
298 : * over the table for the duration of the search. Because we use a "dirty
299 : * snapshot" we can read rows that other in-progress sessions have
300 : * written, even though they would be invisible with normal snapshots. Due
301 : * to the exclusive lock there's no danger that new rows can appear while
302 : * we're checking.
303 : */
304 405 : InitDirtySnapshot(SnapshotDirty);
305 :
306 405 : rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
307 :
308 : /*
309 : * We want to be able to access pg_replication_origin without setting up a
310 : * snapshot. To make that safe, it needs to not have a TOAST table, since
311 : * TOASTed data cannot be fetched without a snapshot. As of this writing,
312 : * its only varlena column is roname, which we limit to 512 bytes to avoid
313 : * needing out-of-line storage. If you add a TOAST table to this catalog,
314 : * be sure to set up a snapshot everywhere it might be needed. For more
315 : * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
316 : */
317 : Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
318 :
319 721 : for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
320 : {
321 : bool nulls[Natts_pg_replication_origin];
322 : Datum values[Natts_pg_replication_origin];
323 : bool collides;
324 :
325 721 : CHECK_FOR_INTERRUPTS();
326 :
327 721 : ScanKeyInit(&key,
328 : Anum_pg_replication_origin_roident,
329 : BTEqualStrategyNumber, F_OIDEQ,
330 : ObjectIdGetDatum(roident));
331 :
332 721 : scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
333 : true /* indexOK */ ,
334 : &SnapshotDirty,
335 : 1, &key);
336 :
337 721 : collides = HeapTupleIsValid(systable_getnext(scan));
338 :
339 721 : systable_endscan(scan);
340 :
341 721 : if (!collides)
342 : {
343 : /*
344 : * Ok, found an unused roident, insert the new row and do a CCI,
345 : * so our callers can look it up if they want to.
346 : */
347 405 : memset(&nulls, 0, sizeof(nulls));
348 :
349 405 : values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
350 405 : values[Anum_pg_replication_origin_roname - 1] = roname_d;
351 :
352 405 : tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
353 405 : CatalogTupleInsert(rel, tuple);
354 404 : CommandCounterIncrement();
355 404 : break;
356 : }
357 : }
358 :
359 : /* now release lock again, */
360 404 : table_close(rel, ExclusiveLock);
361 :
362 404 : if (tuple == NULL)
363 0 : ereport(ERROR,
364 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
365 : errmsg("could not find free replication origin ID")));
366 :
367 404 : heap_freetuple(tuple);
368 404 : return roident;
369 : }
370 :
371 : /*
372 : * Helper function to drop a replication origin.
373 : */
374 : static void
375 352 : replorigin_state_clear(ReplOriginId roident, bool nowait)
376 : {
377 : int i;
378 :
379 : /*
380 : * Clean up the slot state info, if there is any matching slot.
381 : */
382 352 : restart:
383 352 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
384 :
385 1292 : for (i = 0; i < max_active_replication_origins; i++)
386 : {
387 1225 : ReplicationState *state = &replication_states[i];
388 :
389 1225 : if (state->roident == roident)
390 : {
391 : /* found our slot, is it busy? */
392 285 : if (state->refcount > 0)
393 : {
394 : ConditionVariable *cv;
395 :
396 0 : if (nowait)
397 0 : ereport(ERROR,
398 : (errcode(ERRCODE_OBJECT_IN_USE),
399 : (state->acquired_by != 0)
400 : ? errmsg("could not drop replication origin with ID %d, in use by PID %d",
401 : state->roident,
402 : state->acquired_by)
403 : : errmsg("could not drop replication origin with ID %d, in use by another process",
404 : state->roident)));
405 :
406 : /*
407 : * We must wait and then retry. Since we don't know which CV
408 : * to wait on until here, we can't readily use
409 : * ConditionVariablePrepareToSleep (calling it here would be
410 : * wrong, since we could miss the signal if we did so); just
411 : * use ConditionVariableSleep directly.
412 : */
413 0 : cv = &state->origin_cv;
414 :
415 0 : LWLockRelease(ReplicationOriginLock);
416 :
417 0 : ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
418 0 : goto restart;
419 : }
420 :
421 : /* first make a WAL log entry */
422 : {
423 : xl_replorigin_drop xlrec;
424 :
425 285 : xlrec.node_id = roident;
426 285 : XLogBeginInsert();
427 285 : XLogRegisterData(&xlrec, sizeof(xlrec));
428 285 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
429 : }
430 :
431 : /* then clear the in-memory slot */
432 285 : state->roident = InvalidReplOriginId;
433 285 : state->remote_lsn = InvalidXLogRecPtr;
434 285 : state->local_lsn = InvalidXLogRecPtr;
435 285 : break;
436 : }
437 : }
438 352 : LWLockRelease(ReplicationOriginLock);
439 352 : ConditionVariableCancelSleep();
440 352 : }
441 :
442 : /*
443 : * Drop replication origin (by name).
444 : *
445 : * Needs to be called in a transaction.
446 : */
447 : void
448 550 : replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
449 : {
450 : ReplOriginId roident;
451 : Relation rel;
452 : HeapTuple tuple;
453 :
454 : Assert(IsTransactionState());
455 :
456 550 : rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
457 :
458 550 : roident = replorigin_by_name(name, missing_ok);
459 :
460 : /* Lock the origin to prevent concurrent drops. */
461 549 : LockSharedObject(ReplicationOriginRelationId, roident, 0,
462 : AccessExclusiveLock);
463 :
464 549 : tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
465 549 : if (!HeapTupleIsValid(tuple))
466 : {
467 197 : if (!missing_ok)
468 0 : elog(ERROR, "cache lookup failed for replication origin with ID %d",
469 : roident);
470 :
471 : /*
472 : * We don't need to retain the locks if the origin is already dropped.
473 : */
474 197 : UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
475 : AccessExclusiveLock);
476 197 : table_close(rel, RowExclusiveLock);
477 197 : return;
478 : }
479 :
480 352 : replorigin_state_clear(roident, nowait);
481 :
482 : /*
483 : * Now, we can delete the catalog entry.
484 : */
485 352 : CatalogTupleDelete(rel, &tuple->t_self);
486 352 : ReleaseSysCache(tuple);
487 :
488 352 : CommandCounterIncrement();
489 :
490 : /* We keep the lock on pg_replication_origin until commit */
491 352 : table_close(rel, NoLock);
492 : }
493 :
494 : /*
495 : * Lookup replication origin via its oid and return the name.
496 : *
497 : * The external name is palloc'd in the calling context.
498 : *
499 : * Returns true if the origin is known, false otherwise.
500 : */
501 : bool
502 27 : replorigin_by_oid(ReplOriginId roident, bool missing_ok, char **roname)
503 : {
504 : HeapTuple tuple;
505 : Form_pg_replication_origin ric;
506 :
507 : Assert(OidIsValid((Oid) roident));
508 : Assert(roident != InvalidReplOriginId);
509 : Assert(roident != DoNotReplicateId);
510 :
511 27 : tuple = SearchSysCache1(REPLORIGIDENT,
512 : ObjectIdGetDatum((Oid) roident));
513 :
514 27 : if (HeapTupleIsValid(tuple))
515 : {
516 24 : ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
517 24 : *roname = text_to_cstring(&ric->roname);
518 24 : ReleaseSysCache(tuple);
519 :
520 24 : return true;
521 : }
522 : else
523 : {
524 3 : *roname = NULL;
525 :
526 3 : if (!missing_ok)
527 0 : ereport(ERROR,
528 : (errcode(ERRCODE_UNDEFINED_OBJECT),
529 : errmsg("replication origin with ID %d does not exist",
530 : roident)));
531 :
532 3 : return false;
533 : }
534 : }
535 :
536 :
537 : /* ---------------------------------------------------------------------------
538 : * Functions for handling replication progress.
539 : * ---------------------------------------------------------------------------
540 : */
541 :
542 : Size
543 4573 : ReplicationOriginShmemSize(void)
544 : {
545 4573 : Size size = 0;
546 :
547 4573 : if (max_active_replication_origins == 0)
548 2 : return size;
549 :
550 4571 : size = add_size(size, offsetof(ReplicationStateCtl, states));
551 :
552 4571 : size = add_size(size,
553 : mul_size(max_active_replication_origins, sizeof(ReplicationState)));
554 4571 : return size;
555 : }
556 :
557 : void
558 1182 : ReplicationOriginShmemInit(void)
559 : {
560 : bool found;
561 :
562 1182 : if (max_active_replication_origins == 0)
563 1 : return;
564 :
565 1181 : replication_states_ctl = (ReplicationStateCtl *)
566 1181 : ShmemInitStruct("ReplicationOriginState",
567 : ReplicationOriginShmemSize(),
568 : &found);
569 1181 : replication_states = replication_states_ctl->states;
570 :
571 1181 : if (!found)
572 : {
573 : int i;
574 :
575 96770 : MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
576 :
577 1181 : replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
578 :
579 12982 : for (i = 0; i < max_active_replication_origins; i++)
580 : {
581 11801 : LWLockInitialize(&replication_states[i].lock,
582 11801 : replication_states_ctl->tranche_id);
583 11801 : ConditionVariableInit(&replication_states[i].origin_cv);
584 : }
585 : }
586 : }
587 :
588 : /* ---------------------------------------------------------------------------
589 : * Perform a checkpoint of each replication origin's progress with respect to
590 : * the replayed remote_lsn. Make sure that all transactions we refer to in the
591 : * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
592 : * if the transactions were originally committed asynchronously.
593 : *
594 : * We store checkpoints in the following format:
595 : * +-------+------------------------+------------------+-----+--------+
596 : * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
597 : * +-------+------------------------+------------------+-----+--------+
598 : *
599 : * So its just the magic, followed by the statically sized
600 : * ReplicationStateOnDisk structs. Note that the maximum number of
601 : * ReplicationState is determined by max_active_replication_origins.
602 : * ---------------------------------------------------------------------------
603 : */
604 : void
605 1844 : CheckPointReplicationOrigin(void)
606 : {
607 1844 : const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
608 1844 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
609 : int tmpfd;
610 : int i;
611 1844 : uint32 magic = REPLICATION_STATE_MAGIC;
612 : pg_crc32c crc;
613 :
614 1844 : if (max_active_replication_origins == 0)
615 1 : return;
616 :
617 1843 : INIT_CRC32C(crc);
618 :
619 : /* make sure no old temp file is remaining */
620 1843 : if (unlink(tmppath) < 0 && errno != ENOENT)
621 0 : ereport(PANIC,
622 : (errcode_for_file_access(),
623 : errmsg("could not remove file \"%s\": %m",
624 : tmppath)));
625 :
626 : /*
627 : * no other backend can perform this at the same time; only one checkpoint
628 : * can happen at a time.
629 : */
630 1843 : tmpfd = OpenTransientFile(tmppath,
631 : O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
632 1843 : if (tmpfd < 0)
633 0 : ereport(PANIC,
634 : (errcode_for_file_access(),
635 : errmsg("could not create file \"%s\": %m",
636 : tmppath)));
637 :
638 : /* write magic */
639 1843 : errno = 0;
640 1843 : if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
641 : {
642 : /* if write didn't set errno, assume problem is no disk space */
643 0 : if (errno == 0)
644 0 : errno = ENOSPC;
645 0 : ereport(PANIC,
646 : (errcode_for_file_access(),
647 : errmsg("could not write to file \"%s\": %m",
648 : tmppath)));
649 : }
650 1843 : COMP_CRC32C(crc, &magic, sizeof(magic));
651 :
652 : /* prevent concurrent creations/drops */
653 1843 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
654 :
655 : /* write actual data */
656 20273 : for (i = 0; i < max_active_replication_origins; i++)
657 : {
658 : ReplicationStateOnDisk disk_state;
659 18430 : ReplicationState *curstate = &replication_states[i];
660 : XLogRecPtr local_lsn;
661 :
662 18430 : if (curstate->roident == InvalidReplOriginId)
663 18377 : continue;
664 :
665 : /* zero, to avoid uninitialized padding bytes */
666 53 : memset(&disk_state, 0, sizeof(disk_state));
667 :
668 53 : LWLockAcquire(&curstate->lock, LW_SHARED);
669 :
670 53 : disk_state.roident = curstate->roident;
671 :
672 53 : disk_state.remote_lsn = curstate->remote_lsn;
673 53 : local_lsn = curstate->local_lsn;
674 :
675 53 : LWLockRelease(&curstate->lock);
676 :
677 : /* make sure we only write out a commit that's persistent */
678 53 : XLogFlush(local_lsn);
679 :
680 53 : errno = 0;
681 53 : if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
682 : sizeof(disk_state))
683 : {
684 : /* if write didn't set errno, assume problem is no disk space */
685 0 : if (errno == 0)
686 0 : errno = ENOSPC;
687 0 : ereport(PANIC,
688 : (errcode_for_file_access(),
689 : errmsg("could not write to file \"%s\": %m",
690 : tmppath)));
691 : }
692 :
693 53 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
694 : }
695 :
696 1843 : LWLockRelease(ReplicationOriginLock);
697 :
698 : /* write out the CRC */
699 1843 : FIN_CRC32C(crc);
700 1843 : errno = 0;
701 1843 : if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
702 : {
703 : /* if write didn't set errno, assume problem is no disk space */
704 0 : if (errno == 0)
705 0 : errno = ENOSPC;
706 0 : ereport(PANIC,
707 : (errcode_for_file_access(),
708 : errmsg("could not write to file \"%s\": %m",
709 : tmppath)));
710 : }
711 :
712 1843 : if (CloseTransientFile(tmpfd) != 0)
713 0 : ereport(PANIC,
714 : (errcode_for_file_access(),
715 : errmsg("could not close file \"%s\": %m",
716 : tmppath)));
717 :
718 : /* fsync, rename to permanent file, fsync file and directory */
719 1843 : durable_rename(tmppath, path, PANIC);
720 : }
721 :
722 : /*
723 : * Recover replication replay status from checkpoint data saved earlier by
724 : * CheckPointReplicationOrigin.
725 : *
726 : * This only needs to be called at startup and *not* during every checkpoint
727 : * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
728 : * state thereafter can be recovered by looking at commit records.
729 : */
730 : void
731 1034 : StartupReplicationOrigin(void)
732 : {
733 1034 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
734 : int fd;
735 : int readBytes;
736 1034 : uint32 magic = REPLICATION_STATE_MAGIC;
737 1034 : int last_state = 0;
738 : pg_crc32c file_crc;
739 : pg_crc32c crc;
740 :
741 : /* don't want to overwrite already existing state */
742 : #ifdef USE_ASSERT_CHECKING
743 : static bool already_started = false;
744 :
745 : Assert(!already_started);
746 : already_started = true;
747 : #endif
748 :
749 1034 : if (max_active_replication_origins == 0)
750 52 : return;
751 :
752 1033 : INIT_CRC32C(crc);
753 :
754 1033 : elog(DEBUG2, "starting up replication origin progress state");
755 :
756 1033 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
757 :
758 : /*
759 : * might have had max_active_replication_origins == 0 last run, or we just
760 : * brought up a standby.
761 : */
762 1033 : if (fd < 0 && errno == ENOENT)
763 51 : return;
764 982 : else if (fd < 0)
765 0 : ereport(PANIC,
766 : (errcode_for_file_access(),
767 : errmsg("could not open file \"%s\": %m",
768 : path)));
769 :
770 : /* verify magic, that is written even if nothing was active */
771 982 : readBytes = read(fd, &magic, sizeof(magic));
772 982 : if (readBytes != sizeof(magic))
773 : {
774 0 : if (readBytes < 0)
775 0 : ereport(PANIC,
776 : (errcode_for_file_access(),
777 : errmsg("could not read file \"%s\": %m",
778 : path)));
779 : else
780 0 : ereport(PANIC,
781 : (errcode(ERRCODE_DATA_CORRUPTED),
782 : errmsg("could not read file \"%s\": read %d of %zu",
783 : path, readBytes, sizeof(magic))));
784 : }
785 982 : COMP_CRC32C(crc, &magic, sizeof(magic));
786 :
787 982 : if (magic != REPLICATION_STATE_MAGIC)
788 0 : ereport(PANIC,
789 : (errmsg("replication checkpoint has wrong magic %u instead of %u",
790 : magic, REPLICATION_STATE_MAGIC)));
791 :
792 : /* we can skip locking here, no other access is possible */
793 :
794 : /* recover individual states, until there are no more to be found */
795 : while (true)
796 31 : {
797 : ReplicationStateOnDisk disk_state;
798 :
799 1013 : readBytes = read(fd, &disk_state, sizeof(disk_state));
800 :
801 1013 : if (readBytes < 0)
802 : {
803 0 : ereport(PANIC,
804 : (errcode_for_file_access(),
805 : errmsg("could not read file \"%s\": %m",
806 : path)));
807 : }
808 :
809 : /* no further data */
810 1013 : if (readBytes == sizeof(crc))
811 : {
812 982 : memcpy(&file_crc, &disk_state, sizeof(file_crc));
813 982 : break;
814 : }
815 :
816 31 : if (readBytes != sizeof(disk_state))
817 : {
818 0 : ereport(PANIC,
819 : (errcode_for_file_access(),
820 : errmsg("could not read file \"%s\": read %d of %zu",
821 : path, readBytes, sizeof(disk_state))));
822 : }
823 :
824 31 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
825 :
826 31 : if (last_state == max_active_replication_origins)
827 0 : ereport(PANIC,
828 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
829 : errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
830 :
831 : /* copy data to shared memory */
832 31 : replication_states[last_state].roident = disk_state.roident;
833 31 : replication_states[last_state].remote_lsn = disk_state.remote_lsn;
834 31 : last_state++;
835 :
836 31 : ereport(LOG,
837 : errmsg("recovered replication state of node %d to %X/%08X",
838 : disk_state.roident,
839 : LSN_FORMAT_ARGS(disk_state.remote_lsn)));
840 : }
841 :
842 : /* now check checksum */
843 982 : FIN_CRC32C(crc);
844 982 : if (file_crc != crc)
845 0 : ereport(PANIC,
846 : (errcode(ERRCODE_DATA_CORRUPTED),
847 : errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
848 : crc, file_crc)));
849 :
850 982 : if (CloseTransientFile(fd) != 0)
851 0 : ereport(PANIC,
852 : (errcode_for_file_access(),
853 : errmsg("could not close file \"%s\": %m",
854 : path)));
855 : }
856 :
857 : void
858 5 : replorigin_redo(XLogReaderState *record)
859 : {
860 5 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
861 :
862 5 : switch (info)
863 : {
864 2 : case XLOG_REPLORIGIN_SET:
865 : {
866 2 : xl_replorigin_set *xlrec =
867 2 : (xl_replorigin_set *) XLogRecGetData(record);
868 :
869 2 : replorigin_advance(xlrec->node_id,
870 : xlrec->remote_lsn, record->EndRecPtr,
871 2 : xlrec->force /* backward */ ,
872 : false /* WAL log */ );
873 2 : break;
874 : }
875 3 : case XLOG_REPLORIGIN_DROP:
876 : {
877 : xl_replorigin_drop *xlrec;
878 : int i;
879 :
880 3 : xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
881 :
882 14 : for (i = 0; i < max_active_replication_origins; i++)
883 : {
884 13 : ReplicationState *state = &replication_states[i];
885 :
886 : /* found our slot */
887 13 : if (state->roident == xlrec->node_id)
888 : {
889 : /* reset entry */
890 2 : state->roident = InvalidReplOriginId;
891 2 : state->remote_lsn = InvalidXLogRecPtr;
892 2 : state->local_lsn = InvalidXLogRecPtr;
893 2 : break;
894 : }
895 : }
896 3 : break;
897 : }
898 0 : default:
899 0 : elog(PANIC, "replorigin_redo: unknown op code %u", info);
900 : }
901 5 : }
902 :
903 :
904 : /*
905 : * Tell the replication origin progress machinery that a commit from 'node'
906 : * that originated at the LSN remote_commit on the remote node was replayed
907 : * successfully and that we don't need to do so again. In combination with
908 : * setting up replorigin_xact_state {.origin_lsn, .origin_timestamp}
909 : * that ensures we won't lose knowledge about that after a crash if the
910 : * transaction had a persistent effect (think of asynchronous commits).
911 : *
912 : * local_commit needs to be a local LSN of the commit so that we can make sure
913 : * upon a checkpoint that enough WAL has been persisted to disk.
914 : *
915 : * Needs to be called with a RowExclusiveLock on pg_replication_origin,
916 : * unless running in recovery.
917 : */
918 : void
919 258 : replorigin_advance(ReplOriginId node,
920 : XLogRecPtr remote_commit, XLogRecPtr local_commit,
921 : bool go_backward, bool wal_log)
922 : {
923 : int i;
924 258 : ReplicationState *replication_state = NULL;
925 258 : ReplicationState *free_state = NULL;
926 :
927 : Assert(node != InvalidReplOriginId);
928 :
929 : /* we don't track DoNotReplicateId */
930 258 : if (node == DoNotReplicateId)
931 0 : return;
932 :
933 : /*
934 : * XXX: For the case where this is called by WAL replay, it'd be more
935 : * efficient to restore into a backend local hashtable and only dump into
936 : * shmem after recovery is finished. Let's wait with implementing that
937 : * till it's shown to be a measurable expense
938 : */
939 :
940 : /* Lock exclusively, as we may have to create a new table entry. */
941 258 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
942 :
943 : /*
944 : * Search for either an existing slot for the origin, or a free one we can
945 : * use.
946 : */
947 2365 : for (i = 0; i < max_active_replication_origins; i++)
948 : {
949 2156 : ReplicationState *curstate = &replication_states[i];
950 :
951 : /* remember where to insert if necessary */
952 2156 : if (curstate->roident == InvalidReplOriginId &&
953 : free_state == NULL)
954 : {
955 210 : free_state = curstate;
956 210 : continue;
957 : }
958 :
959 : /* not our slot */
960 1946 : if (curstate->roident != node)
961 : {
962 1897 : continue;
963 : }
964 :
965 : /* ok, found slot */
966 49 : replication_state = curstate;
967 :
968 49 : LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
969 :
970 : /* Make sure it's not used by somebody else */
971 49 : if (replication_state->refcount > 0)
972 : {
973 0 : ereport(ERROR,
974 : (errcode(ERRCODE_OBJECT_IN_USE),
975 : (replication_state->acquired_by != 0)
976 : ? errmsg("replication origin with ID %d is already active for PID %d",
977 : replication_state->roident,
978 : replication_state->acquired_by)
979 : : errmsg("replication origin with ID %d is already active in another process",
980 : replication_state->roident)));
981 : }
982 :
983 49 : break;
984 : }
985 :
986 258 : if (replication_state == NULL && free_state == NULL)
987 0 : ereport(ERROR,
988 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
989 : errmsg("could not find free replication state slot for replication origin with ID %d",
990 : node),
991 : errhint("Increase \"max_active_replication_origins\" and try again.")));
992 :
993 258 : if (replication_state == NULL)
994 : {
995 : /* initialize new slot */
996 209 : LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
997 209 : replication_state = free_state;
998 : Assert(!XLogRecPtrIsValid(replication_state->remote_lsn));
999 : Assert(!XLogRecPtrIsValid(replication_state->local_lsn));
1000 209 : replication_state->roident = node;
1001 : }
1002 :
1003 : Assert(replication_state->roident != InvalidReplOriginId);
1004 :
1005 : /*
1006 : * If somebody "forcefully" sets this slot, WAL log it, so it's durable
1007 : * and the standby gets the message. Primarily this will be called during
1008 : * WAL replay (of commit records) where no WAL logging is necessary.
1009 : */
1010 258 : if (wal_log)
1011 : {
1012 : xl_replorigin_set xlrec;
1013 :
1014 217 : xlrec.remote_lsn = remote_commit;
1015 217 : xlrec.node_id = node;
1016 217 : xlrec.force = go_backward;
1017 :
1018 217 : XLogBeginInsert();
1019 217 : XLogRegisterData(&xlrec, sizeof(xlrec));
1020 :
1021 217 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
1022 : }
1023 :
1024 : /*
1025 : * Due to - harmless - race conditions during a checkpoint we could see
1026 : * values here that are older than the ones we already have in memory. We
1027 : * could also see older values for prepared transactions when the prepare
1028 : * is sent at a later point of time along with commit prepared and there
1029 : * are other transactions commits between prepare and commit prepared. See
1030 : * ReorderBufferFinishPrepared. Don't overwrite those.
1031 : */
1032 258 : if (go_backward || replication_state->remote_lsn < remote_commit)
1033 251 : replication_state->remote_lsn = remote_commit;
1034 258 : if (XLogRecPtrIsValid(local_commit) &&
1035 38 : (go_backward || replication_state->local_lsn < local_commit))
1036 40 : replication_state->local_lsn = local_commit;
1037 258 : LWLockRelease(&replication_state->lock);
1038 :
1039 : /*
1040 : * Release *after* changing the LSNs, slot isn't acquired and thus could
1041 : * otherwise be dropped anytime.
1042 : */
1043 258 : LWLockRelease(ReplicationOriginLock);
1044 : }
1045 :
1046 :
1047 : XLogRecPtr
1048 9 : replorigin_get_progress(ReplOriginId node, bool flush)
1049 : {
1050 : int i;
1051 9 : XLogRecPtr local_lsn = InvalidXLogRecPtr;
1052 9 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1053 :
1054 : /* prevent slots from being concurrently dropped */
1055 9 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1056 :
1057 49 : for (i = 0; i < max_active_replication_origins; i++)
1058 : {
1059 : ReplicationState *state;
1060 :
1061 45 : state = &replication_states[i];
1062 :
1063 45 : if (state->roident == node)
1064 : {
1065 5 : LWLockAcquire(&state->lock, LW_SHARED);
1066 :
1067 5 : remote_lsn = state->remote_lsn;
1068 5 : local_lsn = state->local_lsn;
1069 :
1070 5 : LWLockRelease(&state->lock);
1071 :
1072 5 : break;
1073 : }
1074 : }
1075 :
1076 9 : LWLockRelease(ReplicationOriginLock);
1077 :
1078 9 : if (flush && XLogRecPtrIsValid(local_lsn))
1079 1 : XLogFlush(local_lsn);
1080 :
1081 9 : return remote_lsn;
1082 : }
1083 :
1084 : /* Helper function to reset the session replication origin */
1085 : static void
1086 530 : replorigin_session_reset_internal(void)
1087 : {
1088 : ConditionVariable *cv;
1089 :
1090 : Assert(session_replication_state != NULL);
1091 :
1092 530 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1093 :
1094 : /* The origin must be held by at least one process at this point. */
1095 : Assert(session_replication_state->refcount > 0);
1096 :
1097 : /*
1098 : * Reset the PID only if the current session is the first to set up this
1099 : * origin. This avoids clearing the first process's PID when any other
1100 : * session releases the origin.
1101 : */
1102 530 : if (session_replication_state->acquired_by == MyProcPid)
1103 516 : session_replication_state->acquired_by = 0;
1104 :
1105 530 : session_replication_state->refcount--;
1106 :
1107 530 : cv = &session_replication_state->origin_cv;
1108 530 : session_replication_state = NULL;
1109 :
1110 530 : LWLockRelease(ReplicationOriginLock);
1111 :
1112 530 : ConditionVariableBroadcast(cv);
1113 530 : }
1114 :
1115 : /*
1116 : * Tear down a (possibly) configured session replication origin during process
1117 : * exit.
1118 : */
1119 : static void
1120 526 : ReplicationOriginExitCleanup(int code, Datum arg)
1121 : {
1122 526 : if (session_replication_state == NULL)
1123 202 : return;
1124 :
1125 324 : replorigin_session_reset_internal();
1126 : }
1127 :
1128 : /*
1129 : * Setup a replication origin in the shared memory struct if it doesn't
1130 : * already exist and cache access to the specific ReplicationSlot so the
1131 : * array doesn't have to be searched when calling
1132 : * replorigin_session_advance().
1133 : *
1134 : * Normally only one such cached origin can exist per process so the cached
1135 : * value can only be set again after the previous value is torn down with
1136 : * replorigin_session_reset(). For this normal case pass acquired_by = 0
1137 : * (meaning the slot is not allowed to be already acquired by another process).
1138 : *
1139 : * However, sometimes multiple processes can safely re-use the same origin slot
1140 : * (for example, multiple parallel apply processes can safely use the same
1141 : * origin, provided they maintain commit order by allowing only one process to
1142 : * commit at a time). For this case the first process must pass acquired_by =
1143 : * 0, and then the other processes sharing that same origin can pass
1144 : * acquired_by = PID of the first process.
1145 : */
1146 : void
1147 532 : replorigin_session_setup(ReplOriginId node, int acquired_by)
1148 : {
1149 : static bool registered_cleanup;
1150 : int i;
1151 532 : int free_slot = -1;
1152 :
1153 532 : if (!registered_cleanup)
1154 : {
1155 526 : on_shmem_exit(ReplicationOriginExitCleanup, 0);
1156 526 : registered_cleanup = true;
1157 : }
1158 :
1159 : Assert(max_active_replication_origins > 0);
1160 :
1161 532 : if (session_replication_state != NULL)
1162 1 : ereport(ERROR,
1163 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1164 : errmsg("cannot setup replication origin when one is already setup")));
1165 :
1166 : /* Lock exclusively, as we may have to create a new table entry. */
1167 531 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1168 :
1169 : /*
1170 : * Search for either an existing slot for the origin, or a free one we can
1171 : * use.
1172 : */
1173 2093 : for (i = 0; i < max_active_replication_origins; i++)
1174 : {
1175 1967 : ReplicationState *curstate = &replication_states[i];
1176 :
1177 : /* remember where to insert if necessary */
1178 1967 : if (curstate->roident == InvalidReplOriginId &&
1179 : free_slot == -1)
1180 : {
1181 128 : free_slot = i;
1182 128 : continue;
1183 : }
1184 :
1185 : /* not our slot */
1186 1839 : if (curstate->roident != node)
1187 1434 : continue;
1188 :
1189 405 : if (acquired_by == 0)
1190 : {
1191 : /* With acquired_by == 0, we need the origin to be free */
1192 391 : if (curstate->acquired_by != 0)
1193 : {
1194 0 : ereport(ERROR,
1195 : (errcode(ERRCODE_OBJECT_IN_USE),
1196 : errmsg("replication origin with ID %d is already active for PID %d",
1197 : curstate->roident, curstate->acquired_by)));
1198 : }
1199 391 : else if (curstate->refcount > 0)
1200 : {
1201 : /*
1202 : * The origin is in use, but PID is not recorded. This can
1203 : * happen if the process that originally acquired the origin
1204 : * exited without releasing it. To ensure correctness, other
1205 : * processes cannot acquire the origin until all processes
1206 : * currently using it have released it.
1207 : */
1208 0 : ereport(ERROR,
1209 : (errcode(ERRCODE_OBJECT_IN_USE),
1210 : errmsg("replication origin with ID %d is already active in another process",
1211 : curstate->roident)));
1212 : }
1213 : }
1214 : else
1215 : {
1216 : /*
1217 : * With acquired_by != 0, we need the origin to be active by the
1218 : * given PID
1219 : */
1220 14 : if (curstate->acquired_by != acquired_by)
1221 0 : ereport(ERROR,
1222 : (errcode(ERRCODE_OBJECT_IN_USE),
1223 : errmsg("replication origin with ID %d is not active for PID %d",
1224 : curstate->roident, acquired_by)));
1225 :
1226 : /*
1227 : * Here, it is okay to have refcount > 0 as more than one process
1228 : * can safely re-use the origin.
1229 : */
1230 : }
1231 :
1232 : /* ok, found slot */
1233 405 : session_replication_state = curstate;
1234 405 : break;
1235 : }
1236 :
1237 531 : if (session_replication_state == NULL)
1238 : {
1239 126 : if (acquired_by != 0)
1240 1 : ereport(ERROR,
1241 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1242 : errmsg("cannot use PID %d for inactive replication origin with ID %d",
1243 : acquired_by, node)));
1244 :
1245 : /* initialize new slot */
1246 125 : if (free_slot == -1)
1247 0 : ereport(ERROR,
1248 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1249 : errmsg("could not find free replication state slot for replication origin with ID %d",
1250 : node),
1251 : errhint("Increase \"max_active_replication_origins\" and try again.")));
1252 :
1253 125 : session_replication_state = &replication_states[free_slot];
1254 : Assert(!XLogRecPtrIsValid(session_replication_state->remote_lsn));
1255 : Assert(!XLogRecPtrIsValid(session_replication_state->local_lsn));
1256 125 : session_replication_state->roident = node;
1257 : }
1258 :
1259 :
1260 : Assert(session_replication_state->roident != InvalidReplOriginId);
1261 :
1262 530 : if (acquired_by == 0)
1263 : {
1264 516 : session_replication_state->acquired_by = MyProcPid;
1265 : Assert(session_replication_state->refcount == 0);
1266 : }
1267 : else
1268 : {
1269 : /*
1270 : * Sanity check: the origin must already be acquired by the process
1271 : * passed as input, and at least one process must be using it.
1272 : */
1273 : Assert(session_replication_state->acquired_by == acquired_by);
1274 : Assert(session_replication_state->refcount > 0);
1275 : }
1276 :
1277 530 : session_replication_state->refcount++;
1278 :
1279 530 : LWLockRelease(ReplicationOriginLock);
1280 :
1281 : /* probably this one is pointless */
1282 530 : ConditionVariableBroadcast(&session_replication_state->origin_cv);
1283 530 : }
1284 :
1285 : /*
1286 : * Reset replay state previously setup in this session.
1287 : *
1288 : * This function may only be called if an origin was setup with
1289 : * replorigin_session_setup().
1290 : */
1291 : void
1292 208 : replorigin_session_reset(void)
1293 : {
1294 : Assert(max_active_replication_origins != 0);
1295 :
1296 208 : if (session_replication_state == NULL)
1297 1 : ereport(ERROR,
1298 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1299 : errmsg("no replication origin is configured")));
1300 :
1301 : /*
1302 : * Restrict explicit resetting of the replication origin if it was first
1303 : * acquired by this process and others are still using it. While the
1304 : * system handles this safely (as happens if the first session exits
1305 : * without calling reset), it is best to avoid doing so.
1306 : */
1307 207 : if (session_replication_state->acquired_by == MyProcPid &&
1308 205 : session_replication_state->refcount > 1)
1309 1 : ereport(ERROR,
1310 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1311 : errmsg("cannot reset replication origin with ID %d because it is still in use by other processes",
1312 : session_replication_state->roident),
1313 : errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."),
1314 : errhint("Reset the replication origin in all other processes before retrying.")));
1315 :
1316 206 : replorigin_session_reset_internal();
1317 206 : }
1318 :
1319 : /*
1320 : * Do the same work replorigin_advance() does, just on the session's
1321 : * configured origin.
1322 : *
1323 : * This is noticeably cheaper than using replorigin_advance().
1324 : */
1325 : void
1326 1145 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
1327 : {
1328 : Assert(session_replication_state != NULL);
1329 : Assert(session_replication_state->roident != InvalidReplOriginId);
1330 :
1331 1145 : LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1332 1145 : if (session_replication_state->local_lsn < local_commit)
1333 1145 : session_replication_state->local_lsn = local_commit;
1334 1145 : if (session_replication_state->remote_lsn < remote_commit)
1335 525 : session_replication_state->remote_lsn = remote_commit;
1336 1145 : LWLockRelease(&session_replication_state->lock);
1337 1145 : }
1338 :
1339 : /*
1340 : * Ask the machinery about the point up to which we successfully replayed
1341 : * changes from an already setup replication origin.
1342 : */
1343 : XLogRecPtr
1344 297 : replorigin_session_get_progress(bool flush)
1345 : {
1346 : XLogRecPtr remote_lsn;
1347 : XLogRecPtr local_lsn;
1348 :
1349 : Assert(session_replication_state != NULL);
1350 :
1351 297 : LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1352 297 : remote_lsn = session_replication_state->remote_lsn;
1353 297 : local_lsn = session_replication_state->local_lsn;
1354 297 : LWLockRelease(&session_replication_state->lock);
1355 :
1356 297 : if (flush && XLogRecPtrIsValid(local_lsn))
1357 1 : XLogFlush(local_lsn);
1358 :
1359 297 : return remote_lsn;
1360 : }
1361 :
1362 : /*
1363 : * Clear the per-transaction replication origin state.
1364 : *
1365 : * replorigin_session_origin is also cleared if clear_origin is set.
1366 : */
1367 : void
1368 830 : replorigin_xact_clear(bool clear_origin)
1369 : {
1370 830 : replorigin_xact_state.origin_lsn = InvalidXLogRecPtr;
1371 830 : replorigin_xact_state.origin_timestamp = 0;
1372 830 : if (clear_origin)
1373 830 : replorigin_xact_state.origin = InvalidReplOriginId;
1374 830 : }
1375 :
1376 :
1377 : /* ---------------------------------------------------------------------------
1378 : * SQL functions for working with replication origin.
1379 : *
1380 : * These mostly should be fairly short wrappers around more generic functions.
1381 : * ---------------------------------------------------------------------------
1382 : */
1383 :
1384 : /*
1385 : * Create replication origin for the passed in name, and return the assigned
1386 : * oid.
1387 : */
1388 : Datum
1389 15 : pg_replication_origin_create(PG_FUNCTION_ARGS)
1390 : {
1391 : char *name;
1392 : ReplOriginId roident;
1393 :
1394 15 : replorigin_check_prerequisites(false, false);
1395 :
1396 15 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1397 :
1398 : /*
1399 : * Replication origins "any and "none" are reserved for system options.
1400 : * The origins "pg_xxx" are reserved for internal use.
1401 : */
1402 15 : if (IsReservedName(name) || IsReservedOriginName(name))
1403 3 : ereport(ERROR,
1404 : (errcode(ERRCODE_RESERVED_NAME),
1405 : errmsg("replication origin name \"%s\" is reserved",
1406 : name),
1407 : errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1408 : LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
1409 :
1410 : /*
1411 : * If built with appropriate switch, whine when regression-testing
1412 : * conventions for replication origin names are violated.
1413 : */
1414 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1415 : if (strncmp(name, "regress_", 8) != 0)
1416 : elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1417 : #endif
1418 :
1419 12 : roident = replorigin_create(name);
1420 :
1421 7 : pfree(name);
1422 :
1423 7 : PG_RETURN_OID(roident);
1424 : }
1425 :
1426 : /*
1427 : * Drop replication origin.
1428 : */
1429 : Datum
1430 9 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
1431 : {
1432 : char *name;
1433 :
1434 9 : replorigin_check_prerequisites(false, false);
1435 :
1436 9 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1437 :
1438 9 : replorigin_drop_by_name(name, false, true);
1439 :
1440 8 : pfree(name);
1441 :
1442 8 : PG_RETURN_VOID();
1443 : }
1444 :
1445 : /*
1446 : * Return oid of a replication origin.
1447 : */
1448 : Datum
1449 0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
1450 : {
1451 : char *name;
1452 : ReplOriginId roident;
1453 :
1454 0 : replorigin_check_prerequisites(false, false);
1455 :
1456 0 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1457 0 : roident = replorigin_by_name(name, true);
1458 :
1459 0 : pfree(name);
1460 :
1461 0 : if (OidIsValid(roident))
1462 0 : PG_RETURN_OID(roident);
1463 0 : PG_RETURN_NULL();
1464 : }
1465 :
1466 : /*
1467 : * Setup a replication origin for this session.
1468 : */
1469 : Datum
1470 11 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
1471 : {
1472 : char *name;
1473 : ReplOriginId origin;
1474 : int pid;
1475 :
1476 11 : replorigin_check_prerequisites(true, false);
1477 :
1478 11 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1479 11 : origin = replorigin_by_name(name, false);
1480 10 : pid = PG_GETARG_INT32(1);
1481 10 : replorigin_session_setup(origin, pid);
1482 :
1483 8 : replorigin_xact_state.origin = origin;
1484 :
1485 8 : pfree(name);
1486 :
1487 8 : PG_RETURN_VOID();
1488 : }
1489 :
1490 : /*
1491 : * Reset previously setup origin in this session
1492 : */
1493 : Datum
1494 10 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
1495 : {
1496 10 : replorigin_check_prerequisites(true, false);
1497 :
1498 10 : replorigin_session_reset();
1499 :
1500 8 : replorigin_xact_clear(true);
1501 :
1502 8 : PG_RETURN_VOID();
1503 : }
1504 :
1505 : /*
1506 : * Has a replication origin been setup for this session.
1507 : */
1508 : Datum
1509 4 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
1510 : {
1511 4 : replorigin_check_prerequisites(false, false);
1512 :
1513 4 : PG_RETURN_BOOL(replorigin_xact_state.origin != InvalidReplOriginId);
1514 : }
1515 :
1516 :
1517 : /*
1518 : * Return the replication progress for origin setup in the current session.
1519 : *
1520 : * If 'flush' is set to true it is ensured that the returned value corresponds
1521 : * to a local transaction that has been flushed. This is useful if asynchronous
1522 : * commits are used when replaying replicated transactions.
1523 : */
1524 : Datum
1525 2 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
1526 : {
1527 2 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1528 2 : bool flush = PG_GETARG_BOOL(0);
1529 :
1530 2 : replorigin_check_prerequisites(true, false);
1531 :
1532 2 : if (session_replication_state == NULL)
1533 0 : ereport(ERROR,
1534 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1535 : errmsg("no replication origin is configured")));
1536 :
1537 2 : remote_lsn = replorigin_session_get_progress(flush);
1538 :
1539 2 : if (!XLogRecPtrIsValid(remote_lsn))
1540 0 : PG_RETURN_NULL();
1541 :
1542 2 : PG_RETURN_LSN(remote_lsn);
1543 : }
1544 :
1545 : Datum
1546 1 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
1547 : {
1548 1 : XLogRecPtr location = PG_GETARG_LSN(0);
1549 :
1550 1 : replorigin_check_prerequisites(true, false);
1551 :
1552 1 : if (session_replication_state == NULL)
1553 0 : ereport(ERROR,
1554 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1555 : errmsg("no replication origin is configured")));
1556 :
1557 1 : replorigin_xact_state.origin_lsn = location;
1558 1 : replorigin_xact_state.origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
1559 :
1560 1 : PG_RETURN_VOID();
1561 : }
1562 :
1563 : Datum
1564 0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
1565 : {
1566 0 : replorigin_check_prerequisites(true, false);
1567 :
1568 : /* Do not clear the session origin */
1569 0 : replorigin_xact_clear(false);
1570 :
1571 0 : PG_RETURN_VOID();
1572 : }
1573 :
1574 :
1575 : Datum
1576 3 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
1577 : {
1578 3 : text *name = PG_GETARG_TEXT_PP(0);
1579 3 : XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1580 : ReplOriginId node;
1581 :
1582 3 : replorigin_check_prerequisites(true, false);
1583 :
1584 : /* lock to prevent the replication origin from vanishing */
1585 3 : LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1586 :
1587 3 : node = replorigin_by_name(text_to_cstring(name), false);
1588 :
1589 : /*
1590 : * Can't sensibly pass a local commit to be flushed at checkpoint - this
1591 : * xact hasn't committed yet. This is why this function should be used to
1592 : * set up the initial replication state, but not for replay.
1593 : */
1594 2 : replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1595 : true /* go backward */ , true /* WAL log */ );
1596 :
1597 2 : UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1598 :
1599 2 : PG_RETURN_VOID();
1600 : }
1601 :
1602 :
1603 : /*
1604 : * Return the replication progress for an individual replication origin.
1605 : *
1606 : * If 'flush' is set to true it is ensured that the returned value corresponds
1607 : * to a local transaction that has been flushed. This is useful if asynchronous
1608 : * commits are used when replaying replicated transactions.
1609 : */
1610 : Datum
1611 3 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
1612 : {
1613 : char *name;
1614 : bool flush;
1615 : ReplOriginId roident;
1616 3 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1617 :
1618 3 : replorigin_check_prerequisites(true, true);
1619 :
1620 3 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1621 3 : flush = PG_GETARG_BOOL(1);
1622 :
1623 3 : roident = replorigin_by_name(name, false);
1624 : Assert(OidIsValid(roident));
1625 :
1626 2 : remote_lsn = replorigin_get_progress(roident, flush);
1627 :
1628 2 : if (!XLogRecPtrIsValid(remote_lsn))
1629 0 : PG_RETURN_NULL();
1630 :
1631 2 : PG_RETURN_LSN(remote_lsn);
1632 : }
1633 :
1634 :
1635 : Datum
1636 11 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
1637 : {
1638 11 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1639 : int i;
1640 : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
1641 :
1642 : /* we want to return 0 rows if slot is set to zero */
1643 11 : replorigin_check_prerequisites(false, true);
1644 :
1645 11 : InitMaterializedSRF(fcinfo, 0);
1646 :
1647 : /* prevent slots from being concurrently dropped */
1648 11 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1649 :
1650 : /*
1651 : * Iterate through all possible replication_states, display if they are
1652 : * filled. Note that we do not take any locks, so slightly corrupted/out
1653 : * of date values are a possibility.
1654 : */
1655 121 : for (i = 0; i < max_active_replication_origins; i++)
1656 : {
1657 : ReplicationState *state;
1658 : Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];
1659 : bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1660 : char *roname;
1661 :
1662 110 : state = &replication_states[i];
1663 :
1664 : /* unused slot, nothing to display */
1665 110 : if (state->roident == InvalidReplOriginId)
1666 97 : continue;
1667 :
1668 13 : memset(values, 0, sizeof(values));
1669 13 : memset(nulls, 1, sizeof(nulls));
1670 :
1671 13 : values[0] = ObjectIdGetDatum(state->roident);
1672 13 : nulls[0] = false;
1673 :
1674 : /*
1675 : * We're not preventing the origin to be dropped concurrently, so
1676 : * silently accept that it might be gone.
1677 : */
1678 13 : if (replorigin_by_oid(state->roident, true,
1679 : &roname))
1680 : {
1681 13 : values[1] = CStringGetTextDatum(roname);
1682 13 : nulls[1] = false;
1683 : }
1684 :
1685 13 : LWLockAcquire(&state->lock, LW_SHARED);
1686 :
1687 13 : values[2] = LSNGetDatum(state->remote_lsn);
1688 13 : nulls[2] = false;
1689 :
1690 13 : values[3] = LSNGetDatum(state->local_lsn);
1691 13 : nulls[3] = false;
1692 :
1693 13 : LWLockRelease(&state->lock);
1694 :
1695 13 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1696 : values, nulls);
1697 : }
1698 :
1699 11 : LWLockRelease(ReplicationOriginLock);
1700 :
1701 : #undef REPLICATION_ORIGIN_PROGRESS_COLS
1702 :
1703 11 : return (Datum) 0;
1704 : }
|