Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_backup_archiver.c
4 : *
5 : * Private implementation of the archiver routines.
6 : *
7 : * See the headers to pg_restore for more details.
8 : *
9 : * Copyright (c) 2000, Philip Warner
10 : * Rights are granted to use this software in any way so long
11 : * as this notice is not removed.
12 : *
13 : * The author is not responsible for loss or damages that may
14 : * result from its use.
15 : *
16 : *
17 : * IDENTIFICATION
18 : * src/bin/pg_dump/pg_backup_archiver.c
19 : *
20 : *-------------------------------------------------------------------------
21 : */
22 : #include "postgres_fe.h"
23 :
24 : #include <ctype.h>
25 : #include <fcntl.h>
26 : #include <unistd.h>
27 : #include <sys/stat.h>
28 : #include <sys/wait.h>
29 : #ifdef WIN32
30 : #include <io.h>
31 : #endif
32 :
33 : #include "catalog/pg_class_d.h"
34 : #include "catalog/pg_largeobject_metadata_d.h"
35 : #include "catalog/pg_shdepend_d.h"
36 : #include "common/string.h"
37 : #include "compress_io.h"
38 : #include "dumputils.h"
39 : #include "fe_utils/string_utils.h"
40 : #include "lib/binaryheap.h"
41 : #include "lib/stringinfo.h"
42 : #include "libpq/libpq-fs.h"
43 : #include "parallel.h"
44 : #include "pg_backup_archiver.h"
45 : #include "pg_backup_db.h"
46 : #include "pg_backup_utils.h"
47 :
48 : #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
49 : #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
50 :
51 : #define TOC_PREFIX_NONE ""
52 : #define TOC_PREFIX_DATA "Data for "
53 : #define TOC_PREFIX_STATS "Statistics for "
54 :
55 : static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
56 : const pg_compress_specification compression_spec,
57 : bool dosync, ArchiveMode mode,
58 : SetupWorkerPtrType setupWorkerPtr,
59 : DataDirSyncMethod sync_method);
60 : static void _getObjectDescription(PQExpBuffer buf, const TocEntry *te);
61 : static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, const char *pfx);
62 : static char *sanitize_line(const char *str, bool want_hyphen);
63 : static void _doSetFixedOutputState(ArchiveHandle *AH);
64 : static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
65 : static void _reconnectToDB(ArchiveHandle *AH, const char *dbname);
66 : static void _becomeUser(ArchiveHandle *AH, const char *user);
67 : static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
68 : static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
69 : static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
70 : static void _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam);
71 : static void _printTableAccessMethodNoStorage(ArchiveHandle *AH,
72 : TocEntry *te);
73 : static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
74 : static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
75 : static void processSearchPathEntry(ArchiveHandle *AH, TocEntry *te);
76 : static int _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH);
77 : static RestorePass _tocEntryRestorePass(TocEntry *te);
78 : static bool _tocEntryIsACL(TocEntry *te);
79 : static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
80 : static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
81 : static bool is_load_via_partition_root(TocEntry *te);
82 : static void buildTocEntryArrays(ArchiveHandle *AH);
83 : static void _moveBefore(TocEntry *pos, TocEntry *te);
84 : static int _discoverArchiveFormat(ArchiveHandle *AH);
85 :
86 : static int RestoringToDB(ArchiveHandle *AH);
87 : static void dump_lo_buf(ArchiveHandle *AH);
88 : static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
89 : static void SetOutput(ArchiveHandle *AH, const char *filename,
90 : const pg_compress_specification compression_spec);
91 : static CompressFileHandle *SaveOutput(ArchiveHandle *AH);
92 : static void RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput);
93 :
94 : static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
95 : static void restore_toc_entries_prefork(ArchiveHandle *AH,
96 : TocEntry *pending_list);
97 : static void restore_toc_entries_parallel(ArchiveHandle *AH,
98 : ParallelState *pstate,
99 : TocEntry *pending_list);
100 : static void restore_toc_entries_postfork(ArchiveHandle *AH,
101 : TocEntry *pending_list);
102 : static void pending_list_header_init(TocEntry *l);
103 : static void pending_list_append(TocEntry *l, TocEntry *te);
104 : static void pending_list_remove(TocEntry *te);
105 : static int TocEntrySizeCompareQsort(const void *p1, const void *p2);
106 : static int TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg);
107 : static void move_to_ready_heap(TocEntry *pending_list,
108 : binaryheap *ready_heap,
109 : RestorePass pass);
110 : static TocEntry *pop_next_work_item(binaryheap *ready_heap,
111 : ParallelState *pstate);
112 : static void mark_dump_job_done(ArchiveHandle *AH,
113 : TocEntry *te,
114 : int status,
115 : void *callback_data);
116 : static void mark_restore_job_done(ArchiveHandle *AH,
117 : TocEntry *te,
118 : int status,
119 : void *callback_data);
120 : static void fix_dependencies(ArchiveHandle *AH);
121 : static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
122 : static void repoint_table_dependencies(ArchiveHandle *AH);
123 : static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
124 : static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
125 : binaryheap *ready_heap);
126 : static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
127 : static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
128 :
129 : static void StrictNamesCheck(RestoreOptions *ropt);
130 :
131 :
132 : /*
133 : * Allocate a new DumpOptions block containing all default values.
134 : */
     : /* NOTE(review): pg_malloc presumably exits on OOM (fe_utils convention),
     :  * so no NULL check is needed here -- confirm against pg_malloc's contract. */
135 : DumpOptions *
136 118 : NewDumpOptions(void)
137 : {
138 118 : DumpOptions *opts = (DumpOptions *) pg_malloc(sizeof(DumpOptions));
139 :
     : /* InitDumpOptions zeroes the struct and then sets the non-zero defaults */
140 118 : InitDumpOptions(opts);
141 118 : return opts;
142 : }
143 :
144 : /*
145 : * Initialize a DumpOptions struct to all default values
146 : */
147 : void
148 576 : InitDumpOptions(DumpOptions *opts)
149 : {
150 576 : memset(opts, 0, sizeof(DumpOptions));
151 : /* set any fields that shouldn't default to zeroes */
152 576 : opts->include_everything = true;
153 576 : opts->cparams.promptPassword = TRI_DEFAULT;
154 576 : opts->dumpSections = DUMP_UNSECTIONED;
155 576 : opts->dumpSchema = true;
156 576 : opts->dumpData = true;
     : /* false is already the memset default; kept explicit for readability */
157 576 : opts->dumpStatistics = false;
158 576 : }
159 :
160 : /*
161 : * Create a freshly allocated DumpOptions with options equivalent to those
162 : * found in the given RestoreOptions.
163 : */
     : /* Connection-parameter strings are pg_strdup'd, so the returned copy's
     :  * lifetime is independent of the given RestoreOptions. */
164 : DumpOptions *
165 118 : dumpOptionsFromRestoreOptions(RestoreOptions *ropt)
166 : {
167 118 : DumpOptions *dopt = NewDumpOptions();
168 :
169 : /* this is the inverse of what's at the end of pg_dump.c's main() */
170 118 : dopt->cparams.dbname = ropt->cparams.dbname ? pg_strdup(ropt->cparams.dbname) : NULL;
171 118 : dopt->cparams.pgport = ropt->cparams.pgport ? pg_strdup(ropt->cparams.pgport) : NULL;
172 118 : dopt->cparams.pghost = ropt->cparams.pghost ? pg_strdup(ropt->cparams.pghost) : NULL;
173 118 : dopt->cparams.username = ropt->cparams.username ? pg_strdup(ropt->cparams.username) : NULL;
174 118 : dopt->cparams.promptPassword = ropt->cparams.promptPassword;
175 118 : dopt->outputClean = ropt->dropSchema;
176 118 : dopt->dumpData = ropt->dumpData;
177 118 : dopt->dumpSchema = ropt->dumpSchema;
178 118 : dopt->dumpSections = ropt->dumpSections;
179 118 : dopt->dumpStatistics = ropt->dumpStatistics;
180 118 : dopt->if_exists = ropt->if_exists;
181 118 : dopt->column_inserts = ropt->column_inserts;
182 118 : dopt->aclsSkip = ropt->aclsSkip;
183 118 : dopt->outputSuperuser = ropt->superuser;
184 118 : dopt->outputCreateDB = ropt->createDB;
185 118 : dopt->outputNoOwner = ropt->noOwner;
186 118 : dopt->outputNoTableAm = ropt->noTableAm;
187 118 : dopt->outputNoTablespaces = ropt->noTablespace;
188 118 : dopt->disable_triggers = ropt->disable_triggers;
189 118 : dopt->use_setsessauth = ropt->use_setsessauth;
190 118 : dopt->disable_dollar_quoting = ropt->disable_dollar_quoting;
191 118 : dopt->dump_inserts = ropt->dump_inserts;
192 118 : dopt->no_comments = ropt->no_comments;
193 118 : dopt->no_policies = ropt->no_policies;
194 118 : dopt->no_publications = ropt->no_publications;
195 118 : dopt->no_security_labels = ropt->no_security_labels;
196 118 : dopt->no_subscriptions = ropt->no_subscriptions;
197 118 : dopt->lockWaitTimeout = ropt->lockWaitTimeout;
198 118 : dopt->include_everything = ropt->include_everything;
199 118 : dopt->enable_row_security = ropt->enable_row_security;
200 118 : dopt->sequence_data = ropt->sequence_data;
201 :
202 118 : return dopt;
203 : }
204 :
205 :
206 : /*
207 : * Wrapper functions.
208 : *
209 : * The objective is to make writing new formats and dumpers as simple
210 : * as possible, if necessary at the expense of extra function calls etc.
211 : *
212 : */
213 :
214 : /*
215 : * The dump worker setup needs lots of knowledge of the internals of pg_dump,
216 : * so it's defined in pg_dump.c and passed into OpenArchive. The restore worker
217 : * setup doesn't need to know anything much, so it's defined here.
218 : */
219 : static void
220 24 : setupRestoreWorker(Archive *AHX)
221 : {
222 24 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
223 :
     : /* Reopen the archive input so this worker has its own handle
     :  * (cf. the ReopenPtr probe in RestoreArchive's parallel-mode setup). */
224 24 : AH->ReopenPtr(AH);
225 24 : }
226 :
227 :
228 : /* Create a new archive */
229 : /* Public */
     : /* Thin wrapper over _allocAH in the caller-specified (write) mode; the
     :  * setupDumpWorker hook is supplied by pg_dump.c for parallel workers. */
230 : Archive *
231 408 : CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
232 : const pg_compress_specification compression_spec,
233 : bool dosync, ArchiveMode mode,
234 : SetupWorkerPtrType setupDumpWorker,
235 : DataDirSyncMethod sync_method)
236 :
237 : {
238 408 : ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression_spec,
239 : dosync, mode, setupDumpWorker, sync_method);
240 :
241 406 : return (Archive *) AH;
242 : }
243 :
244 : /* Open an existing archive */
245 : /* Public */
     : /* Opens in archModeRead with a "no compression" spec and fsync-based
     :  * syncing; restore workers are set up via setupRestoreWorker above. */
246 : Archive *
247 114 : OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
248 : {
249 : ArchiveHandle *AH;
250 114 : pg_compress_specification compression_spec = {0};
251 :
252 114 : compression_spec.algorithm = PG_COMPRESSION_NONE;
253 114 : AH = _allocAH(FileSpec, fmt, compression_spec, true,
254 : archModeRead, setupRestoreWorker,
255 : DATA_DIR_SYNC_METHOD_FSYNC);
256 :
257 114 : return (Archive *) AH;
258 : }
259 :
260 : /* Public */
     : /* Close the archive via the format module, then close the output handle. */
261 : void
262 480 : CloseArchive(Archive *AHX)
263 : {
264 480 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
265 :
266 480 : AH->ClosePtr(AH);
267 :
268 : /* Close the output */
     : /* Clear errno first so a failure report via %m reflects this call only */
269 480 : errno = 0;
270 480 : if (!EndCompressFileHandle(AH->OF))
271 0 : pg_fatal("could not close output file: %m");
272 480 : }
273 :
274 : /* Public */
     : /* Attach dump and restore option structs to the archive handle. */
275 : void
276 896 : SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt)
277 : {
278 : /* Caller can omit dump options, in which case we synthesize them */
279 896 : if (dopt == NULL && ropt != NULL)
280 118 : dopt = dumpOptionsFromRestoreOptions(ropt);
281 :
282 : /* Save options for later access */
     : /* The handle stores the pointers directly (no copy), so the option
     :  * structs must remain valid for the life of the archive handle. */
283 896 : AH->dopt = dopt;
284 896 : AH->ropt = ropt;
285 896 : }
286 :
287 : /* Public */
     : /* Walk the TOC once, sanity-checking section ordering (when writing) and
     :  * computing each entry's "reqs" mask from the restore options. */
288 : void
289 474 : ProcessArchiveRestoreOptions(Archive *AHX)
290 : {
291 474 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
292 474 : RestoreOptions *ropt = AH->public.ropt;
293 : TocEntry *te;
294 : teSection curSection;
295 :
296 : /* Decide which TOC entries will be dumped/restored, and mark them */
297 474 : curSection = SECTION_PRE_DATA;
298 120626 : for (te = AH->toc->next; te != AH->toc; te = te->next)
299 : {
300 : /*
301 : * When writing an archive, we also take this opportunity to check
302 : * that we have generated the entries in a sane order that respects
303 : * the section divisions. When reading, don't complain, since buggy
304 : * old versions of pg_dump might generate out-of-order archives.
305 : */
306 120152 : if (AH->mode != archModeRead)
307 : {
308 100360 : switch (te->section)
309 : {
310 17250 : case SECTION_NONE:
311 : /* ok to be anywhere */
312 17250 : break;
313 47798 : case SECTION_PRE_DATA:
314 47798 : if (curSection != SECTION_PRE_DATA)
315 0 : pg_log_warning("archive items not in correct section order");
316 47798 : break;
317 18274 : case SECTION_DATA:
318 18274 : if (curSection == SECTION_POST_DATA)
319 0 : pg_log_warning("archive items not in correct section order");
320 18274 : break;
321 17038 : case SECTION_POST_DATA:
322 : /* ok no matter which section we were in */
323 17038 : break;
324 0 : default:
325 0 : pg_fatal("unexpected section code %d",
326 : (int) te->section);
327 : break;
328 : }
329 : }
330 :
     : /* SECTION_NONE entries are evaluated under the surrounding section;
     :  * curSection only advances on entries with an explicit section. */
331 120152 : if (te->section != SECTION_NONE)
332 101254 : curSection = te->section;
333 :
334 120152 : te->reqs = _tocEntryRequired(te, curSection, AH);
335 : }
336 :
337 : /* Enforce strict names checking */
338 474 : if (ropt->strict_names)
339 0 : StrictNamesCheck(ropt);
340 474 : }
341 :
342 : /* Public */
     : /* Main driver for a restore: validates prerequisites, optionally connects
     :  * to the target database, emits DROPs (reverse TOC order), then restores
     :  * entries either in parallel or serially in three passes (main/ACL/post-ACL). */
343 : void
344 370 : RestoreArchive(Archive *AHX)
345 : {
346 370 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
347 370 : RestoreOptions *ropt = AH->public.ropt;
348 : bool parallel_mode;
349 : TocEntry *te;
350 : CompressFileHandle *sav;
351 :
352 370 : AH->stage = STAGE_INITIALIZING;
353 :
354 : /*
355 : * If we're going to do parallel restore, there are some restrictions.
356 : */
     : /* Parallel restore requires a direct DB connection (useDB) plus workers */
357 370 : parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
358 370 : if (parallel_mode)
359 : {
360 : /* We haven't got round to making this work for all archive formats */
361 10 : if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
362 0 : pg_fatal("parallel restore is not supported with this archive file format");
363 :
364 : /* Doesn't work if the archive represents dependencies as OIDs */
365 10 : if (AH->version < K_VERS_1_8)
366 0 : pg_fatal("parallel restore is not supported with archives made by pre-8.0 pg_dump");
367 :
368 : /*
369 : * It's also not gonna work if we can't reopen the input file, so
370 : * let's try that immediately.
371 : */
372 10 : AH->ReopenPtr(AH);
373 : }
374 :
375 : /*
376 : * Make sure we won't need (de)compression we haven't got
377 : */
378 370 : if (AH->PrintTocDataPtr != NULL)
379 : {
380 66636 : for (te = AH->toc->next; te != AH->toc; te = te->next)
381 : {
382 66526 : if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
383 : {
384 260 : char *errmsg = supports_compression(AH->compression_spec);
385 :
386 260 : if (errmsg)
387 0 : pg_fatal("cannot restore from compressed archive (%s)",
388 : errmsg);
389 : else
390 260 : break;
391 : }
392 : }
393 : }
394 :
395 : /*
396 : * Prepare index arrays, so we can assume we have them throughout restore.
397 : * It's possible we already did this, though.
398 : */
399 370 : if (AH->tocsByDumpId == NULL)
400 366 : buildTocEntryArrays(AH);
401 :
402 : /*
403 : * If we're using a DB connection, then connect it.
404 : */
405 370 : if (ropt->useDB)
406 : {
407 66 : pg_log_info("connecting to database for restore");
408 66 : if (AH->version < K_VERS_1_3)
409 0 : pg_fatal("direct database connections are not supported in pre-1.3 archives");
410 :
411 : /*
412 : * We don't want to guess at whether the dump will successfully
413 : * restore; allow the attempt regardless of the version of the restore
414 : * target.
415 : */
416 66 : AHX->minRemoteVersion = 0;
417 66 : AHX->maxRemoteVersion = 9999999;
418 :
419 66 : ConnectDatabaseAhx(AHX, &ropt->cparams, false);
420 :
421 : /*
422 : * If we're talking to the DB directly, don't send comments since they
423 : * obscure SQL when displaying errors
424 : */
425 66 : AH->noTocComments = 1;
426 : }
427 :
428 : /*
429 : * Work out if we have an implied schema-less restore. This can happen if
430 : * the dump excluded the schema or the user has used a toc list to exclude
431 : * all of the schema data. All we do is look for schema entries - if none
432 : * are found then we unset the dumpSchema flag.
433 : *
434 : * We could scan for wanted TABLE entries, but that is not the same as
435 : * data-only. At this stage, it seems unnecessary (6-Mar-2001).
436 : */
437 370 : if (ropt->dumpSchema)
438 : {
439 352 : bool no_schema_found = true;
440 :
441 2878 : for (te = AH->toc->next; te != AH->toc; te = te->next)
442 : {
443 2836 : if ((te->reqs & REQ_SCHEMA) != 0)
444 : {
445 310 : no_schema_found = false;
446 310 : break;
447 : }
448 : }
449 352 : if (no_schema_found)
450 : {
451 42 : ropt->dumpSchema = false;
452 42 : pg_log_info("implied no-schema restore");
453 : }
454 : }
455 :
456 : /*
457 : * Setup the output file if necessary.
458 : */
     : /* Remember the current output handle so RestoreOutput() can put it back */
459 370 : sav = SaveOutput(AH);
460 370 : if (ropt->filename || ropt->compression_spec.algorithm != PG_COMPRESSION_NONE)
461 282 : SetOutput(AH, ropt->filename, ropt->compression_spec);
462 :
463 370 : ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
464 :
465 370 : if (AH->archiveRemoteVersion)
466 370 : ahprintf(AH, "-- Dumped from database version %s\n",
467 : AH->archiveRemoteVersion);
468 370 : if (AH->archiveDumpVersion)
469 370 : ahprintf(AH, "-- Dumped by pg_dump version %s\n",
470 : AH->archiveDumpVersion);
471 :
472 370 : ahprintf(AH, "\n");
473 :
474 370 : if (AH->public.verbose)
475 78 : dumpTimestamp(AH, "Started on", AH->createDate);
476 :
477 370 : if (ropt->single_txn)
478 : {
479 0 : if (AH->connection)
480 0 : StartTransaction(AHX);
481 : else
482 0 : ahprintf(AH, "BEGIN;\n\n");
483 : }
484 :
485 : /*
486 : * Establish important parameter values right away.
487 : */
488 370 : _doSetFixedOutputState(AH);
489 :
490 370 : AH->stage = STAGE_PROCESSING;
491 :
492 : /*
493 : * Drop the items at the start, in reverse order
494 : */
495 370 : if (ropt->dropSchema)
496 : {
497 2790 : for (te = AH->toc->prev; te != AH->toc; te = te->prev)
498 : {
499 2744 : AH->currentTE = te;
500 :
501 : /*
502 : * In createDB mode, issue a DROP *only* for the database as a
503 : * whole. Issuing drops against anything else would be wrong,
504 : * because at this point we're connected to the wrong database.
505 : * (The DATABASE PROPERTIES entry, if any, should be treated like
506 : * the DATABASE entry.)
507 : */
508 2744 : if (ropt->createDB)
509 : {
510 1178 : if (strcmp(te->desc, "DATABASE") != 0 &&
511 1142 : strcmp(te->desc, "DATABASE PROPERTIES") != 0)
512 1110 : continue;
513 : }
514 :
515 : /* Otherwise, drop anything that's selected and has a dropStmt */
516 1634 : if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
517 : {
518 664 : bool not_allowed_in_txn = false;
519 :
520 664 : pg_log_info("dropping %s %s", te->desc, te->tag);
521 :
522 : /*
523 : * In --transaction-size mode, we have to temporarily exit our
524 : * transaction block to drop objects that can't be dropped
525 : * within a transaction.
526 : */
527 664 : if (ropt->txn_size > 0)
528 : {
529 64 : if (strcmp(te->desc, "DATABASE") == 0 ||
530 32 : strcmp(te->desc, "DATABASE PROPERTIES") == 0)
531 : {
532 64 : not_allowed_in_txn = true;
533 64 : if (AH->connection)
534 64 : CommitTransaction(AHX);
535 : else
536 0 : ahprintf(AH, "COMMIT;\n");
537 : }
538 : }
539 :
540 : /* Select owner and schema as necessary */
541 664 : _becomeOwner(AH, te);
542 664 : _selectOutputSchema(AH, te->namespace);
543 :
544 : /*
545 : * Now emit the DROP command, if the object has one. Note we
546 : * don't necessarily emit it verbatim; at this point we add an
547 : * appropriate IF EXISTS clause, if the user requested it.
548 : */
549 664 : if (strcmp(te->desc, "BLOB METADATA") == 0)
550 : {
551 : /* We must generate the per-blob commands */
552 8 : if (ropt->if_exists)
553 4 : IssueCommandPerBlob(AH, te,
554 : "SELECT pg_catalog.lo_unlink(oid) "
555 : "FROM pg_catalog.pg_largeobject_metadata "
556 : "WHERE oid = '", "'");
557 : else
558 4 : IssueCommandPerBlob(AH, te,
559 : "SELECT pg_catalog.lo_unlink('",
560 : "')");
561 : }
562 656 : else if (*te->dropStmt != '\0')
563 : {
564 608 : if (!ropt->if_exists ||
565 278 : strncmp(te->dropStmt, "--", 2) == 0)
566 : {
567 : /*
568 : * Without --if-exists, or if it's just a comment (as
569 : * happens for the public schema), print the dropStmt
570 : * as-is.
571 : */
572 332 : ahprintf(AH, "%s", te->dropStmt);
573 : }
574 : else
575 : {
576 : /*
577 : * Inject an appropriate spelling of "if exists". For
578 : * old-style large objects, we have a routine that
579 : * knows how to do it, without depending on
580 : * te->dropStmt; use that. For other objects we need
581 : * to parse the command.
582 : */
583 276 : if (strcmp(te->desc, "BLOB") == 0)
584 : {
585 0 : DropLOIfExists(AH, te->catalogId.oid);
586 : }
587 : else
588 : {
589 276 : char *dropStmt = pg_strdup(te->dropStmt);
590 276 : char *dropStmtOrig = dropStmt;
591 276 : PQExpBuffer ftStmt = createPQExpBuffer();
592 :
593 : /*
594 : * Need to inject IF EXISTS clause after ALTER
595 : * TABLE part in ALTER TABLE .. DROP statement
596 : */
597 276 : if (strncmp(dropStmt, "ALTER TABLE", 11) == 0)
598 : {
599 38 : appendPQExpBufferStr(ftStmt,
600 : "ALTER TABLE IF EXISTS");
601 38 : dropStmt = dropStmt + 11;
602 : }
603 :
604 : /*
605 : * ALTER TABLE..ALTER COLUMN..DROP DEFAULT does
606 : * not support the IF EXISTS clause, and therefore
607 : * we simply emit the original command for DEFAULT
608 : * objects (modulo the adjustment made above).
609 : *
610 : * Likewise, don't mess with DATABASE PROPERTIES.
611 : *
612 : * If we used CREATE OR REPLACE VIEW as a means of
613 : * quasi-dropping an ON SELECT rule, that should
614 : * be emitted unchanged as well.
615 : *
616 : * For other object types, we need to extract the
617 : * first part of the DROP which includes the
618 : * object type. Most of the time this matches
619 : * te->desc, so search for that; however for the
620 : * different kinds of CONSTRAINTs, we know to
621 : * search for hardcoded "DROP CONSTRAINT" instead.
622 : */
623 276 : if (strcmp(te->desc, "DEFAULT") == 0 ||
624 270 : strcmp(te->desc, "DATABASE PROPERTIES") == 0 ||
625 270 : strncmp(dropStmt, "CREATE OR REPLACE VIEW", 22) == 0)
626 6 : appendPQExpBufferStr(ftStmt, dropStmt);
627 : else
628 : {
629 : char buffer[40];
630 : char *mark;
631 :
632 270 : if (strcmp(te->desc, "CONSTRAINT") == 0 ||
633 242 : strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
634 242 : strcmp(te->desc, "FK CONSTRAINT") == 0)
635 32 : strcpy(buffer, "DROP CONSTRAINT");
636 : else
637 238 : snprintf(buffer, sizeof(buffer), "DROP %s",
638 : te->desc);
639 :
640 270 : mark = strstr(dropStmt, buffer);
641 :
642 270 : if (mark)
643 : {
644 270 : *mark = '\0';
645 270 : appendPQExpBuffer(ftStmt, "%s%s IF EXISTS%s",
646 : dropStmt, buffer,
647 270 : mark + strlen(buffer));
648 : }
649 : else
650 : {
651 : /* complain and emit unmodified command */
652 0 : pg_log_warning("could not find where to insert IF EXISTS in statement \"%s\"",
653 : dropStmtOrig);
654 0 : appendPQExpBufferStr(ftStmt, dropStmt);
655 : }
656 : }
657 :
658 276 : ahprintf(AH, "%s", ftStmt->data);
659 :
660 276 : destroyPQExpBuffer(ftStmt);
661 276 : pg_free(dropStmtOrig);
662 : }
663 : }
664 : }
665 :
666 : /*
667 : * In --transaction-size mode, re-establish the transaction
668 : * block if needed; otherwise, commit after every N drops.
669 : */
670 664 : if (ropt->txn_size > 0)
671 : {
672 64 : if (not_allowed_in_txn)
673 : {
674 64 : if (AH->connection)
675 64 : StartTransaction(AHX);
676 : else
677 0 : ahprintf(AH, "BEGIN;\n");
678 64 : AH->txnCount = 0;
679 : }
680 0 : else if (++AH->txnCount >= ropt->txn_size)
681 : {
682 0 : if (AH->connection)
683 : {
684 0 : CommitTransaction(AHX);
685 0 : StartTransaction(AHX);
686 : }
687 : else
688 0 : ahprintf(AH, "COMMIT;\nBEGIN;\n");
689 0 : AH->txnCount = 0;
690 : }
691 : }
692 : }
693 : }
694 :
695 : /*
696 : * _selectOutputSchema may have set currSchema to reflect the effect
697 : * of a "SET search_path" command it emitted. However, by now we may
698 : * have dropped that schema; or it might not have existed in the first
699 : * place. In either case the effective value of search_path will not
700 : * be what we think. Forcibly reset currSchema so that we will
701 : * re-establish the search_path setting when needed (after creating
702 : * the schema).
703 : *
704 : * If we treated users as pg_dump'able objects then we'd need to reset
705 : * currUser here too.
706 : */
707 46 : free(AH->currSchema);
708 46 : AH->currSchema = NULL;
709 : }
710 :
711 370 : if (parallel_mode)
712 : {
713 : /*
714 : * In parallel mode, turn control over to the parallel-restore logic.
715 : */
716 : ParallelState *pstate;
717 : TocEntry pending_list;
718 :
719 : /* The archive format module may need some setup for this */
720 10 : if (AH->PrepParallelRestorePtr)
721 10 : AH->PrepParallelRestorePtr(AH);
722 :
723 10 : pending_list_header_init(&pending_list);
724 :
725 : /* This runs PRE_DATA items and then disconnects from the database */
726 10 : restore_toc_entries_prefork(AH, &pending_list);
727 : Assert(AH->connection == NULL);
728 :
729 : /* ParallelBackupStart() will actually fork the processes */
730 10 : pstate = ParallelBackupStart(AH);
731 10 : restore_toc_entries_parallel(AH, pstate, &pending_list);
732 10 : ParallelBackupEnd(AH, pstate);
733 :
734 : /* reconnect the leader and see if we missed something */
735 10 : restore_toc_entries_postfork(AH, &pending_list);
736 : Assert(AH->connection != NULL);
737 : }
738 : else
739 : {
740 : /*
741 : * In serial mode, process everything in three phases: normal items,
742 : * then ACLs, then post-ACL items. We might be able to skip one or
743 : * both extra phases in some cases, eg data-only restores.
744 : */
745 360 : bool haveACL = false;
746 360 : bool havePostACL = false;
747 :
748 95854 : for (te = AH->toc->next; te != AH->toc; te = te->next)
749 : {
750 95496 : if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) == 0)
751 2942 : continue; /* ignore if not to be dumped at all */
752 :
753 92554 : switch (_tocEntryRestorePass(te))
754 : {
755 85284 : case RESTORE_PASS_MAIN:
756 85284 : (void) restore_toc_entry(AH, te, false);
757 85282 : break;
758 4068 : case RESTORE_PASS_ACL:
759 4068 : haveACL = true;
760 4068 : break;
761 3202 : case RESTORE_PASS_POST_ACL:
762 3202 : havePostACL = true;
763 3202 : break;
764 : }
765 : }
766 :
767 358 : if (haveACL)
768 : {
769 92960 : for (te = AH->toc->next; te != AH->toc; te = te->next)
770 : {
771 183508 : if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0 &&
772 90708 : _tocEntryRestorePass(te) == RESTORE_PASS_ACL)
773 4068 : (void) restore_toc_entry(AH, te, false);
774 : }
775 : }
776 :
777 358 : if (havePostACL)
778 : {
779 64498 : for (te = AH->toc->next; te != AH->toc; te = te->next)
780 : {
781 127930 : if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0 &&
782 63530 : _tocEntryRestorePass(te) == RESTORE_PASS_POST_ACL)
783 3202 : (void) restore_toc_entry(AH, te, false);
784 : }
785 : }
786 : }
787 :
788 : /*
789 : * Close out any persistent transaction we may have. While these two
790 : * cases are started in different places, we can end both cases here.
791 : */
792 368 : if (ropt->single_txn || ropt->txn_size > 0)
793 : {
794 56 : if (AH->connection)
795 56 : CommitTransaction(AHX);
796 : else
797 0 : ahprintf(AH, "COMMIT;\n\n");
798 : }
799 :
800 368 : if (AH->public.verbose)
801 78 : dumpTimestamp(AH, "Completed on", time(NULL));
802 :
803 368 : ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n");
804 :
805 : /*
806 : * Clean up & we're done.
807 : */
808 368 : AH->stage = STAGE_FINALIZING;
809 :
810 368 : if (ropt->filename || ropt->compression_spec.algorithm != PG_COMPRESSION_NONE)
811 282 : RestoreOutput(AH, sav);
812 :
813 368 : if (ropt->useDB)
814 66 : DisconnectDatabase(&AH->public);
815 368 : }
816 :
817 : /*
818 : * Restore a single TOC item. Used in both parallel and non-parallel restore;
819 : * is_parallel is true if we are in a worker child process.
820 : *
821 : * Returns 0 normally, but WORKER_CREATE_DONE or WORKER_INHIBIT_DATA if
822 : * the parallel parent has to make the corresponding status update.
823 : */
824 : static int
825 98264 : restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
826 : {
827 98264 : RestoreOptions *ropt = AH->public.ropt;
828 98264 : int status = WORKER_OK;
829 : int reqs;
830 : bool defnDumped;
831 :
832 98264 : AH->currentTE = te;
833 :
834 : /* Dump any relevant dump warnings to stderr */
835 98264 : if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
836 : {
837 0 : if (ropt->dumpSchema && te->defn != NULL && strlen(te->defn) != 0)
838 0 : pg_log_warning("warning from original dump file: %s", te->defn);
839 0 : else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
840 0 : pg_log_warning("warning from original dump file: %s", te->copyStmt);
841 : }
842 :
843 : /* Work out what, if anything, we want from this entry */
844 98264 : reqs = te->reqs;
845 :
846 98264 : defnDumped = false;
847 :
848 : /*
849 : * If it has a schema component that we want, then process that
850 : */
851 98264 : if ((reqs & REQ_SCHEMA) != 0)
852 : {
853 77500 : bool object_is_db = false;
854 :
855 : /*
856 : * In --transaction-size mode, must exit our transaction block to
857 : * create a database or set its properties.
858 : */
859 77500 : if (strcmp(te->desc, "DATABASE") == 0 ||
860 77378 : strcmp(te->desc, "DATABASE PROPERTIES") == 0)
861 : {
862 188 : object_is_db = true;
863 188 : if (ropt->txn_size > 0)
864 : {
865 112 : if (AH->connection)
866 112 : CommitTransaction(&AH->public);
867 : else
868 0 : ahprintf(AH, "COMMIT;\n\n");
869 : }
870 : }
871 :
872 : /* Show namespace in log message if available */
873 77500 : if (te->namespace)
874 74512 : pg_log_info("creating %s \"%s.%s\"",
875 : te->desc, te->namespace, te->tag);
876 : else
877 2988 : pg_log_info("creating %s \"%s\"",
878 : te->desc, te->tag);
879 :
880 77500 : _printTocEntry(AH, te, TOC_PREFIX_NONE);
881 77500 : defnDumped = true;
882 :
883 77500 : if (strcmp(te->desc, "TABLE") == 0)
884 : {
885 14998 : if (AH->lastErrorTE == te)
886 : {
887 : /*
888 : * We failed to create the table. If
889 : * --no-data-for-failed-tables was given, mark the
890 : * corresponding TABLE DATA to be ignored.
891 : *
892 : * In the parallel case this must be done in the parent, so we
893 : * just set the return value.
894 : */
895 0 : if (ropt->noDataForFailedTables)
896 : {
897 0 : if (is_parallel)
898 0 : status = WORKER_INHIBIT_DATA;
899 : else
900 0 : inhibit_data_for_failed_table(AH, te);
901 : }
902 : }
903 : else
904 : {
905 : /*
906 : * We created the table successfully. Mark the corresponding
907 : * TABLE DATA for possible truncation.
908 : *
909 : * In the parallel case this must be done in the parent, so we
910 : * just set the return value.
911 : */
912 14998 : if (is_parallel)
913 0 : status = WORKER_CREATE_DONE;
914 : else
915 14998 : mark_create_done(AH, te);
916 : }
917 : }
918 :
919 : /*
920 : * If we created a DB, connect to it. Also, if we changed DB
921 : * properties, reconnect to ensure that relevant GUC settings are
922 : * applied to our session. (That also restarts the transaction block
923 : * in --transaction-size mode.)
924 : */
925 77500 : if (object_is_db)
926 : {
927 188 : pg_log_info("connecting to new database \"%s\"", te->tag);
928 188 : _reconnectToDB(AH, te->tag);
929 : }
930 : }
931 :
932 : /*
933 : * If it has a data component that we want, then process that
934 : */
935 98264 : if ((reqs & REQ_DATA) != 0)
936 : {
937 : /*
938 : * hadDumper will be set if there is genuine data component for this
939 : * node. Otherwise, we need to check the defn field for statements
940 : * that need to be executed in data-only restores.
941 : */
942 13926 : if (te->hadDumper)
943 : {
944 : /*
945 : * If we can output the data, then restore it.
946 : */
947 12490 : if (AH->PrintTocDataPtr != NULL)
948 : {
949 12490 : _printTocEntry(AH, te, TOC_PREFIX_DATA);
950 :
951 12490 : if (strcmp(te->desc, "BLOBS") == 0 ||
952 12336 : strcmp(te->desc, "BLOB COMMENTS") == 0)
953 : {
954 154 : pg_log_info("processing %s", te->desc);
955 :
956 154 : _selectOutputSchema(AH, "pg_catalog");
957 :
958 : /* Send BLOB COMMENTS data to ExecuteSimpleCommands() */
959 154 : if (strcmp(te->desc, "BLOB COMMENTS") == 0)
960 0 : AH->outputKind = OUTPUT_OTHERDATA;
961 :
962 154 : AH->PrintTocDataPtr(AH, te);
963 :
964 154 : AH->outputKind = OUTPUT_SQLCMDS;
965 : }
966 : else
967 : {
968 : bool use_truncate;
969 :
970 12336 : _disableTriggersIfNecessary(AH, te);
971 :
972 : /* Select owner and schema as necessary */
973 12336 : _becomeOwner(AH, te);
974 12336 : _selectOutputSchema(AH, te->namespace);
975 :
976 12336 : pg_log_info("processing data for table \"%s.%s\"",
977 : te->namespace, te->tag);
978 :
979 : /*
980 : * In parallel restore, if we created the table earlier in
981 : * this run (so that we know it is empty) and we are not
982 : * restoring a load-via-partition-root data item then we
983 : * wrap the COPY in a transaction and precede it with a
984 : * TRUNCATE. If wal_level is set to minimal this prevents
985 : * WAL-logging the COPY. This obtains a speedup similar
986 : * to that from using single_txn mode in non-parallel
987 : * restores.
988 : *
989 : * We mustn't do this for load-via-partition-root cases
990 : * because some data might get moved across partition
991 : * boundaries, risking deadlock and/or loss of previously
992 : * loaded data. (We assume that all partitions of a
993 : * partitioned table will be treated the same way.)
994 : */
995 13646 : use_truncate = is_parallel && te->created &&
996 1310 : !is_load_via_partition_root(te);
997 :
998 12336 : if (use_truncate)
999 : {
1000 : /*
1001 : * Parallel restore is always talking directly to a
1002 : * server, so no need to see if we should issue BEGIN.
1003 : */
1004 1298 : StartTransaction(&AH->public);
1005 :
1006 : /*
1007 : * Issue TRUNCATE with ONLY so that child tables are
1008 : * not wiped.
1009 : */
1010 1298 : ahprintf(AH, "TRUNCATE TABLE ONLY %s;\n\n",
1011 1298 : fmtQualifiedId(te->namespace, te->tag));
1012 : }
1013 :
1014 : /*
1015 : * If we have a copy statement, use it.
1016 : */
1017 12336 : if (te->copyStmt && strlen(te->copyStmt) > 0)
1018 : {
1019 12174 : ahprintf(AH, "%s", te->copyStmt);
1020 12174 : AH->outputKind = OUTPUT_COPYDATA;
1021 : }
1022 : else
1023 162 : AH->outputKind = OUTPUT_OTHERDATA;
1024 :
1025 12336 : AH->PrintTocDataPtr(AH, te);
1026 :
1027 : /*
1028 : * Terminate COPY if needed.
1029 : */
1030 24506 : if (AH->outputKind == OUTPUT_COPYDATA &&
1031 12172 : RestoringToDB(AH))
1032 1408 : EndDBCopyMode(&AH->public, te->tag);
1033 12334 : AH->outputKind = OUTPUT_SQLCMDS;
1034 :
1035 : /* close out the transaction started above */
1036 12334 : if (use_truncate)
1037 1298 : CommitTransaction(&AH->public);
1038 :
1039 12334 : _enableTriggersIfNecessary(AH, te);
1040 : }
1041 : }
1042 : }
1043 1436 : else if (!defnDumped)
1044 : {
1045 : /* If we haven't already dumped the defn part, do so now */
1046 1436 : pg_log_info("executing %s %s", te->desc, te->tag);
1047 1436 : _printTocEntry(AH, te, TOC_PREFIX_NONE);
1048 : }
1049 : }
1050 :
1051 : /*
1052 : * If it has a statistics component that we want, then process that
1053 : */
1054 98262 : if ((reqs & REQ_STATS) != 0)
1055 6802 : _printTocEntry(AH, te, TOC_PREFIX_STATS);
1056 :
1057 : /*
1058 : * If we emitted anything for this TOC entry, that counts as one action
1059 : * against the transaction-size limit. Commit if it's time to.
1060 : */
1061 98262 : if ((reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0 && ropt->txn_size > 0)
1062 : {
1063 6722 : if (++AH->txnCount >= ropt->txn_size)
1064 : {
1065 20 : if (AH->connection)
1066 : {
1067 20 : CommitTransaction(&AH->public);
1068 20 : StartTransaction(&AH->public);
1069 : }
1070 : else
1071 0 : ahprintf(AH, "COMMIT;\nBEGIN;\n\n");
1072 20 : AH->txnCount = 0;
1073 : }
1074 : }
1075 :
1076 98262 : if (AH->public.n_errors > 0 && status == WORKER_OK)
1077 0 : status = WORKER_IGNORED_ERRORS;
1078 :
1079 98262 : return status;
1080 : }
1081 :
1082 : /*
1083 : * Allocate a new RestoreOptions block.
1084 : * This is mainly so we can initialize it, but also for future expansion,
1085 : */
1086 : RestoreOptions *
1087 554 : NewRestoreOptions(void)
1088 : {
1089 : RestoreOptions *opts;
1090 :
1091 554 : opts = (RestoreOptions *) pg_malloc0(sizeof(RestoreOptions));
1092 :
1093 : /* set any fields that shouldn't default to zeroes */
1094 554 : opts->format = archUnknown;
1095 554 : opts->cparams.promptPassword = TRI_DEFAULT;
1096 554 : opts->dumpSections = DUMP_UNSECTIONED;
1097 554 : opts->compression_spec.algorithm = PG_COMPRESSION_NONE;
1098 554 : opts->compression_spec.level = 0;
1099 554 : opts->dumpSchema = true;
1100 554 : opts->dumpData = true;
1101 554 : opts->dumpStatistics = true;
1102 :
1103 554 : return opts;
1104 : }
1105 :
1106 : static void
1107 12336 : _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
1108 : {
1109 12336 : RestoreOptions *ropt = AH->public.ropt;
1110 :
1111 : /* This hack is only needed in a data-only restore */
1112 12336 : if (ropt->dumpSchema || !ropt->disable_triggers)
1113 12264 : return;
1114 :
1115 72 : pg_log_info("disabling triggers for %s", te->tag);
1116 :
1117 : /*
1118 : * Become superuser if possible, since they are the only ones who can
1119 : * disable constraint triggers. If -S was not given, assume the initial
1120 : * user identity is a superuser. (XXX would it be better to become the
1121 : * table owner?)
1122 : */
1123 72 : _becomeUser(AH, ropt->superuser);
1124 :
1125 : /*
1126 : * Disable them.
1127 : */
1128 72 : ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n",
1129 72 : fmtQualifiedId(te->namespace, te->tag));
1130 : }
1131 :
1132 : static void
1133 12334 : _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
1134 : {
1135 12334 : RestoreOptions *ropt = AH->public.ropt;
1136 :
1137 : /* This hack is only needed in a data-only restore */
1138 12334 : if (ropt->dumpSchema || !ropt->disable_triggers)
1139 12262 : return;
1140 :
1141 72 : pg_log_info("enabling triggers for %s", te->tag);
1142 :
1143 : /*
1144 : * Become superuser if possible, since they are the only ones who can
1145 : * disable constraint triggers. If -S was not given, assume the initial
1146 : * user identity is a superuser. (XXX would it be better to become the
1147 : * table owner?)
1148 : */
1149 72 : _becomeUser(AH, ropt->superuser);
1150 :
1151 : /*
1152 : * Enable them.
1153 : */
1154 72 : ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n",
1155 72 : fmtQualifiedId(te->namespace, te->tag));
1156 : }
1157 :
1158 : /*
1159 : * Detect whether a TABLE DATA TOC item is performing "load via partition
1160 : * root", that is the target table is an ancestor partition rather than the
1161 : * table the TOC item is nominally for.
1162 : *
1163 : * In newer archive files this can be detected by checking for a special
1164 : * comment placed in te->defn. In older files we have to fall back to seeing
1165 : * if the COPY statement targets the named table or some other one. This
1166 : * will not work for data dumped as INSERT commands, so we could give a false
1167 : * negative in that case; fortunately, that's a rarely-used option.
1168 : */
1169 : static bool
1170 1310 : is_load_via_partition_root(TocEntry *te)
1171 : {
1172 1310 : if (te->defn &&
1173 12 : strncmp(te->defn, "-- load via partition root ", 27) == 0)
1174 12 : return true;
1175 1298 : if (te->copyStmt && *te->copyStmt)
1176 : {
1177 1290 : PQExpBuffer copyStmt = createPQExpBuffer();
1178 : bool result;
1179 :
1180 : /*
1181 : * Build the initial part of the COPY as it would appear if the
1182 : * nominal target table is the actual target. If we see anything
1183 : * else, it must be a load-via-partition-root case.
1184 : */
1185 1290 : appendPQExpBuffer(copyStmt, "COPY %s ",
1186 1290 : fmtQualifiedId(te->namespace, te->tag));
1187 1290 : result = strncmp(te->copyStmt, copyStmt->data, copyStmt->len) != 0;
1188 1290 : destroyPQExpBuffer(copyStmt);
1189 1290 : return result;
1190 : }
1191 : /* Assume it's not load-via-partition-root */
1192 8 : return false;
1193 : }
1194 :
1195 : /*
1196 : * This is a routine that is part of the dumper interface, hence the 'Archive*' parameter.
1197 : */
1198 :
1199 : /* Public */
1200 : void
1201 6182270 : WriteData(Archive *AHX, const void *data, size_t dLen)
1202 : {
1203 6182270 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1204 :
1205 6182270 : if (!AH->currToc)
1206 0 : pg_fatal("internal error -- WriteData cannot be called outside the context of a DataDumper routine");
1207 :
1208 6182270 : AH->WriteDataPtr(AH, data, dLen);
1209 6182270 : }
1210 :
1211 : /*
1212 : * Create a new TOC entry. The TOC was designed as a TOC, but is now the
1213 : * repository for all metadata. But the name has stuck.
1214 : *
1215 : * The new entry is added to the Archive's TOC list. Most callers can ignore
1216 : * the result value because nothing else need be done, but a few want to
1217 : * manipulate the TOC entry further.
1218 : */
1219 :
1220 : /* Public */
TocEntry *
ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId,
			 ArchiveOpts *opts)
{
	ArchiveHandle *AH = (ArchiveHandle *) AHX;
	TocEntry   *newToc;

	/* Zero-filled allocation, so any field not set below is NULL/0/false */
	newToc = (TocEntry *) pg_malloc0(sizeof(TocEntry));

	AH->tocCount++;
	/* keep track of the highest dump ID in use */
	if (dumpId > AH->maxDumpId)
		AH->maxDumpId = dumpId;

	/* link the new entry at the tail of the circular doubly-linked TOC list */
	newToc->prev = AH->toc->prev;
	newToc->next = AH->toc;
	AH->toc->prev->next = newToc;
	AH->toc->prev = newToc;

	/* identity of the dumped object */
	newToc->catalogId = catalogId;
	newToc->dumpId = dumpId;
	newToc->section = opts->section;

	/* copy string fields; the optional ones may be absent (NULL) */
	newToc->tag = pg_strdup(opts->tag);
	newToc->namespace = opts->namespace ? pg_strdup(opts->namespace) : NULL;
	newToc->tablespace = opts->tablespace ? pg_strdup(opts->tablespace) : NULL;
	newToc->tableam = opts->tableam ? pg_strdup(opts->tableam) : NULL;
	newToc->relkind = opts->relkind;
	newToc->owner = opts->owner ? pg_strdup(opts->owner) : NULL;
	newToc->desc = pg_strdup(opts->description);
	newToc->defn = opts->createStmt ? pg_strdup(opts->createStmt) : NULL;
	newToc->dropStmt = opts->dropStmt ? pg_strdup(opts->dropStmt) : NULL;
	newToc->copyStmt = opts->copyStmt ? pg_strdup(opts->copyStmt) : NULL;

	/* copy the caller's dependency array, if any */
	if (opts->nDeps > 0)
	{
		newToc->dependencies = (DumpId *) pg_malloc(opts->nDeps * sizeof(DumpId));
		memcpy(newToc->dependencies, opts->deps, opts->nDeps * sizeof(DumpId));
		newToc->nDeps = opts->nDeps;
	}
	else
	{
		newToc->dependencies = NULL;
		newToc->nDeps = 0;
	}

	/* data-dumper callback; hadDumper records whether one was supplied */
	newToc->dataDumper = opts->dumpFn;
	newToc->dataDumperArg = opts->dumpArg;
	newToc->hadDumper = opts->dumpFn ? true : false;

	/* deferred-definition callback, if any */
	newToc->defnDumper = opts->defnFn;
	newToc->defnDumperArg = opts->defnArg;

	newToc->formatData = NULL;
	newToc->dataLength = 0;

	/* give the format module a chance to attach its private data */
	if (AH->ArchiveEntryPtr != NULL)
		AH->ArchiveEntryPtr(AH, newToc);

	return newToc;
}
1281 :
1282 : /* Public */
/*
 * Print a human-readable summary of the archive's TOC, in the format
 * accepted back by SortTocFromFile (-L).
 */
void
PrintTOCSummary(Archive *AHX)
{
	ArchiveHandle *AH = (ArchiveHandle *) AHX;
	RestoreOptions *ropt = AH->public.ropt;
	TocEntry   *te;
	pg_compress_specification out_compression_spec = {0};
	teSection	curSection;
	CompressFileHandle *sav;
	const char *fmtName;
	char		stamp_str[64];

	/* TOC is always uncompressed */
	out_compression_spec.algorithm = PG_COMPRESSION_NONE;

	/* Redirect output to the -f file, if one was given */
	sav = SaveOutput(AH);
	if (ropt->filename)
		SetOutput(AH, ropt->filename, out_compression_spec);

	if (strftime(stamp_str, sizeof(stamp_str), PGDUMP_STRFTIME_FMT,
				 localtime(&AH->createDate)) == 0)
		strcpy(stamp_str, "[unknown]");

	/* Header: creation time, source database, counts, compression */
	ahprintf(AH, ";\n; Archive created at %s\n", stamp_str);
	ahprintf(AH, "; dbname: %s\n; TOC Entries: %d\n; Compression: %s\n",
			 sanitize_line(AH->archdbname, false),
			 AH->tocCount,
			 get_compress_algorithm_name(AH->compression_spec.algorithm));

	switch (AH->format)
	{
		case archCustom:
			fmtName = "CUSTOM";
			break;
		case archDirectory:
			fmtName = "DIRECTORY";
			break;
		case archTar:
			fmtName = "TAR";
			break;
		default:
			fmtName = "UNKNOWN";
	}

	ahprintf(AH, "; Dump Version: %d.%d-%d\n",
			 ARCHIVE_MAJOR(AH->version), ARCHIVE_MINOR(AH->version), ARCHIVE_REV(AH->version));
	ahprintf(AH, "; Format: %s\n", fmtName);
	ahprintf(AH, "; Integer: %d bytes\n", (int) AH->intSize);
	ahprintf(AH, "; Offset: %d bytes\n", (int) AH->offSize);
	if (AH->archiveRemoteVersion)
		ahprintf(AH, "; Dumped from database version: %s\n",
				 AH->archiveRemoteVersion);
	if (AH->archiveDumpVersion)
		ahprintf(AH, "; Dumped by pg_dump version: %s\n",
				 AH->archiveDumpVersion);

	ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");

	curSection = SECTION_PRE_DATA;
	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		/* This bit must match ProcessArchiveRestoreOptions' marking logic */
		if (te->section != SECTION_NONE)
			curSection = te->section;
		te->reqs = _tocEntryRequired(te, curSection, AH);
		/* Now, should we print it? */
		if (ropt->verbose ||
			(te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0)
		{
			char	   *sanitized_name;
			char	   *sanitized_schema;
			char	   *sanitized_owner;

			/*
			 * Sanitize the fields so each TOC entry prints as a single,
			 * clean line.
			 */
			sanitized_name = sanitize_line(te->tag, false);
			sanitized_schema = sanitize_line(te->namespace, true);
			sanitized_owner = sanitize_line(te->owner, false);

			ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
					 te->catalogId.tableoid, te->catalogId.oid,
					 te->desc, sanitized_schema, sanitized_name,
					 sanitized_owner);

			free(sanitized_name);
			free(sanitized_schema);
			free(sanitized_owner);
		}
		/* In verbose mode, also list the entry's dependency IDs */
		if (ropt->verbose && te->nDeps > 0)
		{
			int			i;

			ahprintf(AH, ";\tdepends on:");
			for (i = 0; i < te->nDeps; i++)
				ahprintf(AH, " %d", te->dependencies[i]);
			ahprintf(AH, "\n");
		}
	}

	/* Enforce strict names checking */
	if (ropt->strict_names)
		StrictNamesCheck(ropt);

	/* Put back whatever output handle was active before */
	if (ropt->filename)
		RestoreOutput(AH, sav);
}
1389 :
1390 : /***********
1391 : * Large Object Archival
1392 : ***********/
1393 :
1394 : /* Called by a dumper to signal start of a LO */
1395 : int
1396 178 : StartLO(Archive *AHX, Oid oid)
1397 : {
1398 178 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1399 :
1400 178 : if (!AH->StartLOPtr)
1401 0 : pg_fatal("large-object output not supported in chosen format");
1402 :
1403 178 : AH->StartLOPtr(AH, AH->currToc, oid);
1404 :
1405 178 : return 1;
1406 : }
1407 :
1408 : /* Called by a dumper to signal end of a LO */
1409 : int
1410 178 : EndLO(Archive *AHX, Oid oid)
1411 : {
1412 178 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1413 :
1414 178 : if (AH->EndLOPtr)
1415 178 : AH->EndLOPtr(AH, AH->currToc, oid);
1416 :
1417 178 : return 1;
1418 : }
1419 :
1420 : /**********
1421 : * Large Object Restoration
1422 : **********/
1423 :
1424 : /*
1425 : * Called by a format handler before a group of LOs is restored
1426 : */
1427 : void
1428 34 : StartRestoreLOs(ArchiveHandle *AH)
1429 : {
1430 34 : RestoreOptions *ropt = AH->public.ropt;
1431 :
1432 : /*
1433 : * LOs must be restored within a transaction block, since we need the LO
1434 : * handle to stay open while we write it. Establish a transaction unless
1435 : * there's one being used globally.
1436 : */
1437 34 : if (!(ropt->single_txn || ropt->txn_size > 0))
1438 : {
1439 34 : if (AH->connection)
1440 2 : StartTransaction(&AH->public);
1441 : else
1442 32 : ahprintf(AH, "BEGIN;\n\n");
1443 : }
1444 :
1445 34 : AH->loCount = 0;
1446 34 : }
1447 :
1448 : /*
1449 : * Called by a format handler after a group of LOs is restored
1450 : */
1451 : void
1452 34 : EndRestoreLOs(ArchiveHandle *AH)
1453 : {
1454 34 : RestoreOptions *ropt = AH->public.ropt;
1455 :
1456 34 : if (!(ropt->single_txn || ropt->txn_size > 0))
1457 : {
1458 34 : if (AH->connection)
1459 2 : CommitTransaction(&AH->public);
1460 : else
1461 32 : ahprintf(AH, "COMMIT;\n\n");
1462 : }
1463 :
1464 34 : pg_log_info(ngettext("restored %d large object",
1465 : "restored %d large objects",
1466 : AH->loCount),
1467 : AH->loCount);
1468 34 : }
1469 :
1470 :
1471 : /*
1472 : * Called by a format handler to initiate restoration of a LO
1473 : */
void
StartRestoreLO(ArchiveHandle *AH, Oid oid, bool drop)
{
	/* pre-1.12 archives need explicit lo_create/drop handling here */
	bool		old_lo_style = (AH->version < K_VERS_1_12);
	Oid			loOid;

	AH->loCount++;

	/* Initialize the LO Buffer */
	if (AH->lo_buf == NULL)
	{
		/* First time through (in this process) so allocate the buffer */
		AH->lo_buf_size = LOBBUFSIZE;
		AH->lo_buf = pg_malloc(LOBBUFSIZE);
	}
	AH->lo_buf_used = 0;

	pg_log_info("restoring large object with OID %u", oid);

	/* With an old archive we must do drop and create logic here */
	if (old_lo_style && drop)
		DropLOIfExists(AH, oid);

	if (AH->connection)
	{
		/* Direct-to-DB restore: create (old style only) and open the LO */
		if (old_lo_style)
		{
			loOid = lo_create(AH->connection, oid);
			if (loOid == 0 || loOid != oid)
				pg_fatal("could not create large object %u: %s",
						 oid, PQerrorMessage(AH->connection));
		}
		AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
		if (AH->loFd == -1)
			pg_fatal("could not open large object %u: %s",
					 oid, PQerrorMessage(AH->connection));
	}
	else
	{
		/* Script output: emit equivalent lo_create/lo_open SQL */
		if (old_lo_style)
			ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
					 oid, INV_WRITE);
		else
			ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
					 oid, INV_WRITE);
	}

	/* route subsequent ahwrite() traffic into the LO buffer */
	AH->writingLO = true;
}
1523 :
1524 : void
1525 38 : EndRestoreLO(ArchiveHandle *AH, Oid oid)
1526 : {
1527 38 : if (AH->lo_buf_used > 0)
1528 : {
1529 : /* Write remaining bytes from the LO buffer */
1530 20 : dump_lo_buf(AH);
1531 : }
1532 :
1533 38 : AH->writingLO = false;
1534 :
1535 38 : if (AH->connection)
1536 : {
1537 6 : lo_close(AH->connection, AH->loFd);
1538 6 : AH->loFd = -1;
1539 : }
1540 : else
1541 : {
1542 32 : ahprintf(AH, "SELECT pg_catalog.lo_close(0);\n\n");
1543 : }
1544 38 : }
1545 :
1546 : /***********
1547 : * Sorting and Reordering
1548 : ***********/
1549 :
/*
 * Reorder the TOC according to a user-supplied list file (-L option):
 * each listed dump ID is marked wanted and moved to the end of the list
 * in the order read.
 */
void
SortTocFromFile(Archive *AHX)
{
	ArchiveHandle *AH = (ArchiveHandle *) AHX;
	RestoreOptions *ropt = AH->public.ropt;
	FILE	   *fh;
	StringInfoData linebuf;

	/* Allocate space for the 'wanted' array, and init it */
	ropt->idWanted = (bool *) pg_malloc0(sizeof(bool) * AH->maxDumpId);

	/* Setup the file */
	fh = fopen(ropt->tocFile, PG_BINARY_R);
	if (!fh)
		pg_fatal("could not open TOC file \"%s\": %m", ropt->tocFile);

	initStringInfo(&linebuf);

	/* Process the list file one line at a time */
	while (pg_get_line_buf(fh, &linebuf))
	{
		char	   *cmnt;
		char	   *endptr;
		DumpId		id;
		TocEntry   *te;

		/* Truncate line at comment, if any */
		cmnt = strchr(linebuf.data, ';');
		if (cmnt != NULL)
		{
			cmnt[0] = '\0';
			linebuf.len = cmnt - linebuf.data;
		}

		/* Ignore if all blank */
		if (strspn(linebuf.data, " \t\r\n") == linebuf.len)
			continue;

		/* Get an ID, check it's valid and not already seen */
		id = strtol(linebuf.data, &endptr, 10);
		if (endptr == linebuf.data || id <= 0 || id > AH->maxDumpId ||
			ropt->idWanted[id - 1])
		{
			pg_log_warning("line ignored: %s", linebuf.data);
			continue;
		}

		/* Find TOC entry */
		te = getTocEntryByDumpId(AH, id);
		if (!te)
			pg_fatal("could not find entry for ID %d",
					 id);

		/* Mark it wanted */
		ropt->idWanted[id - 1] = true;

		/*
		 * Move each item to the end of the list as it is selected, so that
		 * they are placed in the desired order.  Any unwanted items will end
		 * up at the front of the list, which may seem unintuitive but it's
		 * what we need.  In an ordinary serial restore that makes no
		 * difference, but in a parallel restore we need to mark unrestored
		 * items' dependencies as satisfied before we start examining
		 * restorable items.  Otherwise they could have surprising
		 * side-effects on the order in which restorable items actually get
		 * restored.
		 */
		_moveBefore(AH->toc, te);
	}

	pg_free(linebuf.data);

	if (fclose(fh) != 0)
		pg_fatal("could not close TOC file: %m");
}
1624 :
1625 : /**********************
1626 : * Convenience functions that look like standard IO functions
1627 : * for writing data when in dump mode.
1628 : **********************/
1629 :
1630 : /* Public */
1631 : void
1632 46068 : archputs(const char *s, Archive *AH)
1633 : {
1634 46068 : WriteData(AH, s, strlen(s));
1635 46068 : }
1636 :
1637 : /* Public */
1638 : int
1639 12138 : archprintf(Archive *AH, const char *fmt,...)
1640 : {
1641 12138 : int save_errno = errno;
1642 : char *p;
1643 12138 : size_t len = 128; /* initial assumption about buffer size */
1644 : size_t cnt;
1645 :
1646 : for (;;)
1647 0 : {
1648 : va_list args;
1649 :
1650 : /* Allocate work buffer. */
1651 12138 : p = (char *) pg_malloc(len);
1652 :
1653 : /* Try to format the data. */
1654 12138 : errno = save_errno;
1655 12138 : va_start(args, fmt);
1656 12138 : cnt = pvsnprintf(p, len, fmt, args);
1657 12138 : va_end(args);
1658 :
1659 12138 : if (cnt < len)
1660 12138 : break; /* success */
1661 :
1662 : /* Release buffer and loop around to try again with larger len. */
1663 0 : free(p);
1664 0 : len = cnt;
1665 : }
1666 :
1667 12138 : WriteData(AH, p, cnt);
1668 12138 : free(p);
1669 12138 : return (int) cnt;
1670 : }
1671 :
1672 :
1673 : /*******************************
1674 : * Stuff below here should be 'private' to the archiver routines
1675 : *******************************/
1676 :
/*
 * Select a new output target: either the named file, "-"/stdout, or an
 * already-open handle, wrapped in a compression handle per
 * compression_spec.  The previous handle should be stashed with
 * SaveOutput() first.
 */
static void
SetOutput(ArchiveHandle *AH, const char *filename,
		  const pg_compress_specification compression_spec)
{
	CompressFileHandle *CFH;
	const char *mode;
	int			fn = -1;

	if (filename)
	{
		/* "-" means stdout; otherwise open_func gets the name below */
		if (strcmp(filename, "-") == 0)
			fn = fileno(stdout);
	}
	else if (AH->FH)
		fn = fileno(AH->FH);	/* reuse the archive's open file */
	else if (AH->fSpec)
	{
		filename = AH->fSpec;	/* fall back to the archive's file spec */
	}
	else
		fn = fileno(stdout);	/* last resort: stdout */

	/* append vs. overwrite, always in binary mode */
	if (AH->mode == archModeAppend)
		mode = PG_BINARY_A;
	else
		mode = PG_BINARY_W;

	CFH = InitCompressFileHandle(compression_spec);

	/* open_func uses the fd when fn >= 0, else the filename */
	if (!CFH->open_func(filename, fn, mode, CFH))
	{
		if (filename)
			pg_fatal("could not open output file \"%s\": %m", filename);
		else
			pg_fatal("could not open output file: %m");
	}

	AH->OF = CFH;
}
1716 :
1717 : static CompressFileHandle *
1718 378 : SaveOutput(ArchiveHandle *AH)
1719 : {
1720 378 : return (CompressFileHandle *) AH->OF;
1721 : }
1722 :
1723 : static void
1724 282 : RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput)
1725 : {
1726 282 : errno = 0;
1727 282 : if (!EndCompressFileHandle(AH->OF))
1728 0 : pg_fatal("could not close output file: %m");
1729 :
1730 282 : AH->OF = savedOutput;
1731 282 : }
1732 :
1733 :
1734 :
1735 : /*
1736 : * Print formatted text to the output file (usually stdout).
1737 : */
1738 : int
1739 507924 : ahprintf(ArchiveHandle *AH, const char *fmt,...)
1740 : {
1741 507924 : int save_errno = errno;
1742 : char *p;
1743 507924 : size_t len = 128; /* initial assumption about buffer size */
1744 : size_t cnt;
1745 :
1746 : for (;;)
1747 31768 : {
1748 : va_list args;
1749 :
1750 : /* Allocate work buffer. */
1751 539692 : p = (char *) pg_malloc(len);
1752 :
1753 : /* Try to format the data. */
1754 539692 : errno = save_errno;
1755 539692 : va_start(args, fmt);
1756 539692 : cnt = pvsnprintf(p, len, fmt, args);
1757 539692 : va_end(args);
1758 :
1759 539692 : if (cnt < len)
1760 507924 : break; /* success */
1761 :
1762 : /* Release buffer and loop around to try again with larger len. */
1763 31768 : free(p);
1764 31768 : len = cnt;
1765 : }
1766 :
1767 507924 : ahwrite(p, 1, cnt, AH);
1768 507924 : free(p);
1769 507924 : return (int) cnt;
1770 : }
1771 :
1772 : /*
1773 : * Single place for logic which says 'We are restoring to a direct DB connection'.
1774 : */
1775 : static int
1776 5817836 : RestoringToDB(ArchiveHandle *AH)
1777 : {
1778 5817836 : RestoreOptions *ropt = AH->public.ropt;
1779 :
1780 5817836 : return (ropt && ropt->useDB && AH->connection);
1781 : }
1782 :
1783 : /*
1784 : * Dump the current contents of the LO data buffer while writing a LO
1785 : */
static void
dump_lo_buf(ArchiveHandle *AH)
{
	if (AH->connection)
	{
		/* Direct-to-DB: push the buffer through libpq's lo_write */
		int			res;

		res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
		pg_log_debug(ngettext("wrote %zu byte of large object data (result = %d)",
							  "wrote %zu bytes of large object data (result = %d)",
							  AH->lo_buf_used),
					 AH->lo_buf_used, res);
		/* We assume there are no short writes, only errors */
		if (res != AH->lo_buf_used)
			warn_or_exit_horribly(AH, "could not write to large object: %s",
								  PQerrorMessage(AH->connection));
	}
	else
	{
		/* Script output: emit a lowrite() call with a bytea literal */
		PQExpBuffer buf = createPQExpBuffer();

		appendByteaLiteralAHX(buf,
							  (const unsigned char *) AH->lo_buf,
							  AH->lo_buf_used,
							  AH);

		/* Hack: turn off writingLO so ahwrite doesn't recurse to here */
		AH->writingLO = false;
		ahprintf(AH, "SELECT pg_catalog.lowrite(0, %s);\n", buf->data);
		AH->writingLO = true;

		destroyPQExpBuffer(buf);
	}
	/* buffer is now empty either way */
	AH->lo_buf_used = 0;
}
1821 :
1822 :
1823 : /*
1824 : * Write buffer to the output file (usually stdout). This is used for
1825 : * outputting 'restore' scripts etc. It is even possible for an archive
1826 : * format to create a custom output routine to 'fake' a restore if it
1827 : * wants to generate a script (see TAR output).
1828 : */
void
ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
{
	int			bytes_written = 0;

	if (AH->writingLO)
	{
		/* Accumulate LO data in lo_buf, flushing whenever it fills up */
		size_t		remaining = size * nmemb;

		while (AH->lo_buf_used + remaining > AH->lo_buf_size)
		{
			size_t		avail = AH->lo_buf_size - AH->lo_buf_used;

			memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
			ptr = (const char *) ptr + avail;
			remaining -= avail;
			AH->lo_buf_used += avail;
			dump_lo_buf(AH);
		}

		memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
		AH->lo_buf_used += remaining;

		bytes_written = size * nmemb;
	}
	else if (AH->CustomOutPtr)
		bytes_written = AH->CustomOutPtr(AH, ptr, size * nmemb);

	/*
	 * If we're doing a restore, and it's direct to DB, and we're connected
	 * then send it to the DB.
	 */
	else if (RestoringToDB(AH))
		bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
	else
	{
		/* Otherwise write to the current (possibly compressed) output file */
		CompressFileHandle *CFH = (CompressFileHandle *) AH->OF;

		if (CFH->write_func(ptr, size * nmemb, CFH))
			bytes_written = size * nmemb;
	}

	/* any short write is treated as a fatal error */
	if (bytes_written != size * nmemb)
		WRITE_ERROR_EXIT;
}
1874 :
1875 : /* on some error, we may decide to go on... */
/* on some error, we may decide to go on... */
void
warn_or_exit_horribly(ArchiveHandle *AH, const char *fmt,...)
{
	va_list		ap;

	/* Report the processing stage, but only once per stage change */
	switch (AH->stage)
	{

		case STAGE_NONE:
			/* Do nothing special */
			break;

		case STAGE_INITIALIZING:
			if (AH->stage != AH->lastErrorStage)
				pg_log_info("while INITIALIZING:");
			break;

		case STAGE_PROCESSING:
			if (AH->stage != AH->lastErrorStage)
				pg_log_info("while PROCESSING TOC:");
			break;

		case STAGE_FINALIZING:
			if (AH->stage != AH->lastErrorStage)
				pg_log_info("while FINALIZING:");
			break;
	}
	/* Likewise, identify the current TOC entry only once per entry */
	if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE)
	{
		pg_log_info("from TOC entry %d; %u %u %s %s %s",
					AH->currentTE->dumpId,
					AH->currentTE->catalogId.tableoid,
					AH->currentTE->catalogId.oid,
					AH->currentTE->desc ? AH->currentTE->desc : "(no desc)",
					AH->currentTE->tag ? AH->currentTE->tag : "(no tag)",
					AH->currentTE->owner ? AH->currentTE->owner : "(no owner)");
	}
	/* remember where the last error was reported */
	AH->lastErrorStage = AH->stage;
	AH->lastErrorTE = AH->currentTE;

	va_start(ap, fmt);
	pg_log_generic_v(PG_LOG_ERROR, PG_LOG_PRIMARY, fmt, ap);
	va_end(ap);

	/* exit now, or count the error and continue, per user's choice */
	if (AH->public.exit_on_error)
		exit_nicely(1);
	else
		AH->public.n_errors++;
}
1925 :
1926 : #ifdef NOT_USED
1927 :
/* Relocate TOC entry "te" to immediately after "pos" (currently unused) */
static void
_moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
{
	/* Unlink te from list */
	te->prev->next = te->next;
	te->next->prev = te->prev;

	/* and insert it after "pos" */
	te->prev = pos;
	te->next = pos->next;
	pos->next->prev = te;
	pos->next = te;
}
1941 : #endif
1942 :
1943 : static void
1944 0 : _moveBefore(TocEntry *pos, TocEntry *te)
1945 : {
1946 : /* Unlink te from list */
1947 0 : te->prev->next = te->next;
1948 0 : te->next->prev = te->prev;
1949 :
1950 : /* and insert it before "pos" */
1951 0 : te->prev = pos->prev;
1952 0 : te->next = pos;
1953 0 : pos->prev->next = te;
1954 0 : pos->prev = te;
1955 0 : }
1956 :
1957 : /*
1958 : * Build index arrays for the TOC list
1959 : *
1960 : * This should be invoked only after we have created or read in all the TOC
1961 : * items.
1962 : *
1963 : * The arrays are indexed by dump ID (so entry zero is unused). Note that the
1964 : * array entries run only up to maxDumpId. We might see dependency dump IDs
1965 : * beyond that (if the dump was partial); so always check the array bound
1966 : * before trying to touch an array entry.
1967 : */
static void
buildTocEntryArrays(ArchiveHandle *AH)
{
	DumpId		maxDumpId = AH->maxDumpId;
	TocEntry   *te;

	/* Both arrays are indexed by dump ID; slot zero stays unused */
	AH->tocsByDumpId = (TocEntry **) pg_malloc0((maxDumpId + 1) * sizeof(TocEntry *));
	AH->tableDataId = (DumpId *) pg_malloc0((maxDumpId + 1) * sizeof(DumpId));

	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		/* this check is purely paranoia, maxDumpId should be correct */
		if (te->dumpId <= 0 || te->dumpId > maxDumpId)
			pg_fatal("bad dumpId");

		/* tocsByDumpId indexes all TOCs by their dump ID */
		AH->tocsByDumpId[te->dumpId] = te;

		/*
		 * tableDataId provides the TABLE DATA item's dump ID for each TABLE
		 * TOC entry that has a DATA item.  We compute this by reversing the
		 * TABLE DATA item's dependency, knowing that a TABLE DATA item has
		 * just one dependency and it is the TABLE item.
		 */
		if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0)
		{
			DumpId		tableId = te->dependencies[0];

			/*
			 * The TABLE item might not have been in the archive, if this was
			 * a data-only dump; but its dump ID should be less than its data
			 * item's dump ID, so there should be a place for it in the array.
			 */
			if (tableId <= 0 || tableId > maxDumpId)
				pg_fatal("bad table dumpId for TABLE DATA item");

			AH->tableDataId[tableId] = te->dumpId;
		}
	}
}
2008 :
2009 : TocEntry *
2010 36398 : getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
2011 : {
2012 : /* build index arrays if we didn't already */
2013 36398 : if (AH->tocsByDumpId == NULL)
2014 76 : buildTocEntryArrays(AH);
2015 :
2016 36398 : if (id > 0 && id <= AH->maxDumpId)
2017 36398 : return AH->tocsByDumpId[id];
2018 :
2019 0 : return NULL;
2020 : }
2021 :
2022 : int
2023 32302 : TocIDRequired(ArchiveHandle *AH, DumpId id)
2024 : {
2025 32302 : TocEntry *te = getTocEntryByDumpId(AH, id);
2026 :
2027 32302 : if (!te)
2028 15218 : return 0;
2029 :
2030 17084 : return te->reqs;
2031 : }
2032 :
2033 : size_t
2034 12434 : WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
2035 : {
2036 : int off;
2037 :
2038 : /* Save the flag */
2039 12434 : AH->WriteBytePtr(AH, wasSet);
2040 :
2041 : /* Write out pgoff_t smallest byte first, prevents endian mismatch */
2042 111906 : for (off = 0; off < sizeof(pgoff_t); off++)
2043 : {
2044 99472 : AH->WriteBytePtr(AH, o & 0xFF);
2045 99472 : o >>= 8;
2046 : }
2047 12434 : return sizeof(pgoff_t) + 1;
2048 : }
2049 :
/*
 * Read back an offset previously written with WriteOffset (or, for pre-1.7
 * archives, with WriteInt).  Stores the offset into *o and returns the
 * K_OFFSET_* state flag.
 */
int
ReadOffset(ArchiveHandle *AH, pgoff_t * o)
{
	int			i;
	int			off;
	int			offsetFlg;

	/* Initialize to zero */
	*o = 0;

	/* Check for old version */
	if (AH->version < K_VERS_1_7)
	{
		/* Prior versions wrote offsets using WriteInt */
		i = ReadInt(AH);
		/* -1 means not set */
		if (i < 0)
			return K_OFFSET_POS_NOT_SET;
		else if (i == 0)
			return K_OFFSET_NO_DATA;

		/* Cast to pgoff_t because it was written as an int. */
		*o = (pgoff_t) i;
		return K_OFFSET_POS_SET;
	}

	/*
	 * Read the flag indicating the state of the data pointer. Check if valid
	 * and die if not.
	 *
	 * This used to be handled by a negative or zero pointer, now we use an
	 * extra byte specifically for the state.
	 */
	offsetFlg = AH->ReadBytePtr(AH) & 0xFF;

	switch (offsetFlg)
	{
		case K_OFFSET_POS_NOT_SET:
		case K_OFFSET_NO_DATA:
		case K_OFFSET_POS_SET:

			break;

		default:
			pg_fatal("unexpected data offset flag %d", offsetFlg);
	}

	/*
	 * Read the bytes.  Note that offSize comes from the archive header and
	 * may exceed the local sizeof(pgoff_t); any excess high-order bytes must
	 * be zero or the offset cannot be represented here.
	 */
	for (off = 0; off < AH->offSize; off++)
	{
		if (off < sizeof(pgoff_t))
			*o |= ((pgoff_t) (AH->ReadBytePtr(AH))) << (off * 8);
		else
		{
			if (AH->ReadBytePtr(AH) != 0)
				pg_fatal("file offset in dump file is too large");
		}
	}

	return offsetFlg;
}
2113 :
2114 : size_t
2115 420956 : WriteInt(ArchiveHandle *AH, int i)
2116 : {
2117 : int b;
2118 :
2119 : /*
2120 : * This is a bit yucky, but I don't want to make the binary format very
2121 : * dependent on representation, and not knowing much about it, I write out
2122 : * a sign byte. If you change this, don't forget to change the file
2123 : * version #, and modify ReadInt to read the new format AS WELL AS the old
2124 : * formats.
2125 : */
2126 :
2127 : /* SIGN byte */
2128 420956 : if (i < 0)
2129 : {
2130 96040 : AH->WriteBytePtr(AH, 1);
2131 96040 : i = -i;
2132 : }
2133 : else
2134 324916 : AH->WriteBytePtr(AH, 0);
2135 :
2136 2104780 : for (b = 0; b < AH->intSize; b++)
2137 : {
2138 1683824 : AH->WriteBytePtr(AH, i & 0xFF);
2139 1683824 : i >>= 8;
2140 : }
2141 :
2142 420956 : return AH->intSize + 1;
2143 : }
2144 :
2145 : int
2146 434778 : ReadInt(ArchiveHandle *AH)
2147 : {
2148 434778 : int res = 0;
2149 : int bv,
2150 : b;
2151 434778 : int sign = 0; /* Default positive */
2152 434778 : int bitShift = 0;
2153 :
2154 434778 : if (AH->version > K_VERS_1_0)
2155 : /* Read a sign byte */
2156 434778 : sign = AH->ReadBytePtr(AH);
2157 :
2158 2173890 : for (b = 0; b < AH->intSize; b++)
2159 : {
2160 1739112 : bv = AH->ReadBytePtr(AH) & 0xFF;
2161 1739112 : if (bv != 0)
2162 409056 : res = res + (bv << bitShift);
2163 1739112 : bitShift += 8;
2164 : }
2165 :
2166 434778 : if (sign)
2167 98968 : res = -res;
2168 :
2169 434778 : return res;
2170 : }
2171 :
2172 : size_t
2173 330374 : WriteStr(ArchiveHandle *AH, const char *c)
2174 : {
2175 : size_t res;
2176 :
2177 330374 : if (c)
2178 : {
2179 234334 : int len = strlen(c);
2180 :
2181 234334 : res = WriteInt(AH, len);
2182 234334 : AH->WriteBufPtr(AH, c, len);
2183 234334 : res += len;
2184 : }
2185 : else
2186 96040 : res = WriteInt(AH, -1);
2187 :
2188 330374 : return res;
2189 : }
2190 :
2191 : char *
2192 341578 : ReadStr(ArchiveHandle *AH)
2193 : {
2194 : char *buf;
2195 : int l;
2196 :
2197 341578 : l = ReadInt(AH);
2198 341578 : if (l < 0)
2199 98968 : buf = NULL;
2200 : else
2201 : {
2202 242610 : buf = (char *) pg_malloc(l + 1);
2203 242610 : AH->ReadBufPtr(AH, buf, l);
2204 :
2205 242610 : buf[l] = '\0';
2206 : }
2207 :
2208 341578 : return buf;
2209 : }
2210 :
2211 : static bool
2212 24 : _fileExistsInDirectory(const char *dir, const char *filename)
2213 : {
2214 : struct stat st;
2215 : char buf[MAXPGPATH];
2216 :
2217 24 : if (snprintf(buf, MAXPGPATH, "%s/%s", dir, filename) >= MAXPGPATH)
2218 0 : pg_fatal("directory name too long: \"%s\"", dir);
2219 :
2220 24 : return (stat(buf, &st) == 0 && S_ISREG(st.st_mode));
2221 : }
2222 :
/*
 * Sniff the input (file, directory, or stdin) to determine the archive
 * format: directory, custom, or tar.  A plain-text SQL dump is detected and
 * rejected with a hint to use psql.  Sets AH->format and returns it; bytes
 * read while sniffing are stashed in AH->lookahead for possible reuse.
 */
static int
_discoverArchiveFormat(ArchiveHandle *AH)
{
	FILE	   *fh;
	char		sig[6];			/* More than enough */
	size_t		cnt;
	int			wantClose = 0;

	pg_log_debug("attempting to ascertain archive format");

	/* Reset any lookahead state left over from a previous attempt */
	free(AH->lookahead);

	AH->readHeader = 0;
	AH->lookaheadSize = 512;
	AH->lookahead = pg_malloc0(512);
	AH->lookaheadLen = 0;
	AH->lookaheadPos = 0;

	if (AH->fSpec)
	{
		struct stat st;

		wantClose = 1;

		/*
		 * Check if the specified archive is a directory. If so, check if
		 * there's a "toc.dat" (or "toc.dat.{gz,lz4,zst}") file in it.
		 */
		if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
		{
			AH->format = archDirectory;
			if (_fileExistsInDirectory(AH->fSpec, "toc.dat"))
				return AH->format;
#ifdef HAVE_LIBZ
			if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz"))
				return AH->format;
#endif
#ifdef USE_LZ4
			if (_fileExistsInDirectory(AH->fSpec, "toc.dat.lz4"))
				return AH->format;
#endif
#ifdef USE_ZSTD
			if (_fileExistsInDirectory(AH->fSpec, "toc.dat.zst"))
				return AH->format;
#endif
			pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
					 AH->fSpec);
			fh = NULL;			/* keep compiler quiet */
		}
		else
		{
			fh = fopen(AH->fSpec, PG_BINARY_R);
			if (!fh)
				pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
		}
	}
	else
	{
		fh = stdin;
		if (!fh)
			pg_fatal("could not open input file: %m");
	}

	/* The custom-format magic "PGDMP" is 5 bytes */
	if ((cnt = fread(sig, 1, 5, fh)) != 5)
	{
		if (ferror(fh))
			pg_fatal("could not read input file: %m");
		else
			pg_fatal("input file is too short (read %lu, expected 5)",
					 (unsigned long) cnt);
	}

	/* Save it, just in case we need it later */
	memcpy(&AH->lookahead[0], sig, 5);
	AH->lookaheadLen = 5;

	if (strncmp(sig, "PGDMP", 5) == 0)
	{
		/* It's custom format, stop here */
		AH->format = archCustom;
		AH->readHeader = 1;
	}
	else
	{
		/*
		 * *Maybe* we have a tar archive format file or a text dump ... So,
		 * read first 512 byte header...
		 */
		cnt = fread(&AH->lookahead[AH->lookaheadLen], 1, 512 - AH->lookaheadLen, fh);
		/* read failure is checked below */
		AH->lookaheadLen += cnt;

		if (AH->lookaheadLen >= strlen(TEXT_DUMPALL_HEADER) &&
			(strncmp(AH->lookahead, TEXT_DUMP_HEADER, strlen(TEXT_DUMP_HEADER)) == 0 ||
			 strncmp(AH->lookahead, TEXT_DUMPALL_HEADER, strlen(TEXT_DUMPALL_HEADER)) == 0))
		{
			/*
			 * looks like it's probably a text format dump. so suggest they
			 * try psql
			 */
			pg_fatal("input file appears to be a text format dump. Please use psql.");
		}

		/* A tar header is exactly 512 bytes; anything shorter can't be tar */
		if (AH->lookaheadLen != 512)
		{
			if (feof(fh))
				pg_fatal("input file does not appear to be a valid archive (too short?)");
			else
				READ_ERROR_EXIT(fh);
		}

		if (!isValidTarHeader(AH->lookahead))
			pg_fatal("input file does not appear to be a valid archive");

		AH->format = archTar;
	}

	/* Close the file if we opened it */
	if (wantClose)
	{
		if (fclose(fh) != 0)
			pg_fatal("could not close input file: %m");
		/* Forget lookahead, since we'll re-read header after re-opening */
		AH->readHeader = 0;
		AH->lookaheadLen = 0;
	}

	return AH->format;
}
2352 :
2353 :
2354 : /*
2355 : * Allocate an archive handle
2356 : */
static ArchiveHandle *
_allocAH(const char *FileSpec, const ArchiveFormat fmt,
		 const pg_compress_specification compression_spec,
		 bool dosync, ArchiveMode mode,
		 SetupWorkerPtrType setupWorkerPtr, DataDirSyncMethod sync_method)
{
	ArchiveHandle *AH;
	CompressFileHandle *CFH;
	pg_compress_specification out_compress_spec = {0};

	pg_log_debug("allocating AH for %s, format %d",
				 FileSpec ? FileSpec : "(stdio)", fmt);

	AH = (ArchiveHandle *) pg_malloc0(sizeof(ArchiveHandle));

	AH->version = K_VERS_SELF;

	/* initialize for backwards compatible string processing */
	AH->public.encoding = 0;	/* PG_SQL_ASCII */
	AH->public.std_strings = false;

	/* sql error handling */
	AH->public.exit_on_error = true;
	AH->public.n_errors = 0;

	AH->archiveDumpVersion = PG_VERSION;

	AH->createDate = time(NULL);

	/* record local int/offset widths; written to and checked from headers */
	AH->intSize = sizeof(int);
	AH->offSize = sizeof(pgoff_t);
	if (FileSpec)
	{
		AH->fSpec = pg_strdup(FileSpec);

		/*
		 * Not used; maybe later....
		 *
		 * AH->workDir = pg_strdup(FileSpec); for(i=strlen(FileSpec) ; i > 0 ;
		 * i--) if (AH->workDir[i-1] == '/')
		 */
	}
	else
		AH->fSpec = NULL;		/* NULL fSpec means stdin/stdout */

	AH->currUser = NULL;		/* unknown */
	AH->currSchema = NULL;		/* ditto */
	AH->currTablespace = NULL;	/* ditto */
	AH->currTableAm = NULL;		/* ditto */

	/* The TOC is a circular doubly-linked list with a dummy head node */
	AH->toc = (TocEntry *) pg_malloc0(sizeof(TocEntry));

	AH->toc->next = AH->toc;
	AH->toc->prev = AH->toc;

	AH->mode = mode;
	AH->compression_spec = compression_spec;
	AH->dosync = dosync;
	AH->sync_method = sync_method;

	memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));

	/* Open stdout with no compression for AH output handle */
	out_compress_spec.algorithm = PG_COMPRESSION_NONE;
	CFH = InitCompressFileHandle(out_compress_spec);
	if (!CFH->open_func(NULL, fileno(stdout), PG_BINARY_A, CFH))
		pg_fatal("could not open stdout for appending: %m");
	AH->OF = CFH;

	/*
	 * On Windows, we need to use binary mode to read/write non-text files,
	 * which include all archive formats as well as compressed plain text.
	 * Force stdin/stdout into binary mode if that is what we are using.
	 */
#ifdef WIN32
	if ((fmt != archNull || compression_spec.algorithm != PG_COMPRESSION_NONE) &&
		(AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
	{
		if (mode == archModeWrite)
			_setmode(fileno(stdout), O_BINARY);
		else
			_setmode(fileno(stdin), O_BINARY);
	}
#endif

	AH->SetupWorkerPtr = setupWorkerPtr;

	/* If the caller didn't specify a format, sniff the input to find out */
	if (fmt == archUnknown)
		AH->format = _discoverArchiveFormat(AH);
	else
		AH->format = fmt;

	/* Let the chosen format install its I/O callbacks into AH */
	switch (AH->format)
	{
		case archCustom:
			InitArchiveFmt_Custom(AH);
			break;

		case archNull:
			InitArchiveFmt_Null(AH);
			break;

		case archDirectory:
			InitArchiveFmt_Directory(AH);
			break;

		case archTar:
			InitArchiveFmt_Tar(AH);
			break;

		default:
			pg_fatal("unrecognized file format \"%d\"", AH->format);
	}

	return AH;
}
2473 :
2474 : /*
2475 : * Write out all data (tables & LOs)
2476 : */
void
WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
{
	TocEntry   *te;

	if (pstate && pstate->numWorkers > 1)
	{
		/*
		 * In parallel mode, this code runs in the leader process. We
		 * construct an array of candidate TEs, then sort it into decreasing
		 * size order, then dispatch each TE to a data-transfer worker. By
		 * dumping larger tables first, we avoid getting into a situation
		 * where we're down to one job and it's big, losing parallelism.
		 */
		TocEntry  **tes;
		int			ntes;

		/* tocCount bounds the number of candidates; may over-allocate */
		tes = (TocEntry **) pg_malloc(AH->tocCount * sizeof(TocEntry *));
		ntes = 0;
		for (te = AH->toc->next; te != AH->toc; te = te->next)
		{
			/* Consider only TEs with dataDumper functions ... */
			if (!te->dataDumper)
				continue;
			/* ... and ignore ones not enabled for dump */
			if ((te->reqs & REQ_DATA) == 0)
				continue;

			tes[ntes++] = te;
		}

		if (ntes > 1)
			qsort(tes, ntes, sizeof(TocEntry *), TocEntrySizeCompareQsort);

		/* mark_dump_job_done checks each worker's exit status */
		for (int i = 0; i < ntes; i++)
			DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP,
								   mark_dump_job_done, NULL);

		pg_free(tes);

		/* Now wait for workers to finish. */
		WaitForWorkers(AH, pstate, WFW_ALL_IDLE);
	}
	else
	{
		/* Non-parallel mode: just dump all candidate TEs sequentially. */
		for (te = AH->toc->next; te != AH->toc; te = te->next)
		{
			/* Must have same filter conditions as above */
			if (!te->dataDumper)
				continue;
			if ((te->reqs & REQ_DATA) == 0)
				continue;

			WriteDataChunksForTocEntry(AH, te);
		}
	}
}
2535 :
2536 :
2537 : /*
2538 : * Callback function that's invoked in the leader process after a step has
2539 : * been parallel dumped.
2540 : *
2541 : * We don't need to do anything except check for worker failure.
2542 : */
2543 : static void
2544 1552 : mark_dump_job_done(ArchiveHandle *AH,
2545 : TocEntry *te,
2546 : int status,
2547 : void *callback_data)
2548 : {
2549 1552 : pg_log_info("finished item %d %s %s",
2550 : te->dumpId, te->desc, te->tag);
2551 :
2552 1552 : if (status != 0)
2553 0 : pg_fatal("worker process failed: exit code %d",
2554 : status);
2555 1552 : }
2556 :
2557 :
2558 : void
2559 2114 : WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
2560 : {
2561 : StartDataPtrType startPtr;
2562 : EndDataPtrType endPtr;
2563 :
2564 2114 : AH->currToc = te;
2565 :
2566 2114 : if (strcmp(te->desc, "BLOBS") == 0)
2567 : {
2568 34 : startPtr = AH->StartLOsPtr;
2569 34 : endPtr = AH->EndLOsPtr;
2570 : }
2571 : else
2572 : {
2573 2080 : startPtr = AH->StartDataPtr;
2574 2080 : endPtr = AH->EndDataPtr;
2575 : }
2576 :
2577 2114 : if (startPtr != NULL)
2578 2114 : (*startPtr) (AH, te);
2579 :
2580 : /*
2581 : * The user-provided DataDumper routine needs to call AH->WriteData
2582 : */
2583 2114 : te->dataDumper((Archive *) AH, te->dataDumperArg);
2584 :
2585 2114 : if (endPtr != NULL)
2586 2114 : (*endPtr) (AH, te);
2587 :
2588 2114 : AH->currToc = NULL;
2589 2114 : }
2590 :
/*
 * Serialize the TOC to the archive.  Only entries with at least one REQ_*
 * bit set are written.  The custom format calls this twice: once up front
 * and once at close to rewrite the TOC in place with final data offsets;
 * the defnLen logic below exists to support that second pass.
 */
void
WriteToc(ArchiveHandle *AH)
{
	TocEntry   *te;
	char		workbuf[32];
	int			tocCount;
	int			i;

	/* count entries that will actually be dumped */
	tocCount = 0;
	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS | REQ_SPECIAL)) != 0)
			tocCount++;
	}

	/* printf("%d TOC Entries to save\n", tocCount); */

	WriteInt(AH, tocCount);

	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		/* skip entries not selected for any part of the dump */
		if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS | REQ_SPECIAL)) == 0)
			continue;

		WriteInt(AH, te->dumpId);
		WriteInt(AH, te->dataDumper ? 1 : 0);

		/* OID is recorded as a string for historical reasons */
		sprintf(workbuf, "%u", te->catalogId.tableoid);
		WriteStr(AH, workbuf);
		sprintf(workbuf, "%u", te->catalogId.oid);
		WriteStr(AH, workbuf);

		WriteStr(AH, te->tag);
		WriteStr(AH, te->desc);
		WriteInt(AH, te->section);

		if (te->defnLen)
		{
			/*
			 * defnLen should only be set for custom format's second call to
			 * WriteToc(), which rewrites the TOC in place to update data
			 * offsets. Instead of calling the defnDumper a second time
			 * (which could involve re-executing queries), just skip writing
			 * the entry. While regenerating the definition should
			 * theoretically produce the same result as before, it's expensive
			 * and feels risky.
			 *
			 * The custom format only calls WriteToc() a second time if
			 * fseeko() is usable (see _CloseArchive() in pg_backup_custom.c),
			 * so we can safely use it without checking. For other formats,
			 * we fail because one of our assumptions must no longer hold
			 * true.
			 *
			 * XXX This is a layering violation, but the alternative is an
			 * awkward and complicated callback infrastructure for this
			 * special case. This might be worth revisiting in the future.
			 */
			if (AH->format != archCustom)
				pg_fatal("unexpected TOC entry in WriteToc(): %d %s %s",
						 te->dumpId, te->desc, te->tag);

			if (fseeko(AH->FH, te->defnLen, SEEK_CUR) != 0)
				pg_fatal("error during file seek: %m");
		}
		else if (te->defnDumper)
		{
			/* generate the definition on demand; remember its length */
			char	   *defn = te->defnDumper((Archive *) AH, te->defnDumperArg, te);

			te->defnLen = WriteStr(AH, defn);
			pg_free(defn);
		}
		else
			WriteStr(AH, te->defn);

		WriteStr(AH, te->dropStmt);
		WriteStr(AH, te->copyStmt);
		WriteStr(AH, te->namespace);
		WriteStr(AH, te->tablespace);
		WriteStr(AH, te->tableam);
		WriteInt(AH, te->relkind);
		WriteStr(AH, te->owner);
		/* legacy WITH-OIDS flag; always "false" now (see ReadToc) */
		WriteStr(AH, "false");

		/* Dump list of dependencies */
		for (i = 0; i < te->nDeps; i++)
		{
			sprintf(workbuf, "%d", te->dependencies[i]);
			WriteStr(AH, workbuf);
		}
		WriteStr(AH, NULL);		/* Terminate List */

		if (AH->WriteExtraTocPtr)
			AH->WriteExtraTocPtr(AH, te);
	}
}
2688 :
/*
 * Deserialize the TOC from the archive into AH's circular TOC list,
 * handling all supported older archive versions (fields are read only when
 * AH->version is new enough to have written them).  Also performs immediate
 * processing of the special ENCODING/STDSTRINGS/SEARCHPATH entries.
 */
void
ReadToc(ArchiveHandle *AH)
{
	int			i;
	char	   *tmp;
	DumpId	   *deps;
	int			depIdx;
	int			depSize;
	TocEntry   *te;
	bool		is_supported;

	AH->tocCount = ReadInt(AH);
	AH->maxDumpId = 0;

	for (i = 0; i < AH->tocCount; i++)
	{
		te = (TocEntry *) pg_malloc0(sizeof(TocEntry));
		te->dumpId = ReadInt(AH);

		/* track the highest ID seen, for the lookup arrays built later */
		if (te->dumpId > AH->maxDumpId)
			AH->maxDumpId = te->dumpId;

		/* Sanity check */
		if (te->dumpId <= 0)
			pg_fatal("entry ID %d out of range -- perhaps a corrupt TOC",
					 te->dumpId);

		te->hadDumper = ReadInt(AH);

		/* catalog IDs are stored as strings for historical reasons */
		if (AH->version >= K_VERS_1_8)
		{
			tmp = ReadStr(AH);
			sscanf(tmp, "%u", &te->catalogId.tableoid);
			free(tmp);
		}
		else
			te->catalogId.tableoid = InvalidOid;
		tmp = ReadStr(AH);
		sscanf(tmp, "%u", &te->catalogId.oid);
		free(tmp);

		te->tag = ReadStr(AH);
		te->desc = ReadStr(AH);

		if (AH->version >= K_VERS_1_11)
		{
			te->section = ReadInt(AH);
		}
		else
		{
			/*
			 * Rules for pre-8.4 archives wherein pg_dump hasn't classified
			 * the entries into sections. This list need not cover entry
			 * types added later than 8.4.
			 */
			if (strcmp(te->desc, "COMMENT") == 0 ||
				strcmp(te->desc, "ACL") == 0 ||
				strcmp(te->desc, "ACL LANGUAGE") == 0)
				te->section = SECTION_NONE;
			else if (strcmp(te->desc, "TABLE DATA") == 0 ||
					 strcmp(te->desc, "BLOBS") == 0 ||
					 strcmp(te->desc, "BLOB COMMENTS") == 0)
				te->section = SECTION_DATA;
			else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
					 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
					 strcmp(te->desc, "FK CONSTRAINT") == 0 ||
					 strcmp(te->desc, "INDEX") == 0 ||
					 strcmp(te->desc, "RULE") == 0 ||
					 strcmp(te->desc, "TRIGGER") == 0)
				te->section = SECTION_POST_DATA;
			else
				te->section = SECTION_PRE_DATA;
		}

		te->defn = ReadStr(AH);
		te->dropStmt = ReadStr(AH);

		if (AH->version >= K_VERS_1_3)
			te->copyStmt = ReadStr(AH);

		if (AH->version >= K_VERS_1_6)
			te->namespace = ReadStr(AH);

		if (AH->version >= K_VERS_1_10)
			te->tablespace = ReadStr(AH);

		if (AH->version >= K_VERS_1_14)
			te->tableam = ReadStr(AH);

		if (AH->version >= K_VERS_1_16)
			te->relkind = ReadInt(AH);

		te->owner = ReadStr(AH);
		/* legacy WITH-OIDS flag: "true" means an unrestorable old dump */
		is_supported = true;
		if (AH->version < K_VERS_1_9)
			is_supported = false;
		else
		{
			tmp = ReadStr(AH);

			if (strcmp(tmp, "true") == 0)
				is_supported = false;

			free(tmp);
		}

		if (!is_supported)
			pg_log_warning("restoring tables WITH OIDS is not supported anymore");

		/* Read TOC entry dependencies */
		if (AH->version >= K_VERS_1_5)
		{
			/* dependency IDs arrive as strings, NULL-terminated list */
			depSize = 100;
			deps = (DumpId *) pg_malloc(sizeof(DumpId) * depSize);
			depIdx = 0;
			for (;;)
			{
				tmp = ReadStr(AH);
				if (!tmp)
					break;		/* end of list */
				if (depIdx >= depSize)
				{
					depSize *= 2;
					deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depSize);
				}
				sscanf(tmp, "%d", &deps[depIdx]);
				free(tmp);
				depIdx++;
			}

			if (depIdx > 0)		/* We have a non-null entry */
			{
				/* shrink the array to its final size */
				deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depIdx);
				te->dependencies = deps;
				te->nDeps = depIdx;
			}
			else
			{
				free(deps);
				te->dependencies = NULL;
				te->nDeps = 0;
			}
		}
		else
		{
			te->dependencies = NULL;
			te->nDeps = 0;
		}
		te->dataLength = 0;

		if (AH->ReadExtraTocPtr)
			AH->ReadExtraTocPtr(AH, te);

		pg_log_debug("read TOC entry %d (ID %d) for %s %s",
					 i, te->dumpId, te->desc, te->tag);

		/* link completed entry into TOC circular list */
		te->prev = AH->toc->prev;
		AH->toc->prev->next = te;
		AH->toc->prev = te;
		te->next = AH->toc;

		/* special processing immediately upon read for some items */
		if (strcmp(te->desc, "ENCODING") == 0)
			processEncodingEntry(AH, te);
		else if (strcmp(te->desc, "STDSTRINGS") == 0)
			processStdStringsEntry(AH, te);
		else if (strcmp(te->desc, "SEARCHPATH") == 0)
			processSearchPathEntry(AH, te);
	}
}
2860 :
2861 : static void
2862 114 : processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
2863 : {
2864 : /* te->defn should have the form SET client_encoding = 'foo'; */
2865 114 : char *defn = pg_strdup(te->defn);
2866 : char *ptr1;
2867 114 : char *ptr2 = NULL;
2868 : int encoding;
2869 :
2870 114 : ptr1 = strchr(defn, '\'');
2871 114 : if (ptr1)
2872 114 : ptr2 = strchr(++ptr1, '\'');
2873 114 : if (ptr2)
2874 : {
2875 114 : *ptr2 = '\0';
2876 114 : encoding = pg_char_to_encoding(ptr1);
2877 114 : if (encoding < 0)
2878 0 : pg_fatal("unrecognized encoding \"%s\"",
2879 : ptr1);
2880 114 : AH->public.encoding = encoding;
2881 114 : setFmtEncoding(encoding);
2882 : }
2883 : else
2884 0 : pg_fatal("invalid ENCODING item: %s",
2885 : te->defn);
2886 :
2887 114 : free(defn);
2888 114 : }
2889 :
2890 : static void
2891 114 : processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
2892 : {
2893 : /* te->defn should have the form SET standard_conforming_strings = 'x'; */
2894 : char *ptr1;
2895 :
2896 114 : ptr1 = strchr(te->defn, '\'');
2897 114 : if (ptr1 && strncmp(ptr1, "'on'", 4) == 0)
2898 114 : AH->public.std_strings = true;
2899 0 : else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0)
2900 0 : AH->public.std_strings = false;
2901 : else
2902 0 : pg_fatal("invalid STDSTRINGS item: %s",
2903 : te->defn);
2904 114 : }
2905 :
2906 : static void
2907 114 : processSearchPathEntry(ArchiveHandle *AH, TocEntry *te)
2908 : {
2909 : /*
2910 : * te->defn should contain a command to set search_path. We just copy it
2911 : * verbatim for use later.
2912 : */
2913 114 : AH->public.searchpath = pg_strdup(te->defn);
2914 114 : }
2915 :
2916 : static void
2917 0 : StrictNamesCheck(RestoreOptions *ropt)
2918 : {
2919 : const char *missing_name;
2920 :
2921 : Assert(ropt->strict_names);
2922 :
2923 0 : if (ropt->schemaNames.head != NULL)
2924 : {
2925 0 : missing_name = simple_string_list_not_touched(&ropt->schemaNames);
2926 0 : if (missing_name != NULL)
2927 0 : pg_fatal("schema \"%s\" not found", missing_name);
2928 : }
2929 :
2930 0 : if (ropt->tableNames.head != NULL)
2931 : {
2932 0 : missing_name = simple_string_list_not_touched(&ropt->tableNames);
2933 0 : if (missing_name != NULL)
2934 0 : pg_fatal("table \"%s\" not found", missing_name);
2935 : }
2936 :
2937 0 : if (ropt->indexNames.head != NULL)
2938 : {
2939 0 : missing_name = simple_string_list_not_touched(&ropt->indexNames);
2940 0 : if (missing_name != NULL)
2941 0 : pg_fatal("index \"%s\" not found", missing_name);
2942 : }
2943 :
2944 0 : if (ropt->functionNames.head != NULL)
2945 : {
2946 0 : missing_name = simple_string_list_not_touched(&ropt->functionNames);
2947 0 : if (missing_name != NULL)
2948 0 : pg_fatal("function \"%s\" not found", missing_name);
2949 : }
2950 :
2951 0 : if (ropt->triggerNames.head != NULL)
2952 : {
2953 0 : missing_name = simple_string_list_not_touched(&ropt->triggerNames);
2954 0 : if (missing_name != NULL)
2955 0 : pg_fatal("trigger \"%s\" not found", missing_name);
2956 : }
2957 0 : }
2958 :
2959 : /*
2960 : * Determine whether we want to restore this TOC entry.
2961 : *
2962 : * Returns 0 if entry should be skipped, or some combination of the
2963 : * REQ_SCHEMA, REQ_DATA, and REQ_STATS bits if we want to restore schema, data
2964 : * and/or statistics portions of this TOC entry, or REQ_SPECIAL if it's a
2965 : * special entry.
2966 : */
2967 : static int
2968 123136 : _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH)
2969 : {
2970 123136 : int res = REQ_SCHEMA | REQ_DATA;
2971 123136 : RestoreOptions *ropt = AH->public.ropt;
2972 :
2973 : /*
2974 : * For binary upgrade mode, dump pg_largeobject_metadata and the
2975 : * associated pg_shdepend rows. This is faster to restore than the
2976 : * equivalent set of large object commands. We can only do this for
2977 : * upgrades from v12 and newer; in older versions, pg_largeobject_metadata
2978 : * was created WITH OIDS, so the OID column is hidden and won't be dumped.
2979 : */
2980 123136 : if (ropt->binary_upgrade && AH->public.remoteVersion >= 120000 &&
2981 7790 : strcmp(te->desc, "TABLE DATA") == 0 &&
2982 146 : (te->catalogId.oid == LargeObjectMetadataRelationId ||
2983 74 : te->catalogId.oid == SharedDependRelationId))
2984 144 : return REQ_DATA;
2985 :
2986 : /* These items are treated specially */
2987 122992 : if (strcmp(te->desc, "ENCODING") == 0 ||
2988 122510 : strcmp(te->desc, "STDSTRINGS") == 0 ||
2989 122028 : strcmp(te->desc, "SEARCHPATH") == 0)
2990 1446 : return REQ_SPECIAL;
2991 :
2992 121546 : if (strcmp(te->desc, "STATISTICS DATA") == 0)
2993 : {
2994 10838 : if (!ropt->dumpStatistics)
2995 0 : return 0;
2996 :
2997 10838 : res = REQ_STATS;
2998 : }
2999 :
3000 : /*
3001 : * DATABASE and DATABASE PROPERTIES also have a special rule: they are
3002 : * restored in createDB mode, and not restored otherwise, independently of
3003 : * all else.
3004 : */
3005 121546 : if (strcmp(te->desc, "DATABASE") == 0 ||
3006 121266 : strcmp(te->desc, "DATABASE PROPERTIES") == 0)
3007 : {
3008 420 : if (ropt->createDB)
3009 364 : return REQ_SCHEMA;
3010 : else
3011 56 : return 0;
3012 : }
3013 :
3014 : /*
3015 : * Process exclusions that affect certain classes of TOC entries.
3016 : */
3017 :
3018 : /* If it's an ACL, maybe ignore it */
3019 121126 : if (ropt->aclsSkip && _tocEntryIsACL(te))
3020 0 : return 0;
3021 :
3022 : /* If it's a comment, maybe ignore it */
3023 121126 : if (ropt->no_comments && strcmp(te->desc, "COMMENT") == 0)
3024 0 : return 0;
3025 :
3026 : /* If it's a policy, maybe ignore it */
3027 121126 : if (ropt->no_policies &&
3028 722 : (strcmp(te->desc, "POLICY") == 0 ||
3029 722 : strcmp(te->desc, "ROW SECURITY") == 0))
3030 0 : return 0;
3031 :
3032 : /*
3033 : * If it's a publication or a table part of a publication, maybe ignore
3034 : * it.
3035 : */
3036 121126 : if (ropt->no_publications &&
3037 0 : (strcmp(te->desc, "PUBLICATION") == 0 ||
3038 0 : strcmp(te->desc, "PUBLICATION TABLE") == 0 ||
3039 0 : strcmp(te->desc, "PUBLICATION TABLES IN SCHEMA") == 0))
3040 0 : return 0;
3041 :
3042 : /* If it's a security label, maybe ignore it */
3043 121126 : if (ropt->no_security_labels && strcmp(te->desc, "SECURITY LABEL") == 0)
3044 0 : return 0;
3045 :
3046 : /* If it's a subscription, maybe ignore it */
3047 121126 : if (ropt->no_subscriptions && strcmp(te->desc, "SUBSCRIPTION") == 0)
3048 0 : return 0;
3049 :
3050 : /* Ignore it if section is not to be dumped/restored */
3051 121126 : switch (curSection)
3052 : {
3053 74444 : case SECTION_PRE_DATA:
3054 74444 : if (!(ropt->dumpSections & DUMP_PRE_DATA))
3055 736 : return 0;
3056 73708 : break;
3057 23728 : case SECTION_DATA:
3058 23728 : if (!(ropt->dumpSections & DUMP_DATA))
3059 356 : return 0;
3060 23372 : break;
3061 22954 : case SECTION_POST_DATA:
3062 22954 : if (!(ropt->dumpSections & DUMP_POST_DATA))
3063 444 : return 0;
3064 22510 : break;
3065 0 : default:
3066 : /* shouldn't get here, really, but ignore it */
3067 0 : return 0;
3068 : }
3069 :
3070 : /* Ignore it if rejected by idWanted[] (cf. SortTocFromFile) */
3071 119590 : if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1])
3072 0 : return 0;
3073 :
3074 : /*
3075 : * Check options for selective dump/restore.
3076 : */
3077 119590 : if (strcmp(te->desc, "ACL") == 0 ||
3078 114414 : strcmp(te->desc, "COMMENT") == 0 ||
3079 100416 : strcmp(te->desc, "STATISTICS DATA") == 0 ||
3080 89854 : strcmp(te->desc, "SECURITY LABEL") == 0)
3081 : {
3082 : /* Database properties react to createDB, not selectivity options. */
3083 29736 : if (strncmp(te->tag, "DATABASE ", 9) == 0)
3084 : {
3085 184 : if (!ropt->createDB)
3086 38 : return 0;
3087 : }
3088 29552 : else if (ropt->schemaNames.head != NULL ||
3089 29552 : ropt->schemaExcludeNames.head != NULL ||
3090 29552 : ropt->selTypes)
3091 : {
3092 : /*
3093 : * In a selective dump/restore, we want to restore these dependent
3094 : * TOC entry types only if their parent object is being restored.
3095 : * Without selectivity options, we let through everything in the
3096 : * archive. Note there may be such entries with no parent, eg
3097 : * non-default ACLs for built-in objects. Also, we make
3098 : * per-column ACLs additionally depend on the table's ACL if any
3099 : * to ensure correct restore order, so those dependencies should
3100 : * be ignored in this check.
3101 : *
3102 : * This code depends on the parent having been marked already,
3103 : * which should be the case; if it isn't, perhaps due to
3104 : * SortTocFromFile rearrangement, skipping the dependent entry
3105 : * seems prudent anyway.
3106 : *
3107 : * Ideally we'd handle, eg, table CHECK constraints this way too.
3108 : * But it's hard to tell which of their dependencies is the one to
3109 : * consult.
3110 : */
3111 0 : bool dumpthis = false;
3112 :
3113 0 : for (int i = 0; i < te->nDeps; i++)
3114 : {
3115 0 : TocEntry *pte = getTocEntryByDumpId(AH, te->dependencies[i]);
3116 :
3117 0 : if (!pte)
3118 0 : continue; /* probably shouldn't happen */
3119 0 : if (strcmp(pte->desc, "ACL") == 0)
3120 0 : continue; /* ignore dependency on another ACL */
3121 0 : if (pte->reqs == 0)
3122 0 : continue; /* this object isn't marked, so ignore it */
3123 : /* Found a parent to be dumped, so we want to dump this too */
3124 0 : dumpthis = true;
3125 0 : break;
3126 : }
3127 0 : if (!dumpthis)
3128 0 : return 0;
3129 : }
3130 : }
3131 : else
3132 : {
3133 : /* Apply selective-restore rules for standalone TOC entries. */
3134 89854 : if (ropt->schemaNames.head != NULL)
3135 : {
3136 : /* If no namespace is specified, it means all. */
3137 40 : if (!te->namespace)
3138 4 : return 0;
3139 36 : if (!simple_string_list_member(&ropt->schemaNames, te->namespace))
3140 28 : return 0;
3141 : }
3142 :
3143 89822 : if (ropt->schemaExcludeNames.head != NULL &&
3144 76 : te->namespace &&
3145 36 : simple_string_list_member(&ropt->schemaExcludeNames, te->namespace))
3146 8 : return 0;
3147 :
3148 89814 : if (ropt->selTypes)
3149 : {
3150 156 : if (strcmp(te->desc, "TABLE") == 0 ||
3151 116 : strcmp(te->desc, "TABLE DATA") == 0 ||
3152 76 : strcmp(te->desc, "VIEW") == 0 ||
3153 76 : strcmp(te->desc, "FOREIGN TABLE") == 0 ||
3154 76 : strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
3155 76 : strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0 ||
3156 76 : strcmp(te->desc, "SEQUENCE") == 0 ||
3157 70 : strcmp(te->desc, "SEQUENCE SET") == 0)
3158 : {
3159 92 : if (!ropt->selTable)
3160 60 : return 0;
3161 32 : if (ropt->tableNames.head != NULL &&
3162 32 : !simple_string_list_member(&ropt->tableNames, te->tag))
3163 28 : return 0;
3164 : }
3165 64 : else if (strcmp(te->desc, "INDEX") == 0)
3166 : {
3167 12 : if (!ropt->selIndex)
3168 8 : return 0;
3169 4 : if (ropt->indexNames.head != NULL &&
3170 4 : !simple_string_list_member(&ropt->indexNames, te->tag))
3171 2 : return 0;
3172 : }
3173 52 : else if (strcmp(te->desc, "FUNCTION") == 0 ||
3174 28 : strcmp(te->desc, "AGGREGATE") == 0 ||
3175 28 : strcmp(te->desc, "PROCEDURE") == 0)
3176 : {
3177 24 : if (!ropt->selFunction)
3178 8 : return 0;
3179 16 : if (ropt->functionNames.head != NULL &&
3180 16 : !simple_string_list_member(&ropt->functionNames, te->tag))
3181 12 : return 0;
3182 : }
3183 28 : else if (strcmp(te->desc, "TRIGGER") == 0)
3184 : {
3185 12 : if (!ropt->selTrigger)
3186 8 : return 0;
3187 4 : if (ropt->triggerNames.head != NULL &&
3188 4 : !simple_string_list_member(&ropt->triggerNames, te->tag))
3189 2 : return 0;
3190 : }
3191 : else
3192 16 : return 0;
3193 : }
3194 : }
3195 :
3196 :
3197 : /*
3198 : * Determine whether the TOC entry contains schema and/or data components,
3199 : * and mask off inapplicable REQ bits. If it had a dataDumper, assume
3200 : * it's both schema and data. Otherwise it's probably schema-only, but
3201 : * there are exceptions.
3202 : */
3203 119368 : if (!te->hadDumper)
3204 : {
3205 : /*
3206 : * Special Case: If 'SEQUENCE SET' or anything to do with LOs, then it
3207 : * is considered a data entry. We don't need to check for BLOBS or
3208 : * old-style BLOB COMMENTS entries, because they will have hadDumper =
3209 : * true ... but we do need to check new-style BLOB ACLs, comments,
3210 : * etc.
3211 : */
3212 104676 : if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
3213 103264 : strcmp(te->desc, "BLOB") == 0 ||
3214 103264 : strcmp(te->desc, "BLOB METADATA") == 0 ||
3215 103064 : (strcmp(te->desc, "ACL") == 0 &&
3216 5176 : strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3217 102974 : (strcmp(te->desc, "COMMENT") == 0 &&
3218 13960 : strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3219 102838 : (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3220 0 : strncmp(te->tag, "LARGE OBJECT", 12) == 0))
3221 1838 : res = res & REQ_DATA;
3222 : else
3223 102838 : res = res & ~REQ_DATA;
3224 : }
3225 :
3226 : /*
3227 : * If there's no definition command, there's no schema component. Treat
3228 : * "load via partition root" comments as not schema.
3229 : */
3230 119368 : if (!te->defn || !te->defn[0] ||
3231 98028 : strncmp(te->defn, "-- load via partition root ", 27) == 0)
3232 21364 : res = res & ~REQ_SCHEMA;
3233 :
3234 : /*
3235 : * Special case: <Init> type with <Max OID> tag; this is obsolete and we
3236 : * always ignore it.
3237 : */
3238 119368 : if ((strcmp(te->desc, "<Init>") == 0) && (strcmp(te->tag, "Max OID") == 0))
3239 0 : return 0;
3240 :
3241 : /* Mask it if we don't want data */
3242 119368 : if (!ropt->dumpData)
3243 : {
3244 : /*
3245 : * The sequence_data option overrides dumpData for SEQUENCE SET.
3246 : *
3247 : * In binary-upgrade mode, even with dumpData unset, we do not mask
3248 : * out large objects. (Only large object definitions, comments and
3249 : * other metadata should be generated in binary-upgrade mode, not the
3250 : * actual data, but that need not concern us here.)
3251 : */
3252 8086 : if (!(ropt->sequence_data && strcmp(te->desc, "SEQUENCE SET") == 0) &&
3253 7956 : !(ropt->binary_upgrade &&
3254 7160 : (strcmp(te->desc, "BLOB") == 0 ||
3255 7160 : strcmp(te->desc, "BLOB METADATA") == 0 ||
3256 7160 : (strcmp(te->desc, "ACL") == 0 &&
3257 176 : strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3258 7160 : (strcmp(te->desc, "COMMENT") == 0 &&
3259 190 : strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3260 7154 : (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3261 0 : strncmp(te->tag, "LARGE OBJECT", 12) == 0))))
3262 7950 : res = res & (REQ_SCHEMA | REQ_STATS);
3263 : }
3264 :
3265 : /* Mask it if we don't want schema */
3266 119368 : if (!ropt->dumpSchema)
3267 852 : res = res & (REQ_DATA | REQ_STATS);
3268 :
3269 119368 : return res;
3270 : }
3271 :
3272 : /*
3273 : * Identify which pass we should restore this TOC entry in.
3274 : *
3275 : * See notes with the RestorePass typedef in pg_backup_archiver.h.
3276 : */
3277 : static RestorePass
3278 258300 : _tocEntryRestorePass(TocEntry *te)
3279 : {
3280 : /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3281 258300 : if (strcmp(te->desc, "ACL") == 0 ||
3282 247198 : strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3283 247198 : strcmp(te->desc, "DEFAULT ACL") == 0)
3284 11882 : return RESTORE_PASS_ACL;
3285 246418 : if (strcmp(te->desc, "EVENT TRIGGER") == 0 ||
3286 246170 : strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0)
3287 2118 : return RESTORE_PASS_POST_ACL;
3288 :
3289 : /*
3290 : * Comments need to be emitted in the same pass as their parent objects.
3291 : * ACLs haven't got comments, and neither do matview data objects, but
3292 : * event triggers do. (Fortunately, event triggers haven't got ACLs, or
3293 : * we'd need yet another weird special case.)
3294 : */
3295 244300 : if (strcmp(te->desc, "COMMENT") == 0 &&
3296 28042 : strncmp(te->tag, "EVENT TRIGGER ", 14) == 0)
3297 0 : return RESTORE_PASS_POST_ACL;
3298 :
3299 : /*
3300 : * If statistics data is dependent on materialized view data, it must be
3301 : * deferred to RESTORE_PASS_POST_ACL. Those entries are already marked as
3302 : * SECTION_POST_DATA, and some other stats entries (e.g., index stats)
3303 : * will also be marked as SECTION_POST_DATA. Additionally, our lookahead
3304 : * code in fetchAttributeStats() assumes that we dump all statistics data
3305 : * entries in TOC order. To ensure this assumption holds, we move all
3306 : * statistics data entries in SECTION_POST_DATA to RESTORE_PASS_POST_ACL.
3307 : */
3308 244300 : if (strcmp(te->desc, "STATISTICS DATA") == 0 &&
3309 19836 : te->section == SECTION_POST_DATA)
3310 7404 : return RESTORE_PASS_POST_ACL;
3311 :
3312 : /* All else can be handled in the main pass. */
3313 236896 : return RESTORE_PASS_MAIN;
3314 : }
3315 :
3316 : /*
3317 : * Identify TOC entries that are ACLs.
3318 : *
3319 : * Note: it seems worth duplicating some code here to avoid a hard-wired
3320 : * assumption that these are exactly the same entries that we restore during
3321 : * the RESTORE_PASS_ACL phase.
3322 : */
3323 : static bool
3324 98874 : _tocEntryIsACL(TocEntry *te)
3325 : {
3326 : /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3327 98874 : if (strcmp(te->desc, "ACL") == 0 ||
3328 95040 : strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3329 95040 : strcmp(te->desc, "DEFAULT ACL") == 0)
3330 4094 : return true;
3331 94780 : return false;
3332 : }
3333 :
3334 : /*
3335 : * Issue SET commands for parameters that we want to have set the same way
3336 : * at all times during execution of a restore script.
3337 : */
3338 : static void
3339 592 : _doSetFixedOutputState(ArchiveHandle *AH)
3340 : {
3341 592 : RestoreOptions *ropt = AH->public.ropt;
3342 :
3343 : /*
3344 : * Disable timeouts to allow for slow commands, idle parallel workers, etc
3345 : */
3346 592 : ahprintf(AH, "SET statement_timeout = 0;\n");
3347 592 : ahprintf(AH, "SET lock_timeout = 0;\n");
3348 592 : ahprintf(AH, "SET idle_in_transaction_session_timeout = 0;\n");
3349 592 : ahprintf(AH, "SET transaction_timeout = 0;\n");
3350 :
3351 : /* Select the correct character set encoding */
3352 592 : ahprintf(AH, "SET client_encoding = '%s';\n",
3353 : pg_encoding_to_char(AH->public.encoding));
3354 :
3355 : /* Select the correct string literal syntax */
3356 592 : ahprintf(AH, "SET standard_conforming_strings = %s;\n",
3357 592 : AH->public.std_strings ? "on" : "off");
3358 :
3359 : /* Select the role to be used during restore */
3360 592 : if (ropt && ropt->use_role)
3361 0 : ahprintf(AH, "SET ROLE %s;\n", fmtId(ropt->use_role));
3362 :
3363 : /* Select the dump-time search_path */
3364 592 : if (AH->public.searchpath)
3365 592 : ahprintf(AH, "%s", AH->public.searchpath);
3366 :
3367 : /* Make sure function checking is disabled */
3368 592 : ahprintf(AH, "SET check_function_bodies = false;\n");
3369 :
3370 : /* Ensure that all valid XML data will be accepted */
3371 592 : ahprintf(AH, "SET xmloption = content;\n");
3372 :
3373 : /* Avoid annoying notices etc */
3374 592 : ahprintf(AH, "SET client_min_messages = warning;\n");
3375 592 : if (!AH->public.std_strings)
3376 0 : ahprintf(AH, "SET escape_string_warning = off;\n");
3377 :
3378 : /* Adjust row-security state */
3379 592 : if (ropt && ropt->enable_row_security)
3380 0 : ahprintf(AH, "SET row_security = on;\n");
3381 : else
3382 592 : ahprintf(AH, "SET row_security = off;\n");
3383 :
3384 : /*
3385 : * In --transaction-size mode, we should always be in a transaction when
3386 : * we begin to restore objects.
3387 : */
3388 592 : if (ropt && ropt->txn_size > 0)
3389 : {
3390 168 : if (AH->connection)
3391 168 : StartTransaction(&AH->public);
3392 : else
3393 0 : ahprintf(AH, "\nBEGIN;\n");
3394 168 : AH->txnCount = 0;
3395 : }
3396 :
3397 592 : ahprintf(AH, "\n");
3398 592 : }
3399 :
3400 : /*
3401 : * Issue a SET SESSION AUTHORIZATION command. Caller is responsible
3402 : * for updating state if appropriate. If user is NULL or an empty string,
3403 : * the specification DEFAULT will be used.
3404 : */
3405 : static void
3406 2 : _doSetSessionAuth(ArchiveHandle *AH, const char *user)
3407 : {
3408 2 : PQExpBuffer cmd = createPQExpBuffer();
3409 :
3410 2 : appendPQExpBufferStr(cmd, "SET SESSION AUTHORIZATION ");
3411 :
3412 : /*
3413 : * SQL requires a string literal here. Might as well be correct.
3414 : */
3415 2 : if (user && *user)
3416 2 : appendStringLiteralAHX(cmd, user, AH);
3417 : else
3418 0 : appendPQExpBufferStr(cmd, "DEFAULT");
3419 2 : appendPQExpBufferChar(cmd, ';');
3420 :
3421 2 : if (RestoringToDB(AH))
3422 : {
3423 : PGresult *res;
3424 :
3425 0 : res = PQexec(AH->connection, cmd->data);
3426 :
3427 0 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3428 : /* NOT warn_or_exit_horribly... use -O instead to skip this. */
3429 0 : pg_fatal("could not set session user to \"%s\": %s",
3430 : user, PQerrorMessage(AH->connection));
3431 :
3432 0 : PQclear(res);
3433 : }
3434 : else
3435 2 : ahprintf(AH, "%s\n\n", cmd->data);
3436 :
3437 2 : destroyPQExpBuffer(cmd);
3438 2 : }
3439 :
3440 :
3441 : /*
3442 : * Issue the commands to connect to the specified database.
3443 : *
3444 : * If we're currently restoring right into a database, this will
3445 : * actually establish a connection. Otherwise it puts a \connect into
3446 : * the script output.
3447 : */
3448 : static void
3449 188 : _reconnectToDB(ArchiveHandle *AH, const char *dbname)
3450 : {
3451 188 : if (RestoringToDB(AH))
3452 118 : ReconnectToServer(AH, dbname);
3453 : else
3454 : {
3455 : PQExpBufferData connectbuf;
3456 :
3457 70 : initPQExpBuffer(&connectbuf);
3458 70 : appendPsqlMetaConnect(&connectbuf, dbname);
3459 70 : ahprintf(AH, "%s\n", connectbuf.data);
3460 70 : termPQExpBuffer(&connectbuf);
3461 : }
3462 :
3463 : /*
3464 : * NOTE: currUser keeps track of what the imaginary session user in our
3465 : * script is. It's now effectively reset to the original userID.
3466 : */
3467 188 : free(AH->currUser);
3468 188 : AH->currUser = NULL;
3469 :
3470 : /* don't assume we still know the output schema, tablespace, etc either */
3471 188 : free(AH->currSchema);
3472 188 : AH->currSchema = NULL;
3473 :
3474 188 : free(AH->currTableAm);
3475 188 : AH->currTableAm = NULL;
3476 :
3477 188 : free(AH->currTablespace);
3478 188 : AH->currTablespace = NULL;
3479 :
3480 : /* re-establish fixed state */
3481 188 : _doSetFixedOutputState(AH);
3482 188 : }
3483 :
3484 : /*
3485 : * Become the specified user, and update state to avoid redundant commands
3486 : *
3487 : * NULL or empty argument is taken to mean restoring the session default
3488 : */
3489 : static void
3490 144 : _becomeUser(ArchiveHandle *AH, const char *user)
3491 : {
3492 144 : if (!user)
3493 0 : user = ""; /* avoid null pointers */
3494 :
3495 144 : if (AH->currUser && strcmp(AH->currUser, user) == 0)
3496 142 : return; /* no need to do anything */
3497 :
3498 2 : _doSetSessionAuth(AH, user);
3499 :
3500 : /*
3501 : * NOTE: currUser keeps track of what the imaginary session user in our
3502 : * script is
3503 : */
3504 2 : free(AH->currUser);
3505 2 : AH->currUser = pg_strdup(user);
3506 : }
3507 :
3508 : /*
3509 : * Become the owner of the given TOC entry object. If
3510 : * changes in ownership are not allowed, this doesn't do anything.
3511 : */
3512 : static void
3513 111228 : _becomeOwner(ArchiveHandle *AH, TocEntry *te)
3514 : {
3515 111228 : RestoreOptions *ropt = AH->public.ropt;
3516 :
3517 111228 : if (ropt && (ropt->noOwner || !ropt->use_setsessauth))
3518 111228 : return;
3519 :
3520 0 : _becomeUser(AH, te->owner);
3521 : }
3522 :
3523 :
3524 : /*
3525 : * Issue the commands to select the specified schema as the current schema
3526 : * in the target database.
3527 : */
3528 : static void
3529 111382 : _selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
3530 : {
3531 : PQExpBuffer qry;
3532 :
3533 : /*
3534 : * If there was a SEARCHPATH TOC entry, we're supposed to just stay with
3535 : * that search_path rather than switching to entry-specific paths.
3536 : * Otherwise, it's an old archive that will not restore correctly unless
3537 : * we set the search_path as it's expecting.
3538 : */
3539 111382 : if (AH->public.searchpath)
3540 111382 : return;
3541 :
3542 0 : if (!schemaName || *schemaName == '\0' ||
3543 0 : (AH->currSchema && strcmp(AH->currSchema, schemaName) == 0))
3544 0 : return; /* no need to do anything */
3545 :
3546 0 : qry = createPQExpBuffer();
3547 :
3548 0 : appendPQExpBuffer(qry, "SET search_path = %s",
3549 : fmtId(schemaName));
3550 0 : if (strcmp(schemaName, "pg_catalog") != 0)
3551 0 : appendPQExpBufferStr(qry, ", pg_catalog");
3552 :
3553 0 : if (RestoringToDB(AH))
3554 : {
3555 : PGresult *res;
3556 :
3557 0 : res = PQexec(AH->connection, qry->data);
3558 :
3559 0 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3560 0 : warn_or_exit_horribly(AH,
3561 : "could not set \"search_path\" to \"%s\": %s",
3562 0 : schemaName, PQerrorMessage(AH->connection));
3563 :
3564 0 : PQclear(res);
3565 : }
3566 : else
3567 0 : ahprintf(AH, "%s;\n\n", qry->data);
3568 :
3569 0 : free(AH->currSchema);
3570 0 : AH->currSchema = pg_strdup(schemaName);
3571 :
3572 0 : destroyPQExpBuffer(qry);
3573 : }
3574 :
3575 : /*
3576 : * Issue the commands to select the specified tablespace as the current one
3577 : * in the target database.
3578 : */
3579 : static void
3580 98228 : _selectTablespace(ArchiveHandle *AH, const char *tablespace)
3581 : {
3582 98228 : RestoreOptions *ropt = AH->public.ropt;
3583 : PQExpBuffer qry;
3584 : const char *want,
3585 : *have;
3586 :
3587 : /* do nothing in --no-tablespaces mode */
3588 98228 : if (ropt->noTablespace)
3589 0 : return;
3590 :
3591 98228 : have = AH->currTablespace;
3592 98228 : want = tablespace;
3593 :
3594 : /* no need to do anything for non-tablespace object */
3595 98228 : if (!want)
3596 75748 : return;
3597 :
3598 22480 : if (have && strcmp(want, have) == 0)
3599 22018 : return; /* no need to do anything */
3600 :
3601 462 : qry = createPQExpBuffer();
3602 :
3603 462 : if (strcmp(want, "") == 0)
3604 : {
3605 : /* We want the tablespace to be the database's default */
3606 346 : appendPQExpBufferStr(qry, "SET default_tablespace = ''");
3607 : }
3608 : else
3609 : {
3610 : /* We want an explicit tablespace */
3611 116 : appendPQExpBuffer(qry, "SET default_tablespace = %s", fmtId(want));
3612 : }
3613 :
3614 462 : if (RestoringToDB(AH))
3615 : {
3616 : PGresult *res;
3617 :
3618 60 : res = PQexec(AH->connection, qry->data);
3619 :
3620 60 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3621 0 : warn_or_exit_horribly(AH,
3622 : "could not set \"default_tablespace\" to %s: %s",
3623 0 : fmtId(want), PQerrorMessage(AH->connection));
3624 :
3625 60 : PQclear(res);
3626 : }
3627 : else
3628 402 : ahprintf(AH, "%s;\n\n", qry->data);
3629 :
3630 462 : free(AH->currTablespace);
3631 462 : AH->currTablespace = pg_strdup(want);
3632 :
3633 462 : destroyPQExpBuffer(qry);
3634 : }
3635 :
3636 : /*
3637 : * Set the proper default_table_access_method value for the table.
3638 : */
3639 : static void
3640 96624 : _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam)
3641 : {
3642 96624 : RestoreOptions *ropt = AH->public.ropt;
3643 : PQExpBuffer cmd;
3644 : const char *want,
3645 : *have;
3646 :
3647 : /* do nothing in --no-table-access-method mode */
3648 96624 : if (ropt->noTableAm)
3649 730 : return;
3650 :
3651 95894 : have = AH->currTableAm;
3652 95894 : want = tableam;
3653 :
3654 95894 : if (!want)
3655 81784 : return;
3656 :
3657 14110 : if (have && strcmp(want, have) == 0)
3658 13430 : return;
3659 :
3660 680 : cmd = createPQExpBuffer();
3661 680 : appendPQExpBuffer(cmd, "SET default_table_access_method = %s;", fmtId(want));
3662 :
3663 680 : if (RestoringToDB(AH))
3664 : {
3665 : PGresult *res;
3666 :
3667 56 : res = PQexec(AH->connection, cmd->data);
3668 :
3669 56 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3670 0 : warn_or_exit_horribly(AH,
3671 : "could not set \"default_table_access_method\": %s",
3672 0 : PQerrorMessage(AH->connection));
3673 :
3674 56 : PQclear(res);
3675 : }
3676 : else
3677 624 : ahprintf(AH, "%s\n\n", cmd->data);
3678 :
3679 680 : destroyPQExpBuffer(cmd);
3680 :
3681 680 : free(AH->currTableAm);
3682 680 : AH->currTableAm = pg_strdup(want);
3683 : }
3684 :
3685 : /*
3686 : * Set the proper default table access method for a table without storage.
3687 : * Currently, this is required only for partitioned tables with a table AM.
3688 : */
3689 : static void
3690 1604 : _printTableAccessMethodNoStorage(ArchiveHandle *AH, TocEntry *te)
3691 : {
3692 1604 : RestoreOptions *ropt = AH->public.ropt;
3693 1604 : const char *tableam = te->tableam;
3694 : PQExpBuffer cmd;
3695 :
3696 : /* do nothing in --no-table-access-method mode */
3697 1604 : if (ropt->noTableAm)
3698 6 : return;
3699 :
3700 1598 : if (!tableam)
3701 1532 : return;
3702 :
3703 : Assert(te->relkind == RELKIND_PARTITIONED_TABLE);
3704 :
3705 66 : cmd = createPQExpBuffer();
3706 :
3707 66 : appendPQExpBufferStr(cmd, "ALTER TABLE ");
3708 66 : appendPQExpBuffer(cmd, "%s ", fmtQualifiedId(te->namespace, te->tag));
3709 66 : appendPQExpBuffer(cmd, "SET ACCESS METHOD %s;",
3710 : fmtId(tableam));
3711 :
3712 66 : if (RestoringToDB(AH))
3713 : {
3714 : PGresult *res;
3715 :
3716 0 : res = PQexec(AH->connection, cmd->data);
3717 :
3718 0 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3719 0 : warn_or_exit_horribly(AH,
3720 : "could not alter table access method: %s",
3721 0 : PQerrorMessage(AH->connection));
3722 0 : PQclear(res);
3723 : }
3724 : else
3725 66 : ahprintf(AH, "%s\n\n", cmd->data);
3726 :
3727 66 : destroyPQExpBuffer(cmd);
3728 : }
3729 :
3730 : /*
3731 : * Extract an object description for a TOC entry, and append it to buf.
3732 : *
3733 : * This is used for ALTER ... OWNER TO.
3734 : *
3735 : * If the object type has no owner, do nothing.
3736 : */
3737 : static void
3738 51122 : _getObjectDescription(PQExpBuffer buf, const TocEntry *te)
3739 : {
3740 51122 : const char *type = te->desc;
3741 :
3742 : /* objects that don't require special decoration */
3743 51122 : if (strcmp(type, "COLLATION") == 0 ||
3744 46050 : strcmp(type, "CONVERSION") == 0 ||
3745 45216 : strcmp(type, "DOMAIN") == 0 ||
3746 44766 : strcmp(type, "FOREIGN TABLE") == 0 ||
3747 44696 : strcmp(type, "MATERIALIZED VIEW") == 0 ||
3748 43908 : strcmp(type, "SEQUENCE") == 0 ||
3749 43254 : strcmp(type, "STATISTICS") == 0 ||
3750 42994 : strcmp(type, "TABLE") == 0 ||
3751 28078 : strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
3752 27688 : strcmp(type, "TEXT SEARCH CONFIGURATION") == 0 ||
3753 27378 : strcmp(type, "TYPE") == 0 ||
3754 25970 : strcmp(type, "VIEW") == 0 ||
3755 : /* non-schema-specified objects */
3756 24602 : strcmp(type, "DATABASE") == 0 ||
3757 24480 : strcmp(type, "PROCEDURAL LANGUAGE") == 0 ||
3758 24416 : strcmp(type, "SCHEMA") == 0 ||
3759 23950 : strcmp(type, "EVENT TRIGGER") == 0 ||
3760 23870 : strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
3761 23774 : strcmp(type, "SERVER") == 0 ||
3762 23674 : strcmp(type, "PUBLICATION") == 0 ||
3763 23272 : strcmp(type, "SUBSCRIPTION") == 0)
3764 : {
3765 28046 : appendPQExpBuffer(buf, "%s ", type);
3766 28046 : if (te->namespace && *te->namespace)
3767 26520 : appendPQExpBuffer(buf, "%s.", fmtId(te->namespace));
3768 28046 : appendPQExpBufferStr(buf, fmtId(te->tag));
3769 : }
3770 : /* LOs just have a name, but it's numeric so must not use fmtId */
3771 23076 : else if (strcmp(type, "BLOB") == 0)
3772 : {
3773 0 : appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
3774 : }
3775 :
3776 : /*
3777 : * These object types require additional decoration. Fortunately, the
3778 : * information needed is exactly what's in the DROP command.
3779 : */
3780 23076 : else if (strcmp(type, "AGGREGATE") == 0 ||
3781 22256 : strcmp(type, "FUNCTION") == 0 ||
3782 17528 : strcmp(type, "OPERATOR") == 0 ||
3783 12492 : strcmp(type, "OPERATOR CLASS") == 0 ||
3784 11184 : strcmp(type, "OPERATOR FAMILY") == 0 ||
3785 10064 : strcmp(type, "PROCEDURE") == 0)
3786 : {
3787 : /* Chop "DROP " off the front and make a modifiable copy */
3788 13270 : char *first = pg_strdup(te->dropStmt + 5);
3789 : char *last;
3790 :
3791 : /* point to last character in string */
3792 13270 : last = first + strlen(first) - 1;
3793 :
3794 : /* Strip off any ';' or '\n' at the end */
3795 39810 : while (last >= first && (*last == '\n' || *last == ';'))
3796 26540 : last--;
3797 13270 : *(last + 1) = '\0';
3798 :
3799 13270 : appendPQExpBufferStr(buf, first);
3800 :
3801 13270 : free(first);
3802 13270 : return;
3803 : }
3804 : /* these object types don't have separate owners */
3805 9806 : else if (strcmp(type, "CAST") == 0 ||
3806 9806 : strcmp(type, "CHECK CONSTRAINT") == 0 ||
3807 9646 : strcmp(type, "CONSTRAINT") == 0 ||
3808 5822 : strcmp(type, "DATABASE PROPERTIES") == 0 ||
3809 5806 : strcmp(type, "DEFAULT") == 0 ||
3810 5384 : strcmp(type, "FK CONSTRAINT") == 0 ||
3811 4948 : strcmp(type, "INDEX") == 0 ||
3812 2410 : strcmp(type, "RULE") == 0 ||
3813 1742 : strcmp(type, "TRIGGER") == 0 ||
3814 592 : strcmp(type, "ROW SECURITY") == 0 ||
3815 592 : strcmp(type, "POLICY") == 0 ||
3816 64 : strcmp(type, "USER MAPPING") == 0)
3817 : {
3818 : /* do nothing */
3819 : }
3820 : else
3821 0 : pg_fatal("don't know how to set owner for object type \"%s\"", type);
3822 : }
3823 :
3824 : /*
3825 : * Emit the SQL commands to create the object represented by a TOC entry
3826 : *
3827 : * This now also includes issuing an ALTER OWNER command to restore the
3828 : * object's ownership, if wanted. But note that the object's permissions
3829 : * will remain at default, until the matching ACL TOC entry is restored.
3830 : */
3831 : static void
3832 98228 : _printTocEntry(ArchiveHandle *AH, TocEntry *te, const char *pfx)
3833 : {
3834 98228 : RestoreOptions *ropt = AH->public.ropt;
3835 :
3836 : /*
3837 : * Select owner, schema, tablespace and default AM as necessary. The
3838 : * default access method for partitioned tables is handled after
3839 : * generating the object definition, as it requires an ALTER command
3840 : * rather than SET.
3841 : */
3842 98228 : _becomeOwner(AH, te);
3843 98228 : _selectOutputSchema(AH, te->namespace);
3844 98228 : _selectTablespace(AH, te->tablespace);
3845 98228 : if (te->relkind != RELKIND_PARTITIONED_TABLE)
3846 96624 : _selectTableAccessMethod(AH, te->tableam);
3847 :
3848 : /* Emit header comment for item */
3849 98228 : if (!AH->noTocComments)
3850 : {
3851 : char *sanitized_name;
3852 : char *sanitized_schema;
3853 : char *sanitized_owner;
3854 :
3855 85832 : ahprintf(AH, "--\n");
3856 85832 : if (AH->public.verbose)
3857 : {
3858 2224 : ahprintf(AH, "-- TOC entry %d (class %u OID %u)\n",
3859 : te->dumpId, te->catalogId.tableoid, te->catalogId.oid);
3860 2224 : if (te->nDeps > 0)
3861 : {
3862 : int i;
3863 :
3864 1480 : ahprintf(AH, "-- Dependencies:");
3865 4048 : for (i = 0; i < te->nDeps; i++)
3866 2568 : ahprintf(AH, " %d", te->dependencies[i]);
3867 1480 : ahprintf(AH, "\n");
3868 : }
3869 : }
3870 :
3871 85832 : sanitized_name = sanitize_line(te->tag, false);
3872 85832 : sanitized_schema = sanitize_line(te->namespace, true);
3873 85832 : sanitized_owner = sanitize_line(ropt->noOwner ? NULL : te->owner, true);
3874 :
3875 85832 : ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s",
3876 : pfx, sanitized_name, te->desc, sanitized_schema,
3877 : sanitized_owner);
3878 :
3879 85832 : free(sanitized_name);
3880 85832 : free(sanitized_schema);
3881 85832 : free(sanitized_owner);
3882 :
3883 85832 : if (te->tablespace && strlen(te->tablespace) > 0 && !ropt->noTablespace)
3884 : {
3885 : char *sanitized_tablespace;
3886 :
3887 204 : sanitized_tablespace = sanitize_line(te->tablespace, false);
3888 204 : ahprintf(AH, "; Tablespace: %s", sanitized_tablespace);
3889 204 : free(sanitized_tablespace);
3890 : }
3891 85832 : ahprintf(AH, "\n");
3892 :
3893 85832 : if (AH->PrintExtraTocPtr != NULL)
3894 7592 : AH->PrintExtraTocPtr(AH, te);
3895 85832 : ahprintf(AH, "--\n\n");
3896 : }
3897 :
3898 : /*
3899 : * Actually print the definition. Normally we can just print the defn
3900 : * string if any, but we have four special cases:
3901 : *
3902 : * 1. A crude hack for suppressing AUTHORIZATION clause that old pg_dump
3903 : * versions put into CREATE SCHEMA. Don't mutate the variant for schema
3904 : * "public" that is a comment. We have to do this when --no-owner mode is
3905 : * selected. This is ugly, but I see no other good way ...
3906 : *
3907 : * 2. BLOB METADATA entries need special processing since their defn
3908 : * strings are just lists of OIDs, not complete SQL commands.
3909 : *
3910 : * 3. ACL LARGE OBJECTS entries need special processing because they
3911 : * contain only one copy of the ACL GRANT/REVOKE commands, which we must
3912 : * apply to each large object listed in the associated BLOB METADATA.
3913 : *
3914 : * 4. Entries with a defnDumper need to call it to generate the
3915 : * definition. This is primarily intended to provide a way to save memory
3916 : * for objects that would otherwise need a lot of it (e.g., statistics
3917 : * data).
3918 : */
3919 98228 : if (ropt->noOwner &&
3920 768 : strcmp(te->desc, "SCHEMA") == 0 && strncmp(te->defn, "--", 2) != 0)
3921 : {
3922 4 : ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
3923 : }
3924 98224 : else if (strcmp(te->desc, "BLOB METADATA") == 0)
3925 : {
3926 154 : IssueCommandPerBlob(AH, te, "SELECT pg_catalog.lo_create('", "')");
3927 : }
3928 98070 : else if (strcmp(te->desc, "ACL") == 0 &&
3929 3834 : strncmp(te->tag, "LARGE OBJECTS", 13) == 0)
3930 : {
3931 0 : IssueACLPerBlob(AH, te);
3932 : }
3933 98070 : else if (te->defnLen && AH->format != archTar)
3934 : {
3935 : /*
3936 : * If defnLen is set, the defnDumper has already been called for this
3937 : * TOC entry. We don't normally expect a defnDumper to be called for
3938 : * a TOC entry a second time in _printTocEntry(), but there's an
3939 : * exception. The tar format first calls WriteToc(), which scans the
3940 : * entire TOC, and then it later calls RestoreArchive() to generate
3941 : * restore.sql, which scans the TOC again. There doesn't appear to be
3942 : * a good way to prevent a second defnDumper call in this case without
3943 : * storing the definition in memory, which defeats the purpose. This
3944 : * second defnDumper invocation should generate the same output as the
3945 : * first, but even if it doesn't, the worst-case scenario is that
3946 : * restore.sql might have different statistics data than the archive.
3947 : *
3948 : * In all other cases, encountering a TOC entry a second time in
3949 : * _printTocEntry() is unexpected, so we fail because one of our
3950 : * assumptions must no longer hold true.
3951 : *
3952 : * XXX This is a layering violation, but the alternative is an awkward
3953 : * and complicated callback infrastructure for this special case. This
3954 : * might be worth revisiting in the future.
3955 : */
3956 0 : pg_fatal("unexpected TOC entry in _printTocEntry(): %d %s %s",
3957 : te->dumpId, te->desc, te->tag);
3958 : }
3959 98070 : else if (te->defnDumper)
3960 : {
3961 3464 : char *defn = te->defnDumper((Archive *) AH, te->defnDumperArg, te);
3962 :
3963 3464 : te->defnLen = ahprintf(AH, "%s\n\n", defn);
3964 3464 : pg_free(defn);
3965 : }
3966 94606 : else if (te->defn && strlen(te->defn) > 0)
3967 : {
3968 82128 : ahprintf(AH, "%s\n\n", te->defn);
3969 :
3970 : /*
3971 : * If the defn string contains multiple SQL commands, txn_size mode
3972 : * should count it as N actions not one. But rather than build a full
3973 : * SQL parser, approximate this by counting semicolons. One case
3974 : * where that tends to be badly fooled is function definitions, so
3975 : * ignore them. (restore_toc_entry will count one action anyway.)
3976 : */
3977 82128 : if (ropt->txn_size > 0 &&
3978 6610 : strcmp(te->desc, "FUNCTION") != 0 &&
3979 6082 : strcmp(te->desc, "PROCEDURE") != 0)
3980 : {
3981 6058 : const char *p = te->defn;
3982 6058 : int nsemis = 0;
3983 :
3984 26898 : while ((p = strchr(p, ';')) != NULL)
3985 : {
3986 20840 : nsemis++;
3987 20840 : p++;
3988 : }
3989 6058 : if (nsemis > 1)
3990 2912 : AH->txnCount += nsemis - 1;
3991 : }
3992 : }
3993 :
3994 : /*
3995 : * If we aren't using SET SESSION AUTH to determine ownership, we must
3996 : * instead issue an ALTER OWNER command. Schema "public" is special; when
3997 : * a dump emits a comment in lieu of creating it, we use ALTER OWNER even
3998 : * when using SET SESSION for all other objects. We assume that anything
3999 : * without a DROP command is not a separately ownable object.
4000 : */
4001 98228 : if (!ropt->noOwner &&
4002 97460 : (!ropt->use_setsessauth ||
4003 0 : (strcmp(te->desc, "SCHEMA") == 0 &&
4004 0 : strncmp(te->defn, "--", 2) == 0)) &&
4005 97460 : te->owner && strlen(te->owner) > 0 &&
4006 89952 : te->dropStmt && strlen(te->dropStmt) > 0)
4007 : {
4008 51272 : if (strcmp(te->desc, "BLOB METADATA") == 0)
4009 : {
4010 : /* BLOB METADATA needs special code to handle multiple LOs */
4011 150 : char *cmdEnd = psprintf(" OWNER TO %s", fmtId(te->owner));
4012 :
4013 150 : IssueCommandPerBlob(AH, te, "ALTER LARGE OBJECT ", cmdEnd);
4014 150 : pg_free(cmdEnd);
4015 : }
4016 : else
4017 : {
4018 : /* For all other cases, we can use _getObjectDescription */
4019 : PQExpBufferData temp;
4020 :
4021 51122 : initPQExpBuffer(&temp);
4022 51122 : _getObjectDescription(&temp, te);
4023 :
4024 : /*
4025 : * If _getObjectDescription() didn't fill the buffer, then there
4026 : * is no owner.
4027 : */
4028 51122 : if (temp.data[0])
4029 41316 : ahprintf(AH, "ALTER %s OWNER TO %s;\n\n",
4030 41316 : temp.data, fmtId(te->owner));
4031 51122 : termPQExpBuffer(&temp);
4032 : }
4033 : }
4034 :
4035 : /*
4036 : * Select a partitioned table's default AM, once the table definition has
4037 : * been generated.
4038 : */
4039 98228 : if (te->relkind == RELKIND_PARTITIONED_TABLE)
4040 1604 : _printTableAccessMethodNoStorage(AH, te);
4041 :
4042 : /*
4043 : * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
4044 : * commands, so we can no longer assume we know the current auth setting.
4045 : */
4046 98228 : if (_tocEntryIsACL(te))
4047 : {
4048 4094 : free(AH->currUser);
4049 4094 : AH->currUser = NULL;
4050 : }
4051 98228 : }
4052 :
4053 : /*
4054 : * Sanitize a string to be included in an SQL comment or TOC listing, by
4055 : * replacing any newlines with spaces. This ensures each logical output line
4056 : * is in fact one physical output line, to prevent corruption of the dump
4057 : * (which could, in the worst case, present an SQL injection vulnerability
4058 : * if someone were to incautiously load a dump containing objects with
4059 : * maliciously crafted names).
4060 : *
4061 : * The result is a freshly malloc'd string. If the input string is NULL,
4062 : * return a malloc'ed empty string, unless want_hyphen, in which case return a
4063 : * malloc'ed hyphen.
4064 : *
4065 : * Note that we currently don't bother to quote names, meaning that the name
4066 : * fields aren't automatically parseable. "pg_restore -L" doesn't care because
4067 : * it only examines the dumpId field, but someday we might want to try harder.
4068 : */
4069 : static char *
4070 266540 : sanitize_line(const char *str, bool want_hyphen)
4071 : {
4072 : char *result;
4073 : char *s;
4074 :
4075 266540 : if (!str)
4076 9836 : return pg_strdup(want_hyphen ? "-" : "");
4077 :
4078 256704 : result = pg_strdup(str);
4079 :
4080 3272004 : for (s = result; *s != '\0'; s++)
4081 : {
4082 3015300 : if (*s == '\n' || *s == '\r')
4083 120 : *s = ' ';
4084 : }
4085 :
4086 256704 : return result;
4087 : }
4088 :
/*
 * Write the file header for a custom-format archive
 *
 * The fields below are written in a fixed order that must match exactly
 * what ReadHead() (and _discoverArchiveFormat()) expect to decode.  Do not
 * reorder or insert fields without bumping the archive version.
 */
void
WriteHead(ArchiveHandle *AH)
{
	struct tm	crtm;

	AH->WriteBufPtr(AH, "PGDMP", 5);	/* Magic code */
	AH->WriteBytePtr(AH, ARCHIVE_MAJOR(AH->version));
	AH->WriteBytePtr(AH, ARCHIVE_MINOR(AH->version));
	AH->WriteBytePtr(AH, ARCHIVE_REV(AH->version));
	AH->WriteBytePtr(AH, AH->intSize);
	AH->WriteBytePtr(AH, AH->offSize);
	AH->WriteBytePtr(AH, AH->format);
	AH->WriteBytePtr(AH, AH->compression_spec.algorithm);
	/* Creation time is stored as broken-down local time, field by field */
	crtm = *localtime(&AH->createDate);
	WriteInt(AH, crtm.tm_sec);
	WriteInt(AH, crtm.tm_min);
	WriteInt(AH, crtm.tm_hour);
	WriteInt(AH, crtm.tm_mday);
	WriteInt(AH, crtm.tm_mon);
	WriteInt(AH, crtm.tm_year);
	WriteInt(AH, crtm.tm_isdst);
	/* Source database name and server/pg_dump version strings */
	WriteStr(AH, PQdb(AH->connection));
	WriteStr(AH, AH->public.remoteVersionStr);
	WriteStr(AH, PG_VERSION);
}
4117 :
/*
 * Read and validate the header of a custom-format archive.
 *
 * Counterpart of WriteHead(): consumes the magic string (unless the caller
 * already read it, per AH->readHeader), the version bytes, integer/offset
 * sizes, format code, compression specification, creation timestamp, and the
 * database name and version strings.  Decoding is version-dependent to cope
 * with archives written by older pg_dump releases.
 */
void
ReadHead(ArchiveHandle *AH)
{
	char	   *errmsg;
	char		vmaj,
				vmin,
				vrev;
	int			fmt;

	/*
	 * If we haven't already read the header, do so.
	 *
	 * NB: this code must agree with _discoverArchiveFormat().  Maybe find a
	 * way to unify the cases?
	 */
	if (!AH->readHeader)
	{
		char		tmpMag[7];

		AH->ReadBufPtr(AH, tmpMag, 5);

		if (strncmp(tmpMag, "PGDMP", 5) != 0)
			pg_fatal("did not find magic string in file header");
	}

	vmaj = AH->ReadBytePtr(AH);
	vmin = AH->ReadBytePtr(AH);

	/* Version 1.0 archives had no revision byte */
	if (vmaj > 1 || (vmaj == 1 && vmin > 0))	/* Version > 1.0 */
		vrev = AH->ReadBytePtr(AH);
	else
		vrev = 0;

	AH->version = MAKE_ARCHIVE_VERSION(vmaj, vmin, vrev);

	if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
		pg_fatal("unsupported version (%d.%d) in file header",
				 vmaj, vmin);

	AH->intSize = AH->ReadBytePtr(AH);
	if (AH->intSize > 32)
		pg_fatal("sanity check on integer size (%lu) failed",
				 (unsigned long) AH->intSize);

	if (AH->intSize > sizeof(int))
		pg_log_warning("archive was made on a machine with larger integers, some operations might fail");

	if (AH->version >= K_VERS_1_7)
		AH->offSize = AH->ReadBytePtr(AH);
	else
		AH->offSize = AH->intSize;

	fmt = AH->ReadBytePtr(AH);

	/* The archive's format must match what the caller set up to read */
	if (AH->format != fmt)
		pg_fatal("expected format (%d) differs from format found in file (%d)",
				 AH->format, fmt);

	if (AH->version >= K_VERS_1_15)
		AH->compression_spec.algorithm = AH->ReadBytePtr(AH);
	else if (AH->version >= K_VERS_1_2)
	{
		/* Guess the compression method based on the level */
		if (AH->version < K_VERS_1_4)
			AH->compression_spec.level = AH->ReadBytePtr(AH);
		else
			AH->compression_spec.level = ReadInt(AH);

		if (AH->compression_spec.level != 0)
			AH->compression_spec.algorithm = PG_COMPRESSION_GZIP;
	}
	else
		AH->compression_spec.algorithm = PG_COMPRESSION_GZIP;

	errmsg = supports_compression(AH->compression_spec);
	if (errmsg)
	{
		pg_log_warning("archive is compressed, but this installation does not support compression (%s) -- no data will be available",
					   errmsg);
		pg_free(errmsg);
	}

	if (AH->version >= K_VERS_1_4)
	{
		struct tm	crtm;

		crtm.tm_sec = ReadInt(AH);
		crtm.tm_min = ReadInt(AH);
		crtm.tm_hour = ReadInt(AH);
		crtm.tm_mday = ReadInt(AH);
		crtm.tm_mon = ReadInt(AH);
		crtm.tm_year = ReadInt(AH);
		crtm.tm_isdst = ReadInt(AH);

		/*
		 * Newer versions of glibc have mktime() report failure if tm_isdst is
		 * inconsistent with the prevailing timezone, e.g. tm_isdst = 1 when
		 * TZ=UTC.  This is problematic when restoring an archive under a
		 * different timezone setting.  If we get a failure, try again with
		 * tm_isdst set to -1 ("don't know").
		 *
		 * XXX with or without this hack, we reconstruct createDate
		 * incorrectly when the prevailing timezone is different from
		 * pg_dump's.  Next time we bump the archive version, we should flush
		 * this representation and store a plain seconds-since-the-Epoch
		 * timestamp instead.
		 */
		AH->createDate = mktime(&crtm);
		if (AH->createDate == (time_t) -1)
		{
			crtm.tm_isdst = -1;
			AH->createDate = mktime(&crtm);
			if (AH->createDate == (time_t) -1)
				pg_log_warning("invalid creation date in header");
		}
	}

	if (AH->version >= K_VERS_1_4)
	{
		AH->archdbname = ReadStr(AH);
	}

	if (AH->version >= K_VERS_1_10)
	{
		AH->archiveRemoteVersion = ReadStr(AH);
		AH->archiveDumpVersion = ReadStr(AH);
	}
}
4246 :
4247 :
4248 : /*
4249 : * checkSeek
4250 : * check to see if ftell/fseek can be performed.
4251 : */
4252 : bool
4253 176 : checkSeek(FILE *fp)
4254 : {
4255 : pgoff_t tpos;
4256 :
4257 : /* Check that ftello works on this file */
4258 176 : tpos = ftello(fp);
4259 176 : if (tpos < 0)
4260 2 : return false;
4261 :
4262 : /*
4263 : * Check that fseeko(SEEK_SET) works, too. NB: we used to try to test
4264 : * this with fseeko(fp, 0, SEEK_CUR). But some platforms treat that as a
4265 : * successful no-op even on files that are otherwise unseekable.
4266 : */
4267 174 : if (fseeko(fp, tpos, SEEK_SET) != 0)
4268 0 : return false;
4269 :
4270 174 : return true;
4271 : }
4272 :
4273 :
4274 : /*
4275 : * dumpTimestamp
4276 : */
4277 : static void
4278 156 : dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
4279 : {
4280 : char buf[64];
4281 :
4282 156 : if (strftime(buf, sizeof(buf), PGDUMP_STRFTIME_FMT, localtime(&tim)) != 0)
4283 156 : ahprintf(AH, "-- %s %s\n\n", msg, buf);
4284 156 : }
4285 :
/*
 * Main engine for parallel restore.
 *
 * Parallel restore is done in three phases.  In this first phase,
 * we'll process all SECTION_PRE_DATA TOC entries that are allowed to be
 * processed in the RESTORE_PASS_MAIN pass.  (In practice, that's all
 * PRE_DATA items other than ACLs.)  Entries we can't process now are
 * added to the pending_list for later phases to deal with.
 *
 * On return, the parent's database connection has been closed and its
 * connection-dependent cached state cleared, in preparation for forking
 * the parallel workers.
 */
static void
restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
{
	bool		skipped_some;
	TocEntry   *next_work_item;

	pg_log_debug("entering restore_toc_entries_prefork");

	/* Adjust dependency information */
	fix_dependencies(AH);

	/*
	 * Do all the early stuff in a single connection in the parent. There's no
	 * great point in running it in parallel, in fact it will actually run
	 * faster in a single connection because we avoid all the connection and
	 * setup overhead.  Also, pre-9.2 pg_dump versions were not very good
	 * about showing all the dependencies of SECTION_PRE_DATA items, so we do
	 * not risk trying to process them out-of-order.
	 *
	 * Stuff that we can't do immediately gets added to the pending_list.
	 * Note: we don't yet filter out entries that aren't going to be restored.
	 * They might participate in dependency chains connecting entries that
	 * should be restored, so we treat them as live until we actually process
	 * them.
	 *
	 * Note: as of 9.2, it should be guaranteed that all PRE_DATA items appear
	 * before DATA items, and all DATA items before POST_DATA items.  That is
	 * not certain to be true in older archives, though, and in any case use
	 * of a list file would destroy that ordering (cf. SortTocFromFile).  So
	 * this loop cannot assume that it holds.
	 */
	AH->restorePass = RESTORE_PASS_MAIN;
	skipped_some = false;
	for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
	{
		bool		do_now = true;

		if (next_work_item->section != SECTION_PRE_DATA)
		{
			/* DATA and POST_DATA items are just ignored for now */
			if (next_work_item->section == SECTION_DATA ||
				next_work_item->section == SECTION_POST_DATA)
			{
				do_now = false;
				skipped_some = true;
			}
			else
			{
				/*
				 * SECTION_NONE items, such as comments, can be processed now
				 * if we are still in the PRE_DATA part of the archive. Once
				 * we've skipped any items, we have to consider whether the
				 * comment's dependencies are satisfied, so skip it for now.
				 */
				if (skipped_some)
					do_now = false;
			}
		}

		/*
		 * Also skip items that need to be forced into later passes.  We need
		 * not set skipped_some in this case, since by assumption no main-pass
		 * items could depend on these.
		 */
		if (_tocEntryRestorePass(next_work_item) != RESTORE_PASS_MAIN)
			do_now = false;

		if (do_now)
		{
			/* OK, restore the item and update its dependencies */
			pg_log_info("processing item %d %s %s",
						next_work_item->dumpId,
						next_work_item->desc, next_work_item->tag);

			(void) restore_toc_entry(AH, next_work_item, false);

			/* Reduce dependencies, but don't move anything to ready_heap */
			reduce_dependencies(AH, next_work_item, NULL);
		}
		else
		{
			/* Nope, so add it to pending_list */
			pending_list_append(pending_list, next_work_item);
		}
	}

	/*
	 * In --transaction-size mode, we must commit the open transaction before
	 * dropping the database connection.  This also ensures that child workers
	 * can see the objects we've created so far.
	 */
	if (AH->public.ropt->txn_size > 0)
		CommitTransaction(&AH->public);

	/*
	 * Now close parent connection in prep for parallel steps.  We do this
	 * mainly to ensure that we don't exceed the specified number of parallel
	 * connections.
	 */
	DisconnectDatabase(&AH->public);

	/* blow away any transient state from the old connection */
	free(AH->currUser);
	AH->currUser = NULL;
	free(AH->currSchema);
	AH->currSchema = NULL;
	free(AH->currTablespace);
	AH->currTablespace = NULL;
	free(AH->currTableAm);
	AH->currTableAm = NULL;
}
4406 :
/*
 * Main engine for parallel restore.
 *
 * Parallel restore is done in three phases.  In this second phase,
 * we process entries by dispatching them to parallel worker children
 * (processes on Unix, threads on Windows), each of which connects
 * separately to the database.  Inter-entry dependencies are respected,
 * and so is the RestorePass multi-pass structure.  When we can no longer
 * make any entries ready to process, we exit.  Normally, there will be
 * nothing left to do; but if there is, the third phase will mop up.
 *
 * On entry, pending_list holds every TOC entry still to be restored;
 * entries are moved into ready_heap (largest dataLength first) as their
 * dependencies are satisfied for the current restore pass.
 */
static void
restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
							 TocEntry *pending_list)
{
	binaryheap *ready_heap;
	TocEntry   *next_work_item;

	pg_log_debug("entering restore_toc_entries_parallel");

	/* Set up ready_heap with enough room for all known TocEntrys */
	ready_heap = binaryheap_allocate(AH->tocCount,
									 TocEntrySizeCompareBinaryheap,
									 NULL);

	/*
	 * The pending_list contains all items that we need to restore.  Move all
	 * items that are available to process immediately into the ready_heap.
	 * After this setup, the pending list is everything that needs to be done
	 * but is blocked by one or more dependencies, while the ready heap
	 * contains items that have no remaining dependencies and are OK to
	 * process in the current restore pass.
	 */
	AH->restorePass = RESTORE_PASS_MAIN;
	move_to_ready_heap(pending_list, ready_heap, AH->restorePass);

	/*
	 * main parent loop
	 *
	 * Keep going until there is no worker still running AND there is no work
	 * left to be done.  Note invariant: at top of loop, there should always
	 * be at least one worker available to dispatch a job to.
	 */
	pg_log_info("entering main parallel loop");

	for (;;)
	{
		/* Look for an item ready to be dispatched to a worker */
		next_work_item = pop_next_work_item(ready_heap, pstate);
		if (next_work_item != NULL)
		{
			/* If not to be restored, don't waste time launching a worker */
			if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) == 0)
			{
				pg_log_info("skipping item %d %s %s",
							next_work_item->dumpId,
							next_work_item->desc, next_work_item->tag);
				/* Update its dependencies as though we'd completed it */
				reduce_dependencies(AH, next_work_item, ready_heap);
				/* Loop around to see if anything else can be dispatched */
				continue;
			}

			pg_log_info("launching item %d %s %s",
						next_work_item->dumpId,
						next_work_item->desc, next_work_item->tag);

			/* Dispatch to some worker */
			DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
								   mark_restore_job_done, ready_heap);
		}
		else if (IsEveryWorkerIdle(pstate))
		{
			/*
			 * Nothing is ready and no worker is running, so we're done with
			 * the current pass or maybe with the whole process.
			 */
			if (AH->restorePass == RESTORE_PASS_LAST)
				break;			/* No more parallel processing is possible */

			/* Advance to next restore pass */
			AH->restorePass++;
			/* That probably allows some stuff to be made ready */
			move_to_ready_heap(pending_list, ready_heap, AH->restorePass);
			/* Loop around to see if anything's now ready */
			continue;
		}
		else
		{
			/*
			 * We have nothing ready, but at least one child is working, so
			 * wait for some subjob to finish.
			 */
		}

		/*
		 * Before dispatching another job, check to see if anything has
		 * finished.  We should check every time through the loop so as to
		 * reduce dependencies as soon as possible.  If we were unable to
		 * dispatch any job this time through, wait until some worker finishes
		 * (and, hopefully, unblocks some pending item).  If we did dispatch
		 * something, continue as soon as there's at least one idle worker.
		 * Note that in either case, there's guaranteed to be at least one
		 * idle worker when we return to the top of the loop.  This ensures we
		 * won't block inside DispatchJobForTocEntry, which would be
		 * undesirable: we'd rather postpone dispatching until we see what's
		 * been unblocked by finished jobs.
		 */
		WaitForWorkers(AH, pstate,
					   next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS);
	}

	/* There should now be nothing in ready_heap. */
	Assert(binaryheap_empty(ready_heap));

	binaryheap_free(ready_heap);

	pg_log_info("finished main parallel loop");
}
4526 :
4527 : /*
4528 : * Main engine for parallel restore.
4529 : *
4530 : * Parallel restore is done in three phases. In this third phase,
4531 : * we mop up any remaining TOC entries by processing them serially.
4532 : * This phase normally should have nothing to do, but if we've somehow
4533 : * gotten stuck due to circular dependencies or some such, this provides
4534 : * at least some chance of completing the restore successfully.
4535 : */
4536 : static void
4537 10 : restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
4538 : {
4539 10 : RestoreOptions *ropt = AH->public.ropt;
4540 : TocEntry *te;
4541 :
4542 10 : pg_log_debug("entering restore_toc_entries_postfork");
4543 :
4544 : /*
4545 : * Now reconnect the single parent connection.
4546 : */
4547 10 : ConnectDatabaseAhx((Archive *) AH, &ropt->cparams, true);
4548 :
4549 : /* re-establish fixed state */
4550 10 : _doSetFixedOutputState(AH);
4551 :
4552 : /*
4553 : * Make sure there is no work left due to, say, circular dependencies, or
4554 : * some other pathological condition. If so, do it in the single parent
4555 : * connection. We don't sweat about RestorePass ordering; it's likely we
4556 : * already violated that.
4557 : */
4558 10 : for (te = pending_list->pending_next; te != pending_list; te = te->pending_next)
4559 : {
4560 0 : pg_log_info("processing missed item %d %s %s",
4561 : te->dumpId, te->desc, te->tag);
4562 0 : (void) restore_toc_entry(AH, te, false);
4563 : }
4564 10 : }
4565 :
4566 : /*
4567 : * Check if te1 has an exclusive lock requirement for an item that te2 also
4568 : * requires, whether or not te2's requirement is for an exclusive lock.
4569 : */
4570 : static bool
4571 5362 : has_lock_conflicts(TocEntry *te1, TocEntry *te2)
4572 : {
4573 : int j,
4574 : k;
4575 :
4576 8754 : for (j = 0; j < te1->nLockDeps; j++)
4577 : {
4578 12920 : for (k = 0; k < te2->nDeps; k++)
4579 : {
4580 9528 : if (te1->lockDeps[j] == te2->dependencies[k])
4581 174 : return true;
4582 : }
4583 : }
4584 5188 : return false;
4585 : }
4586 :
4587 :
4588 : /*
4589 : * Initialize the header of the pending-items list.
4590 : *
4591 : * This is a circular list with a dummy TocEntry as header, just like the
4592 : * main TOC list; but we use separate list links so that an entry can be in
4593 : * the main TOC list as well as in the pending list.
4594 : */
4595 : static void
4596 10 : pending_list_header_init(TocEntry *l)
4597 : {
4598 10 : l->pending_prev = l->pending_next = l;
4599 10 : }
4600 :
4601 : /* Append te to the end of the pending-list headed by l */
4602 : static void
4603 2544 : pending_list_append(TocEntry *l, TocEntry *te)
4604 : {
4605 2544 : te->pending_prev = l->pending_prev;
4606 2544 : l->pending_prev->pending_next = te;
4607 2544 : l->pending_prev = te;
4608 2544 : te->pending_next = l;
4609 2544 : }
4610 :
4611 : /* Remove te from the pending-list */
4612 : static void
4613 2544 : pending_list_remove(TocEntry *te)
4614 : {
4615 2544 : te->pending_prev->pending_next = te->pending_next;
4616 2544 : te->pending_next->pending_prev = te->pending_prev;
4617 2544 : te->pending_prev = NULL;
4618 2544 : te->pending_next = NULL;
4619 2544 : }
4620 :
4621 :
4622 : /* qsort comparator for sorting TocEntries by dataLength */
4623 : static int
4624 59524 : TocEntrySizeCompareQsort(const void *p1, const void *p2)
4625 : {
4626 59524 : const TocEntry *te1 = *(const TocEntry *const *) p1;
4627 59524 : const TocEntry *te2 = *(const TocEntry *const *) p2;
4628 :
4629 : /* Sort by decreasing dataLength */
4630 59524 : if (te1->dataLength > te2->dataLength)
4631 12824 : return -1;
4632 46700 : if (te1->dataLength < te2->dataLength)
4633 22484 : return 1;
4634 :
4635 : /* For equal dataLengths, sort by dumpId, just to be stable */
4636 24216 : if (te1->dumpId < te2->dumpId)
4637 11050 : return -1;
4638 13166 : if (te1->dumpId > te2->dumpId)
4639 13130 : return 1;
4640 :
4641 36 : return 0;
4642 : }
4643 :
/* binaryheap comparator for sorting TocEntries by dataLength */
static int
TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg)
{
	/*
	 * Swap the arguments to invert the qsort ordering, turning the heap into
	 * a max-heap.  (The qsort comparator is antisymmetric, so swapping the
	 * operands is equivalent to negating its result.)
	 */
	return TocEntrySizeCompareQsort(&p2, &p1);
}
4651 :
4652 :
4653 : /*
4654 : * Move all immediately-ready items from pending_list to ready_heap.
4655 : *
4656 : * Items are considered ready if they have no remaining dependencies and
4657 : * they belong in the current restore pass. (See also reduce_dependencies,
4658 : * which applies the same logic one-at-a-time.)
4659 : */
4660 : static void
4661 30 : move_to_ready_heap(TocEntry *pending_list,
4662 : binaryheap *ready_heap,
4663 : RestorePass pass)
4664 : {
4665 : TocEntry *te;
4666 : TocEntry *next_te;
4667 :
4668 2636 : for (te = pending_list->pending_next; te != pending_list; te = next_te)
4669 : {
4670 : /* must save list link before possibly removing te from list */
4671 2606 : next_te = te->pending_next;
4672 :
4673 4174 : if (te->depCount == 0 &&
4674 1568 : _tocEntryRestorePass(te) == pass)
4675 : {
4676 : /* Remove it from pending_list ... */
4677 1518 : pending_list_remove(te);
4678 : /* ... and add to ready_heap */
4679 1518 : binaryheap_add(ready_heap, te);
4680 : }
4681 : }
4682 30 : }
4683 :
4684 : /*
4685 : * Find the next work item (if any) that is capable of being run now,
4686 : * and remove it from the ready_heap.
4687 : *
4688 : * Returns the item, or NULL if nothing is runnable.
4689 : *
4690 : * To qualify, the item must have no remaining dependencies
4691 : * and no requirements for locks that are incompatible with
4692 : * items currently running. Items in the ready_heap are known to have
4693 : * no remaining dependencies, but we have to check for lock conflicts.
4694 : */
4695 : static TocEntry *
4696 2604 : pop_next_work_item(binaryheap *ready_heap,
4697 : ParallelState *pstate)
4698 : {
4699 : /*
4700 : * Search the ready_heap until we find a suitable item. Note that we do a
4701 : * sequential scan through the heap nodes, so even though we will first
4702 : * try to choose the highest-priority item, we might end up picking
4703 : * something with a much lower priority. However, we expect that we will
4704 : * typically be able to pick one of the first few items, which should
4705 : * usually have a relatively high priority.
4706 : */
4707 2778 : for (int i = 0; i < binaryheap_size(ready_heap); i++)
4708 : {
4709 2718 : TocEntry *te = (TocEntry *) binaryheap_get_node(ready_heap, i);
4710 2718 : bool conflicts = false;
4711 :
4712 : /*
4713 : * Check to see if the item would need exclusive lock on something
4714 : * that a currently running item also needs lock on, or vice versa. If
4715 : * so, we don't want to schedule them together.
4716 : */
4717 7968 : for (int k = 0; k < pstate->numWorkers; k++)
4718 : {
4719 5424 : TocEntry *running_te = pstate->te[k];
4720 :
4721 5424 : if (running_te == NULL)
4722 2656 : continue;
4723 5362 : if (has_lock_conflicts(te, running_te) ||
4724 2594 : has_lock_conflicts(running_te, te))
4725 : {
4726 174 : conflicts = true;
4727 174 : break;
4728 : }
4729 : }
4730 :
4731 2718 : if (conflicts)
4732 174 : continue;
4733 :
4734 : /* passed all tests, so this item can run */
4735 2544 : binaryheap_remove_node(ready_heap, i);
4736 2544 : return te;
4737 : }
4738 :
4739 60 : pg_log_debug("no item ready");
4740 60 : return NULL;
4741 : }
4742 :
4743 :
4744 : /*
4745 : * Restore a single TOC item in parallel with others
4746 : *
4747 : * this is run in the worker, i.e. in a thread (Windows) or a separate process
4748 : * (everything else). A worker process executes several such work items during
4749 : * a parallel backup or restore. Once we terminate here and report back that
4750 : * our work is finished, the leader process will assign us a new work item.
4751 : */
4752 : int
4753 2544 : parallel_restore(ArchiveHandle *AH, TocEntry *te)
4754 : {
4755 : int status;
4756 :
4757 : Assert(AH->connection != NULL);
4758 :
4759 : /* Count only errors associated with this TOC entry */
4760 2544 : AH->public.n_errors = 0;
4761 :
4762 : /* Restore the TOC item */
4763 2544 : status = restore_toc_entry(AH, te, true);
4764 :
4765 2544 : return status;
4766 : }
4767 :
4768 :
4769 : /*
4770 : * Callback function that's invoked in the leader process after a step has
4771 : * been parallel restored.
4772 : *
4773 : * Update status and reduce the dependency count of any dependent items.
4774 : */
4775 : static void
4776 2544 : mark_restore_job_done(ArchiveHandle *AH,
4777 : TocEntry *te,
4778 : int status,
4779 : void *callback_data)
4780 : {
4781 2544 : binaryheap *ready_heap = (binaryheap *) callback_data;
4782 :
4783 2544 : pg_log_info("finished item %d %s %s",
4784 : te->dumpId, te->desc, te->tag);
4785 :
4786 2544 : if (status == WORKER_CREATE_DONE)
4787 0 : mark_create_done(AH, te);
4788 2544 : else if (status == WORKER_INHIBIT_DATA)
4789 : {
4790 0 : inhibit_data_for_failed_table(AH, te);
4791 0 : AH->public.n_errors++;
4792 : }
4793 2544 : else if (status == WORKER_IGNORED_ERRORS)
4794 0 : AH->public.n_errors++;
4795 2544 : else if (status != 0)
4796 0 : pg_fatal("worker process failed: exit code %d",
4797 : status);
4798 :
4799 2544 : reduce_dependencies(AH, te, ready_heap);
4800 2544 : }
4801 :
4802 :
/*
 * Process the dependency information into a form useful for parallel restore.
 *
 * This function takes care of fixing up some missing or badly designed
 * dependencies, and then prepares subsidiary data structures that will be
 * used in the main parallel-restore logic, including:
 * 1. We build the revDeps[] arrays of incoming dependency dumpIds.
 * 2. We set up depCount fields that are the number of as-yet-unprocessed
 * dependencies for each TOC entry.
 *
 * We also identify locking dependencies so that we can avoid trying to
 * schedule conflicting items at the same time.
 *
 * NB: the counting pass and the filling pass over revDeps[] below must
 * apply identical filter conditions, or the arrays would be mis-sized.
 */
static void
fix_dependencies(ArchiveHandle *AH)
{
	TocEntry   *te;
	int			i;

	/*
	 * Initialize the depCount/revDeps/nRevDeps fields, and make sure the TOC
	 * items are marked as not being in any parallel-processing list.
	 */
	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		te->depCount = te->nDeps;
		te->revDeps = NULL;
		te->nRevDeps = 0;
		te->pending_prev = NULL;
		te->pending_next = NULL;
	}

	/*
	 * POST_DATA items that are shown as depending on a table need to be
	 * re-pointed to depend on that table's data, instead. This ensures they
	 * won't get scheduled until the data has been loaded.
	 */
	repoint_table_dependencies(AH);

	/*
	 * Pre-8.4 versions of pg_dump neglected to set up a dependency from BLOB
	 * COMMENTS to BLOBS. Cope. (We assume there's only one BLOBS and only
	 * one BLOB COMMENTS in such files.)
	 */
	if (AH->version < K_VERS_1_11)
	{
		for (te = AH->toc->next; te != AH->toc; te = te->next)
		{
			if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0)
			{
				TocEntry   *te2;

				for (te2 = AH->toc->next; te2 != AH->toc; te2 = te2->next)
				{
					if (strcmp(te2->desc, "BLOBS") == 0)
					{
						/* Manufacture the missing one-element dependency list */
						te->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
						te->dependencies[0] = te2->dumpId;
						te->nDeps++;
						te->depCount++;
						break;
					}
				}
				break;
			}
		}
	}

	/*
	 * At this point we start to build the revDeps reverse-dependency arrays,
	 * so all changes of dependencies must be complete.
	 */

	/*
	 * Count the incoming dependencies for each item. Also, it is possible
	 * that the dependencies list items that are not in the archive at all
	 * (that should not happen in 9.2 and later, but is highly likely in older
	 * archives). Subtract such items from the depCounts.
	 */
	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		for (i = 0; i < te->nDeps; i++)
		{
			DumpId		depid = te->dependencies[i];

			if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
				AH->tocsByDumpId[depid]->nRevDeps++;
			else
				te->depCount--;		/* dangling dependency: ignore it */
		}
	}

	/*
	 * Allocate space for revDeps[] arrays, and reset nRevDeps so we can use
	 * it as a counter below.
	 */
	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		if (te->nRevDeps > 0)
			te->revDeps = (DumpId *) pg_malloc(te->nRevDeps * sizeof(DumpId));
		te->nRevDeps = 0;
	}

	/*
	 * Build the revDeps[] arrays of incoming-dependency dumpIds. This had
	 * better agree with the loops above.
	 */
	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		for (i = 0; i < te->nDeps; i++)
		{
			DumpId		depid = te->dependencies[i];

			if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
			{
				TocEntry   *otherte = AH->tocsByDumpId[depid];

				otherte->revDeps[otherte->nRevDeps++] = te->dumpId;
			}
		}
	}

	/*
	 * Lastly, work out the locking dependencies.
	 */
	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		te->lockDeps = NULL;
		te->nLockDeps = 0;
		identify_locking_dependencies(AH, te);
	}
}
4935 :
4936 : /*
4937 : * Change dependencies on table items to depend on table data items instead,
4938 : * but only in POST_DATA items.
4939 : *
4940 : * Also, for any item having such dependency(s), set its dataLength to the
4941 : * largest dataLength of the table data items it depends on. This ensures
4942 : * that parallel restore will prioritize larger jobs (index builds, FK
4943 : * constraint checks, etc) over smaller ones, avoiding situations where we
4944 : * end a restore with only one active job working on a large table.
4945 : */
4946 : static void
4947 10 : repoint_table_dependencies(ArchiveHandle *AH)
4948 : {
4949 : TocEntry *te;
4950 : int i;
4951 : DumpId olddep;
4952 :
4953 5720 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4954 : {
4955 5710 : if (te->section != SECTION_POST_DATA)
4956 4630 : continue;
4957 4274 : for (i = 0; i < te->nDeps; i++)
4958 : {
4959 3194 : olddep = te->dependencies[i];
4960 3194 : if (olddep <= AH->maxDumpId &&
4961 3194 : AH->tableDataId[olddep] != 0)
4962 : {
4963 1958 : DumpId tabledataid = AH->tableDataId[olddep];
4964 1958 : TocEntry *tabledatate = AH->tocsByDumpId[tabledataid];
4965 :
4966 1958 : te->dependencies[i] = tabledataid;
4967 1958 : te->dataLength = Max(te->dataLength, tabledatate->dataLength);
4968 1958 : pg_log_debug("transferring dependency %d -> %d to %d",
4969 : te->dumpId, olddep, tabledataid);
4970 : }
4971 : }
4972 : }
4973 10 : }
4974 :
4975 : /*
4976 : * Identify which objects we'll need exclusive lock on in order to restore
4977 : * the given TOC entry (*other* than the one identified by the TOC entry
4978 : * itself). Record their dump IDs in the entry's lockDeps[] array.
4979 : */
4980 : static void
4981 5710 : identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
4982 : {
4983 : DumpId *lockids;
4984 : int nlockids;
4985 : int i;
4986 :
4987 : /*
4988 : * We only care about this for POST_DATA items. PRE_DATA items are not
4989 : * run in parallel, and DATA items are all independent by assumption.
4990 : */
4991 5710 : if (te->section != SECTION_POST_DATA)
4992 4630 : return;
4993 :
4994 : /* Quick exit if no dependencies at all */
4995 1080 : if (te->nDeps == 0)
4996 10 : return;
4997 :
4998 : /*
4999 : * Most POST_DATA items are ALTER TABLEs or some moral equivalent of that,
5000 : * and hence require exclusive lock. However, we know that CREATE INDEX
5001 : * does not. (Maybe someday index-creating CONSTRAINTs will fall in that
5002 : * category too ... but today is not that day.)
5003 : */
5004 1070 : if (strcmp(te->desc, "INDEX") == 0)
5005 296 : return;
5006 :
5007 : /*
5008 : * We assume the entry requires exclusive lock on each TABLE or TABLE DATA
5009 : * item listed among its dependencies. Originally all of these would have
5010 : * been TABLE items, but repoint_table_dependencies would have repointed
5011 : * them to the TABLE DATA items if those are present (which they might not
5012 : * be, eg in a schema-only dump). Note that all of the entries we are
5013 : * processing here are POST_DATA; otherwise there might be a significant
5014 : * difference between a dependency on a table and a dependency on its
5015 : * data, so that closer analysis would be needed here.
5016 : */
5017 774 : lockids = (DumpId *) pg_malloc(te->nDeps * sizeof(DumpId));
5018 774 : nlockids = 0;
5019 3396 : for (i = 0; i < te->nDeps; i++)
5020 : {
5021 2622 : DumpId depid = te->dependencies[i];
5022 :
5023 2622 : if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL &&
5024 2606 : ((strcmp(AH->tocsByDumpId[depid]->desc, "TABLE DATA") == 0) ||
5025 1170 : strcmp(AH->tocsByDumpId[depid]->desc, "TABLE") == 0))
5026 1760 : lockids[nlockids++] = depid;
5027 : }
5028 :
5029 774 : if (nlockids == 0)
5030 : {
5031 30 : free(lockids);
5032 30 : return;
5033 : }
5034 :
5035 744 : te->lockDeps = pg_realloc(lockids, nlockids * sizeof(DumpId));
5036 744 : te->nLockDeps = nlockids;
5037 : }
5038 :
5039 : /*
5040 : * Remove the specified TOC entry from the depCounts of items that depend on
5041 : * it, thereby possibly making them ready-to-run. Any pending item that
5042 : * becomes ready should be moved to the ready_heap, if that's provided.
5043 : */
5044 : static void
5045 5710 : reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
5046 : binaryheap *ready_heap)
5047 : {
5048 : int i;
5049 :
5050 5710 : pg_log_debug("reducing dependencies for %d", te->dumpId);
5051 :
5052 13028 : for (i = 0; i < te->nRevDeps; i++)
5053 : {
5054 7318 : TocEntry *otherte = AH->tocsByDumpId[te->revDeps[i]];
5055 :
5056 : Assert(otherte->depCount > 0);
5057 7318 : otherte->depCount--;
5058 :
5059 : /*
5060 : * It's ready if it has no remaining dependencies, and it belongs in
5061 : * the current restore pass, and it is currently a member of the
5062 : * pending list (that check is needed to prevent double restore in
5063 : * some cases where a list-file forces out-of-order restoring).
5064 : * However, if ready_heap == NULL then caller doesn't want any list
5065 : * memberships changed.
5066 : */
5067 7318 : if (otherte->depCount == 0 &&
5068 4230 : _tocEntryRestorePass(otherte) == AH->restorePass &&
5069 4200 : otherte->pending_prev != NULL &&
5070 : ready_heap != NULL)
5071 : {
5072 : /* Remove it from pending list ... */
5073 1026 : pending_list_remove(otherte);
5074 : /* ... and add to ready_heap */
5075 1026 : binaryheap_add(ready_heap, otherte);
5076 : }
5077 : }
5078 5710 : }
5079 :
5080 : /*
5081 : * Set the created flag on the DATA member corresponding to the given
5082 : * TABLE member
5083 : */
5084 : static void
5085 14998 : mark_create_done(ArchiveHandle *AH, TocEntry *te)
5086 : {
5087 14998 : if (AH->tableDataId[te->dumpId] != 0)
5088 : {
5089 11888 : TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
5090 :
5091 11888 : ted->created = true;
5092 : }
5093 14998 : }
5094 :
5095 : /*
5096 : * Mark the DATA member corresponding to the given TABLE member
5097 : * as not wanted
5098 : */
5099 : static void
5100 0 : inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
5101 : {
5102 0 : pg_log_info("table \"%s\" could not be created, will not restore its data",
5103 : te->tag);
5104 :
5105 0 : if (AH->tableDataId[te->dumpId] != 0)
5106 : {
5107 0 : TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
5108 :
5109 0 : ted->reqs = 0;
5110 : }
5111 0 : }
5112 :
5113 : /*
5114 : * Clone and de-clone routines used in parallel restoration.
5115 : *
5116 : * Enough of the structure is cloned to ensure that there is no
5117 : * conflict between different threads each with their own clone.
5118 : */
5119 : ArchiveHandle *
5120 60 : CloneArchive(ArchiveHandle *AH)
5121 : {
5122 : ArchiveHandle *clone;
5123 :
5124 : /* Make a "flat" copy */
5125 60 : clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle));
5126 60 : memcpy(clone, AH, sizeof(ArchiveHandle));
5127 :
5128 : /* Likewise flat-copy the RestoreOptions, so we can alter them locally */
5129 60 : clone->public.ropt = (RestoreOptions *) pg_malloc(sizeof(RestoreOptions));
5130 60 : memcpy(clone->public.ropt, AH->public.ropt, sizeof(RestoreOptions));
5131 :
5132 : /* Handle format-independent fields */
5133 60 : memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));
5134 :
5135 : /* The clone will have its own connection, so disregard connection state */
5136 60 : clone->connection = NULL;
5137 60 : clone->connCancel = NULL;
5138 60 : clone->currUser = NULL;
5139 60 : clone->currSchema = NULL;
5140 60 : clone->currTableAm = NULL;
5141 60 : clone->currTablespace = NULL;
5142 :
5143 : /* savedPassword must be local in case we change it while connecting */
5144 60 : if (clone->savedPassword)
5145 0 : clone->savedPassword = pg_strdup(clone->savedPassword);
5146 :
5147 : /* clone has its own error count, too */
5148 60 : clone->public.n_errors = 0;
5149 :
5150 : /* clones should not share lo_buf */
5151 60 : clone->lo_buf = NULL;
5152 :
5153 : /*
5154 : * Clone connections disregard --transaction-size; they must commit after
5155 : * each command so that the results are immediately visible to other
5156 : * workers.
5157 : */
5158 60 : clone->public.ropt->txn_size = 0;
5159 :
5160 : /*
5161 : * Connect our new clone object to the database, using the same connection
5162 : * parameters used for the original connection.
5163 : */
5164 60 : ConnectDatabaseAhx((Archive *) clone, &clone->public.ropt->cparams, true);
5165 :
5166 : /* re-establish fixed state */
5167 60 : if (AH->mode == archModeRead)
5168 24 : _doSetFixedOutputState(clone);
5169 : /* in write case, setupDumpWorker will fix up connection state */
5170 :
5171 : /* Let the format-specific code have a chance too */
5172 60 : clone->ClonePtr(clone);
5173 :
5174 : Assert(clone->connection != NULL);
5175 60 : return clone;
5176 : }
5177 :
5178 : /*
5179 : * Release clone-local storage.
5180 : *
5181 : * Note: we assume any clone-local connection was already closed.
5182 : */
5183 : void
5184 60 : DeCloneArchive(ArchiveHandle *AH)
5185 : {
5186 : /* Should not have an open database connection */
5187 : Assert(AH->connection == NULL);
5188 :
5189 : /* Clear format-specific state */
5190 60 : AH->DeClonePtr(AH);
5191 :
5192 : /* Clear state allocated by CloneArchive */
5193 60 : if (AH->sqlparse.curCmd)
5194 6 : destroyPQExpBuffer(AH->sqlparse.curCmd);
5195 :
5196 : /* Clear any connection-local state */
5197 60 : free(AH->currUser);
5198 60 : free(AH->currSchema);
5199 60 : free(AH->currTablespace);
5200 60 : free(AH->currTableAm);
5201 60 : free(AH->savedPassword);
5202 :
5203 60 : free(AH);
5204 60 : }
|