Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * sequencesync.c
3 : * PostgreSQL logical replication: sequence synchronization
4 : *
5 : * Copyright (c) 2025-2026, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/sequencesync.c
9 : *
10 : * NOTES
11 : * This file contains code for sequence synchronization for
12 : * logical replication.
13 : *
14 : * Sequences requiring synchronization are tracked in the pg_subscription_rel
15 : * catalog.
16 : *
17 : * Sequences to be synchronized will be added with state INIT when either of
18 : * the following commands is executed:
19 : * CREATE SUBSCRIPTION
20 : * ALTER SUBSCRIPTION ... REFRESH PUBLICATION
21 : *
22 : * Executing the following command resets all sequences in the subscription to
23 : * state INIT, triggering re-synchronization:
24 : * ALTER SUBSCRIPTION ... REFRESH SEQUENCES
25 : *
26 : * The apply worker periodically scans pg_subscription_rel for sequences in
27 : * INIT state. When such sequences are found, it spawns a sequencesync worker
28 : * to handle synchronization.
29 : *
30 : * A single sequencesync worker is responsible for synchronizing all sequences.
31 : * It begins by retrieving the list of sequences that are flagged for
32 : * synchronization, i.e., those in the INIT state. These sequences are then
33 : * processed in batches, allowing multiple entries to be synchronized within a
34 : * single transaction. The worker fetches the current sequence values and page
35 : * LSNs from the remote publisher, updates the corresponding sequences on the
36 : * local subscriber, and finally marks each sequence as READY upon successful
37 : * synchronization.
38 : *
39 : * Sequence state transitions follow this pattern:
40 : * INIT -> READY
41 : *
42 : * To avoid creating too many transactions, up to MAX_SEQUENCES_SYNC_PER_BATCH
43 : * sequences are synchronized per transaction. The locks on the sequence
44 : * relation will be periodically released at each transaction commit.
45 : *
46 : * XXX: We did not make the launcher process responsible for starting the
47 : * sequencesync worker because it has no database connection, so it cannot read
48 : * the pg_subscription_rel catalog to find the sequences that need synchronization.
49 : *-------------------------------------------------------------------------
50 : */
51 :
52 : #include "postgres.h"
53 :
54 : #include "access/genam.h"
55 : #include "access/table.h"
56 : #include "catalog/pg_sequence.h"
57 : #include "catalog/pg_subscription_rel.h"
58 : #include "commands/sequence.h"
59 : #include "pgstat.h"
60 : #include "postmaster/interrupt.h"
61 : #include "replication/logicalworker.h"
62 : #include "replication/worker_internal.h"
63 : #include "storage/lwlock.h"
64 : #include "utils/acl.h"
65 : #include "utils/builtins.h"
66 : #include "utils/fmgroids.h"
67 : #include "utils/guc.h"
68 : #include "utils/inval.h"
69 : #include "utils/lsyscache.h"
70 : #include "utils/memutils.h"
71 : #include "utils/pg_lsn.h"
72 : #include "utils/syscache.h"
73 : #include "utils/usercontext.h"
74 :
75 : #define REMOTE_SEQ_COL_COUNT 11
76 :
77 : typedef enum CopySeqResult
78 : {
79 : COPYSEQ_SUCCESS,
80 : COPYSEQ_MISMATCH,
81 : COPYSEQ_SUBSCRIBER_INSUFFICIENT_PERM,
82 : COPYSEQ_PUBLISHER_INSUFFICIENT_PERM,
83 : COPYSEQ_SKIPPED
84 : } CopySeqResult;
85 :
86 : static List *seqinfos = NIL;
87 :
88 : /*
89 : * Apply worker determines if sequence synchronization is needed.
90 : *
91 : * Start a sequencesync worker if one is not already running. The active
92 : * sequencesync worker will handle all pending sequence synchronization. If any
93 : * sequences remain unsynchronized after it exits, a new worker can be started
94 : * in the next iteration.
95 : */
96 : void
97 10449 : ProcessSequencesForSync(void)
98 : {
99 : LogicalRepWorker *sequencesync_worker;
100 : int nsyncworkers;
101 : bool has_pending_sequences;
102 : bool started_tx;
103 :
104 10449 : FetchRelationStates(NULL, &has_pending_sequences, &started_tx);
105 :
106 10448 : if (started_tx)
107 : {
108 191 : CommitTransactionCommand();
109 191 : pgstat_report_stat(true);
110 : }
111 :
112 10448 : if (!has_pending_sequences)
113 10419 : return;
114 :
115 43 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
116 :
117 : /* Check if there is a sequencesync worker already running? */
118 43 : sequencesync_worker = logicalrep_worker_find(WORKERTYPE_SEQUENCESYNC,
119 43 : MyLogicalRepWorker->subid,
120 : InvalidOid, true);
121 43 : if (sequencesync_worker)
122 : {
123 14 : LWLockRelease(LogicalRepWorkerLock);
124 14 : return;
125 : }
126 :
127 : /*
128 : * Count running sync workers for this subscription, while we have the
129 : * lock.
130 : */
131 29 : nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
132 29 : LWLockRelease(LogicalRepWorkerLock);
133 :
134 : /*
135 : * It is okay to read/update last_seqsync_start_time here in apply worker
136 : * as we have already ensured that sync worker doesn't exist.
137 : */
138 29 : launch_sync_worker(WORKERTYPE_SEQUENCESYNC, nsyncworkers, InvalidOid,
139 29 : &MyLogicalRepWorker->last_seqsync_start_time);
140 : }
141 :
142 : /*
143 : * get_sequences_string
144 : *
145 : * Build a comma-separated string of schema-qualified sequence names
146 : * for the given list of sequence indexes.
147 : */
148 : static void
149 6 : get_sequences_string(List *seqindexes, StringInfo buf)
150 : {
151 6 : resetStringInfo(buf);
152 18 : foreach_int(seqidx, seqindexes)
153 : {
154 : LogicalRepSequenceInfo *seqinfo =
155 6 : (LogicalRepSequenceInfo *) list_nth(seqinfos, seqidx);
156 :
157 6 : if (buf->len > 0)
158 0 : appendStringInfoString(buf, ", ");
159 :
160 6 : appendStringInfo(buf, "\"%s.%s\"", seqinfo->nspname, seqinfo->seqname);
161 : }
162 6 : }
163 :
164 : /*
165 : * report_sequence_errors
166 : *
167 : * Report discrepancies found during sequence synchronization between
168 : * the publisher and subscriber. Emits warnings for:
169 : * a) mismatched definitions or concurrent rename
170 : * b) insufficient privileges on the subscriber
171 : * c) insufficient privileges on the publisher
172 : * d) missing sequences on the publisher
173 : * Then raises an ERROR to indicate synchronization failure.
174 : */
175 : static void
176 12 : report_sequence_errors(List *mismatched_seqs_idx,
177 : List *sub_insuffperm_seqs_idx,
178 : List *pub_insuffperm_seqs_idx,
179 : List *missing_seqs_idx)
180 : {
181 : StringInfoData seqstr;
182 :
183 : /* Quick exit if there are no errors to report */
184 12 : if (!mismatched_seqs_idx && !sub_insuffperm_seqs_idx &&
185 8 : !pub_insuffperm_seqs_idx && !missing_seqs_idx)
186 6 : return;
187 :
188 6 : initStringInfo(&seqstr);
189 :
190 6 : if (mismatched_seqs_idx)
191 : {
192 3 : get_sequences_string(mismatched_seqs_idx, &seqstr);
193 3 : ereport(WARNING,
194 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
195 : errmsg_plural("mismatched or renamed sequence on subscriber (%s)",
196 : "mismatched or renamed sequences on subscriber (%s)",
197 : list_length(mismatched_seqs_idx),
198 : seqstr.data));
199 : }
200 :
201 6 : if (sub_insuffperm_seqs_idx)
202 : {
203 0 : get_sequences_string(sub_insuffperm_seqs_idx, &seqstr);
204 0 : ereport(WARNING,
205 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
206 : errmsg_plural("insufficient privileges on subscriber sequence (%s)",
207 : "insufficient privileges on subscriber sequences (%s)",
208 : list_length(sub_insuffperm_seqs_idx),
209 : seqstr.data));
210 : }
211 :
212 6 : if (pub_insuffperm_seqs_idx)
213 : {
214 1 : get_sequences_string(pub_insuffperm_seqs_idx, &seqstr);
215 1 : ereport(WARNING,
216 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
217 : errmsg_plural("insufficient privileges on publisher sequence (%s)",
218 : "insufficient privileges on publisher sequences (%s)",
219 : list_length(pub_insuffperm_seqs_idx),
220 : seqstr.data));
221 : }
222 :
223 6 : if (missing_seqs_idx)
224 : {
225 2 : get_sequences_string(missing_seqs_idx, &seqstr);
226 2 : ereport(WARNING,
227 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
228 : errmsg_plural("missing sequence on publisher (%s)",
229 : "missing sequences on publisher (%s)",
230 : list_length(missing_seqs_idx),
231 : seqstr.data));
232 : }
233 :
234 6 : ereport(ERROR,
235 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
236 : errmsg("logical replication sequence synchronization failed for subscription \"%s\"",
237 : MySubscription->name));
238 : }
239 :
240 : /*
241 : * get_and_validate_seq_info
242 : *
243 : * Extracts remote sequence information from the tuple slot received from the
244 : * publisher, and validates it against the corresponding local sequence
245 : * definition.
246 : */
247 : static CopySeqResult
248 22 : get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel,
249 : LogicalRepSequenceInfo **seqinfo, int *seqidx)
250 : {
251 : bool isnull;
252 22 : int col = 0;
253 : Datum datum;
254 : bool remote_has_select_priv;
255 : Oid remote_typid;
256 : int64 remote_start;
257 : int64 remote_increment;
258 : int64 remote_min;
259 : int64 remote_max;
260 : bool remote_cycle;
261 22 : CopySeqResult result = COPYSEQ_SUCCESS;
262 : HeapTuple tup;
263 : Form_pg_sequence local_seq;
264 : LogicalRepSequenceInfo *seqinfo_local;
265 :
266 22 : *seqidx = DatumGetInt32(slot_getattr(slot, ++col, &isnull));
267 : Assert(!isnull);
268 :
269 : /* Identify the corresponding local sequence for the given index. */
270 22 : *seqinfo = seqinfo_local =
271 22 : (LogicalRepSequenceInfo *) list_nth(seqinfos, *seqidx);
272 :
273 : /*
274 : * The remote sequence state can be NULL if the publisher lacks the
275 : * required privileges or if the sequence was dropped concurrently after
276 : * it was identified in the catalog snapshot (see pg_get_sequence_data()).
277 : */
278 22 : remote_has_select_priv = DatumGetBool(slot_getattr(slot, ++col, &isnull));
279 : Assert(!isnull);
280 :
281 22 : datum = slot_getattr(slot, ++col, &isnull);
282 22 : if (isnull)
283 1 : return remote_has_select_priv ? COPYSEQ_SKIPPED :
284 : COPYSEQ_PUBLISHER_INSUFFICIENT_PERM;
285 :
286 21 : seqinfo_local->last_value = DatumGetInt64(datum);
287 :
288 21 : seqinfo_local->is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull));
289 : Assert(!isnull);
290 :
291 21 : seqinfo_local->page_lsn = DatumGetLSN(slot_getattr(slot, ++col, &isnull));
292 : Assert(!isnull);
293 :
294 21 : remote_typid = DatumGetObjectId(slot_getattr(slot, ++col, &isnull));
295 : Assert(!isnull);
296 :
297 21 : remote_start = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
298 : Assert(!isnull);
299 :
300 21 : remote_increment = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
301 : Assert(!isnull);
302 :
303 21 : remote_min = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
304 : Assert(!isnull);
305 :
306 21 : remote_max = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
307 : Assert(!isnull);
308 :
309 21 : remote_cycle = DatumGetBool(slot_getattr(slot, ++col, &isnull));
310 : Assert(!isnull);
311 :
312 : /* Sanity check */
313 : Assert(col == REMOTE_SEQ_COL_COUNT);
314 :
315 21 : seqinfo_local->found_on_pub = true;
316 :
317 21 : *sequence_rel = try_table_open(seqinfo_local->localrelid, RowExclusiveLock);
318 :
319 : /* Sequence was concurrently dropped? */
320 21 : if (!*sequence_rel)
321 0 : return COPYSEQ_SKIPPED;
322 :
323 21 : tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seqinfo_local->localrelid));
324 :
325 : /* Sequence was concurrently dropped? */
326 21 : if (!HeapTupleIsValid(tup))
327 0 : elog(ERROR, "cache lookup failed for sequence %u",
328 : seqinfo_local->localrelid);
329 :
330 21 : local_seq = (Form_pg_sequence) GETSTRUCT(tup);
331 :
332 : /* Sequence parameters for remote/local are the same? */
333 21 : if (local_seq->seqtypid != remote_typid ||
334 21 : local_seq->seqstart != remote_start ||
335 20 : local_seq->seqincrement != remote_increment ||
336 18 : local_seq->seqmin != remote_min ||
337 18 : local_seq->seqmax != remote_max ||
338 18 : local_seq->seqcycle != remote_cycle)
339 3 : result = COPYSEQ_MISMATCH;
340 :
341 : /* Sequence was concurrently renamed? */
342 21 : if (strcmp(seqinfo_local->nspname,
343 21 : get_namespace_name(RelationGetNamespace(*sequence_rel))) ||
344 21 : strcmp(seqinfo_local->seqname, RelationGetRelationName(*sequence_rel)))
345 0 : result = COPYSEQ_MISMATCH;
346 :
347 21 : ReleaseSysCache(tup);
348 21 : return result;
349 : }
350 :
351 : /*
352 : * Apply remote sequence state to local sequence and mark it as
353 : * synchronized (READY).
354 : */
355 : static CopySeqResult
356 18 : copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner)
357 : {
358 : UserContext ucxt;
359 : AclResult aclresult;
360 18 : bool run_as_owner = MySubscription->runasowner;
361 18 : Oid seqoid = seqinfo->localrelid;
362 :
363 : /*
364 : * If the user did not opt to run as the owner of the subscription
365 : * ('run_as_owner'), then copy the sequence as the owner of the sequence.
366 : */
367 18 : if (!run_as_owner)
368 18 : SwitchToUntrustedUser(seqowner, &ucxt);
369 :
370 18 : aclresult = pg_class_aclcheck(seqoid, GetUserId(), ACL_UPDATE);
371 :
372 18 : if (aclresult != ACLCHECK_OK)
373 : {
374 0 : if (!run_as_owner)
375 0 : RestoreUserContext(&ucxt);
376 :
377 0 : return COPYSEQ_SUBSCRIBER_INSUFFICIENT_PERM;
378 : }
379 :
380 : /*
381 : * The log counter (log_cnt) tracks how many sequence values are still
382 : * unused locally. It is only relevant to the local node and managed
383 : * internally by nextval() when allocating new ranges. Since log_cnt does
384 : * not affect the visible sequence state (like last_value or is_called)
385 : * and is only used for local caching, it need not be copied to the
386 : * subscriber during synchronization.
387 : */
388 18 : SetSequence(seqoid, seqinfo->last_value, seqinfo->is_called);
389 :
390 18 : if (!run_as_owner)
391 18 : RestoreUserContext(&ucxt);
392 :
393 : /*
394 : * Record the remote sequence's LSN in pg_subscription_rel and mark the
395 : * sequence as READY.
396 : */
397 18 : UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY,
398 : seqinfo->page_lsn, false);
399 :
400 18 : return COPYSEQ_SUCCESS;
401 : }
402 :
403 : /*
404 : * Copy existing data of sequences from the publisher.
405 : */
406 : static void
407 12 : copy_sequences(WalReceiverConn *conn)
408 : {
409 12 : int cur_batch_base_index = 0;
410 12 : int n_seqinfos = list_length(seqinfos);
411 12 : List *mismatched_seqs_idx = NIL;
412 12 : List *missing_seqs_idx = NIL;
413 12 : List *sub_insuffperm_seqs_idx = NIL;
414 12 : List *pub_insuffperm_seqs_idx = NIL;
415 : StringInfoData seqstr;
416 : StringInfoData cmd;
417 : MemoryContext oldctx;
418 :
419 12 : initStringInfo(&seqstr);
420 12 : initStringInfo(&cmd);
421 :
422 : #define MAX_SEQUENCES_SYNC_PER_BATCH 100
423 :
424 12 : elog(DEBUG1,
425 : "logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d",
426 : MySubscription->name, n_seqinfos);
427 :
428 24 : while (cur_batch_base_index < n_seqinfos)
429 : {
430 12 : Oid seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, BOOLOID, INT8OID,
431 : BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID};
432 12 : int batch_size = 0;
433 12 : int batch_succeeded_count = 0;
434 12 : int batch_mismatched_count = 0;
435 12 : int batch_skipped_count = 0;
436 12 : int batch_sub_insuffperm_count = 0;
437 12 : int batch_pub_insuffperm_count = 0;
438 : int batch_missing_count;
439 :
440 : WalRcvExecResult *res;
441 : TupleTableSlot *slot;
442 :
443 12 : StartTransactionCommand();
444 :
445 36 : for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++)
446 : {
447 : char *nspname_literal;
448 : char *seqname_literal;
449 :
450 : LogicalRepSequenceInfo *seqinfo =
451 24 : (LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
452 :
453 24 : if (seqstr.len > 0)
454 12 : appendStringInfoString(&seqstr, ", ");
455 :
456 24 : nspname_literal = quote_literal_cstr(seqinfo->nspname);
457 24 : seqname_literal = quote_literal_cstr(seqinfo->seqname);
458 :
459 24 : appendStringInfo(&seqstr, "(%s, %s, %d)",
460 : nspname_literal, seqname_literal, idx);
461 :
462 24 : if (++batch_size == MAX_SEQUENCES_SYNC_PER_BATCH)
463 0 : break;
464 : }
465 :
466 : /*
467 : * We deliberately avoid acquiring a local lock on the sequence before
468 : * querying the publisher to prevent potential distributed deadlocks
469 : * in bi-directional replication setups.
470 : *
471 : * Example scenario:
472 : *
473 : * - On each node, a background worker acquires a lock on a sequence
474 : * as part of a sync operation.
475 : *
476 : * - Concurrently, a user transaction attempts to alter the same
477 : * sequence, waiting on the background worker's lock.
478 : *
479 : * - Meanwhile, a query from the other node tries to access metadata
480 : * that depends on the completion of the alter operation.
481 : *
482 : * - This creates a circular wait across nodes:
483 : *
484 : * Node-1: Query -> waits on Alter -> waits on Sync Worker
485 : *
486 : * Node-2: Query -> waits on Alter -> waits on Sync Worker
487 : *
488 : * Since each node only sees part of the wait graph, the deadlock may
489 : * go undetected, leading to indefinite blocking.
490 : *
491 : * Note: Each entry in VALUES includes an index 'seqidx' that
492 : * represents the sequence's position in the local 'seqinfos' list.
493 : * This index is propagated to the query results and later used to
494 : * directly map the fetched publisher sequence rows back to their
495 : * corresponding local entries without relying on result order or name
496 : * matching.
497 : */
498 12 : appendStringInfo(&cmd,
499 : "SELECT s.seqidx, has_sequence_privilege(c.oid, 'SELECT'),\n"
500 : " ps.*, seq.seqtypid,\n"
501 : " seq.seqstart, seq.seqincrement, seq.seqmin,\n"
502 : " seq.seqmax, seq.seqcycle\n"
503 : "FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n"
504 : "JOIN pg_namespace n ON n.nspname = s.schname\n"
505 : "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n"
506 : "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n"
507 : "JOIN LATERAL pg_get_sequence_data(seq.seqrelid) AS ps ON true\n",
508 : seqstr.data);
509 :
510 12 : res = walrcv_exec(conn, cmd.data, lengthof(seqRow), seqRow);
511 12 : if (res->status != WALRCV_OK_TUPLES)
512 0 : ereport(ERROR,
513 : errcode(ERRCODE_CONNECTION_FAILURE),
514 : errmsg("could not fetch sequence information from the publisher: %s",
515 : res->err));
516 :
517 12 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
518 34 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
519 : {
520 : CopySeqResult sync_status;
521 : LogicalRepSequenceInfo *seqinfo;
522 22 : Relation sequence_rel = NULL;
523 : int seqidx;
524 :
525 22 : CHECK_FOR_INTERRUPTS();
526 :
527 22 : if (ConfigReloadPending)
528 : {
529 0 : ConfigReloadPending = false;
530 0 : ProcessConfigFile(PGC_SIGHUP);
531 : }
532 :
533 22 : sync_status = get_and_validate_seq_info(slot, &sequence_rel,
534 : &seqinfo, &seqidx);
535 22 : if (sync_status == COPYSEQ_SUCCESS)
536 18 : sync_status = copy_sequence(seqinfo,
537 18 : sequence_rel->rd_rel->relowner);
538 :
539 22 : switch (sync_status)
540 : {
541 18 : case COPYSEQ_SUCCESS:
542 18 : elog(DEBUG1,
543 : "logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished",
544 : MySubscription->name, seqinfo->nspname,
545 : seqinfo->seqname);
546 18 : batch_succeeded_count++;
547 18 : break;
548 3 : case COPYSEQ_MISMATCH:
549 :
550 : /*
551 : * Remember mismatched sequences in a long-lived memory
552 : * context since these will be used after the transaction
553 : * is committed.
554 : */
555 3 : oldctx = MemoryContextSwitchTo(ApplyContext);
556 3 : mismatched_seqs_idx = lappend_int(mismatched_seqs_idx,
557 : seqidx);
558 3 : MemoryContextSwitchTo(oldctx);
559 3 : batch_mismatched_count++;
560 3 : break;
561 0 : case COPYSEQ_SUBSCRIBER_INSUFFICIENT_PERM:
562 :
563 : /*
564 : * Remember sequences with insufficient privileges in a
565 : * long-lived memory context since these will be used
566 : * after the transaction is committed.
567 : */
568 0 : oldctx = MemoryContextSwitchTo(ApplyContext);
569 0 : sub_insuffperm_seqs_idx = lappend_int(sub_insuffperm_seqs_idx,
570 : seqidx);
571 0 : MemoryContextSwitchTo(oldctx);
572 0 : batch_sub_insuffperm_count++;
573 0 : break;
574 1 : case COPYSEQ_PUBLISHER_INSUFFICIENT_PERM:
575 :
576 : /*
577 : * Remember sequences for which the publisher lacks the
578 : * privileges required by pg_get_sequence_data().
579 : */
580 1 : oldctx = MemoryContextSwitchTo(ApplyContext);
581 1 : pub_insuffperm_seqs_idx = lappend_int(pub_insuffperm_seqs_idx,
582 : seqidx);
583 1 : MemoryContextSwitchTo(oldctx);
584 1 : batch_pub_insuffperm_count++;
585 1 : break;
586 0 : case COPYSEQ_SKIPPED:
587 :
588 : /*
589 : * Concurrent removal of a sequence on the subscriber is
590 : * treated as success, since the only viable action is to
591 : * skip the corresponding sequence data. Missing sequences
592 : * on the publisher are treated as ERROR.
593 : */
594 0 : if (seqinfo->found_on_pub)
595 : {
596 0 : ereport(LOG,
597 : errmsg("skip synchronization of sequence \"%s.%s\" because it has been dropped concurrently",
598 : seqinfo->nspname,
599 : seqinfo->seqname));
600 0 : batch_skipped_count++;
601 : }
602 0 : break;
603 : }
604 :
605 22 : if (sequence_rel)
606 21 : table_close(sequence_rel, NoLock);
607 : }
608 :
609 12 : ExecDropSingleTupleTableSlot(slot);
610 12 : walrcv_clear_result(res);
611 12 : resetStringInfo(&seqstr);
612 12 : resetStringInfo(&cmd);
613 :
614 12 : batch_missing_count = batch_size - (batch_succeeded_count +
615 12 : batch_mismatched_count +
616 12 : batch_sub_insuffperm_count +
617 12 : batch_pub_insuffperm_count +
618 : batch_skipped_count);
619 :
620 12 : elog(DEBUG1,
621 : "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d subscriber insufficient permission, %d publisher insufficient permission, %d missing from publisher, %d skipped",
622 : MySubscription->name,
623 : (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1,
624 : batch_size, batch_succeeded_count, batch_mismatched_count,
625 : batch_sub_insuffperm_count, batch_pub_insuffperm_count, batch_missing_count, batch_skipped_count);
626 :
627 : /* Commit this batch, and prepare for next batch */
628 12 : CommitTransactionCommand();
629 :
630 12 : if (batch_missing_count)
631 : {
632 8 : for (int idx = cur_batch_base_index; idx < cur_batch_base_index + batch_size; idx++)
633 : {
634 : LogicalRepSequenceInfo *seqinfo =
635 6 : (LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
636 :
637 : /* If the sequence was not found on publisher, record it */
638 6 : if (!seqinfo->found_on_pub)
639 2 : missing_seqs_idx = lappend_int(missing_seqs_idx, idx);
640 : }
641 : }
642 :
643 : /*
644 : * cur_batch_base_index is not incremented sequentially because some
645 : * sequences may be missing, and the number of fetched rows may not
646 : * match the batch size.
647 : */
648 12 : cur_batch_base_index += batch_size;
649 : }
650 :
651 : /* Report mismatches, permission issues, or missing sequences */
652 12 : report_sequence_errors(mismatched_seqs_idx, sub_insuffperm_seqs_idx,
653 : pub_insuffperm_seqs_idx, missing_seqs_idx);
654 6 : }
655 :
656 : /*
657 : * Identifies sequences that require synchronization and initiates the
658 : * synchronization process.
659 : */
660 : static void
661 12 : LogicalRepSyncSequences(void)
662 : {
663 : char *err;
664 : bool must_use_password;
665 : Relation rel;
666 : HeapTuple tup;
667 : ScanKeyData skey[2];
668 : SysScanDesc scan;
669 12 : Oid subid = MyLogicalRepWorker->subid;
670 : StringInfoData app_name;
671 :
672 12 : StartTransactionCommand();
673 :
674 12 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
675 :
676 12 : ScanKeyInit(&skey[0],
677 : Anum_pg_subscription_rel_srsubid,
678 : BTEqualStrategyNumber, F_OIDEQ,
679 : ObjectIdGetDatum(subid));
680 :
681 12 : ScanKeyInit(&skey[1],
682 : Anum_pg_subscription_rel_srsubstate,
683 : BTEqualStrategyNumber, F_CHAREQ,
684 : CharGetDatum(SUBREL_STATE_INIT));
685 :
686 12 : scan = systable_beginscan(rel, InvalidOid, false,
687 : NULL, 2, skey);
688 37 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
689 : {
690 : Form_pg_subscription_rel subrel;
691 : LogicalRepSequenceInfo *seq;
692 : Relation sequence_rel;
693 : MemoryContext oldctx;
694 :
695 25 : CHECK_FOR_INTERRUPTS();
696 :
697 25 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
698 :
699 25 : sequence_rel = try_table_open(subrel->srrelid, RowExclusiveLock);
700 :
701 : /* Skip if sequence was dropped concurrently */
702 25 : if (!sequence_rel)
703 0 : continue;
704 :
705 : /* Skip if the relation is not a sequence */
706 25 : if (sequence_rel->rd_rel->relkind != RELKIND_SEQUENCE)
707 : {
708 1 : table_close(sequence_rel, NoLock);
709 1 : continue;
710 : }
711 :
712 : /*
713 : * Worker needs to process sequences across transaction boundary, so
714 : * allocate them under long-lived context.
715 : */
716 24 : oldctx = MemoryContextSwitchTo(ApplyContext);
717 :
718 24 : seq = palloc0_object(LogicalRepSequenceInfo);
719 24 : seq->localrelid = subrel->srrelid;
720 24 : seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel));
721 24 : seq->seqname = pstrdup(RelationGetRelationName(sequence_rel));
722 24 : seqinfos = lappend(seqinfos, seq);
723 :
724 24 : MemoryContextSwitchTo(oldctx);
725 :
726 24 : table_close(sequence_rel, NoLock);
727 : }
728 :
729 : /* Cleanup */
730 12 : systable_endscan(scan);
731 12 : table_close(rel, AccessShareLock);
732 :
733 12 : CommitTransactionCommand();
734 :
735 : /*
736 : * Exit early if no catalog entries found, likely due to concurrent drops.
737 : */
738 12 : if (!seqinfos)
739 0 : return;
740 :
741 : /* Is the use of a password mandatory? */
742 24 : must_use_password = MySubscription->passwordrequired &&
743 12 : !MySubscription->ownersuperuser;
744 :
745 12 : initStringInfo(&app_name);
746 12 : appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT,
747 12 : MySubscription->oid, GetSystemIdentifier());
748 :
749 : /*
750 : * Establish the connection to the publisher for sequence synchronization.
751 : */
752 12 : LogRepWorkerWalRcvConn =
753 12 : walrcv_connect(MySubscription->conninfo, true, true,
754 : must_use_password,
755 : app_name.data, &err);
756 12 : if (LogRepWorkerWalRcvConn == NULL)
757 0 : ereport(ERROR,
758 : errcode(ERRCODE_CONNECTION_FAILURE),
759 : errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s",
760 : MySubscription->name, err));
761 :
762 12 : pfree(app_name.data);
763 :
764 12 : copy_sequences(LogRepWorkerWalRcvConn);
765 : }
766 :
767 : /*
768 : * Execute the initial sync with error handling. Disable the subscription,
769 : * if required.
770 : *
771 : * Note that we don't handle FATAL errors which are probably because of system
772 : * resource error and are not repeatable.
773 : */
774 : static void
775 12 : start_sequence_sync(void)
776 : {
777 : Assert(am_sequencesync_worker());
778 :
779 12 : PG_TRY();
780 : {
781 : /* Call initial sync. */
782 12 : LogicalRepSyncSequences();
783 : }
784 6 : PG_CATCH();
785 : {
786 6 : if (MySubscription->disableonerr)
787 0 : DisableSubscriptionAndExit();
788 : else
789 : {
790 : /*
791 : * Report the worker failed during sequence synchronization. Abort
792 : * the current transaction so that the stats message is sent in an
793 : * idle state.
794 : */
795 6 : AbortOutOfAnyTransaction();
796 6 : pgstat_report_subscription_error(MySubscription->oid);
797 :
798 6 : PG_RE_THROW();
799 : }
800 : }
801 6 : PG_END_TRY();
802 6 : }
803 :
804 : /* Logical Replication sequencesync worker entry point */
805 : void
806 12 : SequenceSyncWorkerMain(Datum main_arg)
807 : {
808 12 : int worker_slot = DatumGetInt32(main_arg);
809 :
810 12 : SetupApplyOrSyncWorker(worker_slot);
811 :
812 12 : start_sequence_sync();
813 :
814 6 : FinishSyncWorker();
815 : }
|