Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_createsubscriber.c
4 : * Create a new logical replica from a standby server
5 : *
6 : * Copyright (c) 2024-2025, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/bin/pg_basebackup/pg_createsubscriber.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 :
14 : #include "postgres_fe.h"
15 :
16 : #include <sys/stat.h>
17 : #include <sys/time.h>
18 : #include <sys/wait.h>
19 : #include <time.h>
20 :
21 : #include "common/connect.h"
22 : #include "common/controldata_utils.h"
23 : #include "common/logging.h"
24 : #include "common/pg_prng.h"
25 : #include "common/restricted_token.h"
26 : #include "fe_utils/recovery_gen.h"
27 : #include "fe_utils/simple_list.h"
28 : #include "fe_utils/string_utils.h"
29 : #include "getopt_long.h"
30 :
31 : #define DEFAULT_SUB_PORT "50432"
32 : #define OBJECTTYPE_PUBLICATIONS 0x0001
33 :
34 : /* Command-line options */
35 : struct CreateSubscriberOptions
36 : {
37 : char *config_file; /* configuration file */
38 : char *pub_conninfo_str; /* publisher connection string */
39 : char *socket_dir; /* directory for Unix-domain socket, if any */
40 : char *sub_port; /* subscriber port number */
41 : const char *sub_username; /* subscriber username */
42 : bool two_phase; /* enable-two-phase option */
43 : SimpleStringList database_names; /* list of database names */
44 : SimpleStringList pub_names; /* list of publication names */
45 : SimpleStringList sub_names; /* list of subscription names */
46 : SimpleStringList replslot_names; /* list of replication slot names */
47 : int recovery_timeout; /* stop recovery after this time */
48 : SimpleStringList objecttypes_to_remove; /* list of object types to remove */
49 : };
50 :
/*
 * Publication/subscription details kept for one database.
 *
 * The two made_* flags record what this program created on the primary so
 * that the exit-time cleanup knows which objects it may drop.
 */
struct LogicalRepInfo
{
	char	   *dbname;			/* name of the database */
	char	   *pubconninfo;	/* connection string for the publisher side */
	char	   *subconninfo;	/* connection string for the subscriber side */
	char	   *pubname;		/* name of the publication */
	char	   *subname;		/* name of the subscription */
	char	   *replslotname;	/* name of the replication slot */

	bool		made_replslot;	/* true once the replication slot exists */
	bool		made_publication;	/* true once the publication exists */
};
64 :
65 : /*
66 : * Information shared across all the databases (or publications and
67 : * subscriptions).
68 : */
69 : struct LogicalRepInfos
70 : {
71 : struct LogicalRepInfo *dbinfo;
72 : bool two_phase; /* enable-two-phase option */
73 : bits32 objecttypes_to_remove; /* flags indicating which object types
74 : * to remove on subscriber */
75 : };
76 :
77 : static void cleanup_objects_atexit(void);
78 : static void usage();
79 : static char *get_base_conninfo(const char *conninfo, char **dbname);
80 : static char *get_sub_conninfo(const struct CreateSubscriberOptions *opt);
81 : static char *get_exec_path(const char *argv0, const char *progname);
82 : static void check_data_directory(const char *datadir);
83 : static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
84 : static struct LogicalRepInfo *store_pub_sub_info(const struct CreateSubscriberOptions *opt,
85 : const char *pub_base_conninfo,
86 : const char *sub_base_conninfo);
87 : static PGconn *connect_database(const char *conninfo, bool exit_on_error);
88 : static void disconnect_database(PGconn *conn, bool exit_on_error);
89 : static uint64 get_primary_sysid(const char *conninfo);
90 : static uint64 get_standby_sysid(const char *datadir);
91 : static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
92 : static bool server_is_in_recovery(PGconn *conn);
93 : static char *generate_object_name(PGconn *conn);
94 : static void check_publisher(const struct LogicalRepInfo *dbinfo);
95 : static char *setup_publisher(struct LogicalRepInfo *dbinfo);
96 : static void check_subscriber(const struct LogicalRepInfo *dbinfo);
97 : static void setup_subscriber(struct LogicalRepInfo *dbinfo,
98 : const char *consistent_lsn);
99 : static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
100 : const char *lsn);
101 : static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
102 : const char *slotname);
103 : static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo);
104 : static char *create_logical_replication_slot(PGconn *conn,
105 : struct LogicalRepInfo *dbinfo);
106 : static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
107 : const char *slot_name);
108 : static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
109 : static void start_standby_server(const struct CreateSubscriberOptions *opt,
110 : bool restricted_access,
111 : bool restrict_logical_worker);
112 : static void stop_standby_server(const char *datadir);
113 : static void wait_for_end_recovery(const char *conninfo,
114 : const struct CreateSubscriberOptions *opt);
115 : static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
116 : static void drop_publication(PGconn *conn, const char *pubname,
117 : const char *dbname, bool *made_publication);
118 : static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
119 : static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
120 : static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
121 : const char *lsn);
122 : static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
123 : static void check_and_drop_existing_subscriptions(PGconn *conn,
124 : const struct LogicalRepInfo *dbinfo);
125 : static void drop_existing_subscriptions(PGconn *conn, const char *subname,
126 : const char *dbname);
127 :
128 : #define USEC_PER_SEC 1000000
129 : #define WAIT_INTERVAL 1 /* 1 second */
130 :
131 : static const char *progname;
132 :
133 : static char *primary_slot_name = NULL;
134 : static bool dry_run = false;
135 :
136 : static bool success = false;
137 :
138 : static struct LogicalRepInfos dbinfos;
139 : static int num_dbs = 0; /* number of specified databases */
140 : static int num_pubs = 0; /* number of specified publications */
141 : static int num_subs = 0; /* number of specified subscriptions */
142 : static int num_replslots = 0; /* number of specified replication slots */
143 :
144 : static pg_prng_state prng_state;
145 :
146 : static char *pg_ctl_path = NULL;
147 : static char *pg_resetwal_path = NULL;
148 :
149 : /* standby / subscriber data directory */
150 : static char *subscriber_dir = NULL;
151 :
152 : static bool recovery_ended = false;
153 : static bool standby_running = false;
154 :
155 : enum WaitPMResult
156 : {
157 : POSTMASTER_READY,
158 : POSTMASTER_STILL_STARTING
159 : };
160 :
161 :
162 : /*
163 : * Cleanup objects that were created by pg_createsubscriber if there is an
164 : * error.
165 : *
166 : * Publications and replication slots are created on primary. Depending on the
167 : * step it failed, it should remove the already created objects if it is
168 : * possible (sometimes it won't work due to a connection issue).
169 : * There is no cleanup on the target server. The steps on the target server are
170 : * executed *after* promotion, hence, at this point, a failure means recreate
171 : * the physical replica and start again.
172 : */
173 : static void
174 18 : cleanup_objects_atexit(void)
175 : {
176 18 : if (success)
177 6 : return;
178 :
179 : /*
180 : * If the server is promoted, there is no way to use the current setup
181 : * again. Warn the user that a new replication setup should be done before
182 : * trying again.
183 : */
184 12 : if (recovery_ended)
185 : {
186 0 : pg_log_warning("failed after the end of recovery");
187 0 : pg_log_warning_hint("The target server cannot be used as a physical replica anymore. "
188 : "You must recreate the physical replica before continuing.");
189 : }
190 :
191 36 : for (int i = 0; i < num_dbs; i++)
192 : {
193 24 : struct LogicalRepInfo *dbinfo = &dbinfos.dbinfo[i];
194 :
195 24 : if (dbinfo->made_publication || dbinfo->made_replslot)
196 : {
197 : PGconn *conn;
198 :
199 0 : conn = connect_database(dbinfo->pubconninfo, false);
200 0 : if (conn != NULL)
201 : {
202 0 : if (dbinfo->made_publication)
203 0 : drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
204 : &dbinfo->made_publication);
205 0 : if (dbinfo->made_replslot)
206 0 : drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
207 0 : disconnect_database(conn, false);
208 : }
209 : else
210 : {
211 : /*
212 : * If a connection could not be established, inform the user
213 : * that some objects were left on primary and should be
214 : * removed before trying again.
215 : */
216 0 : if (dbinfo->made_publication)
217 : {
218 0 : pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
219 : dbinfo->pubname,
220 : dbinfo->dbname);
221 0 : pg_log_warning_hint("Drop this publication before trying again.");
222 : }
223 0 : if (dbinfo->made_replslot)
224 : {
225 0 : pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
226 : dbinfo->replslotname,
227 : dbinfo->dbname);
228 0 : pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
229 : }
230 : }
231 : }
232 : }
233 :
234 12 : if (standby_running)
235 8 : stop_standby_server(subscriber_dir);
236 : }
237 :
238 : static void
239 2 : usage(void)
240 : {
241 2 : printf(_("%s creates a new logical replica from a standby server.\n\n"),
242 : progname);
243 2 : printf(_("Usage:\n"));
244 2 : printf(_(" %s [OPTION]...\n"), progname);
245 2 : printf(_("\nOptions:\n"));
246 2 : printf(_(" -d, --database=DBNAME database in which to create a subscription\n"));
247 2 : printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
248 2 : printf(_(" -n, --dry-run dry run, just show what would be done\n"));
249 2 : printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
250 2 : printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
251 2 : printf(_(" -R, --remove=OBJECTTYPE remove all objects of the specified type from specified\n"
252 : " databases on the subscriber; accepts: publications\n"));
253 2 : printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
254 2 : printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
255 2 : printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));
256 2 : printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
257 2 : printf(_(" -v, --verbose output verbose messages\n"));
258 2 : printf(_(" --config-file=FILENAME use specified main server configuration\n"
259 : " file when running target cluster\n"));
260 2 : printf(_(" --publication=NAME publication name\n"));
261 2 : printf(_(" --replication-slot=NAME replication slot name\n"));
262 2 : printf(_(" --subscription=NAME subscription name\n"));
263 2 : printf(_(" -V, --version output version information, then exit\n"));
264 2 : printf(_(" -?, --help show this help, then exit\n"));
265 2 : printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
266 2 : printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
267 2 : }
268 :
269 : /*
270 : * Subroutine to append "keyword=value" to a connection string,
271 : * with proper quoting of the value. (We assume keywords don't need that.)
272 : */
273 : static void
274 190 : appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
275 : {
276 190 : if (buf->len > 0)
277 138 : appendPQExpBufferChar(buf, ' ');
278 190 : appendPQExpBufferStr(buf, keyword);
279 190 : appendPQExpBufferChar(buf, '=');
280 190 : appendConnStrVal(buf, val);
281 190 : }
282 :
283 : /*
284 : * Validate a connection string. Returns a base connection string that is a
285 : * connection string without a database name.
286 : *
287 : * Since we might process multiple databases, each database name will be
288 : * appended to this base connection string to provide a final connection
289 : * string. If the second argument (dbname) is not null, returns dbname if the
290 : * provided connection string contains it.
291 : *
292 : * It is the caller's responsibility to free the returned connection string and
293 : * dbname.
294 : */
295 : static char *
296 26 : get_base_conninfo(const char *conninfo, char **dbname)
297 : {
298 : PQExpBuffer buf;
299 : PQconninfoOption *conn_opts;
300 : PQconninfoOption *conn_opt;
301 26 : char *errmsg = NULL;
302 : char *ret;
303 :
304 26 : conn_opts = PQconninfoParse(conninfo, &errmsg);
305 26 : if (conn_opts == NULL)
306 : {
307 0 : pg_log_error("could not parse connection string: %s", errmsg);
308 0 : PQfreemem(errmsg);
309 0 : return NULL;
310 : }
311 :
312 26 : buf = createPQExpBuffer();
313 1248 : for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
314 : {
315 1222 : if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
316 : {
317 62 : if (strcmp(conn_opt->keyword, "dbname") == 0)
318 : {
319 18 : if (dbname)
320 18 : *dbname = pg_strdup(conn_opt->val);
321 18 : continue;
322 : }
323 44 : appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
324 : }
325 : }
326 :
327 26 : ret = pg_strdup(buf->data);
328 :
329 26 : destroyPQExpBuffer(buf);
330 26 : PQconninfoFree(conn_opts);
331 :
332 26 : return ret;
333 : }
334 :
335 : /*
336 : * Build a subscriber connection string. Only a few parameters are supported
337 : * since it starts a server with restricted access.
338 : */
339 : static char *
340 26 : get_sub_conninfo(const struct CreateSubscriberOptions *opt)
341 : {
342 26 : PQExpBuffer buf = createPQExpBuffer();
343 : char *ret;
344 :
345 26 : appendConnStrItem(buf, "port", opt->sub_port);
346 : #if !defined(WIN32)
347 26 : appendConnStrItem(buf, "host", opt->socket_dir);
348 : #endif
349 26 : if (opt->sub_username != NULL)
350 0 : appendConnStrItem(buf, "user", opt->sub_username);
351 26 : appendConnStrItem(buf, "fallback_application_name", progname);
352 :
353 26 : ret = pg_strdup(buf->data);
354 :
355 26 : destroyPQExpBuffer(buf);
356 :
357 26 : return ret;
358 : }
359 :
360 : /*
361 : * Verify if a PostgreSQL binary (progname) is available in the same directory as
362 : * pg_createsubscriber and it has the same version. It returns the absolute
363 : * path of the progname.
364 : */
365 : static char *
366 36 : get_exec_path(const char *argv0, const char *progname)
367 : {
368 : char *versionstr;
369 : char *exec_path;
370 : int ret;
371 :
372 36 : versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
373 36 : exec_path = pg_malloc(MAXPGPATH);
374 36 : ret = find_other_exec(argv0, progname, versionstr, exec_path);
375 :
376 36 : if (ret < 0)
377 : {
378 : char full_path[MAXPGPATH];
379 :
380 0 : if (find_my_exec(argv0, full_path) < 0)
381 0 : strlcpy(full_path, progname, sizeof(full_path));
382 :
383 0 : if (ret == -1)
384 0 : pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
385 : progname, "pg_createsubscriber", full_path);
386 : else
387 0 : pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
388 : progname, full_path, "pg_createsubscriber");
389 : }
390 :
391 36 : pg_log_debug("%s path is: %s", progname, exec_path);
392 :
393 36 : return exec_path;
394 : }
395 :
396 : /*
397 : * Is it a cluster directory? These are preliminary checks. It is far from
398 : * making an accurate check. If it is not a clone from the publisher, it will
399 : * eventually fail in a future step.
400 : */
401 : static void
402 18 : check_data_directory(const char *datadir)
403 : {
404 : struct stat statbuf;
405 : char versionfile[MAXPGPATH];
406 :
407 18 : pg_log_info("checking if directory \"%s\" is a cluster data directory",
408 : datadir);
409 :
410 18 : if (stat(datadir, &statbuf) != 0)
411 : {
412 0 : if (errno == ENOENT)
413 0 : pg_fatal("data directory \"%s\" does not exist", datadir);
414 : else
415 0 : pg_fatal("could not access directory \"%s\": %m", datadir);
416 : }
417 :
418 18 : snprintf(versionfile, MAXPGPATH, "%s/PG_VERSION", datadir);
419 18 : if (stat(versionfile, &statbuf) != 0 && errno == ENOENT)
420 : {
421 0 : pg_fatal("directory \"%s\" is not a database cluster directory",
422 : datadir);
423 : }
424 18 : }
425 :
426 : /*
427 : * Append database name into a base connection string.
428 : *
429 : * dbname is the only parameter that changes so it is not included in the base
430 : * connection string. This function concatenates dbname to build a "real"
431 : * connection string.
432 : */
433 : static char *
434 68 : concat_conninfo_dbname(const char *conninfo, const char *dbname)
435 : {
436 68 : PQExpBuffer buf = createPQExpBuffer();
437 : char *ret;
438 :
439 : Assert(conninfo != NULL);
440 :
441 68 : appendPQExpBufferStr(buf, conninfo);
442 68 : appendConnStrItem(buf, "dbname", dbname);
443 :
444 68 : ret = pg_strdup(buf->data);
445 68 : destroyPQExpBuffer(buf);
446 :
447 68 : return ret;
448 : }
449 :
450 : /*
451 : * Store publication and subscription information.
452 : *
453 : * If publication, replication slot and subscription names were specified,
454 : * store it here. Otherwise, a generated name will be assigned to the object in
455 : * setup_publisher().
456 : */
457 : static struct LogicalRepInfo *
458 18 : store_pub_sub_info(const struct CreateSubscriberOptions *opt,
459 : const char *pub_base_conninfo,
460 : const char *sub_base_conninfo)
461 : {
462 : struct LogicalRepInfo *dbinfo;
463 18 : SimpleStringListCell *pubcell = NULL;
464 18 : SimpleStringListCell *subcell = NULL;
465 18 : SimpleStringListCell *replslotcell = NULL;
466 18 : int i = 0;
467 :
468 18 : dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
469 :
470 18 : if (num_pubs > 0)
471 4 : pubcell = opt->pub_names.head;
472 18 : if (num_subs > 0)
473 2 : subcell = opt->sub_names.head;
474 18 : if (num_replslots > 0)
475 4 : replslotcell = opt->replslot_names.head;
476 :
477 52 : for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
478 : {
479 : char *conninfo;
480 :
481 : /* Fill publisher attributes */
482 34 : conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
483 34 : dbinfo[i].pubconninfo = conninfo;
484 34 : dbinfo[i].dbname = cell->val;
485 34 : if (num_pubs > 0)
486 8 : dbinfo[i].pubname = pubcell->val;
487 : else
488 26 : dbinfo[i].pubname = NULL;
489 34 : if (num_replslots > 0)
490 6 : dbinfo[i].replslotname = replslotcell->val;
491 : else
492 28 : dbinfo[i].replslotname = NULL;
493 34 : dbinfo[i].made_replslot = false;
494 34 : dbinfo[i].made_publication = false;
495 : /* Fill subscriber attributes */
496 34 : conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
497 34 : dbinfo[i].subconninfo = conninfo;
498 34 : if (num_subs > 0)
499 4 : dbinfo[i].subname = subcell->val;
500 : else
501 30 : dbinfo[i].subname = NULL;
502 : /* Other fields will be filled later */
503 :
504 34 : pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
505 : dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
506 : dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
507 : dbinfo[i].pubconninfo);
508 34 : pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
509 : dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
510 : dbinfo[i].subconninfo,
511 : dbinfos.two_phase ? "true" : "false");
512 :
513 34 : if (num_pubs > 0)
514 8 : pubcell = pubcell->next;
515 34 : if (num_subs > 0)
516 4 : subcell = subcell->next;
517 34 : if (num_replslots > 0)
518 6 : replslotcell = replslotcell->next;
519 :
520 34 : i++;
521 : }
522 :
523 18 : return dbinfo;
524 : }
525 :
526 : /*
527 : * Open a new connection. If exit_on_error is true, it has an undesired
528 : * condition and it should exit immediately.
529 : */
530 : static PGconn *
531 86 : connect_database(const char *conninfo, bool exit_on_error)
532 : {
533 : PGconn *conn;
534 : PGresult *res;
535 :
536 86 : conn = PQconnectdb(conninfo);
537 86 : if (PQstatus(conn) != CONNECTION_OK)
538 : {
539 0 : pg_log_error("connection to database failed: %s",
540 : PQerrorMessage(conn));
541 0 : PQfinish(conn);
542 :
543 0 : if (exit_on_error)
544 0 : exit(1);
545 0 : return NULL;
546 : }
547 :
548 : /* Secure search_path */
549 86 : res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
550 86 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
551 : {
552 0 : pg_log_error("could not clear \"search_path\": %s",
553 : PQresultErrorMessage(res));
554 0 : PQclear(res);
555 0 : PQfinish(conn);
556 :
557 0 : if (exit_on_error)
558 0 : exit(1);
559 0 : return NULL;
560 : }
561 86 : PQclear(res);
562 :
563 86 : return conn;
564 : }
565 :
566 : /*
567 : * Close the connection. If exit_on_error is true, it has an undesired
568 : * condition and it should exit immediately.
569 : */
570 : static void
571 86 : disconnect_database(PGconn *conn, bool exit_on_error)
572 : {
573 : Assert(conn != NULL);
574 :
575 86 : PQfinish(conn);
576 :
577 86 : if (exit_on_error)
578 4 : exit(1);
579 82 : }
580 :
581 : /*
582 : * Obtain the system identifier using the provided connection. It will be used
583 : * to compare if a data directory is a clone of another one.
584 : */
585 : static uint64
586 18 : get_primary_sysid(const char *conninfo)
587 : {
588 : PGconn *conn;
589 : PGresult *res;
590 : uint64 sysid;
591 :
592 18 : pg_log_info("getting system identifier from publisher");
593 :
594 18 : conn = connect_database(conninfo, true);
595 :
596 18 : res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
597 18 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
598 : {
599 0 : pg_log_error("could not get system identifier: %s",
600 : PQresultErrorMessage(res));
601 0 : disconnect_database(conn, true);
602 : }
603 18 : if (PQntuples(res) != 1)
604 : {
605 0 : pg_log_error("could not get system identifier: got %d rows, expected %d row",
606 : PQntuples(res), 1);
607 0 : disconnect_database(conn, true);
608 : }
609 :
610 18 : sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
611 :
612 18 : pg_log_info("system identifier is %llu on publisher",
613 : (unsigned long long) sysid);
614 :
615 18 : PQclear(res);
616 18 : disconnect_database(conn, false);
617 :
618 18 : return sysid;
619 : }
620 :
621 : /*
622 : * Obtain the system identifier from control file. It will be used to compare
623 : * if a data directory is a clone of another one. This routine is used locally
624 : * and avoids a connection.
625 : */
626 : static uint64
627 18 : get_standby_sysid(const char *datadir)
628 : {
629 : ControlFileData *cf;
630 : bool crc_ok;
631 : uint64 sysid;
632 :
633 18 : pg_log_info("getting system identifier from subscriber");
634 :
635 18 : cf = get_controlfile(datadir, &crc_ok);
636 18 : if (!crc_ok)
637 0 : pg_fatal("control file appears to be corrupt");
638 :
639 18 : sysid = cf->system_identifier;
640 :
641 18 : pg_log_info("system identifier is %llu on subscriber",
642 : (unsigned long long) sysid);
643 :
644 18 : pg_free(cf);
645 :
646 18 : return sysid;
647 : }
648 :
649 : /*
650 : * Modify the system identifier. Since a standby server preserves the system
651 : * identifier, it makes sense to change it to avoid situations in which WAL
652 : * files from one of the systems might be used in the other one.
653 : */
654 : static void
655 6 : modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
656 : {
657 : ControlFileData *cf;
658 : bool crc_ok;
659 : struct timeval tv;
660 :
661 : char *cmd_str;
662 :
663 6 : pg_log_info("modifying system identifier of subscriber");
664 :
665 6 : cf = get_controlfile(subscriber_dir, &crc_ok);
666 6 : if (!crc_ok)
667 0 : pg_fatal("control file appears to be corrupt");
668 :
669 : /*
670 : * Select a new system identifier.
671 : *
672 : * XXX this code was extracted from BootStrapXLOG().
673 : */
674 6 : gettimeofday(&tv, NULL);
675 6 : cf->system_identifier = ((uint64) tv.tv_sec) << 32;
676 6 : cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
677 6 : cf->system_identifier |= getpid() & 0xFFF;
678 :
679 6 : if (!dry_run)
680 2 : update_controlfile(subscriber_dir, cf, true);
681 :
682 6 : pg_log_info("system identifier is %llu on subscriber",
683 : (unsigned long long) cf->system_identifier);
684 :
685 6 : pg_log_info("running pg_resetwal on the subscriber");
686 :
687 6 : cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
688 : subscriber_dir, DEVNULL);
689 :
690 6 : pg_log_debug("pg_resetwal command is: %s", cmd_str);
691 :
692 6 : if (!dry_run)
693 : {
694 2 : int rc = system(cmd_str);
695 :
696 2 : if (rc == 0)
697 2 : pg_log_info("subscriber successfully changed the system identifier");
698 : else
699 0 : pg_fatal("could not change system identifier of subscriber: %s", wait_result_to_str(rc));
700 : }
701 :
702 6 : pg_free(cf);
703 6 : }
704 :
705 : /*
706 : * Generate an object name using a prefix, database oid and a random integer.
707 : * It is used in case the user does not specify an object name (publication,
708 : * subscription, replication slot).
709 : */
710 : static char *
711 10 : generate_object_name(PGconn *conn)
712 : {
713 : PGresult *res;
714 : Oid oid;
715 : uint32 rand;
716 : char *objname;
717 :
718 10 : res = PQexec(conn,
719 : "SELECT oid FROM pg_catalog.pg_database "
720 : "WHERE datname = pg_catalog.current_database()");
721 10 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
722 : {
723 0 : pg_log_error("could not obtain database OID: %s",
724 : PQresultErrorMessage(res));
725 0 : disconnect_database(conn, true);
726 : }
727 :
728 10 : if (PQntuples(res) != 1)
729 : {
730 0 : pg_log_error("could not obtain database OID: got %d rows, expected %d row",
731 : PQntuples(res), 1);
732 0 : disconnect_database(conn, true);
733 : }
734 :
735 : /* Database OID */
736 10 : oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
737 :
738 10 : PQclear(res);
739 :
740 : /* Random unsigned integer */
741 10 : rand = pg_prng_uint32(&prng_state);
742 :
743 : /*
744 : * Build the object name. The name must not exceed NAMEDATALEN - 1. This
745 : * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
746 : * '\0').
747 : */
748 10 : objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
749 :
750 10 : return objname;
751 : }
752 :
753 : /*
754 : * Create the publications and replication slots in preparation for logical
755 : * replication. Returns the LSN from latest replication slot. It will be the
756 : * replication start point that is used to adjust the subscriptions (see
757 : * set_replication_progress).
758 : */
759 : static char *
760 6 : setup_publisher(struct LogicalRepInfo *dbinfo)
761 : {
762 6 : char *lsn = NULL;
763 :
764 6 : pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
765 :
766 16 : for (int i = 0; i < num_dbs; i++)
767 : {
768 : PGconn *conn;
769 10 : char *genname = NULL;
770 :
771 10 : conn = connect_database(dbinfo[i].pubconninfo, true);
772 :
773 : /*
774 : * If an object name was not specified as command-line options, assign
775 : * a generated object name. The replication slot has a different rule.
776 : * The subscription name is assigned to the replication slot name if
777 : * no replication slot is specified. It follows the same rule as
778 : * CREATE SUBSCRIPTION.
779 : */
780 10 : if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
781 10 : genname = generate_object_name(conn);
782 10 : if (num_pubs == 0)
783 2 : dbinfo[i].pubname = pg_strdup(genname);
784 10 : if (num_subs == 0)
785 6 : dbinfo[i].subname = pg_strdup(genname);
786 10 : if (num_replslots == 0)
787 4 : dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
788 :
789 : /*
790 : * Create publication on publisher. This step should be executed
791 : * *before* promoting the subscriber to avoid any transactions between
792 : * consistent LSN and the new publication rows (such transactions
793 : * wouldn't see the new publication rows resulting in an error).
794 : */
795 10 : create_publication(conn, &dbinfo[i]);
796 :
797 : /* Create replication slot on publisher */
798 10 : if (lsn)
799 2 : pg_free(lsn);
800 10 : lsn = create_logical_replication_slot(conn, &dbinfo[i]);
801 10 : if (lsn != NULL || dry_run)
802 10 : pg_log_info("create replication slot \"%s\" on publisher",
803 : dbinfo[i].replslotname);
804 : else
805 0 : exit(1);
806 :
807 : /*
808 : * Since we are using the LSN returned by the last replication slot as
809 : * recovery_target_lsn, this LSN is ahead of the current WAL position
810 : * and the recovery waits until the publisher writes a WAL record to
811 : * reach the target and ends the recovery. On idle systems, this wait
812 : * time is unpredictable and could lead to failure in promoting the
813 : * subscriber. To avoid that, insert a harmless WAL record.
814 : */
815 10 : if (i == num_dbs - 1 && !dry_run)
816 : {
817 : PGresult *res;
818 :
819 2 : res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
820 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
821 : {
822 0 : pg_log_error("could not write an additional WAL record: %s",
823 : PQresultErrorMessage(res));
824 0 : disconnect_database(conn, true);
825 : }
826 2 : PQclear(res);
827 : }
828 :
829 10 : disconnect_database(conn, false);
830 : }
831 :
832 6 : return lsn;
833 : }
834 :
835 : /*
836 : * Is recovery still in progress?
837 : */
838 : static bool
839 30 : server_is_in_recovery(PGconn *conn)
840 : {
841 : PGresult *res;
842 : int ret;
843 :
844 30 : res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
845 :
846 30 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
847 : {
848 0 : pg_log_error("could not obtain recovery progress: %s",
849 : PQresultErrorMessage(res));
850 0 : disconnect_database(conn, true);
851 : }
852 :
853 :
854 30 : ret = strcmp("t", PQgetvalue(res, 0, 0));
855 :
856 30 : PQclear(res);
857 :
858 30 : return ret == 0;
859 : }
860 :
861 : /*
862 : * Is the primary server ready for logical replication?
863 : *
864 : * XXX Does it not allow a synchronous replica?
865 : */
866 : static void
867 10 : check_publisher(const struct LogicalRepInfo *dbinfo)
868 : {
869 : PGconn *conn;
870 : PGresult *res;
871 10 : bool failed = false;
872 :
873 : char *wal_level;
874 : int max_repslots;
875 : int cur_repslots;
876 : int max_walsenders;
877 : int cur_walsenders;
878 : int max_prepared_transactions;
879 : char *max_slot_wal_keep_size;
880 :
881 10 : pg_log_info("checking settings on publisher");
882 :
883 10 : conn = connect_database(dbinfo[0].pubconninfo, true);
884 :
885 : /*
886 : * If the primary server is in recovery (i.e. cascading replication),
887 : * objects (publication) cannot be created because it is read only.
888 : */
889 10 : if (server_is_in_recovery(conn))
890 : {
891 2 : pg_log_error("primary server cannot be in recovery");
892 2 : disconnect_database(conn, true);
893 : }
894 :
895 : /*------------------------------------------------------------------------
896 : * Logical replication requires a few parameters to be set on publisher.
897 : * Since these parameters are not a requirement for physical replication,
898 : * we should check it to make sure it won't fail.
899 : *
900 : * - wal_level = logical
901 : * - max_replication_slots >= current + number of dbs to be converted
902 : * - max_wal_senders >= current + number of dbs to be converted
903 : * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files)
904 : * -----------------------------------------------------------------------
905 : */
906 8 : res = PQexec(conn,
907 : "SELECT pg_catalog.current_setting('wal_level'),"
908 : " pg_catalog.current_setting('max_replication_slots'),"
909 : " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
910 : " pg_catalog.current_setting('max_wal_senders'),"
911 : " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
912 : " pg_catalog.current_setting('max_prepared_transactions'),"
913 : " pg_catalog.current_setting('max_slot_wal_keep_size')");
914 :
915 8 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
916 : {
917 0 : pg_log_error("could not obtain publisher settings: %s",
918 : PQresultErrorMessage(res));
919 0 : disconnect_database(conn, true);
920 : }
921 :
922 8 : wal_level = pg_strdup(PQgetvalue(res, 0, 0));
923 8 : max_repslots = atoi(PQgetvalue(res, 0, 1));
924 8 : cur_repslots = atoi(PQgetvalue(res, 0, 2));
925 8 : max_walsenders = atoi(PQgetvalue(res, 0, 3));
926 8 : cur_walsenders = atoi(PQgetvalue(res, 0, 4));
927 8 : max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));
928 8 : max_slot_wal_keep_size = pg_strdup(PQgetvalue(res, 0, 6));
929 :
930 8 : PQclear(res);
931 :
932 8 : pg_log_debug("publisher: wal_level: %s", wal_level);
933 8 : pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
934 8 : pg_log_debug("publisher: current replication slots: %d", cur_repslots);
935 8 : pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
936 8 : pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
937 8 : pg_log_debug("publisher: max_prepared_transactions: %d",
938 : max_prepared_transactions);
939 8 : pg_log_debug("publisher: max_slot_wal_keep_size: %s",
940 : max_slot_wal_keep_size);
941 :
942 8 : disconnect_database(conn, false);
943 :
944 8 : if (strcmp(wal_level, "logical") != 0)
945 : {
946 2 : pg_log_error("publisher requires \"wal_level\" >= \"logical\"");
947 2 : failed = true;
948 : }
949 :
950 8 : if (max_repslots - cur_repslots < num_dbs)
951 : {
952 2 : pg_log_error("publisher requires %d replication slots, but only %d remain",
953 : num_dbs, max_repslots - cur_repslots);
954 2 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
955 : "max_replication_slots", cur_repslots + num_dbs);
956 2 : failed = true;
957 : }
958 :
959 8 : if (max_walsenders - cur_walsenders < num_dbs)
960 : {
961 2 : pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
962 : num_dbs, max_walsenders - cur_walsenders);
963 2 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
964 : "max_wal_senders", cur_walsenders + num_dbs);
965 2 : failed = true;
966 : }
967 :
968 8 : if (max_prepared_transactions != 0 && !dbinfos.two_phase)
969 : {
970 0 : pg_log_warning("two_phase option will not be enabled for replication slots");
971 0 : pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "
972 : "Prepared transactions will be replicated at COMMIT PREPARED.");
973 0 : pg_log_warning_hint("You can use --enable-two-phase switch to enable two_phase.");
974 : }
975 :
976 : /*
977 : * Validate 'max_slot_wal_keep_size'. If this parameter is set to a
978 : * non-default value, it may cause replication failures due to required
979 : * WAL files being prematurely removed.
980 : */
981 8 : if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
982 : {
983 0 : pg_log_warning("required WAL could be removed from the publisher");
984 0 : pg_log_warning_hint("Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
985 : "max_slot_wal_keep_size");
986 : }
987 :
988 8 : pg_free(wal_level);
989 :
990 8 : if (failed)
991 2 : exit(1);
992 6 : }
993 :
994 : /*
995 : * Is the standby server ready for logical replication?
996 : *
997 : * XXX Does it not allow a time-delayed replica?
998 : *
999 : * XXX In a cascaded replication scenario (P -> S -> C), if the target server
1000 : * is S, it cannot detect there is a replica (server C) because server S starts
1001 : * accepting only local connections and server C cannot connect to it. Hence,
1002 : * there is not a reliable way to provide a suitable error saying the server C
1003 : * will be broken at the end of this process (due to pg_resetwal).
1004 : */
1005 : static void
1006 14 : check_subscriber(const struct LogicalRepInfo *dbinfo)
1007 : {
1008 : PGconn *conn;
1009 : PGresult *res;
1010 14 : bool failed = false;
1011 :
1012 : int max_lrworkers;
1013 : int max_reporigins;
1014 : int max_wprocs;
1015 :
1016 14 : pg_log_info("checking settings on subscriber");
1017 :
1018 14 : conn = connect_database(dbinfo[0].subconninfo, true);
1019 :
1020 : /* The target server must be a standby */
1021 14 : if (!server_is_in_recovery(conn))
1022 : {
1023 2 : pg_log_error("target server must be a standby");
1024 2 : disconnect_database(conn, true);
1025 : }
1026 :
1027 : /*------------------------------------------------------------------------
1028 : * Logical replication requires a few parameters to be set on subscriber.
1029 : * Since these parameters are not a requirement for physical replication,
1030 : * we should check it to make sure it won't fail.
1031 : *
1032 : * - max_active_replication_origins >= number of dbs to be converted
1033 : * - max_logical_replication_workers >= number of dbs to be converted
1034 : * - max_worker_processes >= 1 + number of dbs to be converted
1035 : *------------------------------------------------------------------------
1036 : */
1037 12 : res = PQexec(conn,
1038 : "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
1039 : "'max_logical_replication_workers', "
1040 : "'max_active_replication_origins', "
1041 : "'max_worker_processes', "
1042 : "'primary_slot_name') "
1043 : "ORDER BY name");
1044 :
1045 12 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1046 : {
1047 0 : pg_log_error("could not obtain subscriber settings: %s",
1048 : PQresultErrorMessage(res));
1049 0 : disconnect_database(conn, true);
1050 : }
1051 :
1052 12 : max_reporigins = atoi(PQgetvalue(res, 0, 0));
1053 12 : max_lrworkers = atoi(PQgetvalue(res, 1, 0));
1054 12 : max_wprocs = atoi(PQgetvalue(res, 2, 0));
1055 12 : if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
1056 10 : primary_slot_name = pg_strdup(PQgetvalue(res, 3, 0));
1057 :
1058 12 : pg_log_debug("subscriber: max_logical_replication_workers: %d",
1059 : max_lrworkers);
1060 12 : pg_log_debug("subscriber: max_active_replication_origins: %d", max_reporigins);
1061 12 : pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
1062 12 : if (primary_slot_name)
1063 10 : pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
1064 :
1065 12 : PQclear(res);
1066 :
1067 12 : disconnect_database(conn, false);
1068 :
1069 12 : if (max_reporigins < num_dbs)
1070 : {
1071 2 : pg_log_error("subscriber requires %d active replication origins, but only %d remain",
1072 : num_dbs, max_reporigins);
1073 2 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1074 : "max_active_replication_origins", num_dbs);
1075 2 : failed = true;
1076 : }
1077 :
1078 12 : if (max_lrworkers < num_dbs)
1079 : {
1080 2 : pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
1081 : num_dbs, max_lrworkers);
1082 2 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1083 : "max_logical_replication_workers", num_dbs);
1084 2 : failed = true;
1085 : }
1086 :
1087 12 : if (max_wprocs < num_dbs + 1)
1088 : {
1089 2 : pg_log_error("subscriber requires %d worker processes, but only %d remain",
1090 : num_dbs + 1, max_wprocs);
1091 2 : pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1092 : "max_worker_processes", num_dbs + 1);
1093 2 : failed = true;
1094 : }
1095 :
1096 12 : if (failed)
1097 2 : exit(1);
1098 10 : }
1099 :
1100 : /*
1101 : * Drop a specified subscription. This is to avoid duplicate subscriptions on
1102 : * the primary (publisher node) and the newly created subscriber. We
1103 : * shouldn't drop the associated slot as that would be used by the publisher
1104 : * node.
1105 : */
1106 : static void
1107 6 : drop_existing_subscriptions(PGconn *conn, const char *subname, const char *dbname)
1108 : {
1109 6 : PQExpBuffer query = createPQExpBuffer();
1110 : PGresult *res;
1111 :
1112 : Assert(conn != NULL);
1113 :
1114 : /*
1115 : * Construct a query string. These commands are allowed to be executed
1116 : * within a transaction.
1117 : */
1118 6 : appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
1119 : subname);
1120 6 : appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
1121 : subname);
1122 6 : appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
1123 :
1124 6 : pg_log_info("dropping subscription \"%s\" in database \"%s\"",
1125 : subname, dbname);
1126 :
1127 6 : if (!dry_run)
1128 : {
1129 2 : res = PQexec(conn, query->data);
1130 :
1131 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1132 : {
1133 0 : pg_log_error("could not drop subscription \"%s\": %s",
1134 : subname, PQresultErrorMessage(res));
1135 0 : disconnect_database(conn, true);
1136 : }
1137 :
1138 2 : PQclear(res);
1139 : }
1140 :
1141 6 : destroyPQExpBuffer(query);
1142 6 : }
1143 :
1144 : /*
1145 : * Retrieve and drop the pre-existing subscriptions.
1146 : */
1147 : static void
1148 10 : check_and_drop_existing_subscriptions(PGconn *conn,
1149 : const struct LogicalRepInfo *dbinfo)
1150 : {
1151 10 : PQExpBuffer query = createPQExpBuffer();
1152 : char *dbname;
1153 : PGresult *res;
1154 :
1155 : Assert(conn != NULL);
1156 :
1157 10 : dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1158 :
1159 10 : appendPQExpBuffer(query,
1160 : "SELECT s.subname FROM pg_catalog.pg_subscription s "
1161 : "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1162 : "WHERE d.datname = %s",
1163 : dbname);
1164 10 : res = PQexec(conn, query->data);
1165 :
1166 10 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1167 : {
1168 0 : pg_log_error("could not obtain pre-existing subscriptions: %s",
1169 : PQresultErrorMessage(res));
1170 0 : disconnect_database(conn, true);
1171 : }
1172 :
1173 16 : for (int i = 0; i < PQntuples(res); i++)
1174 6 : drop_existing_subscriptions(conn, PQgetvalue(res, i, 0),
1175 6 : dbinfo->dbname);
1176 :
1177 10 : PQclear(res);
1178 10 : destroyPQExpBuffer(query);
1179 10 : PQfreemem(dbname);
1180 10 : }
1181 :
1182 : /*
1183 : * Create the subscriptions, adjust the initial location for logical
1184 : * replication and enable the subscriptions. That's the last step for logical
1185 : * replication setup.
1186 : */
1187 : static void
1188 6 : setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
1189 : {
1190 16 : for (int i = 0; i < num_dbs; i++)
1191 : {
1192 : PGconn *conn;
1193 :
1194 : /* Connect to subscriber. */
1195 10 : conn = connect_database(dbinfo[i].subconninfo, true);
1196 :
1197 : /*
1198 : * We don't need the pre-existing subscriptions on the newly formed
1199 : * subscriber. They can connect to other publisher nodes and either
1200 : * get some unwarranted data or can lead to ERRORs in connecting to
1201 : * such nodes.
1202 : */
1203 10 : check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
1204 :
1205 : /* Check and drop the required publications in the given database. */
1206 10 : check_and_drop_publications(conn, &dbinfo[i]);
1207 :
1208 10 : create_subscription(conn, &dbinfo[i]);
1209 :
1210 : /* Set the replication progress to the correct LSN */
1211 10 : set_replication_progress(conn, &dbinfo[i], consistent_lsn);
1212 :
1213 : /* Enable subscription */
1214 10 : enable_subscription(conn, &dbinfo[i]);
1215 :
1216 10 : disconnect_database(conn, false);
1217 : }
1218 6 : }
1219 :
1220 : /*
1221 : * Write the required recovery parameters.
1222 : */
1223 : static void
1224 6 : setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
1225 : {
1226 : PGconn *conn;
1227 : PQExpBuffer recoveryconfcontents;
1228 :
1229 : /*
1230 : * Despite of the recovery parameters will be written to the subscriber,
1231 : * use a publisher connection. The primary_conninfo is generated using the
1232 : * connection settings.
1233 : */
1234 6 : conn = connect_database(dbinfo[0].pubconninfo, true);
1235 :
1236 : /*
1237 : * Write recovery parameters.
1238 : *
1239 : * The subscriber is not running yet. In dry run mode, the recovery
1240 : * parameters *won't* be written. An invalid LSN is used for printing
1241 : * purposes. Additional recovery parameters are added here. It avoids
1242 : * unexpected behavior such as end of recovery as soon as a consistent
1243 : * state is reached (recovery_target) and failure due to multiple recovery
1244 : * targets (name, time, xid, LSN).
1245 : */
1246 6 : recoveryconfcontents = GenerateRecoveryConfig(conn, NULL, NULL);
1247 6 : appendPQExpBuffer(recoveryconfcontents, "recovery_target = ''\n");
1248 6 : appendPQExpBuffer(recoveryconfcontents,
1249 : "recovery_target_timeline = 'latest'\n");
1250 6 : appendPQExpBuffer(recoveryconfcontents,
1251 : "recovery_target_inclusive = true\n");
1252 6 : appendPQExpBuffer(recoveryconfcontents,
1253 : "recovery_target_action = promote\n");
1254 6 : appendPQExpBuffer(recoveryconfcontents, "recovery_target_name = ''\n");
1255 6 : appendPQExpBuffer(recoveryconfcontents, "recovery_target_time = ''\n");
1256 6 : appendPQExpBuffer(recoveryconfcontents, "recovery_target_xid = ''\n");
1257 :
1258 6 : if (dry_run)
1259 : {
1260 4 : appendPQExpBuffer(recoveryconfcontents, "# dry run mode");
1261 4 : appendPQExpBuffer(recoveryconfcontents,
1262 : "recovery_target_lsn = '%X/%X'\n",
1263 4 : LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
1264 : }
1265 : else
1266 : {
1267 2 : appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
1268 : lsn);
1269 2 : WriteRecoveryConfig(conn, datadir, recoveryconfcontents);
1270 : }
1271 6 : disconnect_database(conn, false);
1272 :
1273 6 : pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
1274 6 : }
1275 :
1276 : /*
1277 : * Drop physical replication slot on primary if the standby was using it. After
1278 : * the transformation, it has no use.
1279 : *
1280 : * XXX we might not fail here. Instead, we provide a warning so the user
1281 : * eventually drops this replication slot later.
1282 : */
1283 : static void
1284 6 : drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
1285 : {
1286 : PGconn *conn;
1287 :
1288 : /* Replication slot does not exist, do nothing */
1289 6 : if (!primary_slot_name)
1290 0 : return;
1291 :
1292 6 : conn = connect_database(dbinfo[0].pubconninfo, false);
1293 6 : if (conn != NULL)
1294 : {
1295 6 : drop_replication_slot(conn, &dbinfo[0], slotname);
1296 6 : disconnect_database(conn, false);
1297 : }
1298 : else
1299 : {
1300 0 : pg_log_warning("could not drop replication slot \"%s\" on primary",
1301 : slotname);
1302 0 : pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
1303 : }
1304 : }
1305 :
1306 : /*
1307 : * Drop failover replication slots on subscriber. After the transformation,
1308 : * they have no use.
1309 : *
1310 : * XXX We do not fail here. Instead, we provide a warning so the user can drop
1311 : * them later.
1312 : */
1313 : static void
1314 6 : drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
1315 : {
1316 : PGconn *conn;
1317 : PGresult *res;
1318 :
1319 6 : conn = connect_database(dbinfo[0].subconninfo, false);
1320 6 : if (conn != NULL)
1321 : {
1322 : /* Get failover replication slot names */
1323 6 : res = PQexec(conn,
1324 : "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
1325 :
1326 6 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
1327 : {
1328 : /* Remove failover replication slots from subscriber */
1329 12 : for (int i = 0; i < PQntuples(res); i++)
1330 6 : drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
1331 : }
1332 : else
1333 : {
1334 0 : pg_log_warning("could not obtain failover replication slot information: %s",
1335 : PQresultErrorMessage(res));
1336 0 : pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1337 : }
1338 :
1339 6 : PQclear(res);
1340 6 : disconnect_database(conn, false);
1341 : }
1342 : else
1343 : {
1344 0 : pg_log_warning("could not drop failover replication slot");
1345 0 : pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1346 : }
1347 6 : }
1348 :
1349 : /*
1350 : * Create a logical replication slot and returns a LSN.
1351 : *
1352 : * CreateReplicationSlot() is not used because it does not provide the one-row
1353 : * result set that contains the LSN.
1354 : */
1355 : static char *
1356 10 : create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
1357 : {
1358 10 : PQExpBuffer str = createPQExpBuffer();
1359 10 : PGresult *res = NULL;
1360 10 : const char *slot_name = dbinfo->replslotname;
1361 : char *slot_name_esc;
1362 10 : char *lsn = NULL;
1363 :
1364 : Assert(conn != NULL);
1365 :
1366 10 : pg_log_info("creating the replication slot \"%s\" in database \"%s\"",
1367 : slot_name, dbinfo->dbname);
1368 :
1369 10 : slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1370 :
1371 10 : appendPQExpBuffer(str,
1372 : "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
1373 : slot_name_esc,
1374 10 : dbinfos.two_phase ? "true" : "false");
1375 :
1376 10 : PQfreemem(slot_name_esc);
1377 :
1378 10 : pg_log_debug("command is: %s", str->data);
1379 :
1380 10 : if (!dry_run)
1381 : {
1382 4 : res = PQexec(conn, str->data);
1383 4 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1384 : {
1385 0 : pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
1386 : slot_name, dbinfo->dbname,
1387 : PQresultErrorMessage(res));
1388 0 : PQclear(res);
1389 0 : destroyPQExpBuffer(str);
1390 0 : return NULL;
1391 : }
1392 :
1393 4 : lsn = pg_strdup(PQgetvalue(res, 0, 0));
1394 4 : PQclear(res);
1395 : }
1396 :
1397 : /* For cleanup purposes */
1398 10 : dbinfo->made_replslot = true;
1399 :
1400 10 : destroyPQExpBuffer(str);
1401 :
1402 10 : return lsn;
1403 : }
1404 :
1405 : static void
1406 12 : drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
1407 : const char *slot_name)
1408 : {
1409 12 : PQExpBuffer str = createPQExpBuffer();
1410 : char *slot_name_esc;
1411 : PGresult *res;
1412 :
1413 : Assert(conn != NULL);
1414 :
1415 12 : pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
1416 : slot_name, dbinfo->dbname);
1417 :
1418 12 : slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1419 :
1420 12 : appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
1421 :
1422 12 : PQfreemem(slot_name_esc);
1423 :
1424 12 : pg_log_debug("command is: %s", str->data);
1425 :
1426 12 : if (!dry_run)
1427 : {
1428 4 : res = PQexec(conn, str->data);
1429 4 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1430 : {
1431 0 : pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
1432 : slot_name, dbinfo->dbname, PQresultErrorMessage(res));
1433 0 : dbinfo->made_replslot = false; /* don't try again. */
1434 : }
1435 :
1436 4 : PQclear(res);
1437 : }
1438 :
1439 12 : destroyPQExpBuffer(str);
1440 12 : }
1441 :
/*
 * Reports a suitable message if pg_ctl fails.
 *
 * rc is the wait status of the pg_ctl invocation and pg_ctl_cmd the command
 * line that produced it.  A zero status returns silently; anything else is
 * decoded, logged together with the failed command, and ends the program
 * with exit status 1.
 */
static void
pg_ctl_status(const char *pg_ctl_cmd, int rc)
{
	/* Success: nothing to report */
	if (rc == 0)
		return;

	if (WIFEXITED(rc))
		pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
	else if (WIFSIGNALED(rc))
	{
#if defined(WIN32)
		pg_log_error("pg_ctl was terminated by exception 0x%X",
					 WTERMSIG(rc));
		pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
#else
		pg_log_error("pg_ctl was terminated by signal %d: %s",
					 WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
#endif
	}
	else
		pg_log_error("pg_ctl exited with unrecognized status %d", rc);

	pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
	exit(1);
}
1474 :
1475 : static void
1476 20 : start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
1477 : bool restrict_logical_worker)
1478 : {
1479 20 : PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
1480 : int rc;
1481 :
1482 20 : appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
1483 20 : appendShellString(pg_ctl_cmd, subscriber_dir);
1484 20 : appendPQExpBuffer(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
1485 :
1486 : /* Prevent unintended slot invalidation */
1487 20 : appendPQExpBuffer(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
1488 :
1489 20 : if (restricted_access)
1490 : {
1491 20 : appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
1492 : #if !defined(WIN32)
1493 :
1494 : /*
1495 : * An empty listen_addresses list means the server does not listen on
1496 : * any IP interfaces; only Unix-domain sockets can be used to connect
1497 : * to the server. Prevent external connections to minimize the chance
1498 : * of failure.
1499 : */
1500 20 : appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
1501 20 : if (opt->socket_dir)
1502 20 : appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
1503 : opt->socket_dir);
1504 20 : appendPQExpBufferChar(pg_ctl_cmd, '"');
1505 : #endif
1506 : }
1507 20 : if (opt->config_file != NULL)
1508 0 : appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
1509 : opt->config_file);
1510 :
1511 : /* Suppress to start logical replication if requested */
1512 20 : if (restrict_logical_worker)
1513 6 : appendPQExpBuffer(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
1514 :
1515 20 : pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
1516 20 : rc = system(pg_ctl_cmd->data);
1517 20 : pg_ctl_status(pg_ctl_cmd->data, rc);
1518 20 : standby_running = true;
1519 20 : destroyPQExpBuffer(pg_ctl_cmd);
1520 20 : pg_log_info("server was started");
1521 20 : }
1522 :
1523 : static void
1524 20 : stop_standby_server(const char *datadir)
1525 : {
1526 : char *pg_ctl_cmd;
1527 : int rc;
1528 :
1529 20 : pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
1530 : datadir);
1531 20 : pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
1532 20 : rc = system(pg_ctl_cmd);
1533 20 : pg_ctl_status(pg_ctl_cmd, rc);
1534 20 : standby_running = false;
1535 20 : pg_log_info("server was stopped");
1536 20 : }
1537 :
1538 : /*
1539 : * Returns after the server finishes the recovery process.
1540 : *
1541 : * If recovery_timeout option is set, terminate abnormally without finishing
1542 : * the recovery process. By default, it waits forever.
1543 : *
1544 : * XXX Is the recovery process still in progress? When recovery process has a
1545 : * better progress reporting mechanism, it should be added here.
1546 : */
1547 : static void
1548 6 : wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
1549 : {
1550 : PGconn *conn;
1551 6 : int status = POSTMASTER_STILL_STARTING;
1552 6 : int timer = 0;
1553 :
1554 6 : pg_log_info("waiting for the target server to reach the consistent state");
1555 :
1556 6 : conn = connect_database(conninfo, true);
1557 :
1558 : for (;;)
1559 0 : {
1560 6 : bool in_recovery = server_is_in_recovery(conn);
1561 :
1562 : /*
1563 : * Does the recovery process finish? In dry run mode, there is no
1564 : * recovery mode. Bail out as the recovery process has ended.
1565 : */
1566 6 : if (!in_recovery || dry_run)
1567 : {
1568 6 : status = POSTMASTER_READY;
1569 6 : recovery_ended = true;
1570 6 : break;
1571 : }
1572 :
1573 : /* Bail out after recovery_timeout seconds if this option is set */
1574 0 : if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
1575 : {
1576 0 : stop_standby_server(subscriber_dir);
1577 0 : pg_log_error("recovery timed out");
1578 0 : disconnect_database(conn, true);
1579 : }
1580 :
1581 : /* Keep waiting */
1582 0 : pg_usleep(WAIT_INTERVAL * USEC_PER_SEC);
1583 :
1584 0 : timer += WAIT_INTERVAL;
1585 : }
1586 :
1587 6 : disconnect_database(conn, false);
1588 :
1589 6 : if (status == POSTMASTER_STILL_STARTING)
1590 0 : pg_fatal("server did not end recovery");
1591 :
1592 6 : pg_log_info("target server reached the consistent state");
1593 6 : pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
1594 6 : }
1595 :
1596 : /*
1597 : * Create a publication that includes all tables in the database.
1598 : */
1599 : static void
1600 10 : create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
1601 : {
1602 10 : PQExpBuffer str = createPQExpBuffer();
1603 : PGresult *res;
1604 : char *ipubname_esc;
1605 : char *spubname_esc;
1606 :
1607 : Assert(conn != NULL);
1608 :
1609 10 : ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1610 10 : spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1611 :
1612 : /* Check if the publication already exists */
1613 10 : appendPQExpBuffer(str,
1614 : "SELECT 1 FROM pg_catalog.pg_publication "
1615 : "WHERE pubname = %s",
1616 : spubname_esc);
1617 10 : res = PQexec(conn, str->data);
1618 10 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1619 : {
1620 0 : pg_log_error("could not obtain publication information: %s",
1621 : PQresultErrorMessage(res));
1622 0 : disconnect_database(conn, true);
1623 : }
1624 :
1625 10 : if (PQntuples(res) == 1)
1626 : {
1627 : /*
1628 : * Unfortunately, if it reaches this code path, it will always fail
1629 : * (unless you decide to change the existing publication name). That's
1630 : * bad but it is very unlikely that the user will choose a name with
1631 : * pg_createsubscriber_ prefix followed by the exact database oid and
1632 : * a random number.
1633 : */
1634 0 : pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
1635 0 : pg_log_error_hint("Consider renaming this publication before continuing.");
1636 0 : disconnect_database(conn, true);
1637 : }
1638 :
1639 10 : PQclear(res);
1640 10 : resetPQExpBuffer(str);
1641 :
1642 10 : pg_log_info("creating publication \"%s\" in database \"%s\"",
1643 : dbinfo->pubname, dbinfo->dbname);
1644 :
1645 10 : appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
1646 : ipubname_esc);
1647 :
1648 10 : pg_log_debug("command is: %s", str->data);
1649 :
1650 10 : if (!dry_run)
1651 : {
1652 4 : res = PQexec(conn, str->data);
1653 4 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1654 : {
1655 0 : pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
1656 : dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
1657 0 : disconnect_database(conn, true);
1658 : }
1659 4 : PQclear(res);
1660 : }
1661 :
1662 : /* For cleanup purposes */
1663 10 : dbinfo->made_publication = true;
1664 :
1665 10 : PQfreemem(ipubname_esc);
1666 10 : PQfreemem(spubname_esc);
1667 10 : destroyPQExpBuffer(str);
1668 10 : }
1669 :
1670 : /*
1671 : * Drop the specified publication in the given database.
1672 : */
1673 : static void
1674 14 : drop_publication(PGconn *conn, const char *pubname, const char *dbname,
1675 : bool *made_publication)
1676 : {
1677 14 : PQExpBuffer str = createPQExpBuffer();
1678 : PGresult *res;
1679 : char *pubname_esc;
1680 :
1681 : Assert(conn != NULL);
1682 :
1683 14 : pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
1684 :
1685 14 : pg_log_info("dropping publication \"%s\" in database \"%s\"",
1686 : pubname, dbname);
1687 :
1688 14 : appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
1689 :
1690 14 : PQfreemem(pubname_esc);
1691 :
1692 14 : pg_log_debug("command is: %s", str->data);
1693 :
1694 14 : if (!dry_run)
1695 : {
1696 8 : res = PQexec(conn, str->data);
1697 8 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1698 : {
1699 0 : pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
1700 : pubname, dbname, PQresultErrorMessage(res));
1701 0 : *made_publication = false; /* don't try again. */
1702 :
1703 : /*
1704 : * Don't disconnect and exit here. This routine is used by primary
1705 : * (cleanup publication / replication slot due to an error) and
1706 : * subscriber (remove the replicated publications). In both cases,
1707 : * it can continue and provide instructions for the user to remove
1708 : * it later if cleanup fails.
1709 : */
1710 : }
1711 8 : PQclear(res);
1712 : }
1713 :
1714 14 : destroyPQExpBuffer(str);
1715 14 : }
1716 :
1717 : /*
1718 : * Retrieve and drop the publications.
1719 : *
1720 : * Since the publications were created before the consistent LSN, they
1721 : * remain on the subscriber even after the physical replica is
1722 : * promoted. Remove these publications from the subscriber because
1723 : * they have no use. Additionally, if requested, drop all pre-existing
1724 : * publications.
1725 : */
1726 : static void
1727 10 : check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
1728 : {
1729 : PGresult *res;
1730 10 : bool drop_all_pubs = dbinfos.objecttypes_to_remove & OBJECTTYPE_PUBLICATIONS;
1731 :
1732 : Assert(conn != NULL);
1733 :
1734 10 : if (drop_all_pubs)
1735 : {
1736 4 : pg_log_info("dropping all existing publications in database \"%s\"",
1737 : dbinfo->dbname);
1738 :
1739 : /* Fetch all publication names */
1740 4 : res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
1741 4 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1742 : {
1743 0 : pg_log_error("could not obtain publication information: %s",
1744 : PQresultErrorMessage(res));
1745 0 : PQclear(res);
1746 0 : disconnect_database(conn, true);
1747 : }
1748 :
1749 : /* Drop each publication */
1750 12 : for (int i = 0; i < PQntuples(res); i++)
1751 8 : drop_publication(conn, PQgetvalue(res, i, 0), dbinfo->dbname,
1752 : &dbinfo->made_publication);
1753 :
1754 4 : PQclear(res);
1755 : }
1756 :
1757 : /*
1758 : * In dry-run mode, we don't create publications, but we still try to drop
1759 : * those to provide necessary information to the user.
1760 : */
1761 10 : if (!drop_all_pubs || dry_run)
1762 6 : drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
1763 : &dbinfo->made_publication);
1764 10 : }
1765 :
1766 : /*
1767 : * Create a subscription with some predefined options.
1768 : *
1769 : * A replication slot was already created in a previous step. Let's use it. It
1770 : * is not required to copy data. The subscription will be created but it will
1771 : * not be enabled now. That's because the replication progress must be set and
1772 : * the replication origin name (one of the function arguments) contains the
1773 : * subscription OID in its name. Once the subscription is created,
1774 : * set_replication_progress() can obtain the chosen origin name and set up its
1775 : * initial location.
1776 : */
1777 : static void
1778 10 : create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
1779 : {
1780 10 : PQExpBuffer str = createPQExpBuffer();
1781 : PGresult *res;
1782 : char *pubname_esc;
1783 : char *subname_esc;
1784 : char *pubconninfo_esc;
1785 : char *replslotname_esc;
1786 :
1787 : Assert(conn != NULL);
1788 :
1789 10 : pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1790 10 : subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1791 10 : pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
1792 10 : replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
1793 :
1794 10 : pg_log_info("creating subscription \"%s\" in database \"%s\"",
1795 : dbinfo->subname, dbinfo->dbname);
1796 :
1797 10 : appendPQExpBuffer(str,
1798 : "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
1799 : "WITH (create_slot = false, enabled = false, "
1800 : "slot_name = %s, copy_data = false, two_phase = %s)",
1801 : subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
1802 10 : dbinfos.two_phase ? "true" : "false");
1803 :
1804 10 : PQfreemem(pubname_esc);
1805 10 : PQfreemem(subname_esc);
1806 10 : PQfreemem(pubconninfo_esc);
1807 10 : PQfreemem(replslotname_esc);
1808 :
1809 10 : pg_log_debug("command is: %s", str->data);
1810 :
1811 10 : if (!dry_run)
1812 : {
1813 4 : res = PQexec(conn, str->data);
1814 4 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1815 : {
1816 0 : pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
1817 : dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
1818 0 : disconnect_database(conn, true);
1819 : }
1820 4 : PQclear(res);
1821 : }
1822 :
1823 10 : destroyPQExpBuffer(str);
1824 10 : }
1825 :
1826 : /*
1827 : * Sets the replication progress to the consistent LSN.
1828 : *
1829 : * The subscriber caught up to the consistent LSN provided by the last
1830 : * replication slot that was created. The goal is to set up the initial
1831 : * location for the logical replication that is the exact LSN that the
1832 : * subscriber was promoted. Once the subscription is enabled it will start
1833 : * streaming from that location onwards. In dry run mode, the subscription OID
1834 : * and LSN are set to invalid values for printing purposes.
1835 : */
1836 : static void
1837 10 : set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
1838 : {
1839 10 : PQExpBuffer str = createPQExpBuffer();
1840 : PGresult *res;
1841 : Oid suboid;
1842 : char *subname;
1843 : char *dbname;
1844 : char *originname;
1845 : char *lsnstr;
1846 :
1847 : Assert(conn != NULL);
1848 :
1849 10 : subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
1850 10 : dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1851 :
1852 10 : appendPQExpBuffer(str,
1853 : "SELECT s.oid FROM pg_catalog.pg_subscription s "
1854 : "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1855 : "WHERE s.subname = %s AND d.datname = %s",
1856 : subname, dbname);
1857 :
1858 10 : res = PQexec(conn, str->data);
1859 10 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1860 : {
1861 0 : pg_log_error("could not obtain subscription OID: %s",
1862 : PQresultErrorMessage(res));
1863 0 : disconnect_database(conn, true);
1864 : }
1865 :
1866 10 : if (PQntuples(res) != 1 && !dry_run)
1867 : {
1868 0 : pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
1869 : PQntuples(res), 1);
1870 0 : disconnect_database(conn, true);
1871 : }
1872 :
1873 10 : if (dry_run)
1874 : {
1875 6 : suboid = InvalidOid;
1876 6 : lsnstr = psprintf("%X/%X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
1877 : }
1878 : else
1879 : {
1880 4 : suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
1881 4 : lsnstr = psprintf("%s", lsn);
1882 : }
1883 :
1884 10 : PQclear(res);
1885 :
1886 : /*
1887 : * The origin name is defined as pg_%u. %u is the subscription OID. See
1888 : * ApplyWorkerMain().
1889 : */
1890 10 : originname = psprintf("pg_%u", suboid);
1891 :
1892 10 : pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
1893 : originname, lsnstr, dbinfo->dbname);
1894 :
1895 10 : resetPQExpBuffer(str);
1896 10 : appendPQExpBuffer(str,
1897 : "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
1898 : originname, lsnstr);
1899 :
1900 10 : pg_log_debug("command is: %s", str->data);
1901 :
1902 10 : if (!dry_run)
1903 : {
1904 4 : res = PQexec(conn, str->data);
1905 4 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1906 : {
1907 0 : pg_log_error("could not set replication progress for subscription \"%s\": %s",
1908 : dbinfo->subname, PQresultErrorMessage(res));
1909 0 : disconnect_database(conn, true);
1910 : }
1911 4 : PQclear(res);
1912 : }
1913 :
1914 10 : PQfreemem(subname);
1915 10 : PQfreemem(dbname);
1916 10 : pg_free(originname);
1917 10 : pg_free(lsnstr);
1918 10 : destroyPQExpBuffer(str);
1919 10 : }
1920 :
1921 : /*
1922 : * Enables the subscription.
1923 : *
1924 : * The subscription was created in a previous step but it was disabled. After
1925 : * adjusting the initial logical replication location, enable the subscription.
1926 : */
1927 : static void
1928 10 : enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
1929 : {
1930 10 : PQExpBuffer str = createPQExpBuffer();
1931 : PGresult *res;
1932 : char *subname;
1933 :
1934 : Assert(conn != NULL);
1935 :
1936 10 : subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1937 :
1938 10 : pg_log_info("enabling subscription \"%s\" in database \"%s\"",
1939 : dbinfo->subname, dbinfo->dbname);
1940 :
1941 10 : appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
1942 :
1943 10 : pg_log_debug("command is: %s", str->data);
1944 :
1945 10 : if (!dry_run)
1946 : {
1947 4 : res = PQexec(conn, str->data);
1948 4 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1949 : {
1950 0 : pg_log_error("could not enable subscription \"%s\": %s",
1951 : dbinfo->subname, PQresultErrorMessage(res));
1952 0 : disconnect_database(conn, true);
1953 : }
1954 :
1955 4 : PQclear(res);
1956 : }
1957 :
1958 10 : PQfreemem(subname);
1959 10 : destroyPQExpBuffer(str);
1960 10 : }
1961 :
1962 : int
1963 40 : main(int argc, char **argv)
1964 : {
1965 : static struct option long_options[] =
1966 : {
1967 : {"database", required_argument, NULL, 'd'},
1968 : {"pgdata", required_argument, NULL, 'D'},
1969 : {"dry-run", no_argument, NULL, 'n'},
1970 : {"subscriber-port", required_argument, NULL, 'p'},
1971 : {"publisher-server", required_argument, NULL, 'P'},
1972 : {"remove", required_argument, NULL, 'R'},
1973 : {"socketdir", required_argument, NULL, 's'},
1974 : {"recovery-timeout", required_argument, NULL, 't'},
1975 : {"enable-two-phase", no_argument, NULL, 'T'},
1976 : {"subscriber-username", required_argument, NULL, 'U'},
1977 : {"verbose", no_argument, NULL, 'v'},
1978 : {"version", no_argument, NULL, 'V'},
1979 : {"help", no_argument, NULL, '?'},
1980 : {"config-file", required_argument, NULL, 1},
1981 : {"publication", required_argument, NULL, 2},
1982 : {"replication-slot", required_argument, NULL, 3},
1983 : {"subscription", required_argument, NULL, 4},
1984 : {NULL, 0, NULL, 0}
1985 : };
1986 :
1987 40 : struct CreateSubscriberOptions opt = {0};
1988 :
1989 : int c;
1990 : int option_index;
1991 :
1992 : char *pub_base_conninfo;
1993 : char *sub_base_conninfo;
1994 40 : char *dbname_conninfo = NULL;
1995 :
1996 : uint64 pub_sysid;
1997 : uint64 sub_sysid;
1998 : struct stat statbuf;
1999 :
2000 : char *consistent_lsn;
2001 :
2002 : char pidfile[MAXPGPATH];
2003 :
2004 40 : pg_logging_init(argv[0]);
2005 40 : pg_logging_set_level(PG_LOG_WARNING);
2006 40 : progname = get_progname(argv[0]);
2007 40 : set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
2008 :
2009 40 : if (argc > 1)
2010 : {
2011 38 : if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2012 : {
2013 2 : usage();
2014 2 : exit(0);
2015 : }
2016 36 : else if (strcmp(argv[1], "-V") == 0
2017 36 : || strcmp(argv[1], "--version") == 0)
2018 : {
2019 2 : puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
2020 2 : exit(0);
2021 : }
2022 : }
2023 :
2024 : /* Default settings */
2025 36 : subscriber_dir = NULL;
2026 36 : opt.config_file = NULL;
2027 36 : opt.pub_conninfo_str = NULL;
2028 36 : opt.socket_dir = NULL;
2029 36 : opt.sub_port = DEFAULT_SUB_PORT;
2030 36 : opt.sub_username = NULL;
2031 36 : opt.two_phase = false;
2032 36 : opt.database_names = (SimpleStringList)
2033 : {
2034 : 0
2035 : };
2036 36 : opt.recovery_timeout = 0;
2037 :
2038 : /*
2039 : * Don't allow it to be run as root. It uses pg_ctl which does not allow
2040 : * it either.
2041 : */
2042 : #ifndef WIN32
2043 36 : if (geteuid() == 0)
2044 : {
2045 0 : pg_log_error("cannot be executed by \"root\"");
2046 0 : pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
2047 : progname);
2048 0 : exit(1);
2049 : }
2050 : #endif
2051 :
2052 36 : get_restricted_token();
2053 :
2054 272 : while ((c = getopt_long(argc, argv, "d:D:np:P:R:s:t:TU:v",
2055 : long_options, &option_index)) != -1)
2056 : {
2057 242 : switch (c)
2058 : {
2059 48 : case 'd':
2060 48 : if (!simple_string_list_member(&opt.database_names, optarg))
2061 : {
2062 46 : simple_string_list_append(&opt.database_names, optarg);
2063 46 : num_dbs++;
2064 : }
2065 : else
2066 : {
2067 2 : pg_log_error("database \"%s\" specified more than once", optarg);
2068 2 : exit(1);
2069 : }
2070 46 : break;
2071 32 : case 'D':
2072 32 : subscriber_dir = pg_strdup(optarg);
2073 32 : canonicalize_path(subscriber_dir);
2074 32 : break;
2075 14 : case 'n':
2076 14 : dry_run = true;
2077 14 : break;
2078 18 : case 'p':
2079 18 : opt.sub_port = pg_strdup(optarg);
2080 18 : break;
2081 30 : case 'P':
2082 30 : opt.pub_conninfo_str = pg_strdup(optarg);
2083 30 : break;
2084 2 : case 'R':
2085 2 : if (!simple_string_list_member(&opt.objecttypes_to_remove, optarg))
2086 2 : simple_string_list_append(&opt.objecttypes_to_remove, optarg);
2087 : else
2088 0 : pg_fatal("object type \"%s\" is specified more than once for --remove", optarg);
2089 2 : break;
2090 18 : case 's':
2091 18 : opt.socket_dir = pg_strdup(optarg);
2092 18 : canonicalize_path(opt.socket_dir);
2093 18 : break;
2094 4 : case 't':
2095 4 : opt.recovery_timeout = atoi(optarg);
2096 4 : break;
2097 2 : case 'T':
2098 2 : opt.two_phase = true;
2099 2 : break;
2100 0 : case 'U':
2101 0 : opt.sub_username = pg_strdup(optarg);
2102 0 : break;
2103 32 : case 'v':
2104 32 : pg_logging_increase_verbosity();
2105 32 : break;
2106 0 : case 1:
2107 0 : opt.config_file = pg_strdup(optarg);
2108 0 : break;
2109 22 : case 2:
2110 22 : if (!simple_string_list_member(&opt.pub_names, optarg))
2111 : {
2112 20 : simple_string_list_append(&opt.pub_names, optarg);
2113 20 : num_pubs++;
2114 : }
2115 : else
2116 : {
2117 2 : pg_log_error("publication \"%s\" specified more than once", optarg);
2118 2 : exit(1);
2119 : }
2120 20 : break;
2121 8 : case 3:
2122 8 : if (!simple_string_list_member(&opt.replslot_names, optarg))
2123 : {
2124 8 : simple_string_list_append(&opt.replslot_names, optarg);
2125 8 : num_replslots++;
2126 : }
2127 : else
2128 : {
2129 0 : pg_log_error("replication slot \"%s\" specified more than once", optarg);
2130 0 : exit(1);
2131 : }
2132 8 : break;
2133 10 : case 4:
2134 10 : if (!simple_string_list_member(&opt.sub_names, optarg))
2135 : {
2136 10 : simple_string_list_append(&opt.sub_names, optarg);
2137 10 : num_subs++;
2138 : }
2139 : else
2140 : {
2141 0 : pg_log_error("subscription \"%s\" specified more than once", optarg);
2142 0 : exit(1);
2143 : }
2144 10 : break;
2145 2 : default:
2146 : /* getopt_long already emitted a complaint */
2147 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2148 2 : exit(1);
2149 : }
2150 : }
2151 :
2152 : /* Any non-option arguments? */
2153 30 : if (optind < argc)
2154 : {
2155 0 : pg_log_error("too many command-line arguments (first is \"%s\")",
2156 : argv[optind]);
2157 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2158 0 : exit(1);
2159 : }
2160 :
2161 : /* Required arguments */
2162 30 : if (subscriber_dir == NULL)
2163 : {
2164 2 : pg_log_error("no subscriber data directory specified");
2165 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2166 2 : exit(1);
2167 : }
2168 :
2169 : /* If socket directory is not provided, use the current directory */
2170 28 : if (opt.socket_dir == NULL)
2171 : {
2172 : char cwd[MAXPGPATH];
2173 :
2174 10 : if (!getcwd(cwd, MAXPGPATH))
2175 0 : pg_fatal("could not determine current directory");
2176 10 : opt.socket_dir = pg_strdup(cwd);
2177 10 : canonicalize_path(opt.socket_dir);
2178 : }
2179 :
2180 : /*
2181 : * Parse connection string. Build a base connection string that might be
2182 : * reused by multiple databases.
2183 : */
2184 28 : if (opt.pub_conninfo_str == NULL)
2185 : {
2186 : /*
2187 : * TODO use primary_conninfo (if available) from subscriber and
2188 : * extract publisher connection string. Assume that there are
2189 : * identical entries for physical and logical replication. If there is
2190 : * not, we would fail anyway.
2191 : */
2192 2 : pg_log_error("no publisher connection string specified");
2193 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2194 2 : exit(1);
2195 : }
2196 26 : pg_log_info("validating publisher connection string");
2197 26 : pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
2198 : &dbname_conninfo);
2199 26 : if (pub_base_conninfo == NULL)
2200 0 : exit(1);
2201 :
2202 26 : pg_log_info("validating subscriber connection string");
2203 26 : sub_base_conninfo = get_sub_conninfo(&opt);
2204 :
2205 26 : if (opt.database_names.head == NULL)
2206 : {
2207 4 : pg_log_info("no database was specified");
2208 :
2209 : /*
2210 : * If --database option is not provided, try to obtain the dbname from
2211 : * the publisher conninfo. If dbname parameter is not available, error
2212 : * out.
2213 : */
2214 4 : if (dbname_conninfo)
2215 : {
2216 2 : simple_string_list_append(&opt.database_names, dbname_conninfo);
2217 2 : num_dbs++;
2218 :
2219 2 : pg_log_info("database name \"%s\" was extracted from the publisher connection string",
2220 : dbname_conninfo);
2221 : }
2222 : else
2223 : {
2224 2 : pg_log_error("no database name specified");
2225 2 : pg_log_error_hint("Try \"%s --help\" for more information.",
2226 : progname);
2227 2 : exit(1);
2228 : }
2229 : }
2230 :
2231 : /* Number of object names must match number of databases */
2232 24 : if (num_pubs > 0 && num_pubs != num_dbs)
2233 : {
2234 2 : pg_log_error("wrong number of publication names specified");
2235 2 : pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
2236 : num_pubs, num_dbs);
2237 2 : exit(1);
2238 : }
2239 22 : if (num_subs > 0 && num_subs != num_dbs)
2240 : {
2241 2 : pg_log_error("wrong number of subscription names specified");
2242 2 : pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
2243 : num_subs, num_dbs);
2244 2 : exit(1);
2245 : }
2246 20 : if (num_replslots > 0 && num_replslots != num_dbs)
2247 : {
2248 2 : pg_log_error("wrong number of replication slot names specified");
2249 2 : pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
2250 : num_replslots, num_dbs);
2251 2 : exit(1);
2252 : }
2253 :
2254 : /* Verify the object types specified for removal from the subscriber */
2255 20 : for (SimpleStringListCell *cell = opt.objecttypes_to_remove.head; cell; cell = cell->next)
2256 : {
2257 2 : if (pg_strcasecmp(cell->val, "publications") == 0)
2258 2 : dbinfos.objecttypes_to_remove |= OBJECTTYPE_PUBLICATIONS;
2259 : else
2260 : {
2261 0 : pg_log_error("invalid object type \"%s\" specified for --remove", cell->val);
2262 0 : pg_log_error_hint("The valid option is: \"publications\"");
2263 0 : exit(1);
2264 : }
2265 : }
2266 :
2267 : /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
2268 18 : pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
2269 18 : pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
2270 :
2271 : /* Rudimentary check for a data directory */
2272 18 : check_data_directory(subscriber_dir);
2273 :
2274 18 : dbinfos.two_phase = opt.two_phase;
2275 :
2276 : /*
2277 : * Store database information for publisher and subscriber. It should be
2278 : * called before atexit() because its return is used in the
2279 : * cleanup_objects_atexit().
2280 : */
2281 18 : dbinfos.dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
2282 :
2283 : /* Register a function to clean up objects in case of failure */
2284 18 : atexit(cleanup_objects_atexit);
2285 :
2286 : /*
2287 : * Check if the subscriber data directory has the same system identifier
2288 : * than the publisher data directory.
2289 : */
2290 18 : pub_sysid = get_primary_sysid(dbinfos.dbinfo[0].pubconninfo);
2291 18 : sub_sysid = get_standby_sysid(subscriber_dir);
2292 18 : if (pub_sysid != sub_sysid)
2293 2 : pg_fatal("subscriber data directory is not a copy of the source database cluster");
2294 :
2295 : /* Subscriber PID file */
2296 16 : snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
2297 :
2298 : /*
2299 : * The standby server must not be running. If the server is started under
2300 : * service manager and pg_createsubscriber stops it, the service manager
2301 : * might react to this action and start the server again. Therefore,
2302 : * refuse to proceed if the server is running to avoid possible failures.
2303 : */
2304 16 : if (stat(pidfile, &statbuf) == 0)
2305 : {
2306 2 : pg_log_error("standby server is running");
2307 2 : pg_log_error_hint("Stop the standby server and try again.");
2308 2 : exit(1);
2309 : }
2310 :
2311 : /*
2312 : * Start a short-lived standby server with temporary parameters (provided
2313 : * by command-line options). The goal is to avoid connections during the
2314 : * transformation steps.
2315 : */
2316 14 : pg_log_info("starting the standby server with command-line options");
2317 14 : start_standby_server(&opt, true, false);
2318 :
2319 : /* Check if the standby server is ready for logical replication */
2320 14 : check_subscriber(dbinfos.dbinfo);
2321 :
2322 : /* Check if the primary server is ready for logical replication */
2323 10 : check_publisher(dbinfos.dbinfo);
2324 :
2325 : /*
2326 : * Stop the target server. The recovery process requires that the server
2327 : * reaches a consistent state before targeting the recovery stop point.
2328 : * Make sure a consistent state is reached (stop the target server
2329 : * guarantees it) *before* creating the replication slots in
2330 : * setup_publisher().
2331 : */
2332 6 : pg_log_info("stopping the subscriber");
2333 6 : stop_standby_server(subscriber_dir);
2334 :
2335 : /* Create the required objects for each database on publisher */
2336 6 : consistent_lsn = setup_publisher(dbinfos.dbinfo);
2337 :
2338 : /* Write the required recovery parameters */
2339 6 : setup_recovery(dbinfos.dbinfo, subscriber_dir, consistent_lsn);
2340 :
2341 : /*
2342 : * Start subscriber so the recovery parameters will take effect. Wait
2343 : * until accepting connections. We don't want to start logical replication
2344 : * during setup.
2345 : */
2346 6 : pg_log_info("starting the subscriber");
2347 6 : start_standby_server(&opt, true, true);
2348 :
2349 : /* Waiting the subscriber to be promoted */
2350 6 : wait_for_end_recovery(dbinfos.dbinfo[0].subconninfo, &opt);
2351 :
2352 : /*
2353 : * Create the subscription for each database on subscriber. It does not
2354 : * enable it immediately because it needs to adjust the replication start
2355 : * point to the LSN reported by setup_publisher(). It also cleans up
2356 : * publications created by this tool and replication to the standby.
2357 : */
2358 6 : setup_subscriber(dbinfos.dbinfo, consistent_lsn);
2359 :
2360 : /* Remove primary_slot_name if it exists on primary */
2361 6 : drop_primary_replication_slot(dbinfos.dbinfo, primary_slot_name);
2362 :
2363 : /* Remove failover replication slots if they exist on subscriber */
2364 6 : drop_failover_replication_slots(dbinfos.dbinfo);
2365 :
2366 : /* Stop the subscriber */
2367 6 : pg_log_info("stopping the subscriber");
2368 6 : stop_standby_server(subscriber_dir);
2369 :
2370 : /* Change system identifier from subscriber */
2371 6 : modify_subscriber_sysid(&opt);
2372 :
2373 6 : success = true;
2374 :
2375 6 : pg_log_info("Done!");
2376 :
2377 6 : return 0;
2378 : }
|