Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_createsubscriber.c
4 : * Create a new logical replica from a standby server
5 : *
6 : * Copyright (c) 2024-2025, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/bin/pg_basebackup/pg_createsubscriber.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 :
14 : #include "postgres_fe.h"
15 :
16 : #include <sys/stat.h>
17 : #include <sys/time.h>
18 : #include <sys/wait.h>
19 : #include <time.h>
20 :
21 : #include "common/connect.h"
22 : #include "common/controldata_utils.h"
23 : #include "common/logging.h"
24 : #include "common/pg_prng.h"
25 : #include "common/restricted_token.h"
26 : #include "fe_utils/recovery_gen.h"
27 : #include "fe_utils/simple_list.h"
28 : #include "fe_utils/string_utils.h"
29 : #include "getopt_long.h"
30 :
/* Subscriber port used when -p/--subscriber-port is not given (see usage()). */
#define DEFAULT_SUB_PORT "50432"

/* Command-line options */
struct CreateSubscriberOptions
{
	char	   *config_file;	/* configuration file (--config-file) */
	char	   *pub_conninfo_str;	/* publisher connection string */
	char	   *socket_dir;		/* directory for Unix-domain socket, if any */
	char	   *sub_port;		/* subscriber port number */
	const char *sub_username;	/* subscriber username; NULL if not given */
	SimpleStringList database_names;	/* list of database names */
	SimpleStringList pub_names; /* list of publication names */
	SimpleStringList sub_names; /* list of subscription names */
	SimpleStringList replslot_names;	/* list of replication slot names */
	int			recovery_timeout;	/* seconds to wait for recovery to end */
};

/*
 * Per-database state for one database being converted.  store_pub_sub_info()
 * builds one entry per requested database; names left NULL there are filled
 * with generated values by setup_publisher().
 */
struct LogicalRepInfo
{
	char	   *dbname;			/* database name */
	char	   *pubconninfo;	/* publisher connection string */
	char	   *subconninfo;	/* subscriber connection string */
	char	   *pubname;		/* publication name */
	char	   *subname;		/* subscription name */
	char	   *replslotname;	/* replication slot name */

	/*
	 * Creation flags consulted by cleanup_objects_atexit() to decide which
	 * objects must be dropped from the primary after a failure.
	 */
	bool		made_replslot;	/* replication slot was created */
	bool		made_publication;	/* publication was created */
};
59 : };
60 :
61 : static void cleanup_objects_atexit(void);
62 : static void usage();
63 : static char *get_base_conninfo(const char *conninfo, char **dbname);
64 : static char *get_sub_conninfo(const struct CreateSubscriberOptions *opt);
65 : static char *get_exec_path(const char *argv0, const char *progname);
66 : static void check_data_directory(const char *datadir);
67 : static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
68 : static struct LogicalRepInfo *store_pub_sub_info(const struct CreateSubscriberOptions *opt,
69 : const char *pub_base_conninfo,
70 : const char *sub_base_conninfo);
71 : static PGconn *connect_database(const char *conninfo, bool exit_on_error);
72 : static void disconnect_database(PGconn *conn, bool exit_on_error);
73 : static uint64 get_primary_sysid(const char *conninfo);
74 : static uint64 get_standby_sysid(const char *datadir);
75 : static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
76 : static bool server_is_in_recovery(PGconn *conn);
77 : static char *generate_object_name(PGconn *conn);
78 : static void check_publisher(const struct LogicalRepInfo *dbinfo);
79 : static char *setup_publisher(struct LogicalRepInfo *dbinfo);
80 : static void check_subscriber(const struct LogicalRepInfo *dbinfo);
81 : static void setup_subscriber(struct LogicalRepInfo *dbinfo,
82 : const char *consistent_lsn);
83 : static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
84 : const char *lsn);
85 : static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
86 : const char *slotname);
87 : static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo);
88 : static char *create_logical_replication_slot(PGconn *conn,
89 : struct LogicalRepInfo *dbinfo);
90 : static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
91 : const char *slot_name);
92 : static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
93 : static void start_standby_server(const struct CreateSubscriberOptions *opt,
94 : bool restricted_access,
95 : bool restrict_logical_worker);
96 : static void stop_standby_server(const char *datadir);
97 : static void wait_for_end_recovery(const char *conninfo,
98 : const struct CreateSubscriberOptions *opt);
99 : static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
100 : static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
101 : static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
102 : static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
103 : const char *lsn);
104 : static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
105 : static void check_and_drop_existing_subscriptions(PGconn *conn,
106 : const struct LogicalRepInfo *dbinfo);
107 : static void drop_existing_subscriptions(PGconn *conn, const char *subname,
108 : const char *dbname);
109 :
110 : #define USEC_PER_SEC 1000000
111 : #define WAIT_INTERVAL 1 /* 1 second */
112 :
113 : static const char *progname;
114 :
115 : static char *primary_slot_name = NULL;
116 : static bool dry_run = false;
117 :
118 : static bool success = false;
119 :
120 : static struct LogicalRepInfo *dbinfo;
121 : static int num_dbs = 0; /* number of specified databases */
122 : static int num_pubs = 0; /* number of specified publications */
123 : static int num_subs = 0; /* number of specified subscriptions */
124 : static int num_replslots = 0; /* number of specified replication slots */
125 :
126 : static pg_prng_state prng_state;
127 :
128 : static char *pg_ctl_path = NULL;
129 : static char *pg_resetwal_path = NULL;
130 :
131 : /* standby / subscriber data directory */
132 : static char *subscriber_dir = NULL;
133 :
134 : static bool recovery_ended = false;
135 : static bool standby_running = false;
136 :
137 : enum WaitPMResult
138 : {
139 : POSTMASTER_READY,
140 : POSTMASTER_STILL_STARTING
141 : };
142 :
143 :
144 : /*
145 : * Cleanup objects that were created by pg_createsubscriber if there is an
146 : * error.
147 : *
148 : * Publications and replication slots are created on primary. Depending on the
149 : * step it failed, it should remove the already created objects if it is
150 : * possible (sometimes it won't work due to a connection issue).
151 : * There is no cleanup on the target server. The steps on the target server are
152 : * executed *after* promotion, hence, at this point, a failure means recreate
153 : * the physical replica and start again.
154 : */
155 : static void
156 18 : cleanup_objects_atexit(void)
157 : {
158 18 : if (success)
159 6 : return;
160 :
161 : /*
162 : * If the server is promoted, there is no way to use the current setup
163 : * again. Warn the user that a new replication setup should be done before
164 : * trying again.
165 : */
166 12 : if (recovery_ended)
167 : {
168 0 : pg_log_warning("failed after the end of recovery");
169 0 : pg_log_warning_hint("The target server cannot be used as a physical replica anymore. "
170 : "You must recreate the physical replica before continuing.");
171 : }
172 :
173 36 : for (int i = 0; i < num_dbs; i++)
174 : {
175 24 : if (dbinfo[i].made_publication || dbinfo[i].made_replslot)
176 : {
177 : PGconn *conn;
178 :
179 0 : conn = connect_database(dbinfo[i].pubconninfo, false);
180 0 : if (conn != NULL)
181 : {
182 0 : if (dbinfo[i].made_publication)
183 0 : drop_publication(conn, &dbinfo[i]);
184 0 : if (dbinfo[i].made_replslot)
185 0 : drop_replication_slot(conn, &dbinfo[i], dbinfo[i].replslotname);
186 0 : disconnect_database(conn, false);
187 : }
188 : else
189 : {
190 : /*
191 : * If a connection could not be established, inform the user
192 : * that some objects were left on primary and should be
193 : * removed before trying again.
194 : */
195 0 : if (dbinfo[i].made_publication)
196 : {
197 0 : pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
198 : dbinfo[i].pubname, dbinfo[i].dbname);
199 0 : pg_log_warning_hint("Drop this publication before trying again.");
200 : }
201 0 : if (dbinfo[i].made_replslot)
202 : {
203 0 : pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
204 : dbinfo[i].replslotname, dbinfo[i].dbname);
205 0 : pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
206 : }
207 : }
208 : }
209 : }
210 :
211 12 : if (standby_running)
212 8 : stop_standby_server(subscriber_dir);
213 : }
214 :
215 : static void
216 2 : usage(void)
217 : {
218 2 : printf(_("%s creates a new logical replica from a standby server.\n\n"),
219 : progname);
220 2 : printf(_("Usage:\n"));
221 2 : printf(_(" %s [OPTION]...\n"), progname);
222 2 : printf(_("\nOptions:\n"));
223 2 : printf(_(" -d, --database=DBNAME database in which to create a subscription\n"));
224 2 : printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
225 2 : printf(_(" -n, --dry-run dry run, just show what would be done\n"));
226 2 : printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
227 2 : printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
228 2 : printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
229 2 : printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
230 2 : printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
231 2 : printf(_(" -v, --verbose output verbose messages\n"));
232 2 : printf(_(" --config-file=FILENAME use specified main server configuration\n"
233 : " file when running target cluster\n"));
234 2 : printf(_(" --publication=NAME publication name\n"));
235 2 : printf(_(" --replication-slot=NAME replication slot name\n"));
236 2 : printf(_(" --subscription=NAME subscription name\n"));
237 2 : printf(_(" -V, --version output version information, then exit\n"));
238 2 : printf(_(" -?, --help show this help, then exit\n"));
239 2 : printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
240 2 : printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
241 2 : }
242 :
243 : /*
244 : * Subroutine to append "keyword=value" to a connection string,
245 : * with proper quoting of the value. (We assume keywords don't need that.)
246 : */
247 : static void
248 190 : appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
249 : {
250 190 : if (buf->len > 0)
251 138 : appendPQExpBufferChar(buf, ' ');
252 190 : appendPQExpBufferStr(buf, keyword);
253 190 : appendPQExpBufferChar(buf, '=');
254 190 : appendConnStrVal(buf, val);
255 190 : }
256 :
257 : /*
258 : * Validate a connection string. Returns a base connection string that is a
259 : * connection string without a database name.
260 : *
261 : * Since we might process multiple databases, each database name will be
262 : * appended to this base connection string to provide a final connection
263 : * string. If the second argument (dbname) is not null, returns dbname if the
264 : * provided connection string contains it.
265 : *
266 : * It is the caller's responsibility to free the returned connection string and
267 : * dbname.
268 : */
269 : static char *
270 26 : get_base_conninfo(const char *conninfo, char **dbname)
271 : {
272 : PQExpBuffer buf;
273 : PQconninfoOption *conn_opts;
274 : PQconninfoOption *conn_opt;
275 26 : char *errmsg = NULL;
276 : char *ret;
277 :
278 26 : conn_opts = PQconninfoParse(conninfo, &errmsg);
279 26 : if (conn_opts == NULL)
280 : {
281 0 : pg_log_error("could not parse connection string: %s", errmsg);
282 0 : PQfreemem(errmsg);
283 0 : return NULL;
284 : }
285 :
286 26 : buf = createPQExpBuffer();
287 1248 : for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
288 : {
289 1222 : if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
290 : {
291 62 : if (strcmp(conn_opt->keyword, "dbname") == 0)
292 : {
293 18 : if (dbname)
294 18 : *dbname = pg_strdup(conn_opt->val);
295 18 : continue;
296 : }
297 44 : appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
298 : }
299 : }
300 :
301 26 : ret = pg_strdup(buf->data);
302 :
303 26 : destroyPQExpBuffer(buf);
304 26 : PQconninfoFree(conn_opts);
305 :
306 26 : return ret;
307 : }
308 :
309 : /*
310 : * Build a subscriber connection string. Only a few parameters are supported
311 : * since it starts a server with restricted access.
312 : */
313 : static char *
314 26 : get_sub_conninfo(const struct CreateSubscriberOptions *opt)
315 : {
316 26 : PQExpBuffer buf = createPQExpBuffer();
317 : char *ret;
318 :
319 26 : appendConnStrItem(buf, "port", opt->sub_port);
320 : #if !defined(WIN32)
321 26 : appendConnStrItem(buf, "host", opt->socket_dir);
322 : #endif
323 26 : if (opt->sub_username != NULL)
324 0 : appendConnStrItem(buf, "user", opt->sub_username);
325 26 : appendConnStrItem(buf, "fallback_application_name", progname);
326 :
327 26 : ret = pg_strdup(buf->data);
328 :
329 26 : destroyPQExpBuffer(buf);
330 :
331 26 : return ret;
332 : }
333 :
334 : /*
335 : * Verify if a PostgreSQL binary (progname) is available in the same directory as
336 : * pg_createsubscriber and it has the same version. It returns the absolute
337 : * path of the progname.
338 : */
339 : static char *
340 36 : get_exec_path(const char *argv0, const char *progname)
341 : {
342 : char *versionstr;
343 : char *exec_path;
344 : int ret;
345 :
346 36 : versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
347 36 : exec_path = pg_malloc(MAXPGPATH);
348 36 : ret = find_other_exec(argv0, progname, versionstr, exec_path);
349 :
350 36 : if (ret < 0)
351 : {
352 : char full_path[MAXPGPATH];
353 :
354 0 : if (find_my_exec(argv0, full_path) < 0)
355 0 : strlcpy(full_path, progname, sizeof(full_path));
356 :
357 0 : if (ret == -1)
358 0 : pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
359 : progname, "pg_createsubscriber", full_path);
360 : else
361 0 : pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
362 : progname, full_path, "pg_createsubscriber");
363 : }
364 :
365 36 : pg_log_debug("%s path is: %s", progname, exec_path);
366 :
367 36 : return exec_path;
368 : }
369 :
370 : /*
371 : * Is it a cluster directory? These are preliminary checks. It is far from
372 : * making an accurate check. If it is not a clone from the publisher, it will
373 : * eventually fail in a future step.
374 : */
375 : static void
376 18 : check_data_directory(const char *datadir)
377 : {
378 : struct stat statbuf;
379 : char versionfile[MAXPGPATH];
380 :
381 18 : pg_log_info("checking if directory \"%s\" is a cluster data directory",
382 : datadir);
383 :
384 18 : if (stat(datadir, &statbuf) != 0)
385 : {
386 0 : if (errno == ENOENT)
387 0 : pg_fatal("data directory \"%s\" does not exist", datadir);
388 : else
389 0 : pg_fatal("could not access directory \"%s\": %m", datadir);
390 : }
391 :
392 18 : snprintf(versionfile, MAXPGPATH, "%s/PG_VERSION", datadir);
393 18 : if (stat(versionfile, &statbuf) != 0 && errno == ENOENT)
394 : {
395 0 : pg_fatal("directory \"%s\" is not a database cluster directory",
396 : datadir);
397 : }
398 18 : }
399 :
400 : /*
401 : * Append database name into a base connection string.
402 : *
403 : * dbname is the only parameter that changes so it is not included in the base
404 : * connection string. This function concatenates dbname to build a "real"
405 : * connection string.
406 : */
407 : static char *
408 68 : concat_conninfo_dbname(const char *conninfo, const char *dbname)
409 : {
410 68 : PQExpBuffer buf = createPQExpBuffer();
411 : char *ret;
412 :
413 : Assert(conninfo != NULL);
414 :
415 68 : appendPQExpBufferStr(buf, conninfo);
416 68 : appendConnStrItem(buf, "dbname", dbname);
417 :
418 68 : ret = pg_strdup(buf->data);
419 68 : destroyPQExpBuffer(buf);
420 :
421 68 : return ret;
422 : }
423 :
424 : /*
425 : * Store publication and subscription information.
426 : *
427 : * If publication, replication slot and subscription names were specified,
428 : * store it here. Otherwise, a generated name will be assigned to the object in
429 : * setup_publisher().
430 : */
431 : static struct LogicalRepInfo *
432 18 : store_pub_sub_info(const struct CreateSubscriberOptions *opt,
433 : const char *pub_base_conninfo,
434 : const char *sub_base_conninfo)
435 : {
436 : struct LogicalRepInfo *dbinfo;
437 18 : SimpleStringListCell *pubcell = NULL;
438 18 : SimpleStringListCell *subcell = NULL;
439 18 : SimpleStringListCell *replslotcell = NULL;
440 18 : int i = 0;
441 :
442 18 : dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
443 :
444 18 : if (num_pubs > 0)
445 4 : pubcell = opt->pub_names.head;
446 18 : if (num_subs > 0)
447 2 : subcell = opt->sub_names.head;
448 18 : if (num_replslots > 0)
449 4 : replslotcell = opt->replslot_names.head;
450 :
451 52 : for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
452 : {
453 : char *conninfo;
454 :
455 : /* Fill publisher attributes */
456 34 : conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
457 34 : dbinfo[i].pubconninfo = conninfo;
458 34 : dbinfo[i].dbname = cell->val;
459 34 : if (num_pubs > 0)
460 8 : dbinfo[i].pubname = pubcell->val;
461 : else
462 26 : dbinfo[i].pubname = NULL;
463 34 : if (num_replslots > 0)
464 6 : dbinfo[i].replslotname = replslotcell->val;
465 : else
466 28 : dbinfo[i].replslotname = NULL;
467 34 : dbinfo[i].made_replslot = false;
468 34 : dbinfo[i].made_publication = false;
469 : /* Fill subscriber attributes */
470 34 : conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
471 34 : dbinfo[i].subconninfo = conninfo;
472 34 : if (num_subs > 0)
473 4 : dbinfo[i].subname = subcell->val;
474 : else
475 30 : dbinfo[i].subname = NULL;
476 : /* Other fields will be filled later */
477 :
478 34 : pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
479 : dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
480 : dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
481 : dbinfo[i].pubconninfo);
482 34 : pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s", i,
483 : dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
484 : dbinfo[i].subconninfo);
485 :
486 34 : if (num_pubs > 0)
487 8 : pubcell = pubcell->next;
488 34 : if (num_subs > 0)
489 4 : subcell = subcell->next;
490 34 : if (num_replslots > 0)
491 6 : replslotcell = replslotcell->next;
492 :
493 34 : i++;
494 : }
495 :
496 18 : return dbinfo;
497 : }
498 :
499 : /*
500 : * Open a new connection. If exit_on_error is true, it has an undesired
501 : * condition and it should exit immediately.
502 : */
503 : static PGconn *
504 86 : connect_database(const char *conninfo, bool exit_on_error)
505 : {
506 : PGconn *conn;
507 : PGresult *res;
508 :
509 86 : conn = PQconnectdb(conninfo);
510 86 : if (PQstatus(conn) != CONNECTION_OK)
511 : {
512 0 : pg_log_error("connection to database failed: %s",
513 : PQerrorMessage(conn));
514 0 : PQfinish(conn);
515 :
516 0 : if (exit_on_error)
517 0 : exit(1);
518 0 : return NULL;
519 : }
520 :
521 : /* Secure search_path */
522 86 : res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
523 86 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
524 : {
525 0 : pg_log_error("could not clear \"search_path\": %s",
526 : PQresultErrorMessage(res));
527 0 : PQclear(res);
528 0 : PQfinish(conn);
529 :
530 0 : if (exit_on_error)
531 0 : exit(1);
532 0 : return NULL;
533 : }
534 86 : PQclear(res);
535 :
536 86 : return conn;
537 : }
538 :
539 : /*
540 : * Close the connection. If exit_on_error is true, it has an undesired
541 : * condition and it should exit immediately.
542 : */
543 : static void
544 86 : disconnect_database(PGconn *conn, bool exit_on_error)
545 : {
546 : Assert(conn != NULL);
547 :
548 86 : PQfinish(conn);
549 :
550 86 : if (exit_on_error)
551 4 : exit(1);
552 82 : }
553 :
554 : /*
555 : * Obtain the system identifier using the provided connection. It will be used
556 : * to compare if a data directory is a clone of another one.
557 : */
558 : static uint64
559 18 : get_primary_sysid(const char *conninfo)
560 : {
561 : PGconn *conn;
562 : PGresult *res;
563 : uint64 sysid;
564 :
565 18 : pg_log_info("getting system identifier from publisher");
566 :
567 18 : conn = connect_database(conninfo, true);
568 :
569 18 : res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
570 18 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
571 : {
572 0 : pg_log_error("could not get system identifier: %s",
573 : PQresultErrorMessage(res));
574 0 : disconnect_database(conn, true);
575 : }
576 18 : if (PQntuples(res) != 1)
577 : {
578 0 : pg_log_error("could not get system identifier: got %d rows, expected %d row",
579 : PQntuples(res), 1);
580 0 : disconnect_database(conn, true);
581 : }
582 :
583 18 : sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
584 :
585 18 : pg_log_info("system identifier is %llu on publisher",
586 : (unsigned long long) sysid);
587 :
588 18 : PQclear(res);
589 18 : disconnect_database(conn, false);
590 :
591 18 : return sysid;
592 : }
593 :
594 : /*
595 : * Obtain the system identifier from control file. It will be used to compare
596 : * if a data directory is a clone of another one. This routine is used locally
597 : * and avoids a connection.
598 : */
599 : static uint64
600 18 : get_standby_sysid(const char *datadir)
601 : {
602 : ControlFileData *cf;
603 : bool crc_ok;
604 : uint64 sysid;
605 :
606 18 : pg_log_info("getting system identifier from subscriber");
607 :
608 18 : cf = get_controlfile(datadir, &crc_ok);
609 18 : if (!crc_ok)
610 0 : pg_fatal("control file appears to be corrupt");
611 :
612 18 : sysid = cf->system_identifier;
613 :
614 18 : pg_log_info("system identifier is %llu on subscriber",
615 : (unsigned long long) sysid);
616 :
617 18 : pg_free(cf);
618 :
619 18 : return sysid;
620 : }
621 :
622 : /*
623 : * Modify the system identifier. Since a standby server preserves the system
624 : * identifier, it makes sense to change it to avoid situations in which WAL
625 : * files from one of the systems might be used in the other one.
626 : */
627 : static void
628 6 : modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
629 : {
630 : ControlFileData *cf;
631 : bool crc_ok;
632 : struct timeval tv;
633 :
634 : char *cmd_str;
635 :
636 6 : pg_log_info("modifying system identifier of subscriber");
637 :
638 6 : cf = get_controlfile(subscriber_dir, &crc_ok);
639 6 : if (!crc_ok)
640 0 : pg_fatal("control file appears to be corrupt");
641 :
642 : /*
643 : * Select a new system identifier.
644 : *
645 : * XXX this code was extracted from BootStrapXLOG().
646 : */
647 6 : gettimeofday(&tv, NULL);
648 6 : cf->system_identifier = ((uint64) tv.tv_sec) << 32;
649 6 : cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
650 6 : cf->system_identifier |= getpid() & 0xFFF;
651 :
652 6 : if (!dry_run)
653 2 : update_controlfile(subscriber_dir, cf, true);
654 :
655 6 : pg_log_info("system identifier is %llu on subscriber",
656 : (unsigned long long) cf->system_identifier);
657 :
658 6 : pg_log_info("running pg_resetwal on the subscriber");
659 :
660 6 : cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
661 : subscriber_dir, DEVNULL);
662 :
663 6 : pg_log_debug("pg_resetwal command is: %s", cmd_str);
664 :
665 6 : if (!dry_run)
666 : {
667 2 : int rc = system(cmd_str);
668 :
669 2 : if (rc == 0)
670 2 : pg_log_info("subscriber successfully changed the system identifier");
671 : else
672 0 : pg_fatal("could not change system identifier of subscriber: %s", wait_result_to_str(rc));
673 : }
674 :
675 6 : pg_free(cf);
676 6 : }
677 :
678 : /*
679 : * Generate an object name using a prefix, database oid and a random integer.
680 : * It is used in case the user does not specify an object name (publication,
681 : * subscription, replication slot).
682 : */
683 : static char *
684 10 : generate_object_name(PGconn *conn)
685 : {
686 : PGresult *res;
687 : Oid oid;
688 : uint32 rand;
689 : char *objname;
690 :
691 10 : res = PQexec(conn,
692 : "SELECT oid FROM pg_catalog.pg_database "
693 : "WHERE datname = pg_catalog.current_database()");
694 10 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
695 : {
696 0 : pg_log_error("could not obtain database OID: %s",
697 : PQresultErrorMessage(res));
698 0 : disconnect_database(conn, true);
699 : }
700 :
701 10 : if (PQntuples(res) != 1)
702 : {
703 0 : pg_log_error("could not obtain database OID: got %d rows, expected %d row",
704 : PQntuples(res), 1);
705 0 : disconnect_database(conn, true);
706 : }
707 :
708 : /* Database OID */
709 10 : oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
710 :
711 10 : PQclear(res);
712 :
713 : /* Random unsigned integer */
714 10 : rand = pg_prng_uint32(&prng_state);
715 :
716 : /*
717 : * Build the object name. The name must not exceed NAMEDATALEN - 1. This
718 : * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
719 : * '\0').
720 : */
721 10 : objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
722 :
723 10 : return objname;
724 : }
725 :
726 : /*
727 : * Create the publications and replication slots in preparation for logical
728 : * replication. Returns the LSN from latest replication slot. It will be the
729 : * replication start point that is used to adjust the subscriptions (see
730 : * set_replication_progress).
731 : */
732 : static char *
733 6 : setup_publisher(struct LogicalRepInfo *dbinfo)
734 : {
735 6 : char *lsn = NULL;
736 :
737 6 : pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
738 :
739 16 : for (int i = 0; i < num_dbs; i++)
740 : {
741 : PGconn *conn;
742 10 : char *genname = NULL;
743 :
744 10 : conn = connect_database(dbinfo[i].pubconninfo, true);
745 :
746 : /*
747 : * If an object name was not specified as command-line options, assign
748 : * a generated object name. The replication slot has a different rule.
749 : * The subscription name is assigned to the replication slot name if
750 : * no replication slot is specified. It follows the same rule as
751 : * CREATE SUBSCRIPTION.
752 : */
753 10 : if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
754 10 : genname = generate_object_name(conn);
755 10 : if (num_pubs == 0)
756 2 : dbinfo[i].pubname = pg_strdup(genname);
757 10 : if (num_subs == 0)
758 6 : dbinfo[i].subname = pg_strdup(genname);
759 10 : if (num_replslots == 0)
760 4 : dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
761 :
762 : /*
763 : * Create publication on publisher. This step should be executed
764 : * *before* promoting the subscriber to avoid any transactions between
765 : * consistent LSN and the new publication rows (such transactions
766 : * wouldn't see the new publication rows resulting in an error).
767 : */
768 10 : create_publication(conn, &dbinfo[i]);
769 :
770 : /* Create replication slot on publisher */
771 10 : if (lsn)
772 2 : pg_free(lsn);
773 10 : lsn = create_logical_replication_slot(conn, &dbinfo[i]);
774 10 : if (lsn != NULL || dry_run)
775 10 : pg_log_info("create replication slot \"%s\" on publisher",
776 : dbinfo[i].replslotname);
777 : else
778 0 : exit(1);
779 :
780 : /*
781 : * Since we are using the LSN returned by the last replication slot as
782 : * recovery_target_lsn, this LSN is ahead of the current WAL position
783 : * and the recovery waits until the publisher writes a WAL record to
784 : * reach the target and ends the recovery. On idle systems, this wait
785 : * time is unpredictable and could lead to failure in promoting the
786 : * subscriber. To avoid that, insert a harmless WAL record.
787 : */
788 10 : if (i == num_dbs - 1 && !dry_run)
789 : {
790 : PGresult *res;
791 :
792 2 : res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
793 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
794 : {
795 0 : pg_log_error("could not write an additional WAL record: %s",
796 : PQresultErrorMessage(res));
797 0 : disconnect_database(conn, true);
798 : }
799 2 : PQclear(res);
800 : }
801 :
802 10 : disconnect_database(conn, false);
803 : }
804 :
805 6 : return lsn;
806 : }
807 :
808 : /*
809 : * Is recovery still in progress?
810 : */
811 : static bool
812 32 : server_is_in_recovery(PGconn *conn)
813 : {
814 : PGresult *res;
815 : int ret;
816 :
817 32 : res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
818 :
819 32 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
820 : {
821 0 : pg_log_error("could not obtain recovery progress: %s",
822 : PQresultErrorMessage(res));
823 0 : disconnect_database(conn, true);
824 : }
825 :
826 :
827 32 : ret = strcmp("t", PQgetvalue(res, 0, 0));
828 :
829 32 : PQclear(res);
830 :
831 32 : return ret == 0;
832 : }
833 :
834 : /*
835 : * Is the primary server ready for logical replication?
836 : *
837 : * XXX Does it not allow a synchronous replica?
838 : */
839 : static void
840 10 : check_publisher(const struct LogicalRepInfo *dbinfo)
841 : {
842 : PGconn *conn;
843 : PGresult *res;
844 10 : bool failed = false;
845 :
846 : char *wal_level;
847 : int max_repslots;
848 : int cur_repslots;
849 : int max_walsenders;
850 : int cur_walsenders;
851 : int max_prepared_transactions;
852 : char *max_slot_wal_keep_size;
853 :
854 10 : pg_log_info("checking settings on publisher");
855 :
856 10 : conn = connect_database(dbinfo[0].pubconninfo, true);
857 :
858 : /*
859 : * If the primary server is in recovery (i.e. cascading replication),
860 : * objects (publication) cannot be created because it is read only.
861 : */
862 10 : if (server_is_in_recovery(conn))
863 : {
864 2 : pg_log_error("primary server cannot be in recovery");
865 2 : disconnect_database(conn, true);
866 : }
867 :
868 : /*------------------------------------------------------------------------
869 : * Logical replication requires a few parameters to be set on publisher.
870 : * Since these parameters are not a requirement for physical replication,
871 : * we should check it to make sure it won't fail.
872 : *
873 : * - wal_level = logical
874 : * - max_replication_slots >= current + number of dbs to be converted
875 : * - max_wal_senders >= current + number of dbs to be converted
876 : * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files)
877 : * -----------------------------------------------------------------------
878 : */
879 8 : res = PQexec(conn,
880 : "SELECT pg_catalog.current_setting('wal_level'),"
881 : " pg_catalog.current_setting('max_replication_slots'),"
882 : " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
883 : " pg_catalog.current_setting('max_wal_senders'),"
884 : " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
885 : " pg_catalog.current_setting('max_prepared_transactions'),"
886 : " pg_catalog.current_setting('max_slot_wal_keep_size')");
887 :
888 8 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
889 : {
890 0 : pg_log_error("could not obtain publisher settings: %s",
891 : PQresultErrorMessage(res));
892 0 : disconnect_database(conn, true);
893 : }
894 :
895 8 : wal_level = pg_strdup(PQgetvalue(res, 0, 0));
896 8 : max_repslots = atoi(PQgetvalue(res, 0, 1));
897 8 : cur_repslots = atoi(PQgetvalue(res, 0, 2));
898 8 : max_walsenders = atoi(PQgetvalue(res, 0, 3));
899 8 : cur_walsenders = atoi(PQgetvalue(res, 0, 4));
900 8 : max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));
901 8 : max_slot_wal_keep_size = pg_strdup(PQgetvalue(res, 0, 6));
902 :
903 8 : PQclear(res);
904 :
905 8 : pg_log_debug("publisher: wal_level: %s", wal_level);
906 8 : pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
907 8 : pg_log_debug("publisher: current replication slots: %d", cur_repslots);
908 8 : pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
909 8 : pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
910 8 : pg_log_debug("publisher: max_prepared_transactions: %d",
911 : max_prepared_transactions);
912 8 : pg_log_debug("publisher: max_slot_wal_keep_size: %s",
913 : max_slot_wal_keep_size);
914 :
915 8 : disconnect_database(conn, false);
916 :
917 8 : if (strcmp(wal_level, "logical") != 0)
918 : {
919 2 : pg_log_error("publisher requires \"wal_level\" >= \"logical\"");
920 2 : failed = true;
921 : }
922 :
923 8 : if (max_repslots - cur_repslots < num_dbs)
924 : {
925 2 : pg_log_error("publisher requires %d replication slots, but only %d remain",
926 : num_dbs, max_repslots - cur_repslots);
927 2 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
928 : "max_replication_slots", cur_repslots + num_dbs);
929 2 : failed = true;
930 : }
931 :
932 8 : if (max_walsenders - cur_walsenders < num_dbs)
933 : {
934 2 : pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
935 : num_dbs, max_walsenders - cur_walsenders);
936 2 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
937 : "max_wal_senders", cur_walsenders + num_dbs);
938 2 : failed = true;
939 : }
940 :
941 8 : if (max_prepared_transactions != 0)
942 : {
943 0 : pg_log_warning("two_phase option will not be enabled for replication slots");
944 0 : pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "
945 : "Prepared transactions will be replicated at COMMIT PREPARED.");
946 : }
947 :
948 : /*
949 : * Validate 'max_slot_wal_keep_size'. If this parameter is set to a
950 : * non-default value, it may cause replication failures due to required
951 : * WAL files being prematurely removed.
952 : */
953 8 : if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
954 : {
955 0 : pg_log_warning("required WAL could be removed from the publisher");
956 0 : pg_log_warning_hint("Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
957 : "max_slot_wal_keep_size");
958 : }
959 :
960 8 : pg_free(wal_level);
961 :
962 8 : if (failed)
963 2 : exit(1);
964 6 : }
965 :
966 : /*
967 : * Is the standby server ready for logical replication?
968 : *
969 : * XXX Does it not allow a time-delayed replica?
970 : *
971 : * XXX In a cascaded replication scenario (P -> S -> C), if the target server
972 : * is S, it cannot detect there is a replica (server C) because server S starts
973 : * accepting only local connections and server C cannot connect to it. Hence,
974 : * there is not a reliable way to provide a suitable error saying the server C
975 : * will be broken at the end of this process (due to pg_resetwal).
976 : */
977 : static void
978 14 : check_subscriber(const struct LogicalRepInfo *dbinfo)
979 : {
980 : PGconn *conn;
981 : PGresult *res;
982 14 : bool failed = false;
983 :
984 : int max_lrworkers;
985 : int max_repslots;
986 : int max_wprocs;
987 :
988 14 : pg_log_info("checking settings on subscriber");
989 :
990 14 : conn = connect_database(dbinfo[0].subconninfo, true);
991 :
992 : /* The target server must be a standby */
993 14 : if (!server_is_in_recovery(conn))
994 : {
995 2 : pg_log_error("target server must be a standby");
996 2 : disconnect_database(conn, true);
997 : }
998 :
999 : /*------------------------------------------------------------------------
1000 : * Logical replication requires a few parameters to be set on subscriber.
1001 : * Since these parameters are not a requirement for physical replication,
1002 : * we should check it to make sure it won't fail.
1003 : *
1004 : * - max_replication_slots >= number of dbs to be converted
1005 : * - max_logical_replication_workers >= number of dbs to be converted
1006 : * - max_worker_processes >= 1 + number of dbs to be converted
1007 : *------------------------------------------------------------------------
1008 : */
1009 12 : res = PQexec(conn,
1010 : "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
1011 : "'max_logical_replication_workers', "
1012 : "'max_replication_slots', "
1013 : "'max_worker_processes', "
1014 : "'primary_slot_name') "
1015 : "ORDER BY name");
1016 :
1017 12 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1018 : {
1019 0 : pg_log_error("could not obtain subscriber settings: %s",
1020 : PQresultErrorMessage(res));
1021 0 : disconnect_database(conn, true);
1022 : }
1023 :
1024 12 : max_lrworkers = atoi(PQgetvalue(res, 0, 0));
1025 12 : max_repslots = atoi(PQgetvalue(res, 1, 0));
1026 12 : max_wprocs = atoi(PQgetvalue(res, 2, 0));
1027 12 : if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
1028 10 : primary_slot_name = pg_strdup(PQgetvalue(res, 3, 0));
1029 :
1030 12 : pg_log_debug("subscriber: max_logical_replication_workers: %d",
1031 : max_lrworkers);
1032 12 : pg_log_debug("subscriber: max_replication_slots: %d", max_repslots);
1033 12 : pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
1034 12 : if (primary_slot_name)
1035 10 : pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
1036 :
1037 12 : PQclear(res);
1038 :
1039 12 : disconnect_database(conn, false);
1040 :
1041 12 : if (max_repslots < num_dbs)
1042 : {
1043 2 : pg_log_error("subscriber requires %d replication slots, but only %d remain",
1044 : num_dbs, max_repslots);
1045 2 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1046 : "max_replication_slots", num_dbs);
1047 2 : failed = true;
1048 : }
1049 :
1050 12 : if (max_lrworkers < num_dbs)
1051 : {
1052 2 : pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
1053 : num_dbs, max_lrworkers);
1054 2 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1055 : "max_logical_replication_workers", num_dbs);
1056 2 : failed = true;
1057 : }
1058 :
1059 12 : if (max_wprocs < num_dbs + 1)
1060 : {
1061 2 : pg_log_error("subscriber requires %d worker processes, but only %d remain",
1062 : num_dbs + 1, max_wprocs);
1063 2 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1064 : "max_worker_processes", num_dbs + 1);
1065 2 : failed = true;
1066 : }
1067 :
1068 12 : if (failed)
1069 2 : exit(1);
1070 10 : }
1071 :
1072 : /*
1073 : * Drop a specified subscription. This is to avoid duplicate subscriptions on
1074 : * the primary (publisher node) and the newly created subscriber. We
1075 : * shouldn't drop the associated slot as that would be used by the publisher
1076 : * node.
1077 : */
1078 : static void
1079 6 : drop_existing_subscriptions(PGconn *conn, const char *subname, const char *dbname)
1080 : {
1081 6 : PQExpBuffer query = createPQExpBuffer();
1082 : PGresult *res;
1083 :
1084 : Assert(conn != NULL);
1085 :
1086 : /*
1087 : * Construct a query string. These commands are allowed to be executed
1088 : * within a transaction.
1089 : */
1090 6 : appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
1091 : subname);
1092 6 : appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
1093 : subname);
1094 6 : appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
1095 :
1096 6 : pg_log_info("dropping subscription \"%s\" in database \"%s\"",
1097 : subname, dbname);
1098 :
1099 6 : if (!dry_run)
1100 : {
1101 2 : res = PQexec(conn, query->data);
1102 :
1103 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1104 : {
1105 0 : pg_log_error("could not drop subscription \"%s\": %s",
1106 : subname, PQresultErrorMessage(res));
1107 0 : disconnect_database(conn, true);
1108 : }
1109 :
1110 2 : PQclear(res);
1111 : }
1112 :
1113 6 : destroyPQExpBuffer(query);
1114 6 : }
1115 :
1116 : /*
1117 : * Retrieve and drop the pre-existing subscriptions.
1118 : */
1119 : static void
1120 10 : check_and_drop_existing_subscriptions(PGconn *conn,
1121 : const struct LogicalRepInfo *dbinfo)
1122 : {
1123 10 : PQExpBuffer query = createPQExpBuffer();
1124 : char *dbname;
1125 : PGresult *res;
1126 :
1127 : Assert(conn != NULL);
1128 :
1129 10 : dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1130 :
1131 10 : appendPQExpBuffer(query,
1132 : "SELECT s.subname FROM pg_catalog.pg_subscription s "
1133 : "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1134 : "WHERE d.datname = %s",
1135 : dbname);
1136 10 : res = PQexec(conn, query->data);
1137 :
1138 10 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1139 : {
1140 0 : pg_log_error("could not obtain pre-existing subscriptions: %s",
1141 : PQresultErrorMessage(res));
1142 0 : disconnect_database(conn, true);
1143 : }
1144 :
1145 16 : for (int i = 0; i < PQntuples(res); i++)
1146 6 : drop_existing_subscriptions(conn, PQgetvalue(res, i, 0),
1147 6 : dbinfo->dbname);
1148 :
1149 10 : PQclear(res);
1150 10 : destroyPQExpBuffer(query);
1151 10 : PQfreemem(dbname);
1152 10 : }
1153 :
1154 : /*
1155 : * Create the subscriptions, adjust the initial location for logical
1156 : * replication and enable the subscriptions. That's the last step for logical
1157 : * replication setup.
1158 : */
1159 : static void
1160 6 : setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
1161 : {
1162 16 : for (int i = 0; i < num_dbs; i++)
1163 : {
1164 : PGconn *conn;
1165 :
1166 : /* Connect to subscriber. */
1167 10 : conn = connect_database(dbinfo[i].subconninfo, true);
1168 :
1169 : /*
1170 : * We don't need the pre-existing subscriptions on the newly formed
1171 : * subscriber. They can connect to other publisher nodes and either
1172 : * get some unwarranted data or can lead to ERRORs in connecting to
1173 : * such nodes.
1174 : */
1175 10 : check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
1176 :
1177 : /*
1178 : * Since the publication was created before the consistent LSN, it is
1179 : * available on the subscriber when the physical replica is promoted.
1180 : * Remove publications from the subscriber because it has no use.
1181 : */
1182 10 : drop_publication(conn, &dbinfo[i]);
1183 :
1184 10 : create_subscription(conn, &dbinfo[i]);
1185 :
1186 : /* Set the replication progress to the correct LSN */
1187 10 : set_replication_progress(conn, &dbinfo[i], consistent_lsn);
1188 :
1189 : /* Enable subscription */
1190 10 : enable_subscription(conn, &dbinfo[i]);
1191 :
1192 10 : disconnect_database(conn, false);
1193 : }
1194 6 : }
1195 :
1196 : /*
1197 : * Write the required recovery parameters.
1198 : */
1199 : static void
1200 6 : setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
1201 : {
1202 : PGconn *conn;
1203 : PQExpBuffer recoveryconfcontents;
1204 :
1205 : /*
1206 : * Despite of the recovery parameters will be written to the subscriber,
1207 : * use a publisher connection. The primary_conninfo is generated using the
1208 : * connection settings.
1209 : */
1210 6 : conn = connect_database(dbinfo[0].pubconninfo, true);
1211 :
1212 : /*
1213 : * Write recovery parameters.
1214 : *
1215 : * The subscriber is not running yet. In dry run mode, the recovery
1216 : * parameters *won't* be written. An invalid LSN is used for printing
1217 : * purposes. Additional recovery parameters are added here. It avoids
1218 : * unexpected behavior such as end of recovery as soon as a consistent
1219 : * state is reached (recovery_target) and failure due to multiple recovery
1220 : * targets (name, time, xid, LSN).
1221 : */
1222 6 : recoveryconfcontents = GenerateRecoveryConfig(conn, NULL, NULL);
1223 6 : appendPQExpBuffer(recoveryconfcontents, "recovery_target = ''\n");
1224 6 : appendPQExpBuffer(recoveryconfcontents,
1225 : "recovery_target_timeline = 'latest'\n");
1226 6 : appendPQExpBuffer(recoveryconfcontents,
1227 : "recovery_target_inclusive = true\n");
1228 6 : appendPQExpBuffer(recoveryconfcontents,
1229 : "recovery_target_action = promote\n");
1230 6 : appendPQExpBuffer(recoveryconfcontents, "recovery_target_name = ''\n");
1231 6 : appendPQExpBuffer(recoveryconfcontents, "recovery_target_time = ''\n");
1232 6 : appendPQExpBuffer(recoveryconfcontents, "recovery_target_xid = ''\n");
1233 :
1234 6 : if (dry_run)
1235 : {
1236 4 : appendPQExpBuffer(recoveryconfcontents, "# dry run mode");
1237 4 : appendPQExpBuffer(recoveryconfcontents,
1238 : "recovery_target_lsn = '%X/%X'\n",
1239 4 : LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
1240 : }
1241 : else
1242 : {
1243 2 : appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
1244 : lsn);
1245 2 : WriteRecoveryConfig(conn, datadir, recoveryconfcontents);
1246 : }
1247 6 : disconnect_database(conn, false);
1248 :
1249 6 : pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
1250 6 : }
1251 :
1252 : /*
1253 : * Drop physical replication slot on primary if the standby was using it. After
1254 : * the transformation, it has no use.
1255 : *
1256 : * XXX we might not fail here. Instead, we provide a warning so the user
1257 : * eventually drops this replication slot later.
1258 : */
1259 : static void
1260 6 : drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
1261 : {
1262 : PGconn *conn;
1263 :
1264 : /* Replication slot does not exist, do nothing */
1265 6 : if (!primary_slot_name)
1266 0 : return;
1267 :
1268 6 : conn = connect_database(dbinfo[0].pubconninfo, false);
1269 6 : if (conn != NULL)
1270 : {
1271 6 : drop_replication_slot(conn, &dbinfo[0], slotname);
1272 6 : disconnect_database(conn, false);
1273 : }
1274 : else
1275 : {
1276 0 : pg_log_warning("could not drop replication slot \"%s\" on primary",
1277 : slotname);
1278 0 : pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
1279 : }
1280 : }
1281 :
1282 : /*
1283 : * Drop failover replication slots on subscriber. After the transformation,
1284 : * they have no use.
1285 : *
1286 : * XXX We do not fail here. Instead, we provide a warning so the user can drop
1287 : * them later.
1288 : */
1289 : static void
1290 6 : drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
1291 : {
1292 : PGconn *conn;
1293 : PGresult *res;
1294 :
1295 6 : conn = connect_database(dbinfo[0].subconninfo, false);
1296 6 : if (conn != NULL)
1297 : {
1298 : /* Get failover replication slot names */
1299 6 : res = PQexec(conn,
1300 : "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
1301 :
1302 6 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
1303 : {
1304 : /* Remove failover replication slots from subscriber */
1305 12 : for (int i = 0; i < PQntuples(res); i++)
1306 6 : drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
1307 : }
1308 : else
1309 : {
1310 0 : pg_log_warning("could not obtain failover replication slot information: %s",
1311 : PQresultErrorMessage(res));
1312 0 : pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1313 : }
1314 :
1315 6 : PQclear(res);
1316 6 : disconnect_database(conn, false);
1317 : }
1318 : else
1319 : {
1320 0 : pg_log_warning("could not drop failover replication slot");
1321 0 : pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1322 : }
1323 6 : }
1324 :
1325 : /*
1326 : * Create a logical replication slot and returns a LSN.
1327 : *
1328 : * CreateReplicationSlot() is not used because it does not provide the one-row
1329 : * result set that contains the LSN.
1330 : */
1331 : static char *
1332 10 : create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
1333 : {
1334 10 : PQExpBuffer str = createPQExpBuffer();
1335 10 : PGresult *res = NULL;
1336 10 : const char *slot_name = dbinfo->replslotname;
1337 : char *slot_name_esc;
1338 10 : char *lsn = NULL;
1339 :
1340 : Assert(conn != NULL);
1341 :
1342 10 : pg_log_info("creating the replication slot \"%s\" in database \"%s\"",
1343 : slot_name, dbinfo->dbname);
1344 :
1345 10 : slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1346 :
1347 10 : appendPQExpBuffer(str,
1348 : "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, false, false)",
1349 : slot_name_esc);
1350 :
1351 10 : PQfreemem(slot_name_esc);
1352 :
1353 10 : pg_log_debug("command is: %s", str->data);
1354 :
1355 10 : if (!dry_run)
1356 : {
1357 4 : res = PQexec(conn, str->data);
1358 4 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1359 : {
1360 0 : pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
1361 : slot_name, dbinfo->dbname,
1362 : PQresultErrorMessage(res));
1363 0 : PQclear(res);
1364 0 : destroyPQExpBuffer(str);
1365 0 : return NULL;
1366 : }
1367 :
1368 4 : lsn = pg_strdup(PQgetvalue(res, 0, 0));
1369 4 : PQclear(res);
1370 : }
1371 :
1372 : /* For cleanup purposes */
1373 10 : dbinfo->made_replslot = true;
1374 :
1375 10 : destroyPQExpBuffer(str);
1376 :
1377 10 : return lsn;
1378 : }
1379 :
1380 : static void
1381 12 : drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
1382 : const char *slot_name)
1383 : {
1384 12 : PQExpBuffer str = createPQExpBuffer();
1385 : char *slot_name_esc;
1386 : PGresult *res;
1387 :
1388 : Assert(conn != NULL);
1389 :
1390 12 : pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
1391 : slot_name, dbinfo->dbname);
1392 :
1393 12 : slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1394 :
1395 12 : appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
1396 :
1397 12 : PQfreemem(slot_name_esc);
1398 :
1399 12 : pg_log_debug("command is: %s", str->data);
1400 :
1401 12 : if (!dry_run)
1402 : {
1403 4 : res = PQexec(conn, str->data);
1404 4 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1405 : {
1406 0 : pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
1407 : slot_name, dbinfo->dbname, PQresultErrorMessage(res));
1408 0 : dbinfo->made_replslot = false; /* don't try again. */
1409 : }
1410 :
1411 4 : PQclear(res);
1412 : }
1413 :
1414 12 : destroyPQExpBuffer(str);
1415 12 : }
1416 :
/*
 * Check the status "rc" returned by system() for command "pg_ctl_cmd".
 *
 * A zero status means success and the function returns. Any other status is
 * decoded (exit code, terminating signal/exception, or unrecognized), the
 * failed command is reported, and the program exits with status 1.
 */
static void
pg_ctl_status(const char *pg_ctl_cmd, int rc)
{
	if (rc == 0)
		return;

	if (WIFEXITED(rc))
		pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
	else if (WIFSIGNALED(rc))
	{
#if defined(WIN32)
		pg_log_error("pg_ctl was terminated by exception 0x%X",
					 WTERMSIG(rc));
		pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
#else
		pg_log_error("pg_ctl was terminated by signal %d: %s",
					 WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
#endif
	}
	else
		pg_log_error("pg_ctl exited with unrecognized status %d", rc);

	pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
	exit(1);
}
1449 :
1450 : static void
1451 20 : start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
1452 : bool restrict_logical_worker)
1453 : {
1454 20 : PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
1455 : int rc;
1456 :
1457 20 : appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
1458 20 : appendShellString(pg_ctl_cmd, subscriber_dir);
1459 20 : appendPQExpBuffer(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
1460 :
1461 : /* Prevent unintended slot invalidation */
1462 20 : appendPQExpBuffer(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
1463 :
1464 20 : if (restricted_access)
1465 : {
1466 20 : appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
1467 : #if !defined(WIN32)
1468 :
1469 : /*
1470 : * An empty listen_addresses list means the server does not listen on
1471 : * any IP interfaces; only Unix-domain sockets can be used to connect
1472 : * to the server. Prevent external connections to minimize the chance
1473 : * of failure.
1474 : */
1475 20 : appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
1476 20 : if (opt->socket_dir)
1477 20 : appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
1478 : opt->socket_dir);
1479 20 : appendPQExpBufferChar(pg_ctl_cmd, '"');
1480 : #endif
1481 : }
1482 20 : if (opt->config_file != NULL)
1483 0 : appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
1484 : opt->config_file);
1485 :
1486 : /* Suppress to start logical replication if requested */
1487 20 : if (restrict_logical_worker)
1488 6 : appendPQExpBuffer(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
1489 :
1490 20 : pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
1491 20 : rc = system(pg_ctl_cmd->data);
1492 20 : pg_ctl_status(pg_ctl_cmd->data, rc);
1493 20 : standby_running = true;
1494 20 : destroyPQExpBuffer(pg_ctl_cmd);
1495 20 : pg_log_info("server was started");
1496 20 : }
1497 :
1498 : static void
1499 20 : stop_standby_server(const char *datadir)
1500 : {
1501 : char *pg_ctl_cmd;
1502 : int rc;
1503 :
1504 20 : pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
1505 : datadir);
1506 20 : pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
1507 20 : rc = system(pg_ctl_cmd);
1508 20 : pg_ctl_status(pg_ctl_cmd, rc);
1509 20 : standby_running = false;
1510 20 : pg_log_info("server was stopped");
1511 20 : }
1512 :
1513 : /*
1514 : * Returns after the server finishes the recovery process.
1515 : *
1516 : * If recovery_timeout option is set, terminate abnormally without finishing
1517 : * the recovery process. By default, it waits forever.
1518 : *
1519 : * XXX Is the recovery process still in progress? When recovery process has a
1520 : * better progress reporting mechanism, it should be added here.
1521 : */
1522 : static void
1523 6 : wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
1524 : {
1525 : PGconn *conn;
1526 6 : int status = POSTMASTER_STILL_STARTING;
1527 6 : int timer = 0;
1528 :
1529 6 : pg_log_info("waiting for the target server to reach the consistent state");
1530 :
1531 6 : conn = connect_database(conninfo, true);
1532 :
1533 : for (;;)
1534 2 : {
1535 8 : bool in_recovery = server_is_in_recovery(conn);
1536 :
1537 : /*
1538 : * Does the recovery process finish? In dry run mode, there is no
1539 : * recovery mode. Bail out as the recovery process has ended.
1540 : */
1541 8 : if (!in_recovery || dry_run)
1542 : {
1543 6 : status = POSTMASTER_READY;
1544 6 : recovery_ended = true;
1545 6 : break;
1546 : }
1547 :
1548 : /* Bail out after recovery_timeout seconds if this option is set */
1549 2 : if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
1550 : {
1551 0 : stop_standby_server(subscriber_dir);
1552 0 : pg_log_error("recovery timed out");
1553 0 : disconnect_database(conn, true);
1554 : }
1555 :
1556 : /* Keep waiting */
1557 2 : pg_usleep(WAIT_INTERVAL * USEC_PER_SEC);
1558 :
1559 2 : timer += WAIT_INTERVAL;
1560 : }
1561 :
1562 6 : disconnect_database(conn, false);
1563 :
1564 6 : if (status == POSTMASTER_STILL_STARTING)
1565 0 : pg_fatal("server did not end recovery");
1566 :
1567 6 : pg_log_info("target server reached the consistent state");
1568 6 : pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
1569 6 : }
1570 :
1571 : /*
1572 : * Create a publication that includes all tables in the database.
1573 : */
1574 : static void
1575 10 : create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
1576 : {
1577 10 : PQExpBuffer str = createPQExpBuffer();
1578 : PGresult *res;
1579 : char *ipubname_esc;
1580 : char *spubname_esc;
1581 :
1582 : Assert(conn != NULL);
1583 :
1584 10 : ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1585 10 : spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1586 :
1587 : /* Check if the publication already exists */
1588 10 : appendPQExpBuffer(str,
1589 : "SELECT 1 FROM pg_catalog.pg_publication "
1590 : "WHERE pubname = %s",
1591 : spubname_esc);
1592 10 : res = PQexec(conn, str->data);
1593 10 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1594 : {
1595 0 : pg_log_error("could not obtain publication information: %s",
1596 : PQresultErrorMessage(res));
1597 0 : disconnect_database(conn, true);
1598 : }
1599 :
1600 10 : if (PQntuples(res) == 1)
1601 : {
1602 : /*
1603 : * Unfortunately, if it reaches this code path, it will always fail
1604 : * (unless you decide to change the existing publication name). That's
1605 : * bad but it is very unlikely that the user will choose a name with
1606 : * pg_createsubscriber_ prefix followed by the exact database oid and
1607 : * a random number.
1608 : */
1609 0 : pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
1610 0 : pg_log_error_hint("Consider renaming this publication before continuing.");
1611 0 : disconnect_database(conn, true);
1612 : }
1613 :
1614 10 : PQclear(res);
1615 10 : resetPQExpBuffer(str);
1616 :
1617 10 : pg_log_info("creating publication \"%s\" in database \"%s\"",
1618 : dbinfo->pubname, dbinfo->dbname);
1619 :
1620 10 : appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
1621 : ipubname_esc);
1622 :
1623 10 : pg_log_debug("command is: %s", str->data);
1624 :
1625 10 : if (!dry_run)
1626 : {
1627 4 : res = PQexec(conn, str->data);
1628 4 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1629 : {
1630 0 : pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
1631 : dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
1632 0 : disconnect_database(conn, true);
1633 : }
1634 4 : PQclear(res);
1635 : }
1636 :
1637 : /* For cleanup purposes */
1638 10 : dbinfo->made_publication = true;
1639 :
1640 10 : PQfreemem(ipubname_esc);
1641 10 : PQfreemem(spubname_esc);
1642 10 : destroyPQExpBuffer(str);
1643 10 : }
1644 :
1645 : /*
1646 : * Remove publication if it couldn't finish all steps.
1647 : */
1648 : static void
1649 10 : drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
1650 : {
1651 10 : PQExpBuffer str = createPQExpBuffer();
1652 : PGresult *res;
1653 : char *pubname_esc;
1654 :
1655 : Assert(conn != NULL);
1656 :
1657 10 : pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1658 :
1659 10 : pg_log_info("dropping publication \"%s\" in database \"%s\"",
1660 : dbinfo->pubname, dbinfo->dbname);
1661 :
1662 10 : appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
1663 :
1664 10 : PQfreemem(pubname_esc);
1665 :
1666 10 : pg_log_debug("command is: %s", str->data);
1667 :
1668 10 : if (!dry_run)
1669 : {
1670 4 : res = PQexec(conn, str->data);
1671 4 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1672 : {
1673 0 : pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
1674 : dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
1675 0 : dbinfo->made_publication = false; /* don't try again. */
1676 :
1677 : /*
1678 : * Don't disconnect and exit here. This routine is used by primary
1679 : * (cleanup publication / replication slot due to an error) and
1680 : * subscriber (remove the replicated publications). In both cases,
1681 : * it can continue and provide instructions for the user to remove
1682 : * it later if cleanup fails.
1683 : */
1684 : }
1685 4 : PQclear(res);
1686 : }
1687 :
1688 10 : destroyPQExpBuffer(str);
1689 10 : }
1690 :
1691 : /*
1692 : * Create a subscription with some predefined options.
1693 : *
1694 : * A replication slot was already created in a previous step. Let's use it. It
1695 : * is not required to copy data. The subscription will be created but it will
1696 : * not be enabled now. That's because the replication progress must be set and
1697 : * the replication origin name (one of the function arguments) contains the
1698 : * subscription OID in its name. Once the subscription is created,
1699 : * set_replication_progress() can obtain the chosen origin name and set up its
1700 : * initial location.
1701 : */
1702 : static void
1703 10 : create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
1704 : {
1705 10 : PQExpBuffer str = createPQExpBuffer();
1706 : PGresult *res;
1707 : char *pubname_esc;
1708 : char *subname_esc;
1709 : char *pubconninfo_esc;
1710 : char *replslotname_esc;
1711 :
1712 : Assert(conn != NULL);
1713 :
1714 10 : pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1715 10 : subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1716 10 : pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
1717 10 : replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
1718 :
1719 10 : pg_log_info("creating subscription \"%s\" in database \"%s\"",
1720 : dbinfo->subname, dbinfo->dbname);
1721 :
1722 10 : appendPQExpBuffer(str,
1723 : "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
1724 : "WITH (create_slot = false, enabled = false, "
1725 : "slot_name = %s, copy_data = false)",
1726 : subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc);
1727 :
1728 10 : PQfreemem(pubname_esc);
1729 10 : PQfreemem(subname_esc);
1730 10 : PQfreemem(pubconninfo_esc);
1731 10 : PQfreemem(replslotname_esc);
1732 :
1733 10 : pg_log_debug("command is: %s", str->data);
1734 :
1735 10 : if (!dry_run)
1736 : {
1737 4 : res = PQexec(conn, str->data);
1738 4 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1739 : {
1740 0 : pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
1741 : dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
1742 0 : disconnect_database(conn, true);
1743 : }
1744 4 : PQclear(res);
1745 : }
1746 :
1747 10 : destroyPQExpBuffer(str);
1748 10 : }
1749 :
1750 : /*
1751 : * Sets the replication progress to the consistent LSN.
1752 : *
1753 : * The subscriber caught up to the consistent LSN provided by the last
1754 : * replication slot that was created. The goal is to set up the initial
1755 : * location for the logical replication that is the exact LSN that the
1756 : * subscriber was promoted. Once the subscription is enabled it will start
1757 : * streaming from that location onwards. In dry run mode, the subscription OID
1758 : * and LSN are set to invalid values for printing purposes.
1759 : */
1760 : static void
1761 10 : set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
1762 : {
1763 10 : PQExpBuffer str = createPQExpBuffer();
1764 : PGresult *res;
1765 : Oid suboid;
1766 : char *subname;
1767 : char *dbname;
1768 : char *originname;
1769 : char *lsnstr;
1770 :
1771 : Assert(conn != NULL);
1772 :
1773 10 : subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
1774 10 : dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1775 :
1776 10 : appendPQExpBuffer(str,
1777 : "SELECT s.oid FROM pg_catalog.pg_subscription s "
1778 : "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1779 : "WHERE s.subname = %s AND d.datname = %s",
1780 : subname, dbname);
1781 :
1782 10 : res = PQexec(conn, str->data);
1783 10 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1784 : {
1785 0 : pg_log_error("could not obtain subscription OID: %s",
1786 : PQresultErrorMessage(res));
1787 0 : disconnect_database(conn, true);
1788 : }
1789 :
1790 10 : if (PQntuples(res) != 1 && !dry_run)
1791 : {
1792 0 : pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
1793 : PQntuples(res), 1);
1794 0 : disconnect_database(conn, true);
1795 : }
1796 :
1797 10 : if (dry_run)
1798 : {
1799 6 : suboid = InvalidOid;
1800 6 : lsnstr = psprintf("%X/%X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
1801 : }
1802 : else
1803 : {
1804 4 : suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
1805 4 : lsnstr = psprintf("%s", lsn);
1806 : }
1807 :
1808 10 : PQclear(res);
1809 :
1810 : /*
1811 : * The origin name is defined as pg_%u. %u is the subscription OID. See
1812 : * ApplyWorkerMain().
1813 : */
1814 10 : originname = psprintf("pg_%u", suboid);
1815 :
1816 10 : pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
1817 : originname, lsnstr, dbinfo->dbname);
1818 :
1819 10 : resetPQExpBuffer(str);
1820 10 : appendPQExpBuffer(str,
1821 : "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
1822 : originname, lsnstr);
1823 :
1824 10 : pg_log_debug("command is: %s", str->data);
1825 :
1826 10 : if (!dry_run)
1827 : {
1828 4 : res = PQexec(conn, str->data);
1829 4 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1830 : {
1831 0 : pg_log_error("could not set replication progress for subscription \"%s\": %s",
1832 : dbinfo->subname, PQresultErrorMessage(res));
1833 0 : disconnect_database(conn, true);
1834 : }
1835 4 : PQclear(res);
1836 : }
1837 :
1838 10 : PQfreemem(subname);
1839 10 : PQfreemem(dbname);
1840 10 : pg_free(originname);
1841 10 : pg_free(lsnstr);
1842 10 : destroyPQExpBuffer(str);
1843 10 : }
1844 :
1845 : /*
1846 : * Enables the subscription.
1847 : *
1848 : * The subscription was created in a previous step but it was disabled. After
1849 : * adjusting the initial logical replication location, enable the subscription.
1850 : */
1851 : static void
1852 10 : enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
1853 : {
1854 10 : PQExpBuffer str = createPQExpBuffer();
1855 : PGresult *res;
1856 : char *subname;
1857 :
1858 : Assert(conn != NULL);
1859 :
1860 10 : subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1861 :
1862 10 : pg_log_info("enabling subscription \"%s\" in database \"%s\"",
1863 : dbinfo->subname, dbinfo->dbname);
1864 :
1865 10 : appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
1866 :
1867 10 : pg_log_debug("command is: %s", str->data);
1868 :
1869 10 : if (!dry_run)
1870 : {
1871 4 : res = PQexec(conn, str->data);
1872 4 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1873 : {
1874 0 : pg_log_error("could not enable subscription \"%s\": %s",
1875 : dbinfo->subname, PQresultErrorMessage(res));
1876 0 : disconnect_database(conn, true);
1877 : }
1878 :
1879 4 : PQclear(res);
1880 : }
1881 :
1882 10 : PQfreemem(subname);
1883 10 : destroyPQExpBuffer(str);
1884 10 : }
1885 :
1886 : int
1887 40 : main(int argc, char **argv)
1888 : {
1889 : static struct option long_options[] =
1890 : {
1891 : {"database", required_argument, NULL, 'd'},
1892 : {"pgdata", required_argument, NULL, 'D'},
1893 : {"dry-run", no_argument, NULL, 'n'},
1894 : {"subscriber-port", required_argument, NULL, 'p'},
1895 : {"publisher-server", required_argument, NULL, 'P'},
1896 : {"socketdir", required_argument, NULL, 's'},
1897 : {"recovery-timeout", required_argument, NULL, 't'},
1898 : {"subscriber-username", required_argument, NULL, 'U'},
1899 : {"verbose", no_argument, NULL, 'v'},
1900 : {"version", no_argument, NULL, 'V'},
1901 : {"help", no_argument, NULL, '?'},
1902 : {"config-file", required_argument, NULL, 1},
1903 : {"publication", required_argument, NULL, 2},
1904 : {"replication-slot", required_argument, NULL, 3},
1905 : {"subscription", required_argument, NULL, 4},
1906 : {NULL, 0, NULL, 0}
1907 : };
1908 :
1909 40 : struct CreateSubscriberOptions opt = {0};
1910 :
1911 : int c;
1912 : int option_index;
1913 :
1914 : char *pub_base_conninfo;
1915 : char *sub_base_conninfo;
1916 40 : char *dbname_conninfo = NULL;
1917 :
1918 : uint64 pub_sysid;
1919 : uint64 sub_sysid;
1920 : struct stat statbuf;
1921 :
1922 : char *consistent_lsn;
1923 :
1924 : char pidfile[MAXPGPATH];
1925 :
1926 40 : pg_logging_init(argv[0]);
1927 40 : pg_logging_set_level(PG_LOG_WARNING);
1928 40 : progname = get_progname(argv[0]);
1929 40 : set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
1930 :
1931 40 : if (argc > 1)
1932 : {
1933 38 : if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
1934 : {
1935 2 : usage();
1936 2 : exit(0);
1937 : }
1938 36 : else if (strcmp(argv[1], "-V") == 0
1939 36 : || strcmp(argv[1], "--version") == 0)
1940 : {
1941 2 : puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
1942 2 : exit(0);
1943 : }
1944 : }
1945 :
1946 : /* Default settings */
1947 36 : subscriber_dir = NULL;
1948 36 : opt.config_file = NULL;
1949 36 : opt.pub_conninfo_str = NULL;
1950 36 : opt.socket_dir = NULL;
1951 36 : opt.sub_port = DEFAULT_SUB_PORT;
1952 36 : opt.sub_username = NULL;
1953 36 : opt.database_names = (SimpleStringList)
1954 : {
1955 : 0
1956 : };
1957 36 : opt.recovery_timeout = 0;
1958 :
1959 : /*
1960 : * Don't allow it to be run as root. It uses pg_ctl which does not allow
1961 : * it either.
1962 : */
1963 : #ifndef WIN32
1964 36 : if (geteuid() == 0)
1965 : {
1966 0 : pg_log_error("cannot be executed by \"root\"");
1967 0 : pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
1968 : progname);
1969 0 : exit(1);
1970 : }
1971 : #endif
1972 :
1973 36 : get_restricted_token();
1974 :
1975 268 : while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:U:v",
1976 : long_options, &option_index)) != -1)
1977 : {
1978 238 : switch (c)
1979 : {
1980 48 : case 'd':
1981 48 : if (!simple_string_list_member(&opt.database_names, optarg))
1982 : {
1983 46 : simple_string_list_append(&opt.database_names, optarg);
1984 46 : num_dbs++;
1985 : }
1986 : else
1987 : {
1988 2 : pg_log_error("database \"%s\" specified more than once", optarg);
1989 2 : exit(1);
1990 : }
1991 46 : break;
1992 32 : case 'D':
1993 32 : subscriber_dir = pg_strdup(optarg);
1994 32 : canonicalize_path(subscriber_dir);
1995 32 : break;
1996 14 : case 'n':
1997 14 : dry_run = true;
1998 14 : break;
1999 18 : case 'p':
2000 18 : opt.sub_port = pg_strdup(optarg);
2001 18 : break;
2002 30 : case 'P':
2003 30 : opt.pub_conninfo_str = pg_strdup(optarg);
2004 30 : break;
2005 18 : case 's':
2006 18 : opt.socket_dir = pg_strdup(optarg);
2007 18 : canonicalize_path(opt.socket_dir);
2008 18 : break;
2009 4 : case 't':
2010 4 : opt.recovery_timeout = atoi(optarg);
2011 4 : break;
2012 0 : case 'U':
2013 0 : opt.sub_username = pg_strdup(optarg);
2014 0 : break;
2015 32 : case 'v':
2016 32 : pg_logging_increase_verbosity();
2017 32 : break;
2018 0 : case 1:
2019 0 : opt.config_file = pg_strdup(optarg);
2020 0 : break;
2021 22 : case 2:
2022 22 : if (!simple_string_list_member(&opt.pub_names, optarg))
2023 : {
2024 20 : simple_string_list_append(&opt.pub_names, optarg);
2025 20 : num_pubs++;
2026 : }
2027 : else
2028 : {
2029 2 : pg_log_error("publication \"%s\" specified more than once", optarg);
2030 2 : exit(1);
2031 : }
2032 20 : break;
2033 8 : case 3:
2034 8 : if (!simple_string_list_member(&opt.replslot_names, optarg))
2035 : {
2036 8 : simple_string_list_append(&opt.replslot_names, optarg);
2037 8 : num_replslots++;
2038 : }
2039 : else
2040 : {
2041 0 : pg_log_error("replication slot \"%s\" specified more than once", optarg);
2042 0 : exit(1);
2043 : }
2044 8 : break;
2045 10 : case 4:
2046 10 : if (!simple_string_list_member(&opt.sub_names, optarg))
2047 : {
2048 10 : simple_string_list_append(&opt.sub_names, optarg);
2049 10 : num_subs++;
2050 : }
2051 : else
2052 : {
2053 0 : pg_log_error("subscription \"%s\" specified more than once", optarg);
2054 0 : exit(1);
2055 : }
2056 10 : break;
2057 2 : default:
2058 : /* getopt_long already emitted a complaint */
2059 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2060 2 : exit(1);
2061 : }
2062 : }
2063 :
2064 : /* Any non-option arguments? */
2065 30 : if (optind < argc)
2066 : {
2067 0 : pg_log_error("too many command-line arguments (first is \"%s\")",
2068 : argv[optind]);
2069 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2070 0 : exit(1);
2071 : }
2072 :
2073 : /* Required arguments */
2074 30 : if (subscriber_dir == NULL)
2075 : {
2076 2 : pg_log_error("no subscriber data directory specified");
2077 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2078 2 : exit(1);
2079 : }
2080 :
2081 : /* If socket directory is not provided, use the current directory */
2082 28 : if (opt.socket_dir == NULL)
2083 : {
2084 : char cwd[MAXPGPATH];
2085 :
2086 10 : if (!getcwd(cwd, MAXPGPATH))
2087 0 : pg_fatal("could not determine current directory");
2088 10 : opt.socket_dir = pg_strdup(cwd);
2089 10 : canonicalize_path(opt.socket_dir);
2090 : }
2091 :
2092 : /*
2093 : * Parse connection string. Build a base connection string that might be
2094 : * reused by multiple databases.
2095 : */
2096 28 : if (opt.pub_conninfo_str == NULL)
2097 : {
2098 : /*
2099 : * TODO use primary_conninfo (if available) from subscriber and
2100 : * extract publisher connection string. Assume that there are
2101 : * identical entries for physical and logical replication. If there is
2102 : * not, we would fail anyway.
2103 : */
2104 2 : pg_log_error("no publisher connection string specified");
2105 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2106 2 : exit(1);
2107 : }
2108 26 : pg_log_info("validating publisher connection string");
2109 26 : pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
2110 : &dbname_conninfo);
2111 26 : if (pub_base_conninfo == NULL)
2112 0 : exit(1);
2113 :
2114 26 : pg_log_info("validating subscriber connection string");
2115 26 : sub_base_conninfo = get_sub_conninfo(&opt);
2116 :
2117 26 : if (opt.database_names.head == NULL)
2118 : {
2119 4 : pg_log_info("no database was specified");
2120 :
2121 : /*
2122 : * If --database option is not provided, try to obtain the dbname from
2123 : * the publisher conninfo. If dbname parameter is not available, error
2124 : * out.
2125 : */
2126 4 : if (dbname_conninfo)
2127 : {
2128 2 : simple_string_list_append(&opt.database_names, dbname_conninfo);
2129 2 : num_dbs++;
2130 :
2131 2 : pg_log_info("database name \"%s\" was extracted from the publisher connection string",
2132 : dbname_conninfo);
2133 : }
2134 : else
2135 : {
2136 2 : pg_log_error("no database name specified");
2137 2 : pg_log_error_hint("Try \"%s --help\" for more information.",
2138 : progname);
2139 2 : exit(1);
2140 : }
2141 : }
2142 :
2143 : /* Number of object names must match number of databases */
2144 24 : if (num_pubs > 0 && num_pubs != num_dbs)
2145 : {
2146 2 : pg_log_error("wrong number of publication names specified");
2147 2 : pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
2148 : num_pubs, num_dbs);
2149 2 : exit(1);
2150 : }
2151 22 : if (num_subs > 0 && num_subs != num_dbs)
2152 : {
2153 2 : pg_log_error("wrong number of subscription names specified");
2154 2 : pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
2155 : num_subs, num_dbs);
2156 2 : exit(1);
2157 : }
2158 20 : if (num_replslots > 0 && num_replslots != num_dbs)
2159 : {
2160 2 : pg_log_error("wrong number of replication slot names specified");
2161 2 : pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
2162 : num_replslots, num_dbs);
2163 2 : exit(1);
2164 : }
2165 :
2166 : /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
2167 18 : pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
2168 18 : pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
2169 :
2170 : /* Rudimentary check for a data directory */
2171 18 : check_data_directory(subscriber_dir);
2172 :
2173 : /*
2174 : * Store database information for publisher and subscriber. It should be
2175 : * called before atexit() because its return is used in the
2176 : * cleanup_objects_atexit().
2177 : */
2178 18 : dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
2179 :
2180 : /* Register a function to clean up objects in case of failure */
2181 18 : atexit(cleanup_objects_atexit);
2182 :
2183 : /*
2184 : * Check if the subscriber data directory has the same system identifier
2185 : * than the publisher data directory.
2186 : */
2187 18 : pub_sysid = get_primary_sysid(dbinfo[0].pubconninfo);
2188 18 : sub_sysid = get_standby_sysid(subscriber_dir);
2189 18 : if (pub_sysid != sub_sysid)
2190 2 : pg_fatal("subscriber data directory is not a copy of the source database cluster");
2191 :
2192 : /* Subscriber PID file */
2193 16 : snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
2194 :
2195 : /*
2196 : * The standby server must not be running. If the server is started under
2197 : * service manager and pg_createsubscriber stops it, the service manager
2198 : * might react to this action and start the server again. Therefore,
2199 : * refuse to proceed if the server is running to avoid possible failures.
2200 : */
2201 16 : if (stat(pidfile, &statbuf) == 0)
2202 : {
2203 2 : pg_log_error("standby server is running");
2204 2 : pg_log_error_hint("Stop the standby server and try again.");
2205 2 : exit(1);
2206 : }
2207 :
2208 : /*
2209 : * Start a short-lived standby server with temporary parameters (provided
2210 : * by command-line options). The goal is to avoid connections during the
2211 : * transformation steps.
2212 : */
2213 14 : pg_log_info("starting the standby server with command-line options");
2214 14 : start_standby_server(&opt, true, false);
2215 :
2216 : /* Check if the standby server is ready for logical replication */
2217 14 : check_subscriber(dbinfo);
2218 :
2219 : /* Check if the primary server is ready for logical replication */
2220 10 : check_publisher(dbinfo);
2221 :
2222 : /*
2223 : * Stop the target server. The recovery process requires that the server
2224 : * reaches a consistent state before targeting the recovery stop point.
2225 : * Make sure a consistent state is reached (stop the target server
2226 : * guarantees it) *before* creating the replication slots in
2227 : * setup_publisher().
2228 : */
2229 6 : pg_log_info("stopping the subscriber");
2230 6 : stop_standby_server(subscriber_dir);
2231 :
2232 : /* Create the required objects for each database on publisher */
2233 6 : consistent_lsn = setup_publisher(dbinfo);
2234 :
2235 : /* Write the required recovery parameters */
2236 6 : setup_recovery(dbinfo, subscriber_dir, consistent_lsn);
2237 :
2238 : /*
2239 : * Start subscriber so the recovery parameters will take effect. Wait
2240 : * until accepting connections. We don't want to start logical replication
2241 : * during setup.
2242 : */
2243 6 : pg_log_info("starting the subscriber");
2244 6 : start_standby_server(&opt, true, true);
2245 :
2246 : /* Waiting the subscriber to be promoted */
2247 6 : wait_for_end_recovery(dbinfo[0].subconninfo, &opt);
2248 :
2249 : /*
2250 : * Create the subscription for each database on subscriber. It does not
2251 : * enable it immediately because it needs to adjust the replication start
2252 : * point to the LSN reported by setup_publisher(). It also cleans up
2253 : * publications created by this tool and replication to the standby.
2254 : */
2255 6 : setup_subscriber(dbinfo, consistent_lsn);
2256 :
2257 : /* Remove primary_slot_name if it exists on primary */
2258 6 : drop_primary_replication_slot(dbinfo, primary_slot_name);
2259 :
2260 : /* Remove failover replication slots if they exist on subscriber */
2261 6 : drop_failover_replication_slots(dbinfo);
2262 :
2263 : /* Stop the subscriber */
2264 6 : pg_log_info("stopping the subscriber");
2265 6 : stop_standby_server(subscriber_dir);
2266 :
2267 : /* Change system identifier from subscriber */
2268 6 : modify_subscriber_sysid(&opt);
2269 :
2270 6 : success = true;
2271 :
2272 6 : pg_log_info("Done!");
2273 :
2274 6 : return 0;
2275 : }
|