LCOV - code coverage report
Current view: top level - src/bin/pg_dump - pg_backup_db.c (source / functions) Hit Total Coverage
Test: PostgreSQL 15devel Lines: 138 210 65.7 %
Date: 2021-12-09 04:09:06 Functions: 15 17 88.2 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_backup_db.c
       4             :  *
       5             :  *  Implements the basic DB functions used by the archiver.
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/bin/pg_dump/pg_backup_db.c
       9             :  *
      10             :  *-------------------------------------------------------------------------
      11             :  */
      12             : #include "postgres_fe.h"
      13             : 
      14             : #include <unistd.h>
      15             : #include <ctype.h>
      16             : #ifdef HAVE_TERMIOS_H
      17             : #include <termios.h>
      18             : #endif
      19             : 
      20             : #include "common/connect.h"
      21             : #include "common/string.h"
      22             : #include "dumputils.h"
      23             : #include "fe_utils/string_utils.h"
      24             : #include "parallel.h"
      25             : #include "pg_backup_archiver.h"
      26             : #include "pg_backup_db.h"
      27             : #include "pg_backup_utils.h"
      28             : 
      29             : static void _check_database_version(ArchiveHandle *AH);
      30             : static void notice_processor(void *arg, const char *message);
      31             : 
      32             : static void
      33         262 : _check_database_version(ArchiveHandle *AH)
      34             : {
      35             :     const char *remoteversion_str;
      36             :     int         remoteversion;
      37             :     PGresult   *res;
      38             : 
      39         262 :     remoteversion_str = PQparameterStatus(AH->connection, "server_version");
      40         262 :     remoteversion = PQserverVersion(AH->connection);
      41         262 :     if (remoteversion == 0 || !remoteversion_str)
      42           0 :         fatal("could not get server_version from libpq");
      43             : 
      44         262 :     AH->public.remoteVersionStr = pg_strdup(remoteversion_str);
      45         262 :     AH->public.remoteVersion = remoteversion;
      46         262 :     if (!AH->archiveRemoteVersion)
      47         192 :         AH->archiveRemoteVersion = AH->public.remoteVersionStr;
      48             : 
      49         262 :     if (remoteversion != PG_VERSION_NUM
      50           0 :         && (remoteversion < AH->public.minRemoteVersion ||
      51           0 :             remoteversion > AH->public.maxRemoteVersion))
      52             :     {
      53           0 :         pg_log_error("server version: %s; %s version: %s",
      54             :                      remoteversion_str, progname, PG_VERSION);
      55           0 :         fatal("aborting because of server version mismatch");
      56             :     }
      57             : 
      58             :     /*
      59             :      * When running against 9.0 or later, check if we are in recovery mode,
      60             :      * which means we are on a hot standby.
      61             :      */
      62         262 :     if (remoteversion >= 90000)
      63             :     {
      64         262 :         res = ExecuteSqlQueryForSingleRow((Archive *) AH, "SELECT pg_catalog.pg_is_in_recovery()");
      65             : 
      66         262 :         AH->public.isStandby = (strcmp(PQgetvalue(res, 0, 0), "t") == 0);
      67         262 :         PQclear(res);
      68             :     }
      69             :     else
      70           0 :         AH->public.isStandby = false;
      71         262 : }
      72             : 
      73             : /*
      74             :  * Reconnect to the server.  If dbname is not NULL, use that database,
      75             :  * else the one associated with the archive handle.
      76             :  */
      77             : void
      78          26 : ReconnectToServer(ArchiveHandle *AH, const char *dbname)
      79             : {
      80          26 :     PGconn     *oldConn = AH->connection;
      81          26 :     RestoreOptions *ropt = AH->public.ropt;
      82             : 
      83             :     /*
      84             :      * Save the dbname, if given, in override_dbname so that it will also
      85             :      * affect any later reconnection attempt.
      86             :      */
      87          26 :     if (dbname)
      88          26 :         ropt->cparams.override_dbname = pg_strdup(dbname);
      89             : 
      90             :     /*
      91             :      * Note: we want to establish the new connection, and in particular update
      92             :      * ArchiveHandle's connCancel, before closing old connection.  Otherwise
      93             :      * an ill-timed SIGINT could try to access a dead connection.
      94             :      */
      95          26 :     AH->connection = NULL;       /* dodge error check in ConnectDatabase */
      96             : 
      97          26 :     ConnectDatabase((Archive *) AH, &ropt->cparams, true);
      98             : 
      99          26 :     PQfinish(oldConn);
     100          26 : }
     101             : 
     102             : /*
     103             :  * Make, or remake, a database connection with the given parameters.
     104             :  *
     105             :  * The resulting connection handle is stored in AHX->connection.
     106             :  *
     107             :  * An interactive password prompt is automatically issued if required.
     108             :  * We store the results of that in AHX->savedPassword.
     109             :  * Note: it's not really all that sensible to use a single-entry password
     110             :  * cache if the username keeps changing.  In current usage, however, the
     111             :  * username never does change, so one savedPassword is sufficient.
     112             :  */
     113             : void
     114         264 : ConnectDatabase(Archive *AHX,
     115             :                 const ConnParams *cparams,
     116             :                 bool isReconnect)
     117             : {
     118         264 :     ArchiveHandle *AH = (ArchiveHandle *) AHX;
     119             :     trivalue    prompt_password;
     120             :     char       *password;
     121             :     bool        new_pass;
     122             : 
     123         264 :     if (AH->connection)
     124           0 :         fatal("already connected to a database");
     125             : 
     126             :     /* Never prompt for a password during a reconnection */
     127         264 :     prompt_password = isReconnect ? TRI_NO : cparams->promptPassword;
     128             : 
     129         264 :     password = AH->savedPassword;
     130             : 
     131         264 :     if (prompt_password == TRI_YES && password == NULL)
     132           0 :         password = simple_prompt("Password: ", false);
     133             : 
     134             :     /*
     135             :      * Start the connection.  Loop until we have a password if requested by
     136             :      * backend.
     137             :      */
     138             :     do
     139             :     {
     140             :         const char *keywords[8];
     141             :         const char *values[8];
     142         264 :         int         i = 0;
     143             : 
     144             :         /*
     145             :          * If dbname is a connstring, its entries can override the other
     146             :          * values obtained from cparams; but in turn, override_dbname can
     147             :          * override the dbname component of it.
     148             :          */
     149         264 :         keywords[i] = "host";
     150         264 :         values[i++] = cparams->pghost;
     151         264 :         keywords[i] = "port";
     152         264 :         values[i++] = cparams->pgport;
     153         264 :         keywords[i] = "user";
     154         264 :         values[i++] = cparams->username;
     155         264 :         keywords[i] = "password";
     156         264 :         values[i++] = password;
     157         264 :         keywords[i] = "dbname";
     158         264 :         values[i++] = cparams->dbname;
     159         264 :         if (cparams->override_dbname)
     160             :         {
     161          32 :             keywords[i] = "dbname";
     162          32 :             values[i++] = cparams->override_dbname;
     163             :         }
     164         264 :         keywords[i] = "fallback_application_name";
     165         264 :         values[i++] = progname;
     166         264 :         keywords[i] = NULL;
     167         264 :         values[i++] = NULL;
     168             :         Assert(i <= lengthof(keywords));
     169             : 
     170         264 :         new_pass = false;
     171         264 :         AH->connection = PQconnectdbParams(keywords, values, true);
     172             : 
     173         264 :         if (!AH->connection)
     174           0 :             fatal("could not connect to database");
     175             : 
     176         266 :         if (PQstatus(AH->connection) == CONNECTION_BAD &&
     177           2 :             PQconnectionNeedsPassword(AH->connection) &&
     178           0 :             password == NULL &&
     179             :             prompt_password != TRI_NO)
     180             :         {
     181           0 :             PQfinish(AH->connection);
     182           0 :             password = simple_prompt("Password: ", false);
     183           0 :             new_pass = true;
     184             :         }
     185         264 :     } while (new_pass);
     186             : 
     187             :     /* check to see that the backend connection was successfully made */
     188         264 :     if (PQstatus(AH->connection) == CONNECTION_BAD)
     189             :     {
     190           2 :         if (isReconnect)
     191           0 :             fatal("reconnection failed: %s",
     192             :                   PQerrorMessage(AH->connection));
     193             :         else
     194           2 :             fatal("%s",
     195             :                   PQerrorMessage(AH->connection));
     196             :     }
     197             : 
     198             :     /* Start strict; later phases may override this. */
     199         262 :     PQclear(ExecuteSqlQueryForSingleRow((Archive *) AH,
     200             :                                         ALWAYS_SECURE_SEARCH_PATH_SQL));
     201             : 
     202         262 :     if (password && password != AH->savedPassword)
     203           0 :         free(password);
     204             : 
     205             :     /*
     206             :      * We want to remember connection's actual password, whether or not we got
     207             :      * it by prompting.  So we don't just store the password variable.
     208             :      */
     209         262 :     if (PQconnectionUsedPassword(AH->connection))
     210             :     {
     211           0 :         if (AH->savedPassword)
     212           0 :             free(AH->savedPassword);
     213           0 :         AH->savedPassword = pg_strdup(PQpass(AH->connection));
     214             :     }
     215             : 
     216             :     /* check for version mismatch */
     217         262 :     _check_database_version(AH);
     218             : 
     219         262 :     PQsetNoticeProcessor(AH->connection, notice_processor, NULL);
     220             : 
     221             :     /* arrange for SIGINT to issue a query cancel on this connection */
     222         262 :     set_archive_cancel_info(AH, AH->connection);
     223         262 : }
     224             : 
     225             : /*
     226             :  * Close the connection to the database and also cancel off the query if we
     227             :  * have one running.
     228             :  */
     229             : void
     230         238 : DisconnectDatabase(Archive *AHX)
     231             : {
     232         238 :     ArchiveHandle *AH = (ArchiveHandle *) AHX;
     233             :     char        errbuf[1];
     234             : 
     235         238 :     if (!AH->connection)
     236           0 :         return;
     237             : 
     238         238 :     if (AH->connCancel)
     239             :     {
     240             :         /*
     241             :          * If we have an active query, send a cancel before closing, ignoring
     242             :          * any errors.  This is of no use for a normal exit, but might be
     243             :          * helpful during fatal().
     244             :          */
     245         236 :         if (PQtransactionStatus(AH->connection) == PQTRANS_ACTIVE)
     246           0 :             (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
     247             : 
     248             :         /*
     249             :          * Prevent signal handler from sending a cancel after this.
     250             :          */
     251         236 :         set_archive_cancel_info(AH, NULL);
     252             :     }
     253             : 
     254         238 :     PQfinish(AH->connection);
     255         238 :     AH->connection = NULL;
     256             : }
     257             : 
     258             : PGconn *
     259        3534 : GetConnection(Archive *AHX)
     260             : {
     261        3534 :     ArchiveHandle *AH = (ArchiveHandle *) AHX;
     262             : 
     263        3534 :     return AH->connection;
     264             : }
     265             : 
     266             : static void
     267           0 : notice_processor(void *arg, const char *message)
     268             : {
     269           0 :     pg_log_generic(PG_LOG_INFO, "%s", message);
     270           0 : }
     271             : 
     272             : /* Like fatal(), but with a complaint about a particular query. */
     273             : static void
     274           4 : die_on_query_failure(ArchiveHandle *AH, const char *query)
     275             : {
     276           4 :     pg_log_error("query failed: %s",
     277             :                  PQerrorMessage(AH->connection));
     278           4 :     fatal("query was: %s", query);
     279             : }
     280             : 
     281             : void
     282        8250 : ExecuteSqlStatement(Archive *AHX, const char *query)
     283             : {
     284        8250 :     ArchiveHandle *AH = (ArchiveHandle *) AHX;
     285             :     PGresult   *res;
     286             : 
     287        8250 :     res = PQexec(AH->connection, query);
     288        8250 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     289           2 :         die_on_query_failure(AH, query);
     290        8248 :     PQclear(res);
     291        8248 : }
     292             : 
     293             : PGresult *
     294       25520 : ExecuteSqlQuery(Archive *AHX, const char *query, ExecStatusType status)
     295             : {
     296       25520 :     ArchiveHandle *AH = (ArchiveHandle *) AHX;
     297             :     PGresult   *res;
     298             : 
     299       25520 :     res = PQexec(AH->connection, query);
     300       25520 :     if (PQresultStatus(res) != status)
     301           2 :         die_on_query_failure(AH, query);
     302       25518 :     return res;
     303             : }
     304             : 
     305             : /*
     306             :  * Execute an SQL query and verify that we got exactly one row back.
     307             :  */
     308             : PGresult *
     309       10574 : ExecuteSqlQueryForSingleRow(Archive *fout, const char *query)
     310             : {
     311             :     PGresult   *res;
     312             :     int         ntups;
     313             : 
     314       10574 :     res = ExecuteSqlQuery(fout, query, PGRES_TUPLES_OK);
     315             : 
     316             :     /* Expecting a single result only */
     317       10574 :     ntups = PQntuples(res);
     318       10574 :     if (ntups != 1)
     319           0 :         fatal(ngettext("query returned %d row instead of one: %s",
     320             :                        "query returned %d rows instead of one: %s",
     321             :                        ntups),
     322             :               ntups, query);
     323             : 
     324       10574 :     return res;
     325             : }
     326             : 
     327             : /*
     328             :  * Convenience function to send a query.
     329             :  * Monitors result to detect COPY statements
     330             :  */
     331             : static void
     332        6514 : ExecuteSqlCommand(ArchiveHandle *AH, const char *qry, const char *desc)
     333             : {
     334        6514 :     PGconn     *conn = AH->connection;
     335             :     PGresult   *res;
     336             : 
     337             : #ifdef NOT_USED
     338             :     fprintf(stderr, "Executing: '%s'\n\n", qry);
     339             : #endif
     340        6514 :     res = PQexec(conn, qry);
     341             : 
     342        6514 :     switch (PQresultStatus(res))
     343             :     {
     344        6510 :         case PGRES_COMMAND_OK:
     345             :         case PGRES_TUPLES_OK:
     346             :         case PGRES_EMPTY_QUERY:
     347             :             /* A-OK */
     348        6510 :             break;
     349           4 :         case PGRES_COPY_IN:
     350             :             /* Assume this is an expected result */
     351           4 :             AH->pgCopyIn = true;
     352           4 :             break;
     353           0 :         default:
     354             :             /* trouble */
     355           0 :             warn_or_exit_horribly(AH, "%s: %sCommand was: %s",
     356             :                                   desc, PQerrorMessage(conn), qry);
     357           0 :             break;
     358             :     }
     359             : 
     360        6514 :     PQclear(res);
     361        6514 : }
     362             : 
     363             : 
     364             : /*
     365             :  * Process non-COPY table data (that is, INSERT commands).
     366             :  *
     367             :  * The commands have been run together as one long string for compressibility,
     368             :  * and we are receiving them in bufferloads with arbitrary boundaries, so we
     369             :  * have to locate command boundaries and save partial commands across calls.
     370             :  * All state must be kept in AH->sqlparse, not in local variables of this
     371             :  * routine.  We assume that AH->sqlparse was filled with zeroes when created.
     372             :  *
     373             :  * We have to lex the data to the extent of identifying literals and quoted
     374             :  * identifiers, so that we can recognize statement-terminating semicolons.
     375             :  * We assume that INSERT data will not contain SQL comments, E'' literals,
     376             :  * or dollar-quoted strings, so this is much simpler than a full SQL lexer.
     377             :  *
     378             :  * Note: when restoring from a pre-9.0 dump file, this code is also used to
     379             :  * process BLOB COMMENTS data, which has the same problem of containing
     380             :  * multiple SQL commands that might be split across bufferloads.  Fortunately,
     381             :  * that data won't contain anything complicated to lex either.
     382             :  */
     383             : static void
     384           0 : ExecuteSimpleCommands(ArchiveHandle *AH, const char *buf, size_t bufLen)
     385             : {
     386           0 :     const char *qry = buf;
     387           0 :     const char *eos = buf + bufLen;
     388             : 
     389             :     /* initialize command buffer if first time through */
     390           0 :     if (AH->sqlparse.curCmd == NULL)
     391           0 :         AH->sqlparse.curCmd = createPQExpBuffer();
     392             : 
     393           0 :     for (; qry < eos; qry++)
     394             :     {
     395           0 :         char        ch = *qry;
     396             : 
     397             :         /* For neatness, we skip any newlines between commands */
     398           0 :         if (!(ch == '\n' && AH->sqlparse.curCmd->len == 0))
     399           0 :             appendPQExpBufferChar(AH->sqlparse.curCmd, ch);
     400             : 
     401           0 :         switch (AH->sqlparse.state)
     402             :         {
     403           0 :             case SQL_SCAN:      /* Default state == 0, set in _allocAH */
     404           0 :                 if (ch == ';')
     405             :                 {
     406             :                     /*
     407             :                      * We've found the end of a statement. Send it and reset
     408             :                      * the buffer.
     409             :                      */
     410           0 :                     ExecuteSqlCommand(AH, AH->sqlparse.curCmd->data,
     411             :                                       "could not execute query");
     412           0 :                     resetPQExpBuffer(AH->sqlparse.curCmd);
     413             :                 }
     414           0 :                 else if (ch == '\'')
     415             :                 {
     416           0 :                     AH->sqlparse.state = SQL_IN_SINGLE_QUOTE;
     417           0 :                     AH->sqlparse.backSlash = false;
     418             :                 }
     419           0 :                 else if (ch == '"')
     420             :                 {
     421           0 :                     AH->sqlparse.state = SQL_IN_DOUBLE_QUOTE;
     422             :                 }
     423           0 :                 break;
     424             : 
     425           0 :             case SQL_IN_SINGLE_QUOTE:
     426             :                 /* We needn't handle '' specially */
     427           0 :                 if (ch == '\'' && !AH->sqlparse.backSlash)
     428           0 :                     AH->sqlparse.state = SQL_SCAN;
     429           0 :                 else if (ch == '\\' && !AH->public.std_strings)
     430           0 :                     AH->sqlparse.backSlash = !AH->sqlparse.backSlash;
     431             :                 else
     432           0 :                     AH->sqlparse.backSlash = false;
     433           0 :                 break;
     434             : 
     435           0 :             case SQL_IN_DOUBLE_QUOTE:
     436             :                 /* We needn't handle "" specially */
     437           0 :                 if (ch == '"')
     438           0 :                     AH->sqlparse.state = SQL_SCAN;
     439           0 :                 break;
     440             :         }
     441           0 :     }
     442           0 : }
     443             : 
     444             : 
     445             : /*
     446             :  * Implement ahwrite() for direct-to-DB restore
     447             :  */
     448             : int
     449        6510 : ExecuteSqlCommandBuf(Archive *AHX, const char *buf, size_t bufLen)
     450             : {
     451        6510 :     ArchiveHandle *AH = (ArchiveHandle *) AHX;
     452             : 
     453        6510 :     if (AH->outputKind == OUTPUT_COPYDATA)
     454             :     {
     455             :         /*
     456             :          * COPY data.
     457             :          *
     458             :          * We drop the data on the floor if libpq has failed to enter COPY
     459             :          * mode; this allows us to behave reasonably when trying to continue
     460             :          * after an error in a COPY command.
     461             :          */
     462           8 :         if (AH->pgCopyIn &&
     463           4 :             PQputCopyData(AH->connection, buf, bufLen) <= 0)
     464           0 :             fatal("error returned by PQputCopyData: %s",
     465             :                   PQerrorMessage(AH->connection));
     466             :     }
     467        6506 :     else if (AH->outputKind == OUTPUT_OTHERDATA)
     468             :     {
     469             :         /*
     470             :          * Table data expressed as INSERT commands; or, in old dump files,
     471             :          * BLOB COMMENTS data (which is expressed as COMMENT ON commands).
     472             :          */
     473           0 :         ExecuteSimpleCommands(AH, buf, bufLen);
     474             :     }
     475             :     else
     476             :     {
     477             :         /*
     478             :          * General SQL commands; we assume that commands will not be split
     479             :          * across calls.
     480             :          *
     481             :          * In most cases the data passed to us will be a null-terminated
     482             :          * string, but if it's not, we have to add a trailing null.
     483             :          */
     484        6506 :         if (buf[bufLen] == '\0')
     485        6506 :             ExecuteSqlCommand(AH, buf, "could not execute query");
     486             :         else
     487             :         {
     488           0 :             char       *str = (char *) pg_malloc(bufLen + 1);
     489             : 
     490           0 :             memcpy(str, buf, bufLen);
     491           0 :             str[bufLen] = '\0';
     492           0 :             ExecuteSqlCommand(AH, str, "could not execute query");
     493           0 :             free(str);
     494             :         }
     495             :     }
     496             : 
     497        6510 :     return bufLen;
     498             : }
     499             : 
     500             : /*
     501             :  * Terminate a COPY operation during direct-to-DB restore
     502             :  */
     503             : void
     504           4 : EndDBCopyMode(Archive *AHX, const char *tocEntryTag)
     505             : {
     506           4 :     ArchiveHandle *AH = (ArchiveHandle *) AHX;
     507             : 
     508           4 :     if (AH->pgCopyIn)
     509             :     {
     510             :         PGresult   *res;
     511             : 
     512           4 :         if (PQputCopyEnd(AH->connection, NULL) <= 0)
     513           0 :             fatal("error returned by PQputCopyEnd: %s",
     514             :                   PQerrorMessage(AH->connection));
     515             : 
     516             :         /* Check command status and return to normal libpq state */
     517           4 :         res = PQgetResult(AH->connection);
     518           4 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
     519           0 :             warn_or_exit_horribly(AH, "COPY failed for table \"%s\": %s",
     520           0 :                                   tocEntryTag, PQerrorMessage(AH->connection));
     521           4 :         PQclear(res);
     522             : 
     523             :         /* Do this to ensure we've pumped libpq back to idle state */
     524           4 :         if (PQgetResult(AH->connection) != NULL)
     525           0 :             pg_log_warning("unexpected extra results during COPY of table \"%s\"",
     526             :                            tocEntryTag);
     527             : 
     528           4 :         AH->pgCopyIn = false;
     529             :     }
     530           4 : }
     531             : 
     532             : void
     533           4 : StartTransaction(Archive *AHX)
     534             : {
     535           4 :     ArchiveHandle *AH = (ArchiveHandle *) AHX;
     536             : 
     537           4 :     ExecuteSqlCommand(AH, "BEGIN", "could not start database transaction");
     538           4 : }
     539             : 
     540             : void
     541           4 : CommitTransaction(Archive *AHX)
     542             : {
     543           4 :     ArchiveHandle *AH = (ArchiveHandle *) AHX;
     544             : 
     545           4 :     ExecuteSqlCommand(AH, "COMMIT", "could not commit database transaction");
     546           4 : }
     547             : 
     548             : void
     549           2 : DropBlobIfExists(ArchiveHandle *AH, Oid oid)
     550             : {
     551             :     /*
     552             :      * If we are not restoring to a direct database connection, we have to
     553             :      * guess about how to detect whether the blob exists.  Assume new-style.
     554             :      */
     555           4 :     if (AH->connection == NULL ||
     556           2 :         PQserverVersion(AH->connection) >= 90000)
     557             :     {
     558           2 :         ahprintf(AH,
     559             :                  "SELECT pg_catalog.lo_unlink(oid) "
     560             :                  "FROM pg_catalog.pg_largeobject_metadata "
     561             :                  "WHERE oid = '%u';\n",
     562             :                  oid);
     563             :     }
     564             :     else
     565             :     {
     566             :         /* Restoring to pre-9.0 server, so do it the old way */
     567           0 :         ahprintf(AH,
     568             :                  "SELECT CASE WHEN EXISTS("
     569             :                  "SELECT 1 FROM pg_catalog.pg_largeobject WHERE loid = '%u'"
     570             :                  ") THEN pg_catalog.lo_unlink('%u') END;\n",
     571             :                  oid, oid);
     572             :     }
     573           2 : }

Generated by: LCOV version 1.14