LCOV - code coverage report
Current view: top level - src/backend/replication/logical - sequencesync.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 86.6 % 231 200
Test Date: 2026-03-14 14:14:39 Functions: 100.0 % 9 9
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  * sequencesync.c
       3              :  *    PostgreSQL logical replication: sequence synchronization
       4              :  *
       5              :  * Copyright (c) 2025-2026, PostgreSQL Global Development Group
       6              :  *
       7              :  * IDENTIFICATION
       8              :  *    src/backend/replication/logical/sequencesync.c
       9              :  *
      10              :  * NOTES
      11              :  *    This file contains code for sequence synchronization for
      12              :  *    logical replication.
      13              :  *
      14              :  * Sequences requiring synchronization are tracked in the pg_subscription_rel
      15              :  * catalog.
      16              :  *
      17              :  * Sequences to be synchronized will be added with state INIT when either of
      18              :  * the following commands is executed:
      19              :  * CREATE SUBSCRIPTION
      20              :  * ALTER SUBSCRIPTION ... REFRESH PUBLICATION
      21              :  *
      22              :  * Executing the following command resets all sequences in the subscription to
      23              :  * state INIT, triggering re-synchronization:
      24              :  * ALTER SUBSCRIPTION ... REFRESH SEQUENCES
      25              :  *
      26              :  * The apply worker periodically scans pg_subscription_rel for sequences in
      27              :  * INIT state. When such sequences are found, it spawns a sequencesync worker
      28              :  * to handle synchronization.
      29              :  *
      30              :  * A single sequencesync worker is responsible for synchronizing all sequences.
      31              :  * It begins by retrieving the list of sequences that are flagged for
      32              :  * synchronization, i.e., those in the INIT state. These sequences are then
      33              :  * processed in batches, allowing multiple entries to be synchronized within a
      34              :  * single transaction. The worker fetches the current sequence values and page
      35              :  * LSNs from the remote publisher, updates the corresponding sequences on the
      36              :  * local subscriber, and finally marks each sequence as READY upon successful
      37              :  * synchronization.
      38              :  *
      39              :  * Sequence state transitions follow this pattern:
      40              :  *   INIT -> READY
      41              :  *
      42              :  * To avoid creating too many transactions, up to MAX_SEQUENCES_SYNC_PER_BATCH
      43              :  * sequences are synchronized per transaction. The locks on the sequence
      44              :  * relation will be periodically released at each transaction commit.
      45              :  *
      46              :  * XXX: We didn't choose the launcher process to manage the launch of the
      47              :  * sequencesync worker, as it has no database connection with which to read
      48              :  * from pg_subscription_rel the sequences that need to be synchronized.
      49              :  *-------------------------------------------------------------------------
      50              :  */
      51              : 
      52              : #include "postgres.h"
      53              : 
      54              : #include "access/genam.h"
      55              : #include "access/table.h"
      56              : #include "catalog/pg_sequence.h"
      57              : #include "catalog/pg_subscription_rel.h"
      58              : #include "commands/sequence.h"
      59              : #include "pgstat.h"
      60              : #include "postmaster/interrupt.h"
      61              : #include "replication/logicalworker.h"
      62              : #include "replication/worker_internal.h"
      63              : #include "utils/acl.h"
      64              : #include "utils/builtins.h"
      65              : #include "utils/fmgroids.h"
      66              : #include "utils/guc.h"
      67              : #include "utils/inval.h"
      68              : #include "utils/lsyscache.h"
      69              : #include "utils/memutils.h"
      70              : #include "utils/pg_lsn.h"
      71              : #include "utils/syscache.h"
      72              : #include "utils/usercontext.h"
      73              : 
      74              : #define REMOTE_SEQ_COL_COUNT 10
      75              : 
      76              : typedef enum CopySeqResult
      77              : {
      78              :     COPYSEQ_SUCCESS,
      79              :     COPYSEQ_MISMATCH,
      80              :     COPYSEQ_INSUFFICIENT_PERM,
      81              :     COPYSEQ_SKIPPED
      82              : } CopySeqResult;
      83              : 
      84              : static List *seqinfos = NIL;
      85              : 
      86              : /*
      87              :  * Apply worker determines if sequence synchronization is needed.
      88              :  *
      89              :  * Start a sequencesync worker if one is not already running. The active
      90              :  * sequencesync worker will handle all pending sequence synchronization. If any
      91              :  * sequences remain unsynchronized after it exits, a new worker can be started
      92              :  * in the next iteration.
      93              :  */
      94              : void
      95         8713 : ProcessSequencesForSync(void)
      96              : {
      97              :     LogicalRepWorker *sequencesync_worker;
      98              :     int         nsyncworkers;
      99              :     bool        has_pending_sequences;
     100              :     bool        started_tx;
     101              : 
     102         8713 :     FetchRelationStates(NULL, &has_pending_sequences, &started_tx);
     103              : 
     104         8713 :     if (started_tx)
     105              :     {
     106          183 :         CommitTransactionCommand();
     107          183 :         pgstat_report_stat(true);
     108              :     }
     109              : 
     110         8713 :     if (!has_pending_sequences)
     111         8698 :         return;
     112              : 
     113           26 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
     114              : 
     115              :     /* Check whether a sequencesync worker is already running. */
     116           26 :     sequencesync_worker = logicalrep_worker_find(WORKERTYPE_SEQUENCESYNC,
     117           26 :                                                  MyLogicalRepWorker->subid,
     118              :                                                  InvalidOid, true);
     119           26 :     if (sequencesync_worker)
     120              :     {
     121           11 :         LWLockRelease(LogicalRepWorkerLock);
     122           11 :         return;
     123              :     }
     124              : 
     125              :     /*
     126              :      * Count running sync workers for this subscription, while we have the
     127              :      * lock.
     128              :      */
     129           15 :     nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
     130           15 :     LWLockRelease(LogicalRepWorkerLock);
     131              : 
     132              :     /*
     133              :      * It is okay to read/update last_seqsync_start_time here in apply worker
     134              :      * as we have already ensured that sync worker doesn't exist.
     135              :      */
     136           15 :     launch_sync_worker(WORKERTYPE_SEQUENCESYNC, nsyncworkers, InvalidOid,
     137           15 :                        &MyLogicalRepWorker->last_seqsync_start_time);
     138              : }
     139              : 
     140              : /*
     141              :  * get_sequences_string
     142              :  *
     143              :  * Build a comma-separated string of schema-qualified sequence names
     144              :  * for the given list of sequence indexes.
     145              :  */
     146              : static void
     147            4 : get_sequences_string(List *seqindexes, StringInfo buf)
     148              : {
     149            4 :     resetStringInfo(buf);
     150           12 :     foreach_int(seqidx, seqindexes)
     151              :     {
     152              :         LogicalRepSequenceInfo *seqinfo =
     153            4 :             (LogicalRepSequenceInfo *) list_nth(seqinfos, seqidx);
     154              : 
     155            4 :         if (buf->len > 0)
     156            0 :             appendStringInfoString(buf, ", ");
     157              : 
     158            4 :         appendStringInfo(buf, "\"%s.%s\"", seqinfo->nspname, seqinfo->seqname);
     159              :     }
     160            4 : }
     161              : 
     162              : /*
     163              :  * report_sequence_errors
     164              :  *
     165              :  * Report discrepancies found during sequence synchronization between
     166              :  * the publisher and subscriber. Emits warnings for:
     167              :  * a) mismatched definitions or concurrent rename
     168              :  * b) insufficient privileges
     169              :  * c) missing sequences on the publisher
     170              :  * Then raises an ERROR to indicate synchronization failure.
     171              :  */
     172              : static void
     173            9 : report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
     174              :                        List *missing_seqs_idx)
     175              : {
     176              :     StringInfo  seqstr;
     177              : 
     178              :     /* Quick exit if there are no errors to report */
     179            9 :     if (!mismatched_seqs_idx && !insuffperm_seqs_idx && !missing_seqs_idx)
     180            5 :         return;
     181              : 
     182            4 :     seqstr = makeStringInfo();
     183              : 
     184            4 :     if (mismatched_seqs_idx)
     185              :     {
     186            3 :         get_sequences_string(mismatched_seqs_idx, seqstr);
     187            3 :         ereport(WARNING,
     188              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     189              :                 errmsg_plural("mismatched or renamed sequence on subscriber (%s)",
     190              :                               "mismatched or renamed sequences on subscriber (%s)",
     191              :                               list_length(mismatched_seqs_idx),
     192              :                               seqstr->data));
     193              :     }
     194              : 
     195            4 :     if (insuffperm_seqs_idx)
     196              :     {
     197            0 :         get_sequences_string(insuffperm_seqs_idx, seqstr);
     198            0 :         ereport(WARNING,
     199              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     200              :                 errmsg_plural("insufficient privileges on sequence (%s)",
     201              :                               "insufficient privileges on sequences (%s)",
     202              :                               list_length(insuffperm_seqs_idx),
     203              :                               seqstr->data));
     204              :     }
     205              : 
     206            4 :     if (missing_seqs_idx)
     207              :     {
     208            1 :         get_sequences_string(missing_seqs_idx, seqstr);
     209            1 :         ereport(WARNING,
     210              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     211              :                 errmsg_plural("missing sequence on publisher (%s)",
     212              :                               "missing sequences on publisher (%s)",
     213              :                               list_length(missing_seqs_idx),
     214              :                               seqstr->data));
     215              :     }
     216              : 
     217            4 :     ereport(ERROR,
     218              :             errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     219              :             errmsg("logical replication sequence synchronization failed for subscription \"%s\"",
     220              :                    MySubscription->name));
     221              : }
     222              : 
     223              : /*
     224              :  * get_and_validate_seq_info
     225              :  *
     226              :  * Extracts remote sequence information from the tuple slot received from the
     227              :  * publisher, and validates it against the corresponding local sequence
     228              :  * definition.
     229              :  */
     230              : static CopySeqResult
     231           12 : get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel,
     232              :                           LogicalRepSequenceInfo **seqinfo, int *seqidx)
     233              : {
     234              :     bool        isnull;
     235           12 :     int         col = 0;
     236              :     Datum       datum;
     237              :     Oid         remote_typid;
     238              :     int64       remote_start;
     239              :     int64       remote_increment;
     240              :     int64       remote_min;
     241              :     int64       remote_max;
     242              :     bool        remote_cycle;
     243           12 :     CopySeqResult result = COPYSEQ_SUCCESS;
     244              :     HeapTuple   tup;
     245              :     Form_pg_sequence local_seq;
     246              :     LogicalRepSequenceInfo *seqinfo_local;
     247              : 
     248           12 :     *seqidx = DatumGetInt32(slot_getattr(slot, ++col, &isnull));
     249              :     Assert(!isnull);
     250              : 
     251              :     /* Identify the corresponding local sequence for the given index. */
     252           12 :     *seqinfo = seqinfo_local =
     253           12 :         (LogicalRepSequenceInfo *) list_nth(seqinfos, *seqidx);
     254              : 
     255              :     /*
     256              :      * last_value can be NULL if the sequence was dropped concurrently on
     257              :      * the publisher (see pg_get_sequence_data()).
     258              :      */
     259           12 :     datum = slot_getattr(slot, ++col, &isnull);
     260           12 :     if (isnull)
     261            0 :         return COPYSEQ_SKIPPED;
     262           12 :     seqinfo_local->last_value = DatumGetInt64(datum);
     263              : 
     264           12 :     seqinfo_local->is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull));
     265              :     Assert(!isnull);
     266              : 
     267           12 :     seqinfo_local->page_lsn = DatumGetLSN(slot_getattr(slot, ++col, &isnull));
     268              :     Assert(!isnull);
     269              : 
     270           12 :     remote_typid = DatumGetObjectId(slot_getattr(slot, ++col, &isnull));
     271              :     Assert(!isnull);
     272              : 
     273           12 :     remote_start = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
     274              :     Assert(!isnull);
     275              : 
     276           12 :     remote_increment = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
     277              :     Assert(!isnull);
     278              : 
     279           12 :     remote_min = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
     280              :     Assert(!isnull);
     281              : 
     282           12 :     remote_max = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
     283              :     Assert(!isnull);
     284              : 
     285           12 :     remote_cycle = DatumGetBool(slot_getattr(slot, ++col, &isnull));
     286              :     Assert(!isnull);
     287              : 
     288              :     /* Sanity check */
     289              :     Assert(col == REMOTE_SEQ_COL_COUNT);
     290              : 
     291           12 :     seqinfo_local->found_on_pub = true;
     292              : 
     293           12 :     *sequence_rel = try_table_open(seqinfo_local->localrelid, RowExclusiveLock);
     294              : 
     295              :     /* Sequence was concurrently dropped? */
     296           12 :     if (!*sequence_rel)
     297            0 :         return COPYSEQ_SKIPPED;
     298              : 
     299           12 :     tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seqinfo_local->localrelid));
     300              : 
     301              :     /* Sequence was concurrently dropped? */
     302           12 :     if (!HeapTupleIsValid(tup))
     303            0 :         elog(ERROR, "cache lookup failed for sequence %u",
     304              :              seqinfo_local->localrelid);
     305              : 
     306           12 :     local_seq = (Form_pg_sequence) GETSTRUCT(tup);
     307              : 
     308              :     /* Sequence parameters for remote/local are the same? */
     309           12 :     if (local_seq->seqtypid != remote_typid ||
     310           12 :         local_seq->seqstart != remote_start ||
     311           11 :         local_seq->seqincrement != remote_increment ||
     312            9 :         local_seq->seqmin != remote_min ||
     313            9 :         local_seq->seqmax != remote_max ||
     314            9 :         local_seq->seqcycle != remote_cycle)
     315            3 :         result = COPYSEQ_MISMATCH;
     316              : 
     317              :     /* Sequence was concurrently renamed? */
     318           12 :     if (strcmp(seqinfo_local->nspname,
     319           12 :                get_namespace_name(RelationGetNamespace(*sequence_rel))) ||
     320           12 :         strcmp(seqinfo_local->seqname, RelationGetRelationName(*sequence_rel)))
     321            0 :         result = COPYSEQ_MISMATCH;
     322              : 
     323           12 :     ReleaseSysCache(tup);
     324           12 :     return result;
     325              : }
     326              : 
     327              : /*
     328              :  * Apply remote sequence state to local sequence and mark it as
     329              :  * synchronized (READY).
     330              :  */
     331              : static CopySeqResult
     332            9 : copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner)
     333              : {
     334              :     UserContext ucxt;
     335              :     AclResult   aclresult;
     336            9 :     bool        run_as_owner = MySubscription->runasowner;
     337            9 :     Oid         seqoid = seqinfo->localrelid;
     338              : 
     339              :     /*
     340              :      * If the user did not opt to run as the owner of the subscription
     341              :      * ('run_as_owner'), then copy the sequence as the owner of the sequence.
     342              :      */
     343            9 :     if (!run_as_owner)
     344            9 :         SwitchToUntrustedUser(seqowner, &ucxt);
     345              : 
     346            9 :     aclresult = pg_class_aclcheck(seqoid, GetUserId(), ACL_UPDATE);
     347              : 
     348            9 :     if (aclresult != ACLCHECK_OK)
     349              :     {
     350            0 :         if (!run_as_owner)
     351            0 :             RestoreUserContext(&ucxt);
     352              : 
     353            0 :         return COPYSEQ_INSUFFICIENT_PERM;
     354              :     }
     355              : 
     356              :     /*
     357              :      * The log counter (log_cnt) tracks how many sequence values are still
     358              :      * unused locally. It is only relevant to the local node and managed
     359              :      * internally by nextval() when allocating new ranges. Since log_cnt does
     360              :      * not affect the visible sequence state (like last_value or is_called)
     361              :      * and is only used for local caching, it need not be copied to the
     362              :      * subscriber during synchronization.
     363              :      */
     364            9 :     SetSequence(seqoid, seqinfo->last_value, seqinfo->is_called);
     365              : 
     366            9 :     if (!run_as_owner)
     367            9 :         RestoreUserContext(&ucxt);
     368              : 
     369              :     /*
     370              :      * Record the remote sequence's LSN in pg_subscription_rel and mark the
     371              :      * sequence as READY.
     372              :      */
     373            9 :     UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY,
     374              :                                seqinfo->page_lsn, false);
     375              : 
     376            9 :     return COPYSEQ_SUCCESS;
     377              : }
     378              : 
     379              : /*
     380              :  * Copy existing data of sequences from the publisher.
     381              :  */
     382              : static void
     383            9 : copy_sequences(WalReceiverConn *conn)
     384              : {
     385            9 :     int         cur_batch_base_index = 0;
     386            9 :     int         n_seqinfos = list_length(seqinfos);
     387            9 :     List       *mismatched_seqs_idx = NIL;
     388            9 :     List       *missing_seqs_idx = NIL;
     389            9 :     List       *insuffperm_seqs_idx = NIL;
     390            9 :     StringInfo  seqstr = makeStringInfo();
     391            9 :     StringInfo  cmd = makeStringInfo();
     392              :     MemoryContext oldctx;
     393              : 
     394              : #define MAX_SEQUENCES_SYNC_PER_BATCH 100
     395              : 
     396            9 :     elog(DEBUG1,
     397              :          "logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d",
     398              :          MySubscription->name, n_seqinfos);
     399              : 
     400           18 :     while (cur_batch_base_index < n_seqinfos)
     401              :     {
     402            9 :         Oid         seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID,
     403              :         BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID};
     404            9 :         int         batch_size = 0;
     405            9 :         int         batch_succeeded_count = 0;
     406            9 :         int         batch_mismatched_count = 0;
     407            9 :         int         batch_skipped_count = 0;
     408            9 :         int         batch_insuffperm_count = 0;
     409              :         int         batch_missing_count;
     410            9 :         Relation    sequence_rel = NULL;
     411              : 
     412              :         WalRcvExecResult *res;
     413              :         TupleTableSlot *slot;
     414              : 
     415            9 :         StartTransactionCommand();
     416              : 
     417           22 :         for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++)
     418              :         {
     419              :             char       *nspname_literal;
     420              :             char       *seqname_literal;
     421              : 
     422              :             LogicalRepSequenceInfo *seqinfo =
     423           13 :                 (LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
     424              : 
     425           13 :             if (seqstr->len > 0)
     426            4 :                 appendStringInfoString(seqstr, ", ");
     427              : 
     428           13 :             nspname_literal = quote_literal_cstr(seqinfo->nspname);
     429           13 :             seqname_literal = quote_literal_cstr(seqinfo->seqname);
     430              : 
     431           13 :             appendStringInfo(seqstr, "(%s, %s, %d)",
     432              :                              nspname_literal, seqname_literal, idx);
     433              : 
     434           13 :             if (++batch_size == MAX_SEQUENCES_SYNC_PER_BATCH)
     435            0 :                 break;
     436              :         }
     437              : 
     438              :         /*
     439              :          * We deliberately avoid acquiring a local lock on the sequence before
     440              :          * querying the publisher to prevent potential distributed deadlocks
     441              :          * in bi-directional replication setups.
     442              :          *
     443              :          * Example scenario:
     444              :          *
     445              :          * - On each node, a background worker acquires a lock on a sequence
     446              :          * as part of a sync operation.
     447              :          *
     448              :          * - Concurrently, a user transaction attempts to alter the same
     449              :          * sequence, waiting on the background worker's lock.
     450              :          *
     451              :          * - Meanwhile, a query from the other node tries to access metadata
     452              :          * that depends on the completion of the alter operation.
     453              :          *
     454              :          * - This creates a circular wait across nodes:
     455              :          *
     456              :          * Node-1: Query -> waits on Alter -> waits on Sync Worker
     457              :          *
     458              :          * Node-2: Query -> waits on Alter -> waits on Sync Worker
     459              :          *
     460              :          * Since each node only sees part of the wait graph, the deadlock may
     461              :          * go undetected, leading to indefinite blocking.
     462              :          *
     463              :          * Note: Each entry in VALUES includes an index 'seqidx' that
     464              :          * represents the sequence's position in the local 'seqinfos' list.
     465              :          * This index is propagated to the query results and later used to
     466              :          * directly map the fetched publisher sequence rows back to their
     467              :          * corresponding local entries without relying on result order or name
     468              :          * matching.
     469              :          */
     470            9 :         appendStringInfo(cmd,
     471              :                          "SELECT s.seqidx, ps.*, seq.seqtypid,\n"
     472              :                          "       seq.seqstart, seq.seqincrement, seq.seqmin,\n"
     473              :                          "       seq.seqmax, seq.seqcycle\n"
     474              :                          "FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n"
     475              :                          "JOIN pg_namespace n ON n.nspname = s.schname\n"
     476              :                          "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n"
     477              :                          "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n"
     478              :                          "JOIN LATERAL pg_get_sequence_data(seq.seqrelid) AS ps ON true\n",
     479              :                          seqstr->data);
     480              : 
     481            9 :         res = walrcv_exec(conn, cmd->data, lengthof(seqRow), seqRow);
     482            9 :         if (res->status != WALRCV_OK_TUPLES)
     483            0 :             ereport(ERROR,
     484              :                     errcode(ERRCODE_CONNECTION_FAILURE),
     485              :                     errmsg("could not fetch sequence information from the publisher: %s",
     486              :                            res->err));
     487              : 
     488            9 :         slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     489           21 :         while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
     490              :         {
     491              :             CopySeqResult sync_status;
     492              :             LogicalRepSequenceInfo *seqinfo;
     493              :             int         seqidx;
     494              : 
     495           12 :             CHECK_FOR_INTERRUPTS();
     496              : 
     497           12 :             if (ConfigReloadPending)
     498              :             {
     499            0 :                 ConfigReloadPending = false;
     500            0 :                 ProcessConfigFile(PGC_SIGHUP);
     501              :             }
     502              : 
     503           12 :             sync_status = get_and_validate_seq_info(slot, &sequence_rel,
     504              :                                                     &seqinfo, &seqidx);
     505           12 :             if (sync_status == COPYSEQ_SUCCESS)
     506            9 :                 sync_status = copy_sequence(seqinfo,
     507            9 :                                             sequence_rel->rd_rel->relowner);
     508              : 
     509           12 :             switch (sync_status)
     510              :             {
     511            9 :                 case COPYSEQ_SUCCESS:
     512            9 :                     elog(DEBUG1,
     513              :                          "logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished",
     514              :                          MySubscription->name, seqinfo->nspname,
     515              :                          seqinfo->seqname);
     516            9 :                     batch_succeeded_count++;
     517            9 :                     break;
     518            3 :                 case COPYSEQ_MISMATCH:
     519              : 
     520              :                     /*
     521              :                      * Remember mismatched sequences in a long-lived memory
     522              :                      * context since these will be used after the transaction
     523              :                      * is committed.
     524              :                      */
     525            3 :                     oldctx = MemoryContextSwitchTo(ApplyContext);
     526            3 :                     mismatched_seqs_idx = lappend_int(mismatched_seqs_idx,
     527              :                                                       seqidx);
     528            3 :                     MemoryContextSwitchTo(oldctx);
     529            3 :                     batch_mismatched_count++;
     530            3 :                     break;
     531            0 :                 case COPYSEQ_INSUFFICIENT_PERM:
     532              : 
     533              :                     /*
     534              :                      * Remember sequences with insufficient privileges in a
     535              :                      * long-lived memory context since these will be used
     536              :                      * after the transaction is committed.
     537              :                      */
     538            0 :                     oldctx = MemoryContextSwitchTo(ApplyContext);
     539            0 :                     insuffperm_seqs_idx = lappend_int(insuffperm_seqs_idx,
     540              :                                                       seqidx);
     541            0 :                     MemoryContextSwitchTo(oldctx);
     542            0 :                     batch_insuffperm_count++;
     543            0 :                     break;
     544            0 :                 case COPYSEQ_SKIPPED:
     545              : 
     546              :                     /*
     547              :                      * Concurrent removal of a sequence on the subscriber is
     548              :                      * treated as success, since the only viable action is to
     549              :                      * skip the corresponding sequence data. Missing sequences
     550              :                      * on the publisher are treated as ERROR.
     551              :                      */
     552            0 :                     if (seqinfo->found_on_pub)
     553              :                     {
     554            0 :                         ereport(LOG,
     555              :                                 errmsg("skip synchronization of sequence \"%s.%s\" because it has been dropped concurrently",
     556              :                                        seqinfo->nspname,
     557              :                                        seqinfo->seqname));
     558            0 :                         batch_skipped_count++;
     559              :                     }
     560            0 :                     break;
     561              :             }
     562              : 
     563           12 :             if (sequence_rel)
     564           12 :                 table_close(sequence_rel, NoLock);
     565              :         }
     566              : 
     567            9 :         ExecDropSingleTupleTableSlot(slot);
     568            9 :         walrcv_clear_result(res);
     569            9 :         resetStringInfo(seqstr);
     570            9 :         resetStringInfo(cmd);
     571              : 
     572            9 :         batch_missing_count = batch_size - (batch_succeeded_count +
     573            9 :                                             batch_mismatched_count +
     574            9 :                                             batch_insuffperm_count +
     575              :                                             batch_skipped_count);
     576              : 
     577            9 :         elog(DEBUG1,
     578              :              "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped",
     579              :              MySubscription->name,
     580              :              (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1,
     581              :              batch_size, batch_succeeded_count, batch_mismatched_count,
     582              :              batch_insuffperm_count, batch_missing_count, batch_skipped_count);
     583              : 
     584              :         /* Commit this batch, and prepare for next batch */
     585            9 :         CommitTransactionCommand();
     586              : 
     587            9 :         if (batch_missing_count)
     588              :         {
     589            2 :             for (int idx = cur_batch_base_index; idx < cur_batch_base_index + batch_size; idx++)
     590              :             {
     591              :                 LogicalRepSequenceInfo *seqinfo =
     592            1 :                     (LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
     593              : 
     594              :                 /* If the sequence was not found on publisher, record it */
     595            1 :                 if (!seqinfo->found_on_pub)
     596            1 :                     missing_seqs_idx = lappend_int(missing_seqs_idx, idx);
     597              :             }
     598              :         }
     599              : 
     600              :         /*
     601              :          * cur_batch_base_index is not incremented sequentially because some
     602              :          * sequences may be missing, and the number of fetched rows may not
     603              :          * match the batch size.
     604              :          */
     605            9 :         cur_batch_base_index += batch_size;
     606              :     }
     607              : 
     608              :     /* Report mismatches, permission issues, or missing sequences */
     609            9 :     report_sequence_errors(mismatched_seqs_idx, insuffperm_seqs_idx,
     610              :                            missing_seqs_idx);
     611            5 : }
     612              : 
     613              : /*
     614              :  * Identifies sequences that require synchronization and initiates the
     615              :  * synchronization process.
                      :  *
                      :  * In a transaction of its own, pg_subscription_rel is scanned for rows
                      :  * that belong to this worker's subscription (MyLogicalRepWorker->subid)
                      :  * and whose srsubstate is SUBREL_STATE_INIT.  Rows whose relation can no
                      :  * longer be opened, or whose relation is not a sequence, are skipped.
                      :  * For every remaining row a LogicalRepSequenceInfo (local relation OID,
                      :  * schema name, sequence name) is appended to the seqinfos list; these
                      :  * entries are allocated in ApplyContext.
                      :  *
                      :  * Once that transaction is committed, the function returns immediately
                      :  * if seqinfos is empty.  Otherwise it connects to the publisher, raising
                      :  * an ERROR if the connection cannot be established, and hands the
                      :  * connection to copy_sequences().
     616              :  */
     617              : static void
     618            9 : LogicalRepSyncSequences(void)
     619              : {
     620              :     char       *err;
     621              :     bool        must_use_password;
     622              :     Relation    rel;
     623              :     HeapTuple   tup;
     624              :     ScanKeyData skey[2];
     625              :     SysScanDesc scan;
     626            9 :     Oid         subid = MyLogicalRepWorker->subid;
     627              :     StringInfoData app_name;
     628              : 
     629            9 :     StartTransactionCommand();
     630              : 
     631            9 :     rel = table_open(SubscriptionRelRelationId, AccessShareLock);
     632              : 
     633            9 :     ScanKeyInit(&skey[0],
     634              :                 Anum_pg_subscription_rel_srsubid,
     635              :                 BTEqualStrategyNumber, F_OIDEQ,
     636              :                 ObjectIdGetDatum(subid));
     637              : 
     638            9 :     ScanKeyInit(&skey[1],
     639              :                 Anum_pg_subscription_rel_srsubstate,
     640              :                 BTEqualStrategyNumber, F_CHAREQ,
     641              :                 CharGetDatum(SUBREL_STATE_INIT));
     642              : 
     643            9 :     scan = systable_beginscan(rel, InvalidOid, false,
     644              :                               NULL, 2, skey);
     645           22 :     while (HeapTupleIsValid(tup = systable_getnext(scan)))
     646              :     {
     647              :         Form_pg_subscription_rel subrel;
     648              :         LogicalRepSequenceInfo *seq;
     649              :         Relation    sequence_rel;
     650              :         MemoryContext oldctx;
     651              : 
     652           13 :         CHECK_FOR_INTERRUPTS();
     653              : 
     654           13 :         subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
     655              : 
     656           13 :         sequence_rel = try_table_open(subrel->srrelid, RowExclusiveLock);
     657              : 
     658              :         /* Skip if sequence was dropped concurrently (the open returned NULL) */
     659           13 :         if (!sequence_rel)
     660            0 :             continue;
     661              : 
     662              :         /* Skip if the relation is not a sequence */
     663           13 :         if (sequence_rel->rd_rel->relkind != RELKIND_SEQUENCE)
     664              :         {
     665            0 :             table_close(sequence_rel, NoLock);
     666            0 :             continue;
     667              :         }
     668              : 
     669              :         /*
     670              :          * Worker needs to process sequences across transaction boundary, so
     671              :          * allocate them under long-lived context.  This covers the list cell,
                      :          * the LogicalRepSequenceInfo itself and the two name strings, all of
                      :          * which are read again after the CommitTransactionCommand() below.
     672              :          */
     673           13 :         oldctx = MemoryContextSwitchTo(ApplyContext);
     674              : 
     675           13 :         seq = palloc0_object(LogicalRepSequenceInfo);
     676           13 :         seq->localrelid = subrel->srrelid;
     677           13 :         seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel));
     678           13 :         seq->seqname = pstrdup(RelationGetRelationName(sequence_rel));
     679           13 :         seqinfos = lappend(seqinfos, seq);
     680              : 
     681           13 :         MemoryContextSwitchTo(oldctx);
     682              : 
     683           13 :         table_close(sequence_rel, NoLock);
     684              :     }
     685              : 
     686              :     /* Cleanup: end the catalog scan and close pg_subscription_rel */
     687            9 :     systable_endscan(scan);
     688            9 :     table_close(rel, AccessShareLock);
     689              : 
     690            9 :     CommitTransactionCommand();
     691              : 
     692              :     /*
     693              :      * Exit early if no catalog entries found, likely due to concurrent drops.
                      :      * In that case no connection to the publisher is made.
     694              :      */
     695            9 :     if (!seqinfos)
     696            0 :         return;
     697              : 
     698              :     /* Is the use of a password mandatory? */
     699           18 :     must_use_password = MySubscription->passwordrequired &&
     700            9 :         !MySubscription->ownersuperuser;
     701              : 
     702            9 :     initStringInfo(&app_name);
     703            9 :     appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT,
     704            9 :                      MySubscription->oid, GetSystemIdentifier());
     705              : 
     706              :     /*
     707              :      * Establish the connection to the publisher for sequence synchronization.
                      :      * The name built above (subscription OID plus system identifier) is
                      :      * passed along with the subscription's conninfo.
     708              :      */
     709            9 :     LogRepWorkerWalRcvConn =
     710            9 :         walrcv_connect(MySubscription->conninfo, true, true,
     711              :                        must_use_password,
     712              :                        app_name.data, &err);
     713            9 :     if (LogRepWorkerWalRcvConn == NULL)
     714            0 :         ereport(ERROR,
     715              :                 errcode(ERRCODE_CONNECTION_FAILURE),
     716              :                 errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s",
     717              :                        MySubscription->name, err));
     718              : 
     719            9 :     pfree(app_name.data);
     720              : 
     721            9 :     copy_sequences(LogRepWorkerWalRcvConn);
     722              : }
     723              : 
     724              : /*
     725              :  * Execute the initial sync with error handling. Disable the subscription,
     726              :  * if required.
     727              :  *
                      :  * LogicalRepSyncSequences() is run inside PG_TRY.  If it raises an error
                      :  * and the subscription has disableonerr set, DisableSubscriptionAndExit()
                      :  * is called.  Otherwise any open transaction is aborted, the failure is
                      :  * reported via pgstat_report_subscription_error(), and the error is
                      :  * re-thrown to the caller.
                      :  *
                      :  * Must only be called from a sequencesync worker (asserted).
                      :  *
     728              :  * Note that we don't handle FATAL errors which are probably because of system
     729              :  * resource error and are not repeatable.
     730              :  */
     731              : static void
     732            9 : start_sequence_sync(void)
     733              : {
     734              :     Assert(am_sequencesync_worker());
     735              : 
     736            9 :     PG_TRY();
     737              :     {
     738              :         /* Call initial sync. */
     739            9 :         LogicalRepSyncSequences();
     740              :     }
     741            4 :     PG_CATCH();
     742              :     {
     743            4 :         if (MySubscription->disableonerr)
     744            0 :             DisableSubscriptionAndExit();
     745              :         else
     746              :         {
     747              :             /*
     748              :              * Report the worker failed during sequence synchronization. Abort
     749              :              * the current transaction so that the stats message is sent in an
     750              :              * idle state.
     751              :              */
     752            4 :             AbortOutOfAnyTransaction();
     753            4 :             pgstat_report_subscription_error(MySubscription->oid);
     754              : 
     755            4 :             PG_RE_THROW();
     756              :         }
     757              :     }
     758            5 :     PG_END_TRY();
     759            5 : }
     760              : 
     761              : /*
                      :  * Logical Replication sequencesync worker entry point
                      :  *
                      :  * main_arg is decoded as an int32 worker slot number and passed to
                      :  * SetupApplyOrSyncWorker().  The sequence synchronization is then run
                      :  * through start_sequence_sync(), and FinishSyncWorker() is called once
                      :  * that returns without raising an error.
                      :  */
     762              : void
     763            9 : SequenceSyncWorkerMain(Datum main_arg)
     764              : {
     765            9 :     int         worker_slot = DatumGetInt32(main_arg);
     766              : 
     767            9 :     SetupApplyOrSyncWorker(worker_slot);
     768              : 
     769            9 :     start_sequence_sync();
     770              : 
     771            5 :     FinishSyncWorker();
     772              : }
        

Generated by: LCOV version 2.0-1