Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * syncutils.c
3 : * PostgreSQL logical replication: common synchronization code
4 : *
5 : * Copyright (c) 2025-2026, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/syncutils.c
9 : *
10 : * NOTES
11 : * This file contains code common for synchronization workers.
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "catalog/pg_subscription_rel.h"
18 : #include "pgstat.h"
19 : #include "replication/logicallauncher.h"
20 : #include "replication/worker_internal.h"
21 : #include "storage/ipc.h"
22 : #include "utils/lsyscache.h"
23 : #include "utils/memutils.h"
24 :
25 : /*
26 : * Enum for the validity phases of the subscription relations state.
27 : *
28 : * SYNC_RELATIONS_STATE_NEEDS_REBUILD indicates that the subscription relations
29 : * state is no longer valid, and the subscription relations should be rebuilt.
30 : *
31 : * SYNC_RELATIONS_STATE_REBUILD_STARTED indicates that the subscription
32 : * relations state is being rebuilt.
33 : *
34 : * SYNC_RELATIONS_STATE_VALID indicates that the subscription relation state is
35 : * up-to-date and valid.
36 : */
37 : typedef enum
38 : {
39 : SYNC_RELATIONS_STATE_NEEDS_REBUILD,
40 : SYNC_RELATIONS_STATE_REBUILD_STARTED,
41 : SYNC_RELATIONS_STATE_VALID,
42 : } SyncingRelationsState;
43 : /* Starts as NEEDS_REBUILD; InvalidateSyncingRelStates() resets it to that. */
44 : static SyncingRelationsState relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD;
45 :
46 : /*
47 : * Exit routine for a table or sequence synchronization worker; never returns.
48 : */
49 : pg_noreturn void
50 198 : FinishSyncWorker(void)
51 : {
52 : Assert(am_sequencesync_worker() || am_tablesync_worker());
53 :
54 : /*
55 : * Commit any outstanding transaction. This is the usual case, unless
56 : * there was nothing to do for the table.
57 : */
58 198 : if (IsTransactionState())
59 : {
60 193 : CommitTransactionCommand();
61 193 : pgstat_report_stat(true);
62 : }
63 :
64 : /* And flush all writes. */
65 198 : XLogFlush(GetXLogWriteRecPtr());
66 :
67 198 : if (am_sequencesync_worker())
68 : {
69 5 : ereport(LOG,
70 : errmsg("logical replication sequence synchronization worker for subscription \"%s\" has finished",
71 : MySubscription->name));
72 :
73 : /*
74 : * Reset last_seqsync_start_time, so that next time a sequencesync
75 : * worker is needed it can be started promptly.
76 : */
77 5 : logicalrep_reset_seqsync_start_time();
78 : }
79 : else
80 : {
81 193 : StartTransactionCommand(); /* presumably for the get_rel_name() lookup below -- confirm */
82 193 : ereport(LOG,
83 : errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
84 : MySubscription->name,
85 : get_rel_name(MyLogicalRepWorker->relid)));
86 193 : CommitTransactionCommand();
87 :
88 : /* Find the leader apply worker and signal it. */
89 193 : logicalrep_worker_wakeup(WORKERTYPE_APPLY, MyLogicalRepWorker->subid,
90 : InvalidOid);
91 : }
92 :
93 : /* Stop gracefully */
94 198 : proc_exit(0);
95 : }
96 :
97 : /*
98 : * Callback from syscache invalidation: flag the relation states as needing a rebuild (arguments unused).
99 : */
100 : void
101 2037 : InvalidateSyncingRelStates(Datum arg, SysCacheIdentifier cacheid,
102 : uint32 hashvalue)
103 : {
104 2037 : relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD;
105 2037 : }
106 :
107 : /*
108 : * Attempt to launch a sync worker for one or more sequences or a table, if
109 : * a worker slot is available and the retry interval has elapsed.
110 : *
111 : * wtype: sync worker type.
112 : * nsyncworkers: Number of currently running sync workers for the subscription.
113 : * relid: InvalidOid for sequencesync worker, actual relid for tablesync worker.
114 : * last_start_time: Pointer to the last start time of the worker; set to the
115 : * current time whenever a launch is attempted, even if the launch fails.
116 : */
117 : void
118 906 : launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid,
119 : TimestampTz *last_start_time)
120 : {
121 : TimestampTz now;
122 :
123 : Assert((wtype == WORKERTYPE_TABLESYNC && OidIsValid(relid)) ||
124 : (wtype == WORKERTYPE_SEQUENCESYNC && !OidIsValid(relid)));
125 :
126 : /* Bail out unless there is a free sync worker slot */
127 906 : if (nsyncworkers >= max_sync_workers_per_subscription)
128 664 : return;
129 :
130 242 : now = GetCurrentTimestamp();
131 : /* A zero *last_start_time skips the retry-interval check. */
132 278 : if (!(*last_start_time) ||
133 36 : TimestampDifferenceExceeds(*last_start_time, now,
134 : wal_retrieve_retry_interval))
135 : {
136 : /*
137 : * Set the last_start_time even if we fail to start the worker, so
138 : * that we won't retry until wal_retrieve_retry_interval has elapsed.
139 : */
140 218 : *last_start_time = now;
141 218 : (void) logicalrep_worker_launch(wtype,
142 218 : MyLogicalRepWorker->dbid,
143 218 : MySubscription->oid,
144 218 : MySubscription->name,
145 218 : MyLogicalRepWorker->userid,
146 : relid, DSM_HANDLE_INVALID, false);
147 : }
148 : }
149 :
150 : /*
151 : * Process possible state change(s) of relations being synchronized, starting
152 : * tablesync/sequencesync workers for newly added tables/sequences as needed.
153 : * Dispatches on the worker type; current_lsn goes to the table handlers only.
154 : */
155 : void
156 8969 : ProcessSyncingRelations(XLogRecPtr current_lsn)
157 : {
158 8969 : switch (MyLogicalRepWorker->type)
159 : {
160 23 : case WORKERTYPE_PARALLEL_APPLY:
161 :
162 : /*
163 : * Skip for parallel apply workers because they only operate on
164 : * tables that are in a READY state. See pa_can_start() and
165 : * should_apply_changes_for_rel().
166 : */
167 23 : break;
168 :
169 227 : case WORKERTYPE_TABLESYNC:
170 227 : ProcessSyncingTablesForSync(current_lsn);
171 34 : break;
172 :
173 8719 : case WORKERTYPE_APPLY:
174 8719 : ProcessSyncingTablesForApply(current_lsn);
175 8713 : ProcessSequencesForSync();
176 8713 : break;
177 :
178 0 : case WORKERTYPE_SEQUENCESYNC:
179 : /* Should never happen. */
180 0 : elog(ERROR, "sequence synchronization worker is not expected to process relations");
181 : break;
182 :
183 0 : case WORKERTYPE_UNKNOWN:
184 : /* Should never happen. */
185 0 : elog(ERROR, "Unknown worker type");
186 : }
187 8770 : }
188 :
189 : /*
190 : * Common code to fetch the up-to-date sync state info for tables and sequences.
191 : *
192 : * The pg_subscription_rel catalog is shared by tables and sequences. Changes
193 : * to either sequences or tables can affect the validity of relation states, so
194 : * we identify non-READY tables and non-READY sequences together to ensure
195 : * consistency.
196 : *
197 : * has_pending_subtables: if not NULL, set to true if the subscription has any
198 : * tables at all (not only non-READY ones), otherwise false.
199 : * has_pending_subsequences: if not NULL, set to true if the subscription has
200 : * one or more sequences that are not in READY state, otherwise false.
201 : */
202 : void
203 17771 : FetchRelationStates(bool *has_pending_subtables,
204 : bool *has_pending_subsequences,
205 : bool *started_tx)
206 : {
207 : /*
208 : * has_subtables and has_subsequences_non_ready are declared as static,
209 : * since the same value can be used until the system table is invalidated.
210 : */
211 : static bool has_subtables = false;
212 : static bool has_subsequences_non_ready = false;
213 : /* Tell the caller whether we opened a transaction; it is left open on return. */
214 17771 : *started_tx = false;
215 :
216 17771 : if (relation_states_validity != SYNC_RELATIONS_STATE_VALID)
217 : {
218 : MemoryContext oldctx;
219 : List *rstates;
220 : SubscriptionRelState *rstate;
221 :
222 1014 : relation_states_validity = SYNC_RELATIONS_STATE_REBUILD_STARTED;
223 1014 : has_subsequences_non_ready = false;
224 :
225 : /* Discard the old list of not-READY tables. */
226 1014 : list_free_deep(table_states_not_ready);
227 1014 : table_states_not_ready = NIL;
228 :
229 1014 : if (!IsTransactionState())
230 : {
231 997 : StartTransactionCommand();
232 997 : *started_tx = true;
233 : }
234 :
235 : /* Fetch tables and sequences that are in non-READY state. */
236 1014 : rstates = GetSubscriptionRelations(MySubscription->oid, true, true,
237 : true);
238 :
239 : /* Allocate the tracking info in a permanent memory context. */
240 1014 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
241 3809 : foreach_ptr(SubscriptionRelState, subrel, rstates)
242 : {
243 1781 : if (get_rel_relkind(subrel->relid) == RELKIND_SEQUENCE)
244 16 : has_subsequences_non_ready = true;
245 : else
246 : {
247 1765 : rstate = palloc_object(SubscriptionRelState);
248 1765 : memcpy(rstate, subrel, sizeof(SubscriptionRelState));
249 1765 : table_states_not_ready = lappend(table_states_not_ready,
250 : rstate);
251 : }
252 : }
253 1014 : MemoryContextSwitchTo(oldctx);
254 :
255 : /*
256 : * Does the subscription have tables?
257 : *
258 : * If there were not-READY tables found then we know it does. But if
259 : * table_states_not_ready was empty we still need to check again to
260 : * see if there are 0 tables.
261 : */
262 1309 : has_subtables = (table_states_not_ready != NIL) ||
263 295 : HasSubscriptionTables(MySubscription->oid);
264 :
265 : /*
266 : * If the subscription relation cache has been invalidated since we
267 : * entered this routine, we still use and return the relations we just
268 : * finished constructing, to avoid infinite loops, but we leave the
269 : * table states marked as stale so that we'll rebuild it again on next
270 : * access. Otherwise, we mark the table states as valid.
271 : */
272 1014 : if (relation_states_validity == SYNC_RELATIONS_STATE_REBUILD_STARTED)
273 1005 : relation_states_validity = SYNC_RELATIONS_STATE_VALID;
274 : }
275 :
276 17771 : if (has_pending_subtables)
277 339 : *has_pending_subtables = has_subtables;
278 :
279 17771 : if (has_pending_subsequences)
280 8713 : *has_pending_subsequences = has_subsequences_non_ready;
281 17771 : }
|