LCOV - code coverage report
Current view: top level - src/backend/replication/logical - sequencesync.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 199 226 88.1 %
Date: 2025-11-16 08:18:22 Functions: 9 9 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * sequencesync.c
       3             :  *    PostgreSQL logical replication: sequence synchronization
       4             :  *
       5             :  * Copyright (c) 2025, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/sequencesync.c
       9             :  *
      10             :  * NOTES
      11             :  *    This file contains code for sequence synchronization for
      12             :  *    logical replication.
      13             :  *
      14             :  * Sequences requiring synchronization are tracked in the pg_subscription_rel
      15             :  * catalog.
      16             :  *
      17             :  * Sequences to be synchronized will be added with state INIT when either of
      18             :  * the following commands is executed:
      19             :  * CREATE SUBSCRIPTION
      20             :  * ALTER SUBSCRIPTION ... REFRESH PUBLICATION
      21             :  *
      22             :  * Executing the following command resets all sequences in the subscription to
      23             :  * state INIT, triggering re-synchronization:
      24             :  * ALTER SUBSCRIPTION ... REFRESH SEQUENCES
      25             :  *
      26             :  * The apply worker periodically scans pg_subscription_rel for sequences in
      27             :  * INIT state. When such sequences are found, it spawns a sequencesync worker
      28             :  * to handle synchronization.
      29             :  *
      30             :  * A single sequencesync worker is responsible for synchronizing all sequences.
      31             :  * It begins by retrieving the list of sequences that are flagged for
      32             :  * synchronization, i.e., those in the INIT state. These sequences are then
      33             :  * processed in batches, allowing multiple entries to be synchronized within a
      34             :  * single transaction. The worker fetches the current sequence values and page
      35             :  * LSNs from the remote publisher, updates the corresponding sequences on the
      36             :  * local subscriber, and finally marks each sequence as READY upon successful
      37             :  * synchronization.
      38             :  *
      39             :  * Sequence state transitions follow this pattern:
      40             :  *   INIT -> READY
      41             :  *
      42             :  * To avoid creating too many transactions, up to MAX_SEQUENCES_SYNC_PER_BATCH
      43             :  * sequences are synchronized per transaction. The locks on the sequence
      44             :  * relation will be periodically released at each transaction commit.
      45             :  *
      46             :  * XXX: The launcher process was not chosen to launch the sequencesync worker
      47             :  * because it has no database connection and so cannot read the
      48             :  * pg_subscription_rel system catalog to find sequences needing synchronization.
      49             :  *-------------------------------------------------------------------------
      50             :  */
      51             : 
      52             : #include "postgres.h"
      53             : 
      54             : #include "access/table.h"
      55             : #include "catalog/pg_sequence.h"
      56             : #include "catalog/pg_subscription_rel.h"
      57             : #include "commands/sequence.h"
      58             : #include "pgstat.h"
      59             : #include "postmaster/interrupt.h"
      60             : #include "replication/logicalworker.h"
      61             : #include "replication/worker_internal.h"
      62             : #include "utils/acl.h"
      63             : #include "utils/builtins.h"
      64             : #include "utils/fmgroids.h"
      65             : #include "utils/guc.h"
      66             : #include "utils/inval.h"
      67             : #include "utils/lsyscache.h"
      68             : #include "utils/memutils.h"
      69             : #include "utils/pg_lsn.h"
      70             : #include "utils/syscache.h"
      71             : #include "utils/usercontext.h"
      72             : 
      73             : #define REMOTE_SEQ_COL_COUNT 10 /* columns in each row fetched from the publisher */
      74             : 
      75             : typedef enum CopySeqResult
      76             : {
      77             :     COPYSEQ_SUCCESS,            /* validation passed / sequence was copied */
      78             :     COPYSEQ_MISMATCH,           /* local definition or name differs from remote */
      79             :     COPYSEQ_INSUFFICIENT_PERM,  /* ACL_UPDATE check on the local sequence failed */
      80             :     COPYSEQ_SKIPPED             /* local sequence could not be opened (dropped) */
      81             : } CopySeqResult;
      82             : 
      83             : static List *seqinfos = NIL;    /* LogicalRepSequenceInfo entries, addressed by list index */
      84             : 
      85             : /*
      86             :  * Called by the apply worker to decide whether a sequencesync worker is needed.
      87             :  *
      88             :  * Start a sequencesync worker if one is not already running. The active
      89             :  * sequencesync worker will handle all pending sequence synchronization. If any
      90             :  * sequences remain unsynchronized after it exits, a new worker can be started
      91             :  * in the next iteration.
      92             :  */
      93             : void
      94       11976 : ProcessSequencesForSync(void)
      95             : {
      96             :     LogicalRepWorker *sequencesync_worker;
      97             :     int         nsyncworkers;
      98             :     bool        has_pending_sequences;
      99             :     bool        started_tx;
     100             : 
     101       11976 :     FetchRelationStates(NULL, &has_pending_sequences, &started_tx);
     102             : 
     103       11976 :     if (started_tx)             /* presumably set when FetchRelationStates() began a transaction */
     104             :     {
     105         356 :         CommitTransactionCommand();
     106         356 :         pgstat_report_stat(true);
     107             :     }
     108             : 
     109       11976 :     if (!has_pending_sequences) /* flag was filled in by FetchRelationStates() above */
     110       11944 :         return;
     111             : 
     112          56 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     113             : 
     114             :     /* Nothing more to do if a sequencesync worker is already running. */
     115          56 :     sequencesync_worker = logicalrep_worker_find(WORKERTYPE_SEQUENCESYNC,
     116          56 :                                                  MyLogicalRepWorker->subid,
     117             :                                                  InvalidOid, true);
     118          56 :     if (sequencesync_worker)
     119             :     {
     120          24 :         LWLockRelease(LogicalRepWorkerLock);
     121          24 :         return;
     122             :     }
     123             : 
     124             :     /*
     125             :      * Count running sync workers for this subscription, while we have the
     126             :      * lock.
     127             :      */
     128          32 :     nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
     129          32 :     LWLockRelease(LogicalRepWorkerLock);
     130             : 
     131             :     /*
     132             :      * It is okay to read/update last_seqsync_start_time here in apply worker
     133             :      * as we have already ensured that sync worker doesn't exist.
     134             :      */
     135          32 :     launch_sync_worker(WORKERTYPE_SEQUENCESYNC, nsyncworkers, InvalidOid,
     136          32 :                        &MyLogicalRepWorker->last_seqsync_start_time);
     137             : }
     138             : 
     139             : /*
     140             :  * get_sequences_string
     141             :  *
     142             :  * Reset 'buf' and fill it with a comma-separated list of double-quoted
     143             :  * "schema.sequence" names for the given list of indexes into 'seqinfos'.
     144             :  */
     145             : static void
     146           8 : get_sequences_string(List *seqindexes, StringInfo buf)
     147             : {
     148           8 :     resetStringInfo(buf);
     149          24 :     foreach_int(seqidx, seqindexes)
     150             :     {
     151             :         LogicalRepSequenceInfo *seqinfo =
     152           8 :             (LogicalRepSequenceInfo *) list_nth(seqinfos, seqidx);
     153             : 
     154           8 :         if (buf->len > 0)       /* separator before every entry but the first */
     155           0 :             appendStringInfoString(buf, ", ");
     156             : 
     157           8 :         appendStringInfo(buf, "\"%s.%s\"", seqinfo->nspname, seqinfo->seqname);
     158             :     }
     159           8 : }
     160             : 
     161             : /*
     162             :  * report_sequence_errors
     163             :  *
     164             :  * Report discrepancies found during sequence synchronization between
     165             :  * the publisher and subscriber. Emits warnings for:
     166             :  * a) mismatched definitions or concurrent rename
     167             :  * b) insufficient privileges
     168             :  * c) sequences missing on the publisher
     169             :  * Then raises an ERROR; returns normally only when all three lists are empty.
     170             :  */
     171             : static void
     172          18 : report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
     173             :                        List *missing_seqs_idx)
     174             : {
     175             :     StringInfo  seqstr;
     176             : 
     177             :     /* Quick exit if there are no errors to report */
     178          18 :     if (!mismatched_seqs_idx && !insuffperm_seqs_idx && !missing_seqs_idx)
     179          10 :         return;
     180             : 
     181           8 :     seqstr = makeStringInfo();  /* reused below; get_sequences_string() resets it on each call */
     182             : 
     183           8 :     if (mismatched_seqs_idx)
     184             :     {
     185           6 :         get_sequences_string(mismatched_seqs_idx, seqstr);
     186           6 :         ereport(WARNING,
     187             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     188             :                 errmsg_plural("mismatched or renamed sequence on subscriber (%s)",
     189             :                               "mismatched or renamed sequences on subscriber (%s)",
     190             :                               list_length(mismatched_seqs_idx),
     191             :                               seqstr->data));
     192             :     }
     193             : 
     194           8 :     if (insuffperm_seqs_idx)
     195             :     {
     196           0 :         get_sequences_string(insuffperm_seqs_idx, seqstr);
     197           0 :         ereport(WARNING,
     198             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     199             :                 errmsg_plural("insufficient privileges on sequence (%s)",
     200             :                               "insufficient privileges on sequences (%s)",
     201             :                               list_length(insuffperm_seqs_idx),
     202             :                               seqstr->data));
     203             :     }
     204             : 
     205           8 :     if (missing_seqs_idx)
     206             :     {
     207           2 :         get_sequences_string(missing_seqs_idx, seqstr);
     208           2 :         ereport(WARNING,
     209             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     210             :                 errmsg_plural("missing sequence on publisher (%s)",
     211             :                               "missing sequences on publisher (%s)",
     212             :                               list_length(missing_seqs_idx),
     213             :                               seqstr->data));
     214             :     }
     215             : 
     216           8 :     ereport(ERROR,
     217             :             errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     218             :             errmsg("logical replication sequence synchronization failed for subscription \"%s\"",
     219             :                    MySubscription->name));
     220             : }
     221             : 
     222             : /*
     223             :  * get_and_validate_seq_info
     224             :  *
     225             :  * Reads one publisher row from 'slot' into the matching seqinfos entry, opens
     226             :  * the local sequence, and returns COPYSEQ_SKIPPED if it cannot be opened,
     227             :  * COPYSEQ_MISMATCH if its definition or name differs, else COPYSEQ_SUCCESS.
     228             :  */
     229             : static CopySeqResult
     230          24 : get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel,
     231             :                           LogicalRepSequenceInfo **seqinfo, int *seqidx)
     232             : {
     233             :     bool        isnull;
     234          24 :     int         col = 0;
     235             :     Oid         remote_typid;
     236             :     int64       remote_start;
     237             :     int64       remote_increment;
     238             :     int64       remote_min;
     239             :     int64       remote_max;
     240             :     bool        remote_cycle;
     241          24 :     CopySeqResult result = COPYSEQ_SUCCESS;
     242             :     HeapTuple   tup;
     243             :     Form_pg_sequence local_seq;
     244             :     LogicalRepSequenceInfo *seqinfo_local;
     245             : 
     246          24 :     *seqidx = DatumGetInt32(slot_getattr(slot, ++col, &isnull));
     247             :     Assert(!isnull);
     248             : 
     249             :     /* Map the index echoed back by the publisher to its local seqinfos entry. */
     250          24 :     *seqinfo = seqinfo_local =
     251          24 :         (LogicalRepSequenceInfo *) list_nth(seqinfos, *seqidx); /* NOTE(review): *seqidx is not range-checked in this function */
     252             : 
     253          24 :     seqinfo_local->last_value = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
     254             :     Assert(!isnull);
     255             : 
     256          24 :     seqinfo_local->is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull));
     257             :     Assert(!isnull);
     258             : 
     259          24 :     seqinfo_local->page_lsn = DatumGetLSN(slot_getattr(slot, ++col, &isnull));
     260             :     Assert(!isnull);
     261             : 
     262          24 :     remote_typid = DatumGetObjectId(slot_getattr(slot, ++col, &isnull));
     263             :     Assert(!isnull);
     264             : 
     265          24 :     remote_start = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
     266             :     Assert(!isnull);
     267             : 
     268          24 :     remote_increment = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
     269             :     Assert(!isnull);
     270             : 
     271          24 :     remote_min = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
     272             :     Assert(!isnull);
     273             : 
     274          24 :     remote_max = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
     275             :     Assert(!isnull);
     276             : 
     277          24 :     remote_cycle = DatumGetBool(slot_getattr(slot, ++col, &isnull));
     278             :     Assert(!isnull);
     279             : 
     280             :     /* Sanity check: every expected column has been consumed */
     281             :     Assert(col == REMOTE_SEQ_COL_COUNT);
     282             : 
     283          24 :     seqinfo_local->found_on_pub = true;
     284             : 
     285          24 :     *sequence_rel = try_table_open(seqinfo_local->localrelid, RowExclusiveLock);
     286             : 
     287             :     /* Sequence was concurrently dropped? */
     288          24 :     if (!*sequence_rel)
     289           0 :         return COPYSEQ_SKIPPED;
     290             : 
     291          24 :     tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seqinfo_local->localrelid));
     292             : 
     293             :     /* The relation was opened above, so a missing pg_sequence entry is an error */
     294          24 :     if (!HeapTupleIsValid(tup))
     295           0 :         elog(ERROR, "cache lookup failed for sequence %u",
     296             :              seqinfo_local->localrelid);
     297             : 
     298          24 :     local_seq = (Form_pg_sequence) GETSTRUCT(tup);
     299             : 
     300             :     /* Sequence parameters for remote/local are the same? */
     301          24 :     if (local_seq->seqtypid != remote_typid ||
     302          24 :         local_seq->seqstart != remote_start ||
     303          22 :         local_seq->seqincrement != remote_increment ||
     304          18 :         local_seq->seqmin != remote_min ||
     305          18 :         local_seq->seqmax != remote_max ||
     306          18 :         local_seq->seqcycle != remote_cycle)
     307           6 :         result = COPYSEQ_MISMATCH;
     308             : 
     309             :     /* Sequence was concurrently renamed or moved to another schema? */
     310          24 :     if (strcmp(seqinfo_local->nspname,
     311          24 :                get_namespace_name(RelationGetNamespace(*sequence_rel))) ||
     312          24 :         strcmp(seqinfo_local->seqname, RelationGetRelationName(*sequence_rel)))
     313           0 :         result = COPYSEQ_MISMATCH;
     314             : 
     315          24 :     ReleaseSysCache(tup);
     316          24 :     return result;              /* *sequence_rel is left open; it is not closed in this function */
     317             : }
     318             : 
     319             : /*
     320             :  * Set the local sequence to the fetched remote state and mark it READY;
     321             :  * returns COPYSEQ_INSUFFICIENT_PERM, changing nothing, if UPDATE is denied.
     322             :  */
     323             : static CopySeqResult
     324          18 : copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner)
     325             : {
     326             :     UserContext ucxt;
     327             :     AclResult   aclresult;
     328          18 :     bool        run_as_owner = MySubscription->runasowner;
     329          18 :     Oid         seqoid = seqinfo->localrelid;
     330             : 
     331             :     /*
     332             :      * If the user did not opt to run as the owner of the subscription
     333             :      * ('run_as_owner'), then copy the sequence as the owner of the sequence.
     334             :      */
     335          18 :     if (!run_as_owner)
     336          18 :         SwitchToUntrustedUser(seqowner, &ucxt);
     337             : 
     338          18 :     aclresult = pg_class_aclcheck(seqoid, GetUserId(), ACL_UPDATE); /* checked after any user switch above */
     339             : 
     340          18 :     if (aclresult != ACLCHECK_OK)
     341             :     {
     342           0 :         if (!run_as_owner)
     343           0 :             RestoreUserContext(&ucxt);
     344             : 
     345           0 :         return COPYSEQ_INSUFFICIENT_PERM;
     346             :     }
     347             : 
     348             :     /*
     349             :      * The log counter (log_cnt) tracks how many sequence values are still
     350             :      * unused locally. It is only relevant to the local node and managed
     351             :      * internally by nextval() when allocating new ranges. Since log_cnt does
     352             :      * not affect the visible sequence state (like last_value or is_called)
     353             :      * and is only used for local caching, it need not be copied to the
     354             :      * subscriber during synchronization.
     355             :      */
     356          18 :     SetSequence(seqoid, seqinfo->last_value, seqinfo->is_called);
     357             : 
     358          18 :     if (!run_as_owner)
     359          18 :         RestoreUserContext(&ucxt);
     360             : 
     361             :     /*
     362             :      * Record the remote sequence's LSN in pg_subscription_rel and mark the
     363             :      * sequence as READY.
     364             :      */
     365          18 :     UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY,
     366             :                                seqinfo->page_lsn, false);
     367             : 
     368          18 :     return COPYSEQ_SUCCESS;
     369             : }
     370             : 
     371             : /*
     372             :  * Copy existing data of sequences from the publisher.
     373             :  */
     374             : static void
     375          18 : copy_sequences(WalReceiverConn *conn)
     376             : {
     377          18 :     int         cur_batch_base_index = 0;
     378          18 :     int         n_seqinfos = list_length(seqinfos);
     379          18 :     List       *mismatched_seqs_idx = NIL;
     380          18 :     List       *missing_seqs_idx = NIL;
     381          18 :     List       *insuffperm_seqs_idx = NIL;
     382          18 :     StringInfo  seqstr = makeStringInfo();
     383          18 :     StringInfo  cmd = makeStringInfo();
     384             :     MemoryContext oldctx;
     385             : 
     386             : #define MAX_SEQUENCES_SYNC_PER_BATCH 100
     387             : 
     388          18 :     elog(DEBUG1,
     389             :          "logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d",
     390             :          MySubscription->name, n_seqinfos);
     391             : 
     392          36 :     while (cur_batch_base_index < n_seqinfos)
     393             :     {
     394          18 :         Oid         seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID,
     395             :         BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID};
     396          18 :         int         batch_size = 0;
     397          18 :         int         batch_succeeded_count = 0;
     398          18 :         int         batch_mismatched_count = 0;
     399          18 :         int         batch_skipped_count = 0;
     400          18 :         int         batch_insuffperm_count = 0;
     401             :         int         batch_missing_count;
     402             :         Relation    sequence_rel;
     403             : 
     404             :         WalRcvExecResult *res;
     405             :         TupleTableSlot *slot;
     406             : 
     407          18 :         StartTransactionCommand();
     408             : 
     409          44 :         for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++)
     410             :         {
     411             :             char       *nspname_literal;
     412             :             char       *seqname_literal;
     413             : 
     414             :             LogicalRepSequenceInfo *seqinfo =
     415          26 :                 (LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
     416             : 
     417          26 :             if (seqstr->len > 0)
     418           8 :                 appendStringInfoString(seqstr, ", ");
     419             : 
     420          26 :             nspname_literal = quote_literal_cstr(seqinfo->nspname);
     421          26 :             seqname_literal = quote_literal_cstr(seqinfo->seqname);
     422             : 
     423          26 :             appendStringInfo(seqstr, "(%s, %s, %d)",
     424             :                              nspname_literal, seqname_literal, idx);
     425             : 
     426          26 :             if (++batch_size == MAX_SEQUENCES_SYNC_PER_BATCH)
     427           0 :                 break;
     428             :         }
     429             : 
     430             :         /*
     431             :          * We deliberately avoid acquiring a local lock on the sequence before
     432             :          * querying the publisher to prevent potential distributed deadlocks
     433             :          * in bi-directional replication setups.
     434             :          *
     435             :          * Example scenario:
     436             :          *
     437             :          * - On each node, a background worker acquires a lock on a sequence
     438             :          * as part of a sync operation.
     439             :          *
     440             :          * - Concurrently, a user transaction attempts to alter the same
     441             :          * sequence, waiting on the background worker's lock.
     442             :          *
     443             :          * - Meanwhile, a query from the other node tries to access metadata
     444             :          * that depends on the completion of the alter operation.
     445             :          *
     446             :          * - This creates a circular wait across nodes:
     447             :          *
     448             :          * Node-1: Query -> waits on Alter -> waits on Sync Worker
     449             :          *
     450             :          * Node-2: Query -> waits on Alter -> waits on Sync Worker
     451             :          *
     452             :          * Since each node only sees part of the wait graph, the deadlock may
     453             :          * go undetected, leading to indefinite blocking.
     454             :          *
     455             :          * Note: Each entry in VALUES includes an index 'seqidx' that
     456             :          * represents the sequence's position in the local 'seqinfos' list.
     457             :          * This index is propagated to the query results and later used to
     458             :          * directly map the fetched publisher sequence rows back to their
     459             :          * corresponding local entries without relying on result order or name
     460             :          * matching.
     461             :          */
     462          18 :         appendStringInfo(cmd,
     463             :                          "SELECT s.seqidx, ps.*, seq.seqtypid,\n"
     464             :                          "       seq.seqstart, seq.seqincrement, seq.seqmin,\n"
     465             :                          "       seq.seqmax, seq.seqcycle\n"
     466             :                          "FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n"
     467             :                          "JOIN pg_namespace n ON n.nspname = s.schname\n"
     468             :                          "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n"
     469             :                          "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n"
     470             :                          "JOIN LATERAL pg_get_sequence_data(seq.seqrelid) AS ps ON true\n",
     471             :                          seqstr->data);
     472             : 
     473          18 :         res = walrcv_exec(conn, cmd->data, lengthof(seqRow), seqRow);
     474          18 :         if (res->status != WALRCV_OK_TUPLES)
     475           0 :             ereport(ERROR,
     476             :                     errcode(ERRCODE_CONNECTION_FAILURE),
     477             :                     errmsg("could not fetch sequence information from the publisher: %s",
     478             :                            res->err));
     479             : 
     480          18 :         slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     481          42 :         while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
     482             :         {
     483             :             CopySeqResult sync_status;
     484             :             LogicalRepSequenceInfo *seqinfo;
     485             :             int         seqidx;
     486             : 
     487          24 :             CHECK_FOR_INTERRUPTS();
     488             : 
     489          24 :             if (ConfigReloadPending)
     490             :             {
     491           0 :                 ConfigReloadPending = false;
     492           0 :                 ProcessConfigFile(PGC_SIGHUP);
     493             :             }
     494             : 
     495          24 :             sync_status = get_and_validate_seq_info(slot, &sequence_rel,
     496             :                                                     &seqinfo, &seqidx);
     497          24 :             if (sync_status == COPYSEQ_SUCCESS)
     498          18 :                 sync_status = copy_sequence(seqinfo,
     499          18 :                                             sequence_rel->rd_rel->relowner);
     500             : 
     501          24 :             switch (sync_status)
     502             :             {
     503          18 :                 case COPYSEQ_SUCCESS:
     504          18 :                     elog(DEBUG1,
     505             :                          "logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished",
     506             :                          MySubscription->name, seqinfo->nspname,
     507             :                          seqinfo->seqname);
     508          18 :                     batch_succeeded_count++;
     509          18 :                     break;
     510           6 :                 case COPYSEQ_MISMATCH:
     511             : 
     512             :                     /*
     513             :                      * Remember mismatched sequences in a long-lived memory
     514             :                      * context since these will be used after the transaction
     515             :                      * is committed.
     516             :                      */
     517           6 :                     oldctx = MemoryContextSwitchTo(ApplyContext);
     518           6 :                     mismatched_seqs_idx = lappend_int(mismatched_seqs_idx,
     519             :                                                       seqidx);
     520           6 :                     MemoryContextSwitchTo(oldctx);
     521           6 :                     batch_mismatched_count++;
     522           6 :                     break;
     523           0 :                 case COPYSEQ_INSUFFICIENT_PERM:
     524             : 
     525             :                     /*
     526             :                      * Remember sequences with insufficient privileges in a
     527             :                      * long-lived memory context since these will be used
     528             :                      * after the transaction is committed.
     529             :                      */
     530           0 :                     oldctx = MemoryContextSwitchTo(ApplyContext);
     531           0 :                     insuffperm_seqs_idx = lappend_int(insuffperm_seqs_idx,
     532             :                                                       seqidx);
     533           0 :                     MemoryContextSwitchTo(oldctx);
     534           0 :                     batch_insuffperm_count++;
     535           0 :                     break;
     536           0 :                 case COPYSEQ_SKIPPED:
     537           0 :                     ereport(LOG,
     538             :                             errmsg("skip synchronization of sequence \"%s.%s\" because it has been dropped concurrently",
     539             :                                    seqinfo->nspname,
     540             :                                    seqinfo->seqname));
     541           0 :                     batch_skipped_count++;
     542           0 :                     break;
     543             :             }
     544             : 
     545          24 :             if (sequence_rel)
     546          24 :                 table_close(sequence_rel, NoLock);
     547             :         }
     548             : 
     549          18 :         ExecDropSingleTupleTableSlot(slot);
     550          18 :         walrcv_clear_result(res);
     551          18 :         resetStringInfo(seqstr);
     552          18 :         resetStringInfo(cmd);
     553             : 
     554          18 :         batch_missing_count = batch_size - (batch_succeeded_count +
     555          18 :                                             batch_mismatched_count +
     556          18 :                                             batch_insuffperm_count +
     557             :                                             batch_skipped_count);
     558             : 
     559          18 :         elog(DEBUG1,
     560             :              "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped",
     561             :              MySubscription->name,
     562             :              (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1,
     563             :              batch_size, batch_succeeded_count, batch_mismatched_count,
     564             :              batch_insuffperm_count, batch_missing_count, batch_skipped_count);
     565             : 
     566             :         /* Commit this batch, and prepare for next batch */
     567          18 :         CommitTransactionCommand();
     568             : 
     569          18 :         if (batch_missing_count)
     570             :         {
     571           4 :             for (int idx = cur_batch_base_index; idx < cur_batch_base_index + batch_size; idx++)
     572             :             {
     573             :                 LogicalRepSequenceInfo *seqinfo =
     574           2 :                     (LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
     575             : 
     576             :                 /* If the sequence was not found on publisher, record it */
     577           2 :                 if (!seqinfo->found_on_pub)
     578           2 :                     missing_seqs_idx = lappend_int(missing_seqs_idx, idx);
     579             :             }
     580             :         }
     581             : 
     582             :         /*
     583             :          * cur_batch_base_index is not incremented sequentially because some
     584             :          * sequences may be missing, and the number of fetched rows may not
     585             :          * match the batch size.
     586             :          */
     587          18 :         cur_batch_base_index += batch_size;
     588             :     }
     589             : 
     590             :     /* Report mismatches, permission issues, or missing sequences */
     591          18 :     report_sequence_errors(mismatched_seqs_idx, insuffperm_seqs_idx,
     592             :                            missing_seqs_idx);
     593          10 : }
     594             : 
     595             : /*
     596             :  * Collect the INIT-state sequences of this subscription from pg_subscription_rel into seqinfos,
     597             :  * then connect to the publisher and call copy_sequences(). Returns early if no sequence was collected.
     598             :  */
     599             : static void
     600          18 : LogicalRepSyncSequences(void)
     601             : {
     602             :     char       *err;
     603             :     bool        must_use_password;
     604             :     Relation    rel;
     605             :     HeapTuple   tup;
     606             :     ScanKeyData skey[2];
     607             :     SysScanDesc scan;
     608          18 :     Oid         subid = MyLogicalRepWorker->subid;
     609             :     StringInfoData app_name;
     610             : 
     611          18 :     StartTransactionCommand();
     612             : 
     613          18 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     614             : 
     615          18 :     ScanKeyInit(&skey[0],
     616             :                 Anum_pg_subscription_rel_srsubid,
     617             :                 BTEqualStrategyNumber, F_OIDEQ,
     618             :                 ObjectIdGetDatum(subid));
     619             : 
     620          18 :     ScanKeyInit(&skey[1],
     621             :                 Anum_pg_subscription_rel_srsubstate,
     622             :                 BTEqualStrategyNumber, F_CHAREQ,
     623             :                 CharGetDatum(SUBREL_STATE_INIT));
     624             : 
     625          18 :     scan = systable_beginscan(rel, InvalidOid, false,
     626             :                               NULL, 2, skey);
     627          48 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     628             :     {
     629             :         Form_pg_subscription_rel subrel;
     630             :         LogicalRepSequenceInfo *seq;
     631             :         Relation    sequence_rel;
     632             :         MemoryContext oldctx;
     633             : 
     634          30 :         CHECK_FOR_INTERRUPTS();
     635             : 
     636          30 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     637             : 
     638          30 :         sequence_rel = try_table_open(subrel->srrelid, RowExclusiveLock);
     639             : 
     640             :         /* Skip if the relation could not be opened, i.e. it was dropped concurrently */
     641          30 :         if (!sequence_rel)
     642           0 :             continue;
     643             : 
     644             :         /* Skip (after closing it) if the relation is not a sequence */
     645          30 :         if (sequence_rel->rd_rel->relkind != RELKIND_SEQUENCE)
     646             :         {
     647           4 :             table_close(sequence_rel, NoLock);
     648           4 :             continue;
     649             :         }
     650             : 
     651             :         /*
     652             :          * The worker uses these entries across transaction boundaries, so
     653             :          * allocate them in the long-lived ApplyContext.
     654             :          */
     655          26 :         oldctx = MemoryContextSwitchTo(ApplyContext);
     656             : 
     657          26 :         seq = palloc0_object(LogicalRepSequenceInfo);
     658          26 :         seq->localrelid = subrel->srrelid;
     659          26 :         seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel));
     660          26 :         seq->seqname = pstrdup(RelationGetRelationName(sequence_rel));
     661          26 :         seqinfos = lappend(seqinfos, seq);
     662             : 
     663          26 :         MemoryContextSwitchTo(oldctx);
     664             : 
     665          26 :         table_close(sequence_rel, NoLock);
     666             :     }
     667             : 
     668             :     /* End the catalog scan and close pg_subscription_rel */
     669          18 :     systable_endscan(scan);
     670          18 :     table_close(rel, AccessShareLock);
     671             : 
     672          18 :     CommitTransactionCommand();
     673             : 
     674             :     /*
     675             :      * Exit early if no sequences were collected, e.g. the entries were dropped concurrently or none of them is a sequence.
     676             :      */
     677          18 :     if (!seqinfos)
     678           0 :         return;
     679             : 
     680             :     /* Is the use of a password mandatory? (passwordrequired is set and the owner is not a superuser) */
     681          36 :     must_use_password = MySubscription->passwordrequired &&
     682          18 :         !MySubscription->ownersuperuser;
     683             : 
     684          18 :     initStringInfo(&app_name);
     685          18 :     appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT,
     686          18 :                      MySubscription->oid, GetSystemIdentifier());
     687             : 
     688             :     /*
     689             :      * Establish the connection to the publisher for sequence synchronization; a failed connection raises an ERROR below.
     690             :      */
     691          18 :     LogRepWorkerWalRcvConn =
     692          18 :         walrcv_connect(MySubscription->conninfo, true, true,
     693             :                        must_use_password,
     694             :                        app_name.data, &err);
     695          18 :     if (LogRepWorkerWalRcvConn == NULL)
     696           0 :         ereport(ERROR,
     697             :                 errcode(ERRCODE_CONNECTION_FAILURE),
     698             :                 errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s",
     699             :                        MySubscription->name, err));
     700             : 
     701          18 :     pfree(app_name.data);
     702             : 
     703          18 :     copy_sequences(LogRepWorkerWalRcvConn);
     704             : }
     705             : 
     706             : /*
     707             :  * Run LogicalRepSyncSequences() with error handling. On error, either disable
     708             :  * the subscription (when disableonerr is set) or report the failure and re-throw.
     709             :  *
     710             :  * Note that FATAL errors are not handled here; they are probably caused by
     711             :  * system resource problems and are not repeatable.
     712             :  */
     713             : static void
     714          18 : start_sequence_sync() /* NOTE(review): empty parameter list; (void) is the usual prototype form - confirm */
     715             : {
     716             :     Assert(am_sequencesync_worker());
     717             : 
     718          18 :     PG_TRY();
     719             :     {
     720             :         /* Run the sequence synchronization. */
     721          18 :         LogicalRepSyncSequences();
     722             :     }
     723           8 :     PG_CATCH();
     724             :     {
     725           8 :         if (MySubscription->disableonerr)
     726           0 :             DisableSubscriptionAndExit();
     727             :         else
     728             :         {
     729             :             /*
     730             :              * Report that the worker failed during sequence synchronization.
     731             :              * Abort any open transaction first so that the stats message is
     732             :              * sent in an idle state.
     733             :              */
     734           8 :             AbortOutOfAnyTransaction();
     735           8 :             pgstat_report_subscription_error(MySubscription->oid,
     736             :                                              WORKERTYPE_SEQUENCESYNC);
     737             : 
     738           8 :             PG_RE_THROW();
     739             :         }
     740             :     }
     741          10 :     PG_END_TRY();
     742          10 : }
     743             : 
     744             : /* Logical replication sequencesync worker entry point; main_arg carries the worker slot number handed to SetupApplyOrSyncWorker(). */
     745             : void
     746          18 : SequenceSyncWorkerMain(Datum main_arg)
     747             : {
     748          18 :     int         worker_slot = DatumGetInt32(main_arg);
     749             : 
     750          18 :     SetupApplyOrSyncWorker(worker_slot);
     751             : 
     752          18 :     start_sequence_sync();
     753             : 
     754          10 :     FinishSyncWorker();
     755             : }

Generated by: LCOV version 1.16