Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_backup_db.c
4 : *
5 : * Implements the basic DB functions used by the archiver.
6 : *
7 : * IDENTIFICATION
8 : * src/bin/pg_dump/pg_backup_db.c
9 : *
10 : *-------------------------------------------------------------------------
11 : */
12 : #include "postgres_fe.h"
13 :
14 : #include <unistd.h>
15 : #include <ctype.h>
16 : #ifdef HAVE_TERMIOS_H
17 : #include <termios.h>
18 : #endif
19 :
20 : #include "common/connect.h"
21 : #include "common/string.h"
22 : #include "connectdb.h"
23 : #include "parallel.h"
24 : #include "pg_backup_archiver.h"
25 : #include "pg_backup_db.h"
26 : #include "pg_backup_utils.h"
27 :
28 : static void _check_database_version(ArchiveHandle *AH);
29 : static void notice_processor(void *arg, const char *message);
30 :
31 : static void
32 732 : _check_database_version(ArchiveHandle *AH)
33 : {
34 : const char *remoteversion_str;
35 : int remoteversion;
36 : PGresult *res;
37 :
38 732 : remoteversion_str = PQparameterStatus(AH->connection, "server_version");
39 732 : remoteversion = PQserverVersion(AH->connection);
40 732 : if (remoteversion == 0 || !remoteversion_str)
41 0 : pg_fatal("could not get \"server_version\" from libpq");
42 :
43 732 : AH->public.remoteVersionStr = pg_strdup(remoteversion_str);
44 732 : AH->public.remoteVersion = remoteversion;
45 732 : if (!AH->archiveRemoteVersion)
46 502 : AH->archiveRemoteVersion = AH->public.remoteVersionStr;
47 :
48 732 : if (remoteversion != PG_VERSION_NUM
49 0 : && (remoteversion < AH->public.minRemoteVersion ||
50 0 : remoteversion > AH->public.maxRemoteVersion))
51 : {
52 0 : pg_log_error("aborting because of server version mismatch");
53 0 : pg_log_error_detail("server version: %s; %s version: %s",
54 : remoteversion_str, progname, PG_VERSION);
55 0 : exit(1);
56 : }
57 :
58 : /*
59 : * Check if server is in recovery mode, which means we are on a hot
60 : * standby.
61 : */
62 732 : res = ExecuteSqlQueryForSingleRow((Archive *) AH,
63 : "SELECT pg_catalog.pg_is_in_recovery()");
64 732 : AH->public.isStandby = (strcmp(PQgetvalue(res, 0, 0), "t") == 0);
65 732 : PQclear(res);
66 732 : }
67 :
68 : /*
69 : * Reconnect to the server. If dbname is not NULL, use that database,
70 : * else the one associated with the archive handle.
71 : */
72 : void
73 102 : ReconnectToServer(ArchiveHandle *AH, const char *dbname)
74 : {
75 102 : PGconn *oldConn = AH->connection;
76 102 : RestoreOptions *ropt = AH->public.ropt;
77 :
78 : /*
79 : * Save the dbname, if given, in override_dbname so that it will also
80 : * affect any later reconnection attempt.
81 : */
82 102 : if (dbname)
83 102 : ropt->cparams.override_dbname = pg_strdup(dbname);
84 :
85 : /*
86 : * Note: we want to establish the new connection, and in particular update
87 : * ArchiveHandle's connCancel, before closing old connection. Otherwise
88 : * an ill-timed SIGINT could try to access a dead connection.
89 : */
90 102 : AH->connection = NULL; /* dodge error check in ConnectDatabaseAhx */
91 :
92 102 : ConnectDatabaseAhx((Archive *) AH, &ropt->cparams, true);
93 :
94 102 : PQfinish(oldConn);
95 102 : }
96 :
97 : /*
98 : * Make, or remake, a database connection with the given parameters.
99 : *
100 : * The resulting connection handle is stored in AHX->connection.
101 : *
102 : * An interactive password prompt is automatically issued if required.
103 : * We store the results of that in AHX->savedPassword.
104 : * Note: it's not really all that sensible to use a single-entry password
105 : * cache if the username keeps changing. In current usage, however, the
106 : * username never does change, so one savedPassword is sufficient.
107 : */
108 : void
109 736 : ConnectDatabaseAhx(Archive *AHX,
110 : const ConnParams *cparams,
111 : bool isReconnect)
112 : {
113 736 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
114 : trivalue prompt_password;
115 : char *password;
116 :
117 736 : if (AH->connection)
118 0 : pg_fatal("already connected to a database");
119 :
120 : /* Never prompt for a password during a reconnection */
121 736 : prompt_password = isReconnect ? TRI_NO : cparams->promptPassword;
122 :
123 736 : password = AH->savedPassword;
124 :
125 736 : if (prompt_password == TRI_YES && password == NULL)
126 0 : password = simple_prompt("Password: ", false);
127 :
128 1468 : AH->connection = ConnectDatabase(cparams->dbname, NULL, cparams->pghost,
129 736 : cparams->pgport, cparams->username,
130 : prompt_password, true,
131 : progname, NULL, NULL, password, cparams->override_dbname);
132 :
133 : /* Start strict; later phases may override this. */
134 732 : PQclear(ExecuteSqlQueryForSingleRow((Archive *) AH,
135 : ALWAYS_SECURE_SEARCH_PATH_SQL));
136 :
137 732 : if (password && password != AH->savedPassword)
138 0 : free(password);
139 :
140 : /*
141 : * We want to remember connection's actual password, whether or not we got
142 : * it by prompting. So we don't just store the password variable.
143 : */
144 732 : if (PQconnectionUsedPassword(AH->connection))
145 : {
146 0 : free(AH->savedPassword);
147 0 : AH->savedPassword = pg_strdup(PQpass(AH->connection));
148 : }
149 :
150 : /* check for version mismatch */
151 732 : _check_database_version(AH);
152 :
153 732 : PQsetNoticeProcessor(AH->connection, notice_processor, NULL);
154 :
155 : /* arrange for SIGINT to issue a query cancel on this connection */
156 732 : set_archive_cancel_info(AH, AH->connection);
157 732 : }
158 :
159 : /*
160 : * Close the connection to the database and also cancel off the query if we
161 : * have one running.
162 : */
163 : void
164 630 : DisconnectDatabase(Archive *AHX)
165 : {
166 630 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
167 : char errbuf[1];
168 :
169 630 : if (!AH->connection)
170 4 : return;
171 :
172 626 : if (AH->connCancel)
173 : {
174 : /*
175 : * If we have an active query, send a cancel before closing, ignoring
176 : * any errors. This is of no use for a normal exit, but might be
177 : * helpful during pg_fatal().
178 : */
179 626 : if (PQtransactionStatus(AH->connection) == PQTRANS_ACTIVE)
180 0 : (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
181 :
182 : /*
183 : * Prevent signal handler from sending a cancel after this.
184 : */
185 626 : set_archive_cancel_info(AH, NULL);
186 : }
187 :
188 626 : PQfinish(AH->connection);
189 626 : AH->connection = NULL;
190 : }
191 :
192 : PGconn *
193 13078 : GetConnection(Archive *AHX)
194 : {
195 13078 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
196 :
197 13078 : return AH->connection;
198 : }
199 :
/*
 * libpq notice processor: forward each server notice to our own logging
 * at "info" level.  The callback argument is not used.
 */
static void
notice_processor(void *arg, const char *message)
{
	(void) arg;					/* unused */

	pg_log_info("%s", message);
}
205 :
206 : /* Like pg_fatal(), but with a complaint about a particular query. */
207 : static void
208 4 : die_on_query_failure(ArchiveHandle *AH, const char *query)
209 : {
210 4 : pg_log_error("query failed: %s",
211 : PQerrorMessage(AH->connection));
212 4 : pg_log_error_detail("Query was: %s", query);
213 4 : exit(1);
214 : }
215 :
216 : void
217 8466 : ExecuteSqlStatement(Archive *AHX, const char *query)
218 : {
219 8466 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
220 : PGresult *res;
221 :
222 8466 : res = PQexec(AH->connection, query);
223 8466 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
224 2 : die_on_query_failure(AH, query);
225 8464 : PQclear(res);
226 8464 : }
227 :
228 : PGresult *
229 78202 : ExecuteSqlQuery(Archive *AHX, const char *query, ExecStatusType status)
230 : {
231 78202 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
232 : PGresult *res;
233 :
234 78202 : res = PQexec(AH->connection, query);
235 78202 : if (PQresultStatus(res) != status)
236 2 : die_on_query_failure(AH, query);
237 78200 : return res;
238 : }
239 :
240 : /*
241 : * Execute an SQL query and verify that we got exactly one row back.
242 : */
243 : PGresult *
244 32046 : ExecuteSqlQueryForSingleRow(Archive *fout, const char *query)
245 : {
246 : PGresult *res;
247 : int ntups;
248 :
249 32046 : res = ExecuteSqlQuery(fout, query, PGRES_TUPLES_OK);
250 :
251 : /* Expecting a single result only */
252 32046 : ntups = PQntuples(res);
253 32046 : if (ntups != 1)
254 0 : pg_fatal(ngettext("query returned %d row instead of one: %s",
255 : "query returned %d rows instead of one: %s",
256 : ntups),
257 : ntups, query);
258 :
259 32046 : return res;
260 : }
261 :
262 : /*
263 : * Convenience function to send a query.
264 : * Monitors result to detect COPY statements
265 : */
266 : static void
267 32180 : ExecuteSqlCommand(ArchiveHandle *AH, const char *qry, const char *desc)
268 : {
269 32180 : PGconn *conn = AH->connection;
270 : PGresult *res;
271 :
272 : #ifdef NOT_USED
273 : fprintf(stderr, "Executing: '%s'\n\n", qry);
274 : #endif
275 32180 : res = PQexec(conn, qry);
276 :
277 32180 : switch (PQresultStatus(res))
278 : {
279 30914 : case PGRES_COMMAND_OK:
280 : case PGRES_TUPLES_OK:
281 : case PGRES_EMPTY_QUERY:
282 : /* A-OK */
283 30914 : break;
284 1266 : case PGRES_COPY_IN:
285 : /* Assume this is an expected result */
286 1266 : AH->pgCopyIn = true;
287 1266 : break;
288 0 : default:
289 : /* trouble */
290 0 : warn_or_exit_horribly(AH, "%s: %sCommand was: %s",
291 : desc, PQerrorMessage(conn), qry);
292 0 : break;
293 : }
294 :
295 32180 : PQclear(res);
296 32180 : }
297 :
298 :
299 : /*
300 : * Process non-COPY table data (that is, INSERT commands).
301 : *
302 : * The commands have been run together as one long string for compressibility,
303 : * and we are receiving them in bufferloads with arbitrary boundaries, so we
304 : * have to locate command boundaries and save partial commands across calls.
305 : * All state must be kept in AH->sqlparse, not in local variables of this
306 : * routine. We assume that AH->sqlparse was filled with zeroes when created.
307 : *
308 : * We have to lex the data to the extent of identifying literals and quoted
309 : * identifiers, so that we can recognize statement-terminating semicolons.
310 : * We assume that INSERT data will not contain SQL comments, E'' literals,
311 : * or dollar-quoted strings, so this is much simpler than a full SQL lexer.
312 : *
313 : * Note: when restoring from a pre-9.0 dump file, this code is also used to
314 : * process BLOB COMMENTS data, which has the same problem of containing
315 : * multiple SQL commands that might be split across bufferloads. Fortunately,
316 : * that data won't contain anything complicated to lex either.
317 : */
318 : static void
319 74 : ExecuteSimpleCommands(ArchiveHandle *AH, const char *buf, size_t bufLen)
320 : {
321 74 : const char *qry = buf;
322 74 : const char *eos = buf + bufLen;
323 :
324 : /* initialize command buffer if first time through */
325 74 : if (AH->sqlparse.curCmd == NULL)
326 6 : AH->sqlparse.curCmd = createPQExpBuffer();
327 :
328 259460 : for (; qry < eos; qry++)
329 : {
330 259386 : char ch = *qry;
331 :
332 : /* For neatness, we skip any newlines between commands */
333 259386 : if (!(ch == '\n' && AH->sqlparse.curCmd->len == 0))
334 253358 : appendPQExpBufferChar(AH->sqlparse.curCmd, ch);
335 :
336 259386 : switch (AH->sqlparse.state)
337 : {
338 251386 : case SQL_SCAN: /* Default state == 0, set in _allocAH */
339 251386 : if (ch == ';')
340 : {
341 : /*
342 : * We've found the end of a statement. Send it and reset
343 : * the buffer.
344 : */
345 6000 : ExecuteSqlCommand(AH, AH->sqlparse.curCmd->data,
346 : "could not execute query");
347 6000 : resetPQExpBuffer(AH->sqlparse.curCmd);
348 : }
349 245386 : else if (ch == '\'')
350 : {
351 4000 : AH->sqlparse.state = SQL_IN_SINGLE_QUOTE;
352 4000 : AH->sqlparse.backSlash = false;
353 : }
354 241386 : else if (ch == '"')
355 : {
356 0 : AH->sqlparse.state = SQL_IN_DOUBLE_QUOTE;
357 : }
358 251386 : break;
359 :
360 8000 : case SQL_IN_SINGLE_QUOTE:
361 : /* We needn't handle '' specially */
362 8000 : if (ch == '\'' && !AH->sqlparse.backSlash)
363 4000 : AH->sqlparse.state = SQL_SCAN;
364 4000 : else if (ch == '\\' && !AH->public.std_strings)
365 0 : AH->sqlparse.backSlash = !AH->sqlparse.backSlash;
366 : else
367 4000 : AH->sqlparse.backSlash = false;
368 8000 : break;
369 :
370 0 : case SQL_IN_DOUBLE_QUOTE:
371 : /* We needn't handle "" specially */
372 0 : if (ch == '"')
373 0 : AH->sqlparse.state = SQL_SCAN;
374 0 : break;
375 : }
376 259386 : }
377 74 : }
378 :
379 :
380 : /*
381 : * Implement ahwrite() for direct-to-DB restore
382 : */
383 : int
384 30358 : ExecuteSqlCommandBuf(Archive *AHX, const char *buf, size_t bufLen)
385 : {
386 30358 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
387 :
388 30358 : if (AH->outputKind == OUTPUT_COPYDATA)
389 : {
390 : /*
391 : * COPY data.
392 : *
393 : * We drop the data on the floor if libpq has failed to enter COPY
394 : * mode; this allows us to behave reasonably when trying to continue
395 : * after an error in a COPY command.
396 : */
397 14200 : if (AH->pgCopyIn &&
398 7100 : PQputCopyData(AH->connection, buf, bufLen) <= 0)
399 0 : pg_fatal("error returned by PQputCopyData: %s",
400 : PQerrorMessage(AH->connection));
401 : }
402 23258 : else if (AH->outputKind == OUTPUT_OTHERDATA)
403 : {
404 : /*
405 : * Table data expressed as INSERT commands; or, in old dump files,
406 : * BLOB COMMENTS data (which is expressed as COMMENT ON commands).
407 : */
408 74 : ExecuteSimpleCommands(AH, buf, bufLen);
409 : }
410 : else
411 : {
412 : /*
413 : * General SQL commands; we assume that commands will not be split
414 : * across calls.
415 : *
416 : * In most cases the data passed to us will be a null-terminated
417 : * string, but if it's not, we have to add a trailing null.
418 : */
419 23184 : if (buf[bufLen] == '\0')
420 23184 : ExecuteSqlCommand(AH, buf, "could not execute query");
421 : else
422 : {
423 0 : char *str = (char *) pg_malloc(bufLen + 1);
424 :
425 0 : memcpy(str, buf, bufLen);
426 0 : str[bufLen] = '\0';
427 0 : ExecuteSqlCommand(AH, str, "could not execute query");
428 0 : free(str);
429 : }
430 : }
431 :
432 30358 : return bufLen;
433 : }
434 :
435 : /*
436 : * Terminate a COPY operation during direct-to-DB restore
437 : */
438 : void
439 1266 : EndDBCopyMode(Archive *AHX, const char *tocEntryTag)
440 : {
441 1266 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
442 :
443 1266 : if (AH->pgCopyIn)
444 : {
445 : PGresult *res;
446 :
447 1266 : if (PQputCopyEnd(AH->connection, NULL) <= 0)
448 0 : pg_fatal("error returned by PQputCopyEnd: %s",
449 : PQerrorMessage(AH->connection));
450 :
451 : /* Check command status and return to normal libpq state */
452 1266 : res = PQgetResult(AH->connection);
453 1266 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
454 0 : warn_or_exit_horribly(AH, "COPY failed for table \"%s\": %s",
455 0 : tocEntryTag, PQerrorMessage(AH->connection));
456 1266 : PQclear(res);
457 :
458 : /* Do this to ensure we've pumped libpq back to idle state */
459 1266 : if (PQgetResult(AH->connection) != NULL)
460 0 : pg_log_warning("unexpected extra results during COPY of table \"%s\"",
461 : tocEntryTag);
462 :
463 1266 : AH->pgCopyIn = false;
464 : }
465 1266 : }
466 :
467 : void
468 1498 : StartTransaction(Archive *AHX)
469 : {
470 1498 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
471 :
472 1498 : ExecuteSqlCommand(AH, "BEGIN", "could not start database transaction");
473 1498 : }
474 :
475 : void
476 1498 : CommitTransaction(Archive *AHX)
477 : {
478 1498 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
479 :
480 1498 : ExecuteSqlCommand(AH, "COMMIT", "could not commit database transaction");
481 1498 : }
482 :
483 : /*
484 : * Issue per-blob commands for the large object(s) listed in the TocEntry
485 : *
486 : * The TocEntry's defn string is assumed to consist of large object OIDs,
487 : * one per line. Wrap these in the given SQL command fragments and issue
488 : * the commands. (cmdEnd need not include a semicolon.)
489 : */
490 : void
491 324 : IssueCommandPerBlob(ArchiveHandle *AH, TocEntry *te,
492 : const char *cmdBegin, const char *cmdEnd)
493 : {
494 : /* Make a writable copy of the command string */
495 324 : char *buf = pg_strdup(te->defn);
496 324 : RestoreOptions *ropt = AH->public.ropt;
497 : char *st;
498 : char *en;
499 :
500 324 : st = buf;
501 712 : while ((en = strchr(st, '\n')) != NULL)
502 : {
503 388 : *en++ = '\0';
504 388 : ahprintf(AH, "%s%s%s;\n", cmdBegin, st, cmdEnd);
505 :
506 : /* In --transaction-size mode, count each command as an action */
507 388 : if (ropt && ropt->txn_size > 0)
508 : {
509 12 : if (++AH->txnCount >= ropt->txn_size)
510 : {
511 0 : if (AH->connection)
512 : {
513 0 : CommitTransaction(&AH->public);
514 0 : StartTransaction(&AH->public);
515 : }
516 : else
517 0 : ahprintf(AH, "COMMIT;\nBEGIN;\n\n");
518 0 : AH->txnCount = 0;
519 : }
520 : }
521 :
522 388 : st = en;
523 : }
524 324 : ahprintf(AH, "\n");
525 324 : pg_free(buf);
526 324 : }
527 :
528 : /*
529 : * Process a "LARGE OBJECTS" ACL TocEntry.
530 : *
531 : * To save space in the dump file, the TocEntry contains only one copy
532 : * of the required GRANT/REVOKE commands, written to apply to the first
533 : * blob in the group (although we do not depend on that detail here).
534 : * We must expand the text to generate commands for all the blobs listed
535 : * in the associated BLOB METADATA entry.
536 : */
537 : void
538 0 : IssueACLPerBlob(ArchiveHandle *AH, TocEntry *te)
539 : {
540 0 : TocEntry *blobte = getTocEntryByDumpId(AH, te->dependencies[0]);
541 : char *buf;
542 : char *st;
543 : char *st2;
544 : char *en;
545 : bool inquotes;
546 :
547 0 : if (!blobte)
548 0 : pg_fatal("could not find entry for ID %d", te->dependencies[0]);
549 : Assert(strcmp(blobte->desc, "BLOB METADATA") == 0);
550 :
551 : /* Make a writable copy of the ACL commands string */
552 0 : buf = pg_strdup(te->defn);
553 :
554 : /*
555 : * We have to parse out the commands sufficiently to locate the blob OIDs
556 : * and find the command-ending semicolons. The commands should not
557 : * contain anything hard to parse except for double-quoted role names,
558 : * which are easy to ignore. Once we've split apart the first and second
559 : * halves of a command, apply IssueCommandPerBlob. (This means the
560 : * updates on the blobs are interleaved if there's multiple commands, but
561 : * that should cause no trouble.)
562 : */
563 0 : inquotes = false;
564 0 : st = en = buf;
565 0 : st2 = NULL;
566 0 : while (*en)
567 : {
568 : /* Ignore double-quoted material */
569 0 : if (*en == '"')
570 0 : inquotes = !inquotes;
571 0 : if (inquotes)
572 : {
573 0 : en++;
574 0 : continue;
575 : }
576 : /* If we found "LARGE OBJECT", that's the end of the first half */
577 0 : if (strncmp(en, "LARGE OBJECT ", 13) == 0)
578 : {
579 : /* Terminate the first-half string */
580 0 : en += 13;
581 : Assert(isdigit((unsigned char) *en));
582 0 : *en++ = '\0';
583 : /* Skip the rest of the blob OID */
584 0 : while (isdigit((unsigned char) *en))
585 0 : en++;
586 : /* Second half starts here */
587 : Assert(st2 == NULL);
588 0 : st2 = en;
589 : }
590 : /* If we found semicolon, that's the end of the second half */
591 0 : else if (*en == ';')
592 : {
593 : /* Terminate the second-half string */
594 0 : *en++ = '\0';
595 : Assert(st2 != NULL);
596 : /* Issue this command for each blob */
597 0 : IssueCommandPerBlob(AH, blobte, st, st2);
598 : /* For neatness, skip whitespace before the next command */
599 0 : while (isspace((unsigned char) *en))
600 0 : en++;
601 : /* Reset for new command */
602 0 : st = en;
603 0 : st2 = NULL;
604 : }
605 : else
606 0 : en++;
607 : }
608 0 : pg_free(buf);
609 0 : }
610 :
611 : void
612 0 : DropLOIfExists(ArchiveHandle *AH, Oid oid)
613 : {
614 0 : ahprintf(AH,
615 : "SELECT pg_catalog.lo_unlink(oid) "
616 : "FROM pg_catalog.pg_largeobject_metadata "
617 : "WHERE oid = '%u';\n",
618 : oid);
619 0 : }
|