Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * worker_internal.h
4 : * Internal headers shared by logical replication workers.
5 : *
6 : * Portions Copyright (c) 2016-2025, PostgreSQL Global Development Group
7 : *
8 : * src/include/replication/worker_internal.h
9 : *
10 : *-------------------------------------------------------------------------
11 : */
12 : #ifndef WORKER_INTERNAL_H
13 : #define WORKER_INTERNAL_H
14 :
15 : #include "access/xlogdefs.h"
16 : #include "catalog/pg_subscription.h"
17 : #include "datatype/timestamp.h"
18 : #include "miscadmin.h"
19 : #include "replication/logicalrelation.h"
20 : #include "replication/walreceiver.h"
21 : #include "storage/buffile.h"
22 : #include "storage/fileset.h"
23 : #include "storage/lock.h"
24 : #include "storage/shm_mq.h"
25 : #include "storage/shm_toc.h"
26 : #include "storage/spin.h"
27 :
28 : /* Types of logical replication worker; the value is stored in LogicalRepWorker.type. */
29 : typedef enum LogicalRepWorkerType
30 : {
31 : WORKERTYPE_UNKNOWN = 0, /* explicitly the zero value */
32 : WORKERTYPE_TABLESYNC, /* tested by isTablesyncWorker() */
33 : WORKERTYPE_APPLY, /* tested by am_leader_apply_worker() */
34 : WORKERTYPE_PARALLEL_APPLY, /* tested by isParallelApplyWorker() */
35 : } LogicalRepWorkerType;
36 : /* Per-slot state of one logical replication worker (see in_use below). */
37 : typedef struct LogicalRepWorker
38 : {
39 : /* What type of worker is this? */
40 : LogicalRepWorkerType type;
41 :
42 : /* Time at which this worker was launched. */
43 : TimestampTz launch_time;
44 :
45 : /* Indicates if this slot is used or free. */
46 : bool in_use; /* also checked by isTablesyncWorker()/isParallelApplyWorker() */
47 :
48 : /* Increased every time the slot is taken by new worker. */
49 : uint16 generation;
50 :
51 : /* Pointer to proc array. NULL if not running. */
52 : PGPROC *proc;
53 :
54 : /* Database id to connect to. */
55 : Oid dbid;
56 :
57 : /* User to use for connection (will be same as owner of subscription). */
58 : Oid userid;
59 :
60 : /* Subscription id for the worker. */
61 : Oid subid;
62 :
63 : /* Used for initial table synchronization. */
64 : Oid relid;
65 : char relstate;
66 : XLogRecPtr relstate_lsn;
67 : slock_t relmutex; /* NOTE(review): presumably guards relstate/relstate_lsn - confirm */
68 :
69 : /*
70 : * Used to create the changes and subxact files for the streaming
71 : * transactions. Upon the arrival of the first streaming transaction or
72 : * the first time the leader apply worker times out while sending changes
73 : * to the parallel apply worker, the fileset will be initialized, and it
74 : * will be deleted when the worker exits. Under this, separate buffiles
75 : * would be created for each transaction which will be deleted after the
76 : * transaction is finished.
77 : */
78 : FileSet *stream_fileset;
79 :
80 : /*
81 : * PID of leader apply worker if this slot is used for a parallel apply
82 : * worker, InvalidPid otherwise.
83 : */
84 : pid_t leader_pid;
85 :
86 : /* Indicates whether apply can be performed in parallel. */
87 : bool parallel_apply;
88 :
89 : /*
90 : * Changes made by this transaction and subsequent ones must be preserved.
91 : * This ensures that update_deleted conflicts can be accurately detected
92 : * during the apply phase of logical replication by this worker.
93 : *
94 : * The logical replication launcher manages an internal replication slot
95 : * named "pg_conflict_detection". It asynchronously collects this ID to
96 : * decide when to advance the xmin value of the slot.
97 : */
98 : TransactionId oldest_nonremovable_xid;
99 :
100 : /* Stats. */
101 : XLogRecPtr last_lsn;
102 : TimestampTz last_send_time;
103 : TimestampTz last_recv_time;
104 : XLogRecPtr reply_lsn;
105 : TimestampTz reply_time;
106 : } LogicalRepWorker;
107 :
108 : /*
109 : * State of the transaction in parallel apply worker.
110 : *
111 : * The enum values must have the same order as the transaction state
112 : * transitions.
113 : */
114 : typedef enum ParallelTransState
115 : {
116 : PARALLEL_TRANS_UNKNOWN, /* first enumerator, hence value 0 */
117 : PARALLEL_TRANS_STARTED, /* NOTE(review): presumably set once the transaction begins - confirm */
118 : PARALLEL_TRANS_FINISHED, /* set by the parallel apply worker after handling a finish command */
119 : } ParallelTransState;
120 :
121 : /*
122 : * State of fileset used to communicate changes from leader to parallel
123 : * apply worker.
124 : *
125 : * FS_EMPTY indicates an initial state where the leader doesn't need to use
126 : * the file to communicate with the parallel apply worker.
127 : *
128 : * FS_SERIALIZE_IN_PROGRESS indicates that the leader is serializing changes
129 : * to the file.
130 : *
131 : * FS_SERIALIZE_DONE indicates that the leader has serialized all changes to
132 : * the file.
133 : *
134 : * FS_READY indicates that it is now ok for a parallel apply worker to
135 : * read the file.
136 : */
137 : typedef enum PartialFileSetState
138 : {
139 : FS_EMPTY, /* initial state: leader does not need the file */
140 : FS_SERIALIZE_IN_PROGRESS, /* leader is serializing changes to the file */
141 : FS_SERIALIZE_DONE, /* leader has serialized all changes to the file */
142 : FS_READY, /* parallel apply worker may now read the file */
143 : } PartialFileSetState;
144 :
145 : /*
146 : * Struct for sharing information between leader apply worker and parallel
147 : * apply workers.
148 : */
149 : typedef struct ParallelApplyWorkerShared
150 : {
151 : slock_t mutex; /* NOTE(review): presumably protects the non-atomic fields below - confirm */
152 :
153 : TransactionId xid; /* NOTE(review): presumably the xid of the transaction being applied - confirm */
154 :
155 : /*
156 : * State used to ensure commit ordering.
157 : *
158 : * The parallel apply worker will set it to PARALLEL_TRANS_FINISHED after
159 : * handling the transaction finish commands while the apply leader will
160 : * wait for it to become PARALLEL_TRANS_FINISHED before proceeding in
161 : * transaction finish commands (e.g. STREAM_COMMIT/STREAM_PREPARE/
162 : * STREAM_ABORT).
163 : */
164 : ParallelTransState xact_state;
165 :
166 : /* Information from the corresponding LogicalRepWorker slot. */
167 : uint16 logicalrep_worker_generation; /* same width as LogicalRepWorker.generation */
168 : int logicalrep_worker_slot_no; /* NOTE(review): presumably the slot's index - confirm */
169 :
170 : /*
171 : * Indicates whether there are pending streaming blocks in the queue. The
172 : * parallel apply worker will check it before starting to wait.
173 : */
174 : pg_atomic_uint32 pending_stream_count;
175 :
176 : /*
177 : * XactLastCommitEnd from the parallel apply worker. This is required by
178 : * the leader worker so it can update the lsn_mappings.
179 : */
180 : XLogRecPtr last_commit_end;
181 :
182 : /*
183 : * After entering PARTIAL_SERIALIZE mode, the leader apply worker will
184 : * serialize changes to the file, and share the fileset with the parallel
185 : * apply worker when processing the transaction finish command. Then the
186 : * parallel apply worker will apply all the spooled messages.
187 : *
188 : * FileSet is used here instead of SharedFileSet because we need it to
189 : * survive after releasing the shared memory so that the leader apply
190 : * worker can re-use the same fileset for the next streaming transaction.
191 : */
192 : PartialFileSetState fileset_state;
193 : FileSet fileset;
194 : } ParallelApplyWorkerShared;
195 :
196 : /*
197 : * Information which is used to manage the parallel apply worker.
198 : */
199 : typedef struct ParallelApplyWorkerInfo
200 : {
201 : /*
202 : * This queue is used to send changes from the leader apply worker to the
203 : * parallel apply worker.
204 : */
205 : shm_mq_handle *mq_handle;
206 :
207 : /*
208 : * This queue is used to transfer error messages from the parallel apply
209 : * worker to the leader apply worker.
210 : */
211 : shm_mq_handle *error_mq_handle;
212 :
213 : dsm_segment *dsm_seg; /* NOTE(review): presumably the segment backing the queues/shared state - confirm */
214 :
215 : /*
216 : * Indicates whether the leader apply worker needs to serialize the
217 : * remaining changes to a file due to timeout when attempting to send data
218 : * to the parallel apply worker via shared memory.
219 : */
220 : bool serialize_changes;
221 :
222 : /*
223 : * True if the worker is being used to process a parallel apply
224 : * transaction. False indicates this worker is available for re-use.
225 : */
226 : bool in_use;
227 :
228 : ParallelApplyWorkerShared *shared; /* information shared between leader and parallel apply worker */
229 : } ParallelApplyWorkerInfo;
230 :
231 : /* Main memory context for apply worker. Permanent during worker lifetime. */
232 : extern PGDLLIMPORT MemoryContext ApplyContext;
233 : /* NOTE(review): presumably a shorter-lived, per-message context - confirm. */
234 : extern PGDLLIMPORT MemoryContext ApplyMessageContext;
235 : /* NOTE(review): presumably the error context stack used with apply_error_callback() - confirm. */
236 : extern PGDLLIMPORT ErrorContextCallback *apply_error_context_stack;
237 : /* NOTE(review): presumably this process's own ParallelApplyWorkerShared - confirm. */
238 : extern PGDLLIMPORT ParallelApplyWorkerShared *MyParallelShared;
239 :
240 : /* libpqreceiver connection */
241 : extern PGDLLIMPORT struct WalReceiverConn *LogRepWorkerWalRcvConn;
242 :
243 : /* Worker and subscription objects. */
244 : extern PGDLLIMPORT Subscription *MySubscription;
245 : extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
246 :
247 : extern PGDLLIMPORT bool in_remote_transaction;
248 :
249 : extern PGDLLIMPORT bool InitializingApplyWorker;
250 : /* Worker slot routines; everything below is only declared in this header. */
251 : extern void logicalrep_worker_attach(int slot);
252 : extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
253 : bool only_running);
254 : extern List *logicalrep_workers_find(Oid subid, bool only_running,
255 : bool acquire_lock);
256 : extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
257 : Oid dbid, Oid subid, const char *subname,
258 : Oid userid, Oid relid,
259 : dsm_handle subworker_dsm,
260 : bool retain_dead_tuples);
261 : extern void logicalrep_worker_stop(Oid subid, Oid relid);
262 : extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo);
263 : extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
264 : extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
265 :
266 : extern int logicalrep_sync_worker_count(Oid subid);
267 : /* Writes into the caller-supplied buffer originname of size szoriginname. */
268 : extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
269 : char *originname, Size szoriginname);
270 :
271 : extern bool AllTablesyncsReady(void);
272 : extern void UpdateTwoPhaseState(Oid suboid, char new_state);
273 :
274 : extern void process_syncing_tables(XLogRecPtr current_lsn);
275 : extern void invalidate_syncing_table_states(Datum arg, int cacheid,
276 : uint32 hashvalue);
277 : /* Stream start/stop internals, both keyed by transaction id. */
278 : extern void stream_start_internal(TransactionId xid, bool first_segment);
279 : extern void stream_stop_internal(TransactionId xid);
280 :
281 : /* Common streaming function to apply all the spooled messages */
282 : extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
283 : XLogRecPtr lsn);
284 :
285 : extern void apply_dispatch(StringInfo s);
286 :
287 : extern void maybe_reread_subscription(void);
288 :
289 : extern void stream_cleanup_files(Oid subid, TransactionId xid);
290 :
291 : extern void set_stream_options(WalRcvStreamOptions *options,
292 : char *slotname,
293 : XLogRecPtr *origin_startpos);
294 :
295 : extern void start_apply(XLogRecPtr origin_startpos);
296 :
297 : extern void InitializeLogRepWorker(void);
298 :
299 : extern void SetupApplyOrSyncWorker(int worker_slot);
300 :
301 : extern void DisableSubscriptionAndExit(void);
302 :
303 : extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn);
304 :
305 : /* Function for apply error callback */
306 : extern void apply_error_callback(void *arg);
307 : extern void set_apply_error_context_origin(char *originname);
308 :
309 : /* Parallel apply worker setup and interactions */
310 : extern void pa_allocate_worker(TransactionId xid);
311 : extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid);
312 : extern void pa_detach_all_error_mq(void);
313 :
314 : extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes,
315 : const void *data);
316 : extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
317 : bool stream_locked);
318 :
319 : extern void pa_set_xact_state(ParallelApplyWorkerShared *wshared,
320 : ParallelTransState xact_state);
321 : extern void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo);
322 :
323 : extern void pa_start_subtrans(TransactionId current_xid,
324 : TransactionId top_xid);
325 : extern void pa_reset_subtrans(void);
326 : extern void pa_stream_abort(LogicalRepStreamAbortData *abort_data);
327 : extern void pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
328 : PartialFileSetState fileset_state);
329 : /* Lock/unlock pairs; each unlock takes the same (xid, lockmode) arguments as its lock. */
330 : extern void pa_lock_stream(TransactionId xid, LOCKMODE lockmode);
331 : extern void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode);
332 :
333 : extern void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode);
334 : extern void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode);
335 :
336 : extern void pa_decr_and_wait_stream_block(void);
337 :
338 : extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
339 : XLogRecPtr remote_lsn);
340 : /* Worker-type tests: true only if the slot is in_use AND has the given type; 'worker' is evaluated twice. */
341 : #define isParallelApplyWorker(worker) ((worker)->in_use && \
342 : (worker)->type == WORKERTYPE_PARALLEL_APPLY)
343 : #define isTablesyncWorker(worker) ((worker)->in_use && \
344 : (worker)->type == WORKERTYPE_TABLESYNC)
345 : /* True if MyLogicalRepWorker is an in-use tablesync slot; in_use is tested by the macro, not asserted. */
346 : static inline bool
347 1078 : am_tablesync_worker(void)
348 : {
349 1078 : return isTablesyncWorker(MyLogicalRepWorker);
350 : }
351 : /* True if MyLogicalRepWorker's type is WORKERTYPE_APPLY; asserts that the slot is in use. */
352 : static inline bool
353 509410 : am_leader_apply_worker(void)
354 : {
355 : Assert(MyLogicalRepWorker->in_use);
356 509410 : return (MyLogicalRepWorker->type == WORKERTYPE_APPLY);
357 : }
358 : /* True if MyLogicalRepWorker's type is WORKERTYPE_PARALLEL_APPLY; asserts that the slot is in use. */
359 : static inline bool
360 660032 : am_parallel_apply_worker(void)
361 : {
362 : Assert(MyLogicalRepWorker->in_use);
363 660032 : return isParallelApplyWorker(MyLogicalRepWorker);
364 : }
365 :
366 : #endif /* WORKER_INTERNAL_H */
|