Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_createsubscriber.c
4 : * Create a new logical replica from a standby server
5 : *
6 : * Copyright (c) 2024-2025, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/bin/pg_basebackup/pg_createsubscriber.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 :
14 : #include "postgres_fe.h"
15 :
16 : #include <sys/stat.h>
17 : #include <sys/time.h>
18 : #include <sys/wait.h>
19 : #include <time.h>
20 :
21 : #include "common/connect.h"
22 : #include "common/controldata_utils.h"
23 : #include "common/logging.h"
24 : #include "common/pg_prng.h"
25 : #include "common/restricted_token.h"
26 : #include "datatype/timestamp.h"
27 : #include "fe_utils/recovery_gen.h"
28 : #include "fe_utils/simple_list.h"
29 : #include "fe_utils/string_utils.h"
30 : #include "fe_utils/version.h"
31 : #include "getopt_long.h"
32 :
33 : #define DEFAULT_SUB_PORT "50432"
34 : #define OBJECTTYPE_PUBLICATIONS 0x0001
35 :
36 : /* Command-line options */
37 : struct CreateSubscriberOptions
38 : {
39 : char *config_file; /* configuration file */
40 : char *pub_conninfo_str; /* publisher connection string */
41 : char *socket_dir; /* directory for Unix-domain socket, if any */
42 : char *sub_port; /* subscriber port number */
43 : const char *sub_username; /* subscriber username */
44 : bool two_phase; /* enable-two-phase option */
45 : SimpleStringList database_names; /* list of database names */
46 : SimpleStringList pub_names; /* list of publication names */
47 : SimpleStringList sub_names; /* list of subscription names */
48 : SimpleStringList replslot_names; /* list of replication slot names */
49 : int recovery_timeout; /* stop recovery after this time */
50 : bool all_dbs; /* all option */
51 : SimpleStringList objecttypes_to_clean; /* list of object types to cleanup */
52 : };
53 :
54 : /* per-database publication/subscription info */
55 : struct LogicalRepInfo
56 : {
57 : char *dbname; /* database name */
58 : char *pubconninfo; /* publisher connection string */
59 : char *subconninfo; /* subscriber connection string */
60 : char *pubname; /* publication name */
61 : char *subname; /* subscription name */
62 : char *replslotname; /* replication slot name */
63 :
64 : bool made_replslot; /* replication slot was created */
65 : bool made_publication; /* publication was created */
66 : };
67 :
68 : /*
69 : * Information shared across all the databases (or publications and
70 : * subscriptions).
71 : */
72 : struct LogicalRepInfos
73 : {
74 : struct LogicalRepInfo *dbinfo;
75 : bool two_phase; /* enable-two-phase option */
76 : bits32 objecttypes_to_clean; /* flags indicating which object types
77 : * to clean up on subscriber */
78 : };
79 :
80 : static void cleanup_objects_atexit(void);
81 : static void usage(void);
82 : static char *get_base_conninfo(const char *conninfo, char **dbname);
83 : static char *get_sub_conninfo(const struct CreateSubscriberOptions *opt);
84 : static char *get_exec_path(const char *argv0, const char *progname);
85 : static void check_data_directory(const char *datadir);
86 : static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
87 : static struct LogicalRepInfo *store_pub_sub_info(const struct CreateSubscriberOptions *opt,
88 : const char *pub_base_conninfo,
89 : const char *sub_base_conninfo);
90 : static PGconn *connect_database(const char *conninfo, bool exit_on_error);
91 : static void disconnect_database(PGconn *conn, bool exit_on_error);
92 : static uint64 get_primary_sysid(const char *conninfo);
93 : static uint64 get_standby_sysid(const char *datadir);
94 : static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
95 : static bool server_is_in_recovery(PGconn *conn);
96 : static char *generate_object_name(PGconn *conn);
97 : static void check_publisher(const struct LogicalRepInfo *dbinfo);
98 : static char *setup_publisher(struct LogicalRepInfo *dbinfo);
99 : static void check_subscriber(const struct LogicalRepInfo *dbinfo);
100 : static void setup_subscriber(struct LogicalRepInfo *dbinfo,
101 : const char *consistent_lsn);
102 : static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
103 : const char *lsn);
104 : static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
105 : const char *slotname);
106 : static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo);
107 : static char *create_logical_replication_slot(PGconn *conn,
108 : struct LogicalRepInfo *dbinfo);
109 : static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
110 : const char *slot_name);
111 : static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
112 : static void start_standby_server(const struct CreateSubscriberOptions *opt,
113 : bool restricted_access,
114 : bool restrict_logical_worker);
115 : static void stop_standby_server(const char *datadir);
116 : static void wait_for_end_recovery(const char *conninfo,
117 : const struct CreateSubscriberOptions *opt);
118 : static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
119 : static void drop_publication(PGconn *conn, const char *pubname,
120 : const char *dbname, bool *made_publication);
121 : static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
122 : static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
123 : static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
124 : const char *lsn);
125 : static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
126 : static void check_and_drop_existing_subscriptions(PGconn *conn,
127 : const struct LogicalRepInfo *dbinfo);
128 : static void drop_existing_subscription(PGconn *conn, const char *subname,
129 : const char *dbname);
130 : static void get_publisher_databases(struct CreateSubscriberOptions *opt,
131 : bool dbnamespecified);
132 :
133 : #define WAIT_INTERVAL 1 /* 1 second */
134 :
135 : static const char *progname;
136 :
137 : static char *primary_slot_name = NULL;
138 : static bool dry_run = false;
139 :
140 : static bool success = false;
141 :
142 : static struct LogicalRepInfos dbinfos;
143 : static int num_dbs = 0; /* number of specified databases */
144 : static int num_pubs = 0; /* number of specified publications */
145 : static int num_subs = 0; /* number of specified subscriptions */
146 : static int num_replslots = 0; /* number of specified replication slots */
147 :
148 : static pg_prng_state prng_state;
149 :
150 : static char *pg_ctl_path = NULL;
151 : static char *pg_resetwal_path = NULL;
152 :
153 : /* standby / subscriber data directory */
154 : static char *subscriber_dir = NULL;
155 :
156 : static bool recovery_ended = false;
157 : static bool standby_running = false;
158 :
159 :
160 : /*
161 : * Cleanup objects that were created by pg_createsubscriber if there is an
162 : * error.
163 : *
164 : * Publications and replication slots are created on primary. Depending on the
165 : * step it failed, it should remove the already created objects if it is
166 : * possible (sometimes it won't work due to a connection issue).
167 : * There is no cleanup on the target server. The steps on the target server are
168 : * executed *after* promotion, hence, at this point, a failure means recreate
169 : * the physical replica and start again.
170 : */
171 : static void
172 20 : cleanup_objects_atexit(void)
173 : {
174 20 : if (success)
175 8 : return;
176 :
177 : /*
178 : * If the server is promoted, there is no way to use the current setup
179 : * again. Warn the user that a new replication setup should be done before
180 : * trying again.
181 : */
182 12 : if (recovery_ended)
183 : {
184 0 : pg_log_warning("failed after the end of recovery");
185 0 : pg_log_warning_hint("The target server cannot be used as a physical replica anymore. "
186 : "You must recreate the physical replica before continuing.");
187 : }
188 :
189 36 : for (int i = 0; i < num_dbs; i++)
190 : {
191 24 : struct LogicalRepInfo *dbinfo = &dbinfos.dbinfo[i];
192 :
193 24 : if (dbinfo->made_publication || dbinfo->made_replslot)
194 : {
195 : PGconn *conn;
196 :
197 0 : conn = connect_database(dbinfo->pubconninfo, false);
198 0 : if (conn != NULL)
199 : {
200 0 : if (dbinfo->made_publication)
201 0 : drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
202 : &dbinfo->made_publication);
203 0 : if (dbinfo->made_replslot)
204 0 : drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
205 0 : disconnect_database(conn, false);
206 : }
207 : else
208 : {
209 : /*
210 : * If a connection could not be established, inform the user
211 : * that some objects were left on primary and should be
212 : * removed before trying again.
213 : */
214 0 : if (dbinfo->made_publication)
215 : {
216 0 : pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
217 : dbinfo->pubname,
218 : dbinfo->dbname);
219 0 : pg_log_warning_hint("Drop this publication before trying again.");
220 : }
221 0 : if (dbinfo->made_replslot)
222 : {
223 0 : pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
224 : dbinfo->replslotname,
225 : dbinfo->dbname);
226 0 : pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
227 : }
228 : }
229 : }
230 : }
231 :
232 12 : if (standby_running)
233 8 : stop_standby_server(subscriber_dir);
234 : }
235 :
236 : static void
237 2 : usage(void)
238 : {
239 2 : printf(_("%s creates a new logical replica from a standby server.\n\n"),
240 : progname);
241 2 : printf(_("Usage:\n"));
242 2 : printf(_(" %s [OPTION]...\n"), progname);
243 2 : printf(_("\nOptions:\n"));
244 2 : printf(_(" -a, --all create subscriptions for all databases except template\n"
245 : " databases and databases that don't allow connections\n"));
246 2 : printf(_(" -d, --database=DBNAME database in which to create a subscription\n"));
247 2 : printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
248 2 : printf(_(" -n, --dry-run dry run, just show what would be done\n"));
249 2 : printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
250 2 : printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
251 2 : printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
252 2 : printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
253 2 : printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));
254 2 : printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
255 2 : printf(_(" -v, --verbose output verbose messages\n"));
256 2 : printf(_(" --clean=OBJECTTYPE drop all objects of the specified type from specified\n"
257 : " databases on the subscriber; accepts: \"%s\"\n"), "publications");
258 2 : printf(_(" --config-file=FILENAME use specified main server configuration\n"
259 : " file when running target cluster\n"));
260 2 : printf(_(" --publication=NAME publication name\n"));
261 2 : printf(_(" --replication-slot=NAME replication slot name\n"));
262 2 : printf(_(" --subscription=NAME subscription name\n"));
263 2 : printf(_(" -V, --version output version information, then exit\n"));
264 2 : printf(_(" -?, --help show this help, then exit\n"));
265 2 : printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
266 2 : printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
267 2 : }
268 :
269 : /*
270 : * Subroutine to append "keyword=value" to a connection string,
271 : * with proper quoting of the value. (We assume keywords don't need that.)
272 : */
273 : static void
274 214 : appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
275 : {
276 214 : if (buf->len > 0)
277 158 : appendPQExpBufferChar(buf, ' ');
278 214 : appendPQExpBufferStr(buf, keyword);
279 214 : appendPQExpBufferChar(buf, '=');
280 214 : appendConnStrVal(buf, val);
281 214 : }
282 :
283 : /*
284 : * Validate a connection string. Returns a base connection string that is a
285 : * connection string without a database name.
286 : *
287 : * Since we might process multiple databases, each database name will be
288 : * appended to this base connection string to provide a final connection
289 : * string. If the second argument (dbname) is not null, returns dbname if the
290 : * provided connection string contains it.
291 : *
292 : * It is the caller's responsibility to free the returned connection string and
293 : * dbname.
294 : */
295 : static char *
296 28 : get_base_conninfo(const char *conninfo, char **dbname)
297 : {
298 : PQExpBuffer buf;
299 : PQconninfoOption *conn_opts;
300 : PQconninfoOption *conn_opt;
301 28 : char *errmsg = NULL;
302 : char *ret;
303 :
304 28 : conn_opts = PQconninfoParse(conninfo, &errmsg);
305 28 : if (conn_opts == NULL)
306 : {
307 0 : pg_log_error("could not parse connection string: %s", errmsg);
308 0 : PQfreemem(errmsg);
309 0 : return NULL;
310 : }
311 :
312 28 : buf = createPQExpBuffer();
313 1456 : for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
314 : {
315 1428 : if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
316 : {
317 66 : if (strcmp(conn_opt->keyword, "dbname") == 0)
318 : {
319 18 : if (dbname)
320 18 : *dbname = pg_strdup(conn_opt->val);
321 18 : continue;
322 : }
323 48 : appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
324 : }
325 : }
326 :
327 28 : ret = pg_strdup(buf->data);
328 :
329 28 : destroyPQExpBuffer(buf);
330 28 : PQconninfoFree(conn_opts);
331 :
332 28 : return ret;
333 : }
334 :
335 : /*
336 : * Build a subscriber connection string. Only a few parameters are supported
337 : * since it starts a server with restricted access.
338 : */
339 : static char *
340 28 : get_sub_conninfo(const struct CreateSubscriberOptions *opt)
341 : {
342 28 : PQExpBuffer buf = createPQExpBuffer();
343 : char *ret;
344 :
345 28 : appendConnStrItem(buf, "port", opt->sub_port);
346 : #if !defined(WIN32)
347 28 : appendConnStrItem(buf, "host", opt->socket_dir);
348 : #endif
349 28 : if (opt->sub_username != NULL)
350 0 : appendConnStrItem(buf, "user", opt->sub_username);
351 28 : appendConnStrItem(buf, "fallback_application_name", progname);
352 :
353 28 : ret = pg_strdup(buf->data);
354 :
355 28 : destroyPQExpBuffer(buf);
356 :
357 28 : return ret;
358 : }
359 :
360 : /*
361 : * Verify if a PostgreSQL binary (progname) is available in the same directory as
362 : * pg_createsubscriber and it has the same version. It returns the absolute
363 : * path of the progname.
364 : */
365 : static char *
366 40 : get_exec_path(const char *argv0, const char *progname)
367 : {
368 : char *versionstr;
369 : char *exec_path;
370 : int ret;
371 :
372 40 : versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
373 40 : exec_path = pg_malloc(MAXPGPATH);
374 40 : ret = find_other_exec(argv0, progname, versionstr, exec_path);
375 :
376 40 : if (ret < 0)
377 : {
378 : char full_path[MAXPGPATH];
379 :
380 0 : if (find_my_exec(argv0, full_path) < 0)
381 0 : strlcpy(full_path, progname, sizeof(full_path));
382 :
383 0 : if (ret == -1)
384 0 : pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
385 : progname, "pg_createsubscriber", full_path);
386 : else
387 0 : pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
388 : progname, full_path, "pg_createsubscriber");
389 : }
390 :
391 40 : pg_log_debug("%s path is: %s", progname, exec_path);
392 :
393 40 : return exec_path;
394 : }
395 :
396 : /*
397 : * Is it a cluster directory? These are preliminary checks. It is far from
398 : * making an accurate check. If it is not a clone from the publisher, it will
399 : * eventually fail in a future step.
400 : */
401 : static void
402 20 : check_data_directory(const char *datadir)
403 : {
404 : struct stat statbuf;
405 : uint32 major_version;
406 : char *version_str;
407 :
408 20 : pg_log_info("checking if directory \"%s\" is a cluster data directory",
409 : datadir);
410 :
411 20 : if (stat(datadir, &statbuf) != 0)
412 : {
413 0 : if (errno == ENOENT)
414 0 : pg_fatal("data directory \"%s\" does not exist", datadir);
415 : else
416 0 : pg_fatal("could not access directory \"%s\": %m", datadir);
417 : }
418 :
419 : /*
420 : * Retrieve the contents of this cluster's PG_VERSION. We require
421 : * compatibility with the same major version as the one this tool is
422 : * compiled with.
423 : */
424 20 : major_version = GET_PG_MAJORVERSION_NUM(get_pg_version(datadir, &version_str));
425 20 : if (major_version != PG_MAJORVERSION_NUM)
426 : {
427 0 : pg_log_error("data directory is of wrong version");
428 0 : pg_log_error_detail("File \"%s\" contains \"%s\", which is not compatible with this program's version \"%s\".",
429 : "PG_VERSION", version_str, PG_MAJORVERSION);
430 0 : exit(1);
431 : }
432 20 : }
433 :
434 : /*
435 : * Append database name into a base connection string.
436 : *
437 : * dbname is the only parameter that changes so it is not included in the base
438 : * connection string. This function concatenates dbname to build a "real"
439 : * connection string.
440 : */
441 : static char *
442 82 : concat_conninfo_dbname(const char *conninfo, const char *dbname)
443 : {
444 82 : PQExpBuffer buf = createPQExpBuffer();
445 : char *ret;
446 :
447 : Assert(conninfo != NULL);
448 :
449 82 : appendPQExpBufferStr(buf, conninfo);
450 82 : appendConnStrItem(buf, "dbname", dbname);
451 :
452 82 : ret = pg_strdup(buf->data);
453 82 : destroyPQExpBuffer(buf);
454 :
455 82 : return ret;
456 : }
457 :
458 : /*
459 : * Store publication and subscription information.
460 : *
461 : * If publication, replication slot and subscription names were specified,
462 : * store it here. Otherwise, a generated name will be assigned to the object in
463 : * setup_publisher().
464 : */
465 : static struct LogicalRepInfo *
466 20 : store_pub_sub_info(const struct CreateSubscriberOptions *opt,
467 : const char *pub_base_conninfo,
468 : const char *sub_base_conninfo)
469 : {
470 : struct LogicalRepInfo *dbinfo;
471 20 : SimpleStringListCell *pubcell = NULL;
472 20 : SimpleStringListCell *subcell = NULL;
473 20 : SimpleStringListCell *replslotcell = NULL;
474 20 : int i = 0;
475 :
476 20 : dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
477 :
478 20 : if (num_pubs > 0)
479 4 : pubcell = opt->pub_names.head;
480 20 : if (num_subs > 0)
481 2 : subcell = opt->sub_names.head;
482 20 : if (num_replslots > 0)
483 4 : replslotcell = opt->replslot_names.head;
484 :
485 60 : for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
486 : {
487 : char *conninfo;
488 :
489 : /* Fill publisher attributes */
490 40 : conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
491 40 : dbinfo[i].pubconninfo = conninfo;
492 40 : dbinfo[i].dbname = cell->val;
493 40 : if (num_pubs > 0)
494 8 : dbinfo[i].pubname = pubcell->val;
495 : else
496 32 : dbinfo[i].pubname = NULL;
497 40 : if (num_replslots > 0)
498 6 : dbinfo[i].replslotname = replslotcell->val;
499 : else
500 34 : dbinfo[i].replslotname = NULL;
501 40 : dbinfo[i].made_replslot = false;
502 40 : dbinfo[i].made_publication = false;
503 : /* Fill subscriber attributes */
504 40 : conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
505 40 : dbinfo[i].subconninfo = conninfo;
506 40 : if (num_subs > 0)
507 4 : dbinfo[i].subname = subcell->val;
508 : else
509 36 : dbinfo[i].subname = NULL;
510 : /* Other fields will be filled later */
511 :
512 40 : pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
513 : dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
514 : dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
515 : dbinfo[i].pubconninfo);
516 40 : pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
517 : dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
518 : dbinfo[i].subconninfo,
519 : dbinfos.two_phase ? "true" : "false");
520 :
521 40 : if (num_pubs > 0)
522 8 : pubcell = pubcell->next;
523 40 : if (num_subs > 0)
524 4 : subcell = subcell->next;
525 40 : if (num_replslots > 0)
526 6 : replslotcell = replslotcell->next;
527 :
528 40 : i++;
529 : }
530 :
531 20 : return dbinfo;
532 : }
533 :
534 : /*
535 : * Open a new connection. If exit_on_error is true, it has an undesired
536 : * condition and it should exit immediately.
537 : */
538 : static PGconn *
539 114 : connect_database(const char *conninfo, bool exit_on_error)
540 : {
541 : PGconn *conn;
542 : PGresult *res;
543 :
544 114 : conn = PQconnectdb(conninfo);
545 114 : if (PQstatus(conn) != CONNECTION_OK)
546 : {
547 0 : pg_log_error("connection to database failed: %s",
548 : PQerrorMessage(conn));
549 0 : PQfinish(conn);
550 :
551 0 : if (exit_on_error)
552 0 : exit(1);
553 0 : return NULL;
554 : }
555 :
556 : /* Secure search_path */
557 114 : res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
558 114 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
559 : {
560 0 : pg_log_error("could not clear \"search_path\": %s",
561 : PQresultErrorMessage(res));
562 0 : PQclear(res);
563 0 : PQfinish(conn);
564 :
565 0 : if (exit_on_error)
566 0 : exit(1);
567 0 : return NULL;
568 : }
569 114 : PQclear(res);
570 :
571 114 : return conn;
572 : }
573 :
574 : /*
575 : * Close the connection. If exit_on_error is true, it has an undesired
576 : * condition and it should exit immediately.
577 : */
578 : static void
579 114 : disconnect_database(PGconn *conn, bool exit_on_error)
580 : {
581 : Assert(conn != NULL);
582 :
583 114 : PQfinish(conn);
584 :
585 114 : if (exit_on_error)
586 4 : exit(1);
587 110 : }
588 :
589 : /*
590 : * Obtain the system identifier using the provided connection. It will be used
591 : * to compare if a data directory is a clone of another one.
592 : */
593 : static uint64
594 20 : get_primary_sysid(const char *conninfo)
595 : {
596 : PGconn *conn;
597 : PGresult *res;
598 : uint64 sysid;
599 :
600 20 : pg_log_info("getting system identifier from publisher");
601 :
602 20 : conn = connect_database(conninfo, true);
603 :
604 20 : res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
605 20 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
606 : {
607 0 : pg_log_error("could not get system identifier: %s",
608 : PQresultErrorMessage(res));
609 0 : disconnect_database(conn, true);
610 : }
611 20 : if (PQntuples(res) != 1)
612 : {
613 0 : pg_log_error("could not get system identifier: got %d rows, expected %d row",
614 : PQntuples(res), 1);
615 0 : disconnect_database(conn, true);
616 : }
617 :
618 20 : sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
619 :
620 20 : pg_log_info("system identifier is %" PRIu64 " on publisher", sysid);
621 :
622 20 : PQclear(res);
623 20 : disconnect_database(conn, false);
624 :
625 20 : return sysid;
626 : }
627 :
628 : /*
629 : * Obtain the system identifier from control file. It will be used to compare
630 : * if a data directory is a clone of another one. This routine is used locally
631 : * and avoids a connection.
632 : */
633 : static uint64
634 20 : get_standby_sysid(const char *datadir)
635 : {
636 : ControlFileData *cf;
637 : bool crc_ok;
638 : uint64 sysid;
639 :
640 20 : pg_log_info("getting system identifier from subscriber");
641 :
642 20 : cf = get_controlfile(datadir, &crc_ok);
643 20 : if (!crc_ok)
644 0 : pg_fatal("control file appears to be corrupt");
645 :
646 20 : sysid = cf->system_identifier;
647 :
648 20 : pg_log_info("system identifier is %" PRIu64 " on subscriber", sysid);
649 :
650 20 : pg_free(cf);
651 :
652 20 : return sysid;
653 : }
654 :
655 : /*
656 : * Modify the system identifier. Since a standby server preserves the system
657 : * identifier, it makes sense to change it to avoid situations in which WAL
658 : * files from one of the systems might be used in the other one.
659 : */
660 : static void
661 8 : modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
662 : {
663 : ControlFileData *cf;
664 : bool crc_ok;
665 : struct timeval tv;
666 :
667 : char *cmd_str;
668 :
669 8 : pg_log_info("modifying system identifier of subscriber");
670 :
671 8 : cf = get_controlfile(subscriber_dir, &crc_ok);
672 8 : if (!crc_ok)
673 0 : pg_fatal("control file appears to be corrupt");
674 :
675 : /*
676 : * Select a new system identifier.
677 : *
678 : * XXX this code was extracted from BootStrapXLOG().
679 : */
680 8 : gettimeofday(&tv, NULL);
681 8 : cf->system_identifier = ((uint64) tv.tv_sec) << 32;
682 8 : cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
683 8 : cf->system_identifier |= getpid() & 0xFFF;
684 :
685 8 : if (dry_run)
686 6 : pg_log_info("dry-run: would set system identifier to %" PRIu64 " on subscriber",
687 : cf->system_identifier);
688 : else
689 : {
690 2 : update_controlfile(subscriber_dir, cf, true);
691 2 : pg_log_info("system identifier is %" PRIu64 " on subscriber",
692 : cf->system_identifier);
693 : }
694 :
695 8 : if (dry_run)
696 6 : pg_log_info("dry-run: would run pg_resetwal on the subscriber");
697 : else
698 2 : pg_log_info("running pg_resetwal on the subscriber");
699 :
700 8 : cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
701 : subscriber_dir, DEVNULL);
702 :
703 8 : pg_log_debug("pg_resetwal command is: %s", cmd_str);
704 :
705 8 : if (!dry_run)
706 : {
707 2 : int rc = system(cmd_str);
708 :
709 2 : if (rc == 0)
710 2 : pg_log_info("successfully reset WAL on the subscriber");
711 : else
712 0 : pg_fatal("could not reset WAL on subscriber: %s", wait_result_to_str(rc));
713 : }
714 :
715 8 : pg_free(cf);
716 8 : }
717 :
718 : /*
719 : * Generate an object name using a prefix, database oid and a random integer.
720 : * It is used in case the user does not specify an object name (publication,
721 : * subscription, replication slot).
722 : */
723 : static char *
724 16 : generate_object_name(PGconn *conn)
725 : {
726 : PGresult *res;
727 : Oid oid;
728 : uint32 rand;
729 : char *objname;
730 :
731 16 : res = PQexec(conn,
732 : "SELECT oid FROM pg_catalog.pg_database "
733 : "WHERE datname = pg_catalog.current_database()");
734 16 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
735 : {
736 0 : pg_log_error("could not obtain database OID: %s",
737 : PQresultErrorMessage(res));
738 0 : disconnect_database(conn, true);
739 : }
740 :
741 16 : if (PQntuples(res) != 1)
742 : {
743 0 : pg_log_error("could not obtain database OID: got %d rows, expected %d row",
744 : PQntuples(res), 1);
745 0 : disconnect_database(conn, true);
746 : }
747 :
748 : /* Database OID */
749 16 : oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
750 :
751 16 : PQclear(res);
752 :
753 : /* Random unsigned integer */
754 16 : rand = pg_prng_uint32(&prng_state);
755 :
756 : /*
757 : * Build the object name. The name must not exceed NAMEDATALEN - 1. This
758 : * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
759 : * '\0').
760 : */
761 16 : objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
762 :
763 16 : return objname;
764 : }
765 :
766 : /*
767 : * Create the publications and replication slots in preparation for logical
768 : * replication. Returns the LSN from latest replication slot. It will be the
769 : * replication start point that is used to adjust the subscriptions (see
770 : * set_replication_progress).
771 : */
772 : static char *
773 8 : setup_publisher(struct LogicalRepInfo *dbinfo)
774 : {
775 8 : char *lsn = NULL;
776 :
777 8 : pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
778 :
779 24 : for (int i = 0; i < num_dbs; i++)
780 : {
781 : PGconn *conn;
782 16 : char *genname = NULL;
783 :
784 16 : conn = connect_database(dbinfo[i].pubconninfo, true);
785 :
786 : /*
787 : * If an object name was not specified as command-line options, assign
788 : * a generated object name. The replication slot has a different rule.
789 : * The subscription name is assigned to the replication slot name if
790 : * no replication slot is specified. It follows the same rule as
791 : * CREATE SUBSCRIPTION.
792 : */
793 16 : if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
794 16 : genname = generate_object_name(conn);
795 16 : if (num_pubs == 0)
796 8 : dbinfo[i].pubname = pg_strdup(genname);
797 16 : if (num_subs == 0)
798 12 : dbinfo[i].subname = pg_strdup(genname);
799 16 : if (num_replslots == 0)
800 10 : dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
801 :
802 : /*
803 : * Create publication on publisher. This step should be executed
804 : * *before* promoting the subscriber to avoid any transactions between
805 : * consistent LSN and the new publication rows (such transactions
806 : * wouldn't see the new publication rows resulting in an error).
807 : */
808 16 : create_publication(conn, &dbinfo[i]);
809 :
810 : /* Create replication slot on publisher */
811 16 : if (lsn)
812 2 : pg_free(lsn);
813 16 : lsn = create_logical_replication_slot(conn, &dbinfo[i]);
814 16 : if (lsn == NULL && !dry_run)
815 0 : exit(1);
816 :
817 : /*
818 : * Since we are using the LSN returned by the last replication slot as
819 : * recovery_target_lsn, this LSN is ahead of the current WAL position
820 : * and the recovery waits until the publisher writes a WAL record to
821 : * reach the target and ends the recovery. On idle systems, this wait
822 : * time is unpredictable and could lead to failure in promoting the
823 : * subscriber. To avoid that, insert a harmless WAL record.
824 : */
825 16 : if (i == num_dbs - 1 && !dry_run)
826 : {
827 : PGresult *res;
828 :
829 2 : res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
830 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
831 : {
832 0 : pg_log_error("could not write an additional WAL record: %s",
833 : PQresultErrorMessage(res));
834 0 : disconnect_database(conn, true);
835 : }
836 2 : PQclear(res);
837 : }
838 :
839 16 : disconnect_database(conn, false);
840 : }
841 :
842 8 : return lsn;
843 : }
844 :
845 : /*
846 : * Is recovery still in progress?
847 : */
848 : static bool
849 32 : server_is_in_recovery(PGconn *conn)
850 : {
851 : PGresult *res;
852 : int ret;
853 :
854 32 : res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
855 :
856 32 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
857 : {
858 0 : pg_log_error("could not obtain recovery progress: %s",
859 : PQresultErrorMessage(res));
860 0 : disconnect_database(conn, true);
861 : }
862 :
863 :
864 32 : ret = strcmp("t", PQgetvalue(res, 0, 0));
865 :
866 32 : PQclear(res);
867 :
868 32 : return ret == 0;
869 : }
870 :
871 : /*
872 : * Is the primary server ready for logical replication?
873 : *
874 : * XXX Does it not allow a synchronous replica?
875 : */
876 : static void
877 12 : check_publisher(const struct LogicalRepInfo *dbinfo)
878 : {
879 : PGconn *conn;
880 : PGresult *res;
881 12 : bool failed = false;
882 :
883 : char *wal_level;
884 : int max_repslots;
885 : int cur_repslots;
886 : int max_walsenders;
887 : int cur_walsenders;
888 : int max_prepared_transactions;
889 : char *max_slot_wal_keep_size;
890 :
891 12 : pg_log_info("checking settings on publisher");
892 :
893 12 : conn = connect_database(dbinfo[0].pubconninfo, true);
894 :
895 : /*
896 : * If the primary server is in recovery (i.e. cascading replication),
897 : * objects (publication) cannot be created because it is read only.
898 : */
899 12 : if (server_is_in_recovery(conn))
900 : {
901 2 : pg_log_error("primary server cannot be in recovery");
902 2 : disconnect_database(conn, true);
903 : }
904 :
905 : /*------------------------------------------------------------------------
906 : * Logical replication requires a few parameters to be set on publisher.
907 : * Since these parameters are not a requirement for physical replication,
908 : * we should check it to make sure it won't fail.
909 : *
910 : * - wal_level = logical
911 : * - max_replication_slots >= current + number of dbs to be converted
912 : * - max_wal_senders >= current + number of dbs to be converted
913 : * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files)
914 : * -----------------------------------------------------------------------
915 : */
916 10 : res = PQexec(conn,
917 : "SELECT pg_catalog.current_setting('wal_level'),"
918 : " pg_catalog.current_setting('max_replication_slots'),"
919 : " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
920 : " pg_catalog.current_setting('max_wal_senders'),"
921 : " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
922 : " pg_catalog.current_setting('max_prepared_transactions'),"
923 : " pg_catalog.current_setting('max_slot_wal_keep_size')");
924 :
925 10 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
926 : {
927 0 : pg_log_error("could not obtain publisher settings: %s",
928 : PQresultErrorMessage(res));
929 0 : disconnect_database(conn, true);
930 : }
931 :
932 10 : wal_level = pg_strdup(PQgetvalue(res, 0, 0));
933 10 : max_repslots = atoi(PQgetvalue(res, 0, 1));
934 10 : cur_repslots = atoi(PQgetvalue(res, 0, 2));
935 10 : max_walsenders = atoi(PQgetvalue(res, 0, 3));
936 10 : cur_walsenders = atoi(PQgetvalue(res, 0, 4));
937 10 : max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));
938 10 : max_slot_wal_keep_size = pg_strdup(PQgetvalue(res, 0, 6));
939 :
940 10 : PQclear(res);
941 :
942 10 : pg_log_debug("publisher: wal_level: %s", wal_level);
943 10 : pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
944 10 : pg_log_debug("publisher: current replication slots: %d", cur_repslots);
945 10 : pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
946 10 : pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
947 10 : pg_log_debug("publisher: max_prepared_transactions: %d",
948 : max_prepared_transactions);
949 10 : pg_log_debug("publisher: max_slot_wal_keep_size: %s",
950 : max_slot_wal_keep_size);
951 :
952 10 : disconnect_database(conn, false);
953 :
954 10 : if (strcmp(wal_level, "logical") != 0)
955 : {
956 2 : pg_log_error("publisher requires \"wal_level\" >= \"logical\"");
957 2 : failed = true;
958 : }
959 :
960 10 : if (max_repslots - cur_repslots < num_dbs)
961 : {
962 2 : pg_log_error("publisher requires %d replication slots, but only %d remain",
963 : num_dbs, max_repslots - cur_repslots);
964 2 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
965 : "max_replication_slots", cur_repslots + num_dbs);
966 2 : failed = true;
967 : }
968 :
969 10 : if (max_walsenders - cur_walsenders < num_dbs)
970 : {
971 2 : pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
972 : num_dbs, max_walsenders - cur_walsenders);
973 2 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
974 : "max_wal_senders", cur_walsenders + num_dbs);
975 2 : failed = true;
976 : }
977 :
978 10 : if (max_prepared_transactions != 0 && !dbinfos.two_phase)
979 : {
980 0 : pg_log_warning("two_phase option will not be enabled for replication slots");
981 0 : pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "
982 : "Prepared transactions will be replicated at COMMIT PREPARED.");
983 0 : pg_log_warning_hint("You can use the command-line option --enable-two-phase to enable two_phase.");
984 : }
985 :
986 : /*
987 : * In dry-run mode, validate 'max_slot_wal_keep_size'. If this parameter
988 : * is set to a non-default value, it may cause replication failures due to
989 : * required WAL files being prematurely removed.
990 : */
991 10 : if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
992 : {
993 0 : pg_log_warning("required WAL could be removed from the publisher");
994 0 : pg_log_warning_hint("Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
995 : "max_slot_wal_keep_size");
996 : }
997 :
998 10 : pg_free(wal_level);
999 :
1000 10 : if (failed)
1001 2 : exit(1);
1002 8 : }
1003 :
1004 : /*
1005 : * Is the standby server ready for logical replication?
1006 : *
1007 : * XXX Does it not allow a time-delayed replica?
1008 : *
1009 : * XXX In a cascaded replication scenario (P -> S -> C), if the target server
1010 : * is S, it cannot detect there is a replica (server C) because server S starts
1011 : * accepting only local connections and server C cannot connect to it. Hence,
1012 : * there is not a reliable way to provide a suitable error saying the server C
1013 : * will be broken at the end of this process (due to pg_resetwal).
1014 : */
1015 : static void
1016 16 : check_subscriber(const struct LogicalRepInfo *dbinfo)
1017 : {
1018 : PGconn *conn;
1019 : PGresult *res;
1020 16 : bool failed = false;
1021 :
1022 : int max_lrworkers;
1023 : int max_reporigins;
1024 : int max_wprocs;
1025 :
1026 16 : pg_log_info("checking settings on subscriber");
1027 :
1028 16 : conn = connect_database(dbinfo[0].subconninfo, true);
1029 :
1030 : /* The target server must be a standby */
1031 16 : if (!server_is_in_recovery(conn))
1032 : {
1033 2 : pg_log_error("target server must be a standby");
1034 2 : disconnect_database(conn, true);
1035 : }
1036 :
1037 : /*------------------------------------------------------------------------
1038 : * Logical replication requires a few parameters to be set on subscriber.
1039 : * Since these parameters are not a requirement for physical replication,
1040 : * we should check it to make sure it won't fail.
1041 : *
1042 : * - max_active_replication_origins >= number of dbs to be converted
1043 : * - max_logical_replication_workers >= number of dbs to be converted
1044 : * - max_worker_processes >= 1 + number of dbs to be converted
1045 : *------------------------------------------------------------------------
1046 : */
1047 14 : res = PQexec(conn,
1048 : "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
1049 : "'max_logical_replication_workers', "
1050 : "'max_active_replication_origins', "
1051 : "'max_worker_processes', "
1052 : "'primary_slot_name') "
1053 : "ORDER BY name");
1054 :
1055 14 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1056 : {
1057 0 : pg_log_error("could not obtain subscriber settings: %s",
1058 : PQresultErrorMessage(res));
1059 0 : disconnect_database(conn, true);
1060 : }
1061 :
1062 14 : max_reporigins = atoi(PQgetvalue(res, 0, 0));
1063 14 : max_lrworkers = atoi(PQgetvalue(res, 1, 0));
1064 14 : max_wprocs = atoi(PQgetvalue(res, 2, 0));
1065 14 : if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
1066 12 : primary_slot_name = pg_strdup(PQgetvalue(res, 3, 0));
1067 :
1068 14 : pg_log_debug("subscriber: max_logical_replication_workers: %d",
1069 : max_lrworkers);
1070 14 : pg_log_debug("subscriber: max_active_replication_origins: %d", max_reporigins);
1071 14 : pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
1072 14 : if (primary_slot_name)
1073 12 : pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
1074 :
1075 14 : PQclear(res);
1076 :
1077 14 : disconnect_database(conn, false);
1078 :
1079 14 : if (max_reporigins < num_dbs)
1080 : {
1081 2 : pg_log_error("subscriber requires %d active replication origins, but only %d remain",
1082 : num_dbs, max_reporigins);
1083 2 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1084 : "max_active_replication_origins", num_dbs);
1085 2 : failed = true;
1086 : }
1087 :
1088 14 : if (max_lrworkers < num_dbs)
1089 : {
1090 2 : pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
1091 : num_dbs, max_lrworkers);
1092 2 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1093 : "max_logical_replication_workers", num_dbs);
1094 2 : failed = true;
1095 : }
1096 :
1097 14 : if (max_wprocs < num_dbs + 1)
1098 : {
1099 2 : pg_log_error("subscriber requires %d worker processes, but only %d remain",
1100 : num_dbs + 1, max_wprocs);
1101 2 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1102 : "max_worker_processes", num_dbs + 1);
1103 2 : failed = true;
1104 : }
1105 :
1106 14 : if (failed)
1107 2 : exit(1);
1108 12 : }
1109 :
1110 : /*
1111 : * Drop a specified subscription. This is to avoid duplicate subscriptions on
1112 : * the primary (publisher node) and the newly created subscriber. We
1113 : * shouldn't drop the associated slot as that would be used by the publisher
1114 : * node.
1115 : */
1116 : static void
1117 8 : drop_existing_subscription(PGconn *conn, const char *subname, const char *dbname)
1118 : {
1119 8 : PQExpBuffer query = createPQExpBuffer();
1120 : PGresult *res;
1121 :
1122 : Assert(conn != NULL);
1123 :
1124 : /*
1125 : * Construct a query string. These commands are allowed to be executed
1126 : * within a transaction.
1127 : */
1128 8 : appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
1129 : subname);
1130 8 : appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
1131 : subname);
1132 8 : appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
1133 :
1134 8 : if (dry_run)
1135 6 : pg_log_info("dry-run: would drop subscription \"%s\" in database \"%s\"",
1136 : subname, dbname);
1137 : else
1138 : {
1139 2 : pg_log_info("dropping subscription \"%s\" in database \"%s\"",
1140 : subname, dbname);
1141 :
1142 2 : res = PQexec(conn, query->data);
1143 :
1144 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1145 : {
1146 0 : pg_log_error("could not drop subscription \"%s\": %s",
1147 : subname, PQresultErrorMessage(res));
1148 0 : disconnect_database(conn, true);
1149 : }
1150 :
1151 2 : PQclear(res);
1152 : }
1153 :
1154 8 : destroyPQExpBuffer(query);
1155 8 : }
1156 :
1157 : /*
1158 : * Retrieve and drop the pre-existing subscriptions.
1159 : */
1160 : static void
1161 16 : check_and_drop_existing_subscriptions(PGconn *conn,
1162 : const struct LogicalRepInfo *dbinfo)
1163 : {
1164 16 : PQExpBuffer query = createPQExpBuffer();
1165 : char *dbname;
1166 : PGresult *res;
1167 :
1168 : Assert(conn != NULL);
1169 :
1170 16 : dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1171 :
1172 16 : appendPQExpBuffer(query,
1173 : "SELECT s.subname FROM pg_catalog.pg_subscription s "
1174 : "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1175 : "WHERE d.datname = %s",
1176 : dbname);
1177 16 : res = PQexec(conn, query->data);
1178 :
1179 16 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1180 : {
1181 0 : pg_log_error("could not obtain pre-existing subscriptions: %s",
1182 : PQresultErrorMessage(res));
1183 0 : disconnect_database(conn, true);
1184 : }
1185 :
1186 24 : for (int i = 0; i < PQntuples(res); i++)
1187 8 : drop_existing_subscription(conn, PQgetvalue(res, i, 0),
1188 8 : dbinfo->dbname);
1189 :
1190 16 : PQclear(res);
1191 16 : destroyPQExpBuffer(query);
1192 16 : PQfreemem(dbname);
1193 16 : }
1194 :
1195 : /*
1196 : * Create the subscriptions, adjust the initial location for logical
1197 : * replication and enable the subscriptions. That's the last step for logical
1198 : * replication setup.
1199 : */
1200 : static void
1201 8 : setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
1202 : {
1203 24 : for (int i = 0; i < num_dbs; i++)
1204 : {
1205 : PGconn *conn;
1206 :
1207 : /* Connect to subscriber. */
1208 16 : conn = connect_database(dbinfo[i].subconninfo, true);
1209 :
1210 : /*
1211 : * We don't need the pre-existing subscriptions on the newly formed
1212 : * subscriber. They can connect to other publisher nodes and either
1213 : * get some unwarranted data or can lead to ERRORs in connecting to
1214 : * such nodes.
1215 : */
1216 16 : check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
1217 :
1218 : /* Check and drop the required publications in the given database. */
1219 16 : check_and_drop_publications(conn, &dbinfo[i]);
1220 :
1221 16 : create_subscription(conn, &dbinfo[i]);
1222 :
1223 : /* Set the replication progress to the correct LSN */
1224 16 : set_replication_progress(conn, &dbinfo[i], consistent_lsn);
1225 :
1226 : /* Enable subscription */
1227 16 : enable_subscription(conn, &dbinfo[i]);
1228 :
1229 16 : disconnect_database(conn, false);
1230 : }
1231 8 : }
1232 :
1233 : /*
1234 : * Write the required recovery parameters.
1235 : */
1236 : static void
1237 8 : setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
1238 : {
1239 : PGconn *conn;
1240 : PQExpBuffer recoveryconfcontents;
1241 :
1242 : /*
1243 : * Despite of the recovery parameters will be written to the subscriber,
1244 : * use a publisher connection. The primary_conninfo is generated using the
1245 : * connection settings.
1246 : */
1247 8 : conn = connect_database(dbinfo[0].pubconninfo, true);
1248 :
1249 : /*
1250 : * Write recovery parameters.
1251 : *
1252 : * The subscriber is not running yet. In dry run mode, the recovery
1253 : * parameters *won't* be written. An invalid LSN is used for printing
1254 : * purposes. Additional recovery parameters are added here. It avoids
1255 : * unexpected behavior such as end of recovery as soon as a consistent
1256 : * state is reached (recovery_target) and failure due to multiple recovery
1257 : * targets (name, time, xid, LSN).
1258 : */
1259 8 : recoveryconfcontents = GenerateRecoveryConfig(conn, NULL, NULL);
1260 8 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target = ''\n");
1261 8 : appendPQExpBufferStr(recoveryconfcontents,
1262 : "recovery_target_timeline = 'latest'\n");
1263 :
1264 : /*
1265 : * Set recovery_target_inclusive = false to avoid reapplying the
1266 : * transaction committed at 'lsn' after subscription is enabled. This is
1267 : * because the provided 'lsn' is also used as the replication start point
1268 : * for the subscription. So, the server can send the transaction committed
1269 : * at that 'lsn' after replication is started which can lead to applying
1270 : * the same transaction twice if we keep recovery_target_inclusive = true.
1271 : */
1272 8 : appendPQExpBufferStr(recoveryconfcontents,
1273 : "recovery_target_inclusive = false\n");
1274 8 : appendPQExpBufferStr(recoveryconfcontents,
1275 : "recovery_target_action = promote\n");
1276 8 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target_name = ''\n");
1277 8 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target_time = ''\n");
1278 8 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target_xid = ''\n");
1279 :
1280 8 : if (dry_run)
1281 : {
1282 6 : appendPQExpBufferStr(recoveryconfcontents, "# dry run mode\n");
1283 6 : appendPQExpBuffer(recoveryconfcontents,
1284 : "recovery_target_lsn = '%X/%08X'\n",
1285 6 : LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
1286 : }
1287 : else
1288 : {
1289 2 : appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
1290 : lsn);
1291 2 : WriteRecoveryConfig(conn, datadir, recoveryconfcontents);
1292 : }
1293 8 : disconnect_database(conn, false);
1294 :
1295 8 : pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
1296 8 : }
1297 :
1298 : /*
1299 : * Drop physical replication slot on primary if the standby was using it. After
1300 : * the transformation, it has no use.
1301 : *
1302 : * XXX we might not fail here. Instead, we provide a warning so the user
1303 : * eventually drops this replication slot later.
1304 : */
1305 : static void
1306 8 : drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
1307 : {
1308 : PGconn *conn;
1309 :
1310 : /* Replication slot does not exist, do nothing */
1311 8 : if (!primary_slot_name)
1312 0 : return;
1313 :
1314 8 : conn = connect_database(dbinfo[0].pubconninfo, false);
1315 8 : if (conn != NULL)
1316 : {
1317 8 : drop_replication_slot(conn, &dbinfo[0], slotname);
1318 8 : disconnect_database(conn, false);
1319 : }
1320 : else
1321 : {
1322 0 : pg_log_warning("could not drop replication slot \"%s\" on primary",
1323 : slotname);
1324 0 : pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
1325 : }
1326 : }
1327 :
1328 : /*
1329 : * Drop failover replication slots on subscriber. After the transformation,
1330 : * they have no use.
1331 : *
1332 : * XXX We do not fail here. Instead, we provide a warning so the user can drop
1333 : * them later.
1334 : */
1335 : static void
1336 8 : drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
1337 : {
1338 : PGconn *conn;
1339 : PGresult *res;
1340 :
1341 8 : conn = connect_database(dbinfo[0].subconninfo, false);
1342 8 : if (conn != NULL)
1343 : {
1344 : /* Get failover replication slot names */
1345 8 : res = PQexec(conn,
1346 : "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
1347 :
1348 8 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
1349 : {
1350 : /* Remove failover replication slots from subscriber */
1351 16 : for (int i = 0; i < PQntuples(res); i++)
1352 8 : drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
1353 : }
1354 : else
1355 : {
1356 0 : pg_log_warning("could not obtain failover replication slot information: %s",
1357 : PQresultErrorMessage(res));
1358 0 : pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1359 : }
1360 :
1361 8 : PQclear(res);
1362 8 : disconnect_database(conn, false);
1363 : }
1364 : else
1365 : {
1366 0 : pg_log_warning("could not drop failover replication slot");
1367 0 : pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1368 : }
1369 8 : }
1370 :
1371 : /*
1372 : * Create a logical replication slot and returns a LSN.
1373 : *
1374 : * CreateReplicationSlot() is not used because it does not provide the one-row
1375 : * result set that contains the LSN.
1376 : */
1377 : static char *
1378 16 : create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
1379 : {
1380 16 : PQExpBuffer str = createPQExpBuffer();
1381 16 : PGresult *res = NULL;
1382 16 : const char *slot_name = dbinfo->replslotname;
1383 : char *slot_name_esc;
1384 16 : char *lsn = NULL;
1385 :
1386 : Assert(conn != NULL);
1387 :
1388 16 : if (dry_run)
1389 12 : pg_log_info("dry-run: would create the replication slot \"%s\" in database \"%s\" on publisher",
1390 : slot_name, dbinfo->dbname);
1391 : else
1392 4 : pg_log_info("creating the replication slot \"%s\" in database \"%s\" on publisher",
1393 : slot_name, dbinfo->dbname);
1394 :
1395 16 : slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1396 :
1397 16 : appendPQExpBuffer(str,
1398 : "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
1399 : slot_name_esc,
1400 16 : dbinfos.two_phase ? "true" : "false");
1401 :
1402 16 : PQfreemem(slot_name_esc);
1403 :
1404 16 : pg_log_debug("command is: %s", str->data);
1405 :
1406 16 : if (!dry_run)
1407 : {
1408 4 : res = PQexec(conn, str->data);
1409 4 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1410 : {
1411 0 : pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
1412 : slot_name, dbinfo->dbname,
1413 : PQresultErrorMessage(res));
1414 0 : PQclear(res);
1415 0 : destroyPQExpBuffer(str);
1416 0 : return NULL;
1417 : }
1418 :
1419 4 : lsn = pg_strdup(PQgetvalue(res, 0, 0));
1420 4 : PQclear(res);
1421 : }
1422 :
1423 : /* For cleanup purposes */
1424 16 : dbinfo->made_replslot = true;
1425 :
1426 16 : destroyPQExpBuffer(str);
1427 :
1428 16 : return lsn;
1429 : }
1430 :
1431 : static void
1432 16 : drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
1433 : const char *slot_name)
1434 : {
1435 16 : PQExpBuffer str = createPQExpBuffer();
1436 : char *slot_name_esc;
1437 : PGresult *res;
1438 :
1439 : Assert(conn != NULL);
1440 :
1441 16 : if (dry_run)
1442 12 : pg_log_info("dry-run: would drop the replication slot \"%s\" in database \"%s\"",
1443 : slot_name, dbinfo->dbname);
1444 : else
1445 4 : pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
1446 : slot_name, dbinfo->dbname);
1447 :
1448 16 : slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1449 :
1450 16 : appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
1451 :
1452 16 : PQfreemem(slot_name_esc);
1453 :
1454 16 : pg_log_debug("command is: %s", str->data);
1455 :
1456 16 : if (!dry_run)
1457 : {
1458 4 : res = PQexec(conn, str->data);
1459 4 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1460 : {
1461 0 : pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
1462 : slot_name, dbinfo->dbname, PQresultErrorMessage(res));
1463 0 : dbinfo->made_replslot = false; /* don't try again. */
1464 : }
1465 :
1466 4 : PQclear(res);
1467 : }
1468 :
1469 16 : destroyPQExpBuffer(str);
1470 16 : }
1471 :
/*
 * Check the status "rc" returned by system() for the command "pg_ctl_cmd".
 *
 * Returns silently when rc is zero.  Otherwise it reports how pg_ctl ended
 * (exit code, signal/exception, or an unrecognized status), shows the failed
 * command, and exits with status 1.
 */
static void
pg_ctl_status(const char *pg_ctl_cmd, int rc)
{
	/* Success: nothing to report. */
	if (rc == 0)
		return;

	if (WIFEXITED(rc))
		pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
	else if (WIFSIGNALED(rc))
	{
#if defined(WIN32)
		pg_log_error("pg_ctl was terminated by exception 0x%X",
					 WTERMSIG(rc));
		pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
#else
		pg_log_error("pg_ctl was terminated by signal %d: %s",
					 WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
#endif
	}
	else
		pg_log_error("pg_ctl exited with unrecognized status %d", rc);

	pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
	exit(1);
}
1504 :
1505 : static void
1506 24 : start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
1507 : bool restrict_logical_worker)
1508 : {
1509 24 : PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
1510 : int rc;
1511 :
1512 24 : appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
1513 24 : appendShellString(pg_ctl_cmd, subscriber_dir);
1514 24 : appendPQExpBufferStr(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
1515 :
1516 : /* Prevent unintended slot invalidation */
1517 24 : appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
1518 :
1519 24 : if (restricted_access)
1520 : {
1521 24 : appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
1522 : #if !defined(WIN32)
1523 :
1524 : /*
1525 : * An empty listen_addresses list means the server does not listen on
1526 : * any IP interfaces; only Unix-domain sockets can be used to connect
1527 : * to the server. Prevent external connections to minimize the chance
1528 : * of failure.
1529 : */
1530 24 : appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
1531 24 : if (opt->socket_dir)
1532 24 : appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
1533 24 : opt->socket_dir);
1534 24 : appendPQExpBufferChar(pg_ctl_cmd, '"');
1535 : #endif
1536 : }
1537 24 : if (opt->config_file != NULL)
1538 0 : appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
1539 0 : opt->config_file);
1540 :
1541 : /* Suppress to start logical replication if requested */
1542 24 : if (restrict_logical_worker)
1543 8 : appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
1544 :
1545 24 : pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
1546 24 : rc = system(pg_ctl_cmd->data);
1547 24 : pg_ctl_status(pg_ctl_cmd->data, rc);
1548 24 : standby_running = true;
1549 24 : destroyPQExpBuffer(pg_ctl_cmd);
1550 24 : pg_log_info("server was started");
1551 24 : }
1552 :
1553 : static void
1554 24 : stop_standby_server(const char *datadir)
1555 : {
1556 : char *pg_ctl_cmd;
1557 : int rc;
1558 :
1559 24 : pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
1560 : datadir);
1561 24 : pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
1562 24 : rc = system(pg_ctl_cmd);
1563 24 : pg_ctl_status(pg_ctl_cmd, rc);
1564 24 : standby_running = false;
1565 24 : pg_log_info("server was stopped");
1566 24 : }
1567 :
1568 : /*
1569 : * Returns after the server finishes the recovery process.
1570 : *
1571 : * If recovery_timeout option is set, terminate abnormally without finishing
1572 : * the recovery process. By default, it waits forever.
1573 : *
1574 : * XXX Is the recovery process still in progress? When recovery process has a
1575 : * better progress reporting mechanism, it should be added here.
1576 : */
1577 : static void
1578 8 : wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
1579 : {
1580 : PGconn *conn;
1581 8 : bool ready = false;
1582 8 : int timer = 0;
1583 :
1584 8 : pg_log_info("waiting for the target server to reach the consistent state");
1585 :
1586 8 : conn = connect_database(conninfo, true);
1587 :
1588 : for (;;)
1589 : {
1590 : /* Did the recovery process finish? We're done if so. */
1591 10 : if (dry_run || !server_is_in_recovery(conn))
1592 : {
1593 8 : ready = true;
1594 8 : recovery_ended = true;
1595 8 : break;
1596 : }
1597 :
1598 : /* Bail out after recovery_timeout seconds if this option is set */
1599 2 : if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
1600 : {
1601 0 : stop_standby_server(subscriber_dir);
1602 0 : pg_log_error("recovery timed out");
1603 0 : disconnect_database(conn, true);
1604 : }
1605 :
1606 : /* Keep waiting */
1607 2 : pg_usleep(WAIT_INTERVAL * USECS_PER_SEC);
1608 2 : timer += WAIT_INTERVAL;
1609 : }
1610 :
1611 8 : disconnect_database(conn, false);
1612 :
1613 8 : if (!ready)
1614 0 : pg_fatal("server did not end recovery");
1615 :
1616 8 : pg_log_info("target server reached the consistent state");
1617 8 : pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
1618 8 : }
1619 :
1620 : /*
1621 : * Create a publication that includes all tables in the database.
1622 : */
1623 : static void
1624 16 : create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
1625 : {
1626 16 : PQExpBuffer str = createPQExpBuffer();
1627 : PGresult *res;
1628 : char *ipubname_esc;
1629 : char *spubname_esc;
1630 :
1631 : Assert(conn != NULL);
1632 :
1633 16 : ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1634 16 : spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1635 :
1636 : /* Check if the publication already exists */
1637 16 : appendPQExpBuffer(str,
1638 : "SELECT 1 FROM pg_catalog.pg_publication "
1639 : "WHERE pubname = %s",
1640 : spubname_esc);
1641 16 : res = PQexec(conn, str->data);
1642 16 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1643 : {
1644 0 : pg_log_error("could not obtain publication information: %s",
1645 : PQresultErrorMessage(res));
1646 0 : disconnect_database(conn, true);
1647 : }
1648 :
1649 16 : if (PQntuples(res) == 1)
1650 : {
1651 : /*
1652 : * Unfortunately, if it reaches this code path, it will always fail
1653 : * (unless you decide to change the existing publication name). That's
1654 : * bad but it is very unlikely that the user will choose a name with
1655 : * pg_createsubscriber_ prefix followed by the exact database oid and
1656 : * a random number.
1657 : */
1658 0 : pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
1659 0 : pg_log_error_hint("Consider renaming this publication before continuing.");
1660 0 : disconnect_database(conn, true);
1661 : }
1662 :
1663 16 : PQclear(res);
1664 16 : resetPQExpBuffer(str);
1665 :
1666 16 : if (dry_run)
1667 12 : pg_log_info("dry-run: would create publication \"%s\" in database \"%s\"",
1668 : dbinfo->pubname, dbinfo->dbname);
1669 : else
1670 4 : pg_log_info("creating publication \"%s\" in database \"%s\"",
1671 : dbinfo->pubname, dbinfo->dbname);
1672 :
1673 16 : appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
1674 : ipubname_esc);
1675 :
1676 16 : pg_log_debug("command is: %s", str->data);
1677 :
1678 16 : if (!dry_run)
1679 : {
1680 4 : res = PQexec(conn, str->data);
1681 4 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1682 : {
1683 0 : pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
1684 : dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
1685 0 : disconnect_database(conn, true);
1686 : }
1687 4 : PQclear(res);
1688 : }
1689 :
1690 : /* For cleanup purposes */
1691 16 : dbinfo->made_publication = true;
1692 :
1693 16 : PQfreemem(ipubname_esc);
1694 16 : PQfreemem(spubname_esc);
1695 16 : destroyPQExpBuffer(str);
1696 16 : }
1697 :
1698 : /*
1699 : * Drop the specified publication in the given database.
1700 : */
1701 : static void
1702 20 : drop_publication(PGconn *conn, const char *pubname, const char *dbname,
1703 : bool *made_publication)
1704 : {
1705 20 : PQExpBuffer str = createPQExpBuffer();
1706 : PGresult *res;
1707 : char *pubname_esc;
1708 :
1709 : Assert(conn != NULL);
1710 :
1711 20 : pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
1712 :
1713 20 : if (dry_run)
1714 12 : pg_log_info("dry-run: would drop publication \"%s\" in database \"%s\"",
1715 : pubname, dbname);
1716 : else
1717 8 : pg_log_info("dropping publication \"%s\" in database \"%s\"",
1718 : pubname, dbname);
1719 :
1720 20 : appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
1721 :
1722 20 : PQfreemem(pubname_esc);
1723 :
1724 20 : pg_log_debug("command is: %s", str->data);
1725 :
1726 20 : if (!dry_run)
1727 : {
1728 8 : res = PQexec(conn, str->data);
1729 8 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1730 : {
1731 0 : pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
1732 : pubname, dbname, PQresultErrorMessage(res));
1733 0 : *made_publication = false; /* don't try again. */
1734 :
1735 : /*
1736 : * Don't disconnect and exit here. This routine is used by primary
1737 : * (cleanup publication / replication slot due to an error) and
1738 : * subscriber (remove the replicated publications). In both cases,
1739 : * it can continue and provide instructions for the user to remove
1740 : * it later if cleanup fails.
1741 : */
1742 : }
1743 8 : PQclear(res);
1744 : }
1745 :
1746 20 : destroyPQExpBuffer(str);
1747 20 : }
1748 :
1749 : /*
1750 : * Retrieve and drop the publications.
1751 : *
1752 : * Since the publications were created before the consistent LSN, they
1753 : * remain on the subscriber even after the physical replica is
1754 : * promoted. Remove these publications from the subscriber because
1755 : * they have no use. Additionally, if requested, drop all pre-existing
1756 : * publications.
1757 : */
1758 : static void
1759 16 : check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
1760 : {
1761 : PGresult *res;
1762 16 : bool drop_all_pubs = dbinfos.objecttypes_to_clean & OBJECTTYPE_PUBLICATIONS;
1763 :
1764 : Assert(conn != NULL);
1765 :
1766 16 : if (drop_all_pubs)
1767 : {
1768 4 : pg_log_info("dropping all existing publications in database \"%s\"",
1769 : dbinfo->dbname);
1770 :
1771 : /* Fetch all publication names */
1772 4 : res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
1773 4 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1774 : {
1775 0 : pg_log_error("could not obtain publication information: %s",
1776 : PQresultErrorMessage(res));
1777 0 : PQclear(res);
1778 0 : disconnect_database(conn, true);
1779 : }
1780 :
1781 : /* Drop each publication */
1782 12 : for (int i = 0; i < PQntuples(res); i++)
1783 8 : drop_publication(conn, PQgetvalue(res, i, 0), dbinfo->dbname,
1784 : &dbinfo->made_publication);
1785 :
1786 4 : PQclear(res);
1787 : }
1788 :
1789 : /*
1790 : * In dry-run mode, we don't create publications, but we still try to drop
1791 : * those to provide necessary information to the user.
1792 : */
1793 16 : if (!drop_all_pubs || dry_run)
1794 12 : drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
1795 : &dbinfo->made_publication);
1796 16 : }
1797 :
1798 : /*
1799 : * Create a subscription with some predefined options.
1800 : *
1801 : * A replication slot was already created in a previous step. Let's use it. It
1802 : * is not required to copy data. The subscription will be created but it will
1803 : * not be enabled now. That's because the replication progress must be set and
1804 : * the replication origin name (one of the function arguments) contains the
1805 : * subscription OID in its name. Once the subscription is created,
1806 : * set_replication_progress() can obtain the chosen origin name and set up its
1807 : * initial location.
1808 : */
1809 : static void
1810 16 : create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
1811 : {
1812 16 : PQExpBuffer str = createPQExpBuffer();
1813 : PGresult *res;
1814 : char *pubname_esc;
1815 : char *subname_esc;
1816 : char *pubconninfo_esc;
1817 : char *replslotname_esc;
1818 :
1819 : Assert(conn != NULL);
1820 :
1821 16 : pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1822 16 : subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1823 16 : pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
1824 16 : replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
1825 :
1826 16 : if (dry_run)
1827 12 : pg_log_info("dry-run: would create subscription \"%s\" in database \"%s\"",
1828 : dbinfo->subname, dbinfo->dbname);
1829 : else
1830 4 : pg_log_info("creating subscription \"%s\" in database \"%s\"",
1831 : dbinfo->subname, dbinfo->dbname);
1832 :
1833 16 : appendPQExpBuffer(str,
1834 : "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
1835 : "WITH (create_slot = false, enabled = false, "
1836 : "slot_name = %s, copy_data = false, two_phase = %s)",
1837 : subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
1838 16 : dbinfos.two_phase ? "true" : "false");
1839 :
1840 16 : PQfreemem(pubname_esc);
1841 16 : PQfreemem(subname_esc);
1842 16 : PQfreemem(pubconninfo_esc);
1843 16 : PQfreemem(replslotname_esc);
1844 :
1845 16 : pg_log_debug("command is: %s", str->data);
1846 :
1847 16 : if (!dry_run)
1848 : {
1849 4 : res = PQexec(conn, str->data);
1850 4 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1851 : {
1852 0 : pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
1853 : dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
1854 0 : disconnect_database(conn, true);
1855 : }
1856 4 : PQclear(res);
1857 : }
1858 :
1859 16 : destroyPQExpBuffer(str);
1860 16 : }
1861 :
1862 : /*
1863 : * Sets the replication progress to the consistent LSN.
1864 : *
1865 : * The subscriber caught up to the consistent LSN provided by the last
1866 : * replication slot that was created. The goal is to set up the initial
1867 : * location for the logical replication that is the exact LSN that the
1868 : * subscriber was promoted. Once the subscription is enabled it will start
1869 : * streaming from that location onwards. In dry run mode, the subscription OID
1870 : * and LSN are set to invalid values for printing purposes.
1871 : */
1872 : static void
1873 16 : set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
1874 : {
1875 16 : PQExpBuffer str = createPQExpBuffer();
1876 : PGresult *res;
1877 : Oid suboid;
1878 : char *subname;
1879 : char *dbname;
1880 : char *originname;
1881 : char *lsnstr;
1882 :
1883 : Assert(conn != NULL);
1884 :
1885 16 : subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
1886 16 : dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1887 :
1888 16 : appendPQExpBuffer(str,
1889 : "SELECT s.oid FROM pg_catalog.pg_subscription s "
1890 : "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1891 : "WHERE s.subname = %s AND d.datname = %s",
1892 : subname, dbname);
1893 :
1894 16 : res = PQexec(conn, str->data);
1895 16 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1896 : {
1897 0 : pg_log_error("could not obtain subscription OID: %s",
1898 : PQresultErrorMessage(res));
1899 0 : disconnect_database(conn, true);
1900 : }
1901 :
1902 16 : if (PQntuples(res) != 1 && !dry_run)
1903 : {
1904 0 : pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
1905 : PQntuples(res), 1);
1906 0 : disconnect_database(conn, true);
1907 : }
1908 :
1909 16 : if (dry_run)
1910 : {
1911 12 : suboid = InvalidOid;
1912 12 : lsnstr = psprintf("%X/%08X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
1913 : }
1914 : else
1915 : {
1916 4 : suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
1917 4 : lsnstr = psprintf("%s", lsn);
1918 : }
1919 :
1920 16 : PQclear(res);
1921 :
1922 : /*
1923 : * The origin name is defined as pg_%u. %u is the subscription OID. See
1924 : * ApplyWorkerMain().
1925 : */
1926 16 : originname = psprintf("pg_%u", suboid);
1927 :
1928 16 : if (dry_run)
1929 12 : pg_log_info("dry-run: would set the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
1930 : originname, lsnstr, dbinfo->dbname);
1931 : else
1932 4 : pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
1933 : originname, lsnstr, dbinfo->dbname);
1934 :
1935 16 : resetPQExpBuffer(str);
1936 16 : appendPQExpBuffer(str,
1937 : "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
1938 : originname, lsnstr);
1939 :
1940 16 : pg_log_debug("command is: %s", str->data);
1941 :
1942 16 : if (!dry_run)
1943 : {
1944 4 : res = PQexec(conn, str->data);
1945 4 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1946 : {
1947 0 : pg_log_error("could not set replication progress for subscription \"%s\": %s",
1948 : dbinfo->subname, PQresultErrorMessage(res));
1949 0 : disconnect_database(conn, true);
1950 : }
1951 4 : PQclear(res);
1952 : }
1953 :
1954 16 : PQfreemem(subname);
1955 16 : PQfreemem(dbname);
1956 16 : pg_free(originname);
1957 16 : pg_free(lsnstr);
1958 16 : destroyPQExpBuffer(str);
1959 16 : }
1960 :
1961 : /*
1962 : * Enables the subscription.
1963 : *
1964 : * The subscription was created in a previous step but it was disabled. After
1965 : * adjusting the initial logical replication location, enable the subscription.
1966 : */
1967 : static void
1968 16 : enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
1969 : {
1970 16 : PQExpBuffer str = createPQExpBuffer();
1971 : PGresult *res;
1972 : char *subname;
1973 :
1974 : Assert(conn != NULL);
1975 :
1976 16 : subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1977 :
1978 16 : if (dry_run)
1979 12 : pg_log_info("dry-run: would enable subscription \"%s\" in database \"%s\"",
1980 : dbinfo->subname, dbinfo->dbname);
1981 : else
1982 4 : pg_log_info("enabling subscription \"%s\" in database \"%s\"",
1983 : dbinfo->subname, dbinfo->dbname);
1984 :
1985 16 : appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
1986 :
1987 16 : pg_log_debug("command is: %s", str->data);
1988 :
1989 16 : if (!dry_run)
1990 : {
1991 4 : res = PQexec(conn, str->data);
1992 4 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1993 : {
1994 0 : pg_log_error("could not enable subscription \"%s\": %s",
1995 : dbinfo->subname, PQresultErrorMessage(res));
1996 0 : disconnect_database(conn, true);
1997 : }
1998 :
1999 4 : PQclear(res);
2000 : }
2001 :
2002 16 : PQfreemem(subname);
2003 16 : destroyPQExpBuffer(str);
2004 16 : }
2005 :
2006 : /*
2007 : * Fetch a list of all connectable non-template databases from the source server
2008 : * and form a list such that they appear as if the user has specified multiple
2009 : * --database options, one for each source database.
2010 : */
2011 : static void
2012 2 : get_publisher_databases(struct CreateSubscriberOptions *opt,
2013 : bool dbnamespecified)
2014 : {
2015 : PGconn *conn;
2016 : PGresult *res;
2017 :
2018 : /* If a database name was specified, just connect to it. */
2019 2 : if (dbnamespecified)
2020 0 : conn = connect_database(opt->pub_conninfo_str, true);
2021 : else
2022 : {
2023 : /* Otherwise, try postgres first and then template1. */
2024 : char *conninfo;
2025 :
2026 2 : conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "postgres");
2027 2 : conn = connect_database(conninfo, false);
2028 2 : pg_free(conninfo);
2029 2 : if (!conn)
2030 : {
2031 0 : conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "template1");
2032 0 : conn = connect_database(conninfo, true);
2033 0 : pg_free(conninfo);
2034 : }
2035 : }
2036 :
2037 2 : res = PQexec(conn, "SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1");
2038 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
2039 : {
2040 0 : pg_log_error("could not obtain a list of databases: %s", PQresultErrorMessage(res));
2041 0 : PQclear(res);
2042 0 : disconnect_database(conn, true);
2043 : }
2044 :
2045 8 : for (int i = 0; i < PQntuples(res); i++)
2046 : {
2047 6 : const char *dbname = PQgetvalue(res, i, 0);
2048 :
2049 6 : simple_string_list_append(&opt->database_names, dbname);
2050 :
2051 : /* Increment num_dbs to reflect multiple --database options */
2052 6 : num_dbs++;
2053 : }
2054 :
2055 2 : PQclear(res);
2056 2 : disconnect_database(conn, false);
2057 2 : }
2058 :
2059 : int
2060 46 : main(int argc, char **argv)
2061 : {
2062 : static struct option long_options[] =
2063 : {
2064 : {"all", no_argument, NULL, 'a'},
2065 : {"database", required_argument, NULL, 'd'},
2066 : {"pgdata", required_argument, NULL, 'D'},
2067 : {"dry-run", no_argument, NULL, 'n'},
2068 : {"subscriber-port", required_argument, NULL, 'p'},
2069 : {"publisher-server", required_argument, NULL, 'P'},
2070 : {"socketdir", required_argument, NULL, 's'},
2071 : {"recovery-timeout", required_argument, NULL, 't'},
2072 : {"enable-two-phase", no_argument, NULL, 'T'},
2073 : {"subscriber-username", required_argument, NULL, 'U'},
2074 : {"verbose", no_argument, NULL, 'v'},
2075 : {"version", no_argument, NULL, 'V'},
2076 : {"help", no_argument, NULL, '?'},
2077 : {"config-file", required_argument, NULL, 1},
2078 : {"publication", required_argument, NULL, 2},
2079 : {"replication-slot", required_argument, NULL, 3},
2080 : {"subscription", required_argument, NULL, 4},
2081 : {"clean", required_argument, NULL, 5},
2082 : {NULL, 0, NULL, 0}
2083 : };
2084 :
2085 46 : struct CreateSubscriberOptions opt = {0};
2086 :
2087 : int c;
2088 : int option_index;
2089 :
2090 : char *pub_base_conninfo;
2091 : char *sub_base_conninfo;
2092 46 : char *dbname_conninfo = NULL;
2093 :
2094 : uint64 pub_sysid;
2095 : uint64 sub_sysid;
2096 : struct stat statbuf;
2097 :
2098 : char *consistent_lsn;
2099 :
2100 : char pidfile[MAXPGPATH];
2101 :
2102 46 : pg_logging_init(argv[0]);
2103 46 : pg_logging_set_level(PG_LOG_WARNING);
2104 46 : progname = get_progname(argv[0]);
2105 46 : set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
2106 :
2107 46 : if (argc > 1)
2108 : {
2109 44 : if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2110 : {
2111 2 : usage();
2112 2 : exit(0);
2113 : }
2114 42 : else if (strcmp(argv[1], "-V") == 0
2115 42 : || strcmp(argv[1], "--version") == 0)
2116 : {
2117 2 : puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
2118 2 : exit(0);
2119 : }
2120 : }
2121 :
2122 : /* Default settings */
2123 42 : subscriber_dir = NULL;
2124 42 : opt.config_file = NULL;
2125 42 : opt.pub_conninfo_str = NULL;
2126 42 : opt.socket_dir = NULL;
2127 42 : opt.sub_port = DEFAULT_SUB_PORT;
2128 42 : opt.sub_username = NULL;
2129 42 : opt.two_phase = false;
2130 42 : opt.database_names = (SimpleStringList)
2131 : {
2132 : 0
2133 : };
2134 42 : opt.recovery_timeout = 0;
2135 42 : opt.all_dbs = false;
2136 :
2137 : /*
2138 : * Don't allow it to be run as root. It uses pg_ctl which does not allow
2139 : * it either.
2140 : */
2141 : #ifndef WIN32
2142 42 : if (geteuid() == 0)
2143 : {
2144 0 : pg_log_error("cannot be executed by \"root\"");
2145 0 : pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
2146 : progname);
2147 0 : exit(1);
2148 : }
2149 : #endif
2150 :
2151 42 : get_restricted_token();
2152 :
2153 324 : while ((c = getopt_long(argc, argv, "ad:D:np:P:s:t:TU:v",
2154 324 : long_options, &option_index)) != -1)
2155 : {
2156 288 : switch (c)
2157 : {
2158 6 : case 'a':
2159 6 : opt.all_dbs = true;
2160 6 : break;
2161 50 : case 'd':
2162 50 : if (!simple_string_list_member(&opt.database_names, optarg))
2163 : {
2164 48 : simple_string_list_append(&opt.database_names, optarg);
2165 48 : num_dbs++;
2166 : }
2167 : else
2168 2 : pg_fatal("database \"%s\" specified more than once for -d/--database", optarg);
2169 48 : break;
2170 38 : case 'D':
2171 38 : subscriber_dir = pg_strdup(optarg);
2172 38 : canonicalize_path(subscriber_dir);
2173 38 : break;
2174 18 : case 'n':
2175 18 : dry_run = true;
2176 18 : break;
2177 24 : case 'p':
2178 24 : opt.sub_port = pg_strdup(optarg);
2179 24 : break;
2180 36 : case 'P':
2181 36 : opt.pub_conninfo_str = pg_strdup(optarg);
2182 36 : break;
2183 24 : case 's':
2184 24 : opt.socket_dir = pg_strdup(optarg);
2185 24 : canonicalize_path(opt.socket_dir);
2186 24 : break;
2187 6 : case 't':
2188 6 : opt.recovery_timeout = atoi(optarg);
2189 6 : break;
2190 2 : case 'T':
2191 2 : opt.two_phase = true;
2192 2 : break;
2193 0 : case 'U':
2194 0 : opt.sub_username = pg_strdup(optarg);
2195 0 : break;
2196 38 : case 'v':
2197 38 : pg_logging_increase_verbosity();
2198 38 : break;
2199 0 : case 1:
2200 0 : opt.config_file = pg_strdup(optarg);
2201 0 : break;
2202 24 : case 2:
2203 24 : if (!simple_string_list_member(&opt.pub_names, optarg))
2204 : {
2205 22 : simple_string_list_append(&opt.pub_names, optarg);
2206 22 : num_pubs++;
2207 : }
2208 : else
2209 2 : pg_fatal("publication \"%s\" specified more than once for --publication", optarg);
2210 22 : break;
2211 8 : case 3:
2212 8 : if (!simple_string_list_member(&opt.replslot_names, optarg))
2213 : {
2214 8 : simple_string_list_append(&opt.replslot_names, optarg);
2215 8 : num_replslots++;
2216 : }
2217 : else
2218 0 : pg_fatal("replication slot \"%s\" specified more than once for --replication-slot", optarg);
2219 8 : break;
2220 10 : case 4:
2221 10 : if (!simple_string_list_member(&opt.sub_names, optarg))
2222 : {
2223 10 : simple_string_list_append(&opt.sub_names, optarg);
2224 10 : num_subs++;
2225 : }
2226 : else
2227 0 : pg_fatal("subscription \"%s\" specified more than once for --subscription", optarg);
2228 10 : break;
2229 2 : case 5:
2230 2 : if (!simple_string_list_member(&opt.objecttypes_to_clean, optarg))
2231 2 : simple_string_list_append(&opt.objecttypes_to_clean, optarg);
2232 : else
2233 0 : pg_fatal("object type \"%s\" specified more than once for --clean", optarg);
2234 2 : break;
2235 2 : default:
2236 : /* getopt_long already emitted a complaint */
2237 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2238 2 : exit(1);
2239 : }
2240 : }
2241 :
2242 : /* Validate that --all is not used with incompatible options */
2243 36 : if (opt.all_dbs)
2244 : {
2245 6 : char *bad_switch = NULL;
2246 :
2247 6 : if (num_dbs > 0)
2248 2 : bad_switch = "--database";
2249 4 : else if (num_pubs > 0)
2250 2 : bad_switch = "--publication";
2251 2 : else if (num_replslots > 0)
2252 0 : bad_switch = "--replication-slot";
2253 2 : else if (num_subs > 0)
2254 0 : bad_switch = "--subscription";
2255 :
2256 6 : if (bad_switch)
2257 : {
2258 4 : pg_log_error("options %s and %s cannot be used together",
2259 : bad_switch, "-a/--all");
2260 4 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2261 4 : exit(1);
2262 : }
2263 : }
2264 :
2265 : /* Any non-option arguments? */
2266 32 : if (optind < argc)
2267 : {
2268 0 : pg_log_error("too many command-line arguments (first is \"%s\")",
2269 : argv[optind]);
2270 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2271 0 : exit(1);
2272 : }
2273 :
2274 : /* Required arguments */
2275 32 : if (subscriber_dir == NULL)
2276 : {
2277 2 : pg_log_error("no subscriber data directory specified");
2278 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2279 2 : exit(1);
2280 : }
2281 :
2282 : /* If socket directory is not provided, use the current directory */
2283 30 : if (opt.socket_dir == NULL)
2284 : {
2285 : char cwd[MAXPGPATH];
2286 :
2287 10 : if (!getcwd(cwd, MAXPGPATH))
2288 0 : pg_fatal("could not determine current directory");
2289 10 : opt.socket_dir = pg_strdup(cwd);
2290 10 : canonicalize_path(opt.socket_dir);
2291 : }
2292 :
2293 : /*
2294 : * Parse connection string. Build a base connection string that might be
2295 : * reused by multiple databases.
2296 : */
2297 30 : if (opt.pub_conninfo_str == NULL)
2298 : {
2299 : /*
2300 : * TODO use primary_conninfo (if available) from subscriber and
2301 : * extract publisher connection string. Assume that there are
2302 : * identical entries for physical and logical replication. If there is
2303 : * not, we would fail anyway.
2304 : */
2305 2 : pg_log_error("no publisher connection string specified");
2306 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2307 2 : exit(1);
2308 : }
2309 :
2310 28 : if (dry_run)
2311 16 : pg_log_info("Executing in dry-run mode.\n"
2312 : "The target directory will not be modified.");
2313 :
2314 28 : pg_log_info("validating publisher connection string");
2315 28 : pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
2316 : &dbname_conninfo);
2317 28 : if (pub_base_conninfo == NULL)
2318 0 : exit(1);
2319 :
2320 28 : pg_log_info("validating subscriber connection string");
2321 28 : sub_base_conninfo = get_sub_conninfo(&opt);
2322 :
2323 : /*
2324 : * Fetch all databases from the source (publisher) and treat them as if
2325 : * the user specified has multiple --database options, one for each source
2326 : * database.
2327 : */
2328 28 : if (opt.all_dbs)
2329 : {
2330 2 : bool dbnamespecified = (dbname_conninfo != NULL);
2331 :
2332 2 : get_publisher_databases(&opt, dbnamespecified);
2333 : }
2334 :
2335 28 : if (opt.database_names.head == NULL)
2336 : {
2337 4 : pg_log_info("no database was specified");
2338 :
2339 : /*
2340 : * Try to obtain the dbname from the publisher conninfo. If dbname
2341 : * parameter is not available, error out.
2342 : */
2343 4 : if (dbname_conninfo)
2344 : {
2345 2 : simple_string_list_append(&opt.database_names, dbname_conninfo);
2346 2 : num_dbs++;
2347 :
2348 2 : pg_log_info("database name \"%s\" was extracted from the publisher connection string",
2349 : dbname_conninfo);
2350 : }
2351 : else
2352 : {
2353 2 : pg_log_error("no database name specified");
2354 2 : pg_log_error_hint("Try \"%s --help\" for more information.",
2355 : progname);
2356 2 : exit(1);
2357 : }
2358 : }
2359 :
2360 : /* Number of object names must match number of databases */
2361 26 : if (num_pubs > 0 && num_pubs != num_dbs)
2362 : {
2363 2 : pg_log_error("wrong number of publication names specified");
2364 2 : pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
2365 : num_pubs, num_dbs);
2366 2 : exit(1);
2367 : }
2368 24 : if (num_subs > 0 && num_subs != num_dbs)
2369 : {
2370 2 : pg_log_error("wrong number of subscription names specified");
2371 2 : pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
2372 : num_subs, num_dbs);
2373 2 : exit(1);
2374 : }
2375 22 : if (num_replslots > 0 && num_replslots != num_dbs)
2376 : {
2377 2 : pg_log_error("wrong number of replication slot names specified");
2378 2 : pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
2379 : num_replslots, num_dbs);
2380 2 : exit(1);
2381 : }
2382 :
2383 : /* Verify the object types specified for removal from the subscriber */
2384 22 : for (SimpleStringListCell *cell = opt.objecttypes_to_clean.head; cell; cell = cell->next)
2385 : {
2386 2 : if (pg_strcasecmp(cell->val, "publications") == 0)
2387 2 : dbinfos.objecttypes_to_clean |= OBJECTTYPE_PUBLICATIONS;
2388 : else
2389 : {
2390 0 : pg_log_error("invalid object type \"%s\" specified for %s",
2391 : cell->val, "--clean");
2392 0 : pg_log_error_hint("The valid value is: \"%s\"", "publications");
2393 0 : exit(1);
2394 : }
2395 : }
2396 :
2397 : /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
2398 20 : pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
2399 20 : pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
2400 :
2401 : /* Rudimentary check for a data directory */
2402 20 : check_data_directory(subscriber_dir);
2403 :
2404 20 : dbinfos.two_phase = opt.two_phase;
2405 :
2406 : /*
2407 : * Store database information for publisher and subscriber. It should be
2408 : * called before atexit() because its return is used in the
2409 : * cleanup_objects_atexit().
2410 : */
2411 20 : dbinfos.dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
2412 :
2413 : /* Register a function to clean up objects in case of failure */
2414 20 : atexit(cleanup_objects_atexit);
2415 :
2416 : /*
2417 : * Check if the subscriber data directory has the same system identifier
2418 : * than the publisher data directory.
2419 : */
2420 20 : pub_sysid = get_primary_sysid(dbinfos.dbinfo[0].pubconninfo);
2421 20 : sub_sysid = get_standby_sysid(subscriber_dir);
2422 20 : if (pub_sysid != sub_sysid)
2423 2 : pg_fatal("subscriber data directory is not a copy of the source database cluster");
2424 :
2425 : /* Subscriber PID file */
2426 18 : snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
2427 :
2428 : /*
2429 : * The standby server must not be running. If the server is started under
2430 : * service manager and pg_createsubscriber stops it, the service manager
2431 : * might react to this action and start the server again. Therefore,
2432 : * refuse to proceed if the server is running to avoid possible failures.
2433 : */
2434 18 : if (stat(pidfile, &statbuf) == 0)
2435 : {
2436 2 : pg_log_error("standby server is running");
2437 2 : pg_log_error_hint("Stop the standby server and try again.");
2438 2 : exit(1);
2439 : }
2440 :
2441 : /*
2442 : * Start a short-lived standby server with temporary parameters (provided
2443 : * by command-line options). The goal is to avoid connections during the
2444 : * transformation steps.
2445 : */
2446 16 : pg_log_info("starting the standby server with command-line options");
2447 16 : start_standby_server(&opt, true, false);
2448 :
2449 : /* Check if the standby server is ready for logical replication */
2450 16 : check_subscriber(dbinfos.dbinfo);
2451 :
2452 : /* Check if the primary server is ready for logical replication */
2453 12 : check_publisher(dbinfos.dbinfo);
2454 :
2455 : /*
2456 : * Stop the target server. The recovery process requires that the server
2457 : * reaches a consistent state before targeting the recovery stop point.
2458 : * Make sure a consistent state is reached (stop the target server
2459 : * guarantees it) *before* creating the replication slots in
2460 : * setup_publisher().
2461 : */
2462 8 : pg_log_info("stopping the subscriber");
2463 8 : stop_standby_server(subscriber_dir);
2464 :
2465 : /* Create the required objects for each database on publisher */
2466 8 : consistent_lsn = setup_publisher(dbinfos.dbinfo);
2467 :
2468 : /* Write the required recovery parameters */
2469 8 : setup_recovery(dbinfos.dbinfo, subscriber_dir, consistent_lsn);
2470 :
2471 : /*
2472 : * Start subscriber so the recovery parameters will take effect. Wait
2473 : * until accepting connections. We don't want to start logical replication
2474 : * during setup.
2475 : */
2476 8 : pg_log_info("starting the subscriber");
2477 8 : start_standby_server(&opt, true, true);
2478 :
2479 : /* Waiting the subscriber to be promoted */
2480 8 : wait_for_end_recovery(dbinfos.dbinfo[0].subconninfo, &opt);
2481 :
2482 : /*
2483 : * Create the subscription for each database on subscriber. It does not
2484 : * enable it immediately because it needs to adjust the replication start
2485 : * point to the LSN reported by setup_publisher(). It also cleans up
2486 : * publications created by this tool and replication to the standby.
2487 : */
2488 8 : setup_subscriber(dbinfos.dbinfo, consistent_lsn);
2489 :
2490 : /* Remove primary_slot_name if it exists on primary */
2491 8 : drop_primary_replication_slot(dbinfos.dbinfo, primary_slot_name);
2492 :
2493 : /* Remove failover replication slots if they exist on subscriber */
2494 8 : drop_failover_replication_slots(dbinfos.dbinfo);
2495 :
2496 : /* Stop the subscriber */
2497 8 : pg_log_info("stopping the subscriber");
2498 8 : stop_standby_server(subscriber_dir);
2499 :
2500 : /* Change system identifier from subscriber */
2501 8 : modify_subscriber_sysid(&opt);
2502 :
2503 8 : success = true;
2504 :
2505 8 : pg_log_info("Done!");
2506 :
2507 8 : return 0;
2508 : }
|