LCOV - code coverage report
Current view: top level - src/backend/replication/logical - syncutils.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 94.4 % 72 68
Test Date: 2026-03-14 14:14:39 Functions: 100.0 % 5 5
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  * syncutils.c
       3              :  *    PostgreSQL logical replication: common synchronization code
       4              :  *
       5              :  * Copyright (c) 2025-2026, PostgreSQL Global Development Group
       6              :  *
       7              :  * IDENTIFICATION
       8              :  *    src/backend/replication/logical/syncutils.c
       9              :  *
      10              :  * NOTES
      11              :  *    This file contains code common for synchronization workers.
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : 
      15              : #include "postgres.h"
      16              : 
      17              : #include "catalog/pg_subscription_rel.h"
      18              : #include "pgstat.h"
      19              : #include "replication/logicallauncher.h"
      20              : #include "replication/worker_internal.h"
      21              : #include "storage/ipc.h"
      22              : #include "utils/lsyscache.h"
      23              : #include "utils/memutils.h"
      24              : 
      25              : /*
      26              :  * Enum for phases of the subscription relations state.
      27              :  *
      28              :  * SYNC_RELATIONS_STATE_NEEDS_REBUILD indicates that the subscription relations
      29              :  * state is no longer valid, and the subscription relations should be rebuilt.
      30              :  *
      31              :  * SYNC_RELATIONS_STATE_REBUILD_STARTED indicates that the subscription
      32              :  * relations state is being rebuilt.
      33              :  *
      34              :  * SYNC_RELATIONS_STATE_VALID indicates that the subscription relation state is
      35              :  * up-to-date and valid.
      36              :  */
      37              : typedef enum
      38              : {
      39              :     SYNC_RELATIONS_STATE_NEEDS_REBUILD,
      40              :     SYNC_RELATIONS_STATE_REBUILD_STARTED,
      41              :     SYNC_RELATIONS_STATE_VALID,
      42              : } SyncingRelationsState;
      43              : 
      44              : static SyncingRelationsState relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD;
      45              : 
      46              : /*
      47              :  * Exit routine for synchronization worker.
      48              :  */
      49              : pg_noreturn void
      50          198 : FinishSyncWorker(void)
      51              : {
      52              :     Assert(am_sequencesync_worker() || am_tablesync_worker());
      53              : 
      54              :     /*
      55              :      * Commit any outstanding transaction. This is the usual case, unless
      56              :      * there was nothing to do for the table.
      57              :      */
      58          198 :     if (IsTransactionState())
      59              :     {
      60          193 :         CommitTransactionCommand();
      61          193 :         pgstat_report_stat(true);
      62              :     }
      63              : 
      64              :     /* And flush all writes. */
      65          198 :     XLogFlush(GetXLogWriteRecPtr());
      66              : 
      67          198 :     if (am_sequencesync_worker())
      68              :     {
      69            5 :         ereport(LOG,
      70              :                 errmsg("logical replication sequence synchronization worker for subscription \"%s\" has finished",
      71              :                        MySubscription->name));
      72              : 
      73              :         /*
      74              :          * Reset last_seqsync_start_time, so that next time a sequencesync
      75              :          * worker is needed it can be started promptly.
      76              :          */
      77            5 :         logicalrep_reset_seqsync_start_time();
      78              :     }
      79              :     else
      80              :     {
      81          193 :         StartTransactionCommand();
      82          193 :         ereport(LOG,
      83              :                 errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
      84              :                        MySubscription->name,
      85              :                        get_rel_name(MyLogicalRepWorker->relid)));
      86          193 :         CommitTransactionCommand();
      87              : 
      88              :         /* Find the leader apply worker and signal it. */
      89          193 :         logicalrep_worker_wakeup(WORKERTYPE_APPLY, MyLogicalRepWorker->subid,
      90              :                                  InvalidOid);
      91              :     }
      92              : 
      93              :     /* Stop gracefully */
      94          198 :     proc_exit(0);
      95              : }
      96              : 
      97              : /*
      98              :  * Callback from syscache invalidation.
      99              :  */
     100              : void
     101         2037 : InvalidateSyncingRelStates(Datum arg, SysCacheIdentifier cacheid,
     102              :                            uint32 hashvalue)
     103              : {
     104         2037 :     relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD;
     105         2037 : }
     106              : 
     107              : /*
     108              :  * Attempt to launch a sync worker for one or more sequences or a table, if
     109              :  * a worker slot is available and the retry interval has elapsed.
     110              :  *
     111              :  * wtype: sync worker type.
     112              :  * nsyncworkers: Number of currently running sync workers for the subscription.
     113              :  * relid:  InvalidOid for sequencesync worker, actual relid for tablesync
     114              :  * worker.
     115              :  * last_start_time: Pointer to the last start time of the worker.
     116              :  */
     117              : void
     118          906 : launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid,
     119              :                    TimestampTz *last_start_time)
     120              : {
     121              :     TimestampTz now;
     122              : 
     123              :     Assert((wtype == WORKERTYPE_TABLESYNC && OidIsValid(relid)) ||
     124              :            (wtype == WORKERTYPE_SEQUENCESYNC && !OidIsValid(relid)));
     125              : 
     126              :     /* If there is a free sync worker slot, start a new sync worker */
     127          906 :     if (nsyncworkers >= max_sync_workers_per_subscription)
     128          664 :         return;
     129              : 
     130          242 :     now = GetCurrentTimestamp();
     131              : 
     132          278 :     if (!(*last_start_time) ||
     133           36 :         TimestampDifferenceExceeds(*last_start_time, now,
     134              :                                    wal_retrieve_retry_interval))
     135              :     {
     136              :         /*
     137              :          * Set the last_start_time even if we fail to start the worker, so
     138              :          * that we won't retry until wal_retrieve_retry_interval has elapsed.
     139              :          */
     140          218 :         *last_start_time = now;
     141          218 :         (void) logicalrep_worker_launch(wtype,
     142          218 :                                         MyLogicalRepWorker->dbid,
     143          218 :                                         MySubscription->oid,
     144          218 :                                         MySubscription->name,
     145          218 :                                         MyLogicalRepWorker->userid,
     146              :                                         relid, DSM_HANDLE_INVALID, false);
     147              :     }
     148              : }
     149              : 
     150              : /*
     151              :  * Process possible state change(s) of relations that are being synchronized
     152              :  * and start new tablesync workers for the newly added tables. Also, start a
     153              :  * new sequencesync worker for the newly added sequences.
     154              :  */
     155              : void
     156         8969 : ProcessSyncingRelations(XLogRecPtr current_lsn)
     157              : {
     158         8969 :     switch (MyLogicalRepWorker->type)
     159              :     {
     160           23 :         case WORKERTYPE_PARALLEL_APPLY:
     161              : 
     162              :             /*
     163              :              * Skip for parallel apply workers because they only operate on
     164              :              * tables that are in a READY state. See pa_can_start() and
     165              :              * should_apply_changes_for_rel().
     166              :              */
     167           23 :             break;
     168              : 
     169          227 :         case WORKERTYPE_TABLESYNC:
     170          227 :             ProcessSyncingTablesForSync(current_lsn);
     171           34 :             break;
     172              : 
     173         8719 :         case WORKERTYPE_APPLY:
     174         8719 :             ProcessSyncingTablesForApply(current_lsn);
     175         8713 :             ProcessSequencesForSync();
     176         8713 :             break;
     177              : 
     178            0 :         case WORKERTYPE_SEQUENCESYNC:
     179              :             /* Should never happen. */
     180            0 :             elog(ERROR, "sequence synchronization worker is not expected to process relations");
     181              :             break;
     182              : 
     183            0 :         case WORKERTYPE_UNKNOWN:
     184              :             /* Should never happen. */
     185            0 :             elog(ERROR, "Unknown worker type");
     186              :     }
     187         8770 : }
     188              : 
     189              : /*
     190              :  * Common code to fetch the up-to-date sync state info for tables and sequences.
     191              :  *
     192              :  * The pg_subscription_rel catalog is shared by tables and sequences. Changes
     193              :  * to either sequences or tables can affect the validity of relation states, so
     194              :  * we identify non-READY tables and non-READY sequences together to ensure
     195              :  * consistency.
     196              :  *
     197              :  * has_pending_subtables: true if the subscription has one or more tables that
     198              :  * are not in READY state, otherwise false.
     199              :  * has_pending_subsequences: true if the subscription has one or more sequences
     200              :  * that are not in READY state, otherwise false.
     201              :  */
     202              : void
     203        17771 : FetchRelationStates(bool *has_pending_subtables,
     204              :                     bool *has_pending_subsequences,
     205              :                     bool *started_tx)
     206              : {
     207              :     /*
     208              :      * has_subtables and has_subsequences_non_ready are declared as static,
     209              :      * since the same value can be used until the system table is invalidated.
     210              :      */
     211              :     static bool has_subtables = false;
     212              :     static bool has_subsequences_non_ready = false;
     213              : 
     214        17771 :     *started_tx = false;
     215              : 
     216        17771 :     if (relation_states_validity != SYNC_RELATIONS_STATE_VALID)
     217              :     {
     218              :         MemoryContext oldctx;
     219              :         List       *rstates;
     220              :         SubscriptionRelState *rstate;
     221              : 
     222         1014 :         relation_states_validity = SYNC_RELATIONS_STATE_REBUILD_STARTED;
     223         1014 :         has_subsequences_non_ready = false;
     224              : 
     225              :         /* Clean the old lists. */
     226         1014 :         list_free_deep(table_states_not_ready);
     227         1014 :         table_states_not_ready = NIL;
     228              : 
     229         1014 :         if (!IsTransactionState())
     230              :         {
     231          997 :             StartTransactionCommand();
     232          997 :             *started_tx = true;
     233              :         }
     234              : 
     235              :         /* Fetch tables and sequences that are in non-READY state. */
     236         1014 :         rstates = GetSubscriptionRelations(MySubscription->oid, true, true,
     237              :                                            true);
     238              : 
     239              :         /* Allocate the tracking info in a permanent memory context. */
     240         1014 :         oldctx = MemoryContextSwitchTo(CacheMemoryContext);
     241         3809 :         foreach_ptr(SubscriptionRelState, subrel, rstates)
     242              :         {
     243         1781 :             if (get_rel_relkind(subrel->relid) == RELKIND_SEQUENCE)
     244           16 :                 has_subsequences_non_ready = true;
     245              :             else
     246              :             {
     247         1765 :                 rstate = palloc_object(SubscriptionRelState);
     248         1765 :                 memcpy(rstate, subrel, sizeof(SubscriptionRelState));
     249         1765 :                 table_states_not_ready = lappend(table_states_not_ready,
     250              :                                                  rstate);
     251              :             }
     252              :         }
     253         1014 :         MemoryContextSwitchTo(oldctx);
     254              : 
     255              :         /*
     256              :          * Does the subscription have tables?
     257              :          *
     258              :          * If there were not-READY tables found then we know it does. But if
     259              :          * table_states_not_ready was empty we still need to check again to
     260              :          * see if there are 0 tables.
     261              :          */
     262         1309 :         has_subtables = (table_states_not_ready != NIL) ||
     263          295 :             HasSubscriptionTables(MySubscription->oid);
     264              : 
     265              :         /*
     266              :          * If the subscription relation cache has been invalidated since we
     267              :          * entered this routine, we still use and return the relations we just
     268              :          * finished constructing, to avoid infinite loops, but we leave the
     269              :          * table states marked as stale so that we'll rebuild it again on next
     270              :          * access. Otherwise, we mark the table states as valid.
     271              :          */
     272         1014 :         if (relation_states_validity == SYNC_RELATIONS_STATE_REBUILD_STARTED)
     273         1005 :             relation_states_validity = SYNC_RELATIONS_STATE_VALID;
     274              :     }
     275              : 
     276        17771 :     if (has_pending_subtables)
     277          339 :         *has_pending_subtables = has_subtables;
     278              : 
     279        17771 :     if (has_pending_subsequences)
     280         8713 :         *has_pending_subsequences = has_subsequences_non_ready;
     281        17771 : }
        

Generated by: LCOV version 2.0-1