LCOV - code coverage report
Current view: top level - src/backend/replication/logical - syncutils.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 68 72 94.4 %
Date: 2025-11-16 08:18:22 Functions: 5 5 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * syncutils.c
       3             :  *    PostgreSQL logical replication: common synchronization code
       4             :  *
       5             :  * Copyright (c) 2025, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/syncutils.c
       9             :  *
      10             :  * NOTES
      11             :  *    This file contains code common for synchronization workers.
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "catalog/pg_subscription_rel.h"
      18             : #include "pgstat.h"
      19             : #include "replication/logicallauncher.h"
      20             : #include "replication/worker_internal.h"
      21             : #include "storage/ipc.h"
      22             : #include "utils/lsyscache.h"
      23             : #include "utils/memutils.h"
      24             : 
      25             : /*
      26             :  * Enum for phases of the subscription relations state.
      27             :  *
      28             :  * SYNC_RELATIONS_STATE_NEEDS_REBUILD indicates that the subscription relations
      29             :  * state is no longer valid, and the subscription relations should be rebuilt.
      30             :  *
      31             :  * SYNC_RELATIONS_STATE_REBUILD_STARTED indicates that the subscription
      32             :  * relations state is being rebuilt.
      33             :  *
      34             :  * SYNC_RELATIONS_STATE_VALID indicates that the subscription relation state is
      35             :  * up-to-date and valid.
      36             :  */
      37             : typedef enum
      38             : {
      39             :     SYNC_RELATIONS_STATE_NEEDS_REBUILD,
      40             :     SYNC_RELATIONS_STATE_REBUILD_STARTED,
      41             :     SYNC_RELATIONS_STATE_VALID,
      42             : } SyncingRelationsState;
      43             : 
      44             : static SyncingRelationsState relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD;
      45             : 
      46             : /*
      47             :  * Exit routine for synchronization worker.
      48             :  */
      49             : pg_noreturn void
      50         378 : FinishSyncWorker(void)
      51             : {
      52             :     Assert(am_sequencesync_worker() || am_tablesync_worker());
      53             : 
      54             :     /*
      55             :      * Commit any outstanding transaction. This is the usual case, unless
      56             :      * there was nothing to do for the table.
      57             :      */
      58         378 :     if (IsTransactionState())
      59             :     {
      60         368 :         CommitTransactionCommand();
      61         368 :         pgstat_report_stat(true);
      62             :     }
      63             : 
      64             :     /* And flush all writes. */
      65         378 :     XLogFlush(GetXLogWriteRecPtr());
      66             : 
      67         378 :     if (am_sequencesync_worker())
      68             :     {
      69          10 :         ereport(LOG,
      70             :                 errmsg("logical replication sequence synchronization worker for subscription \"%s\" has finished",
      71             :                        MySubscription->name));
      72             : 
      73             :         /*
      74             :          * Reset last_seqsync_start_time, so that next time a sequencesync
      75             :          * worker is needed it can be started promptly.
      76             :          */
      77          10 :         logicalrep_reset_seqsync_start_time();
      78             :     }
      79             :     else
      80             :     {
      81         368 :         StartTransactionCommand();
      82         368 :         ereport(LOG,
      83             :                 errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
      84             :                        MySubscription->name,
      85             :                        get_rel_name(MyLogicalRepWorker->relid)));
      86         368 :         CommitTransactionCommand();
      87             : 
      88             :         /* Find the leader apply worker and signal it. */
      89         368 :         logicalrep_worker_wakeup(WORKERTYPE_APPLY, MyLogicalRepWorker->subid,
      90             :                                  InvalidOid);
      91             :     }
      92             : 
      93             :     /* Stop gracefully */
      94         378 :     proc_exit(0);
      95             : }
      96             : 
      97             : /*
      98             :  * Callback from syscache invalidation.
      99             :  */
     100             : void
     101        3504 : InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue)
     102             : {
     103        3504 :     relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD;
     104        3504 : }
     105             : 
     106             : /*
     107             :  * Attempt to launch a sync worker for one or more sequences or a table, if
     108             :  * a worker slot is available and the retry interval has elapsed.
     109             :  *
     110             :  * wtype: sync worker type.
     111             :  * nsyncworkers: Number of currently running sync workers for the subscription.
     112             :  * relid:  InvalidOid for sequencesync worker, actual relid for tablesync
     113             :  * worker.
     114             :  * last_start_time: Pointer to the last start time of the worker.
     115             :  */
     116             : void
     117        1598 : launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid,
     118             :                    TimestampTz *last_start_time)
     119             : {
     120             :     TimestampTz now;
     121             : 
     122             :     Assert((wtype == WORKERTYPE_TABLESYNC && OidIsValid(relid)) ||
     123             :            (wtype == WORKERTYPE_SEQUENCESYNC && !OidIsValid(relid)));
     124             : 
     125             :     /* If there is a free sync worker slot, start a new sync worker */
     126        1598 :     if (nsyncworkers >= max_sync_workers_per_subscription)
     127        1144 :         return;
     128             : 
     129         454 :     now = GetCurrentTimestamp();
     130             : 
     131         516 :     if (!(*last_start_time) ||
     132          62 :         TimestampDifferenceExceeds(*last_start_time, now,
     133             :                                    wal_retrieve_retry_interval))
     134             :     {
     135             :         /*
     136             :          * Set the last_start_time even if we fail to start the worker, so
     137             :          * that we won't retry until wal_retrieve_retry_interval has elapsed.
     138             :          */
     139         410 :         *last_start_time = now;
     140         410 :         (void) logicalrep_worker_launch(wtype,
     141         410 :                                         MyLogicalRepWorker->dbid,
     142         410 :                                         MySubscription->oid,
     143         410 :                                         MySubscription->name,
     144         410 :                                         MyLogicalRepWorker->userid,
     145             :                                         relid, DSM_HANDLE_INVALID, false);
     146             :     }
     147             : }
     148             : 
     149             : /*
     150             :  * Process possible state change(s) of relations that are being synchronized
     151             :  * and start new tablesync workers for the newly added tables. Also, start a
     152             :  * new sequencesync worker for the newly added sequences.
     153             :  */
     154             : void
     155       12458 : ProcessSyncingRelations(XLogRecPtr current_lsn)
     156             : {
     157       12458 :     switch (MyLogicalRepWorker->type)
     158             :     {
     159          44 :         case WORKERTYPE_PARALLEL_APPLY:
     160             : 
     161             :             /*
     162             :              * Skip for parallel apply workers because they only operate on
     163             :              * tables that are in a READY state. See pa_can_start() and
     164             :              * should_apply_changes_for_rel().
     165             :              */
     166          44 :             break;
     167             : 
     168         426 :         case WORKERTYPE_TABLESYNC:
     169         426 :             ProcessSyncingTablesForSync(current_lsn);
     170          58 :             break;
     171             : 
     172       11988 :         case WORKERTYPE_APPLY:
     173       11988 :             ProcessSyncingTablesForApply(current_lsn);
     174       11976 :             ProcessSequencesForSync();
     175       11976 :             break;
     176             : 
     177           0 :         case WORKERTYPE_SEQUENCESYNC:
     178             :             /* Should never happen. */
     179           0 :             elog(ERROR, "sequence synchronization worker is not expected to process relations");
     180             :             break;
     181             : 
     182           0 :         case WORKERTYPE_UNKNOWN:
     183             :             /* Should never happen. */
     184           0 :             elog(ERROR, "Unknown worker type");
     185             :     }
     186       12078 : }
     187             : 
     188             : /*
     189             :  * Common code to fetch the up-to-date sync state info for tables and sequences.
     190             :  *
     191             :  * The pg_subscription_rel catalog is shared by tables and sequences. Changes
     192             :  * to either sequences or tables can affect the validity of relation states, so
     193             :  * we identify non-READY tables and non-READY sequences together to ensure
     194             :  * consistency.
     195             :  *
     196             :  * has_pending_subtables: true if the subscription has one or more tables that
     197             :  * are not in READY state, otherwise false.
     198             :  * has_pending_subsequences: true if the subscription has one or more sequences
     199             :  * that are not in READY state, otherwise false.
     200             :  */
     201             : void
     202       24686 : FetchRelationStates(bool *has_pending_subtables,
     203             :                     bool *has_pending_subsequences,
     204             :                     bool *started_tx)
     205             : {
     206             :     /*
     207             :      * has_subtables and has_subsequences_non_ready are declared as static,
     208             :      * since the same value can be used until the system table is invalidated.
     209             :      */
     210             :     static bool has_subtables = false;
     211             :     static bool has_subsequences_non_ready = false;
     212             : 
     213       24686 :     *started_tx = false;
     214             : 
     215       24686 :     if (relation_states_validity != SYNC_RELATIONS_STATE_VALID)
     216             :     {
     217             :         MemoryContext oldctx;
     218             :         List       *rstates;
     219             :         SubscriptionRelState *rstate;
     220             : 
     221        1894 :         relation_states_validity = SYNC_RELATIONS_STATE_REBUILD_STARTED;
     222        1894 :         has_subsequences_non_ready = false;
     223             : 
     224             :         /* Clean the old lists. */
     225        1894 :         list_free_deep(table_states_not_ready);
     226        1894 :         table_states_not_ready = NIL;
     227             : 
     228        1894 :         if (!IsTransactionState())
     229             :         {
     230        1856 :             StartTransactionCommand();
     231        1856 :             *started_tx = true;
     232             :         }
     233             : 
     234             :         /* Fetch tables and sequences that are in non-READY state. */
     235        1894 :         rstates = GetSubscriptionRelations(MySubscription->oid, true, true,
     236             :                                            true);
     237             : 
     238             :         /* Allocate the tracking info in a permanent memory context. */
     239        1894 :         oldctx = MemoryContextSwitchTo(CacheMemoryContext);
     240        7176 :         foreach_ptr(SubscriptionRelState, subrel, rstates)
     241             :         {
     242        3388 :             if (get_rel_relkind(subrel->relid) == RELKIND_SEQUENCE)
     243          28 :                 has_subsequences_non_ready = true;
     244             :             else
     245             :             {
     246        3360 :                 rstate = palloc(sizeof(SubscriptionRelState));
     247        3360 :                 memcpy(rstate, subrel, sizeof(SubscriptionRelState));
     248        3360 :                 table_states_not_ready = lappend(table_states_not_ready,
     249             :                                                  rstate);
     250             :             }
     251             :         }
     252        1894 :         MemoryContextSwitchTo(oldctx);
     253             : 
     254             :         /*
     255             :          * Does the subscription have tables?
     256             :          *
     257             :          * If there were not-READY tables found then we know it does. But if
     258             :          * table_states_not_ready was empty we still need to check again to
     259             :          * see if there are 0 tables.
     260             :          */
     261        2420 :         has_subtables = (table_states_not_ready != NIL) ||
     262         526 :             HasSubscriptionTables(MySubscription->oid);
     263             : 
     264             :         /*
     265             :          * If the subscription relation cache has been invalidated since we
     266             :          * entered this routine, we still use and return the relations we just
     267             :          * finished constructing, to avoid infinite loops, but we leave the
     268             :          * table states marked as stale so that we'll rebuild it again on next
     269             :          * access. Otherwise, we mark the table states as valid.
     270             :          */
     271        1894 :         if (relation_states_validity == SYNC_RELATIONS_STATE_REBUILD_STARTED)
     272        1876 :             relation_states_validity = SYNC_RELATIONS_STATE_VALID;
     273             :     }
     274             : 
     275       24686 :     if (has_pending_subtables)
     276         722 :         *has_pending_subtables = has_subtables;
     277             : 
     278       24686 :     if (has_pending_subsequences)
     279       11976 :         *has_pending_subsequences = has_subsequences_non_ready;
     280       24686 : }

Generated by: LCOV version 1.16