Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : * sequencesync.c
3 : : * PostgreSQL logical replication: sequence synchronization
4 : : *
5 : : * Copyright (c) 2025-2026, PostgreSQL Global Development Group
6 : : *
7 : : * IDENTIFICATION
8 : : * src/backend/replication/logical/sequencesync.c
9 : : *
10 : : * NOTES
11 : : * This file contains code for sequence synchronization for
12 : : * logical replication.
13 : : *
14 : : * Sequences requiring synchronization are tracked in the pg_subscription_rel
15 : : * catalog.
16 : : *
17 : : * Sequences to be synchronized will be added with state INIT when either of
18 : : * the following commands is executed:
19 : : * CREATE SUBSCRIPTION
20 : : * ALTER SUBSCRIPTION ... REFRESH PUBLICATION
21 : : *
22 : : * Executing the following command resets all sequences in the subscription to
23 : : * state INIT, triggering re-synchronization:
24 : : * ALTER SUBSCRIPTION ... REFRESH SEQUENCES
25 : : *
26 : : * The apply worker periodically scans pg_subscription_rel for sequences in
27 : : * INIT state. When such sequences are found, it spawns a sequencesync worker
28 : : * to handle synchronization.
29 : : *
30 : : * A single sequencesync worker is responsible for synchronizing all sequences.
31 : : * It begins by retrieving the list of sequences that are flagged for
32 : : * synchronization, i.e., those in the INIT state. These sequences are then
33 : : * processed in batches, allowing multiple entries to be synchronized within a
34 : : * single transaction. The worker fetches the current sequence values and page
35 : : * LSNs from the remote publisher, updates the corresponding sequences on the
36 : : * local subscriber, and finally marks each sequence as READY upon successful
37 : : * synchronization.
38 : : *
39 : : * Sequence state transitions follow this pattern:
40 : : * INIT -> READY
41 : : *
42 : : * To avoid creating too many transactions, up to MAX_SEQUENCES_SYNC_PER_BATCH
43 : : * sequences are synchronized per transaction. The locks on the sequence
44 : : * relation will be periodically released at each transaction commit.
45 : : *
46 : : * XXX: We didn't choose launcher process to maintain the launch of sequencesync
47 : : * worker as it didn't have database connection to access the sequences from the
48 : : * pg_subscription_rel system catalog that need to be synchronized.
49 : : *-------------------------------------------------------------------------
50 : : */
51 : :
52 : : #include "postgres.h"
53 : :
54 : : #include "access/genam.h"
55 : : #include "access/table.h"
56 : : #include "catalog/pg_sequence.h"
57 : : #include "catalog/pg_subscription_rel.h"
58 : : #include "commands/sequence.h"
59 : : #include "pgstat.h"
60 : : #include "postmaster/interrupt.h"
61 : : #include "replication/logicalworker.h"
62 : : #include "replication/worker_internal.h"
63 : : #include "storage/lwlock.h"
64 : : #include "utils/acl.h"
65 : : #include "utils/builtins.h"
66 : : #include "utils/fmgroids.h"
67 : : #include "utils/guc.h"
68 : : #include "utils/inval.h"
69 : : #include "utils/lsyscache.h"
70 : : #include "utils/memutils.h"
71 : : #include "utils/pg_lsn.h"
72 : : #include "utils/syscache.h"
73 : : #include "utils/usercontext.h"
74 : :
75 : : #define REMOTE_SEQ_COL_COUNT 11
76 : :
77 : : typedef enum CopySeqResult
78 : : {
79 : : COPYSEQ_SUCCESS,
80 : : COPYSEQ_MISMATCH,
81 : : COPYSEQ_SUBSCRIBER_INSUFFICIENT_PERM,
82 : : COPYSEQ_PUBLISHER_INSUFFICIENT_PERM,
83 : : COPYSEQ_SKIPPED
84 : : } CopySeqResult;
85 : :
86 : : static List *seqinfos = NIL;
87 : :
88 : : /*
89 : : * Apply worker determines if sequence synchronization is needed.
90 : : *
91 : : * Start a sequencesync worker if one is not already running. The active
92 : : * sequencesync worker will handle all pending sequence synchronization. If any
93 : : * sequences remain unsynchronized after it exits, a new worker can be started
94 : : * in the next iteration.
95 : : */
96 : : void
237 akapila@postgresql.o 97 :GNC 5023 : ProcessSequencesForSync(void)
98 : : {
99 : : LogicalRepWorker *sequencesync_worker;
100 : : int nsyncworkers;
101 : : bool has_pending_sequences;
102 : : bool started_tx;
103 : :
104 : 5023 : FetchRelationStates(NULL, &has_pending_sequences, &started_tx);
105 : :
106 [ + + ]: 5023 : if (started_tx)
107 : : {
108 : 189 : CommitTransactionCommand();
109 : 189 : pgstat_report_stat(true);
110 : : }
111 : :
112 [ + + ]: 5023 : if (!has_pending_sequences)
113 : 4998 : return;
114 : :
115 : 39 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
116 : :
117 : : /* Check if there is a sequencesync worker already running? */
118 : 39 : sequencesync_worker = logicalrep_worker_find(WORKERTYPE_SEQUENCESYNC,
119 : 39 : MyLogicalRepWorker->subid,
120 : : InvalidOid, true);
121 [ + + ]: 39 : if (sequencesync_worker)
122 : : {
123 : 14 : LWLockRelease(LogicalRepWorkerLock);
124 : 14 : return;
125 : : }
126 : :
127 : : /*
128 : : * Count running sync workers for this subscription, while we have the
129 : : * lock.
130 : : */
131 : 25 : nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
132 : 25 : LWLockRelease(LogicalRepWorkerLock);
133 : :
134 : : /*
135 : : * It is okay to read/update last_seqsync_start_time here in apply worker
136 : : * as we have already ensured that sync worker doesn't exist.
137 : : */
138 : 25 : launch_sync_worker(WORKERTYPE_SEQUENCESYNC, nsyncworkers, InvalidOid,
139 : 25 : &MyLogicalRepWorker->last_seqsync_start_time);
140 : : }
141 : :
142 : : /*
143 : : * get_sequences_string
144 : : *
145 : : * Build a comma-separated string of schema-qualified sequence names
146 : : * for the given list of sequence indexes.
147 : : */
148 : : static void
149 : 6 : get_sequences_string(List *seqindexes, StringInfo buf)
150 : : {
151 : 6 : resetStringInfo(buf);
152 [ + - + + : 18 : foreach_int(seqidx, seqindexes)
+ + ]
153 : : {
154 : : LogicalRepSequenceInfo *seqinfo =
155 : 6 : (LogicalRepSequenceInfo *) list_nth(seqinfos, seqidx);
156 : :
157 [ - + ]: 6 : if (buf->len > 0)
237 akapila@postgresql.o 158 :UNC 0 : appendStringInfoString(buf, ", ");
159 : :
237 akapila@postgresql.o 160 :GNC 6 : appendStringInfo(buf, "\"%s.%s\"", seqinfo->nspname, seqinfo->seqname);
161 : : }
162 : 6 : }
163 : :
164 : : /*
165 : : * report_sequence_errors
166 : : *
167 : : * Report discrepancies found during sequence synchronization between
168 : : * the publisher and subscriber. Emits warnings for:
169 : : * a) mismatched definitions or concurrent rename
170 : : * b) insufficient privileges on the subscriber
171 : : * c) insufficient privileges on the publisher
172 : : * d) missing sequences on the publisher
173 : : * Then raises an ERROR to indicate synchronization failure.
174 : : */
175 : : static void
10 fujii@postgresql.org 176 : 11 : report_sequence_errors(List *mismatched_seqs_idx,
177 : : List *sub_insuffperm_seqs_idx,
178 : : List *pub_insuffperm_seqs_idx,
179 : : List *missing_seqs_idx)
180 : : {
181 : : StringInfoData seqstr;
182 : :
183 : : /* Quick exit if there are no errors to report */
184 [ + + + - : 11 : if (!mismatched_seqs_idx && !sub_insuffperm_seqs_idx &&
+ + ]
185 [ + + ]: 7 : !pub_insuffperm_seqs_idx && !missing_seqs_idx)
237 akapila@postgresql.o 186 : 5 : return;
187 : :
78 drowley@postgresql.o 188 : 6 : initStringInfo(&seqstr);
189 : :
237 akapila@postgresql.o 190 [ + + ]: 6 : if (mismatched_seqs_idx)
191 : : {
78 drowley@postgresql.o 192 : 3 : get_sequences_string(mismatched_seqs_idx, &seqstr);
237 akapila@postgresql.o 193 [ + - ]: 3 : ereport(WARNING,
194 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
195 : : errmsg_plural("mismatched or renamed sequence on subscriber (%s)",
196 : : "mismatched or renamed sequences on subscriber (%s)",
197 : : list_length(mismatched_seqs_idx),
198 : : seqstr.data));
199 : : }
200 : :
10 fujii@postgresql.org 201 [ - + ]: 6 : if (sub_insuffperm_seqs_idx)
202 : : {
10 fujii@postgresql.org 203 :UNC 0 : get_sequences_string(sub_insuffperm_seqs_idx, &seqstr);
237 akapila@postgresql.o 204 [ # # ]: 0 : ereport(WARNING,
205 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
206 : : errmsg_plural("insufficient privileges on subscriber sequence (%s)",
207 : : "insufficient privileges on subscriber sequences (%s)",
208 : : list_length(sub_insuffperm_seqs_idx),
209 : : seqstr.data));
210 : : }
211 : :
10 fujii@postgresql.org 212 [ + + ]:GNC 6 : if (pub_insuffperm_seqs_idx)
213 : : {
214 : 1 : get_sequences_string(pub_insuffperm_seqs_idx, &seqstr);
215 [ + - ]: 1 : ereport(WARNING,
216 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
217 : : errmsg_plural("insufficient privileges on publisher sequence (%s)",
218 : : "insufficient privileges on publisher sequences (%s)",
219 : : list_length(pub_insuffperm_seqs_idx),
220 : : seqstr.data));
221 : : }
222 : :
237 akapila@postgresql.o 223 [ + + ]: 6 : if (missing_seqs_idx)
224 : : {
78 drowley@postgresql.o 225 : 2 : get_sequences_string(missing_seqs_idx, &seqstr);
237 akapila@postgresql.o 226 [ + - ]: 2 : ereport(WARNING,
227 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
228 : : errmsg_plural("missing sequence on publisher (%s)",
229 : : "missing sequences on publisher (%s)",
230 : : list_length(missing_seqs_idx),
231 : : seqstr.data));
232 : : }
233 : :
234 [ + - ]: 6 : ereport(ERROR,
235 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
236 : : errmsg("logical replication sequence synchronization failed for subscription \"%s\"",
237 : : MySubscription->name));
238 : : }
239 : :
240 : : /*
241 : : * get_and_validate_seq_info
242 : : *
243 : : * Extracts remote sequence information from the tuple slot received from the
244 : : * publisher, and validates it against the corresponding local sequence
245 : : * definition.
246 : : */
247 : : static CopySeqResult
248 : 21 : get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel,
249 : : LogicalRepSequenceInfo **seqinfo, int *seqidx)
250 : : {
251 : : bool isnull;
252 : 21 : int col = 0;
253 : : Datum datum;
254 : : bool remote_has_select_priv;
255 : : Oid remote_typid;
256 : : int64 remote_start;
257 : : int64 remote_increment;
258 : : int64 remote_min;
259 : : int64 remote_max;
260 : : bool remote_cycle;
261 : 21 : CopySeqResult result = COPYSEQ_SUCCESS;
262 : : HeapTuple tup;
263 : : Form_pg_sequence local_seq;
264 : : LogicalRepSequenceInfo *seqinfo_local;
265 : :
266 : 21 : *seqidx = DatumGetInt32(slot_getattr(slot, ++col, &isnull));
267 [ - + ]: 21 : Assert(!isnull);
268 : :
269 : : /* Identify the corresponding local sequence for the given index. */
270 : 21 : *seqinfo = seqinfo_local =
271 : 21 : (LogicalRepSequenceInfo *) list_nth(seqinfos, *seqidx);
272 : :
273 : : /*
274 : : * The remote sequence state can be NULL if the publisher lacks the
275 : : * required privileges or if the sequence was dropped concurrently after
276 : : * it was identified in the catalog snapshot (see pg_get_sequence_data()).
277 : : */
10 fujii@postgresql.org 278 : 21 : remote_has_select_priv = DatumGetBool(slot_getattr(slot, ++col, &isnull));
279 [ - + ]: 21 : Assert(!isnull);
280 : :
161 akapila@postgresql.o 281 : 21 : datum = slot_getattr(slot, ++col, &isnull);
282 [ + + ]: 21 : if (isnull)
10 fujii@postgresql.org 283 [ - + ]: 1 : return remote_has_select_priv ? COPYSEQ_SKIPPED :
284 : : COPYSEQ_PUBLISHER_INSUFFICIENT_PERM;
285 : :
161 akapila@postgresql.o 286 : 20 : seqinfo_local->last_value = DatumGetInt64(datum);
287 : :
237 288 : 20 : seqinfo_local->is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull));
289 [ - + ]: 20 : Assert(!isnull);
290 : :
291 : 20 : seqinfo_local->page_lsn = DatumGetLSN(slot_getattr(slot, ++col, &isnull));
292 [ - + ]: 20 : Assert(!isnull);
293 : :
294 : 20 : remote_typid = DatumGetObjectId(slot_getattr(slot, ++col, &isnull));
295 [ - + ]: 20 : Assert(!isnull);
296 : :
297 : 20 : remote_start = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
298 [ - + ]: 20 : Assert(!isnull);
299 : :
300 : 20 : remote_increment = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
301 [ - + ]: 20 : Assert(!isnull);
302 : :
303 : 20 : remote_min = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
304 [ - + ]: 20 : Assert(!isnull);
305 : :
306 : 20 : remote_max = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
307 [ - + ]: 20 : Assert(!isnull);
308 : :
309 : 20 : remote_cycle = DatumGetBool(slot_getattr(slot, ++col, &isnull));
310 [ - + ]: 20 : Assert(!isnull);
311 : :
312 : : /* Sanity check */
313 [ - + ]: 20 : Assert(col == REMOTE_SEQ_COL_COUNT);
314 : :
315 : 20 : seqinfo_local->found_on_pub = true;
316 : :
317 : 20 : *sequence_rel = try_table_open(seqinfo_local->localrelid, RowExclusiveLock);
318 : :
319 : : /* Sequence was concurrently dropped? */
320 [ - + ]: 20 : if (!*sequence_rel)
237 akapila@postgresql.o 321 :UNC 0 : return COPYSEQ_SKIPPED;
322 : :
237 akapila@postgresql.o 323 :GNC 20 : tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seqinfo_local->localrelid));
324 : :
325 : : /* Sequence was concurrently dropped? */
326 [ - + ]: 20 : if (!HeapTupleIsValid(tup))
237 akapila@postgresql.o 327 [ # # ]:UNC 0 : elog(ERROR, "cache lookup failed for sequence %u",
328 : : seqinfo_local->localrelid);
329 : :
237 akapila@postgresql.o 330 :GNC 20 : local_seq = (Form_pg_sequence) GETSTRUCT(tup);
331 : :
332 : : /* Sequence parameters for remote/local are the same? */
333 [ + - ]: 20 : if (local_seq->seqtypid != remote_typid ||
334 [ + + ]: 20 : local_seq->seqstart != remote_start ||
335 [ + + ]: 19 : local_seq->seqincrement != remote_increment ||
336 [ + - ]: 17 : local_seq->seqmin != remote_min ||
337 [ + - ]: 17 : local_seq->seqmax != remote_max ||
338 [ - + ]: 17 : local_seq->seqcycle != remote_cycle)
339 : 3 : result = COPYSEQ_MISMATCH;
340 : :
341 : : /* Sequence was concurrently renamed? */
342 [ + - ]: 20 : if (strcmp(seqinfo_local->nspname,
343 : 20 : get_namespace_name(RelationGetNamespace(*sequence_rel))) ||
344 [ - + ]: 20 : strcmp(seqinfo_local->seqname, RelationGetRelationName(*sequence_rel)))
237 akapila@postgresql.o 345 :UNC 0 : result = COPYSEQ_MISMATCH;
346 : :
237 akapila@postgresql.o 347 :GNC 20 : ReleaseSysCache(tup);
348 : 20 : return result;
349 : : }
350 : :
351 : : /*
352 : : * Apply remote sequence state to local sequence and mark it as
353 : : * synchronized (READY).
354 : : */
355 : : static CopySeqResult
356 : 17 : copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner)
357 : : {
358 : : UserContext ucxt;
359 : : AclResult aclresult;
360 : 17 : bool run_as_owner = MySubscription->runasowner;
361 : 17 : Oid seqoid = seqinfo->localrelid;
362 : :
363 : : /*
364 : : * If the user did not opt to run as the owner of the subscription
365 : : * ('run_as_owner'), then copy the sequence as the owner of the sequence.
366 : : */
367 [ + - ]: 17 : if (!run_as_owner)
368 : 17 : SwitchToUntrustedUser(seqowner, &ucxt);
369 : :
370 : 17 : aclresult = pg_class_aclcheck(seqoid, GetUserId(), ACL_UPDATE);
371 : :
372 [ - + ]: 17 : if (aclresult != ACLCHECK_OK)
373 : : {
237 akapila@postgresql.o 374 [ # # ]:UNC 0 : if (!run_as_owner)
375 : 0 : RestoreUserContext(&ucxt);
376 : :
10 fujii@postgresql.org 377 : 0 : return COPYSEQ_SUBSCRIBER_INSUFFICIENT_PERM;
378 : : }
379 : :
380 : : /*
381 : : * The log counter (log_cnt) tracks how many sequence values are still
382 : : * unused locally. It is only relevant to the local node and managed
383 : : * internally by nextval() when allocating new ranges. Since log_cnt does
384 : : * not affect the visible sequence state (like last_value or is_called)
385 : : * and is only used for local caching, it need not be copied to the
386 : : * subscriber during synchronization.
387 : : */
237 akapila@postgresql.o 388 :GNC 17 : SetSequence(seqoid, seqinfo->last_value, seqinfo->is_called);
389 : :
390 [ + - ]: 17 : if (!run_as_owner)
391 : 17 : RestoreUserContext(&ucxt);
392 : :
393 : : /*
394 : : * Record the remote sequence's LSN in pg_subscription_rel and mark the
395 : : * sequence as READY.
396 : : */
397 : 17 : UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY,
398 : : seqinfo->page_lsn, false);
399 : :
400 : 17 : return COPYSEQ_SUCCESS;
401 : : }
402 : :
403 : : /*
404 : : * Copy existing data of sequences from the publisher.
405 : : */
406 : : static void
407 : 11 : copy_sequences(WalReceiverConn *conn)
408 : : {
409 : 11 : int cur_batch_base_index = 0;
410 : 11 : int n_seqinfos = list_length(seqinfos);
411 : 11 : List *mismatched_seqs_idx = NIL;
412 : 11 : List *missing_seqs_idx = NIL;
10 fujii@postgresql.org 413 : 11 : List *sub_insuffperm_seqs_idx = NIL;
414 : 11 : List *pub_insuffperm_seqs_idx = NIL;
415 : : StringInfoData seqstr;
416 : : StringInfoData cmd;
417 : : MemoryContext oldctx;
418 : :
78 drowley@postgresql.o 419 : 11 : initStringInfo(&seqstr);
420 : 11 : initStringInfo(&cmd);
421 : :
422 : : #define MAX_SEQUENCES_SYNC_PER_BATCH 100
423 : :
237 akapila@postgresql.o 424 [ - + ]: 11 : elog(DEBUG1,
425 : : "logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d",
426 : : MySubscription->name, n_seqinfos);
427 : :
428 [ + + ]: 22 : while (cur_batch_base_index < n_seqinfos)
429 : : {
10 fujii@postgresql.org 430 : 11 : Oid seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, BOOLOID, INT8OID,
431 : : BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID};
237 akapila@postgresql.o 432 : 11 : int batch_size = 0;
433 : 11 : int batch_succeeded_count = 0;
434 : 11 : int batch_mismatched_count = 0;
435 : 11 : int batch_skipped_count = 0;
10 fujii@postgresql.org 436 : 11 : int batch_sub_insuffperm_count = 0;
437 : 11 : int batch_pub_insuffperm_count = 0;
438 : : int batch_missing_count;
439 : :
440 : : WalRcvExecResult *res;
441 : : TupleTableSlot *slot;
442 : :
237 akapila@postgresql.o 443 : 11 : StartTransactionCommand();
444 : :
445 [ + + ]: 34 : for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++)
446 : : {
447 : : char *nspname_literal;
448 : : char *seqname_literal;
449 : :
450 : : LogicalRepSequenceInfo *seqinfo =
451 : 23 : (LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
452 : :
78 drowley@postgresql.o 453 [ + + ]: 23 : if (seqstr.len > 0)
454 : 12 : appendStringInfoString(&seqstr, ", ");
455 : :
236 akapila@postgresql.o 456 : 23 : nspname_literal = quote_literal_cstr(seqinfo->nspname);
457 : 23 : seqname_literal = quote_literal_cstr(seqinfo->seqname);
458 : :
78 drowley@postgresql.o 459 : 23 : appendStringInfo(&seqstr, "(%s, %s, %d)",
460 : : nspname_literal, seqname_literal, idx);
461 : :
237 akapila@postgresql.o 462 [ - + ]: 23 : if (++batch_size == MAX_SEQUENCES_SYNC_PER_BATCH)
237 akapila@postgresql.o 463 :UNC 0 : break;
464 : : }
465 : :
466 : : /*
467 : : * We deliberately avoid acquiring a local lock on the sequence before
468 : : * querying the publisher to prevent potential distributed deadlocks
469 : : * in bi-directional replication setups.
470 : : *
471 : : * Example scenario:
472 : : *
473 : : * - On each node, a background worker acquires a lock on a sequence
474 : : * as part of a sync operation.
475 : : *
476 : : * - Concurrently, a user transaction attempts to alter the same
477 : : * sequence, waiting on the background worker's lock.
478 : : *
479 : : * - Meanwhile, a query from the other node tries to access metadata
480 : : * that depends on the completion of the alter operation.
481 : : *
482 : : * - This creates a circular wait across nodes:
483 : : *
484 : : * Node-1: Query -> waits on Alter -> waits on Sync Worker
485 : : *
486 : : * Node-2: Query -> waits on Alter -> waits on Sync Worker
487 : : *
488 : : * Since each node only sees part of the wait graph, the deadlock may
489 : : * go undetected, leading to indefinite blocking.
490 : : *
491 : : * Note: Each entry in VALUES includes an index 'seqidx' that
492 : : * represents the sequence's position in the local 'seqinfos' list.
493 : : * This index is propagated to the query results and later used to
494 : : * directly map the fetched publisher sequence rows back to their
495 : : * corresponding local entries without relying on result order or name
496 : : * matching.
497 : : */
78 drowley@postgresql.o 498 :GNC 11 : appendStringInfo(&cmd,
499 : : "SELECT s.seqidx, has_sequence_privilege(c.oid, 'SELECT'),\n"
500 : : " ps.*, seq.seqtypid,\n"
501 : : " seq.seqstart, seq.seqincrement, seq.seqmin,\n"
502 : : " seq.seqmax, seq.seqcycle\n"
503 : : "FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n"
504 : : "JOIN pg_namespace n ON n.nspname = s.schname\n"
505 : : "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n"
506 : : "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n"
507 : : "JOIN LATERAL pg_get_sequence_data(seq.seqrelid) AS ps ON true\n",
508 : : seqstr.data);
509 : :
510 : 11 : res = walrcv_exec(conn, cmd.data, lengthof(seqRow), seqRow);
237 akapila@postgresql.o 511 [ - + ]: 11 : if (res->status != WALRCV_OK_TUPLES)
237 akapila@postgresql.o 512 [ # # ]:UNC 0 : ereport(ERROR,
513 : : errcode(ERRCODE_CONNECTION_FAILURE),
514 : : errmsg("could not fetch sequence information from the publisher: %s",
515 : : res->err));
516 : :
237 akapila@postgresql.o 517 :GNC 11 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
518 [ + + ]: 32 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
519 : : {
520 : : CopySeqResult sync_status;
521 : : LogicalRepSequenceInfo *seqinfo;
61 522 : 21 : Relation sequence_rel = NULL;
523 : : int seqidx;
524 : :
237 525 [ - + ]: 21 : CHECK_FOR_INTERRUPTS();
526 : :
527 [ - + ]: 21 : if (ConfigReloadPending)
528 : : {
237 akapila@postgresql.o 529 :UNC 0 : ConfigReloadPending = false;
530 : 0 : ProcessConfigFile(PGC_SIGHUP);
531 : : }
532 : :
237 akapila@postgresql.o 533 :GNC 21 : sync_status = get_and_validate_seq_info(slot, &sequence_rel,
534 : : &seqinfo, &seqidx);
535 [ + + ]: 21 : if (sync_status == COPYSEQ_SUCCESS)
536 : 17 : sync_status = copy_sequence(seqinfo,
537 : 17 : sequence_rel->rd_rel->relowner);
538 : :
539 [ + + - + : 21 : switch (sync_status)
- - ]
540 : : {
541 : 17 : case COPYSEQ_SUCCESS:
542 [ - + ]: 17 : elog(DEBUG1,
543 : : "logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished",
544 : : MySubscription->name, seqinfo->nspname,
545 : : seqinfo->seqname);
546 : 17 : batch_succeeded_count++;
547 : 17 : break;
548 : 3 : case COPYSEQ_MISMATCH:
549 : :
550 : : /*
551 : : * Remember mismatched sequences in a long-lived memory
552 : : * context since these will be used after the transaction
553 : : * is committed.
554 : : */
555 : 3 : oldctx = MemoryContextSwitchTo(ApplyContext);
556 : 3 : mismatched_seqs_idx = lappend_int(mismatched_seqs_idx,
557 : : seqidx);
558 : 3 : MemoryContextSwitchTo(oldctx);
559 : 3 : batch_mismatched_count++;
560 : 3 : break;
10 fujii@postgresql.org 561 :UNC 0 : case COPYSEQ_SUBSCRIBER_INSUFFICIENT_PERM:
562 : :
563 : : /*
564 : : * Remember sequences with insufficient privileges in a
565 : : * long-lived memory context since these will be used
566 : : * after the transaction is committed.
567 : : */
237 akapila@postgresql.o 568 : 0 : oldctx = MemoryContextSwitchTo(ApplyContext);
10 fujii@postgresql.org 569 : 0 : sub_insuffperm_seqs_idx = lappend_int(sub_insuffperm_seqs_idx,
570 : : seqidx);
571 : 0 : MemoryContextSwitchTo(oldctx);
572 : 0 : batch_sub_insuffperm_count++;
573 : 0 : break;
10 fujii@postgresql.org 574 :GNC 1 : case COPYSEQ_PUBLISHER_INSUFFICIENT_PERM:
575 : :
576 : : /*
577 : : * Remember sequences for which the publisher lacks the
578 : : * privileges required by pg_get_sequence_data().
579 : : */
580 : 1 : oldctx = MemoryContextSwitchTo(ApplyContext);
581 : 1 : pub_insuffperm_seqs_idx = lappend_int(pub_insuffperm_seqs_idx,
582 : : seqidx);
237 akapila@postgresql.o 583 : 1 : MemoryContextSwitchTo(oldctx);
10 fujii@postgresql.org 584 : 1 : batch_pub_insuffperm_count++;
237 akapila@postgresql.o 585 : 1 : break;
237 akapila@postgresql.o 586 :UNC 0 : case COPYSEQ_SKIPPED:
587 : :
588 : : /*
589 : : * Concurrent removal of a sequence on the subscriber is
590 : : * treated as success, since the only viable action is to
591 : : * skip the corresponding sequence data. Missing sequences
592 : : * on the publisher are treated as ERROR.
593 : : */
161 594 [ # # ]: 0 : if (seqinfo->found_on_pub)
595 : : {
596 [ # # ]: 0 : ereport(LOG,
597 : : errmsg("skip synchronization of sequence \"%s.%s\" because it has been dropped concurrently",
598 : : seqinfo->nspname,
599 : : seqinfo->seqname));
600 : 0 : batch_skipped_count++;
601 : : }
237 602 : 0 : break;
603 : : }
604 : :
237 akapila@postgresql.o 605 [ + + ]:GNC 21 : if (sequence_rel)
606 : 20 : table_close(sequence_rel, NoLock);
607 : : }
608 : :
609 : 11 : ExecDropSingleTupleTableSlot(slot);
610 : 11 : walrcv_clear_result(res);
78 drowley@postgresql.o 611 : 11 : resetStringInfo(&seqstr);
612 : 11 : resetStringInfo(&cmd);
613 : :
237 akapila@postgresql.o 614 : 11 : batch_missing_count = batch_size - (batch_succeeded_count +
615 : 11 : batch_mismatched_count +
10 fujii@postgresql.org 616 : 11 : batch_sub_insuffperm_count +
617 : 11 : batch_pub_insuffperm_count +
618 : : batch_skipped_count);
619 : :
237 akapila@postgresql.o 620 [ - + ]: 11 : elog(DEBUG1,
621 : : "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d subscriber insufficient permission, %d publisher insufficient permission, %d missing from publisher, %d skipped",
622 : : MySubscription->name,
623 : : (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1,
624 : : batch_size, batch_succeeded_count, batch_mismatched_count,
625 : : batch_sub_insuffperm_count, batch_pub_insuffperm_count, batch_missing_count, batch_skipped_count);
626 : :
627 : : /* Commit this batch, and prepare for next batch */
628 : 11 : CommitTransactionCommand();
629 : :
630 [ + + ]: 11 : if (batch_missing_count)
631 : : {
632 [ + + ]: 8 : for (int idx = cur_batch_base_index; idx < cur_batch_base_index + batch_size; idx++)
633 : : {
634 : : LogicalRepSequenceInfo *seqinfo =
635 : 6 : (LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
636 : :
637 : : /* If the sequence was not found on publisher, record it */
638 [ + + ]: 6 : if (!seqinfo->found_on_pub)
639 : 2 : missing_seqs_idx = lappend_int(missing_seqs_idx, idx);
640 : : }
641 : : }
642 : :
643 : : /*
644 : : * cur_batch_base_index is not incremented sequentially because some
645 : : * sequences may be missing, and the number of fetched rows may not
646 : : * match the batch size.
647 : : */
648 : 11 : cur_batch_base_index += batch_size;
649 : : }
650 : :
651 : : /* Report mismatches, permission issues, or missing sequences */
10 fujii@postgresql.org 652 : 11 : report_sequence_errors(mismatched_seqs_idx, sub_insuffperm_seqs_idx,
653 : : pub_insuffperm_seqs_idx, missing_seqs_idx);
237 akapila@postgresql.o 654 : 5 : }
655 : :
656 : : /*
657 : : * Identifies sequences that require synchronization and initiates the
658 : : * synchronization process.
659 : : */
660 : : static void
661 : 11 : LogicalRepSyncSequences(void)
662 : : {
663 : : char *err;
664 : : bool must_use_password;
665 : : Relation rel;
666 : : HeapTuple tup;
667 : : ScanKeyData skey[2];
668 : : SysScanDesc scan;
669 : 11 : Oid subid = MyLogicalRepWorker->subid;
670 : : StringInfoData app_name;
671 : :
672 : 11 : StartTransactionCommand();
673 : :
674 : 11 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
675 : :
676 : 11 : ScanKeyInit(&skey[0],
677 : : Anum_pg_subscription_rel_srsubid,
678 : : BTEqualStrategyNumber, F_OIDEQ,
679 : : ObjectIdGetDatum(subid));
680 : :
681 : 11 : ScanKeyInit(&skey[1],
682 : : Anum_pg_subscription_rel_srsubstate,
683 : : BTEqualStrategyNumber, F_CHAREQ,
684 : : CharGetDatum(SUBREL_STATE_INIT));
685 : :
686 : 11 : scan = systable_beginscan(rel, InvalidOid, false,
687 : : NULL, 2, skey);
688 [ + + ]: 35 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
689 : : {
690 : : Form_pg_subscription_rel subrel;
691 : : LogicalRepSequenceInfo *seq;
692 : : Relation sequence_rel;
693 : : MemoryContext oldctx;
694 : :
695 [ - + ]: 24 : CHECK_FOR_INTERRUPTS();
696 : :
697 : 24 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
698 : :
699 : 24 : sequence_rel = try_table_open(subrel->srrelid, RowExclusiveLock);
700 : :
701 : : /* Skip if sequence was dropped concurrently */
702 [ - + ]: 24 : if (!sequence_rel)
237 akapila@postgresql.o 703 :UNC 0 : continue;
704 : :
705 : : /* Skip if the relation is not a sequence */
237 akapila@postgresql.o 706 [ + + ]:GNC 24 : if (sequence_rel->rd_rel->relkind != RELKIND_SEQUENCE)
707 : : {
708 : 1 : table_close(sequence_rel, NoLock);
709 : 1 : continue;
710 : : }
711 : :
712 : : /*
713 : : * Worker needs to process sequences across transaction boundary, so
714 : : * allocate them under long-lived context.
715 : : */
716 : 23 : oldctx = MemoryContextSwitchTo(ApplyContext);
717 : :
718 : 23 : seq = palloc0_object(LogicalRepSequenceInfo);
719 : 23 : seq->localrelid = subrel->srrelid;
720 : 23 : seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel));
721 : 23 : seq->seqname = pstrdup(RelationGetRelationName(sequence_rel));
722 : 23 : seqinfos = lappend(seqinfos, seq);
723 : :
724 : 23 : MemoryContextSwitchTo(oldctx);
725 : :
726 : 23 : table_close(sequence_rel, NoLock);
727 : : }
728 : :
729 : : /* Cleanup */
730 : 11 : systable_endscan(scan);
731 : 11 : table_close(rel, AccessShareLock);
732 : :
733 : 11 : CommitTransactionCommand();
734 : :
735 : : /*
736 : : * Exit early if no catalog entries found, likely due to concurrent drops.
737 : : */
738 [ - + ]: 11 : if (!seqinfos)
237 akapila@postgresql.o 739 :UNC 0 : return;
740 : :
741 : : /* Is the use of a password mandatory? */
237 akapila@postgresql.o 742 [ + - ]:GNC 22 : must_use_password = MySubscription->passwordrequired &&
743 [ - + ]: 11 : !MySubscription->ownersuperuser;
744 : :
745 : 11 : initStringInfo(&app_name);
746 : 11 : appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT,
747 : 11 : MySubscription->oid, GetSystemIdentifier());
748 : :
749 : : /*
750 : : * Establish the connection to the publisher for sequence synchronization.
751 : : */
752 : 11 : LogRepWorkerWalRcvConn =
753 : 11 : walrcv_connect(MySubscription->conninfo, true, true,
754 : : must_use_password,
755 : : app_name.data, &err);
756 [ - + ]: 11 : if (LogRepWorkerWalRcvConn == NULL)
237 akapila@postgresql.o 757 [ # # ]:UNC 0 : ereport(ERROR,
758 : : errcode(ERRCODE_CONNECTION_FAILURE),
759 : : errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s",
760 : : MySubscription->name, err));
761 : :
237 akapila@postgresql.o 762 :GNC 11 : pfree(app_name.data);
763 : :
764 : 11 : copy_sequences(LogRepWorkerWalRcvConn);
765 : : }
766 : :
767 : : /*
768 : : * Execute the initial sync with error handling. Disable the subscription,
769 : : * if required.
770 : : *
771 : : * Note that we don't handle FATAL errors which are probably because of system
772 : : * resource error and are not repeatable.
773 : : */
774 : : static void
209 nathan@postgresql.or 775 : 11 : start_sequence_sync(void)
776 : : {
237 akapila@postgresql.o 777 [ - + ]: 11 : Assert(am_sequencesync_worker());
778 : :
779 [ + + ]: 11 : PG_TRY();
780 : : {
781 : : /* Call initial sync. */
782 : 11 : LogicalRepSyncSequences();
783 : : }
784 : 6 : PG_CATCH();
785 : : {
786 [ - + ]: 6 : if (MySubscription->disableonerr)
237 akapila@postgresql.o 787 :UNC 0 : DisableSubscriptionAndExit();
788 : : else
789 : : {
790 : : /*
791 : : * Report the worker failed during sequence synchronization. Abort
792 : : * the current transaction so that the stats message is sent in an
793 : : * idle state.
794 : : */
237 akapila@postgresql.o 795 :GNC 6 : AbortOutOfAnyTransaction();
130 796 : 6 : pgstat_report_subscription_error(MySubscription->oid);
797 : :
237 798 : 6 : PG_RE_THROW();
799 : : }
800 : : }
801 [ - + ]: 5 : PG_END_TRY();
802 : 5 : }
803 : :
804 : : /* Logical Replication sequencesync worker entry point */
805 : : void
806 : 11 : SequenceSyncWorkerMain(Datum main_arg)
807 : : {
808 : 11 : int worker_slot = DatumGetInt32(main_arg);
809 : :
810 : 11 : SetupApplyOrSyncWorker(worker_slot);
811 : :
812 : 11 : start_sequence_sync();
813 : :
814 : 5 : FinishSyncWorker();
815 : : }
|