LCOV - code coverage report
Current view: top level - src/backend/replication/logical - sequencesync.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 200 231 86.6 %
Date: 2026-02-07 10:18:48 Functions: 9 9 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * sequencesync.c
       3             :  *    PostgreSQL logical replication: sequence synchronization
       4             :  *
       5             :  * Copyright (c) 2025-2026, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/sequencesync.c
       9             :  *
      10             :  * NOTES
      11             :  *    This file contains code for sequence synchronization for
      12             :  *    logical replication.
      13             :  *
      14             :  * Sequences requiring synchronization are tracked in the pg_subscription_rel
      15             :  * catalog.
      16             :  *
      17             :  * Sequences to be synchronized will be added with state INIT when either of
      18             :  * the following commands is executed:
      19             :  * CREATE SUBSCRIPTION
      20             :  * ALTER SUBSCRIPTION ... REFRESH PUBLICATION
      21             :  *
      22             :  * Executing the following command resets all sequences in the subscription to
      23             :  * state INIT, triggering re-synchronization:
      24             :  * ALTER SUBSCRIPTION ... REFRESH SEQUENCES
      25             :  *
      26             :  * The apply worker periodically scans pg_subscription_rel for sequences in
      27             :  * INIT state. When such sequences are found, it spawns a sequencesync worker
      28             :  * to handle synchronization.
      29             :  *
      30             :  * A single sequencesync worker is responsible for synchronizing all sequences.
      31             :  * It begins by retrieving the list of sequences that are flagged for
      32             :  * synchronization, i.e., those in the INIT state. These sequences are then
      33             :  * processed in batches, allowing multiple entries to be synchronized within a
      34             :  * single transaction. The worker fetches the current sequence values and page
      35             :  * LSNs from the remote publisher, updates the corresponding sequences on the
      36             :  * local subscriber, and finally marks each sequence as READY upon successful
      37             :  * synchronization.
      38             :  *
      39             :  * Sequence state transitions follow this pattern:
      40             :  *   INIT -> READY
      41             :  *
      42             :  * To avoid creating too many transactions, up to MAX_SEQUENCES_SYNC_PER_BATCH
      43             :  * sequences are synchronized per transaction. The locks on the sequence
      44             :  * relation will be periodically released at each transaction commit.
      45             :  *
      46             :  * XXX: The launcher process is not used to launch the sequencesync worker,
      47             :  * because it has no database connection and so cannot read the sequences that
      48             :  * need to be synchronized from the pg_subscription_rel system catalog.
      49             :  *-------------------------------------------------------------------------
      50             :  */
      51             : 
      52             : #include "postgres.h"
      53             : 
      54             : #include "access/genam.h"
      55             : #include "access/table.h"
      56             : #include "catalog/pg_sequence.h"
      57             : #include "catalog/pg_subscription_rel.h"
      58             : #include "commands/sequence.h"
      59             : #include "pgstat.h"
      60             : #include "postmaster/interrupt.h"
      61             : #include "replication/logicalworker.h"
      62             : #include "replication/worker_internal.h"
      63             : #include "utils/acl.h"
      64             : #include "utils/builtins.h"
      65             : #include "utils/fmgroids.h"
      66             : #include "utils/guc.h"
      67             : #include "utils/inval.h"
      68             : #include "utils/lsyscache.h"
      69             : #include "utils/memutils.h"
      70             : #include "utils/pg_lsn.h"
      71             : #include "utils/syscache.h"
      72             : #include "utils/usercontext.h"
      73             : 
      74             : #define REMOTE_SEQ_COL_COUNT 10
      75             : 
      76             : typedef enum CopySeqResult
      77             : {
      78             :     COPYSEQ_SUCCESS,
      79             :     COPYSEQ_MISMATCH,
      80             :     COPYSEQ_INSUFFICIENT_PERM,
      81             :     COPYSEQ_SKIPPED
      82             : } CopySeqResult;
      83             : 
      84             : static List *seqinfos = NIL;
      85             : 
      86             : /*
      87             :  * Apply worker determines if sequence synchronization is needed.
      88             :  *
      89             :  * Start a sequencesync worker if one is not already running. The active
      90             :  * sequencesync worker will handle all pending sequence synchronization. If any
      91             :  * sequences remain unsynchronized after it exits, a new worker can be started
      92             :  * in the next iteration.
      93             :  */
      94             : void
      95       10720 : ProcessSequencesForSync(void)
      96             : {
      97             :     LogicalRepWorker *sequencesync_worker;
      98             :     int         nsyncworkers;
      99             :     bool        has_pending_sequences;
     100             :     bool        started_tx;
     101             : 
     102       10720 :     FetchRelationStates(NULL, &has_pending_sequences, &started_tx);
     103             : 
     104       10720 :     if (started_tx)
     105             :     {
     106         350 :         CommitTransactionCommand();
     107         350 :         pgstat_report_stat(true);
     108             :     }
     109             : 
     110       10720 :     if (!has_pending_sequences)
     111       10688 :         return;
     112             : 
     113          50 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     114             : 
     115             :     /* Is a sequencesync worker already running for this subscription? */
     116          50 :     sequencesync_worker = logicalrep_worker_find(WORKERTYPE_SEQUENCESYNC,
     117          50 :                                                  MyLogicalRepWorker->subid,
     118             :                                                  InvalidOid, true);
     119          50 :     if (sequencesync_worker)
     120             :     {
     121          18 :         LWLockRelease(LogicalRepWorkerLock);
     122          18 :         return;
     123             :     }
     124             : 
     125             :     /*
     126             :      * Count running sync workers for this subscription, while we have the
     127             :      * lock.
     128             :      */
     129          32 :     nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
     130          32 :     LWLockRelease(LogicalRepWorkerLock);
     131             : 
     132             :     /*
     133             :      * It is okay to read/update last_seqsync_start_time here in the apply
     134             :      * worker, as we have already ensured that no sequencesync worker exists.
     135             :      */
     136          32 :     launch_sync_worker(WORKERTYPE_SEQUENCESYNC, nsyncworkers, InvalidOid,
     137          32 :                        &MyLogicalRepWorker->last_seqsync_start_time);
     138             : }
     139             : 
     140             : /*
     141             :  * get_sequences_string
     142             :  *
     143             :  * Build a comma-separated string of schema-qualified sequence names
     144             :  * for the given list of sequence indexes.
     145             :  */
     146             : static void
     147           8 : get_sequences_string(List *seqindexes, StringInfo buf)
     148             : {
     149           8 :     resetStringInfo(buf);
     150          24 :     foreach_int(seqidx, seqindexes)
     151             :     {
     152             :         LogicalRepSequenceInfo *seqinfo =
     153           8 :             (LogicalRepSequenceInfo *) list_nth(seqinfos, seqidx);
     154             : 
     155           8 :         if (buf->len > 0)
     156           0 :             appendStringInfoString(buf, ", ");
     157             : 
     158           8 :         appendStringInfo(buf, "\"%s.%s\"", seqinfo->nspname, seqinfo->seqname);
     159             :     }
     160           8 : }
     161             : 
     162             : /*
     163             :  * report_sequence_errors
     164             :  *
     165             :  * Report discrepancies found during sequence synchronization between
     166             :  * the publisher and subscriber. Emits warnings for:
     167             :  * a) mismatched definitions or concurrent rename
     168             :  * b) insufficient privileges
     169             :  * c) missing sequences on the publisher
     170             :  * Then raises an ERROR to indicate synchronization failure.
     171             :  */
     172             : static void
     173          18 : report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
     174             :                        List *missing_seqs_idx)
     175             : {
     176             :     StringInfo  seqstr;
     177             : 
     178             :     /* Quick exit if there are no errors to report */
     179          18 :     if (!mismatched_seqs_idx && !insuffperm_seqs_idx && !missing_seqs_idx)
     180          10 :         return;
     181             : 
     182           8 :     seqstr = makeStringInfo();
     183             : 
     184           8 :     if (mismatched_seqs_idx)
     185             :     {
     186           6 :         get_sequences_string(mismatched_seqs_idx, seqstr);
     187           6 :         ereport(WARNING,
     188             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     189             :                 errmsg_plural("mismatched or renamed sequence on subscriber (%s)",
     190             :                               "mismatched or renamed sequences on subscriber (%s)",
     191             :                               list_length(mismatched_seqs_idx),
     192             :                               seqstr->data));
     193             :     }
     194             : 
     195           8 :     if (insuffperm_seqs_idx)
     196             :     {
     197           0 :         get_sequences_string(insuffperm_seqs_idx, seqstr);
     198           0 :         ereport(WARNING,
     199             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     200             :                 errmsg_plural("insufficient privileges on sequence (%s)",
     201             :                               "insufficient privileges on sequences (%s)",
     202             :                               list_length(insuffperm_seqs_idx),
     203             :                               seqstr->data));
     204             :     }
     205             : 
     206           8 :     if (missing_seqs_idx)
     207             :     {
     208           2 :         get_sequences_string(missing_seqs_idx, seqstr);
     209           2 :         ereport(WARNING,
     210             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     211             :                 errmsg_plural("missing sequence on publisher (%s)",
     212             :                               "missing sequences on publisher (%s)",
     213             :                               list_length(missing_seqs_idx),
     214             :                               seqstr->data));
     215             :     }
     216             : 
     217           8 :     ereport(ERROR,
     218             :             errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     219             :             errmsg("logical replication sequence synchronization failed for subscription \"%s\"",
     220             :                    MySubscription->name));
     221             : }
     222             : 
     223             : /*
     224             :  * get_and_validate_seq_info
     225             :  *
     226             :  * Extracts remote sequence information from the tuple slot received from the
     227             :  * publisher, and validates it against the corresponding local sequence
     228             :  * definition.
     229             :  */
     230             : static CopySeqResult
     231          24 : get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel,
     232             :                           LogicalRepSequenceInfo **seqinfo, int *seqidx)
     233             : {
     234             :     bool        isnull;
     235          24 :     int         col = 0;
     236             :     Datum       datum;
     237             :     Oid         remote_typid;
     238             :     int64       remote_start;
     239             :     int64       remote_increment;
     240             :     int64       remote_min;
     241             :     int64       remote_max;
     242             :     bool        remote_cycle;
     243          24 :     CopySeqResult result = COPYSEQ_SUCCESS;
     244             :     HeapTuple   tup;
     245             :     Form_pg_sequence local_seq;
     246             :     LogicalRepSequenceInfo *seqinfo_local;
     247             : 
     248          24 :     *seqidx = DatumGetInt32(slot_getattr(slot, ++col, &isnull));
     249             :     Assert(!isnull);
     250             : 
     251             :     /* Identify the corresponding local sequence for the given index. */
     252          24 :     *seqinfo = seqinfo_local =
     253          24 :         (LogicalRepSequenceInfo *) list_nth(seqinfos, *seqidx);
     254             : 
     255             :     /*
     256             :      * last_value can be NULL if the sequence was dropped concurrently (see
     257             :      * pg_get_sequence_data()).
     258             :      */
     259          24 :     datum = slot_getattr(slot, ++col, &isnull);
     260          24 :     if (isnull)
     261           0 :         return COPYSEQ_SKIPPED;
     262          24 :     seqinfo_local->last_value = DatumGetInt64(datum);
     263             : 
     264          24 :     seqinfo_local->is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull));
     265             :     Assert(!isnull);
     266             : 
     267          24 :     seqinfo_local->page_lsn = DatumGetLSN(slot_getattr(slot, ++col, &isnull));
     268             :     Assert(!isnull);
     269             : 
     270          24 :     remote_typid = DatumGetObjectId(slot_getattr(slot, ++col, &isnull));
     271             :     Assert(!isnull);
     272             : 
     273          24 :     remote_start = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
     274             :     Assert(!isnull);
     275             : 
     276          24 :     remote_increment = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
     277             :     Assert(!isnull);
     278             : 
     279          24 :     remote_min = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
     280             :     Assert(!isnull);
     281             : 
     282          24 :     remote_max = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
     283             :     Assert(!isnull);
     284             : 
     285          24 :     remote_cycle = DatumGetBool(slot_getattr(slot, ++col, &isnull));
     286             :     Assert(!isnull);
     287             : 
     288             :     /* Sanity check */
     289             :     Assert(col == REMOTE_SEQ_COL_COUNT);
     290             : 
     291          24 :     seqinfo_local->found_on_pub = true;
     292             : 
     293          24 :     *sequence_rel = try_table_open(seqinfo_local->localrelid, RowExclusiveLock);
     294             : 
     295             :     /* Sequence was concurrently dropped? */
     296          24 :     if (!*sequence_rel)
     297           0 :         return COPYSEQ_SKIPPED;
     298             : 
     299          24 :     tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seqinfo_local->localrelid));
     300             : 
     301             :     /* Sequence was concurrently dropped? */
     302          24 :     if (!HeapTupleIsValid(tup))
     303           0 :         elog(ERROR, "cache lookup failed for sequence %u",
     304             :              seqinfo_local->localrelid);
     305             : 
     306          24 :     local_seq = (Form_pg_sequence) GETSTRUCT(tup);
     307             : 
     308             :     /* Sequence parameters for remote/local are the same? */
     309          24 :     if (local_seq->seqtypid != remote_typid ||
     310          24 :         local_seq->seqstart != remote_start ||
     311          22 :         local_seq->seqincrement != remote_increment ||
     312          18 :         local_seq->seqmin != remote_min ||
     313          18 :         local_seq->seqmax != remote_max ||
     314          18 :         local_seq->seqcycle != remote_cycle)
     315           6 :         result = COPYSEQ_MISMATCH;
     316             : 
     317             :     /* Sequence was concurrently renamed? */
     318          24 :     if (strcmp(seqinfo_local->nspname,
     319          24 :                get_namespace_name(RelationGetNamespace(*sequence_rel))) ||
     320          24 :         strcmp(seqinfo_local->seqname, RelationGetRelationName(*sequence_rel)))
     321           0 :         result = COPYSEQ_MISMATCH;
     322             : 
     323          24 :     ReleaseSysCache(tup);
     324          24 :     return result;
     325             : }
     326             : 
     327             : /*
     328             :  * Apply remote sequence state to local sequence and mark it as
     329             :  * synchronized (READY).
     330             :  */
     331             : static CopySeqResult
     332          18 : copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner)
     333             : {
     334             :     UserContext ucxt;
     335             :     AclResult   aclresult;
     336          18 :     bool        run_as_owner = MySubscription->runasowner;
     337          18 :     Oid         seqoid = seqinfo->localrelid;
     338             : 
     339             :     /*
     340             :      * If the user did not opt to run as the owner of the subscription
     341             :      * ('run_as_owner'), then copy the sequence as the owner of the sequence.
     342             :      */
     343          18 :     if (!run_as_owner)
     344          18 :         SwitchToUntrustedUser(seqowner, &ucxt);
     345             : 
     346          18 :     aclresult = pg_class_aclcheck(seqoid, GetUserId(), ACL_UPDATE);
     347             : 
     348          18 :     if (aclresult != ACLCHECK_OK)
     349             :     {
     350           0 :         if (!run_as_owner)
     351           0 :             RestoreUserContext(&ucxt);
     352             : 
     353           0 :         return COPYSEQ_INSUFFICIENT_PERM;
     354             :     }
     355             : 
     356             :     /*
     357             :      * The log counter (log_cnt) tracks how many sequence values are still
     358             :      * unused locally. It is only relevant to the local node and managed
     359             :      * internally by nextval() when allocating new ranges. Since log_cnt does
     360             :      * not affect the visible sequence state (like last_value or is_called)
     361             :      * and is only used for local caching, it need not be copied to the
     362             :      * subscriber during synchronization.
     363             :      */
     364          18 :     SetSequence(seqoid, seqinfo->last_value, seqinfo->is_called);
     365             : 
     366          18 :     if (!run_as_owner)
     367          18 :         RestoreUserContext(&ucxt);
     368             : 
     369             :     /*
     370             :      * Record the remote sequence's LSN in pg_subscription_rel and mark the
     371             :      * sequence as READY.
     372             :      */
     373          18 :     UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY,
     374             :                                seqinfo->page_lsn, false);
     375             : 
     376          18 :     return COPYSEQ_SUCCESS;
     377             : }
     378             : 
     379             : /*
     380             :  * Copy existing data of sequences from the publisher.
     381             :  */
     382             : static void
     383          18 : copy_sequences(WalReceiverConn *conn)
     384             : {
     385          18 :     int         cur_batch_base_index = 0;
     386          18 :     int         n_seqinfos = list_length(seqinfos);
     387          18 :     List       *mismatched_seqs_idx = NIL;
     388          18 :     List       *missing_seqs_idx = NIL;
     389          18 :     List       *insuffperm_seqs_idx = NIL;
     390          18 :     StringInfo  seqstr = makeStringInfo();
     391          18 :     StringInfo  cmd = makeStringInfo();
     392             :     MemoryContext oldctx;
     393             : 
     394             : #define MAX_SEQUENCES_SYNC_PER_BATCH 100
     395             : 
     396          18 :     elog(DEBUG1,
     397             :          "logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d",
     398             :          MySubscription->name, n_seqinfos);
     399             : 
     400          36 :     while (cur_batch_base_index < n_seqinfos)
     401             :     {
     402          18 :         Oid         seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID,
     403             :         BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID};
     404          18 :         int         batch_size = 0;
     405          18 :         int         batch_succeeded_count = 0;
     406          18 :         int         batch_mismatched_count = 0;
     407          18 :         int         batch_skipped_count = 0;
     408          18 :         int         batch_insuffperm_count = 0;
     409             :         int         batch_missing_count;
     410          18 :         Relation    sequence_rel = NULL;
     411             : 
     412             :         WalRcvExecResult *res;
     413             :         TupleTableSlot *slot;
     414             : 
     415          18 :         StartTransactionCommand();
     416             : 
     417          44 :         for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++)
     418             :         {
     419             :             char       *nspname_literal;
     420             :             char       *seqname_literal;
     421             : 
     422             :             LogicalRepSequenceInfo *seqinfo =
     423          26 :                 (LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
     424             : 
     425          26 :             if (seqstr->len > 0)
     426           8 :                 appendStringInfoString(seqstr, ", ");
     427             : 
     428          26 :             nspname_literal = quote_literal_cstr(seqinfo->nspname);
     429          26 :             seqname_literal = quote_literal_cstr(seqinfo->seqname);
     430             : 
     431          26 :             appendStringInfo(seqstr, "(%s, %s, %d)",
     432             :                              nspname_literal, seqname_literal, idx);
     433             : 
     434          26 :             if (++batch_size == MAX_SEQUENCES_SYNC_PER_BATCH)
     435           0 :                 break;
     436             :         }
     437             : 
     438             :         /*
     439             :          * We deliberately avoid acquiring a local lock on the sequence before
     440             :          * querying the publisher to prevent potential distributed deadlocks
     441             :          * in bi-directional replication setups.
     442             :          *
     443             :          * Example scenario:
     444             :          *
     445             :          * - On each node, a background worker acquires a lock on a sequence
     446             :          * as part of a sync operation.
     447             :          *
     448             :          * - Concurrently, a user transaction attempts to alter the same
     449             :          * sequence, waiting on the background worker's lock.
     450             :          *
     451             :          * - Meanwhile, a query from the other node tries to access metadata
     452             :          * that depends on the completion of the alter operation.
     453             :          *
     454             :          * - This creates a circular wait across nodes:
     455             :          *
     456             :          * Node-1: Query -> waits on Alter -> waits on Sync Worker
     457             :          *
     458             :          * Node-2: Query -> waits on Alter -> waits on Sync Worker
     459             :          *
     460             :          * Since each node only sees part of the wait graph, the deadlock may
     461             :          * go undetected, leading to indefinite blocking.
     462             :          *
     463             :          * Note: Each entry in VALUES includes an index 'seqidx' that
     464             :          * represents the sequence's position in the local 'seqinfos' list.
     465             :          * This index is propagated to the query results and later used to
     466             :          * directly map the fetched publisher sequence rows back to their
     467             :          * corresponding local entries without relying on result order or name
     468             :          * matching.
     469             :          */
     470          18 :         appendStringInfo(cmd,
     471             :                          "SELECT s.seqidx, ps.*, seq.seqtypid,\n"
     472             :                          "       seq.seqstart, seq.seqincrement, seq.seqmin,\n"
     473             :                          "       seq.seqmax, seq.seqcycle\n"
     474             :                          "FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n"
     475             :                          "JOIN pg_namespace n ON n.nspname = s.schname\n"
     476             :                          "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n"
     477             :                          "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n"
     478             :                          "JOIN LATERAL pg_get_sequence_data(seq.seqrelid) AS ps ON true\n",
     479             :                          seqstr->data);
     480             : 
     481          18 :         res = walrcv_exec(conn, cmd->data, lengthof(seqRow), seqRow);
     482          18 :         if (res->status != WALRCV_OK_TUPLES)
     483           0 :             ereport(ERROR,
     484             :                     errcode(ERRCODE_CONNECTION_FAILURE),
     485             :                     errmsg("could not fetch sequence information from the publisher: %s",
     486             :                            res->err));
     487             : 
     488          18 :         slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     489          42 :         while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
     490             :         {
     491             :             CopySeqResult sync_status;
     492             :             LogicalRepSequenceInfo *seqinfo;
     493             :             int         seqidx;
     494             : 
     495          24 :             CHECK_FOR_INTERRUPTS();
     496             : 
     497          24 :             if (ConfigReloadPending)
     498             :             {
     499           0 :                 ConfigReloadPending = false;
     500           0 :                 ProcessConfigFile(PGC_SIGHUP);
     501             :             }
     502             : 
     503          24 :             sync_status = get_and_validate_seq_info(slot, &sequence_rel,
     504             :                                                     &seqinfo, &seqidx);
     505          24 :             if (sync_status == COPYSEQ_SUCCESS)
     506          18 :                 sync_status = copy_sequence(seqinfo,
     507          18 :                                             sequence_rel->rd_rel->relowner);
     508             : 
     509          24 :             switch (sync_status)
     510             :             {
     511          18 :                 case COPYSEQ_SUCCESS:
     512          18 :                     elog(DEBUG1,
     513             :                          "logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished",
     514             :                          MySubscription->name, seqinfo->nspname,
     515             :                          seqinfo->seqname);
     516          18 :                     batch_succeeded_count++;
     517          18 :                     break;
     518           6 :                 case COPYSEQ_MISMATCH:
     519             : 
     520             :                     /*
     521             :                      * Remember mismatched sequences in a long-lived memory
     522             :                      * context since these will be used after the transaction
     523             :                      * is committed.
     524             :                      */
     525           6 :                     oldctx = MemoryContextSwitchTo(ApplyContext);
     526           6 :                     mismatched_seqs_idx = lappend_int(mismatched_seqs_idx,
     527             :                                                       seqidx);
     528           6 :                     MemoryContextSwitchTo(oldctx);
     529           6 :                     batch_mismatched_count++;
     530           6 :                     break;
     531           0 :                 case COPYSEQ_INSUFFICIENT_PERM:
     532             : 
     533             :                     /*
     534             :                      * Remember sequences with insufficient privileges in a
     535             :                      * long-lived memory context since these will be used
     536             :                      * after the transaction is committed.
     537             :                      */
     538           0 :                     oldctx = MemoryContextSwitchTo(ApplyContext);
     539           0 :                     insuffperm_seqs_idx = lappend_int(insuffperm_seqs_idx,
     540             :                                                       seqidx);
     541           0 :                     MemoryContextSwitchTo(oldctx);
     542           0 :                     batch_insuffperm_count++;
     543           0 :                     break;
     544           0 :                 case COPYSEQ_SKIPPED:
     545             : 
     546             :                     /*
     547             :                      * Concurrent removal of a sequence on the subscriber is
     548             :                      * treated as success, since the only viable action is to
     549             :                      * skip the corresponding sequence data. Missing sequences
     550             :                      * on the publisher are treated as ERROR.
     551             :                      */
     552           0 :                     if (seqinfo->found_on_pub)
     553             :                     {
     554           0 :                         ereport(LOG,
     555             :                                 errmsg("skip synchronization of sequence \"%s.%s\" because it has been dropped concurrently",
     556             :                                        seqinfo->nspname,
     557             :                                        seqinfo->seqname));
     558           0 :                         batch_skipped_count++;
     559             :                     }
     560           0 :                     break;
     561             :             }
     562             : 
     563          24 :             if (sequence_rel)
     564          24 :                 table_close(sequence_rel, NoLock);
     565             :         }
     566             : 
     567          18 :         ExecDropSingleTupleTableSlot(slot);
     568          18 :         walrcv_clear_result(res);
     569          18 :         resetStringInfo(seqstr);
     570          18 :         resetStringInfo(cmd);
     571             : 
     572          18 :         batch_missing_count = batch_size - (batch_succeeded_count +
     573          18 :                                             batch_mismatched_count +
     574          18 :                                             batch_insuffperm_count +
     575             :                                             batch_skipped_count);
     576             : 
     577          18 :         elog(DEBUG1,
     578             :              "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped",
     579             :              MySubscription->name,
     580             :              (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1,
     581             :              batch_size, batch_succeeded_count, batch_mismatched_count,
     582             :              batch_insuffperm_count, batch_missing_count, batch_skipped_count);
     583             : 
     584             :         /* Commit this batch, and prepare for next batch */
     585          18 :         CommitTransactionCommand();
     586             : 
     587          18 :         if (batch_missing_count)
     588             :         {
     589           4 :             for (int idx = cur_batch_base_index; idx < cur_batch_base_index + batch_size; idx++)
     590             :             {
     591             :                 LogicalRepSequenceInfo *seqinfo =
     592           2 :                     (LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
     593             : 
     594             :                 /* If the sequence was not found on publisher, record it */
     595           2 :                 if (!seqinfo->found_on_pub)
     596           2 :                     missing_seqs_idx = lappend_int(missing_seqs_idx, idx);
     597             :             }
     598             :         }
     599             : 
     600             :         /*
     601             :          * cur_batch_base_index is not incremented sequentially because some
     602             :          * sequences may be missing, and the number of fetched rows may not
     603             :          * match the batch size.
     604             :          */
     605          18 :         cur_batch_base_index += batch_size;
     606             :     }
     607             : 
     608             :     /* Report mismatches, permission issues, or missing sequences */
     609          18 :     report_sequence_errors(mismatched_seqs_idx, insuffperm_seqs_idx,
     610             :                            missing_seqs_idx);
     611          10 : }
     612             : 
     613             : /*
     614             :  * Identifies sequences that require synchronization and initiates the
     615             :  * synchronization process.
                     :  *
                     :  * In a first transaction, pg_subscription_rel is scanned for entries that
                     :  * belong to this worker's subscription and are in SUBREL_STATE_INIT. Each
                     :  * entry whose relation can still be opened and is a sequence is recorded
                     :  * as a LogicalRepSequenceInfo (local OID, schema name, sequence name) and
                     :  * appended to the seqinfos list, allocated in ApplyContext.
                     :  *
                     :  * If seqinfos is empty afterwards, the function returns without
                     :  * contacting the publisher. Otherwise it connects to the publisher
                     :  * (raising an ERROR if the connection cannot be established) and hands
                     :  * the connection to copy_sequences().
     616             :  */
     617             : static void
     618          18 : LogicalRepSyncSequences(void)
     619             : {
     620             :     char       *err;
     621             :     bool        must_use_password;
     622             :     Relation    rel;
     623             :     HeapTuple   tup;
     624             :     ScanKeyData skey[2];
     625             :     SysScanDesc scan;
     626          18 :     Oid         subid = MyLogicalRepWorker->subid;
     627             :     StringInfoData app_name;
     628             : 
     629          18 :     StartTransactionCommand();
     630             : 
     631          18 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     632             : 
     633          18 :     ScanKeyInit(&skey[0],
     634             :                 Anum_pg_subscription_rel_srsubid,
     635             :                 BTEqualStrategyNumber, F_OIDEQ,
     636             :                 ObjectIdGetDatum(subid));
     637             : 
     638          18 :     ScanKeyInit(&skey[1],
     639             :                 Anum_pg_subscription_rel_srsubstate,
     640             :                 BTEqualStrategyNumber, F_CHAREQ,
     641             :                 CharGetDatum(SUBREL_STATE_INIT));
     642             : 
     643          18 :     scan = systable_beginscan(rel, InvalidOid, false,
     644             :                               NULL, 2, skey);
     645          44 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     646             :     {
     647             :         Form_pg_subscription_rel subrel;
     648             :         LogicalRepSequenceInfo *seq;
     649             :         Relation    sequence_rel;
     650             :         MemoryContext oldctx;
     651             : 
     652          26 :         CHECK_FOR_INTERRUPTS();
     653             : 
     654          26 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     655             : 
     656          26 :         sequence_rel = try_table_open(subrel->srrelid, RowExclusiveLock);
     657             : 
     658             :         /* Skip if sequence was dropped concurrently (the open returned NULL) */
     659          26 :         if (!sequence_rel)
     660           0 :             continue;
     661             : 
     662             :         /* Skip if the relation is not a sequence; it is closed with NoLock */
     663          26 :         if (sequence_rel->rd_rel->relkind != RELKIND_SEQUENCE)
     664             :         {
     665           0 :             table_close(sequence_rel, NoLock);
     666           0 :             continue;
     667             :         }
     668             : 
     669             :         /*
     670             :          * The collected entries are used after the CommitTransactionCommand()
     671             :          * below, i.e. across a transaction boundary, so allocate them (and
                     :          * the list cell that links them into seqinfos) in ApplyContext.
     672             :          */
     673          26 :         oldctx = MemoryContextSwitchTo(ApplyContext);
     674             : 
     675          26 :         seq = palloc0_object(LogicalRepSequenceInfo);
     676          26 :         seq->localrelid = subrel->srrelid;
     677          26 :         seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel));
     678          26 :         seq->seqname = pstrdup(RelationGetRelationName(sequence_rel));
     679          26 :         seqinfos = lappend(seqinfos, seq);
     680             : 
     681          26 :         MemoryContextSwitchTo(oldctx);
     682             : 
     683          26 :         table_close(sequence_rel, NoLock);
     684             :     }
     685             : 
     686             :     /* Cleanup: end the catalog scan and close pg_subscription_rel */
     687          18 :     systable_endscan(scan);
     688          18 :     table_close(rel, AccessShareLock);
     689             : 
     690          18 :     CommitTransactionCommand();
     691             : 
     692             :     /*
     693             :      * Exit early if no sequence was collected above, likely due to
                     :      * concurrent drops. No publisher connection is made in that case.
     694             :      */
     695          18 :     if (!seqinfos)
     696           0 :         return;
     697             : 
     698             :     /* Is the use of a password mandatory? (required and owner not superuser) */
     699          36 :     must_use_password = MySubscription->passwordrequired &&
     700          18 :         !MySubscription->ownersuperuser;
     701             : 
     702          18 :     initStringInfo(&app_name);
     703          18 :     appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT,
     704          18 :                      MySubscription->oid, GetSystemIdentifier());
     705             : 
     706             :     /*
     707             :      * Establish the connection to the publisher for sequence synchronization.
                     :      * The string built in app_name above is passed along; on failure
                     :      * walrcv_connect() returns NULL and the reason is reported via err.
     708             :      */
     709          18 :     LogRepWorkerWalRcvConn =
     710          18 :         walrcv_connect(MySubscription->conninfo, true, true,
     711             :                        must_use_password,
     712             :                        app_name.data, &err);
     713          18 :     if (LogRepWorkerWalRcvConn == NULL)
     714           0 :         ereport(ERROR,
     715             :                 errcode(ERRCODE_CONNECTION_FAILURE),
     716             :                 errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s",
     717             :                        MySubscription->name, err));
     718             : 
     719          18 :     pfree(app_name.data);
     720             : 
     721          18 :     copy_sequences(LogRepWorkerWalRcvConn);
     722             : }
     723             : 
     724             : /*
     725             :  * Execute the initial sync with error handling. Disable the subscription,
     726             :  * if required.
     727             :  *
                     :  * If LogicalRepSyncSequences() raises an error and the subscription has
                     :  * disableonerr set, DisableSubscriptionAndExit() is called. Otherwise the
                     :  * failure is reported through pgstat_report_subscription_error() and the
                     :  * error is re-thrown to the caller.
                     :  *
     728             :  * Note that FATAL errors are not handled here; they are probably caused by
     729             :  * system resource problems and are not repeatable.
     730             :  */
     731             : static void
     732          18 : start_sequence_sync(void)
     733             : {
     734             :     Assert(am_sequencesync_worker());
     735             : 
     736          18 :     PG_TRY();
     737             :     {
     738             :         /* Run the sequence synchronization; errors are caught below. */
     739          18 :         LogicalRepSyncSequences();
     740             :     }
     741           8 :     PG_CATCH();
     742             :     {
     743           8 :         if (MySubscription->disableonerr)
     744           0 :             DisableSubscriptionAndExit();
     745             :         else
     746             :         {
     747             :             /*
     748             :              * Report the worker failed during sequence synchronization. Abort
     749             :              * the current transaction so that the stats message is sent in an
     750             :              * idle state.
     751             :              */
     752           8 :             AbortOutOfAnyTransaction();
     753           8 :             pgstat_report_subscription_error(MySubscription->oid,
     754             :                                              WORKERTYPE_SEQUENCESYNC);
     755             : 
     756           8 :             PG_RE_THROW();
     757             :         }
     758             :     }
     759          10 :     PG_END_TRY();
     760          10 : }
     761             : 
     762             : /*
                     :  * Logical Replication sequencesync worker entry point.
                     :  *
                     :  * main_arg is decoded as an int32 worker slot number and passed to
                     :  * SetupApplyOrSyncWorker(). The worker then runs start_sequence_sync()
                     :  * and, if that returns without an error, calls FinishSyncWorker().
                     :  *
                     :  * NOTE(review): FinishSyncWorker() presumably does not return -- confirm.
                     :  */
     763             : void
     764          18 : SequenceSyncWorkerMain(Datum main_arg)
     765             : {
     766          18 :     int         worker_slot = DatumGetInt32(main_arg);
     767             : 
     768          18 :     SetupApplyOrSyncWorker(worker_slot);
     769             : 
     770          18 :     start_sequence_sync();
     771             : 
     772          10 :     FinishSyncWorker();
     773             : }

Generated by: LCOV version 1.16