LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - pg_createsubscriber.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 691 820 84.3 %
Date: 2025-02-22 07:14:56 Functions: 37 37 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_createsubscriber.c
       4             :  *    Create a new logical replica from a standby server
       5             :  *
       6             :  * Copyright (c) 2024-2025, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *    src/bin/pg_basebackup/pg_createsubscriber.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : 
      14             : #include "postgres_fe.h"
      15             : 
      16             : #include <sys/stat.h>
      17             : #include <sys/time.h>
      18             : #include <sys/wait.h>
      19             : #include <time.h>
      20             : 
      21             : #include "common/connect.h"
      22             : #include "common/controldata_utils.h"
      23             : #include "common/logging.h"
      24             : #include "common/pg_prng.h"
      25             : #include "common/restricted_token.h"
      26             : #include "fe_utils/recovery_gen.h"
      27             : #include "fe_utils/simple_list.h"
      28             : #include "fe_utils/string_utils.h"
      29             : #include "getopt_long.h"
      30             : 
      31             : #define DEFAULT_SUB_PORT    "50432"
      32             : 
      33             : /* Command-line options */
      34             : struct CreateSubscriberOptions
      35             : {
      36             :     char       *config_file;    /* configuration file */
      37             :     char       *pub_conninfo_str;   /* publisher connection string */
      38             :     char       *socket_dir;     /* directory for Unix-domain socket, if any */
      39             :     char       *sub_port;       /* subscriber port number */
      40             :     const char *sub_username;   /* subscriber username */
      41             :     SimpleStringList database_names;    /* list of database names */
      42             :     SimpleStringList pub_names; /* list of publication names */
      43             :     SimpleStringList sub_names; /* list of subscription names */
      44             :     SimpleStringList replslot_names;    /* list of replication slot names */
      45             :     int         recovery_timeout;   /* stop recovery after this time */
      46             : };
      47             : 
      48             : struct LogicalRepInfo
      49             : {
      50             :     char       *dbname;         /* database name */
      51             :     char       *pubconninfo;    /* publisher connection string */
      52             :     char       *subconninfo;    /* subscriber connection string */
      53             :     char       *pubname;        /* publication name */
      54             :     char       *subname;        /* subscription name */
      55             :     char       *replslotname;   /* replication slot name */
      56             : 
      57             :     bool        made_replslot;  /* replication slot was created */
      58             :     bool        made_publication;   /* publication was created */
      59             : };
      60             : 
      61             : static void cleanup_objects_atexit(void);
      62             : static void usage();
      63             : static char *get_base_conninfo(const char *conninfo, char **dbname);
      64             : static char *get_sub_conninfo(const struct CreateSubscriberOptions *opt);
      65             : static char *get_exec_path(const char *argv0, const char *progname);
      66             : static void check_data_directory(const char *datadir);
      67             : static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
      68             : static struct LogicalRepInfo *store_pub_sub_info(const struct CreateSubscriberOptions *opt,
      69             :                                                  const char *pub_base_conninfo,
      70             :                                                  const char *sub_base_conninfo);
      71             : static PGconn *connect_database(const char *conninfo, bool exit_on_error);
      72             : static void disconnect_database(PGconn *conn, bool exit_on_error);
      73             : static uint64 get_primary_sysid(const char *conninfo);
      74             : static uint64 get_standby_sysid(const char *datadir);
      75             : static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
      76             : static bool server_is_in_recovery(PGconn *conn);
      77             : static char *generate_object_name(PGconn *conn);
      78             : static void check_publisher(const struct LogicalRepInfo *dbinfo);
      79             : static char *setup_publisher(struct LogicalRepInfo *dbinfo);
      80             : static void check_subscriber(const struct LogicalRepInfo *dbinfo);
      81             : static void setup_subscriber(struct LogicalRepInfo *dbinfo,
      82             :                              const char *consistent_lsn);
      83             : static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
      84             :                            const char *lsn);
      85             : static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
      86             :                                           const char *slotname);
      87             : static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo);
      88             : static char *create_logical_replication_slot(PGconn *conn,
      89             :                                              struct LogicalRepInfo *dbinfo);
      90             : static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
      91             :                                   const char *slot_name);
      92             : static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
      93             : static void start_standby_server(const struct CreateSubscriberOptions *opt,
      94             :                                  bool restricted_access,
      95             :                                  bool restrict_logical_worker);
      96             : static void stop_standby_server(const char *datadir);
      97             : static void wait_for_end_recovery(const char *conninfo,
      98             :                                   const struct CreateSubscriberOptions *opt);
      99             : static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
     100             : static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
     101             : static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
     102             : static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
     103             :                                      const char *lsn);
     104             : static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
     105             : static void check_and_drop_existing_subscriptions(PGconn *conn,
     106             :                                                   const struct LogicalRepInfo *dbinfo);
     107             : static void drop_existing_subscriptions(PGconn *conn, const char *subname,
     108             :                                         const char *dbname);
     109             : 
     110             : #define USEC_PER_SEC    1000000
     111             : #define WAIT_INTERVAL   1       /* 1 second */
     112             : 
     113             : static const char *progname;
     114             : 
     115             : static char *primary_slot_name = NULL;
     116             : static bool dry_run = false;
     117             : 
     118             : static bool success = false;
     119             : 
     120             : static struct LogicalRepInfo *dbinfo;
     121             : static int  num_dbs = 0;        /* number of specified databases */
     122             : static int  num_pubs = 0;       /* number of specified publications */
     123             : static int  num_subs = 0;       /* number of specified subscriptions */
     124             : static int  num_replslots = 0;  /* number of specified replication slots */
     125             : 
     126             : static pg_prng_state prng_state;
     127             : 
     128             : static char *pg_ctl_path = NULL;
     129             : static char *pg_resetwal_path = NULL;
     130             : 
     131             : /* standby / subscriber data directory */
     132             : static char *subscriber_dir = NULL;
     133             : 
     134             : static bool recovery_ended = false;
     135             : static bool standby_running = false;
     136             : 
     137             : enum WaitPMResult
     138             : {
     139             :     POSTMASTER_READY,
     140             :     POSTMASTER_STILL_STARTING
     141             : };
     142             : 
     143             : 
     144             : /*
     145             :  * Cleanup objects that were created by pg_createsubscriber if there is an
     146             :  * error.
     147             :  *
     148             :  * Publications and replication slots are created on primary. Depending on the
     149             :  * step it failed, it should remove the already created objects if it is
     150             :  * possible (sometimes it won't work due to a connection issue).
     151             :  * There is no cleanup on the target server. The steps on the target server are
     152             :  * executed *after* promotion, hence, at this point, a failure means recreate
     153             :  * the physical replica and start again.
     154             :  */
     155             : static void
     156          18 : cleanup_objects_atexit(void)
     157             : {
     158          18 :     if (success)
     159           6 :         return;
     160             : 
     161             :     /*
     162             :      * If the server is promoted, there is no way to use the current setup
     163             :      * again. Warn the user that a new replication setup should be done before
     164             :      * trying again.
     165             :      */
     166          12 :     if (recovery_ended)
     167             :     {
     168           0 :         pg_log_warning("failed after the end of recovery");
     169           0 :         pg_log_warning_hint("The target server cannot be used as a physical replica anymore.  "
     170             :                             "You must recreate the physical replica before continuing.");
     171             :     }
     172             : 
     173          36 :     for (int i = 0; i < num_dbs; i++)
     174             :     {
     175          24 :         if (dbinfo[i].made_publication || dbinfo[i].made_replslot)
     176             :         {
     177             :             PGconn     *conn;
     178             : 
     179           0 :             conn = connect_database(dbinfo[i].pubconninfo, false);
     180           0 :             if (conn != NULL)
     181             :             {
     182           0 :                 if (dbinfo[i].made_publication)
     183           0 :                     drop_publication(conn, &dbinfo[i]);
     184           0 :                 if (dbinfo[i].made_replslot)
     185           0 :                     drop_replication_slot(conn, &dbinfo[i], dbinfo[i].replslotname);
     186           0 :                 disconnect_database(conn, false);
     187             :             }
     188             :             else
     189             :             {
     190             :                 /*
     191             :                  * If a connection could not be established, inform the user
     192             :                  * that some objects were left on primary and should be
     193             :                  * removed before trying again.
     194             :                  */
     195           0 :                 if (dbinfo[i].made_publication)
     196             :                 {
     197           0 :                     pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
     198             :                                    dbinfo[i].pubname, dbinfo[i].dbname);
     199           0 :                     pg_log_warning_hint("Drop this publication before trying again.");
     200             :                 }
     201           0 :                 if (dbinfo[i].made_replslot)
     202             :                 {
     203           0 :                     pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
     204             :                                    dbinfo[i].replslotname, dbinfo[i].dbname);
     205           0 :                     pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
     206             :                 }
     207             :             }
     208             :         }
     209             :     }
     210             : 
     211          12 :     if (standby_running)
     212           8 :         stop_standby_server(subscriber_dir);
     213             : }
     214             : 
     215             : static void
     216           2 : usage(void)
     217             : {
     218           2 :     printf(_("%s creates a new logical replica from a standby server.\n\n"),
     219             :            progname);
     220           2 :     printf(_("Usage:\n"));
     221           2 :     printf(_("  %s [OPTION]...\n"), progname);
     222           2 :     printf(_("\nOptions:\n"));
     223           2 :     printf(_("  -d, --database=DBNAME           database in which to create a subscription\n"));
     224           2 :     printf(_("  -D, --pgdata=DATADIR            location for the subscriber data directory\n"));
     225           2 :     printf(_("  -n, --dry-run                   dry run, just show what would be done\n"));
     226           2 :     printf(_("  -p, --subscriber-port=PORT      subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
     227           2 :     printf(_("  -P, --publisher-server=CONNSTR  publisher connection string\n"));
     228           2 :     printf(_("  -s, --socketdir=DIR             socket directory to use (default current dir.)\n"));
     229           2 :     printf(_("  -t, --recovery-timeout=SECS     seconds to wait for recovery to end\n"));
     230           2 :     printf(_("  -U, --subscriber-username=NAME  user name for subscriber connection\n"));
     231           2 :     printf(_("  -v, --verbose                   output verbose messages\n"));
     232           2 :     printf(_("      --config-file=FILENAME      use specified main server configuration\n"
     233             :              "                                  file when running target cluster\n"));
     234           2 :     printf(_("      --publication=NAME          publication name\n"));
     235           2 :     printf(_("      --replication-slot=NAME     replication slot name\n"));
     236           2 :     printf(_("      --subscription=NAME         subscription name\n"));
     237           2 :     printf(_("  -V, --version                   output version information, then exit\n"));
     238           2 :     printf(_("  -?, --help                      show this help, then exit\n"));
     239           2 :     printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
     240           2 :     printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
     241           2 : }
     242             : 
     243             : /*
     244             :  * Subroutine to append "keyword=value" to a connection string,
     245             :  * with proper quoting of the value.  (We assume keywords don't need that.)
     246             :  */
     247             : static void
     248         190 : appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
     249             : {
     250         190 :     if (buf->len > 0)
     251         138 :         appendPQExpBufferChar(buf, ' ');
     252         190 :     appendPQExpBufferStr(buf, keyword);
     253         190 :     appendPQExpBufferChar(buf, '=');
     254         190 :     appendConnStrVal(buf, val);
     255         190 : }
     256             : 
     257             : /*
     258             :  * Validate a connection string. Returns a base connection string that is a
     259             :  * connection string without a database name.
     260             :  *
     261             :  * Since we might process multiple databases, each database name will be
     262             :  * appended to this base connection string to provide a final connection
     263             :  * string. If the second argument (dbname) is not null, returns dbname if the
     264             :  * provided connection string contains it.
     265             :  *
     266             :  * It is the caller's responsibility to free the returned connection string and
     267             :  * dbname.
     268             :  */
     269             : static char *
     270          26 : get_base_conninfo(const char *conninfo, char **dbname)
     271             : {
     272             :     PQExpBuffer buf;
     273             :     PQconninfoOption *conn_opts;
     274             :     PQconninfoOption *conn_opt;
     275          26 :     char       *errmsg = NULL;
     276             :     char       *ret;
     277             : 
     278          26 :     conn_opts = PQconninfoParse(conninfo, &errmsg);
     279          26 :     if (conn_opts == NULL)
     280             :     {
     281           0 :         pg_log_error("could not parse connection string: %s", errmsg);
     282           0 :         PQfreemem(errmsg);
     283           0 :         return NULL;
     284             :     }
     285             : 
     286          26 :     buf = createPQExpBuffer();
     287        1248 :     for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
     288             :     {
     289        1222 :         if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
     290             :         {
     291          62 :             if (strcmp(conn_opt->keyword, "dbname") == 0)
     292             :             {
     293          18 :                 if (dbname)
     294          18 :                     *dbname = pg_strdup(conn_opt->val);
     295          18 :                 continue;
     296             :             }
     297          44 :             appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
     298             :         }
     299             :     }
     300             : 
     301          26 :     ret = pg_strdup(buf->data);
     302             : 
     303          26 :     destroyPQExpBuffer(buf);
     304          26 :     PQconninfoFree(conn_opts);
     305             : 
     306          26 :     return ret;
     307             : }
     308             : 
     309             : /*
     310             :  * Build a subscriber connection string. Only a few parameters are supported
     311             :  * since it starts a server with restricted access.
     312             :  */
     313             : static char *
     314          26 : get_sub_conninfo(const struct CreateSubscriberOptions *opt)
     315             : {
     316          26 :     PQExpBuffer buf = createPQExpBuffer();
     317             :     char       *ret;
     318             : 
     319          26 :     appendConnStrItem(buf, "port", opt->sub_port);
     320             : #if !defined(WIN32)
     321          26 :     appendConnStrItem(buf, "host", opt->socket_dir);
     322             : #endif
     323          26 :     if (opt->sub_username != NULL)
     324           0 :         appendConnStrItem(buf, "user", opt->sub_username);
     325          26 :     appendConnStrItem(buf, "fallback_application_name", progname);
     326             : 
     327          26 :     ret = pg_strdup(buf->data);
     328             : 
     329          26 :     destroyPQExpBuffer(buf);
     330             : 
     331          26 :     return ret;
     332             : }
     333             : 
     334             : /*
     335             :  * Verify if a PostgreSQL binary (progname) is available in the same directory as
     336             :  * pg_createsubscriber and it has the same version.  It returns the absolute
     337             :  * path of the progname.
     338             :  */
     339             : static char *
     340          36 : get_exec_path(const char *argv0, const char *progname)
     341             : {
     342             :     char       *versionstr;
     343             :     char       *exec_path;
     344             :     int         ret;
     345             : 
     346          36 :     versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
     347          36 :     exec_path = pg_malloc(MAXPGPATH);
     348          36 :     ret = find_other_exec(argv0, progname, versionstr, exec_path);
     349             : 
     350          36 :     if (ret < 0)
     351             :     {
     352             :         char        full_path[MAXPGPATH];
     353             : 
     354           0 :         if (find_my_exec(argv0, full_path) < 0)
     355           0 :             strlcpy(full_path, progname, sizeof(full_path));
     356             : 
     357           0 :         if (ret == -1)
     358           0 :             pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
     359             :                      progname, "pg_createsubscriber", full_path);
     360             :         else
     361           0 :             pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
     362             :                      progname, full_path, "pg_createsubscriber");
     363             :     }
     364             : 
     365          36 :     pg_log_debug("%s path is:  %s", progname, exec_path);
     366             : 
     367          36 :     return exec_path;
     368             : }
     369             : 
     370             : /*
     371             :  * Is it a cluster directory? These are preliminary checks. It is far from
     372             :  * making an accurate check. If it is not a clone from the publisher, it will
     373             :  * eventually fail in a future step.
     374             :  */
     375             : static void
     376          18 : check_data_directory(const char *datadir)
     377             : {
     378             :     struct stat statbuf;
     379             :     char        versionfile[MAXPGPATH];
     380             : 
     381          18 :     pg_log_info("checking if directory \"%s\" is a cluster data directory",
     382             :                 datadir);
     383             : 
     384          18 :     if (stat(datadir, &statbuf) != 0)
     385             :     {
     386           0 :         if (errno == ENOENT)
     387           0 :             pg_fatal("data directory \"%s\" does not exist", datadir);
     388             :         else
     389           0 :             pg_fatal("could not access directory \"%s\": %m", datadir);
     390             :     }
     391             : 
     392          18 :     snprintf(versionfile, MAXPGPATH, "%s/PG_VERSION", datadir);
     393          18 :     if (stat(versionfile, &statbuf) != 0 && errno == ENOENT)
     394             :     {
     395           0 :         pg_fatal("directory \"%s\" is not a database cluster directory",
     396             :                  datadir);
     397             :     }
     398          18 : }
     399             : 
     400             : /*
     401             :  * Append database name into a base connection string.
     402             :  *
     403             :  * dbname is the only parameter that changes so it is not included in the base
     404             :  * connection string. This function concatenates dbname to build a "real"
     405             :  * connection string.
     406             :  */
     407             : static char *
     408          68 : concat_conninfo_dbname(const char *conninfo, const char *dbname)
     409             : {
     410          68 :     PQExpBuffer buf = createPQExpBuffer();
     411             :     char       *ret;
     412             : 
     413             :     Assert(conninfo != NULL);
     414             : 
     415          68 :     appendPQExpBufferStr(buf, conninfo);
     416          68 :     appendConnStrItem(buf, "dbname", dbname);
     417             : 
     418          68 :     ret = pg_strdup(buf->data);
     419          68 :     destroyPQExpBuffer(buf);
     420             : 
     421          68 :     return ret;
     422             : }
     423             : 
     424             : /*
     425             :  * Store publication and subscription information.
     426             :  *
     427             :  * If publication, replication slot and subscription names were specified,
     428             :  * store it here. Otherwise, a generated name will be assigned to the object in
     429             :  * setup_publisher().
     430             :  */
     431             : static struct LogicalRepInfo *
     432          18 : store_pub_sub_info(const struct CreateSubscriberOptions *opt,
     433             :                    const char *pub_base_conninfo,
     434             :                    const char *sub_base_conninfo)
     435             : {
     436             :     struct LogicalRepInfo *dbinfo;
     437          18 :     SimpleStringListCell *pubcell = NULL;
     438          18 :     SimpleStringListCell *subcell = NULL;
     439          18 :     SimpleStringListCell *replslotcell = NULL;
     440          18 :     int         i = 0;
     441             : 
     442          18 :     dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
     443             : 
     444          18 :     if (num_pubs > 0)
     445           4 :         pubcell = opt->pub_names.head;
     446          18 :     if (num_subs > 0)
     447           2 :         subcell = opt->sub_names.head;
     448          18 :     if (num_replslots > 0)
     449           4 :         replslotcell = opt->replslot_names.head;
     450             : 
     451          52 :     for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
     452             :     {
     453             :         char       *conninfo;
     454             : 
     455             :         /* Fill publisher attributes */
     456          34 :         conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
     457          34 :         dbinfo[i].pubconninfo = conninfo;
     458          34 :         dbinfo[i].dbname = cell->val;
     459          34 :         if (num_pubs > 0)
     460           8 :             dbinfo[i].pubname = pubcell->val;
     461             :         else
     462          26 :             dbinfo[i].pubname = NULL;
     463          34 :         if (num_replslots > 0)
     464           6 :             dbinfo[i].replslotname = replslotcell->val;
     465             :         else
     466          28 :             dbinfo[i].replslotname = NULL;
     467          34 :         dbinfo[i].made_replslot = false;
     468          34 :         dbinfo[i].made_publication = false;
     469             :         /* Fill subscriber attributes */
     470          34 :         conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
     471          34 :         dbinfo[i].subconninfo = conninfo;
     472          34 :         if (num_subs > 0)
     473           4 :             dbinfo[i].subname = subcell->val;
     474             :         else
     475          30 :             dbinfo[i].subname = NULL;
     476             :         /* Other fields will be filled later */
     477             : 
     478          34 :         pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
     479             :                      dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
     480             :                      dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
     481             :                      dbinfo[i].pubconninfo);
     482          34 :         pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s", i,
     483             :                      dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
     484             :                      dbinfo[i].subconninfo);
     485             : 
     486          34 :         if (num_pubs > 0)
     487           8 :             pubcell = pubcell->next;
     488          34 :         if (num_subs > 0)
     489           4 :             subcell = subcell->next;
     490          34 :         if (num_replslots > 0)
     491           6 :             replslotcell = replslotcell->next;
     492             : 
     493          34 :         i++;
     494             :     }
     495             : 
     496          18 :     return dbinfo;
     497             : }
     498             : 
     499             : /*
     500             :  * Open a new connection. If exit_on_error is true, it has an undesired
     501             :  * condition and it should exit immediately.
     502             :  */
     503             : static PGconn *
     504          86 : connect_database(const char *conninfo, bool exit_on_error)
     505             : {
     506             :     PGconn     *conn;
     507             :     PGresult   *res;
     508             : 
     509          86 :     conn = PQconnectdb(conninfo);
     510          86 :     if (PQstatus(conn) != CONNECTION_OK)
     511             :     {
     512           0 :         pg_log_error("connection to database failed: %s",
     513             :                      PQerrorMessage(conn));
     514           0 :         PQfinish(conn);
     515             : 
     516           0 :         if (exit_on_error)
     517           0 :             exit(1);
     518           0 :         return NULL;
     519             :     }
     520             : 
     521             :     /* Secure search_path */
     522          86 :     res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
     523          86 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     524             :     {
     525           0 :         pg_log_error("could not clear \"search_path\": %s",
     526             :                      PQresultErrorMessage(res));
     527           0 :         PQclear(res);
     528           0 :         PQfinish(conn);
     529             : 
     530           0 :         if (exit_on_error)
     531           0 :             exit(1);
     532           0 :         return NULL;
     533             :     }
     534          86 :     PQclear(res);
     535             : 
     536          86 :     return conn;
     537             : }
     538             : 
     539             : /*
     540             :  * Close the connection. If exit_on_error is true, it has an undesired
     541             :  * condition and it should exit immediately.
     542             :  */
     543             : static void
     544          86 : disconnect_database(PGconn *conn, bool exit_on_error)
     545             : {
     546             :     Assert(conn != NULL);
     547             : 
     548          86 :     PQfinish(conn);
     549             : 
     550          86 :     if (exit_on_error)
     551           4 :         exit(1);
     552          82 : }
     553             : 
     554             : /*
     555             :  * Obtain the system identifier using the provided connection. It will be used
     556             :  * to compare if a data directory is a clone of another one.
     557             :  */
     558             : static uint64
     559          18 : get_primary_sysid(const char *conninfo)
     560             : {
     561             :     PGconn     *conn;
     562             :     PGresult   *res;
     563             :     uint64      sysid;
     564             : 
     565          18 :     pg_log_info("getting system identifier from publisher");
     566             : 
     567          18 :     conn = connect_database(conninfo, true);
     568             : 
     569          18 :     res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
     570          18 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     571             :     {
     572           0 :         pg_log_error("could not get system identifier: %s",
     573             :                      PQresultErrorMessage(res));
     574           0 :         disconnect_database(conn, true);
     575             :     }
     576          18 :     if (PQntuples(res) != 1)
     577             :     {
     578           0 :         pg_log_error("could not get system identifier: got %d rows, expected %d row",
     579             :                      PQntuples(res), 1);
     580           0 :         disconnect_database(conn, true);
     581             :     }
     582             : 
     583          18 :     sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
     584             : 
     585          18 :     pg_log_info("system identifier is %llu on publisher",
     586             :                 (unsigned long long) sysid);
     587             : 
     588          18 :     PQclear(res);
     589          18 :     disconnect_database(conn, false);
     590             : 
     591          18 :     return sysid;
     592             : }
     593             : 
     594             : /*
     595             :  * Obtain the system identifier from control file. It will be used to compare
     596             :  * if a data directory is a clone of another one. This routine is used locally
     597             :  * and avoids a connection.
     598             :  */
     599             : static uint64
     600          18 : get_standby_sysid(const char *datadir)
     601             : {
     602             :     ControlFileData *cf;
     603             :     bool        crc_ok;
     604             :     uint64      sysid;
     605             : 
     606          18 :     pg_log_info("getting system identifier from subscriber");
     607             : 
     608          18 :     cf = get_controlfile(datadir, &crc_ok);
     609          18 :     if (!crc_ok)
     610           0 :         pg_fatal("control file appears to be corrupt");
     611             : 
     612          18 :     sysid = cf->system_identifier;
     613             : 
     614          18 :     pg_log_info("system identifier is %llu on subscriber",
     615             :                 (unsigned long long) sysid);
     616             : 
     617          18 :     pg_free(cf);
     618             : 
     619          18 :     return sysid;
     620             : }
     621             : 
     622             : /*
     623             :  * Modify the system identifier. Since a standby server preserves the system
     624             :  * identifier, it makes sense to change it to avoid situations in which WAL
     625             :  * files from one of the systems might be used in the other one.
     626             :  */
     627             : static void
     628           6 : modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
     629             : {
     630             :     ControlFileData *cf;
     631             :     bool        crc_ok;
     632             :     struct timeval tv;
     633             : 
     634             :     char       *cmd_str;
     635             : 
     636           6 :     pg_log_info("modifying system identifier of subscriber");
     637             : 
     638           6 :     cf = get_controlfile(subscriber_dir, &crc_ok);
     639           6 :     if (!crc_ok)
     640           0 :         pg_fatal("control file appears to be corrupt");
     641             : 
     642             :     /*
     643             :      * Select a new system identifier.
     644             :      *
     645             :      * XXX this code was extracted from BootStrapXLOG().
     646             :      */
     647           6 :     gettimeofday(&tv, NULL);
     648           6 :     cf->system_identifier = ((uint64) tv.tv_sec) << 32;
     649           6 :     cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
     650           6 :     cf->system_identifier |= getpid() & 0xFFF;
     651             : 
     652           6 :     if (!dry_run)
     653           2 :         update_controlfile(subscriber_dir, cf, true);
     654             : 
     655           6 :     pg_log_info("system identifier is %llu on subscriber",
     656             :                 (unsigned long long) cf->system_identifier);
     657             : 
     658           6 :     pg_log_info("running pg_resetwal on the subscriber");
     659             : 
     660           6 :     cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
     661             :                        subscriber_dir, DEVNULL);
     662             : 
     663           6 :     pg_log_debug("pg_resetwal command is: %s", cmd_str);
     664             : 
     665           6 :     if (!dry_run)
     666             :     {
     667           2 :         int         rc = system(cmd_str);
     668             : 
     669           2 :         if (rc == 0)
     670           2 :             pg_log_info("subscriber successfully changed the system identifier");
     671             :         else
     672           0 :             pg_fatal("could not change system identifier of subscriber: %s", wait_result_to_str(rc));
     673             :     }
     674             : 
     675           6 :     pg_free(cf);
     676           6 : }
     677             : 
     678             : /*
     679             :  * Generate an object name using a prefix, database oid and a random integer.
     680             :  * It is used in case the user does not specify an object name (publication,
     681             :  * subscription, replication slot).
     682             :  */
     683             : static char *
     684          10 : generate_object_name(PGconn *conn)
     685             : {
     686             :     PGresult   *res;
     687             :     Oid         oid;
     688             :     uint32      rand;
     689             :     char       *objname;
     690             : 
     691          10 :     res = PQexec(conn,
     692             :                  "SELECT oid FROM pg_catalog.pg_database "
     693             :                  "WHERE datname = pg_catalog.current_database()");
     694          10 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     695             :     {
     696           0 :         pg_log_error("could not obtain database OID: %s",
     697             :                      PQresultErrorMessage(res));
     698           0 :         disconnect_database(conn, true);
     699             :     }
     700             : 
     701          10 :     if (PQntuples(res) != 1)
     702             :     {
     703           0 :         pg_log_error("could not obtain database OID: got %d rows, expected %d row",
     704             :                      PQntuples(res), 1);
     705           0 :         disconnect_database(conn, true);
     706             :     }
     707             : 
     708             :     /* Database OID */
     709          10 :     oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
     710             : 
     711          10 :     PQclear(res);
     712             : 
     713             :     /* Random unsigned integer */
     714          10 :     rand = pg_prng_uint32(&prng_state);
     715             : 
     716             :     /*
     717             :      * Build the object name. The name must not exceed NAMEDATALEN - 1. This
     718             :      * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
     719             :      * '\0').
     720             :      */
     721          10 :     objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
     722             : 
     723          10 :     return objname;
     724             : }
     725             : 
     726             : /*
     727             :  * Create the publications and replication slots in preparation for logical
     728             :  * replication. Returns the LSN from latest replication slot. It will be the
     729             :  * replication start point that is used to adjust the subscriptions (see
     730             :  * set_replication_progress).
     731             :  */
     732             : static char *
     733           6 : setup_publisher(struct LogicalRepInfo *dbinfo)
     734             : {
     735           6 :     char       *lsn = NULL;
     736             : 
     737           6 :     pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
     738             : 
     739          16 :     for (int i = 0; i < num_dbs; i++)
     740             :     {
     741             :         PGconn     *conn;
     742          10 :         char       *genname = NULL;
     743             : 
     744          10 :         conn = connect_database(dbinfo[i].pubconninfo, true);
     745             : 
     746             :         /*
     747             :          * If an object name was not specified as command-line options, assign
     748             :          * a generated object name. The replication slot has a different rule.
     749             :          * The subscription name is assigned to the replication slot name if
     750             :          * no replication slot is specified. It follows the same rule as
     751             :          * CREATE SUBSCRIPTION.
     752             :          */
     753          10 :         if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
     754          10 :             genname = generate_object_name(conn);
     755          10 :         if (num_pubs == 0)
     756           2 :             dbinfo[i].pubname = pg_strdup(genname);
     757          10 :         if (num_subs == 0)
     758           6 :             dbinfo[i].subname = pg_strdup(genname);
     759          10 :         if (num_replslots == 0)
     760           4 :             dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
     761             : 
     762             :         /*
     763             :          * Create publication on publisher. This step should be executed
     764             :          * *before* promoting the subscriber to avoid any transactions between
     765             :          * consistent LSN and the new publication rows (such transactions
     766             :          * wouldn't see the new publication rows resulting in an error).
     767             :          */
     768          10 :         create_publication(conn, &dbinfo[i]);
     769             : 
     770             :         /* Create replication slot on publisher */
     771          10 :         if (lsn)
     772           2 :             pg_free(lsn);
     773          10 :         lsn = create_logical_replication_slot(conn, &dbinfo[i]);
     774          10 :         if (lsn != NULL || dry_run)
     775          10 :             pg_log_info("create replication slot \"%s\" on publisher",
     776             :                         dbinfo[i].replslotname);
     777             :         else
     778           0 :             exit(1);
     779             : 
     780             :         /*
     781             :          * Since we are using the LSN returned by the last replication slot as
     782             :          * recovery_target_lsn, this LSN is ahead of the current WAL position
     783             :          * and the recovery waits until the publisher writes a WAL record to
     784             :          * reach the target and ends the recovery. On idle systems, this wait
     785             :          * time is unpredictable and could lead to failure in promoting the
     786             :          * subscriber. To avoid that, insert a harmless WAL record.
     787             :          */
     788          10 :         if (i == num_dbs - 1 && !dry_run)
     789             :         {
     790             :             PGresult   *res;
     791             : 
     792           2 :             res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
     793           2 :             if (PQresultStatus(res) != PGRES_TUPLES_OK)
     794             :             {
     795           0 :                 pg_log_error("could not write an additional WAL record: %s",
     796             :                              PQresultErrorMessage(res));
     797           0 :                 disconnect_database(conn, true);
     798             :             }
     799           2 :             PQclear(res);
     800             :         }
     801             : 
     802          10 :         disconnect_database(conn, false);
     803             :     }
     804             : 
     805           6 :     return lsn;
     806             : }
     807             : 
     808             : /*
     809             :  * Is recovery still in progress?
     810             :  */
     811             : static bool
     812          32 : server_is_in_recovery(PGconn *conn)
     813             : {
     814             :     PGresult   *res;
     815             :     int         ret;
     816             : 
     817          32 :     res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
     818             : 
     819          32 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     820             :     {
     821           0 :         pg_log_error("could not obtain recovery progress: %s",
     822             :                      PQresultErrorMessage(res));
     823           0 :         disconnect_database(conn, true);
     824             :     }
     825             : 
     826             : 
     827          32 :     ret = strcmp("t", PQgetvalue(res, 0, 0));
     828             : 
     829          32 :     PQclear(res);
     830             : 
     831          32 :     return ret == 0;
     832             : }
     833             : 
     834             : /*
     835             :  * Is the primary server ready for logical replication?
     836             :  *
     837             :  * XXX Does it not allow a synchronous replica?
     838             :  */
     839             : static void
     840          10 : check_publisher(const struct LogicalRepInfo *dbinfo)
     841             : {
     842             :     PGconn     *conn;
     843             :     PGresult   *res;
     844          10 :     bool        failed = false;
     845             : 
     846             :     char       *wal_level;
     847             :     int         max_repslots;
     848             :     int         cur_repslots;
     849             :     int         max_walsenders;
     850             :     int         cur_walsenders;
     851             :     int         max_prepared_transactions;
     852             :     char       *max_slot_wal_keep_size;
     853             : 
     854          10 :     pg_log_info("checking settings on publisher");
     855             : 
     856          10 :     conn = connect_database(dbinfo[0].pubconninfo, true);
     857             : 
     858             :     /*
     859             :      * If the primary server is in recovery (i.e. cascading replication),
     860             :      * objects (publication) cannot be created because it is read only.
     861             :      */
     862          10 :     if (server_is_in_recovery(conn))
     863             :     {
     864           2 :         pg_log_error("primary server cannot be in recovery");
     865           2 :         disconnect_database(conn, true);
     866             :     }
     867             : 
     868             :     /*------------------------------------------------------------------------
     869             :      * Logical replication requires a few parameters to be set on publisher.
     870             :      * Since these parameters are not a requirement for physical replication,
     871             :      * we should check it to make sure it won't fail.
     872             :      *
     873             :      * - wal_level = logical
     874             :      * - max_replication_slots >= current + number of dbs to be converted
     875             :      * - max_wal_senders >= current + number of dbs to be converted
     876             :      * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files)
     877             :      * -----------------------------------------------------------------------
     878             :      */
     879           8 :     res = PQexec(conn,
     880             :                  "SELECT pg_catalog.current_setting('wal_level'),"
     881             :                  " pg_catalog.current_setting('max_replication_slots'),"
     882             :                  " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
     883             :                  " pg_catalog.current_setting('max_wal_senders'),"
     884             :                  " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
     885             :                  " pg_catalog.current_setting('max_prepared_transactions'),"
     886             :                  " pg_catalog.current_setting('max_slot_wal_keep_size')");
     887             : 
     888           8 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     889             :     {
     890           0 :         pg_log_error("could not obtain publisher settings: %s",
     891             :                      PQresultErrorMessage(res));
     892           0 :         disconnect_database(conn, true);
     893             :     }
     894             : 
     895           8 :     wal_level = pg_strdup(PQgetvalue(res, 0, 0));
     896           8 :     max_repslots = atoi(PQgetvalue(res, 0, 1));
     897           8 :     cur_repslots = atoi(PQgetvalue(res, 0, 2));
     898           8 :     max_walsenders = atoi(PQgetvalue(res, 0, 3));
     899           8 :     cur_walsenders = atoi(PQgetvalue(res, 0, 4));
     900           8 :     max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));
     901           8 :     max_slot_wal_keep_size = pg_strdup(PQgetvalue(res, 0, 6));
     902             : 
     903           8 :     PQclear(res);
     904             : 
     905           8 :     pg_log_debug("publisher: wal_level: %s", wal_level);
     906           8 :     pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
     907           8 :     pg_log_debug("publisher: current replication slots: %d", cur_repslots);
     908           8 :     pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
     909           8 :     pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
     910           8 :     pg_log_debug("publisher: max_prepared_transactions: %d",
     911             :                  max_prepared_transactions);
     912           8 :     pg_log_debug("publisher: max_slot_wal_keep_size: %s",
     913             :                  max_slot_wal_keep_size);
     914             : 
     915           8 :     disconnect_database(conn, false);
     916             : 
     917           8 :     if (strcmp(wal_level, "logical") != 0)
     918             :     {
     919           2 :         pg_log_error("publisher requires \"wal_level\" >= \"logical\"");
     920           2 :         failed = true;
     921             :     }
     922             : 
     923           8 :     if (max_repslots - cur_repslots < num_dbs)
     924             :     {
     925           2 :         pg_log_error("publisher requires %d replication slots, but only %d remain",
     926             :                      num_dbs, max_repslots - cur_repslots);
     927           2 :         pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
     928             :                           "max_replication_slots", cur_repslots + num_dbs);
     929           2 :         failed = true;
     930             :     }
     931             : 
     932           8 :     if (max_walsenders - cur_walsenders < num_dbs)
     933             :     {
     934           2 :         pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
     935             :                      num_dbs, max_walsenders - cur_walsenders);
     936           2 :         pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
     937             :                           "max_wal_senders", cur_walsenders + num_dbs);
     938           2 :         failed = true;
     939             :     }
     940             : 
     941           8 :     if (max_prepared_transactions != 0)
     942             :     {
     943           0 :         pg_log_warning("two_phase option will not be enabled for replication slots");
     944           0 :         pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled.  "
     945             :                               "Prepared transactions will be replicated at COMMIT PREPARED.");
     946             :     }
     947             : 
     948             :     /*
     949             :      * Validate 'max_slot_wal_keep_size'. If this parameter is set to a
     950             :      * non-default value, it may cause replication failures due to required
     951             :      * WAL files being prematurely removed.
     952             :      */
     953           8 :     if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
     954             :     {
     955           0 :         pg_log_warning("required WAL could be removed from the publisher");
     956           0 :         pg_log_warning_hint("Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
     957             :                             "max_slot_wal_keep_size");
     958             :     }
     959             : 
     960           8 :     pg_free(wal_level);
     961             : 
     962           8 :     if (failed)
     963           2 :         exit(1);
     964           6 : }
     965             : 
     966             : /*
     967             :  * Is the standby server ready for logical replication?
     968             :  *
     969             :  * XXX Does it not allow a time-delayed replica?
     970             :  *
     971             :  * XXX In a cascaded replication scenario (P -> S -> C), if the target server
     972             :  * is S, it cannot detect there is a replica (server C) because server S starts
     973             :  * accepting only local connections and server C cannot connect to it. Hence,
     974             :  * there is not a reliable way to provide a suitable error saying the server C
     975             :  * will be broken at the end of this process (due to pg_resetwal).
     976             :  */
     977             : static void
     978          14 : check_subscriber(const struct LogicalRepInfo *dbinfo)
     979             : {
     980             :     PGconn     *conn;
     981             :     PGresult   *res;
     982          14 :     bool        failed = false;
     983             : 
     984             :     int         max_lrworkers;
     985             :     int         max_repslots;
     986             :     int         max_wprocs;
     987             : 
     988          14 :     pg_log_info("checking settings on subscriber");
     989             : 
     990          14 :     conn = connect_database(dbinfo[0].subconninfo, true);
     991             : 
     992             :     /* The target server must be a standby */
     993          14 :     if (!server_is_in_recovery(conn))
     994             :     {
     995           2 :         pg_log_error("target server must be a standby");
     996           2 :         disconnect_database(conn, true);
     997             :     }
     998             : 
     999             :     /*------------------------------------------------------------------------
    1000             :      * Logical replication requires a few parameters to be set on subscriber.
    1001             :      * Since these parameters are not a requirement for physical replication,
    1002             :      * we should check it to make sure it won't fail.
    1003             :      *
    1004             :      * - max_replication_slots >= number of dbs to be converted
    1005             :      * - max_logical_replication_workers >= number of dbs to be converted
    1006             :      * - max_worker_processes >= 1 + number of dbs to be converted
    1007             :      *------------------------------------------------------------------------
    1008             :      */
    1009          12 :     res = PQexec(conn,
    1010             :                  "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
    1011             :                  "'max_logical_replication_workers', "
    1012             :                  "'max_replication_slots', "
    1013             :                  "'max_worker_processes', "
    1014             :                  "'primary_slot_name') "
    1015             :                  "ORDER BY name");
    1016             : 
    1017          12 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1018             :     {
    1019           0 :         pg_log_error("could not obtain subscriber settings: %s",
    1020             :                      PQresultErrorMessage(res));
    1021           0 :         disconnect_database(conn, true);
    1022             :     }
    1023             : 
    1024          12 :     max_lrworkers = atoi(PQgetvalue(res, 0, 0));
    1025          12 :     max_repslots = atoi(PQgetvalue(res, 1, 0));
    1026          12 :     max_wprocs = atoi(PQgetvalue(res, 2, 0));
    1027          12 :     if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
    1028          10 :         primary_slot_name = pg_strdup(PQgetvalue(res, 3, 0));
    1029             : 
    1030          12 :     pg_log_debug("subscriber: max_logical_replication_workers: %d",
    1031             :                  max_lrworkers);
    1032          12 :     pg_log_debug("subscriber: max_replication_slots: %d", max_repslots);
    1033          12 :     pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
    1034          12 :     if (primary_slot_name)
    1035          10 :         pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
    1036             : 
    1037          12 :     PQclear(res);
    1038             : 
    1039          12 :     disconnect_database(conn, false);
    1040             : 
    1041          12 :     if (max_repslots < num_dbs)
    1042             :     {
    1043           2 :         pg_log_error("subscriber requires %d replication slots, but only %d remain",
    1044             :                      num_dbs, max_repslots);
    1045           2 :         pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
    1046             :                           "max_replication_slots", num_dbs);
    1047           2 :         failed = true;
    1048             :     }
    1049             : 
    1050          12 :     if (max_lrworkers < num_dbs)
    1051             :     {
    1052           2 :         pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
    1053             :                      num_dbs, max_lrworkers);
    1054           2 :         pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
    1055             :                           "max_logical_replication_workers", num_dbs);
    1056           2 :         failed = true;
    1057             :     }
    1058             : 
    1059          12 :     if (max_wprocs < num_dbs + 1)
    1060             :     {
    1061           2 :         pg_log_error("subscriber requires %d worker processes, but only %d remain",
    1062             :                      num_dbs + 1, max_wprocs);
    1063           2 :         pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
    1064             :                           "max_worker_processes", num_dbs + 1);
    1065           2 :         failed = true;
    1066             :     }
    1067             : 
    1068          12 :     if (failed)
    1069           2 :         exit(1);
    1070          10 : }
    1071             : 
    1072             : /*
    1073             :  * Drop a specified subscription. This is to avoid duplicate subscriptions on
    1074             :  * the primary (publisher node) and the newly created subscriber. We
    1075             :  * shouldn't drop the associated slot as that would be used by the publisher
    1076             :  * node.
    1077             :  */
    1078             : static void
    1079           6 : drop_existing_subscriptions(PGconn *conn, const char *subname, const char *dbname)
    1080             : {
    1081           6 :     PQExpBuffer query = createPQExpBuffer();
    1082             :     PGresult   *res;
    1083             : 
    1084             :     Assert(conn != NULL);
    1085             : 
    1086             :     /*
    1087             :      * Construct a query string. These commands are allowed to be executed
    1088             :      * within a transaction.
    1089             :      */
    1090           6 :     appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
    1091             :                       subname);
    1092           6 :     appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
    1093             :                       subname);
    1094           6 :     appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
    1095             : 
    1096           6 :     pg_log_info("dropping subscription \"%s\" in database \"%s\"",
    1097             :                 subname, dbname);
    1098             : 
    1099           6 :     if (!dry_run)
    1100             :     {
    1101           2 :         res = PQexec(conn, query->data);
    1102             : 
    1103           2 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1104             :         {
    1105           0 :             pg_log_error("could not drop subscription \"%s\": %s",
    1106             :                          subname, PQresultErrorMessage(res));
    1107           0 :             disconnect_database(conn, true);
    1108             :         }
    1109             : 
    1110           2 :         PQclear(res);
    1111             :     }
    1112             : 
    1113           6 :     destroyPQExpBuffer(query);
    1114           6 : }
    1115             : 
    1116             : /*
    1117             :  * Retrieve and drop the pre-existing subscriptions.
    1118             :  */
    1119             : static void
    1120          10 : check_and_drop_existing_subscriptions(PGconn *conn,
    1121             :                                       const struct LogicalRepInfo *dbinfo)
    1122             : {
    1123          10 :     PQExpBuffer query = createPQExpBuffer();
    1124             :     char       *dbname;
    1125             :     PGresult   *res;
    1126             : 
    1127             :     Assert(conn != NULL);
    1128             : 
    1129          10 :     dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
    1130             : 
    1131          10 :     appendPQExpBuffer(query,
    1132             :                       "SELECT s.subname FROM pg_catalog.pg_subscription s "
    1133             :                       "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
    1134             :                       "WHERE d.datname = %s",
    1135             :                       dbname);
    1136          10 :     res = PQexec(conn, query->data);
    1137             : 
    1138          10 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1139             :     {
    1140           0 :         pg_log_error("could not obtain pre-existing subscriptions: %s",
    1141             :                      PQresultErrorMessage(res));
    1142           0 :         disconnect_database(conn, true);
    1143             :     }
    1144             : 
    1145          16 :     for (int i = 0; i < PQntuples(res); i++)
    1146           6 :         drop_existing_subscriptions(conn, PQgetvalue(res, i, 0),
    1147           6 :                                     dbinfo->dbname);
    1148             : 
    1149          10 :     PQclear(res);
    1150          10 :     destroyPQExpBuffer(query);
    1151          10 :     PQfreemem(dbname);
    1152          10 : }
    1153             : 
    1154             : /*
    1155             :  * Create the subscriptions, adjust the initial location for logical
    1156             :  * replication and enable the subscriptions. That's the last step for logical
    1157             :  * replication setup.
    1158             :  */
    1159             : static void
    1160           6 : setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
    1161             : {
    1162          16 :     for (int i = 0; i < num_dbs; i++)
    1163             :     {
    1164             :         PGconn     *conn;
    1165             : 
    1166             :         /* Connect to subscriber. */
    1167          10 :         conn = connect_database(dbinfo[i].subconninfo, true);
    1168             : 
    1169             :         /*
    1170             :          * We don't need the pre-existing subscriptions on the newly formed
    1171             :          * subscriber. They can connect to other publisher nodes and either
    1172             :          * get some unwarranted data or can lead to ERRORs in connecting to
    1173             :          * such nodes.
    1174             :          */
    1175          10 :         check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
    1176             : 
    1177             :         /*
    1178             :          * Since the publication was created before the consistent LSN, it is
    1179             :          * available on the subscriber when the physical replica is promoted.
    1180             :          * Remove publications from the subscriber because it has no use.
    1181             :          */
    1182          10 :         drop_publication(conn, &dbinfo[i]);
    1183             : 
    1184          10 :         create_subscription(conn, &dbinfo[i]);
    1185             : 
    1186             :         /* Set the replication progress to the correct LSN */
    1187          10 :         set_replication_progress(conn, &dbinfo[i], consistent_lsn);
    1188             : 
    1189             :         /* Enable subscription */
    1190          10 :         enable_subscription(conn, &dbinfo[i]);
    1191             : 
    1192          10 :         disconnect_database(conn, false);
    1193             :     }
    1194           6 : }
    1195             : 
    1196             : /*
    1197             :  * Write the required recovery parameters.
    1198             :  */
    1199             : static void
    1200           6 : setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
    1201             : {
    1202             :     PGconn     *conn;
    1203             :     PQExpBuffer recoveryconfcontents;
    1204             : 
    1205             :     /*
    1206             :      * Despite of the recovery parameters will be written to the subscriber,
    1207             :      * use a publisher connection. The primary_conninfo is generated using the
    1208             :      * connection settings.
    1209             :      */
    1210           6 :     conn = connect_database(dbinfo[0].pubconninfo, true);
    1211             : 
    1212             :     /*
    1213             :      * Write recovery parameters.
    1214             :      *
    1215             :      * The subscriber is not running yet. In dry run mode, the recovery
    1216             :      * parameters *won't* be written. An invalid LSN is used for printing
    1217             :      * purposes. Additional recovery parameters are added here. It avoids
    1218             :      * unexpected behavior such as end of recovery as soon as a consistent
    1219             :      * state is reached (recovery_target) and failure due to multiple recovery
    1220             :      * targets (name, time, xid, LSN).
    1221             :      */
    1222           6 :     recoveryconfcontents = GenerateRecoveryConfig(conn, NULL, NULL);
    1223           6 :     appendPQExpBuffer(recoveryconfcontents, "recovery_target = ''\n");
    1224           6 :     appendPQExpBuffer(recoveryconfcontents,
    1225             :                       "recovery_target_timeline = 'latest'\n");
    1226           6 :     appendPQExpBuffer(recoveryconfcontents,
    1227             :                       "recovery_target_inclusive = true\n");
    1228           6 :     appendPQExpBuffer(recoveryconfcontents,
    1229             :                       "recovery_target_action = promote\n");
    1230           6 :     appendPQExpBuffer(recoveryconfcontents, "recovery_target_name = ''\n");
    1231           6 :     appendPQExpBuffer(recoveryconfcontents, "recovery_target_time = ''\n");
    1232           6 :     appendPQExpBuffer(recoveryconfcontents, "recovery_target_xid = ''\n");
    1233             : 
    1234           6 :     if (dry_run)
    1235             :     {
    1236           4 :         appendPQExpBuffer(recoveryconfcontents, "# dry run mode");
    1237           4 :         appendPQExpBuffer(recoveryconfcontents,
    1238             :                           "recovery_target_lsn = '%X/%X'\n",
    1239           4 :                           LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
    1240             :     }
    1241             :     else
    1242             :     {
    1243           2 :         appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
    1244             :                           lsn);
    1245           2 :         WriteRecoveryConfig(conn, datadir, recoveryconfcontents);
    1246             :     }
    1247           6 :     disconnect_database(conn, false);
    1248             : 
    1249           6 :     pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
    1250           6 : }
    1251             : 
    1252             : /*
    1253             :  * Drop physical replication slot on primary if the standby was using it. After
    1254             :  * the transformation, it has no use.
    1255             :  *
    1256             :  * XXX we might not fail here. Instead, we provide a warning so the user
    1257             :  * eventually drops this replication slot later.
    1258             :  */
    1259             : static void
    1260           6 : drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
    1261             : {
    1262             :     PGconn     *conn;
    1263             : 
    1264             :     /* Replication slot does not exist, do nothing */
    1265           6 :     if (!primary_slot_name)
    1266           0 :         return;
    1267             : 
    1268           6 :     conn = connect_database(dbinfo[0].pubconninfo, false);
    1269           6 :     if (conn != NULL)
    1270             :     {
    1271           6 :         drop_replication_slot(conn, &dbinfo[0], slotname);
    1272           6 :         disconnect_database(conn, false);
    1273             :     }
    1274             :     else
    1275             :     {
    1276           0 :         pg_log_warning("could not drop replication slot \"%s\" on primary",
    1277             :                        slotname);
    1278           0 :         pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
    1279             :     }
    1280             : }
    1281             : 
    1282             : /*
    1283             :  * Drop failover replication slots on subscriber. After the transformation,
    1284             :  * they have no use.
    1285             :  *
    1286             :  * XXX We do not fail here. Instead, we provide a warning so the user can drop
    1287             :  * them later.
    1288             :  */
    1289             : static void
    1290           6 : drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
    1291             : {
    1292             :     PGconn     *conn;
    1293             :     PGresult   *res;
    1294             : 
    1295           6 :     conn = connect_database(dbinfo[0].subconninfo, false);
    1296           6 :     if (conn != NULL)
    1297             :     {
    1298             :         /* Get failover replication slot names */
    1299           6 :         res = PQexec(conn,
    1300             :                      "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
    1301             : 
    1302           6 :         if (PQresultStatus(res) == PGRES_TUPLES_OK)
    1303             :         {
    1304             :             /* Remove failover replication slots from subscriber */
    1305          12 :             for (int i = 0; i < PQntuples(res); i++)
    1306           6 :                 drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
    1307             :         }
    1308             :         else
    1309             :         {
    1310           0 :             pg_log_warning("could not obtain failover replication slot information: %s",
    1311             :                            PQresultErrorMessage(res));
    1312           0 :             pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
    1313             :         }
    1314             : 
    1315           6 :         PQclear(res);
    1316           6 :         disconnect_database(conn, false);
    1317             :     }
    1318             :     else
    1319             :     {
    1320           0 :         pg_log_warning("could not drop failover replication slot");
    1321           0 :         pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
    1322             :     }
    1323           6 : }
    1324             : 
    1325             : /*
    1326             :  * Create a logical replication slot and returns a LSN.
    1327             :  *
    1328             :  * CreateReplicationSlot() is not used because it does not provide the one-row
    1329             :  * result set that contains the LSN.
    1330             :  */
    1331             : static char *
    1332          10 : create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
    1333             : {
    1334          10 :     PQExpBuffer str = createPQExpBuffer();
    1335          10 :     PGresult   *res = NULL;
    1336          10 :     const char *slot_name = dbinfo->replslotname;
    1337             :     char       *slot_name_esc;
    1338          10 :     char       *lsn = NULL;
    1339             : 
    1340             :     Assert(conn != NULL);
    1341             : 
    1342          10 :     pg_log_info("creating the replication slot \"%s\" in database \"%s\"",
    1343             :                 slot_name, dbinfo->dbname);
    1344             : 
    1345          10 :     slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
    1346             : 
    1347          10 :     appendPQExpBuffer(str,
    1348             :                       "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, false, false)",
    1349             :                       slot_name_esc);
    1350             : 
    1351          10 :     PQfreemem(slot_name_esc);
    1352             : 
    1353          10 :     pg_log_debug("command is: %s", str->data);
    1354             : 
    1355          10 :     if (!dry_run)
    1356             :     {
    1357           4 :         res = PQexec(conn, str->data);
    1358           4 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1359             :         {
    1360           0 :             pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
    1361             :                          slot_name, dbinfo->dbname,
    1362             :                          PQresultErrorMessage(res));
    1363           0 :             PQclear(res);
    1364           0 :             destroyPQExpBuffer(str);
    1365           0 :             return NULL;
    1366             :         }
    1367             : 
    1368           4 :         lsn = pg_strdup(PQgetvalue(res, 0, 0));
    1369           4 :         PQclear(res);
    1370             :     }
    1371             : 
    1372             :     /* For cleanup purposes */
    1373          10 :     dbinfo->made_replslot = true;
    1374             : 
    1375          10 :     destroyPQExpBuffer(str);
    1376             : 
    1377          10 :     return lsn;
    1378             : }
    1379             : 
    1380             : static void
    1381          12 : drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
    1382             :                       const char *slot_name)
    1383             : {
    1384          12 :     PQExpBuffer str = createPQExpBuffer();
    1385             :     char       *slot_name_esc;
    1386             :     PGresult   *res;
    1387             : 
    1388             :     Assert(conn != NULL);
    1389             : 
    1390          12 :     pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
    1391             :                 slot_name, dbinfo->dbname);
    1392             : 
    1393          12 :     slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
    1394             : 
    1395          12 :     appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
    1396             : 
    1397          12 :     PQfreemem(slot_name_esc);
    1398             : 
    1399          12 :     pg_log_debug("command is: %s", str->data);
    1400             : 
    1401          12 :     if (!dry_run)
    1402             :     {
    1403           4 :         res = PQexec(conn, str->data);
    1404           4 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1405             :         {
    1406           0 :             pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
    1407             :                          slot_name, dbinfo->dbname, PQresultErrorMessage(res));
    1408           0 :             dbinfo->made_replslot = false;   /* don't try again. */
    1409             :         }
    1410             : 
    1411           4 :         PQclear(res);
    1412             :     }
    1413             : 
    1414          12 :     destroyPQExpBuffer(str);
    1415          12 : }
    1416             : 
    1417             : /*
    1418             :  * Reports a suitable message if pg_ctl fails.
    1419             :  */
    1420             : static void
    1421          40 : pg_ctl_status(const char *pg_ctl_cmd, int rc)
    1422             : {
    1423          40 :     if (rc != 0)
    1424             :     {
    1425           0 :         if (WIFEXITED(rc))
    1426             :         {
    1427           0 :             pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
    1428             :         }
    1429           0 :         else if (WIFSIGNALED(rc))
    1430             :         {
    1431             : #if defined(WIN32)
    1432             :             pg_log_error("pg_ctl was terminated by exception 0x%X",
    1433             :                          WTERMSIG(rc));
    1434             :             pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
    1435             : #else
    1436           0 :             pg_log_error("pg_ctl was terminated by signal %d: %s",
    1437             :                          WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
    1438             : #endif
    1439             :         }
    1440             :         else
    1441             :         {
    1442           0 :             pg_log_error("pg_ctl exited with unrecognized status %d", rc);
    1443             :         }
    1444             : 
    1445           0 :         pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
    1446           0 :         exit(1);
    1447             :     }
    1448          40 : }
    1449             : 
    1450             : static void
    1451          20 : start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
    1452             :                      bool restrict_logical_worker)
    1453             : {
    1454          20 :     PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
    1455             :     int         rc;
    1456             : 
    1457          20 :     appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
    1458          20 :     appendShellString(pg_ctl_cmd, subscriber_dir);
    1459          20 :     appendPQExpBuffer(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
    1460             : 
    1461             :     /* Prevent unintended slot invalidation */
    1462          20 :     appendPQExpBuffer(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
    1463             : 
    1464          20 :     if (restricted_access)
    1465             :     {
    1466          20 :         appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
    1467             : #if !defined(WIN32)
    1468             : 
    1469             :         /*
    1470             :          * An empty listen_addresses list means the server does not listen on
    1471             :          * any IP interfaces; only Unix-domain sockets can be used to connect
    1472             :          * to the server. Prevent external connections to minimize the chance
    1473             :          * of failure.
    1474             :          */
    1475          20 :         appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
    1476          20 :         if (opt->socket_dir)
    1477          20 :             appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
    1478             :                               opt->socket_dir);
    1479          20 :         appendPQExpBufferChar(pg_ctl_cmd, '"');
    1480             : #endif
    1481             :     }
    1482          20 :     if (opt->config_file != NULL)
    1483           0 :         appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
    1484             :                           opt->config_file);
    1485             : 
    1486             :     /* Suppress to start logical replication if requested */
    1487          20 :     if (restrict_logical_worker)
    1488           6 :         appendPQExpBuffer(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
    1489             : 
    1490          20 :     pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
    1491          20 :     rc = system(pg_ctl_cmd->data);
    1492          20 :     pg_ctl_status(pg_ctl_cmd->data, rc);
    1493          20 :     standby_running = true;
    1494          20 :     destroyPQExpBuffer(pg_ctl_cmd);
    1495          20 :     pg_log_info("server was started");
    1496          20 : }
    1497             : 
    1498             : static void
    1499          20 : stop_standby_server(const char *datadir)
    1500             : {
    1501             :     char       *pg_ctl_cmd;
    1502             :     int         rc;
    1503             : 
    1504          20 :     pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
    1505             :                           datadir);
    1506          20 :     pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
    1507          20 :     rc = system(pg_ctl_cmd);
    1508          20 :     pg_ctl_status(pg_ctl_cmd, rc);
    1509          20 :     standby_running = false;
    1510          20 :     pg_log_info("server was stopped");
    1511          20 : }
    1512             : 
    1513             : /*
    1514             :  * Returns after the server finishes the recovery process.
    1515             :  *
    1516             :  * If recovery_timeout option is set, terminate abnormally without finishing
    1517             :  * the recovery process. By default, it waits forever.
    1518             :  *
    1519             :  * XXX Is the recovery process still in progress? When recovery process has a
    1520             :  * better progress reporting mechanism, it should be added here.
    1521             :  */
    1522             : static void
    1523           6 : wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
    1524             : {
    1525             :     PGconn     *conn;
    1526           6 :     int         status = POSTMASTER_STILL_STARTING;
    1527           6 :     int         timer = 0;
    1528             : 
    1529           6 :     pg_log_info("waiting for the target server to reach the consistent state");
    1530             : 
    1531           6 :     conn = connect_database(conninfo, true);
    1532             : 
    1533             :     for (;;)
    1534           2 :     {
    1535           8 :         bool        in_recovery = server_is_in_recovery(conn);
    1536             : 
    1537             :         /*
    1538             :          * Does the recovery process finish? In dry run mode, there is no
    1539             :          * recovery mode. Bail out as the recovery process has ended.
    1540             :          */
    1541           8 :         if (!in_recovery || dry_run)
    1542             :         {
    1543           6 :             status = POSTMASTER_READY;
    1544           6 :             recovery_ended = true;
    1545           6 :             break;
    1546             :         }
    1547             : 
    1548             :         /* Bail out after recovery_timeout seconds if this option is set */
    1549           2 :         if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
    1550             :         {
    1551           0 :             stop_standby_server(subscriber_dir);
    1552           0 :             pg_log_error("recovery timed out");
    1553           0 :             disconnect_database(conn, true);
    1554             :         }
    1555             : 
    1556             :         /* Keep waiting */
    1557           2 :         pg_usleep(WAIT_INTERVAL * USEC_PER_SEC);
    1558             : 
    1559           2 :         timer += WAIT_INTERVAL;
    1560             :     }
    1561             : 
    1562           6 :     disconnect_database(conn, false);
    1563             : 
    1564           6 :     if (status == POSTMASTER_STILL_STARTING)
    1565           0 :         pg_fatal("server did not end recovery");
    1566             : 
    1567           6 :     pg_log_info("target server reached the consistent state");
    1568           6 :     pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
    1569           6 : }
    1570             : 
    1571             : /*
    1572             :  * Create a publication that includes all tables in the database.
    1573             :  */
    1574             : static void
    1575          10 : create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
    1576             : {
    1577          10 :     PQExpBuffer str = createPQExpBuffer();
    1578             :     PGresult   *res;
    1579             :     char       *ipubname_esc;
    1580             :     char       *spubname_esc;
    1581             : 
    1582             :     Assert(conn != NULL);
    1583             : 
    1584          10 :     ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
    1585          10 :     spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
    1586             : 
    1587             :     /* Check if the publication already exists */
    1588          10 :     appendPQExpBuffer(str,
    1589             :                       "SELECT 1 FROM pg_catalog.pg_publication "
    1590             :                       "WHERE pubname = %s",
    1591             :                       spubname_esc);
    1592          10 :     res = PQexec(conn, str->data);
    1593          10 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1594             :     {
    1595           0 :         pg_log_error("could not obtain publication information: %s",
    1596             :                      PQresultErrorMessage(res));
    1597           0 :         disconnect_database(conn, true);
    1598             :     }
    1599             : 
    1600          10 :     if (PQntuples(res) == 1)
    1601             :     {
    1602             :         /*
    1603             :          * Unfortunately, if it reaches this code path, it will always fail
    1604             :          * (unless you decide to change the existing publication name). That's
    1605             :          * bad but it is very unlikely that the user will choose a name with
    1606             :          * pg_createsubscriber_ prefix followed by the exact database oid and
    1607             :          * a random number.
    1608             :          */
    1609           0 :         pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
    1610           0 :         pg_log_error_hint("Consider renaming this publication before continuing.");
    1611           0 :         disconnect_database(conn, true);
    1612             :     }
    1613             : 
    1614          10 :     PQclear(res);
    1615          10 :     resetPQExpBuffer(str);
    1616             : 
    1617          10 :     pg_log_info("creating publication \"%s\" in database \"%s\"",
    1618             :                 dbinfo->pubname, dbinfo->dbname);
    1619             : 
    1620          10 :     appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
    1621             :                       ipubname_esc);
    1622             : 
    1623          10 :     pg_log_debug("command is: %s", str->data);
    1624             : 
    1625          10 :     if (!dry_run)
    1626             :     {
    1627           4 :         res = PQexec(conn, str->data);
    1628           4 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1629             :         {
    1630           0 :             pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
    1631             :                          dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
    1632           0 :             disconnect_database(conn, true);
    1633             :         }
    1634           4 :         PQclear(res);
    1635             :     }
    1636             : 
    1637             :     /* For cleanup purposes */
    1638          10 :     dbinfo->made_publication = true;
    1639             : 
    1640          10 :     PQfreemem(ipubname_esc);
    1641          10 :     PQfreemem(spubname_esc);
    1642          10 :     destroyPQExpBuffer(str);
    1643          10 : }
    1644             : 
    1645             : /*
    1646             :  * Remove publication if it couldn't finish all steps.
    1647             :  */
    1648             : static void
    1649          10 : drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
    1650             : {
    1651          10 :     PQExpBuffer str = createPQExpBuffer();
    1652             :     PGresult   *res;
    1653             :     char       *pubname_esc;
    1654             : 
    1655             :     Assert(conn != NULL);
    1656             : 
    1657          10 :     pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
    1658             : 
    1659          10 :     pg_log_info("dropping publication \"%s\" in database \"%s\"",
    1660             :                 dbinfo->pubname, dbinfo->dbname);
    1661             : 
    1662          10 :     appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
    1663             : 
    1664          10 :     PQfreemem(pubname_esc);
    1665             : 
    1666          10 :     pg_log_debug("command is: %s", str->data);
    1667             : 
    1668          10 :     if (!dry_run)
    1669             :     {
    1670           4 :         res = PQexec(conn, str->data);
    1671           4 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1672             :         {
    1673           0 :             pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
    1674             :                          dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
    1675           0 :             dbinfo->made_publication = false;    /* don't try again. */
    1676             : 
    1677             :             /*
    1678             :              * Don't disconnect and exit here. This routine is used by primary
    1679             :              * (cleanup publication / replication slot due to an error) and
    1680             :              * subscriber (remove the replicated publications). In both cases,
    1681             :              * it can continue and provide instructions for the user to remove
    1682             :              * it later if cleanup fails.
    1683             :              */
    1684             :         }
    1685           4 :         PQclear(res);
    1686             :     }
    1687             : 
    1688          10 :     destroyPQExpBuffer(str);
    1689          10 : }
    1690             : 
    1691             : /*
    1692             :  * Create a subscription with some predefined options.
    1693             :  *
    1694             :  * A replication slot was already created in a previous step. Let's use it.  It
    1695             :  * is not required to copy data. The subscription will be created but it will
    1696             :  * not be enabled now. That's because the replication progress must be set and
    1697             :  * the replication origin name (one of the function arguments) contains the
    1698             :  * subscription OID in its name. Once the subscription is created,
    1699             :  * set_replication_progress() can obtain the chosen origin name and set up its
    1700             :  * initial location.
    1701             :  */
    1702             : static void
    1703          10 : create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
    1704             : {
    1705          10 :     PQExpBuffer str = createPQExpBuffer();
    1706             :     PGresult   *res;
    1707             :     char       *pubname_esc;
    1708             :     char       *subname_esc;
    1709             :     char       *pubconninfo_esc;
    1710             :     char       *replslotname_esc;
    1711             : 
    1712             :     Assert(conn != NULL);
    1713             : 
    1714          10 :     pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
    1715          10 :     subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
    1716          10 :     pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
    1717          10 :     replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
    1718             : 
    1719          10 :     pg_log_info("creating subscription \"%s\" in database \"%s\"",
    1720             :                 dbinfo->subname, dbinfo->dbname);
    1721             : 
    1722          10 :     appendPQExpBuffer(str,
    1723             :                       "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
    1724             :                       "WITH (create_slot = false, enabled = false, "
    1725             :                       "slot_name = %s, copy_data = false)",
    1726             :                       subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc);
    1727             : 
    1728          10 :     PQfreemem(pubname_esc);
    1729          10 :     PQfreemem(subname_esc);
    1730          10 :     PQfreemem(pubconninfo_esc);
    1731          10 :     PQfreemem(replslotname_esc);
    1732             : 
    1733          10 :     pg_log_debug("command is: %s", str->data);
    1734             : 
    1735          10 :     if (!dry_run)
    1736             :     {
    1737           4 :         res = PQexec(conn, str->data);
    1738           4 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1739             :         {
    1740           0 :             pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
    1741             :                          dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
    1742           0 :             disconnect_database(conn, true);
    1743             :         }
    1744           4 :         PQclear(res);
    1745             :     }
    1746             : 
    1747          10 :     destroyPQExpBuffer(str);
    1748          10 : }
    1749             : 
    1750             : /*
    1751             :  * Sets the replication progress to the consistent LSN.
    1752             :  *
    1753             :  * The subscriber caught up to the consistent LSN provided by the last
    1754             :  * replication slot that was created. The goal is to set up the initial
    1755             :  * location for the logical replication that is the exact LSN that the
    1756             :  * subscriber was promoted. Once the subscription is enabled it will start
    1757             :  * streaming from that location onwards.  In dry run mode, the subscription OID
    1758             :  * and LSN are set to invalid values for printing purposes.
    1759             :  */
    1760             : static void
    1761          10 : set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
    1762             : {
    1763          10 :     PQExpBuffer str = createPQExpBuffer();
    1764             :     PGresult   *res;
    1765             :     Oid         suboid;
    1766             :     char       *subname;
    1767             :     char       *dbname;
    1768             :     char       *originname;
    1769             :     char       *lsnstr;
    1770             : 
    1771             :     Assert(conn != NULL);
    1772             : 
    1773          10 :     subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
    1774          10 :     dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
    1775             : 
    1776          10 :     appendPQExpBuffer(str,
    1777             :                       "SELECT s.oid FROM pg_catalog.pg_subscription s "
    1778             :                       "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
    1779             :                       "WHERE s.subname = %s AND d.datname = %s",
    1780             :                       subname, dbname);
    1781             : 
    1782          10 :     res = PQexec(conn, str->data);
    1783          10 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1784             :     {
    1785           0 :         pg_log_error("could not obtain subscription OID: %s",
    1786             :                      PQresultErrorMessage(res));
    1787           0 :         disconnect_database(conn, true);
    1788             :     }
    1789             : 
    1790          10 :     if (PQntuples(res) != 1 && !dry_run)
    1791             :     {
    1792           0 :         pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
    1793             :                      PQntuples(res), 1);
    1794           0 :         disconnect_database(conn, true);
    1795             :     }
    1796             : 
    1797          10 :     if (dry_run)
    1798             :     {
    1799           6 :         suboid = InvalidOid;
    1800           6 :         lsnstr = psprintf("%X/%X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
    1801             :     }
    1802             :     else
    1803             :     {
    1804           4 :         suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
    1805           4 :         lsnstr = psprintf("%s", lsn);
    1806             :     }
    1807             : 
    1808          10 :     PQclear(res);
    1809             : 
    1810             :     /*
    1811             :      * The origin name is defined as pg_%u. %u is the subscription OID. See
    1812             :      * ApplyWorkerMain().
    1813             :      */
    1814          10 :     originname = psprintf("pg_%u", suboid);
    1815             : 
    1816          10 :     pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
    1817             :                 originname, lsnstr, dbinfo->dbname);
    1818             : 
    1819          10 :     resetPQExpBuffer(str);
    1820          10 :     appendPQExpBuffer(str,
    1821             :                       "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
    1822             :                       originname, lsnstr);
    1823             : 
    1824          10 :     pg_log_debug("command is: %s", str->data);
    1825             : 
    1826          10 :     if (!dry_run)
    1827             :     {
    1828           4 :         res = PQexec(conn, str->data);
    1829           4 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1830             :         {
    1831           0 :             pg_log_error("could not set replication progress for subscription \"%s\": %s",
    1832             :                          dbinfo->subname, PQresultErrorMessage(res));
    1833           0 :             disconnect_database(conn, true);
    1834             :         }
    1835           4 :         PQclear(res);
    1836             :     }
    1837             : 
    1838          10 :     PQfreemem(subname);
    1839          10 :     PQfreemem(dbname);
    1840          10 :     pg_free(originname);
    1841          10 :     pg_free(lsnstr);
    1842          10 :     destroyPQExpBuffer(str);
    1843          10 : }
    1844             : 
    1845             : /*
    1846             :  * Enables the subscription.
    1847             :  *
    1848             :  * The subscription was created in a previous step but it was disabled. After
    1849             :  * adjusting the initial logical replication location, enable the subscription.
    1850             :  */
    1851             : static void
    1852          10 : enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
    1853             : {
    1854          10 :     PQExpBuffer str = createPQExpBuffer();
    1855             :     PGresult   *res;
    1856             :     char       *subname;
    1857             : 
    1858             :     Assert(conn != NULL);
    1859             : 
    1860          10 :     subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
    1861             : 
    1862          10 :     pg_log_info("enabling subscription \"%s\" in database \"%s\"",
    1863             :                 dbinfo->subname, dbinfo->dbname);
    1864             : 
    1865          10 :     appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
    1866             : 
    1867          10 :     pg_log_debug("command is: %s", str->data);
    1868             : 
    1869          10 :     if (!dry_run)
    1870             :     {
    1871           4 :         res = PQexec(conn, str->data);
    1872           4 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1873             :         {
    1874           0 :             pg_log_error("could not enable subscription \"%s\": %s",
    1875             :                          dbinfo->subname, PQresultErrorMessage(res));
    1876           0 :             disconnect_database(conn, true);
    1877             :         }
    1878             : 
    1879           4 :         PQclear(res);
    1880             :     }
    1881             : 
    1882          10 :     PQfreemem(subname);
    1883          10 :     destroyPQExpBuffer(str);
    1884          10 : }
    1885             : 
    1886             : int
    1887          40 : main(int argc, char **argv)
    1888             : {
    1889             :     static struct option long_options[] =
    1890             :     {
    1891             :         {"database", required_argument, NULL, 'd'},
    1892             :         {"pgdata", required_argument, NULL, 'D'},
    1893             :         {"dry-run", no_argument, NULL, 'n'},
    1894             :         {"subscriber-port", required_argument, NULL, 'p'},
    1895             :         {"publisher-server", required_argument, NULL, 'P'},
    1896             :         {"socketdir", required_argument, NULL, 's'},
    1897             :         {"recovery-timeout", required_argument, NULL, 't'},
    1898             :         {"subscriber-username", required_argument, NULL, 'U'},
    1899             :         {"verbose", no_argument, NULL, 'v'},
    1900             :         {"version", no_argument, NULL, 'V'},
    1901             :         {"help", no_argument, NULL, '?'},
    1902             :         {"config-file", required_argument, NULL, 1},
    1903             :         {"publication", required_argument, NULL, 2},
    1904             :         {"replication-slot", required_argument, NULL, 3},
    1905             :         {"subscription", required_argument, NULL, 4},
    1906             :         {NULL, 0, NULL, 0}
    1907             :     };
    1908             : 
    1909          40 :     struct CreateSubscriberOptions opt = {0};
    1910             : 
    1911             :     int         c;
    1912             :     int         option_index;
    1913             : 
    1914             :     char       *pub_base_conninfo;
    1915             :     char       *sub_base_conninfo;
    1916          40 :     char       *dbname_conninfo = NULL;
    1917             : 
    1918             :     uint64      pub_sysid;
    1919             :     uint64      sub_sysid;
    1920             :     struct stat statbuf;
    1921             : 
    1922             :     char       *consistent_lsn;
    1923             : 
    1924             :     char        pidfile[MAXPGPATH];
    1925             : 
    1926          40 :     pg_logging_init(argv[0]);
    1927          40 :     pg_logging_set_level(PG_LOG_WARNING);
    1928          40 :     progname = get_progname(argv[0]);
    1929          40 :     set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
    1930             : 
    1931          40 :     if (argc > 1)
    1932             :     {
    1933          38 :         if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
    1934             :         {
    1935           2 :             usage();
    1936           2 :             exit(0);
    1937             :         }
    1938          36 :         else if (strcmp(argv[1], "-V") == 0
    1939          36 :                  || strcmp(argv[1], "--version") == 0)
    1940             :         {
    1941           2 :             puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
    1942           2 :             exit(0);
    1943             :         }
    1944             :     }
    1945             : 
    1946             :     /* Default settings */
    1947          36 :     subscriber_dir = NULL;
    1948          36 :     opt.config_file = NULL;
    1949          36 :     opt.pub_conninfo_str = NULL;
    1950          36 :     opt.socket_dir = NULL;
    1951          36 :     opt.sub_port = DEFAULT_SUB_PORT;
    1952          36 :     opt.sub_username = NULL;
    1953          36 :     opt.database_names = (SimpleStringList)
    1954             :     {
    1955             :         0
    1956             :     };
    1957          36 :     opt.recovery_timeout = 0;
    1958             : 
    1959             :     /*
    1960             :      * Don't allow it to be run as root. It uses pg_ctl which does not allow
    1961             :      * it either.
    1962             :      */
    1963             : #ifndef WIN32
    1964          36 :     if (geteuid() == 0)
    1965             :     {
    1966           0 :         pg_log_error("cannot be executed by \"root\"");
    1967           0 :         pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
    1968             :                           progname);
    1969           0 :         exit(1);
    1970             :     }
    1971             : #endif
    1972             : 
    1973          36 :     get_restricted_token();
    1974             : 
    1975         268 :     while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:U:v",
    1976             :                             long_options, &option_index)) != -1)
    1977             :     {
    1978         238 :         switch (c)
    1979             :         {
    1980          48 :             case 'd':
    1981          48 :                 if (!simple_string_list_member(&opt.database_names, optarg))
    1982             :                 {
    1983          46 :                     simple_string_list_append(&opt.database_names, optarg);
    1984          46 :                     num_dbs++;
    1985             :                 }
    1986             :                 else
    1987             :                 {
    1988           2 :                     pg_log_error("database \"%s\" specified more than once", optarg);
    1989           2 :                     exit(1);
    1990             :                 }
    1991          46 :                 break;
    1992          32 :             case 'D':
    1993          32 :                 subscriber_dir = pg_strdup(optarg);
    1994          32 :                 canonicalize_path(subscriber_dir);
    1995          32 :                 break;
    1996          14 :             case 'n':
    1997          14 :                 dry_run = true;
    1998          14 :                 break;
    1999          18 :             case 'p':
    2000          18 :                 opt.sub_port = pg_strdup(optarg);
    2001          18 :                 break;
    2002          30 :             case 'P':
    2003          30 :                 opt.pub_conninfo_str = pg_strdup(optarg);
    2004          30 :                 break;
    2005          18 :             case 's':
    2006          18 :                 opt.socket_dir = pg_strdup(optarg);
    2007          18 :                 canonicalize_path(opt.socket_dir);
    2008          18 :                 break;
    2009           4 :             case 't':
    2010           4 :                 opt.recovery_timeout = atoi(optarg);
    2011           4 :                 break;
    2012           0 :             case 'U':
    2013           0 :                 opt.sub_username = pg_strdup(optarg);
    2014           0 :                 break;
    2015          32 :             case 'v':
    2016          32 :                 pg_logging_increase_verbosity();
    2017          32 :                 break;
    2018           0 :             case 1:
    2019           0 :                 opt.config_file = pg_strdup(optarg);
    2020           0 :                 break;
    2021          22 :             case 2:
    2022          22 :                 if (!simple_string_list_member(&opt.pub_names, optarg))
    2023             :                 {
    2024          20 :                     simple_string_list_append(&opt.pub_names, optarg);
    2025          20 :                     num_pubs++;
    2026             :                 }
    2027             :                 else
    2028             :                 {
    2029           2 :                     pg_log_error("publication \"%s\" specified more than once", optarg);
    2030           2 :                     exit(1);
    2031             :                 }
    2032          20 :                 break;
    2033           8 :             case 3:
    2034           8 :                 if (!simple_string_list_member(&opt.replslot_names, optarg))
    2035             :                 {
    2036           8 :                     simple_string_list_append(&opt.replslot_names, optarg);
    2037           8 :                     num_replslots++;
    2038             :                 }
    2039             :                 else
    2040             :                 {
    2041           0 :                     pg_log_error("replication slot \"%s\" specified more than once", optarg);
    2042           0 :                     exit(1);
    2043             :                 }
    2044           8 :                 break;
    2045          10 :             case 4:
    2046          10 :                 if (!simple_string_list_member(&opt.sub_names, optarg))
    2047             :                 {
    2048          10 :                     simple_string_list_append(&opt.sub_names, optarg);
    2049          10 :                     num_subs++;
    2050             :                 }
    2051             :                 else
    2052             :                 {
    2053           0 :                     pg_log_error("subscription \"%s\" specified more than once", optarg);
    2054           0 :                     exit(1);
    2055             :                 }
    2056          10 :                 break;
    2057           2 :             default:
    2058             :                 /* getopt_long already emitted a complaint */
    2059           2 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    2060           2 :                 exit(1);
    2061             :         }
    2062             :     }
    2063             : 
    2064             :     /* Any non-option arguments? */
    2065          30 :     if (optind < argc)
    2066             :     {
    2067           0 :         pg_log_error("too many command-line arguments (first is \"%s\")",
    2068             :                      argv[optind]);
    2069           0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    2070           0 :         exit(1);
    2071             :     }
    2072             : 
    2073             :     /* Required arguments */
    2074          30 :     if (subscriber_dir == NULL)
    2075             :     {
    2076           2 :         pg_log_error("no subscriber data directory specified");
    2077           2 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    2078           2 :         exit(1);
    2079             :     }
    2080             : 
    2081             :     /* If socket directory is not provided, use the current directory */
    2082          28 :     if (opt.socket_dir == NULL)
    2083             :     {
    2084             :         char        cwd[MAXPGPATH];
    2085             : 
    2086          10 :         if (!getcwd(cwd, MAXPGPATH))
    2087           0 :             pg_fatal("could not determine current directory");
    2088          10 :         opt.socket_dir = pg_strdup(cwd);
    2089          10 :         canonicalize_path(opt.socket_dir);
    2090             :     }
    2091             : 
    2092             :     /*
    2093             :      * Parse connection string. Build a base connection string that might be
    2094             :      * reused by multiple databases.
    2095             :      */
    2096          28 :     if (opt.pub_conninfo_str == NULL)
    2097             :     {
    2098             :         /*
    2099             :          * TODO use primary_conninfo (if available) from subscriber and
    2100             :          * extract publisher connection string. Assume that there are
    2101             :          * identical entries for physical and logical replication. If there is
    2102             :          * not, we would fail anyway.
    2103             :          */
    2104           2 :         pg_log_error("no publisher connection string specified");
    2105           2 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    2106           2 :         exit(1);
    2107             :     }
    2108          26 :     pg_log_info("validating publisher connection string");
    2109          26 :     pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
    2110             :                                           &dbname_conninfo);
    2111          26 :     if (pub_base_conninfo == NULL)
    2112           0 :         exit(1);
    2113             : 
    2114          26 :     pg_log_info("validating subscriber connection string");
    2115          26 :     sub_base_conninfo = get_sub_conninfo(&opt);
    2116             : 
    2117          26 :     if (opt.database_names.head == NULL)
    2118             :     {
    2119           4 :         pg_log_info("no database was specified");
    2120             : 
    2121             :         /*
    2122             :          * If --database option is not provided, try to obtain the dbname from
    2123             :          * the publisher conninfo. If dbname parameter is not available, error
    2124             :          * out.
    2125             :          */
    2126           4 :         if (dbname_conninfo)
    2127             :         {
    2128           2 :             simple_string_list_append(&opt.database_names, dbname_conninfo);
    2129           2 :             num_dbs++;
    2130             : 
    2131           2 :             pg_log_info("database name \"%s\" was extracted from the publisher connection string",
    2132             :                         dbname_conninfo);
    2133             :         }
    2134             :         else
    2135             :         {
    2136           2 :             pg_log_error("no database name specified");
    2137           2 :             pg_log_error_hint("Try \"%s --help\" for more information.",
    2138             :                               progname);
    2139           2 :             exit(1);
    2140             :         }
    2141             :     }
    2142             : 
    2143             :     /* Number of object names must match number of databases */
    2144          24 :     if (num_pubs > 0 && num_pubs != num_dbs)
    2145             :     {
    2146           2 :         pg_log_error("wrong number of publication names specified");
    2147           2 :         pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
    2148             :                             num_pubs, num_dbs);
    2149           2 :         exit(1);
    2150             :     }
    2151          22 :     if (num_subs > 0 && num_subs != num_dbs)
    2152             :     {
    2153           2 :         pg_log_error("wrong number of subscription names specified");
    2154           2 :         pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
    2155             :                             num_subs, num_dbs);
    2156           2 :         exit(1);
    2157             :     }
    2158          20 :     if (num_replslots > 0 && num_replslots != num_dbs)
    2159             :     {
    2160           2 :         pg_log_error("wrong number of replication slot names specified");
    2161           2 :         pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
    2162             :                             num_replslots, num_dbs);
    2163           2 :         exit(1);
    2164             :     }
    2165             : 
    2166             :     /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
    2167          18 :     pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
    2168          18 :     pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
    2169             : 
    2170             :     /* Rudimentary check for a data directory */
    2171          18 :     check_data_directory(subscriber_dir);
    2172             : 
    2173             :     /*
    2174             :      * Store database information for publisher and subscriber. It should be
    2175             :      * called before atexit() because its return is used in the
    2176             :      * cleanup_objects_atexit().
    2177             :      */
    2178          18 :     dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
    2179             : 
    2180             :     /* Register a function to clean up objects in case of failure */
    2181          18 :     atexit(cleanup_objects_atexit);
    2182             : 
    2183             :     /*
    2184             :      * Check if the subscriber data directory has the same system identifier
    2185             :      * than the publisher data directory.
    2186             :      */
    2187          18 :     pub_sysid = get_primary_sysid(dbinfo[0].pubconninfo);
    2188          18 :     sub_sysid = get_standby_sysid(subscriber_dir);
    2189          18 :     if (pub_sysid != sub_sysid)
    2190           2 :         pg_fatal("subscriber data directory is not a copy of the source database cluster");
    2191             : 
    2192             :     /* Subscriber PID file */
    2193          16 :     snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
    2194             : 
    2195             :     /*
    2196             :      * The standby server must not be running. If the server is started under
    2197             :      * service manager and pg_createsubscriber stops it, the service manager
    2198             :      * might react to this action and start the server again. Therefore,
    2199             :      * refuse to proceed if the server is running to avoid possible failures.
    2200             :      */
    2201          16 :     if (stat(pidfile, &statbuf) == 0)
    2202             :     {
    2203           2 :         pg_log_error("standby server is running");
    2204           2 :         pg_log_error_hint("Stop the standby server and try again.");
    2205           2 :         exit(1);
    2206             :     }
    2207             : 
    2208             :     /*
    2209             :      * Start a short-lived standby server with temporary parameters (provided
    2210             :      * by command-line options). The goal is to avoid connections during the
    2211             :      * transformation steps.
    2212             :      */
    2213          14 :     pg_log_info("starting the standby server with command-line options");
    2214          14 :     start_standby_server(&opt, true, false);
    2215             : 
    2216             :     /* Check if the standby server is ready for logical replication */
    2217          14 :     check_subscriber(dbinfo);
    2218             : 
    2219             :     /* Check if the primary server is ready for logical replication */
    2220          10 :     check_publisher(dbinfo);
    2221             : 
    2222             :     /*
    2223             :      * Stop the target server. The recovery process requires that the server
    2224             :      * reaches a consistent state before targeting the recovery stop point.
    2225             :      * Make sure a consistent state is reached (stop the target server
    2226             :      * guarantees it) *before* creating the replication slots in
    2227             :      * setup_publisher().
    2228             :      */
    2229           6 :     pg_log_info("stopping the subscriber");
    2230           6 :     stop_standby_server(subscriber_dir);
    2231             : 
    2232             :     /* Create the required objects for each database on publisher */
    2233           6 :     consistent_lsn = setup_publisher(dbinfo);
    2234             : 
    2235             :     /* Write the required recovery parameters */
    2236           6 :     setup_recovery(dbinfo, subscriber_dir, consistent_lsn);
    2237             : 
    2238             :     /*
    2239             :      * Start subscriber so the recovery parameters will take effect. Wait
    2240             :      * until accepting connections. We don't want to start logical replication
    2241             :      * during setup.
    2242             :      */
    2243           6 :     pg_log_info("starting the subscriber");
    2244           6 :     start_standby_server(&opt, true, true);
    2245             : 
    2246             :     /* Waiting the subscriber to be promoted */
    2247           6 :     wait_for_end_recovery(dbinfo[0].subconninfo, &opt);
    2248             : 
    2249             :     /*
    2250             :      * Create the subscription for each database on subscriber. It does not
    2251             :      * enable it immediately because it needs to adjust the replication start
    2252             :      * point to the LSN reported by setup_publisher().  It also cleans up
    2253             :      * publications created by this tool and replication to the standby.
    2254             :      */
    2255           6 :     setup_subscriber(dbinfo, consistent_lsn);
    2256             : 
    2257             :     /* Remove primary_slot_name if it exists on primary */
    2258           6 :     drop_primary_replication_slot(dbinfo, primary_slot_name);
    2259             : 
    2260             :     /* Remove failover replication slots if they exist on subscriber */
    2261           6 :     drop_failover_replication_slots(dbinfo);
    2262             : 
    2263             :     /* Stop the subscriber */
    2264           6 :     pg_log_info("stopping the subscriber");
    2265           6 :     stop_standby_server(subscriber_dir);
    2266             : 
    2267             :     /* Change system identifier from subscriber */
    2268           6 :     modify_subscriber_sysid(&opt);
    2269             : 
    2270           6 :     success = true;
    2271             : 
    2272           6 :     pg_log_info("Done!");
    2273             : 
    2274           6 :     return 0;
    2275             : }

Generated by: LCOV version 1.14