Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * walreceiver.h
 * Exports from replication/walreceiver.c and replication/walreceiverfuncs.c.
5 : *
6 : * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group
7 : *
8 : * src/include/replication/walreceiver.h
9 : *
10 : *-------------------------------------------------------------------------
11 : */
12 : #ifndef _WALRECEIVER_H
13 : #define _WALRECEIVER_H
14 :
15 : #include <netdb.h>
16 :
17 : #include "access/xlog.h"
18 : #include "access/xlogdefs.h"
19 : #include "pgtime.h"
20 : #include "port/atomics.h"
21 : #include "replication/logicalproto.h"
22 : #include "replication/walsender.h"
23 : #include "storage/condition_variable.h"
24 : #include "storage/latch.h"
25 : #include "storage/spin.h"
26 : #include "utils/tuplestore.h"
27 :
/* user-settable parameters (units and defaults are defined where they are registered) */
extern PGDLLIMPORT int wal_receiver_status_interval;
extern PGDLLIMPORT int wal_receiver_timeout;
extern PGDLLIMPORT bool hot_standby_feedback;

/*
 * MAXCONNINFO: maximum size of a connection string.
 *
 * This is also the size of the WalRcvData.conninfo buffer in shared memory,
 * so it bounds the connection string the startup process can hand to the
 * walreceiver.
 *
 * XXX: Should this move to pg_config_manual.h?
 */
#define MAXCONNINFO		1024

/*
 * Can this standby accept replication connections from other standbys
 * (cascading replication)?  That requires EnableHotStandby to be set and
 * max_wal_senders to be greater than zero.
 */
#define AllowCascadeReplication() (EnableHotStandby && max_wal_senders > 0)
42 :
/*
 * Values for WalRcv->walRcvState, the lifecycle state of the walreceiver
 * process.
 *
 * walRcvState is declared before 'mutex' in WalRcvData, so it is one of the
 * fields that spinlock protects.
 */
typedef enum
{
	WALRCV_STOPPED,				/* stopped and mustn't start up again */
	WALRCV_STARTING,			/* launched, but the process hasn't
								 * initialized yet */
	WALRCV_STREAMING,			/* walreceiver is streaming */
	WALRCV_WAITING,				/* stopped streaming, waiting for orders
								 * (e.g. a new place to start streaming) */
	WALRCV_RESTARTING,			/* asked to restart streaming */
	WALRCV_STOPPING,			/* requested to stop, but still running */
} WalRcvState;
56 :
/*
 * Shared memory area for management of walreceiver process.
 *
 * Accessed through the WalRcv pointer declared below.  Every field declared
 * before 'mutex' is protected by that spinlock; 'writtenUpto' and
 * 'force_reply', declared after it, are deliberately read and written
 * without it (see their comments).
 */
typedef struct
{
	/*
	 * PID of currently active walreceiver process, its current state and
	 * start time (actually, the time at which it was requested to be
	 * started).
	 *
	 * walRcvStoppedCV is a condition variable paired with walRcvState;
	 * presumably it is broadcast when the walreceiver reaches WALRCV_STOPPED
	 * so that waiters need not poll (confirm in walreceiver.c /
	 * walreceiverfuncs.c).
	 */
	pid_t		pid;
	WalRcvState walRcvState;
	ConditionVariable walRcvStoppedCV;
	pg_time_t	startTime;

	/*
	 * receiveStart and receiveStartTLI indicate the first byte position and
	 * timeline that will be received. When startup process starts the
	 * walreceiver, it sets these to the point where it wants the streaming to
	 * begin.
	 */
	XLogRecPtr	receiveStart;
	TimeLineID	receiveStartTLI;

	/*
	 * flushedUpto-1 is the last byte position that has already been received,
	 * and receivedTLI is the timeline it came from. At the first startup of
	 * walreceiver, these are set to receiveStart and receiveStartTLI. After
	 * that, walreceiver updates these whenever it flushes the received WAL to
	 * disk.
	 */
	XLogRecPtr	flushedUpto;
	TimeLineID	receivedTLI;

	/*
	 * latestChunkStart is the starting byte position of the current "batch"
	 * of received WAL. It's actually the same as the previous value of
	 * flushedUpto before the last flush to disk. Startup process can use
	 * this to detect whether it's keeping up or not.
	 */
	XLogRecPtr	latestChunkStart;

	/*
	 * Send time and local receipt time of the last message received.
	 */
	TimestampTz lastMsgSendTime;
	TimestampTz lastMsgReceiptTime;

	/*
	 * Latest end of WAL reported by the sender, and the timestamp that
	 * accompanied that report.
	 */
	XLogRecPtr	latestWalEnd;
	TimestampTz latestWalEndTime;

	/*
	 * connection string; initially set to connect to the primary, and later
	 * clobbered to hide security-sensitive fields.  Limited to MAXCONNINFO
	 * bytes by the array size.
	 */
	char		conninfo[MAXCONNINFO];

	/*
	 * Host name (this can be a host name, an IP address, or a directory path)
	 * and port number of the active replication connection.
	 */
	char		sender_host[NI_MAXHOST];
	int			sender_port;

	/*
	 * replication slot name; is also used for walreceiver to connect with the
	 * primary
	 */
	char		slotname[NAMEDATALEN];

	/*
	 * If it's a temporary replication slot, it needs to be recreated when
	 * connecting.
	 */
	bool		is_temp_slot;

	/* set true once conninfo is ready to display (obfuscated pwds etc) */
	bool		ready_to_display;

	/*
	 * Latch used by startup process to wake up walreceiver after telling it
	 * where to start streaming (after setting receiveStart and
	 * receiveStartTLI), and also to tell it to send apply feedback to the
	 * primary whenever specially marked commit records are applied. This is
	 * normally mapped to procLatch when walreceiver is running.
	 */
	Latch	   *latch;

	slock_t		mutex;			/* locks shared variables shown above */

	/*
	 * Like flushedUpto, but advanced after writing and before flushing,
	 * without the need to acquire the spin lock. Data can be read by another
	 * process up to this point, but shouldn't be used for data integrity
	 * purposes.
	 */
	pg_atomic_uint64 writtenUpto;

	/*
	 * force walreceiver reply? This doesn't need to be locked; memory
	 * barriers for ordering are sufficient. But we do need atomic fetch and
	 * store semantics, so use sig_atomic_t.
	 */
	sig_atomic_t force_reply;	/* used as a bool */
} WalRcvData;

/* Pointer to the walreceiver's shared-memory state described above. */
extern PGDLLIMPORT WalRcvData *WalRcv;
165 :
/*
 * Options describing the stream to start, passed to walrcv_startstreaming().
 *
 * 'proto' holds protocol-specific settings: the 'physical' member applies
 * when 'logical' is false, the 'logical' member when it is true.
 */
typedef struct
{
	bool		logical;		/* True if this is logical replication stream,
								 * false if physical stream. */
	char	   *slotname;		/* Name of the replication slot or NULL. */
	XLogRecPtr	startpoint;		/* LSN of starting point. */

	union
	{
		struct
		{
			TimeLineID	startpointTLI;	/* Starting timeline */
		}			physical;
		struct
		{
			uint32		proto_version;	/* Logical protocol version */
			List	   *publication_names;	/* String list of publications */
			bool		binary; /* Ask publisher to use binary */
			char	   *streaming_str;	/* Streaming of large transactions,
										 * as an option value string */
			bool		twophase;	/* Streaming of two-phase transactions at
									 * prepare time */
			char	   *origin; /* Only publish data originating from the
								 * specified origin */
		}			logical;
	}			proto;
} WalRcvStreamOptions;
192 :
/*
 * Opaque connection handle.  Its contents are defined by each WAL receiver
 * module that implements the callbacks below.
 */
struct WalReceiverConn;
typedef struct WalReceiverConn WalReceiverConn;

/*
 * Status of walreceiver query execution.
 *
 * We only define statuses that are currently used.
 */
typedef enum
{
	WALRCV_ERROR,				/* There was an error when executing the
								 * query. */
	WALRCV_OK_COMMAND,			/* Query executed utility or replication
								 * command. */
	WALRCV_OK_TUPLES,			/* Query returned tuples. */
	WALRCV_OK_COPY_IN,			/* Query started COPY FROM. */
	WALRCV_OK_COPY_OUT,			/* Query started COPY TO. */
	WALRCV_OK_COPY_BOTH,		/* Query started COPY BOTH replication
								 * protocol. */
} WalRcvExecStatus;

/*
 * Return value for walrcv_exec, returns the status of the execution and
 * tuples if any.  Release it with walrcv_clear_result(), which pfree()s
 * 'err' and the struct itself and ends/frees 'tuplestore' and 'tupledesc'
 * when they are non-NULL.
 */
typedef struct WalRcvExecResult
{
	WalRcvExecStatus status;	/* outcome of the query */
	int			sqlstate;		/* error code; presumably an encoded SQLSTATE
								 * relevant for WALRCV_ERROR -- confirm */
	char	   *err;			/* palloc'd error message, or NULL */
	Tuplestorestate *tuplestore;	/* returned tuples, or NULL */
	TupleDesc	tupledesc;		/* descriptor of returned tuples, or NULL */
} WalRcvExecResult;
225 :
226 : /* WAL receiver - libpqwalreceiver hooks */
227 :
228 : /*
229 : * walrcv_connect_fn
230 : *
231 : * Establish connection to a cluster. 'replication' is true if the
232 : * connection is a replication connection, and false if it is a
233 : * regular connection. If it is a replication connection, it could
234 : * be either logical or physical based on input argument 'logical'.
235 : * 'appname' is a name associated to the connection, to use for example
236 : * with fallback_application_name or application_name. Returns the
237 : * details about the connection established, as defined by
238 : * WalReceiverConn for each WAL receiver module. On error, NULL is
239 : * returned with 'err' including the error generated.
240 : */
241 : typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo,
242 : bool replication,
243 : bool logical,
244 : bool must_use_password,
245 : const char *appname,
246 : char **err);
247 :
248 : /*
249 : * walrcv_check_conninfo_fn
250 : *
251 : * Parse and validate the connection string given as of 'conninfo'.
252 : */
253 : typedef void (*walrcv_check_conninfo_fn) (const char *conninfo,
254 : bool must_use_password);
255 :
256 : /*
257 : * walrcv_get_conninfo_fn
258 : *
259 : * Returns a user-displayable conninfo string. Note that any
260 : * security-sensitive fields should be obfuscated.
261 : */
262 : typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn);
263 :
264 : /*
265 : * walrcv_get_senderinfo_fn
266 : *
267 : * Provide information of the WAL sender this WAL receiver is connected
268 : * to, as of 'sender_host' for the host of the sender and 'sender_port'
269 : * for its port.
270 : */
271 : typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn,
272 : char **sender_host,
273 : int *sender_port);
274 :
275 : /*
276 : * walrcv_identify_system_fn
277 : *
278 : * Run IDENTIFY_SYSTEM on the cluster connected to and validate the
279 : * identity of the cluster. Returns the system ID of the cluster
280 : * connected to. 'primary_tli' is the timeline ID of the sender.
281 : */
282 : typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
283 : TimeLineID *primary_tli);
284 :
285 : /*
286 : * walrcv_get_dbname_from_conninfo_fn
287 : *
288 : * Returns the database name from the primary_conninfo
289 : */
290 : typedef char *(*walrcv_get_dbname_from_conninfo_fn) (const char *conninfo);
291 :
292 : /*
293 : * walrcv_server_version_fn
294 : *
295 : * Returns the version number of the cluster connected to.
296 : */
297 : typedef int (*walrcv_server_version_fn) (WalReceiverConn *conn);
298 :
299 : /*
300 : * walrcv_readtimelinehistoryfile_fn
301 : *
302 : * Fetch from cluster the timeline history file for timeline 'tli'.
303 : * Returns the name of the timeline history file as of 'filename', its
304 : * contents as of 'content' and its 'size'.
305 : */
306 : typedef void (*walrcv_readtimelinehistoryfile_fn) (WalReceiverConn *conn,
307 : TimeLineID tli,
308 : char **filename,
309 : char **content,
310 : int *size);
311 :
312 : /*
313 : * walrcv_startstreaming_fn
314 : *
315 : * Start streaming WAL data from given streaming options. Returns true
316 : * if the connection has switched successfully to copy-both mode and false
317 : * if the server received the command and executed it successfully, but
318 : * didn't switch to copy-mode.
319 : */
320 : typedef bool (*walrcv_startstreaming_fn) (WalReceiverConn *conn,
321 : const WalRcvStreamOptions *options);
322 :
323 : /*
324 : * walrcv_endstreaming_fn
325 : *
326 : * Stop streaming of WAL data. Returns the next timeline ID of the cluster
327 : * connected to in 'next_tli', or 0 if there was no report.
328 : */
329 : typedef void (*walrcv_endstreaming_fn) (WalReceiverConn *conn,
330 : TimeLineID *next_tli);
331 :
332 : /*
333 : * walrcv_receive_fn
334 : *
335 : * Receive a message available from the WAL stream. 'buffer' is a pointer
336 : * to a buffer holding the message received. Returns the length of the data,
337 : * 0 if no data is available yet ('wait_fd' is a socket descriptor which can
338 : * be waited on before a retry), and -1 if the cluster ended the COPY.
339 : */
340 : typedef int (*walrcv_receive_fn) (WalReceiverConn *conn,
341 : char **buffer,
342 : pgsocket *wait_fd);
343 :
344 : /*
345 : * walrcv_send_fn
346 : *
347 : * Send a message of size 'nbytes' to the WAL stream with 'buffer' as
348 : * contents.
349 : */
350 : typedef void (*walrcv_send_fn) (WalReceiverConn *conn,
351 : const char *buffer,
352 : int nbytes);
353 :
354 : /*
355 : * walrcv_create_slot_fn
356 : *
357 : * Create a new replication slot named 'slotname'. 'temporary' defines
358 : * if the slot is temporary. 'snapshot_action' defines the behavior wanted
359 : * for an exported snapshot (see replication protocol for more details).
360 : * 'lsn' includes the LSN position at which the created slot became
361 : * consistent. Returns the name of the exported snapshot for a logical
362 : * slot, or NULL for a physical slot.
363 : */
364 : typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
365 : const char *slotname,
366 : bool temporary,
367 : bool two_phase,
368 : bool failover,
369 : CRSSnapshotAction snapshot_action,
370 : XLogRecPtr *lsn);
371 :
372 : /*
373 : * walrcv_alter_slot_fn
374 : *
375 : * Change the definition of a replication slot. Currently, it only supports
376 : * changing the failover property of the slot.
377 : */
378 : typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn,
379 : const char *slotname,
380 : bool failover);
381 :
382 : /*
383 : * walrcv_get_backend_pid_fn
384 : *
385 : * Returns the PID of the remote backend process.
386 : */
387 : typedef pid_t (*walrcv_get_backend_pid_fn) (WalReceiverConn *conn);
388 :
389 : /*
390 : * walrcv_exec_fn
391 : *
392 : * Send generic queries (and commands) to the remote cluster. 'nRetTypes'
393 : * is the expected number of returned attributes, and 'retTypes' an array
394 : * including their type OIDs. Returns the status of the execution and
395 : * tuples if any.
396 : */
397 : typedef WalRcvExecResult *(*walrcv_exec_fn) (WalReceiverConn *conn,
398 : const char *query,
399 : const int nRetTypes,
400 : const Oid *retTypes);
401 :
402 : /*
403 : * walrcv_disconnect_fn
404 : *
405 : * Disconnect with the cluster.
406 : */
407 : typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
408 :
/*
 * Table of callbacks implementing the WAL receiver API, one entry per
 * walrcv_*_fn type above.  Callers do not call an implementation directly;
 * they go through the WalReceiverFunctions pointer, normally via the
 * walrcv_* macros.  Presumably the table is supplied by the libpqwalreceiver
 * module (see the "libpqwalreceiver hooks" section) -- confirm where
 * WalReceiverFunctions is assigned.
 */
typedef struct WalReceiverFunctionsType
{
	walrcv_connect_fn walrcv_connect;
	walrcv_check_conninfo_fn walrcv_check_conninfo;
	walrcv_get_conninfo_fn walrcv_get_conninfo;
	walrcv_get_senderinfo_fn walrcv_get_senderinfo;
	walrcv_identify_system_fn walrcv_identify_system;
	walrcv_get_dbname_from_conninfo_fn walrcv_get_dbname_from_conninfo;
	walrcv_server_version_fn walrcv_server_version;
	walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
	walrcv_startstreaming_fn walrcv_startstreaming;
	walrcv_endstreaming_fn walrcv_endstreaming;
	walrcv_receive_fn walrcv_receive;
	walrcv_send_fn walrcv_send;
	walrcv_create_slot_fn walrcv_create_slot;
	walrcv_alter_slot_fn walrcv_alter_slot;
	walrcv_get_backend_pid_fn walrcv_get_backend_pid;
	walrcv_exec_fn walrcv_exec;
	walrcv_disconnect_fn walrcv_disconnect;
} WalReceiverFunctionsType;

/*
 * The active callback table.  The walrcv_* macros dereference it directly,
 * so it must point to a valid table before any of them is used.
 */
extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
431 :
/*
 * Convenience wrappers that dispatch through the WalReceiverFunctions table.
 *
 * Each macro takes the same arguments, in the same order and with the same
 * names, as the corresponding walrcv_*_fn callback type.  The expansions
 * are fully parenthesized so they behave as a single expression wherever
 * they are used.  WalReceiverFunctions is dereferenced at the point of use,
 * so it must already point to a valid callback table.
 */
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err) \
	(WalReceiverFunctions->walrcv_connect(conninfo, replication, logical, must_use_password, appname, err))
#define walrcv_check_conninfo(conninfo, must_use_password) \
	(WalReceiverFunctions->walrcv_check_conninfo(conninfo, must_use_password))
#define walrcv_get_conninfo(conn) \
	(WalReceiverFunctions->walrcv_get_conninfo(conn))
#define walrcv_get_senderinfo(conn, sender_host, sender_port) \
	(WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port))
#define walrcv_identify_system(conn, primary_tli) \
	(WalReceiverFunctions->walrcv_identify_system(conn, primary_tli))
#define walrcv_get_dbname_from_conninfo(conninfo) \
	(WalReceiverFunctions->walrcv_get_dbname_from_conninfo(conninfo))
#define walrcv_server_version(conn) \
	(WalReceiverFunctions->walrcv_server_version(conn))
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \
	(WalReceiverFunctions->walrcv_readtimelinehistoryfile(conn, tli, filename, content, size))
#define walrcv_startstreaming(conn, options) \
	(WalReceiverFunctions->walrcv_startstreaming(conn, options))
#define walrcv_endstreaming(conn, next_tli) \
	(WalReceiverFunctions->walrcv_endstreaming(conn, next_tli))
#define walrcv_receive(conn, buffer, wait_fd) \
	(WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd))
#define walrcv_send(conn, buffer, nbytes) \
	(WalReceiverFunctions->walrcv_send(conn, buffer, nbytes))
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) \
	(WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn))
#define walrcv_alter_slot(conn, slotname, failover) \
	(WalReceiverFunctions->walrcv_alter_slot(conn, slotname, failover))
#define walrcv_get_backend_pid(conn) \
	(WalReceiverFunctions->walrcv_get_backend_pid(conn))
/* 'query' matches the parameter name used by walrcv_exec_fn */
#define walrcv_exec(conn, query, nRetTypes, retTypes) \
	(WalReceiverFunctions->walrcv_exec(conn, query, nRetTypes, retTypes))
#define walrcv_disconnect(conn) \
	(WalReceiverFunctions->walrcv_disconnect(conn))
466 :
467 : static inline void
468 3314 : walrcv_clear_result(WalRcvExecResult *walres)
469 : {
470 3314 : if (!walres)
471 0 : return;
472 :
473 3314 : if (walres->err)
474 0 : pfree(walres->err);
475 :
476 3314 : if (walres->tuplestore)
477 1874 : tuplestore_end(walres->tuplestore);
478 :
479 3314 : if (walres->tupledesc)
480 1874 : FreeTupleDesc(walres->tupledesc);
481 :
482 3314 : pfree(walres);
483 : }
484 :
/* prototypes for functions in walreceiver.c */
/* Entry point of the walreceiver process; declared noreturn. */
extern void WalReceiverMain(char *startup_data, size_t startup_data_len) pg_attribute_noreturn();
extern void ProcessWalRcvInterrupts(void);
/* Request an immediate reply; presumably sets WalRcv->force_reply -- confirm. */
extern void WalRcvForceReply(void);

/* prototypes for functions in walreceiverfuncs.c */
/* Shared-memory sizing and initialization for WalRcvData. */
extern Size WalRcvShmemSize(void);
extern void WalRcvShmemInit(void);
extern void ShutdownWalRcv(void);
extern bool WalRcvStreaming(void);
extern bool WalRcvRunning(void);
/* Ask for streaming to start at 'recptr' on timeline 'tli'. */
extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
								 const char *conninfo, const char *slotname,
								 bool create_temp_slot);
/*
 * Presumably returns WalRcvData.flushedUpto and, via the out-parameters,
 * latestChunkStart and receivedTLI -- confirm in walreceiverfuncs.c.
 */
extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
/* Presumably reads WalRcvData.writtenUpto (not for data integrity) -- confirm. */
extern XLogRecPtr GetWalRcvWriteRecPtr(void);
extern int	GetReplicationApplyDelay(void);
extern int	GetReplicationTransferLatency(void);
503 :
504 : #endif /* _WALRECEIVER_H */
|