LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - pg_createsubscriber.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 83.4 % 984 821
Test Date: 2026-03-24 01:16:09 Functions: 100.0 % 42 42
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * pg_createsubscriber.c
       4              :  *    Create a new logical replica from a standby server
       5              :  *
       6              :  * Copyright (c) 2024-2026, PostgreSQL Global Development Group
       7              :  *
       8              :  * IDENTIFICATION
       9              :  *    src/bin/pg_basebackup/pg_createsubscriber.c
      10              :  *
      11              :  *-------------------------------------------------------------------------
      12              :  */
      13              : 
      14              : #include "postgres_fe.h"
      15              : 
      16              : #include <sys/stat.h>
      17              : #include <sys/time.h>
      18              : #include <sys/wait.h>
      19              : #include <time.h>
      20              : 
      21              : #include "common/connect.h"
      22              : #include "common/controldata_utils.h"
      23              : #include "common/file_utils.h"
      24              : #include "common/logging.h"
      25              : #include "common/pg_prng.h"
      26              : #include "common/restricted_token.h"
      27              : #include "datatype/timestamp.h"
      28              : #include "fe_utils/recovery_gen.h"
      29              : #include "fe_utils/simple_list.h"
      30              : #include "fe_utils/string_utils.h"
      31              : #include "fe_utils/version.h"
      32              : #include "getopt_long.h"
      33              : 
      34              : #define DEFAULT_SUB_PORT    "50432" /* default subscriber port, printed by usage() */
      35              : #define OBJECTTYPE_PUBLICATIONS  0x0001 /* object-type bit; presumably used in objecttypes_to_clean -- confirm */
      36              : 
      37              : /*
      38              :  * Configuration files for recovery parameters.
      39              :  *
      40              :  * The recovery parameters are set in INCLUDED_CONF_FILE, itself loaded by
      41              :  * the server through an include_if_exists in postgresql.auto.conf.
      42              :  *
      43              :  * INCLUDED_CONF_FILE is renamed to INCLUDED_CONF_FILE_DISABLED when exiting,
      44              :  * so that the recovery parameters set by this tool never take effect on node
      45              :  * restart.  The contents of INCLUDED_CONF_FILE_DISABLED can be useful for
      46              :  * debugging.
      47              :  */
      48              : #define PG_AUTOCONF_FILENAME        "postgresql.auto.conf"
      49              : #define INCLUDED_CONF_FILE          "pg_createsubscriber.conf"
      50              : #define INCLUDED_CONF_FILE_DISABLED INCLUDED_CONF_FILE ".disabled"  /* literal concatenation: pg_createsubscriber.conf.disabled */
      51              : 
      52              : /* Command-line options, gathered into a single structure */
      53              : struct CreateSubscriberOptions
      54              : {
      55              :     char       *config_file;    /* configuration file (see --config-file in usage()) */
      56              :     char       *pub_conninfo_str;   /* publisher connection string */
      57              :     char       *socket_dir;     /* directory for Unix-domain socket, if any; passed as host by get_sub_conninfo() */
      58              :     char       *sub_port;       /* subscriber port number, kept as a string */
      59              :     const char *sub_username;   /* subscriber username; may be NULL, then omitted by get_sub_conninfo() */
      60              :     bool        two_phase;      /* enable-two-phase option */
      61              :     SimpleStringList database_names;    /* list of database names */
      62              :     SimpleStringList pub_names; /* list of publication names */
      63              :     SimpleStringList sub_names; /* list of subscription names */
      64              :     SimpleStringList replslot_names;    /* list of replication slot names */
      65              :     int         recovery_timeout;   /* stop recovery after this time */
      66              :     bool        all_dbs;        /* all option (see --all in usage()) */
      67              :     SimpleStringList objecttypes_to_clean;  /* list of object types to clean up */
      68              : };
      69              : 
      70              : /* per-database publication/subscription info; one entry per database */
      71              : struct LogicalRepInfo
      72              : {
      73              :     char       *dbname;         /* database name */
      74              :     char       *pubconninfo;    /* publisher connection string */
      75              :     char       *subconninfo;    /* subscriber connection string */
      76              :     char       *pubname;        /* publication name */
      77              :     char       *subname;        /* subscription name */
      78              :     char       *replslotname;   /* replication slot name */
      79              :     /* Flags consulted by cleanup_objects_atexit() to decide what to drop. */
      80              :     bool        made_replslot;  /* replication slot was created */
      81              :     bool        made_publication;   /* publication was created */
      82              : };
      83              : 
      84              : /*
      85              :  * Information shared across all the databases (or publications and
      86              :  * subscriptions).
      87              :  */
      88              : struct LogicalRepInfos
      89              : {
      90              :     struct LogicalRepInfo *dbinfo;  /* per-database entries; cleanup_objects_atexit() walks indexes 0 .. num_dbs - 1 */
      91              :     bool        two_phase;      /* enable-two-phase option */
      92              :     bits32      objecttypes_to_clean;   /* flags indicating which object types
      93              :                                          * to clean up on subscriber */
      94              : };
      95              : /* Forward declarations of the static functions of this file. */
      96              : static void cleanup_objects_atexit(void);
      97              : static void usage(void);
      98              : static char *get_base_conninfo(const char *conninfo, char **dbname);
      99              : static char *get_sub_conninfo(const struct CreateSubscriberOptions *opt);
     100              : static char *get_exec_path(const char *argv0, const char *progname);
     101              : static void check_data_directory(const char *datadir);
     102              : static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
     103              : static struct LogicalRepInfo *store_pub_sub_info(const struct CreateSubscriberOptions *opt,
     104              :                                                  const char *pub_base_conninfo,
     105              :                                                  const char *sub_base_conninfo);
     106              : static PGconn *connect_database(const char *conninfo, bool exit_on_error);
     107              : static void disconnect_database(PGconn *conn, bool exit_on_error);
     108              : static uint64 get_primary_sysid(const char *conninfo);
     109              : static uint64 get_standby_sysid(const char *datadir);
     110              : static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
     111              : static bool server_is_in_recovery(PGconn *conn);
     112              : static char *generate_object_name(PGconn *conn);
     113              : static void check_publisher(const struct LogicalRepInfo *dbinfo);
     114              : static char *setup_publisher(struct LogicalRepInfo *dbinfo);
     115              : static void check_subscriber(const struct LogicalRepInfo *dbinfo);
     116              : static void setup_subscriber(struct LogicalRepInfo *dbinfo,
     117              :                              const char *consistent_lsn);
     118              : static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
     119              :                            const char *lsn);
     120              : static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
     121              :                                           const char *slotname);
     122              : static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo);
     123              : static char *create_logical_replication_slot(PGconn *conn,
     124              :                                              struct LogicalRepInfo *dbinfo);
     125              : static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
     126              :                                   const char *slot_name);
     127              : static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
     128              : static void start_standby_server(const struct CreateSubscriberOptions *opt,
     129              :                                  bool restricted_access,
     130              :                                  bool restrict_logical_worker);
     131              : static void stop_standby_server(const char *datadir);
     132              : static void wait_for_end_recovery(const char *conninfo,
     133              :                                   const struct CreateSubscriberOptions *opt);
     134              : static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
     135              : static bool find_publication(PGconn *conn, const char *pubname, const char *dbname);
     136              : static void drop_publication(PGconn *conn, const char *pubname,
     137              :                              const char *dbname, bool *made_publication);
     138              : static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
     139              : static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
     140              : static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
     141              :                                      const char *lsn);
     142              : static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
     143              : static void check_and_drop_existing_subscriptions(PGconn *conn,
     144              :                                                   const struct LogicalRepInfo *dbinfo);
     145              : static void drop_existing_subscription(PGconn *conn, const char *subname,
     146              :                                        const char *dbname);
     147              : static void get_publisher_databases(struct CreateSubscriberOptions *opt,
     148              :                                     bool dbnamespecified);
     149              : static void report_createsub_log(enum pg_log_level, enum pg_log_part,
     150              :                                  const char *pg_restrict fmt,...)
     151              :             pg_attribute_printf(3, 4);  /* fmt is argument 3, variadic arguments start at 4 */
     152              : pg_noreturn static void report_createsub_fatal(const char *pg_restrict fmt,...)
     153              :             pg_attribute_printf(1, 2);  /* fmt is argument 1, variadic arguments start at 2 */
     154              : 
     155              : #define WAIT_INTERVAL   1       /* 1 second */
     156              : 
     157              : static const char *progname;    /* program name, printed by usage() */
     158              : 
     159              : static char *primary_slot_name = NULL;
     160              : static bool dry_run = false;
     161              : 
     162              : static bool success = false;    /* if true, cleanup_objects_atexit() skips the failure cleanup */
     163              : 
     164              : static struct LogicalRepInfos dbinfos;
     165              : static int  num_dbs = 0;        /* number of specified databases */
     166              : static int  num_pubs = 0;       /* number of specified publications */
     167              : static int  num_subs = 0;       /* number of specified subscriptions */
     168              : static int  num_replslots = 0;  /* number of specified replication slots */
     169              : 
     170              : static pg_prng_state prng_state;
     171              : 
     172              : static char *pg_ctl_path = NULL;
     173              : static char *pg_resetwal_path = NULL;
     174              : 
     175              : /* standby / subscriber data directory */
     176              : static char *subscriber_dir = NULL;
     177              : 
     178              : static bool recovery_ended = false; /* if true, cleanup warns that the replica must be recreated */
     179              : static bool standby_running = false;    /* if true, cleanup stops the standby server */
     180              : static bool recovery_params_set = false;    /* if true, cleanup renames INCLUDED_CONF_FILE */
     181              : 
     182              : /*
     183              :  * Report a printf-style message with a given log level and log part, by forwarding to pg_log_generic_v()
     184              :  */
     185              : static void
     186          482 : report_createsub_log(enum pg_log_level level, enum pg_log_part part,
     187              :                      const char *pg_restrict fmt,...)
     188              : {
     189              :     va_list     args;
     190              : 
     191          482 :     va_start(args, fmt);
     192              : 
     193          482 :     pg_log_generic_v(level, part, fmt, args);   /* level and part are passed through unchanged */
     194              : 
     195          482 :     va_end(args);
     196          482 : }
     197              : 
     198              : /*
     199              :  * Report a fatal error (logged as PG_LOG_ERROR / PG_LOG_PRIMARY) and exit with status 1
     200              :  */
     201              : static void
     202            3 : report_createsub_fatal(const char *pg_restrict fmt,...)
     203              : {
     204              :     va_list     args;
     205              : 
     206            3 :     va_start(args, fmt);
     207              : 
     208            3 :     pg_log_generic_v(PG_LOG_ERROR, PG_LOG_PRIMARY, fmt, args);
     209              : 
     210            3 :     va_end(args);
     211              : 
     212            3 :     exit(1);    /* does not return, matching the pg_noreturn declaration */
     213              : }
     214              : 
     215              : /*
     216              :  * Clean up objects created by pg_createsubscriber.
     217              :  *
     218              :  * Publications and replication slots are created on the primary.  Depending
     219              :  * on the step where it failed, already-created objects should be removed if
     220              :  * possible (sometimes this won't work due to a connection issue).
     221              :  * There is no cleanup on the target server *after* its promotion, because any
     222              :  * failure at this point means recreating the physical replica and starting
     223              :  * again.
     224              :  *
     225              :  * The recovery configuration is always removed (on success as well as on
     226              :  * failure), by renaming the included configuration file out of the way.
     227              :  */
     228              : static void
     229           10 : cleanup_objects_atexit(void)
     230              : {
     231              :     /* Rename the included configuration file, if necessary. */
     232           10 :     if (recovery_params_set)
     233              :     {
     234              :         char        conf_filename[MAXPGPATH];
     235              :         char        conf_filename_disabled[MAXPGPATH];
     236              : 
     237            1 :         snprintf(conf_filename, MAXPGPATH, "%s/%s", subscriber_dir,
     238              :                  INCLUDED_CONF_FILE);
     239            1 :         snprintf(conf_filename_disabled, MAXPGPATH, "%s/%s", subscriber_dir,
     240              :                  INCLUDED_CONF_FILE_DISABLED);
     241              : 
     242            1 :         if (durable_rename(conf_filename, conf_filename_disabled) != 0)
     243              :         {
     244              :             /* durable_rename() has already logged something. */
     245            0 :             report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
     246              :                                  "A manual removal of the recovery parameters may be required.");
     247              :         }
     248              :     }
     249              : 
     250           10 :     if (success)
     251            4 :         return;                 /* everything below is failure cleanup only */
     252              : 
     253              :     /*
     254              :      * If the server is promoted, there is no way to use the current setup
     255              :      * again. Warn the user that a new replication setup should be done before
     256              :      * trying again.
     257              :      */
     258            6 :     if (recovery_ended)
     259              :     {
     260            0 :         report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
     261              :                              "failed after the end of recovery");
     262            0 :         report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
     263              :                              "The target server cannot be used as a physical replica anymore.  "
     264              :                              "You must recreate the physical replica before continuing.");
     265              :     }
     266              : 
     267           18 :     for (int i = 0; i < num_dbs; i++)
     268              :     {
     269           12 :         struct LogicalRepInfo *dbinfo = &dbinfos.dbinfo[i];
     270              : 
     271           12 :         if (dbinfo->made_publication || dbinfo->made_replslot)    /* connect only if this run created something */
     272              :         {
     273              :             PGconn     *conn;
     274              : 
     275            0 :             conn = connect_database(dbinfo->pubconninfo, false);  /* false: do not exit on connection error */
     276            0 :             if (conn != NULL)
     277              :             {
     278            0 :                 if (dbinfo->made_publication)
     279            0 :                     drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
     280              :                                      &dbinfo->made_publication);
     281            0 :                 if (dbinfo->made_replslot)
     282            0 :                     drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
     283            0 :                 disconnect_database(conn, false);
     284              :             }
     285              :             else
     286              :             {
     287              :                 /*
     288              :                  * If a connection could not be established, inform the user
     289              :                  * that some objects were left on primary and should be
     290              :                  * removed before trying again.
     291              :                  */
     292            0 :                 if (dbinfo->made_publication)
     293              :                 {
     294            0 :                     report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
     295              :                                          "publication \"%s\" created in database \"%s\" on primary was left behind",
     296              :                                          dbinfo->pubname,
     297              :                                          dbinfo->dbname);
     298            0 :                     report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
     299              :                                          "Drop this publication before trying again.");
     300              :                 }
     301            0 :                 if (dbinfo->made_replslot)
     302              :                 {
     303            0 :                     report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
     304              :                                          "replication slot \"%s\" created in database \"%s\" on primary was left behind",
     305              :                                          dbinfo->replslotname,
     306              :                                          dbinfo->dbname);
     307            0 :                     report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
     308              :                                          "Drop this replication slot soon to avoid retention of WAL files.");
     309              :                 }
     310              :             }
     311              :         }
     312              :     }
     313              : 
     314            6 :     if (standby_running)        /* last step: stop the standby if it is flagged as running */
     315            4 :         stop_standby_server(subscriber_dir);
     316              : }
     317              : /* Print the command-line help text to stdout. */
     318              : static void
     319            1 : usage(void)
     320              : {
     321            1 :     printf(_("%s creates a new logical replica from a standby server.\n\n"),
     322              :            progname);
     323            1 :     printf(_("Usage:\n"));
     324            1 :     printf(_("  %s [OPTION]...\n"), progname);
     325            1 :     printf(_("\nOptions:\n"));
     326            1 :     printf(_("  -a, --all                       create subscriptions for all databases except template\n"
     327              :              "                                  databases and databases that don't allow connections\n"));
     328            1 :     printf(_("  -d, --database=DBNAME           database in which to create a subscription\n"));
     329            1 :     printf(_("  -D, --pgdata=DATADIR            location for the subscriber data directory\n"));
     330            1 :     printf(_("  -n, --dry-run                   dry run, just show what would be done\n"));
     331            1 :     printf(_("  -p, --subscriber-port=PORT      subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
     332            1 :     printf(_("  -P, --publisher-server=CONNSTR  publisher connection string\n"));
     333            1 :     printf(_("  -s, --socketdir=DIR             socket directory to use (default current dir.)\n"));
     334            1 :     printf(_("  -t, --recovery-timeout=SECS     seconds to wait for recovery to end\n"));
     335            1 :     printf(_("  -T, --enable-two-phase          enable two-phase commit for all subscriptions\n"));
     336            1 :     printf(_("  -U, --subscriber-username=NAME  user name for subscriber connection\n"));
     337            1 :     printf(_("  -v, --verbose                   output verbose messages\n"));
     338            1 :     printf(_("      --clean=OBJECTTYPE          drop all objects of the specified type from specified\n"
     339              :              "                                  databases on the subscriber; accepts: \"%s\"\n"), "publications");
     340            1 :     printf(_("      --config-file=FILENAME      use specified main server configuration\n"
     341              :              "                                  file when running target cluster\n"));
     342            1 :     printf(_("      --publication=NAME          publication name\n"));
     343            1 :     printf(_("      --replication-slot=NAME     replication slot name\n"));
     344            1 :     printf(_("      --subscription=NAME         subscription name\n"));
     345            1 :     printf(_("  -V, --version                   output version information, then exit\n"));
     346            1 :     printf(_("  -?, --help                      show this help, then exit\n"));
     347            1 :     printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
     348            1 :     printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
     349            1 : }
     350              : 
     351              : /*
     352              :  * Subroutine to append "keyword=value" to a connection string,
     353              :  * with proper quoting of the value.  (We assume keywords don't need that.)
     354              :  */
     355              : static void
     356          107 : appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
     357              : {
     358          107 :     if (buf->len > 0)           /* not the first item: separate it from the previous one */
     359           79 :         appendPQExpBufferChar(buf, ' ');
     360          107 :     appendPQExpBufferStr(buf, keyword);
     361          107 :     appendPQExpBufferChar(buf, '=');
     362          107 :     appendConnStrVal(buf, val); /* the value quoting is delegated to appendConnStrVal() */
     363          107 : }
     364              : 
     365              : /*
     366              :  * Validate a connection string. Returns a base connection string that is a
     367              :  * connection string without a database name.
     368              :  *
     369              :  * Since we might process multiple databases, each database name will be
     370              :  * appended to this base connection string to provide a final connection
     371              :  * string. If the second argument (dbname) is not NULL and the provided
     372              :  * connection string has a non-empty dbname, a copy of it is stored in *dbname.
     373              :  *
     374              :  * It is the caller's responsibility to free the returned connection string and
     375              :  * dbname.  Returns NULL, after logging an error, if the string cannot be parsed.
     376              :  */
     377              : static char *
     378           14 : get_base_conninfo(const char *conninfo, char **dbname)
     379              : {
     380              :     PQExpBuffer buf;
     381              :     PQconninfoOption *conn_opts;
     382              :     PQconninfoOption *conn_opt;
     383           14 :     char       *errmsg = NULL;
     384              :     char       *ret;
     385              : 
     386           14 :     conn_opts = PQconninfoParse(conninfo, &errmsg);
     387           14 :     if (conn_opts == NULL)
     388              :     {
     389            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
     390              :                              "could not parse connection string: %s", errmsg);
     391            0 :         PQfreemem(errmsg);
     392            0 :         return NULL;
     393              :     }
     394              : 
     395           14 :     buf = createPQExpBuffer();
     396          728 :     for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++) /* the option array ends at a NULL keyword */
     397              :     {
     398          714 :         if (conn_opt->val != NULL && conn_opt->val[0] != '\0')    /* skip options that are unset or empty */
     399              :         {
     400           33 :             if (strcmp(conn_opt->keyword, "dbname") == 0)
     401              :             {
     402            9 :                 if (dbname)
     403            9 :                     *dbname = pg_strdup(conn_opt->val);
     404            9 :                 continue;       /* dbname is never copied into the base string */
     405              :             }
     406           24 :             appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
     407              :         }
     408              :     }
     409              : 
     410           14 :     ret = pg_strdup(buf->data); /* copy out, so that the buffer can be destroyed */
     411              : 
     412           14 :     destroyPQExpBuffer(buf);
     413           14 :     PQconninfoFree(conn_opts);
     414              : 
     415           14 :     return ret;
     416              : }
     417              : 
     418              : /*
     419              :  * Build a subscriber connection string. Only a few parameters are supported
     420              :  * since it starts a server with restricted access.  Returns a pg_strdup'd string.
     421              :  */
     422              : static char *
     423           14 : get_sub_conninfo(const struct CreateSubscriberOptions *opt)
     424              : {
     425           14 :     PQExpBuffer buf = createPQExpBuffer();
     426              :     char       *ret;
     427              : 
     428           14 :     appendConnStrItem(buf, "port", opt->sub_port);
     429              : #if !defined(WIN32)
     430           14 :     appendConnStrItem(buf, "host", opt->socket_dir);  /* the socket directory is used as host, except on WIN32 builds */
     431              : #endif
     432           14 :     if (opt->sub_username != NULL)  /* user is only added when a username is set */
     433            0 :         appendConnStrItem(buf, "user", opt->sub_username);
     434           14 :     appendConnStrItem(buf, "fallback_application_name", progname);
     435              : 
     436           14 :     ret = pg_strdup(buf->data);
     437              : 
     438           14 :     destroyPQExpBuffer(buf);
     439              : 
     440           14 :     return ret;
     441              : }
     442              : 
     443              : /*
     444              :  * Verify if a PostgreSQL binary (progname) is available in the same directory as
     445              :  * pg_createsubscriber and it has the same version.  It returns the absolute
     446              :  * path of the progname (pg_malloc'd).  On failure, it reports a fatal error and exits.
     447              :  */
     448              : static char *
     449           20 : get_exec_path(const char *argv0, const char *progname)    /* NOTE(review): the progname parameter shadows the file-level progname */
     450              : {
     451              :     char       *versionstr;
     452              :     char       *exec_path;
     453              :     int         ret;
     454              : 
     455           20 :     versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);  /* NOTE(review): not freed in this function */
     456           20 :     exec_path = pg_malloc(MAXPGPATH);
     457           20 :     ret = find_other_exec(argv0, progname, versionstr, exec_path);
     458              : 
     459           20 :     if (ret < 0)
     460              :     {
     461              :         char        full_path[MAXPGPATH];
     462              : 
     463            0 :         if (find_my_exec(argv0, full_path) < 0)
     464            0 :             strlcpy(full_path, progname, sizeof(full_path));  /* fall back to the bare program name for the message */
     465              : 
     466            0 :         if (ret == -1)          /* reported below as: program not found */
     467            0 :             report_createsub_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
     468              :                                    progname, "pg_createsubscriber", full_path);
     469              :         else                    /* any other negative result is reported as a version mismatch */
     470            0 :             report_createsub_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
     471              :                                    progname, full_path, "pg_createsubscriber");
     472              :     }
     473              : 
     474           20 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
     475              :                          "%s path is:  %s", progname, exec_path);
     476              : 
     477           20 :     return exec_path;
     478              : }
     479              : 
     480              : /*
     481              :  * Is it a cluster directory? These are preliminary checks. It is far from
     482              :  * making an accurate check. If it is not a clone from the publisher, it will
     483              :  * eventually fail in a future step.  Exits with status 1 if a check fails.
     484              :  */
     485              : static void
     486           10 : check_data_directory(const char *datadir)
     487              : {
     488              :     struct stat statbuf;
     489              :     uint32      major_version;
     490              :     char       *version_str;
     491              : 
     492           10 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
     493              :                          "checking if directory \"%s\" is a cluster data directory",
     494              :                          datadir);
     495              : 
     496           10 :     if (stat(datadir, &statbuf) != 0)
     497              :     {
     498            0 :         if (errno == ENOENT)    /* a missing directory gets its own message */
     499            0 :             report_createsub_fatal("data directory \"%s\" does not exist", datadir);
     500              :         else
     501            0 :             report_createsub_fatal("could not access directory \"%s\": %m", datadir);
     502              :     }
     503              : 
     504              :     /*
     505              :      * Retrieve the contents of this cluster's PG_VERSION.  We require
     506              :      * compatibility with the same major version as the one this tool is
     507              :      * compiled with.
     508              :      */
     509           10 :     major_version = GET_PG_MAJORVERSION_NUM(get_pg_version(datadir, &version_str));
     510           10 :     if (major_version != PG_MAJORVERSION_NUM)
     511              :     {
     512            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
     513              :                              "data directory is of wrong version");
     514            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
     515              :                              "File \"%s\" contains \"%s\", which is not compatible with this program's version \"%s\".",
     516              :                              "PG_VERSION", version_str, PG_MAJORVERSION);
     517            0 :         exit(1);                /* two log lines are needed here, so report_createsub_fatal() is not used */
     518              :     }
     519           10 : }
     520              : 
     521              : /*
     522              :  * Append database name into a base connection string.
     523              :  *
     524              :  * dbname is the only parameter that changes so it is not included in the base
     525              :  * connection string. This function concatenates dbname to build a "real"
     526              :  * connection string.
     527              :  */
     528              : static char *
     529           41 : concat_conninfo_dbname(const char *conninfo, const char *dbname)
     530              : {
     531           41 :     PQExpBuffer buf = createPQExpBuffer();
     532              :     char       *ret;
     533              : 
     534              :     Assert(conninfo != NULL);
     535              : 
     536           41 :     appendPQExpBufferStr(buf, conninfo);
     537           41 :     appendConnStrItem(buf, "dbname", dbname);
     538              : 
     539           41 :     ret = pg_strdup(buf->data);
     540           41 :     destroyPQExpBuffer(buf);
     541              : 
     542           41 :     return ret;
     543              : }
     544              : 
     545              : /*
     546              :  * Store publication and subscription information.
     547              :  *
     548              :  * If publication, replication slot and subscription names were specified,
     549              :  * store it here. Otherwise, a generated name will be assigned to the object in
     550              :  * setup_publisher().
     551              :  */
     552              : static struct LogicalRepInfo *
     553           10 : store_pub_sub_info(const struct CreateSubscriberOptions *opt,
     554              :                    const char *pub_base_conninfo,
     555              :                    const char *sub_base_conninfo)
     556              : {
     557              :     struct LogicalRepInfo *dbinfo;
     558           10 :     SimpleStringListCell *pubcell = NULL;
     559           10 :     SimpleStringListCell *subcell = NULL;
     560           10 :     SimpleStringListCell *replslotcell = NULL;
     561           10 :     int         i = 0;
     562              : 
     563           10 :     dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
     564              : 
     565           10 :     if (num_pubs > 0)
     566            2 :         pubcell = opt->pub_names.head;
     567           10 :     if (num_subs > 0)
     568            1 :         subcell = opt->sub_names.head;
     569           10 :     if (num_replslots > 0)
     570            2 :         replslotcell = opt->replslot_names.head;
     571              : 
     572           30 :     for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
     573              :     {
     574              :         char       *conninfo;
     575              : 
     576              :         /* Fill publisher attributes */
     577           20 :         conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
     578           20 :         dbinfo[i].pubconninfo = conninfo;
     579           20 :         dbinfo[i].dbname = cell->val;
     580           20 :         if (num_pubs > 0)
     581            4 :             dbinfo[i].pubname = pubcell->val;
     582              :         else
     583           16 :             dbinfo[i].pubname = NULL;
     584           20 :         if (num_replslots > 0)
     585            3 :             dbinfo[i].replslotname = replslotcell->val;
     586              :         else
     587           17 :             dbinfo[i].replslotname = NULL;
     588           20 :         dbinfo[i].made_replslot = false;
     589           20 :         dbinfo[i].made_publication = false;
     590              :         /* Fill subscriber attributes */
     591           20 :         conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
     592           20 :         dbinfo[i].subconninfo = conninfo;
     593           20 :         if (num_subs > 0)
     594            2 :             dbinfo[i].subname = subcell->val;
     595              :         else
     596           18 :             dbinfo[i].subname = NULL;
     597              :         /* Other fields will be filled later */
     598              : 
     599           37 :         report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
     600              :                              "publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
     601           20 :                              dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
     602            3 :                              dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
     603           20 :                              dbinfo[i].pubconninfo);
     604           40 :         report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
     605              :                              "subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
     606            2 :                              dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
     607           20 :                              dbinfo[i].subconninfo,
     608           20 :                              dbinfos.two_phase ? "true" : "false");
     609              : 
     610           20 :         if (num_pubs > 0)
     611            4 :             pubcell = pubcell->next;
     612           20 :         if (num_subs > 0)
     613            2 :             subcell = subcell->next;
     614           20 :         if (num_replslots > 0)
     615            3 :             replslotcell = replslotcell->next;
     616              : 
     617           20 :         i++;
     618              :     }
     619              : 
     620           10 :     return dbinfo;
     621              : }
     622              : 
     623              : /*
     624              :  * Open a new connection. If exit_on_error is true, it has an undesired
     625              :  * condition and it should exit immediately.
     626              :  */
     627              : static PGconn *
     628           57 : connect_database(const char *conninfo, bool exit_on_error)
     629              : {
     630              :     PGconn     *conn;
     631              :     PGresult   *res;
     632              : 
     633           57 :     conn = PQconnectdb(conninfo);
     634           57 :     if (PQstatus(conn) != CONNECTION_OK)
     635              :     {
     636            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
     637              :                              "connection to database failed: %s",
     638              :                              PQerrorMessage(conn));
     639            0 :         PQfinish(conn);
     640              : 
     641            0 :         if (exit_on_error)
     642            0 :             exit(1);
     643            0 :         return NULL;
     644              :     }
     645              : 
     646              :     /* Secure search_path */
     647           57 :     res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
     648           57 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     649              :     {
     650            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
     651              :                              "could not clear \"search_path\": %s",
     652              :                              PQresultErrorMessage(res));
     653            0 :         PQclear(res);
     654            0 :         PQfinish(conn);
     655              : 
     656            0 :         if (exit_on_error)
     657            0 :             exit(1);
     658            0 :         return NULL;
     659              :     }
     660           57 :     PQclear(res);
     661              : 
     662           57 :     return conn;
     663              : }
     664              : 
     665              : /*
     666              :  * Close the connection. If exit_on_error is true, it has an undesired
     667              :  * condition and it should exit immediately.
     668              :  */
     669              : static void
     670           57 : disconnect_database(PGconn *conn, bool exit_on_error)
     671              : {
     672              :     Assert(conn != NULL);
     673              : 
     674           57 :     PQfinish(conn);
     675              : 
     676           57 :     if (exit_on_error)
     677            2 :         exit(1);
     678           55 : }
     679              : 
     680              : /*
     681              :  * Obtain the system identifier using the provided connection. It will be used
     682              :  * to compare if a data directory is a clone of another one.
     683              :  */
     684              : static uint64
     685           10 : get_primary_sysid(const char *conninfo)
     686              : {
     687              :     PGconn     *conn;
     688              :     PGresult   *res;
     689              :     uint64      sysid;
     690              : 
     691           10 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
     692              :                          "getting system identifier from publisher");
     693              : 
     694           10 :     conn = connect_database(conninfo, true);
     695              : 
     696           10 :     res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
     697           10 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     698              :     {
     699            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
     700              :                              "could not get system identifier: %s",
     701              :                              PQresultErrorMessage(res));
     702            0 :         disconnect_database(conn, true);
     703              :     }
     704           10 :     if (PQntuples(res) != 1)
     705              :     {
     706            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
     707              :                              "could not get system identifier: got %d rows, expected %d row",
     708              :                              PQntuples(res), 1);
     709            0 :         disconnect_database(conn, true);
     710              :     }
     711              : 
     712           10 :     sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
     713              : 
     714           10 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
     715              :                          "system identifier is %" PRIu64 " on publisher", sysid);
     716              : 
     717           10 :     PQclear(res);
     718           10 :     disconnect_database(conn, false);
     719              : 
     720           10 :     return sysid;
     721              : }
     722              : 
     723              : /*
     724              :  * Obtain the system identifier from control file. It will be used to compare
     725              :  * if a data directory is a clone of another one. This routine is used locally
     726              :  * and avoids a connection.
     727              :  */
     728              : static uint64
     729           10 : get_standby_sysid(const char *datadir)
     730              : {
     731              :     ControlFileData *cf;
     732              :     bool        crc_ok;
     733              :     uint64      sysid;
     734              : 
     735           10 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
     736              :                          "getting system identifier from subscriber");
     737              : 
     738           10 :     cf = get_controlfile(datadir, &crc_ok);
     739           10 :     if (!crc_ok)
     740            0 :         report_createsub_fatal("control file appears to be corrupt");
     741              : 
     742           10 :     sysid = cf->system_identifier;
     743              : 
     744           10 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
     745              :                          "system identifier is %" PRIu64 " on subscriber", sysid);
     746              : 
     747           10 :     pg_free(cf);
     748              : 
     749           10 :     return sysid;
     750              : }
     751              : 
     752              : /*
     753              :  * Modify the system identifier. Since a standby server preserves the system
     754              :  * identifier, it makes sense to change it to avoid situations in which WAL
     755              :  * files from one of the systems might be used in the other one.
     756              :  */
     757              : static void
     758            4 : modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
     759              : {
     760              :     ControlFileData *cf;
     761              :     bool        crc_ok;
     762              :     struct timeval tv;
     763              : 
     764              :     char       *cmd_str;
     765              : 
     766            4 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
     767              :                          "modifying system identifier of subscriber");
     768              : 
     769            4 :     cf = get_controlfile(subscriber_dir, &crc_ok);
     770            4 :     if (!crc_ok)
     771            0 :         report_createsub_fatal("control file appears to be corrupt");
     772              : 
     773              :     /*
     774              :      * Select a new system identifier.
     775              :      *
     776              :      * XXX this code was extracted from BootStrapXLOG().
     777              :      */
     778            4 :     gettimeofday(&tv, NULL);
     779            4 :     cf->system_identifier = ((uint64) tv.tv_sec) << 32;
     780            4 :     cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
     781            4 :     cf->system_identifier |= getpid() & 0xFFF;
     782              : 
     783            4 :     if (dry_run)
     784            3 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
     785              :                              "dry-run: would set system identifier to %" PRIu64 " on subscriber",
     786              :                              cf->system_identifier);
     787              :     else
     788              :     {
     789            1 :         update_controlfile(subscriber_dir, cf, true);
     790            1 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
     791              :                              "system identifier is %" PRIu64 " on subscriber",
     792              :                              cf->system_identifier);
     793              :     }
     794              : 
     795            4 :     if (dry_run)
     796            3 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
     797              :                              "dry-run: would run pg_resetwal on the subscriber");
     798              :     else
     799            1 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
     800              :                              "running pg_resetwal on the subscriber");
     801              : 
     802            4 :     cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
     803              :                        subscriber_dir, DEVNULL);
     804              : 
     805            4 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
     806              :                          "pg_resetwal command is: %s", cmd_str);
     807              : 
     808            4 :     if (!dry_run)
     809              :     {
     810            1 :         int         rc = system(cmd_str);
     811              : 
     812            1 :         if (rc == 0)
     813            1 :             report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
     814              :                                  "successfully reset WAL on the subscriber");
     815              :         else
     816            0 :             report_createsub_fatal("could not reset WAL on subscriber: %s", wait_result_to_str(rc));
     817              :     }
     818              : 
     819            4 :     pg_free(cf);
     820            4 : }
     821              : 
     822              : /*
     823              :  * Generate an object name using a prefix, database oid and a random integer.
     824              :  * It is used in case the user does not specify an object name (publication,
     825              :  * subscription, replication slot).
     826              :  */
     827              : static char *
     828            8 : generate_object_name(PGconn *conn)
     829              : {
     830              :     PGresult   *res;
     831              :     Oid         oid;
     832              :     uint32      rand;
     833              :     char       *objname;
     834              : 
     835            8 :     res = PQexec(conn,
     836              :                  "SELECT oid FROM pg_catalog.pg_database "
     837              :                  "WHERE datname = pg_catalog.current_database()");
     838            8 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     839              :     {
     840            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
     841              :                              "could not obtain database OID: %s",
     842              :                              PQresultErrorMessage(res));
     843            0 :         disconnect_database(conn, true);
     844              :     }
     845              : 
     846            8 :     if (PQntuples(res) != 1)
     847              :     {
     848            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
     849              :                              "could not obtain database OID: got %d rows, expected %d row",
     850              :                              PQntuples(res), 1);
     851            0 :         disconnect_database(conn, true);
     852              :     }
     853              : 
     854              :     /* Database OID */
     855            8 :     oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
     856              : 
     857            8 :     PQclear(res);
     858              : 
     859              :     /* Random unsigned integer */
     860            8 :     rand = pg_prng_uint32(&prng_state);
     861              : 
     862              :     /*
     863              :      * Build the object name. The name must not exceed NAMEDATALEN - 1. This
     864              :      * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
     865              :      * '\0').
     866              :      */
     867            8 :     objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
     868              : 
     869            8 :     return objname;
     870              : }
     871              : 
     872              : /*
     873              :  * Does the publication exist in the specified database?
     874              :  */
     875              : static bool
     876            8 : find_publication(PGconn *conn, const char *pubname, const char *dbname)
     877              : {
     878            8 :     PQExpBuffer str = createPQExpBuffer();
     879              :     PGresult   *res;
     880            8 :     bool        found = false;
     881            8 :     char       *pubname_esc = PQescapeLiteral(conn, pubname, strlen(pubname));
     882              : 
     883            8 :     appendPQExpBuffer(str,
     884              :                       "SELECT 1 FROM pg_catalog.pg_publication "
     885              :                       "WHERE pubname = %s",
     886              :                       pubname_esc);
     887            8 :     res = PQexec(conn, str->data);
     888            8 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     889              :     {
     890            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
     891              :                              "could not find publication \"%s\" in database \"%s\": %s",
     892              :                              pubname, dbname, PQerrorMessage(conn));
     893            0 :         disconnect_database(conn, true);
     894              :     }
     895              : 
     896            8 :     if (PQntuples(res) == 1)
     897            1 :         found = true;
     898              : 
     899            8 :     PQclear(res);
     900            8 :     PQfreemem(pubname_esc);
     901            8 :     destroyPQExpBuffer(str);
     902              : 
     903            8 :     return found;
     904              : }
     905              : 
     906              : /*
     907              :  * Create the publications and replication slots in preparation for logical
     908              :  * replication. Returns the LSN from latest replication slot. It will be the
     909              :  * replication start point that is used to adjust the subscriptions (see
     910              :  * set_replication_progress).
     911              :  */
     912              : static char *
     913            4 : setup_publisher(struct LogicalRepInfo *dbinfo)
     914              : {
     915            4 :     char       *lsn = NULL;
     916              : 
     917            4 :     pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
     918              : 
     919           12 :     for (int i = 0; i < num_dbs; i++)
     920              :     {
     921              :         PGconn     *conn;
     922            8 :         char       *genname = NULL;
     923              : 
     924            8 :         conn = connect_database(dbinfo[i].pubconninfo, true);
     925              : 
     926              :         /*
     927              :          * If an object name was not specified as command-line options, assign
     928              :          * a generated object name. The replication slot has a different rule.
     929              :          * The subscription name is assigned to the replication slot name if
     930              :          * no replication slot is specified. It follows the same rule as
     931              :          * CREATE SUBSCRIPTION.
     932              :          */
     933            8 :         if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
     934            8 :             genname = generate_object_name(conn);
     935            8 :         if (num_pubs == 0)
     936            4 :             dbinfo[i].pubname = pg_strdup(genname);
     937            8 :         if (num_subs == 0)
     938            6 :             dbinfo[i].subname = pg_strdup(genname);
     939            8 :         if (num_replslots == 0)
     940            5 :             dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
     941              : 
     942            8 :         if (find_publication(conn, dbinfo[i].pubname, dbinfo[i].dbname))
     943              :         {
     944              :             /* Reuse existing publication on publisher. */
     945            1 :             report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
     946              :                                  "use existing publication \"%s\" in database \"%s\"",
     947            1 :                                  dbinfo[i].pubname, dbinfo[i].dbname);
     948              :             /* Don't remove pre-existing publication if an error occurs. */
     949            1 :             dbinfo[i].made_publication = false;
     950              :         }
     951              :         else
     952              :         {
     953              :             /*
     954              :              * Create publication on publisher. This step should be executed
     955              :              * *before* promoting the subscriber to avoid any transactions
     956              :              * between consistent LSN and the new publication rows (such
     957              :              * transactions wouldn't see the new publication rows resulting in
     958              :              * an error).
     959              :              */
     960            7 :             create_publication(conn, &dbinfo[i]);
     961              :         }
     962              : 
     963              :         /* Create replication slot on publisher */
     964            8 :         if (lsn)
     965            1 :             pg_free(lsn);
     966            8 :         lsn = create_logical_replication_slot(conn, &dbinfo[i]);
     967            8 :         if (lsn == NULL && !dry_run)
     968            0 :             exit(1);
     969              : 
     970              :         /*
     971              :          * Since we are using the LSN returned by the last replication slot as
     972              :          * recovery_target_lsn, this LSN is ahead of the current WAL position
     973              :          * and the recovery waits until the publisher writes a WAL record to
     974              :          * reach the target and ends the recovery. On idle systems, this wait
     975              :          * time is unpredictable and could lead to failure in promoting the
     976              :          * subscriber. To avoid that, insert a harmless WAL record.
     977              :          */
     978            8 :         if (i == num_dbs - 1 && !dry_run)
     979              :         {
     980              :             PGresult   *res;
     981              : 
     982            1 :             res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
     983            1 :             if (PQresultStatus(res) != PGRES_TUPLES_OK)
     984              :             {
     985            0 :                 report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
     986              :                                      "could not write an additional WAL record: %s",
     987              :                                      PQresultErrorMessage(res));
     988            0 :                 disconnect_database(conn, true);
     989              :             }
     990            1 :             PQclear(res);
     991              :         }
     992              : 
     993            8 :         disconnect_database(conn, false);
     994              :     }
     995              : 
     996            4 :     return lsn;
     997              : }
     998              : 
     999              : /*
    1000              :  * Is recovery still in progress?
    1001              :  */
    1002              : static bool
    1003           16 : server_is_in_recovery(PGconn *conn)
    1004              : {
    1005              :     PGresult   *res;
    1006              :     int         ret;
    1007              : 
    1008           16 :     res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
    1009              : 
    1010           16 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1011              :     {
    1012            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1013              :                              "could not obtain recovery progress: %s",
    1014              :                              PQresultErrorMessage(res));
    1015            0 :         disconnect_database(conn, true);
    1016              :     }
    1017              : 
    1018              : 
    1019           16 :     ret = strcmp("t", PQgetvalue(res, 0, 0));
    1020              : 
    1021           16 :     PQclear(res);
    1022              : 
    1023           16 :     return ret == 0;
    1024              : }
    1025              : 
    1026              : /*
    1027              :  * Is the primary server ready for logical replication?
    1028              :  *
    1029              :  * XXX Does it not allow a synchronous replica?
    1030              :  */
    1031              : static void
    1032            6 : check_publisher(const struct LogicalRepInfo *dbinfo)
    1033              : {
    1034              :     PGconn     *conn;
    1035              :     PGresult   *res;
    1036            6 :     bool        failed = false;
    1037              : 
    1038              :     char       *wal_level;
    1039              :     int         max_repslots;
    1040              :     int         cur_repslots;
    1041              :     int         max_walsenders;
    1042              :     int         cur_walsenders;
    1043              :     int         max_prepared_transactions;
    1044              :     char       *max_slot_wal_keep_size;
    1045              : 
    1046            6 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    1047              :                          "checking settings on publisher");
    1048              : 
    1049            6 :     conn = connect_database(dbinfo[0].pubconninfo, true);
    1050              : 
    1051              :     /*
    1052              :      * If the primary server is in recovery (i.e. cascading replication),
    1053              :      * objects (publication) cannot be created because it is read only.
    1054              :      */
    1055            6 :     if (server_is_in_recovery(conn))
    1056              :     {
    1057            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1058              :                              "primary server cannot be in recovery");
    1059            1 :         disconnect_database(conn, true);
    1060              :     }
    1061              : 
    1062              :     /*------------------------------------------------------------------------
    1063              :      * Logical replication requires a few parameters to be set on publisher.
    1064              :      * Since these parameters are not a requirement for physical replication,
    1065              :      * we should check it to make sure it won't fail.
    1066              :      *
    1067              :      * - wal_level >= replica
    1068              :      * - max_replication_slots >= current + number of dbs to be converted
    1069              :      * - max_wal_senders >= current + number of dbs to be converted
    1070              :      * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files)
    1071              :      * -----------------------------------------------------------------------
    1072              :      */
    1073            5 :     res = PQexec(conn,
    1074              :                  "SELECT pg_catalog.current_setting('wal_level'),"
    1075              :                  " pg_catalog.current_setting('max_replication_slots'),"
    1076              :                  " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
    1077              :                  " pg_catalog.current_setting('max_wal_senders'),"
    1078              :                  " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
    1079              :                  " pg_catalog.current_setting('max_prepared_transactions'),"
    1080              :                  " pg_catalog.current_setting('max_slot_wal_keep_size')");
    1081              : 
    1082            5 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1083              :     {
    1084            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1085              :                              "could not obtain publisher settings: %s",
    1086              :                              PQresultErrorMessage(res));
    1087            0 :         disconnect_database(conn, true);
    1088              :     }
    1089              : 
    1090            5 :     wal_level = pg_strdup(PQgetvalue(res, 0, 0));
    1091            5 :     max_repslots = atoi(PQgetvalue(res, 0, 1));
    1092            5 :     cur_repslots = atoi(PQgetvalue(res, 0, 2));
    1093            5 :     max_walsenders = atoi(PQgetvalue(res, 0, 3));
    1094            5 :     cur_walsenders = atoi(PQgetvalue(res, 0, 4));
    1095            5 :     max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));
    1096            5 :     max_slot_wal_keep_size = pg_strdup(PQgetvalue(res, 0, 6));
    1097              : 
    1098            5 :     PQclear(res);
    1099              : 
    1100            5 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1101              :                          "publisher: wal_level: %s", wal_level);
    1102            5 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1103              :                          "publisher: max_replication_slots: %d", max_repslots);
    1104            5 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1105              :                          "publisher: current replication slots: %d", cur_repslots);
    1106            5 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1107              :                          "publisher: max_wal_senders: %d", max_walsenders);
    1108            5 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1109              :                          "publisher: current wal senders: %d", cur_walsenders);
    1110            5 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1111              :                          "publisher: max_prepared_transactions: %d",
    1112              :                          max_prepared_transactions);
    1113            5 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1114              :                          "publisher: max_slot_wal_keep_size: %s",
    1115              :                          max_slot_wal_keep_size);
    1116              : 
    1117            5 :     disconnect_database(conn, false);
    1118              : 
    1119            5 :     if (strcmp(wal_level, "minimal") == 0)
    1120              :     {
    1121            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1122              :                              "publisher requires \"wal_level\" >= \"replica\"");
    1123            0 :         failed = true;
    1124              :     }
    1125              : 
    1126            5 :     if (max_repslots - cur_repslots < num_dbs)
    1127              :     {
    1128            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1129              :                              "publisher requires %d replication slots, but only %d remain",
    1130              :                              num_dbs, max_repslots - cur_repslots);
    1131            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
    1132              :                              "Increase the configuration parameter \"%s\" to at least %d.",
    1133              :                              "max_replication_slots", cur_repslots + num_dbs);
    1134            1 :         failed = true;
    1135              :     }
    1136              : 
    1137            5 :     if (max_walsenders - cur_walsenders < num_dbs)
    1138              :     {
    1139            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1140              :                              "publisher requires %d WAL sender processes, but only %d remain",
    1141              :                              num_dbs, max_walsenders - cur_walsenders);
    1142            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
    1143              :                              "Increase the configuration parameter \"%s\" to at least %d.",
    1144              :                              "max_wal_senders", cur_walsenders + num_dbs);
    1145            1 :         failed = true;
    1146              :     }
    1147              : 
    1148            5 :     if (max_prepared_transactions != 0 && !dbinfos.two_phase)
    1149              :     {
    1150            0 :         report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
    1151              :                              "two_phase option will not be enabled for replication slots");
    1152            0 :         report_createsub_log(PG_LOG_WARNING, PG_LOG_DETAIL,
    1153              :                              "Subscriptions will be created with the two_phase option disabled.  "
    1154              :                              "Prepared transactions will be replicated at COMMIT PREPARED.");
    1155            0 :         report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
    1156              :                              "You can use the command-line option --enable-two-phase to enable two_phase.");
    1157              :     }
    1158              : 
    1159              :     /*
    1160              :      * In dry-run mode, validate 'max_slot_wal_keep_size'. If this parameter
    1161              :      * is set to a non-default value, it may cause replication failures due to
    1162              :      * required WAL files being prematurely removed.
    1163              :      */
    1164            5 :     if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
    1165              :     {
    1166            0 :         report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
    1167              :                              "required WAL could be removed from the publisher");
    1168            0 :         report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
    1169              :                              "Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
    1170              :                              "max_slot_wal_keep_size");
    1171              :     }
    1172              : 
    1173            5 :     pg_free(wal_level);
    1174              : 
    1175            5 :     if (failed)
    1176            1 :         exit(1);
    1177            4 : }
    1178              : 
    1179              : /*
    1180              :  * Is the standby server ready for logical replication?
    1181              :  *
    1182              :  * XXX Does it not allow a time-delayed replica?
    1183              :  *
    1184              :  * XXX In a cascaded replication scenario (P -> S -> C), if the target server
    1185              :  * is S, it cannot detect there is a replica (server C) because server S starts
    1186              :  * accepting only local connections and server C cannot connect to it. Hence,
    1187              :  * there is not a reliable way to provide a suitable error saying the server C
    1188              :  * will be broken at the end of this process (due to pg_resetwal).
    1189              :  */
    1190              : static void
    1191            8 : check_subscriber(const struct LogicalRepInfo *dbinfo)
    1192              : {
    1193              :     PGconn     *conn;
    1194              :     PGresult   *res;
    1195            8 :     bool        failed = false;
    1196              : 
    1197              :     int         max_lrworkers;
    1198              :     int         max_replorigins;
    1199              :     int         max_wprocs;
    1200              : 
    1201            8 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    1202              :                          "checking settings on subscriber");
    1203              : 
    1204            8 :     conn = connect_database(dbinfo[0].subconninfo, true);
    1205              : 
    1206              :     /* The target server must be a standby */
    1207            8 :     if (!server_is_in_recovery(conn))
    1208              :     {
    1209            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1210              :                              "target server must be a standby");
    1211            1 :         disconnect_database(conn, true);
    1212              :     }
    1213              : 
    1214              :     /*------------------------------------------------------------------------
    1215              :      * Logical replication requires a few parameters to be set on subscriber.
    1216              :      * Since these parameters are not a requirement for physical replication,
    1217              :      * we should check it to make sure it won't fail.
    1218              :      *
    1219              :      * - max_active_replication_origins >= number of dbs to be converted
    1220              :      * - max_logical_replication_workers >= number of dbs to be converted
    1221              :      * - max_worker_processes >= 1 + number of dbs to be converted
    1222              :      *------------------------------------------------------------------------
    1223              :      */
    1224            7 :     res = PQexec(conn,
    1225              :                  "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
    1226              :                  "'max_logical_replication_workers', "
    1227              :                  "'max_active_replication_origins', "
    1228              :                  "'max_worker_processes', "
    1229              :                  "'primary_slot_name') "
    1230              :                  "ORDER BY name");
    1231              : 
    1232            7 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1233              :     {
    1234            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1235              :                              "could not obtain subscriber settings: %s",
    1236              :                              PQresultErrorMessage(res));
    1237            0 :         disconnect_database(conn, true);
    1238              :     }
    1239              : 
    1240            7 :     max_replorigins = atoi(PQgetvalue(res, 0, 0));
    1241            7 :     max_lrworkers = atoi(PQgetvalue(res, 1, 0));
    1242            7 :     max_wprocs = atoi(PQgetvalue(res, 2, 0));
    1243            7 :     if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
    1244            6 :         primary_slot_name = pg_strdup(PQgetvalue(res, 3, 0));
    1245              : 
    1246            7 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1247              :                          "subscriber: max_logical_replication_workers: %d",
    1248              :                          max_lrworkers);
    1249            7 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1250              :                          "subscriber: max_active_replication_origins: %d", max_replorigins);
    1251            7 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1252              :                          "subscriber: max_worker_processes: %d", max_wprocs);
    1253            7 :     if (primary_slot_name)
    1254            6 :         report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1255              :                              "subscriber: primary_slot_name: %s", primary_slot_name);
    1256              : 
    1257            7 :     PQclear(res);
    1258              : 
    1259            7 :     disconnect_database(conn, false);
    1260              : 
    1261            7 :     if (max_replorigins < num_dbs)
    1262              :     {
    1263            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1264              :                              "subscriber requires %d active replication origins, but only %d remain",
    1265              :                              num_dbs, max_replorigins);
    1266            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
    1267              :                              "Increase the configuration parameter \"%s\" to at least %d.",
    1268              :                              "max_active_replication_origins", num_dbs);
    1269            1 :         failed = true;
    1270              :     }
    1271              : 
    1272            7 :     if (max_lrworkers < num_dbs)
    1273              :     {
    1274            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1275              :                              "subscriber requires %d logical replication workers, but only %d remain",
    1276              :                              num_dbs, max_lrworkers);
    1277            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
    1278              :                              "Increase the configuration parameter \"%s\" to at least %d.",
    1279              :                              "max_logical_replication_workers", num_dbs);
    1280            1 :         failed = true;
    1281              :     }
    1282              : 
    1283            7 :     if (max_wprocs < num_dbs + 1)
    1284              :     {
    1285            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1286              :                              "subscriber requires %d worker processes, but only %d remain",
    1287              :                              num_dbs + 1, max_wprocs);
    1288            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
    1289              :                              "Increase the configuration parameter \"%s\" to at least %d.",
    1290              :                              "max_worker_processes", num_dbs + 1);
    1291            1 :         failed = true;
    1292              :     }
    1293              : 
    1294            7 :     if (failed)
    1295            1 :         exit(1);
    1296            6 : }
    1297              : 
    1298              : /*
    1299              :  * Drop a specified subscription. This is to avoid duplicate subscriptions on
    1300              :  * the primary (publisher node) and the newly created subscriber. We
    1301              :  * shouldn't drop the associated slot as that would be used by the publisher
    1302              :  * node.
    1303              :  */
    1304              : static void
    1305            4 : drop_existing_subscription(PGconn *conn, const char *subname, const char *dbname)
    1306              : {
    1307            4 :     PQExpBuffer query = createPQExpBuffer();
    1308              :     PGresult   *res;
    1309              : 
    1310              :     Assert(conn != NULL);
    1311              : 
    1312              :     /*
    1313              :      * Construct a query string. These commands are allowed to be executed
    1314              :      * within a transaction.
    1315              :      */
    1316            4 :     appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
    1317              :                       subname);
    1318            4 :     appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
    1319              :                       subname);
    1320            4 :     appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
    1321              : 
    1322            4 :     if (dry_run)
    1323            3 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    1324              :                              "dry-run: would drop subscription \"%s\" in database \"%s\"",
    1325              :                              subname, dbname);
    1326              :     else
    1327              :     {
    1328            1 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    1329              :                              "dropping subscription \"%s\" in database \"%s\"",
    1330              :                              subname, dbname);
    1331              : 
    1332            1 :         res = PQexec(conn, query->data);
    1333              : 
    1334            1 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1335              :         {
    1336            0 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1337              :                                  "could not drop subscription \"%s\": %s",
    1338              :                                  subname, PQresultErrorMessage(res));
    1339            0 :             disconnect_database(conn, true);
    1340              :         }
    1341              : 
    1342            1 :         PQclear(res);
    1343              :     }
    1344              : 
    1345            4 :     destroyPQExpBuffer(query);
    1346            4 : }
    1347              : 
    1348              : /*
    1349              :  * Retrieve and drop the pre-existing subscriptions.
    1350              :  */
    1351              : static void
    1352            8 : check_and_drop_existing_subscriptions(PGconn *conn,
    1353              :                                       const struct LogicalRepInfo *dbinfo)
    1354              : {
    1355            8 :     PQExpBuffer query = createPQExpBuffer();
    1356              :     char       *dbname;
    1357              :     PGresult   *res;
    1358              : 
    1359              :     Assert(conn != NULL);
    1360              : 
    1361            8 :     dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
    1362              : 
    1363            8 :     appendPQExpBuffer(query,
    1364              :                       "SELECT s.subname FROM pg_catalog.pg_subscription s "
    1365              :                       "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
    1366              :                       "WHERE d.datname = %s",
    1367              :                       dbname);
    1368            8 :     res = PQexec(conn, query->data);
    1369              : 
    1370            8 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1371              :     {
    1372            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1373              :                              "could not obtain pre-existing subscriptions: %s",
    1374              :                              PQresultErrorMessage(res));
    1375            0 :         disconnect_database(conn, true);
    1376              :     }
    1377              : 
    1378           12 :     for (int i = 0; i < PQntuples(res); i++)
    1379            4 :         drop_existing_subscription(conn, PQgetvalue(res, i, 0),
    1380            4 :                                    dbinfo->dbname);
    1381              : 
    1382            8 :     PQclear(res);
    1383            8 :     destroyPQExpBuffer(query);
    1384            8 :     PQfreemem(dbname);
    1385            8 : }
    1386              : 
    1387              : /*
    1388              :  * Create the subscriptions, adjust the initial location for logical
    1389              :  * replication and enable the subscriptions. That's the last step for logical
    1390              :  * replication setup.
    1391              :  */
    1392              : static void
    1393            4 : setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
    1394              : {
    1395           12 :     for (int i = 0; i < num_dbs; i++)
    1396              :     {
    1397              :         PGconn     *conn;
    1398              : 
    1399              :         /* Connect to subscriber. */
    1400            8 :         conn = connect_database(dbinfo[i].subconninfo, true);
    1401              : 
    1402              :         /*
    1403              :          * We don't need the pre-existing subscriptions on the newly formed
    1404              :          * subscriber. They can connect to other publisher nodes and either
    1405              :          * get some unwarranted data or can lead to ERRORs in connecting to
    1406              :          * such nodes.
    1407              :          */
    1408            8 :         check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
    1409              : 
    1410              :         /* Check and drop the required publications in the given database. */
    1411            8 :         check_and_drop_publications(conn, &dbinfo[i]);
    1412              : 
    1413            8 :         create_subscription(conn, &dbinfo[i]);
    1414              : 
    1415              :         /* Set the replication progress to the correct LSN */
    1416            8 :         set_replication_progress(conn, &dbinfo[i], consistent_lsn);
    1417              : 
    1418              :         /* Enable subscription */
    1419            8 :         enable_subscription(conn, &dbinfo[i]);
    1420              : 
    1421            8 :         disconnect_database(conn, false);
    1422              :     }
    1423            4 : }
    1424              : 
    1425              : /*
    1426              :  * Write the required recovery parameters.
    1427              :  */
    1428              : static void
    1429            4 : setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
    1430              : {
    1431              :     PGconn     *conn;
    1432              :     PQExpBuffer recoveryconfcontents;
    1433              : 
    1434              :     /*
    1435              :      * Despite of the recovery parameters will be written to the subscriber,
    1436              :      * use a publisher connection. The primary_conninfo is generated using the
    1437              :      * connection settings.
    1438              :      */
    1439            4 :     conn = connect_database(dbinfo[0].pubconninfo, true);
    1440              : 
    1441              :     /*
    1442              :      * Write recovery parameters.
    1443              :      *
    1444              :      * The subscriber is not running yet. In dry run mode, the recovery
    1445              :      * parameters *won't* be written. An invalid LSN is used for printing
    1446              :      * purposes. Additional recovery parameters are added here. It avoids
    1447              :      * unexpected behavior such as end of recovery as soon as a consistent
    1448              :      * state is reached (recovery_target) and failure due to multiple recovery
    1449              :      * targets (name, time, xid, LSN).
    1450              :      */
    1451            4 :     recoveryconfcontents = GenerateRecoveryConfig(conn, NULL, NULL);
    1452            4 :     appendPQExpBufferStr(recoveryconfcontents, "recovery_target = ''\n");
    1453            4 :     appendPQExpBufferStr(recoveryconfcontents,
    1454              :                          "recovery_target_timeline = 'latest'\n");
    1455              : 
    1456              :     /*
    1457              :      * Set recovery_target_inclusive = false to avoid reapplying the
    1458              :      * transaction committed at 'lsn' after subscription is enabled. This is
    1459              :      * because the provided 'lsn' is also used as the replication start point
    1460              :      * for the subscription. So, the server can send the transaction committed
    1461              :      * at that 'lsn' after replication is started which can lead to applying
    1462              :      * the same transaction twice if we keep recovery_target_inclusive = true.
    1463              :      */
    1464            4 :     appendPQExpBufferStr(recoveryconfcontents,
    1465              :                          "recovery_target_inclusive = false\n");
    1466            4 :     appendPQExpBufferStr(recoveryconfcontents,
    1467              :                          "recovery_target_action = promote\n");
    1468            4 :     appendPQExpBufferStr(recoveryconfcontents, "recovery_target_name = ''\n");
    1469            4 :     appendPQExpBufferStr(recoveryconfcontents, "recovery_target_time = ''\n");
    1470            4 :     appendPQExpBufferStr(recoveryconfcontents, "recovery_target_xid = ''\n");
    1471              : 
    1472            4 :     if (dry_run)
    1473              :     {
    1474            3 :         appendPQExpBufferStr(recoveryconfcontents, "# dry run mode\n");
    1475            3 :         appendPQExpBuffer(recoveryconfcontents,
    1476              :                           "recovery_target_lsn = '%X/%08X'\n",
    1477              :                           LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
    1478              :     }
    1479              :     else
    1480              :     {
    1481            1 :         appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
    1482              :                           lsn);
    1483              :     }
    1484              : 
    1485            4 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1486              :                          "recovery parameters:\n%s", recoveryconfcontents->data);
    1487              : 
    1488            4 :     if (!dry_run)
    1489              :     {
    1490              :         char        conf_filename[MAXPGPATH];
    1491              :         FILE       *fd;
    1492              : 
    1493              :         /* Write the recovery parameters to INCLUDED_CONF_FILE */
    1494            1 :         snprintf(conf_filename, MAXPGPATH, "%s/%s", datadir,
    1495              :                  INCLUDED_CONF_FILE);
    1496            1 :         fd = fopen(conf_filename, "w");
    1497            1 :         if (fd == NULL)
    1498            0 :             report_createsub_fatal("could not open file \"%s\": %m", conf_filename);
    1499              : 
    1500            1 :         if (fwrite(recoveryconfcontents->data, recoveryconfcontents->len, 1, fd) != 1)
    1501            0 :             report_createsub_fatal("could not write to file \"%s\": %m", conf_filename);
    1502              : 
    1503            1 :         fclose(fd);
    1504            1 :         recovery_params_set = true;
    1505              : 
    1506              :         /* Include conditionally the recovery parameters. */
    1507            1 :         resetPQExpBuffer(recoveryconfcontents);
    1508            1 :         appendPQExpBufferStr(recoveryconfcontents,
    1509              :                              "include_if_exists '" INCLUDED_CONF_FILE "'\n");
    1510            1 :         WriteRecoveryConfig(conn, datadir, recoveryconfcontents);
    1511              :     }
    1512              : 
    1513            4 :     disconnect_database(conn, false);
    1514            4 : }
    1515              : 
    1516              : /*
    1517              :  * Drop physical replication slot on primary if the standby was using it. After
    1518              :  * the transformation, it has no use.
    1519              :  *
    1520              :  * XXX we might not fail here. Instead, we provide a warning so the user
    1521              :  * eventually drops this replication slot later.
    1522              :  */
    1523              : static void
    1524            4 : drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
    1525              : {
    1526              :     PGconn     *conn;
    1527              : 
    1528              :     /* Replication slot does not exist, do nothing */
    1529            4 :     if (!primary_slot_name)
    1530            0 :         return;
    1531              : 
    1532            4 :     conn = connect_database(dbinfo[0].pubconninfo, false);
    1533            4 :     if (conn != NULL)
    1534              :     {
    1535            4 :         drop_replication_slot(conn, &dbinfo[0], slotname);
    1536            4 :         disconnect_database(conn, false);
    1537              :     }
    1538              :     else
    1539              :     {
    1540            0 :         report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
    1541              :                              "could not drop replication slot \"%s\" on primary",
    1542              :                              slotname);
    1543            0 :         report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
    1544              :                              "Drop this replication slot soon to avoid retention of WAL files.");
    1545              :     }
    1546              : }
    1547              : 
    1548              : /*
    1549              :  * Drop failover replication slots on subscriber. After the transformation,
    1550              :  * they have no use.
    1551              :  *
    1552              :  * XXX We do not fail here. Instead, we provide a warning so the user can drop
    1553              :  * them later.
    1554              :  */
    1555              : static void
    1556            4 : drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
    1557              : {
    1558              :     PGconn     *conn;
    1559              :     PGresult   *res;
    1560              : 
    1561            4 :     conn = connect_database(dbinfo[0].subconninfo, false);
    1562            4 :     if (conn != NULL)
    1563              :     {
    1564              :         /* Get failover replication slot names */
    1565            4 :         res = PQexec(conn,
    1566              :                      "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
    1567              : 
    1568            4 :         if (PQresultStatus(res) == PGRES_TUPLES_OK)
    1569              :         {
    1570              :             /* Remove failover replication slots from subscriber */
    1571            8 :             for (int i = 0; i < PQntuples(res); i++)
    1572            4 :                 drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
    1573              :         }
    1574              :         else
    1575              :         {
    1576            0 :             report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
    1577              :                                  "could not obtain failover replication slot information: %s",
    1578              :                                  PQresultErrorMessage(res));
    1579            0 :             report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
    1580              :                                  "Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
    1581              :         }
    1582              : 
    1583            4 :         PQclear(res);
    1584            4 :         disconnect_database(conn, false);
    1585              :     }
    1586              :     else
    1587              :     {
    1588            0 :         report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
    1589              :                              "could not drop failover replication slot");
    1590            0 :         report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
    1591              :                              "Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
    1592              :     }
    1593            4 : }
    1594              : 
    1595              : /*
    1596              :  * Create a logical replication slot and returns a LSN.
    1597              :  *
    1598              :  * CreateReplicationSlot() is not used because it does not provide the one-row
    1599              :  * result set that contains the LSN.
    1600              :  */
    1601              : static char *
    1602            8 : create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
    1603              : {
    1604            8 :     PQExpBuffer str = createPQExpBuffer();
    1605            8 :     PGresult   *res = NULL;
    1606            8 :     const char *slot_name = dbinfo->replslotname;
    1607              :     char       *slot_name_esc;
    1608            8 :     char       *lsn = NULL;
    1609              : 
    1610              :     Assert(conn != NULL);
    1611              : 
    1612            8 :     if (dry_run)
    1613            6 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    1614              :                              "dry-run: would create the replication slot \"%s\" in database \"%s\" on publisher",
    1615              :                              slot_name, dbinfo->dbname);
    1616              :     else
    1617            2 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    1618              :                              "creating the replication slot \"%s\" in database \"%s\" on publisher",
    1619              :                              slot_name, dbinfo->dbname);
    1620              : 
    1621            8 :     slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
    1622              : 
    1623            8 :     appendPQExpBuffer(str,
    1624              :                       "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
    1625              :                       slot_name_esc,
    1626            8 :                       dbinfos.two_phase ? "true" : "false");
    1627              : 
    1628            8 :     PQfreemem(slot_name_esc);
    1629              : 
    1630            8 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1631              :                          "command is: %s", str->data);
    1632              : 
    1633            8 :     if (!dry_run)
    1634              :     {
    1635            2 :         res = PQexec(conn, str->data);
    1636            2 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1637              :         {
    1638            0 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1639              :                                  "could not create replication slot \"%s\" in database \"%s\": %s",
    1640              :                                  slot_name, dbinfo->dbname,
    1641              :                                  PQresultErrorMessage(res));
    1642            0 :             PQclear(res);
    1643            0 :             destroyPQExpBuffer(str);
    1644            0 :             return NULL;
    1645              :         }
    1646              : 
    1647            2 :         lsn = pg_strdup(PQgetvalue(res, 0, 0));
    1648            2 :         PQclear(res);
    1649              :     }
    1650              : 
    1651              :     /* For cleanup purposes */
    1652            8 :     dbinfo->made_replslot = true;
    1653              : 
    1654            8 :     destroyPQExpBuffer(str);
    1655              : 
    1656            8 :     return lsn;
    1657              : }
    1658              : 
    1659              : static void
    1660            8 : drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
    1661              :                       const char *slot_name)
    1662              : {
    1663            8 :     PQExpBuffer str = createPQExpBuffer();
    1664              :     char       *slot_name_esc;
    1665              :     PGresult   *res;
    1666              : 
    1667              :     Assert(conn != NULL);
    1668              : 
    1669            8 :     if (dry_run)
    1670            6 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    1671              :                              "dry-run: would drop the replication slot \"%s\" in database \"%s\"",
    1672              :                              slot_name, dbinfo->dbname);
    1673              :     else
    1674            2 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    1675              :                              "dropping the replication slot \"%s\" in database \"%s\"",
    1676              :                              slot_name, dbinfo->dbname);
    1677              : 
    1678            8 :     slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
    1679              : 
    1680            8 :     appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
    1681              : 
    1682            8 :     PQfreemem(slot_name_esc);
    1683              : 
    1684            8 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1685              :                          "command is: %s", str->data);
    1686              : 
    1687            8 :     if (!dry_run)
    1688              :     {
    1689            2 :         res = PQexec(conn, str->data);
    1690            2 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1691              :         {
    1692            0 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1693              :                                  "could not drop replication slot \"%s\" in database \"%s\": %s",
    1694              :                                  slot_name, dbinfo->dbname, PQresultErrorMessage(res));
    1695            0 :             dbinfo->made_replslot = false;   /* don't try again. */
    1696              :         }
    1697              : 
    1698            2 :         PQclear(res);
    1699              :     }
    1700              : 
    1701            8 :     destroyPQExpBuffer(str);
    1702            8 : }
    1703              : 
    1704              : /*
    1705              :  * Reports a suitable message if pg_ctl fails.
    1706              :  */
    1707              : static void
    1708           24 : pg_ctl_status(const char *pg_ctl_cmd, int rc)
    1709              : {
    1710           24 :     if (rc != 0)
    1711              :     {
    1712            0 :         if (WIFEXITED(rc))
    1713              :         {
    1714            0 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1715              :                                  "pg_ctl failed with exit code %d",
    1716            0 :                                  WEXITSTATUS(rc));
    1717              :         }
    1718            0 :         else if (WIFSIGNALED(rc))
    1719              :         {
    1720              : #if defined(WIN32)
    1721              :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1722              :                                  "pg_ctl was terminated by exception 0x%X",
    1723              :                                  WTERMSIG(rc));
    1724              :             report_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
    1725              :                                  "See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
    1726              : #else
    1727            0 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1728              :                                  "pg_ctl was terminated by signal %d: %s",
    1729              :                                  WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
    1730              : #endif
    1731              :         }
    1732              :         else
    1733              :         {
    1734            0 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1735              :                                  "pg_ctl exited with unrecognized status %d", rc);
    1736              :         }
    1737              : 
    1738            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
    1739              :                              "The failed command was: %s", pg_ctl_cmd);
    1740            0 :         exit(1);
    1741              :     }
    1742           24 : }
    1743              : 
    1744              : static void
    1745           12 : start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
    1746              :                      bool restrict_logical_worker)
    1747              : {
    1748           12 :     PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
    1749              :     int         rc;
    1750              : 
    1751           12 :     appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
    1752           12 :     appendShellString(pg_ctl_cmd, subscriber_dir);
    1753           12 :     appendPQExpBufferStr(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
    1754              : 
    1755              :     /* Prevent unintended slot invalidation */
    1756           12 :     appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
    1757              : 
    1758           12 :     if (restricted_access)
    1759              :     {
    1760           12 :         appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
    1761              : #if !defined(WIN32)
    1762              : 
    1763              :         /*
    1764              :          * An empty listen_addresses list means the server does not listen on
    1765              :          * any IP interfaces; only Unix-domain sockets can be used to connect
    1766              :          * to the server. Prevent external connections to minimize the chance
    1767              :          * of failure.
    1768              :          */
    1769           12 :         appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
    1770           12 :         if (opt->socket_dir)
    1771           12 :             appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
    1772           12 :                               opt->socket_dir);
    1773           12 :         appendPQExpBufferChar(pg_ctl_cmd, '"');
    1774              : #endif
    1775              :     }
    1776           12 :     if (opt->config_file != NULL)
    1777            0 :         appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
    1778            0 :                           opt->config_file);
    1779              : 
    1780              :     /* Suppress to start logical replication if requested */
    1781           12 :     if (restrict_logical_worker)
    1782            4 :         appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
    1783              : 
    1784           12 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1785              :                          "pg_ctl command is: %s", pg_ctl_cmd->data);
    1786           12 :     rc = system(pg_ctl_cmd->data);
    1787           12 :     pg_ctl_status(pg_ctl_cmd->data, rc);
    1788           12 :     standby_running = true;
    1789           12 :     destroyPQExpBuffer(pg_ctl_cmd);
    1790           12 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    1791              :                          "server was started");
    1792           12 : }
    1793              : 
    1794              : static void
    1795           12 : stop_standby_server(const char *datadir)
    1796              : {
    1797              :     char       *pg_ctl_cmd;
    1798              :     int         rc;
    1799              : 
    1800           12 :     pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
    1801              :                           datadir);
    1802           12 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1803              :                          "pg_ctl command is: %s", pg_ctl_cmd);
    1804           12 :     rc = system(pg_ctl_cmd);
    1805           12 :     pg_ctl_status(pg_ctl_cmd, rc);
    1806           12 :     standby_running = false;
    1807           12 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    1808              :                          "server was stopped");
    1809           12 : }
    1810              : 
    1811              : /*
    1812              :  * Returns after the server finishes the recovery process.
    1813              :  *
    1814              :  * If recovery_timeout option is set, terminate abnormally without finishing
    1815              :  * the recovery process. By default, it waits forever.
    1816              :  *
    1817              :  * XXX Is the recovery process still in progress? When recovery process has a
    1818              :  * better progress reporting mechanism, it should be added here.
    1819              :  */
    1820              : static void
    1821            4 : wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
    1822              : {
    1823              :     PGconn     *conn;
    1824            4 :     bool        ready = false;
    1825            4 :     int         timer = 0;
    1826              : 
    1827            4 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    1828              :                          "waiting for the target server to reach the consistent state");
    1829              : 
    1830            4 :     conn = connect_database(conninfo, true);
    1831              : 
    1832              :     for (;;)
    1833              :     {
    1834              :         /* Did the recovery process finish? We're done if so. */
    1835            5 :         if (dry_run || !server_is_in_recovery(conn))
    1836              :         {
    1837            4 :             ready = true;
    1838            4 :             recovery_ended = true;
    1839            4 :             break;
    1840              :         }
    1841              : 
    1842              :         /* Bail out after recovery_timeout seconds if this option is set */
    1843            1 :         if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
    1844              :         {
    1845            0 :             stop_standby_server(subscriber_dir);
    1846            0 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1847              :                                  "recovery timed out");
    1848            0 :             disconnect_database(conn, true);
    1849              :         }
    1850              : 
    1851              :         /* Keep waiting */
    1852            1 :         pg_usleep(WAIT_INTERVAL * USECS_PER_SEC);
    1853            1 :         timer += WAIT_INTERVAL;
    1854              :     }
    1855              : 
    1856            4 :     disconnect_database(conn, false);
    1857              : 
    1858            4 :     if (!ready)
    1859            0 :         report_createsub_fatal("server did not end recovery");
    1860              : 
    1861            4 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    1862              :                          "target server reached the consistent state");
    1863            4 :     report_createsub_log(PG_LOG_INFO, PG_LOG_HINT,
    1864              :                          "If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
    1865            4 : }
    1866              : 
    1867              : /*
    1868              :  * Create a publication that includes all tables in the database.
    1869              :  */
    1870              : static void
    1871            7 : create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
    1872              : {
    1873            7 :     PQExpBuffer str = createPQExpBuffer();
    1874              :     PGresult   *res;
    1875              :     char       *ipubname_esc;
    1876              :     char       *spubname_esc;
    1877              : 
    1878              :     Assert(conn != NULL);
    1879              : 
    1880            7 :     ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
    1881            7 :     spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
    1882              : 
    1883              :     /* Check if the publication already exists */
    1884            7 :     appendPQExpBuffer(str,
    1885              :                       "SELECT 1 FROM pg_catalog.pg_publication "
    1886              :                       "WHERE pubname = %s",
    1887              :                       spubname_esc);
    1888            7 :     res = PQexec(conn, str->data);
    1889            7 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1890              :     {
    1891            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1892              :                              "could not obtain publication information: %s",
    1893              :                              PQresultErrorMessage(res));
    1894            0 :         disconnect_database(conn, true);
    1895              :     }
    1896              : 
    1897            7 :     if (PQntuples(res) == 1)
    1898              :     {
    1899              :         /*
    1900              :          * Unfortunately, if it reaches this code path, it will always fail
    1901              :          * (unless you decide to change the existing publication name). That's
    1902              :          * bad but it is very unlikely that the user will choose a name with
    1903              :          * pg_createsubscriber_ prefix followed by the exact database oid and
    1904              :          * a random number.
    1905              :          */
    1906            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1907              :                              "publication \"%s\" already exists", dbinfo->pubname);
    1908            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
    1909              :                              "Consider renaming this publication before continuing.");
    1910            0 :         disconnect_database(conn, true);
    1911              :     }
    1912              : 
    1913            7 :     PQclear(res);
    1914            7 :     resetPQExpBuffer(str);
    1915              : 
    1916            7 :     if (dry_run)
    1917            6 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    1918              :                              "dry-run: would create publication \"%s\" in database \"%s\"",
    1919              :                              dbinfo->pubname, dbinfo->dbname);
    1920              :     else
    1921            1 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    1922              :                              "creating publication \"%s\" in database \"%s\"",
    1923              :                              dbinfo->pubname, dbinfo->dbname);
    1924              : 
    1925            7 :     appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
    1926              :                       ipubname_esc);
    1927              : 
    1928            7 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1929              :                          "command is: %s", str->data);
    1930              : 
    1931            7 :     if (!dry_run)
    1932              :     {
    1933            1 :         res = PQexec(conn, str->data);
    1934            1 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1935              :         {
    1936            0 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1937              :                                  "could not create publication \"%s\" in database \"%s\": %s",
    1938              :                                  dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
    1939            0 :             disconnect_database(conn, true);
    1940              :         }
    1941            1 :         PQclear(res);
    1942              :     }
    1943              : 
    1944              :     /* For cleanup purposes */
    1945            7 :     dbinfo->made_publication = true;
    1946              : 
    1947            7 :     PQfreemem(ipubname_esc);
    1948            7 :     PQfreemem(spubname_esc);
    1949            7 :     destroyPQExpBuffer(str);
    1950            7 : }
    1951              : 
    1952              : /*
    1953              :  * Drop the specified publication in the given database.
    1954              :  */
    1955              : static void
    1956           10 : drop_publication(PGconn *conn, const char *pubname, const char *dbname,
    1957              :                  bool *made_publication)
    1958              : {
    1959           10 :     PQExpBuffer str = createPQExpBuffer();
    1960              :     PGresult   *res;
    1961              :     char       *pubname_esc;
    1962              : 
    1963              :     Assert(conn != NULL);
    1964              : 
    1965           10 :     pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
    1966              : 
    1967           10 :     if (dry_run)
    1968            6 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    1969              :                              "dry-run: would drop publication \"%s\" in database \"%s\"",
    1970              :                              pubname, dbname);
    1971              :     else
    1972            4 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    1973              :                              "dropping publication \"%s\" in database \"%s\"",
    1974              :                              pubname, dbname);
    1975              : 
    1976           10 :     appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
    1977              : 
    1978           10 :     PQfreemem(pubname_esc);
    1979              : 
    1980           10 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1981              :                          "command is: %s", str->data);
    1982              : 
    1983           10 :     if (!dry_run)
    1984              :     {
    1985            4 :         res = PQexec(conn, str->data);
    1986            4 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1987              :         {
    1988            0 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1989              :                                  "could not drop publication \"%s\" in database \"%s\": %s",
    1990              :                                  pubname, dbname, PQresultErrorMessage(res));
    1991            0 :             *made_publication = false;  /* don't try again. */
    1992              : 
    1993              :             /*
    1994              :              * Don't disconnect and exit here. This routine is used by primary
    1995              :              * (cleanup publication / replication slot due to an error) and
    1996              :              * subscriber (remove the replicated publications). In both cases,
    1997              :              * it can continue and provide instructions for the user to remove
    1998              :              * it later if cleanup fails.
    1999              :              */
    2000              :         }
    2001            4 :         PQclear(res);
    2002              :     }
    2003              : 
    2004           10 :     destroyPQExpBuffer(str);
    2005           10 : }
    2006              : 
    2007              : /*
    2008              :  * Retrieve and drop the publications.
    2009              :  *
    2010              :  * Publications copied during physical replication remain on the subscriber
    2011              :  * after promotion. If --clean=publications is specified, drop all existing
    2012              :  * publications in the subscriber database. Otherwise, only drop publications
    2013              :  * that were created by pg_createsubscriber during this operation.
    2014              :  */
    2015              : static void
    2016            8 : check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
    2017              : {
    2018              :     PGresult   *res;
    2019            8 :     bool        drop_all_pubs = dbinfos.objecttypes_to_clean & OBJECTTYPE_PUBLICATIONS;
    2020              : 
    2021              :     Assert(conn != NULL);
    2022              : 
    2023            8 :     if (drop_all_pubs)
    2024              :     {
    2025            2 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2026              :                              "dropping all existing publications in database \"%s\"",
    2027              :                              dbinfo->dbname);
    2028              : 
    2029              :         /* Fetch all publication names */
    2030            2 :         res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
    2031            2 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    2032              :         {
    2033            0 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2034              :                                  "could not obtain publication information: %s",
    2035              :                                  PQresultErrorMessage(res));
    2036            0 :             PQclear(res);
    2037            0 :             disconnect_database(conn, true);
    2038              :         }
    2039              : 
    2040              :         /* Drop each publication */
    2041            6 :         for (int i = 0; i < PQntuples(res); i++)
    2042            4 :             drop_publication(conn, PQgetvalue(res, i, 0), dbinfo->dbname,
    2043              :                              &dbinfo->made_publication);
    2044              : 
    2045            2 :         PQclear(res);
    2046              :     }
    2047              :     else
    2048              :     {
    2049              :         /* Drop publication only if it was created by this tool */
    2050            6 :         if (dbinfo->made_publication)
    2051              :         {
    2052            6 :             drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
    2053              :                              &dbinfo->made_publication);
    2054              :         }
    2055              :         else
    2056              :         {
    2057            0 :             if (dry_run)
    2058            0 :                 report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2059              :                                      "dry-run: would preserve existing publication \"%s\" in database \"%s\"",
    2060              :                                      dbinfo->pubname, dbinfo->dbname);
    2061              :             else
    2062            0 :                 report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2063              :                                      "preserve existing publication \"%s\" in database \"%s\"",
    2064              :                                      dbinfo->pubname, dbinfo->dbname);
    2065              :         }
    2066              :     }
    2067            8 : }
    2068              : 
    2069              : /*
    2070              :  * Create a subscription with some predefined options.
    2071              :  *
    2072              :  * A replication slot was already created in a previous step. Let's use it.  It
    2073              :  * is not required to copy data. The subscription will be created but it will
    2074              :  * not be enabled now. That's because the replication progress must be set and
    2075              :  * the replication origin name (one of the function arguments) contains the
    2076              :  * subscription OID in its name. Once the subscription is created,
    2077              :  * set_replication_progress() can obtain the chosen origin name and set up its
    2078              :  * initial location.
    2079              :  */
    2080              : static void
    2081            8 : create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
    2082              : {
    2083            8 :     PQExpBuffer str = createPQExpBuffer();
    2084              :     PGresult   *res;
    2085              :     char       *pubname_esc;
    2086              :     char       *subname_esc;
    2087              :     char       *pubconninfo_esc;
    2088              :     char       *replslotname_esc;
    2089              : 
    2090              :     Assert(conn != NULL);
    2091              : 
    2092            8 :     pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
    2093            8 :     subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
    2094            8 :     pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
    2095            8 :     replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
    2096              : 
    2097            8 :     if (dry_run)
    2098            6 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2099              :                              "dry-run: would create subscription \"%s\" in database \"%s\"",
    2100            6 :                              dbinfo->subname, dbinfo->dbname);
    2101              :     else
    2102            2 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2103              :                              "creating subscription \"%s\" in database \"%s\"",
    2104            2 :                              dbinfo->subname, dbinfo->dbname);
    2105              : 
    2106            8 :     appendPQExpBuffer(str,
    2107              :                       "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
    2108              :                       "WITH (create_slot = false, enabled = false, "
    2109              :                       "slot_name = %s, copy_data = false, two_phase = %s)",
    2110              :                       subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
    2111            8 :                       dbinfos.two_phase ? "true" : "false");
    2112              : 
    2113            8 :     PQfreemem(pubname_esc);
    2114            8 :     PQfreemem(subname_esc);
    2115            8 :     PQfreemem(pubconninfo_esc);
    2116            8 :     PQfreemem(replslotname_esc);
    2117              : 
    2118            8 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    2119              :                          "command is: %s", str->data);
    2120              : 
    2121            8 :     if (!dry_run)
    2122              :     {
    2123            2 :         res = PQexec(conn, str->data);
    2124            2 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    2125              :         {
    2126            0 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2127              :                                  "could not create subscription \"%s\" in database \"%s\": %s",
    2128            0 :                                  dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
    2129            0 :             disconnect_database(conn, true);
    2130              :         }
    2131            2 :         PQclear(res);
    2132              :     }
    2133              : 
    2134            8 :     destroyPQExpBuffer(str);
    2135            8 : }
    2136              : 
    2137              : /*
    2138              :  * Sets the replication progress to the consistent LSN.
    2139              :  *
    2140              :  * The subscriber caught up to the consistent LSN provided by the last
    2141              :  * replication slot that was created. The goal is to set up the initial
    2142              :  * location for the logical replication that is the exact LSN that the
    2143              :  * subscriber was promoted. Once the subscription is enabled it will start
    2144              :  * streaming from that location onwards.  In dry run mode, the subscription OID
    2145              :  * and LSN are set to invalid values for printing purposes.
    2146              :  */
    2147              : static void
    2148            8 : set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
    2149              : {
    2150            8 :     PQExpBuffer str = createPQExpBuffer();
    2151              :     PGresult   *res;
    2152              :     Oid         suboid;
    2153              :     char       *subname;
    2154              :     char       *dbname;
    2155              :     char       *originname;
    2156              :     char       *lsnstr;
    2157              : 
    2158              :     Assert(conn != NULL);
    2159              : 
    2160            8 :     subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
    2161            8 :     dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
    2162              : 
    2163            8 :     appendPQExpBuffer(str,
    2164              :                       "SELECT s.oid FROM pg_catalog.pg_subscription s "
    2165              :                       "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
    2166              :                       "WHERE s.subname = %s AND d.datname = %s",
    2167              :                       subname, dbname);
    2168              : 
    2169            8 :     res = PQexec(conn, str->data);
    2170            8 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    2171              :     {
    2172            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2173              :                              "could not obtain subscription OID: %s",
    2174              :                              PQresultErrorMessage(res));
    2175            0 :         disconnect_database(conn, true);
    2176              :     }
    2177              : 
    2178            8 :     if (PQntuples(res) != 1 && !dry_run)
    2179              :     {
    2180            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2181              :                              "could not obtain subscription OID: got %d rows, expected %d row",
    2182              :                              PQntuples(res), 1);
    2183            0 :         disconnect_database(conn, true);
    2184              :     }
    2185              : 
    2186            8 :     if (dry_run)
    2187              :     {
    2188            6 :         suboid = InvalidOid;
    2189            6 :         lsnstr = psprintf("%X/%08X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
    2190              :     }
    2191              :     else
    2192              :     {
    2193            2 :         suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
    2194            2 :         lsnstr = psprintf("%s", lsn);
    2195              :     }
    2196              : 
    2197            8 :     PQclear(res);
    2198              : 
    2199              :     /*
    2200              :      * The origin name is defined as pg_%u. %u is the subscription OID. See
    2201              :      * ApplyWorkerMain().
    2202              :      */
    2203            8 :     originname = psprintf("pg_%u", suboid);
    2204              : 
    2205            8 :     if (dry_run)
    2206            6 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2207              :                              "dry-run: would set the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
    2208            6 :                              originname, lsnstr, dbinfo->dbname);
    2209              :     else
    2210            2 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2211              :                              "setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
    2212            2 :                              originname, lsnstr, dbinfo->dbname);
    2213              : 
    2214            8 :     resetPQExpBuffer(str);
    2215            8 :     appendPQExpBuffer(str,
    2216              :                       "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
    2217              :                       originname, lsnstr);
    2218              : 
    2219            8 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    2220              :                          "command is: %s", str->data);
    2221              : 
    2222            8 :     if (!dry_run)
    2223              :     {
    2224            2 :         res = PQexec(conn, str->data);
    2225            2 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    2226              :         {
    2227            0 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2228              :                                  "could not set replication progress for subscription \"%s\": %s",
    2229            0 :                                  dbinfo->subname, PQresultErrorMessage(res));
    2230            0 :             disconnect_database(conn, true);
    2231              :         }
    2232            2 :         PQclear(res);
    2233              :     }
    2234              : 
    2235            8 :     PQfreemem(subname);
    2236            8 :     PQfreemem(dbname);
    2237            8 :     pg_free(originname);
    2238            8 :     pg_free(lsnstr);
    2239            8 :     destroyPQExpBuffer(str);
    2240            8 : }
    2241              : 
    2242              : /*
    2243              :  * Enables the subscription.
    2244              :  *
    2245              :  * The subscription was created in a previous step but it was disabled. After
    2246              :  * adjusting the initial logical replication location, enable the subscription.
    2247              :  */
    2248              : static void
    2249            8 : enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
    2250              : {
    2251            8 :     PQExpBuffer str = createPQExpBuffer();
    2252              :     PGresult   *res;
    2253              :     char       *subname;
    2254              : 
    2255              :     Assert(conn != NULL);
    2256              : 
    2257            8 :     subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
    2258              : 
    2259            8 :     if (dry_run)
    2260            6 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2261              :                              "dry-run: would enable subscription \"%s\" in database \"%s\"",
    2262            6 :                              dbinfo->subname, dbinfo->dbname);
    2263              :     else
    2264            2 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2265              :                              "enabling subscription \"%s\" in database \"%s\"",
    2266            2 :                              dbinfo->subname, dbinfo->dbname);
    2267              : 
    2268            8 :     appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
    2269              : 
    2270            8 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    2271              :                          "command is: %s", str->data);
    2272              : 
    2273            8 :     if (!dry_run)
    2274              :     {
    2275            2 :         res = PQexec(conn, str->data);
    2276            2 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    2277              :         {
    2278            0 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2279              :                                  "could not enable subscription \"%s\": %s",
    2280            0 :                                  dbinfo->subname, PQresultErrorMessage(res));
    2281            0 :             disconnect_database(conn, true);
    2282              :         }
    2283              : 
    2284            2 :         PQclear(res);
    2285              :     }
    2286              : 
    2287            8 :     PQfreemem(subname);
    2288            8 :     destroyPQExpBuffer(str);
    2289            8 : }
    2290              : 
    2291              : /*
    2292              :  * Fetch a list of all connectable non-template databases from the source server
    2293              :  * and form a list such that they appear as if the user has specified multiple
    2294              :  * --database options, one for each source database.
    2295              :  */
    2296              : static void
    2297            1 : get_publisher_databases(struct CreateSubscriberOptions *opt,
    2298              :                         bool dbnamespecified)
    2299              : {
    2300              :     PGconn     *conn;
    2301              :     PGresult   *res;
    2302              : 
    2303              :     /* If a database name was specified, just connect to it. */
    2304            1 :     if (dbnamespecified)
    2305            0 :         conn = connect_database(opt->pub_conninfo_str, true);
    2306              :     else
    2307              :     {
    2308              :         /* Otherwise, try postgres first and then template1. */
    2309              :         char       *conninfo;
    2310              : 
    2311            1 :         conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "postgres");
    2312            1 :         conn = connect_database(conninfo, false);
    2313            1 :         pg_free(conninfo);
    2314            1 :         if (!conn)
    2315              :         {
    2316            0 :             conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "template1");
    2317            0 :             conn = connect_database(conninfo, true);
    2318            0 :             pg_free(conninfo);
    2319              :         }
    2320              :     }
    2321              : 
    2322            1 :     res = PQexec(conn, "SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1");
    2323            1 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    2324              :     {
    2325            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2326              :                              "could not obtain a list of databases: %s",
    2327              :                              PQresultErrorMessage(res));
    2328            0 :         PQclear(res);
    2329            0 :         disconnect_database(conn, true);
    2330              :     }
    2331              : 
    2332            4 :     for (int i = 0; i < PQntuples(res); i++)
    2333              :     {
    2334            3 :         const char *dbname = PQgetvalue(res, i, 0);
    2335              : 
    2336            3 :         simple_string_list_append(&opt->database_names, dbname);
    2337              : 
    2338              :         /* Increment num_dbs to reflect multiple --database options */
    2339            3 :         num_dbs++;
    2340              :     }
    2341              : 
    2342            1 :     PQclear(res);
    2343            1 :     disconnect_database(conn, false);
    2344            1 : }
    2345              : 
    2346              : int
    2347           23 : main(int argc, char **argv)
    2348              : {
    2349              :     static struct option long_options[] =
    2350              :     {
    2351              :         {"all", no_argument, NULL, 'a'},
    2352              :         {"database", required_argument, NULL, 'd'},
    2353              :         {"pgdata", required_argument, NULL, 'D'},
    2354              :         {"dry-run", no_argument, NULL, 'n'},
    2355              :         {"subscriber-port", required_argument, NULL, 'p'},
    2356              :         {"publisher-server", required_argument, NULL, 'P'},
    2357              :         {"socketdir", required_argument, NULL, 's'},
    2358              :         {"recovery-timeout", required_argument, NULL, 't'},
    2359              :         {"enable-two-phase", no_argument, NULL, 'T'},
    2360              :         {"subscriber-username", required_argument, NULL, 'U'},
    2361              :         {"verbose", no_argument, NULL, 'v'},
    2362              :         {"version", no_argument, NULL, 'V'},
    2363              :         {"help", no_argument, NULL, '?'},
    2364              :         {"config-file", required_argument, NULL, 1},
    2365              :         {"publication", required_argument, NULL, 2},
    2366              :         {"replication-slot", required_argument, NULL, 3},
    2367              :         {"subscription", required_argument, NULL, 4},
    2368              :         {"clean", required_argument, NULL, 5},
    2369              :         {NULL, 0, NULL, 0}
    2370              :     };
    2371              : 
    2372           23 :     struct CreateSubscriberOptions opt = {0};
    2373              : 
    2374              :     int         c;
    2375              :     int         option_index;
    2376              : 
    2377              :     char       *pub_base_conninfo;
    2378              :     char       *sub_base_conninfo;
    2379           23 :     char       *dbname_conninfo = NULL;
    2380              : 
    2381              :     uint64      pub_sysid;
    2382              :     uint64      sub_sysid;
    2383              :     struct stat statbuf;
    2384              : 
    2385              :     char       *consistent_lsn;
    2386              : 
    2387              :     char        pidfile[MAXPGPATH];
    2388              : 
    2389           23 :     pg_logging_init(argv[0]);
    2390           23 :     pg_logging_set_level(PG_LOG_WARNING);
    2391           23 :     progname = get_progname(argv[0]);
    2392           23 :     set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
    2393              : 
    2394           23 :     if (argc > 1)
    2395              :     {
    2396           22 :         if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
    2397              :         {
    2398            1 :             usage();
    2399            1 :             exit(0);
    2400              :         }
    2401           21 :         else if (strcmp(argv[1], "-V") == 0
    2402           21 :                  || strcmp(argv[1], "--version") == 0)
    2403              :         {
    2404            1 :             puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
    2405            1 :             exit(0);
    2406              :         }
    2407              :     }
    2408              : 
    2409              :     /* Default settings */
    2410           21 :     subscriber_dir = NULL;
    2411           21 :     opt.config_file = NULL;
    2412           21 :     opt.pub_conninfo_str = NULL;
    2413           21 :     opt.socket_dir = NULL;
    2414           21 :     opt.sub_port = DEFAULT_SUB_PORT;
    2415           21 :     opt.sub_username = NULL;
    2416           21 :     opt.two_phase = false;
    2417           21 :     opt.database_names = (SimpleStringList)
    2418              :     {
    2419              :         0
    2420              :     };
    2421           21 :     opt.recovery_timeout = 0;
    2422           21 :     opt.all_dbs = false;
    2423              : 
    2424              :     /*
    2425              :      * Don't allow it to be run as root. It uses pg_ctl which does not allow
    2426              :      * it either.
    2427              :      */
    2428              : #ifndef WIN32
    2429           21 :     if (geteuid() == 0)
    2430              :     {
    2431            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2432              :                              "cannot be executed by \"root\"");
    2433            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
    2434              :                              "You must run %s as the PostgreSQL superuser.",
    2435              :                              progname);
    2436            0 :         exit(1);
    2437              :     }
    2438              : #endif
    2439              : 
    2440           21 :     get_restricted_token();
    2441              : 
    2442          162 :     while ((c = getopt_long(argc, argv, "ad:D:np:P:s:t:TU:v",
    2443          162 :                             long_options, &option_index)) != -1)
    2444              :     {
    2445          144 :         switch (c)
    2446              :         {
    2447            3 :             case 'a':
    2448            3 :                 opt.all_dbs = true;
    2449            3 :                 break;
    2450           25 :             case 'd':
    2451           25 :                 if (!simple_string_list_member(&opt.database_names, optarg))
    2452              :                 {
    2453           24 :                     simple_string_list_append(&opt.database_names, optarg);
    2454           24 :                     num_dbs++;
    2455              :                 }
    2456              :                 else
    2457            1 :                     report_createsub_fatal("database \"%s\" specified more than once for -d/--database", optarg);
    2458           24 :                 break;
    2459           19 :             case 'D':
    2460           19 :                 subscriber_dir = pg_strdup(optarg);
    2461           19 :                 canonicalize_path(subscriber_dir);
    2462           19 :                 break;
    2463            9 :             case 'n':
    2464            9 :                 dry_run = true;
    2465            9 :                 break;
    2466           12 :             case 'p':
    2467           12 :                 opt.sub_port = pg_strdup(optarg);
    2468           12 :                 break;
    2469           18 :             case 'P':
    2470           18 :                 opt.pub_conninfo_str = pg_strdup(optarg);
    2471           18 :                 break;
    2472           12 :             case 's':
    2473           12 :                 opt.socket_dir = pg_strdup(optarg);
    2474           12 :                 canonicalize_path(opt.socket_dir);
    2475           12 :                 break;
    2476            3 :             case 't':
    2477            3 :                 opt.recovery_timeout = atoi(optarg);
    2478            3 :                 break;
    2479            1 :             case 'T':
    2480            1 :                 opt.two_phase = true;
    2481            1 :                 break;
    2482            0 :             case 'U':
    2483            0 :                 opt.sub_username = pg_strdup(optarg);
    2484            0 :                 break;
    2485           19 :             case 'v':
    2486           19 :                 pg_logging_increase_verbosity();
    2487           19 :                 break;
    2488            0 :             case 1:
    2489            0 :                 opt.config_file = pg_strdup(optarg);
    2490            0 :                 break;
    2491           12 :             case 2:
    2492           12 :                 if (!simple_string_list_member(&opt.pub_names, optarg))
    2493              :                 {
    2494           11 :                     simple_string_list_append(&opt.pub_names, optarg);
    2495           11 :                     num_pubs++;
    2496              :                 }
    2497              :                 else
    2498            1 :                     report_createsub_fatal("publication \"%s\" specified more than once for --publication", optarg);
    2499           11 :                 break;
    2500            4 :             case 3:
    2501            4 :                 if (!simple_string_list_member(&opt.replslot_names, optarg))
    2502              :                 {
    2503            4 :                     simple_string_list_append(&opt.replslot_names, optarg);
    2504            4 :                     num_replslots++;
    2505              :                 }
    2506              :                 else
    2507            0 :                     report_createsub_fatal("replication slot \"%s\" specified more than once for --replication-slot", optarg);
    2508            4 :                 break;
    2509            5 :             case 4:
    2510            5 :                 if (!simple_string_list_member(&opt.sub_names, optarg))
    2511              :                 {
    2512            5 :                     simple_string_list_append(&opt.sub_names, optarg);
    2513            5 :                     num_subs++;
    2514              :                 }
    2515              :                 else
    2516            0 :                     report_createsub_fatal("subscription \"%s\" specified more than once for --subscription", optarg);
    2517            5 :                 break;
    2518            1 :             case 5:
    2519            1 :                 if (!simple_string_list_member(&opt.objecttypes_to_clean, optarg))
    2520            1 :                     simple_string_list_append(&opt.objecttypes_to_clean, optarg);
    2521              :                 else
    2522            0 :                     report_createsub_fatal("object type \"%s\" specified more than once for --clean", optarg);
    2523            1 :                 break;
    2524            1 :             default:
    2525              :                 /* getopt_long already emitted a complaint */
    2526            1 :                 report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
    2527              :                                      "Try \"%s --help\" for more information.",
    2528              :                                      progname);
    2529            1 :                 exit(1);
    2530              :         }
    2531              :     }
    2532              : 
    2533              :     /* Validate that --all is not used with incompatible options */
    2534           18 :     if (opt.all_dbs)
    2535              :     {
    2536            3 :         char       *bad_switch = NULL;
    2537              : 
    2538            3 :         if (num_dbs > 0)
    2539            1 :             bad_switch = "--database";
    2540            2 :         else if (num_pubs > 0)
    2541            1 :             bad_switch = "--publication";
    2542            1 :         else if (num_replslots > 0)
    2543            0 :             bad_switch = "--replication-slot";
    2544            1 :         else if (num_subs > 0)
    2545            0 :             bad_switch = "--subscription";
    2546              : 
    2547            3 :         if (bad_switch)
    2548              :         {
    2549            2 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2550              :                                  "options %s and %s cannot be used together",
    2551              :                                  bad_switch, "-a/--all");
    2552            2 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
    2553              :                                  "Try \"%s --help\" for more information.",
    2554              :                                  progname);
    2555            2 :             exit(1);
    2556              :         }
    2557              :     }
    2558              : 
    2559              :     /* Any non-option arguments? */
    2560           16 :     if (optind < argc)
    2561              :     {
    2562            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2563              :                              "too many command-line arguments (first is \"%s\")",
    2564            0 :                              argv[optind]);
    2565            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
    2566              :                              "Try \"%s --help\" for more information.", progname);
    2567            0 :         exit(1);
    2568              :     }
    2569              : 
    2570              :     /* Required arguments */
    2571           16 :     if (subscriber_dir == NULL)
    2572              :     {
    2573            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2574              :                              "no subscriber data directory specified");
    2575            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
    2576              :                              "Try \"%s --help\" for more information.", progname);
    2577            1 :         exit(1);
    2578              :     }
    2579              : 
    2580              :     /* If socket directory is not provided, use the current directory */
    2581           15 :     if (opt.socket_dir == NULL)
    2582              :     {
    2583              :         char        cwd[MAXPGPATH];
    2584              : 
    2585            5 :         if (!getcwd(cwd, MAXPGPATH))
    2586            0 :             report_createsub_fatal("could not determine current directory");
    2587            5 :         opt.socket_dir = pg_strdup(cwd);
    2588            5 :         canonicalize_path(opt.socket_dir);
    2589              :     }
    2590              : 
    2591              :     /*
    2592              :      * Parse connection string. Build a base connection string that might be
    2593              :      * reused by multiple databases.
    2594              :      */
    2595           15 :     if (opt.pub_conninfo_str == NULL)
    2596              :     {
    2597              :         /*
    2598              :          * TODO use primary_conninfo (if available) from subscriber and
    2599              :          * extract publisher connection string. Assume that there are
    2600              :          * identical entries for physical and logical replication. If there is
    2601              :          * not, we would fail anyway.
    2602              :          */
    2603            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2604              :                              "no publisher connection string specified");
    2605            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
    2606              :                              "Try \"%s --help\" for more information.", progname);
    2607            1 :         exit(1);
    2608              :     }
    2609              : 
    2610           14 :     if (dry_run)
    2611            8 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2612              :                              "Executing in dry-run mode.\n"
    2613              :                              "The target directory will not be modified.");
    2614              : 
    2615           14 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2616              :                          "validating publisher connection string");
    2617           14 :     pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
    2618              :                                           &dbname_conninfo);
    2619           14 :     if (pub_base_conninfo == NULL)
    2620            0 :         exit(1);
    2621              : 
    2622           14 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2623              :                          "validating subscriber connection string");
    2624           14 :     sub_base_conninfo = get_sub_conninfo(&opt);
    2625              : 
    2626              :     /*
    2627              :      * Fetch all databases from the source (publisher) and treat them as if
    2628              :      * the user had specified multiple --database options, one for each source
    2629              :      * database.
    2630              :      */
    2631           14 :     if (opt.all_dbs)
    2632              :     {
    2633            1 :         bool        dbnamespecified = (dbname_conninfo != NULL);
    2634              : 
    2635            1 :         get_publisher_databases(&opt, dbnamespecified);
    2636              :     }
    2637              : 
    2638           14 :     if (opt.database_names.head == NULL)
    2639              :     {
    2640            2 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2641              :                              "no database was specified");
    2642              : 
    2643              :         /*
    2644              :          * Try to obtain the dbname from the publisher conninfo. If dbname
    2645              :          * parameter is not available, error out.
    2646              :          */
    2647            2 :         if (dbname_conninfo)
    2648              :         {
    2649            1 :             simple_string_list_append(&opt.database_names, dbname_conninfo);
    2650            1 :             num_dbs++;
    2651              : 
    2652            1 :             report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2653              :                                  "database name \"%s\" was extracted from the publisher connection string",
    2654              :                                  dbname_conninfo);
    2655              :         }
    2656              :         else
    2657              :         {
    2658            1 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2659              :                                  "no database name specified");
    2660            1 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
    2661              :                                  "Try \"%s --help\" for more information.",
    2662              :                                  progname);
    2663            1 :             exit(1);
    2664              :         }
    2665              :     }
    2666              : 
    2667              :     /* Number of object names must match number of databases */
    2668           13 :     if (num_pubs > 0 && num_pubs != num_dbs)
    2669              :     {
    2670            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2671              :                              "wrong number of publication names specified");
    2672            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
    2673              :                              "The number of specified publication names (%d) must match the number of specified database names (%d).",
    2674              :                              num_pubs, num_dbs);
    2675            1 :         exit(1);
    2676              :     }
    2677           12 :     if (num_subs > 0 && num_subs != num_dbs)
    2678              :     {
    2679            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2680              :                              "wrong number of subscription names specified");
    2681            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
    2682              :                              "The number of specified subscription names (%d) must match the number of specified database names (%d).",
    2683              :                              num_subs, num_dbs);
    2684            1 :         exit(1);
    2685              :     }
    2686           11 :     if (num_replslots > 0 && num_replslots != num_dbs)
    2687              :     {
    2688            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2689              :                              "wrong number of replication slot names specified");
    2690            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
    2691              :                              "The number of specified replication slot names (%d) must match the number of specified database names (%d).",
    2692              :                              num_replslots, num_dbs);
    2693            1 :         exit(1);
    2694              :     }
    2695              : 
    2696              :     /* Verify the object types specified for removal from the subscriber */
    2697           11 :     for (SimpleStringListCell *cell = opt.objecttypes_to_clean.head; cell; cell = cell->next)
    2698              :     {
    2699            1 :         if (pg_strcasecmp(cell->val, "publications") == 0)
    2700            1 :             dbinfos.objecttypes_to_clean |= OBJECTTYPE_PUBLICATIONS;
    2701              :         else
    2702              :         {
    2703            0 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2704              :                                  "invalid object type \"%s\" specified for %s",
    2705            0 :                                  cell->val, "--clean");
    2706            0 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
    2707              :                                  "The valid value is: \"%s\"", "publications");
    2708            0 :             exit(1);
    2709              :         }
    2710              :     }
    2711              : 
    2712              :     /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
    2713           10 :     pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
    2714           10 :     pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
    2715              : 
    2716              :     /* Rudimentary check for a data directory */
    2717           10 :     check_data_directory(subscriber_dir);
    2718              : 
    2719           10 :     dbinfos.two_phase = opt.two_phase;
    2720              : 
    2721              :     /*
    2722              :      * Store database information for publisher and subscriber. It should be
    2723              :      * called before atexit() because its return value is used in
    2724              :      * cleanup_objects_atexit().
    2725              :      */
    2726           10 :     dbinfos.dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
    2727              : 
    2728              :     /* Register a function to clean up objects in case of failure */
    2729           10 :     atexit(cleanup_objects_atexit);
    2730              : 
    2731              :     /*
    2732              :      * Check if the subscriber data directory has the same system identifier
    2733              :      * as the publisher data directory.
    2734              :      */
    2735           10 :     pub_sysid = get_primary_sysid(dbinfos.dbinfo[0].pubconninfo);
    2736           10 :     sub_sysid = get_standby_sysid(subscriber_dir);
    2737           10 :     if (pub_sysid != sub_sysid)
    2738            1 :         report_createsub_fatal("subscriber data directory is not a copy of the source database cluster");
    2739              : 
    2740              :     /* Subscriber PID file */
    2741            9 :     snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
    2742              : 
    2743              :     /*
    2744              :      * The standby server must not be running. If the server is started under
    2745              :      * a service manager and pg_createsubscriber stops it, the service manager
    2746              :      * might react to this action and start the server again. Therefore,
    2747              :      * refuse to proceed if the server is running to avoid possible failures.
    2748              :      */
    2749            9 :     if (stat(pidfile, &statbuf) == 0)
    2750              :     {
    2751            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2752              :                              "standby server is running");
    2753            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
    2754              :                              "Stop the standby server and try again.");
    2755            1 :         exit(1);
    2756              :     }
    2757              : 
    2758              :     /*
    2759              :      * Start a short-lived standby server with temporary parameters (provided
    2760              :      * by command-line options). The goal is to avoid connections during the
    2761              :      * transformation steps.
    2762              :      */
    2763            8 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2764              :                          "starting the standby server with command-line options");
    2765            8 :     start_standby_server(&opt, true, false);
    2766              : 
    2767              :     /* Check if the standby server is ready for logical replication */
    2768            8 :     check_subscriber(dbinfos.dbinfo);
    2769              : 
    2770              :     /* Check if the primary server is ready for logical replication */
    2771            6 :     check_publisher(dbinfos.dbinfo);
    2772              : 
    2773              :     /*
    2774              :      * Stop the target server. The recovery process requires that the server
    2775              :      * reaches a consistent state before targeting the recovery stop point.
    2776              :      * Make sure a consistent state is reached (stopping the target server
    2777              :      * guarantees it) *before* creating the replication slots in
    2778              :      * setup_publisher().
    2779              :      */
    2780            4 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2781              :                          "stopping the subscriber");
    2782            4 :     stop_standby_server(subscriber_dir);
    2783              : 
    2784              :     /* Create the required objects for each database on publisher */
    2785            4 :     consistent_lsn = setup_publisher(dbinfos.dbinfo);
    2786              : 
    2787              :     /* Write the required recovery parameters */
    2788            4 :     setup_recovery(dbinfos.dbinfo, subscriber_dir, consistent_lsn);
    2789              : 
    2790              :     /*
    2791              :      * Start subscriber so the recovery parameters will take effect. Wait
    2792              :      * until accepting connections. We don't want to start logical replication
    2793              :      * during setup.
    2794              :      */
    2795            4 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2796              :                          "starting the subscriber");
    2797            4 :     start_standby_server(&opt, true, true);
    2798              : 
    2799              :     /* Wait for the subscriber to be promoted */
    2800            4 :     wait_for_end_recovery(dbinfos.dbinfo[0].subconninfo, &opt);
    2801              : 
    2802              :     /*
    2803              :      * Create the subscription for each database on subscriber. It does not
    2804              :      * enable it immediately because it needs to adjust the replication start
    2805              :      * point to the LSN reported by setup_publisher().  It also cleans up
    2806              :      * publications created by this tool and replication to the standby.
    2807              :      */
    2808            4 :     setup_subscriber(dbinfos.dbinfo, consistent_lsn);
    2809              : 
    2810              :     /* Remove primary_slot_name if it exists on primary */
    2811            4 :     drop_primary_replication_slot(dbinfos.dbinfo, primary_slot_name);
    2812              : 
    2813              :     /* Remove failover replication slots if they exist on subscriber */
    2814            4 :     drop_failover_replication_slots(dbinfos.dbinfo);
    2815              : 
    2816              :     /* Stop the subscriber */
    2817            4 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2818              :                          "stopping the subscriber");
    2819            4 :     stop_standby_server(subscriber_dir);
    2820              : 
    2821              :     /* Change system identifier from subscriber */
    2822            4 :     modify_subscriber_sysid(&opt);
    2823              : 
    2824            4 :     success = true;
    2825              : 
    2826            4 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2827              :                          "Done!");
    2828              : 
    2829            4 :     return 0;
    2830              : }
        

Generated by: LCOV version 2.0-1