Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_createsubscriber.c
4 : * Create a new logical replica from a standby server
5 : *
6 : * Copyright (c) 2024-2026, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/bin/pg_basebackup/pg_createsubscriber.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 :
14 : #include "postgres_fe.h"
15 :
16 : #include <sys/stat.h>
17 : #include <sys/time.h>
18 : #include <sys/wait.h>
19 : #include <time.h>
20 :
21 : #include "common/connect.h"
22 : #include "common/controldata_utils.h"
23 : #include "common/file_utils.h"
24 : #include "common/logging.h"
25 : #include "common/pg_prng.h"
26 : #include "common/restricted_token.h"
27 : #include "datatype/timestamp.h"
28 : #include "fe_utils/recovery_gen.h"
29 : #include "fe_utils/simple_list.h"
30 : #include "fe_utils/string_utils.h"
31 : #include "fe_utils/version.h"
32 : #include "getopt_long.h"
33 :
/* Default port for the subscriber server; shown as the -p default in usage(). */
#define DEFAULT_SUB_PORT "50432"
/*
 * Object-type flag for --clean ("publications").  NOTE(review): presumably
 * OR-ed into LogicalRepInfos.objecttypes_to_clean; confirm at the use site.
 */
#define OBJECTTYPE_PUBLICATIONS 0x0001

/*
 * Configuration files for recovery parameters.
 *
 * The recovery parameters are set in INCLUDED_CONF_FILE, itself loaded by
 * the server through an include_if_exists in postgresql.auto.conf.
 *
 * INCLUDED_CONF_FILE is renamed to INCLUDED_CONF_FILE_DISABLED when exiting,
 * so that the recovery parameters set by this tool never take effect on node
 * restart. The contents of INCLUDED_CONF_FILE_DISABLED can be useful for
 * debugging.
 */
#define PG_AUTOCONF_FILENAME "postgresql.auto.conf"
#define INCLUDED_CONF_FILE "pg_createsubscriber.conf"
#define INCLUDED_CONF_FILE_DISABLED INCLUDED_CONF_FILE ".disabled"
51 :
/*
 * Command-line options.
 *
 * The SimpleStringList members collect repeated options; the Nth entry of
 * pub_names / sub_names / replslot_names is paired with the Nth entry of
 * database_names by store_pub_sub_info().
 */
struct CreateSubscriberOptions
{
	char	   *config_file;	/* configuration file */
	char	   *pub_conninfo_str;	/* publisher connection string */
	char	   *socket_dir;		/* directory for Unix-domain socket, if any */
	char	   *sub_port;		/* subscriber port number */
	const char *sub_username;	/* subscriber username */
	bool		two_phase;		/* enable-two-phase option */
	SimpleStringList database_names;	/* list of database names */
	SimpleStringList pub_names; /* list of publication names */
	SimpleStringList sub_names; /* list of subscription names */
	SimpleStringList replslot_names;	/* list of replication slot names */
	int			recovery_timeout;	/* stop recovery after this time (seconds,
									 * per --recovery-timeout=SECS) */
	bool		all_dbs;		/* all option */
	SimpleStringList objecttypes_to_clean;	/* list of object types to cleanup */
};
69 :
/*
 * Per-database publication/subscription info.
 *
 * Object names may be NULL after store_pub_sub_info() when the user did not
 * supply them; they are assigned generated names later.  The made_* flags
 * record what this run created on the primary so cleanup_objects_atexit()
 * can drop it again on failure.
 */
struct LogicalRepInfo
{
	char	   *dbname;			/* database name */
	char	   *pubconninfo;	/* publisher connection string */
	char	   *subconninfo;	/* subscriber connection string */
	char	   *pubname;		/* publication name */
	char	   *subname;		/* subscription name */
	char	   *replslotname;	/* replication slot name */

	bool		made_replslot;	/* replication slot was created */
	bool		made_publication;	/* publication was created */
};
83 :
/*
 * Information shared across all the databases (or publications and
 * subscriptions).
 *
 * dbinfo is indexed 0 .. num_dbs-1 (cleanup_objects_atexit() walks it that
 * way); NOTE(review): presumably it holds the array built by
 * store_pub_sub_info() — confirm at the assignment site.
 */
struct LogicalRepInfos
{
	struct LogicalRepInfo *dbinfo;
	bool		two_phase;		/* enable-two-phase option */
	bits32		objecttypes_to_clean;	/* flags indicating which object types
										 * to clean up on subscriber */
};
95 :
/*
 * Forward declarations of the file-local routines.  report_createsub_fatal()
 * is declared pg_noreturn: it logs an error and exits.
 */
static void cleanup_objects_atexit(void);
static void usage(void);
static char *get_base_conninfo(const char *conninfo, char **dbname);
static char *get_sub_conninfo(const struct CreateSubscriberOptions *opt);
static char *get_exec_path(const char *argv0, const char *progname);
static void check_data_directory(const char *datadir);
static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
static struct LogicalRepInfo *store_pub_sub_info(const struct CreateSubscriberOptions *opt,
												 const char *pub_base_conninfo,
												 const char *sub_base_conninfo);
static PGconn *connect_database(const char *conninfo, bool exit_on_error);
static void disconnect_database(PGconn *conn, bool exit_on_error);
static uint64 get_primary_sysid(const char *conninfo);
static uint64 get_standby_sysid(const char *datadir);
static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
static bool server_is_in_recovery(PGconn *conn);
static char *generate_object_name(PGconn *conn);
static void check_publisher(const struct LogicalRepInfo *dbinfo);
static char *setup_publisher(struct LogicalRepInfo *dbinfo);
static void check_subscriber(const struct LogicalRepInfo *dbinfo);
static void setup_subscriber(struct LogicalRepInfo *dbinfo,
							 const char *consistent_lsn);
static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
						   const char *lsn);
static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
										  const char *slotname);
static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo);
static char *create_logical_replication_slot(PGconn *conn,
											 struct LogicalRepInfo *dbinfo);
static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
								  const char *slot_name);
static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
static void start_standby_server(const struct CreateSubscriberOptions *opt,
								 bool restricted_access,
								 bool restrict_logical_worker);
static void stop_standby_server(const char *datadir);
static void wait_for_end_recovery(const char *conninfo,
								  const struct CreateSubscriberOptions *opt);
static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
static bool find_publication(PGconn *conn, const char *pubname, const char *dbname);
static void drop_publication(PGconn *conn, const char *pubname,
							 const char *dbname, bool *made_publication);
static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
									 const char *lsn);
static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
static void check_and_drop_existing_subscriptions(PGconn *conn,
												  const struct LogicalRepInfo *dbinfo);
static void drop_existing_subscription(PGconn *conn, const char *subname,
									   const char *dbname);
static void get_publisher_databases(struct CreateSubscriberOptions *opt,
									bool dbnamespecified);
static void report_createsub_log(enum pg_log_level, enum pg_log_part,
								 const char *pg_restrict fmt,...)
			pg_attribute_printf(3, 4);
pg_noreturn static void report_createsub_fatal(const char *pg_restrict fmt,...)
			pg_attribute_printf(1, 2);
154 :
#define WAIT_INTERVAL 1			/* 1 second */

/* Program name; used in help output and as fallback_application_name. */
static const char *progname;

static char *primary_slot_name = NULL;
/* When true, actions such as the system identifier change are only reported. */
static bool dry_run = false;

/* Set once the run completed; cleanup_objects_atexit() then skips cleanup. */
static bool success = false;

/* Per-database info shared across the run (see struct LogicalRepInfos). */
static struct LogicalRepInfos dbinfos;
static int	num_dbs = 0;		/* number of specified databases */
static int	num_pubs = 0;		/* number of specified publications */
static int	num_subs = 0;		/* number of specified subscriptions */
static int	num_replslots = 0;	/* number of specified replication slots */

/* Seeded in setup_publisher(); drives generate_object_name(). */
static pg_prng_state prng_state;

/* Absolute paths of helper binaries (pg_resetwal is run by modify_subscriber_sysid()). */
static char *pg_ctl_path = NULL;
static char *pg_resetwal_path = NULL;

/* standby / subscriber data directory */
static char *subscriber_dir = NULL;

/* State consulted by cleanup_objects_atexit() to decide what to undo. */
static bool recovery_ended = false; /* target already promoted */
static bool standby_running = false;	/* standby must be stopped */
static bool recovery_params_set = false;	/* INCLUDED_CONF_FILE must be
											 * renamed */
181 :
182 : /*
183 : * Report a message with a given log level
184 : */
185 : static void
186 482 : report_createsub_log(enum pg_log_level level, enum pg_log_part part,
187 : const char *pg_restrict fmt,...)
188 : {
189 : va_list args;
190 :
191 482 : va_start(args, fmt);
192 :
193 482 : pg_log_generic_v(level, part, fmt, args);
194 :
195 482 : va_end(args);
196 482 : }
197 :
198 : /*
199 : * Report a fatal error and exit
200 : */
201 : static void
202 3 : report_createsub_fatal(const char *pg_restrict fmt,...)
203 : {
204 : va_list args;
205 :
206 3 : va_start(args, fmt);
207 :
208 3 : pg_log_generic_v(PG_LOG_ERROR, PG_LOG_PRIMARY, fmt, args);
209 :
210 3 : va_end(args);
211 :
212 3 : exit(1);
213 : }
214 :
215 : /*
216 : * Clean up objects created by pg_createsubscriber.
217 : *
218 : * Publications and replication slots are created on the primary. Depending
219 : * on the step where it failed, already-created objects should be removed if
220 : * possible (sometimes this won't work due to a connection issue).
221 : * There is no cleanup on the target server *after* its promotion, because any
222 : * failure at this point means recreating the physical replica and starting
223 : * again.
224 : *
225 : * The recovery configuration is always removed, by renaming the included
226 : * configuration file out of the way.
227 : */
228 : static void
229 10 : cleanup_objects_atexit(void)
230 : {
231 : /* Rename the included configuration file, if necessary. */
232 10 : if (recovery_params_set)
233 : {
234 : char conf_filename[MAXPGPATH];
235 : char conf_filename_disabled[MAXPGPATH];
236 :
237 1 : snprintf(conf_filename, MAXPGPATH, "%s/%s", subscriber_dir,
238 : INCLUDED_CONF_FILE);
239 1 : snprintf(conf_filename_disabled, MAXPGPATH, "%s/%s", subscriber_dir,
240 : INCLUDED_CONF_FILE_DISABLED);
241 :
242 1 : if (durable_rename(conf_filename, conf_filename_disabled) != 0)
243 : {
244 : /* durable_rename() has already logged something. */
245 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
246 : "A manual removal of the recovery parameters may be required.");
247 : }
248 : }
249 :
250 10 : if (success)
251 4 : return;
252 :
253 : /*
254 : * If the server is promoted, there is no way to use the current setup
255 : * again. Warn the user that a new replication setup should be done before
256 : * trying again.
257 : */
258 6 : if (recovery_ended)
259 : {
260 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
261 : "failed after the end of recovery");
262 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
263 : "The target server cannot be used as a physical replica anymore. "
264 : "You must recreate the physical replica before continuing.");
265 : }
266 :
267 18 : for (int i = 0; i < num_dbs; i++)
268 : {
269 12 : struct LogicalRepInfo *dbinfo = &dbinfos.dbinfo[i];
270 :
271 12 : if (dbinfo->made_publication || dbinfo->made_replslot)
272 : {
273 : PGconn *conn;
274 :
275 0 : conn = connect_database(dbinfo->pubconninfo, false);
276 0 : if (conn != NULL)
277 : {
278 0 : if (dbinfo->made_publication)
279 0 : drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
280 : &dbinfo->made_publication);
281 0 : if (dbinfo->made_replslot)
282 0 : drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
283 0 : disconnect_database(conn, false);
284 : }
285 : else
286 : {
287 : /*
288 : * If a connection could not be established, inform the user
289 : * that some objects were left on primary and should be
290 : * removed before trying again.
291 : */
292 0 : if (dbinfo->made_publication)
293 : {
294 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
295 : "publication \"%s\" created in database \"%s\" on primary was left behind",
296 : dbinfo->pubname,
297 : dbinfo->dbname);
298 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
299 : "Drop this publication before trying again.");
300 : }
301 0 : if (dbinfo->made_replslot)
302 : {
303 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
304 : "replication slot \"%s\" created in database \"%s\" on primary was left behind",
305 : dbinfo->replslotname,
306 : dbinfo->dbname);
307 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
308 : "Drop this replication slot soon to avoid retention of WAL files.");
309 : }
310 : }
311 : }
312 : }
313 :
314 6 : if (standby_running)
315 4 : stop_standby_server(subscriber_dir);
316 : }
317 :
/*
 * Print the command-line help text to stdout.
 *
 * NOTE(review): the option/description alignment spacing inside these help
 * strings looks collapsed in this copy; confirm the exact layout against
 * upstream before relying on it.
 */
static void
usage(void)
{
	printf(_("%s creates a new logical replica from a standby server.\n\n"),
		   progname);
	printf(_("Usage:\n"));
	printf(_(" %s [OPTION]...\n"), progname);
	printf(_("\nOptions:\n"));
	printf(_(" -a, --all create subscriptions for all databases except template\n"
			 " databases and databases that don't allow connections\n"));
	printf(_(" -d, --database=DBNAME database in which to create a subscription\n"));
	printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
	printf(_(" -n, --dry-run dry run, just show what would be done\n"));
	printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
	printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
	printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
	printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
	printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));
	printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
	printf(_(" -v, --verbose output verbose messages\n"));
	/* The accepted --clean object type is passed as an argument, not translated. */
	printf(_(" --clean=OBJECTTYPE drop all objects of the specified type from specified\n"
			 " databases on the subscriber; accepts: \"%s\"\n"), "publications");
	printf(_(" --config-file=FILENAME use specified main server configuration\n"
			 " file when running target cluster\n"));
	printf(_(" --publication=NAME publication name\n"));
	printf(_(" --replication-slot=NAME replication slot name\n"));
	printf(_(" --subscription=NAME subscription name\n"));
	printf(_(" -V, --version output version information, then exit\n"));
	printf(_(" -?, --help show this help, then exit\n"));
	printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
	printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
}
350 :
351 : /*
352 : * Subroutine to append "keyword=value" to a connection string,
353 : * with proper quoting of the value. (We assume keywords don't need that.)
354 : */
355 : static void
356 107 : appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
357 : {
358 107 : if (buf->len > 0)
359 79 : appendPQExpBufferChar(buf, ' ');
360 107 : appendPQExpBufferStr(buf, keyword);
361 107 : appendPQExpBufferChar(buf, '=');
362 107 : appendConnStrVal(buf, val);
363 107 : }
364 :
365 : /*
366 : * Validate a connection string. Returns a base connection string that is a
367 : * connection string without a database name.
368 : *
369 : * Since we might process multiple databases, each database name will be
370 : * appended to this base connection string to provide a final connection
371 : * string. If the second argument (dbname) is not null, returns dbname if the
372 : * provided connection string contains it.
373 : *
374 : * It is the caller's responsibility to free the returned connection string and
375 : * dbname.
376 : */
377 : static char *
378 14 : get_base_conninfo(const char *conninfo, char **dbname)
379 : {
380 : PQExpBuffer buf;
381 : PQconninfoOption *conn_opts;
382 : PQconninfoOption *conn_opt;
383 14 : char *errmsg = NULL;
384 : char *ret;
385 :
386 14 : conn_opts = PQconninfoParse(conninfo, &errmsg);
387 14 : if (conn_opts == NULL)
388 : {
389 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
390 : "could not parse connection string: %s", errmsg);
391 0 : PQfreemem(errmsg);
392 0 : return NULL;
393 : }
394 :
395 14 : buf = createPQExpBuffer();
396 728 : for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
397 : {
398 714 : if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
399 : {
400 33 : if (strcmp(conn_opt->keyword, "dbname") == 0)
401 : {
402 9 : if (dbname)
403 9 : *dbname = pg_strdup(conn_opt->val);
404 9 : continue;
405 : }
406 24 : appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
407 : }
408 : }
409 :
410 14 : ret = pg_strdup(buf->data);
411 :
412 14 : destroyPQExpBuffer(buf);
413 14 : PQconninfoFree(conn_opts);
414 :
415 14 : return ret;
416 : }
417 :
418 : /*
419 : * Build a subscriber connection string. Only a few parameters are supported
420 : * since it starts a server with restricted access.
421 : */
422 : static char *
423 14 : get_sub_conninfo(const struct CreateSubscriberOptions *opt)
424 : {
425 14 : PQExpBuffer buf = createPQExpBuffer();
426 : char *ret;
427 :
428 14 : appendConnStrItem(buf, "port", opt->sub_port);
429 : #if !defined(WIN32)
430 14 : appendConnStrItem(buf, "host", opt->socket_dir);
431 : #endif
432 14 : if (opt->sub_username != NULL)
433 0 : appendConnStrItem(buf, "user", opt->sub_username);
434 14 : appendConnStrItem(buf, "fallback_application_name", progname);
435 :
436 14 : ret = pg_strdup(buf->data);
437 :
438 14 : destroyPQExpBuffer(buf);
439 :
440 14 : return ret;
441 : }
442 :
443 : /*
444 : * Verify if a PostgreSQL binary (progname) is available in the same directory as
445 : * pg_createsubscriber and it has the same version. It returns the absolute
446 : * path of the progname.
447 : */
448 : static char *
449 20 : get_exec_path(const char *argv0, const char *progname)
450 : {
451 : char *versionstr;
452 : char *exec_path;
453 : int ret;
454 :
455 20 : versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
456 20 : exec_path = pg_malloc(MAXPGPATH);
457 20 : ret = find_other_exec(argv0, progname, versionstr, exec_path);
458 :
459 20 : if (ret < 0)
460 : {
461 : char full_path[MAXPGPATH];
462 :
463 0 : if (find_my_exec(argv0, full_path) < 0)
464 0 : strlcpy(full_path, progname, sizeof(full_path));
465 :
466 0 : if (ret == -1)
467 0 : report_createsub_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
468 : progname, "pg_createsubscriber", full_path);
469 : else
470 0 : report_createsub_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
471 : progname, full_path, "pg_createsubscriber");
472 : }
473 :
474 20 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
475 : "%s path is: %s", progname, exec_path);
476 :
477 20 : return exec_path;
478 : }
479 :
480 : /*
481 : * Is it a cluster directory? These are preliminary checks. It is far from
482 : * making an accurate check. If it is not a clone from the publisher, it will
483 : * eventually fail in a future step.
484 : */
485 : static void
486 10 : check_data_directory(const char *datadir)
487 : {
488 : struct stat statbuf;
489 : uint32 major_version;
490 : char *version_str;
491 :
492 10 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
493 : "checking if directory \"%s\" is a cluster data directory",
494 : datadir);
495 :
496 10 : if (stat(datadir, &statbuf) != 0)
497 : {
498 0 : if (errno == ENOENT)
499 0 : report_createsub_fatal("data directory \"%s\" does not exist", datadir);
500 : else
501 0 : report_createsub_fatal("could not access directory \"%s\": %m", datadir);
502 : }
503 :
504 : /*
505 : * Retrieve the contents of this cluster's PG_VERSION. We require
506 : * compatibility with the same major version as the one this tool is
507 : * compiled with.
508 : */
509 10 : major_version = GET_PG_MAJORVERSION_NUM(get_pg_version(datadir, &version_str));
510 10 : if (major_version != PG_MAJORVERSION_NUM)
511 : {
512 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
513 : "data directory is of wrong version");
514 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
515 : "File \"%s\" contains \"%s\", which is not compatible with this program's version \"%s\".",
516 : "PG_VERSION", version_str, PG_MAJORVERSION);
517 0 : exit(1);
518 : }
519 10 : }
520 :
521 : /*
522 : * Append database name into a base connection string.
523 : *
524 : * dbname is the only parameter that changes so it is not included in the base
525 : * connection string. This function concatenates dbname to build a "real"
526 : * connection string.
527 : */
528 : static char *
529 41 : concat_conninfo_dbname(const char *conninfo, const char *dbname)
530 : {
531 41 : PQExpBuffer buf = createPQExpBuffer();
532 : char *ret;
533 :
534 : Assert(conninfo != NULL);
535 :
536 41 : appendPQExpBufferStr(buf, conninfo);
537 41 : appendConnStrItem(buf, "dbname", dbname);
538 :
539 41 : ret = pg_strdup(buf->data);
540 41 : destroyPQExpBuffer(buf);
541 :
542 41 : return ret;
543 : }
544 :
545 : /*
546 : * Store publication and subscription information.
547 : *
548 : * If publication, replication slot and subscription names were specified,
549 : * store it here. Otherwise, a generated name will be assigned to the object in
550 : * setup_publisher().
551 : */
552 : static struct LogicalRepInfo *
553 10 : store_pub_sub_info(const struct CreateSubscriberOptions *opt,
554 : const char *pub_base_conninfo,
555 : const char *sub_base_conninfo)
556 : {
557 : struct LogicalRepInfo *dbinfo;
558 10 : SimpleStringListCell *pubcell = NULL;
559 10 : SimpleStringListCell *subcell = NULL;
560 10 : SimpleStringListCell *replslotcell = NULL;
561 10 : int i = 0;
562 :
563 10 : dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
564 :
565 10 : if (num_pubs > 0)
566 2 : pubcell = opt->pub_names.head;
567 10 : if (num_subs > 0)
568 1 : subcell = opt->sub_names.head;
569 10 : if (num_replslots > 0)
570 2 : replslotcell = opt->replslot_names.head;
571 :
572 30 : for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
573 : {
574 : char *conninfo;
575 :
576 : /* Fill publisher attributes */
577 20 : conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
578 20 : dbinfo[i].pubconninfo = conninfo;
579 20 : dbinfo[i].dbname = cell->val;
580 20 : if (num_pubs > 0)
581 4 : dbinfo[i].pubname = pubcell->val;
582 : else
583 16 : dbinfo[i].pubname = NULL;
584 20 : if (num_replslots > 0)
585 3 : dbinfo[i].replslotname = replslotcell->val;
586 : else
587 17 : dbinfo[i].replslotname = NULL;
588 20 : dbinfo[i].made_replslot = false;
589 20 : dbinfo[i].made_publication = false;
590 : /* Fill subscriber attributes */
591 20 : conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
592 20 : dbinfo[i].subconninfo = conninfo;
593 20 : if (num_subs > 0)
594 2 : dbinfo[i].subname = subcell->val;
595 : else
596 18 : dbinfo[i].subname = NULL;
597 : /* Other fields will be filled later */
598 :
599 37 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
600 : "publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
601 20 : dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
602 3 : dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
603 20 : dbinfo[i].pubconninfo);
604 40 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
605 : "subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
606 2 : dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
607 20 : dbinfo[i].subconninfo,
608 20 : dbinfos.two_phase ? "true" : "false");
609 :
610 20 : if (num_pubs > 0)
611 4 : pubcell = pubcell->next;
612 20 : if (num_subs > 0)
613 2 : subcell = subcell->next;
614 20 : if (num_replslots > 0)
615 3 : replslotcell = replslotcell->next;
616 :
617 20 : i++;
618 : }
619 :
620 10 : return dbinfo;
621 : }
622 :
623 : /*
624 : * Open a new connection. If exit_on_error is true, it has an undesired
625 : * condition and it should exit immediately.
626 : */
627 : static PGconn *
628 57 : connect_database(const char *conninfo, bool exit_on_error)
629 : {
630 : PGconn *conn;
631 : PGresult *res;
632 :
633 57 : conn = PQconnectdb(conninfo);
634 57 : if (PQstatus(conn) != CONNECTION_OK)
635 : {
636 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
637 : "connection to database failed: %s",
638 : PQerrorMessage(conn));
639 0 : PQfinish(conn);
640 :
641 0 : if (exit_on_error)
642 0 : exit(1);
643 0 : return NULL;
644 : }
645 :
646 : /* Secure search_path */
647 57 : res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
648 57 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
649 : {
650 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
651 : "could not clear \"search_path\": %s",
652 : PQresultErrorMessage(res));
653 0 : PQclear(res);
654 0 : PQfinish(conn);
655 :
656 0 : if (exit_on_error)
657 0 : exit(1);
658 0 : return NULL;
659 : }
660 57 : PQclear(res);
661 :
662 57 : return conn;
663 : }
664 :
665 : /*
666 : * Close the connection. If exit_on_error is true, it has an undesired
667 : * condition and it should exit immediately.
668 : */
669 : static void
670 57 : disconnect_database(PGconn *conn, bool exit_on_error)
671 : {
672 : Assert(conn != NULL);
673 :
674 57 : PQfinish(conn);
675 :
676 57 : if (exit_on_error)
677 2 : exit(1);
678 55 : }
679 :
680 : /*
681 : * Obtain the system identifier using the provided connection. It will be used
682 : * to compare if a data directory is a clone of another one.
683 : */
684 : static uint64
685 10 : get_primary_sysid(const char *conninfo)
686 : {
687 : PGconn *conn;
688 : PGresult *res;
689 : uint64 sysid;
690 :
691 10 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
692 : "getting system identifier from publisher");
693 :
694 10 : conn = connect_database(conninfo, true);
695 :
696 10 : res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
697 10 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
698 : {
699 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
700 : "could not get system identifier: %s",
701 : PQresultErrorMessage(res));
702 0 : disconnect_database(conn, true);
703 : }
704 10 : if (PQntuples(res) != 1)
705 : {
706 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
707 : "could not get system identifier: got %d rows, expected %d row",
708 : PQntuples(res), 1);
709 0 : disconnect_database(conn, true);
710 : }
711 :
712 10 : sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
713 :
714 10 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
715 : "system identifier is %" PRIu64 " on publisher", sysid);
716 :
717 10 : PQclear(res);
718 10 : disconnect_database(conn, false);
719 :
720 10 : return sysid;
721 : }
722 :
723 : /*
724 : * Obtain the system identifier from control file. It will be used to compare
725 : * if a data directory is a clone of another one. This routine is used locally
726 : * and avoids a connection.
727 : */
728 : static uint64
729 10 : get_standby_sysid(const char *datadir)
730 : {
731 : ControlFileData *cf;
732 : bool crc_ok;
733 : uint64 sysid;
734 :
735 10 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
736 : "getting system identifier from subscriber");
737 :
738 10 : cf = get_controlfile(datadir, &crc_ok);
739 10 : if (!crc_ok)
740 0 : report_createsub_fatal("control file appears to be corrupt");
741 :
742 10 : sysid = cf->system_identifier;
743 :
744 10 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
745 : "system identifier is %" PRIu64 " on subscriber", sysid);
746 :
747 10 : pg_free(cf);
748 :
749 10 : return sysid;
750 : }
751 :
752 : /*
753 : * Modify the system identifier. Since a standby server preserves the system
754 : * identifier, it makes sense to change it to avoid situations in which WAL
755 : * files from one of the systems might be used in the other one.
756 : */
757 : static void
758 4 : modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
759 : {
760 : ControlFileData *cf;
761 : bool crc_ok;
762 : struct timeval tv;
763 :
764 : char *cmd_str;
765 :
766 4 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
767 : "modifying system identifier of subscriber");
768 :
769 4 : cf = get_controlfile(subscriber_dir, &crc_ok);
770 4 : if (!crc_ok)
771 0 : report_createsub_fatal("control file appears to be corrupt");
772 :
773 : /*
774 : * Select a new system identifier.
775 : *
776 : * XXX this code was extracted from BootStrapXLOG().
777 : */
778 4 : gettimeofday(&tv, NULL);
779 4 : cf->system_identifier = ((uint64) tv.tv_sec) << 32;
780 4 : cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
781 4 : cf->system_identifier |= getpid() & 0xFFF;
782 :
783 4 : if (dry_run)
784 3 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
785 : "dry-run: would set system identifier to %" PRIu64 " on subscriber",
786 : cf->system_identifier);
787 : else
788 : {
789 1 : update_controlfile(subscriber_dir, cf, true);
790 1 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
791 : "system identifier is %" PRIu64 " on subscriber",
792 : cf->system_identifier);
793 : }
794 :
795 4 : if (dry_run)
796 3 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
797 : "dry-run: would run pg_resetwal on the subscriber");
798 : else
799 1 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
800 : "running pg_resetwal on the subscriber");
801 :
802 4 : cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
803 : subscriber_dir, DEVNULL);
804 :
805 4 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
806 : "pg_resetwal command is: %s", cmd_str);
807 :
808 4 : if (!dry_run)
809 : {
810 1 : int rc = system(cmd_str);
811 :
812 1 : if (rc == 0)
813 1 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
814 : "successfully reset WAL on the subscriber");
815 : else
816 0 : report_createsub_fatal("could not reset WAL on subscriber: %s", wait_result_to_str(rc));
817 : }
818 :
819 4 : pg_free(cf);
820 4 : }
821 :
822 : /*
823 : * Generate an object name using a prefix, database oid and a random integer.
824 : * It is used in case the user does not specify an object name (publication,
825 : * subscription, replication slot).
826 : */
827 : static char *
828 8 : generate_object_name(PGconn *conn)
829 : {
830 : PGresult *res;
831 : Oid oid;
832 : uint32 rand;
833 : char *objname;
834 :
835 8 : res = PQexec(conn,
836 : "SELECT oid FROM pg_catalog.pg_database "
837 : "WHERE datname = pg_catalog.current_database()");
838 8 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
839 : {
840 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
841 : "could not obtain database OID: %s",
842 : PQresultErrorMessage(res));
843 0 : disconnect_database(conn, true);
844 : }
845 :
846 8 : if (PQntuples(res) != 1)
847 : {
848 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
849 : "could not obtain database OID: got %d rows, expected %d row",
850 : PQntuples(res), 1);
851 0 : disconnect_database(conn, true);
852 : }
853 :
854 : /* Database OID */
855 8 : oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
856 :
857 8 : PQclear(res);
858 :
859 : /* Random unsigned integer */
860 8 : rand = pg_prng_uint32(&prng_state);
861 :
862 : /*
863 : * Build the object name. The name must not exceed NAMEDATALEN - 1. This
864 : * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
865 : * '\0').
866 : */
867 8 : objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
868 :
869 8 : return objname;
870 : }
871 :
872 : /*
873 : * Does the publication exist in the specified database?
874 : */
875 : static bool
876 8 : find_publication(PGconn *conn, const char *pubname, const char *dbname)
877 : {
878 8 : PQExpBuffer str = createPQExpBuffer();
879 : PGresult *res;
880 8 : bool found = false;
881 8 : char *pubname_esc = PQescapeLiteral(conn, pubname, strlen(pubname));
882 :
883 8 : appendPQExpBuffer(str,
884 : "SELECT 1 FROM pg_catalog.pg_publication "
885 : "WHERE pubname = %s",
886 : pubname_esc);
887 8 : res = PQexec(conn, str->data);
888 8 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
889 : {
890 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
891 : "could not find publication \"%s\" in database \"%s\": %s",
892 : pubname, dbname, PQerrorMessage(conn));
893 0 : disconnect_database(conn, true);
894 : }
895 :
896 8 : if (PQntuples(res) == 1)
897 1 : found = true;
898 :
899 8 : PQclear(res);
900 8 : PQfreemem(pubname_esc);
901 8 : destroyPQExpBuffer(str);
902 :
903 8 : return found;
904 : }
905 :
906 : /*
907 : * Create the publications and replication slots in preparation for logical
908 : * replication. Returns the LSN from latest replication slot. It will be the
909 : * replication start point that is used to adjust the subscriptions (see
910 : * set_replication_progress).
911 : */
912 : static char *
913 4 : setup_publisher(struct LogicalRepInfo *dbinfo)
914 : {
915 4 : char *lsn = NULL;
916 :
917 4 : pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
918 :
919 12 : for (int i = 0; i < num_dbs; i++)
920 : {
921 : PGconn *conn;
922 8 : char *genname = NULL;
923 :
924 8 : conn = connect_database(dbinfo[i].pubconninfo, true);
925 :
926 : /*
927 : * If an object name was not specified as command-line options, assign
928 : * a generated object name. The replication slot has a different rule.
929 : * The subscription name is assigned to the replication slot name if
930 : * no replication slot is specified. It follows the same rule as
931 : * CREATE SUBSCRIPTION.
932 : */
933 8 : if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
934 8 : genname = generate_object_name(conn);
935 8 : if (num_pubs == 0)
936 4 : dbinfo[i].pubname = pg_strdup(genname);
937 8 : if (num_subs == 0)
938 6 : dbinfo[i].subname = pg_strdup(genname);
939 8 : if (num_replslots == 0)
940 5 : dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
941 :
942 8 : if (find_publication(conn, dbinfo[i].pubname, dbinfo[i].dbname))
943 : {
944 : /* Reuse existing publication on publisher. */
945 1 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
946 : "use existing publication \"%s\" in database \"%s\"",
947 1 : dbinfo[i].pubname, dbinfo[i].dbname);
948 : /* Don't remove pre-existing publication if an error occurs. */
949 1 : dbinfo[i].made_publication = false;
950 : }
951 : else
952 : {
953 : /*
954 : * Create publication on publisher. This step should be executed
955 : * *before* promoting the subscriber to avoid any transactions
956 : * between consistent LSN and the new publication rows (such
957 : * transactions wouldn't see the new publication rows resulting in
958 : * an error).
959 : */
960 7 : create_publication(conn, &dbinfo[i]);
961 : }
962 :
963 : /* Create replication slot on publisher */
964 8 : if (lsn)
965 1 : pg_free(lsn);
966 8 : lsn = create_logical_replication_slot(conn, &dbinfo[i]);
967 8 : if (lsn == NULL && !dry_run)
968 0 : exit(1);
969 :
970 : /*
971 : * Since we are using the LSN returned by the last replication slot as
972 : * recovery_target_lsn, this LSN is ahead of the current WAL position
973 : * and the recovery waits until the publisher writes a WAL record to
974 : * reach the target and ends the recovery. On idle systems, this wait
975 : * time is unpredictable and could lead to failure in promoting the
976 : * subscriber. To avoid that, insert a harmless WAL record.
977 : */
978 8 : if (i == num_dbs - 1 && !dry_run)
979 : {
980 : PGresult *res;
981 :
982 1 : res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
983 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
984 : {
985 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
986 : "could not write an additional WAL record: %s",
987 : PQresultErrorMessage(res));
988 0 : disconnect_database(conn, true);
989 : }
990 1 : PQclear(res);
991 : }
992 :
993 8 : disconnect_database(conn, false);
994 : }
995 :
996 4 : return lsn;
997 : }
998 :
999 : /*
1000 : * Is recovery still in progress?
1001 : */
1002 : static bool
1003 16 : server_is_in_recovery(PGconn *conn)
1004 : {
1005 : PGresult *res;
1006 : int ret;
1007 :
1008 16 : res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
1009 :
1010 16 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1011 : {
1012 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1013 : "could not obtain recovery progress: %s",
1014 : PQresultErrorMessage(res));
1015 0 : disconnect_database(conn, true);
1016 : }
1017 :
1018 :
1019 16 : ret = strcmp("t", PQgetvalue(res, 0, 0));
1020 :
1021 16 : PQclear(res);
1022 :
1023 16 : return ret == 0;
1024 : }
1025 :
1026 : /*
1027 : * Is the primary server ready for logical replication?
1028 : *
1029 : * XXX Does it not allow a synchronous replica?
1030 : */
1031 : static void
1032 6 : check_publisher(const struct LogicalRepInfo *dbinfo)
1033 : {
1034 : PGconn *conn;
1035 : PGresult *res;
1036 6 : bool failed = false;
1037 :
1038 : char *wal_level;
1039 : int max_repslots;
1040 : int cur_repslots;
1041 : int max_walsenders;
1042 : int cur_walsenders;
1043 : int max_prepared_transactions;
1044 : char *max_slot_wal_keep_size;
1045 :
1046 6 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
1047 : "checking settings on publisher");
1048 :
1049 6 : conn = connect_database(dbinfo[0].pubconninfo, true);
1050 :
1051 : /*
1052 : * If the primary server is in recovery (i.e. cascading replication),
1053 : * objects (publication) cannot be created because it is read only.
1054 : */
1055 6 : if (server_is_in_recovery(conn))
1056 : {
1057 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1058 : "primary server cannot be in recovery");
1059 1 : disconnect_database(conn, true);
1060 : }
1061 :
1062 : /*------------------------------------------------------------------------
1063 : * Logical replication requires a few parameters to be set on publisher.
1064 : * Since these parameters are not a requirement for physical replication,
1065 : * we should check it to make sure it won't fail.
1066 : *
1067 : * - wal_level >= replica
1068 : * - max_replication_slots >= current + number of dbs to be converted
1069 : * - max_wal_senders >= current + number of dbs to be converted
1070 : * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files)
1071 : * -----------------------------------------------------------------------
1072 : */
1073 5 : res = PQexec(conn,
1074 : "SELECT pg_catalog.current_setting('wal_level'),"
1075 : " pg_catalog.current_setting('max_replication_slots'),"
1076 : " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
1077 : " pg_catalog.current_setting('max_wal_senders'),"
1078 : " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
1079 : " pg_catalog.current_setting('max_prepared_transactions'),"
1080 : " pg_catalog.current_setting('max_slot_wal_keep_size')");
1081 :
1082 5 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1083 : {
1084 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1085 : "could not obtain publisher settings: %s",
1086 : PQresultErrorMessage(res));
1087 0 : disconnect_database(conn, true);
1088 : }
1089 :
1090 5 : wal_level = pg_strdup(PQgetvalue(res, 0, 0));
1091 5 : max_repslots = atoi(PQgetvalue(res, 0, 1));
1092 5 : cur_repslots = atoi(PQgetvalue(res, 0, 2));
1093 5 : max_walsenders = atoi(PQgetvalue(res, 0, 3));
1094 5 : cur_walsenders = atoi(PQgetvalue(res, 0, 4));
1095 5 : max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));
1096 5 : max_slot_wal_keep_size = pg_strdup(PQgetvalue(res, 0, 6));
1097 :
1098 5 : PQclear(res);
1099 :
1100 5 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1101 : "publisher: wal_level: %s", wal_level);
1102 5 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1103 : "publisher: max_replication_slots: %d", max_repslots);
1104 5 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1105 : "publisher: current replication slots: %d", cur_repslots);
1106 5 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1107 : "publisher: max_wal_senders: %d", max_walsenders);
1108 5 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1109 : "publisher: current wal senders: %d", cur_walsenders);
1110 5 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1111 : "publisher: max_prepared_transactions: %d",
1112 : max_prepared_transactions);
1113 5 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1114 : "publisher: max_slot_wal_keep_size: %s",
1115 : max_slot_wal_keep_size);
1116 :
1117 5 : disconnect_database(conn, false);
1118 :
1119 5 : if (strcmp(wal_level, "minimal") == 0)
1120 : {
1121 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1122 : "publisher requires \"wal_level\" >= \"replica\"");
1123 0 : failed = true;
1124 : }
1125 :
1126 5 : if (max_repslots - cur_repslots < num_dbs)
1127 : {
1128 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1129 : "publisher requires %d replication slots, but only %d remain",
1130 : num_dbs, max_repslots - cur_repslots);
1131 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
1132 : "Increase the configuration parameter \"%s\" to at least %d.",
1133 : "max_replication_slots", cur_repslots + num_dbs);
1134 1 : failed = true;
1135 : }
1136 :
1137 5 : if (max_walsenders - cur_walsenders < num_dbs)
1138 : {
1139 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1140 : "publisher requires %d WAL sender processes, but only %d remain",
1141 : num_dbs, max_walsenders - cur_walsenders);
1142 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
1143 : "Increase the configuration parameter \"%s\" to at least %d.",
1144 : "max_wal_senders", cur_walsenders + num_dbs);
1145 1 : failed = true;
1146 : }
1147 :
1148 5 : if (max_prepared_transactions != 0 && !dbinfos.two_phase)
1149 : {
1150 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
1151 : "two_phase option will not be enabled for replication slots");
1152 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_DETAIL,
1153 : "Subscriptions will be created with the two_phase option disabled. "
1154 : "Prepared transactions will be replicated at COMMIT PREPARED.");
1155 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
1156 : "You can use the command-line option --enable-two-phase to enable two_phase.");
1157 : }
1158 :
1159 : /*
1160 : * In dry-run mode, validate 'max_slot_wal_keep_size'. If this parameter
1161 : * is set to a non-default value, it may cause replication failures due to
1162 : * required WAL files being prematurely removed.
1163 : */
1164 5 : if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
1165 : {
1166 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
1167 : "required WAL could be removed from the publisher");
1168 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
1169 : "Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
1170 : "max_slot_wal_keep_size");
1171 : }
1172 :
1173 5 : pg_free(wal_level);
1174 :
1175 5 : if (failed)
1176 1 : exit(1);
1177 4 : }
1178 :
1179 : /*
1180 : * Is the standby server ready for logical replication?
1181 : *
1182 : * XXX Does it not allow a time-delayed replica?
1183 : *
1184 : * XXX In a cascaded replication scenario (P -> S -> C), if the target server
1185 : * is S, it cannot detect there is a replica (server C) because server S starts
1186 : * accepting only local connections and server C cannot connect to it. Hence,
1187 : * there is not a reliable way to provide a suitable error saying the server C
1188 : * will be broken at the end of this process (due to pg_resetwal).
1189 : */
1190 : static void
1191 8 : check_subscriber(const struct LogicalRepInfo *dbinfo)
1192 : {
1193 : PGconn *conn;
1194 : PGresult *res;
1195 8 : bool failed = false;
1196 :
1197 : int max_lrworkers;
1198 : int max_replorigins;
1199 : int max_wprocs;
1200 :
1201 8 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
1202 : "checking settings on subscriber");
1203 :
1204 8 : conn = connect_database(dbinfo[0].subconninfo, true);
1205 :
1206 : /* The target server must be a standby */
1207 8 : if (!server_is_in_recovery(conn))
1208 : {
1209 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1210 : "target server must be a standby");
1211 1 : disconnect_database(conn, true);
1212 : }
1213 :
1214 : /*------------------------------------------------------------------------
1215 : * Logical replication requires a few parameters to be set on subscriber.
1216 : * Since these parameters are not a requirement for physical replication,
1217 : * we should check it to make sure it won't fail.
1218 : *
1219 : * - max_active_replication_origins >= number of dbs to be converted
1220 : * - max_logical_replication_workers >= number of dbs to be converted
1221 : * - max_worker_processes >= 1 + number of dbs to be converted
1222 : *------------------------------------------------------------------------
1223 : */
1224 7 : res = PQexec(conn,
1225 : "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
1226 : "'max_logical_replication_workers', "
1227 : "'max_active_replication_origins', "
1228 : "'max_worker_processes', "
1229 : "'primary_slot_name') "
1230 : "ORDER BY name");
1231 :
1232 7 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1233 : {
1234 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1235 : "could not obtain subscriber settings: %s",
1236 : PQresultErrorMessage(res));
1237 0 : disconnect_database(conn, true);
1238 : }
1239 :
1240 7 : max_replorigins = atoi(PQgetvalue(res, 0, 0));
1241 7 : max_lrworkers = atoi(PQgetvalue(res, 1, 0));
1242 7 : max_wprocs = atoi(PQgetvalue(res, 2, 0));
1243 7 : if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
1244 6 : primary_slot_name = pg_strdup(PQgetvalue(res, 3, 0));
1245 :
1246 7 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1247 : "subscriber: max_logical_replication_workers: %d",
1248 : max_lrworkers);
1249 7 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1250 : "subscriber: max_active_replication_origins: %d", max_replorigins);
1251 7 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1252 : "subscriber: max_worker_processes: %d", max_wprocs);
1253 7 : if (primary_slot_name)
1254 6 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1255 : "subscriber: primary_slot_name: %s", primary_slot_name);
1256 :
1257 7 : PQclear(res);
1258 :
1259 7 : disconnect_database(conn, false);
1260 :
1261 7 : if (max_replorigins < num_dbs)
1262 : {
1263 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1264 : "subscriber requires %d active replication origins, but only %d remain",
1265 : num_dbs, max_replorigins);
1266 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
1267 : "Increase the configuration parameter \"%s\" to at least %d.",
1268 : "max_active_replication_origins", num_dbs);
1269 1 : failed = true;
1270 : }
1271 :
1272 7 : if (max_lrworkers < num_dbs)
1273 : {
1274 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1275 : "subscriber requires %d logical replication workers, but only %d remain",
1276 : num_dbs, max_lrworkers);
1277 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
1278 : "Increase the configuration parameter \"%s\" to at least %d.",
1279 : "max_logical_replication_workers", num_dbs);
1280 1 : failed = true;
1281 : }
1282 :
1283 7 : if (max_wprocs < num_dbs + 1)
1284 : {
1285 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1286 : "subscriber requires %d worker processes, but only %d remain",
1287 : num_dbs + 1, max_wprocs);
1288 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
1289 : "Increase the configuration parameter \"%s\" to at least %d.",
1290 : "max_worker_processes", num_dbs + 1);
1291 1 : failed = true;
1292 : }
1293 :
1294 7 : if (failed)
1295 1 : exit(1);
1296 6 : }
1297 :
1298 : /*
1299 : * Drop a specified subscription. This is to avoid duplicate subscriptions on
1300 : * the primary (publisher node) and the newly created subscriber. We
1301 : * shouldn't drop the associated slot as that would be used by the publisher
1302 : * node.
1303 : */
1304 : static void
1305 4 : drop_existing_subscription(PGconn *conn, const char *subname, const char *dbname)
1306 : {
1307 4 : PQExpBuffer query = createPQExpBuffer();
1308 : PGresult *res;
1309 :
1310 : Assert(conn != NULL);
1311 :
1312 : /*
1313 : * Construct a query string. These commands are allowed to be executed
1314 : * within a transaction.
1315 : */
1316 4 : appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
1317 : subname);
1318 4 : appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
1319 : subname);
1320 4 : appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
1321 :
1322 4 : if (dry_run)
1323 3 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
1324 : "dry-run: would drop subscription \"%s\" in database \"%s\"",
1325 : subname, dbname);
1326 : else
1327 : {
1328 1 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
1329 : "dropping subscription \"%s\" in database \"%s\"",
1330 : subname, dbname);
1331 :
1332 1 : res = PQexec(conn, query->data);
1333 :
1334 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1335 : {
1336 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1337 : "could not drop subscription \"%s\": %s",
1338 : subname, PQresultErrorMessage(res));
1339 0 : disconnect_database(conn, true);
1340 : }
1341 :
1342 1 : PQclear(res);
1343 : }
1344 :
1345 4 : destroyPQExpBuffer(query);
1346 4 : }
1347 :
1348 : /*
1349 : * Retrieve and drop the pre-existing subscriptions.
1350 : */
1351 : static void
1352 8 : check_and_drop_existing_subscriptions(PGconn *conn,
1353 : const struct LogicalRepInfo *dbinfo)
1354 : {
1355 8 : PQExpBuffer query = createPQExpBuffer();
1356 : char *dbname;
1357 : PGresult *res;
1358 :
1359 : Assert(conn != NULL);
1360 :
1361 8 : dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1362 :
1363 8 : appendPQExpBuffer(query,
1364 : "SELECT s.subname FROM pg_catalog.pg_subscription s "
1365 : "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1366 : "WHERE d.datname = %s",
1367 : dbname);
1368 8 : res = PQexec(conn, query->data);
1369 :
1370 8 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1371 : {
1372 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1373 : "could not obtain pre-existing subscriptions: %s",
1374 : PQresultErrorMessage(res));
1375 0 : disconnect_database(conn, true);
1376 : }
1377 :
1378 12 : for (int i = 0; i < PQntuples(res); i++)
1379 4 : drop_existing_subscription(conn, PQgetvalue(res, i, 0),
1380 4 : dbinfo->dbname);
1381 :
1382 8 : PQclear(res);
1383 8 : destroyPQExpBuffer(query);
1384 8 : PQfreemem(dbname);
1385 8 : }
1386 :
1387 : /*
1388 : * Create the subscriptions, adjust the initial location for logical
1389 : * replication and enable the subscriptions. That's the last step for logical
1390 : * replication setup.
1391 : */
1392 : static void
1393 4 : setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
1394 : {
1395 12 : for (int i = 0; i < num_dbs; i++)
1396 : {
1397 : PGconn *conn;
1398 :
1399 : /* Connect to subscriber. */
1400 8 : conn = connect_database(dbinfo[i].subconninfo, true);
1401 :
1402 : /*
1403 : * We don't need the pre-existing subscriptions on the newly formed
1404 : * subscriber. They can connect to other publisher nodes and either
1405 : * get some unwarranted data or can lead to ERRORs in connecting to
1406 : * such nodes.
1407 : */
1408 8 : check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
1409 :
1410 : /* Check and drop the required publications in the given database. */
1411 8 : check_and_drop_publications(conn, &dbinfo[i]);
1412 :
1413 8 : create_subscription(conn, &dbinfo[i]);
1414 :
1415 : /* Set the replication progress to the correct LSN */
1416 8 : set_replication_progress(conn, &dbinfo[i], consistent_lsn);
1417 :
1418 : /* Enable subscription */
1419 8 : enable_subscription(conn, &dbinfo[i]);
1420 :
1421 8 : disconnect_database(conn, false);
1422 : }
1423 4 : }
1424 :
1425 : /*
1426 : * Write the required recovery parameters.
1427 : */
1428 : static void
1429 4 : setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
1430 : {
1431 : PGconn *conn;
1432 : PQExpBuffer recoveryconfcontents;
1433 :
1434 : /*
1435 : * Despite of the recovery parameters will be written to the subscriber,
1436 : * use a publisher connection. The primary_conninfo is generated using the
1437 : * connection settings.
1438 : */
1439 4 : conn = connect_database(dbinfo[0].pubconninfo, true);
1440 :
1441 : /*
1442 : * Write recovery parameters.
1443 : *
1444 : * The subscriber is not running yet. In dry run mode, the recovery
1445 : * parameters *won't* be written. An invalid LSN is used for printing
1446 : * purposes. Additional recovery parameters are added here. It avoids
1447 : * unexpected behavior such as end of recovery as soon as a consistent
1448 : * state is reached (recovery_target) and failure due to multiple recovery
1449 : * targets (name, time, xid, LSN).
1450 : */
1451 4 : recoveryconfcontents = GenerateRecoveryConfig(conn, NULL, NULL);
1452 4 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target = ''\n");
1453 4 : appendPQExpBufferStr(recoveryconfcontents,
1454 : "recovery_target_timeline = 'latest'\n");
1455 :
1456 : /*
1457 : * Set recovery_target_inclusive = false to avoid reapplying the
1458 : * transaction committed at 'lsn' after subscription is enabled. This is
1459 : * because the provided 'lsn' is also used as the replication start point
1460 : * for the subscription. So, the server can send the transaction committed
1461 : * at that 'lsn' after replication is started which can lead to applying
1462 : * the same transaction twice if we keep recovery_target_inclusive = true.
1463 : */
1464 4 : appendPQExpBufferStr(recoveryconfcontents,
1465 : "recovery_target_inclusive = false\n");
1466 4 : appendPQExpBufferStr(recoveryconfcontents,
1467 : "recovery_target_action = promote\n");
1468 4 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target_name = ''\n");
1469 4 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target_time = ''\n");
1470 4 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target_xid = ''\n");
1471 :
1472 4 : if (dry_run)
1473 : {
1474 3 : appendPQExpBufferStr(recoveryconfcontents, "# dry run mode\n");
1475 3 : appendPQExpBuffer(recoveryconfcontents,
1476 : "recovery_target_lsn = '%X/%08X'\n",
1477 : LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
1478 : }
1479 : else
1480 : {
1481 1 : appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
1482 : lsn);
1483 : }
1484 :
1485 4 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1486 : "recovery parameters:\n%s", recoveryconfcontents->data);
1487 :
1488 4 : if (!dry_run)
1489 : {
1490 : char conf_filename[MAXPGPATH];
1491 : FILE *fd;
1492 :
1493 : /* Write the recovery parameters to INCLUDED_CONF_FILE */
1494 1 : snprintf(conf_filename, MAXPGPATH, "%s/%s", datadir,
1495 : INCLUDED_CONF_FILE);
1496 1 : fd = fopen(conf_filename, "w");
1497 1 : if (fd == NULL)
1498 0 : report_createsub_fatal("could not open file \"%s\": %m", conf_filename);
1499 :
1500 1 : if (fwrite(recoveryconfcontents->data, recoveryconfcontents->len, 1, fd) != 1)
1501 0 : report_createsub_fatal("could not write to file \"%s\": %m", conf_filename);
1502 :
1503 1 : fclose(fd);
1504 1 : recovery_params_set = true;
1505 :
1506 : /* Include conditionally the recovery parameters. */
1507 1 : resetPQExpBuffer(recoveryconfcontents);
1508 1 : appendPQExpBufferStr(recoveryconfcontents,
1509 : "include_if_exists '" INCLUDED_CONF_FILE "'\n");
1510 1 : WriteRecoveryConfig(conn, datadir, recoveryconfcontents);
1511 : }
1512 :
1513 4 : disconnect_database(conn, false);
1514 4 : }
1515 :
1516 : /*
1517 : * Drop physical replication slot on primary if the standby was using it. After
1518 : * the transformation, it has no use.
1519 : *
1520 : * XXX we might not fail here. Instead, we provide a warning so the user
1521 : * eventually drops this replication slot later.
1522 : */
1523 : static void
1524 4 : drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
1525 : {
1526 : PGconn *conn;
1527 :
1528 : /* Replication slot does not exist, do nothing */
1529 4 : if (!primary_slot_name)
1530 0 : return;
1531 :
1532 4 : conn = connect_database(dbinfo[0].pubconninfo, false);
1533 4 : if (conn != NULL)
1534 : {
1535 4 : drop_replication_slot(conn, &dbinfo[0], slotname);
1536 4 : disconnect_database(conn, false);
1537 : }
1538 : else
1539 : {
1540 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
1541 : "could not drop replication slot \"%s\" on primary",
1542 : slotname);
1543 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
1544 : "Drop this replication slot soon to avoid retention of WAL files.");
1545 : }
1546 : }
1547 :
1548 : /*
1549 : * Drop failover replication slots on subscriber. After the transformation,
1550 : * they have no use.
1551 : *
1552 : * XXX We do not fail here. Instead, we provide a warning so the user can drop
1553 : * them later.
1554 : */
1555 : static void
1556 4 : drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
1557 : {
1558 : PGconn *conn;
1559 : PGresult *res;
1560 :
1561 4 : conn = connect_database(dbinfo[0].subconninfo, false);
1562 4 : if (conn != NULL)
1563 : {
1564 : /* Get failover replication slot names */
1565 4 : res = PQexec(conn,
1566 : "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
1567 :
1568 4 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
1569 : {
1570 : /* Remove failover replication slots from subscriber */
1571 8 : for (int i = 0; i < PQntuples(res); i++)
1572 4 : drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
1573 : }
1574 : else
1575 : {
1576 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
1577 : "could not obtain failover replication slot information: %s",
1578 : PQresultErrorMessage(res));
1579 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
1580 : "Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1581 : }
1582 :
1583 4 : PQclear(res);
1584 4 : disconnect_database(conn, false);
1585 : }
1586 : else
1587 : {
1588 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
1589 : "could not drop failover replication slot");
1590 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
1591 : "Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1592 : }
1593 4 : }
1594 :
1595 : /*
1596 : * Create a logical replication slot and returns a LSN.
1597 : *
1598 : * CreateReplicationSlot() is not used because it does not provide the one-row
1599 : * result set that contains the LSN.
1600 : */
1601 : static char *
1602 8 : create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
1603 : {
1604 8 : PQExpBuffer str = createPQExpBuffer();
1605 8 : PGresult *res = NULL;
1606 8 : const char *slot_name = dbinfo->replslotname;
1607 : char *slot_name_esc;
1608 8 : char *lsn = NULL;
1609 :
1610 : Assert(conn != NULL);
1611 :
1612 8 : if (dry_run)
1613 6 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
1614 : "dry-run: would create the replication slot \"%s\" in database \"%s\" on publisher",
1615 : slot_name, dbinfo->dbname);
1616 : else
1617 2 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
1618 : "creating the replication slot \"%s\" in database \"%s\" on publisher",
1619 : slot_name, dbinfo->dbname);
1620 :
1621 8 : slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1622 :
1623 8 : appendPQExpBuffer(str,
1624 : "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
1625 : slot_name_esc,
1626 8 : dbinfos.two_phase ? "true" : "false");
1627 :
1628 8 : PQfreemem(slot_name_esc);
1629 :
1630 8 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1631 : "command is: %s", str->data);
1632 :
1633 8 : if (!dry_run)
1634 : {
1635 2 : res = PQexec(conn, str->data);
1636 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1637 : {
1638 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1639 : "could not create replication slot \"%s\" in database \"%s\": %s",
1640 : slot_name, dbinfo->dbname,
1641 : PQresultErrorMessage(res));
1642 0 : PQclear(res);
1643 0 : destroyPQExpBuffer(str);
1644 0 : return NULL;
1645 : }
1646 :
1647 2 : lsn = pg_strdup(PQgetvalue(res, 0, 0));
1648 2 : PQclear(res);
1649 : }
1650 :
1651 : /* For cleanup purposes */
1652 8 : dbinfo->made_replslot = true;
1653 :
1654 8 : destroyPQExpBuffer(str);
1655 :
1656 8 : return lsn;
1657 : }
1658 :
1659 : static void
1660 8 : drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
1661 : const char *slot_name)
1662 : {
1663 8 : PQExpBuffer str = createPQExpBuffer();
1664 : char *slot_name_esc;
1665 : PGresult *res;
1666 :
1667 : Assert(conn != NULL);
1668 :
1669 8 : if (dry_run)
1670 6 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
1671 : "dry-run: would drop the replication slot \"%s\" in database \"%s\"",
1672 : slot_name, dbinfo->dbname);
1673 : else
1674 2 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
1675 : "dropping the replication slot \"%s\" in database \"%s\"",
1676 : slot_name, dbinfo->dbname);
1677 :
1678 8 : slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1679 :
1680 8 : appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
1681 :
1682 8 : PQfreemem(slot_name_esc);
1683 :
1684 8 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1685 : "command is: %s", str->data);
1686 :
1687 8 : if (!dry_run)
1688 : {
1689 2 : res = PQexec(conn, str->data);
1690 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1691 : {
1692 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1693 : "could not drop replication slot \"%s\" in database \"%s\": %s",
1694 : slot_name, dbinfo->dbname, PQresultErrorMessage(res));
1695 0 : dbinfo->made_replslot = false; /* don't try again. */
1696 : }
1697 :
1698 2 : PQclear(res);
1699 : }
1700 :
1701 8 : destroyPQExpBuffer(str);
1702 8 : }
1703 :
1704 : /*
1705 : * Reports a suitable message if pg_ctl fails.
1706 : */
1707 : static void
1708 24 : pg_ctl_status(const char *pg_ctl_cmd, int rc)
1709 : {
1710 24 : if (rc != 0)
1711 : {
1712 0 : if (WIFEXITED(rc))
1713 : {
1714 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1715 : "pg_ctl failed with exit code %d",
1716 0 : WEXITSTATUS(rc));
1717 : }
1718 0 : else if (WIFSIGNALED(rc))
1719 : {
1720 : #if defined(WIN32)
1721 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1722 : "pg_ctl was terminated by exception 0x%X",
1723 : WTERMSIG(rc));
1724 : report_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
1725 : "See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
1726 : #else
1727 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1728 : "pg_ctl was terminated by signal %d: %s",
1729 : WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
1730 : #endif
1731 : }
1732 : else
1733 : {
1734 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1735 : "pg_ctl exited with unrecognized status %d", rc);
1736 : }
1737 :
1738 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
1739 : "The failed command was: %s", pg_ctl_cmd);
1740 0 : exit(1);
1741 : }
1742 24 : }
1743 :
1744 : static void
1745 12 : start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
1746 : bool restrict_logical_worker)
1747 : {
1748 12 : PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
1749 : int rc;
1750 :
1751 12 : appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
1752 12 : appendShellString(pg_ctl_cmd, subscriber_dir);
1753 12 : appendPQExpBufferStr(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
1754 :
1755 : /* Prevent unintended slot invalidation */
1756 12 : appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
1757 :
1758 12 : if (restricted_access)
1759 : {
1760 12 : appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
1761 : #if !defined(WIN32)
1762 :
1763 : /*
1764 : * An empty listen_addresses list means the server does not listen on
1765 : * any IP interfaces; only Unix-domain sockets can be used to connect
1766 : * to the server. Prevent external connections to minimize the chance
1767 : * of failure.
1768 : */
1769 12 : appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
1770 12 : if (opt->socket_dir)
1771 12 : appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
1772 12 : opt->socket_dir);
1773 12 : appendPQExpBufferChar(pg_ctl_cmd, '"');
1774 : #endif
1775 : }
1776 12 : if (opt->config_file != NULL)
1777 0 : appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
1778 0 : opt->config_file);
1779 :
1780 : /* Suppress to start logical replication if requested */
1781 12 : if (restrict_logical_worker)
1782 4 : appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
1783 :
1784 12 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1785 : "pg_ctl command is: %s", pg_ctl_cmd->data);
1786 12 : rc = system(pg_ctl_cmd->data);
1787 12 : pg_ctl_status(pg_ctl_cmd->data, rc);
1788 12 : standby_running = true;
1789 12 : destroyPQExpBuffer(pg_ctl_cmd);
1790 12 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
1791 : "server was started");
1792 12 : }
1793 :
1794 : static void
1795 12 : stop_standby_server(const char *datadir)
1796 : {
1797 : char *pg_ctl_cmd;
1798 : int rc;
1799 :
1800 12 : pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
1801 : datadir);
1802 12 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1803 : "pg_ctl command is: %s", pg_ctl_cmd);
1804 12 : rc = system(pg_ctl_cmd);
1805 12 : pg_ctl_status(pg_ctl_cmd, rc);
1806 12 : standby_running = false;
1807 12 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
1808 : "server was stopped");
1809 12 : }
1810 :
1811 : /*
1812 : * Returns after the server finishes the recovery process.
1813 : *
1814 : * If recovery_timeout option is set, terminate abnormally without finishing
1815 : * the recovery process. By default, it waits forever.
1816 : *
1817 : * XXX Is the recovery process still in progress? When recovery process has a
1818 : * better progress reporting mechanism, it should be added here.
1819 : */
1820 : static void
1821 4 : wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
1822 : {
1823 : PGconn *conn;
1824 4 : bool ready = false;
1825 4 : int timer = 0;
1826 :
1827 4 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
1828 : "waiting for the target server to reach the consistent state");
1829 :
1830 4 : conn = connect_database(conninfo, true);
1831 :
1832 : for (;;)
1833 : {
1834 : /* Did the recovery process finish? We're done if so. */
1835 5 : if (dry_run || !server_is_in_recovery(conn))
1836 : {
1837 4 : ready = true;
1838 4 : recovery_ended = true;
1839 4 : break;
1840 : }
1841 :
1842 : /* Bail out after recovery_timeout seconds if this option is set */
1843 1 : if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
1844 : {
1845 0 : stop_standby_server(subscriber_dir);
1846 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1847 : "recovery timed out");
1848 0 : disconnect_database(conn, true);
1849 : }
1850 :
1851 : /* Keep waiting */
1852 1 : pg_usleep(WAIT_INTERVAL * USECS_PER_SEC);
1853 1 : timer += WAIT_INTERVAL;
1854 : }
1855 :
1856 4 : disconnect_database(conn, false);
1857 :
1858 4 : if (!ready)
1859 0 : report_createsub_fatal("server did not end recovery");
1860 :
1861 4 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
1862 : "target server reached the consistent state");
1863 4 : report_createsub_log(PG_LOG_INFO, PG_LOG_HINT,
1864 : "If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
1865 4 : }
1866 :
1867 : /*
1868 : * Create a publication that includes all tables in the database.
1869 : */
1870 : static void
1871 7 : create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
1872 : {
1873 7 : PQExpBuffer str = createPQExpBuffer();
1874 : PGresult *res;
1875 : char *ipubname_esc;
1876 : char *spubname_esc;
1877 :
1878 : Assert(conn != NULL);
1879 :
1880 7 : ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1881 7 : spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1882 :
1883 : /* Check if the publication already exists */
1884 7 : appendPQExpBuffer(str,
1885 : "SELECT 1 FROM pg_catalog.pg_publication "
1886 : "WHERE pubname = %s",
1887 : spubname_esc);
1888 7 : res = PQexec(conn, str->data);
1889 7 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1890 : {
1891 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1892 : "could not obtain publication information: %s",
1893 : PQresultErrorMessage(res));
1894 0 : disconnect_database(conn, true);
1895 : }
1896 :
1897 7 : if (PQntuples(res) == 1)
1898 : {
1899 : /*
1900 : * Unfortunately, if it reaches this code path, it will always fail
1901 : * (unless you decide to change the existing publication name). That's
1902 : * bad but it is very unlikely that the user will choose a name with
1903 : * pg_createsubscriber_ prefix followed by the exact database oid and
1904 : * a random number.
1905 : */
1906 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1907 : "publication \"%s\" already exists", dbinfo->pubname);
1908 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
1909 : "Consider renaming this publication before continuing.");
1910 0 : disconnect_database(conn, true);
1911 : }
1912 :
1913 7 : PQclear(res);
1914 7 : resetPQExpBuffer(str);
1915 :
1916 7 : if (dry_run)
1917 6 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
1918 : "dry-run: would create publication \"%s\" in database \"%s\"",
1919 : dbinfo->pubname, dbinfo->dbname);
1920 : else
1921 1 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
1922 : "creating publication \"%s\" in database \"%s\"",
1923 : dbinfo->pubname, dbinfo->dbname);
1924 :
1925 7 : appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
1926 : ipubname_esc);
1927 :
1928 7 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1929 : "command is: %s", str->data);
1930 :
1931 7 : if (!dry_run)
1932 : {
1933 1 : res = PQexec(conn, str->data);
1934 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1935 : {
1936 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1937 : "could not create publication \"%s\" in database \"%s\": %s",
1938 : dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
1939 0 : disconnect_database(conn, true);
1940 : }
1941 1 : PQclear(res);
1942 : }
1943 :
1944 : /* For cleanup purposes */
1945 7 : dbinfo->made_publication = true;
1946 :
1947 7 : PQfreemem(ipubname_esc);
1948 7 : PQfreemem(spubname_esc);
1949 7 : destroyPQExpBuffer(str);
1950 7 : }
1951 :
1952 : /*
1953 : * Drop the specified publication in the given database.
1954 : */
1955 : static void
1956 10 : drop_publication(PGconn *conn, const char *pubname, const char *dbname,
1957 : bool *made_publication)
1958 : {
1959 10 : PQExpBuffer str = createPQExpBuffer();
1960 : PGresult *res;
1961 : char *pubname_esc;
1962 :
1963 : Assert(conn != NULL);
1964 :
1965 10 : pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
1966 :
1967 10 : if (dry_run)
1968 6 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
1969 : "dry-run: would drop publication \"%s\" in database \"%s\"",
1970 : pubname, dbname);
1971 : else
1972 4 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
1973 : "dropping publication \"%s\" in database \"%s\"",
1974 : pubname, dbname);
1975 :
1976 10 : appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
1977 :
1978 10 : PQfreemem(pubname_esc);
1979 :
1980 10 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1981 : "command is: %s", str->data);
1982 :
1983 10 : if (!dry_run)
1984 : {
1985 4 : res = PQexec(conn, str->data);
1986 4 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1987 : {
1988 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1989 : "could not drop publication \"%s\" in database \"%s\": %s",
1990 : pubname, dbname, PQresultErrorMessage(res));
1991 0 : *made_publication = false; /* don't try again. */
1992 :
1993 : /*
1994 : * Don't disconnect and exit here. This routine is used by primary
1995 : * (cleanup publication / replication slot due to an error) and
1996 : * subscriber (remove the replicated publications). In both cases,
1997 : * it can continue and provide instructions for the user to remove
1998 : * it later if cleanup fails.
1999 : */
2000 : }
2001 4 : PQclear(res);
2002 : }
2003 :
2004 10 : destroyPQExpBuffer(str);
2005 10 : }
2006 :
2007 : /*
2008 : * Retrieve and drop the publications.
2009 : *
2010 : * Publications copied during physical replication remain on the subscriber
2011 : * after promotion. If --clean=publications is specified, drop all existing
2012 : * publications in the subscriber database. Otherwise, only drop publications
2013 : * that were created by pg_createsubscriber during this operation.
2014 : */
2015 : static void
2016 8 : check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
2017 : {
2018 : PGresult *res;
2019 8 : bool drop_all_pubs = dbinfos.objecttypes_to_clean & OBJECTTYPE_PUBLICATIONS;
2020 :
2021 : Assert(conn != NULL);
2022 :
2023 8 : if (drop_all_pubs)
2024 : {
2025 2 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2026 : "dropping all existing publications in database \"%s\"",
2027 : dbinfo->dbname);
2028 :
2029 : /* Fetch all publication names */
2030 2 : res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
2031 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
2032 : {
2033 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2034 : "could not obtain publication information: %s",
2035 : PQresultErrorMessage(res));
2036 0 : PQclear(res);
2037 0 : disconnect_database(conn, true);
2038 : }
2039 :
2040 : /* Drop each publication */
2041 6 : for (int i = 0; i < PQntuples(res); i++)
2042 4 : drop_publication(conn, PQgetvalue(res, i, 0), dbinfo->dbname,
2043 : &dbinfo->made_publication);
2044 :
2045 2 : PQclear(res);
2046 : }
2047 : else
2048 : {
2049 : /* Drop publication only if it was created by this tool */
2050 6 : if (dbinfo->made_publication)
2051 : {
2052 6 : drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
2053 : &dbinfo->made_publication);
2054 : }
2055 : else
2056 : {
2057 0 : if (dry_run)
2058 0 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2059 : "dry-run: would preserve existing publication \"%s\" in database \"%s\"",
2060 : dbinfo->pubname, dbinfo->dbname);
2061 : else
2062 0 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2063 : "preserve existing publication \"%s\" in database \"%s\"",
2064 : dbinfo->pubname, dbinfo->dbname);
2065 : }
2066 : }
2067 8 : }
2068 :
2069 : /*
2070 : * Create a subscription with some predefined options.
2071 : *
2072 : * A replication slot was already created in a previous step. Let's use it. It
2073 : * is not required to copy data. The subscription will be created but it will
2074 : * not be enabled now. That's because the replication progress must be set and
2075 : * the replication origin name (one of the function arguments) contains the
2076 : * subscription OID in its name. Once the subscription is created,
2077 : * set_replication_progress() can obtain the chosen origin name and set up its
2078 : * initial location.
2079 : */
2080 : static void
2081 8 : create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
2082 : {
2083 8 : PQExpBuffer str = createPQExpBuffer();
2084 : PGresult *res;
2085 : char *pubname_esc;
2086 : char *subname_esc;
2087 : char *pubconninfo_esc;
2088 : char *replslotname_esc;
2089 :
2090 : Assert(conn != NULL);
2091 :
2092 8 : pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
2093 8 : subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
2094 8 : pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
2095 8 : replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
2096 :
2097 8 : if (dry_run)
2098 6 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2099 : "dry-run: would create subscription \"%s\" in database \"%s\"",
2100 6 : dbinfo->subname, dbinfo->dbname);
2101 : else
2102 2 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2103 : "creating subscription \"%s\" in database \"%s\"",
2104 2 : dbinfo->subname, dbinfo->dbname);
2105 :
2106 8 : appendPQExpBuffer(str,
2107 : "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
2108 : "WITH (create_slot = false, enabled = false, "
2109 : "slot_name = %s, copy_data = false, two_phase = %s)",
2110 : subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
2111 8 : dbinfos.two_phase ? "true" : "false");
2112 :
2113 8 : PQfreemem(pubname_esc);
2114 8 : PQfreemem(subname_esc);
2115 8 : PQfreemem(pubconninfo_esc);
2116 8 : PQfreemem(replslotname_esc);
2117 :
2118 8 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
2119 : "command is: %s", str->data);
2120 :
2121 8 : if (!dry_run)
2122 : {
2123 2 : res = PQexec(conn, str->data);
2124 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
2125 : {
2126 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2127 : "could not create subscription \"%s\" in database \"%s\": %s",
2128 0 : dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
2129 0 : disconnect_database(conn, true);
2130 : }
2131 2 : PQclear(res);
2132 : }
2133 :
2134 8 : destroyPQExpBuffer(str);
2135 8 : }
2136 :
2137 : /*
2138 : * Sets the replication progress to the consistent LSN.
2139 : *
2140 : * The subscriber caught up to the consistent LSN provided by the last
2141 : * replication slot that was created. The goal is to set up the initial
2142 : * location for the logical replication that is the exact LSN that the
2143 : * subscriber was promoted. Once the subscription is enabled it will start
2144 : * streaming from that location onwards. In dry run mode, the subscription OID
2145 : * and LSN are set to invalid values for printing purposes.
2146 : */
2147 : static void
2148 8 : set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
2149 : {
2150 8 : PQExpBuffer str = createPQExpBuffer();
2151 : PGresult *res;
2152 : Oid suboid;
2153 : char *subname;
2154 : char *dbname;
2155 : char *originname;
2156 : char *lsnstr;
2157 :
2158 : Assert(conn != NULL);
2159 :
2160 8 : subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
2161 8 : dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
2162 :
2163 8 : appendPQExpBuffer(str,
2164 : "SELECT s.oid FROM pg_catalog.pg_subscription s "
2165 : "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
2166 : "WHERE s.subname = %s AND d.datname = %s",
2167 : subname, dbname);
2168 :
2169 8 : res = PQexec(conn, str->data);
2170 8 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
2171 : {
2172 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2173 : "could not obtain subscription OID: %s",
2174 : PQresultErrorMessage(res));
2175 0 : disconnect_database(conn, true);
2176 : }
2177 :
2178 8 : if (PQntuples(res) != 1 && !dry_run)
2179 : {
2180 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2181 : "could not obtain subscription OID: got %d rows, expected %d row",
2182 : PQntuples(res), 1);
2183 0 : disconnect_database(conn, true);
2184 : }
2185 :
2186 8 : if (dry_run)
2187 : {
2188 6 : suboid = InvalidOid;
2189 6 : lsnstr = psprintf("%X/%08X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
2190 : }
2191 : else
2192 : {
2193 2 : suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
2194 2 : lsnstr = psprintf("%s", lsn);
2195 : }
2196 :
2197 8 : PQclear(res);
2198 :
2199 : /*
2200 : * The origin name is defined as pg_%u. %u is the subscription OID. See
2201 : * ApplyWorkerMain().
2202 : */
2203 8 : originname = psprintf("pg_%u", suboid);
2204 :
2205 8 : if (dry_run)
2206 6 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2207 : "dry-run: would set the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
2208 6 : originname, lsnstr, dbinfo->dbname);
2209 : else
2210 2 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2211 : "setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
2212 2 : originname, lsnstr, dbinfo->dbname);
2213 :
2214 8 : resetPQExpBuffer(str);
2215 8 : appendPQExpBuffer(str,
2216 : "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
2217 : originname, lsnstr);
2218 :
2219 8 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
2220 : "command is: %s", str->data);
2221 :
2222 8 : if (!dry_run)
2223 : {
2224 2 : res = PQexec(conn, str->data);
2225 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
2226 : {
2227 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2228 : "could not set replication progress for subscription \"%s\": %s",
2229 0 : dbinfo->subname, PQresultErrorMessage(res));
2230 0 : disconnect_database(conn, true);
2231 : }
2232 2 : PQclear(res);
2233 : }
2234 :
2235 8 : PQfreemem(subname);
2236 8 : PQfreemem(dbname);
2237 8 : pg_free(originname);
2238 8 : pg_free(lsnstr);
2239 8 : destroyPQExpBuffer(str);
2240 8 : }
2241 :
2242 : /*
2243 : * Enables the subscription.
2244 : *
2245 : * The subscription was created in a previous step but it was disabled. After
2246 : * adjusting the initial logical replication location, enable the subscription.
2247 : */
2248 : static void
2249 8 : enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
2250 : {
2251 8 : PQExpBuffer str = createPQExpBuffer();
2252 : PGresult *res;
2253 : char *subname;
2254 :
2255 : Assert(conn != NULL);
2256 :
2257 8 : subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
2258 :
2259 8 : if (dry_run)
2260 6 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2261 : "dry-run: would enable subscription \"%s\" in database \"%s\"",
2262 6 : dbinfo->subname, dbinfo->dbname);
2263 : else
2264 2 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2265 : "enabling subscription \"%s\" in database \"%s\"",
2266 2 : dbinfo->subname, dbinfo->dbname);
2267 :
2268 8 : appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
2269 :
2270 8 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
2271 : "command is: %s", str->data);
2272 :
2273 8 : if (!dry_run)
2274 : {
2275 2 : res = PQexec(conn, str->data);
2276 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
2277 : {
2278 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2279 : "could not enable subscription \"%s\": %s",
2280 0 : dbinfo->subname, PQresultErrorMessage(res));
2281 0 : disconnect_database(conn, true);
2282 : }
2283 :
2284 2 : PQclear(res);
2285 : }
2286 :
2287 8 : PQfreemem(subname);
2288 8 : destroyPQExpBuffer(str);
2289 8 : }
2290 :
2291 : /*
2292 : * Fetch a list of all connectable non-template databases from the source server
2293 : * and form a list such that they appear as if the user has specified multiple
2294 : * --database options, one for each source database.
2295 : */
2296 : static void
2297 1 : get_publisher_databases(struct CreateSubscriberOptions *opt,
2298 : bool dbnamespecified)
2299 : {
2300 : PGconn *conn;
2301 : PGresult *res;
2302 :
2303 : /* If a database name was specified, just connect to it. */
2304 1 : if (dbnamespecified)
2305 0 : conn = connect_database(opt->pub_conninfo_str, true);
2306 : else
2307 : {
2308 : /* Otherwise, try postgres first and then template1. */
2309 : char *conninfo;
2310 :
2311 1 : conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "postgres");
2312 1 : conn = connect_database(conninfo, false);
2313 1 : pg_free(conninfo);
2314 1 : if (!conn)
2315 : {
2316 0 : conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "template1");
2317 0 : conn = connect_database(conninfo, true);
2318 0 : pg_free(conninfo);
2319 : }
2320 : }
2321 :
2322 1 : res = PQexec(conn, "SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1");
2323 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
2324 : {
2325 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2326 : "could not obtain a list of databases: %s",
2327 : PQresultErrorMessage(res));
2328 0 : PQclear(res);
2329 0 : disconnect_database(conn, true);
2330 : }
2331 :
2332 4 : for (int i = 0; i < PQntuples(res); i++)
2333 : {
2334 3 : const char *dbname = PQgetvalue(res, i, 0);
2335 :
2336 3 : simple_string_list_append(&opt->database_names, dbname);
2337 :
2338 : /* Increment num_dbs to reflect multiple --database options */
2339 3 : num_dbs++;
2340 : }
2341 :
2342 1 : PQclear(res);
2343 1 : disconnect_database(conn, false);
2344 1 : }
2345 :
2346 : int
2347 23 : main(int argc, char **argv)
2348 : {
2349 : static struct option long_options[] =
2350 : {
2351 : {"all", no_argument, NULL, 'a'},
2352 : {"database", required_argument, NULL, 'd'},
2353 : {"pgdata", required_argument, NULL, 'D'},
2354 : {"dry-run", no_argument, NULL, 'n'},
2355 : {"subscriber-port", required_argument, NULL, 'p'},
2356 : {"publisher-server", required_argument, NULL, 'P'},
2357 : {"socketdir", required_argument, NULL, 's'},
2358 : {"recovery-timeout", required_argument, NULL, 't'},
2359 : {"enable-two-phase", no_argument, NULL, 'T'},
2360 : {"subscriber-username", required_argument, NULL, 'U'},
2361 : {"verbose", no_argument, NULL, 'v'},
2362 : {"version", no_argument, NULL, 'V'},
2363 : {"help", no_argument, NULL, '?'},
2364 : {"config-file", required_argument, NULL, 1},
2365 : {"publication", required_argument, NULL, 2},
2366 : {"replication-slot", required_argument, NULL, 3},
2367 : {"subscription", required_argument, NULL, 4},
2368 : {"clean", required_argument, NULL, 5},
2369 : {NULL, 0, NULL, 0}
2370 : };
2371 :
2372 23 : struct CreateSubscriberOptions opt = {0};
2373 :
2374 : int c;
2375 : int option_index;
2376 :
2377 : char *pub_base_conninfo;
2378 : char *sub_base_conninfo;
2379 23 : char *dbname_conninfo = NULL;
2380 :
2381 : uint64 pub_sysid;
2382 : uint64 sub_sysid;
2383 : struct stat statbuf;
2384 :
2385 : char *consistent_lsn;
2386 :
2387 : char pidfile[MAXPGPATH];
2388 :
2389 23 : pg_logging_init(argv[0]);
2390 23 : pg_logging_set_level(PG_LOG_WARNING);
2391 23 : progname = get_progname(argv[0]);
2392 23 : set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
2393 :
2394 23 : if (argc > 1)
2395 : {
2396 22 : if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2397 : {
2398 1 : usage();
2399 1 : exit(0);
2400 : }
2401 21 : else if (strcmp(argv[1], "-V") == 0
2402 21 : || strcmp(argv[1], "--version") == 0)
2403 : {
2404 1 : puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
2405 1 : exit(0);
2406 : }
2407 : }
2408 :
2409 : /* Default settings */
2410 21 : subscriber_dir = NULL;
2411 21 : opt.config_file = NULL;
2412 21 : opt.pub_conninfo_str = NULL;
2413 21 : opt.socket_dir = NULL;
2414 21 : opt.sub_port = DEFAULT_SUB_PORT;
2415 21 : opt.sub_username = NULL;
2416 21 : opt.two_phase = false;
2417 21 : opt.database_names = (SimpleStringList)
2418 : {
2419 : 0
2420 : };
2421 21 : opt.recovery_timeout = 0;
2422 21 : opt.all_dbs = false;
2423 :
2424 : /*
2425 : * Don't allow it to be run as root. It uses pg_ctl which does not allow
2426 : * it either.
2427 : */
2428 : #ifndef WIN32
2429 21 : if (geteuid() == 0)
2430 : {
2431 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2432 : "cannot be executed by \"root\"");
2433 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
2434 : "You must run %s as the PostgreSQL superuser.",
2435 : progname);
2436 0 : exit(1);
2437 : }
2438 : #endif
2439 :
2440 21 : get_restricted_token();
2441 :
2442 162 : while ((c = getopt_long(argc, argv, "ad:D:np:P:s:t:TU:v",
2443 162 : long_options, &option_index)) != -1)
2444 : {
2445 144 : switch (c)
2446 : {
2447 3 : case 'a':
2448 3 : opt.all_dbs = true;
2449 3 : break;
2450 25 : case 'd':
2451 25 : if (!simple_string_list_member(&opt.database_names, optarg))
2452 : {
2453 24 : simple_string_list_append(&opt.database_names, optarg);
2454 24 : num_dbs++;
2455 : }
2456 : else
2457 1 : report_createsub_fatal("database \"%s\" specified more than once for -d/--database", optarg);
2458 24 : break;
2459 19 : case 'D':
2460 19 : subscriber_dir = pg_strdup(optarg);
2461 19 : canonicalize_path(subscriber_dir);
2462 19 : break;
2463 9 : case 'n':
2464 9 : dry_run = true;
2465 9 : break;
2466 12 : case 'p':
2467 12 : opt.sub_port = pg_strdup(optarg);
2468 12 : break;
2469 18 : case 'P':
2470 18 : opt.pub_conninfo_str = pg_strdup(optarg);
2471 18 : break;
2472 12 : case 's':
2473 12 : opt.socket_dir = pg_strdup(optarg);
2474 12 : canonicalize_path(opt.socket_dir);
2475 12 : break;
2476 3 : case 't':
2477 3 : opt.recovery_timeout = atoi(optarg);
2478 3 : break;
2479 1 : case 'T':
2480 1 : opt.two_phase = true;
2481 1 : break;
2482 0 : case 'U':
2483 0 : opt.sub_username = pg_strdup(optarg);
2484 0 : break;
2485 19 : case 'v':
2486 19 : pg_logging_increase_verbosity();
2487 19 : break;
2488 0 : case 1:
2489 0 : opt.config_file = pg_strdup(optarg);
2490 0 : break;
2491 12 : case 2:
2492 12 : if (!simple_string_list_member(&opt.pub_names, optarg))
2493 : {
2494 11 : simple_string_list_append(&opt.pub_names, optarg);
2495 11 : num_pubs++;
2496 : }
2497 : else
2498 1 : report_createsub_fatal("publication \"%s\" specified more than once for --publication", optarg);
2499 11 : break;
2500 4 : case 3:
2501 4 : if (!simple_string_list_member(&opt.replslot_names, optarg))
2502 : {
2503 4 : simple_string_list_append(&opt.replslot_names, optarg);
2504 4 : num_replslots++;
2505 : }
2506 : else
2507 0 : report_createsub_fatal("replication slot \"%s\" specified more than once for --replication-slot", optarg);
2508 4 : break;
2509 5 : case 4:
2510 5 : if (!simple_string_list_member(&opt.sub_names, optarg))
2511 : {
2512 5 : simple_string_list_append(&opt.sub_names, optarg);
2513 5 : num_subs++;
2514 : }
2515 : else
2516 0 : report_createsub_fatal("subscription \"%s\" specified more than once for --subscription", optarg);
2517 5 : break;
2518 1 : case 5:
2519 1 : if (!simple_string_list_member(&opt.objecttypes_to_clean, optarg))
2520 1 : simple_string_list_append(&opt.objecttypes_to_clean, optarg);
2521 : else
2522 0 : report_createsub_fatal("object type \"%s\" specified more than once for --clean", optarg);
2523 1 : break;
2524 1 : default:
2525 : /* getopt_long already emitted a complaint */
2526 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
2527 : "Try \"%s --help\" for more information.",
2528 : progname);
2529 1 : exit(1);
2530 : }
2531 : }
2532 :
2533 : /* Validate that --all is not used with incompatible options */
2534 18 : if (opt.all_dbs)
2535 : {
2536 3 : char *bad_switch = NULL;
2537 :
2538 3 : if (num_dbs > 0)
2539 1 : bad_switch = "--database";
2540 2 : else if (num_pubs > 0)
2541 1 : bad_switch = "--publication";
2542 1 : else if (num_replslots > 0)
2543 0 : bad_switch = "--replication-slot";
2544 1 : else if (num_subs > 0)
2545 0 : bad_switch = "--subscription";
2546 :
2547 3 : if (bad_switch)
2548 : {
2549 2 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2550 : "options %s and %s cannot be used together",
2551 : bad_switch, "-a/--all");
2552 2 : report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
2553 : "Try \"%s --help\" for more information.",
2554 : progname);
2555 2 : exit(1);
2556 : }
2557 : }
2558 :
2559 : /* Any non-option arguments? */
2560 16 : if (optind < argc)
2561 : {
2562 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2563 : "too many command-line arguments (first is \"%s\")",
2564 0 : argv[optind]);
2565 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
2566 : "Try \"%s --help\" for more information.", progname);
2567 0 : exit(1);
2568 : }
2569 :
2570 : /* Required arguments */
2571 16 : if (subscriber_dir == NULL)
2572 : {
2573 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2574 : "no subscriber data directory specified");
2575 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
2576 : "Try \"%s --help\" for more information.", progname);
2577 1 : exit(1);
2578 : }
2579 :
2580 : /* If socket directory is not provided, use the current directory */
2581 15 : if (opt.socket_dir == NULL)
2582 : {
2583 : char cwd[MAXPGPATH];
2584 :
2585 5 : if (!getcwd(cwd, MAXPGPATH))
2586 0 : report_createsub_fatal("could not determine current directory");
2587 5 : opt.socket_dir = pg_strdup(cwd);
2588 5 : canonicalize_path(opt.socket_dir);
2589 : }
2590 :
2591 : /*
2592 : * Parse connection string. Build a base connection string that might be
2593 : * reused by multiple databases.
2594 : */
2595 15 : if (opt.pub_conninfo_str == NULL)
2596 : {
2597 : /*
2598 : * TODO use primary_conninfo (if available) from subscriber and
2599 : * extract publisher connection string. Assume that there are
2600 : * identical entries for physical and logical replication. If there is
2601 : * not, we would fail anyway.
2602 : */
2603 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2604 : "no publisher connection string specified");
2605 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
2606 : "Try \"%s --help\" for more information.", progname);
2607 1 : exit(1);
2608 : }
2609 :
2610 14 : if (dry_run)
2611 8 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2612 : "Executing in dry-run mode.\n"
2613 : "The target directory will not be modified.");
2614 :
2615 14 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2616 : "validating publisher connection string");
2617 14 : pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
2618 : &dbname_conninfo);
2619 14 : if (pub_base_conninfo == NULL)
2620 0 : exit(1);
2621 :
2622 14 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2623 : "validating subscriber connection string");
2624 14 : sub_base_conninfo = get_sub_conninfo(&opt);
2625 :
2626 : /*
2627 : * Fetch all databases from the source (publisher) and treat them as if
2628 : * the user specified has multiple --database options, one for each source
2629 : * database.
2630 : */
2631 14 : if (opt.all_dbs)
2632 : {
2633 1 : bool dbnamespecified = (dbname_conninfo != NULL);
2634 :
2635 1 : get_publisher_databases(&opt, dbnamespecified);
2636 : }
2637 :
2638 14 : if (opt.database_names.head == NULL)
2639 : {
2640 2 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2641 : "no database was specified");
2642 :
2643 : /*
2644 : * Try to obtain the dbname from the publisher conninfo. If dbname
2645 : * parameter is not available, error out.
2646 : */
2647 2 : if (dbname_conninfo)
2648 : {
2649 1 : simple_string_list_append(&opt.database_names, dbname_conninfo);
2650 1 : num_dbs++;
2651 :
2652 1 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2653 : "database name \"%s\" was extracted from the publisher connection string",
2654 : dbname_conninfo);
2655 : }
2656 : else
2657 : {
2658 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2659 : "no database name specified");
2660 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
2661 : "Try \"%s --help\" for more information.",
2662 : progname);
2663 1 : exit(1);
2664 : }
2665 : }
2666 :
2667 : /* Number of object names must match number of databases */
2668 13 : if (num_pubs > 0 && num_pubs != num_dbs)
2669 : {
2670 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2671 : "wrong number of publication names specified");
2672 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
2673 : "The number of specified publication names (%d) must match the number of specified database names (%d).",
2674 : num_pubs, num_dbs);
2675 1 : exit(1);
2676 : }
2677 12 : if (num_subs > 0 && num_subs != num_dbs)
2678 : {
2679 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2680 : "wrong number of subscription names specified");
2681 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
2682 : "The number of specified subscription names (%d) must match the number of specified database names (%d).",
2683 : num_subs, num_dbs);
2684 1 : exit(1);
2685 : }
2686 11 : if (num_replslots > 0 && num_replslots != num_dbs)
2687 : {
2688 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2689 : "wrong number of replication slot names specified");
2690 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
2691 : "The number of specified replication slot names (%d) must match the number of specified database names (%d).",
2692 : num_replslots, num_dbs);
2693 1 : exit(1);
2694 : }
2695 :
2696 : /* Verify the object types specified for removal from the subscriber */
2697 11 : for (SimpleStringListCell *cell = opt.objecttypes_to_clean.head; cell; cell = cell->next)
2698 : {
2699 1 : if (pg_strcasecmp(cell->val, "publications") == 0)
2700 1 : dbinfos.objecttypes_to_clean |= OBJECTTYPE_PUBLICATIONS;
2701 : else
2702 : {
2703 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2704 : "invalid object type \"%s\" specified for %s",
2705 0 : cell->val, "--clean");
2706 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
2707 : "The valid value is: \"%s\"", "publications");
2708 0 : exit(1);
2709 : }
2710 : }
2711 :
2712 : /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
2713 10 : pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
2714 10 : pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
2715 :
2716 : /* Rudimentary check for a data directory */
2717 10 : check_data_directory(subscriber_dir);
2718 :
2719 10 : dbinfos.two_phase = opt.two_phase;
2720 :
2721 : /*
2722 : * Store database information for publisher and subscriber. It should be
2723 : * called before atexit() because its return is used in the
2724 : * cleanup_objects_atexit().
2725 : */
2726 10 : dbinfos.dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
2727 :
2728 : /* Register a function to clean up objects in case of failure */
2729 10 : atexit(cleanup_objects_atexit);
2730 :
2731 : /*
2732 : * Check if the subscriber data directory has the same system identifier
2733 : * than the publisher data directory.
2734 : */
2735 10 : pub_sysid = get_primary_sysid(dbinfos.dbinfo[0].pubconninfo);
2736 10 : sub_sysid = get_standby_sysid(subscriber_dir);
2737 10 : if (pub_sysid != sub_sysid)
2738 1 : report_createsub_fatal("subscriber data directory is not a copy of the source database cluster");
2739 :
2740 : /* Subscriber PID file */
2741 9 : snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
2742 :
2743 : /*
2744 : * The standby server must not be running. If the server is started under
2745 : * service manager and pg_createsubscriber stops it, the service manager
2746 : * might react to this action and start the server again. Therefore,
2747 : * refuse to proceed if the server is running to avoid possible failures.
2748 : */
2749 9 : if (stat(pidfile, &statbuf) == 0)
2750 : {
2751 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2752 : "standby server is running");
2753 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
2754 : "Stop the standby server and try again.");
2755 1 : exit(1);
2756 : }
2757 :
2758 : /*
2759 : * Start a short-lived standby server with temporary parameters (provided
2760 : * by command-line options). The goal is to avoid connections during the
2761 : * transformation steps.
2762 : */
2763 8 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2764 : "starting the standby server with command-line options");
2765 8 : start_standby_server(&opt, true, false);
2766 :
2767 : /* Check if the standby server is ready for logical replication */
2768 8 : check_subscriber(dbinfos.dbinfo);
2769 :
2770 : /* Check if the primary server is ready for logical replication */
2771 6 : check_publisher(dbinfos.dbinfo);
2772 :
2773 : /*
2774 : * Stop the target server. The recovery process requires that the server
2775 : * reaches a consistent state before targeting the recovery stop point.
2776 : * Make sure a consistent state is reached (stop the target server
2777 : * guarantees it) *before* creating the replication slots in
2778 : * setup_publisher().
2779 : */
2780 4 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2781 : "stopping the subscriber");
2782 4 : stop_standby_server(subscriber_dir);
2783 :
2784 : /* Create the required objects for each database on publisher */
2785 4 : consistent_lsn = setup_publisher(dbinfos.dbinfo);
2786 :
2787 : /* Write the required recovery parameters */
2788 4 : setup_recovery(dbinfos.dbinfo, subscriber_dir, consistent_lsn);
2789 :
2790 : /*
2791 : * Start subscriber so the recovery parameters will take effect. Wait
2792 : * until accepting connections. We don't want to start logical replication
2793 : * during setup.
2794 : */
2795 4 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2796 : "starting the subscriber");
2797 4 : start_standby_server(&opt, true, true);
2798 :
2799 : /* Waiting the subscriber to be promoted */
2800 4 : wait_for_end_recovery(dbinfos.dbinfo[0].subconninfo, &opt);
2801 :
2802 : /*
2803 : * Create the subscription for each database on subscriber. It does not
2804 : * enable it immediately because it needs to adjust the replication start
2805 : * point to the LSN reported by setup_publisher(). It also cleans up
2806 : * publications created by this tool and replication to the standby.
2807 : */
2808 4 : setup_subscriber(dbinfos.dbinfo, consistent_lsn);
2809 :
2810 : /* Remove primary_slot_name if it exists on primary */
2811 4 : drop_primary_replication_slot(dbinfos.dbinfo, primary_slot_name);
2812 :
2813 : /* Remove failover replication slots if they exist on subscriber */
2814 4 : drop_failover_replication_slots(dbinfos.dbinfo);
2815 :
2816 : /* Stop the subscriber */
2817 4 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2818 : "stopping the subscriber");
2819 4 : stop_standby_server(subscriber_dir);
2820 :
2821 : /* Change system identifier from subscriber */
2822 4 : modify_subscriber_sysid(&opt);
2823 :
2824 4 : success = true;
2825 :
2826 4 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2827 : "Done!");
2828 :
2829 4 : return 0;
2830 : }
|