LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - pg_createsubscriber.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 716 857 83.5 %
Date: 2025-03-23 01:15:33 Functions: 38 38 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_createsubscriber.c
       4             :  *    Create a new logical replica from a standby server
       5             :  *
       6             :  * Copyright (c) 2024-2025, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *    src/bin/pg_basebackup/pg_createsubscriber.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : 
      14             : #include "postgres_fe.h"
      15             : 
      16             : #include <sys/stat.h>
      17             : #include <sys/time.h>
      18             : #include <sys/wait.h>
      19             : #include <time.h>
      20             : 
      21             : #include "common/connect.h"
      22             : #include "common/controldata_utils.h"
      23             : #include "common/logging.h"
      24             : #include "common/pg_prng.h"
      25             : #include "common/restricted_token.h"
      26             : #include "fe_utils/recovery_gen.h"
      27             : #include "fe_utils/simple_list.h"
      28             : #include "fe_utils/string_utils.h"
      29             : #include "getopt_long.h"
      30             : 
      31             : #define DEFAULT_SUB_PORT    "50432" /* default for -p/--subscriber-port, as shown by usage() */
      32             : #define OBJECTTYPE_PUBLICATIONS  0x0001 /* NOTE(review): presumably a bit for LogicalRepInfos.objecttypes_to_remove -- confirm */
      33             : 
      34             : /* Values of the command-line options */
      35             : struct CreateSubscriberOptions
      36             : {
      37             :     char       *config_file;    /* main server configuration file */
      38             :     char       *pub_conninfo_str;   /* publisher connection string */
      39             :     char       *socket_dir;     /* directory for Unix-domain socket, if any */
      40             :     char       *sub_port;       /* subscriber port number */
      41             :     const char *sub_username;   /* subscriber username, or NULL if not given */
      42             :     bool        two_phase;      /* enable-two-phase option */
      43             :     SimpleStringList database_names;    /* list of database names */
      44             :     SimpleStringList pub_names; /* list of publication names */
      45             :     SimpleStringList sub_names; /* list of subscription names */
      46             :     SimpleStringList replslot_names;    /* list of replication slot names */
      47             :     int         recovery_timeout;   /* seconds to wait for recovery to end */
      48             :     SimpleStringList objecttypes_to_remove; /* list of object types to remove */
      49             : };
      50             : 
      51             : /* per-database publication/subscription info */
      52             : struct LogicalRepInfo
      53             : {
      54             :     char       *dbname;         /* database name */
      55             :     char       *pubconninfo;    /* publisher connection string (with dbname) */
      56             :     char       *subconninfo;    /* subscriber connection string (with dbname) */
      57             :     char       *pubname;        /* publication name, NULL until chosen */
      58             :     char       *subname;        /* subscription name, NULL until chosen */
      59             :     char       *replslotname;   /* replication slot name, NULL until chosen */
      60             : 
      61             :     bool        made_replslot;  /* replication slot was created; drop it on failure */
      62             :     bool        made_publication;   /* publication was created; drop it on failure */
      63             : };
      64             : 
      65             : /*
      66             :  * Information shared across all the databases (or publications and
      67             :  * subscriptions).
      68             :  */
      69             : struct LogicalRepInfos
      70             : {
      71             :     struct LogicalRepInfo *dbinfo;  /* per-database entries, indexed 0 .. num_dbs - 1 */
      72             :     bool        two_phase;      /* enable-two-phase option */
      73             :     bits32      objecttypes_to_remove;  /* flags indicating which object types
      74             :                                          * to remove on subscriber */
      75             : };
      76             : 
      77             : static void cleanup_objects_atexit(void);
      78             : static void usage(void);
      79             : static char *get_base_conninfo(const char *conninfo, char **dbname);
      80             : static char *get_sub_conninfo(const struct CreateSubscriberOptions *opt);
      81             : static char *get_exec_path(const char *argv0, const char *progname);
      82             : static void check_data_directory(const char *datadir);
      83             : static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
      84             : static struct LogicalRepInfo *store_pub_sub_info(const struct CreateSubscriberOptions *opt,
      85             :                                                  const char *pub_base_conninfo,
      86             :                                                  const char *sub_base_conninfo);
      87             : static PGconn *connect_database(const char *conninfo, bool exit_on_error);
      88             : static void disconnect_database(PGconn *conn, bool exit_on_error);
      89             : static uint64 get_primary_sysid(const char *conninfo);
      90             : static uint64 get_standby_sysid(const char *datadir);
      91             : static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
      92             : static bool server_is_in_recovery(PGconn *conn);
      93             : static char *generate_object_name(PGconn *conn);
      94             : static void check_publisher(const struct LogicalRepInfo *dbinfo);
      95             : static char *setup_publisher(struct LogicalRepInfo *dbinfo);
      96             : static void check_subscriber(const struct LogicalRepInfo *dbinfo);
      97             : static void setup_subscriber(struct LogicalRepInfo *dbinfo,
      98             :                              const char *consistent_lsn);
      99             : static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
     100             :                            const char *lsn);
     101             : static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
     102             :                                           const char *slotname);
     103             : static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo);
     104             : static char *create_logical_replication_slot(PGconn *conn,
     105             :                                              struct LogicalRepInfo *dbinfo);
     106             : static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
     107             :                                   const char *slot_name);
     108             : static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
     109             : static void start_standby_server(const struct CreateSubscriberOptions *opt,
     110             :                                  bool restricted_access,
     111             :                                  bool restrict_logical_worker);
     112             : static void stop_standby_server(const char *datadir);
     113             : static void wait_for_end_recovery(const char *conninfo,
     114             :                                   const struct CreateSubscriberOptions *opt);
     115             : static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
     116             : static void drop_publication(PGconn *conn, const char *pubname,
     117             :                              const char *dbname, bool *made_publication);
     118             : static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
     119             : static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
     120             : static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
     121             :                                      const char *lsn);
     122             : static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
     123             : static void check_and_drop_existing_subscriptions(PGconn *conn,
     124             :                                                   const struct LogicalRepInfo *dbinfo);
     125             : static void drop_existing_subscriptions(PGconn *conn, const char *subname,
     126             :                                         const char *dbname);
     127             : 
     128             : #define USEC_PER_SEC    1000000
     129             : #define WAIT_INTERVAL   1       /* 1 second */
     130             : 
     131             : static const char *progname;    /* program name, printed by usage() */
     132             : 
     133             : static char *primary_slot_name = NULL;
     134             : static bool dry_run = false;    /* NOTE(review): presumably set by -n/--dry-run -- confirm */
     135             : 
     136             : static bool success = false;    /* if true, cleanup_objects_atexit() does nothing */
     137             : 
     138             : static struct LogicalRepInfos dbinfos;
     139             : static int  num_dbs = 0;        /* number of specified databases */
     140             : static int  num_pubs = 0;       /* number of specified publications */
     141             : static int  num_subs = 0;       /* number of specified subscriptions */
     142             : static int  num_replslots = 0;  /* number of specified replication slots */
     143             : 
     144             : static pg_prng_state prng_state;
     145             : 
     146             : static char *pg_ctl_path = NULL;
     147             : static char *pg_resetwal_path = NULL;
     148             : 
     149             : /* standby / subscriber data directory */
     150             : static char *subscriber_dir = NULL;
     151             : 
     152             : static bool recovery_ended = false; /* if true, failure cleanup warns the replica is unusable */
     153             : static bool standby_running = false;    /* if true, failure cleanup stops the standby */
     154             : 
     155             : enum WaitPMResult
     156             : {
     157             :     POSTMASTER_READY,
     158             :     POSTMASTER_STILL_STARTING
     159             : };
     160             : 
     161             : 
     162             : /*
     163             :  * Clean up objects that were created by pg_createsubscriber if there is an
     164             :  * error.  Does nothing when the run succeeded.
     165             :  *
     166             :  * Publications and replication slots are created on primary. Depending on the
     167             :  * step at which it failed, the already created objects are dropped if possible
     168             :  * (if the primary cannot be reached, the leftovers are reported instead).
     169             :  * There is no cleanup on the target server. The steps on the target server are
     170             :  * executed *after* promotion, hence, at this point, a failure means recreate
     171             :  * the physical replica and start again.  A still-running standby is stopped.
     172             :  */
     173             : static void
     174          18 : cleanup_objects_atexit(void)
     175             : {
     176          18 :     if (success)
     177           6 :         return;
     178             : 
     179             :     /*
     180             :      * If the server is promoted, there is no way to use the current setup
     181             :      * again. Warn the user that a new replication setup should be done before
     182             :      * trying again.
     183             :      */
     184          12 :     if (recovery_ended)
     185             :     {
     186           0 :         pg_log_warning("failed after the end of recovery");
     187           0 :         pg_log_warning_hint("The target server cannot be used as a physical replica anymore.  "
     188             :                             "You must recreate the physical replica before continuing.");
     189             :     }
     190             : 
     191          36 :     for (int i = 0; i < num_dbs; i++)
     192             :     {
     193          24 :         struct LogicalRepInfo *dbinfo = &dbinfos.dbinfo[i];
     194             : 
     195          24 :         if (dbinfo->made_publication || dbinfo->made_replslot)
     196             :         {
     197             :             PGconn     *conn;
     198             : 
     199           0 :             conn = connect_database(dbinfo->pubconninfo, false);
     200           0 :             if (conn != NULL)
     201             :             {
     202           0 :                 if (dbinfo->made_publication)
     203           0 :                     drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
     204             :                                      &dbinfo->made_publication);
     205           0 :                 if (dbinfo->made_replslot)
     206           0 :                     drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
     207           0 :                 disconnect_database(conn, false);
     208             :             }
     209             :             else
     210             :             {
     211             :                 /*
     212             :                  * If a connection could not be established, inform the user
     213             :                  * that some objects were left on primary and should be
     214             :                  * removed before trying again.
     215             :                  */
     216           0 :                 if (dbinfo->made_publication)
     217             :                 {
     218           0 :                     pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
     219             :                                    dbinfo->pubname,
     220             :                                    dbinfo->dbname);
     221           0 :                     pg_log_warning_hint("Drop this publication before trying again.");
     222             :                 }
     223           0 :                 if (dbinfo->made_replslot)
     224             :                 {
     225           0 :                     pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
     226             :                                    dbinfo->replslotname,
     227             :                                    dbinfo->dbname);
     228           0 :                     pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
     229             :                 }
     230             :             }
     231             :         }
     232             :     }
     233             : 
     234          12 :     if (standby_running)
     235           8 :         stop_standby_server(subscriber_dir);
     236             : }
     237             : 
     238             : static void
     239           2 : usage(void)             /* print the command-line help text to stdout */
     240             : {
     241           2 :     printf(_("%s creates a new logical replica from a standby server.\n\n"),
     242             :            progname);
     243           2 :     printf(_("Usage:\n"));
     244           2 :     printf(_("  %s [OPTION]...\n"), progname);
     245           2 :     printf(_("\nOptions:\n"));
     246           2 :     printf(_("  -d, --database=DBNAME           database in which to create a subscription\n"));
     247           2 :     printf(_("  -D, --pgdata=DATADIR            location for the subscriber data directory\n"));
     248           2 :     printf(_("  -n, --dry-run                   dry run, just show what would be done\n"));
     249           2 :     printf(_("  -p, --subscriber-port=PORT      subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
     250           2 :     printf(_("  -P, --publisher-server=CONNSTR  publisher connection string\n"));
     251           2 :     printf(_("  -R, --remove=OBJECTTYPE         remove all objects of the specified type from specified\n"
     252             :              "                                  databases on the subscriber; accepts: publications\n"));
     253           2 :     printf(_("  -s, --socketdir=DIR             socket directory to use (default current dir.)\n"));
     254           2 :     printf(_("  -t, --recovery-timeout=SECS     seconds to wait for recovery to end\n"));
     255           2 :     printf(_("  -T, --enable-two-phase          enable two-phase commit for all subscriptions\n"));
     256           2 :     printf(_("  -U, --subscriber-username=NAME  user name for subscriber connection\n"));
     257           2 :     printf(_("  -v, --verbose                   output verbose messages\n"));
     258           2 :     printf(_("      --config-file=FILENAME      use specified main server configuration\n"
     259             :              "                                  file when running target cluster\n"));
     260           2 :     printf(_("      --publication=NAME          publication name\n"));
     261           2 :     printf(_("      --replication-slot=NAME     replication slot name\n"));
     262           2 :     printf(_("      --subscription=NAME         subscription name\n"));
     263           2 :     printf(_("  -V, --version                   output version information, then exit\n"));
     264           2 :     printf(_("  -?, --help                      show this help, then exit\n"));
     265           2 :     printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
     266           2 :     printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
     267           2 : }
     268             : 
     269             : /*
     270             :  * Subroutine to append "keyword=value" to a connection string, quoting the
     271             :  * value via appendConnStrVal() and putting a space before it unless buf is
     272             :  * empty.  (We assume keywords don't need quoting.)
     273             : static void
     274         190 : appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
     275             : {
     276         190 :     if (buf->len > 0)
     277         138 :         appendPQExpBufferChar(buf, ' ');
     278         190 :     appendPQExpBufferStr(buf, keyword);
     279         190 :     appendPQExpBufferChar(buf, '=');
     280         190 :     appendConnStrVal(buf, val);
     281         190 : }
     282             : 
     283             : /*
     284             :  * Validate a connection string. Returns a base connection string holding every
     285             :  * non-empty option except dbname, or NULL (after logging) if parsing fails.
     286             :  *
     287             :  * Since we might process multiple databases, each database name will be
     288             :  * appended to this base connection string to provide a final connection
     289             :  * string. If dbname is not NULL and the string has a non-empty dbname, a copy
     290             :  * of it is stored in *dbname; otherwise *dbname is left untouched.
     291             :  *
     292             :  * It is the caller's responsibility to free the returned connection string and
     293             :  * dbname.
     294             :  */
     295             : static char *
     296          26 : get_base_conninfo(const char *conninfo, char **dbname)
     297             : {
     298             :     PQExpBuffer buf;
     299             :     PQconninfoOption *conn_opts;
     300             :     PQconninfoOption *conn_opt;
     301          26 :     char       *errmsg = NULL;
     302             :     char       *ret;
     303             : 
     304          26 :     conn_opts = PQconninfoParse(conninfo, &errmsg);
     305          26 :     if (conn_opts == NULL)
     306             :     {
     307           0 :         pg_log_error("could not parse connection string: %s", errmsg);
     308           0 :         PQfreemem(errmsg);
     309           0 :         return NULL;
     310             :     }
     311             : 
     312          26 :     buf = createPQExpBuffer();
     313        1248 :     for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
     314             :     {
     315        1222 :         if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
     316             :         {
     317          62 :             if (strcmp(conn_opt->keyword, "dbname") == 0)
     318             :             {
     319          18 :                 if (dbname)
     320          18 :                     *dbname = pg_strdup(conn_opt->val);
     321          18 :                 continue;       /* dbname never goes into the base string */
     322             :             }
     323          44 :             appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
     324             :         }
     325             :     }
     326             : 
     327          26 :     ret = pg_strdup(buf->data);
     328             : 
     329          26 :     destroyPQExpBuffer(buf);
     330          26 :     PQconninfoFree(conn_opts);
     331             : 
     332          26 :     return ret;
     333             : }
     334             : 
     335             : /*
     336             :  * Build a subscriber connection string. Only a few parameters are supported
     337             :  * since it starts a server with restricted access.  Returns a pg_strdup'd copy.
     338             :  */
     339             : static char *
     340          26 : get_sub_conninfo(const struct CreateSubscriberOptions *opt)
     341             : {
     342          26 :     PQExpBuffer buf = createPQExpBuffer();
     343             :     char       *ret;
     344             : 
     345          26 :     appendConnStrItem(buf, "port", opt->sub_port);
     346             : #if !defined(WIN32)
     347          26 :     appendConnStrItem(buf, "host", opt->socket_dir);    /* socket directory is passed as host */
     348             : #endif
     349          26 :     if (opt->sub_username != NULL)
     350           0 :         appendConnStrItem(buf, "user", opt->sub_username);
     351          26 :     appendConnStrItem(buf, "fallback_application_name", progname);
     352             : 
     353          26 :     ret = pg_strdup(buf->data);
     354             : 
     355          26 :     destroyPQExpBuffer(buf);
     356             : 
     357          26 :     return ret;
     358             : }
     359             : 
     360             : /*
     361             :  * Verify that a PostgreSQL binary (progname) is available in the same directory
     362             :  * as pg_createsubscriber and has the same version.  Returns the absolute path
     363             :  * of progname in a pg_malloc'd MAXPGPATH-sized buffer; pg_fatal() on failure.
     364             :  */
     365             : static char *
     366          36 : get_exec_path(const char *argv0, const char *progname)
     367             : {
     368             :     char       *versionstr;
     369             :     char       *exec_path;
     370             :     int         ret;
     371             : 
     372          36 :     versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);    /* NOTE(review): never released in this function */
     373          36 :     exec_path = pg_malloc(MAXPGPATH);
     374          36 :     ret = find_other_exec(argv0, progname, versionstr, exec_path);
     375             : 
     376          36 :     if (ret < 0)
     377             :     {
     378             :         char        full_path[MAXPGPATH];
     379             : 
     380           0 :         if (find_my_exec(argv0, full_path) < 0)
     381           0 :             strlcpy(full_path, progname, sizeof(full_path));
     382             : 
     383           0 :         if (ret == -1)
     384           0 :             pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
     385             :                      progname, "pg_createsubscriber", full_path);
     386             :         else
     387           0 :             pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
     388             :                      progname, full_path, "pg_createsubscriber");
     389             :     }
     390             : 
     391          36 :     pg_log_debug("%s path is:  %s", progname, exec_path);
     392             : 
     393          36 :     return exec_path;
     394             : }
     395             : 
     396             : /*
     397             :  * Is it a cluster directory? These are preliminary checks (the directory must
     398             :  * be accessible and PG_VERSION must not be missing), far from an accurate test.
     399             :  * If it is not a clone from the publisher, it will fail in a later step.
     400             :  */
     401             : static void
     402          18 : check_data_directory(const char *datadir)
     403             : {
     404             :     struct stat statbuf;
     405             :     char        versionfile[MAXPGPATH];
     406             : 
     407          18 :     pg_log_info("checking if directory \"%s\" is a cluster data directory",
     408             :                 datadir);
     409             : 
     410          18 :     if (stat(datadir, &statbuf) != 0)
     411             :     {
     412           0 :         if (errno == ENOENT)
     413           0 :             pg_fatal("data directory \"%s\" does not exist", datadir);
     414             :         else
     415           0 :             pg_fatal("could not access directory \"%s\": %m", datadir);
     416             :     }
     417             : 
     418          18 :     snprintf(versionfile, MAXPGPATH, "%s/PG_VERSION", datadir);
     419          18 :     if (stat(versionfile, &statbuf) != 0 && errno == ENOENT)    /* other stat() errors are not treated as fatal */
     420             :     {
     421           0 :         pg_fatal("directory \"%s\" is not a database cluster directory",
     422             :                  datadir);
     423             :     }
     424          18 : }
     425             : 
     426             : /*
     427             :  * Append a database name to a base connection string.
     428             :  *
     429             :  * dbname is the only parameter that changes so it is not included in the base
     430             :  * connection string. This function appends "dbname=..." to build a "real"
     431             :  * connection string, returned as a pg_strdup'd copy.
     432             :  */
     433             : static char *
     434          68 : concat_conninfo_dbname(const char *conninfo, const char *dbname)
     435             : {
     436          68 :     PQExpBuffer buf = createPQExpBuffer();
     437             :     char       *ret;
     438             : 
     439             :     Assert(conninfo != NULL);
     440             : 
     441          68 :     appendPQExpBufferStr(buf, conninfo);
     442          68 :     appendConnStrItem(buf, "dbname", dbname);
     443             : 
     444          68 :     ret = pg_strdup(buf->data);
     445          68 :     destroyPQExpBuffer(buf);
     446             : 
     447          68 :     return ret;
     448             : }
     449             : 
     450             : /*
     451             :  * Store publication and subscription information, one entry per database.
     452             :  *
     453             :  * If publication, replication slot and subscription names were specified,
     454             :  * store them here (each list is walked in step with the database list).
     455             :  * Otherwise, a generated name will be assigned to the object in setup_publisher().
     456             :  */
     457             : static struct LogicalRepInfo *
     458          18 : store_pub_sub_info(const struct CreateSubscriberOptions *opt,
     459             :                    const char *pub_base_conninfo,
     460             :                    const char *sub_base_conninfo)
     461             : {
     462             :     struct LogicalRepInfo *dbinfo;
     463          18 :     SimpleStringListCell *pubcell = NULL;
     464          18 :     SimpleStringListCell *subcell = NULL;
     465          18 :     SimpleStringListCell *replslotcell = NULL;
     466          18 :     int         i = 0;
     467             : 
     468          18 :     dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
     469             : 
     470          18 :     if (num_pubs > 0)
     471           4 :         pubcell = opt->pub_names.head;
     472          18 :     if (num_subs > 0)
     473           2 :         subcell = opt->sub_names.head;
     474          18 :     if (num_replslots > 0)
     475           4 :         replslotcell = opt->replslot_names.head;
     476             : 
     477          52 :     for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
     478             :     {
     479             :         char       *conninfo;
     480             : 
     481             :         /* Fill publisher attributes */
     482          34 :         conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
     483          34 :         dbinfo[i].pubconninfo = conninfo;
     484          34 :         dbinfo[i].dbname = cell->val;   /* not copied: shares the list cell's string */
     485          34 :         if (num_pubs > 0)
     486           8 :             dbinfo[i].pubname = pubcell->val;   /* NOTE(review): assumes the list is at least as long as the database list -- confirm caller checks */
     487             :         else
     488          26 :             dbinfo[i].pubname = NULL;
     489          34 :         if (num_replslots > 0)
     490           6 :             dbinfo[i].replslotname = replslotcell->val;
     491             :         else
     492          28 :             dbinfo[i].replslotname = NULL;
     493          34 :         dbinfo[i].made_replslot = false;
     494          34 :         dbinfo[i].made_publication = false;
     495             :         /* Fill subscriber attributes */
     496          34 :         conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
     497          34 :         dbinfo[i].subconninfo = conninfo;
     498          34 :         if (num_subs > 0)
     499           4 :             dbinfo[i].subname = subcell->val;
     500             :         else
     501          30 :             dbinfo[i].subname = NULL;
     502             :         /* Other fields will be filled later */
     503             : 
     504          34 :         pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
     505             :                      dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
     506             :                      dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
     507             :                      dbinfo[i].pubconninfo);
     508          34 :         pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
     509             :                      dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
     510             :                      dbinfo[i].subconninfo,
     511             :                      dbinfos.two_phase ? "true" : "false");
     512             : 
     513          34 :         if (num_pubs > 0)
     514           8 :             pubcell = pubcell->next;
     515          34 :         if (num_subs > 0)
     516           4 :             subcell = subcell->next;
     517          34 :         if (num_replslots > 0)
     518           6 :             replslotcell = replslotcell->next;
     519             : 
     520          34 :         i++;
     521             :     }
     522             : 
     523          18 :     return dbinfo;
     524             : }
     525             : 
     526             : /*
     527             :  * Open a new connection. If exit_on_error is true, it has an undesired
     528             :  * condition and it should exit immediately.
     529             :  */
     530             : static PGconn *
     531          86 : connect_database(const char *conninfo, bool exit_on_error)
     532             : {
     533             :     PGconn     *conn;
     534             :     PGresult   *res;
     535             : 
     536          86 :     conn = PQconnectdb(conninfo);
     537          86 :     if (PQstatus(conn) != CONNECTION_OK)
     538             :     {
     539           0 :         pg_log_error("connection to database failed: %s",
     540             :                      PQerrorMessage(conn));
     541           0 :         PQfinish(conn);
     542             : 
     543           0 :         if (exit_on_error)
     544           0 :             exit(1);
     545           0 :         return NULL;
     546             :     }
     547             : 
     548             :     /* Secure search_path */
     549          86 :     res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
     550          86 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     551             :     {
     552           0 :         pg_log_error("could not clear \"search_path\": %s",
     553             :                      PQresultErrorMessage(res));
     554           0 :         PQclear(res);
     555           0 :         PQfinish(conn);
     556             : 
     557           0 :         if (exit_on_error)
     558           0 :             exit(1);
     559           0 :         return NULL;
     560             :     }
     561          86 :     PQclear(res);
     562             : 
     563          86 :     return conn;
     564             : }
     565             : 
     566             : /*
     567             :  * Close the connection. If exit_on_error is true, it has an undesired
     568             :  * condition and it should exit immediately.
     569             :  */
     570             : static void
     571          86 : disconnect_database(PGconn *conn, bool exit_on_error)
     572             : {
     573             :     Assert(conn != NULL);
     574             : 
     575          86 :     PQfinish(conn);
     576             : 
     577          86 :     if (exit_on_error)
     578           4 :         exit(1);
     579          82 : }
     580             : 
     581             : /*
     582             :  * Obtain the system identifier using the provided connection. It will be used
     583             :  * to compare if a data directory is a clone of another one.
     584             :  */
     585             : static uint64
     586          18 : get_primary_sysid(const char *conninfo)
     587             : {
     588             :     PGconn     *conn;
     589             :     PGresult   *res;
     590             :     uint64      sysid;
     591             : 
     592          18 :     pg_log_info("getting system identifier from publisher");
     593             : 
     594          18 :     conn = connect_database(conninfo, true);
     595             : 
     596          18 :     res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
     597          18 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     598             :     {
     599           0 :         pg_log_error("could not get system identifier: %s",
     600             :                      PQresultErrorMessage(res));
     601           0 :         disconnect_database(conn, true);
     602             :     }
     603          18 :     if (PQntuples(res) != 1)
     604             :     {
     605           0 :         pg_log_error("could not get system identifier: got %d rows, expected %d row",
     606             :                      PQntuples(res), 1);
     607           0 :         disconnect_database(conn, true);
     608             :     }
     609             : 
     610          18 :     sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
     611             : 
     612          18 :     pg_log_info("system identifier is %llu on publisher",
     613             :                 (unsigned long long) sysid);
     614             : 
     615          18 :     PQclear(res);
     616          18 :     disconnect_database(conn, false);
     617             : 
     618          18 :     return sysid;
     619             : }
     620             : 
     621             : /*
     622             :  * Obtain the system identifier from control file. It will be used to compare
     623             :  * if a data directory is a clone of another one. This routine is used locally
     624             :  * and avoids a connection.
     625             :  */
     626             : static uint64
     627          18 : get_standby_sysid(const char *datadir)
     628             : {
     629             :     ControlFileData *cf;
     630             :     bool        crc_ok;
     631             :     uint64      sysid;
     632             : 
     633          18 :     pg_log_info("getting system identifier from subscriber");
     634             : 
     635          18 :     cf = get_controlfile(datadir, &crc_ok);
     636          18 :     if (!crc_ok)
     637           0 :         pg_fatal("control file appears to be corrupt");
     638             : 
     639          18 :     sysid = cf->system_identifier;
     640             : 
     641          18 :     pg_log_info("system identifier is %llu on subscriber",
     642             :                 (unsigned long long) sysid);
     643             : 
     644          18 :     pg_free(cf);
     645             : 
     646          18 :     return sysid;
     647             : }
     648             : 
     649             : /*
     650             :  * Modify the system identifier. Since a standby server preserves the system
     651             :  * identifier, it makes sense to change it to avoid situations in which WAL
     652             :  * files from one of the systems might be used in the other one.
     653             :  */
     654             : static void
     655           6 : modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
     656             : {
     657             :     ControlFileData *cf;
     658             :     bool        crc_ok;
     659             :     struct timeval tv;
     660             : 
     661             :     char       *cmd_str;
     662             : 
     663           6 :     pg_log_info("modifying system identifier of subscriber");
     664             : 
     665           6 :     cf = get_controlfile(subscriber_dir, &crc_ok);
     666           6 :     if (!crc_ok)
     667           0 :         pg_fatal("control file appears to be corrupt");
     668             : 
     669             :     /*
     670             :      * Select a new system identifier.
     671             :      *
     672             :      * XXX this code was extracted from BootStrapXLOG().
     673             :      */
     674           6 :     gettimeofday(&tv, NULL);
     675           6 :     cf->system_identifier = ((uint64) tv.tv_sec) << 32;
     676           6 :     cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
     677           6 :     cf->system_identifier |= getpid() & 0xFFF;
     678             : 
     679           6 :     if (!dry_run)
     680           2 :         update_controlfile(subscriber_dir, cf, true);
     681             : 
     682           6 :     pg_log_info("system identifier is %llu on subscriber",
     683             :                 (unsigned long long) cf->system_identifier);
     684             : 
     685           6 :     pg_log_info("running pg_resetwal on the subscriber");
     686             : 
     687           6 :     cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
     688             :                        subscriber_dir, DEVNULL);
     689             : 
     690           6 :     pg_log_debug("pg_resetwal command is: %s", cmd_str);
     691             : 
     692           6 :     if (!dry_run)
     693             :     {
     694           2 :         int         rc = system(cmd_str);
     695             : 
     696           2 :         if (rc == 0)
     697           2 :             pg_log_info("subscriber successfully changed the system identifier");
     698             :         else
     699           0 :             pg_fatal("could not change system identifier of subscriber: %s", wait_result_to_str(rc));
     700             :     }
     701             : 
     702           6 :     pg_free(cf);
     703           6 : }
     704             : 
     705             : /*
     706             :  * Generate an object name using a prefix, database oid and a random integer.
     707             :  * It is used in case the user does not specify an object name (publication,
     708             :  * subscription, replication slot).
     709             :  */
     710             : static char *
     711          10 : generate_object_name(PGconn *conn)
     712             : {
     713             :     PGresult   *res;
     714             :     Oid         oid;
     715             :     uint32      rand;
     716             :     char       *objname;
     717             : 
     718          10 :     res = PQexec(conn,
     719             :                  "SELECT oid FROM pg_catalog.pg_database "
     720             :                  "WHERE datname = pg_catalog.current_database()");
     721          10 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     722             :     {
     723           0 :         pg_log_error("could not obtain database OID: %s",
     724             :                      PQresultErrorMessage(res));
     725           0 :         disconnect_database(conn, true);
     726             :     }
     727             : 
     728          10 :     if (PQntuples(res) != 1)
     729             :     {
     730           0 :         pg_log_error("could not obtain database OID: got %d rows, expected %d row",
     731             :                      PQntuples(res), 1);
     732           0 :         disconnect_database(conn, true);
     733             :     }
     734             : 
     735             :     /* Database OID */
     736          10 :     oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
     737             : 
     738          10 :     PQclear(res);
     739             : 
     740             :     /* Random unsigned integer */
     741          10 :     rand = pg_prng_uint32(&prng_state);
     742             : 
     743             :     /*
     744             :      * Build the object name. The name must not exceed NAMEDATALEN - 1. This
     745             :      * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
     746             :      * '\0').
     747             :      */
     748          10 :     objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
     749             : 
     750          10 :     return objname;
     751             : }
     752             : 
     753             : /*
     754             :  * Create the publications and replication slots in preparation for logical
     755             :  * replication. Returns the LSN from latest replication slot. It will be the
     756             :  * replication start point that is used to adjust the subscriptions (see
     757             :  * set_replication_progress).
     758             :  */
     759             : static char *
     760           6 : setup_publisher(struct LogicalRepInfo *dbinfo)
     761             : {
     762           6 :     char       *lsn = NULL;
     763             : 
     764           6 :     pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
     765             : 
     766          16 :     for (int i = 0; i < num_dbs; i++)
     767             :     {
     768             :         PGconn     *conn;
     769          10 :         char       *genname = NULL;
     770             : 
     771          10 :         conn = connect_database(dbinfo[i].pubconninfo, true);
     772             : 
     773             :         /*
     774             :          * If an object name was not specified as command-line options, assign
     775             :          * a generated object name. The replication slot has a different rule.
     776             :          * The subscription name is assigned to the replication slot name if
     777             :          * no replication slot is specified. It follows the same rule as
     778             :          * CREATE SUBSCRIPTION.
     779             :          */
     780          10 :         if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
     781          10 :             genname = generate_object_name(conn);
     782          10 :         if (num_pubs == 0)
     783           2 :             dbinfo[i].pubname = pg_strdup(genname);
     784          10 :         if (num_subs == 0)
     785           6 :             dbinfo[i].subname = pg_strdup(genname);
     786          10 :         if (num_replslots == 0)
     787           4 :             dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
     788             : 
     789             :         /*
     790             :          * Create publication on publisher. This step should be executed
     791             :          * *before* promoting the subscriber to avoid any transactions between
     792             :          * consistent LSN and the new publication rows (such transactions
     793             :          * wouldn't see the new publication rows resulting in an error).
     794             :          */
     795          10 :         create_publication(conn, &dbinfo[i]);
     796             : 
     797             :         /* Create replication slot on publisher */
     798          10 :         if (lsn)
     799           2 :             pg_free(lsn);
     800          10 :         lsn = create_logical_replication_slot(conn, &dbinfo[i]);
     801          10 :         if (lsn != NULL || dry_run)
     802          10 :             pg_log_info("create replication slot \"%s\" on publisher",
     803             :                         dbinfo[i].replslotname);
     804             :         else
     805           0 :             exit(1);
     806             : 
     807             :         /*
     808             :          * Since we are using the LSN returned by the last replication slot as
     809             :          * recovery_target_lsn, this LSN is ahead of the current WAL position
     810             :          * and the recovery waits until the publisher writes a WAL record to
     811             :          * reach the target and ends the recovery. On idle systems, this wait
     812             :          * time is unpredictable and could lead to failure in promoting the
     813             :          * subscriber. To avoid that, insert a harmless WAL record.
     814             :          */
     815          10 :         if (i == num_dbs - 1 && !dry_run)
     816             :         {
     817             :             PGresult   *res;
     818             : 
     819           2 :             res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
     820           2 :             if (PQresultStatus(res) != PGRES_TUPLES_OK)
     821             :             {
     822           0 :                 pg_log_error("could not write an additional WAL record: %s",
     823             :                              PQresultErrorMessage(res));
     824           0 :                 disconnect_database(conn, true);
     825             :             }
     826           2 :             PQclear(res);
     827             :         }
     828             : 
     829          10 :         disconnect_database(conn, false);
     830             :     }
     831             : 
     832           6 :     return lsn;
     833             : }
     834             : 
     835             : /*
     836             :  * Is recovery still in progress?
     837             :  */
     838             : static bool
     839          30 : server_is_in_recovery(PGconn *conn)
     840             : {
     841             :     PGresult   *res;
     842             :     int         ret;
     843             : 
     844          30 :     res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
     845             : 
     846          30 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     847             :     {
     848           0 :         pg_log_error("could not obtain recovery progress: %s",
     849             :                      PQresultErrorMessage(res));
     850           0 :         disconnect_database(conn, true);
     851             :     }
     852             : 
     853             : 
     854          30 :     ret = strcmp("t", PQgetvalue(res, 0, 0));
     855             : 
     856          30 :     PQclear(res);
     857             : 
     858          30 :     return ret == 0;
     859             : }
     860             : 
     861             : /*
     862             :  * Is the primary server ready for logical replication?
     863             :  *
     864             :  * XXX Does it not allow a synchronous replica?
     865             :  */
     866             : static void
     867          10 : check_publisher(const struct LogicalRepInfo *dbinfo)
     868             : {
     869             :     PGconn     *conn;
     870             :     PGresult   *res;
     871          10 :     bool        failed = false;
     872             : 
     873             :     char       *wal_level;
     874             :     int         max_repslots;
     875             :     int         cur_repslots;
     876             :     int         max_walsenders;
     877             :     int         cur_walsenders;
     878             :     int         max_prepared_transactions;
     879             :     char       *max_slot_wal_keep_size;
     880             : 
     881          10 :     pg_log_info("checking settings on publisher");
     882             : 
     883          10 :     conn = connect_database(dbinfo[0].pubconninfo, true);
     884             : 
     885             :     /*
     886             :      * If the primary server is in recovery (i.e. cascading replication),
     887             :      * objects (publication) cannot be created because it is read only.
     888             :      */
     889          10 :     if (server_is_in_recovery(conn))
     890             :     {
     891           2 :         pg_log_error("primary server cannot be in recovery");
     892           2 :         disconnect_database(conn, true);
     893             :     }
     894             : 
     895             :     /*------------------------------------------------------------------------
     896             :      * Logical replication requires a few parameters to be set on publisher.
     897             :      * Since these parameters are not a requirement for physical replication,
     898             :      * we should check it to make sure it won't fail.
     899             :      *
     900             :      * - wal_level = logical
     901             :      * - max_replication_slots >= current + number of dbs to be converted
     902             :      * - max_wal_senders >= current + number of dbs to be converted
     903             :      * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files)
     904             :      * -----------------------------------------------------------------------
     905             :      */
     906           8 :     res = PQexec(conn,
     907             :                  "SELECT pg_catalog.current_setting('wal_level'),"
     908             :                  " pg_catalog.current_setting('max_replication_slots'),"
     909             :                  " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
     910             :                  " pg_catalog.current_setting('max_wal_senders'),"
     911             :                  " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
     912             :                  " pg_catalog.current_setting('max_prepared_transactions'),"
     913             :                  " pg_catalog.current_setting('max_slot_wal_keep_size')");
     914             : 
     915           8 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     916             :     {
     917           0 :         pg_log_error("could not obtain publisher settings: %s",
     918             :                      PQresultErrorMessage(res));
     919           0 :         disconnect_database(conn, true);
     920             :     }
     921             : 
     922           8 :     wal_level = pg_strdup(PQgetvalue(res, 0, 0));
     923           8 :     max_repslots = atoi(PQgetvalue(res, 0, 1));
     924           8 :     cur_repslots = atoi(PQgetvalue(res, 0, 2));
     925           8 :     max_walsenders = atoi(PQgetvalue(res, 0, 3));
     926           8 :     cur_walsenders = atoi(PQgetvalue(res, 0, 4));
     927           8 :     max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));
     928           8 :     max_slot_wal_keep_size = pg_strdup(PQgetvalue(res, 0, 6));
     929             : 
     930           8 :     PQclear(res);
     931             : 
     932           8 :     pg_log_debug("publisher: wal_level: %s", wal_level);
     933           8 :     pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
     934           8 :     pg_log_debug("publisher: current replication slots: %d", cur_repslots);
     935           8 :     pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
     936           8 :     pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
     937           8 :     pg_log_debug("publisher: max_prepared_transactions: %d",
     938             :                  max_prepared_transactions);
     939           8 :     pg_log_debug("publisher: max_slot_wal_keep_size: %s",
     940             :                  max_slot_wal_keep_size);
     941             : 
     942           8 :     disconnect_database(conn, false);
     943             : 
     944           8 :     if (strcmp(wal_level, "logical") != 0)
     945             :     {
     946           2 :         pg_log_error("publisher requires \"wal_level\" >= \"logical\"");
     947           2 :         failed = true;
     948             :     }
     949             : 
     950           8 :     if (max_repslots - cur_repslots < num_dbs)
     951             :     {
     952           2 :         pg_log_error("publisher requires %d replication slots, but only %d remain",
     953             :                      num_dbs, max_repslots - cur_repslots);
     954           2 :         pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
     955             :                           "max_replication_slots", cur_repslots + num_dbs);
     956           2 :         failed = true;
     957             :     }
     958             : 
     959           8 :     if (max_walsenders - cur_walsenders < num_dbs)
     960             :     {
     961           2 :         pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
     962             :                      num_dbs, max_walsenders - cur_walsenders);
     963           2 :         pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
     964             :                           "max_wal_senders", cur_walsenders + num_dbs);
     965           2 :         failed = true;
     966             :     }
     967             : 
     968           8 :     if (max_prepared_transactions != 0 && !dbinfos.two_phase)
     969             :     {
     970           0 :         pg_log_warning("two_phase option will not be enabled for replication slots");
     971           0 :         pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled.  "
     972             :                               "Prepared transactions will be replicated at COMMIT PREPARED.");
     973           0 :         pg_log_warning_hint("You can use --enable-two-phase switch to enable two_phase.");
     974             :     }
     975             : 
     976             :     /*
     977             :      * Validate 'max_slot_wal_keep_size'. If this parameter is set to a
     978             :      * non-default value, it may cause replication failures due to required
     979             :      * WAL files being prematurely removed.
     980             :      */
     981           8 :     if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
     982             :     {
     983           0 :         pg_log_warning("required WAL could be removed from the publisher");
     984           0 :         pg_log_warning_hint("Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
     985             :                             "max_slot_wal_keep_size");
     986             :     }
     987             : 
     988           8 :     pg_free(wal_level);
     989             : 
     990           8 :     if (failed)
     991           2 :         exit(1);
     992           6 : }
     993             : 
     994             : /*
     995             :  * Is the standby server ready for logical replication?
     996             :  *
     997             :  * XXX Does it not allow a time-delayed replica?
     998             :  *
     999             :  * XXX In a cascaded replication scenario (P -> S -> C), if the target server
    1000             :  * is S, it cannot detect there is a replica (server C) because server S starts
    1001             :  * accepting only local connections and server C cannot connect to it. Hence,
    1002             :  * there is not a reliable way to provide a suitable error saying the server C
    1003             :  * will be broken at the end of this process (due to pg_resetwal).
    1004             :  */
    1005             : static void
    1006          14 : check_subscriber(const struct LogicalRepInfo *dbinfo)
    1007             : {
    1008             :     PGconn     *conn;
    1009             :     PGresult   *res;
    1010          14 :     bool        failed = false;
    1011             : 
    1012             :     int         max_lrworkers;
    1013             :     int         max_reporigins;
    1014             :     int         max_wprocs;
    1015             : 
    1016          14 :     pg_log_info("checking settings on subscriber");
    1017             : 
    1018          14 :     conn = connect_database(dbinfo[0].subconninfo, true);
    1019             : 
    1020             :     /* The target server must be a standby */
    1021          14 :     if (!server_is_in_recovery(conn))
    1022             :     {
    1023           2 :         pg_log_error("target server must be a standby");
    1024           2 :         disconnect_database(conn, true);
    1025             :     }
    1026             : 
    1027             :     /*------------------------------------------------------------------------
    1028             :      * Logical replication requires a few parameters to be set on subscriber.
    1029             :      * Since these parameters are not a requirement for physical replication,
    1030             :      * we should check it to make sure it won't fail.
    1031             :      *
    1032             :      * - max_active_replication_origins >= number of dbs to be converted
    1033             :      * - max_logical_replication_workers >= number of dbs to be converted
    1034             :      * - max_worker_processes >= 1 + number of dbs to be converted
    1035             :      *------------------------------------------------------------------------
    1036             :      */
    1037          12 :     res = PQexec(conn,
    1038             :                  "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
    1039             :                  "'max_logical_replication_workers', "
    1040             :                  "'max_active_replication_origins', "
    1041             :                  "'max_worker_processes', "
    1042             :                  "'primary_slot_name') "
    1043             :                  "ORDER BY name");
    1044             : 
    1045          12 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1046             :     {
    1047           0 :         pg_log_error("could not obtain subscriber settings: %s",
    1048             :                      PQresultErrorMessage(res));
    1049           0 :         disconnect_database(conn, true);
    1050             :     }
    1051             : 
    1052          12 :     max_reporigins = atoi(PQgetvalue(res, 0, 0));
    1053          12 :     max_lrworkers = atoi(PQgetvalue(res, 1, 0));
    1054          12 :     max_wprocs = atoi(PQgetvalue(res, 2, 0));
    1055          12 :     if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
    1056          10 :         primary_slot_name = pg_strdup(PQgetvalue(res, 3, 0));
    1057             : 
    1058          12 :     pg_log_debug("subscriber: max_logical_replication_workers: %d",
    1059             :                  max_lrworkers);
    1060          12 :     pg_log_debug("subscriber: max_active_replication_origins: %d", max_reporigins);
    1061          12 :     pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
    1062          12 :     if (primary_slot_name)
    1063          10 :         pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
    1064             : 
    1065          12 :     PQclear(res);
    1066             : 
    1067          12 :     disconnect_database(conn, false);
    1068             : 
    1069          12 :     if (max_reporigins < num_dbs)
    1070             :     {
    1071           2 :         pg_log_error("subscriber requires %d active replication origins, but only %d remain",
    1072             :                      num_dbs, max_reporigins);
    1073           2 :         pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
    1074             :                           "max_active_replication_origins", num_dbs);
    1075           2 :         failed = true;
    1076             :     }
    1077             : 
    1078          12 :     if (max_lrworkers < num_dbs)
    1079             :     {
    1080           2 :         pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
    1081             :                      num_dbs, max_lrworkers);
    1082           2 :         pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
    1083             :                           "max_logical_replication_workers", num_dbs);
    1084           2 :         failed = true;
    1085             :     }
    1086             : 
    1087          12 :     if (max_wprocs < num_dbs + 1)
    1088             :     {
    1089           2 :         pg_log_error("subscriber requires %d worker processes, but only %d remain",
    1090             :                      num_dbs + 1, max_wprocs);
    1091           2 :         pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
    1092             :                           "max_worker_processes", num_dbs + 1);
    1093           2 :         failed = true;
    1094             :     }
    1095             : 
    1096          12 :     if (failed)
    1097           2 :         exit(1);
    1098          10 : }
    1099             : 
    1100             : /*
    1101             :  * Drop a specified subscription. This is to avoid duplicate subscriptions on
    1102             :  * the primary (publisher node) and the newly created subscriber. We
    1103             :  * shouldn't drop the associated slot as that would be used by the publisher
    1104             :  * node.
    1105             :  */
    1106             : static void
    1107           6 : drop_existing_subscriptions(PGconn *conn, const char *subname, const char *dbname)
    1108             : {
    1109           6 :     PQExpBuffer query = createPQExpBuffer();
    1110             :     PGresult   *res;
    1111             : 
    1112             :     Assert(conn != NULL);
    1113             : 
    1114             :     /*
    1115             :      * Construct a query string. These commands are allowed to be executed
    1116             :      * within a transaction.
    1117             :      */
    1118           6 :     appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
    1119             :                       subname);
    1120           6 :     appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
    1121             :                       subname);
    1122           6 :     appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
    1123             : 
    1124           6 :     pg_log_info("dropping subscription \"%s\" in database \"%s\"",
    1125             :                 subname, dbname);
    1126             : 
    1127           6 :     if (!dry_run)
    1128             :     {
    1129           2 :         res = PQexec(conn, query->data);
    1130             : 
    1131           2 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1132             :         {
    1133           0 :             pg_log_error("could not drop subscription \"%s\": %s",
    1134             :                          subname, PQresultErrorMessage(res));
    1135           0 :             disconnect_database(conn, true);
    1136             :         }
    1137             : 
    1138           2 :         PQclear(res);
    1139             :     }
    1140             : 
    1141           6 :     destroyPQExpBuffer(query);
    1142           6 : }
    1143             : 
    1144             : /*
    1145             :  * Retrieve and drop the pre-existing subscriptions.
    1146             :  */
    1147             : static void
    1148          10 : check_and_drop_existing_subscriptions(PGconn *conn,
    1149             :                                       const struct LogicalRepInfo *dbinfo)
    1150             : {
    1151          10 :     PQExpBuffer query = createPQExpBuffer();
    1152             :     char       *dbname;
    1153             :     PGresult   *res;
    1154             : 
    1155             :     Assert(conn != NULL);
    1156             : 
    1157          10 :     dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
    1158             : 
    1159          10 :     appendPQExpBuffer(query,
    1160             :                       "SELECT s.subname FROM pg_catalog.pg_subscription s "
    1161             :                       "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
    1162             :                       "WHERE d.datname = %s",
    1163             :                       dbname);
    1164          10 :     res = PQexec(conn, query->data);
    1165             : 
    1166          10 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1167             :     {
    1168           0 :         pg_log_error("could not obtain pre-existing subscriptions: %s",
    1169             :                      PQresultErrorMessage(res));
    1170           0 :         disconnect_database(conn, true);
    1171             :     }
    1172             : 
    1173          16 :     for (int i = 0; i < PQntuples(res); i++)
    1174           6 :         drop_existing_subscriptions(conn, PQgetvalue(res, i, 0),
    1175           6 :                                     dbinfo->dbname);
    1176             : 
    1177          10 :     PQclear(res);
    1178          10 :     destroyPQExpBuffer(query);
    1179          10 :     PQfreemem(dbname);
    1180          10 : }
    1181             : 
    1182             : /*
    1183             :  * Create the subscriptions, adjust the initial location for logical
    1184             :  * replication and enable the subscriptions. That's the last step for logical
    1185             :  * replication setup.
    1186             :  */
    1187             : static void
    1188           6 : setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
    1189             : {
    1190          16 :     for (int i = 0; i < num_dbs; i++)
    1191             :     {
    1192             :         PGconn     *conn;
    1193             : 
    1194             :         /* Connect to subscriber. */
    1195          10 :         conn = connect_database(dbinfo[i].subconninfo, true);
    1196             : 
    1197             :         /*
    1198             :          * We don't need the pre-existing subscriptions on the newly formed
    1199             :          * subscriber. They could connect to other publisher nodes and either
    1200             :          * receive unwanted data or raise ERRORs while connecting to such
    1201             :          * nodes.
    1202             :          */
    1203          10 :         check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
    1204             : 
    1205             :         /* Check and drop the required publications in the given database. */
    1206          10 :         check_and_drop_publications(conn, &dbinfo[i]);
    1207             : 
    1208          10 :         create_subscription(conn, &dbinfo[i]);
    1209             : 
    1210             :         /* Set the replication progress to the correct LSN */
    1211          10 :         set_replication_progress(conn, &dbinfo[i], consistent_lsn);
    1212             : 
    1213             :         /* Enable subscription */
    1214          10 :         enable_subscription(conn, &dbinfo[i]);
    1215             : 
    1216          10 :         disconnect_database(conn, false);
    1217             :     }
    1218           6 : }
    1219             : 
    1220             : /*
    1221             :  * Write the required recovery parameters.
    1222             :  */
    1223             : static void
    1224           6 : setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
    1225             : {
    1226             :     PGconn     *conn;
    1227             :     PQExpBuffer recoveryconfcontents;
    1228             : 
    1229             :     /*
    1230             :      * Although the recovery parameters will be written to the subscriber,
    1231             :      * use a publisher connection. The primary_conninfo is generated from
    1232             :      * that connection's settings.
    1233             :      */
    1234           6 :     conn = connect_database(dbinfo[0].pubconninfo, true);
    1235             : 
    1236             :     /*
    1237             :      * Write recovery parameters.
    1238             :      *
    1239             :      * The subscriber is not running yet. In dry run mode, the recovery
    1240             :      * parameters *won't* be written. An invalid LSN is used for printing
    1241             :      * purposes. Additional recovery parameters are added here. They avoid
    1242             :      * unexpected behavior such as end of recovery as soon as a consistent
    1243             :      * state is reached (recovery_target) and failure due to multiple recovery
    1244             :      * targets (name, time, xid, LSN).
    1245             :      */
    1246           6 :     recoveryconfcontents = GenerateRecoveryConfig(conn, NULL, NULL);
    1247           6 :     appendPQExpBuffer(recoveryconfcontents, "recovery_target = ''\n");
    1248           6 :     appendPQExpBuffer(recoveryconfcontents,
    1249             :                       "recovery_target_timeline = 'latest'\n");
    1250           6 :     appendPQExpBuffer(recoveryconfcontents,
    1251             :                       "recovery_target_inclusive = true\n");
    1252           6 :     appendPQExpBuffer(recoveryconfcontents,
    1253             :                       "recovery_target_action = promote\n");
    1254           6 :     appendPQExpBuffer(recoveryconfcontents, "recovery_target_name = ''\n");
    1255           6 :     appendPQExpBuffer(recoveryconfcontents, "recovery_target_time = ''\n");
    1256           6 :     appendPQExpBuffer(recoveryconfcontents, "recovery_target_xid = ''\n");
    1257             : 
    1258           6 :     if (dry_run)
    1259             :     {
    1260           4 :         appendPQExpBuffer(recoveryconfcontents, "# dry run mode");
    1261           4 :         appendPQExpBuffer(recoveryconfcontents,
    1262             :                           "recovery_target_lsn = '%X/%X'\n",
    1263           4 :                           LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
    1264             :     }
    1265             :     else
    1266             :     {
    1267           2 :         appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
    1268             :                           lsn);
    1269           2 :         WriteRecoveryConfig(conn, datadir, recoveryconfcontents);
    1270             :     }
    1271           6 :     disconnect_database(conn, false);
    1272             : 
    1273           6 :     pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
    1274           6 : }
    1275             : 
    1276             : /*
    1277             :  * Drop physical replication slot on primary if the standby was using it. After
    1278             :  * the transformation, it has no use.
    1279             :  *
    1280             :  * XXX We do not fail here. Instead, we provide a warning so the user can
    1281             :  * drop this replication slot later.
    1282             :  */
    1283             : static void
    1284           6 : drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
    1285             : {
    1286             :     PGconn     *conn;
    1287             : 
    1288             :     /* Replication slot does not exist, do nothing */
    1289           6 :     if (!primary_slot_name)
    1290           0 :         return;
    1291             : 
    1292           6 :     conn = connect_database(dbinfo[0].pubconninfo, false);
    1293           6 :     if (conn != NULL)
    1294             :     {
    1295           6 :         drop_replication_slot(conn, &dbinfo[0], slotname);
    1296           6 :         disconnect_database(conn, false);
    1297             :     }
    1298             :     else
    1299             :     {
    1300           0 :         pg_log_warning("could not drop replication slot \"%s\" on primary",
    1301             :                        slotname);
    1302           0 :         pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
    1303             :     }
    1304             : }
    1305             : 
    1306             : /*
    1307             :  * Drop failover replication slots on subscriber. After the transformation,
    1308             :  * they have no use.
    1309             :  *
    1310             :  * XXX We do not fail here. Instead, we provide a warning so the user can drop
    1311             :  * them later.
    1312             :  */
    1313             : static void
    1314           6 : drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
    1315             : {
    1316             :     PGconn     *conn;
    1317             :     PGresult   *res;
    1318             : 
    1319           6 :     conn = connect_database(dbinfo[0].subconninfo, false);
    1320           6 :     if (conn != NULL)
    1321             :     {
    1322             :         /* Get failover replication slot names */
    1323           6 :         res = PQexec(conn,
    1324             :                      "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
    1325             : 
    1326           6 :         if (PQresultStatus(res) == PGRES_TUPLES_OK)
    1327             :         {
    1328             :             /* Remove failover replication slots from subscriber */
    1329          12 :             for (int i = 0; i < PQntuples(res); i++)
    1330           6 :                 drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
    1331             :         }
    1332             :         else
    1333             :         {
    1334           0 :             pg_log_warning("could not obtain failover replication slot information: %s",
    1335             :                            PQresultErrorMessage(res));
    1336           0 :             pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
    1337             :         }
    1338             : 
    1339           6 :         PQclear(res);
    1340           6 :         disconnect_database(conn, false);
    1341             :     }
    1342             :     else
    1343             :     {
    1344           0 :         pg_log_warning("could not drop failover replication slot");
    1345           0 :         pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
    1346             :     }
    1347           6 : }
    1348             : 
    1349             : /*
    1350             :  * Create a logical replication slot and return its LSN.
    1351             :  *
    1352             :  * CreateReplicationSlot() is not used because it does not provide the one-row
    1353             :  * result set that contains the LSN.
    1354             :  */
    1355             : static char *
    1356          10 : create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
    1357             : {
    1358          10 :     PQExpBuffer str = createPQExpBuffer();
    1359          10 :     PGresult   *res = NULL;
    1360          10 :     const char *slot_name = dbinfo->replslotname;
    1361             :     char       *slot_name_esc;
    1362          10 :     char       *lsn = NULL;
    1363             : 
    1364             :     Assert(conn != NULL);
    1365             : 
    1366          10 :     pg_log_info("creating the replication slot \"%s\" in database \"%s\"",
    1367             :                 slot_name, dbinfo->dbname);
    1368             : 
    1369          10 :     slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
    1370             : 
    1371          10 :     appendPQExpBuffer(str,
    1372             :                       "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
    1373             :                       slot_name_esc,
    1374          10 :                       dbinfos.two_phase ? "true" : "false");
    1375             : 
    1376          10 :     PQfreemem(slot_name_esc);
    1377             : 
    1378          10 :     pg_log_debug("command is: %s", str->data);
    1379             : 
    1380          10 :     if (!dry_run)
    1381             :     {
    1382           4 :         res = PQexec(conn, str->data);
    1383           4 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1384             :         {
    1385           0 :             pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
    1386             :                          slot_name, dbinfo->dbname,
    1387             :                          PQresultErrorMessage(res));
    1388           0 :             PQclear(res);
    1389           0 :             destroyPQExpBuffer(str);
    1390           0 :             return NULL;
    1391             :         }
    1392             : 
    1393           4 :         lsn = pg_strdup(PQgetvalue(res, 0, 0));
    1394           4 :         PQclear(res);
    1395             :     }
    1396             : 
    1397             :     /* For cleanup purposes */
    1398          10 :     dbinfo->made_replslot = true;
    1399             : 
    1400          10 :     destroyPQExpBuffer(str);
    1401             : 
    1402          10 :     return lsn;
    1403             : }
    1404             : 
    1405             : static void
    1406          12 : drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
    1407             :                       const char *slot_name)
    1408             : {
    1409          12 :     PQExpBuffer str = createPQExpBuffer();
    1410             :     char       *slot_name_esc;
    1411             :     PGresult   *res;
    1412             : 
    1413             :     Assert(conn != NULL);
    1414             : 
    1415          12 :     pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
    1416             :                 slot_name, dbinfo->dbname);
    1417             : 
    1418          12 :     slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
    1419             : 
    1420          12 :     appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
    1421             : 
    1422          12 :     PQfreemem(slot_name_esc);
    1423             : 
    1424          12 :     pg_log_debug("command is: %s", str->data);
    1425             : 
    1426          12 :     if (!dry_run)
    1427             :     {
    1428           4 :         res = PQexec(conn, str->data);
    1429           4 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1430             :         {
    1431           0 :             pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
    1432             :                          slot_name, dbinfo->dbname, PQresultErrorMessage(res));
    1433           0 :             dbinfo->made_replslot = false;   /* don't try again. */
    1434             :         }
    1435             : 
    1436           4 :         PQclear(res);
    1437             :     }
    1438             : 
    1439          12 :     destroyPQExpBuffer(str);
    1440          12 : }
    1441             : 
    1442             : /*
    1443             :  * Reports a suitable message if pg_ctl fails.
    1444             :  */
    1445             : static void
    1446          40 : pg_ctl_status(const char *pg_ctl_cmd, int rc)
    1447             : {
    1448          40 :     if (rc != 0)
    1449             :     {
    1450           0 :         if (WIFEXITED(rc))
    1451             :         {
    1452           0 :             pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
    1453             :         }
    1454           0 :         else if (WIFSIGNALED(rc))
    1455             :         {
    1456             : #if defined(WIN32)
    1457             :             pg_log_error("pg_ctl was terminated by exception 0x%X",
    1458             :                          WTERMSIG(rc));
    1459             :             pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
    1460             : #else
    1461           0 :             pg_log_error("pg_ctl was terminated by signal %d: %s",
    1462             :                          WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
    1463             : #endif
    1464             :         }
    1465             :         else
    1466             :         {
    1467           0 :             pg_log_error("pg_ctl exited with unrecognized status %d", rc);
    1468             :         }
    1469             : 
    1470           0 :         pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
    1471           0 :         exit(1);
    1472             :     }
    1473          40 : }
    1474             : 
    1475             : static void
    1476          20 : start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
    1477             :                      bool restrict_logical_worker)
    1478             : {
    1479          20 :     PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
    1480             :     int         rc;
    1481             : 
    1482          20 :     appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
    1483          20 :     appendShellString(pg_ctl_cmd, subscriber_dir);
    1484          20 :     appendPQExpBuffer(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
    1485             : 
    1486             :     /* Prevent unintended slot invalidation */
    1487          20 :     appendPQExpBuffer(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
    1488             : 
    1489          20 :     if (restricted_access)
    1490             :     {
    1491          20 :         appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
    1492             : #if !defined(WIN32)
    1493             : 
    1494             :         /*
    1495             :          * An empty listen_addresses list means the server does not listen on
    1496             :          * any IP interfaces; only Unix-domain sockets can be used to connect
    1497             :          * to the server. Prevent external connections to minimize the chance
    1498             :          * of failure.
    1499             :          */
    1500          20 :         appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
    1501          20 :         if (opt->socket_dir)
    1502          20 :             appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
    1503             :                               opt->socket_dir);
    1504          20 :         appendPQExpBufferChar(pg_ctl_cmd, '"');
    1505             : #endif
    1506             :     }
    1507          20 :     if (opt->config_file != NULL)
    1508           0 :         appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
    1509             :                           opt->config_file);
    1510             : 
    1511             :     /* Prevent logical replication from starting, if requested */
    1512          20 :     if (restrict_logical_worker)
    1513           6 :         appendPQExpBuffer(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
    1514             : 
    1515          20 :     pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
    1516          20 :     rc = system(pg_ctl_cmd->data);
    1517          20 :     pg_ctl_status(pg_ctl_cmd->data, rc);
    1518          20 :     standby_running = true;
    1519          20 :     destroyPQExpBuffer(pg_ctl_cmd);
    1520          20 :     pg_log_info("server was started");
    1521          20 : }
    1522             : 
    1523             : static void
    1524          20 : stop_standby_server(const char *datadir)
    1525             : {
    1526             :     char       *pg_ctl_cmd;
    1527             :     int         rc;
    1528             : 
    1529          20 :     pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
    1530             :                           datadir);
    1531          20 :     pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
    1532          20 :     rc = system(pg_ctl_cmd);
    1533          20 :     pg_ctl_status(pg_ctl_cmd, rc);
    1534          20 :     standby_running = false;
    1535          20 :     pg_log_info("server was stopped");
    1536          20 : }
    1537             : 
    1538             : /*
    1539             :  * Returns after the server finishes the recovery process.
    1540             :  *
    1541             :  * If recovery_timeout option is set, terminate abnormally without finishing
    1542             :  * the recovery process. By default, it waits forever.
    1543             :  *
    1544             :  * XXX Is the recovery process still in progress? When the recovery process has a
    1545             :  * better progress reporting mechanism, it should be added here.
    1546             :  */
    1547             : static void
    1548           6 : wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
    1549             : {
    1550             :     PGconn     *conn;
    1551           6 :     int         status = POSTMASTER_STILL_STARTING;
    1552           6 :     int         timer = 0;
    1553             : 
    1554           6 :     pg_log_info("waiting for the target server to reach the consistent state");
    1555             : 
    1556           6 :     conn = connect_database(conninfo, true);
    1557             : 
    1558             :     for (;;)
    1559           0 :     {
    1560           6 :         bool        in_recovery = server_is_in_recovery(conn);
    1561             : 
    1562             :         /*
    1563             :          * Has the recovery process finished? In dry run mode, there is no
    1564             :          * recovery mode. Bail out as the recovery process has ended.
    1565             :          */
    1566           6 :         if (!in_recovery || dry_run)
    1567             :         {
    1568           6 :             status = POSTMASTER_READY;
    1569           6 :             recovery_ended = true;
    1570           6 :             break;
    1571             :         }
    1572             : 
    1573             :         /* Bail out after recovery_timeout seconds if this option is set */
    1574           0 :         if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
    1575             :         {
    1576           0 :             stop_standby_server(subscriber_dir);
    1577           0 :             pg_log_error("recovery timed out");
    1578           0 :             disconnect_database(conn, true);
    1579             :         }
    1580             : 
    1581             :         /* Keep waiting */
    1582           0 :         pg_usleep(WAIT_INTERVAL * USEC_PER_SEC);
    1583             : 
    1584           0 :         timer += WAIT_INTERVAL;
    1585             :     }
    1586             : 
    1587           6 :     disconnect_database(conn, false);
    1588             : 
    1589           6 :     if (status == POSTMASTER_STILL_STARTING)
    1590           0 :         pg_fatal("server did not end recovery");
    1591             : 
    1592           6 :     pg_log_info("target server reached the consistent state");
    1593           6 :     pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
    1594           6 : }
    1595             : 
    1596             : /*
    1597             :  * Create a publication that includes all tables in the database.
    1598             :  */
    1599             : static void
    1600          10 : create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
    1601             : {
    1602          10 :     PQExpBuffer str = createPQExpBuffer();
    1603             :     PGresult   *res;
    1604             :     char       *ipubname_esc;
    1605             :     char       *spubname_esc;
    1606             : 
    1607             :     Assert(conn != NULL);
    1608             : 
    1609          10 :     ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
    1610          10 :     spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
    1611             : 
    1612             :     /* Check if the publication already exists */
    1613          10 :     appendPQExpBuffer(str,
    1614             :                       "SELECT 1 FROM pg_catalog.pg_publication "
    1615             :                       "WHERE pubname = %s",
    1616             :                       spubname_esc);
    1617          10 :     res = PQexec(conn, str->data);
    1618          10 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1619             :     {
    1620           0 :         pg_log_error("could not obtain publication information: %s",
    1621             :                      PQresultErrorMessage(res));
    1622           0 :         disconnect_database(conn, true);
    1623             :     }
    1624             : 
    1625          10 :     if (PQntuples(res) == 1)
    1626             :     {
    1627             :         /*
    1628             :          * Unfortunately, if it reaches this code path, it will always fail
    1629             :          * (unless you decide to change the existing publication name). That's
    1630             :          * bad but it is very unlikely that the user will choose a name with
    1631             :          * pg_createsubscriber_ prefix followed by the exact database oid and
    1632             :          * a random number.
    1633             :          */
    1634           0 :         pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
    1635           0 :         pg_log_error_hint("Consider renaming this publication before continuing.");
    1636           0 :         disconnect_database(conn, true);
    1637             :     }
    1638             : 
    1639          10 :     PQclear(res);
    1640          10 :     resetPQExpBuffer(str);
    1641             : 
    1642          10 :     pg_log_info("creating publication \"%s\" in database \"%s\"",
    1643             :                 dbinfo->pubname, dbinfo->dbname);
    1644             : 
    1645          10 :     appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
    1646             :                       ipubname_esc);
    1647             : 
    1648          10 :     pg_log_debug("command is: %s", str->data);
    1649             : 
    1650          10 :     if (!dry_run)
    1651             :     {
    1652           4 :         res = PQexec(conn, str->data);
    1653           4 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1654             :         {
    1655           0 :             pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
    1656             :                          dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
    1657           0 :             disconnect_database(conn, true);
    1658             :         }
    1659           4 :         PQclear(res);
    1660             :     }
    1661             : 
    1662             :     /* For cleanup purposes */
    1663          10 :     dbinfo->made_publication = true;
    1664             : 
    1665          10 :     PQfreemem(ipubname_esc);
    1666          10 :     PQfreemem(spubname_esc);
    1667          10 :     destroyPQExpBuffer(str);
    1668          10 : }
    1669             : 
    1670             : /*
    1671             :  * Drop the specified publication in the given database.
    1672             :  */
    1673             : static void
    1674          14 : drop_publication(PGconn *conn, const char *pubname, const char *dbname,
    1675             :                  bool *made_publication)
    1676             : {
    1677          14 :     PQExpBuffer str = createPQExpBuffer();
    1678             :     PGresult   *res;
    1679             :     char       *pubname_esc;
    1680             : 
    1681             :     Assert(conn != NULL);
    1682             : 
    1683          14 :     pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
    1684             : 
    1685          14 :     pg_log_info("dropping publication \"%s\" in database \"%s\"",
    1686             :                 pubname, dbname);
    1687             : 
    1688          14 :     appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
    1689             : 
    1690          14 :     PQfreemem(pubname_esc);
    1691             : 
    1692          14 :     pg_log_debug("command is: %s", str->data);
    1693             : 
    1694          14 :     if (!dry_run)
    1695             :     {
    1696           8 :         res = PQexec(conn, str->data);
    1697           8 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1698             :         {
    1699           0 :             pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
    1700             :                          pubname, dbname, PQresultErrorMessage(res));
    1701           0 :             *made_publication = false;  /* don't try again. */
    1702             : 
    1703             :             /*
    1704             :              * Don't disconnect and exit here. This routine is used by primary
    1705             :              * (cleanup publication / replication slot due to an error) and
    1706             :              * subscriber (remove the replicated publications). In both cases,
    1707             :              * it can continue and provide instructions for the user to remove
    1708             :              * it later if cleanup fails.
    1709             :              */
    1710             :         }
    1711           8 :         PQclear(res);
    1712             :     }
    1713             : 
    1714          14 :     destroyPQExpBuffer(str);
    1715          14 : }
    1716             : 
    1717             : /*
    1718             :  * Retrieve and drop the publications.
    1719             :  *
    1720             :  * Since the publications were created before the consistent LSN, they
    1721             :  * remain on the subscriber even after the physical replica is
    1722             :  * promoted. Remove these publications from the subscriber because
    1723             :  * they have no use. Additionally, if requested, drop all pre-existing
    1724             :  * publications.
    1725             :  */
    1726             : static void
    1727          10 : check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
    1728             : {
    1729             :     PGresult   *res;
    1730          10 :     bool        drop_all_pubs = dbinfos.objecttypes_to_remove & OBJECTTYPE_PUBLICATIONS;
    1731             : 
    1732             :     Assert(conn != NULL);
    1733             : 
    1734          10 :     if (drop_all_pubs)
    1735             :     {
    1736           4 :         pg_log_info("dropping all existing publications in database \"%s\"",
    1737             :                     dbinfo->dbname);
    1738             : 
    1739             :         /* Fetch all publication names */
    1740           4 :         res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
    1741           4 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1742             :         {
    1743           0 :             pg_log_error("could not obtain publication information: %s",
    1744             :                          PQresultErrorMessage(res));
    1745           0 :             PQclear(res);
    1746           0 :             disconnect_database(conn, true);
    1747             :         }
    1748             : 
    1749             :         /* Drop each publication */
    1750          12 :         for (int i = 0; i < PQntuples(res); i++)
    1751           8 :             drop_publication(conn, PQgetvalue(res, i, 0), dbinfo->dbname,
    1752             :                              &dbinfo->made_publication);
    1753             : 
    1754           4 :         PQclear(res);
    1755             :     }
    1756             : 
    1757             :     /*
    1758             :      * In dry-run mode, we don't create publications, but we still try to drop
    1759             :      * those to provide necessary information to the user.
    1760             :      */
    1761          10 :     if (!drop_all_pubs || dry_run)
    1762           6 :         drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
    1763             :                          &dbinfo->made_publication);
    1764          10 : }
    1765             : 
    1766             : /*
    1767             :  * Create a subscription with some predefined options.
    1768             :  *
    1769             :  * A replication slot was already created in a previous step. Let's use it.  It
    1770             :  * is not required to copy data. The subscription will be created but it will
    1771             :  * not be enabled now. That's because the replication progress must be set and
    1772             :  * the replication origin name (one of the function arguments) contains the
    1773             :  * subscription OID in its name. Once the subscription is created,
    1774             :  * set_replication_progress() can obtain the chosen origin name and set up its
    1775             :  * initial location.
    1776             :  */
    1777             : static void
    1778          10 : create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
    1779             : {
    1780          10 :     PQExpBuffer str = createPQExpBuffer();
    1781             :     PGresult   *res;
    1782             :     char       *pubname_esc;
    1783             :     char       *subname_esc;
    1784             :     char       *pubconninfo_esc;
    1785             :     char       *replslotname_esc;
    1786             : 
    1787             :     Assert(conn != NULL);
    1788             : 
    1789          10 :     pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
    1790          10 :     subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
    1791          10 :     pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
    1792          10 :     replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
    1793             : 
    1794          10 :     pg_log_info("creating subscription \"%s\" in database \"%s\"",
    1795             :                 dbinfo->subname, dbinfo->dbname);
    1796             : 
    1797          10 :     appendPQExpBuffer(str,
    1798             :                       "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
    1799             :                       "WITH (create_slot = false, enabled = false, "
    1800             :                       "slot_name = %s, copy_data = false, two_phase = %s)",
    1801             :                       subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
    1802          10 :                       dbinfos.two_phase ? "true" : "false");
    1803             : 
    1804          10 :     PQfreemem(pubname_esc);
    1805          10 :     PQfreemem(subname_esc);
    1806          10 :     PQfreemem(pubconninfo_esc);
    1807          10 :     PQfreemem(replslotname_esc);
    1808             : 
    1809          10 :     pg_log_debug("command is: %s", str->data);
    1810             : 
    1811          10 :     if (!dry_run)
    1812             :     {
    1813           4 :         res = PQexec(conn, str->data);
    1814           4 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1815             :         {
    1816           0 :             pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
    1817             :                          dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
    1818           0 :             disconnect_database(conn, true);
    1819             :         }
    1820           4 :         PQclear(res);
    1821             :     }
    1822             : 
    1823          10 :     destroyPQExpBuffer(str);
    1824          10 : }
    1825             : 
    1826             : /*
    1827             :  * Sets the replication progress to the consistent LSN.
    1828             :  *
    1829             :  * The subscriber caught up to the consistent LSN provided by the last
    1830             :  * replication slot that was created. The goal is to set up the initial
    1831             :  * location for the logical replication that is the exact LSN that the
    1832             :  * subscriber was promoted. Once the subscription is enabled it will start
    1833             :  * streaming from that location onwards.  In dry run mode, the subscription OID
    1834             :  * and LSN are set to invalid values for printing purposes.
    1835             :  */
    1836             : static void
    1837          10 : set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
    1838             : {
    1839          10 :     PQExpBuffer str = createPQExpBuffer();
    1840             :     PGresult   *res;
    1841             :     Oid         suboid;
    1842             :     char       *subname;
    1843             :     char       *dbname;
    1844             :     char       *originname;
    1845             :     char       *lsnstr;
    1846             : 
    1847             :     Assert(conn != NULL);
    1848             : 
    1849          10 :     subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
    1850          10 :     dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
    1851             : 
    1852          10 :     appendPQExpBuffer(str,
    1853             :                       "SELECT s.oid FROM pg_catalog.pg_subscription s "
    1854             :                       "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
    1855             :                       "WHERE s.subname = %s AND d.datname = %s",
    1856             :                       subname, dbname);
    1857             : 
    1858          10 :     res = PQexec(conn, str->data);
    1859          10 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1860             :     {
    1861           0 :         pg_log_error("could not obtain subscription OID: %s",
    1862             :                      PQresultErrorMessage(res));
    1863           0 :         disconnect_database(conn, true);
    1864             :     }
    1865             : 
    1866          10 :     if (PQntuples(res) != 1 && !dry_run)
    1867             :     {
    1868           0 :         pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
    1869             :                      PQntuples(res), 1);
    1870           0 :         disconnect_database(conn, true);
    1871             :     }
    1872             : 
    1873          10 :     if (dry_run)
    1874             :     {
    1875           6 :         suboid = InvalidOid;
    1876           6 :         lsnstr = psprintf("%X/%X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
    1877             :     }
    1878             :     else
    1879             :     {
    1880           4 :         suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
    1881           4 :         lsnstr = psprintf("%s", lsn);
    1882             :     }
    1883             : 
    1884          10 :     PQclear(res);
    1885             : 
    1886             :     /*
    1887             :      * The origin name is defined as pg_%u. %u is the subscription OID. See
    1888             :      * ApplyWorkerMain().
    1889             :      */
    1890          10 :     originname = psprintf("pg_%u", suboid);
    1891             : 
    1892          10 :     pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
    1893             :                 originname, lsnstr, dbinfo->dbname);
    1894             : 
    1895          10 :     resetPQExpBuffer(str);
    1896          10 :     appendPQExpBuffer(str,
    1897             :                       "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
    1898             :                       originname, lsnstr);
    1899             : 
    1900          10 :     pg_log_debug("command is: %s", str->data);
    1901             : 
    1902          10 :     if (!dry_run)
    1903             :     {
    1904           4 :         res = PQexec(conn, str->data);
    1905           4 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1906             :         {
    1907           0 :             pg_log_error("could not set replication progress for subscription \"%s\": %s",
    1908             :                          dbinfo->subname, PQresultErrorMessage(res));
    1909           0 :             disconnect_database(conn, true);
    1910             :         }
    1911           4 :         PQclear(res);
    1912             :     }
    1913             : 
    1914          10 :     PQfreemem(subname);
    1915          10 :     PQfreemem(dbname);
    1916          10 :     pg_free(originname);
    1917          10 :     pg_free(lsnstr);
    1918          10 :     destroyPQExpBuffer(str);
    1919          10 : }
    1920             : 
    1921             : /*
    1922             :  * Enables the subscription.
    1923             :  *
    1924             :  * The subscription was created in a previous step but it was disabled. After
    1925             :  * adjusting the initial logical replication location, enable the subscription.
    1926             :  */
    1927             : static void
    1928          10 : enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
    1929             : {
    1930          10 :     PQExpBuffer str = createPQExpBuffer();
    1931             :     PGresult   *res;
    1932             :     char       *subname;
    1933             : 
    1934             :     Assert(conn != NULL);
    1935             : 
    1936          10 :     subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
    1937             : 
    1938          10 :     pg_log_info("enabling subscription \"%s\" in database \"%s\"",
    1939             :                 dbinfo->subname, dbinfo->dbname);
    1940             : 
    1941          10 :     appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
    1942             : 
    1943          10 :     pg_log_debug("command is: %s", str->data);
    1944             : 
    1945          10 :     if (!dry_run)
    1946             :     {
    1947           4 :         res = PQexec(conn, str->data);
    1948           4 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1949             :         {
    1950           0 :             pg_log_error("could not enable subscription \"%s\": %s",
    1951             :                          dbinfo->subname, PQresultErrorMessage(res));
    1952           0 :             disconnect_database(conn, true);
    1953             :         }
    1954             : 
    1955           4 :         PQclear(res);
    1956             :     }
    1957             : 
    1958          10 :     PQfreemem(subname);
    1959          10 :     destroyPQExpBuffer(str);
    1960          10 : }
    1961             : 
    1962             : int
    1963          40 : main(int argc, char **argv)
    1964             : {
    1965             :     static struct option long_options[] =
    1966             :     {
    1967             :         {"database", required_argument, NULL, 'd'},
    1968             :         {"pgdata", required_argument, NULL, 'D'},
    1969             :         {"dry-run", no_argument, NULL, 'n'},
    1970             :         {"subscriber-port", required_argument, NULL, 'p'},
    1971             :         {"publisher-server", required_argument, NULL, 'P'},
    1972             :         {"remove", required_argument, NULL, 'R'},
    1973             :         {"socketdir", required_argument, NULL, 's'},
    1974             :         {"recovery-timeout", required_argument, NULL, 't'},
    1975             :         {"enable-two-phase", no_argument, NULL, 'T'},
    1976             :         {"subscriber-username", required_argument, NULL, 'U'},
    1977             :         {"verbose", no_argument, NULL, 'v'},
    1978             :         {"version", no_argument, NULL, 'V'},
    1979             :         {"help", no_argument, NULL, '?'},
    1980             :         {"config-file", required_argument, NULL, 1},
    1981             :         {"publication", required_argument, NULL, 2},
    1982             :         {"replication-slot", required_argument, NULL, 3},
    1983             :         {"subscription", required_argument, NULL, 4},
    1984             :         {NULL, 0, NULL, 0}
    1985             :     };
    1986             : 
    1987          40 :     struct CreateSubscriberOptions opt = {0};
    1988             : 
    1989             :     int         c;
    1990             :     int         option_index;
    1991             : 
    1992             :     char       *pub_base_conninfo;
    1993             :     char       *sub_base_conninfo;
    1994          40 :     char       *dbname_conninfo = NULL;
    1995             : 
    1996             :     uint64      pub_sysid;
    1997             :     uint64      sub_sysid;
    1998             :     struct stat statbuf;
    1999             : 
    2000             :     char       *consistent_lsn;
    2001             : 
    2002             :     char        pidfile[MAXPGPATH];
    2003             : 
    2004          40 :     pg_logging_init(argv[0]);
    2005          40 :     pg_logging_set_level(PG_LOG_WARNING);
    2006          40 :     progname = get_progname(argv[0]);
    2007          40 :     set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
    2008             : 
    2009          40 :     if (argc > 1)
    2010             :     {
    2011          38 :         if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
    2012             :         {
    2013           2 :             usage();
    2014           2 :             exit(0);
    2015             :         }
    2016          36 :         else if (strcmp(argv[1], "-V") == 0
    2017          36 :                  || strcmp(argv[1], "--version") == 0)
    2018             :         {
    2019           2 :             puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
    2020           2 :             exit(0);
    2021             :         }
    2022             :     }
    2023             : 
    2024             :     /* Default settings */
    2025          36 :     subscriber_dir = NULL;
    2026          36 :     opt.config_file = NULL;
    2027          36 :     opt.pub_conninfo_str = NULL;
    2028          36 :     opt.socket_dir = NULL;
    2029          36 :     opt.sub_port = DEFAULT_SUB_PORT;
    2030          36 :     opt.sub_username = NULL;
    2031          36 :     opt.two_phase = false;
    2032          36 :     opt.database_names = (SimpleStringList)
    2033             :     {
    2034             :         0
    2035             :     };
    2036          36 :     opt.recovery_timeout = 0;
    2037             : 
    2038             :     /*
    2039             :      * Don't allow it to be run as root. It uses pg_ctl which does not allow
    2040             :      * it either.
    2041             :      */
    2042             : #ifndef WIN32
    2043          36 :     if (geteuid() == 0)
    2044             :     {
    2045           0 :         pg_log_error("cannot be executed by \"root\"");
    2046           0 :         pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
    2047             :                           progname);
    2048           0 :         exit(1);
    2049             :     }
    2050             : #endif
    2051             : 
    2052          36 :     get_restricted_token();
    2053             : 
    2054         272 :     while ((c = getopt_long(argc, argv, "d:D:np:P:R:s:t:TU:v",
    2055             :                             long_options, &option_index)) != -1)
    2056             :     {
    2057         242 :         switch (c)
    2058             :         {
    2059          48 :             case 'd':
    2060          48 :                 if (!simple_string_list_member(&opt.database_names, optarg))
    2061             :                 {
    2062          46 :                     simple_string_list_append(&opt.database_names, optarg);
    2063          46 :                     num_dbs++;
    2064             :                 }
    2065             :                 else
    2066             :                 {
    2067           2 :                     pg_log_error("database \"%s\" specified more than once", optarg);
    2068           2 :                     exit(1);
    2069             :                 }
    2070          46 :                 break;
    2071          32 :             case 'D':
    2072          32 :                 subscriber_dir = pg_strdup(optarg);
    2073          32 :                 canonicalize_path(subscriber_dir);
    2074          32 :                 break;
    2075          14 :             case 'n':
    2076          14 :                 dry_run = true;
    2077          14 :                 break;
    2078          18 :             case 'p':
    2079          18 :                 opt.sub_port = pg_strdup(optarg);
    2080          18 :                 break;
    2081          30 :             case 'P':
    2082          30 :                 opt.pub_conninfo_str = pg_strdup(optarg);
    2083          30 :                 break;
    2084           2 :             case 'R':
    2085           2 :                 if (!simple_string_list_member(&opt.objecttypes_to_remove, optarg))
    2086           2 :                     simple_string_list_append(&opt.objecttypes_to_remove, optarg);
    2087             :                 else
    2088           0 :                     pg_fatal("object type \"%s\" is specified more than once for --remove", optarg);
    2089           2 :                 break;
    2090          18 :             case 's':
    2091          18 :                 opt.socket_dir = pg_strdup(optarg);
    2092          18 :                 canonicalize_path(opt.socket_dir);
    2093          18 :                 break;
    2094           4 :             case 't':
    2095           4 :                 opt.recovery_timeout = atoi(optarg);
    2096           4 :                 break;
    2097           2 :             case 'T':
    2098           2 :                 opt.two_phase = true;
    2099           2 :                 break;
    2100           0 :             case 'U':
    2101           0 :                 opt.sub_username = pg_strdup(optarg);
    2102           0 :                 break;
    2103          32 :             case 'v':
    2104          32 :                 pg_logging_increase_verbosity();
    2105          32 :                 break;
    2106           0 :             case 1:
    2107           0 :                 opt.config_file = pg_strdup(optarg);
    2108           0 :                 break;
    2109          22 :             case 2:
    2110          22 :                 if (!simple_string_list_member(&opt.pub_names, optarg))
    2111             :                 {
    2112          20 :                     simple_string_list_append(&opt.pub_names, optarg);
    2113          20 :                     num_pubs++;
    2114             :                 }
    2115             :                 else
    2116             :                 {
    2117           2 :                     pg_log_error("publication \"%s\" specified more than once", optarg);
    2118           2 :                     exit(1);
    2119             :                 }
    2120          20 :                 break;
    2121           8 :             case 3:
    2122           8 :                 if (!simple_string_list_member(&opt.replslot_names, optarg))
    2123             :                 {
    2124           8 :                     simple_string_list_append(&opt.replslot_names, optarg);
    2125           8 :                     num_replslots++;
    2126             :                 }
    2127             :                 else
    2128             :                 {
    2129           0 :                     pg_log_error("replication slot \"%s\" specified more than once", optarg);
    2130           0 :                     exit(1);
    2131             :                 }
    2132           8 :                 break;
    2133          10 :             case 4:
    2134          10 :                 if (!simple_string_list_member(&opt.sub_names, optarg))
    2135             :                 {
    2136          10 :                     simple_string_list_append(&opt.sub_names, optarg);
    2137          10 :                     num_subs++;
    2138             :                 }
    2139             :                 else
    2140             :                 {
    2141           0 :                     pg_log_error("subscription \"%s\" specified more than once", optarg);
    2142           0 :                     exit(1);
    2143             :                 }
    2144          10 :                 break;
    2145           2 :             default:
    2146             :                 /* getopt_long already emitted a complaint */
    2147           2 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    2148           2 :                 exit(1);
    2149             :         }
    2150             :     }
    2151             : 
    2152             :     /* Any non-option arguments? */
    2153          30 :     if (optind < argc)
    2154             :     {
    2155           0 :         pg_log_error("too many command-line arguments (first is \"%s\")",
    2156             :                      argv[optind]);
    2157           0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    2158           0 :         exit(1);
    2159             :     }
    2160             : 
    2161             :     /* Required arguments */
    2162          30 :     if (subscriber_dir == NULL)
    2163             :     {
    2164           2 :         pg_log_error("no subscriber data directory specified");
    2165           2 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    2166           2 :         exit(1);
    2167             :     }
    2168             : 
    2169             :     /* If socket directory is not provided, use the current directory */
    2170          28 :     if (opt.socket_dir == NULL)
    2171             :     {
    2172             :         char        cwd[MAXPGPATH];
    2173             : 
    2174          10 :         if (!getcwd(cwd, MAXPGPATH))
    2175           0 :             pg_fatal("could not determine current directory");
    2176          10 :         opt.socket_dir = pg_strdup(cwd);
    2177          10 :         canonicalize_path(opt.socket_dir);
    2178             :     }
    2179             : 
    2180             :     /*
    2181             :      * Parse connection string. Build a base connection string that might be
    2182             :      * reused by multiple databases.
    2183             :      */
    2184          28 :     if (opt.pub_conninfo_str == NULL)
    2185             :     {
    2186             :         /*
    2187             :          * TODO use primary_conninfo (if available) from subscriber and
    2188             :          * extract publisher connection string. Assume that there are
    2189             :          * identical entries for physical and logical replication. If there is
    2190             :          * not, we would fail anyway.
    2191             :          */
    2192           2 :         pg_log_error("no publisher connection string specified");
    2193           2 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    2194           2 :         exit(1);
    2195             :     }
    2196          26 :     pg_log_info("validating publisher connection string");
    2197          26 :     pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
    2198             :                                           &dbname_conninfo);
    2199          26 :     if (pub_base_conninfo == NULL)
    2200           0 :         exit(1);
    2201             : 
    2202          26 :     pg_log_info("validating subscriber connection string");
    2203          26 :     sub_base_conninfo = get_sub_conninfo(&opt);
    2204             : 
    2205          26 :     if (opt.database_names.head == NULL)
    2206             :     {
    2207           4 :         pg_log_info("no database was specified");
    2208             : 
    2209             :         /*
    2210             :          * If --database option is not provided, try to obtain the dbname from
    2211             :          * the publisher conninfo. If dbname parameter is not available, error
    2212             :          * out.
    2213             :          */
    2214           4 :         if (dbname_conninfo)
    2215             :         {
    2216           2 :             simple_string_list_append(&opt.database_names, dbname_conninfo);
    2217           2 :             num_dbs++;
    2218             : 
    2219           2 :             pg_log_info("database name \"%s\" was extracted from the publisher connection string",
    2220             :                         dbname_conninfo);
    2221             :         }
    2222             :         else
    2223             :         {
    2224           2 :             pg_log_error("no database name specified");
    2225           2 :             pg_log_error_hint("Try \"%s --help\" for more information.",
    2226             :                               progname);
    2227           2 :             exit(1);
    2228             :         }
    2229             :     }
    2230             : 
    2231             :     /* Number of object names must match number of databases */
    2232          24 :     if (num_pubs > 0 && num_pubs != num_dbs)
    2233             :     {
    2234           2 :         pg_log_error("wrong number of publication names specified");
    2235           2 :         pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
    2236             :                             num_pubs, num_dbs);
    2237           2 :         exit(1);
    2238             :     }
    2239          22 :     if (num_subs > 0 && num_subs != num_dbs)
    2240             :     {
    2241           2 :         pg_log_error("wrong number of subscription names specified");
    2242           2 :         pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
    2243             :                             num_subs, num_dbs);
    2244           2 :         exit(1);
    2245             :     }
    2246          20 :     if (num_replslots > 0 && num_replslots != num_dbs)
    2247             :     {
    2248           2 :         pg_log_error("wrong number of replication slot names specified");
    2249           2 :         pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
    2250             :                             num_replslots, num_dbs);
    2251           2 :         exit(1);
    2252             :     }
    2253             : 
    2254             :     /* Verify the object types specified for removal from the subscriber */
    2255          20 :     for (SimpleStringListCell *cell = opt.objecttypes_to_remove.head; cell; cell = cell->next)
    2256             :     {
    2257           2 :         if (pg_strcasecmp(cell->val, "publications") == 0)
    2258           2 :             dbinfos.objecttypes_to_remove |= OBJECTTYPE_PUBLICATIONS;
    2259             :         else
    2260             :         {
    2261           0 :             pg_log_error("invalid object type \"%s\" specified for --remove", cell->val);
    2262           0 :             pg_log_error_hint("The valid option is: \"publications\"");
    2263           0 :             exit(1);
    2264             :         }
    2265             :     }
    2266             : 
    2267             :     /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
    2268          18 :     pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
    2269          18 :     pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
    2270             : 
    2271             :     /* Rudimentary check for a data directory */
    2272          18 :     check_data_directory(subscriber_dir);
    2273             : 
    2274          18 :     dbinfos.two_phase = opt.two_phase;
    2275             : 
    2276             :     /*
    2277             :      * Store database information for publisher and subscriber. This must be
    2278             :      * done before atexit() is called because the returned value is used by
    2279             :      * cleanup_objects_atexit().
    2280             :      */
    2281          18 :     dbinfos.dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
    2282             : 
    2283             :     /* Register a function to clean up objects in case of failure */
    2284          18 :     atexit(cleanup_objects_atexit);
    2285             : 
    2286             :     /*
    2287             :      * Check if the subscriber data directory has the same system identifier
    2288             :      * as the publisher data directory.
    2289             :      */
    2290          18 :     pub_sysid = get_primary_sysid(dbinfos.dbinfo[0].pubconninfo);
    2291          18 :     sub_sysid = get_standby_sysid(subscriber_dir);
    2292          18 :     if (pub_sysid != sub_sysid)
    2293           2 :         pg_fatal("subscriber data directory is not a copy of the source database cluster");
    2294             : 
    2295             :     /* Subscriber PID file */
    2296          16 :     snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
    2297             : 
    2298             :     /*
    2299             :      * The standby server must not be running. If the server is started under
    2300             :      * service manager and pg_createsubscriber stops it, the service manager
    2301             :      * might react to this action and start the server again. Therefore,
    2302             :      * refuse to proceed if the server is running to avoid possible failures.
    2303             :      */
    2304          16 :     if (stat(pidfile, &statbuf) == 0)
    2305             :     {
    2306           2 :         pg_log_error("standby server is running");
    2307           2 :         pg_log_error_hint("Stop the standby server and try again.");
    2308           2 :         exit(1);
    2309             :     }
    2310             : 
    2311             :     /*
    2312             :      * Start a short-lived standby server with temporary parameters (provided
    2313             :      * by command-line options). The goal is to avoid connections during the
    2314             :      * transformation steps.
    2315             :      */
    2316          14 :     pg_log_info("starting the standby server with command-line options");
    2317          14 :     start_standby_server(&opt, true, false);
    2318             : 
    2319             :     /* Check if the standby server is ready for logical replication */
    2320          14 :     check_subscriber(dbinfos.dbinfo);
    2321             : 
    2322             :     /* Check if the primary server is ready for logical replication */
    2323          10 :     check_publisher(dbinfos.dbinfo);
    2324             : 
    2325             :     /*
    2326             :      * Stop the target server. The recovery process requires that the server
    2327             :      * reaches a consistent state before targeting the recovery stop point.
    2328             :      * Make sure a consistent state is reached (stopping the target server
    2329             :      * guarantees it) *before* creating the replication slots in
    2330             :      * setup_publisher().
    2331             :      */
    2332           6 :     pg_log_info("stopping the subscriber");
    2333           6 :     stop_standby_server(subscriber_dir);
    2334             : 
    2335             :     /* Create the required objects for each database on publisher */
    2336           6 :     consistent_lsn = setup_publisher(dbinfos.dbinfo);
    2337             : 
    2338             :     /* Write the required recovery parameters */
    2339           6 :     setup_recovery(dbinfos.dbinfo, subscriber_dir, consistent_lsn);
    2340             : 
    2341             :     /*
    2342             :      * Start subscriber so the recovery parameters will take effect. Wait
    2343             :      * until accepting connections. We don't want to start logical replication
    2344             :      * during setup.
    2345             :      */
    2346           6 :     pg_log_info("starting the subscriber");
    2347           6 :     start_standby_server(&opt, true, true);
    2348             : 
    2349             :     /* Wait for the subscriber to be promoted */
    2350           6 :     wait_for_end_recovery(dbinfos.dbinfo[0].subconninfo, &opt);
    2351             : 
    2352             :     /*
    2353             :      * Create the subscription for each database on subscriber. It does not
    2354             :      * enable it immediately because it needs to adjust the replication start
    2355             :      * point to the LSN reported by setup_publisher().  It also cleans up
    2356             :      * publications created by this tool and replication to the standby.
    2357             :      */
    2358           6 :     setup_subscriber(dbinfos.dbinfo, consistent_lsn);
    2359             : 
    2360             :     /* Remove primary_slot_name if it exists on primary */
    2361           6 :     drop_primary_replication_slot(dbinfos.dbinfo, primary_slot_name);
    2362             : 
    2363             :     /* Remove failover replication slots if they exist on subscriber */
    2364           6 :     drop_failover_replication_slots(dbinfos.dbinfo);
    2365             : 
    2366             :     /* Stop the subscriber */
    2367           6 :     pg_log_info("stopping the subscriber");
    2368           6 :     stop_standby_server(subscriber_dir);
    2369             : 
    2370             :     /* Change the system identifier of the subscriber */
    2371           6 :     modify_subscriber_sysid(&opt);
    2372             : 
    2373           6 :     success = true;
    2374             : 
    2375           6 :     pg_log_info("Done!");
    2376             : 
    2377           6 :     return 0;
    2378             : }

Generated by: LCOV version 1.14