LCOV - code coverage report
Current view: top level - src/backend/replication/logical - syncutils.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 45 47 95.7 %
Date: 2025-10-21 14:18:27 Functions: 4 4 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * syncutils.c
       3             :  *    PostgreSQL logical replication: common synchronization code
       4             :  *
       5             :  * Copyright (c) 2025, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/syncutils.c
       9             :  *
      10             :  * NOTES
      11             :  *    This file contains code common for synchronization workers.
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "catalog/pg_subscription_rel.h"
      18             : #include "pgstat.h"
      19             : #include "replication/worker_internal.h"
      20             : #include "storage/ipc.h"
      21             : #include "utils/lsyscache.h"
      22             : #include "utils/memutils.h"
      23             : 
      24             : /*
      25             :  * Enum for phases of the subscription relations state.
      26             :  *
      27             :  * SYNC_RELATIONS_STATE_NEEDS_REBUILD indicates that the subscription relations
      28             :  * state is no longer valid, and the subscription relations should be rebuilt.
      29             :  *
      30             :  * SYNC_RELATIONS_STATE_REBUILD_STARTED indicates that the subscription
      31             :  * relations state is being rebuilt.
      32             :  *
      33             :  * SYNC_RELATIONS_STATE_VALID indicates that the subscription relation state is
      34             :  * up-to-date and valid.
                     :  *
                     :  * The current phase is kept in the file-level variable
                     :  * relation_states_validity, which starts out as
                     :  * SYNC_RELATIONS_STATE_NEEDS_REBUILD.  InvalidateSyncingRelStates() resets
                     :  * it to that value, while FetchRelationStates() moves it to
                     :  * SYNC_RELATIONS_STATE_REBUILD_STARTED and then, provided it was not reset
                     :  * in the meantime, to SYNC_RELATIONS_STATE_VALID.
      35             :  */
      36             : typedef enum
      37             : {
      38             :     SYNC_RELATIONS_STATE_NEEDS_REBUILD,
      39             :     SYNC_RELATIONS_STATE_REBUILD_STARTED,
      40             :     SYNC_RELATIONS_STATE_VALID,
      41             : } SyncingRelationsState;
      42             : 
      43             : static SyncingRelationsState relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD;
      44             : 
      45             : /*
      46             :  * Exit routine for synchronization worker.
                     :  *
                     :  * In order, this routine:
                     :  *   - commits the current transaction, if one is open, and reports
                     :  *     statistics;
                     :  *   - flushes up to the position returned by GetXLogWriteRecPtr();
                     :  *   - logs a "has finished" message naming the subscription and the table
                     :  *     identified by MyLogicalRepWorker->relid;
                     :  *   - signals the leader apply worker, looked up by subscription OID with
                     :  *     InvalidOid as the relation;
                     :  *   - calls proc_exit(0).
                     :  *
                     :  * The log message is emitted inside its own short transaction, presumably
                     :  * because get_rel_name() needs one for its lookup -- TODO confirm.
                     :  *
                     :  * This function does not return.
      47             :  */
      48             : pg_noreturn void
      49         366 : FinishSyncWorker(void)
      50             : {
      51             :     /*
      52             :      * Commit any outstanding transaction. This is the usual case, unless
      53             :      * there was nothing to do for the table.
      54             :      */
      55         366 :     if (IsTransactionState())
      56             :     {
      57         366 :         CommitTransactionCommand();
      58         366 :         pgstat_report_stat(true);
      59             :     }
      60             : 
      61             :     /* And flush all writes. */
      62         366 :     XLogFlush(GetXLogWriteRecPtr());
      63             : 
      64         366 :     StartTransactionCommand();
      65         366 :     ereport(LOG,
      66             :             (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
      67             :                     MySubscription->name,
      68             :                     get_rel_name(MyLogicalRepWorker->relid))));
      69         366 :     CommitTransactionCommand();
      70             : 
      71             :     /* Find the leader apply worker and signal it. */
      72         366 :     logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
      73             : 
      74             :     /* Stop gracefully */
      75         366 :     proc_exit(0);
      76             : }
      77             : 
      78             : /*
      79             :  * Callback from syscache invalidation.
                     :  *
                     :  * Unconditionally resets relation_states_validity to
                     :  * SYNC_RELATIONS_STATE_NEEDS_REBUILD, so that the next
                     :  * FetchRelationStates() call re-reads the relation states.  None of the
                     :  * arguments (arg, cacheid, hashvalue) are used.
      80             :  */
      81             : void
      82        3406 : InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue)
      83             : {
      84        3406 :     relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD;
      85        3406 : }
      86             : 
      87             : /*
      88             :  * Process possible state change(s) of relations that are being synchronized.
                     :  *
                     :  * Dispatches on MyLogicalRepWorker->type:
                     :  *   - WORKERTYPE_PARALLEL_APPLY: nothing to do;
                     :  *   - WORKERTYPE_TABLESYNC: calls ProcessSyncingTablesForSync();
                     :  *   - WORKERTYPE_APPLY: calls ProcessSyncingTablesForApply();
                     :  *   - WORKERTYPE_UNKNOWN: raises an ERROR.
                     :  *
                     :  * current_lsn is passed through unchanged to the per-worker-type routine.
      89             :  */
      90             : void
      91       11334 : ProcessSyncingRelations(XLogRecPtr current_lsn)
      92             : {
      93       11334 :     switch (MyLogicalRepWorker->type)
      94             :     {
      95          44 :         case WORKERTYPE_PARALLEL_APPLY:
      96             : 
      97             :             /*
      98             :              * Skip for parallel apply workers because they only operate on
      99             :              * tables that are in a READY state. See pa_can_start() and
     100             :              * should_apply_changes_for_rel().
     101             :              */
     102          44 :             break;
     103             : 
     104         408 :         case WORKERTYPE_TABLESYNC:
     105         408 :             ProcessSyncingTablesForSync(current_lsn);
     106          42 :             break;
     107             : 
     108       10882 :         case WORKERTYPE_APPLY:
     109       10882 :             ProcessSyncingTablesForApply(current_lsn);
     110       10868 :             break;
     111             : 
     112           0 :         case WORKERTYPE_UNKNOWN:
     113             :             /* Should never happen. */
     114           0 :             elog(ERROR, "Unknown worker type");
     115             :     }
     116       10954 : }
     117             : 
     118             : /*
     119             :  * Common code to fetch the up-to-date sync state info into the static lists.
     120             :  *
     121             :  * Returns true if subscription has 1 or more tables, else false.
     122             :  *
     123             :  * Note: If this function started the transaction (indicated by the parameter)
     124             :  * then it is the caller's responsibility to commit it.
                     :  *
                     :  * started_tx is an output parameter: the pointed-to flag is set to false on
                     :  * entry and to true only when StartTransactionCommand() was called here.
                     :  *
                     :  * The rebuild is skipped while relation_states_validity is
                     :  * SYNC_RELATIONS_STATE_VALID; in that case the result computed by the most
                     :  * recent rebuild (kept in the static has_subtables) is returned and no
                     :  * transaction is started.
     125             :  */
     126             : bool
     127       11528 : FetchRelationStates(bool *started_tx)
     128             : {
     129             :     static bool has_subtables = false;
     130             : 
     131       11528 :     *started_tx = false;
     132             : 
     133       11528 :     if (relation_states_validity != SYNC_RELATIONS_STATE_VALID)
     134             :     {
     135             :         MemoryContext oldctx;
     136             :         List       *rstates;
     137             :         ListCell   *lc;
     138             :         SubscriptionRelState *rstate;
     139             : 
     140        1764 :         relation_states_validity = SYNC_RELATIONS_STATE_REBUILD_STARTED;
     141             : 
     142             :         /* Clean the old lists. */
     143        1764 :         list_free_deep(table_states_not_ready);
     144        1764 :         table_states_not_ready = NIL;
     145             : 
     146        1764 :         if (!IsTransactionState())
     147             :         {
     148        1730 :             StartTransactionCommand();
     149        1730 :             *started_tx = true;
     150             :         }
     151             : 
     152             :         /* Fetch tables and sequences that are in non-ready state. */
     153        1764 :         rstates = GetSubscriptionRelations(MySubscription->oid, true);
     154             : 
     155             :         /*
                     :          * Allocate the tracking info in a permanent memory context: each
                     :          * entry of rstates is copied into CacheMemoryContext before being
                     :          * appended to table_states_not_ready.
                     :          */
     156        1764 :         oldctx = MemoryContextSwitchTo(CacheMemoryContext);
     157        4558 :         foreach(lc, rstates)
     158             :         {
     159        2794 :             rstate = palloc(sizeof(SubscriptionRelState));
     160        2794 :             memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
     161        2794 :             table_states_not_ready = lappend(table_states_not_ready, rstate);
     162             :         }
     163        1764 :         MemoryContextSwitchTo(oldctx);
     164             : 
     165             :         /*
     166             :          * Does the subscription have tables?
     167             :          *
     168             :          * If there were not-READY tables found then we know it does. But if
     169             :          * table_states_not_ready was empty we still need to check again to
     170             :          * see if there are 0 tables.
     171             :          */
     172        2284 :         has_subtables = (table_states_not_ready != NIL) ||
     173         520 :             HasSubscriptionTables(MySubscription->oid);
     174             : 
     175             :         /*
     176             :          * If the subscription relation cache has been invalidated since we
     177             :          * entered this routine, we still use and return the relations we just
     178             :          * finished constructing, to avoid infinite loops, but we leave the
     179             :          * table states marked as stale so that we'll rebuild it again on next
     180             :          * access. Otherwise, we mark the table states as valid.
     181             :          */
     182        1764 :         if (relation_states_validity == SYNC_RELATIONS_STATE_REBUILD_STARTED)
     183        1764 :             relation_states_validity = SYNC_RELATIONS_STATE_VALID;
     184             :     }
     185             : 
     186       11528 :     return has_subtables;
     187             : }

Generated by: LCOV version 1.16