LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - pg_createsubscriber.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 649 775 83.7 %
Date: 2024-05-02 00:11:32 Functions: 33 33 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_createsubscriber.c
       4             :  *    Create a new logical replica from a standby server
       5             :  *
       6             :  * Copyright (C) 2024, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *    src/bin/pg_basebackup/pg_createsubscriber.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : 
#include "postgres_fe.h"

#include <sys/time.h>
#include <sys/wait.h>
#include <time.h>

#include "catalog/pg_authid_d.h"
#include "common/connect.h"
#include "common/controldata_utils.h"
#include "common/file_perm.h"
#include "common/logging.h"
#include "common/pg_prng.h"
#include "common/restricted_token.h"
#include "fe_utils/recovery_gen.h"
#include "fe_utils/simple_list.h"
#include "fe_utils/string_utils.h"
#include "getopt_long.h"
      30             : 
      31             : #define DEFAULT_SUB_PORT    "50432"
      32             : 
      33             : /* Command-line options, gathered in one structure that is handed to the helpers */
      34             : struct CreateSubscriberOptions
      35             : {
      36             :     char       *config_file;    /* configuration file (cf. --config-file in usage()) */
      37             :     char       *pub_conninfo_str;   /* publisher connection string */
      38             :     char       *socket_dir;     /* directory for Unix-domain socket, if any; used as host= for the subscriber on non-Windows builds */
      39             :     char       *sub_port;       /* subscriber port number, kept as a string; used as port= for the subscriber */
      40             :     const char *sub_username;   /* subscriber username; NULL means no user= is added */
      41             :     SimpleStringList database_names;    /* list of database names; one LogicalRepInfo is built per entry */
      42             :     SimpleStringList pub_names; /* list of publication names, consumed in database order */
      43             :     SimpleStringList sub_names; /* list of subscription names, consumed in database order */
      44             :     SimpleStringList replslot_names;    /* list of replication slot names, consumed in database order */
      45             :     int         recovery_timeout;   /* stop recovery after this time (presumably seconds, per -t in usage() -- confirm) */
      46             : };
      47             : /* Per-database replication details; the file-level dbinfo array holds one of these per database. */
      48             : struct LogicalRepInfo
      49             : {
      50             :     Oid         oid;            /* database OID (not filled in by store_pub_sub_info()) */
      51             :     char       *dbname;         /* database name (points at the option list entry, not a copy) */
      52             :     char       *pubconninfo;    /* publisher connection string (base conninfo plus dbname) */
      53             :     char       *subconninfo;    /* subscriber connection string (base conninfo plus dbname) */
      54             :     char       *pubname;        /* publication name; NULL when none was specified */
      55             :     char       *subname;        /* subscription name; NULL when none was specified */
      56             :     char       *replslotname;   /* replication slot name; NULL when none was specified */
      57             : 
      58             :     bool        made_replslot;  /* replication slot was created; cleanup_objects_atexit() drops it on failure */
      59             :     bool        made_publication;   /* publication was created; cleanup_objects_atexit() drops it on failure */
      60             : };
      61             : 
      62             : static void cleanup_objects_atexit(void);
      63             : static void usage();
      64             : static char *get_base_conninfo(const char *conninfo, char **dbname);
      65             : static char *get_sub_conninfo(const struct CreateSubscriberOptions *opt);
      66             : static char *get_exec_path(const char *argv0, const char *progname);
      67             : static void check_data_directory(const char *datadir);
      68             : static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
      69             : static struct LogicalRepInfo *store_pub_sub_info(const struct CreateSubscriberOptions *opt,
      70             :                                                  const char *pub_base_conninfo,
      71             :                                                  const char *sub_base_conninfo);
      72             : static PGconn *connect_database(const char *conninfo, bool exit_on_error);
      73             : static void disconnect_database(PGconn *conn, bool exit_on_error);
      74             : static uint64 get_primary_sysid(const char *conninfo);
      75             : static uint64 get_standby_sysid(const char *datadir);
      76             : static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
      77             : static bool server_is_in_recovery(PGconn *conn);
      78             : static char *generate_object_name(PGconn *conn);
      79             : static void check_publisher(const struct LogicalRepInfo *dbinfo);
      80             : static char *setup_publisher(struct LogicalRepInfo *dbinfo);
      81             : static void check_subscriber(const struct LogicalRepInfo *dbinfo);
      82             : static void setup_subscriber(struct LogicalRepInfo *dbinfo,
      83             :                              const char *consistent_lsn);
      84             : static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
      85             :                            const char *lsn);
      86             : static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
      87             :                                           const char *slotname);
      88             : static char *create_logical_replication_slot(PGconn *conn,
      89             :                                              struct LogicalRepInfo *dbinfo);
      90             : static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
      91             :                                   const char *slot_name);
      92             : static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
      93             : static void start_standby_server(const struct CreateSubscriberOptions *opt,
      94             :                                  bool restricted_access);
      95             : static void stop_standby_server(const char *datadir);
      96             : static void wait_for_end_recovery(const char *conninfo,
      97             :                                   const struct CreateSubscriberOptions *opt);
      98             : static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
      99             : static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
     100             : static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
     101             : static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
     102             :                                      const char *lsn);
     103             : static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
     104             : 
     105             : #define USEC_PER_SEC    1000000 /* microseconds in one second */
     106             : #define WAIT_INTERVAL   1       /* 1 second */
     107             : 
     108             : static const char *progname;    /* program name; printed by usage() and used as fallback_application_name */
     109             : 
     110             : static char *primary_slot_name = NULL;  /* NOTE(review): set outside this view; presumably the slot passed to drop_primary_replication_slot() */
     111             : static bool dry_run = false;    /* NOTE(review): set outside this view; presumably reflects --dry-run */
     112             : 
     113             : static bool success = false;    /* when true, cleanup_objects_atexit() returns without doing anything */
     114             : 
     115             : static struct LogicalRepInfo *dbinfo;   /* per-database array, indexed 0 .. num_dbs - 1 */
     116             : static int  num_dbs = 0;        /* number of specified databases */
     117             : static int  num_pubs = 0;       /* number of specified publications */
     118             : static int  num_subs = 0;       /* number of specified subscriptions */
     119             : static int  num_replslots = 0;  /* number of specified replication slots */
     120             : 
     121             : static pg_prng_state prng_state;
     122             : 
     123             : static char *pg_ctl_path = NULL;
     124             : static char *pg_resetwal_path = NULL;
     125             : 
     126             : /* standby / subscriber data directory */
     127             : static char *subscriber_dir = NULL;
     128             : 
     129             : static bool recovery_ended = false; /* when true, cleanup warns that the physical replica must be recreated */
     130             : static bool standby_running = false;    /* when true, cleanup stops the server in subscriber_dir */
     131             : 
     132             : enum WaitPMResult   /* NOTE(review): no use is visible in this part of the file; presumably a postmaster start-up wait result -- confirm it is still needed */
     133             : {
     134             :     POSTMASTER_READY,
     135             :     POSTMASTER_STILL_STARTING
     136             : };
     137             : 
     138             : 
     139             : /*
     140             :  * Cleanup objects that were created by pg_createsubscriber if there is an
     141             :  * error.
     142             :  *
     143             :  * Publications and replication slots are created on primary. Depending on the
     144             :  * step it failed, it should remove the already created objects if it is
     145             :  * possible (sometimes it won't work due to a connection issue).
     146             :  * There is no cleanup on the target server. The steps on the target server are
     147             :  * executed *after* promotion, hence, at this point, a failure means recreate
     148             :  * the physical replica and start again.
     149             :  */
     150             : static void
     151          18 : cleanup_objects_atexit(void)
     152             : {
     153          18 :     if (success)
     154           6 :         return;
     155             : 
     156             :     /*
     157             :      * If the server is promoted, there is no way to use the current setup
     158             :      * again. Warn the user that a new replication setup should be done before
     159             :      * trying again.
     160             :      */
     161          12 :     if (recovery_ended)
     162             :     {
     163           0 :         pg_log_warning("failed after the end of recovery");
     164           0 :         pg_log_warning_hint("The target server cannot be used as a physical replica anymore.  "
     165             :                             "You must recreate the physical replica before continuing.");
     166             :     }
     167             : 
     168          36 :     for (int i = 0; i < num_dbs; i++)
     169             :     {
     170          24 :         if (dbinfo[i].made_publication || dbinfo[i].made_replslot)
     171             :         {
     172             :             PGconn     *conn;
     173             : 
     174           0 :             conn = connect_database(dbinfo[i].pubconninfo, false);
     175           0 :             if (conn != NULL)
     176             :             {
     177           0 :                 if (dbinfo[i].made_publication)
     178           0 :                     drop_publication(conn, &dbinfo[i]);
     179           0 :                 if (dbinfo[i].made_replslot)
     180           0 :                     drop_replication_slot(conn, &dbinfo[i], dbinfo[i].replslotname);
     181           0 :                 disconnect_database(conn, false);
     182             :             }
     183             :             else
     184             :             {
     185             :                 /*
     186             :                  * If a connection could not be established, inform the user
     187             :                  * that some objects were left on primary and should be
     188             :                  * removed before trying again.
     189             :                  */
     190           0 :                 if (dbinfo[i].made_publication)
     191             :                 {
     192           0 :                     pg_log_warning("publication \"%s\" in database \"%s\" on primary might be left behind",
     193             :                                    dbinfo[i].pubname, dbinfo[i].dbname);
     194           0 :                     pg_log_warning_hint("Consider dropping this publication before trying again.");
     195             :                 }
     196           0 :                 if (dbinfo[i].made_replslot)
     197             :                 {
     198           0 :                     pg_log_warning("replication slot \"%s\" in database \"%s\" on primary might be left behind",
     199             :                                    dbinfo[i].replslotname, dbinfo[i].dbname);
     200           0 :                     pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
     201             :                 }
     202             :             }
     203             :         }
     204             :     }
     205             : 
     206          12 :     if (standby_running)
     207           8 :         stop_standby_server(subscriber_dir);
     208             : }
     209             : /* Print the program's help text to stdout, one printf() call per option line. */
     210             : static void
     211           2 : usage(void)
     212             : {
     213           2 :     printf(_("%s creates a new logical replica from a standby server.\n\n"),
     214             :            progname);
     215           2 :     printf(_("Usage:\n"));
     216           2 :     printf(_("  %s [OPTION]...\n"), progname);
     217           2 :     printf(_("\nOptions:\n"));
     218           2 :     printf(_(" -d, --database=DBNAME               database to create a subscription\n"));
     219           2 :     printf(_(" -D, --pgdata=DATADIR                location for the subscriber data directory\n"));
     220           2 :     printf(_(" -n, --dry-run                       dry run, just show what would be done\n"));
     221           2 :     printf(_(" -p, --subscriber-port=PORT          subscriber port number (default %s)\n"), DEFAULT_SUB_PORT); /* default comes from the macro, not a literal */
     222           2 :     printf(_(" -P, --publisher-server=CONNSTR      publisher connection string\n"));
     223           2 :     printf(_(" -s, --socket-directory=DIR          socket directory to use (default current directory)\n"));
     224           2 :     printf(_(" -t, --recovery-timeout=SECS         seconds to wait for recovery to end\n"));
     225           2 :     printf(_(" -U, --subscriber-username=NAME      subscriber username\n"));
     226           2 :     printf(_(" -v, --verbose                       output verbose messages\n"));
     227           2 :     printf(_("     --config-file=FILENAME          use specified main server configuration\n"
     228             :              "                                     file when running target cluster\n")); /* two source lines, one message */
     229           2 :     printf(_("     --publication=NAME              publication name\n"));
     230           2 :     printf(_("     --replication-slot=NAME         replication slot name\n"));
     231           2 :     printf(_("     --subscription=NAME             subscription name\n"));
     232           2 :     printf(_(" -V, --version                       output version information, then exit\n"));
     233           2 :     printf(_(" -?, --help                          show this help, then exit\n"));
     234           2 :     printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
     235           2 :     printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
     236           2 : }
     237             : 
     238             : /*
     239             :  * Validate a connection string. Returns a base connection string that is a
     240             :  * connection string without a database name.
     241             :  *
     242             :  * Since we might process multiple databases, each database name will be
     243             :  * appended to this base connection string to provide a final connection
     244             :  * string. If the second argument (dbname) is not null, returns dbname if the
     245             :  * provided connection string contains it. If option --database is not
     246             :  * provided, uses dbname as the only database to setup the logical replica.
     247             :  *
     248             :  * It is the caller's responsibility to free the returned connection string and
     249             :  * dbname.
     250             :  */
     251             : static char *
     252          26 : get_base_conninfo(const char *conninfo, char **dbname)
     253             : {
     254             :     PQExpBuffer buf;
     255             :     PQconninfoOption *conn_opts;
     256             :     PQconninfoOption *conn_opt;
     257          26 :     char       *errmsg = NULL;
     258             :     char       *ret;
     259             :     int         i;
     260             : 
     261          26 :     conn_opts = PQconninfoParse(conninfo, &errmsg);
     262          26 :     if (conn_opts == NULL)
     263             :     {
     264           0 :         pg_log_error("could not parse connection string: %s", errmsg);
     265           0 :         PQfreemem(errmsg);
     266           0 :         return NULL;
     267             :     }
     268             : 
     269          26 :     buf = createPQExpBuffer();
     270          26 :     i = 0;
     271        1092 :     for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
     272             :     {
     273        1066 :         if (strcmp(conn_opt->keyword, "dbname") == 0 && conn_opt->val != NULL)
     274             :         {
     275          18 :             if (dbname)
     276          18 :                 *dbname = pg_strdup(conn_opt->val);
     277          18 :             continue;
     278             :         }
     279             : 
     280        1048 :         if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
     281             :         {
     282          44 :             if (i > 0)
     283          18 :                 appendPQExpBufferChar(buf, ' ');
     284          44 :             appendPQExpBuffer(buf, "%s=%s", conn_opt->keyword, conn_opt->val);
     285          44 :             i++;
     286             :         }
     287             :     }
     288             : 
     289          26 :     ret = pg_strdup(buf->data);
     290             : 
     291          26 :     destroyPQExpBuffer(buf);
     292          26 :     PQconninfoFree(conn_opts);
     293             : 
     294          26 :     return ret;
     295             : }
     296             : 
     297             : /*
     298             :  * Build a subscriber connection string. Only a few parameters are supported
     299             :  * since it starts a server with restricted access.
     300             :  */
     301             : static char *
     302          26 : get_sub_conninfo(const struct CreateSubscriberOptions *opt)
     303             : {
     304          26 :     PQExpBuffer buf = createPQExpBuffer();
     305             :     char       *ret;
     306             : 
     307          26 :     appendPQExpBuffer(buf, "port=%s", opt->sub_port);
     308             : #if !defined(WIN32)
     309          26 :     appendPQExpBuffer(buf, " host=%s", opt->socket_dir);
     310             : #endif
     311          26 :     if (opt->sub_username != NULL)
     312           0 :         appendPQExpBuffer(buf, " user=%s", opt->sub_username);
     313          26 :     appendPQExpBuffer(buf, " fallback_application_name=%s", progname);
     314             : 
     315          26 :     ret = pg_strdup(buf->data);
     316             : 
     317          26 :     destroyPQExpBuffer(buf);
     318             : 
     319          26 :     return ret;
     320             : }
     321             : 
     322             : /*
     323             :  * Verify if a PostgreSQL binary (progname) is available in the same directory as
     324             :  * pg_createsubscriber and it has the same version.  It returns the absolute
     325             :  * path of the progname.
     326             :  */
     327             : static char *
     328          36 : get_exec_path(const char *argv0, const char *progname)
     329             : {
     330             :     char       *versionstr;
     331             :     char       *exec_path;
     332             :     int         ret;
     333             : 
     334          36 :     versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
     335          36 :     exec_path = pg_malloc(MAXPGPATH);
     336          36 :     ret = find_other_exec(argv0, progname, versionstr, exec_path);
     337             : 
     338          36 :     if (ret < 0)
     339             :     {
     340             :         char        full_path[MAXPGPATH];
     341             : 
     342           0 :         if (find_my_exec(argv0, full_path) < 0)
     343           0 :             strlcpy(full_path, progname, sizeof(full_path));
     344             : 
     345           0 :         if (ret == -1)
     346           0 :             pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
     347             :                      progname, "pg_createsubscriber", full_path);
     348             :         else
     349           0 :             pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
     350             :                      progname, full_path, "pg_createsubscriber");
     351             :     }
     352             : 
     353          36 :     pg_log_debug("%s path is:  %s", progname, exec_path);
     354             : 
     355          36 :     return exec_path;
     356             : }
     357             : 
     358             : /*
     359             :  * Is it a cluster directory? These are preliminary checks. It is far from
     360             :  * making an accurate check. If it is not a clone from the publisher, it will
     361             :  * eventually fail in a future step.
     362             :  */
     363             : static void
     364          18 : check_data_directory(const char *datadir)
     365             : {
     366             :     struct stat statbuf;
     367             :     char        versionfile[MAXPGPATH];
     368             : 
     369          18 :     pg_log_info("checking if directory \"%s\" is a cluster data directory",
     370             :                 datadir);
     371             : 
     372          18 :     if (stat(datadir, &statbuf) != 0)
     373             :     {
     374           0 :         if (errno == ENOENT)
     375           0 :             pg_fatal("data directory \"%s\" does not exist", datadir);
     376             :         else
     377           0 :             pg_fatal("could not access directory \"%s\": %m", datadir);
     378             :     }
     379             : 
     380          18 :     snprintf(versionfile, MAXPGPATH, "%s/PG_VERSION", datadir);
     381          18 :     if (stat(versionfile, &statbuf) != 0 && errno == ENOENT)
     382             :     {
     383           0 :         pg_fatal("directory \"%s\" is not a database cluster directory",
     384             :                  datadir);
     385             :     }
     386          18 : }
     387             : 
     388             : /*
     389             :  * Append database name into a base connection string.
     390             :  *
     391             :  * dbname is the only parameter that changes so it is not included in the base
     392             :  * connection string. This function concatenates dbname to build a "real"
     393             :  * connection string.
     394             :  */
     395             : static char *
     396          68 : concat_conninfo_dbname(const char *conninfo, const char *dbname)
     397             : {
     398          68 :     PQExpBuffer buf = createPQExpBuffer();
     399             :     char       *ret;
     400             : 
     401             :     Assert(conninfo != NULL);
     402             : 
     403          68 :     appendPQExpBufferStr(buf, conninfo);
     404          68 :     appendPQExpBuffer(buf, " dbname=%s", dbname);
     405             : 
     406          68 :     ret = pg_strdup(buf->data);
     407          68 :     destroyPQExpBuffer(buf);
     408             : 
     409          68 :     return ret;
     410             : }
     411             : 
     412             : /*
     413             :  * Store publication and subscription information.
     414             :  *
     415             :  * If publication, replication slot and subscription names were specified,
     416             :  * store it here. Otherwise, a generated name will be assigned to the object in
     417             :  * setup_publisher().
     418             :  */
     419             : static struct LogicalRepInfo *
     420          18 : store_pub_sub_info(const struct CreateSubscriberOptions *opt,
     421             :                    const char *pub_base_conninfo,
     422             :                    const char *sub_base_conninfo)
     423             : {
     424             :     struct LogicalRepInfo *dbinfo;
     425          18 :     SimpleStringListCell *pubcell = NULL;
     426          18 :     SimpleStringListCell *subcell = NULL;
     427          18 :     SimpleStringListCell *replslotcell = NULL;
     428          18 :     int         i = 0;
     429             : 
     430          18 :     dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
     431             : 
     432          18 :     if (num_pubs > 0)
     433           4 :         pubcell = opt->pub_names.head;
     434          18 :     if (num_subs > 0)
     435           2 :         subcell = opt->sub_names.head;
     436          18 :     if (num_replslots > 0)
     437           4 :         replslotcell = opt->replslot_names.head;
     438             : 
     439          52 :     for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
     440             :     {
     441             :         char       *conninfo;
     442             : 
     443             :         /* Fill publisher attributes */
     444          34 :         conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
     445          34 :         dbinfo[i].pubconninfo = conninfo;
     446          34 :         dbinfo[i].dbname = cell->val;
     447          34 :         if (num_pubs > 0)
     448           8 :             dbinfo[i].pubname = pubcell->val;
     449             :         else
     450          26 :             dbinfo[i].pubname = NULL;
     451          34 :         if (num_replslots > 0)
     452           6 :             dbinfo[i].replslotname = replslotcell->val;
     453             :         else
     454          28 :             dbinfo[i].replslotname = NULL;
     455          34 :         dbinfo[i].made_replslot = false;
     456          34 :         dbinfo[i].made_publication = false;
     457             :         /* Fill subscriber attributes */
     458          34 :         conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
     459          34 :         dbinfo[i].subconninfo = conninfo;
     460          34 :         if (num_subs > 0)
     461           4 :             dbinfo[i].subname = subcell->val;
     462             :         else
     463          30 :             dbinfo[i].subname = NULL;
     464             :         /* Other fields will be filled later */
     465             : 
     466          34 :         pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
     467             :                      dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
     468             :                      dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
     469             :                      dbinfo[i].pubconninfo);
     470          34 :         pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s", i,
     471             :                      dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
     472             :                      dbinfo[i].subconninfo);
     473             : 
     474          34 :         if (num_pubs > 0)
     475           8 :             pubcell = pubcell->next;
     476          34 :         if (num_subs > 0)
     477           4 :             subcell = subcell->next;
     478          34 :         if (num_replslots > 0)
     479           6 :             replslotcell = replslotcell->next;
     480             : 
     481          34 :         i++;
     482             :     }
     483             : 
     484          18 :     return dbinfo;
     485             : }
     486             : 
     487             : /*
     488             :  * Open a new connection. If exit_on_error is true, it has an undesired
     489             :  * condition and it should exit immediately.
     490             :  */
     491             : static PGconn *
     492          80 : connect_database(const char *conninfo, bool exit_on_error)
     493             : {
     494             :     PGconn     *conn;
     495             :     PGresult   *res;
     496             : 
     497          80 :     conn = PQconnectdb(conninfo);
     498          80 :     if (PQstatus(conn) != CONNECTION_OK)
     499             :     {
     500           0 :         pg_log_error("connection to database failed: %s",
     501             :                      PQerrorMessage(conn));
     502           0 :         PQfinish(conn);
     503             : 
     504           0 :         if (exit_on_error)
     505           0 :             exit(1);
     506           0 :         return NULL;
     507             :     }
     508             : 
     509             :     /* Secure search_path */
     510          80 :     res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
     511          80 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     512             :     {
     513           0 :         pg_log_error("could not clear search_path: %s",
     514             :                      PQresultErrorMessage(res));
     515           0 :         PQclear(res);
     516           0 :         PQfinish(conn);
     517             : 
     518           0 :         if (exit_on_error)
     519           0 :             exit(1);
     520           0 :         return NULL;
     521             :     }
     522          80 :     PQclear(res);
     523             : 
     524          80 :     return conn;
     525             : }
     526             : 
     527             : /*
     528             :  * Close the connection. If exit_on_error is true, it has an undesired
     529             :  * condition and it should exit immediately.
     530             :  */
     531             : static void
     532          80 : disconnect_database(PGconn *conn, bool exit_on_error)
     533             : {
     534             :     Assert(conn != NULL);
     535             : 
     536          80 :     PQfinish(conn);
     537             : 
     538          80 :     if (exit_on_error)
     539           4 :         exit(1);
     540          76 : }
     541             : 
     542             : /*
     543             :  * Obtain the system identifier using the provided connection. It will be used
     544             :  * to compare if a data directory is a clone of another one.
     545             :  */
     546             : static uint64
     547          18 : get_primary_sysid(const char *conninfo)
     548             : {
     549             :     PGconn     *conn;
     550             :     PGresult   *res;
     551             :     uint64      sysid;
     552             : 
     553          18 :     pg_log_info("getting system identifier from publisher");
     554             : 
     555          18 :     conn = connect_database(conninfo, true);
     556             : 
     557          18 :     res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
     558          18 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     559             :     {
     560           0 :         pg_log_error("could not get system identifier: %s",
     561             :                      PQresultErrorMessage(res));
     562           0 :         disconnect_database(conn, true);
     563             :     }
     564          18 :     if (PQntuples(res) != 1)
     565             :     {
     566           0 :         pg_log_error("could not get system identifier: got %d rows, expected %d row",
     567             :                      PQntuples(res), 1);
     568           0 :         disconnect_database(conn, true);
     569             :     }
     570             : 
     571          18 :     sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
     572             : 
     573          18 :     pg_log_info("system identifier is %llu on publisher",
     574             :                 (unsigned long long) sysid);
     575             : 
     576          18 :     PQclear(res);
     577          18 :     disconnect_database(conn, false);
     578             : 
     579          18 :     return sysid;
     580             : }
     581             : 
     582             : /*
     583             :  * Obtain the system identifier from control file. It will be used to compare
     584             :  * if a data directory is a clone of another one. This routine is used locally
     585             :  * and avoids a connection.
     586             :  */
     587             : static uint64
     588          18 : get_standby_sysid(const char *datadir)
     589             : {
     590             :     ControlFileData *cf;
     591             :     bool        crc_ok;
     592             :     uint64      sysid;
     593             : 
     594          18 :     pg_log_info("getting system identifier from subscriber");
     595             : 
     596          18 :     cf = get_controlfile(datadir, &crc_ok);
     597          18 :     if (!crc_ok)
     598           0 :         pg_fatal("control file appears to be corrupt");
     599             : 
     600          18 :     sysid = cf->system_identifier;
     601             : 
     602          18 :     pg_log_info("system identifier is %llu on subscriber",
     603             :                 (unsigned long long) sysid);
     604             : 
     605          18 :     pg_free(cf);
     606             : 
     607          18 :     return sysid;
     608             : }
     609             : 
     610             : /*
     611             :  * Modify the system identifier. Since a standby server preserves the system
     612             :  * identifier, it makes sense to change it to avoid situations in which WAL
     613             :  * files from one of the systems might be used in the other one.
     614             :  */
     615             : static void
     616           6 : modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
     617             : {
     618             :     ControlFileData *cf;
     619             :     bool        crc_ok;
     620             :     struct timeval tv;
     621             : 
     622             :     char       *cmd_str;
     623             : 
     624           6 :     pg_log_info("modifying system identifier of subscriber");
     625             : 
     626           6 :     cf = get_controlfile(subscriber_dir, &crc_ok);
     627           6 :     if (!crc_ok)
     628           0 :         pg_fatal("control file appears to be corrupt");
     629             : 
     630             :     /*
     631             :      * Select a new system identifier.
     632             :      *
     633             :      * XXX this code was extracted from BootStrapXLOG().
     634             :      */
     635           6 :     gettimeofday(&tv, NULL);
     636           6 :     cf->system_identifier = ((uint64) tv.tv_sec) << 32;
     637           6 :     cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
     638           6 :     cf->system_identifier |= getpid() & 0xFFF;
     639             : 
     640           6 :     if (!dry_run)
     641           2 :         update_controlfile(subscriber_dir, cf, true);
     642             : 
     643           6 :     pg_log_info("system identifier is %llu on subscriber",
     644             :                 (unsigned long long) cf->system_identifier);
     645             : 
     646           6 :     pg_log_info("running pg_resetwal on the subscriber");
     647             : 
     648           6 :     cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
     649             :                        subscriber_dir, DEVNULL);
     650             : 
     651           6 :     pg_log_debug("pg_resetwal command is: %s", cmd_str);
     652             : 
     653           6 :     if (!dry_run)
     654             :     {
     655           2 :         int         rc = system(cmd_str);
     656             : 
     657           2 :         if (rc == 0)
     658           2 :             pg_log_info("subscriber successfully changed the system identifier");
     659             :         else
     660           0 :             pg_fatal("subscriber failed to change system identifier: exit code: %d", rc);
     661             :     }
     662             : 
     663           6 :     pg_free(cf);
     664           6 : }
     665             : 
     666             : /*
     667             :  * Generate an object name using a prefix, database oid and a random integer.
     668             :  * It is used in case the user does not specify an object name (publication,
     669             :  * subscription, replication slot).
     670             :  */
     671             : static char *
     672          10 : generate_object_name(PGconn *conn)
     673             : {
     674             :     PGresult   *res;
     675             :     Oid         oid;
     676             :     uint32      rand;
     677             :     char       *objname;
     678             : 
     679          10 :     res = PQexec(conn,
     680             :                  "SELECT oid FROM pg_catalog.pg_database "
     681             :                  "WHERE datname = pg_catalog.current_database()");
     682          10 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     683             :     {
     684           0 :         pg_log_error("could not obtain database OID: %s",
     685             :                      PQresultErrorMessage(res));
     686           0 :         disconnect_database(conn, true);
     687             :     }
     688             : 
     689          10 :     if (PQntuples(res) != 1)
     690             :     {
     691           0 :         pg_log_error("could not obtain database OID: got %d rows, expected %d row",
     692             :                      PQntuples(res), 1);
     693           0 :         disconnect_database(conn, true);
     694             :     }
     695             : 
     696             :     /* Database OID */
     697          10 :     oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
     698             : 
     699          10 :     PQclear(res);
     700             : 
     701             :     /* Random unsigned integer */
     702          10 :     rand = pg_prng_uint32(&prng_state);
     703             : 
     704             :     /*
     705             :      * Build the object name. The name must not exceed NAMEDATALEN - 1. This
     706             :      * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
     707             :      * '\0').
     708             :      */
     709          10 :     objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
     710             : 
     711          10 :     return objname;
     712             : }
     713             : 
     714             : /*
     715             :  * Create the publications and replication slots in preparation for logical
     716             :  * replication. Returns the LSN from latest replication slot. It will be the
     717             :  * replication start point that is used to adjust the subscriptions (see
     718             :  * set_replication_progress).
     719             :  */
     720             : static char *
     721           6 : setup_publisher(struct LogicalRepInfo *dbinfo)
     722             : {
     723           6 :     char       *lsn = NULL;
     724             : 
     725           6 :     pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
     726             : 
     727          16 :     for (int i = 0; i < num_dbs; i++)
     728             :     {
     729             :         PGconn     *conn;
     730          10 :         char       *genname = NULL;
     731             : 
     732          10 :         conn = connect_database(dbinfo[i].pubconninfo, true);
     733             : 
     734             :         /*
     735             :          * If an object name was not specified as command-line options, assign
     736             :          * a generated object name. The replication slot has a different rule.
     737             :          * The subscription name is assigned to the replication slot name if
     738             :          * no replication slot is specified. It follows the same rule as
     739             :          * CREATE SUBSCRIPTION.
     740             :          */
     741          10 :         if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
     742          10 :             genname = generate_object_name(conn);
     743          10 :         if (num_pubs == 0)
     744           2 :             dbinfo[i].pubname = pg_strdup(genname);
     745          10 :         if (num_subs == 0)
     746           6 :             dbinfo[i].subname = pg_strdup(genname);
     747          10 :         if (num_replslots == 0)
     748           4 :             dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
     749             : 
     750             :         /*
     751             :          * Create publication on publisher. This step should be executed
     752             :          * *before* promoting the subscriber to avoid any transactions between
     753             :          * consistent LSN and the new publication rows (such transactions
     754             :          * wouldn't see the new publication rows resulting in an error).
     755             :          */
     756          10 :         create_publication(conn, &dbinfo[i]);
     757             : 
     758             :         /* Create replication slot on publisher */
     759          10 :         if (lsn)
     760           2 :             pg_free(lsn);
     761          10 :         lsn = create_logical_replication_slot(conn, &dbinfo[i]);
     762          10 :         if (lsn != NULL || dry_run)
     763          10 :             pg_log_info("create replication slot \"%s\" on publisher",
     764             :                         dbinfo[i].replslotname);
     765             :         else
     766           0 :             exit(1);
     767             : 
     768          10 :         disconnect_database(conn, false);
     769             :     }
     770             : 
     771           6 :     return lsn;
     772             : }
     773             : 
     774             : /*
     775             :  * Is recovery still in progress?
     776             :  */
     777             : static bool
     778          48 : server_is_in_recovery(PGconn *conn)
     779             : {
     780             :     PGresult   *res;
     781             :     int         ret;
     782             : 
     783          48 :     res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
     784             : 
     785          48 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     786             :     {
     787           0 :         pg_log_error("could not obtain recovery progress: %s",
     788             :                      PQresultErrorMessage(res));
     789           0 :         disconnect_database(conn, true);
     790             :     }
     791             : 
     792             : 
     793          48 :     ret = strcmp("t", PQgetvalue(res, 0, 0));
     794             : 
     795          48 :     PQclear(res);
     796             : 
     797          48 :     return ret == 0;
     798             : }
     799             : 
     800             : /*
     801             :  * Is the primary server ready for logical replication?
     802             :  *
     803             :  * XXX Does it not allow a synchronous replica?
     804             :  */
     805             : static void
     806          10 : check_publisher(const struct LogicalRepInfo *dbinfo)
     807             : {
     808             :     PGconn     *conn;
     809             :     PGresult   *res;
     810          10 :     bool        failed = false;
     811             : 
     812             :     char       *wal_level;
     813             :     int         max_repslots;
     814             :     int         cur_repslots;
     815             :     int         max_walsenders;
     816             :     int         cur_walsenders;
     817             : 
     818          10 :     pg_log_info("checking settings on publisher");
     819             : 
     820          10 :     conn = connect_database(dbinfo[0].pubconninfo, true);
     821             : 
     822             :     /*
     823             :      * If the primary server is in recovery (i.e. cascading replication),
     824             :      * objects (publication) cannot be created because it is read only.
     825             :      */
     826          10 :     if (server_is_in_recovery(conn))
     827             :     {
     828           2 :         pg_log_error("primary server cannot be in recovery");
     829           2 :         disconnect_database(conn, true);
     830             :     }
     831             : 
     832             :     /*------------------------------------------------------------------------
     833             :      * Logical replication requires a few parameters to be set on publisher.
     834             :      * Since these parameters are not a requirement for physical replication,
     835             :      * we should check it to make sure it won't fail.
     836             :      *
     837             :      * - wal_level = logical
     838             :      * - max_replication_slots >= current + number of dbs to be converted
     839             :      * - max_wal_senders >= current + number of dbs to be converted
     840             :      * -----------------------------------------------------------------------
     841             :      */
     842           8 :     res = PQexec(conn,
     843             :                  "WITH wl AS "
     844             :                  "(SELECT setting AS wallevel FROM pg_catalog.pg_settings "
     845             :                  "WHERE name = 'wal_level'), "
     846             :                  "total_mrs AS "
     847             :                  "(SELECT setting AS tmrs FROM pg_catalog.pg_settings "
     848             :                  "WHERE name = 'max_replication_slots'), "
     849             :                  "cur_mrs AS "
     850             :                  "(SELECT count(*) AS cmrs "
     851             :                  "FROM pg_catalog.pg_replication_slots), "
     852             :                  "total_mws AS "
     853             :                  "(SELECT setting AS tmws FROM pg_catalog.pg_settings "
     854             :                  "WHERE name = 'max_wal_senders'), "
     855             :                  "cur_mws AS "
     856             :                  "(SELECT count(*) AS cmws FROM pg_catalog.pg_stat_activity "
     857             :                  "WHERE backend_type = 'walsender') "
     858             :                  "SELECT wallevel, tmrs, cmrs, tmws, cmws "
     859             :                  "FROM wl, total_mrs, cur_mrs, total_mws, cur_mws");
     860             : 
     861           8 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     862             :     {
     863           0 :         pg_log_error("could not obtain publisher settings: %s",
     864             :                      PQresultErrorMessage(res));
     865           0 :         disconnect_database(conn, true);
     866             :     }
     867             : 
     868           8 :     wal_level = pg_strdup(PQgetvalue(res, 0, 0));
     869           8 :     max_repslots = atoi(PQgetvalue(res, 0, 1));
     870           8 :     cur_repslots = atoi(PQgetvalue(res, 0, 2));
     871           8 :     max_walsenders = atoi(PQgetvalue(res, 0, 3));
     872           8 :     cur_walsenders = atoi(PQgetvalue(res, 0, 4));
     873             : 
     874           8 :     PQclear(res);
     875             : 
     876           8 :     pg_log_debug("publisher: wal_level: %s", wal_level);
     877           8 :     pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
     878           8 :     pg_log_debug("publisher: current replication slots: %d", cur_repslots);
     879           8 :     pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
     880           8 :     pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
     881             : 
     882             :     /*
     883             :      * If standby sets primary_slot_name, check if this replication slot is in
     884             :      * use on primary for WAL retention purposes. This replication slot has no
     885             :      * use after the transformation, hence, it will be removed at the end of
     886             :      * this process.
     887             :      */
     888           8 :     if (primary_slot_name)
     889             :     {
     890           8 :         PQExpBuffer str = createPQExpBuffer();
     891           8 :         char       *psn_esc = PQescapeLiteral(conn, primary_slot_name, strlen(primary_slot_name));
     892             : 
     893           8 :         appendPQExpBuffer(str,
     894             :                           "SELECT 1 FROM pg_catalog.pg_replication_slots "
     895             :                           "WHERE active AND slot_name = %s",
     896             :                           psn_esc);
     897             : 
     898           8 :         pg_free(psn_esc);
     899             : 
     900           8 :         pg_log_debug("command is: %s", str->data);
     901             : 
     902           8 :         res = PQexec(conn, str->data);
     903           8 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     904             :         {
     905           0 :             pg_log_error("could not obtain replication slot information: %s",
     906             :                          PQresultErrorMessage(res));
     907           0 :             disconnect_database(conn, true);
     908             :         }
     909             : 
     910           8 :         if (PQntuples(res) != 1)
     911             :         {
     912           0 :             pg_log_error("could not obtain replication slot information: got %d rows, expected %d row",
     913             :                          PQntuples(res), 1);
     914           0 :             disconnect_database(conn, true);
     915             :         }
     916             :         else
     917           8 :             pg_log_info("primary has replication slot \"%s\"",
     918             :                         primary_slot_name);
     919             : 
     920           8 :         PQclear(res);
     921             :     }
     922             : 
     923           8 :     disconnect_database(conn, false);
     924             : 
     925           8 :     if (strcmp(wal_level, "logical") != 0)
     926             :     {
     927           2 :         pg_log_error("publisher requires wal_level >= \"logical\"");
     928           2 :         failed = true;
     929             :     }
     930             : 
     931           8 :     if (max_repslots - cur_repslots < num_dbs)
     932             :     {
     933           2 :         pg_log_error("publisher requires %d replication slots, but only %d remain",
     934             :                      num_dbs, max_repslots - cur_repslots);
     935           2 :         pg_log_error_hint("Consider increasing max_replication_slots to at least %d.",
     936             :                           cur_repslots + num_dbs);
     937           2 :         failed = true;
     938             :     }
     939             : 
     940           8 :     if (max_walsenders - cur_walsenders < num_dbs)
     941             :     {
     942           2 :         pg_log_error("publisher requires %d wal sender processes, but only %d remain",
     943             :                      num_dbs, max_walsenders - cur_walsenders);
     944           2 :         pg_log_error_hint("Consider increasing max_wal_senders to at least %d.",
     945             :                           cur_walsenders + num_dbs);
     946           2 :         failed = true;
     947             :     }
     948             : 
     949           8 :     pg_free(wal_level);
     950             : 
     951           8 :     if (failed)
     952           2 :         exit(1);
     953           6 : }
     954             : 
     955             : /*
     956             :  * Is the standby server ready for logical replication?
     957             :  *
     958             :  * XXX Does it not allow a time-delayed replica?
     959             :  *
     960             :  * XXX In a cascaded replication scenario (P -> S -> C), if the target server
     961             :  * is S, it cannot detect there is a replica (server C) because server S starts
     962             :  * accepting only local connections and server C cannot connect to it. Hence,
     963             :  * there is not a reliable way to provide a suitable error saying the server C
     964             :  * will be broken at the end of this process (due to pg_resetwal).
     965             :  */
     966             : static void
     967          14 : check_subscriber(const struct LogicalRepInfo *dbinfo)
     968             : {
     969             :     PGconn     *conn;
     970             :     PGresult   *res;
     971          14 :     bool        failed = false;
     972             : 
     973             :     int         max_lrworkers;
     974             :     int         max_repslots;
     975             :     int         max_wprocs;
     976             : 
     977          14 :     pg_log_info("checking settings on subscriber");
     978             : 
     979          14 :     conn = connect_database(dbinfo[0].subconninfo, true);
     980             : 
     981             :     /* The target server must be a standby */
     982          14 :     if (!server_is_in_recovery(conn))
     983             :     {
     984           2 :         pg_log_error("target server must be a standby");
     985           2 :         disconnect_database(conn, true);
     986             :     }
     987             : 
     988             :     /*------------------------------------------------------------------------
     989             :      * Logical replication requires a few parameters to be set on subscriber.
     990             :      * Since these parameters are not a requirement for physical replication,
     991             :      * we should check it to make sure it won't fail.
     992             :      *
     993             :      * - max_replication_slots >= number of dbs to be converted
     994             :      * - max_logical_replication_workers >= number of dbs to be converted
     995             :      * - max_worker_processes >= 1 + number of dbs to be converted
     996             :      *------------------------------------------------------------------------
     997             :      */
     998          12 :     res = PQexec(conn,
     999             :                  "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
    1000             :                  "'max_logical_replication_workers', "
    1001             :                  "'max_replication_slots', "
    1002             :                  "'max_worker_processes', "
    1003             :                  "'primary_slot_name') "
    1004             :                  "ORDER BY name");
    1005             : 
    1006          12 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1007             :     {
    1008           0 :         pg_log_error("could not obtain subscriber settings: %s",
    1009             :                      PQresultErrorMessage(res));
    1010           0 :         disconnect_database(conn, true);
    1011             :     }
    1012             : 
    1013          12 :     max_lrworkers = atoi(PQgetvalue(res, 0, 0));
    1014          12 :     max_repslots = atoi(PQgetvalue(res, 1, 0));
    1015          12 :     max_wprocs = atoi(PQgetvalue(res, 2, 0));
    1016          12 :     if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
    1017          10 :         primary_slot_name = pg_strdup(PQgetvalue(res, 3, 0));
    1018             : 
    1019          12 :     pg_log_debug("subscriber: max_logical_replication_workers: %d",
    1020             :                  max_lrworkers);
    1021          12 :     pg_log_debug("subscriber: max_replication_slots: %d", max_repslots);
    1022          12 :     pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
    1023          12 :     if (primary_slot_name)
    1024          10 :         pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
    1025             : 
    1026          12 :     PQclear(res);
    1027             : 
    1028          12 :     disconnect_database(conn, false);
    1029             : 
    1030          12 :     if (max_repslots < num_dbs)
    1031             :     {
    1032           2 :         pg_log_error("subscriber requires %d replication slots, but only %d remain",
    1033             :                      num_dbs, max_repslots);
    1034           2 :         pg_log_error_hint("Consider increasing max_replication_slots to at least %d.",
    1035             :                           num_dbs);
    1036           2 :         failed = true;
    1037             :     }
    1038             : 
    1039          12 :     if (max_lrworkers < num_dbs)
    1040             :     {
    1041           2 :         pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
    1042             :                      num_dbs, max_lrworkers);
    1043           2 :         pg_log_error_hint("Consider increasing max_logical_replication_workers to at least %d.",
    1044             :                           num_dbs);
    1045           2 :         failed = true;
    1046             :     }
    1047             : 
    1048          12 :     if (max_wprocs < num_dbs + 1)
    1049             :     {
    1050           2 :         pg_log_error("subscriber requires %d worker processes, but only %d remain",
    1051             :                      num_dbs + 1, max_wprocs);
    1052           2 :         pg_log_error_hint("Consider increasing max_worker_processes to at least %d.",
    1053             :                           num_dbs + 1);
    1054           2 :         failed = true;
    1055             :     }
    1056             : 
    1057          12 :     if (failed)
    1058           2 :         exit(1);
    1059          10 : }
    1060             : 
    1061             : /*
    1062             :  * Create the subscriptions, adjust the initial location for logical
    1063             :  * replication and enable the subscriptions. That's the last step for logical
    1064             :  * replication setup.
    1065             :  */
    1066             : static void
    1067           6 : setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
    1068             : {
    1069          16 :     for (int i = 0; i < num_dbs; i++)
    1070             :     {
    1071             :         PGconn     *conn;
    1072             : 
    1073             :         /* Connect to subscriber. */
    1074          10 :         conn = connect_database(dbinfo[i].subconninfo, true);
    1075             : 
    1076             :         /*
    1077             :          * Since the publication was created before the consistent LSN, it is
    1078             :          * available on the subscriber when the physical replica is promoted.
    1079             :          * Remove publications from the subscriber because it has no use.
    1080             :          */
    1081          10 :         drop_publication(conn, &dbinfo[i]);
    1082             : 
    1083          10 :         create_subscription(conn, &dbinfo[i]);
    1084             : 
    1085             :         /* Set the replication progress to the correct LSN */
    1086          10 :         set_replication_progress(conn, &dbinfo[i], consistent_lsn);
    1087             : 
    1088             :         /* Enable subscription */
    1089          10 :         enable_subscription(conn, &dbinfo[i]);
    1090             : 
    1091          10 :         disconnect_database(conn, false);
    1092             :     }
    1093           6 : }
    1094             : 
    1095             : /*
    1096             :  * Write the required recovery parameters.
                        *
                        * A connection to the publisher (dbinfo[0].pubconninfo) is opened so that
                        * GenerateRecoveryConfig() can build the configuration from it. Settings
                        * that reset or pin the recovery target are appended afterwards.
                        *
                        * dbinfo:  per-database settings; only element 0 is read here.
                        * datadir: handed to WriteRecoveryConfig(); unused in dry run mode.
                        * lsn:     written as recovery_target_lsn; unused in dry run mode, where
                        *          an invalid LSN is formatted instead.
                        *
                        * NOTE(review): recoveryconfcontents is never destroyed in this function;
                        * confirm whether the buffer is meant to be released here.
    1097             :  */
    1098             : static void
    1099           6 : setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
    1100             : {
    1101             :     PGconn     *conn;
    1102             :     PQExpBuffer recoveryconfcontents;
    1103             : 
    1104             :     /*
    1105             :      * Although the recovery parameters are written to the subscriber, a
    1106             :      * publisher connection is used. The primary_conninfo is generated using
    1107             :      * the connection settings.
    1108             :      */
    1109           6 :     conn = connect_database(dbinfo[0].pubconninfo, true);
    1110             : 
    1111             :     /*
    1112             :      * Write recovery parameters.
    1113             :      *
    1114             :      * The subscriber is not running yet. In dry run mode, the recovery
    1115             :      * parameters *won't* be written. An invalid LSN is used for printing
    1116             :      * purposes. Additional recovery parameters are added here. It avoids
    1117             :      * unexpected behavior such as end of recovery as soon as a consistent
    1118             :      * state is reached (recovery_target) and failure due to multiple recovery
    1119             :      * targets (name, time, xid, LSN).
    1120             :      */
    1121           6 :     recoveryconfcontents = GenerateRecoveryConfig(conn, NULL, NULL);
    1122           6 :     appendPQExpBuffer(recoveryconfcontents, "recovery_target = ''\n");
    1123           6 :     appendPQExpBuffer(recoveryconfcontents,
    1124             :                       "recovery_target_timeline = 'latest'\n");
    1125           6 :     appendPQExpBuffer(recoveryconfcontents,
    1126             :                       "recovery_target_inclusive = true\n");
    1127           6 :     appendPQExpBuffer(recoveryconfcontents,
    1128             :                       "recovery_target_action = promote\n");
    1129           6 :     appendPQExpBuffer(recoveryconfcontents, "recovery_target_name = ''\n");
    1130           6 :     appendPQExpBuffer(recoveryconfcontents, "recovery_target_time = ''\n");
    1131           6 :     appendPQExpBuffer(recoveryconfcontents, "recovery_target_xid = ''\n");
    1132             : 
                           /*
                            * NOTE(review): the dry run marker below is appended without a trailing
                            * newline, so the recovery_target_lsn text that follows lands on the
                            * same line in the debug output; confirm this is intended.
                            */
    1133           6 :     if (dry_run)
    1134             :     {
    1135           4 :         appendPQExpBuffer(recoveryconfcontents, "# dry run mode");
    1136           4 :         appendPQExpBuffer(recoveryconfcontents,
    1137             :                           "recovery_target_lsn = '%X/%X'\n",
    1138           4 :                           LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
    1139             :     }
    1140             :     else
    1141             :     {
    1142           2 :         appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
    1143             :                           lsn);
    1144           2 :         WriteRecoveryConfig(conn, datadir, recoveryconfcontents);
    1145             :     }
    1146           6 :     disconnect_database(conn, false);
    1147             : 
                           /* The assembled text is logged in both modes, after disconnecting. */
    1148           6 :     pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
    1149           6 : }
    1150             : 
    1151             : /*
    1152             :  * Drop physical replication slot on primary if the standby was using it. After
    1153             :  * the transformation, it has no use.
    1154             :  *
    1155             :  * A failed connection is not treated as fatal here. Instead, a warning is
    1156             :  * logged so the user can drop this replication slot later.
                        *
                        * dbinfo:   per-database settings; element 0 supplies the publisher
                        *           connection string and is passed to drop_replication_slot().
                        * slotname: name of the slot to drop.
                        *
                        * NOTE(review): the early-return guard tests primary_slot_name, a variable
                        * defined outside this function, rather than the slotname argument;
                        * presumably both refer to the same slot -- confirm against the caller.
    1157             :  */
    1158             : static void
    1159           6 : drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
    1160             : {
    1161             :     PGconn     *conn;
    1162             : 
    1163             :     /* Replication slot does not exist, do nothing */
    1164           6 :     if (!primary_slot_name)
    1165           0 :         return;
    1166             : 
                           /* A NULL connection is handled below instead of aborting. */
    1167           6 :     conn = connect_database(dbinfo[0].pubconninfo, false);
    1168           6 :     if (conn != NULL)
    1169             :     {
    1170           6 :         drop_replication_slot(conn, &dbinfo[0], slotname);
    1171           6 :         disconnect_database(conn, false);
    1172             :     }
    1173             :     else
    1174             :     {
    1175           0 :         pg_log_warning("could not drop replication slot \"%s\" on primary",
    1176             :                        slotname);
    1177           0 :         pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
    1178             :     }
    1179             : }
    1180             : 
    1181             : /*
    1182             :  * Create a logical replication slot and return its LSN.
    1183             :  *
    1184             :  * CreateReplicationSlot() is not used because it does not provide the one-row
    1185             :  * result set that contains the LSN.
                        *
                        * The slot name comes from dbinfo->replslotname and the output plugin is
                        * pgoutput. dbinfo->made_replslot is set whenever the error path is not
                        * taken, including in dry run mode.
                        *
                        * Returns a pg_strdup() copy of the lsn column, or NULL if the query
                        * fails. In dry run mode the query is not executed and NULL is returned
                        * as well.
    1186             :  */
    1187             : static char *
    1188          10 : create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
    1189             : {
    1190          10 :     PQExpBuffer str = createPQExpBuffer();
    1191          10 :     PGresult   *res = NULL;
    1192          10 :     const char *slot_name = dbinfo->replslotname;
    1193             :     char       *slot_name_esc;
    1194          10 :     char       *lsn = NULL;
    1195             : 
    1196             :     Assert(conn != NULL);
    1197             : 
    1198          10 :     pg_log_info("creating the replication slot \"%s\" on database \"%s\"",
    1199             :                 slot_name, dbinfo->dbname);
    1200             : 
                           /* The slot name is embedded in the statement as an escaped literal. */
    1201          10 :     slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
    1202             : 
    1203          10 :     appendPQExpBuffer(str,
    1204             :                       "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, false, false)",
    1205             :                       slot_name_esc);
    1206             : 
    1207          10 :     pg_free(slot_name_esc);
    1208             : 
    1209          10 :     pg_log_debug("command is: %s", str->data);
    1210             : 
    1211          10 :     if (!dry_run)
    1212             :     {
    1213           4 :         res = PQexec(conn, str->data);
    1214           4 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1215             :         {
    1216           0 :             pg_log_error("could not create replication slot \"%s\" on database \"%s\": %s",
    1217             :                          slot_name, dbinfo->dbname,
    1218             :                          PQresultErrorMessage(res));
    1219           0 :             PQclear(res);
    1220           0 :             destroyPQExpBuffer(str);
    1221           0 :             return NULL;
    1222             :         }
    1223             : 
                               /* Copy the value out before the result is cleared. */
    1224           4 :         lsn = pg_strdup(PQgetvalue(res, 0, 0));
    1225           4 :         PQclear(res);
    1226             :     }
    1227             : 
    1228             :     /* For cleanup purposes */
    1229          10 :     dbinfo->made_replslot = true;
    1230             : 
    1231          10 :     destroyPQExpBuffer(str);
    1232             : 
    1233          10 :     return lsn;
    1234             : }
    1235             : 
                       /*
                        * Drop the replication slot slot_name using the given connection.
                        *
                        * In dry run mode the statement is only logged. On failure an error is
                        * logged and dbinfo->made_replslot is cleared so the drop is not tried
                        * again; the function returns normally in either case.
                        *
                        * dbinfo->dbname is used for log messages only.
                        */
    1236             : static void
    1237           6 : drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
    1238             :                       const char *slot_name)
    1239             : {
    1240           6 :     PQExpBuffer str = createPQExpBuffer();
    1241             :     char       *slot_name_esc;
    1242             :     PGresult   *res;
    1243             : 
    1244             :     Assert(conn != NULL);
    1245             : 
    1246           6 :     pg_log_info("dropping the replication slot \"%s\" on database \"%s\"",
    1247             :                 slot_name, dbinfo->dbname);
    1248             : 
                           /* The slot name is embedded in the statement as an escaped literal. */
    1249           6 :     slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
    1250             : 
    1251           6 :     appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
    1252             : 
    1253           6 :     pg_free(slot_name_esc);
    1254             : 
    1255           6 :     pg_log_debug("command is: %s", str->data);
    1256             : 
    1257           6 :     if (!dry_run)
    1258             :     {
    1259           2 :         res = PQexec(conn, str->data);
    1260           2 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1261             :         {
    1262           0 :             pg_log_error("could not drop replication slot \"%s\" on database \"%s\": %s",
    1263             :                          slot_name, dbinfo->dbname, PQresultErrorMessage(res));
    1264           0 :             dbinfo->made_replslot = false;   /* don't try again. */
    1265             :         }
    1266             : 
    1267           2 :         PQclear(res);
    1268             :     }
    1269             : 
    1270           6 :     destroyPQExpBuffer(str);
    1271           6 : }
    1272             : 
    1273             : /*
    1274             :  * Reports a suitable message if pg_ctl fails.
                        *
                        * pg_ctl_cmd: the command line that was run; reported in the error detail.
                        * rc:         status to decode; the callers visible in this file pass the
                        *             return value of system().
                        *
                        * Returns quietly when rc is 0. Otherwise an error describing the exit
                        * code, the terminating signal (exception on Windows) or the raw status
                        * is logged, followed by the failed command, and exit(1) is called.
    1275             :  */
    1276             : static void
    1277          40 : pg_ctl_status(const char *pg_ctl_cmd, int rc)
    1278             : {
    1279          40 :     if (rc != 0)
    1280             :     {
    1281           0 :         if (WIFEXITED(rc))
    1282             :         {
    1283           0 :             pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
    1284             :         }
    1285           0 :         else if (WIFSIGNALED(rc))
    1286             :         {
    1287             : #if defined(WIN32)
    1288             :             pg_log_error("pg_ctl was terminated by exception 0x%X",
    1289             :                          WTERMSIG(rc));
    1290             :             pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
    1291             : #else
    1292           0 :             pg_log_error("pg_ctl was terminated by signal %d: %s",
    1293             :                          WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
    1294             : #endif
    1295             :         }
    1296             :         else
    1297             :         {
    1298           0 :             pg_log_error("pg_ctl exited with unrecognized status %d", rc);
    1299             :         }
    1300             : 
    1301           0 :         pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
    1302           0 :         exit(1);
    1303             :     }
    1304          40 : }
    1305             : 
                       /*
                        * Start the server in subscriber_dir through pg_ctl and record it as
                        * running.
                        *
                        * With restricted_access, the port is set to opt->sub_port and, except on
                        * Windows, listen_addresses is emptied, unix_socket_permissions is set to
                        * 0700 and, if opt->socket_dir is set, unix_socket_directories points to
                        * it. When opt->config_file is not NULL it is passed as config_file.
                        *
                        * The command is run with system() and its status is handed to
                        * pg_ctl_status(), which exits on a nonzero status.
                        *
                        * NOTE(review): the config_file value gets no quoting of its own inside
                        * the -o option, unlike socket_dir; confirm paths containing spaces work.
                        */
    1306             : static void
    1307          20 : start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access)
    1308             : {
    1309          20 :     PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
    1310             :     int         rc;
    1311             : 
    1312          20 :     appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D \"%s\" -s",
    1313             :                       pg_ctl_path, subscriber_dir);
    1314          20 :     if (restricted_access)
    1315             :     {
    1316          20 :         appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
    1317             : #if !defined(WIN32)
    1318             : 
    1319             :         /*
    1320             :          * An empty listen_addresses list means the server does not listen on
    1321             :          * any IP interfaces; only Unix-domain sockets can be used to connect
    1322             :          * to the server. Prevent external connections to minimize the chance
    1323             :          * of failure.
    1324             :          */
    1325          20 :         appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
    1326          20 :         if (opt->socket_dir)
    1327          20 :             appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
    1328             :                               opt->socket_dir);
                               /* Close the double quote opened by the -o option above. */
    1329          20 :         appendPQExpBufferChar(pg_ctl_cmd, '"');
    1330             : #endif
    1331             :     }
    1332          20 :     if (opt->config_file != NULL)
    1333           0 :         appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
    1334             :                           opt->config_file);
    1335          20 :     pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
    1336          20 :     rc = system(pg_ctl_cmd->data);
    1337          20 :     pg_ctl_status(pg_ctl_cmd->data, rc);
    1338          20 :     standby_running = true;
    1339          20 :     destroyPQExpBuffer(pg_ctl_cmd);
    1340          20 :     pg_log_info("server was started");
    1341          20 : }
    1342             : 
                       /*
                        * Stop the server whose data directory is datadir through pg_ctl and
                        * record it as no longer running.
                        *
                        * The command is run with system() and its status is handed to
                        * pg_ctl_status(), which exits on a nonzero status.
                        *
                        * NOTE(review): the string returned by psprintf() is not freed in this
                        * function; confirm whether that is acceptable here.
                        */
    1343             : static void
    1344          20 : stop_standby_server(const char *datadir)
    1345             : {
    1346             :     char       *pg_ctl_cmd;
    1347             :     int         rc;
    1348             : 
    1349          20 :     pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
    1350             :                           datadir);
    1351          20 :     pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
    1352          20 :     rc = system(pg_ctl_cmd);
    1353          20 :     pg_ctl_status(pg_ctl_cmd, rc);
    1354          20 :     standby_running = false;
    1355          20 :     pg_log_info("server was stopped");
    1356          20 : }
    1357             : 
    1358             : /*
    1359             :  * Returns after the server finishes the recovery process.
    1360             :  *
    1361             :  * If recovery_timeout option is set, terminate abnormally without finishing
    1362             :  * the recovery process. By default, it waits forever.
                        *
                        * conninfo: connection string used to poll the target server.
                        * opt:      only opt->recovery_timeout (in seconds) is read here.
                        *
                        * The server is polled every WAIT_INTERVAL seconds. In dry run mode the
                        * loop ends on its first iteration. recovery_ended is set once the loop
                        * ends successfully.
    1363             :  */
    1364             : static void
    1365           6 : wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
    1366             : {
    1367             :     PGconn     *conn;
    1368           6 :     int         status = POSTMASTER_STILL_STARTING;
    1369           6 :     int         timer = 0;
    1370           6 :     int         count = 0;      /* number of consecutive connection attempts */
    1371             : 
    1372             : #define NUM_CONN_ATTEMPTS   10
    1373             : 
    1374           6 :     pg_log_info("waiting for the target server to reach the consistent state");
    1375             : 
    1376           6 :     conn = connect_database(conninfo, true);
    1377             : 
    1378             :     for (;;)
    1379          18 :     {
    1380             :         PGresult   *res;
    1381          24 :         bool        in_recovery = server_is_in_recovery(conn);
    1382             : 
    1383             :         /*
    1384             :          * Does the recovery process finish? In dry run mode, there is no
    1385             :          * recovery mode. Bail out as the recovery process has ended.
    1386             :          */
    1387          24 :         if (!in_recovery || dry_run)
    1388             :         {
    1389           6 :             status = POSTMASTER_READY;
    1390           6 :             recovery_ended = true;
    1391           6 :             break;
    1392             :         }
    1393             : 
    1394             :         /*
    1395             :          * If it is still in recovery, make sure the target server is
    1396             :          * connected to the primary so it can receive the required WAL to
    1397             :          * finish the recovery process. If it is disconnected, tolerate
    1398             :          * NUM_CONN_ATTEMPTS consecutive failed checks and bail out on the next.
                                *
                                * NOTE(review): the result status is not checked before PQntuples();
                                * presumably a failed query yields zero tuples and is counted as a
                                * disconnection -- confirm.
    1399             :          */
    1400          18 :         res = PQexec(conn,
    1401             :                      "SELECT 1 FROM pg_catalog.pg_stat_wal_receiver");
    1402          18 :         if (PQntuples(res) == 0)
    1403             :         {
    1404           0 :             if (++count > NUM_CONN_ATTEMPTS)
    1405             :             {
                                       /*
                                        * NOTE(review): this break skips the PQclear(res) below.
                                        * status is still POSTMASTER_STILL_STARTING at this point,
                                        * so pg_fatal() is reached after the loop.
                                        */
    1406           0 :                 stop_standby_server(subscriber_dir);
    1407           0 :                 pg_log_error("standby server disconnected from the primary");
    1408           0 :                 break;
    1409             :             }
    1410             :         }
    1411             :         else
    1412          18 :             count = 0;          /* reset counter if it connects again */
    1413             : 
    1414          18 :         PQclear(res);
    1415             : 
    1416             :         /* Bail out after recovery_timeout seconds if this option is set */
                               /*
                                * NOTE(review): there is no break or return in this branch; it
                                * presumably relies on disconnect_database(conn, true) not
                                * returning -- confirm.
                                */
    1417          18 :         if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
    1418             :         {
    1419           0 :             stop_standby_server(subscriber_dir);
    1420           0 :             pg_log_error("recovery timed out");
    1421           0 :             disconnect_database(conn, true);
    1422             :         }
    1423             : 
    1424             :         /* Keep waiting */
    1425          18 :         pg_usleep(WAIT_INTERVAL * USEC_PER_SEC);
    1426             : 
    1427          18 :         timer += WAIT_INTERVAL;
    1428             :     }
    1429             : 
    1430           6 :     disconnect_database(conn, false);
    1431             : 
                           /* status is only changed on the successful exit from the loop. */
    1432           6 :     if (status == POSTMASTER_STILL_STARTING)
    1433           0 :         pg_fatal("server did not end recovery");
    1434             : 
    1435           6 :     pg_log_info("target server reached the consistent state");
    1436           6 :     pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
    1437           6 : }
    1438             : 
    1439             : /*
    1440             :  * Create a publication that includes all tables in the database.
                        *
                        * The publication name is dbinfo->pubname. Before creating it, the catalog
                        * is queried for a publication of the same name; this check runs in dry
                        * run mode too, while the CREATE PUBLICATION statement is only logged
                        * there.
                        *
                        * Every error path logs a message and calls disconnect_database(conn,
                        * true), which presumably does not return -- confirm. On the normal path
                        * dbinfo->made_publication is set, including in dry run mode.
    1441             :  */
    1442             : static void
    1443          10 : create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
    1444             : {
    1445          10 :     PQExpBuffer str = createPQExpBuffer();
    1446             :     PGresult   *res;
    1447             :     char       *ipubname_esc;
    1448             :     char       *spubname_esc;
    1449             : 
    1450             :     Assert(conn != NULL);
    1451             : 
                           /* Same name escaped twice: as an identifier and as a string literal. */
    1452          10 :     ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
    1453          10 :     spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
    1454             : 
    1455             :     /* Check if the publication already exists */
    1456          10 :     appendPQExpBuffer(str,
    1457             :                       "SELECT 1 FROM pg_catalog.pg_publication "
    1458             :                       "WHERE pubname = %s",
    1459             :                       spubname_esc);
    1460          10 :     res = PQexec(conn, str->data);
    1461          10 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1462             :     {
    1463           0 :         pg_log_error("could not obtain publication information: %s",
    1464             :                      PQresultErrorMessage(res));
    1465           0 :         disconnect_database(conn, true);
    1466             :     }
    1467             : 
    1468          10 :     if (PQntuples(res) == 1)
    1469             :     {
    1470             :         /*
    1471             :          * Unfortunately, if it reaches this code path, it will always fail
    1472             :          * (unless you decide to change the existing publication name). That's
    1473             :          * bad but it is very unlikely that the user will choose a name with
    1474             :          * pg_createsubscriber_ prefix followed by the exact database oid and
    1475             :          * a random number.
    1476             :          */
    1477           0 :         pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
    1478           0 :         pg_log_error_hint("Consider renaming this publication before continuing.");
    1479           0 :         disconnect_database(conn, true);
    1480             :     }
    1481             : 
    1482          10 :     PQclear(res);
                           /* Reuse the same buffer for the CREATE PUBLICATION statement. */
    1483          10 :     resetPQExpBuffer(str);
    1484             : 
    1485          10 :     pg_log_info("creating publication \"%s\" on database \"%s\"",
    1486             :                 dbinfo->pubname, dbinfo->dbname);
    1487             : 
    1488          10 :     appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
    1489             :                       ipubname_esc);
    1490             : 
    1491          10 :     pg_log_debug("command is: %s", str->data);
    1492             : 
    1493          10 :     if (!dry_run)
    1494             :     {
    1495           4 :         res = PQexec(conn, str->data);
    1496           4 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1497             :         {
    1498           0 :             pg_log_error("could not create publication \"%s\" on database \"%s\": %s",
    1499             :                          dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
    1500           0 :             disconnect_database(conn, true);
    1501             :         }
    1502           4 :         PQclear(res);
    1503             :     }
    1504             : 
    1505             :     /* For cleanup purposes */
    1506          10 :     dbinfo->made_publication = true;
    1507             : 
    1508          10 :     pg_free(ipubname_esc);
    1509          10 :     pg_free(spubname_esc);
    1510          10 :     destroyPQExpBuffer(str);
    1511          10 : }
    1512             : 
    1513             : /*
    1514             :  * Remove publication if it couldn't finish all steps.
                        *
                        * Drops the publication named dbinfo->pubname using the given connection.
                        * In dry run mode the statement is only logged. On failure an error is
                        * logged and dbinfo->made_publication is cleared; the function returns
                        * normally in either case.
    1515             :  */
    1516             : static void
    1517          10 : drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
    1518             : {
    1519          10 :     PQExpBuffer str = createPQExpBuffer();
    1520             :     PGresult   *res;
    1521             :     char       *pubname_esc;
    1522             : 
    1523             :     Assert(conn != NULL);
    1524             : 
                           /* The publication name is embedded as an escaped identifier. */
    1525          10 :     pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
    1526             : 
    1527          10 :     pg_log_info("dropping publication \"%s\" on database \"%s\"",
    1528             :                 dbinfo->pubname, dbinfo->dbname);
    1529             : 
    1530          10 :     appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
    1531             : 
    1532          10 :     pg_free(pubname_esc);
    1533             : 
    1534          10 :     pg_log_debug("command is: %s", str->data);
    1535             : 
    1536          10 :     if (!dry_run)
    1537             :     {
    1538           4 :         res = PQexec(conn, str->data);
    1539           4 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1540             :         {
    1541           0 :             pg_log_error("could not drop publication \"%s\" on database \"%s\": %s",
    1542             :                          dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
    1543           0 :             dbinfo->made_publication = false;    /* don't try again. */
    1544             : 
    1545             :             /*
    1546             :              * Don't disconnect and exit here. This routine is used by primary
    1547             :              * (cleanup publication / replication slot due to an error) and
    1548             :              * subscriber (remove the replicated publications). In both cases,
    1549             :              * it can continue and provide instructions for the user to remove
    1550             :              * it later if cleanup fails.
    1551             :              */
    1552             :         }
    1553           4 :         PQclear(res);
    1554             :     }
    1555             : 
    1556          10 :     destroyPQExpBuffer(str);
    1557          10 : }
    1558             : 
    1559             : /*
    1560             :  * Create a subscription with some predefined options.
    1561             :  *
    1562             :  * A replication slot was already created in a previous step. Let's use it.  It
    1563             :  * is not required to copy data. The subscription will be created but it will
    1564             :  * not be enabled now. That's because the replication progress must be set and
    1565             :  * the replication origin name (one of the function arguments) contains the
    1566             :  * subscription OID in its name. Once the subscription is created,
    1567             :  * set_replication_progress() can obtain the chosen origin name and set up its
    1568             :  * initial location.
                        *
                        * The statement is built from dbinfo->subname, dbinfo->pubconninfo,
                        * dbinfo->pubname and dbinfo->replslotname. In dry run mode it is only
                        * logged. On failure an error is logged and disconnect_database(conn,
                        * true) is called, which presumably does not return -- confirm.
    1569             :  */
    1570             : static void
    1571          10 : create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
    1572             : {
    1573          10 :     PQExpBuffer str = createPQExpBuffer();
    1574             :     PGresult   *res;
    1575             :     char       *pubname_esc;
    1576             :     char       *subname_esc;
    1577             :     char       *pubconninfo_esc;
    1578             :     char       *replslotname_esc;
    1579             : 
    1580             :     Assert(conn != NULL);
    1581             : 
                           /* Names are escaped as identifiers; conninfo and slot name as literals. */
    1582          10 :     pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
    1583          10 :     subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
    1584          10 :     pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
    1585          10 :     replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
    1586             : 
    1587          10 :     pg_log_info("creating subscription \"%s\" on database \"%s\"",
    1588             :                 dbinfo->subname, dbinfo->dbname);
    1589             : 
    1590          10 :     appendPQExpBuffer(str,
    1591             :                       "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
    1592             :                       "WITH (create_slot = false, enabled = false, "
    1593             :                       "slot_name = %s, copy_data = false)",
    1594             :                       subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc);
    1595             : 
    1596          10 :     pg_free(pubname_esc);
    1597          10 :     pg_free(subname_esc);
    1598          10 :     pg_free(pubconninfo_esc);
    1599          10 :     pg_free(replslotname_esc);
    1600             : 
    1601          10 :     pg_log_debug("command is: %s", str->data);
    1602             : 
    1603          10 :     if (!dry_run)
    1604             :     {
    1605           4 :         res = PQexec(conn, str->data);
    1606           4 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1607             :         {
    1608           0 :             pg_log_error("could not create subscription \"%s\" on database \"%s\": %s",
    1609             :                          dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
    1610           0 :             disconnect_database(conn, true);
    1611             :         }
    1612           4 :         PQclear(res);
    1613             :     }
    1614             : 
    1615          10 :     destroyPQExpBuffer(str);
    1616          10 : }
    1617             : 
    1618             : /*
    1619             :  * Sets the replication progress to the consistent LSN.
    1620             :  *
    1621             :  * The subscriber caught up to the consistent LSN provided by the last
    1622             :  * replication slot that was created. The goal is to set up the initial
    1623             :  * location for the logical replication that is the exact LSN that the
    1624             :  * subscriber was promoted. Once the subscription is enabled it will start
    1625             :  * streaming from that location onwards.  In dry run mode, the subscription OID
    1626             :  * and LSN are set to invalid values for printing purposes.
                        *
                        * dbinfo: supplies the subscription name and database name used to look up
                        *         the subscription OID.
                        * lsn:    LSN text passed to pg_replication_origin_advance(); not used in
                        *         dry run mode.
                        *
                        * The OID lookup runs in dry run mode too, but its row count is only
                        * enforced outside dry run mode. Error paths log a message and call
                        * disconnect_database(conn, true), which presumably does not return --
                        * confirm.
                        *
                        * NOTE(review): originname and lsnstr are placed into the final statement
                        * without escaping. originname is built here from an OID; lsn presumably
                        * always comes from the server -- confirm against the caller.
    1627             :  */
    1628             : static void
    1629          10 : set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
    1630             : {
    1631          10 :     PQExpBuffer str = createPQExpBuffer();
    1632             :     PGresult   *res;
    1633             :     Oid         suboid;
    1634             :     char       *subname;
    1635             :     char       *dbname;
    1636             :     char       *originname;
    1637             :     char       *lsnstr;
    1638             : 
    1639             :     Assert(conn != NULL);
    1640             : 
    1641          10 :     subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
    1642          10 :     dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
    1643             : 
    1644          10 :     appendPQExpBuffer(str,
    1645             :                       "SELECT s.oid FROM pg_catalog.pg_subscription s "
    1646             :                       "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
    1647             :                       "WHERE s.subname = %s AND d.datname = %s",
    1648             :                       subname, dbname);
    1649             : 
    1650          10 :     res = PQexec(conn, str->data);
    1651          10 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1652             :     {
    1653           0 :         pg_log_error("could not obtain subscription OID: %s",
    1654             :                      PQresultErrorMessage(res));
    1655           0 :         disconnect_database(conn, true);
    1656             :     }
    1657             : 
                           /* The row count is not enforced in dry run mode. */
    1658          10 :     if (PQntuples(res) != 1 && !dry_run)
    1659             :     {
    1660           0 :         pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
    1661             :                      PQntuples(res), 1);
    1662           0 :         disconnect_database(conn, true);
    1663             :     }
    1664             : 
    1665          10 :     if (dry_run)
    1666             :     {
    1667           6 :         suboid = InvalidOid;
    1668           6 :         lsnstr = psprintf("%X/%X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
    1669             :     }
    1670             :     else
    1671             :     {
    1672           4 :         suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
                               /* Copy lsn so that lsnstr can be freed on both paths below. */
    1673           4 :         lsnstr = psprintf("%s", lsn);
    1674             :     }
    1675             : 
    1676          10 :     PQclear(res);
    1677             : 
    1678             :     /*
    1679             :      * The origin name is defined as pg_%u. %u is the subscription OID. See
    1680             :      * ApplyWorkerMain().
    1681             :      */
    1682          10 :     originname = psprintf("pg_%u", suboid);
    1683             : 
    1684          10 :     pg_log_info("setting the replication progress (node name \"%s\" ; LSN %s) on database \"%s\"",
    1685             :                 originname, lsnstr, dbinfo->dbname);
    1686             : 
    1687          10 :     resetPQExpBuffer(str);
    1688          10 :     appendPQExpBuffer(str,
    1689             :                       "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
    1690             :                       originname, lsnstr);
    1691             : 
    1692          10 :     pg_log_debug("command is: %s", str->data);
    1693             : 
    1694          10 :     if (!dry_run)
    1695             :     {
    1696           4 :         res = PQexec(conn, str->data);
    1697           4 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1698             :         {
    1699           0 :             pg_log_error("could not set replication progress for the subscription \"%s\": %s",
    1700             :                          dbinfo->subname, PQresultErrorMessage(res));
    1701           0 :             disconnect_database(conn, true);
    1702             :         }
    1703           4 :         PQclear(res);
    1704             :     }
    1705             : 
    1706          10 :     pg_free(subname);
    1707          10 :     pg_free(dbname);
    1708          10 :     pg_free(originname);
    1709          10 :     pg_free(lsnstr);
    1710          10 :     destroyPQExpBuffer(str);
    1711          10 : }
    1712             : 
    1713             : /*
    1714             :  * Enables the subscription.
    1715             :  *
    1716             :  * The subscription was created in a previous step but it was disabled. After
    1717             :  * adjusting the initial logical replication location, enable the subscription.
                     :  *
                     :  * conn:   open connection on which the command is executed; must not be
                     :  *         NULL (asserted).
                     :  * dbinfo: supplies the subscription name (subname) used in the command and
                     :  *         the database name (dbname) used in the log message.
                     :  *
                     :  * The statement "ALTER SUBSCRIPTION <name> ENABLE" is always built and
                     :  * logged at debug level, but it is only sent to the server when dry_run is
                     :  * false.  If the server does not report PGRES_COMMAND_OK, the error is
                     :  * logged and disconnect_database(conn, true) is called; presumably that
                     :  * call terminates the program -- confirm against its definition.
                     :  *
                     :  * NOTE(review): the result of PQescapeIdentifier() is not checked; libpq
                     :  * documents that it returns NULL on failure.
    1718             :  */
    1719             : static void
    1720          10 : enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
    1721             : {
    1722          10 :     PQExpBuffer str = createPQExpBuffer();
    1723             :     PGresult   *res;
    1724             :     char       *subname;
    1725             : 
    1726             :     Assert(conn != NULL);
    1727             : 
    1728          10 :     subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname)); /* escaped copy for use in the SQL text */
    1729             : 
    1730          10 :     pg_log_info("enabling subscription \"%s\" on database \"%s\"",
    1731             :                 dbinfo->subname, dbinfo->dbname);
    1732             : 
    1733          10 :     appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
    1734             : 
    1735          10 :     pg_log_debug("command is: %s", str->data);
    1736             : 
    1737          10 :     if (!dry_run) /* in dry-run mode the command is only logged, never executed */
    1738             :     {
    1739           4 :         res = PQexec(conn, str->data);
    1740           4 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1741             :         {
    1742           0 :             pg_log_error("could not enable subscription \"%s\": %s",
    1743             :                          dbinfo->subname, PQresultErrorMessage(res));
    1744           0 :             disconnect_database(conn, true);
    1745             :         }
    1746             : 
    1747           4 :         PQclear(res);
    1748             :     }
    1749             : 
    1750          10 :     pg_free(subname); /* release the escaped name and the command buffer */
    1751          10 :     destroyPQExpBuffer(str);
    1752          10 : }
    1753             : /*
                     :  * Entry point of pg_createsubscriber.
                     :  *
                     :  * Parses and validates the command-line options, then drives the
                     :  * conversion steps in a fixed order: start the standby, run
                     :  * check_subscriber() and check_publisher(), stop the standby, run
                     :  * setup_publisher() and setup_recovery(), start the standby again and
                     :  * call wait_for_end_recovery(), run setup_subscriber(), drop the primary
                     :  * replication slot, stop the server and finally call
                     :  * modify_subscriber_sysid().
                     :  *
                     :  * Validation done here: --help/--version are looked for in argv[1] before
                     :  * normal option parsing; running as root is rejected (non-Windows builds);
                     :  * the subscriber data directory and the publisher connection string are
                     :  * mandatory; the socket directory defaults to the current directory; the
                     :  * database list falls back to the dbname found in the publisher
                     :  * connection string; and the numbers of publication, subscription and
                     :  * replication slot names, when given, must equal the number of databases.
                     :  *
                     :  * Returns 0 on success.  Failures detected in this function end the
                     :  * process through exit(1) or pg_fatal().
                     :  */
    1754             : int
    1755          40 : main(int argc, char **argv)
    1756             : {
    1757             :     static struct option long_options[] =
    1758             :     {
    1759             :         {"database", required_argument, NULL, 'd'},
    1760             :         {"pgdata", required_argument, NULL, 'D'},
    1761             :         {"dry-run", no_argument, NULL, 'n'},
    1762             :         {"subscriber-port", required_argument, NULL, 'p'},
    1763             :         {"publisher-server", required_argument, NULL, 'P'},
    1764             :         {"socket-directory", required_argument, NULL, 's'},
    1765             :         {"recovery-timeout", required_argument, NULL, 't'},
    1766             :         {"subscriber-username", required_argument, NULL, 'U'},
    1767             :         {"verbose", no_argument, NULL, 'v'},
    1768             :         {"version", no_argument, NULL, 'V'},
    1769             :         {"help", no_argument, NULL, '?'},
    1770             :         {"config-file", required_argument, NULL, 1}, /* values 1-4: options without a short form */
    1771             :         {"publication", required_argument, NULL, 2},
    1772             :         {"replication-slot", required_argument, NULL, 3},
    1773             :         {"subscription", required_argument, NULL, 4},
    1774             :         {NULL, 0, NULL, 0}
    1775             :     };
    1776             : 
    1777          40 :     struct CreateSubscriberOptions opt = {0};
    1778             : 
    1779             :     int         c;
    1780             :     int         option_index;
    1781             : 
    1782             :     char       *pub_base_conninfo;
    1783             :     char       *sub_base_conninfo;
    1784          40 :     char       *dbname_conninfo = NULL;
    1785             : 
    1786             :     uint64      pub_sysid;
    1787             :     uint64      sub_sysid;
    1788             :     struct stat statbuf;
    1789             : 
    1790             :     char       *consistent_lsn;
    1791             : 
    1792             :     char        pidfile[MAXPGPATH];
    1793             : 
    1794          40 :     pg_logging_init(argv[0]);
    1795          40 :     pg_logging_set_level(PG_LOG_WARNING);
    1796          40 :     progname = get_progname(argv[0]);
    1797          40 :     set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_createsubscriber"));
    1798             : 
    1799          40 :     if (argc > 1) /* --help / --version are only looked for in the first argument */
    1800             :     {
    1801          38 :         if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
    1802             :         {
    1803           2 :             usage();
    1804           2 :             exit(0);
    1805             :         }
    1806          36 :         else if (strcmp(argv[1], "-V") == 0
    1807          36 :                  || strcmp(argv[1], "--version") == 0)
    1808             :         {
    1809           2 :             puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
    1810           2 :             exit(0);
    1811             :         }
    1812             :     }
    1813             : 
    1814             :     /* Default settings (NOTE(review): opt was already zeroed by its {0} initializer) */
    1815          36 :     subscriber_dir = NULL;
    1816          36 :     opt.config_file = NULL;
    1817          36 :     opt.pub_conninfo_str = NULL;
    1818          36 :     opt.socket_dir = NULL;
    1819          36 :     opt.sub_port = DEFAULT_SUB_PORT;
    1820          36 :     opt.sub_username = NULL;
    1821          36 :     opt.database_names = (SimpleStringList)
    1822             :     {
    1823             :         0
    1824             :     };
    1825          36 :     opt.recovery_timeout = 0;
    1826             : 
    1827             :     /*
    1828             :      * Don't allow it to be run as root. It uses pg_ctl which does not allow
    1829             :      * it either.
    1830             :      */
    1831             : #ifndef WIN32
    1832          36 :     if (geteuid() == 0)
    1833             :     {
    1834           0 :         pg_log_error("cannot be executed by \"root\"");
    1835           0 :         pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
    1836             :                           progname);
    1837           0 :         exit(1);
    1838             :     }
    1839             : #endif
    1840             : 
    1841          36 :     get_restricted_token();
    1842             : 
    1843         264 :     while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:U:v",
    1844             :                             long_options, &option_index)) != -1)
    1845             :     {
    1846         234 :         switch (c)
    1847             :         {
    1848          48 :             case 'd': /* --database: may be repeated, duplicates are rejected */
    1849          48 :                 if (!simple_string_list_member(&opt.database_names, optarg))
    1850             :                 {
    1851          46 :                     simple_string_list_append(&opt.database_names, optarg);
    1852          46 :                     num_dbs++;
    1853             :                 }
    1854             :                 else
    1855             :                 {
    1856           2 :                     pg_log_error("duplicate database \"%s\"", optarg);
    1857           2 :                     exit(1);
    1858             :                 }
    1859          46 :                 break;
    1860          32 :             case 'D': /* --pgdata */
    1861          32 :                 subscriber_dir = pg_strdup(optarg);
    1862          32 :                 canonicalize_path(subscriber_dir);
    1863          32 :                 break;
    1864          14 :             case 'n': /* --dry-run */
    1865          14 :                 dry_run = true;
    1866          14 :                 break;
    1867          18 :             case 'p': /* --subscriber-port */
    1868          18 :                 opt.sub_port = pg_strdup(optarg);
    1869          18 :                 break;
    1870          30 :             case 'P': /* --publisher-server */
    1871          30 :                 opt.pub_conninfo_str = pg_strdup(optarg);
    1872          30 :                 break;
    1873          18 :             case 's': /* --socket-directory */
    1874          18 :                 opt.socket_dir = pg_strdup(optarg);
    1875          18 :                 canonicalize_path(opt.socket_dir);
    1876          18 :                 break;
    1877           0 :             case 't': /* --recovery-timeout */
    1878           0 :                 opt.recovery_timeout = atoi(optarg); /* NOTE(review): atoi() cannot report conversion errors; consider validating with strtol() */
    1879           0 :                 break;
    1880           0 :             case 'U': /* --subscriber-username */
    1881           0 :                 opt.sub_username = pg_strdup(optarg);
    1882           0 :                 break;
    1883          32 :             case 'v': /* --verbose: each occurrence raises the log verbosity */
    1884          32 :                 pg_logging_increase_verbosity();
    1885          32 :                 break;
    1886           0 :             case 1: /* --config-file */
    1887           0 :                 opt.config_file = pg_strdup(optarg);
    1888           0 :                 break;
    1889          22 :             case 2: /* --publication: may be repeated, duplicates are rejected */
    1890          22 :                 if (!simple_string_list_member(&opt.pub_names, optarg))
    1891             :                 {
    1892          20 :                     simple_string_list_append(&opt.pub_names, optarg);
    1893          20 :                     num_pubs++;
    1894             :                 }
    1895             :                 else
    1896             :                 {
    1897           2 :                     pg_log_error("duplicate publication \"%s\"", optarg);
    1898           2 :                     exit(1);
    1899             :                 }
    1900          20 :                 break;
    1901           8 :             case 3: /* --replication-slot: may be repeated, duplicates are rejected */
    1902           8 :                 if (!simple_string_list_member(&opt.replslot_names, optarg))
    1903             :                 {
    1904           8 :                     simple_string_list_append(&opt.replslot_names, optarg);
    1905           8 :                     num_replslots++;
    1906             :                 }
    1907             :                 else
    1908             :                 {
    1909           0 :                     pg_log_error("duplicate replication slot \"%s\"", optarg);
    1910           0 :                     exit(1);
    1911             :                 }
    1912           8 :                 break;
    1913          10 :             case 4: /* --subscription: may be repeated, duplicates are rejected */
    1914          10 :                 if (!simple_string_list_member(&opt.sub_names, optarg))
    1915             :                 {
    1916          10 :                     simple_string_list_append(&opt.sub_names, optarg);
    1917          10 :                     num_subs++;
    1918             :                 }
    1919             :                 else
    1920             :                 {
    1921           0 :                     pg_log_error("duplicate subscription \"%s\"", optarg);
    1922           0 :                     exit(1);
    1923             :                 }
    1924          10 :                 break;
    1925           2 :             default:
    1926             :                 /* getopt_long already emitted a complaint */
    1927           2 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    1928           2 :                 exit(1);
    1929             :         }
    1930             :     }
    1931             : 
    1932             :     /* Any non-option arguments? */
    1933          30 :     if (optind < argc)
    1934             :     {
    1935           0 :         pg_log_error("too many command-line arguments (first is \"%s\")",
    1936             :                      argv[optind]);
    1937           0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    1938           0 :         exit(1);
    1939             :     }
    1940             : 
    1941             :     /* Required arguments */
    1942          30 :     if (subscriber_dir == NULL)
    1943             :     {
    1944           2 :         pg_log_error("no subscriber data directory specified");
    1945           2 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    1946           2 :         exit(1);
    1947             :     }
    1948             : 
    1949             :     /* If socket directory is not provided, use the current directory */
    1950          28 :     if (opt.socket_dir == NULL)
    1951             :     {
    1952             :         char        cwd[MAXPGPATH];
    1953             : 
    1954          10 :         if (!getcwd(cwd, MAXPGPATH))
    1955           0 :             pg_fatal("could not determine current directory");
    1956          10 :         opt.socket_dir = pg_strdup(cwd);
    1957          10 :         canonicalize_path(opt.socket_dir);
    1958             :     }
    1959             : 
    1960             :     /*
    1961             :      * Parse connection string. Build a base connection string that might be
    1962             :      * reused by multiple databases.
    1963             :      */
    1964          28 :     if (opt.pub_conninfo_str == NULL)
    1965             :     {
    1966             :         /*
    1967             :          * TODO use primary_conninfo (if available) from subscriber and
    1968             :          * extract publisher connection string. Assume that there are
    1969             :          * identical entries for physical and logical replication. If there is
    1970             :          * not, we would fail anyway.
    1971             :          */
    1972           2 :         pg_log_error("no publisher connection string specified");
    1973           2 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
    1974           2 :         exit(1);
    1975             :     }
    1976          26 :     pg_log_info("validating connection string on publisher");
    1977          26 :     pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
    1978             :                                           &dbname_conninfo);
    1979          26 :     if (pub_base_conninfo == NULL)
    1980           0 :         exit(1);
    1981             : 
    1982          26 :     pg_log_info("validating connection string on subscriber");
    1983          26 :     sub_base_conninfo = get_sub_conninfo(&opt);
    1984             : 
    1985          26 :     if (opt.database_names.head == NULL)
    1986             :     {
    1987           4 :         pg_log_info("no database was specified");
    1988             : 
    1989             :         /*
    1990             :          * If --database option is not provided, try to obtain the dbname from
    1991             :          * the publisher conninfo. If dbname parameter is not available, error
    1992             :          * out.
    1993             :          */
    1994           4 :         if (dbname_conninfo)
    1995             :         {
    1996           2 :             simple_string_list_append(&opt.database_names, dbname_conninfo);
    1997           2 :             num_dbs++;
    1998             : 
    1999           2 :             pg_log_info("database \"%s\" was extracted from the publisher connection string",
    2000             :                         dbname_conninfo);
    2001             :         }
    2002             :         else
    2003             :         {
    2004           2 :             pg_log_error("no database name specified");
    2005           2 :             pg_log_error_hint("Try \"%s --help\" for more information.",
    2006             :                               progname);
    2007           2 :             exit(1);
    2008             :         }
    2009             :     }
    2010             : 
    2011             :     /* Number of object names must match number of databases */
    2012          24 :     if (num_pubs > 0 && num_pubs != num_dbs)
    2013             :     {
    2014           2 :         pg_log_error("wrong number of publication names");
    2015           2 :         pg_log_error_hint("Number of publication names (%d) must match number of database names (%d).",
    2016             :                           num_pubs, num_dbs);
    2017           2 :         exit(1);
    2018             :     }
    2019          22 :     if (num_subs > 0 && num_subs != num_dbs)
    2020             :     {
    2021           2 :         pg_log_error("wrong number of subscription names");
    2022           2 :         pg_log_error_hint("Number of subscription names (%d) must match number of database names (%d).",
    2023             :                           num_subs, num_dbs);
    2024           2 :         exit(1);
    2025             :     }
    2026          20 :     if (num_replslots > 0 && num_replslots != num_dbs)
    2027             :     {
    2028           2 :         pg_log_error("wrong number of replication slot names");
    2029           2 :         pg_log_error_hint("Number of replication slot names (%d) must match number of database names (%d).",
    2030             :                           num_replslots, num_dbs);
    2031           2 :         exit(1);
    2032             :     }
    2033             : 
    2034             :     /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
    2035          18 :     pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
    2036          18 :     pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
    2037             : 
    2038             :     /* Rudimentary check for a data directory */
    2039          18 :     check_data_directory(subscriber_dir);
    2040             : 
    2041             :     /*
    2042             :      * Store database information for publisher and subscriber. This must
    2043             :      * happen before atexit() because the returned value is used by
    2044             :      * cleanup_objects_atexit().
    2045             :      */
    2046          18 :     dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
    2047             : 
    2048             :     /* Register a function to clean up objects in case of failure */
    2049          18 :     atexit(cleanup_objects_atexit);
    2050             : 
    2051             :     /*
    2052             :      * Check if the subscriber data directory has the same system identifier
    2053             :      * as the publisher data directory.
    2054             :      */
    2055          18 :     pub_sysid = get_primary_sysid(dbinfo[0].pubconninfo);
    2056          18 :     sub_sysid = get_standby_sysid(subscriber_dir);
    2057          18 :     if (pub_sysid != sub_sysid)
    2058           2 :         pg_fatal("subscriber data directory is not a copy of the source database cluster");
    2059             : 
    2060             :     /* Subscriber PID file */
    2061          16 :     snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
    2062             : 
    2063             :     /*
    2064             :      * The standby server must not be running. If the server is started under
    2065             :      * service manager and pg_createsubscriber stops it, the service manager
    2066             :      * might react to this action and start the server again. Therefore,
    2067             :      * refuse to proceed if the server is running to avoid possible failures.
    2068             :      */
    2069          16 :     if (stat(pidfile, &statbuf) == 0) /* the mere existence of postmaster.pid is treated as "running" */
    2070             :     {
    2071           2 :         pg_log_error("standby is up and running");
    2072           2 :         pg_log_error_hint("Stop the standby and try again.");
    2073           2 :         exit(1);
    2074             :     }
    2075             : 
    2076             :     /*
    2077             :      * Start a short-lived standby server with temporary parameters (provided
    2078             :      * by command-line options). The goal is to avoid connections during the
    2079             :      * transformation steps.
    2080             :      */
    2081          14 :     pg_log_info("starting the standby with command-line options");
    2082          14 :     start_standby_server(&opt, true);
    2083             : 
    2084             :     /* Check if the standby server is ready for logical replication */
    2085          14 :     check_subscriber(dbinfo);
    2086             : 
    2087             :     /*
    2088             :      * Check if the primary server is ready for logical replication. This
    2089             :      * routine checks if a replication slot is in use on primary so it relies
    2090             :      * on check_subscriber() to obtain the primary_slot_name. That's why it is
    2091             :      * called after it.
    2092             :      */
    2093          10 :     check_publisher(dbinfo);
    2094             : 
    2095             :     /*
    2096             :      * Stop the target server. The recovery process requires that the server
    2097             :      * reaches a consistent state before targeting the recovery stop point.
    2098             :      * Make sure a consistent state is reached (stopping the target server
    2099             :      * guarantees it) *before* creating the replication slots in
    2100             :      * setup_publisher().
    2101             :      */
    2102           6 :     pg_log_info("stopping the subscriber");
    2103           6 :     stop_standby_server(subscriber_dir);
    2104             : 
    2105             :     /*
    2106             :      * Create the required objects for each database on publisher. This step
    2107             :      * is here mainly because if we stop the standby we cannot verify if the
    2108             :      * primary slot is in use. We could use an extra connection for it but it
    2109             :      * doesn't seem worth it.
    2110             :      */
    2111           6 :     consistent_lsn = setup_publisher(dbinfo);
    2112             : 
    2113             :     /* Write the required recovery parameters */
    2114           6 :     setup_recovery(dbinfo, subscriber_dir, consistent_lsn);
    2115             : 
    2116             :     /*
    2117             :      * Start subscriber so the recovery parameters will take effect. Wait
    2118             :      * until accepting connections.
    2119             :      */
    2120           6 :     pg_log_info("starting the subscriber");
    2121           6 :     start_standby_server(&opt, true);
    2122             : 
    2123             :     /* Wait for the subscriber to be promoted */
    2124           6 :     wait_for_end_recovery(dbinfo[0].subconninfo, &opt);
    2125             : 
    2126             :     /*
    2127             :      * Create the subscription for each database on subscriber. It does not
    2128             :      * enable it immediately because it needs to adjust the replication start
    2129             :      * point to the LSN reported by setup_publisher().  It also cleans up
    2130             :      * publications created by this tool and replication to the standby.
    2131             :      */
    2132           6 :     setup_subscriber(dbinfo, consistent_lsn);
    2133             : 
    2134             :     /* Remove primary_slot_name if it exists on primary */
    2135           6 :     drop_primary_replication_slot(dbinfo, primary_slot_name);
    2136             : 
    2137             :     /* Stop the subscriber */
    2138           6 :     pg_log_info("stopping the subscriber");
    2139           6 :     stop_standby_server(subscriber_dir);
    2140             : 
    2141             :     /* Change the system identifier of the subscriber */
    2142           6 :     modify_subscriber_sysid(&opt);
    2143             : 
    2144           6 :     success = true; /* presumably read by cleanup_objects_atexit() to skip failure cleanup -- confirm */
    2145             : 
    2146           6 :     pg_log_info("Done!");
    2147             : 
    2148           6 :     return 0;
    2149             : }

Generated by: LCOV version 1.14