Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * origin.c
4 : * Logical replication progress tracking support.
5 : *
6 : * Copyright (c) 2013-2026, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/logical/origin.c
10 : *
11 : * NOTES
12 : *
13 : * This file provides the following:
14 : * * An infrastructure to name nodes in a replication setup
 * * A facility to store and persist replication progress in an efficient
 *   and durable manner.
17 : *
 * A replication origin consists of a descriptive, user-defined, external
 * name and a short, thus space-efficient, internal 2-byte one. This split
 * exists because replication origins have to be stored in WAL and shared
 * memory, and long descriptors would be inefficient. For now we only use 2
 * bytes for the internal id of a replication origin, as it seems unlikely
 * that there will soon be more than 65k nodes in one replication setup;
 * using only two bytes allows us to be more space-efficient.
25 : *
 * Replication progress is tracked in a shared memory table
 * (ReplicationState) that's dumped to disk at every checkpoint. Entries
 * ('slots') in this table are identified by the internal id. That's the case
 * because it allows us to advance replication progress during crash
 * recovery. To allow doing so we store the original LSN (from the originating
 * system) of a transaction in the commit record. That allows us to recover
 * the precise replayed state after crash recovery without requiring
 * synchronous commits. Allowing logical replication to use asynchronous
 * commit is generally good for performance, but especially important as it
 * allows a single-threaded replay process to keep up with a source that has
 * multiple backends generating changes concurrently. For efficiency and
 * simplicity reasons a backend can set up one replication origin that is
 * then used as the source of changes produced by the backend, until reset.
39 : *
40 : * This infrastructure is intended to be used in cooperation with logical
41 : * decoding. When replaying from a remote system the configured origin is
42 : * provided to output plugins, allowing prevention of replication loops and
43 : * other filtering.
44 : *
45 : * There are several levels of locking at work:
46 : *
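 * * To create replication origins an exclusive lock on
 *   pg_replication_origin is required for the duration. That allows us to
 *   safely and conflict-free assign new origins using a dirty snapshot.
 *   Dropping an origin takes a RowExclusiveLock on pg_replication_origin
 *   plus an AccessExclusiveLock on the origin object itself, which prevents
 *   concurrent drops of the same origin.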
50 : *
 * * When creating or clearing an in-memory replication progress slot the
 *   ReplicationOrigin LWLock has to be held exclusively; when iterating over
 *   the replication progress a shared lock suffices. Advancing the progress
 *   of an origin that has not been set up as the session's replication
 *   origin (replorigin_advance) takes the lock exclusively, since it may have
 *   to claim a new slot.
56 : *
 * * When manipulating or looking at the remote_lsn and local_lsn fields of a
 *   replication progress slot that slot's lwlock has to be held. That's
 *   primarily because we do not assume 8-byte writes (the LSN) are atomic on
 *   all our platforms, but it also simplifies memory ordering concerns
 *   between the remote and local lsn. We use a lwlock instead of a spinlock
 *   so it's less harmful to hold the lock over a WAL write
 *   (cf. AdvanceReplicationProgress).
64 : *
65 : * ---------------------------------------------------------------------------
66 : */
67 :
68 : #include "postgres.h"
69 :
70 : #include <unistd.h>
71 : #include <sys/stat.h>
72 :
73 : #include "access/genam.h"
74 : #include "access/htup_details.h"
75 : #include "access/table.h"
76 : #include "access/xact.h"
77 : #include "access/xloginsert.h"
78 : #include "catalog/catalog.h"
79 : #include "catalog/indexing.h"
80 : #include "catalog/pg_subscription.h"
81 : #include "funcapi.h"
82 : #include "miscadmin.h"
83 : #include "nodes/execnodes.h"
84 : #include "pgstat.h"
85 : #include "replication/origin.h"
86 : #include "replication/slot.h"
87 : #include "storage/condition_variable.h"
88 : #include "storage/fd.h"
89 : #include "storage/ipc.h"
90 : #include "storage/lmgr.h"
91 : #include "storage/subsystems.h"
92 : #include "utils/builtins.h"
93 : #include "utils/fmgroids.h"
94 : #include "utils/guc.h"
95 : #include "utils/pg_lsn.h"
96 : #include "utils/rel.h"
97 : #include "utils/snapmgr.h"
98 : #include "utils/syscache.h"
99 : #include "utils/wait_event.h"
100 :
101 : /* paths for replication origin checkpoint files */
102 : #define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
103 : #define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
104 :
105 : /* GUC variables */
106 : int max_active_replication_origins = 10;
107 :
108 : /*
109 : * Replay progress of a single remote node.
110 : */
111 : typedef struct ReplicationState
112 : {
113 : /*
114 : * Local identifier for the remote node.
115 : */
116 : ReplOriginId roident;
117 :
118 : /*
119 : * Location of the latest commit from the remote side.
120 : */
121 : XLogRecPtr remote_lsn;
122 :
123 : /*
124 : * Remember the local lsn of the commit record so we can XLogFlush() to it
125 : * during a checkpoint so we know the commit record actually is safe on
126 : * disk.
127 : */
128 : XLogRecPtr local_lsn;
129 :
130 : /*
131 : * PID of backend that's acquired slot, or 0 if none.
132 : */
133 : int acquired_by;
134 :
135 : /* Count of processes that are currently using this origin. */
136 : int refcount;
137 :
138 : /*
139 : * Condition variable that's signaled when acquired_by changes.
140 : */
141 : ConditionVariable origin_cv;
142 :
143 : /*
144 : * Lock protecting remote_lsn and local_lsn.
145 : */
146 : LWLock lock;
147 : } ReplicationState;
148 :
149 : /*
150 : * On disk version of ReplicationState.
151 : */
152 : typedef struct ReplicationStateOnDisk
153 : {
154 : ReplOriginId roident;
155 : XLogRecPtr remote_lsn;
156 : } ReplicationStateOnDisk;
157 :
158 :
159 : typedef struct ReplicationStateCtl
160 : {
161 : /* Tranche to use for per-origin LWLocks */
162 : int tranche_id;
163 : /* Array of length max_active_replication_origins */
164 : ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
165 : } ReplicationStateCtl;
166 :
167 : /* Global variable for per-transaction replication origin state */
168 : ReplOriginXactState replorigin_xact_state = {
169 : .origin = InvalidReplOriginId, /* assumed identity */
170 : .origin_lsn = InvalidXLogRecPtr,
171 : .origin_timestamp = 0
172 : };
173 :
174 : /*
175 : * Base address into a shared memory array of replication states of size
176 : * max_active_replication_origins.
177 : */
178 : static ReplicationState *replication_states;
179 :
180 : static void ReplicationOriginShmemRequest(void *arg);
181 : static void ReplicationOriginShmemInit(void *arg);
182 : static void ReplicationOriginShmemAttach(void *arg);
183 :
184 : const ShmemCallbacks ReplicationOriginShmemCallbacks = {
185 : .request_fn = ReplicationOriginShmemRequest,
186 : .init_fn = ReplicationOriginShmemInit,
187 : .attach_fn = ReplicationOriginShmemAttach,
188 : };
189 :
190 : /*
191 : * Actual shared memory block (replication_states[] is now part of this).
192 : */
193 : static ReplicationStateCtl *replication_states_ctl;
194 :
195 : /*
196 : * We keep a pointer to this backend's ReplicationState to avoid having to
197 : * search the replication_states array in replorigin_session_advance for each
198 : * remote commit. (Ownership of a backend's own entry can only be changed by
199 : * that backend.)
200 : */
201 : static ReplicationState *session_replication_state = NULL;
202 :
203 : /* Magic for on disk files. */
204 : #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
205 :
206 : static void
207 69 : replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
208 : {
209 69 : if (check_origins && max_active_replication_origins == 0)
210 0 : ereport(ERROR,
211 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
212 : errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
213 :
214 69 : if (!recoveryOK && RecoveryInProgress())
215 0 : ereport(ERROR,
216 : (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
217 : errmsg("cannot manipulate replication origins during recovery")));
218 69 : }
219 :
220 :
221 : /*
222 : * IsReservedOriginName
223 : * True iff name is either "none" or "any".
224 : */
225 : static bool
226 14 : IsReservedOriginName(const char *name)
227 : {
228 27 : return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
229 13 : (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
230 : }
231 :
232 : /* ---------------------------------------------------------------------------
233 : * Functions for working with replication origins themselves.
234 : * ---------------------------------------------------------------------------
235 : */
236 :
237 : /*
238 : * Check for a persistent replication origin identified by name.
239 : *
240 : * Returns InvalidOid if the node isn't known yet and missing_ok is true.
241 : */
242 : ReplOriginId
243 1116 : replorigin_by_name(const char *roname, bool missing_ok)
244 : {
245 : Form_pg_replication_origin ident;
246 1116 : Oid roident = InvalidOid;
247 : HeapTuple tuple;
248 : Datum roname_d;
249 :
250 1116 : roname_d = CStringGetTextDatum(roname);
251 :
252 1116 : tuple = SearchSysCache1(REPLORIGNAME, roname_d);
253 1116 : if (HeapTupleIsValid(tuple))
254 : {
255 710 : ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
256 710 : roident = ident->roident;
257 710 : ReleaseSysCache(tuple);
258 : }
259 406 : else if (!missing_ok)
260 4 : ereport(ERROR,
261 : (errcode(ERRCODE_UNDEFINED_OBJECT),
262 : errmsg("replication origin \"%s\" does not exist",
263 : roname)));
264 :
265 1112 : return roident;
266 : }
267 :
268 : /*
269 : * Create a replication origin.
270 : *
271 : * Needs to be called in a transaction.
272 : */
273 : ReplOriginId
274 411 : replorigin_create(const char *roname)
275 : {
276 : Oid roident;
277 411 : HeapTuple tuple = NULL;
278 : Relation rel;
279 : Datum roname_d;
280 : SnapshotData SnapshotDirty;
281 : SysScanDesc scan;
282 : ScanKeyData key;
283 :
284 : /*
285 : * To avoid needing a TOAST table for pg_replication_origin, we limit
286 : * replication origin names to 512 bytes. This should be more than enough
287 : * for all practical use.
288 : */
289 411 : if (strlen(roname) > MAX_RONAME_LEN)
290 4 : ereport(ERROR,
291 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
292 : errmsg("replication origin name is too long"),
293 : errdetail("Replication origin names must be no longer than %d bytes.",
294 : MAX_RONAME_LEN)));
295 :
296 407 : roname_d = CStringGetTextDatum(roname);
297 :
298 : Assert(IsTransactionState());
299 :
300 : /*
301 : * We need the numeric replication origin to be 16bit wide, so we cannot
302 : * rely on the normal oid allocation. Instead we simply scan
303 : * pg_replication_origin for the first unused id. That's not particularly
304 : * efficient, but this should be a fairly infrequent operation - we can
305 : * easily spend a bit more code on this when it turns out it needs to be
306 : * faster.
307 : *
308 : * We handle concurrency by taking an exclusive lock (allowing reads!)
309 : * over the table for the duration of the search. Because we use a "dirty
310 : * snapshot" we can read rows that other in-progress sessions have
311 : * written, even though they would be invisible with normal snapshots. Due
312 : * to the exclusive lock there's no danger that new rows can appear while
313 : * we're checking.
314 : */
315 407 : InitDirtySnapshot(SnapshotDirty);
316 :
317 407 : rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
318 :
319 : /*
320 : * We want to be able to access pg_replication_origin without setting up a
321 : * snapshot. To make that safe, it needs to not have a TOAST table, since
322 : * TOASTed data cannot be fetched without a snapshot. As of this writing,
323 : * its only varlena column is roname, which we limit to 512 bytes to avoid
324 : * needing out-of-line storage. If you add a TOAST table to this catalog,
325 : * be sure to set up a snapshot everywhere it might be needed. For more
326 : * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
327 : */
328 : Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
329 :
330 718 : for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
331 : {
332 : bool nulls[Natts_pg_replication_origin];
333 : Datum values[Natts_pg_replication_origin];
334 : bool collides;
335 :
336 718 : CHECK_FOR_INTERRUPTS();
337 :
338 718 : ScanKeyInit(&key,
339 : Anum_pg_replication_origin_roident,
340 : BTEqualStrategyNumber, F_OIDEQ,
341 : ObjectIdGetDatum(roident));
342 :
343 718 : scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
344 : true /* indexOK */ ,
345 : &SnapshotDirty,
346 : 1, &key);
347 :
348 718 : collides = HeapTupleIsValid(systable_getnext(scan));
349 :
350 718 : systable_endscan(scan);
351 :
352 718 : if (!collides)
353 : {
354 : /*
355 : * Ok, found an unused roident, insert the new row and do a CCI,
356 : * so our callers can look it up if they want to.
357 : */
358 407 : memset(&nulls, 0, sizeof(nulls));
359 :
360 407 : values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
361 407 : values[Anum_pg_replication_origin_roname - 1] = roname_d;
362 :
363 407 : tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
364 407 : CatalogTupleInsert(rel, tuple);
365 406 : CommandCounterIncrement();
366 406 : break;
367 : }
368 : }
369 :
370 : /* now release lock again, */
371 406 : table_close(rel, ExclusiveLock);
372 :
373 406 : if (tuple == NULL)
374 0 : ereport(ERROR,
375 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
376 : errmsg("could not find free replication origin ID")));
377 :
378 406 : heap_freetuple(tuple);
379 406 : return roident;
380 : }
381 :
382 : /*
383 : * Helper function to drop a replication origin.
384 : */
385 : static void
386 353 : replorigin_state_clear(ReplOriginId roident, bool nowait)
387 : {
388 : int i;
389 :
390 : /*
391 : * Clean up the slot state info, if there is any matching slot.
392 : */
393 353 : restart:
394 353 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
395 :
396 1303 : for (i = 0; i < max_active_replication_origins; i++)
397 : {
398 1235 : ReplicationState *state = &replication_states[i];
399 :
400 1235 : if (state->roident == roident)
401 : {
402 : /* found our slot, is it busy? */
403 285 : if (state->refcount > 0)
404 : {
405 : ConditionVariable *cv;
406 :
407 0 : if (nowait)
408 0 : ereport(ERROR,
409 : (errcode(ERRCODE_OBJECT_IN_USE),
410 : (state->acquired_by != 0)
411 : ? errmsg("could not drop replication origin with ID %d, in use by PID %d",
412 : state->roident,
413 : state->acquired_by)
414 : : errmsg("could not drop replication origin with ID %d, in use by another process",
415 : state->roident)));
416 :
417 : /*
418 : * We must wait and then retry. Since we don't know which CV
419 : * to wait on until here, we can't readily use
420 : * ConditionVariablePrepareToSleep (calling it here would be
421 : * wrong, since we could miss the signal if we did so); just
422 : * use ConditionVariableSleep directly.
423 : */
424 0 : cv = &state->origin_cv;
425 :
426 0 : LWLockRelease(ReplicationOriginLock);
427 :
428 0 : ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
429 0 : goto restart;
430 : }
431 :
432 : /* first make a WAL log entry */
433 : {
434 : xl_replorigin_drop xlrec;
435 :
436 285 : xlrec.node_id = roident;
437 285 : XLogBeginInsert();
438 285 : XLogRegisterData(&xlrec, sizeof(xlrec));
439 285 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
440 : }
441 :
442 : /* then clear the in-memory slot */
443 285 : state->roident = InvalidReplOriginId;
444 285 : state->remote_lsn = InvalidXLogRecPtr;
445 285 : state->local_lsn = InvalidXLogRecPtr;
446 285 : break;
447 : }
448 : }
449 353 : LWLockRelease(ReplicationOriginLock);
450 353 : ConditionVariableCancelSleep();
451 353 : }
452 :
453 : /*
454 : * Drop replication origin (by name).
455 : *
456 : * Needs to be called in a transaction.
457 : */
458 : void
459 552 : replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
460 : {
461 : ReplOriginId roident;
462 : Relation rel;
463 : HeapTuple tuple;
464 :
465 : Assert(IsTransactionState());
466 :
467 552 : rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
468 :
469 552 : roident = replorigin_by_name(name, missing_ok);
470 :
471 : /* Lock the origin to prevent concurrent drops. */
472 551 : LockSharedObject(ReplicationOriginRelationId, roident, 0,
473 : AccessExclusiveLock);
474 :
475 551 : tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
476 551 : if (!HeapTupleIsValid(tuple))
477 : {
478 198 : if (!missing_ok)
479 0 : elog(ERROR, "cache lookup failed for replication origin with ID %d",
480 : roident);
481 :
482 : /*
483 : * We don't need to retain the locks if the origin is already dropped.
484 : */
485 198 : UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
486 : AccessExclusiveLock);
487 198 : table_close(rel, RowExclusiveLock);
488 198 : return;
489 : }
490 :
491 353 : replorigin_state_clear(roident, nowait);
492 :
493 : /*
494 : * Now, we can delete the catalog entry.
495 : */
496 353 : CatalogTupleDelete(rel, &tuple->t_self);
497 353 : ReleaseSysCache(tuple);
498 :
499 353 : CommandCounterIncrement();
500 :
501 : /* We keep the lock on pg_replication_origin until commit */
502 353 : table_close(rel, NoLock);
503 : }
504 :
505 : /*
506 : * Lookup replication origin via its oid and return the name.
507 : *
508 : * The external name is palloc'd in the calling context.
509 : *
510 : * Returns true if the origin is known, false otherwise.
511 : */
512 : bool
513 27 : replorigin_by_oid(ReplOriginId roident, bool missing_ok, char **roname)
514 : {
515 : HeapTuple tuple;
516 : Form_pg_replication_origin ric;
517 :
518 : Assert(OidIsValid((Oid) roident));
519 : Assert(roident != InvalidReplOriginId);
520 : Assert(roident != DoNotReplicateId);
521 :
522 27 : tuple = SearchSysCache1(REPLORIGIDENT,
523 : ObjectIdGetDatum((Oid) roident));
524 :
525 27 : if (HeapTupleIsValid(tuple))
526 : {
527 24 : ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
528 24 : *roname = text_to_cstring(&ric->roname);
529 24 : ReleaseSysCache(tuple);
530 :
531 24 : return true;
532 : }
533 : else
534 : {
535 3 : *roname = NULL;
536 :
537 3 : if (!missing_ok)
538 0 : ereport(ERROR,
539 : (errcode(ERRCODE_UNDEFINED_OBJECT),
540 : errmsg("replication origin with ID %d does not exist",
541 : roident)));
542 :
543 3 : return false;
544 : }
545 : }
546 :
547 :
548 : /* ---------------------------------------------------------------------------
549 : * Functions for handling replication progress.
550 : * ---------------------------------------------------------------------------
551 : */
552 :
553 : static void
554 1234 : ReplicationOriginShmemRequest(void *arg)
555 : {
556 1234 : Size size = 0;
557 :
558 1234 : if (max_active_replication_origins == 0)
559 1 : return;
560 :
561 1233 : size = add_size(size, offsetof(ReplicationStateCtl, states));
562 1233 : size = add_size(size,
563 : mul_size(max_active_replication_origins, sizeof(ReplicationState)));
564 1233 : ShmemRequestStruct(.name = "ReplicationOriginState",
565 : .size = size,
566 : .ptr = (void **) &replication_states_ctl,
567 : );
568 : }
569 :
570 : static void
571 1231 : ReplicationOriginShmemInit(void *arg)
572 : {
573 1231 : if (max_active_replication_origins == 0)
574 1 : return;
575 :
576 1230 : replication_states = replication_states_ctl->states;
577 :
578 1230 : replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
579 :
580 13521 : for (int i = 0; i < max_active_replication_origins; i++)
581 : {
582 12291 : LWLockInitialize(&replication_states[i].lock,
583 12291 : replication_states_ctl->tranche_id);
584 12291 : ConditionVariableInit(&replication_states[i].origin_cv);
585 : }
586 : }
587 :
588 : static void
589 0 : ReplicationOriginShmemAttach(void *arg)
590 : {
591 0 : if (max_active_replication_origins == 0)
592 0 : return;
593 :
594 0 : replication_states = replication_states_ctl->states;
595 : }
596 :
597 : /* ---------------------------------------------------------------------------
598 : * Perform a checkpoint of each replication origin's progress with respect to
599 : * the replayed remote_lsn. Make sure that all transactions we refer to in the
600 : * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
601 : * if the transactions were originally committed asynchronously.
602 : *
603 : * We store checkpoints in the following format:
604 : * +-------+------------------------+------------------+-----+--------+
605 : * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
606 : * +-------+------------------------+------------------+-----+--------+
607 : *
608 : * So its just the magic, followed by the statically sized
609 : * ReplicationStateOnDisk structs. Note that the maximum number of
610 : * ReplicationState is determined by max_active_replication_origins.
611 : * ---------------------------------------------------------------------------
612 : */
613 : void
614 1931 : CheckPointReplicationOrigin(void)
615 : {
616 1931 : const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
617 1931 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
618 : int tmpfd;
619 : int i;
620 1931 : uint32 magic = REPLICATION_STATE_MAGIC;
621 : pg_crc32c crc;
622 :
623 1931 : if (max_active_replication_origins == 0)
624 1 : return;
625 :
626 1930 : INIT_CRC32C(crc);
627 :
628 : /* make sure no old temp file is remaining */
629 1930 : if (unlink(tmppath) < 0 && errno != ENOENT)
630 0 : ereport(PANIC,
631 : (errcode_for_file_access(),
632 : errmsg("could not remove file \"%s\": %m",
633 : tmppath)));
634 :
635 : /*
636 : * no other backend can perform this at the same time; only one checkpoint
637 : * can happen at a time.
638 : */
639 1930 : tmpfd = OpenTransientFile(tmppath,
640 : O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
641 1930 : if (tmpfd < 0)
642 0 : ereport(PANIC,
643 : (errcode_for_file_access(),
644 : errmsg("could not create file \"%s\": %m",
645 : tmppath)));
646 :
647 : /* write magic */
648 1930 : errno = 0;
649 1930 : if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
650 : {
651 : /* if write didn't set errno, assume problem is no disk space */
652 0 : if (errno == 0)
653 0 : errno = ENOSPC;
654 0 : ereport(PANIC,
655 : (errcode_for_file_access(),
656 : errmsg("could not write to file \"%s\": %m",
657 : tmppath)));
658 : }
659 1930 : COMP_CRC32C(crc, &magic, sizeof(magic));
660 :
661 : /* prevent concurrent creations/drops */
662 1930 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
663 :
664 : /* write actual data */
665 21230 : for (i = 0; i < max_active_replication_origins; i++)
666 : {
667 : ReplicationStateOnDisk disk_state;
668 19300 : ReplicationState *curstate = &replication_states[i];
669 : XLogRecPtr local_lsn;
670 :
671 19300 : if (curstate->roident == InvalidReplOriginId)
672 19246 : continue;
673 :
674 : /* zero, to avoid uninitialized padding bytes */
675 54 : memset(&disk_state, 0, sizeof(disk_state));
676 :
677 54 : LWLockAcquire(&curstate->lock, LW_SHARED);
678 :
679 54 : disk_state.roident = curstate->roident;
680 :
681 54 : disk_state.remote_lsn = curstate->remote_lsn;
682 54 : local_lsn = curstate->local_lsn;
683 :
684 54 : LWLockRelease(&curstate->lock);
685 :
686 : /* make sure we only write out a commit that's persistent */
687 54 : XLogFlush(local_lsn);
688 :
689 54 : errno = 0;
690 54 : if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
691 : sizeof(disk_state))
692 : {
693 : /* if write didn't set errno, assume problem is no disk space */
694 0 : if (errno == 0)
695 0 : errno = ENOSPC;
696 0 : ereport(PANIC,
697 : (errcode_for_file_access(),
698 : errmsg("could not write to file \"%s\": %m",
699 : tmppath)));
700 : }
701 :
702 54 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
703 : }
704 :
705 1930 : LWLockRelease(ReplicationOriginLock);
706 :
707 : /* write out the CRC */
708 1930 : FIN_CRC32C(crc);
709 1930 : errno = 0;
710 1930 : if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
711 : {
712 : /* if write didn't set errno, assume problem is no disk space */
713 0 : if (errno == 0)
714 0 : errno = ENOSPC;
715 0 : ereport(PANIC,
716 : (errcode_for_file_access(),
717 : errmsg("could not write to file \"%s\": %m",
718 : tmppath)));
719 : }
720 :
721 1930 : if (CloseTransientFile(tmpfd) != 0)
722 0 : ereport(PANIC,
723 : (errcode_for_file_access(),
724 : errmsg("could not close file \"%s\": %m",
725 : tmppath)));
726 :
727 : /* fsync, rename to permanent file, fsync file and directory */
728 1930 : durable_rename(tmppath, path, PANIC);
729 : }
730 :
731 : /*
732 : * Recover replication replay status from checkpoint data saved earlier by
733 : * CheckPointReplicationOrigin.
734 : *
735 : * This only needs to be called at startup and *not* during every checkpoint
736 : * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
737 : * state thereafter can be recovered by looking at commit records.
738 : */
739 : void
740 1070 : StartupReplicationOrigin(void)
741 : {
742 1070 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
743 : int fd;
744 : int readBytes;
745 1070 : uint32 magic = REPLICATION_STATE_MAGIC;
746 1070 : int last_state = 0;
747 : pg_crc32c file_crc;
748 : pg_crc32c crc;
749 :
750 : /* don't want to overwrite already existing state */
751 : #ifdef USE_ASSERT_CHECKING
752 : static bool already_started = false;
753 :
754 : Assert(!already_started);
755 : already_started = true;
756 : #endif
757 :
758 1070 : if (max_active_replication_origins == 0)
759 58 : return;
760 :
761 1069 : INIT_CRC32C(crc);
762 :
763 1069 : elog(DEBUG2, "starting up replication origin progress state");
764 :
765 1069 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
766 :
767 : /*
768 : * might have had max_active_replication_origins == 0 last run, or we just
769 : * brought up a standby.
770 : */
771 1069 : if (fd < 0 && errno == ENOENT)
772 57 : return;
773 1012 : else if (fd < 0)
774 0 : ereport(PANIC,
775 : (errcode_for_file_access(),
776 : errmsg("could not open file \"%s\": %m",
777 : path)));
778 :
779 : /* verify magic, that is written even if nothing was active */
780 1012 : readBytes = read(fd, &magic, sizeof(magic));
781 1012 : if (readBytes != sizeof(magic))
782 : {
783 0 : if (readBytes < 0)
784 0 : ereport(PANIC,
785 : (errcode_for_file_access(),
786 : errmsg("could not read file \"%s\": %m",
787 : path)));
788 : else
789 0 : ereport(PANIC,
790 : (errcode(ERRCODE_DATA_CORRUPTED),
791 : errmsg("could not read file \"%s\": read %d of %zu",
792 : path, readBytes, sizeof(magic))));
793 : }
794 1012 : COMP_CRC32C(crc, &magic, sizeof(magic));
795 :
796 1012 : if (magic != REPLICATION_STATE_MAGIC)
797 0 : ereport(PANIC,
798 : (errmsg("replication checkpoint has wrong magic %u instead of %u",
799 : magic, REPLICATION_STATE_MAGIC)));
800 :
801 : /* we can skip locking here, no other access is possible */
802 :
803 : /* recover individual states, until there are no more to be found */
804 : while (true)
805 31 : {
806 : ReplicationStateOnDisk disk_state;
807 :
808 1043 : readBytes = read(fd, &disk_state, sizeof(disk_state));
809 :
810 1043 : if (readBytes < 0)
811 : {
812 0 : ereport(PANIC,
813 : (errcode_for_file_access(),
814 : errmsg("could not read file \"%s\": %m",
815 : path)));
816 : }
817 :
818 : /* no further data */
819 1043 : if (readBytes == sizeof(crc))
820 : {
821 1012 : memcpy(&file_crc, &disk_state, sizeof(file_crc));
822 1012 : break;
823 : }
824 :
825 31 : if (readBytes != sizeof(disk_state))
826 : {
827 0 : ereport(PANIC,
828 : (errcode_for_file_access(),
829 : errmsg("could not read file \"%s\": read %d of %zu",
830 : path, readBytes, sizeof(disk_state))));
831 : }
832 :
833 31 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
834 :
835 31 : if (last_state == max_active_replication_origins)
836 0 : ereport(PANIC,
837 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
838 : errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
839 :
840 : /* copy data to shared memory */
841 31 : replication_states[last_state].roident = disk_state.roident;
842 31 : replication_states[last_state].remote_lsn = disk_state.remote_lsn;
843 31 : last_state++;
844 :
845 31 : ereport(LOG,
846 : errmsg("recovered replication state of node %d to %X/%08X",
847 : disk_state.roident,
848 : LSN_FORMAT_ARGS(disk_state.remote_lsn)));
849 : }
850 :
851 : /* now check checksum */
852 1012 : FIN_CRC32C(crc);
853 1012 : if (file_crc != crc)
854 0 : ereport(PANIC,
855 : (errcode(ERRCODE_DATA_CORRUPTED),
856 : errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
857 : crc, file_crc)));
858 :
859 1012 : if (CloseTransientFile(fd) != 0)
860 0 : ereport(PANIC,
861 : (errcode_for_file_access(),
862 : errmsg("could not close file \"%s\": %m",
863 : path)));
864 : }
865 :
866 : void
867 4 : replorigin_redo(XLogReaderState *record)
868 : {
869 4 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
870 :
871 4 : switch (info)
872 : {
873 2 : case XLOG_REPLORIGIN_SET:
874 : {
875 2 : xl_replorigin_set *xlrec =
876 2 : (xl_replorigin_set *) XLogRecGetData(record);
877 :
878 2 : replorigin_advance(xlrec->node_id,
879 : xlrec->remote_lsn, record->EndRecPtr,
880 2 : xlrec->force /* backward */ ,
881 : false /* WAL log */ );
882 2 : break;
883 : }
884 2 : case XLOG_REPLORIGIN_DROP:
885 : {
886 : xl_replorigin_drop *xlrec;
887 : int i;
888 :
889 2 : xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
890 :
891 3 : for (i = 0; i < max_active_replication_origins; i++)
892 : {
893 3 : ReplicationState *state = &replication_states[i];
894 :
895 : /* found our slot */
896 3 : if (state->roident == xlrec->node_id)
897 : {
898 : /* reset entry */
899 2 : state->roident = InvalidReplOriginId;
900 2 : state->remote_lsn = InvalidXLogRecPtr;
901 2 : state->local_lsn = InvalidXLogRecPtr;
902 2 : break;
903 : }
904 : }
905 2 : break;
906 : }
907 0 : default:
908 0 : elog(PANIC, "replorigin_redo: unknown op code %u", info);
909 : }
910 4 : }
911 :
912 :
913 : /*
914 : * Tell the replication origin progress machinery that a commit from 'node'
915 : * that originated at the LSN remote_commit on the remote node was replayed
916 : * successfully and that we don't need to do so again. In combination with
917 : * setting up replorigin_xact_state {.origin_lsn, .origin_timestamp}
918 : * that ensures we won't lose knowledge about that after a crash if the
919 : * transaction had a persistent effect (think of asynchronous commits).
920 : *
921 : * local_commit needs to be a local LSN of the commit so that we can make sure
922 : * upon a checkpoint that enough WAL has been persisted to disk.
923 : *
924 : * Needs to be called with a RowExclusiveLock on pg_replication_origin,
925 : * unless running in recovery.
926 : */
927 : void
928 257 : replorigin_advance(ReplOriginId node,
929 : XLogRecPtr remote_commit, XLogRecPtr local_commit,
930 : bool go_backward, bool wal_log)
931 : {
932 : int i;
933 257 : ReplicationState *replication_state = NULL;
934 257 : ReplicationState *free_state = NULL;
935 :
936 : Assert(node != InvalidReplOriginId);
937 :
938 : /* we don't track DoNotReplicateId */
939 257 : if (node == DoNotReplicateId)
940 0 : return;
941 :
942 : /*
943 : * XXX: For the case where this is called by WAL replay, it'd be more
944 : * efficient to restore into a backend local hashtable and only dump into
945 : * shmem after recovery is finished. Let's wait with implementing that
946 : * till it's shown to be a measurable expense
947 : */
948 :
949 : /* Lock exclusively, as we may have to create a new table entry. */
950 257 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
951 :
952 : /*
953 : * Search for either an existing slot for the origin, or a free one we can
954 : * use.
955 : */
956 2371 : for (i = 0; i < max_active_replication_origins; i++)
957 : {
958 2161 : ReplicationState *curstate = &replication_states[i];
959 :
960 : /* remember where to insert if necessary */
961 2161 : if (curstate->roident == InvalidReplOriginId &&
962 : free_state == NULL)
963 : {
964 211 : free_state = curstate;
965 211 : continue;
966 : }
967 :
968 : /* not our slot */
969 1950 : if (curstate->roident != node)
970 : {
971 1903 : continue;
972 : }
973 :
974 : /* ok, found slot */
975 47 : replication_state = curstate;
976 :
977 47 : LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
978 :
979 : /* Make sure it's not used by somebody else */
980 47 : if (replication_state->refcount > 0)
981 : {
982 0 : ereport(ERROR,
983 : (errcode(ERRCODE_OBJECT_IN_USE),
984 : (replication_state->acquired_by != 0)
985 : ? errmsg("replication origin with ID %d is already active for PID %d",
986 : replication_state->roident,
987 : replication_state->acquired_by)
988 : : errmsg("replication origin with ID %d is already active in another process",
989 : replication_state->roident)));
990 : }
991 :
992 47 : break;
993 : }
994 :
995 257 : if (replication_state == NULL && free_state == NULL)
996 0 : ereport(ERROR,
997 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
998 : errmsg("could not find free replication state slot for replication origin with ID %d",
999 : node),
1000 : errhint("Increase \"max_active_replication_origins\" and try again.")));
1001 :
1002 257 : if (replication_state == NULL)
1003 : {
1004 : /* initialize new slot */
1005 210 : LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
1006 210 : replication_state = free_state;
1007 : Assert(!XLogRecPtrIsValid(replication_state->remote_lsn));
1008 : Assert(!XLogRecPtrIsValid(replication_state->local_lsn));
1009 210 : replication_state->roident = node;
1010 : }
1011 :
1012 : Assert(replication_state->roident != InvalidReplOriginId);
1013 :
1014 : /*
1015 : * If somebody "forcefully" sets this slot, WAL log it, so it's durable
1016 : * and the standby gets the message. Primarily this will be called during
1017 : * WAL replay (of commit records) where no WAL logging is necessary.
1018 : */
1019 257 : if (wal_log)
1020 : {
1021 : xl_replorigin_set xlrec;
1022 :
1023 216 : xlrec.remote_lsn = remote_commit;
1024 216 : xlrec.node_id = node;
1025 216 : xlrec.force = go_backward;
1026 :
1027 216 : XLogBeginInsert();
1028 216 : XLogRegisterData(&xlrec, sizeof(xlrec));
1029 :
1030 216 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
1031 : }
1032 :
1033 : /*
1034 : * Due to - harmless - race conditions during a checkpoint we could see
1035 : * values here that are older than the ones we already have in memory. We
1036 : * could also see older values for prepared transactions when the prepare
1037 : * is sent at a later point of time along with commit prepared and there
1038 : * are other transactions commits between prepare and commit prepared. See
1039 : * ReorderBufferFinishPrepared. Don't overwrite those.
1040 : */
1041 257 : if (go_backward || replication_state->remote_lsn < remote_commit)
1042 250 : replication_state->remote_lsn = remote_commit;
1043 257 : if (XLogRecPtrIsValid(local_commit) &&
1044 38 : (go_backward || replication_state->local_lsn < local_commit))
1045 40 : replication_state->local_lsn = local_commit;
1046 257 : LWLockRelease(&replication_state->lock);
1047 :
1048 : /*
1049 : * Release *after* changing the LSNs, slot isn't acquired and thus could
1050 : * otherwise be dropped anytime.
1051 : */
1052 257 : LWLockRelease(ReplicationOriginLock);
1053 : }
1054 :
1055 :
1056 : XLogRecPtr
1057 9 : replorigin_get_progress(ReplOriginId node, bool flush)
1058 : {
1059 : int i;
1060 9 : XLogRecPtr local_lsn = InvalidXLogRecPtr;
1061 9 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1062 :
1063 : /* prevent slots from being concurrently dropped */
1064 9 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1065 :
1066 49 : for (i = 0; i < max_active_replication_origins; i++)
1067 : {
1068 : ReplicationState *state;
1069 :
1070 45 : state = &replication_states[i];
1071 :
1072 45 : if (state->roident == node)
1073 : {
1074 5 : LWLockAcquire(&state->lock, LW_SHARED);
1075 :
1076 5 : remote_lsn = state->remote_lsn;
1077 5 : local_lsn = state->local_lsn;
1078 :
1079 5 : LWLockRelease(&state->lock);
1080 :
1081 5 : break;
1082 : }
1083 : }
1084 :
1085 9 : LWLockRelease(ReplicationOriginLock);
1086 :
1087 9 : if (flush && XLogRecPtrIsValid(local_lsn))
1088 1 : XLogFlush(local_lsn);
1089 :
1090 9 : return remote_lsn;
1091 : }
1092 :
1093 : /* Helper function to reset the session replication origin */
1094 : static void
1095 546 : replorigin_session_reset_internal(void)
1096 : {
1097 : ConditionVariable *cv;
1098 :
1099 : Assert(session_replication_state != NULL);
1100 :
1101 546 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1102 :
1103 : /* The origin must be held by at least one process at this point. */
1104 : Assert(session_replication_state->refcount > 0);
1105 :
1106 : /*
1107 : * Reset the PID only if the current session is the first to set up this
1108 : * origin. This avoids clearing the first process's PID when any other
1109 : * session releases the origin.
1110 : */
1111 546 : if (session_replication_state->acquired_by == MyProcPid)
1112 531 : session_replication_state->acquired_by = 0;
1113 :
1114 546 : session_replication_state->refcount--;
1115 :
1116 546 : cv = &session_replication_state->origin_cv;
1117 546 : session_replication_state = NULL;
1118 :
1119 546 : LWLockRelease(ReplicationOriginLock);
1120 :
1121 546 : ConditionVariableBroadcast(cv);
1122 546 : }
1123 :
1124 : /*
1125 : * Tear down a (possibly) configured session replication origin during process
1126 : * exit.
1127 : */
1128 : static void
1129 543 : ReplicationOriginExitCleanup(int code, Datum arg)
1130 : {
1131 543 : if (session_replication_state == NULL)
1132 204 : return;
1133 :
1134 339 : replorigin_session_reset_internal();
1135 : }
1136 :
1137 : /*
1138 : * Setup a replication origin in the shared memory struct if it doesn't
1139 : * already exist and cache access to the specific ReplicationSlot so the
1140 : * array doesn't have to be searched when calling
1141 : * replorigin_session_advance().
1142 : *
1143 : * Normally only one such cached origin can exist per process so the cached
1144 : * value can only be set again after the previous value is torn down with
1145 : * replorigin_session_reset(). For this normal case pass acquired_by = 0
1146 : * (meaning the slot is not allowed to be already acquired by another process).
1147 : *
1148 : * However, sometimes multiple processes can safely re-use the same origin slot
1149 : * (for example, multiple parallel apply processes can safely use the same
1150 : * origin, provided they maintain commit order by allowing only one process to
1151 : * commit at a time). For this case the first process must pass acquired_by =
1152 : * 0, and then the other processes sharing that same origin can pass
1153 : * acquired_by = PID of the first process.
1154 : */
1155 : void
1156 549 : replorigin_session_setup(ReplOriginId node, int acquired_by)
1157 : {
1158 : static bool registered_cleanup;
1159 : int i;
1160 549 : int free_slot = -1;
1161 :
1162 549 : if (!registered_cleanup)
1163 : {
1164 543 : on_shmem_exit(ReplicationOriginExitCleanup, 0);
1165 543 : registered_cleanup = true;
1166 : }
1167 :
1168 : Assert(max_active_replication_origins > 0);
1169 :
1170 549 : if (session_replication_state != NULL)
1171 1 : ereport(ERROR,
1172 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1173 : errmsg("cannot setup replication origin when one is already setup")));
1174 :
1175 : /* Lock exclusively, as we may have to create a new table entry. */
1176 548 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1177 :
1178 : /*
1179 : * Search for either an existing slot for the origin, or a free one we can
1180 : * use.
1181 : */
1182 2110 : for (i = 0; i < max_active_replication_origins; i++)
1183 : {
1184 1984 : ReplicationState *curstate = &replication_states[i];
1185 :
1186 : /* remember where to insert if necessary */
1187 1984 : if (curstate->roident == InvalidReplOriginId &&
1188 : free_slot == -1)
1189 : {
1190 128 : free_slot = i;
1191 128 : continue;
1192 : }
1193 :
1194 : /* not our slot */
1195 1856 : if (curstate->roident != node)
1196 1434 : continue;
1197 :
1198 422 : if (acquired_by == 0)
1199 : {
1200 : /* With acquired_by == 0, we need the origin to be free */
1201 407 : if (curstate->acquired_by != 0)
1202 : {
1203 1 : ereport(ERROR,
1204 : (errcode(ERRCODE_OBJECT_IN_USE),
1205 : errmsg("replication origin with ID %d is already active for PID %d",
1206 : curstate->roident, curstate->acquired_by)));
1207 : }
1208 406 : else if (curstate->refcount > 0)
1209 : {
1210 : /*
1211 : * The origin is in use, but PID is not recorded. This can
1212 : * happen if the process that originally acquired the origin
1213 : * exited without releasing it. To ensure correctness, other
1214 : * processes cannot acquire the origin until all processes
1215 : * currently using it have released it.
1216 : */
1217 0 : ereport(ERROR,
1218 : (errcode(ERRCODE_OBJECT_IN_USE),
1219 : errmsg("replication origin with ID %d is already active in another process",
1220 : curstate->roident)));
1221 : }
1222 : }
1223 : else
1224 : {
1225 : /*
1226 : * With acquired_by != 0, we need the origin to be active by the
1227 : * given PID
1228 : */
1229 15 : if (curstate->acquired_by != acquired_by)
1230 0 : ereport(ERROR,
1231 : (errcode(ERRCODE_OBJECT_IN_USE),
1232 : errmsg("replication origin with ID %d is not active for PID %d",
1233 : curstate->roident, acquired_by)));
1234 :
1235 : /*
1236 : * Here, it is okay to have refcount > 0 as more than one process
1237 : * can safely re-use the origin.
1238 : */
1239 : }
1240 :
1241 : /* ok, found slot */
1242 421 : session_replication_state = curstate;
1243 421 : break;
1244 : }
1245 :
1246 547 : if (session_replication_state == NULL)
1247 : {
1248 126 : if (acquired_by != 0)
1249 1 : ereport(ERROR,
1250 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1251 : errmsg("cannot use PID %d for inactive replication origin with ID %d",
1252 : acquired_by, node)));
1253 :
1254 : /* initialize new slot */
1255 125 : if (free_slot == -1)
1256 0 : ereport(ERROR,
1257 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1258 : errmsg("could not find free replication state slot for replication origin with ID %d",
1259 : node),
1260 : errhint("Increase \"max_active_replication_origins\" and try again.")));
1261 :
1262 125 : session_replication_state = &replication_states[free_slot];
1263 : Assert(!XLogRecPtrIsValid(session_replication_state->remote_lsn));
1264 : Assert(!XLogRecPtrIsValid(session_replication_state->local_lsn));
1265 125 : session_replication_state->roident = node;
1266 : }
1267 :
1268 :
1269 : Assert(session_replication_state->roident != InvalidReplOriginId);
1270 :
1271 546 : if (acquired_by == 0)
1272 : {
1273 531 : session_replication_state->acquired_by = MyProcPid;
1274 : Assert(session_replication_state->refcount == 0);
1275 : }
1276 : else
1277 : {
1278 : /*
1279 : * Sanity check: the origin must already be acquired by the process
1280 : * passed as input, and at least one process must be using it.
1281 : */
1282 : Assert(session_replication_state->acquired_by == acquired_by);
1283 : Assert(session_replication_state->refcount > 0);
1284 : }
1285 :
1286 546 : session_replication_state->refcount++;
1287 :
1288 546 : LWLockRelease(ReplicationOriginLock);
1289 :
1290 : /* probably this one is pointless */
1291 546 : ConditionVariableBroadcast(&session_replication_state->origin_cv);
1292 546 : }
1293 :
1294 : /*
1295 : * Reset replay state previously setup in this session.
1296 : *
1297 : * This function may only be called if an origin was setup with
1298 : * replorigin_session_setup().
1299 : */
1300 : void
1301 209 : replorigin_session_reset(void)
1302 : {
1303 : Assert(max_active_replication_origins != 0);
1304 :
1305 209 : if (session_replication_state == NULL)
1306 1 : ereport(ERROR,
1307 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1308 : errmsg("no replication origin is configured")));
1309 :
1310 : /*
1311 : * Restrict explicit resetting of the replication origin if it was first
1312 : * acquired by this process and others are still using it. While the
1313 : * system handles this safely (as happens if the first session exits
1314 : * without calling reset), it is best to avoid doing so.
1315 : */
1316 208 : if (session_replication_state->acquired_by == MyProcPid &&
1317 206 : session_replication_state->refcount > 1)
1318 1 : ereport(ERROR,
1319 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1320 : errmsg("cannot reset replication origin with ID %d because it is still in use by other processes",
1321 : session_replication_state->roident),
1322 : errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."),
1323 : errhint("Reset the replication origin in all other processes before retrying.")));
1324 :
1325 207 : replorigin_session_reset_internal();
1326 207 : }
1327 :
1328 : /*
1329 : * Do the same work replorigin_advance() does, just on the session's
1330 : * configured origin.
1331 : *
1332 : * This is noticeably cheaper than using replorigin_advance().
1333 : */
1334 : void
1335 1168 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
1336 : {
1337 : Assert(session_replication_state != NULL);
1338 : Assert(session_replication_state->roident != InvalidReplOriginId);
1339 :
1340 1168 : LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1341 1168 : if (session_replication_state->local_lsn < local_commit)
1342 1168 : session_replication_state->local_lsn = local_commit;
1343 1168 : if (session_replication_state->remote_lsn < remote_commit)
1344 544 : session_replication_state->remote_lsn = remote_commit;
1345 1168 : LWLockRelease(&session_replication_state->lock);
1346 1168 : }
1347 :
1348 : /*
1349 : * Ask the machinery about the point up to which we successfully replayed
1350 : * changes from an already setup replication origin.
1351 : */
1352 : XLogRecPtr
1353 313 : replorigin_session_get_progress(bool flush)
1354 : {
1355 : XLogRecPtr remote_lsn;
1356 : XLogRecPtr local_lsn;
1357 :
1358 : Assert(session_replication_state != NULL);
1359 :
1360 313 : LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1361 313 : remote_lsn = session_replication_state->remote_lsn;
1362 313 : local_lsn = session_replication_state->local_lsn;
1363 313 : LWLockRelease(&session_replication_state->lock);
1364 :
1365 313 : if (flush && XLogRecPtrIsValid(local_lsn))
1366 1 : XLogFlush(local_lsn);
1367 :
1368 313 : return remote_lsn;
1369 : }
1370 :
1371 : /*
1372 : * Clear the per-transaction replication origin state.
1373 : *
1374 : * replorigin_session_origin is also cleared if clear_origin is set.
1375 : */
1376 : void
1377 864 : replorigin_xact_clear(bool clear_origin)
1378 : {
1379 864 : replorigin_xact_state.origin_lsn = InvalidXLogRecPtr;
1380 864 : replorigin_xact_state.origin_timestamp = 0;
1381 864 : if (clear_origin)
1382 864 : replorigin_xact_state.origin = InvalidReplOriginId;
1383 864 : }
1384 :
1385 :
1386 : /* ---------------------------------------------------------------------------
1387 : * SQL functions for working with replication origin.
1388 : *
1389 : * These mostly should be fairly short wrappers around more generic functions.
1390 : * ---------------------------------------------------------------------------
1391 : */
1392 :
1393 : /*
1394 : * Create replication origin for the passed in name, and return the assigned
1395 : * oid.
1396 : */
1397 : Datum
1398 15 : pg_replication_origin_create(PG_FUNCTION_ARGS)
1399 : {
1400 : char *name;
1401 : ReplOriginId roident;
1402 :
1403 15 : replorigin_check_prerequisites(false, false);
1404 :
1405 15 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1406 :
1407 : /*
1408 : * Replication origins "any and "none" are reserved for system options.
1409 : * The origins "pg_xxx" are reserved for internal use.
1410 : */
1411 15 : if (IsReservedName(name) || IsReservedOriginName(name))
1412 3 : ereport(ERROR,
1413 : (errcode(ERRCODE_RESERVED_NAME),
1414 : errmsg("replication origin name \"%s\" is reserved",
1415 : name),
1416 : errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1417 : LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
1418 :
1419 : /*
1420 : * If built with appropriate switch, whine when regression-testing
1421 : * conventions for replication origin names are violated.
1422 : */
1423 : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1424 : if (strncmp(name, "regress_", 8) != 0)
1425 : elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1426 : #endif
1427 :
1428 12 : roident = replorigin_create(name);
1429 :
1430 7 : pfree(name);
1431 :
1432 7 : PG_RETURN_OID(roident);
1433 : }
1434 :
1435 : /*
1436 : * Drop replication origin.
1437 : */
1438 : Datum
1439 9 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
1440 : {
1441 : char *name;
1442 :
1443 9 : replorigin_check_prerequisites(false, false);
1444 :
1445 9 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1446 :
1447 9 : replorigin_drop_by_name(name, false, true);
1448 :
1449 8 : pfree(name);
1450 :
1451 8 : PG_RETURN_VOID();
1452 : }
1453 :
1454 : /*
1455 : * Return oid of a replication origin.
1456 : */
1457 : Datum
1458 0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
1459 : {
1460 : char *name;
1461 : ReplOriginId roident;
1462 :
1463 0 : replorigin_check_prerequisites(false, false);
1464 :
1465 0 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1466 0 : roident = replorigin_by_name(name, true);
1467 :
1468 0 : pfree(name);
1469 :
1470 0 : if (OidIsValid(roident))
1471 0 : PG_RETURN_OID(roident);
1472 0 : PG_RETURN_NULL();
1473 : }
1474 :
1475 : /*
1476 : * Setup a replication origin for this session.
1477 : */
1478 : Datum
1479 11 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
1480 : {
1481 : char *name;
1482 : ReplOriginId origin;
1483 : int pid;
1484 :
1485 11 : replorigin_check_prerequisites(true, false);
1486 :
1487 11 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1488 11 : origin = replorigin_by_name(name, false);
1489 10 : pid = PG_GETARG_INT32(1);
1490 10 : replorigin_session_setup(origin, pid);
1491 :
1492 8 : replorigin_xact_state.origin = origin;
1493 :
1494 8 : pfree(name);
1495 :
1496 8 : PG_RETURN_VOID();
1497 : }
1498 :
1499 : /*
1500 : * Reset previously setup origin in this session
1501 : */
1502 : Datum
1503 10 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
1504 : {
1505 10 : replorigin_check_prerequisites(true, false);
1506 :
1507 10 : replorigin_session_reset();
1508 :
1509 8 : replorigin_xact_clear(true);
1510 :
1511 8 : PG_RETURN_VOID();
1512 : }
1513 :
1514 : /*
1515 : * Has a replication origin been setup for this session.
1516 : */
1517 : Datum
1518 4 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
1519 : {
1520 4 : replorigin_check_prerequisites(false, false);
1521 :
1522 4 : PG_RETURN_BOOL(replorigin_xact_state.origin != InvalidReplOriginId);
1523 : }
1524 :
1525 :
1526 : /*
1527 : * Return the replication progress for origin setup in the current session.
1528 : *
1529 : * If 'flush' is set to true it is ensured that the returned value corresponds
1530 : * to a local transaction that has been flushed. This is useful if asynchronous
1531 : * commits are used when replaying replicated transactions.
1532 : */
1533 : Datum
1534 2 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
1535 : {
1536 2 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1537 2 : bool flush = PG_GETARG_BOOL(0);
1538 :
1539 2 : replorigin_check_prerequisites(true, false);
1540 :
1541 2 : if (session_replication_state == NULL)
1542 0 : ereport(ERROR,
1543 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1544 : errmsg("no replication origin is configured")));
1545 :
1546 2 : remote_lsn = replorigin_session_get_progress(flush);
1547 :
1548 2 : if (!XLogRecPtrIsValid(remote_lsn))
1549 0 : PG_RETURN_NULL();
1550 :
1551 2 : PG_RETURN_LSN(remote_lsn);
1552 : }
1553 :
1554 : Datum
1555 1 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
1556 : {
1557 1 : XLogRecPtr location = PG_GETARG_LSN(0);
1558 :
1559 1 : replorigin_check_prerequisites(true, false);
1560 :
1561 1 : if (session_replication_state == NULL)
1562 0 : ereport(ERROR,
1563 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1564 : errmsg("no replication origin is configured")));
1565 :
1566 1 : replorigin_xact_state.origin_lsn = location;
1567 1 : replorigin_xact_state.origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
1568 :
1569 1 : PG_RETURN_VOID();
1570 : }
1571 :
1572 : Datum
1573 0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
1574 : {
1575 0 : replorigin_check_prerequisites(true, false);
1576 :
1577 : /* Do not clear the session origin */
1578 0 : replorigin_xact_clear(false);
1579 :
1580 0 : PG_RETURN_VOID();
1581 : }
1582 :
1583 :
1584 : Datum
1585 3 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
1586 : {
1587 3 : text *name = PG_GETARG_TEXT_PP(0);
1588 3 : XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1589 : ReplOriginId node;
1590 :
1591 3 : replorigin_check_prerequisites(true, false);
1592 :
1593 : /* lock to prevent the replication origin from vanishing */
1594 3 : LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1595 :
1596 3 : node = replorigin_by_name(text_to_cstring(name), false);
1597 :
1598 : /*
1599 : * Can't sensibly pass a local commit to be flushed at checkpoint - this
1600 : * xact hasn't committed yet. This is why this function should be used to
1601 : * set up the initial replication state, but not for replay.
1602 : */
1603 2 : replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1604 : true /* go backward */ , true /* WAL log */ );
1605 :
1606 2 : UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1607 :
1608 2 : PG_RETURN_VOID();
1609 : }
1610 :
1611 :
1612 : /*
1613 : * Return the replication progress for an individual replication origin.
1614 : *
1615 : * If 'flush' is set to true it is ensured that the returned value corresponds
1616 : * to a local transaction that has been flushed. This is useful if asynchronous
1617 : * commits are used when replaying replicated transactions.
1618 : */
1619 : Datum
1620 3 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
1621 : {
1622 : char *name;
1623 : bool flush;
1624 : ReplOriginId roident;
1625 3 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1626 :
1627 3 : replorigin_check_prerequisites(true, true);
1628 :
1629 3 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1630 3 : flush = PG_GETARG_BOOL(1);
1631 :
1632 3 : roident = replorigin_by_name(name, false);
1633 : Assert(OidIsValid(roident));
1634 :
1635 2 : remote_lsn = replorigin_get_progress(roident, flush);
1636 :
1637 2 : if (!XLogRecPtrIsValid(remote_lsn))
1638 0 : PG_RETURN_NULL();
1639 :
1640 2 : PG_RETURN_LSN(remote_lsn);
1641 : }
1642 :
1643 :
1644 : Datum
1645 11 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
1646 : {
1647 11 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1648 : int i;
1649 : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
1650 :
1651 : /* we want to return 0 rows if slot is set to zero */
1652 11 : replorigin_check_prerequisites(false, true);
1653 :
1654 11 : InitMaterializedSRF(fcinfo, 0);
1655 :
1656 : /* prevent slots from being concurrently dropped */
1657 11 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1658 :
1659 : /*
1660 : * Iterate through all possible replication_states, display if they are
1661 : * filled. Note that we do not take any locks, so slightly corrupted/out
1662 : * of date values are a possibility.
1663 : */
1664 121 : for (i = 0; i < max_active_replication_origins; i++)
1665 : {
1666 : ReplicationState *state;
1667 : Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];
1668 : bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1669 : char *roname;
1670 :
1671 110 : state = &replication_states[i];
1672 :
1673 : /* unused slot, nothing to display */
1674 110 : if (state->roident == InvalidReplOriginId)
1675 97 : continue;
1676 :
1677 13 : memset(values, 0, sizeof(values));
1678 13 : memset(nulls, 1, sizeof(nulls));
1679 :
1680 13 : values[0] = ObjectIdGetDatum(state->roident);
1681 13 : nulls[0] = false;
1682 :
1683 : /*
1684 : * We're not preventing the origin to be dropped concurrently, so
1685 : * silently accept that it might be gone.
1686 : */
1687 13 : if (replorigin_by_oid(state->roident, true,
1688 : &roname))
1689 : {
1690 13 : values[1] = CStringGetTextDatum(roname);
1691 13 : nulls[1] = false;
1692 : }
1693 :
1694 13 : LWLockAcquire(&state->lock, LW_SHARED);
1695 :
1696 13 : values[2] = LSNGetDatum(state->remote_lsn);
1697 13 : nulls[2] = false;
1698 :
1699 13 : values[3] = LSNGetDatum(state->local_lsn);
1700 13 : nulls[3] = false;
1701 :
1702 13 : LWLockRelease(&state->lock);
1703 :
1704 13 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1705 : values, nulls);
1706 : }
1707 :
1708 11 : LWLockRelease(ReplicationOriginLock);
1709 :
1710 : #undef REPLICATION_ORIGIN_PROGRESS_COLS
1711 :
1712 11 : return (Datum) 0;
1713 : }
|