LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - pg_createsubscriber.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 795 955 83.2 %
Date: 2026-01-12 00:17:24 Functions: 40 40 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_createsubscriber.c
       4             :  *    Create a new logical replica from a standby server
       5             :  *
       6             :  * Copyright (c) 2024-2026, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *    src/bin/pg_basebackup/pg_createsubscriber.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : 
      14             : #include "postgres_fe.h"
      15             : 
      16             : #include <sys/stat.h>
      17             : #include <sys/time.h>
      18             : #include <sys/wait.h>
      19             : #include <time.h>
      20             : 
      21             : #include "common/connect.h"
      22             : #include "common/controldata_utils.h"
      23             : #include "common/file_utils.h"
      24             : #include "common/logging.h"
      25             : #include "common/pg_prng.h"
      26             : #include "common/restricted_token.h"
      27             : #include "datatype/timestamp.h"
      28             : #include "fe_utils/recovery_gen.h"
      29             : #include "fe_utils/simple_list.h"
      30             : #include "fe_utils/string_utils.h"
      31             : #include "fe_utils/version.h"
      32             : #include "getopt_long.h"
      33             : 
      34             : #define DEFAULT_SUB_PORT    "50432"
      35             : #define OBJECTTYPE_PUBLICATIONS  0x0001
      36             : 
      37             : /*
      38             :  * Configuration files for recovery parameters.
      39             :  *
      40             :  * The recovery parameters are set in INCLUDED_CONF_FILE, itself loaded by
      41             :  * the server through an include_if_exists in postgresql.auto.conf.
      42             :  *
      43             :  * INCLUDED_CONF_FILE is renamed to INCLUDED_CONF_FILE_DISABLED when exiting,
      44             :  * so as the recovery parameters set by this tool never take effect on node
      45             :  * restart.  The contents of INCLUDED_CONF_FILE_DISABLED can be useful for
      46             :  * debugging.
      47             :  */
      48             : #define PG_AUTOCONF_FILENAME        "postgresql.auto.conf"
      49             : #define INCLUDED_CONF_FILE          "pg_createsubscriber.conf"
      50             : #define INCLUDED_CONF_FILE_DISABLED INCLUDED_CONF_FILE ".disabled"
      51             : 
      52             : /* Command-line options */
      53             : struct CreateSubscriberOptions
      54             : {
      55             :     char       *config_file;    /* configuration file */
      56             :     char       *pub_conninfo_str;   /* publisher connection string */
      57             :     char       *socket_dir;     /* directory for Unix-domain socket, if any */
      58             :     char       *sub_port;       /* subscriber port number */
      59             :     const char *sub_username;   /* subscriber username */
      60             :     bool        two_phase;      /* enable-two-phase option */
      61             :     SimpleStringList database_names;    /* list of database names */
      62             :     SimpleStringList pub_names; /* list of publication names */
      63             :     SimpleStringList sub_names; /* list of subscription names */
      64             :     SimpleStringList replslot_names;    /* list of replication slot names */
      65             :     int         recovery_timeout;   /* stop recovery after this time */
      66             :     bool        all_dbs;        /* all option */
      67             :     SimpleStringList objecttypes_to_clean;  /* list of object types to cleanup */
      68             : };
      69             : 
      70             : /* per-database publication/subscription info */
      71             : struct LogicalRepInfo
      72             : {
      73             :     char       *dbname;         /* database name */
      74             :     char       *pubconninfo;    /* publisher connection string */
      75             :     char       *subconninfo;    /* subscriber connection string */
      76             :     char       *pubname;        /* publication name */
      77             :     char       *subname;        /* subscription name */
      78             :     char       *replslotname;   /* replication slot name */
      79             : 
      80             :     bool        made_replslot;  /* replication slot was created */
      81             :     bool        made_publication;   /* publication was created */
      82             : };
      83             : 
      84             : /*
      85             :  * Information shared across all the databases (or publications and
      86             :  * subscriptions).
      87             :  */
      88             : struct LogicalRepInfos
      89             : {
      90             :     struct LogicalRepInfo *dbinfo;
      91             :     bool        two_phase;      /* enable-two-phase option */
      92             :     bits32      objecttypes_to_clean;   /* flags indicating which object types
      93             :                                          * to clean up on subscriber */
      94             : };
      95             : 
      96             : static void cleanup_objects_atexit(void);
      97             : static void usage(void);
      98             : static char *get_base_conninfo(const char *conninfo, char **dbname);
      99             : static char *get_sub_conninfo(const struct CreateSubscriberOptions *opt);
     100             : static char *get_exec_path(const char *argv0, const char *progname);
     101             : static void check_data_directory(const char *datadir);
     102             : static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
     103             : static struct LogicalRepInfo *store_pub_sub_info(const struct CreateSubscriberOptions *opt,
     104             :                                                  const char *pub_base_conninfo,
     105             :                                                  const char *sub_base_conninfo);
     106             : static PGconn *connect_database(const char *conninfo, bool exit_on_error);
     107             : static void disconnect_database(PGconn *conn, bool exit_on_error);
     108             : static uint64 get_primary_sysid(const char *conninfo);
     109             : static uint64 get_standby_sysid(const char *datadir);
     110             : static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
     111             : static bool server_is_in_recovery(PGconn *conn);
     112             : static char *generate_object_name(PGconn *conn);
     113             : static void check_publisher(const struct LogicalRepInfo *dbinfo);
     114             : static char *setup_publisher(struct LogicalRepInfo *dbinfo);
     115             : static void check_subscriber(const struct LogicalRepInfo *dbinfo);
     116             : static void setup_subscriber(struct LogicalRepInfo *dbinfo,
     117             :                              const char *consistent_lsn);
     118             : static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
     119             :                            const char *lsn);
     120             : static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
     121             :                                           const char *slotname);
     122             : static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo);
     123             : static char *create_logical_replication_slot(PGconn *conn,
     124             :                                              struct LogicalRepInfo *dbinfo);
     125             : static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
     126             :                                   const char *slot_name);
     127             : static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
     128             : static void start_standby_server(const struct CreateSubscriberOptions *opt,
     129             :                                  bool restricted_access,
     130             :                                  bool restrict_logical_worker);
     131             : static void stop_standby_server(const char *datadir);
     132             : static void wait_for_end_recovery(const char *conninfo,
     133             :                                   const struct CreateSubscriberOptions *opt);
     134             : static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
     135             : static bool find_publication(PGconn *conn, const char *pubname, const char *dbname);
     136             : static void drop_publication(PGconn *conn, const char *pubname,
     137             :                              const char *dbname, bool *made_publication);
     138             : static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
     139             : static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
     140             : static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
     141             :                                      const char *lsn);
     142             : static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
     143             : static void check_and_drop_existing_subscriptions(PGconn *conn,
     144             :                                                   const struct LogicalRepInfo *dbinfo);
     145             : static void drop_existing_subscription(PGconn *conn, const char *subname,
     146             :                                        const char *dbname);
     147             : static void get_publisher_databases(struct CreateSubscriberOptions *opt,
     148             :                                     bool dbnamespecified);
     149             : 
     150             : #define WAIT_INTERVAL   1       /* 1 second */
     151             : 
     152             : static const char *progname;
     153             : 
     154             : static char *primary_slot_name = NULL;
     155             : static bool dry_run = false;
     156             : 
     157             : static bool success = false;
     158             : 
     159             : static struct LogicalRepInfos dbinfos;
     160             : static int  num_dbs = 0;        /* number of specified databases */
     161             : static int  num_pubs = 0;       /* number of specified publications */
     162             : static int  num_subs = 0;       /* number of specified subscriptions */
     163             : static int  num_replslots = 0;  /* number of specified replication slots */
     164             : 
     165             : static pg_prng_state prng_state;
     166             : 
     167             : static char *pg_ctl_path = NULL;
     168             : static char *pg_resetwal_path = NULL;
     169             : 
     170             : /* standby / subscriber data directory */
     171             : static char *subscriber_dir = NULL;
     172             : 
     173             : static bool recovery_ended = false;
     174             : static bool standby_running = false;
     175             : static bool recovery_params_set = false;
     176             : 
     177             : 
     178             : /*
     179             :  * Clean up objects created by pg_createsubscriber.
     180             :  *
     181             :  * Publications and replication slots are created on the primary.  Depending
     182             :  * on the step where it failed, already-created objects should be removed if
     183             :  * possible (sometimes this won't work due to a connection issue).
     184             :  * There is no cleanup on the target server *after* its promotion, because any
     185             :  * failure at this point means recreating the physical replica and starting
     186             :  * again.
     187             :  *
     188             :  * The recovery configuration is always removed, by renaming the included
     189             :  * configuration file out of the way.
     190             :  */
     191             : static void
     192          20 : cleanup_objects_atexit(void)
     193             : {
     194             :     /* Rename the included configuration file, if necessary. */
     195          20 :     if (recovery_params_set)
     196             :     {
     197             :         char        conf_filename[MAXPGPATH];
     198             :         char        conf_filename_disabled[MAXPGPATH];
     199             : 
     200           2 :         snprintf(conf_filename, MAXPGPATH, "%s/%s", subscriber_dir,
     201             :                  INCLUDED_CONF_FILE);
     202           2 :         snprintf(conf_filename_disabled, MAXPGPATH, "%s/%s", subscriber_dir,
     203             :                  INCLUDED_CONF_FILE_DISABLED);
     204             : 
     205           2 :         if (durable_rename(conf_filename, conf_filename_disabled) != 0)
     206             :         {
     207             :             /* durable_rename() has already logged something. */
     208           0 :             pg_log_warning_hint("A manual removal of the recovery parameters may be required.");
     209             :         }
     210             :     }
     211             : 
     212          20 :     if (success)
     213           8 :         return;
     214             : 
     215             :     /*
     216             :      * If the server is promoted, there is no way to use the current setup
     217             :      * again. Warn the user that a new replication setup should be done before
     218             :      * trying again.
     219             :      */
     220          12 :     if (recovery_ended)
     221             :     {
     222           0 :         pg_log_warning("failed after the end of recovery");
     223           0 :         pg_log_warning_hint("The target server cannot be used as a physical replica anymore.  "
     224             :                             "You must recreate the physical replica before continuing.");
     225             :     }
     226             : 
     227          36 :     for (int i = 0; i < num_dbs; i++)
     228             :     {
     229          24 :         struct LogicalRepInfo *dbinfo = &dbinfos.dbinfo[i];
     230             : 
     231          24 :         if (dbinfo->made_publication || dbinfo->made_replslot)
     232             :         {
     233             :             PGconn     *conn;
     234             : 
     235           0 :             conn = connect_database(dbinfo->pubconninfo, false);
     236           0 :             if (conn != NULL)
     237             :             {
     238           0 :                 if (dbinfo->made_publication)
     239           0 :                     drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
     240             :                                      &dbinfo->made_publication);
     241           0 :                 if (dbinfo->made_replslot)
     242           0 :                     drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
     243           0 :                 disconnect_database(conn, false);
     244             :             }
     245             :             else
     246             :             {
     247             :                 /*
     248             :                  * If a connection could not be established, inform the user
     249             :                  * that some objects were left on primary and should be
     250             :                  * removed before trying again.
     251             :                  */
     252           0 :                 if (dbinfo->made_publication)
     253             :                 {
     254           0 :                     pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
     255             :                                    dbinfo->pubname,
     256             :                                    dbinfo->dbname);
     257           0 :                     pg_log_warning_hint("Drop this publication before trying again.");
     258             :                 }
     259           0 :                 if (dbinfo->made_replslot)
     260             :                 {
     261           0 :                     pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
     262             :                                    dbinfo->replslotname,
     263             :                                    dbinfo->dbname);
     264           0 :                     pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
     265             :                 }
     266             :             }
     267             :         }
     268             :     }
     269             : 
     270          12 :     if (standby_running)
     271           8 :         stop_standby_server(subscriber_dir);
     272             : }
     273             : 
     274             : static void
     275           2 : usage(void)
     276             : {
     277           2 :     printf(_("%s creates a new logical replica from a standby server.\n\n"),
     278             :            progname);
     279           2 :     printf(_("Usage:\n"));
     280           2 :     printf(_("  %s [OPTION]...\n"), progname);
     281           2 :     printf(_("\nOptions:\n"));
     282           2 :     printf(_("  -a, --all                       create subscriptions for all databases except template\n"
     283             :              "                                  databases and databases that don't allow connections\n"));
     284           2 :     printf(_("  -d, --database=DBNAME           database in which to create a subscription\n"));
     285           2 :     printf(_("  -D, --pgdata=DATADIR            location for the subscriber data directory\n"));
     286           2 :     printf(_("  -n, --dry-run                   dry run, just show what would be done\n"));
     287           2 :     printf(_("  -p, --subscriber-port=PORT      subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
     288           2 :     printf(_("  -P, --publisher-server=CONNSTR  publisher connection string\n"));
     289           2 :     printf(_("  -s, --socketdir=DIR             socket directory to use (default current dir.)\n"));
     290           2 :     printf(_("  -t, --recovery-timeout=SECS     seconds to wait for recovery to end\n"));
     291           2 :     printf(_("  -T, --enable-two-phase          enable two-phase commit for all subscriptions\n"));
     292           2 :     printf(_("  -U, --subscriber-username=NAME  user name for subscriber connection\n"));
     293           2 :     printf(_("  -v, --verbose                   output verbose messages\n"));
     294           2 :     printf(_("      --clean=OBJECTTYPE          drop all objects of the specified type from specified\n"
     295             :              "                                  databases on the subscriber; accepts: \"%s\"\n"), "publications");
     296           2 :     printf(_("      --config-file=FILENAME      use specified main server configuration\n"
     297             :              "                                  file when running target cluster\n"));
     298           2 :     printf(_("      --publication=NAME          publication name\n"));
     299           2 :     printf(_("      --replication-slot=NAME     replication slot name\n"));
     300           2 :     printf(_("      --subscription=NAME         subscription name\n"));
     301           2 :     printf(_("  -V, --version                   output version information, then exit\n"));
     302           2 :     printf(_("  -?, --help                      show this help, then exit\n"));
     303           2 :     printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
     304           2 :     printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
     305           2 : }
     306             : 
     307             : /*
     308             :  * Subroutine to append "keyword=value" to a connection string,
     309             :  * with proper quoting of the value.  (We assume keywords don't need that.)
     310             :  */
     311             : static void
     312         214 : appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
     313             : {
     314         214 :     if (buf->len > 0)
     315         158 :         appendPQExpBufferChar(buf, ' ');
     316         214 :     appendPQExpBufferStr(buf, keyword);
     317         214 :     appendPQExpBufferChar(buf, '=');
     318         214 :     appendConnStrVal(buf, val);
     319         214 : }
     320             : 
     321             : /*
     322             :  * Validate a connection string. Returns a base connection string that is a
     323             :  * connection string without a database name.
     324             :  *
     325             :  * Since we might process multiple databases, each database name will be
     326             :  * appended to this base connection string to provide a final connection
     327             :  * string. If the second argument (dbname) is not null, returns dbname if the
     328             :  * provided connection string contains it.
     329             :  *
     330             :  * It is the caller's responsibility to free the returned connection string and
     331             :  * dbname.
     332             :  */
     333             : static char *
     334          28 : get_base_conninfo(const char *conninfo, char **dbname)
     335             : {
     336             :     PQExpBuffer buf;
     337             :     PQconninfoOption *conn_opts;
     338             :     PQconninfoOption *conn_opt;
     339          28 :     char       *errmsg = NULL;
     340             :     char       *ret;
     341             : 
     342          28 :     conn_opts = PQconninfoParse(conninfo, &errmsg);
     343          28 :     if (conn_opts == NULL)
     344             :     {
     345           0 :         pg_log_error("could not parse connection string: %s", errmsg);
     346           0 :         PQfreemem(errmsg);
     347           0 :         return NULL;
     348             :     }
     349             : 
     350          28 :     buf = createPQExpBuffer();
     351        1456 :     for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
     352             :     {
     353        1428 :         if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
     354             :         {
     355          66 :             if (strcmp(conn_opt->keyword, "dbname") == 0)
     356             :             {
     357          18 :                 if (dbname)
     358          18 :                     *dbname = pg_strdup(conn_opt->val);
     359          18 :                 continue;
     360             :             }
     361          48 :             appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
     362             :         }
     363             :     }
     364             : 
     365          28 :     ret = pg_strdup(buf->data);
     366             : 
     367          28 :     destroyPQExpBuffer(buf);
     368          28 :     PQconninfoFree(conn_opts);
     369             : 
     370          28 :     return ret;
     371             : }
     372             : 
     373             : /*
     374             :  * Build a subscriber connection string. Only a few parameters are supported
     375             :  * since it starts a server with restricted access.
     376             :  */
     377             : static char *
     378          28 : get_sub_conninfo(const struct CreateSubscriberOptions *opt)
     379             : {
     380          28 :     PQExpBuffer buf = createPQExpBuffer();
     381             :     char       *ret;
     382             : 
     383          28 :     appendConnStrItem(buf, "port", opt->sub_port);
     384             : #if !defined(WIN32)
     385          28 :     appendConnStrItem(buf, "host", opt->socket_dir);
     386             : #endif
     387          28 :     if (opt->sub_username != NULL)
     388           0 :         appendConnStrItem(buf, "user", opt->sub_username);
     389          28 :     appendConnStrItem(buf, "fallback_application_name", progname);
     390             : 
     391          28 :     ret = pg_strdup(buf->data);
     392             : 
     393          28 :     destroyPQExpBuffer(buf);
     394             : 
     395          28 :     return ret;
     396             : }
     397             : 
     398             : /*
     399             :  * Verify if a PostgreSQL binary (progname) is available in the same directory as
     400             :  * pg_createsubscriber and it has the same version.  It returns the absolute
     401             :  * path of the progname.
     402             :  */
     403             : static char *
     404          40 : get_exec_path(const char *argv0, const char *progname)
     405             : {
     406             :     char       *versionstr;
     407             :     char       *exec_path;
     408             :     int         ret;
     409             : 
     410          40 :     versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
     411          40 :     exec_path = pg_malloc(MAXPGPATH);
     412          40 :     ret = find_other_exec(argv0, progname, versionstr, exec_path);
     413             : 
     414          40 :     if (ret < 0)
     415             :     {
     416             :         char        full_path[MAXPGPATH];
     417             : 
     418           0 :         if (find_my_exec(argv0, full_path) < 0)
     419           0 :             strlcpy(full_path, progname, sizeof(full_path));
     420             : 
     421           0 :         if (ret == -1)
     422           0 :             pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
     423             :                      progname, "pg_createsubscriber", full_path);
     424             :         else
     425           0 :             pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
     426             :                      progname, full_path, "pg_createsubscriber");
     427             :     }
     428             : 
     429          40 :     pg_log_debug("%s path is:  %s", progname, exec_path);
     430             : 
     431          40 :     return exec_path;
     432             : }
     433             : 
     434             : /*
     435             :  * Is it a cluster directory? These are preliminary checks. It is far from
     436             :  * making an accurate check. If it is not a clone from the publisher, it will
     437             :  * eventually fail in a future step.
     438             :  */
     439             : static void
     440          20 : check_data_directory(const char *datadir)
     441             : {
     442             :     struct stat statbuf;
     443             :     uint32      major_version;
     444             :     char       *version_str;
     445             : 
     446          20 :     pg_log_info("checking if directory \"%s\" is a cluster data directory",
     447             :                 datadir);
     448             : 
     449          20 :     if (stat(datadir, &statbuf) != 0)
     450             :     {
     451           0 :         if (errno == ENOENT)
     452           0 :             pg_fatal("data directory \"%s\" does not exist", datadir);
     453             :         else
     454           0 :             pg_fatal("could not access directory \"%s\": %m", datadir);
     455             :     }
     456             : 
     457             :     /*
     458             :      * Retrieve the contents of this cluster's PG_VERSION.  We require
     459             :      * compatibility with the same major version as the one this tool is
     460             :      * compiled with.
     461             :      */
     462          20 :     major_version = GET_PG_MAJORVERSION_NUM(get_pg_version(datadir, &version_str));
     463          20 :     if (major_version != PG_MAJORVERSION_NUM)
     464             :     {
     465           0 :         pg_log_error("data directory is of wrong version");
     466           0 :         pg_log_error_detail("File \"%s\" contains \"%s\", which is not compatible with this program's version \"%s\".",
     467             :                             "PG_VERSION", version_str, PG_MAJORVERSION);
     468           0 :         exit(1);
     469             :     }
     470          20 : }
     471             : 
     472             : /*
     473             :  * Append database name into a base connection string.
     474             :  *
     475             :  * dbname is the only parameter that changes so it is not included in the base
     476             :  * connection string. This function concatenates dbname to build a "real"
     477             :  * connection string.
     478             :  */
     479             : static char *
     480          82 : concat_conninfo_dbname(const char *conninfo, const char *dbname)
     481             : {
     482          82 :     PQExpBuffer buf = createPQExpBuffer();
     483             :     char       *ret;
     484             : 
     485             :     Assert(conninfo != NULL);
     486             : 
     487          82 :     appendPQExpBufferStr(buf, conninfo);
     488          82 :     appendConnStrItem(buf, "dbname", dbname);
     489             : 
     490          82 :     ret = pg_strdup(buf->data);
     491          82 :     destroyPQExpBuffer(buf);
     492             : 
     493          82 :     return ret;
     494             : }
     495             : 
     496             : /*
     497             :  * Store publication and subscription information.
     498             :  *
     499             :  * If publication, replication slot and subscription names were specified,
     500             :  * store it here. Otherwise, a generated name will be assigned to the object in
     501             :  * setup_publisher().
     502             :  */
     503             : static struct LogicalRepInfo *
     504          20 : store_pub_sub_info(const struct CreateSubscriberOptions *opt,
     505             :                    const char *pub_base_conninfo,
     506             :                    const char *sub_base_conninfo)
     507             : {
     508             :     struct LogicalRepInfo *dbinfo;
     509          20 :     SimpleStringListCell *pubcell = NULL;
     510          20 :     SimpleStringListCell *subcell = NULL;
     511          20 :     SimpleStringListCell *replslotcell = NULL;
     512          20 :     int         i = 0;
     513             : 
     514          20 :     dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
     515             : 
     516          20 :     if (num_pubs > 0)
     517           4 :         pubcell = opt->pub_names.head;
     518          20 :     if (num_subs > 0)
     519           2 :         subcell = opt->sub_names.head;
     520          20 :     if (num_replslots > 0)
     521           4 :         replslotcell = opt->replslot_names.head;
     522             : 
     523          60 :     for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
     524             :     {
     525             :         char       *conninfo;
     526             : 
     527             :         /* Fill publisher attributes */
     528          40 :         conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
     529          40 :         dbinfo[i].pubconninfo = conninfo;
     530          40 :         dbinfo[i].dbname = cell->val;
     531          40 :         if (num_pubs > 0)
     532           8 :             dbinfo[i].pubname = pubcell->val;
     533             :         else
     534          32 :             dbinfo[i].pubname = NULL;
     535          40 :         if (num_replslots > 0)
     536           6 :             dbinfo[i].replslotname = replslotcell->val;
     537             :         else
     538          34 :             dbinfo[i].replslotname = NULL;
     539          40 :         dbinfo[i].made_replslot = false;
     540          40 :         dbinfo[i].made_publication = false;
     541             :         /* Fill subscriber attributes */
     542          40 :         conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
     543          40 :         dbinfo[i].subconninfo = conninfo;
     544          40 :         if (num_subs > 0)
     545           4 :             dbinfo[i].subname = subcell->val;
     546             :         else
     547          36 :             dbinfo[i].subname = NULL;
     548             :         /* Other fields will be filled later */
     549             : 
     550          40 :         pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
     551             :                      dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
     552             :                      dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
     553             :                      dbinfo[i].pubconninfo);
     554          40 :         pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
     555             :                      dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
     556             :                      dbinfo[i].subconninfo,
     557             :                      dbinfos.two_phase ? "true" : "false");
     558             : 
     559          40 :         if (num_pubs > 0)
     560           8 :             pubcell = pubcell->next;
     561          40 :         if (num_subs > 0)
     562           4 :             subcell = subcell->next;
     563          40 :         if (num_replslots > 0)
     564           6 :             replslotcell = replslotcell->next;
     565             : 
     566          40 :         i++;
     567             :     }
     568             : 
     569          20 :     return dbinfo;
     570             : }
     571             : 
     572             : /*
     573             :  * Open a new connection. If exit_on_error is true, it has an undesired
     574             :  * condition and it should exit immediately.
     575             :  */
     576             : static PGconn *
     577         114 : connect_database(const char *conninfo, bool exit_on_error)
     578             : {
     579             :     PGconn     *conn;
     580             :     PGresult   *res;
     581             : 
     582         114 :     conn = PQconnectdb(conninfo);
     583         114 :     if (PQstatus(conn) != CONNECTION_OK)
     584             :     {
     585           0 :         pg_log_error("connection to database failed: %s",
     586             :                      PQerrorMessage(conn));
     587           0 :         PQfinish(conn);
     588             : 
     589           0 :         if (exit_on_error)
     590           0 :             exit(1);
     591           0 :         return NULL;
     592             :     }
     593             : 
     594             :     /* Secure search_path */
     595         114 :     res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
     596         114 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     597             :     {
     598           0 :         pg_log_error("could not clear \"search_path\": %s",
     599             :                      PQresultErrorMessage(res));
     600           0 :         PQclear(res);
     601           0 :         PQfinish(conn);
     602             : 
     603           0 :         if (exit_on_error)
     604           0 :             exit(1);
     605           0 :         return NULL;
     606             :     }
     607         114 :     PQclear(res);
     608             : 
     609         114 :     return conn;
     610             : }
     611             : 
     612             : /*
     613             :  * Close the connection. If exit_on_error is true, it has an undesired
     614             :  * condition and it should exit immediately.
     615             :  */
     616             : static void
     617         114 : disconnect_database(PGconn *conn, bool exit_on_error)
     618             : {
     619             :     Assert(conn != NULL);
     620             : 
     621         114 :     PQfinish(conn);
     622             : 
     623         114 :     if (exit_on_error)
     624           4 :         exit(1);
     625         110 : }
     626             : 
     627             : /*
     628             :  * Obtain the system identifier using the provided connection. It will be used
     629             :  * to compare if a data directory is a clone of another one.
     630             :  */
     631             : static uint64
     632          20 : get_primary_sysid(const char *conninfo)
     633             : {
     634             :     PGconn     *conn;
     635             :     PGresult   *res;
     636             :     uint64      sysid;
     637             : 
     638          20 :     pg_log_info("getting system identifier from publisher");
     639             : 
     640          20 :     conn = connect_database(conninfo, true);
     641             : 
     642          20 :     res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
     643          20 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     644             :     {
     645           0 :         pg_log_error("could not get system identifier: %s",
     646             :                      PQresultErrorMessage(res));
     647           0 :         disconnect_database(conn, true);
     648             :     }
     649          20 :     if (PQntuples(res) != 1)
     650             :     {
     651           0 :         pg_log_error("could not get system identifier: got %d rows, expected %d row",
     652             :                      PQntuples(res), 1);
     653           0 :         disconnect_database(conn, true);
     654             :     }
     655             : 
     656          20 :     sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
     657             : 
     658          20 :     pg_log_info("system identifier is %" PRIu64 " on publisher", sysid);
     659             : 
     660          20 :     PQclear(res);
     661          20 :     disconnect_database(conn, false);
     662             : 
     663          20 :     return sysid;
     664             : }
     665             : 
     666             : /*
     667             :  * Obtain the system identifier from control file. It will be used to compare
     668             :  * if a data directory is a clone of another one. This routine is used locally
     669             :  * and avoids a connection.
     670             :  */
     671             : static uint64
     672          20 : get_standby_sysid(const char *datadir)
     673             : {
     674             :     ControlFileData *cf;
     675             :     bool        crc_ok;
     676             :     uint64      sysid;
     677             : 
     678          20 :     pg_log_info("getting system identifier from subscriber");
     679             : 
     680          20 :     cf = get_controlfile(datadir, &crc_ok);
     681          20 :     if (!crc_ok)
     682           0 :         pg_fatal("control file appears to be corrupt");
     683             : 
     684          20 :     sysid = cf->system_identifier;
     685             : 
     686          20 :     pg_log_info("system identifier is %" PRIu64 " on subscriber", sysid);
     687             : 
     688          20 :     pg_free(cf);
     689             : 
     690          20 :     return sysid;
     691             : }
     692             : 
     693             : /*
     694             :  * Modify the system identifier. Since a standby server preserves the system
     695             :  * identifier, it makes sense to change it to avoid situations in which WAL
     696             :  * files from one of the systems might be used in the other one.
     697             :  */
     698             : static void
     699           8 : modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
     700             : {
     701             :     ControlFileData *cf;
     702             :     bool        crc_ok;
     703             :     struct timeval tv;
     704             : 
     705             :     char       *cmd_str;
     706             : 
     707           8 :     pg_log_info("modifying system identifier of subscriber");
     708             : 
     709           8 :     cf = get_controlfile(subscriber_dir, &crc_ok);
     710           8 :     if (!crc_ok)
     711           0 :         pg_fatal("control file appears to be corrupt");
     712             : 
     713             :     /*
     714             :      * Select a new system identifier.
     715             :      *
     716             :      * XXX this code was extracted from BootStrapXLOG().
     717             :      */
     718           8 :     gettimeofday(&tv, NULL);
     719           8 :     cf->system_identifier = ((uint64) tv.tv_sec) << 32;
     720           8 :     cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
     721           8 :     cf->system_identifier |= getpid() & 0xFFF;
     722             : 
     723           8 :     if (dry_run)
     724           6 :         pg_log_info("dry-run: would set system identifier to %" PRIu64 " on subscriber",
     725             :                     cf->system_identifier);
     726             :     else
     727             :     {
     728           2 :         update_controlfile(subscriber_dir, cf, true);
     729           2 :         pg_log_info("system identifier is %" PRIu64 " on subscriber",
     730             :                     cf->system_identifier);
     731             :     }
     732             : 
     733           8 :     if (dry_run)
     734           6 :         pg_log_info("dry-run: would run pg_resetwal on the subscriber");
     735             :     else
     736           2 :         pg_log_info("running pg_resetwal on the subscriber");
     737             : 
     738           8 :     cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
     739             :                        subscriber_dir, DEVNULL);
     740             : 
     741           8 :     pg_log_debug("pg_resetwal command is: %s", cmd_str);
     742             : 
     743           8 :     if (!dry_run)
     744             :     {
     745           2 :         int         rc = system(cmd_str);
     746             : 
     747           2 :         if (rc == 0)
     748           2 :             pg_log_info("successfully reset WAL on the subscriber");
     749             :         else
     750           0 :             pg_fatal("could not reset WAL on subscriber: %s", wait_result_to_str(rc));
     751             :     }
     752             : 
     753           8 :     pg_free(cf);
     754           8 : }
     755             : 
     756             : /*
     757             :  * Generate an object name using a prefix, database oid and a random integer.
     758             :  * It is used in case the user does not specify an object name (publication,
     759             :  * subscription, replication slot).
     760             :  */
     761             : static char *
     762          16 : generate_object_name(PGconn *conn)
     763             : {
     764             :     PGresult   *res;
     765             :     Oid         oid;
     766             :     uint32      rand;
     767             :     char       *objname;
     768             : 
     769          16 :     res = PQexec(conn,
     770             :                  "SELECT oid FROM pg_catalog.pg_database "
     771             :                  "WHERE datname = pg_catalog.current_database()");
     772          16 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     773             :     {
     774           0 :         pg_log_error("could not obtain database OID: %s",
     775             :                      PQresultErrorMessage(res));
     776           0 :         disconnect_database(conn, true);
     777             :     }
     778             : 
     779          16 :     if (PQntuples(res) != 1)
     780             :     {
     781           0 :         pg_log_error("could not obtain database OID: got %d rows, expected %d row",
     782             :                      PQntuples(res), 1);
     783           0 :         disconnect_database(conn, true);
     784             :     }
     785             : 
     786             :     /* Database OID */
     787          16 :     oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
     788             : 
     789          16 :     PQclear(res);
     790             : 
     791             :     /* Random unsigned integer */
     792          16 :     rand = pg_prng_uint32(&prng_state);
     793             : 
     794             :     /*
     795             :      * Build the object name. The name must not exceed NAMEDATALEN - 1. This
     796             :      * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
     797             :      * '\0').
     798             :      */
     799          16 :     objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
     800             : 
     801          16 :     return objname;
     802             : }
     803             : 
     804             : /*
     805             :  * Does the publication exist in the specified database?
     806             :  */
     807             : static bool
     808          16 : find_publication(PGconn *conn, const char *pubname, const char *dbname)
     809             : {
     810          16 :     PQExpBuffer str = createPQExpBuffer();
     811             :     PGresult   *res;
     812          16 :     bool        found = false;
     813          16 :     char       *pubname_esc = PQescapeLiteral(conn, pubname, strlen(pubname));
     814             : 
     815          16 :     appendPQExpBuffer(str,
     816             :                       "SELECT 1 FROM pg_catalog.pg_publication "
     817             :                       "WHERE pubname = %s",
     818             :                       pubname_esc);
     819          16 :     res = PQexec(conn, str->data);
     820          16 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     821             :     {
     822           0 :         pg_log_error("could not find publication \"%s\" in database \"%s\": %s",
     823             :                      pubname, dbname, PQerrorMessage(conn));
     824           0 :         disconnect_database(conn, true);
     825             :     }
     826             : 
     827          16 :     if (PQntuples(res) == 1)
     828           2 :         found = true;
     829             : 
     830          16 :     PQclear(res);
     831          16 :     PQfreemem(pubname_esc);
     832          16 :     destroyPQExpBuffer(str);
     833             : 
     834          16 :     return found;
     835             : }
     836             : 
     837             : /*
     838             :  * Create the publications and replication slots in preparation for logical
     839             :  * replication. Returns the LSN from latest replication slot. It will be the
     840             :  * replication start point that is used to adjust the subscriptions (see
     841             :  * set_replication_progress).
     842             :  */
     843             : static char *
     844           8 : setup_publisher(struct LogicalRepInfo *dbinfo)
     845             : {
     846           8 :     char       *lsn = NULL;
     847             : 
     848           8 :     pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
     849             : 
     850          24 :     for (int i = 0; i < num_dbs; i++)
     851             :     {
     852             :         PGconn     *conn;
     853          16 :         char       *genname = NULL;
     854             : 
     855          16 :         conn = connect_database(dbinfo[i].pubconninfo, true);
     856             : 
     857             :         /*
     858             :          * If an object name was not specified as command-line options, assign
     859             :          * a generated object name. The replication slot has a different rule.
     860             :          * The subscription name is assigned to the replication slot name if
     861             :          * no replication slot is specified. It follows the same rule as
     862             :          * CREATE SUBSCRIPTION.
     863             :          */
     864          16 :         if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
     865          16 :             genname = generate_object_name(conn);
     866          16 :         if (num_pubs == 0)
     867           8 :             dbinfo[i].pubname = pg_strdup(genname);
     868          16 :         if (num_subs == 0)
     869          12 :             dbinfo[i].subname = pg_strdup(genname);
     870          16 :         if (num_replslots == 0)
     871          10 :             dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
     872             : 
     873          16 :         if (find_publication(conn, dbinfo[i].pubname, dbinfo[i].dbname))
     874             :         {
     875             :             /* Reuse existing publication on publisher. */
     876           2 :             pg_log_info("use existing publication \"%s\" in database \"%s\"",
     877             :                         dbinfo[i].pubname, dbinfo[i].dbname);
     878             :             /* Don't remove pre-existing publication if an error occurs. */
     879           2 :             dbinfo[i].made_publication = false;
     880             :         }
     881             :         else
     882             :         {
     883             :             /*
     884             :              * Create publication on publisher. This step should be executed
     885             :              * *before* promoting the subscriber to avoid any transactions
     886             :              * between consistent LSN and the new publication rows (such
     887             :              * transactions wouldn't see the new publication rows resulting in
     888             :              * an error).
     889             :              */
     890          14 :             create_publication(conn, &dbinfo[i]);
     891             :         }
     892             : 
     893             :         /* Create replication slot on publisher */
     894          16 :         if (lsn)
     895           2 :             pg_free(lsn);
     896          16 :         lsn = create_logical_replication_slot(conn, &dbinfo[i]);
     897          16 :         if (lsn == NULL && !dry_run)
     898           0 :             exit(1);
     899             : 
     900             :         /*
     901             :          * Since we are using the LSN returned by the last replication slot as
     902             :          * recovery_target_lsn, this LSN is ahead of the current WAL position
     903             :          * and the recovery waits until the publisher writes a WAL record to
     904             :          * reach the target and ends the recovery. On idle systems, this wait
     905             :          * time is unpredictable and could lead to failure in promoting the
     906             :          * subscriber. To avoid that, insert a harmless WAL record.
     907             :          */
     908          16 :         if (i == num_dbs - 1 && !dry_run)
     909             :         {
     910             :             PGresult   *res;
     911             : 
     912           2 :             res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
     913           2 :             if (PQresultStatus(res) != PGRES_TUPLES_OK)
     914             :             {
     915           0 :                 pg_log_error("could not write an additional WAL record: %s",
     916             :                              PQresultErrorMessage(res));
     917           0 :                 disconnect_database(conn, true);
     918             :             }
     919           2 :             PQclear(res);
     920             :         }
     921             : 
     922          16 :         disconnect_database(conn, false);
     923             :     }
     924             : 
     925           8 :     return lsn;
     926             : }
     927             : 
     928             : /*
     929             :  * Is recovery still in progress?
     930             :  */
     931             : static bool
     932          30 : server_is_in_recovery(PGconn *conn)
     933             : {
     934             :     PGresult   *res;
     935             :     int         ret;
     936             : 
     937          30 :     res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
     938             : 
     939          30 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     940             :     {
     941           0 :         pg_log_error("could not obtain recovery progress: %s",
     942             :                      PQresultErrorMessage(res));
     943           0 :         disconnect_database(conn, true);
     944             :     }
     945             : 
     946             : 
     947          30 :     ret = strcmp("t", PQgetvalue(res, 0, 0));
     948             : 
     949          30 :     PQclear(res);
     950             : 
     951          30 :     return ret == 0;
     952             : }
     953             : 
     954             : /*
     955             :  * Is the primary server ready for logical replication?
     956             :  *
     957             :  * XXX Does it not allow a synchronous replica?
     958             :  */
     959             : static void
     960          12 : check_publisher(const struct LogicalRepInfo *dbinfo)
     961             : {
     962             :     PGconn     *conn;
     963             :     PGresult   *res;
     964          12 :     bool        failed = false;
     965             : 
     966             :     char       *wal_level;
     967             :     int         max_repslots;
     968             :     int         cur_repslots;
     969             :     int         max_walsenders;
     970             :     int         cur_walsenders;
     971             :     int         max_prepared_transactions;
     972             :     char       *max_slot_wal_keep_size;
     973             : 
     974          12 :     pg_log_info("checking settings on publisher");
     975             : 
     976          12 :     conn = connect_database(dbinfo[0].pubconninfo, true);
     977             : 
     978             :     /*
     979             :      * If the primary server is in recovery (i.e. cascading replication),
     980             :      * objects (publication) cannot be created because it is read only.
     981             :      */
     982          12 :     if (server_is_in_recovery(conn))
     983             :     {
     984           2 :         pg_log_error("primary server cannot be in recovery");
     985           2 :         disconnect_database(conn, true);
     986             :     }
     987             : 
     988             :     /*------------------------------------------------------------------------
     989             :      * Logical replication requires a few parameters to be set on publisher.
     990             :      * Since these parameters are not a requirement for physical replication,
     991             :      * we should check it to make sure it won't fail.
     992             :      *
     993             :      * - wal_level >= replica
     994             :      * - max_replication_slots >= current + number of dbs to be converted
     995             :      * - max_wal_senders >= current + number of dbs to be converted
     996             :      * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files)
     997             :      * -----------------------------------------------------------------------
     998             :      */
     999          10 :     res = PQexec(conn,
    1000             :                  "SELECT pg_catalog.current_setting('wal_level'),"
    1001             :                  " pg_catalog.current_setting('max_replication_slots'),"
    1002             :                  " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
    1003             :                  " pg_catalog.current_setting('max_wal_senders'),"
    1004             :                  " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
    1005             :                  " pg_catalog.current_setting('max_prepared_transactions'),"
    1006             :                  " pg_catalog.current_setting('max_slot_wal_keep_size')");
    1007             : 
    1008          10 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1009             :     {
    1010           0 :         pg_log_error("could not obtain publisher settings: %s",
    1011             :                      PQresultErrorMessage(res));
    1012           0 :         disconnect_database(conn, true);
    1013             :     }
    1014             : 
    1015          10 :     wal_level = pg_strdup(PQgetvalue(res, 0, 0));
    1016          10 :     max_repslots = atoi(PQgetvalue(res, 0, 1));
    1017          10 :     cur_repslots = atoi(PQgetvalue(res, 0, 2));
    1018          10 :     max_walsenders = atoi(PQgetvalue(res, 0, 3));
    1019          10 :     cur_walsenders = atoi(PQgetvalue(res, 0, 4));
    1020          10 :     max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));
    1021          10 :     max_slot_wal_keep_size = pg_strdup(PQgetvalue(res, 0, 6));
    1022             : 
    1023          10 :     PQclear(res);
    1024             : 
    1025          10 :     pg_log_debug("publisher: wal_level: %s", wal_level);
    1026          10 :     pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
    1027          10 :     pg_log_debug("publisher: current replication slots: %d", cur_repslots);
    1028          10 :     pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
    1029          10 :     pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
    1030          10 :     pg_log_debug("publisher: max_prepared_transactions: %d",
    1031             :                  max_prepared_transactions);
    1032          10 :     pg_log_debug("publisher: max_slot_wal_keep_size: %s",
    1033             :                  max_slot_wal_keep_size);
    1034             : 
    1035          10 :     disconnect_database(conn, false);
    1036             : 
    1037          10 :     if (strcmp(wal_level, "minimal") == 0)
    1038             :     {
    1039           0 :         pg_log_error("publisher requires \"wal_level\" >= \"replica\"");
    1040           0 :         failed = true;
    1041             :     }
    1042             : 
    1043          10 :     if (max_repslots - cur_repslots < num_dbs)
    1044             :     {
    1045           2 :         pg_log_error("publisher requires %d replication slots, but only %d remain",
    1046             :                      num_dbs, max_repslots - cur_repslots);
    1047           2 :         pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
    1048             :                           "max_replication_slots", cur_repslots + num_dbs);
    1049           2 :         failed = true;
    1050             :     }
    1051             : 
    1052          10 :     if (max_walsenders - cur_walsenders < num_dbs)
    1053             :     {
    1054           2 :         pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
    1055             :                      num_dbs, max_walsenders - cur_walsenders);
    1056           2 :         pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
    1057             :                           "max_wal_senders", cur_walsenders + num_dbs);
    1058           2 :         failed = true;
    1059             :     }
    1060             : 
    1061          10 :     if (max_prepared_transactions != 0 && !dbinfos.two_phase)
    1062             :     {
    1063           0 :         pg_log_warning("two_phase option will not be enabled for replication slots");
    1064           0 :         pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled.  "
    1065             :                               "Prepared transactions will be replicated at COMMIT PREPARED.");
    1066           0 :         pg_log_warning_hint("You can use the command-line option --enable-two-phase to enable two_phase.");
    1067             :     }
    1068             : 
    1069             :     /*
    1070             :      * In dry-run mode, validate 'max_slot_wal_keep_size'. If this parameter
    1071             :      * is set to a non-default value, it may cause replication failures due to
    1072             :      * required WAL files being prematurely removed.
    1073             :      */
    1074          10 :     if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
    1075             :     {
    1076           0 :         pg_log_warning("required WAL could be removed from the publisher");
    1077           0 :         pg_log_warning_hint("Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
    1078             :                             "max_slot_wal_keep_size");
    1079             :     }
    1080             : 
    1081          10 :     pg_free(wal_level);
    1082             : 
    1083          10 :     if (failed)
    1084           2 :         exit(1);
    1085           8 : }
    1086             : 
    1087             : /*
    1088             :  * Is the standby server ready for logical replication?
    1089             :  *
    1090             :  * XXX Does it not allow a time-delayed replica?
    1091             :  *
    1092             :  * XXX In a cascaded replication scenario (P -> S -> C), if the target server
    1093             :  * is S, it cannot detect there is a replica (server C) because server S starts
    1094             :  * accepting only local connections and server C cannot connect to it. Hence,
    1095             :  * there is not a reliable way to provide a suitable error saying the server C
    1096             :  * will be broken at the end of this process (due to pg_resetwal).
    1097             :  */
    1098             : static void
    1099          16 : check_subscriber(const struct LogicalRepInfo *dbinfo)
    1100             : {
    1101             :     PGconn     *conn;
    1102             :     PGresult   *res;
    1103          16 :     bool        failed = false;
    1104             : 
    1105             :     int         max_lrworkers;
    1106             :     int         max_reporigins;
    1107             :     int         max_wprocs;
    1108             : 
    1109          16 :     pg_log_info("checking settings on subscriber");
    1110             : 
    1111          16 :     conn = connect_database(dbinfo[0].subconninfo, true);
    1112             : 
    1113             :     /* The target server must be a standby */
    1114          16 :     if (!server_is_in_recovery(conn))
    1115             :     {
    1116           2 :         pg_log_error("target server must be a standby");
    1117           2 :         disconnect_database(conn, true);
    1118             :     }
    1119             : 
    1120             :     /*------------------------------------------------------------------------
    1121             :      * Logical replication requires a few parameters to be set on subscriber.
    1122             :      * Since these parameters are not a requirement for physical replication,
    1123             :      * we should check it to make sure it won't fail.
    1124             :      *
    1125             :      * - max_active_replication_origins >= number of dbs to be converted
    1126             :      * - max_logical_replication_workers >= number of dbs to be converted
    1127             :      * - max_worker_processes >= 1 + number of dbs to be converted
    1128             :      *------------------------------------------------------------------------
    1129             :      */
    1130          14 :     res = PQexec(conn,
    1131             :                  "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
    1132             :                  "'max_logical_replication_workers', "
    1133             :                  "'max_active_replication_origins', "
    1134             :                  "'max_worker_processes', "
    1135             :                  "'primary_slot_name') "
    1136             :                  "ORDER BY name");
    1137             : 
    1138          14 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1139             :     {
    1140           0 :         pg_log_error("could not obtain subscriber settings: %s",
    1141             :                      PQresultErrorMessage(res));
    1142           0 :         disconnect_database(conn, true);
    1143             :     }
    1144             : 
    1145          14 :     max_reporigins = atoi(PQgetvalue(res, 0, 0));
    1146          14 :     max_lrworkers = atoi(PQgetvalue(res, 1, 0));
    1147          14 :     max_wprocs = atoi(PQgetvalue(res, 2, 0));
    1148          14 :     if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
    1149          12 :         primary_slot_name = pg_strdup(PQgetvalue(res, 3, 0));
    1150             : 
    1151          14 :     pg_log_debug("subscriber: max_logical_replication_workers: %d",
    1152             :                  max_lrworkers);
    1153          14 :     pg_log_debug("subscriber: max_active_replication_origins: %d", max_reporigins);
    1154          14 :     pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
    1155          14 :     if (primary_slot_name)
    1156          12 :         pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
    1157             : 
    1158          14 :     PQclear(res);
    1159             : 
    1160          14 :     disconnect_database(conn, false);
    1161             : 
    1162          14 :     if (max_reporigins < num_dbs)
    1163             :     {
    1164           2 :         pg_log_error("subscriber requires %d active replication origins, but only %d remain",
    1165             :                      num_dbs, max_reporigins);
    1166           2 :         pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
    1167             :                           "max_active_replication_origins", num_dbs);
    1168           2 :         failed = true;
    1169             :     }
    1170             : 
    1171          14 :     if (max_lrworkers < num_dbs)
    1172             :     {
    1173           2 :         pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
    1174             :                      num_dbs, max_lrworkers);
    1175           2 :         pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
    1176             :                           "max_logical_replication_workers", num_dbs);
    1177           2 :         failed = true;
    1178             :     }
    1179             : 
    1180          14 :     if (max_wprocs < num_dbs + 1)
    1181             :     {
    1182           2 :         pg_log_error("subscriber requires %d worker processes, but only %d remain",
    1183             :                      num_dbs + 1, max_wprocs);
    1184           2 :         pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
    1185             :                           "max_worker_processes", num_dbs + 1);
    1186           2 :         failed = true;
    1187             :     }
    1188             : 
    1189          14 :     if (failed)
    1190           2 :         exit(1);
    1191          12 : }
    1192             : 
    1193             : /*
    1194             :  * Drop a specified subscription. This is to avoid duplicate subscriptions on
    1195             :  * the primary (publisher node) and the newly created subscriber. We
    1196             :  * shouldn't drop the associated slot as that would be used by the publisher
    1197             :  * node.
    1198             :  */
    1199             : static void
    1200           8 : drop_existing_subscription(PGconn *conn, const char *subname, const char *dbname)
    1201             : {
    1202           8 :     PQExpBuffer query = createPQExpBuffer();
    1203             :     PGresult   *res;
    1204             : 
    1205             :     Assert(conn != NULL);
    1206             : 
    1207             :     /*
    1208             :      * Construct a query string. These commands are allowed to be executed
    1209             :      * within a transaction.
    1210             :      */
    1211           8 :     appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
    1212             :                       subname);
    1213           8 :     appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
    1214             :                       subname);
    1215           8 :     appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
    1216             : 
    1217           8 :     if (dry_run)
    1218           6 :         pg_log_info("dry-run: would drop subscription \"%s\" in database \"%s\"",
    1219             :                     subname, dbname);
    1220             :     else
    1221             :     {
    1222           2 :         pg_log_info("dropping subscription \"%s\" in database \"%s\"",
    1223             :                     subname, dbname);
    1224             : 
    1225           2 :         res = PQexec(conn, query->data);
    1226             : 
    1227           2 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1228             :         {
    1229           0 :             pg_log_error("could not drop subscription \"%s\": %s",
    1230             :                          subname, PQresultErrorMessage(res));
    1231           0 :             disconnect_database(conn, true);
    1232             :         }
    1233             : 
    1234           2 :         PQclear(res);
    1235             :     }
    1236             : 
    1237           8 :     destroyPQExpBuffer(query);
    1238           8 : }
    1239             : 
    1240             : /*
    1241             :  * Retrieve and drop the pre-existing subscriptions.
    1242             :  */
    1243             : static void
    1244          16 : check_and_drop_existing_subscriptions(PGconn *conn,
    1245             :                                       const struct LogicalRepInfo *dbinfo)
    1246             : {
    1247          16 :     PQExpBuffer query = createPQExpBuffer();
    1248             :     char       *dbname;
    1249             :     PGresult   *res;
    1250             : 
    1251             :     Assert(conn != NULL);
    1252             : 
    1253          16 :     dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
    1254             : 
    1255          16 :     appendPQExpBuffer(query,
    1256             :                       "SELECT s.subname FROM pg_catalog.pg_subscription s "
    1257             :                       "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
    1258             :                       "WHERE d.datname = %s",
    1259             :                       dbname);
    1260          16 :     res = PQexec(conn, query->data);
    1261             : 
    1262          16 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1263             :     {
    1264           0 :         pg_log_error("could not obtain pre-existing subscriptions: %s",
    1265             :                      PQresultErrorMessage(res));
    1266           0 :         disconnect_database(conn, true);
    1267             :     }
    1268             : 
    1269          24 :     for (int i = 0; i < PQntuples(res); i++)
    1270           8 :         drop_existing_subscription(conn, PQgetvalue(res, i, 0),
    1271           8 :                                    dbinfo->dbname);
    1272             : 
    1273          16 :     PQclear(res);
    1274          16 :     destroyPQExpBuffer(query);
    1275          16 :     PQfreemem(dbname);
    1276          16 : }
    1277             : 
    1278             : /*
    1279             :  * Create the subscriptions, adjust the initial location for logical
    1280             :  * replication and enable the subscriptions. That's the last step for logical
    1281             :  * replication setup.
    1282             :  */
    1283             : static void
    1284           8 : setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
    1285             : {
    1286          24 :     for (int i = 0; i < num_dbs; i++)
    1287             :     {
    1288             :         PGconn     *conn;
    1289             : 
    1290             :         /* Connect to subscriber. */
    1291          16 :         conn = connect_database(dbinfo[i].subconninfo, true);
    1292             : 
    1293             :         /*
    1294             :          * We don't need the pre-existing subscriptions on the newly formed
    1295             :          * subscriber. They can connect to other publisher nodes and either
    1296             :          * get some unwarranted data or can lead to ERRORs in connecting to
    1297             :          * such nodes.
    1298             :          */
    1299          16 :         check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
    1300             : 
    1301             :         /* Check and drop the required publications in the given database. */
    1302          16 :         check_and_drop_publications(conn, &dbinfo[i]);
    1303             : 
    1304          16 :         create_subscription(conn, &dbinfo[i]);
    1305             : 
    1306             :         /* Set the replication progress to the correct LSN */
    1307          16 :         set_replication_progress(conn, &dbinfo[i], consistent_lsn);
    1308             : 
    1309             :         /* Enable subscription */
    1310          16 :         enable_subscription(conn, &dbinfo[i]);
    1311             : 
    1312          16 :         disconnect_database(conn, false);
    1313             :     }
    1314           8 : }
    1315             : 
    1316             : /*
    1317             :  * Write the required recovery parameters.
    1318             :  */
    1319             : static void
    1320           8 : setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
    1321             : {
    1322             :     PGconn     *conn;
    1323             :     PQExpBuffer recoveryconfcontents;
    1324             : 
    1325             :     /*
    1326             :      * Despite of the recovery parameters will be written to the subscriber,
    1327             :      * use a publisher connection. The primary_conninfo is generated using the
    1328             :      * connection settings.
    1329             :      */
    1330           8 :     conn = connect_database(dbinfo[0].pubconninfo, true);
    1331             : 
    1332             :     /*
    1333             :      * Write recovery parameters.
    1334             :      *
    1335             :      * The subscriber is not running yet. In dry run mode, the recovery
    1336             :      * parameters *won't* be written. An invalid LSN is used for printing
    1337             :      * purposes. Additional recovery parameters are added here. It avoids
    1338             :      * unexpected behavior such as end of recovery as soon as a consistent
    1339             :      * state is reached (recovery_target) and failure due to multiple recovery
    1340             :      * targets (name, time, xid, LSN).
    1341             :      */
    1342           8 :     recoveryconfcontents = GenerateRecoveryConfig(conn, NULL, NULL);
    1343           8 :     appendPQExpBufferStr(recoveryconfcontents, "recovery_target = ''\n");
    1344           8 :     appendPQExpBufferStr(recoveryconfcontents,
    1345             :                          "recovery_target_timeline = 'latest'\n");
    1346             : 
    1347             :     /*
    1348             :      * Set recovery_target_inclusive = false to avoid reapplying the
    1349             :      * transaction committed at 'lsn' after subscription is enabled. This is
    1350             :      * because the provided 'lsn' is also used as the replication start point
    1351             :      * for the subscription. So, the server can send the transaction committed
    1352             :      * at that 'lsn' after replication is started which can lead to applying
    1353             :      * the same transaction twice if we keep recovery_target_inclusive = true.
    1354             :      */
    1355           8 :     appendPQExpBufferStr(recoveryconfcontents,
    1356             :                          "recovery_target_inclusive = false\n");
    1357           8 :     appendPQExpBufferStr(recoveryconfcontents,
    1358             :                          "recovery_target_action = promote\n");
    1359           8 :     appendPQExpBufferStr(recoveryconfcontents, "recovery_target_name = ''\n");
    1360           8 :     appendPQExpBufferStr(recoveryconfcontents, "recovery_target_time = ''\n");
    1361           8 :     appendPQExpBufferStr(recoveryconfcontents, "recovery_target_xid = ''\n");
    1362             : 
    1363           8 :     if (dry_run)
    1364             :     {
    1365           6 :         appendPQExpBufferStr(recoveryconfcontents, "# dry run mode\n");
    1366           6 :         appendPQExpBuffer(recoveryconfcontents,
    1367             :                           "recovery_target_lsn = '%X/%08X'\n",
    1368             :                           LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
    1369             :     }
    1370             :     else
    1371             :     {
    1372           2 :         appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
    1373             :                           lsn);
    1374             :     }
    1375             : 
    1376           8 :     pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
    1377             : 
    1378           8 :     if (!dry_run)
    1379             :     {
    1380             :         char        conf_filename[MAXPGPATH];
    1381             :         FILE       *fd;
    1382             : 
    1383             :         /* Write the recovery parameters to INCLUDED_CONF_FILE */
    1384           2 :         snprintf(conf_filename, MAXPGPATH, "%s/%s", datadir,
    1385             :                  INCLUDED_CONF_FILE);
    1386           2 :         fd = fopen(conf_filename, "w");
    1387           2 :         if (fd == NULL)
    1388           0 :             pg_fatal("could not open file \"%s\": %m", conf_filename);
    1389             : 
    1390           2 :         if (fwrite(recoveryconfcontents->data, recoveryconfcontents->len, 1, fd) != 1)
    1391           0 :             pg_fatal("could not write to file \"%s\": %m", conf_filename);
    1392             : 
    1393           2 :         fclose(fd);
    1394           2 :         recovery_params_set = true;
    1395             : 
    1396             :         /* Include conditionally the recovery parameters. */
    1397           2 :         resetPQExpBuffer(recoveryconfcontents);
    1398           2 :         appendPQExpBufferStr(recoveryconfcontents,
    1399             :                              "include_if_exists '" INCLUDED_CONF_FILE "'\n");
    1400           2 :         WriteRecoveryConfig(conn, datadir, recoveryconfcontents);
    1401             :     }
    1402             : 
    1403           8 :     disconnect_database(conn, false);
    1404           8 : }
    1405             : 
    1406             : /*
    1407             :  * Drop physical replication slot on primary if the standby was using it. After
    1408             :  * the transformation, it has no use.
    1409             :  *
    1410             :  * XXX we might not fail here. Instead, we provide a warning so the user
    1411             :  * eventually drops this replication slot later.
    1412             :  */
    1413             : static void
    1414           8 : drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
    1415             : {
    1416             :     PGconn     *conn;
    1417             : 
    1418             :     /* Replication slot does not exist, do nothing */
    1419           8 :     if (!primary_slot_name)
    1420           0 :         return;
    1421             : 
    1422           8 :     conn = connect_database(dbinfo[0].pubconninfo, false);
    1423           8 :     if (conn != NULL)
    1424             :     {
    1425           8 :         drop_replication_slot(conn, &dbinfo[0], slotname);
    1426           8 :         disconnect_database(conn, false);
    1427             :     }
    1428             :     else
    1429             :     {
    1430           0 :         pg_log_warning("could not drop replication slot \"%s\" on primary",
    1431             :                        slotname);
    1432           0 :         pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
    1433             :     }
    1434             : }
    1435             : 
    1436             : /*
    1437             :  * Drop failover replication slots on subscriber. After the transformation,
    1438             :  * they have no use.
    1439             :  *
    1440             :  * XXX We do not fail here. Instead, we provide a warning so the user can drop
    1441             :  * them later.
    1442             :  */
    1443             : static void
    1444           8 : drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
    1445             : {
    1446             :     PGconn     *conn;
    1447             :     PGresult   *res;
    1448             : 
    1449           8 :     conn = connect_database(dbinfo[0].subconninfo, false);
    1450           8 :     if (conn != NULL)
    1451             :     {
    1452             :         /* Get failover replication slot names */
    1453           8 :         res = PQexec(conn,
    1454             :                      "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
    1455             : 
    1456           8 :         if (PQresultStatus(res) == PGRES_TUPLES_OK)
    1457             :         {
    1458             :             /* Remove failover replication slots from subscriber */
    1459          16 :             for (int i = 0; i < PQntuples(res); i++)
    1460           8 :                 drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
    1461             :         }
    1462             :         else
    1463             :         {
    1464           0 :             pg_log_warning("could not obtain failover replication slot information: %s",
    1465             :                            PQresultErrorMessage(res));
    1466           0 :             pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
    1467             :         }
    1468             : 
    1469           8 :         PQclear(res);
    1470           8 :         disconnect_database(conn, false);
    1471             :     }
    1472             :     else
    1473             :     {
    1474           0 :         pg_log_warning("could not drop failover replication slot");
    1475           0 :         pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
    1476             :     }
    1477           8 : }
    1478             : 
    1479             : /*
    1480             :  * Create a logical replication slot and returns a LSN.
    1481             :  *
    1482             :  * CreateReplicationSlot() is not used because it does not provide the one-row
    1483             :  * result set that contains the LSN.
    1484             :  */
    1485             : static char *
    1486          16 : create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
    1487             : {
    1488          16 :     PQExpBuffer str = createPQExpBuffer();
    1489          16 :     PGresult   *res = NULL;
    1490          16 :     const char *slot_name = dbinfo->replslotname;
    1491             :     char       *slot_name_esc;
    1492          16 :     char       *lsn = NULL;
    1493             : 
    1494             :     Assert(conn != NULL);
    1495             : 
    1496          16 :     if (dry_run)
    1497          12 :         pg_log_info("dry-run: would create the replication slot \"%s\" in database \"%s\" on publisher",
    1498             :                     slot_name, dbinfo->dbname);
    1499             :     else
    1500           4 :         pg_log_info("creating the replication slot \"%s\" in database \"%s\" on publisher",
    1501             :                     slot_name, dbinfo->dbname);
    1502             : 
    1503          16 :     slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
    1504             : 
    1505          16 :     appendPQExpBuffer(str,
    1506             :                       "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
    1507             :                       slot_name_esc,
    1508          16 :                       dbinfos.two_phase ? "true" : "false");
    1509             : 
    1510          16 :     PQfreemem(slot_name_esc);
    1511             : 
    1512          16 :     pg_log_debug("command is: %s", str->data);
    1513             : 
    1514          16 :     if (!dry_run)
    1515             :     {
    1516           4 :         res = PQexec(conn, str->data);
    1517           4 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1518             :         {
    1519           0 :             pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
    1520             :                          slot_name, dbinfo->dbname,
    1521             :                          PQresultErrorMessage(res));
    1522           0 :             PQclear(res);
    1523           0 :             destroyPQExpBuffer(str);
    1524           0 :             return NULL;
    1525             :         }
    1526             : 
    1527           4 :         lsn = pg_strdup(PQgetvalue(res, 0, 0));
    1528           4 :         PQclear(res);
    1529             :     }
    1530             : 
    1531             :     /* For cleanup purposes */
    1532          16 :     dbinfo->made_replslot = true;
    1533             : 
    1534          16 :     destroyPQExpBuffer(str);
    1535             : 
    1536          16 :     return lsn;
    1537             : }
    1538             : 
    1539             : static void
    1540          16 : drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
    1541             :                       const char *slot_name)
    1542             : {
    1543          16 :     PQExpBuffer str = createPQExpBuffer();
    1544             :     char       *slot_name_esc;
    1545             :     PGresult   *res;
    1546             : 
    1547             :     Assert(conn != NULL);
    1548             : 
    1549          16 :     if (dry_run)
    1550          12 :         pg_log_info("dry-run: would drop the replication slot \"%s\" in database \"%s\"",
    1551             :                     slot_name, dbinfo->dbname);
    1552             :     else
    1553           4 :         pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
    1554             :                     slot_name, dbinfo->dbname);
    1555             : 
    1556          16 :     slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
    1557             : 
    1558          16 :     appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
    1559             : 
    1560          16 :     PQfreemem(slot_name_esc);
    1561             : 
    1562          16 :     pg_log_debug("command is: %s", str->data);
    1563             : 
    1564          16 :     if (!dry_run)
    1565             :     {
    1566           4 :         res = PQexec(conn, str->data);
    1567           4 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1568             :         {
    1569           0 :             pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
    1570             :                          slot_name, dbinfo->dbname, PQresultErrorMessage(res));
    1571           0 :             dbinfo->made_replslot = false;   /* don't try again. */
    1572             :         }
    1573             : 
    1574           4 :         PQclear(res);
    1575             :     }
    1576             : 
    1577          16 :     destroyPQExpBuffer(str);
    1578          16 : }
    1579             : 
    1580             : /*
    1581             :  * Reports a suitable message if pg_ctl fails.
    1582             :  */
    1583             : static void
    1584          48 : pg_ctl_status(const char *pg_ctl_cmd, int rc)
    1585             : {
    1586          48 :     if (rc != 0)
    1587             :     {
    1588           0 :         if (WIFEXITED(rc))
    1589             :         {
    1590           0 :             pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
    1591             :         }
    1592           0 :         else if (WIFSIGNALED(rc))
    1593             :         {
    1594             : #if defined(WIN32)
    1595             :             pg_log_error("pg_ctl was terminated by exception 0x%X",
    1596             :                          WTERMSIG(rc));
    1597             :             pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
    1598             : #else
    1599           0 :             pg_log_error("pg_ctl was terminated by signal %d: %s",
    1600             :                          WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
    1601             : #endif
    1602             :         }
    1603             :         else
    1604             :         {
    1605           0 :             pg_log_error("pg_ctl exited with unrecognized status %d", rc);
    1606             :         }
    1607             : 
    1608           0 :         pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
    1609           0 :         exit(1);
    1610             :     }
    1611          48 : }
    1612             : 
    1613             : static void
    1614          24 : start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
    1615             :                      bool restrict_logical_worker)
    1616             : {
    1617          24 :     PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
    1618             :     int         rc;
    1619             : 
    1620          24 :     appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
    1621          24 :     appendShellString(pg_ctl_cmd, subscriber_dir);
    1622          24 :     appendPQExpBufferStr(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
    1623             : 
    1624             :     /* Prevent unintended slot invalidation */
    1625          24 :     appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
    1626             : 
    1627          24 :     if (restricted_access)
    1628             :     {
    1629          24 :         appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
    1630             : #if !defined(WIN32)
    1631             : 
    1632             :         /*
    1633             :          * An empty listen_addresses list means the server does not listen on
    1634             :          * any IP interfaces; only Unix-domain sockets can be used to connect
    1635             :          * to the server. Prevent external connections to minimize the chance
    1636             :          * of failure.
    1637             :          */
    1638          24 :         appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
    1639          24 :         if (opt->socket_dir)
    1640          24 :             appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
    1641          24 :                               opt->socket_dir);
    1642          24 :         appendPQExpBufferChar(pg_ctl_cmd, '"');
    1643             : #endif
    1644             :     }
    1645          24 :     if (opt->config_file != NULL)
    1646           0 :         appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
    1647           0 :                           opt->config_file);
    1648             : 
    1649             :     /* Suppress to start logical replication if requested */
    1650          24 :     if (restrict_logical_worker)
    1651           8 :         appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
    1652             : 
    1653          24 :     pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
    1654          24 :     rc = system(pg_ctl_cmd->data);
    1655          24 :     pg_ctl_status(pg_ctl_cmd->data, rc);
    1656          24 :     standby_running = true;
    1657          24 :     destroyPQExpBuffer(pg_ctl_cmd);
    1658          24 :     pg_log_info("server was started");
    1659          24 : }
    1660             : 
    1661             : static void
    1662          24 : stop_standby_server(const char *datadir)
    1663             : {
    1664             :     char       *pg_ctl_cmd;     /* command line handed to system() */
    1665             :     int         rc;             /* value returned by system() */
    1666             :     /*
                     :      * Stop the server whose data directory is datadir by running
                     :      * "pg_ctl stop -s" through system(). The command string and the value
                     :      * returned by system() are passed to pg_ctl_status(), after which the
                     :      * standby_running flag is cleared.
                     :      *
                     :      * NOTE(review): pg_ctl_cmd is allocated by psprintf() and is not
                     :      * released anywhere in this function.
                     :      */
    1667          24 :     pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
    1668             :                           datadir);
    1669          24 :     pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
    1670          24 :     rc = system(pg_ctl_cmd);
    1671          24 :     pg_ctl_status(pg_ctl_cmd, rc);
    1672          24 :     standby_running = false;
    1673          24 :     pg_log_info("server was stopped");
    1674          24 : }
    1675             : 
    1676             : /*
    1677             :  * Wait until the target server has finished recovery, then return.
    1678             :  *
    1679             :  * conninfo is the connection string used to poll the server; opt supplies
    1680             :  * recovery_timeout, expressed in seconds. The server is polled with
                     :  * server_is_in_recovery() every WAIT_INTERVAL seconds. If recovery_timeout
                     :  * is greater than zero and at least that much time has elapsed, the
                     :  * standby is stopped and an error is reported. When recovery_timeout is
                     :  * zero, it waits forever.
                     :  *
                     :  * In dry-run mode the recovery check is skipped: the loop is left on its
                     :  * first iteration and recovery_ended is still set.
    1681             :  *
    1682             :  * XXX There is no way to tell whether recovery is still making progress.
    1683             :  * When recovery has a better progress reporting mechanism, it should be
                     :  * used here.
    1684             :  */
    1685             : static void
    1686           8 : wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
    1687             : {
    1688             :     PGconn     *conn;
    1689           8 :     bool        ready = false;  /* set just before leaving the loop */
    1690           8 :     int         timer = 0;      /* seconds waited so far */
    1691             : 
    1692           8 :     pg_log_info("waiting for the target server to reach the consistent state");
    1693             : 
    1694           8 :     conn = connect_database(conninfo, true);
    1695             : 
    1696             :     for (;;)
    1697             :     {
    1698             :         /* Did the recovery process finish? We're done if so. */
    1699           8 :         if (dry_run || !server_is_in_recovery(conn))
    1700             :         {
    1701           8 :             ready = true;
    1702           8 :             recovery_ended = true;
    1703           8 :             break;
    1704             :         }
    1705             : 
    1706             :         /*
                     :          * Bail out after recovery_timeout seconds if this option is set.
                     :          *
                     :          * NOTE(review): there is no break or return in this branch, so it
                     :          * relies on disconnect_database(conn, true) not returning
                     :          * (presumably it exits the program -- confirm). Otherwise the loop
                     :          * would keep polling with a closed connection.
                     :          */
    1707           0 :         if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
    1708             :         {
    1709           0 :             stop_standby_server(subscriber_dir);
    1710           0 :             pg_log_error("recovery timed out");
    1711           0 :             disconnect_database(conn, true);
    1712             :         }
    1713             : 
    1714             :         /* Keep waiting: sleep one interval and account for it */
    1715           0 :         pg_usleep(WAIT_INTERVAL * USECS_PER_SEC);
    1716           0 :         timer += WAIT_INTERVAL;
    1717             :     }
    1718             : 
    1719           8 :     disconnect_database(conn, false);
    1720             : 
    1721             :     /*
                     :      * The only break out of the loop above sets ready to true, so this is
                     :      * a defensive check that is not expected to fire.
                     :      */
    1721           8 :     if (!ready)
    1722           0 :         pg_fatal("server did not end recovery");
    1723             : 
    1724           8 :     pg_log_info("target server reached the consistent state");
    1725           8 :     pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
    1726           8 : }
    1727             : 
    1728             : /*
    1729             :  * Create a publication that includes all tables in the database.
                     :  *
                     :  * conn is an open connection to the database; dbinfo supplies the
                     :  * publication name (pubname) and the database name used in messages
                     :  * (dbname). The function first checks pg_publication for an existing
                     :  * publication with the same name and reports an error if one is found.
                     :  * In dry-run mode the CREATE PUBLICATION command is built and logged but
                     :  * not executed. In both modes dbinfo->made_publication is set to true
                     :  * before returning.
    1730             :  */
    1731             : static void
    1732          14 : create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
    1733             : {
    1734          14 :     PQExpBuffer str = createPQExpBuffer();
    1735             :     PGresult   *res;
    1736             :     char       *ipubname_esc;   /* pubname escaped as an SQL identifier */
    1737             :     char       *spubname_esc;   /* pubname escaped as an SQL string literal */
    1738             : 
    1739             :     Assert(conn != NULL);
    1740             : 
    1741          14 :     ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
    1742          14 :     spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
    1743             : 
    1744             :     /* Check if the publication already exists */
    1745          14 :     appendPQExpBuffer(str,
    1746             :                       "SELECT 1 FROM pg_catalog.pg_publication "
    1747             :                       "WHERE pubname = %s",
    1748             :                       spubname_esc);
    1749          14 :     res = PQexec(conn, str->data);
    1750          14 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1751             :     {
    1752           0 :         pg_log_error("could not obtain publication information: %s",
    1753             :                      PQresultErrorMessage(res));
    1754           0 :         disconnect_database(conn, true);
    1755             :     }
    1756             : 
    1757          14 :     if (PQntuples(res) == 1)
    1758             :     {
    1759             :         /*
    1760             :          * Unfortunately, if it reaches this code path, it will always fail
    1761             :          * (unless you decide to change the existing publication name). That's
    1762             :          * bad but it is very unlikely that the user will choose a name with
    1763             :          * pg_createsubscriber_ prefix followed by the exact database oid and
    1764             :          * a random number.
    1765             :          */
    1766           0 :         pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
    1767           0 :         pg_log_error_hint("Consider renaming this publication before continuing.");
    1768           0 :         disconnect_database(conn, true);
    1769             :     }
    1770             : 
    1771          14 :     PQclear(res);
    1772          14 :     resetPQExpBuffer(str);  /* reuse the buffer for the CREATE command */
    1773             : 
    1774          14 :     if (dry_run)
    1775          12 :         pg_log_info("dry-run: would create publication \"%s\" in database \"%s\"",
    1776             :                     dbinfo->pubname, dbinfo->dbname);
    1777             :     else
    1778           2 :         pg_log_info("creating publication \"%s\" in database \"%s\"",
    1779             :                     dbinfo->pubname, dbinfo->dbname);
    1780             : 
    1781          14 :     appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
    1782             :                       ipubname_esc);
    1783             : 
    1784          14 :     pg_log_debug("command is: %s", str->data);
    1785             : 
    1786             :     /* The command is only sent to the server outside dry-run mode */
    1786          14 :     if (!dry_run)
    1787             :     {
    1788           2 :         res = PQexec(conn, str->data);
    1789           2 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1790             :         {
    1791           0 :             pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
    1792             :                          dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
    1793           0 :             disconnect_database(conn, true);
    1794             :         }
    1795           2 :         PQclear(res);
    1796             :     }
    1797             : 
    1798             :     /* For cleanup purposes; note this is set in dry-run mode as well */
    1799          14 :     dbinfo->made_publication = true;
    1800             : 
    1801          14 :     PQfreemem(ipubname_esc);
    1802          14 :     PQfreemem(spubname_esc);
    1803          14 :     destroyPQExpBuffer(str);
    1804          14 : }
    1805             : 
    1806             : /*
    1807             :  * Drop the specified publication in the given database.
                     :  *
                     :  * conn is an open connection to the database; pubname is the publication
                     :  * to drop; dbname is only used in log messages. made_publication points
                     :  * to a flag that is set to false if the DROP PUBLICATION command fails,
                     :  * so that the drop is not attempted again. In dry-run mode the command
                     :  * is built and logged but not executed. A failure is logged and the
                     :  * function returns normally.
    1808             :  */
    1809             : static void
    1810          20 : drop_publication(PGconn *conn, const char *pubname, const char *dbname,
    1811             :                  bool *made_publication)
    1812             : {
    1813          20 :     PQExpBuffer str = createPQExpBuffer();
    1814             :     PGresult   *res;
    1815             :     char       *pubname_esc;    /* pubname escaped as an SQL identifier */
    1816             : 
    1817             :     Assert(conn != NULL);
    1818             : 
    1819          20 :     pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
    1820             : 
    1821          20 :     if (dry_run)
    1822          12 :         pg_log_info("dry-run: would drop publication \"%s\" in database \"%s\"",
    1823             :                     pubname, dbname);
    1824             :     else
    1825           8 :         pg_log_info("dropping publication \"%s\" in database \"%s\"",
    1826             :                     pubname, dbname);
    1827             : 
    1828          20 :     appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
    1829             :     /* The escaped name has been copied into str, so it can go now */
    1830          20 :     PQfreemem(pubname_esc);
    1831             : 
    1832          20 :     pg_log_debug("command is: %s", str->data);
    1833             : 
    1834          20 :     if (!dry_run)
    1835             :     {
    1836           8 :         res = PQexec(conn, str->data);
    1837           8 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1838             :         {
    1839           0 :             pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
    1840             :                          pubname, dbname, PQresultErrorMessage(res));
    1841           0 :             *made_publication = false;  /* don't try again. */
    1842             : 
    1843             :             /*
    1844             :              * Don't disconnect and exit here. This routine is used by primary
    1845             :              * (cleanup publication / replication slot due to an error) and
    1846             :              * subscriber (remove the replicated publications). In both cases,
    1847             :              * it can continue and provide instructions for the user to remove
    1848             :              * it later if cleanup fails.
    1849             :              */
    1850             :         }
    1851           8 :         PQclear(res);
    1852             :     }
    1853             : 
    1854          20 :     destroyPQExpBuffer(str);
    1855          20 : }
    1856             : 
    1857             : /*
    1858             :  * Retrieve and drop the publications.
    1859             :  *
    1860             :  * Publications copied during physical replication remain on the subscriber
    1861             :  * after promotion. If --clean=publications is specified, drop all existing
    1862             :  * publications in the subscriber database. Otherwise, only drop publications
    1863             :  * that were created by pg_createsubscriber during this operation.
                     :  *
                     :  * conn is an open connection to the database described by dbinfo. The
                     :  * "drop all" mode is selected when OBJECTTYPE_PUBLICATIONS is set in
                     :  * dbinfos.objecttypes_to_clean. Each individual drop is delegated to
                     :  * drop_publication(), which is handed &dbinfo->made_publication.
    1864             :  */
    1865             : static void
    1866          16 : check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
    1867             : {
    1868             :     PGresult   *res;
    1869          16 :     bool        drop_all_pubs = dbinfos.objecttypes_to_clean & OBJECTTYPE_PUBLICATIONS;
    1870             : 
    1871             :     Assert(conn != NULL);
    1872             : 
    1873          16 :     if (drop_all_pubs)
    1874             :     {
    1875             :         /* NOTE(review): this message has no dry-run variant */
    1875           4 :         pg_log_info("dropping all existing publications in database \"%s\"",
    1876             :                     dbinfo->dbname);
    1877             : 
    1878             :         /* Fetch all publication names */
    1879           4 :         res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
    1880           4 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1881             :         {
    1882           0 :             pg_log_error("could not obtain publication information: %s",
    1883             :                          PQresultErrorMessage(res));
    1884           0 :             PQclear(res);
    1885           0 :             disconnect_database(conn, true);
    1886             :         }
    1887             : 
    1888             :         /* Drop each publication; column 0 of every row is its name */
    1889          12 :         for (int i = 0; i < PQntuples(res); i++)
    1890           8 :             drop_publication(conn, PQgetvalue(res, i, 0), dbinfo->dbname,
    1891             :                              &dbinfo->made_publication);
    1892             : 
    1893           4 :         PQclear(res);
    1894             :     }
    1895             :     else
    1896             :     {
    1897             :         /* Drop publication only if it was created by this tool */
    1898          12 :         if (dbinfo->made_publication)
    1899             :         {
    1900          12 :             drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
    1901             :                              &dbinfo->made_publication);
    1902             :         }
    1903             :         else
    1904             :         {
    1905             :             /* Nothing is dropped here; only report that it is kept */
    1905           0 :             if (dry_run)
    1906           0 :                 pg_log_info("dry-run: would preserve existing publication \"%s\" in database \"%s\"",
    1907             :                             dbinfo->pubname, dbinfo->dbname);
    1908             :             else
    1909           0 :                 pg_log_info("preserve existing publication \"%s\" in database \"%s\"",
    1910             :                             dbinfo->pubname, dbinfo->dbname);
    1911             :         }
    1912             :     }
    1913          16 : }
    1914             : 
    1915             : /*
    1916             :  * Create a subscription with some predefined options.
    1917             :  *
    1918             :  * A replication slot was already created in a previous step. Let's use it.  It
    1919             :  * is not required to copy data. The subscription will be created but it will
    1920             :  * not be enabled now. That's because the replication progress must be set and
    1921             :  * the replication origin name (one of the function arguments) contains the
    1922             :  * subscription OID in its name. Once the subscription is created,
    1923             :  * set_replication_progress() can obtain the chosen origin name and set up its
    1924             :  * initial location.
                     :  *
                     :  * conn is an open connection to the database; dbinfo supplies the
                     :  * subscription name, publication name, publisher connection string and
                     :  * replication slot name. The two_phase option is taken from
                     :  * dbinfos.two_phase. In dry-run mode the command is built and logged but
                     :  * not executed.
    1925             :  */
    1926             : static void
    1927          16 : create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
    1928             : {
    1929          16 :     PQExpBuffer str = createPQExpBuffer();
    1930             :     PGresult   *res;
    1931             :     char       *pubname_esc;        /* escaped as an SQL identifier */
    1932             :     char       *subname_esc;        /* escaped as an SQL identifier */
    1933             :     char       *pubconninfo_esc;    /* escaped as an SQL string literal */
    1934             :     char       *replslotname_esc;   /* escaped as an SQL string literal */
    1935             : 
    1936             :     Assert(conn != NULL);
    1937             : 
    1938          16 :     pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
    1939          16 :     subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
    1940          16 :     pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
    1941          16 :     replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
    1942             : 
    1943          16 :     if (dry_run)
    1944          12 :         pg_log_info("dry-run: would create subscription \"%s\" in database \"%s\"",
    1945             :                     dbinfo->subname, dbinfo->dbname);
    1946             :     else
    1947           4 :         pg_log_info("creating subscription \"%s\" in database \"%s\"",
    1948             :                     dbinfo->subname, dbinfo->dbname);
    1949             : 
    1950          16 :     appendPQExpBuffer(str,
    1951             :                       "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
    1952             :                       "WITH (create_slot = false, enabled = false, "
    1953             :                       "slot_name = %s, copy_data = false, two_phase = %s)",
    1954             :                       subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
    1955          16 :                       dbinfos.two_phase ? "true" : "false");
    1956             :     /* The escaped strings have been copied into str; release them */
    1957          16 :     PQfreemem(pubname_esc);
    1958          16 :     PQfreemem(subname_esc);
    1959          16 :     PQfreemem(pubconninfo_esc);
    1960          16 :     PQfreemem(replslotname_esc);
    1961             : 
    1962          16 :     pg_log_debug("command is: %s", str->data);
    1963             : 
    1964          16 :     if (!dry_run)
    1965             :     {
    1966           4 :         res = PQexec(conn, str->data);
    1967           4 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1968             :         {
    1969           0 :             pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
    1970             :                          dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
    1971           0 :             disconnect_database(conn, true);
    1972             :         }
    1973           4 :         PQclear(res);
    1974             :     }
    1975             : 
    1976          16 :     destroyPQExpBuffer(str);
    1977          16 : }
    1978             : 
    1979             : /*
    1980             :  * Sets the replication progress to the consistent LSN.
    1981             :  *
    1982             :  * The subscriber caught up to the consistent LSN provided by the last
    1983             :  * replication slot that was created. The goal is to set up the initial
    1984             :  * location for the logical replication that is the exact LSN that the
    1985             :  * subscriber was promoted. Once the subscription is enabled it will start
    1986             :  * streaming from that location onwards.  In dry run mode, the subscription OID
    1987             :  * and LSN are set to invalid values for printing purposes.
                     :  *
                     :  * conn is an open connection to the database; dbinfo supplies the
                     :  * subscription and database names used to look up the subscription OID;
                     :  * lsn is the textual LSN passed to pg_replication_origin_advance() when
                     :  * not in dry-run mode. The OID lookup is executed in both modes, but the
                     :  * origin is only advanced outside dry-run mode.
    1988             :  */
    1989             : static void
    1990          16 : set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
    1991             : {
    1992          16 :     PQExpBuffer str = createPQExpBuffer();
    1993             :     PGresult   *res;
    1994             :     Oid         suboid;
    1995             :     char       *subname;        /* subscription name as an SQL literal */
    1996             :     char       *dbname;         /* database name as an SQL literal */
    1997             :     char       *originname;     /* replication origin name, "pg_<suboid>" */
    1998             :     char       *lsnstr;         /* LSN text used in the message and command */
    1999             : 
    2000             :     Assert(conn != NULL);
    2001             : 
    2002          16 :     subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
    2003          16 :     dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
    2004             : 
    2005          16 :     appendPQExpBuffer(str,
    2006             :                       "SELECT s.oid FROM pg_catalog.pg_subscription s "
    2007             :                       "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
    2008             :                       "WHERE s.subname = %s AND d.datname = %s",
    2009             :                       subname, dbname);
    2010             : 
    2011          16 :     res = PQexec(conn, str->data);
    2012          16 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    2013             :     {
    2014           0 :         pg_log_error("could not obtain subscription OID: %s",
    2015             :                      PQresultErrorMessage(res));
    2016           0 :         disconnect_database(conn, true);
    2017             :     }
    2018             : 
    2019             :     /* The row count is not enforced in dry-run mode */
    2019          16 :     if (PQntuples(res) != 1 && !dry_run)
    2020             :     {
    2021           0 :         pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
    2022             :                      PQntuples(res), 1);
    2023           0 :         disconnect_database(conn, true);
    2024             :     }
    2025             : 
    2026          16 :     if (dry_run)
    2027             :     {
    2028          12 :         suboid = InvalidOid;
    2029          12 :         lsnstr = psprintf("%X/%08X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
    2030             :     }
    2031             :     else
    2032             :     {
    2033           4 :         suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
    2034           4 :         lsnstr = psprintf("%s", lsn);
    2035             :     }
    2036             : 
    2037          16 :     PQclear(res);
    2038             : 
    2039             :     /*
    2040             :      * The origin name is defined as pg_%u. %u is the subscription OID. See
    2041             :      * ApplyWorkerMain().
    2042             :      */
    2043          16 :     originname = psprintf("pg_%u", suboid);
    2044             : 
    2045          16 :     if (dry_run)
    2046          12 :         pg_log_info("dry-run: would set the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
    2047             :                     originname, lsnstr, dbinfo->dbname);
    2048             :     else
    2049           4 :         pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
    2050             :                     originname, lsnstr, dbinfo->dbname);
    2051             : 
    2052             :     /*
                     :      * NOTE(review): originname and lsnstr are placed between single quotes
                     :      * without escaping. originname is built from an OID only; lsnstr is
                     :      * presumably a well-formed LSN supplied by the caller -- confirm.
                     :      */
    2052          16 :     resetPQExpBuffer(str);
    2053          16 :     appendPQExpBuffer(str,
    2054             :                       "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
    2055             :                       originname, lsnstr);
    2056             : 
    2057          16 :     pg_log_debug("command is: %s", str->data);
    2058             : 
    2059          16 :     if (!dry_run)
    2060             :     {
    2061           4 :         res = PQexec(conn, str->data);
    2062           4 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    2063             :         {
    2064           0 :             pg_log_error("could not set replication progress for subscription \"%s\": %s",
    2065             :                          dbinfo->subname, PQresultErrorMessage(res));
    2066           0 :             disconnect_database(conn, true);
    2067             :         }
    2068           4 :         PQclear(res);
    2069             :     }
    2070             : 
    2071          16 :     PQfreemem(subname);
    2072          16 :     PQfreemem(dbname);
    2073          16 :     pg_free(originname);
    2074          16 :     pg_free(lsnstr);
    2075          16 :     destroyPQExpBuffer(str);
    2076          16 : }
    2077             : 
    2078             : /*
    2079             :  * Enables the subscription.
    2080             :  *
    2081             :  * The subscription was created in a previous step but it was disabled. After
    2082             :  * adjusting the initial logical replication location, enable the subscription.
                     :  *
                     :  * conn is an open connection to the database; dbinfo supplies the
                     :  * subscription name and the database name used in messages. In dry-run
                     :  * mode the ALTER SUBSCRIPTION command is built and logged but not
                     :  * executed.
    2083             :  */
    2084             : static void
    2085          16 : enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
    2086             : {
    2087          16 :     PQExpBuffer str = createPQExpBuffer();
    2088             :     PGresult   *res;
    2089             :     char       *subname;        /* subscription name as an SQL identifier */
    2090             : 
    2091             :     Assert(conn != NULL);
    2092             : 
    2093          16 :     subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
    2094             : 
    2095          16 :     if (dry_run)
    2096          12 :         pg_log_info("dry-run: would enable subscription \"%s\" in database \"%s\"",
    2097             :                     dbinfo->subname, dbinfo->dbname);
    2098             :     else
    2099           4 :         pg_log_info("enabling subscription \"%s\" in database \"%s\"",
    2100             :                     dbinfo->subname, dbinfo->dbname);
    2101             : 
    2102          16 :     appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
    2103             : 
    2104          16 :     pg_log_debug("command is: %s", str->data);
    2105             : 
    2106          16 :     if (!dry_run)
    2107             :     {
    2108           4 :         res = PQexec(conn, str->data);
    2109           4 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    2110             :         {
    2111           0 :             pg_log_error("could not enable subscription \"%s\": %s",
    2112             :                          dbinfo->subname, PQresultErrorMessage(res));
    2113           0 :             disconnect_database(conn, true);
    2114             :         }
    2115             : 
    2116           4 :         PQclear(res);
    2117             :     }
    2118             : 
    2119          16 :     PQfreemem(subname);
    2120          16 :     destroyPQExpBuffer(str);
    2121          16 : }
    2122             : 
    2123             : /*
    2124             :  * Fetch a list of all connectable non-template databases from the source server
    2125             :  * and form a list such that they appear as if the user has specified multiple
    2126             :  * --database options, one for each source database.
                     :  *
                     :  * opt supplies the publisher connection string (pub_conninfo_str) and
                     :  * receives the database names in opt->database_names. dbnamespecified
                     :  * tells whether the connection string should be used as is; otherwise
                     :  * the databases "postgres" and then "template1" are tried. The global
                     :  * num_dbs is incremented once per database found.
    2127             :  */
    2128             : static void
    2129           2 : get_publisher_databases(struct CreateSubscriberOptions *opt,
    2130             :                         bool dbnamespecified)
    2131             : {
    2132             :     PGconn     *conn;
    2133             :     PGresult   *res;
    2134             : 
    2135             :     /* If a database name was specified, just connect to it. */
    2136           2 :     if (dbnamespecified)
    2137           0 :         conn = connect_database(opt->pub_conninfo_str, true);
    2138             :     else
    2139             :     {
    2140             :         /*
                     :          * Otherwise, try postgres first and then template1.
                     :          *
                     :          * The first attempt passes false as the second argument and the
                     :          * result is checked for NULL; presumably that flag controls whether
                     :          * connect_database() exits on failure -- confirm.
                     :          */
    2141             :         char       *conninfo;
    2142             : 
    2143           2 :         conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "postgres");
    2144           2 :         conn = connect_database(conninfo, false);
    2145           2 :         pg_free(conninfo);
    2146           2 :         if (!conn)
    2147             :         {
    2148           0 :             conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "template1");
    2149           0 :             conn = connect_database(conninfo, true);
    2150           0 :             pg_free(conninfo);
    2151             :         }
    2152             :     }
    2153             : 
    2154             :     /*
                     :      * List the databases, ordered by name.
                     :      *
                     :      * NOTE(review): pg_database is not schema-qualified here, unlike the
                     :      * pg_catalog-qualified queries elsewhere in this file. The value -2 for
                     :      * datconnlimit presumably marks an invalid database -- confirm.
                     :      */
    2154           2 :     res = PQexec(conn, "SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1");
    2155           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    2156             :     {
    2157           0 :         pg_log_error("could not obtain a list of databases: %s", PQresultErrorMessage(res));
    2158           0 :         PQclear(res);
    2159           0 :         disconnect_database(conn, true);
    2160             :     }
    2161             : 
    2162           8 :     for (int i = 0; i < PQntuples(res); i++)
    2163             :     {
    2164           6 :         const char *dbname = PQgetvalue(res, i, 0);
    2165             : 
    2166           6 :         simple_string_list_append(&opt->database_names, dbname);
    2167             : 
    2168             :         /* Increment num_dbs to reflect multiple --database options */
    2169           6 :         num_dbs++;
    2170             :     }
    2171             : 
    2172           2 :     PQclear(res);
    2173           2 :     disconnect_database(conn, false);
    2174           2 : }
    2175             : 
    2176             : int
    2177          46 : main(int argc, char **argv)
    2178             : {
    2179             :     static struct option long_options[] =
    2180             :     {
    2181             :         {"all", no_argument, NULL, 'a'},
    2182             :         {"database", required_argument, NULL, 'd'},
    2183             :         {"pgdata", required_argument, NULL, 'D'},
    2184             :         {"dry-run", no_argument, NULL, 'n'},
    2185             :         {"subscriber-port", required_argument, NULL, 'p'},
    2186             :         {"publisher-server", required_argument, NULL, 'P'},
    2187             :         {"socketdir", required_argument, NULL, 's'},
    2188             :         {"recovery-timeout", required_argument, NULL, 't'},
    2189             :         {"enable-two-phase", no_argument, NULL, 'T'},
    2190             :         {"subscriber-username", required_argument, NULL, 'U'},
    2191             :         {"verbose", no_argument, NULL, 'v'},
    2192             :         {"version", no_argument, NULL, 'V'},
    2193             :         {"help", no_argument, NULL, '?'},
    2194             :         {"config-file", required_argument, NULL, 1},
    2195             :         {"publication", required_argument, NULL, 2},
    2196             :         {"replication-slot", required_argument, NULL, 3},
    2197             :         {"subscription", required_argument, NULL, 4},
    2198             :         {"clean", required_argument, NULL, 5},
    2199             :         {NULL, 0, NULL, 0}
    2200             :     };
    2201             : 
    2202          46 :     struct CreateSubscriberOptions opt = {0};
    2203             : 
    2204             :     int         c;
    2205             :     int         option_index;
    2206             : 
    2207             :     char       *pub_base_conninfo;
    2208             :     char       *sub_base_conninfo;
    2209          46 :     char       *dbname_conninfo = NULL;
    2210             : 
    2211             :     uint64      pub_sysid;
    2212             :     uint64      sub_sysid;
    2213             :     struct stat statbuf;
    2214             : 
    2215             :     char       *consistent_lsn;
    2216             : 
    2217             :     char        pidfile[MAXPGPATH];
    2218             : 
    2219          46 :     pg_logging_init(argv[0]);
    2220          46 :     pg_logging_set_level(PG_LOG_WARNING);
    2221          46 :     progname = get_progname(argv[0]);
    2222          46 :     set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
    2223             : 
    2224          46 :     if (argc > 1)
    2225             :     {
    2226          44 :         if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
    2227             :         {
    2228           2 :             usage();
    2229           2 :             exit(0);
    2230             :         }
    2231          42 :         else if (strcmp(argv[1], "-V") == 0
    2232          42 :                  || strcmp(argv[1], "--version") == 0)
    2233             :         {
    2234           2 :             puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
    2235           2 :             exit(0);
    2236             :         }
    2237             :     }
    2238             : 
    2239             :     /* Default settings */
    2240          42 :     subscriber_dir = NULL;
    2241          42 :     opt.config_file = NULL;
    2242          42 :     opt.pub_conninfo_str = NULL;
    2243          42 :     opt.socket_dir = NULL;
    2244          42 :     opt.sub_port = DEFAULT_SUB_PORT;
    2245          42 :     opt.sub_username = NULL;
    2246          42 :     opt.two_phase = false;
    2247          42 :     opt.database_names = (SimpleStringList)
    2248             :     {
    2249             :         0
    2250             :     };
    2251          42 :     opt.recovery_timeout = 0;
    2252          42 :     opt.all_dbs = false;
    2253             : 
    2254             :     /*
    2255             :      * Don't allow it to be run as root. It uses pg_ctl which does not allow
    2256             :      * it either.
    2257             :      */
    2258             : #ifndef WIN32
    2259          42 :     if (geteuid() == 0)
    2260             :     {
    2261           0 :         pg_log_error("cannot be executed by \"root\"");
    2262           0 :         pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
    2263             :                           progname);
    2264           0 :         exit(1);
    2265             :     }
    2266             : #endif
    2267             : 
    2268          42 :     get_restricted_token();
    2269             : 
    2270         324 :     while ((c = getopt_long(argc, argv, "ad:D:np:P:s:t:TU:v",
    2271         324 :                             long_options, &option_index)) != -1)
    2272             :     {
    2273         288 :         switch (c)
    2274             :         {
    2275           6 :             case 'a':
    2276           6 :                 opt.all_dbs = true;
    2277           6 :                 break;
    2278          50 :             case 'd':
    2279          50 :                 if (!simple_string_list_member(&opt.database_names, optarg))
    2280             :                 {
    2281          48 :                     simple_string_list_append(&opt.database_names, optarg);
    2282          48 :                     num_dbs++;
    2283             :                 }
    2284             :                 else
    2285           2 :                     pg_fatal("database \"%s\" specified more than once for -d/--database", optarg);
    2286          48 :                 break;
    2287          38 :             case 'D':
    2288          38 :                 subscriber_dir = pg_strdup(optarg);
    2289          38 :                 canonicalize_path(subscriber_dir);
    2290          38 :                 break;
    2291          18 :             case 'n':
    2292          18 :                 dry_run = true;
    2293          18 :                 break;
    2294          24 :             case 'p':
    2295          24 :                 opt.sub_port = pg_strdup(optarg);
    2296          24 :                 break;
    2297          36 :             case 'P':
    2298          36 :                 opt.pub_conninfo_str = pg_strdup(optarg);
    2299          36 :                 break;
    2300          24 :             case 's':
    2301          24 :                 opt.socket_dir = pg_strdup(optarg);
    2302          24 :                 canonicalize_path(opt.socket_dir);
    2303          24 :                 break;
    2304           6 :             case 't':
    2305           6 :                 opt.recovery_timeout = atoi(optarg);
    2306           6 :                 break;
    2307           2 :             case 'T':
    2308           2 :                 opt.two_phase = true;
    2309           2 :                 break;
    2310           0 :             case 'U':
    2311           0 :                 opt.sub_username = pg_strdup(optarg);
    2312           0 :                 break;
    2313          38 :             case 'v':
    2314          38 :                 pg_logging_increase_verbosity();
    2315          38 :                 break;
    2316           0 :             case 1:
    2317           0 :                 opt.config_file = pg_strdup(optarg);
    2318           0 :                 break;
    2319          24 :             case 2:
    2320          24 :                 if (!simple_string_list_member(&opt.pub_names, optarg))
    2321             :                 {
    2322          22 :                     simple_string_list_append(&opt.pub_names, optarg);
    2323          22 :                     num_pubs++;
    2324             :                 }
    2325             :                 else
    2326           2 :                     pg_fatal("publication \"%s\" specified more than once for --publication", optarg);
    2327          22 :                 break;
    2328           8 :             case 3:
    2329           8 :                 if (!simple_string_list_member(&opt.replslot_names, optarg))
    2330             :                 {
    2331           8 :                     simple_string_list_append(&opt.replslot_names, optarg);
    2332           8 :                     num_replslots++;
    2333             :                 }
    2334             :                 else
    2335           0 :                     pg_fatal("replication slot \"%s\" specified more than once for --replication-slot", optarg);
    2336           8 :                 break;
    2337          10 :             case 4:
    2338          10 :                 if (!simple_string_list_member(&opt.sub_names, optarg))
    2339             :                 {
    2340          10 :                     simple_string_list_append(&opt.sub_names, optarg);
    2341          10 :                     num_subs++;
    2342             :                 }
    2343             :                 else
    2344           0 :                     pg_fatal("subscription \"%s\" specified more than once for --subscription", optarg);
    2345          10 :                 break;
    2346           2 :             case 5:
    2347           2 :                 if (!simple_string_list_member(&opt.objecttypes_to_clean, optarg))
    2348           2 :                     simple_string_list_append(&opt.objecttypes_to_clean, optarg);
    2349             :                 else
    2350           0 :                     pg_fatal("object type \"%s\" specified more than once for --clean", optarg);
    2351           2 :                 break;
    2352           2 :             default:
    2353             :                 /* getopt_long already emitted a complaint */
    2354           2 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    2355           2 :                 exit(1);
    2356             :         }
    2357             :     }
    2358             : 
    2359             :     /* Validate that --all is not used with incompatible options */
    2360          36 :     if (opt.all_dbs)
    2361             :     {
    2362           6 :         char       *bad_switch = NULL;
    2363             : 
    2364           6 :         if (num_dbs > 0)
    2365           2 :             bad_switch = "--database";
    2366           4 :         else if (num_pubs > 0)
    2367           2 :             bad_switch = "--publication";
    2368           2 :         else if (num_replslots > 0)
    2369           0 :             bad_switch = "--replication-slot";
    2370           2 :         else if (num_subs > 0)
    2371           0 :             bad_switch = "--subscription";
    2372             : 
    2373           6 :         if (bad_switch)
    2374             :         {
    2375           4 :             pg_log_error("options %s and %s cannot be used together",
    2376             :                          bad_switch, "-a/--all");
    2377           4 :             pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    2378           4 :             exit(1);
    2379             :         }
    2380             :     }
    2381             : 
    2382             :     /* Any non-option arguments? */
    2383          32 :     if (optind < argc)
    2384             :     {
    2385           0 :         pg_log_error("too many command-line arguments (first is \"%s\")",
    2386             :                      argv[optind]);
    2387           0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    2388           0 :         exit(1);
    2389             :     }
    2390             : 
    2391             :     /* Required arguments */
    2392          32 :     if (subscriber_dir == NULL)
    2393             :     {
    2394           2 :         pg_log_error("no subscriber data directory specified");
    2395           2 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    2396           2 :         exit(1);
    2397             :     }
    2398             : 
    2399             :     /* If socket directory is not provided, use the current directory */
    2400          30 :     if (opt.socket_dir == NULL)
    2401             :     {
    2402             :         char        cwd[MAXPGPATH];
    2403             : 
    2404          10 :         if (!getcwd(cwd, MAXPGPATH))
    2405           0 :             pg_fatal("could not determine current directory");
    2406          10 :         opt.socket_dir = pg_strdup(cwd);
    2407          10 :         canonicalize_path(opt.socket_dir);
    2408             :     }
    2409             : 
    2410             :     /*
    2411             :      * Parse connection string. Build a base connection string that might be
    2412             :      * reused by multiple databases.
    2413             :      */
    2414          30 :     if (opt.pub_conninfo_str == NULL)
    2415             :     {
    2416             :         /*
    2417             :          * TODO use primary_conninfo (if available) from subscriber and
    2418             :          * extract publisher connection string. Assume that there are
    2419             :          * identical entries for physical and logical replication. If there is
    2420             :          * not, we would fail anyway.
    2421             :          */
    2422           2 :         pg_log_error("no publisher connection string specified");
    2423           2 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    2424           2 :         exit(1);
    2425             :     }
    2426             : 
    2427          28 :     if (dry_run)
    2428          16 :         pg_log_info("Executing in dry-run mode.\n"
    2429             :                     "The target directory will not be modified.");
    2430             : 
    2431          28 :     pg_log_info("validating publisher connection string");
    2432          28 :     pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
    2433             :                                           &dbname_conninfo);
    2434          28 :     if (pub_base_conninfo == NULL)
    2435           0 :         exit(1);
    2436             : 
    2437          28 :     pg_log_info("validating subscriber connection string");
    2438          28 :     sub_base_conninfo = get_sub_conninfo(&opt);
    2439             : 
    2440             :     /*
    2441             :      * Fetch all databases from the source (publisher) and treat them as if
    2442             :      * the user had specified multiple --database options, one for each source
    2443             :      * database.
    2444             :      */
    2445          28 :     if (opt.all_dbs)
    2446             :     {
    2447           2 :         bool        dbnamespecified = (dbname_conninfo != NULL);
    2448             : 
    2449           2 :         get_publisher_databases(&opt, dbnamespecified);
    2450             :     }
    2451             : 
    2452          28 :     if (opt.database_names.head == NULL)
    2453             :     {
    2454           4 :         pg_log_info("no database was specified");
    2455             : 
    2456             :         /*
    2457             :          * Try to obtain the dbname from the publisher conninfo. If dbname
    2458             :          * parameter is not available, error out.
    2459             :          */
    2460           4 :         if (dbname_conninfo)
    2461             :         {
    2462           2 :             simple_string_list_append(&opt.database_names, dbname_conninfo);
    2463           2 :             num_dbs++;
    2464             : 
    2465           2 :             pg_log_info("database name \"%s\" was extracted from the publisher connection string",
    2466             :                         dbname_conninfo);
    2467             :         }
    2468             :         else
    2469             :         {
    2470           2 :             pg_log_error("no database name specified");
    2471           2 :             pg_log_error_hint("Try \"%s --help\" for more information.",
    2472             :                               progname);
    2473           2 :             exit(1);
    2474             :         }
    2475             :     }
    2476             : 
    2477             :     /* Number of object names must match number of databases */
    2478          26 :     if (num_pubs > 0 && num_pubs != num_dbs)
    2479             :     {
    2480           2 :         pg_log_error("wrong number of publication names specified");
    2481           2 :         pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
    2482             :                             num_pubs, num_dbs);
    2483           2 :         exit(1);
    2484             :     }
    2485          24 :     if (num_subs > 0 && num_subs != num_dbs)
    2486             :     {
    2487           2 :         pg_log_error("wrong number of subscription names specified");
    2488           2 :         pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
    2489             :                             num_subs, num_dbs);
    2490           2 :         exit(1);
    2491             :     }
    2492          22 :     if (num_replslots > 0 && num_replslots != num_dbs)
    2493             :     {
    2494           2 :         pg_log_error("wrong number of replication slot names specified");
    2495           2 :         pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
    2496             :                             num_replslots, num_dbs);
    2497           2 :         exit(1);
    2498             :     }
    2499             : 
    2500             :     /* Verify the object types specified for removal from the subscriber */
    2501          22 :     for (SimpleStringListCell *cell = opt.objecttypes_to_clean.head; cell; cell = cell->next)
    2502             :     {
    2503           2 :         if (pg_strcasecmp(cell->val, "publications") == 0)
    2504           2 :             dbinfos.objecttypes_to_clean |= OBJECTTYPE_PUBLICATIONS;
    2505             :         else
    2506             :         {
    2507           0 :             pg_log_error("invalid object type \"%s\" specified for %s",
    2508             :                          cell->val, "--clean");
    2509           0 :             pg_log_error_hint("The valid value is: \"%s\"", "publications");
    2510           0 :             exit(1);
    2511             :         }
    2512             :     }
    2513             : 
    2514             :     /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
    2515          20 :     pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
    2516          20 :     pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
    2517             : 
    2518             :     /* Rudimentary check for a data directory */
    2519          20 :     check_data_directory(subscriber_dir);
    2520             : 
    2521          20 :     dbinfos.two_phase = opt.two_phase;
    2522             : 
    2523             :     /*
    2524             :      * Store database information for publisher and subscriber. It should be
    2525             :      * called before atexit() because its return is used in the
    2526             :      * cleanup_objects_atexit().
    2527             :      */
    2528          20 :     dbinfos.dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
    2529             : 
    2530             :     /* Register a function to clean up objects in case of failure */
    2531          20 :     atexit(cleanup_objects_atexit);
    2532             : 
    2533             :     /*
    2534             :      * Check if the subscriber data directory has the same system identifier
    2535             :      * as the publisher data directory.
    2536             :      */
    2537          20 :     pub_sysid = get_primary_sysid(dbinfos.dbinfo[0].pubconninfo);
    2538          20 :     sub_sysid = get_standby_sysid(subscriber_dir);
    2539          20 :     if (pub_sysid != sub_sysid)
    2540           2 :         pg_fatal("subscriber data directory is not a copy of the source database cluster");
    2541             : 
    2542             :     /* Subscriber PID file */
    2543          18 :     snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
    2544             : 
    2545             :     /*
    2546             :      * The standby server must not be running. If the server is started under
    2547             :      * service manager and pg_createsubscriber stops it, the service manager
    2548             :      * might react to this action and start the server again. Therefore,
    2549             :      * refuse to proceed if the server is running to avoid possible failures.
    2550             :      */
    2551          18 :     if (stat(pidfile, &statbuf) == 0)
    2552             :     {
    2553           2 :         pg_log_error("standby server is running");
    2554           2 :         pg_log_error_hint("Stop the standby server and try again.");
    2555           2 :         exit(1);
    2556             :     }
    2557             : 
    2558             :     /*
    2559             :      * Start a short-lived standby server with temporary parameters (provided
    2560             :      * by command-line options). The goal is to avoid connections during the
    2561             :      * transformation steps.
    2562             :      */
    2563          16 :     pg_log_info("starting the standby server with command-line options");
    2564          16 :     start_standby_server(&opt, true, false);
    2565             : 
    2566             :     /* Check if the standby server is ready for logical replication */
    2567          16 :     check_subscriber(dbinfos.dbinfo);
    2568             : 
    2569             :     /* Check if the primary server is ready for logical replication */
    2570          12 :     check_publisher(dbinfos.dbinfo);
    2571             : 
    2572             :     /*
    2573             :      * Stop the target server. The recovery process requires that the server
    2574             :      * reaches a consistent state before targeting the recovery stop point.
    2575             :      * Make sure a consistent state is reached (stopping the target server
    2576             :      * guarantees it) *before* creating the replication slots in
    2577             :      * setup_publisher().
    2578             :      */
    2579           8 :     pg_log_info("stopping the subscriber");
    2580           8 :     stop_standby_server(subscriber_dir);
    2581             : 
    2582             :     /* Create the required objects for each database on publisher */
    2583           8 :     consistent_lsn = setup_publisher(dbinfos.dbinfo);
    2584             : 
    2585             :     /* Write the required recovery parameters */
    2586           8 :     setup_recovery(dbinfos.dbinfo, subscriber_dir, consistent_lsn);
    2587             : 
    2588             :     /*
    2589             :      * Start subscriber so the recovery parameters will take effect. Wait
    2590             :      * until accepting connections. We don't want to start logical replication
    2591             :      * during setup.
    2592             :      */
    2593           8 :     pg_log_info("starting the subscriber");
    2594           8 :     start_standby_server(&opt, true, true);
    2595             : 
    2596             :     /* Wait for the subscriber to be promoted */
    2597           8 :     wait_for_end_recovery(dbinfos.dbinfo[0].subconninfo, &opt);
    2598             : 
    2599             :     /*
    2600             :      * Create the subscription for each database on subscriber. It does not
    2601             :      * enable it immediately because it needs to adjust the replication start
    2602             :      * point to the LSN reported by setup_publisher().  It also cleans up
    2603             :      * publications created by this tool and replication to the standby.
    2604             :      */
    2605           8 :     setup_subscriber(dbinfos.dbinfo, consistent_lsn);
    2606             : 
    2607             :     /* Remove primary_slot_name if it exists on primary */
    2608           8 :     drop_primary_replication_slot(dbinfos.dbinfo, primary_slot_name);
    2609             : 
    2610             :     /* Remove failover replication slots if they exist on subscriber */
    2611           8 :     drop_failover_replication_slots(dbinfos.dbinfo);
    2612             : 
    2613             :     /* Stop the subscriber */
    2614           8 :     pg_log_info("stopping the subscriber");
    2615           8 :     stop_standby_server(subscriber_dir);
    2616             : 
    2617             :     /* Change system identifier from subscriber */
    2618           8 :     modify_subscriber_sysid(&opt);
    2619             : 
    2620           8 :     success = true;
    2621             : 
    2622           8 :     pg_log_info("Done!");
    2623             : 
    2624           8 :     return 0;
    2625             : }

Generated by: LCOV version 1.16