LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - pg_createsubscriber.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 767 917 83.6 %
Date: 2025-11-01 02:18:25 Functions: 39 39 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_createsubscriber.c
       4             :  *    Create a new logical replica from a standby server
       5             :  *
       6             :  * Copyright (c) 2024-2025, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *    src/bin/pg_basebackup/pg_createsubscriber.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : 
      14             : #include "postgres_fe.h"
      15             : 
      16             : #include <sys/stat.h>
      17             : #include <sys/time.h>
      18             : #include <sys/wait.h>
      19             : #include <time.h>
      20             : 
      21             : #include "common/connect.h"
      22             : #include "common/controldata_utils.h"
      23             : #include "common/logging.h"
      24             : #include "common/pg_prng.h"
      25             : #include "common/restricted_token.h"
      26             : #include "fe_utils/recovery_gen.h"
      27             : #include "fe_utils/simple_list.h"
      28             : #include "fe_utils/string_utils.h"
      29             : #include "fe_utils/version.h"
      30             : #include "getopt_long.h"
      31             : 
      32             : #define DEFAULT_SUB_PORT    "50432"
      33             : #define OBJECTTYPE_PUBLICATIONS  0x0001
      34             : 
      35             : /* Command-line options */
      36             : struct CreateSubscriberOptions
      37             : {
      38             :     char       *config_file;    /* configuration file */
      39             :     char       *pub_conninfo_str;   /* publisher connection string */
      40             :     char       *socket_dir;     /* directory for Unix-domain socket, if any */
      41             :     char       *sub_port;       /* subscriber port number */
      42             :     const char *sub_username;   /* subscriber username */
      43             :     bool        two_phase;      /* enable-two-phase option */
      44             :     SimpleStringList database_names;    /* list of database names */
      45             :     SimpleStringList pub_names; /* list of publication names */
      46             :     SimpleStringList sub_names; /* list of subscription names */
      47             :     SimpleStringList replslot_names;    /* list of replication slot names */
      48             :     int         recovery_timeout;   /* stop recovery after this time */
      49             :     bool        all_dbs;        /* all option */
      50             :     SimpleStringList objecttypes_to_clean;  /* list of object types to cleanup */
      51             : };
      52             : 
      53             : /* per-database publication/subscription info */
      54             : struct LogicalRepInfo
      55             : {
      56             :     char       *dbname;         /* database name */
      57             :     char       *pubconninfo;    /* publisher connection string */
      58             :     char       *subconninfo;    /* subscriber connection string */
      59             :     char       *pubname;        /* publication name */
      60             :     char       *subname;        /* subscription name */
      61             :     char       *replslotname;   /* replication slot name */
      62             : 
      63             :     bool        made_replslot;  /* replication slot was created */
      64             :     bool        made_publication;   /* publication was created */
      65             : };
      66             : 
      67             : /*
      68             :  * Information shared across all the databases (or publications and
      69             :  * subscriptions).
      70             :  */
      71             : struct LogicalRepInfos
      72             : {
      73             :     struct LogicalRepInfo *dbinfo;
      74             :     bool        two_phase;      /* enable-two-phase option */
      75             :     bits32      objecttypes_to_clean;   /* flags indicating which object types
      76             :                                          * to clean up on subscriber */
      77             : };
      78             : 
      79             : static void cleanup_objects_atexit(void);
      80             : static void usage();
      81             : static char *get_base_conninfo(const char *conninfo, char **dbname);
      82             : static char *get_sub_conninfo(const struct CreateSubscriberOptions *opt);
      83             : static char *get_exec_path(const char *argv0, const char *progname);
      84             : static void check_data_directory(const char *datadir);
      85             : static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
      86             : static struct LogicalRepInfo *store_pub_sub_info(const struct CreateSubscriberOptions *opt,
      87             :                                                  const char *pub_base_conninfo,
      88             :                                                  const char *sub_base_conninfo);
      89             : static PGconn *connect_database(const char *conninfo, bool exit_on_error);
      90             : static void disconnect_database(PGconn *conn, bool exit_on_error);
      91             : static uint64 get_primary_sysid(const char *conninfo);
      92             : static uint64 get_standby_sysid(const char *datadir);
      93             : static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
      94             : static bool server_is_in_recovery(PGconn *conn);
      95             : static char *generate_object_name(PGconn *conn);
      96             : static void check_publisher(const struct LogicalRepInfo *dbinfo);
      97             : static char *setup_publisher(struct LogicalRepInfo *dbinfo);
      98             : static void check_subscriber(const struct LogicalRepInfo *dbinfo);
      99             : static void setup_subscriber(struct LogicalRepInfo *dbinfo,
     100             :                              const char *consistent_lsn);
     101             : static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
     102             :                            const char *lsn);
     103             : static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
     104             :                                           const char *slotname);
     105             : static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo);
     106             : static char *create_logical_replication_slot(PGconn *conn,
     107             :                                              struct LogicalRepInfo *dbinfo);
     108             : static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
     109             :                                   const char *slot_name);
     110             : static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
     111             : static void start_standby_server(const struct CreateSubscriberOptions *opt,
     112             :                                  bool restricted_access,
     113             :                                  bool restrict_logical_worker);
     114             : static void stop_standby_server(const char *datadir);
     115             : static void wait_for_end_recovery(const char *conninfo,
     116             :                                   const struct CreateSubscriberOptions *opt);
     117             : static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
     118             : static void drop_publication(PGconn *conn, const char *pubname,
     119             :                              const char *dbname, bool *made_publication);
     120             : static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
     121             : static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
     122             : static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
     123             :                                      const char *lsn);
     124             : static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
     125             : static void check_and_drop_existing_subscriptions(PGconn *conn,
     126             :                                                   const struct LogicalRepInfo *dbinfo);
     127             : static void drop_existing_subscription(PGconn *conn, const char *subname,
     128             :                                        const char *dbname);
     129             : static void get_publisher_databases(struct CreateSubscriberOptions *opt,
     130             :                                     bool dbnamespecified);
     131             : 
     132             : #define USEC_PER_SEC    1000000
     133             : #define WAIT_INTERVAL   1       /* 1 second */
     134             : 
     135             : static const char *progname;
     136             : 
     137             : static char *primary_slot_name = NULL;
     138             : static bool dry_run = false;
     139             : 
     140             : static bool success = false;
     141             : 
     142             : static struct LogicalRepInfos dbinfos;
     143             : static int  num_dbs = 0;        /* number of specified databases */
     144             : static int  num_pubs = 0;       /* number of specified publications */
     145             : static int  num_subs = 0;       /* number of specified subscriptions */
     146             : static int  num_replslots = 0;  /* number of specified replication slots */
     147             : 
     148             : static pg_prng_state prng_state;
     149             : 
     150             : static char *pg_ctl_path = NULL;
     151             : static char *pg_resetwal_path = NULL;
     152             : 
     153             : /* standby / subscriber data directory */
     154             : static char *subscriber_dir = NULL;
     155             : 
     156             : static bool recovery_ended = false;
     157             : static bool standby_running = false;
     158             : 
     159             : enum WaitPMResult
     160             : {
     161             :     POSTMASTER_READY,
     162             :     POSTMASTER_STILL_STARTING
     163             : };
     164             : 
     165             : 
     166             : /*
     167             :  * Cleanup objects that were created by pg_createsubscriber if there is an
     168             :  * error.
     169             :  *
     170             :  * Publications and replication slots are created on primary. Depending on the
     171             :  * step it failed, it should remove the already created objects if it is
     172             :  * possible (sometimes it won't work due to a connection issue).
     173             :  * There is no cleanup on the target server. The steps on the target server are
     174             :  * executed *after* promotion, hence, at this point, a failure means recreate
     175             :  * the physical replica and start again.
     176             :  */
     177             : static void
     178          20 : cleanup_objects_atexit(void)
     179             : {
     180          20 :     if (success)
     181           8 :         return;
     182             : 
     183             :     /*
     184             :      * If the server is promoted, there is no way to use the current setup
     185             :      * again. Warn the user that a new replication setup should be done before
     186             :      * trying again.
     187             :      */
     188          12 :     if (recovery_ended)
     189             :     {
     190           0 :         pg_log_warning("failed after the end of recovery");
     191           0 :         pg_log_warning_hint("The target server cannot be used as a physical replica anymore.  "
     192             :                             "You must recreate the physical replica before continuing.");
     193             :     }
     194             : 
     195          36 :     for (int i = 0; i < num_dbs; i++)
     196             :     {
     197          24 :         struct LogicalRepInfo *dbinfo = &dbinfos.dbinfo[i];
     198             : 
     199          24 :         if (dbinfo->made_publication || dbinfo->made_replslot)
     200             :         {
     201             :             PGconn     *conn;
     202             : 
     203           0 :             conn = connect_database(dbinfo->pubconninfo, false);
     204           0 :             if (conn != NULL)
     205             :             {
     206           0 :                 if (dbinfo->made_publication)
     207           0 :                     drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
     208             :                                      &dbinfo->made_publication);
     209           0 :                 if (dbinfo->made_replslot)
     210           0 :                     drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
     211           0 :                 disconnect_database(conn, false);
     212             :             }
     213             :             else
     214             :             {
     215             :                 /*
     216             :                  * If a connection could not be established, inform the user
     217             :                  * that some objects were left on primary and should be
     218             :                  * removed before trying again.
     219             :                  */
     220           0 :                 if (dbinfo->made_publication)
     221             :                 {
     222           0 :                     pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
     223             :                                    dbinfo->pubname,
     224             :                                    dbinfo->dbname);
     225           0 :                     pg_log_warning_hint("Drop this publication before trying again.");
     226             :                 }
     227           0 :                 if (dbinfo->made_replslot)
     228             :                 {
     229           0 :                     pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
     230             :                                    dbinfo->replslotname,
     231             :                                    dbinfo->dbname);
     232           0 :                     pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
     233             :                 }
     234             :             }
     235             :         }
     236             :     }
     237             : 
     238          12 :     if (standby_running)
     239           8 :         stop_standby_server(subscriber_dir);
     240             : }
     241             : 
     242             : static void
     243           2 : usage(void)
     244             : {
     245           2 :     printf(_("%s creates a new logical replica from a standby server.\n\n"),
     246             :            progname);
     247           2 :     printf(_("Usage:\n"));
     248           2 :     printf(_("  %s [OPTION]...\n"), progname);
     249           2 :     printf(_("\nOptions:\n"));
     250           2 :     printf(_("  -a, --all                       create subscriptions for all databases except template\n"
     251             :              "                                  databases and databases that don't allow connections\n"));
     252           2 :     printf(_("  -d, --database=DBNAME           database in which to create a subscription\n"));
     253           2 :     printf(_("  -D, --pgdata=DATADIR            location for the subscriber data directory\n"));
     254           2 :     printf(_("  -n, --dry-run                   dry run, just show what would be done\n"));
     255           2 :     printf(_("  -p, --subscriber-port=PORT      subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
     256           2 :     printf(_("  -P, --publisher-server=CONNSTR  publisher connection string\n"));
     257           2 :     printf(_("  -s, --socketdir=DIR             socket directory to use (default current dir.)\n"));
     258           2 :     printf(_("  -t, --recovery-timeout=SECS     seconds to wait for recovery to end\n"));
     259           2 :     printf(_("  -T, --enable-two-phase          enable two-phase commit for all subscriptions\n"));
     260           2 :     printf(_("  -U, --subscriber-username=NAME  user name for subscriber connection\n"));
     261           2 :     printf(_("  -v, --verbose                   output verbose messages\n"));
     262           2 :     printf(_("      --clean=OBJECTTYPE          drop all objects of the specified type from specified\n"
     263             :              "                                  databases on the subscriber; accepts: \"%s\"\n"), "publications");
     264           2 :     printf(_("      --config-file=FILENAME      use specified main server configuration\n"
     265             :              "                                  file when running target cluster\n"));
     266           2 :     printf(_("      --publication=NAME          publication name\n"));
     267           2 :     printf(_("      --replication-slot=NAME     replication slot name\n"));
     268           2 :     printf(_("      --subscription=NAME         subscription name\n"));
     269           2 :     printf(_("  -V, --version                   output version information, then exit\n"));
     270           2 :     printf(_("  -?, --help                      show this help, then exit\n"));
     271           2 :     printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
     272           2 :     printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
     273           2 : }
     274             : 
     275             : /*
     276             :  * Subroutine to append "keyword=value" to a connection string,
     277             :  * with proper quoting of the value.  (We assume keywords don't need that.)
     278             :  */
     279             : static void
     280         214 : appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
     281             : {
     282         214 :     if (buf->len > 0)
     283         158 :         appendPQExpBufferChar(buf, ' ');
     284         214 :     appendPQExpBufferStr(buf, keyword);
     285         214 :     appendPQExpBufferChar(buf, '=');
     286         214 :     appendConnStrVal(buf, val);
     287         214 : }
     288             : 
     289             : /*
     290             :  * Validate a connection string. Returns a base connection string that is a
     291             :  * connection string without a database name.
     292             :  *
     293             :  * Since we might process multiple databases, each database name will be
     294             :  * appended to this base connection string to provide a final connection
     295             :  * string. If the second argument (dbname) is not null, returns dbname if the
     296             :  * provided connection string contains it.
     297             :  *
     298             :  * It is the caller's responsibility to free the returned connection string and
     299             :  * dbname.
     300             :  */
     301             : static char *
     302          28 : get_base_conninfo(const char *conninfo, char **dbname)
     303             : {
     304             :     PQExpBuffer buf;
     305             :     PQconninfoOption *conn_opts;
     306             :     PQconninfoOption *conn_opt;
     307          28 :     char       *errmsg = NULL;
     308             :     char       *ret;
     309             : 
     310          28 :     conn_opts = PQconninfoParse(conninfo, &errmsg);
     311          28 :     if (conn_opts == NULL)
     312             :     {
     313           0 :         pg_log_error("could not parse connection string: %s", errmsg);
     314           0 :         PQfreemem(errmsg);
     315           0 :         return NULL;
     316             :     }
     317             : 
     318          28 :     buf = createPQExpBuffer();
     319        1456 :     for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
     320             :     {
     321        1428 :         if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
     322             :         {
     323          66 :             if (strcmp(conn_opt->keyword, "dbname") == 0)
     324             :             {
     325          18 :                 if (dbname)
     326          18 :                     *dbname = pg_strdup(conn_opt->val);
     327          18 :                 continue;
     328             :             }
     329          48 :             appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
     330             :         }
     331             :     }
     332             : 
     333          28 :     ret = pg_strdup(buf->data);
     334             : 
     335          28 :     destroyPQExpBuffer(buf);
     336          28 :     PQconninfoFree(conn_opts);
     337             : 
     338          28 :     return ret;
     339             : }
     340             : 
     341             : /*
     342             :  * Build a subscriber connection string. Only a few parameters are supported
     343             :  * since it starts a server with restricted access.
     344             :  */
     345             : static char *
     346          28 : get_sub_conninfo(const struct CreateSubscriberOptions *opt)
     347             : {
     348          28 :     PQExpBuffer buf = createPQExpBuffer();
     349             :     char       *ret;
     350             : 
     351          28 :     appendConnStrItem(buf, "port", opt->sub_port);
     352             : #if !defined(WIN32)
     353          28 :     appendConnStrItem(buf, "host", opt->socket_dir);
     354             : #endif
     355          28 :     if (opt->sub_username != NULL)
     356           0 :         appendConnStrItem(buf, "user", opt->sub_username);
     357          28 :     appendConnStrItem(buf, "fallback_application_name", progname);
     358             : 
     359          28 :     ret = pg_strdup(buf->data);
     360             : 
     361          28 :     destroyPQExpBuffer(buf);
     362             : 
     363          28 :     return ret;
     364             : }
     365             : 
     366             : /*
     367             :  * Verify if a PostgreSQL binary (progname) is available in the same directory as
     368             :  * pg_createsubscriber and it has the same version.  It returns the absolute
     369             :  * path of the progname.
     370             :  */
     371             : static char *
     372          40 : get_exec_path(const char *argv0, const char *progname)
     373             : {
     374             :     char       *versionstr;
     375             :     char       *exec_path;
     376             :     int         ret;
     377             : 
     378          40 :     versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
     379          40 :     exec_path = pg_malloc(MAXPGPATH);
     380          40 :     ret = find_other_exec(argv0, progname, versionstr, exec_path);
     381             : 
     382          40 :     if (ret < 0)
     383             :     {
     384             :         char        full_path[MAXPGPATH];
     385             : 
     386           0 :         if (find_my_exec(argv0, full_path) < 0)
     387           0 :             strlcpy(full_path, progname, sizeof(full_path));
     388             : 
     389           0 :         if (ret == -1)
     390           0 :             pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
     391             :                      progname, "pg_createsubscriber", full_path);
     392             :         else
     393           0 :             pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
     394             :                      progname, full_path, "pg_createsubscriber");
     395             :     }
     396             : 
     397          40 :     pg_log_debug("%s path is:  %s", progname, exec_path);
     398             : 
     399          40 :     return exec_path;
     400             : }
     401             : 
     402             : /*
     403             :  * Is it a cluster directory? These are preliminary checks. It is far from
     404             :  * making an accurate check. If it is not a clone from the publisher, it will
     405             :  * eventually fail in a future step.
     406             :  */
     407             : static void
     408          20 : check_data_directory(const char *datadir)
     409             : {
     410             :     struct stat statbuf;
     411             :     uint32      major_version;
     412             :     char       *version_str;
     413             : 
     414          20 :     pg_log_info("checking if directory \"%s\" is a cluster data directory",
     415             :                 datadir);
     416             : 
     417          20 :     if (stat(datadir, &statbuf) != 0)
     418             :     {
     419           0 :         if (errno == ENOENT)
     420           0 :             pg_fatal("data directory \"%s\" does not exist", datadir);
     421             :         else
     422           0 :             pg_fatal("could not access directory \"%s\": %m", datadir);
     423             :     }
     424             : 
     425             :     /*
     426             :      * Retrieve the contents of this cluster's PG_VERSION.  We require
     427             :      * compatibility with the same major version as the one this tool is
     428             :      * compiled with.
     429             :      */
     430          20 :     major_version = GET_PG_MAJORVERSION_NUM(get_pg_version(datadir, &version_str));
     431          20 :     if (major_version != PG_MAJORVERSION_NUM)
     432             :     {
     433           0 :         pg_log_error("data directory is of wrong version");
     434           0 :         pg_log_error_detail("File \"%s\" contains \"%s\", which is not compatible with this program's version \"%s\".",
     435             :                             "PG_VERSION", version_str, PG_MAJORVERSION);
     436           0 :         exit(1);
     437             :     }
     438          20 : }
     439             : 
     440             : /*
     441             :  * Append database name into a base connection string.
     442             :  *
     443             :  * dbname is the only parameter that changes so it is not included in the base
     444             :  * connection string. This function concatenates dbname to build a "real"
     445             :  * connection string.
     446             :  */
     447             : static char *
     448          82 : concat_conninfo_dbname(const char *conninfo, const char *dbname)
     449             : {
     450          82 :     PQExpBuffer buf = createPQExpBuffer();
     451             :     char       *ret;
     452             : 
     453             :     Assert(conninfo != NULL);
     454             : 
     455          82 :     appendPQExpBufferStr(buf, conninfo);
     456          82 :     appendConnStrItem(buf, "dbname", dbname);
     457             : 
     458          82 :     ret = pg_strdup(buf->data);
     459          82 :     destroyPQExpBuffer(buf);
     460             : 
     461          82 :     return ret;
     462             : }
     463             : 
     464             : /*
     465             :  * Store publication and subscription information.
     466             :  *
     467             :  * If publication, replication slot and subscription names were specified,
     468             :  * store it here. Otherwise, a generated name will be assigned to the object in
     469             :  * setup_publisher().
     470             :  */
     471             : static struct LogicalRepInfo *
     472          20 : store_pub_sub_info(const struct CreateSubscriberOptions *opt,
     473             :                    const char *pub_base_conninfo,
     474             :                    const char *sub_base_conninfo)
     475             : {
     476             :     struct LogicalRepInfo *dbinfo;
     477          20 :     SimpleStringListCell *pubcell = NULL;
     478          20 :     SimpleStringListCell *subcell = NULL;
     479          20 :     SimpleStringListCell *replslotcell = NULL;
     480          20 :     int         i = 0;
     481             : 
     482          20 :     dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
     483             : 
     484          20 :     if (num_pubs > 0)
     485           4 :         pubcell = opt->pub_names.head;
     486          20 :     if (num_subs > 0)
     487           2 :         subcell = opt->sub_names.head;
     488          20 :     if (num_replslots > 0)
     489           4 :         replslotcell = opt->replslot_names.head;
     490             : 
     491          60 :     for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
     492             :     {
     493             :         char       *conninfo;
     494             : 
     495             :         /* Fill publisher attributes */
     496          40 :         conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
     497          40 :         dbinfo[i].pubconninfo = conninfo;
     498          40 :         dbinfo[i].dbname = cell->val;
     499          40 :         if (num_pubs > 0)
     500           8 :             dbinfo[i].pubname = pubcell->val;
     501             :         else
     502          32 :             dbinfo[i].pubname = NULL;
     503          40 :         if (num_replslots > 0)
     504           6 :             dbinfo[i].replslotname = replslotcell->val;
     505             :         else
     506          34 :             dbinfo[i].replslotname = NULL;
     507          40 :         dbinfo[i].made_replslot = false;
     508          40 :         dbinfo[i].made_publication = false;
     509             :         /* Fill subscriber attributes */
     510          40 :         conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
     511          40 :         dbinfo[i].subconninfo = conninfo;
     512          40 :         if (num_subs > 0)
     513           4 :             dbinfo[i].subname = subcell->val;
     514             :         else
     515          36 :             dbinfo[i].subname = NULL;
     516             :         /* Other fields will be filled later */
     517             : 
     518          40 :         pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
     519             :                      dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
     520             :                      dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
     521             :                      dbinfo[i].pubconninfo);
     522          40 :         pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
     523             :                      dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
     524             :                      dbinfo[i].subconninfo,
     525             :                      dbinfos.two_phase ? "true" : "false");
     526             : 
     527          40 :         if (num_pubs > 0)
     528           8 :             pubcell = pubcell->next;
     529          40 :         if (num_subs > 0)
     530           4 :             subcell = subcell->next;
     531          40 :         if (num_replslots > 0)
     532           6 :             replslotcell = replslotcell->next;
     533             : 
     534          40 :         i++;
     535             :     }
     536             : 
     537          20 :     return dbinfo;
     538             : }
     539             : 
     540             : /*
     541             :  * Open a new connection. If exit_on_error is true, it has an undesired
     542             :  * condition and it should exit immediately.
     543             :  */
     544             : static PGconn *
     545         114 : connect_database(const char *conninfo, bool exit_on_error)
     546             : {
     547             :     PGconn     *conn;
     548             :     PGresult   *res;
     549             : 
     550         114 :     conn = PQconnectdb(conninfo);
     551         114 :     if (PQstatus(conn) != CONNECTION_OK)
     552             :     {
     553           0 :         pg_log_error("connection to database failed: %s",
     554             :                      PQerrorMessage(conn));
     555           0 :         PQfinish(conn);
     556             : 
     557           0 :         if (exit_on_error)
     558           0 :             exit(1);
     559           0 :         return NULL;
     560             :     }
     561             : 
     562             :     /* Secure search_path */
     563         114 :     res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
     564         114 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     565             :     {
     566           0 :         pg_log_error("could not clear \"search_path\": %s",
     567             :                      PQresultErrorMessage(res));
     568           0 :         PQclear(res);
     569           0 :         PQfinish(conn);
     570             : 
     571           0 :         if (exit_on_error)
     572           0 :             exit(1);
     573           0 :         return NULL;
     574             :     }
     575         114 :     PQclear(res);
     576             : 
     577         114 :     return conn;
     578             : }
     579             : 
     580             : /*
     581             :  * Close the connection. If exit_on_error is true, it has an undesired
     582             :  * condition and it should exit immediately.
     583             :  */
     584             : static void
     585         114 : disconnect_database(PGconn *conn, bool exit_on_error)
     586             : {
     587             :     Assert(conn != NULL);
     588             : 
     589         114 :     PQfinish(conn);
     590             : 
     591         114 :     if (exit_on_error)
     592           4 :         exit(1);
     593         110 : }
     594             : 
     595             : /*
     596             :  * Obtain the system identifier using the provided connection. It will be used
     597             :  * to compare if a data directory is a clone of another one.
     598             :  */
     599             : static uint64
     600          20 : get_primary_sysid(const char *conninfo)
     601             : {
     602             :     PGconn     *conn;
     603             :     PGresult   *res;
     604             :     uint64      sysid;
     605             : 
     606          20 :     pg_log_info("getting system identifier from publisher");
     607             : 
     608          20 :     conn = connect_database(conninfo, true);
     609             : 
     610          20 :     res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
     611          20 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     612             :     {
     613           0 :         pg_log_error("could not get system identifier: %s",
     614             :                      PQresultErrorMessage(res));
     615           0 :         disconnect_database(conn, true);
     616             :     }
     617          20 :     if (PQntuples(res) != 1)
     618             :     {
     619           0 :         pg_log_error("could not get system identifier: got %d rows, expected %d row",
     620             :                      PQntuples(res), 1);
     621           0 :         disconnect_database(conn, true);
     622             :     }
     623             : 
     624          20 :     sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
     625             : 
     626          20 :     pg_log_info("system identifier is %" PRIu64 " on publisher", sysid);
     627             : 
     628          20 :     PQclear(res);
     629          20 :     disconnect_database(conn, false);
     630             : 
     631          20 :     return sysid;
     632             : }
     633             : 
     634             : /*
     635             :  * Obtain the system identifier from control file. It will be used to compare
     636             :  * if a data directory is a clone of another one. This routine is used locally
     637             :  * and avoids a connection.
     638             :  */
     639             : static uint64
     640          20 : get_standby_sysid(const char *datadir)
     641             : {
     642             :     ControlFileData *cf;
     643             :     bool        crc_ok;
     644             :     uint64      sysid;
     645             : 
     646          20 :     pg_log_info("getting system identifier from subscriber");
     647             : 
     648          20 :     cf = get_controlfile(datadir, &crc_ok);
     649          20 :     if (!crc_ok)
     650           0 :         pg_fatal("control file appears to be corrupt");
     651             : 
     652          20 :     sysid = cf->system_identifier;
     653             : 
     654          20 :     pg_log_info("system identifier is %" PRIu64 " on subscriber", sysid);
     655             : 
     656          20 :     pg_free(cf);
     657             : 
     658          20 :     return sysid;
     659             : }
     660             : 
     661             : /*
     662             :  * Modify the system identifier. Since a standby server preserves the system
     663             :  * identifier, it makes sense to change it to avoid situations in which WAL
     664             :  * files from one of the systems might be used in the other one.
     665             :  */
     666             : static void
     667           8 : modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
     668             : {
     669             :     ControlFileData *cf;
     670             :     bool        crc_ok;
     671             :     struct timeval tv;
     672             : 
     673             :     char       *cmd_str;
     674             : 
     675           8 :     pg_log_info("modifying system identifier of subscriber");
     676             : 
     677           8 :     cf = get_controlfile(subscriber_dir, &crc_ok);
     678           8 :     if (!crc_ok)
     679           0 :         pg_fatal("control file appears to be corrupt");
     680             : 
     681             :     /*
     682             :      * Select a new system identifier.
     683             :      *
     684             :      * XXX this code was extracted from BootStrapXLOG().
     685             :      */
     686           8 :     gettimeofday(&tv, NULL);
     687           8 :     cf->system_identifier = ((uint64) tv.tv_sec) << 32;
     688           8 :     cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
     689           8 :     cf->system_identifier |= getpid() & 0xFFF;
     690             : 
     691           8 :     if (dry_run)
     692           6 :         pg_log_info("dry-run: would set system identifier to %" PRIu64 " on subscriber",
     693             :                     cf->system_identifier);
     694             :     else
     695             :     {
     696           2 :         update_controlfile(subscriber_dir, cf, true);
     697           2 :         pg_log_info("system identifier is %" PRIu64 " on subscriber",
     698             :                     cf->system_identifier);
     699             :     }
     700             : 
     701           8 :     if (dry_run)
     702           6 :         pg_log_info("dry-run: would run pg_resetwal on the subscriber");
     703             :     else
     704           2 :         pg_log_info("running pg_resetwal on the subscriber");
     705             : 
     706           8 :     cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
     707             :                        subscriber_dir, DEVNULL);
     708             : 
     709           8 :     pg_log_debug("pg_resetwal command is: %s", cmd_str);
     710             : 
     711           8 :     if (!dry_run)
     712             :     {
     713           2 :         int         rc = system(cmd_str);
     714             : 
     715           2 :         if (rc == 0)
     716           2 :             pg_log_info("subscriber successfully reset WAL on the subscriber");
     717             :         else
     718           0 :             pg_fatal("could not reset WAL on subscriber: %s", wait_result_to_str(rc));
     719             :     }
     720             : 
     721           8 :     pg_free(cf);
     722           8 : }
     723             : 
     724             : /*
     725             :  * Generate an object name using a prefix, database oid and a random integer.
     726             :  * It is used in case the user does not specify an object name (publication,
     727             :  * subscription, replication slot).
     728             :  */
     729             : static char *
     730          16 : generate_object_name(PGconn *conn)
     731             : {
     732             :     PGresult   *res;
     733             :     Oid         oid;
     734             :     uint32      rand;
     735             :     char       *objname;
     736             : 
     737          16 :     res = PQexec(conn,
     738             :                  "SELECT oid FROM pg_catalog.pg_database "
     739             :                  "WHERE datname = pg_catalog.current_database()");
     740          16 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     741             :     {
     742           0 :         pg_log_error("could not obtain database OID: %s",
     743             :                      PQresultErrorMessage(res));
     744           0 :         disconnect_database(conn, true);
     745             :     }
     746             : 
     747          16 :     if (PQntuples(res) != 1)
     748             :     {
     749           0 :         pg_log_error("could not obtain database OID: got %d rows, expected %d row",
     750             :                      PQntuples(res), 1);
     751           0 :         disconnect_database(conn, true);
     752             :     }
     753             : 
     754             :     /* Database OID */
     755          16 :     oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
     756             : 
     757          16 :     PQclear(res);
     758             : 
     759             :     /* Random unsigned integer */
     760          16 :     rand = pg_prng_uint32(&prng_state);
     761             : 
     762             :     /*
     763             :      * Build the object name. The name must not exceed NAMEDATALEN - 1. This
     764             :      * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
     765             :      * '\0').
     766             :      */
     767          16 :     objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
     768             : 
     769          16 :     return objname;
     770             : }
     771             : 
     772             : /*
     773             :  * Create the publications and replication slots in preparation for logical
     774             :  * replication. Returns the LSN from latest replication slot. It will be the
     775             :  * replication start point that is used to adjust the subscriptions (see
     776             :  * set_replication_progress).
     777             :  */
     778             : static char *
     779           8 : setup_publisher(struct LogicalRepInfo *dbinfo)
     780             : {
     781           8 :     char       *lsn = NULL;
     782             : 
     783           8 :     pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
     784             : 
     785          24 :     for (int i = 0; i < num_dbs; i++)
     786             :     {
     787             :         PGconn     *conn;
     788          16 :         char       *genname = NULL;
     789             : 
     790          16 :         conn = connect_database(dbinfo[i].pubconninfo, true);
     791             : 
     792             :         /*
     793             :          * If an object name was not specified as command-line options, assign
     794             :          * a generated object name. The replication slot has a different rule.
     795             :          * The subscription name is assigned to the replication slot name if
     796             :          * no replication slot is specified. It follows the same rule as
     797             :          * CREATE SUBSCRIPTION.
     798             :          */
     799          16 :         if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
     800          16 :             genname = generate_object_name(conn);
     801          16 :         if (num_pubs == 0)
     802           8 :             dbinfo[i].pubname = pg_strdup(genname);
     803          16 :         if (num_subs == 0)
     804          12 :             dbinfo[i].subname = pg_strdup(genname);
     805          16 :         if (num_replslots == 0)
     806          10 :             dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
     807             : 
     808             :         /*
     809             :          * Create publication on publisher. This step should be executed
     810             :          * *before* promoting the subscriber to avoid any transactions between
     811             :          * consistent LSN and the new publication rows (such transactions
     812             :          * wouldn't see the new publication rows resulting in an error).
     813             :          */
     814          16 :         create_publication(conn, &dbinfo[i]);
     815             : 
     816             :         /* Create replication slot on publisher */
     817          16 :         if (lsn)
     818           2 :             pg_free(lsn);
     819          16 :         lsn = create_logical_replication_slot(conn, &dbinfo[i]);
     820          16 :         if (lsn == NULL && !dry_run)
     821           0 :             exit(1);
     822             : 
     823             :         /*
     824             :          * Since we are using the LSN returned by the last replication slot as
     825             :          * recovery_target_lsn, this LSN is ahead of the current WAL position
     826             :          * and the recovery waits until the publisher writes a WAL record to
     827             :          * reach the target and ends the recovery. On idle systems, this wait
     828             :          * time is unpredictable and could lead to failure in promoting the
     829             :          * subscriber. To avoid that, insert a harmless WAL record.
     830             :          */
     831          16 :         if (i == num_dbs - 1 && !dry_run)
     832             :         {
     833             :             PGresult   *res;
     834             : 
     835           2 :             res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
     836           2 :             if (PQresultStatus(res) != PGRES_TUPLES_OK)
     837             :             {
     838           0 :                 pg_log_error("could not write an additional WAL record: %s",
     839             :                              PQresultErrorMessage(res));
     840           0 :                 disconnect_database(conn, true);
     841             :             }
     842           2 :             PQclear(res);
     843             :         }
     844             : 
     845          16 :         disconnect_database(conn, false);
     846             :     }
     847             : 
     848           8 :     return lsn;
     849             : }
     850             : 
     851             : /*
     852             :  * Is recovery still in progress?
     853             :  */
     854             : static bool
     855          30 : server_is_in_recovery(PGconn *conn)
     856             : {
     857             :     PGresult   *res;
     858             :     int         ret;
     859             : 
     860          30 :     res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
     861             : 
     862          30 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     863             :     {
     864           0 :         pg_log_error("could not obtain recovery progress: %s",
     865             :                      PQresultErrorMessage(res));
     866           0 :         disconnect_database(conn, true);
     867             :     }
     868             : 
     869             : 
     870          30 :     ret = strcmp("t", PQgetvalue(res, 0, 0));
     871             : 
     872          30 :     PQclear(res);
     873             : 
     874          30 :     return ret == 0;
     875             : }
     876             : 
     877             : /*
     878             :  * Is the primary server ready for logical replication?
     879             :  *
     880             :  * XXX Does it not allow a synchronous replica?
     881             :  */
     882             : static void
     883          12 : check_publisher(const struct LogicalRepInfo *dbinfo)
     884             : {
     885             :     PGconn     *conn;
     886             :     PGresult   *res;
     887          12 :     bool        failed = false;
     888             : 
     889             :     char       *wal_level;
     890             :     int         max_repslots;
     891             :     int         cur_repslots;
     892             :     int         max_walsenders;
     893             :     int         cur_walsenders;
     894             :     int         max_prepared_transactions;
     895             :     char       *max_slot_wal_keep_size;
     896             : 
     897          12 :     pg_log_info("checking settings on publisher");
     898             : 
     899          12 :     conn = connect_database(dbinfo[0].pubconninfo, true);
     900             : 
     901             :     /*
     902             :      * If the primary server is in recovery (i.e. cascading replication),
     903             :      * objects (publication) cannot be created because it is read only.
     904             :      */
     905          12 :     if (server_is_in_recovery(conn))
     906             :     {
     907           2 :         pg_log_error("primary server cannot be in recovery");
     908           2 :         disconnect_database(conn, true);
     909             :     }
     910             : 
     911             :     /*------------------------------------------------------------------------
     912             :      * Logical replication requires a few parameters to be set on publisher.
     913             :      * Since these parameters are not a requirement for physical replication,
     914             :      * we should check it to make sure it won't fail.
     915             :      *
     916             :      * - wal_level = logical
     917             :      * - max_replication_slots >= current + number of dbs to be converted
     918             :      * - max_wal_senders >= current + number of dbs to be converted
     919             :      * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files)
     920             :      * -----------------------------------------------------------------------
     921             :      */
     922          10 :     res = PQexec(conn,
     923             :                  "SELECT pg_catalog.current_setting('wal_level'),"
     924             :                  " pg_catalog.current_setting('max_replication_slots'),"
     925             :                  " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
     926             :                  " pg_catalog.current_setting('max_wal_senders'),"
     927             :                  " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
     928             :                  " pg_catalog.current_setting('max_prepared_transactions'),"
     929             :                  " pg_catalog.current_setting('max_slot_wal_keep_size')");
     930             : 
     931          10 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     932             :     {
     933           0 :         pg_log_error("could not obtain publisher settings: %s",
     934             :                      PQresultErrorMessage(res));
     935           0 :         disconnect_database(conn, true);
     936             :     }
     937             : 
     938          10 :     wal_level = pg_strdup(PQgetvalue(res, 0, 0));
     939          10 :     max_repslots = atoi(PQgetvalue(res, 0, 1));
     940          10 :     cur_repslots = atoi(PQgetvalue(res, 0, 2));
     941          10 :     max_walsenders = atoi(PQgetvalue(res, 0, 3));
     942          10 :     cur_walsenders = atoi(PQgetvalue(res, 0, 4));
     943          10 :     max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));
     944          10 :     max_slot_wal_keep_size = pg_strdup(PQgetvalue(res, 0, 6));
     945             : 
     946          10 :     PQclear(res);
     947             : 
     948          10 :     pg_log_debug("publisher: wal_level: %s", wal_level);
     949          10 :     pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
     950          10 :     pg_log_debug("publisher: current replication slots: %d", cur_repslots);
     951          10 :     pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
     952          10 :     pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
     953          10 :     pg_log_debug("publisher: max_prepared_transactions: %d",
     954             :                  max_prepared_transactions);
     955          10 :     pg_log_debug("publisher: max_slot_wal_keep_size: %s",
     956             :                  max_slot_wal_keep_size);
     957             : 
     958          10 :     disconnect_database(conn, false);
     959             : 
     960          10 :     if (strcmp(wal_level, "logical") != 0)
     961             :     {
     962           2 :         pg_log_error("publisher requires \"wal_level\" >= \"logical\"");
     963           2 :         failed = true;
     964             :     }
     965             : 
     966          10 :     if (max_repslots - cur_repslots < num_dbs)
     967             :     {
     968           2 :         pg_log_error("publisher requires %d replication slots, but only %d remain",
     969             :                      num_dbs, max_repslots - cur_repslots);
     970           2 :         pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
     971             :                           "max_replication_slots", cur_repslots + num_dbs);
     972           2 :         failed = true;
     973             :     }
     974             : 
     975          10 :     if (max_walsenders - cur_walsenders < num_dbs)
     976             :     {
     977           2 :         pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
     978             :                      num_dbs, max_walsenders - cur_walsenders);
     979           2 :         pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
     980             :                           "max_wal_senders", cur_walsenders + num_dbs);
     981           2 :         failed = true;
     982             :     }
     983             : 
     984          10 :     if (max_prepared_transactions != 0 && !dbinfos.two_phase)
     985             :     {
     986           0 :         pg_log_warning("two_phase option will not be enabled for replication slots");
     987           0 :         pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled.  "
     988             :                               "Prepared transactions will be replicated at COMMIT PREPARED.");
     989           0 :         pg_log_warning_hint("You can use the command-line option --enable-two-phase to enable two_phase.");
     990             :     }
     991             : 
     992             :     /*
     993             :      * In dry-run mode, validate 'max_slot_wal_keep_size'. If this parameter
     994             :      * is set to a non-default value, it may cause replication failures due to
     995             :      * required WAL files being prematurely removed.
     996             :      */
     997          10 :     if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
     998             :     {
     999           0 :         pg_log_warning("required WAL could be removed from the publisher");
    1000           0 :         pg_log_warning_hint("Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
    1001             :                             "max_slot_wal_keep_size");
    1002             :     }
    1003             : 
    1004          10 :     pg_free(wal_level);
    1005             : 
    1006          10 :     if (failed)
    1007           2 :         exit(1);
    1008           8 : }
    1009             : 
    1010             : /*
    1011             :  * Is the standby server ready for logical replication?
    1012             :  *
    1013             :  * XXX Does it not allow a time-delayed replica?
    1014             :  *
    1015             :  * XXX In a cascaded replication scenario (P -> S -> C), if the target server
    1016             :  * is S, it cannot detect there is a replica (server C) because server S starts
    1017             :  * accepting only local connections and server C cannot connect to it. Hence,
    1018             :  * there is not a reliable way to provide a suitable error saying the server C
    1019             :  * will be broken at the end of this process (due to pg_resetwal).
    1020             :  */
    1021             : static void
    1022          16 : check_subscriber(const struct LogicalRepInfo *dbinfo)
    1023             : {
    1024             :     PGconn     *conn;
    1025             :     PGresult   *res;
    1026          16 :     bool        failed = false;
    1027             : 
    1028             :     int         max_lrworkers;
    1029             :     int         max_reporigins;
    1030             :     int         max_wprocs;
    1031             : 
    1032          16 :     pg_log_info("checking settings on subscriber");
    1033             : 
    1034          16 :     conn = connect_database(dbinfo[0].subconninfo, true);
    1035             : 
    1036             :     /* The target server must be a standby */
    1037          16 :     if (!server_is_in_recovery(conn))
    1038             :     {
    1039           2 :         pg_log_error("target server must be a standby");
    1040           2 :         disconnect_database(conn, true);
    1041             :     }
    1042             : 
    1043             :     /*------------------------------------------------------------------------
    1044             :      * Logical replication requires a few parameters to be set on subscriber.
    1045             :      * Since these parameters are not a requirement for physical replication,
    1046             :      * we should check it to make sure it won't fail.
    1047             :      *
    1048             :      * - max_active_replication_origins >= number of dbs to be converted
    1049             :      * - max_logical_replication_workers >= number of dbs to be converted
    1050             :      * - max_worker_processes >= 1 + number of dbs to be converted
    1051             :      *------------------------------------------------------------------------
    1052             :      */
    1053          14 :     res = PQexec(conn,
    1054             :                  "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
    1055             :                  "'max_logical_replication_workers', "
    1056             :                  "'max_active_replication_origins', "
    1057             :                  "'max_worker_processes', "
    1058             :                  "'primary_slot_name') "
    1059             :                  "ORDER BY name");
    1060             : 
    1061          14 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1062             :     {
    1063           0 :         pg_log_error("could not obtain subscriber settings: %s",
    1064             :                      PQresultErrorMessage(res));
    1065           0 :         disconnect_database(conn, true);
    1066             :     }
    1067             : 
    1068          14 :     max_reporigins = atoi(PQgetvalue(res, 0, 0));
    1069          14 :     max_lrworkers = atoi(PQgetvalue(res, 1, 0));
    1070          14 :     max_wprocs = atoi(PQgetvalue(res, 2, 0));
    1071          14 :     if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
    1072          12 :         primary_slot_name = pg_strdup(PQgetvalue(res, 3, 0));
    1073             : 
    1074          14 :     pg_log_debug("subscriber: max_logical_replication_workers: %d",
    1075             :                  max_lrworkers);
    1076          14 :     pg_log_debug("subscriber: max_active_replication_origins: %d", max_reporigins);
    1077          14 :     pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
    1078          14 :     if (primary_slot_name)
    1079          12 :         pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
    1080             : 
    1081          14 :     PQclear(res);
    1082             : 
    1083          14 :     disconnect_database(conn, false);
    1084             : 
    1085          14 :     if (max_reporigins < num_dbs)
    1086             :     {
    1087           2 :         pg_log_error("subscriber requires %d active replication origins, but only %d remain",
    1088             :                      num_dbs, max_reporigins);
    1089           2 :         pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
    1090             :                           "max_active_replication_origins", num_dbs);
    1091           2 :         failed = true;
    1092             :     }
    1093             : 
    1094          14 :     if (max_lrworkers < num_dbs)
    1095             :     {
    1096           2 :         pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
    1097             :                      num_dbs, max_lrworkers);
    1098           2 :         pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
    1099             :                           "max_logical_replication_workers", num_dbs);
    1100           2 :         failed = true;
    1101             :     }
    1102             : 
    1103          14 :     if (max_wprocs < num_dbs + 1)
    1104             :     {
    1105           2 :         pg_log_error("subscriber requires %d worker processes, but only %d remain",
    1106             :                      num_dbs + 1, max_wprocs);
    1107           2 :         pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
    1108             :                           "max_worker_processes", num_dbs + 1);
    1109           2 :         failed = true;
    1110             :     }
    1111             : 
    1112          14 :     if (failed)
    1113           2 :         exit(1);
    1114          12 : }
    1115             : 
    1116             : /*
    1117             :  * Drop a specified subscription. This is to avoid duplicate subscriptions on
    1118             :  * the primary (publisher node) and the newly created subscriber. We
    1119             :  * shouldn't drop the associated slot as that would be used by the publisher
    1120             :  * node.
    1121             :  */
    1122             : static void
    1123           8 : drop_existing_subscription(PGconn *conn, const char *subname, const char *dbname)
    1124             : {
    1125           8 :     PQExpBuffer query = createPQExpBuffer();
    1126             :     PGresult   *res;
    1127             : 
    1128             :     Assert(conn != NULL);
    1129             : 
    1130             :     /*
    1131             :      * Construct a query string. These commands are allowed to be executed
    1132             :      * within a transaction.
    1133             :      */
    1134           8 :     appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
    1135             :                       subname);
    1136           8 :     appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
    1137             :                       subname);
    1138           8 :     appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
    1139             : 
    1140           8 :     if (dry_run)
    1141           6 :         pg_log_info("dry-run: would drop subscription \"%s\" in database \"%s\"",
    1142             :                     subname, dbname);
    1143             :     else
    1144             :     {
    1145           2 :         pg_log_info("dropping subscription \"%s\" in database \"%s\"",
    1146             :                     subname, dbname);
    1147             : 
    1148           2 :         res = PQexec(conn, query->data);
    1149             : 
    1150           2 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1151             :         {
    1152           0 :             pg_log_error("could not drop subscription \"%s\": %s",
    1153             :                          subname, PQresultErrorMessage(res));
    1154           0 :             disconnect_database(conn, true);
    1155             :         }
    1156             : 
    1157           2 :         PQclear(res);
    1158             :     }
    1159             : 
    1160           8 :     destroyPQExpBuffer(query);
    1161           8 : }
    1162             : 
    1163             : /*
    1164             :  * Retrieve and drop the pre-existing subscriptions.
    1165             :  */
    1166             : static void
    1167          16 : check_and_drop_existing_subscriptions(PGconn *conn,
    1168             :                                       const struct LogicalRepInfo *dbinfo)
    1169             : {
    1170          16 :     PQExpBuffer query = createPQExpBuffer();
    1171             :     char       *dbname;
    1172             :     PGresult   *res;
    1173             : 
    1174             :     Assert(conn != NULL);
    1175             : 
    1176          16 :     dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
    1177             : 
    1178          16 :     appendPQExpBuffer(query,
    1179             :                       "SELECT s.subname FROM pg_catalog.pg_subscription s "
    1180             :                       "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
    1181             :                       "WHERE d.datname = %s",
    1182             :                       dbname);
    1183          16 :     res = PQexec(conn, query->data);
    1184             : 
    1185          16 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1186             :     {
    1187           0 :         pg_log_error("could not obtain pre-existing subscriptions: %s",
    1188             :                      PQresultErrorMessage(res));
    1189           0 :         disconnect_database(conn, true);
    1190             :     }
    1191             : 
    1192          24 :     for (int i = 0; i < PQntuples(res); i++)
    1193           8 :         drop_existing_subscription(conn, PQgetvalue(res, i, 0),
    1194           8 :                                    dbinfo->dbname);
    1195             : 
    1196          16 :     PQclear(res);
    1197          16 :     destroyPQExpBuffer(query);
    1198          16 :     PQfreemem(dbname);
    1199          16 : }
    1200             : 
    1201             : /*
    1202             :  * Create the subscriptions, adjust the initial location for logical
    1203             :  * replication and enable the subscriptions. That's the last step for logical
    1204             :  * replication setup.
    1205             :  */
    1206             : static void
    1207           8 : setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
    1208             : {
    1209          24 :     for (int i = 0; i < num_dbs; i++)
    1210             :     {
    1211             :         PGconn     *conn;
    1212             : 
    1213             :         /* Connect to subscriber. */
    1214          16 :         conn = connect_database(dbinfo[i].subconninfo, true);
    1215             : 
    1216             :         /*
    1217             :          * We don't need the pre-existing subscriptions on the newly formed
    1218             :          * subscriber. They can connect to other publisher nodes and either
    1219             :          * get some unwarranted data or can lead to ERRORs in connecting to
    1220             :          * such nodes.
    1221             :          */
    1222          16 :         check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
    1223             : 
    1224             :         /* Check and drop the required publications in the given database. */
    1225          16 :         check_and_drop_publications(conn, &dbinfo[i]);
    1226             : 
    1227          16 :         create_subscription(conn, &dbinfo[i]);
    1228             : 
    1229             :         /* Set the replication progress to the correct LSN */
    1230          16 :         set_replication_progress(conn, &dbinfo[i], consistent_lsn);
    1231             : 
    1232             :         /* Enable subscription */
    1233          16 :         enable_subscription(conn, &dbinfo[i]);
    1234             : 
    1235          16 :         disconnect_database(conn, false);
    1236             :     }
    1237           8 : }
    1238             : 
    1239             : /*
    1240             :  * Write the required recovery parameters.
    1241             :  */
    1242             : static void
    1243           8 : setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
    1244             : {
    1245             :     PGconn     *conn;
    1246             :     PQExpBuffer recoveryconfcontents;
    1247             : 
    1248             :     /*
    1249             :      * Despite of the recovery parameters will be written to the subscriber,
    1250             :      * use a publisher connection. The primary_conninfo is generated using the
    1251             :      * connection settings.
    1252             :      */
    1253           8 :     conn = connect_database(dbinfo[0].pubconninfo, true);
    1254             : 
    1255             :     /*
    1256             :      * Write recovery parameters.
    1257             :      *
    1258             :      * The subscriber is not running yet. In dry run mode, the recovery
    1259             :      * parameters *won't* be written. An invalid LSN is used for printing
    1260             :      * purposes. Additional recovery parameters are added here. It avoids
    1261             :      * unexpected behavior such as end of recovery as soon as a consistent
    1262             :      * state is reached (recovery_target) and failure due to multiple recovery
    1263             :      * targets (name, time, xid, LSN).
    1264             :      */
    1265           8 :     recoveryconfcontents = GenerateRecoveryConfig(conn, NULL, NULL);
    1266           8 :     appendPQExpBufferStr(recoveryconfcontents, "recovery_target = ''\n");
    1267           8 :     appendPQExpBufferStr(recoveryconfcontents,
    1268             :                          "recovery_target_timeline = 'latest'\n");
    1269             : 
    1270             :     /*
    1271             :      * Set recovery_target_inclusive = false to avoid reapplying the
    1272             :      * transaction committed at 'lsn' after subscription is enabled. This is
    1273             :      * because the provided 'lsn' is also used as the replication start point
    1274             :      * for the subscription. So, the server can send the transaction committed
    1275             :      * at that 'lsn' after replication is started which can lead to applying
    1276             :      * the same transaction twice if we keep recovery_target_inclusive = true.
    1277             :      */
    1278           8 :     appendPQExpBufferStr(recoveryconfcontents,
    1279             :                          "recovery_target_inclusive = false\n");
    1280           8 :     appendPQExpBufferStr(recoveryconfcontents,
    1281             :                          "recovery_target_action = promote\n");
    1282           8 :     appendPQExpBufferStr(recoveryconfcontents, "recovery_target_name = ''\n");
    1283           8 :     appendPQExpBufferStr(recoveryconfcontents, "recovery_target_time = ''\n");
    1284           8 :     appendPQExpBufferStr(recoveryconfcontents, "recovery_target_xid = ''\n");
    1285             : 
    1286           8 :     if (dry_run)
    1287             :     {
    1288           6 :         appendPQExpBufferStr(recoveryconfcontents, "# dry run mode\n");
    1289           6 :         appendPQExpBuffer(recoveryconfcontents,
    1290             :                           "recovery_target_lsn = '%X/%08X'\n",
    1291           6 :                           LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
    1292             :     }
    1293             :     else
    1294             :     {
    1295           2 :         appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
    1296             :                           lsn);
    1297           2 :         WriteRecoveryConfig(conn, datadir, recoveryconfcontents);
    1298             :     }
    1299           8 :     disconnect_database(conn, false);
    1300             : 
    1301           8 :     pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
    1302           8 : }
    1303             : 
    1304             : /*
    1305             :  * Drop physical replication slot on primary if the standby was using it. After
    1306             :  * the transformation, it has no use.
    1307             :  *
    1308             :  * XXX we might not fail here. Instead, we provide a warning so the user
    1309             :  * eventually drops this replication slot later.
    1310             :  */
    1311             : static void
    1312           8 : drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
    1313             : {
    1314             :     PGconn     *conn;
    1315             : 
    1316             :     /* Replication slot does not exist, do nothing */
    1317           8 :     if (!primary_slot_name)
    1318           0 :         return;
    1319             : 
    1320           8 :     conn = connect_database(dbinfo[0].pubconninfo, false);
    1321           8 :     if (conn != NULL)
    1322             :     {
    1323           8 :         drop_replication_slot(conn, &dbinfo[0], slotname);
    1324           8 :         disconnect_database(conn, false);
    1325             :     }
    1326             :     else
    1327             :     {
    1328           0 :         pg_log_warning("could not drop replication slot \"%s\" on primary",
    1329             :                        slotname);
    1330           0 :         pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
    1331             :     }
    1332             : }
    1333             : 
    1334             : /*
    1335             :  * Drop failover replication slots on subscriber. After the transformation,
    1336             :  * they have no use.
    1337             :  *
    1338             :  * XXX We do not fail here. Instead, we provide a warning so the user can drop
    1339             :  * them later.
    1340             :  */
    1341             : static void
    1342           8 : drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
    1343             : {
    1344             :     PGconn     *conn;
    1345             :     PGresult   *res;
    1346             : 
    1347           8 :     conn = connect_database(dbinfo[0].subconninfo, false);
    1348           8 :     if (conn != NULL)
    1349             :     {
    1350             :         /* Get failover replication slot names */
    1351           8 :         res = PQexec(conn,
    1352             :                      "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
    1353             : 
    1354           8 :         if (PQresultStatus(res) == PGRES_TUPLES_OK)
    1355             :         {
    1356             :             /* Remove failover replication slots from subscriber */
    1357          16 :             for (int i = 0; i < PQntuples(res); i++)
    1358           8 :                 drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
    1359             :         }
    1360             :         else
    1361             :         {
    1362           0 :             pg_log_warning("could not obtain failover replication slot information: %s",
    1363             :                            PQresultErrorMessage(res));
    1364           0 :             pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
    1365             :         }
    1366             : 
    1367           8 :         PQclear(res);
    1368           8 :         disconnect_database(conn, false);
    1369             :     }
    1370             :     else
    1371             :     {
    1372           0 :         pg_log_warning("could not drop failover replication slot");
    1373           0 :         pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
    1374             :     }
    1375           8 : }
    1376             : 
    1377             : /*
    1378             :  * Create a logical replication slot and returns a LSN.
    1379             :  *
    1380             :  * CreateReplicationSlot() is not used because it does not provide the one-row
    1381             :  * result set that contains the LSN.
    1382             :  */
    1383             : static char *
    1384          16 : create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
    1385             : {
    1386          16 :     PQExpBuffer str = createPQExpBuffer();
    1387          16 :     PGresult   *res = NULL;
    1388          16 :     const char *slot_name = dbinfo->replslotname;
    1389             :     char       *slot_name_esc;
    1390          16 :     char       *lsn = NULL;
    1391             : 
    1392             :     Assert(conn != NULL);
    1393             : 
    1394          16 :     if (dry_run)
    1395          12 :         pg_log_info("dry-run: would create the replication slot \"%s\" in database \"%s\" on publisher",
    1396             :                     slot_name, dbinfo->dbname);
    1397             :     else
    1398           4 :         pg_log_info("creating the replication slot \"%s\" in database \"%s\" on publisher",
    1399             :                     slot_name, dbinfo->dbname);
    1400             : 
    1401          16 :     slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
    1402             : 
    1403          16 :     appendPQExpBuffer(str,
    1404             :                       "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
    1405             :                       slot_name_esc,
    1406          16 :                       dbinfos.two_phase ? "true" : "false");
    1407             : 
    1408          16 :     PQfreemem(slot_name_esc);
    1409             : 
    1410          16 :     pg_log_debug("command is: %s", str->data);
    1411             : 
    1412          16 :     if (!dry_run)
    1413             :     {
    1414           4 :         res = PQexec(conn, str->data);
    1415           4 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1416             :         {
    1417           0 :             pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
    1418             :                          slot_name, dbinfo->dbname,
    1419             :                          PQresultErrorMessage(res));
    1420           0 :             PQclear(res);
    1421           0 :             destroyPQExpBuffer(str);
    1422           0 :             return NULL;
    1423             :         }
    1424             : 
    1425           4 :         lsn = pg_strdup(PQgetvalue(res, 0, 0));
    1426           4 :         PQclear(res);
    1427             :     }
    1428             : 
    1429             :     /* For cleanup purposes */
    1430          16 :     dbinfo->made_replslot = true;
    1431             : 
    1432          16 :     destroyPQExpBuffer(str);
    1433             : 
    1434          16 :     return lsn;
    1435             : }
    1436             : 
    1437             : static void
    1438          16 : drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
    1439             :                       const char *slot_name)
    1440             : {
    1441          16 :     PQExpBuffer str = createPQExpBuffer();
    1442             :     char       *slot_name_esc;
    1443             :     PGresult   *res;
    1444             : 
    1445             :     Assert(conn != NULL);
    1446             : 
    1447          16 :     if (dry_run)
    1448          12 :         pg_log_info("dry-run: would drop the replication slot \"%s\" in database \"%s\"",
    1449             :                     slot_name, dbinfo->dbname);
    1450             :     else
    1451           4 :         pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
    1452             :                     slot_name, dbinfo->dbname);
    1453             : 
    1454          16 :     slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
    1455             : 
    1456          16 :     appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
    1457             : 
    1458          16 :     PQfreemem(slot_name_esc);
    1459             : 
    1460          16 :     pg_log_debug("command is: %s", str->data);
    1461             : 
    1462          16 :     if (!dry_run)
    1463             :     {
    1464           4 :         res = PQexec(conn, str->data);
    1465           4 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1466             :         {
    1467           0 :             pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
    1468             :                          slot_name, dbinfo->dbname, PQresultErrorMessage(res));
    1469           0 :             dbinfo->made_replslot = false;   /* don't try again. */
    1470             :         }
    1471             : 
    1472           4 :         PQclear(res);
    1473             :     }
    1474             : 
    1475          16 :     destroyPQExpBuffer(str);
    1476          16 : }
    1477             : 
    1478             : /*
    1479             :  * Reports a suitable message if pg_ctl fails.
    1480             :  */
    1481             : static void
    1482          48 : pg_ctl_status(const char *pg_ctl_cmd, int rc)
    1483             : {
    1484          48 :     if (rc != 0)
    1485             :     {
    1486           0 :         if (WIFEXITED(rc))
    1487             :         {
    1488           0 :             pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
    1489             :         }
    1490           0 :         else if (WIFSIGNALED(rc))
    1491             :         {
    1492             : #if defined(WIN32)
    1493             :             pg_log_error("pg_ctl was terminated by exception 0x%X",
    1494             :                          WTERMSIG(rc));
    1495             :             pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
    1496             : #else
    1497           0 :             pg_log_error("pg_ctl was terminated by signal %d: %s",
    1498             :                          WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
    1499             : #endif
    1500             :         }
    1501             :         else
    1502             :         {
    1503           0 :             pg_log_error("pg_ctl exited with unrecognized status %d", rc);
    1504             :         }
    1505             : 
    1506           0 :         pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
    1507           0 :         exit(1);
    1508             :     }
    1509          48 : }
    1510             : 
    1511             : static void
    1512          24 : start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
    1513             :                      bool restrict_logical_worker)
    1514             : {
    1515          24 :     PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
    1516             :     int         rc;
    1517             : 
    1518          24 :     appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
    1519          24 :     appendShellString(pg_ctl_cmd, subscriber_dir);
    1520          24 :     appendPQExpBufferStr(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
    1521             : 
    1522             :     /* Prevent unintended slot invalidation */
    1523          24 :     appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
    1524             : 
    1525          24 :     if (restricted_access)
    1526             :     {
    1527          24 :         appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
    1528             : #if !defined(WIN32)
    1529             : 
    1530             :         /*
    1531             :          * An empty listen_addresses list means the server does not listen on
    1532             :          * any IP interfaces; only Unix-domain sockets can be used to connect
    1533             :          * to the server. Prevent external connections to minimize the chance
    1534             :          * of failure.
    1535             :          */
    1536          24 :         appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
    1537          24 :         if (opt->socket_dir)
    1538          24 :             appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
    1539          24 :                               opt->socket_dir);
    1540          24 :         appendPQExpBufferChar(pg_ctl_cmd, '"');
    1541             : #endif
    1542             :     }
    1543          24 :     if (opt->config_file != NULL)
    1544           0 :         appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
    1545           0 :                           opt->config_file);
    1546             : 
    1547             :     /* Suppress to start logical replication if requested */
    1548          24 :     if (restrict_logical_worker)
    1549           8 :         appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
    1550             : 
    1551          24 :     pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
    1552          24 :     rc = system(pg_ctl_cmd->data);
    1553          24 :     pg_ctl_status(pg_ctl_cmd->data, rc);
    1554          24 :     standby_running = true;
    1555          24 :     destroyPQExpBuffer(pg_ctl_cmd);
    1556          24 :     pg_log_info("server was started");
    1557          24 : }
    1558             : 
    1559             : static void
    1560          24 : stop_standby_server(const char *datadir)
    1561             : {
    1562             :     char       *pg_ctl_cmd;
    1563             :     int         rc;
    1564             : 
    1565          24 :     pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
    1566             :                           datadir);
    1567          24 :     pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
    1568          24 :     rc = system(pg_ctl_cmd);
    1569          24 :     pg_ctl_status(pg_ctl_cmd, rc);
    1570          24 :     standby_running = false;
    1571          24 :     pg_log_info("server was stopped");
    1572          24 : }
    1573             : 
    1574             : /*
    1575             :  * Returns after the server finishes the recovery process.
    1576             :  *
    1577             :  * If recovery_timeout option is set, terminate abnormally without finishing
    1578             :  * the recovery process. By default, it waits forever.
    1579             :  *
    1580             :  * XXX Is the recovery process still in progress? When recovery process has a
    1581             :  * better progress reporting mechanism, it should be added here.
    1582             :  */
    1583             : static void
    1584           8 : wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
    1585             : {
    1586             :     PGconn     *conn;
    1587           8 :     int         status = POSTMASTER_STILL_STARTING;
    1588           8 :     int         timer = 0;
    1589             : 
    1590           8 :     pg_log_info("waiting for the target server to reach the consistent state");
    1591             : 
    1592           8 :     conn = connect_database(conninfo, true);
    1593             : 
    1594             :     for (;;)
    1595             :     {
    1596             :         /* Did the recovery process finish? We're done if so. */
    1597           8 :         if (dry_run || !server_is_in_recovery(conn))
    1598             :         {
    1599           8 :             status = POSTMASTER_READY;
    1600           8 :             recovery_ended = true;
    1601           8 :             break;
    1602             :         }
    1603             : 
    1604             :         /* Bail out after recovery_timeout seconds if this option is set */
    1605           0 :         if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
    1606             :         {
    1607           0 :             stop_standby_server(subscriber_dir);
    1608           0 :             pg_log_error("recovery timed out");
    1609           0 :             disconnect_database(conn, true);
    1610             :         }
    1611             : 
    1612             :         /* Keep waiting */
    1613           0 :         pg_usleep(WAIT_INTERVAL * USEC_PER_SEC);
    1614             : 
    1615           0 :         timer += WAIT_INTERVAL;
    1616             :     }
    1617             : 
    1618           8 :     disconnect_database(conn, false);
    1619             : 
    1620           8 :     if (status == POSTMASTER_STILL_STARTING)
    1621           0 :         pg_fatal("server did not end recovery");
    1622             : 
    1623           8 :     pg_log_info("target server reached the consistent state");
    1624           8 :     pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
    1625           8 : }
    1626             : 
    1627             : /*
    1628             :  * Create a publication that includes all tables in the database.
    1629             :  */
    1630             : static void
    1631          16 : create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
    1632             : {
    1633          16 :     PQExpBuffer str = createPQExpBuffer();
    1634             :     PGresult   *res;
    1635             :     char       *ipubname_esc;
    1636             :     char       *spubname_esc;
    1637             : 
    1638             :     Assert(conn != NULL);
    1639             : 
    1640          16 :     ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
    1641          16 :     spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
    1642             : 
    1643             :     /* Check if the publication already exists */
    1644          16 :     appendPQExpBuffer(str,
    1645             :                       "SELECT 1 FROM pg_catalog.pg_publication "
    1646             :                       "WHERE pubname = %s",
    1647             :                       spubname_esc);
    1648          16 :     res = PQexec(conn, str->data);
    1649          16 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1650             :     {
    1651           0 :         pg_log_error("could not obtain publication information: %s",
    1652             :                      PQresultErrorMessage(res));
    1653           0 :         disconnect_database(conn, true);
    1654             :     }
    1655             : 
    1656          16 :     if (PQntuples(res) == 1)
    1657             :     {
    1658             :         /*
    1659             :          * Unfortunately, if it reaches this code path, it will always fail
    1660             :          * (unless you decide to change the existing publication name). That's
    1661             :          * bad but it is very unlikely that the user will choose a name with
    1662             :          * pg_createsubscriber_ prefix followed by the exact database oid and
    1663             :          * a random number.
    1664             :          */
    1665           0 :         pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
    1666           0 :         pg_log_error_hint("Consider renaming this publication before continuing.");
    1667           0 :         disconnect_database(conn, true);
    1668             :     }
    1669             : 
    1670          16 :     PQclear(res);
    1671          16 :     resetPQExpBuffer(str);
    1672             : 
    1673          16 :     if (dry_run)
    1674          12 :         pg_log_info("dry-run: would create publication \"%s\" in database \"%s\"",
    1675             :                     dbinfo->pubname, dbinfo->dbname);
    1676             :     else
    1677           4 :         pg_log_info("creating publication \"%s\" in database \"%s\"",
    1678             :                     dbinfo->pubname, dbinfo->dbname);
    1679             : 
    1680          16 :     appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
    1681             :                       ipubname_esc);
    1682             : 
    1683          16 :     pg_log_debug("command is: %s", str->data);
    1684             : 
    1685          16 :     if (!dry_run)
    1686             :     {
    1687           4 :         res = PQexec(conn, str->data);
    1688           4 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1689             :         {
    1690           0 :             pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
    1691             :                          dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
    1692           0 :             disconnect_database(conn, true);
    1693             :         }
    1694           4 :         PQclear(res);
    1695             :     }
    1696             : 
    1697             :     /* For cleanup purposes */
    1698          16 :     dbinfo->made_publication = true;
    1699             : 
    1700          16 :     PQfreemem(ipubname_esc);
    1701          16 :     PQfreemem(spubname_esc);
    1702          16 :     destroyPQExpBuffer(str);
    1703          16 : }
    1704             : 
    1705             : /*
    1706             :  * Drop the specified publication in the given database.
    1707             :  */
    1708             : static void
    1709          20 : drop_publication(PGconn *conn, const char *pubname, const char *dbname,
    1710             :                  bool *made_publication)
    1711             : {
    1712          20 :     PQExpBuffer str = createPQExpBuffer();
    1713             :     PGresult   *res;
    1714             :     char       *pubname_esc;
    1715             : 
    1716             :     Assert(conn != NULL);
    1717             : 
    1718          20 :     pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
    1719             : 
    1720          20 :     if (dry_run)
    1721          12 :         pg_log_info("dry-run: would drop publication \"%s\" in database \"%s\"",
    1722             :                     pubname, dbname);
    1723             :     else
    1724           8 :         pg_log_info("dropping publication \"%s\" in database \"%s\"",
    1725             :                     pubname, dbname);
    1726             : 
    1727          20 :     appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
    1728             : 
    1729          20 :     PQfreemem(pubname_esc);
    1730             : 
    1731          20 :     pg_log_debug("command is: %s", str->data);
    1732             : 
    1733          20 :     if (!dry_run)
    1734             :     {
    1735           8 :         res = PQexec(conn, str->data);
    1736           8 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1737             :         {
    1738           0 :             pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
    1739             :                          pubname, dbname, PQresultErrorMessage(res));
    1740           0 :             *made_publication = false;  /* don't try again. */
    1741             : 
    1742             :             /*
    1743             :              * Don't disconnect and exit here. This routine is used by primary
    1744             :              * (cleanup publication / replication slot due to an error) and
    1745             :              * subscriber (remove the replicated publications). In both cases,
    1746             :              * it can continue and provide instructions for the user to remove
    1747             :              * it later if cleanup fails.
    1748             :              */
    1749             :         }
    1750           8 :         PQclear(res);
    1751             :     }
    1752             : 
    1753          20 :     destroyPQExpBuffer(str);
    1754          20 : }
    1755             : 
    1756             : /*
    1757             :  * Retrieve and drop the publications.
    1758             :  *
    1759             :  * Since the publications were created before the consistent LSN, they
    1760             :  * remain on the subscriber even after the physical replica is
    1761             :  * promoted. Remove these publications from the subscriber because
    1762             :  * they have no use. Additionally, if requested, drop all pre-existing
    1763             :  * publications.
    1764             :  */
    1765             : static void
    1766          16 : check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
    1767             : {
    1768             :     PGresult   *res;
    1769          16 :     bool        drop_all_pubs = dbinfos.objecttypes_to_clean & OBJECTTYPE_PUBLICATIONS;
    1770             : 
    1771             :     Assert(conn != NULL);
    1772             : 
    1773          16 :     if (drop_all_pubs)
    1774             :     {
    1775           4 :         pg_log_info("dropping all existing publications in database \"%s\"",
    1776             :                     dbinfo->dbname);
    1777             : 
    1778             :         /* Fetch all publication names */
    1779           4 :         res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
    1780           4 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1781             :         {
    1782           0 :             pg_log_error("could not obtain publication information: %s",
    1783             :                          PQresultErrorMessage(res));
    1784           0 :             PQclear(res);
    1785           0 :             disconnect_database(conn, true);
    1786             :         }
    1787             : 
    1788             :         /* Drop each publication */
    1789          12 :         for (int i = 0; i < PQntuples(res); i++)
    1790           8 :             drop_publication(conn, PQgetvalue(res, i, 0), dbinfo->dbname,
    1791             :                              &dbinfo->made_publication);
    1792             : 
    1793           4 :         PQclear(res);
    1794             :     }
    1795             : 
    1796             :     /*
    1797             :      * In dry-run mode, we don't create publications, but we still try to drop
    1798             :      * those to provide necessary information to the user.
    1799             :      */
    1800          16 :     if (!drop_all_pubs || dry_run)
    1801          12 :         drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
    1802             :                          &dbinfo->made_publication);
    1803          16 : }
    1804             : 
    1805             : /*
    1806             :  * Create a subscription with some predefined options.
    1807             :  *
    1808             :  * A replication slot was already created in a previous step. Let's use it.  It
    1809             :  * is not required to copy data. The subscription will be created but it will
    1810             :  * not be enabled now. That's because the replication progress must be set and
    1811             :  * the replication origin name (one of the function arguments) contains the
    1812             :  * subscription OID in its name. Once the subscription is created,
    1813             :  * set_replication_progress() can obtain the chosen origin name and set up its
    1814             :  * initial location.
    1815             :  */
    1816             : static void
    1817          16 : create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
    1818             : {
    1819          16 :     PQExpBuffer str = createPQExpBuffer();
    1820             :     PGresult   *res;
    1821             :     char       *pubname_esc;
    1822             :     char       *subname_esc;
    1823             :     char       *pubconninfo_esc;
    1824             :     char       *replslotname_esc;
    1825             : 
    1826             :     Assert(conn != NULL);
    1827             : 
    1828          16 :     pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
    1829          16 :     subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
    1830          16 :     pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
    1831          16 :     replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
    1832             : 
    1833          16 :     if (dry_run)
    1834          12 :         pg_log_info("dry-run: would create subscription \"%s\" in database \"%s\"",
    1835             :                     dbinfo->subname, dbinfo->dbname);
    1836             :     else
    1837           4 :         pg_log_info("creating subscription \"%s\" in database \"%s\"",
    1838             :                     dbinfo->subname, dbinfo->dbname);
    1839             : 
    1840          16 :     appendPQExpBuffer(str,
    1841             :                       "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
    1842             :                       "WITH (create_slot = false, enabled = false, "
    1843             :                       "slot_name = %s, copy_data = false, two_phase = %s)",
    1844             :                       subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
    1845          16 :                       dbinfos.two_phase ? "true" : "false");
    1846             : 
    1847          16 :     PQfreemem(pubname_esc);
    1848          16 :     PQfreemem(subname_esc);
    1849          16 :     PQfreemem(pubconninfo_esc);
    1850          16 :     PQfreemem(replslotname_esc);
    1851             : 
    1852          16 :     pg_log_debug("command is: %s", str->data);
    1853             : 
    1854          16 :     if (!dry_run)
    1855             :     {
    1856           4 :         res = PQexec(conn, str->data);
    1857           4 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1858             :         {
    1859           0 :             pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
    1860             :                          dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
    1861           0 :             disconnect_database(conn, true);
    1862             :         }
    1863           4 :         PQclear(res);
    1864             :     }
    1865             : 
    1866          16 :     destroyPQExpBuffer(str);
    1867          16 : }
    1868             : 
    1869             : /*
    1870             :  * Sets the replication progress to the consistent LSN.
    1871             :  *
    1872             :  * The subscriber caught up to the consistent LSN provided by the last
    1873             :  * replication slot that was created. The goal is to set up the initial
    1874             :  * location for the logical replication that is the exact LSN that the
    1875             :  * subscriber was promoted. Once the subscription is enabled it will start
    1876             :  * streaming from that location onwards.  In dry run mode, the subscription OID
    1877             :  * and LSN are set to invalid values for printing purposes.
    1878             :  */
    1879             : static void
    1880          16 : set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
    1881             : {
    1882          16 :     PQExpBuffer str = createPQExpBuffer();
    1883             :     PGresult   *res;
    1884             :     Oid         suboid;
    1885             :     char       *subname;
    1886             :     char       *dbname;
    1887             :     char       *originname;
    1888             :     char       *lsnstr;
    1889             : 
    1890             :     Assert(conn != NULL);
    1891             : 
    1892          16 :     subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
    1893          16 :     dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
    1894             : 
    1895          16 :     appendPQExpBuffer(str,
    1896             :                       "SELECT s.oid FROM pg_catalog.pg_subscription s "
    1897             :                       "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
    1898             :                       "WHERE s.subname = %s AND d.datname = %s",
    1899             :                       subname, dbname);
    1900             : 
    1901          16 :     res = PQexec(conn, str->data);
    1902          16 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1903             :     {
    1904           0 :         pg_log_error("could not obtain subscription OID: %s",
    1905             :                      PQresultErrorMessage(res));
    1906           0 :         disconnect_database(conn, true);
    1907             :     }
    1908             : 
    1909          16 :     if (PQntuples(res) != 1 && !dry_run)
    1910             :     {
    1911           0 :         pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
    1912             :                      PQntuples(res), 1);
    1913           0 :         disconnect_database(conn, true);
    1914             :     }
    1915             : 
    1916          16 :     if (dry_run)
    1917             :     {
    1918          12 :         suboid = InvalidOid;
    1919          12 :         lsnstr = psprintf("%X/%08X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
    1920             :     }
    1921             :     else
    1922             :     {
    1923           4 :         suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
    1924           4 :         lsnstr = psprintf("%s", lsn);
    1925             :     }
    1926             : 
    1927          16 :     PQclear(res);
    1928             : 
    1929             :     /*
    1930             :      * The origin name is defined as pg_%u. %u is the subscription OID. See
    1931             :      * ApplyWorkerMain().
    1932             :      */
    1933          16 :     originname = psprintf("pg_%u", suboid);
    1934             : 
    1935          16 :     if (dry_run)
    1936          12 :         pg_log_info("dry-run: would set the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
    1937             :                     originname, lsnstr, dbinfo->dbname);
    1938             :     else
    1939           4 :         pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
    1940             :                     originname, lsnstr, dbinfo->dbname);
    1941             : 
    1942          16 :     resetPQExpBuffer(str);
    1943          16 :     appendPQExpBuffer(str,
    1944             :                       "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
    1945             :                       originname, lsnstr);
    1946             : 
    1947          16 :     pg_log_debug("command is: %s", str->data);
    1948             : 
    1949          16 :     if (!dry_run)
    1950             :     {
    1951           4 :         res = PQexec(conn, str->data);
    1952           4 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1953             :         {
    1954           0 :             pg_log_error("could not set replication progress for subscription \"%s\": %s",
    1955             :                          dbinfo->subname, PQresultErrorMessage(res));
    1956           0 :             disconnect_database(conn, true);
    1957             :         }
    1958           4 :         PQclear(res);
    1959             :     }
    1960             : 
    1961          16 :     PQfreemem(subname);
    1962          16 :     PQfreemem(dbname);
    1963          16 :     pg_free(originname);
    1964          16 :     pg_free(lsnstr);
    1965          16 :     destroyPQExpBuffer(str);
    1966          16 : }
    1967             : 
    1968             : /*
    1969             :  * Enables the subscription.
    1970             :  *
    1971             :  * The subscription was created in a previous step but it was disabled. After
    1972             :  * adjusting the initial logical replication location, enable the subscription.
    1973             :  */
    1974             : static void
    1975          16 : enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
    1976             : {
    1977          16 :     PQExpBuffer str = createPQExpBuffer();
    1978             :     PGresult   *res;
    1979             :     char       *subname;
    1980             : 
    1981             :     Assert(conn != NULL);
    1982             : 
    1983          16 :     subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
    1984             : 
    1985          16 :     if (dry_run)
    1986          12 :         pg_log_info("dry-run: would enable subscription \"%s\" in database \"%s\"",
    1987             :                     dbinfo->subname, dbinfo->dbname);
    1988             :     else
    1989           4 :         pg_log_info("enabling subscription \"%s\" in database \"%s\"",
    1990             :                     dbinfo->subname, dbinfo->dbname);
    1991             : 
    1992          16 :     appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
    1993             : 
    1994          16 :     pg_log_debug("command is: %s", str->data);
    1995             : 
    1996          16 :     if (!dry_run)
    1997             :     {
    1998           4 :         res = PQexec(conn, str->data);
    1999           4 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    2000             :         {
    2001           0 :             pg_log_error("could not enable subscription \"%s\": %s",
    2002             :                          dbinfo->subname, PQresultErrorMessage(res));
    2003           0 :             disconnect_database(conn, true);
    2004             :         }
    2005             : 
    2006           4 :         PQclear(res);
    2007             :     }
    2008             : 
    2009          16 :     PQfreemem(subname);
    2010          16 :     destroyPQExpBuffer(str);
    2011          16 : }
    2012             : 
    2013             : /*
    2014             :  * Fetch a list of all connectable non-template databases from the source server
    2015             :  * and form a list such that they appear as if the user has specified multiple
    2016             :  * --database options, one for each source database.
    2017             :  */
    2018             : static void
    2019           2 : get_publisher_databases(struct CreateSubscriberOptions *opt,
    2020             :                         bool dbnamespecified)
    2021             : {
    2022             :     PGconn     *conn;
    2023             :     PGresult   *res;
    2024             : 
    2025             :     /* If a database name was specified, just connect to it. */
    2026           2 :     if (dbnamespecified)
    2027           0 :         conn = connect_database(opt->pub_conninfo_str, true);
    2028             :     else
    2029             :     {
    2030             :         /* Otherwise, try postgres first and then template1. */
    2031             :         char       *conninfo;
    2032             : 
    2033           2 :         conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "postgres");
    2034           2 :         conn = connect_database(conninfo, false);
    2035           2 :         pg_free(conninfo);
    2036           2 :         if (!conn)
    2037             :         {
    2038           0 :             conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "template1");
    2039           0 :             conn = connect_database(conninfo, true);
    2040           0 :             pg_free(conninfo);
    2041             :         }
    2042             :     }
    2043             : 
    2044           2 :     res = PQexec(conn, "SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1");
    2045           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    2046             :     {
    2047           0 :         pg_log_error("could not obtain a list of databases: %s", PQresultErrorMessage(res));
    2048           0 :         PQclear(res);
    2049           0 :         disconnect_database(conn, true);
    2050             :     }
    2051             : 
    2052           8 :     for (int i = 0; i < PQntuples(res); i++)
    2053             :     {
    2054           6 :         const char *dbname = PQgetvalue(res, i, 0);
    2055             : 
    2056           6 :         simple_string_list_append(&opt->database_names, dbname);
    2057             : 
    2058             :         /* Increment num_dbs to reflect multiple --database options */
    2059           6 :         num_dbs++;
    2060             :     }
    2061             : 
    2062           2 :     PQclear(res);
    2063           2 :     disconnect_database(conn, false);
    2064           2 : }
    2065             : 
    2066             : int
    2067          46 : main(int argc, char **argv)
    2068             : {
    2069             :     static struct option long_options[] =
    2070             :     {
    2071             :         {"all", no_argument, NULL, 'a'},
    2072             :         {"database", required_argument, NULL, 'd'},
    2073             :         {"pgdata", required_argument, NULL, 'D'},
    2074             :         {"dry-run", no_argument, NULL, 'n'},
    2075             :         {"subscriber-port", required_argument, NULL, 'p'},
    2076             :         {"publisher-server", required_argument, NULL, 'P'},
    2077             :         {"socketdir", required_argument, NULL, 's'},
    2078             :         {"recovery-timeout", required_argument, NULL, 't'},
    2079             :         {"enable-two-phase", no_argument, NULL, 'T'},
    2080             :         {"subscriber-username", required_argument, NULL, 'U'},
    2081             :         {"verbose", no_argument, NULL, 'v'},
    2082             :         {"version", no_argument, NULL, 'V'},
    2083             :         {"help", no_argument, NULL, '?'},
    2084             :         {"config-file", required_argument, NULL, 1},
    2085             :         {"publication", required_argument, NULL, 2},
    2086             :         {"replication-slot", required_argument, NULL, 3},
    2087             :         {"subscription", required_argument, NULL, 4},
    2088             :         {"clean", required_argument, NULL, 5},
    2089             :         {NULL, 0, NULL, 0}
    2090             :     };
    2091             : 
    2092          46 :     struct CreateSubscriberOptions opt = {0};
    2093             : 
    2094             :     int         c;
    2095             :     int         option_index;
    2096             : 
    2097             :     char       *pub_base_conninfo;
    2098             :     char       *sub_base_conninfo;
    2099          46 :     char       *dbname_conninfo = NULL;
    2100             : 
    2101             :     uint64      pub_sysid;
    2102             :     uint64      sub_sysid;
    2103             :     struct stat statbuf;
    2104             : 
    2105             :     char       *consistent_lsn;
    2106             : 
    2107             :     char        pidfile[MAXPGPATH];
    2108             : 
    2109          46 :     pg_logging_init(argv[0]);
    2110          46 :     pg_logging_set_level(PG_LOG_WARNING);
    2111          46 :     progname = get_progname(argv[0]);
    2112          46 :     set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
    2113             : 
    2114          46 :     if (argc > 1)
    2115             :     {
    2116          44 :         if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
    2117             :         {
    2118           2 :             usage();
    2119           2 :             exit(0);
    2120             :         }
    2121          42 :         else if (strcmp(argv[1], "-V") == 0
    2122          42 :                  || strcmp(argv[1], "--version") == 0)
    2123             :         {
    2124           2 :             puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
    2125           2 :             exit(0);
    2126             :         }
    2127             :     }
    2128             : 
    2129             :     /* Default settings */
    2130          42 :     subscriber_dir = NULL;
    2131          42 :     opt.config_file = NULL;
    2132          42 :     opt.pub_conninfo_str = NULL;
    2133          42 :     opt.socket_dir = NULL;
    2134          42 :     opt.sub_port = DEFAULT_SUB_PORT;
    2135          42 :     opt.sub_username = NULL;
    2136          42 :     opt.two_phase = false;
    2137          42 :     opt.database_names = (SimpleStringList)
    2138             :     {
    2139             :         0
    2140             :     };
    2141          42 :     opt.recovery_timeout = 0;
    2142          42 :     opt.all_dbs = false;
    2143             : 
    2144             :     /*
    2145             :      * Don't allow it to be run as root. It uses pg_ctl which does not allow
    2146             :      * it either.
    2147             :      */
    2148             : #ifndef WIN32
    2149          42 :     if (geteuid() == 0)
    2150             :     {
    2151           0 :         pg_log_error("cannot be executed by \"root\"");
    2152           0 :         pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
    2153             :                           progname);
    2154           0 :         exit(1);
    2155             :     }
    2156             : #endif
    2157             : 
    2158          42 :     get_restricted_token();
    2159             : 
    2160         324 :     while ((c = getopt_long(argc, argv, "ad:D:np:P:s:t:TU:v",
    2161         324 :                             long_options, &option_index)) != -1)
    2162             :     {
    2163         288 :         switch (c)
    2164             :         {
    2165           6 :             case 'a':
    2166           6 :                 opt.all_dbs = true;
    2167           6 :                 break;
    2168          50 :             case 'd':
    2169          50 :                 if (!simple_string_list_member(&opt.database_names, optarg))
    2170             :                 {
    2171          48 :                     simple_string_list_append(&opt.database_names, optarg);
    2172          48 :                     num_dbs++;
    2173             :                 }
    2174             :                 else
    2175           2 :                     pg_fatal("database \"%s\" specified more than once for -d/--database", optarg);
    2176          48 :                 break;
    2177          38 :             case 'D':
    2178          38 :                 subscriber_dir = pg_strdup(optarg);
    2179          38 :                 canonicalize_path(subscriber_dir);
    2180          38 :                 break;
    2181          18 :             case 'n':
    2182          18 :                 dry_run = true;
    2183          18 :                 break;
    2184          24 :             case 'p':
    2185          24 :                 opt.sub_port = pg_strdup(optarg);
    2186          24 :                 break;
    2187          36 :             case 'P':
    2188          36 :                 opt.pub_conninfo_str = pg_strdup(optarg);
    2189          36 :                 break;
    2190          24 :             case 's':
    2191          24 :                 opt.socket_dir = pg_strdup(optarg);
    2192          24 :                 canonicalize_path(opt.socket_dir);
    2193          24 :                 break;
    2194           6 :             case 't':
    2195           6 :                 opt.recovery_timeout = atoi(optarg);
    2196           6 :                 break;
    2197           2 :             case 'T':
    2198           2 :                 opt.two_phase = true;
    2199           2 :                 break;
    2200           0 :             case 'U':
    2201           0 :                 opt.sub_username = pg_strdup(optarg);
    2202           0 :                 break;
    2203          38 :             case 'v':
    2204          38 :                 pg_logging_increase_verbosity();
    2205          38 :                 break;
    2206           0 :             case 1:
    2207           0 :                 opt.config_file = pg_strdup(optarg);
    2208           0 :                 break;
    2209          24 :             case 2:
    2210          24 :                 if (!simple_string_list_member(&opt.pub_names, optarg))
    2211             :                 {
    2212          22 :                     simple_string_list_append(&opt.pub_names, optarg);
    2213          22 :                     num_pubs++;
    2214             :                 }
    2215             :                 else
    2216           2 :                     pg_fatal("publication \"%s\" specified more than once for --publication", optarg);
    2217          22 :                 break;
    2218           8 :             case 3:
    2219           8 :                 if (!simple_string_list_member(&opt.replslot_names, optarg))
    2220             :                 {
    2221           8 :                     simple_string_list_append(&opt.replslot_names, optarg);
    2222           8 :                     num_replslots++;
    2223             :                 }
    2224             :                 else
    2225           0 :                     pg_fatal("replication slot \"%s\" specified more than once for --replication-slot", optarg);
    2226           8 :                 break;
    2227          10 :             case 4:
    2228          10 :                 if (!simple_string_list_member(&opt.sub_names, optarg))
    2229             :                 {
    2230          10 :                     simple_string_list_append(&opt.sub_names, optarg);
    2231          10 :                     num_subs++;
    2232             :                 }
    2233             :                 else
    2234           0 :                     pg_fatal("subscription \"%s\" specified more than once for --subscription", optarg);
    2235          10 :                 break;
    2236           2 :             case 5:
    2237           2 :                 if (!simple_string_list_member(&opt.objecttypes_to_clean, optarg))
    2238           2 :                     simple_string_list_append(&opt.objecttypes_to_clean, optarg);
    2239             :                 else
    2240           0 :                     pg_fatal("object type \"%s\" specified more than once for --clean", optarg);
    2241           2 :                 break;
    2242           2 :             default:
    2243             :                 /* getopt_long already emitted a complaint */
    2244           2 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    2245           2 :                 exit(1);
    2246             :         }
    2247             :     }
    2248             : 
    2249             :     /* Validate that --all is not used with incompatible options */
    2250          36 :     if (opt.all_dbs)
    2251             :     {
    2252           6 :         char       *bad_switch = NULL;
    2253             : 
    2254           6 :         if (num_dbs > 0)
    2255           2 :             bad_switch = "--database";
    2256           4 :         else if (num_pubs > 0)
    2257           2 :             bad_switch = "--publication";
    2258           2 :         else if (num_replslots > 0)
    2259           0 :             bad_switch = "--replication-slot";
    2260           2 :         else if (num_subs > 0)
    2261           0 :             bad_switch = "--subscription";
    2262             : 
    2263           6 :         if (bad_switch)
    2264             :         {
    2265           4 :             pg_log_error("options %s and -a/--all cannot be used together", bad_switch);
    2266           4 :             pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    2267           4 :             exit(1);
    2268             :         }
    2269             :     }
    2270             : 
    2271             :     /* Any non-option arguments? */
    2272          32 :     if (optind < argc)
    2273             :     {
    2274           0 :         pg_log_error("too many command-line arguments (first is \"%s\")",
    2275             :                      argv[optind]);
    2276           0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    2277           0 :         exit(1);
    2278             :     }
    2279             : 
    2280             :     /* Required arguments */
    2281          32 :     if (subscriber_dir == NULL)
    2282             :     {
    2283           2 :         pg_log_error("no subscriber data directory specified");
    2284           2 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    2285           2 :         exit(1);
    2286             :     }
    2287             : 
    2288             :     /* If socket directory is not provided, use the current directory */
    2289          30 :     if (opt.socket_dir == NULL)
    2290             :     {
    2291             :         char        cwd[MAXPGPATH];
    2292             : 
    2293          10 :         if (!getcwd(cwd, MAXPGPATH))
    2294           0 :             pg_fatal("could not determine current directory");
    2295          10 :         opt.socket_dir = pg_strdup(cwd);
    2296          10 :         canonicalize_path(opt.socket_dir);
    2297             :     }
    2298             : 
    2299             :     /*
    2300             :      * Parse connection string. Build a base connection string that might be
    2301             :      * reused by multiple databases.
    2302             :      */
    2303          30 :     if (opt.pub_conninfo_str == NULL)
    2304             :     {
    2305             :         /*
    2306             :          * TODO use primary_conninfo (if available) from subscriber and
    2307             :          * extract publisher connection string. Assume that there are
    2308             :          * identical entries for physical and logical replication. If there is
    2309             :          * not, we would fail anyway.
    2310             :          */
    2311           2 :         pg_log_error("no publisher connection string specified");
    2312           2 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    2313           2 :         exit(1);
    2314             :     }
    2315          28 :     pg_log_info("validating publisher connection string");
    2316          28 :     pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
    2317             :                                           &dbname_conninfo);
    2318          28 :     if (pub_base_conninfo == NULL)
    2319           0 :         exit(1);
    2320             : 
    2321          28 :     pg_log_info("validating subscriber connection string");
    2322          28 :     sub_base_conninfo = get_sub_conninfo(&opt);
    2323             : 
    2324             :     /*
    2325             :      * Fetch all databases from the source (publisher) and treat them as if
    2326             :      * the user specified has multiple --database options, one for each source
    2327             :      * database.
    2328             :      */
    2329          28 :     if (opt.all_dbs)
    2330             :     {
    2331           2 :         bool        dbnamespecified = (dbname_conninfo != NULL);
    2332             : 
    2333           2 :         get_publisher_databases(&opt, dbnamespecified);
    2334             :     }
    2335             : 
    2336          28 :     if (opt.database_names.head == NULL)
    2337             :     {
    2338           4 :         pg_log_info("no database was specified");
    2339             : 
    2340             :         /*
    2341             :          * Try to obtain the dbname from the publisher conninfo. If dbname
    2342             :          * parameter is not available, error out.
    2343             :          */
    2344           4 :         if (dbname_conninfo)
    2345             :         {
    2346           2 :             simple_string_list_append(&opt.database_names, dbname_conninfo);
    2347           2 :             num_dbs++;
    2348             : 
    2349           2 :             pg_log_info("database name \"%s\" was extracted from the publisher connection string",
    2350             :                         dbname_conninfo);
    2351             :         }
    2352             :         else
    2353             :         {
    2354           2 :             pg_log_error("no database name specified");
    2355           2 :             pg_log_error_hint("Try \"%s --help\" for more information.",
    2356             :                               progname);
    2357           2 :             exit(1);
    2358             :         }
    2359             :     }
    2360             : 
    2361             :     /* Number of object names must match number of databases */
    2362          26 :     if (num_pubs > 0 && num_pubs != num_dbs)
    2363             :     {
    2364           2 :         pg_log_error("wrong number of publication names specified");
    2365           2 :         pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
    2366             :                             num_pubs, num_dbs);
    2367           2 :         exit(1);
    2368             :     }
    2369          24 :     if (num_subs > 0 && num_subs != num_dbs)
    2370             :     {
    2371           2 :         pg_log_error("wrong number of subscription names specified");
    2372           2 :         pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
    2373             :                             num_subs, num_dbs);
    2374           2 :         exit(1);
    2375             :     }
    2376          22 :     if (num_replslots > 0 && num_replslots != num_dbs)
    2377             :     {
    2378           2 :         pg_log_error("wrong number of replication slot names specified");
    2379           2 :         pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
    2380             :                             num_replslots, num_dbs);
    2381           2 :         exit(1);
    2382             :     }
    2383             : 
    2384             :     /* Verify the object types specified for removal from the subscriber */
    2385          22 :     for (SimpleStringListCell *cell = opt.objecttypes_to_clean.head; cell; cell = cell->next)
    2386             :     {
    2387           2 :         if (pg_strcasecmp(cell->val, "publications") == 0)
    2388           2 :             dbinfos.objecttypes_to_clean |= OBJECTTYPE_PUBLICATIONS;
    2389             :         else
    2390             :         {
    2391           0 :             pg_log_error("invalid object type \"%s\" specified for --clean", cell->val);
    2392           0 :             pg_log_error_hint("The valid value is: \"%s\"", "publications");
    2393           0 :             exit(1);
    2394             :         }
    2395             :     }
    2396             : 
    2397             :     /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
    2398          20 :     pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
    2399          20 :     pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
    2400             : 
    2401             :     /* Rudimentary check for a data directory */
    2402          20 :     check_data_directory(subscriber_dir);
    2403             : 
    2404          20 :     dbinfos.two_phase = opt.two_phase;
    2405             : 
    2406             :     /*
    2407             :      * Store database information for publisher and subscriber. It should be
    2408             :      * called before atexit() because its return is used in the
    2409             :      * cleanup_objects_atexit().
    2410             :      */
    2411          20 :     dbinfos.dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
    2412             : 
    2413             :     /* Register a function to clean up objects in case of failure */
    2414          20 :     atexit(cleanup_objects_atexit);
    2415             : 
    2416             :     /*
    2417             :      * Check if the subscriber data directory has the same system identifier
    2418             :      * than the publisher data directory.
    2419             :      */
    2420          20 :     pub_sysid = get_primary_sysid(dbinfos.dbinfo[0].pubconninfo);
    2421          20 :     sub_sysid = get_standby_sysid(subscriber_dir);
    2422          20 :     if (pub_sysid != sub_sysid)
    2423           2 :         pg_fatal("subscriber data directory is not a copy of the source database cluster");
    2424             : 
    2425             :     /* Subscriber PID file */
    2426          18 :     snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
    2427             : 
    2428             :     /*
    2429             :      * The standby server must not be running. If the server is started under
    2430             :      * service manager and pg_createsubscriber stops it, the service manager
    2431             :      * might react to this action and start the server again. Therefore,
    2432             :      * refuse to proceed if the server is running to avoid possible failures.
    2433             :      */
    2434          18 :     if (stat(pidfile, &statbuf) == 0)
    2435             :     {
    2436           2 :         pg_log_error("standby server is running");
    2437           2 :         pg_log_error_hint("Stop the standby server and try again.");
    2438           2 :         exit(1);
    2439             :     }
    2440             : 
    2441             :     /*
    2442             :      * Start a short-lived standby server with temporary parameters (provided
    2443             :      * by command-line options). The goal is to avoid connections during the
    2444             :      * transformation steps.
    2445             :      */
    2446          16 :     pg_log_info("starting the standby server with command-line options");
    2447          16 :     start_standby_server(&opt, true, false);
    2448             : 
    2449             :     /* Check if the standby server is ready for logical replication */
    2450          16 :     check_subscriber(dbinfos.dbinfo);
    2451             : 
    2452             :     /* Check if the primary server is ready for logical replication */
    2453          12 :     check_publisher(dbinfos.dbinfo);
    2454             : 
    2455             :     /*
    2456             :      * Stop the target server. The recovery process requires that the server
    2457             :      * reaches a consistent state before targeting the recovery stop point.
    2458             :      * Make sure a consistent state is reached (stop the target server
    2459             :      * guarantees it) *before* creating the replication slots in
    2460             :      * setup_publisher().
    2461             :      */
    2462           8 :     pg_log_info("stopping the subscriber");
    2463           8 :     stop_standby_server(subscriber_dir);
    2464             : 
    2465             :     /* Create the required objects for each database on publisher */
    2466           8 :     consistent_lsn = setup_publisher(dbinfos.dbinfo);
    2467             : 
    2468             :     /* Write the required recovery parameters */
    2469           8 :     setup_recovery(dbinfos.dbinfo, subscriber_dir, consistent_lsn);
    2470             : 
    2471             :     /*
    2472             :      * Start subscriber so the recovery parameters will take effect. Wait
    2473             :      * until accepting connections. We don't want to start logical replication
    2474             :      * during setup.
    2475             :      */
    2476           8 :     pg_log_info("starting the subscriber");
    2477           8 :     start_standby_server(&opt, true, true);
    2478             : 
    2479             :     /* Waiting the subscriber to be promoted */
    2480           8 :     wait_for_end_recovery(dbinfos.dbinfo[0].subconninfo, &opt);
    2481             : 
    2482             :     /*
    2483             :      * Create the subscription for each database on subscriber. It does not
    2484             :      * enable it immediately because it needs to adjust the replication start
    2485             :      * point to the LSN reported by setup_publisher().  It also cleans up
    2486             :      * publications created by this tool and replication to the standby.
    2487             :      */
    2488           8 :     setup_subscriber(dbinfos.dbinfo, consistent_lsn);
    2489             : 
    2490             :     /* Remove primary_slot_name if it exists on primary */
    2491           8 :     drop_primary_replication_slot(dbinfos.dbinfo, primary_slot_name);
    2492             : 
    2493             :     /* Remove failover replication slots if they exist on subscriber */
    2494           8 :     drop_failover_replication_slots(dbinfos.dbinfo);
    2495             : 
    2496             :     /* Stop the subscriber */
    2497           8 :     pg_log_info("stopping the subscriber");
    2498           8 :     stop_standby_server(subscriber_dir);
    2499             : 
    2500             :     /* Change system identifier from subscriber */
    2501           8 :     modify_subscriber_sysid(&opt);
    2502             : 
    2503           8 :     success = true;
    2504             : 
    2505           8 :     pg_log_info("Done!");
    2506             : 
    2507           8 :     return 0;
    2508             : }

Generated by: LCOV version 1.16