LCOV - code coverage report
Current view: top level - src/bin/pg_rewind - libpq_fetch.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13devel Lines: 146 180 81.1 %
Date: 2019-09-19 16:06:56 Functions: 10 10 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * libpq_fetch.c
       4             :  *    Functions for fetching files from a remote server.
       5             :  *
       6             :  * Copyright (c) 2013-2019, PostgreSQL Global Development Group
       7             :  *
       8             :  *-------------------------------------------------------------------------
       9             :  */
      10             : #include "postgres_fe.h"
      11             : 
      12             : #include <sys/stat.h>
      13             : #include <dirent.h>
      14             : #include <fcntl.h>
      15             : #include <unistd.h>
      16             : 
      17             : #include "pg_rewind.h"
      18             : #include "datapagemap.h"
      19             : #include "fetch.h"
      20             : #include "file_ops.h"
      21             : #include "filemap.h"
      22             : 
      23             : #include "libpq-fe.h"
      24             : #include "catalog/pg_type_d.h"
      25             : #include "fe_utils/connect.h"
      26             : #include "port/pg_bswap.h"
      27             : 
      28             : static PGconn *conn = NULL;
      29             : 
      30             : /*
      31             :  * Files are fetched max CHUNKSIZE bytes at a time.
      32             :  *
      33             :  * (This only applies to files that are copied in whole, or for truncated
      34             :  * files where we copy the tail. Relation files, where we know the individual
      35             :  * blocks that need to be fetched, are fetched in BLCKSZ chunks.)
      36             :  */
      37             : #define CHUNKSIZE 1000000
      38             : 
      39             : static void receiveFileChunks(const char *sql);
      40             : static void execute_pagemap(datapagemap_t *pagemap, const char *path);
      41             : static char *run_simple_query(const char *sql);
      42             : static void run_simple_command(const char *sql);
      43             : 
      44             : void
      45           8 : libpqConnect(const char *connstr)
      46             : {
      47             :     char       *str;
      48             :     PGresult   *res;
      49             : 
      50           8 :     conn = PQconnectdb(connstr);
      51           8 :     if (PQstatus(conn) == CONNECTION_BAD)
      52           0 :         pg_fatal("could not connect to server: %s",
      53             :                  PQerrorMessage(conn));
      54             : 
      55           8 :     if (showprogress)
      56           0 :         pg_log_info("connected to server");
      57             : 
      58             :     /* disable all types of timeouts */
      59           8 :     run_simple_command("SET statement_timeout = 0");
      60           8 :     run_simple_command("SET lock_timeout = 0");
      61           8 :     run_simple_command("SET idle_in_transaction_session_timeout = 0");
      62             : 
      63           8 :     res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
      64           8 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
      65           0 :         pg_fatal("could not clear search_path: %s",
      66             :                  PQresultErrorMessage(res));
      67           8 :     PQclear(res);
      68             : 
      69             :     /*
      70             :      * Check that the server is not in hot standby mode. There is no
      71             :      * fundamental reason that couldn't be made to work, but it doesn't
      72             :      * currently because we use a temporary table. Better to check for it
      73             :      * explicitly than error out, for a better error message.
      74             :      */
      75           8 :     str = run_simple_query("SELECT pg_is_in_recovery()");
      76           8 :     if (strcmp(str, "f") != 0)
      77           0 :         pg_fatal("source server must not be in recovery mode");
      78           8 :     pg_free(str);
      79             : 
      80             :     /*
      81             :      * Also check that full_page_writes is enabled.  We can get torn pages if
      82             :      * a page is modified while we read it with pg_read_binary_file(), and we
      83             :      * rely on full page images to fix them.
      84             :      */
      85           8 :     str = run_simple_query("SHOW full_page_writes");
      86           8 :     if (strcmp(str, "on") != 0)
      87           0 :         pg_fatal("full_page_writes must be enabled in the source server");
      88           8 :     pg_free(str);
      89             : 
      90             :     /*
      91             :      * Although we don't do any "real" updates, we do work with a temporary
      92             :      * table. We don't care about synchronous commit for that. It doesn't
      93             :      * otherwise matter much, but if the server is using synchronous
      94             :      * replication, and replication isn't working for some reason, we don't
      95             :      * want to get stuck, waiting for it to start working again.
      96             :      */
      97           8 :     run_simple_command("SET synchronous_commit = off");
      98           8 : }
      99             : 
     100             : /*
     101             :  * Runs a query that returns a single value.
     102             :  * The result should be pg_free'd after use.
     103             :  */
     104             : static char *
     105          24 : run_simple_query(const char *sql)
     106             : {
     107             :     PGresult   *res;
     108             :     char       *result;
     109             : 
     110          24 :     res = PQexec(conn, sql);
     111             : 
     112          24 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     113           0 :         pg_fatal("error running query (%s) in source server: %s",
     114             :                  sql, PQresultErrorMessage(res));
     115             : 
     116             :     /* sanity check the result set */
     117          24 :     if (PQnfields(res) != 1 || PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
     118           0 :         pg_fatal("unexpected result set from query");
     119             : 
     120          24 :     result = pg_strdup(PQgetvalue(res, 0, 0));
     121             : 
     122          24 :     PQclear(res);
     123             : 
     124          24 :     return result;
     125             : }
     126             : 
     127             : /*
     128             :  * Runs a command.
     129             :  * In the event of a failure, exit immediately.
     130             :  */
     131             : static void
     132          40 : run_simple_command(const char *sql)
     133             : {
     134             :     PGresult   *res;
     135             : 
     136          40 :     res = PQexec(conn, sql);
     137             : 
     138          40 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     139           0 :         pg_fatal("error running query (%s) in source server: %s",
     140             :                  sql, PQresultErrorMessage(res));
     141             : 
     142          40 :     PQclear(res);
     143          40 : }
     144             : 
     145             : /*
     146             :  * Calls pg_current_wal_insert_lsn() function
     147             :  */
     148             : XLogRecPtr
     149           8 : libpqGetCurrentXlogInsertLocation(void)
     150             : {
     151             :     XLogRecPtr  result;
     152             :     uint32      hi;
     153             :     uint32      lo;
     154             :     char       *val;
     155             : 
     156           8 :     val = run_simple_query("SELECT pg_current_wal_insert_lsn()");
     157             : 
     158           8 :     if (sscanf(val, "%X/%X", &hi, &lo) != 2)
     159           0 :         pg_fatal("unrecognized result \"%s\" for current WAL insert location", val);
     160             : 
     161           8 :     result = ((uint64) hi) << 32 | lo;
     162             : 
     163           8 :     pg_free(val);
     164             : 
     165           8 :     return result;
     166             : }
     167             : 
     168             : /*
     169             :  * Get a list of all files in the data directory.
     170             :  */
     171             : void
     172           8 : libpqProcessFileList(void)
     173             : {
     174             :     PGresult   *res;
     175             :     const char *sql;
     176             :     int         i;
     177             : 
     178             :     /*
     179             :      * Create a recursive directory listing of the whole data directory.
     180             :      *
     181             :      * The WITH RECURSIVE part does most of the work. The second part gets the
     182             :      * targets of the symlinks in pg_tblspc directory.
     183             :      *
     184             :      * XXX: There is no backend function to get a symbolic link's target in
     185             :      * general, so if the admin has put any custom symbolic links in the data
     186             :      * directory, they won't be copied correctly.
     187             :      */
     188           8 :     sql =
     189             :         "WITH RECURSIVE files (path, filename, size, isdir) AS (\n"
     190             :         "  SELECT '' AS path, filename, size, isdir FROM\n"
     191             :         "  (SELECT pg_ls_dir('.', true, false) AS filename) AS fn,\n"
     192             :         "        pg_stat_file(fn.filename, true) AS this\n"
     193             :         "  UNION ALL\n"
     194             :         "  SELECT parent.path || parent.filename || '/' AS path,\n"
     195             :         "         fn, this.size, this.isdir\n"
     196             :         "  FROM files AS parent,\n"
     197             :         "       pg_ls_dir(parent.path || parent.filename, true, false) AS fn,\n"
     198             :         "       pg_stat_file(parent.path || parent.filename || '/' || fn, true) AS this\n"
     199             :         "       WHERE parent.isdir = 't'\n"
     200             :         ")\n"
     201             :         "SELECT path || filename, size, isdir,\n"
     202             :         "       pg_tablespace_location(pg_tablespace.oid) AS link_target\n"
     203             :         "FROM files\n"
     204             :         "LEFT OUTER JOIN pg_tablespace ON files.path = 'pg_tblspc/'\n"
     205             :         "                             AND oid::text = files.filename\n";
     206           8 :     res = PQexec(conn, sql);
     207             : 
     208           8 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     209           0 :         pg_fatal("could not fetch file list: %s",
     210             :                  PQresultErrorMessage(res));
     211             : 
     212             :     /* sanity check the result set */
     213           8 :     if (PQnfields(res) != 4)
     214           0 :         pg_fatal("unexpected result set while fetching file list");
     215             : 
     216             :     /* Read result to local variables */
     217       10000 :     for (i = 0; i < PQntuples(res); i++)
     218             :     {
     219        9992 :         char       *path = PQgetvalue(res, i, 0);
     220        9992 :         int64       filesize = atol(PQgetvalue(res, i, 1));
     221        9992 :         bool        isdir = (strcmp(PQgetvalue(res, i, 2), "t") == 0);
     222        9992 :         char       *link_target = PQgetvalue(res, i, 3);
     223             :         file_type_t type;
     224             : 
     225        9992 :         if (PQgetisnull(res, 0, 1))
     226             :         {
     227             :             /*
     228             :              * The file was removed from the server while the query was
     229             :              * running. Ignore it.
     230             :              */
     231           0 :             continue;
     232             :         }
     233             : 
     234        9992 :         if (link_target[0])
     235           0 :             type = FILE_TYPE_SYMLINK;
     236        9992 :         else if (isdir)
     237         214 :             type = FILE_TYPE_DIRECTORY;
     238             :         else
     239        9778 :             type = FILE_TYPE_REGULAR;
     240             : 
     241        9992 :         process_source_file(path, type, filesize, link_target);
     242             :     }
     243           8 :     PQclear(res);
     244           8 : }
     245             : 
     246             : /*----
     247             :  * Runs a query, which returns pieces of files from the remote source data
     248             :  * directory, and overwrites the corresponding parts of target files with
     249             :  * the received parts. The result set is expected to be of format:
     250             :  *
     251             :  * path     text    -- path in the data directory, e.g "base/1/123"
     252             :  * begin    int8    -- offset within the file
     253             :  * chunk    bytea   -- file content
     254             :  *----
     255             :  */
     256             : static void
     257           8 : receiveFileChunks(const char *sql)
     258             : {
     259             :     PGresult   *res;
     260             : 
     261           8 :     if (PQsendQueryParams(conn, sql, 0, NULL, NULL, NULL, NULL, 1) != 1)
     262           0 :         pg_fatal("could not send query: %s", PQerrorMessage(conn));
     263             : 
     264           8 :     pg_log_debug("getting file chunks");
     265             : 
     266           8 :     if (PQsetSingleRowMode(conn) != 1)
     267           0 :         pg_fatal("could not set libpq connection to single row mode");
     268             : 
     269        3332 :     while ((res = PQgetResult(conn)) != NULL)
     270             :     {
     271             :         char       *filename;
     272             :         int         filenamelen;
     273             :         int64       chunkoff;
     274             :         int         chunksize;
     275             :         char       *chunk;
     276             : 
     277        3316 :         switch (PQresultStatus(res))
     278             :         {
     279             :             case PGRES_SINGLE_TUPLE:
     280        3308 :                 break;
     281             : 
     282             :             case PGRES_TUPLES_OK:
     283           8 :                 PQclear(res);
     284          16 :                 continue;       /* final zero-row result */
     285             : 
     286             :             default:
     287           0 :                 pg_fatal("unexpected result while fetching remote files: %s",
     288             :                          PQresultErrorMessage(res));
     289             :         }
     290             : 
     291             :         /* sanity check the result set */
     292        3308 :         if (PQnfields(res) != 3 || PQntuples(res) != 1)
     293           0 :             pg_fatal("unexpected result set size while fetching remote files");
     294             : 
     295        6616 :         if (PQftype(res, 0) != TEXTOID ||
     296        6616 :             PQftype(res, 1) != INT8OID ||
     297        3308 :             PQftype(res, 2) != BYTEAOID)
     298             :         {
     299           0 :             pg_fatal("unexpected data types in result set while fetching remote files: %u %u %u",
     300             :                      PQftype(res, 0), PQftype(res, 1), PQftype(res, 2));
     301             :         }
     302             : 
     303        3308 :         if (PQfformat(res, 0) != 1 &&
     304           0 :             PQfformat(res, 1) != 1 &&
     305           0 :             PQfformat(res, 2) != 1)
     306             :         {
     307           0 :             pg_fatal("unexpected result format while fetching remote files");
     308             :         }
     309             : 
     310        6616 :         if (PQgetisnull(res, 0, 0) ||
     311        3308 :             PQgetisnull(res, 0, 1))
     312             :         {
     313           0 :             pg_fatal("unexpected null values in result while fetching remote files");
     314             :         }
     315             : 
     316        3308 :         if (PQgetlength(res, 0, 1) != sizeof(int64))
     317           0 :             pg_fatal("unexpected result length while fetching remote files");
     318             : 
     319             :         /* Read result set to local variables */
     320        3308 :         memcpy(&chunkoff, PQgetvalue(res, 0, 1), sizeof(int64));
     321        3308 :         chunkoff = pg_ntoh64(chunkoff);
     322        3308 :         chunksize = PQgetlength(res, 0, 2);
     323             : 
     324        3308 :         filenamelen = PQgetlength(res, 0, 0);
     325        3308 :         filename = pg_malloc(filenamelen + 1);
     326        3308 :         memcpy(filename, PQgetvalue(res, 0, 0), filenamelen);
     327        3308 :         filename[filenamelen] = '\0';
     328             : 
     329        3308 :         chunk = PQgetvalue(res, 0, 2);
     330             : 
     331             :         /*
     332             :          * If a file has been deleted on the source, remove it on the target
     333             :          * as well.  Note that multiple unlink() calls may happen on the same
     334             :          * file if multiple data chunks are associated with it, hence ignore
     335             :          * unconditionally anything missing.  If this file is not a relation
     336             :          * data file, then it has been already truncated when creating the
     337             :          * file chunk list at the previous execution of the filemap.
     338             :          */
     339        3308 :         if (PQgetisnull(res, 0, 2))
     340             :         {
     341           0 :             pg_log_debug("received null value for chunk for file \"%s\", file has been deleted",
     342             :                          filename);
     343           0 :             remove_target_file(filename, true);
     344           0 :             pg_free(filename);
     345           0 :             PQclear(res);
     346           0 :             continue;
     347             :         }
     348             : 
     349        3308 :         pg_log_debug("received chunk for file \"%s\", offset %lld, size %d",
     350             :                      filename, (long long int) chunkoff, chunksize);
     351             : 
     352        3308 :         open_target_file(filename, false);
     353             : 
     354        3308 :         write_target_range(chunk, chunkoff, chunksize);
     355             : 
     356        3308 :         pg_free(filename);
     357             : 
     358        3308 :         PQclear(res);
     359             :     }
     360           8 : }
     361             : 
     362             : /*
     363             :  * Receive a single file as a malloc'd buffer.
     364             :  */
     365             : char *
     366          16 : libpqGetFile(const char *filename, size_t *filesize)
     367             : {
     368             :     PGresult   *res;
     369             :     char       *result;
     370             :     int         len;
     371             :     const char *paramValues[1];
     372             : 
     373          16 :     paramValues[0] = filename;
     374          16 :     res = PQexecParams(conn, "SELECT pg_read_binary_file($1)",
     375             :                        1, NULL, paramValues, NULL, NULL, 1);
     376             : 
     377          16 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     378           0 :         pg_fatal("could not fetch remote file \"%s\": %s",
     379             :                  filename, PQresultErrorMessage(res));
     380             : 
     381             :     /* sanity check the result set */
     382          16 :     if (PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
     383           0 :         pg_fatal("unexpected result set while fetching remote file \"%s\"",
     384             :                  filename);
     385             : 
     386             :     /* Read result to local variables */
     387          16 :     len = PQgetlength(res, 0, 0);
     388          16 :     result = pg_malloc(len + 1);
     389          16 :     memcpy(result, PQgetvalue(res, 0, 0), len);
     390          16 :     result[len] = '\0';
     391             : 
     392          16 :     PQclear(res);
     393             : 
     394          16 :     pg_log_debug("fetched file \"%s\", length %d", filename, len);
     395             : 
     396          16 :     if (filesize)
     397           8 :         *filesize = len;
     398          16 :     return result;
     399             : }
     400             : 
     401             : /*
     402             :  * Write a file range to a temporary table in the server.
     403             :  *
     404             :  * The range is sent to the server as a COPY formatted line, to be inserted
     405             :  * into the 'fetchchunks' temporary table. It is used in receiveFileChunks()
     406             :  * function to actually fetch the data.
     407             :  */
     408             : static void
     409        3066 : fetch_file_range(const char *path, uint64 begin, uint64 end)
     410             : {
     411             :     char        linebuf[MAXPGPATH + 23];
     412             : 
     413             :     /* Split the range into CHUNKSIZE chunks */
     414        9440 :     while (end - begin > 0)
     415             :     {
     416             :         unsigned int len;
     417             : 
     418             :         /* Fine as long as CHUNKSIZE is not bigger than UINT32_MAX */
     419        3308 :         if (end - begin > CHUNKSIZE)
     420         352 :             len = CHUNKSIZE;
     421             :         else
     422        2956 :             len = (unsigned int) (end - begin);
     423             : 
     424        3308 :         snprintf(linebuf, sizeof(linebuf), "%s\t" UINT64_FORMAT "\t%u\n", path, begin, len);
     425             : 
     426        3308 :         if (PQputCopyData(conn, linebuf, strlen(linebuf)) != 1)
     427           0 :             pg_fatal("could not send COPY data: %s",
     428             :                      PQerrorMessage(conn));
     429             : 
     430        3308 :         begin += len;
     431             :     }
     432        3066 : }
     433             : 
     434             : /*
     435             :  * Fetch all changed blocks from remote source data directory.
     436             :  */
     437             : void
     438           8 : libpq_executeFileMap(filemap_t *map)
     439             : {
     440             :     file_entry_t *entry;
     441             :     const char *sql;
     442             :     PGresult   *res;
     443             :     int         i;
     444             : 
     445             :     /*
     446             :      * First create a temporary table, and load it with the blocks that we
     447             :      * need to fetch.
     448             :      */
     449           8 :     sql = "CREATE TEMPORARY TABLE fetchchunks(path text, begin int8, len int4);";
     450           8 :     run_simple_command(sql);
     451             : 
     452           8 :     sql = "COPY fetchchunks FROM STDIN";
     453           8 :     res = PQexec(conn, sql);
     454             : 
     455           8 :     if (PQresultStatus(res) != PGRES_COPY_IN)
     456           0 :         pg_fatal("could not send file list: %s",
     457             :                  PQresultErrorMessage(res));
     458           8 :     PQclear(res);
     459             : 
     460       10648 :     for (i = 0; i < map->narray; i++)
     461             :     {
     462       10640 :         entry = map->array[i];
     463             : 
     464             :         /* If this is a relation file, copy the modified blocks */
     465       10640 :         execute_pagemap(&entry->pagemap, entry->path);
     466             : 
     467       10640 :         switch (entry->action)
     468             :         {
     469             :             case FILE_ACTION_NONE:
     470             :                 /* nothing else to do */
     471        6918 :                 break;
     472             : 
     473             :             case FILE_ACTION_COPY:
     474             :                 /* Truncate the old file out of the way, if any */
     475        3004 :                 open_target_file(entry->path, true);
     476        3004 :                 fetch_file_range(entry->path, 0, entry->newsize);
     477        3004 :                 break;
     478             : 
     479             :             case FILE_ACTION_TRUNCATE:
     480           2 :                 truncate_target_file(entry->path, entry->newsize);
     481           2 :                 break;
     482             : 
     483             :             case FILE_ACTION_COPY_TAIL:
     484           2 :                 fetch_file_range(entry->path, entry->oldsize, entry->newsize);
     485           2 :                 break;
     486             : 
     487             :             case FILE_ACTION_REMOVE:
     488         708 :                 remove_target(entry);
     489         708 :                 break;
     490             : 
     491             :             case FILE_ACTION_CREATE:
     492           6 :                 create_target(entry);
     493           6 :                 break;
     494             :         }
     495             :     }
     496             : 
     497           8 :     if (PQputCopyEnd(conn, NULL) != 1)
     498           0 :         pg_fatal("could not send end-of-COPY: %s",
     499             :                  PQerrorMessage(conn));
     500             : 
     501          24 :     while ((res = PQgetResult(conn)) != NULL)
     502             :     {
     503           8 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
     504           0 :             pg_fatal("unexpected result while sending file list: %s",
     505             :                      PQresultErrorMessage(res));
     506           8 :         PQclear(res);
     507             :     }
     508             : 
     509             :     /*
     510             :      * We've now copied the list of file ranges that we need to fetch to the
     511             :      * temporary table. Now, actually fetch all of those ranges.
     512             :      */
     513           8 :     sql =
     514             :         "SELECT path, begin,\n"
     515             :         "  pg_read_binary_file(path, begin, len, true) AS chunk\n"
     516             :         "FROM fetchchunks\n";
     517             : 
     518           8 :     receiveFileChunks(sql);
     519           8 : }
     520             : 
     521             : static void
     522       10640 : execute_pagemap(datapagemap_t *pagemap, const char *path)
     523             : {
     524             :     datapagemap_iterator_t *iter;
     525             :     BlockNumber blkno;
     526             :     off_t       offset;
     527             : 
     528       10640 :     iter = datapagemap_iterate(pagemap);
     529       21340 :     while (datapagemap_next(iter, &blkno))
     530             :     {
     531          60 :         offset = blkno * BLCKSZ;
     532             : 
     533          60 :         fetch_file_range(path, offset, offset + BLCKSZ);
     534             :     }
     535       10640 :     pg_free(iter);
     536       10640 : }

Generated by: LCOV version 1.13