Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_backup_archiver.c
4 : *
5 : * Private implementation of the archiver routines.
6 : *
7 : * See the headers to pg_restore for more details.
8 : *
9 : * Copyright (c) 2000, Philip Warner
10 : * Rights are granted to use this software in any way so long
11 : * as this notice is not removed.
12 : *
13 : * The author is not responsible for loss or damages that may
14 : * result from its use.
15 : *
16 : *
17 : * IDENTIFICATION
18 : * src/bin/pg_dump/pg_backup_archiver.c
19 : *
20 : *-------------------------------------------------------------------------
21 : */
22 : #include "postgres_fe.h"
23 :
24 : #include <ctype.h>
25 : #include <fcntl.h>
26 : #include <unistd.h>
27 : #include <sys/stat.h>
28 : #include <sys/wait.h>
29 : #ifdef WIN32
30 : #include <io.h>
31 : #endif
32 :
33 : #include "catalog/pg_class_d.h"
34 : #include "catalog/pg_largeobject_metadata_d.h"
35 : #include "catalog/pg_shdepend_d.h"
36 : #include "common/string.h"
37 : #include "compress_io.h"
38 : #include "dumputils.h"
39 : #include "fe_utils/string_utils.h"
40 : #include "lib/binaryheap.h"
41 : #include "lib/stringinfo.h"
42 : #include "libpq/libpq-fs.h"
43 : #include "parallel.h"
44 : #include "pg_backup_archiver.h"
45 : #include "pg_backup_db.h"
46 : #include "pg_backup_utils.h"
47 :
48 : #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
49 : #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
50 :
51 : #define TOC_PREFIX_NONE ""
52 : #define TOC_PREFIX_DATA "Data for "
53 : #define TOC_PREFIX_STATS "Statistics for "
54 :
55 : static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
56 : const pg_compress_specification compression_spec,
57 : bool dosync, ArchiveMode mode,
58 : SetupWorkerPtrType setupWorkerPtr,
59 : DataDirSyncMethod sync_method);
60 : static void _getObjectDescription(PQExpBuffer buf, const TocEntry *te);
61 : static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, const char *pfx);
62 : static void _doSetFixedOutputState(ArchiveHandle *AH);
63 : static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
64 : static void _reconnectToDB(ArchiveHandle *AH, const char *dbname);
65 : static void _becomeUser(ArchiveHandle *AH, const char *user);
66 : static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
67 : static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
68 : static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
69 : static void _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam);
70 : static void _printTableAccessMethodNoStorage(ArchiveHandle *AH,
71 : TocEntry *te);
72 : static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
73 : static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
74 : static void processSearchPathEntry(ArchiveHandle *AH, TocEntry *te);
75 : static int _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH);
76 : static RestorePass _tocEntryRestorePass(TocEntry *te);
77 : static bool _tocEntryIsACL(TocEntry *te);
78 : static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
79 : static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
80 : static bool is_load_via_partition_root(TocEntry *te);
81 : static void buildTocEntryArrays(ArchiveHandle *AH);
82 : static void _moveBefore(TocEntry *pos, TocEntry *te);
83 : static int _discoverArchiveFormat(ArchiveHandle *AH);
84 :
85 : static int RestoringToDB(ArchiveHandle *AH);
86 : static void dump_lo_buf(ArchiveHandle *AH);
87 : static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
88 : static void SetOutput(ArchiveHandle *AH, const char *filename,
89 : const pg_compress_specification compression_spec);
90 : static CompressFileHandle *SaveOutput(ArchiveHandle *AH);
91 : static void RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput);
92 :
93 : static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
94 : static void restore_toc_entries_prefork(ArchiveHandle *AH,
95 : TocEntry *pending_list);
96 : static void restore_toc_entries_parallel(ArchiveHandle *AH,
97 : ParallelState *pstate,
98 : TocEntry *pending_list);
99 : static void restore_toc_entries_postfork(ArchiveHandle *AH,
100 : TocEntry *pending_list);
101 : static void pending_list_header_init(TocEntry *l);
102 : static void pending_list_append(TocEntry *l, TocEntry *te);
103 : static void pending_list_remove(TocEntry *te);
104 : static int TocEntrySizeCompareQsort(const void *p1, const void *p2);
105 : static int TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg);
106 : static void move_to_ready_heap(TocEntry *pending_list,
107 : binaryheap *ready_heap,
108 : RestorePass pass);
109 : static TocEntry *pop_next_work_item(binaryheap *ready_heap,
110 : ParallelState *pstate);
111 : static void mark_dump_job_done(ArchiveHandle *AH,
112 : TocEntry *te,
113 : int status,
114 : void *callback_data);
115 : static void mark_restore_job_done(ArchiveHandle *AH,
116 : TocEntry *te,
117 : int status,
118 : void *callback_data);
119 : static void fix_dependencies(ArchiveHandle *AH);
120 : static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
121 : static void repoint_table_dependencies(ArchiveHandle *AH);
122 : static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
123 : static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
124 : binaryheap *ready_heap);
125 : static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
126 : static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
127 :
128 : static void StrictNamesCheck(RestoreOptions *ropt);
129 :
130 :
131 : /*
132 : * Allocate a new DumpOptions block containing all default values.
133 : */
134 : DumpOptions *
135 120 : NewDumpOptions(void)
136 : {
137 120 : DumpOptions *opts = (DumpOptions *) pg_malloc(sizeof(DumpOptions));
138 :
139 120 : InitDumpOptions(opts);
140 120 : return opts;
141 : }
142 :
143 : /*
144 : * Initialize a DumpOptions struct to all default values
145 : */
146 : void
147 582 : InitDumpOptions(DumpOptions *opts)
148 : {
149 582 : memset(opts, 0, sizeof(DumpOptions));
150 : /* set any fields that shouldn't default to zeroes */
151 582 : opts->include_everything = true;
152 582 : opts->cparams.promptPassword = TRI_DEFAULT;
153 582 : opts->dumpSections = DUMP_UNSECTIONED;
154 582 : opts->dumpSchema = true;
155 582 : opts->dumpData = true;
156 582 : opts->dumpStatistics = false;
157 582 : }
158 :
159 : /*
160 : * Create a freshly allocated DumpOptions with options equivalent to those
161 : * found in the given RestoreOptions.
162 : */
163 : DumpOptions *
164 120 : dumpOptionsFromRestoreOptions(RestoreOptions *ropt)
165 : {
166 120 : DumpOptions *dopt = NewDumpOptions();
167 :
168 : /* this is the inverse of what's at the end of pg_dump.c's main() */
169 120 : dopt->cparams.dbname = ropt->cparams.dbname ? pg_strdup(ropt->cparams.dbname) : NULL;
170 120 : dopt->cparams.pgport = ropt->cparams.pgport ? pg_strdup(ropt->cparams.pgport) : NULL;
171 120 : dopt->cparams.pghost = ropt->cparams.pghost ? pg_strdup(ropt->cparams.pghost) : NULL;
172 120 : dopt->cparams.username = ropt->cparams.username ? pg_strdup(ropt->cparams.username) : NULL;
173 120 : dopt->cparams.promptPassword = ropt->cparams.promptPassword;
174 120 : dopt->outputClean = ropt->dropSchema;
175 120 : dopt->dumpData = ropt->dumpData;
176 120 : dopt->dumpSchema = ropt->dumpSchema;
177 120 : dopt->dumpSections = ropt->dumpSections;
178 120 : dopt->dumpStatistics = ropt->dumpStatistics;
179 120 : dopt->if_exists = ropt->if_exists;
180 120 : dopt->column_inserts = ropt->column_inserts;
181 120 : dopt->aclsSkip = ropt->aclsSkip;
182 120 : dopt->outputSuperuser = ropt->superuser;
183 120 : dopt->outputCreateDB = ropt->createDB;
184 120 : dopt->outputNoOwner = ropt->noOwner;
185 120 : dopt->outputNoTableAm = ropt->noTableAm;
186 120 : dopt->outputNoTablespaces = ropt->noTablespace;
187 120 : dopt->disable_triggers = ropt->disable_triggers;
188 120 : dopt->use_setsessauth = ropt->use_setsessauth;
189 120 : dopt->disable_dollar_quoting = ropt->disable_dollar_quoting;
190 120 : dopt->dump_inserts = ropt->dump_inserts;
191 120 : dopt->no_comments = ropt->no_comments;
192 120 : dopt->no_policies = ropt->no_policies;
193 120 : dopt->no_publications = ropt->no_publications;
194 120 : dopt->no_security_labels = ropt->no_security_labels;
195 120 : dopt->no_subscriptions = ropt->no_subscriptions;
196 120 : dopt->lockWaitTimeout = ropt->lockWaitTimeout;
197 120 : dopt->include_everything = ropt->include_everything;
198 120 : dopt->enable_row_security = ropt->enable_row_security;
199 120 : dopt->sequence_data = ropt->sequence_data;
200 120 : dopt->restrict_key = ropt->restrict_key ? pg_strdup(ropt->restrict_key) : NULL;
201 :
202 120 : return dopt;
203 : }
204 :
205 :
206 : /*
207 : * Wrapper functions.
208 : *
209 : * The objective is to make writing new formats and dumpers as simple
210 : * as possible, if necessary at the expense of extra function calls etc.
211 : *
212 : */
213 :
214 : /*
215 : * The dump worker setup needs lots of knowledge of the internals of pg_dump,
216 : * so it's defined in pg_dump.c and passed into OpenArchive. The restore worker
217 : * setup doesn't need to know anything much, so it's defined here.
218 : */
219 : static void
220 20 : setupRestoreWorker(Archive *AHX)
221 : {
222 20 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
223 :
224 20 : AH->ReopenPtr(AH);
225 20 : }
226 :
227 :
228 : /* Create a new archive */
229 : /* Public */
230 : Archive *
231 412 : CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
232 : const pg_compress_specification compression_spec,
233 : bool dosync, ArchiveMode mode,
234 : SetupWorkerPtrType setupDumpWorker,
235 : DataDirSyncMethod sync_method)
236 :
237 : {
238 412 : ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression_spec,
239 : dosync, mode, setupDumpWorker, sync_method);
240 :
241 410 : return (Archive *) AH;
242 : }
243 :
244 : /* Open an existing archive */
245 : /* Public */
246 : Archive *
247 116 : OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
248 : {
249 : ArchiveHandle *AH;
250 116 : pg_compress_specification compression_spec = {0};
251 :
252 116 : compression_spec.algorithm = PG_COMPRESSION_NONE;
253 116 : AH = _allocAH(FileSpec, fmt, compression_spec, true,
254 : archModeRead, setupRestoreWorker,
255 : DATA_DIR_SYNC_METHOD_FSYNC);
256 :
257 116 : return (Archive *) AH;
258 : }
259 :
260 : /* Public */
261 : void
262 486 : CloseArchive(Archive *AHX)
263 : {
264 486 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
265 :
266 486 : AH->ClosePtr(AH);
267 :
268 : /* Close the output */
269 486 : errno = 0;
270 486 : if (!EndCompressFileHandle(AH->OF))
271 0 : pg_fatal("could not close output file: %m");
272 486 : }
273 :
274 : /* Public */
275 : void
276 906 : SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt)
277 : {
278 : /* Caller can omit dump options, in which case we synthesize them */
279 906 : if (dopt == NULL && ropt != NULL)
280 120 : dopt = dumpOptionsFromRestoreOptions(ropt);
281 :
282 : /* Save options for later access */
283 906 : AH->dopt = dopt;
284 906 : AH->ropt = ropt;
285 906 : }
286 :
287 : /* Public */
288 : void
289 480 : ProcessArchiveRestoreOptions(Archive *AHX)
290 : {
291 480 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
292 480 : RestoreOptions *ropt = AH->public.ropt;
293 : TocEntry *te;
294 : teSection curSection;
295 :
296 : /* Decide which TOC entries will be dumped/restored, and mark them */
297 480 : curSection = SECTION_PRE_DATA;
298 95864 : for (te = AH->toc->next; te != AH->toc; te = te->next)
299 : {
300 : /*
301 : * When writing an archive, we also take this opportunity to check
302 : * that we have generated the entries in a sane order that respects
303 : * the section divisions. When reading, don't complain, since buggy
304 : * old versions of pg_dump might generate out-of-order archives.
305 : */
306 95384 : if (AH->mode != archModeRead)
307 : {
308 82290 : switch (te->section)
309 : {
310 16728 : case SECTION_NONE:
311 : /* ok to be anywhere */
312 16728 : break;
313 38394 : case SECTION_PRE_DATA:
314 38394 : if (curSection != SECTION_PRE_DATA)
315 0 : pg_log_warning("archive items not in correct section order");
316 38394 : break;
317 13774 : case SECTION_DATA:
318 13774 : if (curSection == SECTION_POST_DATA)
319 0 : pg_log_warning("archive items not in correct section order");
320 13774 : break;
321 13394 : case SECTION_POST_DATA:
322 : /* ok no matter which section we were in */
323 13394 : break;
324 0 : default:
325 0 : pg_fatal("unexpected section code %d",
326 : (int) te->section);
327 : break;
328 : }
329 : }
330 :
331 95384 : if (te->section != SECTION_NONE)
332 77320 : curSection = te->section;
333 :
334 95384 : te->reqs = _tocEntryRequired(te, curSection, AH);
335 : }
336 :
337 : /* Enforce strict names checking */
338 480 : if (ropt->strict_names)
339 0 : StrictNamesCheck(ropt);
340 480 : }
341 :
342 : /* Public */
343 : void
344 372 : RestoreArchive(Archive *AHX)
345 : {
346 372 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
347 372 : RestoreOptions *ropt = AH->public.ropt;
348 : bool parallel_mode;
349 : TocEntry *te;
350 : CompressFileHandle *sav;
351 :
352 372 : AH->stage = STAGE_INITIALIZING;
353 :
354 : /*
355 : * If we're going to do parallel restore, there are some restrictions.
356 : */
357 372 : parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
358 372 : if (parallel_mode)
359 : {
360 : /* We haven't got round to making this work for all archive formats */
361 8 : if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
362 0 : pg_fatal("parallel restore is not supported with this archive file format");
363 :
364 : /* Doesn't work if the archive represents dependencies as OIDs */
365 8 : if (AH->version < K_VERS_1_8)
366 0 : pg_fatal("parallel restore is not supported with archives made by pre-8.0 pg_dump");
367 :
368 : /*
369 : * It's also not gonna work if we can't reopen the input file, so
370 : * let's try that immediately.
371 : */
372 8 : AH->ReopenPtr(AH);
373 : }
374 :
375 : /*
376 : * Make sure we won't need (de)compression we haven't got
377 : */
378 372 : if (AH->PrintTocDataPtr != NULL)
379 : {
380 56702 : for (te = AH->toc->next; te != AH->toc; te = te->next)
381 : {
382 56590 : if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
383 : {
384 260 : char *errmsg = supports_compression(AH->compression_spec);
385 :
386 260 : if (errmsg)
387 0 : pg_fatal("cannot restore from compressed archive (%s)",
388 : errmsg);
389 : else
390 260 : break;
391 : }
392 : }
393 : }
394 :
395 : /*
396 : * Prepare index arrays, so we can assume we have them throughout restore.
397 : * It's possible we already did this, though.
398 : */
399 372 : if (AH->tocsByDumpId == NULL)
400 368 : buildTocEntryArrays(AH);
401 :
402 : /*
403 : * If we're using a DB connection, then connect it.
404 : */
405 372 : if (ropt->useDB)
406 : {
407 64 : pg_log_info("connecting to database for restore");
408 64 : if (AH->version < K_VERS_1_3)
409 0 : pg_fatal("direct database connections are not supported in pre-1.3 archives");
410 :
411 : /*
412 : * We don't want to guess at whether the dump will successfully
413 : * restore; allow the attempt regardless of the version of the restore
414 : * target.
415 : */
416 64 : AHX->minRemoteVersion = 0;
417 64 : AHX->maxRemoteVersion = 9999999;
418 :
419 64 : ConnectDatabaseAhx(AHX, &ropt->cparams, false);
420 :
421 : /*
422 : * If we're talking to the DB directly, don't send comments since they
423 : * obscure SQL when displaying errors
424 : */
425 64 : AH->noTocComments = 1;
426 : }
427 :
428 : /*
429 : * Work out if we have an implied schema-less restore. This can happen if
430 : * the dump excluded the schema or the user has used a toc list to exclude
431 : * all of the schema data. All we do is look for schema entries - if none
432 : * are found then we unset the dumpSchema flag.
433 : *
434 : * We could scan for wanted TABLE entries, but that is not the same as
435 : * data-only. At this stage, it seems unnecessary (6-Mar-2001).
436 : */
437 372 : if (ropt->dumpSchema)
438 : {
439 354 : bool no_schema_found = true;
440 :
441 2912 : for (te = AH->toc->next; te != AH->toc; te = te->next)
442 : {
443 2868 : if ((te->reqs & REQ_SCHEMA) != 0)
444 : {
445 310 : no_schema_found = false;
446 310 : break;
447 : }
448 : }
449 354 : if (no_schema_found)
450 : {
451 44 : ropt->dumpSchema = false;
452 44 : pg_log_info("implied no-schema restore");
453 : }
454 : }
455 :
456 : /*
457 : * Setup the output file if necessary.
458 : */
459 372 : sav = SaveOutput(AH);
460 372 : if (ropt->filename || ropt->compression_spec.algorithm != PG_COMPRESSION_NONE)
461 298 : SetOutput(AH, ropt->filename, ropt->compression_spec);
462 :
463 372 : ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
464 :
465 : /*
466 : * If generating plain-text output, enter restricted mode to block any
467 : * unexpected psql meta-commands. A malicious source might try to inject
468 : * a variety of things via bogus responses to queries. While we cannot
469 : * prevent such sources from affecting the destination at restore time, we
470 : * can block psql meta-commands so that the client machine that runs psql
471 : * with the dump output remains unaffected.
472 : */
473 372 : if (ropt->restrict_key)
474 304 : ahprintf(AH, "\\restrict %s\n\n", ropt->restrict_key);
475 :
476 372 : if (AH->archiveRemoteVersion)
477 372 : ahprintf(AH, "-- Dumped from database version %s\n",
478 : AH->archiveRemoteVersion);
479 372 : if (AH->archiveDumpVersion)
480 372 : ahprintf(AH, "-- Dumped by pg_dump version %s\n",
481 : AH->archiveDumpVersion);
482 :
483 372 : ahprintf(AH, "\n");
484 :
485 372 : if (AH->public.verbose)
486 78 : dumpTimestamp(AH, "Started on", AH->createDate);
487 :
488 372 : if (ropt->single_txn)
489 : {
490 0 : if (AH->connection)
491 0 : StartTransaction(AHX);
492 : else
493 0 : ahprintf(AH, "BEGIN;\n\n");
494 : }
495 :
496 : /*
497 : * Establish important parameter values right away.
498 : */
499 372 : _doSetFixedOutputState(AH);
500 :
501 372 : AH->stage = STAGE_PROCESSING;
502 :
503 : /*
504 : * Drop the items at the start, in reverse order
505 : */
506 372 : if (ropt->dropSchema)
507 : {
508 2892 : for (te = AH->toc->prev; te != AH->toc; te = te->prev)
509 : {
510 2846 : AH->currentTE = te;
511 :
512 : /*
513 : * In createDB mode, issue a DROP *only* for the database as a
514 : * whole. Issuing drops against anything else would be wrong,
515 : * because at this point we're connected to the wrong database.
516 : * (The DATABASE PROPERTIES entry, if any, should be treated like
517 : * the DATABASE entry.)
518 : */
519 2846 : if (ropt->createDB)
520 : {
521 1260 : if (strcmp(te->desc, "DATABASE") != 0 &&
522 1224 : strcmp(te->desc, "DATABASE PROPERTIES") != 0)
523 1192 : continue;
524 : }
525 :
526 : /* Otherwise, drop anything that's selected and has a dropStmt */
527 1654 : if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
528 : {
529 676 : bool not_allowed_in_txn = false;
530 :
531 676 : pg_log_info("dropping %s %s", te->desc, te->tag);
532 :
533 : /*
534 : * In --transaction-size mode, we have to temporarily exit our
535 : * transaction block to drop objects that can't be dropped
536 : * within a transaction.
537 : */
538 676 : if (ropt->txn_size > 0)
539 : {
540 64 : if (strcmp(te->desc, "DATABASE") == 0 ||
541 32 : strcmp(te->desc, "DATABASE PROPERTIES") == 0)
542 : {
543 64 : not_allowed_in_txn = true;
544 64 : if (AH->connection)
545 64 : CommitTransaction(AHX);
546 : else
547 0 : ahprintf(AH, "COMMIT;\n");
548 : }
549 : }
550 :
551 : /* Select owner and schema as necessary */
552 676 : _becomeOwner(AH, te);
553 676 : _selectOutputSchema(AH, te->namespace);
554 :
555 : /*
556 : * Now emit the DROP command, if the object has one. Note we
557 : * don't necessarily emit it verbatim; at this point we add an
558 : * appropriate IF EXISTS clause, if the user requested it.
559 : */
560 676 : if (strcmp(te->desc, "BLOB METADATA") == 0)
561 : {
562 : /* We must generate the per-blob commands */
563 8 : if (ropt->if_exists)
564 4 : IssueCommandPerBlob(AH, te,
565 : "SELECT pg_catalog.lo_unlink(oid) "
566 : "FROM pg_catalog.pg_largeobject_metadata "
567 : "WHERE oid = '", "'");
568 : else
569 4 : IssueCommandPerBlob(AH, te,
570 : "SELECT pg_catalog.lo_unlink('",
571 : "')");
572 : }
573 668 : else if (*te->dropStmt != '\0')
574 : {
575 620 : if (!ropt->if_exists ||
576 284 : strncmp(te->dropStmt, "--", 2) == 0)
577 : {
578 : /*
579 : * Without --if-exists, or if it's just a comment (as
580 : * happens for the public schema), print the dropStmt
581 : * as-is.
582 : */
583 338 : ahprintf(AH, "%s", te->dropStmt);
584 : }
585 : else
586 : {
587 : /*
588 : * Inject an appropriate spelling of "if exists". For
589 : * old-style large objects, we have a routine that
590 : * knows how to do it, without depending on
591 : * te->dropStmt; use that. For other objects we need
592 : * to parse the command.
593 : */
594 282 : if (strcmp(te->desc, "BLOB") == 0)
595 : {
596 0 : DropLOIfExists(AH, te->catalogId.oid);
597 : }
598 : else
599 : {
600 282 : char *dropStmt = pg_strdup(te->dropStmt);
601 282 : char *dropStmtOrig = dropStmt;
602 282 : PQExpBuffer ftStmt = createPQExpBuffer();
603 :
604 : /*
605 : * Need to inject IF EXISTS clause after ALTER
606 : * TABLE part in ALTER TABLE .. DROP statement
607 : */
608 282 : if (strncmp(dropStmt, "ALTER TABLE", 11) == 0)
609 : {
610 38 : appendPQExpBufferStr(ftStmt,
611 : "ALTER TABLE IF EXISTS");
612 38 : dropStmt = dropStmt + 11;
613 : }
614 :
615 : /*
616 : * ALTER TABLE..ALTER COLUMN..DROP DEFAULT does
617 : * not support the IF EXISTS clause, and therefore
618 : * we simply emit the original command for DEFAULT
619 : * objects (modulo the adjustment made above).
620 : *
621 : * Likewise, don't mess with DATABASE PROPERTIES.
622 : *
623 : * If we used CREATE OR REPLACE VIEW as a means of
624 : * quasi-dropping an ON SELECT rule, that should
625 : * be emitted unchanged as well.
626 : *
627 : * For other object types, we need to extract the
628 : * first part of the DROP which includes the
629 : * object type. Most of the time this matches
630 : * te->desc, so search for that; however for the
631 : * different kinds of CONSTRAINTs, we know to
632 : * search for hardcoded "DROP CONSTRAINT" instead.
633 : */
634 282 : if (strcmp(te->desc, "DEFAULT") == 0 ||
635 276 : strcmp(te->desc, "DATABASE PROPERTIES") == 0 ||
636 276 : strncmp(dropStmt, "CREATE OR REPLACE VIEW", 22) == 0)
637 6 : appendPQExpBufferStr(ftStmt, dropStmt);
638 : else
639 : {
640 : char buffer[40];
641 : char *mark;
642 :
643 276 : if (strcmp(te->desc, "CONSTRAINT") == 0 ||
644 248 : strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
645 248 : strcmp(te->desc, "FK CONSTRAINT") == 0)
646 32 : strcpy(buffer, "DROP CONSTRAINT");
647 : else
648 244 : snprintf(buffer, sizeof(buffer), "DROP %s",
649 : te->desc);
650 :
651 276 : mark = strstr(dropStmt, buffer);
652 :
653 276 : if (mark)
654 : {
655 276 : *mark = '\0';
656 276 : appendPQExpBuffer(ftStmt, "%s%s IF EXISTS%s",
657 : dropStmt, buffer,
658 276 : mark + strlen(buffer));
659 : }
660 : else
661 : {
662 : /* complain and emit unmodified command */
663 0 : pg_log_warning("could not find where to insert IF EXISTS in statement \"%s\"",
664 : dropStmtOrig);
665 0 : appendPQExpBufferStr(ftStmt, dropStmt);
666 : }
667 : }
668 :
669 282 : ahprintf(AH, "%s", ftStmt->data);
670 :
671 282 : destroyPQExpBuffer(ftStmt);
672 282 : pg_free(dropStmtOrig);
673 : }
674 : }
675 : }
676 :
677 : /*
678 : * In --transaction-size mode, re-establish the transaction
679 : * block if needed; otherwise, commit after every N drops.
680 : */
681 676 : if (ropt->txn_size > 0)
682 : {
683 64 : if (not_allowed_in_txn)
684 : {
685 64 : if (AH->connection)
686 64 : StartTransaction(AHX);
687 : else
688 0 : ahprintf(AH, "BEGIN;\n");
689 64 : AH->txnCount = 0;
690 : }
691 0 : else if (++AH->txnCount >= ropt->txn_size)
692 : {
693 0 : if (AH->connection)
694 : {
695 0 : CommitTransaction(AHX);
696 0 : StartTransaction(AHX);
697 : }
698 : else
699 0 : ahprintf(AH, "COMMIT;\nBEGIN;\n");
700 0 : AH->txnCount = 0;
701 : }
702 : }
703 : }
704 : }
705 :
706 : /*
707 : * _selectOutputSchema may have set currSchema to reflect the effect
708 : * of a "SET search_path" command it emitted. However, by now we may
709 : * have dropped that schema; or it might not have existed in the first
710 : * place. In either case the effective value of search_path will not
711 : * be what we think. Forcibly reset currSchema so that we will
712 : * re-establish the search_path setting when needed (after creating
713 : * the schema).
714 : *
715 : * If we treated users as pg_dump'able objects then we'd need to reset
716 : * currUser here too.
717 : */
718 46 : free(AH->currSchema);
719 46 : AH->currSchema = NULL;
720 : }
721 :
722 372 : if (parallel_mode)
723 : {
724 : /*
725 : * In parallel mode, turn control over to the parallel-restore logic.
726 : */
727 : ParallelState *pstate;
728 : TocEntry pending_list;
729 :
730 : /* The archive format module may need some setup for this */
731 8 : if (AH->PrepParallelRestorePtr)
732 8 : AH->PrepParallelRestorePtr(AH);
733 :
734 8 : pending_list_header_init(&pending_list);
735 :
736 : /* This runs PRE_DATA items and then disconnects from the database */
737 8 : restore_toc_entries_prefork(AH, &pending_list);
738 : Assert(AH->connection == NULL);
739 :
740 : /* ParallelBackupStart() will actually fork the processes */
741 8 : pstate = ParallelBackupStart(AH);
742 8 : restore_toc_entries_parallel(AH, pstate, &pending_list);
743 8 : ParallelBackupEnd(AH, pstate);
744 :
745 : /* reconnect the leader and see if we missed something */
746 8 : restore_toc_entries_postfork(AH, &pending_list);
747 : Assert(AH->connection != NULL);
748 : }
749 : else
750 : {
751 : /*
752 : * In serial mode, process everything in three phases: normal items,
753 : * then ACLs, then post-ACL items. We might be able to skip one or
754 : * both extra phases in some cases, eg data-only restores.
755 : */
756 364 : bool haveACL = false;
757 364 : bool havePostACL = false;
758 :
759 83242 : for (te = AH->toc->next; te != AH->toc; te = te->next)
760 : {
761 82880 : if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) == 0)
762 2998 : continue; /* ignore if not to be dumped at all */
763 :
764 79882 : switch (_tocEntryRestorePass(te))
765 : {
766 73326 : case RESTORE_PASS_MAIN:
767 73326 : (void) restore_toc_entry(AH, te, false);
768 73324 : break;
769 3706 : case RESTORE_PASS_ACL:
770 3706 : haveACL = true;
771 3706 : break;
772 2850 : case RESTORE_PASS_POST_ACL:
773 2850 : havePostACL = true;
774 2850 : break;
775 : }
776 : }
777 :
778 362 : if (haveACL)
779 : {
780 80056 : for (te = AH->toc->next; te != AH->toc; te = te->next)
781 : {
782 157722 : if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0 &&
783 77816 : _tocEntryRestorePass(te) == RESTORE_PASS_ACL)
784 3706 : (void) restore_toc_entry(AH, te, false);
785 : }
786 : }
787 :
788 362 : if (havePostACL)
789 : {
790 51764 : for (te = AH->toc->next; te != AH->toc; te = te->next)
791 : {
792 102414 : if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0 &&
793 50750 : _tocEntryRestorePass(te) == RESTORE_PASS_POST_ACL)
794 2850 : (void) restore_toc_entry(AH, te, false);
795 : }
796 : }
797 : }
798 :
799 : /*
800 : * Close out any persistent transaction we may have. While these two
801 : * cases are started in different places, we can end both cases here.
802 : */
803 370 : if (ropt->single_txn || ropt->txn_size > 0)
804 : {
805 56 : if (AH->connection)
806 56 : CommitTransaction(AHX);
807 : else
808 0 : ahprintf(AH, "COMMIT;\n\n");
809 : }
810 :
811 370 : if (AH->public.verbose)
812 78 : dumpTimestamp(AH, "Completed on", time(NULL));
813 :
814 370 : ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n");
815 :
816 : /*
817 : * If generating plain-text output, exit restricted mode at the very end
818 : * of the script. This is not pro forma; in particular, pg_dumpall
819 : * requires this when transitioning from one database to another.
820 : */
821 370 : if (ropt->restrict_key)
822 302 : ahprintf(AH, "\\unrestrict %s\n\n", ropt->restrict_key);
823 :
824 : /*
825 : * Clean up & we're done.
826 : */
827 370 : AH->stage = STAGE_FINALIZING;
828 :
829 370 : if (ropt->filename || ropt->compression_spec.algorithm != PG_COMPRESSION_NONE)
830 298 : RestoreOutput(AH, sav);
831 :
832 370 : if (ropt->useDB)
833 64 : DisconnectDatabase(&AH->public);
834 370 : }
835 :
836 : /*
837 : * Restore a single TOC item. Used in both parallel and non-parallel restore;
838 : * is_parallel is true if we are in a worker child process.
839 : *
840 : * Returns 0 normally, but WORKER_CREATE_DONE or WORKER_INHIBIT_DATA if
841 : * the parallel parent has to make the corresponding status update.
842 : */
843 : static int
844 80074 : restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
845 : {
846 80074 : RestoreOptions *ropt = AH->public.ropt;
847 80074 : int status = WORKER_OK;
848 : int reqs;
849 : bool defnDumped;
850 :
851 80074 : AH->currentTE = te;
852 :
853 : /* Dump any relevant dump warnings to stderr */
854 80074 : if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
855 : {
856 0 : if (ropt->dumpSchema && te->defn != NULL && strlen(te->defn) != 0)
857 0 : pg_log_warning("warning from original dump file: %s", te->defn);
858 0 : else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
859 0 : pg_log_warning("warning from original dump file: %s", te->copyStmt);
860 : }
861 :
862 : /* Work out what, if anything, we want from this entry */
863 80074 : reqs = te->reqs;
864 :
865 80074 : defnDumped = false;
866 :
867 : /*
868 : * If it has a schema component that we want, then process that
869 : */
870 80074 : if ((reqs & REQ_SCHEMA) != 0)
871 : {
872 64074 : bool object_is_db = false;
873 :
874 : /*
875 : * In --transaction-size mode, must exit our transaction block to
876 : * create a database or set its properties.
877 : */
878 64074 : if (strcmp(te->desc, "DATABASE") == 0 ||
879 63954 : strcmp(te->desc, "DATABASE PROPERTIES") == 0)
880 : {
881 184 : object_is_db = true;
882 184 : if (ropt->txn_size > 0)
883 : {
884 112 : if (AH->connection)
885 112 : CommitTransaction(&AH->public);
886 : else
887 0 : ahprintf(AH, "COMMIT;\n\n");
888 : }
889 : }
890 :
891 : /* Show namespace in log message if available */
892 64074 : if (te->namespace)
893 61270 : pg_log_info("creating %s \"%s.%s\"",
894 : te->desc, te->namespace, te->tag);
895 : else
896 2804 : pg_log_info("creating %s \"%s\"",
897 : te->desc, te->tag);
898 :
899 64074 : _printTocEntry(AH, te, TOC_PREFIX_NONE);
900 64074 : defnDumped = true;
901 :
902 64074 : if (strcmp(te->desc, "TABLE") == 0)
903 : {
904 10498 : if (AH->lastErrorTE == te)
905 : {
906 : /*
907 : * We failed to create the table. If
908 : * --no-data-for-failed-tables was given, mark the
909 : * corresponding TABLE DATA to be ignored.
910 : *
911 : * In the parallel case this must be done in the parent, so we
912 : * just set the return value.
913 : */
914 0 : if (ropt->noDataForFailedTables)
915 : {
916 0 : if (is_parallel)
917 0 : status = WORKER_INHIBIT_DATA;
918 : else
919 0 : inhibit_data_for_failed_table(AH, te);
920 : }
921 : }
922 : else
923 : {
924 : /*
925 : * We created the table successfully. Mark the corresponding
926 : * TABLE DATA for possible truncation.
927 : *
928 : * In the parallel case this must be done in the parent, so we
929 : * just set the return value.
930 : */
931 10498 : if (is_parallel)
932 0 : status = WORKER_CREATE_DONE;
933 : else
934 10498 : mark_create_done(AH, te);
935 : }
936 : }
937 :
938 : /*
939 : * If we created a DB, connect to it. Also, if we changed DB
940 : * properties, reconnect to ensure that relevant GUC settings are
941 : * applied to our session. (That also restarts the transaction block
942 : * in --transaction-size mode.)
943 : */
944 64074 : if (object_is_db)
945 : {
946 184 : pg_log_info("connecting to new database \"%s\"", te->tag);
947 184 : _reconnectToDB(AH, te->tag);
948 : }
949 : }
950 :
951 : /*
952 : * If it has a data component that we want, then process that
953 : */
954 80074 : if ((reqs & REQ_DATA) != 0)
955 : {
956 : /*
957 : * hadDumper will be set if there is genuine data component for this
958 : * node. Otherwise, we need to check the defn field for statements
959 : * that need to be executed in data-only restores.
960 : */
961 9548 : if (te->hadDumper)
962 : {
963 : /*
964 : * If we can output the data, then restore it.
965 : */
966 8462 : if (AH->PrintTocDataPtr != NULL)
967 : {
968 8462 : _printTocEntry(AH, te, TOC_PREFIX_DATA);
969 :
970 8462 : if (strcmp(te->desc, "BLOBS") == 0 ||
971 8314 : strcmp(te->desc, "BLOB COMMENTS") == 0)
972 : {
973 148 : pg_log_info("processing %s", te->desc);
974 :
975 148 : _selectOutputSchema(AH, "pg_catalog");
976 :
977 : /* Send BLOB COMMENTS data to ExecuteSimpleCommands() */
978 148 : if (strcmp(te->desc, "BLOB COMMENTS") == 0)
979 0 : AH->outputKind = OUTPUT_OTHERDATA;
980 :
981 148 : AH->PrintTocDataPtr(AH, te);
982 :
983 148 : AH->outputKind = OUTPUT_SQLCMDS;
984 : }
985 : else
986 : {
987 : bool use_truncate;
988 :
989 8314 : _disableTriggersIfNecessary(AH, te);
990 :
991 : /* Select owner and schema as necessary */
992 8314 : _becomeOwner(AH, te);
993 8314 : _selectOutputSchema(AH, te->namespace);
994 :
995 8314 : pg_log_info("processing data for table \"%s.%s\"",
996 : te->namespace, te->tag);
997 :
998 : /*
999 : * In parallel restore, if we created the table earlier in
1000 : * this run (so that we know it is empty) and we are not
1001 : * restoring a load-via-partition-root data item then we
1002 : * wrap the COPY in a transaction and precede it with a
1003 : * TRUNCATE. If wal_level is set to minimal this prevents
1004 : * WAL-logging the COPY. This obtains a speedup similar
1005 : * to that from using single_txn mode in non-parallel
1006 : * restores.
1007 : *
1008 : * We mustn't do this for load-via-partition-root cases
1009 : * because some data might get moved across partition
1010 : * boundaries, risking deadlock and/or loss of previously
1011 : * loaded data. (We assume that all partitions of a
1012 : * partitioned table will be treated the same way.)
1013 : */
1014 8346 : use_truncate = is_parallel && te->created &&
1015 32 : !is_load_via_partition_root(te);
1016 :
1017 8314 : if (use_truncate)
1018 : {
1019 : /*
1020 : * Parallel restore is always talking directly to a
1021 : * server, so no need to see if we should issue BEGIN.
1022 : */
1023 20 : StartTransaction(&AH->public);
1024 :
1025 : /*
1026 : * Issue TRUNCATE with ONLY so that child tables are
1027 : * not wiped.
1028 : */
1029 20 : ahprintf(AH, "TRUNCATE TABLE ONLY %s;\n\n",
1030 20 : fmtQualifiedId(te->namespace, te->tag));
1031 : }
1032 :
1033 : /*
1034 : * If we have a copy statement, use it.
1035 : */
1036 8314 : if (te->copyStmt && strlen(te->copyStmt) > 0)
1037 : {
1038 8152 : ahprintf(AH, "%s", te->copyStmt);
1039 8152 : AH->outputKind = OUTPUT_COPYDATA;
1040 : }
1041 : else
1042 162 : AH->outputKind = OUTPUT_OTHERDATA;
1043 :
1044 8314 : AH->PrintTocDataPtr(AH, te);
1045 :
1046 : /*
1047 : * Terminate COPY if needed.
1048 : */
1049 16462 : if (AH->outputKind == OUTPUT_COPYDATA &&
1050 8150 : RestoringToDB(AH))
1051 130 : EndDBCopyMode(&AH->public, te->tag);
1052 8312 : AH->outputKind = OUTPUT_SQLCMDS;
1053 :
1054 : /* close out the transaction started above */
1055 8312 : if (use_truncate)
1056 20 : CommitTransaction(&AH->public);
1057 :
1058 8312 : _enableTriggersIfNecessary(AH, te);
1059 : }
1060 : }
1061 : }
1062 1086 : else if (!defnDumped)
1063 : {
1064 : /* If we haven't already dumped the defn part, do so now */
1065 1086 : pg_log_info("executing %s %s", te->desc, te->tag);
1066 1086 : _printTocEntry(AH, te, TOC_PREFIX_NONE);
1067 : }
1068 : }
1069 :
1070 : /*
1071 : * If it has a statistics component that we want, then process that
1072 : */
1073 80072 : if ((reqs & REQ_STATS) != 0)
1074 6422 : _printTocEntry(AH, te, TOC_PREFIX_STATS);
1075 :
1076 : /*
1077 : * If we emitted anything for this TOC entry, that counts as one action
1078 : * against the transaction-size limit. Commit if it's time to.
1079 : */
1080 80072 : if ((reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0 && ropt->txn_size > 0)
1081 : {
1082 6832 : if (++AH->txnCount >= ropt->txn_size)
1083 : {
1084 20 : if (AH->connection)
1085 : {
1086 20 : CommitTransaction(&AH->public);
1087 20 : StartTransaction(&AH->public);
1088 : }
1089 : else
1090 0 : ahprintf(AH, "COMMIT;\nBEGIN;\n\n");
1091 20 : AH->txnCount = 0;
1092 : }
1093 : }
1094 :
1095 80072 : if (AH->public.n_errors > 0 && status == WORKER_OK)
1096 0 : status = WORKER_IGNORED_ERRORS;
1097 :
1098 80072 : return status;
1099 : }
1100 :
1101 : /*
1102 : * Allocate a new RestoreOptions block.
1103 : * This is mainly so we can initialize it, but also for future expansion,
1104 : */
1105 : RestoreOptions *
1106 560 : NewRestoreOptions(void)
1107 : {
1108 : RestoreOptions *opts;
1109 :
1110 560 : opts = (RestoreOptions *) pg_malloc0(sizeof(RestoreOptions));
1111 :
1112 : /* set any fields that shouldn't default to zeroes */
1113 560 : opts->format = archUnknown;
1114 560 : opts->cparams.promptPassword = TRI_DEFAULT;
1115 560 : opts->dumpSections = DUMP_UNSECTIONED;
1116 560 : opts->compression_spec.algorithm = PG_COMPRESSION_NONE;
1117 560 : opts->compression_spec.level = 0;
1118 560 : opts->dumpSchema = true;
1119 560 : opts->dumpData = true;
1120 560 : opts->dumpStatistics = true;
1121 :
1122 560 : return opts;
1123 : }
1124 :
1125 : static void
1126 8314 : _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
1127 : {
1128 8314 : RestoreOptions *ropt = AH->public.ropt;
1129 :
1130 : /* This hack is only needed in a data-only restore */
1131 8314 : if (ropt->dumpSchema || !ropt->disable_triggers)
1132 8242 : return;
1133 :
1134 72 : pg_log_info("disabling triggers for %s", te->tag);
1135 :
1136 : /*
1137 : * Become superuser if possible, since they are the only ones who can
1138 : * disable constraint triggers. If -S was not given, assume the initial
1139 : * user identity is a superuser. (XXX would it be better to become the
1140 : * table owner?)
1141 : */
1142 72 : _becomeUser(AH, ropt->superuser);
1143 :
1144 : /*
1145 : * Disable them.
1146 : */
1147 72 : ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n",
1148 72 : fmtQualifiedId(te->namespace, te->tag));
1149 : }
1150 :
1151 : static void
1152 8312 : _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
1153 : {
1154 8312 : RestoreOptions *ropt = AH->public.ropt;
1155 :
1156 : /* This hack is only needed in a data-only restore */
1157 8312 : if (ropt->dumpSchema || !ropt->disable_triggers)
1158 8240 : return;
1159 :
1160 72 : pg_log_info("enabling triggers for %s", te->tag);
1161 :
1162 : /*
1163 : * Become superuser if possible, since they are the only ones who can
1164 : * disable constraint triggers. If -S was not given, assume the initial
1165 : * user identity is a superuser. (XXX would it be better to become the
1166 : * table owner?)
1167 : */
1168 72 : _becomeUser(AH, ropt->superuser);
1169 :
1170 : /*
1171 : * Enable them.
1172 : */
1173 72 : ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n",
1174 72 : fmtQualifiedId(te->namespace, te->tag));
1175 : }
1176 :
1177 : /*
1178 : * Detect whether a TABLE DATA TOC item is performing "load via partition
1179 : * root", that is the target table is an ancestor partition rather than the
1180 : * table the TOC item is nominally for.
1181 : *
1182 : * In newer archive files this can be detected by checking for a special
1183 : * comment placed in te->defn. In older files we have to fall back to seeing
1184 : * if the COPY statement targets the named table or some other one. This
1185 : * will not work for data dumped as INSERT commands, so we could give a false
1186 : * negative in that case; fortunately, that's a rarely-used option.
1187 : */
1188 : static bool
1189 32 : is_load_via_partition_root(TocEntry *te)
1190 : {
1191 32 : if (te->defn &&
1192 12 : strncmp(te->defn, "-- load via partition root ", 27) == 0)
1193 12 : return true;
1194 20 : if (te->copyStmt && *te->copyStmt)
1195 : {
1196 12 : PQExpBuffer copyStmt = createPQExpBuffer();
1197 : bool result;
1198 :
1199 : /*
1200 : * Build the initial part of the COPY as it would appear if the
1201 : * nominal target table is the actual target. If we see anything
1202 : * else, it must be a load-via-partition-root case.
1203 : */
1204 12 : appendPQExpBuffer(copyStmt, "COPY %s ",
1205 12 : fmtQualifiedId(te->namespace, te->tag));
1206 12 : result = strncmp(te->copyStmt, copyStmt->data, copyStmt->len) != 0;
1207 12 : destroyPQExpBuffer(copyStmt);
1208 12 : return result;
1209 : }
1210 : /* Assume it's not load-via-partition-root */
1211 8 : return false;
1212 : }
1213 :
1214 : /*
1215 : * This is a routine that is part of the dumper interface, hence the 'Archive*' parameter.
1216 : */
1217 :
1218 : /* Public */
1219 : void
1220 3666526 : WriteData(Archive *AHX, const void *data, size_t dLen)
1221 : {
1222 3666526 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1223 :
1224 3666526 : if (!AH->currToc)
1225 0 : pg_fatal("internal error -- WriteData cannot be called outside the context of a DataDumper routine");
1226 :
1227 3666526 : AH->WriteDataPtr(AH, data, dLen);
1228 3666526 : }
1229 :
1230 : /*
1231 : * Create a new TOC entry. The TOC was designed as a TOC, but is now the
1232 : * repository for all metadata. But the name has stuck.
1233 : *
1234 : * The new entry is added to the Archive's TOC list. Most callers can ignore
1235 : * the result value because nothing else need be done, but a few want to
1236 : * manipulate the TOC entry further.
1237 : */
1238 :
1239 : /* Public */
1240 : TocEntry *
1241 82290 : ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId,
1242 : ArchiveOpts *opts)
1243 : {
1244 82290 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1245 : TocEntry *newToc;
1246 :
1247 82290 : newToc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
1248 :
1249 82290 : AH->tocCount++;
1250 82290 : if (dumpId > AH->maxDumpId)
1251 25924 : AH->maxDumpId = dumpId;
1252 :
1253 82290 : newToc->prev = AH->toc->prev;
1254 82290 : newToc->next = AH->toc;
1255 82290 : AH->toc->prev->next = newToc;
1256 82290 : AH->toc->prev = newToc;
1257 :
1258 82290 : newToc->catalogId = catalogId;
1259 82290 : newToc->dumpId = dumpId;
1260 82290 : newToc->section = opts->section;
1261 :
1262 82290 : newToc->tag = pg_strdup(opts->tag);
1263 82290 : newToc->namespace = opts->namespace ? pg_strdup(opts->namespace) : NULL;
1264 82290 : newToc->tablespace = opts->tablespace ? pg_strdup(opts->tablespace) : NULL;
1265 82290 : newToc->tableam = opts->tableam ? pg_strdup(opts->tableam) : NULL;
1266 82290 : newToc->relkind = opts->relkind;
1267 82290 : newToc->owner = opts->owner ? pg_strdup(opts->owner) : NULL;
1268 82290 : newToc->desc = pg_strdup(opts->description);
1269 82290 : newToc->defn = opts->createStmt ? pg_strdup(opts->createStmt) : NULL;
1270 82290 : newToc->dropStmt = opts->dropStmt ? pg_strdup(opts->dropStmt) : NULL;
1271 82290 : newToc->copyStmt = opts->copyStmt ? pg_strdup(opts->copyStmt) : NULL;
1272 :
1273 82290 : if (opts->nDeps > 0)
1274 : {
1275 33620 : newToc->dependencies = (DumpId *) pg_malloc(opts->nDeps * sizeof(DumpId));
1276 33620 : memcpy(newToc->dependencies, opts->deps, opts->nDeps * sizeof(DumpId));
1277 33620 : newToc->nDeps = opts->nDeps;
1278 : }
1279 : else
1280 : {
1281 48670 : newToc->dependencies = NULL;
1282 48670 : newToc->nDeps = 0;
1283 : }
1284 :
1285 82290 : newToc->dataDumper = opts->dumpFn;
1286 82290 : newToc->dataDumperArg = opts->dumpArg;
1287 82290 : newToc->hadDumper = opts->dumpFn ? true : false;
1288 :
1289 82290 : newToc->defnDumper = opts->defnFn;
1290 82290 : newToc->defnDumperArg = opts->defnArg;
1291 :
1292 82290 : newToc->formatData = NULL;
1293 82290 : newToc->dataLength = 0;
1294 :
1295 82290 : if (AH->ArchiveEntryPtr != NULL)
1296 13110 : AH->ArchiveEntryPtr(AH, newToc);
1297 :
1298 82290 : return newToc;
1299 : }
1300 :
1301 : /* Public */
1302 : void
1303 8 : PrintTOCSummary(Archive *AHX)
1304 : {
1305 8 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1306 8 : RestoreOptions *ropt = AH->public.ropt;
1307 : TocEntry *te;
1308 8 : pg_compress_specification out_compression_spec = {0};
1309 : teSection curSection;
1310 : CompressFileHandle *sav;
1311 : const char *fmtName;
1312 : char stamp_str[64];
1313 :
1314 : /* TOC is always uncompressed */
1315 8 : out_compression_spec.algorithm = PG_COMPRESSION_NONE;
1316 :
1317 8 : sav = SaveOutput(AH);
1318 8 : if (ropt->filename)
1319 0 : SetOutput(AH, ropt->filename, out_compression_spec);
1320 :
1321 8 : if (strftime(stamp_str, sizeof(stamp_str), PGDUMP_STRFTIME_FMT,
1322 8 : localtime(&AH->createDate)) == 0)
1323 0 : strcpy(stamp_str, "[unknown]");
1324 :
1325 8 : ahprintf(AH, ";\n; Archive created at %s\n", stamp_str);
1326 16 : ahprintf(AH, "; dbname: %s\n; TOC Entries: %d\n; Compression: %s\n",
1327 8 : sanitize_line(AH->archdbname, false),
1328 : AH->tocCount,
1329 : get_compress_algorithm_name(AH->compression_spec.algorithm));
1330 :
1331 8 : switch (AH->format)
1332 : {
1333 6 : case archCustom:
1334 6 : fmtName = "CUSTOM";
1335 6 : break;
1336 2 : case archDirectory:
1337 2 : fmtName = "DIRECTORY";
1338 2 : break;
1339 0 : case archTar:
1340 0 : fmtName = "TAR";
1341 0 : break;
1342 0 : default:
1343 0 : fmtName = "UNKNOWN";
1344 : }
1345 :
1346 8 : ahprintf(AH, "; Dump Version: %d.%d-%d\n",
1347 8 : ARCHIVE_MAJOR(AH->version), ARCHIVE_MINOR(AH->version), ARCHIVE_REV(AH->version));
1348 8 : ahprintf(AH, "; Format: %s\n", fmtName);
1349 8 : ahprintf(AH, "; Integer: %zu bytes\n", AH->intSize);
1350 8 : ahprintf(AH, "; Offset: %zu bytes\n", AH->offSize);
1351 8 : if (AH->archiveRemoteVersion)
1352 8 : ahprintf(AH, "; Dumped from database version: %s\n",
1353 : AH->archiveRemoteVersion);
1354 8 : if (AH->archiveDumpVersion)
1355 8 : ahprintf(AH, "; Dumped by pg_dump version: %s\n",
1356 : AH->archiveDumpVersion);
1357 :
1358 8 : ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");
1359 :
1360 8 : curSection = SECTION_PRE_DATA;
1361 1584 : for (te = AH->toc->next; te != AH->toc; te = te->next)
1362 : {
1363 : /* This bit must match ProcessArchiveRestoreOptions' marking logic */
1364 1576 : if (te->section != SECTION_NONE)
1365 1268 : curSection = te->section;
1366 1576 : te->reqs = _tocEntryRequired(te, curSection, AH);
1367 : /* Now, should we print it? */
1368 1576 : if (ropt->verbose ||
1369 1576 : (te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0)
1370 : {
1371 : char *sanitized_name;
1372 : char *sanitized_schema;
1373 : char *sanitized_owner;
1374 :
1375 : /*
1376 : */
1377 1536 : sanitized_name = sanitize_line(te->tag, false);
1378 1536 : sanitized_schema = sanitize_line(te->namespace, true);
1379 1536 : sanitized_owner = sanitize_line(te->owner, false);
1380 :
1381 1536 : ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
1382 : te->catalogId.tableoid, te->catalogId.oid,
1383 : te->desc, sanitized_schema, sanitized_name,
1384 : sanitized_owner);
1385 :
1386 1536 : free(sanitized_name);
1387 1536 : free(sanitized_schema);
1388 1536 : free(sanitized_owner);
1389 : }
1390 1576 : if (ropt->verbose && te->nDeps > 0)
1391 : {
1392 : int i;
1393 :
1394 0 : ahprintf(AH, ";\tdepends on:");
1395 0 : for (i = 0; i < te->nDeps; i++)
1396 0 : ahprintf(AH, " %d", te->dependencies[i]);
1397 0 : ahprintf(AH, "\n");
1398 : }
1399 : }
1400 :
1401 : /* Enforce strict names checking */
1402 8 : if (ropt->strict_names)
1403 0 : StrictNamesCheck(ropt);
1404 :
1405 8 : if (ropt->filename)
1406 0 : RestoreOutput(AH, sav);
1407 8 : }
1408 :
1409 : /***********
1410 : * Large Object Archival
1411 : ***********/
1412 :
1413 : /* Called by a dumper to signal start of a LO */
1414 : int
1415 162 : StartLO(Archive *AHX, Oid oid)
1416 : {
1417 162 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1418 :
1419 162 : if (!AH->StartLOPtr)
1420 0 : pg_fatal("large-object output not supported in chosen format");
1421 :
1422 162 : AH->StartLOPtr(AH, AH->currToc, oid);
1423 :
1424 162 : return 1;
1425 : }
1426 :
1427 : /* Called by a dumper to signal end of a LO */
1428 : int
1429 162 : EndLO(Archive *AHX, Oid oid)
1430 : {
1431 162 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1432 :
1433 162 : if (AH->EndLOPtr)
1434 162 : AH->EndLOPtr(AH, AH->currToc, oid);
1435 :
1436 162 : return 1;
1437 : }
1438 :
1439 : /**********
1440 : * Large Object Restoration
1441 : **********/
1442 :
1443 : /*
1444 : * Called by a format handler before a group of LOs is restored
1445 : */
1446 : void
1447 32 : StartRestoreLOs(ArchiveHandle *AH)
1448 : {
1449 32 : RestoreOptions *ropt = AH->public.ropt;
1450 :
1451 : /*
1452 : * LOs must be restored within a transaction block, since we need the LO
1453 : * handle to stay open while we write it. Establish a transaction unless
1454 : * there's one being used globally.
1455 : */
1456 32 : if (!(ropt->single_txn || ropt->txn_size > 0))
1457 : {
1458 32 : if (AH->connection)
1459 0 : StartTransaction(&AH->public);
1460 : else
1461 32 : ahprintf(AH, "BEGIN;\n\n");
1462 : }
1463 :
1464 32 : AH->loCount = 0;
1465 32 : }
1466 :
1467 : /*
1468 : * Called by a format handler after a group of LOs is restored
1469 : */
1470 : void
1471 32 : EndRestoreLOs(ArchiveHandle *AH)
1472 : {
1473 32 : RestoreOptions *ropt = AH->public.ropt;
1474 :
1475 32 : if (!(ropt->single_txn || ropt->txn_size > 0))
1476 : {
1477 32 : if (AH->connection)
1478 0 : CommitTransaction(&AH->public);
1479 : else
1480 32 : ahprintf(AH, "COMMIT;\n\n");
1481 : }
1482 :
1483 32 : pg_log_info(ngettext("restored %d large object",
1484 : "restored %d large objects",
1485 : AH->loCount),
1486 : AH->loCount);
1487 32 : }
1488 :
1489 :
1490 : /*
1491 : * Called by a format handler to initiate restoration of a LO
1492 : */
1493 : void
1494 32 : StartRestoreLO(ArchiveHandle *AH, Oid oid, bool drop)
1495 : {
1496 32 : bool old_lo_style = (AH->version < K_VERS_1_12);
1497 : Oid loOid;
1498 :
1499 32 : AH->loCount++;
1500 :
1501 : /* Initialize the LO Buffer */
1502 32 : if (AH->lo_buf == NULL)
1503 : {
1504 : /* First time through (in this process) so allocate the buffer */
1505 20 : AH->lo_buf_size = LOBBUFSIZE;
1506 20 : AH->lo_buf = pg_malloc(LOBBUFSIZE);
1507 : }
1508 32 : AH->lo_buf_used = 0;
1509 :
1510 32 : pg_log_info("restoring large object with OID %u", oid);
1511 :
1512 : /* With an old archive we must do drop and create logic here */
1513 32 : if (old_lo_style && drop)
1514 0 : DropLOIfExists(AH, oid);
1515 :
1516 32 : if (AH->connection)
1517 : {
1518 0 : if (old_lo_style)
1519 : {
1520 0 : loOid = lo_create(AH->connection, oid);
1521 0 : if (loOid == 0 || loOid != oid)
1522 0 : pg_fatal("could not create large object %u: %s",
1523 : oid, PQerrorMessage(AH->connection));
1524 : }
1525 0 : AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
1526 0 : if (AH->loFd == -1)
1527 0 : pg_fatal("could not open large object %u: %s",
1528 : oid, PQerrorMessage(AH->connection));
1529 : }
1530 : else
1531 : {
1532 32 : if (old_lo_style)
1533 0 : ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
1534 : oid, INV_WRITE);
1535 : else
1536 32 : ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
1537 : oid, INV_WRITE);
1538 : }
1539 :
1540 32 : AH->writingLO = true;
1541 32 : }
1542 :
1543 : void
1544 32 : EndRestoreLO(ArchiveHandle *AH, Oid oid)
1545 : {
1546 32 : if (AH->lo_buf_used > 0)
1547 : {
1548 : /* Write remaining bytes from the LO buffer */
1549 20 : dump_lo_buf(AH);
1550 : }
1551 :
1552 32 : AH->writingLO = false;
1553 :
1554 32 : if (AH->connection)
1555 : {
1556 0 : lo_close(AH->connection, AH->loFd);
1557 0 : AH->loFd = -1;
1558 : }
1559 : else
1560 : {
1561 32 : ahprintf(AH, "SELECT pg_catalog.lo_close(0);\n\n");
1562 : }
1563 32 : }
1564 :
1565 : /***********
1566 : * Sorting and Reordering
1567 : ***********/
1568 :
1569 : void
1570 0 : SortTocFromFile(Archive *AHX)
1571 : {
1572 0 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1573 0 : RestoreOptions *ropt = AH->public.ropt;
1574 : FILE *fh;
1575 : StringInfoData linebuf;
1576 :
1577 : /* Allocate space for the 'wanted' array, and init it */
1578 0 : ropt->idWanted = (bool *) pg_malloc0(sizeof(bool) * AH->maxDumpId);
1579 :
1580 : /* Setup the file */
1581 0 : fh = fopen(ropt->tocFile, PG_BINARY_R);
1582 0 : if (!fh)
1583 0 : pg_fatal("could not open TOC file \"%s\": %m", ropt->tocFile);
1584 :
1585 0 : initStringInfo(&linebuf);
1586 :
1587 0 : while (pg_get_line_buf(fh, &linebuf))
1588 : {
1589 : char *cmnt;
1590 : char *endptr;
1591 : DumpId id;
1592 : TocEntry *te;
1593 :
1594 : /* Truncate line at comment, if any */
1595 0 : cmnt = strchr(linebuf.data, ';');
1596 0 : if (cmnt != NULL)
1597 : {
1598 0 : cmnt[0] = '\0';
1599 0 : linebuf.len = cmnt - linebuf.data;
1600 : }
1601 :
1602 : /* Ignore if all blank */
1603 0 : if (strspn(linebuf.data, " \t\r\n") == linebuf.len)
1604 0 : continue;
1605 :
1606 : /* Get an ID, check it's valid and not already seen */
1607 0 : id = strtol(linebuf.data, &endptr, 10);
1608 0 : if (endptr == linebuf.data || id <= 0 || id > AH->maxDumpId ||
1609 0 : ropt->idWanted[id - 1])
1610 : {
1611 0 : pg_log_warning("line ignored: %s", linebuf.data);
1612 0 : continue;
1613 : }
1614 :
1615 : /* Find TOC entry */
1616 0 : te = getTocEntryByDumpId(AH, id);
1617 0 : if (!te)
1618 0 : pg_fatal("could not find entry for ID %d",
1619 : id);
1620 :
1621 : /* Mark it wanted */
1622 0 : ropt->idWanted[id - 1] = true;
1623 :
1624 : /*
1625 : * Move each item to the end of the list as it is selected, so that
1626 : * they are placed in the desired order. Any unwanted items will end
1627 : * up at the front of the list, which may seem unintuitive but it's
1628 : * what we need. In an ordinary serial restore that makes no
1629 : * difference, but in a parallel restore we need to mark unrestored
1630 : * items' dependencies as satisfied before we start examining
1631 : * restorable items. Otherwise they could have surprising
1632 : * side-effects on the order in which restorable items actually get
1633 : * restored.
1634 : */
1635 0 : _moveBefore(AH->toc, te);
1636 : }
1637 :
1638 0 : pg_free(linebuf.data);
1639 :
1640 0 : if (fclose(fh) != 0)
1641 0 : pg_fatal("could not close TOC file: %m");
1642 0 : }
1643 :
1644 : /**********************
1645 : * Convenience functions that look like standard IO functions
1646 : * for writing data when in dump mode.
1647 : **********************/
1648 :
1649 : /* Public */
1650 : void
1651 46048 : archputs(const char *s, Archive *AH)
1652 : {
1653 46048 : WriteData(AH, s, strlen(s));
1654 46048 : }
1655 :
1656 : /* Public */
1657 : int
1658 8120 : archprintf(Archive *AH, const char *fmt,...)
1659 : {
1660 8120 : int save_errno = errno;
1661 : char *p;
1662 8120 : size_t len = 128; /* initial assumption about buffer size */
1663 : size_t cnt;
1664 :
1665 : for (;;)
1666 0 : {
1667 : va_list args;
1668 :
1669 : /* Allocate work buffer. */
1670 8120 : p = (char *) pg_malloc(len);
1671 :
1672 : /* Try to format the data. */
1673 8120 : errno = save_errno;
1674 8120 : va_start(args, fmt);
1675 8120 : cnt = pvsnprintf(p, len, fmt, args);
1676 8120 : va_end(args);
1677 :
1678 8120 : if (cnt < len)
1679 8120 : break; /* success */
1680 :
1681 : /* Release buffer and loop around to try again with larger len. */
1682 0 : free(p);
1683 0 : len = cnt;
1684 : }
1685 :
1686 8120 : WriteData(AH, p, cnt);
1687 8120 : free(p);
1688 8120 : return (int) cnt;
1689 : }
1690 :
1691 :
1692 : /*******************************
1693 : * Stuff below here should be 'private' to the archiver routines
1694 : *******************************/
1695 :
1696 : static void
1697 298 : SetOutput(ArchiveHandle *AH, const char *filename,
1698 : const pg_compress_specification compression_spec)
1699 : {
1700 : CompressFileHandle *CFH;
1701 : const char *mode;
1702 298 : int fn = -1;
1703 :
1704 298 : if (filename)
1705 : {
1706 298 : if (strcmp(filename, "-") == 0)
1707 0 : fn = fileno(stdout);
1708 : }
1709 0 : else if (AH->FH)
1710 0 : fn = fileno(AH->FH);
1711 0 : else if (AH->fSpec)
1712 : {
1713 0 : filename = AH->fSpec;
1714 : }
1715 : else
1716 0 : fn = fileno(stdout);
1717 :
1718 298 : if (AH->mode == archModeAppend)
1719 98 : mode = PG_BINARY_A;
1720 : else
1721 200 : mode = PG_BINARY_W;
1722 :
1723 298 : CFH = InitCompressFileHandle(compression_spec);
1724 :
1725 298 : if (!CFH->open_func(filename, fn, mode, CFH))
1726 : {
1727 0 : if (filename)
1728 0 : pg_fatal("could not open output file \"%s\": %m", filename);
1729 : else
1730 0 : pg_fatal("could not open output file: %m");
1731 : }
1732 :
1733 298 : AH->OF = CFH;
1734 298 : }
1735 :
1736 : static CompressFileHandle *
1737 380 : SaveOutput(ArchiveHandle *AH)
1738 : {
1739 380 : return (CompressFileHandle *) AH->OF;
1740 : }
1741 :
1742 : static void
1743 298 : RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput)
1744 : {
1745 298 : errno = 0;
1746 298 : if (!EndCompressFileHandle(AH->OF))
1747 0 : pg_fatal("could not close output file: %m");
1748 :
1749 298 : AH->OF = savedOutput;
1750 298 : }
1751 :
1752 :
1753 :
1754 : /*
1755 : * Print formatted text to the output file (usually stdout).
1756 : */
1757 : int
1758 428748 : ahprintf(ArchiveHandle *AH, const char *fmt,...)
1759 : {
1760 428748 : int save_errno = errno;
1761 : char *p;
1762 428748 : size_t len = 128; /* initial assumption about buffer size */
1763 : size_t cnt;
1764 :
1765 : for (;;)
1766 27008 : {
1767 : va_list args;
1768 :
1769 : /* Allocate work buffer. */
1770 455756 : p = (char *) pg_malloc(len);
1771 :
1772 : /* Try to format the data. */
1773 455756 : errno = save_errno;
1774 455756 : va_start(args, fmt);
1775 455756 : cnt = pvsnprintf(p, len, fmt, args);
1776 455756 : va_end(args);
1777 :
1778 455756 : if (cnt < len)
1779 428748 : break; /* success */
1780 :
1781 : /* Release buffer and loop around to try again with larger len. */
1782 27008 : free(p);
1783 27008 : len = cnt;
1784 : }
1785 :
1786 428748 : ahwrite(p, 1, cnt, AH);
1787 428748 : free(p);
1788 428748 : return (int) cnt;
1789 : }
1790 :
1791 : /*
1792 : * Single place for logic which says 'We are restoring to a direct DB connection'.
1793 : */
1794 : static int
1795 4050390 : RestoringToDB(ArchiveHandle *AH)
1796 : {
1797 4050390 : RestoreOptions *ropt = AH->public.ropt;
1798 :
1799 4050390 : return (ropt && ropt->useDB && AH->connection);
1800 : }
1801 :
1802 : /*
1803 : * Dump the current contents of the LO data buffer while writing a LO
1804 : */
1805 : static void
1806 20 : dump_lo_buf(ArchiveHandle *AH)
1807 : {
1808 20 : if (AH->connection)
1809 : {
1810 : int res;
1811 :
1812 0 : res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
1813 0 : pg_log_debug(ngettext("wrote %zu byte of large object data (result = %d)",
1814 : "wrote %zu bytes of large object data (result = %d)",
1815 : AH->lo_buf_used),
1816 : AH->lo_buf_used, res);
1817 : /* We assume there are no short writes, only errors */
1818 0 : if (res != AH->lo_buf_used)
1819 0 : warn_or_exit_horribly(AH, "could not write to large object: %s",
1820 0 : PQerrorMessage(AH->connection));
1821 : }
1822 : else
1823 : {
1824 20 : PQExpBuffer buf = createPQExpBuffer();
1825 :
1826 20 : appendByteaLiteralAHX(buf,
1827 : (const unsigned char *) AH->lo_buf,
1828 : AH->lo_buf_used,
1829 : AH);
1830 :
1831 : /* Hack: turn off writingLO so ahwrite doesn't recurse to here */
1832 20 : AH->writingLO = false;
1833 20 : ahprintf(AH, "SELECT pg_catalog.lowrite(0, %s);\n", buf->data);
1834 20 : AH->writingLO = true;
1835 :
1836 20 : destroyPQExpBuffer(buf);
1837 : }
1838 20 : AH->lo_buf_used = 0;
1839 20 : }
1840 :
1841 :
1842 : /*
1843 : * Write buffer to the output file (usually stdout). This is used for
1844 : * outputting 'restore' scripts etc. It is even possible for an archive
1845 : * format to create a custom output routine to 'fake' a restore if it
1846 : * wants to generate a script (see TAR output).
1847 : */
1848 : void
1849 4045482 : ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
1850 : {
1851 4045482 : int bytes_written = 0;
1852 :
1853 4045482 : if (AH->writingLO)
1854 : {
1855 26 : size_t remaining = size * nmemb;
1856 :
1857 26 : while (AH->lo_buf_used + remaining > AH->lo_buf_size)
1858 : {
1859 0 : size_t avail = AH->lo_buf_size - AH->lo_buf_used;
1860 :
1861 0 : memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
1862 0 : ptr = (const char *) ptr + avail;
1863 0 : remaining -= avail;
1864 0 : AH->lo_buf_used += avail;
1865 0 : dump_lo_buf(AH);
1866 : }
1867 :
1868 26 : memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
1869 26 : AH->lo_buf_used += remaining;
1870 :
1871 26 : bytes_written = size * nmemb;
1872 : }
1873 4045456 : else if (AH->CustomOutPtr)
1874 4544 : bytes_written = AH->CustomOutPtr(AH, ptr, size * nmemb);
1875 :
1876 : /*
1877 : * If we're doing a restore, and it's direct to DB, and we're connected
1878 : * then send it to the DB.
1879 : */
1880 4040912 : else if (RestoringToDB(AH))
1881 12812 : bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
1882 : else
1883 : {
1884 4028100 : CompressFileHandle *CFH = (CompressFileHandle *) AH->OF;
1885 :
1886 4028100 : CFH->write_func(ptr, size * nmemb, CFH);
1887 4028100 : bytes_written = size * nmemb;
1888 : }
1889 :
1890 4045482 : if (bytes_written != size * nmemb)
1891 0 : WRITE_ERROR_EXIT;
1892 4045482 : }
1893 :
1894 : /* on some error, we may decide to go on... */
1895 : void
1896 0 : warn_or_exit_horribly(ArchiveHandle *AH, const char *fmt,...)
1897 : {
1898 : va_list ap;
1899 :
1900 0 : switch (AH->stage)
1901 : {
1902 :
1903 0 : case STAGE_NONE:
1904 : /* Do nothing special */
1905 0 : break;
1906 :
1907 0 : case STAGE_INITIALIZING:
1908 0 : if (AH->stage != AH->lastErrorStage)
1909 0 : pg_log_info("while INITIALIZING:");
1910 0 : break;
1911 :
1912 0 : case STAGE_PROCESSING:
1913 0 : if (AH->stage != AH->lastErrorStage)
1914 0 : pg_log_info("while PROCESSING TOC:");
1915 0 : break;
1916 :
1917 0 : case STAGE_FINALIZING:
1918 0 : if (AH->stage != AH->lastErrorStage)
1919 0 : pg_log_info("while FINALIZING:");
1920 0 : break;
1921 : }
1922 0 : if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE)
1923 : {
1924 0 : pg_log_info("from TOC entry %d; %u %u %s %s %s",
1925 : AH->currentTE->dumpId,
1926 : AH->currentTE->catalogId.tableoid,
1927 : AH->currentTE->catalogId.oid,
1928 : AH->currentTE->desc ? AH->currentTE->desc : "(no desc)",
1929 : AH->currentTE->tag ? AH->currentTE->tag : "(no tag)",
1930 : AH->currentTE->owner ? AH->currentTE->owner : "(no owner)");
1931 : }
1932 0 : AH->lastErrorStage = AH->stage;
1933 0 : AH->lastErrorTE = AH->currentTE;
1934 :
1935 0 : va_start(ap, fmt);
1936 0 : pg_log_generic_v(PG_LOG_ERROR, PG_LOG_PRIMARY, fmt, ap);
1937 0 : va_end(ap);
1938 :
1939 0 : if (AH->public.exit_on_error)
1940 0 : exit_nicely(1);
1941 : else
1942 0 : AH->public.n_errors++;
1943 0 : }
1944 :
1945 : #ifdef NOT_USED
1946 :
1947 : static void
1948 : _moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
1949 : {
1950 : /* Unlink te from list */
1951 : te->prev->next = te->next;
1952 : te->next->prev = te->prev;
1953 :
1954 : /* and insert it after "pos" */
1955 : te->prev = pos;
1956 : te->next = pos->next;
1957 : pos->next->prev = te;
1958 : pos->next = te;
1959 : }
1960 : #endif
1961 :
1962 : static void
1963 0 : _moveBefore(TocEntry *pos, TocEntry *te)
1964 : {
1965 : /* Unlink te from list */
1966 0 : te->prev->next = te->next;
1967 0 : te->next->prev = te->prev;
1968 :
1969 : /* and insert it before "pos" */
1970 0 : te->prev = pos->prev;
1971 0 : te->next = pos;
1972 0 : pos->prev->next = te;
1973 0 : pos->prev = te;
1974 0 : }
1975 :
1976 : /*
1977 : * Build index arrays for the TOC list
1978 : *
1979 : * This should be invoked only after we have created or read in all the TOC
1980 : * items.
1981 : *
1982 : * The arrays are indexed by dump ID (so entry zero is unused). Note that the
1983 : * array entries run only up to maxDumpId. We might see dependency dump IDs
1984 : * beyond that (if the dump was partial); so always check the array bound
1985 : * before trying to touch an array entry.
1986 : */
1987 : static void
1988 448 : buildTocEntryArrays(ArchiveHandle *AH)
1989 : {
1990 448 : DumpId maxDumpId = AH->maxDumpId;
1991 : TocEntry *te;
1992 :
1993 448 : AH->tocsByDumpId = (TocEntry **) pg_malloc0((maxDumpId + 1) * sizeof(TocEntry *));
1994 448 : AH->tableDataId = (DumpId *) pg_malloc0((maxDumpId + 1) * sizeof(DumpId));
1995 :
1996 95466 : for (te = AH->toc->next; te != AH->toc; te = te->next)
1997 : {
1998 : /* this check is purely paranoia, maxDumpId should be correct */
1999 95018 : if (te->dumpId <= 0 || te->dumpId > maxDumpId)
2000 0 : pg_fatal("bad dumpId");
2001 :
2002 : /* tocsByDumpId indexes all TOCs by their dump ID */
2003 95018 : AH->tocsByDumpId[te->dumpId] = te;
2004 :
2005 : /*
2006 : * tableDataId provides the TABLE DATA item's dump ID for each TABLE
2007 : * TOC entry that has a DATA item. We compute this by reversing the
2008 : * TABLE DATA item's dependency, knowing that a TABLE DATA item has
2009 : * just one dependency and it is the TABLE item.
2010 : */
2011 95018 : if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0)
2012 : {
2013 9056 : DumpId tableId = te->dependencies[0];
2014 :
2015 : /*
2016 : * The TABLE item might not have been in the archive, if this was
2017 : * a data-only dump; but its dump ID should be less than its data
2018 : * item's dump ID, so there should be a place for it in the array.
2019 : */
2020 9056 : if (tableId <= 0 || tableId > maxDumpId)
2021 0 : pg_fatal("bad table dumpId for TABLE DATA item");
2022 :
2023 9056 : AH->tableDataId[tableId] = te->dumpId;
2024 : }
2025 : }
2026 448 : }
2027 :
2028 : TocEntry *
2029 20516 : getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
2030 : {
2031 : /* build index arrays if we didn't already */
2032 20516 : if (AH->tocsByDumpId == NULL)
2033 80 : buildTocEntryArrays(AH);
2034 :
2035 20516 : if (id > 0 && id <= AH->maxDumpId)
2036 20516 : return AH->tocsByDumpId[id];
2037 :
2038 0 : return NULL;
2039 : }
2040 :
2041 : int
2042 20292 : TocIDRequired(ArchiveHandle *AH, DumpId id)
2043 : {
2044 20292 : TocEntry *te = getTocEntryByDumpId(AH, id);
2045 :
2046 20292 : if (!te)
2047 9768 : return 0;
2048 :
2049 10524 : return te->reqs;
2050 : }
2051 :
2052 : size_t
2053 12832 : WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
2054 : {
2055 : int off;
2056 :
2057 : /* Save the flag */
2058 12832 : AH->WriteBytePtr(AH, wasSet);
2059 :
2060 : /* Write out pgoff_t smallest byte first, prevents endian mismatch */
2061 115488 : for (off = 0; off < sizeof(pgoff_t); off++)
2062 : {
2063 102656 : AH->WriteBytePtr(AH, o & 0xFF);
2064 102656 : o >>= 8;
2065 : }
2066 12832 : return sizeof(pgoff_t) + 1;
2067 : }
2068 :
2069 : int
2070 11182 : ReadOffset(ArchiveHandle *AH, pgoff_t *o)
2071 : {
2072 : int i;
2073 : int off;
2074 : int offsetFlg;
2075 :
2076 : /* Initialize to zero */
2077 11182 : *o = 0;
2078 :
2079 : /* Check for old version */
2080 11182 : if (AH->version < K_VERS_1_7)
2081 : {
2082 : /* Prior versions wrote offsets using WriteInt */
2083 0 : i = ReadInt(AH);
2084 : /* -1 means not set */
2085 0 : if (i < 0)
2086 0 : return K_OFFSET_POS_NOT_SET;
2087 0 : else if (i == 0)
2088 0 : return K_OFFSET_NO_DATA;
2089 :
2090 : /* Cast to pgoff_t because it was written as an int. */
2091 0 : *o = (pgoff_t) i;
2092 0 : return K_OFFSET_POS_SET;
2093 : }
2094 :
2095 : /*
2096 : * Read the flag indicating the state of the data pointer. Check if valid
2097 : * and die if not.
2098 : *
2099 : * This used to be handled by a negative or zero pointer, now we use an
2100 : * extra byte specifically for the state.
2101 : */
2102 11182 : offsetFlg = AH->ReadBytePtr(AH) & 0xFF;
2103 :
2104 11182 : switch (offsetFlg)
2105 : {
2106 11182 : case K_OFFSET_POS_NOT_SET:
2107 : case K_OFFSET_NO_DATA:
2108 : case K_OFFSET_POS_SET:
2109 :
2110 11182 : break;
2111 :
2112 0 : default:
2113 0 : pg_fatal("unexpected data offset flag %d", offsetFlg);
2114 : }
2115 :
2116 : /*
2117 : * Read the bytes
2118 : */
2119 100638 : for (off = 0; off < AH->offSize; off++)
2120 : {
2121 89456 : if (off < sizeof(pgoff_t))
2122 89456 : *o |= ((pgoff_t) (AH->ReadBytePtr(AH))) << (off * 8);
2123 : else
2124 : {
2125 0 : if (AH->ReadBytePtr(AH) != 0)
2126 0 : pg_fatal("file offset in dump file is too large");
2127 : }
2128 : }
2129 :
2130 11182 : return offsetFlg;
2131 : }
2132 :
2133 : size_t
2134 294146 : WriteInt(ArchiveHandle *AH, int i)
2135 : {
2136 : int b;
2137 :
2138 : /*
2139 : * This is a bit yucky, but I don't want to make the binary format very
2140 : * dependent on representation, and not knowing much about it, I write out
2141 : * a sign byte. If you change this, don't forget to change the file
2142 : * version #, and modify ReadInt to read the new format AS WELL AS the old
2143 : * formats.
2144 : */
2145 :
2146 : /* SIGN byte */
2147 294146 : if (i < 0)
2148 : {
2149 70880 : AH->WriteBytePtr(AH, 1);
2150 70880 : i = -i;
2151 : }
2152 : else
2153 223266 : AH->WriteBytePtr(AH, 0);
2154 :
2155 1470730 : for (b = 0; b < AH->intSize; b++)
2156 : {
2157 1176584 : AH->WriteBytePtr(AH, i & 0xFF);
2158 1176584 : i >>= 8;
2159 : }
2160 :
2161 294146 : return AH->intSize + 1;
2162 : }
2163 :
2164 : int
2165 278286 : ReadInt(ArchiveHandle *AH)
2166 : {
2167 278286 : int res = 0;
2168 : int bv,
2169 : b;
2170 278286 : int sign = 0; /* Default positive */
2171 278286 : int bitShift = 0;
2172 :
2173 278286 : if (AH->version > K_VERS_1_0)
2174 : /* Read a sign byte */
2175 278286 : sign = AH->ReadBytePtr(AH);
2176 :
2177 1391430 : for (b = 0; b < AH->intSize; b++)
2178 : {
2179 1113144 : bv = AH->ReadBytePtr(AH) & 0xFF;
2180 1113144 : if (bv != 0)
2181 264798 : res = res + (bv << bitShift);
2182 1113144 : bitShift += 8;
2183 : }
2184 :
2185 278286 : if (sign)
2186 66390 : res = -res;
2187 :
2188 278286 : return res;
2189 : }
2190 :
2191 : size_t
2192 229606 : WriteStr(ArchiveHandle *AH, const char *c)
2193 : {
2194 : size_t res;
2195 :
2196 229606 : if (c)
2197 : {
2198 158726 : int len = strlen(c);
2199 :
2200 158726 : res = WriteInt(AH, len);
2201 158726 : AH->WriteBufPtr(AH, c, len);
2202 158726 : res += len;
2203 : }
2204 : else
2205 70880 : res = WriteInt(AH, -1);
2206 :
2207 229606 : return res;
2208 : }
2209 :
2210 : char *
2211 217502 : ReadStr(ArchiveHandle *AH)
2212 : {
2213 : char *buf;
2214 : int l;
2215 :
2216 217502 : l = ReadInt(AH);
2217 217502 : if (l < 0)
2218 66390 : buf = NULL;
2219 : else
2220 : {
2221 151112 : buf = (char *) pg_malloc(l + 1);
2222 151112 : AH->ReadBufPtr(AH, buf, l);
2223 :
2224 151112 : buf[l] = '\0';
2225 : }
2226 :
2227 217502 : return buf;
2228 : }
2229 :
2230 : static bool
2231 22 : _fileExistsInDirectory(const char *dir, const char *filename)
2232 : {
2233 : struct stat st;
2234 : char buf[MAXPGPATH];
2235 :
2236 22 : if (snprintf(buf, MAXPGPATH, "%s/%s", dir, filename) >= MAXPGPATH)
2237 0 : pg_fatal("directory name too long: \"%s\"", dir);
2238 :
2239 22 : return (stat(buf, &st) == 0 && S_ISREG(st.st_mode));
2240 : }
2241 :
2242 : static int
2243 92 : _discoverArchiveFormat(ArchiveHandle *AH)
2244 : {
2245 : FILE *fh;
2246 : char sig[6]; /* More than enough */
2247 : size_t cnt;
2248 92 : int wantClose = 0;
2249 :
2250 92 : pg_log_debug("attempting to ascertain archive format");
2251 :
2252 92 : free(AH->lookahead);
2253 :
2254 92 : AH->readHeader = 0;
2255 92 : AH->lookaheadSize = 512;
2256 92 : AH->lookahead = pg_malloc0(512);
2257 92 : AH->lookaheadLen = 0;
2258 92 : AH->lookaheadPos = 0;
2259 :
2260 92 : if (AH->fSpec)
2261 : {
2262 : struct stat st;
2263 :
2264 92 : wantClose = 1;
2265 :
2266 : /*
2267 : * Check if the specified archive is a directory. If so, check if
2268 : * there's a "toc.dat" (or "toc.dat.{gz,lz4,zst}") file in it.
2269 : */
2270 92 : if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
2271 : {
2272 20 : AH->format = archDirectory;
2273 20 : if (_fileExistsInDirectory(AH->fSpec, "toc.dat"))
2274 20 : return AH->format;
2275 : #ifdef HAVE_LIBZ
2276 2 : if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz"))
2277 2 : return AH->format;
2278 : #endif
2279 : #ifdef USE_LZ4
2280 0 : if (_fileExistsInDirectory(AH->fSpec, "toc.dat.lz4"))
2281 0 : return AH->format;
2282 : #endif
2283 : #ifdef USE_ZSTD
2284 : if (_fileExistsInDirectory(AH->fSpec, "toc.dat.zst"))
2285 : return AH->format;
2286 : #endif
2287 0 : pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
2288 : AH->fSpec);
2289 : fh = NULL; /* keep compiler quiet */
2290 : }
2291 : else
2292 : {
2293 72 : fh = fopen(AH->fSpec, PG_BINARY_R);
2294 72 : if (!fh)
2295 0 : pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
2296 : }
2297 : }
2298 : else
2299 : {
2300 0 : fh = stdin;
2301 0 : if (!fh)
2302 0 : pg_fatal("could not open input file: %m");
2303 : }
2304 :
2305 72 : if ((cnt = fread(sig, 1, 5, fh)) != 5)
2306 : {
2307 0 : if (ferror(fh))
2308 0 : pg_fatal("could not read input file: %m");
2309 : else
2310 0 : pg_fatal("input file is too short (read %zu, expected 5)", cnt);
2311 : }
2312 :
2313 : /* Save it, just in case we need it later */
2314 72 : memcpy(&AH->lookahead[0], sig, 5);
2315 72 : AH->lookaheadLen = 5;
2316 :
2317 72 : if (strncmp(sig, "PGDMP", 5) == 0)
2318 : {
2319 : /* It's custom format, stop here */
2320 70 : AH->format = archCustom;
2321 70 : AH->readHeader = 1;
2322 : }
2323 : else
2324 : {
2325 : /*
2326 : * *Maybe* we have a tar archive format file or a text dump ... So,
2327 : * read first 512 byte header...
2328 : */
2329 2 : cnt = fread(&AH->lookahead[AH->lookaheadLen], 1, 512 - AH->lookaheadLen, fh);
2330 : /* read failure is checked below */
2331 2 : AH->lookaheadLen += cnt;
2332 :
2333 2 : if (AH->lookaheadLen >= strlen(TEXT_DUMPALL_HEADER) &&
2334 2 : (strncmp(AH->lookahead, TEXT_DUMP_HEADER, strlen(TEXT_DUMP_HEADER)) == 0 ||
2335 2 : strncmp(AH->lookahead, TEXT_DUMPALL_HEADER, strlen(TEXT_DUMPALL_HEADER)) == 0))
2336 : {
2337 : /*
2338 : * looks like it's probably a text format dump. so suggest they
2339 : * try psql
2340 : */
2341 0 : pg_fatal("input file appears to be a text format dump. Please use psql.");
2342 : }
2343 :
2344 2 : if (AH->lookaheadLen != 512)
2345 : {
2346 0 : if (feof(fh))
2347 0 : pg_fatal("input file does not appear to be a valid archive (too short?)");
2348 : else
2349 0 : READ_ERROR_EXIT(fh);
2350 : }
2351 :
2352 2 : if (!isValidTarHeader(AH->lookahead))
2353 0 : pg_fatal("input file does not appear to be a valid archive");
2354 :
2355 2 : AH->format = archTar;
2356 : }
2357 :
2358 : /* Close the file if we opened it */
2359 72 : if (wantClose)
2360 : {
2361 72 : if (fclose(fh) != 0)
2362 0 : pg_fatal("could not close input file: %m");
2363 : /* Forget lookahead, since we'll re-read header after re-opening */
2364 72 : AH->readHeader = 0;
2365 72 : AH->lookaheadLen = 0;
2366 : }
2367 :
2368 72 : return AH->format;
2369 : }
2370 :
2371 :
2372 : /*
2373 : * Allocate an archive handle
2374 : */
2375 : static ArchiveHandle *
2376 528 : _allocAH(const char *FileSpec, const ArchiveFormat fmt,
2377 : const pg_compress_specification compression_spec,
2378 : bool dosync, ArchiveMode mode,
2379 : SetupWorkerPtrType setupWorkerPtr, DataDirSyncMethod sync_method)
2380 : {
2381 : ArchiveHandle *AH;
2382 : CompressFileHandle *CFH;
2383 528 : pg_compress_specification out_compress_spec = {0};
2384 :
2385 528 : pg_log_debug("allocating AH for %s, format %d",
2386 : FileSpec ? FileSpec : "(stdio)", fmt);
2387 :
2388 528 : AH = (ArchiveHandle *) pg_malloc0(sizeof(ArchiveHandle));
2389 :
2390 528 : AH->version = K_VERS_SELF;
2391 :
2392 : /* initialize for backwards compatible string processing */
2393 528 : AH->public.encoding = 0; /* PG_SQL_ASCII */
2394 528 : AH->public.std_strings = false;
2395 :
2396 : /* sql error handling */
2397 528 : AH->public.exit_on_error = true;
2398 528 : AH->public.n_errors = 0;
2399 :
2400 528 : AH->archiveDumpVersion = PG_VERSION;
2401 :
2402 528 : AH->createDate = time(NULL);
2403 :
2404 528 : AH->intSize = sizeof(int);
2405 528 : AH->offSize = sizeof(pgoff_t);
2406 528 : if (FileSpec)
2407 : {
2408 490 : AH->fSpec = pg_strdup(FileSpec);
2409 :
2410 : /*
2411 : * Not used; maybe later....
2412 : *
2413 : * AH->workDir = pg_strdup(FileSpec); for(i=strlen(FileSpec) ; i > 0 ;
2414 : * i--) if (AH->workDir[i-1] == '/')
2415 : */
2416 : }
2417 : else
2418 38 : AH->fSpec = NULL;
2419 :
2420 528 : AH->currUser = NULL; /* unknown */
2421 528 : AH->currSchema = NULL; /* ditto */
2422 528 : AH->currTablespace = NULL; /* ditto */
2423 528 : AH->currTableAm = NULL; /* ditto */
2424 :
2425 528 : AH->toc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2426 :
2427 528 : AH->toc->next = AH->toc;
2428 528 : AH->toc->prev = AH->toc;
2429 :
2430 528 : AH->mode = mode;
2431 528 : AH->compression_spec = compression_spec;
2432 528 : AH->dosync = dosync;
2433 528 : AH->sync_method = sync_method;
2434 :
2435 528 : memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
2436 :
2437 : /* Open stdout with no compression for AH output handle */
2438 528 : out_compress_spec.algorithm = PG_COMPRESSION_NONE;
2439 528 : CFH = InitCompressFileHandle(out_compress_spec);
2440 528 : if (!CFH->open_func(NULL, fileno(stdout), PG_BINARY_A, CFH))
2441 0 : pg_fatal("could not open stdout for appending: %m");
2442 528 : AH->OF = CFH;
2443 :
2444 : /*
2445 : * On Windows, we need to use binary mode to read/write non-text files,
2446 : * which include all archive formats as well as compressed plain text.
2447 : * Force stdin/stdout into binary mode if that is what we are using.
2448 : */
2449 : #ifdef WIN32
2450 : if ((fmt != archNull || compression_spec.algorithm != PG_COMPRESSION_NONE) &&
2451 : (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
2452 : {
2453 : if (mode == archModeWrite)
2454 : _setmode(fileno(stdout), O_BINARY);
2455 : else
2456 : _setmode(fileno(stdin), O_BINARY);
2457 : }
2458 : #endif
2459 :
2460 528 : AH->SetupWorkerPtr = setupWorkerPtr;
2461 :
2462 528 : if (fmt == archUnknown)
2463 92 : AH->format = _discoverArchiveFormat(AH);
2464 : else
2465 436 : AH->format = fmt;
2466 :
2467 528 : switch (AH->format)
2468 : {
2469 178 : case archCustom:
2470 178 : InitArchiveFmt_Custom(AH);
2471 178 : break;
2472 :
2473 298 : case archNull:
2474 298 : InitArchiveFmt_Null(AH);
2475 298 : break;
2476 :
2477 42 : case archDirectory:
2478 42 : InitArchiveFmt_Directory(AH);
2479 42 : break;
2480 :
2481 10 : case archTar:
2482 10 : InitArchiveFmt_Tar(AH);
2483 8 : break;
2484 :
2485 0 : default:
2486 0 : pg_fatal("unrecognized file format \"%d\"", AH->format);
2487 : }
2488 :
2489 526 : return AH;
2490 : }
2491 :
2492 : /*
2493 : * Write out all data (tables & LOs)
2494 : */
2495 : void
2496 112 : WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
2497 : {
2498 : TocEntry *te;
2499 :
2500 112 : if (pstate && pstate->numWorkers > 1)
2501 16 : {
2502 : /*
2503 : * In parallel mode, this code runs in the leader process. We
2504 : * construct an array of candidate TEs, then sort it into decreasing
2505 : * size order, then dispatch each TE to a data-transfer worker. By
2506 : * dumping larger tables first, we avoid getting into a situation
2507 : * where we're down to one job and it's big, losing parallelism.
2508 : */
2509 : TocEntry **tes;
2510 : int ntes;
2511 :
2512 16 : tes = (TocEntry **) pg_malloc(AH->tocCount * sizeof(TocEntry *));
2513 16 : ntes = 0;
2514 1172 : for (te = AH->toc->next; te != AH->toc; te = te->next)
2515 : {
2516 : /* Consider only TEs with dataDumper functions ... */
2517 1156 : if (!te->dataDumper)
2518 1024 : continue;
2519 : /* ... and ignore ones not enabled for dump */
2520 132 : if ((te->reqs & REQ_DATA) == 0)
2521 0 : continue;
2522 :
2523 132 : tes[ntes++] = te;
2524 : }
2525 :
2526 16 : if (ntes > 1)
2527 14 : qsort(tes, ntes, sizeof(TocEntry *), TocEntrySizeCompareQsort);
2528 :
2529 148 : for (int i = 0; i < ntes; i++)
2530 132 : DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP,
2531 : mark_dump_job_done, NULL);
2532 :
2533 16 : pg_free(tes);
2534 :
2535 : /* Now wait for workers to finish. */
2536 16 : WaitForWorkers(AH, pstate, WFW_ALL_IDLE);
2537 : }
2538 : else
2539 : {
2540 : /* Non-parallel mode: just dump all candidate TEs sequentially. */
2541 12050 : for (te = AH->toc->next; te != AH->toc; te = te->next)
2542 : {
2543 : /* Must have same filter conditions as above */
2544 11954 : if (!te->dataDumper)
2545 11348 : continue;
2546 606 : if ((te->reqs & REQ_DATA) == 0)
2547 26 : continue;
2548 :
2549 580 : WriteDataChunksForTocEntry(AH, te);
2550 : }
2551 : }
2552 112 : }
2553 :
2554 :
2555 : /*
2556 : * Callback function that's invoked in the leader process after a step has
2557 : * been parallel dumped.
2558 : *
2559 : * We don't need to do anything except check for worker failure.
2560 : */
2561 : static void
2562 132 : mark_dump_job_done(ArchiveHandle *AH,
2563 : TocEntry *te,
2564 : int status,
2565 : void *callback_data)
2566 : {
2567 132 : pg_log_info("finished item %d %s %s",
2568 : te->dumpId, te->desc, te->tag);
2569 :
2570 132 : if (status != 0)
2571 0 : pg_fatal("worker process failed: exit code %d",
2572 : status);
2573 132 : }
2574 :
2575 :
2576 : void
2577 712 : WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
2578 : {
2579 : StartDataPtrType startPtr;
2580 : EndDataPtrType endPtr;
2581 :
2582 712 : AH->currToc = te;
2583 :
2584 712 : if (strcmp(te->desc, "BLOBS") == 0)
2585 : {
2586 34 : startPtr = AH->StartLOsPtr;
2587 34 : endPtr = AH->EndLOsPtr;
2588 : }
2589 : else
2590 : {
2591 678 : startPtr = AH->StartDataPtr;
2592 678 : endPtr = AH->EndDataPtr;
2593 : }
2594 :
2595 712 : if (startPtr != NULL)
2596 712 : (*startPtr) (AH, te);
2597 :
2598 : /*
2599 : * The user-provided DataDumper routine needs to call AH->WriteData
2600 : */
2601 712 : te->dataDumper((Archive *) AH, te->dataDumperArg);
2602 :
2603 712 : if (endPtr != NULL)
2604 712 : (*endPtr) (AH, te);
2605 :
2606 712 : AH->currToc = NULL;
2607 712 : }
2608 :
2609 : void
2610 130 : WriteToc(ArchiveHandle *AH)
2611 : {
2612 : TocEntry *te;
2613 : char workbuf[32];
2614 : int tocCount;
2615 : int i;
2616 :
2617 : /* count entries that will actually be dumped */
2618 130 : tocCount = 0;
2619 15740 : for (te = AH->toc->next; te != AH->toc; te = te->next)
2620 : {
2621 15610 : if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS | REQ_SPECIAL)) != 0)
2622 15584 : tocCount++;
2623 : }
2624 :
2625 : /* printf("%d TOC Entries to save\n", tocCount); */
2626 :
2627 130 : WriteInt(AH, tocCount);
2628 :
2629 15740 : for (te = AH->toc->next; te != AH->toc; te = te->next)
2630 : {
2631 15610 : if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS | REQ_SPECIAL)) == 0)
2632 26 : continue;
2633 :
2634 15584 : WriteInt(AH, te->dumpId);
2635 15584 : WriteInt(AH, te->dataDumper ? 1 : 0);
2636 :
2637 : /* OID is recorded as a string for historical reasons */
2638 15584 : sprintf(workbuf, "%u", te->catalogId.tableoid);
2639 15584 : WriteStr(AH, workbuf);
2640 15584 : sprintf(workbuf, "%u", te->catalogId.oid);
2641 15584 : WriteStr(AH, workbuf);
2642 :
2643 15584 : WriteStr(AH, te->tag);
2644 15584 : WriteStr(AH, te->desc);
2645 15584 : WriteInt(AH, te->section);
2646 :
2647 15584 : if (te->defnLen)
2648 : {
2649 : /*
2650 : * defnLen should only be set for custom format's second call to
2651 : * WriteToc(), which rewrites the TOC in place to update data
2652 : * offsets. Instead of calling the defnDumper a second time
2653 : * (which could involve re-executing queries), just skip writing
2654 : * the entry. While regenerating the definition should
2655 : * theoretically produce the same result as before, it's expensive
2656 : * and feels risky.
2657 : *
2658 : * The custom format only calls WriteToc() a second time if
2659 : * fseeko() is usable (see _CloseArchive() in pg_backup_custom.c),
2660 : * so we can safely use it without checking. For other formats,
2661 : * we fail because one of our assumptions must no longer hold
2662 : * true.
2663 : *
2664 : * XXX This is a layering violation, but the alternative is an
2665 : * awkward and complicated callback infrastructure for this
2666 : * special case. This might be worth revisiting in the future.
2667 : */
2668 432 : if (AH->format != archCustom)
2669 0 : pg_fatal("unexpected TOC entry in WriteToc(): %d %s %s",
2670 : te->dumpId, te->desc, te->tag);
2671 :
2672 432 : if (fseeko(AH->FH, te->defnLen, SEEK_CUR) != 0)
2673 0 : pg_fatal("error during file seek: %m");
2674 : }
2675 15152 : else if (te->defnDumper)
2676 : {
2677 3104 : char *defn = te->defnDumper((Archive *) AH, te->defnDumperArg, te);
2678 :
2679 3104 : te->defnLen = WriteStr(AH, defn);
2680 3104 : pg_free(defn);
2681 : }
2682 : else
2683 12048 : WriteStr(AH, te->defn);
2684 :
2685 15584 : WriteStr(AH, te->dropStmt);
2686 15584 : WriteStr(AH, te->copyStmt);
2687 15584 : WriteStr(AH, te->namespace);
2688 15584 : WriteStr(AH, te->tablespace);
2689 15584 : WriteStr(AH, te->tableam);
2690 15584 : WriteInt(AH, te->relkind);
2691 15584 : WriteStr(AH, te->owner);
2692 15584 : WriteStr(AH, "false");
2693 :
2694 : /* Dump list of dependencies */
2695 39942 : for (i = 0; i < te->nDeps; i++)
2696 : {
2697 24358 : sprintf(workbuf, "%d", te->dependencies[i]);
2698 24358 : WriteStr(AH, workbuf);
2699 : }
2700 15584 : WriteStr(AH, NULL); /* Terminate List */
2701 :
2702 15584 : if (AH->WriteExtraTocPtr)
2703 15584 : AH->WriteExtraTocPtr(AH, te);
2704 : }
2705 130 : }
2706 :
2707 : void
2708 116 : ReadToc(ArchiveHandle *AH)
2709 : {
2710 : int i;
2711 : char *tmp;
2712 : DumpId *deps;
2713 : int depIdx;
2714 : int depSize;
2715 : TocEntry *te;
2716 : bool is_supported;
2717 :
2718 116 : AH->tocCount = ReadInt(AH);
2719 116 : AH->maxDumpId = 0;
2720 :
2721 14786 : for (i = 0; i < AH->tocCount; i++)
2722 : {
2723 14670 : te = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2724 14670 : te->dumpId = ReadInt(AH);
2725 :
2726 14670 : if (te->dumpId > AH->maxDumpId)
2727 5926 : AH->maxDumpId = te->dumpId;
2728 :
2729 : /* Sanity check */
2730 14670 : if (te->dumpId <= 0)
2731 0 : pg_fatal("entry ID %d out of range -- perhaps a corrupt TOC",
2732 : te->dumpId);
2733 :
2734 14670 : te->hadDumper = ReadInt(AH);
2735 :
2736 14670 : if (AH->version >= K_VERS_1_8)
2737 : {
2738 14670 : tmp = ReadStr(AH);
2739 14670 : sscanf(tmp, "%u", &te->catalogId.tableoid);
2740 14670 : free(tmp);
2741 : }
2742 : else
2743 0 : te->catalogId.tableoid = InvalidOid;
2744 14670 : tmp = ReadStr(AH);
2745 14670 : sscanf(tmp, "%u", &te->catalogId.oid);
2746 14670 : free(tmp);
2747 :
2748 14670 : te->tag = ReadStr(AH);
2749 14670 : te->desc = ReadStr(AH);
2750 :
2751 14670 : if (AH->version >= K_VERS_1_11)
2752 : {
2753 14670 : te->section = ReadInt(AH);
2754 : }
2755 : else
2756 : {
2757 : /*
2758 : * Rules for pre-8.4 archives wherein pg_dump hasn't classified
2759 : * the entries into sections. This list need not cover entry
2760 : * types added later than 8.4.
2761 : */
2762 0 : if (strcmp(te->desc, "COMMENT") == 0 ||
2763 0 : strcmp(te->desc, "ACL") == 0 ||
2764 0 : strcmp(te->desc, "ACL LANGUAGE") == 0)
2765 0 : te->section = SECTION_NONE;
2766 0 : else if (strcmp(te->desc, "TABLE DATA") == 0 ||
2767 0 : strcmp(te->desc, "BLOBS") == 0 ||
2768 0 : strcmp(te->desc, "BLOB COMMENTS") == 0)
2769 0 : te->section = SECTION_DATA;
2770 0 : else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
2771 0 : strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
2772 0 : strcmp(te->desc, "FK CONSTRAINT") == 0 ||
2773 0 : strcmp(te->desc, "INDEX") == 0 ||
2774 0 : strcmp(te->desc, "RULE") == 0 ||
2775 0 : strcmp(te->desc, "TRIGGER") == 0)
2776 0 : te->section = SECTION_POST_DATA;
2777 : else
2778 0 : te->section = SECTION_PRE_DATA;
2779 : }
2780 :
2781 14670 : te->defn = ReadStr(AH);
2782 14670 : te->dropStmt = ReadStr(AH);
2783 :
2784 14670 : if (AH->version >= K_VERS_1_3)
2785 14670 : te->copyStmt = ReadStr(AH);
2786 :
2787 14670 : if (AH->version >= K_VERS_1_6)
2788 14670 : te->namespace = ReadStr(AH);
2789 :
2790 14670 : if (AH->version >= K_VERS_1_10)
2791 14670 : te->tablespace = ReadStr(AH);
2792 :
2793 14670 : if (AH->version >= K_VERS_1_14)
2794 14670 : te->tableam = ReadStr(AH);
2795 :
2796 14670 : if (AH->version >= K_VERS_1_16)
2797 14670 : te->relkind = ReadInt(AH);
2798 :
2799 14670 : te->owner = ReadStr(AH);
2800 14670 : is_supported = true;
2801 14670 : if (AH->version < K_VERS_1_9)
2802 0 : is_supported = false;
2803 : else
2804 : {
2805 14670 : tmp = ReadStr(AH);
2806 :
2807 14670 : if (strcmp(tmp, "true") == 0)
2808 0 : is_supported = false;
2809 :
2810 14670 : free(tmp);
2811 : }
2812 :
2813 14670 : if (!is_supported)
2814 0 : pg_log_warning("restoring tables WITH OIDS is not supported anymore");
2815 :
2816 : /* Read TOC entry dependencies */
2817 14670 : if (AH->version >= K_VERS_1_5)
2818 : {
2819 14670 : depSize = 100;
2820 14670 : deps = (DumpId *) pg_malloc(sizeof(DumpId) * depSize);
2821 14670 : depIdx = 0;
2822 : for (;;)
2823 : {
2824 37626 : tmp = ReadStr(AH);
2825 37626 : if (!tmp)
2826 14670 : break; /* end of list */
2827 22956 : if (depIdx >= depSize)
2828 : {
2829 0 : depSize *= 2;
2830 0 : deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depSize);
2831 : }
2832 22956 : sscanf(tmp, "%d", &deps[depIdx]);
2833 22956 : free(tmp);
2834 22956 : depIdx++;
2835 : }
2836 :
2837 14670 : if (depIdx > 0) /* We have a non-null entry */
2838 : {
2839 11952 : deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depIdx);
2840 11952 : te->dependencies = deps;
2841 11952 : te->nDeps = depIdx;
2842 : }
2843 : else
2844 : {
2845 2718 : free(deps);
2846 2718 : te->dependencies = NULL;
2847 2718 : te->nDeps = 0;
2848 : }
2849 : }
2850 : else
2851 : {
2852 0 : te->dependencies = NULL;
2853 0 : te->nDeps = 0;
2854 : }
2855 14670 : te->dataLength = 0;
2856 :
2857 14670 : if (AH->ReadExtraTocPtr)
2858 14670 : AH->ReadExtraTocPtr(AH, te);
2859 :
2860 14670 : pg_log_debug("read TOC entry %d (ID %d) for %s %s",
2861 : i, te->dumpId, te->desc, te->tag);
2862 :
2863 : /* link completed entry into TOC circular list */
2864 14670 : te->prev = AH->toc->prev;
2865 14670 : AH->toc->prev->next = te;
2866 14670 : AH->toc->prev = te;
2867 14670 : te->next = AH->toc;
2868 :
2869 : /* special processing immediately upon read for some items */
2870 14670 : if (strcmp(te->desc, "ENCODING") == 0)
2871 116 : processEncodingEntry(AH, te);
2872 14554 : else if (strcmp(te->desc, "STDSTRINGS") == 0)
2873 116 : processStdStringsEntry(AH, te);
2874 14438 : else if (strcmp(te->desc, "SEARCHPATH") == 0)
2875 116 : processSearchPathEntry(AH, te);
2876 : }
2877 116 : }
2878 :
2879 : static void
2880 116 : processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
2881 : {
2882 : /* te->defn should have the form SET client_encoding = 'foo'; */
2883 116 : char *defn = pg_strdup(te->defn);
2884 : char *ptr1;
2885 116 : char *ptr2 = NULL;
2886 : int encoding;
2887 :
2888 116 : ptr1 = strchr(defn, '\'');
2889 116 : if (ptr1)
2890 116 : ptr2 = strchr(++ptr1, '\'');
2891 116 : if (ptr2)
2892 : {
2893 116 : *ptr2 = '\0';
2894 116 : encoding = pg_char_to_encoding(ptr1);
2895 116 : if (encoding < 0)
2896 0 : pg_fatal("unrecognized encoding \"%s\"",
2897 : ptr1);
2898 116 : AH->public.encoding = encoding;
2899 116 : setFmtEncoding(encoding);
2900 : }
2901 : else
2902 0 : pg_fatal("invalid ENCODING item: %s",
2903 : te->defn);
2904 :
2905 116 : free(defn);
2906 116 : }
2907 :
2908 : static void
2909 116 : processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
2910 : {
2911 : /* te->defn should have the form SET standard_conforming_strings = 'x'; */
2912 : char *ptr1;
2913 :
2914 116 : ptr1 = strchr(te->defn, '\'');
2915 116 : if (ptr1 && strncmp(ptr1, "'on'", 4) == 0)
2916 116 : AH->public.std_strings = true;
2917 0 : else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0)
2918 0 : AH->public.std_strings = false;
2919 : else
2920 0 : pg_fatal("invalid STDSTRINGS item: %s",
2921 : te->defn);
2922 116 : }
2923 :
2924 : static void
2925 116 : processSearchPathEntry(ArchiveHandle *AH, TocEntry *te)
2926 : {
2927 : /*
2928 : * te->defn should contain a command to set search_path. We just copy it
2929 : * verbatim for use later.
2930 : */
2931 116 : AH->public.searchpath = pg_strdup(te->defn);
2932 116 : }
2933 :
2934 : static void
2935 0 : StrictNamesCheck(RestoreOptions *ropt)
2936 : {
2937 : const char *missing_name;
2938 :
2939 : Assert(ropt->strict_names);
2940 :
2941 0 : if (ropt->schemaNames.head != NULL)
2942 : {
2943 0 : missing_name = simple_string_list_not_touched(&ropt->schemaNames);
2944 0 : if (missing_name != NULL)
2945 0 : pg_fatal("schema \"%s\" not found", missing_name);
2946 : }
2947 :
2948 0 : if (ropt->tableNames.head != NULL)
2949 : {
2950 0 : missing_name = simple_string_list_not_touched(&ropt->tableNames);
2951 0 : if (missing_name != NULL)
2952 0 : pg_fatal("table \"%s\" not found", missing_name);
2953 : }
2954 :
2955 0 : if (ropt->indexNames.head != NULL)
2956 : {
2957 0 : missing_name = simple_string_list_not_touched(&ropt->indexNames);
2958 0 : if (missing_name != NULL)
2959 0 : pg_fatal("index \"%s\" not found", missing_name);
2960 : }
2961 :
2962 0 : if (ropt->functionNames.head != NULL)
2963 : {
2964 0 : missing_name = simple_string_list_not_touched(&ropt->functionNames);
2965 0 : if (missing_name != NULL)
2966 0 : pg_fatal("function \"%s\" not found", missing_name);
2967 : }
2968 :
2969 0 : if (ropt->triggerNames.head != NULL)
2970 : {
2971 0 : missing_name = simple_string_list_not_touched(&ropt->triggerNames);
2972 0 : if (missing_name != NULL)
2973 0 : pg_fatal("trigger \"%s\" not found", missing_name);
2974 : }
2975 0 : }
2976 :
2977 : /*
2978 : * Determine whether we want to restore this TOC entry.
2979 : *
2980 : * Returns 0 if entry should be skipped, or some combination of the
2981 : * REQ_SCHEMA, REQ_DATA, and REQ_STATS bits if we want to restore schema, data
2982 : * and/or statistics portions of this TOC entry, or REQ_SPECIAL if it's a
2983 : * special entry.
2984 : */
2985 : static int
2986 96960 : _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH)
2987 : {
2988 96960 : int res = REQ_SCHEMA | REQ_DATA;
2989 96960 : RestoreOptions *ropt = AH->public.ropt;
2990 :
2991 : /*
2992 : * For binary upgrade mode, dump pg_largeobject_metadata and the
2993 : * associated pg_shdepend rows. This is faster to restore than the
2994 : * equivalent set of large object commands. We can only do this for
2995 : * upgrades from v12 and newer; in older versions, pg_largeobject_metadata
2996 : * was created WITH OIDS, so the OID column is hidden and won't be dumped.
2997 : */
2998 96960 : if (ropt->binary_upgrade && AH->public.remoteVersion >= 120000 &&
2999 7956 : strcmp(te->desc, "TABLE DATA") == 0 &&
3000 146 : (te->catalogId.oid == LargeObjectMetadataRelationId ||
3001 74 : te->catalogId.oid == SharedDependRelationId))
3002 144 : return REQ_DATA;
3003 :
3004 : /* These items are treated specially */
3005 96816 : if (strcmp(te->desc, "ENCODING") == 0 ||
3006 96328 : strcmp(te->desc, "STDSTRINGS") == 0 ||
3007 95840 : strcmp(te->desc, "SEARCHPATH") == 0)
3008 1464 : return REQ_SPECIAL;
3009 :
3010 95352 : if (strcmp(te->desc, "STATISTICS DATA") == 0)
3011 : {
3012 9952 : if (!ropt->dumpStatistics)
3013 0 : return 0;
3014 :
3015 9952 : res = REQ_STATS;
3016 : }
3017 :
3018 : /*
3019 : * DATABASE and DATABASE PROPERTIES also have a special rule: they are
3020 : * restored in createDB mode, and not restored otherwise, independently of
3021 : * all else.
3022 : */
3023 95352 : if (strcmp(te->desc, "DATABASE") == 0 ||
3024 95066 : strcmp(te->desc, "DATABASE PROPERTIES") == 0)
3025 : {
3026 422 : if (ropt->createDB)
3027 362 : return REQ_SCHEMA;
3028 : else
3029 60 : return 0;
3030 : }
3031 :
3032 : /*
3033 : * Process exclusions that affect certain classes of TOC entries.
3034 : */
3035 :
3036 : /* If it's an ACL, maybe ignore it */
3037 94930 : if (ropt->aclsSkip && _tocEntryIsACL(te))
3038 0 : return 0;
3039 :
3040 : /* If it's a comment, maybe ignore it */
3041 94930 : if (ropt->no_comments && strcmp(te->desc, "COMMENT") == 0)
3042 0 : return 0;
3043 :
3044 : /* If it's a policy, maybe ignore it */
3045 94930 : if (ropt->no_policies &&
3046 1478 : (strcmp(te->desc, "POLICY") == 0 ||
3047 1466 : strcmp(te->desc, "ROW SECURITY") == 0))
3048 14 : return 0;
3049 :
3050 : /*
3051 : * If it's a comment on a policy, a publication, or a subscription, maybe
3052 : * ignore it.
3053 : */
3054 94916 : if (strcmp(te->desc, "COMMENT") == 0)
3055 : {
3056 13768 : if (ropt->no_policies &&
3057 96 : strncmp(te->tag, "POLICY", strlen("POLICY")) == 0)
3058 2 : return 0;
3059 :
3060 13766 : if (ropt->no_publications &&
3061 0 : strncmp(te->tag, "PUBLICATION", strlen("PUBLICATION")) == 0)
3062 0 : return 0;
3063 :
3064 13766 : if (ropt->no_subscriptions &&
3065 96 : strncmp(te->tag, "SUBSCRIPTION", strlen("SUBSCRIPTION")) == 0)
3066 2 : return 0;
3067 : }
3068 :
3069 : /*
3070 : * If it's a publication or a table part of a publication, maybe ignore
3071 : * it.
3072 : */
3073 94912 : if (ropt->no_publications &&
3074 0 : (strcmp(te->desc, "PUBLICATION") == 0 ||
3075 0 : strcmp(te->desc, "PUBLICATION TABLE") == 0 ||
3076 0 : strcmp(te->desc, "PUBLICATION TABLES IN SCHEMA") == 0))
3077 0 : return 0;
3078 :
3079 : /* If it's a security label, maybe ignore it */
3080 94912 : if (ropt->no_security_labels && strcmp(te->desc, "SECURITY LABEL") == 0)
3081 0 : return 0;
3082 :
3083 : /*
3084 : * If it's a security label on a publication or a subscription, maybe
3085 : * ignore it.
3086 : */
3087 94912 : if (strcmp(te->desc, "SECURITY LABEL") == 0)
3088 : {
3089 18 : if (ropt->no_publications &&
3090 0 : strncmp(te->tag, "PUBLICATION", strlen("PUBLICATION")) == 0)
3091 0 : return 0;
3092 :
3093 18 : if (ropt->no_subscriptions &&
3094 0 : strncmp(te->tag, "SUBSCRIPTION", strlen("SUBSCRIPTION")) == 0)
3095 0 : return 0;
3096 : }
3097 :
3098 : /* If it's a subscription, maybe ignore it */
3099 94912 : if (ropt->no_subscriptions && strcmp(te->desc, "SUBSCRIPTION") == 0)
3100 6 : return 0;
3101 :
3102 : /* Ignore it if section is not to be dumped/restored */
3103 94906 : switch (curSection)
3104 : {
3105 60214 : case SECTION_PRE_DATA:
3106 60214 : if (!(ropt->dumpSections & DUMP_PRE_DATA))
3107 748 : return 0;
3108 59466 : break;
3109 17258 : case SECTION_DATA:
3110 17258 : if (!(ropt->dumpSections & DUMP_DATA))
3111 360 : return 0;
3112 16898 : break;
3113 17434 : case SECTION_POST_DATA:
3114 17434 : if (!(ropt->dumpSections & DUMP_POST_DATA))
3115 448 : return 0;
3116 16986 : break;
3117 0 : default:
3118 : /* shouldn't get here, really, but ignore it */
3119 0 : return 0;
3120 : }
3121 :
3122 : /* Ignore it if rejected by idWanted[] (cf. SortTocFromFile) */
3123 93350 : if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1])
3124 0 : return 0;
3125 :
3126 : /*
3127 : * Check options for selective dump/restore.
3128 : */
3129 93350 : if (strcmp(te->desc, "ACL") == 0 ||
3130 88992 : strcmp(te->desc, "COMMENT") == 0 ||
3131 75328 : strcmp(te->desc, "STATISTICS DATA") == 0 ||
3132 65652 : strcmp(te->desc, "SECURITY LABEL") == 0)
3133 : {
3134 : /* Database properties react to createDB, not selectivity options. */
3135 27716 : if (strncmp(te->tag, "DATABASE ", 9) == 0)
3136 : {
3137 194 : if (!ropt->createDB)
3138 42 : return 0;
3139 : }
3140 27522 : else if (ropt->schemaNames.head != NULL ||
3141 27522 : ropt->schemaExcludeNames.head != NULL ||
3142 27522 : ropt->selTypes)
3143 : {
3144 : /*
3145 : * In a selective dump/restore, we want to restore these dependent
3146 : * TOC entry types only if their parent object is being restored.
3147 : * Without selectivity options, we let through everything in the
3148 : * archive. Note there may be such entries with no parent, eg
3149 : * non-default ACLs for built-in objects. Also, we make
3150 : * per-column ACLs additionally depend on the table's ACL if any
3151 : * to ensure correct restore order, so those dependencies should
3152 : * be ignored in this check.
3153 : *
3154 : * This code depends on the parent having been marked already,
3155 : * which should be the case; if it isn't, perhaps due to
3156 : * SortTocFromFile rearrangement, skipping the dependent entry
3157 : * seems prudent anyway.
3158 : *
3159 : * Ideally we'd handle, eg, table CHECK constraints this way too.
3160 : * But it's hard to tell which of their dependencies is the one to
3161 : * consult.
3162 : */
3163 0 : bool dumpthis = false;
3164 :
3165 0 : for (int i = 0; i < te->nDeps; i++)
3166 : {
3167 0 : TocEntry *pte = getTocEntryByDumpId(AH, te->dependencies[i]);
3168 :
3169 0 : if (!pte)
3170 0 : continue; /* probably shouldn't happen */
3171 0 : if (strcmp(pte->desc, "ACL") == 0)
3172 0 : continue; /* ignore dependency on another ACL */
3173 0 : if (pte->reqs == 0)
3174 0 : continue; /* this object isn't marked, so ignore it */
3175 : /* Found a parent to be dumped, so we want to dump this too */
3176 0 : dumpthis = true;
3177 0 : break;
3178 : }
3179 0 : if (!dumpthis)
3180 0 : return 0;
3181 : }
3182 : }
3183 : else
3184 : {
3185 : /* Apply selective-restore rules for standalone TOC entries. */
3186 65634 : if (ropt->schemaNames.head != NULL)
3187 : {
3188 : /* If no namespace is specified, it means all. */
3189 40 : if (!te->namespace)
3190 4 : return 0;
3191 36 : if (!simple_string_list_member(&ropt->schemaNames, te->namespace))
3192 28 : return 0;
3193 : }
3194 :
3195 65602 : if (ropt->schemaExcludeNames.head != NULL &&
3196 76 : te->namespace &&
3197 36 : simple_string_list_member(&ropt->schemaExcludeNames, te->namespace))
3198 8 : return 0;
3199 :
3200 65594 : if (ropt->selTypes)
3201 : {
3202 156 : if (strcmp(te->desc, "TABLE") == 0 ||
3203 116 : strcmp(te->desc, "TABLE DATA") == 0 ||
3204 76 : strcmp(te->desc, "VIEW") == 0 ||
3205 76 : strcmp(te->desc, "FOREIGN TABLE") == 0 ||
3206 76 : strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
3207 76 : strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0 ||
3208 76 : strcmp(te->desc, "SEQUENCE") == 0 ||
3209 70 : strcmp(te->desc, "SEQUENCE SET") == 0)
3210 : {
3211 92 : if (!ropt->selTable)
3212 60 : return 0;
3213 32 : if (ropt->tableNames.head != NULL &&
3214 32 : !simple_string_list_member(&ropt->tableNames, te->tag))
3215 28 : return 0;
3216 : }
3217 64 : else if (strcmp(te->desc, "INDEX") == 0)
3218 : {
3219 12 : if (!ropt->selIndex)
3220 8 : return 0;
3221 4 : if (ropt->indexNames.head != NULL &&
3222 4 : !simple_string_list_member(&ropt->indexNames, te->tag))
3223 2 : return 0;
3224 : }
3225 52 : else if (strcmp(te->desc, "FUNCTION") == 0 ||
3226 28 : strcmp(te->desc, "AGGREGATE") == 0 ||
3227 28 : strcmp(te->desc, "PROCEDURE") == 0)
3228 : {
3229 24 : if (!ropt->selFunction)
3230 8 : return 0;
3231 16 : if (ropt->functionNames.head != NULL &&
3232 16 : !simple_string_list_member(&ropt->functionNames, te->tag))
3233 12 : return 0;
3234 : }
3235 28 : else if (strcmp(te->desc, "TRIGGER") == 0)
3236 : {
3237 12 : if (!ropt->selTrigger)
3238 8 : return 0;
3239 4 : if (ropt->triggerNames.head != NULL &&
3240 4 : !simple_string_list_member(&ropt->triggerNames, te->tag))
3241 2 : return 0;
3242 : }
3243 : else
3244 16 : return 0;
3245 : }
3246 : }
3247 :
3248 :
3249 : /*
3250 : * Determine whether the TOC entry contains schema and/or data components,
3251 : * and mask off inapplicable REQ bits. If it had a dataDumper, assume
3252 : * it's both schema and data. Otherwise it's probably schema-only, but
3253 : * there are exceptions.
3254 : */
3255 93124 : if (!te->hadDumper)
3256 : {
3257 : /*
3258 : * Special Case: If 'SEQUENCE SET' or anything to do with LOs, then it
3259 : * is considered a data entry. We don't need to check for BLOBS or
3260 : * old-style BLOB COMMENTS entries, because they will have hadDumper =
3261 : * true ... but we do need to check new-style BLOB ACLs, comments,
3262 : * etc.
3263 : */
3264 83982 : if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
3265 83036 : strcmp(te->desc, "BLOB") == 0 ||
3266 83036 : strcmp(te->desc, "BLOB METADATA") == 0 ||
3267 82846 : (strcmp(te->desc, "ACL") == 0 &&
3268 4358 : strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3269 82770 : (strcmp(te->desc, "COMMENT") == 0 &&
3270 13622 : strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3271 82646 : (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3272 18 : strncmp(te->tag, "LARGE OBJECT", 12) == 0))
3273 1354 : res = res & REQ_DATA;
3274 : else
3275 82628 : res = res & ~REQ_DATA;
3276 : }
3277 :
3278 : /*
3279 : * If there's no definition command, there's no schema component. Treat
3280 : * "load via partition root" comments as not schema.
3281 : */
3282 93124 : if (!te->defn || !te->defn[0] ||
3283 77864 : strncmp(te->defn, "-- load via partition root ", 27) == 0)
3284 15440 : res = res & ~REQ_SCHEMA;
3285 :
3286 : /*
3287 : * Special case: <Init> type with <Max OID> tag; this is obsolete and we
3288 : * always ignore it.
3289 : */
3290 93124 : if ((strcmp(te->desc, "<Init>") == 0) && (strcmp(te->tag, "Max OID") == 0))
3291 0 : return 0;
3292 :
3293 : /* Mask it if we don't want data */
3294 93124 : if (!ropt->dumpData)
3295 : {
3296 : /*
3297 : * The sequence_data option overrides dumpData for SEQUENCE SET.
3298 : *
3299 : * In binary-upgrade mode, even with dumpData unset, we do not mask
3300 : * out large objects. (Only large object definitions, comments and
3301 : * other metadata should be generated in binary-upgrade mode, not the
3302 : * actual data, but that need not concern us here.)
3303 : */
3304 8264 : if (!(ropt->sequence_data && strcmp(te->desc, "SEQUENCE SET") == 0) &&
3305 8134 : !(ropt->binary_upgrade &&
3306 7326 : (strcmp(te->desc, "BLOB") == 0 ||
3307 7326 : strcmp(te->desc, "BLOB METADATA") == 0 ||
3308 7326 : (strcmp(te->desc, "ACL") == 0 &&
3309 176 : strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3310 7326 : (strcmp(te->desc, "COMMENT") == 0 &&
3311 212 : strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3312 7310 : (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3313 10 : strncmp(te->tag, "LARGE OBJECT", 12) == 0))))
3314 8108 : res = res & (REQ_SCHEMA | REQ_STATS);
3315 : }
3316 :
3317 : /* Mask it if we don't want schema */
3318 93124 : if (!ropt->dumpSchema)
3319 844 : res = res & (REQ_DATA | REQ_STATS);
3320 :
3321 93124 : return res;
3322 : }
3323 :
3324 : /*
3325 : * Identify which pass we should restore this TOC entry in.
3326 : *
3327 : * See notes with the RestorePass typedef in pg_backup_archiver.h.
3328 : */
3329 : static RestorePass
3330 208828 : _tocEntryRestorePass(TocEntry *te)
3331 : {
3332 : /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3333 208828 : if (strcmp(te->desc, "ACL") == 0 ||
3334 198896 : strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3335 198896 : strcmp(te->desc, "DEFAULT ACL") == 0)
3336 10700 : return RESTORE_PASS_ACL;
3337 198128 : if (strcmp(te->desc, "EVENT TRIGGER") == 0 ||
3338 197920 : strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0)
3339 1628 : return RESTORE_PASS_POST_ACL;
3340 :
3341 : /*
3342 : * Comments and security labels need to be emitted in the same pass as
3343 : * their parent objects. ACLs haven't got comments and security labels,
3344 : * and neither do matview data objects, but event triggers do.
3345 : * (Fortunately, event triggers haven't got ACLs, or we'd need yet another
3346 : * weird special case.)
3347 : */
3348 196500 : if ((strcmp(te->desc, "COMMENT") == 0 ||
3349 168954 : strcmp(te->desc, "SECURITY LABEL") == 0) &&
3350 27554 : strncmp(te->tag, "EVENT TRIGGER ", 14) == 0)
3351 0 : return RESTORE_PASS_POST_ACL;
3352 :
3353 : /*
3354 : * If statistics data is dependent on materialized view data, it must be
3355 : * deferred to RESTORE_PASS_POST_ACL. Those entries are already marked as
3356 : * SECTION_POST_DATA, and some other stats entries (e.g., index stats)
3357 : * will also be marked as SECTION_POST_DATA. Additionally, our lookahead
3358 : * code in fetchAttributeStats() assumes that we dump all statistics data
3359 : * entries in TOC order. To ensure this assumption holds, we move all
3360 : * statistics data entries in SECTION_POST_DATA to RESTORE_PASS_POST_ACL.
3361 : */
3362 196500 : if (strcmp(te->desc, "STATISTICS DATA") == 0 &&
3363 18658 : te->section == SECTION_POST_DATA)
3364 6746 : return RESTORE_PASS_POST_ACL;
3365 :
3366 : /* All else can be handled in the main pass. */
3367 189754 : return RESTORE_PASS_MAIN;
3368 : }
3369 :
3370 : /*
3371 : * Identify TOC entries that are ACLs.
3372 : *
3373 : * Note: it seems worth duplicating some code here to avoid a hard-wired
3374 : * assumption that these are exactly the same entries that we restore during
3375 : * the RESTORE_PASS_ACL phase.
3376 : */
3377 : static bool
3378 80700 : _tocEntryIsACL(TocEntry *te)
3379 : {
3380 : /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3381 80700 : if (strcmp(te->desc, "ACL") == 0 ||
3382 77250 : strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3383 77250 : strcmp(te->desc, "DEFAULT ACL") == 0)
3384 3706 : return true;
3385 76994 : return false;
3386 : }
3387 :
3388 : /*
3389 : * Issue SET commands for parameters that we want to have set the same way
3390 : * at all times during execution of a restore script.
3391 : */
3392 : static void
3393 584 : _doSetFixedOutputState(ArchiveHandle *AH)
3394 : {
3395 584 : RestoreOptions *ropt = AH->public.ropt;
3396 :
3397 : /*
3398 : * Disable timeouts to allow for slow commands, idle parallel workers, etc
3399 : */
3400 584 : ahprintf(AH, "SET statement_timeout = 0;\n");
3401 584 : ahprintf(AH, "SET lock_timeout = 0;\n");
3402 584 : ahprintf(AH, "SET idle_in_transaction_session_timeout = 0;\n");
3403 584 : ahprintf(AH, "SET transaction_timeout = 0;\n");
3404 :
3405 : /* Select the correct character set encoding */
3406 584 : ahprintf(AH, "SET client_encoding = '%s';\n",
3407 : pg_encoding_to_char(AH->public.encoding));
3408 :
3409 : /* Select the correct string literal syntax */
3410 584 : ahprintf(AH, "SET standard_conforming_strings = %s;\n",
3411 584 : AH->public.std_strings ? "on" : "off");
3412 :
3413 : /* Select the role to be used during restore */
3414 584 : if (ropt && ropt->use_role)
3415 0 : ahprintf(AH, "SET ROLE %s;\n", fmtId(ropt->use_role));
3416 :
3417 : /* Select the dump-time search_path */
3418 584 : if (AH->public.searchpath)
3419 584 : ahprintf(AH, "%s", AH->public.searchpath);
3420 :
3421 : /* Make sure function checking is disabled */
3422 584 : ahprintf(AH, "SET check_function_bodies = false;\n");
3423 :
3424 : /* Ensure that all valid XML data will be accepted */
3425 584 : ahprintf(AH, "SET xmloption = content;\n");
3426 :
3427 : /* Avoid annoying notices etc */
3428 584 : ahprintf(AH, "SET client_min_messages = warning;\n");
3429 584 : if (!AH->public.std_strings)
3430 0 : ahprintf(AH, "SET escape_string_warning = off;\n");
3431 :
3432 : /* Adjust row-security state */
3433 584 : if (ropt && ropt->enable_row_security)
3434 0 : ahprintf(AH, "SET row_security = on;\n");
3435 : else
3436 584 : ahprintf(AH, "SET row_security = off;\n");
3437 :
3438 : /*
3439 : * In --transaction-size mode, we should always be in a transaction when
3440 : * we begin to restore objects.
3441 : */
3442 584 : if (ropt && ropt->txn_size > 0)
3443 : {
3444 168 : if (AH->connection)
3445 168 : StartTransaction(&AH->public);
3446 : else
3447 0 : ahprintf(AH, "\nBEGIN;\n");
3448 168 : AH->txnCount = 0;
3449 : }
3450 :
3451 584 : ahprintf(AH, "\n");
3452 584 : }
3453 :
3454 : /*
3455 : * Issue a SET SESSION AUTHORIZATION command. Caller is responsible
3456 : * for updating state if appropriate. If user is NULL or an empty string,
3457 : * the specification DEFAULT will be used.
3458 : */
3459 : static void
3460 2 : _doSetSessionAuth(ArchiveHandle *AH, const char *user)
3461 : {
3462 2 : PQExpBuffer cmd = createPQExpBuffer();
3463 :
3464 2 : appendPQExpBufferStr(cmd, "SET SESSION AUTHORIZATION ");
3465 :
3466 : /*
3467 : * SQL requires a string literal here. Might as well be correct.
3468 : */
3469 2 : if (user && *user)
3470 2 : appendStringLiteralAHX(cmd, user, AH);
3471 : else
3472 0 : appendPQExpBufferStr(cmd, "DEFAULT");
3473 2 : appendPQExpBufferChar(cmd, ';');
3474 :
3475 2 : if (RestoringToDB(AH))
3476 : {
3477 : PGresult *res;
3478 :
3479 0 : res = PQexec(AH->connection, cmd->data);
3480 :
3481 0 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3482 : /* NOT warn_or_exit_horribly... use -O instead to skip this. */
3483 0 : pg_fatal("could not set session user to \"%s\": %s",
3484 : user, PQerrorMessage(AH->connection));
3485 :
3486 0 : PQclear(res);
3487 : }
3488 : else
3489 2 : ahprintf(AH, "%s\n\n", cmd->data);
3490 :
3491 2 : destroyPQExpBuffer(cmd);
3492 2 : }
3493 :
3494 :
3495 : /*
3496 : * Issue the commands to connect to the specified database.
3497 : *
3498 : * If we're currently restoring right into a database, this will
3499 : * actually establish a connection. Otherwise it puts a \connect into
3500 : * the script output.
3501 : */
3502 : static void
3503 184 : _reconnectToDB(ArchiveHandle *AH, const char *dbname)
3504 : {
3505 184 : if (RestoringToDB(AH))
3506 114 : ReconnectToServer(AH, dbname);
3507 : else
3508 : {
3509 : PQExpBufferData connectbuf;
3510 70 : RestoreOptions *ropt = AH->public.ropt;
3511 :
3512 : /*
3513 : * We must temporarily exit restricted mode for \connect, etc.
3514 : * Anything added between this line and the following \restrict must
3515 : * be careful to avoid any possible meta-command injection vectors.
3516 : */
3517 70 : ahprintf(AH, "\\unrestrict %s\n", ropt->restrict_key);
3518 :
3519 70 : initPQExpBuffer(&connectbuf);
3520 70 : appendPsqlMetaConnect(&connectbuf, dbname);
3521 70 : ahprintf(AH, "%s", connectbuf.data);
3522 70 : termPQExpBuffer(&connectbuf);
3523 :
3524 70 : ahprintf(AH, "\\restrict %s\n\n", ropt->restrict_key);
3525 : }
3526 :
3527 : /*
3528 : * NOTE: currUser keeps track of what the imaginary session user in our
3529 : * script is. It's now effectively reset to the original userID.
3530 : */
3531 184 : free(AH->currUser);
3532 184 : AH->currUser = NULL;
3533 :
3534 : /* don't assume we still know the output schema, tablespace, etc either */
3535 184 : free(AH->currSchema);
3536 184 : AH->currSchema = NULL;
3537 :
3538 184 : free(AH->currTableAm);
3539 184 : AH->currTableAm = NULL;
3540 :
3541 184 : free(AH->currTablespace);
3542 184 : AH->currTablespace = NULL;
3543 :
3544 : /* re-establish fixed state */
3545 184 : _doSetFixedOutputState(AH);
3546 184 : }
3547 :
3548 : /*
3549 : * Become the specified user, and update state to avoid redundant commands
3550 : *
3551 : * NULL or empty argument is taken to mean restoring the session default
3552 : */
3553 : static void
3554 144 : _becomeUser(ArchiveHandle *AH, const char *user)
3555 : {
3556 144 : if (!user)
3557 0 : user = ""; /* avoid null pointers */
3558 :
3559 144 : if (AH->currUser && strcmp(AH->currUser, user) == 0)
3560 142 : return; /* no need to do anything */
3561 :
3562 2 : _doSetSessionAuth(AH, user);
3563 :
3564 : /*
3565 : * NOTE: currUser keeps track of what the imaginary session user in our
3566 : * script is
3567 : */
3568 2 : free(AH->currUser);
3569 2 : AH->currUser = pg_strdup(user);
3570 : }
3571 :
3572 : /*
3573 : * Become the owner of the given TOC entry object. If
3574 : * changes in ownership are not allowed, this doesn't do anything.
3575 : */
3576 : static void
3577 89034 : _becomeOwner(ArchiveHandle *AH, TocEntry *te)
3578 : {
3579 89034 : RestoreOptions *ropt = AH->public.ropt;
3580 :
3581 89034 : if (ropt && (ropt->noOwner || !ropt->use_setsessauth))
3582 89034 : return;
3583 :
3584 0 : _becomeUser(AH, te->owner);
3585 : }
3586 :
3587 :
3588 : /*
3589 : * Issue the commands to select the specified schema as the current schema
3590 : * in the target database.
3591 : */
3592 : static void
3593 89182 : _selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
3594 : {
3595 : PQExpBuffer qry;
3596 :
3597 : /*
3598 : * If there was a SEARCHPATH TOC entry, we're supposed to just stay with
3599 : * that search_path rather than switching to entry-specific paths.
3600 : * Otherwise, it's an old archive that will not restore correctly unless
3601 : * we set the search_path as it's expecting.
3602 : */
3603 89182 : if (AH->public.searchpath)
3604 89182 : return;
3605 :
3606 0 : if (!schemaName || *schemaName == '\0' ||
3607 0 : (AH->currSchema && strcmp(AH->currSchema, schemaName) == 0))
3608 0 : return; /* no need to do anything */
3609 :
3610 0 : qry = createPQExpBuffer();
3611 :
3612 0 : appendPQExpBuffer(qry, "SET search_path = %s",
3613 : fmtId(schemaName));
3614 0 : if (strcmp(schemaName, "pg_catalog") != 0)
3615 0 : appendPQExpBufferStr(qry, ", pg_catalog");
3616 :
3617 0 : if (RestoringToDB(AH))
3618 : {
3619 : PGresult *res;
3620 :
3621 0 : res = PQexec(AH->connection, qry->data);
3622 :
3623 0 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3624 0 : warn_or_exit_horribly(AH,
3625 : "could not set \"search_path\" to \"%s\": %s",
3626 0 : schemaName, PQerrorMessage(AH->connection));
3627 :
3628 0 : PQclear(res);
3629 : }
3630 : else
3631 0 : ahprintf(AH, "%s;\n\n", qry->data);
3632 :
3633 0 : free(AH->currSchema);
3634 0 : AH->currSchema = pg_strdup(schemaName);
3635 :
3636 0 : destroyPQExpBuffer(qry);
3637 : }
3638 :
3639 : /*
3640 : * Issue the commands to select the specified tablespace as the current one
3641 : * in the target database.
3642 : */
3643 : static void
3644 80044 : _selectTablespace(ArchiveHandle *AH, const char *tablespace)
3645 : {
3646 80044 : RestoreOptions *ropt = AH->public.ropt;
3647 : PQExpBuffer qry;
3648 : const char *want,
3649 : *have;
3650 :
3651 : /* do nothing in --no-tablespaces mode */
3652 80044 : if (ropt->noTablespace)
3653 0 : return;
3654 :
3655 80044 : have = AH->currTablespace;
3656 80044 : want = tablespace;
3657 :
3658 : /* no need to do anything for non-tablespace object */
3659 80044 : if (!want)
3660 64012 : return;
3661 :
3662 16032 : if (have && strcmp(want, have) == 0)
3663 15570 : return; /* no need to do anything */
3664 :
3665 462 : qry = createPQExpBuffer();
3666 :
3667 462 : if (strcmp(want, "") == 0)
3668 : {
3669 : /* We want the tablespace to be the database's default */
3670 346 : appendPQExpBufferStr(qry, "SET default_tablespace = ''");
3671 : }
3672 : else
3673 : {
3674 : /* We want an explicit tablespace */
3675 116 : appendPQExpBuffer(qry, "SET default_tablespace = %s", fmtId(want));
3676 : }
3677 :
3678 462 : if (RestoringToDB(AH))
3679 : {
3680 : PGresult *res;
3681 :
3682 58 : res = PQexec(AH->connection, qry->data);
3683 :
3684 58 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3685 0 : warn_or_exit_horribly(AH,
3686 : "could not set \"default_tablespace\" to %s: %s",
3687 0 : fmtId(want), PQerrorMessage(AH->connection));
3688 :
3689 58 : PQclear(res);
3690 : }
3691 : else
3692 404 : ahprintf(AH, "%s;\n\n", qry->data);
3693 :
3694 462 : free(AH->currTablespace);
3695 462 : AH->currTablespace = pg_strdup(want);
3696 :
3697 462 : destroyPQExpBuffer(qry);
3698 : }
3699 :
3700 : /*
3701 : * Set the proper default_table_access_method value for the table.
3702 : */
3703 : static void
3704 78920 : _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam)
3705 : {
3706 78920 : RestoreOptions *ropt = AH->public.ropt;
3707 : PQExpBuffer cmd;
3708 : const char *want,
3709 : *have;
3710 :
3711 : /* do nothing in --no-table-access-method mode */
3712 78920 : if (ropt->noTableAm)
3713 738 : return;
3714 :
3715 78182 : have = AH->currTableAm;
3716 78182 : want = tableam;
3717 :
3718 78182 : if (!want)
3719 68248 : return;
3720 :
3721 9934 : if (have && strcmp(want, have) == 0)
3722 9314 : return;
3723 :
3724 620 : cmd = createPQExpBuffer();
3725 620 : appendPQExpBuffer(cmd, "SET default_table_access_method = %s;", fmtId(want));
3726 :
3727 620 : if (RestoringToDB(AH))
3728 : {
3729 : PGresult *res;
3730 :
3731 46 : res = PQexec(AH->connection, cmd->data);
3732 :
3733 46 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3734 0 : warn_or_exit_horribly(AH,
3735 : "could not set \"default_table_access_method\": %s",
3736 0 : PQerrorMessage(AH->connection));
3737 :
3738 46 : PQclear(res);
3739 : }
3740 : else
3741 574 : ahprintf(AH, "%s\n\n", cmd->data);
3742 :
3743 620 : destroyPQExpBuffer(cmd);
3744 :
3745 620 : free(AH->currTableAm);
3746 620 : AH->currTableAm = pg_strdup(want);
3747 : }
3748 :
3749 : /*
3750 : * Set the proper default table access method for a table without storage.
3751 : * Currently, this is required only for partitioned tables with a table AM.
3752 : */
3753 : static void
3754 1124 : _printTableAccessMethodNoStorage(ArchiveHandle *AH, TocEntry *te)
3755 : {
3756 1124 : RestoreOptions *ropt = AH->public.ropt;
3757 1124 : const char *tableam = te->tableam;
3758 : PQExpBuffer cmd;
3759 :
3760 : /* do nothing in --no-table-access-method mode */
3761 1124 : if (ropt->noTableAm)
3762 8 : return;
3763 :
3764 1116 : if (!tableam)
3765 1056 : return;
3766 :
3767 : Assert(te->relkind == RELKIND_PARTITIONED_TABLE);
3768 :
3769 60 : cmd = createPQExpBuffer();
3770 :
3771 60 : appendPQExpBufferStr(cmd, "ALTER TABLE ");
3772 60 : appendPQExpBuffer(cmd, "%s ", fmtQualifiedId(te->namespace, te->tag));
3773 60 : appendPQExpBuffer(cmd, "SET ACCESS METHOD %s;",
3774 : fmtId(tableam));
3775 :
3776 60 : if (RestoringToDB(AH))
3777 : {
3778 : PGresult *res;
3779 :
3780 0 : res = PQexec(AH->connection, cmd->data);
3781 :
3782 0 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3783 0 : warn_or_exit_horribly(AH,
3784 : "could not alter table access method: %s",
3785 0 : PQerrorMessage(AH->connection));
3786 0 : PQclear(res);
3787 : }
3788 : else
3789 60 : ahprintf(AH, "%s\n\n", cmd->data);
3790 :
3791 60 : destroyPQExpBuffer(cmd);
3792 : }
3793 :
3794 : /*
3795 : * Extract an object description for a TOC entry, and append it to buf.
3796 : *
3797 : * This is used for ALTER ... OWNER TO.
3798 : *
3799 : * If the object type has no owner, do nothing.
3800 : */
3801 : static void
3802 40412 : _getObjectDescription(PQExpBuffer buf, const TocEntry *te)
3803 : {
3804 40412 : const char *type = te->desc;
3805 :
3806 : /* objects that don't require special decoration */
3807 40412 : if (strcmp(type, "COLLATION") == 0 ||
3808 35370 : strcmp(type, "CONVERSION") == 0 ||
3809 34542 : strcmp(type, "DOMAIN") == 0 ||
3810 34242 : strcmp(type, "FOREIGN TABLE") == 0 ||
3811 34178 : strcmp(type, "MATERIALIZED VIEW") == 0 ||
3812 33546 : strcmp(type, "SEQUENCE") == 0 ||
3813 33090 : strcmp(type, "STATISTICS") == 0 ||
3814 32872 : strcmp(type, "TABLE") == 0 ||
3815 22458 : strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
3816 22128 : strcmp(type, "TEXT SEARCH CONFIGURATION") == 0 ||
3817 21848 : strcmp(type, "TYPE") == 0 ||
3818 20622 : strcmp(type, "VIEW") == 0 ||
3819 : /* non-schema-specified objects */
3820 19560 : strcmp(type, "DATABASE") == 0 ||
3821 19440 : strcmp(type, "PROCEDURAL LANGUAGE") == 0 ||
3822 19382 : strcmp(type, "SCHEMA") == 0 ||
3823 19024 : strcmp(type, "EVENT TRIGGER") == 0 ||
3824 18956 : strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
3825 18872 : strcmp(type, "SERVER") == 0 ||
3826 18784 : strcmp(type, "PUBLICATION") == 0 ||
3827 18326 : strcmp(type, "SUBSCRIPTION") == 0)
3828 : {
3829 22252 : appendPQExpBuffer(buf, "%s ", type);
3830 22252 : if (te->namespace && *te->namespace)
3831 20852 : appendPQExpBuffer(buf, "%s.", fmtId(te->namespace));
3832 22252 : appendPQExpBufferStr(buf, fmtId(te->tag));
3833 : }
3834 : /* LOs just have a name, but it's numeric so must not use fmtId */
3835 18160 : else if (strcmp(type, "BLOB") == 0)
3836 : {
3837 0 : appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
3838 : }
3839 :
3840 : /*
3841 : * These object types require additional decoration. Fortunately, the
3842 : * information needed is exactly what's in the DROP command.
3843 : */
3844 18160 : else if (strcmp(type, "AGGREGATE") == 0 ||
3845 17628 : strcmp(type, "FUNCTION") == 0 ||
3846 14508 : strcmp(type, "OPERATOR") == 0 ||
3847 9544 : strcmp(type, "OPERATOR CLASS") == 0 ||
3848 8272 : strcmp(type, "OPERATOR FAMILY") == 0 ||
3849 7206 : strcmp(type, "PROCEDURE") == 0)
3850 : {
3851 : /* Chop "DROP " off the front and make a modifiable copy */
3852 11134 : char *first = pg_strdup(te->dropStmt + 5);
3853 : char *last;
3854 :
3855 : /* point to last character in string */
3856 11134 : last = first + strlen(first) - 1;
3857 :
3858 : /* Strip off any ';' or '\n' at the end */
3859 33402 : while (last >= first && (*last == '\n' || *last == ';'))
3860 22268 : last--;
3861 11134 : *(last + 1) = '\0';
3862 :
3863 11134 : appendPQExpBufferStr(buf, first);
3864 :
3865 11134 : free(first);
3866 11134 : return;
3867 : }
3868 : /* these object types don't have separate owners */
3869 7026 : else if (strcmp(type, "CAST") == 0 ||
3870 7026 : strcmp(type, "CHECK CONSTRAINT") == 0 ||
3871 6926 : strcmp(type, "CONSTRAINT") == 0 ||
3872 4014 : strcmp(type, "DATABASE PROPERTIES") == 0 ||
3873 3998 : strcmp(type, "DEFAULT") == 0 ||
3874 3678 : strcmp(type, "FK CONSTRAINT") == 0 ||
3875 3368 : strcmp(type, "INDEX") == 0 ||
3876 1666 : strcmp(type, "RULE") == 0 ||
3877 1244 : strcmp(type, "TRIGGER") == 0 ||
3878 484 : strcmp(type, "ROW SECURITY") == 0 ||
3879 484 : strcmp(type, "POLICY") == 0 ||
3880 58 : strcmp(type, "USER MAPPING") == 0)
3881 : {
3882 : /* do nothing */
3883 : }
3884 : else
3885 0 : pg_fatal("don't know how to set owner for object type \"%s\"", type);
3886 : }
3887 :
3888 : /*
3889 : * Emit the SQL commands to create the object represented by a TOC entry
3890 : *
3891 : * This now also includes issuing an ALTER OWNER command to restore the
3892 : * object's ownership, if wanted. But note that the object's permissions
3893 : * will remain at default, until the matching ACL TOC entry is restored.
3894 : */
3895 : static void
3896 80044 : _printTocEntry(ArchiveHandle *AH, TocEntry *te, const char *pfx)
3897 : {
3898 80044 : RestoreOptions *ropt = AH->public.ropt;
3899 :
3900 : /*
3901 : * Select owner, schema, tablespace and default AM as necessary. The
3902 : * default access method for partitioned tables is handled after
3903 : * generating the object definition, as it requires an ALTER command
3904 : * rather than SET.
3905 : */
3906 80044 : _becomeOwner(AH, te);
3907 80044 : _selectOutputSchema(AH, te->namespace);
3908 80044 : _selectTablespace(AH, te->tablespace);
3909 80044 : if (te->relkind != RELKIND_PARTITIONED_TABLE)
3910 78920 : _selectTableAccessMethod(AH, te->tableam);
3911 :
3912 : /* Emit header comment for item */
3913 80044 : if (!AH->noTocComments)
3914 : {
3915 : char *sanitized_name;
3916 : char *sanitized_schema;
3917 : char *sanitized_owner;
3918 :
3919 73050 : ahprintf(AH, "--\n");
3920 73050 : if (AH->public.verbose)
3921 : {
3922 2258 : ahprintf(AH, "-- TOC entry %d (class %u OID %u)\n",
3923 : te->dumpId, te->catalogId.tableoid, te->catalogId.oid);
3924 2258 : if (te->nDeps > 0)
3925 : {
3926 : int i;
3927 :
3928 1488 : ahprintf(AH, "-- Dependencies:");
3929 4068 : for (i = 0; i < te->nDeps; i++)
3930 2580 : ahprintf(AH, " %d", te->dependencies[i]);
3931 1488 : ahprintf(AH, "\n");
3932 : }
3933 : }
3934 :
3935 73050 : sanitized_name = sanitize_line(te->tag, false);
3936 73050 : sanitized_schema = sanitize_line(te->namespace, true);
3937 73050 : sanitized_owner = sanitize_line(ropt->noOwner ? NULL : te->owner, true);
3938 :
3939 73050 : ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s",
3940 : pfx, sanitized_name, te->desc, sanitized_schema,
3941 : sanitized_owner);
3942 :
3943 73050 : free(sanitized_name);
3944 73050 : free(sanitized_schema);
3945 73050 : free(sanitized_owner);
3946 :
3947 73050 : if (te->tablespace && strlen(te->tablespace) > 0 && !ropt->noTablespace)
3948 : {
3949 : char *sanitized_tablespace;
3950 :
3951 204 : sanitized_tablespace = sanitize_line(te->tablespace, false);
3952 204 : ahprintf(AH, "; Tablespace: %s", sanitized_tablespace);
3953 204 : free(sanitized_tablespace);
3954 : }
3955 73050 : ahprintf(AH, "\n");
3956 :
3957 73050 : if (AH->PrintExtraTocPtr != NULL)
3958 6268 : AH->PrintExtraTocPtr(AH, te);
3959 73050 : ahprintf(AH, "--\n\n");
3960 : }
3961 :
3962 : /*
3963 : * Actually print the definition. Normally we can just print the defn
3964 : * string if any, but we have four special cases:
3965 : *
3966 : * 1. A crude hack for suppressing AUTHORIZATION clause that old pg_dump
3967 : * versions put into CREATE SCHEMA. Don't mutate the variant for schema
3968 : * "public" that is a comment. We have to do this when --no-owner mode is
3969 : * selected. This is ugly, but I see no other good way ...
3970 : *
3971 : * 2. BLOB METADATA entries need special processing since their defn
3972 : * strings are just lists of OIDs, not complete SQL commands.
3973 : *
3974 : * 3. ACL LARGE OBJECTS entries need special processing because they
3975 : * contain only one copy of the ACL GRANT/REVOKE commands, which we must
3976 : * apply to each large object listed in the associated BLOB METADATA.
3977 : *
3978 : * 4. Entries with a defnDumper need to call it to generate the
3979 : * definition. This is primarily intended to provide a way to save memory
3980 : * for objects that would otherwise need a lot of it (e.g., statistics
3981 : * data).
3982 : */
3983 80044 : if (ropt->noOwner &&
3984 778 : strcmp(te->desc, "SCHEMA") == 0 && strncmp(te->defn, "--", 2) != 0)
3985 : {
3986 4 : ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
3987 : }
3988 80040 : else if (strcmp(te->desc, "BLOB METADATA") == 0)
3989 : {
3990 148 : IssueCommandPerBlob(AH, te, "SELECT pg_catalog.lo_create('", "')");
3991 : }
3992 79892 : else if (strcmp(te->desc, "ACL") == 0 &&
3993 3450 : strncmp(te->tag, "LARGE OBJECTS", 13) == 0)
3994 : {
3995 0 : IssueACLPerBlob(AH, te);
3996 : }
3997 79892 : else if (te->defnLen && AH->format != archTar)
3998 : {
3999 : /*
4000 : * If defnLen is set, the defnDumper has already been called for this
4001 : * TOC entry. We don't normally expect a defnDumper to be called for
4002 : * a TOC entry a second time in _printTocEntry(), but there's an
4003 : * exception. The tar format first calls WriteToc(), which scans the
4004 : * entire TOC, and then it later calls RestoreArchive() to generate
4005 : * restore.sql, which scans the TOC again. There doesn't appear to be
4006 : * a good way to prevent a second defnDumper call in this case without
4007 : * storing the definition in memory, which defeats the purpose. This
4008 : * second defnDumper invocation should generate the same output as the
4009 : * first, but even if it doesn't, the worst-case scenario is that
4010 : * restore.sql might have different statistics data than the archive.
4011 : *
4012 : * In all other cases, encountering a TOC entry a second time in
4013 : * _printTocEntry() is unexpected, so we fail because one of our
4014 : * assumptions must no longer hold true.
4015 : *
4016 : * XXX This is a layering violation, but the alternative is an awkward
4017 : * and complicated callback infrastructure for this special case. This
4018 : * might be worth revisiting in the future.
4019 : */
4020 0 : pg_fatal("unexpected TOC entry in _printTocEntry(): %d %s %s",
4021 : te->dumpId, te->desc, te->tag);
4022 : }
4023 79892 : else if (te->defnDumper)
4024 : {
4025 3332 : char *defn = te->defnDumper((Archive *) AH, te->defnDumperArg, te);
4026 :
4027 3332 : te->defnLen = ahprintf(AH, "%s\n\n", defn);
4028 3332 : pg_free(defn);
4029 : }
4030 76560 : else if (te->defn && strlen(te->defn) > 0)
4031 : {
4032 68238 : ahprintf(AH, "%s\n\n", te->defn);
4033 :
4034 : /*
4035 : * If the defn string contains multiple SQL commands, txn_size mode
4036 : * should count it as N actions not one. But rather than build a full
4037 : * SQL parser, approximate this by counting semicolons. One case
4038 : * where that tends to be badly fooled is function definitions, so
4039 : * ignore them. (restore_toc_entry will count one action anyway.)
4040 : */
4041 68238 : if (ropt->txn_size > 0 &&
4042 6720 : strcmp(te->desc, "FUNCTION") != 0 &&
4043 6182 : strcmp(te->desc, "PROCEDURE") != 0)
4044 : {
4045 6158 : const char *p = te->defn;
4046 6158 : int nsemis = 0;
4047 :
4048 27346 : while ((p = strchr(p, ';')) != NULL)
4049 : {
4050 21188 : nsemis++;
4051 21188 : p++;
4052 : }
4053 6158 : if (nsemis > 1)
4054 2978 : AH->txnCount += nsemis - 1;
4055 : }
4056 : }
4057 :
4058 : /*
4059 : * If we aren't using SET SESSION AUTH to determine ownership, we must
4060 : * instead issue an ALTER OWNER command. Schema "public" is special; when
4061 : * a dump emits a comment in lieu of creating it, we use ALTER OWNER even
4062 : * when using SET SESSION for all other objects. We assume that anything
4063 : * without a DROP command is not a separately ownable object.
4064 : */
4065 80044 : if (!ropt->noOwner &&
4066 79266 : (!ropt->use_setsessauth ||
4067 0 : (strcmp(te->desc, "SCHEMA") == 0 &&
4068 0 : strncmp(te->defn, "--", 2) == 0)) &&
4069 79266 : te->owner && strlen(te->owner) > 0 &&
4070 72160 : te->dropStmt && strlen(te->dropStmt) > 0)
4071 : {
4072 40556 : if (strcmp(te->desc, "BLOB METADATA") == 0)
4073 : {
4074 : /* BLOB METADATA needs special code to handle multiple LOs */
4075 144 : char *cmdEnd = psprintf(" OWNER TO %s", fmtId(te->owner));
4076 :
4077 144 : IssueCommandPerBlob(AH, te, "ALTER LARGE OBJECT ", cmdEnd);
4078 144 : pg_free(cmdEnd);
4079 : }
4080 : else
4081 : {
4082 : /* For all other cases, we can use _getObjectDescription */
4083 : PQExpBufferData temp;
4084 :
4085 40412 : initPQExpBuffer(&temp);
4086 40412 : _getObjectDescription(&temp, te);
4087 :
4088 : /*
4089 : * If _getObjectDescription() didn't fill the buffer, then there
4090 : * is no owner.
4091 : */
4092 40412 : if (temp.data[0])
4093 33386 : ahprintf(AH, "ALTER %s OWNER TO %s;\n\n",
4094 33386 : temp.data, fmtId(te->owner));
4095 40412 : termPQExpBuffer(&temp);
4096 : }
4097 : }
4098 :
4099 : /*
4100 : * Select a partitioned table's default AM, once the table definition has
4101 : * been generated.
4102 : */
4103 80044 : if (te->relkind == RELKIND_PARTITIONED_TABLE)
4104 1124 : _printTableAccessMethodNoStorage(AH, te);
4105 :
4106 : /*
4107 : * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
4108 : * commands, so we can no longer assume we know the current auth setting.
4109 : */
4110 80044 : if (_tocEntryIsACL(te))
4111 : {
4112 3706 : free(AH->currUser);
4113 3706 : AH->currUser = NULL;
4114 : }
4115 80044 : }
4116 :
4117 : /*
4118 : * Write the file header for a custom-format archive
4119 : */
4120 : void
4121 112 : WriteHead(ArchiveHandle *AH)
4122 : {
4123 : struct tm crtm;
4124 :
4125 112 : AH->WriteBufPtr(AH, "PGDMP", 5); /* Magic code */
4126 112 : AH->WriteBytePtr(AH, ARCHIVE_MAJOR(AH->version));
4127 112 : AH->WriteBytePtr(AH, ARCHIVE_MINOR(AH->version));
4128 112 : AH->WriteBytePtr(AH, ARCHIVE_REV(AH->version));
4129 112 : AH->WriteBytePtr(AH, AH->intSize);
4130 112 : AH->WriteBytePtr(AH, AH->offSize);
4131 112 : AH->WriteBytePtr(AH, AH->format);
4132 112 : AH->WriteBytePtr(AH, AH->compression_spec.algorithm);
4133 112 : crtm = *localtime(&AH->createDate);
4134 112 : WriteInt(AH, crtm.tm_sec);
4135 112 : WriteInt(AH, crtm.tm_min);
4136 112 : WriteInt(AH, crtm.tm_hour);
4137 112 : WriteInt(AH, crtm.tm_mday);
4138 112 : WriteInt(AH, crtm.tm_mon);
4139 112 : WriteInt(AH, crtm.tm_year);
4140 112 : WriteInt(AH, crtm.tm_isdst);
4141 112 : WriteStr(AH, PQdb(AH->connection));
4142 112 : WriteStr(AH, AH->public.remoteVersionStr);
4143 112 : WriteStr(AH, PG_VERSION);
4144 112 : }
4145 :
4146 : void
4147 116 : ReadHead(ArchiveHandle *AH)
4148 : {
4149 : char *errmsg;
4150 : char vmaj,
4151 : vmin,
4152 : vrev;
4153 : int fmt;
4154 :
4155 : /*
4156 : * If we haven't already read the header, do so.
4157 : *
4158 : * NB: this code must agree with _discoverArchiveFormat(). Maybe find a
4159 : * way to unify the cases?
4160 : */
4161 116 : if (!AH->readHeader)
4162 : {
4163 : char tmpMag[7];
4164 :
4165 116 : AH->ReadBufPtr(AH, tmpMag, 5);
4166 :
4167 116 : if (strncmp(tmpMag, "PGDMP", 5) != 0)
4168 0 : pg_fatal("did not find magic string in file header");
4169 : }
4170 :
4171 116 : vmaj = AH->ReadBytePtr(AH);
4172 116 : vmin = AH->ReadBytePtr(AH);
4173 :
4174 116 : if (vmaj > 1 || (vmaj == 1 && vmin > 0)) /* Version > 1.0 */
4175 116 : vrev = AH->ReadBytePtr(AH);
4176 : else
4177 0 : vrev = 0;
4178 :
4179 116 : AH->version = MAKE_ARCHIVE_VERSION(vmaj, vmin, vrev);
4180 :
4181 116 : if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
4182 0 : pg_fatal("unsupported version (%d.%d) in file header",
4183 : vmaj, vmin);
4184 :
4185 116 : AH->intSize = AH->ReadBytePtr(AH);
4186 116 : if (AH->intSize > 32)
4187 0 : pg_fatal("sanity check on integer size (%zu) failed", AH->intSize);
4188 :
4189 116 : if (AH->intSize > sizeof(int))
4190 0 : pg_log_warning("archive was made on a machine with larger integers, some operations might fail");
4191 :
4192 116 : if (AH->version >= K_VERS_1_7)
4193 116 : AH->offSize = AH->ReadBytePtr(AH);
4194 : else
4195 0 : AH->offSize = AH->intSize;
4196 :
4197 116 : fmt = AH->ReadBytePtr(AH);
4198 :
4199 116 : if (AH->format != fmt)
4200 0 : pg_fatal("expected format (%d) differs from format found in file (%d)",
4201 : AH->format, fmt);
4202 :
4203 116 : if (AH->version >= K_VERS_1_15)
4204 116 : AH->compression_spec.algorithm = AH->ReadBytePtr(AH);
4205 0 : else if (AH->version >= K_VERS_1_2)
4206 : {
4207 : /* Guess the compression method based on the level */
4208 0 : if (AH->version < K_VERS_1_4)
4209 0 : AH->compression_spec.level = AH->ReadBytePtr(AH);
4210 : else
4211 0 : AH->compression_spec.level = ReadInt(AH);
4212 :
4213 0 : if (AH->compression_spec.level != 0)
4214 0 : AH->compression_spec.algorithm = PG_COMPRESSION_GZIP;
4215 : }
4216 : else
4217 0 : AH->compression_spec.algorithm = PG_COMPRESSION_GZIP;
4218 :
4219 116 : errmsg = supports_compression(AH->compression_spec);
4220 116 : if (errmsg)
4221 : {
4222 0 : pg_log_warning("archive is compressed, but this installation does not support compression (%s) -- no data will be available",
4223 : errmsg);
4224 0 : pg_free(errmsg);
4225 : }
4226 :
4227 116 : if (AH->version >= K_VERS_1_4)
4228 : {
4229 : struct tm crtm;
4230 :
4231 116 : crtm.tm_sec = ReadInt(AH);
4232 116 : crtm.tm_min = ReadInt(AH);
4233 116 : crtm.tm_hour = ReadInt(AH);
4234 116 : crtm.tm_mday = ReadInt(AH);
4235 116 : crtm.tm_mon = ReadInt(AH);
4236 116 : crtm.tm_year = ReadInt(AH);
4237 116 : crtm.tm_isdst = ReadInt(AH);
4238 :
4239 : /*
4240 : * Newer versions of glibc have mktime() report failure if tm_isdst is
4241 : * inconsistent with the prevailing timezone, e.g. tm_isdst = 1 when
4242 : * TZ=UTC. This is problematic when restoring an archive under a
4243 : * different timezone setting. If we get a failure, try again with
4244 : * tm_isdst set to -1 ("don't know").
4245 : *
4246 : * XXX with or without this hack, we reconstruct createDate
4247 : * incorrectly when the prevailing timezone is different from
4248 : * pg_dump's. Next time we bump the archive version, we should flush
4249 : * this representation and store a plain seconds-since-the-Epoch
4250 : * timestamp instead.
4251 : */
4252 116 : AH->createDate = mktime(&crtm);
4253 116 : if (AH->createDate == (time_t) -1)
4254 : {
4255 0 : crtm.tm_isdst = -1;
4256 0 : AH->createDate = mktime(&crtm);
4257 0 : if (AH->createDate == (time_t) -1)
4258 0 : pg_log_warning("invalid creation date in header");
4259 : }
4260 : }
4261 :
4262 116 : if (AH->version >= K_VERS_1_4)
4263 : {
4264 116 : AH->archdbname = ReadStr(AH);
4265 : }
4266 :
4267 116 : if (AH->version >= K_VERS_1_10)
4268 : {
4269 116 : AH->archiveRemoteVersion = ReadStr(AH);
4270 116 : AH->archiveDumpVersion = ReadStr(AH);
4271 : }
4272 116 : }
4273 :
4274 :
4275 : /*
4276 : * checkSeek
4277 : * check to see if ftell/fseek can be performed.
4278 : */
4279 : bool
4280 188 : checkSeek(FILE *fp)
4281 : {
4282 : pgoff_t tpos;
4283 :
4284 : /* Check that ftello works on this file */
4285 188 : tpos = ftello(fp);
4286 188 : if (tpos < 0)
4287 2 : return false;
4288 :
4289 : /*
4290 : * Check that fseeko(SEEK_SET) works, too. NB: we used to try to test
4291 : * this with fseeko(fp, 0, SEEK_CUR). But some platforms treat that as a
4292 : * successful no-op even on files that are otherwise unseekable.
4293 : */
4294 186 : if (fseeko(fp, tpos, SEEK_SET) != 0)
4295 0 : return false;
4296 :
4297 186 : return true;
4298 : }
4299 :
4300 :
4301 : /*
4302 : * dumpTimestamp
4303 : */
4304 : static void
4305 156 : dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
4306 : {
4307 : char buf[64];
4308 :
4309 156 : if (strftime(buf, sizeof(buf), PGDUMP_STRFTIME_FMT, localtime(&tim)) != 0)
4310 156 : ahprintf(AH, "-- %s %s\n\n", msg, buf);
4311 156 : }
4312 :
4313 : /*
4314 : * Main engine for parallel restore.
4315 : *
4316 : * Parallel restore is done in three phases. In this first phase,
4317 : * we'll process all SECTION_PRE_DATA TOC entries that are allowed to be
4318 : * processed in the RESTORE_PASS_MAIN pass. (In practice, that's all
4319 : * PRE_DATA items other than ACLs.) Entries we can't process now are
4320 : * added to the pending_list for later phases to deal with.
4321 : */
4322 : static void
4323 8 : restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
4324 : {
4325 : bool skipped_some;
4326 : TocEntry *next_work_item;
4327 :
4328 8 : pg_log_debug("entering restore_toc_entries_prefork");
4329 :
4330 : /* Adjust dependency information */
4331 8 : fix_dependencies(AH);
4332 :
4333 : /*
4334 : * Do all the early stuff in a single connection in the parent. There's no
4335 : * great point in running it in parallel, in fact it will actually run
4336 : * faster in a single connection because we avoid all the connection and
4337 : * setup overhead. Also, pre-9.2 pg_dump versions were not very good
4338 : * about showing all the dependencies of SECTION_PRE_DATA items, so we do
4339 : * not risk trying to process them out-of-order.
4340 : *
4341 : * Stuff that we can't do immediately gets added to the pending_list.
4342 : * Note: we don't yet filter out entries that aren't going to be restored.
4343 : * They might participate in dependency chains connecting entries that
4344 : * should be restored, so we treat them as live until we actually process
4345 : * them.
4346 : *
4347 : * Note: as of 9.2, it should be guaranteed that all PRE_DATA items appear
4348 : * before DATA items, and all DATA items before POST_DATA items. That is
4349 : * not certain to be true in older archives, though, and in any case use
4350 : * of a list file would destroy that ordering (cf. SortTocFromFile). So
4351 : * this loop cannot assume that it holds.
4352 : */
4353 8 : AH->restorePass = RESTORE_PASS_MAIN;
4354 8 : skipped_some = false;
4355 200 : for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
4356 : {
4357 192 : bool do_now = true;
4358 :
4359 192 : if (next_work_item->section != SECTION_PRE_DATA)
4360 : {
4361 : /* DATA and POST_DATA items are just ignored for now */
4362 92 : if (next_work_item->section == SECTION_DATA ||
4363 60 : next_work_item->section == SECTION_POST_DATA)
4364 : {
4365 92 : do_now = false;
4366 92 : skipped_some = true;
4367 : }
4368 : else
4369 : {
4370 : /*
4371 : * SECTION_NONE items, such as comments, can be processed now
4372 : * if we are still in the PRE_DATA part of the archive. Once
4373 : * we've skipped any items, we have to consider whether the
4374 : * comment's dependencies are satisfied, so skip it for now.
4375 : */
4376 0 : if (skipped_some)
4377 0 : do_now = false;
4378 : }
4379 : }
4380 :
4381 : /*
4382 : * Also skip items that need to be forced into later passes. We need
4383 : * not set skipped_some in this case, since by assumption no main-pass
4384 : * items could depend on these.
4385 : */
4386 192 : if (_tocEntryRestorePass(next_work_item) != RESTORE_PASS_MAIN)
4387 0 : do_now = false;
4388 :
4389 192 : if (do_now)
4390 : {
4391 : /* OK, restore the item and update its dependencies */
4392 100 : pg_log_info("processing item %d %s %s",
4393 : next_work_item->dumpId,
4394 : next_work_item->desc, next_work_item->tag);
4395 :
4396 100 : (void) restore_toc_entry(AH, next_work_item, false);
4397 :
4398 : /* Reduce dependencies, but don't move anything to ready_heap */
4399 100 : reduce_dependencies(AH, next_work_item, NULL);
4400 : }
4401 : else
4402 : {
4403 : /* Nope, so add it to pending_list */
4404 92 : pending_list_append(pending_list, next_work_item);
4405 : }
4406 : }
4407 :
4408 : /*
4409 : * In --transaction-size mode, we must commit the open transaction before
4410 : * dropping the database connection. This also ensures that child workers
4411 : * can see the objects we've created so far.
4412 : */
4413 8 : if (AH->public.ropt->txn_size > 0)
4414 0 : CommitTransaction(&AH->public);
4415 :
4416 : /*
4417 : * Now close parent connection in prep for parallel steps. We do this
4418 : * mainly to ensure that we don't exceed the specified number of parallel
4419 : * connections.
4420 : */
4421 8 : DisconnectDatabase(&AH->public);
4422 :
4423 : /* blow away any transient state from the old connection */
4424 8 : free(AH->currUser);
4425 8 : AH->currUser = NULL;
4426 8 : free(AH->currSchema);
4427 8 : AH->currSchema = NULL;
4428 8 : free(AH->currTablespace);
4429 8 : AH->currTablespace = NULL;
4430 8 : free(AH->currTableAm);
4431 8 : AH->currTableAm = NULL;
4432 8 : }
4433 :
4434 : /*
4435 : * Main engine for parallel restore.
4436 : *
4437 : * Parallel restore is done in three phases. In this second phase,
4438 : * we process entries by dispatching them to parallel worker children
4439 : * (processes on Unix, threads on Windows), each of which connects
4440 : * separately to the database. Inter-entry dependencies are respected,
4441 : * and so is the RestorePass multi-pass structure. When we can no longer
4442 : * make any entries ready to process, we exit. Normally, there will be
4443 : * nothing left to do; but if there is, the third phase will mop up.
4444 : */
4445 : static void
4446 8 : restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
4447 : TocEntry *pending_list)
4448 : {
4449 : binaryheap *ready_heap;
4450 : TocEntry *next_work_item;
4451 :
4452 8 : pg_log_debug("entering restore_toc_entries_parallel");
4453 :
4454 : /* Set up ready_heap with enough room for all known TocEntrys */
4455 8 : ready_heap = binaryheap_allocate(AH->tocCount,
4456 : TocEntrySizeCompareBinaryheap,
4457 : NULL);
4458 :
4459 : /*
4460 : * The pending_list contains all items that we need to restore. Move all
4461 : * items that are available to process immediately into the ready_heap.
4462 : * After this setup, the pending list is everything that needs to be done
4463 : * but is blocked by one or more dependencies, while the ready heap
4464 : * contains items that have no remaining dependencies and are OK to
4465 : * process in the current restore pass.
4466 : */
4467 8 : AH->restorePass = RESTORE_PASS_MAIN;
4468 8 : move_to_ready_heap(pending_list, ready_heap, AH->restorePass);
4469 :
4470 : /*
4471 : * main parent loop
4472 : *
4473 : * Keep going until there is no worker still running AND there is no work
4474 : * left to be done. Note invariant: at top of loop, there should always
4475 : * be at least one worker available to dispatch a job to.
4476 : */
4477 8 : pg_log_info("entering main parallel loop");
4478 :
4479 : for (;;)
4480 : {
4481 : /* Look for an item ready to be dispatched to a worker */
4482 138 : next_work_item = pop_next_work_item(ready_heap, pstate);
4483 138 : if (next_work_item != NULL)
4484 : {
4485 : /* If not to be restored, don't waste time launching a worker */
4486 92 : if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) == 0)
4487 : {
4488 0 : pg_log_info("skipping item %d %s %s",
4489 : next_work_item->dumpId,
4490 : next_work_item->desc, next_work_item->tag);
4491 : /* Update its dependencies as though we'd completed it */
4492 0 : reduce_dependencies(AH, next_work_item, ready_heap);
4493 : /* Loop around to see if anything else can be dispatched */
4494 0 : continue;
4495 : }
4496 :
4497 92 : pg_log_info("launching item %d %s %s",
4498 : next_work_item->dumpId,
4499 : next_work_item->desc, next_work_item->tag);
4500 :
4501 : /* Dispatch to some worker */
4502 92 : DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
4503 : mark_restore_job_done, ready_heap);
4504 : }
4505 46 : else if (IsEveryWorkerIdle(pstate))
4506 : {
4507 : /*
4508 : * Nothing is ready and no worker is running, so we're done with
4509 : * the current pass or maybe with the whole process.
4510 : */
4511 24 : if (AH->restorePass == RESTORE_PASS_LAST)
4512 8 : break; /* No more parallel processing is possible */
4513 :
4514 : /* Advance to next restore pass */
4515 16 : AH->restorePass++;
4516 : /* That probably allows some stuff to be made ready */
4517 16 : move_to_ready_heap(pending_list, ready_heap, AH->restorePass);
4518 : /* Loop around to see if anything's now ready */
4519 16 : continue;
4520 : }
4521 : else
4522 : {
4523 : /*
4524 : * We have nothing ready, but at least one child is working, so
4525 : * wait for some subjob to finish.
4526 : */
4527 : }
4528 :
4529 : /*
4530 : * Before dispatching another job, check to see if anything has
4531 : * finished. We should check every time through the loop so as to
4532 : * reduce dependencies as soon as possible. If we were unable to
4533 : * dispatch any job this time through, wait until some worker finishes
4534 : * (and, hopefully, unblocks some pending item). If we did dispatch
4535 : * something, continue as soon as there's at least one idle worker.
4536 : * Note that in either case, there's guaranteed to be at least one
4537 : * idle worker when we return to the top of the loop. This ensures we
4538 : * won't block inside DispatchJobForTocEntry, which would be
4539 : * undesirable: we'd rather postpone dispatching until we see what's
4540 : * been unblocked by finished jobs.
4541 : */
4542 114 : WaitForWorkers(AH, pstate,
4543 : next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS);
4544 : }
4545 :
4546 : /* There should now be nothing in ready_heap. */
4547 : Assert(binaryheap_empty(ready_heap));
4548 :
4549 8 : binaryheap_free(ready_heap);
4550 :
4551 8 : pg_log_info("finished main parallel loop");
4552 8 : }
4553 :
4554 : /*
4555 : * Main engine for parallel restore.
4556 : *
4557 : * Parallel restore is done in three phases. In this third phase,
4558 : * we mop up any remaining TOC entries by processing them serially.
4559 : * This phase normally should have nothing to do, but if we've somehow
4560 : * gotten stuck due to circular dependencies or some such, this provides
4561 : * at least some chance of completing the restore successfully.
4562 : */
4563 : static void
4564 8 : restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
4565 : {
4566 8 : RestoreOptions *ropt = AH->public.ropt;
4567 : TocEntry *te;
4568 :
4569 8 : pg_log_debug("entering restore_toc_entries_postfork");
4570 :
4571 : /*
4572 : * Now reconnect the single parent connection.
4573 : */
4574 8 : ConnectDatabaseAhx((Archive *) AH, &ropt->cparams, true);
4575 :
4576 : /* re-establish fixed state */
4577 8 : _doSetFixedOutputState(AH);
4578 :
4579 : /*
4580 : * Make sure there is no work left due to, say, circular dependencies, or
4581 : * some other pathological condition. If so, do it in the single parent
4582 : * connection. We don't sweat about RestorePass ordering; it's likely we
4583 : * already violated that.
4584 : */
4585 8 : for (te = pending_list->pending_next; te != pending_list; te = te->pending_next)
4586 : {
4587 0 : pg_log_info("processing missed item %d %s %s",
4588 : te->dumpId, te->desc, te->tag);
4589 0 : (void) restore_toc_entry(AH, te, false);
4590 : }
4591 8 : }
4592 :
4593 : /*
4594 : * Check if te1 has an exclusive lock requirement for an item that te2 also
4595 : * requires, whether or not te2's requirement is for an exclusive lock.
4596 : */
4597 : static bool
4598 312 : has_lock_conflicts(TocEntry *te1, TocEntry *te2)
4599 : {
4600 : int j,
4601 : k;
4602 :
4603 724 : for (j = 0; j < te1->nLockDeps; j++)
4604 : {
4605 1622 : for (k = 0; k < te2->nDeps; k++)
4606 : {
4607 1210 : if (te1->lockDeps[j] == te2->dependencies[k])
4608 8 : return true;
4609 : }
4610 : }
4611 304 : return false;
4612 : }
4613 :
4614 :
4615 : /*
4616 : * Initialize the header of the pending-items list.
4617 : *
4618 : * This is a circular list with a dummy TocEntry as header, just like the
4619 : * main TOC list; but we use separate list links so that an entry can be in
4620 : * the main TOC list as well as in the pending list.
4621 : */
4622 : static void
4623 8 : pending_list_header_init(TocEntry *l)
4624 : {
4625 8 : l->pending_prev = l->pending_next = l;
4626 8 : }
4627 :
4628 : /* Append te to the end of the pending-list headed by l */
4629 : static void
4630 92 : pending_list_append(TocEntry *l, TocEntry *te)
4631 : {
4632 92 : te->pending_prev = l->pending_prev;
4633 92 : l->pending_prev->pending_next = te;
4634 92 : l->pending_prev = te;
4635 92 : te->pending_next = l;
4636 92 : }
4637 :
4638 : /* Remove te from the pending-list */
4639 : static void
4640 92 : pending_list_remove(TocEntry *te)
4641 : {
4642 92 : te->pending_prev->pending_next = te->pending_next;
4643 92 : te->pending_next->pending_prev = te->pending_prev;
4644 92 : te->pending_prev = NULL;
4645 92 : te->pending_next = NULL;
4646 92 : }
4647 :
4648 :
4649 : /* qsort comparator for sorting TocEntries by dataLength */
4650 : static int
4651 788 : TocEntrySizeCompareQsort(const void *p1, const void *p2)
4652 : {
4653 788 : const TocEntry *te1 = *(const TocEntry *const *) p1;
4654 788 : const TocEntry *te2 = *(const TocEntry *const *) p2;
4655 :
4656 : /* Sort by decreasing dataLength */
4657 788 : if (te1->dataLength > te2->dataLength)
4658 108 : return -1;
4659 680 : if (te1->dataLength < te2->dataLength)
4660 196 : return 1;
4661 :
4662 : /* For equal dataLengths, sort by dumpId, just to be stable */
4663 484 : if (te1->dumpId < te2->dumpId)
4664 208 : return -1;
4665 276 : if (te1->dumpId > te2->dumpId)
4666 252 : return 1;
4667 :
4668 24 : return 0;
4669 : }
4670 :
4671 : /* binaryheap comparator for sorting TocEntries by dataLength */
4672 : static int
4673 308 : TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg)
4674 : {
4675 : /* return opposite of qsort comparator for max-heap */
4676 308 : return -TocEntrySizeCompareQsort(&p1, &p2);
4677 : }
4678 :
4679 :
4680 : /*
4681 : * Move all immediately-ready items from pending_list to ready_heap.
4682 : *
4683 : * Items are considered ready if they have no remaining dependencies and
4684 : * they belong in the current restore pass. (See also reduce_dependencies,
4685 : * which applies the same logic one-at-a-time.)
4686 : */
4687 : static void
4688 24 : move_to_ready_heap(TocEntry *pending_list,
4689 : binaryheap *ready_heap,
4690 : RestorePass pass)
4691 : {
4692 : TocEntry *te;
4693 : TocEntry *next_te;
4694 :
4695 116 : for (te = pending_list->pending_next; te != pending_list; te = next_te)
4696 : {
4697 : /* must save list link before possibly removing te from list */
4698 92 : next_te = te->pending_next;
4699 :
4700 132 : if (te->depCount == 0 &&
4701 40 : _tocEntryRestorePass(te) == pass)
4702 : {
4703 : /* Remove it from pending_list ... */
4704 40 : pending_list_remove(te);
4705 : /* ... and add to ready_heap */
4706 40 : binaryheap_add(ready_heap, te);
4707 : }
4708 : }
4709 24 : }
4710 :
4711 : /*
4712 : * Find the next work item (if any) that is capable of being run now,
4713 : * and remove it from the ready_heap.
4714 : *
4715 : * Returns the item, or NULL if nothing is runnable.
4716 : *
4717 : * To qualify, the item must have no remaining dependencies
4718 : * and no requirements for locks that are incompatible with
4719 : * items currently running. Items in the ready_heap are known to have
4720 : * no remaining dependencies, but we have to check for lock conflicts.
4721 : */
4722 : static TocEntry *
4723 138 : pop_next_work_item(binaryheap *ready_heap,
4724 : ParallelState *pstate)
4725 : {
4726 : /*
4727 : * Search the ready_heap until we find a suitable item. Note that we do a
4728 : * sequential scan through the heap nodes, so even though we will first
4729 : * try to choose the highest-priority item, we might end up picking
4730 : * something with a much lower priority. However, we expect that we will
4731 : * typically be able to pick one of the first few items, which should
4732 : * usually have a relatively high priority.
4733 : */
4734 146 : for (int i = 0; i < binaryheap_size(ready_heap); i++)
4735 : {
4736 100 : TocEntry *te = (TocEntry *) binaryheap_get_node(ready_heap, i);
4737 100 : bool conflicts = false;
4738 :
4739 : /*
4740 : * Check to see if the item would need exclusive lock on something
4741 : * that a currently running item also needs lock on, or vice versa. If
4742 : * so, we don't want to schedule them together.
4743 : */
4744 388 : for (int k = 0; k < pstate->numWorkers; k++)
4745 : {
4746 296 : TocEntry *running_te = pstate->te[k];
4747 :
4748 296 : if (running_te == NULL)
4749 136 : continue;
4750 312 : if (has_lock_conflicts(te, running_te) ||
4751 152 : has_lock_conflicts(running_te, te))
4752 : {
4753 8 : conflicts = true;
4754 8 : break;
4755 : }
4756 : }
4757 :
4758 100 : if (conflicts)
4759 8 : continue;
4760 :
4761 : /* passed all tests, so this item can run */
4762 92 : binaryheap_remove_node(ready_heap, i);
4763 92 : return te;
4764 : }
4765 :
4766 46 : pg_log_debug("no item ready");
4767 46 : return NULL;
4768 : }
4769 :
4770 :
4771 : /*
4772 : * Restore a single TOC item in parallel with others
4773 : *
4774 : * this is run in the worker, i.e. in a thread (Windows) or a separate process
4775 : * (everything else). A worker process executes several such work items during
4776 : * a parallel backup or restore. Once we terminate here and report back that
4777 : * our work is finished, the leader process will assign us a new work item.
4778 : */
4779 : int
4780 92 : parallel_restore(ArchiveHandle *AH, TocEntry *te)
4781 : {
4782 : int status;
4783 :
4784 : Assert(AH->connection != NULL);
4785 :
4786 : /* Count only errors associated with this TOC entry */
4787 92 : AH->public.n_errors = 0;
4788 :
4789 : /* Restore the TOC item */
4790 92 : status = restore_toc_entry(AH, te, true);
4791 :
4792 92 : return status;
4793 : }
4794 :
4795 :
4796 : /*
4797 : * Callback function that's invoked in the leader process after a step has
4798 : * been parallel restored.
4799 : *
4800 : * Update status and reduce the dependency count of any dependent items.
4801 : */
4802 : static void
4803 92 : mark_restore_job_done(ArchiveHandle *AH,
4804 : TocEntry *te,
4805 : int status,
4806 : void *callback_data)
4807 : {
4808 92 : binaryheap *ready_heap = (binaryheap *) callback_data;
4809 :
4810 92 : pg_log_info("finished item %d %s %s",
4811 : te->dumpId, te->desc, te->tag);
4812 :
4813 92 : if (status == WORKER_CREATE_DONE)
4814 0 : mark_create_done(AH, te);
4815 92 : else if (status == WORKER_INHIBIT_DATA)
4816 : {
4817 0 : inhibit_data_for_failed_table(AH, te);
4818 0 : AH->public.n_errors++;
4819 : }
4820 92 : else if (status == WORKER_IGNORED_ERRORS)
4821 0 : AH->public.n_errors++;
4822 92 : else if (status != 0)
4823 0 : pg_fatal("worker process failed: exit code %d",
4824 : status);
4825 :
4826 92 : reduce_dependencies(AH, te, ready_heap);
4827 92 : }
4828 :
4829 :
4830 : /*
4831 : * Process the dependency information into a form useful for parallel restore.
4832 : *
4833 : * This function takes care of fixing up some missing or badly designed
4834 : * dependencies, and then prepares subsidiary data structures that will be
4835 : * used in the main parallel-restore logic, including:
4836 : * 1. We build the revDeps[] arrays of incoming dependency dumpIds.
4837 : * 2. We set up depCount fields that are the number of as-yet-unprocessed
4838 : * dependencies for each TOC entry.
4839 : *
4840 : * We also identify locking dependencies so that we can avoid trying to
4841 : * schedule conflicting items at the same time.
4842 : */
4843 : static void
4844 8 : fix_dependencies(ArchiveHandle *AH)
4845 : {
4846 : TocEntry *te;
4847 : int i;
4848 :
4849 : /*
4850 : * Initialize the depCount/revDeps/nRevDeps fields, and make sure the TOC
4851 : * items are marked as not being in any parallel-processing list.
4852 : */
4853 200 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4854 : {
4855 192 : te->depCount = te->nDeps;
4856 192 : te->revDeps = NULL;
4857 192 : te->nRevDeps = 0;
4858 192 : te->pending_prev = NULL;
4859 192 : te->pending_next = NULL;
4860 : }
4861 :
4862 : /*
4863 : * POST_DATA items that are shown as depending on a table need to be
4864 : * re-pointed to depend on that table's data, instead. This ensures they
4865 : * won't get scheduled until the data has been loaded.
4866 : */
4867 8 : repoint_table_dependencies(AH);
4868 :
4869 : /*
4870 : * Pre-8.4 versions of pg_dump neglected to set up a dependency from BLOB
4871 : * COMMENTS to BLOBS. Cope. (We assume there's only one BLOBS and only
4872 : * one BLOB COMMENTS in such files.)
4873 : */
4874 8 : if (AH->version < K_VERS_1_11)
4875 : {
4876 0 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4877 : {
4878 0 : if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0)
4879 : {
4880 : TocEntry *te2;
4881 :
4882 0 : for (te2 = AH->toc->next; te2 != AH->toc; te2 = te2->next)
4883 : {
4884 0 : if (strcmp(te2->desc, "BLOBS") == 0)
4885 : {
4886 0 : te->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
4887 0 : te->dependencies[0] = te2->dumpId;
4888 0 : te->nDeps++;
4889 0 : te->depCount++;
4890 0 : break;
4891 : }
4892 : }
4893 0 : break;
4894 : }
4895 : }
4896 : }
4897 :
4898 : /*
4899 : * At this point we start to build the revDeps reverse-dependency arrays,
4900 : * so all changes of dependencies must be complete.
4901 : */
4902 :
4903 : /*
4904 : * Count the incoming dependencies for each item. Also, it is possible
4905 : * that the dependencies list items that are not in the archive at all
4906 : * (that should not happen in 9.2 and later, but is highly likely in older
4907 : * archives). Subtract such items from the depCounts.
4908 : */
4909 200 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4910 : {
4911 576 : for (i = 0; i < te->nDeps; i++)
4912 : {
4913 384 : DumpId depid = te->dependencies[i];
4914 :
4915 384 : if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4916 384 : AH->tocsByDumpId[depid]->nRevDeps++;
4917 : else
4918 0 : te->depCount--;
4919 : }
4920 : }
4921 :
4922 : /*
4923 : * Allocate space for revDeps[] arrays, and reset nRevDeps so we can use
4924 : * it as a counter below.
4925 : */
4926 200 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4927 : {
4928 192 : if (te->nRevDeps > 0)
4929 104 : te->revDeps = (DumpId *) pg_malloc(te->nRevDeps * sizeof(DumpId));
4930 192 : te->nRevDeps = 0;
4931 : }
4932 :
4933 : /*
4934 : * Build the revDeps[] arrays of incoming-dependency dumpIds. This had
4935 : * better agree with the loops above.
4936 : */
4937 200 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4938 : {
4939 576 : for (i = 0; i < te->nDeps; i++)
4940 : {
4941 384 : DumpId depid = te->dependencies[i];
4942 :
4943 384 : if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4944 : {
4945 384 : TocEntry *otherte = AH->tocsByDumpId[depid];
4946 :
4947 384 : otherte->revDeps[otherte->nRevDeps++] = te->dumpId;
4948 : }
4949 : }
4950 : }
4951 :
4952 : /*
4953 : * Lastly, work out the locking dependencies.
4954 : */
4955 200 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4956 : {
4957 192 : te->lockDeps = NULL;
4958 192 : te->nLockDeps = 0;
4959 192 : identify_locking_dependencies(AH, te);
4960 : }
4961 8 : }
4962 :
4963 : /*
4964 : * Change dependencies on table items to depend on table data items instead,
4965 : * but only in POST_DATA items.
4966 : *
4967 : * Also, for any item having such dependency(s), set its dataLength to the
4968 : * largest dataLength of the table data items it depends on. This ensures
4969 : * that parallel restore will prioritize larger jobs (index builds, FK
4970 : * constraint checks, etc) over smaller ones, avoiding situations where we
4971 : * end a restore with only one active job working on a large table.
4972 : */
4973 : static void
4974 8 : repoint_table_dependencies(ArchiveHandle *AH)
4975 : {
4976 : TocEntry *te;
4977 : int i;
4978 : DumpId olddep;
4979 :
4980 200 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4981 : {
4982 192 : if (te->section != SECTION_POST_DATA)
4983 132 : continue;
4984 320 : for (i = 0; i < te->nDeps; i++)
4985 : {
4986 260 : olddep = te->dependencies[i];
4987 260 : if (olddep <= AH->maxDumpId &&
4988 260 : AH->tableDataId[olddep] != 0)
4989 : {
4990 124 : DumpId tabledataid = AH->tableDataId[olddep];
4991 124 : TocEntry *tabledatate = AH->tocsByDumpId[tabledataid];
4992 :
4993 124 : te->dependencies[i] = tabledataid;
4994 124 : te->dataLength = Max(te->dataLength, tabledatate->dataLength);
4995 124 : pg_log_debug("transferring dependency %d -> %d to %d",
4996 : te->dumpId, olddep, tabledataid);
4997 : }
4998 : }
4999 : }
5000 8 : }
5001 :
5002 : /*
5003 : * Identify which objects we'll need exclusive lock on in order to restore
5004 : * the given TOC entry (*other* than the one identified by the TOC entry
5005 : * itself). Record their dump IDs in the entry's lockDeps[] array.
5006 : */
5007 : static void
5008 192 : identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
5009 : {
5010 : DumpId *lockids;
5011 : int nlockids;
5012 : int i;
5013 :
5014 : /*
5015 : * We only care about this for POST_DATA items. PRE_DATA items are not
5016 : * run in parallel, and DATA items are all independent by assumption.
5017 : */
5018 192 : if (te->section != SECTION_POST_DATA)
5019 132 : return;
5020 :
5021 : /* Quick exit if no dependencies at all */
5022 60 : if (te->nDeps == 0)
5023 0 : return;
5024 :
5025 : /*
5026 : * Most POST_DATA items are ALTER TABLEs or some moral equivalent of that,
5027 : * and hence require exclusive lock. However, we know that CREATE INDEX
5028 : * does not. (Maybe someday index-creating CONSTRAINTs will fall in that
5029 : * category too ... but today is not that day.)
5030 : */
5031 60 : if (strcmp(te->desc, "INDEX") == 0)
5032 0 : return;
5033 :
5034 : /*
5035 : * We assume the entry requires exclusive lock on each TABLE or TABLE DATA
5036 : * item listed among its dependencies. Originally all of these would have
5037 : * been TABLE items, but repoint_table_dependencies would have repointed
5038 : * them to the TABLE DATA items if those are present (which they might not
5039 : * be, eg in a schema-only dump). Note that all of the entries we are
5040 : * processing here are POST_DATA; otherwise there might be a significant
5041 : * difference between a dependency on a table and a dependency on its
5042 : * data, so that closer analysis would be needed here.
5043 : */
5044 60 : lockids = (DumpId *) pg_malloc(te->nDeps * sizeof(DumpId));
5045 60 : nlockids = 0;
5046 320 : for (i = 0; i < te->nDeps; i++)
5047 : {
5048 260 : DumpId depid = te->dependencies[i];
5049 :
5050 260 : if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL &&
5051 260 : ((strcmp(AH->tocsByDumpId[depid]->desc, "TABLE DATA") == 0) ||
5052 136 : strcmp(AH->tocsByDumpId[depid]->desc, "TABLE") == 0))
5053 164 : lockids[nlockids++] = depid;
5054 : }
5055 :
5056 60 : if (nlockids == 0)
5057 : {
5058 0 : free(lockids);
5059 0 : return;
5060 : }
5061 :
5062 60 : te->lockDeps = pg_realloc(lockids, nlockids * sizeof(DumpId));
5063 60 : te->nLockDeps = nlockids;
5064 : }
5065 :
5066 : /*
5067 : * Remove the specified TOC entry from the depCounts of items that depend on
5068 : * it, thereby possibly making them ready-to-run. Any pending item that
5069 : * becomes ready should be moved to the ready_heap, if that's provided.
5070 : */
5071 : static void
5072 192 : reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
5073 : binaryheap *ready_heap)
5074 : {
5075 : int i;
5076 :
5077 192 : pg_log_debug("reducing dependencies for %d", te->dumpId);
5078 :
5079 576 : for (i = 0; i < te->nRevDeps; i++)
5080 : {
5081 384 : TocEntry *otherte = AH->tocsByDumpId[te->revDeps[i]];
5082 :
5083 : Assert(otherte->depCount > 0);
5084 384 : otherte->depCount--;
5085 :
5086 : /*
5087 : * It's ready if it has no remaining dependencies, and it belongs in
5088 : * the current restore pass, and it is currently a member of the
5089 : * pending list (that check is needed to prevent double restore in
5090 : * some cases where a list-file forces out-of-order restoring).
5091 : * However, if ready_heap == NULL then caller doesn't want any list
5092 : * memberships changed.
5093 : */
5094 384 : if (otherte->depCount == 0 &&
5095 148 : _tocEntryRestorePass(otherte) == AH->restorePass &&
5096 148 : otherte->pending_prev != NULL &&
5097 : ready_heap != NULL)
5098 : {
5099 : /* Remove it from pending list ... */
5100 52 : pending_list_remove(otherte);
5101 : /* ... and add to ready_heap */
5102 52 : binaryheap_add(ready_heap, otherte);
5103 : }
5104 : }
5105 192 : }
5106 :
5107 : /*
5108 : * Set the created flag on the DATA member corresponding to the given
5109 : * TABLE member
5110 : */
5111 : static void
5112 10498 : mark_create_done(ArchiveHandle *AH, TocEntry *te)
5113 : {
5114 10498 : if (AH->tableDataId[te->dumpId] != 0)
5115 : {
5116 7866 : TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
5117 :
5118 7866 : ted->created = true;
5119 : }
5120 10498 : }
5121 :
5122 : /*
5123 : * Mark the DATA member corresponding to the given TABLE member
5124 : * as not wanted
5125 : */
5126 : static void
5127 0 : inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
5128 : {
5129 0 : pg_log_info("table \"%s\" could not be created, will not restore its data",
5130 : te->tag);
5131 :
5132 0 : if (AH->tableDataId[te->dumpId] != 0)
5133 : {
5134 0 : TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
5135 :
5136 0 : ted->reqs = 0;
5137 : }
5138 0 : }
5139 :
5140 : /*
5141 : * Clone and de-clone routines used in parallel restoration.
5142 : *
5143 : * Enough of the structure is cloned to ensure that there is no
5144 : * conflict between different threads each with their own clone.
5145 : */
5146 : ArchiveHandle *
5147 52 : CloneArchive(ArchiveHandle *AH)
5148 : {
5149 : ArchiveHandle *clone;
5150 :
5151 : /* Make a "flat" copy */
5152 52 : clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle));
5153 52 : memcpy(clone, AH, sizeof(ArchiveHandle));
5154 :
5155 : /* Likewise flat-copy the RestoreOptions, so we can alter them locally */
5156 52 : clone->public.ropt = (RestoreOptions *) pg_malloc(sizeof(RestoreOptions));
5157 52 : memcpy(clone->public.ropt, AH->public.ropt, sizeof(RestoreOptions));
5158 :
5159 : /* Handle format-independent fields */
5160 52 : memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));
5161 :
5162 : /* The clone will have its own connection, so disregard connection state */
5163 52 : clone->connection = NULL;
5164 52 : clone->connCancel = NULL;
5165 52 : clone->currUser = NULL;
5166 52 : clone->currSchema = NULL;
5167 52 : clone->currTableAm = NULL;
5168 52 : clone->currTablespace = NULL;
5169 :
5170 : /* savedPassword must be local in case we change it while connecting */
5171 52 : if (clone->savedPassword)
5172 0 : clone->savedPassword = pg_strdup(clone->savedPassword);
5173 :
5174 : /* clone has its own error count, too */
5175 52 : clone->public.n_errors = 0;
5176 :
5177 : /* clones should not share lo_buf */
5178 52 : clone->lo_buf = NULL;
5179 :
5180 : /*
5181 : * Clone connections disregard --transaction-size; they must commit after
5182 : * each command so that the results are immediately visible to other
5183 : * workers.
5184 : */
5185 52 : clone->public.ropt->txn_size = 0;
5186 :
5187 : /*
5188 : * Connect our new clone object to the database, using the same connection
5189 : * parameters used for the original connection.
5190 : */
5191 52 : ConnectDatabaseAhx((Archive *) clone, &clone->public.ropt->cparams, true);
5192 :
5193 : /* re-establish fixed state */
5194 52 : if (AH->mode == archModeRead)
5195 20 : _doSetFixedOutputState(clone);
5196 : /* in write case, setupDumpWorker will fix up connection state */
5197 :
5198 : /* Let the format-specific code have a chance too */
5199 52 : clone->ClonePtr(clone);
5200 :
5201 : Assert(clone->connection != NULL);
5202 52 : return clone;
5203 : }
5204 :
5205 : /*
5206 : * Release clone-local storage.
5207 : *
5208 : * Note: we assume any clone-local connection was already closed.
5209 : */
5210 : void
5211 52 : DeCloneArchive(ArchiveHandle *AH)
5212 : {
5213 : /* Should not have an open database connection */
5214 : Assert(AH->connection == NULL);
5215 :
5216 : /* Clear format-specific state */
5217 52 : AH->DeClonePtr(AH);
5218 :
5219 : /* Clear state allocated by CloneArchive */
5220 52 : if (AH->sqlparse.curCmd)
5221 6 : destroyPQExpBuffer(AH->sqlparse.curCmd);
5222 :
5223 : /* Clear any connection-local state */
5224 52 : free(AH->currUser);
5225 52 : free(AH->currSchema);
5226 52 : free(AH->currTablespace);
5227 52 : free(AH->currTableAm);
5228 52 : free(AH->savedPassword);
5229 :
5230 52 : free(AH);
5231 52 : }
|