LCOV - code coverage report
Current view: top level - src/bin/pg_rewind - libpq_fetch.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13beta1 Lines: 155 190 81.6 %
Date: 2020-06-03 10:06:28 Functions: 10 10 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * libpq_fetch.c
       4             :  *    Functions for fetching files from a remote server.
       5             :  *
       6             :  * Copyright (c) 2013-2020, PostgreSQL Global Development Group
       7             :  *
       8             :  *-------------------------------------------------------------------------
       9             :  */
      10             : #include "postgres_fe.h"
      11             : 
      12             : #include <sys/stat.h>
      13             : #include <dirent.h>
      14             : #include <fcntl.h>
      15             : #include <unistd.h>
      16             : 
      17             : #include "catalog/pg_type_d.h"
      18             : #include "datapagemap.h"
      19             : #include "fe_utils/connect.h"
      20             : #include "fetch.h"
      21             : #include "file_ops.h"
      22             : #include "filemap.h"
      23             : #include "pg_rewind.h"
      24             : #include "port/pg_bswap.h"
      25             : 
      26             : PGconn     *conn = NULL;    /* libpq connection to the source server; assigned in libpqConnect() */
      27             : 
      28             : /*
      29             :  * Files are fetched at most CHUNKSIZE bytes at a time.
      30             :  *
      31             :  * (This only applies to files that are copied in whole, or for truncated
      32             :  * files where we copy the tail. Relation files, where we know the individual
      33             :  * blocks that need to be fetched, are fetched in BLCKSZ chunks.)
      34             :  */
      35             : #define CHUNKSIZE 1000000
      36             : 
      37             : static void receiveFileChunks(const char *sql);    /* run 'sql' and write the returned chunks into target files */
      38             : static void execute_pagemap(datapagemap_t *pagemap, const char *path);    /* queue block-sized fetches for 'path' */
      39             : static char *run_simple_query(const char *sql);    /* single-value query; caller pg_free's the result */
      40             : static void run_simple_command(const char *sql);    /* command with no result set; fatal on failure */
      41             : 
      42             : void    /* Connect to the server described by 'connstr', store the connection in 'conn', and check/configure the session. */
      43           8 : libpqConnect(const char *connstr)
      44             : {
      45             :     char       *str;    /* result of the single-value sanity queries below */
      46             :     PGresult   *res;
      47             : 
      48           8 :     conn = PQconnectdb(connstr);
      49           8 :     if (PQstatus(conn) == CONNECTION_BAD)
      50           0 :         pg_fatal("could not connect to server: %s",
      51             :                  PQerrorMessage(conn));
      52             : 
      53           8 :     if (showprogress)
      54           0 :         pg_log_info("connected to server");
      55             : 
      56             :     /* disable all types of timeouts for this session */
      57           8 :     run_simple_command("SET statement_timeout = 0");
      58           8 :     run_simple_command("SET lock_timeout = 0");
      59           8 :     run_simple_command("SET idle_in_transaction_session_timeout = 0");
      60             : 
      61           8 :     res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);    /* clear search_path; this statement returns a tuple result */
      62           8 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
      63           0 :         pg_fatal("could not clear search_path: %s",
      64             :                  PQresultErrorMessage(res));
      65           8 :     PQclear(res);
      66             : 
      67             :     /*
      68             :      * Check that the server is not in hot standby mode. There is no
      69             :      * fundamental reason that couldn't be made to work, but it doesn't
      70             :      * currently because we use a temporary table. Better to check for it
      71             :      * explicitly than error out, for a better error message.
      72             :      */
      73           8 :     str = run_simple_query("SELECT pg_is_in_recovery()");
      74           8 :     if (strcmp(str, "f") != 0)    /* anything other than "f" is treated as "in recovery" */
      75           0 :         pg_fatal("source server must not be in recovery mode");
      76           8 :     pg_free(str);
      77             : 
      78             :     /*
      79             :      * Also check that full_page_writes is enabled.  We can get torn pages if
      80             :      * a page is modified while we read it with pg_read_binary_file(), and we
      81             :      * rely on full page images to fix them.
      82             :      */
      83           8 :     str = run_simple_query("SHOW full_page_writes");
      84           8 :     if (strcmp(str, "on") != 0)
      85           0 :         pg_fatal("full_page_writes must be enabled in the source server");
      86           8 :     pg_free(str);
      87             : 
      88             :     /*
      89             :      * Although we don't do any "real" updates, we do work with a temporary
      90             :      * table. We don't care about synchronous commit for that. It doesn't
      91             :      * otherwise matter much, but if the server is using synchronous
      92             :      * replication, and replication isn't working for some reason, we don't
      93             :      * want to get stuck, waiting for it to start working again.
      94             :      */
      95           8 :     run_simple_command("SET synchronous_commit = off");
      96           8 : }
      97             : 
      98             : /*
      99             :  * Runs a query on 'conn' that must return exactly one row with one non-NULL column.
     100             :  * The value is returned as a pg_strdup'd string that should be pg_free'd after use; any failure is reported with pg_fatal().
     101             :  */
     102             : static char *
     103          24 : run_simple_query(const char *sql)
     104             : {
     105             :     PGresult   *res;
     106             :     char       *result;
     107             : 
     108          24 :     res = PQexec(conn, sql);
     109             : 
     110          24 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     111           0 :         pg_fatal("error running query (%s) on source server: %s",
     112             :                  sql, PQresultErrorMessage(res));
     113             : 
     114             :     /* sanity check the result set: one column, one row, value not NULL */
     115          24 :     if (PQnfields(res) != 1 || PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
     116           0 :         pg_fatal("unexpected result set from query");
     117             : 
     118          24 :     result = pg_strdup(PQgetvalue(res, 0, 0));    /* copy the value out before 'res' is cleared */
     119             : 
     120          24 :     PQclear(res);
     121             : 
     122          24 :     return result;
     123             : }
     124             : 
     125             : /*
     126             :  * Runs a command on 'conn' that is expected to finish with PGRES_COMMAND_OK (i.e. no result set).
     127             :  * In the event of a failure, exit immediately via pg_fatal().
     128             :  */
     129             : static void
     130          40 : run_simple_command(const char *sql)
     131             : {
     132             :     PGresult   *res;
     133             : 
     134          40 :     res = PQexec(conn, sql);
     135             : 
     136          40 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     137           0 :         pg_fatal("error running query (%s) in source server: %s",
     138             :                  sql, PQresultErrorMessage(res));
     139             : 
     140          40 :     PQclear(res);
     141          40 : }
     142             : 
     143             : /*
     144             :  * Calls pg_current_wal_insert_lsn() on the source server and returns the value, parsed from its "hi/lo" hexadecimal text form, as an XLogRecPtr.
     145             :  */
     146             : XLogRecPtr
     147           8 : libpqGetCurrentXlogInsertLocation(void)
     148             : {
     149             :     XLogRecPtr  result;
     150             :     uint32      hi;     /* part before the '/' */
     151             :     uint32      lo;     /* part after the '/' */
     152             :     char       *val;
     153             : 
     154           8 :     val = run_simple_query("SELECT pg_current_wal_insert_lsn()");
     155             : 
     156           8 :     if (sscanf(val, "%X/%X", &hi, &lo) != 2)
     157           0 :         pg_fatal("unrecognized result \"%s\" for current WAL insert location", val);
     158             : 
     159           8 :     result = ((uint64) hi) << 32 | lo;    /* widen 'hi' before shifting so the upper half is not lost */
     160             : 
     161           8 :     pg_free(val);
     162             : 
     163           8 :     return result;
     164             : }
     165             : 
     166             : /*
     167             :  * Get a list of all files in the data directory of the source server, and pass each entry to process_source_file().
     168             :  */
     169             : void
     170           8 : libpqProcessFileList(void)
     171             : {
     172             :     PGresult   *res;
     173             :     const char *sql;
     174             :     int         i;
     175             : 
     176             :     /*
     177             :      * Create a recursive directory listing of the whole data directory.
     178             :      *
     179             :      * The WITH RECURSIVE part does most of the work. The second part gets the
     180             :      * targets of the symlinks in pg_tblspc directory.
     181             :      *
     182             :      * XXX: There is no backend function to get a symbolic link's target in
     183             :      * general, so if the admin has put any custom symbolic links in the data
     184             :      * directory, they won't be copied correctly.
     185             :      */
     186           8 :     sql =
     187             :         "WITH RECURSIVE files (path, filename, size, isdir) AS (\n"
     188             :         "  SELECT '' AS path, filename, size, isdir FROM\n"
     189             :         "  (SELECT pg_ls_dir('.', true, false) AS filename) AS fn,\n"
     190             :         "        pg_stat_file(fn.filename, true) AS this\n"
     191             :         "  UNION ALL\n"
     192             :         "  SELECT parent.path || parent.filename || '/' AS path,\n"
     193             :         "         fn, this.size, this.isdir\n"
     194             :         "  FROM files AS parent,\n"
     195             :         "       pg_ls_dir(parent.path || parent.filename, true, false) AS fn,\n"
     196             :         "       pg_stat_file(parent.path || parent.filename || '/' || fn, true) AS this\n"
     197             :         "       WHERE parent.isdir = 't'\n"
     198             :         ")\n"
     199             :         "SELECT path || filename, size, isdir,\n"
     200             :         "       pg_tablespace_location(pg_tablespace.oid) AS link_target\n"
     201             :         "FROM files\n"
     202             :         "LEFT OUTER JOIN pg_tablespace ON files.path = 'pg_tblspc/'\n"
     203             :         "                             AND oid::text = files.filename\n";
     204           8 :     res = PQexec(conn, sql);
     205             : 
     206           8 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     207           0 :         pg_fatal("could not fetch file list: %s",
     208             :                  PQresultErrorMessage(res));
     209             : 
     210             :     /* sanity check the result set: path, size, isdir, link_target */
     211           8 :     if (PQnfields(res) != 4)
     212           0 :         pg_fatal("unexpected result set while fetching file list");
     213             : 
     214             :     /* Read each result row to local variables and hand it to process_source_file() */
     215        9564 :     for (i = 0; i < PQntuples(res); i++)
     216             :     {
     217        9556 :         char       *path = PQgetvalue(res, i, 0);
     218        9556 :         int64       filesize = atol(PQgetvalue(res, i, 1));
     219        9556 :         bool        isdir = (strcmp(PQgetvalue(res, i, 2), "t") == 0);
     220        9556 :         char       *link_target = PQgetvalue(res, i, 3);
     221             :         file_type_t type;
     222             : 
     223        9556 :         if (PQgetisnull(res, i, 1))    /* test the size column of THIS row, not of row 0 */
     224             :         {
     225             :             /*
     226             :              * The file was removed from the server while the query was
     227             :              * running. Ignore it.
     228             :              */
     229           0 :             continue;
     230             :         }
     231             : 
     232        9556 :         if (link_target[0])    /* non-empty tablespace location: treat the entry as a symlink */
     233           0 :             type = FILE_TYPE_SYMLINK;
     234        9556 :         else if (isdir)
     235         214 :             type = FILE_TYPE_DIRECTORY;
     236             :         else
     237        9342 :             type = FILE_TYPE_REGULAR;
     238             : 
     239        9556 :         process_source_file(path, type, filesize, link_target);
     240             :     }
     241           8 :     PQclear(res);
     242           8 : }
     243             : 
     244             : /*----
     245             :  * Runs a query (in single-row mode, binary result format), which returns pieces of files from the remote source data
     246             :  * directory, and overwrites the corresponding parts of target files with
     247             :  * the received parts. The result set is expected to be of format:
     248             :  *
     249             :  * path     text    -- path in the data directory, e.g "base/1/123"
     250             :  * begin    int8    -- offset within the file
     251             :  * chunk    bytea   -- file content; NULL means the file was deleted on the source
     252             :  *----
     253             :  */
     254             : static void
     255           8 : receiveFileChunks(const char *sql)
     256             : {
     257             :     PGresult   *res;
     258             : 
     259           8 :     if (PQsendQueryParams(conn, sql, 0, NULL, NULL, NULL, NULL, 1) != 1)
     260           0 :         pg_fatal("could not send query: %s", PQerrorMessage(conn));
     261             : 
     262           8 :     pg_log_debug("getting file chunks");
     263             : 
     264           8 :     if (PQsetSingleRowMode(conn) != 1)
     265           0 :         pg_fatal("could not set libpq connection to single row mode");
     266             : 
     267        3172 :     while ((res = PQgetResult(conn)) != NULL)
     268             :     {
     269             :         char       *filename;
     270             :         int         filenamelen;
     271             :         int64       chunkoff;
     272             :         int         chunksize;
     273             :         char       *chunk;
     274             : 
     275        3164 :         switch (PQresultStatus(res))
     276             :         {
     277        3156 :             case PGRES_SINGLE_TUPLE:
     278        3156 :                 break;
     279             : 
     280           8 :             case PGRES_TUPLES_OK:
     281           8 :                 PQclear(res);
     282           8 :                 continue;       /* final zero-row result */
     283             : 
     284           0 :             default:
     285           0 :                 pg_fatal("unexpected result while fetching remote files: %s",
     286             :                          PQresultErrorMessage(res));
     287             :         }
     288             : 
     289             :         /* sanity check the result set: three columns, exactly one row */
     290        3156 :         if (PQnfields(res) != 3 || PQntuples(res) != 1)
     291           0 :             pg_fatal("unexpected result set size while fetching remote files");
     292             : 
     293        6312 :         if (PQftype(res, 0) != TEXTOID ||
     294        6312 :             PQftype(res, 1) != INT8OID ||
     295        3156 :             PQftype(res, 2) != BYTEAOID)
     296             :         {
     297           0 :             pg_fatal("unexpected data types in result set while fetching remote files: %u %u %u",
     298             :                      PQftype(res, 0), PQftype(res, 1), PQftype(res, 2));
     299             :         }
     300             : 
     301        3156 :         if (PQfformat(res, 0) != 1 ||    /* every column must be binary: reject if ANY one is not */
     302           0 :             PQfformat(res, 1) != 1 ||
     303           0 :             PQfformat(res, 2) != 1)
     304             :         {
     305           0 :             pg_fatal("unexpected result format while fetching remote files");
     306             :         }
     307             : 
     308        6312 :         if (PQgetisnull(res, 0, 0) ||
     309        3156 :             PQgetisnull(res, 0, 1))
     310             :         {
     311           0 :             pg_fatal("unexpected null values in result while fetching remote files");
     312             :         }
     313             : 
     314        3156 :         if (PQgetlength(res, 0, 1) != sizeof(int64))
     315           0 :             pg_fatal("unexpected result length while fetching remote files");
     316             : 
     317             :         /* Read result set to local variables; the binary int8 offset is converted with pg_ntoh64() */
     318        3156 :         memcpy(&chunkoff, PQgetvalue(res, 0, 1), sizeof(int64));
     319        3156 :         chunkoff = pg_ntoh64(chunkoff);
     320        3156 :         chunksize = PQgetlength(res, 0, 2);
     321             : 
     322        3156 :         filenamelen = PQgetlength(res, 0, 0);
     323        3156 :         filename = pg_malloc(filenamelen + 1);    /* make a NUL-terminated copy of the path using its reported length */
     324        3156 :         memcpy(filename, PQgetvalue(res, 0, 0), filenamelen);
     325        3156 :         filename[filenamelen] = '\0';
     326             : 
     327        3156 :         chunk = PQgetvalue(res, 0, 2);
     328             : 
     329             :         /*
     330             :          * If a file has been deleted on the source, remove it on the target
     331             :          * as well.  Note that multiple unlink() calls may happen on the same
     332             :          * file if multiple data chunks are associated with it, hence ignore
     333             :          * unconditionally anything missing.  If this file is not a relation
     334             :          * data file, then it has been already truncated when creating the
     335             :          * file chunk list at the previous execution of the filemap.
     336             :          */
     337        3156 :         if (PQgetisnull(res, 0, 2))
     338             :         {
     339           0 :             pg_log_debug("received null value for chunk for file \"%s\", file has been deleted",
     340             :                          filename);
     341           0 :             remove_target_file(filename, true);
     342           0 :             pg_free(filename);
     343           0 :             PQclear(res);
     344           0 :             continue;
     345             :         }
     346             : 
     347        3156 :         pg_log_debug("received chunk for file \"%s\", offset %lld, size %d",
     348             :                      filename, (long long int) chunkoff, chunksize);
     349             : 
     350        3156 :         open_target_file(filename, false);
     351             : 
     352        3156 :         write_target_range(chunk, chunkoff, chunksize);
     353             : 
     354        3156 :         pg_free(filename);
     355             : 
     356        3156 :         PQclear(res);
     357             :     }
     358           8 : }
     359             : 
     360             : /*
     361             :  * Receive a single file as a malloc'd buffer: reads 'filename' with pg_read_binary_file() over 'conn', returns a NUL-terminated copy, and stores its length in *filesize when that pointer is not NULL.
     362             :  */
     363             : char *
     364          16 : libpqGetFile(const char *filename, size_t *filesize)
     365             : {
     366             :     PGresult   *res;
     367             :     char       *result;
     368             :     int         len;
     369             :     const char *paramValues[1];
     370             : 
     371          16 :     paramValues[0] = filename;
     372          16 :     res = PQexecParams(conn, "SELECT pg_read_binary_file($1)",
     373             :                        1, NULL, paramValues, NULL, NULL, 1);    /* text parameter in, binary result out */
     374             : 
     375          16 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     376           0 :         pg_fatal("could not fetch remote file \"%s\": %s",
     377             :                  filename, PQresultErrorMessage(res));
     378             : 
     379             :     /* sanity check the result set: exactly one row with a non-NULL value */
     380          16 :     if (PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
     381           0 :         pg_fatal("unexpected result set while fetching remote file \"%s\"",
     382             :                  filename);
     383             : 
     384             :     /* Read result to local variables; one extra byte is allocated for the terminating NUL */
     385          16 :     len = PQgetlength(res, 0, 0);
     386          16 :     result = pg_malloc(len + 1);
     387          16 :     memcpy(result, PQgetvalue(res, 0, 0), len);
     388          16 :     result[len] = '\0';
     389             : 
     390          16 :     PQclear(res);
     391             : 
     392          16 :     pg_log_debug("fetched file \"%s\", length %d", filename, len);
     393             : 
     394          16 :     if (filesize)    /* the length is an optional output */
     395           8 :         *filesize = len;
     396          16 :     return result;
     397             : }
     398             : 
     399             : /*
     400             :  * Write a file range [begin, end) of 'path' to a temporary table in the server.
     401             :  *
     402             :  * The range is sent to the server as a COPY formatted line, to be inserted
     403             :  * into the 'fetchchunks' temporary table. It is used in receiveFileChunks()
     404             :  * function to actually fetch the data. Ranges longer than CHUNKSIZE are sent as several lines; a COPY ... FROM STDIN must already be in progress on 'conn'.
     405             :  */
     406             : static void
     407        2906 : fetch_file_range(const char *path, uint64 begin, uint64 end)
     408             : {
     409             :     char        linebuf[MAXPGPATH + 23];    /* NOTE(review): snprintf() result below is not checked; confirm this size covers the longest possible line */
     410             : 
     411             :     /* Split the range into CHUNKSIZE chunks */
     412        6062 :     while (end - begin > 0)    /* unsigned arithmetic: NOTE(review) presumably callers never pass begin > end -- confirm */
     413             :     {
     414             :         unsigned int len;
     415             : 
     416             :         /* Fine as long as CHUNKSIZE is not bigger than UINT32_MAX */
     417        3156 :         if (end - begin > CHUNKSIZE)
     418         352 :             len = CHUNKSIZE;
     419             :         else
     420        2804 :             len = (unsigned int) (end - begin);
     421             : 
     422        3156 :         snprintf(linebuf, sizeof(linebuf), "%s\t" UINT64_FORMAT "\t%u\n", path, begin, len);    /* one row: path, begin, len */
     423             : 
     424        3156 :         if (PQputCopyData(conn, linebuf, strlen(linebuf)) != 1)
     425           0 :             pg_fatal("could not send COPY data: %s",
     426             :                      PQerrorMessage(conn));
     427             : 
     428        3156 :         begin += len;
     429             :     }
     430        2906 : }
     431             : 
     432             : /*
     433             :  * Fetch all changed blocks from remote source data directory: queue the needed ranges for every entry of 'map' in a temporary table, apply the local actions (truncate/remove/create), then fetch and write the queued ranges.
     434             :  */
     435             : void
     436           8 : libpq_executeFileMap(filemap_t *map)
     437             : {
     438             :     file_entry_t *entry;
     439             :     const char *sql;
     440             :     PGresult   *res;
     441             :     int         i;
     442             : 
     443             :     /*
     444             :      * First create a temporary table, and load it with the blocks that we
     445             :      * need to fetch.
     446             :      */
     447           8 :     sql = "CREATE TEMPORARY TABLE fetchchunks(path text, begin int8, len int4);";
     448           8 :     run_simple_command(sql);
     449             : 
     450           8 :     sql = "COPY fetchchunks FROM STDIN";
     451           8 :     res = PQexec(conn, sql);
     452             : 
     453           8 :     if (PQresultStatus(res) != PGRES_COPY_IN)
     454           0 :         pg_fatal("could not send file list: %s",
     455             :                  PQresultErrorMessage(res));
     456           8 :     PQclear(res);
     457             : 
     458       10148 :     for (i = 0; i < map->narray; i++)
     459             :     {
     460       10140 :         entry = map->array[i];
     461             : 
     462             :         /* If this is a relation file, copy the modified blocks (called for every entry, whatever its action) */
     463       10140 :         execute_pagemap(&entry->pagemap, entry->path);
     464             : 
     465       10140 :         switch (entry->action)
     466             :         {
     467        6634 :             case FILE_ACTION_NONE:
     468             :                 /* nothing else to do */
     469        6634 :                 break;
     470             : 
     471        2850 :             case FILE_ACTION_COPY:
     472             :                 /* Truncate the old file out of the way, if any, then queue the whole file */
     473        2850 :                 open_target_file(entry->path, true);
     474        2850 :                 fetch_file_range(entry->path, 0, entry->newsize);
     475        2850 :                 break;
     476             : 
     477           2 :             case FILE_ACTION_TRUNCATE:
     478           2 :                 truncate_target_file(entry->path, entry->newsize);
     479           2 :                 break;
     480             : 
     481           2 :             case FILE_ACTION_COPY_TAIL:
     482           2 :                 fetch_file_range(entry->path, entry->oldsize, entry->newsize);    /* queue only the range [oldsize, newsize) */
     483           2 :                 break;
     484             : 
     485         646 :             case FILE_ACTION_REMOVE:
     486         646 :                 remove_target(entry);
     487         646 :                 break;
     488             : 
     489           6 :             case FILE_ACTION_CREATE:
     490           6 :                 create_target(entry);
     491           6 :                 break;
     492             :         }
     493       10140 :     }
     494             : 
     495           8 :     if (PQputCopyEnd(conn, NULL) != 1)
     496           0 :         pg_fatal("could not send end-of-COPY: %s",
     497             :                  PQerrorMessage(conn));
     498             : 
     499          16 :     while ((res = PQgetResult(conn)) != NULL)    /* drain the COPY results until libpq returns NULL */
     500             :     {
     501           8 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
     502           0 :             pg_fatal("unexpected result while sending file list: %s",
     503             :                      PQresultErrorMessage(res));
     504           8 :         PQclear(res);
     505             :     }
     506             : 
     507             :     /*
     508             :      * We've now copied the list of file ranges that we need to fetch to the
     509             :      * temporary table. Now, actually fetch all of those ranges.
     510             :      */
     511           8 :     sql =
     512             :         "SELECT path, begin,\n"
     513             :         "  pg_read_binary_file(path, begin, len, true) AS chunk\n"
     514             :         "FROM fetchchunks\n";
     515             : 
     516           8 :     receiveFileChunks(sql);
     517           8 : }
     518             : 
     519             : static void    /* Queue a BLCKSZ-sized fetch of 'path' for each block number the iterator over 'pagemap' yields. */
     520       10140 : execute_pagemap(datapagemap_t *pagemap, const char *path)
     521             : {
     522             :     datapagemap_iterator_t *iter;
     523             :     BlockNumber blkno;
     524             :     off_t       offset;
     525             : 
     526       10140 :     iter = datapagemap_iterate(pagemap);
     527       10194 :     while (datapagemap_next(iter, &blkno))
     528             :     {
     529          54 :         offset = (off_t) blkno * BLCKSZ;    /* widen first: the product would otherwise be computed at BlockNumber's width (presumably 32 bits) and could wrap */
     530             : 
     531          54 :         fetch_file_range(path, offset, offset + BLCKSZ);
     532             :     }
     533       10140 :     pg_free(iter);    /* the iterator is released here with pg_free() */
     534       10140 : }

Generated by: LCOV version 1.13