Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * walreceiver.h
4 : * Exports from replication/walreceiver.c and replication/walreceiverfuncs.c.
5 : *
6 : * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
7 : *
8 : * src/include/replication/walreceiver.h
9 : *
10 : *-------------------------------------------------------------------------
11 : */
12 : #ifndef _WALRECEIVER_H
13 : #define _WALRECEIVER_H
14 :
15 : #include <netdb.h>
16 :
17 : #include "access/xlog.h"
18 : #include "access/xlogdefs.h"
19 : #include "pgtime.h"
20 : #include "port/atomics.h"
21 : #include "replication/logicalproto.h"
22 : #include "replication/walsender.h"
23 : #include "storage/condition_variable.h"
24 : #include "storage/spin.h"
25 : #include "utils/tuplestore.h"
26 :
27 : /* user-settable parameters */
28 : extern PGDLLIMPORT int wal_receiver_status_interval;
29 : extern PGDLLIMPORT int wal_receiver_timeout;
30 : extern PGDLLIMPORT bool hot_standby_feedback;
31 :
/*
 * MAXCONNINFO: size of the buffer used to hold a connection string, and
 * therefore the upper bound on a connection string's length.
 *
 * XXX: Should this move to pg_config_manual.h?
 */
#define MAXCONNINFO		1024
38 :
/*
 * May this standby accept replication connections from another standby?
 * True only when EnableHotStandby is set and max_wal_senders is greater
 * than zero.
 */
#define AllowCascadeReplication() \
	((EnableHotStandby) && (max_wal_senders > 0))
41 :
/*
 * States of the walreceiver, as stored in WalRcv->walRcvState.
 */
typedef enum
{
	/* stopped and mustn't start up again */
	WALRCV_STOPPED,

	/* launched, but the process hasn't initialized yet */
	WALRCV_STARTING,

	/* connecting to upstream server */
	WALRCV_CONNECTING,

	/* walreceiver is streaming */
	WALRCV_STREAMING,

	/* stopped streaming, waiting for orders */
	WALRCV_WAITING,

	/* asked to restart streaming */
	WALRCV_RESTARTING,

	/* requested to stop, but still running */
	WALRCV_STOPPING,
} WalRcvState;
56 :
57 : /* Shared memory area for management of walreceiver process */
58 : typedef struct
59 : {
60 : /*
61 : * Currently active walreceiver process's proc number and PID.
62 : *
63 : * The startup process uses the proc number to wake it up after telling it
64 : * where to start streaming (after setting receiveStart and
65 : * receiveStartTLI), and also to tell it to send apply feedback to the
66 : * primary whenever specially marked commit records are applied.
67 : */
68 : ProcNumber procno;
69 : pid_t pid;
70 :
71 : /* Its current state */
72 : WalRcvState walRcvState;
73 : ConditionVariable walRcvStoppedCV;
74 :
75 : /*
76 : * Its start time (actually, the time at which it was requested to be
77 : * started).
78 : */
79 : pg_time_t startTime;
80 :
81 : /*
82 : * receiveStart and receiveStartTLI indicate the first byte position and
83 : * timeline that will be received. When startup process starts the
84 : * walreceiver, it sets these to the point where it wants the streaming to
85 : * begin.
86 : */
87 : XLogRecPtr receiveStart;
88 : TimeLineID receiveStartTLI;
89 :
90 : /*
91 : * flushedUpto-1 is the last byte position that has already been received,
92 : * and receivedTLI is the timeline it came from. At the first startup of
93 : * walreceiver, these are set to receiveStart and receiveStartTLI. After
94 : * that, walreceiver updates these whenever it flushes the received WAL to
95 : * disk.
96 : */
97 : XLogRecPtr flushedUpto;
98 : TimeLineID receivedTLI;
99 :
100 : /*
101 : * latestChunkStart is the starting byte position of the current "batch"
102 : * of received WAL. It's actually the same as the previous value of
103 : * flushedUpto before the last flush to disk. Startup process can use
104 : * this to detect whether it's keeping up or not.
105 : */
106 : XLogRecPtr latestChunkStart;
107 :
108 : /*
109 : * Time of send and receive of any message received.
110 : */
111 : TimestampTz lastMsgSendTime;
112 : TimestampTz lastMsgReceiptTime;
113 :
114 : /*
115 : * Latest reported end of WAL on the sender
116 : */
117 : XLogRecPtr latestWalEnd;
118 : TimestampTz latestWalEndTime;
119 :
120 : /*
121 : * connection string; initially set to connect to the primary, and later
122 : * clobbered to hide security-sensitive fields.
123 : */
124 : char conninfo[MAXCONNINFO];
125 :
126 : /*
127 : * Host name (this can be a host name, an IP address, or a directory path)
128 : * and port number of the active replication connection.
129 : */
130 : char sender_host[NI_MAXHOST];
131 : int sender_port;
132 :
133 : /*
134 : * replication slot name; is also used for walreceiver to connect with the
135 : * primary
136 : */
137 : char slotname[NAMEDATALEN];
138 :
139 : /*
140 : * If it's a temporary replication slot, it needs to be recreated when
141 : * connecting.
142 : */
143 : bool is_temp_slot;
144 :
145 : /* set true once conninfo is ready to display (obfuscated pwds etc) */
146 : bool ready_to_display;
147 :
148 : slock_t mutex; /* locks shared variables shown above */
149 :
150 : /*
151 : * Like flushedUpto, but advanced after writing and before flushing,
152 : * without the need to acquire the spin lock. Data can be read by another
153 : * process up to this point, but shouldn't be used for data integrity
154 : * purposes.
155 : */
156 : pg_atomic_uint64 writtenUpto;
157 :
158 : /*
159 : * force walreceiver reply? This doesn't need to be locked; memory
160 : * barriers for ordering are sufficient. But we do need atomic fetch and
161 : * store semantics, so use sig_atomic_t.
162 : */
163 : sig_atomic_t force_reply; /* used as a bool */
164 : } WalRcvData;
165 :
166 : extern PGDLLIMPORT WalRcvData *WalRcv;
167 :
168 : typedef struct
169 : {
170 : bool logical; /* True if this is logical replication stream,
171 : * false if physical stream. */
172 : char *slotname; /* Name of the replication slot or NULL. */
173 : XLogRecPtr startpoint; /* LSN of starting point. */
174 :
175 : union
176 : {
177 : struct
178 : {
179 : TimeLineID startpointTLI; /* Starting timeline */
180 : } physical;
181 : struct
182 : {
183 : uint32 proto_version; /* Logical protocol version */
184 : List *publication_names; /* String list of publications */
185 : bool binary; /* Ask publisher to use binary */
186 : char *streaming_str; /* Streaming of large transactions */
187 : bool twophase; /* Streaming of two-phase transactions at
188 : * prepare time */
189 : char *origin; /* Only publish data originating from the
190 : * specified origin */
191 : } logical;
192 : } proto;
193 : } WalRcvStreamOptions;
194 :
195 : struct WalReceiverConn;
196 : typedef struct WalReceiverConn WalReceiverConn;
197 :
/*
 * Outcome of executing a query through the walreceiver.
 *
 * Only the statuses that are currently used are defined.
 */
typedef enum
{
	/* an error occurred while executing the query */
	WALRCV_ERROR,

	/* query executed a utility or replication command */
	WALRCV_OK_COMMAND,

	/* query returned tuples */
	WALRCV_OK_TUPLES,

	/* query started COPY FROM */
	WALRCV_OK_COPY_IN,

	/* query started COPY TO */
	WALRCV_OK_COPY_OUT,

	/* query started the COPY BOTH replication protocol */
	WALRCV_OK_COPY_BOTH,
} WalRcvExecStatus;
214 :
215 : /*
216 : * Return value for walrcv_exec, returns the status of the execution and
217 : * tuples if any.
218 : */
219 : typedef struct WalRcvExecResult
220 : {
221 : WalRcvExecStatus status;
222 : int sqlstate;
223 : char *err;
224 : Tuplestorestate *tuplestore;
225 : TupleDesc tupledesc;
226 : } WalRcvExecResult;
227 :
228 : /* WAL receiver - libpqwalreceiver hooks */
229 :
230 : /*
231 : * walrcv_connect_fn
232 : *
233 : * Establish connection to a cluster. 'replication' is true if the
234 : * connection is a replication connection, and false if it is a
235 : * regular connection. If it is a replication connection, it could
236 : * be either logical or physical based on input argument 'logical'.
237 : * 'appname' is a name associated to the connection, to use for example
238 : * with fallback_application_name or application_name. Returns the
239 : * details about the connection established, as defined by
240 : * WalReceiverConn for each WAL receiver module. On error, NULL is
241 : * returned with 'err' including the error generated.
242 : */
243 : typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo,
244 : bool replication,
245 : bool logical,
246 : bool must_use_password,
247 : const char *appname,
248 : char **err);
249 :
/*
 * walrcv_check_conninfo_fn
 *
 * Parse and validate the connection string 'conninfo'.
 *
 * NOTE(review): nothing is returned, so a validation failure must be
 * reported some other way (not visible in this header); the meaning of
 * 'must_use_password' is likewise not described here -- confirm against
 * the implementation.
 */
typedef void (*walrcv_check_conninfo_fn) (
	const char *conninfo,
	bool must_use_password);
257 :
258 : /*
259 : * walrcv_get_conninfo_fn
260 : *
261 : * Returns a user-displayable conninfo string. Note that any
262 : * security-sensitive fields should be obfuscated.
263 : */
264 : typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn);
265 :
266 : /*
267 : * walrcv_get_senderinfo_fn
268 : *
269 : * Provide information of the WAL sender this WAL receiver is connected
270 : * to, as of 'sender_host' for the host of the sender and 'sender_port'
271 : * for its port.
272 : */
273 : typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn,
274 : char **sender_host,
275 : int *sender_port);
276 :
277 : /*
278 : * walrcv_identify_system_fn
279 : *
280 : * Run IDENTIFY_SYSTEM on the cluster connected to and validate the
281 : * identity of the cluster. Returns the system ID of the cluster
282 : * connected to. 'primary_tli' is the timeline ID of the sender.
283 : */
284 : typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
285 : TimeLineID *primary_tli);
286 :
/*
 * walrcv_get_dbname_from_conninfo_fn
 *
 * Returns the database name taken from the connection string 'conninfo'
 * (the primary_conninfo).
 */
typedef char *(*walrcv_get_dbname_from_conninfo_fn) (
	const char *conninfo);
293 :
294 : /*
295 : * walrcv_server_version_fn
296 : *
297 : * Returns the version number of the cluster connected to.
298 : */
299 : typedef int (*walrcv_server_version_fn) (WalReceiverConn *conn);
300 :
301 : /*
302 : * walrcv_readtimelinehistoryfile_fn
303 : *
304 : * Fetch from cluster the timeline history file for timeline 'tli'.
305 : * Returns the name of the timeline history file as of 'filename', its
306 : * contents as of 'content' and its 'size'.
307 : */
308 : typedef void (*walrcv_readtimelinehistoryfile_fn) (WalReceiverConn *conn,
309 : TimeLineID tli,
310 : char **filename,
311 : char **content,
312 : int *size);
313 :
314 : /*
315 : * walrcv_startstreaming_fn
316 : *
317 : * Start streaming WAL data from given streaming options. Returns true
318 : * if the connection has switched successfully to copy-both mode and false
319 : * if the server received the command and executed it successfully, but
320 : * didn't switch to copy-mode.
321 : */
322 : typedef bool (*walrcv_startstreaming_fn) (WalReceiverConn *conn,
323 : const WalRcvStreamOptions *options);
324 :
325 : /*
326 : * walrcv_endstreaming_fn
327 : *
328 : * Stop streaming of WAL data. Returns the next timeline ID of the cluster
329 : * connected to in 'next_tli', or 0 if there was no report.
330 : */
331 : typedef void (*walrcv_endstreaming_fn) (WalReceiverConn *conn,
332 : TimeLineID *next_tli);
333 :
334 : /*
335 : * walrcv_receive_fn
336 : *
337 : * Receive a message available from the WAL stream. 'buffer' is a pointer
338 : * to a buffer holding the message received. Returns the length of the data,
339 : * 0 if no data is available yet ('wait_fd' is a socket descriptor which can
340 : * be waited on before a retry), and -1 if the cluster ended the COPY.
341 : */
342 : typedef int (*walrcv_receive_fn) (WalReceiverConn *conn,
343 : char **buffer,
344 : pgsocket *wait_fd);
345 :
346 : /*
347 : * walrcv_send_fn
348 : *
349 : * Send a message of size 'nbytes' to the WAL stream with 'buffer' as
350 : * contents.
351 : */
352 : typedef void (*walrcv_send_fn) (WalReceiverConn *conn,
353 : const char *buffer,
354 : int nbytes);
355 :
356 : /*
357 : * walrcv_create_slot_fn
358 : *
359 : * Create a new replication slot named 'slotname'. 'temporary' defines
360 : * if the slot is temporary. 'snapshot_action' defines the behavior wanted
361 : * for an exported snapshot (see replication protocol for more details).
362 : * 'lsn' includes the LSN position at which the created slot became
363 : * consistent. Returns the name of the exported snapshot for a logical
364 : * slot, or NULL for a physical slot.
365 : */
366 : typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
367 : const char *slotname,
368 : bool temporary,
369 : bool two_phase,
370 : bool failover,
371 : CRSSnapshotAction snapshot_action,
372 : XLogRecPtr *lsn);
373 :
374 : /*
375 : * walrcv_alter_slot_fn
376 : *
377 : * Change the definition of a replication slot. Currently, it supports
378 : * changing the failover and two_phase properties of the slot.
379 : */
380 : typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn,
381 : const char *slotname,
382 : const bool *failover,
383 : const bool *two_phase);
384 :
385 :
386 : /*
387 : * walrcv_get_backend_pid_fn
388 : *
389 : * Returns the PID of the remote backend process.
390 : */
391 : typedef pid_t (*walrcv_get_backend_pid_fn) (WalReceiverConn *conn);
392 :
393 : /*
394 : * walrcv_exec_fn
395 : *
396 : * Send generic queries (and commands) to the remote cluster. 'nRetTypes'
397 : * is the expected number of returned attributes, and 'retTypes' an array
398 : * including their type OIDs. Returns the status of the execution and
399 : * tuples if any.
400 : */
401 : typedef WalRcvExecResult *(*walrcv_exec_fn) (WalReceiverConn *conn,
402 : const char *query,
403 : const int nRetTypes,
404 : const Oid *retTypes);
405 :
406 : /*
407 : * walrcv_disconnect_fn
408 : *
409 : * Disconnect with the cluster.
410 : */
411 : typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
412 :
413 : typedef struct WalReceiverFunctionsType
414 : {
415 : walrcv_connect_fn walrcv_connect;
416 : walrcv_check_conninfo_fn walrcv_check_conninfo;
417 : walrcv_get_conninfo_fn walrcv_get_conninfo;
418 : walrcv_get_senderinfo_fn walrcv_get_senderinfo;
419 : walrcv_identify_system_fn walrcv_identify_system;
420 : walrcv_get_dbname_from_conninfo_fn walrcv_get_dbname_from_conninfo;
421 : walrcv_server_version_fn walrcv_server_version;
422 : walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
423 : walrcv_startstreaming_fn walrcv_startstreaming;
424 : walrcv_endstreaming_fn walrcv_endstreaming;
425 : walrcv_receive_fn walrcv_receive;
426 : walrcv_send_fn walrcv_send;
427 : walrcv_create_slot_fn walrcv_create_slot;
428 : walrcv_alter_slot_fn walrcv_alter_slot;
429 : walrcv_get_backend_pid_fn walrcv_get_backend_pid;
430 : walrcv_exec_fn walrcv_exec;
431 : walrcv_disconnect_fn walrcv_disconnect;
432 : } WalReceiverFunctionsType;
433 :
434 : extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
435 :
/*
 * Convenience wrappers around the hook table: each macro forwards its
 * arguments, unchanged and in order, to the member of WalReceiverFunctions
 * that has the same name.
 */
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err) \
	WalReceiverFunctions->walrcv_connect((conninfo), (replication), (logical), \
										 (must_use_password), (appname), (err))

#define walrcv_check_conninfo(conninfo, must_use_password) \
	WalReceiverFunctions->walrcv_check_conninfo((conninfo), (must_use_password))

#define walrcv_get_conninfo(conn) \
	WalReceiverFunctions->walrcv_get_conninfo((conn))

#define walrcv_get_senderinfo(conn, sender_host, sender_port) \
	WalReceiverFunctions->walrcv_get_senderinfo((conn), (sender_host), (sender_port))

#define walrcv_identify_system(conn, primary_tli) \
	WalReceiverFunctions->walrcv_identify_system((conn), (primary_tli))

#define walrcv_get_dbname_from_conninfo(conninfo) \
	WalReceiverFunctions->walrcv_get_dbname_from_conninfo((conninfo))

#define walrcv_server_version(conn) \
	WalReceiverFunctions->walrcv_server_version((conn))

#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \
	WalReceiverFunctions->walrcv_readtimelinehistoryfile((conn), (tli), (filename), \
														 (content), (size))

#define walrcv_startstreaming(conn, options) \
	WalReceiverFunctions->walrcv_startstreaming((conn), (options))

#define walrcv_endstreaming(conn, next_tli) \
	WalReceiverFunctions->walrcv_endstreaming((conn), (next_tli))

#define walrcv_receive(conn, buffer, wait_fd) \
	WalReceiverFunctions->walrcv_receive((conn), (buffer), (wait_fd))

#define walrcv_send(conn, buffer, nbytes) \
	WalReceiverFunctions->walrcv_send((conn), (buffer), (nbytes))

#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) \
	WalReceiverFunctions->walrcv_create_slot((conn), (slotname), (temporary), \
											 (two_phase), (failover), \
											 (snapshot_action), (lsn))

#define walrcv_alter_slot(conn, slotname, failover, two_phase) \
	WalReceiverFunctions->walrcv_alter_slot((conn), (slotname), (failover), (two_phase))

#define walrcv_get_backend_pid(conn) \
	WalReceiverFunctions->walrcv_get_backend_pid((conn))

#define walrcv_exec(conn, query, nRetTypes, retTypes) \
	WalReceiverFunctions->walrcv_exec((conn), (query), (nRetTypes), (retTypes))

#define walrcv_disconnect(conn) \
	WalReceiverFunctions->walrcv_disconnect((conn))
470 :
471 : static inline void
472 4026 : walrcv_clear_result(WalRcvExecResult *walres)
473 : {
474 4026 : if (!walres)
475 0 : return;
476 :
477 4026 : if (walres->err)
478 4 : pfree(walres->err);
479 :
480 4026 : if (walres->tuplestore)
481 2318 : tuplestore_end(walres->tuplestore);
482 :
483 4026 : if (walres->tupledesc)
484 2318 : FreeTupleDesc(walres->tupledesc);
485 :
486 4026 : pfree(walres);
487 : }
488 :
489 : /* prototypes for functions in walreceiver.c */
490 : pg_noreturn extern void WalReceiverMain(const void *startup_data, size_t startup_data_len);
491 : extern void WalRcvForceReply(void);
492 :
493 : /* prototypes for functions in walreceiverfuncs.c */
494 : extern Size WalRcvShmemSize(void);
495 : extern void WalRcvShmemInit(void);
496 : extern void ShutdownWalRcv(void);
497 : extern bool WalRcvStreaming(void);
498 : extern bool WalRcvRunning(void);
499 : extern WalRcvState WalRcvGetState(void);
500 : extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
501 : const char *conninfo, const char *slotname,
502 : bool create_temp_slot);
503 : extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
504 : extern XLogRecPtr GetWalRcvWriteRecPtr(void);
505 : extern int GetReplicationApplyDelay(void);
506 : extern int GetReplicationTransferLatency(void);
507 :
508 : #endif /* _WALRECEIVER_H */
|