Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * worker_internal.h
4 : * Internal headers shared by logical replication workers.
5 : *
6 : * Portions Copyright (c) 2016-2025, PostgreSQL Global Development Group
7 : *
8 : * src/include/replication/worker_internal.h
9 : *
10 : *-------------------------------------------------------------------------
11 : */
12 : #ifndef WORKER_INTERNAL_H
13 : #define WORKER_INTERNAL_H
14 :
15 : #include "access/xlogdefs.h"
16 : #include "catalog/pg_subscription.h"
17 : #include "datatype/timestamp.h"
18 : #include "miscadmin.h"
19 : #include "replication/logicalrelation.h"
20 : #include "replication/walreceiver.h"
21 : #include "storage/buffile.h"
22 : #include "storage/fileset.h"
23 : #include "storage/lock.h"
24 : #include "storage/shm_mq.h"
25 : #include "storage/shm_toc.h"
26 : #include "storage/spin.h"
27 :
/*
 * Different types of logical replication worker.
 *
 * Stored in LogicalRepWorker.type and tested by the isXxxWorker() macros and
 * am_xxx_worker() helpers near the end of this header.
 */
typedef enum LogicalRepWorkerType
{
	WORKERTYPE_UNKNOWN = 0,		/* NOTE(review): presumably the type of a slot
								 * that has not been assigned one; confirm */
	WORKERTYPE_TABLESYNC,		/* initial table synchronization */
	WORKERTYPE_SEQUENCESYNC,	/* sequence synchronization */
	WORKERTYPE_APPLY,			/* leader apply worker */
	WORKERTYPE_PARALLEL_APPLY,	/* parallel apply worker (has a leader) */
} LogicalRepWorkerType;
37 :
/*
 * Per-slot state of a logical replication worker.
 *
 * NOTE(review): the launcher presumably keeps an array of these slots in
 * shared memory (logicalrep_worker_attach() takes a slot number); confirm
 * against launcher.c before relying on that.
 */
typedef struct LogicalRepWorker
{
	/* What type of worker is this? */
	LogicalRepWorkerType type;

	/* Time at which this worker was launched. */
	TimestampTz launch_time;

	/* Indicates if this slot is used or free. */
	bool		in_use;

	/*
	 * Increased every time the slot is taken by a new worker.  A copy is kept
	 * in ParallelApplyWorkerShared.logicalrep_worker_generation, presumably so
	 * that a stale (slot, generation) pair can be detected after reuse.
	 */
	uint16		generation;

	/* This worker's PGPROC entry.  NULL if not running. */
	PGPROC	   *proc;

	/* Database id to connect to. */
	Oid			dbid;

	/* User to use for connection (will be same as owner of subscription). */
	Oid			userid;

	/* Subscription id for the worker. */
	Oid			subid;

	/*
	 * Used for initial table synchronization.
	 *
	 * NOTE(review): relmutex presumably protects relstate and relstate_lsn,
	 * which are read by other processes; confirm against tablesync.c.
	 */
	Oid			relid;
	char		relstate;
	XLogRecPtr	relstate_lsn;
	slock_t		relmutex;

	/*
	 * Used to create the changes and subxact files for the streaming
	 * transactions.  The fileset is initialized upon the arrival of the first
	 * streaming transaction, or the first time the leader apply worker times
	 * out while sending changes to the parallel apply worker, and it is
	 * deleted when the worker exits.  Within it, a separate buffile is
	 * created for each transaction and deleted once that transaction is
	 * finished.
	 */
	FileSet    *stream_fileset;

	/*
	 * PID of leader apply worker if this slot is used for a parallel apply
	 * worker, InvalidPid otherwise.
	 */
	pid_t		leader_pid;

	/* Indicates whether apply can be performed in parallel. */
	bool		parallel_apply;

	/*
	 * Changes made by this transaction and subsequent ones must be preserved.
	 * This ensures that update_deleted conflicts can be accurately detected
	 * during the apply phase of logical replication by this worker.
	 *
	 * The logical replication launcher manages an internal replication slot
	 * named "pg_conflict_detection". It asynchronously collects this ID to
	 * decide when to advance the xmin value of the slot.
	 *
	 * This ID is set to InvalidTransactionId when the apply worker stops
	 * retaining information needed for conflict detection.
	 */
	TransactionId oldest_nonremovable_xid;

	/*
	 * Stats: last LSN seen and send/receive times, plus the last reply LSN
	 * and reply time.  NOTE(review): presumably reported through
	 * pg_stat_subscription; confirm.
	 */
	XLogRecPtr	last_lsn;
	TimestampTz last_send_time;
	TimestampTz last_recv_time;
	XLogRecPtr	reply_lsn;
	TimestampTz reply_time;

	/*
	 * NOTE(review): undocumented upstream.  Presumably the time the last
	 * sequence synchronization worker was started, used to throttle
	 * relaunches; logicalrep_reset_seqsync_start_time() and
	 * launch_sync_worker()'s last_start_time parameter look related.  Confirm.
	 */
	TimestampTz last_seqsync_start_time;
} LogicalRepWorker;
113 :
/*
 * State of the transaction being applied by a parallel apply worker.
 *
 * The enum values must be declared in the same order as the transaction state
 * transitions (UNKNOWN -> STARTED -> FINISHED).  NOTE(review): presumably
 * because some code compares states by ordering; do not reorder without
 * checking callers.
 */
typedef enum ParallelTransState
{
	PARALLEL_TRANS_UNKNOWN,
	PARALLEL_TRANS_STARTED,
	PARALLEL_TRANS_FINISHED,
} ParallelTransState;
126 :
/*
 * State of the fileset used to communicate changes from the leader to a
 * parallel apply worker.  Updated through pa_set_fileset_state().
 *
 * FS_EMPTY indicates an initial state where the leader doesn't need to use
 * the file to communicate with the parallel apply worker.
 *
 * FS_SERIALIZE_IN_PROGRESS indicates that the leader is serializing changes
 * to the file.
 *
 * FS_SERIALIZE_DONE indicates that the leader has serialized all changes to
 * the file.
 *
 * FS_READY indicates that it is now OK for the parallel apply worker to
 * read the file.
 */
typedef enum PartialFileSetState
{
	FS_EMPTY,
	FS_SERIALIZE_IN_PROGRESS,
	FS_SERIALIZE_DONE,
	FS_READY,
} PartialFileSetState;
150 :
/*
 * Struct for sharing information between the leader apply worker and
 * parallel apply workers.
 *
 * NOTE(review): presumably allocated in the DSM segment referenced by
 * ParallelApplyWorkerInfo.dsm_seg; confirm.
 */
typedef struct ParallelApplyWorkerShared
{
	/*
	 * NOTE(review): presumably protects the non-atomic fields below, e.g.
	 * those updated via pa_set_xact_state() and pa_set_fileset_state();
	 * confirm.
	 */
	slock_t		mutex;

	/* NOTE(review): presumably the xid of the streamed transaction handled. */
	TransactionId xid;

	/*
	 * State used to ensure commit ordering.
	 *
	 * The parallel apply worker will set it to PARALLEL_TRANS_FINISHED after
	 * handling the transaction finish commands, while the leader apply worker
	 * will wait for it to become PARALLEL_TRANS_FINISHED before proceeding in
	 * transaction finish commands (e.g. STREAM_COMMIT/STREAM_PREPARE/
	 * STREAM_ABORT).
	 */
	ParallelTransState xact_state;

	/* Information from the corresponding LogicalRepWorker slot. */
	uint16		logicalrep_worker_generation;
	int			logicalrep_worker_slot_no;

	/*
	 * Count of pending streaming blocks in the queue.  The parallel apply
	 * worker will check it before starting to wait.
	 */
	pg_atomic_uint32 pending_stream_count;

	/*
	 * XactLastCommitEnd from the parallel apply worker. This is required by
	 * the leader worker so it can update the lsn_mappings.
	 */
	XLogRecPtr	last_commit_end;

	/*
	 * After entering PARTIAL_SERIALIZE mode, the leader apply worker will
	 * serialize changes to the file, and share the fileset with the parallel
	 * apply worker when processing the transaction finish command. Then the
	 * parallel apply worker will apply all the spooled messages.
	 *
	 * FileSet is used here instead of SharedFileSet because we need it to
	 * survive after releasing the shared memory so that the leader apply
	 * worker can re-use the same fileset for the next streaming transaction.
	 */
	PartialFileSetState fileset_state;
	FileSet		fileset;
} ParallelApplyWorkerShared;
201 :
/*
 * Information which is used to manage the parallel apply worker.
 */
typedef struct ParallelApplyWorkerInfo
{
	/*
	 * This queue is used to send changes from the leader apply worker to the
	 * parallel apply worker.
	 */
	shm_mq_handle *mq_handle;

	/*
	 * This queue is used to transfer error messages from the parallel apply
	 * worker to the leader apply worker.
	 */
	shm_mq_handle *error_mq_handle;

	/*
	 * NOTE(review): presumably the DSM segment holding the queues above and
	 * the shared struct below; confirm against applyparallelworker.c.
	 */
	dsm_segment *dsm_seg;

	/*
	 * Indicates whether the leader apply worker needs to serialize the
	 * remaining changes to a file due to timeout when attempting to send data
	 * to the parallel apply worker via shared memory.
	 */
	bool		serialize_changes;

	/*
	 * True if the worker is being used to process a parallel apply
	 * transaction. False indicates this worker is available for re-use.
	 */
	bool		in_use;

	/* State shared with the parallel apply worker. */
	ParallelApplyWorkerShared *shared;
} ParallelApplyWorkerInfo;
236 :
/* Main memory context for apply worker. Permanent during worker lifetime. */
extern PGDLLIMPORT MemoryContext ApplyContext;

/*
 * NOTE(review): presumably a short-lived context used while applying a
 * single message; confirm against worker.c.
 */
extern PGDLLIMPORT MemoryContext ApplyMessageContext;

/* Error context callback stack used while applying changes. */
extern PGDLLIMPORT ErrorContextCallback *apply_error_context_stack;

/*
 * Shared state of the current parallel apply worker.  NOTE(review):
 * presumably only set in parallel apply workers; confirm.
 */
extern PGDLLIMPORT ParallelApplyWorkerShared *MyParallelShared;

/* libpqreceiver connection */
extern PGDLLIMPORT struct WalReceiverConn *LogRepWorkerWalRcvConn;

/* Worker and subscription objects. */
extern PGDLLIMPORT Subscription *MySubscription;
extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;

/* NOTE(review): presumably true while a remote transaction is being applied. */
extern PGDLLIMPORT bool in_remote_transaction;

/* NOTE(review): presumably true during apply worker start-up; confirm. */
extern PGDLLIMPORT bool InitializingApplyWorker;

/* NOTE(review): presumably the list of relations not yet in READY state. */
extern PGDLLIMPORT List *table_states_not_ready;
258 :
/*
 * Worker slot lookup, launch, stop and wakeup.  Workers are identified by
 * (wtype, subid, relid).  NOTE(review): implementation lives outside this
 * header (presumably launcher.c); check it for locking requirements.
 */
extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(LogicalRepWorkerType wtype,
												Oid subid, Oid relid,
												bool only_running);
extern List *logicalrep_workers_find(Oid subid, bool only_running,
									 bool acquire_lock);
extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
									 Oid dbid, Oid subid, const char *subname,
									 Oid userid, Oid relid,
									 dsm_handle subworker_dsm,
									 bool retain_dead_tuples);
extern void logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid,
								   Oid relid);
extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo);
extern void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid,
									 Oid relid);
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);

/* NOTE(review): presumably resets LogicalRepWorker.last_seqsync_start_time. */
extern void logicalrep_reset_seqsync_start_time(void);
extern int	logicalrep_sync_worker_count(Oid subid);
279 :
/*
 * Writes the replication origin name for (suboid, relid) into originname,
 * a buffer of szoriginname bytes.
 */
extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
											   char *originname, Size szoriginname);

extern bool AllTablesyncsReady(void);
extern bool HasSubscriptionTablesCached(void);
extern void UpdateTwoPhaseState(Oid suboid, char new_state);

/* Table/sequence synchronization progress, driven by the current LSN. */
extern void ProcessSyncingTablesForSync(XLogRecPtr current_lsn);
extern void ProcessSyncingTablesForApply(XLogRecPtr current_lsn);
extern void ProcessSequencesForSync(void);

/* Does not return (pg_noreturn). */
pg_noreturn extern void FinishSyncWorker(void);
/* NOTE(review): signature matches a syscache invalidation callback. */
extern void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue);
extern void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers,
							   Oid relid, TimestampTz *last_start_time);
extern void ProcessSyncingRelations(XLogRecPtr current_lsn);
/* Reports pending tables/sequences and whether a transaction was started. */
extern void FetchRelationStates(bool *has_pending_subtables,
								bool *has_pending_sequences, bool *started_tx);
298 :
/* Streaming transaction segment start/stop handling. */
extern void stream_start_internal(TransactionId xid, bool first_segment);
extern void stream_stop_internal(TransactionId xid);

/* Common streaming function to apply all the spooled messages */
extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
								   XLogRecPtr lsn);

extern void apply_dispatch(StringInfo s);

extern void maybe_reread_subscription(void);

extern void stream_cleanup_files(Oid subid, TransactionId xid);

/* NOTE(review): presumably fills *options for the walreceiver stream. */
extern void set_stream_options(WalRcvStreamOptions *options,
							   char *slotname,
							   XLogRecPtr *origin_startpos);

extern void start_apply(XLogRecPtr origin_startpos);

extern void InitializeLogRepWorker(void);

extern void SetupApplyOrSyncWorker(int worker_slot);

extern void DisableSubscriptionAndExit(void);

extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn);

/* Functions for the apply error callback */
extern void apply_error_callback(void *arg);
extern void set_apply_error_context_origin(char *originname);
329 :
/* Parallel apply worker setup and interactions */
extern void pa_allocate_worker(TransactionId xid);
extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid);
extern void pa_detach_all_error_mq(void);

/* Sends nbytes of data to winfo's worker; the bool reports the outcome. */
extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes,
						 const void *data);
extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
										   bool stream_locked);

/* Setters for the shared xact_state field (see ParallelApplyWorkerShared). */
extern void pa_set_xact_state(ParallelApplyWorkerShared *wshared,
							  ParallelTransState xact_state);
extern void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo);

/* Subtransaction tracking inside a parallel apply worker. */
extern void pa_start_subtrans(TransactionId current_xid,
							  TransactionId top_xid);
extern void pa_reset_subtrans(void);
extern void pa_stream_abort(LogicalRepStreamAbortData *abort_data);
/* Setter for the shared fileset_state field. */
extern void pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
								 PartialFileSetState fileset_state);

/* Heavyweight-lock helpers keyed by xid, taking an explicit lock mode. */
extern void pa_lock_stream(TransactionId xid, LOCKMODE lockmode);
extern void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode);

extern void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode);
extern void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode);

/* NOTE(review): presumably decrements pending_stream_count and may wait. */
extern void pa_decr_and_wait_stream_block(void);

extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
						   XLogRecPtr remote_lsn);
361 :
/*
 * Slot-type predicates: true only when the LogicalRepWorker slot "worker" is
 * in use AND holds a worker of the named type.
 *
 * These are macros, so "worker" may be evaluated twice; pass an expression
 * without side effects.
 */
#define LogicalRepWorkerIsOfType(worker, wtype) \
	((worker)->in_use && (worker)->type == (wtype))

#define isParallelApplyWorker(worker) \
	LogicalRepWorkerIsOfType(worker, WORKERTYPE_PARALLEL_APPLY)
#define isTableSyncWorker(worker) \
	LogicalRepWorkerIsOfType(worker, WORKERTYPE_TABLESYNC)
#define isSequenceSyncWorker(worker) \
	LogicalRepWorkerIsOfType(worker, WORKERTYPE_SEQUENCESYNC)
368 :
369 : static inline bool
370 1040 : am_tablesync_worker(void)
371 : {
372 1040 : return isTableSyncWorker(MyLogicalRepWorker);
373 : }
374 :
375 : static inline bool
376 968 : am_sequencesync_worker(void)
377 : {
378 968 : return isSequenceSyncWorker(MyLogicalRepWorker);
379 : }
380 :
381 : static inline bool
382 545760 : am_leader_apply_worker(void)
383 : {
384 : Assert(MyLogicalRepWorker->in_use);
385 545760 : return (MyLogicalRepWorker->type == WORKERTYPE_APPLY);
386 : }
387 :
388 : static inline bool
389 663456 : am_parallel_apply_worker(void)
390 : {
391 : Assert(MyLogicalRepWorker->in_use);
392 663456 : return isParallelApplyWorker(MyLogicalRepWorker);
393 : }
394 :
395 : #endif /* WORKER_INTERNAL_H */
|