Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_backup_archiver.c
4 : *
5 : * Private implementation of the archiver routines.
6 : *
7 : * See the headers to pg_restore for more details.
8 : *
9 : * Copyright (c) 2000, Philip Warner
10 : * Rights are granted to use this software in any way so long
11 : * as this notice is not removed.
12 : *
13 : * The author is not responsible for loss or damages that may
14 : * result from its use.
15 : *
16 : *
17 : * IDENTIFICATION
18 : * src/bin/pg_dump/pg_backup_archiver.c
19 : *
20 : *-------------------------------------------------------------------------
21 : */
22 : #include "postgres_fe.h"
23 :
24 : #include <ctype.h>
25 : #include <fcntl.h>
26 : #include <unistd.h>
27 : #include <sys/stat.h>
28 : #include <sys/wait.h>
29 : #ifdef WIN32
30 : #include <io.h>
31 : #endif
32 :
33 : #include "catalog/pg_class_d.h"
34 : #include "common/string.h"
35 : #include "compress_io.h"
36 : #include "dumputils.h"
37 : #include "fe_utils/string_utils.h"
38 : #include "lib/binaryheap.h"
39 : #include "lib/stringinfo.h"
40 : #include "libpq/libpq-fs.h"
41 : #include "parallel.h"
42 : #include "pg_backup_archiver.h"
43 : #include "pg_backup_db.h"
44 : #include "pg_backup_utils.h"
45 :
46 : #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
47 : #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
48 :
49 : #define TOC_PREFIX_NONE ""
50 : #define TOC_PREFIX_DATA "Data for "
51 : #define TOC_PREFIX_STATS "Statistics for "
52 :
53 : static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
54 : const pg_compress_specification compression_spec,
55 : bool dosync, ArchiveMode mode,
56 : SetupWorkerPtrType setupWorkerPtr,
57 : DataDirSyncMethod sync_method);
58 : static void _getObjectDescription(PQExpBuffer buf, const TocEntry *te);
59 : static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, const char *pfx);
60 : static char *sanitize_line(const char *str, bool want_hyphen);
61 : static void _doSetFixedOutputState(ArchiveHandle *AH);
62 : static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
63 : static void _reconnectToDB(ArchiveHandle *AH, const char *dbname);
64 : static void _becomeUser(ArchiveHandle *AH, const char *user);
65 : static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
66 : static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
67 : static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
68 : static void _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam);
69 : static void _printTableAccessMethodNoStorage(ArchiveHandle *AH,
70 : TocEntry *te);
71 : static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
72 : static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
73 : static void processSearchPathEntry(ArchiveHandle *AH, TocEntry *te);
74 : static int _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH);
75 : static RestorePass _tocEntryRestorePass(TocEntry *te);
76 : static bool _tocEntryIsACL(TocEntry *te);
77 : static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
78 : static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
79 : static bool is_load_via_partition_root(TocEntry *te);
80 : static void buildTocEntryArrays(ArchiveHandle *AH);
81 : static void _moveBefore(TocEntry *pos, TocEntry *te);
82 : static int _discoverArchiveFormat(ArchiveHandle *AH);
83 :
84 : static int RestoringToDB(ArchiveHandle *AH);
85 : static void dump_lo_buf(ArchiveHandle *AH);
86 : static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
87 : static void SetOutput(ArchiveHandle *AH, const char *filename,
88 : const pg_compress_specification compression_spec);
89 : static CompressFileHandle *SaveOutput(ArchiveHandle *AH);
90 : static void RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput);
91 :
92 : static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
93 : static void restore_toc_entries_prefork(ArchiveHandle *AH,
94 : TocEntry *pending_list);
95 : static void restore_toc_entries_parallel(ArchiveHandle *AH,
96 : ParallelState *pstate,
97 : TocEntry *pending_list);
98 : static void restore_toc_entries_postfork(ArchiveHandle *AH,
99 : TocEntry *pending_list);
100 : static void pending_list_header_init(TocEntry *l);
101 : static void pending_list_append(TocEntry *l, TocEntry *te);
102 : static void pending_list_remove(TocEntry *te);
103 : static int TocEntrySizeCompareQsort(const void *p1, const void *p2);
104 : static int TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg);
105 : static void move_to_ready_heap(TocEntry *pending_list,
106 : binaryheap *ready_heap,
107 : RestorePass pass);
108 : static TocEntry *pop_next_work_item(binaryheap *ready_heap,
109 : ParallelState *pstate);
110 : static void mark_dump_job_done(ArchiveHandle *AH,
111 : TocEntry *te,
112 : int status,
113 : void *callback_data);
114 : static void mark_restore_job_done(ArchiveHandle *AH,
115 : TocEntry *te,
116 : int status,
117 : void *callback_data);
118 : static void fix_dependencies(ArchiveHandle *AH);
119 : static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
120 : static void repoint_table_dependencies(ArchiveHandle *AH);
121 : static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
122 : static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
123 : binaryheap *ready_heap);
124 : static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
125 : static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
126 :
127 : static void StrictNamesCheck(RestoreOptions *ropt);
128 :
129 :
130 : /*
131 : * Allocate a new DumpOptions block containing all default values.
132 : */
133 : DumpOptions *
134 84 : NewDumpOptions(void)
135 : {
136 84 : DumpOptions *opts = (DumpOptions *) pg_malloc(sizeof(DumpOptions));
137 :
138 84 : InitDumpOptions(opts);
139 84 : return opts;
140 : }
141 :
142 : /*
143 : * Initialize a DumpOptions struct to all default values
144 : */
145 : void
146 492 : InitDumpOptions(DumpOptions *opts)
147 : {
148 492 : memset(opts, 0, sizeof(DumpOptions));
149 : /* set any fields that shouldn't default to zeroes */
150 492 : opts->include_everything = true;
151 492 : opts->cparams.promptPassword = TRI_DEFAULT;
152 492 : opts->dumpSections = DUMP_UNSECTIONED;
153 492 : opts->dumpSchema = true;
154 492 : opts->dumpData = true;
155 492 : opts->dumpStatistics = true;
156 492 : }
157 :
158 : /*
159 : * Create a freshly allocated DumpOptions with options equivalent to those
160 : * found in the given RestoreOptions.
161 : */
162 : DumpOptions *
163 84 : dumpOptionsFromRestoreOptions(RestoreOptions *ropt)
164 : {
165 84 : DumpOptions *dopt = NewDumpOptions();
166 :
167 : /* this is the inverse of what's at the end of pg_dump.c's main() */
168 84 : dopt->cparams.dbname = ropt->cparams.dbname ? pg_strdup(ropt->cparams.dbname) : NULL;
169 84 : dopt->cparams.pgport = ropt->cparams.pgport ? pg_strdup(ropt->cparams.pgport) : NULL;
170 84 : dopt->cparams.pghost = ropt->cparams.pghost ? pg_strdup(ropt->cparams.pghost) : NULL;
171 84 : dopt->cparams.username = ropt->cparams.username ? pg_strdup(ropt->cparams.username) : NULL;
172 84 : dopt->cparams.promptPassword = ropt->cparams.promptPassword;
173 84 : dopt->outputClean = ropt->dropSchema;
174 84 : dopt->dumpData = ropt->dumpData;
175 84 : dopt->dumpSchema = ropt->dumpSchema;
176 84 : dopt->dumpSections = ropt->dumpSections;
177 84 : dopt->dumpStatistics = ropt->dumpStatistics;
178 84 : dopt->if_exists = ropt->if_exists;
179 84 : dopt->column_inserts = ropt->column_inserts;
180 84 : dopt->aclsSkip = ropt->aclsSkip;
181 84 : dopt->outputSuperuser = ropt->superuser;
182 84 : dopt->outputCreateDB = ropt->createDB;
183 84 : dopt->outputNoOwner = ropt->noOwner;
184 84 : dopt->outputNoTableAm = ropt->noTableAm;
185 84 : dopt->outputNoTablespaces = ropt->noTablespace;
186 84 : dopt->disable_triggers = ropt->disable_triggers;
187 84 : dopt->use_setsessauth = ropt->use_setsessauth;
188 84 : dopt->disable_dollar_quoting = ropt->disable_dollar_quoting;
189 84 : dopt->dump_inserts = ropt->dump_inserts;
190 84 : dopt->no_comments = ropt->no_comments;
191 84 : dopt->no_publications = ropt->no_publications;
192 84 : dopt->no_security_labels = ropt->no_security_labels;
193 84 : dopt->no_subscriptions = ropt->no_subscriptions;
194 84 : dopt->lockWaitTimeout = ropt->lockWaitTimeout;
195 84 : dopt->include_everything = ropt->include_everything;
196 84 : dopt->enable_row_security = ropt->enable_row_security;
197 84 : dopt->sequence_data = ropt->sequence_data;
198 :
199 84 : return dopt;
200 : }
201 :
202 :
203 : /*
204 : * Wrapper functions.
205 : *
206 : * The objective is to make writing new formats and dumpers as simple
207 : * as possible, if necessary at the expense of extra function calls etc.
208 : *
209 : */
210 :
211 : /*
212 : * The dump worker setup needs lots of knowledge of the internals of pg_dump,
213 : * so it's defined in pg_dump.c and passed into OpenArchive. The restore worker
214 : * setup doesn't need to know anything much, so it's defined here.
215 : */
216 : static void
217 20 : setupRestoreWorker(Archive *AHX)
218 : {
219 20 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
220 :
221 20 : AH->ReopenPtr(AH);
222 20 : }
223 :
224 :
225 : /* Create a new archive */
226 : /* Public */
227 : Archive *
228 360 : CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
229 : const pg_compress_specification compression_spec,
230 : bool dosync, ArchiveMode mode,
231 : SetupWorkerPtrType setupDumpWorker,
232 : DataDirSyncMethod sync_method)
233 :
234 : {
235 360 : ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression_spec,
236 : dosync, mode, setupDumpWorker, sync_method);
237 :
238 358 : return (Archive *) AH;
239 : }
240 :
241 : /* Open an existing archive */
242 : /* Public */
243 : Archive *
244 80 : OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
245 : {
246 : ArchiveHandle *AH;
247 80 : pg_compress_specification compression_spec = {0};
248 :
249 80 : compression_spec.algorithm = PG_COMPRESSION_NONE;
250 80 : AH = _allocAH(FileSpec, fmt, compression_spec, true,
251 : archModeRead, setupRestoreWorker,
252 : DATA_DIR_SYNC_METHOD_FSYNC);
253 :
254 80 : return (Archive *) AH;
255 : }
256 :
257 : /* Public */
258 : void
259 398 : CloseArchive(Archive *AHX)
260 : {
261 398 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
262 :
263 398 : AH->ClosePtr(AH);
264 :
265 : /* Close the output */
266 398 : errno = 0;
267 398 : if (!EndCompressFileHandle(AH->OF))
268 0 : pg_fatal("could not close output file: %m");
269 398 : }
270 :
271 : /* Public */
272 : void
273 766 : SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt)
274 : {
275 : /* Caller can omit dump options, in which case we synthesize them */
276 766 : if (dopt == NULL && ropt != NULL)
277 84 : dopt = dumpOptionsFromRestoreOptions(ropt);
278 :
279 : /* Save options for later access */
280 766 : AH->dopt = dopt;
281 766 : AH->ropt = ropt;
282 766 : }
283 :
284 : /* Public */
285 : void
286 392 : ProcessArchiveRestoreOptions(Archive *AHX)
287 : {
288 392 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
289 392 : RestoreOptions *ropt = AH->public.ropt;
290 : TocEntry *te;
291 : teSection curSection;
292 :
293 : /* Decide which TOC entries will be dumped/restored, and mark them */
294 392 : curSection = SECTION_PRE_DATA;
295 99368 : for (te = AH->toc->next; te != AH->toc; te = te->next)
296 : {
297 : /*
298 : * When writing an archive, we also take this opportunity to check
299 : * that we have generated the entries in a sane order that respects
300 : * the section divisions. When reading, don't complain, since buggy
301 : * old versions of pg_dump might generate out-of-order archives.
302 : */
303 98976 : if (AH->mode != archModeRead)
304 : {
305 85506 : switch (te->section)
306 : {
307 16484 : case SECTION_NONE:
308 : /* ok to be anywhere */
309 16484 : break;
310 37242 : case SECTION_PRE_DATA:
311 37242 : if (curSection != SECTION_PRE_DATA)
312 0 : pg_log_warning("archive items not in correct section order");
313 37242 : break;
314 16872 : case SECTION_DATA:
315 16872 : if (curSection == SECTION_POST_DATA)
316 0 : pg_log_warning("archive items not in correct section order");
317 16872 : break;
318 14908 : case SECTION_POST_DATA:
319 : /* ok no matter which section we were in */
320 14908 : break;
321 0 : default:
322 0 : pg_fatal("unexpected section code %d",
323 : (int) te->section);
324 : break;
325 : }
326 13470 : }
327 :
328 98976 : if (te->section != SECTION_NONE)
329 81018 : curSection = te->section;
330 :
331 98976 : te->reqs = _tocEntryRequired(te, curSection, AH);
332 : }
333 :
334 : /* Enforce strict names checking */
335 392 : if (ropt->strict_names)
336 0 : StrictNamesCheck(ropt);
337 392 : }
338 :
339 : /* Public */
340 : void
341 330 : RestoreArchive(Archive *AHX)
342 : {
343 330 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
344 330 : RestoreOptions *ropt = AH->public.ropt;
345 : bool parallel_mode;
346 : TocEntry *te;
347 : CompressFileHandle *sav;
348 :
349 330 : AH->stage = STAGE_INITIALIZING;
350 :
351 : /*
352 : * If we're going to do parallel restore, there are some restrictions.
353 : */
354 330 : parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
355 330 : if (parallel_mode)
356 : {
357 : /* We haven't got round to making this work for all archive formats */
358 8 : if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
359 0 : pg_fatal("parallel restore is not supported with this archive file format");
360 :
361 : /* Doesn't work if the archive represents dependencies as OIDs */
362 8 : if (AH->version < K_VERS_1_8)
363 0 : pg_fatal("parallel restore is not supported with archives made by pre-8.0 pg_dump");
364 :
365 : /*
366 : * It's also not gonna work if we can't reopen the input file, so
367 : * let's try that immediately.
368 : */
369 8 : AH->ReopenPtr(AH);
370 : }
371 :
372 : /*
373 : * Make sure we won't need (de)compression we haven't got
374 : */
375 330 : if (AH->PrintTocDataPtr != NULL)
376 : {
377 59100 : for (te = AH->toc->next; te != AH->toc; te = te->next)
378 : {
379 58962 : if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
380 : {
381 192 : char *errmsg = supports_compression(AH->compression_spec);
382 :
383 192 : if (errmsg)
384 0 : pg_fatal("cannot restore from compressed archive (%s)",
385 : errmsg);
386 : else
387 192 : break;
388 : }
389 : }
390 : }
391 :
392 : /*
393 : * Prepare index arrays, so we can assume we have them throughout restore.
394 : * It's possible we already did this, though.
395 : */
396 330 : if (AH->tocsByDumpId == NULL)
397 314 : buildTocEntryArrays(AH);
398 :
399 : /*
400 : * If we're using a DB connection, then connect it.
401 : */
402 330 : if (ropt->useDB)
403 : {
404 32 : pg_log_info("connecting to database for restore");
405 32 : if (AH->version < K_VERS_1_3)
406 0 : pg_fatal("direct database connections are not supported in pre-1.3 archives");
407 :
408 : /*
409 : * We don't want to guess at whether the dump will successfully
410 : * restore; allow the attempt regardless of the version of the restore
411 : * target.
412 : */
413 32 : AHX->minRemoteVersion = 0;
414 32 : AHX->maxRemoteVersion = 9999999;
415 :
416 32 : ConnectDatabase(AHX, &ropt->cparams, false);
417 :
418 : /*
419 : * If we're talking to the DB directly, don't send comments since they
420 : * obscure SQL when displaying errors
421 : */
422 32 : AH->noTocComments = 1;
423 : }
424 :
425 : /*
426 : * Work out if we have an implied schema-less restore. This can happen if
427 : * the dump excluded the schema or the user has used a toc list to exclude
428 : * all of the schema data. All we do is look for schema entries - if none
429 : * are found then we unset the dumpSchema flag.
430 : *
431 : * We could scan for wanted TABLE entries, but that is not the same as
432 : * data-only. At this stage, it seems unnecessary (6-Mar-2001).
433 : */
434 330 : if (ropt->dumpSchema)
435 : {
436 312 : bool no_schema_found = true;
437 :
438 2680 : for (te = AH->toc->next; te != AH->toc; te = te->next)
439 : {
440 2638 : if ((te->reqs & REQ_SCHEMA) != 0)
441 : {
442 270 : no_schema_found = false;
443 270 : break;
444 : }
445 : }
446 312 : if (no_schema_found)
447 : {
448 42 : ropt->dumpSchema = false;
449 42 : pg_log_info("implied no-schema restore");
450 : }
451 : }
452 :
453 : /*
454 : * Setup the output file if necessary.
455 : */
456 330 : sav = SaveOutput(AH);
457 330 : if (ropt->filename || ropt->compression_spec.algorithm != PG_COMPRESSION_NONE)
458 276 : SetOutput(AH, ropt->filename, ropt->compression_spec);
459 :
460 330 : ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
461 :
462 330 : if (AH->archiveRemoteVersion)
463 330 : ahprintf(AH, "-- Dumped from database version %s\n",
464 : AH->archiveRemoteVersion);
465 330 : if (AH->archiveDumpVersion)
466 330 : ahprintf(AH, "-- Dumped by pg_dump version %s\n",
467 : AH->archiveDumpVersion);
468 :
469 330 : ahprintf(AH, "\n");
470 :
471 330 : if (AH->public.verbose)
472 46 : dumpTimestamp(AH, "Started on", AH->createDate);
473 :
474 330 : if (ropt->single_txn)
475 : {
476 0 : if (AH->connection)
477 0 : StartTransaction(AHX);
478 : else
479 0 : ahprintf(AH, "BEGIN;\n\n");
480 : }
481 :
482 : /*
483 : * Establish important parameter values right away.
484 : */
485 330 : _doSetFixedOutputState(AH);
486 :
487 330 : AH->stage = STAGE_PROCESSING;
488 :
489 : /*
490 : * Drop the items at the start, in reverse order
491 : */
492 330 : if (ropt->dropSchema)
493 : {
494 2446 : for (te = AH->toc->prev; te != AH->toc; te = te->prev)
495 : {
496 2416 : AH->currentTE = te;
497 :
498 : /*
499 : * In createDB mode, issue a DROP *only* for the database as a
500 : * whole. Issuing drops against anything else would be wrong,
501 : * because at this point we're connected to the wrong database.
502 : * (The DATABASE PROPERTIES entry, if any, should be treated like
503 : * the DATABASE entry.)
504 : */
505 2416 : if (ropt->createDB)
506 : {
507 914 : if (strcmp(te->desc, "DATABASE") != 0 &&
508 894 : strcmp(te->desc, "DATABASE PROPERTIES") != 0)
509 878 : continue;
510 : }
511 :
512 : /* Otherwise, drop anything that's selected and has a dropStmt */
513 1538 : if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
514 : {
515 608 : bool not_allowed_in_txn = false;
516 :
517 608 : pg_log_info("dropping %s %s", te->desc, te->tag);
518 :
519 : /*
520 : * In --transaction-size mode, we have to temporarily exit our
521 : * transaction block to drop objects that can't be dropped
522 : * within a transaction.
523 : */
524 608 : if (ropt->txn_size > 0)
525 : {
526 32 : if (strcmp(te->desc, "DATABASE") == 0 ||
527 16 : strcmp(te->desc, "DATABASE PROPERTIES") == 0)
528 : {
529 32 : not_allowed_in_txn = true;
530 32 : if (AH->connection)
531 32 : CommitTransaction(AHX);
532 : else
533 0 : ahprintf(AH, "COMMIT;\n");
534 : }
535 : }
536 :
537 : /* Select owner and schema as necessary */
538 608 : _becomeOwner(AH, te);
539 608 : _selectOutputSchema(AH, te->namespace);
540 :
541 : /*
542 : * Now emit the DROP command, if the object has one. Note we
543 : * don't necessarily emit it verbatim; at this point we add an
544 : * appropriate IF EXISTS clause, if the user requested it.
545 : */
546 608 : if (strcmp(te->desc, "BLOB METADATA") == 0)
547 : {
548 : /* We must generate the per-blob commands */
549 8 : if (ropt->if_exists)
550 4 : IssueCommandPerBlob(AH, te,
551 : "SELECT pg_catalog.lo_unlink(oid) "
552 : "FROM pg_catalog.pg_largeobject_metadata "
553 : "WHERE oid = '", "'");
554 : else
555 4 : IssueCommandPerBlob(AH, te,
556 : "SELECT pg_catalog.lo_unlink('",
557 : "')");
558 : }
559 600 : else if (*te->dropStmt != '\0')
560 : {
561 576 : if (!ropt->if_exists ||
562 274 : strncmp(te->dropStmt, "--", 2) == 0)
563 : {
564 : /*
565 : * Without --if-exists, or if it's just a comment (as
566 : * happens for the public schema), print the dropStmt
567 : * as-is.
568 : */
569 304 : ahprintf(AH, "%s", te->dropStmt);
570 : }
571 : else
572 : {
573 : /*
574 : * Inject an appropriate spelling of "if exists". For
575 : * old-style large objects, we have a routine that
576 : * knows how to do it, without depending on
577 : * te->dropStmt; use that. For other objects we need
578 : * to parse the command.
579 : */
580 272 : if (strcmp(te->desc, "BLOB") == 0)
581 : {
582 0 : DropLOIfExists(AH, te->catalogId.oid);
583 : }
584 : else
585 : {
586 272 : char *dropStmt = pg_strdup(te->dropStmt);
587 272 : char *dropStmtOrig = dropStmt;
588 272 : PQExpBuffer ftStmt = createPQExpBuffer();
589 :
590 : /*
591 : * Need to inject IF EXISTS clause after ALTER
592 : * TABLE part in ALTER TABLE .. DROP statement
593 : */
594 272 : if (strncmp(dropStmt, "ALTER TABLE", 11) == 0)
595 : {
596 36 : appendPQExpBufferStr(ftStmt,
597 : "ALTER TABLE IF EXISTS");
598 36 : dropStmt = dropStmt + 11;
599 : }
600 :
601 : /*
602 : * ALTER TABLE..ALTER COLUMN..DROP DEFAULT does
603 : * not support the IF EXISTS clause, and therefore
604 : * we simply emit the original command for DEFAULT
605 : * objects (modulo the adjustment made above).
606 : *
607 : * Likewise, don't mess with DATABASE PROPERTIES.
608 : *
609 : * If we used CREATE OR REPLACE VIEW as a means of
610 : * quasi-dropping an ON SELECT rule, that should
611 : * be emitted unchanged as well.
612 : *
613 : * For other object types, we need to extract the
614 : * first part of the DROP which includes the
615 : * object type. Most of the time this matches
616 : * te->desc, so search for that; however for the
617 : * different kinds of CONSTRAINTs, we know to
618 : * search for hardcoded "DROP CONSTRAINT" instead.
619 : */
620 272 : if (strcmp(te->desc, "DEFAULT") == 0 ||
621 266 : strcmp(te->desc, "DATABASE PROPERTIES") == 0 ||
622 266 : strncmp(dropStmt, "CREATE OR REPLACE VIEW", 22) == 0)
623 6 : appendPQExpBufferStr(ftStmt, dropStmt);
624 : else
625 : {
626 : char buffer[40];
627 : char *mark;
628 :
629 266 : if (strcmp(te->desc, "CONSTRAINT") == 0 ||
630 240 : strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
631 240 : strcmp(te->desc, "FK CONSTRAINT") == 0)
632 30 : strcpy(buffer, "DROP CONSTRAINT");
633 : else
634 236 : snprintf(buffer, sizeof(buffer), "DROP %s",
635 : te->desc);
636 :
637 266 : mark = strstr(dropStmt, buffer);
638 :
639 266 : if (mark)
640 : {
641 266 : *mark = '\0';
642 266 : appendPQExpBuffer(ftStmt, "%s%s IF EXISTS%s",
643 : dropStmt, buffer,
644 266 : mark + strlen(buffer));
645 : }
646 : else
647 : {
648 : /* complain and emit unmodified command */
649 0 : pg_log_warning("could not find where to insert IF EXISTS in statement \"%s\"",
650 : dropStmtOrig);
651 0 : appendPQExpBufferStr(ftStmt, dropStmt);
652 : }
653 : }
654 :
655 272 : ahprintf(AH, "%s", ftStmt->data);
656 :
657 272 : destroyPQExpBuffer(ftStmt);
658 272 : pg_free(dropStmtOrig);
659 : }
660 : }
661 : }
662 :
663 : /*
664 : * In --transaction-size mode, re-establish the transaction
665 : * block if needed; otherwise, commit after every N drops.
666 : */
667 608 : if (ropt->txn_size > 0)
668 : {
669 32 : if (not_allowed_in_txn)
670 : {
671 32 : if (AH->connection)
672 32 : StartTransaction(AHX);
673 : else
674 0 : ahprintf(AH, "BEGIN;\n");
675 32 : AH->txnCount = 0;
676 : }
677 0 : else if (++AH->txnCount >= ropt->txn_size)
678 : {
679 0 : if (AH->connection)
680 : {
681 0 : CommitTransaction(AHX);
682 0 : StartTransaction(AHX);
683 : }
684 : else
685 0 : ahprintf(AH, "COMMIT;\nBEGIN;\n");
686 0 : AH->txnCount = 0;
687 : }
688 : }
689 : }
690 : }
691 :
692 : /*
693 : * _selectOutputSchema may have set currSchema to reflect the effect
694 : * of a "SET search_path" command it emitted. However, by now we may
695 : * have dropped that schema; or it might not have existed in the first
696 : * place. In either case the effective value of search_path will not
697 : * be what we think. Forcibly reset currSchema so that we will
698 : * re-establish the search_path setting when needed (after creating
699 : * the schema).
700 : *
701 : * If we treated users as pg_dump'able objects then we'd need to reset
702 : * currUser here too.
703 : */
704 30 : free(AH->currSchema);
705 30 : AH->currSchema = NULL;
706 : }
707 :
708 330 : if (parallel_mode)
709 : {
710 : /*
711 : * In parallel mode, turn control over to the parallel-restore logic.
712 : */
713 : ParallelState *pstate;
714 : TocEntry pending_list;
715 :
716 : /* The archive format module may need some setup for this */
717 8 : if (AH->PrepParallelRestorePtr)
718 8 : AH->PrepParallelRestorePtr(AH);
719 :
720 8 : pending_list_header_init(&pending_list);
721 :
722 : /* This runs PRE_DATA items and then disconnects from the database */
723 8 : restore_toc_entries_prefork(AH, &pending_list);
724 : Assert(AH->connection == NULL);
725 :
726 : /* ParallelBackupStart() will actually fork the processes */
727 8 : pstate = ParallelBackupStart(AH);
728 8 : restore_toc_entries_parallel(AH, pstate, &pending_list);
729 8 : ParallelBackupEnd(AH, pstate);
730 :
731 : /* reconnect the leader and see if we missed something */
732 8 : restore_toc_entries_postfork(AH, &pending_list);
733 : Assert(AH->connection != NULL);
734 : }
735 : else
736 : {
737 : /*
738 : * In serial mode, process everything in three phases: normal items,
739 : * then ACLs, then post-ACL items. We might be able to skip one or
740 : * both extra phases in some cases, eg data-only restores.
741 : */
742 322 : bool haveACL = false;
743 322 : bool havePostACL = false;
744 :
745 86528 : for (te = AH->toc->next; te != AH->toc; te = te->next)
746 : {
747 86208 : if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) == 0)
748 2824 : continue; /* ignore if not to be dumped at all */
749 :
750 83384 : switch (_tocEntryRestorePass(te))
751 : {
752 78850 : case RESTORE_PASS_MAIN:
753 78850 : (void) restore_toc_entry(AH, te, false);
754 78848 : break;
755 3900 : case RESTORE_PASS_ACL:
756 3900 : haveACL = true;
757 3900 : break;
758 634 : case RESTORE_PASS_POST_ACL:
759 634 : havePostACL = true;
760 634 : break;
761 : }
762 86206 : }
763 :
764 320 : if (haveACL)
765 : {
766 83754 : for (te = AH->toc->next; te != AH->toc; te = te->next)
767 : {
768 165238 : if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0 &&
769 81630 : _tocEntryRestorePass(te) == RESTORE_PASS_ACL)
770 3900 : (void) restore_toc_entry(AH, te, false);
771 : }
772 : }
773 :
774 320 : if (havePostACL)
775 : {
776 53608 : for (te = AH->toc->next; te != AH->toc; te = te->next)
777 : {
778 106288 : if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0 &&
779 52758 : _tocEntryRestorePass(te) == RESTORE_PASS_POST_ACL)
780 634 : (void) restore_toc_entry(AH, te, false);
781 : }
782 : }
783 : }
784 :
785 : /*
786 : * Close out any persistent transaction we may have. While these two
787 : * cases are started in different places, we can end both cases here.
788 : */
789 328 : if (ropt->single_txn || ropt->txn_size > 0)
790 : {
791 24 : if (AH->connection)
792 24 : CommitTransaction(AHX);
793 : else
794 0 : ahprintf(AH, "COMMIT;\n\n");
795 : }
796 :
797 328 : if (AH->public.verbose)
798 46 : dumpTimestamp(AH, "Completed on", time(NULL));
799 :
800 328 : ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n");
801 :
802 : /*
803 : * Clean up & we're done.
804 : */
805 328 : AH->stage = STAGE_FINALIZING;
806 :
807 328 : if (ropt->filename || ropt->compression_spec.algorithm != PG_COMPRESSION_NONE)
808 276 : RestoreOutput(AH, sav);
809 :
810 328 : if (ropt->useDB)
811 32 : DisconnectDatabase(&AH->public);
812 328 : }
813 :
814 : /*
815 : * Restore a single TOC item. Used in both parallel and non-parallel restore;
816 : * is_parallel is true if we are in a worker child process.
817 : *
818 : * Returns 0 normally, but WORKER_CREATE_DONE or WORKER_INHIBIT_DATA if
819 : * the parallel parent has to make the corresponding status update.
820 : */
821 : static int
822 83652 : restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
823 : {
824 83652 : RestoreOptions *ropt = AH->public.ropt;
825 83652 : int status = WORKER_OK;
826 : int reqs;
827 : bool defnDumped;
828 :
829 83652 : AH->currentTE = te;
830 :
831 : /* Dump any relevant dump warnings to stderr */
832 83652 : if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
833 : {
834 0 : if (ropt->dumpSchema && te->defn != NULL && strlen(te->defn) != 0)
835 0 : pg_log_warning("warning from original dump file: %s", te->defn);
836 0 : else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
837 0 : pg_log_warning("warning from original dump file: %s", te->copyStmt);
838 : }
839 :
840 : /* Work out what, if anything, we want from this entry */
841 83652 : reqs = te->reqs;
842 :
843 83652 : defnDumped = false;
844 :
845 : /*
846 : * If it has a schema component that we want, then process that
847 : */
848 83652 : if ((reqs & REQ_SCHEMA) != 0)
849 : {
850 62884 : bool object_is_db = false;
851 :
852 : /*
853 : * In --transaction-size mode, must exit our transaction block to
854 : * create a database or set its properties.
855 : */
856 62884 : if (strcmp(te->desc, "DATABASE") == 0 ||
857 62796 : strcmp(te->desc, "DATABASE PROPERTIES") == 0)
858 : {
859 120 : object_is_db = true;
860 120 : if (ropt->txn_size > 0)
861 : {
862 48 : if (AH->connection)
863 48 : CommitTransaction(&AH->public);
864 : else
865 0 : ahprintf(AH, "COMMIT;\n\n");
866 : }
867 : }
868 :
869 : /* Show namespace in log message if available */
870 62884 : if (te->namespace)
871 60404 : pg_log_info("creating %s \"%s.%s\"",
872 : te->desc, te->namespace, te->tag);
873 : else
874 2480 : pg_log_info("creating %s \"%s\"",
875 : te->desc, te->tag);
876 :
877 62884 : _printTocEntry(AH, te, TOC_PREFIX_NONE);
878 62884 : defnDumped = true;
879 :
880 62884 : if (strcmp(te->desc, "TABLE") == 0)
881 : {
882 9912 : if (AH->lastErrorTE == te)
883 : {
884 : /*
885 : * We failed to create the table. If
886 : * --no-data-for-failed-tables was given, mark the
887 : * corresponding TABLE DATA to be ignored.
888 : *
889 : * In the parallel case this must be done in the parent, so we
890 : * just set the return value.
891 : */
892 0 : if (ropt->noDataForFailedTables)
893 : {
894 0 : if (is_parallel)
895 0 : status = WORKER_INHIBIT_DATA;
896 : else
897 0 : inhibit_data_for_failed_table(AH, te);
898 : }
899 : }
900 : else
901 : {
902 : /*
903 : * We created the table successfully. Mark the corresponding
904 : * TABLE DATA for possible truncation.
905 : *
906 : * In the parallel case this must be done in the parent, so we
907 : * just set the return value.
908 : */
909 9912 : if (is_parallel)
910 0 : status = WORKER_CREATE_DONE;
911 : else
912 9912 : mark_create_done(AH, te);
913 : }
914 : }
915 :
916 : /*
917 : * If we created a DB, connect to it. Also, if we changed DB
918 : * properties, reconnect to ensure that relevant GUC settings are
919 : * applied to our session. (That also restarts the transaction block
920 : * in --transaction-size mode.)
921 : */
922 62884 : if (object_is_db)
923 : {
924 120 : pg_log_info("connecting to new database \"%s\"", te->tag);
925 120 : _reconnectToDB(AH, te->tag);
926 : }
927 : }
928 :
929 : /*
930 : * If it has a data component that we want, then process that
931 : */
932 83652 : if ((reqs & REQ_DATA) != 0)
933 : {
934 : /*
935 : * hadDumper will be set if there is genuine data component for this
936 : * node. Otherwise, we need to check the defn field for statements
937 : * that need to be executed in data-only restores.
938 : */
939 8984 : if (te->hadDumper)
940 : {
941 : /*
942 : * If we can output the data, then restore it.
943 : */
944 7896 : if (AH->PrintTocDataPtr != NULL)
945 : {
946 7896 : _printTocEntry(AH, te, TOC_PREFIX_DATA);
947 :
948 7896 : if (strcmp(te->desc, "BLOBS") == 0 ||
949 7752 : strcmp(te->desc, "BLOB COMMENTS") == 0)
950 : {
951 144 : pg_log_info("processing %s", te->desc);
952 :
953 144 : _selectOutputSchema(AH, "pg_catalog");
954 :
955 : /* Send BLOB COMMENTS data to ExecuteSimpleCommands() */
956 144 : if (strcmp(te->desc, "BLOB COMMENTS") == 0)
957 0 : AH->outputKind = OUTPUT_OTHERDATA;
958 :
959 144 : AH->PrintTocDataPtr(AH, te);
960 :
961 144 : AH->outputKind = OUTPUT_SQLCMDS;
962 : }
963 : else
964 : {
965 : bool use_truncate;
966 :
967 7752 : _disableTriggersIfNecessary(AH, te);
968 :
969 : /* Select owner and schema as necessary */
970 7752 : _becomeOwner(AH, te);
971 7752 : _selectOutputSchema(AH, te->namespace);
972 :
973 7752 : pg_log_info("processing data for table \"%s.%s\"",
974 : te->namespace, te->tag);
975 :
976 : /*
977 : * In parallel restore, if we created the table earlier in
978 : * this run (so that we know it is empty) and we are not
979 : * restoring a load-via-partition-root data item then we
980 : * wrap the COPY in a transaction and precede it with a
981 : * TRUNCATE. If wal_level is set to minimal this prevents
982 : * WAL-logging the COPY. This obtains a speedup similar
983 : * to that from using single_txn mode in non-parallel
984 : * restores.
985 : *
986 : * We mustn't do this for load-via-partition-root cases
987 : * because some data might get moved across partition
988 : * boundaries, risking deadlock and/or loss of previously
989 : * loaded data. (We assume that all partitions of a
990 : * partitioned table will be treated the same way.)
991 : */
992 7784 : use_truncate = is_parallel && te->created &&
993 32 : !is_load_via_partition_root(te);
994 :
995 7752 : if (use_truncate)
996 : {
997 : /*
998 : * Parallel restore is always talking directly to a
999 : * server, so no need to see if we should issue BEGIN.
1000 : */
1001 20 : StartTransaction(&AH->public);
1002 :
1003 : /*
1004 : * Issue TRUNCATE with ONLY so that child tables are
1005 : * not wiped.
1006 : */
1007 20 : ahprintf(AH, "TRUNCATE TABLE ONLY %s;\n\n",
1008 20 : fmtQualifiedId(te->namespace, te->tag));
1009 : }
1010 :
1011 : /*
1012 : * If we have a copy statement, use it.
1013 : */
1014 7752 : if (te->copyStmt && strlen(te->copyStmt) > 0)
1015 : {
1016 7610 : ahprintf(AH, "%s", te->copyStmt);
1017 7610 : AH->outputKind = OUTPUT_COPYDATA;
1018 : }
1019 : else
1020 142 : AH->outputKind = OUTPUT_OTHERDATA;
1021 :
1022 7752 : AH->PrintTocDataPtr(AH, te);
1023 :
1024 : /*
1025 : * Terminate COPY if needed.
1026 : */
1027 15358 : if (AH->outputKind == OUTPUT_COPYDATA &&
1028 7608 : RestoringToDB(AH))
1029 18 : EndDBCopyMode(&AH->public, te->tag);
1030 7750 : AH->outputKind = OUTPUT_SQLCMDS;
1031 :
1032 : /* close out the transaction started above */
1033 7750 : if (use_truncate)
1034 20 : CommitTransaction(&AH->public);
1035 :
1036 7750 : _enableTriggersIfNecessary(AH, te);
1037 : }
1038 : }
1039 : }
1040 1088 : else if (!defnDumped)
1041 : {
1042 : /* If we haven't already dumped the defn part, do so now */
1043 1088 : pg_log_info("executing %s %s", te->desc, te->tag);
1044 1088 : _printTocEntry(AH, te, TOC_PREFIX_NONE);
1045 : }
1046 : }
1047 :
1048 : /*
1049 : * If it has a statistics component that we want, then process that
1050 : */
1051 83650 : if ((reqs & REQ_STATS) != 0)
1052 11754 : _printTocEntry(AH, te, TOC_PREFIX_STATS);
1053 :
1054 : /*
1055 : * If we emitted anything for this TOC entry, that counts as one action
1056 : * against the transaction-size limit. Commit if it's time to.
1057 : */
1058 83650 : if ((reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0 && ropt->txn_size > 0)
1059 : {
1060 6178 : if (++AH->txnCount >= ropt->txn_size)
1061 : {
1062 18 : if (AH->connection)
1063 : {
1064 18 : CommitTransaction(&AH->public);
1065 18 : StartTransaction(&AH->public);
1066 : }
1067 : else
1068 0 : ahprintf(AH, "COMMIT;\nBEGIN;\n\n");
1069 18 : AH->txnCount = 0;
1070 : }
1071 : }
1072 :
1073 83650 : if (AH->public.n_errors > 0 && status == WORKER_OK)
1074 0 : status = WORKER_IGNORED_ERRORS;
1075 :
1076 83650 : return status;
1077 : }
1078 :
1079 : /*
1080 : * Allocate a new RestoreOptions block.
1081 : * This is mainly so we can initialize it, but also for future expansion,
1082 : */
1083 : RestoreOptions *
1084 458 : NewRestoreOptions(void)
1085 : {
1086 : RestoreOptions *opts;
1087 :
1088 458 : opts = (RestoreOptions *) pg_malloc0(sizeof(RestoreOptions));
1089 :
1090 : /* set any fields that shouldn't default to zeroes */
1091 458 : opts->format = archUnknown;
1092 458 : opts->cparams.promptPassword = TRI_DEFAULT;
1093 458 : opts->dumpSections = DUMP_UNSECTIONED;
1094 458 : opts->compression_spec.algorithm = PG_COMPRESSION_NONE;
1095 458 : opts->compression_spec.level = 0;
1096 458 : opts->dumpSchema = true;
1097 458 : opts->dumpData = true;
1098 458 : opts->dumpStatistics = true;
1099 :
1100 458 : return opts;
1101 : }
1102 :
1103 : static void
1104 7752 : _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
1105 : {
1106 7752 : RestoreOptions *ropt = AH->public.ropt;
1107 :
1108 : /* This hack is only needed in a data-only restore */
1109 7752 : if (ropt->dumpSchema || !ropt->disable_triggers)
1110 7690 : return;
1111 :
1112 62 : pg_log_info("disabling triggers for %s", te->tag);
1113 :
1114 : /*
1115 : * Become superuser if possible, since they are the only ones who can
1116 : * disable constraint triggers. If -S was not given, assume the initial
1117 : * user identity is a superuser. (XXX would it be better to become the
1118 : * table owner?)
1119 : */
1120 62 : _becomeUser(AH, ropt->superuser);
1121 :
1122 : /*
1123 : * Disable them.
1124 : */
1125 62 : ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n",
1126 62 : fmtQualifiedId(te->namespace, te->tag));
1127 : }
1128 :
1129 : static void
1130 7750 : _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
1131 : {
1132 7750 : RestoreOptions *ropt = AH->public.ropt;
1133 :
1134 : /* This hack is only needed in a data-only restore */
1135 7750 : if (ropt->dumpSchema || !ropt->disable_triggers)
1136 7688 : return;
1137 :
1138 62 : pg_log_info("enabling triggers for %s", te->tag);
1139 :
1140 : /*
1141 : * Become superuser if possible, since they are the only ones who can
1142 : * disable constraint triggers. If -S was not given, assume the initial
1143 : * user identity is a superuser. (XXX would it be better to become the
1144 : * table owner?)
1145 : */
1146 62 : _becomeUser(AH, ropt->superuser);
1147 :
1148 : /*
1149 : * Enable them.
1150 : */
1151 62 : ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n",
1152 62 : fmtQualifiedId(te->namespace, te->tag));
1153 : }
1154 :
1155 : /*
1156 : * Detect whether a TABLE DATA TOC item is performing "load via partition
1157 : * root", that is the target table is an ancestor partition rather than the
1158 : * table the TOC item is nominally for.
1159 : *
1160 : * In newer archive files this can be detected by checking for a special
1161 : * comment placed in te->defn. In older files we have to fall back to seeing
1162 : * if the COPY statement targets the named table or some other one. This
1163 : * will not work for data dumped as INSERT commands, so we could give a false
1164 : * negative in that case; fortunately, that's a rarely-used option.
1165 : */
1166 : static bool
1167 32 : is_load_via_partition_root(TocEntry *te)
1168 : {
1169 32 : if (te->defn &&
1170 12 : strncmp(te->defn, "-- load via partition root ", 27) == 0)
1171 12 : return true;
1172 20 : if (te->copyStmt && *te->copyStmt)
1173 : {
1174 12 : PQExpBuffer copyStmt = createPQExpBuffer();
1175 : bool result;
1176 :
1177 : /*
1178 : * Build the initial part of the COPY as it would appear if the
1179 : * nominal target table is the actual target. If we see anything
1180 : * else, it must be a load-via-partition-root case.
1181 : */
1182 12 : appendPQExpBuffer(copyStmt, "COPY %s ",
1183 12 : fmtQualifiedId(te->namespace, te->tag));
1184 12 : result = strncmp(te->copyStmt, copyStmt->data, copyStmt->len) != 0;
1185 12 : destroyPQExpBuffer(copyStmt);
1186 12 : return result;
1187 : }
1188 : /* Assume it's not load-via-partition-root */
1189 8 : return false;
1190 : }
1191 :
1192 : /*
1193 : * This is a routine that is part of the dumper interface, hence the 'Archive*' parameter.
1194 : */
1195 :
1196 : /* Public */
1197 : void
1198 3661008 : WriteData(Archive *AHX, const void *data, size_t dLen)
1199 : {
1200 3661008 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1201 :
1202 3661008 : if (!AH->currToc)
1203 0 : pg_fatal("internal error -- WriteData cannot be called outside the context of a DataDumper routine");
1204 :
1205 3661008 : AH->WriteDataPtr(AH, data, dLen);
1206 3661008 : }
1207 :
1208 : /*
1209 : * Create a new TOC entry. The TOC was designed as a TOC, but is now the
1210 : * repository for all metadata. But the name has stuck.
1211 : *
1212 : * The new entry is added to the Archive's TOC list. Most callers can ignore
1213 : * the result value because nothing else need be done, but a few want to
1214 : * manipulate the TOC entry further.
1215 : */
1216 :
1217 : /* Public */
TocEntry *
ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId,
			 ArchiveOpts *opts)
{
	ArchiveHandle *AH = (ArchiveHandle *) AHX;
	TocEntry   *newToc;

	newToc = (TocEntry *) pg_malloc0(sizeof(TocEntry));

	/* Track the highest dump ID seen, so index arrays can be sized later */
	AH->tocCount++;
	if (dumpId > AH->maxDumpId)
		AH->maxDumpId = dumpId;

	/* Splice the new entry in at the tail of the circular TOC list */
	newToc->prev = AH->toc->prev;
	newToc->next = AH->toc;
	AH->toc->prev->next = newToc;
	AH->toc->prev = newToc;

	newToc->catalogId = catalogId;
	newToc->dumpId = dumpId;
	newToc->section = opts->section;

	/* Duplicate all string fields so the TOC entry owns its own storage */
	newToc->tag = pg_strdup(opts->tag);
	newToc->namespace = opts->namespace ? pg_strdup(opts->namespace) : NULL;
	newToc->tablespace = opts->tablespace ? pg_strdup(opts->tablespace) : NULL;
	newToc->tableam = opts->tableam ? pg_strdup(opts->tableam) : NULL;
	newToc->relkind = opts->relkind;
	newToc->owner = opts->owner ? pg_strdup(opts->owner) : NULL;
	newToc->desc = pg_strdup(opts->description);
	newToc->defn = opts->createStmt ? pg_strdup(opts->createStmt) : NULL;
	newToc->dropStmt = opts->dropStmt ? pg_strdup(opts->dropStmt) : NULL;
	newToc->copyStmt = opts->copyStmt ? pg_strdup(opts->copyStmt) : NULL;

	/* Copy the dependency array too, if one was supplied */
	if (opts->nDeps > 0)
	{
		newToc->dependencies = (DumpId *) pg_malloc(opts->nDeps * sizeof(DumpId));
		memcpy(newToc->dependencies, opts->deps, opts->nDeps * sizeof(DumpId));
		newToc->nDeps = opts->nDeps;
	}
	else
	{
		newToc->dependencies = NULL;
		newToc->nDeps = 0;
	}

	/* hadDumper records whether this entry carries a genuine data component */
	newToc->dataDumper = opts->dumpFn;
	newToc->dataDumperArg = opts->dumpArg;
	newToc->hadDumper = opts->dumpFn ? true : false;

	newToc->formatData = NULL;
	newToc->dataLength = 0;

	/* Give the format handler a chance to attach per-entry private data */
	if (AH->ArchiveEntryPtr != NULL)
		AH->ArchiveEntryPtr(AH, newToc);

	return newToc;
}
1275 :
1276 : /* Public */
void
PrintTOCSummary(Archive *AHX)
{
	ArchiveHandle *AH = (ArchiveHandle *) AHX;
	RestoreOptions *ropt = AH->public.ropt;
	TocEntry   *te;
	pg_compress_specification out_compression_spec = {0};
	teSection	curSection;
	CompressFileHandle *sav;
	const char *fmtName;
	char		stamp_str[64];

	/* TOC is always uncompressed */
	out_compression_spec.algorithm = PG_COMPRESSION_NONE;

	/* Redirect output to ropt->filename if one was given; restore at exit */
	sav = SaveOutput(AH);
	if (ropt->filename)
		SetOutput(AH, ropt->filename, out_compression_spec);

	if (strftime(stamp_str, sizeof(stamp_str), PGDUMP_STRFTIME_FMT,
				 localtime(&AH->createDate)) == 0)
		strcpy(stamp_str, "[unknown]");

	/* Header block describing the archive itself */
	ahprintf(AH, ";\n; Archive created at %s\n", stamp_str);
	ahprintf(AH, "; dbname: %s\n; TOC Entries: %d\n; Compression: %s\n",
			 sanitize_line(AH->archdbname, false),
			 AH->tocCount,
			 get_compress_algorithm_name(AH->compression_spec.algorithm));

	switch (AH->format)
	{
		case archCustom:
			fmtName = "CUSTOM";
			break;
		case archDirectory:
			fmtName = "DIRECTORY";
			break;
		case archTar:
			fmtName = "TAR";
			break;
		default:
			fmtName = "UNKNOWN";
	}

	ahprintf(AH, "; Dump Version: %d.%d-%d\n",
			 ARCHIVE_MAJOR(AH->version), ARCHIVE_MINOR(AH->version), ARCHIVE_REV(AH->version));
	ahprintf(AH, "; Format: %s\n", fmtName);
	ahprintf(AH, "; Integer: %d bytes\n", (int) AH->intSize);
	ahprintf(AH, "; Offset: %d bytes\n", (int) AH->offSize);
	if (AH->archiveRemoteVersion)
		ahprintf(AH, "; Dumped from database version: %s\n",
				 AH->archiveRemoteVersion);
	if (AH->archiveDumpVersion)
		ahprintf(AH, "; Dumped by pg_dump version: %s\n",
				 AH->archiveDumpVersion);

	ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");

	/* Walk the TOC, printing one summary line per selected entry */
	curSection = SECTION_PRE_DATA;
	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		/* This bit must match ProcessArchiveRestoreOptions' marking logic */
		if (te->section != SECTION_NONE)
			curSection = te->section;
		te->reqs = _tocEntryRequired(te, curSection, AH);
		/* Now, should we print it? */
		if (ropt->verbose ||
			(te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0)
		{
			char	   *sanitized_name;
			char	   *sanitized_schema;
			char	   *sanitized_owner;

			/*
			 * Run the identifying fields through sanitize_line so the entry
			 * prints safely on a single line.
			 */
			sanitized_name = sanitize_line(te->tag, false);
			sanitized_schema = sanitize_line(te->namespace, true);
			sanitized_owner = sanitize_line(te->owner, false);

			ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
					 te->catalogId.tableoid, te->catalogId.oid,
					 te->desc, sanitized_schema, sanitized_name,
					 sanitized_owner);

			free(sanitized_name);
			free(sanitized_schema);
			free(sanitized_owner);
		}
		/* In verbose mode, also list each entry's dependency dump IDs */
		if (ropt->verbose && te->nDeps > 0)
		{
			int			i;

			ahprintf(AH, ";\tdepends on:");
			for (i = 0; i < te->nDeps; i++)
				ahprintf(AH, " %d", te->dependencies[i]);
			ahprintf(AH, "\n");
		}
	}

	/* Enforce strict names checking */
	if (ropt->strict_names)
		StrictNamesCheck(ropt);

	if (ropt->filename)
		RestoreOutput(AH, sav);
}
1383 :
1384 : /***********
1385 : * Large Object Archival
1386 : ***********/
1387 :
1388 : /* Called by a dumper to signal start of a LO */
1389 : int
1390 156 : StartLO(Archive *AHX, Oid oid)
1391 : {
1392 156 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1393 :
1394 156 : if (!AH->StartLOPtr)
1395 0 : pg_fatal("large-object output not supported in chosen format");
1396 :
1397 156 : AH->StartLOPtr(AH, AH->currToc, oid);
1398 :
1399 156 : return 1;
1400 : }
1401 :
1402 : /* Called by a dumper to signal end of a LO */
1403 : int
1404 156 : EndLO(Archive *AHX, Oid oid)
1405 : {
1406 156 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1407 :
1408 156 : if (AH->EndLOPtr)
1409 156 : AH->EndLOPtr(AH, AH->currToc, oid);
1410 :
1411 156 : return 1;
1412 : }
1413 :
1414 : /**********
1415 : * Large Object Restoration
1416 : **********/
1417 :
1418 : /*
1419 : * Called by a format handler before a group of LOs is restored
1420 : */
1421 : void
1422 32 : StartRestoreLOs(ArchiveHandle *AH)
1423 : {
1424 32 : RestoreOptions *ropt = AH->public.ropt;
1425 :
1426 : /*
1427 : * LOs must be restored within a transaction block, since we need the LO
1428 : * handle to stay open while we write it. Establish a transaction unless
1429 : * there's one being used globally.
1430 : */
1431 32 : if (!(ropt->single_txn || ropt->txn_size > 0))
1432 : {
1433 32 : if (AH->connection)
1434 0 : StartTransaction(&AH->public);
1435 : else
1436 32 : ahprintf(AH, "BEGIN;\n\n");
1437 : }
1438 :
1439 32 : AH->loCount = 0;
1440 32 : }
1441 :
1442 : /*
1443 : * Called by a format handler after a group of LOs is restored
1444 : */
1445 : void
1446 32 : EndRestoreLOs(ArchiveHandle *AH)
1447 : {
1448 32 : RestoreOptions *ropt = AH->public.ropt;
1449 :
1450 32 : if (!(ropt->single_txn || ropt->txn_size > 0))
1451 : {
1452 32 : if (AH->connection)
1453 0 : CommitTransaction(&AH->public);
1454 : else
1455 32 : ahprintf(AH, "COMMIT;\n\n");
1456 : }
1457 :
1458 32 : pg_log_info(ngettext("restored %d large object",
1459 : "restored %d large objects",
1460 : AH->loCount),
1461 : AH->loCount);
1462 32 : }
1463 :
1464 :
1465 : /*
1466 : * Called by a format handler to initiate restoration of a LO
1467 : */
void
StartRestoreLO(ArchiveHandle *AH, Oid oid, bool drop)
{
	/* Pre-1.12 archives used a different LO creation protocol */
	bool		old_lo_style = (AH->version < K_VERS_1_12);
	Oid			loOid;

	AH->loCount++;

	/* Initialize the LO Buffer */
	if (AH->lo_buf == NULL)
	{
		/* First time through (in this process) so allocate the buffer */
		AH->lo_buf_size = LOBBUFSIZE;
		AH->lo_buf = pg_malloc(LOBBUFSIZE);
	}
	AH->lo_buf_used = 0;

	pg_log_info("restoring large object with OID %u", oid);

	/* With an old archive we must do drop and create logic here */
	if (old_lo_style && drop)
		DropLOIfExists(AH, oid);

	if (AH->connection)
	{
		/* Direct-to-database restore: create (old style) and open the LO */
		if (old_lo_style)
		{
			loOid = lo_create(AH->connection, oid);
			if (loOid == 0 || loOid != oid)
				pg_fatal("could not create large object %u: %s",
						 oid, PQerrorMessage(AH->connection));
		}
		AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
		if (AH->loFd == -1)
			pg_fatal("could not open large object %u: %s",
					 oid, PQerrorMessage(AH->connection));
	}
	else
	{
		/* Script output: emit SQL that opens the LO at restore time */
		if (old_lo_style)
			ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
					 oid, INV_WRITE);
		else
			ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
					 oid, INV_WRITE);
	}

	/* Route subsequent ahwrite() traffic into the LO buffer */
	AH->writingLO = true;
}
1517 :
1518 : void
1519 32 : EndRestoreLO(ArchiveHandle *AH, Oid oid)
1520 : {
1521 32 : if (AH->lo_buf_used > 0)
1522 : {
1523 : /* Write remaining bytes from the LO buffer */
1524 16 : dump_lo_buf(AH);
1525 : }
1526 :
1527 32 : AH->writingLO = false;
1528 :
1529 32 : if (AH->connection)
1530 : {
1531 0 : lo_close(AH->connection, AH->loFd);
1532 0 : AH->loFd = -1;
1533 : }
1534 : else
1535 : {
1536 32 : ahprintf(AH, "SELECT pg_catalog.lo_close(0);\n\n");
1537 : }
1538 32 : }
1539 :
1540 : /***********
1541 : * Sorting and Reordering
1542 : ***********/
1543 :
void
SortTocFromFile(Archive *AHX)
{
	ArchiveHandle *AH = (ArchiveHandle *) AHX;
	RestoreOptions *ropt = AH->public.ropt;
	FILE	   *fh;
	StringInfoData linebuf;

	/* Allocate space for the 'wanted' array, and init it */
	ropt->idWanted = (bool *) pg_malloc0(sizeof(bool) * AH->maxDumpId);

	/* Setup the file */
	fh = fopen(ropt->tocFile, PG_BINARY_R);
	if (!fh)
		pg_fatal("could not open TOC file \"%s\": %m", ropt->tocFile);

	initStringInfo(&linebuf);

	/* Process the TOC file one line at a time */
	while (pg_get_line_buf(fh, &linebuf))
	{
		char	   *cmnt;
		char	   *endptr;
		DumpId		id;
		TocEntry   *te;

		/* Truncate line at comment, if any */
		cmnt = strchr(linebuf.data, ';');
		if (cmnt != NULL)
		{
			cmnt[0] = '\0';
			linebuf.len = cmnt - linebuf.data;
		}

		/* Ignore if all blank */
		if (strspn(linebuf.data, " \t\r\n") == linebuf.len)
			continue;

		/* Get an ID, check it's valid and not already seen */
		id = strtol(linebuf.data, &endptr, 10);
		if (endptr == linebuf.data || id <= 0 || id > AH->maxDumpId ||
			ropt->idWanted[id - 1])
		{
			/* Bad or duplicate ID: warn and keep going, don't abort */
			pg_log_warning("line ignored: %s", linebuf.data);
			continue;
		}

		/* Find TOC entry */
		te = getTocEntryByDumpId(AH, id);
		if (!te)
			pg_fatal("could not find entry for ID %d",
					 id);

		/* Mark it wanted */
		ropt->idWanted[id - 1] = true;

		/*
		 * Move each item to the end of the list as it is selected, so that
		 * they are placed in the desired order.  Any unwanted items will end
		 * up at the front of the list, which may seem unintuitive but it's
		 * what we need.  In an ordinary serial restore that makes no
		 * difference, but in a parallel restore we need to mark unrestored
		 * items' dependencies as satisfied before we start examining
		 * restorable items.  Otherwise they could have surprising
		 * side-effects on the order in which restorable items actually get
		 * restored.
		 */
		_moveBefore(AH->toc, te);
	}

	pg_free(linebuf.data);

	if (fclose(fh) != 0)
		pg_fatal("could not close TOC file: %m");
}
1618 :
1619 : /**********************
1620 : * Convenience functions that look like standard IO functions
1621 : * for writing data when in dump mode.
1622 : **********************/
1623 :
1624 : /* Public */
1625 : void
1626 46048 : archputs(const char *s, Archive *AH)
1627 : {
1628 46048 : WriteData(AH, s, strlen(s));
1629 46048 : }
1630 :
1631 : /* Public */
int
archprintf(Archive *AH, const char *fmt,...)
{
	int			save_errno = errno;
	char	   *p;
	size_t		len = 128;		/* initial assumption about buffer size */
	size_t		cnt;

	/*
	 * Format into a malloc'd buffer, growing it until pvsnprintf reports
	 * that the whole result fit, then push the text through WriteData.
	 */
	for (;;)
	{
		va_list		args;

		/* Allocate work buffer. */
		p = (char *) pg_malloc(len);

		/* Try to format the data; restore errno first in case fmt uses %m. */
		errno = save_errno;
		va_start(args, fmt);
		cnt = pvsnprintf(p, len, fmt, args);
		va_end(args);

		if (cnt < len)
			break;				/* success */

		/* Release buffer and loop around to try again with larger len. */
		free(p);
		len = cnt;
	}

	WriteData(AH, p, cnt);
	free(p);
	return (int) cnt;
}
1665 :
1666 :
1667 : /*******************************
1668 : * Stuff below here should be 'private' to the archiver routines
1669 : *******************************/
1670 :
static void
SetOutput(ArchiveHandle *AH, const char *filename,
		  const pg_compress_specification compression_spec)
{
	CompressFileHandle *CFH;
	const char *mode;
	int			fn = -1;

	/*
	 * Decide where the output goes: an explicit file name ("-" meaning
	 * stdout), the already-open archive FILE, the archive's file spec, or
	 * stdout as the last resort.  fn >= 0 means "use this descriptor".
	 */
	if (filename)
	{
		if (strcmp(filename, "-") == 0)
			fn = fileno(stdout);
	}
	else if (AH->FH)
		fn = fileno(AH->FH);
	else if (AH->fSpec)
	{
		filename = AH->fSpec;
	}
	else
		fn = fileno(stdout);

	/* Append vs. (over)write; binary mode matters on Windows */
	if (AH->mode == archModeAppend)
		mode = PG_BINARY_A;
	else
		mode = PG_BINARY_W;

	CFH = InitCompressFileHandle(compression_spec);

	if (!CFH->open_func(filename, fn, mode, CFH))
	{
		if (filename)
			pg_fatal("could not open output file \"%s\": %m", filename);
		else
			pg_fatal("could not open output file: %m");
	}

	AH->OF = CFH;
}
1710 :
1711 : static CompressFileHandle *
1712 338 : SaveOutput(ArchiveHandle *AH)
1713 : {
1714 338 : return (CompressFileHandle *) AH->OF;
1715 : }
1716 :
1717 : static void
1718 276 : RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput)
1719 : {
1720 276 : errno = 0;
1721 276 : if (!EndCompressFileHandle(AH->OF))
1722 0 : pg_fatal("could not close output file: %m");
1723 :
1724 276 : AH->OF = savedOutput;
1725 276 : }
1726 :
1727 :
1728 :
1729 : /*
1730 : * Print formatted text to the output file (usually stdout).
1731 : */
int
ahprintf(ArchiveHandle *AH, const char *fmt,...)
{
	int			save_errno = errno;
	char	   *p;
	size_t		len = 128;		/* initial assumption about buffer size */
	size_t		cnt;

	/*
	 * Format into a malloc'd buffer, growing it until pvsnprintf reports
	 * that the whole result fit, then route the text through ahwrite().
	 */
	for (;;)
	{
		va_list		args;

		/* Allocate work buffer. */
		p = (char *) pg_malloc(len);

		/* Try to format the data; restore errno first in case fmt uses %m. */
		errno = save_errno;
		va_start(args, fmt);
		cnt = pvsnprintf(p, len, fmt, args);
		va_end(args);

		if (cnt < len)
			break;				/* success */

		/* Release buffer and loop around to try again with larger len. */
		free(p);
		len = cnt;
	}

	ahwrite(p, 1, cnt, AH);
	free(p);
	return (int) cnt;
}
1765 :
1766 : /*
1767 : * Single place for logic which says 'We are restoring to a direct DB connection'.
1768 : */
1769 : static int
1770 4061756 : RestoringToDB(ArchiveHandle *AH)
1771 : {
1772 4061756 : RestoreOptions *ropt = AH->public.ropt;
1773 :
1774 4061756 : return (ropt && ropt->useDB && AH->connection);
1775 : }
1776 :
1777 : /*
1778 : * Dump the current contents of the LO data buffer while writing a LO
1779 : */
static void
dump_lo_buf(ArchiveHandle *AH)
{
	if (AH->connection)
	{
		/* Direct-to-database: push the buffer through lo_write() */
		int			res;

		res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
		pg_log_debug(ngettext("wrote %zu byte of large object data (result = %d)",
							  "wrote %zu bytes of large object data (result = %d)",
							  AH->lo_buf_used),
					 AH->lo_buf_used, res);
		/* We assume there are no short writes, only errors */
		if (res != AH->lo_buf_used)
			warn_or_exit_horribly(AH, "could not write to large object: %s",
								  PQerrorMessage(AH->connection));
	}
	else
	{
		/* Script output: emit a lowrite() call with a bytea literal */
		PQExpBuffer buf = createPQExpBuffer();

		appendByteaLiteralAHX(buf,
							  (const unsigned char *) AH->lo_buf,
							  AH->lo_buf_used,
							  AH);

		/* Hack: turn off writingLO so ahwrite doesn't recurse to here */
		AH->writingLO = false;
		ahprintf(AH, "SELECT pg_catalog.lowrite(0, %s);\n", buf->data);
		AH->writingLO = true;

		destroyPQExpBuffer(buf);
	}
	/* Buffer has been emptied either way */
	AH->lo_buf_used = 0;
}
1815 :
1816 :
1817 : /*
1818 : * Write buffer to the output file (usually stdout). This is used for
1819 : * outputting 'restore' scripts etc. It is even possible for an archive
1820 : * format to create a custom output routine to 'fake' a restore if it
1821 : * wants to generate a script (see TAR output).
1822 : */
void
ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
{
	int			bytes_written = 0;

	if (AH->writingLO)
	{
		/*
		 * Large-object mode: accumulate into lo_buf, flushing via
		 * dump_lo_buf() whenever the incoming data would overflow it.
		 */
		size_t		remaining = size * nmemb;

		while (AH->lo_buf_used + remaining > AH->lo_buf_size)
		{
			size_t		avail = AH->lo_buf_size - AH->lo_buf_used;

			memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
			ptr = (const char *) ptr + avail;
			remaining -= avail;
			AH->lo_buf_used += avail;
			dump_lo_buf(AH);
		}

		memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
		AH->lo_buf_used += remaining;

		bytes_written = size * nmemb;
	}
	else if (AH->CustomOutPtr)
		bytes_written = AH->CustomOutPtr(AH, ptr, size * nmemb);

	/*
	 * If we're doing a restore, and it's direct to DB, and we're connected
	 * then send it to the DB.
	 */
	else if (RestoringToDB(AH))
		bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
	else
	{
		/* Plain output: write through the (possibly compressing) handle */
		CompressFileHandle *CFH = (CompressFileHandle *) AH->OF;

		if (CFH->write_func(ptr, size * nmemb, CFH))
			bytes_written = size * nmemb;
	}

	/* Any short write is treated as fatal */
	if (bytes_written != size * nmemb)
		WRITE_ERROR_EXIT;
}
1868 :
1869 : /* on some error, we may decide to go on... */
void
warn_or_exit_horribly(ArchiveHandle *AH, const char *fmt,...)
{
	va_list		ap;

	/*
	 * Emit a "while ..." context line, but only the first time we report an
	 * error in any given stage (lastErrorStage tracks that).
	 */
	switch (AH->stage)
	{

		case STAGE_NONE:
			/* Do nothing special */
			break;

		case STAGE_INITIALIZING:
			if (AH->stage != AH->lastErrorStage)
				pg_log_info("while INITIALIZING:");
			break;

		case STAGE_PROCESSING:
			if (AH->stage != AH->lastErrorStage)
				pg_log_info("while PROCESSING TOC:");
			break;

		case STAGE_FINALIZING:
			if (AH->stage != AH->lastErrorStage)
				pg_log_info("while FINALIZING:");
			break;
	}
	/* Likewise, identify the current TOC entry only once per entry */
	if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE)
	{
		pg_log_info("from TOC entry %d; %u %u %s %s %s",
					AH->currentTE->dumpId,
					AH->currentTE->catalogId.tableoid,
					AH->currentTE->catalogId.oid,
					AH->currentTE->desc ? AH->currentTE->desc : "(no desc)",
					AH->currentTE->tag ? AH->currentTE->tag : "(no tag)",
					AH->currentTE->owner ? AH->currentTE->owner : "(no owner)");
	}
	AH->lastErrorStage = AH->stage;
	AH->lastErrorTE = AH->currentTE;

	va_start(ap, fmt);
	pg_log_generic_v(PG_LOG_ERROR, PG_LOG_PRIMARY, fmt, ap);
	va_end(ap);

	/* Either die now, or count the error and let the caller continue */
	if (AH->public.exit_on_error)
		exit_nicely(1);
	else
		AH->public.n_errors++;
}
1919 :
1920 : #ifdef NOT_USED
1921 :
/*
 * Relocate TOC entry "te" to just after "pos" in the circular TOC list.
 * (Currently unused; retained under NOT_USED.)
 */
static void
_moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
{
	/* Unlink te from list */
	te->prev->next = te->next;
	te->next->prev = te->prev;

	/* and insert it after "pos" */
	te->prev = pos;
	te->next = pos->next;
	pos->next->prev = te;
	pos->next = te;
}
1935 : #endif
1936 :
/*
 * Relocate TOC entry "te" to just before "pos" in the circular TOC list.
 * Passing AH->toc as "pos" moves te to the end of the list.
 */
static void
_moveBefore(TocEntry *pos, TocEntry *te)
{
	/* Unlink te from list */
	te->prev->next = te->next;
	te->next->prev = te->prev;

	/* and insert it before "pos" */
	te->prev = pos->prev;
	te->next = pos;
	pos->prev->next = te;
	pos->prev = te;
}
1950 :
1951 : /*
1952 : * Build index arrays for the TOC list
1953 : *
1954 : * This should be invoked only after we have created or read in all the TOC
1955 : * items.
1956 : *
1957 : * The arrays are indexed by dump ID (so entry zero is unused). Note that the
1958 : * array entries run only up to maxDumpId. We might see dependency dump IDs
1959 : * beyond that (if the dump was partial); so always check the array bound
1960 : * before trying to touch an array entry.
1961 : */
static void
buildTocEntryArrays(ArchiveHandle *AH)
{
	DumpId		maxDumpId = AH->maxDumpId;
	TocEntry   *te;

	/* Entry zero of each array is unused; size is maxDumpId + 1 */
	AH->tocsByDumpId = (TocEntry **) pg_malloc0((maxDumpId + 1) * sizeof(TocEntry *));
	AH->tableDataId = (DumpId *) pg_malloc0((maxDumpId + 1) * sizeof(DumpId));

	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		/* this check is purely paranoia, maxDumpId should be correct */
		if (te->dumpId <= 0 || te->dumpId > maxDumpId)
			pg_fatal("bad dumpId");

		/* tocsByDumpId indexes all TOCs by their dump ID */
		AH->tocsByDumpId[te->dumpId] = te;

		/*
		 * tableDataId provides the TABLE DATA item's dump ID for each TABLE
		 * TOC entry that has a DATA item.  We compute this by reversing the
		 * TABLE DATA item's dependency, knowing that a TABLE DATA item has
		 * just one dependency and it is the TABLE item.
		 */
		if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0)
		{
			DumpId		tableId = te->dependencies[0];

			/*
			 * The TABLE item might not have been in the archive, if this was
			 * a data-only dump; but its dump ID should be less than its data
			 * item's dump ID, so there should be a place for it in the array.
			 */
			if (tableId <= 0 || tableId > maxDumpId)
				pg_fatal("bad table dumpId for TABLE DATA item");

			AH->tableDataId[tableId] = te->dumpId;
		}
	}
}
2002 :
2003 : TocEntry *
2004 21952 : getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
2005 : {
2006 : /* build index arrays if we didn't already */
2007 21952 : if (AH->tocsByDumpId == NULL)
2008 56 : buildTocEntryArrays(AH);
2009 :
2010 21952 : if (id > 0 && id <= AH->maxDumpId)
2011 21952 : return AH->tocsByDumpId[id];
2012 :
2013 0 : return NULL;
2014 : }
2015 :
2016 : int
2017 21406 : TocIDRequired(ArchiveHandle *AH, DumpId id)
2018 : {
2019 21406 : TocEntry *te = getTocEntryByDumpId(AH, id);
2020 :
2021 21406 : if (!te)
2022 10072 : return 0;
2023 :
2024 11334 : return te->reqs;
2025 : }
2026 :
2027 : size_t
2028 18404 : WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
2029 : {
2030 : int off;
2031 :
2032 : /* Save the flag */
2033 18404 : AH->WriteBytePtr(AH, wasSet);
2034 :
2035 : /* Write out pgoff_t smallest byte first, prevents endian mismatch */
2036 165636 : for (off = 0; off < sizeof(pgoff_t); off++)
2037 : {
2038 147232 : AH->WriteBytePtr(AH, o & 0xFF);
2039 147232 : o >>= 8;
2040 : }
2041 18404 : return sizeof(pgoff_t) + 1;
2042 : }
2043 :
/*
 * Read a file offset previously written by WriteOffset.
 *
 * Stores the offset into *o and returns a K_OFFSET_* status code (the
 * flag byte preceding the offset in current archive versions).
 */
int
ReadOffset(ArchiveHandle *AH, pgoff_t * o)
{
	int			i;
	int			off;
	int			offsetFlg;

	/* Initialize to zero */
	*o = 0;

	/* Check for old version */
	if (AH->version < K_VERS_1_7)
	{
		/* Prior versions wrote offsets using WriteInt */
		i = ReadInt(AH);
		/* -1 means not set */
		if (i < 0)
			return K_OFFSET_POS_NOT_SET;
		else if (i == 0)
			return K_OFFSET_NO_DATA;

		/* Cast to pgoff_t because it was written as an int. */
		*o = (pgoff_t) i;
		return K_OFFSET_POS_SET;
	}

	/*
	 * Read the flag indicating the state of the data pointer. Check if valid
	 * and die if not.
	 *
	 * This used to be handled by a negative or zero pointer, now we use an
	 * extra byte specifically for the state.
	 */
	offsetFlg = AH->ReadBytePtr(AH) & 0xFF;

	switch (offsetFlg)
	{
		case K_OFFSET_POS_NOT_SET:
		case K_OFFSET_NO_DATA:
		case K_OFFSET_POS_SET:

			break;

		default:
			pg_fatal("unexpected data offset flag %d", offsetFlg);
	}

	/*
	 * Read the bytes, least significant first as written.  offSize comes
	 * from the archive header, so it can exceed our own sizeof(pgoff_t);
	 * any excess bytes must then be zero or the offset doesn't fit.
	 */
	for (off = 0; off < AH->offSize; off++)
	{
		if (off < sizeof(pgoff_t))
			*o |= ((pgoff_t) (AH->ReadBytePtr(AH))) << (off * 8);
		else
		{
			if (AH->ReadBytePtr(AH) != 0)
				pg_fatal("file offset in dump file is too large");
		}
	}

	return offsetFlg;
}
2107 :
2108 : size_t
2109 423520 : WriteInt(ArchiveHandle *AH, int i)
2110 : {
2111 : int b;
2112 :
2113 : /*
2114 : * This is a bit yucky, but I don't want to make the binary format very
2115 : * dependent on representation, and not knowing much about it, I write out
2116 : * a sign byte. If you change this, don't forget to change the file
2117 : * version #, and modify ReadInt to read the new format AS WELL AS the old
2118 : * formats.
2119 : */
2120 :
2121 : /* SIGN byte */
2122 423520 : if (i < 0)
2123 : {
2124 100188 : AH->WriteBytePtr(AH, 1);
2125 100188 : i = -i;
2126 : }
2127 : else
2128 323332 : AH->WriteBytePtr(AH, 0);
2129 :
2130 2117600 : for (b = 0; b < AH->intSize; b++)
2131 : {
2132 1694080 : AH->WriteBytePtr(AH, i & 0xFF);
2133 1694080 : i >>= 8;
2134 : }
2135 :
2136 423520 : return AH->intSize + 1;
2137 : }
2138 :
2139 : int
2140 310048 : ReadInt(ArchiveHandle *AH)
2141 : {
2142 310048 : int res = 0;
2143 : int bv,
2144 : b;
2145 310048 : int sign = 0; /* Default positive */
2146 310048 : int bitShift = 0;
2147 :
2148 310048 : if (AH->version > K_VERS_1_0)
2149 : /* Read a sign byte */
2150 310048 : sign = AH->ReadBytePtr(AH);
2151 :
2152 1550240 : for (b = 0; b < AH->intSize; b++)
2153 : {
2154 1240192 : bv = AH->ReadBytePtr(AH) & 0xFF;
2155 1240192 : if (bv != 0)
2156 291674 : res = res + (bv << bitShift);
2157 1240192 : bitShift += 8;
2158 : }
2159 :
2160 310048 : if (sign)
2161 73588 : res = -res;
2162 :
2163 310048 : return res;
2164 : }
2165 :
2166 : size_t
2167 332364 : WriteStr(ArchiveHandle *AH, const char *c)
2168 : {
2169 : size_t res;
2170 :
2171 332364 : if (c)
2172 : {
2173 232176 : int len = strlen(c);
2174 :
2175 232176 : res = WriteInt(AH, len);
2176 232176 : AH->WriteBufPtr(AH, c, len);
2177 232176 : res += len;
2178 : }
2179 : else
2180 100188 : res = WriteInt(AH, -1);
2181 :
2182 332364 : return res;
2183 : }
2184 :
2185 : char *
2186 243496 : ReadStr(ArchiveHandle *AH)
2187 : {
2188 : char *buf;
2189 : int l;
2190 :
2191 243496 : l = ReadInt(AH);
2192 243496 : if (l < 0)
2193 73588 : buf = NULL;
2194 : else
2195 : {
2196 169908 : buf = (char *) pg_malloc(l + 1);
2197 169908 : AH->ReadBufPtr(AH, buf, l);
2198 :
2199 169908 : buf[l] = '\0';
2200 : }
2201 :
2202 243496 : return buf;
2203 : }
2204 :
2205 : static bool
2206 22 : _fileExistsInDirectory(const char *dir, const char *filename)
2207 : {
2208 : struct stat st;
2209 : char buf[MAXPGPATH];
2210 :
2211 22 : if (snprintf(buf, MAXPGPATH, "%s/%s", dir, filename) >= MAXPGPATH)
2212 0 : pg_fatal("directory name too long: \"%s\"", dir);
2213 :
2214 22 : return (stat(buf, &st) == 0 && S_ISREG(st.st_mode));
2215 : }
2216 :
/*
 * Identify the format (directory, custom, or tar) of an existing archive
 * by inspecting the file or directory named by AH->fSpec (stdin if unset).
 * Sets AH->format and returns it; dies if the input is not recognizable.
 * Also primes AH->lookahead with the initial bytes already consumed.
 */
static int
_discoverArchiveFormat(ArchiveHandle *AH)
{
	FILE	   *fh;
	char		sig[6];			/* More than enough */
	size_t		cnt;
	int			wantClose = 0;

	pg_log_debug("attempting to ascertain archive format");

	free(AH->lookahead);

	AH->readHeader = 0;
	AH->lookaheadSize = 512;
	AH->lookahead = pg_malloc0(512);
	AH->lookaheadLen = 0;
	AH->lookaheadPos = 0;

	if (AH->fSpec)
	{
		struct stat st;

		wantClose = 1;

		/*
		 * Check if the specified archive is a directory. If so, check if
		 * there's a "toc.dat" (or "toc.dat.{gz,lz4,zst}") file in it.
		 */
		if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
		{
			AH->format = archDirectory;
			if (_fileExistsInDirectory(AH->fSpec, "toc.dat"))
				return AH->format;
#ifdef HAVE_LIBZ
			if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz"))
				return AH->format;
#endif
#ifdef USE_LZ4
			if (_fileExistsInDirectory(AH->fSpec, "toc.dat.lz4"))
				return AH->format;
#endif
#ifdef USE_ZSTD
			if (_fileExistsInDirectory(AH->fSpec, "toc.dat.zst"))
				return AH->format;
#endif
			pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
					 AH->fSpec);
			fh = NULL;			/* keep compiler quiet */
		}
		else
		{
			fh = fopen(AH->fSpec, PG_BINARY_R);
			if (!fh)
				pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
		}
	}
	else
	{
		fh = stdin;
		if (!fh)
			pg_fatal("could not open input file: %m");
	}

	/* A 5-byte magic-number read distinguishes custom format from the rest */
	if ((cnt = fread(sig, 1, 5, fh)) != 5)
	{
		if (ferror(fh))
			pg_fatal("could not read input file: %m");
		else
			pg_fatal("input file is too short (read %lu, expected 5)",
					 (unsigned long) cnt);
	}

	/* Save it, just in case we need it later */
	memcpy(&AH->lookahead[0], sig, 5);
	AH->lookaheadLen = 5;

	if (strncmp(sig, "PGDMP", 5) == 0)
	{
		/* It's custom format, stop here */
		AH->format = archCustom;
		AH->readHeader = 1;
	}
	else
	{
		/*
		 * *Maybe* we have a tar archive format file or a text dump ... So,
		 * read first 512 byte header...
		 */
		cnt = fread(&AH->lookahead[AH->lookaheadLen], 1, 512 - AH->lookaheadLen, fh);
		/* read failure is checked below */
		AH->lookaheadLen += cnt;

		if (AH->lookaheadLen >= strlen(TEXT_DUMPALL_HEADER) &&
			(strncmp(AH->lookahead, TEXT_DUMP_HEADER, strlen(TEXT_DUMP_HEADER)) == 0 ||
			 strncmp(AH->lookahead, TEXT_DUMPALL_HEADER, strlen(TEXT_DUMPALL_HEADER)) == 0))
		{
			/*
			 * looks like it's probably a text format dump. so suggest they
			 * try psql
			 */
			pg_fatal("input file appears to be a text format dump. Please use psql.");
		}

		if (AH->lookaheadLen != 512)
		{
			if (feof(fh))
				pg_fatal("input file does not appear to be a valid archive (too short?)");
			else
				READ_ERROR_EXIT(fh);
		}

		if (!isValidTarHeader(AH->lookahead))
			pg_fatal("input file does not appear to be a valid archive");

		AH->format = archTar;
	}

	/* Close the file if we opened it */
	if (wantClose)
	{
		if (fclose(fh) != 0)
			pg_fatal("could not close input file: %m");
		/* Forget lookahead, since we'll re-read header after re-opening */
		AH->readHeader = 0;
		AH->lookaheadLen = 0;
	}

	return AH->format;
}
2346 :
2347 :
/*
 * Allocate an archive handle
 *
 * Creates and initializes an ArchiveHandle for the given file spec,
 * format, compression spec and mode, then hands it to the per-format init
 * routine.  If fmt is archUnknown, the format is sniffed from the
 * existing file or directory via _discoverArchiveFormat.
 */
static ArchiveHandle *
_allocAH(const char *FileSpec, const ArchiveFormat fmt,
		 const pg_compress_specification compression_spec,
		 bool dosync, ArchiveMode mode,
		 SetupWorkerPtrType setupWorkerPtr, DataDirSyncMethod sync_method)
{
	ArchiveHandle *AH;
	CompressFileHandle *CFH;
	pg_compress_specification out_compress_spec = {0};

	pg_log_debug("allocating AH for %s, format %d",
				 FileSpec ? FileSpec : "(stdio)", fmt);

	AH = (ArchiveHandle *) pg_malloc0(sizeof(ArchiveHandle));

	AH->version = K_VERS_SELF;

	/* initialize for backwards compatible string processing */
	AH->public.encoding = 0;	/* PG_SQL_ASCII */
	AH->public.std_strings = false;

	/* sql error handling */
	AH->public.exit_on_error = true;
	AH->public.n_errors = 0;

	AH->archiveDumpVersion = PG_VERSION;

	AH->createDate = time(NULL);

	AH->intSize = sizeof(int);
	AH->offSize = sizeof(pgoff_t);
	if (FileSpec)
	{
		AH->fSpec = pg_strdup(FileSpec);

		/*
		 * Not used; maybe later....
		 *
		 * AH->workDir = pg_strdup(FileSpec); for(i=strlen(FileSpec) ; i > 0 ;
		 * i--) if (AH->workDir[i-1] == '/')
		 */
	}
	else
		AH->fSpec = NULL;

	AH->currUser = NULL;		/* unknown */
	AH->currSchema = NULL;		/* ditto */
	AH->currTablespace = NULL;	/* ditto */
	AH->currTableAm = NULL;		/* ditto */

	/* The TOC is a circular doubly-linked list with a dummy head node */
	AH->toc = (TocEntry *) pg_malloc0(sizeof(TocEntry));

	AH->toc->next = AH->toc;
	AH->toc->prev = AH->toc;

	AH->mode = mode;
	AH->compression_spec = compression_spec;
	AH->dosync = dosync;
	AH->sync_method = sync_method;

	memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));

	/* Open stdout with no compression for AH output handle */
	out_compress_spec.algorithm = PG_COMPRESSION_NONE;
	CFH = InitCompressFileHandle(out_compress_spec);
	if (!CFH->open_func(NULL, fileno(stdout), PG_BINARY_A, CFH))
		pg_fatal("could not open stdout for appending: %m");
	AH->OF = CFH;

	/*
	 * On Windows, we need to use binary mode to read/write non-text files,
	 * which include all archive formats as well as compressed plain text.
	 * Force stdin/stdout into binary mode if that is what we are using.
	 */
#ifdef WIN32
	if ((fmt != archNull || compression_spec.algorithm != PG_COMPRESSION_NONE) &&
		(AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
	{
		if (mode == archModeWrite)
			_setmode(fileno(stdout), O_BINARY);
		else
			_setmode(fileno(stdin), O_BINARY);
	}
#endif

	AH->SetupWorkerPtr = setupWorkerPtr;

	if (fmt == archUnknown)
		AH->format = _discoverArchiveFormat(AH);
	else
		AH->format = fmt;

	/* Let the format-specific module fill in its function pointers */
	switch (AH->format)
	{
		case archCustom:
			InitArchiveFmt_Custom(AH);
			break;

		case archNull:
			InitArchiveFmt_Null(AH);
			break;

		case archDirectory:
			InitArchiveFmt_Directory(AH);
			break;

		case archTar:
			InitArchiveFmt_Tar(AH);
			break;

		default:
			pg_fatal("unrecognized file format \"%d\"", fmt);
	}

	return AH;
}
2467 :
/*
 * Write out all data (tables & LOs)
 *
 * In parallel mode each candidate entry is dispatched to a worker;
 * otherwise they are dumped sequentially in TOC order.
 */
void
WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
{
	TocEntry   *te;

	if (pstate && pstate->numWorkers > 1)
	{
		/*
		 * In parallel mode, this code runs in the leader process. We
		 * construct an array of candidate TEs, then sort it into decreasing
		 * size order, then dispatch each TE to a data-transfer worker. By
		 * dumping larger tables first, we avoid getting into a situation
		 * where we're down to one job and it's big, losing parallelism.
		 */
		TocEntry  **tes;
		int			ntes;

		tes = (TocEntry **) pg_malloc(AH->tocCount * sizeof(TocEntry *));
		ntes = 0;
		for (te = AH->toc->next; te != AH->toc; te = te->next)
		{
			/* Consider only TEs with dataDumper functions ... */
			if (!te->dataDumper)
				continue;
			/* ... and ignore ones not enabled for dump */
			if ((te->reqs & REQ_DATA) == 0)
				continue;

			tes[ntes++] = te;
		}

		if (ntes > 1)
			qsort(tes, ntes, sizeof(TocEntry *), TocEntrySizeCompareQsort);

		for (int i = 0; i < ntes; i++)
			DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP,
								   mark_dump_job_done, NULL);

		pg_free(tes);

		/* Now wait for workers to finish. */
		WaitForWorkers(AH, pstate, WFW_ALL_IDLE);
	}
	else
	{
		/* Non-parallel mode: just dump all candidate TEs sequentially. */
		for (te = AH->toc->next; te != AH->toc; te = te->next)
		{
			/* Must have same filter conditions as above */
			if (!te->dataDumper)
				continue;
			if ((te->reqs & REQ_DATA) == 0)
				continue;

			WriteDataChunksForTocEntry(AH, te);
		}
	}
}
2529 :
2530 :
/*
 * Callback function that's invoked in the leader process after a step has
 * been parallel dumped.
 *
 * We don't need to do anything except check for worker failure; "status"
 * is the worker's result code for this step (nonzero means it failed).
 */
static void
mark_dump_job_done(ArchiveHandle *AH,
				   TocEntry *te,
				   int status,
				   void *callback_data)
{
	pg_log_info("finished item %d %s %s",
				te->dumpId, te->desc, te->tag);

	if (status != 0)
		pg_fatal("worker process failed: exit code %d",
				 status);
}
2550 :
2551 :
2552 : void
2553 614 : WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
2554 : {
2555 : StartDataPtrType startPtr;
2556 : EndDataPtrType endPtr;
2557 :
2558 614 : AH->currToc = te;
2559 :
2560 614 : if (strcmp(te->desc, "BLOBS") == 0)
2561 : {
2562 32 : startPtr = AH->StartLOsPtr;
2563 32 : endPtr = AH->EndLOsPtr;
2564 : }
2565 : else
2566 : {
2567 582 : startPtr = AH->StartDataPtr;
2568 582 : endPtr = AH->EndDataPtr;
2569 : }
2570 :
2571 614 : if (startPtr != NULL)
2572 614 : (*startPtr) (AH, te);
2573 :
2574 : /*
2575 : * The user-provided DataDumper routine needs to call AH->WriteData
2576 : */
2577 614 : te->dataDumper((Archive *) AH, te->dataDumperArg);
2578 :
2579 614 : if (endPtr != NULL)
2580 614 : (*endPtr) (AH, te);
2581 :
2582 614 : AH->currToc = NULL;
2583 614 : }
2584 :
/*
 * Write the TOC to the archive: an entry count followed by each dumpable
 * entry's metadata, in exactly the field order ReadToc expects.
 */
void
WriteToc(ArchiveHandle *AH)
{
	TocEntry   *te;
	char		workbuf[32];
	int			tocCount;
	int			i;

	/* count entries that will actually be dumped */
	tocCount = 0;
	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS | REQ_SPECIAL)) != 0)
			tocCount++;
	}

	/* printf("%d TOC Entries to save\n", tocCount); */

	WriteInt(AH, tocCount);

	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS | REQ_SPECIAL)) == 0)
			continue;

		WriteInt(AH, te->dumpId);
		WriteInt(AH, te->dataDumper ? 1 : 0);

		/* OID is recorded as a string for historical reasons */
		sprintf(workbuf, "%u", te->catalogId.tableoid);
		WriteStr(AH, workbuf);
		sprintf(workbuf, "%u", te->catalogId.oid);
		WriteStr(AH, workbuf);

		WriteStr(AH, te->tag);
		WriteStr(AH, te->desc);
		WriteInt(AH, te->section);
		WriteStr(AH, te->defn);
		WriteStr(AH, te->dropStmt);
		WriteStr(AH, te->copyStmt);
		WriteStr(AH, te->namespace);
		WriteStr(AH, te->tablespace);
		WriteStr(AH, te->tableam);
		WriteInt(AH, te->relkind);
		WriteStr(AH, te->owner);
		/* legacy "table has OIDs" flag; always written as false now */
		WriteStr(AH, "false");

		/* Dump list of dependencies */
		for (i = 0; i < te->nDeps; i++)
		{
			sprintf(workbuf, "%d", te->dependencies[i]);
			WriteStr(AH, workbuf);
		}
		WriteStr(AH, NULL);		/* Terminate List */

		/* Let the format module append its own per-entry data */
		if (AH->WriteExtraTocPtr)
			AH->WriteExtraTocPtr(AH, te);
	}
}
2644 :
/*
 * Read the TOC written by WriteToc, reconstructing the in-memory TOC list.
 *
 * Supports all historical archive versions; fields absent from older
 * versions get defaults.  ENCODING, STDSTRINGS and SEARCHPATH entries are
 * processed immediately as they are read.
 */
void
ReadToc(ArchiveHandle *AH)
{
	int			i;
	char	   *tmp;
	DumpId	   *deps;
	int			depIdx;
	int			depSize;
	TocEntry   *te;
	bool		is_supported;

	AH->tocCount = ReadInt(AH);
	AH->maxDumpId = 0;

	for (i = 0; i < AH->tocCount; i++)
	{
		te = (TocEntry *) pg_malloc0(sizeof(TocEntry));
		te->dumpId = ReadInt(AH);

		if (te->dumpId > AH->maxDumpId)
			AH->maxDumpId = te->dumpId;

		/* Sanity check */
		if (te->dumpId <= 0)
			pg_fatal("entry ID %d out of range -- perhaps a corrupt TOC",
					 te->dumpId);

		te->hadDumper = ReadInt(AH);

		/* Catalog IDs were stored as strings for historical reasons */
		if (AH->version >= K_VERS_1_8)
		{
			tmp = ReadStr(AH);
			sscanf(tmp, "%u", &te->catalogId.tableoid);
			free(tmp);
		}
		else
			te->catalogId.tableoid = InvalidOid;
		tmp = ReadStr(AH);
		sscanf(tmp, "%u", &te->catalogId.oid);
		free(tmp);

		te->tag = ReadStr(AH);
		te->desc = ReadStr(AH);

		if (AH->version >= K_VERS_1_11)
		{
			te->section = ReadInt(AH);
		}
		else
		{
			/*
			 * Rules for pre-8.4 archives wherein pg_dump hasn't classified
			 * the entries into sections. This list need not cover entry
			 * types added later than 8.4.
			 */
			if (strcmp(te->desc, "COMMENT") == 0 ||
				strcmp(te->desc, "ACL") == 0 ||
				strcmp(te->desc, "ACL LANGUAGE") == 0)
				te->section = SECTION_NONE;
			else if (strcmp(te->desc, "TABLE DATA") == 0 ||
					 strcmp(te->desc, "BLOBS") == 0 ||
					 strcmp(te->desc, "BLOB COMMENTS") == 0)
				te->section = SECTION_DATA;
			else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
					 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
					 strcmp(te->desc, "FK CONSTRAINT") == 0 ||
					 strcmp(te->desc, "INDEX") == 0 ||
					 strcmp(te->desc, "RULE") == 0 ||
					 strcmp(te->desc, "TRIGGER") == 0)
				te->section = SECTION_POST_DATA;
			else
				te->section = SECTION_PRE_DATA;
		}

		te->defn = ReadStr(AH);
		te->dropStmt = ReadStr(AH);

		if (AH->version >= K_VERS_1_3)
			te->copyStmt = ReadStr(AH);

		if (AH->version >= K_VERS_1_6)
			te->namespace = ReadStr(AH);

		if (AH->version >= K_VERS_1_10)
			te->tablespace = ReadStr(AH);

		if (AH->version >= K_VERS_1_14)
			te->tableam = ReadStr(AH);

		if (AH->version >= K_VERS_1_16)
			te->relkind = ReadInt(AH);

		te->owner = ReadStr(AH);
		/* legacy "table has OIDs" flag; "true" can no longer be restored */
		is_supported = true;
		if (AH->version < K_VERS_1_9)
			is_supported = false;
		else
		{
			tmp = ReadStr(AH);

			if (strcmp(tmp, "true") == 0)
				is_supported = false;

			free(tmp);
		}

		if (!is_supported)
			pg_log_warning("restoring tables WITH OIDS is not supported anymore");

		/* Read TOC entry dependencies */
		if (AH->version >= K_VERS_1_5)
		{
			depSize = 100;
			deps = (DumpId *) pg_malloc(sizeof(DumpId) * depSize);
			depIdx = 0;
			for (;;)
			{
				tmp = ReadStr(AH);
				if (!tmp)
					break;		/* end of list */
				if (depIdx >= depSize)
				{
					/* grow the work array as needed */
					depSize *= 2;
					deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depSize);
				}
				sscanf(tmp, "%d", &deps[depIdx]);
				free(tmp);
				depIdx++;
			}

			if (depIdx > 0)		/* We have a non-null entry */
			{
				/* shrink the array to the size actually used */
				deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depIdx);
				te->dependencies = deps;
				te->nDeps = depIdx;
			}
			else
			{
				free(deps);
				te->dependencies = NULL;
				te->nDeps = 0;
			}
		}
		else
		{
			te->dependencies = NULL;
			te->nDeps = 0;
		}
		te->dataLength = 0;

		/* Let the format module read its own per-entry data */
		if (AH->ReadExtraTocPtr)
			AH->ReadExtraTocPtr(AH, te);

		pg_log_debug("read TOC entry %d (ID %d) for %s %s",
					 i, te->dumpId, te->desc, te->tag);

		/* link completed entry into TOC circular list */
		te->prev = AH->toc->prev;
		AH->toc->prev->next = te;
		AH->toc->prev = te;
		te->next = AH->toc;

		/* special processing immediately upon read for some items */
		if (strcmp(te->desc, "ENCODING") == 0)
			processEncodingEntry(AH, te);
		else if (strcmp(te->desc, "STDSTRINGS") == 0)
			processStdStringsEntry(AH, te);
		else if (strcmp(te->desc, "SEARCHPATH") == 0)
			processSearchPathEntry(AH, te);
	}
}
2816 :
2817 : static void
2818 80 : processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
2819 : {
2820 : /* te->defn should have the form SET client_encoding = 'foo'; */
2821 80 : char *defn = pg_strdup(te->defn);
2822 : char *ptr1;
2823 80 : char *ptr2 = NULL;
2824 : int encoding;
2825 :
2826 80 : ptr1 = strchr(defn, '\'');
2827 80 : if (ptr1)
2828 80 : ptr2 = strchr(++ptr1, '\'');
2829 80 : if (ptr2)
2830 : {
2831 80 : *ptr2 = '\0';
2832 80 : encoding = pg_char_to_encoding(ptr1);
2833 80 : if (encoding < 0)
2834 0 : pg_fatal("unrecognized encoding \"%s\"",
2835 : ptr1);
2836 80 : AH->public.encoding = encoding;
2837 80 : setFmtEncoding(encoding);
2838 : }
2839 : else
2840 0 : pg_fatal("invalid ENCODING item: %s",
2841 : te->defn);
2842 :
2843 80 : free(defn);
2844 80 : }
2845 :
2846 : static void
2847 80 : processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
2848 : {
2849 : /* te->defn should have the form SET standard_conforming_strings = 'x'; */
2850 : char *ptr1;
2851 :
2852 80 : ptr1 = strchr(te->defn, '\'');
2853 80 : if (ptr1 && strncmp(ptr1, "'on'", 4) == 0)
2854 80 : AH->public.std_strings = true;
2855 0 : else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0)
2856 0 : AH->public.std_strings = false;
2857 : else
2858 0 : pg_fatal("invalid STDSTRINGS item: %s",
2859 : te->defn);
2860 80 : }
2861 :
/*
 * Apply a SEARCHPATH TOC entry.
 */
static void
processSearchPathEntry(ArchiveHandle *AH, TocEntry *te)
{
	/*
	 * te->defn should contain a command to set search_path. We just copy it
	 * verbatim for use later.
	 */
	AH->public.searchpath = pg_strdup(te->defn);
}
2871 :
2872 : static void
2873 0 : StrictNamesCheck(RestoreOptions *ropt)
2874 : {
2875 : const char *missing_name;
2876 :
2877 : Assert(ropt->strict_names);
2878 :
2879 0 : if (ropt->schemaNames.head != NULL)
2880 : {
2881 0 : missing_name = simple_string_list_not_touched(&ropt->schemaNames);
2882 0 : if (missing_name != NULL)
2883 0 : pg_fatal("schema \"%s\" not found", missing_name);
2884 : }
2885 :
2886 0 : if (ropt->tableNames.head != NULL)
2887 : {
2888 0 : missing_name = simple_string_list_not_touched(&ropt->tableNames);
2889 0 : if (missing_name != NULL)
2890 0 : pg_fatal("table \"%s\" not found", missing_name);
2891 : }
2892 :
2893 0 : if (ropt->indexNames.head != NULL)
2894 : {
2895 0 : missing_name = simple_string_list_not_touched(&ropt->indexNames);
2896 0 : if (missing_name != NULL)
2897 0 : pg_fatal("index \"%s\" not found", missing_name);
2898 : }
2899 :
2900 0 : if (ropt->functionNames.head != NULL)
2901 : {
2902 0 : missing_name = simple_string_list_not_touched(&ropt->functionNames);
2903 0 : if (missing_name != NULL)
2904 0 : pg_fatal("function \"%s\" not found", missing_name);
2905 : }
2906 :
2907 0 : if (ropt->triggerNames.head != NULL)
2908 : {
2909 0 : missing_name = simple_string_list_not_touched(&ropt->triggerNames);
2910 0 : if (missing_name != NULL)
2911 0 : pg_fatal("trigger \"%s\" not found", missing_name);
2912 : }
2913 0 : }
2914 :
2915 : /*
2916 : * Determine whether we want to restore this TOC entry.
2917 : *
2918 : * Returns 0 if entry should be skipped, or some combination of the
2919 : * REQ_SCHEMA, REQ_DATA, and REQ_STATS bits if we want to restore schema, data
2920 : * and/or statistics portions of this TOC entry, or REQ_SPECIAL if it's a
2921 : * special entry.
2922 : */
2923 : static int
2924 101800 : _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH)
2925 : {
2926 101800 : int res = REQ_SCHEMA | REQ_DATA;
2927 101800 : RestoreOptions *ropt = AH->public.ropt;
2928 :
2929 : /* These items are treated specially */
2930 101800 : if (strcmp(te->desc, "ENCODING") == 0 ||
2931 101400 : strcmp(te->desc, "STDSTRINGS") == 0 ||
2932 101000 : strcmp(te->desc, "SEARCHPATH") == 0)
2933 1200 : return REQ_SPECIAL;
2934 :
2935 100600 : if (strcmp(te->desc, "STATISTICS DATA") == 0)
2936 : {
2937 15718 : if (!ropt->dumpStatistics)
2938 0 : return 0;
2939 : else
2940 15718 : res = REQ_STATS;
2941 : }
2942 :
2943 : /*
2944 : * DATABASE and DATABASE PROPERTIES also have a special rule: they are
2945 : * restored in createDB mode, and not restored otherwise, independently of
2946 : * all else.
2947 : */
2948 100600 : if (strcmp(te->desc, "DATABASE") == 0 ||
2949 100396 : strcmp(te->desc, "DATABASE PROPERTIES") == 0)
2950 : {
2951 268 : if (ropt->createDB)
2952 212 : return REQ_SCHEMA;
2953 : else
2954 56 : return 0;
2955 : }
2956 :
2957 : /*
2958 : * Process exclusions that affect certain classes of TOC entries.
2959 : */
2960 :
2961 : /* If it's an ACL, maybe ignore it */
2962 100332 : if (ropt->aclsSkip && _tocEntryIsACL(te))
2963 0 : return 0;
2964 :
2965 : /* If it's a comment, maybe ignore it */
2966 100332 : if (ropt->no_comments && strcmp(te->desc, "COMMENT") == 0)
2967 0 : return 0;
2968 :
2969 : /*
2970 : * If it's a publication or a table part of a publication, maybe ignore
2971 : * it.
2972 : */
2973 100332 : if (ropt->no_publications &&
2974 0 : (strcmp(te->desc, "PUBLICATION") == 0 ||
2975 0 : strcmp(te->desc, "PUBLICATION TABLE") == 0 ||
2976 0 : strcmp(te->desc, "PUBLICATION TABLES IN SCHEMA") == 0))
2977 0 : return 0;
2978 :
2979 : /* If it's a security label, maybe ignore it */
2980 100332 : if (ropt->no_security_labels && strcmp(te->desc, "SECURITY LABEL") == 0)
2981 0 : return 0;
2982 :
2983 : /* If it's a subscription, maybe ignore it */
2984 100332 : if (ropt->no_subscriptions && strcmp(te->desc, "SUBSCRIPTION") == 0)
2985 0 : return 0;
2986 :
2987 : /* If it's statistics and we don't want statistics, maybe ignore it */
2988 100332 : if (!ropt->dumpStatistics && strcmp(te->desc, "STATISTICS DATA") == 0)
2989 0 : return 0;
2990 :
2991 : /* Ignore it if section is not to be dumped/restored */
2992 100332 : switch (curSection)
2993 : {
2994 60024 : case SECTION_PRE_DATA:
2995 60024 : if (!(ropt->dumpSections & DUMP_PRE_DATA))
2996 708 : return 0;
2997 59316 : break;
2998 20912 : case SECTION_DATA:
2999 20912 : if (!(ropt->dumpSections & DUMP_DATA))
3000 356 : return 0;
3001 20556 : break;
3002 19396 : case SECTION_POST_DATA:
3003 19396 : if (!(ropt->dumpSections & DUMP_POST_DATA))
3004 408 : return 0;
3005 18988 : break;
3006 0 : default:
3007 : /* shouldn't get here, really, but ignore it */
3008 0 : return 0;
3009 : }
3010 :
3011 : /* Ignore it if rejected by idWanted[] (cf. SortTocFromFile) */
3012 98860 : if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1])
3013 0 : return 0;
3014 :
3015 : /*
3016 : * Check options for selective dump/restore.
3017 : */
3018 98860 : if (strcmp(te->desc, "ACL") == 0 ||
3019 93904 : strcmp(te->desc, "COMMENT") == 0 ||
3020 80638 : strcmp(te->desc, "STATISTICS DATA") == 0 ||
3021 65188 : strcmp(te->desc, "SECURITY LABEL") == 0)
3022 : {
3023 : /* Database properties react to createDB, not selectivity options. */
3024 33672 : if (strncmp(te->tag, "DATABASE ", 9) == 0)
3025 : {
3026 130 : if (!ropt->createDB)
3027 38 : return 0;
3028 : }
3029 33542 : else if (ropt->schemaNames.head != NULL ||
3030 33530 : ropt->schemaExcludeNames.head != NULL ||
3031 33518 : ropt->selTypes)
3032 : {
3033 : /*
3034 : * In a selective dump/restore, we want to restore these dependent
3035 : * TOC entry types only if their parent object is being restored.
3036 : * Without selectivity options, we let through everything in the
3037 : * archive. Note there may be such entries with no parent, eg
3038 : * non-default ACLs for built-in objects. Also, we make
3039 : * per-column ACLs additionally depend on the table's ACL if any
3040 : * to ensure correct restore order, so those dependencies should
3041 : * be ignored in this check.
3042 : *
3043 : * This code depends on the parent having been marked already,
3044 : * which should be the case; if it isn't, perhaps due to
3045 : * SortTocFromFile rearrangement, skipping the dependent entry
3046 : * seems prudent anyway.
3047 : *
3048 : * Ideally we'd handle, eg, table CHECK constraints this way too.
3049 : * But it's hard to tell which of their dependencies is the one to
3050 : * consult.
3051 : */
3052 76 : bool dumpthis = false;
3053 :
3054 196 : for (int i = 0; i < te->nDeps; i++)
3055 : {
3056 136 : TocEntry *pte = getTocEntryByDumpId(AH, te->dependencies[i]);
3057 :
3058 136 : if (!pte)
3059 60 : continue; /* probably shouldn't happen */
3060 76 : if (strcmp(pte->desc, "ACL") == 0)
3061 0 : continue; /* ignore dependency on another ACL */
3062 76 : if (pte->reqs == 0)
3063 60 : continue; /* this object isn't marked, so ignore it */
3064 : /* Found a parent to be dumped, so we want to dump this too */
3065 16 : dumpthis = true;
3066 16 : break;
3067 : }
3068 76 : if (!dumpthis)
3069 60 : return 0;
3070 : }
3071 : }
3072 : else
3073 : {
3074 : /* Apply selective-restore rules for standalone TOC entries. */
3075 65188 : if (ropt->schemaNames.head != NULL)
3076 : {
3077 : /* If no namespace is specified, it means all. */
3078 40 : if (!te->namespace)
3079 4 : return 0;
3080 36 : if (!simple_string_list_member(&ropt->schemaNames, te->namespace))
3081 28 : return 0;
3082 : }
3083 :
3084 65156 : if (ropt->schemaExcludeNames.head != NULL &&
3085 76 : te->namespace &&
3086 36 : simple_string_list_member(&ropt->schemaExcludeNames, te->namespace))
3087 8 : return 0;
3088 :
3089 65148 : if (ropt->selTypes)
3090 : {
3091 156 : if (strcmp(te->desc, "TABLE") == 0 ||
3092 116 : strcmp(te->desc, "TABLE DATA") == 0 ||
3093 76 : strcmp(te->desc, "VIEW") == 0 ||
3094 76 : strcmp(te->desc, "FOREIGN TABLE") == 0 ||
3095 76 : strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
3096 76 : strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0 ||
3097 76 : strcmp(te->desc, "SEQUENCE") == 0 ||
3098 70 : strcmp(te->desc, "SEQUENCE SET") == 0)
3099 : {
3100 92 : if (!ropt->selTable)
3101 60 : return 0;
3102 32 : if (ropt->tableNames.head != NULL &&
3103 32 : !simple_string_list_member(&ropt->tableNames, te->tag))
3104 28 : return 0;
3105 : }
3106 64 : else if (strcmp(te->desc, "INDEX") == 0)
3107 : {
3108 12 : if (!ropt->selIndex)
3109 8 : return 0;
3110 4 : if (ropt->indexNames.head != NULL &&
3111 4 : !simple_string_list_member(&ropt->indexNames, te->tag))
3112 2 : return 0;
3113 : }
3114 52 : else if (strcmp(te->desc, "FUNCTION") == 0 ||
3115 28 : strcmp(te->desc, "AGGREGATE") == 0 ||
3116 28 : strcmp(te->desc, "PROCEDURE") == 0)
3117 : {
3118 24 : if (!ropt->selFunction)
3119 8 : return 0;
3120 16 : if (ropt->functionNames.head != NULL &&
3121 16 : !simple_string_list_member(&ropt->functionNames, te->tag))
3122 12 : return 0;
3123 : }
3124 28 : else if (strcmp(te->desc, "TRIGGER") == 0)
3125 : {
3126 12 : if (!ropt->selTrigger)
3127 8 : return 0;
3128 4 : if (ropt->triggerNames.head != NULL &&
3129 4 : !simple_string_list_member(&ropt->triggerNames, te->tag))
3130 2 : return 0;
3131 : }
3132 : else
3133 16 : return 0;
3134 : }
3135 : }
3136 :
3137 :
3138 : /*
3139 : * Determine whether the TOC entry contains schema and/or data components,
3140 : * and mask off inapplicable REQ bits. If it had a dataDumper, assume
3141 : * it's both schema and data. Otherwise it's probably schema-only, but
3142 : * there are exceptions.
3143 : */
3144 98578 : if (!te->hadDumper)
3145 : {
3146 : /*
3147 : * Special Case: If 'SEQUENCE SET' or anything to do with LOs, then it
3148 : * is considered a data entry. We don't need to check for BLOBS or
3149 : * old-style BLOB COMMENTS entries, because they will have hadDumper =
3150 : * true ... but we do need to check new-style BLOB ACLs, comments,
3151 : * etc.
3152 : */
3153 89866 : if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
3154 88898 : strcmp(te->desc, "BLOB") == 0 ||
3155 88898 : strcmp(te->desc, "BLOB METADATA") == 0 ||
3156 88698 : (strcmp(te->desc, "ACL") == 0 &&
3157 4956 : strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3158 88606 : (strcmp(te->desc, "COMMENT") == 0 &&
3159 13228 : strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3160 88488 : (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3161 0 : strncmp(te->tag, "LARGE OBJECT", 12) == 0))
3162 1378 : res = res & REQ_DATA;
3163 : else
3164 88488 : res = res & ~REQ_DATA;
3165 : }
3166 :
3167 : /*
3168 : * If there's no definition command, there's no schema component. Treat
3169 : * "load via partition root" comments as not schema.
3170 : */
3171 98578 : if (!te->defn || !te->defn[0] ||
3172 89890 : strncmp(te->defn, "-- load via partition root ", 27) == 0)
3173 8712 : res = res & ~REQ_SCHEMA;
3174 :
3175 : /*
3176 : * Special case: <Init> type with <Max OID> tag; this is obsolete and we
3177 : * always ignore it.
3178 : */
3179 98578 : if ((strcmp(te->desc, "<Init>") == 0) && (strcmp(te->tag, "Max OID") == 0))
3180 0 : return 0;
3181 :
3182 : /* Mask it if we don't want data */
3183 98578 : if (!ropt->dumpData)
3184 : {
3185 : /*
3186 : * The sequence_data option overrides dumpData for SEQUENCE SET.
3187 : *
3188 : * In binary-upgrade mode, even with dumpData unset, we do not mask
3189 : * out large objects. (Only large object definitions, comments and
3190 : * other metadata should be generated in binary-upgrade mode, not the
3191 : * actual data, but that need not concern us here.)
3192 : */
3193 7610 : if (!(ropt->sequence_data && strcmp(te->desc, "SEQUENCE SET") == 0) &&
3194 7488 : !(ropt->binary_upgrade &&
3195 6734 : (strcmp(te->desc, "BLOB") == 0 ||
3196 6734 : strcmp(te->desc, "BLOB METADATA") == 0 ||
3197 6728 : (strcmp(te->desc, "ACL") == 0 &&
3198 168 : strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3199 6726 : (strcmp(te->desc, "COMMENT") == 0 &&
3200 116 : strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3201 6720 : (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3202 0 : strncmp(te->tag, "LARGE OBJECT", 12) == 0))))
3203 7474 : res = res & (REQ_SCHEMA | REQ_STATS);
3204 : }
3205 :
3206 : /* Mask it if we don't want schema */
3207 98578 : if (!ropt->dumpSchema)
3208 776 : res = res & (REQ_DATA | REQ_STATS);
3209 :
3210 98578 : return res;
3211 : }
3212 :
3213 : /*
3214 : * Identify which pass we should restore this TOC entry in.
3215 : *
3216 : * See notes with the RestorePass typedef in pg_backup_archiver.h.
3217 : */
3218 : static RestorePass
3219 218344 : _tocEntryRestorePass(TocEntry *te)
3220 : {
3221 : /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3222 218344 : if (strcmp(te->desc, "ACL") == 0 ||
3223 207854 : strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3224 207854 : strcmp(te->desc, "DEFAULT ACL") == 0)
3225 11246 : return RESTORE_PASS_ACL;
3226 207098 : if (strcmp(te->desc, "EVENT TRIGGER") == 0 ||
3227 206878 : strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0)
3228 1884 : return RESTORE_PASS_POST_ACL;
3229 :
3230 : /*
3231 : * Comments need to be emitted in the same pass as their parent objects.
3232 : * ACLs haven't got comments, and neither do matview data objects, but
3233 : * event triggers do. (Fortunately, event triggers haven't got ACLs, or
3234 : * we'd need yet another weird special case.)
3235 : */
3236 205214 : if (strcmp(te->desc, "COMMENT") == 0 &&
3237 26606 : strncmp(te->tag, "EVENT TRIGGER ", 14) == 0)
3238 0 : return RESTORE_PASS_POST_ACL;
3239 :
3240 : /* All else can be handled in the main pass. */
3241 205214 : return RESTORE_PASS_MAIN;
3242 : }
3243 :
3244 : /*
3245 : * Identify TOC entries that are ACLs.
3246 : *
3247 : * Note: it seems worth duplicating some code here to avoid a hard-wired
3248 : * assumption that these are exactly the same entries that we restore during
3249 : * the RESTORE_PASS_ACL phase.
3250 : */
3251 : static bool
3252 84236 : _tocEntryIsACL(TocEntry *te)
3253 : {
3254 : /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3255 84236 : if (strcmp(te->desc, "ACL") == 0 ||
3256 80588 : strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3257 80588 : strcmp(te->desc, "DEFAULT ACL") == 0)
3258 3900 : return true;
3259 80336 : return false;
3260 : }
3261 :
/*
 * Issue SET commands for parameters that we want to have set the same way
 * at all times during execution of a restore script.
 *
 * Output goes through ahprintf, so these commands land either in the SQL
 * script being written or on the live restore connection, as appropriate.
 * Note that ropt may be NULL in some code paths, hence the guards below.
 */
static void
_doSetFixedOutputState(ArchiveHandle *AH)
{
	RestoreOptions *ropt = AH->public.ropt;

	/*
	 * Disable timeouts to allow for slow commands, idle parallel workers, etc
	 */
	ahprintf(AH, "SET statement_timeout = 0;\n");
	ahprintf(AH, "SET lock_timeout = 0;\n");
	ahprintf(AH, "SET idle_in_transaction_session_timeout = 0;\n");
	ahprintf(AH, "SET transaction_timeout = 0;\n");

	/* Select the correct character set encoding */
	ahprintf(AH, "SET client_encoding = '%s';\n",
			 pg_encoding_to_char(AH->public.encoding));

	/* Select the correct string literal syntax */
	ahprintf(AH, "SET standard_conforming_strings = %s;\n",
			 AH->public.std_strings ? "on" : "off");

	/* Select the role to be used during restore */
	if (ropt && ropt->use_role)
		ahprintf(AH, "SET ROLE %s;\n", fmtId(ropt->use_role));

	/* Select the dump-time search_path */
	if (AH->public.searchpath)
		ahprintf(AH, "%s", AH->public.searchpath);

	/* Make sure function checking is disabled */
	ahprintf(AH, "SET check_function_bodies = false;\n");

	/* Ensure that all valid XML data will be accepted */
	ahprintf(AH, "SET xmloption = content;\n");

	/* Avoid annoying notices etc */
	ahprintf(AH, "SET client_min_messages = warning;\n");
	if (!AH->public.std_strings)
		ahprintf(AH, "SET escape_string_warning = off;\n");

	/* Adjust row-security state */
	if (ropt && ropt->enable_row_security)
		ahprintf(AH, "SET row_security = on;\n");
	else
		ahprintf(AH, "SET row_security = off;\n");

	/*
	 * In --transaction-size mode, we should always be in a transaction when
	 * we begin to restore objects.  With a live connection we start it
	 * directly; otherwise emit BEGIN into the script.  Either way, reset the
	 * per-transaction action counter.
	 */
	if (ropt && ropt->txn_size > 0)
	{
		if (AH->connection)
			StartTransaction(&AH->public);
		else
			ahprintf(AH, "\nBEGIN;\n");
		AH->txnCount = 0;
	}

	ahprintf(AH, "\n");
}
3327 :
3328 : /*
3329 : * Issue a SET SESSION AUTHORIZATION command. Caller is responsible
3330 : * for updating state if appropriate. If user is NULL or an empty string,
3331 : * the specification DEFAULT will be used.
3332 : */
3333 : static void
3334 2 : _doSetSessionAuth(ArchiveHandle *AH, const char *user)
3335 : {
3336 2 : PQExpBuffer cmd = createPQExpBuffer();
3337 :
3338 2 : appendPQExpBufferStr(cmd, "SET SESSION AUTHORIZATION ");
3339 :
3340 : /*
3341 : * SQL requires a string literal here. Might as well be correct.
3342 : */
3343 2 : if (user && *user)
3344 2 : appendStringLiteralAHX(cmd, user, AH);
3345 : else
3346 0 : appendPQExpBufferStr(cmd, "DEFAULT");
3347 2 : appendPQExpBufferChar(cmd, ';');
3348 :
3349 2 : if (RestoringToDB(AH))
3350 : {
3351 : PGresult *res;
3352 :
3353 0 : res = PQexec(AH->connection, cmd->data);
3354 :
3355 0 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3356 : /* NOT warn_or_exit_horribly... use -O instead to skip this. */
3357 0 : pg_fatal("could not set session user to \"%s\": %s",
3358 : user, PQerrorMessage(AH->connection));
3359 :
3360 0 : PQclear(res);
3361 : }
3362 : else
3363 2 : ahprintf(AH, "%s\n\n", cmd->data);
3364 :
3365 2 : destroyPQExpBuffer(cmd);
3366 2 : }
3367 :
3368 :
/*
 * Issue the commands to connect to the specified database.
 *
 * If we're currently restoring right into a database, this will
 * actually establish a connection.  Otherwise it puts a \connect into
 * the script output.
 *
 * Either way, all cached per-session state (user, schema, table AM,
 * tablespace) is forgotten, since the new session starts from defaults.
 */
static void
_reconnectToDB(ArchiveHandle *AH, const char *dbname)
{
	if (RestoringToDB(AH))
		ReconnectToServer(AH, dbname);
	else
	{
		PQExpBufferData connectbuf;

		/* emit a psql \connect command with the (escaped) database name */
		initPQExpBuffer(&connectbuf);
		appendPsqlMetaConnect(&connectbuf, dbname);
		ahprintf(AH, "%s\n", connectbuf.data);
		termPQExpBuffer(&connectbuf);
	}

	/*
	 * NOTE: currUser keeps track of what the imaginary session user in our
	 * script is.  It's now effectively reset to the original userID.
	 */
	free(AH->currUser);
	AH->currUser = NULL;

	/* don't assume we still know the output schema, tablespace, etc either */
	free(AH->currSchema);
	AH->currSchema = NULL;

	free(AH->currTableAm);
	AH->currTableAm = NULL;

	free(AH->currTablespace);
	AH->currTablespace = NULL;

	/* re-establish fixed state */
	_doSetFixedOutputState(AH);
}
3411 :
3412 : /*
3413 : * Become the specified user, and update state to avoid redundant commands
3414 : *
3415 : * NULL or empty argument is taken to mean restoring the session default
3416 : */
3417 : static void
3418 124 : _becomeUser(ArchiveHandle *AH, const char *user)
3419 : {
3420 124 : if (!user)
3421 0 : user = ""; /* avoid null pointers */
3422 :
3423 124 : if (AH->currUser && strcmp(AH->currUser, user) == 0)
3424 122 : return; /* no need to do anything */
3425 :
3426 2 : _doSetSessionAuth(AH, user);
3427 :
3428 : /*
3429 : * NOTE: currUser keeps track of what the imaginary session user in our
3430 : * script is
3431 : */
3432 2 : free(AH->currUser);
3433 2 : AH->currUser = pg_strdup(user);
3434 : }
3435 :
3436 : /*
3437 : * Become the owner of the given TOC entry object. If
3438 : * changes in ownership are not allowed, this doesn't do anything.
3439 : */
3440 : static void
3441 91982 : _becomeOwner(ArchiveHandle *AH, TocEntry *te)
3442 : {
3443 91982 : RestoreOptions *ropt = AH->public.ropt;
3444 :
3445 91982 : if (ropt && (ropt->noOwner || !ropt->use_setsessauth))
3446 91982 : return;
3447 :
3448 0 : _becomeUser(AH, te->owner);
3449 : }
3450 :
3451 :
/*
 * Issue the commands to select the specified schema as the current schema
 * in the target database.
 *
 * Maintains AH->currSchema so that redundant SET search_path commands are
 * suppressed.
 */
static void
_selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
{
	PQExpBuffer qry;

	/*
	 * If there was a SEARCHPATH TOC entry, we're supposed to just stay with
	 * that search_path rather than switching to entry-specific paths.
	 * Otherwise, it's an old archive that will not restore correctly unless
	 * we set the search_path as it's expecting.
	 */
	if (AH->public.searchpath)
		return;

	if (!schemaName || *schemaName == '\0' ||
		(AH->currSchema && strcmp(AH->currSchema, schemaName) == 0))
		return;					/* no need to do anything */

	qry = createPQExpBuffer();

	appendPQExpBuffer(qry, "SET search_path = %s",
					  fmtId(schemaName));
	/* keep pg_catalog reachable unless it's the schema being selected */
	if (strcmp(schemaName, "pg_catalog") != 0)
		appendPQExpBufferStr(qry, ", pg_catalog");

	if (RestoringToDB(AH))
	{
		PGresult   *res;

		res = PQexec(AH->connection, qry->data);

		if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
			warn_or_exit_horribly(AH,
								  "could not set \"search_path\" to \"%s\": %s",
								  schemaName, PQerrorMessage(AH->connection));

		PQclear(res);
	}
	else
		ahprintf(AH, "%s;\n\n", qry->data);

	/* remember the new current schema for the redundancy check above */
	free(AH->currSchema);
	AH->currSchema = pg_strdup(schemaName);

	destroyPQExpBuffer(qry);
}
3502 :
3503 : /*
3504 : * Issue the commands to select the specified tablespace as the current one
3505 : * in the target database.
3506 : */
3507 : static void
3508 83622 : _selectTablespace(ArchiveHandle *AH, const char *tablespace)
3509 : {
3510 83622 : RestoreOptions *ropt = AH->public.ropt;
3511 : PQExpBuffer qry;
3512 : const char *want,
3513 : *have;
3514 :
3515 : /* do nothing in --no-tablespaces mode */
3516 83622 : if (ropt->noTablespace)
3517 0 : return;
3518 :
3519 83622 : have = AH->currTablespace;
3520 83622 : want = tablespace;
3521 :
3522 : /* no need to do anything for non-tablespace object */
3523 83622 : if (!want)
3524 68060 : return;
3525 :
3526 15562 : if (have && strcmp(want, have) == 0)
3527 15168 : return; /* no need to do anything */
3528 :
3529 394 : qry = createPQExpBuffer();
3530 :
3531 394 : if (strcmp(want, "") == 0)
3532 : {
3533 : /* We want the tablespace to be the database's default */
3534 298 : appendPQExpBufferStr(qry, "SET default_tablespace = ''");
3535 : }
3536 : else
3537 : {
3538 : /* We want an explicit tablespace */
3539 96 : appendPQExpBuffer(qry, "SET default_tablespace = %s", fmtId(want));
3540 : }
3541 :
3542 394 : if (RestoringToDB(AH))
3543 : {
3544 : PGresult *res;
3545 :
3546 22 : res = PQexec(AH->connection, qry->data);
3547 :
3548 22 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3549 0 : warn_or_exit_horribly(AH,
3550 : "could not set \"default_tablespace\" to %s: %s",
3551 0 : fmtId(want), PQerrorMessage(AH->connection));
3552 :
3553 22 : PQclear(res);
3554 : }
3555 : else
3556 372 : ahprintf(AH, "%s;\n\n", qry->data);
3557 :
3558 394 : free(AH->currTablespace);
3559 394 : AH->currTablespace = pg_strdup(want);
3560 :
3561 394 : destroyPQExpBuffer(qry);
3562 : }
3563 :
3564 : /*
3565 : * Set the proper default_table_access_method value for the table.
3566 : */
3567 : static void
3568 82576 : _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam)
3569 : {
3570 82576 : RestoreOptions *ropt = AH->public.ropt;
3571 : PQExpBuffer cmd;
3572 : const char *want,
3573 : *have;
3574 :
3575 : /* do nothing in --no-table-access-method mode */
3576 82576 : if (ropt->noTableAm)
3577 690 : return;
3578 :
3579 81886 : have = AH->currTableAm;
3580 81886 : want = tableam;
3581 :
3582 81886 : if (!want)
3583 72362 : return;
3584 :
3585 9524 : if (have && strcmp(want, have) == 0)
3586 8912 : return;
3587 :
3588 612 : cmd = createPQExpBuffer();
3589 612 : appendPQExpBuffer(cmd, "SET default_table_access_method = %s;", fmtId(want));
3590 :
3591 612 : if (RestoringToDB(AH))
3592 : {
3593 : PGresult *res;
3594 :
3595 22 : res = PQexec(AH->connection, cmd->data);
3596 :
3597 22 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3598 0 : warn_or_exit_horribly(AH,
3599 : "could not set \"default_table_access_method\": %s",
3600 0 : PQerrorMessage(AH->connection));
3601 :
3602 22 : PQclear(res);
3603 : }
3604 : else
3605 590 : ahprintf(AH, "%s\n\n", cmd->data);
3606 :
3607 612 : destroyPQExpBuffer(cmd);
3608 :
3609 612 : free(AH->currTableAm);
3610 612 : AH->currTableAm = pg_strdup(want);
3611 : }
3612 :
3613 : /*
3614 : * Set the proper default table access method for a table without storage.
3615 : * Currently, this is required only for partitioned tables with a table AM.
3616 : */
3617 : static void
3618 1046 : _printTableAccessMethodNoStorage(ArchiveHandle *AH, TocEntry *te)
3619 : {
3620 1046 : RestoreOptions *ropt = AH->public.ropt;
3621 1046 : const char *tableam = te->tableam;
3622 : PQExpBuffer cmd;
3623 :
3624 : /* do nothing in --no-table-access-method mode */
3625 1046 : if (ropt->noTableAm)
3626 6 : return;
3627 :
3628 1040 : if (!tableam)
3629 976 : return;
3630 :
3631 : Assert(te->relkind == RELKIND_PARTITIONED_TABLE);
3632 :
3633 64 : cmd = createPQExpBuffer();
3634 :
3635 64 : appendPQExpBufferStr(cmd, "ALTER TABLE ");
3636 64 : appendPQExpBuffer(cmd, "%s ", fmtQualifiedId(te->namespace, te->tag));
3637 64 : appendPQExpBuffer(cmd, "SET ACCESS METHOD %s;",
3638 : fmtId(tableam));
3639 :
3640 64 : if (RestoringToDB(AH))
3641 : {
3642 : PGresult *res;
3643 :
3644 0 : res = PQexec(AH->connection, cmd->data);
3645 :
3646 0 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3647 0 : warn_or_exit_horribly(AH,
3648 : "could not alter table access method: %s",
3649 0 : PQerrorMessage(AH->connection));
3650 0 : PQclear(res);
3651 : }
3652 : else
3653 64 : ahprintf(AH, "%s\n\n", cmd->data);
3654 :
3655 64 : destroyPQExpBuffer(cmd);
3656 : }
3657 :
/*
 * Extract an object description for a TOC entry, and append it to buf.
 *
 * This is used for ALTER ... OWNER TO.
 *
 * If the object type has no owner, do nothing (buf is left untouched, which
 * is how the caller detects "no owner").
 */
static void
_getObjectDescription(PQExpBuffer buf, const TocEntry *te)
{
	const char *type = te->desc;

	/* objects that don't require special decoration */
	if (strcmp(type, "COLLATION") == 0 ||
		strcmp(type, "CONVERSION") == 0 ||
		strcmp(type, "DOMAIN") == 0 ||
		strcmp(type, "FOREIGN TABLE") == 0 ||
		strcmp(type, "MATERIALIZED VIEW") == 0 ||
		strcmp(type, "SEQUENCE") == 0 ||
		strcmp(type, "STATISTICS") == 0 ||
		strcmp(type, "TABLE") == 0 ||
		strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
		strcmp(type, "TEXT SEARCH CONFIGURATION") == 0 ||
		strcmp(type, "TYPE") == 0 ||
		strcmp(type, "VIEW") == 0 ||
	/* non-schema-specified objects */
		strcmp(type, "DATABASE") == 0 ||
		strcmp(type, "PROCEDURAL LANGUAGE") == 0 ||
		strcmp(type, "SCHEMA") == 0 ||
		strcmp(type, "EVENT TRIGGER") == 0 ||
		strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
		strcmp(type, "SERVER") == 0 ||
		strcmp(type, "PUBLICATION") == 0 ||
		strcmp(type, "SUBSCRIPTION") == 0)
	{
		/* emit "TYPE [schema.]name", quoting identifiers as needed */
		appendPQExpBuffer(buf, "%s ", type);
		if (te->namespace && *te->namespace)
			appendPQExpBuffer(buf, "%s.", fmtId(te->namespace));
		appendPQExpBufferStr(buf, fmtId(te->tag));
	}
	/* LOs just have a name, but it's numeric so must not use fmtId */
	else if (strcmp(type, "BLOB") == 0)
	{
		appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
	}

	/*
	 * These object types require additional decoration.  Fortunately, the
	 * information needed is exactly what's in the DROP command.
	 */
	else if (strcmp(type, "AGGREGATE") == 0 ||
			 strcmp(type, "FUNCTION") == 0 ||
			 strcmp(type, "OPERATOR") == 0 ||
			 strcmp(type, "OPERATOR CLASS") == 0 ||
			 strcmp(type, "OPERATOR FAMILY") == 0 ||
			 strcmp(type, "PROCEDURE") == 0)
	{
		/*
		 * Chop "DROP " off the front and make a modifiable copy.  (Assumes
		 * te->dropStmt begins with exactly "DROP " for these types.)
		 */
		char	   *first = pg_strdup(te->dropStmt + 5);
		char	   *last;

		/* point to last character in string */
		last = first + strlen(first) - 1;

		/* Strip off any ';' or '\n' at the end */
		while (last >= first && (*last == '\n' || *last == ';'))
			last--;
		*(last + 1) = '\0';

		appendPQExpBufferStr(buf, first);

		free(first);
		return;
	}
	/* these object types don't have separate owners */
	else if (strcmp(type, "CAST") == 0 ||
			 strcmp(type, "CHECK CONSTRAINT") == 0 ||
			 strcmp(type, "CONSTRAINT") == 0 ||
			 strcmp(type, "DATABASE PROPERTIES") == 0 ||
			 strcmp(type, "DEFAULT") == 0 ||
			 strcmp(type, "FK CONSTRAINT") == 0 ||
			 strcmp(type, "INDEX") == 0 ||
			 strcmp(type, "RULE") == 0 ||
			 strcmp(type, "TRIGGER") == 0 ||
			 strcmp(type, "ROW SECURITY") == 0 ||
			 strcmp(type, "POLICY") == 0 ||
			 strcmp(type, "USER MAPPING") == 0)
	{
		/* do nothing */
	}
	else
		pg_fatal("don't know how to set owner for object type \"%s\"", type);
}
3751 :
/*
 * Emit the SQL commands to create the object represented by a TOC entry
 *
 * This now also includes issuing an ALTER OWNER command to restore the
 * object's ownership, if wanted.  But note that the object's permissions
 * will remain at default, until the matching ACL TOC entry is restored.
 *
 * pfx is a prefix string inserted into the "-- Name:" header comment
 * (e.g. "Data for " / "Statistics for ", per the TOC_PREFIX_* macros).
 */
static void
_printTocEntry(ArchiveHandle *AH, TocEntry *te, const char *pfx)
{
	RestoreOptions *ropt = AH->public.ropt;

	/*
	 * Select owner, schema, tablespace and default AM as necessary.  The
	 * default access method for partitioned tables is handled after
	 * generating the object definition, as it requires an ALTER command
	 * rather than SET.
	 */
	_becomeOwner(AH, te);
	_selectOutputSchema(AH, te->namespace);
	_selectTablespace(AH, te->tablespace);
	if (te->relkind != RELKIND_PARTITIONED_TABLE)
		_selectTableAccessMethod(AH, te->tableam);

	/* Emit header comment for item */
	if (!AH->noTocComments)
	{
		char	   *sanitized_name;
		char	   *sanitized_schema;
		char	   *sanitized_owner;

		ahprintf(AH, "--\n");
		if (AH->public.verbose)
		{
			ahprintf(AH, "-- TOC entry %d (class %u OID %u)\n",
					 te->dumpId, te->catalogId.tableoid, te->catalogId.oid);
			if (te->nDeps > 0)
			{
				int			i;

				ahprintf(AH, "-- Dependencies:");
				for (i = 0; i < te->nDeps; i++)
					ahprintf(AH, " %d", te->dependencies[i]);
				ahprintf(AH, "\n");
			}
		}

		/* sanitize_line flattens newlines to keep the comment on one line */
		sanitized_name = sanitize_line(te->tag, false);
		sanitized_schema = sanitize_line(te->namespace, true);
		sanitized_owner = sanitize_line(ropt->noOwner ? NULL : te->owner, true);

		ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s",
				 pfx, sanitized_name, te->desc, sanitized_schema,
				 sanitized_owner);

		free(sanitized_name);
		free(sanitized_schema);
		free(sanitized_owner);

		if (te->tablespace && strlen(te->tablespace) > 0 && !ropt->noTablespace)
		{
			char	   *sanitized_tablespace;

			sanitized_tablespace = sanitize_line(te->tablespace, false);
			ahprintf(AH, "; Tablespace: %s", sanitized_tablespace);
			free(sanitized_tablespace);
		}
		ahprintf(AH, "\n");

		if (AH->PrintExtraTocPtr != NULL)
			AH->PrintExtraTocPtr(AH, te);
		ahprintf(AH, "--\n\n");
	}

	/*
	 * Actually print the definition.  Normally we can just print the defn
	 * string if any, but we have three special cases:
	 *
	 * 1. A crude hack for suppressing AUTHORIZATION clause that old pg_dump
	 * versions put into CREATE SCHEMA.  Don't mutate the variant for schema
	 * "public" that is a comment.  We have to do this when --no-owner mode is
	 * selected.  This is ugly, but I see no other good way ...
	 *
	 * 2. BLOB METADATA entries need special processing since their defn
	 * strings are just lists of OIDs, not complete SQL commands.
	 *
	 * 3. ACL LARGE OBJECTS entries need special processing because they
	 * contain only one copy of the ACL GRANT/REVOKE commands, which we must
	 * apply to each large object listed in the associated BLOB METADATA.
	 */
	if (ropt->noOwner &&
		strcmp(te->desc, "SCHEMA") == 0 && strncmp(te->defn, "--", 2) != 0)
	{
		ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
	}
	else if (strcmp(te->desc, "BLOB METADATA") == 0)
	{
		IssueCommandPerBlob(AH, te, "SELECT pg_catalog.lo_create('", "')");
	}
	else if (strcmp(te->desc, "ACL") == 0 &&
			 strncmp(te->tag, "LARGE OBJECTS", 13) == 0)
	{
		IssueACLPerBlob(AH, te);
	}
	else if (te->defn && strlen(te->defn) > 0)
	{
		ahprintf(AH, "%s\n\n", te->defn);

		/*
		 * If the defn string contains multiple SQL commands, txn_size mode
		 * should count it as N actions not one.  But rather than build a full
		 * SQL parser, approximate this by counting semicolons.  One case
		 * where that tends to be badly fooled is function definitions, so
		 * ignore them.  (restore_toc_entry will count one action anyway.)
		 */
		if (ropt->txn_size > 0 &&
			strcmp(te->desc, "FUNCTION") != 0 &&
			strcmp(te->desc, "PROCEDURE") != 0)
		{
			const char *p = te->defn;
			int			nsemis = 0;

			while ((p = strchr(p, ';')) != NULL)
			{
				nsemis++;
				p++;
			}
			if (nsemis > 1)
				AH->txnCount += nsemis - 1;
		}
	}

	/*
	 * If we aren't using SET SESSION AUTH to determine ownership, we must
	 * instead issue an ALTER OWNER command.  Schema "public" is special; when
	 * a dump emits a comment in lieu of creating it, we use ALTER OWNER even
	 * when using SET SESSION for all other objects.  We assume that anything
	 * without a DROP command is not a separately ownable object.
	 */
	if (!ropt->noOwner &&
		(!ropt->use_setsessauth ||
		 (strcmp(te->desc, "SCHEMA") == 0 &&
		  strncmp(te->defn, "--", 2) == 0)) &&
		te->owner && strlen(te->owner) > 0 &&
		te->dropStmt && strlen(te->dropStmt) > 0)
	{
		if (strcmp(te->desc, "BLOB METADATA") == 0)
		{
			/* BLOB METADATA needs special code to handle multiple LOs */
			char	   *cmdEnd = psprintf(" OWNER TO %s", fmtId(te->owner));

			IssueCommandPerBlob(AH, te, "ALTER LARGE OBJECT ", cmdEnd);
			pg_free(cmdEnd);
		}
		else
		{
			/* For all other cases, we can use _getObjectDescription */
			PQExpBufferData temp;

			initPQExpBuffer(&temp);
			_getObjectDescription(&temp, te);

			/*
			 * If _getObjectDescription() didn't fill the buffer, then there
			 * is no owner.
			 */
			if (temp.data[0])
				ahprintf(AH, "ALTER %s OWNER TO %s;\n\n",
						 temp.data, fmtId(te->owner));
			termPQExpBuffer(&temp);
		}
	}

	/*
	 * Select a partitioned table's default AM, once the table definition has
	 * been generated.
	 */
	if (te->relkind == RELKIND_PARTITIONED_TABLE)
		_printTableAccessMethodNoStorage(AH, te);

	/*
	 * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
	 * commands, so we can no longer assume we know the current auth setting.
	 */
	if (_tocEntryIsACL(te))
	{
		free(AH->currUser);
		AH->currUser = NULL;
	}
}
3942 :
/*
 * Sanitize a string to be included in an SQL comment or TOC listing, by
 * replacing any newlines with spaces.  This ensures each logical output line
 * is in fact one physical output line, to prevent corruption of the dump
 * (which could, in the worst case, present an SQL injection vulnerability
 * if someone were to incautiously load a dump containing objects with
 * maliciously crafted names).
 *
 * The result is a freshly malloc'd string.  If the input string is NULL,
 * return a malloc'ed empty string, unless want_hyphen, in which case return a
 * malloc'ed hyphen.
 *
 * Note that we currently don't bother to quote names, meaning that the name
 * fields aren't automatically parseable.  "pg_restore -L" doesn't care because
 * it only examines the dumpId field, but someday we might want to try harder.
 */
static char *
sanitize_line(const char *str, bool want_hyphen)
{
	char	   *copy;
	char	   *p;

	if (str == NULL)
		return pg_strdup(want_hyphen ? "-" : "");

	copy = pg_strdup(str);

	/* flatten CR and LF to spaces, in place */
	for (p = copy; *p != '\0'; p++)
	{
		if (*p == '\n' || *p == '\r')
			*p = ' ';
	}

	return copy;
}
3978 :
/*
 * Write the file header for a custom-format archive.
 *
 * The byte/field order written here is the on-disk header format; it must
 * stay in exact agreement with ReadHead(), which reads the same sequence.
 */
void
WriteHead(ArchiveHandle *AH)
{
	struct tm crtm;

	AH->WriteBufPtr(AH, "PGDMP", 5); /* Magic code */
	/* Archive format version triple (major, minor, revision) */
	AH->WriteBytePtr(AH, ARCHIVE_MAJOR(AH->version));
	AH->WriteBytePtr(AH, ARCHIVE_MINOR(AH->version));
	AH->WriteBytePtr(AH, ARCHIVE_REV(AH->version));
	/* Sizes of integers and file offsets used within the archive */
	AH->WriteBytePtr(AH, AH->intSize);
	AH->WriteBytePtr(AH, AH->offSize);
	/* Archive format code and compression algorithm */
	AH->WriteBytePtr(AH, AH->format);
	AH->WriteBytePtr(AH, AH->compression_spec.algorithm);
	/*
	 * Creation timestamp, stored as the broken-down struct tm fields of
	 * local time.  (ReadHead reconstructs it with mktime(); see the notes
	 * there about timezone-dependent round-trip issues.)
	 */
	crtm = *localtime(&AH->createDate);
	WriteInt(AH, crtm.tm_sec);
	WriteInt(AH, crtm.tm_min);
	WriteInt(AH, crtm.tm_hour);
	WriteInt(AH, crtm.tm_mday);
	WriteInt(AH, crtm.tm_mon);
	WriteInt(AH, crtm.tm_year);
	WriteInt(AH, crtm.tm_isdst);
	/* Source database name, server version, and pg_dump version */
	WriteStr(AH, PQdb(AH->connection));
	WriteStr(AH, AH->public.remoteVersionStr);
	WriteStr(AH, PG_VERSION);
}
4007 :
/*
 * Read and validate the file header of a custom-format archive.
 *
 * This must read fields in exactly the order WriteHead() writes them,
 * applying version gates for fields that older archive versions lack.
 * Fatal errors are raised for a bad magic string, unsupported version,
 * insane integer size, or a format byte that doesn't match what the
 * caller set up in AH->format.
 */
void
ReadHead(ArchiveHandle *AH)
{
	char *errmsg;
	char vmaj,
		vmin,
		vrev;
	int fmt;

	/*
	 * If we haven't already read the header, do so.
	 *
	 * NB: this code must agree with _discoverArchiveFormat().  Maybe find a
	 * way to unify the cases?
	 */
	if (!AH->readHeader)
	{
		char tmpMag[7];

		AH->ReadBufPtr(AH, tmpMag, 5);

		if (strncmp(tmpMag, "PGDMP", 5) != 0)
			pg_fatal("did not find magic string in file header");
	}

	vmaj = AH->ReadBytePtr(AH);
	vmin = AH->ReadBytePtr(AH);

	/* The revision byte only exists in archives newer than version 1.0. */
	if (vmaj > 1 || (vmaj == 1 && vmin > 0)) /* Version > 1.0 */
		vrev = AH->ReadBytePtr(AH);
	else
		vrev = 0;

	AH->version = MAKE_ARCHIVE_VERSION(vmaj, vmin, vrev);

	if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
		pg_fatal("unsupported version (%d.%d) in file header",
				 vmaj, vmin);

	AH->intSize = AH->ReadBytePtr(AH);
	if (AH->intSize > 32)
		pg_fatal("sanity check on integer size (%lu) failed",
				 (unsigned long) AH->intSize);

	if (AH->intSize > sizeof(int))
		pg_log_warning("archive was made on a machine with larger integers, some operations might fail");

	/* Offsets got their own size byte in 1.7; before that they were ints. */
	if (AH->version >= K_VERS_1_7)
		AH->offSize = AH->ReadBytePtr(AH);
	else
		AH->offSize = AH->intSize;

	fmt = AH->ReadBytePtr(AH);

	if (AH->format != fmt)
		pg_fatal("expected format (%d) differs from format found in file (%d)",
				 AH->format, fmt);

	/*
	 * Compression: 1.15+ stores the algorithm directly; older versions only
	 * stored a level, from which we infer gzip if nonzero.
	 */
	if (AH->version >= K_VERS_1_15)
		AH->compression_spec.algorithm = AH->ReadBytePtr(AH);
	else if (AH->version >= K_VERS_1_2)
	{
		/* Guess the compression method based on the level */
		if (AH->version < K_VERS_1_4)
			AH->compression_spec.level = AH->ReadBytePtr(AH);
		else
			AH->compression_spec.level = ReadInt(AH);

		if (AH->compression_spec.level != 0)
			AH->compression_spec.algorithm = PG_COMPRESSION_GZIP;
	}
	else
		AH->compression_spec.algorithm = PG_COMPRESSION_GZIP;

	/* Warn (not fail) if this build can't decompress the archive's data. */
	errmsg = supports_compression(AH->compression_spec);
	if (errmsg)
	{
		pg_log_warning("archive is compressed, but this installation does not support compression (%s) -- no data will be available",
					   errmsg);
		pg_free(errmsg);
	}

	if (AH->version >= K_VERS_1_4)
	{
		struct tm crtm;

		crtm.tm_sec = ReadInt(AH);
		crtm.tm_min = ReadInt(AH);
		crtm.tm_hour = ReadInt(AH);
		crtm.tm_mday = ReadInt(AH);
		crtm.tm_mon = ReadInt(AH);
		crtm.tm_year = ReadInt(AH);
		crtm.tm_isdst = ReadInt(AH);

		/*
		 * Newer versions of glibc have mktime() report failure if tm_isdst is
		 * inconsistent with the prevailing timezone, e.g. tm_isdst = 1 when
		 * TZ=UTC.  This is problematic when restoring an archive under a
		 * different timezone setting.  If we get a failure, try again with
		 * tm_isdst set to -1 ("don't know").
		 *
		 * XXX with or without this hack, we reconstruct createDate
		 * incorrectly when the prevailing timezone is different from
		 * pg_dump's.  Next time we bump the archive version, we should flush
		 * this representation and store a plain seconds-since-the-Epoch
		 * timestamp instead.
		 */
		AH->createDate = mktime(&crtm);
		if (AH->createDate == (time_t) -1)
		{
			crtm.tm_isdst = -1;
			AH->createDate = mktime(&crtm);
			if (AH->createDate == (time_t) -1)
				pg_log_warning("invalid creation date in header");
		}
	}

	/* Source database name (added in 1.4) */
	if (AH->version >= K_VERS_1_4)
	{
		AH->archdbname = ReadStr(AH);
	}

	/* Server and pg_dump version strings (added in 1.10) */
	if (AH->version >= K_VERS_1_10)
	{
		AH->archiveRemoteVersion = ReadStr(AH);
		AH->archiveDumpVersion = ReadStr(AH);
	}
}
4136 :
4137 :
4138 : /*
4139 : * checkSeek
4140 : * check to see if ftell/fseek can be performed.
4141 : */
4142 : bool
4143 104 : checkSeek(FILE *fp)
4144 : {
4145 : pgoff_t tpos;
4146 :
4147 : /* Check that ftello works on this file */
4148 104 : tpos = ftello(fp);
4149 104 : if (tpos < 0)
4150 2 : return false;
4151 :
4152 : /*
4153 : * Check that fseeko(SEEK_SET) works, too. NB: we used to try to test
4154 : * this with fseeko(fp, 0, SEEK_CUR). But some platforms treat that as a
4155 : * successful no-op even on files that are otherwise unseekable.
4156 : */
4157 102 : if (fseeko(fp, tpos, SEEK_SET) != 0)
4158 0 : return false;
4159 :
4160 102 : return true;
4161 : }
4162 :
4163 :
4164 : /*
4165 : * dumpTimestamp
4166 : */
4167 : static void
4168 92 : dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
4169 : {
4170 : char buf[64];
4171 :
4172 92 : if (strftime(buf, sizeof(buf), PGDUMP_STRFTIME_FMT, localtime(&tim)) != 0)
4173 92 : ahprintf(AH, "-- %s %s\n\n", msg, buf);
4174 92 : }
4175 :
/*
 * Main engine for parallel restore.
 *
 * Parallel restore is done in three phases.  In this first phase,
 * we'll process all SECTION_PRE_DATA TOC entries that are allowed to be
 * processed in the RESTORE_PASS_MAIN pass.  (In practice, that's all
 * PRE_DATA items other than ACLs.)  Entries we can't process now are
 * added to the pending_list for later phases to deal with.
 *
 * On exit, the database connection has been closed (so that workers can
 * take all the available connection slots), and transient connection
 * state cached in AH has been cleared.
 */
static void
restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
{
	bool skipped_some;
	TocEntry *next_work_item;

	pg_log_debug("entering restore_toc_entries_prefork");

	/* Adjust dependency information */
	fix_dependencies(AH);

	/*
	 * Do all the early stuff in a single connection in the parent.  There's no
	 * great point in running it in parallel, in fact it will actually run
	 * faster in a single connection because we avoid all the connection and
	 * setup overhead.  Also, pre-9.2 pg_dump versions were not very good
	 * about showing all the dependencies of SECTION_PRE_DATA items, so we do
	 * not risk trying to process them out-of-order.
	 *
	 * Stuff that we can't do immediately gets added to the pending_list.
	 * Note: we don't yet filter out entries that aren't going to be restored.
	 * They might participate in dependency chains connecting entries that
	 * should be restored, so we treat them as live until we actually process
	 * them.
	 *
	 * Note: as of 9.2, it should be guaranteed that all PRE_DATA items appear
	 * before DATA items, and all DATA items before POST_DATA items.  That is
	 * not certain to be true in older archives, though, and in any case use
	 * of a list file would destroy that ordering (cf. SortTocFromFile).  So
	 * this loop cannot assume that it holds.
	 */
	AH->restorePass = RESTORE_PASS_MAIN;
	skipped_some = false;
	for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
	{
		/* do_now: restore this entry serially here, rather than deferring */
		bool do_now = true;

		if (next_work_item->section != SECTION_PRE_DATA)
		{
			/* DATA and POST_DATA items are just ignored for now */
			if (next_work_item->section == SECTION_DATA ||
				next_work_item->section == SECTION_POST_DATA)
			{
				do_now = false;
				skipped_some = true;
			}
			else
			{
				/*
				 * SECTION_NONE items, such as comments, can be processed now
				 * if we are still in the PRE_DATA part of the archive.  Once
				 * we've skipped any items, we have to consider whether the
				 * comment's dependencies are satisfied, so skip it for now.
				 */
				if (skipped_some)
					do_now = false;
			}
		}

		/*
		 * Also skip items that need to be forced into later passes.  We need
		 * not set skipped_some in this case, since by assumption no main-pass
		 * items could depend on these.
		 */
		if (_tocEntryRestorePass(next_work_item) != RESTORE_PASS_MAIN)
			do_now = false;

		if (do_now)
		{
			/* OK, restore the item and update its dependencies */
			pg_log_info("processing item %d %s %s",
						next_work_item->dumpId,
						next_work_item->desc, next_work_item->tag);

			(void) restore_toc_entry(AH, next_work_item, false);

			/* Reduce dependencies, but don't move anything to ready_heap */
			reduce_dependencies(AH, next_work_item, NULL);
		}
		else
		{
			/* Nope, so add it to pending_list */
			pending_list_append(pending_list, next_work_item);
		}
	}

	/*
	 * In --transaction-size mode, we must commit the open transaction before
	 * dropping the database connection.  This also ensures that child workers
	 * can see the objects we've created so far.
	 */
	if (AH->public.ropt->txn_size > 0)
		CommitTransaction(&AH->public);

	/*
	 * Now close parent connection in prep for parallel steps.  We do this
	 * mainly to ensure that we don't exceed the specified number of parallel
	 * connections.
	 */
	DisconnectDatabase(&AH->public);

	/* blow away any transient state from the old connection */
	free(AH->currUser);
	AH->currUser = NULL;
	free(AH->currSchema);
	AH->currSchema = NULL;
	free(AH->currTablespace);
	AH->currTablespace = NULL;
	free(AH->currTableAm);
	AH->currTableAm = NULL;
}
4296 :
/*
 * Main engine for parallel restore.
 *
 * Parallel restore is done in three phases.  In this second phase,
 * we process entries by dispatching them to parallel worker children
 * (processes on Unix, threads on Windows), each of which connects
 * separately to the database.  Inter-entry dependencies are respected,
 * and so is the RestorePass multi-pass structure.  When we can no longer
 * make any entries ready to process, we exit.  Normally, there will be
 * nothing left to do; but if there is, the third phase will mop up.
 *
 * Entries still on pending_list at exit are handled by
 * restore_toc_entries_postfork().
 */
static void
restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
							 TocEntry *pending_list)
{
	binaryheap *ready_heap;
	TocEntry *next_work_item;

	pg_log_debug("entering restore_toc_entries_parallel");

	/* Set up ready_heap with enough room for all known TocEntrys */
	ready_heap = binaryheap_allocate(AH->tocCount,
									 TocEntrySizeCompareBinaryheap,
									 NULL);

	/*
	 * The pending_list contains all items that we need to restore.  Move all
	 * items that are available to process immediately into the ready_heap.
	 * After this setup, the pending list is everything that needs to be done
	 * but is blocked by one or more dependencies, while the ready heap
	 * contains items that have no remaining dependencies and are OK to
	 * process in the current restore pass.
	 */
	AH->restorePass = RESTORE_PASS_MAIN;
	move_to_ready_heap(pending_list, ready_heap, AH->restorePass);

	/*
	 * main parent loop
	 *
	 * Keep going until there is no worker still running AND there is no work
	 * left to be done.  Note invariant: at top of loop, there should always
	 * be at least one worker available to dispatch a job to.
	 */
	pg_log_info("entering main parallel loop");

	for (;;)
	{
		/* Look for an item ready to be dispatched to a worker */
		next_work_item = pop_next_work_item(ready_heap, pstate);
		if (next_work_item != NULL)
		{
			/* If not to be restored, don't waste time launching a worker */
			if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) == 0)
			{
				pg_log_info("skipping item %d %s %s",
							next_work_item->dumpId,
							next_work_item->desc, next_work_item->tag);
				/* Update its dependencies as though we'd completed it */
				reduce_dependencies(AH, next_work_item, ready_heap);
				/* Loop around to see if anything else can be dispatched */
				continue;
			}

			pg_log_info("launching item %d %s %s",
						next_work_item->dumpId,
						next_work_item->desc, next_work_item->tag);

			/*
			 * Dispatch to some worker; mark_restore_job_done will run in the
			 * leader when the worker reports completion.
			 */
			DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
								   mark_restore_job_done, ready_heap);
		}
		else if (IsEveryWorkerIdle(pstate))
		{
			/*
			 * Nothing is ready and no worker is running, so we're done with
			 * the current pass or maybe with the whole process.
			 */
			if (AH->restorePass == RESTORE_PASS_LAST)
				break;			/* No more parallel processing is possible */

			/* Advance to next restore pass */
			AH->restorePass++;
			/* That probably allows some stuff to be made ready */
			move_to_ready_heap(pending_list, ready_heap, AH->restorePass);
			/* Loop around to see if anything's now ready */
			continue;
		}
		else
		{
			/*
			 * We have nothing ready, but at least one child is working, so
			 * wait for some subjob to finish.
			 */
		}

		/*
		 * Before dispatching another job, check to see if anything has
		 * finished.  We should check every time through the loop so as to
		 * reduce dependencies as soon as possible.  If we were unable to
		 * dispatch any job this time through, wait until some worker finishes
		 * (and, hopefully, unblocks some pending item).  If we did dispatch
		 * something, continue as soon as there's at least one idle worker.
		 * Note that in either case, there's guaranteed to be at least one
		 * idle worker when we return to the top of the loop.  This ensures we
		 * won't block inside DispatchJobForTocEntry, which would be
		 * undesirable: we'd rather postpone dispatching until we see what's
		 * been unblocked by finished jobs.
		 */
		WaitForWorkers(AH, pstate,
					   next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS);
	}

	/* There should now be nothing in ready_heap. */
	Assert(binaryheap_empty(ready_heap));

	binaryheap_free(ready_heap);

	pg_log_info("finished main parallel loop");
}
4416 :
4417 : /*
4418 : * Main engine for parallel restore.
4419 : *
4420 : * Parallel restore is done in three phases. In this third phase,
4421 : * we mop up any remaining TOC entries by processing them serially.
4422 : * This phase normally should have nothing to do, but if we've somehow
4423 : * gotten stuck due to circular dependencies or some such, this provides
4424 : * at least some chance of completing the restore successfully.
4425 : */
4426 : static void
4427 8 : restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
4428 : {
4429 8 : RestoreOptions *ropt = AH->public.ropt;
4430 : TocEntry *te;
4431 :
4432 8 : pg_log_debug("entering restore_toc_entries_postfork");
4433 :
4434 : /*
4435 : * Now reconnect the single parent connection.
4436 : */
4437 8 : ConnectDatabase((Archive *) AH, &ropt->cparams, true);
4438 :
4439 : /* re-establish fixed state */
4440 8 : _doSetFixedOutputState(AH);
4441 :
4442 : /*
4443 : * Make sure there is no work left due to, say, circular dependencies, or
4444 : * some other pathological condition. If so, do it in the single parent
4445 : * connection. We don't sweat about RestorePass ordering; it's likely we
4446 : * already violated that.
4447 : */
4448 8 : for (te = pending_list->pending_next; te != pending_list; te = te->pending_next)
4449 : {
4450 0 : pg_log_info("processing missed item %d %s %s",
4451 : te->dumpId, te->desc, te->tag);
4452 0 : (void) restore_toc_entry(AH, te, false);
4453 : }
4454 8 : }
4455 :
4456 : /*
4457 : * Check if te1 has an exclusive lock requirement for an item that te2 also
4458 : * requires, whether or not te2's requirement is for an exclusive lock.
4459 : */
4460 : static bool
4461 610 : has_lock_conflicts(TocEntry *te1, TocEntry *te2)
4462 : {
4463 : int j,
4464 : k;
4465 :
4466 1086 : for (j = 0; j < te1->nLockDeps; j++)
4467 : {
4468 1724 : for (k = 0; k < te2->nDeps; k++)
4469 : {
4470 1248 : if (te1->lockDeps[j] == te2->dependencies[k])
4471 16 : return true;
4472 : }
4473 : }
4474 594 : return false;
4475 : }
4476 :
4477 :
4478 : /*
4479 : * Initialize the header of the pending-items list.
4480 : *
4481 : * This is a circular list with a dummy TocEntry as header, just like the
4482 : * main TOC list; but we use separate list links so that an entry can be in
4483 : * the main TOC list as well as in the pending list.
4484 : */
4485 : static void
4486 8 : pending_list_header_init(TocEntry *l)
4487 : {
4488 8 : l->pending_prev = l->pending_next = l;
4489 8 : }
4490 :
4491 : /* Append te to the end of the pending-list headed by l */
4492 : static void
4493 168 : pending_list_append(TocEntry *l, TocEntry *te)
4494 : {
4495 168 : te->pending_prev = l->pending_prev;
4496 168 : l->pending_prev->pending_next = te;
4497 168 : l->pending_prev = te;
4498 168 : te->pending_next = l;
4499 168 : }
4500 :
4501 : /* Remove te from the pending-list */
4502 : static void
4503 168 : pending_list_remove(TocEntry *te)
4504 : {
4505 168 : te->pending_prev->pending_next = te->pending_next;
4506 168 : te->pending_next->pending_prev = te->pending_prev;
4507 168 : te->pending_prev = NULL;
4508 168 : te->pending_next = NULL;
4509 168 : }
4510 :
4511 :
4512 : /* qsort comparator for sorting TocEntries by dataLength */
4513 : static int
4514 2090 : TocEntrySizeCompareQsort(const void *p1, const void *p2)
4515 : {
4516 2090 : const TocEntry *te1 = *(const TocEntry *const *) p1;
4517 2090 : const TocEntry *te2 = *(const TocEntry *const *) p2;
4518 :
4519 : /* Sort by decreasing dataLength */
4520 2090 : if (te1->dataLength > te2->dataLength)
4521 290 : return -1;
4522 1800 : if (te1->dataLength < te2->dataLength)
4523 304 : return 1;
4524 :
4525 : /* For equal dataLengths, sort by dumpId, just to be stable */
4526 1496 : if (te1->dumpId < te2->dumpId)
4527 542 : return -1;
4528 954 : if (te1->dumpId > te2->dumpId)
4529 938 : return 1;
4530 :
4531 16 : return 0;
4532 : }
4533 :
/* binaryheap comparator for sorting TocEntries by dataLength */
static int
TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg)
{
	int			cmp = TocEntrySizeCompareQsort(&p1, &p2);

	/* Invert the qsort ordering so the binaryheap acts as a max-heap. */
	return -cmp;
}
4542 :
4543 : /*
4544 : * Move all immediately-ready items from pending_list to ready_heap.
4545 : *
4546 : * Items are considered ready if they have no remaining dependencies and
4547 : * they belong in the current restore pass. (See also reduce_dependencies,
4548 : * which applies the same logic one-at-a-time.)
4549 : */
4550 : static void
4551 24 : move_to_ready_heap(TocEntry *pending_list,
4552 : binaryheap *ready_heap,
4553 : RestorePass pass)
4554 : {
4555 : TocEntry *te;
4556 : TocEntry *next_te;
4557 :
4558 192 : for (te = pending_list->pending_next; te != pending_list; te = next_te)
4559 : {
4560 : /* must save list link before possibly removing te from list */
4561 168 : next_te = te->pending_next;
4562 :
4563 248 : if (te->depCount == 0 &&
4564 80 : _tocEntryRestorePass(te) == pass)
4565 : {
4566 : /* Remove it from pending_list ... */
4567 80 : pending_list_remove(te);
4568 : /* ... and add to ready_heap */
4569 80 : binaryheap_add(ready_heap, te);
4570 : }
4571 : }
4572 24 : }
4573 :
4574 : /*
4575 : * Find the next work item (if any) that is capable of being run now,
4576 : * and remove it from the ready_heap.
4577 : *
4578 : * Returns the item, or NULL if nothing is runnable.
4579 : *
4580 : * To qualify, the item must have no remaining dependencies
4581 : * and no requirements for locks that are incompatible with
4582 : * items currently running. Items in the ready_heap are known to have
4583 : * no remaining dependencies, but we have to check for lock conflicts.
4584 : */
4585 : static TocEntry *
4586 216 : pop_next_work_item(binaryheap *ready_heap,
4587 : ParallelState *pstate)
4588 : {
4589 : /*
4590 : * Search the ready_heap until we find a suitable item. Note that we do a
4591 : * sequential scan through the heap nodes, so even though we will first
4592 : * try to choose the highest-priority item, we might end up picking
4593 : * something with a much lower priority. However, we expect that we will
4594 : * typically be able to pick one of the first few items, which should
4595 : * usually have a relatively high priority.
4596 : */
4597 232 : for (int i = 0; i < binaryheap_size(ready_heap); i++)
4598 : {
4599 184 : TocEntry *te = (TocEntry *) binaryheap_get_node(ready_heap, i);
4600 184 : bool conflicts = false;
4601 :
4602 : /*
4603 : * Check to see if the item would need exclusive lock on something
4604 : * that a currently running item also needs lock on, or vice versa. If
4605 : * so, we don't want to schedule them together.
4606 : */
4607 690 : for (int k = 0; k < pstate->numWorkers; k++)
4608 : {
4609 522 : TocEntry *running_te = pstate->te[k];
4610 :
4611 522 : if (running_te == NULL)
4612 214 : continue;
4613 610 : if (has_lock_conflicts(te, running_te) ||
4614 302 : has_lock_conflicts(running_te, te))
4615 : {
4616 16 : conflicts = true;
4617 16 : break;
4618 : }
4619 : }
4620 :
4621 184 : if (conflicts)
4622 16 : continue;
4623 :
4624 : /* passed all tests, so this item can run */
4625 168 : binaryheap_remove_node(ready_heap, i);
4626 168 : return te;
4627 : }
4628 :
4629 48 : pg_log_debug("no item ready");
4630 48 : return NULL;
4631 : }
4632 :
4633 :
4634 : /*
4635 : * Restore a single TOC item in parallel with others
4636 : *
4637 : * this is run in the worker, i.e. in a thread (Windows) or a separate process
4638 : * (everything else). A worker process executes several such work items during
4639 : * a parallel backup or restore. Once we terminate here and report back that
4640 : * our work is finished, the leader process will assign us a new work item.
4641 : */
4642 : int
4643 168 : parallel_restore(ArchiveHandle *AH, TocEntry *te)
4644 : {
4645 : int status;
4646 :
4647 : Assert(AH->connection != NULL);
4648 :
4649 : /* Count only errors associated with this TOC entry */
4650 168 : AH->public.n_errors = 0;
4651 :
4652 : /* Restore the TOC item */
4653 168 : status = restore_toc_entry(AH, te, true);
4654 :
4655 168 : return status;
4656 : }
4657 :
4658 :
4659 : /*
4660 : * Callback function that's invoked in the leader process after a step has
4661 : * been parallel restored.
4662 : *
4663 : * Update status and reduce the dependency count of any dependent items.
4664 : */
4665 : static void
4666 168 : mark_restore_job_done(ArchiveHandle *AH,
4667 : TocEntry *te,
4668 : int status,
4669 : void *callback_data)
4670 : {
4671 168 : binaryheap *ready_heap = (binaryheap *) callback_data;
4672 :
4673 168 : pg_log_info("finished item %d %s %s",
4674 : te->dumpId, te->desc, te->tag);
4675 :
4676 168 : if (status == WORKER_CREATE_DONE)
4677 0 : mark_create_done(AH, te);
4678 168 : else if (status == WORKER_INHIBIT_DATA)
4679 : {
4680 0 : inhibit_data_for_failed_table(AH, te);
4681 0 : AH->public.n_errors++;
4682 : }
4683 168 : else if (status == WORKER_IGNORED_ERRORS)
4684 0 : AH->public.n_errors++;
4685 168 : else if (status != 0)
4686 0 : pg_fatal("worker process failed: exit code %d",
4687 : status);
4688 :
4689 168 : reduce_dependencies(AH, te, ready_heap);
4690 168 : }
4691 :
4692 :
/*
 * Process the dependency information into a form useful for parallel restore.
 *
 * This function takes care of fixing up some missing or badly designed
 * dependencies, and then prepares subsidiary data structures that will be
 * used in the main parallel-restore logic, including:
 * 1. We build the revDeps[] arrays of incoming dependency dumpIds.
 * 2. We set up depCount fields that are the number of as-yet-unprocessed
 * dependencies for each TOC entry.
 *
 * We also identify locking dependencies so that we can avoid trying to
 * schedule conflicting items at the same time.
 *
 * The passes below are order-dependent: dependency rewrites must finish
 * before the revDeps counting/building passes run, and the two revDeps
 * passes must visit entries identically.
 */
static void
fix_dependencies(ArchiveHandle *AH)
{
	TocEntry *te;
	int i;

	/*
	 * Initialize the depCount/revDeps/nRevDeps fields, and make sure the TOC
	 * items are marked as not being in any parallel-processing list.
	 */
	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		te->depCount = te->nDeps;
		te->revDeps = NULL;
		te->nRevDeps = 0;
		te->pending_prev = NULL;
		te->pending_next = NULL;
	}

	/*
	 * POST_DATA items that are shown as depending on a table need to be
	 * re-pointed to depend on that table's data, instead.  This ensures they
	 * won't get scheduled until the data has been loaded.
	 */
	repoint_table_dependencies(AH);

	/*
	 * Pre-8.4 versions of pg_dump neglected to set up a dependency from BLOB
	 * COMMENTS to BLOBS.  Cope.  (We assume there's only one BLOBS and only
	 * one BLOB COMMENTS in such files.)
	 */
	if (AH->version < K_VERS_1_11)
	{
		for (te = AH->toc->next; te != AH->toc; te = te->next)
		{
			if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0)
			{
				TocEntry *te2;

				/* Find the (sole) BLOBS entry and make it a dependency. */
				for (te2 = AH->toc->next; te2 != AH->toc; te2 = te2->next)
				{
					if (strcmp(te2->desc, "BLOBS") == 0)
					{
						te->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
						te->dependencies[0] = te2->dumpId;
						te->nDeps++;
						te->depCount++;
						break;
					}
				}
				break;
			}
		}
	}

	/*
	 * At this point we start to build the revDeps reverse-dependency arrays,
	 * so all changes of dependencies must be complete.
	 */

	/*
	 * Count the incoming dependencies for each item.  Also, it is possible
	 * that the dependencies list items that are not in the archive at all
	 * (that should not happen in 9.2 and later, but is highly likely in older
	 * archives).  Subtract such items from the depCounts.
	 */
	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		for (i = 0; i < te->nDeps; i++)
		{
			DumpId depid = te->dependencies[i];

			if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
				AH->tocsByDumpId[depid]->nRevDeps++;
			else
				te->depCount--;
		}
	}

	/*
	 * Allocate space for revDeps[] arrays, and reset nRevDeps so we can use
	 * it as a counter below.
	 */
	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		if (te->nRevDeps > 0)
			te->revDeps = (DumpId *) pg_malloc(te->nRevDeps * sizeof(DumpId));
		te->nRevDeps = 0;
	}

	/*
	 * Build the revDeps[] arrays of incoming-dependency dumpIds.  This had
	 * better agree with the loops above.
	 */
	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		for (i = 0; i < te->nDeps; i++)
		{
			DumpId depid = te->dependencies[i];

			if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
			{
				TocEntry *otherte = AH->tocsByDumpId[depid];

				otherte->revDeps[otherte->nRevDeps++] = te->dumpId;
			}
		}
	}

	/*
	 * Lastly, work out the locking dependencies.
	 */
	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		te->lockDeps = NULL;
		te->nLockDeps = 0;
		identify_locking_dependencies(AH, te);
	}
}
4825 :
4826 : /*
4827 : * Change dependencies on table items to depend on table data items instead,
4828 : * but only in POST_DATA items.
4829 : *
4830 : * Also, for any item having such dependency(s), set its dataLength to the
4831 : * largest dataLength of the table data items it depends on. This ensures
4832 : * that parallel restore will prioritize larger jobs (index builds, FK
4833 : * constraint checks, etc) over smaller ones, avoiding situations where we
4834 : * end a restore with only one active job working on a large table.
4835 : */
4836 : static void
4837 8 : repoint_table_dependencies(ArchiveHandle *AH)
4838 : {
4839 : TocEntry *te;
4840 : int i;
4841 : DumpId olddep;
4842 :
4843 276 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4844 : {
4845 268 : if (te->section != SECTION_POST_DATA)
4846 172 : continue;
4847 464 : for (i = 0; i < te->nDeps; i++)
4848 : {
4849 368 : olddep = te->dependencies[i];
4850 368 : if (olddep <= AH->maxDumpId &&
4851 368 : AH->tableDataId[olddep] != 0)
4852 : {
4853 124 : DumpId tabledataid = AH->tableDataId[olddep];
4854 124 : TocEntry *tabledatate = AH->tocsByDumpId[tabledataid];
4855 :
4856 124 : te->dependencies[i] = tabledataid;
4857 124 : te->dataLength = Max(te->dataLength, tabledatate->dataLength);
4858 124 : pg_log_debug("transferring dependency %d -> %d to %d",
4859 : te->dumpId, olddep, tabledataid);
4860 : }
4861 : }
4862 : }
4863 8 : }
4864 :
4865 : /*
4866 : * Identify which objects we'll need exclusive lock on in order to restore
4867 : * the given TOC entry (*other* than the one identified by the TOC entry
4868 : * itself). Record their dump IDs in the entry's lockDeps[] array.
4869 : */
4870 : static void
4871 268 : identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
4872 : {
4873 : DumpId *lockids;
4874 : int nlockids;
4875 : int i;
4876 :
4877 : /*
4878 : * We only care about this for POST_DATA items. PRE_DATA items are not
4879 : * run in parallel, and DATA items are all independent by assumption.
4880 : */
4881 268 : if (te->section != SECTION_POST_DATA)
4882 172 : return;
4883 :
4884 : /* Quick exit if no dependencies at all */
4885 96 : if (te->nDeps == 0)
4886 0 : return;
4887 :
4888 : /*
4889 : * Most POST_DATA items are ALTER TABLEs or some moral equivalent of that,
4890 : * and hence require exclusive lock. However, we know that CREATE INDEX
4891 : * does not. (Maybe someday index-creating CONSTRAINTs will fall in that
4892 : * category too ... but today is not that day.)
4893 : */
4894 96 : if (strcmp(te->desc, "INDEX") == 0)
4895 0 : return;
4896 :
4897 : /*
4898 : * We assume the entry requires exclusive lock on each TABLE or TABLE DATA
4899 : * item listed among its dependencies. Originally all of these would have
4900 : * been TABLE items, but repoint_table_dependencies would have repointed
4901 : * them to the TABLE DATA items if those are present (which they might not
4902 : * be, eg in a schema-only dump). Note that all of the entries we are
4903 : * processing here are POST_DATA; otherwise there might be a significant
4904 : * difference between a dependency on a table and a dependency on its
4905 : * data, so that closer analysis would be needed here.
4906 : */
4907 96 : lockids = (DumpId *) pg_malloc(te->nDeps * sizeof(DumpId));
4908 96 : nlockids = 0;
4909 464 : for (i = 0; i < te->nDeps; i++)
4910 : {
4911 368 : DumpId depid = te->dependencies[i];
4912 :
4913 368 : if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL &&
4914 296 : ((strcmp(AH->tocsByDumpId[depid]->desc, "TABLE DATA") == 0) ||
4915 172 : strcmp(AH->tocsByDumpId[depid]->desc, "TABLE") == 0))
4916 164 : lockids[nlockids++] = depid;
4917 : }
4918 :
4919 96 : if (nlockids == 0)
4920 : {
4921 36 : free(lockids);
4922 36 : return;
4923 : }
4924 :
4925 60 : te->lockDeps = pg_realloc(lockids, nlockids * sizeof(DumpId));
4926 60 : te->nLockDeps = nlockids;
4927 : }
4928 :
4929 : /*
4930 : * Remove the specified TOC entry from the depCounts of items that depend on
4931 : * it, thereby possibly making them ready-to-run. Any pending item that
4932 : * becomes ready should be moved to the ready_heap, if that's provided.
4933 : */
4934 : static void
4935 268 : reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
4936 : binaryheap *ready_heap)
4937 : {
4938 : int i;
4939 :
4940 268 : pg_log_debug("reducing dependencies for %d", te->dumpId);
4941 :
4942 728 : for (i = 0; i < te->nRevDeps; i++)
4943 : {
4944 460 : TocEntry *otherte = AH->tocsByDumpId[te->revDeps[i]];
4945 :
4946 : Assert(otherte->depCount > 0);
4947 460 : otherte->depCount--;
4948 :
4949 : /*
4950 : * It's ready if it has no remaining dependencies, and it belongs in
4951 : * the current restore pass, and it is currently a member of the
4952 : * pending list (that check is needed to prevent double restore in
4953 : * some cases where a list-file forces out-of-order restoring).
4954 : * However, if ready_heap == NULL then caller doesn't want any list
4955 : * memberships changed.
4956 : */
4957 460 : if (otherte->depCount == 0 &&
4958 224 : _tocEntryRestorePass(otherte) == AH->restorePass &&
4959 224 : otherte->pending_prev != NULL &&
4960 : ready_heap != NULL)
4961 : {
4962 : /* Remove it from pending list ... */
4963 88 : pending_list_remove(otherte);
4964 : /* ... and add to ready_heap */
4965 88 : binaryheap_add(ready_heap, otherte);
4966 : }
4967 : }
4968 268 : }
4969 :
4970 : /*
4971 : * Set the created flag on the DATA member corresponding to the given
4972 : * TABLE member
4973 : */
4974 : static void
4975 9912 : mark_create_done(ArchiveHandle *AH, TocEntry *te)
4976 : {
4977 9912 : if (AH->tableDataId[te->dumpId] != 0)
4978 : {
4979 7464 : TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4980 :
4981 7464 : ted->created = true;
4982 : }
4983 9912 : }
4984 :
4985 : /*
4986 : * Mark the DATA member corresponding to the given TABLE member
4987 : * as not wanted
4988 : */
4989 : static void
4990 0 : inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
4991 : {
4992 0 : pg_log_info("table \"%s\" could not be created, will not restore its data",
4993 : te->tag);
4994 :
4995 0 : if (AH->tableDataId[te->dumpId] != 0)
4996 : {
4997 0 : TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4998 :
4999 0 : ted->reqs = 0;
5000 : }
5001 0 : }
5002 :
5003 : /*
5004 : * Clone and de-clone routines used in parallel restoration.
5005 : *
5006 : * Enough of the structure is cloned to ensure that there is no
5007 : * conflict between different threads each with their own clone.
5008 : */
5009 : ArchiveHandle *
5010 52 : CloneArchive(ArchiveHandle *AH)
5011 : {
5012 : ArchiveHandle *clone;
5013 :
5014 : /* Make a "flat" copy */
5015 52 : clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle));
5016 52 : memcpy(clone, AH, sizeof(ArchiveHandle));
5017 :
5018 : /* Likewise flat-copy the RestoreOptions, so we can alter them locally */
5019 52 : clone->public.ropt = (RestoreOptions *) pg_malloc(sizeof(RestoreOptions));
5020 52 : memcpy(clone->public.ropt, AH->public.ropt, sizeof(RestoreOptions));
5021 :
5022 : /* Handle format-independent fields */
5023 52 : memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));
5024 :
5025 : /* The clone will have its own connection, so disregard connection state */
5026 52 : clone->connection = NULL;
5027 52 : clone->connCancel = NULL;
5028 52 : clone->currUser = NULL;
5029 52 : clone->currSchema = NULL;
5030 52 : clone->currTableAm = NULL;
5031 52 : clone->currTablespace = NULL;
5032 :
5033 : /* savedPassword must be local in case we change it while connecting */
5034 52 : if (clone->savedPassword)
5035 0 : clone->savedPassword = pg_strdup(clone->savedPassword);
5036 :
5037 : /* clone has its own error count, too */
5038 52 : clone->public.n_errors = 0;
5039 :
5040 : /* clones should not share lo_buf */
5041 52 : clone->lo_buf = NULL;
5042 :
5043 : /*
5044 : * Clone connections disregard --transaction-size; they must commit after
5045 : * each command so that the results are immediately visible to other
5046 : * workers.
5047 : */
5048 52 : clone->public.ropt->txn_size = 0;
5049 :
5050 : /*
5051 : * Connect our new clone object to the database, using the same connection
5052 : * parameters used for the original connection.
5053 : */
5054 52 : ConnectDatabase((Archive *) clone, &clone->public.ropt->cparams, true);
5055 :
5056 : /* re-establish fixed state */
5057 52 : if (AH->mode == archModeRead)
5058 20 : _doSetFixedOutputState(clone);
5059 : /* in write case, setupDumpWorker will fix up connection state */
5060 :
5061 : /* Let the format-specific code have a chance too */
5062 52 : clone->ClonePtr(clone);
5063 :
5064 : Assert(clone->connection != NULL);
5065 52 : return clone;
5066 : }
5067 :
5068 : /*
5069 : * Release clone-local storage.
5070 : *
5071 : * Note: we assume any clone-local connection was already closed.
5072 : */
5073 : void
5074 52 : DeCloneArchive(ArchiveHandle *AH)
5075 : {
5076 : /* Should not have an open database connection */
5077 : Assert(AH->connection == NULL);
5078 :
5079 : /* Clear format-specific state */
5080 52 : AH->DeClonePtr(AH);
5081 :
5082 : /* Clear state allocated by CloneArchive */
5083 52 : if (AH->sqlparse.curCmd)
5084 6 : destroyPQExpBuffer(AH->sqlparse.curCmd);
5085 :
5086 : /* Clear any connection-local state */
5087 52 : free(AH->currUser);
5088 52 : free(AH->currSchema);
5089 52 : free(AH->currTablespace);
5090 52 : free(AH->currTableAm);
5091 52 : free(AH->savedPassword);
5092 :
5093 52 : free(AH);
5094 52 : }
|