Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * sequencesync.c
3 : * PostgreSQL logical replication: sequence synchronization
4 : *
5 : * Copyright (c) 2025-2026, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/sequencesync.c
9 : *
10 : * NOTES
11 : * This file contains code for sequence synchronization for
12 : * logical replication.
13 : *
14 : * Sequences requiring synchronization are tracked in the pg_subscription_rel
15 : * catalog.
16 : *
17 : * Sequences to be synchronized will be added with state INIT when either of
18 : * the following commands is executed:
19 : * CREATE SUBSCRIPTION
20 : * ALTER SUBSCRIPTION ... REFRESH PUBLICATION
21 : *
22 : * Executing the following command resets all sequences in the subscription to
23 : * state INIT, triggering re-synchronization:
24 : * ALTER SUBSCRIPTION ... REFRESH SEQUENCES
25 : *
26 : * The apply worker periodically scans pg_subscription_rel for sequences in
27 : * INIT state. When such sequences are found, it spawns a sequencesync worker
28 : * to handle synchronization.
29 : *
30 : * A single sequencesync worker is responsible for synchronizing all sequences.
31 : * It begins by retrieving the list of sequences that are flagged for
32 : * synchronization, i.e., those in the INIT state. These sequences are then
33 : * processed in batches, allowing multiple entries to be synchronized within a
34 : * single transaction. The worker fetches the current sequence values and page
35 : * LSNs from the remote publisher, updates the corresponding sequences on the
36 : * local subscriber, and finally marks each sequence as READY upon successful
37 : * synchronization.
38 : *
39 : * Sequence state transitions follow this pattern:
40 : * INIT -> READY
41 : *
42 : * To avoid creating too many transactions, up to MAX_SEQUENCES_SYNC_PER_BATCH
43 : * sequences are synchronized per transaction. The locks on the sequence
44 : * relation will be periodically released at each transaction commit.
45 : *
46 : * XXX: We didn't choose launcher process to maintain the launch of sequencesync
47 : * worker as it didn't have database connection to access the sequences from the
48 : * pg_subscription_rel system catalog that need to be synchronized.
49 : *-------------------------------------------------------------------------
50 : */
51 :
52 : #include "postgres.h"
53 :
54 : #include "access/genam.h"
55 : #include "access/table.h"
56 : #include "catalog/pg_sequence.h"
57 : #include "catalog/pg_subscription_rel.h"
58 : #include "commands/sequence.h"
59 : #include "pgstat.h"
60 : #include "postmaster/interrupt.h"
61 : #include "replication/logicalworker.h"
62 : #include "replication/worker_internal.h"
63 : #include "utils/acl.h"
64 : #include "utils/builtins.h"
65 : #include "utils/fmgroids.h"
66 : #include "utils/guc.h"
67 : #include "utils/inval.h"
68 : #include "utils/lsyscache.h"
69 : #include "utils/memutils.h"
70 : #include "utils/pg_lsn.h"
71 : #include "utils/syscache.h"
72 : #include "utils/usercontext.h"
73 :
74 : #define REMOTE_SEQ_COL_COUNT 10
75 :
76 : typedef enum CopySeqResult
77 : {
78 : COPYSEQ_SUCCESS,
79 : COPYSEQ_MISMATCH,
80 : COPYSEQ_INSUFFICIENT_PERM,
81 : COPYSEQ_SKIPPED
82 : } CopySeqResult;
83 :
84 : static List *seqinfos = NIL;
85 :
86 : /*
87 : * Apply worker determines if sequence synchronization is needed.
88 : *
89 : * Start a sequencesync worker if one is not already running. The active
90 : * sequencesync worker will handle all pending sequence synchronization. If any
91 : * sequences remain unsynchronized after it exits, a new worker can be started
92 : * in the next iteration.
93 : */
94 : void
95 21926 : ProcessSequencesForSync(void)
96 : {
97 : LogicalRepWorker *sequencesync_worker;
98 : int nsyncworkers;
99 : bool has_pending_sequences;
100 : bool started_tx;
101 :
102 21926 : FetchRelationStates(NULL, &has_pending_sequences, &started_tx);
103 :
104 21926 : if (started_tx)
105 : {
106 352 : CommitTransactionCommand();
107 352 : pgstat_report_stat(true);
108 : }
109 :
110 21926 : if (!has_pending_sequences)
111 21896 : return;
112 :
113 52 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
114 :
115 : /* Check if there is a sequencesync worker already running? */
116 52 : sequencesync_worker = logicalrep_worker_find(WORKERTYPE_SEQUENCESYNC,
117 52 : MyLogicalRepWorker->subid,
118 : InvalidOid, true);
119 52 : if (sequencesync_worker)
120 : {
121 22 : LWLockRelease(LogicalRepWorkerLock);
122 22 : return;
123 : }
124 :
125 : /*
126 : * Count running sync workers for this subscription, while we have the
127 : * lock.
128 : */
129 30 : nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
130 30 : LWLockRelease(LogicalRepWorkerLock);
131 :
132 : /*
133 : * It is okay to read/update last_seqsync_start_time here in apply worker
134 : * as we have already ensured that sync worker doesn't exist.
135 : */
136 30 : launch_sync_worker(WORKERTYPE_SEQUENCESYNC, nsyncworkers, InvalidOid,
137 30 : &MyLogicalRepWorker->last_seqsync_start_time);
138 : }
139 :
140 : /*
141 : * get_sequences_string
142 : *
143 : * Build a comma-separated string of schema-qualified sequence names
144 : * for the given list of sequence indexes.
145 : */
146 : static void
147 8 : get_sequences_string(List *seqindexes, StringInfo buf)
148 : {
149 8 : resetStringInfo(buf);
150 24 : foreach_int(seqidx, seqindexes)
151 : {
152 : LogicalRepSequenceInfo *seqinfo =
153 8 : (LogicalRepSequenceInfo *) list_nth(seqinfos, seqidx);
154 :
155 8 : if (buf->len > 0)
156 0 : appendStringInfoString(buf, ", ");
157 :
158 8 : appendStringInfo(buf, "\"%s.%s\"", seqinfo->nspname, seqinfo->seqname);
159 : }
160 8 : }
161 :
162 : /*
163 : * report_sequence_errors
164 : *
165 : * Report discrepancies found during sequence synchronization between
166 : * the publisher and subscriber. Emits warnings for:
167 : * a) mismatched definitions or concurrent rename
168 : * b) insufficient privileges
169 : * c) missing sequences on the subscriber
170 : * Then raises an ERROR to indicate synchronization failure.
171 : */
172 : static void
173 18 : report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
174 : List *missing_seqs_idx)
175 : {
176 : StringInfo seqstr;
177 :
178 : /* Quick exit if there are no errors to report */
179 18 : if (!mismatched_seqs_idx && !insuffperm_seqs_idx && !missing_seqs_idx)
180 10 : return;
181 :
182 8 : seqstr = makeStringInfo();
183 :
184 8 : if (mismatched_seqs_idx)
185 : {
186 6 : get_sequences_string(mismatched_seqs_idx, seqstr);
187 6 : ereport(WARNING,
188 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
189 : errmsg_plural("mismatched or renamed sequence on subscriber (%s)",
190 : "mismatched or renamed sequences on subscriber (%s)",
191 : list_length(mismatched_seqs_idx),
192 : seqstr->data));
193 : }
194 :
195 8 : if (insuffperm_seqs_idx)
196 : {
197 0 : get_sequences_string(insuffperm_seqs_idx, seqstr);
198 0 : ereport(WARNING,
199 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
200 : errmsg_plural("insufficient privileges on sequence (%s)",
201 : "insufficient privileges on sequences (%s)",
202 : list_length(insuffperm_seqs_idx),
203 : seqstr->data));
204 : }
205 :
206 8 : if (missing_seqs_idx)
207 : {
208 2 : get_sequences_string(missing_seqs_idx, seqstr);
209 2 : ereport(WARNING,
210 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
211 : errmsg_plural("missing sequence on publisher (%s)",
212 : "missing sequences on publisher (%s)",
213 : list_length(missing_seqs_idx),
214 : seqstr->data));
215 : }
216 :
217 8 : ereport(ERROR,
218 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
219 : errmsg("logical replication sequence synchronization failed for subscription \"%s\"",
220 : MySubscription->name));
221 : }
222 :
223 : /*
224 : * get_and_validate_seq_info
225 : *
226 : * Extracts remote sequence information from the tuple slot received from the
227 : * publisher, and validates it against the corresponding local sequence
228 : * definition.
229 : */
230 : static CopySeqResult
231 24 : get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel,
232 : LogicalRepSequenceInfo **seqinfo, int *seqidx)
233 : {
234 : bool isnull;
235 24 : int col = 0;
236 : Oid remote_typid;
237 : int64 remote_start;
238 : int64 remote_increment;
239 : int64 remote_min;
240 : int64 remote_max;
241 : bool remote_cycle;
242 24 : CopySeqResult result = COPYSEQ_SUCCESS;
243 : HeapTuple tup;
244 : Form_pg_sequence local_seq;
245 : LogicalRepSequenceInfo *seqinfo_local;
246 :
247 24 : *seqidx = DatumGetInt32(slot_getattr(slot, ++col, &isnull));
248 : Assert(!isnull);
249 :
250 : /* Identify the corresponding local sequence for the given index. */
251 24 : *seqinfo = seqinfo_local =
252 24 : (LogicalRepSequenceInfo *) list_nth(seqinfos, *seqidx);
253 :
254 24 : seqinfo_local->last_value = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
255 : Assert(!isnull);
256 :
257 24 : seqinfo_local->is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull));
258 : Assert(!isnull);
259 :
260 24 : seqinfo_local->page_lsn = DatumGetLSN(slot_getattr(slot, ++col, &isnull));
261 : Assert(!isnull);
262 :
263 24 : remote_typid = DatumGetObjectId(slot_getattr(slot, ++col, &isnull));
264 : Assert(!isnull);
265 :
266 24 : remote_start = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
267 : Assert(!isnull);
268 :
269 24 : remote_increment = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
270 : Assert(!isnull);
271 :
272 24 : remote_min = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
273 : Assert(!isnull);
274 :
275 24 : remote_max = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
276 : Assert(!isnull);
277 :
278 24 : remote_cycle = DatumGetBool(slot_getattr(slot, ++col, &isnull));
279 : Assert(!isnull);
280 :
281 : /* Sanity check */
282 : Assert(col == REMOTE_SEQ_COL_COUNT);
283 :
284 24 : seqinfo_local->found_on_pub = true;
285 :
286 24 : *sequence_rel = try_table_open(seqinfo_local->localrelid, RowExclusiveLock);
287 :
288 : /* Sequence was concurrently dropped? */
289 24 : if (!*sequence_rel)
290 0 : return COPYSEQ_SKIPPED;
291 :
292 24 : tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seqinfo_local->localrelid));
293 :
294 : /* Sequence was concurrently dropped? */
295 24 : if (!HeapTupleIsValid(tup))
296 0 : elog(ERROR, "cache lookup failed for sequence %u",
297 : seqinfo_local->localrelid);
298 :
299 24 : local_seq = (Form_pg_sequence) GETSTRUCT(tup);
300 :
301 : /* Sequence parameters for remote/local are the same? */
302 24 : if (local_seq->seqtypid != remote_typid ||
303 24 : local_seq->seqstart != remote_start ||
304 22 : local_seq->seqincrement != remote_increment ||
305 18 : local_seq->seqmin != remote_min ||
306 18 : local_seq->seqmax != remote_max ||
307 18 : local_seq->seqcycle != remote_cycle)
308 6 : result = COPYSEQ_MISMATCH;
309 :
310 : /* Sequence was concurrently renamed? */
311 24 : if (strcmp(seqinfo_local->nspname,
312 24 : get_namespace_name(RelationGetNamespace(*sequence_rel))) ||
313 24 : strcmp(seqinfo_local->seqname, RelationGetRelationName(*sequence_rel)))
314 0 : result = COPYSEQ_MISMATCH;
315 :
316 24 : ReleaseSysCache(tup);
317 24 : return result;
318 : }
319 :
320 : /*
321 : * Apply remote sequence state to local sequence and mark it as
322 : * synchronized (READY).
323 : */
324 : static CopySeqResult
325 18 : copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner)
326 : {
327 : UserContext ucxt;
328 : AclResult aclresult;
329 18 : bool run_as_owner = MySubscription->runasowner;
330 18 : Oid seqoid = seqinfo->localrelid;
331 :
332 : /*
333 : * If the user did not opt to run as the owner of the subscription
334 : * ('run_as_owner'), then copy the sequence as the owner of the sequence.
335 : */
336 18 : if (!run_as_owner)
337 18 : SwitchToUntrustedUser(seqowner, &ucxt);
338 :
339 18 : aclresult = pg_class_aclcheck(seqoid, GetUserId(), ACL_UPDATE);
340 :
341 18 : if (aclresult != ACLCHECK_OK)
342 : {
343 0 : if (!run_as_owner)
344 0 : RestoreUserContext(&ucxt);
345 :
346 0 : return COPYSEQ_INSUFFICIENT_PERM;
347 : }
348 :
349 : /*
350 : * The log counter (log_cnt) tracks how many sequence values are still
351 : * unused locally. It is only relevant to the local node and managed
352 : * internally by nextval() when allocating new ranges. Since log_cnt does
353 : * not affect the visible sequence state (like last_value or is_called)
354 : * and is only used for local caching, it need not be copied to the
355 : * subscriber during synchronization.
356 : */
357 18 : SetSequence(seqoid, seqinfo->last_value, seqinfo->is_called);
358 :
359 18 : if (!run_as_owner)
360 18 : RestoreUserContext(&ucxt);
361 :
362 : /*
363 : * Record the remote sequence's LSN in pg_subscription_rel and mark the
364 : * sequence as READY.
365 : */
366 18 : UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY,
367 : seqinfo->page_lsn, false);
368 :
369 18 : return COPYSEQ_SUCCESS;
370 : }
371 :
372 : /*
373 : * Copy existing data of sequences from the publisher.
374 : */
375 : static void
376 18 : copy_sequences(WalReceiverConn *conn)
377 : {
378 18 : int cur_batch_base_index = 0;
379 18 : int n_seqinfos = list_length(seqinfos);
380 18 : List *mismatched_seqs_idx = NIL;
381 18 : List *missing_seqs_idx = NIL;
382 18 : List *insuffperm_seqs_idx = NIL;
383 18 : StringInfo seqstr = makeStringInfo();
384 18 : StringInfo cmd = makeStringInfo();
385 : MemoryContext oldctx;
386 :
387 : #define MAX_SEQUENCES_SYNC_PER_BATCH 100
388 :
389 18 : elog(DEBUG1,
390 : "logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d",
391 : MySubscription->name, n_seqinfos);
392 :
393 36 : while (cur_batch_base_index < n_seqinfos)
394 : {
395 18 : Oid seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID,
396 : BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID};
397 18 : int batch_size = 0;
398 18 : int batch_succeeded_count = 0;
399 18 : int batch_mismatched_count = 0;
400 18 : int batch_skipped_count = 0;
401 18 : int batch_insuffperm_count = 0;
402 : int batch_missing_count;
403 : Relation sequence_rel;
404 :
405 : WalRcvExecResult *res;
406 : TupleTableSlot *slot;
407 :
408 18 : StartTransactionCommand();
409 :
410 44 : for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++)
411 : {
412 : char *nspname_literal;
413 : char *seqname_literal;
414 :
415 : LogicalRepSequenceInfo *seqinfo =
416 26 : (LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
417 :
418 26 : if (seqstr->len > 0)
419 8 : appendStringInfoString(seqstr, ", ");
420 :
421 26 : nspname_literal = quote_literal_cstr(seqinfo->nspname);
422 26 : seqname_literal = quote_literal_cstr(seqinfo->seqname);
423 :
424 26 : appendStringInfo(seqstr, "(%s, %s, %d)",
425 : nspname_literal, seqname_literal, idx);
426 :
427 26 : if (++batch_size == MAX_SEQUENCES_SYNC_PER_BATCH)
428 0 : break;
429 : }
430 :
431 : /*
432 : * We deliberately avoid acquiring a local lock on the sequence before
433 : * querying the publisher to prevent potential distributed deadlocks
434 : * in bi-directional replication setups.
435 : *
436 : * Example scenario:
437 : *
438 : * - On each node, a background worker acquires a lock on a sequence
439 : * as part of a sync operation.
440 : *
441 : * - Concurrently, a user transaction attempts to alter the same
442 : * sequence, waiting on the background worker's lock.
443 : *
444 : * - Meanwhile, a query from the other node tries to access metadata
445 : * that depends on the completion of the alter operation.
446 : *
447 : * - This creates a circular wait across nodes:
448 : *
449 : * Node-1: Query -> waits on Alter -> waits on Sync Worker
450 : *
451 : * Node-2: Query -> waits on Alter -> waits on Sync Worker
452 : *
453 : * Since each node only sees part of the wait graph, the deadlock may
454 : * go undetected, leading to indefinite blocking.
455 : *
456 : * Note: Each entry in VALUES includes an index 'seqidx' that
457 : * represents the sequence's position in the local 'seqinfos' list.
458 : * This index is propagated to the query results and later used to
459 : * directly map the fetched publisher sequence rows back to their
460 : * corresponding local entries without relying on result order or name
461 : * matching.
462 : */
463 18 : appendStringInfo(cmd,
464 : "SELECT s.seqidx, ps.*, seq.seqtypid,\n"
465 : " seq.seqstart, seq.seqincrement, seq.seqmin,\n"
466 : " seq.seqmax, seq.seqcycle\n"
467 : "FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n"
468 : "JOIN pg_namespace n ON n.nspname = s.schname\n"
469 : "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n"
470 : "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n"
471 : "JOIN LATERAL pg_get_sequence_data(seq.seqrelid) AS ps ON true\n",
472 : seqstr->data);
473 :
474 18 : res = walrcv_exec(conn, cmd->data, lengthof(seqRow), seqRow);
475 18 : if (res->status != WALRCV_OK_TUPLES)
476 0 : ereport(ERROR,
477 : errcode(ERRCODE_CONNECTION_FAILURE),
478 : errmsg("could not fetch sequence information from the publisher: %s",
479 : res->err));
480 :
481 18 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
482 42 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
483 : {
484 : CopySeqResult sync_status;
485 : LogicalRepSequenceInfo *seqinfo;
486 : int seqidx;
487 :
488 24 : CHECK_FOR_INTERRUPTS();
489 :
490 24 : if (ConfigReloadPending)
491 : {
492 0 : ConfigReloadPending = false;
493 0 : ProcessConfigFile(PGC_SIGHUP);
494 : }
495 :
496 24 : sync_status = get_and_validate_seq_info(slot, &sequence_rel,
497 : &seqinfo, &seqidx);
498 24 : if (sync_status == COPYSEQ_SUCCESS)
499 18 : sync_status = copy_sequence(seqinfo,
500 18 : sequence_rel->rd_rel->relowner);
501 :
502 24 : switch (sync_status)
503 : {
504 18 : case COPYSEQ_SUCCESS:
505 18 : elog(DEBUG1,
506 : "logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished",
507 : MySubscription->name, seqinfo->nspname,
508 : seqinfo->seqname);
509 18 : batch_succeeded_count++;
510 18 : break;
511 6 : case COPYSEQ_MISMATCH:
512 :
513 : /*
514 : * Remember mismatched sequences in a long-lived memory
515 : * context since these will be used after the transaction
516 : * is committed.
517 : */
518 6 : oldctx = MemoryContextSwitchTo(ApplyContext);
519 6 : mismatched_seqs_idx = lappend_int(mismatched_seqs_idx,
520 : seqidx);
521 6 : MemoryContextSwitchTo(oldctx);
522 6 : batch_mismatched_count++;
523 6 : break;
524 0 : case COPYSEQ_INSUFFICIENT_PERM:
525 :
526 : /*
527 : * Remember sequences with insufficient privileges in a
528 : * long-lived memory context since these will be used
529 : * after the transaction is committed.
530 : */
531 0 : oldctx = MemoryContextSwitchTo(ApplyContext);
532 0 : insuffperm_seqs_idx = lappend_int(insuffperm_seqs_idx,
533 : seqidx);
534 0 : MemoryContextSwitchTo(oldctx);
535 0 : batch_insuffperm_count++;
536 0 : break;
537 0 : case COPYSEQ_SKIPPED:
538 0 : ereport(LOG,
539 : errmsg("skip synchronization of sequence \"%s.%s\" because it has been dropped concurrently",
540 : seqinfo->nspname,
541 : seqinfo->seqname));
542 0 : batch_skipped_count++;
543 0 : break;
544 : }
545 :
546 24 : if (sequence_rel)
547 24 : table_close(sequence_rel, NoLock);
548 : }
549 :
550 18 : ExecDropSingleTupleTableSlot(slot);
551 18 : walrcv_clear_result(res);
552 18 : resetStringInfo(seqstr);
553 18 : resetStringInfo(cmd);
554 :
555 18 : batch_missing_count = batch_size - (batch_succeeded_count +
556 18 : batch_mismatched_count +
557 18 : batch_insuffperm_count +
558 : batch_skipped_count);
559 :
560 18 : elog(DEBUG1,
561 : "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped",
562 : MySubscription->name,
563 : (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1,
564 : batch_size, batch_succeeded_count, batch_mismatched_count,
565 : batch_insuffperm_count, batch_missing_count, batch_skipped_count);
566 :
567 : /* Commit this batch, and prepare for next batch */
568 18 : CommitTransactionCommand();
569 :
570 18 : if (batch_missing_count)
571 : {
572 4 : for (int idx = cur_batch_base_index; idx < cur_batch_base_index + batch_size; idx++)
573 : {
574 : LogicalRepSequenceInfo *seqinfo =
575 2 : (LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
576 :
577 : /* If the sequence was not found on publisher, record it */
578 2 : if (!seqinfo->found_on_pub)
579 2 : missing_seqs_idx = lappend_int(missing_seqs_idx, idx);
580 : }
581 : }
582 :
583 : /*
584 : * cur_batch_base_index is not incremented sequentially because some
585 : * sequences may be missing, and the number of fetched rows may not
586 : * match the batch size.
587 : */
588 18 : cur_batch_base_index += batch_size;
589 : }
590 :
591 : /* Report mismatches, permission issues, or missing sequences */
592 18 : report_sequence_errors(mismatched_seqs_idx, insuffperm_seqs_idx,
593 : missing_seqs_idx);
594 10 : }
595 :
596 : /*
597 : * Identifies sequences that require synchronization and initiates the
598 : * synchronization process.
599 : */
600 : static void
601 18 : LogicalRepSyncSequences(void)
602 : {
603 : char *err;
604 : bool must_use_password;
605 : Relation rel;
606 : HeapTuple tup;
607 : ScanKeyData skey[2];
608 : SysScanDesc scan;
609 18 : Oid subid = MyLogicalRepWorker->subid;
610 : StringInfoData app_name;
611 :
612 18 : StartTransactionCommand();
613 :
614 18 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
615 :
616 18 : ScanKeyInit(&skey[0],
617 : Anum_pg_subscription_rel_srsubid,
618 : BTEqualStrategyNumber, F_OIDEQ,
619 : ObjectIdGetDatum(subid));
620 :
621 18 : ScanKeyInit(&skey[1],
622 : Anum_pg_subscription_rel_srsubstate,
623 : BTEqualStrategyNumber, F_CHAREQ,
624 : CharGetDatum(SUBREL_STATE_INIT));
625 :
626 18 : scan = systable_beginscan(rel, InvalidOid, false,
627 : NULL, 2, skey);
628 46 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
629 : {
630 : Form_pg_subscription_rel subrel;
631 : LogicalRepSequenceInfo *seq;
632 : Relation sequence_rel;
633 : MemoryContext oldctx;
634 :
635 28 : CHECK_FOR_INTERRUPTS();
636 :
637 28 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
638 :
639 28 : sequence_rel = try_table_open(subrel->srrelid, RowExclusiveLock);
640 :
641 : /* Skip if sequence was dropped concurrently */
642 28 : if (!sequence_rel)
643 0 : continue;
644 :
645 : /* Skip if the relation is not a sequence */
646 28 : if (sequence_rel->rd_rel->relkind != RELKIND_SEQUENCE)
647 : {
648 2 : table_close(sequence_rel, NoLock);
649 2 : continue;
650 : }
651 :
652 : /*
653 : * Worker needs to process sequences across transaction boundary, so
654 : * allocate them under long-lived context.
655 : */
656 26 : oldctx = MemoryContextSwitchTo(ApplyContext);
657 :
658 26 : seq = palloc0_object(LogicalRepSequenceInfo);
659 26 : seq->localrelid = subrel->srrelid;
660 26 : seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel));
661 26 : seq->seqname = pstrdup(RelationGetRelationName(sequence_rel));
662 26 : seqinfos = lappend(seqinfos, seq);
663 :
664 26 : MemoryContextSwitchTo(oldctx);
665 :
666 26 : table_close(sequence_rel, NoLock);
667 : }
668 :
669 : /* Cleanup */
670 18 : systable_endscan(scan);
671 18 : table_close(rel, AccessShareLock);
672 :
673 18 : CommitTransactionCommand();
674 :
675 : /*
676 : * Exit early if no catalog entries found, likely due to concurrent drops.
677 : */
678 18 : if (!seqinfos)
679 0 : return;
680 :
681 : /* Is the use of a password mandatory? */
682 36 : must_use_password = MySubscription->passwordrequired &&
683 18 : !MySubscription->ownersuperuser;
684 :
685 18 : initStringInfo(&app_name);
686 18 : appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT,
687 18 : MySubscription->oid, GetSystemIdentifier());
688 :
689 : /*
690 : * Establish the connection to the publisher for sequence synchronization.
691 : */
692 18 : LogRepWorkerWalRcvConn =
693 18 : walrcv_connect(MySubscription->conninfo, true, true,
694 : must_use_password,
695 : app_name.data, &err);
696 18 : if (LogRepWorkerWalRcvConn == NULL)
697 0 : ereport(ERROR,
698 : errcode(ERRCODE_CONNECTION_FAILURE),
699 : errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s",
700 : MySubscription->name, err));
701 :
702 18 : pfree(app_name.data);
703 :
704 18 : copy_sequences(LogRepWorkerWalRcvConn);
705 : }
706 :
707 : /*
708 : * Execute the initial sync with error handling. Disable the subscription,
709 : * if required.
710 : *
711 : * Note that we don't handle FATAL errors which are probably because of system
712 : * resource error and are not repeatable.
713 : */
714 : static void
715 18 : start_sequence_sync(void)
716 : {
717 : Assert(am_sequencesync_worker());
718 :
719 18 : PG_TRY();
720 : {
721 : /* Call initial sync. */
722 18 : LogicalRepSyncSequences();
723 : }
724 8 : PG_CATCH();
725 : {
726 8 : if (MySubscription->disableonerr)
727 0 : DisableSubscriptionAndExit();
728 : else
729 : {
730 : /*
731 : * Report the worker failed during sequence synchronization. Abort
732 : * the current transaction so that the stats message is sent in an
733 : * idle state.
734 : */
735 8 : AbortOutOfAnyTransaction();
736 8 : pgstat_report_subscription_error(MySubscription->oid,
737 : WORKERTYPE_SEQUENCESYNC);
738 :
739 8 : PG_RE_THROW();
740 : }
741 : }
742 10 : PG_END_TRY();
743 10 : }
744 :
745 : /* Logical Replication sequencesync worker entry point */
746 : void
747 18 : SequenceSyncWorkerMain(Datum main_arg)
748 : {
749 18 : int worker_slot = DatumGetInt32(main_arg);
750 :
751 18 : SetupApplyOrSyncWorker(worker_slot);
752 :
753 18 : start_sequence_sync();
754 :
755 10 : FinishSyncWorker();
756 : }
|