LCOV - code coverage report
Current view: top level - src/bin/pg_rewind - libpq_fetch.c (source / functions) Hit Total Coverage
Test: PostgreSQL 12beta2 Lines: 143 178 80.3 %
Date: 2019-06-18 07:06:57 Functions: 9 9 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * libpq_fetch.c
       4             :  *    Functions for fetching files from a remote server.
       5             :  *
       6             :  * Copyright (c) 2013-2019, PostgreSQL Global Development Group
       7             :  *
       8             :  *-------------------------------------------------------------------------
       9             :  */
      10             : #include "postgres_fe.h"
      11             : 
      12             : #include <sys/stat.h>
      13             : #include <dirent.h>
      14             : #include <fcntl.h>
      15             : #include <unistd.h>
      16             : 
      17             : #include "pg_rewind.h"
      18             : #include "datapagemap.h"
      19             : #include "fetch.h"
      20             : #include "file_ops.h"
      21             : #include "filemap.h"
      22             : 
      23             : #include "libpq-fe.h"
      24             : #include "catalog/pg_type_d.h"
      25             : #include "fe_utils/connect.h"
      26             : #include "port/pg_bswap.h"
      27             : 
/*
 * Connection to the source server.  It is assigned in libpqConnect() and
 * used by every other function in this file.
 */
static PGconn *conn = NULL;

/*
 * Files are fetched max CHUNKSIZE bytes at a time.
 *
 * (This only applies to files that are copied in whole, or for truncated
 * files where we copy the tail. Relation files, where we know the individual
 * blocks that need to be fetched, are fetched in BLCKSZ chunks.)
 */
#define CHUNKSIZE 1000000

/* Forward declarations of the file-local helpers defined further below. */
static void receiveFileChunks(const char *sql);
static void execute_pagemap(datapagemap_t *pagemap, const char *path);
static char *run_simple_query(const char *sql);
      42             : 
      43             : void
      44           8 : libpqConnect(const char *connstr)
      45             : {
      46             :     char       *str;
      47             :     PGresult   *res;
      48             : 
      49           8 :     conn = PQconnectdb(connstr);
      50           8 :     if (PQstatus(conn) == CONNECTION_BAD)
      51           0 :         pg_fatal("could not connect to server: %s",
      52             :                  PQerrorMessage(conn));
      53             : 
      54           8 :     if (showprogress)
      55           0 :         pg_log_info("connected to server");
      56             : 
      57           8 :     res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
      58           8 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
      59           0 :         pg_fatal("could not clear search_path: %s",
      60             :                  PQresultErrorMessage(res));
      61           8 :     PQclear(res);
      62             : 
      63             :     /*
      64             :      * Check that the server is not in hot standby mode. There is no
      65             :      * fundamental reason that couldn't be made to work, but it doesn't
      66             :      * currently because we use a temporary table. Better to check for it
      67             :      * explicitly than error out, for a better error message.
      68             :      */
      69           8 :     str = run_simple_query("SELECT pg_is_in_recovery()");
      70           8 :     if (strcmp(str, "f") != 0)
      71           0 :         pg_fatal("source server must not be in recovery mode");
      72           8 :     pg_free(str);
      73             : 
      74             :     /*
      75             :      * Also check that full_page_writes is enabled.  We can get torn pages if
      76             :      * a page is modified while we read it with pg_read_binary_file(), and we
      77             :      * rely on full page images to fix them.
      78             :      */
      79           8 :     str = run_simple_query("SHOW full_page_writes");
      80           8 :     if (strcmp(str, "on") != 0)
      81           0 :         pg_fatal("full_page_writes must be enabled in the source server");
      82           8 :     pg_free(str);
      83             : 
      84             :     /*
      85             :      * Although we don't do any "real" updates, we do work with a temporary
      86             :      * table. We don't care about synchronous commit for that. It doesn't
      87             :      * otherwise matter much, but if the server is using synchronous
      88             :      * replication, and replication isn't working for some reason, we don't
      89             :      * want to get stuck, waiting for it to start working again.
      90             :      */
      91           8 :     res = PQexec(conn, "SET synchronous_commit = off");
      92           8 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
      93           0 :         pg_fatal("could not set up connection context: %s",
      94             :                  PQresultErrorMessage(res));
      95           8 :     PQclear(res);
      96           8 : }
      97             : 
      98             : /*
      99             :  * Runs a query that returns a single value.
     100             :  * The result should be pg_free'd after use.
     101             :  */
     102             : static char *
     103          24 : run_simple_query(const char *sql)
     104             : {
     105             :     PGresult   *res;
     106             :     char       *result;
     107             : 
     108          24 :     res = PQexec(conn, sql);
     109             : 
     110          24 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     111           0 :         pg_fatal("error running query (%s) in source server: %s",
     112             :                  sql, PQresultErrorMessage(res));
     113             : 
     114             :     /* sanity check the result set */
     115          24 :     if (PQnfields(res) != 1 || PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
     116           0 :         pg_fatal("unexpected result set from query");
     117             : 
     118          24 :     result = pg_strdup(PQgetvalue(res, 0, 0));
     119             : 
     120          24 :     PQclear(res);
     121             : 
     122          24 :     return result;
     123             : }
     124             : 
     125             : /*
     126             :  * Calls pg_current_wal_insert_lsn() function
     127             :  */
     128             : XLogRecPtr
     129           8 : libpqGetCurrentXlogInsertLocation(void)
     130             : {
     131             :     XLogRecPtr  result;
     132             :     uint32      hi;
     133             :     uint32      lo;
     134             :     char       *val;
     135             : 
     136           8 :     val = run_simple_query("SELECT pg_current_wal_insert_lsn()");
     137             : 
     138           8 :     if (sscanf(val, "%X/%X", &hi, &lo) != 2)
     139           0 :         pg_fatal("unrecognized result \"%s\" for current WAL insert location", val);
     140             : 
     141           8 :     result = ((uint64) hi) << 32 | lo;
     142             : 
     143           8 :     pg_free(val);
     144             : 
     145           8 :     return result;
     146             : }
     147             : 
     148             : /*
     149             :  * Get a list of all files in the data directory.
     150             :  */
     151             : void
     152           8 : libpqProcessFileList(void)
     153             : {
     154             :     PGresult   *res;
     155             :     const char *sql;
     156             :     int         i;
     157             : 
     158             :     /*
     159             :      * Create a recursive directory listing of the whole data directory.
     160             :      *
     161             :      * The WITH RECURSIVE part does most of the work. The second part gets the
     162             :      * targets of the symlinks in pg_tblspc directory.
     163             :      *
     164             :      * XXX: There is no backend function to get a symbolic link's target in
     165             :      * general, so if the admin has put any custom symbolic links in the data
     166             :      * directory, they won't be copied correctly.
     167             :      */
     168           8 :     sql =
     169             :         "WITH RECURSIVE files (path, filename, size, isdir) AS (\n"
     170             :         "  SELECT '' AS path, filename, size, isdir FROM\n"
     171             :         "  (SELECT pg_ls_dir('.', true, false) AS filename) AS fn,\n"
     172             :         "        pg_stat_file(fn.filename, true) AS this\n"
     173             :         "  UNION ALL\n"
     174             :         "  SELECT parent.path || parent.filename || '/' AS path,\n"
     175             :         "         fn, this.size, this.isdir\n"
     176             :         "  FROM files AS parent,\n"
     177             :         "       pg_ls_dir(parent.path || parent.filename, true, false) AS fn,\n"
     178             :         "       pg_stat_file(parent.path || parent.filename || '/' || fn, true) AS this\n"
     179             :         "       WHERE parent.isdir = 't'\n"
     180             :         ")\n"
     181             :         "SELECT path || filename, size, isdir,\n"
     182             :         "       pg_tablespace_location(pg_tablespace.oid) AS link_target\n"
     183             :         "FROM files\n"
     184             :         "LEFT OUTER JOIN pg_tablespace ON files.path = 'pg_tblspc/'\n"
     185             :         "                             AND oid::text = files.filename\n";
     186           8 :     res = PQexec(conn, sql);
     187             : 
     188           8 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     189           0 :         pg_fatal("could not fetch file list: %s",
     190             :                  PQresultErrorMessage(res));
     191             : 
     192             :     /* sanity check the result set */
     193           8 :     if (PQnfields(res) != 4)
     194           0 :         pg_fatal("unexpected result set while fetching file list");
     195             : 
     196             :     /* Read result to local variables */
     197       10000 :     for (i = 0; i < PQntuples(res); i++)
     198             :     {
     199        9992 :         char       *path = PQgetvalue(res, i, 0);
     200        9992 :         int64       filesize = atol(PQgetvalue(res, i, 1));
     201        9992 :         bool        isdir = (strcmp(PQgetvalue(res, i, 2), "t") == 0);
     202        9992 :         char       *link_target = PQgetvalue(res, i, 3);
     203             :         file_type_t type;
     204             : 
     205        9992 :         if (PQgetisnull(res, 0, 1))
     206             :         {
     207             :             /*
     208             :              * The file was removed from the server while the query was
     209             :              * running. Ignore it.
     210             :              */
     211           0 :             continue;
     212             :         }
     213             : 
     214        9992 :         if (link_target[0])
     215           0 :             type = FILE_TYPE_SYMLINK;
     216        9992 :         else if (isdir)
     217         214 :             type = FILE_TYPE_DIRECTORY;
     218             :         else
     219        9778 :             type = FILE_TYPE_REGULAR;
     220             : 
     221        9992 :         process_source_file(path, type, filesize, link_target);
     222             :     }
     223           8 :     PQclear(res);
     224           8 : }
     225             : 
     226             : /*----
     227             :  * Runs a query, which returns pieces of files from the remote source data
     228             :  * directory, and overwrites the corresponding parts of target files with
     229             :  * the received parts. The result set is expected to be of format:
     230             :  *
     231             :  * path     text    -- path in the data directory, e.g "base/1/123"
     232             :  * begin    int8    -- offset within the file
     233             :  * chunk    bytea   -- file content
     234             :  *----
     235             :  */
     236             : static void
     237           8 : receiveFileChunks(const char *sql)
     238             : {
     239             :     PGresult   *res;
     240             : 
     241           8 :     if (PQsendQueryParams(conn, sql, 0, NULL, NULL, NULL, NULL, 1) != 1)
     242           0 :         pg_fatal("could not send query: %s", PQerrorMessage(conn));
     243             : 
     244           8 :     pg_log_debug("getting file chunks");
     245             : 
     246           8 :     if (PQsetSingleRowMode(conn) != 1)
     247           0 :         pg_fatal("could not set libpq connection to single row mode");
     248             : 
     249        3332 :     while ((res = PQgetResult(conn)) != NULL)
     250             :     {
     251             :         char       *filename;
     252             :         int         filenamelen;
     253             :         int64       chunkoff;
     254             :         char        chunkoff_str[32];
     255             :         int         chunksize;
     256             :         char       *chunk;
     257             : 
     258        3316 :         switch (PQresultStatus(res))
     259             :         {
     260             :             case PGRES_SINGLE_TUPLE:
     261        3308 :                 break;
     262             : 
     263             :             case PGRES_TUPLES_OK:
     264           8 :                 PQclear(res);
     265          16 :                 continue;       /* final zero-row result */
     266             : 
     267             :             default:
     268           0 :                 pg_fatal("unexpected result while fetching remote files: %s",
     269             :                          PQresultErrorMessage(res));
     270             :         }
     271             : 
     272             :         /* sanity check the result set */
     273        3308 :         if (PQnfields(res) != 3 || PQntuples(res) != 1)
     274           0 :             pg_fatal("unexpected result set size while fetching remote files");
     275             : 
     276        6616 :         if (PQftype(res, 0) != TEXTOID ||
     277        6616 :             PQftype(res, 1) != INT8OID ||
     278        3308 :             PQftype(res, 2) != BYTEAOID)
     279             :         {
     280           0 :             pg_fatal("unexpected data types in result set while fetching remote files: %u %u %u",
     281             :                      PQftype(res, 0), PQftype(res, 1), PQftype(res, 2));
     282             :         }
     283             : 
     284        3308 :         if (PQfformat(res, 0) != 1 &&
     285           0 :             PQfformat(res, 1) != 1 &&
     286           0 :             PQfformat(res, 2) != 1)
     287             :         {
     288           0 :             pg_fatal("unexpected result format while fetching remote files");
     289             :         }
     290             : 
     291        6616 :         if (PQgetisnull(res, 0, 0) ||
     292        3308 :             PQgetisnull(res, 0, 1))
     293             :         {
     294           0 :             pg_fatal("unexpected null values in result while fetching remote files");
     295             :         }
     296             : 
     297        3308 :         if (PQgetlength(res, 0, 1) != sizeof(int64))
     298           0 :             pg_fatal("unexpected result length while fetching remote files");
     299             : 
     300             :         /* Read result set to local variables */
     301        3308 :         memcpy(&chunkoff, PQgetvalue(res, 0, 1), sizeof(int64));
     302        3308 :         chunkoff = pg_ntoh64(chunkoff);
     303        3308 :         chunksize = PQgetlength(res, 0, 2);
     304             : 
     305        3308 :         filenamelen = PQgetlength(res, 0, 0);
     306        3308 :         filename = pg_malloc(filenamelen + 1);
     307        3308 :         memcpy(filename, PQgetvalue(res, 0, 0), filenamelen);
     308        3308 :         filename[filenamelen] = '\0';
     309             : 
     310        3308 :         chunk = PQgetvalue(res, 0, 2);
     311             : 
     312             :         /*
     313             :          * If a file has been deleted on the source, remove it on the target
     314             :          * as well.  Note that multiple unlink() calls may happen on the same
     315             :          * file if multiple data chunks are associated with it, hence ignore
     316             :          * unconditionally anything missing.  If this file is not a relation
     317             :          * data file, then it has been already truncated when creating the
     318             :          * file chunk list at the previous execution of the filemap.
     319             :          */
     320        3308 :         if (PQgetisnull(res, 0, 2))
     321             :         {
     322           0 :             pg_log_debug("received null value for chunk for file \"%s\", file has been deleted",
     323             :                          filename);
     324           0 :             remove_target_file(filename, true);
     325           0 :             pg_free(filename);
     326           0 :             PQclear(res);
     327           0 :             continue;
     328             :         }
     329             : 
     330             :         /*
     331             :          * Separate step to keep platform-dependent format code out of
     332             :          * translatable strings.
     333             :          */
     334        3308 :         snprintf(chunkoff_str, sizeof(chunkoff_str), INT64_FORMAT, chunkoff);
     335        3308 :         pg_log_debug("received chunk for file \"%s\", offset %s, size %d",
     336             :                      filename, chunkoff_str, chunksize);
     337             : 
     338        3308 :         open_target_file(filename, false);
     339             : 
     340        3308 :         write_target_range(chunk, chunkoff, chunksize);
     341             : 
     342        3308 :         pg_free(filename);
     343             : 
     344        3308 :         PQclear(res);
     345             :     }
     346           8 : }
     347             : 
     348             : /*
     349             :  * Receive a single file as a malloc'd buffer.
     350             :  */
     351             : char *
     352          16 : libpqGetFile(const char *filename, size_t *filesize)
     353             : {
     354             :     PGresult   *res;
     355             :     char       *result;
     356             :     int         len;
     357             :     const char *paramValues[1];
     358             : 
     359          16 :     paramValues[0] = filename;
     360          16 :     res = PQexecParams(conn, "SELECT pg_read_binary_file($1)",
     361             :                        1, NULL, paramValues, NULL, NULL, 1);
     362             : 
     363          16 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     364           0 :         pg_fatal("could not fetch remote file \"%s\": %s",
     365             :                  filename, PQresultErrorMessage(res));
     366             : 
     367             :     /* sanity check the result set */
     368          16 :     if (PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
     369           0 :         pg_fatal("unexpected result set while fetching remote file \"%s\"",
     370             :                  filename);
     371             : 
     372             :     /* Read result to local variables */
     373          16 :     len = PQgetlength(res, 0, 0);
     374          16 :     result = pg_malloc(len + 1);
     375          16 :     memcpy(result, PQgetvalue(res, 0, 0), len);
     376          16 :     result[len] = '\0';
     377             : 
     378          16 :     PQclear(res);
     379             : 
     380          16 :     pg_log_debug("fetched file \"%s\", length %d", filename, len);
     381             : 
     382          16 :     if (filesize)
     383           8 :         *filesize = len;
     384          16 :     return result;
     385             : }
     386             : 
     387             : /*
     388             :  * Write a file range to a temporary table in the server.
     389             :  *
     390             :  * The range is sent to the server as a COPY formatted line, to be inserted
     391             :  * into the 'fetchchunks' temporary table. It is used in receiveFileChunks()
     392             :  * function to actually fetch the data.
     393             :  */
     394             : static void
     395        3066 : fetch_file_range(const char *path, uint64 begin, uint64 end)
     396             : {
     397             :     char        linebuf[MAXPGPATH + 23];
     398             : 
     399             :     /* Split the range into CHUNKSIZE chunks */
     400        9440 :     while (end - begin > 0)
     401             :     {
     402             :         unsigned int len;
     403             : 
     404             :         /* Fine as long as CHUNKSIZE is not bigger than UINT32_MAX */
     405        3308 :         if (end - begin > CHUNKSIZE)
     406         352 :             len = CHUNKSIZE;
     407             :         else
     408        2956 :             len = (unsigned int) (end - begin);
     409             : 
     410        3308 :         snprintf(linebuf, sizeof(linebuf), "%s\t" UINT64_FORMAT "\t%u\n", path, begin, len);
     411             : 
     412        3308 :         if (PQputCopyData(conn, linebuf, strlen(linebuf)) != 1)
     413           0 :             pg_fatal("could not send COPY data: %s",
     414             :                      PQerrorMessage(conn));
     415             : 
     416        3308 :         begin += len;
     417             :     }
     418        3066 : }
     419             : 
     420             : /*
     421             :  * Fetch all changed blocks from remote source data directory.
     422             :  */
     423             : void
     424           8 : libpq_executeFileMap(filemap_t *map)
     425             : {
     426             :     file_entry_t *entry;
     427             :     const char *sql;
     428             :     PGresult   *res;
     429             :     int         i;
     430             : 
     431             :     /*
     432             :      * First create a temporary table, and load it with the blocks that we
     433             :      * need to fetch.
     434             :      */
     435           8 :     sql = "CREATE TEMPORARY TABLE fetchchunks(path text, begin int8, len int4);";
     436           8 :     res = PQexec(conn, sql);
     437             : 
     438           8 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     439           0 :         pg_fatal("could not create temporary table: %s",
     440             :                  PQresultErrorMessage(res));
     441           8 :     PQclear(res);
     442             : 
     443           8 :     sql = "COPY fetchchunks FROM STDIN";
     444           8 :     res = PQexec(conn, sql);
     445             : 
     446           8 :     if (PQresultStatus(res) != PGRES_COPY_IN)
     447           0 :         pg_fatal("could not send file list: %s",
     448             :                  PQresultErrorMessage(res));
     449           8 :     PQclear(res);
     450             : 
     451       10648 :     for (i = 0; i < map->narray; i++)
     452             :     {
     453       10640 :         entry = map->array[i];
     454             : 
     455             :         /* If this is a relation file, copy the modified blocks */
     456       10640 :         execute_pagemap(&entry->pagemap, entry->path);
     457             : 
     458       10640 :         switch (entry->action)
     459             :         {
     460             :             case FILE_ACTION_NONE:
     461             :                 /* nothing else to do */
     462        6918 :                 break;
     463             : 
     464             :             case FILE_ACTION_COPY:
     465             :                 /* Truncate the old file out of the way, if any */
     466        3004 :                 open_target_file(entry->path, true);
     467        3004 :                 fetch_file_range(entry->path, 0, entry->newsize);
     468        3004 :                 break;
     469             : 
     470             :             case FILE_ACTION_TRUNCATE:
     471           2 :                 truncate_target_file(entry->path, entry->newsize);
     472           2 :                 break;
     473             : 
     474             :             case FILE_ACTION_COPY_TAIL:
     475           2 :                 fetch_file_range(entry->path, entry->oldsize, entry->newsize);
     476           2 :                 break;
     477             : 
     478             :             case FILE_ACTION_REMOVE:
     479         708 :                 remove_target(entry);
     480         708 :                 break;
     481             : 
     482             :             case FILE_ACTION_CREATE:
     483           6 :                 create_target(entry);
     484           6 :                 break;
     485             :         }
     486             :     }
     487             : 
     488           8 :     if (PQputCopyEnd(conn, NULL) != 1)
     489           0 :         pg_fatal("could not send end-of-COPY: %s",
     490             :                  PQerrorMessage(conn));
     491             : 
     492          24 :     while ((res = PQgetResult(conn)) != NULL)
     493             :     {
     494           8 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
     495           0 :             pg_fatal("unexpected result while sending file list: %s",
     496             :                      PQresultErrorMessage(res));
     497           8 :         PQclear(res);
     498             :     }
     499             : 
     500             :     /*
     501             :      * We've now copied the list of file ranges that we need to fetch to the
     502             :      * temporary table. Now, actually fetch all of those ranges.
     503             :      */
     504           8 :     sql =
     505             :         "SELECT path, begin,\n"
     506             :         "  pg_read_binary_file(path, begin, len, true) AS chunk\n"
     507             :         "FROM fetchchunks\n";
     508             : 
     509           8 :     receiveFileChunks(sql);
     510           8 : }
     511             : 
     512             : static void
     513       10640 : execute_pagemap(datapagemap_t *pagemap, const char *path)
     514             : {
     515             :     datapagemap_iterator_t *iter;
     516             :     BlockNumber blkno;
     517             :     off_t       offset;
     518             : 
     519       10640 :     iter = datapagemap_iterate(pagemap);
     520       21340 :     while (datapagemap_next(iter, &blkno))
     521             :     {
     522          60 :         offset = blkno * BLCKSZ;
     523             : 
     524          60 :         fetch_file_range(path, offset, offset + BLCKSZ);
     525             :     }
     526       10640 :     pg_free(iter);
     527       10640 : }

Generated by: LCOV version 1.13