Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * sequencesync.c
3 : * PostgreSQL logical replication: sequence synchronization
4 : *
5 : * Copyright (c) 2025-2026, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/sequencesync.c
9 : *
10 : * NOTES
11 : * This file contains code for sequence synchronization for
12 : * logical replication.
13 : *
14 : * Sequences requiring synchronization are tracked in the pg_subscription_rel
15 : * catalog.
16 : *
17 : * Sequences to be synchronized will be added with state INIT when either of
18 : * the following commands is executed:
19 : * CREATE SUBSCRIPTION
20 : * ALTER SUBSCRIPTION ... REFRESH PUBLICATION
21 : *
22 : * Executing the following command resets all sequences in the subscription to
23 : * state INIT, triggering re-synchronization:
24 : * ALTER SUBSCRIPTION ... REFRESH SEQUENCES
25 : *
26 : * The apply worker periodically scans pg_subscription_rel for sequences in
27 : * INIT state. When such sequences are found, it spawns a sequencesync worker
28 : * to handle synchronization.
29 : *
30 : * A single sequencesync worker is responsible for synchronizing all sequences.
31 : * It begins by retrieving the list of sequences that are flagged for
32 : * synchronization, i.e., those in the INIT state. These sequences are then
33 : * processed in batches, allowing multiple entries to be synchronized within a
34 : * single transaction. The worker fetches the current sequence values and page
35 : * LSNs from the remote publisher, updates the corresponding sequences on the
36 : * local subscriber, and finally marks each sequence as READY upon successful
37 : * synchronization.
38 : *
39 : * Sequence state transitions follow this pattern:
40 : * INIT -> READY
41 : *
42 : * To avoid creating too many transactions, up to MAX_SEQUENCES_SYNC_PER_BATCH
43 : * sequences are synchronized per transaction. The locks on the sequence
44 : * relation will be periodically released at each transaction commit.
45 : *
46 : * XXX: The launcher process is not used to start the sequencesync worker
47 : * because it has no database connection, and therefore cannot read from the
48 : * pg_subscription_rel system catalog the sequences that need to be synchronized.
49 : *-------------------------------------------------------------------------
50 : */
51 :
52 : #include "postgres.h"
53 :
54 : #include "access/genam.h"
55 : #include "access/table.h"
56 : #include "catalog/pg_sequence.h"
57 : #include "catalog/pg_subscription_rel.h"
58 : #include "commands/sequence.h"
59 : #include "pgstat.h"
60 : #include "postmaster/interrupt.h"
61 : #include "replication/logicalworker.h"
62 : #include "replication/worker_internal.h"
63 : #include "storage/lwlock.h"
64 : #include "utils/acl.h"
65 : #include "utils/builtins.h"
66 : #include "utils/fmgroids.h"
67 : #include "utils/guc.h"
68 : #include "utils/inval.h"
69 : #include "utils/lsyscache.h"
70 : #include "utils/memutils.h"
71 : #include "utils/pg_lsn.h"
72 : #include "utils/syscache.h"
73 : #include "utils/usercontext.h"
74 :
/*
 * Number of result columns fetched per sequence from the publisher; it sizes
 * the column type array passed to walrcv_exec() and is cross-checked after
 * the columns have been read.
 */
#define REMOTE_SEQ_COL_COUNT 10

/*
 * Outcome of trying to synchronize one sequence.
 */
typedef enum CopySeqResult
{
	COPYSEQ_SUCCESS,			/* remote state was applied locally */
	COPYSEQ_MISMATCH,			/* parameters differ, or sequence was renamed */
	COPYSEQ_INSUFFICIENT_PERM,	/* UPDATE privilege check failed */
	COPYSEQ_SKIPPED,			/* sequence went away concurrently */
} CopySeqResult;
84 :
85 : static List *seqinfos = NIL;
86 :
87 : /*
88 : * Apply worker determines if sequence synchronization is needed.
89 : *
90 : * Start a sequencesync worker if one is not already running. The active
91 : * sequencesync worker will handle all pending sequence synchronization. If any
92 : * sequences remain unsynchronized after it exits, a new worker can be started
93 : * in the next iteration.
94 : */
95 : void
96 10768 : ProcessSequencesForSync(void)
97 : {
98 : LogicalRepWorker *sequencesync_worker;
99 : int nsyncworkers;
100 : bool has_pending_sequences;
101 : bool started_tx;
102 :
103 10768 : FetchRelationStates(NULL, &has_pending_sequences, &started_tx);
104 :
105 10768 : if (started_tx)
106 : {
107 194 : CommitTransactionCommand();
108 194 : pgstat_report_stat(true);
109 : }
110 :
111 10768 : if (!has_pending_sequences)
112 10752 : return;
113 :
114 29 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
115 :
116 : /* Check if there is a sequencesync worker already running? */
117 29 : sequencesync_worker = logicalrep_worker_find(WORKERTYPE_SEQUENCESYNC,
118 29 : MyLogicalRepWorker->subid,
119 : InvalidOid, true);
120 29 : if (sequencesync_worker)
121 : {
122 13 : LWLockRelease(LogicalRepWorkerLock);
123 13 : return;
124 : }
125 :
126 : /*
127 : * Count running sync workers for this subscription, while we have the
128 : * lock.
129 : */
130 16 : nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
131 16 : LWLockRelease(LogicalRepWorkerLock);
132 :
133 : /*
134 : * It is okay to read/update last_seqsync_start_time here in apply worker
135 : * as we have already ensured that sync worker doesn't exist.
136 : */
137 16 : launch_sync_worker(WORKERTYPE_SEQUENCESYNC, nsyncworkers, InvalidOid,
138 16 : &MyLogicalRepWorker->last_seqsync_start_time);
139 : }
140 :
141 : /*
142 : * get_sequences_string
143 : *
144 : * Build a comma-separated string of schema-qualified sequence names
145 : * for the given list of sequence indexes.
146 : */
147 : static void
148 4 : get_sequences_string(List *seqindexes, StringInfo buf)
149 : {
150 4 : resetStringInfo(buf);
151 12 : foreach_int(seqidx, seqindexes)
152 : {
153 : LogicalRepSequenceInfo *seqinfo =
154 4 : (LogicalRepSequenceInfo *) list_nth(seqinfos, seqidx);
155 :
156 4 : if (buf->len > 0)
157 0 : appendStringInfoString(buf, ", ");
158 :
159 4 : appendStringInfo(buf, "\"%s.%s\"", seqinfo->nspname, seqinfo->seqname);
160 : }
161 4 : }
162 :
163 : /*
164 : * report_sequence_errors
165 : *
166 : * Report discrepancies found during sequence synchronization between
167 : * the publisher and subscriber. Emits warnings for:
168 : * a) mismatched definitions or concurrent rename
169 : * b) insufficient privileges
170 : * c) missing sequences on the subscriber
171 : * Then raises an ERROR to indicate synchronization failure.
172 : */
173 : static void
174 9 : report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
175 : List *missing_seqs_idx)
176 : {
177 : StringInfo seqstr;
178 :
179 : /* Quick exit if there are no errors to report */
180 9 : if (!mismatched_seqs_idx && !insuffperm_seqs_idx && !missing_seqs_idx)
181 5 : return;
182 :
183 4 : seqstr = makeStringInfo();
184 :
185 4 : if (mismatched_seqs_idx)
186 : {
187 3 : get_sequences_string(mismatched_seqs_idx, seqstr);
188 3 : ereport(WARNING,
189 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
190 : errmsg_plural("mismatched or renamed sequence on subscriber (%s)",
191 : "mismatched or renamed sequences on subscriber (%s)",
192 : list_length(mismatched_seqs_idx),
193 : seqstr->data));
194 : }
195 :
196 4 : if (insuffperm_seqs_idx)
197 : {
198 0 : get_sequences_string(insuffperm_seqs_idx, seqstr);
199 0 : ereport(WARNING,
200 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
201 : errmsg_plural("insufficient privileges on sequence (%s)",
202 : "insufficient privileges on sequences (%s)",
203 : list_length(insuffperm_seqs_idx),
204 : seqstr->data));
205 : }
206 :
207 4 : if (missing_seqs_idx)
208 : {
209 1 : get_sequences_string(missing_seqs_idx, seqstr);
210 1 : ereport(WARNING,
211 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
212 : errmsg_plural("missing sequence on publisher (%s)",
213 : "missing sequences on publisher (%s)",
214 : list_length(missing_seqs_idx),
215 : seqstr->data));
216 : }
217 :
218 4 : ereport(ERROR,
219 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
220 : errmsg("logical replication sequence synchronization failed for subscription \"%s\"",
221 : MySubscription->name));
222 : }
223 :
224 : /*
225 : * get_and_validate_seq_info
226 : *
227 : * Extracts remote sequence information from the tuple slot received from the
228 : * publisher, and validates it against the corresponding local sequence
229 : * definition.
230 : */
231 : static CopySeqResult
232 12 : get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel,
233 : LogicalRepSequenceInfo **seqinfo, int *seqidx)
234 : {
235 : bool isnull;
236 12 : int col = 0;
237 : Datum datum;
238 : Oid remote_typid;
239 : int64 remote_start;
240 : int64 remote_increment;
241 : int64 remote_min;
242 : int64 remote_max;
243 : bool remote_cycle;
244 12 : CopySeqResult result = COPYSEQ_SUCCESS;
245 : HeapTuple tup;
246 : Form_pg_sequence local_seq;
247 : LogicalRepSequenceInfo *seqinfo_local;
248 :
249 12 : *seqidx = DatumGetInt32(slot_getattr(slot, ++col, &isnull));
250 : Assert(!isnull);
251 :
252 : /* Identify the corresponding local sequence for the given index. */
253 12 : *seqinfo = seqinfo_local =
254 12 : (LogicalRepSequenceInfo *) list_nth(seqinfos, *seqidx);
255 :
256 : /*
257 : * last_value can be NULL if the sequence was dropped concurrently (see
258 : * pg_get_sequence_data()).
259 : */
260 12 : datum = slot_getattr(slot, ++col, &isnull);
261 12 : if (isnull)
262 0 : return COPYSEQ_SKIPPED;
263 12 : seqinfo_local->last_value = DatumGetInt64(datum);
264 :
265 12 : seqinfo_local->is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull));
266 : Assert(!isnull);
267 :
268 12 : seqinfo_local->page_lsn = DatumGetLSN(slot_getattr(slot, ++col, &isnull));
269 : Assert(!isnull);
270 :
271 12 : remote_typid = DatumGetObjectId(slot_getattr(slot, ++col, &isnull));
272 : Assert(!isnull);
273 :
274 12 : remote_start = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
275 : Assert(!isnull);
276 :
277 12 : remote_increment = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
278 : Assert(!isnull);
279 :
280 12 : remote_min = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
281 : Assert(!isnull);
282 :
283 12 : remote_max = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
284 : Assert(!isnull);
285 :
286 12 : remote_cycle = DatumGetBool(slot_getattr(slot, ++col, &isnull));
287 : Assert(!isnull);
288 :
289 : /* Sanity check */
290 : Assert(col == REMOTE_SEQ_COL_COUNT);
291 :
292 12 : seqinfo_local->found_on_pub = true;
293 :
294 12 : *sequence_rel = try_table_open(seqinfo_local->localrelid, RowExclusiveLock);
295 :
296 : /* Sequence was concurrently dropped? */
297 12 : if (!*sequence_rel)
298 0 : return COPYSEQ_SKIPPED;
299 :
300 12 : tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seqinfo_local->localrelid));
301 :
302 : /* Sequence was concurrently dropped? */
303 12 : if (!HeapTupleIsValid(tup))
304 0 : elog(ERROR, "cache lookup failed for sequence %u",
305 : seqinfo_local->localrelid);
306 :
307 12 : local_seq = (Form_pg_sequence) GETSTRUCT(tup);
308 :
309 : /* Sequence parameters for remote/local are the same? */
310 12 : if (local_seq->seqtypid != remote_typid ||
311 12 : local_seq->seqstart != remote_start ||
312 11 : local_seq->seqincrement != remote_increment ||
313 9 : local_seq->seqmin != remote_min ||
314 9 : local_seq->seqmax != remote_max ||
315 9 : local_seq->seqcycle != remote_cycle)
316 3 : result = COPYSEQ_MISMATCH;
317 :
318 : /* Sequence was concurrently renamed? */
319 12 : if (strcmp(seqinfo_local->nspname,
320 12 : get_namespace_name(RelationGetNamespace(*sequence_rel))) ||
321 12 : strcmp(seqinfo_local->seqname, RelationGetRelationName(*sequence_rel)))
322 0 : result = COPYSEQ_MISMATCH;
323 :
324 12 : ReleaseSysCache(tup);
325 12 : return result;
326 : }
327 :
328 : /*
329 : * Apply remote sequence state to local sequence and mark it as
330 : * synchronized (READY).
331 : */
332 : static CopySeqResult
333 9 : copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner)
334 : {
335 : UserContext ucxt;
336 : AclResult aclresult;
337 9 : bool run_as_owner = MySubscription->runasowner;
338 9 : Oid seqoid = seqinfo->localrelid;
339 :
340 : /*
341 : * If the user did not opt to run as the owner of the subscription
342 : * ('run_as_owner'), then copy the sequence as the owner of the sequence.
343 : */
344 9 : if (!run_as_owner)
345 9 : SwitchToUntrustedUser(seqowner, &ucxt);
346 :
347 9 : aclresult = pg_class_aclcheck(seqoid, GetUserId(), ACL_UPDATE);
348 :
349 9 : if (aclresult != ACLCHECK_OK)
350 : {
351 0 : if (!run_as_owner)
352 0 : RestoreUserContext(&ucxt);
353 :
354 0 : return COPYSEQ_INSUFFICIENT_PERM;
355 : }
356 :
357 : /*
358 : * The log counter (log_cnt) tracks how many sequence values are still
359 : * unused locally. It is only relevant to the local node and managed
360 : * internally by nextval() when allocating new ranges. Since log_cnt does
361 : * not affect the visible sequence state (like last_value or is_called)
362 : * and is only used for local caching, it need not be copied to the
363 : * subscriber during synchronization.
364 : */
365 9 : SetSequence(seqoid, seqinfo->last_value, seqinfo->is_called);
366 :
367 9 : if (!run_as_owner)
368 9 : RestoreUserContext(&ucxt);
369 :
370 : /*
371 : * Record the remote sequence's LSN in pg_subscription_rel and mark the
372 : * sequence as READY.
373 : */
374 9 : UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY,
375 : seqinfo->page_lsn, false);
376 :
377 9 : return COPYSEQ_SUCCESS;
378 : }
379 :
380 : /*
381 : * Copy existing data of sequences from the publisher.
382 : */
383 : static void
384 9 : copy_sequences(WalReceiverConn *conn)
385 : {
386 9 : int cur_batch_base_index = 0;
387 9 : int n_seqinfos = list_length(seqinfos);
388 9 : List *mismatched_seqs_idx = NIL;
389 9 : List *missing_seqs_idx = NIL;
390 9 : List *insuffperm_seqs_idx = NIL;
391 9 : StringInfo seqstr = makeStringInfo();
392 9 : StringInfo cmd = makeStringInfo();
393 : MemoryContext oldctx;
394 :
395 : #define MAX_SEQUENCES_SYNC_PER_BATCH 100
396 :
397 9 : elog(DEBUG1,
398 : "logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d",
399 : MySubscription->name, n_seqinfos);
400 :
401 18 : while (cur_batch_base_index < n_seqinfos)
402 : {
403 9 : Oid seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID,
404 : BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID};
405 9 : int batch_size = 0;
406 9 : int batch_succeeded_count = 0;
407 9 : int batch_mismatched_count = 0;
408 9 : int batch_skipped_count = 0;
409 9 : int batch_insuffperm_count = 0;
410 : int batch_missing_count;
411 9 : Relation sequence_rel = NULL;
412 :
413 : WalRcvExecResult *res;
414 : TupleTableSlot *slot;
415 :
416 9 : StartTransactionCommand();
417 :
418 22 : for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++)
419 : {
420 : char *nspname_literal;
421 : char *seqname_literal;
422 :
423 : LogicalRepSequenceInfo *seqinfo =
424 13 : (LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
425 :
426 13 : if (seqstr->len > 0)
427 4 : appendStringInfoString(seqstr, ", ");
428 :
429 13 : nspname_literal = quote_literal_cstr(seqinfo->nspname);
430 13 : seqname_literal = quote_literal_cstr(seqinfo->seqname);
431 :
432 13 : appendStringInfo(seqstr, "(%s, %s, %d)",
433 : nspname_literal, seqname_literal, idx);
434 :
435 13 : if (++batch_size == MAX_SEQUENCES_SYNC_PER_BATCH)
436 0 : break;
437 : }
438 :
439 : /*
440 : * We deliberately avoid acquiring a local lock on the sequence before
441 : * querying the publisher to prevent potential distributed deadlocks
442 : * in bi-directional replication setups.
443 : *
444 : * Example scenario:
445 : *
446 : * - On each node, a background worker acquires a lock on a sequence
447 : * as part of a sync operation.
448 : *
449 : * - Concurrently, a user transaction attempts to alter the same
450 : * sequence, waiting on the background worker's lock.
451 : *
452 : * - Meanwhile, a query from the other node tries to access metadata
453 : * that depends on the completion of the alter operation.
454 : *
455 : * - This creates a circular wait across nodes:
456 : *
457 : * Node-1: Query -> waits on Alter -> waits on Sync Worker
458 : *
459 : * Node-2: Query -> waits on Alter -> waits on Sync Worker
460 : *
461 : * Since each node only sees part of the wait graph, the deadlock may
462 : * go undetected, leading to indefinite blocking.
463 : *
464 : * Note: Each entry in VALUES includes an index 'seqidx' that
465 : * represents the sequence's position in the local 'seqinfos' list.
466 : * This index is propagated to the query results and later used to
467 : * directly map the fetched publisher sequence rows back to their
468 : * corresponding local entries without relying on result order or name
469 : * matching.
470 : */
471 9 : appendStringInfo(cmd,
472 : "SELECT s.seqidx, ps.*, seq.seqtypid,\n"
473 : " seq.seqstart, seq.seqincrement, seq.seqmin,\n"
474 : " seq.seqmax, seq.seqcycle\n"
475 : "FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n"
476 : "JOIN pg_namespace n ON n.nspname = s.schname\n"
477 : "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n"
478 : "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n"
479 : "JOIN LATERAL pg_get_sequence_data(seq.seqrelid) AS ps ON true\n",
480 : seqstr->data);
481 :
482 9 : res = walrcv_exec(conn, cmd->data, lengthof(seqRow), seqRow);
483 9 : if (res->status != WALRCV_OK_TUPLES)
484 0 : ereport(ERROR,
485 : errcode(ERRCODE_CONNECTION_FAILURE),
486 : errmsg("could not fetch sequence information from the publisher: %s",
487 : res->err));
488 :
489 9 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
490 21 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
491 : {
492 : CopySeqResult sync_status;
493 : LogicalRepSequenceInfo *seqinfo;
494 : int seqidx;
495 :
496 12 : CHECK_FOR_INTERRUPTS();
497 :
498 12 : if (ConfigReloadPending)
499 : {
500 0 : ConfigReloadPending = false;
501 0 : ProcessConfigFile(PGC_SIGHUP);
502 : }
503 :
504 12 : sync_status = get_and_validate_seq_info(slot, &sequence_rel,
505 : &seqinfo, &seqidx);
506 12 : if (sync_status == COPYSEQ_SUCCESS)
507 9 : sync_status = copy_sequence(seqinfo,
508 9 : sequence_rel->rd_rel->relowner);
509 :
510 12 : switch (sync_status)
511 : {
512 9 : case COPYSEQ_SUCCESS:
513 9 : elog(DEBUG1,
514 : "logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished",
515 : MySubscription->name, seqinfo->nspname,
516 : seqinfo->seqname);
517 9 : batch_succeeded_count++;
518 9 : break;
519 3 : case COPYSEQ_MISMATCH:
520 :
521 : /*
522 : * Remember mismatched sequences in a long-lived memory
523 : * context since these will be used after the transaction
524 : * is committed.
525 : */
526 3 : oldctx = MemoryContextSwitchTo(ApplyContext);
527 3 : mismatched_seqs_idx = lappend_int(mismatched_seqs_idx,
528 : seqidx);
529 3 : MemoryContextSwitchTo(oldctx);
530 3 : batch_mismatched_count++;
531 3 : break;
532 0 : case COPYSEQ_INSUFFICIENT_PERM:
533 :
534 : /*
535 : * Remember sequences with insufficient privileges in a
536 : * long-lived memory context since these will be used
537 : * after the transaction is committed.
538 : */
539 0 : oldctx = MemoryContextSwitchTo(ApplyContext);
540 0 : insuffperm_seqs_idx = lappend_int(insuffperm_seqs_idx,
541 : seqidx);
542 0 : MemoryContextSwitchTo(oldctx);
543 0 : batch_insuffperm_count++;
544 0 : break;
545 0 : case COPYSEQ_SKIPPED:
546 :
547 : /*
548 : * Concurrent removal of a sequence on the subscriber is
549 : * treated as success, since the only viable action is to
550 : * skip the corresponding sequence data. Missing sequences
551 : * on the publisher are treated as ERROR.
552 : */
553 0 : if (seqinfo->found_on_pub)
554 : {
555 0 : ereport(LOG,
556 : errmsg("skip synchronization of sequence \"%s.%s\" because it has been dropped concurrently",
557 : seqinfo->nspname,
558 : seqinfo->seqname));
559 0 : batch_skipped_count++;
560 : }
561 0 : break;
562 : }
563 :
564 12 : if (sequence_rel)
565 12 : table_close(sequence_rel, NoLock);
566 : }
567 :
568 9 : ExecDropSingleTupleTableSlot(slot);
569 9 : walrcv_clear_result(res);
570 9 : resetStringInfo(seqstr);
571 9 : resetStringInfo(cmd);
572 :
573 9 : batch_missing_count = batch_size - (batch_succeeded_count +
574 9 : batch_mismatched_count +
575 9 : batch_insuffperm_count +
576 : batch_skipped_count);
577 :
578 9 : elog(DEBUG1,
579 : "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped",
580 : MySubscription->name,
581 : (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1,
582 : batch_size, batch_succeeded_count, batch_mismatched_count,
583 : batch_insuffperm_count, batch_missing_count, batch_skipped_count);
584 :
585 : /* Commit this batch, and prepare for next batch */
586 9 : CommitTransactionCommand();
587 :
588 9 : if (batch_missing_count)
589 : {
590 2 : for (int idx = cur_batch_base_index; idx < cur_batch_base_index + batch_size; idx++)
591 : {
592 : LogicalRepSequenceInfo *seqinfo =
593 1 : (LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
594 :
595 : /* If the sequence was not found on publisher, record it */
596 1 : if (!seqinfo->found_on_pub)
597 1 : missing_seqs_idx = lappend_int(missing_seqs_idx, idx);
598 : }
599 : }
600 :
601 : /*
602 : * cur_batch_base_index is not incremented sequentially because some
603 : * sequences may be missing, and the number of fetched rows may not
604 : * match the batch size.
605 : */
606 9 : cur_batch_base_index += batch_size;
607 : }
608 :
609 : /* Report mismatches, permission issues, or missing sequences */
610 9 : report_sequence_errors(mismatched_seqs_idx, insuffperm_seqs_idx,
611 : missing_seqs_idx);
612 5 : }
613 :
614 : /*
615 : * Identifies sequences that require synchronization and initiates the
616 : * synchronization process.
617 : */
618 : static void
619 9 : LogicalRepSyncSequences(void)
620 : {
621 : char *err;
622 : bool must_use_password;
623 : Relation rel;
624 : HeapTuple tup;
625 : ScanKeyData skey[2];
626 : SysScanDesc scan;
627 9 : Oid subid = MyLogicalRepWorker->subid;
628 : StringInfoData app_name;
629 :
630 9 : StartTransactionCommand();
631 :
632 9 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
633 :
634 9 : ScanKeyInit(&skey[0],
635 : Anum_pg_subscription_rel_srsubid,
636 : BTEqualStrategyNumber, F_OIDEQ,
637 : ObjectIdGetDatum(subid));
638 :
639 9 : ScanKeyInit(&skey[1],
640 : Anum_pg_subscription_rel_srsubstate,
641 : BTEqualStrategyNumber, F_CHAREQ,
642 : CharGetDatum(SUBREL_STATE_INIT));
643 :
644 9 : scan = systable_beginscan(rel, InvalidOid, false,
645 : NULL, 2, skey);
646 23 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
647 : {
648 : Form_pg_subscription_rel subrel;
649 : LogicalRepSequenceInfo *seq;
650 : Relation sequence_rel;
651 : MemoryContext oldctx;
652 :
653 14 : CHECK_FOR_INTERRUPTS();
654 :
655 14 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
656 :
657 14 : sequence_rel = try_table_open(subrel->srrelid, RowExclusiveLock);
658 :
659 : /* Skip if sequence was dropped concurrently */
660 14 : if (!sequence_rel)
661 0 : continue;
662 :
663 : /* Skip if the relation is not a sequence */
664 14 : if (sequence_rel->rd_rel->relkind != RELKIND_SEQUENCE)
665 : {
666 1 : table_close(sequence_rel, NoLock);
667 1 : continue;
668 : }
669 :
670 : /*
671 : * Worker needs to process sequences across transaction boundary, so
672 : * allocate them under long-lived context.
673 : */
674 13 : oldctx = MemoryContextSwitchTo(ApplyContext);
675 :
676 13 : seq = palloc0_object(LogicalRepSequenceInfo);
677 13 : seq->localrelid = subrel->srrelid;
678 13 : seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel));
679 13 : seq->seqname = pstrdup(RelationGetRelationName(sequence_rel));
680 13 : seqinfos = lappend(seqinfos, seq);
681 :
682 13 : MemoryContextSwitchTo(oldctx);
683 :
684 13 : table_close(sequence_rel, NoLock);
685 : }
686 :
687 : /* Cleanup */
688 9 : systable_endscan(scan);
689 9 : table_close(rel, AccessShareLock);
690 :
691 9 : CommitTransactionCommand();
692 :
693 : /*
694 : * Exit early if no catalog entries found, likely due to concurrent drops.
695 : */
696 9 : if (!seqinfos)
697 0 : return;
698 :
699 : /* Is the use of a password mandatory? */
700 18 : must_use_password = MySubscription->passwordrequired &&
701 9 : !MySubscription->ownersuperuser;
702 :
703 9 : initStringInfo(&app_name);
704 9 : appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT,
705 9 : MySubscription->oid, GetSystemIdentifier());
706 :
707 : /*
708 : * Establish the connection to the publisher for sequence synchronization.
709 : */
710 9 : LogRepWorkerWalRcvConn =
711 9 : walrcv_connect(MySubscription->conninfo, true, true,
712 : must_use_password,
713 : app_name.data, &err);
714 9 : if (LogRepWorkerWalRcvConn == NULL)
715 0 : ereport(ERROR,
716 : errcode(ERRCODE_CONNECTION_FAILURE),
717 : errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s",
718 : MySubscription->name, err));
719 :
720 9 : pfree(app_name.data);
721 :
722 9 : copy_sequences(LogRepWorkerWalRcvConn);
723 : }
724 :
725 : /*
726 : * Execute the initial sync with error handling. Disable the subscription,
727 : * if required.
728 : *
729 : * Note that we don't handle FATAL errors which are probably because of system
730 : * resource error and are not repeatable.
731 : */
732 : static void
733 9 : start_sequence_sync(void)
734 : {
735 : Assert(am_sequencesync_worker());
736 :
737 9 : PG_TRY();
738 : {
739 : /* Call initial sync. */
740 9 : LogicalRepSyncSequences();
741 : }
742 4 : PG_CATCH();
743 : {
744 4 : if (MySubscription->disableonerr)
745 0 : DisableSubscriptionAndExit();
746 : else
747 : {
748 : /*
749 : * Report the worker failed during sequence synchronization. Abort
750 : * the current transaction so that the stats message is sent in an
751 : * idle state.
752 : */
753 4 : AbortOutOfAnyTransaction();
754 4 : pgstat_report_subscription_error(MySubscription->oid);
755 :
756 4 : PG_RE_THROW();
757 : }
758 : }
759 5 : PG_END_TRY();
760 5 : }
761 :
762 : /* Logical Replication sequencesync worker entry point */
763 : void
764 9 : SequenceSyncWorkerMain(Datum main_arg)
765 : {
766 9 : int worker_slot = DatumGetInt32(main_arg);
767 :
768 9 : SetupApplyOrSyncWorker(worker_slot);
769 :
770 9 : start_sequence_sync();
771 :
772 5 : FinishSyncWorker();
773 : }
|