Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_createsubscriber.c
4 : * Create a new logical replica from a standby server
5 : *
6 : * Copyright (c) 2024-2025, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/bin/pg_basebackup/pg_createsubscriber.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 :
14 : #include "postgres_fe.h"
15 :
16 : #include <sys/stat.h>
17 : #include <sys/time.h>
18 : #include <sys/wait.h>
19 : #include <time.h>
20 :
21 : #include "common/connect.h"
22 : #include "common/controldata_utils.h"
23 : #include "common/logging.h"
24 : #include "common/pg_prng.h"
25 : #include "common/restricted_token.h"
26 : #include "fe_utils/recovery_gen.h"
27 : #include "fe_utils/simple_list.h"
28 : #include "fe_utils/string_utils.h"
29 : #include "fe_utils/version.h"
30 : #include "getopt_long.h"
31 :
/* Default subscriber port, shown as the -p default in usage(). */
#define DEFAULT_SUB_PORT "50432"

/*
 * Bit flag for the "publications" object type accepted by --clean.
 * NOTE(review): presumably OR-ed into LogicalRepInfos.objecttypes_to_clean
 * by the option handling, which is outside this view -- confirm.
 */
#define OBJECTTYPE_PUBLICATIONS 0x0001

/*
 * Command-line options.
 *
 * The option names noted per field follow usage(); the parsing code itself
 * is not in this view.
 */
struct CreateSubscriberOptions
{
	char	   *config_file;	/* configuration file (--config-file) */
	char	   *pub_conninfo_str;	/* publisher connection string (-P) */
	char	   *socket_dir;		/* directory for Unix-domain socket, if any
								 * (-s); passed as "host" in
								 * get_sub_conninfo() */
	char	   *sub_port;		/* subscriber port number (-p) */
	const char *sub_username;	/* subscriber username (-U); NULL if not set */
	bool		two_phase;		/* enable-two-phase option (-T) */
	SimpleStringList database_names;	/* list of database names (-d) */
	SimpleStringList pub_names; /* list of publication names */
	SimpleStringList sub_names; /* list of subscription names */
	SimpleStringList replslot_names;	/* list of replication slot names */
	int			recovery_timeout;	/* stop recovery after this time (-t,
									 * seconds per usage()) */
	bool		all_dbs;		/* all option (-a) */
	SimpleStringList objecttypes_to_clean;	/* list of object types to cleanup
											 * (--clean) */
};
52 :
/*
 * Per-database publication/subscription info.
 *
 * store_pub_sub_info() builds one entry per target database.  Name fields
 * that the user did not specify are left NULL there, and setup_publisher()
 * later fills them with generated names.
 */
struct LogicalRepInfo
{
	char	   *dbname;			/* database name (points at the option list
								 * cell's string, not a copy) */
	char	   *pubconninfo;	/* publisher connection string, with dbname */
	char	   *subconninfo;	/* subscriber connection string, with dbname */
	char	   *pubname;		/* publication name */
	char	   *subname;		/* subscription name */
	char	   *replslotname;	/* replication slot name */

	/*
	 * When true, cleanup_objects_atexit() tries to drop the object on the
	 * primary after a failed run.  NOTE(review): presumably set by the
	 * create_* routines, which are outside this view.
	 */
	bool		made_replslot;	/* replication slot was created */
	bool		made_publication;	/* publication was created */
};
66 :
/*
 * Information shared across all the databases (or publications and
 * subscriptions).
 */
struct LogicalRepInfos
{
	struct LogicalRepInfo *dbinfo;	/* per-database entries, indexed 0 ..
									 * num_dbs - 1 */
	bool		two_phase;		/* enable-two-phase option */
	bits32		objecttypes_to_clean;	/* flags indicating which object types
										 * to clean up on subscriber */
};
78 :
79 : static void cleanup_objects_atexit(void);
80 : static void usage();
81 : static char *get_base_conninfo(const char *conninfo, char **dbname);
82 : static char *get_sub_conninfo(const struct CreateSubscriberOptions *opt);
83 : static char *get_exec_path(const char *argv0, const char *progname);
84 : static void check_data_directory(const char *datadir);
85 : static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
86 : static struct LogicalRepInfo *store_pub_sub_info(const struct CreateSubscriberOptions *opt,
87 : const char *pub_base_conninfo,
88 : const char *sub_base_conninfo);
89 : static PGconn *connect_database(const char *conninfo, bool exit_on_error);
90 : static void disconnect_database(PGconn *conn, bool exit_on_error);
91 : static uint64 get_primary_sysid(const char *conninfo);
92 : static uint64 get_standby_sysid(const char *datadir);
93 : static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
94 : static bool server_is_in_recovery(PGconn *conn);
95 : static char *generate_object_name(PGconn *conn);
96 : static void check_publisher(const struct LogicalRepInfo *dbinfo);
97 : static char *setup_publisher(struct LogicalRepInfo *dbinfo);
98 : static void check_subscriber(const struct LogicalRepInfo *dbinfo);
99 : static void setup_subscriber(struct LogicalRepInfo *dbinfo,
100 : const char *consistent_lsn);
101 : static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
102 : const char *lsn);
103 : static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
104 : const char *slotname);
105 : static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo);
106 : static char *create_logical_replication_slot(PGconn *conn,
107 : struct LogicalRepInfo *dbinfo);
108 : static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
109 : const char *slot_name);
110 : static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
111 : static void start_standby_server(const struct CreateSubscriberOptions *opt,
112 : bool restricted_access,
113 : bool restrict_logical_worker);
114 : static void stop_standby_server(const char *datadir);
115 : static void wait_for_end_recovery(const char *conninfo,
116 : const struct CreateSubscriberOptions *opt);
117 : static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
118 : static void drop_publication(PGconn *conn, const char *pubname,
119 : const char *dbname, bool *made_publication);
120 : static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
121 : static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
122 : static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
123 : const char *lsn);
124 : static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
125 : static void check_and_drop_existing_subscriptions(PGconn *conn,
126 : const struct LogicalRepInfo *dbinfo);
127 : static void drop_existing_subscription(PGconn *conn, const char *subname,
128 : const char *dbname);
129 : static void get_publisher_databases(struct CreateSubscriberOptions *opt,
130 : bool dbnamespecified);
131 :
#define USEC_PER_SEC 1000000
#define WAIT_INTERVAL 1 /* 1 second */

/* Program name, printed by usage() and sent as fallback_application_name. */
static const char *progname;

/* NOTE(review): presumably the standby's primary_slot_name; set outside this view. */
static char *primary_slot_name = NULL;
/* When true, report what would be done instead of modifying anything. */
static bool dry_run = false;

/* Set on successful completion; cleanup_objects_atexit() then does nothing. */
static bool success = false;

/* Per-database info plus settings shared across all databases. */
static struct LogicalRepInfos dbinfos;
static int	num_dbs = 0;		/* number of specified databases */
static int	num_pubs = 0;		/* number of specified publications */
static int	num_subs = 0;		/* number of specified subscriptions */
static int	num_replslots = 0;	/* number of specified replication slots */

/*
 * PRNG for generated object names: seeded in setup_publisher() and read by
 * generate_object_name().
 */
static pg_prng_state prng_state;

/*
 * Paths of helper programs.  pg_resetwal_path is used by
 * modify_subscriber_sysid(); NOTE(review): both are presumably filled by
 * get_exec_path() outside this view.
 */
static char *pg_ctl_path = NULL;
static char *pg_resetwal_path = NULL;

/* standby / subscriber data directory */
static char *subscriber_dir = NULL;

/*
 * State read by cleanup_objects_atexit().  If recovery_ended is set, it warns
 * that the target can no longer serve as a physical replica.  If
 * standby_running is set, it stops the target server.
 */
static bool recovery_ended = false;
static bool standby_running = false;

/* NOTE(review): presumably the outcome of waiting for the postmaster; users are outside this view. */
enum WaitPMResult
{
	POSTMASTER_READY,
	POSTMASTER_STILL_STARTING
};
164 :
165 :
166 : /*
167 : * Cleanup objects that were created by pg_createsubscriber if there is an
168 : * error.
169 : *
170 : * Publications and replication slots are created on primary. Depending on the
171 : * step it failed, it should remove the already created objects if it is
172 : * possible (sometimes it won't work due to a connection issue).
173 : * There is no cleanup on the target server. The steps on the target server are
174 : * executed *after* promotion, hence, at this point, a failure means recreate
175 : * the physical replica and start again.
176 : */
177 : static void
178 20 : cleanup_objects_atexit(void)
179 : {
180 20 : if (success)
181 8 : return;
182 :
183 : /*
184 : * If the server is promoted, there is no way to use the current setup
185 : * again. Warn the user that a new replication setup should be done before
186 : * trying again.
187 : */
188 12 : if (recovery_ended)
189 : {
190 0 : pg_log_warning("failed after the end of recovery");
191 0 : pg_log_warning_hint("The target server cannot be used as a physical replica anymore. "
192 : "You must recreate the physical replica before continuing.");
193 : }
194 :
195 36 : for (int i = 0; i < num_dbs; i++)
196 : {
197 24 : struct LogicalRepInfo *dbinfo = &dbinfos.dbinfo[i];
198 :
199 24 : if (dbinfo->made_publication || dbinfo->made_replslot)
200 : {
201 : PGconn *conn;
202 :
203 0 : conn = connect_database(dbinfo->pubconninfo, false);
204 0 : if (conn != NULL)
205 : {
206 0 : if (dbinfo->made_publication)
207 0 : drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
208 : &dbinfo->made_publication);
209 0 : if (dbinfo->made_replslot)
210 0 : drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
211 0 : disconnect_database(conn, false);
212 : }
213 : else
214 : {
215 : /*
216 : * If a connection could not be established, inform the user
217 : * that some objects were left on primary and should be
218 : * removed before trying again.
219 : */
220 0 : if (dbinfo->made_publication)
221 : {
222 0 : pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
223 : dbinfo->pubname,
224 : dbinfo->dbname);
225 0 : pg_log_warning_hint("Drop this publication before trying again.");
226 : }
227 0 : if (dbinfo->made_replslot)
228 : {
229 0 : pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
230 : dbinfo->replslotname,
231 : dbinfo->dbname);
232 0 : pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
233 : }
234 : }
235 : }
236 : }
237 :
238 12 : if (standby_running)
239 8 : stop_standby_server(subscriber_dir);
240 : }
241 :
/*
 * Print the command-line help text to stdout.
 *
 * Each printf() call wraps its text in _() so it can be translated.  The
 * default subscriber port comes from DEFAULT_SUB_PORT.
 */
static void
usage(void)
{
	printf(_("%s creates a new logical replica from a standby server.\n\n"),
		   progname);
	printf(_("Usage:\n"));
	printf(_(" %s [OPTION]...\n"), progname);
	printf(_("\nOptions:\n"));
	printf(_(" -a, --all create subscriptions for all databases except template\n"
			 " databases and databases that don't allow connections\n"));
	printf(_(" -d, --database=DBNAME database in which to create a subscription\n"));
	printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
	printf(_(" -n, --dry-run dry run, just show what would be done\n"));
	printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
	printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
	printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
	printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
	printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));
	printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
	printf(_(" -v, --verbose output verbose messages\n"));
	printf(_(" --clean=OBJECTTYPE drop all objects of the specified type from specified\n"
			 " databases on the subscriber; accepts: \"%s\"\n"), "publications");
	printf(_(" --config-file=FILENAME use specified main server configuration\n"
			 " file when running target cluster\n"));
	printf(_(" --publication=NAME publication name\n"));
	printf(_(" --replication-slot=NAME replication slot name\n"));
	printf(_(" --subscription=NAME subscription name\n"));
	printf(_(" -V, --version output version information, then exit\n"));
	printf(_(" -?, --help show this help, then exit\n"));
	printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
	printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
}
274 :
275 : /*
276 : * Subroutine to append "keyword=value" to a connection string,
277 : * with proper quoting of the value. (We assume keywords don't need that.)
278 : */
279 : static void
280 214 : appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
281 : {
282 214 : if (buf->len > 0)
283 158 : appendPQExpBufferChar(buf, ' ');
284 214 : appendPQExpBufferStr(buf, keyword);
285 214 : appendPQExpBufferChar(buf, '=');
286 214 : appendConnStrVal(buf, val);
287 214 : }
288 :
289 : /*
290 : * Validate a connection string. Returns a base connection string that is a
291 : * connection string without a database name.
292 : *
293 : * Since we might process multiple databases, each database name will be
294 : * appended to this base connection string to provide a final connection
295 : * string. If the second argument (dbname) is not null, returns dbname if the
296 : * provided connection string contains it.
297 : *
298 : * It is the caller's responsibility to free the returned connection string and
299 : * dbname.
300 : */
301 : static char *
302 28 : get_base_conninfo(const char *conninfo, char **dbname)
303 : {
304 : PQExpBuffer buf;
305 : PQconninfoOption *conn_opts;
306 : PQconninfoOption *conn_opt;
307 28 : char *errmsg = NULL;
308 : char *ret;
309 :
310 28 : conn_opts = PQconninfoParse(conninfo, &errmsg);
311 28 : if (conn_opts == NULL)
312 : {
313 0 : pg_log_error("could not parse connection string: %s", errmsg);
314 0 : PQfreemem(errmsg);
315 0 : return NULL;
316 : }
317 :
318 28 : buf = createPQExpBuffer();
319 1456 : for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
320 : {
321 1428 : if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
322 : {
323 66 : if (strcmp(conn_opt->keyword, "dbname") == 0)
324 : {
325 18 : if (dbname)
326 18 : *dbname = pg_strdup(conn_opt->val);
327 18 : continue;
328 : }
329 48 : appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
330 : }
331 : }
332 :
333 28 : ret = pg_strdup(buf->data);
334 :
335 28 : destroyPQExpBuffer(buf);
336 28 : PQconninfoFree(conn_opts);
337 :
338 28 : return ret;
339 : }
340 :
341 : /*
342 : * Build a subscriber connection string. Only a few parameters are supported
343 : * since it starts a server with restricted access.
344 : */
345 : static char *
346 28 : get_sub_conninfo(const struct CreateSubscriberOptions *opt)
347 : {
348 28 : PQExpBuffer buf = createPQExpBuffer();
349 : char *ret;
350 :
351 28 : appendConnStrItem(buf, "port", opt->sub_port);
352 : #if !defined(WIN32)
353 28 : appendConnStrItem(buf, "host", opt->socket_dir);
354 : #endif
355 28 : if (opt->sub_username != NULL)
356 0 : appendConnStrItem(buf, "user", opt->sub_username);
357 28 : appendConnStrItem(buf, "fallback_application_name", progname);
358 :
359 28 : ret = pg_strdup(buf->data);
360 :
361 28 : destroyPQExpBuffer(buf);
362 :
363 28 : return ret;
364 : }
365 :
366 : /*
367 : * Verify if a PostgreSQL binary (progname) is available in the same directory as
368 : * pg_createsubscriber and it has the same version. It returns the absolute
369 : * path of the progname.
370 : */
371 : static char *
372 40 : get_exec_path(const char *argv0, const char *progname)
373 : {
374 : char *versionstr;
375 : char *exec_path;
376 : int ret;
377 :
378 40 : versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
379 40 : exec_path = pg_malloc(MAXPGPATH);
380 40 : ret = find_other_exec(argv0, progname, versionstr, exec_path);
381 :
382 40 : if (ret < 0)
383 : {
384 : char full_path[MAXPGPATH];
385 :
386 0 : if (find_my_exec(argv0, full_path) < 0)
387 0 : strlcpy(full_path, progname, sizeof(full_path));
388 :
389 0 : if (ret == -1)
390 0 : pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
391 : progname, "pg_createsubscriber", full_path);
392 : else
393 0 : pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
394 : progname, full_path, "pg_createsubscriber");
395 : }
396 :
397 40 : pg_log_debug("%s path is: %s", progname, exec_path);
398 :
399 40 : return exec_path;
400 : }
401 :
402 : /*
403 : * Is it a cluster directory? These are preliminary checks. It is far from
404 : * making an accurate check. If it is not a clone from the publisher, it will
405 : * eventually fail in a future step.
406 : */
407 : static void
408 20 : check_data_directory(const char *datadir)
409 : {
410 : struct stat statbuf;
411 : uint32 major_version;
412 : char *version_str;
413 :
414 20 : pg_log_info("checking if directory \"%s\" is a cluster data directory",
415 : datadir);
416 :
417 20 : if (stat(datadir, &statbuf) != 0)
418 : {
419 0 : if (errno == ENOENT)
420 0 : pg_fatal("data directory \"%s\" does not exist", datadir);
421 : else
422 0 : pg_fatal("could not access directory \"%s\": %m", datadir);
423 : }
424 :
425 : /*
426 : * Retrieve the contents of this cluster's PG_VERSION. We require
427 : * compatibility with the same major version as the one this tool is
428 : * compiled with.
429 : */
430 20 : major_version = GET_PG_MAJORVERSION_NUM(get_pg_version(datadir, &version_str));
431 20 : if (major_version != PG_MAJORVERSION_NUM)
432 : {
433 0 : pg_log_error("data directory is of wrong version");
434 0 : pg_log_error_detail("File \"%s\" contains \"%s\", which is not compatible with this program's version \"%s\".",
435 : "PG_VERSION", version_str, PG_MAJORVERSION);
436 0 : exit(1);
437 : }
438 20 : }
439 :
440 : /*
441 : * Append database name into a base connection string.
442 : *
443 : * dbname is the only parameter that changes so it is not included in the base
444 : * connection string. This function concatenates dbname to build a "real"
445 : * connection string.
446 : */
447 : static char *
448 82 : concat_conninfo_dbname(const char *conninfo, const char *dbname)
449 : {
450 82 : PQExpBuffer buf = createPQExpBuffer();
451 : char *ret;
452 :
453 : Assert(conninfo != NULL);
454 :
455 82 : appendPQExpBufferStr(buf, conninfo);
456 82 : appendConnStrItem(buf, "dbname", dbname);
457 :
458 82 : ret = pg_strdup(buf->data);
459 82 : destroyPQExpBuffer(buf);
460 :
461 82 : return ret;
462 : }
463 :
464 : /*
465 : * Store publication and subscription information.
466 : *
467 : * If publication, replication slot and subscription names were specified,
468 : * store it here. Otherwise, a generated name will be assigned to the object in
469 : * setup_publisher().
470 : */
471 : static struct LogicalRepInfo *
472 20 : store_pub_sub_info(const struct CreateSubscriberOptions *opt,
473 : const char *pub_base_conninfo,
474 : const char *sub_base_conninfo)
475 : {
476 : struct LogicalRepInfo *dbinfo;
477 20 : SimpleStringListCell *pubcell = NULL;
478 20 : SimpleStringListCell *subcell = NULL;
479 20 : SimpleStringListCell *replslotcell = NULL;
480 20 : int i = 0;
481 :
482 20 : dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
483 :
484 20 : if (num_pubs > 0)
485 4 : pubcell = opt->pub_names.head;
486 20 : if (num_subs > 0)
487 2 : subcell = opt->sub_names.head;
488 20 : if (num_replslots > 0)
489 4 : replslotcell = opt->replslot_names.head;
490 :
491 60 : for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
492 : {
493 : char *conninfo;
494 :
495 : /* Fill publisher attributes */
496 40 : conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
497 40 : dbinfo[i].pubconninfo = conninfo;
498 40 : dbinfo[i].dbname = cell->val;
499 40 : if (num_pubs > 0)
500 8 : dbinfo[i].pubname = pubcell->val;
501 : else
502 32 : dbinfo[i].pubname = NULL;
503 40 : if (num_replslots > 0)
504 6 : dbinfo[i].replslotname = replslotcell->val;
505 : else
506 34 : dbinfo[i].replslotname = NULL;
507 40 : dbinfo[i].made_replslot = false;
508 40 : dbinfo[i].made_publication = false;
509 : /* Fill subscriber attributes */
510 40 : conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
511 40 : dbinfo[i].subconninfo = conninfo;
512 40 : if (num_subs > 0)
513 4 : dbinfo[i].subname = subcell->val;
514 : else
515 36 : dbinfo[i].subname = NULL;
516 : /* Other fields will be filled later */
517 :
518 40 : pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
519 : dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
520 : dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
521 : dbinfo[i].pubconninfo);
522 40 : pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
523 : dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
524 : dbinfo[i].subconninfo,
525 : dbinfos.two_phase ? "true" : "false");
526 :
527 40 : if (num_pubs > 0)
528 8 : pubcell = pubcell->next;
529 40 : if (num_subs > 0)
530 4 : subcell = subcell->next;
531 40 : if (num_replslots > 0)
532 6 : replslotcell = replslotcell->next;
533 :
534 40 : i++;
535 : }
536 :
537 20 : return dbinfo;
538 : }
539 :
540 : /*
541 : * Open a new connection. If exit_on_error is true, it has an undesired
542 : * condition and it should exit immediately.
543 : */
544 : static PGconn *
545 114 : connect_database(const char *conninfo, bool exit_on_error)
546 : {
547 : PGconn *conn;
548 : PGresult *res;
549 :
550 114 : conn = PQconnectdb(conninfo);
551 114 : if (PQstatus(conn) != CONNECTION_OK)
552 : {
553 0 : pg_log_error("connection to database failed: %s",
554 : PQerrorMessage(conn));
555 0 : PQfinish(conn);
556 :
557 0 : if (exit_on_error)
558 0 : exit(1);
559 0 : return NULL;
560 : }
561 :
562 : /* Secure search_path */
563 114 : res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
564 114 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
565 : {
566 0 : pg_log_error("could not clear \"search_path\": %s",
567 : PQresultErrorMessage(res));
568 0 : PQclear(res);
569 0 : PQfinish(conn);
570 :
571 0 : if (exit_on_error)
572 0 : exit(1);
573 0 : return NULL;
574 : }
575 114 : PQclear(res);
576 :
577 114 : return conn;
578 : }
579 :
580 : /*
581 : * Close the connection. If exit_on_error is true, it has an undesired
582 : * condition and it should exit immediately.
583 : */
584 : static void
585 114 : disconnect_database(PGconn *conn, bool exit_on_error)
586 : {
587 : Assert(conn != NULL);
588 :
589 114 : PQfinish(conn);
590 :
591 114 : if (exit_on_error)
592 4 : exit(1);
593 110 : }
594 :
595 : /*
596 : * Obtain the system identifier using the provided connection. It will be used
597 : * to compare if a data directory is a clone of another one.
598 : */
599 : static uint64
600 20 : get_primary_sysid(const char *conninfo)
601 : {
602 : PGconn *conn;
603 : PGresult *res;
604 : uint64 sysid;
605 :
606 20 : pg_log_info("getting system identifier from publisher");
607 :
608 20 : conn = connect_database(conninfo, true);
609 :
610 20 : res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
611 20 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
612 : {
613 0 : pg_log_error("could not get system identifier: %s",
614 : PQresultErrorMessage(res));
615 0 : disconnect_database(conn, true);
616 : }
617 20 : if (PQntuples(res) != 1)
618 : {
619 0 : pg_log_error("could not get system identifier: got %d rows, expected %d row",
620 : PQntuples(res), 1);
621 0 : disconnect_database(conn, true);
622 : }
623 :
624 20 : sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
625 :
626 20 : pg_log_info("system identifier is %" PRIu64 " on publisher", sysid);
627 :
628 20 : PQclear(res);
629 20 : disconnect_database(conn, false);
630 :
631 20 : return sysid;
632 : }
633 :
634 : /*
635 : * Obtain the system identifier from control file. It will be used to compare
636 : * if a data directory is a clone of another one. This routine is used locally
637 : * and avoids a connection.
638 : */
639 : static uint64
640 20 : get_standby_sysid(const char *datadir)
641 : {
642 : ControlFileData *cf;
643 : bool crc_ok;
644 : uint64 sysid;
645 :
646 20 : pg_log_info("getting system identifier from subscriber");
647 :
648 20 : cf = get_controlfile(datadir, &crc_ok);
649 20 : if (!crc_ok)
650 0 : pg_fatal("control file appears to be corrupt");
651 :
652 20 : sysid = cf->system_identifier;
653 :
654 20 : pg_log_info("system identifier is %" PRIu64 " on subscriber", sysid);
655 :
656 20 : pg_free(cf);
657 :
658 20 : return sysid;
659 : }
660 :
661 : /*
662 : * Modify the system identifier. Since a standby server preserves the system
663 : * identifier, it makes sense to change it to avoid situations in which WAL
664 : * files from one of the systems might be used in the other one.
665 : */
666 : static void
667 8 : modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
668 : {
669 : ControlFileData *cf;
670 : bool crc_ok;
671 : struct timeval tv;
672 :
673 : char *cmd_str;
674 :
675 8 : pg_log_info("modifying system identifier of subscriber");
676 :
677 8 : cf = get_controlfile(subscriber_dir, &crc_ok);
678 8 : if (!crc_ok)
679 0 : pg_fatal("control file appears to be corrupt");
680 :
681 : /*
682 : * Select a new system identifier.
683 : *
684 : * XXX this code was extracted from BootStrapXLOG().
685 : */
686 8 : gettimeofday(&tv, NULL);
687 8 : cf->system_identifier = ((uint64) tv.tv_sec) << 32;
688 8 : cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
689 8 : cf->system_identifier |= getpid() & 0xFFF;
690 :
691 8 : if (dry_run)
692 6 : pg_log_info("dry-run: would set system identifier to %" PRIu64 " on subscriber",
693 : cf->system_identifier);
694 : else
695 : {
696 2 : update_controlfile(subscriber_dir, cf, true);
697 2 : pg_log_info("system identifier is %" PRIu64 " on subscriber",
698 : cf->system_identifier);
699 : }
700 :
701 8 : if (dry_run)
702 6 : pg_log_info("dry-run: would run pg_resetwal on the subscriber");
703 : else
704 2 : pg_log_info("running pg_resetwal on the subscriber");
705 :
706 8 : cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
707 : subscriber_dir, DEVNULL);
708 :
709 8 : pg_log_debug("pg_resetwal command is: %s", cmd_str);
710 :
711 8 : if (!dry_run)
712 : {
713 2 : int rc = system(cmd_str);
714 :
715 2 : if (rc == 0)
716 2 : pg_log_info("subscriber successfully reset WAL on the subscriber");
717 : else
718 0 : pg_fatal("could not reset WAL on subscriber: %s", wait_result_to_str(rc));
719 : }
720 :
721 8 : pg_free(cf);
722 8 : }
723 :
724 : /*
725 : * Generate an object name using a prefix, database oid and a random integer.
726 : * It is used in case the user does not specify an object name (publication,
727 : * subscription, replication slot).
728 : */
729 : static char *
730 16 : generate_object_name(PGconn *conn)
731 : {
732 : PGresult *res;
733 : Oid oid;
734 : uint32 rand;
735 : char *objname;
736 :
737 16 : res = PQexec(conn,
738 : "SELECT oid FROM pg_catalog.pg_database "
739 : "WHERE datname = pg_catalog.current_database()");
740 16 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
741 : {
742 0 : pg_log_error("could not obtain database OID: %s",
743 : PQresultErrorMessage(res));
744 0 : disconnect_database(conn, true);
745 : }
746 :
747 16 : if (PQntuples(res) != 1)
748 : {
749 0 : pg_log_error("could not obtain database OID: got %d rows, expected %d row",
750 : PQntuples(res), 1);
751 0 : disconnect_database(conn, true);
752 : }
753 :
754 : /* Database OID */
755 16 : oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
756 :
757 16 : PQclear(res);
758 :
759 : /* Random unsigned integer */
760 16 : rand = pg_prng_uint32(&prng_state);
761 :
762 : /*
763 : * Build the object name. The name must not exceed NAMEDATALEN - 1. This
764 : * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
765 : * '\0').
766 : */
767 16 : objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
768 :
769 16 : return objname;
770 : }
771 :
772 : /*
773 : * Create the publications and replication slots in preparation for logical
774 : * replication. Returns the LSN from latest replication slot. It will be the
775 : * replication start point that is used to adjust the subscriptions (see
776 : * set_replication_progress).
777 : */
778 : static char *
779 8 : setup_publisher(struct LogicalRepInfo *dbinfo)
780 : {
781 8 : char *lsn = NULL;
782 :
783 8 : pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
784 :
785 24 : for (int i = 0; i < num_dbs; i++)
786 : {
787 : PGconn *conn;
788 16 : char *genname = NULL;
789 :
790 16 : conn = connect_database(dbinfo[i].pubconninfo, true);
791 :
792 : /*
793 : * If an object name was not specified as command-line options, assign
794 : * a generated object name. The replication slot has a different rule.
795 : * The subscription name is assigned to the replication slot name if
796 : * no replication slot is specified. It follows the same rule as
797 : * CREATE SUBSCRIPTION.
798 : */
799 16 : if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
800 16 : genname = generate_object_name(conn);
801 16 : if (num_pubs == 0)
802 8 : dbinfo[i].pubname = pg_strdup(genname);
803 16 : if (num_subs == 0)
804 12 : dbinfo[i].subname = pg_strdup(genname);
805 16 : if (num_replslots == 0)
806 10 : dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
807 :
808 : /*
809 : * Create publication on publisher. This step should be executed
810 : * *before* promoting the subscriber to avoid any transactions between
811 : * consistent LSN and the new publication rows (such transactions
812 : * wouldn't see the new publication rows resulting in an error).
813 : */
814 16 : create_publication(conn, &dbinfo[i]);
815 :
816 : /* Create replication slot on publisher */
817 16 : if (lsn)
818 2 : pg_free(lsn);
819 16 : lsn = create_logical_replication_slot(conn, &dbinfo[i]);
820 16 : if (lsn == NULL && !dry_run)
821 0 : exit(1);
822 :
823 : /*
824 : * Since we are using the LSN returned by the last replication slot as
825 : * recovery_target_lsn, this LSN is ahead of the current WAL position
826 : * and the recovery waits until the publisher writes a WAL record to
827 : * reach the target and ends the recovery. On idle systems, this wait
828 : * time is unpredictable and could lead to failure in promoting the
829 : * subscriber. To avoid that, insert a harmless WAL record.
830 : */
831 16 : if (i == num_dbs - 1 && !dry_run)
832 : {
833 : PGresult *res;
834 :
835 2 : res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
836 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
837 : {
838 0 : pg_log_error("could not write an additional WAL record: %s",
839 : PQresultErrorMessage(res));
840 0 : disconnect_database(conn, true);
841 : }
842 2 : PQclear(res);
843 : }
844 :
845 16 : disconnect_database(conn, false);
846 : }
847 :
848 8 : return lsn;
849 : }
850 :
851 : /*
852 : * Is recovery still in progress?
853 : */
854 : static bool
855 30 : server_is_in_recovery(PGconn *conn)
856 : {
857 : PGresult *res;
858 : int ret;
859 :
860 30 : res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
861 :
862 30 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
863 : {
864 0 : pg_log_error("could not obtain recovery progress: %s",
865 : PQresultErrorMessage(res));
866 0 : disconnect_database(conn, true);
867 : }
868 :
869 :
870 30 : ret = strcmp("t", PQgetvalue(res, 0, 0));
871 :
872 30 : PQclear(res);
873 :
874 30 : return ret == 0;
875 : }
876 :
877 : /*
878 : * Is the primary server ready for logical replication?
879 : *
880 : * XXX Does it not allow a synchronous replica?
881 : */
882 : static void
883 12 : check_publisher(const struct LogicalRepInfo *dbinfo)
884 : {
885 : PGconn *conn;
886 : PGresult *res;
887 12 : bool failed = false;
888 :
889 : char *wal_level;
890 : int max_repslots;
891 : int cur_repslots;
892 : int max_walsenders;
893 : int cur_walsenders;
894 : int max_prepared_transactions;
895 : char *max_slot_wal_keep_size;
896 :
897 12 : pg_log_info("checking settings on publisher");
898 :
899 12 : conn = connect_database(dbinfo[0].pubconninfo, true);
900 :
901 : /*
902 : * If the primary server is in recovery (i.e. cascading replication),
903 : * objects (publication) cannot be created because it is read only.
904 : */
905 12 : if (server_is_in_recovery(conn))
906 : {
907 2 : pg_log_error("primary server cannot be in recovery");
908 2 : disconnect_database(conn, true);
909 : }
910 :
911 : /*------------------------------------------------------------------------
912 : * Logical replication requires a few parameters to be set on publisher.
913 : * Since these parameters are not a requirement for physical replication,
914 : * we should check it to make sure it won't fail.
915 : *
916 : * - wal_level = logical
917 : * - max_replication_slots >= current + number of dbs to be converted
918 : * - max_wal_senders >= current + number of dbs to be converted
919 : * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files)
920 : * -----------------------------------------------------------------------
921 : */
922 10 : res = PQexec(conn,
923 : "SELECT pg_catalog.current_setting('wal_level'),"
924 : " pg_catalog.current_setting('max_replication_slots'),"
925 : " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
926 : " pg_catalog.current_setting('max_wal_senders'),"
927 : " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
928 : " pg_catalog.current_setting('max_prepared_transactions'),"
929 : " pg_catalog.current_setting('max_slot_wal_keep_size')");
930 :
931 10 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
932 : {
933 0 : pg_log_error("could not obtain publisher settings: %s",
934 : PQresultErrorMessage(res));
935 0 : disconnect_database(conn, true);
936 : }
937 :
938 10 : wal_level = pg_strdup(PQgetvalue(res, 0, 0));
939 10 : max_repslots = atoi(PQgetvalue(res, 0, 1));
940 10 : cur_repslots = atoi(PQgetvalue(res, 0, 2));
941 10 : max_walsenders = atoi(PQgetvalue(res, 0, 3));
942 10 : cur_walsenders = atoi(PQgetvalue(res, 0, 4));
943 10 : max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));
944 10 : max_slot_wal_keep_size = pg_strdup(PQgetvalue(res, 0, 6));
945 :
946 10 : PQclear(res);
947 :
948 10 : pg_log_debug("publisher: wal_level: %s", wal_level);
949 10 : pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
950 10 : pg_log_debug("publisher: current replication slots: %d", cur_repslots);
951 10 : pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
952 10 : pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
953 10 : pg_log_debug("publisher: max_prepared_transactions: %d",
954 : max_prepared_transactions);
955 10 : pg_log_debug("publisher: max_slot_wal_keep_size: %s",
956 : max_slot_wal_keep_size);
957 :
958 10 : disconnect_database(conn, false);
959 :
960 10 : if (strcmp(wal_level, "logical") != 0)
961 : {
962 2 : pg_log_error("publisher requires \"wal_level\" >= \"logical\"");
963 2 : failed = true;
964 : }
965 :
966 10 : if (max_repslots - cur_repslots < num_dbs)
967 : {
968 2 : pg_log_error("publisher requires %d replication slots, but only %d remain",
969 : num_dbs, max_repslots - cur_repslots);
970 2 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
971 : "max_replication_slots", cur_repslots + num_dbs);
972 2 : failed = true;
973 : }
974 :
975 10 : if (max_walsenders - cur_walsenders < num_dbs)
976 : {
977 2 : pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
978 : num_dbs, max_walsenders - cur_walsenders);
979 2 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
980 : "max_wal_senders", cur_walsenders + num_dbs);
981 2 : failed = true;
982 : }
983 :
984 10 : if (max_prepared_transactions != 0 && !dbinfos.two_phase)
985 : {
986 0 : pg_log_warning("two_phase option will not be enabled for replication slots");
987 0 : pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "
988 : "Prepared transactions will be replicated at COMMIT PREPARED.");
989 0 : pg_log_warning_hint("You can use the command-line option --enable-two-phase to enable two_phase.");
990 : }
991 :
992 : /*
993 : * In dry-run mode, validate 'max_slot_wal_keep_size'. If this parameter
994 : * is set to a non-default value, it may cause replication failures due to
995 : * required WAL files being prematurely removed.
996 : */
997 10 : if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
998 : {
999 0 : pg_log_warning("required WAL could be removed from the publisher");
1000 0 : pg_log_warning_hint("Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
1001 : "max_slot_wal_keep_size");
1002 : }
1003 :
1004 10 : pg_free(wal_level);
1005 :
1006 10 : if (failed)
1007 2 : exit(1);
1008 8 : }
1009 :
1010 : /*
1011 : * Is the standby server ready for logical replication?
1012 : *
1013 : * XXX Does it not allow a time-delayed replica?
1014 : *
1015 : * XXX In a cascaded replication scenario (P -> S -> C), if the target server
1016 : * is S, it cannot detect there is a replica (server C) because server S starts
1017 : * accepting only local connections and server C cannot connect to it. Hence,
1018 : * there is not a reliable way to provide a suitable error saying the server C
1019 : * will be broken at the end of this process (due to pg_resetwal).
1020 : */
1021 : static void
1022 16 : check_subscriber(const struct LogicalRepInfo *dbinfo)
1023 : {
1024 : PGconn *conn;
1025 : PGresult *res;
1026 16 : bool failed = false;
1027 :
1028 : int max_lrworkers;
1029 : int max_reporigins;
1030 : int max_wprocs;
1031 :
1032 16 : pg_log_info("checking settings on subscriber");
1033 :
1034 16 : conn = connect_database(dbinfo[0].subconninfo, true);
1035 :
1036 : /* The target server must be a standby */
1037 16 : if (!server_is_in_recovery(conn))
1038 : {
1039 2 : pg_log_error("target server must be a standby");
1040 2 : disconnect_database(conn, true);
1041 : }
1042 :
1043 : /*------------------------------------------------------------------------
1044 : * Logical replication requires a few parameters to be set on subscriber.
1045 : * Since these parameters are not a requirement for physical replication,
1046 : * we should check it to make sure it won't fail.
1047 : *
1048 : * - max_active_replication_origins >= number of dbs to be converted
1049 : * - max_logical_replication_workers >= number of dbs to be converted
1050 : * - max_worker_processes >= 1 + number of dbs to be converted
1051 : *------------------------------------------------------------------------
1052 : */
1053 14 : res = PQexec(conn,
1054 : "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
1055 : "'max_logical_replication_workers', "
1056 : "'max_active_replication_origins', "
1057 : "'max_worker_processes', "
1058 : "'primary_slot_name') "
1059 : "ORDER BY name");
1060 :
1061 14 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1062 : {
1063 0 : pg_log_error("could not obtain subscriber settings: %s",
1064 : PQresultErrorMessage(res));
1065 0 : disconnect_database(conn, true);
1066 : }
1067 :
1068 14 : max_reporigins = atoi(PQgetvalue(res, 0, 0));
1069 14 : max_lrworkers = atoi(PQgetvalue(res, 1, 0));
1070 14 : max_wprocs = atoi(PQgetvalue(res, 2, 0));
1071 14 : if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
1072 12 : primary_slot_name = pg_strdup(PQgetvalue(res, 3, 0));
1073 :
1074 14 : pg_log_debug("subscriber: max_logical_replication_workers: %d",
1075 : max_lrworkers);
1076 14 : pg_log_debug("subscriber: max_active_replication_origins: %d", max_reporigins);
1077 14 : pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
1078 14 : if (primary_slot_name)
1079 12 : pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
1080 :
1081 14 : PQclear(res);
1082 :
1083 14 : disconnect_database(conn, false);
1084 :
1085 14 : if (max_reporigins < num_dbs)
1086 : {
1087 2 : pg_log_error("subscriber requires %d active replication origins, but only %d remain",
1088 : num_dbs, max_reporigins);
1089 2 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1090 : "max_active_replication_origins", num_dbs);
1091 2 : failed = true;
1092 : }
1093 :
1094 14 : if (max_lrworkers < num_dbs)
1095 : {
1096 2 : pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
1097 : num_dbs, max_lrworkers);
1098 2 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1099 : "max_logical_replication_workers", num_dbs);
1100 2 : failed = true;
1101 : }
1102 :
1103 14 : if (max_wprocs < num_dbs + 1)
1104 : {
1105 2 : pg_log_error("subscriber requires %d worker processes, but only %d remain",
1106 : num_dbs + 1, max_wprocs);
1107 2 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1108 : "max_worker_processes", num_dbs + 1);
1109 2 : failed = true;
1110 : }
1111 :
1112 14 : if (failed)
1113 2 : exit(1);
1114 12 : }
1115 :
1116 : /*
1117 : * Drop a specified subscription. This is to avoid duplicate subscriptions on
1118 : * the primary (publisher node) and the newly created subscriber. We
1119 : * shouldn't drop the associated slot as that would be used by the publisher
1120 : * node.
1121 : */
1122 : static void
1123 8 : drop_existing_subscription(PGconn *conn, const char *subname, const char *dbname)
1124 : {
1125 8 : PQExpBuffer query = createPQExpBuffer();
1126 : PGresult *res;
1127 :
1128 : Assert(conn != NULL);
1129 :
1130 : /*
1131 : * Construct a query string. These commands are allowed to be executed
1132 : * within a transaction.
1133 : */
1134 8 : appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
1135 : subname);
1136 8 : appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
1137 : subname);
1138 8 : appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
1139 :
1140 8 : if (dry_run)
1141 6 : pg_log_info("dry-run: would drop subscription \"%s\" in database \"%s\"",
1142 : subname, dbname);
1143 : else
1144 : {
1145 2 : pg_log_info("dropping subscription \"%s\" in database \"%s\"",
1146 : subname, dbname);
1147 :
1148 2 : res = PQexec(conn, query->data);
1149 :
1150 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1151 : {
1152 0 : pg_log_error("could not drop subscription \"%s\": %s",
1153 : subname, PQresultErrorMessage(res));
1154 0 : disconnect_database(conn, true);
1155 : }
1156 :
1157 2 : PQclear(res);
1158 : }
1159 :
1160 8 : destroyPQExpBuffer(query);
1161 8 : }
1162 :
1163 : /*
1164 : * Retrieve and drop the pre-existing subscriptions.
1165 : */
1166 : static void
1167 16 : check_and_drop_existing_subscriptions(PGconn *conn,
1168 : const struct LogicalRepInfo *dbinfo)
1169 : {
1170 16 : PQExpBuffer query = createPQExpBuffer();
1171 : char *dbname;
1172 : PGresult *res;
1173 :
1174 : Assert(conn != NULL);
1175 :
1176 16 : dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1177 :
1178 16 : appendPQExpBuffer(query,
1179 : "SELECT s.subname FROM pg_catalog.pg_subscription s "
1180 : "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1181 : "WHERE d.datname = %s",
1182 : dbname);
1183 16 : res = PQexec(conn, query->data);
1184 :
1185 16 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1186 : {
1187 0 : pg_log_error("could not obtain pre-existing subscriptions: %s",
1188 : PQresultErrorMessage(res));
1189 0 : disconnect_database(conn, true);
1190 : }
1191 :
1192 24 : for (int i = 0; i < PQntuples(res); i++)
1193 8 : drop_existing_subscription(conn, PQgetvalue(res, i, 0),
1194 8 : dbinfo->dbname);
1195 :
1196 16 : PQclear(res);
1197 16 : destroyPQExpBuffer(query);
1198 16 : PQfreemem(dbname);
1199 16 : }
1200 :
1201 : /*
1202 : * Create the subscriptions, adjust the initial location for logical
1203 : * replication and enable the subscriptions. That's the last step for logical
1204 : * replication setup.
1205 : */
1206 : static void
1207 8 : setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
1208 : {
1209 24 : for (int i = 0; i < num_dbs; i++)
1210 : {
1211 : PGconn *conn;
1212 :
1213 : /* Connect to subscriber. */
1214 16 : conn = connect_database(dbinfo[i].subconninfo, true);
1215 :
1216 : /*
1217 : * We don't need the pre-existing subscriptions on the newly formed
1218 : * subscriber. They can connect to other publisher nodes and either
1219 : * get some unwarranted data or can lead to ERRORs in connecting to
1220 : * such nodes.
1221 : */
1222 16 : check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
1223 :
1224 : /* Check and drop the required publications in the given database. */
1225 16 : check_and_drop_publications(conn, &dbinfo[i]);
1226 :
1227 16 : create_subscription(conn, &dbinfo[i]);
1228 :
1229 : /* Set the replication progress to the correct LSN */
1230 16 : set_replication_progress(conn, &dbinfo[i], consistent_lsn);
1231 :
1232 : /* Enable subscription */
1233 16 : enable_subscription(conn, &dbinfo[i]);
1234 :
1235 16 : disconnect_database(conn, false);
1236 : }
1237 8 : }
1238 :
1239 : /*
1240 : * Write the required recovery parameters.
1241 : */
1242 : static void
1243 8 : setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
1244 : {
1245 : PGconn *conn;
1246 : PQExpBuffer recoveryconfcontents;
1247 :
1248 : /*
1249 : * Despite of the recovery parameters will be written to the subscriber,
1250 : * use a publisher connection. The primary_conninfo is generated using the
1251 : * connection settings.
1252 : */
1253 8 : conn = connect_database(dbinfo[0].pubconninfo, true);
1254 :
1255 : /*
1256 : * Write recovery parameters.
1257 : *
1258 : * The subscriber is not running yet. In dry run mode, the recovery
1259 : * parameters *won't* be written. An invalid LSN is used for printing
1260 : * purposes. Additional recovery parameters are added here. It avoids
1261 : * unexpected behavior such as end of recovery as soon as a consistent
1262 : * state is reached (recovery_target) and failure due to multiple recovery
1263 : * targets (name, time, xid, LSN).
1264 : */
1265 8 : recoveryconfcontents = GenerateRecoveryConfig(conn, NULL, NULL);
1266 8 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target = ''\n");
1267 8 : appendPQExpBufferStr(recoveryconfcontents,
1268 : "recovery_target_timeline = 'latest'\n");
1269 :
1270 : /*
1271 : * Set recovery_target_inclusive = false to avoid reapplying the
1272 : * transaction committed at 'lsn' after subscription is enabled. This is
1273 : * because the provided 'lsn' is also used as the replication start point
1274 : * for the subscription. So, the server can send the transaction committed
1275 : * at that 'lsn' after replication is started which can lead to applying
1276 : * the same transaction twice if we keep recovery_target_inclusive = true.
1277 : */
1278 8 : appendPQExpBufferStr(recoveryconfcontents,
1279 : "recovery_target_inclusive = false\n");
1280 8 : appendPQExpBufferStr(recoveryconfcontents,
1281 : "recovery_target_action = promote\n");
1282 8 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target_name = ''\n");
1283 8 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target_time = ''\n");
1284 8 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target_xid = ''\n");
1285 :
1286 8 : if (dry_run)
1287 : {
1288 6 : appendPQExpBufferStr(recoveryconfcontents, "# dry run mode\n");
1289 6 : appendPQExpBuffer(recoveryconfcontents,
1290 : "recovery_target_lsn = '%X/%08X'\n",
1291 6 : LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
1292 : }
1293 : else
1294 : {
1295 2 : appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
1296 : lsn);
1297 2 : WriteRecoveryConfig(conn, datadir, recoveryconfcontents);
1298 : }
1299 8 : disconnect_database(conn, false);
1300 :
1301 8 : pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
1302 8 : }
1303 :
1304 : /*
1305 : * Drop physical replication slot on primary if the standby was using it. After
1306 : * the transformation, it has no use.
1307 : *
1308 : * XXX we might not fail here. Instead, we provide a warning so the user
1309 : * eventually drops this replication slot later.
1310 : */
1311 : static void
1312 8 : drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
1313 : {
1314 : PGconn *conn;
1315 :
1316 : /* Replication slot does not exist, do nothing */
1317 8 : if (!primary_slot_name)
1318 0 : return;
1319 :
1320 8 : conn = connect_database(dbinfo[0].pubconninfo, false);
1321 8 : if (conn != NULL)
1322 : {
1323 8 : drop_replication_slot(conn, &dbinfo[0], slotname);
1324 8 : disconnect_database(conn, false);
1325 : }
1326 : else
1327 : {
1328 0 : pg_log_warning("could not drop replication slot \"%s\" on primary",
1329 : slotname);
1330 0 : pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
1331 : }
1332 : }
1333 :
1334 : /*
1335 : * Drop failover replication slots on subscriber. After the transformation,
1336 : * they have no use.
1337 : *
1338 : * XXX We do not fail here. Instead, we provide a warning so the user can drop
1339 : * them later.
1340 : */
1341 : static void
1342 8 : drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
1343 : {
1344 : PGconn *conn;
1345 : PGresult *res;
1346 :
1347 8 : conn = connect_database(dbinfo[0].subconninfo, false);
1348 8 : if (conn != NULL)
1349 : {
1350 : /* Get failover replication slot names */
1351 8 : res = PQexec(conn,
1352 : "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
1353 :
1354 8 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
1355 : {
1356 : /* Remove failover replication slots from subscriber */
1357 16 : for (int i = 0; i < PQntuples(res); i++)
1358 8 : drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
1359 : }
1360 : else
1361 : {
1362 0 : pg_log_warning("could not obtain failover replication slot information: %s",
1363 : PQresultErrorMessage(res));
1364 0 : pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1365 : }
1366 :
1367 8 : PQclear(res);
1368 8 : disconnect_database(conn, false);
1369 : }
1370 : else
1371 : {
1372 0 : pg_log_warning("could not drop failover replication slot");
1373 0 : pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1374 : }
1375 8 : }
1376 :
1377 : /*
1378 : * Create a logical replication slot and returns a LSN.
1379 : *
1380 : * CreateReplicationSlot() is not used because it does not provide the one-row
1381 : * result set that contains the LSN.
1382 : */
1383 : static char *
1384 16 : create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
1385 : {
1386 16 : PQExpBuffer str = createPQExpBuffer();
1387 16 : PGresult *res = NULL;
1388 16 : const char *slot_name = dbinfo->replslotname;
1389 : char *slot_name_esc;
1390 16 : char *lsn = NULL;
1391 :
1392 : Assert(conn != NULL);
1393 :
1394 16 : if (dry_run)
1395 12 : pg_log_info("dry-run: would create the replication slot \"%s\" in database \"%s\" on publisher",
1396 : slot_name, dbinfo->dbname);
1397 : else
1398 4 : pg_log_info("creating the replication slot \"%s\" in database \"%s\" on publisher",
1399 : slot_name, dbinfo->dbname);
1400 :
1401 16 : slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1402 :
1403 16 : appendPQExpBuffer(str,
1404 : "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
1405 : slot_name_esc,
1406 16 : dbinfos.two_phase ? "true" : "false");
1407 :
1408 16 : PQfreemem(slot_name_esc);
1409 :
1410 16 : pg_log_debug("command is: %s", str->data);
1411 :
1412 16 : if (!dry_run)
1413 : {
1414 4 : res = PQexec(conn, str->data);
1415 4 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1416 : {
1417 0 : pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
1418 : slot_name, dbinfo->dbname,
1419 : PQresultErrorMessage(res));
1420 0 : PQclear(res);
1421 0 : destroyPQExpBuffer(str);
1422 0 : return NULL;
1423 : }
1424 :
1425 4 : lsn = pg_strdup(PQgetvalue(res, 0, 0));
1426 4 : PQclear(res);
1427 : }
1428 :
1429 : /* For cleanup purposes */
1430 16 : dbinfo->made_replslot = true;
1431 :
1432 16 : destroyPQExpBuffer(str);
1433 :
1434 16 : return lsn;
1435 : }
1436 :
1437 : static void
1438 16 : drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
1439 : const char *slot_name)
1440 : {
1441 16 : PQExpBuffer str = createPQExpBuffer();
1442 : char *slot_name_esc;
1443 : PGresult *res;
1444 :
1445 : Assert(conn != NULL);
1446 :
1447 16 : if (dry_run)
1448 12 : pg_log_info("dry-run: would drop the replication slot \"%s\" in database \"%s\"",
1449 : slot_name, dbinfo->dbname);
1450 : else
1451 4 : pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
1452 : slot_name, dbinfo->dbname);
1453 :
1454 16 : slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1455 :
1456 16 : appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
1457 :
1458 16 : PQfreemem(slot_name_esc);
1459 :
1460 16 : pg_log_debug("command is: %s", str->data);
1461 :
1462 16 : if (!dry_run)
1463 : {
1464 4 : res = PQexec(conn, str->data);
1465 4 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1466 : {
1467 0 : pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
1468 : slot_name, dbinfo->dbname, PQresultErrorMessage(res));
1469 0 : dbinfo->made_replslot = false; /* don't try again. */
1470 : }
1471 :
1472 4 : PQclear(res);
1473 : }
1474 :
1475 16 : destroyPQExpBuffer(str);
1476 16 : }
1477 :
/*
 * Check the status rc returned by system() for the pg_ctl command pg_ctl_cmd.
 *
 * A zero status returns silently.  Otherwise the reason is logged (exit code,
 * terminating signal/exception, or an unrecognized status) together with the
 * failed command, and the program exits with status 1.
 */
static void
pg_ctl_status(const char *pg_ctl_cmd, int rc)
{
	if (rc == 0)
		return;

	if (WIFEXITED(rc))
		pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
	else if (WIFSIGNALED(rc))
	{
#if defined(WIN32)
		pg_log_error("pg_ctl was terminated by exception 0x%X",
					 WTERMSIG(rc));
		pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
#else
		pg_log_error("pg_ctl was terminated by signal %d: %s",
					 WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
#endif
	}
	else
		pg_log_error("pg_ctl exited with unrecognized status %d", rc);

	pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
	exit(1);
}
1510 :
1511 : static void
1512 24 : start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
1513 : bool restrict_logical_worker)
1514 : {
1515 24 : PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
1516 : int rc;
1517 :
1518 24 : appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
1519 24 : appendShellString(pg_ctl_cmd, subscriber_dir);
1520 24 : appendPQExpBufferStr(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
1521 :
1522 : /* Prevent unintended slot invalidation */
1523 24 : appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
1524 :
1525 24 : if (restricted_access)
1526 : {
1527 24 : appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
1528 : #if !defined(WIN32)
1529 :
1530 : /*
1531 : * An empty listen_addresses list means the server does not listen on
1532 : * any IP interfaces; only Unix-domain sockets can be used to connect
1533 : * to the server. Prevent external connections to minimize the chance
1534 : * of failure.
1535 : */
1536 24 : appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
1537 24 : if (opt->socket_dir)
1538 24 : appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
1539 24 : opt->socket_dir);
1540 24 : appendPQExpBufferChar(pg_ctl_cmd, '"');
1541 : #endif
1542 : }
1543 24 : if (opt->config_file != NULL)
1544 0 : appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
1545 0 : opt->config_file);
1546 :
1547 : /* Suppress to start logical replication if requested */
1548 24 : if (restrict_logical_worker)
1549 8 : appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
1550 :
1551 24 : pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
1552 24 : rc = system(pg_ctl_cmd->data);
1553 24 : pg_ctl_status(pg_ctl_cmd->data, rc);
1554 24 : standby_running = true;
1555 24 : destroyPQExpBuffer(pg_ctl_cmd);
1556 24 : pg_log_info("server was started");
1557 24 : }
1558 :
1559 : static void
1560 24 : stop_standby_server(const char *datadir)
1561 : {
1562 : char *pg_ctl_cmd;
1563 : int rc;
1564 :
1565 24 : pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
1566 : datadir);
1567 24 : pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
1568 24 : rc = system(pg_ctl_cmd);
1569 24 : pg_ctl_status(pg_ctl_cmd, rc);
1570 24 : standby_running = false;
1571 24 : pg_log_info("server was stopped");
1572 24 : }
1573 :
1574 : /*
1575 : * Returns after the server finishes the recovery process.
1576 : *
1577 : * If recovery_timeout option is set, terminate abnormally without finishing
1578 : * the recovery process. By default, it waits forever.
1579 : *
1580 : * XXX Is the recovery process still in progress? When recovery process has a
1581 : * better progress reporting mechanism, it should be added here.
1582 : */
1583 : static void
1584 8 : wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
1585 : {
1586 : PGconn *conn;
1587 8 : int status = POSTMASTER_STILL_STARTING;
1588 8 : int timer = 0;
1589 :
1590 8 : pg_log_info("waiting for the target server to reach the consistent state");
1591 :
1592 8 : conn = connect_database(conninfo, true);
1593 :
1594 : for (;;)
1595 : {
1596 : /* Did the recovery process finish? We're done if so. */
1597 8 : if (dry_run || !server_is_in_recovery(conn))
1598 : {
1599 8 : status = POSTMASTER_READY;
1600 8 : recovery_ended = true;
1601 8 : break;
1602 : }
1603 :
1604 : /* Bail out after recovery_timeout seconds if this option is set */
1605 0 : if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
1606 : {
1607 0 : stop_standby_server(subscriber_dir);
1608 0 : pg_log_error("recovery timed out");
1609 0 : disconnect_database(conn, true);
1610 : }
1611 :
1612 : /* Keep waiting */
1613 0 : pg_usleep(WAIT_INTERVAL * USEC_PER_SEC);
1614 :
1615 0 : timer += WAIT_INTERVAL;
1616 : }
1617 :
1618 8 : disconnect_database(conn, false);
1619 :
1620 8 : if (status == POSTMASTER_STILL_STARTING)
1621 0 : pg_fatal("server did not end recovery");
1622 :
1623 8 : pg_log_info("target server reached the consistent state");
1624 8 : pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
1625 8 : }
1626 :
1627 : /*
1628 : * Create a publication that includes all tables in the database.
1629 : */
1630 : static void
1631 16 : create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
1632 : {
1633 16 : PQExpBuffer str = createPQExpBuffer();
1634 : PGresult *res;
1635 : char *ipubname_esc;
1636 : char *spubname_esc;
1637 :
1638 : Assert(conn != NULL);
1639 :
1640 16 : ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1641 16 : spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1642 :
1643 : /* Check if the publication already exists */
1644 16 : appendPQExpBuffer(str,
1645 : "SELECT 1 FROM pg_catalog.pg_publication "
1646 : "WHERE pubname = %s",
1647 : spubname_esc);
1648 16 : res = PQexec(conn, str->data);
1649 16 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1650 : {
1651 0 : pg_log_error("could not obtain publication information: %s",
1652 : PQresultErrorMessage(res));
1653 0 : disconnect_database(conn, true);
1654 : }
1655 :
1656 16 : if (PQntuples(res) == 1)
1657 : {
1658 : /*
1659 : * Unfortunately, if it reaches this code path, it will always fail
1660 : * (unless you decide to change the existing publication name). That's
1661 : * bad but it is very unlikely that the user will choose a name with
1662 : * pg_createsubscriber_ prefix followed by the exact database oid and
1663 : * a random number.
1664 : */
1665 0 : pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
1666 0 : pg_log_error_hint("Consider renaming this publication before continuing.");
1667 0 : disconnect_database(conn, true);
1668 : }
1669 :
1670 16 : PQclear(res);
1671 16 : resetPQExpBuffer(str);
1672 :
1673 16 : if (dry_run)
1674 12 : pg_log_info("dry-run: would create publication \"%s\" in database \"%s\"",
1675 : dbinfo->pubname, dbinfo->dbname);
1676 : else
1677 4 : pg_log_info("creating publication \"%s\" in database \"%s\"",
1678 : dbinfo->pubname, dbinfo->dbname);
1679 :
1680 16 : appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
1681 : ipubname_esc);
1682 :
1683 16 : pg_log_debug("command is: %s", str->data);
1684 :
1685 16 : if (!dry_run)
1686 : {
1687 4 : res = PQexec(conn, str->data);
1688 4 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1689 : {
1690 0 : pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
1691 : dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
1692 0 : disconnect_database(conn, true);
1693 : }
1694 4 : PQclear(res);
1695 : }
1696 :
1697 : /* For cleanup purposes */
1698 16 : dbinfo->made_publication = true;
1699 :
1700 16 : PQfreemem(ipubname_esc);
1701 16 : PQfreemem(spubname_esc);
1702 16 : destroyPQExpBuffer(str);
1703 16 : }
1704 :
1705 : /*
1706 : * Drop the specified publication in the given database.
1707 : */
1708 : static void
1709 20 : drop_publication(PGconn *conn, const char *pubname, const char *dbname,
1710 : bool *made_publication)
1711 : {
1712 20 : PQExpBuffer str = createPQExpBuffer();
1713 : PGresult *res;
1714 : char *pubname_esc;
1715 :
1716 : Assert(conn != NULL);
1717 :
1718 20 : pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
1719 :
1720 20 : if (dry_run)
1721 12 : pg_log_info("dry-run: would drop publication \"%s\" in database \"%s\"",
1722 : pubname, dbname);
1723 : else
1724 8 : pg_log_info("dropping publication \"%s\" in database \"%s\"",
1725 : pubname, dbname);
1726 :
1727 20 : appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
1728 :
1729 20 : PQfreemem(pubname_esc);
1730 :
1731 20 : pg_log_debug("command is: %s", str->data);
1732 :
1733 20 : if (!dry_run)
1734 : {
1735 8 : res = PQexec(conn, str->data);
1736 8 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1737 : {
1738 0 : pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
1739 : pubname, dbname, PQresultErrorMessage(res));
1740 0 : *made_publication = false; /* don't try again. */
1741 :
1742 : /*
1743 : * Don't disconnect and exit here. This routine is used by primary
1744 : * (cleanup publication / replication slot due to an error) and
1745 : * subscriber (remove the replicated publications). In both cases,
1746 : * it can continue and provide instructions for the user to remove
1747 : * it later if cleanup fails.
1748 : */
1749 : }
1750 8 : PQclear(res);
1751 : }
1752 :
1753 20 : destroyPQExpBuffer(str);
1754 20 : }
1755 :
1756 : /*
1757 : * Retrieve and drop the publications.
1758 : *
1759 : * Since the publications were created before the consistent LSN, they
1760 : * remain on the subscriber even after the physical replica is
1761 : * promoted. Remove these publications from the subscriber because
1762 : * they have no use. Additionally, if requested, drop all pre-existing
1763 : * publications.
1764 : */
1765 : static void
1766 16 : check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
1767 : {
1768 : PGresult *res;
1769 16 : bool drop_all_pubs = dbinfos.objecttypes_to_clean & OBJECTTYPE_PUBLICATIONS;
1770 :
1771 : Assert(conn != NULL);
1772 :
1773 16 : if (drop_all_pubs)
1774 : {
1775 4 : pg_log_info("dropping all existing publications in database \"%s\"",
1776 : dbinfo->dbname);
1777 :
1778 : /* Fetch all publication names */
1779 4 : res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
1780 4 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1781 : {
1782 0 : pg_log_error("could not obtain publication information: %s",
1783 : PQresultErrorMessage(res));
1784 0 : PQclear(res);
1785 0 : disconnect_database(conn, true);
1786 : }
1787 :
1788 : /* Drop each publication */
1789 12 : for (int i = 0; i < PQntuples(res); i++)
1790 8 : drop_publication(conn, PQgetvalue(res, i, 0), dbinfo->dbname,
1791 : &dbinfo->made_publication);
1792 :
1793 4 : PQclear(res);
1794 : }
1795 :
1796 : /*
1797 : * In dry-run mode, we don't create publications, but we still try to drop
1798 : * those to provide necessary information to the user.
1799 : */
1800 16 : if (!drop_all_pubs || dry_run)
1801 12 : drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
1802 : &dbinfo->made_publication);
1803 16 : }
1804 :
1805 : /*
1806 : * Create a subscription with some predefined options.
1807 : *
1808 : * A replication slot was already created in a previous step. Let's use it. It
1809 : * is not required to copy data. The subscription will be created but it will
1810 : * not be enabled now. That's because the replication progress must be set and
1811 : * the replication origin name (one of the function arguments) contains the
1812 : * subscription OID in its name. Once the subscription is created,
1813 : * set_replication_progress() can obtain the chosen origin name and set up its
1814 : * initial location.
1815 : */
1816 : static void
1817 16 : create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
1818 : {
1819 16 : PQExpBuffer str = createPQExpBuffer();
1820 : PGresult *res;
1821 : char *pubname_esc;
1822 : char *subname_esc;
1823 : char *pubconninfo_esc;
1824 : char *replslotname_esc;
1825 :
1826 : Assert(conn != NULL);
1827 :
1828 16 : pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1829 16 : subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1830 16 : pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
1831 16 : replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
1832 :
1833 16 : if (dry_run)
1834 12 : pg_log_info("dry-run: would create subscription \"%s\" in database \"%s\"",
1835 : dbinfo->subname, dbinfo->dbname);
1836 : else
1837 4 : pg_log_info("creating subscription \"%s\" in database \"%s\"",
1838 : dbinfo->subname, dbinfo->dbname);
1839 :
1840 16 : appendPQExpBuffer(str,
1841 : "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
1842 : "WITH (create_slot = false, enabled = false, "
1843 : "slot_name = %s, copy_data = false, two_phase = %s)",
1844 : subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
1845 16 : dbinfos.two_phase ? "true" : "false");
1846 :
1847 16 : PQfreemem(pubname_esc);
1848 16 : PQfreemem(subname_esc);
1849 16 : PQfreemem(pubconninfo_esc);
1850 16 : PQfreemem(replslotname_esc);
1851 :
1852 16 : pg_log_debug("command is: %s", str->data);
1853 :
1854 16 : if (!dry_run)
1855 : {
1856 4 : res = PQexec(conn, str->data);
1857 4 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1858 : {
1859 0 : pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
1860 : dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
1861 0 : disconnect_database(conn, true);
1862 : }
1863 4 : PQclear(res);
1864 : }
1865 :
1866 16 : destroyPQExpBuffer(str);
1867 16 : }
1868 :
1869 : /*
1870 : * Sets the replication progress to the consistent LSN.
1871 : *
1872 : * The subscriber caught up to the consistent LSN provided by the last
1873 : * replication slot that was created. The goal is to set up the initial
1874 : * location for the logical replication that is the exact LSN that the
1875 : * subscriber was promoted. Once the subscription is enabled it will start
1876 : * streaming from that location onwards. In dry run mode, the subscription OID
1877 : * and LSN are set to invalid values for printing purposes.
1878 : */
1879 : static void
1880 16 : set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
1881 : {
1882 16 : PQExpBuffer str = createPQExpBuffer();
1883 : PGresult *res;
1884 : Oid suboid;
1885 : char *subname;
1886 : char *dbname;
1887 : char *originname;
1888 : char *lsnstr;
1889 :
1890 : Assert(conn != NULL);
1891 :
1892 16 : subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
1893 16 : dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1894 :
1895 16 : appendPQExpBuffer(str,
1896 : "SELECT s.oid FROM pg_catalog.pg_subscription s "
1897 : "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1898 : "WHERE s.subname = %s AND d.datname = %s",
1899 : subname, dbname);
1900 :
1901 16 : res = PQexec(conn, str->data);
1902 16 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1903 : {
1904 0 : pg_log_error("could not obtain subscription OID: %s",
1905 : PQresultErrorMessage(res));
1906 0 : disconnect_database(conn, true);
1907 : }
1908 :
1909 16 : if (PQntuples(res) != 1 && !dry_run)
1910 : {
1911 0 : pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
1912 : PQntuples(res), 1);
1913 0 : disconnect_database(conn, true);
1914 : }
1915 :
1916 16 : if (dry_run)
1917 : {
1918 12 : suboid = InvalidOid;
1919 12 : lsnstr = psprintf("%X/%08X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
1920 : }
1921 : else
1922 : {
1923 4 : suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
1924 4 : lsnstr = psprintf("%s", lsn);
1925 : }
1926 :
1927 16 : PQclear(res);
1928 :
1929 : /*
1930 : * The origin name is defined as pg_%u. %u is the subscription OID. See
1931 : * ApplyWorkerMain().
1932 : */
1933 16 : originname = psprintf("pg_%u", suboid);
1934 :
1935 16 : if (dry_run)
1936 12 : pg_log_info("dry-run: would set the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
1937 : originname, lsnstr, dbinfo->dbname);
1938 : else
1939 4 : pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
1940 : originname, lsnstr, dbinfo->dbname);
1941 :
1942 16 : resetPQExpBuffer(str);
1943 16 : appendPQExpBuffer(str,
1944 : "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
1945 : originname, lsnstr);
1946 :
1947 16 : pg_log_debug("command is: %s", str->data);
1948 :
1949 16 : if (!dry_run)
1950 : {
1951 4 : res = PQexec(conn, str->data);
1952 4 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1953 : {
1954 0 : pg_log_error("could not set replication progress for subscription \"%s\": %s",
1955 : dbinfo->subname, PQresultErrorMessage(res));
1956 0 : disconnect_database(conn, true);
1957 : }
1958 4 : PQclear(res);
1959 : }
1960 :
1961 16 : PQfreemem(subname);
1962 16 : PQfreemem(dbname);
1963 16 : pg_free(originname);
1964 16 : pg_free(lsnstr);
1965 16 : destroyPQExpBuffer(str);
1966 16 : }
1967 :
1968 : /*
1969 : * Enables the subscription.
1970 : *
1971 : * The subscription was created in a previous step but it was disabled. After
1972 : * adjusting the initial logical replication location, enable the subscription.
1973 : */
1974 : static void
1975 16 : enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
1976 : {
1977 16 : PQExpBuffer str = createPQExpBuffer();
1978 : PGresult *res;
1979 : char *subname;
1980 :
1981 : Assert(conn != NULL);
1982 :
1983 16 : subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1984 :
1985 16 : if (dry_run)
1986 12 : pg_log_info("dry-run: would enable subscription \"%s\" in database \"%s\"",
1987 : dbinfo->subname, dbinfo->dbname);
1988 : else
1989 4 : pg_log_info("enabling subscription \"%s\" in database \"%s\"",
1990 : dbinfo->subname, dbinfo->dbname);
1991 :
1992 16 : appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
1993 :
1994 16 : pg_log_debug("command is: %s", str->data);
1995 :
1996 16 : if (!dry_run)
1997 : {
1998 4 : res = PQexec(conn, str->data);
1999 4 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
2000 : {
2001 0 : pg_log_error("could not enable subscription \"%s\": %s",
2002 : dbinfo->subname, PQresultErrorMessage(res));
2003 0 : disconnect_database(conn, true);
2004 : }
2005 :
2006 4 : PQclear(res);
2007 : }
2008 :
2009 16 : PQfreemem(subname);
2010 16 : destroyPQExpBuffer(str);
2011 16 : }
2012 :
2013 : /*
2014 : * Fetch a list of all connectable non-template databases from the source server
2015 : * and form a list such that they appear as if the user has specified multiple
2016 : * --database options, one for each source database.
2017 : */
2018 : static void
2019 2 : get_publisher_databases(struct CreateSubscriberOptions *opt,
2020 : bool dbnamespecified)
2021 : {
2022 : PGconn *conn;
2023 : PGresult *res;
2024 :
2025 : /* If a database name was specified, just connect to it. */
2026 2 : if (dbnamespecified)
2027 0 : conn = connect_database(opt->pub_conninfo_str, true);
2028 : else
2029 : {
2030 : /* Otherwise, try postgres first and then template1. */
2031 : char *conninfo;
2032 :
2033 2 : conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "postgres");
2034 2 : conn = connect_database(conninfo, false);
2035 2 : pg_free(conninfo);
2036 2 : if (!conn)
2037 : {
2038 0 : conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "template1");
2039 0 : conn = connect_database(conninfo, true);
2040 0 : pg_free(conninfo);
2041 : }
2042 : }
2043 :
2044 2 : res = PQexec(conn, "SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1");
2045 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
2046 : {
2047 0 : pg_log_error("could not obtain a list of databases: %s", PQresultErrorMessage(res));
2048 0 : PQclear(res);
2049 0 : disconnect_database(conn, true);
2050 : }
2051 :
2052 8 : for (int i = 0; i < PQntuples(res); i++)
2053 : {
2054 6 : const char *dbname = PQgetvalue(res, i, 0);
2055 :
2056 6 : simple_string_list_append(&opt->database_names, dbname);
2057 :
2058 : /* Increment num_dbs to reflect multiple --database options */
2059 6 : num_dbs++;
2060 : }
2061 :
2062 2 : PQclear(res);
2063 2 : disconnect_database(conn, false);
2064 2 : }
2065 :
2066 : int
2067 46 : main(int argc, char **argv)
2068 : {
2069 : static struct option long_options[] =
2070 : {
2071 : {"all", no_argument, NULL, 'a'},
2072 : {"database", required_argument, NULL, 'd'},
2073 : {"pgdata", required_argument, NULL, 'D'},
2074 : {"dry-run", no_argument, NULL, 'n'},
2075 : {"subscriber-port", required_argument, NULL, 'p'},
2076 : {"publisher-server", required_argument, NULL, 'P'},
2077 : {"socketdir", required_argument, NULL, 's'},
2078 : {"recovery-timeout", required_argument, NULL, 't'},
2079 : {"enable-two-phase", no_argument, NULL, 'T'},
2080 : {"subscriber-username", required_argument, NULL, 'U'},
2081 : {"verbose", no_argument, NULL, 'v'},
2082 : {"version", no_argument, NULL, 'V'},
2083 : {"help", no_argument, NULL, '?'},
2084 : {"config-file", required_argument, NULL, 1},
2085 : {"publication", required_argument, NULL, 2},
2086 : {"replication-slot", required_argument, NULL, 3},
2087 : {"subscription", required_argument, NULL, 4},
2088 : {"clean", required_argument, NULL, 5},
2089 : {NULL, 0, NULL, 0}
2090 : };
2091 :
2092 46 : struct CreateSubscriberOptions opt = {0};
2093 :
2094 : int c;
2095 : int option_index;
2096 :
2097 : char *pub_base_conninfo;
2098 : char *sub_base_conninfo;
2099 46 : char *dbname_conninfo = NULL;
2100 :
2101 : uint64 pub_sysid;
2102 : uint64 sub_sysid;
2103 : struct stat statbuf;
2104 :
2105 : char *consistent_lsn;
2106 :
2107 : char pidfile[MAXPGPATH];
2108 :
2109 46 : pg_logging_init(argv[0]);
2110 46 : pg_logging_set_level(PG_LOG_WARNING);
2111 46 : progname = get_progname(argv[0]);
2112 46 : set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
2113 :
2114 46 : if (argc > 1)
2115 : {
2116 44 : if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2117 : {
2118 2 : usage();
2119 2 : exit(0);
2120 : }
2121 42 : else if (strcmp(argv[1], "-V") == 0
2122 42 : || strcmp(argv[1], "--version") == 0)
2123 : {
2124 2 : puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
2125 2 : exit(0);
2126 : }
2127 : }
2128 :
2129 : /* Default settings */
2130 42 : subscriber_dir = NULL;
2131 42 : opt.config_file = NULL;
2132 42 : opt.pub_conninfo_str = NULL;
2133 42 : opt.socket_dir = NULL;
2134 42 : opt.sub_port = DEFAULT_SUB_PORT;
2135 42 : opt.sub_username = NULL;
2136 42 : opt.two_phase = false;
2137 42 : opt.database_names = (SimpleStringList)
2138 : {
2139 : 0
2140 : };
2141 42 : opt.recovery_timeout = 0;
2142 42 : opt.all_dbs = false;
2143 :
2144 : /*
2145 : * Don't allow it to be run as root. It uses pg_ctl which does not allow
2146 : * it either.
2147 : */
2148 : #ifndef WIN32
2149 42 : if (geteuid() == 0)
2150 : {
2151 0 : pg_log_error("cannot be executed by \"root\"");
2152 0 : pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
2153 : progname);
2154 0 : exit(1);
2155 : }
2156 : #endif
2157 :
2158 42 : get_restricted_token();
2159 :
2160 324 : while ((c = getopt_long(argc, argv, "ad:D:np:P:s:t:TU:v",
2161 324 : long_options, &option_index)) != -1)
2162 : {
2163 288 : switch (c)
2164 : {
2165 6 : case 'a':
2166 6 : opt.all_dbs = true;
2167 6 : break;
2168 50 : case 'd':
2169 50 : if (!simple_string_list_member(&opt.database_names, optarg))
2170 : {
2171 48 : simple_string_list_append(&opt.database_names, optarg);
2172 48 : num_dbs++;
2173 : }
2174 : else
2175 2 : pg_fatal("database \"%s\" specified more than once for -d/--database", optarg);
2176 48 : break;
2177 38 : case 'D':
2178 38 : subscriber_dir = pg_strdup(optarg);
2179 38 : canonicalize_path(subscriber_dir);
2180 38 : break;
2181 18 : case 'n':
2182 18 : dry_run = true;
2183 18 : break;
2184 24 : case 'p':
2185 24 : opt.sub_port = pg_strdup(optarg);
2186 24 : break;
2187 36 : case 'P':
2188 36 : opt.pub_conninfo_str = pg_strdup(optarg);
2189 36 : break;
2190 24 : case 's':
2191 24 : opt.socket_dir = pg_strdup(optarg);
2192 24 : canonicalize_path(opt.socket_dir);
2193 24 : break;
2194 6 : case 't':
2195 6 : opt.recovery_timeout = atoi(optarg);
2196 6 : break;
2197 2 : case 'T':
2198 2 : opt.two_phase = true;
2199 2 : break;
2200 0 : case 'U':
2201 0 : opt.sub_username = pg_strdup(optarg);
2202 0 : break;
2203 38 : case 'v':
2204 38 : pg_logging_increase_verbosity();
2205 38 : break;
2206 0 : case 1:
2207 0 : opt.config_file = pg_strdup(optarg);
2208 0 : break;
2209 24 : case 2:
2210 24 : if (!simple_string_list_member(&opt.pub_names, optarg))
2211 : {
2212 22 : simple_string_list_append(&opt.pub_names, optarg);
2213 22 : num_pubs++;
2214 : }
2215 : else
2216 2 : pg_fatal("publication \"%s\" specified more than once for --publication", optarg);
2217 22 : break;
2218 8 : case 3:
2219 8 : if (!simple_string_list_member(&opt.replslot_names, optarg))
2220 : {
2221 8 : simple_string_list_append(&opt.replslot_names, optarg);
2222 8 : num_replslots++;
2223 : }
2224 : else
2225 0 : pg_fatal("replication slot \"%s\" specified more than once for --replication-slot", optarg);
2226 8 : break;
2227 10 : case 4:
2228 10 : if (!simple_string_list_member(&opt.sub_names, optarg))
2229 : {
2230 10 : simple_string_list_append(&opt.sub_names, optarg);
2231 10 : num_subs++;
2232 : }
2233 : else
2234 0 : pg_fatal("subscription \"%s\" specified more than once for --subscription", optarg);
2235 10 : break;
2236 2 : case 5:
2237 2 : if (!simple_string_list_member(&opt.objecttypes_to_clean, optarg))
2238 2 : simple_string_list_append(&opt.objecttypes_to_clean, optarg);
2239 : else
2240 0 : pg_fatal("object type \"%s\" specified more than once for --clean", optarg);
2241 2 : break;
2242 2 : default:
2243 : /* getopt_long already emitted a complaint */
2244 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2245 2 : exit(1);
2246 : }
2247 : }
2248 :
2249 : /* Validate that --all is not used with incompatible options */
2250 36 : if (opt.all_dbs)
2251 : {
2252 6 : char *bad_switch = NULL;
2253 :
2254 6 : if (num_dbs > 0)
2255 2 : bad_switch = "--database";
2256 4 : else if (num_pubs > 0)
2257 2 : bad_switch = "--publication";
2258 2 : else if (num_replslots > 0)
2259 0 : bad_switch = "--replication-slot";
2260 2 : else if (num_subs > 0)
2261 0 : bad_switch = "--subscription";
2262 :
2263 6 : if (bad_switch)
2264 : {
2265 4 : pg_log_error("options %s and -a/--all cannot be used together", bad_switch);
2266 4 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2267 4 : exit(1);
2268 : }
2269 : }
2270 :
2271 : /* Any non-option arguments? */
2272 32 : if (optind < argc)
2273 : {
2274 0 : pg_log_error("too many command-line arguments (first is \"%s\")",
2275 : argv[optind]);
2276 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2277 0 : exit(1);
2278 : }
2279 :
2280 : /* Required arguments */
2281 32 : if (subscriber_dir == NULL)
2282 : {
2283 2 : pg_log_error("no subscriber data directory specified");
2284 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2285 2 : exit(1);
2286 : }
2287 :
2288 : /* If socket directory is not provided, use the current directory */
2289 30 : if (opt.socket_dir == NULL)
2290 : {
2291 : char cwd[MAXPGPATH];
2292 :
2293 10 : if (!getcwd(cwd, MAXPGPATH))
2294 0 : pg_fatal("could not determine current directory");
2295 10 : opt.socket_dir = pg_strdup(cwd);
2296 10 : canonicalize_path(opt.socket_dir);
2297 : }
2298 :
2299 : /*
2300 : * Parse connection string. Build a base connection string that might be
2301 : * reused by multiple databases.
2302 : */
2303 30 : if (opt.pub_conninfo_str == NULL)
2304 : {
2305 : /*
2306 : * TODO use primary_conninfo (if available) from subscriber and
2307 : * extract publisher connection string. Assume that there are
2308 : * identical entries for physical and logical replication. If there is
2309 : * not, we would fail anyway.
2310 : */
2311 2 : pg_log_error("no publisher connection string specified");
2312 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2313 2 : exit(1);
2314 : }
2315 28 : pg_log_info("validating publisher connection string");
2316 28 : pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
2317 : &dbname_conninfo);
2318 28 : if (pub_base_conninfo == NULL)
2319 0 : exit(1);
2320 :
2321 28 : pg_log_info("validating subscriber connection string");
2322 28 : sub_base_conninfo = get_sub_conninfo(&opt);
2323 :
2324 : /*
2325 : * Fetch all databases from the source (publisher) and treat them as if
2326 : * the user specified has multiple --database options, one for each source
2327 : * database.
2328 : */
2329 28 : if (opt.all_dbs)
2330 : {
2331 2 : bool dbnamespecified = (dbname_conninfo != NULL);
2332 :
2333 2 : get_publisher_databases(&opt, dbnamespecified);
2334 : }
2335 :
2336 28 : if (opt.database_names.head == NULL)
2337 : {
2338 4 : pg_log_info("no database was specified");
2339 :
2340 : /*
2341 : * Try to obtain the dbname from the publisher conninfo. If dbname
2342 : * parameter is not available, error out.
2343 : */
2344 4 : if (dbname_conninfo)
2345 : {
2346 2 : simple_string_list_append(&opt.database_names, dbname_conninfo);
2347 2 : num_dbs++;
2348 :
2349 2 : pg_log_info("database name \"%s\" was extracted from the publisher connection string",
2350 : dbname_conninfo);
2351 : }
2352 : else
2353 : {
2354 2 : pg_log_error("no database name specified");
2355 2 : pg_log_error_hint("Try \"%s --help\" for more information.",
2356 : progname);
2357 2 : exit(1);
2358 : }
2359 : }
2360 :
2361 : /* Number of object names must match number of databases */
2362 26 : if (num_pubs > 0 && num_pubs != num_dbs)
2363 : {
2364 2 : pg_log_error("wrong number of publication names specified");
2365 2 : pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
2366 : num_pubs, num_dbs);
2367 2 : exit(1);
2368 : }
2369 24 : if (num_subs > 0 && num_subs != num_dbs)
2370 : {
2371 2 : pg_log_error("wrong number of subscription names specified");
2372 2 : pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
2373 : num_subs, num_dbs);
2374 2 : exit(1);
2375 : }
2376 22 : if (num_replslots > 0 && num_replslots != num_dbs)
2377 : {
2378 2 : pg_log_error("wrong number of replication slot names specified");
2379 2 : pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
2380 : num_replslots, num_dbs);
2381 2 : exit(1);
2382 : }
2383 :
2384 : /* Verify the object types specified for removal from the subscriber */
2385 22 : for (SimpleStringListCell *cell = opt.objecttypes_to_clean.head; cell; cell = cell->next)
2386 : {
2387 2 : if (pg_strcasecmp(cell->val, "publications") == 0)
2388 2 : dbinfos.objecttypes_to_clean |= OBJECTTYPE_PUBLICATIONS;
2389 : else
2390 : {
2391 0 : pg_log_error("invalid object type \"%s\" specified for --clean", cell->val);
2392 0 : pg_log_error_hint("The valid value is: \"%s\"", "publications");
2393 0 : exit(1);
2394 : }
2395 : }
2396 :
2397 : /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
2398 20 : pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
2399 20 : pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
2400 :
2401 : /* Rudimentary check for a data directory */
2402 20 : check_data_directory(subscriber_dir);
2403 :
2404 20 : dbinfos.two_phase = opt.two_phase;
2405 :
2406 : /*
2407 : * Store database information for publisher and subscriber. It should be
2408 : * called before atexit() because its return is used in the
2409 : * cleanup_objects_atexit().
2410 : */
2411 20 : dbinfos.dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
2412 :
2413 : /* Register a function to clean up objects in case of failure */
2414 20 : atexit(cleanup_objects_atexit);
2415 :
2416 : /*
2417 : * Check if the subscriber data directory has the same system identifier
2418 : * than the publisher data directory.
2419 : */
2420 20 : pub_sysid = get_primary_sysid(dbinfos.dbinfo[0].pubconninfo);
2421 20 : sub_sysid = get_standby_sysid(subscriber_dir);
2422 20 : if (pub_sysid != sub_sysid)
2423 2 : pg_fatal("subscriber data directory is not a copy of the source database cluster");
2424 :
2425 : /* Subscriber PID file */
2426 18 : snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
2427 :
2428 : /*
2429 : * The standby server must not be running. If the server is started under
2430 : * service manager and pg_createsubscriber stops it, the service manager
2431 : * might react to this action and start the server again. Therefore,
2432 : * refuse to proceed if the server is running to avoid possible failures.
2433 : */
2434 18 : if (stat(pidfile, &statbuf) == 0)
2435 : {
2436 2 : pg_log_error("standby server is running");
2437 2 : pg_log_error_hint("Stop the standby server and try again.");
2438 2 : exit(1);
2439 : }
2440 :
2441 : /*
2442 : * Start a short-lived standby server with temporary parameters (provided
2443 : * by command-line options). The goal is to avoid connections during the
2444 : * transformation steps.
2445 : */
2446 16 : pg_log_info("starting the standby server with command-line options");
2447 16 : start_standby_server(&opt, true, false);
2448 :
2449 : /* Check if the standby server is ready for logical replication */
2450 16 : check_subscriber(dbinfos.dbinfo);
2451 :
2452 : /* Check if the primary server is ready for logical replication */
2453 12 : check_publisher(dbinfos.dbinfo);
2454 :
2455 : /*
2456 : * Stop the target server. The recovery process requires that the server
2457 : * reaches a consistent state before targeting the recovery stop point.
2458 : * Make sure a consistent state is reached (stop the target server
2459 : * guarantees it) *before* creating the replication slots in
2460 : * setup_publisher().
2461 : */
2462 8 : pg_log_info("stopping the subscriber");
2463 8 : stop_standby_server(subscriber_dir);
2464 :
2465 : /* Create the required objects for each database on publisher */
2466 8 : consistent_lsn = setup_publisher(dbinfos.dbinfo);
2467 :
2468 : /* Write the required recovery parameters */
2469 8 : setup_recovery(dbinfos.dbinfo, subscriber_dir, consistent_lsn);
2470 :
2471 : /*
2472 : * Start subscriber so the recovery parameters will take effect. Wait
2473 : * until accepting connections. We don't want to start logical replication
2474 : * during setup.
2475 : */
2476 8 : pg_log_info("starting the subscriber");
2477 8 : start_standby_server(&opt, true, true);
2478 :
2479 : /* Waiting the subscriber to be promoted */
2480 8 : wait_for_end_recovery(dbinfos.dbinfo[0].subconninfo, &opt);
2481 :
2482 : /*
2483 : * Create the subscription for each database on subscriber. It does not
2484 : * enable it immediately because it needs to adjust the replication start
2485 : * point to the LSN reported by setup_publisher(). It also cleans up
2486 : * publications created by this tool and replication to the standby.
2487 : */
2488 8 : setup_subscriber(dbinfos.dbinfo, consistent_lsn);
2489 :
2490 : /* Remove primary_slot_name if it exists on primary */
2491 8 : drop_primary_replication_slot(dbinfos.dbinfo, primary_slot_name);
2492 :
2493 : /* Remove failover replication slots if they exist on subscriber */
2494 8 : drop_failover_replication_slots(dbinfos.dbinfo);
2495 :
2496 : /* Stop the subscriber */
2497 8 : pg_log_info("stopping the subscriber");
2498 8 : stop_standby_server(subscriber_dir);
2499 :
2500 : /* Change system identifier from subscriber */
2501 8 : modify_subscriber_sysid(&opt);
2502 :
2503 8 : success = true;
2504 :
2505 8 : pg_log_info("Done!");
2506 :
2507 8 : return 0;
2508 : }
|