Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * syncutils.c
3 : * PostgreSQL logical replication: common synchronization code
4 : *
5 : * Copyright (c) 2025, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/syncutils.c
9 : *
10 : * NOTES
11 : * This file contains code common for synchronization workers.
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "catalog/pg_subscription_rel.h"
18 : #include "pgstat.h"
19 : #include "replication/logicallauncher.h"
20 : #include "replication/worker_internal.h"
21 : #include "storage/ipc.h"
22 : #include "utils/lsyscache.h"
23 : #include "utils/memutils.h"
24 :
/*
 * Validity phases of the cached subscription relations state.
 *
 * The cached state is marked NEEDS_REBUILD whenever it can no longer be
 * trusted, REBUILD_STARTED while it is being reconstructed, and VALID once
 * the reconstruction has completed without being invalidated again.
 */
typedef enum
{
	/* The cached state is no longer valid and should be rebuilt. */
	SYNC_RELATIONS_STATE_NEEDS_REBUILD,

	/* The cached state is currently being rebuilt. */
	SYNC_RELATIONS_STATE_REBUILD_STARTED,

	/* The cached state is up-to-date and valid. */
	SYNC_RELATIONS_STATE_VALID,
} SyncingRelationsState;

/* Current validity of the cached relation states; starts out stale. */
static SyncingRelationsState relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD;
45 :
46 : /*
47 : * Exit routine for synchronization worker.
48 : */
49 : pg_noreturn void
50 378 : FinishSyncWorker(void)
51 : {
52 : Assert(am_sequencesync_worker() || am_tablesync_worker());
53 :
54 : /*
55 : * Commit any outstanding transaction. This is the usual case, unless
56 : * there was nothing to do for the table.
57 : */
58 378 : if (IsTransactionState())
59 : {
60 368 : CommitTransactionCommand();
61 368 : pgstat_report_stat(true);
62 : }
63 :
64 : /* And flush all writes. */
65 378 : XLogFlush(GetXLogWriteRecPtr());
66 :
67 378 : if (am_sequencesync_worker())
68 : {
69 10 : ereport(LOG,
70 : errmsg("logical replication sequence synchronization worker for subscription \"%s\" has finished",
71 : MySubscription->name));
72 :
73 : /*
74 : * Reset last_seqsync_start_time, so that next time a sequencesync
75 : * worker is needed it can be started promptly.
76 : */
77 10 : logicalrep_reset_seqsync_start_time();
78 : }
79 : else
80 : {
81 368 : StartTransactionCommand();
82 368 : ereport(LOG,
83 : errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
84 : MySubscription->name,
85 : get_rel_name(MyLogicalRepWorker->relid)));
86 368 : CommitTransactionCommand();
87 :
88 : /* Find the leader apply worker and signal it. */
89 368 : logicalrep_worker_wakeup(WORKERTYPE_APPLY, MyLogicalRepWorker->subid,
90 : InvalidOid);
91 : }
92 :
93 : /* Stop gracefully */
94 378 : proc_exit(0);
95 : }
96 :
97 : /*
98 : * Callback from syscache invalidation.
99 : */
100 : void
101 3504 : InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue)
102 : {
103 3504 : relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD;
104 3504 : }
105 :
106 : /*
107 : * Attempt to launch a sync worker for one or more sequences or a table, if
108 : * a worker slot is available and the retry interval has elapsed.
109 : *
110 : * wtype: sync worker type.
111 : * nsyncworkers: Number of currently running sync workers for the subscription.
112 : * relid: InvalidOid for sequencesync worker, actual relid for tablesync
113 : * worker.
114 : * last_start_time: Pointer to the last start time of the worker.
115 : */
116 : void
117 1598 : launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid,
118 : TimestampTz *last_start_time)
119 : {
120 : TimestampTz now;
121 :
122 : Assert((wtype == WORKERTYPE_TABLESYNC && OidIsValid(relid)) ||
123 : (wtype == WORKERTYPE_SEQUENCESYNC && !OidIsValid(relid)));
124 :
125 : /* If there is a free sync worker slot, start a new sync worker */
126 1598 : if (nsyncworkers >= max_sync_workers_per_subscription)
127 1144 : return;
128 :
129 454 : now = GetCurrentTimestamp();
130 :
131 516 : if (!(*last_start_time) ||
132 62 : TimestampDifferenceExceeds(*last_start_time, now,
133 : wal_retrieve_retry_interval))
134 : {
135 : /*
136 : * Set the last_start_time even if we fail to start the worker, so
137 : * that we won't retry until wal_retrieve_retry_interval has elapsed.
138 : */
139 410 : *last_start_time = now;
140 410 : (void) logicalrep_worker_launch(wtype,
141 410 : MyLogicalRepWorker->dbid,
142 410 : MySubscription->oid,
143 410 : MySubscription->name,
144 410 : MyLogicalRepWorker->userid,
145 : relid, DSM_HANDLE_INVALID, false);
146 : }
147 : }
148 :
149 : /*
150 : * Process possible state change(s) of relations that are being synchronized
151 : * and start new tablesync workers for the newly added tables. Also, start a
152 : * new sequencesync worker for the newly added sequences.
153 : */
154 : void
155 12458 : ProcessSyncingRelations(XLogRecPtr current_lsn)
156 : {
157 12458 : switch (MyLogicalRepWorker->type)
158 : {
159 44 : case WORKERTYPE_PARALLEL_APPLY:
160 :
161 : /*
162 : * Skip for parallel apply workers because they only operate on
163 : * tables that are in a READY state. See pa_can_start() and
164 : * should_apply_changes_for_rel().
165 : */
166 44 : break;
167 :
168 426 : case WORKERTYPE_TABLESYNC:
169 426 : ProcessSyncingTablesForSync(current_lsn);
170 58 : break;
171 :
172 11988 : case WORKERTYPE_APPLY:
173 11988 : ProcessSyncingTablesForApply(current_lsn);
174 11976 : ProcessSequencesForSync();
175 11976 : break;
176 :
177 0 : case WORKERTYPE_SEQUENCESYNC:
178 : /* Should never happen. */
179 0 : elog(ERROR, "sequence synchronization worker is not expected to process relations");
180 : break;
181 :
182 0 : case WORKERTYPE_UNKNOWN:
183 : /* Should never happen. */
184 0 : elog(ERROR, "Unknown worker type");
185 : }
186 12078 : }
187 :
188 : /*
189 : * Common code to fetch the up-to-date sync state info for tables and sequences.
190 : *
191 : * The pg_subscription_rel catalog is shared by tables and sequences. Changes
192 : * to either sequences or tables can affect the validity of relation states, so
193 : * we identify non-READY tables and non-READY sequences together to ensure
194 : * consistency.
195 : *
196 : * has_pending_subtables: true if the subscription has one or more tables that
197 : * are not in READY state, otherwise false.
198 : * has_pending_subsequences: true if the subscription has one or more sequences
199 : * that are not in READY state, otherwise false.
200 : */
201 : void
202 24686 : FetchRelationStates(bool *has_pending_subtables,
203 : bool *has_pending_subsequences,
204 : bool *started_tx)
205 : {
206 : /*
207 : * has_subtables and has_subsequences_non_ready are declared as static,
208 : * since the same value can be used until the system table is invalidated.
209 : */
210 : static bool has_subtables = false;
211 : static bool has_subsequences_non_ready = false;
212 :
213 24686 : *started_tx = false;
214 :
215 24686 : if (relation_states_validity != SYNC_RELATIONS_STATE_VALID)
216 : {
217 : MemoryContext oldctx;
218 : List *rstates;
219 : SubscriptionRelState *rstate;
220 :
221 1894 : relation_states_validity = SYNC_RELATIONS_STATE_REBUILD_STARTED;
222 1894 : has_subsequences_non_ready = false;
223 :
224 : /* Clean the old lists. */
225 1894 : list_free_deep(table_states_not_ready);
226 1894 : table_states_not_ready = NIL;
227 :
228 1894 : if (!IsTransactionState())
229 : {
230 1856 : StartTransactionCommand();
231 1856 : *started_tx = true;
232 : }
233 :
234 : /* Fetch tables and sequences that are in non-READY state. */
235 1894 : rstates = GetSubscriptionRelations(MySubscription->oid, true, true,
236 : true);
237 :
238 : /* Allocate the tracking info in a permanent memory context. */
239 1894 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
240 7176 : foreach_ptr(SubscriptionRelState, subrel, rstates)
241 : {
242 3388 : if (get_rel_relkind(subrel->relid) == RELKIND_SEQUENCE)
243 28 : has_subsequences_non_ready = true;
244 : else
245 : {
246 3360 : rstate = palloc(sizeof(SubscriptionRelState));
247 3360 : memcpy(rstate, subrel, sizeof(SubscriptionRelState));
248 3360 : table_states_not_ready = lappend(table_states_not_ready,
249 : rstate);
250 : }
251 : }
252 1894 : MemoryContextSwitchTo(oldctx);
253 :
254 : /*
255 : * Does the subscription have tables?
256 : *
257 : * If there were not-READY tables found then we know it does. But if
258 : * table_states_not_ready was empty we still need to check again to
259 : * see if there are 0 tables.
260 : */
261 2420 : has_subtables = (table_states_not_ready != NIL) ||
262 526 : HasSubscriptionTables(MySubscription->oid);
263 :
264 : /*
265 : * If the subscription relation cache has been invalidated since we
266 : * entered this routine, we still use and return the relations we just
267 : * finished constructing, to avoid infinite loops, but we leave the
268 : * table states marked as stale so that we'll rebuild it again on next
269 : * access. Otherwise, we mark the table states as valid.
270 : */
271 1894 : if (relation_states_validity == SYNC_RELATIONS_STATE_REBUILD_STARTED)
272 1876 : relation_states_validity = SYNC_RELATIONS_STATE_VALID;
273 : }
274 :
275 24686 : if (has_pending_subtables)
276 722 : *has_pending_subtables = has_subtables;
277 :
278 24686 : if (has_pending_subsequences)
279 11976 : *has_pending_subsequences = has_subsequences_non_ready;
280 24686 : }
|