Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_backup_archiver.c
4 : *
5 : * Private implementation of the archiver routines.
6 : *
7 : * See the headers to pg_restore for more details.
8 : *
9 : * Copyright (c) 2000, Philip Warner
10 : * Rights are granted to use this software in any way so long
11 : * as this notice is not removed.
12 : *
13 : * The author is not responsible for loss or damages that may
14 : * result from its use.
15 : *
16 : *
17 : * IDENTIFICATION
18 : * src/bin/pg_dump/pg_backup_archiver.c
19 : *
20 : *-------------------------------------------------------------------------
21 : */
22 : #include "postgres_fe.h"
23 :
24 : #include <ctype.h>
25 : #include <fcntl.h>
26 : #include <unistd.h>
27 : #include <sys/stat.h>
28 : #include <sys/wait.h>
29 : #ifdef WIN32
30 : #include <io.h>
31 : #endif
32 :
33 : #include "catalog/pg_class_d.h"
34 : #include "common/string.h"
35 : #include "compress_io.h"
36 : #include "dumputils.h"
37 : #include "fe_utils/string_utils.h"
38 : #include "lib/binaryheap.h"
39 : #include "lib/stringinfo.h"
40 : #include "libpq/libpq-fs.h"
41 : #include "parallel.h"
42 : #include "pg_backup_archiver.h"
43 : #include "pg_backup_db.h"
44 : #include "pg_backup_utils.h"
45 :
46 : #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
47 : #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
48 :
49 : #define TOC_PREFIX_NONE ""
50 : #define TOC_PREFIX_DATA "Data for "
51 : #define TOC_PREFIX_STATS "Statistics for "
52 :
53 : static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
54 : const pg_compress_specification compression_spec,
55 : bool dosync, ArchiveMode mode,
56 : SetupWorkerPtrType setupWorkerPtr,
57 : DataDirSyncMethod sync_method);
58 : static void _getObjectDescription(PQExpBuffer buf, const TocEntry *te);
59 : static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, const char *pfx);
60 : static char *sanitize_line(const char *str, bool want_hyphen);
61 : static void _doSetFixedOutputState(ArchiveHandle *AH);
62 : static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
63 : static void _reconnectToDB(ArchiveHandle *AH, const char *dbname);
64 : static void _becomeUser(ArchiveHandle *AH, const char *user);
65 : static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
66 : static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
67 : static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
68 : static void _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam);
69 : static void _printTableAccessMethodNoStorage(ArchiveHandle *AH,
70 : TocEntry *te);
71 : static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
72 : static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
73 : static void processSearchPathEntry(ArchiveHandle *AH, TocEntry *te);
74 : static int _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH);
75 : static RestorePass _tocEntryRestorePass(ArchiveHandle *AH, TocEntry *te);
76 : static bool _tocEntryIsACL(TocEntry *te);
77 : static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
78 : static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
79 : static bool is_load_via_partition_root(TocEntry *te);
80 : static void buildTocEntryArrays(ArchiveHandle *AH);
81 : static void _moveBefore(TocEntry *pos, TocEntry *te);
82 : static int _discoverArchiveFormat(ArchiveHandle *AH);
83 :
84 : static int RestoringToDB(ArchiveHandle *AH);
85 : static void dump_lo_buf(ArchiveHandle *AH);
86 : static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
87 : static void SetOutput(ArchiveHandle *AH, const char *filename,
88 : const pg_compress_specification compression_spec);
89 : static CompressFileHandle *SaveOutput(ArchiveHandle *AH);
90 : static void RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput);
91 :
92 : static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
93 : static void restore_toc_entries_prefork(ArchiveHandle *AH,
94 : TocEntry *pending_list);
95 : static void restore_toc_entries_parallel(ArchiveHandle *AH,
96 : ParallelState *pstate,
97 : TocEntry *pending_list);
98 : static void restore_toc_entries_postfork(ArchiveHandle *AH,
99 : TocEntry *pending_list);
100 : static void pending_list_header_init(TocEntry *l);
101 : static void pending_list_append(TocEntry *l, TocEntry *te);
102 : static void pending_list_remove(TocEntry *te);
103 : static int TocEntrySizeCompareQsort(const void *p1, const void *p2);
104 : static int TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg);
105 : static void move_to_ready_heap(ArchiveHandle *AH,
106 : TocEntry *pending_list,
107 : binaryheap *ready_heap,
108 : RestorePass pass);
109 : static TocEntry *pop_next_work_item(binaryheap *ready_heap,
110 : ParallelState *pstate);
111 : static void mark_dump_job_done(ArchiveHandle *AH,
112 : TocEntry *te,
113 : int status,
114 : void *callback_data);
115 : static void mark_restore_job_done(ArchiveHandle *AH,
116 : TocEntry *te,
117 : int status,
118 : void *callback_data);
119 : static void fix_dependencies(ArchiveHandle *AH);
120 : static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
121 : static void repoint_table_dependencies(ArchiveHandle *AH);
122 : static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
123 : static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
124 : binaryheap *ready_heap);
125 : static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
126 : static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
127 :
128 : static void StrictNamesCheck(RestoreOptions *ropt);
129 :
130 :
131 : /*
132 : * Allocate a new DumpOptions block containing all default values.
133 : */
134 : DumpOptions *
135 108 : NewDumpOptions(void)
136 : {
137 108 : DumpOptions *opts = (DumpOptions *) pg_malloc(sizeof(DumpOptions));
138 :
139 108 : InitDumpOptions(opts);
140 108 : return opts;
141 : }
142 :
143 : /*
144 : * Initialize a DumpOptions struct to all default values
145 : */
146 : void
147 550 : InitDumpOptions(DumpOptions *opts)
148 : {
149 550 : memset(opts, 0, sizeof(DumpOptions));
150 : /* set any fields that shouldn't default to zeroes */
151 550 : opts->include_everything = true;
152 550 : opts->cparams.promptPassword = TRI_DEFAULT;
153 550 : opts->dumpSections = DUMP_UNSECTIONED;
154 550 : opts->dumpSchema = true;
155 550 : opts->dumpData = true;
156 550 : opts->dumpStatistics = true;
157 550 : }
158 :
159 : /*
160 : * Create a freshly allocated DumpOptions with options equivalent to those
161 : * found in the given RestoreOptions.
162 : */
163 : DumpOptions *
164 108 : dumpOptionsFromRestoreOptions(RestoreOptions *ropt)
165 : {
166 108 : DumpOptions *dopt = NewDumpOptions();
167 :
168 : /* this is the inverse of what's at the end of pg_dump.c's main() */
169 108 : dopt->cparams.dbname = ropt->cparams.dbname ? pg_strdup(ropt->cparams.dbname) : NULL;
170 108 : dopt->cparams.pgport = ropt->cparams.pgport ? pg_strdup(ropt->cparams.pgport) : NULL;
171 108 : dopt->cparams.pghost = ropt->cparams.pghost ? pg_strdup(ropt->cparams.pghost) : NULL;
172 108 : dopt->cparams.username = ropt->cparams.username ? pg_strdup(ropt->cparams.username) : NULL;
173 108 : dopt->cparams.promptPassword = ropt->cparams.promptPassword;
174 108 : dopt->outputClean = ropt->dropSchema;
175 108 : dopt->dumpData = ropt->dumpData;
176 108 : dopt->dumpSchema = ropt->dumpSchema;
177 108 : dopt->dumpSections = ropt->dumpSections;
178 108 : dopt->dumpStatistics = ropt->dumpStatistics;
179 108 : dopt->if_exists = ropt->if_exists;
180 108 : dopt->column_inserts = ropt->column_inserts;
181 108 : dopt->aclsSkip = ropt->aclsSkip;
182 108 : dopt->outputSuperuser = ropt->superuser;
183 108 : dopt->outputCreateDB = ropt->createDB;
184 108 : dopt->outputNoOwner = ropt->noOwner;
185 108 : dopt->outputNoTableAm = ropt->noTableAm;
186 108 : dopt->outputNoTablespaces = ropt->noTablespace;
187 108 : dopt->disable_triggers = ropt->disable_triggers;
188 108 : dopt->use_setsessauth = ropt->use_setsessauth;
189 108 : dopt->disable_dollar_quoting = ropt->disable_dollar_quoting;
190 108 : dopt->dump_inserts = ropt->dump_inserts;
191 108 : dopt->no_comments = ropt->no_comments;
192 108 : dopt->no_policies = ropt->no_policies;
193 108 : dopt->no_publications = ropt->no_publications;
194 108 : dopt->no_security_labels = ropt->no_security_labels;
195 108 : dopt->no_subscriptions = ropt->no_subscriptions;
196 108 : dopt->lockWaitTimeout = ropt->lockWaitTimeout;
197 108 : dopt->include_everything = ropt->include_everything;
198 108 : dopt->enable_row_security = ropt->enable_row_security;
199 108 : dopt->sequence_data = ropt->sequence_data;
200 :
201 108 : return dopt;
202 : }
203 :
204 :
205 : /*
206 : * Wrapper functions.
207 : *
208 : * The objective is to make writing new formats and dumpers as simple
209 : * as possible, if necessary at the expense of extra function calls etc.
210 : *
211 : */
212 :
213 : /*
214 : * The dump worker setup needs lots of knowledge of the internals of pg_dump,
215 : * so it's defined in pg_dump.c and passed into OpenArchive. The restore worker
216 : * setup doesn't need to know anything much, so it's defined here.
217 : */
218 : static void
219 20 : setupRestoreWorker(Archive *AHX)
220 : {
221 20 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
222 :
223 20 : AH->ReopenPtr(AH);
224 20 : }
225 :
226 :
227 : /* Create a new archive */
228 : /* Public */
229 : Archive *
230 394 : CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
231 : const pg_compress_specification compression_spec,
232 : bool dosync, ArchiveMode mode,
233 : SetupWorkerPtrType setupDumpWorker,
234 : DataDirSyncMethod sync_method)
235 :
236 : {
237 394 : ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression_spec,
238 : dosync, mode, setupDumpWorker, sync_method);
239 :
240 392 : return (Archive *) AH;
241 : }
242 :
243 : /* Open an existing archive */
244 : /* Public */
245 : Archive *
246 104 : OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
247 : {
248 : ArchiveHandle *AH;
249 104 : pg_compress_specification compression_spec = {0};
250 :
251 104 : compression_spec.algorithm = PG_COMPRESSION_NONE;
252 104 : AH = _allocAH(FileSpec, fmt, compression_spec, true,
253 : archModeRead, setupRestoreWorker,
254 : DATA_DIR_SYNC_METHOD_FSYNC);
255 :
256 104 : return (Archive *) AH;
257 : }
258 :
259 : /* Public */
260 : void
261 456 : CloseArchive(Archive *AHX)
262 : {
263 456 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
264 :
265 456 : AH->ClosePtr(AH);
266 :
267 : /* Close the output */
268 456 : errno = 0;
269 456 : if (!EndCompressFileHandle(AH->OF))
270 0 : pg_fatal("could not close output file: %m");
271 456 : }
272 :
273 : /* Public */
274 : void
275 858 : SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt)
276 : {
277 : /* Caller can omit dump options, in which case we synthesize them */
278 858 : if (dopt == NULL && ropt != NULL)
279 108 : dopt = dumpOptionsFromRestoreOptions(ropt);
280 :
281 : /* Save options for later access */
282 858 : AH->dopt = dopt;
283 858 : AH->ropt = ropt;
284 858 : }
285 :
286 : /* Public */
287 : void
288 450 : ProcessArchiveRestoreOptions(Archive *AHX)
289 : {
290 450 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
291 450 : RestoreOptions *ropt = AH->public.ropt;
292 : TocEntry *te;
293 : teSection curSection;
294 :
295 : /* Decide which TOC entries will be dumped/restored, and mark them */
296 450 : curSection = SECTION_PRE_DATA;
297 101356 : for (te = AH->toc->next; te != AH->toc; te = te->next)
298 : {
299 : /*
300 : * When writing an archive, we also take this opportunity to check
301 : * that we have generated the entries in a sane order that respects
302 : * the section divisions. When reading, don't complain, since buggy
303 : * old versions of pg_dump might generate out-of-order archives.
304 : */
305 100906 : if (AH->mode != archModeRead)
306 : {
307 87182 : switch (te->section)
308 : {
309 16824 : case SECTION_NONE:
310 : /* ok to be anywhere */
311 16824 : break;
312 37924 : case SECTION_PRE_DATA:
313 37924 : if (curSection != SECTION_PRE_DATA)
314 0 : pg_log_warning("archive items not in correct section order");
315 37924 : break;
316 16556 : case SECTION_DATA:
317 16556 : if (curSection == SECTION_POST_DATA)
318 0 : pg_log_warning("archive items not in correct section order");
319 16556 : break;
320 15878 : case SECTION_POST_DATA:
321 : /* ok no matter which section we were in */
322 15878 : break;
323 0 : default:
324 0 : pg_fatal("unexpected section code %d",
325 : (int) te->section);
326 : break;
327 : }
328 13724 : }
329 :
330 100906 : if (te->section != SECTION_NONE)
331 82560 : curSection = te->section;
332 :
333 100906 : te->reqs = _tocEntryRequired(te, curSection, AH);
334 : }
335 :
336 : /* Enforce strict names checking */
337 450 : if (ropt->strict_names)
338 0 : StrictNamesCheck(ropt);
339 450 : }
340 :
341 : /* Public */
342 : void
343 358 : RestoreArchive(Archive *AHX)
344 : {
345 358 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
346 358 : RestoreOptions *ropt = AH->public.ropt;
347 : bool parallel_mode;
348 : TocEntry *te;
349 : CompressFileHandle *sav;
350 :
351 358 : AH->stage = STAGE_INITIALIZING;
352 :
353 : /*
354 : * If we're going to do parallel restore, there are some restrictions.
355 : */
356 358 : parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
357 358 : if (parallel_mode)
358 : {
359 : /* We haven't got round to making this work for all archive formats */
360 8 : if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
361 0 : pg_fatal("parallel restore is not supported with this archive file format");
362 :
363 : /* Doesn't work if the archive represents dependencies as OIDs */
364 8 : if (AH->version < K_VERS_1_8)
365 0 : pg_fatal("parallel restore is not supported with archives made by pre-8.0 pg_dump");
366 :
367 : /*
368 : * It's also not gonna work if we can't reopen the input file, so
369 : * let's try that immediately.
370 : */
371 8 : AH->ReopenPtr(AH);
372 : }
373 :
374 : /*
375 : * Make sure we won't need (de)compression we haven't got
376 : */
377 358 : if (AH->PrintTocDataPtr != NULL)
378 : {
379 60384 : for (te = AH->toc->next; te != AH->toc; te = te->next)
380 : {
381 60220 : if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
382 : {
383 194 : char *errmsg = supports_compression(AH->compression_spec);
384 :
385 194 : if (errmsg)
386 0 : pg_fatal("cannot restore from compressed archive (%s)",
387 : errmsg);
388 : else
389 194 : break;
390 : }
391 : }
392 : }
393 :
394 : /*
395 : * Prepare index arrays, so we can assume we have them throughout restore.
396 : * It's possible we already did this, though.
397 : */
398 358 : if (AH->tocsByDumpId == NULL)
399 342 : buildTocEntryArrays(AH);
400 :
401 : /*
402 : * If we're using a DB connection, then connect it.
403 : */
404 358 : if (ropt->useDB)
405 : {
406 56 : pg_log_info("connecting to database for restore");
407 56 : if (AH->version < K_VERS_1_3)
408 0 : pg_fatal("direct database connections are not supported in pre-1.3 archives");
409 :
410 : /*
411 : * We don't want to guess at whether the dump will successfully
412 : * restore; allow the attempt regardless of the version of the restore
413 : * target.
414 : */
415 56 : AHX->minRemoteVersion = 0;
416 56 : AHX->maxRemoteVersion = 9999999;
417 :
418 56 : ConnectDatabase(AHX, &ropt->cparams, false);
419 :
420 : /*
421 : * If we're talking to the DB directly, don't send comments since they
422 : * obscure SQL when displaying errors
423 : */
424 56 : AH->noTocComments = 1;
425 : }
426 :
427 : /*
428 : * Work out if we have an implied schema-less restore. This can happen if
429 : * the dump excluded the schema or the user has used a toc list to exclude
430 : * all of the schema data. All we do is look for schema entries - if none
431 : * are found then we unset the dumpSchema flag.
432 : *
433 : * We could scan for wanted TABLE entries, but that is not the same as
434 : * data-only. At this stage, it seems unnecessary (6-Mar-2001).
435 : */
436 358 : if (ropt->dumpSchema)
437 : {
438 340 : bool no_schema_found = true;
439 :
440 2774 : for (te = AH->toc->next; te != AH->toc; te = te->next)
441 : {
442 2732 : if ((te->reqs & REQ_SCHEMA) != 0)
443 : {
444 298 : no_schema_found = false;
445 298 : break;
446 : }
447 : }
448 340 : if (no_schema_found)
449 : {
450 42 : ropt->dumpSchema = false;
451 42 : pg_log_info("implied no-schema restore");
452 : }
453 : }
454 :
455 : /*
456 : * Setup the output file if necessary.
457 : */
458 358 : sav = SaveOutput(AH);
459 358 : if (ropt->filename || ropt->compression_spec.algorithm != PG_COMPRESSION_NONE)
460 280 : SetOutput(AH, ropt->filename, ropt->compression_spec);
461 :
462 358 : ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
463 :
464 358 : if (AH->archiveRemoteVersion)
465 358 : ahprintf(AH, "-- Dumped from database version %s\n",
466 : AH->archiveRemoteVersion);
467 358 : if (AH->archiveDumpVersion)
468 358 : ahprintf(AH, "-- Dumped by pg_dump version %s\n",
469 : AH->archiveDumpVersion);
470 :
471 358 : ahprintf(AH, "\n");
472 :
473 358 : if (AH->public.verbose)
474 70 : dumpTimestamp(AH, "Started on", AH->createDate);
475 :
476 358 : if (ropt->single_txn)
477 : {
478 0 : if (AH->connection)
479 0 : StartTransaction(AHX);
480 : else
481 0 : ahprintf(AH, "BEGIN;\n\n");
482 : }
483 :
484 : /*
485 : * Establish important parameter values right away.
486 : */
487 358 : _doSetFixedOutputState(AH);
488 :
489 358 : AH->stage = STAGE_PROCESSING;
490 :
491 : /*
492 : * Drop the items at the start, in reverse order
493 : */
494 358 : if (ropt->dropSchema)
495 : {
496 2614 : for (te = AH->toc->prev; te != AH->toc; te = te->prev)
497 : {
498 2568 : AH->currentTE = te;
499 :
500 : /*
501 : * In createDB mode, issue a DROP *only* for the database as a
502 : * whole. Issuing drops against anything else would be wrong,
503 : * because at this point we're connected to the wrong database.
504 : * (The DATABASE PROPERTIES entry, if any, should be treated like
505 : * the DATABASE entry.)
506 : */
507 2568 : if (ropt->createDB)
508 : {
509 1066 : if (strcmp(te->desc, "DATABASE") != 0 &&
510 1030 : strcmp(te->desc, "DATABASE PROPERTIES") != 0)
511 998 : continue;
512 : }
513 :
514 : /* Otherwise, drop anything that's selected and has a dropStmt */
515 1570 : if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
516 : {
517 640 : bool not_allowed_in_txn = false;
518 :
519 640 : pg_log_info("dropping %s %s", te->desc, te->tag);
520 :
521 : /*
522 : * In --transaction-size mode, we have to temporarily exit our
523 : * transaction block to drop objects that can't be dropped
524 : * within a transaction.
525 : */
526 640 : if (ropt->txn_size > 0)
527 : {
528 64 : if (strcmp(te->desc, "DATABASE") == 0 ||
529 32 : strcmp(te->desc, "DATABASE PROPERTIES") == 0)
530 : {
531 64 : not_allowed_in_txn = true;
532 64 : if (AH->connection)
533 64 : CommitTransaction(AHX);
534 : else
535 0 : ahprintf(AH, "COMMIT;\n");
536 : }
537 : }
538 :
539 : /* Select owner and schema as necessary */
540 640 : _becomeOwner(AH, te);
541 640 : _selectOutputSchema(AH, te->namespace);
542 :
543 : /*
544 : * Now emit the DROP command, if the object has one. Note we
545 : * don't necessarily emit it verbatim; at this point we add an
546 : * appropriate IF EXISTS clause, if the user requested it.
547 : */
548 640 : if (strcmp(te->desc, "BLOB METADATA") == 0)
549 : {
550 : /* We must generate the per-blob commands */
551 8 : if (ropt->if_exists)
552 4 : IssueCommandPerBlob(AH, te,
553 : "SELECT pg_catalog.lo_unlink(oid) "
554 : "FROM pg_catalog.pg_largeobject_metadata "
555 : "WHERE oid = '", "'");
556 : else
557 4 : IssueCommandPerBlob(AH, te,
558 : "SELECT pg_catalog.lo_unlink('",
559 : "')");
560 : }
561 632 : else if (*te->dropStmt != '\0')
562 : {
563 600 : if (!ropt->if_exists ||
564 274 : strncmp(te->dropStmt, "--", 2) == 0)
565 : {
566 : /*
567 : * Without --if-exists, or if it's just a comment (as
568 : * happens for the public schema), print the dropStmt
569 : * as-is.
570 : */
571 328 : ahprintf(AH, "%s", te->dropStmt);
572 : }
573 : else
574 : {
575 : /*
576 : * Inject an appropriate spelling of "if exists". For
577 : * old-style large objects, we have a routine that
578 : * knows how to do it, without depending on
579 : * te->dropStmt; use that. For other objects we need
580 : * to parse the command.
581 : */
582 272 : if (strcmp(te->desc, "BLOB") == 0)
583 : {
584 0 : DropLOIfExists(AH, te->catalogId.oid);
585 : }
586 : else
587 : {
588 272 : char *dropStmt = pg_strdup(te->dropStmt);
589 272 : char *dropStmtOrig = dropStmt;
590 272 : PQExpBuffer ftStmt = createPQExpBuffer();
591 :
592 : /*
593 : * Need to inject IF EXISTS clause after ALTER
594 : * TABLE part in ALTER TABLE .. DROP statement
595 : */
596 272 : if (strncmp(dropStmt, "ALTER TABLE", 11) == 0)
597 : {
598 36 : appendPQExpBufferStr(ftStmt,
599 : "ALTER TABLE IF EXISTS");
600 36 : dropStmt = dropStmt + 11;
601 : }
602 :
603 : /*
604 : * ALTER TABLE..ALTER COLUMN..DROP DEFAULT does
605 : * not support the IF EXISTS clause, and therefore
606 : * we simply emit the original command for DEFAULT
607 : * objects (modulo the adjustment made above).
608 : *
609 : * Likewise, don't mess with DATABASE PROPERTIES.
610 : *
611 : * If we used CREATE OR REPLACE VIEW as a means of
612 : * quasi-dropping an ON SELECT rule, that should
613 : * be emitted unchanged as well.
614 : *
615 : * For other object types, we need to extract the
616 : * first part of the DROP which includes the
617 : * object type. Most of the time this matches
618 : * te->desc, so search for that; however for the
619 : * different kinds of CONSTRAINTs, we know to
620 : * search for hardcoded "DROP CONSTRAINT" instead.
621 : */
622 272 : if (strcmp(te->desc, "DEFAULT") == 0 ||
623 266 : strcmp(te->desc, "DATABASE PROPERTIES") == 0 ||
624 266 : strncmp(dropStmt, "CREATE OR REPLACE VIEW", 22) == 0)
625 6 : appendPQExpBufferStr(ftStmt, dropStmt);
626 : else
627 : {
628 : char buffer[40];
629 : char *mark;
630 :
631 266 : if (strcmp(te->desc, "CONSTRAINT") == 0 ||
632 240 : strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
633 240 : strcmp(te->desc, "FK CONSTRAINT") == 0)
634 30 : strcpy(buffer, "DROP CONSTRAINT");
635 : else
636 236 : snprintf(buffer, sizeof(buffer), "DROP %s",
637 : te->desc);
638 :
639 266 : mark = strstr(dropStmt, buffer);
640 :
641 266 : if (mark)
642 : {
643 266 : *mark = '\0';
644 266 : appendPQExpBuffer(ftStmt, "%s%s IF EXISTS%s",
645 : dropStmt, buffer,
646 266 : mark + strlen(buffer));
647 : }
648 : else
649 : {
650 : /* complain and emit unmodified command */
651 0 : pg_log_warning("could not find where to insert IF EXISTS in statement \"%s\"",
652 : dropStmtOrig);
653 0 : appendPQExpBufferStr(ftStmt, dropStmt);
654 : }
655 : }
656 :
657 272 : ahprintf(AH, "%s", ftStmt->data);
658 :
659 272 : destroyPQExpBuffer(ftStmt);
660 272 : pg_free(dropStmtOrig);
661 : }
662 : }
663 : }
664 :
665 : /*
666 : * In --transaction-size mode, re-establish the transaction
667 : * block if needed; otherwise, commit after every N drops.
668 : */
669 640 : if (ropt->txn_size > 0)
670 : {
671 64 : if (not_allowed_in_txn)
672 : {
673 64 : if (AH->connection)
674 64 : StartTransaction(AHX);
675 : else
676 0 : ahprintf(AH, "BEGIN;\n");
677 64 : AH->txnCount = 0;
678 : }
679 0 : else if (++AH->txnCount >= ropt->txn_size)
680 : {
681 0 : if (AH->connection)
682 : {
683 0 : CommitTransaction(AHX);
684 0 : StartTransaction(AHX);
685 : }
686 : else
687 0 : ahprintf(AH, "COMMIT;\nBEGIN;\n");
688 0 : AH->txnCount = 0;
689 : }
690 : }
691 : }
692 : }
693 :
694 : /*
695 : * _selectOutputSchema may have set currSchema to reflect the effect
696 : * of a "SET search_path" command it emitted. However, by now we may
697 : * have dropped that schema; or it might not have existed in the first
698 : * place. In either case the effective value of search_path will not
699 : * be what we think. Forcibly reset currSchema so that we will
700 : * re-establish the search_path setting when needed (after creating
701 : * the schema).
702 : *
703 : * If we treated users as pg_dump'able objects then we'd need to reset
704 : * currUser here too.
705 : */
706 46 : free(AH->currSchema);
707 46 : AH->currSchema = NULL;
708 : }
709 :
710 358 : if (parallel_mode)
711 : {
712 : /*
713 : * In parallel mode, turn control over to the parallel-restore logic.
714 : */
715 : ParallelState *pstate;
716 : TocEntry pending_list;
717 :
718 : /* The archive format module may need some setup for this */
719 8 : if (AH->PrepParallelRestorePtr)
720 8 : AH->PrepParallelRestorePtr(AH);
721 :
722 8 : pending_list_header_init(&pending_list);
723 :
724 : /* This runs PRE_DATA items and then disconnects from the database */
725 8 : restore_toc_entries_prefork(AH, &pending_list);
726 : Assert(AH->connection == NULL);
727 :
728 : /* ParallelBackupStart() will actually fork the processes */
729 8 : pstate = ParallelBackupStart(AH);
730 8 : restore_toc_entries_parallel(AH, pstate, &pending_list);
731 8 : ParallelBackupEnd(AH, pstate);
732 :
733 : /* reconnect the leader and see if we missed something */
734 8 : restore_toc_entries_postfork(AH, &pending_list);
735 : Assert(AH->connection != NULL);
736 : }
737 : else
738 : {
739 : /*
740 : * In serial mode, process everything in three phases: normal items,
741 : * then ACLs, then post-ACL items. We might be able to skip one or
742 : * both extra phases in some cases, eg data-only restores.
743 : */
744 350 : bool haveACL = false;
745 350 : bool havePostACL = false;
746 :
747 88172 : for (te = AH->toc->next; te != AH->toc; te = te->next)
748 : {
749 87824 : if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) == 0)
750 2908 : continue; /* ignore if not to be dumped at all */
751 :
752 84916 : switch (_tocEntryRestorePass(AH, te))
753 : {
754 79592 : case RESTORE_PASS_MAIN:
755 79592 : (void) restore_toc_entry(AH, te, false);
756 79590 : break;
757 4126 : case RESTORE_PASS_ACL:
758 4126 : haveACL = true;
759 4126 : break;
760 1198 : case RESTORE_PASS_POST_ACL:
761 1198 : havePostACL = true;
762 1198 : break;
763 : }
764 87822 : }
765 :
766 348 : if (haveACL)
767 : {
768 85216 : for (te = AH->toc->next; te != AH->toc; te = te->next)
769 : {
770 168102 : if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0 &&
771 83044 : _tocEntryRestorePass(AH, te) == RESTORE_PASS_ACL)
772 4126 : (void) restore_toc_entry(AH, te, false);
773 : }
774 : }
775 :
776 348 : if (havePostACL)
777 : {
778 55220 : for (te = AH->toc->next; te != AH->toc; te = te->next)
779 : {
780 109484 : if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0 &&
781 54348 : _tocEntryRestorePass(AH, te) == RESTORE_PASS_POST_ACL)
782 1198 : (void) restore_toc_entry(AH, te, false);
783 : }
784 : }
785 : }
786 :
787 : /*
788 : * Close out any persistent transaction we may have. While these two
789 : * cases are started in different places, we can end both cases here.
790 : */
791 356 : if (ropt->single_txn || ropt->txn_size > 0)
792 : {
793 48 : if (AH->connection)
794 48 : CommitTransaction(AHX);
795 : else
796 0 : ahprintf(AH, "COMMIT;\n\n");
797 : }
798 :
799 356 : if (AH->public.verbose)
800 70 : dumpTimestamp(AH, "Completed on", time(NULL));
801 :
802 356 : ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n");
803 :
804 : /*
805 : * Clean up & we're done.
806 : */
807 356 : AH->stage = STAGE_FINALIZING;
808 :
809 356 : if (ropt->filename || ropt->compression_spec.algorithm != PG_COMPRESSION_NONE)
810 280 : RestoreOutput(AH, sav);
811 :
812 356 : if (ropt->useDB)
813 56 : DisconnectDatabase(&AH->public);
814 356 : }
815 :
816 : /*
817 : * Restore a single TOC item. Used in both parallel and non-parallel restore;
818 : * is_parallel is true if we are in a worker child process.
819 : *
820 : * Returns 0 normally, but WORKER_CREATE_DONE or WORKER_INHIBIT_DATA if
821 : * the parallel parent has to make the corresponding status update.
822 : */
823 : static int
824 85184 : restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
825 : {
826 85184 : RestoreOptions *ropt = AH->public.ropt;
827 85184 : int status = WORKER_OK;
828 : int reqs;
829 : bool defnDumped;
830 :
831 85184 : AH->currentTE = te;
832 :
833 : /* Dump any relevant dump warnings to stderr */
834 85184 : if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
835 : {
836 0 : if (ropt->dumpSchema && te->defn != NULL && strlen(te->defn) != 0)
837 0 : pg_log_warning("warning from original dump file: %s", te->defn);
838 0 : else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
839 0 : pg_log_warning("warning from original dump file: %s", te->copyStmt);
840 : }
841 :
842 : /* Work out what, if anything, we want from this entry */
843 85184 : reqs = te->reqs;
844 :
845 85184 : defnDumped = false;
846 :
847 : /*
848 : * If it has a schema component that we want, then process that
849 : */
850 85184 : if ((reqs & REQ_SCHEMA) != 0)
851 : {
852 64028 : bool object_is_db = false;
853 :
854 : /*
855 : * In --transaction-size mode, must exit our transaction block to
856 : * create a database or set its properties.
857 : */
858 64028 : if (strcmp(te->desc, "DATABASE") == 0 ||
859 63916 : strcmp(te->desc, "DATABASE PROPERTIES") == 0)
860 : {
861 168 : object_is_db = true;
862 168 : if (ropt->txn_size > 0)
863 : {
864 96 : if (AH->connection)
865 96 : CommitTransaction(&AH->public);
866 : else
867 0 : ahprintf(AH, "COMMIT;\n\n");
868 : }
869 : }
870 :
871 : /* Show namespace in log message if available */
872 64028 : if (te->namespace)
873 61312 : pg_log_info("creating %s \"%s.%s\"",
874 : te->desc, te->namespace, te->tag);
875 : else
876 2716 : pg_log_info("creating %s \"%s\"",
877 : te->desc, te->tag);
878 :
879 64028 : _printTocEntry(AH, te, TOC_PREFIX_NONE);
880 64028 : defnDumped = true;
881 :
882 64028 : if (strcmp(te->desc, "TABLE") == 0)
883 : {
884 10094 : if (AH->lastErrorTE == te)
885 : {
886 : /*
887 : * We failed to create the table. If
888 : * --no-data-for-failed-tables was given, mark the
889 : * corresponding TABLE DATA to be ignored.
890 : *
891 : * In the parallel case this must be done in the parent, so we
892 : * just set the return value.
893 : */
894 0 : if (ropt->noDataForFailedTables)
895 : {
896 0 : if (is_parallel)
897 0 : status = WORKER_INHIBIT_DATA;
898 : else
899 0 : inhibit_data_for_failed_table(AH, te);
900 : }
901 : }
902 : else
903 : {
904 : /*
905 : * We created the table successfully. Mark the corresponding
906 : * TABLE DATA for possible truncation.
907 : *
908 : * In the parallel case this must be done in the parent, so we
909 : * just set the return value.
910 : */
911 10094 : if (is_parallel)
912 0 : status = WORKER_CREATE_DONE;
913 : else
914 10094 : mark_create_done(AH, te);
915 : }
916 : }
917 :
918 : /*
919 : * If we created a DB, connect to it. Also, if we changed DB
920 : * properties, reconnect to ensure that relevant GUC settings are
921 : * applied to our session. (That also restarts the transaction block
922 : * in --transaction-size mode.)
923 : */
924 64028 : if (object_is_db)
925 : {
926 168 : pg_log_info("connecting to new database \"%s\"", te->tag);
927 168 : _reconnectToDB(AH, te->tag);
928 : }
929 : }
930 :
931 : /*
932 : * If it has a data component that we want, then process that
933 : */
934 85184 : if ((reqs & REQ_DATA) != 0)
935 : {
936 : /*
937 : * hadDumper will be set if there is genuine data component for this
938 : * node. Otherwise, we need to check the defn field for statements
939 : * that need to be executed in data-only restores.
940 : */
941 9086 : if (te->hadDumper)
942 : {
943 : /*
944 : * If we can output the data, then restore it.
945 : */
946 7978 : if (AH->PrintTocDataPtr != NULL)
947 : {
948 7978 : _printTocEntry(AH, te, TOC_PREFIX_DATA);
949 :
950 7978 : if (strcmp(te->desc, "BLOBS") == 0 ||
951 7830 : strcmp(te->desc, "BLOB COMMENTS") == 0)
952 : {
953 148 : pg_log_info("processing %s", te->desc);
954 :
955 148 : _selectOutputSchema(AH, "pg_catalog");
956 :
957 : /* Send BLOB COMMENTS data to ExecuteSimpleCommands() */
958 148 : if (strcmp(te->desc, "BLOB COMMENTS") == 0)
959 0 : AH->outputKind = OUTPUT_OTHERDATA;
960 :
961 148 : AH->PrintTocDataPtr(AH, te);
962 :
963 148 : AH->outputKind = OUTPUT_SQLCMDS;
964 : }
965 : else
966 : {
967 : bool use_truncate;
968 :
969 7830 : _disableTriggersIfNecessary(AH, te);
970 :
971 : /* Select owner and schema as necessary */
972 7830 : _becomeOwner(AH, te);
973 7830 : _selectOutputSchema(AH, te->namespace);
974 :
975 7830 : pg_log_info("processing data for table \"%s.%s\"",
976 : te->namespace, te->tag);
977 :
978 : /*
979 : * In parallel restore, if we created the table earlier in
980 : * this run (so that we know it is empty) and we are not
981 : * restoring a load-via-partition-root data item then we
982 : * wrap the COPY in a transaction and precede it with a
983 : * TRUNCATE. If wal_level is set to minimal this prevents
984 : * WAL-logging the COPY. This obtains a speedup similar
985 : * to that from using single_txn mode in non-parallel
986 : * restores.
987 : *
988 : * We mustn't do this for load-via-partition-root cases
989 : * because some data might get moved across partition
990 : * boundaries, risking deadlock and/or loss of previously
991 : * loaded data. (We assume that all partitions of a
992 : * partitioned table will be treated the same way.)
993 : */
994 7862 : use_truncate = is_parallel && te->created &&
995 32 : !is_load_via_partition_root(te);
996 :
997 7830 : if (use_truncate)
998 : {
999 : /*
1000 : * Parallel restore is always talking directly to a
1001 : * server, so no need to see if we should issue BEGIN.
1002 : */
1003 20 : StartTransaction(&AH->public);
1004 :
1005 : /*
1006 : * Issue TRUNCATE with ONLY so that child tables are
1007 : * not wiped.
1008 : */
1009 20 : ahprintf(AH, "TRUNCATE TABLE ONLY %s;\n\n",
1010 20 : fmtQualifiedId(te->namespace, te->tag));
1011 : }
1012 :
1013 : /*
1014 : * If we have a copy statement, use it.
1015 : */
1016 7830 : if (te->copyStmt && strlen(te->copyStmt) > 0)
1017 : {
1018 7688 : ahprintf(AH, "%s", te->copyStmt);
1019 7688 : AH->outputKind = OUTPUT_COPYDATA;
1020 : }
1021 : else
1022 142 : AH->outputKind = OUTPUT_OTHERDATA;
1023 :
1024 7830 : AH->PrintTocDataPtr(AH, te);
1025 :
1026 : /*
1027 : * Terminate COPY if needed.
1028 : */
1029 15514 : if (AH->outputKind == OUTPUT_COPYDATA &&
1030 7686 : RestoringToDB(AH))
1031 18 : EndDBCopyMode(&AH->public, te->tag);
1032 7828 : AH->outputKind = OUTPUT_SQLCMDS;
1033 :
1034 : /* close out the transaction started above */
1035 7828 : if (use_truncate)
1036 20 : CommitTransaction(&AH->public);
1037 :
1038 7828 : _enableTriggersIfNecessary(AH, te);
1039 : }
1040 : }
1041 : }
1042 1108 : else if (!defnDumped)
1043 : {
1044 : /* If we haven't already dumped the defn part, do so now */
1045 1108 : pg_log_info("executing %s %s", te->desc, te->tag);
1046 1108 : _printTocEntry(AH, te, TOC_PREFIX_NONE);
1047 : }
1048 : }
1049 :
1050 : /*
1051 : * If it has a statistics component that we want, then process that
1052 : */
1053 85182 : if ((reqs & REQ_STATS) != 0)
1054 12040 : _printTocEntry(AH, te, TOC_PREFIX_STATS);
1055 :
1056 : /*
1057 : * If we emitted anything for this TOC entry, that counts as one action
1058 : * against the transaction-size limit. Commit if it's time to.
1059 : */
1060 85182 : if ((reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0 && ropt->txn_size > 0)
1061 : {
1062 6360 : if (++AH->txnCount >= ropt->txn_size)
1063 : {
1064 18 : if (AH->connection)
1065 : {
1066 18 : CommitTransaction(&AH->public);
1067 18 : StartTransaction(&AH->public);
1068 : }
1069 : else
1070 0 : ahprintf(AH, "COMMIT;\nBEGIN;\n\n");
1071 18 : AH->txnCount = 0;
1072 : }
1073 : }
1074 :
1075 85182 : if (AH->public.n_errors > 0 && status == WORKER_OK)
1076 0 : status = WORKER_IGNORED_ERRORS;
1077 :
1078 85182 : return status;
1079 : }
1080 :
1081 : /*
1082 : * Allocate a new RestoreOptions block.
1083 : * This is mainly so we can initialize it, but also for future expansion,
1084 : */
1085 : RestoreOptions *
1086 528 : NewRestoreOptions(void)
1087 : {
1088 : RestoreOptions *opts;
1089 :
1090 528 : opts = (RestoreOptions *) pg_malloc0(sizeof(RestoreOptions));
1091 :
1092 : /* set any fields that shouldn't default to zeroes */
1093 528 : opts->format = archUnknown;
1094 528 : opts->cparams.promptPassword = TRI_DEFAULT;
1095 528 : opts->dumpSections = DUMP_UNSECTIONED;
1096 528 : opts->compression_spec.algorithm = PG_COMPRESSION_NONE;
1097 528 : opts->compression_spec.level = 0;
1098 528 : opts->dumpSchema = true;
1099 528 : opts->dumpData = true;
1100 528 : opts->dumpStatistics = true;
1101 :
1102 528 : return opts;
1103 : }
1104 :
1105 : static void
1106 7830 : _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
1107 : {
1108 7830 : RestoreOptions *ropt = AH->public.ropt;
1109 :
1110 : /* This hack is only needed in a data-only restore */
1111 7830 : if (ropt->dumpSchema || !ropt->disable_triggers)
1112 7768 : return;
1113 :
1114 62 : pg_log_info("disabling triggers for %s", te->tag);
1115 :
1116 : /*
1117 : * Become superuser if possible, since they are the only ones who can
1118 : * disable constraint triggers. If -S was not given, assume the initial
1119 : * user identity is a superuser. (XXX would it be better to become the
1120 : * table owner?)
1121 : */
1122 62 : _becomeUser(AH, ropt->superuser);
1123 :
1124 : /*
1125 : * Disable them.
1126 : */
1127 62 : ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n",
1128 62 : fmtQualifiedId(te->namespace, te->tag));
1129 : }
1130 :
1131 : static void
1132 7828 : _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
1133 : {
1134 7828 : RestoreOptions *ropt = AH->public.ropt;
1135 :
1136 : /* This hack is only needed in a data-only restore */
1137 7828 : if (ropt->dumpSchema || !ropt->disable_triggers)
1138 7766 : return;
1139 :
1140 62 : pg_log_info("enabling triggers for %s", te->tag);
1141 :
1142 : /*
1143 : * Become superuser if possible, since they are the only ones who can
1144 : * disable constraint triggers. If -S was not given, assume the initial
1145 : * user identity is a superuser. (XXX would it be better to become the
1146 : * table owner?)
1147 : */
1148 62 : _becomeUser(AH, ropt->superuser);
1149 :
1150 : /*
1151 : * Enable them.
1152 : */
1153 62 : ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n",
1154 62 : fmtQualifiedId(te->namespace, te->tag));
1155 : }
1156 :
1157 : /*
1158 : * Detect whether a TABLE DATA TOC item is performing "load via partition
1159 : * root", that is the target table is an ancestor partition rather than the
1160 : * table the TOC item is nominally for.
1161 : *
1162 : * In newer archive files this can be detected by checking for a special
1163 : * comment placed in te->defn. In older files we have to fall back to seeing
1164 : * if the COPY statement targets the named table or some other one. This
1165 : * will not work for data dumped as INSERT commands, so we could give a false
1166 : * negative in that case; fortunately, that's a rarely-used option.
1167 : */
1168 : static bool
1169 32 : is_load_via_partition_root(TocEntry *te)
1170 : {
1171 32 : if (te->defn &&
1172 12 : strncmp(te->defn, "-- load via partition root ", 27) == 0)
1173 12 : return true;
1174 20 : if (te->copyStmt && *te->copyStmt)
1175 : {
1176 12 : PQExpBuffer copyStmt = createPQExpBuffer();
1177 : bool result;
1178 :
1179 : /*
1180 : * Build the initial part of the COPY as it would appear if the
1181 : * nominal target table is the actual target. If we see anything
1182 : * else, it must be a load-via-partition-root case.
1183 : */
1184 12 : appendPQExpBuffer(copyStmt, "COPY %s ",
1185 12 : fmtQualifiedId(te->namespace, te->tag));
1186 12 : result = strncmp(te->copyStmt, copyStmt->data, copyStmt->len) != 0;
1187 12 : destroyPQExpBuffer(copyStmt);
1188 12 : return result;
1189 : }
1190 : /* Assume it's not load-via-partition-root */
1191 8 : return false;
1192 : }
1193 :
1194 : /*
1195 : * This is a routine that is part of the dumper interface, hence the 'Archive*' parameter.
1196 : */
1197 :
1198 : /* Public */
1199 : void
1200 3661928 : WriteData(Archive *AHX, const void *data, size_t dLen)
1201 : {
1202 3661928 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1203 :
1204 3661928 : if (!AH->currToc)
1205 0 : pg_fatal("internal error -- WriteData cannot be called outside the context of a DataDumper routine");
1206 :
1207 3661928 : AH->WriteDataPtr(AH, data, dLen);
1208 3661928 : }
1209 :
1210 : /*
1211 : * Create a new TOC entry. The TOC was designed as a TOC, but is now the
1212 : * repository for all metadata. But the name has stuck.
1213 : *
1214 : * The new entry is added to the Archive's TOC list. Most callers can ignore
1215 : * the result value because nothing else need be done, but a few want to
1216 : * manipulate the TOC entry further.
1217 : */
1218 :
1219 : /* Public */
1220 : TocEntry *
1221 87182 : ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId,
1222 : ArchiveOpts *opts)
1223 : {
1224 87182 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1225 : TocEntry *newToc;
1226 :
1227 87182 : newToc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
1228 :
1229 87182 : AH->tocCount++;
1230 87182 : if (dumpId > AH->maxDumpId)
1231 31516 : AH->maxDumpId = dumpId;
1232 :
1233 87182 : newToc->prev = AH->toc->prev;
1234 87182 : newToc->next = AH->toc;
1235 87182 : AH->toc->prev->next = newToc;
1236 87182 : AH->toc->prev = newToc;
1237 :
1238 87182 : newToc->catalogId = catalogId;
1239 87182 : newToc->dumpId = dumpId;
1240 87182 : newToc->section = opts->section;
1241 :
1242 87182 : newToc->tag = pg_strdup(opts->tag);
1243 87182 : newToc->namespace = opts->namespace ? pg_strdup(opts->namespace) : NULL;
1244 87182 : newToc->tablespace = opts->tablespace ? pg_strdup(opts->tablespace) : NULL;
1245 87182 : newToc->tableam = opts->tableam ? pg_strdup(opts->tableam) : NULL;
1246 87182 : newToc->relkind = opts->relkind;
1247 87182 : newToc->owner = opts->owner ? pg_strdup(opts->owner) : NULL;
1248 87182 : newToc->desc = pg_strdup(opts->description);
1249 87182 : newToc->defn = opts->createStmt ? pg_strdup(opts->createStmt) : NULL;
1250 87182 : newToc->dropStmt = opts->dropStmt ? pg_strdup(opts->dropStmt) : NULL;
1251 87182 : newToc->copyStmt = opts->copyStmt ? pg_strdup(opts->copyStmt) : NULL;
1252 :
1253 87182 : if (opts->nDeps > 0)
1254 : {
1255 38930 : newToc->dependencies = (DumpId *) pg_malloc(opts->nDeps * sizeof(DumpId));
1256 38930 : memcpy(newToc->dependencies, opts->deps, opts->nDeps * sizeof(DumpId));
1257 38930 : newToc->nDeps = opts->nDeps;
1258 : }
1259 : else
1260 : {
1261 48252 : newToc->dependencies = NULL;
1262 48252 : newToc->nDeps = 0;
1263 : }
1264 :
1265 87182 : newToc->dataDumper = opts->dumpFn;
1266 87182 : newToc->dataDumperArg = opts->dumpArg;
1267 87182 : newToc->hadDumper = opts->dumpFn ? true : false;
1268 :
1269 87182 : newToc->formatData = NULL;
1270 87182 : newToc->dataLength = 0;
1271 :
1272 87182 : if (AH->ArchiveEntryPtr != NULL)
1273 13570 : AH->ArchiveEntryPtr(AH, newToc);
1274 :
1275 87182 : return newToc;
1276 : }
1277 :
1278 : /* Public */
1279 : void
1280 8 : PrintTOCSummary(Archive *AHX)
1281 : {
1282 8 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1283 8 : RestoreOptions *ropt = AH->public.ropt;
1284 : TocEntry *te;
1285 8 : pg_compress_specification out_compression_spec = {0};
1286 : teSection curSection;
1287 : CompressFileHandle *sav;
1288 : const char *fmtName;
1289 : char stamp_str[64];
1290 :
1291 : /* TOC is always uncompressed */
1292 8 : out_compression_spec.algorithm = PG_COMPRESSION_NONE;
1293 :
1294 8 : sav = SaveOutput(AH);
1295 8 : if (ropt->filename)
1296 0 : SetOutput(AH, ropt->filename, out_compression_spec);
1297 :
1298 8 : if (strftime(stamp_str, sizeof(stamp_str), PGDUMP_STRFTIME_FMT,
1299 8 : localtime(&AH->createDate)) == 0)
1300 0 : strcpy(stamp_str, "[unknown]");
1301 :
1302 8 : ahprintf(AH, ";\n; Archive created at %s\n", stamp_str);
1303 16 : ahprintf(AH, "; dbname: %s\n; TOC Entries: %d\n; Compression: %s\n",
1304 8 : sanitize_line(AH->archdbname, false),
1305 : AH->tocCount,
1306 : get_compress_algorithm_name(AH->compression_spec.algorithm));
1307 :
1308 8 : switch (AH->format)
1309 : {
1310 6 : case archCustom:
1311 6 : fmtName = "CUSTOM";
1312 6 : break;
1313 2 : case archDirectory:
1314 2 : fmtName = "DIRECTORY";
1315 2 : break;
1316 0 : case archTar:
1317 0 : fmtName = "TAR";
1318 0 : break;
1319 0 : default:
1320 0 : fmtName = "UNKNOWN";
1321 : }
1322 :
1323 8 : ahprintf(AH, "; Dump Version: %d.%d-%d\n",
1324 8 : ARCHIVE_MAJOR(AH->version), ARCHIVE_MINOR(AH->version), ARCHIVE_REV(AH->version));
1325 8 : ahprintf(AH, "; Format: %s\n", fmtName);
1326 8 : ahprintf(AH, "; Integer: %d bytes\n", (int) AH->intSize);
1327 8 : ahprintf(AH, "; Offset: %d bytes\n", (int) AH->offSize);
1328 8 : if (AH->archiveRemoteVersion)
1329 8 : ahprintf(AH, "; Dumped from database version: %s\n",
1330 : AH->archiveRemoteVersion);
1331 8 : if (AH->archiveDumpVersion)
1332 8 : ahprintf(AH, "; Dumped by pg_dump version: %s\n",
1333 : AH->archiveDumpVersion);
1334 :
1335 8 : ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");
1336 :
1337 8 : curSection = SECTION_PRE_DATA;
1338 2832 : for (te = AH->toc->next; te != AH->toc; te = te->next)
1339 : {
1340 : /* This bit must match ProcessArchiveRestoreOptions' marking logic */
1341 2824 : if (te->section != SECTION_NONE)
1342 2248 : curSection = te->section;
1343 2824 : te->reqs = _tocEntryRequired(te, curSection, AH);
1344 : /* Now, should we print it? */
1345 2824 : if (ropt->verbose ||
1346 2824 : (te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0)
1347 : {
1348 : char *sanitized_name;
1349 : char *sanitized_schema;
1350 : char *sanitized_owner;
1351 :
1352 : /*
1353 : */
1354 2784 : sanitized_name = sanitize_line(te->tag, false);
1355 2784 : sanitized_schema = sanitize_line(te->namespace, true);
1356 2784 : sanitized_owner = sanitize_line(te->owner, false);
1357 :
1358 2784 : ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
1359 : te->catalogId.tableoid, te->catalogId.oid,
1360 : te->desc, sanitized_schema, sanitized_name,
1361 : sanitized_owner);
1362 :
1363 2784 : free(sanitized_name);
1364 2784 : free(sanitized_schema);
1365 2784 : free(sanitized_owner);
1366 : }
1367 2824 : if (ropt->verbose && te->nDeps > 0)
1368 : {
1369 : int i;
1370 :
1371 0 : ahprintf(AH, ";\tdepends on:");
1372 0 : for (i = 0; i < te->nDeps; i++)
1373 0 : ahprintf(AH, " %d", te->dependencies[i]);
1374 0 : ahprintf(AH, "\n");
1375 : }
1376 : }
1377 :
1378 : /* Enforce strict names checking */
1379 8 : if (ropt->strict_names)
1380 0 : StrictNamesCheck(ropt);
1381 :
1382 8 : if (ropt->filename)
1383 0 : RestoreOutput(AH, sav);
1384 8 : }
1385 :
1386 : /***********
1387 : * Large Object Archival
1388 : ***********/
1389 :
1390 : /* Called by a dumper to signal start of a LO */
1391 : int
1392 160 : StartLO(Archive *AHX, Oid oid)
1393 : {
1394 160 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1395 :
1396 160 : if (!AH->StartLOPtr)
1397 0 : pg_fatal("large-object output not supported in chosen format");
1398 :
1399 160 : AH->StartLOPtr(AH, AH->currToc, oid);
1400 :
1401 160 : return 1;
1402 : }
1403 :
1404 : /* Called by a dumper to signal end of a LO */
1405 : int
1406 160 : EndLO(Archive *AHX, Oid oid)
1407 : {
1408 160 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1409 :
1410 160 : if (AH->EndLOPtr)
1411 160 : AH->EndLOPtr(AH, AH->currToc, oid);
1412 :
1413 160 : return 1;
1414 : }
1415 :
1416 : /**********
1417 : * Large Object Restoration
1418 : **********/
1419 :
1420 : /*
1421 : * Called by a format handler before a group of LOs is restored
1422 : */
1423 : void
1424 32 : StartRestoreLOs(ArchiveHandle *AH)
1425 : {
1426 32 : RestoreOptions *ropt = AH->public.ropt;
1427 :
1428 : /*
1429 : * LOs must be restored within a transaction block, since we need the LO
1430 : * handle to stay open while we write it. Establish a transaction unless
1431 : * there's one being used globally.
1432 : */
1433 32 : if (!(ropt->single_txn || ropt->txn_size > 0))
1434 : {
1435 32 : if (AH->connection)
1436 0 : StartTransaction(&AH->public);
1437 : else
1438 32 : ahprintf(AH, "BEGIN;\n\n");
1439 : }
1440 :
1441 32 : AH->loCount = 0;
1442 32 : }
1443 :
1444 : /*
1445 : * Called by a format handler after a group of LOs is restored
1446 : */
1447 : void
1448 32 : EndRestoreLOs(ArchiveHandle *AH)
1449 : {
1450 32 : RestoreOptions *ropt = AH->public.ropt;
1451 :
1452 32 : if (!(ropt->single_txn || ropt->txn_size > 0))
1453 : {
1454 32 : if (AH->connection)
1455 0 : CommitTransaction(&AH->public);
1456 : else
1457 32 : ahprintf(AH, "COMMIT;\n\n");
1458 : }
1459 :
1460 32 : pg_log_info(ngettext("restored %d large object",
1461 : "restored %d large objects",
1462 : AH->loCount),
1463 : AH->loCount);
1464 32 : }
1465 :
1466 :
1467 : /*
1468 : * Called by a format handler to initiate restoration of a LO
1469 : */
1470 : void
1471 32 : StartRestoreLO(ArchiveHandle *AH, Oid oid, bool drop)
1472 : {
1473 32 : bool old_lo_style = (AH->version < K_VERS_1_12);
1474 : Oid loOid;
1475 :
1476 32 : AH->loCount++;
1477 :
1478 : /* Initialize the LO Buffer */
1479 32 : if (AH->lo_buf == NULL)
1480 : {
1481 : /* First time through (in this process) so allocate the buffer */
1482 16 : AH->lo_buf_size = LOBBUFSIZE;
1483 16 : AH->lo_buf = pg_malloc(LOBBUFSIZE);
1484 : }
1485 32 : AH->lo_buf_used = 0;
1486 :
1487 32 : pg_log_info("restoring large object with OID %u", oid);
1488 :
1489 : /* With an old archive we must do drop and create logic here */
1490 32 : if (old_lo_style && drop)
1491 0 : DropLOIfExists(AH, oid);
1492 :
1493 32 : if (AH->connection)
1494 : {
1495 0 : if (old_lo_style)
1496 : {
1497 0 : loOid = lo_create(AH->connection, oid);
1498 0 : if (loOid == 0 || loOid != oid)
1499 0 : pg_fatal("could not create large object %u: %s",
1500 : oid, PQerrorMessage(AH->connection));
1501 : }
1502 0 : AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
1503 0 : if (AH->loFd == -1)
1504 0 : pg_fatal("could not open large object %u: %s",
1505 : oid, PQerrorMessage(AH->connection));
1506 : }
1507 : else
1508 : {
1509 32 : if (old_lo_style)
1510 0 : ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
1511 : oid, INV_WRITE);
1512 : else
1513 32 : ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
1514 : oid, INV_WRITE);
1515 : }
1516 :
1517 32 : AH->writingLO = true;
1518 32 : }
1519 :
1520 : void
1521 32 : EndRestoreLO(ArchiveHandle *AH, Oid oid)
1522 : {
1523 32 : if (AH->lo_buf_used > 0)
1524 : {
1525 : /* Write remaining bytes from the LO buffer */
1526 16 : dump_lo_buf(AH);
1527 : }
1528 :
1529 32 : AH->writingLO = false;
1530 :
1531 32 : if (AH->connection)
1532 : {
1533 0 : lo_close(AH->connection, AH->loFd);
1534 0 : AH->loFd = -1;
1535 : }
1536 : else
1537 : {
1538 32 : ahprintf(AH, "SELECT pg_catalog.lo_close(0);\n\n");
1539 : }
1540 32 : }
1541 :
1542 : /***********
1543 : * Sorting and Reordering
1544 : ***********/
1545 :
1546 : void
1547 0 : SortTocFromFile(Archive *AHX)
1548 : {
1549 0 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1550 0 : RestoreOptions *ropt = AH->public.ropt;
1551 : FILE *fh;
1552 : StringInfoData linebuf;
1553 :
1554 : /* Allocate space for the 'wanted' array, and init it */
1555 0 : ropt->idWanted = (bool *) pg_malloc0(sizeof(bool) * AH->maxDumpId);
1556 :
1557 : /* Setup the file */
1558 0 : fh = fopen(ropt->tocFile, PG_BINARY_R);
1559 0 : if (!fh)
1560 0 : pg_fatal("could not open TOC file \"%s\": %m", ropt->tocFile);
1561 :
1562 0 : initStringInfo(&linebuf);
1563 :
1564 0 : while (pg_get_line_buf(fh, &linebuf))
1565 : {
1566 : char *cmnt;
1567 : char *endptr;
1568 : DumpId id;
1569 : TocEntry *te;
1570 :
1571 : /* Truncate line at comment, if any */
1572 0 : cmnt = strchr(linebuf.data, ';');
1573 0 : if (cmnt != NULL)
1574 : {
1575 0 : cmnt[0] = '\0';
1576 0 : linebuf.len = cmnt - linebuf.data;
1577 : }
1578 :
1579 : /* Ignore if all blank */
1580 0 : if (strspn(linebuf.data, " \t\r\n") == linebuf.len)
1581 0 : continue;
1582 :
1583 : /* Get an ID, check it's valid and not already seen */
1584 0 : id = strtol(linebuf.data, &endptr, 10);
1585 0 : if (endptr == linebuf.data || id <= 0 || id > AH->maxDumpId ||
1586 0 : ropt->idWanted[id - 1])
1587 : {
1588 0 : pg_log_warning("line ignored: %s", linebuf.data);
1589 0 : continue;
1590 : }
1591 :
1592 : /* Find TOC entry */
1593 0 : te = getTocEntryByDumpId(AH, id);
1594 0 : if (!te)
1595 0 : pg_fatal("could not find entry for ID %d",
1596 : id);
1597 :
1598 : /* Mark it wanted */
1599 0 : ropt->idWanted[id - 1] = true;
1600 :
1601 : /*
1602 : * Move each item to the end of the list as it is selected, so that
1603 : * they are placed in the desired order. Any unwanted items will end
1604 : * up at the front of the list, which may seem unintuitive but it's
1605 : * what we need. In an ordinary serial restore that makes no
1606 : * difference, but in a parallel restore we need to mark unrestored
1607 : * items' dependencies as satisfied before we start examining
1608 : * restorable items. Otherwise they could have surprising
1609 : * side-effects on the order in which restorable items actually get
1610 : * restored.
1611 : */
1612 0 : _moveBefore(AH->toc, te);
1613 : }
1614 :
1615 0 : pg_free(linebuf.data);
1616 :
1617 0 : if (fclose(fh) != 0)
1618 0 : pg_fatal("could not close TOC file: %m");
1619 0 : }
1620 :
1621 : /**********************
1622 : * Convenience functions that look like standard IO functions
1623 : * for writing data when in dump mode.
1624 : **********************/
1625 :
1626 : /* Public */
1627 : void
1628 46048 : archputs(const char *s, Archive *AH)
1629 : {
1630 46048 : WriteData(AH, s, strlen(s));
1631 46048 : }
1632 :
1633 : /* Public */
1634 : int
1635 7638 : archprintf(Archive *AH, const char *fmt,...)
1636 : {
1637 7638 : int save_errno = errno;
1638 : char *p;
1639 7638 : size_t len = 128; /* initial assumption about buffer size */
1640 : size_t cnt;
1641 :
1642 : for (;;)
1643 0 : {
1644 : va_list args;
1645 :
1646 : /* Allocate work buffer. */
1647 7638 : p = (char *) pg_malloc(len);
1648 :
1649 : /* Try to format the data. */
1650 7638 : errno = save_errno;
1651 7638 : va_start(args, fmt);
1652 7638 : cnt = pvsnprintf(p, len, fmt, args);
1653 7638 : va_end(args);
1654 :
1655 7638 : if (cnt < len)
1656 7638 : break; /* success */
1657 :
1658 : /* Release buffer and loop around to try again with larger len. */
1659 0 : free(p);
1660 0 : len = cnt;
1661 : }
1662 :
1663 7638 : WriteData(AH, p, cnt);
1664 7638 : free(p);
1665 7638 : return (int) cnt;
1666 : }
1667 :
1668 :
1669 : /*******************************
1670 : * Stuff below here should be 'private' to the archiver routines
1671 : *******************************/
1672 :
1673 : static void
1674 280 : SetOutput(ArchiveHandle *AH, const char *filename,
1675 : const pg_compress_specification compression_spec)
1676 : {
1677 : CompressFileHandle *CFH;
1678 : const char *mode;
1679 280 : int fn = -1;
1680 :
1681 280 : if (filename)
1682 : {
1683 280 : if (strcmp(filename, "-") == 0)
1684 0 : fn = fileno(stdout);
1685 : }
1686 0 : else if (AH->FH)
1687 0 : fn = fileno(AH->FH);
1688 0 : else if (AH->fSpec)
1689 : {
1690 0 : filename = AH->fSpec;
1691 : }
1692 : else
1693 0 : fn = fileno(stdout);
1694 :
1695 280 : if (AH->mode == archModeAppend)
1696 86 : mode = PG_BINARY_A;
1697 : else
1698 194 : mode = PG_BINARY_W;
1699 :
1700 280 : CFH = InitCompressFileHandle(compression_spec);
1701 :
1702 280 : if (!CFH->open_func(filename, fn, mode, CFH))
1703 : {
1704 0 : if (filename)
1705 0 : pg_fatal("could not open output file \"%s\": %m", filename);
1706 : else
1707 0 : pg_fatal("could not open output file: %m");
1708 : }
1709 :
1710 280 : AH->OF = CFH;
1711 280 : }
1712 :
1713 : static CompressFileHandle *
1714 366 : SaveOutput(ArchiveHandle *AH)
1715 : {
1716 366 : return (CompressFileHandle *) AH->OF;
1717 : }
1718 :
1719 : static void
1720 280 : RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput)
1721 : {
1722 280 : errno = 0;
1723 280 : if (!EndCompressFileHandle(AH->OF))
1724 0 : pg_fatal("could not close output file: %m");
1725 :
1726 280 : AH->OF = savedOutput;
1727 280 : }
1728 :
1729 :
1730 :
1731 : /*
1732 : * Print formatted text to the output file (usually stdout).
1733 : */
1734 : int
1735 454940 : ahprintf(ArchiveHandle *AH, const char *fmt,...)
1736 : {
1737 454940 : int save_errno = errno;
1738 : char *p;
1739 454940 : size_t len = 128; /* initial assumption about buffer size */
1740 : size_t cnt;
1741 :
1742 : for (;;)
1743 33108 : {
1744 : va_list args;
1745 :
1746 : /* Allocate work buffer. */
1747 488048 : p = (char *) pg_malloc(len);
1748 :
1749 : /* Try to format the data. */
1750 488048 : errno = save_errno;
1751 488048 : va_start(args, fmt);
1752 488048 : cnt = pvsnprintf(p, len, fmt, args);
1753 488048 : va_end(args);
1754 :
1755 488048 : if (cnt < len)
1756 454940 : break; /* success */
1757 :
1758 : /* Release buffer and loop around to try again with larger len. */
1759 33108 : free(p);
1760 33108 : len = cnt;
1761 : }
1762 :
1763 454940 : ahwrite(p, 1, cnt, AH);
1764 454940 : free(p);
1765 454940 : return (int) cnt;
1766 : }
1767 :
1768 : /*
1769 : * Single place for logic which says 'We are restoring to a direct DB connection'.
1770 : */
1771 : static int
1772 4071484 : RestoringToDB(ArchiveHandle *AH)
1773 : {
1774 4071484 : RestoreOptions *ropt = AH->public.ropt;
1775 :
1776 4071484 : return (ropt && ropt->useDB && AH->connection);
1777 : }
1778 :
1779 : /*
1780 : * Dump the current contents of the LO data buffer while writing a LO
1781 : */
1782 : static void
1783 16 : dump_lo_buf(ArchiveHandle *AH)
1784 : {
1785 16 : if (AH->connection)
1786 : {
1787 : int res;
1788 :
1789 0 : res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
1790 0 : pg_log_debug(ngettext("wrote %zu byte of large object data (result = %d)",
1791 : "wrote %zu bytes of large object data (result = %d)",
1792 : AH->lo_buf_used),
1793 : AH->lo_buf_used, res);
1794 : /* We assume there are no short writes, only errors */
1795 0 : if (res != AH->lo_buf_used)
1796 0 : warn_or_exit_horribly(AH, "could not write to large object: %s",
1797 0 : PQerrorMessage(AH->connection));
1798 : }
1799 : else
1800 : {
1801 16 : PQExpBuffer buf = createPQExpBuffer();
1802 :
1803 16 : appendByteaLiteralAHX(buf,
1804 : (const unsigned char *) AH->lo_buf,
1805 : AH->lo_buf_used,
1806 : AH);
1807 :
1808 : /* Hack: turn off writingLO so ahwrite doesn't recurse to here */
1809 16 : AH->writingLO = false;
1810 16 : ahprintf(AH, "SELECT pg_catalog.lowrite(0, %s);\n", buf->data);
1811 16 : AH->writingLO = true;
1812 :
1813 16 : destroyPQExpBuffer(buf);
1814 : }
1815 16 : AH->lo_buf_used = 0;
1816 16 : }
1817 :
1818 :
1819 : /*
1820 : * Write buffer to the output file (usually stdout). This is used for
1821 : * outputting 'restore' scripts etc. It is even possible for an archive
1822 : * format to create a custom output routine to 'fake' a restore if it
1823 : * wants to generate a script (see TAR output).
1824 : */
1825 : void
1826 4066790 : ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
1827 : {
1828 4066790 : int bytes_written = 0;
1829 :
1830 4066790 : if (AH->writingLO)
1831 : {
1832 26 : size_t remaining = size * nmemb;
1833 :
1834 26 : while (AH->lo_buf_used + remaining > AH->lo_buf_size)
1835 : {
1836 0 : size_t avail = AH->lo_buf_size - AH->lo_buf_used;
1837 :
1838 0 : memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
1839 0 : ptr = (const char *) ptr + avail;
1840 0 : remaining -= avail;
1841 0 : AH->lo_buf_used += avail;
1842 0 : dump_lo_buf(AH);
1843 : }
1844 :
1845 26 : memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
1846 26 : AH->lo_buf_used += remaining;
1847 :
1848 26 : bytes_written = size * nmemb;
1849 : }
1850 4066764 : else if (AH->CustomOutPtr)
1851 4276 : bytes_written = AH->CustomOutPtr(AH, ptr, size * nmemb);
1852 :
1853 : /*
1854 : * If we're doing a restore, and it's direct to DB, and we're connected
1855 : * then send it to the DB.
1856 : */
1857 4062488 : else if (RestoringToDB(AH))
1858 11930 : bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
1859 : else
1860 : {
1861 4050558 : CompressFileHandle *CFH = (CompressFileHandle *) AH->OF;
1862 :
1863 4050558 : if (CFH->write_func(ptr, size * nmemb, CFH))
1864 4050558 : bytes_written = size * nmemb;
1865 : }
1866 :
1867 4066790 : if (bytes_written != size * nmemb)
1868 0 : WRITE_ERROR_EXIT;
1869 4066790 : }
1870 :
1871 : /* on some error, we may decide to go on... */
1872 : void
1873 0 : warn_or_exit_horribly(ArchiveHandle *AH, const char *fmt,...)
1874 : {
1875 : va_list ap;
1876 :
1877 0 : switch (AH->stage)
1878 : {
1879 :
1880 0 : case STAGE_NONE:
1881 : /* Do nothing special */
1882 0 : break;
1883 :
1884 0 : case STAGE_INITIALIZING:
1885 0 : if (AH->stage != AH->lastErrorStage)
1886 0 : pg_log_info("while INITIALIZING:");
1887 0 : break;
1888 :
1889 0 : case STAGE_PROCESSING:
1890 0 : if (AH->stage != AH->lastErrorStage)
1891 0 : pg_log_info("while PROCESSING TOC:");
1892 0 : break;
1893 :
1894 0 : case STAGE_FINALIZING:
1895 0 : if (AH->stage != AH->lastErrorStage)
1896 0 : pg_log_info("while FINALIZING:");
1897 0 : break;
1898 : }
1899 0 : if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE)
1900 : {
1901 0 : pg_log_info("from TOC entry %d; %u %u %s %s %s",
1902 : AH->currentTE->dumpId,
1903 : AH->currentTE->catalogId.tableoid,
1904 : AH->currentTE->catalogId.oid,
1905 : AH->currentTE->desc ? AH->currentTE->desc : "(no desc)",
1906 : AH->currentTE->tag ? AH->currentTE->tag : "(no tag)",
1907 : AH->currentTE->owner ? AH->currentTE->owner : "(no owner)");
1908 : }
1909 0 : AH->lastErrorStage = AH->stage;
1910 0 : AH->lastErrorTE = AH->currentTE;
1911 :
1912 0 : va_start(ap, fmt);
1913 0 : pg_log_generic_v(PG_LOG_ERROR, PG_LOG_PRIMARY, fmt, ap);
1914 0 : va_end(ap);
1915 :
1916 0 : if (AH->public.exit_on_error)
1917 0 : exit_nicely(1);
1918 : else
1919 0 : AH->public.n_errors++;
1920 0 : }
1921 :
#ifdef NOT_USED

/*
 * _moveAfter
 *		Relink te so that it immediately follows pos in the TOC list.
 *		(Compiled out: NOT_USED.)
 */
static void
_moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
{
	TocEntry   *succ;

	/* detach te from its current neighbours */
	te->next->prev = te->prev;
	te->prev->next = te->next;

	/* splice it in between pos and pos's successor */
	succ = pos->next;
	te->prev = pos;
	te->next = succ;
	succ->prev = te;
	pos->next = te;
}
#endif
1938 :
1939 : static void
1940 0 : _moveBefore(TocEntry *pos, TocEntry *te)
1941 : {
1942 : /* Unlink te from list */
1943 0 : te->prev->next = te->next;
1944 0 : te->next->prev = te->prev;
1945 :
1946 : /* and insert it before "pos" */
1947 0 : te->prev = pos->prev;
1948 0 : te->next = pos;
1949 0 : pos->prev->next = te;
1950 0 : pos->prev = te;
1951 0 : }
1952 :
1953 : /*
1954 : * Build index arrays for the TOC list
1955 : *
1956 : * This should be invoked only after we have created or read in all the TOC
1957 : * items.
1958 : *
1959 : * The arrays are indexed by dump ID (so entry zero is unused). Note that the
1960 : * array entries run only up to maxDumpId. We might see dependency dump IDs
1961 : * beyond that (if the dump was partial); so always check the array bound
1962 : * before trying to touch an array entry.
1963 : */
1964 : static void
1965 418 : buildTocEntryArrays(ArchiveHandle *AH)
1966 : {
1967 418 : DumpId maxDumpId = AH->maxDumpId;
1968 : TocEntry *te;
1969 :
1970 418 : AH->tocsByDumpId = (TocEntry **) pg_malloc0((maxDumpId + 1) * sizeof(TocEntry *));
1971 418 : AH->tableDataId = (DumpId *) pg_malloc0((maxDumpId + 1) * sizeof(DumpId));
1972 :
1973 101054 : for (te = AH->toc->next; te != AH->toc; te = te->next)
1974 : {
1975 : /* this check is purely paranoia, maxDumpId should be correct */
1976 100636 : if (te->dumpId <= 0 || te->dumpId > maxDumpId)
1977 0 : pg_fatal("bad dumpId");
1978 :
1979 : /* tocsByDumpId indexes all TOCs by their dump ID */
1980 100636 : AH->tocsByDumpId[te->dumpId] = te;
1981 :
1982 : /*
1983 : * tableDataId provides the TABLE DATA item's dump ID for each TABLE
1984 : * TOC entry that has a DATA item. We compute this by reversing the
1985 : * TABLE DATA item's dependency, knowing that a TABLE DATA item has
1986 : * just one dependency and it is the TABLE item.
1987 : */
1988 100636 : if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0)
1989 : {
1990 8530 : DumpId tableId = te->dependencies[0];
1991 :
1992 : /*
1993 : * The TABLE item might not have been in the archive, if this was
1994 : * a data-only dump; but its dump ID should be less than its data
1995 : * item's dump ID, so there should be a place for it in the array.
1996 : */
1997 8530 : if (tableId <= 0 || tableId > maxDumpId)
1998 0 : pg_fatal("bad table dumpId for TABLE DATA item");
1999 :
2000 8530 : AH->tableDataId[tableId] = te->dumpId;
2001 : }
2002 : }
2003 418 : }
2004 :
2005 : TocEntry *
2006 22008 : getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
2007 : {
2008 : /* build index arrays if we didn't already */
2009 22008 : if (AH->tocsByDumpId == NULL)
2010 76 : buildTocEntryArrays(AH);
2011 :
2012 22008 : if (id > 0 && id <= AH->maxDumpId)
2013 22008 : return AH->tocsByDumpId[id];
2014 :
2015 0 : return NULL;
2016 : }
2017 :
2018 : int
2019 21462 : TocIDRequired(ArchiveHandle *AH, DumpId id)
2020 : {
2021 21462 : TocEntry *te = getTocEntryByDumpId(AH, id);
2022 :
2023 21462 : if (!te)
2024 10118 : return 0;
2025 :
2026 11344 : return te->reqs;
2027 : }
2028 :
2029 : size_t
2030 19032 : WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
2031 : {
2032 : int off;
2033 :
2034 : /* Save the flag */
2035 19032 : AH->WriteBytePtr(AH, wasSet);
2036 :
2037 : /* Write out pgoff_t smallest byte first, prevents endian mismatch */
2038 171288 : for (off = 0; off < sizeof(pgoff_t); off++)
2039 : {
2040 152256 : AH->WriteBytePtr(AH, o & 0xFF);
2041 152256 : o >>= 8;
2042 : }
2043 19032 : return sizeof(pgoff_t) + 1;
2044 : }
2045 :
2046 : int
2047 11780 : ReadOffset(ArchiveHandle *AH, pgoff_t * o)
2048 : {
2049 : int i;
2050 : int off;
2051 : int offsetFlg;
2052 :
2053 : /* Initialize to zero */
2054 11780 : *o = 0;
2055 :
2056 : /* Check for old version */
2057 11780 : if (AH->version < K_VERS_1_7)
2058 : {
2059 : /* Prior versions wrote offsets using WriteInt */
2060 0 : i = ReadInt(AH);
2061 : /* -1 means not set */
2062 0 : if (i < 0)
2063 0 : return K_OFFSET_POS_NOT_SET;
2064 0 : else if (i == 0)
2065 0 : return K_OFFSET_NO_DATA;
2066 :
2067 : /* Cast to pgoff_t because it was written as an int. */
2068 0 : *o = (pgoff_t) i;
2069 0 : return K_OFFSET_POS_SET;
2070 : }
2071 :
2072 : /*
2073 : * Read the flag indicating the state of the data pointer. Check if valid
2074 : * and die if not.
2075 : *
2076 : * This used to be handled by a negative or zero pointer, now we use an
2077 : * extra byte specifically for the state.
2078 : */
2079 11780 : offsetFlg = AH->ReadBytePtr(AH) & 0xFF;
2080 :
2081 11780 : switch (offsetFlg)
2082 : {
2083 11780 : case K_OFFSET_POS_NOT_SET:
2084 : case K_OFFSET_NO_DATA:
2085 : case K_OFFSET_POS_SET:
2086 :
2087 11780 : break;
2088 :
2089 0 : default:
2090 0 : pg_fatal("unexpected data offset flag %d", offsetFlg);
2091 : }
2092 :
2093 : /*
2094 : * Read the bytes
2095 : */
2096 106020 : for (off = 0; off < AH->offSize; off++)
2097 : {
2098 94240 : if (off < sizeof(pgoff_t))
2099 94240 : *o |= ((pgoff_t) (AH->ReadBytePtr(AH))) << (off * 8);
2100 : else
2101 : {
2102 0 : if (AH->ReadBytePtr(AH) != 0)
2103 0 : pg_fatal("file offset in dump file is too large");
2104 : }
2105 : }
2106 :
2107 11780 : return offsetFlg;
2108 : }
2109 :
2110 : size_t
2111 435036 : WriteInt(ArchiveHandle *AH, int i)
2112 : {
2113 : int b;
2114 :
2115 : /*
2116 : * This is a bit yucky, but I don't want to make the binary format very
2117 : * dependent on representation, and not knowing much about it, I write out
2118 : * a sign byte. If you change this, don't forget to change the file
2119 : * version #, and modify ReadInt to read the new format AS WELL AS the old
2120 : * formats.
2121 : */
2122 :
2123 : /* SIGN byte */
2124 435036 : if (i < 0)
2125 : {
2126 103808 : AH->WriteBytePtr(AH, 1);
2127 103808 : i = -i;
2128 : }
2129 : else
2130 331228 : AH->WriteBytePtr(AH, 0);
2131 :
2132 2175180 : for (b = 0; b < AH->intSize; b++)
2133 : {
2134 1740144 : AH->WriteBytePtr(AH, i & 0xFF);
2135 1740144 : i >>= 8;
2136 : }
2137 :
2138 435036 : return AH->intSize + 1;
2139 : }
2140 :
2141 : int
2142 314978 : ReadInt(ArchiveHandle *AH)
2143 : {
2144 314978 : int res = 0;
2145 : int bv,
2146 : b;
2147 314978 : int sign = 0; /* Default positive */
2148 314978 : int bitShift = 0;
2149 :
2150 314978 : if (AH->version > K_VERS_1_0)
2151 : /* Read a sign byte */
2152 314978 : sign = AH->ReadBytePtr(AH);
2153 :
2154 1574890 : for (b = 0; b < AH->intSize; b++)
2155 : {
2156 1259912 : bv = AH->ReadBytePtr(AH) & 0xFF;
2157 1259912 : if (bv != 0)
2158 298538 : res = res + (bv << bitShift);
2159 1259912 : bitShift += 8;
2160 : }
2161 :
2162 314978 : if (sign)
2163 75048 : res = -res;
2164 :
2165 314978 : return res;
2166 : }
2167 :
2168 : size_t
2169 341098 : WriteStr(ArchiveHandle *AH, const char *c)
2170 : {
2171 : size_t res;
2172 :
2173 341098 : if (c)
2174 : {
2175 237290 : int len = strlen(c);
2176 :
2177 237290 : res = WriteInt(AH, len);
2178 237290 : AH->WriteBufPtr(AH, c, len);
2179 237290 : res += len;
2180 : }
2181 : else
2182 103808 : res = WriteInt(AH, -1);
2183 :
2184 341098 : return res;
2185 : }
2186 :
2187 : char *
2188 247218 : ReadStr(ArchiveHandle *AH)
2189 : {
2190 : char *buf;
2191 : int l;
2192 :
2193 247218 : l = ReadInt(AH);
2194 247218 : if (l < 0)
2195 75048 : buf = NULL;
2196 : else
2197 : {
2198 172170 : buf = (char *) pg_malloc(l + 1);
2199 172170 : AH->ReadBufPtr(AH, buf, l);
2200 :
2201 172170 : buf[l] = '\0';
2202 : }
2203 :
2204 247218 : return buf;
2205 : }
2206 :
2207 : static bool
2208 22 : _fileExistsInDirectory(const char *dir, const char *filename)
2209 : {
2210 : struct stat st;
2211 : char buf[MAXPGPATH];
2212 :
2213 22 : if (snprintf(buf, MAXPGPATH, "%s/%s", dir, filename) >= MAXPGPATH)
2214 0 : pg_fatal("directory name too long: \"%s\"", dir);
2215 :
2216 22 : return (stat(buf, &st) == 0 && S_ISREG(st.st_mode));
2217 : }
2218 :
2219 : static int
2220 84 : _discoverArchiveFormat(ArchiveHandle *AH)
2221 : {
2222 : FILE *fh;
2223 : char sig[6]; /* More than enough */
2224 : size_t cnt;
2225 84 : int wantClose = 0;
2226 :
2227 84 : pg_log_debug("attempting to ascertain archive format");
2228 :
2229 84 : free(AH->lookahead);
2230 :
2231 84 : AH->readHeader = 0;
2232 84 : AH->lookaheadSize = 512;
2233 84 : AH->lookahead = pg_malloc0(512);
2234 84 : AH->lookaheadLen = 0;
2235 84 : AH->lookaheadPos = 0;
2236 :
2237 84 : if (AH->fSpec)
2238 : {
2239 : struct stat st;
2240 :
2241 84 : wantClose = 1;
2242 :
2243 : /*
2244 : * Check if the specified archive is a directory. If so, check if
2245 : * there's a "toc.dat" (or "toc.dat.{gz,lz4,zst}") file in it.
2246 : */
2247 84 : if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
2248 : {
2249 22 : AH->format = archDirectory;
2250 22 : if (_fileExistsInDirectory(AH->fSpec, "toc.dat"))
2251 22 : return AH->format;
2252 : #ifdef HAVE_LIBZ
2253 0 : if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz"))
2254 0 : return AH->format;
2255 : #endif
2256 : #ifdef USE_LZ4
2257 0 : if (_fileExistsInDirectory(AH->fSpec, "toc.dat.lz4"))
2258 0 : return AH->format;
2259 : #endif
2260 : #ifdef USE_ZSTD
2261 : if (_fileExistsInDirectory(AH->fSpec, "toc.dat.zst"))
2262 : return AH->format;
2263 : #endif
2264 0 : pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
2265 : AH->fSpec);
2266 : fh = NULL; /* keep compiler quiet */
2267 : }
2268 : else
2269 : {
2270 62 : fh = fopen(AH->fSpec, PG_BINARY_R);
2271 62 : if (!fh)
2272 0 : pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
2273 : }
2274 : }
2275 : else
2276 : {
2277 0 : fh = stdin;
2278 0 : if (!fh)
2279 0 : pg_fatal("could not open input file: %m");
2280 : }
2281 :
2282 62 : if ((cnt = fread(sig, 1, 5, fh)) != 5)
2283 : {
2284 0 : if (ferror(fh))
2285 0 : pg_fatal("could not read input file: %m");
2286 : else
2287 0 : pg_fatal("input file is too short (read %lu, expected 5)",
2288 : (unsigned long) cnt);
2289 : }
2290 :
2291 : /* Save it, just in case we need it later */
2292 62 : memcpy(&AH->lookahead[0], sig, 5);
2293 62 : AH->lookaheadLen = 5;
2294 :
2295 62 : if (strncmp(sig, "PGDMP", 5) == 0)
2296 : {
2297 : /* It's custom format, stop here */
2298 60 : AH->format = archCustom;
2299 60 : AH->readHeader = 1;
2300 : }
2301 : else
2302 : {
2303 : /*
2304 : * *Maybe* we have a tar archive format file or a text dump ... So,
2305 : * read first 512 byte header...
2306 : */
2307 2 : cnt = fread(&AH->lookahead[AH->lookaheadLen], 1, 512 - AH->lookaheadLen, fh);
2308 : /* read failure is checked below */
2309 2 : AH->lookaheadLen += cnt;
2310 :
2311 2 : if (AH->lookaheadLen >= strlen(TEXT_DUMPALL_HEADER) &&
2312 2 : (strncmp(AH->lookahead, TEXT_DUMP_HEADER, strlen(TEXT_DUMP_HEADER)) == 0 ||
2313 2 : strncmp(AH->lookahead, TEXT_DUMPALL_HEADER, strlen(TEXT_DUMPALL_HEADER)) == 0))
2314 : {
2315 : /*
2316 : * looks like it's probably a text format dump. so suggest they
2317 : * try psql
2318 : */
2319 0 : pg_fatal("input file appears to be a text format dump. Please use psql.");
2320 : }
2321 :
2322 2 : if (AH->lookaheadLen != 512)
2323 : {
2324 0 : if (feof(fh))
2325 0 : pg_fatal("input file does not appear to be a valid archive (too short?)");
2326 : else
2327 0 : READ_ERROR_EXIT(fh);
2328 : }
2329 :
2330 2 : if (!isValidTarHeader(AH->lookahead))
2331 0 : pg_fatal("input file does not appear to be a valid archive");
2332 :
2333 2 : AH->format = archTar;
2334 : }
2335 :
2336 : /* Close the file if we opened it */
2337 62 : if (wantClose)
2338 : {
2339 62 : if (fclose(fh) != 0)
2340 0 : pg_fatal("could not close input file: %m");
2341 : /* Forget lookahead, since we'll re-read header after re-opening */
2342 62 : AH->readHeader = 0;
2343 62 : AH->lookaheadLen = 0;
2344 : }
2345 :
2346 62 : return AH->format;
2347 : }
2348 :
2349 :
2350 : /*
2351 : * Allocate an archive handle
2352 : */
2353 : static ArchiveHandle *
2354 498 : _allocAH(const char *FileSpec, const ArchiveFormat fmt,
2355 : const pg_compress_specification compression_spec,
2356 : bool dosync, ArchiveMode mode,
2357 : SetupWorkerPtrType setupWorkerPtr, DataDirSyncMethod sync_method)
2358 : {
2359 : ArchiveHandle *AH;
2360 : CompressFileHandle *CFH;
2361 498 : pg_compress_specification out_compress_spec = {0};
2362 :
2363 498 : pg_log_debug("allocating AH for %s, format %d",
2364 : FileSpec ? FileSpec : "(stdio)", fmt);
2365 :
2366 498 : AH = (ArchiveHandle *) pg_malloc0(sizeof(ArchiveHandle));
2367 :
2368 498 : AH->version = K_VERS_SELF;
2369 :
2370 : /* initialize for backwards compatible string processing */
2371 498 : AH->public.encoding = 0; /* PG_SQL_ASCII */
2372 498 : AH->public.std_strings = false;
2373 :
2374 : /* sql error handling */
2375 498 : AH->public.exit_on_error = true;
2376 498 : AH->public.n_errors = 0;
2377 :
2378 498 : AH->archiveDumpVersion = PG_VERSION;
2379 :
2380 498 : AH->createDate = time(NULL);
2381 :
2382 498 : AH->intSize = sizeof(int);
2383 498 : AH->offSize = sizeof(pgoff_t);
2384 498 : if (FileSpec)
2385 : {
2386 448 : AH->fSpec = pg_strdup(FileSpec);
2387 :
2388 : /*
2389 : * Not used; maybe later....
2390 : *
2391 : * AH->workDir = pg_strdup(FileSpec); for(i=strlen(FileSpec) ; i > 0 ;
2392 : * i--) if (AH->workDir[i-1] == '/')
2393 : */
2394 : }
2395 : else
2396 50 : AH->fSpec = NULL;
2397 :
2398 498 : AH->currUser = NULL; /* unknown */
2399 498 : AH->currSchema = NULL; /* ditto */
2400 498 : AH->currTablespace = NULL; /* ditto */
2401 498 : AH->currTableAm = NULL; /* ditto */
2402 :
2403 498 : AH->toc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2404 :
2405 498 : AH->toc->next = AH->toc;
2406 498 : AH->toc->prev = AH->toc;
2407 :
2408 498 : AH->mode = mode;
2409 498 : AH->compression_spec = compression_spec;
2410 498 : AH->dosync = dosync;
2411 498 : AH->sync_method = sync_method;
2412 :
2413 498 : memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
2414 :
2415 : /* Open stdout with no compression for AH output handle */
2416 498 : out_compress_spec.algorithm = PG_COMPRESSION_NONE;
2417 498 : CFH = InitCompressFileHandle(out_compress_spec);
2418 498 : if (!CFH->open_func(NULL, fileno(stdout), PG_BINARY_A, CFH))
2419 0 : pg_fatal("could not open stdout for appending: %m");
2420 498 : AH->OF = CFH;
2421 :
2422 : /*
2423 : * On Windows, we need to use binary mode to read/write non-text files,
2424 : * which include all archive formats as well as compressed plain text.
2425 : * Force stdin/stdout into binary mode if that is what we are using.
2426 : */
2427 : #ifdef WIN32
2428 : if ((fmt != archNull || compression_spec.algorithm != PG_COMPRESSION_NONE) &&
2429 : (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
2430 : {
2431 : if (mode == archModeWrite)
2432 : _setmode(fileno(stdout), O_BINARY);
2433 : else
2434 : _setmode(fileno(stdin), O_BINARY);
2435 : }
2436 : #endif
2437 :
2438 498 : AH->SetupWorkerPtr = setupWorkerPtr;
2439 :
2440 498 : if (fmt == archUnknown)
2441 84 : AH->format = _discoverArchiveFormat(AH);
2442 : else
2443 414 : AH->format = fmt;
2444 :
2445 498 : switch (AH->format)
2446 : {
2447 148 : case archCustom:
2448 148 : InitArchiveFmt_Custom(AH);
2449 148 : break;
2450 :
2451 296 : case archNull:
2452 296 : InitArchiveFmt_Null(AH);
2453 296 : break;
2454 :
2455 44 : case archDirectory:
2456 44 : InitArchiveFmt_Directory(AH);
2457 44 : break;
2458 :
2459 10 : case archTar:
2460 10 : InitArchiveFmt_Tar(AH);
2461 8 : break;
2462 :
2463 0 : default:
2464 0 : pg_fatal("unrecognized file format \"%d\"", fmt);
2465 : }
2466 :
2467 496 : return AH;
2468 : }
2469 :
2470 : /*
2471 : * Write out all data (tables & LOs)
2472 : */
2473 : void
2474 96 : WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
2475 : {
2476 : TocEntry *te;
2477 :
2478 96 : if (pstate && pstate->numWorkers > 1)
2479 16 : {
2480 : /*
2481 : * In parallel mode, this code runs in the leader process. We
2482 : * construct an array of candidate TEs, then sort it into decreasing
2483 : * size order, then dispatch each TE to a data-transfer worker. By
2484 : * dumping larger tables first, we avoid getting into a situation
2485 : * where we're down to one job and it's big, losing parallelism.
2486 : */
2487 : TocEntry **tes;
2488 : int ntes;
2489 :
2490 16 : tes = (TocEntry **) pg_malloc(AH->tocCount * sizeof(TocEntry *));
2491 16 : ntes = 0;
2492 2552 : for (te = AH->toc->next; te != AH->toc; te = te->next)
2493 : {
2494 : /* Consider only TEs with dataDumper functions ... */
2495 2536 : if (!te->dataDumper)
2496 2294 : continue;
2497 : /* ... and ignore ones not enabled for dump */
2498 242 : if ((te->reqs & REQ_DATA) == 0)
2499 0 : continue;
2500 :
2501 242 : tes[ntes++] = te;
2502 : }
2503 :
2504 16 : if (ntes > 1)
2505 14 : qsort(tes, ntes, sizeof(TocEntry *), TocEntrySizeCompareQsort);
2506 :
2507 258 : for (int i = 0; i < ntes; i++)
2508 242 : DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP,
2509 : mark_dump_job_done, NULL);
2510 :
2511 16 : pg_free(tes);
2512 :
2513 : /* Now wait for workers to finish. */
2514 16 : WaitForWorkers(AH, pstate, WFW_ALL_IDLE);
2515 : }
2516 : else
2517 : {
2518 : /* Non-parallel mode: just dump all candidate TEs sequentially. */
2519 11114 : for (te = AH->toc->next; te != AH->toc; te = te->next)
2520 : {
2521 : /* Must have same filter conditions as above */
2522 11034 : if (!te->dataDumper)
2523 10656 : continue;
2524 378 : if ((te->reqs & REQ_DATA) == 0)
2525 6 : continue;
2526 :
2527 372 : WriteDataChunksForTocEntry(AH, te);
2528 : }
2529 : }
2530 96 : }
2531 :
2532 :
2533 : /*
2534 : * Callback function that's invoked in the leader process after a step has
2535 : * been parallel dumped.
2536 : *
2537 : * We don't need to do anything except check for worker failure.
2538 : */
2539 : static void
2540 242 : mark_dump_job_done(ArchiveHandle *AH,
2541 : TocEntry *te,
2542 : int status,
2543 : void *callback_data)
2544 : {
2545 242 : pg_log_info("finished item %d %s %s",
2546 : te->dumpId, te->desc, te->tag);
2547 :
2548 242 : if (status != 0)
2549 0 : pg_fatal("worker process failed: exit code %d",
2550 : status);
2551 242 : }
2552 :
2553 :
2554 : void
2555 614 : WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
2556 : {
2557 : StartDataPtrType startPtr;
2558 : EndDataPtrType endPtr;
2559 :
2560 614 : AH->currToc = te;
2561 :
2562 614 : if (strcmp(te->desc, "BLOBS") == 0)
2563 : {
2564 32 : startPtr = AH->StartLOsPtr;
2565 32 : endPtr = AH->EndLOsPtr;
2566 : }
2567 : else
2568 : {
2569 582 : startPtr = AH->StartDataPtr;
2570 582 : endPtr = AH->EndDataPtr;
2571 : }
2572 :
2573 614 : if (startPtr != NULL)
2574 614 : (*startPtr) (AH, te);
2575 :
2576 : /*
2577 : * The user-provided DataDumper routine needs to call AH->WriteData
2578 : */
2579 614 : te->dataDumper((Archive *) AH, te->dataDumperArg);
2580 :
2581 614 : if (endPtr != NULL)
2582 614 : (*endPtr) (AH, te);
2583 :
2584 614 : AH->currToc = NULL;
2585 614 : }
2586 :
2587 : void
2588 168 : WriteToc(ArchiveHandle *AH)
2589 : {
2590 : TocEntry *te;
2591 : char workbuf[32];
2592 : int tocCount;
2593 : int i;
2594 :
2595 : /* count entries that will actually be dumped */
2596 168 : tocCount = 0;
2597 23260 : for (te = AH->toc->next; te != AH->toc; te = te->next)
2598 : {
2599 23092 : if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS | REQ_SPECIAL)) != 0)
2600 23080 : tocCount++;
2601 : }
2602 :
2603 : /* printf("%d TOC Entries to save\n", tocCount); */
2604 :
2605 168 : WriteInt(AH, tocCount);
2606 :
2607 23260 : for (te = AH->toc->next; te != AH->toc; te = te->next)
2608 : {
2609 23092 : if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS | REQ_SPECIAL)) == 0)
2610 12 : continue;
2611 :
2612 23080 : WriteInt(AH, te->dumpId);
2613 23080 : WriteInt(AH, te->dataDumper ? 1 : 0);
2614 :
2615 : /* OID is recorded as a string for historical reasons */
2616 23080 : sprintf(workbuf, "%u", te->catalogId.tableoid);
2617 23080 : WriteStr(AH, workbuf);
2618 23080 : sprintf(workbuf, "%u", te->catalogId.oid);
2619 23080 : WriteStr(AH, workbuf);
2620 :
2621 23080 : WriteStr(AH, te->tag);
2622 23080 : WriteStr(AH, te->desc);
2623 23080 : WriteInt(AH, te->section);
2624 23080 : WriteStr(AH, te->defn);
2625 23080 : WriteStr(AH, te->dropStmt);
2626 23080 : WriteStr(AH, te->copyStmt);
2627 23080 : WriteStr(AH, te->namespace);
2628 23080 : WriteStr(AH, te->tablespace);
2629 23080 : WriteStr(AH, te->tableam);
2630 23080 : WriteInt(AH, te->relkind);
2631 23080 : WriteStr(AH, te->owner);
2632 23080 : WriteStr(AH, "false");
2633 :
2634 : /* Dump list of dependencies */
2635 59802 : for (i = 0; i < te->nDeps; i++)
2636 : {
2637 36722 : sprintf(workbuf, "%d", te->dependencies[i]);
2638 36722 : WriteStr(AH, workbuf);
2639 : }
2640 23080 : WriteStr(AH, NULL); /* Terminate List */
2641 :
2642 23080 : if (AH->WriteExtraTocPtr)
2643 23080 : AH->WriteExtraTocPtr(AH, te);
2644 : }
2645 168 : }
2646 :
2647 : void
2648 104 : ReadToc(ArchiveHandle *AH)
2649 : {
2650 : int i;
2651 : char *tmp;
2652 : DumpId *deps;
2653 : int depIdx;
2654 : int depSize;
2655 : TocEntry *te;
2656 : bool is_supported;
2657 :
2658 104 : AH->tocCount = ReadInt(AH);
2659 104 : AH->maxDumpId = 0;
2660 :
2661 16652 : for (i = 0; i < AH->tocCount; i++)
2662 : {
2663 16548 : te = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2664 16548 : te->dumpId = ReadInt(AH);
2665 :
2666 16548 : if (te->dumpId > AH->maxDumpId)
2667 6770 : AH->maxDumpId = te->dumpId;
2668 :
2669 : /* Sanity check */
2670 16548 : if (te->dumpId <= 0)
2671 0 : pg_fatal("entry ID %d out of range -- perhaps a corrupt TOC",
2672 : te->dumpId);
2673 :
2674 16548 : te->hadDumper = ReadInt(AH);
2675 :
2676 16548 : if (AH->version >= K_VERS_1_8)
2677 : {
2678 16548 : tmp = ReadStr(AH);
2679 16548 : sscanf(tmp, "%u", &te->catalogId.tableoid);
2680 16548 : free(tmp);
2681 : }
2682 : else
2683 0 : te->catalogId.tableoid = InvalidOid;
2684 16548 : tmp = ReadStr(AH);
2685 16548 : sscanf(tmp, "%u", &te->catalogId.oid);
2686 16548 : free(tmp);
2687 :
2688 16548 : te->tag = ReadStr(AH);
2689 16548 : te->desc = ReadStr(AH);
2690 :
2691 16548 : if (AH->version >= K_VERS_1_11)
2692 : {
2693 16548 : te->section = ReadInt(AH);
2694 : }
2695 : else
2696 : {
2697 : /*
2698 : * Rules for pre-8.4 archives wherein pg_dump hasn't classified
2699 : * the entries into sections. This list need not cover entry
2700 : * types added later than 8.4.
2701 : */
2702 0 : if (strcmp(te->desc, "COMMENT") == 0 ||
2703 0 : strcmp(te->desc, "ACL") == 0 ||
2704 0 : strcmp(te->desc, "ACL LANGUAGE") == 0)
2705 0 : te->section = SECTION_NONE;
2706 0 : else if (strcmp(te->desc, "TABLE DATA") == 0 ||
2707 0 : strcmp(te->desc, "BLOBS") == 0 ||
2708 0 : strcmp(te->desc, "BLOB COMMENTS") == 0)
2709 0 : te->section = SECTION_DATA;
2710 0 : else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
2711 0 : strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
2712 0 : strcmp(te->desc, "FK CONSTRAINT") == 0 ||
2713 0 : strcmp(te->desc, "INDEX") == 0 ||
2714 0 : strcmp(te->desc, "RULE") == 0 ||
2715 0 : strcmp(te->desc, "TRIGGER") == 0)
2716 0 : te->section = SECTION_POST_DATA;
2717 : else
2718 0 : te->section = SECTION_PRE_DATA;
2719 : }
2720 :
2721 16548 : te->defn = ReadStr(AH);
2722 16548 : te->dropStmt = ReadStr(AH);
2723 :
2724 16548 : if (AH->version >= K_VERS_1_3)
2725 16548 : te->copyStmt = ReadStr(AH);
2726 :
2727 16548 : if (AH->version >= K_VERS_1_6)
2728 16548 : te->namespace = ReadStr(AH);
2729 :
2730 16548 : if (AH->version >= K_VERS_1_10)
2731 16548 : te->tablespace = ReadStr(AH);
2732 :
2733 16548 : if (AH->version >= K_VERS_1_14)
2734 16548 : te->tableam = ReadStr(AH);
2735 :
2736 16548 : if (AH->version >= K_VERS_1_16)
2737 16548 : te->relkind = ReadInt(AH);
2738 :
2739 16548 : te->owner = ReadStr(AH);
2740 16548 : is_supported = true;
2741 16548 : if (AH->version < K_VERS_1_9)
2742 0 : is_supported = false;
2743 : else
2744 : {
2745 16548 : tmp = ReadStr(AH);
2746 :
2747 16548 : if (strcmp(tmp, "true") == 0)
2748 0 : is_supported = false;
2749 :
2750 16548 : free(tmp);
2751 : }
2752 :
2753 16548 : if (!is_supported)
2754 0 : pg_log_warning("restoring tables WITH OIDS is not supported anymore");
2755 :
2756 : /* Read TOC entry dependencies */
2757 16548 : if (AH->version >= K_VERS_1_5)
2758 : {
2759 16548 : depSize = 100;
2760 16548 : deps = (DumpId *) pg_malloc(sizeof(DumpId) * depSize);
2761 16548 : depIdx = 0;
2762 : for (;;)
2763 : {
2764 43562 : tmp = ReadStr(AH);
2765 43562 : if (!tmp)
2766 16548 : break; /* end of list */
2767 27014 : if (depIdx >= depSize)
2768 : {
2769 0 : depSize *= 2;
2770 0 : deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depSize);
2771 : }
2772 27014 : sscanf(tmp, "%d", &deps[depIdx]);
2773 27014 : free(tmp);
2774 27014 : depIdx++;
2775 : }
2776 :
2777 16548 : if (depIdx > 0) /* We have a non-null entry */
2778 : {
2779 13910 : deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depIdx);
2780 13910 : te->dependencies = deps;
2781 13910 : te->nDeps = depIdx;
2782 : }
2783 : else
2784 : {
2785 2638 : free(deps);
2786 2638 : te->dependencies = NULL;
2787 2638 : te->nDeps = 0;
2788 : }
2789 : }
2790 : else
2791 : {
2792 0 : te->dependencies = NULL;
2793 0 : te->nDeps = 0;
2794 : }
2795 16548 : te->dataLength = 0;
2796 :
2797 16548 : if (AH->ReadExtraTocPtr)
2798 16548 : AH->ReadExtraTocPtr(AH, te);
2799 :
2800 16548 : pg_log_debug("read TOC entry %d (ID %d) for %s %s",
2801 : i, te->dumpId, te->desc, te->tag);
2802 :
2803 : /* link completed entry into TOC circular list */
2804 16548 : te->prev = AH->toc->prev;
2805 16548 : AH->toc->prev->next = te;
2806 16548 : AH->toc->prev = te;
2807 16548 : te->next = AH->toc;
2808 :
2809 : /* special processing immediately upon read for some items */
2810 16548 : if (strcmp(te->desc, "ENCODING") == 0)
2811 104 : processEncodingEntry(AH, te);
2812 16444 : else if (strcmp(te->desc, "STDSTRINGS") == 0)
2813 104 : processStdStringsEntry(AH, te);
2814 16340 : else if (strcmp(te->desc, "SEARCHPATH") == 0)
2815 104 : processSearchPathEntry(AH, te);
2816 : }
2817 104 : }
2818 :
2819 : static void
2820 104 : processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
2821 : {
2822 : /* te->defn should have the form SET client_encoding = 'foo'; */
2823 104 : char *defn = pg_strdup(te->defn);
2824 : char *ptr1;
2825 104 : char *ptr2 = NULL;
2826 : int encoding;
2827 :
2828 104 : ptr1 = strchr(defn, '\'');
2829 104 : if (ptr1)
2830 104 : ptr2 = strchr(++ptr1, '\'');
2831 104 : if (ptr2)
2832 : {
2833 104 : *ptr2 = '\0';
2834 104 : encoding = pg_char_to_encoding(ptr1);
2835 104 : if (encoding < 0)
2836 0 : pg_fatal("unrecognized encoding \"%s\"",
2837 : ptr1);
2838 104 : AH->public.encoding = encoding;
2839 104 : setFmtEncoding(encoding);
2840 : }
2841 : else
2842 0 : pg_fatal("invalid ENCODING item: %s",
2843 : te->defn);
2844 :
2845 104 : free(defn);
2846 104 : }
2847 :
2848 : static void
2849 104 : processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
2850 : {
2851 : /* te->defn should have the form SET standard_conforming_strings = 'x'; */
2852 : char *ptr1;
2853 :
2854 104 : ptr1 = strchr(te->defn, '\'');
2855 104 : if (ptr1 && strncmp(ptr1, "'on'", 4) == 0)
2856 104 : AH->public.std_strings = true;
2857 0 : else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0)
2858 0 : AH->public.std_strings = false;
2859 : else
2860 0 : pg_fatal("invalid STDSTRINGS item: %s",
2861 : te->defn);
2862 104 : }
2863 :
2864 : static void
2865 104 : processSearchPathEntry(ArchiveHandle *AH, TocEntry *te)
2866 : {
2867 : /*
2868 : * te->defn should contain a command to set search_path. We just copy it
2869 : * verbatim for use later.
2870 : */
2871 104 : AH->public.searchpath = pg_strdup(te->defn);
2872 104 : }
2873 :
2874 : static void
2875 0 : StrictNamesCheck(RestoreOptions *ropt)
2876 : {
2877 : const char *missing_name;
2878 :
2879 : Assert(ropt->strict_names);
2880 :
2881 0 : if (ropt->schemaNames.head != NULL)
2882 : {
2883 0 : missing_name = simple_string_list_not_touched(&ropt->schemaNames);
2884 0 : if (missing_name != NULL)
2885 0 : pg_fatal("schema \"%s\" not found", missing_name);
2886 : }
2887 :
2888 0 : if (ropt->tableNames.head != NULL)
2889 : {
2890 0 : missing_name = simple_string_list_not_touched(&ropt->tableNames);
2891 0 : if (missing_name != NULL)
2892 0 : pg_fatal("table \"%s\" not found", missing_name);
2893 : }
2894 :
2895 0 : if (ropt->indexNames.head != NULL)
2896 : {
2897 0 : missing_name = simple_string_list_not_touched(&ropt->indexNames);
2898 0 : if (missing_name != NULL)
2899 0 : pg_fatal("index \"%s\" not found", missing_name);
2900 : }
2901 :
2902 0 : if (ropt->functionNames.head != NULL)
2903 : {
2904 0 : missing_name = simple_string_list_not_touched(&ropt->functionNames);
2905 0 : if (missing_name != NULL)
2906 0 : pg_fatal("function \"%s\" not found", missing_name);
2907 : }
2908 :
2909 0 : if (ropt->triggerNames.head != NULL)
2910 : {
2911 0 : missing_name = simple_string_list_not_touched(&ropt->triggerNames);
2912 0 : if (missing_name != NULL)
2913 0 : pg_fatal("trigger \"%s\" not found", missing_name);
2914 : }
2915 0 : }
2916 :
2917 : /*
2918 : * Determine whether we want to restore this TOC entry.
2919 : *
2920 : * Returns 0 if entry should be skipped, or some combination of the
2921 : * REQ_SCHEMA, REQ_DATA, and REQ_STATS bits if we want to restore schema, data
2922 : * and/or statistics portions of this TOC entry, or REQ_SPECIAL if it's a
2923 : * special entry.
2924 : */
2925 : static int
2926 103730 : _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH)
2927 : {
2928 103730 : int res = REQ_SCHEMA | REQ_DATA;
2929 103730 : RestoreOptions *ropt = AH->public.ropt;
2930 :
2931 : /* These items are treated specially */
2932 103730 : if (strcmp(te->desc, "ENCODING") == 0 ||
2933 103272 : strcmp(te->desc, "STDSTRINGS") == 0 ||
2934 102814 : strcmp(te->desc, "SEARCHPATH") == 0)
2935 1374 : return REQ_SPECIAL;
2936 :
2937 102356 : if (strcmp(te->desc, "STATISTICS DATA") == 0)
2938 : {
2939 16030 : if (!ropt->dumpStatistics)
2940 0 : return 0;
2941 :
2942 16030 : res = REQ_STATS;
2943 : }
2944 :
2945 : /*
2946 : * DATABASE and DATABASE PROPERTIES also have a special rule: they are
2947 : * restored in createDB mode, and not restored otherwise, independently of
2948 : * all else.
2949 : */
2950 102356 : if (strcmp(te->desc, "DATABASE") == 0 ||
2951 102098 : strcmp(te->desc, "DATABASE PROPERTIES") == 0)
2952 : {
2953 376 : if (ropt->createDB)
2954 320 : return REQ_SCHEMA;
2955 : else
2956 56 : return 0;
2957 : }
2958 :
2959 : /*
2960 : * Process exclusions that affect certain classes of TOC entries.
2961 : */
2962 :
2963 : /* If it's an ACL, maybe ignore it */
2964 101980 : if (ropt->aclsSkip && _tocEntryIsACL(te))
2965 0 : return 0;
2966 :
2967 : /* If it's a comment, maybe ignore it */
2968 101980 : if (ropt->no_comments && strcmp(te->desc, "COMMENT") == 0)
2969 0 : return 0;
2970 :
2971 : /* If it's a policy, maybe ignore it */
2972 101980 : if (ropt->no_policies &&
2973 682 : (strcmp(te->desc, "POLICY") == 0 ||
2974 682 : strcmp(te->desc, "ROW SECURITY") == 0))
2975 0 : return 0;
2976 :
2977 : /*
2978 : * If it's a publication or a table part of a publication, maybe ignore
2979 : * it.
2980 : */
2981 101980 : if (ropt->no_publications &&
2982 0 : (strcmp(te->desc, "PUBLICATION") == 0 ||
2983 0 : strcmp(te->desc, "PUBLICATION TABLE") == 0 ||
2984 0 : strcmp(te->desc, "PUBLICATION TABLES IN SCHEMA") == 0))
2985 0 : return 0;
2986 :
2987 : /* If it's a security label, maybe ignore it */
2988 101980 : if (ropt->no_security_labels && strcmp(te->desc, "SECURITY LABEL") == 0)
2989 0 : return 0;
2990 :
2991 : /* If it's a subscription, maybe ignore it */
2992 101980 : if (ropt->no_subscriptions && strcmp(te->desc, "SUBSCRIPTION") == 0)
2993 0 : return 0;
2994 :
2995 : /* Ignore it if section is not to be dumped/restored */
2996 101980 : switch (curSection)
2997 : {
2998 60980 : case SECTION_PRE_DATA:
2999 60980 : if (!(ropt->dumpSections & DUMP_PRE_DATA))
3000 708 : return 0;
3001 60272 : break;
3002 20412 : case SECTION_DATA:
3003 20412 : if (!(ropt->dumpSections & DUMP_DATA))
3004 320 : return 0;
3005 20092 : break;
3006 20588 : case SECTION_POST_DATA:
3007 20588 : if (!(ropt->dumpSections & DUMP_POST_DATA))
3008 444 : return 0;
3009 20144 : break;
3010 0 : default:
3011 : /* shouldn't get here, really, but ignore it */
3012 0 : return 0;
3013 : }
3014 :
3015 : /* Ignore it if rejected by idWanted[] (cf. SortTocFromFile) */
3016 100508 : if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1])
3017 0 : return 0;
3018 :
3019 : /*
3020 : * Check options for selective dump/restore.
3021 : */
3022 100508 : if (strcmp(te->desc, "ACL") == 0 ||
3023 95332 : strcmp(te->desc, "COMMENT") == 0 ||
3024 81898 : strcmp(te->desc, "STATISTICS DATA") == 0 ||
3025 66136 : strcmp(te->desc, "SECURITY LABEL") == 0)
3026 : {
3027 : /* Database properties react to createDB, not selectivity options. */
3028 34372 : if (strncmp(te->tag, "DATABASE ", 9) == 0)
3029 : {
3030 184 : if (!ropt->createDB)
3031 38 : return 0;
3032 : }
3033 34188 : else if (ropt->schemaNames.head != NULL ||
3034 34176 : ropt->schemaExcludeNames.head != NULL ||
3035 34164 : ropt->selTypes)
3036 : {
3037 : /*
3038 : * In a selective dump/restore, we want to restore these dependent
3039 : * TOC entry types only if their parent object is being restored.
3040 : * Without selectivity options, we let through everything in the
3041 : * archive. Note there may be such entries with no parent, eg
3042 : * non-default ACLs for built-in objects. Also, we make
3043 : * per-column ACLs additionally depend on the table's ACL if any
3044 : * to ensure correct restore order, so those dependencies should
3045 : * be ignored in this check.
3046 : *
3047 : * This code depends on the parent having been marked already,
3048 : * which should be the case; if it isn't, perhaps due to
3049 : * SortTocFromFile rearrangement, skipping the dependent entry
3050 : * seems prudent anyway.
3051 : *
3052 : * Ideally we'd handle, eg, table CHECK constraints this way too.
3053 : * But it's hard to tell which of their dependencies is the one to
3054 : * consult.
3055 : */
3056 76 : bool dumpthis = false;
3057 :
3058 196 : for (int i = 0; i < te->nDeps; i++)
3059 : {
3060 136 : TocEntry *pte = getTocEntryByDumpId(AH, te->dependencies[i]);
3061 :
3062 136 : if (!pte)
3063 60 : continue; /* probably shouldn't happen */
3064 76 : if (strcmp(pte->desc, "ACL") == 0)
3065 0 : continue; /* ignore dependency on another ACL */
3066 76 : if (pte->reqs == 0)
3067 60 : continue; /* this object isn't marked, so ignore it */
3068 : /* Found a parent to be dumped, so we want to dump this too */
3069 16 : dumpthis = true;
3070 16 : break;
3071 : }
3072 76 : if (!dumpthis)
3073 60 : return 0;
3074 : }
3075 : }
3076 : else
3077 : {
3078 : /* Apply selective-restore rules for standalone TOC entries. */
3079 66136 : if (ropt->schemaNames.head != NULL)
3080 : {
3081 : /* If no namespace is specified, it means all. */
3082 40 : if (!te->namespace)
3083 4 : return 0;
3084 36 : if (!simple_string_list_member(&ropt->schemaNames, te->namespace))
3085 28 : return 0;
3086 : }
3087 :
3088 66104 : if (ropt->schemaExcludeNames.head != NULL &&
3089 76 : te->namespace &&
3090 36 : simple_string_list_member(&ropt->schemaExcludeNames, te->namespace))
3091 8 : return 0;
3092 :
3093 66096 : if (ropt->selTypes)
3094 : {
3095 156 : if (strcmp(te->desc, "TABLE") == 0 ||
3096 116 : strcmp(te->desc, "TABLE DATA") == 0 ||
3097 76 : strcmp(te->desc, "VIEW") == 0 ||
3098 76 : strcmp(te->desc, "FOREIGN TABLE") == 0 ||
3099 76 : strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
3100 76 : strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0 ||
3101 76 : strcmp(te->desc, "SEQUENCE") == 0 ||
3102 70 : strcmp(te->desc, "SEQUENCE SET") == 0)
3103 : {
3104 92 : if (!ropt->selTable)
3105 60 : return 0;
3106 32 : if (ropt->tableNames.head != NULL &&
3107 32 : !simple_string_list_member(&ropt->tableNames, te->tag))
3108 28 : return 0;
3109 : }
3110 64 : else if (strcmp(te->desc, "INDEX") == 0)
3111 : {
3112 12 : if (!ropt->selIndex)
3113 8 : return 0;
3114 4 : if (ropt->indexNames.head != NULL &&
3115 4 : !simple_string_list_member(&ropt->indexNames, te->tag))
3116 2 : return 0;
3117 : }
3118 52 : else if (strcmp(te->desc, "FUNCTION") == 0 ||
3119 28 : strcmp(te->desc, "AGGREGATE") == 0 ||
3120 28 : strcmp(te->desc, "PROCEDURE") == 0)
3121 : {
3122 24 : if (!ropt->selFunction)
3123 8 : return 0;
3124 16 : if (ropt->functionNames.head != NULL &&
3125 16 : !simple_string_list_member(&ropt->functionNames, te->tag))
3126 12 : return 0;
3127 : }
3128 28 : else if (strcmp(te->desc, "TRIGGER") == 0)
3129 : {
3130 12 : if (!ropt->selTrigger)
3131 8 : return 0;
3132 4 : if (ropt->triggerNames.head != NULL &&
3133 4 : !simple_string_list_member(&ropt->triggerNames, te->tag))
3134 2 : return 0;
3135 : }
3136 : else
3137 16 : return 0;
3138 : }
3139 : }
3140 :
3141 :
3142 : /*
3143 : * Determine whether the TOC entry contains schema and/or data components,
3144 : * and mask off inapplicable REQ bits. If it had a dataDumper, assume
3145 : * it's both schema and data. Otherwise it's probably schema-only, but
3146 : * there are exceptions.
3147 : */
3148 100226 : if (!te->hadDumper)
3149 : {
3150 : /*
3151 : * Special Case: If 'SEQUENCE SET' or anything to do with LOs, then it
3152 : * is considered a data entry. We don't need to check for BLOBS or
3153 : * old-style BLOB COMMENTS entries, because they will have hadDumper =
3154 : * true ... but we do need to check new-style BLOB ACLs, comments,
3155 : * etc.
3156 : */
3157 91432 : if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
3158 90444 : strcmp(te->desc, "BLOB") == 0 ||
3159 90444 : strcmp(te->desc, "BLOB METADATA") == 0 ||
3160 90240 : (strcmp(te->desc, "ACL") == 0 &&
3161 5176 : strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3162 90146 : (strcmp(te->desc, "COMMENT") == 0 &&
3163 13396 : strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3164 90026 : (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3165 0 : strncmp(te->tag, "LARGE OBJECT", 12) == 0))
3166 1406 : res = res & REQ_DATA;
3167 : else
3168 90026 : res = res & ~REQ_DATA;
3169 : }
3170 :
3171 : /*
3172 : * If there's no definition command, there's no schema component. Treat
3173 : * "load via partition root" comments as not schema.
3174 : */
3175 100226 : if (!te->defn || !te->defn[0] ||
3176 91456 : strncmp(te->defn, "-- load via partition root ", 27) == 0)
3177 8794 : res = res & ~REQ_SCHEMA;
3178 :
3179 : /*
3180 : * Special case: <Init> type with <Max OID> tag; this is obsolete and we
3181 : * always ignore it.
3182 : */
3183 100226 : if ((strcmp(te->desc, "<Init>") == 0) && (strcmp(te->tag, "Max OID") == 0))
3184 0 : return 0;
3185 :
3186 : /* Mask it if we don't want data */
3187 100226 : if (!ropt->dumpData)
3188 : {
3189 : /*
3190 : * The sequence_data option overrides dumpData for SEQUENCE SET.
3191 : *
3192 : * In binary-upgrade mode, even with dumpData unset, we do not mask
3193 : * out large objects. (Only large object definitions, comments and
3194 : * other metadata should be generated in binary-upgrade mode, not the
3195 : * actual data, but that need not concern us here.)
3196 : */
3197 8374 : if (!(ropt->sequence_data && strcmp(te->desc, "SEQUENCE SET") == 0) &&
3198 8244 : !(ropt->binary_upgrade &&
3199 6890 : (strcmp(te->desc, "BLOB") == 0 ||
3200 6890 : strcmp(te->desc, "BLOB METADATA") == 0 ||
3201 6884 : (strcmp(te->desc, "ACL") == 0 &&
3202 178 : strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3203 6882 : (strcmp(te->desc, "COMMENT") == 0 &&
3204 166 : strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3205 6876 : (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3206 0 : strncmp(te->tag, "LARGE OBJECT", 12) == 0))))
3207 8230 : res = res & (REQ_SCHEMA | REQ_STATS);
3208 : }
3209 :
3210 : /* Mask it if we don't want schema */
3211 100226 : if (!ropt->dumpSchema)
3212 776 : res = res & (REQ_DATA | REQ_STATS);
3213 :
3214 100226 : return res;
3215 : }
3216 :
3217 : /*
3218 : * Identify which pass we should restore this TOC entry in.
3219 : *
3220 : * See notes with the RestorePass typedef in pg_backup_archiver.h.
3221 : */
3222 : static RestorePass
3223 222880 : _tocEntryRestorePass(ArchiveHandle *AH, TocEntry *te)
3224 : {
3225 : /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3226 222880 : if (strcmp(te->desc, "ACL") == 0 ||
3227 211766 : strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3228 211766 : strcmp(te->desc, "DEFAULT ACL") == 0)
3229 11918 : return RESTORE_PASS_ACL;
3230 210962 : if (strcmp(te->desc, "EVENT TRIGGER") == 0 ||
3231 210730 : strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0)
3232 1944 : return RESTORE_PASS_POST_ACL;
3233 :
3234 : /*
3235 : * Comments need to be emitted in the same pass as their parent objects.
3236 : * ACLs haven't got comments, and neither do matview data objects, but
3237 : * event triggers do. (Fortunately, event triggers haven't got ACLs, or
3238 : * we'd need yet another weird special case.)
3239 : */
3240 209018 : if (strcmp(te->desc, "COMMENT") == 0 &&
3241 26898 : strncmp(te->tag, "EVENT TRIGGER ", 14) == 0)
3242 0 : return RESTORE_PASS_POST_ACL;
3243 :
3244 : /*
3245 : * If statistics data is dependent on materialized view data, it must be
3246 : * deferred to RESTORE_PASS_POST_ACL.
3247 : */
3248 209018 : if (strcmp(te->desc, "STATISTICS DATA") == 0)
3249 : {
3250 105754 : for (int i = 0; i < te->nDeps; i++)
3251 : {
3252 73802 : DumpId depid = te->dependencies[i];
3253 :
3254 73802 : if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
3255 : {
3256 34508 : TocEntry *otherte = AH->tocsByDumpId[depid];
3257 :
3258 34508 : if (strcmp(otherte->desc, "MATERIALIZED VIEW DATA") == 0)
3259 1616 : return RESTORE_PASS_POST_ACL;
3260 : }
3261 : }
3262 : }
3263 :
3264 : /* All else can be handled in the main pass. */
3265 207402 : return RESTORE_PASS_MAIN;
3266 : }
3267 :
3268 : /*
3269 : * Identify TOC entries that are ACLs.
3270 : *
3271 : * Note: it seems worth duplicating some code here to avoid a hard-wired
3272 : * assumption that these are exactly the same entries that we restore during
3273 : * the RESTORE_PASS_ACL phase.
3274 : */
3275 : static bool
3276 85768 : _tocEntryIsACL(TocEntry *te)
3277 : {
3278 : /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3279 85768 : if (strcmp(te->desc, "ACL") == 0 ||
3280 81910 : strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3281 81910 : strcmp(te->desc, "DEFAULT ACL") == 0)
3282 4126 : return true;
3283 81642 : return false;
3284 : }
3285 :
3286 : /*
3287 : * Issue SET commands for parameters that we want to have set the same way
3288 : * at all times during execution of a restore script.
3289 : */
3290 : static void
3291 554 : _doSetFixedOutputState(ArchiveHandle *AH)
3292 : {
3293 554 : RestoreOptions *ropt = AH->public.ropt;
3294 :
3295 : /*
3296 : * Disable timeouts to allow for slow commands, idle parallel workers, etc
3297 : */
3298 554 : ahprintf(AH, "SET statement_timeout = 0;\n");
3299 554 : ahprintf(AH, "SET lock_timeout = 0;\n");
3300 554 : ahprintf(AH, "SET idle_in_transaction_session_timeout = 0;\n");
3301 554 : ahprintf(AH, "SET transaction_timeout = 0;\n");
3302 :
3303 : /* Select the correct character set encoding */
3304 554 : ahprintf(AH, "SET client_encoding = '%s';\n",
3305 : pg_encoding_to_char(AH->public.encoding));
3306 :
3307 : /* Select the correct string literal syntax */
3308 554 : ahprintf(AH, "SET standard_conforming_strings = %s;\n",
3309 554 : AH->public.std_strings ? "on" : "off");
3310 :
3311 : /* Select the role to be used during restore */
3312 554 : if (ropt && ropt->use_role)
3313 0 : ahprintf(AH, "SET ROLE %s;\n", fmtId(ropt->use_role));
3314 :
3315 : /* Select the dump-time search_path */
3316 554 : if (AH->public.searchpath)
3317 554 : ahprintf(AH, "%s", AH->public.searchpath);
3318 :
3319 : /* Make sure function checking is disabled */
3320 554 : ahprintf(AH, "SET check_function_bodies = false;\n");
3321 :
3322 : /* Ensure that all valid XML data will be accepted */
3323 554 : ahprintf(AH, "SET xmloption = content;\n");
3324 :
3325 : /* Avoid annoying notices etc */
3326 554 : ahprintf(AH, "SET client_min_messages = warning;\n");
3327 554 : if (!AH->public.std_strings)
3328 0 : ahprintf(AH, "SET escape_string_warning = off;\n");
3329 :
3330 : /* Adjust row-security state */
3331 554 : if (ropt && ropt->enable_row_security)
3332 0 : ahprintf(AH, "SET row_security = on;\n");
3333 : else
3334 554 : ahprintf(AH, "SET row_security = off;\n");
3335 :
3336 : /*
3337 : * In --transaction-size mode, we should always be in a transaction when
3338 : * we begin to restore objects.
3339 : */
3340 554 : if (ropt && ropt->txn_size > 0)
3341 : {
3342 144 : if (AH->connection)
3343 144 : StartTransaction(&AH->public);
3344 : else
3345 0 : ahprintf(AH, "\nBEGIN;\n");
3346 144 : AH->txnCount = 0;
3347 : }
3348 :
3349 554 : ahprintf(AH, "\n");
3350 554 : }
3351 :
3352 : /*
3353 : * Issue a SET SESSION AUTHORIZATION command. Caller is responsible
3354 : * for updating state if appropriate. If user is NULL or an empty string,
3355 : * the specification DEFAULT will be used.
3356 : */
3357 : static void
3358 2 : _doSetSessionAuth(ArchiveHandle *AH, const char *user)
3359 : {
3360 2 : PQExpBuffer cmd = createPQExpBuffer();
3361 :
3362 2 : appendPQExpBufferStr(cmd, "SET SESSION AUTHORIZATION ");
3363 :
3364 : /*
3365 : * SQL requires a string literal here. Might as well be correct.
3366 : */
3367 2 : if (user && *user)
3368 2 : appendStringLiteralAHX(cmd, user, AH);
3369 : else
3370 0 : appendPQExpBufferStr(cmd, "DEFAULT");
3371 2 : appendPQExpBufferChar(cmd, ';');
3372 :
3373 2 : if (RestoringToDB(AH))
3374 : {
3375 : PGresult *res;
3376 :
3377 0 : res = PQexec(AH->connection, cmd->data);
3378 :
3379 0 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3380 : /* NOT warn_or_exit_horribly... use -O instead to skip this. */
3381 0 : pg_fatal("could not set session user to \"%s\": %s",
3382 : user, PQerrorMessage(AH->connection));
3383 :
3384 0 : PQclear(res);
3385 : }
3386 : else
3387 2 : ahprintf(AH, "%s\n\n", cmd->data);
3388 :
3389 2 : destroyPQExpBuffer(cmd);
3390 2 : }
3391 :
3392 :
3393 : /*
3394 : * Issue the commands to connect to the specified database.
3395 : *
3396 : * If we're currently restoring right into a database, this will
3397 : * actually establish a connection. Otherwise it puts a \connect into
3398 : * the script output.
3399 : */
3400 : static void
3401 168 : _reconnectToDB(ArchiveHandle *AH, const char *dbname)
3402 : {
3403 168 : if (RestoringToDB(AH))
3404 98 : ReconnectToServer(AH, dbname);
3405 : else
3406 : {
3407 : PQExpBufferData connectbuf;
3408 :
3409 70 : initPQExpBuffer(&connectbuf);
3410 70 : appendPsqlMetaConnect(&connectbuf, dbname);
3411 70 : ahprintf(AH, "%s\n", connectbuf.data);
3412 70 : termPQExpBuffer(&connectbuf);
3413 : }
3414 :
3415 : /*
3416 : * NOTE: currUser keeps track of what the imaginary session user in our
3417 : * script is. It's now effectively reset to the original userID.
3418 : */
3419 168 : free(AH->currUser);
3420 168 : AH->currUser = NULL;
3421 :
3422 : /* don't assume we still know the output schema, tablespace, etc either */
3423 168 : free(AH->currSchema);
3424 168 : AH->currSchema = NULL;
3425 :
3426 168 : free(AH->currTableAm);
3427 168 : AH->currTableAm = NULL;
3428 :
3429 168 : free(AH->currTablespace);
3430 168 : AH->currTablespace = NULL;
3431 :
3432 : /* re-establish fixed state */
3433 168 : _doSetFixedOutputState(AH);
3434 168 : }
3435 :
3436 : /*
3437 : * Become the specified user, and update state to avoid redundant commands
3438 : *
3439 : * NULL or empty argument is taken to mean restoring the session default
3440 : */
3441 : static void
3442 124 : _becomeUser(ArchiveHandle *AH, const char *user)
3443 : {
3444 124 : if (!user)
3445 0 : user = ""; /* avoid null pointers */
3446 :
3447 124 : if (AH->currUser && strcmp(AH->currUser, user) == 0)
3448 122 : return; /* no need to do anything */
3449 :
3450 2 : _doSetSessionAuth(AH, user);
3451 :
3452 : /*
3453 : * NOTE: currUser keeps track of what the imaginary session user in our
3454 : * script is
3455 : */
3456 2 : free(AH->currUser);
3457 2 : AH->currUser = pg_strdup(user);
3458 : }
3459 :
3460 : /*
3461 : * Become the owner of the given TOC entry object. If
3462 : * changes in ownership are not allowed, this doesn't do anything.
3463 : */
3464 : static void
3465 93624 : _becomeOwner(ArchiveHandle *AH, TocEntry *te)
3466 : {
3467 93624 : RestoreOptions *ropt = AH->public.ropt;
3468 :
3469 93624 : if (ropt && (ropt->noOwner || !ropt->use_setsessauth))
3470 93624 : return;
3471 :
3472 0 : _becomeUser(AH, te->owner);
3473 : }
3474 :
3475 :
3476 : /*
3477 : * Issue the commands to select the specified schema as the current schema
3478 : * in the target database.
3479 : */
3480 : static void
3481 93772 : _selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
3482 : {
3483 : PQExpBuffer qry;
3484 :
3485 : /*
3486 : * If there was a SEARCHPATH TOC entry, we're supposed to just stay with
3487 : * that search_path rather than switching to entry-specific paths.
3488 : * Otherwise, it's an old archive that will not restore correctly unless
3489 : * we set the search_path as it's expecting.
3490 : */
3491 93772 : if (AH->public.searchpath)
3492 93772 : return;
3493 :
3494 0 : if (!schemaName || *schemaName == '\0' ||
3495 0 : (AH->currSchema && strcmp(AH->currSchema, schemaName) == 0))
3496 0 : return; /* no need to do anything */
3497 :
3498 0 : qry = createPQExpBuffer();
3499 :
3500 0 : appendPQExpBuffer(qry, "SET search_path = %s",
3501 : fmtId(schemaName));
3502 0 : if (strcmp(schemaName, "pg_catalog") != 0)
3503 0 : appendPQExpBufferStr(qry, ", pg_catalog");
3504 :
3505 0 : if (RestoringToDB(AH))
3506 : {
3507 : PGresult *res;
3508 :
3509 0 : res = PQexec(AH->connection, qry->data);
3510 :
3511 0 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3512 0 : warn_or_exit_horribly(AH,
3513 : "could not set \"search_path\" to \"%s\": %s",
3514 0 : schemaName, PQerrorMessage(AH->connection));
3515 :
3516 0 : PQclear(res);
3517 : }
3518 : else
3519 0 : ahprintf(AH, "%s;\n\n", qry->data);
3520 :
3521 0 : free(AH->currSchema);
3522 0 : AH->currSchema = pg_strdup(schemaName);
3523 :
3524 0 : destroyPQExpBuffer(qry);
3525 : }
3526 :
3527 : /*
3528 : * Issue the commands to select the specified tablespace as the current one
3529 : * in the target database.
3530 : */
3531 : static void
3532 85154 : _selectTablespace(ArchiveHandle *AH, const char *tablespace)
3533 : {
3534 85154 : RestoreOptions *ropt = AH->public.ropt;
3535 : PQExpBuffer qry;
3536 : const char *want,
3537 : *have;
3538 :
3539 : /* do nothing in --no-tablespaces mode */
3540 85154 : if (ropt->noTablespace)
3541 0 : return;
3542 :
3543 85154 : have = AH->currTablespace;
3544 85154 : want = tablespace;
3545 :
3546 : /* no need to do anything for non-tablespace object */
3547 85154 : if (!want)
3548 69294 : return;
3549 :
3550 15860 : if (have && strcmp(want, have) == 0)
3551 15444 : return; /* no need to do anything */
3552 :
3553 416 : qry = createPQExpBuffer();
3554 :
3555 416 : if (strcmp(want, "") == 0)
3556 : {
3557 : /* We want the tablespace to be the database's default */
3558 320 : appendPQExpBufferStr(qry, "SET default_tablespace = ''");
3559 : }
3560 : else
3561 : {
3562 : /* We want an explicit tablespace */
3563 96 : appendPQExpBuffer(qry, "SET default_tablespace = %s", fmtId(want));
3564 : }
3565 :
3566 416 : if (RestoringToDB(AH))
3567 : {
3568 : PGresult *res;
3569 :
3570 40 : res = PQexec(AH->connection, qry->data);
3571 :
3572 40 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3573 0 : warn_or_exit_horribly(AH,
3574 : "could not set \"default_tablespace\" to %s: %s",
3575 0 : fmtId(want), PQerrorMessage(AH->connection));
3576 :
3577 40 : PQclear(res);
3578 : }
3579 : else
3580 376 : ahprintf(AH, "%s;\n\n", qry->data);
3581 :
3582 416 : free(AH->currTablespace);
3583 416 : AH->currTablespace = pg_strdup(want);
3584 :
3585 416 : destroyPQExpBuffer(qry);
3586 : }
3587 :
3588 : /*
3589 : * Set the proper default_table_access_method value for the table.
3590 : */
3591 : static void
3592 84086 : _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam)
3593 : {
3594 84086 : RestoreOptions *ropt = AH->public.ropt;
3595 : PQExpBuffer cmd;
3596 : const char *want,
3597 : *have;
3598 :
3599 : /* do nothing in --no-table-access-method mode */
3600 84086 : if (ropt->noTableAm)
3601 690 : return;
3602 :
3603 83396 : have = AH->currTableAm;
3604 83396 : want = tableam;
3605 :
3606 83396 : if (!want)
3607 73672 : return;
3608 :
3609 9724 : if (have && strcmp(want, have) == 0)
3610 9068 : return;
3611 :
3612 656 : cmd = createPQExpBuffer();
3613 656 : appendPQExpBuffer(cmd, "SET default_table_access_method = %s;", fmtId(want));
3614 :
3615 656 : if (RestoringToDB(AH))
3616 : {
3617 : PGresult *res;
3618 :
3619 38 : res = PQexec(AH->connection, cmd->data);
3620 :
3621 38 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3622 0 : warn_or_exit_horribly(AH,
3623 : "could not set \"default_table_access_method\": %s",
3624 0 : PQerrorMessage(AH->connection));
3625 :
3626 38 : PQclear(res);
3627 : }
3628 : else
3629 618 : ahprintf(AH, "%s\n\n", cmd->data);
3630 :
3631 656 : destroyPQExpBuffer(cmd);
3632 :
3633 656 : free(AH->currTableAm);
3634 656 : AH->currTableAm = pg_strdup(want);
3635 : }
3636 :
3637 : /*
3638 : * Set the proper default table access method for a table without storage.
3639 : * Currently, this is required only for partitioned tables with a table AM.
3640 : */
3641 : static void
3642 1068 : _printTableAccessMethodNoStorage(ArchiveHandle *AH, TocEntry *te)
3643 : {
3644 1068 : RestoreOptions *ropt = AH->public.ropt;
3645 1068 : const char *tableam = te->tableam;
3646 : PQExpBuffer cmd;
3647 :
3648 : /* do nothing in --no-table-access-method mode */
3649 1068 : if (ropt->noTableAm)
3650 6 : return;
3651 :
3652 1062 : if (!tableam)
3653 994 : return;
3654 :
3655 : Assert(te->relkind == RELKIND_PARTITIONED_TABLE);
3656 :
3657 68 : cmd = createPQExpBuffer();
3658 :
3659 68 : appendPQExpBufferStr(cmd, "ALTER TABLE ");
3660 68 : appendPQExpBuffer(cmd, "%s ", fmtQualifiedId(te->namespace, te->tag));
3661 68 : appendPQExpBuffer(cmd, "SET ACCESS METHOD %s;",
3662 : fmtId(tableam));
3663 :
3664 68 : if (RestoringToDB(AH))
3665 : {
3666 : PGresult *res;
3667 :
3668 0 : res = PQexec(AH->connection, cmd->data);
3669 :
3670 0 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3671 0 : warn_or_exit_horribly(AH,
3672 : "could not alter table access method: %s",
3673 0 : PQerrorMessage(AH->connection));
3674 0 : PQclear(res);
3675 : }
3676 : else
3677 68 : ahprintf(AH, "%s\n\n", cmd->data);
3678 :
3679 68 : destroyPQExpBuffer(cmd);
3680 : }
3681 :
3682 : /*
3683 : * Extract an object description for a TOC entry, and append it to buf.
3684 : *
3685 : * This is used for ALTER ... OWNER TO.
3686 : *
3687 : * If the object type has no owner, do nothing.
3688 : */
3689 : static void
3690 40538 : _getObjectDescription(PQExpBuffer buf, const TocEntry *te)
3691 : {
3692 40538 : const char *type = te->desc;
3693 :
3694 : /* objects that don't require special decoration */
3695 40538 : if (strcmp(type, "COLLATION") == 0 ||
3696 35618 : strcmp(type, "CONVERSION") == 0 ||
3697 34782 : strcmp(type, "DOMAIN") == 0 ||
3698 34504 : strcmp(type, "FOREIGN TABLE") == 0 ||
3699 34432 : strcmp(type, "MATERIALIZED VIEW") == 0 ||
3700 33672 : strcmp(type, "SEQUENCE") == 0 ||
3701 33200 : strcmp(type, "STATISTICS") == 0 ||
3702 32958 : strcmp(type, "TABLE") == 0 ||
3703 22936 : strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
3704 22598 : strcmp(type, "TEXT SEARCH CONFIGURATION") == 0 ||
3705 22310 : strcmp(type, "TYPE") == 0 ||
3706 21100 : strcmp(type, "VIEW") == 0 ||
3707 : /* non-schema-specified objects */
3708 20064 : strcmp(type, "DATABASE") == 0 ||
3709 19952 : strcmp(type, "PROCEDURAL LANGUAGE") == 0 ||
3710 19886 : strcmp(type, "SCHEMA") == 0 ||
3711 19544 : strcmp(type, "EVENT TRIGGER") == 0 ||
3712 19468 : strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
3713 19376 : strcmp(type, "SERVER") == 0 ||
3714 19280 : strcmp(type, "PUBLICATION") == 0 ||
3715 18948 : strcmp(type, "SUBSCRIPTION") == 0)
3716 : {
3717 21792 : appendPQExpBuffer(buf, "%s ", type);
3718 21792 : if (te->namespace && *te->namespace)
3719 20474 : appendPQExpBuffer(buf, "%s.", fmtId(te->namespace));
3720 21792 : appendPQExpBufferStr(buf, fmtId(te->tag));
3721 : }
3722 : /* LOs just have a name, but it's numeric so must not use fmtId */
3723 18746 : else if (strcmp(type, "BLOB") == 0)
3724 : {
3725 0 : appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
3726 : }
3727 :
3728 : /*
3729 : * These object types require additional decoration. Fortunately, the
3730 : * information needed is exactly what's in the DROP command.
3731 : */
3732 18746 : else if (strcmp(type, "AGGREGATE") == 0 ||
3733 18206 : strcmp(type, "FUNCTION") == 0 ||
3734 15008 : strcmp(type, "OPERATOR") == 0 ||
3735 10036 : strcmp(type, "OPERATOR CLASS") == 0 ||
3736 8740 : strcmp(type, "OPERATOR FAMILY") == 0 ||
3737 7658 : strcmp(type, "PROCEDURE") == 0)
3738 : {
3739 : /* Chop "DROP " off the front and make a modifiable copy */
3740 11276 : char *first = pg_strdup(te->dropStmt + 5);
3741 : char *last;
3742 :
3743 : /* point to last character in string */
3744 11276 : last = first + strlen(first) - 1;
3745 :
3746 : /* Strip off any ';' or '\n' at the end */
3747 33828 : while (last >= first && (*last == '\n' || *last == ';'))
3748 22552 : last--;
3749 11276 : *(last + 1) = '\0';
3750 :
3751 11276 : appendPQExpBufferStr(buf, first);
3752 :
3753 11276 : free(first);
3754 11276 : return;
3755 : }
3756 : /* these object types don't have separate owners */
3757 7470 : else if (strcmp(type, "CAST") == 0 ||
3758 7470 : strcmp(type, "CHECK CONSTRAINT") == 0 ||
3759 7380 : strcmp(type, "CONSTRAINT") == 0 ||
3760 4522 : strcmp(type, "DATABASE PROPERTIES") == 0 ||
3761 4506 : strcmp(type, "DEFAULT") == 0 ||
3762 4162 : strcmp(type, "FK CONSTRAINT") == 0 ||
3763 3836 : strcmp(type, "INDEX") == 0 ||
3764 1750 : strcmp(type, "RULE") == 0 ||
3765 1328 : strcmp(type, "TRIGGER") == 0 ||
3766 552 : strcmp(type, "ROW SECURITY") == 0 ||
3767 552 : strcmp(type, "POLICY") == 0 ||
3768 66 : strcmp(type, "USER MAPPING") == 0)
3769 : {
3770 : /* do nothing */
3771 : }
3772 : else
3773 0 : pg_fatal("don't know how to set owner for object type \"%s\"", type);
3774 : }
3775 :
3776 : /*
3777 : * Emit the SQL commands to create the object represented by a TOC entry
3778 : *
3779 : * This now also includes issuing an ALTER OWNER command to restore the
3780 : * object's ownership, if wanted. But note that the object's permissions
3781 : * will remain at default, until the matching ACL TOC entry is restored.
3782 : */
3783 : static void
3784 85154 : _printTocEntry(ArchiveHandle *AH, TocEntry *te, const char *pfx)
3785 : {
3786 85154 : RestoreOptions *ropt = AH->public.ropt;
3787 :
3788 : /*
3789 : * Select owner, schema, tablespace and default AM as necessary. The
3790 : * default access method for partitioned tables is handled after
3791 : * generating the object definition, as it requires an ALTER command
3792 : * rather than SET.
3793 : */
3794 85154 : _becomeOwner(AH, te);
3795 85154 : _selectOutputSchema(AH, te->namespace);
3796 85154 : _selectTablespace(AH, te->tablespace);
3797 85154 : if (te->relkind != RELKIND_PARTITIONED_TABLE)
3798 84086 : _selectTableAccessMethod(AH, te->tableam);
3799 :
3800 : /* Emit header comment for item */
3801 85154 : if (!AH->noTocComments)
3802 : {
3803 : char *sanitized_name;
3804 : char *sanitized_schema;
3805 : char *sanitized_owner;
3806 :
3807 78556 : ahprintf(AH, "--\n");
3808 78556 : if (AH->public.verbose)
3809 : {
3810 2106 : ahprintf(AH, "-- TOC entry %d (class %u OID %u)\n",
3811 : te->dumpId, te->catalogId.tableoid, te->catalogId.oid);
3812 2106 : if (te->nDeps > 0)
3813 : {
3814 : int i;
3815 :
3816 1382 : ahprintf(AH, "-- Dependencies:");
3817 3812 : for (i = 0; i < te->nDeps; i++)
3818 2430 : ahprintf(AH, " %d", te->dependencies[i]);
3819 1382 : ahprintf(AH, "\n");
3820 : }
3821 : }
3822 :
3823 78556 : sanitized_name = sanitize_line(te->tag, false);
3824 78556 : sanitized_schema = sanitize_line(te->namespace, true);
3825 78556 : sanitized_owner = sanitize_line(ropt->noOwner ? NULL : te->owner, true);
3826 :
3827 78556 : ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s",
3828 : pfx, sanitized_name, te->desc, sanitized_schema,
3829 : sanitized_owner);
3830 :
3831 78556 : free(sanitized_name);
3832 78556 : free(sanitized_schema);
3833 78556 : free(sanitized_owner);
3834 :
3835 78556 : if (te->tablespace && strlen(te->tablespace) > 0 && !ropt->noTablespace)
3836 : {
3837 : char *sanitized_tablespace;
3838 :
3839 204 : sanitized_tablespace = sanitize_line(te->tablespace, false);
3840 204 : ahprintf(AH, "; Tablespace: %s", sanitized_tablespace);
3841 204 : free(sanitized_tablespace);
3842 : }
3843 78556 : ahprintf(AH, "\n");
3844 :
3845 78556 : if (AH->PrintExtraTocPtr != NULL)
3846 7260 : AH->PrintExtraTocPtr(AH, te);
3847 78556 : ahprintf(AH, "--\n\n");
3848 : }
3849 :
3850 : /*
3851 : * Actually print the definition. Normally we can just print the defn
3852 : * string if any, but we have three special cases:
3853 : *
3854 : * 1. A crude hack for suppressing AUTHORIZATION clause that old pg_dump
3855 : * versions put into CREATE SCHEMA. Don't mutate the variant for schema
3856 : * "public" that is a comment. We have to do this when --no-owner mode is
3857 : * selected. This is ugly, but I see no other good way ...
3858 : *
3859 : * 2. BLOB METADATA entries need special processing since their defn
3860 : * strings are just lists of OIDs, not complete SQL commands.
3861 : *
3862 : * 3. ACL LARGE OBJECTS entries need special processing because they
3863 : * contain only one copy of the ACL GRANT/REVOKE commands, which we must
3864 : * apply to each large object listed in the associated BLOB METADATA.
3865 : */
3866 85154 : if (ropt->noOwner &&
3867 736 : strcmp(te->desc, "SCHEMA") == 0 && strncmp(te->defn, "--", 2) != 0)
3868 : {
3869 4 : ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
3870 : }
3871 85150 : else if (strcmp(te->desc, "BLOB METADATA") == 0)
3872 : {
3873 154 : IssueCommandPerBlob(AH, te, "SELECT pg_catalog.lo_create('", "')");
3874 : }
3875 84996 : else if (strcmp(te->desc, "ACL") == 0 &&
3876 3858 : strncmp(te->tag, "LARGE OBJECTS", 13) == 0)
3877 : {
3878 0 : IssueACLPerBlob(AH, te);
3879 : }
3880 84996 : else if (te->defn && strlen(te->defn) > 0)
3881 : {
3882 77030 : ahprintf(AH, "%s\n\n", te->defn);
3883 :
3884 : /*
3885 : * If the defn string contains multiple SQL commands, txn_size mode
3886 : * should count it as N actions not one. But rather than build a full
3887 : * SQL parser, approximate this by counting semicolons. One case
3888 : * where that tends to be badly fooled is function definitions, so
3889 : * ignore them. (restore_toc_entry will count one action anyway.)
3890 : */
3891 77030 : if (ropt->txn_size > 0 &&
3892 6358 : strcmp(te->desc, "FUNCTION") != 0 &&
3893 5822 : strcmp(te->desc, "PROCEDURE") != 0)
3894 : {
3895 5798 : const char *p = te->defn;
3896 5798 : int nsemis = 0;
3897 :
3898 25902 : while ((p = strchr(p, ';')) != NULL)
3899 : {
3900 20104 : nsemis++;
3901 20104 : p++;
3902 : }
3903 5798 : if (nsemis > 1)
3904 2818 : AH->txnCount += nsemis - 1;
3905 : }
3906 : }
3907 :
3908 : /*
3909 : * If we aren't using SET SESSION AUTH to determine ownership, we must
3910 : * instead issue an ALTER OWNER command. Schema "public" is special; when
3911 : * a dump emits a comment in lieu of creating it, we use ALTER OWNER even
3912 : * when using SET SESSION for all other objects. We assume that anything
3913 : * without a DROP command is not a separately ownable object.
3914 : */
3915 85154 : if (!ropt->noOwner &&
3916 84418 : (!ropt->use_setsessauth ||
3917 0 : (strcmp(te->desc, "SCHEMA") == 0 &&
3918 0 : strncmp(te->defn, "--", 2) == 0)) &&
3919 84418 : te->owner && strlen(te->owner) > 0 &&
3920 71708 : te->dropStmt && strlen(te->dropStmt) > 0)
3921 : {
3922 40688 : if (strcmp(te->desc, "BLOB METADATA") == 0)
3923 : {
3924 : /* BLOB METADATA needs special code to handle multiple LOs */
3925 150 : char *cmdEnd = psprintf(" OWNER TO %s", fmtId(te->owner));
3926 :
3927 150 : IssueCommandPerBlob(AH, te, "ALTER LARGE OBJECT ", cmdEnd);
3928 150 : pg_free(cmdEnd);
3929 : }
3930 : else
3931 : {
3932 : /* For all other cases, we can use _getObjectDescription */
3933 : PQExpBufferData temp;
3934 :
3935 40538 : initPQExpBuffer(&temp);
3936 40538 : _getObjectDescription(&temp, te);
3937 :
3938 : /*
3939 : * If _getObjectDescription() didn't fill the buffer, then there
3940 : * is no owner.
3941 : */
3942 40538 : if (temp.data[0])
3943 33068 : ahprintf(AH, "ALTER %s OWNER TO %s;\n\n",
3944 33068 : temp.data, fmtId(te->owner));
3945 40538 : termPQExpBuffer(&temp);
3946 : }
3947 : }
3948 :
3949 : /*
3950 : * Select a partitioned table's default AM, once the table definition has
3951 : * been generated.
3952 : */
3953 85154 : if (te->relkind == RELKIND_PARTITIONED_TABLE)
3954 1068 : _printTableAccessMethodNoStorage(AH, te);
3955 :
3956 : /*
3957 : * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
3958 : * commands, so we can no longer assume we know the current auth setting.
3959 : */
3960 85154 : if (_tocEntryIsACL(te))
3961 : {
3962 4126 : free(AH->currUser);
3963 4126 : AH->currUser = NULL;
3964 : }
3965 85154 : }
3966 :
/*
 * sanitize_line
 *		Return a malloc'd copy of str in which every CR and LF has been
 *		replaced by a space.
 *
 * The point is to keep each logical line of an SQL comment or TOC listing
 * on one physical line.  A maliciously named object could otherwise break
 * out of the comment and, in the worst case, inject SQL into a dump that
 * someone loads incautiously.
 *
 * For a NULL str, the result is a malloc'd "-" when want_hyphen is true,
 * otherwise a malloc'd empty string.
 *
 * Names are not quoted, so the name fields are not machine-parseable.
 * "pg_restore -L" only looks at the dumpId field, so that is fine for now,
 * but someday we might want to try harder.
 */
static char *
sanitize_line(const char *str, bool want_hyphen)
{
	char	   *copy;

	if (str == NULL)
		return pg_strdup(want_hyphen ? "-" : "");

	copy = pg_strdup(str);

	/* Jump from one line break to the next, blanking each one out */
	for (char *brk = strpbrk(copy, "\r\n"); brk != NULL; brk = strpbrk(brk + 1, "\r\n"))
		*brk = ' ';

	return copy;
}
4002 :
4003 : /*
4004 : * Write the file header for a custom-format archive
4005 : */
4006 : void
4007 96 : WriteHead(ArchiveHandle *AH)
4008 : {
4009 : struct tm crtm;
4010 :
4011 96 : AH->WriteBufPtr(AH, "PGDMP", 5); /* Magic code */
4012 96 : AH->WriteBytePtr(AH, ARCHIVE_MAJOR(AH->version));
4013 96 : AH->WriteBytePtr(AH, ARCHIVE_MINOR(AH->version));
4014 96 : AH->WriteBytePtr(AH, ARCHIVE_REV(AH->version));
4015 96 : AH->WriteBytePtr(AH, AH->intSize);
4016 96 : AH->WriteBytePtr(AH, AH->offSize);
4017 96 : AH->WriteBytePtr(AH, AH->format);
4018 96 : AH->WriteBytePtr(AH, AH->compression_spec.algorithm);
4019 96 : crtm = *localtime(&AH->createDate);
4020 96 : WriteInt(AH, crtm.tm_sec);
4021 96 : WriteInt(AH, crtm.tm_min);
4022 96 : WriteInt(AH, crtm.tm_hour);
4023 96 : WriteInt(AH, crtm.tm_mday);
4024 96 : WriteInt(AH, crtm.tm_mon);
4025 96 : WriteInt(AH, crtm.tm_year);
4026 96 : WriteInt(AH, crtm.tm_isdst);
4027 96 : WriteStr(AH, PQdb(AH->connection));
4028 96 : WriteStr(AH, AH->public.remoteVersionStr);
4029 96 : WriteStr(AH, PG_VERSION);
4030 96 : }
4031 :
4032 : void
4033 104 : ReadHead(ArchiveHandle *AH)
4034 : {
4035 : char *errmsg;
4036 : char vmaj,
4037 : vmin,
4038 : vrev;
4039 : int fmt;
4040 :
4041 : /*
4042 : * If we haven't already read the header, do so.
4043 : *
4044 : * NB: this code must agree with _discoverArchiveFormat(). Maybe find a
4045 : * way to unify the cases?
4046 : */
4047 104 : if (!AH->readHeader)
4048 : {
4049 : char tmpMag[7];
4050 :
4051 104 : AH->ReadBufPtr(AH, tmpMag, 5);
4052 :
4053 104 : if (strncmp(tmpMag, "PGDMP", 5) != 0)
4054 0 : pg_fatal("did not find magic string in file header");
4055 : }
4056 :
4057 104 : vmaj = AH->ReadBytePtr(AH);
4058 104 : vmin = AH->ReadBytePtr(AH);
4059 :
4060 104 : if (vmaj > 1 || (vmaj == 1 && vmin > 0)) /* Version > 1.0 */
4061 104 : vrev = AH->ReadBytePtr(AH);
4062 : else
4063 0 : vrev = 0;
4064 :
4065 104 : AH->version = MAKE_ARCHIVE_VERSION(vmaj, vmin, vrev);
4066 :
4067 104 : if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
4068 0 : pg_fatal("unsupported version (%d.%d) in file header",
4069 : vmaj, vmin);
4070 :
4071 104 : AH->intSize = AH->ReadBytePtr(AH);
4072 104 : if (AH->intSize > 32)
4073 0 : pg_fatal("sanity check on integer size (%lu) failed",
4074 : (unsigned long) AH->intSize);
4075 :
4076 104 : if (AH->intSize > sizeof(int))
4077 0 : pg_log_warning("archive was made on a machine with larger integers, some operations might fail");
4078 :
4079 104 : if (AH->version >= K_VERS_1_7)
4080 104 : AH->offSize = AH->ReadBytePtr(AH);
4081 : else
4082 0 : AH->offSize = AH->intSize;
4083 :
4084 104 : fmt = AH->ReadBytePtr(AH);
4085 :
4086 104 : if (AH->format != fmt)
4087 0 : pg_fatal("expected format (%d) differs from format found in file (%d)",
4088 : AH->format, fmt);
4089 :
4090 104 : if (AH->version >= K_VERS_1_15)
4091 104 : AH->compression_spec.algorithm = AH->ReadBytePtr(AH);
4092 0 : else if (AH->version >= K_VERS_1_2)
4093 : {
4094 : /* Guess the compression method based on the level */
4095 0 : if (AH->version < K_VERS_1_4)
4096 0 : AH->compression_spec.level = AH->ReadBytePtr(AH);
4097 : else
4098 0 : AH->compression_spec.level = ReadInt(AH);
4099 :
4100 0 : if (AH->compression_spec.level != 0)
4101 0 : AH->compression_spec.algorithm = PG_COMPRESSION_GZIP;
4102 : }
4103 : else
4104 0 : AH->compression_spec.algorithm = PG_COMPRESSION_GZIP;
4105 :
4106 104 : errmsg = supports_compression(AH->compression_spec);
4107 104 : if (errmsg)
4108 : {
4109 0 : pg_log_warning("archive is compressed, but this installation does not support compression (%s) -- no data will be available",
4110 : errmsg);
4111 0 : pg_free(errmsg);
4112 : }
4113 :
4114 104 : if (AH->version >= K_VERS_1_4)
4115 : {
4116 : struct tm crtm;
4117 :
4118 104 : crtm.tm_sec = ReadInt(AH);
4119 104 : crtm.tm_min = ReadInt(AH);
4120 104 : crtm.tm_hour = ReadInt(AH);
4121 104 : crtm.tm_mday = ReadInt(AH);
4122 104 : crtm.tm_mon = ReadInt(AH);
4123 104 : crtm.tm_year = ReadInt(AH);
4124 104 : crtm.tm_isdst = ReadInt(AH);
4125 :
4126 : /*
4127 : * Newer versions of glibc have mktime() report failure if tm_isdst is
4128 : * inconsistent with the prevailing timezone, e.g. tm_isdst = 1 when
4129 : * TZ=UTC. This is problematic when restoring an archive under a
4130 : * different timezone setting. If we get a failure, try again with
4131 : * tm_isdst set to -1 ("don't know").
4132 : *
4133 : * XXX with or without this hack, we reconstruct createDate
4134 : * incorrectly when the prevailing timezone is different from
4135 : * pg_dump's. Next time we bump the archive version, we should flush
4136 : * this representation and store a plain seconds-since-the-Epoch
4137 : * timestamp instead.
4138 : */
4139 104 : AH->createDate = mktime(&crtm);
4140 104 : if (AH->createDate == (time_t) -1)
4141 : {
4142 0 : crtm.tm_isdst = -1;
4143 0 : AH->createDate = mktime(&crtm);
4144 0 : if (AH->createDate == (time_t) -1)
4145 0 : pg_log_warning("invalid creation date in header");
4146 : }
4147 : }
4148 :
4149 104 : if (AH->version >= K_VERS_1_4)
4150 : {
4151 104 : AH->archdbname = ReadStr(AH);
4152 : }
4153 :
4154 104 : if (AH->version >= K_VERS_1_10)
4155 : {
4156 104 : AH->archiveRemoteVersion = ReadStr(AH);
4157 104 : AH->archiveDumpVersion = ReadStr(AH);
4158 : }
4159 104 : }
4160 :
4161 :
4162 : /*
4163 : * checkSeek
4164 : * check to see if ftell/fseek can be performed.
4165 : */
4166 : bool
4167 158 : checkSeek(FILE *fp)
4168 : {
4169 : pgoff_t tpos;
4170 :
4171 : /* Check that ftello works on this file */
4172 158 : tpos = ftello(fp);
4173 158 : if (tpos < 0)
4174 2 : return false;
4175 :
4176 : /*
4177 : * Check that fseeko(SEEK_SET) works, too. NB: we used to try to test
4178 : * this with fseeko(fp, 0, SEEK_CUR). But some platforms treat that as a
4179 : * successful no-op even on files that are otherwise unseekable.
4180 : */
4181 156 : if (fseeko(fp, tpos, SEEK_SET) != 0)
4182 0 : return false;
4183 :
4184 156 : return true;
4185 : }
4186 :
4187 :
4188 : /*
4189 : * dumpTimestamp
4190 : */
4191 : static void
4192 140 : dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
4193 : {
4194 : char buf[64];
4195 :
4196 140 : if (strftime(buf, sizeof(buf), PGDUMP_STRFTIME_FMT, localtime(&tim)) != 0)
4197 140 : ahprintf(AH, "-- %s %s\n\n", msg, buf);
4198 140 : }
4199 :
4200 : /*
4201 : * Main engine for parallel restore.
4202 : *
4203 : * Parallel restore is done in three phases. In this first phase,
4204 : * we'll process all SECTION_PRE_DATA TOC entries that are allowed to be
4205 : * processed in the RESTORE_PASS_MAIN pass. (In practice, that's all
4206 : * PRE_DATA items other than ACLs.) Entries we can't process now are
4207 : * added to the pending_list for later phases to deal with.
4208 : */
4209 : static void
4210 8 : restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
4211 : {
4212 : bool skipped_some;
4213 : TocEntry *next_work_item;
4214 :
4215 8 : pg_log_debug("entering restore_toc_entries_prefork");
4216 :
4217 : /* Adjust dependency information */
4218 8 : fix_dependencies(AH);
4219 :
4220 : /*
4221 : * Do all the early stuff in a single connection in the parent. There's no
4222 : * great point in running it in parallel, in fact it will actually run
4223 : * faster in a single connection because we avoid all the connection and
4224 : * setup overhead. Also, pre-9.2 pg_dump versions were not very good
4225 : * about showing all the dependencies of SECTION_PRE_DATA items, so we do
4226 : * not risk trying to process them out-of-order.
4227 : *
4228 : * Stuff that we can't do immediately gets added to the pending_list.
4229 : * Note: we don't yet filter out entries that aren't going to be restored.
4230 : * They might participate in dependency chains connecting entries that
4231 : * should be restored, so we treat them as live until we actually process
4232 : * them.
4233 : *
4234 : * Note: as of 9.2, it should be guaranteed that all PRE_DATA items appear
4235 : * before DATA items, and all DATA items before POST_DATA items. That is
4236 : * not certain to be true in older archives, though, and in any case use
4237 : * of a list file would destroy that ordering (cf. SortTocFromFile). So
4238 : * this loop cannot assume that it holds.
4239 : */
4240 8 : AH->restorePass = RESTORE_PASS_MAIN;
4241 8 : skipped_some = false;
4242 276 : for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
4243 : {
4244 268 : bool do_now = true;
4245 :
4246 268 : if (next_work_item->section != SECTION_PRE_DATA)
4247 : {
4248 : /* DATA and POST_DATA items are just ignored for now */
4249 168 : if (next_work_item->section == SECTION_DATA ||
4250 96 : next_work_item->section == SECTION_POST_DATA)
4251 : {
4252 168 : do_now = false;
4253 168 : skipped_some = true;
4254 : }
4255 : else
4256 : {
4257 : /*
4258 : * SECTION_NONE items, such as comments, can be processed now
4259 : * if we are still in the PRE_DATA part of the archive. Once
4260 : * we've skipped any items, we have to consider whether the
4261 : * comment's dependencies are satisfied, so skip it for now.
4262 : */
4263 0 : if (skipped_some)
4264 0 : do_now = false;
4265 : }
4266 : }
4267 :
4268 : /*
4269 : * Also skip items that need to be forced into later passes. We need
4270 : * not set skipped_some in this case, since by assumption no main-pass
4271 : * items could depend on these.
4272 : */
4273 268 : if (_tocEntryRestorePass(AH, next_work_item) != RESTORE_PASS_MAIN)
4274 0 : do_now = false;
4275 :
4276 268 : if (do_now)
4277 : {
4278 : /* OK, restore the item and update its dependencies */
4279 100 : pg_log_info("processing item %d %s %s",
4280 : next_work_item->dumpId,
4281 : next_work_item->desc, next_work_item->tag);
4282 :
4283 100 : (void) restore_toc_entry(AH, next_work_item, false);
4284 :
4285 : /* Reduce dependencies, but don't move anything to ready_heap */
4286 100 : reduce_dependencies(AH, next_work_item, NULL);
4287 : }
4288 : else
4289 : {
4290 : /* Nope, so add it to pending_list */
4291 168 : pending_list_append(pending_list, next_work_item);
4292 : }
4293 : }
4294 :
4295 : /*
4296 : * In --transaction-size mode, we must commit the open transaction before
4297 : * dropping the database connection. This also ensures that child workers
4298 : * can see the objects we've created so far.
4299 : */
4300 8 : if (AH->public.ropt->txn_size > 0)
4301 0 : CommitTransaction(&AH->public);
4302 :
4303 : /*
4304 : * Now close parent connection in prep for parallel steps. We do this
4305 : * mainly to ensure that we don't exceed the specified number of parallel
4306 : * connections.
4307 : */
4308 8 : DisconnectDatabase(&AH->public);
4309 :
4310 : /* blow away any transient state from the old connection */
4311 8 : free(AH->currUser);
4312 8 : AH->currUser = NULL;
4313 8 : free(AH->currSchema);
4314 8 : AH->currSchema = NULL;
4315 8 : free(AH->currTablespace);
4316 8 : AH->currTablespace = NULL;
4317 8 : free(AH->currTableAm);
4318 8 : AH->currTableAm = NULL;
4319 8 : }
4320 :
4321 : /*
4322 : * Main engine for parallel restore.
4323 : *
4324 : * Parallel restore is done in three phases. In this second phase,
4325 : * we process entries by dispatching them to parallel worker children
4326 : * (processes on Unix, threads on Windows), each of which connects
4327 : * separately to the database. Inter-entry dependencies are respected,
4328 : * and so is the RestorePass multi-pass structure. When we can no longer
4329 : * make any entries ready to process, we exit. Normally, there will be
4330 : * nothing left to do; but if there is, the third phase will mop up.
4331 : */
4332 : static void
4333 8 : restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
4334 : TocEntry *pending_list)
4335 : {
4336 : binaryheap *ready_heap;
4337 : TocEntry *next_work_item;
4338 :
4339 8 : pg_log_debug("entering restore_toc_entries_parallel");
4340 :
4341 : /* Set up ready_heap with enough room for all known TocEntrys */
4342 8 : ready_heap = binaryheap_allocate(AH->tocCount,
4343 : TocEntrySizeCompareBinaryheap,
4344 : NULL);
4345 :
4346 : /*
4347 : * The pending_list contains all items that we need to restore. Move all
4348 : * items that are available to process immediately into the ready_heap.
4349 : * After this setup, the pending list is everything that needs to be done
4350 : * but is blocked by one or more dependencies, while the ready heap
4351 : * contains items that have no remaining dependencies and are OK to
4352 : * process in the current restore pass.
4353 : */
4354 8 : AH->restorePass = RESTORE_PASS_MAIN;
4355 8 : move_to_ready_heap(AH, pending_list, ready_heap, AH->restorePass);
4356 :
4357 : /*
4358 : * main parent loop
4359 : *
4360 : * Keep going until there is no worker still running AND there is no work
4361 : * left to be done. Note invariant: at top of loop, there should always
4362 : * be at least one worker available to dispatch a job to.
4363 : */
4364 8 : pg_log_info("entering main parallel loop");
4365 :
4366 : for (;;)
4367 : {
4368 : /* Look for an item ready to be dispatched to a worker */
4369 220 : next_work_item = pop_next_work_item(ready_heap, pstate);
4370 220 : if (next_work_item != NULL)
4371 : {
4372 : /* If not to be restored, don't waste time launching a worker */
4373 168 : if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) == 0)
4374 : {
4375 0 : pg_log_info("skipping item %d %s %s",
4376 : next_work_item->dumpId,
4377 : next_work_item->desc, next_work_item->tag);
4378 : /* Update its dependencies as though we'd completed it */
4379 0 : reduce_dependencies(AH, next_work_item, ready_heap);
4380 : /* Loop around to see if anything else can be dispatched */
4381 0 : continue;
4382 : }
4383 :
4384 168 : pg_log_info("launching item %d %s %s",
4385 : next_work_item->dumpId,
4386 : next_work_item->desc, next_work_item->tag);
4387 :
4388 : /* Dispatch to some worker */
4389 168 : DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
4390 : mark_restore_job_done, ready_heap);
4391 : }
4392 52 : else if (IsEveryWorkerIdle(pstate))
4393 : {
4394 : /*
4395 : * Nothing is ready and no worker is running, so we're done with
4396 : * the current pass or maybe with the whole process.
4397 : */
4398 24 : if (AH->restorePass == RESTORE_PASS_LAST)
4399 8 : break; /* No more parallel processing is possible */
4400 :
4401 : /* Advance to next restore pass */
4402 16 : AH->restorePass++;
4403 : /* That probably allows some stuff to be made ready */
4404 16 : move_to_ready_heap(AH, pending_list, ready_heap, AH->restorePass);
4405 : /* Loop around to see if anything's now ready */
4406 16 : continue;
4407 : }
4408 : else
4409 : {
4410 : /*
4411 : * We have nothing ready, but at least one child is working, so
4412 : * wait for some subjob to finish.
4413 : */
4414 : }
4415 :
4416 : /*
4417 : * Before dispatching another job, check to see if anything has
4418 : * finished. We should check every time through the loop so as to
4419 : * reduce dependencies as soon as possible. If we were unable to
4420 : * dispatch any job this time through, wait until some worker finishes
4421 : * (and, hopefully, unblocks some pending item). If we did dispatch
4422 : * something, continue as soon as there's at least one idle worker.
4423 : * Note that in either case, there's guaranteed to be at least one
4424 : * idle worker when we return to the top of the loop. This ensures we
4425 : * won't block inside DispatchJobForTocEntry, which would be
4426 : * undesirable: we'd rather postpone dispatching until we see what's
4427 : * been unblocked by finished jobs.
4428 : */
4429 196 : WaitForWorkers(AH, pstate,
4430 : next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS);
4431 : }
4432 :
4433 : /* There should now be nothing in ready_heap. */
4434 : Assert(binaryheap_empty(ready_heap));
4435 :
4436 8 : binaryheap_free(ready_heap);
4437 :
4438 8 : pg_log_info("finished main parallel loop");
4439 8 : }
4440 :
4441 : /*
4442 : * Main engine for parallel restore.
4443 : *
4444 : * Parallel restore is done in three phases. In this third phase,
4445 : * we mop up any remaining TOC entries by processing them serially.
4446 : * This phase normally should have nothing to do, but if we've somehow
4447 : * gotten stuck due to circular dependencies or some such, this provides
4448 : * at least some chance of completing the restore successfully.
4449 : */
4450 : static void
4451 8 : restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
4452 : {
4453 8 : RestoreOptions *ropt = AH->public.ropt;
4454 : TocEntry *te;
4455 :
4456 8 : pg_log_debug("entering restore_toc_entries_postfork");
4457 :
4458 : /*
4459 : * Now reconnect the single parent connection.
4460 : */
4461 8 : ConnectDatabase((Archive *) AH, &ropt->cparams, true);
4462 :
4463 : /* re-establish fixed state */
4464 8 : _doSetFixedOutputState(AH);
4465 :
4466 : /*
4467 : * Make sure there is no work left due to, say, circular dependencies, or
4468 : * some other pathological condition. If so, do it in the single parent
4469 : * connection. We don't sweat about RestorePass ordering; it's likely we
4470 : * already violated that.
4471 : */
4472 8 : for (te = pending_list->pending_next; te != pending_list; te = te->pending_next)
4473 : {
4474 0 : pg_log_info("processing missed item %d %s %s",
4475 : te->dumpId, te->desc, te->tag);
4476 0 : (void) restore_toc_entry(AH, te, false);
4477 : }
4478 8 : }
4479 :
4480 : /*
4481 : * Check if te1 has an exclusive lock requirement for an item that te2 also
4482 : * requires, whether or not te2's requirement is for an exclusive lock.
4483 : */
4484 : static bool
4485 576 : has_lock_conflicts(TocEntry *te1, TocEntry *te2)
4486 : {
4487 : int j,
4488 : k;
4489 :
4490 936 : for (j = 0; j < te1->nLockDeps; j++)
4491 : {
4492 1154 : for (k = 0; k < te2->nDeps; k++)
4493 : {
4494 794 : if (te1->lockDeps[j] == te2->dependencies[k])
4495 0 : return true;
4496 : }
4497 : }
4498 576 : return false;
4499 : }
4500 :
4501 :
4502 : /*
4503 : * Initialize the header of the pending-items list.
4504 : *
4505 : * This is a circular list with a dummy TocEntry as header, just like the
4506 : * main TOC list; but we use separate list links so that an entry can be in
4507 : * the main TOC list as well as in the pending list.
4508 : */
4509 : static void
4510 8 : pending_list_header_init(TocEntry *l)
4511 : {
4512 8 : l->pending_prev = l->pending_next = l;
4513 8 : }
4514 :
4515 : /* Append te to the end of the pending-list headed by l */
4516 : static void
4517 168 : pending_list_append(TocEntry *l, TocEntry *te)
4518 : {
4519 168 : te->pending_prev = l->pending_prev;
4520 168 : l->pending_prev->pending_next = te;
4521 168 : l->pending_prev = te;
4522 168 : te->pending_next = l;
4523 168 : }
4524 :
4525 : /* Remove te from the pending-list */
4526 : static void
4527 168 : pending_list_remove(TocEntry *te)
4528 : {
4529 168 : te->pending_prev->pending_next = te->pending_next;
4530 168 : te->pending_next->pending_prev = te->pending_prev;
4531 168 : te->pending_prev = NULL;
4532 168 : te->pending_next = NULL;
4533 168 : }
4534 :
4535 :
4536 : /* qsort comparator for sorting TocEntries by dataLength */
4537 : static int
4538 2118 : TocEntrySizeCompareQsort(const void *p1, const void *p2)
4539 : {
4540 2118 : const TocEntry *te1 = *(const TocEntry *const *) p1;
4541 2118 : const TocEntry *te2 = *(const TocEntry *const *) p2;
4542 :
4543 : /* Sort by decreasing dataLength */
4544 2118 : if (te1->dataLength > te2->dataLength)
4545 282 : return -1;
4546 1836 : if (te1->dataLength < te2->dataLength)
4547 316 : return 1;
4548 :
4549 : /* For equal dataLengths, sort by dumpId, just to be stable */
4550 1520 : if (te1->dumpId < te2->dumpId)
4551 590 : return -1;
4552 930 : if (te1->dumpId > te2->dumpId)
4553 910 : return 1;
4554 :
4555 20 : return 0;
4556 : }
4557 :
/*
 * binaryheap comparator for TocEntry pointers.
 *
 * It negates the qsort comparator, so in the max-heap the entry qsort would
 * put first (largest dataLength) rises to the top.  arg is unused.
 */
static int
TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg)
{
	int			order = TocEntrySizeCompareQsort(&p1, &p2);

	return -order;
}
4565 :
4566 :
4567 : /*
4568 : * Move all immediately-ready items from pending_list to ready_heap.
4569 : *
4570 : * Items are considered ready if they have no remaining dependencies and
4571 : * they belong in the current restore pass. (See also reduce_dependencies,
4572 : * which applies the same logic one-at-a-time.)
4573 : */
4574 : static void
4575 24 : move_to_ready_heap(ArchiveHandle *AH,
4576 : TocEntry *pending_list,
4577 : binaryheap *ready_heap,
4578 : RestorePass pass)
4579 : {
4580 : TocEntry *te;
4581 : TocEntry *next_te;
4582 :
4583 192 : for (te = pending_list->pending_next; te != pending_list; te = next_te)
4584 : {
4585 : /* must save list link before possibly removing te from list */
4586 168 : next_te = te->pending_next;
4587 :
4588 248 : if (te->depCount == 0 &&
4589 80 : _tocEntryRestorePass(AH, te) == pass)
4590 : {
4591 : /* Remove it from pending_list ... */
4592 80 : pending_list_remove(te);
4593 : /* ... and add to ready_heap */
4594 80 : binaryheap_add(ready_heap, te);
4595 : }
4596 : }
4597 24 : }
4598 :
4599 : /*
4600 : * Find the next work item (if any) that is capable of being run now,
4601 : * and remove it from the ready_heap.
4602 : *
4603 : * Returns the item, or NULL if nothing is runnable.
4604 : *
4605 : * To qualify, the item must have no remaining dependencies
4606 : * and no requirements for locks that are incompatible with
4607 : * items currently running. Items in the ready_heap are known to have
4608 : * no remaining dependencies, but we have to check for lock conflicts.
4609 : */
4610 : static TocEntry *
4611 220 : pop_next_work_item(binaryheap *ready_heap,
4612 : ParallelState *pstate)
4613 : {
4614 : /*
4615 : * Search the ready_heap until we find a suitable item. Note that we do a
4616 : * sequential scan through the heap nodes, so even though we will first
4617 : * try to choose the highest-priority item, we might end up picking
4618 : * something with a much lower priority. However, we expect that we will
4619 : * typically be able to pick one of the first few items, which should
4620 : * usually have a relatively high priority.
4621 : */
4622 220 : for (int i = 0; i < binaryheap_size(ready_heap); i++)
4623 : {
4624 168 : TocEntry *te = (TocEntry *) binaryheap_get_node(ready_heap, i);
4625 168 : bool conflicts = false;
4626 :
4627 : /*
4628 : * Check to see if the item would need exclusive lock on something
4629 : * that a currently running item also needs lock on, or vice versa. If
4630 : * so, we don't want to schedule them together.
4631 : */
4632 664 : for (int k = 0; k < pstate->numWorkers; k++)
4633 : {
4634 496 : TocEntry *running_te = pstate->te[k];
4635 :
4636 496 : if (running_te == NULL)
4637 208 : continue;
4638 576 : if (has_lock_conflicts(te, running_te) ||
4639 288 : has_lock_conflicts(running_te, te))
4640 : {
4641 0 : conflicts = true;
4642 0 : break;
4643 : }
4644 : }
4645 :
4646 168 : if (conflicts)
4647 0 : continue;
4648 :
4649 : /* passed all tests, so this item can run */
4650 168 : binaryheap_remove_node(ready_heap, i);
4651 168 : return te;
4652 : }
4653 :
4654 52 : pg_log_debug("no item ready");
4655 52 : return NULL;
4656 : }
4657 :
4658 :
4659 : /*
4660 : * Restore a single TOC item in parallel with others
4661 : *
4662 : * this is run in the worker, i.e. in a thread (Windows) or a separate process
4663 : * (everything else). A worker process executes several such work items during
4664 : * a parallel backup or restore. Once we terminate here and report back that
4665 : * our work is finished, the leader process will assign us a new work item.
4666 : */
4667 : int
4668 168 : parallel_restore(ArchiveHandle *AH, TocEntry *te)
4669 : {
4670 : int status;
4671 :
4672 : Assert(AH->connection != NULL);
4673 :
4674 : /* Count only errors associated with this TOC entry */
4675 168 : AH->public.n_errors = 0;
4676 :
4677 : /* Restore the TOC item */
4678 168 : status = restore_toc_entry(AH, te, true);
4679 :
4680 168 : return status;
4681 : }
4682 :
4683 :
4684 : /*
4685 : * Callback function that's invoked in the leader process after a step has
4686 : * been parallel restored.
4687 : *
4688 : * Update status and reduce the dependency count of any dependent items.
4689 : */
4690 : static void
4691 168 : mark_restore_job_done(ArchiveHandle *AH,
4692 : TocEntry *te,
4693 : int status,
4694 : void *callback_data)
4695 : {
4696 168 : binaryheap *ready_heap = (binaryheap *) callback_data;
4697 :
4698 168 : pg_log_info("finished item %d %s %s",
4699 : te->dumpId, te->desc, te->tag);
4700 :
4701 168 : if (status == WORKER_CREATE_DONE)
4702 0 : mark_create_done(AH, te);
4703 168 : else if (status == WORKER_INHIBIT_DATA)
4704 : {
4705 0 : inhibit_data_for_failed_table(AH, te);
4706 0 : AH->public.n_errors++;
4707 : }
4708 168 : else if (status == WORKER_IGNORED_ERRORS)
4709 0 : AH->public.n_errors++;
4710 168 : else if (status != 0)
4711 0 : pg_fatal("worker process failed: exit code %d",
4712 : status);
4713 :
4714 168 : reduce_dependencies(AH, te, ready_heap);
4715 168 : }
4716 :
4717 :
4718 : /*
4719 : * Process the dependency information into a form useful for parallel restore.
4720 : *
4721 : * This function takes care of fixing up some missing or badly designed
4722 : * dependencies, and then prepares subsidiary data structures that will be
4723 : * used in the main parallel-restore logic, including:
4724 : * 1. We build the revDeps[] arrays of incoming dependency dumpIds.
4725 : * 2. We set up depCount fields that are the number of as-yet-unprocessed
4726 : * dependencies for each TOC entry.
4727 : *
4728 : * We also identify locking dependencies so that we can avoid trying to
4729 : * schedule conflicting items at the same time.
4730 : */
4731 : static void
4732 8 : fix_dependencies(ArchiveHandle *AH)
4733 : {
4734 : TocEntry *te;
4735 : int i;
4736 :
4737 : /*
4738 : * Initialize the depCount/revDeps/nRevDeps fields, and make sure the TOC
4739 : * items are marked as not being in any parallel-processing list.
4740 : */
4741 276 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4742 : {
4743 268 : te->depCount = te->nDeps;
4744 268 : te->revDeps = NULL;
4745 268 : te->nRevDeps = 0;
4746 268 : te->pending_prev = NULL;
4747 268 : te->pending_next = NULL;
4748 : }
4749 :
4750 : /*
4751 : * POST_DATA items that are shown as depending on a table need to be
4752 : * re-pointed to depend on that table's data, instead. This ensures they
4753 : * won't get scheduled until the data has been loaded.
4754 : */
4755 8 : repoint_table_dependencies(AH);
4756 :
4757 : /*
4758 : * Pre-8.4 versions of pg_dump neglected to set up a dependency from BLOB
4759 : * COMMENTS to BLOBS. Cope. (We assume there's only one BLOBS and only
4760 : * one BLOB COMMENTS in such files.)
4761 : */
4762 8 : if (AH->version < K_VERS_1_11)
4763 : {
4764 0 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4765 : {
4766 0 : if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0)
4767 : {
4768 : TocEntry *te2;
4769 :
4770 0 : for (te2 = AH->toc->next; te2 != AH->toc; te2 = te2->next)
4771 : {
4772 0 : if (strcmp(te2->desc, "BLOBS") == 0)
4773 : {
4774 0 : te->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
4775 0 : te->dependencies[0] = te2->dumpId;
4776 0 : te->nDeps++;
4777 0 : te->depCount++;
4778 0 : break;
4779 : }
4780 : }
4781 0 : break;
4782 : }
4783 : }
4784 : }
4785 :
4786 : /*
4787 : * At this point we start to build the revDeps reverse-dependency arrays,
4788 : * so all changes of dependencies must be complete.
4789 : */
4790 :
4791 : /*
4792 : * Count the incoming dependencies for each item. Also, it is possible
4793 : * that the dependencies list items that are not in the archive at all
4794 : * (that should not happen in 9.2 and later, but is highly likely in older
4795 : * archives). Subtract such items from the depCounts.
4796 : */
4797 276 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4798 : {
4799 840 : for (i = 0; i < te->nDeps; i++)
4800 : {
4801 572 : DumpId depid = te->dependencies[i];
4802 :
4803 572 : if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4804 460 : AH->tocsByDumpId[depid]->nRevDeps++;
4805 : else
4806 112 : te->depCount--;
4807 : }
4808 : }
4809 :
4810 : /*
4811 : * Allocate space for revDeps[] arrays, and reset nRevDeps so we can use
4812 : * it as a counter below.
4813 : */
4814 276 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4815 : {
4816 268 : if (te->nRevDeps > 0)
4817 108 : te->revDeps = (DumpId *) pg_malloc(te->nRevDeps * sizeof(DumpId));
4818 268 : te->nRevDeps = 0;
4819 : }
4820 :
4821 : /*
4822 : * Build the revDeps[] arrays of incoming-dependency dumpIds. This had
4823 : * better agree with the loops above.
4824 : */
4825 276 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4826 : {
4827 840 : for (i = 0; i < te->nDeps; i++)
4828 : {
4829 572 : DumpId depid = te->dependencies[i];
4830 :
4831 572 : if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4832 : {
4833 460 : TocEntry *otherte = AH->tocsByDumpId[depid];
4834 :
4835 460 : otherte->revDeps[otherte->nRevDeps++] = te->dumpId;
4836 : }
4837 : }
4838 : }
4839 :
4840 : /*
4841 : * Lastly, work out the locking dependencies.
4842 : */
4843 276 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4844 : {
4845 268 : te->lockDeps = NULL;
4846 268 : te->nLockDeps = 0;
4847 268 : identify_locking_dependencies(AH, te);
4848 : }
4849 8 : }
4850 :
4851 : /*
4852 : * Change dependencies on table items to depend on table data items instead,
4853 : * but only in POST_DATA items.
4854 : *
4855 : * Also, for any item having such dependency(s), set its dataLength to the
4856 : * largest dataLength of the table data items it depends on. This ensures
4857 : * that parallel restore will prioritize larger jobs (index builds, FK
4858 : * constraint checks, etc) over smaller ones, avoiding situations where we
4859 : * end a restore with only one active job working on a large table.
4860 : */
4861 : static void
4862 8 : repoint_table_dependencies(ArchiveHandle *AH)
4863 : {
4864 : TocEntry *te;
4865 : int i;
4866 : DumpId olddep;
4867 :
4868 276 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4869 : {
4870 268 : if (te->section != SECTION_POST_DATA)
4871 172 : continue;
4872 464 : for (i = 0; i < te->nDeps; i++)
4873 : {
4874 368 : olddep = te->dependencies[i];
4875 368 : if (olddep <= AH->maxDumpId &&
4876 368 : AH->tableDataId[olddep] != 0)
4877 : {
4878 124 : DumpId tabledataid = AH->tableDataId[olddep];
4879 124 : TocEntry *tabledatate = AH->tocsByDumpId[tabledataid];
4880 :
4881 124 : te->dependencies[i] = tabledataid;
4882 124 : te->dataLength = Max(te->dataLength, tabledatate->dataLength);
4883 124 : pg_log_debug("transferring dependency %d -> %d to %d",
4884 : te->dumpId, olddep, tabledataid);
4885 : }
4886 : }
4887 : }
4888 8 : }
4889 :
4890 : /*
4891 : * Identify which objects we'll need exclusive lock on in order to restore
4892 : * the given TOC entry (*other* than the one identified by the TOC entry
4893 : * itself). Record their dump IDs in the entry's lockDeps[] array.
4894 : */
4895 : static void
4896 268 : identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
4897 : {
4898 : DumpId *lockids;
4899 : int nlockids;
4900 : int i;
4901 :
4902 : /*
4903 : * We only care about this for POST_DATA items. PRE_DATA items are not
4904 : * run in parallel, and DATA items are all independent by assumption.
4905 : */
4906 268 : if (te->section != SECTION_POST_DATA)
4907 172 : return;
4908 :
4909 : /* Quick exit if no dependencies at all */
4910 96 : if (te->nDeps == 0)
4911 0 : return;
4912 :
4913 : /*
4914 : * Most POST_DATA items are ALTER TABLEs or some moral equivalent of that,
4915 : * and hence require exclusive lock. However, we know that CREATE INDEX
4916 : * does not. (Maybe someday index-creating CONSTRAINTs will fall in that
4917 : * category too ... but today is not that day.)
4918 : */
4919 96 : if (strcmp(te->desc, "INDEX") == 0)
4920 0 : return;
4921 :
4922 : /*
4923 : * We assume the entry requires exclusive lock on each TABLE or TABLE DATA
4924 : * item listed among its dependencies. Originally all of these would have
4925 : * been TABLE items, but repoint_table_dependencies would have repointed
4926 : * them to the TABLE DATA items if those are present (which they might not
4927 : * be, eg in a schema-only dump). Note that all of the entries we are
4928 : * processing here are POST_DATA; otherwise there might be a significant
4929 : * difference between a dependency on a table and a dependency on its
4930 : * data, so that closer analysis would be needed here.
4931 : */
4932 96 : lockids = (DumpId *) pg_malloc(te->nDeps * sizeof(DumpId));
4933 96 : nlockids = 0;
4934 464 : for (i = 0; i < te->nDeps; i++)
4935 : {
4936 368 : DumpId depid = te->dependencies[i];
4937 :
4938 368 : if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL &&
4939 296 : ((strcmp(AH->tocsByDumpId[depid]->desc, "TABLE DATA") == 0) ||
4940 172 : strcmp(AH->tocsByDumpId[depid]->desc, "TABLE") == 0))
4941 164 : lockids[nlockids++] = depid;
4942 : }
4943 :
4944 96 : if (nlockids == 0)
4945 : {
4946 36 : free(lockids);
4947 36 : return;
4948 : }
4949 :
4950 60 : te->lockDeps = pg_realloc(lockids, nlockids * sizeof(DumpId));
4951 60 : te->nLockDeps = nlockids;
4952 : }
4953 :
4954 : /*
4955 : * Remove the specified TOC entry from the depCounts of items that depend on
4956 : * it, thereby possibly making them ready-to-run. Any pending item that
4957 : * becomes ready should be moved to the ready_heap, if that's provided.
4958 : */
4959 : static void
4960 268 : reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
4961 : binaryheap *ready_heap)
4962 : {
4963 : int i;
4964 :
4965 268 : pg_log_debug("reducing dependencies for %d", te->dumpId);
4966 :
4967 728 : for (i = 0; i < te->nRevDeps; i++)
4968 : {
4969 460 : TocEntry *otherte = AH->tocsByDumpId[te->revDeps[i]];
4970 :
4971 : Assert(otherte->depCount > 0);
4972 460 : otherte->depCount--;
4973 :
4974 : /*
4975 : * It's ready if it has no remaining dependencies, and it belongs in
4976 : * the current restore pass, and it is currently a member of the
4977 : * pending list (that check is needed to prevent double restore in
4978 : * some cases where a list-file forces out-of-order restoring).
4979 : * However, if ready_heap == NULL then caller doesn't want any list
4980 : * memberships changed.
4981 : */
4982 460 : if (otherte->depCount == 0 &&
4983 224 : _tocEntryRestorePass(AH, otherte) == AH->restorePass &&
4984 224 : otherte->pending_prev != NULL &&
4985 : ready_heap != NULL)
4986 : {
4987 : /* Remove it from pending list ... */
4988 88 : pending_list_remove(otherte);
4989 : /* ... and add to ready_heap */
4990 88 : binaryheap_add(ready_heap, otherte);
4991 : }
4992 : }
4993 268 : }
4994 :
4995 : /*
4996 : * Set the created flag on the DATA member corresponding to the given
4997 : * TABLE member
4998 : */
4999 : static void
5000 10094 : mark_create_done(ArchiveHandle *AH, TocEntry *te)
5001 : {
5002 10094 : if (AH->tableDataId[te->dumpId] != 0)
5003 : {
5004 7542 : TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
5005 :
5006 7542 : ted->created = true;
5007 : }
5008 10094 : }
5009 :
5010 : /*
5011 : * Mark the DATA member corresponding to the given TABLE member
5012 : * as not wanted
5013 : */
5014 : static void
5015 0 : inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
5016 : {
5017 0 : pg_log_info("table \"%s\" could not be created, will not restore its data",
5018 : te->tag);
5019 :
5020 0 : if (AH->tableDataId[te->dumpId] != 0)
5021 : {
5022 0 : TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
5023 :
5024 0 : ted->reqs = 0;
5025 : }
5026 0 : }
5027 :
5028 : /*
5029 : * Clone and de-clone routines used in parallel restoration.
5030 : *
5031 : * Enough of the structure is cloned to ensure that there is no
5032 : * conflict between different threads each with their own clone.
5033 : */
5034 : ArchiveHandle *
5035 52 : CloneArchive(ArchiveHandle *AH)
5036 : {
5037 : ArchiveHandle *clone;
5038 :
5039 : /* Make a "flat" copy */
5040 52 : clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle));
5041 52 : memcpy(clone, AH, sizeof(ArchiveHandle));
5042 :
5043 : /* Likewise flat-copy the RestoreOptions, so we can alter them locally */
5044 52 : clone->public.ropt = (RestoreOptions *) pg_malloc(sizeof(RestoreOptions));
5045 52 : memcpy(clone->public.ropt, AH->public.ropt, sizeof(RestoreOptions));
5046 :
5047 : /* Handle format-independent fields */
5048 52 : memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));
5049 :
5050 : /* The clone will have its own connection, so disregard connection state */
5051 52 : clone->connection = NULL;
5052 52 : clone->connCancel = NULL;
5053 52 : clone->currUser = NULL;
5054 52 : clone->currSchema = NULL;
5055 52 : clone->currTableAm = NULL;
5056 52 : clone->currTablespace = NULL;
5057 :
5058 : /* savedPassword must be local in case we change it while connecting */
5059 52 : if (clone->savedPassword)
5060 0 : clone->savedPassword = pg_strdup(clone->savedPassword);
5061 :
5062 : /* clone has its own error count, too */
5063 52 : clone->public.n_errors = 0;
5064 :
5065 : /* clones should not share lo_buf */
5066 52 : clone->lo_buf = NULL;
5067 :
5068 : /*
5069 : * Clone connections disregard --transaction-size; they must commit after
5070 : * each command so that the results are immediately visible to other
5071 : * workers.
5072 : */
5073 52 : clone->public.ropt->txn_size = 0;
5074 :
5075 : /*
5076 : * Connect our new clone object to the database, using the same connection
5077 : * parameters used for the original connection.
5078 : */
5079 52 : ConnectDatabase((Archive *) clone, &clone->public.ropt->cparams, true);
5080 :
5081 : /* re-establish fixed state */
5082 52 : if (AH->mode == archModeRead)
5083 20 : _doSetFixedOutputState(clone);
5084 : /* in write case, setupDumpWorker will fix up connection state */
5085 :
5086 : /* Let the format-specific code have a chance too */
5087 52 : clone->ClonePtr(clone);
5088 :
5089 : Assert(clone->connection != NULL);
5090 52 : return clone;
5091 : }
5092 :
5093 : /*
5094 : * Release clone-local storage.
5095 : *
5096 : * Note: we assume any clone-local connection was already closed.
5097 : */
5098 : void
5099 52 : DeCloneArchive(ArchiveHandle *AH)
5100 : {
5101 : /* Should not have an open database connection */
5102 : Assert(AH->connection == NULL);
5103 :
5104 : /* Clear format-specific state */
5105 52 : AH->DeClonePtr(AH);
5106 :
5107 : /* Clear state allocated by CloneArchive */
5108 52 : if (AH->sqlparse.curCmd)
5109 6 : destroyPQExpBuffer(AH->sqlparse.curCmd);
5110 :
5111 : /* Clear any connection-local state */
5112 52 : free(AH->currUser);
5113 52 : free(AH->currSchema);
5114 52 : free(AH->currTablespace);
5115 52 : free(AH->currTableAm);
5116 52 : free(AH->savedPassword);
5117 :
5118 52 : free(AH);
5119 52 : }
|