Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_createsubscriber.c
4 : * Create a new logical replica from a standby server
5 : *
6 : * Copyright (c) 2024-2025, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/bin/pg_basebackup/pg_createsubscriber.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 :
14 : #include "postgres_fe.h"
15 :
16 : #include <sys/stat.h>
17 : #include <sys/time.h>
18 : #include <sys/wait.h>
19 : #include <time.h>
20 :
21 : #include "common/connect.h"
22 : #include "common/controldata_utils.h"
23 : #include "common/logging.h"
24 : #include "common/pg_prng.h"
25 : #include "common/restricted_token.h"
26 : #include "fe_utils/recovery_gen.h"
27 : #include "fe_utils/simple_list.h"
28 : #include "fe_utils/string_utils.h"
29 : #include "getopt_long.h"
30 :
31 : #define DEFAULT_SUB_PORT "50432" /* default subscriber port, as a string; printed by usage() as the -p default */
32 : #define OBJECTTYPE_PUBLICATIONS 0x0001 /* bit flag for the "publications" object type; presumably stored in LogicalRepInfos.objecttypes_to_clean -- confirm against the option parser */
33 :
34 : /* Command-line options; fields mirror the options listed by usage() (the option parser itself is outside this view) */
35 : struct CreateSubscriberOptions
36 : {
37 : char *config_file; /* configuration file (--config-file in usage()) */
38 : char *pub_conninfo_str; /* publisher connection string */
39 : char *socket_dir; /* directory for Unix-domain socket, if any; used as "host" by get_sub_conninfo() on non-Windows builds */
40 : char *sub_port; /* subscriber port number, kept as a string; used as "port" by get_sub_conninfo() */
41 : const char *sub_username; /* subscriber username; when NULL, get_sub_conninfo() adds no "user" item */
42 : bool two_phase; /* enable-two-phase option */
43 : SimpleStringList database_names; /* list of database names; store_pub_sub_info() creates one entry per cell */
44 : SimpleStringList pub_names; /* list of publication names, consumed in step with database_names */
45 : SimpleStringList sub_names; /* list of subscription names, consumed in step with database_names */
46 : SimpleStringList replslot_names; /* list of replication slot names, consumed in step with database_names */
47 : int recovery_timeout; /* stop recovery after this time */
48 : bool all_dbs; /* all option */
49 : SimpleStringList objecttypes_to_clean; /* list of object types to cleanup */
50 : };
51 :
52 : /* per-database publication/subscription info; store_pub_sub_info() builds an array with one element per database */
53 : struct LogicalRepInfo
54 : {
55 : char *dbname; /* database name (store_pub_sub_info() points this at the option list cell, it is not copied) */
56 : char *pubconninfo; /* publisher connection string, including dbname */
57 : char *subconninfo; /* subscriber connection string, including dbname */
58 : char *pubname; /* publication name; NULL until assigned when not given by the user */
59 : char *subname; /* subscription name; NULL until assigned when not given by the user */
60 : char *replslotname; /* replication slot name; NULL until assigned when not given by the user */
61 :
62 : bool made_replslot; /* replication slot was created; tells cleanup_objects_atexit() to drop it */
63 : bool made_publication; /* publication was created; tells cleanup_objects_atexit() to drop it */
64 : };
65 :
66 : /*
67 : * State shared across all the databases (or publications and
68 : * subscriptions), plus the array of per-database entries.
69 : */
70 : struct LogicalRepInfos
71 : {
72 : struct LogicalRepInfo *dbinfo; /* array of per-database entries; cleanup_objects_atexit() walks indexes 0 .. num_dbs - 1 */
73 : bool two_phase; /* enable-two-phase option */
74 : bits32 objecttypes_to_clean; /* flags indicating which object types
75 : * to clean up on subscriber */
76 : };
77 :
78 : static void cleanup_objects_atexit(void);
79 : static void usage();
80 : static char *get_base_conninfo(const char *conninfo, char **dbname);
81 : static char *get_sub_conninfo(const struct CreateSubscriberOptions *opt);
82 : static char *get_exec_path(const char *argv0, const char *progname);
83 : static void check_data_directory(const char *datadir);
84 : static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
85 : static struct LogicalRepInfo *store_pub_sub_info(const struct CreateSubscriberOptions *opt,
86 : const char *pub_base_conninfo,
87 : const char *sub_base_conninfo);
88 : static PGconn *connect_database(const char *conninfo, bool exit_on_error);
89 : static void disconnect_database(PGconn *conn, bool exit_on_error);
90 : static uint64 get_primary_sysid(const char *conninfo);
91 : static uint64 get_standby_sysid(const char *datadir);
92 : static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
93 : static bool server_is_in_recovery(PGconn *conn);
94 : static char *generate_object_name(PGconn *conn);
95 : static void check_publisher(const struct LogicalRepInfo *dbinfo);
96 : static char *setup_publisher(struct LogicalRepInfo *dbinfo);
97 : static void check_subscriber(const struct LogicalRepInfo *dbinfo);
98 : static void setup_subscriber(struct LogicalRepInfo *dbinfo,
99 : const char *consistent_lsn);
100 : static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
101 : const char *lsn);
102 : static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
103 : const char *slotname);
104 : static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo);
105 : static char *create_logical_replication_slot(PGconn *conn,
106 : struct LogicalRepInfo *dbinfo);
107 : static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
108 : const char *slot_name);
109 : static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
110 : static void start_standby_server(const struct CreateSubscriberOptions *opt,
111 : bool restricted_access,
112 : bool restrict_logical_worker);
113 : static void stop_standby_server(const char *datadir);
114 : static void wait_for_end_recovery(const char *conninfo,
115 : const struct CreateSubscriberOptions *opt);
116 : static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
117 : static void drop_publication(PGconn *conn, const char *pubname,
118 : const char *dbname, bool *made_publication);
119 : static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
120 : static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
121 : static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
122 : const char *lsn);
123 : static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
124 : static void check_and_drop_existing_subscriptions(PGconn *conn,
125 : const struct LogicalRepInfo *dbinfo);
126 : static void drop_existing_subscriptions(PGconn *conn, const char *subname,
127 : const char *dbname);
128 : static void get_publisher_databases(struct CreateSubscriberOptions *opt,
129 : bool dbnamespecified);
130 :
131 : #define USEC_PER_SEC 1000000 /* microseconds in one second */
132 : #define WAIT_INTERVAL 1 /* 1 second */
133 :
134 : static const char *progname; /* this program's name; printed by usage() and sent as fallback_application_name */
135 :
136 : static char *primary_slot_name = NULL; /* not referenced in the part of the file visible here */
137 : static bool dry_run = false; /* when true, modify_subscriber_sysid() skips update_controlfile() and pg_resetwal */
138 :
139 : static bool success = false; /* when true, cleanup_objects_atexit() returns without doing anything */
140 :
141 : static struct LogicalRepInfos dbinfos; /* shared state plus the per-database array */
142 : static int num_dbs = 0; /* number of specified databases */
143 : static int num_pubs = 0; /* number of specified publications */
144 : static int num_subs = 0; /* number of specified subscriptions */
145 : static int num_replslots = 0; /* number of specified replication slots */
146 :
147 : static pg_prng_state prng_state; /* seeded in setup_publisher(), consumed by generate_object_name() */
148 :
149 : static char *pg_ctl_path = NULL; /* presumably the pg_ctl executable path -- set outside this view */
150 : static char *pg_resetwal_path = NULL; /* pg_resetwal executable path; used to build the command in modify_subscriber_sysid() */
151 :
152 : /* standby / subscriber data directory */
153 : static char *subscriber_dir = NULL;
154 :
155 : static bool recovery_ended = false; /* when true, cleanup_objects_atexit() warns that the replica must be recreated */
156 : static bool standby_running = false; /* when true, cleanup_objects_atexit() stops the standby server */
157 :
158 : enum WaitPMResult
159 : {
160 : POSTMASTER_READY,
161 : POSTMASTER_STILL_STARTING
162 : };
163 :
164 :
165 : /*
166 : * Cleanup objects that were created by pg_createsubscriber if there is an
167 : * error.
168 : *
169 : * Publications and replication slots are created on primary. Depending on the
170 : * step it failed, it should remove the already created objects if it is
171 : * possible (sometimes it won't work due to a connection issue).
172 : * There is no cleanup on the target server. The steps on the target server are
173 : * executed *after* promotion, hence, at this point, a failure means recreate
174 : * the physical replica and start again.
175 : */
176 : static void
177 20 : cleanup_objects_atexit(void)
178 : {
179 20 : if (success)
180 8 : return;
181 :
182 : /*
183 : * If the server is promoted, there is no way to use the current setup
184 : * again. Warn the user that a new replication setup should be done before
185 : * trying again.
186 : */
187 12 : if (recovery_ended)
188 : {
189 0 : pg_log_warning("failed after the end of recovery");
190 0 : pg_log_warning_hint("The target server cannot be used as a physical replica anymore. "
191 : "You must recreate the physical replica before continuing.");
192 : }
193 :
194 36 : for (int i = 0; i < num_dbs; i++)
195 : {
196 24 : struct LogicalRepInfo *dbinfo = &dbinfos.dbinfo[i];
197 :
198 24 : if (dbinfo->made_publication || dbinfo->made_replslot)
199 : {
200 : PGconn *conn;
201 :
202 0 : conn = connect_database(dbinfo->pubconninfo, false);
203 0 : if (conn != NULL)
204 : {
205 0 : if (dbinfo->made_publication)
206 0 : drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
207 : &dbinfo->made_publication);
208 0 : if (dbinfo->made_replslot)
209 0 : drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
210 0 : disconnect_database(conn, false);
211 : }
212 : else
213 : {
214 : /*
215 : * If a connection could not be established, inform the user
216 : * that some objects were left on primary and should be
217 : * removed before trying again.
218 : */
219 0 : if (dbinfo->made_publication)
220 : {
221 0 : pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
222 : dbinfo->pubname,
223 : dbinfo->dbname);
224 0 : pg_log_warning_hint("Drop this publication before trying again.");
225 : }
226 0 : if (dbinfo->made_replslot)
227 : {
228 0 : pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
229 : dbinfo->replslotname,
230 : dbinfo->dbname);
231 0 : pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
232 : }
233 : }
234 : }
235 : }
236 :
237 12 : if (standby_running)
238 8 : stop_standby_server(subscriber_dir);
239 : }
240 : /* Print the help text (description, option list, bug-report address, home page) to stdout. */
241 : static void
242 2 : usage(void)
243 : {
244 2 : printf(_("%s creates a new logical replica from a standby server.\n\n"),
245 : progname);
246 2 : printf(_("Usage:\n"));
247 2 : printf(_(" %s [OPTION]...\n"), progname);
248 2 : printf(_("\nOptions:\n"));
249 2 : printf(_(" -a, --all create subscriptions for all databases except template\n"
250 : " databases and databases that don't allow connections\n"));
251 2 : printf(_(" -d, --database=DBNAME database in which to create a subscription\n"));
252 2 : printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
253 2 : printf(_(" -n, --dry-run dry run, just show what would be done\n"));
254 2 : printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
255 2 : printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
256 2 : printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
257 2 : printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
258 2 : printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));
259 2 : printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
260 2 : printf(_(" -v, --verbose output verbose messages\n"));
261 2 : printf(_(" --clean=OBJECTTYPE drop all objects of the specified type from specified\n"
262 : " databases on the subscriber; accepts: \"%s\"\n"), "publications");
263 2 : printf(_(" --config-file=FILENAME use specified main server configuration\n"
264 : " file when running target cluster\n"));
265 2 : printf(_(" --publication=NAME publication name\n"));
266 2 : printf(_(" --replication-slot=NAME replication slot name\n"));
267 2 : printf(_(" --subscription=NAME subscription name\n"));
268 2 : printf(_(" -V, --version output version information, then exit\n"));
269 2 : printf(_(" -?, --help show this help, then exit\n"));
270 2 : printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
271 2 : printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
272 2 : }
273 :
274 : /*
275 : * Subroutine to append "keyword=value" to a connection string,
276 : * with proper quoting of the value. (We assume keywords don't need that.)
277 : */
278 : static void
279 214 : appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
280 : {
281 214 : if (buf->len > 0)
282 158 : appendPQExpBufferChar(buf, ' ');
283 214 : appendPQExpBufferStr(buf, keyword);
284 214 : appendPQExpBufferChar(buf, '=');
285 214 : appendConnStrVal(buf, val);
286 214 : }
287 :
288 : /*
289 : * Validate a connection string. Returns a base connection string that is a
290 : * connection string without a database name.
291 : *
292 : * Since we might process multiple databases, each database name will be
293 : * appended to this base connection string to provide a final connection
294 : * string. If the second argument (dbname) is not null, returns dbname if the
295 : * provided connection string contains it.
296 : *
297 : * It is the caller's responsibility to free the returned connection string and
298 : * dbname.
299 : */
300 : static char *
301 28 : get_base_conninfo(const char *conninfo, char **dbname)
302 : {
303 : PQExpBuffer buf;
304 : PQconninfoOption *conn_opts;
305 : PQconninfoOption *conn_opt;
306 28 : char *errmsg = NULL;
307 : char *ret;
308 :
309 28 : conn_opts = PQconninfoParse(conninfo, &errmsg);
310 28 : if (conn_opts == NULL)
311 : {
312 0 : pg_log_error("could not parse connection string: %s", errmsg);
313 0 : PQfreemem(errmsg);
314 0 : return NULL;
315 : }
316 :
317 28 : buf = createPQExpBuffer();
318 1456 : for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
319 : {
320 1428 : if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
321 : {
322 66 : if (strcmp(conn_opt->keyword, "dbname") == 0)
323 : {
324 18 : if (dbname)
325 18 : *dbname = pg_strdup(conn_opt->val);
326 18 : continue;
327 : }
328 48 : appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
329 : }
330 : }
331 :
332 28 : ret = pg_strdup(buf->data);
333 :
334 28 : destroyPQExpBuffer(buf);
335 28 : PQconninfoFree(conn_opts);
336 :
337 28 : return ret;
338 : }
339 :
340 : /*
341 : * Build a subscriber connection string. Only a few parameters are supported
342 : * since it starts a server with restricted access.
343 : */
344 : static char *
345 28 : get_sub_conninfo(const struct CreateSubscriberOptions *opt)
346 : {
347 28 : PQExpBuffer buf = createPQExpBuffer();
348 : char *ret;
349 :
350 28 : appendConnStrItem(buf, "port", opt->sub_port);
351 : #if !defined(WIN32)
352 28 : appendConnStrItem(buf, "host", opt->socket_dir);
353 : #endif
354 28 : if (opt->sub_username != NULL)
355 0 : appendConnStrItem(buf, "user", opt->sub_username);
356 28 : appendConnStrItem(buf, "fallback_application_name", progname);
357 :
358 28 : ret = pg_strdup(buf->data);
359 :
360 28 : destroyPQExpBuffer(buf);
361 :
362 28 : return ret;
363 : }
364 :
365 : /*
366 : * Verify if a PostgreSQL binary (progname) is available in the same directory as
367 : * pg_createsubscriber and it has the same version. It returns the absolute
368 : * path of the progname.
369 : */
370 : static char *
371 40 : get_exec_path(const char *argv0, const char *progname)
372 : {
373 : char *versionstr;
374 : char *exec_path;
375 : int ret;
376 :
377 40 : versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
378 40 : exec_path = pg_malloc(MAXPGPATH);
379 40 : ret = find_other_exec(argv0, progname, versionstr, exec_path);
380 :
381 40 : if (ret < 0)
382 : {
383 : char full_path[MAXPGPATH];
384 :
385 0 : if (find_my_exec(argv0, full_path) < 0)
386 0 : strlcpy(full_path, progname, sizeof(full_path));
387 :
388 0 : if (ret == -1)
389 0 : pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
390 : progname, "pg_createsubscriber", full_path);
391 : else
392 0 : pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
393 : progname, full_path, "pg_createsubscriber");
394 : }
395 :
396 40 : pg_log_debug("%s path is: %s", progname, exec_path);
397 :
398 40 : return exec_path;
399 : }
400 :
401 : /*
402 : * Is it a cluster directory? These are preliminary checks. It is far from
403 : * making an accurate check. If it is not a clone from the publisher, it will
404 : * eventually fail in a future step.
405 : */
406 : static void
407 20 : check_data_directory(const char *datadir)
408 : {
409 : struct stat statbuf;
410 : char versionfile[MAXPGPATH];
411 :
412 20 : pg_log_info("checking if directory \"%s\" is a cluster data directory",
413 : datadir);
414 :
415 20 : if (stat(datadir, &statbuf) != 0)
416 : {
417 0 : if (errno == ENOENT)
418 0 : pg_fatal("data directory \"%s\" does not exist", datadir);
419 : else
420 0 : pg_fatal("could not access directory \"%s\": %m", datadir);
421 : }
422 :
423 20 : snprintf(versionfile, MAXPGPATH, "%s/PG_VERSION", datadir);
424 20 : if (stat(versionfile, &statbuf) != 0 && errno == ENOENT)
425 : {
426 0 : pg_fatal("directory \"%s\" is not a database cluster directory",
427 : datadir);
428 : }
429 20 : }
430 :
431 : /*
432 : * Append database name into a base connection string.
433 : *
434 : * dbname is the only parameter that changes so it is not included in the base
435 : * connection string. This function concatenates dbname to build a "real"
436 : * connection string.
437 : */
438 : static char *
439 82 : concat_conninfo_dbname(const char *conninfo, const char *dbname)
440 : {
441 82 : PQExpBuffer buf = createPQExpBuffer();
442 : char *ret;
443 :
444 : Assert(conninfo != NULL);
445 :
446 82 : appendPQExpBufferStr(buf, conninfo);
447 82 : appendConnStrItem(buf, "dbname", dbname);
448 :
449 82 : ret = pg_strdup(buf->data);
450 82 : destroyPQExpBuffer(buf);
451 :
452 82 : return ret;
453 : }
454 :
455 : /*
456 : * Store publication and subscription information.
457 : *
458 : * If publication, replication slot and subscription names were specified,
459 : * store it here. Otherwise, a generated name will be assigned to the object in
460 : * setup_publisher().
461 : */
462 : static struct LogicalRepInfo *
463 20 : store_pub_sub_info(const struct CreateSubscriberOptions *opt,
464 : const char *pub_base_conninfo,
465 : const char *sub_base_conninfo)
466 : {
467 : struct LogicalRepInfo *dbinfo;
468 20 : SimpleStringListCell *pubcell = NULL;
469 20 : SimpleStringListCell *subcell = NULL;
470 20 : SimpleStringListCell *replslotcell = NULL;
471 20 : int i = 0;
472 :
473 20 : dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
474 :
475 20 : if (num_pubs > 0)
476 4 : pubcell = opt->pub_names.head;
477 20 : if (num_subs > 0)
478 2 : subcell = opt->sub_names.head;
479 20 : if (num_replslots > 0)
480 4 : replslotcell = opt->replslot_names.head;
481 :
482 60 : for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
483 : {
484 : char *conninfo;
485 :
486 : /* Fill publisher attributes */
487 40 : conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
488 40 : dbinfo[i].pubconninfo = conninfo;
489 40 : dbinfo[i].dbname = cell->val;
490 40 : if (num_pubs > 0)
491 8 : dbinfo[i].pubname = pubcell->val;
492 : else
493 32 : dbinfo[i].pubname = NULL;
494 40 : if (num_replslots > 0)
495 6 : dbinfo[i].replslotname = replslotcell->val;
496 : else
497 34 : dbinfo[i].replslotname = NULL;
498 40 : dbinfo[i].made_replslot = false;
499 40 : dbinfo[i].made_publication = false;
500 : /* Fill subscriber attributes */
501 40 : conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
502 40 : dbinfo[i].subconninfo = conninfo;
503 40 : if (num_subs > 0)
504 4 : dbinfo[i].subname = subcell->val;
505 : else
506 36 : dbinfo[i].subname = NULL;
507 : /* Other fields will be filled later */
508 :
509 40 : pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
510 : dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
511 : dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
512 : dbinfo[i].pubconninfo);
513 40 : pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
514 : dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
515 : dbinfo[i].subconninfo,
516 : dbinfos.two_phase ? "true" : "false");
517 :
518 40 : if (num_pubs > 0)
519 8 : pubcell = pubcell->next;
520 40 : if (num_subs > 0)
521 4 : subcell = subcell->next;
522 40 : if (num_replslots > 0)
523 6 : replslotcell = replslotcell->next;
524 :
525 40 : i++;
526 : }
527 :
528 20 : return dbinfo;
529 : }
530 :
531 : /*
532 : * Open a new connection. If exit_on_error is true, it has an undesired
533 : * condition and it should exit immediately.
534 : */
535 : static PGconn *
536 114 : connect_database(const char *conninfo, bool exit_on_error)
537 : {
538 : PGconn *conn;
539 : PGresult *res;
540 :
541 114 : conn = PQconnectdb(conninfo);
542 114 : if (PQstatus(conn) != CONNECTION_OK)
543 : {
544 0 : pg_log_error("connection to database failed: %s",
545 : PQerrorMessage(conn));
546 0 : PQfinish(conn);
547 :
548 0 : if (exit_on_error)
549 0 : exit(1);
550 0 : return NULL;
551 : }
552 :
553 : /* Secure search_path */
554 114 : res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
555 114 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
556 : {
557 0 : pg_log_error("could not clear \"search_path\": %s",
558 : PQresultErrorMessage(res));
559 0 : PQclear(res);
560 0 : PQfinish(conn);
561 :
562 0 : if (exit_on_error)
563 0 : exit(1);
564 0 : return NULL;
565 : }
566 114 : PQclear(res);
567 :
568 114 : return conn;
569 : }
570 :
571 : /*
572 : * Close the connection. If exit_on_error is true, it has an undesired
573 : * condition and it should exit immediately.
574 : */
575 : static void
576 114 : disconnect_database(PGconn *conn, bool exit_on_error)
577 : {
578 : Assert(conn != NULL);
579 :
580 114 : PQfinish(conn);
581 :
582 114 : if (exit_on_error)
583 4 : exit(1);
584 110 : }
585 :
586 : /*
587 : * Obtain the system identifier using the provided connection. It will be used
588 : * to compare if a data directory is a clone of another one.
589 : */
590 : static uint64
591 20 : get_primary_sysid(const char *conninfo)
592 : {
593 : PGconn *conn;
594 : PGresult *res;
595 : uint64 sysid;
596 :
597 20 : pg_log_info("getting system identifier from publisher");
598 :
599 20 : conn = connect_database(conninfo, true);
600 :
601 20 : res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
602 20 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
603 : {
604 0 : pg_log_error("could not get system identifier: %s",
605 : PQresultErrorMessage(res));
606 0 : disconnect_database(conn, true);
607 : }
608 20 : if (PQntuples(res) != 1)
609 : {
610 0 : pg_log_error("could not get system identifier: got %d rows, expected %d row",
611 : PQntuples(res), 1);
612 0 : disconnect_database(conn, true);
613 : }
614 :
615 20 : sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
616 :
617 20 : pg_log_info("system identifier is %" PRIu64 " on publisher", sysid);
618 :
619 20 : PQclear(res);
620 20 : disconnect_database(conn, false);
621 :
622 20 : return sysid;
623 : }
624 :
625 : /*
626 : * Obtain the system identifier from control file. It will be used to compare
627 : * if a data directory is a clone of another one. This routine is used locally
628 : * and avoids a connection.
629 : */
630 : static uint64
631 20 : get_standby_sysid(const char *datadir)
632 : {
633 : ControlFileData *cf;
634 : bool crc_ok;
635 : uint64 sysid;
636 :
637 20 : pg_log_info("getting system identifier from subscriber");
638 :
639 20 : cf = get_controlfile(datadir, &crc_ok);
640 20 : if (!crc_ok)
641 0 : pg_fatal("control file appears to be corrupt");
642 :
643 20 : sysid = cf->system_identifier;
644 :
645 20 : pg_log_info("system identifier is %" PRIu64 " on subscriber", sysid);
646 :
647 20 : pg_free(cf);
648 :
649 20 : return sysid;
650 : }
651 :
652 : /*
653 : * Modify the system identifier. Since a standby server preserves the system
654 : * identifier, it makes sense to change it to avoid situations in which WAL
655 : * files from one of the systems might be used in the other one.
656 : */
657 : static void
658 8 : modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
659 : {
660 : ControlFileData *cf;
661 : bool crc_ok;
662 : struct timeval tv;
663 :
664 : char *cmd_str;
665 :
666 8 : pg_log_info("modifying system identifier of subscriber");
667 :
668 8 : cf = get_controlfile(subscriber_dir, &crc_ok);
669 8 : if (!crc_ok)
670 0 : pg_fatal("control file appears to be corrupt");
671 :
672 : /*
673 : * Select a new system identifier.
674 : *
675 : * XXX this code was extracted from BootStrapXLOG().
676 : */
677 8 : gettimeofday(&tv, NULL);
678 8 : cf->system_identifier = ((uint64) tv.tv_sec) << 32;
679 8 : cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
680 8 : cf->system_identifier |= getpid() & 0xFFF;
681 :
682 8 : if (!dry_run)
683 2 : update_controlfile(subscriber_dir, cf, true);
684 :
685 8 : pg_log_info("system identifier is %" PRIu64 " on subscriber",
686 : cf->system_identifier);
687 :
688 8 : pg_log_info("running pg_resetwal on the subscriber");
689 :
690 8 : cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
691 : subscriber_dir, DEVNULL);
692 :
693 8 : pg_log_debug("pg_resetwal command is: %s", cmd_str);
694 :
695 8 : if (!dry_run)
696 : {
697 2 : int rc = system(cmd_str);
698 :
699 2 : if (rc == 0)
700 2 : pg_log_info("subscriber successfully changed the system identifier");
701 : else
702 0 : pg_fatal("could not change system identifier of subscriber: %s", wait_result_to_str(rc));
703 : }
704 :
705 8 : pg_free(cf);
706 8 : }
707 :
708 : /*
709 : * Generate an object name using a prefix, database oid and a random integer.
710 : * It is used in case the user does not specify an object name (publication,
711 : * subscription, replication slot).
712 : */
713 : static char *
714 16 : generate_object_name(PGconn *conn)
715 : {
716 : PGresult *res;
717 : Oid oid;
718 : uint32 rand;
719 : char *objname;
720 :
721 16 : res = PQexec(conn,
722 : "SELECT oid FROM pg_catalog.pg_database "
723 : "WHERE datname = pg_catalog.current_database()");
724 16 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
725 : {
726 0 : pg_log_error("could not obtain database OID: %s",
727 : PQresultErrorMessage(res));
728 0 : disconnect_database(conn, true);
729 : }
730 :
731 16 : if (PQntuples(res) != 1)
732 : {
733 0 : pg_log_error("could not obtain database OID: got %d rows, expected %d row",
734 : PQntuples(res), 1);
735 0 : disconnect_database(conn, true);
736 : }
737 :
738 : /* Database OID */
739 16 : oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
740 :
741 16 : PQclear(res);
742 :
743 : /* Random unsigned integer */
744 16 : rand = pg_prng_uint32(&prng_state);
745 :
746 : /*
747 : * Build the object name. The name must not exceed NAMEDATALEN - 1. This
748 : * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
749 : * '\0').
750 : */
751 16 : objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
752 :
753 16 : return objname;
754 : }
755 :
756 : /*
757 : * Create the publications and replication slots in preparation for logical
758 : * replication. Returns the LSN from latest replication slot. It will be the
759 : * replication start point that is used to adjust the subscriptions (see
760 : * set_replication_progress).
761 : */
762 : static char *
763 8 : setup_publisher(struct LogicalRepInfo *dbinfo)
764 : {
765 8 : char *lsn = NULL;
766 :
767 8 : pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
768 :
769 24 : for (int i = 0; i < num_dbs; i++)
770 : {
771 : PGconn *conn;
772 16 : char *genname = NULL;
773 :
774 16 : conn = connect_database(dbinfo[i].pubconninfo, true);
775 :
776 : /*
777 : * If an object name was not specified as command-line options, assign
778 : * a generated object name. The replication slot has a different rule.
779 : * The subscription name is assigned to the replication slot name if
780 : * no replication slot is specified. It follows the same rule as
781 : * CREATE SUBSCRIPTION.
782 : */
783 16 : if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
784 16 : genname = generate_object_name(conn);
785 16 : if (num_pubs == 0)
786 8 : dbinfo[i].pubname = pg_strdup(genname);
787 16 : if (num_subs == 0)
788 12 : dbinfo[i].subname = pg_strdup(genname);
789 16 : if (num_replslots == 0)
790 10 : dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
791 :
792 : /*
793 : * Create publication on publisher. This step should be executed
794 : * *before* promoting the subscriber to avoid any transactions between
795 : * consistent LSN and the new publication rows (such transactions
796 : * wouldn't see the new publication rows resulting in an error).
797 : */
798 16 : create_publication(conn, &dbinfo[i]);
799 :
800 : /* Create replication slot on publisher */
801 16 : if (lsn)
802 2 : pg_free(lsn);
803 16 : lsn = create_logical_replication_slot(conn, &dbinfo[i]);
804 16 : if (lsn == NULL && !dry_run)
805 0 : exit(1);
806 :
807 : /*
808 : * Since we are using the LSN returned by the last replication slot as
809 : * recovery_target_lsn, this LSN is ahead of the current WAL position
810 : * and the recovery waits until the publisher writes a WAL record to
811 : * reach the target and ends the recovery. On idle systems, this wait
812 : * time is unpredictable and could lead to failure in promoting the
813 : * subscriber. To avoid that, insert a harmless WAL record.
814 : */
815 16 : if (i == num_dbs - 1 && !dry_run)
816 : {
817 : PGresult *res;
818 :
819 2 : res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
820 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
821 : {
822 0 : pg_log_error("could not write an additional WAL record: %s",
823 : PQresultErrorMessage(res));
824 0 : disconnect_database(conn, true);
825 : }
826 2 : PQclear(res);
827 : }
828 :
829 16 : disconnect_database(conn, false);
830 : }
831 :
832 8 : return lsn;
833 : }
834 :
835 : /*
836 : * Is recovery still in progress?
837 : */
838 : static bool
839 36 : server_is_in_recovery(PGconn *conn)
840 : {
841 : PGresult *res;
842 : int ret;
843 :
844 36 : res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
845 :
846 36 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
847 : {
848 0 : pg_log_error("could not obtain recovery progress: %s",
849 : PQresultErrorMessage(res));
850 0 : disconnect_database(conn, true);
851 : }
852 :
853 :
854 36 : ret = strcmp("t", PQgetvalue(res, 0, 0));
855 :
856 36 : PQclear(res);
857 :
858 36 : return ret == 0;
859 : }
860 :
861 : /*
862 : * Is the primary server ready for logical replication?
863 : *
864 : * XXX Does it not allow a synchronous replica?
865 : */
866 : static void
867 12 : check_publisher(const struct LogicalRepInfo *dbinfo)
868 : {
869 : PGconn *conn;
870 : PGresult *res;
871 12 : bool failed = false;
872 :
873 : char *wal_level;
874 : int max_repslots;
875 : int cur_repslots;
876 : int max_walsenders;
877 : int cur_walsenders;
878 : int max_prepared_transactions;
879 : char *max_slot_wal_keep_size;
880 :
881 12 : pg_log_info("checking settings on publisher");
882 :
883 12 : conn = connect_database(dbinfo[0].pubconninfo, true);
884 :
885 : /*
886 : * If the primary server is in recovery (i.e. cascading replication),
887 : * objects (publication) cannot be created because it is read only.
888 : */
889 12 : if (server_is_in_recovery(conn))
890 : {
891 2 : pg_log_error("primary server cannot be in recovery");
892 2 : disconnect_database(conn, true);
893 : }
894 :
895 : /*------------------------------------------------------------------------
896 : * Logical replication requires a few parameters to be set on publisher.
897 : * Since these parameters are not a requirement for physical replication,
898 : * we should check it to make sure it won't fail.
899 : *
900 : * - wal_level = logical
901 : * - max_replication_slots >= current + number of dbs to be converted
902 : * - max_wal_senders >= current + number of dbs to be converted
903 : * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files)
904 : * -----------------------------------------------------------------------
905 : */
906 10 : res = PQexec(conn,
907 : "SELECT pg_catalog.current_setting('wal_level'),"
908 : " pg_catalog.current_setting('max_replication_slots'),"
909 : " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
910 : " pg_catalog.current_setting('max_wal_senders'),"
911 : " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
912 : " pg_catalog.current_setting('max_prepared_transactions'),"
913 : " pg_catalog.current_setting('max_slot_wal_keep_size')");
914 :
915 10 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
916 : {
917 0 : pg_log_error("could not obtain publisher settings: %s",
918 : PQresultErrorMessage(res));
919 0 : disconnect_database(conn, true);
920 : }
921 :
922 10 : wal_level = pg_strdup(PQgetvalue(res, 0, 0));
923 10 : max_repslots = atoi(PQgetvalue(res, 0, 1));
924 10 : cur_repslots = atoi(PQgetvalue(res, 0, 2));
925 10 : max_walsenders = atoi(PQgetvalue(res, 0, 3));
926 10 : cur_walsenders = atoi(PQgetvalue(res, 0, 4));
927 10 : max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));
928 10 : max_slot_wal_keep_size = pg_strdup(PQgetvalue(res, 0, 6));
929 :
930 10 : PQclear(res);
931 :
932 10 : pg_log_debug("publisher: wal_level: %s", wal_level);
933 10 : pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
934 10 : pg_log_debug("publisher: current replication slots: %d", cur_repslots);
935 10 : pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
936 10 : pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
937 10 : pg_log_debug("publisher: max_prepared_transactions: %d",
938 : max_prepared_transactions);
939 10 : pg_log_debug("publisher: max_slot_wal_keep_size: %s",
940 : max_slot_wal_keep_size);
941 :
942 10 : disconnect_database(conn, false);
943 :
944 10 : if (strcmp(wal_level, "logical") != 0)
945 : {
946 2 : pg_log_error("publisher requires \"wal_level\" >= \"logical\"");
947 2 : failed = true;
948 : }
949 :
950 10 : if (max_repslots - cur_repslots < num_dbs)
951 : {
952 2 : pg_log_error("publisher requires %d replication slots, but only %d remain",
953 : num_dbs, max_repslots - cur_repslots);
954 2 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
955 : "max_replication_slots", cur_repslots + num_dbs);
956 2 : failed = true;
957 : }
958 :
959 10 : if (max_walsenders - cur_walsenders < num_dbs)
960 : {
961 2 : pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
962 : num_dbs, max_walsenders - cur_walsenders);
963 2 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
964 : "max_wal_senders", cur_walsenders + num_dbs);
965 2 : failed = true;
966 : }
967 :
968 10 : if (max_prepared_transactions != 0 && !dbinfos.two_phase)
969 : {
970 0 : pg_log_warning("two_phase option will not be enabled for replication slots");
971 0 : pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "
972 : "Prepared transactions will be replicated at COMMIT PREPARED.");
973 0 : pg_log_warning_hint("You can use the command-line option --enable-two-phase to enable two_phase.");
974 : }
975 :
976 : /*
977 : * Validate 'max_slot_wal_keep_size'. If this parameter is set to a
978 : * non-default value, it may cause replication failures due to required
979 : * WAL files being prematurely removed.
980 : */
981 10 : if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
982 : {
983 0 : pg_log_warning("required WAL could be removed from the publisher");
984 0 : pg_log_warning_hint("Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
985 : "max_slot_wal_keep_size");
986 : }
987 :
988 10 : pg_free(wal_level);
989 :
990 10 : if (failed)
991 2 : exit(1);
992 8 : }
993 :
994 : /*
995 : * Is the standby server ready for logical replication?
996 : *
997 : * XXX Does it not allow a time-delayed replica?
998 : *
999 : * XXX In a cascaded replication scenario (P -> S -> C), if the target server
1000 : * is S, it cannot detect there is a replica (server C) because server S starts
1001 : * accepting only local connections and server C cannot connect to it. Hence,
1002 : * there is not a reliable way to provide a suitable error saying the server C
1003 : * will be broken at the end of this process (due to pg_resetwal).
1004 : */
1005 : static void
1006 16 : check_subscriber(const struct LogicalRepInfo *dbinfo)
1007 : {
1008 : PGconn *conn;
1009 : PGresult *res;
1010 16 : bool failed = false;
1011 :
1012 : int max_lrworkers;
1013 : int max_reporigins;
1014 : int max_wprocs;
1015 :
1016 16 : pg_log_info("checking settings on subscriber");
1017 :
1018 16 : conn = connect_database(dbinfo[0].subconninfo, true);
1019 :
1020 : /* The target server must be a standby */
1021 16 : if (!server_is_in_recovery(conn))
1022 : {
1023 2 : pg_log_error("target server must be a standby");
1024 2 : disconnect_database(conn, true);
1025 : }
1026 :
1027 : /*------------------------------------------------------------------------
1028 : * Logical replication requires a few parameters to be set on subscriber.
1029 : * Since these parameters are not a requirement for physical replication,
1030 : * we should check it to make sure it won't fail.
1031 : *
1032 : * - max_active_replication_origins >= number of dbs to be converted
1033 : * - max_logical_replication_workers >= number of dbs to be converted
1034 : * - max_worker_processes >= 1 + number of dbs to be converted
1035 : *------------------------------------------------------------------------
1036 : */
1037 14 : res = PQexec(conn,
1038 : "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
1039 : "'max_logical_replication_workers', "
1040 : "'max_active_replication_origins', "
1041 : "'max_worker_processes', "
1042 : "'primary_slot_name') "
1043 : "ORDER BY name");
1044 :
1045 14 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1046 : {
1047 0 : pg_log_error("could not obtain subscriber settings: %s",
1048 : PQresultErrorMessage(res));
1049 0 : disconnect_database(conn, true);
1050 : }
1051 :
1052 14 : max_reporigins = atoi(PQgetvalue(res, 0, 0));
1053 14 : max_lrworkers = atoi(PQgetvalue(res, 1, 0));
1054 14 : max_wprocs = atoi(PQgetvalue(res, 2, 0));
1055 14 : if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
1056 12 : primary_slot_name = pg_strdup(PQgetvalue(res, 3, 0));
1057 :
1058 14 : pg_log_debug("subscriber: max_logical_replication_workers: %d",
1059 : max_lrworkers);
1060 14 : pg_log_debug("subscriber: max_active_replication_origins: %d", max_reporigins);
1061 14 : pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
1062 14 : if (primary_slot_name)
1063 12 : pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
1064 :
1065 14 : PQclear(res);
1066 :
1067 14 : disconnect_database(conn, false);
1068 :
1069 14 : if (max_reporigins < num_dbs)
1070 : {
1071 2 : pg_log_error("subscriber requires %d active replication origins, but only %d remain",
1072 : num_dbs, max_reporigins);
1073 2 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1074 : "max_active_replication_origins", num_dbs);
1075 2 : failed = true;
1076 : }
1077 :
1078 14 : if (max_lrworkers < num_dbs)
1079 : {
1080 2 : pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
1081 : num_dbs, max_lrworkers);
1082 2 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1083 : "max_logical_replication_workers", num_dbs);
1084 2 : failed = true;
1085 : }
1086 :
1087 14 : if (max_wprocs < num_dbs + 1)
1088 : {
1089 2 : pg_log_error("subscriber requires %d worker processes, but only %d remain",
1090 : num_dbs + 1, max_wprocs);
1091 2 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1092 : "max_worker_processes", num_dbs + 1);
1093 2 : failed = true;
1094 : }
1095 :
1096 14 : if (failed)
1097 2 : exit(1);
1098 12 : }
1099 :
1100 : /*
1101 : * Drop a specified subscription. This is to avoid duplicate subscriptions on
1102 : * the primary (publisher node) and the newly created subscriber. We
1103 : * shouldn't drop the associated slot as that would be used by the publisher
1104 : * node.
1105 : */
1106 : static void
1107 8 : drop_existing_subscriptions(PGconn *conn, const char *subname, const char *dbname)
1108 : {
1109 8 : PQExpBuffer query = createPQExpBuffer();
1110 : PGresult *res;
1111 :
1112 : Assert(conn != NULL);
1113 :
1114 : /*
1115 : * Construct a query string. These commands are allowed to be executed
1116 : * within a transaction.
1117 : */
1118 8 : appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
1119 : subname);
1120 8 : appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
1121 : subname);
1122 8 : appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
1123 :
1124 8 : pg_log_info("dropping subscription \"%s\" in database \"%s\"",
1125 : subname, dbname);
1126 :
1127 8 : if (!dry_run)
1128 : {
1129 2 : res = PQexec(conn, query->data);
1130 :
1131 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1132 : {
1133 0 : pg_log_error("could not drop subscription \"%s\": %s",
1134 : subname, PQresultErrorMessage(res));
1135 0 : disconnect_database(conn, true);
1136 : }
1137 :
1138 2 : PQclear(res);
1139 : }
1140 :
1141 8 : destroyPQExpBuffer(query);
1142 8 : }
1143 :
1144 : /*
1145 : * Retrieve and drop the pre-existing subscriptions.
1146 : */
1147 : static void
1148 16 : check_and_drop_existing_subscriptions(PGconn *conn,
1149 : const struct LogicalRepInfo *dbinfo)
1150 : {
1151 16 : PQExpBuffer query = createPQExpBuffer();
1152 : char *dbname;
1153 : PGresult *res;
1154 :
1155 : Assert(conn != NULL);
1156 :
1157 16 : dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1158 :
1159 16 : appendPQExpBuffer(query,
1160 : "SELECT s.subname FROM pg_catalog.pg_subscription s "
1161 : "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1162 : "WHERE d.datname = %s",
1163 : dbname);
1164 16 : res = PQexec(conn, query->data);
1165 :
1166 16 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1167 : {
1168 0 : pg_log_error("could not obtain pre-existing subscriptions: %s",
1169 : PQresultErrorMessage(res));
1170 0 : disconnect_database(conn, true);
1171 : }
1172 :
1173 24 : for (int i = 0; i < PQntuples(res); i++)
1174 8 : drop_existing_subscriptions(conn, PQgetvalue(res, i, 0),
1175 8 : dbinfo->dbname);
1176 :
1177 16 : PQclear(res);
1178 16 : destroyPQExpBuffer(query);
1179 16 : PQfreemem(dbname);
1180 16 : }
1181 :
1182 : /*
1183 : * Create the subscriptions, adjust the initial location for logical
1184 : * replication and enable the subscriptions. That's the last step for logical
1185 : * replication setup.
1186 : */
1187 : static void
1188 8 : setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
1189 : {
1190 24 : for (int i = 0; i < num_dbs; i++)
1191 : {
1192 : PGconn *conn;
1193 :
1194 : /* Connect to subscriber. */
1195 16 : conn = connect_database(dbinfo[i].subconninfo, true);
1196 :
1197 : /*
1198 : * We don't need the pre-existing subscriptions on the newly formed
1199 : * subscriber. They can connect to other publisher nodes and either
1200 : * get some unwarranted data or can lead to ERRORs in connecting to
1201 : * such nodes.
1202 : */
1203 16 : check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
1204 :
1205 : /* Check and drop the required publications in the given database. */
1206 16 : check_and_drop_publications(conn, &dbinfo[i]);
1207 :
1208 16 : create_subscription(conn, &dbinfo[i]);
1209 :
1210 : /* Set the replication progress to the correct LSN */
1211 16 : set_replication_progress(conn, &dbinfo[i], consistent_lsn);
1212 :
1213 : /* Enable subscription */
1214 16 : enable_subscription(conn, &dbinfo[i]);
1215 :
1216 16 : disconnect_database(conn, false);
1217 : }
1218 8 : }
1219 :
1220 : /*
1221 : * Write the required recovery parameters.
1222 : */
1223 : static void
1224 8 : setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
1225 : {
1226 : PGconn *conn;
1227 : PQExpBuffer recoveryconfcontents;
1228 :
1229 : /*
1230 : * Despite of the recovery parameters will be written to the subscriber,
1231 : * use a publisher connection. The primary_conninfo is generated using the
1232 : * connection settings.
1233 : */
1234 8 : conn = connect_database(dbinfo[0].pubconninfo, true);
1235 :
1236 : /*
1237 : * Write recovery parameters.
1238 : *
1239 : * The subscriber is not running yet. In dry run mode, the recovery
1240 : * parameters *won't* be written. An invalid LSN is used for printing
1241 : * purposes. Additional recovery parameters are added here. It avoids
1242 : * unexpected behavior such as end of recovery as soon as a consistent
1243 : * state is reached (recovery_target) and failure due to multiple recovery
1244 : * targets (name, time, xid, LSN).
1245 : */
1246 8 : recoveryconfcontents = GenerateRecoveryConfig(conn, NULL, NULL);
1247 8 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target = ''\n");
1248 8 : appendPQExpBufferStr(recoveryconfcontents,
1249 : "recovery_target_timeline = 'latest'\n");
1250 :
1251 : /*
1252 : * Set recovery_target_inclusive = false to avoid reapplying the
1253 : * transaction committed at 'lsn' after subscription is enabled. This is
1254 : * because the provided 'lsn' is also used as the replication start point
1255 : * for the subscription. So, the server can send the transaction committed
1256 : * at that 'lsn' after replication is started which can lead to applying
1257 : * the same transaction twice if we keep recovery_target_inclusive = true.
1258 : */
1259 8 : appendPQExpBufferStr(recoveryconfcontents,
1260 : "recovery_target_inclusive = false\n");
1261 8 : appendPQExpBufferStr(recoveryconfcontents,
1262 : "recovery_target_action = promote\n");
1263 8 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target_name = ''\n");
1264 8 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target_time = ''\n");
1265 8 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target_xid = ''\n");
1266 :
1267 8 : if (dry_run)
1268 : {
1269 6 : appendPQExpBufferStr(recoveryconfcontents, "# dry run mode");
1270 6 : appendPQExpBuffer(recoveryconfcontents,
1271 : "recovery_target_lsn = '%X/%08X'\n",
1272 6 : LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
1273 : }
1274 : else
1275 : {
1276 2 : appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
1277 : lsn);
1278 2 : WriteRecoveryConfig(conn, datadir, recoveryconfcontents);
1279 : }
1280 8 : disconnect_database(conn, false);
1281 :
1282 8 : pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
1283 8 : }
1284 :
1285 : /*
1286 : * Drop physical replication slot on primary if the standby was using it. After
1287 : * the transformation, it has no use.
1288 : *
1289 : * XXX we might not fail here. Instead, we provide a warning so the user
1290 : * eventually drops this replication slot later.
1291 : */
1292 : static void
1293 8 : drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
1294 : {
1295 : PGconn *conn;
1296 :
1297 : /* Replication slot does not exist, do nothing */
1298 8 : if (!primary_slot_name)
1299 0 : return;
1300 :
1301 8 : conn = connect_database(dbinfo[0].pubconninfo, false);
1302 8 : if (conn != NULL)
1303 : {
1304 8 : drop_replication_slot(conn, &dbinfo[0], slotname);
1305 8 : disconnect_database(conn, false);
1306 : }
1307 : else
1308 : {
1309 0 : pg_log_warning("could not drop replication slot \"%s\" on primary",
1310 : slotname);
1311 0 : pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
1312 : }
1313 : }
1314 :
1315 : /*
1316 : * Drop failover replication slots on subscriber. After the transformation,
1317 : * they have no use.
1318 : *
1319 : * XXX We do not fail here. Instead, we provide a warning so the user can drop
1320 : * them later.
1321 : */
1322 : static void
1323 8 : drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
1324 : {
1325 : PGconn *conn;
1326 : PGresult *res;
1327 :
1328 8 : conn = connect_database(dbinfo[0].subconninfo, false);
1329 8 : if (conn != NULL)
1330 : {
1331 : /* Get failover replication slot names */
1332 8 : res = PQexec(conn,
1333 : "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
1334 :
1335 8 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
1336 : {
1337 : /* Remove failover replication slots from subscriber */
1338 16 : for (int i = 0; i < PQntuples(res); i++)
1339 8 : drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
1340 : }
1341 : else
1342 : {
1343 0 : pg_log_warning("could not obtain failover replication slot information: %s",
1344 : PQresultErrorMessage(res));
1345 0 : pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1346 : }
1347 :
1348 8 : PQclear(res);
1349 8 : disconnect_database(conn, false);
1350 : }
1351 : else
1352 : {
1353 0 : pg_log_warning("could not drop failover replication slot");
1354 0 : pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1355 : }
1356 8 : }
1357 :
1358 : /*
1359 : * Create a logical replication slot and returns a LSN.
1360 : *
1361 : * CreateReplicationSlot() is not used because it does not provide the one-row
1362 : * result set that contains the LSN.
1363 : */
1364 : static char *
1365 16 : create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
1366 : {
1367 16 : PQExpBuffer str = createPQExpBuffer();
1368 16 : PGresult *res = NULL;
1369 16 : const char *slot_name = dbinfo->replslotname;
1370 : char *slot_name_esc;
1371 16 : char *lsn = NULL;
1372 :
1373 : Assert(conn != NULL);
1374 :
1375 16 : pg_log_info("creating the replication slot \"%s\" in database \"%s\" on publisher",
1376 : slot_name, dbinfo->dbname);
1377 :
1378 16 : slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1379 :
1380 16 : appendPQExpBuffer(str,
1381 : "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
1382 : slot_name_esc,
1383 16 : dbinfos.two_phase ? "true" : "false");
1384 :
1385 16 : PQfreemem(slot_name_esc);
1386 :
1387 16 : pg_log_debug("command is: %s", str->data);
1388 :
1389 16 : if (!dry_run)
1390 : {
1391 4 : res = PQexec(conn, str->data);
1392 4 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1393 : {
1394 0 : pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
1395 : slot_name, dbinfo->dbname,
1396 : PQresultErrorMessage(res));
1397 0 : PQclear(res);
1398 0 : destroyPQExpBuffer(str);
1399 0 : return NULL;
1400 : }
1401 :
1402 4 : lsn = pg_strdup(PQgetvalue(res, 0, 0));
1403 4 : PQclear(res);
1404 : }
1405 :
1406 : /* For cleanup purposes */
1407 16 : dbinfo->made_replslot = true;
1408 :
1409 16 : destroyPQExpBuffer(str);
1410 :
1411 16 : return lsn;
1412 : }
1413 :
1414 : static void
1415 16 : drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
1416 : const char *slot_name)
1417 : {
1418 16 : PQExpBuffer str = createPQExpBuffer();
1419 : char *slot_name_esc;
1420 : PGresult *res;
1421 :
1422 : Assert(conn != NULL);
1423 :
1424 16 : pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
1425 : slot_name, dbinfo->dbname);
1426 :
1427 16 : slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1428 :
1429 16 : appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
1430 :
1431 16 : PQfreemem(slot_name_esc);
1432 :
1433 16 : pg_log_debug("command is: %s", str->data);
1434 :
1435 16 : if (!dry_run)
1436 : {
1437 4 : res = PQexec(conn, str->data);
1438 4 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1439 : {
1440 0 : pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
1441 : slot_name, dbinfo->dbname, PQresultErrorMessage(res));
1442 0 : dbinfo->made_replslot = false; /* don't try again. */
1443 : }
1444 :
1445 4 : PQclear(res);
1446 : }
1447 :
1448 16 : destroyPQExpBuffer(str);
1449 16 : }
1450 :
/*
 * Reports a suitable message if pg_ctl fails.
 *
 * pg_ctl_cmd - the command line that was run, echoed in the error detail.
 * rc         - the status returned for that command, decoded with the
 *              wait-status macros.
 *
 * Returns normally only when rc is zero; otherwise the failure is logged and
 * the program exits with status 1.
 */
static void
pg_ctl_status(const char *pg_ctl_cmd, int rc)
{
	/* Success: nothing to report */
	if (rc == 0)
		return;

	if (WIFEXITED(rc))
	{
		/* pg_ctl ran to completion but reported a failure */
		pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
	}
	else if (WIFSIGNALED(rc))
	{
#if defined(WIN32)
		pg_log_error("pg_ctl was terminated by exception 0x%X",
					 WTERMSIG(rc));
		pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
#else
		pg_log_error("pg_ctl was terminated by signal %d: %s",
					 WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
#endif
	}
	else
	{
		pg_log_error("pg_ctl exited with unrecognized status %d", rc);
	}

	pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
	exit(1);
}
1483 :
1484 : static void
1485 24 : start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
1486 : bool restrict_logical_worker)
1487 : {
1488 24 : PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
1489 : int rc;
1490 :
1491 24 : appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
1492 24 : appendShellString(pg_ctl_cmd, subscriber_dir);
1493 24 : appendPQExpBufferStr(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
1494 :
1495 : /* Prevent unintended slot invalidation */
1496 24 : appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
1497 :
1498 24 : if (restricted_access)
1499 : {
1500 24 : appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
1501 : #if !defined(WIN32)
1502 :
1503 : /*
1504 : * An empty listen_addresses list means the server does not listen on
1505 : * any IP interfaces; only Unix-domain sockets can be used to connect
1506 : * to the server. Prevent external connections to minimize the chance
1507 : * of failure.
1508 : */
1509 24 : appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
1510 24 : if (opt->socket_dir)
1511 24 : appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
1512 24 : opt->socket_dir);
1513 24 : appendPQExpBufferChar(pg_ctl_cmd, '"');
1514 : #endif
1515 : }
1516 24 : if (opt->config_file != NULL)
1517 0 : appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
1518 0 : opt->config_file);
1519 :
1520 : /* Suppress to start logical replication if requested */
1521 24 : if (restrict_logical_worker)
1522 8 : appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
1523 :
1524 24 : pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
1525 24 : rc = system(pg_ctl_cmd->data);
1526 24 : pg_ctl_status(pg_ctl_cmd->data, rc);
1527 24 : standby_running = true;
1528 24 : destroyPQExpBuffer(pg_ctl_cmd);
1529 24 : pg_log_info("server was started");
1530 24 : }
1531 :
1532 : static void
1533 24 : stop_standby_server(const char *datadir)
1534 : {
1535 : char *pg_ctl_cmd;
1536 : int rc;
1537 :
1538 24 : pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
1539 : datadir);
1540 24 : pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
1541 24 : rc = system(pg_ctl_cmd);
1542 24 : pg_ctl_status(pg_ctl_cmd, rc);
1543 24 : standby_running = false;
1544 24 : pg_log_info("server was stopped");
1545 24 : }
1546 :
1547 : /*
1548 : * Returns after the server finishes the recovery process.
1549 : *
1550 : * If recovery_timeout option is set, terminate abnormally without finishing
1551 : * the recovery process. By default, it waits forever.
1552 : *
1553 : * XXX Is the recovery process still in progress? When recovery process has a
1554 : * better progress reporting mechanism, it should be added here.
1555 : */
1556 : static void
1557 8 : wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
1558 : {
1559 : PGconn *conn;
1560 8 : int status = POSTMASTER_STILL_STARTING;
1561 8 : int timer = 0;
1562 :
1563 8 : pg_log_info("waiting for the target server to reach the consistent state");
1564 :
1565 8 : conn = connect_database(conninfo, true);
1566 :
1567 : for (;;)
1568 0 : {
1569 8 : bool in_recovery = server_is_in_recovery(conn);
1570 :
1571 : /*
1572 : * Does the recovery process finish? In dry run mode, there is no
1573 : * recovery mode. Bail out as the recovery process has ended.
1574 : */
1575 8 : if (!in_recovery || dry_run)
1576 : {
1577 8 : status = POSTMASTER_READY;
1578 8 : recovery_ended = true;
1579 8 : break;
1580 : }
1581 :
1582 : /* Bail out after recovery_timeout seconds if this option is set */
1583 0 : if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
1584 : {
1585 0 : stop_standby_server(subscriber_dir);
1586 0 : pg_log_error("recovery timed out");
1587 0 : disconnect_database(conn, true);
1588 : }
1589 :
1590 : /* Keep waiting */
1591 0 : pg_usleep(WAIT_INTERVAL * USEC_PER_SEC);
1592 :
1593 0 : timer += WAIT_INTERVAL;
1594 : }
1595 :
1596 8 : disconnect_database(conn, false);
1597 :
1598 8 : if (status == POSTMASTER_STILL_STARTING)
1599 0 : pg_fatal("server did not end recovery");
1600 :
1601 8 : pg_log_info("target server reached the consistent state");
1602 8 : pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
1603 8 : }
1604 :
1605 : /*
1606 : * Create a publication that includes all tables in the database.
1607 : */
1608 : static void
1609 16 : create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
1610 : {
1611 16 : PQExpBuffer str = createPQExpBuffer();
1612 : PGresult *res;
1613 : char *ipubname_esc;
1614 : char *spubname_esc;
1615 :
1616 : Assert(conn != NULL);
1617 :
1618 16 : ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1619 16 : spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1620 :
1621 : /* Check if the publication already exists */
1622 16 : appendPQExpBuffer(str,
1623 : "SELECT 1 FROM pg_catalog.pg_publication "
1624 : "WHERE pubname = %s",
1625 : spubname_esc);
1626 16 : res = PQexec(conn, str->data);
1627 16 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1628 : {
1629 0 : pg_log_error("could not obtain publication information: %s",
1630 : PQresultErrorMessage(res));
1631 0 : disconnect_database(conn, true);
1632 : }
1633 :
1634 16 : if (PQntuples(res) == 1)
1635 : {
1636 : /*
1637 : * Unfortunately, if it reaches this code path, it will always fail
1638 : * (unless you decide to change the existing publication name). That's
1639 : * bad but it is very unlikely that the user will choose a name with
1640 : * pg_createsubscriber_ prefix followed by the exact database oid and
1641 : * a random number.
1642 : */
1643 0 : pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
1644 0 : pg_log_error_hint("Consider renaming this publication before continuing.");
1645 0 : disconnect_database(conn, true);
1646 : }
1647 :
1648 16 : PQclear(res);
1649 16 : resetPQExpBuffer(str);
1650 :
1651 16 : pg_log_info("creating publication \"%s\" in database \"%s\"",
1652 : dbinfo->pubname, dbinfo->dbname);
1653 :
1654 16 : appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
1655 : ipubname_esc);
1656 :
1657 16 : pg_log_debug("command is: %s", str->data);
1658 :
1659 16 : if (!dry_run)
1660 : {
1661 4 : res = PQexec(conn, str->data);
1662 4 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1663 : {
1664 0 : pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
1665 : dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
1666 0 : disconnect_database(conn, true);
1667 : }
1668 4 : PQclear(res);
1669 : }
1670 :
1671 : /* For cleanup purposes */
1672 16 : dbinfo->made_publication = true;
1673 :
1674 16 : PQfreemem(ipubname_esc);
1675 16 : PQfreemem(spubname_esc);
1676 16 : destroyPQExpBuffer(str);
1677 16 : }
1678 :
1679 : /*
1680 : * Drop the specified publication in the given database.
1681 : */
1682 : static void
1683 20 : drop_publication(PGconn *conn, const char *pubname, const char *dbname,
1684 : bool *made_publication)
1685 : {
1686 20 : PQExpBuffer str = createPQExpBuffer();
1687 : PGresult *res;
1688 : char *pubname_esc;
1689 :
1690 : Assert(conn != NULL);
1691 :
1692 20 : pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
1693 :
1694 20 : pg_log_info("dropping publication \"%s\" in database \"%s\"",
1695 : pubname, dbname);
1696 :
1697 20 : appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
1698 :
1699 20 : PQfreemem(pubname_esc);
1700 :
1701 20 : pg_log_debug("command is: %s", str->data);
1702 :
1703 20 : if (!dry_run)
1704 : {
1705 8 : res = PQexec(conn, str->data);
1706 8 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1707 : {
1708 0 : pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
1709 : pubname, dbname, PQresultErrorMessage(res));
1710 0 : *made_publication = false; /* don't try again. */
1711 :
1712 : /*
1713 : * Don't disconnect and exit here. This routine is used by primary
1714 : * (cleanup publication / replication slot due to an error) and
1715 : * subscriber (remove the replicated publications). In both cases,
1716 : * it can continue and provide instructions for the user to remove
1717 : * it later if cleanup fails.
1718 : */
1719 : }
1720 8 : PQclear(res);
1721 : }
1722 :
1723 20 : destroyPQExpBuffer(str);
1724 20 : }
1725 :
1726 : /*
1727 : * Retrieve and drop the publications.
1728 : *
1729 : * Since the publications were created before the consistent LSN, they
1730 : * remain on the subscriber even after the physical replica is
1731 : * promoted. Remove these publications from the subscriber because
1732 : * they have no use. Additionally, if requested, drop all pre-existing
1733 : * publications.
1734 : */
1735 : static void
1736 16 : check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
1737 : {
1738 : PGresult *res;
1739 16 : bool drop_all_pubs = dbinfos.objecttypes_to_clean & OBJECTTYPE_PUBLICATIONS;
1740 :
1741 : Assert(conn != NULL);
1742 :
1743 16 : if (drop_all_pubs)
1744 : {
1745 4 : pg_log_info("dropping all existing publications in database \"%s\"",
1746 : dbinfo->dbname);
1747 :
1748 : /* Fetch all publication names */
1749 4 : res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
1750 4 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1751 : {
1752 0 : pg_log_error("could not obtain publication information: %s",
1753 : PQresultErrorMessage(res));
1754 0 : PQclear(res);
1755 0 : disconnect_database(conn, true);
1756 : }
1757 :
1758 : /* Drop each publication */
1759 12 : for (int i = 0; i < PQntuples(res); i++)
1760 8 : drop_publication(conn, PQgetvalue(res, i, 0), dbinfo->dbname,
1761 : &dbinfo->made_publication);
1762 :
1763 4 : PQclear(res);
1764 : }
1765 :
1766 : /*
1767 : * In dry-run mode, we don't create publications, but we still try to drop
1768 : * those to provide necessary information to the user.
1769 : */
1770 16 : if (!drop_all_pubs || dry_run)
1771 12 : drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
1772 : &dbinfo->made_publication);
1773 16 : }
1774 :
1775 : /*
1776 : * Create a subscription with some predefined options.
1777 : *
1778 : * A replication slot was already created in a previous step, so it is reused
1779 : * (create_slot = false). It is not required to copy data. The subscription is
1780 : * created disabled because the replication progress must be set first, and
1781 : * the replication origin name (an argument of pg_replication_origin_advance())
1782 : * contains the subscription OID. Once the subscription is created,
1783 : * set_replication_progress() can obtain the chosen origin name and set up its
1784 : * initial location. In dry-run mode the command is only logged, not executed.
1785 : */
1786 : static void
1787 16 : create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
1788 : {
1789 16 : PQExpBuffer str = createPQExpBuffer();
1790 : PGresult *res;
1791 : char *pubname_esc;
1792 : char *subname_esc;
1793 : char *pubconninfo_esc;
1794 : char *replslotname_esc;
1795 :
1796 : Assert(conn != NULL);
1797 :
1798 16 : pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname)); /* object names are quoted as identifiers */
1799 16 : subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1800 16 : pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo)); /* option values are quoted as string literals */
1801 16 : replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
1802 :
1803 16 : pg_log_info("creating subscription \"%s\" in database \"%s\"",
1804 : dbinfo->subname, dbinfo->dbname);
1805 :
1806 16 : appendPQExpBuffer(str,
1807 : "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
1808 : "WITH (create_slot = false, enabled = false, "
1809 : "slot_name = %s, copy_data = false, two_phase = %s)",
1810 : subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
1811 16 : dbinfos.two_phase ? "true" : "false"); /* two_phase is read from the file-level dbinfos state, not from dbinfo */
1812 :
1813 16 : PQfreemem(pubname_esc); /* escaped copies are no longer needed once the command text is built */
1814 16 : PQfreemem(subname_esc);
1815 16 : PQfreemem(pubconninfo_esc);
1816 16 : PQfreemem(replslotname_esc);
1817 :
1818 16 : pg_log_debug("command is: %s", str->data);
1819 :
1820 16 : if (!dry_run) /* in dry-run mode the command above is logged but never sent */
1821 : {
1822 4 : res = PQexec(conn, str->data);
1823 4 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1824 : {
1825 0 : pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
1826 : dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
1827 0 : disconnect_database(conn, true); /* NOTE(review): presumably exits when the second argument is true -- confirm */
1828 : }
1829 4 : PQclear(res);
1830 : }
1831 :
1832 16 : destroyPQExpBuffer(str);
1833 16 : }
1834 :
1835 : /*
1836 : * Sets the replication progress to the consistent LSN.
1837 : *
1838 : * The subscriber caught up to the consistent LSN provided by the last
1839 : * replication slot that was created. The goal is to set the initial location
1840 : * for logical replication to the exact LSN at which the subscriber was
1841 : * promoted. Once the subscription is enabled, it will start streaming from
1842 : * that location onwards. In dry-run mode, the subscription OID and LSN are
1843 : * set to invalid values for printing purposes.
1844 : */
1845 : static void
1846 16 : set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
1847 : {
1848 16 : PQExpBuffer str = createPQExpBuffer();
1849 : PGresult *res;
1850 : Oid suboid;
1851 : char *subname;
1852 : char *dbname;
1853 : char *originname;
1854 : char *lsnstr;
1855 :
1856 : Assert(conn != NULL);
1857 :
1858 16 : subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname)); /* used as string literals in the WHERE clause below */
1859 16 : dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1860 :
1861 16 : appendPQExpBuffer(str,
1862 : "SELECT s.oid FROM pg_catalog.pg_subscription s "
1863 : "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1864 : "WHERE s.subname = %s AND d.datname = %s",
1865 : subname, dbname);
1866 :
1867 16 : res = PQexec(conn, str->data); /* this lookup is executed even in dry-run mode */
1868 16 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1869 : {
1870 0 : pg_log_error("could not obtain subscription OID: %s",
1871 : PQresultErrorMessage(res));
1872 0 : disconnect_database(conn, true);
1873 : }
1874 :
1875 16 : if (PQntuples(res) != 1 && !dry_run) /* in dry-run mode create_subscription() did not execute, so the row count is not enforced */
1876 : {
1877 0 : pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
1878 : PQntuples(res), 1);
1879 0 : disconnect_database(conn, true);
1880 : }
1881 :
1882 16 : if (dry_run)
1883 : {
1884 12 : suboid = InvalidOid;
1885 12 : lsnstr = psprintf("%X/%08X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
1886 : }
1887 : else
1888 : {
1889 4 : suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10); /* the OID arrives as decimal text */
1890 4 : lsnstr = psprintf("%s", lsn); /* private copy so both branches can be released with pg_free() below */
1891 : }
1892 :
1893 16 : PQclear(res);
1894 :
1895 : /*
1896 : * The origin name is defined as pg_%u. %u is the subscription OID. See
1897 : * ApplyWorkerMain().
1898 : */
1899 16 : originname = psprintf("pg_%u", suboid);
1900 :
1901 16 : pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
1902 : originname, lsnstr, dbinfo->dbname);
1903 :
1904 16 : resetPQExpBuffer(str); /* reuse the buffer for the second command */
1905 16 : appendPQExpBuffer(str,
1906 : "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
1907 : originname, lsnstr);
1908 :
1909 16 : pg_log_debug("command is: %s", str->data);
1910 :
1911 16 : if (!dry_run) /* in dry-run mode the command above is logged but never sent */
1912 : {
1913 4 : res = PQexec(conn, str->data);
1914 4 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1915 : {
1916 0 : pg_log_error("could not set replication progress for subscription \"%s\": %s",
1917 : dbinfo->subname, PQresultErrorMessage(res));
1918 0 : disconnect_database(conn, true);
1919 : }
1920 4 : PQclear(res);
1921 : }
1922 :
1923 16 : PQfreemem(subname);
1924 16 : PQfreemem(dbname);
1925 16 : pg_free(originname);
1926 16 : pg_free(lsnstr);
1927 16 : destroyPQExpBuffer(str);
1928 16 : }
1929 :
1930 : /*
1931 : * Enables the subscription.
1932 : *
1933 : * The subscription was created disabled in a previous step. After adjusting
1934 : * the initial logical replication location, enable the subscription.
1935 : */
1936 : static void
1937 16 : enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
1938 : {
1939 16 : PQExpBuffer str = createPQExpBuffer();
1940 : PGresult *res;
1941 : char *subname;
1942 :
1943 : Assert(conn != NULL);
1944 :
1945 16 : subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname)); /* quoted as an identifier for ALTER SUBSCRIPTION */
1946 :
1947 16 : pg_log_info("enabling subscription \"%s\" in database \"%s\"",
1948 : dbinfo->subname, dbinfo->dbname);
1949 :
1950 16 : appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
1951 :
1952 16 : pg_log_debug("command is: %s", str->data);
1953 :
1954 16 : if (!dry_run) /* in dry-run mode the command above is logged but never sent */
1955 : {
1956 4 : res = PQexec(conn, str->data);
1957 4 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1958 : {
1959 0 : pg_log_error("could not enable subscription \"%s\": %s",
1960 : dbinfo->subname, PQresultErrorMessage(res));
1961 0 : disconnect_database(conn, true); /* NOTE(review): presumably exits when the second argument is true -- confirm */
1962 : }
1963 :
1964 4 : PQclear(res);
1965 : }
1966 :
1967 16 : PQfreemem(subname);
1968 16 : destroyPQExpBuffer(str);
1969 16 : }
1970 :
1971 : /*
1972 : * Fetch a list of all connectable non-template databases from the source server
1973 : * and append them to opt->database_names (bumping num_dbs) as if the user had
1974 : * specified multiple --database options, one for each source database.
1975 : */
1976 : static void
1977 2 : get_publisher_databases(struct CreateSubscriberOptions *opt,
1978 : bool dbnamespecified)
1979 : {
1980 : PGconn *conn;
1981 : PGresult *res;
1982 :
1983 : /* If a database name was specified, just connect to it. */
1984 2 : if (dbnamespecified)
1985 0 : conn = connect_database(opt->pub_conninfo_str, true);
1986 : else
1987 : {
1988 : /* Otherwise, try postgres first and then template1. */
1989 : char *conninfo;
1990 :
1991 2 : conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "postgres");
1992 2 : conn = connect_database(conninfo, false); /* NOTE(review): false presumably makes a failed connection return NULL instead of exiting -- confirm */
1993 2 : pg_free(conninfo);
1994 2 : if (!conn)
1995 : {
1996 0 : conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "template1");
1997 0 : conn = connect_database(conninfo, true); /* last attempt: no further fallback after template1 */
1998 0 : pg_free(conninfo);
1999 : }
2000 : }
2001 :
2002 2 : res = PQexec(conn, "SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1"); /* NOTE(review): datconnlimit = -2 presumably marks an invalid database -- confirm */
2003 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
2004 : {
2005 0 : pg_log_error("could not obtain a list of databases: %s", PQresultErrorMessage(res));
2006 0 : PQclear(res);
2007 0 : disconnect_database(conn, true);
2008 : }
2009 :
2010 8 : for (int i = 0; i < PQntuples(res); i++)
2011 : {
2012 6 : const char *dbname = PQgetvalue(res, i, 0);
2013 :
2014 6 : simple_string_list_append(&opt->database_names, dbname);
2015 :
2016 : /* Increment num_dbs to reflect multiple --database options */
2017 6 : num_dbs++;
2018 : }
2019 :
2020 2 : PQclear(res);
2021 2 : disconnect_database(conn, false);
2022 2 : }
2023 :
2024 : int
2025 46 : main(int argc, char **argv) /* Entry point: parse and validate options, then run the standby-to-subscriber conversion steps in order. */
2026 : {
2027 : static struct option long_options[] =
2028 : {
2029 : {"all", no_argument, NULL, 'a'},
2030 : {"database", required_argument, NULL, 'd'},
2031 : {"pgdata", required_argument, NULL, 'D'},
2032 : {"dry-run", no_argument, NULL, 'n'},
2033 : {"subscriber-port", required_argument, NULL, 'p'},
2034 : {"publisher-server", required_argument, NULL, 'P'},
2035 : {"socketdir", required_argument, NULL, 's'},
2036 : {"recovery-timeout", required_argument, NULL, 't'},
2037 : {"enable-two-phase", no_argument, NULL, 'T'},
2038 : {"subscriber-username", required_argument, NULL, 'U'},
2039 : {"verbose", no_argument, NULL, 'v'},
2040 : {"version", no_argument, NULL, 'V'},
2041 : {"help", no_argument, NULL, '?'},
2042 : {"config-file", required_argument, NULL, 1}, /* long-only options are identified by small integers */
2043 : {"publication", required_argument, NULL, 2},
2044 : {"replication-slot", required_argument, NULL, 3},
2045 : {"subscription", required_argument, NULL, 4},
2046 : {"clean", required_argument, NULL, 5},
2047 : {NULL, 0, NULL, 0}
2048 : };
2049 :
2050 46 : struct CreateSubscriberOptions opt = {0};
2051 :
2052 : int c;
2053 : int option_index;
2054 :
2055 : char *pub_base_conninfo;
2056 : char *sub_base_conninfo;
2057 46 : char *dbname_conninfo = NULL;
2058 :
2059 : uint64 pub_sysid;
2060 : uint64 sub_sysid;
2061 : struct stat statbuf;
2062 :
2063 : char *consistent_lsn;
2064 :
2065 : char pidfile[MAXPGPATH];
2066 :
2067 46 : pg_logging_init(argv[0]);
2068 46 : pg_logging_set_level(PG_LOG_WARNING);
2069 46 : progname = get_progname(argv[0]);
2070 46 : set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
2071 :
2072 46 : if (argc > 1) /* --help and --version are honored here only when given as the first argument */
2073 : {
2074 44 : if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2075 : {
2076 2 : usage();
2077 2 : exit(0);
2078 : }
2079 42 : else if (strcmp(argv[1], "-V") == 0
2080 42 : || strcmp(argv[1], "--version") == 0)
2081 : {
2082 2 : puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
2083 2 : exit(0);
2084 : }
2085 : }
2086 :
2087 : /* Default settings (opt was zero-initialized above; only sub_port gets a non-zero default) */
2088 42 : subscriber_dir = NULL;
2089 42 : opt.config_file = NULL;
2090 42 : opt.pub_conninfo_str = NULL;
2091 42 : opt.socket_dir = NULL;
2092 42 : opt.sub_port = DEFAULT_SUB_PORT;
2093 42 : opt.sub_username = NULL;
2094 42 : opt.two_phase = false;
2095 42 : opt.database_names = (SimpleStringList)
2096 : {
2097 : 0
2098 : };
2099 42 : opt.recovery_timeout = 0;
2100 42 : opt.all_dbs = false;
2101 :
2102 : /*
2103 : * Don't allow it to be run as root. It uses pg_ctl which does not allow
2104 : * it either.
2105 : */
2106 : #ifndef WIN32
2107 42 : if (geteuid() == 0)
2108 : {
2109 0 : pg_log_error("cannot be executed by \"root\"");
2110 0 : pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
2111 : progname);
2112 0 : exit(1);
2113 : }
2114 : #endif
2115 :
2116 42 : get_restricted_token();
2117 :
2118 324 : while ((c = getopt_long(argc, argv, "ad:D:np:P:s:t:TU:v",
2119 324 : long_options, &option_index)) != -1)
2120 : {
2121 288 : switch (c)
2122 : {
2123 6 : case 'a':
2124 6 : opt.all_dbs = true;
2125 6 : break;
2126 50 : case 'd':
2127 50 : if (!simple_string_list_member(&opt.database_names, optarg)) /* duplicate names are rejected as fatal errors */
2128 : {
2129 48 : simple_string_list_append(&opt.database_names, optarg);
2130 48 : num_dbs++;
2131 : }
2132 : else
2133 2 : pg_fatal("database \"%s\" specified more than once for -d/--database", optarg);
2134 48 : break;
2135 38 : case 'D':
2136 38 : subscriber_dir = pg_strdup(optarg);
2137 38 : canonicalize_path(subscriber_dir);
2138 38 : break;
2139 18 : case 'n':
2140 18 : dry_run = true;
2141 18 : break;
2142 24 : case 'p':
2143 24 : opt.sub_port = pg_strdup(optarg);
2144 24 : break;
2145 36 : case 'P':
2146 36 : opt.pub_conninfo_str = pg_strdup(optarg);
2147 36 : break;
2148 24 : case 's':
2149 24 : opt.socket_dir = pg_strdup(optarg);
2150 24 : canonicalize_path(opt.socket_dir);
2151 24 : break;
2152 6 : case 't':
2153 6 : opt.recovery_timeout = atoi(optarg); /* NOTE(review): atoi() does not report errors; non-numeric input becomes 0 and negative values are accepted */
2154 6 : break;
2155 2 : case 'T':
2156 2 : opt.two_phase = true;
2157 2 : break;
2158 0 : case 'U':
2159 0 : opt.sub_username = pg_strdup(optarg);
2160 0 : break;
2161 38 : case 'v':
2162 38 : pg_logging_increase_verbosity();
2163 38 : break;
2164 0 : case 1:
2165 0 : opt.config_file = pg_strdup(optarg);
2166 0 : break;
2167 24 : case 2:
2168 24 : if (!simple_string_list_member(&opt.pub_names, optarg))
2169 : {
2170 22 : simple_string_list_append(&opt.pub_names, optarg);
2171 22 : num_pubs++;
2172 : }
2173 : else
2174 2 : pg_fatal("publication \"%s\" specified more than once for --publication", optarg);
2175 22 : break;
2176 8 : case 3:
2177 8 : if (!simple_string_list_member(&opt.replslot_names, optarg))
2178 : {
2179 8 : simple_string_list_append(&opt.replslot_names, optarg);
2180 8 : num_replslots++;
2181 : }
2182 : else
2183 0 : pg_fatal("replication slot \"%s\" specified more than once for --replication-slot", optarg);
2184 8 : break;
2185 10 : case 4:
2186 10 : if (!simple_string_list_member(&opt.sub_names, optarg))
2187 : {
2188 10 : simple_string_list_append(&opt.sub_names, optarg);
2189 10 : num_subs++;
2190 : }
2191 : else
2192 0 : pg_fatal("subscription \"%s\" specified more than once for --subscription", optarg);
2193 10 : break;
2194 2 : case 5:
2195 2 : if (!simple_string_list_member(&opt.objecttypes_to_clean, optarg)) /* values are only collected here; they are validated after option parsing */
2196 2 : simple_string_list_append(&opt.objecttypes_to_clean, optarg);
2197 : else
2198 0 : pg_fatal("object type \"%s\" specified more than once for --clean", optarg);
2199 2 : break;
2200 2 : default:
2201 : /* getopt_long already emitted a complaint */
2202 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2203 2 : exit(1);
2204 : }
2205 : }
2206 :
2207 : /* Validate that --all is not used with incompatible options (only the first conflict found is reported) */
2208 36 : if (opt.all_dbs)
2209 : {
2210 6 : char *bad_switch = NULL;
2211 :
2212 6 : if (num_dbs > 0)
2213 2 : bad_switch = "--database";
2214 4 : else if (num_pubs > 0)
2215 2 : bad_switch = "--publication";
2216 2 : else if (num_replslots > 0)
2217 0 : bad_switch = "--replication-slot";
2218 2 : else if (num_subs > 0)
2219 0 : bad_switch = "--subscription";
2220 :
2221 6 : if (bad_switch)
2222 : {
2223 4 : pg_log_error("options %s and -a/--all cannot be used together", bad_switch);
2224 4 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2225 4 : exit(1);
2226 : }
2227 : }
2228 :
2229 : /* Any non-option arguments? */
2230 32 : if (optind < argc)
2231 : {
2232 0 : pg_log_error("too many command-line arguments (first is \"%s\")",
2233 : argv[optind]);
2234 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2235 0 : exit(1);
2236 : }
2237 :
2238 : /* Required arguments */
2239 32 : if (subscriber_dir == NULL)
2240 : {
2241 2 : pg_log_error("no subscriber data directory specified");
2242 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2243 2 : exit(1);
2244 : }
2245 :
2246 : /* If socket directory is not provided, use the current directory */
2247 30 : if (opt.socket_dir == NULL)
2248 : {
2249 : char cwd[MAXPGPATH];
2250 :
2251 10 : if (!getcwd(cwd, MAXPGPATH))
2252 0 : pg_fatal("could not determine current directory");
2253 10 : opt.socket_dir = pg_strdup(cwd);
2254 10 : canonicalize_path(opt.socket_dir);
2255 : }
2256 :
2257 : /*
2258 : * Parse connection string. Build a base connection string that might be
2259 : * reused by multiple databases.
2260 : */
2261 30 : if (opt.pub_conninfo_str == NULL)
2262 : {
2263 : /*
2264 : * TODO use primary_conninfo (if available) from subscriber and
2265 : * extract publisher connection string. Assume that there are
2266 : * identical entries for physical and logical replication. If there is
2267 : * not, we would fail anyway.
2268 : */
2269 2 : pg_log_error("no publisher connection string specified");
2270 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2271 2 : exit(1);
2272 : }
2273 28 : pg_log_info("validating publisher connection string");
2274 28 : pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
2275 : &dbname_conninfo); /* NOTE(review): dbname_conninfo presumably receives the dbname found in the string, or stays NULL -- confirm */
2276 28 : if (pub_base_conninfo == NULL)
2277 0 : exit(1);
2278 :
2279 28 : pg_log_info("validating subscriber connection string");
2280 28 : sub_base_conninfo = get_sub_conninfo(&opt);
2281 :
2282 : /*
2283 : * Fetch all databases from the source (publisher) and treat them as if
2284 : * the user had specified multiple --database options, one for each source
2285 : * database.
2286 : */
2287 28 : if (opt.all_dbs)
2288 : {
2289 2 : bool dbnamespecified = (dbname_conninfo != NULL);
2290 :
2291 2 : get_publisher_databases(&opt, dbnamespecified);
2292 : }
2293 :
2294 28 : if (opt.database_names.head == NULL)
2295 : {
2296 4 : pg_log_info("no database was specified");
2297 :
2298 : /*
2299 : * Try to obtain the dbname from the publisher conninfo. If dbname
2300 : * parameter is not available, error out.
2301 : */
2302 4 : if (dbname_conninfo)
2303 : {
2304 2 : simple_string_list_append(&opt.database_names, dbname_conninfo);
2305 2 : num_dbs++;
2306 :
2307 2 : pg_log_info("database name \"%s\" was extracted from the publisher connection string",
2308 : dbname_conninfo);
2309 : }
2310 : else
2311 : {
2312 2 : pg_log_error("no database name specified");
2313 2 : pg_log_error_hint("Try \"%s --help\" for more information.",
2314 : progname);
2315 2 : exit(1);
2316 : }
2317 : }
2318 :
2319 : /* Number of object names must match number of databases (a count of zero means none were given and is allowed) */
2320 26 : if (num_pubs > 0 && num_pubs != num_dbs)
2321 : {
2322 2 : pg_log_error("wrong number of publication names specified");
2323 2 : pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
2324 : num_pubs, num_dbs);
2325 2 : exit(1);
2326 : }
2327 24 : if (num_subs > 0 && num_subs != num_dbs)
2328 : {
2329 2 : pg_log_error("wrong number of subscription names specified");
2330 2 : pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
2331 : num_subs, num_dbs);
2332 2 : exit(1);
2333 : }
2334 22 : if (num_replslots > 0 && num_replslots != num_dbs)
2335 : {
2336 2 : pg_log_error("wrong number of replication slot names specified");
2337 2 : pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
2338 : num_replslots, num_dbs);
2339 2 : exit(1);
2340 : }
2341 :
2342 : /* Verify the object types specified for removal from the subscriber */
2343 22 : for (SimpleStringListCell *cell = opt.objecttypes_to_clean.head; cell; cell = cell->next)
2344 : {
2345 2 : if (pg_strcasecmp(cell->val, "publications") == 0) /* case-insensitive; "publications" is the only accepted value */
2346 2 : dbinfos.objecttypes_to_clean |= OBJECTTYPE_PUBLICATIONS;
2347 : else
2348 : {
2349 0 : pg_log_error("invalid object type \"%s\" specified for --clean", cell->val);
2350 0 : pg_log_error_hint("The valid value is: \"%s\"", "publications");
2351 0 : exit(1);
2352 : }
2353 : }
2354 :
2355 : /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
2356 20 : pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
2357 20 : pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
2358 :
2359 : /* Rudimentary check for a data directory */
2360 20 : check_data_directory(subscriber_dir);
2361 :
2362 20 : dbinfos.two_phase = opt.two_phase;
2363 :
2364 : /*
2365 : * Store database information for publisher and subscriber. It should be
2366 : * called before atexit() because its return value is used in
2367 : * cleanup_objects_atexit().
2368 : */
2369 20 : dbinfos.dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
2370 :
2371 : /* Register a function to clean up objects in case of failure */
2372 20 : atexit(cleanup_objects_atexit);
2373 :
2374 : /*
2375 : * Check if the subscriber data directory has the same system identifier
2376 : * as the publisher data directory.
2377 : */
2378 20 : pub_sysid = get_primary_sysid(dbinfos.dbinfo[0].pubconninfo);
2379 20 : sub_sysid = get_standby_sysid(subscriber_dir);
2380 20 : if (pub_sysid != sub_sysid)
2381 2 : pg_fatal("subscriber data directory is not a copy of the source database cluster");
2382 :
2383 : /* Subscriber PID file */
2384 18 : snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
2385 :
2386 : /*
2387 : * The standby server must not be running. If the server is started under a
2388 : * service manager and pg_createsubscriber stops it, the service manager
2389 : * might react to this action and start the server again. Therefore,
2390 : * refuse to proceed if the server is running to avoid possible failures.
2391 : */
2392 18 : if (stat(pidfile, &statbuf) == 0) /* the mere existence of postmaster.pid is treated as "server is running" */
2393 : {
2394 2 : pg_log_error("standby server is running");
2395 2 : pg_log_error_hint("Stop the standby server and try again.");
2396 2 : exit(1);
2397 : }
2398 :
2399 : /*
2400 : * Start a short-lived standby server with temporary parameters (provided
2401 : * by command-line options). The goal is to avoid connections during the
2402 : * transformation steps.
2403 : */
2404 16 : pg_log_info("starting the standby server with command-line options");
2405 16 : start_standby_server(&opt, true, false);
2406 :
2407 : /* Check if the standby server is ready for logical replication */
2408 16 : check_subscriber(dbinfos.dbinfo);
2409 :
2410 : /* Check if the primary server is ready for logical replication */
2411 12 : check_publisher(dbinfos.dbinfo);
2412 :
2413 : /*
2414 : * Stop the target server. The recovery process requires that the server
2415 : * reaches a consistent state before targeting the recovery stop point.
2416 : * Make sure a consistent state is reached (stopping the target server
2417 : * guarantees it) *before* creating the replication slots in
2418 : * setup_publisher().
2419 : */
2420 8 : pg_log_info("stopping the subscriber");
2421 8 : stop_standby_server(subscriber_dir);
2422 :
2423 : /* Create the required objects for each database on publisher */
2424 8 : consistent_lsn = setup_publisher(dbinfos.dbinfo);
2425 :
2426 : /* Write the required recovery parameters */
2427 8 : setup_recovery(dbinfos.dbinfo, subscriber_dir, consistent_lsn);
2428 :
2429 : /*
2430 : * Start subscriber so the recovery parameters will take effect. Wait
2431 : * until accepting connections. We don't want to start logical replication
2432 : * during setup.
2433 : */
2434 8 : pg_log_info("starting the subscriber");
2435 8 : start_standby_server(&opt, true, true);
2436 :
2437 : /* Wait for the subscriber to be promoted */
2438 8 : wait_for_end_recovery(dbinfos.dbinfo[0].subconninfo, &opt);
2439 :
2440 : /*
2441 : * Create the subscription for each database on the subscriber. It does not
2442 : * enable it immediately because it needs to adjust the replication start
2443 : * point to the LSN reported by setup_publisher(). It also cleans up
2444 : * publications created by this tool and replication to the standby.
2445 : */
2446 8 : setup_subscriber(dbinfos.dbinfo, consistent_lsn);
2447 :
2448 : /* Remove primary_slot_name if it exists on primary */
2449 8 : drop_primary_replication_slot(dbinfos.dbinfo, primary_slot_name);
2450 :
2451 : /* Remove failover replication slots if they exist on subscriber */
2452 8 : drop_failover_replication_slots(dbinfos.dbinfo);
2453 :
2454 : /* Stop the subscriber */
2455 8 : pg_log_info("stopping the subscriber");
2456 8 : stop_standby_server(subscriber_dir);
2457 :
2458 : /* Change the system identifier of the subscriber */
2459 8 : modify_subscriber_sysid(&opt);
2460 :
2461 8 : success = true; /* NOTE(review): presumably consulted by cleanup_objects_atexit() to skip cleanup -- confirm */
2462 :
2463 8 : pg_log_info("Done!");
2464 :
2465 8 : return 0;
2466 : }
|