LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - pg_createsubscriber.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 83.1 % 1060 881
Test Date: 2026-04-07 14:16:30 Functions: 100.0 % 46 46
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * pg_createsubscriber.c
       4              :  *    Create a new logical replica from a standby server
       5              :  *
       6              :  * Copyright (c) 2024-2026, PostgreSQL Global Development Group
       7              :  *
       8              :  * IDENTIFICATION
       9              :  *    src/bin/pg_basebackup/pg_createsubscriber.c
      10              :  *
      11              :  *-------------------------------------------------------------------------
      12              :  */
      13              : 
      14              : #include "postgres_fe.h"
      15              : 
      16              : #include <sys/stat.h>
      17              : #include <sys/time.h>
      18              : #include <sys/wait.h>
      19              : #include <time.h>
      20              : 
      21              : #include "common/connect.h"
      22              : #include "common/controldata_utils.h"
      23              : #include "common/file_perm.h"
      24              : #include "common/file_utils.h"
      25              : #include "common/logging.h"
      26              : #include "common/pg_prng.h"
      27              : #include "common/restricted_token.h"
      28              : #include "datatype/timestamp.h"
      29              : #include "fe_utils/recovery_gen.h"
      30              : #include "fe_utils/simple_list.h"
      31              : #include "fe_utils/string_utils.h"
      32              : #include "fe_utils/version.h"
      33              : #include "getopt_long.h"
      34              : 
      35              : #define DEFAULT_SUB_PORT    "50432" /* default shown for -p/--subscriber-port */
      36              : #define OBJECTTYPE_PUBLICATIONS  0x0001 /* flag bit; presumably for objecttypes_to_clean -- confirm */
      37              : 
      38              : /*
      39              :  * Configuration files for recovery parameters.
      40              :  *
      41              :  * The recovery parameters are set in INCLUDED_CONF_FILE, itself loaded by
      42              :  * the server through an include_if_exists in postgresql.auto.conf.
      43              :  *
      44              :  * INCLUDED_CONF_FILE is renamed to INCLUDED_CONF_FILE_DISABLED when exiting,
      45              :  * so that the recovery parameters set by this tool never take effect on a
      46              :  * later node restart.  The contents of INCLUDED_CONF_FILE_DISABLED can be
      47              :  * useful for debugging.
      48              :  */
      49              : #define PG_AUTOCONF_FILENAME        "postgresql.auto.conf"
      50              : #define INCLUDED_CONF_FILE          "pg_createsubscriber.conf"
      51              : #define INCLUDED_CONF_FILE_DISABLED INCLUDED_CONF_FILE ".disabled"
      52              : 
      53              : #define SERVER_LOG_FILE_NAME "pg_createsubscriber_server.log"
      54              : #define INTERNAL_LOG_FILE_NAME "pg_createsubscriber_internal.log"
      55              : 
      56              : /* Command-line options */
      57              : struct CreateSubscriberOptions
      58              : {
      59              :     char       *config_file;    /* configuration file (cf. --config-file) */
      60              :     char       *log_dir;        /* log directory name */
      61              :     char       *pub_conninfo_str;   /* publisher connection string */
      62              :     char       *socket_dir;     /* directory for Unix-domain socket, if any */
      63              :     char       *sub_port;       /* subscriber port number, kept as a string */
      64              :     const char *sub_username;   /* subscriber username; may be NULL */
      65              :     bool        two_phase;      /* --enable-two-phase option */
      66              :     SimpleStringList database_names;    /* list of database names */
      67              :     SimpleStringList pub_names; /* list of publication names */
      68              :     SimpleStringList sub_names; /* list of subscription names */
      69              :     SimpleStringList replslot_names;    /* list of replication slot names */
      70              :     int         recovery_timeout;   /* stop recovery after this time */
      71              :     bool        all_dbs;        /* --all option */
      72              :     SimpleStringList objecttypes_to_clean;  /* list of object types to clean up */
      73              : };
      74              : 
      75              : /* per-database publication/subscription info */
      76              : struct LogicalRepInfo
      77              : {
      78              :     char       *dbname;         /* database name */
      79              :     char       *pubconninfo;    /* publisher connection string */
      80              :     char       *subconninfo;    /* subscriber connection string */
      81              :     char       *pubname;        /* publication name */
      82              :     char       *subname;        /* subscription name */
      83              :     char       *replslotname;   /* replication slot name */
      84              : 
      85              :     bool        made_replslot;  /* replication slot was created; checked by cleanup */
      86              :     bool        made_publication;   /* publication was created; checked by cleanup */
      87              : };
      88              : 
      89              : /*
      90              :  * Information shared across all the databases (or publications and
      91              :  * subscriptions).
      92              :  */
      93              : struct LogicalRepInfos
      94              : {
      95              :     struct LogicalRepInfo *dbinfo;  /* array; cleanup walks entries 0 .. num_dbs - 1 */
      96              :     bool        two_phase;      /* enable-two-phase option */
      97              :     uint32      objecttypes_to_clean;   /* flags indicating which object types
      98              :                                          * to clean up on subscriber */
      99              : };
     100              : 
     101              : static void cleanup_objects_atexit(void);
     102              : static void usage(void);
     103              : static char *get_base_conninfo(const char *conninfo, char **dbname);
     104              : static char *get_sub_conninfo(const struct CreateSubscriberOptions *opt);
     105              : static char *get_exec_path(const char *argv0, const char *progname);
     106              : static void check_data_directory(const char *datadir);
     107              : static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
     108              : static struct LogicalRepInfo *store_pub_sub_info(const struct CreateSubscriberOptions *opt,
     109              :                                                  const char *pub_base_conninfo,
     110              :                                                  const char *sub_base_conninfo);
     111              : static PGconn *connect_database(const char *conninfo, bool exit_on_error);
     112              : static void disconnect_database(PGconn *conn, bool exit_on_error);
     113              : static uint64 get_primary_sysid(const char *conninfo);
     114              : static uint64 get_standby_sysid(const char *datadir);
     115              : static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
     116              : static bool server_is_in_recovery(PGconn *conn);
     117              : static char *generate_object_name(PGconn *conn);
     118              : static void check_publisher(const struct LogicalRepInfo *dbinfo);
     119              : static char *setup_publisher(struct LogicalRepInfo *dbinfo);
     120              : static void check_subscriber(const struct LogicalRepInfo *dbinfo);
     121              : static void setup_subscriber(struct LogicalRepInfo *dbinfo,
     122              :                              const char *consistent_lsn);
     123              : static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
     124              :                            const char *lsn);
     125              : static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
     126              :                                           const char *slotname);
     127              : static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo);
     128              : static char *create_logical_replication_slot(PGconn *conn,
     129              :                                              struct LogicalRepInfo *dbinfo);
     130              : static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
     131              :                                   const char *slot_name);
     132              : static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
     133              : static void start_standby_server(const struct CreateSubscriberOptions *opt,
     134              :                                  bool restricted_access,
     135              :                                  bool restrict_logical_worker);
     136              : static void stop_standby_server(const char *datadir);
     137              : static void wait_for_end_recovery(const char *conninfo,
     138              :                                   const struct CreateSubscriberOptions *opt);
     139              : static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
     140              : static bool find_publication(PGconn *conn, const char *pubname, const char *dbname);
     141              : static void drop_publication(PGconn *conn, const char *pubname,
     142              :                              const char *dbname, bool *made_publication);
     143              : static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
     144              : static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
     145              : static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
     146              :                                      const char *lsn);
     147              : static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
     148              : static void check_and_drop_existing_subscriptions(PGconn *conn,
     149              :                                                   const struct LogicalRepInfo *dbinfo);
     150              : static void drop_existing_subscription(PGconn *conn, const char *subname,
     151              :                                        const char *dbname);
     152              : static void get_publisher_databases(struct CreateSubscriberOptions *opt,
     153              :                                     bool dbnamespecified);
     154              : static void report_createsub_log(enum pg_log_level, enum pg_log_part,
     155              :                                  const char *pg_restrict fmt,...)
     156              :             pg_attribute_printf(3, 4);  /* fmt is argument 3, varargs start at 4 */
     157              : static void report_createsub_log_v(enum pg_log_level level, enum pg_log_part part,
     158              :                                    const char *pg_restrict fmt, va_list args)
     159              :             pg_attribute_printf(3, 0);  /* fmt is argument 3; takes a va_list */
     160              : pg_noreturn static void report_createsub_fatal(const char *pg_restrict fmt,...)
     161              :             pg_attribute_printf(1, 2);  /* fmt is argument 1, varargs start at 2 */
     162              : static void internal_log_file_write(enum pg_log_level level,
     163              :                                     enum pg_log_part part,
     164              :                                     const char *pg_restrict fmt, va_list args)
     165              :             pg_attribute_printf(3, 0);  /* fmt is argument 3; takes a va_list */
     166              : 
     167              : #define WAIT_INTERVAL   1       /* 1 second */
     168              : 
     169              : static const char *progname;    /* program name, printed by usage() */
     170              : 
     171              : static char *primary_slot_name = NULL;
     172              : static bool dry_run = false;    /* presumably set by -n/--dry-run -- confirm */
     173              : 
     174              : static bool success = false;    /* if set, cleanup_objects_atexit() skips object removal */
     175              : 
     176              : static struct LogicalRepInfos dbinfos;
     177              : static int  num_dbs = 0;        /* number of specified databases */
     178              : static int  num_pubs = 0;       /* number of specified publications */
     179              : static int  num_subs = 0;       /* number of specified subscriptions */
     180              : static int  num_replslots = 0;  /* number of specified replication slots */
     181              : 
     182              : static pg_prng_state prng_state;
     183              : 
     184              : static char *pg_ctl_path = NULL;
     185              : static char *pg_resetwal_path = NULL;
     186              : 
     187              : static FILE *internal_log_file_fp = NULL;   /* File ptr to log all messages to */
     188              : static char logdir[MAXPGPATH];  /* Subdirectory of the user specified logdir
     189              :                                  * where the log files are written (if
     190              :                                  * specified) */
     191              : 
     192              : /* standby / subscriber data directory */
     193              : static char *subscriber_dir = NULL;
     194              : /* State flags read by cleanup_objects_atexit() */
     195              : static bool recovery_ended = false; /* if set on failure, warn that the replica is unusable */
     196              : static bool standby_running = false;    /* if set on failure, stop the standby server */
     197              : static bool recovery_params_set = false;    /* if set, rename INCLUDED_CONF_FILE away */
     198              : 
     199              : /*
     200              :  * Report a message with a given log level and part (va_list variant).
     201              :  *
     202              :  * Writes to stderr via pg_log_generic_v(), and also to the internal log
     203              :  * file when it is open (i.e. if --logdir option was specified).
     204              :  */
     205              : static void
     206          485 : report_createsub_log_v(enum pg_log_level level, enum pg_log_part part,
     207              :                        const char *pg_restrict fmt, va_list args)
     208              : {
     209          485 :     int         save_errno = errno; /* restored before pg_log_generic_v() runs */
     210              : 
     211          485 :     if (internal_log_file_fp != NULL)
     212              :     {
     213              :         /* Output to both stderr and the log file */
     214              :         va_list     arg_cpy;
     215              : 
     216           77 :         va_copy(arg_cpy, args); /* args itself is consumed again below */
     217           77 :         internal_log_file_write(level, part, fmt, arg_cpy);
     218           77 :         va_end(arg_cpy);
     219              :         /* Restore errno in case internal_log_file_write changed it */
     220           77 :         errno = save_errno;
     221              :     }
     222          485 :     pg_log_generic_v(level, part, fmt, args);
     223          485 : }
     224              : 
     225              : static void /* variadic front end to report_createsub_log_v() */
     226          482 : report_createsub_log(enum pg_log_level level, enum pg_log_part part,
     227              :                      const char *pg_restrict fmt,...)
     228              : {
     229              :     va_list     args;
     230              : 
     231          482 :     va_start(args, fmt);
     232              :     /* The va_list variant does the actual output. */
     233          482 :     report_createsub_log_v(level, part, fmt, args);
     234              : 
     235          482 :     va_end(args);
     236          482 : }
     237              : 
     238              : /*
     239              :  * Report a fatal error (PG_LOG_ERROR, PG_LOG_PRIMARY) and exit with status 1.
     240              :  */
     241              : static void
     242            3 : report_createsub_fatal(const char *pg_restrict fmt,...)
     243              : {
     244              :     va_list     args;
     245              : 
     246            3 :     va_start(args, fmt);
     247              : 
     248            3 :     report_createsub_log_v(PG_LOG_ERROR, PG_LOG_PRIMARY, fmt, args);
     249              : 
     250            3 :     va_end(args);
     251              : 
     252            3 :     exit(1);
     253              : }
     254              : 
     255              : /*
     256              :  * Clean up objects created by pg_createsubscriber.
     257              :  *
     258              :  * Publications and replication slots are created on the primary.  Depending
     259              :  * on the step where it failed, already-created objects should be removed if
     260              :  * possible (sometimes this won't work due to a connection issue).
     261              :  * There is no cleanup on the target server *after* its promotion, because any
     262              :  * failure at this point means recreating the physical replica and starting
     263              :  * again.
     264              :  *
     265              :  * The recovery configuration is always removed, by renaming the included
     266              :  * configuration file out of the way.
     267              :  */
     268              : static void
     269           10 : cleanup_objects_atexit(void)
     270              : {
     271              :     /* Rename the included configuration file, if necessary. */
     272           10 :     if (recovery_params_set)
     273              :     {
     274              :         char        conf_filename[MAXPGPATH];
     275              :         char        conf_filename_disabled[MAXPGPATH];
     276              : 
     277            1 :         snprintf(conf_filename, MAXPGPATH, "%s/%s", subscriber_dir,
     278              :                  INCLUDED_CONF_FILE);
     279            1 :         snprintf(conf_filename_disabled, MAXPGPATH, "%s/%s", subscriber_dir,
     280              :                  INCLUDED_CONF_FILE_DISABLED);
     281              : 
     282            1 :         if (durable_rename(conf_filename, conf_filename_disabled) != 0)
     283              :         {
     284              :             /* durable_rename() has already logged something. */
     285            0 :             report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
     286              :                                  "A manual removal of the recovery parameters may be required.");
     287              :         }
     288              :     }
     289              : 
     290           10 :     if (success)
     291            4 :         return; /* on success the created objects are kept */
     292              : 
     293              :     /*
     294              :      * If the server is promoted, there is no way to use the current setup
     295              :      * again. Warn the user that a new replication setup should be done before
     296              :      * trying again.
     297              :      */
     298            6 :     if (recovery_ended)
     299              :     {
     300            0 :         report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
     301              :                              "failed after the end of recovery");
     302            0 :         report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
     303              :                              "The target server cannot be used as a physical replica anymore.  "
     304              :                              "You must recreate the physical replica before continuing.");
     305              :     }
     306              :     /* Drop objects created on the primary, one database at a time. */
     307           18 :     for (int i = 0; i < num_dbs; i++)
     308              :     {
     309           12 :         struct LogicalRepInfo *dbinfo = &dbinfos.dbinfo[i];
     310              : 
     311           12 :         if (dbinfo->made_publication || dbinfo->made_replslot)
     312              :         {
     313              :             PGconn     *conn;
     314              : 
     315            0 :             conn = connect_database(dbinfo->pubconninfo, false);    /* false: exit_on_error off */
     316            0 :             if (conn != NULL)
     317              :             {
     318            0 :                 if (dbinfo->made_publication)
     319            0 :                     drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
     320              :                                      &dbinfo->made_publication);
     321            0 :                 if (dbinfo->made_replslot)
     322            0 :                     drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
     323            0 :                 disconnect_database(conn, false);
     324              :             }
     325              :             else
     326              :             {
     327              :                 /*
     328              :                  * If a connection could not be established, inform the user
     329              :                  * that some objects were left on primary and should be
     330              :                  * removed before trying again.
     331              :                  */
     332            0 :                 if (dbinfo->made_publication)
     333              :                 {
     334            0 :                     report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
     335              :                                          "publication \"%s\" created in database \"%s\" on primary was left behind",
     336              :                                          dbinfo->pubname,
     337              :                                          dbinfo->dbname);
     338            0 :                     report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
     339              :                                          "Drop this publication before trying again.");
     340              :                 }
     341            0 :                 if (dbinfo->made_replslot)
     342              :                 {
     343            0 :                     report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
     344              :                                          "replication slot \"%s\" created in database \"%s\" on primary was left behind",
     345              :                                          dbinfo->replslotname,
     346              :                                          dbinfo->dbname);
     347            0 :                     report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
     348              :                                          "Drop this replication slot soon to avoid retention of WAL files.");
     349              :                 }
     350              :             }
     351              :         }
     352              :     }
     353              :     /* Stop the standby server if it is still running. */
     354            6 :     if (standby_running)
     355            4 :         stop_standby_server(subscriber_dir);
     356              : }
     357              : 
     358              : static void /* print the command-line help text to stdout */
     359            1 : usage(void)
     360              : {
     361            1 :     printf(_("%s creates a new logical replica from a standby server.\n\n"),
     362              :            progname);
     363            1 :     printf(_("Usage:\n"));
     364            1 :     printf(_("  %s [OPTION]...\n"), progname);
     365            1 :     printf(_("\nOptions:\n"));
     366            1 :     printf(_("  -a, --all                       create subscriptions for all databases except template\n"
     367              :              "                                  databases and databases that don't allow connections\n"));
     368            1 :     printf(_("  -d, --database=DBNAME           database in which to create a subscription\n"));
     369            1 :     printf(_("  -D, --pgdata=DATADIR            location for the subscriber data directory\n"));
     370            1 :     printf(_("  -l, --logdir=LOGDIR             location for the log directory\n"));
     371            1 :     printf(_("  -n, --dry-run                   dry run, just show what would be done\n"));
     372            1 :     printf(_("  -p, --subscriber-port=PORT      subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
     373            1 :     printf(_("  -P, --publisher-server=CONNSTR  publisher connection string\n"));
     374            1 :     printf(_("  -s, --socketdir=DIR             socket directory to use (default current dir.)\n"));
     375            1 :     printf(_("  -t, --recovery-timeout=SECS     seconds to wait for recovery to end\n"));
     376            1 :     printf(_("  -T, --enable-two-phase          enable two-phase commit for all subscriptions\n"));
     377            1 :     printf(_("  -U, --subscriber-username=NAME  user name for subscriber connection\n"));
     378            1 :     printf(_("  -v, --verbose                   output verbose messages\n"));
     379            1 :     printf(_("      --clean=OBJECTTYPE          drop all objects of the specified type from specified\n"
     380              :              "                                  databases on the subscriber; accepts: \"%s\"\n"), "publications");
     381            1 :     printf(_("      --config-file=FILENAME      use specified main server configuration\n"
     382              :              "                                  file when running target cluster\n"));
     383            1 :     printf(_("      --publication=NAME          publication name\n"));
     384            1 :     printf(_("      --replication-slot=NAME     replication slot name\n"));
     385            1 :     printf(_("      --subscription=NAME         subscription name\n"));
     386            1 :     printf(_("  -V, --version                   output version information, then exit\n"));
     387            1 :     printf(_("  -?, --help                      show this help, then exit\n"));
     388            1 :     printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
     389            1 :     printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
     390            1 : }
     391              : 
     392              : /*
     393              :  * Append "keyword=value" to a connection string, after a separating space
     394              :  * if buf is non-empty.  val is quoted by appendConnStrVal(); keyword is not.
     395              :  */
     396              : static void
     397          107 : appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
     398              : {
     399          107 :     if (buf->len > 0)
     400           79 :         appendPQExpBufferChar(buf, ' ');
     401          107 :     appendPQExpBufferStr(buf, keyword);
     402          107 :     appendPQExpBufferChar(buf, '=');
     403          107 :     appendConnStrVal(buf, val);
     404          107 : }
     405              : 
     406              : /*
     407              :  * Validate a connection string. Returns a base connection string that is a
     408              :  * connection string without a database name, or NULL if parsing fails.
     409              :  *
     410              :  * Since we might process multiple databases, each database name will be
     411              :  * appended to this base connection string to provide a final connection
     412              :  * string. If dbname is not null and the connection string has a non-empty
     413              :  * dbname, *dbname is set to a copy of it; otherwise *dbname is untouched.
     414              :  *
     415              :  * It is the caller's responsibility to free the returned connection string and
     416              :  * dbname.
     417              :  */
     418              : static char *
     419           14 : get_base_conninfo(const char *conninfo, char **dbname)
     420              : {
     421              :     PQExpBuffer buf;
     422              :     PQconninfoOption *conn_opts;
     423              :     PQconninfoOption *conn_opt;
     424           14 :     char       *errmsg = NULL;
     425              :     char       *ret;
     426              : 
     427           14 :     conn_opts = PQconninfoParse(conninfo, &errmsg);
     428           14 :     if (conn_opts == NULL)
     429              :     {
     430            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
     431              :                              "could not parse connection string: %s", errmsg);
     432            0 :         PQfreemem(errmsg);
     433            0 :         return NULL;
     434              :     }
     435              : 
     436           14 :     buf = createPQExpBuffer();
     437          742 :     for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
     438              :     {
     439          728 :         if (conn_opt->val != NULL && conn_opt->val[0] != '\0')  /* skip unset or empty options */
     440              :         {
     441           33 :             if (strcmp(conn_opt->keyword, "dbname") == 0)
     442              :             {
     443            9 :                 if (dbname)
     444            9 :                     *dbname = pg_strdup(conn_opt->val);
     445            9 :                 continue;   /* dbname is left out of the base string */
     446              :             }
     447           24 :             appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
     448              :         }
     449              :     }
     450              : 
     451           14 :     ret = pg_strdup(buf->data);
     452              : 
     453           14 :     destroyPQExpBuffer(buf);
     454           14 :     PQconninfoFree(conn_opts);
     455              : 
     456           14 :     return ret;
     457              : }
     458              : 
     459              : /*
     460              :  * Build a subscriber connection string. Only a few parameters are supported
     461              :  * since it starts a server with restricted access.
     462              :  */
     463              : static char *
     464           14 : get_sub_conninfo(const struct CreateSubscriberOptions *opt)
     465              : {
     466           14 :     PQExpBuffer buf = createPQExpBuffer();
     467              :     char       *ret;
     468              : 
     469           14 :     appendConnStrItem(buf, "port", opt->sub_port);
     470              : #if !defined(WIN32)
     471           14 :     appendConnStrItem(buf, "host", opt->socket_dir);    /* host is the socket directory */
     472              : #endif
     473           14 :     if (opt->sub_username != NULL)
     474            0 :         appendConnStrItem(buf, "user", opt->sub_username);
     475           14 :     appendConnStrItem(buf, "fallback_application_name", progname);
     476              :     /* Return a pg_strdup'd copy so that the buffer can be destroyed here. */
     477           14 :     ret = pg_strdup(buf->data);
     478              : 
     479           14 :     destroyPQExpBuffer(buf);
     480              : 
     481           14 :     return ret;
     482              : }
     483              : 
     484              : /*
     485              :  * Verify that a PostgreSQL binary (progname) is available in the same
     486              :  * directory as pg_createsubscriber and has the same version.  Returns the
     487              :  * absolute path of progname in a pg_malloc'd buffer; exits on failure.
     488              :  */
     489              : static char *
     490           20 : get_exec_path(const char *argv0, const char *progname)
     491              : {
     492              :     char       *versionstr;
     493              :     char       *exec_path;
     494              :     int         ret;
     495              : 
     496           20 :     versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
     497           20 :     exec_path = pg_malloc(MAXPGPATH);
     498           20 :     ret = find_other_exec(argv0, progname, versionstr, exec_path);
     499              :     /* NOTE(review): versionstr is not freed anywhere in this function. */
     500           20 :     if (ret < 0)
     501              :     {
     502              :         char        full_path[MAXPGPATH];
     503              : 
     504            0 :         if (find_my_exec(argv0, full_path) < 0)
     505            0 :             strlcpy(full_path, progname, sizeof(full_path));
     506              : 
     507            0 :         if (ret == -1)  /* -1: not found; other negative values: version mismatch */
     508            0 :             report_createsub_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
     509              :                                    progname, "pg_createsubscriber", full_path);
     510              :         else
     511            0 :             report_createsub_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
     512              :                                    progname, full_path, "pg_createsubscriber");
     513              :     }
     514              : 
     515           20 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
     516              :                          "%s path is:  %s", progname, exec_path);
     517              : 
     518           20 :     return exec_path;
     519              : }
     520              : 
     521              : /*
     522              :  * Is it a cluster directory? These are preliminary checks. It is far from
     523              :  * making an accurate check. If it is not a clone from the publisher, it will
     524              :  * eventually fail in a future step.
     525              :  */
     526              : static void
     527           10 : check_data_directory(const char *datadir)
     528              : {
     529              :     struct stat statbuf;
     530              :     uint32      major_version;
     531              :     char       *version_str;
     532              : 
     533           10 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
     534              :                          "checking if directory \"%s\" is a cluster data directory",
     535              :                          datadir);
     536              : 
     537           10 :     if (stat(datadir, &statbuf) != 0)
     538              :     {
     539            0 :         if (errno == ENOENT)
     540            0 :             report_createsub_fatal("data directory \"%s\" does not exist", datadir);
     541              :         else
     542            0 :             report_createsub_fatal("could not access directory \"%s\": %m", datadir);
     543              :     }
     544              : 
     545              :     /*
     546              :      * Retrieve the contents of this cluster's PG_VERSION.  We require
     547              :      * compatibility with the same major version as the one this tool is
     548              :      * compiled with.
     549              :      */
     550           10 :     major_version = GET_PG_MAJORVERSION_NUM(get_pg_version(datadir, &version_str));
     551           10 :     if (major_version != PG_MAJORVERSION_NUM)
     552              :     {
     553            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
     554              :                              "data directory is of wrong version");
     555            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
     556              :                              "File \"%s\" contains \"%s\", which is not compatible with this program's version \"%s\".",
     557              :                              "PG_VERSION", version_str, PG_MAJORVERSION);
     558            0 :         exit(1);
     559              :     }
     560           10 : }
     561              : 
     562              : /*
     563              :  * Append database name into a base connection string.
     564              :  *
     565              :  * dbname is the only parameter that changes so it is not included in the base
     566              :  * connection string. This function concatenates dbname to build a "real"
     567              :  * connection string.
     568              :  */
     569              : static char *
     570           41 : concat_conninfo_dbname(const char *conninfo, const char *dbname)
     571              : {
     572           41 :     PQExpBuffer buf = createPQExpBuffer();
     573              :     char       *ret;
     574              : 
     575              :     Assert(conninfo != NULL);
     576              : 
     577           41 :     appendPQExpBufferStr(buf, conninfo);
     578           41 :     appendConnStrItem(buf, "dbname", dbname);
     579              : 
     580           41 :     ret = pg_strdup(buf->data);
     581           41 :     destroyPQExpBuffer(buf);
     582              : 
     583           41 :     return ret;
     584              : }
     585              : 
     586              : /*
     587              :  * Store publication and subscription information.
     588              :  *
     589              :  * If publication, replication slot and subscription names were specified,
     590              :  * store it here. Otherwise, a generated name will be assigned to the object in
     591              :  * setup_publisher().
     592              :  */
     593              : static struct LogicalRepInfo *
     594           10 : store_pub_sub_info(const struct CreateSubscriberOptions *opt,
     595              :                    const char *pub_base_conninfo,
     596              :                    const char *sub_base_conninfo)
     597              : {
     598              :     struct LogicalRepInfo *dbinfo;
     599           10 :     SimpleStringListCell *pubcell = NULL;
     600           10 :     SimpleStringListCell *subcell = NULL;
     601           10 :     SimpleStringListCell *replslotcell = NULL;
     602           10 :     int         i = 0;
     603              : 
     604           10 :     dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
     605              : 
     606           10 :     if (num_pubs > 0)
     607            2 :         pubcell = opt->pub_names.head;
     608           10 :     if (num_subs > 0)
     609            1 :         subcell = opt->sub_names.head;
     610           10 :     if (num_replslots > 0)
     611            2 :         replslotcell = opt->replslot_names.head;
     612              : 
     613           30 :     for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
     614              :     {
     615              :         char       *conninfo;
     616              : 
     617              :         /* Fill publisher attributes */
     618           20 :         conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
     619           20 :         dbinfo[i].pubconninfo = conninfo;
     620           20 :         dbinfo[i].dbname = cell->val;
     621           20 :         if (num_pubs > 0)
     622            4 :             dbinfo[i].pubname = pubcell->val;
     623              :         else
     624           16 :             dbinfo[i].pubname = NULL;
     625           20 :         if (num_replslots > 0)
     626            3 :             dbinfo[i].replslotname = replslotcell->val;
     627              :         else
     628           17 :             dbinfo[i].replslotname = NULL;
     629           20 :         dbinfo[i].made_replslot = false;
     630           20 :         dbinfo[i].made_publication = false;
     631              :         /* Fill subscriber attributes */
     632           20 :         conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
     633           20 :         dbinfo[i].subconninfo = conninfo;
     634           20 :         if (num_subs > 0)
     635            2 :             dbinfo[i].subname = subcell->val;
     636              :         else
     637           18 :             dbinfo[i].subname = NULL;
     638              :         /* Other fields will be filled later */
     639              : 
     640           37 :         report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
     641              :                              "publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
     642           20 :                              dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
     643            3 :                              dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
     644           20 :                              dbinfo[i].pubconninfo);
     645           40 :         report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
     646              :                              "subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
     647            2 :                              dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
     648           20 :                              dbinfo[i].subconninfo,
     649           20 :                              dbinfos.two_phase ? "true" : "false");
     650              : 
     651           20 :         if (num_pubs > 0)
     652            4 :             pubcell = pubcell->next;
     653           20 :         if (num_subs > 0)
     654            2 :             subcell = subcell->next;
     655           20 :         if (num_replslots > 0)
     656            3 :             replslotcell = replslotcell->next;
     657              : 
     658           20 :         i++;
     659              :     }
     660              : 
     661           10 :     return dbinfo;
     662              : }
     663              : 
     664              : /*
     665              :  * Open a new connection. If exit_on_error is true, it has an undesired
     666              :  * condition and it should exit immediately.
     667              :  */
     668              : static PGconn *
     669           57 : connect_database(const char *conninfo, bool exit_on_error)
     670              : {
     671              :     PGconn     *conn;
     672              :     PGresult   *res;
     673              : 
     674           57 :     conn = PQconnectdb(conninfo);
     675           57 :     if (PQstatus(conn) != CONNECTION_OK)
     676              :     {
     677            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
     678              :                              "connection to database failed: %s",
     679              :                              PQerrorMessage(conn));
     680            0 :         PQfinish(conn);
     681              : 
     682            0 :         if (exit_on_error)
     683            0 :             exit(1);
     684            0 :         return NULL;
     685              :     }
     686              : 
     687              :     /* Secure search_path */
     688           57 :     res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
     689           57 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     690              :     {
     691            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
     692              :                              "could not clear \"search_path\": %s",
     693              :                              PQresultErrorMessage(res));
     694            0 :         PQclear(res);
     695            0 :         PQfinish(conn);
     696              : 
     697            0 :         if (exit_on_error)
     698            0 :             exit(1);
     699            0 :         return NULL;
     700              :     }
     701           57 :     PQclear(res);
     702              : 
     703           57 :     return conn;
     704              : }
     705              : 
     706              : /*
     707              :  * Close the connection. If exit_on_error is true, it has an undesired
     708              :  * condition and it should exit immediately.
     709              :  */
     710              : static void
     711           57 : disconnect_database(PGconn *conn, bool exit_on_error)
     712              : {
     713              :     Assert(conn != NULL);
     714              : 
     715           57 :     PQfinish(conn);
     716              : 
     717           57 :     if (exit_on_error)
     718            2 :         exit(1);
     719           55 : }
     720              : 
     721              : /*
     722              :  * Obtain the system identifier using the provided connection. It will be used
     723              :  * to compare if a data directory is a clone of another one.
     724              :  */
     725              : static uint64
     726           10 : get_primary_sysid(const char *conninfo)
     727              : {
     728              :     PGconn     *conn;
     729              :     PGresult   *res;
     730              :     uint64      sysid;
     731              : 
     732           10 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
     733              :                          "getting system identifier from publisher");
     734              : 
     735           10 :     conn = connect_database(conninfo, true);
     736              : 
     737           10 :     res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
     738           10 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     739              :     {
     740            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
     741              :                              "could not get system identifier: %s",
     742              :                              PQresultErrorMessage(res));
     743            0 :         disconnect_database(conn, true);
     744              :     }
     745           10 :     if (PQntuples(res) != 1)
     746              :     {
     747            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
     748              :                              "could not get system identifier: got %d rows, expected %d row",
     749              :                              PQntuples(res), 1);
     750            0 :         disconnect_database(conn, true);
     751              :     }
     752              : 
     753           10 :     sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
     754              : 
     755           10 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
     756              :                          "system identifier is %" PRIu64 " on publisher", sysid);
     757              : 
     758           10 :     PQclear(res);
     759           10 :     disconnect_database(conn, false);
     760              : 
     761           10 :     return sysid;
     762              : }
     763              : 
     764              : /*
     765              :  * Obtain the system identifier from control file. It will be used to compare
     766              :  * if a data directory is a clone of another one. This routine is used locally
     767              :  * and avoids a connection.
     768              :  */
     769              : static uint64
     770           10 : get_standby_sysid(const char *datadir)
     771              : {
     772              :     ControlFileData *cf;
     773              :     bool        crc_ok;
     774              :     uint64      sysid;
     775              : 
     776           10 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
     777              :                          "getting system identifier from subscriber");
     778              : 
     779           10 :     cf = get_controlfile(datadir, &crc_ok);
     780           10 :     if (!crc_ok)
     781            0 :         report_createsub_fatal("control file appears to be corrupt");
     782              : 
     783           10 :     sysid = cf->system_identifier;
     784              : 
     785           10 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
     786              :                          "system identifier is %" PRIu64 " on subscriber", sysid);
     787              : 
     788           10 :     pg_free(cf);
     789              : 
     790           10 :     return sysid;
     791              : }
     792              : 
     793              : /*
     794              :  * Modify the system identifier. Since a standby server preserves the system
     795              :  * identifier, it makes sense to change it to avoid situations in which WAL
     796              :  * files from one of the systems might be used in the other one.
     797              :  */
     798              : static void
     799            4 : modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
     800              : {
     801              :     ControlFileData *cf;
     802              :     bool        crc_ok;
     803              :     struct timeval tv;
     804              : 
     805              :     char       *out_file;
     806              :     char       *cmd_str;
     807              : 
     808            4 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
     809              :                          "modifying system identifier of subscriber");
     810              : 
     811            4 :     cf = get_controlfile(subscriber_dir, &crc_ok);
     812            4 :     if (!crc_ok)
     813            0 :         report_createsub_fatal("control file appears to be corrupt");
     814              : 
     815              :     /*
     816              :      * Select a new system identifier.
     817              :      *
     818              :      * XXX this code was extracted from BootStrapXLOG().
     819              :      */
     820            4 :     gettimeofday(&tv, NULL);
     821            4 :     cf->system_identifier = ((uint64) tv.tv_sec) << 32;
     822            4 :     cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
     823            4 :     cf->system_identifier |= getpid() & 0xFFF;
     824              : 
     825            4 :     if (dry_run)
     826            3 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
     827              :                              "dry-run: would set system identifier to %" PRIu64 " on subscriber",
     828              :                              cf->system_identifier);
     829              :     else
     830              :     {
     831            1 :         update_controlfile(subscriber_dir, cf, true);
     832            1 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
     833              :                              "system identifier is %" PRIu64 " on subscriber",
     834              :                              cf->system_identifier);
     835              :     }
     836              : 
     837            4 :     if (dry_run)
     838            3 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
     839              :                              "dry-run: would run pg_resetwal on the subscriber");
     840              :     else
     841            1 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
     842              :                              "running pg_resetwal on the subscriber");
     843              : 
     844              :     /*
     845              :      * Redirecting the output to the logfile if specified. Since the output
     846              :      * would be very short, around one line, we do not provide a separate file
     847              :      * for it; it's done as a part of the server log.
     848              :      */
     849            4 :     if (opt->log_dir)
     850            1 :         out_file = psprintf("%s/%s", logdir, SERVER_LOG_FILE_NAME);
     851              :     else
     852            3 :         out_file = DEVNULL;
     853              : 
     854            4 :     cmd_str = psprintf("\"%s\" -D \"%s\" >> \"%s\"", pg_resetwal_path,
     855              :                        subscriber_dir, out_file);
     856            4 :     if (opt->log_dir)
     857            1 :         pg_free(out_file);
     858              : 
     859            4 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
     860              :                          "pg_resetwal command is: %s", cmd_str);
     861              : 
     862            4 :     if (!dry_run)
     863              :     {
     864            1 :         int         rc = system(cmd_str);
     865              : 
     866            1 :         if (rc == 0)
     867            1 :             report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
     868              :                                  "successfully reset WAL on the subscriber");
     869              :         else
     870            0 :             report_createsub_fatal("could not reset WAL on subscriber: %s", wait_result_to_str(rc));
     871              :     }
     872              : 
     873            4 :     pg_free(cf);
     874            4 :     pg_free(cmd_str);
     875            4 : }
     876              : 
     877              : /*
     878              :  * Generate an object name using a prefix, database oid and a random integer.
     879              :  * It is used in case the user does not specify an object name (publication,
     880              :  * subscription, replication slot).
     881              :  */
     882              : static char *
     883            8 : generate_object_name(PGconn *conn)
     884              : {
     885              :     PGresult   *res;
     886              :     Oid         oid;
     887              :     uint32      rand;
     888              :     char       *objname;
     889              : 
     890            8 :     res = PQexec(conn,
     891              :                  "SELECT oid FROM pg_catalog.pg_database "
     892              :                  "WHERE datname = pg_catalog.current_database()");
     893            8 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     894              :     {
     895            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
     896              :                              "could not obtain database OID: %s",
     897              :                              PQresultErrorMessage(res));
     898            0 :         disconnect_database(conn, true);
     899              :     }
     900              : 
     901            8 :     if (PQntuples(res) != 1)
     902              :     {
     903            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
     904              :                              "could not obtain database OID: got %d rows, expected %d row",
     905              :                              PQntuples(res), 1);
     906            0 :         disconnect_database(conn, true);
     907              :     }
     908              : 
     909              :     /* Database OID */
     910            8 :     oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
     911              : 
     912            8 :     PQclear(res);
     913              : 
     914              :     /* Random unsigned integer */
     915            8 :     rand = pg_prng_uint32(&prng_state);
     916              : 
     917              :     /*
     918              :      * Build the object name. The name must not exceed NAMEDATALEN - 1. This
     919              :      * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
     920              :      * '\0').
     921              :      */
     922            8 :     objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
     923              : 
     924            8 :     return objname;
     925              : }
     926              : 
     927              : /*
     928              :  * Does the publication exist in the specified database?
     929              :  */
     930              : static bool
     931            8 : find_publication(PGconn *conn, const char *pubname, const char *dbname)
     932              : {
     933            8 :     PQExpBuffer str = createPQExpBuffer();
     934              :     PGresult   *res;
     935            8 :     bool        found = false;
     936            8 :     char       *pubname_esc = PQescapeLiteral(conn, pubname, strlen(pubname));
     937              : 
     938            8 :     appendPQExpBuffer(str,
     939              :                       "SELECT 1 FROM pg_catalog.pg_publication "
     940              :                       "WHERE pubname = %s",
     941              :                       pubname_esc);
     942            8 :     res = PQexec(conn, str->data);
     943            8 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     944              :     {
     945            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
     946              :                              "could not find publication \"%s\" in database \"%s\": %s",
     947              :                              pubname, dbname, PQerrorMessage(conn));
     948            0 :         disconnect_database(conn, true);
     949              :     }
     950              : 
     951            8 :     if (PQntuples(res) == 1)
     952            1 :         found = true;
     953              : 
     954            8 :     PQclear(res);
     955            8 :     PQfreemem(pubname_esc);
     956            8 :     destroyPQExpBuffer(str);
     957              : 
     958            8 :     return found;
     959              : }
     960              : 
     961              : /*
     962              :  * Create the publications and replication slots in preparation for logical
     963              :  * replication. Returns the LSN from latest replication slot. It will be the
     964              :  * replication start point that is used to adjust the subscriptions (see
     965              :  * set_replication_progress).
     966              :  */
     967              : static char *
     968            4 : setup_publisher(struct LogicalRepInfo *dbinfo)
     969              : {
     970            4 :     char       *lsn = NULL;
     971              : 
     972            4 :     pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
     973              : 
     974           12 :     for (int i = 0; i < num_dbs; i++)
     975              :     {
     976              :         PGconn     *conn;
     977            8 :         char       *genname = NULL;
     978              : 
     979            8 :         conn = connect_database(dbinfo[i].pubconninfo, true);
     980              : 
     981              :         /*
     982              :          * If an object name was not specified as command-line options, assign
     983              :          * a generated object name. The replication slot has a different rule.
     984              :          * The subscription name is assigned to the replication slot name if
     985              :          * no replication slot is specified. It follows the same rule as
     986              :          * CREATE SUBSCRIPTION.
     987              :          */
     988            8 :         if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
     989            8 :             genname = generate_object_name(conn);
     990            8 :         if (num_pubs == 0)
     991            4 :             dbinfo[i].pubname = pg_strdup(genname);
     992            8 :         if (num_subs == 0)
     993            6 :             dbinfo[i].subname = pg_strdup(genname);
     994            8 :         if (num_replslots == 0)
     995            5 :             dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
     996              : 
     997            8 :         if (find_publication(conn, dbinfo[i].pubname, dbinfo[i].dbname))
     998              :         {
     999              :             /* Reuse existing publication on publisher. */
    1000            1 :             report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    1001              :                                  "use existing publication \"%s\" in database \"%s\"",
    1002            1 :                                  dbinfo[i].pubname, dbinfo[i].dbname);
    1003              :             /* Don't remove pre-existing publication if an error occurs. */
    1004            1 :             dbinfo[i].made_publication = false;
    1005              :         }
    1006              :         else
    1007              :         {
    1008              :             /*
    1009              :              * Create publication on publisher. This step should be executed
    1010              :              * *before* promoting the subscriber to avoid any transactions
    1011              :              * between consistent LSN and the new publication rows (such
    1012              :              * transactions wouldn't see the new publication rows resulting in
    1013              :              * an error).
    1014              :              */
    1015            7 :             create_publication(conn, &dbinfo[i]);
    1016              :         }
    1017              : 
    1018              :         /* Create replication slot on publisher */
    1019            8 :         if (lsn)
    1020            1 :             pg_free(lsn);
    1021            8 :         lsn = create_logical_replication_slot(conn, &dbinfo[i]);
    1022            8 :         if (lsn == NULL && !dry_run)
    1023            0 :             exit(1);
    1024              : 
    1025              :         /*
    1026              :          * Since we are using the LSN returned by the last replication slot as
    1027              :          * recovery_target_lsn, this LSN is ahead of the current WAL position
    1028              :          * and the recovery waits until the publisher writes a WAL record to
    1029              :          * reach the target and ends the recovery. On idle systems, this wait
    1030              :          * time is unpredictable and could lead to failure in promoting the
    1031              :          * subscriber. To avoid that, insert a harmless WAL record.
    1032              :          */
    1033            8 :         if (i == num_dbs - 1 && !dry_run)
    1034              :         {
    1035              :             PGresult   *res;
    1036              : 
    1037            1 :             res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
    1038            1 :             if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1039              :             {
    1040            0 :                 report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1041              :                                      "could not write an additional WAL record: %s",
    1042              :                                      PQresultErrorMessage(res));
    1043            0 :                 disconnect_database(conn, true);
    1044              :             }
    1045            1 :             PQclear(res);
    1046              :         }
    1047              : 
    1048            8 :         disconnect_database(conn, false);
    1049              :     }
    1050              : 
    1051            4 :     return lsn;
    1052              : }
    1053              : 
    1054              : /*
    1055              :  * Ask the server behind conn whether it is in recovery; true iff it answers "t".
    1056              :  */
    1057              : static bool
    1058           15 : server_is_in_recovery(PGconn *conn)
    1059              : {
    1060              :     PGresult   *res;
    1061              :     int         ret;
    1062              : 
    1063           15 :     res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
    1064              :     /* Failure is logged; disconnect_database(conn, true) presumably does not return - confirm. */
    1065           15 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1066              :     {
    1067            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1068              :                              "could not obtain recovery progress: %s",
    1069              :                              PQresultErrorMessage(res));
    1070            0 :         disconnect_database(conn, true);
    1071              :     }
    1072              : 
    1073              :     /* Row 0, column 0 is compared as text; "t" means still in recovery. */
    1074           15 :     ret = strcmp("t", PQgetvalue(res, 0, 0));
    1075              : 
    1076           15 :     PQclear(res);
    1077              : 
    1078           15 :     return ret == 0;
    1079              : }
    1080              : /* Write one message to internal_log_file_fp: optional prefix, fmt via _(), newline. */
    1081              : static void
    1082           77 : internal_log_file_write(enum pg_log_level level, enum pg_log_part part,
    1083              :                         const char *pg_restrict fmt, va_list args)
    1084              : {
    1085              :     Assert(internal_log_file_fp);
    1086              : 
    1087              :     /* Do nothing if log level is too low. */
    1088           77 :     if (level < __pg_log_level)
    1089           37 :         return;
    1090              : 
    1091              :     /* Add prefix based on the log part and log level */
    1092           40 :     switch (part)
    1093              :     {
    1094           39 :         case PG_LOG_PRIMARY:
    1095           39 :             switch (level)
    1096              :             {
    1097            0 :                 case PG_LOG_ERROR:
    1098            0 :                     fprintf(internal_log_file_fp, _("error: "));
    1099            0 :                     break;
    1100            0 :                 case PG_LOG_WARNING:
    1101            0 :                     fprintf(internal_log_file_fp, _("warning: "));
    1102            0 :                     break;
    1103           39 :                 default:
    1104           39 :                     break;
    1105              :             }
    1106           39 :             break;
    1107            0 :         case PG_LOG_DETAIL:
    1108            0 :             fprintf(internal_log_file_fp, _("detail: "));
    1109            0 :             break;
    1110            1 :         case PG_LOG_HINT:
    1111            1 :             fprintf(internal_log_file_fp, _("hint: "));
    1112            1 :             break;
    1113              :     }
    1114              :     /* Message body; fmt is passed through _() before formatting. */
    1115           40 :     vfprintf(internal_log_file_fp, _(fmt), args);
    1116              :     /* End the line and flush the stream after every message. */
    1117           40 :     fprintf(internal_log_file_fp, "\n");
    1118           40 :     fflush(internal_log_file_fp);
    1119              : }
    1120              : 
    1121              : /*
    1122              :  * Open a log file with fopen(filename, mode) and return the stream; a failed open is fatal.
    1123              :  */
    1124              : static FILE *
    1125            1 : logfile_open(const char *filename, const char *mode)
    1126              : {
    1127              :     FILE       *fh;
    1128              :     /* NOTE(review): permissions are not set here; presumably governed by the process umask - confirm. */
    1129            1 :     fh = fopen(filename, mode);
    1130              : 
    1131            1 :     if (!fh)
    1132            0 :         report_createsub_fatal("could not open log file \"%s\": %m",
    1133              :                                filename);
    1134              : 
    1135            1 :     return fh;
    1136              : }
    1137              : /* Create log_basedir (if missing) plus a timestamp-named subdirectory; its path is left in logdir. */
    1138              : static void
    1139            1 : make_output_dirs(const char *log_basedir)
    1140              : {
    1141              :     char        timestamp[128];
    1142              :     struct timeval tval;
    1143              :     time_t      now;
    1144              :     struct tm   tmbuf;
    1145              :     int         len;
    1146              : 
    1147              :     /* Format the current local time as YYYYMMDDTHHMMSS */
    1148            1 :     gettimeofday(&tval, NULL);
    1149            1 :     now = tval.tv_sec;
    1150              : 
    1151            1 :     strftime(timestamp, sizeof(timestamp), "%Y%m%dT%H%M%S",
    1152            1 :              localtime_r(&now, &tmbuf));
    1153              : 
    1154              :     /* Append ".mmm": milliseconds taken from tv_usec / 1000 */
    1155            1 :     snprintf(timestamp + strlen(timestamp),
    1156            1 :              sizeof(timestamp) - strlen(timestamp), ".%03u",
    1157            1 :              (unsigned int) (tval.tv_usec / 1000));
    1158              : 
    1159              :     /* logdir = log_basedir/timestamp; a result of MAXPGPATH or more is fatal */
    1160            1 :     len = snprintf(logdir, MAXPGPATH, "%s/%s", log_basedir, timestamp);
    1161              : 
    1162            1 :     if (len >= MAXPGPATH)
    1163            0 :         report_createsub_fatal("directory path for log files is too long");
    1164              : 
    1165              :     /* Create base directory (ignore if exists) */
    1166            1 :     if (mkdir(log_basedir, pg_dir_create_mode) < 0 && errno != EEXIST)
    1167            0 :         report_createsub_fatal("could not create directory \"%s\": %m", log_basedir);
    1168              : 
    1169              :     /* Create the timestamp-named subdirectory; unlike above, EEXIST is fatal here */
    1170            1 :     if (mkdir(logdir, pg_dir_create_mode) < 0)
    1171            0 :         report_createsub_fatal("could not create directory \"%s\": %m", logdir);
    1172            1 : }
    1173              : 
    1174              : /*
    1175              :  * Is the primary server ready for logical replication? Connects with
    1176              :  * dbinfo[0].pubconninfo, logs every failed check, then exit(1) if any failed.
    1177              :  * XXX Does it not allow a synchronous replica?
    1178              :  */
    1179              : static void
    1180            6 : check_publisher(const struct LogicalRepInfo *dbinfo)
    1181              : {
    1182              :     PGconn     *conn;
    1183              :     PGresult   *res;
    1184            6 :     bool        failed = false;
    1185              : 
    1186              :     char       *wal_level;
    1187              :     int         max_repslots;
    1188              :     int         cur_repslots;
    1189              :     int         max_walsenders;
    1190              :     int         cur_walsenders;
    1191              :     int         max_prepared_transactions;
    1192              :     char       *max_slot_wal_keep_size;
    1193              : 
    1194            6 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    1195              :                          "checking settings on publisher");
    1196              : 
    1197            6 :     conn = connect_database(dbinfo[0].pubconninfo, true);
    1198              : 
    1199              :     /*
    1200              :      * If the primary server is in recovery (i.e. cascading replication),
    1201              :      * objects (publication) cannot be created because it is read only.
    1202              :      */
    1203            6 :     if (server_is_in_recovery(conn))
    1204              :     {
    1205            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1206              :                              "primary server cannot be in recovery");
    1207            1 :         disconnect_database(conn, true);
    1208              :     }
    1209              : 
    1210              :     /*------------------------------------------------------------------------
    1211              :      * Logical replication requires a few parameters to be set on publisher.
    1212              :      * Since these parameters are not a requirement for physical replication,
    1213              :      * we should check them to make sure it won't fail.
    1214              :      *
    1215              :      * - wal_level >= replica
    1216              :      * - max_replication_slots >= current + number of dbs to be converted
    1217              :      * - max_wal_senders >= current + number of dbs to be converted
    1218              :      * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files)
    1219              :      * -----------------------------------------------------------------------
    1220              :      */
    1221            5 :     res = PQexec(conn,
    1222              :                  "SELECT pg_catalog.current_setting('wal_level'),"
    1223              :                  " pg_catalog.current_setting('max_replication_slots'),"
    1224              :                  " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
    1225              :                  " pg_catalog.current_setting('max_wal_senders'),"
    1226              :                  " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
    1227              :                  " pg_catalog.current_setting('max_prepared_transactions'),"
    1228              :                  " pg_catalog.current_setting('max_slot_wal_keep_size')");
    1229              : 
    1230            5 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1231              :     {
    1232            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1233              :                              "could not obtain publisher settings: %s",
    1234              :                              PQresultErrorMessage(res));
    1235            0 :         disconnect_database(conn, true);
    1236              :     }
    1237              :     /* Columns are read in the order of the SELECT list above. */
    1238            5 :     wal_level = pg_strdup(PQgetvalue(res, 0, 0));
    1239            5 :     max_repslots = atoi(PQgetvalue(res, 0, 1));
    1240            5 :     cur_repslots = atoi(PQgetvalue(res, 0, 2));
    1241            5 :     max_walsenders = atoi(PQgetvalue(res, 0, 3));
    1242            5 :     cur_walsenders = atoi(PQgetvalue(res, 0, 4));
    1243            5 :     max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));
    1244            5 :     max_slot_wal_keep_size = pg_strdup(PQgetvalue(res, 0, 6));
    1245              : 
    1246            5 :     PQclear(res);
    1247              : 
    1248            5 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1249              :                          "publisher: wal_level: %s", wal_level);
    1250            5 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1251              :                          "publisher: max_replication_slots: %d", max_repslots);
    1252            5 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1253              :                          "publisher: current replication slots: %d", cur_repslots);
    1254            5 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1255              :                          "publisher: max_wal_senders: %d", max_walsenders);
    1256            5 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1257              :                          "publisher: current wal senders: %d", cur_walsenders);
    1258            5 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1259              :                          "publisher: max_prepared_transactions: %d",
    1260              :                          max_prepared_transactions);
    1261            5 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1262              :                          "publisher: max_slot_wal_keep_size: %s",
    1263              :                          max_slot_wal_keep_size);
    1264              : 
    1265            5 :     disconnect_database(conn, false);
    1266              : 
    1267            5 :     if (strcmp(wal_level, "minimal") == 0)
    1268              :     {
    1269            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1270              :                              "publisher requires \"wal_level\" >= \"replica\"");
    1271            0 :         failed = true;
    1272              :     }
    1273              : 
    1274            5 :     if (max_repslots - cur_repslots < num_dbs)
    1275              :     {
    1276            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1277              :                              "publisher requires %d replication slots, but only %d remain",
    1278              :                              num_dbs, max_repslots - cur_repslots);
    1279            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
    1280              :                              "Increase the configuration parameter \"%s\" to at least %d.",
    1281              :                              "max_replication_slots", cur_repslots + num_dbs);
    1282            1 :         failed = true;
    1283              :     }
    1284              : 
    1285            5 :     if (max_walsenders - cur_walsenders < num_dbs)
    1286              :     {
    1287            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1288              :                              "publisher requires %d WAL sender processes, but only %d remain",
    1289              :                              num_dbs, max_walsenders - cur_walsenders);
    1290            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
    1291              :                              "Increase the configuration parameter \"%s\" to at least %d.",
    1292              :                              "max_wal_senders", cur_walsenders + num_dbs);
    1293            1 :         failed = true;
    1294              :     }
    1295              :     /* Advisory only: this check warns but does not set failed. */
    1296            5 :     if (max_prepared_transactions != 0 && !dbinfos.two_phase)
    1297              :     {
    1298            0 :         report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
    1299              :                              "two_phase option will not be enabled for replication slots");
    1300            0 :         report_createsub_log(PG_LOG_WARNING, PG_LOG_DETAIL,
    1301              :                              "Subscriptions will be created with the two_phase option disabled.  "
    1302              :                              "Prepared transactions will be replicated at COMMIT PREPARED.");
    1303            0 :         report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
    1304              :                              "You can use the command-line option --enable-two-phase to enable two_phase.");
    1305              :     }
    1306              : 
    1307              :     /*
    1308              :      * In dry-run mode, validate 'max_slot_wal_keep_size'. If this parameter
    1309              :      * is set to a non-default value, it may cause replication failures due to
    1310              :      * required WAL files being prematurely removed.
    1311              :      */
    1312            5 :     if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
    1313              :     {
    1314            0 :         report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
    1315              :                              "required WAL could be removed from the publisher");
    1316            0 :         report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
    1317              :                              "Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
    1318              :                              "max_slot_wal_keep_size");
    1319              :     }
    1320              : 
    1321            5 :     pg_free(wal_level);
    1322              :     /* NOTE(review): max_slot_wal_keep_size was pg_strdup'ed too but is not freed here. */
    1323            5 :     if (failed)
    1324            1 :         exit(1);
    1325            4 : }
    1326              : 
    1327              : /*
    1328              :  * Is the standby server ready for logical replication?
    1329              :  * Connects with dbinfo[0].subconninfo; exit(1) if any checked setting is too low.
    1330              :  * XXX Does it not allow a time-delayed replica?
    1331              :  *
    1332              :  * XXX In a cascaded replication scenario (P -> S -> C), if the target server
    1333              :  * is S, it cannot detect there is a replica (server C) because server S starts
    1334              :  * accepting only local connections and server C cannot connect to it. Hence,
    1335              :  * there is not a reliable way to provide a suitable error saying the server C
    1336              :  * will be broken at the end of this process (due to pg_resetwal).
    1337              :  */
    1338              : static void
    1339            8 : check_subscriber(const struct LogicalRepInfo *dbinfo)
    1340              : {
    1341              :     PGconn     *conn;
    1342              :     PGresult   *res;
    1343            8 :     bool        failed = false;
    1344              : 
    1345              :     int         max_lrworkers;
    1346              :     int         max_replorigins;
    1347              :     int         max_wprocs;
    1348              : 
    1349            8 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    1350              :                          "checking settings on subscriber");
    1351              : 
    1352            8 :     conn = connect_database(dbinfo[0].subconninfo, true);
    1353              : 
    1354              :     /* The target server must be a standby */
    1355            8 :     if (!server_is_in_recovery(conn))
    1356              :     {
    1357            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1358              :                              "target server must be a standby");
    1359            1 :         disconnect_database(conn, true);
    1360              :     }
    1361              : 
    1362              :     /*------------------------------------------------------------------------
    1363              :      * Logical replication requires a few parameters to be set on subscriber.
    1364              :      * Since these parameters are not a requirement for physical replication,
    1365              :      * we should check them to make sure it won't fail.
    1366              :      *
    1367              :      * - max_active_replication_origins >= number of dbs to be converted
    1368              :      * - max_logical_replication_workers >= number of dbs to be converted
    1369              :      * - max_worker_processes >= 1 + number of dbs to be converted
    1370              :      *------------------------------------------------------------------------
    1371              :      */
    1372            7 :     res = PQexec(conn,
    1373              :                  "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
    1374              :                  "'max_logical_replication_workers', "
    1375              :                  "'max_active_replication_origins', "
    1376              :                  "'max_worker_processes', "
    1377              :                  "'primary_slot_name') "
    1378              :                  "ORDER BY name");
    1379              : 
    1380            7 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1381              :     {
    1382            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1383              :                              "could not obtain subscriber settings: %s",
    1384              :                              PQresultErrorMessage(res));
    1385            0 :         disconnect_database(conn, true);
    1386              :     }
    1387              :     /* Row indexes rely on ORDER BY name: origins, logical workers, worker processes, slot name. */
    1388            7 :     max_replorigins = atoi(PQgetvalue(res, 0, 0));
    1389            7 :     max_lrworkers = atoi(PQgetvalue(res, 1, 0));
    1390            7 :     max_wprocs = atoi(PQgetvalue(res, 2, 0));
    1391            7 :     if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
    1392            6 :         primary_slot_name = pg_strdup(PQgetvalue(res, 3, 0));
    1393              : 
    1394            7 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1395              :                          "subscriber: max_logical_replication_workers: %d",
    1396              :                          max_lrworkers);
    1397            7 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1398              :                          "subscriber: max_active_replication_origins: %d", max_replorigins);
    1399            7 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1400              :                          "subscriber: max_worker_processes: %d", max_wprocs);
    1401            7 :     if (primary_slot_name)
    1402            6 :         report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1403              :                              "subscriber: primary_slot_name: %s", primary_slot_name);
    1404              : 
    1405            7 :     PQclear(res);
    1406              : 
    1407            7 :     disconnect_database(conn, false);
    1408              : 
    1409            7 :     if (max_replorigins < num_dbs)
    1410              :     {
    1411            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1412              :                              "subscriber requires %d active replication origins, but only %d remain",
    1413              :                              num_dbs, max_replorigins);
    1414            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
    1415              :                              "Increase the configuration parameter \"%s\" to at least %d.",
    1416              :                              "max_active_replication_origins", num_dbs);
    1417            1 :         failed = true;
    1418              :     }
    1419              : 
    1420            7 :     if (max_lrworkers < num_dbs)
    1421              :     {
    1422            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1423              :                              "subscriber requires %d logical replication workers, but only %d remain",
    1424              :                              num_dbs, max_lrworkers);
    1425            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
    1426              :                              "Increase the configuration parameter \"%s\" to at least %d.",
    1427              :                              "max_logical_replication_workers", num_dbs);
    1428            1 :         failed = true;
    1429              :     }
    1430              : 
    1431            7 :     if (max_wprocs < num_dbs + 1)
    1432              :     {
    1433            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1434              :                              "subscriber requires %d worker processes, but only %d remain",
    1435              :                              num_dbs + 1, max_wprocs);
    1436            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
    1437              :                              "Increase the configuration parameter \"%s\" to at least %d.",
    1438              :                              "max_worker_processes", num_dbs + 1);
    1439            1 :         failed = true;
    1440              :     }
    1441              : 
    1442            7 :     if (failed)
    1443            1 :         exit(1);
    1444            6 : }
    1445              : 
    1446              : /*
    1447              :  * Drop a specified subscription. This is to avoid duplicate subscriptions on
    1448              :  * the primary (publisher node) and the newly created subscriber. The slot is
    1449              :  * detached (slot_name = NONE) first, since the publisher node still uses it.
    1450              :  * In dry-run mode the drop is only logged, not executed.
    1451              :  */
    1452              : static void
    1453            4 : drop_existing_subscription(PGconn *conn, const char *subname, const char *dbname)
    1454              : {
    1455            4 :     PQExpBuffer query = createPQExpBuffer();
    1456              :     PGresult   *res;
    1457              : 
    1458              :     Assert(conn != NULL);
    1459              : 
    1460              :     /*
    1461              :      * Construct a query string. These commands are allowed to be executed
    1462              :      * within a transaction. NOTE(review): subname is not identifier-quoted.
    1463              :      */
    1464            4 :     appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
    1465              :                       subname);
    1466            4 :     appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
    1467              :                       subname);
    1468            4 :     appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
    1469              : 
    1470            4 :     if (dry_run)
    1471            3 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    1472              :                              "dry-run: would drop subscription \"%s\" in database \"%s\"",
    1473              :                              subname, dbname);
    1474              :     else
    1475              :     {
    1476            1 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    1477              :                              "dropping subscription \"%s\" in database \"%s\"",
    1478              :                              subname, dbname);
    1479              :         /* All three statements go to the server in a single PQexec() call. */
    1480            1 :         res = PQexec(conn, query->data);
    1481              : 
    1482            1 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1483              :         {
    1484            0 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1485              :                                  "could not drop subscription \"%s\": %s",
    1486              :                                  subname, PQresultErrorMessage(res));
    1487            0 :             disconnect_database(conn, true);
    1488              :         }
    1489              : 
    1490            1 :         PQclear(res);
    1491              :     }
    1492              : 
    1493            4 :     destroyPQExpBuffer(query);
    1494            4 : }
    1495              : 
    1496              : /*
    1497              :  * Look up every subscription belonging to dbinfo->dbname and drop each one.
    1498              :  */
    1499              : static void
    1500            8 : check_and_drop_existing_subscriptions(PGconn *conn,
    1501              :                                       const struct LogicalRepInfo *dbinfo)
    1502              : {
    1503            8 :     PQExpBuffer query = createPQExpBuffer();
    1504              :     char       *dbname;
    1505              :     PGresult   *res;
    1506              : 
    1507              :     Assert(conn != NULL);
    1508              :     /* Escape the database name so it can be embedded in the query as a literal. */
    1509            8 :     dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
    1510              : 
    1511            8 :     appendPQExpBuffer(query,
    1512              :                       "SELECT s.subname FROM pg_catalog.pg_subscription s "
    1513              :                       "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
    1514              :                       "WHERE d.datname = %s",
    1515              :                       dbname);
    1516            8 :     res = PQexec(conn, query->data);
    1517              : 
    1518            8 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1519              :     {
    1520            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1521              :                              "could not obtain pre-existing subscriptions: %s",
    1522              :                              PQresultErrorMessage(res));
    1523            0 :         disconnect_database(conn, true);
    1524              :     }
    1525              :     /* One row per subscription; column 0 is subname, passed on unmodified. */
    1526           12 :     for (int i = 0; i < PQntuples(res); i++)
    1527            4 :         drop_existing_subscription(conn, PQgetvalue(res, i, 0),
    1528            4 :                                    dbinfo->dbname);
    1529              :     /* dbname came from PQescapeLiteral(), hence PQfreemem() below. */
    1530            8 :     PQclear(res);
    1531            8 :     destroyPQExpBuffer(query);
    1532            8 :     PQfreemem(dbname);
    1533            8 : }
    1534              : 
    1535              : /*
    1536              :  * For each of the num_dbs databases: connect to the subscriber, drop old
    1537              :  * subscriptions and publications, create the subscription, set its progress
    1538              :  * to consistent_lsn and enable it. Last step of logical replication setup.
    1539              :  */
    1540              : static void
    1541            4 : setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
    1542              : {
    1543           12 :     for (int i = 0; i < num_dbs; i++)
    1544              :     {
    1545              :         PGconn     *conn;
    1546              : 
    1547              :         /* Connect to subscriber. */
    1548            8 :         conn = connect_database(dbinfo[i].subconninfo, true);
    1549              : 
    1550              :         /*
    1551              :          * We don't need the pre-existing subscriptions on the newly formed
    1552              :          * subscriber. They can connect to other publisher nodes and either
    1553              :          * get some unwarranted data or can lead to ERRORs in connecting to
    1554              :          * such nodes.
    1555              :          */
    1556            8 :         check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
    1557              : 
    1558              :         /* Check and drop the required publications in the given database. */
    1559            8 :         check_and_drop_publications(conn, &dbinfo[i]);
    1560              :         /* Create the subscription for this database. */
    1561            8 :         create_subscription(conn, &dbinfo[i]);
    1562              : 
    1563              :         /* Set the replication progress to the correct LSN */
    1564            8 :         set_replication_progress(conn, &dbinfo[i], consistent_lsn);
    1565              : 
    1566              :         /* Enable subscription */
    1567            8 :         enable_subscription(conn, &dbinfo[i]);
    1568              : 
    1569            8 :         disconnect_database(conn, false);
    1570              :     }
    1571            4 : }
    1572              : 
    1573              : /*
    1574              :  * Write the required recovery parameters.
    1575              :  */
    1576              : static void
    1577            4 : setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
    1578              : {
    1579              :     PGconn     *conn;
    1580              :     PQExpBuffer recoveryconfcontents;
    1581              : 
    1582              :     /*
    1583              :      * Although the recovery parameters will be written to the subscriber,
    1584              :      * use a publisher connection. The primary_conninfo is generated using the
    1585              :      * connection settings.
    1586              :      */
    1587            4 :     conn = connect_database(dbinfo[0].pubconninfo, true);
    1588              : 
    1589              :     /*
    1590              :      * Write recovery parameters.
    1591              :      *
    1592              :      * The subscriber is not running yet. In dry run mode, the recovery
    1593              :      * parameters *won't* be written. An invalid LSN is used for printing
    1594              :      * purposes. Additional recovery parameters are added here. It avoids
    1595              :      * unexpected behavior such as end of recovery as soon as a consistent
    1596              :      * state is reached (recovery_target) and failure due to multiple recovery
    1597              :      * targets (name, time, xid, LSN).
    1598              :      */
    1599            4 :     recoveryconfcontents = GenerateRecoveryConfig(conn, NULL, NULL);
    1600            4 :     appendPQExpBufferStr(recoveryconfcontents, "recovery_target = ''\n");
    1601            4 :     appendPQExpBufferStr(recoveryconfcontents,
    1602              :                          "recovery_target_timeline = 'latest'\n");
    1603              : 
    1604              :     /*
    1605              :      * Set recovery_target_inclusive = false to avoid reapplying the
    1606              :      * transaction committed at 'lsn' after subscription is enabled. This is
    1607              :      * because the provided 'lsn' is also used as the replication start point
    1608              :      * for the subscription. So, the server can send the transaction committed
    1609              :      * at that 'lsn' after replication is started which can lead to applying
    1610              :      * the same transaction twice if we keep recovery_target_inclusive = true.
    1611              :      */
    1612            4 :     appendPQExpBufferStr(recoveryconfcontents,
    1613              :                          "recovery_target_inclusive = false\n");
    1614            4 :     appendPQExpBufferStr(recoveryconfcontents,
    1615              :                          "recovery_target_action = promote\n");
    1616            4 :     appendPQExpBufferStr(recoveryconfcontents, "recovery_target_name = ''\n");
    1617            4 :     appendPQExpBufferStr(recoveryconfcontents, "recovery_target_time = ''\n");
    1618            4 :     appendPQExpBufferStr(recoveryconfcontents, "recovery_target_xid = ''\n");
    1619              : 
    1620            4 :     if (dry_run)
    1621              :     {
    1622            3 :         appendPQExpBufferStr(recoveryconfcontents, "# dry run mode\n");
    1623            3 :         appendPQExpBuffer(recoveryconfcontents,
    1624              :                           "recovery_target_lsn = '%X/%08X'\n",
    1625              :                           LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
    1626              :     }
    1627              :     else
    1628              :     {
    1629            1 :         appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
    1630              :                           lsn);
    1631              :     }
    1632              : 
    1633            4 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1634              :                          "recovery parameters:\n%s", recoveryconfcontents->data);
    1635              : 
    1636            4 :     if (!dry_run)
    1637              :     {
    1638              :         char        conf_filename[MAXPGPATH];
    1639              :         FILE       *fd;
    1640              : 
    1641              :         /* Write the recovery parameters to INCLUDED_CONF_FILE */
    1642            1 :         snprintf(conf_filename, MAXPGPATH, "%s/%s", datadir,
    1643              :                  INCLUDED_CONF_FILE);
    1644            1 :         fd = fopen(conf_filename, "w");
    1645            1 :         if (fd == NULL)
    1646            0 :             report_createsub_fatal("could not open file \"%s\": %m", conf_filename);
    1647              : 
    1648            1 :         if (fwrite(recoveryconfcontents->data, recoveryconfcontents->len, 1, fd) != 1)
    1649            0 :             report_createsub_fatal("could not write to file \"%s\": %m", conf_filename);
    1650              : 
    1651            1 :         fclose(fd);
    1652            1 :         recovery_params_set = true;
    1653              : 
    1654              :         /* Include conditionally the recovery parameters. */
    1655            1 :         resetPQExpBuffer(recoveryconfcontents);
    1656            1 :         appendPQExpBufferStr(recoveryconfcontents,
    1657              :                              "include_if_exists '" INCLUDED_CONF_FILE "'\n");
    1658            1 :         WriteRecoveryConfig(conn, datadir, recoveryconfcontents);
    1659              :     }
    1660              : 
    1661            4 :     disconnect_database(conn, false);
    1662            4 : }
    1663              : 
    1664              : /*
    1665              :  * Drop physical replication slot on primary if the standby was using it. After
    1666              :  * the transformation, it has no use.
    1667              :  *
    1668              :  * XXX we might not fail here. Instead, we provide a warning so the user
    1669              :  * eventually drops this replication slot later.
    1670              :  */
    1671              : static void
    1672            4 : drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
    1673              : {
    1674              :     PGconn     *conn;
    1675              : 
    1676              :     /* Replication slot does not exist, do nothing */
    1677            4 :     if (!primary_slot_name)
    1678            0 :         return;
    1679              : 
    1680            4 :     conn = connect_database(dbinfo[0].pubconninfo, false);
    1681            4 :     if (conn != NULL)
    1682              :     {
    1683            4 :         drop_replication_slot(conn, &dbinfo[0], slotname);
    1684            4 :         disconnect_database(conn, false);
    1685              :     }
    1686              :     else
    1687              :     {
    1688            0 :         report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
    1689              :                              "could not drop replication slot \"%s\" on primary",
    1690              :                              slotname);
    1691            0 :         report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
    1692              :                              "Drop this replication slot soon to avoid retention of WAL files.");
    1693              :     }
    1694              : }
    1695              : 
    1696              : /*
    1697              :  * Drop failover replication slots on subscriber. After the transformation,
    1698              :  * they have no use.
    1699              :  *
    1700              :  * XXX We do not fail here. Instead, we provide a warning so the user can drop
    1701              :  * them later.
    1702              :  */
    1703              : static void
    1704            4 : drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
    1705              : {
    1706              :     PGconn     *conn;
    1707              :     PGresult   *res;
    1708              : 
    1709            4 :     conn = connect_database(dbinfo[0].subconninfo, false);
    1710            4 :     if (conn != NULL)
    1711              :     {
    1712              :         /* Get failover replication slot names */
    1713            4 :         res = PQexec(conn,
    1714              :                      "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
    1715              : 
    1716            4 :         if (PQresultStatus(res) == PGRES_TUPLES_OK)
    1717              :         {
    1718              :             /* Remove failover replication slots from subscriber */
    1719            8 :             for (int i = 0; i < PQntuples(res); i++)
    1720            4 :                 drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
    1721              :         }
    1722              :         else
    1723              :         {
    1724            0 :             report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
    1725              :                                  "could not obtain failover replication slot information: %s",
    1726              :                                  PQresultErrorMessage(res));
    1727            0 :             report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
    1728              :                                  "Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
    1729              :         }
    1730              : 
    1731            4 :         PQclear(res);
    1732            4 :         disconnect_database(conn, false);
    1733              :     }
    1734              :     else
    1735              :     {
    1736            0 :         report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
    1737              :                              "could not drop failover replication slot");
    1738            0 :         report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
    1739              :                              "Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
    1740              :     }
    1741            4 : }
    1742              : 
    1743              : /*
    1744              :  * Create a logical replication slot and returns a LSN.
    1745              :  *
    1746              :  * CreateReplicationSlot() is not used because it does not provide the one-row
    1747              :  * result set that contains the LSN.
    1748              :  */
    1749              : static char *
    1750            8 : create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
    1751              : {
    1752            8 :     PQExpBuffer str = createPQExpBuffer();
    1753            8 :     PGresult   *res = NULL;
    1754            8 :     const char *slot_name = dbinfo->replslotname;
    1755              :     char       *slot_name_esc;
    1756            8 :     char       *lsn = NULL;
    1757              : 
    1758              :     Assert(conn != NULL);
    1759              : 
    1760            8 :     if (dry_run)
    1761            6 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    1762              :                              "dry-run: would create the replication slot \"%s\" in database \"%s\" on publisher",
    1763              :                              slot_name, dbinfo->dbname);
    1764              :     else
    1765            2 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    1766              :                              "creating the replication slot \"%s\" in database \"%s\" on publisher",
    1767              :                              slot_name, dbinfo->dbname);
    1768              : 
    1769            8 :     slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
    1770              : 
    1771            8 :     appendPQExpBuffer(str,
    1772              :                       "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
    1773              :                       slot_name_esc,
    1774            8 :                       dbinfos.two_phase ? "true" : "false");
    1775              : 
    1776            8 :     PQfreemem(slot_name_esc);
    1777              : 
    1778            8 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1779              :                          "command is: %s", str->data);
    1780              : 
    1781            8 :     if (!dry_run)
    1782              :     {
    1783            2 :         res = PQexec(conn, str->data);
    1784            2 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1785              :         {
    1786            0 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1787              :                                  "could not create replication slot \"%s\" in database \"%s\": %s",
    1788              :                                  slot_name, dbinfo->dbname,
    1789              :                                  PQresultErrorMessage(res));
    1790            0 :             PQclear(res);
    1791            0 :             destroyPQExpBuffer(str);
    1792            0 :             return NULL;
    1793              :         }
    1794              : 
    1795            2 :         lsn = pg_strdup(PQgetvalue(res, 0, 0));
    1796            2 :         PQclear(res);
    1797              :     }
    1798              : 
    1799              :     /* For cleanup purposes */
    1800            8 :     dbinfo->made_replslot = true;
    1801              : 
    1802            8 :     destroyPQExpBuffer(str);
    1803              : 
    1804            8 :     return lsn;
    1805              : }
    1806              : 
    1807              : static void
    1808            8 : drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
    1809              :                       const char *slot_name)
    1810              : {
    1811            8 :     PQExpBuffer str = createPQExpBuffer();
    1812              :     char       *slot_name_esc;
    1813              :     PGresult   *res;
    1814              : 
    1815              :     Assert(conn != NULL);
    1816              : 
    1817            8 :     if (dry_run)
    1818            6 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    1819              :                              "dry-run: would drop the replication slot \"%s\" in database \"%s\"",
    1820              :                              slot_name, dbinfo->dbname);
    1821              :     else
    1822            2 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    1823              :                              "dropping the replication slot \"%s\" in database \"%s\"",
    1824              :                              slot_name, dbinfo->dbname);
    1825              : 
    1826            8 :     slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
    1827              : 
    1828            8 :     appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
    1829              : 
    1830            8 :     PQfreemem(slot_name_esc);
    1831              : 
    1832            8 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1833              :                          "command is: %s", str->data);
    1834              : 
    1835            8 :     if (!dry_run)
    1836              :     {
    1837            2 :         res = PQexec(conn, str->data);
    1838            2 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1839              :         {
    1840            0 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1841              :                                  "could not drop replication slot \"%s\" in database \"%s\": %s",
    1842              :                                  slot_name, dbinfo->dbname, PQresultErrorMessage(res));
    1843            0 :             dbinfo->made_replslot = false;   /* don't try again. */
    1844              :         }
    1845              : 
    1846            2 :         PQclear(res);
    1847              :     }
    1848              : 
    1849            8 :     destroyPQExpBuffer(str);
    1850            8 : }
    1851              : 
    1852              : /*
    1853              :  * Reports a suitable message if pg_ctl fails.
    1854              :  */
    1855              : static void
    1856           24 : pg_ctl_status(const char *pg_ctl_cmd, int rc)
    1857              : {
    1858           24 :     if (rc != 0)
    1859              :     {
    1860            0 :         if (WIFEXITED(rc))
    1861              :         {
    1862            0 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1863              :                                  "pg_ctl failed with exit code %d",
    1864            0 :                                  WEXITSTATUS(rc));
    1865              :         }
    1866            0 :         else if (WIFSIGNALED(rc))
    1867              :         {
    1868              : #if defined(WIN32)
    1869              :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1870              :                                  "pg_ctl was terminated by exception 0x%X",
    1871              :                                  WTERMSIG(rc));
    1872              :             report_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
    1873              :                                  "See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
    1874              : #else
    1875            0 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1876              :                                  "pg_ctl was terminated by signal %d: %s",
    1877              :                                  WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
    1878              : #endif
    1879              :         }
    1880              :         else
    1881              :         {
    1882            0 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1883              :                                  "pg_ctl exited with unrecognized status %d", rc);
    1884              :         }
    1885              : 
    1886            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
    1887              :                              "The failed command was: %s", pg_ctl_cmd);
    1888            0 :         exit(1);
    1889              :     }
    1890           24 : }
    1891              : 
    1892              : static void
    1893           12 : start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
    1894              :                      bool restrict_logical_worker)
    1895              : {
    1896           12 :     PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
    1897              :     int         rc;
    1898              : 
    1899           12 :     appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
    1900           12 :     appendShellString(pg_ctl_cmd, subscriber_dir);
    1901           12 :     appendPQExpBufferStr(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
    1902              : 
    1903              :     /* Prevent unintended slot invalidation */
    1904           12 :     appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
    1905              : 
    1906           12 :     if (restricted_access)
    1907              :     {
    1908           12 :         appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
    1909              : #if !defined(WIN32)
    1910              : 
    1911              :         /*
    1912              :          * An empty listen_addresses list means the server does not listen on
    1913              :          * any IP interfaces; only Unix-domain sockets can be used to connect
    1914              :          * to the server. Prevent external connections to minimize the chance
    1915              :          * of failure.
    1916              :          */
    1917           12 :         appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
    1918           12 :         if (opt->socket_dir)
    1919           12 :             appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
    1920           12 :                               opt->socket_dir);
    1921           12 :         appendPQExpBufferChar(pg_ctl_cmd, '"');
    1922              : #endif
    1923              :     }
    1924           12 :     if (opt->config_file != NULL)
    1925            0 :         appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
    1926            0 :                           opt->config_file);
    1927              : 
    1928              :     /* Suppress to start logical replication if requested */
    1929           12 :     if (restrict_logical_worker)
    1930            4 :         appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
    1931              : 
    1932           12 :     if (opt->log_dir)
    1933            2 :         appendPQExpBuffer(pg_ctl_cmd, " -l \"%s/%s\"", logdir, SERVER_LOG_FILE_NAME);
    1934              : 
    1935           12 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1936              :                          "pg_ctl command is: %s", pg_ctl_cmd->data);
    1937           12 :     rc = system(pg_ctl_cmd->data);
    1938           12 :     pg_ctl_status(pg_ctl_cmd->data, rc);
    1939           12 :     standby_running = true;
    1940           12 :     destroyPQExpBuffer(pg_ctl_cmd);
    1941           12 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    1942              :                          "server was started");
    1943           12 : }
    1944              : 
    1945              : static void
    1946           12 : stop_standby_server(const char *datadir)
    1947              : {
    1948              :     char       *pg_ctl_cmd;
    1949              :     int         rc;
    1950              : 
    1951           12 :     pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
    1952              :                           datadir);
    1953           12 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    1954              :                          "pg_ctl command is: %s", pg_ctl_cmd);
    1955           12 :     rc = system(pg_ctl_cmd);
    1956           12 :     pg_ctl_status(pg_ctl_cmd, rc);
    1957           12 :     standby_running = false;
    1958           12 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    1959              :                          "server was stopped");
    1960           12 : }
    1961              : 
    1962              : /*
    1963              :  * Returns after the server finishes the recovery process.
    1964              :  *
    1965              :  * If recovery_timeout option is set, terminate abnormally without finishing
    1966              :  * the recovery process. By default, it waits forever.
    1967              :  *
    1968              :  * XXX Is the recovery process still in progress? When recovery process has a
    1969              :  * better progress reporting mechanism, it should be added here.
    1970              :  */
    1971              : static void
    1972            4 : wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
    1973              : {
    1974              :     PGconn     *conn;
    1975            4 :     bool        ready = false;
    1976            4 :     int         timer = 0;
    1977              : 
    1978            4 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    1979              :                          "waiting for the target server to reach the consistent state");
    1980              : 
    1981            4 :     conn = connect_database(conninfo, true);
    1982              : 
    1983              :     for (;;)
    1984              :     {
    1985              :         /* Did the recovery process finish? We're done if so. */
    1986            4 :         if (dry_run || !server_is_in_recovery(conn))
    1987              :         {
    1988            4 :             ready = true;
    1989            4 :             recovery_ended = true;
    1990            4 :             break;
    1991              :         }
    1992              : 
    1993              :         /* Bail out after recovery_timeout seconds if this option is set */
    1994            0 :         if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
    1995              :         {
    1996            0 :             stop_standby_server(subscriber_dir);
    1997            0 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    1998              :                                  "recovery timed out");
    1999            0 :             disconnect_database(conn, true);
    2000              :         }
    2001              : 
    2002              :         /* Keep waiting */
    2003            0 :         pg_usleep(WAIT_INTERVAL * USECS_PER_SEC);
    2004            0 :         timer += WAIT_INTERVAL;
    2005              :     }
    2006              : 
    2007            4 :     disconnect_database(conn, false);
    2008              : 
    2009            4 :     if (!ready)
    2010            0 :         report_createsub_fatal("server did not end recovery");
    2011              : 
    2012            4 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2013              :                          "target server reached the consistent state");
    2014            4 :     report_createsub_log(PG_LOG_INFO, PG_LOG_HINT,
    2015              :                          "If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
    2016            4 : }
    2017              : 
    2018              : /*
    2019              :  * Create a publication that includes all tables in the database.
    2020              :  */
    2021              : static void
    2022            7 : create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
    2023              : {
    2024            7 :     PQExpBuffer str = createPQExpBuffer();
    2025              :     PGresult   *res;
    2026              :     char       *ipubname_esc;
    2027              :     char       *spubname_esc;
    2028              : 
    2029              :     Assert(conn != NULL);
    2030              : 
    2031            7 :     ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
    2032            7 :     spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
    2033              : 
    2034              :     /* Check if the publication already exists */
    2035            7 :     appendPQExpBuffer(str,
    2036              :                       "SELECT 1 FROM pg_catalog.pg_publication "
    2037              :                       "WHERE pubname = %s",
    2038              :                       spubname_esc);
    2039            7 :     res = PQexec(conn, str->data);
    2040            7 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    2041              :     {
    2042            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2043              :                              "could not obtain publication information: %s",
    2044              :                              PQresultErrorMessage(res));
    2045            0 :         disconnect_database(conn, true);
    2046              :     }
    2047              : 
    2048            7 :     if (PQntuples(res) == 1)
    2049              :     {
    2050              :         /*
    2051              :          * Unfortunately, if it reaches this code path, it will always fail
    2052              :          * (unless you decide to change the existing publication name). That's
    2053              :          * bad but it is very unlikely that the user will choose a name with
    2054              :          * pg_createsubscriber_ prefix followed by the exact database oid and
    2055              :          * a random number.
    2056              :          */
    2057            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2058              :                              "publication \"%s\" already exists", dbinfo->pubname);
    2059            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
    2060              :                              "Consider renaming this publication before continuing.");
    2061            0 :         disconnect_database(conn, true);
    2062              :     }
    2063              : 
    2064            7 :     PQclear(res);
    2065            7 :     resetPQExpBuffer(str);
    2066              : 
    2067            7 :     if (dry_run)
    2068            6 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2069              :                              "dry-run: would create publication \"%s\" in database \"%s\"",
    2070              :                              dbinfo->pubname, dbinfo->dbname);
    2071              :     else
    2072            1 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2073              :                              "creating publication \"%s\" in database \"%s\"",
    2074              :                              dbinfo->pubname, dbinfo->dbname);
    2075              : 
    2076            7 :     appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
    2077              :                       ipubname_esc);
    2078              : 
    2079            7 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    2080              :                          "command is: %s", str->data);
    2081              : 
    2082            7 :     if (!dry_run)
    2083              :     {
    2084            1 :         res = PQexec(conn, str->data);
    2085            1 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    2086              :         {
    2087            0 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2088              :                                  "could not create publication \"%s\" in database \"%s\": %s",
    2089              :                                  dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
    2090            0 :             disconnect_database(conn, true);
    2091              :         }
    2092            1 :         PQclear(res);
    2093              :     }
    2094              : 
    2095              :     /* For cleanup purposes */
    2096            7 :     dbinfo->made_publication = true;
    2097              : 
    2098            7 :     PQfreemem(ipubname_esc);
    2099            7 :     PQfreemem(spubname_esc);
    2100            7 :     destroyPQExpBuffer(str);
    2101            7 : }
    2102              : 
    2103              : /*
    2104              :  * Drop the specified publication in the given database.
    2105              :  */
    2106              : static void
    2107           10 : drop_publication(PGconn *conn, const char *pubname, const char *dbname,
    2108              :                  bool *made_publication)
    2109              : {
    2110           10 :     PQExpBuffer str = createPQExpBuffer();
    2111              :     PGresult   *res;
    2112              :     char       *pubname_esc;
    2113              : 
    2114              :     Assert(conn != NULL);
    2115              : 
    2116           10 :     pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
    2117              : 
    2118           10 :     if (dry_run)
    2119            6 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2120              :                              "dry-run: would drop publication \"%s\" in database \"%s\"",
    2121              :                              pubname, dbname);
    2122              :     else
    2123            4 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2124              :                              "dropping publication \"%s\" in database \"%s\"",
    2125              :                              pubname, dbname);
    2126              : 
    2127           10 :     appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
    2128              : 
    2129           10 :     PQfreemem(pubname_esc);
    2130              : 
    2131           10 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    2132              :                          "command is: %s", str->data);
    2133              : 
    2134           10 :     if (!dry_run)
    2135              :     {
    2136            4 :         res = PQexec(conn, str->data);
    2137            4 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    2138              :         {
    2139            0 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2140              :                                  "could not drop publication \"%s\" in database \"%s\": %s",
    2141              :                                  pubname, dbname, PQresultErrorMessage(res));
    2142            0 :             *made_publication = false;  /* don't try again. */
    2143              : 
    2144              :             /*
    2145              :              * Don't disconnect and exit here. This routine is used by primary
    2146              :              * (cleanup publication / replication slot due to an error) and
    2147              :              * subscriber (remove the replicated publications). In both cases,
    2148              :              * it can continue and provide instructions for the user to remove
    2149              :              * it later if cleanup fails.
    2150              :              */
    2151              :         }
    2152            4 :         PQclear(res);
    2153              :     }
    2154              : 
    2155           10 :     destroyPQExpBuffer(str);
    2156           10 : }
    2157              : 
    2158              : /*
    2159              :  * Retrieve and drop the publications.
    2160              :  *
    2161              :  * Publications copied during physical replication remain on the subscriber
    2162              :  * after promotion. If --clean=publications is specified, drop all existing
    2163              :  * publications in the subscriber database. Otherwise, only drop publications
    2164              :  * that were created by pg_createsubscriber during this operation.
    2165              :  */
    2166              : static void
    2167            8 : check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
    2168              : {
    2169              :     PGresult   *res;
    2170            8 :     bool        drop_all_pubs = dbinfos.objecttypes_to_clean & OBJECTTYPE_PUBLICATIONS;
    2171              : 
    2172              :     Assert(conn != NULL);
    2173              : 
    2174            8 :     if (drop_all_pubs)
    2175              :     {
    2176            2 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2177              :                              "dropping all existing publications in database \"%s\"",
    2178              :                              dbinfo->dbname);
    2179              : 
    2180              :         /* Fetch all publication names */
    2181            2 :         res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
    2182            2 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    2183              :         {
    2184            0 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2185              :                                  "could not obtain publication information: %s",
    2186              :                                  PQresultErrorMessage(res));
    2187            0 :             PQclear(res);
    2188            0 :             disconnect_database(conn, true);
    2189              :         }
    2190              : 
    2191              :         /* Drop each publication */
    2192            6 :         for (int i = 0; i < PQntuples(res); i++)
    2193            4 :             drop_publication(conn, PQgetvalue(res, i, 0), dbinfo->dbname,
    2194              :                              &dbinfo->made_publication);
    2195              : 
    2196            2 :         PQclear(res);
    2197              :     }
    2198              :     else
    2199              :     {
    2200              :         /* Drop publication only if it was created by this tool */
    2201            6 :         if (dbinfo->made_publication)
    2202              :         {
    2203            6 :             drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
    2204              :                              &dbinfo->made_publication);
    2205              :         }
    2206              :         else
    2207              :         {
    2208            0 :             if (dry_run)
    2209            0 :                 report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2210              :                                      "dry-run: would preserve existing publication \"%s\" in database \"%s\"",
    2211              :                                      dbinfo->pubname, dbinfo->dbname);
    2212              :             else
    2213            0 :                 report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2214              :                                      "preserve existing publication \"%s\" in database \"%s\"",
    2215              :                                      dbinfo->pubname, dbinfo->dbname);
    2216              :         }
    2217              :     }
    2218            8 : }
    2219              : 
    2220              : /*
    2221              :  * Create a subscription with some predefined options.
    2222              :  *
    2223              :  * A replication slot was already created in a previous step. Let's use it.  It
    2224              :  * is not required to copy data. The subscription will be created but it will
    2225              :  * not be enabled now. That's because the replication progress must be set and
    2226              :  * the replication origin name (one of the function arguments) contains the
    2227              :  * subscription OID in its name. Once the subscription is created,
    2228              :  * set_replication_progress() can obtain the chosen origin name and set up its
    2229              :  * initial location.
    2230              :  */
    2231              : static void
    2232            8 : create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
    2233              : {
    2234            8 :     PQExpBuffer str = createPQExpBuffer();
    2235              :     PGresult   *res;
    2236              :     char       *pubname_esc;
    2237              :     char       *subname_esc;
    2238              :     char       *pubconninfo_esc;
    2239              :     char       *replslotname_esc;
    2240              : 
    2241              :     Assert(conn != NULL);
    2242              : 
    2243            8 :     pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
    2244            8 :     subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
    2245            8 :     pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
    2246            8 :     replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
    2247              : 
    2248            8 :     if (dry_run)
    2249            6 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2250              :                              "dry-run: would create subscription \"%s\" in database \"%s\"",
    2251            6 :                              dbinfo->subname, dbinfo->dbname);
    2252              :     else
    2253            2 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2254              :                              "creating subscription \"%s\" in database \"%s\"",
    2255            2 :                              dbinfo->subname, dbinfo->dbname);
    2256              : 
    2257            8 :     appendPQExpBuffer(str,
    2258              :                       "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
    2259              :                       "WITH (create_slot = false, enabled = false, "
    2260              :                       "slot_name = %s, copy_data = false, two_phase = %s)",
    2261              :                       subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
    2262            8 :                       dbinfos.two_phase ? "true" : "false");
    2263              : 
    2264            8 :     PQfreemem(pubname_esc);
    2265            8 :     PQfreemem(subname_esc);
    2266            8 :     PQfreemem(pubconninfo_esc);
    2267            8 :     PQfreemem(replslotname_esc);
    2268              : 
    2269            8 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    2270              :                          "command is: %s", str->data);
    2271              : 
    2272            8 :     if (!dry_run)
    2273              :     {
    2274            2 :         res = PQexec(conn, str->data);
    2275            2 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    2276              :         {
    2277            0 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2278              :                                  "could not create subscription \"%s\" in database \"%s\": %s",
    2279            0 :                                  dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
    2280            0 :             disconnect_database(conn, true);
    2281              :         }
    2282            2 :         PQclear(res);
    2283              :     }
    2284              : 
    2285            8 :     destroyPQExpBuffer(str);
    2286            8 : }
    2287              : 
    2288              : /*
    2289              :  * Sets the replication progress to the consistent LSN.
    2290              :  *
    2291              :  * The subscriber caught up to the consistent LSN provided by the last
    2292              :  * replication slot that was created. The goal is to set up the initial
    2293              :  * location for the logical replication that is the exact LSN that the
    2294              :  * subscriber was promoted. Once the subscription is enabled it will start
    2295              :  * streaming from that location onwards.  In dry run mode, the subscription OID
    2296              :  * and LSN are set to invalid values for printing purposes.
    2297              :  */
    2298              : static void
    2299            8 : set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
    2300              : {
    2301            8 :     PQExpBuffer str = createPQExpBuffer();
    2302              :     PGresult   *res;
    2303              :     Oid         suboid;
    2304              :     char       *subname;
    2305              :     char       *dbname;
    2306              :     char       *originname;
    2307              :     char       *lsnstr;
    2308              : 
    2309              :     Assert(conn != NULL);
    2310              : 
    2311            8 :     subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
    2312            8 :     dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
    2313              : 
    2314            8 :     appendPQExpBuffer(str,
    2315              :                       "SELECT s.oid FROM pg_catalog.pg_subscription s "
    2316              :                       "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
    2317              :                       "WHERE s.subname = %s AND d.datname = %s",
    2318              :                       subname, dbname);
    2319              : 
    2320            8 :     res = PQexec(conn, str->data);
    2321            8 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    2322              :     {
    2323            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2324              :                              "could not obtain subscription OID: %s",
    2325              :                              PQresultErrorMessage(res));
    2326            0 :         disconnect_database(conn, true);
    2327              :     }
    2328              : 
    2329            8 :     if (PQntuples(res) != 1 && !dry_run)
    2330              :     {
    2331            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2332              :                              "could not obtain subscription OID: got %d rows, expected %d row",
    2333              :                              PQntuples(res), 1);
    2334            0 :         disconnect_database(conn, true);
    2335              :     }
    2336              : 
    2337            8 :     if (dry_run)
    2338              :     {
    2339            6 :         suboid = InvalidOid;
    2340            6 :         lsnstr = psprintf("%X/%08X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
    2341              :     }
    2342              :     else
    2343              :     {
    2344            2 :         suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
    2345            2 :         lsnstr = psprintf("%s", lsn);
    2346              :     }
    2347              : 
    2348            8 :     PQclear(res);
    2349              : 
    2350              :     /*
    2351              :      * The origin name is defined as pg_%u. %u is the subscription OID. See
    2352              :      * ApplyWorkerMain().
    2353              :      */
    2354            8 :     originname = psprintf("pg_%u", suboid);
    2355              : 
    2356            8 :     if (dry_run)
    2357            6 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2358              :                              "dry-run: would set the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
    2359            6 :                              originname, lsnstr, dbinfo->dbname);
    2360              :     else
    2361            2 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2362              :                              "setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
    2363            2 :                              originname, lsnstr, dbinfo->dbname);
    2364              : 
    2365            8 :     resetPQExpBuffer(str);
    2366            8 :     appendPQExpBuffer(str,
    2367              :                       "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
    2368              :                       originname, lsnstr);
    2369              : 
    2370            8 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    2371              :                          "command is: %s", str->data);
    2372              : 
    2373            8 :     if (!dry_run)
    2374              :     {
    2375            2 :         res = PQexec(conn, str->data);
    2376            2 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    2377              :         {
    2378            0 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2379              :                                  "could not set replication progress for subscription \"%s\": %s",
    2380            0 :                                  dbinfo->subname, PQresultErrorMessage(res));
    2381            0 :             disconnect_database(conn, true);
    2382              :         }
    2383            2 :         PQclear(res);
    2384              :     }
    2385              : 
    2386            8 :     PQfreemem(subname);
    2387            8 :     PQfreemem(dbname);
    2388            8 :     pg_free(originname);
    2389            8 :     pg_free(lsnstr);
    2390            8 :     destroyPQExpBuffer(str);
    2391            8 : }
    2392              : 
    2393              : /*
    2394              :  * Enables the subscription.
    2395              :  *
    2396              :  * The subscription was created in a previous step but it was disabled. After
    2397              :  * adjusting the initial logical replication location, enable the subscription.
    2398              :  */
    2399              : static void
    2400            8 : enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
    2401              : {
    2402            8 :     PQExpBuffer str = createPQExpBuffer();
    2403              :     PGresult   *res;
    2404              :     char       *subname;
    2405              : 
    2406              :     Assert(conn != NULL);
    2407              : 
    2408            8 :     subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
    2409              : 
    2410            8 :     if (dry_run)
    2411            6 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2412              :                              "dry-run: would enable subscription \"%s\" in database \"%s\"",
    2413            6 :                              dbinfo->subname, dbinfo->dbname);
    2414              :     else
    2415            2 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2416              :                              "enabling subscription \"%s\" in database \"%s\"",
    2417            2 :                              dbinfo->subname, dbinfo->dbname);
    2418              : 
    2419            8 :     appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
    2420              : 
    2421            8 :     report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
    2422              :                          "command is: %s", str->data);
    2423              : 
    2424            8 :     if (!dry_run)
    2425              :     {
    2426            2 :         res = PQexec(conn, str->data);
    2427            2 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    2428              :         {
    2429            0 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2430              :                                  "could not enable subscription \"%s\": %s",
    2431            0 :                                  dbinfo->subname, PQresultErrorMessage(res));
    2432            0 :             disconnect_database(conn, true);
    2433              :         }
    2434              : 
    2435            2 :         PQclear(res);
    2436              :     }
    2437              : 
    2438            8 :     PQfreemem(subname);
    2439            8 :     destroyPQExpBuffer(str);
    2440            8 : }
    2441              : 
    2442              : /*
    2443              :  * Fetch a list of all connectable non-template databases from the source server
    2444              :  * and form a list such that they appear as if the user has specified multiple
    2445              :  * --database options, one for each source database.
    2446              :  */
    2447              : static void
    2448            1 : get_publisher_databases(struct CreateSubscriberOptions *opt,
    2449              :                         bool dbnamespecified)
    2450              : {
    2451              :     PGconn     *conn;
    2452              :     PGresult   *res;
    2453              : 
    2454              :     /* If a database name was specified, just connect to it. */
    2455            1 :     if (dbnamespecified)
    2456            0 :         conn = connect_database(opt->pub_conninfo_str, true);
    2457              :     else
    2458              :     {
    2459              :         /* Otherwise, try postgres first and then template1. */
    2460              :         char       *conninfo;
    2461              : 
    2462            1 :         conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "postgres");
    2463            1 :         conn = connect_database(conninfo, false);
    2464            1 :         pg_free(conninfo);
    2465            1 :         if (!conn)
    2466              :         {
    2467            0 :             conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "template1");
    2468            0 :             conn = connect_database(conninfo, true);
    2469            0 :             pg_free(conninfo);
    2470              :         }
    2471              :     }
    2472              : 
    2473            1 :     res = PQexec(conn, "SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1");
    2474            1 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    2475              :     {
    2476            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2477              :                              "could not obtain a list of databases: %s",
    2478              :                              PQresultErrorMessage(res));
    2479            0 :         PQclear(res);
    2480            0 :         disconnect_database(conn, true);
    2481              :     }
    2482              : 
    2483            4 :     for (int i = 0; i < PQntuples(res); i++)
    2484              :     {
    2485            3 :         const char *dbname = PQgetvalue(res, i, 0);
    2486              : 
    2487            3 :         simple_string_list_append(&opt->database_names, dbname);
    2488              : 
    2489              :         /* Increment num_dbs to reflect multiple --database options */
    2490            3 :         num_dbs++;
    2491              :     }
    2492              : 
    2493            1 :     PQclear(res);
    2494            1 :     disconnect_database(conn, false);
    2495            1 : }
    2496              : 
    2497              : int
    2498           23 : main(int argc, char **argv)
    2499              : {
    2500              :     static struct option long_options[] =
    2501              :     {
    2502              :         {"all", no_argument, NULL, 'a'},
    2503              :         {"database", required_argument, NULL, 'd'},
    2504              :         {"pgdata", required_argument, NULL, 'D'},
    2505              :         {"logdir", required_argument, NULL, 'l'},
    2506              :         {"dry-run", no_argument, NULL, 'n'},
    2507              :         {"subscriber-port", required_argument, NULL, 'p'},
    2508              :         {"publisher-server", required_argument, NULL, 'P'},
    2509              :         {"socketdir", required_argument, NULL, 's'},
    2510              :         {"recovery-timeout", required_argument, NULL, 't'},
    2511              :         {"enable-two-phase", no_argument, NULL, 'T'},
    2512              :         {"subscriber-username", required_argument, NULL, 'U'},
    2513              :         {"verbose", no_argument, NULL, 'v'},
    2514              :         {"version", no_argument, NULL, 'V'},
    2515              :         {"help", no_argument, NULL, '?'},
    2516              :         {"config-file", required_argument, NULL, 1},
    2517              :         {"publication", required_argument, NULL, 2},
    2518              :         {"replication-slot", required_argument, NULL, 3},
    2519              :         {"subscription", required_argument, NULL, 4},
    2520              :         {"clean", required_argument, NULL, 5},
    2521              :         {NULL, 0, NULL, 0}
    2522              :     };
    2523              : 
    2524           23 :     struct CreateSubscriberOptions opt = {0};
    2525              : 
    2526              :     int         c;
    2527              :     int         option_index;
    2528              : 
    2529              :     char       *pub_base_conninfo;
    2530              :     char       *sub_base_conninfo;
    2531           23 :     char       *dbname_conninfo = NULL;
    2532              : 
    2533              :     uint64      pub_sysid;
    2534              :     uint64      sub_sysid;
    2535              :     struct stat statbuf;
    2536              : 
    2537              :     char       *consistent_lsn;
    2538              : 
    2539              :     char        pidfile[MAXPGPATH];
    2540              : 
    2541           23 :     pg_logging_init(argv[0]);
    2542           23 :     pg_logging_set_level(PG_LOG_WARNING);
    2543           23 :     progname = get_progname(argv[0]);
    2544           23 :     set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
    2545              : 
    2546           23 :     if (argc > 1)
    2547              :     {
    2548           22 :         if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
    2549              :         {
    2550            1 :             usage();
    2551            1 :             exit(0);
    2552              :         }
    2553           21 :         else if (strcmp(argv[1], "-V") == 0
    2554           21 :                  || strcmp(argv[1], "--version") == 0)
    2555              :         {
    2556            1 :             puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
    2557            1 :             exit(0);
    2558              :         }
    2559              :     }
    2560              : 
    2561              :     /* Default settings */
    2562           21 :     subscriber_dir = NULL;
    2563           21 :     opt.config_file = NULL;
    2564           21 :     opt.log_dir = NULL;
    2565           21 :     opt.pub_conninfo_str = NULL;
    2566           21 :     opt.socket_dir = NULL;
    2567           21 :     opt.sub_port = DEFAULT_SUB_PORT;
    2568           21 :     opt.sub_username = NULL;
    2569           21 :     opt.two_phase = false;
    2570           21 :     opt.database_names = (SimpleStringList)
    2571              :     {
    2572              :         0
    2573              :     };
    2574           21 :     opt.recovery_timeout = 0;
    2575           21 :     opt.all_dbs = false;
    2576              : 
    2577              :     /*
    2578              :      * Don't allow it to be run as root. It uses pg_ctl which does not allow
    2579              :      * it either.
    2580              :      */
    2581              : #ifndef WIN32
    2582           21 :     if (geteuid() == 0)
    2583              :     {
    2584            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2585              :                              "cannot be executed by \"root\"");
    2586            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
    2587              :                              "You must run %s as the PostgreSQL superuser.",
    2588              :                              progname);
    2589            0 :         exit(1);
    2590              :     }
    2591              : #endif
    2592              : 
    2593           21 :     get_restricted_token();
    2594              : 
    2595          163 :     while ((c = getopt_long(argc, argv, "ad:D:l:np:P:s:t:TU:v",
    2596          163 :                             long_options, &option_index)) != -1)
    2597              :     {
    2598          145 :         switch (c)
    2599              :         {
    2600            3 :             case 'a':
    2601            3 :                 opt.all_dbs = true;
    2602            3 :                 break;
    2603           25 :             case 'd':
    2604           25 :                 if (!simple_string_list_member(&opt.database_names, optarg))
    2605              :                 {
    2606           24 :                     simple_string_list_append(&opt.database_names, optarg);
    2607           24 :                     num_dbs++;
    2608              :                 }
    2609              :                 else
    2610            1 :                     report_createsub_fatal("database \"%s\" specified more than once for -d/--database", optarg);
    2611           24 :                 break;
    2612           19 :             case 'D':
    2613           19 :                 subscriber_dir = pg_strdup(optarg);
    2614           19 :                 canonicalize_path(subscriber_dir);
    2615           19 :                 break;
    2616            1 :             case 'l':
    2617            1 :                 opt.log_dir = pg_strdup(optarg);
    2618            1 :                 canonicalize_path(opt.log_dir);
    2619            1 :                 break;
    2620            9 :             case 'n':
    2621            9 :                 dry_run = true;
    2622            9 :                 break;
    2623           12 :             case 'p':
    2624           12 :                 opt.sub_port = pg_strdup(optarg);
    2625           12 :                 break;
    2626           18 :             case 'P':
    2627           18 :                 opt.pub_conninfo_str = pg_strdup(optarg);
    2628           18 :                 break;
    2629           12 :             case 's':
    2630           12 :                 opt.socket_dir = pg_strdup(optarg);
    2631           12 :                 canonicalize_path(opt.socket_dir);
    2632           12 :                 break;
    2633            3 :             case 't':
    2634            3 :                 opt.recovery_timeout = atoi(optarg);
    2635            3 :                 break;
    2636            1 :             case 'T':
    2637            1 :                 opt.two_phase = true;
    2638            1 :                 break;
    2639            0 :             case 'U':
    2640            0 :                 opt.sub_username = pg_strdup(optarg);
    2641            0 :                 break;
    2642           19 :             case 'v':
    2643           19 :                 pg_logging_increase_verbosity();
    2644           19 :                 break;
    2645            0 :             case 1:
    2646            0 :                 opt.config_file = pg_strdup(optarg);
    2647            0 :                 break;
    2648           12 :             case 2:
    2649           12 :                 if (!simple_string_list_member(&opt.pub_names, optarg))
    2650              :                 {
    2651           11 :                     simple_string_list_append(&opt.pub_names, optarg);
    2652           11 :                     num_pubs++;
    2653              :                 }
    2654              :                 else
    2655            1 :                     report_createsub_fatal("publication \"%s\" specified more than once for --publication", optarg);
    2656           11 :                 break;
    2657            4 :             case 3:
    2658            4 :                 if (!simple_string_list_member(&opt.replslot_names, optarg))
    2659              :                 {
    2660            4 :                     simple_string_list_append(&opt.replslot_names, optarg);
    2661            4 :                     num_replslots++;
    2662              :                 }
    2663              :                 else
    2664            0 :                     report_createsub_fatal("replication slot \"%s\" specified more than once for --replication-slot", optarg);
    2665            4 :                 break;
    2666            5 :             case 4:
    2667            5 :                 if (!simple_string_list_member(&opt.sub_names, optarg))
    2668              :                 {
    2669            5 :                     simple_string_list_append(&opt.sub_names, optarg);
    2670            5 :                     num_subs++;
    2671              :                 }
    2672              :                 else
    2673            0 :                     report_createsub_fatal("subscription \"%s\" specified more than once for --subscription", optarg);
    2674            5 :                 break;
    2675            1 :             case 5:
    2676            1 :                 if (!simple_string_list_member(&opt.objecttypes_to_clean, optarg))
    2677            1 :                     simple_string_list_append(&opt.objecttypes_to_clean, optarg);
    2678              :                 else
    2679            0 :                     report_createsub_fatal("object type \"%s\" specified more than once for --clean", optarg);
    2680            1 :                 break;
    2681            1 :             default:
    2682              :                 /* getopt_long already emitted a complaint */
    2683            1 :                 report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
    2684              :                                      "Try \"%s --help\" for more information.",
    2685              :                                      progname);
    2686            1 :                 exit(1);
    2687              :         }
    2688              :     }
    2689              : 
    2690              :     /* Validate that --all is not used with incompatible options */
    2691           18 :     if (opt.all_dbs)
    2692              :     {
    2693            3 :         char       *bad_switch = NULL;
    2694              : 
    2695            3 :         if (num_dbs > 0)
    2696            1 :             bad_switch = "--database";
    2697            2 :         else if (num_pubs > 0)
    2698            1 :             bad_switch = "--publication";
    2699            1 :         else if (num_replslots > 0)
    2700            0 :             bad_switch = "--replication-slot";
    2701            1 :         else if (num_subs > 0)
    2702            0 :             bad_switch = "--subscription";
    2703              : 
    2704            3 :         if (bad_switch)
    2705              :         {
    2706            2 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2707              :                                  "options %s and %s cannot be used together",
    2708              :                                  bad_switch, "-a/--all");
    2709            2 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
    2710              :                                  "Try \"%s --help\" for more information.",
    2711              :                                  progname);
    2712            2 :             exit(1);
    2713              :         }
    2714              :     }
    2715              : 
    2716              :     /* Any non-option arguments? */
    2717           16 :     if (optind < argc)
    2718              :     {
    2719            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2720              :                              "too many command-line arguments (first is \"%s\")",
    2721            0 :                              argv[optind]);
    2722            0 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
    2723              :                              "Try \"%s --help\" for more information.", progname);
    2724            0 :         exit(1);
    2725              :     }
    2726              : 
    2727              :     /* Required arguments */
    2728           16 :     if (subscriber_dir == NULL)
    2729              :     {
    2730            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2731              :                              "no subscriber data directory specified");
    2732            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
    2733              :                              "Try \"%s --help\" for more information.", progname);
    2734            1 :         exit(1);
    2735              :     }
    2736              : 
    2737              :     /* If socket directory is not provided, use the current directory */
    2738           15 :     if (opt.socket_dir == NULL)
    2739              :     {
    2740              :         char        cwd[MAXPGPATH];
    2741              : 
    2742            5 :         if (!getcwd(cwd, MAXPGPATH))
    2743            0 :             report_createsub_fatal("could not determine current directory");
    2744            5 :         opt.socket_dir = pg_strdup(cwd);
    2745            5 :         canonicalize_path(opt.socket_dir);
    2746              :     }
    2747              : 
    2748              :     /*
    2749              :      * Parse connection string. Build a base connection string that might be
    2750              :      * reused by multiple databases.
    2751              :      */
    2752           15 :     if (opt.pub_conninfo_str == NULL)
    2753              :     {
    2754              :         /*
    2755              :          * TODO use primary_conninfo (if available) from subscriber and
    2756              :          * extract publisher connection string. Assume that there are
    2757              :          * identical entries for physical and logical replication. If there is
    2758              :          * not, we would fail anyway.
    2759              :          */
    2760            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2761              :                              "no publisher connection string specified");
    2762            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
    2763              :                              "Try \"%s --help\" for more information.", progname);
    2764            1 :         exit(1);
    2765              :     }
    2766              : 
    2767           14 :     if (opt.log_dir != NULL)
    2768              :     {
    2769              :         char       *internal_log_file;
    2770              : 
    2771            1 :         umask(PG_MODE_MASK_OWNER);
    2772              : 
    2773              :         /*
    2774              :          * Set mask based on PGDATA permissions, needed for the creation of
    2775              :          * the output directories with correct permissions, similar to
    2776              :          * pg_ctl and pg_upgrade.
    2777              :          *
    2778              :          * Don't error here if the data directory cannot be stat'd. Upcoming
    2779              :          * checks for the data directory would raise the fatal error later.
    2780              :          */
    2781            1 :         if (GetDataDirectoryCreatePerm(subscriber_dir))
    2782            1 :             umask(pg_mode_mask);
    2783              : 
    2784            1 :         make_output_dirs(opt.log_dir);
    2785            1 :         internal_log_file = psprintf("%s/%s", logdir, INTERNAL_LOG_FILE_NAME);
    2786              : 
    2787              :         /* logfile_open() will exit if there is an error */
    2788            1 :         internal_log_file_fp = logfile_open(internal_log_file, "a");
    2789            1 :         pg_free(internal_log_file);
    2790              :     }
    2791              : 
    2792           14 :     if (dry_run)
    2793            8 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2794              :                              "Executing in dry-run mode.\n"
    2795              :                              "The target directory will not be modified.");
    2796              : 
    2797           14 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2798              :                          "validating publisher connection string");
    2799           14 :     pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
    2800              :                                           &dbname_conninfo);
    2801           14 :     if (pub_base_conninfo == NULL)
    2802            0 :         exit(1);
    2803              : 
    2804           14 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2805              :                          "validating subscriber connection string");
    2806           14 :     sub_base_conninfo = get_sub_conninfo(&opt);
    2807              : 
    2808              :     /*
    2809              :      * Fetch all databases from the source (publisher) and treat them as if
    2810              :      * the user had specified multiple --database options, one for each
    2811              :      * source database.
    2812              :      */
    2813           14 :     if (opt.all_dbs)
    2814              :     {
    2815            1 :         bool        dbnamespecified = (dbname_conninfo != NULL);
    2816              : 
    2817            1 :         get_publisher_databases(&opt, dbnamespecified);
    2818              :     }
    2819              : 
    2820           14 :     if (opt.database_names.head == NULL)
    2821              :     {
    2822            2 :         report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2823              :                              "no database was specified");
    2824              : 
    2825              :         /*
    2826              :          * Try to obtain the dbname from the publisher conninfo. If dbname
    2827              :          * parameter is not available, error out.
    2828              :          */
    2829            2 :         if (dbname_conninfo)
    2830              :         {
    2831            1 :             simple_string_list_append(&opt.database_names, dbname_conninfo);
    2832            1 :             num_dbs++;
    2833              : 
    2834            1 :             report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2835              :                                  "database name \"%s\" was extracted from the publisher connection string",
    2836              :                                  dbname_conninfo);
    2837              :         }
    2838              :         else
    2839              :         {
    2840            1 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2841              :                                  "no database name specified");
    2842            1 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
    2843              :                                  "Try \"%s --help\" for more information.",
    2844              :                                  progname);
    2845            1 :             exit(1);
    2846              :         }
    2847              :     }
    2848              : 
    2849              :     /* Number of object names must match number of databases */
    2850           13 :     if (num_pubs > 0 && num_pubs != num_dbs)
    2851              :     {
    2852            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2853              :                              "wrong number of publication names specified");
    2854            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
    2855              :                              "The number of specified publication names (%d) must match the number of specified database names (%d).",
    2856              :                              num_pubs, num_dbs);
    2857            1 :         exit(1);
    2858              :     }
    2859           12 :     if (num_subs > 0 && num_subs != num_dbs)
    2860              :     {
    2861            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2862              :                              "wrong number of subscription names specified");
    2863            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
    2864              :                              "The number of specified subscription names (%d) must match the number of specified database names (%d).",
    2865              :                              num_subs, num_dbs);
    2866            1 :         exit(1);
    2867              :     }
    2868           11 :     if (num_replslots > 0 && num_replslots != num_dbs)
    2869              :     {
    2870            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2871              :                              "wrong number of replication slot names specified");
    2872            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
    2873              :                              "The number of specified replication slot names (%d) must match the number of specified database names (%d).",
    2874              :                              num_replslots, num_dbs);
    2875            1 :         exit(1);
    2876              :     }
    2877              : 
    2878              :     /* Verify the object types specified for removal from the subscriber */
    2879           11 :     for (SimpleStringListCell *cell = opt.objecttypes_to_clean.head; cell; cell = cell->next)
    2880              :     {
    2881            1 :         if (pg_strcasecmp(cell->val, "publications") == 0)
    2882            1 :             dbinfos.objecttypes_to_clean |= OBJECTTYPE_PUBLICATIONS;
    2883              :         else
    2884              :         {
    2885            0 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2886              :                                  "invalid object type \"%s\" specified for %s",
    2887            0 :                                  cell->val, "--clean");
    2888            0 :             report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
    2889              :                                  "The valid value is: \"%s\"", "publications");
    2890            0 :             exit(1);
    2891              :         }
    2892              :     }
    2893              : 
    2894              :     /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
    2895           10 :     pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
    2896           10 :     pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
    2897              : 
    2898              :     /* Rudimentary check for a data directory */
    2899           10 :     check_data_directory(subscriber_dir);
    2900              : 
    2901           10 :     dbinfos.two_phase = opt.two_phase;
    2902              : 
    2903              :     /*
    2904              :      * Store database information for publisher and subscriber. It should be
    2905              :      * called before atexit() because its return value is used in
    2906              :      * cleanup_objects_atexit().
    2907              :      */
    2908           10 :     dbinfos.dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
    2909              : 
    2910              :     /* Register a function to clean up objects in case of failure */
    2911           10 :     atexit(cleanup_objects_atexit);
    2912              : 
    2913              :     /*
    2914              :      * Check if the subscriber data directory has the same system identifier
    2915              :      * as the publisher data directory.
    2916              :      */
    2917           10 :     pub_sysid = get_primary_sysid(dbinfos.dbinfo[0].pubconninfo);
    2918           10 :     sub_sysid = get_standby_sysid(subscriber_dir);
    2919           10 :     if (pub_sysid != sub_sysid)
    2920            1 :         report_createsub_fatal("subscriber data directory is not a copy of the source database cluster");
    2921              : 
    2922              :     /* Subscriber PID file */
    2923            9 :     snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
    2924              : 
    2925              :     /*
    2926              :      * The standby server must not be running. If the server is started under
    2927              :      * service manager and pg_createsubscriber stops it, the service manager
    2928              :      * might react to this action and start the server again. Therefore,
    2929              :      * refuse to proceed if the server is running to avoid possible failures.
    2930              :      */
    2931            9 :     if (stat(pidfile, &statbuf) == 0)
    2932              :     {
    2933            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
    2934              :                              "standby server is running");
    2935            1 :         report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
    2936              :                              "Stop the standby server and try again.");
    2937            1 :         exit(1);
    2938              :     }
    2939              : 
    2940              :     /*
    2941              :      * Start a short-lived standby server with temporary parameters (provided
    2942              :      * by command-line options). The goal is to avoid connections during the
    2943              :      * transformation steps.
    2944              :      */
    2945            8 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2946              :                          "starting the standby server with command-line options");
    2947            8 :     start_standby_server(&opt, true, false);
    2948              : 
    2949              :     /* Check if the standby server is ready for logical replication */
    2950            8 :     check_subscriber(dbinfos.dbinfo);
    2951              : 
    2952              :     /* Check if the primary server is ready for logical replication */
    2953            6 :     check_publisher(dbinfos.dbinfo);
    2954              : 
    2955              :     /*
    2956              :      * Stop the target server. The recovery process requires that the server
    2957              :      * reaches a consistent state before targeting the recovery stop point.
    2958              :      * Make sure a consistent state is reached (stopping the target server
    2959              :      * guarantees it) *before* creating the replication slots in
    2960              :      * setup_publisher().
    2961              :      */
    2962            4 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2963              :                          "stopping the subscriber");
    2964            4 :     stop_standby_server(subscriber_dir);
    2965              : 
    2966              :     /* Create the required objects for each database on publisher */
    2967            4 :     consistent_lsn = setup_publisher(dbinfos.dbinfo);
    2968              : 
    2969              :     /* Write the required recovery parameters */
    2970            4 :     setup_recovery(dbinfos.dbinfo, subscriber_dir, consistent_lsn);
    2971              : 
    2972              :     /*
    2973              :      * Start subscriber so the recovery parameters will take effect. Wait
    2974              :      * until accepting connections. We don't want to start logical replication
    2975              :      * during setup.
    2976              :      */
    2977            4 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    2978              :                          "starting the subscriber");
    2979            4 :     start_standby_server(&opt, true, true);
    2980              : 
    2981              :     /* Wait for the subscriber to be promoted */
    2982            4 :     wait_for_end_recovery(dbinfos.dbinfo[0].subconninfo, &opt);
    2983              : 
    2984              :     /*
    2985              :      * Create the subscription for each database on subscriber. It does not
    2986              :      * enable it immediately because it needs to adjust the replication start
    2987              :      * point to the LSN reported by setup_publisher().  It also cleans up
    2988              :      * publications created by this tool and replication to the standby.
    2989              :      */
    2990            4 :     setup_subscriber(dbinfos.dbinfo, consistent_lsn);
    2991              : 
    2992              :     /* Remove primary_slot_name if it exists on primary */
    2993            4 :     drop_primary_replication_slot(dbinfos.dbinfo, primary_slot_name);
    2994              : 
    2995              :     /* Remove failover replication slots if they exist on subscriber */
    2996            4 :     drop_failover_replication_slots(dbinfos.dbinfo);
    2997              : 
    2998              :     /* Stop the subscriber */
    2999            4 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    3000              :                          "stopping the subscriber");
    3001            4 :     stop_standby_server(subscriber_dir);
    3002              : 
    3003              :     /* Change the system identifier of the subscriber */
    3004            4 :     modify_subscriber_sysid(&opt);
    3005              : 
    3006            4 :     success = true;
    3007              : 
    3008            4 :     report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
    3009              :                          "Done!");
    3010              : 
    3011            4 :     return 0;
    3012              : }
        

Generated by: LCOV version 2.0-1