LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - pg_createsubscriber.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 83.6 % 994 831
Test Date: 2026-05-07 03:16:33 Functions: 100.0 % 41 41
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * pg_createsubscriber.c
       4              :  *    Create a new logical replica from a standby server
       5              :  *
       6              :  * Copyright (c) 2024-2026, PostgreSQL Global Development Group
       7              :  *
       8              :  * IDENTIFICATION
       9              :  *    src/bin/pg_basebackup/pg_createsubscriber.c
      10              :  *
      11              :  *-------------------------------------------------------------------------
      12              :  */
      13              : 
      14              : #include "postgres_fe.h"
      15              : 
      16              : #include <sys/stat.h>
      17              : #include <sys/time.h>
      18              : #include <sys/wait.h>
      19              : #include <time.h>
      20              : 
      21              : #include "common/connect.h"
      22              : #include "common/controldata_utils.h"
      23              : #include "common/file_perm.h"
      24              : #include "common/file_utils.h"
      25              : #include "common/logging.h"
      26              : #include "common/pg_prng.h"
      27              : #include "common/restricted_token.h"
      28              : #include "datatype/timestamp.h"
      29              : #include "fe_utils/recovery_gen.h"
      30              : #include "fe_utils/simple_list.h"
      31              : #include "fe_utils/string_utils.h"
      32              : #include "fe_utils/version.h"
      33              : #include "getopt_long.h"
      34              : 
      35              : #define DEFAULT_SUB_PORT    "50432"    /* default port that usage() advertises for --subscriber-port */
      36              : #define OBJECTTYPE_PUBLICATIONS  0x0001    /* bit flag; presumably OR'ed into LogicalRepInfos.objecttypes_to_clean -- TODO confirm */
      37              : 
      38              : /*
      39              :  * Configuration files for recovery parameters.
      40              :  *
      41              :  * The recovery parameters are set in INCLUDED_CONF_FILE, itself loaded by
      42              :  * the server through an include_if_exists in postgresql.auto.conf.
      43              :  *
      44              :  * INCLUDED_CONF_FILE is renamed to INCLUDED_CONF_FILE_DISABLED when exiting,
      45              :  * so that the recovery parameters set by this tool never take effect on node
      46              :  * restart.  The contents of INCLUDED_CONF_FILE_DISABLED can be useful for
      47              :  * debugging.
                         *
                         * The rename is performed by cleanup_objects_atexit(), and only when
                         * recovery_params_set is true.
      48              :  */
      49              : #define PG_AUTOCONF_FILENAME        "postgresql.auto.conf"
      50              : #define INCLUDED_CONF_FILE          "pg_createsubscriber.conf"
      51              : #define INCLUDED_CONF_FILE_DISABLED INCLUDED_CONF_FILE ".disabled"
      52              : 
                        /*
                         * File names used for log output.
                         *
                         * NOTE(review): presumably these are created inside the directory held in
                         * "logdir"; the code that opens them is not in this part of the file --
                         * confirm.
                         */
      53              : #define SERVER_LOG_FILE_NAME "pg_createsubscriber_server.log"
      54              : #define INTERNAL_LOG_FILE_NAME "pg_createsubscriber_internal.log"
      55              : 
      56              : /*
                         * Command-line options.
                         *
                         * The switch named next to each field is the one usage() advertises for it.
                         * NOTE(review): the option parsing itself is outside this view, so confirm
                         * the field-to-switch mapping there.
                         */
      57              : struct CreateSubscriberOptions
      58              : {
      59              :     char       *config_file;    /* configuration file (--config-file) */
      60              :     char       *log_dir;        /* log directory name (--logdir) */
      61              :     char       *pub_conninfo_str;   /* publisher connection string (--publisher-server) */
      62              :     char       *socket_dir;     /* directory for Unix-domain socket, if any (--socketdir) */
      63              :     char       *sub_port;       /* subscriber port number, kept as a string (--subscriber-port) */
      64              :     const char *sub_username;   /* subscriber username; may be NULL (--subscriber-username) */
      65              :     bool        two_phase;      /* enable-two-phase option (--enable-two-phase) */
      66              :     SimpleStringList database_names;    /* list of database names (--database) */
      67              :     SimpleStringList pub_names; /* list of publication names (--publication) */
      68              :     SimpleStringList sub_names; /* list of subscription names (--subscription) */
      69              :     SimpleStringList replslot_names;    /* list of replication slot names (--replication-slot) */
      70              :     int         recovery_timeout;   /* stop recovery after this time, in seconds (--recovery-timeout) */
      71              :     bool        all_dbs;        /* all option (--all) */
      72              :     SimpleStringList objecttypes_to_clean;  /* list of object types to cleanup (--clean) */
      73              : };
      74              : 
      75              : /*
                         * Per-database publication/subscription info.
                         *
                         * made_replslot and made_publication record what this run created on the
                         * primary.  cleanup_objects_atexit() consults them to decide which objects
                         * to drop (or, if it cannot connect, to warn about) when the run did not
                         * succeed.
                         */
      76              : struct LogicalRepInfo
      77              : {
      78              :     char       *dbname;         /* database name */
      79              :     char       *pubconninfo;    /* publisher connection string */
      80              :     char       *subconninfo;    /* subscriber connection string */
      81              :     char       *pubname;        /* publication name */
      82              :     char       *subname;        /* subscription name */
      83              :     char       *replslotname;   /* replication slot name */
      84              : 
      85              :     bool        made_replslot;  /* replication slot was created */
      86              :     bool        made_publication;   /* publication was created */
      87              : };
      88              : 
      89              : /*
      90              :  * Information shared across all the databases (or publications and
      91              :  * subscriptions).
                         *
                         * A single file-level instance of this struct, "dbinfos", is declared
                         * further down in this file.
      92              :  */
      93              : struct LogicalRepInfos
      94              : {
      95              :     struct LogicalRepInfo *dbinfo;  /* array; cleanup_objects_atexit() walks entries 0 .. num_dbs-1 */
      96              :     bool        two_phase;      /* enable-two-phase option */
      97              :     uint32      objecttypes_to_clean;   /* flags indicating which object types
      98              :                                          * to clean up on subscriber */
      99              : };
     100              : 
                        /*
                         * Forward declarations for the static functions of this file.
                         *
                         * appendConnStrItem() is not listed: it is defined before its first use, so
                         * it needs no prototype.
                         */
     101              : static void cleanup_objects_atexit(void);
     102              : static void usage(void);
     103              : static char *get_base_conninfo(const char *conninfo, char **dbname);
     104              : static char *get_sub_conninfo(const struct CreateSubscriberOptions *opt);
     105              : static char *get_exec_path(const char *argv0, const char *progname);
     106              : static void check_data_directory(const char *datadir);
     107              : static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
     108              : static struct LogicalRepInfo *store_pub_sub_info(const struct CreateSubscriberOptions *opt,
     109              :                                                  const char *pub_base_conninfo,
     110              :                                                  const char *sub_base_conninfo);
     111              : static PGconn *connect_database(const char *conninfo, bool exit_on_error);
     112              : static void disconnect_database(PGconn *conn, bool exit_on_error);
     113              : static uint64 get_primary_sysid(const char *conninfo);
     114              : static uint64 get_standby_sysid(const char *datadir);
     115              : static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
     116              : static bool server_is_in_recovery(PGconn *conn);
     117              : static char *generate_object_name(PGconn *conn);
     118              : static void check_publisher(const struct LogicalRepInfo *dbinfo);
     119              : static char *setup_publisher(struct LogicalRepInfo *dbinfo);
     120              : static void check_subscriber(const struct LogicalRepInfo *dbinfo);
     121              : static void setup_subscriber(struct LogicalRepInfo *dbinfo,
     122              :                              const char *consistent_lsn);
     123              : static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
     124              :                            const char *lsn);
     125              : static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
     126              :                                           const char *slotname);
     127              : static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo);
     128              : static char *create_logical_replication_slot(PGconn *conn,
     129              :                                              struct LogicalRepInfo *dbinfo);
     130              : static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
     131              :                                   const char *slot_name);
     132              : static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
     133              : static void start_standby_server(const struct CreateSubscriberOptions *opt,
     134              :                                  bool restricted_access,
     135              :                                  bool restrict_logical_worker);
     136              : static void stop_standby_server(const char *datadir);
     137              : static void wait_for_end_recovery(const char *conninfo,
     138              :                                   const struct CreateSubscriberOptions *opt);
     139              : static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
     140              : static bool find_publication(PGconn *conn, const char *pubname, const char *dbname);
     141              : static void drop_publication(PGconn *conn, const char *pubname,
     142              :                              const char *dbname, bool *made_publication);
     143              : static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
     144              : static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
     145              : static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
     146              :                                      const char *lsn);
     147              : static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
     148              : static void check_and_drop_existing_subscriptions(PGconn *conn,
     149              :                                                   const struct LogicalRepInfo *dbinfo);
     150              : static void drop_existing_subscription(PGconn *conn, const char *subname,
     151              :                                        const char *dbname);
     152              : static void get_publisher_databases(struct CreateSubscriberOptions *opt,
     153              :                                     bool dbnamespecified);
     154              : 
     155              : #define WAIT_INTERVAL   1       /* 1 second */
     156              : 
     157              : static const char *progname;    /* printed by usage(); also sent as fallback_application_name by get_sub_conninfo() */
     158              : 
     159              : static char *primary_slot_name = NULL;
     160              : static bool dry_run = false;    /* presumably set by --dry-run -- option parsing is not visible here */
     161              : 
     162              : static bool success = false;    /* when true, cleanup_objects_atexit() skips the drop/stop steps */
     163              : 
     164              : static struct LogicalRepInfos dbinfos;
     165              : static int  num_dbs = 0;        /* number of specified databases */
     166              : static int  num_pubs = 0;       /* number of specified publications */
     167              : static int  num_subs = 0;       /* number of specified subscriptions */
     168              : static int  num_replslots = 0;  /* number of specified replication slots */
     169              : 
     170              : static pg_prng_state prng_state;
     171              : 
     172              : static char *pg_ctl_path = NULL;
     173              : static char *pg_resetwal_path = NULL;
     174              : 
     175              : static char *logdir = NULL;     /* Subdirectory of the user specified logdir
     176              :                                  * where the log files are written (if
     177              :                                  * specified) */
     178              : 
     179              : /* standby / subscriber data directory */
     180              : static char *subscriber_dir = NULL;
     181              : 
                        /*
                         * State flags read by cleanup_objects_atexit(); the places that set them are
                         * outside this view.
                         *
                         *   recovery_ended      - triggers the "failed after the end of recovery"
                         *                         warning
                         *   standby_running     - makes cleanup call
                         *                         stop_standby_server(subscriber_dir)
                         *   recovery_params_set - makes cleanup rename INCLUDED_CONF_FILE to
                         *                         INCLUDED_CONF_FILE_DISABLED
                         */
     182              : static bool recovery_ended = false;
     183              : static bool standby_running = false;
     184              : static bool recovery_params_set = false;
     185              : 
     186              : 
     187              : /*
     188              :  * Clean up objects created by pg_createsubscriber.
     189              :  *
     190              :  * Publications and replication slots are created on the primary.  Depending
     191              :  * on the step where it failed, already-created objects should be removed if
     192              :  * possible (sometimes this won't work due to a connection issue).
     193              :  * There is no cleanup on the target server *after* its promotion, because any
     194              :  * failure at this point means recreating the physical replica and starting
     195              :  * again.
     196              :  *
     197              :  * The recovery configuration is always removed, by renaming the included
     198              :  * configuration file out of the way.
                         *
                         * Takes no arguments and returns nothing; everything it needs comes from
                         * file-level state: recovery_params_set, success, recovery_ended,
                         * standby_running, subscriber_dir, num_dbs and dbinfos.
                         *
                         * NOTE(review): the name suggests this is registered with atexit(); the
                         * registration is not visible in this part of the file -- confirm.
     199              :  */
     200              : static void
     201           10 : cleanup_objects_atexit(void)
     202              : {
     203              :     /* Rename the included configuration file, if necessary. */
     204           10 :     if (recovery_params_set)
     205              :     {
     206              :         char        conf_filename[MAXPGPATH];
     207              :         char        conf_filename_disabled[MAXPGPATH];
     208              : 
                                /*
                                 * NOTE(review): the snprintf() results are not checked, so an
                                 * over-long subscriber_dir would be silently truncated to fit
                                 * MAXPGPATH.
                                 */
     209            1 :         snprintf(conf_filename, MAXPGPATH, "%s/%s", subscriber_dir,
     210              :                  INCLUDED_CONF_FILE);
     211            1 :         snprintf(conf_filename_disabled, MAXPGPATH, "%s/%s", subscriber_dir,
     212              :                  INCLUDED_CONF_FILE_DISABLED);
     213              : 
     214            1 :         if (durable_rename(conf_filename, conf_filename_disabled) != 0)
     215              :         {
     216              :             /* durable_rename() has already logged something. */
     217            0 :             pg_log_warning_hint("A manual removal of the recovery parameters may be required.");
     218              :         }
     219              :     }
     220              : 
                            /*
                             * Nothing else to undo after a successful run.  The rename above sits
                             * before this test, so it happens on success and on failure alike.
                             */
     221           10 :     if (success)
     222            4 :         return;
     223              : 
     224              :     /*
     225              :      * If the server is promoted, there is no way to use the current setup
     226              :      * again. Warn the user that a new replication setup should be done before
     227              :      * trying again.
     228              :      */
     229            6 :     if (recovery_ended)
     230              :     {
     231            0 :         pg_log_warning("failed after the end of recovery");
     232            0 :         pg_log_warning_hint("The target server cannot be used as a physical replica anymore.  "
     233              :                             "You must recreate the physical replica before continuing.");
     234              :     }
     235              : 
     236           18 :     for (int i = 0; i < num_dbs; i++)
     237              :     {
     238           12 :         struct LogicalRepInfo *dbinfo = &dbinfos.dbinfo[i];
     239              : 
                                /* Only contact the primary if this run created something there. */
     240           12 :         if (dbinfo->made_publication || dbinfo->made_replslot)
     241              :         {
     242              :             PGconn     *conn;
     243              : 
                                    /*
                                     * exit_on_error is false here, and a NULL result is handled
                                     * below, so a failed connection does not abort the cleanup.
                                     */
     244            0 :             conn = connect_database(dbinfo->pubconninfo, false);
     245            0 :             if (conn != NULL)
     246              :             {
                                        /*
                                         * The flag is passed by address; presumably drop_publication()
                                         * updates it -- the callee is not shown here.
                                         */
     247            0 :                 if (dbinfo->made_publication)
     248            0 :                     drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
     249              :                                      &dbinfo->made_publication);
     250            0 :                 if (dbinfo->made_replslot)
     251            0 :                     drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
     252            0 :                 disconnect_database(conn, false);
     253              :             }
     254              :             else
     255              :             {
     256              :                 /*
     257              :                  * If a connection could not be established, inform the user
     258              :                  * that some objects were left on primary and should be
     259              :                  * removed before trying again.
     260              :                  */
     261            0 :                 if (dbinfo->made_publication)
     262              :                 {
     263            0 :                     pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
     264              :                                    dbinfo->pubname,
     265              :                                    dbinfo->dbname);
     266            0 :                     pg_log_warning_hint("Drop this publication before trying again.");
     267              :                 }
     268            0 :                 if (dbinfo->made_replslot)
     269              :                 {
     270            0 :                     pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
     271              :                                    dbinfo->replslotname,
     272              :                                    dbinfo->dbname);
     273            0 :                     pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
     274              :                 }
     275              :             }
     276              :         }
     277              :     }
     278              : 
                            /* Last step of a failed run: stop the standby if it is still running. */
     279            6 :     if (standby_running)
     280            4 :         stop_standby_server(subscriber_dir);
     281              : }
     282              : 
                        /*
                         * Print the help text with printf(): a one-line description, the usage
                         * line, every supported option, and the bug-report / home-page footer.
                         *
                         * Every message is wrapped in _().  Values that vary (progname,
                         * DEFAULT_SUB_PORT, the "publications" object type and the PACKAGE_*
                         * macros) are passed as printf arguments instead of being written into the
                         * message text.
                         */
     283              : static void
     284            1 : usage(void)
     285              : {
     286            1 :     printf(_("%s creates a new logical replica from a standby server.\n\n"),
     287              :            progname);
     288            1 :     printf(_("Usage:\n"));
     289            1 :     printf(_("  %s [OPTION]...\n"), progname);
     290            1 :     printf(_("\nOptions:\n"));
     291            1 :     printf(_("  -a, --all                       create subscriptions for all databases except template\n"
     292              :              "                                  databases and databases that don't allow connections\n"));
     293            1 :     printf(_("  -d, --database=DBNAME           database in which to create a subscription\n"));
     294            1 :     printf(_("  -D, --pgdata=DATADIR            location for the subscriber data directory\n"));
     295            1 :     printf(_("  -l, --logdir=LOGDIR             location for the log directory\n"));
     296            1 :     printf(_("  -n, --dry-run                   dry run, just show what would be done\n"));
     297            1 :     printf(_("  -p, --subscriber-port=PORT      subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
     298            1 :     printf(_("  -P, --publisher-server=CONNSTR  publisher connection string\n"));
     299            1 :     printf(_("  -s, --socketdir=DIR             socket directory to use (default current dir.)\n"));
     300            1 :     printf(_("  -t, --recovery-timeout=SECS     seconds to wait for recovery to end\n"));
     301            1 :     printf(_("  -T, --enable-two-phase          enable two-phase commit for all subscriptions\n"));
     302            1 :     printf(_("  -U, --subscriber-username=NAME  user name for subscriber connection\n"));
     303            1 :     printf(_("  -v, --verbose                   output verbose messages\n"));
     304            1 :     printf(_("      --clean=OBJECTTYPE          drop all objects of the specified type from specified\n"
     305              :              "                                  databases on the subscriber; accepts: \"%s\"\n"), "publications");
     306            1 :     printf(_("      --config-file=FILENAME      use specified main server configuration\n"
     307              :              "                                  file when running target cluster\n"));
     308            1 :     printf(_("      --publication=NAME          publication name\n"));
     309            1 :     printf(_("      --replication-slot=NAME     replication slot name\n"));
     310            1 :     printf(_("      --subscription=NAME         subscription name\n"));
     311            1 :     printf(_("  -V, --version                   output version information, then exit\n"));
     312            1 :     printf(_("  -?, --help                      show this help, then exit\n"));
     313            1 :     printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
     314            1 :     printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
     315            1 : }
     316              : 
     317              : /*
     318              :  * Subroutine to append "keyword=value" to a connection string,
     319              :  * with proper quoting of the value.  (We assume keywords don't need that.)
                         *
                         * buf:     destination buffer.  If it already holds text, a single space is
                         *          appended first, so items end up space-separated.
                         * keyword: option name, copied as-is.
                         * val:     option value, handed to appendConnStrVal().
                         *          NOTE(review): presumably must be non-NULL -- confirm against
                         *          appendConnStrVal(), which is not shown here.
     320              :  */
     321              : static void
     322          107 : appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
     323              : {
     324          107 :     if (buf->len > 0)
     325           79 :         appendPQExpBufferChar(buf, ' ');
     326          107 :     appendPQExpBufferStr(buf, keyword);
     327          107 :     appendPQExpBufferChar(buf, '=');
     328          107 :     appendConnStrVal(buf, val);
     329          107 : }
     330              : 
     331              : /*
     332              :  * Validate a connection string. Returns a base connection string that is a
     333              :  * connection string without a database name.
     334              :  *
     335              :  * Since we might process multiple databases, each database name will be
     336              :  * appended to this base connection string to provide a final connection
     337              :  * string. If the second argument (dbname) is not null, returns dbname if the
     338              :  * provided connection string contains it.
     339              :  *
     340              :  * It is the caller's responsibility to free the returned connection string and
     341              :  * dbname.
                         *
                         * Returns NULL, after logging the parser's message, if the connection string
                         * cannot be parsed.  Options whose value is NULL or empty are left out of
                         * the result.  *dbname is written only when a non-empty "dbname" option is
                         * present; otherwise it is left untouched, so a caller that passes a pointer
                         * should initialize it beforehand.
     342              :  */
     343              : static char *
     344           14 : get_base_conninfo(const char *conninfo, char **dbname)
     345              : {
     346              :     PQExpBuffer buf;
     347              :     PQconninfoOption *conn_opts;
     348              :     PQconninfoOption *conn_opt;
     349           14 :     char       *errmsg = NULL;
     350              :     char       *ret;
     351              : 
     352           14 :     conn_opts = PQconninfoParse(conninfo, &errmsg);
     353           14 :     if (conn_opts == NULL)
     354              :     {
     355            0 :         pg_log_error("could not parse connection string: %s", errmsg);
     356            0 :         PQfreemem(errmsg);
     357            0 :         return NULL;
     358              :     }
     359              : 
     360           14 :     buf = createPQExpBuffer();
                            /* The option array ends at the entry whose keyword is NULL. */
     361          742 :     for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
     362              :     {
     363          728 :         if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
     364              :         {
                                    /* dbname is handed back separately and kept out of the result. */
     365           33 :             if (strcmp(conn_opt->keyword, "dbname") == 0)
     366              :             {
     367            9 :                 if (dbname)
     368            9 :                     *dbname = pg_strdup(conn_opt->val);
     369            9 :                 continue;
     370              :             }
     371           24 :             appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
     372              :         }
     373              :     }
     374              : 
                            /* Copy the text out so that the buffer itself can be released. */
     375           14 :     ret = pg_strdup(buf->data);
     376              : 
     377           14 :     destroyPQExpBuffer(buf);
     378           14 :     PQconninfoFree(conn_opts);
     379              : 
     380           14 :     return ret;
     381              : }
     382              : 
     383              : /*
     384              :  * Build a subscriber connection string. Only a few parameters are supported
     385              :  * since it starts a server with restricted access.
                         *
                         * The items are emitted in this order: port, host (omitted when WIN32 is
                         * defined), user (only if opt->sub_username is not NULL), and
                         * fallback_application_name set to progname.  No dbname is included.
                         *
                         * Returns a pg_strdup'd string.
     386              :  */
     387              : static char *
     388           14 : get_sub_conninfo(const struct CreateSubscriberOptions *opt)
     389              : {
     390           14 :     PQExpBuffer buf = createPQExpBuffer();
     391              :     char       *ret;
     392              : 
     393           14 :     appendConnStrItem(buf, "port", opt->sub_port);
     394              : #if !defined(WIN32)
                            /* "host" carries the socket directory (opt->socket_dir). */
     395           14 :     appendConnStrItem(buf, "host", opt->socket_dir);
     396              : #endif
     397           14 :     if (opt->sub_username != NULL)
     398            0 :         appendConnStrItem(buf, "user", opt->sub_username);
     399           14 :     appendConnStrItem(buf, "fallback_application_name", progname);
     400              : 
     401           14 :     ret = pg_strdup(buf->data);
     402              : 
     403           14 :     destroyPQExpBuffer(buf);
     404              : 
     405           14 :     return ret;
     406              : }
     407              : 
     408              : /*
     409              :  * Verify if a PostgreSQL binary (progname) is available in the same directory as
     410              :  * pg_createsubscriber and it has the same version.  It returns the absolute
     411              :  * path of the progname.
                         *
                         * argv0:    passed through to find_other_exec() and find_my_exec().
                         * progname: name of the binary to look for.  Inside this function the
                         *           parameter shadows the file-level "progname" variable.
                         *
                         * The result is a pg_malloc'd buffer of MAXPGPATH bytes.  Any negative
                         * result from find_other_exec() ends in pg_fatal().
                         *
                         * NOTE(review): versionstr (from psprintf) is not freed before returning.
     412              :  */
     413              : static char *
     414           20 : get_exec_path(const char *argv0, const char *progname)
     415              : {
     416              :     char       *versionstr;
     417              :     char       *exec_path;
     418              :     int         ret;
     419              : 
                            /* Version line that find_other_exec() is asked to match. */
     420           20 :     versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
     421           20 :     exec_path = pg_malloc(MAXPGPATH);
     422           20 :     ret = find_other_exec(argv0, progname, versionstr, exec_path);
     423              : 
     424           20 :     if (ret < 0)
     425              :     {
     426              :         char        full_path[MAXPGPATH];
     427              : 
                                /* Fall back to the bare program name if our own path is unknown. */
     428            0 :         if (find_my_exec(argv0, full_path) < 0)
     429            0 :             strlcpy(full_path, progname, sizeof(full_path));
     430              : 
                                /* -1 is reported as "not found"; other negatives as a version mismatch. */
     431            0 :         if (ret == -1)
     432            0 :             pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
     433              :                      progname, "pg_createsubscriber", full_path);
     434              :         else
     435            0 :             pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
     436              :                      progname, full_path, "pg_createsubscriber");
     437              :     }
     438              : 
     439           20 :     pg_log_debug("%s path is:  %s", progname, exec_path);
     440              : 
     441           20 :     return exec_path;
     442              : }
     443              : 
     444              : /*
     445              :  * Is it a cluster directory? These are preliminary checks. It is far from
     446              :  * making an accurate check. If it is not a clone from the publisher, it will
     447              :  * eventually fail in a future step.
                         *
                         * Two checks are made: stat() must succeed on datadir, and the major version
                         * obtained through get_pg_version() must equal PG_MAJORVERSION_NUM.  A
                         * failed check is reported through pg_fatal() or through pg_log_error()
                         * followed by exit(1).
                         *
                         * NOTE(review): the stat() result is only tested for success; statbuf is
                         * never inspected, so a path that is not a directory is not rejected here.
                         * version_str is also never freed (presumably it is allocated by
                         * get_pg_version() -- confirm).
     448              :  */
     449              : static void
     450           10 : check_data_directory(const char *datadir)
     451              : {
     452              :     struct stat statbuf;
     453              :     uint32      major_version;
     454              :     char       *version_str;
     455              : 
     456           10 :     pg_log_info("checking if directory \"%s\" is a cluster data directory",
     457              :                 datadir);
     458              : 
     459           10 :     if (stat(datadir, &statbuf) != 0)
     460              :     {
                                /* A missing path gets its own message; other errors print %m. */
     461            0 :         if (errno == ENOENT)
     462            0 :             pg_fatal("data directory \"%s\" does not exist", datadir);
     463              :         else
     464            0 :             pg_fatal("could not access directory \"%s\": %m", datadir);
     465              :     }
     466              : 
     467              :     /*
     468              :      * Retrieve the contents of this cluster's PG_VERSION.  We require
     469              :      * compatibility with the same major version as the one this tool is
     470              :      * compiled with.
     471              :      */
     472           10 :     major_version = GET_PG_MAJORVERSION_NUM(get_pg_version(datadir, &version_str));
     473           10 :     if (major_version != PG_MAJORVERSION_NUM)
     474              :     {
     475            0 :         pg_log_error("data directory is of wrong version");
     476            0 :         pg_log_error_detail("File \"%s\" contains \"%s\", which is not compatible with this program's version \"%s\".",
     477              :                             "PG_VERSION", version_str, PG_MAJORVERSION);
     478            0 :         exit(1);
     479              :     }
     480           10 : }
     481              : 
     482              : /*
     483              :  * Append database name into a base connection string.
     484              :  *
     485              :  * dbname is the only parameter that changes so it is not included in the base
     486              :  * connection string. This function concatenates dbname to build a "real"
     487              :  * connection string.
                         *
                         * conninfo must not be NULL (asserted) and is copied unchanged.  The dbname
                         * item is added through appendConnStrItem(), so it is separated from
                         * conninfo by a space whenever conninfo is non-empty.
                         *
                         * Returns a pg_strdup'd string.
     488              :  */
     489              : static char *
     490           41 : concat_conninfo_dbname(const char *conninfo, const char *dbname)
     491              : {
     492           41 :     PQExpBuffer buf = createPQExpBuffer();
     493              :     char       *ret;
     494              : 
     495              :     Assert(conninfo != NULL);
     496              : 
     497           41 :     appendPQExpBufferStr(buf, conninfo);
     498           41 :     appendConnStrItem(buf, "dbname", dbname);
     499              : 
     500           41 :     ret = pg_strdup(buf->data);
     501           41 :     destroyPQExpBuffer(buf);
     502              : 
     503           41 :     return ret;
     504              : }
     505              : 
     506              : /*
     507              :  * Store publication and subscription information.
     508              :  *
     509              :  * If publication, replication slot and subscription names were specified,
     510              :  * store it here. Otherwise, a generated name will be assigned to the object in
     511              :  * setup_publisher().
     512              :  */
     513              : static struct LogicalRepInfo *
     514           10 : store_pub_sub_info(const struct CreateSubscriberOptions *opt,
     515              :                    const char *pub_base_conninfo,
     516              :                    const char *sub_base_conninfo)
     517              : {
     518              :     struct LogicalRepInfo *dbinfo;
     519           10 :     SimpleStringListCell *pubcell = NULL;
     520           10 :     SimpleStringListCell *subcell = NULL;
     521           10 :     SimpleStringListCell *replslotcell = NULL;
     522           10 :     int         i = 0;
     523              : 
     524           10 :     dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
     525              : 
     526           10 :     if (num_pubs > 0)
     527            2 :         pubcell = opt->pub_names.head;
     528           10 :     if (num_subs > 0)
     529            1 :         subcell = opt->sub_names.head;
     530           10 :     if (num_replslots > 0)
     531            2 :         replslotcell = opt->replslot_names.head;
     532              : 
     533           30 :     for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
     534              :     {
     535              :         char       *conninfo;
     536              : 
     537              :         /* Fill publisher attributes */
     538           20 :         conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
     539           20 :         dbinfo[i].pubconninfo = conninfo;
     540           20 :         dbinfo[i].dbname = cell->val;
     541           20 :         if (num_pubs > 0)
     542            4 :             dbinfo[i].pubname = pubcell->val;
     543              :         else
     544           16 :             dbinfo[i].pubname = NULL;
     545           20 :         if (num_replslots > 0)
     546            3 :             dbinfo[i].replslotname = replslotcell->val;
     547              :         else
     548           17 :             dbinfo[i].replslotname = NULL;
     549           20 :         dbinfo[i].made_replslot = false;
     550           20 :         dbinfo[i].made_publication = false;
     551              :         /* Fill subscriber attributes */
     552           20 :         conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
     553           20 :         dbinfo[i].subconninfo = conninfo;
     554           20 :         if (num_subs > 0)
     555            2 :             dbinfo[i].subname = subcell->val;
     556              :         else
     557           18 :             dbinfo[i].subname = NULL;
     558              :         /* Other fields will be filled later */
     559              : 
     560           20 :         pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
     561              :                      dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
     562              :                      dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
     563              :                      dbinfo[i].pubconninfo);
     564           20 :         pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
     565              :                      dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
     566              :                      dbinfo[i].subconninfo,
     567              :                      dbinfos.two_phase ? "true" : "false");
     568              : 
     569           20 :         if (num_pubs > 0)
     570            4 :             pubcell = pubcell->next;
     571           20 :         if (num_subs > 0)
     572            2 :             subcell = subcell->next;
     573           20 :         if (num_replslots > 0)
     574            3 :             replslotcell = replslotcell->next;
     575              : 
     576           20 :         i++;
     577              :     }
     578              : 
     579           10 :     return dbinfo;
     580              : }
     581              : 
     582              : /*
     583              :  * Open a new connection. If exit_on_error is true, it has an undesired
     584              :  * condition and it should exit immediately.
     585              :  */
     586              : static PGconn *
     587           57 : connect_database(const char *conninfo, bool exit_on_error)
     588              : {
     589              :     PGconn     *conn;
     590              :     PGresult   *res;
     591              : 
     592           57 :     conn = PQconnectdb(conninfo);
     593           57 :     if (PQstatus(conn) != CONNECTION_OK)
     594              :     {
     595            0 :         pg_log_error("connection to database failed: %s",
     596              :                      PQerrorMessage(conn));
     597            0 :         PQfinish(conn);
     598              : 
     599            0 :         if (exit_on_error)
     600            0 :             exit(1);
     601            0 :         return NULL;
     602              :     }
     603              : 
     604              :     /* Secure search_path */
     605           57 :     res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
     606           57 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     607              :     {
     608            0 :         pg_log_error("could not clear \"search_path\": %s",
     609              :                      PQresultErrorMessage(res));
     610            0 :         PQclear(res);
     611            0 :         PQfinish(conn);
     612              : 
     613            0 :         if (exit_on_error)
     614            0 :             exit(1);
     615            0 :         return NULL;
     616              :     }
     617           57 :     PQclear(res);
     618              : 
     619           57 :     return conn;
     620              : }
     621              : 
     622              : /*
     623              :  * Close the connection. If exit_on_error is true, it has an undesired
     624              :  * condition and it should exit immediately.
     625              :  */
     626              : static void
     627           57 : disconnect_database(PGconn *conn, bool exit_on_error)
     628              : {
     629              :     Assert(conn != NULL);
     630              : 
     631           57 :     PQfinish(conn);
     632              : 
     633           57 :     if (exit_on_error)
     634            2 :         exit(1);
     635           55 : }
     636              : 
     637              : /*
     638              :  * Obtain the system identifier using the provided connection. It will be used
     639              :  * to compare if a data directory is a clone of another one.
     640              :  */
     641              : static uint64
     642           10 : get_primary_sysid(const char *conninfo)
     643              : {
     644              :     PGconn     *conn;
     645              :     PGresult   *res;
     646              :     uint64      sysid;
     647              : 
     648           10 :     pg_log_info("getting system identifier from publisher");
     649              : 
     650           10 :     conn = connect_database(conninfo, true);
     651              : 
     652           10 :     res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
     653           10 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     654              :     {
     655            0 :         pg_log_error("could not get system identifier: %s",
     656              :                      PQresultErrorMessage(res));
     657            0 :         disconnect_database(conn, true);
     658              :     }
     659           10 :     if (PQntuples(res) != 1)
     660              :     {
     661            0 :         pg_log_error("could not get system identifier: got %d rows, expected %d row",
     662              :                      PQntuples(res), 1);
     663            0 :         disconnect_database(conn, true);
     664              :     }
     665              : 
     666           10 :     sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
     667              : 
     668           10 :     pg_log_info("system identifier is %" PRIu64 " on publisher", sysid);
     669              : 
     670           10 :     PQclear(res);
     671           10 :     disconnect_database(conn, false);
     672              : 
     673           10 :     return sysid;
     674              : }
     675              : 
     676              : /*
     677              :  * Obtain the system identifier from control file. It will be used to compare
     678              :  * if a data directory is a clone of another one. This routine is used locally
     679              :  * and avoids a connection.
     680              :  */
     681              : static uint64
     682           10 : get_standby_sysid(const char *datadir)
     683              : {
     684              :     ControlFileData *cf;
     685              :     bool        crc_ok;
     686              :     uint64      sysid;
     687              : 
     688           10 :     pg_log_info("getting system identifier from subscriber");
     689              : 
     690           10 :     cf = get_controlfile(datadir, &crc_ok);
     691           10 :     if (!crc_ok)
     692            0 :         pg_fatal("control file appears to be corrupt");
     693              : 
     694           10 :     sysid = cf->system_identifier;
     695              : 
     696           10 :     pg_log_info("system identifier is %" PRIu64 " on subscriber", sysid);
     697              : 
     698           10 :     pg_free(cf);
     699              : 
     700           10 :     return sysid;
     701              : }
     702              : 
     703              : /*
     704              :  * Modify the system identifier. Since a standby server preserves the system
     705              :  * identifier, it makes sense to change it to avoid situations in which WAL
     706              :  * files from one of the systems might be used in the other one.
     707              :  */
     708              : static void
     709            4 : modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
     710              : {
     711              :     ControlFileData *cf;
     712              :     bool        crc_ok;
     713              :     struct timeval tv;
     714              : 
     715              :     char       *out_file;
     716              :     char       *cmd_str;
     717              : 
     718            4 :     pg_log_info("modifying system identifier of subscriber");
     719              : 
     720            4 :     cf = get_controlfile(subscriber_dir, &crc_ok);
     721            4 :     if (!crc_ok)
     722            0 :         pg_fatal("control file appears to be corrupt");
     723              : 
     724              :     /*
     725              :      * Select a new system identifier.
     726              :      *
     727              :      * XXX this code was extracted from BootStrapXLOG().
     728              :      */
     729            4 :     gettimeofday(&tv, NULL);
     730            4 :     cf->system_identifier = ((uint64) tv.tv_sec) << 32;
     731            4 :     cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
     732            4 :     cf->system_identifier |= getpid() & 0xFFF;
     733              : 
     734            4 :     if (dry_run)
     735            3 :         pg_log_info("dry-run: would set system identifier to %" PRIu64 " on subscriber",
     736              :                     cf->system_identifier);
     737              :     else
     738              :     {
     739            1 :         update_controlfile(subscriber_dir, cf, true);
     740            1 :         pg_log_info("system identifier is %" PRIu64 " on subscriber",
     741              :                     cf->system_identifier);
     742              :     }
     743              : 
     744            4 :     if (dry_run)
     745            3 :         pg_log_info("dry-run: would run pg_resetwal on the subscriber");
     746              :     else
     747            1 :         pg_log_info("running pg_resetwal on the subscriber");
     748              : 
     749              :     /*
     750              :      * Redirecting the output to the logfile if specified. Since the output
     751              :      * would be very short, around one line, we do not provide a separate file
     752              :      * for it; it's done as a part of the server log.
     753              :      */
     754            4 :     if (opt->log_dir)
     755            1 :         out_file = psprintf("%s/%s", logdir, SERVER_LOG_FILE_NAME);
     756              :     else
     757            3 :         out_file = DEVNULL;
     758              : 
     759            4 :     cmd_str = psprintf("\"%s\" -D \"%s\" >> \"%s\"", pg_resetwal_path,
     760              :                        subscriber_dir, out_file);
     761            4 :     if (opt->log_dir)
     762            1 :         pg_free(out_file);
     763              : 
     764            4 :     pg_log_debug("pg_resetwal command is: %s", cmd_str);
     765              : 
     766            4 :     if (!dry_run)
     767              :     {
     768            1 :         int         rc = system(cmd_str);
     769              : 
     770            1 :         if (rc == 0)
     771            1 :             pg_log_info("successfully reset WAL on the subscriber");
     772              :         else
     773            0 :             pg_fatal("could not reset WAL on subscriber: %s", wait_result_to_str(rc));
     774              :     }
     775              : 
     776            4 :     pg_free(cf);
     777            4 :     pg_free(cmd_str);
     778            4 : }
     779              : 
     780              : /*
     781              :  * Generate an object name using a prefix, database oid and a random integer.
     782              :  * It is used in case the user does not specify an object name (publication,
     783              :  * subscription, replication slot).
     784              :  */
     785              : static char *
     786            8 : generate_object_name(PGconn *conn)
     787              : {
     788              :     PGresult   *res;
     789              :     Oid         oid;
     790              :     uint32      rand;
     791              :     char       *objname;
     792              : 
     793            8 :     res = PQexec(conn,
     794              :                  "SELECT oid FROM pg_catalog.pg_database "
     795              :                  "WHERE datname = pg_catalog.current_database()");
     796            8 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     797              :     {
     798            0 :         pg_log_error("could not obtain database OID: %s",
     799              :                      PQresultErrorMessage(res));
     800            0 :         disconnect_database(conn, true);
     801              :     }
     802              : 
     803            8 :     if (PQntuples(res) != 1)
     804              :     {
     805            0 :         pg_log_error("could not obtain database OID: got %d rows, expected %d row",
     806              :                      PQntuples(res), 1);
     807            0 :         disconnect_database(conn, true);
     808              :     }
     809              : 
     810              :     /* Database OID */
     811            8 :     oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
     812              : 
     813            8 :     PQclear(res);
     814              : 
     815              :     /* Random unsigned integer */
     816            8 :     rand = pg_prng_uint32(&prng_state);
     817              : 
     818              :     /*
     819              :      * Build the object name. The name must not exceed NAMEDATALEN - 1. This
     820              :      * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
     821              :      * '\0').
     822              :      */
     823            8 :     objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
     824              : 
     825            8 :     return objname;
     826              : }
     827              : 
     828              : /*
     829              :  * Does the publication exist in the specified database?
     830              :  */
     831              : static bool
     832            8 : find_publication(PGconn *conn, const char *pubname, const char *dbname)
     833              : {
     834            8 :     PQExpBuffer str = createPQExpBuffer();
     835              :     PGresult   *res;
     836            8 :     bool        found = false;
     837            8 :     char       *pubname_esc = PQescapeLiteral(conn, pubname, strlen(pubname));
     838              : 
     839            8 :     appendPQExpBuffer(str,
     840              :                       "SELECT 1 FROM pg_catalog.pg_publication "
     841              :                       "WHERE pubname = %s",
     842              :                       pubname_esc);
     843            8 :     res = PQexec(conn, str->data);
     844            8 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     845              :     {
     846            0 :         pg_log_error("could not find publication \"%s\" in database \"%s\": %s",
     847              :                      pubname, dbname, PQerrorMessage(conn));
     848            0 :         disconnect_database(conn, true);
     849              :     }
     850              : 
     851            8 :     if (PQntuples(res) == 1)
     852            1 :         found = true;
     853              : 
     854            8 :     PQclear(res);
     855            8 :     PQfreemem(pubname_esc);
     856            8 :     destroyPQExpBuffer(str);
     857              : 
     858            8 :     return found;
     859              : }
     860              : 
     861              : /*
     862              :  * Create the publications and replication slots in preparation for logical
     863              :  * replication. Returns the LSN from latest replication slot. It will be the
     864              :  * replication start point that is used to adjust the subscriptions (see
     865              :  * set_replication_progress).
     866              :  */
     867              : static char *
     868            4 : setup_publisher(struct LogicalRepInfo *dbinfo)
     869              : {
     870            4 :     char       *lsn = NULL;
     871              : 
     872            4 :     pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
     873              : 
     874           12 :     for (int i = 0; i < num_dbs; i++)
     875              :     {
     876              :         PGconn     *conn;
     877            8 :         char       *genname = NULL;
     878              : 
     879            8 :         conn = connect_database(dbinfo[i].pubconninfo, true);
     880              : 
     881              :         /*
     882              :          * If an object name was not specified as command-line options, assign
     883              :          * a generated object name. The replication slot has a different rule.
     884              :          * The subscription name is assigned to the replication slot name if
     885              :          * no replication slot is specified. It follows the same rule as
     886              :          * CREATE SUBSCRIPTION.
     887              :          */
     888            8 :         if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
     889            8 :             genname = generate_object_name(conn);
     890            8 :         if (num_pubs == 0)
     891            4 :             dbinfo[i].pubname = pg_strdup(genname);
     892            8 :         if (num_subs == 0)
     893            6 :             dbinfo[i].subname = pg_strdup(genname);
     894            8 :         if (num_replslots == 0)
     895            5 :             dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
     896              : 
     897            8 :         if (find_publication(conn, dbinfo[i].pubname, dbinfo[i].dbname))
     898              :         {
     899              :             /* Reuse existing publication on publisher. */
     900            1 :             pg_log_info("use existing publication \"%s\" in database \"%s\"",
     901              :                         dbinfo[i].pubname, dbinfo[i].dbname);
     902              :             /* Don't remove pre-existing publication if an error occurs. */
     903            1 :             dbinfo[i].made_publication = false;
     904              :         }
     905              :         else
     906              :         {
     907              :             /*
     908              :              * Create publication on publisher. This step should be executed
     909              :              * *before* promoting the subscriber to avoid any transactions
     910              :              * between consistent LSN and the new publication rows (such
     911              :              * transactions wouldn't see the new publication rows resulting in
     912              :              * an error).
     913              :              */
     914            7 :             create_publication(conn, &dbinfo[i]);
     915              :         }
     916              : 
     917              :         /* Create replication slot on publisher */
     918            8 :         if (lsn)
     919            1 :             pg_free(lsn);
     920            8 :         lsn = create_logical_replication_slot(conn, &dbinfo[i]);
     921            8 :         if (lsn == NULL && !dry_run)
     922            0 :             exit(1);
     923              : 
     924              :         /*
     925              :          * Since we are using the LSN returned by the last replication slot as
     926              :          * recovery_target_lsn, this LSN is ahead of the current WAL position
     927              :          * and the recovery waits until the publisher writes a WAL record to
     928              :          * reach the target and ends the recovery. On idle systems, this wait
     929              :          * time is unpredictable and could lead to failure in promoting the
     930              :          * subscriber. To avoid that, insert a harmless WAL record.
     931              :          */
     932            8 :         if (i == num_dbs - 1 && !dry_run)
     933              :         {
     934              :             PGresult   *res;
     935              : 
     936            1 :             res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
     937            1 :             if (PQresultStatus(res) != PGRES_TUPLES_OK)
     938              :             {
     939            0 :                 pg_log_error("could not write an additional WAL record: %s",
     940              :                              PQresultErrorMessage(res));
     941            0 :                 disconnect_database(conn, true);
     942              :             }
     943            1 :             PQclear(res);
     944              :         }
     945              : 
     946            8 :         disconnect_database(conn, false);
     947              :     }
     948              : 
     949            4 :     return lsn;
     950              : }
     951              : 
     952              : /*
     953              :  * Is recovery still in progress?
     954              :  */
     955              : static bool
     956           15 : server_is_in_recovery(PGconn *conn)
     957              : {
     958              :     PGresult   *res;
     959              :     int         ret;
     960              : 
     961           15 :     res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
     962              : 
     963           15 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     964              :     {
     965            0 :         pg_log_error("could not obtain recovery progress: %s",
     966              :                      PQresultErrorMessage(res));
     967            0 :         disconnect_database(conn, true);
     968              :     }
     969              : 
     970              : 
     971           15 :     ret = strcmp("t", PQgetvalue(res, 0, 0));
     972              : 
     973           15 :     PQclear(res);
     974              : 
     975           15 :     return ret == 0;
     976              : }
     977              : 
     978              : static void
     979            1 : make_output_dirs(const char *log_basedir)
     980              : {
     981              :     char        timestamp[128];
     982              :     struct timeval tval;
     983              :     time_t      now;
     984              :     struct tm   tmbuf;
     985              : 
     986              :     /* Generate timestamp */
     987            1 :     gettimeofday(&tval, NULL);
     988            1 :     now = tval.tv_sec;
     989              : 
     990            1 :     strftime(timestamp, sizeof(timestamp), "%Y%m%dT%H%M%S",
     991            1 :              localtime_r(&now, &tmbuf));
     992              : 
     993              :     /* Append milliseconds */
     994            1 :     snprintf(timestamp + strlen(timestamp),
     995            1 :              sizeof(timestamp) - strlen(timestamp), ".%03u",
     996            1 :              (unsigned int) (tval.tv_usec / 1000));
     997              : 
     998              :     /* Build timestamp directory path */
     999            1 :     logdir = psprintf("%s/%s", log_basedir, timestamp);
    1000              : 
    1001              :     /* Create base directory (ignore if exists) */
    1002            1 :     if (mkdir(log_basedir, pg_dir_create_mode) < 0 && errno != EEXIST)
    1003            0 :         pg_fatal("could not create directory \"%s\": %m", log_basedir);
    1004              : 
    1005              :     /* Create a timestamp-named subdirectory under the base directory */
    1006            1 :     if (mkdir(logdir, pg_dir_create_mode) < 0)
    1007            0 :         pg_fatal("could not create directory \"%s\": %m", logdir);
    1008            1 : }
    1009              : 
    1010              : /*
    1011              :  * Is the primary server ready for logical replication?
    1012              :  *
    1013              :  * XXX Does it not allow a synchronous replica?
    1014              :  */
    1015              : static void
    1016            6 : check_publisher(const struct LogicalRepInfo *dbinfo)
    1017              : {
    1018              :     PGconn     *conn;
    1019              :     PGresult   *res;
    1020            6 :     bool        failed = false;
    1021              : 
    1022              :     char       *wal_level;
    1023              :     int         max_repslots;
    1024              :     int         cur_repslots;
    1025              :     int         max_walsenders;
    1026              :     int         cur_walsenders;
    1027              :     int         max_prepared_transactions;
    1028              :     char       *max_slot_wal_keep_size;
    1029              : 
    1030            6 :     pg_log_info("checking settings on publisher");
    1031              : 
    1032            6 :     conn = connect_database(dbinfo[0].pubconninfo, true);
    1033              : 
    1034              :     /*
    1035              :      * If the primary server is in recovery (i.e. cascading replication),
    1036              :      * objects (publication) cannot be created because it is read only.
    1037              :      */
    1038            6 :     if (server_is_in_recovery(conn))
    1039              :     {
    1040            1 :         pg_log_error("primary server cannot be in recovery");
    1041            1 :         disconnect_database(conn, true);
    1042              :     }
    1043              : 
    1044              :     /*------------------------------------------------------------------------
    1045              :      * Logical replication requires a few parameters to be set on publisher.
    1046              :      * Since these parameters are not a requirement for physical replication,
    1047              :      * we should check it to make sure it won't fail.
    1048              :      *
    1049              :      * - wal_level >= replica
    1050              :      * - max_replication_slots >= current + number of dbs to be converted
    1051              :      * - max_wal_senders >= current + number of dbs to be converted
    1052              :      * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files)
    1053              :      * -----------------------------------------------------------------------
    1054              :      */
    1055            5 :     res = PQexec(conn,
    1056              :                  "SELECT pg_catalog.current_setting('wal_level'),"
    1057              :                  " pg_catalog.current_setting('max_replication_slots'),"
    1058              :                  " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
    1059              :                  " pg_catalog.current_setting('max_wal_senders'),"
    1060              :                  " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
    1061              :                  " pg_catalog.current_setting('max_prepared_transactions'),"
    1062              :                  " pg_catalog.current_setting('max_slot_wal_keep_size')");
    1063              : 
    1064            5 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1065              :     {
    1066            0 :         pg_log_error("could not obtain publisher settings: %s",
    1067              :                      PQresultErrorMessage(res));
    1068            0 :         disconnect_database(conn, true);
    1069              :     }
    1070              : 
    1071            5 :     wal_level = pg_strdup(PQgetvalue(res, 0, 0));
    1072            5 :     max_repslots = atoi(PQgetvalue(res, 0, 1));
    1073            5 :     cur_repslots = atoi(PQgetvalue(res, 0, 2));
    1074            5 :     max_walsenders = atoi(PQgetvalue(res, 0, 3));
    1075            5 :     cur_walsenders = atoi(PQgetvalue(res, 0, 4));
    1076            5 :     max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));
    1077            5 :     max_slot_wal_keep_size = pg_strdup(PQgetvalue(res, 0, 6));
    1078              : 
    1079            5 :     PQclear(res);
    1080              : 
    1081            5 :     pg_log_debug("publisher: wal_level: %s", wal_level);
    1082            5 :     pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
    1083            5 :     pg_log_debug("publisher: current replication slots: %d", cur_repslots);
    1084            5 :     pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
    1085            5 :     pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
    1086            5 :     pg_log_debug("publisher: max_prepared_transactions: %d",
    1087              :                  max_prepared_transactions);
    1088            5 :     pg_log_debug("publisher: max_slot_wal_keep_size: %s",
    1089              :                  max_slot_wal_keep_size);
    1090              : 
    1091            5 :     disconnect_database(conn, false);
    1092              : 
    1093            5 :     if (strcmp(wal_level, "minimal") == 0)
    1094              :     {
    1095            0 :         pg_log_error("publisher requires \"wal_level\" >= \"replica\"");
    1096            0 :         failed = true;
    1097              :     }
    1098              : 
    1099            5 :     if (max_repslots - cur_repslots < num_dbs)
    1100              :     {
    1101            1 :         pg_log_error("publisher requires %d replication slots, but only %d remain",
    1102              :                      num_dbs, max_repslots - cur_repslots);
    1103            1 :         pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
    1104              :                           "max_replication_slots", cur_repslots + num_dbs);
    1105            1 :         failed = true;
    1106              :     }
    1107              : 
    1108            5 :     if (max_walsenders - cur_walsenders < num_dbs)
    1109              :     {
    1110            1 :         pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
    1111              :                      num_dbs, max_walsenders - cur_walsenders);
    1112            1 :         pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
    1113              :                           "max_wal_senders", cur_walsenders + num_dbs);
    1114            1 :         failed = true;
    1115              :     }
    1116              : 
    1117            5 :     if (max_prepared_transactions != 0 && !dbinfos.two_phase)
    1118              :     {
    1119            0 :         pg_log_warning("two_phase option will not be enabled for replication slots");
    1120            0 :         pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled.  "
    1121              :                               "Prepared transactions will be replicated at COMMIT PREPARED.");
    1122            0 :         pg_log_warning_hint("You can use the command-line option --enable-two-phase to enable two_phase.");
    1123              :     }
    1124              : 
    1125              :     /*
    1126              :      * In dry-run mode, validate 'max_slot_wal_keep_size'. If this parameter
    1127              :      * is set to a non-default value, it may cause replication failures due to
    1128              :      * required WAL files being prematurely removed.
    1129              :      */
    1130            5 :     if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
    1131              :     {
    1132            0 :         pg_log_warning("required WAL could be removed from the publisher");
    1133            0 :         pg_log_warning_hint("Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
    1134              :                             "max_slot_wal_keep_size");
    1135              :     }
    1136              : 
    1137            5 :     pg_free(wal_level);
    1138              : 
    1139            5 :     if (failed)
    1140            1 :         exit(1);
    1141            4 : }
    1142              : 
    1143              : /*
    1144              :  * Is the standby server ready for logical replication?
    1145              :  *
    1146              :  * XXX Does it not allow a time-delayed replica?
    1147              :  *
    1148              :  * XXX In a cascaded replication scenario (P -> S -> C), if the target server
    1149              :  * is S, it cannot detect there is a replica (server C) because server S starts
    1150              :  * accepting only local connections and server C cannot connect to it. Hence,
    1151              :  * there is not a reliable way to provide a suitable error saying the server C
    1152              :  * will be broken at the end of this process (due to pg_resetwal).
    1153              :  */
    1154              : static void
    1155            8 : check_subscriber(const struct LogicalRepInfo *dbinfo)
    1156              : {
    1157              :     PGconn     *conn;
    1158              :     PGresult   *res;
    1159            8 :     bool        failed = false;
    1160              : 
    1161              :     int         max_lrworkers;
    1162              :     int         max_replorigins;
    1163              :     int         max_wprocs;
    1164              : 
    1165            8 :     pg_log_info("checking settings on subscriber");
    1166              : 
    1167            8 :     conn = connect_database(dbinfo[0].subconninfo, true);
    1168              : 
    1169              :     /* The target server must be a standby */
    1170            8 :     if (!server_is_in_recovery(conn))
    1171              :     {
    1172            1 :         pg_log_error("target server must be a standby");
    1173            1 :         disconnect_database(conn, true);
    1174              :     }
    1175              : 
    1176              :     /*------------------------------------------------------------------------
    1177              :      * Logical replication requires a few parameters to be set on subscriber.
    1178              :      * Since these parameters are not a requirement for physical replication,
    1179              :      * we should check it to make sure it won't fail.
    1180              :      *
    1181              :      * - max_active_replication_origins >= number of dbs to be converted
    1182              :      * - max_logical_replication_workers >= number of dbs to be converted
    1183              :      * - max_worker_processes >= 1 + number of dbs to be converted
    1184              :      *------------------------------------------------------------------------
    1185              :      */
    1186            7 :     res = PQexec(conn,
    1187              :                  "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
    1188              :                  "'max_logical_replication_workers', "
    1189              :                  "'max_active_replication_origins', "
    1190              :                  "'max_worker_processes', "
    1191              :                  "'primary_slot_name') "
    1192              :                  "ORDER BY name");
    1193              : 
    1194            7 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1195              :     {
    1196            0 :         pg_log_error("could not obtain subscriber settings: %s",
    1197              :                      PQresultErrorMessage(res));
    1198            0 :         disconnect_database(conn, true);
    1199              :     }
    1200              : 
    1201            7 :     max_replorigins = atoi(PQgetvalue(res, 0, 0));
    1202            7 :     max_lrworkers = atoi(PQgetvalue(res, 1, 0));
    1203            7 :     max_wprocs = atoi(PQgetvalue(res, 2, 0));
    1204            7 :     if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
    1205            6 :         primary_slot_name = pg_strdup(PQgetvalue(res, 3, 0));
    1206              : 
    1207            7 :     pg_log_debug("subscriber: max_logical_replication_workers: %d",
    1208              :                  max_lrworkers);
    1209            7 :     pg_log_debug("subscriber: max_active_replication_origins: %d", max_replorigins);
    1210            7 :     pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
    1211            7 :     if (primary_slot_name)
    1212            6 :         pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
    1213              : 
    1214            7 :     PQclear(res);
    1215              : 
    1216            7 :     disconnect_database(conn, false);
    1217              : 
    1218            7 :     if (max_replorigins < num_dbs)
    1219              :     {
    1220            1 :         pg_log_error("subscriber requires %d active replication origins, but only %d remain",
    1221              :                      num_dbs, max_replorigins);
    1222            1 :         pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
    1223              :                           "max_active_replication_origins", num_dbs);
    1224            1 :         failed = true;
    1225              :     }
    1226              : 
    1227            7 :     if (max_lrworkers < num_dbs)
    1228              :     {
    1229            1 :         pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
    1230              :                      num_dbs, max_lrworkers);
    1231            1 :         pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
    1232              :                           "max_logical_replication_workers", num_dbs);
    1233            1 :         failed = true;
    1234              :     }
    1235              : 
    1236            7 :     if (max_wprocs < num_dbs + 1)
    1237              :     {
    1238            1 :         pg_log_error("subscriber requires %d worker processes, but only %d remain",
    1239              :                      num_dbs + 1, max_wprocs);
    1240            1 :         pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
    1241              :                           "max_worker_processes", num_dbs + 1);
    1242            1 :         failed = true;
    1243              :     }
    1244              : 
    1245            7 :     if (failed)
    1246            1 :         exit(1);
    1247            6 : }
    1248              : 
    1249              : /*
    1250              :  * Drop a specified subscription. This is to avoid duplicate subscriptions on
    1251              :  * the primary (publisher node) and the newly created subscriber. We
    1252              :  * shouldn't drop the associated slot as that would be used by the publisher
    1253              :  * node.
    1254              :  */
    1255              : static void
    1256            4 : drop_existing_subscription(PGconn *conn, const char *subname, const char *dbname)
    1257              : {
    1258            4 :     PQExpBuffer query = createPQExpBuffer();
    1259              :     PGresult   *res;
    1260              : 
    1261              :     Assert(conn != NULL);
    1262              : 
    1263              :     /*
    1264              :      * Construct a query string. These commands are allowed to be executed
    1265              :      * within a transaction.
    1266              :      */
    1267            4 :     appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
    1268              :                       subname);
    1269            4 :     appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
    1270              :                       subname);
    1271            4 :     appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
    1272              : 
    1273            4 :     if (dry_run)
    1274            3 :         pg_log_info("dry-run: would drop subscription \"%s\" in database \"%s\"",
    1275              :                     subname, dbname);
    1276              :     else
    1277              :     {
    1278            1 :         pg_log_info("dropping subscription \"%s\" in database \"%s\"",
    1279              :                     subname, dbname);
    1280              : 
    1281            1 :         res = PQexec(conn, query->data);
    1282              : 
    1283            1 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1284              :         {
    1285            0 :             pg_log_error("could not drop subscription \"%s\": %s",
    1286              :                          subname, PQresultErrorMessage(res));
    1287            0 :             disconnect_database(conn, true);
    1288              :         }
    1289              : 
    1290            1 :         PQclear(res);
    1291              :     }
    1292              : 
    1293            4 :     destroyPQExpBuffer(query);
    1294            4 : }
    1295              : 
    1296              : /*
    1297              :  * Retrieve and drop the pre-existing subscriptions.
    1298              :  */
    1299              : static void
    1300            8 : check_and_drop_existing_subscriptions(PGconn *conn,
    1301              :                                       const struct LogicalRepInfo *dbinfo)
    1302              : {
    1303            8 :     PQExpBuffer query = createPQExpBuffer();
    1304              :     char       *dbname;
    1305              :     PGresult   *res;
    1306              : 
    1307              :     Assert(conn != NULL);
    1308              : 
    1309            8 :     dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
    1310              : 
    1311            8 :     appendPQExpBuffer(query,
    1312              :                       "SELECT s.subname FROM pg_catalog.pg_subscription s "
    1313              :                       "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
    1314              :                       "WHERE d.datname = %s",
    1315              :                       dbname);
    1316            8 :     res = PQexec(conn, query->data);
    1317              : 
    1318            8 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1319              :     {
    1320            0 :         pg_log_error("could not obtain pre-existing subscriptions: %s",
    1321              :                      PQresultErrorMessage(res));
    1322            0 :         disconnect_database(conn, true);
    1323              :     }
    1324              : 
    1325           12 :     for (int i = 0; i < PQntuples(res); i++)
    1326            4 :         drop_existing_subscription(conn, PQgetvalue(res, i, 0),
    1327            4 :                                    dbinfo->dbname);
    1328              : 
    1329            8 :     PQclear(res);
    1330            8 :     destroyPQExpBuffer(query);
    1331            8 :     PQfreemem(dbname);
    1332            8 : }
    1333              : 
    1334              : /*
    1335              :  * Create the subscriptions, adjust the initial location for logical
    1336              :  * replication and enable the subscriptions. That's the last step for logical
    1337              :  * replication setup.
    1338              :  */
    1339              : static void
    1340            4 : setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
    1341              : {
    1342           12 :     for (int i = 0; i < num_dbs; i++)
    1343              :     {
    1344              :         PGconn     *conn;
    1345              : 
    1346              :         /* Connect to subscriber. */
    1347            8 :         conn = connect_database(dbinfo[i].subconninfo, true);
    1348              : 
    1349              :         /*
    1350              :          * We don't need the pre-existing subscriptions on the newly formed
    1351              :          * subscriber. They can connect to other publisher nodes and either
    1352              :          * get some unwarranted data or can lead to ERRORs in connecting to
    1353              :          * such nodes.
    1354              :          */
    1355            8 :         check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
    1356              : 
    1357              :         /* Check and drop the required publications in the given database. */
    1358            8 :         check_and_drop_publications(conn, &dbinfo[i]);
    1359              : 
    1360            8 :         create_subscription(conn, &dbinfo[i]);
    1361              : 
    1362              :         /* Set the replication progress to the correct LSN */
    1363            8 :         set_replication_progress(conn, &dbinfo[i], consistent_lsn);
    1364              : 
    1365              :         /* Enable subscription */
    1366            8 :         enable_subscription(conn, &dbinfo[i]);
    1367              : 
    1368            8 :         disconnect_database(conn, false);
    1369              :     }
    1370            4 : }
    1371              : 
    1372              : /*
    1373              :  * Write the required recovery parameters.
    1374              :  */
    1375              : static void
    1376            4 : setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
    1377              : {
    1378              :     PGconn     *conn;
    1379              :     PQExpBuffer recoveryconfcontents;
    1380              : 
    1381              :     /*
    1382              :      * Despite of the recovery parameters will be written to the subscriber,
    1383              :      * use a publisher connection. The primary_conninfo is generated using the
    1384              :      * connection settings.
    1385              :      */
    1386            4 :     conn = connect_database(dbinfo[0].pubconninfo, true);
    1387              : 
    1388              :     /*
    1389              :      * Write recovery parameters.
    1390              :      *
    1391              :      * The subscriber is not running yet. In dry run mode, the recovery
    1392              :      * parameters *won't* be written. An invalid LSN is used for printing
    1393              :      * purposes. Additional recovery parameters are added here. It avoids
    1394              :      * unexpected behavior such as end of recovery as soon as a consistent
    1395              :      * state is reached (recovery_target) and failure due to multiple recovery
    1396              :      * targets (name, time, xid, LSN).
    1397              :      */
    1398            4 :     recoveryconfcontents = GenerateRecoveryConfig(conn, NULL, NULL);
    1399            4 :     appendPQExpBufferStr(recoveryconfcontents, "recovery_target = ''\n");
    1400            4 :     appendPQExpBufferStr(recoveryconfcontents,
    1401              :                          "recovery_target_timeline = 'latest'\n");
    1402              : 
    1403              :     /*
    1404              :      * Set recovery_target_inclusive = false to avoid reapplying the
    1405              :      * transaction committed at 'lsn' after subscription is enabled. This is
    1406              :      * because the provided 'lsn' is also used as the replication start point
    1407              :      * for the subscription. So, the server can send the transaction committed
    1408              :      * at that 'lsn' after replication is started which can lead to applying
    1409              :      * the same transaction twice if we keep recovery_target_inclusive = true.
    1410              :      */
    1411            4 :     appendPQExpBufferStr(recoveryconfcontents,
    1412              :                          "recovery_target_inclusive = false\n");
    1413            4 :     appendPQExpBufferStr(recoveryconfcontents,
    1414              :                          "recovery_target_action = promote\n");
    1415            4 :     appendPQExpBufferStr(recoveryconfcontents, "recovery_target_name = ''\n");
    1416            4 :     appendPQExpBufferStr(recoveryconfcontents, "recovery_target_time = ''\n");
    1417            4 :     appendPQExpBufferStr(recoveryconfcontents, "recovery_target_xid = ''\n");
    1418              : 
    1419            4 :     if (dry_run)
    1420              :     {
    1421            3 :         appendPQExpBufferStr(recoveryconfcontents, "# dry run mode\n");
    1422            3 :         appendPQExpBuffer(recoveryconfcontents,
    1423              :                           "recovery_target_lsn = '%X/%08X'\n",
    1424              :                           LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
    1425              :     }
    1426              :     else
    1427              :     {
    1428            1 :         appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
    1429              :                           lsn);
    1430              :     }
    1431              : 
    1432            4 :     pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
    1433              : 
    1434            4 :     if (!dry_run)
    1435              :     {
    1436              :         char        conf_filename[MAXPGPATH];
    1437              :         FILE       *fd;
    1438              : 
    1439              :         /* Write the recovery parameters to INCLUDED_CONF_FILE */
    1440            1 :         snprintf(conf_filename, MAXPGPATH, "%s/%s", datadir,
    1441              :                  INCLUDED_CONF_FILE);
    1442            1 :         fd = fopen(conf_filename, "w");
    1443            1 :         if (fd == NULL)
    1444            0 :             pg_fatal("could not open file \"%s\": %m", conf_filename);
    1445              : 
    1446            1 :         if (fwrite(recoveryconfcontents->data, recoveryconfcontents->len, 1, fd) != 1)
    1447            0 :             pg_fatal("could not write to file \"%s\": %m", conf_filename);
    1448              : 
    1449            1 :         fclose(fd);
    1450            1 :         recovery_params_set = true;
    1451              : 
    1452              :         /* Include conditionally the recovery parameters. */
    1453            1 :         resetPQExpBuffer(recoveryconfcontents);
    1454            1 :         appendPQExpBufferStr(recoveryconfcontents,
    1455              :                              "include_if_exists '" INCLUDED_CONF_FILE "'\n");
    1456            1 :         WriteRecoveryConfig(conn, datadir, recoveryconfcontents);
    1457              :     }
    1458              : 
    1459            4 :     disconnect_database(conn, false);
    1460            4 : }
    1461              : 
    1462              : /*
    1463              :  * Drop physical replication slot on primary if the standby was using it. After
    1464              :  * the transformation, it has no use.
    1465              :  *
    1466              :  * XXX we might not fail here. Instead, we provide a warning so the user
    1467              :  * eventually drops this replication slot later.
    1468              :  */
    1469              : static void
    1470            4 : drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
    1471              : {
    1472              :     PGconn     *conn;
    1473              : 
    1474              :     /* Replication slot does not exist, do nothing */
    1475            4 :     if (!primary_slot_name)
    1476            0 :         return;
    1477              : 
    1478            4 :     conn = connect_database(dbinfo[0].pubconninfo, false);
    1479            4 :     if (conn != NULL)
    1480              :     {
    1481            4 :         drop_replication_slot(conn, &dbinfo[0], slotname);
    1482            4 :         disconnect_database(conn, false);
    1483              :     }
    1484              :     else
    1485              :     {
    1486            0 :         pg_log_warning("could not drop replication slot \"%s\" on primary",
    1487              :                        slotname);
    1488            0 :         pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
    1489              :     }
    1490              : }
    1491              : 
    1492              : /*
    1493              :  * Drop failover replication slots on subscriber. After the transformation,
    1494              :  * they have no use.
    1495              :  *
    1496              :  * XXX We do not fail here. Instead, we provide a warning so the user can drop
    1497              :  * them later.
    1498              :  */
    1499              : static void
    1500            4 : drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
    1501              : {
    1502              :     PGconn     *conn;
    1503              :     PGresult   *res;
    1504              : 
    1505            4 :     conn = connect_database(dbinfo[0].subconninfo, false);
    1506            4 :     if (conn != NULL)
    1507              :     {
    1508              :         /* Get failover replication slot names */
    1509            4 :         res = PQexec(conn,
    1510              :                      "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
    1511              : 
    1512            4 :         if (PQresultStatus(res) == PGRES_TUPLES_OK)
    1513              :         {
    1514              :             /* Remove failover replication slots from subscriber */
    1515            8 :             for (int i = 0; i < PQntuples(res); i++)
    1516            4 :                 drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
    1517              :         }
    1518              :         else
    1519              :         {
    1520            0 :             pg_log_warning("could not obtain failover replication slot information: %s",
    1521              :                            PQresultErrorMessage(res));
    1522            0 :             pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
    1523              :         }
    1524              : 
    1525            4 :         PQclear(res);
    1526            4 :         disconnect_database(conn, false);
    1527              :     }
    1528              :     else
    1529              :     {
    1530            0 :         pg_log_warning("could not drop failover replication slot");
    1531            0 :         pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
    1532              :     }
    1533            4 : }
    1534              : 
    1535              : /*
    1536              :  * Create a logical replication slot and returns a LSN.
    1537              :  *
    1538              :  * CreateReplicationSlot() is not used because it does not provide the one-row
    1539              :  * result set that contains the LSN.
    1540              :  */
    1541              : static char *
    1542            8 : create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
    1543              : {
    1544            8 :     PQExpBuffer str = createPQExpBuffer();
    1545            8 :     PGresult   *res = NULL;
    1546            8 :     const char *slot_name = dbinfo->replslotname;
    1547              :     char       *slot_name_esc;
    1548            8 :     char       *lsn = NULL;
    1549              : 
    1550              :     Assert(conn != NULL);
    1551              : 
    1552            8 :     if (dry_run)
    1553            6 :         pg_log_info("dry-run: would create the replication slot \"%s\" in database \"%s\" on publisher",
    1554              :                     slot_name, dbinfo->dbname);
    1555              :     else
    1556            2 :         pg_log_info("creating the replication slot \"%s\" in database \"%s\" on publisher",
    1557              :                     slot_name, dbinfo->dbname);
    1558              : 
    1559            8 :     slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
    1560              : 
    1561            8 :     appendPQExpBuffer(str,
    1562              :                       "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
    1563              :                       slot_name_esc,
    1564            8 :                       dbinfos.two_phase ? "true" : "false");
    1565              : 
    1566            8 :     PQfreemem(slot_name_esc);
    1567              : 
    1568            8 :     pg_log_debug("command is: %s", str->data);
    1569              : 
    1570            8 :     if (!dry_run)
    1571              :     {
    1572            2 :         res = PQexec(conn, str->data);
    1573            2 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1574              :         {
    1575            0 :             pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
    1576              :                          slot_name, dbinfo->dbname,
    1577              :                          PQresultErrorMessage(res));
    1578            0 :             PQclear(res);
    1579            0 :             destroyPQExpBuffer(str);
    1580            0 :             return NULL;
    1581              :         }
    1582              : 
    1583            2 :         lsn = pg_strdup(PQgetvalue(res, 0, 0));
    1584            2 :         PQclear(res);
    1585              :     }
    1586              : 
    1587              :     /* For cleanup purposes */
    1588            8 :     dbinfo->made_replslot = true;
    1589              : 
    1590            8 :     destroyPQExpBuffer(str);
    1591              : 
    1592            8 :     return lsn;
    1593              : }
    1594              : 
    1595              : static void
    1596            8 : drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
    1597              :                       const char *slot_name)
    1598              : {
    1599            8 :     PQExpBuffer str = createPQExpBuffer();
    1600              :     char       *slot_name_esc;
    1601              :     PGresult   *res;
    1602              : 
    1603              :     Assert(conn != NULL);
    1604              : 
    1605            8 :     if (dry_run)
    1606            6 :         pg_log_info("dry-run: would drop the replication slot \"%s\" in database \"%s\"",
    1607              :                     slot_name, dbinfo->dbname);
    1608              :     else
    1609            2 :         pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
    1610              :                     slot_name, dbinfo->dbname);
    1611              : 
    1612            8 :     slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
    1613              : 
    1614            8 :     appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
    1615              : 
    1616            8 :     PQfreemem(slot_name_esc);
    1617              : 
    1618            8 :     pg_log_debug("command is: %s", str->data);
    1619              : 
    1620            8 :     if (!dry_run)
    1621              :     {
    1622            2 :         res = PQexec(conn, str->data);
    1623            2 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1624              :         {
    1625            0 :             pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
    1626              :                          slot_name, dbinfo->dbname, PQresultErrorMessage(res));
    1627            0 :             dbinfo->made_replslot = false;   /* don't try again. */
    1628              :         }
    1629              : 
    1630            2 :         PQclear(res);
    1631              :     }
    1632              : 
    1633            8 :     destroyPQExpBuffer(str);
    1634            8 : }
    1635              : 
    /*
     * Reports a suitable message if pg_ctl fails.
     *
     * "rc" is the wait status obtained from running "pg_ctl_cmd". A zero status
     * means success and nothing happens. Any other status is classified (normal
     * exit, termination by signal, or unrecognized), logged together with the
     * failed command, and the program exits with status 1.
     */
    static void
    pg_ctl_status(const char *pg_ctl_cmd, int rc)
    {
        /* Success: nothing to report. */
        if (rc == 0)
            return;

        if (WIFEXITED(rc))
        {
            pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
        }
        else if (!WIFSIGNALED(rc))
        {
            pg_log_error("pg_ctl exited with unrecognized status %d", rc);
        }
        else
        {
    #if defined(WIN32)
            pg_log_error("pg_ctl was terminated by exception 0x%X",
                         WTERMSIG(rc));
            pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
    #else
            pg_log_error("pg_ctl was terminated by signal %d: %s",
                         WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
    #endif
        }

        /* Always show which command failed before giving up. */
        pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
        exit(1);
    }
    1668              : 
    1669              : static void
    1670           12 : start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
    1671              :                      bool restrict_logical_worker)
    1672              : {
    1673           12 :     PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
    1674              :     int         rc;
    1675              : 
    1676           12 :     appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
    1677           12 :     appendShellString(pg_ctl_cmd, subscriber_dir);
    1678           12 :     appendPQExpBufferStr(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
    1679              : 
    1680              :     /* Prevent unintended slot invalidation */
    1681           12 :     appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
    1682              : 
    1683           12 :     if (restricted_access)
    1684              :     {
    1685           12 :         appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
    1686              : #if !defined(WIN32)
    1687              : 
    1688              :         /*
    1689              :          * An empty listen_addresses list means the server does not listen on
    1690              :          * any IP interfaces; only Unix-domain sockets can be used to connect
    1691              :          * to the server. Prevent external connections to minimize the chance
    1692              :          * of failure.
    1693              :          */
    1694           12 :         appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
    1695           12 :         if (opt->socket_dir)
    1696           12 :             appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
    1697           12 :                               opt->socket_dir);
    1698           12 :         appendPQExpBufferChar(pg_ctl_cmd, '"');
    1699              : #endif
    1700              :     }
    1701           12 :     if (opt->config_file != NULL)
    1702            0 :         appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
    1703            0 :                           opt->config_file);
    1704              : 
    1705              :     /* Suppress to start logical replication if requested */
    1706           12 :     if (restrict_logical_worker)
    1707            4 :         appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
    1708              : 
    1709           12 :     if (opt->log_dir)
    1710            2 :         appendPQExpBuffer(pg_ctl_cmd, " -l \"%s/%s\"", logdir, SERVER_LOG_FILE_NAME);
    1711              : 
    1712           12 :     pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
    1713           12 :     rc = system(pg_ctl_cmd->data);
    1714           12 :     pg_ctl_status(pg_ctl_cmd->data, rc);
    1715           12 :     standby_running = true;
    1716           12 :     destroyPQExpBuffer(pg_ctl_cmd);
    1717           12 :     pg_log_info("server was started");
    1718           12 : }
    1719              : 
    1720              : static void
    1721           12 : stop_standby_server(const char *datadir)
    1722              : {
    1723              :     char       *pg_ctl_cmd;
    1724              :     int         rc;
    1725              : 
    1726           12 :     pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
    1727              :                           datadir);
    1728           12 :     pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
    1729           12 :     rc = system(pg_ctl_cmd);
    1730           12 :     pg_ctl_status(pg_ctl_cmd, rc);
    1731           12 :     standby_running = false;
    1732           12 :     pg_log_info("server was stopped");
    1733           12 : }
    1734              : 
    1735              : /*
    1736              :  * Returns after the server finishes the recovery process.
    1737              :  *
    1738              :  * If recovery_timeout option is set, terminate abnormally without finishing
    1739              :  * the recovery process. By default, it waits forever.
    1740              :  *
    1741              :  * XXX Is the recovery process still in progress? When recovery process has a
    1742              :  * better progress reporting mechanism, it should be added here.
    1743              :  */
    1744              : static void
    1745            4 : wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
    1746              : {
    1747              :     PGconn     *conn;
    1748            4 :     bool        ready = false;
    1749            4 :     int         timer = 0;
    1750              : 
    1751            4 :     pg_log_info("waiting for the target server to reach the consistent state");
    1752              : 
    1753            4 :     conn = connect_database(conninfo, true);
    1754              : 
    1755              :     for (;;)
    1756              :     {
    1757              :         /* Did the recovery process finish? We're done if so. */
    1758            4 :         if (dry_run || !server_is_in_recovery(conn))
    1759              :         {
    1760            4 :             ready = true;
    1761            4 :             recovery_ended = true;
    1762            4 :             break;
    1763              :         }
    1764              : 
    1765              :         /* Bail out after recovery_timeout seconds if this option is set */
    1766            0 :         if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
    1767              :         {
    1768            0 :             stop_standby_server(subscriber_dir);
    1769            0 :             pg_log_error("recovery timed out");
    1770            0 :             disconnect_database(conn, true);
    1771              :         }
    1772              : 
    1773              :         /* Keep waiting */
    1774            0 :         pg_usleep(WAIT_INTERVAL * USECS_PER_SEC);
    1775            0 :         timer += WAIT_INTERVAL;
    1776              :     }
    1777              : 
    1778            4 :     disconnect_database(conn, false);
    1779              : 
    1780            4 :     if (!ready)
    1781            0 :         pg_fatal("server did not end recovery");
    1782              : 
    1783            4 :     pg_log_info("target server reached the consistent state");
    1784            4 :     pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
    1785            4 : }
    1786              : 
    1787              : /*
    1788              :  * Create a publication that includes all tables in the database.
    1789              :  */
    1790              : static void
    1791            7 : create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
    1792              : {
    1793            7 :     PQExpBuffer str = createPQExpBuffer();
    1794              :     PGresult   *res;
    1795              :     char       *ipubname_esc;
    1796              :     char       *spubname_esc;
    1797              : 
    1798              :     Assert(conn != NULL);
    1799              : 
    1800            7 :     ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
    1801            7 :     spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
    1802              : 
    1803              :     /* Check if the publication already exists */
    1804            7 :     appendPQExpBuffer(str,
    1805              :                       "SELECT 1 FROM pg_catalog.pg_publication "
    1806              :                       "WHERE pubname = %s",
    1807              :                       spubname_esc);
    1808            7 :     res = PQexec(conn, str->data);
    1809            7 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1810              :     {
    1811            0 :         pg_log_error("could not obtain publication information: %s",
    1812              :                      PQresultErrorMessage(res));
    1813            0 :         disconnect_database(conn, true);
    1814              :     }
    1815              : 
    1816            7 :     if (PQntuples(res) == 1)
    1817              :     {
    1818              :         /*
    1819              :          * Unfortunately, if it reaches this code path, it will always fail
    1820              :          * (unless you decide to change the existing publication name). That's
    1821              :          * bad but it is very unlikely that the user will choose a name with
    1822              :          * pg_createsubscriber_ prefix followed by the exact database oid and
    1823              :          * a random number.
    1824              :          */
    1825            0 :         pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
    1826            0 :         pg_log_error_hint("Consider renaming this publication before continuing.");
    1827            0 :         disconnect_database(conn, true);
    1828              :     }
    1829              : 
    1830            7 :     PQclear(res);
    1831            7 :     resetPQExpBuffer(str);
    1832              : 
    1833            7 :     if (dry_run)
    1834            6 :         pg_log_info("dry-run: would create publication \"%s\" in database \"%s\"",
    1835              :                     dbinfo->pubname, dbinfo->dbname);
    1836              :     else
    1837            1 :         pg_log_info("creating publication \"%s\" in database \"%s\"",
    1838              :                     dbinfo->pubname, dbinfo->dbname);
    1839              : 
    1840            7 :     appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
    1841              :                       ipubname_esc);
    1842              : 
    1843            7 :     pg_log_debug("command is: %s", str->data);
    1844              : 
    1845            7 :     if (!dry_run)
    1846              :     {
    1847            1 :         res = PQexec(conn, str->data);
    1848            1 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1849              :         {
    1850            0 :             pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
    1851              :                          dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
    1852            0 :             disconnect_database(conn, true);
    1853              :         }
    1854            1 :         PQclear(res);
    1855              :     }
    1856              : 
    1857              :     /* For cleanup purposes */
    1858            7 :     dbinfo->made_publication = true;
    1859              : 
    1860            7 :     PQfreemem(ipubname_esc);
    1861            7 :     PQfreemem(spubname_esc);
    1862            7 :     destroyPQExpBuffer(str);
    1863            7 : }
    1864              : 
    1865              : /*
    1866              :  * Drop the specified publication in the given database.
    1867              :  */
    1868              : static void
    1869           10 : drop_publication(PGconn *conn, const char *pubname, const char *dbname,
    1870              :                  bool *made_publication)
    1871              : {
    1872           10 :     PQExpBuffer str = createPQExpBuffer();
    1873              :     PGresult   *res;
    1874              :     char       *pubname_esc;
    1875              : 
    1876              :     Assert(conn != NULL);
    1877              : 
    1878           10 :     pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
    1879              : 
    1880           10 :     if (dry_run)
    1881            6 :         pg_log_info("dry-run: would drop publication \"%s\" in database \"%s\"",
    1882              :                     pubname, dbname);
    1883              :     else
    1884            4 :         pg_log_info("dropping publication \"%s\" in database \"%s\"",
    1885              :                     pubname, dbname);
    1886              : 
    1887           10 :     appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
    1888              : 
    1889           10 :     PQfreemem(pubname_esc);
    1890              : 
    1891           10 :     pg_log_debug("command is: %s", str->data);
    1892              : 
    1893           10 :     if (!dry_run)
    1894              :     {
    1895            4 :         res = PQexec(conn, str->data);
    1896            4 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1897              :         {
    1898            0 :             pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
    1899              :                          pubname, dbname, PQresultErrorMessage(res));
    1900            0 :             *made_publication = false;  /* don't try again. */
    1901              : 
    1902              :             /*
    1903              :              * Don't disconnect and exit here. This routine is used by primary
    1904              :              * (cleanup publication / replication slot due to an error) and
    1905              :              * subscriber (remove the replicated publications). In both cases,
    1906              :              * it can continue and provide instructions for the user to remove
    1907              :              * it later if cleanup fails.
    1908              :              */
    1909              :         }
    1910            4 :         PQclear(res);
    1911              :     }
    1912              : 
    1913           10 :     destroyPQExpBuffer(str);
    1914           10 : }
    1915              : 
    1916              : /*
    1917              :  * Retrieve and drop the publications.
    1918              :  *
    1919              :  * Publications copied during physical replication remain on the subscriber
    1920              :  * after promotion. If --clean=publications is specified, drop all existing
    1921              :  * publications in the subscriber database. Otherwise, only drop publications
    1922              :  * that were created by pg_createsubscriber during this operation.
    1923              :  */
    1924              : static void
    1925            8 : check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
    1926              : {
    1927              :     PGresult   *res;
    1928            8 :     bool        drop_all_pubs = dbinfos.objecttypes_to_clean & OBJECTTYPE_PUBLICATIONS;
    1929              : 
    1930              :     Assert(conn != NULL);
    1931              : 
    1932            8 :     if (drop_all_pubs)
    1933              :     {
    1934            2 :         pg_log_info("dropping all existing publications in database \"%s\"",
    1935              :                     dbinfo->dbname);
    1936              : 
    1937              :         /* Fetch all publication names */
    1938            2 :         res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
    1939            2 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1940              :         {
    1941            0 :             pg_log_error("could not obtain publication information: %s",
    1942              :                          PQresultErrorMessage(res));
    1943            0 :             PQclear(res);
    1944            0 :             disconnect_database(conn, true);
    1945              :         }
    1946              : 
    1947              :         /* Drop each publication */
    1948            6 :         for (int i = 0; i < PQntuples(res); i++)
    1949            4 :             drop_publication(conn, PQgetvalue(res, i, 0), dbinfo->dbname,
    1950              :                              &dbinfo->made_publication);
    1951              : 
    1952            2 :         PQclear(res);
    1953              :     }
    1954              :     else
    1955              :     {
    1956              :         /* Drop publication only if it was created by this tool */
    1957            6 :         if (dbinfo->made_publication)
    1958              :         {
    1959            6 :             drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
    1960              :                              &dbinfo->made_publication);
    1961              :         }
    1962              :         else
    1963              :         {
    1964            0 :             if (dry_run)
    1965            0 :                 pg_log_info("dry-run: would preserve existing publication \"%s\" in database \"%s\"",
    1966              :                             dbinfo->pubname, dbinfo->dbname);
    1967              :             else
    1968            0 :                 pg_log_info("preserve existing publication \"%s\" in database \"%s\"",
    1969              :                             dbinfo->pubname, dbinfo->dbname);
    1970              :         }
    1971              :     }
    1972            8 : }
    1973              : 
    1974              : /*
    1975              :  * Create a subscription with some predefined options.
    1976              :  *
    1977              :  * A replication slot was already created in a previous step. Let's use it.  It
    1978              :  * is not required to copy data. The subscription will be created but it will
    1979              :  * not be enabled now. That's because the replication progress must be set and
    1980              :  * the replication origin name (one of the function arguments) contains the
    1981              :  * subscription OID in its name. Once the subscription is created,
    1982              :  * set_replication_progress() can obtain the chosen origin name and set up its
    1983              :  * initial location.
    1984              :  */
    1985              : static void
    1986            8 : create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
    1987              : {
    1988            8 :     PQExpBuffer str = createPQExpBuffer();
    1989              :     PGresult   *res;
    1990              :     char       *pubname_esc;
    1991              :     char       *subname_esc;
    1992              :     char       *pubconninfo_esc;
    1993              :     char       *replslotname_esc;
    1994              : 
    1995              :     Assert(conn != NULL);
    1996              : 
    1997            8 :     pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
    1998            8 :     subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
    1999            8 :     pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
    2000            8 :     replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
    2001              : 
    2002            8 :     if (dry_run)
    2003            6 :         pg_log_info("dry-run: would create subscription \"%s\" in database \"%s\"",
    2004              :                     dbinfo->subname, dbinfo->dbname);
    2005              :     else
    2006            2 :         pg_log_info("creating subscription \"%s\" in database \"%s\"",
    2007              :                     dbinfo->subname, dbinfo->dbname);
    2008              : 
    2009            8 :     appendPQExpBuffer(str,
    2010              :                       "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
    2011              :                       "WITH (create_slot = false, enabled = false, "
    2012              :                       "slot_name = %s, copy_data = false, two_phase = %s)",
    2013              :                       subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
    2014            8 :                       dbinfos.two_phase ? "true" : "false");
    2015              : 
    2016            8 :     PQfreemem(pubname_esc);
    2017            8 :     PQfreemem(subname_esc);
    2018            8 :     PQfreemem(pubconninfo_esc);
    2019            8 :     PQfreemem(replslotname_esc);
    2020              : 
    2021            8 :     pg_log_debug("command is: %s", str->data);
    2022              : 
    2023            8 :     if (!dry_run)
    2024              :     {
    2025            2 :         res = PQexec(conn, str->data);
    2026            2 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    2027              :         {
    2028            0 :             pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
    2029              :                          dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
    2030            0 :             disconnect_database(conn, true);
    2031              :         }
    2032            2 :         PQclear(res);
    2033              :     }
    2034              : 
    2035            8 :     destroyPQExpBuffer(str);
    2036            8 : }
    2037              : 
    2038              : /*
    2039              :  * Sets the replication progress to the consistent LSN.
    2040              :  *
    2041              :  * The subscriber caught up to the consistent LSN provided by the last
    2042              :  * replication slot that was created. The goal is to set up the initial
    2043              :  * location for the logical replication that is the exact LSN that the
    2044              :  * subscriber was promoted. Once the subscription is enabled it will start
    2045              :  * streaming from that location onwards.  In dry run mode, the subscription OID
    2046              :  * and LSN are set to invalid values for printing purposes.
    2047              :  */
    2048              : static void
    2049            8 : set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
    2050              : {
    2051            8 :     PQExpBuffer str = createPQExpBuffer();
    2052              :     PGresult   *res;
    2053              :     Oid         suboid;
    2054              :     char       *subname;
    2055              :     char       *dbname;
    2056              :     char       *originname;
    2057              :     char       *lsnstr;
    2058              : 
    2059              :     Assert(conn != NULL);
    2060              : 
    2061            8 :     subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
    2062            8 :     dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
    2063              : 
    2064            8 :     appendPQExpBuffer(str,
    2065              :                       "SELECT s.oid FROM pg_catalog.pg_subscription s "
    2066              :                       "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
    2067              :                       "WHERE s.subname = %s AND d.datname = %s",
    2068              :                       subname, dbname);
    2069              : 
    2070            8 :     res = PQexec(conn, str->data);
    2071            8 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    2072              :     {
    2073            0 :         pg_log_error("could not obtain subscription OID: %s",
    2074              :                      PQresultErrorMessage(res));
    2075            0 :         disconnect_database(conn, true);
    2076              :     }
    2077              : 
    2078            8 :     if (PQntuples(res) != 1 && !dry_run)
    2079              :     {
    2080            0 :         pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
    2081              :                      PQntuples(res), 1);
    2082            0 :         disconnect_database(conn, true);
    2083              :     }
    2084              : 
    2085            8 :     if (dry_run)
    2086              :     {
    2087            6 :         suboid = InvalidOid;
    2088            6 :         lsnstr = psprintf("%X/%08X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
    2089              :     }
    2090              :     else
    2091              :     {
    2092            2 :         suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
    2093            2 :         lsnstr = psprintf("%s", lsn);
    2094              :     }
    2095              : 
    2096            8 :     PQclear(res);
    2097              : 
    2098              :     /*
    2099              :      * The origin name is defined as pg_%u. %u is the subscription OID. See
    2100              :      * ApplyWorkerMain().
    2101              :      */
    2102            8 :     originname = psprintf("pg_%u", suboid);
    2103              : 
    2104            8 :     if (dry_run)
    2105            6 :         pg_log_info("dry-run: would set the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
    2106              :                     originname, lsnstr, dbinfo->dbname);
    2107              :     else
    2108            2 :         pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
    2109              :                     originname, lsnstr, dbinfo->dbname);
    2110              : 
    2111            8 :     resetPQExpBuffer(str);
    2112            8 :     appendPQExpBuffer(str,
    2113              :                       "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
    2114              :                       originname, lsnstr);
    2115              : 
    2116            8 :     pg_log_debug("command is: %s", str->data);
    2117              : 
    2118            8 :     if (!dry_run)
    2119              :     {
    2120            2 :         res = PQexec(conn, str->data);
    2121            2 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    2122              :         {
    2123            0 :             pg_log_error("could not set replication progress for subscription \"%s\": %s",
    2124              :                          dbinfo->subname, PQresultErrorMessage(res));
    2125            0 :             disconnect_database(conn, true);
    2126              :         }
    2127            2 :         PQclear(res);
    2128              :     }
    2129              : 
    2130            8 :     PQfreemem(subname);
    2131            8 :     PQfreemem(dbname);
    2132            8 :     pg_free(originname);
    2133            8 :     pg_free(lsnstr);
    2134            8 :     destroyPQExpBuffer(str);
    2135            8 : }
    2136              : 
    2137              : /*
    2138              :  * Enables the subscription.
    2139              :  *
    2140              :  * The subscription was created in a previous step but it was disabled. After
    2141              :  * adjusting the initial logical replication location, enable the subscription.
    2142              :  */
    2143              : static void
    2144            8 : enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
    2145              : {
    2146            8 :     PQExpBuffer str = createPQExpBuffer();
    2147              :     PGresult   *res;
    2148              :     char       *subname;
    2149              : 
    2150              :     Assert(conn != NULL);
    2151              : 
    2152            8 :     subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
    2153              : 
    2154            8 :     if (dry_run)
    2155            6 :         pg_log_info("dry-run: would enable subscription \"%s\" in database \"%s\"",
    2156              :                     dbinfo->subname, dbinfo->dbname);
    2157              :     else
    2158            2 :         pg_log_info("enabling subscription \"%s\" in database \"%s\"",
    2159              :                     dbinfo->subname, dbinfo->dbname);
    2160              : 
    2161            8 :     appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
    2162              : 
    2163            8 :     pg_log_debug("command is: %s", str->data);
    2164              : 
    2165            8 :     if (!dry_run)
    2166              :     {
    2167            2 :         res = PQexec(conn, str->data);
    2168            2 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    2169              :         {
    2170            0 :             pg_log_error("could not enable subscription \"%s\": %s",
    2171              :                          dbinfo->subname, PQresultErrorMessage(res));
    2172            0 :             disconnect_database(conn, true);
    2173              :         }
    2174              : 
    2175            2 :         PQclear(res);
    2176              :     }
    2177              : 
    2178            8 :     PQfreemem(subname);
    2179            8 :     destroyPQExpBuffer(str);
    2180            8 : }
    2181              : 
    2182              : /*
    2183              :  * Fetch the names of all connectable, non-template databases from the source
    2184              :  * server and append each one to opt->database_names (bumping num_dbs), as if
    2185              :  * the user had specified one --database option per source database.
    2186              :  */
    2187              : static void
    2188            1 : get_publisher_databases(struct CreateSubscriberOptions *opt,
    2189              :                         bool dbnamespecified)
    2190              : {
    2191              :     PGconn     *conn;
    2192              :     PGresult   *res;
    2193              : 
    2194              :     /* If a database name was specified, connect with the publisher conninfo as given. */
    2195            1 :     if (dbnamespecified)
    2196            0 :         conn = connect_database(opt->pub_conninfo_str, true);
    2197              :     else
    2198              :     {
    2199              :         /* Otherwise, try "postgres" first and fall back to "template1". */
    2200              :         char       *conninfo;
    2201              : 
    2202            1 :         conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "postgres");
    2203            1 :         conn = connect_database(conninfo, false);
    2204            1 :         pg_free(conninfo);
    2205            1 :         if (!conn)
    2206              :         {
    2207            0 :             conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "template1");
    2208            0 :             conn = connect_database(conninfo, true);
    2209            0 :             pg_free(conninfo);
    2210              :         }
    2211              :     }
    2212              : 
    2213            1 :     res = PQexec(conn, "SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1");
    2214            1 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    2215              :     {
    2216            0 :         pg_log_error("could not obtain a list of databases: %s", PQresultErrorMessage(res));
    2217            0 :         PQclear(res);
    2218            0 :         disconnect_database(conn, true);
    2219              :     }
    2220              : 
    2221            4 :     for (int i = 0; i < PQntuples(res); i++)
    2222              :     {
    2223            3 :         const char *dbname = PQgetvalue(res, i, 0);
    2224              : 
    2225            3 :         simple_string_list_append(&opt->database_names, dbname);
    2226              : 
    2227              :         /* Count it, just as an explicit --database option is counted */
    2228            3 :         num_dbs++;
    2229              :     }
    2230              : 
    2231            1 :     PQclear(res);
    2232            1 :     disconnect_database(conn, false);
    2233            1 : }
    2234              : 
    2235              : int
    2236           23 : main(int argc, char **argv)
    2237              : {
    2238              :     static struct option long_options[] =
    2239              :     {
    2240              :         {"all", no_argument, NULL, 'a'},
    2241              :         {"database", required_argument, NULL, 'd'},
    2242              :         {"pgdata", required_argument, NULL, 'D'},
    2243              :         {"logdir", required_argument, NULL, 'l'},
    2244              :         {"dry-run", no_argument, NULL, 'n'},
    2245              :         {"subscriber-port", required_argument, NULL, 'p'},
    2246              :         {"publisher-server", required_argument, NULL, 'P'},
    2247              :         {"socketdir", required_argument, NULL, 's'},
    2248              :         {"recovery-timeout", required_argument, NULL, 't'},
    2249              :         {"enable-two-phase", no_argument, NULL, 'T'},
    2250              :         {"subscriber-username", required_argument, NULL, 'U'},
    2251              :         {"verbose", no_argument, NULL, 'v'},
    2252              :         {"version", no_argument, NULL, 'V'},
    2253              :         {"help", no_argument, NULL, '?'},
    2254              :         {"config-file", required_argument, NULL, 1},
    2255              :         {"publication", required_argument, NULL, 2},
    2256              :         {"replication-slot", required_argument, NULL, 3},
    2257              :         {"subscription", required_argument, NULL, 4},
    2258              :         {"clean", required_argument, NULL, 5},
    2259              :         {NULL, 0, NULL, 0}
    2260              :     };
    2261              : 
    2262           23 :     struct CreateSubscriberOptions opt = {0};
    2263              : 
    2264              :     int         c;
    2265              :     int         option_index;
    2266              : 
    2267              :     char       *pub_base_conninfo;
    2268              :     char       *sub_base_conninfo;
    2269           23 :     char       *dbname_conninfo = NULL;
    2270              : 
    2271              :     uint64      pub_sysid;
    2272              :     uint64      sub_sysid;
    2273              :     struct stat statbuf;
    2274              : 
    2275              :     char       *consistent_lsn;
    2276              : 
    2277              :     char        pidfile[MAXPGPATH];
    2278              : 
    2279           23 :     pg_logging_init(argv[0]);
    2280           23 :     pg_logging_set_level(PG_LOG_WARNING);
    2281           23 :     progname = get_progname(argv[0]);
    2282           23 :     set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
    2283              : 
    2284           23 :     if (argc > 1)
    2285              :     {
    2286           22 :         if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
    2287              :         {
    2288            1 :             usage();
    2289            1 :             exit(0);
    2290              :         }
    2291           21 :         else if (strcmp(argv[1], "-V") == 0
    2292           21 :                  || strcmp(argv[1], "--version") == 0)
    2293              :         {
    2294            1 :             puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
    2295            1 :             exit(0);
    2296              :         }
    2297              :     }
    2298              : 
    2299              :     /* Default settings, applied before the command line is parsed */
    2300           21 :     subscriber_dir = NULL;
    2301           21 :     opt.config_file = NULL;
    2302           21 :     opt.log_dir = NULL;
    2303           21 :     opt.pub_conninfo_str = NULL;
    2304           21 :     opt.socket_dir = NULL;
    2305           21 :     opt.sub_port = DEFAULT_SUB_PORT;
    2306           21 :     opt.sub_username = NULL;
    2307           21 :     opt.two_phase = false;
    2308           21 :     opt.database_names = (SimpleStringList)
    2309              :     {
    2310              :         0
    2311              :     };
    2312           21 :     opt.recovery_timeout = 0;
    2313           21 :     opt.all_dbs = false;
    2314              : 
    2315              :     /*
    2316              :      * Don't allow this tool to be run as root. It uses pg_ctl, which does
    2317              :      * not allow that either.
    2318              :      */
    2319              : #ifndef WIN32
    2320           21 :     if (geteuid() == 0)
    2321              :     {
    2322            0 :         pg_log_error("cannot be executed by \"root\"");
    2323            0 :         pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
    2324              :                           progname);
    2325            0 :         exit(1);
    2326              :     }
    2327              : #endif
    2328              : 
    2329           21 :     get_restricted_token();
    2330              : 
    2331          163 :     while ((c = getopt_long(argc, argv, "ad:D:l:np:P:s:t:TU:v",
    2332          163 :                             long_options, &option_index)) != -1)
    2333              :     {
    2334          145 :         switch (c)
    2335              :         {
    2336            3 :             case 'a':
    2337            3 :                 opt.all_dbs = true;
    2338            3 :                 break;
    2339           25 :             case 'd':
    2340           25 :                 if (!simple_string_list_member(&opt.database_names, optarg))
    2341              :                 {
    2342           24 :                     simple_string_list_append(&opt.database_names, optarg);
    2343           24 :                     num_dbs++;
    2344              :                 }
    2345              :                 else
    2346            1 :                     pg_fatal("database \"%s\" specified more than once for -d/--database", optarg);
    2347           24 :                 break;
    2348           19 :             case 'D':
    2349           19 :                 subscriber_dir = pg_strdup(optarg);
    2350           19 :                 canonicalize_path(subscriber_dir);
    2351           19 :                 break;
    2352            1 :             case 'l':
    2353            1 :                 opt.log_dir = pg_strdup(optarg);
    2354            1 :                 canonicalize_path(opt.log_dir);
    2355            1 :                 break;
    2356            9 :             case 'n':
    2357            9 :                 dry_run = true;
    2358            9 :                 break;
    2359           12 :             case 'p':
    2360           12 :                 opt.sub_port = pg_strdup(optarg);
    2361           12 :                 break;
    2362           18 :             case 'P':
    2363           18 :                 opt.pub_conninfo_str = pg_strdup(optarg);
    2364           18 :                 break;
    2365           12 :             case 's':
    2366           12 :                 opt.socket_dir = pg_strdup(optarg);
    2367           12 :                 canonicalize_path(opt.socket_dir);
    2368           12 :                 break;
    2369            3 :             case 't':
    2370            3 :                 opt.recovery_timeout = atoi(optarg);	/* NOTE(review): atoi() reports no errors; non-numeric input is not rejected here */
    2371            3 :                 break;
    2372            1 :             case 'T':
    2373            1 :                 opt.two_phase = true;
    2374            1 :                 break;
    2375            0 :             case 'U':
    2376            0 :                 opt.sub_username = pg_strdup(optarg);
    2377            0 :                 break;
    2378           19 :             case 'v':
    2379           19 :                 pg_logging_increase_verbosity();
    2380           19 :                 break;
    2381            0 :             case 1:
    2382            0 :                 opt.config_file = pg_strdup(optarg);
    2383            0 :                 break;
    2384           12 :             case 2:
    2385           12 :                 if (!simple_string_list_member(&opt.pub_names, optarg))
    2386              :                 {
    2387           11 :                     simple_string_list_append(&opt.pub_names, optarg);
    2388           11 :                     num_pubs++;
    2389              :                 }
    2390              :                 else
    2391            1 :                     pg_fatal("publication \"%s\" specified more than once for --publication", optarg);
    2392           11 :                 break;
    2393            4 :             case 3:
    2394            4 :                 if (!simple_string_list_member(&opt.replslot_names, optarg))
    2395              :                 {
    2396            4 :                     simple_string_list_append(&opt.replslot_names, optarg);
    2397            4 :                     num_replslots++;
    2398              :                 }
    2399              :                 else
    2400            0 :                     pg_fatal("replication slot \"%s\" specified more than once for --replication-slot", optarg);
    2401            4 :                 break;
    2402            5 :             case 4:
    2403            5 :                 if (!simple_string_list_member(&opt.sub_names, optarg))
    2404              :                 {
    2405            5 :                     simple_string_list_append(&opt.sub_names, optarg);
    2406            5 :                     num_subs++;
    2407              :                 }
    2408              :                 else
    2409            0 :                     pg_fatal("subscription \"%s\" specified more than once for --subscription", optarg);
    2410            5 :                 break;
    2411            1 :             case 5:
    2412            1 :                 if (!simple_string_list_member(&opt.objecttypes_to_clean, optarg))
    2413            1 :                     simple_string_list_append(&opt.objecttypes_to_clean, optarg);
    2414              :                 else
    2415            0 :                     pg_fatal("object type \"%s\" specified more than once for --clean", optarg);
    2416            1 :                 break;
    2417            1 :             default:
    2418              :                 /* getopt_long already emitted a complaint */
    2419            1 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    2420            1 :                 exit(1);
    2421              :         }
    2422              :     }
    2423              : 
    2424              :     /* Validate that --all is not combined with any of the per-database name options */
    2425           18 :     if (opt.all_dbs)
    2426              :     {
    2427            3 :         char       *bad_switch = NULL;
    2428              : 
    2429            3 :         if (num_dbs > 0)
    2430            1 :             bad_switch = "--database";
    2431            2 :         else if (num_pubs > 0)
    2432            1 :             bad_switch = "--publication";
    2433            1 :         else if (num_replslots > 0)
    2434            0 :             bad_switch = "--replication-slot";
    2435            1 :         else if (num_subs > 0)
    2436            0 :             bad_switch = "--subscription";
    2437              : 
    2438            3 :         if (bad_switch)
    2439              :         {
    2440            2 :             pg_log_error("options %s and %s cannot be used together",
    2441              :                          bad_switch, "-a/--all");
    2442            2 :             pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    2443            2 :             exit(1);
    2444              :         }
    2445              :     }
    2446              : 
    2447              :     /* Any non-option arguments? */
    2448           16 :     if (optind < argc)
    2449              :     {
    2450            0 :         pg_log_error("too many command-line arguments (first is \"%s\")",
    2451              :                      argv[optind]);
    2452            0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    2453            0 :         exit(1);
    2454              :     }
    2455              : 
    2456              :     /* Required arguments */
    2457           16 :     if (subscriber_dir == NULL)
    2458              :     {
    2459            1 :         pg_log_error("no subscriber data directory specified");
    2460            1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    2461            1 :         exit(1);
    2462              :     }
    2463              : 
    2464              :     /* If no socket directory was provided, use the current directory */
    2465           15 :     if (opt.socket_dir == NULL)
    2466              :     {
    2467              :         char        cwd[MAXPGPATH];
    2468              : 
    2469            5 :         if (!getcwd(cwd, MAXPGPATH))
    2470            0 :             pg_fatal("could not determine current directory");
    2471            5 :         opt.socket_dir = pg_strdup(cwd);
    2472            5 :         canonicalize_path(opt.socket_dir);
    2473              :     }
    2474              : 
    2475              :     /*
    2476              :      * The publisher connection string is mandatory. It is parsed further
    2477              :      * below into a base connection string that is reused for every database.
    2478              :      */
    2479           15 :     if (opt.pub_conninfo_str == NULL)
    2480              :     {
    2481              :         /*
    2482              :          * TODO use primary_conninfo (if available) from subscriber and
    2483              :          * extract publisher connection string. Assume that there are
    2484              :          * identical entries for physical and logical replication. If there is
    2485              :          * not, we would fail anyway.
    2486              :          */
    2487            1 :         pg_log_error("no publisher connection string specified");
    2488            1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    2489            1 :         exit(1);
    2490              :     }
    2491              : 
    2492           14 :     if (opt.log_dir != NULL)
    2493              :     {
    2494              :         char       *internal_log_file;
    2495              :         FILE       *internal_log_file_fp;
    2496              : 
    2497            1 :         umask(PG_MODE_MASK_OWNER);
    2498              : 
    2499              :         /*
    2500              :          * Set mask based on PGDATA permissions, needed for the creation of
    2501              :          * the output directories with correct permissions, similar to
    2502              :          * pg_ctl and pg_upgrade.
    2503              :          *
    2504              :          * Don't error here if the data directory cannot be stat'd. Upcoming
    2505              :          * checks for the data directory would raise the fatal error later.
    2506              :          */
    2507            1 :         if (GetDataDirectoryCreatePerm(subscriber_dir))
    2508            1 :             umask(pg_mode_mask);
    2509              : 
    2510            1 :         make_output_dirs(opt.log_dir);
    2511            1 :         internal_log_file = psprintf("%s/%s", logdir, INTERNAL_LOG_FILE_NAME);	/* NOTE(review): uses logdir, not opt.log_dir -- presumably set by make_output_dirs(); confirm */
    2512              : 
    2513            1 :         internal_log_file_fp = fopen(internal_log_file, "a");
    2514            1 :         if (!internal_log_file_fp)
    2515            0 :             pg_fatal("could not open log file \"%s\": %m", internal_log_file);
    2516              : 
    2517            1 :         pg_free(internal_log_file);
    2518              : 
    2519            1 :         pg_logging_set_logfile(internal_log_file_fp);
    2520              :     }
    2521              : 
    2522           14 :     if (dry_run)
    2523            8 :         pg_log_info("Executing in dry-run mode.\n"
    2524              :                     "The target directory will not be modified.");
    2525              : 
    2526           14 :     pg_log_info("validating publisher connection string");
    2527           14 :     pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
    2528              :                                           &dbname_conninfo);
    2529           14 :     if (pub_base_conninfo == NULL)
    2530            0 :         exit(1);
    2531              : 
    2532           14 :     pg_log_info("validating subscriber connection string");
    2533           14 :     sub_base_conninfo = get_sub_conninfo(&opt);
    2534              : 
    2535              :     /*
    2536              :      * Fetch all databases from the source (publisher) and treat them as if
    2537              :      * the user had specified multiple --database options, one for each
    2538              :      * source database.
    2539              :      */
    2540           14 :     if (opt.all_dbs)
    2541              :     {
    2542            1 :         bool        dbnamespecified = (dbname_conninfo != NULL);
    2543              : 
    2544            1 :         get_publisher_databases(&opt, dbnamespecified);
    2545              :     }
    2546              : 
    2547           14 :     if (opt.database_names.head == NULL)
    2548              :     {
    2549            2 :         pg_log_info("no database was specified");
    2550              : 
    2551              :         /*
    2552              :          * Try to obtain the dbname from the publisher conninfo. If the dbname
    2553              :          * parameter is not available, error out.
    2554              :          */
    2555            2 :         if (dbname_conninfo)
    2556              :         {
    2557            1 :             simple_string_list_append(&opt.database_names, dbname_conninfo);
    2558            1 :             num_dbs++;
    2559              : 
    2560            1 :             pg_log_info("database name \"%s\" was extracted from the publisher connection string",
    2561              :                         dbname_conninfo);
    2562              :         }
    2563              :         else
    2564              :         {
    2565            1 :             pg_log_error("no database name specified");
    2566            1 :             pg_log_error_hint("Try \"%s --help\" for more information.",
    2567              :                               progname);
    2568            1 :             exit(1);
    2569              :         }
    2570              :     }
    2571              : 
    2572              :     /* If given at all, the number of object names must match the number of databases */
    2573           13 :     if (num_pubs > 0 && num_pubs != num_dbs)
    2574              :     {
    2575            1 :         pg_log_error("wrong number of publication names specified");
    2576            1 :         pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
    2577              :                             num_pubs, num_dbs);
    2578            1 :         exit(1);
    2579              :     }
    2580           12 :     if (num_subs > 0 && num_subs != num_dbs)
    2581              :     {
    2582            1 :         pg_log_error("wrong number of subscription names specified");
    2583            1 :         pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
    2584              :                             num_subs, num_dbs);
    2585            1 :         exit(1);
    2586              :     }
    2587           11 :     if (num_replslots > 0 && num_replslots != num_dbs)
    2588              :     {
    2589            1 :         pg_log_error("wrong number of replication slot names specified");
    2590            1 :         pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
    2591              :                             num_replslots, num_dbs);
    2592            1 :         exit(1);
    2593              :     }
    2594              : 
    2595              :     /* Verify the object types specified for removal from the subscriber */
    2596           11 :     for (SimpleStringListCell *cell = opt.objecttypes_to_clean.head; cell; cell = cell->next)
    2597              :     {
    2598            1 :         if (pg_strcasecmp(cell->val, "publications") == 0)
    2599            1 :             dbinfos.objecttypes_to_clean |= OBJECTTYPE_PUBLICATIONS;
    2600              :         else
    2601              :         {
    2602            0 :             pg_log_error("invalid object type \"%s\" specified for %s",
    2603              :                          cell->val, "--clean");
    2604            0 :             pg_log_error_hint("The valid value is: \"%s\"", "publications");
    2605            0 :             exit(1);
    2606              :         }
    2607              :     }
    2608              : 
    2609              :     /* Get the absolute paths of pg_ctl and pg_resetwal on the subscriber */
    2610           10 :     pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
    2611           10 :     pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
    2612              : 
    2613              :     /* Rudimentary check for a data directory */
    2614           10 :     check_data_directory(subscriber_dir);
    2615              : 
    2616           10 :     dbinfos.two_phase = opt.two_phase;
    2617              : 
    2618              :     /*
    2619              :      * Store database information for publisher and subscriber. This must be
    2620              :      * done before atexit() because the result is used by
    2621              :      * cleanup_objects_atexit().
    2622              :      */
    2623           10 :     dbinfos.dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
    2624              : 
    2625              :     /* Register a function to clean up objects in case of failure */
    2626           10 :     atexit(cleanup_objects_atexit);
    2627              : 
    2628              :     /*
    2629              :      * Check if the subscriber data directory has the same system identifier
    2630              :      * as the publisher data directory.
    2631              :      */
    2632           10 :     pub_sysid = get_primary_sysid(dbinfos.dbinfo[0].pubconninfo);
    2633           10 :     sub_sysid = get_standby_sysid(subscriber_dir);
    2634           10 :     if (pub_sysid != sub_sysid)
    2635            1 :         pg_fatal("subscriber data directory is not a copy of the source database cluster");
    2636              : 
    2637              :     /* Subscriber PID file */
    2638            9 :     snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
    2639              : 
    2640              :     /*
    2641              :      * The standby server must not be running. If the server is started under
    2642              :      * a service manager and pg_createsubscriber stops it, the service manager
    2643              :      * might react to this action and start the server again. Therefore,
    2644              :      * refuse to proceed if the server is running to avoid possible failures.
    2645              :      */
    2646            9 :     if (stat(pidfile, &statbuf) == 0)
    2647              :     {
    2648            1 :         pg_log_error("standby server is running");
    2649            1 :         pg_log_error_hint("Stop the standby server and try again.");
    2650            1 :         exit(1);
    2651              :     }
    2652              : 
    2653              :     /*
    2654              :      * Start a short-lived standby server with temporary parameters (provided
    2655              :      * by command-line options). The goal is to avoid connections during the
    2656              :      * transformation steps.
    2657              :      */
    2658            8 :     pg_log_info("starting the standby server with command-line options");
    2659            8 :     start_standby_server(&opt, true, false);
    2660              : 
    2661              :     /* Check if the standby server is ready for logical replication */
    2662            8 :     check_subscriber(dbinfos.dbinfo);
    2663              : 
    2664              :     /* Check if the primary server is ready for logical replication */
    2665            6 :     check_publisher(dbinfos.dbinfo);
    2666              : 
    2667              :     /*
    2668              :      * Stop the target server. The recovery process requires that the server
    2669              :      * reaches a consistent state before targeting the recovery stop point.
    2670              :      * Make sure a consistent state is reached (stopping the target server
    2671              :      * guarantees it) *before* creating the replication slots in
    2672              :      * setup_publisher().
    2673              :      */
    2674            4 :     pg_log_info("stopping the subscriber");
    2675            4 :     stop_standby_server(subscriber_dir);
    2676              : 
    2677              :     /* Create the required objects for each database on publisher */
    2678            4 :     consistent_lsn = setup_publisher(dbinfos.dbinfo);
    2679              : 
    2680              :     /* Write the required recovery parameters */
    2681            4 :     setup_recovery(dbinfos.dbinfo, subscriber_dir, consistent_lsn);
    2682              : 
    2683              :     /*
    2684              :      * Start subscriber so the recovery parameters will take effect. Wait
    2685              :      * until accepting connections. We don't want to start logical replication
    2686              :      * during setup.
    2687              :      */
    2688            4 :     pg_log_info("starting the subscriber");
    2689            4 :     start_standby_server(&opt, true, true);
    2690              : 
    2691              :     /* Wait for the subscriber to be promoted */
    2692            4 :     wait_for_end_recovery(dbinfos.dbinfo[0].subconninfo, &opt);
    2693              : 
    2694              :     /*
    2695              :      * Create the subscription for each database on subscriber. It does not
    2696              :      * enable it immediately because it needs to adjust the replication start
    2697              :      * point to the LSN reported by setup_publisher().  It also cleans up
    2698              :      * publications created by this tool and replication to the standby.
    2699              :      */
    2700            4 :     setup_subscriber(dbinfos.dbinfo, consistent_lsn);
    2701              : 
    2702              :     /* Remove primary_slot_name if it exists on primary */
    2703            4 :     drop_primary_replication_slot(dbinfos.dbinfo, primary_slot_name);
    2704              : 
    2705              :     /* Remove failover replication slots if they exist on subscriber */
    2706            4 :     drop_failover_replication_slots(dbinfos.dbinfo);
    2707              : 
    2708              :     /* Stop the subscriber */
    2709            4 :     pg_log_info("stopping the subscriber");
    2710            4 :     stop_standby_server(subscriber_dir);
    2711              : 
    2712              :     /* Change the system identifier of the subscriber */
    2713            4 :     modify_subscriber_sysid(&opt);
    2714              : 
    2715            4 :     success = true;
    2716              : 
    2717            4 :     pg_log_info("Done!");
    2718              : 
    2719            4 :     return 0;
    2720              : }
        

Generated by: LCOV version 2.0-1