Line data Source code
1 : /*------------------------------------------------------------------------- 2 : * syncutils.c 3 : * PostgreSQL logical replication: common synchronization code 4 : * 5 : * Copyright (c) 2025, PostgreSQL Global Development Group 6 : * 7 : * IDENTIFICATION 8 : * src/backend/replication/logical/syncutils.c 9 : * 10 : * NOTES 11 : * This file contains code common for synchronization workers. 12 : *------------------------------------------------------------------------- 13 : */ 14 : 15 : #include "postgres.h" 16 : 17 : #include "catalog/pg_subscription_rel.h" 18 : #include "pgstat.h" 19 : #include "replication/worker_internal.h" 20 : #include "storage/ipc.h" 21 : #include "utils/lsyscache.h" 22 : #include "utils/memutils.h" 23 : 24 : /* 25 : * Enum for phases of the subscription relations state. 26 : * 27 : * SYNC_RELATIONS_STATE_NEEDS_REBUILD indicates that the subscription relations 28 : * state is no longer valid, and the subscription relations should be rebuilt. 29 : * 30 : * SYNC_RELATIONS_STATE_REBUILD_STARTED indicates that the subscription 31 : * relations state is being rebuilt. 32 : * 33 : * SYNC_RELATIONS_STATE_VALID indicates that the subscription relation state is 34 : * up-to-date and valid. 35 : */ 36 : typedef enum 37 : { 38 : SYNC_RELATIONS_STATE_NEEDS_REBUILD, 39 : SYNC_RELATIONS_STATE_REBUILD_STARTED, 40 : SYNC_RELATIONS_STATE_VALID, 41 : } SyncingRelationsState; 42 : 43 : static SyncingRelationsState relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD; 44 : 45 : /* 46 : * Exit routine for synchronization worker. 47 : */ 48 : pg_noreturn void 49 366 : FinishSyncWorker(void) 50 : { 51 : /* 52 : * Commit any outstanding transaction. This is the usual case, unless 53 : * there was nothing to do for the table. 54 : */ 55 366 : if (IsTransactionState()) 56 : { 57 366 : CommitTransactionCommand(); 58 366 : pgstat_report_stat(true); 59 : } 60 : 61 : /* And flush all writes. */ 62 366 : XLogFlush(GetXLogWriteRecPtr()); 63 : 64 366 : StartTransactionCommand(); 65 366 : ereport(LOG, 66 : (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", 67 : MySubscription->name, 68 : get_rel_name(MyLogicalRepWorker->relid)))); 69 366 : CommitTransactionCommand(); 70 : 71 : /* Find the leader apply worker and signal it. */ 72 366 : logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); 73 : 74 : /* Stop gracefully */ 75 366 : proc_exit(0); 76 : } 77 : 78 : /* 79 : * Callback from syscache invalidation. 80 : */ 81 : void 82 3406 : InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue) 83 : { 84 3406 : relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD; 85 3406 : } 86 : 87 : /* 88 : * Process possible state change(s) of relations that are being synchronized. 89 : */ 90 : void 91 11334 : ProcessSyncingRelations(XLogRecPtr current_lsn) 92 : { 93 11334 : switch (MyLogicalRepWorker->type) 94 : { 95 44 : case WORKERTYPE_PARALLEL_APPLY: 96 : 97 : /* 98 : * Skip for parallel apply workers because they only operate on 99 : * tables that are in a READY state. See pa_can_start() and 100 : * should_apply_changes_for_rel(). 101 : */ 102 44 : break; 103 : 104 408 : case WORKERTYPE_TABLESYNC: 105 408 : ProcessSyncingTablesForSync(current_lsn); 106 42 : break; 107 : 108 10882 : case WORKERTYPE_APPLY: 109 10882 : ProcessSyncingTablesForApply(current_lsn); 110 10868 : break; 111 : 112 0 : case WORKERTYPE_UNKNOWN: 113 : /* Should never happen. */ 114 0 : elog(ERROR, "Unknown worker type"); 115 : } 116 10954 : } 117 : 118 : /* 119 : * Common code to fetch the up-to-date sync state info into the static lists. 120 : * 121 : * Returns true if subscription has 1 or more tables, else false. 122 : * 123 : * Note: If this function started the transaction (indicated by the parameter) 124 : * then it is the caller's responsibility to commit it. 125 : */ 126 : bool 127 11528 : FetchRelationStates(bool *started_tx) 128 : { 129 : static bool has_subtables = false; 130 : 131 11528 : *started_tx = false; 132 : 133 11528 : if (relation_states_validity != SYNC_RELATIONS_STATE_VALID) 134 : { 135 : MemoryContext oldctx; 136 : List *rstates; 137 : ListCell *lc; 138 : SubscriptionRelState *rstate; 139 : 140 1764 : relation_states_validity = SYNC_RELATIONS_STATE_REBUILD_STARTED; 141 : 142 : /* Clean the old lists. */ 143 1764 : list_free_deep(table_states_not_ready); 144 1764 : table_states_not_ready = NIL; 145 : 146 1764 : if (!IsTransactionState()) 147 : { 148 1730 : StartTransactionCommand(); 149 1730 : *started_tx = true; 150 : } 151 : 152 : /* Fetch tables and sequences that are in non-ready state. */ 153 1764 : rstates = GetSubscriptionRelations(MySubscription->oid, true); 154 : 155 : /* Allocate the tracking info in a permanent memory context. */ 156 1764 : oldctx = MemoryContextSwitchTo(CacheMemoryContext); 157 4558 : foreach(lc, rstates) 158 : { 159 2794 : rstate = palloc(sizeof(SubscriptionRelState)); 160 2794 : memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); 161 2794 : table_states_not_ready = lappend(table_states_not_ready, rstate); 162 : } 163 1764 : MemoryContextSwitchTo(oldctx); 164 : 165 : /* 166 : * Does the subscription have tables? 167 : * 168 : * If there were not-READY tables found then we know it does. But if 169 : * table_states_not_ready was empty we still need to check again to 170 : * see if there are 0 tables. 171 : */ 172 2284 : has_subtables = (table_states_not_ready != NIL) || 173 520 : HasSubscriptionTables(MySubscription->oid); 174 : 175 : /* 176 : * If the subscription relation cache has been invalidated since we 177 : * entered this routine, we still use and return the relations we just 178 : * finished constructing, to avoid infinite loops, but we leave the 179 : * table states marked as stale so that we'll rebuild it again on next 180 : * access. Otherwise, we mark the table states as valid. 181 : */ 182 1764 : if (relation_states_validity == SYNC_RELATIONS_STATE_REBUILD_STARTED) 183 1764 : relation_states_validity = SYNC_RELATIONS_STATE_VALID; 184 : } 185 : 186 11528 : return has_subtables; 187 : }