Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_createsubscriber.c
4 : * Create a new logical replica from a standby server
5 : *
6 : * Copyright (c) 2024-2026, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/bin/pg_basebackup/pg_createsubscriber.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 :
14 : #include "postgres_fe.h"
15 :
16 : #include <sys/stat.h>
17 : #include <sys/time.h>
18 : #include <sys/wait.h>
19 : #include <time.h>
20 :
21 : #include "common/connect.h"
22 : #include "common/controldata_utils.h"
23 : #include "common/file_perm.h"
24 : #include "common/file_utils.h"
25 : #include "common/logging.h"
26 : #include "common/pg_prng.h"
27 : #include "common/restricted_token.h"
28 : #include "datatype/timestamp.h"
29 : #include "fe_utils/recovery_gen.h"
30 : #include "fe_utils/simple_list.h"
31 : #include "fe_utils/string_utils.h"
32 : #include "fe_utils/version.h"
33 : #include "getopt_long.h"
34 :
/* Default subscriber port; usage() prints it as the --subscriber-port default. */
#define DEFAULT_SUB_PORT "50432"
/*
 * Object-type flag bit.
 * NOTE(review): presumably stored in LogicalRepInfos.objecttypes_to_clean for
 * --clean=publications -- confirm against the option parsing code.
 */
#define OBJECTTYPE_PUBLICATIONS 0x0001

/*
 * Configuration files for recovery parameters.
 *
 * The recovery parameters are set in INCLUDED_CONF_FILE, itself loaded by
 * the server through an include_if_exists in postgresql.auto.conf.
 *
 * INCLUDED_CONF_FILE is renamed to INCLUDED_CONF_FILE_DISABLED when exiting,
 * so that the recovery parameters set by this tool never take effect on node
 * restart.  The contents of INCLUDED_CONF_FILE_DISABLED can be useful for
 * debugging.
 */
#define PG_AUTOCONF_FILENAME "postgresql.auto.conf"
#define INCLUDED_CONF_FILE "pg_createsubscriber.conf"
#define INCLUDED_CONF_FILE_DISABLED INCLUDED_CONF_FILE ".disabled"

/*
 * Names of the log files.  SERVER_LOG_FILE_NAME is created under "logdir" and
 * receives the output of pg_resetwal (see modify_subscriber_sysid()).
 * NOTE(review): INTERNAL_LOG_FILE_NAME is presumably the file behind
 * internal_log_file_fp -- confirm where it is opened.
 */
#define SERVER_LOG_FILE_NAME "pg_createsubscriber_server.log"
#define INTERNAL_LOG_FILE_NAME "pg_createsubscriber_internal.log"
55 :
/*
 * Command-line options.
 *
 * The publication, subscription and replication slot name lists are consumed
 * by store_pub_sub_info(), which assigns their entries to the entries of
 * database_names position by position.
 */
struct CreateSubscriberOptions
{
	char	   *config_file;	/* configuration file */
	char	   *log_dir;		/* log directory name */
	char	   *pub_conninfo_str;	/* publisher connection string */
	char	   *socket_dir;		/* directory for Unix-domain socket, if any */
	char	   *sub_port;		/* subscriber port number */
	const char *sub_username;	/* subscriber username */
	bool		two_phase;		/* enable-two-phase option */
	SimpleStringList database_names;	/* list of database names */
	SimpleStringList pub_names; /* list of publication names */
	SimpleStringList sub_names; /* list of subscription names */
	SimpleStringList replslot_names;	/* list of replication slot names */
	int			recovery_timeout;	/* stop recovery after this time */
	bool		all_dbs;		/* all option */
	SimpleStringList objecttypes_to_clean;	/* list of object types to cleanup */
};
74 :
/*
 * Per-database publication/subscription info.
 *
 * pubname, subname and replslotname are left NULL by store_pub_sub_info()
 * when the user did not specify them.  The made_* flags are consulted by
 * cleanup_objects_atexit() to decide which objects to drop on the primary.
 */
struct LogicalRepInfo
{
	char	   *dbname;			/* database name */
	char	   *pubconninfo;	/* publisher connection string */
	char	   *subconninfo;	/* subscriber connection string */
	char	   *pubname;		/* publication name */
	char	   *subname;		/* subscription name */
	char	   *replslotname;	/* replication slot name */

	bool		made_replslot;	/* replication slot was created */
	bool		made_publication;	/* publication was created */
};
88 :
/*
 * Information shared across all the databases (or publications and
 * subscriptions).
 *
 * dbinfo is treated as an array of num_dbs entries: store_pub_sub_info()
 * allocates an array of that size, and cleanup_objects_atexit() indexes
 * dbinfo[0 .. num_dbs - 1].
 */
struct LogicalRepInfos
{
	struct LogicalRepInfo *dbinfo;
	bool		two_phase;		/* enable-two-phase option */
	uint32		objecttypes_to_clean;	/* flags indicating which object types
										 * to clean up on subscriber */
};
100 :
/*
 * Forward declarations of the file-local functions.
 *
 * The report_createsub_* functions are printf-style; pg_attribute_printf()
 * gives the positions of the format string and of the first variadic
 * argument (0 for the va_list variants).
 */
static void cleanup_objects_atexit(void);
static void usage(void);
static char *get_base_conninfo(const char *conninfo, char **dbname);
static char *get_sub_conninfo(const struct CreateSubscriberOptions *opt);
static char *get_exec_path(const char *argv0, const char *progname);
static void check_data_directory(const char *datadir);
static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
static struct LogicalRepInfo *store_pub_sub_info(const struct CreateSubscriberOptions *opt,
												 const char *pub_base_conninfo,
												 const char *sub_base_conninfo);
static PGconn *connect_database(const char *conninfo, bool exit_on_error);
static void disconnect_database(PGconn *conn, bool exit_on_error);
static uint64 get_primary_sysid(const char *conninfo);
static uint64 get_standby_sysid(const char *datadir);
static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
static bool server_is_in_recovery(PGconn *conn);
static char *generate_object_name(PGconn *conn);
static void check_publisher(const struct LogicalRepInfo *dbinfo);
static char *setup_publisher(struct LogicalRepInfo *dbinfo);
static void check_subscriber(const struct LogicalRepInfo *dbinfo);
static void setup_subscriber(struct LogicalRepInfo *dbinfo,
							 const char *consistent_lsn);
static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
						   const char *lsn);
static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
										  const char *slotname);
static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo);
static char *create_logical_replication_slot(PGconn *conn,
											 struct LogicalRepInfo *dbinfo);
static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
								  const char *slot_name);
static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
static void start_standby_server(const struct CreateSubscriberOptions *opt,
								 bool restricted_access,
								 bool restrict_logical_worker);
static void stop_standby_server(const char *datadir);
static void wait_for_end_recovery(const char *conninfo,
								  const struct CreateSubscriberOptions *opt);
static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
static bool find_publication(PGconn *conn, const char *pubname, const char *dbname);
static void drop_publication(PGconn *conn, const char *pubname,
							 const char *dbname, bool *made_publication);
static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
									 const char *lsn);
static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
static void check_and_drop_existing_subscriptions(PGconn *conn,
												  const struct LogicalRepInfo *dbinfo);
static void drop_existing_subscription(PGconn *conn, const char *subname,
									   const char *dbname);
static void get_publisher_databases(struct CreateSubscriberOptions *opt,
									bool dbnamespecified);
static void report_createsub_log(enum pg_log_level, enum pg_log_part,
								 const char *pg_restrict fmt,...)
			pg_attribute_printf(3, 4);
static void report_createsub_log_v(enum pg_log_level level, enum pg_log_part part,
								   const char *pg_restrict fmt, va_list args)
			pg_attribute_printf(3, 0);
pg_noreturn static void report_createsub_fatal(const char *pg_restrict fmt,...)
			pg_attribute_printf(1, 2);
static void internal_log_file_write(enum pg_log_level level,
									enum pg_log_part part,
									const char *pg_restrict fmt, va_list args)
			pg_attribute_printf(3, 0);
166 :
#define WAIT_INTERVAL 1 /* 1 second */

/* Program name, used in usage() output and as fallback_application_name */
static const char *progname;

/* NOTE(review): presumably the standby's primary_slot_name setting -- confirm */
static char *primary_slot_name = NULL;
/* When true, actions are only reported ("dry-run: would ...") and not done */
static bool dry_run = false;

/* When true, cleanup_objects_atexit() does not drop objects on the primary */
static bool success = false;

static struct LogicalRepInfos dbinfos;
static int	num_dbs = 0;		/* number of specified databases */
static int	num_pubs = 0;		/* number of specified publications */
static int	num_subs = 0;		/* number of specified subscriptions */
static int	num_replslots = 0;	/* number of specified replication slots */

/* Random state used by generate_object_name() */
static pg_prng_state prng_state;

/* NOTE(review): presumably filled in through get_exec_path() -- confirm */
static char *pg_ctl_path = NULL;
static char *pg_resetwal_path = NULL;

static FILE *internal_log_file_fp = NULL;	/* File ptr to log all messages to */
static char logdir[MAXPGPATH];	/* Subdirectory of the user specified logdir
								 * where the log files are written (if
								 * specified) */

/* standby / subscriber data directory */
static char *subscriber_dir = NULL;

/*
 * State flags read by cleanup_objects_atexit(): recovery_ended triggers the
 * "failed after the end of recovery" warning, standby_running makes it stop
 * the standby server, and recovery_params_set makes it rename the included
 * configuration file.
 */
static bool recovery_ended = false;
static bool standby_running = false;
static bool recovery_params_set = false;
198 :
199 : /*
200 : * Report a message with a given log level.
201 : *
202 : * Writes to stderr, and also to the log file, if --logdir option was
203 : * specified.
204 : */
205 : static void
206 485 : report_createsub_log_v(enum pg_log_level level, enum pg_log_part part,
207 : const char *pg_restrict fmt, va_list args)
208 : {
209 485 : int save_errno = errno;
210 :
211 485 : if (internal_log_file_fp != NULL)
212 : {
213 : /* Output to both stderr and the log file */
214 : va_list arg_cpy;
215 :
216 77 : va_copy(arg_cpy, args);
217 77 : internal_log_file_write(level, part, fmt, arg_cpy);
218 77 : va_end(arg_cpy);
219 : /* Restore errno in case internal_log_file_write changed it */
220 77 : errno = save_errno;
221 : }
222 485 : pg_log_generic_v(level, part, fmt, args);
223 485 : }
224 :
225 : static void
226 482 : report_createsub_log(enum pg_log_level level, enum pg_log_part part,
227 : const char *pg_restrict fmt,...)
228 : {
229 : va_list args;
230 :
231 482 : va_start(args, fmt);
232 :
233 482 : report_createsub_log_v(level, part, fmt, args);
234 :
235 482 : va_end(args);
236 482 : }
237 :
238 : /*
239 : * Report a fatal error and exit
240 : */
241 : static void
242 3 : report_createsub_fatal(const char *pg_restrict fmt,...)
243 : {
244 : va_list args;
245 :
246 3 : va_start(args, fmt);
247 :
248 3 : report_createsub_log_v(PG_LOG_ERROR, PG_LOG_PRIMARY, fmt, args);
249 :
250 3 : va_end(args);
251 :
252 3 : exit(1);
253 : }
254 :
255 : /*
256 : * Clean up objects created by pg_createsubscriber.
257 : *
258 : * Publications and replication slots are created on the primary. Depending
259 : * on the step where it failed, already-created objects should be removed if
260 : * possible (sometimes this won't work due to a connection issue).
261 : * There is no cleanup on the target server *after* its promotion, because any
262 : * failure at this point means recreating the physical replica and starting
263 : * again.
264 : *
265 : * The recovery configuration is always removed, by renaming the included
266 : * configuration file out of the way.
267 : */
268 : static void
269 10 : cleanup_objects_atexit(void)
270 : {
271 : /* Rename the included configuration file, if necessary. */
272 10 : if (recovery_params_set)
273 : {
274 : char conf_filename[MAXPGPATH];
275 : char conf_filename_disabled[MAXPGPATH];
276 :
277 1 : snprintf(conf_filename, MAXPGPATH, "%s/%s", subscriber_dir,
278 : INCLUDED_CONF_FILE);
279 1 : snprintf(conf_filename_disabled, MAXPGPATH, "%s/%s", subscriber_dir,
280 : INCLUDED_CONF_FILE_DISABLED);
281 :
282 1 : if (durable_rename(conf_filename, conf_filename_disabled) != 0)
283 : {
284 : /* durable_rename() has already logged something. */
285 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
286 : "A manual removal of the recovery parameters may be required.");
287 : }
288 : }
289 :
290 10 : if (success)
291 4 : return;
292 :
293 : /*
294 : * If the server is promoted, there is no way to use the current setup
295 : * again. Warn the user that a new replication setup should be done before
296 : * trying again.
297 : */
298 6 : if (recovery_ended)
299 : {
300 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
301 : "failed after the end of recovery");
302 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
303 : "The target server cannot be used as a physical replica anymore. "
304 : "You must recreate the physical replica before continuing.");
305 : }
306 :
307 18 : for (int i = 0; i < num_dbs; i++)
308 : {
309 12 : struct LogicalRepInfo *dbinfo = &dbinfos.dbinfo[i];
310 :
311 12 : if (dbinfo->made_publication || dbinfo->made_replslot)
312 : {
313 : PGconn *conn;
314 :
315 0 : conn = connect_database(dbinfo->pubconninfo, false);
316 0 : if (conn != NULL)
317 : {
318 0 : if (dbinfo->made_publication)
319 0 : drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
320 : &dbinfo->made_publication);
321 0 : if (dbinfo->made_replslot)
322 0 : drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
323 0 : disconnect_database(conn, false);
324 : }
325 : else
326 : {
327 : /*
328 : * If a connection could not be established, inform the user
329 : * that some objects were left on primary and should be
330 : * removed before trying again.
331 : */
332 0 : if (dbinfo->made_publication)
333 : {
334 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
335 : "publication \"%s\" created in database \"%s\" on primary was left behind",
336 : dbinfo->pubname,
337 : dbinfo->dbname);
338 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
339 : "Drop this publication before trying again.");
340 : }
341 0 : if (dbinfo->made_replslot)
342 : {
343 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
344 : "replication slot \"%s\" created in database \"%s\" on primary was left behind",
345 : dbinfo->replslotname,
346 : dbinfo->dbname);
347 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
348 : "Drop this replication slot soon to avoid retention of WAL files.");
349 : }
350 : }
351 : }
352 : }
353 :
354 6 : if (standby_running)
355 4 : stop_standby_server(subscriber_dir);
356 : }
357 :
/*
 * Print the command-line help text to standard output.
 *
 * Every message is passed through _() before printing, and the program name
 * is taken from the global "progname".
 */
static void
usage(void)
{
	printf(_("%s creates a new logical replica from a standby server.\n\n"),
		   progname);
	printf(_("Usage:\n"));
	printf(_(" %s [OPTION]...\n"), progname);
	printf(_("\nOptions:\n"));
	printf(_(" -a, --all create subscriptions for all databases except template\n"
			 " databases and databases that don't allow connections\n"));
	printf(_(" -d, --database=DBNAME database in which to create a subscription\n"));
	printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
	printf(_(" -l, --logdir=LOGDIR location for the log directory\n"));
	printf(_(" -n, --dry-run dry run, just show what would be done\n"));
	/* The default port is substituted so the text follows DEFAULT_SUB_PORT */
	printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
	printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
	printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
	printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
	printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));
	printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
	printf(_(" -v, --verbose output verbose messages\n"));
	/* The accepted object type is passed as an argument, not embedded */
	printf(_(" --clean=OBJECTTYPE drop all objects of the specified type from specified\n"
			 " databases on the subscriber; accepts: \"%s\"\n"), "publications");
	printf(_(" --config-file=FILENAME use specified main server configuration\n"
			 " file when running target cluster\n"));
	printf(_(" --publication=NAME publication name\n"));
	printf(_(" --replication-slot=NAME replication slot name\n"));
	printf(_(" --subscription=NAME subscription name\n"));
	printf(_(" -V, --version output version information, then exit\n"));
	printf(_(" -?, --help show this help, then exit\n"));
	printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
	printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
}
391 :
392 : /*
393 : * Subroutine to append "keyword=value" to a connection string,
394 : * with proper quoting of the value. (We assume keywords don't need that.)
395 : */
396 : static void
397 107 : appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
398 : {
399 107 : if (buf->len > 0)
400 79 : appendPQExpBufferChar(buf, ' ');
401 107 : appendPQExpBufferStr(buf, keyword);
402 107 : appendPQExpBufferChar(buf, '=');
403 107 : appendConnStrVal(buf, val);
404 107 : }
405 :
406 : /*
407 : * Validate a connection string. Returns a base connection string that is a
408 : * connection string without a database name.
409 : *
410 : * Since we might process multiple databases, each database name will be
411 : * appended to this base connection string to provide a final connection
412 : * string. If the second argument (dbname) is not null, returns dbname if the
413 : * provided connection string contains it.
414 : *
415 : * It is the caller's responsibility to free the returned connection string and
416 : * dbname.
417 : */
418 : static char *
419 14 : get_base_conninfo(const char *conninfo, char **dbname)
420 : {
421 : PQExpBuffer buf;
422 : PQconninfoOption *conn_opts;
423 : PQconninfoOption *conn_opt;
424 14 : char *errmsg = NULL;
425 : char *ret;
426 :
427 14 : conn_opts = PQconninfoParse(conninfo, &errmsg);
428 14 : if (conn_opts == NULL)
429 : {
430 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
431 : "could not parse connection string: %s", errmsg);
432 0 : PQfreemem(errmsg);
433 0 : return NULL;
434 : }
435 :
436 14 : buf = createPQExpBuffer();
437 742 : for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
438 : {
439 728 : if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
440 : {
441 33 : if (strcmp(conn_opt->keyword, "dbname") == 0)
442 : {
443 9 : if (dbname)
444 9 : *dbname = pg_strdup(conn_opt->val);
445 9 : continue;
446 : }
447 24 : appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
448 : }
449 : }
450 :
451 14 : ret = pg_strdup(buf->data);
452 :
453 14 : destroyPQExpBuffer(buf);
454 14 : PQconninfoFree(conn_opts);
455 :
456 14 : return ret;
457 : }
458 :
459 : /*
460 : * Build a subscriber connection string. Only a few parameters are supported
461 : * since it starts a server with restricted access.
462 : */
463 : static char *
464 14 : get_sub_conninfo(const struct CreateSubscriberOptions *opt)
465 : {
466 14 : PQExpBuffer buf = createPQExpBuffer();
467 : char *ret;
468 :
469 14 : appendConnStrItem(buf, "port", opt->sub_port);
470 : #if !defined(WIN32)
471 14 : appendConnStrItem(buf, "host", opt->socket_dir);
472 : #endif
473 14 : if (opt->sub_username != NULL)
474 0 : appendConnStrItem(buf, "user", opt->sub_username);
475 14 : appendConnStrItem(buf, "fallback_application_name", progname);
476 :
477 14 : ret = pg_strdup(buf->data);
478 :
479 14 : destroyPQExpBuffer(buf);
480 :
481 14 : return ret;
482 : }
483 :
484 : /*
485 : * Verify if a PostgreSQL binary (progname) is available in the same directory as
486 : * pg_createsubscriber and it has the same version. It returns the absolute
487 : * path of the progname.
488 : */
489 : static char *
490 20 : get_exec_path(const char *argv0, const char *progname)
491 : {
492 : char *versionstr;
493 : char *exec_path;
494 : int ret;
495 :
496 20 : versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
497 20 : exec_path = pg_malloc(MAXPGPATH);
498 20 : ret = find_other_exec(argv0, progname, versionstr, exec_path);
499 :
500 20 : if (ret < 0)
501 : {
502 : char full_path[MAXPGPATH];
503 :
504 0 : if (find_my_exec(argv0, full_path) < 0)
505 0 : strlcpy(full_path, progname, sizeof(full_path));
506 :
507 0 : if (ret == -1)
508 0 : report_createsub_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
509 : progname, "pg_createsubscriber", full_path);
510 : else
511 0 : report_createsub_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
512 : progname, full_path, "pg_createsubscriber");
513 : }
514 :
515 20 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
516 : "%s path is: %s", progname, exec_path);
517 :
518 20 : return exec_path;
519 : }
520 :
521 : /*
522 : * Is it a cluster directory? These are preliminary checks. It is far from
523 : * making an accurate check. If it is not a clone from the publisher, it will
524 : * eventually fail in a future step.
525 : */
526 : static void
527 10 : check_data_directory(const char *datadir)
528 : {
529 : struct stat statbuf;
530 : uint32 major_version;
531 : char *version_str;
532 :
533 10 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
534 : "checking if directory \"%s\" is a cluster data directory",
535 : datadir);
536 :
537 10 : if (stat(datadir, &statbuf) != 0)
538 : {
539 0 : if (errno == ENOENT)
540 0 : report_createsub_fatal("data directory \"%s\" does not exist", datadir);
541 : else
542 0 : report_createsub_fatal("could not access directory \"%s\": %m", datadir);
543 : }
544 :
545 : /*
546 : * Retrieve the contents of this cluster's PG_VERSION. We require
547 : * compatibility with the same major version as the one this tool is
548 : * compiled with.
549 : */
550 10 : major_version = GET_PG_MAJORVERSION_NUM(get_pg_version(datadir, &version_str));
551 10 : if (major_version != PG_MAJORVERSION_NUM)
552 : {
553 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
554 : "data directory is of wrong version");
555 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
556 : "File \"%s\" contains \"%s\", which is not compatible with this program's version \"%s\".",
557 : "PG_VERSION", version_str, PG_MAJORVERSION);
558 0 : exit(1);
559 : }
560 10 : }
561 :
562 : /*
563 : * Append database name into a base connection string.
564 : *
565 : * dbname is the only parameter that changes so it is not included in the base
566 : * connection string. This function concatenates dbname to build a "real"
567 : * connection string.
568 : */
569 : static char *
570 41 : concat_conninfo_dbname(const char *conninfo, const char *dbname)
571 : {
572 41 : PQExpBuffer buf = createPQExpBuffer();
573 : char *ret;
574 :
575 : Assert(conninfo != NULL);
576 :
577 41 : appendPQExpBufferStr(buf, conninfo);
578 41 : appendConnStrItem(buf, "dbname", dbname);
579 :
580 41 : ret = pg_strdup(buf->data);
581 41 : destroyPQExpBuffer(buf);
582 :
583 41 : return ret;
584 : }
585 :
586 : /*
587 : * Store publication and subscription information.
588 : *
589 : * If publication, replication slot and subscription names were specified,
590 : * store it here. Otherwise, a generated name will be assigned to the object in
591 : * setup_publisher().
592 : */
593 : static struct LogicalRepInfo *
594 10 : store_pub_sub_info(const struct CreateSubscriberOptions *opt,
595 : const char *pub_base_conninfo,
596 : const char *sub_base_conninfo)
597 : {
598 : struct LogicalRepInfo *dbinfo;
599 10 : SimpleStringListCell *pubcell = NULL;
600 10 : SimpleStringListCell *subcell = NULL;
601 10 : SimpleStringListCell *replslotcell = NULL;
602 10 : int i = 0;
603 :
604 10 : dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
605 :
606 10 : if (num_pubs > 0)
607 2 : pubcell = opt->pub_names.head;
608 10 : if (num_subs > 0)
609 1 : subcell = opt->sub_names.head;
610 10 : if (num_replslots > 0)
611 2 : replslotcell = opt->replslot_names.head;
612 :
613 30 : for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
614 : {
615 : char *conninfo;
616 :
617 : /* Fill publisher attributes */
618 20 : conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
619 20 : dbinfo[i].pubconninfo = conninfo;
620 20 : dbinfo[i].dbname = cell->val;
621 20 : if (num_pubs > 0)
622 4 : dbinfo[i].pubname = pubcell->val;
623 : else
624 16 : dbinfo[i].pubname = NULL;
625 20 : if (num_replslots > 0)
626 3 : dbinfo[i].replslotname = replslotcell->val;
627 : else
628 17 : dbinfo[i].replslotname = NULL;
629 20 : dbinfo[i].made_replslot = false;
630 20 : dbinfo[i].made_publication = false;
631 : /* Fill subscriber attributes */
632 20 : conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
633 20 : dbinfo[i].subconninfo = conninfo;
634 20 : if (num_subs > 0)
635 2 : dbinfo[i].subname = subcell->val;
636 : else
637 18 : dbinfo[i].subname = NULL;
638 : /* Other fields will be filled later */
639 :
640 37 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
641 : "publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
642 20 : dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
643 3 : dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
644 20 : dbinfo[i].pubconninfo);
645 40 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
646 : "subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
647 2 : dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
648 20 : dbinfo[i].subconninfo,
649 20 : dbinfos.two_phase ? "true" : "false");
650 :
651 20 : if (num_pubs > 0)
652 4 : pubcell = pubcell->next;
653 20 : if (num_subs > 0)
654 2 : subcell = subcell->next;
655 20 : if (num_replslots > 0)
656 3 : replslotcell = replslotcell->next;
657 :
658 20 : i++;
659 : }
660 :
661 10 : return dbinfo;
662 : }
663 :
664 : /*
665 : * Open a new connection. If exit_on_error is true, it has an undesired
666 : * condition and it should exit immediately.
667 : */
668 : static PGconn *
669 57 : connect_database(const char *conninfo, bool exit_on_error)
670 : {
671 : PGconn *conn;
672 : PGresult *res;
673 :
674 57 : conn = PQconnectdb(conninfo);
675 57 : if (PQstatus(conn) != CONNECTION_OK)
676 : {
677 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
678 : "connection to database failed: %s",
679 : PQerrorMessage(conn));
680 0 : PQfinish(conn);
681 :
682 0 : if (exit_on_error)
683 0 : exit(1);
684 0 : return NULL;
685 : }
686 :
687 : /* Secure search_path */
688 57 : res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
689 57 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
690 : {
691 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
692 : "could not clear \"search_path\": %s",
693 : PQresultErrorMessage(res));
694 0 : PQclear(res);
695 0 : PQfinish(conn);
696 :
697 0 : if (exit_on_error)
698 0 : exit(1);
699 0 : return NULL;
700 : }
701 57 : PQclear(res);
702 :
703 57 : return conn;
704 : }
705 :
706 : /*
707 : * Close the connection. If exit_on_error is true, it has an undesired
708 : * condition and it should exit immediately.
709 : */
710 : static void
711 57 : disconnect_database(PGconn *conn, bool exit_on_error)
712 : {
713 : Assert(conn != NULL);
714 :
715 57 : PQfinish(conn);
716 :
717 57 : if (exit_on_error)
718 2 : exit(1);
719 55 : }
720 :
721 : /*
722 : * Obtain the system identifier using the provided connection. It will be used
723 : * to compare if a data directory is a clone of another one.
724 : */
725 : static uint64
726 10 : get_primary_sysid(const char *conninfo)
727 : {
728 : PGconn *conn;
729 : PGresult *res;
730 : uint64 sysid;
731 :
732 10 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
733 : "getting system identifier from publisher");
734 :
735 10 : conn = connect_database(conninfo, true);
736 :
737 10 : res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
738 10 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
739 : {
740 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
741 : "could not get system identifier: %s",
742 : PQresultErrorMessage(res));
743 0 : disconnect_database(conn, true);
744 : }
745 10 : if (PQntuples(res) != 1)
746 : {
747 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
748 : "could not get system identifier: got %d rows, expected %d row",
749 : PQntuples(res), 1);
750 0 : disconnect_database(conn, true);
751 : }
752 :
753 10 : sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
754 :
755 10 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
756 : "system identifier is %" PRIu64 " on publisher", sysid);
757 :
758 10 : PQclear(res);
759 10 : disconnect_database(conn, false);
760 :
761 10 : return sysid;
762 : }
763 :
764 : /*
765 : * Obtain the system identifier from control file. It will be used to compare
766 : * if a data directory is a clone of another one. This routine is used locally
767 : * and avoids a connection.
768 : */
769 : static uint64
770 10 : get_standby_sysid(const char *datadir)
771 : {
772 : ControlFileData *cf;
773 : bool crc_ok;
774 : uint64 sysid;
775 :
776 10 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
777 : "getting system identifier from subscriber");
778 :
779 10 : cf = get_controlfile(datadir, &crc_ok);
780 10 : if (!crc_ok)
781 0 : report_createsub_fatal("control file appears to be corrupt");
782 :
783 10 : sysid = cf->system_identifier;
784 :
785 10 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
786 : "system identifier is %" PRIu64 " on subscriber", sysid);
787 :
788 10 : pg_free(cf);
789 :
790 10 : return sysid;
791 : }
792 :
793 : /*
794 : * Modify the system identifier. Since a standby server preserves the system
795 : * identifier, it makes sense to change it to avoid situations in which WAL
796 : * files from one of the systems might be used in the other one.
797 : */
798 : static void
799 4 : modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
800 : {
801 : ControlFileData *cf;
802 : bool crc_ok;
803 : struct timeval tv;
804 :
805 : char *out_file;
806 : char *cmd_str;
807 :
808 4 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
809 : "modifying system identifier of subscriber");
810 :
811 4 : cf = get_controlfile(subscriber_dir, &crc_ok);
812 4 : if (!crc_ok)
813 0 : report_createsub_fatal("control file appears to be corrupt");
814 :
815 : /*
816 : * Select a new system identifier.
817 : *
818 : * XXX this code was extracted from BootStrapXLOG().
819 : */
820 4 : gettimeofday(&tv, NULL);
821 4 : cf->system_identifier = ((uint64) tv.tv_sec) << 32;
822 4 : cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
823 4 : cf->system_identifier |= getpid() & 0xFFF;
824 :
825 4 : if (dry_run)
826 3 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
827 : "dry-run: would set system identifier to %" PRIu64 " on subscriber",
828 : cf->system_identifier);
829 : else
830 : {
831 1 : update_controlfile(subscriber_dir, cf, true);
832 1 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
833 : "system identifier is %" PRIu64 " on subscriber",
834 : cf->system_identifier);
835 : }
836 :
837 4 : if (dry_run)
838 3 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
839 : "dry-run: would run pg_resetwal on the subscriber");
840 : else
841 1 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
842 : "running pg_resetwal on the subscriber");
843 :
844 : /*
845 : * Redirecting the output to the logfile if specified. Since the output
846 : * would be very short, around one line, we do not provide a separate file
847 : * for it; it's done as a part of the server log.
848 : */
849 4 : if (opt->log_dir)
850 1 : out_file = psprintf("%s/%s", logdir, SERVER_LOG_FILE_NAME);
851 : else
852 3 : out_file = DEVNULL;
853 :
854 4 : cmd_str = psprintf("\"%s\" -D \"%s\" >> \"%s\"", pg_resetwal_path,
855 : subscriber_dir, out_file);
856 4 : if (opt->log_dir)
857 1 : pg_free(out_file);
858 :
859 4 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
860 : "pg_resetwal command is: %s", cmd_str);
861 :
862 4 : if (!dry_run)
863 : {
864 1 : int rc = system(cmd_str);
865 :
866 1 : if (rc == 0)
867 1 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
868 : "successfully reset WAL on the subscriber");
869 : else
870 0 : report_createsub_fatal("could not reset WAL on subscriber: %s", wait_result_to_str(rc));
871 : }
872 :
873 4 : pg_free(cf);
874 4 : pg_free(cmd_str);
875 4 : }
876 :
877 : /*
878 : * Generate an object name using a prefix, database oid and a random integer.
879 : * It is used in case the user does not specify an object name (publication,
880 : * subscription, replication slot).
881 : */
882 : static char *
883 8 : generate_object_name(PGconn *conn)
884 : {
885 : PGresult *res;
886 : Oid oid;
887 : uint32 rand;
888 : char *objname;
889 :
890 8 : res = PQexec(conn,
891 : "SELECT oid FROM pg_catalog.pg_database "
892 : "WHERE datname = pg_catalog.current_database()");
893 8 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
894 : {
895 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
896 : "could not obtain database OID: %s",
897 : PQresultErrorMessage(res));
898 0 : disconnect_database(conn, true);
899 : }
900 :
901 8 : if (PQntuples(res) != 1)
902 : {
903 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
904 : "could not obtain database OID: got %d rows, expected %d row",
905 : PQntuples(res), 1);
906 0 : disconnect_database(conn, true);
907 : }
908 :
909 : /* Database OID */
910 8 : oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
911 :
912 8 : PQclear(res);
913 :
914 : /* Random unsigned integer */
915 8 : rand = pg_prng_uint32(&prng_state);
916 :
917 : /*
918 : * Build the object name. The name must not exceed NAMEDATALEN - 1. This
919 : * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
920 : * '\0').
921 : */
922 8 : objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
923 :
924 8 : return objname;
925 : }
926 :
927 : /*
928 : * Does the publication exist in the specified database?
929 : */
930 : static bool
931 8 : find_publication(PGconn *conn, const char *pubname, const char *dbname)
932 : {
933 8 : PQExpBuffer str = createPQExpBuffer();
934 : PGresult *res;
935 8 : bool found = false;
936 8 : char *pubname_esc = PQescapeLiteral(conn, pubname, strlen(pubname));
937 :
938 8 : appendPQExpBuffer(str,
939 : "SELECT 1 FROM pg_catalog.pg_publication "
940 : "WHERE pubname = %s",
941 : pubname_esc);
942 8 : res = PQexec(conn, str->data);
943 8 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
944 : {
945 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
946 : "could not find publication \"%s\" in database \"%s\": %s",
947 : pubname, dbname, PQerrorMessage(conn));
948 0 : disconnect_database(conn, true);
949 : }
950 :
951 8 : if (PQntuples(res) == 1)
952 1 : found = true;
953 :
954 8 : PQclear(res);
955 8 : PQfreemem(pubname_esc);
956 8 : destroyPQExpBuffer(str);
957 :
958 8 : return found;
959 : }
960 :
961 : /*
962 : * Create the publications and replication slots in preparation for logical
963 : * replication. Returns the LSN from latest replication slot. It will be the
964 : * replication start point that is used to adjust the subscriptions (see
965 : * set_replication_progress).
966 : */
967 : static char *
968 4 : setup_publisher(struct LogicalRepInfo *dbinfo)
969 : {
970 4 : char *lsn = NULL;
971 :
972 4 : pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
973 :
974 12 : for (int i = 0; i < num_dbs; i++)
975 : {
976 : PGconn *conn;
977 8 : char *genname = NULL;
978 :
979 8 : conn = connect_database(dbinfo[i].pubconninfo, true);
980 :
981 : /*
982 : * If an object name was not specified as command-line options, assign
983 : * a generated object name. The replication slot has a different rule.
984 : * The subscription name is assigned to the replication slot name if
985 : * no replication slot is specified. It follows the same rule as
986 : * CREATE SUBSCRIPTION.
987 : */
988 8 : if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
989 8 : genname = generate_object_name(conn);
990 8 : if (num_pubs == 0)
991 4 : dbinfo[i].pubname = pg_strdup(genname);
992 8 : if (num_subs == 0)
993 6 : dbinfo[i].subname = pg_strdup(genname);
994 8 : if (num_replslots == 0)
995 5 : dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
996 :
997 8 : if (find_publication(conn, dbinfo[i].pubname, dbinfo[i].dbname))
998 : {
999 : /* Reuse existing publication on publisher. */
1000 1 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
1001 : "use existing publication \"%s\" in database \"%s\"",
1002 1 : dbinfo[i].pubname, dbinfo[i].dbname);
1003 : /* Don't remove pre-existing publication if an error occurs. */
1004 1 : dbinfo[i].made_publication = false;
1005 : }
1006 : else
1007 : {
1008 : /*
1009 : * Create publication on publisher. This step should be executed
1010 : * *before* promoting the subscriber to avoid any transactions
1011 : * between consistent LSN and the new publication rows (such
1012 : * transactions wouldn't see the new publication rows resulting in
1013 : * an error).
1014 : */
1015 7 : create_publication(conn, &dbinfo[i]);
1016 : }
1017 :
1018 : /* Create replication slot on publisher */
1019 8 : if (lsn)
1020 1 : pg_free(lsn);
1021 8 : lsn = create_logical_replication_slot(conn, &dbinfo[i]);
1022 8 : if (lsn == NULL && !dry_run)
1023 0 : exit(1);
1024 :
1025 : /*
1026 : * Since we are using the LSN returned by the last replication slot as
1027 : * recovery_target_lsn, this LSN is ahead of the current WAL position
1028 : * and the recovery waits until the publisher writes a WAL record to
1029 : * reach the target and ends the recovery. On idle systems, this wait
1030 : * time is unpredictable and could lead to failure in promoting the
1031 : * subscriber. To avoid that, insert a harmless WAL record.
1032 : */
1033 8 : if (i == num_dbs - 1 && !dry_run)
1034 : {
1035 : PGresult *res;
1036 :
1037 1 : res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
1038 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1039 : {
1040 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1041 : "could not write an additional WAL record: %s",
1042 : PQresultErrorMessage(res));
1043 0 : disconnect_database(conn, true);
1044 : }
1045 1 : PQclear(res);
1046 : }
1047 :
1048 8 : disconnect_database(conn, false);
1049 : }
1050 :
1051 4 : return lsn;
1052 : }
1053 :
1054 : /*
1055 : * Is recovery still in progress?
1056 : */
1057 : static bool
1058 15 : server_is_in_recovery(PGconn *conn)
1059 : {
1060 : PGresult *res;
1061 : int ret;
1062 :
1063 15 : res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
1064 :
1065 15 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1066 : {
1067 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1068 : "could not obtain recovery progress: %s",
1069 : PQresultErrorMessage(res));
1070 0 : disconnect_database(conn, true);
1071 : }
1072 :
1073 :
1074 15 : ret = strcmp("t", PQgetvalue(res, 0, 0));
1075 :
1076 15 : PQclear(res);
1077 :
1078 15 : return ret == 0;
1079 : }
1080 :
1081 : static void
1082 77 : internal_log_file_write(enum pg_log_level level, enum pg_log_part part,
1083 : const char *pg_restrict fmt, va_list args)
1084 : {
1085 : Assert(internal_log_file_fp);
1086 :
1087 : /* Do nothing if log level is too low. */
1088 77 : if (level < __pg_log_level)
1089 37 : return;
1090 :
1091 : /* Add prefix based on the log part and log level */
1092 40 : switch (part)
1093 : {
1094 39 : case PG_LOG_PRIMARY:
1095 39 : switch (level)
1096 : {
1097 0 : case PG_LOG_ERROR:
1098 0 : fprintf(internal_log_file_fp, _("error: "));
1099 0 : break;
1100 0 : case PG_LOG_WARNING:
1101 0 : fprintf(internal_log_file_fp, _("warning: "));
1102 0 : break;
1103 39 : default:
1104 39 : break;
1105 : }
1106 39 : break;
1107 0 : case PG_LOG_DETAIL:
1108 0 : fprintf(internal_log_file_fp, _("detail: "));
1109 0 : break;
1110 1 : case PG_LOG_HINT:
1111 1 : fprintf(internal_log_file_fp, _("hint: "));
1112 1 : break;
1113 : }
1114 :
1115 40 : vfprintf(internal_log_file_fp, _(fmt), args);
1116 :
1117 40 : fprintf(internal_log_file_fp, "\n");
1118 40 : fflush(internal_log_file_fp);
1119 : }
1120 :
/*
 * Open the log file "filename" with the given fopen() mode.
 *
 * On failure the problem is reported through report_createsub_fatal(),
 * including the system error text; on success the open stream is returned.
 */
static FILE *
logfile_open(const char *filename, const char *mode)
{
	FILE	   *stream = fopen(filename, mode);

	if (stream == NULL)
		report_createsub_fatal("could not open log file \"%s\": %m",
							   filename);

	return stream;
}
1137 :
1138 : static void
1139 1 : make_output_dirs(const char *log_basedir)
1140 : {
1141 : char timestamp[128];
1142 : struct timeval tval;
1143 : time_t now;
1144 : struct tm tmbuf;
1145 : int len;
1146 :
1147 : /* Generate timestamp */
1148 1 : gettimeofday(&tval, NULL);
1149 1 : now = tval.tv_sec;
1150 :
1151 1 : strftime(timestamp, sizeof(timestamp), "%Y%m%dT%H%M%S",
1152 1 : localtime_r(&now, &tmbuf));
1153 :
1154 : /* Append milliseconds */
1155 1 : snprintf(timestamp + strlen(timestamp),
1156 1 : sizeof(timestamp) - strlen(timestamp), ".%03u",
1157 1 : (unsigned int) (tval.tv_usec / 1000));
1158 :
1159 : /* Build timestamp directory path */
1160 1 : len = snprintf(logdir, MAXPGPATH, "%s/%s", log_basedir, timestamp);
1161 :
1162 1 : if (len >= MAXPGPATH)
1163 0 : report_createsub_fatal("directory path for log files is too long");
1164 :
1165 : /* Create base directory (ignore if exists) */
1166 1 : if (mkdir(log_basedir, pg_dir_create_mode) < 0 && errno != EEXIST)
1167 0 : report_createsub_fatal("could not create directory \"%s\": %m", log_basedir);
1168 :
1169 : /* Create a timestamp-named subdirectory under the base directory */
1170 1 : if (mkdir(logdir, pg_dir_create_mode) < 0)
1171 0 : report_createsub_fatal("could not create directory \"%s\": %m", logdir);
1172 1 : }
1173 :
1174 : /*
1175 : * Is the primary server ready for logical replication?
1176 : *
1177 : * XXX Does it not allow a synchronous replica?
1178 : */
1179 : static void
1180 6 : check_publisher(const struct LogicalRepInfo *dbinfo)
1181 : {
1182 : PGconn *conn;
1183 : PGresult *res;
1184 6 : bool failed = false;
1185 :
1186 : char *wal_level;
1187 : int max_repslots;
1188 : int cur_repslots;
1189 : int max_walsenders;
1190 : int cur_walsenders;
1191 : int max_prepared_transactions;
1192 : char *max_slot_wal_keep_size;
1193 :
1194 6 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
1195 : "checking settings on publisher");
1196 :
1197 6 : conn = connect_database(dbinfo[0].pubconninfo, true);
1198 :
1199 : /*
1200 : * If the primary server is in recovery (i.e. cascading replication),
1201 : * objects (publication) cannot be created because it is read only.
1202 : */
1203 6 : if (server_is_in_recovery(conn))
1204 : {
1205 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1206 : "primary server cannot be in recovery");
1207 1 : disconnect_database(conn, true);
1208 : }
1209 :
1210 : /*------------------------------------------------------------------------
1211 : * Logical replication requires a few parameters to be set on publisher.
1212 : * Since these parameters are not a requirement for physical replication,
1213 : * we should check it to make sure it won't fail.
1214 : *
1215 : * - wal_level >= replica
1216 : * - max_replication_slots >= current + number of dbs to be converted
1217 : * - max_wal_senders >= current + number of dbs to be converted
1218 : * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files)
1219 : * -----------------------------------------------------------------------
1220 : */
1221 5 : res = PQexec(conn,
1222 : "SELECT pg_catalog.current_setting('wal_level'),"
1223 : " pg_catalog.current_setting('max_replication_slots'),"
1224 : " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
1225 : " pg_catalog.current_setting('max_wal_senders'),"
1226 : " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
1227 : " pg_catalog.current_setting('max_prepared_transactions'),"
1228 : " pg_catalog.current_setting('max_slot_wal_keep_size')");
1229 :
1230 5 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1231 : {
1232 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1233 : "could not obtain publisher settings: %s",
1234 : PQresultErrorMessage(res));
1235 0 : disconnect_database(conn, true);
1236 : }
1237 :
1238 5 : wal_level = pg_strdup(PQgetvalue(res, 0, 0));
1239 5 : max_repslots = atoi(PQgetvalue(res, 0, 1));
1240 5 : cur_repslots = atoi(PQgetvalue(res, 0, 2));
1241 5 : max_walsenders = atoi(PQgetvalue(res, 0, 3));
1242 5 : cur_walsenders = atoi(PQgetvalue(res, 0, 4));
1243 5 : max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));
1244 5 : max_slot_wal_keep_size = pg_strdup(PQgetvalue(res, 0, 6));
1245 :
1246 5 : PQclear(res);
1247 :
1248 5 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1249 : "publisher: wal_level: %s", wal_level);
1250 5 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1251 : "publisher: max_replication_slots: %d", max_repslots);
1252 5 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1253 : "publisher: current replication slots: %d", cur_repslots);
1254 5 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1255 : "publisher: max_wal_senders: %d", max_walsenders);
1256 5 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1257 : "publisher: current wal senders: %d", cur_walsenders);
1258 5 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1259 : "publisher: max_prepared_transactions: %d",
1260 : max_prepared_transactions);
1261 5 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1262 : "publisher: max_slot_wal_keep_size: %s",
1263 : max_slot_wal_keep_size);
1264 :
1265 5 : disconnect_database(conn, false);
1266 :
1267 5 : if (strcmp(wal_level, "minimal") == 0)
1268 : {
1269 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1270 : "publisher requires \"wal_level\" >= \"replica\"");
1271 0 : failed = true;
1272 : }
1273 :
1274 5 : if (max_repslots - cur_repslots < num_dbs)
1275 : {
1276 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1277 : "publisher requires %d replication slots, but only %d remain",
1278 : num_dbs, max_repslots - cur_repslots);
1279 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
1280 : "Increase the configuration parameter \"%s\" to at least %d.",
1281 : "max_replication_slots", cur_repslots + num_dbs);
1282 1 : failed = true;
1283 : }
1284 :
1285 5 : if (max_walsenders - cur_walsenders < num_dbs)
1286 : {
1287 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1288 : "publisher requires %d WAL sender processes, but only %d remain",
1289 : num_dbs, max_walsenders - cur_walsenders);
1290 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
1291 : "Increase the configuration parameter \"%s\" to at least %d.",
1292 : "max_wal_senders", cur_walsenders + num_dbs);
1293 1 : failed = true;
1294 : }
1295 :
1296 5 : if (max_prepared_transactions != 0 && !dbinfos.two_phase)
1297 : {
1298 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
1299 : "two_phase option will not be enabled for replication slots");
1300 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_DETAIL,
1301 : "Subscriptions will be created with the two_phase option disabled. "
1302 : "Prepared transactions will be replicated at COMMIT PREPARED.");
1303 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
1304 : "You can use the command-line option --enable-two-phase to enable two_phase.");
1305 : }
1306 :
1307 : /*
1308 : * In dry-run mode, validate 'max_slot_wal_keep_size'. If this parameter
1309 : * is set to a non-default value, it may cause replication failures due to
1310 : * required WAL files being prematurely removed.
1311 : */
1312 5 : if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
1313 : {
1314 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
1315 : "required WAL could be removed from the publisher");
1316 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
1317 : "Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
1318 : "max_slot_wal_keep_size");
1319 : }
1320 :
1321 5 : pg_free(wal_level);
1322 :
1323 5 : if (failed)
1324 1 : exit(1);
1325 4 : }
1326 :
1327 : /*
1328 : * Is the standby server ready for logical replication?
1329 : *
1330 : * XXX Does it not allow a time-delayed replica?
1331 : *
1332 : * XXX In a cascaded replication scenario (P -> S -> C), if the target server
1333 : * is S, it cannot detect there is a replica (server C) because server S starts
1334 : * accepting only local connections and server C cannot connect to it. Hence,
1335 : * there is not a reliable way to provide a suitable error saying the server C
1336 : * will be broken at the end of this process (due to pg_resetwal).
1337 : */
1338 : static void
1339 8 : check_subscriber(const struct LogicalRepInfo *dbinfo)
1340 : {
1341 : PGconn *conn;
1342 : PGresult *res;
1343 8 : bool failed = false;
1344 :
1345 : int max_lrworkers;
1346 : int max_replorigins;
1347 : int max_wprocs;
1348 :
1349 8 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
1350 : "checking settings on subscriber");
1351 :
1352 8 : conn = connect_database(dbinfo[0].subconninfo, true);
1353 :
1354 : /* The target server must be a standby */
1355 8 : if (!server_is_in_recovery(conn))
1356 : {
1357 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1358 : "target server must be a standby");
1359 1 : disconnect_database(conn, true);
1360 : }
1361 :
1362 : /*------------------------------------------------------------------------
1363 : * Logical replication requires a few parameters to be set on subscriber.
1364 : * Since these parameters are not a requirement for physical replication,
1365 : * we should check it to make sure it won't fail.
1366 : *
1367 : * - max_active_replication_origins >= number of dbs to be converted
1368 : * - max_logical_replication_workers >= number of dbs to be converted
1369 : * - max_worker_processes >= 1 + number of dbs to be converted
1370 : *------------------------------------------------------------------------
1371 : */
1372 7 : res = PQexec(conn,
1373 : "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
1374 : "'max_logical_replication_workers', "
1375 : "'max_active_replication_origins', "
1376 : "'max_worker_processes', "
1377 : "'primary_slot_name') "
1378 : "ORDER BY name");
1379 :
1380 7 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1381 : {
1382 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1383 : "could not obtain subscriber settings: %s",
1384 : PQresultErrorMessage(res));
1385 0 : disconnect_database(conn, true);
1386 : }
1387 :
1388 7 : max_replorigins = atoi(PQgetvalue(res, 0, 0));
1389 7 : max_lrworkers = atoi(PQgetvalue(res, 1, 0));
1390 7 : max_wprocs = atoi(PQgetvalue(res, 2, 0));
1391 7 : if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
1392 6 : primary_slot_name = pg_strdup(PQgetvalue(res, 3, 0));
1393 :
1394 7 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1395 : "subscriber: max_logical_replication_workers: %d",
1396 : max_lrworkers);
1397 7 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1398 : "subscriber: max_active_replication_origins: %d", max_replorigins);
1399 7 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1400 : "subscriber: max_worker_processes: %d", max_wprocs);
1401 7 : if (primary_slot_name)
1402 6 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1403 : "subscriber: primary_slot_name: %s", primary_slot_name);
1404 :
1405 7 : PQclear(res);
1406 :
1407 7 : disconnect_database(conn, false);
1408 :
1409 7 : if (max_replorigins < num_dbs)
1410 : {
1411 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1412 : "subscriber requires %d active replication origins, but only %d remain",
1413 : num_dbs, max_replorigins);
1414 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
1415 : "Increase the configuration parameter \"%s\" to at least %d.",
1416 : "max_active_replication_origins", num_dbs);
1417 1 : failed = true;
1418 : }
1419 :
1420 7 : if (max_lrworkers < num_dbs)
1421 : {
1422 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1423 : "subscriber requires %d logical replication workers, but only %d remain",
1424 : num_dbs, max_lrworkers);
1425 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
1426 : "Increase the configuration parameter \"%s\" to at least %d.",
1427 : "max_logical_replication_workers", num_dbs);
1428 1 : failed = true;
1429 : }
1430 :
1431 7 : if (max_wprocs < num_dbs + 1)
1432 : {
1433 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1434 : "subscriber requires %d worker processes, but only %d remain",
1435 : num_dbs + 1, max_wprocs);
1436 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
1437 : "Increase the configuration parameter \"%s\" to at least %d.",
1438 : "max_worker_processes", num_dbs + 1);
1439 1 : failed = true;
1440 : }
1441 :
1442 7 : if (failed)
1443 1 : exit(1);
1444 6 : }
1445 :
1446 : /*
1447 : * Drop a specified subscription. This is to avoid duplicate subscriptions on
1448 : * the primary (publisher node) and the newly created subscriber. We
1449 : * shouldn't drop the associated slot as that would be used by the publisher
1450 : * node.
1451 : */
1452 : static void
1453 4 : drop_existing_subscription(PGconn *conn, const char *subname, const char *dbname)
1454 : {
1455 4 : PQExpBuffer query = createPQExpBuffer();
1456 : PGresult *res;
1457 :
1458 : Assert(conn != NULL);
1459 :
1460 : /*
1461 : * Construct a query string. These commands are allowed to be executed
1462 : * within a transaction.
1463 : */
1464 4 : appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
1465 : subname);
1466 4 : appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
1467 : subname);
1468 4 : appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
1469 :
1470 4 : if (dry_run)
1471 3 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
1472 : "dry-run: would drop subscription \"%s\" in database \"%s\"",
1473 : subname, dbname);
1474 : else
1475 : {
1476 1 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
1477 : "dropping subscription \"%s\" in database \"%s\"",
1478 : subname, dbname);
1479 :
1480 1 : res = PQexec(conn, query->data);
1481 :
1482 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1483 : {
1484 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1485 : "could not drop subscription \"%s\": %s",
1486 : subname, PQresultErrorMessage(res));
1487 0 : disconnect_database(conn, true);
1488 : }
1489 :
1490 1 : PQclear(res);
1491 : }
1492 :
1493 4 : destroyPQExpBuffer(query);
1494 4 : }
1495 :
1496 : /*
1497 : * Retrieve and drop the pre-existing subscriptions.
1498 : */
1499 : static void
1500 8 : check_and_drop_existing_subscriptions(PGconn *conn,
1501 : const struct LogicalRepInfo *dbinfo)
1502 : {
1503 8 : PQExpBuffer query = createPQExpBuffer();
1504 : char *dbname;
1505 : PGresult *res;
1506 :
1507 : Assert(conn != NULL);
1508 :
1509 8 : dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1510 :
1511 8 : appendPQExpBuffer(query,
1512 : "SELECT s.subname FROM pg_catalog.pg_subscription s "
1513 : "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1514 : "WHERE d.datname = %s",
1515 : dbname);
1516 8 : res = PQexec(conn, query->data);
1517 :
1518 8 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1519 : {
1520 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1521 : "could not obtain pre-existing subscriptions: %s",
1522 : PQresultErrorMessage(res));
1523 0 : disconnect_database(conn, true);
1524 : }
1525 :
1526 12 : for (int i = 0; i < PQntuples(res); i++)
1527 4 : drop_existing_subscription(conn, PQgetvalue(res, i, 0),
1528 4 : dbinfo->dbname);
1529 :
1530 8 : PQclear(res);
1531 8 : destroyPQExpBuffer(query);
1532 8 : PQfreemem(dbname);
1533 8 : }
1534 :
1535 : /*
1536 : * Create the subscriptions, adjust the initial location for logical
1537 : * replication and enable the subscriptions. That's the last step for logical
1538 : * replication setup.
1539 : */
1540 : static void
1541 4 : setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
1542 : {
1543 12 : for (int i = 0; i < num_dbs; i++)
1544 : {
1545 : PGconn *conn;
1546 :
1547 : /* Connect to subscriber. */
1548 8 : conn = connect_database(dbinfo[i].subconninfo, true);
1549 :
1550 : /*
1551 : * We don't need the pre-existing subscriptions on the newly formed
1552 : * subscriber. They can connect to other publisher nodes and either
1553 : * get some unwarranted data or can lead to ERRORs in connecting to
1554 : * such nodes.
1555 : */
1556 8 : check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
1557 :
1558 : /* Check and drop the required publications in the given database. */
1559 8 : check_and_drop_publications(conn, &dbinfo[i]);
1560 :
1561 8 : create_subscription(conn, &dbinfo[i]);
1562 :
1563 : /* Set the replication progress to the correct LSN */
1564 8 : set_replication_progress(conn, &dbinfo[i], consistent_lsn);
1565 :
1566 : /* Enable subscription */
1567 8 : enable_subscription(conn, &dbinfo[i]);
1568 :
1569 8 : disconnect_database(conn, false);
1570 : }
1571 4 : }
1572 :
1573 : /*
1574 : * Write the required recovery parameters.
1575 : */
1576 : static void
1577 4 : setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
1578 : {
1579 : PGconn *conn;
1580 : PQExpBuffer recoveryconfcontents;
1581 :
1582 : /*
1583 : * Despite of the recovery parameters will be written to the subscriber,
1584 : * use a publisher connection. The primary_conninfo is generated using the
1585 : * connection settings.
1586 : */
1587 4 : conn = connect_database(dbinfo[0].pubconninfo, true);
1588 :
1589 : /*
1590 : * Write recovery parameters.
1591 : *
1592 : * The subscriber is not running yet. In dry run mode, the recovery
1593 : * parameters *won't* be written. An invalid LSN is used for printing
1594 : * purposes. Additional recovery parameters are added here. It avoids
1595 : * unexpected behavior such as end of recovery as soon as a consistent
1596 : * state is reached (recovery_target) and failure due to multiple recovery
1597 : * targets (name, time, xid, LSN).
1598 : */
1599 4 : recoveryconfcontents = GenerateRecoveryConfig(conn, NULL, NULL);
1600 4 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target = ''\n");
1601 4 : appendPQExpBufferStr(recoveryconfcontents,
1602 : "recovery_target_timeline = 'latest'\n");
1603 :
1604 : /*
1605 : * Set recovery_target_inclusive = false to avoid reapplying the
1606 : * transaction committed at 'lsn' after subscription is enabled. This is
1607 : * because the provided 'lsn' is also used as the replication start point
1608 : * for the subscription. So, the server can send the transaction committed
1609 : * at that 'lsn' after replication is started which can lead to applying
1610 : * the same transaction twice if we keep recovery_target_inclusive = true.
1611 : */
1612 4 : appendPQExpBufferStr(recoveryconfcontents,
1613 : "recovery_target_inclusive = false\n");
1614 4 : appendPQExpBufferStr(recoveryconfcontents,
1615 : "recovery_target_action = promote\n");
1616 4 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target_name = ''\n");
1617 4 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target_time = ''\n");
1618 4 : appendPQExpBufferStr(recoveryconfcontents, "recovery_target_xid = ''\n");
1619 :
1620 4 : if (dry_run)
1621 : {
1622 3 : appendPQExpBufferStr(recoveryconfcontents, "# dry run mode\n");
1623 3 : appendPQExpBuffer(recoveryconfcontents,
1624 : "recovery_target_lsn = '%X/%08X'\n",
1625 : LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
1626 : }
1627 : else
1628 : {
1629 1 : appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
1630 : lsn);
1631 : }
1632 :
1633 4 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1634 : "recovery parameters:\n%s", recoveryconfcontents->data);
1635 :
1636 4 : if (!dry_run)
1637 : {
1638 : char conf_filename[MAXPGPATH];
1639 : FILE *fd;
1640 :
1641 : /* Write the recovery parameters to INCLUDED_CONF_FILE */
1642 1 : snprintf(conf_filename, MAXPGPATH, "%s/%s", datadir,
1643 : INCLUDED_CONF_FILE);
1644 1 : fd = fopen(conf_filename, "w");
1645 1 : if (fd == NULL)
1646 0 : report_createsub_fatal("could not open file \"%s\": %m", conf_filename);
1647 :
1648 1 : if (fwrite(recoveryconfcontents->data, recoveryconfcontents->len, 1, fd) != 1)
1649 0 : report_createsub_fatal("could not write to file \"%s\": %m", conf_filename);
1650 :
1651 1 : fclose(fd);
1652 1 : recovery_params_set = true;
1653 :
1654 : /* Include conditionally the recovery parameters. */
1655 1 : resetPQExpBuffer(recoveryconfcontents);
1656 1 : appendPQExpBufferStr(recoveryconfcontents,
1657 : "include_if_exists '" INCLUDED_CONF_FILE "'\n");
1658 1 : WriteRecoveryConfig(conn, datadir, recoveryconfcontents);
1659 : }
1660 :
1661 4 : disconnect_database(conn, false);
1662 4 : }
1663 :
1664 : /*
1665 : * Drop physical replication slot on primary if the standby was using it. After
1666 : * the transformation, it has no use.
1667 : *
1668 : * XXX we might not fail here. Instead, we provide a warning so the user
1669 : * eventually drops this replication slot later.
1670 : */
1671 : static void
1672 4 : drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
1673 : {
1674 : PGconn *conn;
1675 :
1676 : /* Replication slot does not exist, do nothing */
1677 4 : if (!primary_slot_name)
1678 0 : return;
1679 :
1680 4 : conn = connect_database(dbinfo[0].pubconninfo, false);
1681 4 : if (conn != NULL)
1682 : {
1683 4 : drop_replication_slot(conn, &dbinfo[0], slotname);
1684 4 : disconnect_database(conn, false);
1685 : }
1686 : else
1687 : {
1688 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
1689 : "could not drop replication slot \"%s\" on primary",
1690 : slotname);
1691 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
1692 : "Drop this replication slot soon to avoid retention of WAL files.");
1693 : }
1694 : }
1695 :
1696 : /*
1697 : * Drop failover replication slots on subscriber. After the transformation,
1698 : * they have no use.
1699 : *
1700 : * XXX We do not fail here. Instead, we provide a warning so the user can drop
1701 : * them later.
1702 : */
1703 : static void
1704 4 : drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
1705 : {
1706 : PGconn *conn;
1707 : PGresult *res;
1708 :
1709 4 : conn = connect_database(dbinfo[0].subconninfo, false);
1710 4 : if (conn != NULL)
1711 : {
1712 : /* Get failover replication slot names */
1713 4 : res = PQexec(conn,
1714 : "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
1715 :
1716 4 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
1717 : {
1718 : /* Remove failover replication slots from subscriber */
1719 8 : for (int i = 0; i < PQntuples(res); i++)
1720 4 : drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
1721 : }
1722 : else
1723 : {
1724 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
1725 : "could not obtain failover replication slot information: %s",
1726 : PQresultErrorMessage(res));
1727 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
1728 : "Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1729 : }
1730 :
1731 4 : PQclear(res);
1732 4 : disconnect_database(conn, false);
1733 : }
1734 : else
1735 : {
1736 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
1737 : "could not drop failover replication slot");
1738 0 : report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
1739 : "Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1740 : }
1741 4 : }
1742 :
1743 : /*
1744 : * Create a logical replication slot and returns a LSN.
1745 : *
1746 : * CreateReplicationSlot() is not used because it does not provide the one-row
1747 : * result set that contains the LSN.
1748 : */
1749 : static char *
1750 8 : create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
1751 : {
1752 8 : PQExpBuffer str = createPQExpBuffer();
1753 8 : PGresult *res = NULL;
1754 8 : const char *slot_name = dbinfo->replslotname;
1755 : char *slot_name_esc;
1756 8 : char *lsn = NULL;
1757 :
1758 : Assert(conn != NULL);
1759 :
1760 8 : if (dry_run)
1761 6 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
1762 : "dry-run: would create the replication slot \"%s\" in database \"%s\" on publisher",
1763 : slot_name, dbinfo->dbname);
1764 : else
1765 2 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
1766 : "creating the replication slot \"%s\" in database \"%s\" on publisher",
1767 : slot_name, dbinfo->dbname);
1768 :
1769 8 : slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1770 :
1771 8 : appendPQExpBuffer(str,
1772 : "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
1773 : slot_name_esc,
1774 8 : dbinfos.two_phase ? "true" : "false");
1775 :
1776 8 : PQfreemem(slot_name_esc);
1777 :
1778 8 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1779 : "command is: %s", str->data);
1780 :
1781 8 : if (!dry_run)
1782 : {
1783 2 : res = PQexec(conn, str->data);
1784 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1785 : {
1786 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1787 : "could not create replication slot \"%s\" in database \"%s\": %s",
1788 : slot_name, dbinfo->dbname,
1789 : PQresultErrorMessage(res));
1790 0 : PQclear(res);
1791 0 : destroyPQExpBuffer(str);
1792 0 : return NULL;
1793 : }
1794 :
1795 2 : lsn = pg_strdup(PQgetvalue(res, 0, 0));
1796 2 : PQclear(res);
1797 : }
1798 :
1799 : /* For cleanup purposes */
1800 8 : dbinfo->made_replslot = true;
1801 :
1802 8 : destroyPQExpBuffer(str);
1803 :
1804 8 : return lsn;
1805 : }
1806 :
1807 : static void
1808 8 : drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
1809 : const char *slot_name)
1810 : {
1811 8 : PQExpBuffer str = createPQExpBuffer();
1812 : char *slot_name_esc;
1813 : PGresult *res;
1814 :
1815 : Assert(conn != NULL);
1816 :
1817 8 : if (dry_run)
1818 6 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
1819 : "dry-run: would drop the replication slot \"%s\" in database \"%s\"",
1820 : slot_name, dbinfo->dbname);
1821 : else
1822 2 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
1823 : "dropping the replication slot \"%s\" in database \"%s\"",
1824 : slot_name, dbinfo->dbname);
1825 :
1826 8 : slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1827 :
1828 8 : appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
1829 :
1830 8 : PQfreemem(slot_name_esc);
1831 :
1832 8 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1833 : "command is: %s", str->data);
1834 :
1835 8 : if (!dry_run)
1836 : {
1837 2 : res = PQexec(conn, str->data);
1838 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1839 : {
1840 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1841 : "could not drop replication slot \"%s\" in database \"%s\": %s",
1842 : slot_name, dbinfo->dbname, PQresultErrorMessage(res));
1843 0 : dbinfo->made_replslot = false; /* don't try again. */
1844 : }
1845 :
1846 2 : PQclear(res);
1847 : }
1848 :
1849 8 : destroyPQExpBuffer(str);
1850 8 : }
1851 :
1852 : /*
1853 : * Reports a suitable message if pg_ctl fails.
1854 : */
1855 : static void
1856 24 : pg_ctl_status(const char *pg_ctl_cmd, int rc)
1857 : {
1858 24 : if (rc != 0)
1859 : {
1860 0 : if (WIFEXITED(rc))
1861 : {
1862 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1863 : "pg_ctl failed with exit code %d",
1864 0 : WEXITSTATUS(rc));
1865 : }
1866 0 : else if (WIFSIGNALED(rc))
1867 : {
1868 : #if defined(WIN32)
1869 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1870 : "pg_ctl was terminated by exception 0x%X",
1871 : WTERMSIG(rc));
1872 : report_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
1873 : "See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
1874 : #else
1875 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1876 : "pg_ctl was terminated by signal %d: %s",
1877 : WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
1878 : #endif
1879 : }
1880 : else
1881 : {
1882 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1883 : "pg_ctl exited with unrecognized status %d", rc);
1884 : }
1885 :
1886 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
1887 : "The failed command was: %s", pg_ctl_cmd);
1888 0 : exit(1);
1889 : }
1890 24 : }
1891 :
1892 : static void
1893 12 : start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
1894 : bool restrict_logical_worker)
1895 : {
1896 12 : PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
1897 : int rc;
1898 :
1899 12 : appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
1900 12 : appendShellString(pg_ctl_cmd, subscriber_dir);
1901 12 : appendPQExpBufferStr(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
1902 :
1903 : /* Prevent unintended slot invalidation */
1904 12 : appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
1905 :
1906 12 : if (restricted_access)
1907 : {
1908 12 : appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
1909 : #if !defined(WIN32)
1910 :
1911 : /*
1912 : * An empty listen_addresses list means the server does not listen on
1913 : * any IP interfaces; only Unix-domain sockets can be used to connect
1914 : * to the server. Prevent external connections to minimize the chance
1915 : * of failure.
1916 : */
1917 12 : appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
1918 12 : if (opt->socket_dir)
1919 12 : appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
1920 12 : opt->socket_dir);
1921 12 : appendPQExpBufferChar(pg_ctl_cmd, '"');
1922 : #endif
1923 : }
1924 12 : if (opt->config_file != NULL)
1925 0 : appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
1926 0 : opt->config_file);
1927 :
1928 : /* Suppress to start logical replication if requested */
1929 12 : if (restrict_logical_worker)
1930 4 : appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
1931 :
1932 12 : if (opt->log_dir)
1933 2 : appendPQExpBuffer(pg_ctl_cmd, " -l \"%s/%s\"", logdir, SERVER_LOG_FILE_NAME);
1934 :
1935 12 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1936 : "pg_ctl command is: %s", pg_ctl_cmd->data);
1937 12 : rc = system(pg_ctl_cmd->data);
1938 12 : pg_ctl_status(pg_ctl_cmd->data, rc);
1939 12 : standby_running = true;
1940 12 : destroyPQExpBuffer(pg_ctl_cmd);
1941 12 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
1942 : "server was started");
1943 12 : }
1944 :
1945 : static void
1946 12 : stop_standby_server(const char *datadir)
1947 : {
1948 : char *pg_ctl_cmd;
1949 : int rc;
1950 :
1951 12 : pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
1952 : datadir);
1953 12 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
1954 : "pg_ctl command is: %s", pg_ctl_cmd);
1955 12 : rc = system(pg_ctl_cmd);
1956 12 : pg_ctl_status(pg_ctl_cmd, rc);
1957 12 : standby_running = false;
1958 12 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
1959 : "server was stopped");
1960 12 : }
1961 :
1962 : /*
1963 : * Returns after the server finishes the recovery process.
1964 : *
1965 : * If recovery_timeout option is set, terminate abnormally without finishing
1966 : * the recovery process. By default, it waits forever.
1967 : *
1968 : * XXX Is the recovery process still in progress? When recovery process has a
1969 : * better progress reporting mechanism, it should be added here.
1970 : */
1971 : static void
1972 4 : wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
1973 : {
1974 : PGconn *conn;
1975 4 : bool ready = false;
1976 4 : int timer = 0;
1977 :
1978 4 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
1979 : "waiting for the target server to reach the consistent state");
1980 :
1981 4 : conn = connect_database(conninfo, true);
1982 :
1983 : for (;;)
1984 : {
1985 : /* Did the recovery process finish? We're done if so. */
1986 4 : if (dry_run || !server_is_in_recovery(conn))
1987 : {
1988 4 : ready = true;
1989 4 : recovery_ended = true;
1990 4 : break;
1991 : }
1992 :
1993 : /* Bail out after recovery_timeout seconds if this option is set */
1994 0 : if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
1995 : {
1996 0 : stop_standby_server(subscriber_dir);
1997 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
1998 : "recovery timed out");
1999 0 : disconnect_database(conn, true);
2000 : }
2001 :
2002 : /* Keep waiting */
2003 0 : pg_usleep(WAIT_INTERVAL * USECS_PER_SEC);
2004 0 : timer += WAIT_INTERVAL;
2005 : }
2006 :
2007 4 : disconnect_database(conn, false);
2008 :
2009 4 : if (!ready)
2010 0 : report_createsub_fatal("server did not end recovery");
2011 :
2012 4 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2013 : "target server reached the consistent state");
2014 4 : report_createsub_log(PG_LOG_INFO, PG_LOG_HINT,
2015 : "If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
2016 4 : }
2017 :
2018 : /*
2019 : * Create a publication that includes all tables in the database.
2020 : */
2021 : static void
2022 7 : create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
2023 : {
2024 7 : PQExpBuffer str = createPQExpBuffer();
2025 : PGresult *res;
2026 : char *ipubname_esc;
2027 : char *spubname_esc;
2028 :
2029 : Assert(conn != NULL);
2030 :
2031 7 : ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
2032 7 : spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
2033 :
2034 : /* Check if the publication already exists */
2035 7 : appendPQExpBuffer(str,
2036 : "SELECT 1 FROM pg_catalog.pg_publication "
2037 : "WHERE pubname = %s",
2038 : spubname_esc);
2039 7 : res = PQexec(conn, str->data);
2040 7 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
2041 : {
2042 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2043 : "could not obtain publication information: %s",
2044 : PQresultErrorMessage(res));
2045 0 : disconnect_database(conn, true);
2046 : }
2047 :
2048 7 : if (PQntuples(res) == 1)
2049 : {
2050 : /*
2051 : * Unfortunately, if it reaches this code path, it will always fail
2052 : * (unless you decide to change the existing publication name). That's
2053 : * bad but it is very unlikely that the user will choose a name with
2054 : * pg_createsubscriber_ prefix followed by the exact database oid and
2055 : * a random number.
2056 : */
2057 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2058 : "publication \"%s\" already exists", dbinfo->pubname);
2059 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
2060 : "Consider renaming this publication before continuing.");
2061 0 : disconnect_database(conn, true);
2062 : }
2063 :
2064 7 : PQclear(res);
2065 7 : resetPQExpBuffer(str);
2066 :
2067 7 : if (dry_run)
2068 6 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2069 : "dry-run: would create publication \"%s\" in database \"%s\"",
2070 : dbinfo->pubname, dbinfo->dbname);
2071 : else
2072 1 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2073 : "creating publication \"%s\" in database \"%s\"",
2074 : dbinfo->pubname, dbinfo->dbname);
2075 :
2076 7 : appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
2077 : ipubname_esc);
2078 :
2079 7 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
2080 : "command is: %s", str->data);
2081 :
2082 7 : if (!dry_run)
2083 : {
2084 1 : res = PQexec(conn, str->data);
2085 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
2086 : {
2087 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2088 : "could not create publication \"%s\" in database \"%s\": %s",
2089 : dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
2090 0 : disconnect_database(conn, true);
2091 : }
2092 1 : PQclear(res);
2093 : }
2094 :
2095 : /* For cleanup purposes */
2096 7 : dbinfo->made_publication = true;
2097 :
2098 7 : PQfreemem(ipubname_esc);
2099 7 : PQfreemem(spubname_esc);
2100 7 : destroyPQExpBuffer(str);
2101 7 : }
2102 :
2103 : /*
2104 : * Drop the specified publication in the given database.
2105 : */
2106 : static void
2107 10 : drop_publication(PGconn *conn, const char *pubname, const char *dbname,
2108 : bool *made_publication)
2109 : {
2110 10 : PQExpBuffer str = createPQExpBuffer();
2111 : PGresult *res;
2112 : char *pubname_esc;
2113 :
2114 : Assert(conn != NULL);
2115 :
2116 10 : pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
2117 :
2118 10 : if (dry_run)
2119 6 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2120 : "dry-run: would drop publication \"%s\" in database \"%s\"",
2121 : pubname, dbname);
2122 : else
2123 4 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2124 : "dropping publication \"%s\" in database \"%s\"",
2125 : pubname, dbname);
2126 :
2127 10 : appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
2128 :
2129 10 : PQfreemem(pubname_esc);
2130 :
2131 10 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
2132 : "command is: %s", str->data);
2133 :
2134 10 : if (!dry_run)
2135 : {
2136 4 : res = PQexec(conn, str->data);
2137 4 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
2138 : {
2139 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2140 : "could not drop publication \"%s\" in database \"%s\": %s",
2141 : pubname, dbname, PQresultErrorMessage(res));
2142 0 : *made_publication = false; /* don't try again. */
2143 :
2144 : /*
2145 : * Don't disconnect and exit here. This routine is used by primary
2146 : * (cleanup publication / replication slot due to an error) and
2147 : * subscriber (remove the replicated publications). In both cases,
2148 : * it can continue and provide instructions for the user to remove
2149 : * it later if cleanup fails.
2150 : */
2151 : }
2152 4 : PQclear(res);
2153 : }
2154 :
2155 10 : destroyPQExpBuffer(str);
2156 10 : }
2157 :
2158 : /*
2159 : * Retrieve and drop the publications.
2160 : *
2161 : * Publications copied during physical replication remain on the subscriber
2162 : * after promotion. If --clean=publications is specified, drop all existing
2163 : * publications in the subscriber database. Otherwise, only drop publications
2164 : * that were created by pg_createsubscriber during this operation.
2165 : */
2166 : static void
2167 8 : check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
2168 : {
2169 : PGresult *res;
2170 8 : bool drop_all_pubs = dbinfos.objecttypes_to_clean & OBJECTTYPE_PUBLICATIONS;
2171 :
2172 : Assert(conn != NULL);
2173 :
2174 8 : if (drop_all_pubs)
2175 : {
2176 2 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2177 : "dropping all existing publications in database \"%s\"",
2178 : dbinfo->dbname);
2179 :
2180 : /* Fetch all publication names */
2181 2 : res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
2182 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
2183 : {
2184 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2185 : "could not obtain publication information: %s",
2186 : PQresultErrorMessage(res));
2187 0 : PQclear(res);
2188 0 : disconnect_database(conn, true);
2189 : }
2190 :
2191 : /* Drop each publication */
2192 6 : for (int i = 0; i < PQntuples(res); i++)
2193 4 : drop_publication(conn, PQgetvalue(res, i, 0), dbinfo->dbname,
2194 : &dbinfo->made_publication);
2195 :
2196 2 : PQclear(res);
2197 : }
2198 : else
2199 : {
2200 : /* Drop publication only if it was created by this tool */
2201 6 : if (dbinfo->made_publication)
2202 : {
2203 6 : drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
2204 : &dbinfo->made_publication);
2205 : }
2206 : else
2207 : {
2208 0 : if (dry_run)
2209 0 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2210 : "dry-run: would preserve existing publication \"%s\" in database \"%s\"",
2211 : dbinfo->pubname, dbinfo->dbname);
2212 : else
2213 0 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2214 : "preserve existing publication \"%s\" in database \"%s\"",
2215 : dbinfo->pubname, dbinfo->dbname);
2216 : }
2217 : }
2218 8 : }
2219 :
2220 : /*
2221 : * Create a subscription with some predefined options.
2222 : *
2223 : * A replication slot was already created in a previous step. Let's use it. It
2224 : * is not required to copy data. The subscription will be created but it will
2225 : * not be enabled now. That's because the replication progress must be set and
2226 : * the replication origin name (one of the function arguments) contains the
2227 : * subscription OID in its name. Once the subscription is created,
2228 : * set_replication_progress() can obtain the chosen origin name and set up its
2229 : * initial location.
2230 : */
2231 : static void
2232 8 : create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
2233 : {
2234 8 : PQExpBuffer str = createPQExpBuffer();
2235 : PGresult *res;
2236 : char *pubname_esc;
2237 : char *subname_esc;
2238 : char *pubconninfo_esc;
2239 : char *replslotname_esc;
2240 :
2241 : Assert(conn != NULL);
2242 :
2243 8 : pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
2244 8 : subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
2245 8 : pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
2246 8 : replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
2247 :
2248 8 : if (dry_run)
2249 6 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2250 : "dry-run: would create subscription \"%s\" in database \"%s\"",
2251 6 : dbinfo->subname, dbinfo->dbname);
2252 : else
2253 2 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2254 : "creating subscription \"%s\" in database \"%s\"",
2255 2 : dbinfo->subname, dbinfo->dbname);
2256 :
2257 8 : appendPQExpBuffer(str,
2258 : "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
2259 : "WITH (create_slot = false, enabled = false, "
2260 : "slot_name = %s, copy_data = false, two_phase = %s)",
2261 : subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
2262 8 : dbinfos.two_phase ? "true" : "false");
2263 :
2264 8 : PQfreemem(pubname_esc);
2265 8 : PQfreemem(subname_esc);
2266 8 : PQfreemem(pubconninfo_esc);
2267 8 : PQfreemem(replslotname_esc);
2268 :
2269 8 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
2270 : "command is: %s", str->data);
2271 :
2272 8 : if (!dry_run)
2273 : {
2274 2 : res = PQexec(conn, str->data);
2275 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
2276 : {
2277 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2278 : "could not create subscription \"%s\" in database \"%s\": %s",
2279 0 : dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
2280 0 : disconnect_database(conn, true);
2281 : }
2282 2 : PQclear(res);
2283 : }
2284 :
2285 8 : destroyPQExpBuffer(str);
2286 8 : }
2287 :
2288 : /*
2289 : * Sets the replication progress to the consistent LSN.
2290 : *
2291 : * The subscriber caught up to the consistent LSN provided by the last
2292 : * replication slot that was created. The goal is to set up the initial
2293 : * location for the logical replication that is the exact LSN that the
2294 : * subscriber was promoted. Once the subscription is enabled it will start
2295 : * streaming from that location onwards. In dry run mode, the subscription OID
2296 : * and LSN are set to invalid values for printing purposes.
2297 : */
2298 : static void
2299 8 : set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
2300 : {
2301 8 : PQExpBuffer str = createPQExpBuffer();
2302 : PGresult *res;
2303 : Oid suboid;
2304 : char *subname;
2305 : char *dbname;
2306 : char *originname;
2307 : char *lsnstr;
2308 :
2309 : Assert(conn != NULL);
2310 :
2311 8 : subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
2312 8 : dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
2313 :
2314 8 : appendPQExpBuffer(str,
2315 : "SELECT s.oid FROM pg_catalog.pg_subscription s "
2316 : "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
2317 : "WHERE s.subname = %s AND d.datname = %s",
2318 : subname, dbname);
2319 :
2320 8 : res = PQexec(conn, str->data);
2321 8 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
2322 : {
2323 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2324 : "could not obtain subscription OID: %s",
2325 : PQresultErrorMessage(res));
2326 0 : disconnect_database(conn, true);
2327 : }
2328 :
2329 8 : if (PQntuples(res) != 1 && !dry_run)
2330 : {
2331 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2332 : "could not obtain subscription OID: got %d rows, expected %d row",
2333 : PQntuples(res), 1);
2334 0 : disconnect_database(conn, true);
2335 : }
2336 :
2337 8 : if (dry_run)
2338 : {
2339 6 : suboid = InvalidOid;
2340 6 : lsnstr = psprintf("%X/%08X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
2341 : }
2342 : else
2343 : {
2344 2 : suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
2345 2 : lsnstr = psprintf("%s", lsn);
2346 : }
2347 :
2348 8 : PQclear(res);
2349 :
2350 : /*
2351 : * The origin name is defined as pg_%u. %u is the subscription OID. See
2352 : * ApplyWorkerMain().
2353 : */
2354 8 : originname = psprintf("pg_%u", suboid);
2355 :
2356 8 : if (dry_run)
2357 6 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2358 : "dry-run: would set the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
2359 6 : originname, lsnstr, dbinfo->dbname);
2360 : else
2361 2 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2362 : "setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
2363 2 : originname, lsnstr, dbinfo->dbname);
2364 :
2365 8 : resetPQExpBuffer(str);
2366 8 : appendPQExpBuffer(str,
2367 : "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
2368 : originname, lsnstr);
2369 :
2370 8 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
2371 : "command is: %s", str->data);
2372 :
2373 8 : if (!dry_run)
2374 : {
2375 2 : res = PQexec(conn, str->data);
2376 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
2377 : {
2378 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2379 : "could not set replication progress for subscription \"%s\": %s",
2380 0 : dbinfo->subname, PQresultErrorMessage(res));
2381 0 : disconnect_database(conn, true);
2382 : }
2383 2 : PQclear(res);
2384 : }
2385 :
2386 8 : PQfreemem(subname);
2387 8 : PQfreemem(dbname);
2388 8 : pg_free(originname);
2389 8 : pg_free(lsnstr);
2390 8 : destroyPQExpBuffer(str);
2391 8 : }
2392 :
2393 : /*
2394 : * Enables the subscription.
2395 : *
2396 : * The subscription was created in a previous step but it was disabled. After
2397 : * adjusting the initial logical replication location, enable the subscription.
2398 : */
2399 : static void
2400 8 : enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
2401 : {
2402 8 : PQExpBuffer str = createPQExpBuffer();
2403 : PGresult *res;
2404 : char *subname;
2405 :
2406 : Assert(conn != NULL);
2407 :
2408 8 : subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
2409 :
2410 8 : if (dry_run)
2411 6 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2412 : "dry-run: would enable subscription \"%s\" in database \"%s\"",
2413 6 : dbinfo->subname, dbinfo->dbname);
2414 : else
2415 2 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2416 : "enabling subscription \"%s\" in database \"%s\"",
2417 2 : dbinfo->subname, dbinfo->dbname);
2418 :
2419 8 : appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
2420 :
2421 8 : report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
2422 : "command is: %s", str->data);
2423 :
2424 8 : if (!dry_run)
2425 : {
2426 2 : res = PQexec(conn, str->data);
2427 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
2428 : {
2429 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2430 : "could not enable subscription \"%s\": %s",
2431 0 : dbinfo->subname, PQresultErrorMessage(res));
2432 0 : disconnect_database(conn, true);
2433 : }
2434 :
2435 2 : PQclear(res);
2436 : }
2437 :
2438 8 : PQfreemem(subname);
2439 8 : destroyPQExpBuffer(str);
2440 8 : }
2441 :
2442 : /*
2443 : * Fetch a list of all connectable non-template databases from the source server
2444 : * and form a list such that they appear as if the user has specified multiple
2445 : * --database options, one for each source database.
2446 : */
2447 : static void
2448 1 : get_publisher_databases(struct CreateSubscriberOptions *opt,
2449 : bool dbnamespecified)
2450 : {
2451 : PGconn *conn;
2452 : PGresult *res;
2453 :
2454 : /* If a database name was specified, just connect to it. */
2455 1 : if (dbnamespecified)
2456 0 : conn = connect_database(opt->pub_conninfo_str, true);
2457 : else
2458 : {
2459 : /* Otherwise, try postgres first and then template1. */
2460 : char *conninfo;
2461 :
2462 1 : conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "postgres");
2463 1 : conn = connect_database(conninfo, false);
2464 1 : pg_free(conninfo);
2465 1 : if (!conn)
2466 : {
2467 0 : conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "template1");
2468 0 : conn = connect_database(conninfo, true);
2469 0 : pg_free(conninfo);
2470 : }
2471 : }
2472 :
2473 1 : res = PQexec(conn, "SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1");
2474 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
2475 : {
2476 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2477 : "could not obtain a list of databases: %s",
2478 : PQresultErrorMessage(res));
2479 0 : PQclear(res);
2480 0 : disconnect_database(conn, true);
2481 : }
2482 :
2483 4 : for (int i = 0; i < PQntuples(res); i++)
2484 : {
2485 3 : const char *dbname = PQgetvalue(res, i, 0);
2486 :
2487 3 : simple_string_list_append(&opt->database_names, dbname);
2488 :
2489 : /* Increment num_dbs to reflect multiple --database options */
2490 3 : num_dbs++;
2491 : }
2492 :
2493 1 : PQclear(res);
2494 1 : disconnect_database(conn, false);
2495 1 : }
2496 :
2497 : int
2498 23 : main(int argc, char **argv)
2499 : {
2500 : static struct option long_options[] =
2501 : {
2502 : {"all", no_argument, NULL, 'a'},
2503 : {"database", required_argument, NULL, 'd'},
2504 : {"pgdata", required_argument, NULL, 'D'},
2505 : {"logdir", required_argument, NULL, 'l'},
2506 : {"dry-run", no_argument, NULL, 'n'},
2507 : {"subscriber-port", required_argument, NULL, 'p'},
2508 : {"publisher-server", required_argument, NULL, 'P'},
2509 : {"socketdir", required_argument, NULL, 's'},
2510 : {"recovery-timeout", required_argument, NULL, 't'},
2511 : {"enable-two-phase", no_argument, NULL, 'T'},
2512 : {"subscriber-username", required_argument, NULL, 'U'},
2513 : {"verbose", no_argument, NULL, 'v'},
2514 : {"version", no_argument, NULL, 'V'},
2515 : {"help", no_argument, NULL, '?'},
2516 : {"config-file", required_argument, NULL, 1},
2517 : {"publication", required_argument, NULL, 2},
2518 : {"replication-slot", required_argument, NULL, 3},
2519 : {"subscription", required_argument, NULL, 4},
2520 : {"clean", required_argument, NULL, 5},
2521 : {NULL, 0, NULL, 0}
2522 : };
2523 :
2524 23 : struct CreateSubscriberOptions opt = {0};
2525 :
2526 : int c;
2527 : int option_index;
2528 :
2529 : char *pub_base_conninfo;
2530 : char *sub_base_conninfo;
2531 23 : char *dbname_conninfo = NULL;
2532 :
2533 : uint64 pub_sysid;
2534 : uint64 sub_sysid;
2535 : struct stat statbuf;
2536 :
2537 : char *consistent_lsn;
2538 :
2539 : char pidfile[MAXPGPATH];
2540 :
2541 23 : pg_logging_init(argv[0]);
2542 23 : pg_logging_set_level(PG_LOG_WARNING);
2543 23 : progname = get_progname(argv[0]);
2544 23 : set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
2545 :
2546 23 : if (argc > 1)
2547 : {
2548 22 : if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2549 : {
2550 1 : usage();
2551 1 : exit(0);
2552 : }
2553 21 : else if (strcmp(argv[1], "-V") == 0
2554 21 : || strcmp(argv[1], "--version") == 0)
2555 : {
2556 1 : puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
2557 1 : exit(0);
2558 : }
2559 : }
2560 :
2561 : /* Default settings */
2562 21 : subscriber_dir = NULL;
2563 21 : opt.config_file = NULL;
2564 21 : opt.log_dir = NULL;
2565 21 : opt.pub_conninfo_str = NULL;
2566 21 : opt.socket_dir = NULL;
2567 21 : opt.sub_port = DEFAULT_SUB_PORT;
2568 21 : opt.sub_username = NULL;
2569 21 : opt.two_phase = false;
2570 21 : opt.database_names = (SimpleStringList)
2571 : {
2572 : 0
2573 : };
2574 21 : opt.recovery_timeout = 0;
2575 21 : opt.all_dbs = false;
2576 :
2577 : /*
2578 : * Don't allow it to be run as root. It uses pg_ctl which does not allow
2579 : * it either.
2580 : */
2581 : #ifndef WIN32
2582 21 : if (geteuid() == 0)
2583 : {
2584 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2585 : "cannot be executed by \"root\"");
2586 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
2587 : "You must run %s as the PostgreSQL superuser.",
2588 : progname);
2589 0 : exit(1);
2590 : }
2591 : #endif
2592 :
2593 21 : get_restricted_token();
2594 :
2595 163 : while ((c = getopt_long(argc, argv, "ad:D:l:np:P:s:t:TU:v",
2596 163 : long_options, &option_index)) != -1)
2597 : {
2598 145 : switch (c)
2599 : {
2600 3 : case 'a':
2601 3 : opt.all_dbs = true;
2602 3 : break;
2603 25 : case 'd':
2604 25 : if (!simple_string_list_member(&opt.database_names, optarg))
2605 : {
2606 24 : simple_string_list_append(&opt.database_names, optarg);
2607 24 : num_dbs++;
2608 : }
2609 : else
2610 1 : report_createsub_fatal("database \"%s\" specified more than once for -d/--database", optarg);
2611 24 : break;
2612 19 : case 'D':
2613 19 : subscriber_dir = pg_strdup(optarg);
2614 19 : canonicalize_path(subscriber_dir);
2615 19 : break;
2616 1 : case 'l':
2617 1 : opt.log_dir = pg_strdup(optarg);
2618 1 : canonicalize_path(opt.log_dir);
2619 1 : break;
2620 9 : case 'n':
2621 9 : dry_run = true;
2622 9 : break;
2623 12 : case 'p':
2624 12 : opt.sub_port = pg_strdup(optarg);
2625 12 : break;
2626 18 : case 'P':
2627 18 : opt.pub_conninfo_str = pg_strdup(optarg);
2628 18 : break;
2629 12 : case 's':
2630 12 : opt.socket_dir = pg_strdup(optarg);
2631 12 : canonicalize_path(opt.socket_dir);
2632 12 : break;
2633 3 : case 't':
2634 3 : opt.recovery_timeout = atoi(optarg);
2635 3 : break;
2636 1 : case 'T':
2637 1 : opt.two_phase = true;
2638 1 : break;
2639 0 : case 'U':
2640 0 : opt.sub_username = pg_strdup(optarg);
2641 0 : break;
2642 19 : case 'v':
2643 19 : pg_logging_increase_verbosity();
2644 19 : break;
2645 0 : case 1:
2646 0 : opt.config_file = pg_strdup(optarg);
2647 0 : break;
2648 12 : case 2:
2649 12 : if (!simple_string_list_member(&opt.pub_names, optarg))
2650 : {
2651 11 : simple_string_list_append(&opt.pub_names, optarg);
2652 11 : num_pubs++;
2653 : }
2654 : else
2655 1 : report_createsub_fatal("publication \"%s\" specified more than once for --publication", optarg);
2656 11 : break;
2657 4 : case 3:
2658 4 : if (!simple_string_list_member(&opt.replslot_names, optarg))
2659 : {
2660 4 : simple_string_list_append(&opt.replslot_names, optarg);
2661 4 : num_replslots++;
2662 : }
2663 : else
2664 0 : report_createsub_fatal("replication slot \"%s\" specified more than once for --replication-slot", optarg);
2665 4 : break;
2666 5 : case 4:
2667 5 : if (!simple_string_list_member(&opt.sub_names, optarg))
2668 : {
2669 5 : simple_string_list_append(&opt.sub_names, optarg);
2670 5 : num_subs++;
2671 : }
2672 : else
2673 0 : report_createsub_fatal("subscription \"%s\" specified more than once for --subscription", optarg);
2674 5 : break;
2675 1 : case 5:
2676 1 : if (!simple_string_list_member(&opt.objecttypes_to_clean, optarg))
2677 1 : simple_string_list_append(&opt.objecttypes_to_clean, optarg);
2678 : else
2679 0 : report_createsub_fatal("object type \"%s\" specified more than once for --clean", optarg);
2680 1 : break;
2681 1 : default:
2682 : /* getopt_long already emitted a complaint */
2683 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
2684 : "Try \"%s --help\" for more information.",
2685 : progname);
2686 1 : exit(1);
2687 : }
2688 : }
2689 :
2690 : /* Validate that --all is not used with incompatible options */
2691 18 : if (opt.all_dbs)
2692 : {
2693 3 : char *bad_switch = NULL;
2694 :
2695 3 : if (num_dbs > 0)
2696 1 : bad_switch = "--database";
2697 2 : else if (num_pubs > 0)
2698 1 : bad_switch = "--publication";
2699 1 : else if (num_replslots > 0)
2700 0 : bad_switch = "--replication-slot";
2701 1 : else if (num_subs > 0)
2702 0 : bad_switch = "--subscription";
2703 :
2704 3 : if (bad_switch)
2705 : {
2706 2 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2707 : "options %s and %s cannot be used together",
2708 : bad_switch, "-a/--all");
2709 2 : report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
2710 : "Try \"%s --help\" for more information.",
2711 : progname);
2712 2 : exit(1);
2713 : }
2714 : }
2715 :
2716 : /* Any non-option arguments? */
2717 16 : if (optind < argc)
2718 : {
2719 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2720 : "too many command-line arguments (first is \"%s\")",
2721 0 : argv[optind]);
2722 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
2723 : "Try \"%s --help\" for more information.", progname);
2724 0 : exit(1);
2725 : }
2726 :
2727 : /* Required arguments */
2728 16 : if (subscriber_dir == NULL)
2729 : {
2730 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2731 : "no subscriber data directory specified");
2732 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
2733 : "Try \"%s --help\" for more information.", progname);
2734 1 : exit(1);
2735 : }
2736 :
2737 : /* If socket directory is not provided, use the current directory */
2738 15 : if (opt.socket_dir == NULL)
2739 : {
2740 : char cwd[MAXPGPATH];
2741 :
2742 5 : if (!getcwd(cwd, MAXPGPATH))
2743 0 : report_createsub_fatal("could not determine current directory");
2744 5 : opt.socket_dir = pg_strdup(cwd);
2745 5 : canonicalize_path(opt.socket_dir);
2746 : }
2747 :
2748 : /*
2749 : * Parse connection string. Build a base connection string that might be
2750 : * reused by multiple databases.
2751 : */
2752 15 : if (opt.pub_conninfo_str == NULL)
2753 : {
2754 : /*
2755 : * TODO use primary_conninfo (if available) from subscriber and
2756 : * extract publisher connection string. Assume that there are
2757 : * identical entries for physical and logical replication. If there is
2758 : * not, we would fail anyway.
2759 : */
2760 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2761 : "no publisher connection string specified");
2762 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
2763 : "Try \"%s --help\" for more information.", progname);
2764 1 : exit(1);
2765 : }
2766 :
2767 14 : if (opt.log_dir != NULL)
2768 : {
2769 : char *internal_log_file;
2770 :
2771 1 : umask(PG_MODE_MASK_OWNER);
2772 :
2773 : /*
2774 : * Set mask based on PGDATA permissions, needed for the creation of
2775 : * the output directories with correct permissions, similar to
2776 : * pg_ctl and pg_upgrade.
2777 : *
2778 : * Don't error here if the data directory cannot be stat'd. Upcoming
2779 : * checks for the data directory would raise the fatal error later.
2780 : */
2781 1 : if (GetDataDirectoryCreatePerm(subscriber_dir))
2782 1 : umask(pg_mode_mask);
2783 :
2784 1 : make_output_dirs(opt.log_dir);
2785 1 : internal_log_file = psprintf("%s/%s", logdir, INTERNAL_LOG_FILE_NAME);
2786 :
2787 : /* logfile_open() will exit if there is an error */
2788 1 : internal_log_file_fp = logfile_open(internal_log_file, "a");
2789 1 : pg_free(internal_log_file);
2790 : }
2791 :
2792 14 : if (dry_run)
2793 8 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2794 : "Executing in dry-run mode.\n"
2795 : "The target directory will not be modified.");
2796 :
2797 14 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2798 : "validating publisher connection string");
2799 14 : pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
2800 : &dbname_conninfo);
2801 14 : if (pub_base_conninfo == NULL)
2802 0 : exit(1);
2803 :
2804 14 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2805 : "validating subscriber connection string");
2806 14 : sub_base_conninfo = get_sub_conninfo(&opt);
2807 :
2808 : /*
2809 : * Fetch all databases from the source (publisher) and treat them as if
2810 : * the user had specified multiple --database options, one for each source
2811 : * database.
2812 : */
2813 14 : if (opt.all_dbs)
2814 : {
2815 1 : bool dbnamespecified = (dbname_conninfo != NULL);
2816 :
2817 1 : get_publisher_databases(&opt, dbnamespecified);
2818 : }
2819 :
2820 14 : if (opt.database_names.head == NULL)
2821 : {
2822 2 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2823 : "no database was specified");
2824 :
2825 : /*
2826 : * Try to obtain the dbname from the publisher conninfo. If dbname
2827 : * parameter is not available, error out.
2828 : */
2829 2 : if (dbname_conninfo)
2830 : {
2831 1 : simple_string_list_append(&opt.database_names, dbname_conninfo);
2832 1 : num_dbs++;
2833 :
2834 1 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2835 : "database name \"%s\" was extracted from the publisher connection string",
2836 : dbname_conninfo);
2837 : }
2838 : else
2839 : {
2840 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2841 : "no database name specified");
2842 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
2843 : "Try \"%s --help\" for more information.",
2844 : progname);
2845 1 : exit(1);
2846 : }
2847 : }
2848 :
2849 : /* Number of object names must match number of databases */
2850 13 : if (num_pubs > 0 && num_pubs != num_dbs)
2851 : {
2852 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2853 : "wrong number of publication names specified");
2854 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
2855 : "The number of specified publication names (%d) must match the number of specified database names (%d).",
2856 : num_pubs, num_dbs);
2857 1 : exit(1);
2858 : }
2859 12 : if (num_subs > 0 && num_subs != num_dbs)
2860 : {
2861 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2862 : "wrong number of subscription names specified");
2863 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
2864 : "The number of specified subscription names (%d) must match the number of specified database names (%d).",
2865 : num_subs, num_dbs);
2866 1 : exit(1);
2867 : }
2868 11 : if (num_replslots > 0 && num_replslots != num_dbs)
2869 : {
2870 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2871 : "wrong number of replication slot names specified");
2872 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
2873 : "The number of specified replication slot names (%d) must match the number of specified database names (%d).",
2874 : num_replslots, num_dbs);
2875 1 : exit(1);
2876 : }
2877 :
2878 : /* Verify the object types specified for removal from the subscriber */
2879 11 : for (SimpleStringListCell *cell = opt.objecttypes_to_clean.head; cell; cell = cell->next)
2880 : {
2881 1 : if (pg_strcasecmp(cell->val, "publications") == 0)
2882 1 : dbinfos.objecttypes_to_clean |= OBJECTTYPE_PUBLICATIONS;
2883 : else
2884 : {
2885 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2886 : "invalid object type \"%s\" specified for %s",
2887 0 : cell->val, "--clean");
2888 0 : report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
2889 : "The valid value is: \"%s\"", "publications");
2890 0 : exit(1);
2891 : }
2892 : }
2893 :
2894 : /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
2895 10 : pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
2896 10 : pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
2897 :
2898 : /* Rudimentary check for a data directory */
2899 10 : check_data_directory(subscriber_dir);
2900 :
2901 10 : dbinfos.two_phase = opt.two_phase;
2902 :
2903 : /*
2904 : * Store database information for publisher and subscriber. It should be
2905 : * called before atexit() because its return value is used in
2906 : * cleanup_objects_atexit().
2907 : */
2908 10 : dbinfos.dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
2909 :
2910 : /* Register a function to clean up objects in case of failure */
2911 10 : atexit(cleanup_objects_atexit);
2912 :
2913 : /*
2914 : * Check if the subscriber data directory has the same system identifier
2915 : * as the publisher data directory.
2916 : */
2917 10 : pub_sysid = get_primary_sysid(dbinfos.dbinfo[0].pubconninfo);
2918 10 : sub_sysid = get_standby_sysid(subscriber_dir);
2919 10 : if (pub_sysid != sub_sysid)
2920 1 : report_createsub_fatal("subscriber data directory is not a copy of the source database cluster");
2921 :
2922 : /* Subscriber PID file */
2923 9 : snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
2924 :
2925 : /*
2926 : * The standby server must not be running. If the server is started under
2927 : * service manager and pg_createsubscriber stops it, the service manager
2928 : * might react to this action and start the server again. Therefore,
2929 : * refuse to proceed if the server is running to avoid possible failures.
2930 : */
2931 9 : if (stat(pidfile, &statbuf) == 0)
2932 : {
2933 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
2934 : "standby server is running");
2935 1 : report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
2936 : "Stop the standby server and try again.");
2937 1 : exit(1);
2938 : }
2939 :
2940 : /*
2941 : * Start a short-lived standby server with temporary parameters (provided
2942 : * by command-line options). The goal is to avoid connections during the
2943 : * transformation steps.
2944 : */
2945 8 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2946 : "starting the standby server with command-line options");
2947 8 : start_standby_server(&opt, true, false);
2948 :
2949 : /* Check if the standby server is ready for logical replication */
2950 8 : check_subscriber(dbinfos.dbinfo);
2951 :
2952 : /* Check if the primary server is ready for logical replication */
2953 6 : check_publisher(dbinfos.dbinfo);
2954 :
2955 : /*
2956 : * Stop the target server. The recovery process requires that the server
2957 : * reaches a consistent state before targeting the recovery stop point.
2958 : * Make sure a consistent state is reached (stopping the target server
2959 : * guarantees it) *before* creating the replication slots in
2960 : * setup_publisher().
2961 : */
2962 4 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2963 : "stopping the subscriber");
2964 4 : stop_standby_server(subscriber_dir);
2965 :
2966 : /* Create the required objects for each database on publisher */
2967 4 : consistent_lsn = setup_publisher(dbinfos.dbinfo);
2968 :
2969 : /* Write the required recovery parameters */
2970 4 : setup_recovery(dbinfos.dbinfo, subscriber_dir, consistent_lsn);
2971 :
2972 : /*
2973 : * Start subscriber so the recovery parameters will take effect. Wait
2974 : * until accepting connections. We don't want to start logical replication
2975 : * during setup.
2976 : */
2977 4 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
2978 : "starting the subscriber");
2979 4 : start_standby_server(&opt, true, true);
2980 :
2981 : /* Wait for the subscriber to be promoted */
2982 4 : wait_for_end_recovery(dbinfos.dbinfo[0].subconninfo, &opt);
2983 :
2984 : /*
2985 : * Create the subscription for each database on subscriber. It does not
2986 : * enable it immediately because it needs to adjust the replication start
2987 : * point to the LSN reported by setup_publisher(). It also cleans up
2988 : * publications created by this tool and replication to the standby.
2989 : */
2990 4 : setup_subscriber(dbinfos.dbinfo, consistent_lsn);
2991 :
2992 : /* Remove primary_slot_name if it exists on primary */
2993 4 : drop_primary_replication_slot(dbinfos.dbinfo, primary_slot_name);
2994 :
2995 : /* Remove failover replication slots if they exist on subscriber */
2996 4 : drop_failover_replication_slots(dbinfos.dbinfo);
2997 :
2998 : /* Stop the subscriber */
2999 4 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
3000 : "stopping the subscriber");
3001 4 : stop_standby_server(subscriber_dir);
3002 :
3003 : /* Change the system identifier of the subscriber */
3004 4 : modify_subscriber_sysid(&opt);
3005 :
3006 4 : success = true;
3007 :
3008 4 : report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
3009 : "Done!");
3010 :
3011 4 : return 0;
3012 : }
|