Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_createsubscriber.c
4 : * Create a new logical replica from a standby server
5 : *
6 : * Copyright (c) 2024-2026, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/bin/pg_basebackup/pg_createsubscriber.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 :
14 : #include "postgres_fe.h"
15 :
16 : #include <sys/stat.h>
17 : #include <sys/time.h>
18 : #include <sys/wait.h>
19 : #include <time.h>
20 :
21 : #include "common/connect.h"
22 : #include "common/controldata_utils.h"
23 : #include "common/file_perm.h"
24 : #include "common/file_utils.h"
25 : #include "common/logging.h"
26 : #include "common/pg_prng.h"
27 : #include "common/restricted_token.h"
28 : #include "datatype/timestamp.h"
29 : #include "fe_utils/recovery_gen.h"
30 : #include "fe_utils/simple_list.h"
31 : #include "fe_utils/string_utils.h"
32 : #include "fe_utils/version.h"
33 : #include "getopt_long.h"
34 :
35 : #define DEFAULT_SUB_PORT "50432"
36 : #define OBJECTTYPE_PUBLICATIONS 0x0001
37 :
38 : /*
39 : * Configuration files for recovery parameters.
40 : *
41 : * The recovery parameters are set in INCLUDED_CONF_FILE, itself loaded by
42 : * the server through an include_if_exists in postgresql.auto.conf.
43 : *
44 : * INCLUDED_CONF_FILE is renamed to INCLUDED_CONF_FILE_DISABLED when exiting,
45 : * so as the recovery parameters set by this tool never take effect on node
46 : * restart. The contents of INCLUDED_CONF_FILE_DISABLED can be useful for
47 : * debugging.
48 : */
49 : #define PG_AUTOCONF_FILENAME "postgresql.auto.conf"
50 : #define INCLUDED_CONF_FILE "pg_createsubscriber.conf"
51 : #define INCLUDED_CONF_FILE_DISABLED INCLUDED_CONF_FILE ".disabled"
52 :
53 : #define SERVER_LOG_FILE_NAME "pg_createsubscriber_server.log"
54 : #define INTERNAL_LOG_FILE_NAME "pg_createsubscriber_internal.log"
55 :
56 : /* Command-line options */
57 : struct CreateSubscriberOptions
58 : {
59 : char *config_file; /* configuration file */
60 : char *log_dir; /* log directory name */
61 : char *pub_conninfo_str; /* publisher connection string */
62 : char *socket_dir; /* directory for Unix-domain socket, if any */
63 : char *sub_port; /* subscriber port number */
64 : const char *sub_username; /* subscriber username */
65 : bool two_phase; /* enable-two-phase option */
66 : SimpleStringList database_names; /* list of database names */
67 : SimpleStringList pub_names; /* list of publication names */
68 : SimpleStringList sub_names; /* list of subscription names */
69 : SimpleStringList replslot_names; /* list of replication slot names */
70 : int recovery_timeout; /* stop recovery after this time */
71 : bool all_dbs; /* all option */
72 : SimpleStringList objecttypes_to_clean; /* list of object types to cleanup */
73 : };
74 :
75 : /* per-database publication/subscription info */
76 : struct LogicalRepInfo
77 : {
78 : char *dbname; /* database name */
79 : char *pubconninfo; /* publisher connection string */
80 : char *subconninfo; /* subscriber connection string */
81 : char *pubname; /* publication name */
82 : char *subname; /* subscription name */
83 : char *replslotname; /* replication slot name */
84 :
85 : bool made_replslot; /* replication slot was created */
86 : bool made_publication; /* publication was created */
87 : };
88 :
89 : /*
90 : * Information shared across all the databases (or publications and
91 : * subscriptions).
92 : */
93 : struct LogicalRepInfos
94 : {
95 : struct LogicalRepInfo *dbinfo;
96 : bool two_phase; /* enable-two-phase option */
97 : uint32 objecttypes_to_clean; /* flags indicating which object types
98 : * to clean up on subscriber */
99 : };
100 :
101 : static void cleanup_objects_atexit(void);
102 : static void usage(void);
103 : static char *get_base_conninfo(const char *conninfo, char **dbname);
104 : static char *get_sub_conninfo(const struct CreateSubscriberOptions *opt);
105 : static char *get_exec_path(const char *argv0, const char *progname);
106 : static void check_data_directory(const char *datadir);
107 : static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
108 : static struct LogicalRepInfo *store_pub_sub_info(const struct CreateSubscriberOptions *opt,
109 : const char *pub_base_conninfo,
110 : const char *sub_base_conninfo);
111 : static PGconn *connect_database(const char *conninfo, bool exit_on_error);
112 : static void disconnect_database(PGconn *conn, bool exit_on_error);
113 : static uint64 get_primary_sysid(const char *conninfo);
114 : static uint64 get_standby_sysid(const char *datadir);
115 : static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
116 : static bool server_is_in_recovery(PGconn *conn);
117 : static char *generate_object_name(PGconn *conn);
118 : static void check_publisher(const struct LogicalRepInfo *dbinfo);
119 : static char *setup_publisher(struct LogicalRepInfo *dbinfo);
120 : static void check_subscriber(const struct LogicalRepInfo *dbinfo);
121 : static void setup_subscriber(struct LogicalRepInfo *dbinfo,
122 : const char *consistent_lsn);
123 : static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
124 : const char *lsn);
125 : static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
126 : const char *slotname);
127 : static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo);
128 : static char *create_logical_replication_slot(PGconn *conn,
129 : struct LogicalRepInfo *dbinfo);
130 : static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
131 : const char *slot_name);
132 : static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
133 : static void start_standby_server(const struct CreateSubscriberOptions *opt,
134 : bool restricted_access,
135 : bool restrict_logical_worker);
136 : static void stop_standby_server(const char *datadir);
137 : static void wait_for_end_recovery(const char *conninfo,
138 : const struct CreateSubscriberOptions *opt);
139 : static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
140 : static bool find_publication(PGconn *conn, const char *pubname, const char *dbname);
141 : static void drop_publication(PGconn *conn, const char *pubname,
142 : const char *dbname, bool *made_publication);
143 : static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
144 : static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
145 : static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
146 : const char *lsn);
147 : static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
148 : static void check_and_drop_existing_subscriptions(PGconn *conn,
149 : const struct LogicalRepInfo *dbinfo);
150 : static void drop_existing_subscription(PGconn *conn, const char *subname,
151 : const char *dbname);
152 : static void get_publisher_databases(struct CreateSubscriberOptions *opt,
153 : bool dbnamespecified);
154 :
155 : #define WAIT_INTERVAL 1 /* 1 second */
156 :
157 : static const char *progname;
158 :
159 : static char *primary_slot_name = NULL;
160 : static bool dry_run = false;
161 :
162 : static bool success = false;
163 :
164 : static struct LogicalRepInfos dbinfos;
165 : static int num_dbs = 0; /* number of specified databases */
166 : static int num_pubs = 0; /* number of specified publications */
167 : static int num_subs = 0; /* number of specified subscriptions */
168 : static int num_replslots = 0; /* number of specified replication slots */
169 :
170 : static pg_prng_state prng_state;
171 :
172 : static char *pg_ctl_path = NULL;
173 : static char *pg_resetwal_path = NULL;
174 :
175 : static char *logdir = NULL; /* Subdirectory of the user specified logdir
176 : * where the log files are written (if
177 : * specified) */
178 :
179 : /* standby / subscriber data directory */
180 : static char *subscriber_dir = NULL;
181 :
182 : static bool recovery_ended = false;
183 : static bool standby_running = false;
184 : static bool recovery_params_set = false;
185 :
186 :
187 : /*
188 : * Clean up objects created by pg_createsubscriber.
189 : *
190 : * Publications and replication slots are created on the primary. Depending
191 : * on the step where it failed, already-created objects should be removed if
192 : * possible (sometimes this won't work due to a connection issue).
193 : * There is no cleanup on the target server *after* its promotion, because any
194 : * failure at this point means recreating the physical replica and starting
195 : * again.
196 : *
197 : * The recovery configuration is always removed, by renaming the included
198 : * configuration file out of the way.
199 : */
200 : static void
201 10 : cleanup_objects_atexit(void)
202 : {
203 : /* Rename the included configuration file, if necessary. */
204 10 : if (recovery_params_set)
205 : {
206 : char conf_filename[MAXPGPATH];
207 : char conf_filename_disabled[MAXPGPATH];
208 :
209 1 : snprintf(conf_filename, MAXPGPATH, "%s/%s", subscriber_dir,
210 : INCLUDED_CONF_FILE);
211 1 : snprintf(conf_filename_disabled, MAXPGPATH, "%s/%s", subscriber_dir,
212 : INCLUDED_CONF_FILE_DISABLED);
213 :
214 1 : if (durable_rename(conf_filename, conf_filename_disabled) != 0)
215 : {
216 : /* durable_rename() has already logged something. */
217 0 : pg_log_warning_hint("A manual removal of the recovery parameters may be required.");
218 : }
219 : }
220 :
221 10 : if (success)
222 4 : return;
223 :
224 : /*
225 : * If the server is promoted, there is no way to use the current setup
226 : * again. Warn the user that a new replication setup should be done before
227 : * trying again.
228 : */
229 6 : if (recovery_ended)
230 : {
231 0 : pg_log_warning("failed after the end of recovery");
232 0 : pg_log_warning_hint("The target server cannot be used as a physical replica anymore. "
233 : "You must recreate the physical replica before continuing.");
234 : }
235 :
236 18 : for (int i = 0; i < num_dbs; i++)
237 : {
238 12 : struct LogicalRepInfo *dbinfo = &dbinfos.dbinfo[i];
239 :
240 12 : if (dbinfo->made_publication || dbinfo->made_replslot)
241 : {
242 : PGconn *conn;
243 :
244 0 : conn = connect_database(dbinfo->pubconninfo, false);
245 0 : if (conn != NULL)
246 : {
247 0 : if (dbinfo->made_publication)
248 0 : drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
249 : &dbinfo->made_publication);
250 0 : if (dbinfo->made_replslot)
251 0 : drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
252 0 : disconnect_database(conn, false);
253 : }
254 : else
255 : {
256 : /*
257 : * If a connection could not be established, inform the user
258 : * that some objects were left on primary and should be
259 : * removed before trying again.
260 : */
261 0 : if (dbinfo->made_publication)
262 : {
263 0 : pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
264 : dbinfo->pubname,
265 : dbinfo->dbname);
266 0 : pg_log_warning_hint("Drop this publication before trying again.");
267 : }
268 0 : if (dbinfo->made_replslot)
269 : {
270 0 : pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
271 : dbinfo->replslotname,
272 : dbinfo->dbname);
273 0 : pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
274 : }
275 : }
276 : }
277 : }
278 :
279 6 : if (standby_running)
280 4 : stop_standby_server(subscriber_dir);
281 : }
282 :
283 : static void
284 1 : usage(void)
285 : {
286 1 : printf(_("%s creates a new logical replica from a standby server.\n\n"),
287 : progname);
288 1 : printf(_("Usage:\n"));
289 1 : printf(_(" %s [OPTION]...\n"), progname);
290 1 : printf(_("\nOptions:\n"));
291 1 : printf(_(" -a, --all create subscriptions for all databases except template\n"
292 : " databases and databases that don't allow connections\n"));
293 1 : printf(_(" -d, --database=DBNAME database in which to create a subscription\n"));
294 1 : printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
295 1 : printf(_(" -l, --logdir=LOGDIR location for the log directory\n"));
296 1 : printf(_(" -n, --dry-run dry run, just show what would be done\n"));
297 1 : printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
298 1 : printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
299 1 : printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
300 1 : printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
301 1 : printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));
302 1 : printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
303 1 : printf(_(" -v, --verbose output verbose messages\n"));
304 1 : printf(_(" --clean=OBJECTTYPE drop all objects of the specified type from specified\n"
305 : " databases on the subscriber; accepts: \"%s\"\n"), "publications");
306 1 : printf(_(" --config-file=FILENAME use specified main server configuration\n"
307 : " file when running target cluster\n"));
308 1 : printf(_(" --publication=NAME publication name\n"));
309 1 : printf(_(" --replication-slot=NAME replication slot name\n"));
310 1 : printf(_(" --subscription=NAME subscription name\n"));
311 1 : printf(_(" -V, --version output version information, then exit\n"));
312 1 : printf(_(" -?, --help show this help, then exit\n"));
313 1 : printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
314 1 : printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
315 1 : }
316 :
317 : /*
318 : * Subroutine to append "keyword=value" to a connection string,
319 : * with proper quoting of the value. (We assume keywords don't need that.)
320 : */
321 : static void
322 107 : appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
323 : {
324 107 : if (buf->len > 0)
325 79 : appendPQExpBufferChar(buf, ' ');
326 107 : appendPQExpBufferStr(buf, keyword);
327 107 : appendPQExpBufferChar(buf, '=');
328 107 : appendConnStrVal(buf, val);
329 107 : }
330 :
331 : /*
332 : * Validate a connection string. Returns a base connection string that is a
333 : * connection string without a database name.
334 : *
335 : * Since we might process multiple databases, each database name will be
336 : * appended to this base connection string to provide a final connection
337 : * string. If the second argument (dbname) is not null, returns dbname if the
338 : * provided connection string contains it.
339 : *
340 : * It is the caller's responsibility to free the returned connection string and
341 : * dbname.
342 : */
343 : static char *
344 14 : get_base_conninfo(const char *conninfo, char **dbname)
345 : {
346 : PQExpBuffer buf;
347 : PQconninfoOption *conn_opts;
348 : PQconninfoOption *conn_opt;
349 14 : char *errmsg = NULL;
350 : char *ret;
351 :
352 14 : conn_opts = PQconninfoParse(conninfo, &errmsg);
353 14 : if (conn_opts == NULL)
354 : {
355 0 : pg_log_error("could not parse connection string: %s", errmsg);
356 0 : PQfreemem(errmsg);
357 0 : return NULL;
358 : }
359 :
360 14 : buf = createPQExpBuffer();
361 742 : for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
362 : {
363 728 : if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
364 : {
365 33 : if (strcmp(conn_opt->keyword, "dbname") == 0)
366 : {
367 9 : if (dbname)
368 9 : *dbname = pg_strdup(conn_opt->val);
369 9 : continue;
370 : }
371 24 : appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
372 : }
373 : }
374 :
375 14 : ret = pg_strdup(buf->data);
376 :
377 14 : destroyPQExpBuffer(buf);
378 14 : PQconninfoFree(conn_opts);
379 :
380 14 : return ret;
381 : }
382 :
383 : /*
384 : * Build a subscriber connection string. Only a few parameters are supported
385 : * since it starts a server with restricted access.
386 : */
387 : static char *
388 14 : get_sub_conninfo(const struct CreateSubscriberOptions *opt)
389 : {
390 14 : PQExpBuffer buf = createPQExpBuffer();
391 : char *ret;
392 :
393 14 : appendConnStrItem(buf, "port", opt->sub_port);
394 : #if !defined(WIN32)
395 14 : appendConnStrItem(buf, "host", opt->socket_dir);
396 : #endif
397 14 : if (opt->sub_username != NULL)
398 0 : appendConnStrItem(buf, "user", opt->sub_username);
399 14 : appendConnStrItem(buf, "fallback_application_name", progname);
400 :
401 14 : ret = pg_strdup(buf->data);
402 :
403 14 : destroyPQExpBuffer(buf);
404 :
405 14 : return ret;
406 : }
407 :
408 : /*
409 : * Verify if a PostgreSQL binary (progname) is available in the same directory as
410 : * pg_createsubscriber and it has the same version. It returns the absolute
411 : * path of the progname.
412 : */
413 : static char *
414 20 : get_exec_path(const char *argv0, const char *progname)
415 : {
416 : char *versionstr;
417 : char *exec_path;
418 : int ret;
419 :
420 20 : versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
421 20 : exec_path = pg_malloc(MAXPGPATH);
422 20 : ret = find_other_exec(argv0, progname, versionstr, exec_path);
423 :
424 20 : if (ret < 0)
425 : {
426 : char full_path[MAXPGPATH];
427 :
428 0 : if (find_my_exec(argv0, full_path) < 0)
429 0 : strlcpy(full_path, progname, sizeof(full_path));
430 :
431 0 : if (ret == -1)
432 0 : pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
433 : progname, "pg_createsubscriber", full_path);
434 : else
435 0 : pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
436 : progname, full_path, "pg_createsubscriber");
437 : }
438 :
439 20 : pg_log_debug("%s path is: %s", progname, exec_path);
440 :
441 20 : return exec_path;
442 : }
443 :
444 : /*
445 : * Is it a cluster directory? These are preliminary checks. It is far from
446 : * making an accurate check. If it is not a clone from the publisher, it will
447 : * eventually fail in a future step.
448 : */
449 : static void
450 10 : check_data_directory(const char *datadir)
451 : {
452 : struct stat statbuf;
453 : uint32 major_version;
454 : char *version_str;
455 :
456 10 : pg_log_info("checking if directory \"%s\" is a cluster data directory",
457 : datadir);
458 :
459 10 : if (stat(datadir, &statbuf) != 0)
460 : {
461 0 : if (errno == ENOENT)
462 0 : pg_fatal("data directory \"%s\" does not exist", datadir);
463 : else
464 0 : pg_fatal("could not access directory \"%s\": %m", datadir);
465 : }
466 :
467 : /*
468 : * Retrieve the contents of this cluster's PG_VERSION. We require
469 : * compatibility with the same major version as the one this tool is
470 : * compiled with.
471 : */
472 10 : major_version = GET_PG_MAJORVERSION_NUM(get_pg_version(datadir, &version_str));
473 10 : if (major_version != PG_MAJORVERSION_NUM)
474 : {
475 0 : pg_log_error("data directory is of wrong version");
476 0 : pg_log_error_detail("File \"%s\" contains \"%s\", which is not compatible with this program's version \"%s\".",
477 : "PG_VERSION", version_str, PG_MAJORVERSION);
478 0 : exit(1);
479 : }
480 10 : }
481 :
482 : /*
483 : * Append database name into a base connection string.
484 : *
485 : * dbname is the only parameter that changes so it is not included in the base
486 : * connection string. This function concatenates dbname to build a "real"
487 : * connection string.
488 : */
489 : static char *
490 41 : concat_conninfo_dbname(const char *conninfo, const char *dbname)
491 : {
492 41 : PQExpBuffer buf = createPQExpBuffer();
493 : char *ret;
494 :
495 : Assert(conninfo != NULL);
496 :
497 41 : appendPQExpBufferStr(buf, conninfo);
498 41 : appendConnStrItem(buf, "dbname", dbname);
499 :
500 41 : ret = pg_strdup(buf->data);
501 41 : destroyPQExpBuffer(buf);
502 :
503 41 : return ret;
504 : }
505 :
506 : /*
507 : * Store publication and subscription information.
508 : *
509 : * If publication, replication slot and subscription names were specified,
510 : * store it here. Otherwise, a generated name will be assigned to the object in
511 : * setup_publisher().
512 : */
513 : static struct LogicalRepInfo *
514 10 : store_pub_sub_info(const struct CreateSubscriberOptions *opt,
515 : const char *pub_base_conninfo,
516 : const char *sub_base_conninfo)
517 : {
518 : struct LogicalRepInfo *dbinfo;
519 10 : SimpleStringListCell *pubcell = NULL;
520 10 : SimpleStringListCell *subcell = NULL;
521 10 : SimpleStringListCell *replslotcell = NULL;
522 10 : int i = 0;
523 :
524 10 : dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
525 :
526 10 : if (num_pubs > 0)
527 2 : pubcell = opt->pub_names.head;
528 10 : if (num_subs > 0)
529 1 : subcell = opt->sub_names.head;
530 10 : if (num_replslots > 0)
531 2 : replslotcell = opt->replslot_names.head;
532 :
533 30 : for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
534 : {
535 : char *conninfo;
536 :
537 : /* Fill publisher attributes */
538 20 : conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
539 20 : dbinfo[i].pubconninfo = conninfo;
540 20 : dbinfo[i].dbname = cell->val;
541 20 : if (num_pubs > 0)
542 4 : dbinfo[i].pubname = pubcell->val;
543 : else
544 16 : dbinfo[i].pubname = NULL;
545 20 : if (num_replslots > 0)
546 3 : dbinfo[i].replslotname = replslotcell->val;
547 : else
548 17 : dbinfo[i].replslotname = NULL;
549 20 : dbinfo[i].made_replslot = false;
550 20 : dbinfo[i].made_publication = false;
551 : /* Fill subscriber attributes */
552 20 : conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
553 20 : dbinfo[i].subconninfo = conninfo;
554 20 : if (num_subs > 0)
555 2 : dbinfo[i].subname = subcell->val;
556 : else
557 18 : dbinfo[i].subname = NULL;
558 : /* Other fields will be filled later */
559 :
560 20 : pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
561 : dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
562 : dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
563 : dbinfo[i].pubconninfo);
564 20 : pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
565 : dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
566 : dbinfo[i].subconninfo,
567 : dbinfos.two_phase ? "true" : "false");
568 :
569 20 : if (num_pubs > 0)
570 4 : pubcell = pubcell->next;
571 20 : if (num_subs > 0)
572 2 : subcell = subcell->next;
573 20 : if (num_replslots > 0)
574 3 : replslotcell = replslotcell->next;
575 :
576 20 : i++;
577 : }
578 :
579 10 : return dbinfo;
580 : }
581 :
582 : /*
583 : * Open a new connection. If exit_on_error is true, it has an undesired
584 : * condition and it should exit immediately.
585 : */
586 : static PGconn *
587 57 : connect_database(const char *conninfo, bool exit_on_error)
588 : {
589 : PGconn *conn;
590 : PGresult *res;
591 :
592 57 : conn = PQconnectdb(conninfo);
593 57 : if (PQstatus(conn) != CONNECTION_OK)
594 : {
595 0 : pg_log_error("connection to database failed: %s",
596 : PQerrorMessage(conn));
597 0 : PQfinish(conn);
598 :
599 0 : if (exit_on_error)
600 0 : exit(1);
601 0 : return NULL;
602 : }
603 :
604 : /* Secure search_path */
605 57 : res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
606 57 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
607 : {
608 0 : pg_log_error("could not clear \"search_path\": %s",
609 : PQresultErrorMessage(res));
610 0 : PQclear(res);
611 0 : PQfinish(conn);
612 :
613 0 : if (exit_on_error)
614 0 : exit(1);
615 0 : return NULL;
616 : }
617 57 : PQclear(res);
618 :
619 57 : return conn;
620 : }
621 :
622 : /*
623 : * Close the connection. If exit_on_error is true, it has an undesired
624 : * condition and it should exit immediately.
625 : */
626 : static void
627 57 : disconnect_database(PGconn *conn, bool exit_on_error)
628 : {
629 : Assert(conn != NULL);
630 :
631 57 : PQfinish(conn);
632 :
633 57 : if (exit_on_error)
634 2 : exit(1);
635 55 : }
636 :
637 : /*
638 : * Obtain the system identifier using the provided connection. It will be used
639 : * to compare if a data directory is a clone of another one.
640 : */
641 : static uint64
642 10 : get_primary_sysid(const char *conninfo)
643 : {
644 : PGconn *conn;
645 : PGresult *res;
646 : uint64 sysid;
647 :
648 10 : pg_log_info("getting system identifier from publisher");
649 :
650 10 : conn = connect_database(conninfo, true);
651 :
652 10 : res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
653 10 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
654 : {
655 0 : pg_log_error("could not get system identifier: %s",
656 : PQresultErrorMessage(res));
657 0 : disconnect_database(conn, true);
658 : }
659 10 : if (PQntuples(res) != 1)
660 : {
661 0 : pg_log_error("could not get system identifier: got %d rows, expected %d row",
662 : PQntuples(res), 1);
663 0 : disconnect_database(conn, true);
664 : }
665 :
666 10 : sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
667 :
668 10 : pg_log_info("system identifier is %" PRIu64 " on publisher", sysid);
669 :
670 10 : PQclear(res);
671 10 : disconnect_database(conn, false);
672 :
673 10 : return sysid;
674 : }
675 :
676 : /*
677 : * Obtain the system identifier from control file. It will be used to compare
678 : * if a data directory is a clone of another one. This routine is used locally
679 : * and avoids a connection.
680 : */
681 : static uint64
682 10 : get_standby_sysid(const char *datadir)
683 : {
684 : ControlFileData *cf;
685 : bool crc_ok;
686 : uint64 sysid;
687 :
688 10 : pg_log_info("getting system identifier from subscriber");
689 :
690 10 : cf = get_controlfile(datadir, &crc_ok);
691 10 : if (!crc_ok)
692 0 : pg_fatal("control file appears to be corrupt");
693 :
694 10 : sysid = cf->system_identifier;
695 :
696 10 : pg_log_info("system identifier is %" PRIu64 " on subscriber", sysid);
697 :
698 10 : pg_free(cf);
699 :
700 10 : return sysid;
701 : }
702 :
703 : /*
704 : * Modify the system identifier. Since a standby server preserves the system
705 : * identifier, it makes sense to change it to avoid situations in which WAL
706 : * files from one of the systems might be used in the other one.
707 : */
708 : static void
709 4 : modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
710 : {
711 : ControlFileData *cf;
712 : bool crc_ok;
713 : struct timeval tv;
714 :
715 : char *out_file;
716 : char *cmd_str;
717 :
718 4 : pg_log_info("modifying system identifier of subscriber");
719 :
720 4 : cf = get_controlfile(subscriber_dir, &crc_ok);
721 4 : if (!crc_ok)
722 0 : pg_fatal("control file appears to be corrupt");
723 :
724 : /*
725 : * Select a new system identifier.
726 : *
727 : * XXX this code was extracted from BootStrapXLOG().
728 : */
729 4 : gettimeofday(&tv, NULL);
730 4 : cf->system_identifier = ((uint64) tv.tv_sec) << 32;
731 4 : cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
732 4 : cf->system_identifier |= getpid() & 0xFFF;
733 :
734 4 : if (dry_run)
735 3 : pg_log_info("dry-run: would set system identifier to %" PRIu64 " on subscriber",
736 : cf->system_identifier);
737 : else
738 : {
739 1 : update_controlfile(subscriber_dir, cf, true);
740 1 : pg_log_info("system identifier is %" PRIu64 " on subscriber",
741 : cf->system_identifier);
742 : }
743 :
744 4 : if (dry_run)
745 3 : pg_log_info("dry-run: would run pg_resetwal on the subscriber");
746 : else
747 1 : pg_log_info("running pg_resetwal on the subscriber");
748 :
749 : /*
750 : * Redirecting the output to the logfile if specified. Since the output
751 : * would be very short, around one line, we do not provide a separate file
752 : * for it; it's done as a part of the server log.
753 : */
754 4 : if (opt->log_dir)
755 1 : out_file = psprintf("%s/%s", logdir, SERVER_LOG_FILE_NAME);
756 : else
757 3 : out_file = DEVNULL;
758 :
759 4 : cmd_str = psprintf("\"%s\" -D \"%s\" >> \"%s\"", pg_resetwal_path,
760 : subscriber_dir, out_file);
761 4 : if (opt->log_dir)
762 1 : pg_free(out_file);
763 :
764 4 : pg_log_debug("pg_resetwal command is: %s", cmd_str);
765 :
766 4 : if (!dry_run)
767 : {
768 1 : int rc = system(cmd_str);
769 :
770 1 : if (rc == 0)
771 1 : pg_log_info("successfully reset WAL on the subscriber");
772 : else
773 0 : pg_fatal("could not reset WAL on subscriber: %s", wait_result_to_str(rc));
774 : }
775 :
776 4 : pg_free(cf);
777 4 : pg_free(cmd_str);
778 4 : }
779 :
780 : /*
781 : * Generate an object name using a prefix, database oid and a random integer.
782 : * It is used in case the user does not specify an object name (publication,
783 : * subscription, replication slot).
784 : */
785 : static char *
786 8 : generate_object_name(PGconn *conn)
787 : {
788 : PGresult *res;
789 : Oid oid;
790 : uint32 rand;
791 : char *objname;
792 :
793 8 : res = PQexec(conn,
794 : "SELECT oid FROM pg_catalog.pg_database "
795 : "WHERE datname = pg_catalog.current_database()");
796 8 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
797 : {
798 0 : pg_log_error("could not obtain database OID: %s",
799 : PQresultErrorMessage(res));
800 0 : disconnect_database(conn, true);
801 : }
802 :
803 8 : if (PQntuples(res) != 1)
804 : {
805 0 : pg_log_error("could not obtain database OID: got %d rows, expected %d row",
806 : PQntuples(res), 1);
807 0 : disconnect_database(conn, true);
808 : }
809 :
810 : /* Database OID */
811 8 : oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
812 :
813 8 : PQclear(res);
814 :
815 : /* Random unsigned integer */
816 8 : rand = pg_prng_uint32(&prng_state);
817 :
818 : /*
819 : * Build the object name. The name must not exceed NAMEDATALEN - 1. This
820 : * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
821 : * '\0').
822 : */
823 8 : objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
824 :
825 8 : return objname;
826 : }
827 :
828 : /*
829 : * Does the publication exist in the specified database?
830 : */
831 : static bool
832 8 : find_publication(PGconn *conn, const char *pubname, const char *dbname)
833 : {
834 8 : PQExpBuffer str = createPQExpBuffer();
835 : PGresult *res;
836 8 : bool found = false;
837 8 : char *pubname_esc = PQescapeLiteral(conn, pubname, strlen(pubname));
838 :
839 8 : appendPQExpBuffer(str,
840 : "SELECT 1 FROM pg_catalog.pg_publication "
841 : "WHERE pubname = %s",
842 : pubname_esc);
843 8 : res = PQexec(conn, str->data);
844 8 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
845 : {
846 0 : pg_log_error("could not find publication \"%s\" in database \"%s\": %s",
847 : pubname, dbname, PQerrorMessage(conn));
848 0 : disconnect_database(conn, true);
849 : }
850 :
851 8 : if (PQntuples(res) == 1)
852 1 : found = true;
853 :
854 8 : PQclear(res);
855 8 : PQfreemem(pubname_esc);
856 8 : destroyPQExpBuffer(str);
857 :
858 8 : return found;
859 : }
860 :
861 : /*
862 : * Create the publications and replication slots in preparation for logical
863 : * replication. Returns the LSN from latest replication slot. It will be the
864 : * replication start point that is used to adjust the subscriptions (see
865 : * set_replication_progress).
866 : */
867 : static char *
868 4 : setup_publisher(struct LogicalRepInfo *dbinfo)
869 : {
870 4 : char *lsn = NULL;
871 :
872 4 : pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
873 :
874 12 : for (int i = 0; i < num_dbs; i++)
875 : {
876 : PGconn *conn;
877 8 : char *genname = NULL;
878 :
879 8 : conn = connect_database(dbinfo[i].pubconninfo, true);
880 :
881 : /*
882 : * If an object name was not specified as command-line options, assign
883 : * a generated object name. The replication slot has a different rule.
884 : * The subscription name is assigned to the replication slot name if
885 : * no replication slot is specified. It follows the same rule as
886 : * CREATE SUBSCRIPTION.
887 : */
888 8 : if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
889 8 : genname = generate_object_name(conn);
890 8 : if (num_pubs == 0)
891 4 : dbinfo[i].pubname = pg_strdup(genname);
892 8 : if (num_subs == 0)
893 6 : dbinfo[i].subname = pg_strdup(genname);
894 8 : if (num_replslots == 0)
895 5 : dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
896 :
897 8 : if (find_publication(conn, dbinfo[i].pubname, dbinfo[i].dbname))
898 : {
899 : /* Reuse existing publication on publisher. */
900 1 : pg_log_info("use existing publication \"%s\" in database \"%s\"",
901 : dbinfo[i].pubname, dbinfo[i].dbname);
902 : /* Don't remove pre-existing publication if an error occurs. */
903 1 : dbinfo[i].made_publication = false;
904 : }
905 : else
906 : {
907 : /*
908 : * Create publication on publisher. This step should be executed
909 : * *before* promoting the subscriber to avoid any transactions
910 : * between consistent LSN and the new publication rows (such
911 : * transactions wouldn't see the new publication rows resulting in
912 : * an error).
913 : */
914 7 : create_publication(conn, &dbinfo[i]);
915 : }
916 :
917 : /* Create replication slot on publisher */
918 8 : if (lsn)
919 1 : pg_free(lsn);
920 8 : lsn = create_logical_replication_slot(conn, &dbinfo[i]);
921 8 : if (lsn == NULL && !dry_run)
922 0 : exit(1);
923 :
924 : /*
925 : * Since we are using the LSN returned by the last replication slot as
926 : * recovery_target_lsn, this LSN is ahead of the current WAL position
927 : * and the recovery waits until the publisher writes a WAL record to
928 : * reach the target and ends the recovery. On idle systems, this wait
929 : * time is unpredictable and could lead to failure in promoting the
930 : * subscriber. To avoid that, insert a harmless WAL record.
931 : */
932 8 : if (i == num_dbs - 1 && !dry_run)
933 : {
934 : PGresult *res;
935 :
936 1 : res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
937 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
938 : {
939 0 : pg_log_error("could not write an additional WAL record: %s",
940 : PQresultErrorMessage(res));
941 0 : disconnect_database(conn, true);
942 : }
943 1 : PQclear(res);
944 : }
945 :
946 8 : disconnect_database(conn, false);
947 : }
948 :
949 4 : return lsn;
950 : }
951 :
952 : /*
953 : * Is recovery still in progress?
954 : */
955 : static bool
956 15 : server_is_in_recovery(PGconn *conn)
957 : {
958 : PGresult *res;
959 : int ret;
960 :
961 15 : res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
962 :
963 15 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
964 : {
965 0 : pg_log_error("could not obtain recovery progress: %s",
966 : PQresultErrorMessage(res));
967 0 : disconnect_database(conn, true);
968 : }
969 :
970 :
971 15 : ret = strcmp("t", PQgetvalue(res, 0, 0));
972 :
973 15 : PQclear(res);
974 :
975 15 : return ret == 0;
976 : }
977 :
978 : static void
979 1 : make_output_dirs(const char *log_basedir)
980 : {
981 : char timestamp[128];
982 : struct timeval tval;
983 : time_t now;
984 : struct tm tmbuf;
985 :
986 : /* Generate timestamp */
987 1 : gettimeofday(&tval, NULL);
988 1 : now = tval.tv_sec;
989 :
990 1 : strftime(timestamp, sizeof(timestamp), "%Y%m%dT%H%M%S",
991 1 : localtime_r(&now, &tmbuf));
992 :
993 : /* Append milliseconds */
994 1 : snprintf(timestamp + strlen(timestamp),
995 1 : sizeof(timestamp) - strlen(timestamp), ".%03u",
996 1 : (unsigned int) (tval.tv_usec / 1000));
997 :
998 : /* Build timestamp directory path */
999 1 : logdir = psprintf("%s/%s", log_basedir, timestamp);
1000 :
1001 : /* Create base directory (ignore if exists) */
1002 1 : if (mkdir(log_basedir, pg_dir_create_mode) < 0 && errno != EEXIST)
1003 0 : pg_fatal("could not create directory \"%s\": %m", log_basedir);
1004 :
1005 : /* Create a timestamp-named subdirectory under the base directory */
1006 1 : if (mkdir(logdir, pg_dir_create_mode) < 0)
1007 0 : pg_fatal("could not create directory \"%s\": %m", logdir);
1008 1 : }
1009 :
1010 : /*
1011 : * Is the primary server ready for logical replication?
1012 : *
1013 : * XXX Does it not allow a synchronous replica?
1014 : */
1015 : static void
1016 6 : check_publisher(const struct LogicalRepInfo *dbinfo)
1017 : {
1018 : PGconn *conn;
1019 : PGresult *res;
1020 6 : bool failed = false;
1021 :
1022 : char *wal_level;
1023 : int max_repslots;
1024 : int cur_repslots;
1025 : int max_walsenders;
1026 : int cur_walsenders;
1027 : int max_prepared_transactions;
1028 : char *max_slot_wal_keep_size;
1029 :
1030 6 : pg_log_info("checking settings on publisher");
1031 :
1032 6 : conn = connect_database(dbinfo[0].pubconninfo, true);
1033 :
1034 : /*
1035 : * If the primary server is in recovery (i.e. cascading replication),
1036 : * objects (publication) cannot be created because it is read only.
1037 : */
1038 6 : if (server_is_in_recovery(conn))
1039 : {
1040 1 : pg_log_error("primary server cannot be in recovery");
1041 1 : disconnect_database(conn, true);
1042 : }
1043 :
1044 : /*------------------------------------------------------------------------
1045 : * Logical replication requires a few parameters to be set on publisher.
1046 : * Since these parameters are not a requirement for physical replication,
1047 : * we should check it to make sure it won't fail.
1048 : *
1049 : * - wal_level >= replica
1050 : * - max_replication_slots >= current + number of dbs to be converted
1051 : * - max_wal_senders >= current + number of dbs to be converted
1052 : * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files)
1053 : * -----------------------------------------------------------------------
1054 : */
1055 5 : res = PQexec(conn,
1056 : "SELECT pg_catalog.current_setting('wal_level'),"
1057 : " pg_catalog.current_setting('max_replication_slots'),"
1058 : " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
1059 : " pg_catalog.current_setting('max_wal_senders'),"
1060 : " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
1061 : " pg_catalog.current_setting('max_prepared_transactions'),"
1062 : " pg_catalog.current_setting('max_slot_wal_keep_size')");
1063 :
1064 5 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1065 : {
1066 0 : pg_log_error("could not obtain publisher settings: %s",
1067 : PQresultErrorMessage(res));
1068 0 : disconnect_database(conn, true);
1069 : }
1070 :
1071 5 : wal_level = pg_strdup(PQgetvalue(res, 0, 0));
1072 5 : max_repslots = atoi(PQgetvalue(res, 0, 1));
1073 5 : cur_repslots = atoi(PQgetvalue(res, 0, 2));
1074 5 : max_walsenders = atoi(PQgetvalue(res, 0, 3));
1075 5 : cur_walsenders = atoi(PQgetvalue(res, 0, 4));
1076 5 : max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));
1077 5 : max_slot_wal_keep_size = pg_strdup(PQgetvalue(res, 0, 6));
1078 :
1079 5 : PQclear(res);
1080 :
1081 5 : pg_log_debug("publisher: wal_level: %s", wal_level);
1082 5 : pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
1083 5 : pg_log_debug("publisher: current replication slots: %d", cur_repslots);
1084 5 : pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
1085 5 : pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
1086 5 : pg_log_debug("publisher: max_prepared_transactions: %d",
1087 : max_prepared_transactions);
1088 5 : pg_log_debug("publisher: max_slot_wal_keep_size: %s",
1089 : max_slot_wal_keep_size);
1090 :
1091 5 : disconnect_database(conn, false);
1092 :
1093 5 : if (strcmp(wal_level, "minimal") == 0)
1094 : {
1095 0 : pg_log_error("publisher requires \"wal_level\" >= \"replica\"");
1096 0 : failed = true;
1097 : }
1098 :
1099 5 : if (max_repslots - cur_repslots < num_dbs)
1100 : {
1101 1 : pg_log_error("publisher requires %d replication slots, but only %d remain",
1102 : num_dbs, max_repslots - cur_repslots);
1103 1 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1104 : "max_replication_slots", cur_repslots + num_dbs);
1105 1 : failed = true;
1106 : }
1107 :
1108 5 : if (max_walsenders - cur_walsenders < num_dbs)
1109 : {
1110 1 : pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
1111 : num_dbs, max_walsenders - cur_walsenders);
1112 1 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1113 : "max_wal_senders", cur_walsenders + num_dbs);
1114 1 : failed = true;
1115 : }
1116 :
1117 5 : if (max_prepared_transactions != 0 && !dbinfos.two_phase)
1118 : {
1119 0 : pg_log_warning("two_phase option will not be enabled for replication slots");
1120 0 : pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "
1121 : "Prepared transactions will be replicated at COMMIT PREPARED.");
1122 0 : pg_log_warning_hint("You can use the command-line option --enable-two-phase to enable two_phase.");
1123 : }
1124 :
1125 : /*
1126 : * In dry-run mode, validate 'max_slot_wal_keep_size'. If this parameter
1127 : * is set to a non-default value, it may cause replication failures due to
1128 : * required WAL files being prematurely removed.
1129 : */
1130 5 : if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
1131 : {
1132 0 : pg_log_warning("required WAL could be removed from the publisher");
1133 0 : pg_log_warning_hint("Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
1134 : "max_slot_wal_keep_size");
1135 : }
1136 :
1137 5 : pg_free(wal_level);
1138 :
1139 5 : if (failed)
1140 1 : exit(1);
1141 4 : }
1142 :
1143 : /*
1144 : * Is the standby server ready for logical replication?
1145 : *
1146 : * XXX Does it not allow a time-delayed replica?
1147 : *
1148 : * XXX In a cascaded replication scenario (P -> S -> C), if the target server
1149 : * is S, it cannot detect there is a replica (server C) because server S starts
1150 : * accepting only local connections and server C cannot connect to it. Hence,
1151 : * there is not a reliable way to provide a suitable error saying the server C
1152 : * will be broken at the end of this process (due to pg_resetwal).
1153 : */
1154 : static void
1155 8 : check_subscriber(const struct LogicalRepInfo *dbinfo)
1156 : {
1157 : PGconn *conn;
1158 : PGresult *res;
1159 8 : bool failed = false;
1160 :
1161 : int max_lrworkers;
1162 : int max_replorigins;
1163 : int max_wprocs;
1164 :
1165 8 : pg_log_info("checking settings on subscriber");
1166 :
1167 8 : conn = connect_database(dbinfo[0].subconninfo, true);
1168 :
1169 : /* The target server must be a standby */
1170 8 : if (!server_is_in_recovery(conn))
1171 : {
1172 1 : pg_log_error("target server must be a standby");
1173 1 : disconnect_database(conn, true);
1174 : }
1175 :
1176 : /*------------------------------------------------------------------------
1177 : * Logical replication requires a few parameters to be set on subscriber.
1178 : * Since these parameters are not a requirement for physical replication,
1179 : * we should check it to make sure it won't fail.
1180 : *
1181 : * - max_active_replication_origins >= number of dbs to be converted
1182 : * - max_logical_replication_workers >= number of dbs to be converted
1183 : * - max_worker_processes >= 1 + number of dbs to be converted
1184 : *------------------------------------------------------------------------
1185 : */
1186 7 : res = PQexec(conn,
1187 : "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
1188 : "'max_logical_replication_workers', "
1189 : "'max_active_replication_origins', "
1190 : "'max_worker_processes', "
1191 : "'primary_slot_name') "
1192 : "ORDER BY name");
1193 :
1194 7 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1195 : {
1196 0 : pg_log_error("could not obtain subscriber settings: %s",
1197 : PQresultErrorMessage(res));
1198 0 : disconnect_database(conn, true);
1199 : }
1200 :
1201 7 : max_replorigins = atoi(PQgetvalue(res, 0, 0));
1202 7 : max_lrworkers = atoi(PQgetvalue(res, 1, 0));
1203 7 : max_wprocs = atoi(PQgetvalue(res, 2, 0));
1204 7 : if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
1205 6 : primary_slot_name = pg_strdup(PQgetvalue(res, 3, 0));
1206 :
1207 7 : pg_log_debug("subscriber: max_logical_replication_workers: %d",
1208 : max_lrworkers);
1209 7 : pg_log_debug("subscriber: max_active_replication_origins: %d", max_replorigins);
1210 7 : pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
1211 7 : if (primary_slot_name)
1212 6 : pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
1213 :
1214 7 : PQclear(res);
1215 :
1216 7 : disconnect_database(conn, false);
1217 :
1218 7 : if (max_replorigins < num_dbs)
1219 : {
1220 1 : pg_log_error("subscriber requires %d active replication origins, but only %d remain",
1221 : num_dbs, max_replorigins);
1222 1 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1223 : "max_active_replication_origins", num_dbs);
1224 1 : failed = true;
1225 : }
1226 :
1227 7 : if (max_lrworkers < num_dbs)
1228 : {
1229 1 : pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
1230 : num_dbs, max_lrworkers);
1231 1 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1232 : "max_logical_replication_workers", num_dbs);
1233 1 : failed = true;
1234 : }
1235 :
1236 7 : if (max_wprocs < num_dbs + 1)
1237 : {
1238 1 : pg_log_error("subscriber requires %d worker processes, but only %d remain",
1239 : num_dbs + 1, max_wprocs);
1240 1 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1241 : "max_worker_processes", num_dbs + 1);
1242 1 : failed = true;
1243 : }
1244 :
1245 7 : if (failed)
1246 1 : exit(1);
1247 6 : }
1248 :
1249 : /*
1250 : * Drop a specified subscription. This is to avoid duplicate subscriptions on
1251 : * the primary (publisher node) and the newly created subscriber. We
1252 : * shouldn't drop the associated slot as that would be used by the publisher
1253 : * node.
1254 : */
1255 : static void
1256 4 : drop_existing_subscription(PGconn *conn, const char *subname, const char *dbname)
1257 : {
1258 4 : PQExpBuffer query = createPQExpBuffer();
1259 : PGresult *res;
1260 :
1261 : Assert(conn != NULL);
1262 :
1263 : /*
1264 : * Construct a query string. These commands are allowed to be executed
1265 : * within a transaction.
1266 : */
1267 4 : appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
1268 : subname);
1269 4 : appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
1270 : subname);
1271 4 : appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
1272 :
1273 4 : if (dry_run)
1274 3 : pg_log_info("dry-run: would drop subscription \"%s\" in database \"%s\"",
1275 : subname, dbname);
1276 : else
1277 : {
1278 1 : pg_log_info("dropping subscription \"%s\" in database \"%s\"",
1279 : subname, dbname);
1280 :
1281 1 : res = PQexec(conn, query->data);
1282 :
1283 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1284 : {
1285 0 : pg_log_error("could not drop subscription \"%s\": %s",
1286 : subname, PQresultErrorMessage(res));
1287 0 : disconnect_database(conn, true);
1288 : }
1289 :
1290 1 : PQclear(res);
1291 : }
1292 :
1293 4 : destroyPQExpBuffer(query);
1294 4 : }
1295 :
1296 : /*
1297 : * Retrieve and drop the pre-existing subscriptions.
1298 : */
1299 : static void
1300 8 : check_and_drop_existing_subscriptions(PGconn *conn,
1301 : const struct LogicalRepInfo *dbinfo)
1302 : {
1303 8 : PQExpBuffer query = createPQExpBuffer();
1304 : char *dbname;
1305 : PGresult *res;
1306 :
1307 : Assert(conn != NULL);
1308 :
1309 8 : dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1310 :
1311 8 : appendPQExpBuffer(query,
1312 : "SELECT s.subname FROM pg_catalog.pg_subscription s "
1313 : "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1314 : "WHERE d.datname = %s",
1315 : dbname);
1316 8 : res = PQexec(conn, query->data);
1317 :
1318 8 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1319 : {
1320 0 : pg_log_error("could not obtain pre-existing subscriptions: %s",
1321 : PQresultErrorMessage(res));
1322 0 : disconnect_database(conn, true);
1323 : }
1324 :
1325 12 : for (int i = 0; i < PQntuples(res); i++)
1326 4 : drop_existing_subscription(conn, PQgetvalue(res, i, 0),
1327 4 : dbinfo->dbname);
1328 :
1329 8 : PQclear(res);
1330 8 : destroyPQExpBuffer(query);
1331 8 : PQfreemem(dbname);
1332 8 : }
1333 :
1334 : /*
1335 : * Create the subscriptions, adjust the initial location for logical
1336 : * replication and enable the subscriptions. That's the last step for logical
1337 : * replication setup.
1338 : */
1339 : static void
1340 4 : setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
1341 : {
1342 12 : for (int i = 0; i < num_dbs; i++)
1343 : {
1344 : PGconn *conn;
1345 :
1346 : /* Connect to subscriber. */
1347 8 : conn = connect_database(dbinfo[i].subconninfo, true);
1348 :
1349 : /*
1350 : * We don't need the pre-existing subscriptions on the newly formed
1351 : * subscriber. They can connect to other publisher nodes and either
1352 : * get some unwarranted data or can lead to ERRORs in connecting to
1353 : * such nodes.
1354 : */
1355 8 : check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
1356 :
1357 : /* Check and drop the required publications in the given database. */
1358 8 : check_and_drop_publications(conn, &dbinfo[i]);
1359 :
1360 8 : create_subscription(conn, &dbinfo[i]);
1361 :
1362 : /* Set the replication progress to the correct LSN */
1363 8 : set_replication_progress(conn, &dbinfo[i], consistent_lsn);
1364 :
1365 : /* Enable subscription */
1366 8 : enable_subscription(conn, &dbinfo[i]);
1367 :
1368 8 : disconnect_database(conn, false);
1369 : }
1370 4 : }
1371 :
1372 : /*
1373 : * Write the required recovery parameters.
1374 : */
1375 : static void
1376 4 : setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
1377 : {
1378 : PGconn *conn;
1379 : PQExpBuffer recoveryconfcontents;
1380 :
1381 : /*
1382 : * Despite of the recovery parameters will be written to the subscriber,
1383 : * use a publisher connection. The primary_conninfo is generated using the
1384 : * connection settings.
1385 : */
1386 4 : conn = connect_database(dbinfo[0].pubconninfo, true);
1387 :
1388 : /*
1389 : * Write recovery parameters.
1390 : *
1391 : * The subscriber is not running yet. In dry run mode, the recovery
1392 : * parameters *won't* be written. An invalid LSN is used for printing
1393 : * purposes. Additional recovery parameters are added here. It avoids
1394 : * unexpected behavior such as end of recovery as soon as a consistent
1395 : * state is reached (recovery_target) and failure due to multiple recovery
1396 : * targets (name, time, xid, LSN).
1397 : */
1398 4 : recoveryconfcontents = GenerateRecoveryConfig(conn, NULL, NULL);
1399 4 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target = ''\n");
1400 4 : appendPQExpBufferStr(recoveryconfcontents,
1401 : "recovery_target_timeline = 'latest'\n");
1402 :
1403 : /*
1404 : * Set recovery_target_inclusive = false to avoid reapplying the
1405 : * transaction committed at 'lsn' after subscription is enabled. This is
1406 : * because the provided 'lsn' is also used as the replication start point
1407 : * for the subscription. So, the server can send the transaction committed
1408 : * at that 'lsn' after replication is started which can lead to applying
1409 : * the same transaction twice if we keep recovery_target_inclusive = true.
1410 : */
1411 4 : appendPQExpBufferStr(recoveryconfcontents,
1412 : "recovery_target_inclusive = false\n");
1413 4 : appendPQExpBufferStr(recoveryconfcontents,
1414 : "recovery_target_action = promote\n");
1415 4 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target_name = ''\n");
1416 4 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target_time = ''\n");
1417 4 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target_xid = ''\n");
1418 :
1419 4 : if (dry_run)
1420 : {
1421 3 : appendPQExpBufferStr(recoveryconfcontents, "# dry run mode\n");
1422 3 : appendPQExpBuffer(recoveryconfcontents,
1423 : "recovery_target_lsn = '%X/%08X'\n",
1424 : LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
1425 : }
1426 : else
1427 : {
1428 1 : appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
1429 : lsn);
1430 : }
1431 :
1432 4 : pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
1433 :
1434 4 : if (!dry_run)
1435 : {
1436 : char conf_filename[MAXPGPATH];
1437 : FILE *fd;
1438 :
1439 : /* Write the recovery parameters to INCLUDED_CONF_FILE */
1440 1 : snprintf(conf_filename, MAXPGPATH, "%s/%s", datadir,
1441 : INCLUDED_CONF_FILE);
1442 1 : fd = fopen(conf_filename, "w");
1443 1 : if (fd == NULL)
1444 0 : pg_fatal("could not open file \"%s\": %m", conf_filename);
1445 :
1446 1 : if (fwrite(recoveryconfcontents->data, recoveryconfcontents->len, 1, fd) != 1)
1447 0 : pg_fatal("could not write to file \"%s\": %m", conf_filename);
1448 :
1449 1 : fclose(fd);
1450 1 : recovery_params_set = true;
1451 :
1452 : /* Include conditionally the recovery parameters. */
1453 1 : resetPQExpBuffer(recoveryconfcontents);
1454 1 : appendPQExpBufferStr(recoveryconfcontents,
1455 : "include_if_exists '" INCLUDED_CONF_FILE "'\n");
1456 1 : WriteRecoveryConfig(conn, datadir, recoveryconfcontents);
1457 : }
1458 :
1459 4 : disconnect_database(conn, false);
1460 4 : }
1461 :
1462 : /*
1463 : * Drop physical replication slot on primary if the standby was using it. After
1464 : * the transformation, it has no use.
1465 : *
1466 : * XXX we might not fail here. Instead, we provide a warning so the user
1467 : * eventually drops this replication slot later.
1468 : */
1469 : static void
1470 4 : drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
1471 : {
1472 : PGconn *conn;
1473 :
1474 : /* Replication slot does not exist, do nothing */
1475 4 : if (!primary_slot_name)
1476 0 : return;
1477 :
1478 4 : conn = connect_database(dbinfo[0].pubconninfo, false);
1479 4 : if (conn != NULL)
1480 : {
1481 4 : drop_replication_slot(conn, &dbinfo[0], slotname);
1482 4 : disconnect_database(conn, false);
1483 : }
1484 : else
1485 : {
1486 0 : pg_log_warning("could not drop replication slot \"%s\" on primary",
1487 : slotname);
1488 0 : pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
1489 : }
1490 : }
1491 :
1492 : /*
1493 : * Drop failover replication slots on subscriber. After the transformation,
1494 : * they have no use.
1495 : *
1496 : * XXX We do not fail here. Instead, we provide a warning so the user can drop
1497 : * them later.
1498 : */
1499 : static void
1500 4 : drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
1501 : {
1502 : PGconn *conn;
1503 : PGresult *res;
1504 :
1505 4 : conn = connect_database(dbinfo[0].subconninfo, false);
1506 4 : if (conn != NULL)
1507 : {
1508 : /* Get failover replication slot names */
1509 4 : res = PQexec(conn,
1510 : "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
1511 :
1512 4 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
1513 : {
1514 : /* Remove failover replication slots from subscriber */
1515 8 : for (int i = 0; i < PQntuples(res); i++)
1516 4 : drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
1517 : }
1518 : else
1519 : {
1520 0 : pg_log_warning("could not obtain failover replication slot information: %s",
1521 : PQresultErrorMessage(res));
1522 0 : pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1523 : }
1524 :
1525 4 : PQclear(res);
1526 4 : disconnect_database(conn, false);
1527 : }
1528 : else
1529 : {
1530 0 : pg_log_warning("could not drop failover replication slot");
1531 0 : pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1532 : }
1533 4 : }
1534 :
1535 : /*
1536 : * Create a logical replication slot and returns a LSN.
1537 : *
1538 : * CreateReplicationSlot() is not used because it does not provide the one-row
1539 : * result set that contains the LSN.
1540 : */
1541 : static char *
1542 8 : create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
1543 : {
1544 8 : PQExpBuffer str = createPQExpBuffer();
1545 8 : PGresult *res = NULL;
1546 8 : const char *slot_name = dbinfo->replslotname;
1547 : char *slot_name_esc;
1548 8 : char *lsn = NULL;
1549 :
1550 : Assert(conn != NULL);
1551 :
1552 8 : if (dry_run)
1553 6 : pg_log_info("dry-run: would create the replication slot \"%s\" in database \"%s\" on publisher",
1554 : slot_name, dbinfo->dbname);
1555 : else
1556 2 : pg_log_info("creating the replication slot \"%s\" in database \"%s\" on publisher",
1557 : slot_name, dbinfo->dbname);
1558 :
1559 8 : slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1560 :
1561 8 : appendPQExpBuffer(str,
1562 : "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
1563 : slot_name_esc,
1564 8 : dbinfos.two_phase ? "true" : "false");
1565 :
1566 8 : PQfreemem(slot_name_esc);
1567 :
1568 8 : pg_log_debug("command is: %s", str->data);
1569 :
1570 8 : if (!dry_run)
1571 : {
1572 2 : res = PQexec(conn, str->data);
1573 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1574 : {
1575 0 : pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
1576 : slot_name, dbinfo->dbname,
1577 : PQresultErrorMessage(res));
1578 0 : PQclear(res);
1579 0 : destroyPQExpBuffer(str);
1580 0 : return NULL;
1581 : }
1582 :
1583 2 : lsn = pg_strdup(PQgetvalue(res, 0, 0));
1584 2 : PQclear(res);
1585 : }
1586 :
1587 : /* For cleanup purposes */
1588 8 : dbinfo->made_replslot = true;
1589 :
1590 8 : destroyPQExpBuffer(str);
1591 :
1592 8 : return lsn;
1593 : }
1594 :
1595 : static void
1596 8 : drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
1597 : const char *slot_name)
1598 : {
1599 8 : PQExpBuffer str = createPQExpBuffer();
1600 : char *slot_name_esc;
1601 : PGresult *res;
1602 :
1603 : Assert(conn != NULL);
1604 :
1605 8 : if (dry_run)
1606 6 : pg_log_info("dry-run: would drop the replication slot \"%s\" in database \"%s\"",
1607 : slot_name, dbinfo->dbname);
1608 : else
1609 2 : pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
1610 : slot_name, dbinfo->dbname);
1611 :
1612 8 : slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1613 :
1614 8 : appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
1615 :
1616 8 : PQfreemem(slot_name_esc);
1617 :
1618 8 : pg_log_debug("command is: %s", str->data);
1619 :
1620 8 : if (!dry_run)
1621 : {
1622 2 : res = PQexec(conn, str->data);
1623 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1624 : {
1625 0 : pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
1626 : slot_name, dbinfo->dbname, PQresultErrorMessage(res));
1627 0 : dbinfo->made_replslot = false; /* don't try again. */
1628 : }
1629 :
1630 2 : PQclear(res);
1631 : }
1632 :
1633 8 : destroyPQExpBuffer(str);
1634 8 : }
1635 :
/*
 * Check the status "rc" returned by system() for the pg_ctl command
 * "pg_ctl_cmd".  A zero status returns silently.  Otherwise the cause (exit
 * code, signal or exception, or an unrecognized status) and the failing
 * command are logged, and the program exits with status 1.
 */
static void
pg_ctl_status(const char *pg_ctl_cmd, int rc)
{
	if (rc == 0)
		return;

	if (WIFEXITED(rc))
		pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
	else if (WIFSIGNALED(rc))
	{
#if defined(WIN32)
		pg_log_error("pg_ctl was terminated by exception 0x%X",
					 WTERMSIG(rc));
		pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
#else
		pg_log_error("pg_ctl was terminated by signal %d: %s",
					 WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
#endif
	}
	else
		pg_log_error("pg_ctl exited with unrecognized status %d", rc);

	pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
	exit(1);
}
1668 :
1669 : static void
1670 12 : start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
1671 : bool restrict_logical_worker)
1672 : {
1673 12 : PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
1674 : int rc;
1675 :
1676 12 : appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
1677 12 : appendShellString(pg_ctl_cmd, subscriber_dir);
1678 12 : appendPQExpBufferStr(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
1679 :
1680 : /* Prevent unintended slot invalidation */
1681 12 : appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
1682 :
1683 12 : if (restricted_access)
1684 : {
1685 12 : appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
1686 : #if !defined(WIN32)
1687 :
1688 : /*
1689 : * An empty listen_addresses list means the server does not listen on
1690 : * any IP interfaces; only Unix-domain sockets can be used to connect
1691 : * to the server. Prevent external connections to minimize the chance
1692 : * of failure.
1693 : */
1694 12 : appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
1695 12 : if (opt->socket_dir)
1696 12 : appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
1697 12 : opt->socket_dir);
1698 12 : appendPQExpBufferChar(pg_ctl_cmd, '"');
1699 : #endif
1700 : }
1701 12 : if (opt->config_file != NULL)
1702 0 : appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
1703 0 : opt->config_file);
1704 :
1705 : /* Suppress to start logical replication if requested */
1706 12 : if (restrict_logical_worker)
1707 4 : appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
1708 :
1709 12 : if (opt->log_dir)
1710 2 : appendPQExpBuffer(pg_ctl_cmd, " -l \"%s/%s\"", logdir, SERVER_LOG_FILE_NAME);
1711 :
1712 12 : pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
1713 12 : rc = system(pg_ctl_cmd->data);
1714 12 : pg_ctl_status(pg_ctl_cmd->data, rc);
1715 12 : standby_running = true;
1716 12 : destroyPQExpBuffer(pg_ctl_cmd);
1717 12 : pg_log_info("server was started");
1718 12 : }
1719 :
1720 : static void
1721 12 : stop_standby_server(const char *datadir)
1722 : {
1723 : char *pg_ctl_cmd;
1724 : int rc;
1725 :
1726 12 : pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
1727 : datadir);
1728 12 : pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
1729 12 : rc = system(pg_ctl_cmd);
1730 12 : pg_ctl_status(pg_ctl_cmd, rc);
1731 12 : standby_running = false;
1732 12 : pg_log_info("server was stopped");
1733 12 : }
1734 :
1735 : /*
1736 : * Returns after the server finishes the recovery process.
1737 : *
1738 : * If recovery_timeout option is set, terminate abnormally without finishing
1739 : * the recovery process. By default, it waits forever.
1740 : *
1741 : * XXX Is the recovery process still in progress? When recovery process has a
1742 : * better progress reporting mechanism, it should be added here.
1743 : */
1744 : static void
1745 4 : wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
1746 : {
1747 : PGconn *conn;
1748 4 : bool ready = false;
1749 4 : int timer = 0;
1750 :
1751 4 : pg_log_info("waiting for the target server to reach the consistent state");
1752 :
1753 4 : conn = connect_database(conninfo, true);
1754 :
1755 : for (;;)
1756 : {
1757 : /* Did the recovery process finish? We're done if so. */
1758 4 : if (dry_run || !server_is_in_recovery(conn))
1759 : {
1760 4 : ready = true;
1761 4 : recovery_ended = true;
1762 4 : break;
1763 : }
1764 :
1765 : /* Bail out after recovery_timeout seconds if this option is set */
1766 0 : if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
1767 : {
1768 0 : stop_standby_server(subscriber_dir);
1769 0 : pg_log_error("recovery timed out");
1770 0 : disconnect_database(conn, true);
1771 : }
1772 :
1773 : /* Keep waiting */
1774 0 : pg_usleep(WAIT_INTERVAL * USECS_PER_SEC);
1775 0 : timer += WAIT_INTERVAL;
1776 : }
1777 :
1778 4 : disconnect_database(conn, false);
1779 :
1780 4 : if (!ready)
1781 0 : pg_fatal("server did not end recovery");
1782 :
1783 4 : pg_log_info("target server reached the consistent state");
1784 4 : pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
1785 4 : }
1786 :
1787 : /*
1788 : * Create a publication that includes all tables in the database.
1789 : */
1790 : static void
1791 7 : create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
1792 : {
1793 7 : PQExpBuffer str = createPQExpBuffer();
1794 : PGresult *res;
1795 : char *ipubname_esc;
1796 : char *spubname_esc;
1797 :
1798 : Assert(conn != NULL);
1799 :
1800 7 : ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1801 7 : spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1802 :
1803 : /* Check if the publication already exists */
1804 7 : appendPQExpBuffer(str,
1805 : "SELECT 1 FROM pg_catalog.pg_publication "
1806 : "WHERE pubname = %s",
1807 : spubname_esc);
1808 7 : res = PQexec(conn, str->data);
1809 7 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1810 : {
1811 0 : pg_log_error("could not obtain publication information: %s",
1812 : PQresultErrorMessage(res));
1813 0 : disconnect_database(conn, true);
1814 : }
1815 :
1816 7 : if (PQntuples(res) == 1)
1817 : {
1818 : /*
1819 : * Unfortunately, if it reaches this code path, it will always fail
1820 : * (unless you decide to change the existing publication name). That's
1821 : * bad but it is very unlikely that the user will choose a name with
1822 : * pg_createsubscriber_ prefix followed by the exact database oid and
1823 : * a random number.
1824 : */
1825 0 : pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
1826 0 : pg_log_error_hint("Consider renaming this publication before continuing.");
1827 0 : disconnect_database(conn, true);
1828 : }
1829 :
1830 7 : PQclear(res);
1831 7 : resetPQExpBuffer(str);
1832 :
1833 7 : if (dry_run)
1834 6 : pg_log_info("dry-run: would create publication \"%s\" in database \"%s\"",
1835 : dbinfo->pubname, dbinfo->dbname);
1836 : else
1837 1 : pg_log_info("creating publication \"%s\" in database \"%s\"",
1838 : dbinfo->pubname, dbinfo->dbname);
1839 :
1840 7 : appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
1841 : ipubname_esc);
1842 :
1843 7 : pg_log_debug("command is: %s", str->data);
1844 :
1845 7 : if (!dry_run)
1846 : {
1847 1 : res = PQexec(conn, str->data);
1848 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1849 : {
1850 0 : pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
1851 : dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
1852 0 : disconnect_database(conn, true);
1853 : }
1854 1 : PQclear(res);
1855 : }
1856 :
1857 : /* For cleanup purposes */
1858 7 : dbinfo->made_publication = true;
1859 :
1860 7 : PQfreemem(ipubname_esc);
1861 7 : PQfreemem(spubname_esc);
1862 7 : destroyPQExpBuffer(str);
1863 7 : }
1864 :
1865 : /*
1866 : * Drop the specified publication in the given database.
1867 : */
1868 : static void
1869 10 : drop_publication(PGconn *conn, const char *pubname, const char *dbname,
1870 : bool *made_publication)
1871 : {
1872 10 : PQExpBuffer str = createPQExpBuffer();
1873 : PGresult *res;
1874 : char *pubname_esc;
1875 :
1876 : Assert(conn != NULL);
1877 :
1878 10 : pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
1879 :
1880 10 : if (dry_run)
1881 6 : pg_log_info("dry-run: would drop publication \"%s\" in database \"%s\"",
1882 : pubname, dbname);
1883 : else
1884 4 : pg_log_info("dropping publication \"%s\" in database \"%s\"",
1885 : pubname, dbname);
1886 :
1887 10 : appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
1888 :
1889 10 : PQfreemem(pubname_esc);
1890 :
1891 10 : pg_log_debug("command is: %s", str->data);
1892 :
1893 10 : if (!dry_run)
1894 : {
1895 4 : res = PQexec(conn, str->data);
1896 4 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1897 : {
1898 0 : pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
1899 : pubname, dbname, PQresultErrorMessage(res));
1900 0 : *made_publication = false; /* don't try again. */
1901 :
1902 : /*
1903 : * Don't disconnect and exit here. This routine is used by primary
1904 : * (cleanup publication / replication slot due to an error) and
1905 : * subscriber (remove the replicated publications). In both cases,
1906 : * it can continue and provide instructions for the user to remove
1907 : * it later if cleanup fails.
1908 : */
1909 : }
1910 4 : PQclear(res);
1911 : }
1912 :
1913 10 : destroyPQExpBuffer(str);
1914 10 : }
1915 :
1916 : /*
1917 : * Retrieve and drop the publications.
1918 : *
1919 : * Publications copied during physical replication remain on the subscriber
1920 : * after promotion. If --clean=publications is specified, drop all existing
1921 : * publications in the subscriber database. Otherwise, only drop publications
1922 : * that were created by pg_createsubscriber during this operation.
1923 : */
1924 : static void
1925 8 : check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
1926 : {
1927 : PGresult *res;
1928 8 : bool drop_all_pubs = dbinfos.objecttypes_to_clean & OBJECTTYPE_PUBLICATIONS;
1929 :
1930 : Assert(conn != NULL);
1931 :
1932 8 : if (drop_all_pubs)
1933 : {
1934 2 : pg_log_info("dropping all existing publications in database \"%s\"",
1935 : dbinfo->dbname);
1936 :
1937 : /* Fetch all publication names */
1938 2 : res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
1939 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1940 : {
1941 0 : pg_log_error("could not obtain publication information: %s",
1942 : PQresultErrorMessage(res));
1943 0 : PQclear(res);
1944 0 : disconnect_database(conn, true);
1945 : }
1946 :
1947 : /* Drop each publication */
1948 6 : for (int i = 0; i < PQntuples(res); i++)
1949 4 : drop_publication(conn, PQgetvalue(res, i, 0), dbinfo->dbname,
1950 : &dbinfo->made_publication);
1951 :
1952 2 : PQclear(res);
1953 : }
1954 : else
1955 : {
1956 : /* Drop publication only if it was created by this tool */
1957 6 : if (dbinfo->made_publication)
1958 : {
1959 6 : drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
1960 : &dbinfo->made_publication);
1961 : }
1962 : else
1963 : {
1964 0 : if (dry_run)
1965 0 : pg_log_info("dry-run: would preserve existing publication \"%s\" in database \"%s\"",
1966 : dbinfo->pubname, dbinfo->dbname);
1967 : else
1968 0 : pg_log_info("preserve existing publication \"%s\" in database \"%s\"",
1969 : dbinfo->pubname, dbinfo->dbname);
1970 : }
1971 : }
1972 8 : }
1973 :
1974 : /*
1975 : * Create a subscription with some predefined options.
1976 : *
1977 : * A replication slot was already created in a previous step. Let's use it. It
1978 : * is not required to copy data. The subscription will be created but it will
1979 : * not be enabled now. That's because the replication progress must be set and
1980 : * the replication origin name (one of the function arguments) contains the
1981 : * subscription OID in its name. Once the subscription is created,
1982 : * set_replication_progress() can obtain the chosen origin name and set up its
1983 : * initial location.
1984 : */
1985 : static void
1986 8 : create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
1987 : {
1988 8 : PQExpBuffer str = createPQExpBuffer();
1989 : PGresult *res;
1990 : char *pubname_esc;
1991 : char *subname_esc;
1992 : char *pubconninfo_esc;
1993 : char *replslotname_esc;
1994 :
1995 : Assert(conn != NULL);
1996 :
1997 8 : pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1998 8 : subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1999 8 : pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
2000 8 : replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
2001 :
2002 8 : if (dry_run)
2003 6 : pg_log_info("dry-run: would create subscription \"%s\" in database \"%s\"",
2004 : dbinfo->subname, dbinfo->dbname);
2005 : else
2006 2 : pg_log_info("creating subscription \"%s\" in database \"%s\"",
2007 : dbinfo->subname, dbinfo->dbname);
2008 :
2009 8 : appendPQExpBuffer(str,
2010 : "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
2011 : "WITH (create_slot = false, enabled = false, "
2012 : "slot_name = %s, copy_data = false, two_phase = %s)",
2013 : subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
2014 8 : dbinfos.two_phase ? "true" : "false");
2015 :
2016 8 : PQfreemem(pubname_esc);
2017 8 : PQfreemem(subname_esc);
2018 8 : PQfreemem(pubconninfo_esc);
2019 8 : PQfreemem(replslotname_esc);
2020 :
2021 8 : pg_log_debug("command is: %s", str->data);
2022 :
2023 8 : if (!dry_run)
2024 : {
2025 2 : res = PQexec(conn, str->data);
2026 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
2027 : {
2028 0 : pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
2029 : dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
2030 0 : disconnect_database(conn, true);
2031 : }
2032 2 : PQclear(res);
2033 : }
2034 :
2035 8 : destroyPQExpBuffer(str);
2036 8 : }
2037 :
2038 : /*
2039 : * Sets the replication progress to the consistent LSN.
2040 : *
2041 : * The subscriber caught up to the consistent LSN provided by the last
2042 : * replication slot that was created. The goal is to set up the initial
2043 : * location for the logical replication that is the exact LSN that the
2044 : * subscriber was promoted. Once the subscription is enabled it will start
2045 : * streaming from that location onwards. In dry run mode, the subscription OID
2046 : * and LSN are set to invalid values for printing purposes.
2047 : */
2048 : static void
2049 8 : set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
2050 : {
2051 8 : PQExpBuffer str = createPQExpBuffer();
2052 : PGresult *res;
2053 : Oid suboid;
2054 : char *subname;
2055 : char *dbname;
2056 : char *originname;
2057 : char *lsnstr;
2058 :
2059 : Assert(conn != NULL);
2060 :
2061 8 : subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
2062 8 : dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
2063 :
2064 8 : appendPQExpBuffer(str,
2065 : "SELECT s.oid FROM pg_catalog.pg_subscription s "
2066 : "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
2067 : "WHERE s.subname = %s AND d.datname = %s",
2068 : subname, dbname);
2069 :
2070 8 : res = PQexec(conn, str->data);
2071 8 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
2072 : {
2073 0 : pg_log_error("could not obtain subscription OID: %s",
2074 : PQresultErrorMessage(res));
2075 0 : disconnect_database(conn, true);
2076 : }
2077 :
2078 8 : if (PQntuples(res) != 1 && !dry_run)
2079 : {
2080 0 : pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
2081 : PQntuples(res), 1);
2082 0 : disconnect_database(conn, true);
2083 : }
2084 :
2085 8 : if (dry_run)
2086 : {
2087 6 : suboid = InvalidOid;
2088 6 : lsnstr = psprintf("%X/%08X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
2089 : }
2090 : else
2091 : {
2092 2 : suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
2093 2 : lsnstr = psprintf("%s", lsn);
2094 : }
2095 :
2096 8 : PQclear(res);
2097 :
2098 : /*
2099 : * The origin name is defined as pg_%u. %u is the subscription OID. See
2100 : * ApplyWorkerMain().
2101 : */
2102 8 : originname = psprintf("pg_%u", suboid);
2103 :
2104 8 : if (dry_run)
2105 6 : pg_log_info("dry-run: would set the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
2106 : originname, lsnstr, dbinfo->dbname);
2107 : else
2108 2 : pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
2109 : originname, lsnstr, dbinfo->dbname);
2110 :
2111 8 : resetPQExpBuffer(str);
2112 8 : appendPQExpBuffer(str,
2113 : "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
2114 : originname, lsnstr);
2115 :
2116 8 : pg_log_debug("command is: %s", str->data);
2117 :
2118 8 : if (!dry_run)
2119 : {
2120 2 : res = PQexec(conn, str->data);
2121 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
2122 : {
2123 0 : pg_log_error("could not set replication progress for subscription \"%s\": %s",
2124 : dbinfo->subname, PQresultErrorMessage(res));
2125 0 : disconnect_database(conn, true);
2126 : }
2127 2 : PQclear(res);
2128 : }
2129 :
2130 8 : PQfreemem(subname);
2131 8 : PQfreemem(dbname);
2132 8 : pg_free(originname);
2133 8 : pg_free(lsnstr);
2134 8 : destroyPQExpBuffer(str);
2135 8 : }
2136 :
2137 : /*
2138 : * Enables the subscription.
2139 : *
2140 : * The subscription was created in a previous step but it was disabled. After
2141 : * adjusting the initial logical replication location, enable the subscription.
2142 : */
2143 : static void
2144 8 : enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
2145 : {
2146 8 : PQExpBuffer str = createPQExpBuffer();
2147 : PGresult *res;
2148 : char *subname;
2149 :
2150 : Assert(conn != NULL);
2151 :
2152 8 : subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
2153 :
2154 8 : if (dry_run)
2155 6 : pg_log_info("dry-run: would enable subscription \"%s\" in database \"%s\"",
2156 : dbinfo->subname, dbinfo->dbname);
2157 : else
2158 2 : pg_log_info("enabling subscription \"%s\" in database \"%s\"",
2159 : dbinfo->subname, dbinfo->dbname);
2160 :
2161 8 : appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
2162 :
2163 8 : pg_log_debug("command is: %s", str->data);
2164 :
2165 8 : if (!dry_run)
2166 : {
2167 2 : res = PQexec(conn, str->data);
2168 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
2169 : {
2170 0 : pg_log_error("could not enable subscription \"%s\": %s",
2171 : dbinfo->subname, PQresultErrorMessage(res));
2172 0 : disconnect_database(conn, true);
2173 : }
2174 :
2175 2 : PQclear(res);
2176 : }
2177 :
2178 8 : PQfreemem(subname);
2179 8 : destroyPQExpBuffer(str);
2180 8 : }
2181 :
2182 : /*
2183 : * Fetch a list of all connectable non-template databases from the source server
2184 : * and form a list such that they appear as if the user has specified multiple
2185 : * --database options, one for each source database.
2186 : */
2187 : static void
2188 1 : get_publisher_databases(struct CreateSubscriberOptions *opt,
2189 : bool dbnamespecified)
2190 : {
2191 : PGconn *conn;
2192 : PGresult *res;
2193 :
2194 : /* If a database name was specified, just connect to it. */
2195 1 : if (dbnamespecified)
2196 0 : conn = connect_database(opt->pub_conninfo_str, true);
2197 : else
2198 : {
2199 : /* Otherwise, try postgres first and then template1. */
2200 : char *conninfo;
2201 :
2202 1 : conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "postgres");
2203 1 : conn = connect_database(conninfo, false);
2204 1 : pg_free(conninfo);
2205 1 : if (!conn)
2206 : {
2207 0 : conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "template1");
2208 0 : conn = connect_database(conninfo, true);
2209 0 : pg_free(conninfo);
2210 : }
2211 : }
2212 :
2213 1 : res = PQexec(conn, "SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1");
2214 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
2215 : {
2216 0 : pg_log_error("could not obtain a list of databases: %s", PQresultErrorMessage(res));
2217 0 : PQclear(res);
2218 0 : disconnect_database(conn, true);
2219 : }
2220 :
2221 4 : for (int i = 0; i < PQntuples(res); i++)
2222 : {
2223 3 : const char *dbname = PQgetvalue(res, i, 0);
2224 :
2225 3 : simple_string_list_append(&opt->database_names, dbname);
2226 :
2227 : /* Increment num_dbs to reflect multiple --database options */
2228 3 : num_dbs++;
2229 : }
2230 :
2231 1 : PQclear(res);
2232 1 : disconnect_database(conn, false);
2233 1 : }
2234 :
2235 : int
2236 23 : main(int argc, char **argv)
2237 : {
2238 : static struct option long_options[] =
2239 : {
2240 : {"all", no_argument, NULL, 'a'},
2241 : {"database", required_argument, NULL, 'd'},
2242 : {"pgdata", required_argument, NULL, 'D'},
2243 : {"logdir", required_argument, NULL, 'l'},
2244 : {"dry-run", no_argument, NULL, 'n'},
2245 : {"subscriber-port", required_argument, NULL, 'p'},
2246 : {"publisher-server", required_argument, NULL, 'P'},
2247 : {"socketdir", required_argument, NULL, 's'},
2248 : {"recovery-timeout", required_argument, NULL, 't'},
2249 : {"enable-two-phase", no_argument, NULL, 'T'},
2250 : {"subscriber-username", required_argument, NULL, 'U'},
2251 : {"verbose", no_argument, NULL, 'v'},
2252 : {"version", no_argument, NULL, 'V'},
2253 : {"help", no_argument, NULL, '?'},
2254 : {"config-file", required_argument, NULL, 1},
2255 : {"publication", required_argument, NULL, 2},
2256 : {"replication-slot", required_argument, NULL, 3},
2257 : {"subscription", required_argument, NULL, 4},
2258 : {"clean", required_argument, NULL, 5},
2259 : {NULL, 0, NULL, 0}
2260 : };
2261 :
2262 23 : struct CreateSubscriberOptions opt = {0};
2263 :
2264 : int c;
2265 : int option_index;
2266 :
2267 : char *pub_base_conninfo;
2268 : char *sub_base_conninfo;
2269 23 : char *dbname_conninfo = NULL;
2270 :
2271 : uint64 pub_sysid;
2272 : uint64 sub_sysid;
2273 : struct stat statbuf;
2274 :
2275 : char *consistent_lsn;
2276 :
2277 : char pidfile[MAXPGPATH];
2278 :
2279 23 : pg_logging_init(argv[0]);
2280 23 : pg_logging_set_level(PG_LOG_WARNING);
2281 23 : progname = get_progname(argv[0]);
2282 23 : set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
2283 :
2284 23 : if (argc > 1)
2285 : {
2286 22 : if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2287 : {
2288 1 : usage();
2289 1 : exit(0);
2290 : }
2291 21 : else if (strcmp(argv[1], "-V") == 0
2292 21 : || strcmp(argv[1], "--version") == 0)
2293 : {
2294 1 : puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
2295 1 : exit(0);
2296 : }
2297 : }
2298 :
2299 : /* Default settings */
2300 21 : subscriber_dir = NULL;
2301 21 : opt.config_file = NULL;
2302 21 : opt.log_dir = NULL;
2303 21 : opt.pub_conninfo_str = NULL;
2304 21 : opt.socket_dir = NULL;
2305 21 : opt.sub_port = DEFAULT_SUB_PORT;
2306 21 : opt.sub_username = NULL;
2307 21 : opt.two_phase = false;
2308 21 : opt.database_names = (SimpleStringList)
2309 : {
2310 : 0
2311 : };
2312 21 : opt.recovery_timeout = 0;
2313 21 : opt.all_dbs = false;
2314 :
2315 : /*
2316 : * Don't allow it to be run as root. It uses pg_ctl which does not allow
2317 : * it either.
2318 : */
2319 : #ifndef WIN32
2320 21 : if (geteuid() == 0)
2321 : {
2322 0 : pg_log_error("cannot be executed by \"root\"");
2323 0 : pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
2324 : progname);
2325 0 : exit(1);
2326 : }
2327 : #endif
2328 :
2329 21 : get_restricted_token();
2330 :
2331 163 : while ((c = getopt_long(argc, argv, "ad:D:l:np:P:s:t:TU:v",
2332 163 : long_options, &option_index)) != -1)
2333 : {
2334 145 : switch (c)
2335 : {
2336 3 : case 'a':
2337 3 : opt.all_dbs = true;
2338 3 : break;
2339 25 : case 'd':
2340 25 : if (!simple_string_list_member(&opt.database_names, optarg))
2341 : {
2342 24 : simple_string_list_append(&opt.database_names, optarg);
2343 24 : num_dbs++;
2344 : }
2345 : else
2346 1 : pg_fatal("database \"%s\" specified more than once for -d/--database", optarg);
2347 24 : break;
2348 19 : case 'D':
2349 19 : subscriber_dir = pg_strdup(optarg);
2350 19 : canonicalize_path(subscriber_dir);
2351 19 : break;
2352 1 : case 'l':
2353 1 : opt.log_dir = pg_strdup(optarg);
2354 1 : canonicalize_path(opt.log_dir);
2355 1 : break;
2356 9 : case 'n':
2357 9 : dry_run = true;
2358 9 : break;
2359 12 : case 'p':
2360 12 : opt.sub_port = pg_strdup(optarg);
2361 12 : break;
2362 18 : case 'P':
2363 18 : opt.pub_conninfo_str = pg_strdup(optarg);
2364 18 : break;
2365 12 : case 's':
2366 12 : opt.socket_dir = pg_strdup(optarg);
2367 12 : canonicalize_path(opt.socket_dir);
2368 12 : break;
2369 3 : case 't':
2370 3 : opt.recovery_timeout = atoi(optarg);
2371 3 : break;
2372 1 : case 'T':
2373 1 : opt.two_phase = true;
2374 1 : break;
2375 0 : case 'U':
2376 0 : opt.sub_username = pg_strdup(optarg);
2377 0 : break;
2378 19 : case 'v':
2379 19 : pg_logging_increase_verbosity();
2380 19 : break;
2381 0 : case 1:
2382 0 : opt.config_file = pg_strdup(optarg);
2383 0 : break;
2384 12 : case 2:
2385 12 : if (!simple_string_list_member(&opt.pub_names, optarg))
2386 : {
2387 11 : simple_string_list_append(&opt.pub_names, optarg);
2388 11 : num_pubs++;
2389 : }
2390 : else
2391 1 : pg_fatal("publication \"%s\" specified more than once for --publication", optarg);
2392 11 : break;
2393 4 : case 3:
2394 4 : if (!simple_string_list_member(&opt.replslot_names, optarg))
2395 : {
2396 4 : simple_string_list_append(&opt.replslot_names, optarg);
2397 4 : num_replslots++;
2398 : }
2399 : else
2400 0 : pg_fatal("replication slot \"%s\" specified more than once for --replication-slot", optarg);
2401 4 : break;
2402 5 : case 4:
2403 5 : if (!simple_string_list_member(&opt.sub_names, optarg))
2404 : {
2405 5 : simple_string_list_append(&opt.sub_names, optarg);
2406 5 : num_subs++;
2407 : }
2408 : else
2409 0 : pg_fatal("subscription \"%s\" specified more than once for --subscription", optarg);
2410 5 : break;
2411 1 : case 5:
2412 1 : if (!simple_string_list_member(&opt.objecttypes_to_clean, optarg))
2413 1 : simple_string_list_append(&opt.objecttypes_to_clean, optarg);
2414 : else
2415 0 : pg_fatal("object type \"%s\" specified more than once for --clean", optarg);
2416 1 : break;
2417 1 : default:
2418 : /* getopt_long already emitted a complaint */
2419 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2420 1 : exit(1);
2421 : }
2422 : }
2423 :
2424 : /* Validate that --all is not used with incompatible options */
2425 18 : if (opt.all_dbs)
2426 : {
2427 3 : char *bad_switch = NULL;
2428 :
2429 3 : if (num_dbs > 0)
2430 1 : bad_switch = "--database";
2431 2 : else if (num_pubs > 0)
2432 1 : bad_switch = "--publication";
2433 1 : else if (num_replslots > 0)
2434 0 : bad_switch = "--replication-slot";
2435 1 : else if (num_subs > 0)
2436 0 : bad_switch = "--subscription";
2437 :
2438 3 : if (bad_switch)
2439 : {
2440 2 : pg_log_error("options %s and %s cannot be used together",
2441 : bad_switch, "-a/--all");
2442 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2443 2 : exit(1);
2444 : }
2445 : }
2446 :
2447 : /* Any non-option arguments? */
2448 16 : if (optind < argc)
2449 : {
2450 0 : pg_log_error("too many command-line arguments (first is \"%s\")",
2451 : argv[optind]);
2452 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2453 0 : exit(1);
2454 : }
2455 :
2456 : /* Required arguments */
2457 16 : if (subscriber_dir == NULL)
2458 : {
2459 1 : pg_log_error("no subscriber data directory specified");
2460 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2461 1 : exit(1);
2462 : }
2463 :
2464 : /* If socket directory is not provided, use the current directory */
2465 15 : if (opt.socket_dir == NULL)
2466 : {
2467 : char cwd[MAXPGPATH];
2468 :
2469 5 : if (!getcwd(cwd, MAXPGPATH))
2470 0 : pg_fatal("could not determine current directory");
2471 5 : opt.socket_dir = pg_strdup(cwd);
2472 5 : canonicalize_path(opt.socket_dir);
2473 : }
2474 :
2475 : /*
2476 : * Parse connection string. Build a base connection string that might be
2477 : * reused by multiple databases.
2478 : */
2479 15 : if (opt.pub_conninfo_str == NULL)
2480 : {
2481 : /*
2482 : * TODO use primary_conninfo (if available) from subscriber and
2483 : * extract publisher connection string. Assume that there are
2484 : * identical entries for physical and logical replication. If there is
2485 : * not, we would fail anyway.
2486 : */
2487 1 : pg_log_error("no publisher connection string specified");
2488 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2489 1 : exit(1);
2490 : }
2491 :
2492 14 : if (opt.log_dir != NULL)
2493 : {
2494 : char *internal_log_file;
2495 : FILE *internal_log_file_fp;
2496 :
2497 1 : umask(PG_MODE_MASK_OWNER);
2498 :
2499 : /*
2500 : * Set mask based on PGDATA permissions, needed for the creation of
2501 : * the output directories with correct permissions, similar with
2502 : * pg_ctl and pg_upgrade.
2503 : *
2504 : * Don't error here if the data directory cannot be stat'd. Upcoming
2505 : * checks for the data directory would raise the fatal error later.
2506 : */
2507 1 : if (GetDataDirectoryCreatePerm(subscriber_dir))
2508 1 : umask(pg_mode_mask);
2509 :
2510 1 : make_output_dirs(opt.log_dir);
2511 1 : internal_log_file = psprintf("%s/%s", logdir, INTERNAL_LOG_FILE_NAME);
2512 :
2513 1 : internal_log_file_fp = fopen(internal_log_file, "a");
2514 1 : if (!internal_log_file_fp)
2515 0 : pg_fatal("could not open log file \"%s\": %m", internal_log_file);
2516 :
2517 1 : pg_free(internal_log_file);
2518 :
2519 1 : pg_logging_set_logfile(internal_log_file_fp);
2520 : }
2521 :
2522 14 : if (dry_run)
2523 8 : pg_log_info("Executing in dry-run mode.\n"
2524 : "The target directory will not be modified.");
2525 :
2526 14 : pg_log_info("validating publisher connection string");
2527 14 : pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
2528 : &dbname_conninfo);
2529 14 : if (pub_base_conninfo == NULL)
2530 0 : exit(1);
2531 :
2532 14 : pg_log_info("validating subscriber connection string");
2533 14 : sub_base_conninfo = get_sub_conninfo(&opt);
2534 :
2535 : /*
2536 : * Fetch all databases from the source (publisher) and treat them as if
2537 : * the user specified has multiple --database options, one for each source
2538 : * database.
2539 : */
2540 14 : if (opt.all_dbs)
2541 : {
2542 1 : bool dbnamespecified = (dbname_conninfo != NULL);
2543 :
2544 1 : get_publisher_databases(&opt, dbnamespecified);
2545 : }
2546 :
2547 14 : if (opt.database_names.head == NULL)
2548 : {
2549 2 : pg_log_info("no database was specified");
2550 :
2551 : /*
2552 : * Try to obtain the dbname from the publisher conninfo. If dbname
2553 : * parameter is not available, error out.
2554 : */
2555 2 : if (dbname_conninfo)
2556 : {
2557 1 : simple_string_list_append(&opt.database_names, dbname_conninfo);
2558 1 : num_dbs++;
2559 :
2560 1 : pg_log_info("database name \"%s\" was extracted from the publisher connection string",
2561 : dbname_conninfo);
2562 : }
2563 : else
2564 : {
2565 1 : pg_log_error("no database name specified");
2566 1 : pg_log_error_hint("Try \"%s --help\" for more information.",
2567 : progname);
2568 1 : exit(1);
2569 : }
2570 : }
2571 :
2572 : /* Number of object names must match number of databases */
2573 13 : if (num_pubs > 0 && num_pubs != num_dbs)
2574 : {
2575 1 : pg_log_error("wrong number of publication names specified");
2576 1 : pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
2577 : num_pubs, num_dbs);
2578 1 : exit(1);
2579 : }
2580 12 : if (num_subs > 0 && num_subs != num_dbs)
2581 : {
2582 1 : pg_log_error("wrong number of subscription names specified");
2583 1 : pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
2584 : num_subs, num_dbs);
2585 1 : exit(1);
2586 : }
2587 11 : if (num_replslots > 0 && num_replslots != num_dbs)
2588 : {
2589 1 : pg_log_error("wrong number of replication slot names specified");
2590 1 : pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
2591 : num_replslots, num_dbs);
2592 1 : exit(1);
2593 : }
2594 :
2595 : /* Verify the object types specified for removal from the subscriber */
2596 11 : for (SimpleStringListCell *cell = opt.objecttypes_to_clean.head; cell; cell = cell->next)
2597 : {
2598 1 : if (pg_strcasecmp(cell->val, "publications") == 0)
2599 1 : dbinfos.objecttypes_to_clean |= OBJECTTYPE_PUBLICATIONS;
2600 : else
2601 : {
2602 0 : pg_log_error("invalid object type \"%s\" specified for %s",
2603 : cell->val, "--clean");
2604 0 : pg_log_error_hint("The valid value is: \"%s\"", "publications");
2605 0 : exit(1);
2606 : }
2607 : }
2608 :
2609 : /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
2610 10 : pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
2611 10 : pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
2612 :
2613 : /* Rudimentary check for a data directory */
2614 10 : check_data_directory(subscriber_dir);
2615 :
2616 10 : dbinfos.two_phase = opt.two_phase;
2617 :
2618 : /*
2619 : * Store database information for publisher and subscriber. It should be
2620 : * called before atexit() because its return is used in the
2621 : * cleanup_objects_atexit().
2622 : */
2623 10 : dbinfos.dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
2624 :
2625 : /* Register a function to clean up objects in case of failure */
2626 10 : atexit(cleanup_objects_atexit);
2627 :
2628 : /*
2629 : * Check if the subscriber data directory has the same system identifier
2630 : * than the publisher data directory.
2631 : */
2632 10 : pub_sysid = get_primary_sysid(dbinfos.dbinfo[0].pubconninfo);
2633 10 : sub_sysid = get_standby_sysid(subscriber_dir);
2634 10 : if (pub_sysid != sub_sysid)
2635 1 : pg_fatal("subscriber data directory is not a copy of the source database cluster");
2636 :
2637 : /* Subscriber PID file */
2638 9 : snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
2639 :
2640 : /*
2641 : * The standby server must not be running. If the server is started under
2642 : * service manager and pg_createsubscriber stops it, the service manager
2643 : * might react to this action and start the server again. Therefore,
2644 : * refuse to proceed if the server is running to avoid possible failures.
2645 : */
2646 9 : if (stat(pidfile, &statbuf) == 0)
2647 : {
2648 1 : pg_log_error("standby server is running");
2649 1 : pg_log_error_hint("Stop the standby server and try again.");
2650 1 : exit(1);
2651 : }
2652 :
2653 : /*
2654 : * Start a short-lived standby server with temporary parameters (provided
2655 : * by command-line options). The goal is to avoid connections during the
2656 : * transformation steps.
2657 : */
2658 8 : pg_log_info("starting the standby server with command-line options");
2659 8 : start_standby_server(&opt, true, false);
2660 :
2661 : /* Check if the standby server is ready for logical replication */
2662 8 : check_subscriber(dbinfos.dbinfo);
2663 :
2664 : /* Check if the primary server is ready for logical replication */
2665 6 : check_publisher(dbinfos.dbinfo);
2666 :
2667 : /*
2668 : * Stop the target server. The recovery process requires that the server
2669 : * reaches a consistent state before targeting the recovery stop point.
2670 : * Make sure a consistent state is reached (stop the target server
2671 : * guarantees it) *before* creating the replication slots in
2672 : * setup_publisher().
2673 : */
2674 4 : pg_log_info("stopping the subscriber");
2675 4 : stop_standby_server(subscriber_dir);
2676 :
2677 : /* Create the required objects for each database on publisher */
2678 4 : consistent_lsn = setup_publisher(dbinfos.dbinfo);
2679 :
2680 : /* Write the required recovery parameters */
2681 4 : setup_recovery(dbinfos.dbinfo, subscriber_dir, consistent_lsn);
2682 :
2683 : /*
2684 : * Start subscriber so the recovery parameters will take effect. Wait
2685 : * until accepting connections. We don't want to start logical replication
2686 : * during setup.
2687 : */
2688 4 : pg_log_info("starting the subscriber");
2689 4 : start_standby_server(&opt, true, true);
2690 :
2691 : /* Waiting the subscriber to be promoted */
2692 4 : wait_for_end_recovery(dbinfos.dbinfo[0].subconninfo, &opt);
2693 :
2694 : /*
2695 : * Create the subscription for each database on subscriber. It does not
2696 : * enable it immediately because it needs to adjust the replication start
2697 : * point to the LSN reported by setup_publisher(). It also cleans up
2698 : * publications created by this tool and replication to the standby.
2699 : */
2700 4 : setup_subscriber(dbinfos.dbinfo, consistent_lsn);
2701 :
2702 : /* Remove primary_slot_name if it exists on primary */
2703 4 : drop_primary_replication_slot(dbinfos.dbinfo, primary_slot_name);
2704 :
2705 : /* Remove failover replication slots if they exist on subscriber */
2706 4 : drop_failover_replication_slots(dbinfos.dbinfo);
2707 :
2708 : /* Stop the subscriber */
2709 4 : pg_log_info("stopping the subscriber");
2710 4 : stop_standby_server(subscriber_dir);
2711 :
2712 : /* Change system identifier from subscriber */
2713 4 : modify_subscriber_sysid(&opt);
2714 :
2715 4 : success = true;
2716 :
2717 4 : pg_log_info("Done!");
2718 :
2719 4 : return 0;
2720 : }
|