LCOV - code coverage report
Current view: top level - src/backend/replication/logical - sequencesync.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 199 226 88.1 %
Date: 2026-01-18 04:17:21 Functions: 9 9 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * sequencesync.c
       3             :  *    PostgreSQL logical replication: sequence synchronization
       4             :  *
       5             :  * Copyright (c) 2025-2026, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/sequencesync.c
       9             :  *
      10             :  * NOTES
      11             :  *    This file contains code for sequence synchronization for
      12             :  *    logical replication.
      13             :  *
      14             :  * Sequences requiring synchronization are tracked in the pg_subscription_rel
      15             :  * catalog.
      16             :  *
      17             :  * Sequences to be synchronized will be added with state INIT when either of
      18             :  * the following commands is executed:
      19             :  * CREATE SUBSCRIPTION
      20             :  * ALTER SUBSCRIPTION ... REFRESH PUBLICATION
      21             :  *
      22             :  * Executing the following command resets all sequences in the subscription to
      23             :  * state INIT, triggering re-synchronization:
      24             :  * ALTER SUBSCRIPTION ... REFRESH SEQUENCES
      25             :  *
      26             :  * The apply worker periodically scans pg_subscription_rel for sequences in
      27             :  * INIT state. When such sequences are found, it spawns a sequencesync worker
      28             :  * to handle synchronization.
      29             :  *
      30             :  * A single sequencesync worker is responsible for synchronizing all sequences.
      31             :  * It begins by retrieving the list of sequences that are flagged for
      32             :  * synchronization, i.e., those in the INIT state. These sequences are then
      33             :  * processed in batches, allowing multiple entries to be synchronized within a
      34             :  * single transaction. The worker fetches the current sequence values and page
      35             :  * LSNs from the remote publisher, updates the corresponding sequences on the
      36             :  * local subscriber, and finally marks each sequence as READY upon successful
      37             :  * synchronization.
      38             :  *
      39             :  * Sequence state transitions follow this pattern:
      40             :  *   INIT -> READY
      41             :  *
      42             :  * To avoid creating too many transactions, up to MAX_SEQUENCES_SYNC_PER_BATCH
      43             :  * sequences are synchronized per transaction. The locks on the sequence
      44             :  * relation will be periodically released at each transaction commit.
      45             :  *
      46             :  * XXX: The launcher process was not chosen to manage launching the
      47             :  * sequencesync worker because it has no database connection, and so cannot
      48             :  * read pg_subscription_rel to find the sequences that need synchronizing.
      49             :  *-------------------------------------------------------------------------
      50             :  */
      51             : 
      52             : #include "postgres.h"
      53             : 
      54             : #include "access/genam.h"
      55             : #include "access/table.h"
      56             : #include "catalog/pg_sequence.h"
      57             : #include "catalog/pg_subscription_rel.h"
      58             : #include "commands/sequence.h"
      59             : #include "pgstat.h"
      60             : #include "postmaster/interrupt.h"
      61             : #include "replication/logicalworker.h"
      62             : #include "replication/worker_internal.h"
      63             : #include "utils/acl.h"
      64             : #include "utils/builtins.h"
      65             : #include "utils/fmgroids.h"
      66             : #include "utils/guc.h"
      67             : #include "utils/inval.h"
      68             : #include "utils/lsyscache.h"
      69             : #include "utils/memutils.h"
      70             : #include "utils/pg_lsn.h"
      71             : #include "utils/syscache.h"
      72             : #include "utils/usercontext.h"
      73             : 
      74             : #define REMOTE_SEQ_COL_COUNT 10
      75             : 
      76             : typedef enum CopySeqResult
      77             : {
      78             :     COPYSEQ_SUCCESS,
      79             :     COPYSEQ_MISMATCH,
      80             :     COPYSEQ_INSUFFICIENT_PERM,
      81             :     COPYSEQ_SKIPPED
      82             : } CopySeqResult;
      83             : 
      84             : static List *seqinfos = NIL;
      85             : 
      86             : /*
      87             :  * Apply worker determines if sequence synchronization is needed.
      88             :  *
      89             :  * Start a sequencesync worker if one is not already running. The active
      90             :  * sequencesync worker will handle all pending sequence synchronization. If any
      91             :  * sequences remain unsynchronized after it exits, a new worker can be started
      92             :  * in the next iteration.
      93             :  */
      94             : void
      95       21926 : ProcessSequencesForSync(void)
      96             : {
      97             :     LogicalRepWorker *sequencesync_worker;
      98             :     int         nsyncworkers;
      99             :     bool        has_pending_sequences;
     100             :     bool        started_tx;
     101             : 
     102       21926 :     FetchRelationStates(NULL, &has_pending_sequences, &started_tx);
     103             : 
     104       21926 :     if (started_tx)
     105             :     {
     106         352 :         CommitTransactionCommand();
     107         352 :         pgstat_report_stat(true);
     108             :     }
     109             : 
     110       21926 :     if (!has_pending_sequences)
     111       21896 :         return;
     112             : 
     113          52 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     114             : 
     115             :     /* Check whether a sequencesync worker is already running. */
     116          52 :     sequencesync_worker = logicalrep_worker_find(WORKERTYPE_SEQUENCESYNC,
     117          52 :                                                  MyLogicalRepWorker->subid,
     118             :                                                  InvalidOid, true);
     119          52 :     if (sequencesync_worker)
     120             :     {
     121          22 :         LWLockRelease(LogicalRepWorkerLock);
     122          22 :         return;
     123             :     }
     124             : 
     125             :     /*
     126             :      * Count running sync workers for this subscription, while we have the
     127             :      * lock.
     128             :      */
     129          30 :     nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
     130          30 :     LWLockRelease(LogicalRepWorkerLock);
     131             : 
     132             :     /*
     133             :      * It is okay to read/update last_seqsync_start_time here in apply worker
     134             :      * as we have already ensured that sync worker doesn't exist.
     135             :      */
     136          30 :     launch_sync_worker(WORKERTYPE_SEQUENCESYNC, nsyncworkers, InvalidOid,
     137          30 :                        &MyLogicalRepWorker->last_seqsync_start_time);
     138             : }
     139             : 
     140             : /*
     141             :  * get_sequences_string
     142             :  *
     143             :  * Build a comma-separated string of schema-qualified sequence names
     144             :  * for the given list of sequence indexes.
     145             :  */
     146             : static void
     147           8 : get_sequences_string(List *seqindexes, StringInfo buf)
     148             : {
     149           8 :     resetStringInfo(buf);
     150          24 :     foreach_int(seqidx, seqindexes)
     151             :     {
     152             :         LogicalRepSequenceInfo *seqinfo =
     153           8 :             (LogicalRepSequenceInfo *) list_nth(seqinfos, seqidx);
     154             : 
     155           8 :         if (buf->len > 0)
     156           0 :             appendStringInfoString(buf, ", ");
     157             : 
     158           8 :         appendStringInfo(buf, "\"%s.%s\"", seqinfo->nspname, seqinfo->seqname);
     159             :     }
     160           8 : }
     161             : 
     162             : /*
     163             :  * report_sequence_errors
     164             :  *
     165             :  * Report discrepancies found during sequence synchronization between
     166             :  * the publisher and subscriber. Emits warnings for:
     167             :  * a) mismatched definitions or concurrent rename
     168             :  * b) insufficient privileges
     169             :  * c) missing sequences on the publisher
     170             :  * Then raises an ERROR to indicate synchronization failure.
     171             :  */
     172             : static void
     173          18 : report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
     174             :                        List *missing_seqs_idx)
     175             : {
     176             :     StringInfo  seqstr;
     177             : 
     178             :     /* Quick exit if there are no errors to report */
     179          18 :     if (!mismatched_seqs_idx && !insuffperm_seqs_idx && !missing_seqs_idx)
     180          10 :         return;
     181             : 
     182           8 :     seqstr = makeStringInfo();
     183             : 
     184           8 :     if (mismatched_seqs_idx)
     185             :     {
     186           6 :         get_sequences_string(mismatched_seqs_idx, seqstr);
     187           6 :         ereport(WARNING,
     188             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     189             :                 errmsg_plural("mismatched or renamed sequence on subscriber (%s)",
     190             :                               "mismatched or renamed sequences on subscriber (%s)",
     191             :                               list_length(mismatched_seqs_idx),
     192             :                               seqstr->data));
     193             :     }
     194             : 
     195           8 :     if (insuffperm_seqs_idx)
     196             :     {
     197           0 :         get_sequences_string(insuffperm_seqs_idx, seqstr);
     198           0 :         ereport(WARNING,
     199             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     200             :                 errmsg_plural("insufficient privileges on sequence (%s)",
     201             :                               "insufficient privileges on sequences (%s)",
     202             :                               list_length(insuffperm_seqs_idx),
     203             :                               seqstr->data));
     204             :     }
     205             : 
     206           8 :     if (missing_seqs_idx)
     207             :     {
     208           2 :         get_sequences_string(missing_seqs_idx, seqstr);
     209           2 :         ereport(WARNING,
     210             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     211             :                 errmsg_plural("missing sequence on publisher (%s)",
     212             :                               "missing sequences on publisher (%s)",
     213             :                               list_length(missing_seqs_idx),
     214             :                               seqstr->data));
     215             :     }
     216             : 
     217           8 :     ereport(ERROR,
     218             :             errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     219             :             errmsg("logical replication sequence synchronization failed for subscription \"%s\"",
     220             :                    MySubscription->name));
     221             : }
     222             : 
     223             : /*
     224             :  * get_and_validate_seq_info
     225             :  *
     226             :  * Extracts remote sequence information from the tuple slot received from the
     227             :  * publisher, and validates it against the corresponding local sequence
     228             :  * definition.
     229             :  */
     230             : static CopySeqResult
     231          24 : get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel,
     232             :                           LogicalRepSequenceInfo **seqinfo, int *seqidx)
     233             : {
     234             :     bool        isnull;
     235          24 :     int         col = 0;
     236             :     Oid         remote_typid;
     237             :     int64       remote_start;
     238             :     int64       remote_increment;
     239             :     int64       remote_min;
     240             :     int64       remote_max;
     241             :     bool        remote_cycle;
     242          24 :     CopySeqResult result = COPYSEQ_SUCCESS;
     243             :     HeapTuple   tup;
     244             :     Form_pg_sequence local_seq;
     245             :     LogicalRepSequenceInfo *seqinfo_local;
     246             : 
     247          24 :     *seqidx = DatumGetInt32(slot_getattr(slot, ++col, &isnull));
     248             :     Assert(!isnull);
     249             : 
     250             :     /* Identify the corresponding local sequence for the given index. */
     251          24 :     *seqinfo = seqinfo_local =
     252          24 :         (LogicalRepSequenceInfo *) list_nth(seqinfos, *seqidx);
     253             : 
     254          24 :     seqinfo_local->last_value = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
     255             :     Assert(!isnull);
     256             : 
     257          24 :     seqinfo_local->is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull));
     258             :     Assert(!isnull);
     259             : 
     260          24 :     seqinfo_local->page_lsn = DatumGetLSN(slot_getattr(slot, ++col, &isnull));
     261             :     Assert(!isnull);
     262             : 
     263          24 :     remote_typid = DatumGetObjectId(slot_getattr(slot, ++col, &isnull));
     264             :     Assert(!isnull);
     265             : 
     266          24 :     remote_start = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
     267             :     Assert(!isnull);
     268             : 
     269          24 :     remote_increment = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
     270             :     Assert(!isnull);
     271             : 
     272          24 :     remote_min = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
     273             :     Assert(!isnull);
     274             : 
     275          24 :     remote_max = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
     276             :     Assert(!isnull);
     277             : 
     278          24 :     remote_cycle = DatumGetBool(slot_getattr(slot, ++col, &isnull));
     279             :     Assert(!isnull);
     280             : 
     281             :     /* Sanity check */
     282             :     Assert(col == REMOTE_SEQ_COL_COUNT);
     283             : 
     284          24 :     seqinfo_local->found_on_pub = true;
     285             : 
     286          24 :     *sequence_rel = try_table_open(seqinfo_local->localrelid, RowExclusiveLock);
     287             : 
     288             :     /* Sequence was concurrently dropped? */
     289          24 :     if (!*sequence_rel)
     290           0 :         return COPYSEQ_SKIPPED;
     291             : 
     292          24 :     tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seqinfo_local->localrelid));
     293             : 
     294             :     /* Sequence was concurrently dropped? */
     295          24 :     if (!HeapTupleIsValid(tup))
     296           0 :         elog(ERROR, "cache lookup failed for sequence %u",
     297             :              seqinfo_local->localrelid);
     298             : 
     299          24 :     local_seq = (Form_pg_sequence) GETSTRUCT(tup);
     300             : 
     301             :     /* Sequence parameters for remote/local are the same? */
     302          24 :     if (local_seq->seqtypid != remote_typid ||
     303          24 :         local_seq->seqstart != remote_start ||
     304          22 :         local_seq->seqincrement != remote_increment ||
     305          18 :         local_seq->seqmin != remote_min ||
     306          18 :         local_seq->seqmax != remote_max ||
     307          18 :         local_seq->seqcycle != remote_cycle)
     308           6 :         result = COPYSEQ_MISMATCH;
     309             : 
     310             :     /* Sequence was concurrently renamed? */
     311          24 :     if (strcmp(seqinfo_local->nspname,
     312          24 :                get_namespace_name(RelationGetNamespace(*sequence_rel))) ||
     313          24 :         strcmp(seqinfo_local->seqname, RelationGetRelationName(*sequence_rel)))
     314           0 :         result = COPYSEQ_MISMATCH;
     315             : 
     316          24 :     ReleaseSysCache(tup);
     317          24 :     return result;
     318             : }
     319             : 
     320             : /*
     321             :  * Apply remote sequence state to local sequence and mark it as
     322             :  * synchronized (READY).
     323             :  */
     324             : static CopySeqResult
     325          18 : copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner)
     326             : {
     327             :     UserContext ucxt;
     328             :     AclResult   aclresult;
     329          18 :     bool        run_as_owner = MySubscription->runasowner;
     330          18 :     Oid         seqoid = seqinfo->localrelid;
     331             : 
     332             :     /*
     333             :      * If the user did not opt to run as the owner of the subscription
     334             :      * ('run_as_owner'), then copy the sequence as the owner of the sequence.
     335             :      */
     336          18 :     if (!run_as_owner)
     337          18 :         SwitchToUntrustedUser(seqowner, &ucxt);
     338             : 
     339          18 :     aclresult = pg_class_aclcheck(seqoid, GetUserId(), ACL_UPDATE);
     340             : 
     341          18 :     if (aclresult != ACLCHECK_OK)
     342             :     {
     343           0 :         if (!run_as_owner)
     344           0 :             RestoreUserContext(&ucxt);
     345             : 
     346           0 :         return COPYSEQ_INSUFFICIENT_PERM;
     347             :     }
     348             : 
     349             :     /*
     350             :      * The log counter (log_cnt) tracks how many sequence values are still
     351             :      * unused locally. It is only relevant to the local node and managed
     352             :      * internally by nextval() when allocating new ranges. Since log_cnt does
     353             :      * not affect the visible sequence state (like last_value or is_called)
     354             :      * and is only used for local caching, it need not be copied to the
     355             :      * subscriber during synchronization.
     356             :      */
     357          18 :     SetSequence(seqoid, seqinfo->last_value, seqinfo->is_called);
     358             : 
     359          18 :     if (!run_as_owner)
     360          18 :         RestoreUserContext(&ucxt);
     361             : 
     362             :     /*
     363             :      * Record the remote sequence's LSN in pg_subscription_rel and mark the
     364             :      * sequence as READY.
     365             :      */
     366          18 :     UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY,
     367             :                                seqinfo->page_lsn, false);
     368             : 
     369          18 :     return COPYSEQ_SUCCESS;
     370             : }
     371             : 
     372             : /*
     373             :  * Copy existing data of sequences from the publisher.
     374             :  */
     375             : static void
     376          18 : copy_sequences(WalReceiverConn *conn)
     377             : {
     378          18 :     int         cur_batch_base_index = 0;
     379          18 :     int         n_seqinfos = list_length(seqinfos);
     380          18 :     List       *mismatched_seqs_idx = NIL;
     381          18 :     List       *missing_seqs_idx = NIL;
     382          18 :     List       *insuffperm_seqs_idx = NIL;
     383          18 :     StringInfo  seqstr = makeStringInfo();
     384          18 :     StringInfo  cmd = makeStringInfo();
     385             :     MemoryContext oldctx;
     386             : 
     387             : #define MAX_SEQUENCES_SYNC_PER_BATCH 100
     388             : 
     389          18 :     elog(DEBUG1,
     390             :          "logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d",
     391             :          MySubscription->name, n_seqinfos);
     392             : 
     393          36 :     while (cur_batch_base_index < n_seqinfos)
     394             :     {
     395          18 :         Oid         seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID,
     396             :         BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID};
     397          18 :         int         batch_size = 0;
     398          18 :         int         batch_succeeded_count = 0;
     399          18 :         int         batch_mismatched_count = 0;
     400          18 :         int         batch_skipped_count = 0;
     401          18 :         int         batch_insuffperm_count = 0;
     402             :         int         batch_missing_count;
     403             :         Relation    sequence_rel;
     404             : 
     405             :         WalRcvExecResult *res;
     406             :         TupleTableSlot *slot;
     407             : 
     408          18 :         StartTransactionCommand();
     409             : 
     410          44 :         for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++)
     411             :         {
     412             :             char       *nspname_literal;
     413             :             char       *seqname_literal;
     414             : 
     415             :             LogicalRepSequenceInfo *seqinfo =
     416          26 :                 (LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
     417             : 
     418          26 :             if (seqstr->len > 0)
     419           8 :                 appendStringInfoString(seqstr, ", ");
     420             : 
     421          26 :             nspname_literal = quote_literal_cstr(seqinfo->nspname);
     422          26 :             seqname_literal = quote_literal_cstr(seqinfo->seqname);
     423             : 
     424          26 :             appendStringInfo(seqstr, "(%s, %s, %d)",
     425             :                              nspname_literal, seqname_literal, idx);
     426             : 
     427          26 :             if (++batch_size == MAX_SEQUENCES_SYNC_PER_BATCH)
     428           0 :                 break;
     429             :         }
     430             : 
     431             :         /*
     432             :          * We deliberately avoid acquiring a local lock on the sequence before
     433             :          * querying the publisher to prevent potential distributed deadlocks
     434             :          * in bi-directional replication setups.
     435             :          *
     436             :          * Example scenario:
     437             :          *
     438             :          * - On each node, a background worker acquires a lock on a sequence
     439             :          * as part of a sync operation.
     440             :          *
     441             :          * - Concurrently, a user transaction attempts to alter the same
     442             :          * sequence, waiting on the background worker's lock.
     443             :          *
     444             :          * - Meanwhile, a query from the other node tries to access metadata
     445             :          * that depends on the completion of the alter operation.
     446             :          *
     447             :          * - This creates a circular wait across nodes:
     448             :          *
     449             :          * Node-1: Query -> waits on Alter -> waits on Sync Worker
     450             :          *
     451             :          * Node-2: Query -> waits on Alter -> waits on Sync Worker
     452             :          *
     453             :          * Since each node only sees part of the wait graph, the deadlock may
     454             :          * go undetected, leading to indefinite blocking.
     455             :          *
     456             :          * Note: Each entry in VALUES includes an index 'seqidx' that
     457             :          * represents the sequence's position in the local 'seqinfos' list.
     458             :          * This index is propagated to the query results and later used to
     459             :          * directly map the fetched publisher sequence rows back to their
     460             :          * corresponding local entries without relying on result order or name
     461             :          * matching.
     462             :          */
     463          18 :         appendStringInfo(cmd,
     464             :                          "SELECT s.seqidx, ps.*, seq.seqtypid,\n"
     465             :                          "       seq.seqstart, seq.seqincrement, seq.seqmin,\n"
     466             :                          "       seq.seqmax, seq.seqcycle\n"
     467             :                          "FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n"
     468             :                          "JOIN pg_namespace n ON n.nspname = s.schname\n"
     469             :                          "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n"
     470             :                          "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n"
     471             :                          "JOIN LATERAL pg_get_sequence_data(seq.seqrelid) AS ps ON true\n",
     472             :                          seqstr->data);
     473             : 
     474          18 :         res = walrcv_exec(conn, cmd->data, lengthof(seqRow), seqRow);
     475          18 :         if (res->status != WALRCV_OK_TUPLES)
     476           0 :             ereport(ERROR,
     477             :                     errcode(ERRCODE_CONNECTION_FAILURE),
     478             :                     errmsg("could not fetch sequence information from the publisher: %s",
     479             :                            res->err));
     480             : 
     481          18 :         slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     482          42 :         while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
     483             :         {
     484             :             CopySeqResult sync_status;
     485             :             LogicalRepSequenceInfo *seqinfo;
     486             :             int         seqidx;
     487             : 
     488          24 :             CHECK_FOR_INTERRUPTS();
     489             : 
     490          24 :             if (ConfigReloadPending)
     491             :             {
     492           0 :                 ConfigReloadPending = false;
     493           0 :                 ProcessConfigFile(PGC_SIGHUP);
     494             :             }
     495             : 
     496          24 :             sync_status = get_and_validate_seq_info(slot, &sequence_rel,
     497             :                                                     &seqinfo, &seqidx);
     498          24 :             if (sync_status == COPYSEQ_SUCCESS)
     499          18 :                 sync_status = copy_sequence(seqinfo,
     500          18 :                                             sequence_rel->rd_rel->relowner);
     501             : 
     502          24 :             switch (sync_status)
     503             :             {
     504          18 :                 case COPYSEQ_SUCCESS:
     505          18 :                     elog(DEBUG1,
     506             :                          "logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished",
     507             :                          MySubscription->name, seqinfo->nspname,
     508             :                          seqinfo->seqname);
     509          18 :                     batch_succeeded_count++;
     510          18 :                     break;
     511           6 :                 case COPYSEQ_MISMATCH:
     512             : 
     513             :                     /*
     514             :                      * Remember mismatched sequences in a long-lived memory
     515             :                      * context since these will be used after the transaction
     516             :                      * is committed.
     517             :                      */
     518           6 :                     oldctx = MemoryContextSwitchTo(ApplyContext);
     519           6 :                     mismatched_seqs_idx = lappend_int(mismatched_seqs_idx,
     520             :                                                       seqidx);
     521           6 :                     MemoryContextSwitchTo(oldctx);
     522           6 :                     batch_mismatched_count++;
     523           6 :                     break;
     524           0 :                 case COPYSEQ_INSUFFICIENT_PERM:
     525             : 
     526             :                     /*
     527             :                      * Remember sequences with insufficient privileges in a
     528             :                      * long-lived memory context since these will be used
     529             :                      * after the transaction is committed.
     530             :                      */
     531           0 :                     oldctx = MemoryContextSwitchTo(ApplyContext);
     532           0 :                     insuffperm_seqs_idx = lappend_int(insuffperm_seqs_idx,
     533             :                                                       seqidx);
     534           0 :                     MemoryContextSwitchTo(oldctx);
     535           0 :                     batch_insuffperm_count++;
     536           0 :                     break;
     537           0 :                 case COPYSEQ_SKIPPED:
     538           0 :                     ereport(LOG,
     539             :                             errmsg("skip synchronization of sequence \"%s.%s\" because it has been dropped concurrently",
     540             :                                    seqinfo->nspname,
     541             :                                    seqinfo->seqname));
     542           0 :                     batch_skipped_count++;
     543           0 :                     break;
     544             :             }
     545             : 
     546          24 :             if (sequence_rel)
     547          24 :                 table_close(sequence_rel, NoLock);
     548             :         }
     549             : 
     550          18 :         ExecDropSingleTupleTableSlot(slot);
     551          18 :         walrcv_clear_result(res);
     552          18 :         resetStringInfo(seqstr);
     553          18 :         resetStringInfo(cmd);
     554             : 
     555          18 :         batch_missing_count = batch_size - (batch_succeeded_count +
     556          18 :                                             batch_mismatched_count +
     557          18 :                                             batch_insuffperm_count +
     558             :                                             batch_skipped_count);
     559             : 
     560          18 :         elog(DEBUG1,
     561             :              "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped",
     562             :              MySubscription->name,
     563             :              (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1,
     564             :              batch_size, batch_succeeded_count, batch_mismatched_count,
     565             :              batch_insuffperm_count, batch_missing_count, batch_skipped_count);
     566             : 
     567             :         /* Commit this batch, and prepare for next batch */
     568          18 :         CommitTransactionCommand();
     569             : 
     570          18 :         if (batch_missing_count)
     571             :         {
     572           4 :             for (int idx = cur_batch_base_index; idx < cur_batch_base_index + batch_size; idx++)
     573             :             {
     574             :                 LogicalRepSequenceInfo *seqinfo =
     575           2 :                     (LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
     576             : 
     577             :                 /* If the sequence was not found on publisher, record it */
     578           2 :                 if (!seqinfo->found_on_pub)
     579           2 :                     missing_seqs_idx = lappend_int(missing_seqs_idx, idx);
     580             :             }
     581             :         }
     582             : 
     583             :         /*
     584             :          * cur_batch_base_index is not incremented sequentially because some
     585             :          * sequences may be missing, and the number of fetched rows may not
     586             :          * match the batch size.
     587             :          */
     588          18 :         cur_batch_base_index += batch_size;
     589             :     }
     590             : 
     591             :     /* Report mismatches, permission issues, or missing sequences */
     592          18 :     report_sequence_errors(mismatched_seqs_idx, insuffperm_seqs_idx,
     593             :                            missing_seqs_idx);
     594          10 : }
     595             : 
     596             : /*
     597             :  * Scans pg_subscription_rel for this subscription's entries in INIT state, records each one that is a sequence in seqinfos,
     598             :  * then connects to the publisher and passes the connection to copy_sequences(); returns early if seqinfos is empty.
     599             :  */
     600             : static void
     601          18 : LogicalRepSyncSequences(void)
     602             : {
     603             :     char       *err;
     604             :     bool        must_use_password;
     605             :     Relation    rel;
     606             :     HeapTuple   tup;
     607             :     ScanKeyData skey[2];
     608             :     SysScanDesc scan;
     609          18 :     Oid         subid = MyLogicalRepWorker->subid;
     610             :     StringInfoData app_name;
     611             : 
     612          18 :     StartTransactionCommand();  /* the catalog scan below runs inside its own transaction */
     613             : 
     614          18 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     615             : 
     616          18 :     ScanKeyInit(&skey[0],   /* key 1: srsubid equals this worker's subscription */
     617             :                 Anum_pg_subscription_rel_srsubid,
     618             :                 BTEqualStrategyNumber, F_OIDEQ,
     619             :                 ObjectIdGetDatum(subid));
     620             : 
     621          18 :     ScanKeyInit(&skey[1],   /* key 2: srsubstate equals SUBREL_STATE_INIT */
     622             :                 Anum_pg_subscription_rel_srsubstate,
     623             :                 BTEqualStrategyNumber, F_CHAREQ,
     624             :                 CharGetDatum(SUBREL_STATE_INIT));
     625             : 
     626          18 :     scan = systable_beginscan(rel, InvalidOid, false,
     627             :                               NULL, 2, skey);   /* NOTE(review): InvalidOid/false presumably means no index is used - confirm */
     628          46 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     629             :     {
     630             :         Form_pg_subscription_rel subrel;
     631             :         LogicalRepSequenceInfo *seq;
     632             :         Relation    sequence_rel;
     633             :         MemoryContext oldctx;
     634             : 
     635          28 :         CHECK_FOR_INTERRUPTS();
     636             : 
     637          28 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     638             : 
     639          28 :         sequence_rel = try_table_open(subrel->srrelid, RowExclusiveLock);
     640             : 
     641             :         /* Skip if the relation could not be opened, i.e. it was dropped concurrently */
     642          28 :         if (!sequence_rel)
     643           0 :             continue;
     644             : 
     645             :         /* Skip entries whose relation is not a sequence; only sequences are collected here */
     646          28 :         if (sequence_rel->rd_rel->relkind != RELKIND_SEQUENCE)
     647             :         {
     648           2 :             table_close(sequence_rel, NoLock);
     649           2 :             continue;
     650             :         }
     651             : 
     652             :         /*
     653             :          * The worker processes sequences across transaction boundaries, so
     654             :          * allocate the entry and its strings in the long-lived ApplyContext.
     655             :          */
     656          26 :         oldctx = MemoryContextSwitchTo(ApplyContext);
     657             : 
     658          26 :         seq = palloc0_object(LogicalRepSequenceInfo);
     659          26 :         seq->localrelid = subrel->srrelid;
     660          26 :         seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel));
     661          26 :         seq->seqname = pstrdup(RelationGetRelationName(sequence_rel));
     662          26 :         seqinfos = lappend(seqinfos, seq);
     663             : 
     664          26 :         MemoryContextSwitchTo(oldctx);
     665             : 
     666          26 :         table_close(sequence_rel, NoLock);  /* NOTE(review): NoLock presumably keeps the lock until the transaction ends - confirm */
     667             :     }
     668             : 
     669             :     /* End the catalog scan and close pg_subscription_rel */
     670          18 :     systable_endscan(scan);
     671          18 :     table_close(rel, AccessShareLock);
     672             : 
     673          18 :     CommitTransactionCommand();
     674             : 
     675             :     /*
     676             :      * Exit early, without connecting, if seqinfos is empty; likely due to concurrent drops.
     677             :      */
     678          18 :     if (!seqinfos)
     679           0 :         return;
     680             : 
     681             :     /* A password is mandatory when the subscription requires one and its owner is not a superuser */
     682          36 :     must_use_password = MySubscription->passwordrequired &&
     683          18 :         !MySubscription->ownersuperuser;
     684             : 
     685          18 :     initStringInfo(&app_name);  /* application name: pg_<subscription oid>_sequence_sync_<system identifier> */
     686          18 :     appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT,
     687          18 :                      MySubscription->oid, GetSystemIdentifier());
     688             : 
     689             :     /*
     690             :      * Establish the connection to the publisher; a NULL result is raised as an ERROR carrying the text returned in err.
     691             :      */
     692          18 :     LogRepWorkerWalRcvConn =
     693          18 :         walrcv_connect(MySubscription->conninfo, true, true,
     694             :                        must_use_password,
     695             :                        app_name.data, &err);
     696          18 :     if (LogRepWorkerWalRcvConn == NULL)
     697           0 :         ereport(ERROR,
     698             :                 errcode(ERRCODE_CONNECTION_FAILURE),
     699             :                 errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s",
     700             :                        MySubscription->name, err));
     701             : 
     702          18 :     pfree(app_name.data);   /* the name buffer is not used again after the connection attempt */
     703             : 
     704          18 :     copy_sequences(LogRepWorkerWalRcvConn);
     705             : }
     706             : 
     707             : /*
     708             :  * Run LogicalRepSyncSequences() with error handling. On error, call
     709             :  * DisableSubscriptionAndExit() if the subscription has disableonerr set;
     710             :  * otherwise abort the transaction, report the error to pgstat and re-throw.
     711             :  * Note that FATAL errors are not handled here; they are probably caused by
     712             :  * system resource problems and are not repeatable.
     713             :  */
     714             : static void
     715          18 : start_sequence_sync(void)
     716             : {
     717             :     Assert(am_sequencesync_worker());   /* must only be called from a sequencesync worker */
     718             : 
     719          18 :     PG_TRY();
     720             :     {
     721             :         /* Collect the sequences pending synchronization and copy them. */
     722          18 :         LogicalRepSyncSequences();
     723             :     }
     724           8 :     PG_CATCH();
     725             :     {
     726           8 :         if (MySubscription->disableonerr)
     727           0 :             DisableSubscriptionAndExit();
     728             :         else
     729             :         {
     730             :             /*
     731             :              * Report that the worker failed during sequence synchronization.
     732             :              * Abort the current transaction first so that the stats message
     733             :              * is sent in an idle state, then propagate the original error.
     734             :              */
     735           8 :             AbortOutOfAnyTransaction();
     736           8 :             pgstat_report_subscription_error(MySubscription->oid,
     737             :                                              WORKERTYPE_SEQUENCESYNC);
     738             : 
     739           8 :             PG_RE_THROW();
     740             :         }
     741             :     }
     742          10 :     PG_END_TRY();
     743          10 : }
     744             : 
     745             : /* Logical Replication sequencesync worker entry point; main_arg carries the worker slot number as an int32 Datum */
     746             : void
     747          18 : SequenceSyncWorkerMain(Datum main_arg)
     748             : {
     749          18 :     int         worker_slot = DatumGetInt32(main_arg);
     750             : 
     751          18 :     SetupApplyOrSyncWorker(worker_slot);
     752             : 
     753          18 :     start_sequence_sync();  /* re-throws (or exits) on error, in which case the line below is not reached */
     754             : 
     755          10 :     FinishSyncWorker();
     756             : }

Generated by: LCOV version 1.16