Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_backup_archiver.c
4 : *
5 : * Private implementation of the archiver routines.
6 : *
7 : * See the headers to pg_restore for more details.
8 : *
9 : * Copyright (c) 2000, Philip Warner
10 : * Rights are granted to use this software in any way so long
11 : * as this notice is not removed.
12 : *
13 : * The author is not responsible for loss or damages that may
14 : * result from its use.
15 : *
16 : *
17 : * IDENTIFICATION
18 : * src/bin/pg_dump/pg_backup_archiver.c
19 : *
20 : *-------------------------------------------------------------------------
21 : */
22 : #include "postgres_fe.h"
23 :
24 : #include <ctype.h>
25 : #include <fcntl.h>
26 : #include <unistd.h>
27 : #include <sys/stat.h>
28 : #include <sys/wait.h>
29 : #ifdef WIN32
30 : #include <io.h>
31 : #endif
32 :
33 : #include "catalog/pg_class_d.h"
34 : #include "common/string.h"
35 : #include "compress_io.h"
36 : #include "dumputils.h"
37 : #include "fe_utils/string_utils.h"
38 : #include "lib/binaryheap.h"
39 : #include "lib/stringinfo.h"
40 : #include "libpq/libpq-fs.h"
41 : #include "parallel.h"
42 : #include "pg_backup_archiver.h"
43 : #include "pg_backup_db.h"
44 : #include "pg_backup_utils.h"
45 :
46 : #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
47 : #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
48 :
49 : #define TOC_PREFIX_NONE ""
50 : #define TOC_PREFIX_DATA "Data for "
51 : #define TOC_PREFIX_STATS "Statistics for "
52 :
53 : static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
54 : const pg_compress_specification compression_spec,
55 : bool dosync, ArchiveMode mode,
56 : SetupWorkerPtrType setupWorkerPtr,
57 : DataDirSyncMethod sync_method);
58 : static void _getObjectDescription(PQExpBuffer buf, const TocEntry *te);
59 : static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, const char *pfx);
60 : static char *sanitize_line(const char *str, bool want_hyphen);
61 : static void _doSetFixedOutputState(ArchiveHandle *AH);
62 : static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
63 : static void _reconnectToDB(ArchiveHandle *AH, const char *dbname);
64 : static void _becomeUser(ArchiveHandle *AH, const char *user);
65 : static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
66 : static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
67 : static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
68 : static void _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam);
69 : static void _printTableAccessMethodNoStorage(ArchiveHandle *AH,
70 : TocEntry *te);
71 : static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
72 : static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
73 : static void processSearchPathEntry(ArchiveHandle *AH, TocEntry *te);
74 : static int _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH);
75 : static RestorePass _tocEntryRestorePass(TocEntry *te);
76 : static bool _tocEntryIsACL(TocEntry *te);
77 : static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
78 : static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
79 : static bool is_load_via_partition_root(TocEntry *te);
80 : static void buildTocEntryArrays(ArchiveHandle *AH);
81 : static void _moveBefore(TocEntry *pos, TocEntry *te);
82 : static int _discoverArchiveFormat(ArchiveHandle *AH);
83 :
84 : static int RestoringToDB(ArchiveHandle *AH);
85 : static void dump_lo_buf(ArchiveHandle *AH);
86 : static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
87 : static void SetOutput(ArchiveHandle *AH, const char *filename,
88 : const pg_compress_specification compression_spec, bool append_data);
89 : static CompressFileHandle *SaveOutput(ArchiveHandle *AH);
90 : static void RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput);
91 :
92 : static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
93 : static void restore_toc_entries_prefork(ArchiveHandle *AH,
94 : TocEntry *pending_list);
95 : static void restore_toc_entries_parallel(ArchiveHandle *AH,
96 : ParallelState *pstate,
97 : TocEntry *pending_list);
98 : static void restore_toc_entries_postfork(ArchiveHandle *AH,
99 : TocEntry *pending_list);
100 : static void pending_list_header_init(TocEntry *l);
101 : static void pending_list_append(TocEntry *l, TocEntry *te);
102 : static void pending_list_remove(TocEntry *te);
103 : static int TocEntrySizeCompareQsort(const void *p1, const void *p2);
104 : static int TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg);
105 : static void move_to_ready_heap(TocEntry *pending_list,
106 : binaryheap *ready_heap,
107 : RestorePass pass);
108 : static TocEntry *pop_next_work_item(binaryheap *ready_heap,
109 : ParallelState *pstate);
110 : static void mark_dump_job_done(ArchiveHandle *AH,
111 : TocEntry *te,
112 : int status,
113 : void *callback_data);
114 : static void mark_restore_job_done(ArchiveHandle *AH,
115 : TocEntry *te,
116 : int status,
117 : void *callback_data);
118 : static void fix_dependencies(ArchiveHandle *AH);
119 : static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
120 : static void repoint_table_dependencies(ArchiveHandle *AH);
121 : static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
122 : static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
123 : binaryheap *ready_heap);
124 : static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
125 : static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
126 :
127 : static void StrictNamesCheck(RestoreOptions *ropt);
128 :
129 :
130 : /*
131 : * Allocate a new DumpOptions block containing all default values.
132 : */
133 : DumpOptions *
134 232 : NewDumpOptions(void)
135 : {
136 232 : DumpOptions *opts = (DumpOptions *) pg_malloc(sizeof(DumpOptions));
137 :
138 232 : InitDumpOptions(opts);
139 232 : return opts;
140 : }
141 :
142 : /*
143 : * Initialize a DumpOptions struct to all default values
144 : */
145 : void
146 788 : InitDumpOptions(DumpOptions *opts)
147 : {
148 788 : memset(opts, 0, sizeof(DumpOptions));
149 : /* set any fields that shouldn't default to zeroes */
150 788 : opts->include_everything = true;
151 788 : opts->cparams.promptPassword = TRI_DEFAULT;
152 788 : opts->dumpSections = DUMP_UNSECTIONED;
153 788 : opts->dumpSchema = true;
154 788 : opts->dumpData = true;
155 788 : opts->dumpStatistics = true;
156 788 : }
157 :
158 : /*
159 : * Create a freshly allocated DumpOptions with options equivalent to those
160 : * found in the given RestoreOptions.
161 : */
162 : DumpOptions *
163 232 : dumpOptionsFromRestoreOptions(RestoreOptions *ropt)
164 : {
165 232 : DumpOptions *dopt = NewDumpOptions();
166 :
167 : /* this is the inverse of what's at the end of pg_dump.c's main() */
168 232 : dopt->cparams.dbname = ropt->cparams.dbname ? pg_strdup(ropt->cparams.dbname) : NULL;
169 232 : dopt->cparams.pgport = ropt->cparams.pgport ? pg_strdup(ropt->cparams.pgport) : NULL;
170 232 : dopt->cparams.pghost = ropt->cparams.pghost ? pg_strdup(ropt->cparams.pghost) : NULL;
171 232 : dopt->cparams.username = ropt->cparams.username ? pg_strdup(ropt->cparams.username) : NULL;
172 232 : dopt->cparams.promptPassword = ropt->cparams.promptPassword;
173 232 : dopt->outputClean = ropt->dropSchema;
174 232 : dopt->dumpData = ropt->dumpData;
175 232 : dopt->dumpSchema = ropt->dumpSchema;
176 232 : dopt->dumpSections = ropt->dumpSections;
177 232 : dopt->dumpStatistics = ropt->dumpStatistics;
178 232 : dopt->if_exists = ropt->if_exists;
179 232 : dopt->column_inserts = ropt->column_inserts;
180 232 : dopt->aclsSkip = ropt->aclsSkip;
181 232 : dopt->outputSuperuser = ropt->superuser;
182 232 : dopt->outputCreateDB = ropt->createDB;
183 232 : dopt->outputNoOwner = ropt->noOwner;
184 232 : dopt->outputNoTableAm = ropt->noTableAm;
185 232 : dopt->outputNoTablespaces = ropt->noTablespace;
186 232 : dopt->disable_triggers = ropt->disable_triggers;
187 232 : dopt->use_setsessauth = ropt->use_setsessauth;
188 232 : dopt->disable_dollar_quoting = ropt->disable_dollar_quoting;
189 232 : dopt->dump_inserts = ropt->dump_inserts;
190 232 : dopt->no_comments = ropt->no_comments;
191 232 : dopt->no_policies = ropt->no_policies;
192 232 : dopt->no_publications = ropt->no_publications;
193 232 : dopt->no_security_labels = ropt->no_security_labels;
194 232 : dopt->no_subscriptions = ropt->no_subscriptions;
195 232 : dopt->lockWaitTimeout = ropt->lockWaitTimeout;
196 232 : dopt->include_everything = ropt->include_everything;
197 232 : dopt->enable_row_security = ropt->enable_row_security;
198 232 : dopt->sequence_data = ropt->sequence_data;
199 :
200 232 : return dopt;
201 : }
202 :
203 :
204 : /*
205 : * Wrapper functions.
206 : *
207 : * The objective is to make writing new formats and dumpers as simple
208 : * as possible, if necessary at the expense of extra function calls etc.
209 : *
210 : */
211 :
212 : /*
213 : * The dump worker setup needs lots of knowledge of the internals of pg_dump,
214 : * so it's defined in pg_dump.c and passed into OpenArchive. The restore worker
215 : * setup doesn't need to know anything much, so it's defined here.
216 : */
217 : static void
218 24 : setupRestoreWorker(Archive *AHX)
219 : {
220 24 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
221 :
222 24 : AH->ReopenPtr(AH);
223 24 : }
224 :
225 :
226 : /* Create a new archive */
227 : /* Public */
228 : Archive *
229 508 : CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
230 : const pg_compress_specification compression_spec,
231 : bool dosync, ArchiveMode mode,
232 : SetupWorkerPtrType setupDumpWorker,
233 : DataDirSyncMethod sync_method)
234 :
235 : {
236 508 : ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression_spec,
237 : dosync, mode, setupDumpWorker, sync_method);
238 :
239 506 : return (Archive *) AH;
240 : }
241 :
242 : /* Open an existing archive */
243 : /* Public */
244 : Archive *
245 212 : OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
246 : {
247 : ArchiveHandle *AH;
248 212 : pg_compress_specification compression_spec = {0};
249 :
250 212 : compression_spec.algorithm = PG_COMPRESSION_NONE;
251 212 : AH = _allocAH(FileSpec, fmt, compression_spec, true,
252 : archModeRead, setupRestoreWorker,
253 : DATA_DIR_SYNC_METHOD_FSYNC);
254 :
255 212 : return (Archive *) AH;
256 : }
257 :
258 : /* Public */
259 : void
260 678 : CloseArchive(Archive *AHX)
261 : {
262 678 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
263 :
264 678 : AH->ClosePtr(AH);
265 :
266 : /* Close the output */
267 678 : errno = 0;
268 678 : if (!EndCompressFileHandle(AH->OF))
269 0 : pg_fatal("could not close output file: %m");
270 678 : }
271 :
272 : /* Public */
273 : void
274 1226 : SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt)
275 : {
276 : /* Caller can omit dump options, in which case we synthesize them */
277 1226 : if (dopt == NULL && ropt != NULL)
278 232 : dopt = dumpOptionsFromRestoreOptions(ropt);
279 :
280 : /* Save options for later access */
281 1226 : AH->dopt = dopt;
282 1226 : AH->ropt = ropt;
283 1226 : }
284 :
285 : /* Public */
286 : void
287 672 : ProcessArchiveRestoreOptions(Archive *AHX)
288 : {
289 672 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
290 672 : RestoreOptions *ropt = AH->public.ropt;
291 : TocEntry *te;
292 : teSection curSection;
293 :
294 : /* Decide which TOC entries will be dumped/restored, and mark them */
295 672 : curSection = SECTION_PRE_DATA;
296 134308 : for (te = AH->toc->next; te != AH->toc; te = te->next)
297 : {
298 : /*
299 : * When writing an archive, we also take this opportunity to check
300 : * that we have generated the entries in a sane order that respects
301 : * the section divisions. When reading, don't complain, since buggy
302 : * old versions of pg_dump might generate out-of-order archives.
303 : */
304 133636 : if (AH->mode != archModeRead)
305 : {
306 111192 : switch (te->section)
307 : {
308 17086 : case SECTION_NONE:
309 : /* ok to be anywhere */
310 17086 : break;
311 47850 : case SECTION_PRE_DATA:
312 47850 : if (curSection != SECTION_PRE_DATA)
313 0 : pg_log_warning("archive items not in correct section order");
314 47850 : break;
315 25600 : case SECTION_DATA:
316 25600 : if (curSection == SECTION_POST_DATA)
317 0 : pg_log_warning("archive items not in correct section order");
318 25600 : break;
319 20656 : case SECTION_POST_DATA:
320 : /* ok no matter which section we were in */
321 20656 : break;
322 0 : default:
323 0 : pg_fatal("unexpected section code %d",
324 : (int) te->section);
325 : break;
326 : }
327 22444 : }
328 :
329 133636 : if (te->section != SECTION_NONE)
330 114866 : curSection = te->section;
331 :
332 133636 : te->reqs = _tocEntryRequired(te, curSection, AH);
333 : }
334 :
335 : /* Enforce strict names checking */
336 672 : if (ropt->strict_names)
337 0 : StrictNamesCheck(ropt);
338 672 : }
339 :
340 : /*
341 : * RestoreArchive
342 : *
343 : * If append_data is set, then append data into file as we are restoring dump
344 : * of multiple databases which was taken by pg_dumpall.
345 : */
346 : void
347 486 : RestoreArchive(Archive *AHX, bool append_data)
348 : {
349 486 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
350 486 : RestoreOptions *ropt = AH->public.ropt;
351 : bool parallel_mode;
352 : TocEntry *te;
353 : CompressFileHandle *sav;
354 :
355 486 : AH->stage = STAGE_INITIALIZING;
356 :
357 : /*
358 : * If we're going to do parallel restore, there are some restrictions.
359 : */
360 486 : parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
361 486 : if (parallel_mode)
362 : {
363 : /* We haven't got round to making this work for all archive formats */
364 10 : if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
365 0 : pg_fatal("parallel restore is not supported with this archive file format");
366 :
367 : /* Doesn't work if the archive represents dependencies as OIDs */
368 10 : if (AH->version < K_VERS_1_8)
369 0 : pg_fatal("parallel restore is not supported with archives made by pre-8.0 pg_dump");
370 :
371 : /*
372 : * It's also not gonna work if we can't reopen the input file, so
373 : * let's try that immediately.
374 : */
375 10 : AH->ReopenPtr(AH);
376 : }
377 :
378 : /*
379 : * Make sure we won't need (de)compression we haven't got
380 : */
381 486 : if (AH->PrintTocDataPtr != NULL)
382 : {
383 70844 : for (te = AH->toc->next; te != AH->toc; te = te->next)
384 : {
385 70648 : if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
386 : {
387 290 : char *errmsg = supports_compression(AH->compression_spec);
388 :
389 290 : if (errmsg)
390 0 : pg_fatal("cannot restore from compressed archive (%s)",
391 : errmsg);
392 : else
393 290 : break;
394 : }
395 : }
396 : }
397 :
398 : /*
399 : * Prepare index arrays, so we can assume we have them throughout restore.
400 : * It's possible we already did this, though.
401 : */
402 486 : if (AH->tocsByDumpId == NULL)
403 458 : buildTocEntryArrays(AH);
404 :
405 : /*
406 : * If we're using a DB connection, then connect it.
407 : */
408 486 : if (ropt->useDB)
409 : {
410 58 : pg_log_info("connecting to database for restore");
411 58 : if (AH->version < K_VERS_1_3)
412 0 : pg_fatal("direct database connections are not supported in pre-1.3 archives");
413 :
414 : /*
415 : * We don't want to guess at whether the dump will successfully
416 : * restore; allow the attempt regardless of the version of the restore
417 : * target.
418 : */
419 58 : AHX->minRemoteVersion = 0;
420 58 : AHX->maxRemoteVersion = 9999999;
421 :
422 58 : ConnectDatabaseAhx(AHX, &ropt->cparams, false);
423 :
424 : /*
425 : * If we're talking to the DB directly, don't send comments since they
426 : * obscure SQL when displaying errors
427 : */
428 58 : AH->noTocComments = 1;
429 : }
430 :
431 : /*
432 : * Work out if we have an implied schema-less restore. This can happen if
433 : * the dump excluded the schema or the user has used a toc list to exclude
434 : * all of the schema data. All we do is look for schema entries - if none
435 : * are found then we unset the dumpSchema flag.
436 : *
437 : * We could scan for wanted TABLE entries, but that is not the same as
438 : * data-only. At this stage, it seems unnecessary (6-Mar-2001).
439 : */
440 486 : if (ropt->dumpSchema)
441 : {
442 468 : bool no_schema_found = true;
443 :
444 3300 : for (te = AH->toc->next; te != AH->toc; te = te->next)
445 : {
446 3258 : if ((te->reqs & REQ_SCHEMA) != 0)
447 : {
448 426 : no_schema_found = false;
449 426 : break;
450 : }
451 : }
452 468 : if (no_schema_found)
453 : {
454 42 : ropt->dumpSchema = false;
455 42 : pg_log_info("implied no-schema restore");
456 : }
457 : }
458 :
459 : /*
460 : * Setup the output file if necessary.
461 : */
462 486 : sav = SaveOutput(AH);
463 486 : if (ropt->filename || ropt->compression_spec.algorithm != PG_COMPRESSION_NONE)
464 390 : SetOutput(AH, ropt->filename, ropt->compression_spec, append_data);
465 :
466 486 : ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
467 :
468 486 : if (AH->archiveRemoteVersion)
469 486 : ahprintf(AH, "-- Dumped from database version %s\n",
470 : AH->archiveRemoteVersion);
471 486 : if (AH->archiveDumpVersion)
472 486 : ahprintf(AH, "-- Dumped by pg_dump version %s\n",
473 : AH->archiveDumpVersion);
474 :
475 486 : ahprintf(AH, "\n");
476 :
477 486 : if (AH->public.verbose)
478 70 : dumpTimestamp(AH, "Started on", AH->createDate);
479 :
480 486 : if (ropt->single_txn)
481 : {
482 0 : if (AH->connection)
483 0 : StartTransaction(AHX);
484 : else
485 0 : ahprintf(AH, "BEGIN;\n\n");
486 : }
487 :
488 : /*
489 : * Establish important parameter values right away.
490 : */
491 486 : _doSetFixedOutputState(AH);
492 :
493 486 : AH->stage = STAGE_PROCESSING;
494 :
495 : /*
496 : * Drop the items at the start, in reverse order
497 : */
498 486 : if (ropt->dropSchema)
499 : {
500 2834 : for (te = AH->toc->prev; te != AH->toc; te = te->prev)
501 : {
502 2772 : AH->currentTE = te;
503 :
504 : /*
505 : * In createDB mode, issue a DROP *only* for the database as a
506 : * whole. Issuing drops against anything else would be wrong,
507 : * because at this point we're connected to the wrong database.
508 : * (The DATABASE PROPERTIES entry, if any, should be treated like
509 : * the DATABASE entry.)
510 : */
511 2772 : if (ropt->createDB)
512 : {
513 1254 : if (strcmp(te->desc, "DATABASE") != 0 &&
514 1202 : strcmp(te->desc, "DATABASE PROPERTIES") != 0)
515 1168 : continue;
516 : }
517 :
518 : /* Otherwise, drop anything that's selected and has a dropStmt */
519 1604 : if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
520 : {
521 666 : bool not_allowed_in_txn = false;
522 :
523 666 : pg_log_info("dropping %s %s", te->desc, te->tag);
524 :
525 : /*
526 : * In --transaction-size mode, we have to temporarily exit our
527 : * transaction block to drop objects that can't be dropped
528 : * within a transaction.
529 : */
530 666 : if (ropt->txn_size > 0)
531 : {
532 64 : if (strcmp(te->desc, "DATABASE") == 0 ||
533 32 : strcmp(te->desc, "DATABASE PROPERTIES") == 0)
534 : {
535 64 : not_allowed_in_txn = true;
536 64 : if (AH->connection)
537 64 : CommitTransaction(AHX);
538 : else
539 0 : ahprintf(AH, "COMMIT;\n");
540 : }
541 : }
542 :
543 : /* Select owner and schema as necessary */
544 666 : _becomeOwner(AH, te);
545 666 : _selectOutputSchema(AH, te->namespace);
546 :
547 : /*
548 : * Now emit the DROP command, if the object has one. Note we
549 : * don't necessarily emit it verbatim; at this point we add an
550 : * appropriate IF EXISTS clause, if the user requested it.
551 : */
552 666 : if (strcmp(te->desc, "BLOB METADATA") == 0)
553 : {
554 : /* We must generate the per-blob commands */
555 8 : if (ropt->if_exists)
556 4 : IssueCommandPerBlob(AH, te,
557 : "SELECT pg_catalog.lo_unlink(oid) "
558 : "FROM pg_catalog.pg_largeobject_metadata "
559 : "WHERE oid = '", "'");
560 : else
561 4 : IssueCommandPerBlob(AH, te,
562 : "SELECT pg_catalog.lo_unlink('",
563 : "')");
564 : }
565 658 : else if (*te->dropStmt != '\0')
566 : {
567 610 : if (!ropt->if_exists ||
568 270 : strncmp(te->dropStmt, "--", 2) == 0)
569 : {
570 : /*
571 : * Without --if-exists, or if it's just a comment (as
572 : * happens for the public schema), print the dropStmt
573 : * as-is.
574 : */
575 342 : ahprintf(AH, "%s", te->dropStmt);
576 : }
577 : else
578 : {
579 : /*
580 : * Inject an appropriate spelling of "if exists". For
581 : * old-style large objects, we have a routine that
582 : * knows how to do it, without depending on
583 : * te->dropStmt; use that. For other objects we need
584 : * to parse the command.
585 : */
586 268 : if (strcmp(te->desc, "BLOB") == 0)
587 : {
588 0 : DropLOIfExists(AH, te->catalogId.oid);
589 : }
590 : else
591 : {
592 268 : char *dropStmt = pg_strdup(te->dropStmt);
593 268 : char *dropStmtOrig = dropStmt;
594 268 : PQExpBuffer ftStmt = createPQExpBuffer();
595 :
596 : /*
597 : * Need to inject IF EXISTS clause after ALTER
598 : * TABLE part in ALTER TABLE .. DROP statement
599 : */
600 268 : if (strncmp(dropStmt, "ALTER TABLE", 11) == 0)
601 : {
602 38 : appendPQExpBufferStr(ftStmt,
603 : "ALTER TABLE IF EXISTS");
604 38 : dropStmt = dropStmt + 11;
605 : }
606 :
607 : /*
608 : * ALTER TABLE..ALTER COLUMN..DROP DEFAULT does
609 : * not support the IF EXISTS clause, and therefore
610 : * we simply emit the original command for DEFAULT
611 : * objects (modulo the adjustment made above).
612 : *
613 : * Likewise, don't mess with DATABASE PROPERTIES.
614 : *
615 : * If we used CREATE OR REPLACE VIEW as a means of
616 : * quasi-dropping an ON SELECT rule, that should
617 : * be emitted unchanged as well.
618 : *
619 : * For other object types, we need to extract the
620 : * first part of the DROP which includes the
621 : * object type. Most of the time this matches
622 : * te->desc, so search for that; however for the
623 : * different kinds of CONSTRAINTs, we know to
624 : * search for hardcoded "DROP CONSTRAINT" instead.
625 : */
626 268 : if (strcmp(te->desc, "DEFAULT") == 0 ||
627 262 : strcmp(te->desc, "DATABASE PROPERTIES") == 0 ||
628 262 : strncmp(dropStmt, "CREATE OR REPLACE VIEW", 22) == 0)
629 6 : appendPQExpBufferStr(ftStmt, dropStmt);
630 : else
631 : {
632 : char buffer[40];
633 : char *mark;
634 :
635 262 : if (strcmp(te->desc, "CONSTRAINT") == 0 ||
636 234 : strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
637 234 : strcmp(te->desc, "FK CONSTRAINT") == 0)
638 32 : strcpy(buffer, "DROP CONSTRAINT");
639 : else
640 230 : snprintf(buffer, sizeof(buffer), "DROP %s",
641 : te->desc);
642 :
643 262 : mark = strstr(dropStmt, buffer);
644 :
645 262 : if (mark)
646 : {
647 262 : *mark = '\0';
648 262 : appendPQExpBuffer(ftStmt, "%s%s IF EXISTS%s",
649 : dropStmt, buffer,
650 262 : mark + strlen(buffer));
651 : }
652 : else
653 : {
654 : /* complain and emit unmodified command */
655 0 : pg_log_warning("could not find where to insert IF EXISTS in statement \"%s\"",
656 : dropStmtOrig);
657 0 : appendPQExpBufferStr(ftStmt, dropStmt);
658 : }
659 : }
660 :
661 268 : ahprintf(AH, "%s", ftStmt->data);
662 :
663 268 : destroyPQExpBuffer(ftStmt);
664 268 : pg_free(dropStmtOrig);
665 : }
666 : }
667 : }
668 :
669 : /*
670 : * In --transaction-size mode, re-establish the transaction
671 : * block if needed; otherwise, commit after every N drops.
672 : */
673 666 : if (ropt->txn_size > 0)
674 : {
675 64 : if (not_allowed_in_txn)
676 : {
677 64 : if (AH->connection)
678 64 : StartTransaction(AHX);
679 : else
680 0 : ahprintf(AH, "BEGIN;\n");
681 64 : AH->txnCount = 0;
682 : }
683 0 : else if (++AH->txnCount >= ropt->txn_size)
684 : {
685 0 : if (AH->connection)
686 : {
687 0 : CommitTransaction(AHX);
688 0 : StartTransaction(AHX);
689 : }
690 : else
691 0 : ahprintf(AH, "COMMIT;\nBEGIN;\n");
692 0 : AH->txnCount = 0;
693 : }
694 : }
695 : }
696 : }
697 :
698 : /*
699 : * _selectOutputSchema may have set currSchema to reflect the effect
700 : * of a "SET search_path" command it emitted. However, by now we may
701 : * have dropped that schema; or it might not have existed in the first
702 : * place. In either case the effective value of search_path will not
703 : * be what we think. Forcibly reset currSchema so that we will
704 : * re-establish the search_path setting when needed (after creating
705 : * the schema).
706 : *
707 : * If we treated users as pg_dump'able objects then we'd need to reset
708 : * currUser here too.
709 : */
710 62 : free(AH->currSchema);
711 62 : AH->currSchema = NULL;
712 : }
713 :
714 486 : if (parallel_mode)
715 : {
716 : /*
717 : * In parallel mode, turn control over to the parallel-restore logic.
718 : */
719 : ParallelState *pstate;
720 : TocEntry pending_list;
721 :
722 : /* The archive format module may need some setup for this */
723 10 : if (AH->PrepParallelRestorePtr)
724 10 : AH->PrepParallelRestorePtr(AH);
725 :
726 10 : pending_list_header_init(&pending_list);
727 :
728 : /* This runs PRE_DATA items and then disconnects from the database */
729 10 : restore_toc_entries_prefork(AH, &pending_list);
730 : Assert(AH->connection == NULL);
731 :
732 : /* ParallelBackupStart() will actually fork the processes */
733 10 : pstate = ParallelBackupStart(AH);
734 10 : restore_toc_entries_parallel(AH, pstate, &pending_list);
735 10 : ParallelBackupEnd(AH, pstate);
736 :
737 : /* reconnect the leader and see if we missed something */
738 10 : restore_toc_entries_postfork(AH, &pending_list);
739 : Assert(AH->connection != NULL);
740 : }
741 : else
742 : {
743 : /*
744 : * In serial mode, process everything in three phases: normal items,
745 : * then ACLs, then post-ACL items. We might be able to skip one or
746 : * both extra phases in some cases, eg data-only restores.
747 : */
748 476 : bool haveACL = false;
749 476 : bool havePostACL = false;
750 :
751 105086 : for (te = AH->toc->next; te != AH->toc; te = te->next)
752 : {
753 104612 : if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) == 0)
754 3302 : continue; /* ignore if not to be dumped at all */
755 :
756 101310 : switch (_tocEntryRestorePass(te))
757 : {
758 90626 : case RESTORE_PASS_MAIN:
759 90626 : (void) restore_toc_entry(AH, te, false);
760 90624 : break;
761 4274 : case RESTORE_PASS_ACL:
762 4274 : haveACL = true;
763 4274 : break;
764 6410 : case RESTORE_PASS_POST_ACL:
765 6410 : havePostACL = true;
766 6410 : break;
767 : }
768 104610 : }
769 :
770 474 : if (haveACL)
771 : {
772 101304 : for (te = AH->toc->next; te != AH->toc; te = te->next)
773 : {
774 200002 : if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0 &&
775 98908 : _tocEntryRestorePass(te) == RESTORE_PASS_ACL)
776 4274 : (void) restore_toc_entry(AH, te, false);
777 : }
778 : }
779 :
780 474 : if (havePostACL)
781 : {
782 99738 : for (te = AH->toc->next; te != AH->toc; te = te->next)
783 : {
784 198082 : if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0 &&
785 98492 : _tocEntryRestorePass(te) == RESTORE_PASS_POST_ACL)
786 6410 : (void) restore_toc_entry(AH, te, false);
787 : }
788 : }
789 : }
790 :
791 : /*
792 : * Close out any persistent transaction we may have. While these two
793 : * cases are started in different places, we can end both cases here.
794 : */
795 484 : if (ropt->single_txn || ropt->txn_size > 0)
796 : {
797 48 : if (AH->connection)
798 48 : CommitTransaction(AHX);
799 : else
800 0 : ahprintf(AH, "COMMIT;\n\n");
801 : }
802 :
803 484 : if (AH->public.verbose)
804 70 : dumpTimestamp(AH, "Completed on", time(NULL));
805 :
806 484 : ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n");
807 :
808 : /*
809 : * Clean up & we're done.
810 : */
811 484 : AH->stage = STAGE_FINALIZING;
812 :
813 484 : if (ropt->filename || ropt->compression_spec.algorithm != PG_COMPRESSION_NONE)
814 390 : RestoreOutput(AH, sav);
815 :
816 484 : if (ropt->useDB)
817 58 : DisconnectDatabase(&AH->public);
818 484 : }
819 :
820 : /*
821 : * Restore a single TOC item. Used in both parallel and non-parallel restore;
822 : * is_parallel is true if we are in a worker child process.
823 : *
824 : * Returns 0 normally, but WORKER_CREATE_DONE or WORKER_INHIBIT_DATA if
825 : * the parallel parent has to make the corresponding status update.
826 : */
827 : static int
828 108968 : restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
829 : {
830 108968 : RestoreOptions *ropt = AH->public.ropt;
831 108968 : int status = WORKER_OK;
832 : int reqs;
833 : bool defnDumped;
834 :
835 108968 : AH->currentTE = te;
836 :
837 : /* Dump any relevant dump warnings to stderr */
838 108968 : if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
839 : {
840 0 : if (ropt->dumpSchema && te->defn != NULL && strlen(te->defn) != 0)
841 0 : pg_log_warning("warning from original dump file: %s", te->defn);
842 0 : else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
843 0 : pg_log_warning("warning from original dump file: %s", te->copyStmt);
844 : }
845 :
846 : /* Work out what, if anything, we want from this entry */
847 108968 : reqs = te->reqs;
848 :
849 108968 : defnDumped = false;
850 :
851 : /*
852 : * If it has a schema component that we want, then process that
853 : */
854 108968 : if ((reqs & REQ_SCHEMA) != 0)
855 : {
856 76978 : bool object_is_db = false;
857 :
858 : /*
859 : * In --transaction-size mode, must exit our transaction block to
860 : * create a database or set its properties.
861 : */
862 76978 : if (strcmp(te->desc, "DATABASE") == 0 ||
863 76742 : strcmp(te->desc, "DATABASE PROPERTIES") == 0)
864 : {
865 310 : object_is_db = true;
866 310 : if (ropt->txn_size > 0)
867 : {
868 96 : if (AH->connection)
869 96 : CommitTransaction(&AH->public);
870 : else
871 0 : ahprintf(AH, "COMMIT;\n\n");
872 : }
873 : }
874 :
875 : /* Show namespace in log message if available */
876 76978 : if (te->namespace)
877 73874 : pg_log_info("creating %s \"%s.%s\"",
878 : te->desc, te->namespace, te->tag);
879 : else
880 3104 : pg_log_info("creating %s \"%s\"",
881 : te->desc, te->tag);
882 :
883 76978 : _printTocEntry(AH, te, TOC_PREFIX_NONE);
884 76978 : defnDumped = true;
885 :
886 76978 : if (strcmp(te->desc, "TABLE") == 0)
887 : {
888 14768 : if (AH->lastErrorTE == te)
889 : {
890 : /*
891 : * We failed to create the table. If
892 : * --no-data-for-failed-tables was given, mark the
893 : * corresponding TABLE DATA to be ignored.
894 : *
895 : * In the parallel case this must be done in the parent, so we
896 : * just set the return value.
897 : */
898 0 : if (ropt->noDataForFailedTables)
899 : {
900 0 : if (is_parallel)
901 0 : status = WORKER_INHIBIT_DATA;
902 : else
903 0 : inhibit_data_for_failed_table(AH, te);
904 : }
905 : }
906 : else
907 : {
908 : /*
909 : * We created the table successfully. Mark the corresponding
910 : * TABLE DATA for possible truncation.
911 : *
912 : * In the parallel case this must be done in the parent, so we
913 : * just set the return value.
914 : */
915 14768 : if (is_parallel)
916 0 : status = WORKER_CREATE_DONE;
917 : else
918 14768 : mark_create_done(AH, te);
919 : }
920 : }
921 :
922 : /*
923 : * If we created a DB, connect to it. Also, if we changed DB
924 : * properties, reconnect to ensure that relevant GUC settings are
925 : * applied to our session. (That also restarts the transaction block
926 : * in --transaction-size mode.)
927 : */
928 76978 : if (object_is_db)
929 : {
930 310 : pg_log_info("connecting to new database \"%s\"", te->tag);
931 310 : _reconnectToDB(AH, te->tag);
932 : }
933 : }
934 :
935 : /*
936 : * If it has a data component that we want, then process that
937 : */
938 108968 : if ((reqs & REQ_DATA) != 0)
939 : {
940 : /*
941 : * hadDumper will be set if there is genuine data component for this
942 : * node. Otherwise, we need to check the defn field for statements
943 : * that need to be executed in data-only restores.
944 : */
945 13560 : if (te->hadDumper)
946 : {
947 : /*
948 : * If we can output the data, then restore it.
949 : */
950 12100 : if (AH->PrintTocDataPtr != NULL)
951 : {
952 12100 : _printTocEntry(AH, te, TOC_PREFIX_DATA);
953 :
954 12100 : if (strcmp(te->desc, "BLOBS") == 0 ||
955 11946 : strcmp(te->desc, "BLOB COMMENTS") == 0)
956 : {
957 154 : pg_log_info("processing %s", te->desc);
958 :
959 154 : _selectOutputSchema(AH, "pg_catalog");
960 :
961 : /* Send BLOB COMMENTS data to ExecuteSimpleCommands() */
962 154 : if (strcmp(te->desc, "BLOB COMMENTS") == 0)
963 0 : AH->outputKind = OUTPUT_OTHERDATA;
964 :
965 154 : AH->PrintTocDataPtr(AH, te);
966 :
967 154 : AH->outputKind = OUTPUT_SQLCMDS;
968 : }
969 : else
970 : {
971 : bool use_truncate;
972 :
973 11946 : _disableTriggersIfNecessary(AH, te);
974 :
975 : /* Select owner and schema as necessary */
976 11946 : _becomeOwner(AH, te);
977 11946 : _selectOutputSchema(AH, te->namespace);
978 :
979 11946 : pg_log_info("processing data for table \"%s.%s\"",
980 : te->namespace, te->tag);
981 :
982 : /*
983 : * In parallel restore, if we created the table earlier in
984 : * this run (so that we know it is empty) and we are not
985 : * restoring a load-via-partition-root data item then we
986 : * wrap the COPY in a transaction and precede it with a
987 : * TRUNCATE. If wal_level is set to minimal this prevents
988 : * WAL-logging the COPY. This obtains a speedup similar
989 : * to that from using single_txn mode in non-parallel
990 : * restores.
991 : *
992 : * We mustn't do this for load-via-partition-root cases
993 : * because some data might get moved across partition
994 : * boundaries, risking deadlock and/or loss of previously
995 : * loaded data. (We assume that all partitions of a
996 : * partitioned table will be treated the same way.)
997 : */
998 13226 : use_truncate = is_parallel && te->created &&
999 1280 : !is_load_via_partition_root(te);
1000 :
1001 11946 : if (use_truncate)
1002 : {
1003 : /*
1004 : * Parallel restore is always talking directly to a
1005 : * server, so no need to see if we should issue BEGIN.
1006 : */
1007 1268 : StartTransaction(&AH->public);
1008 :
1009 : /*
1010 : * Issue TRUNCATE with ONLY so that child tables are
1011 : * not wiped.
1012 : */
1013 1268 : ahprintf(AH, "TRUNCATE TABLE ONLY %s;\n\n",
1014 1268 : fmtQualifiedId(te->namespace, te->tag));
1015 : }
1016 :
1017 : /*
1018 : * If we have a copy statement, use it.
1019 : */
1020 11946 : if (te->copyStmt && strlen(te->copyStmt) > 0)
1021 : {
1022 11800 : ahprintf(AH, "%s", te->copyStmt);
1023 11800 : AH->outputKind = OUTPUT_COPYDATA;
1024 : }
1025 : else
1026 146 : AH->outputKind = OUTPUT_OTHERDATA;
1027 :
1028 11946 : AH->PrintTocDataPtr(AH, te);
1029 :
1030 : /*
1031 : * Terminate COPY if needed.
1032 : */
1033 23742 : if (AH->outputKind == OUTPUT_COPYDATA &&
1034 11798 : RestoringToDB(AH))
1035 1266 : EndDBCopyMode(&AH->public, te->tag);
1036 11944 : AH->outputKind = OUTPUT_SQLCMDS;
1037 :
1038 : /* close out the transaction started above */
1039 11944 : if (use_truncate)
1040 1268 : CommitTransaction(&AH->public);
1041 :
1042 11944 : _enableTriggersIfNecessary(AH, te);
1043 : }
1044 : }
1045 : }
1046 1460 : else if (!defnDumped)
1047 : {
1048 : /* If we haven't already dumped the defn part, do so now */
1049 1460 : pg_log_info("executing %s %s", te->desc, te->tag);
1050 1460 : _printTocEntry(AH, te, TOC_PREFIX_NONE);
1051 : }
1052 : }
1053 :
1054 : /*
1055 : * If it has a statistics component that we want, then process that
1056 : */
1057 108966 : if ((reqs & REQ_STATS) != 0)
1058 18394 : _printTocEntry(AH, te, TOC_PREFIX_STATS);
1059 :
1060 : /*
1061 : * If we emitted anything for this TOC entry, that counts as one action
1062 : * against the transaction-size limit. Commit if it's time to.
1063 : */
1064 108966 : if ((reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0 && ropt->txn_size > 0)
1065 : {
1066 6420 : if (++AH->txnCount >= ropt->txn_size)
1067 : {
1068 20 : if (AH->connection)
1069 : {
1070 20 : CommitTransaction(&AH->public);
1071 20 : StartTransaction(&AH->public);
1072 : }
1073 : else
1074 0 : ahprintf(AH, "COMMIT;\nBEGIN;\n\n");
1075 20 : AH->txnCount = 0;
1076 : }
1077 : }
1078 :
1079 108966 : if (AH->public.n_errors > 0 && status == WORKER_OK)
1080 0 : status = WORKER_IGNORED_ERRORS;
1081 :
1082 108966 : return status;
1083 : }
1084 :
1085 : /*
1086 : * Allocate a new RestoreOptions block.
1087 : * This is mainly so we can initialize it, but also for future expansion,
1088 : */
1089 : RestoreOptions *
1090 688 : NewRestoreOptions(void)
1091 : {
1092 : RestoreOptions *opts;
1093 :
1094 688 : opts = (RestoreOptions *) pg_malloc0(sizeof(RestoreOptions));
1095 :
1096 : /* set any fields that shouldn't default to zeroes */
1097 688 : opts->format = archUnknown;
1098 688 : opts->cparams.promptPassword = TRI_DEFAULT;
1099 688 : opts->dumpSections = DUMP_UNSECTIONED;
1100 688 : opts->compression_spec.algorithm = PG_COMPRESSION_NONE;
1101 688 : opts->compression_spec.level = 0;
1102 688 : opts->dumpSchema = true;
1103 688 : opts->dumpData = true;
1104 688 : opts->dumpStatistics = true;
1105 :
1106 688 : return opts;
1107 : }
1108 :
1109 : static void
1110 11946 : _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
1111 : {
1112 11946 : RestoreOptions *ropt = AH->public.ropt;
1113 :
1114 : /* This hack is only needed in a data-only restore */
1115 11946 : if (ropt->dumpSchema || !ropt->disable_triggers)
1116 11882 : return;
1117 :
1118 64 : pg_log_info("disabling triggers for %s", te->tag);
1119 :
1120 : /*
1121 : * Become superuser if possible, since they are the only ones who can
1122 : * disable constraint triggers. If -S was not given, assume the initial
1123 : * user identity is a superuser. (XXX would it be better to become the
1124 : * table owner?)
1125 : */
1126 64 : _becomeUser(AH, ropt->superuser);
1127 :
1128 : /*
1129 : * Disable them.
1130 : */
1131 64 : ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n",
1132 64 : fmtQualifiedId(te->namespace, te->tag));
1133 : }
1134 :
1135 : static void
1136 11944 : _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
1137 : {
1138 11944 : RestoreOptions *ropt = AH->public.ropt;
1139 :
1140 : /* This hack is only needed in a data-only restore */
1141 11944 : if (ropt->dumpSchema || !ropt->disable_triggers)
1142 11880 : return;
1143 :
1144 64 : pg_log_info("enabling triggers for %s", te->tag);
1145 :
1146 : /*
1147 : * Become superuser if possible, since they are the only ones who can
1148 : * disable constraint triggers. If -S was not given, assume the initial
1149 : * user identity is a superuser. (XXX would it be better to become the
1150 : * table owner?)
1151 : */
1152 64 : _becomeUser(AH, ropt->superuser);
1153 :
1154 : /*
1155 : * Enable them.
1156 : */
1157 64 : ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n",
1158 64 : fmtQualifiedId(te->namespace, te->tag));
1159 : }
1160 :
1161 : /*
1162 : * Detect whether a TABLE DATA TOC item is performing "load via partition
1163 : * root", that is the target table is an ancestor partition rather than the
1164 : * table the TOC item is nominally for.
1165 : *
1166 : * In newer archive files this can be detected by checking for a special
1167 : * comment placed in te->defn. In older files we have to fall back to seeing
1168 : * if the COPY statement targets the named table or some other one. This
1169 : * will not work for data dumped as INSERT commands, so we could give a false
1170 : * negative in that case; fortunately, that's a rarely-used option.
1171 : */
1172 : static bool
1173 1280 : is_load_via_partition_root(TocEntry *te)
1174 : {
1175 1280 : if (te->defn &&
1176 12 : strncmp(te->defn, "-- load via partition root ", 27) == 0)
1177 12 : return true;
1178 1268 : if (te->copyStmt && *te->copyStmt)
1179 : {
1180 1260 : PQExpBuffer copyStmt = createPQExpBuffer();
1181 : bool result;
1182 :
1183 : /*
1184 : * Build the initial part of the COPY as it would appear if the
1185 : * nominal target table is the actual target. If we see anything
1186 : * else, it must be a load-via-partition-root case.
1187 : */
1188 1260 : appendPQExpBuffer(copyStmt, "COPY %s ",
1189 1260 : fmtQualifiedId(te->namespace, te->tag));
1190 1260 : result = strncmp(te->copyStmt, copyStmt->data, copyStmt->len) != 0;
1191 1260 : destroyPQExpBuffer(copyStmt);
1192 1260 : return result;
1193 : }
1194 : /* Assume it's not load-via-partition-root */
1195 8 : return false;
1196 : }
1197 :
1198 : /*
1199 : * This is a routine that is part of the dumper interface, hence the 'Archive*' parameter.
1200 : */
1201 :
1202 : /* Public */
1203 : void
1204 6180164 : WriteData(Archive *AHX, const void *data, size_t dLen)
1205 : {
1206 6180164 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1207 :
1208 6180164 : if (!AH->currToc)
1209 0 : pg_fatal("internal error -- WriteData cannot be called outside the context of a DataDumper routine");
1210 :
1211 6180164 : AH->WriteDataPtr(AH, data, dLen);
1212 6180164 : }
1213 :
1214 : /*
1215 : * Create a new TOC entry. The TOC was designed as a TOC, but is now the
1216 : * repository for all metadata. But the name has stuck.
1217 : *
1218 : * The new entry is added to the Archive's TOC list. Most callers can ignore
1219 : * the result value because nothing else need be done, but a few want to
1220 : * manipulate the TOC entry further.
1221 : */
1222 :
1223 : /* Public */
1224 : TocEntry *
1225 111192 : ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId,
1226 : ArchiveOpts *opts)
1227 : {
1228 111192 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1229 : TocEntry *newToc;
1230 :
1231 111192 : newToc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
1232 :
1233 111192 : AH->tocCount++;
1234 111192 : if (dumpId > AH->maxDumpId)
1235 38992 : AH->maxDumpId = dumpId;
1236 :
1237 111192 : newToc->prev = AH->toc->prev;
1238 111192 : newToc->next = AH->toc;
1239 111192 : AH->toc->prev->next = newToc;
1240 111192 : AH->toc->prev = newToc;
1241 :
1242 111192 : newToc->catalogId = catalogId;
1243 111192 : newToc->dumpId = dumpId;
1244 111192 : newToc->section = opts->section;
1245 :
1246 111192 : newToc->tag = pg_strdup(opts->tag);
1247 111192 : newToc->namespace = opts->namespace ? pg_strdup(opts->namespace) : NULL;
1248 111192 : newToc->tablespace = opts->tablespace ? pg_strdup(opts->tablespace) : NULL;
1249 111192 : newToc->tableam = opts->tableam ? pg_strdup(opts->tableam) : NULL;
1250 111192 : newToc->relkind = opts->relkind;
1251 111192 : newToc->owner = opts->owner ? pg_strdup(opts->owner) : NULL;
1252 111192 : newToc->desc = pg_strdup(opts->description);
1253 111192 : newToc->defn = opts->createStmt ? pg_strdup(opts->createStmt) : NULL;
1254 111192 : newToc->dropStmt = opts->dropStmt ? pg_strdup(opts->dropStmt) : NULL;
1255 111192 : newToc->copyStmt = opts->copyStmt ? pg_strdup(opts->copyStmt) : NULL;
1256 :
1257 111192 : if (opts->nDeps > 0)
1258 : {
1259 50102 : newToc->dependencies = (DumpId *) pg_malloc(opts->nDeps * sizeof(DumpId));
1260 50102 : memcpy(newToc->dependencies, opts->deps, opts->nDeps * sizeof(DumpId));
1261 50102 : newToc->nDeps = opts->nDeps;
1262 : }
1263 : else
1264 : {
1265 61090 : newToc->dependencies = NULL;
1266 61090 : newToc->nDeps = 0;
1267 : }
1268 :
1269 111192 : newToc->dataDumper = opts->dumpFn;
1270 111192 : newToc->dataDumperArg = opts->dumpArg;
1271 111192 : newToc->hadDumper = opts->dumpFn ? true : false;
1272 :
1273 111192 : newToc->defnDumper = opts->defnFn;
1274 111192 : newToc->defnDumperArg = opts->defnArg;
1275 :
1276 111192 : newToc->formatData = NULL;
1277 111192 : newToc->dataLength = 0;
1278 :
1279 111192 : if (AH->ArchiveEntryPtr != NULL)
1280 22310 : AH->ArchiveEntryPtr(AH, newToc);
1281 :
1282 111192 : return newToc;
1283 : }
1284 :
1285 : /* Public */
1286 : void
1287 8 : PrintTOCSummary(Archive *AHX)
1288 : {
1289 8 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1290 8 : RestoreOptions *ropt = AH->public.ropt;
1291 : TocEntry *te;
1292 8 : pg_compress_specification out_compression_spec = {0};
1293 : teSection curSection;
1294 : CompressFileHandle *sav;
1295 : const char *fmtName;
1296 : char stamp_str[64];
1297 :
1298 : /* TOC is always uncompressed */
1299 8 : out_compression_spec.algorithm = PG_COMPRESSION_NONE;
1300 :
1301 8 : sav = SaveOutput(AH);
1302 8 : if (ropt->filename)
1303 0 : SetOutput(AH, ropt->filename, out_compression_spec, false);
1304 :
1305 8 : if (strftime(stamp_str, sizeof(stamp_str), PGDUMP_STRFTIME_FMT,
1306 8 : localtime(&AH->createDate)) == 0)
1307 0 : strcpy(stamp_str, "[unknown]");
1308 :
1309 8 : ahprintf(AH, ";\n; Archive created at %s\n", stamp_str);
1310 16 : ahprintf(AH, "; dbname: %s\n; TOC Entries: %d\n; Compression: %s\n",
1311 8 : sanitize_line(AH->archdbname, false),
1312 : AH->tocCount,
1313 : get_compress_algorithm_name(AH->compression_spec.algorithm));
1314 :
1315 8 : switch (AH->format)
1316 : {
1317 6 : case archCustom:
1318 6 : fmtName = "CUSTOM";
1319 6 : break;
1320 2 : case archDirectory:
1321 2 : fmtName = "DIRECTORY";
1322 2 : break;
1323 0 : case archTar:
1324 0 : fmtName = "TAR";
1325 0 : break;
1326 0 : default:
1327 0 : fmtName = "UNKNOWN";
1328 : }
1329 :
1330 8 : ahprintf(AH, "; Dump Version: %d.%d-%d\n",
1331 8 : ARCHIVE_MAJOR(AH->version), ARCHIVE_MINOR(AH->version), ARCHIVE_REV(AH->version));
1332 8 : ahprintf(AH, "; Format: %s\n", fmtName);
1333 8 : ahprintf(AH, "; Integer: %d bytes\n", (int) AH->intSize);
1334 8 : ahprintf(AH, "; Offset: %d bytes\n", (int) AH->offSize);
1335 8 : if (AH->archiveRemoteVersion)
1336 8 : ahprintf(AH, "; Dumped from database version: %s\n",
1337 : AH->archiveRemoteVersion);
1338 8 : if (AH->archiveDumpVersion)
1339 8 : ahprintf(AH, "; Dumped by pg_dump version: %s\n",
1340 : AH->archiveDumpVersion);
1341 :
1342 8 : ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");
1343 :
1344 8 : curSection = SECTION_PRE_DATA;
1345 2864 : for (te = AH->toc->next; te != AH->toc; te = te->next)
1346 : {
1347 : /* This bit must match ProcessArchiveRestoreOptions' marking logic */
1348 2856 : if (te->section != SECTION_NONE)
1349 2280 : curSection = te->section;
1350 2856 : te->reqs = _tocEntryRequired(te, curSection, AH);
1351 : /* Now, should we print it? */
1352 2856 : if (ropt->verbose ||
1353 2856 : (te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) != 0)
1354 : {
1355 : char *sanitized_name;
1356 : char *sanitized_schema;
1357 : char *sanitized_owner;
1358 :
1359 : /*
1360 : */
1361 2816 : sanitized_name = sanitize_line(te->tag, false);
1362 2816 : sanitized_schema = sanitize_line(te->namespace, true);
1363 2816 : sanitized_owner = sanitize_line(te->owner, false);
1364 :
1365 2816 : ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
1366 : te->catalogId.tableoid, te->catalogId.oid,
1367 : te->desc, sanitized_schema, sanitized_name,
1368 : sanitized_owner);
1369 :
1370 2816 : free(sanitized_name);
1371 2816 : free(sanitized_schema);
1372 2816 : free(sanitized_owner);
1373 : }
1374 2856 : if (ropt->verbose && te->nDeps > 0)
1375 : {
1376 : int i;
1377 :
1378 0 : ahprintf(AH, ";\tdepends on:");
1379 0 : for (i = 0; i < te->nDeps; i++)
1380 0 : ahprintf(AH, " %d", te->dependencies[i]);
1381 0 : ahprintf(AH, "\n");
1382 : }
1383 : }
1384 :
1385 : /* Enforce strict names checking */
1386 8 : if (ropt->strict_names)
1387 0 : StrictNamesCheck(ropt);
1388 :
1389 8 : if (ropt->filename)
1390 0 : RestoreOutput(AH, sav);
1391 8 : }
1392 :
1393 : /***********
1394 : * Large Object Archival
1395 : ***********/
1396 :
1397 : /* Called by a dumper to signal start of a LO */
1398 : int
1399 178 : StartLO(Archive *AHX, Oid oid)
1400 : {
1401 178 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1402 :
1403 178 : if (!AH->StartLOPtr)
1404 0 : pg_fatal("large-object output not supported in chosen format");
1405 :
1406 178 : AH->StartLOPtr(AH, AH->currToc, oid);
1407 :
1408 178 : return 1;
1409 : }
1410 :
1411 : /* Called by a dumper to signal end of a LO */
1412 : int
1413 178 : EndLO(Archive *AHX, Oid oid)
1414 : {
1415 178 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1416 :
1417 178 : if (AH->EndLOPtr)
1418 178 : AH->EndLOPtr(AH, AH->currToc, oid);
1419 :
1420 178 : return 1;
1421 : }
1422 :
1423 : /**********
1424 : * Large Object Restoration
1425 : **********/
1426 :
1427 : /*
1428 : * Called by a format handler before a group of LOs is restored
1429 : */
1430 : void
1431 34 : StartRestoreLOs(ArchiveHandle *AH)
1432 : {
1433 34 : RestoreOptions *ropt = AH->public.ropt;
1434 :
1435 : /*
1436 : * LOs must be restored within a transaction block, since we need the LO
1437 : * handle to stay open while we write it. Establish a transaction unless
1438 : * there's one being used globally.
1439 : */
1440 34 : if (!(ropt->single_txn || ropt->txn_size > 0))
1441 : {
1442 34 : if (AH->connection)
1443 2 : StartTransaction(&AH->public);
1444 : else
1445 32 : ahprintf(AH, "BEGIN;\n\n");
1446 : }
1447 :
1448 34 : AH->loCount = 0;
1449 34 : }
1450 :
1451 : /*
1452 : * Called by a format handler after a group of LOs is restored
1453 : */
1454 : void
1455 34 : EndRestoreLOs(ArchiveHandle *AH)
1456 : {
1457 34 : RestoreOptions *ropt = AH->public.ropt;
1458 :
1459 34 : if (!(ropt->single_txn || ropt->txn_size > 0))
1460 : {
1461 34 : if (AH->connection)
1462 2 : CommitTransaction(&AH->public);
1463 : else
1464 32 : ahprintf(AH, "COMMIT;\n\n");
1465 : }
1466 :
1467 34 : pg_log_info(ngettext("restored %d large object",
1468 : "restored %d large objects",
1469 : AH->loCount),
1470 : AH->loCount);
1471 34 : }
1472 :
1473 :
1474 : /*
1475 : * Called by a format handler to initiate restoration of a LO
1476 : */
1477 : void
1478 38 : StartRestoreLO(ArchiveHandle *AH, Oid oid, bool drop)
1479 : {
1480 38 : bool old_lo_style = (AH->version < K_VERS_1_12);
1481 : Oid loOid;
1482 :
1483 38 : AH->loCount++;
1484 :
1485 : /* Initialize the LO Buffer */
1486 38 : if (AH->lo_buf == NULL)
1487 : {
1488 : /* First time through (in this process) so allocate the buffer */
1489 18 : AH->lo_buf_size = LOBBUFSIZE;
1490 18 : AH->lo_buf = pg_malloc(LOBBUFSIZE);
1491 : }
1492 38 : AH->lo_buf_used = 0;
1493 :
1494 38 : pg_log_info("restoring large object with OID %u", oid);
1495 :
1496 : /* With an old archive we must do drop and create logic here */
1497 38 : if (old_lo_style && drop)
1498 0 : DropLOIfExists(AH, oid);
1499 :
1500 38 : if (AH->connection)
1501 : {
1502 6 : if (old_lo_style)
1503 : {
1504 0 : loOid = lo_create(AH->connection, oid);
1505 0 : if (loOid == 0 || loOid != oid)
1506 0 : pg_fatal("could not create large object %u: %s",
1507 : oid, PQerrorMessage(AH->connection));
1508 : }
1509 6 : AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
1510 6 : if (AH->loFd == -1)
1511 0 : pg_fatal("could not open large object %u: %s",
1512 : oid, PQerrorMessage(AH->connection));
1513 : }
1514 : else
1515 : {
1516 32 : if (old_lo_style)
1517 0 : ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
1518 : oid, INV_WRITE);
1519 : else
1520 32 : ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
1521 : oid, INV_WRITE);
1522 : }
1523 :
1524 38 : AH->writingLO = true;
1525 38 : }
1526 :
1527 : void
1528 38 : EndRestoreLO(ArchiveHandle *AH, Oid oid)
1529 : {
1530 38 : if (AH->lo_buf_used > 0)
1531 : {
1532 : /* Write remaining bytes from the LO buffer */
1533 20 : dump_lo_buf(AH);
1534 : }
1535 :
1536 38 : AH->writingLO = false;
1537 :
1538 38 : if (AH->connection)
1539 : {
1540 6 : lo_close(AH->connection, AH->loFd);
1541 6 : AH->loFd = -1;
1542 : }
1543 : else
1544 : {
1545 32 : ahprintf(AH, "SELECT pg_catalog.lo_close(0);\n\n");
1546 : }
1547 38 : }
1548 :
1549 : /***********
1550 : * Sorting and Reordering
1551 : ***********/
1552 :
1553 : void
1554 0 : SortTocFromFile(Archive *AHX)
1555 : {
1556 0 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1557 0 : RestoreOptions *ropt = AH->public.ropt;
1558 : FILE *fh;
1559 : StringInfoData linebuf;
1560 :
1561 : /* Allocate space for the 'wanted' array, and init it */
1562 0 : ropt->idWanted = (bool *) pg_malloc0(sizeof(bool) * AH->maxDumpId);
1563 :
1564 : /* Setup the file */
1565 0 : fh = fopen(ropt->tocFile, PG_BINARY_R);
1566 0 : if (!fh)
1567 0 : pg_fatal("could not open TOC file \"%s\": %m", ropt->tocFile);
1568 :
1569 0 : initStringInfo(&linebuf);
1570 :
1571 0 : while (pg_get_line_buf(fh, &linebuf))
1572 : {
1573 : char *cmnt;
1574 : char *endptr;
1575 : DumpId id;
1576 : TocEntry *te;
1577 :
1578 : /* Truncate line at comment, if any */
1579 0 : cmnt = strchr(linebuf.data, ';');
1580 0 : if (cmnt != NULL)
1581 : {
1582 0 : cmnt[0] = '\0';
1583 0 : linebuf.len = cmnt - linebuf.data;
1584 : }
1585 :
1586 : /* Ignore if all blank */
1587 0 : if (strspn(linebuf.data, " \t\r\n") == linebuf.len)
1588 0 : continue;
1589 :
1590 : /* Get an ID, check it's valid and not already seen */
1591 0 : id = strtol(linebuf.data, &endptr, 10);
1592 0 : if (endptr == linebuf.data || id <= 0 || id > AH->maxDumpId ||
1593 0 : ropt->idWanted[id - 1])
1594 : {
1595 0 : pg_log_warning("line ignored: %s", linebuf.data);
1596 0 : continue;
1597 : }
1598 :
1599 : /* Find TOC entry */
1600 0 : te = getTocEntryByDumpId(AH, id);
1601 0 : if (!te)
1602 0 : pg_fatal("could not find entry for ID %d",
1603 : id);
1604 :
1605 : /* Mark it wanted */
1606 0 : ropt->idWanted[id - 1] = true;
1607 :
1608 : /*
1609 : * Move each item to the end of the list as it is selected, so that
1610 : * they are placed in the desired order. Any unwanted items will end
1611 : * up at the front of the list, which may seem unintuitive but it's
1612 : * what we need. In an ordinary serial restore that makes no
1613 : * difference, but in a parallel restore we need to mark unrestored
1614 : * items' dependencies as satisfied before we start examining
1615 : * restorable items. Otherwise they could have surprising
1616 : * side-effects on the order in which restorable items actually get
1617 : * restored.
1618 : */
1619 0 : _moveBefore(AH->toc, te);
1620 : }
1621 :
1622 0 : pg_free(linebuf.data);
1623 :
1624 0 : if (fclose(fh) != 0)
1625 0 : pg_fatal("could not close TOC file: %m");
1626 0 : }
1627 :
1628 : /**********************
1629 : * Convenience functions that look like standard IO functions
1630 : * for writing data when in dump mode.
1631 : **********************/
1632 :
1633 : /* Public */
1634 : void
1635 46052 : archputs(const char *s, Archive *AH)
1636 : {
1637 46052 : WriteData(AH, s, strlen(s));
1638 46052 : }
1639 :
1640 : /* Public */
1641 : int
1642 11722 : archprintf(Archive *AH, const char *fmt,...)
1643 : {
1644 11722 : int save_errno = errno;
1645 : char *p;
1646 11722 : size_t len = 128; /* initial assumption about buffer size */
1647 : size_t cnt;
1648 :
1649 : for (;;)
1650 0 : {
1651 : va_list args;
1652 :
1653 : /* Allocate work buffer. */
1654 11722 : p = (char *) pg_malloc(len);
1655 :
1656 : /* Try to format the data. */
1657 11722 : errno = save_errno;
1658 11722 : va_start(args, fmt);
1659 11722 : cnt = pvsnprintf(p, len, fmt, args);
1660 11722 : va_end(args);
1661 :
1662 11722 : if (cnt < len)
1663 11722 : break; /* success */
1664 :
1665 : /* Release buffer and loop around to try again with larger len. */
1666 0 : free(p);
1667 0 : len = cnt;
1668 : }
1669 :
1670 11722 : WriteData(AH, p, cnt);
1671 11722 : free(p);
1672 11722 : return (int) cnt;
1673 : }
1674 :
1675 :
1676 : /*******************************
1677 : * Stuff below here should be 'private' to the archiver routines
1678 : *******************************/
1679 :
1680 : static void
1681 390 : SetOutput(ArchiveHandle *AH, const char *filename,
1682 : const pg_compress_specification compression_spec,
1683 : bool append_data)
1684 : {
1685 : CompressFileHandle *CFH;
1686 : const char *mode;
1687 390 : int fn = -1;
1688 :
1689 390 : if (filename)
1690 : {
1691 390 : if (strcmp(filename, "-") == 0)
1692 0 : fn = fileno(stdout);
1693 : }
1694 0 : else if (AH->FH)
1695 0 : fn = fileno(AH->FH);
1696 0 : else if (AH->fSpec)
1697 : {
1698 0 : filename = AH->fSpec;
1699 : }
1700 : else
1701 0 : fn = fileno(stdout);
1702 :
1703 390 : if (append_data || AH->mode == archModeAppend)
1704 192 : mode = PG_BINARY_A;
1705 : else
1706 198 : mode = PG_BINARY_W;
1707 :
1708 390 : CFH = InitCompressFileHandle(compression_spec);
1709 :
1710 390 : if (!CFH->open_func(filename, fn, mode, CFH))
1711 : {
1712 0 : if (filename)
1713 0 : pg_fatal("could not open output file \"%s\": %m", filename);
1714 : else
1715 0 : pg_fatal("could not open output file: %m");
1716 : }
1717 :
1718 390 : AH->OF = CFH;
1719 390 : }
1720 :
1721 : static CompressFileHandle *
1722 494 : SaveOutput(ArchiveHandle *AH)
1723 : {
1724 494 : return (CompressFileHandle *) AH->OF;
1725 : }
1726 :
1727 : static void
1728 390 : RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput)
1729 : {
1730 390 : errno = 0;
1731 390 : if (!EndCompressFileHandle(AH->OF))
1732 0 : pg_fatal("could not close output file: %m");
1733 :
1734 390 : AH->OF = savedOutput;
1735 390 : }
1736 :
1737 :
1738 :
1739 : /*
1740 : * Print formatted text to the output file (usually stdout).
1741 : */
1742 : int
1743 557838 : ahprintf(ArchiveHandle *AH, const char *fmt,...)
1744 : {
1745 557838 : int save_errno = errno;
1746 : char *p;
1747 557838 : size_t len = 128; /* initial assumption about buffer size */
1748 : size_t cnt;
1749 :
1750 : for (;;)
1751 43376 : {
1752 : va_list args;
1753 :
1754 : /* Allocate work buffer. */
1755 601214 : p = (char *) pg_malloc(len);
1756 :
1757 : /* Try to format the data. */
1758 601214 : errno = save_errno;
1759 601214 : va_start(args, fmt);
1760 601214 : cnt = pvsnprintf(p, len, fmt, args);
1761 601214 : va_end(args);
1762 :
1763 601214 : if (cnt < len)
1764 557838 : break; /* success */
1765 :
1766 : /* Release buffer and loop around to try again with larger len. */
1767 43376 : free(p);
1768 43376 : len = cnt;
1769 : }
1770 :
1771 557838 : ahwrite(p, 1, cnt, AH);
1772 557838 : free(p);
1773 557838 : return (int) cnt;
1774 : }
1775 :
1776 : /*
1777 : * Single place for logic which says 'We are restoring to a direct DB connection'.
1778 : */
1779 : static int
1780 5863710 : RestoringToDB(ArchiveHandle *AH)
1781 : {
1782 5863710 : RestoreOptions *ropt = AH->public.ropt;
1783 :
1784 5863710 : return (ropt && ropt->useDB && AH->connection);
1785 : }
1786 :
1787 : /*
1788 : * Dump the current contents of the LO data buffer while writing a LO
1789 : */
1790 : static void
1791 20 : dump_lo_buf(ArchiveHandle *AH)
1792 : {
1793 20 : if (AH->connection)
1794 : {
1795 : int res;
1796 :
1797 4 : res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
1798 4 : pg_log_debug(ngettext("wrote %zu byte of large object data (result = %d)",
1799 : "wrote %zu bytes of large object data (result = %d)",
1800 : AH->lo_buf_used),
1801 : AH->lo_buf_used, res);
1802 : /* We assume there are no short writes, only errors */
1803 4 : if (res != AH->lo_buf_used)
1804 0 : warn_or_exit_horribly(AH, "could not write to large object: %s",
1805 0 : PQerrorMessage(AH->connection));
1806 : }
1807 : else
1808 : {
1809 16 : PQExpBuffer buf = createPQExpBuffer();
1810 :
1811 16 : appendByteaLiteralAHX(buf,
1812 : (const unsigned char *) AH->lo_buf,
1813 : AH->lo_buf_used,
1814 : AH);
1815 :
1816 : /* Hack: turn off writingLO so ahwrite doesn't recurse to here */
1817 16 : AH->writingLO = false;
1818 16 : ahprintf(AH, "SELECT pg_catalog.lowrite(0, %s);\n", buf->data);
1819 16 : AH->writingLO = true;
1820 :
1821 16 : destroyPQExpBuffer(buf);
1822 : }
1823 20 : AH->lo_buf_used = 0;
1824 20 : }
1825 :
1826 :
1827 : /*
1828 : * Write buffer to the output file (usually stdout). This is used for
1829 : * outputting 'restore' scripts etc. It is even possible for an archive
1830 : * format to create a custom output routine to 'fake' a restore if it
1831 : * wants to generate a script (see TAR output).
1832 : */
1833 : void
1834 5855944 : ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
1835 : {
1836 5855944 : int bytes_written = 0;
1837 :
1838 5855944 : if (AH->writingLO)
1839 : {
1840 30 : size_t remaining = size * nmemb;
1841 :
1842 30 : while (AH->lo_buf_used + remaining > AH->lo_buf_size)
1843 : {
1844 0 : size_t avail = AH->lo_buf_size - AH->lo_buf_used;
1845 :
1846 0 : memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
1847 0 : ptr = (const char *) ptr + avail;
1848 0 : remaining -= avail;
1849 0 : AH->lo_buf_used += avail;
1850 0 : dump_lo_buf(AH);
1851 : }
1852 :
1853 30 : memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
1854 30 : AH->lo_buf_used += remaining;
1855 :
1856 30 : bytes_written = size * nmemb;
1857 : }
1858 5855914 : else if (AH->CustomOutPtr)
1859 5674 : bytes_written = AH->CustomOutPtr(AH, ptr, size * nmemb);
1860 :
1861 : /*
1862 : * If we're doing a restore, and it's direct to DB, and we're connected
1863 : * then send it to the DB.
1864 : */
1865 5850240 : else if (RestoringToDB(AH))
1866 30358 : bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
1867 : else
1868 : {
1869 5819882 : CompressFileHandle *CFH = (CompressFileHandle *) AH->OF;
1870 :
1871 5819882 : if (CFH->write_func(ptr, size * nmemb, CFH))
1872 5819882 : bytes_written = size * nmemb;
1873 : }
1874 :
1875 5855944 : if (bytes_written != size * nmemb)
1876 0 : WRITE_ERROR_EXIT;
1877 5855944 : }
1878 :
1879 : /* on some error, we may decide to go on... */
1880 : void
1881 0 : warn_or_exit_horribly(ArchiveHandle *AH, const char *fmt,...)
1882 : {
1883 : va_list ap;
1884 :
1885 0 : switch (AH->stage)
1886 : {
1887 :
1888 0 : case STAGE_NONE:
1889 : /* Do nothing special */
1890 0 : break;
1891 :
1892 0 : case STAGE_INITIALIZING:
1893 0 : if (AH->stage != AH->lastErrorStage)
1894 0 : pg_log_info("while INITIALIZING:");
1895 0 : break;
1896 :
1897 0 : case STAGE_PROCESSING:
1898 0 : if (AH->stage != AH->lastErrorStage)
1899 0 : pg_log_info("while PROCESSING TOC:");
1900 0 : break;
1901 :
1902 0 : case STAGE_FINALIZING:
1903 0 : if (AH->stage != AH->lastErrorStage)
1904 0 : pg_log_info("while FINALIZING:");
1905 0 : break;
1906 : }
1907 0 : if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE)
1908 : {
1909 0 : pg_log_info("from TOC entry %d; %u %u %s %s %s",
1910 : AH->currentTE->dumpId,
1911 : AH->currentTE->catalogId.tableoid,
1912 : AH->currentTE->catalogId.oid,
1913 : AH->currentTE->desc ? AH->currentTE->desc : "(no desc)",
1914 : AH->currentTE->tag ? AH->currentTE->tag : "(no tag)",
1915 : AH->currentTE->owner ? AH->currentTE->owner : "(no owner)");
1916 : }
1917 0 : AH->lastErrorStage = AH->stage;
1918 0 : AH->lastErrorTE = AH->currentTE;
1919 :
1920 0 : va_start(ap, fmt);
1921 0 : pg_log_generic_v(PG_LOG_ERROR, PG_LOG_PRIMARY, fmt, ap);
1922 0 : va_end(ap);
1923 :
1924 0 : if (AH->public.exit_on_error)
1925 0 : exit_nicely(1);
1926 : else
1927 0 : AH->public.n_errors++;
1928 0 : }
1929 :
#ifdef NOT_USED

/*
 * Move te so that it immediately follows pos in the doubly-linked TOC list.
 * (Currently compiled out.)
 */
static void
_moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
{
	TocEntry   *before = te->prev;
	TocEntry   *after = te->next;
	TocEntry   *succ;

	/* Unlink te from list */
	before->next = after;
	after->prev = before;

	/* and insert it after "pos" */
	succ = pos->next;
	te->prev = pos;
	te->next = succ;
	succ->prev = te;
	pos->next = te;
}
#endif
1946 :
1947 : static void
1948 0 : _moveBefore(TocEntry *pos, TocEntry *te)
1949 : {
1950 : /* Unlink te from list */
1951 0 : te->prev->next = te->next;
1952 0 : te->next->prev = te->prev;
1953 :
1954 : /* and insert it before "pos" */
1955 0 : te->prev = pos->prev;
1956 0 : te->next = pos;
1957 0 : pos->prev->next = te;
1958 0 : pos->prev = te;
1959 0 : }
1960 :
1961 : /*
1962 : * Build index arrays for the TOC list
1963 : *
1964 : * This should be invoked only after we have created or read in all the TOC
1965 : * items.
1966 : *
1967 : * The arrays are indexed by dump ID (so entry zero is unused). Note that the
1968 : * array entries run only up to maxDumpId. We might see dependency dump IDs
1969 : * beyond that (if the dump was partial); so always check the array bound
1970 : * before trying to touch an array entry.
1971 : */
1972 : static void
1973 616 : buildTocEntryArrays(ArchiveHandle *AH)
1974 : {
1975 616 : DumpId maxDumpId = AH->maxDumpId;
1976 : TocEntry *te;
1977 :
1978 616 : AH->tocsByDumpId = (TocEntry **) pg_malloc0((maxDumpId + 1) * sizeof(TocEntry *));
1979 616 : AH->tableDataId = (DumpId *) pg_malloc0((maxDumpId + 1) * sizeof(DumpId));
1980 :
1981 133838 : for (te = AH->toc->next; te != AH->toc; te = te->next)
1982 : {
1983 : /* this check is purely paranoia, maxDumpId should be correct */
1984 133222 : if (te->dumpId <= 0 || te->dumpId > maxDumpId)
1985 0 : pg_fatal("bad dumpId");
1986 :
1987 : /* tocsByDumpId indexes all TOCs by their dump ID */
1988 133222 : AH->tocsByDumpId[te->dumpId] = te;
1989 :
1990 : /*
1991 : * tableDataId provides the TABLE DATA item's dump ID for each TABLE
1992 : * TOC entry that has a DATA item. We compute this by reversing the
1993 : * TABLE DATA item's dependency, knowing that a TABLE DATA item has
1994 : * just one dependency and it is the TABLE item.
1995 : */
1996 133222 : if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0)
1997 : {
1998 14084 : DumpId tableId = te->dependencies[0];
1999 :
2000 : /*
2001 : * The TABLE item might not have been in the archive, if this was
2002 : * a data-only dump; but its dump ID should be less than its data
2003 : * item's dump ID, so there should be a place for it in the array.
2004 : */
2005 14084 : if (tableId <= 0 || tableId > maxDumpId)
2006 0 : pg_fatal("bad table dumpId for TABLE DATA item");
2007 :
2008 14084 : AH->tableDataId[tableId] = te->dumpId;
2009 : }
2010 : }
2011 616 : }
2012 :
2013 : TocEntry *
2014 38226 : getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
2015 : {
2016 : /* build index arrays if we didn't already */
2017 38226 : if (AH->tocsByDumpId == NULL)
2018 158 : buildTocEntryArrays(AH);
2019 :
2020 38226 : if (id > 0 && id <= AH->maxDumpId)
2021 38226 : return AH->tocsByDumpId[id];
2022 :
2023 0 : return NULL;
2024 : }
2025 :
2026 : int
2027 32054 : TocIDRequired(ArchiveHandle *AH, DumpId id)
2028 : {
2029 32054 : TocEntry *te = getTocEntryByDumpId(AH, id);
2030 :
2031 32054 : if (!te)
2032 15360 : return 0;
2033 :
2034 16694 : return te->reqs;
2035 : }
2036 :
2037 : size_t
2038 12280 : WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
2039 : {
2040 : int off;
2041 :
2042 : /* Save the flag */
2043 12280 : AH->WriteBytePtr(AH, wasSet);
2044 :
2045 : /* Write out pgoff_t smallest byte first, prevents endian mismatch */
2046 110520 : for (off = 0; off < sizeof(pgoff_t); off++)
2047 : {
2048 98240 : AH->WriteBytePtr(AH, o & 0xFF);
2049 98240 : o >>= 8;
2050 : }
2051 12280 : return sizeof(pgoff_t) + 1;
2052 : }
2053 :
2054 : int
2055 12074 : ReadOffset(ArchiveHandle *AH, pgoff_t * o)
2056 : {
2057 : int i;
2058 : int off;
2059 : int offsetFlg;
2060 :
2061 : /* Initialize to zero */
2062 12074 : *o = 0;
2063 :
2064 : /* Check for old version */
2065 12074 : if (AH->version < K_VERS_1_7)
2066 : {
2067 : /* Prior versions wrote offsets using WriteInt */
2068 0 : i = ReadInt(AH);
2069 : /* -1 means not set */
2070 0 : if (i < 0)
2071 0 : return K_OFFSET_POS_NOT_SET;
2072 0 : else if (i == 0)
2073 0 : return K_OFFSET_NO_DATA;
2074 :
2075 : /* Cast to pgoff_t because it was written as an int. */
2076 0 : *o = (pgoff_t) i;
2077 0 : return K_OFFSET_POS_SET;
2078 : }
2079 :
2080 : /*
2081 : * Read the flag indicating the state of the data pointer. Check if valid
2082 : * and die if not.
2083 : *
2084 : * This used to be handled by a negative or zero pointer, now we use an
2085 : * extra byte specifically for the state.
2086 : */
2087 12074 : offsetFlg = AH->ReadBytePtr(AH) & 0xFF;
2088 :
2089 12074 : switch (offsetFlg)
2090 : {
2091 12074 : case K_OFFSET_POS_NOT_SET:
2092 : case K_OFFSET_NO_DATA:
2093 : case K_OFFSET_POS_SET:
2094 :
2095 12074 : break;
2096 :
2097 0 : default:
2098 0 : pg_fatal("unexpected data offset flag %d", offsetFlg);
2099 : }
2100 :
2101 : /*
2102 : * Read the bytes
2103 : */
2104 108666 : for (off = 0; off < AH->offSize; off++)
2105 : {
2106 96592 : if (off < sizeof(pgoff_t))
2107 96592 : *o |= ((pgoff_t) (AH->ReadBytePtr(AH))) << (off * 8);
2108 : else
2109 : {
2110 0 : if (AH->ReadBytePtr(AH) != 0)
2111 0 : pg_fatal("file offset in dump file is too large");
2112 : }
2113 : }
2114 :
2115 12074 : return offsetFlg;
2116 : }
2117 :
2118 : size_t
2119 474630 : WriteInt(ArchiveHandle *AH, int i)
2120 : {
2121 : int b;
2122 :
2123 : /*
2124 : * This is a bit yucky, but I don't want to make the binary format very
2125 : * dependent on representation, and not knowing much about it, I write out
2126 : * a sign byte. If you change this, don't forget to change the file
2127 : * version #, and modify ReadInt to read the new format AS WELL AS the old
2128 : * formats.
2129 : */
2130 :
2131 : /* SIGN byte */
2132 474630 : if (i < 0)
2133 : {
2134 112320 : AH->WriteBytePtr(AH, 1);
2135 112320 : i = -i;
2136 : }
2137 : else
2138 362310 : AH->WriteBytePtr(AH, 0);
2139 :
2140 2373150 : for (b = 0; b < AH->intSize; b++)
2141 : {
2142 1898520 : AH->WriteBytePtr(AH, i & 0xFF);
2143 1898520 : i >>= 8;
2144 : }
2145 :
2146 474630 : return AH->intSize + 1;
2147 : }
2148 :
2149 : int
2150 485460 : ReadInt(ArchiveHandle *AH)
2151 : {
2152 485460 : int res = 0;
2153 : int bv,
2154 : b;
2155 485460 : int sign = 0; /* Default positive */
2156 485460 : int bitShift = 0;
2157 :
2158 485460 : if (AH->version > K_VERS_1_0)
2159 : /* Read a sign byte */
2160 485460 : sign = AH->ReadBytePtr(AH);
2161 :
2162 2427300 : for (b = 0; b < AH->intSize; b++)
2163 : {
2164 1941840 : bv = AH->ReadBytePtr(AH) & 0xFF;
2165 1941840 : if (bv != 0)
2166 455120 : res = res + (bv << bitShift);
2167 1941840 : bitShift += 8;
2168 : }
2169 :
2170 485460 : if (sign)
2171 114380 : res = -res;
2172 :
2173 485460 : return res;
2174 : }
2175 :
2176 : size_t
2177 372874 : WriteStr(ArchiveHandle *AH, const char *c)
2178 : {
2179 : size_t res;
2180 :
2181 372874 : if (c)
2182 : {
2183 260554 : int len = strlen(c);
2184 :
2185 260554 : res = WriteInt(AH, len);
2186 260554 : AH->WriteBufPtr(AH, c, len);
2187 260554 : res += len;
2188 : }
2189 : else
2190 112320 : res = WriteInt(AH, -1);
2191 :
2192 372874 : return res;
2193 : }
2194 :
2195 : char *
2196 381718 : ReadStr(ArchiveHandle *AH)
2197 : {
2198 : char *buf;
2199 : int l;
2200 :
2201 381718 : l = ReadInt(AH);
2202 381718 : if (l < 0)
2203 114380 : buf = NULL;
2204 : else
2205 : {
2206 267338 : buf = (char *) pg_malloc(l + 1);
2207 267338 : AH->ReadBufPtr(AH, buf, l);
2208 :
2209 267338 : buf[l] = '\0';
2210 : }
2211 :
2212 381718 : return buf;
2213 : }
2214 :
2215 : static bool
2216 24 : _fileExistsInDirectory(const char *dir, const char *filename)
2217 : {
2218 : struct stat st;
2219 : char buf[MAXPGPATH];
2220 :
2221 24 : if (snprintf(buf, MAXPGPATH, "%s/%s", dir, filename) >= MAXPGPATH)
2222 0 : pg_fatal("directory name too long: \"%s\"", dir);
2223 :
2224 24 : return (stat(buf, &st) == 0 && S_ISREG(st.st_mode));
2225 : }
2226 :
2227 : static int
2228 86 : _discoverArchiveFormat(ArchiveHandle *AH)
2229 : {
2230 : FILE *fh;
2231 : char sig[6]; /* More than enough */
2232 : size_t cnt;
2233 86 : int wantClose = 0;
2234 :
2235 86 : pg_log_debug("attempting to ascertain archive format");
2236 :
2237 86 : free(AH->lookahead);
2238 :
2239 86 : AH->readHeader = 0;
2240 86 : AH->lookaheadSize = 512;
2241 86 : AH->lookahead = pg_malloc0(512);
2242 86 : AH->lookaheadLen = 0;
2243 86 : AH->lookaheadPos = 0;
2244 :
2245 86 : if (AH->fSpec)
2246 : {
2247 : struct stat st;
2248 :
2249 86 : wantClose = 1;
2250 :
2251 : /*
2252 : * Check if the specified archive is a directory. If so, check if
2253 : * there's a "toc.dat" (or "toc.dat.{gz,lz4,zst}") file in it.
2254 : */
2255 86 : if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
2256 : {
2257 24 : AH->format = archDirectory;
2258 24 : if (_fileExistsInDirectory(AH->fSpec, "toc.dat"))
2259 24 : return AH->format;
2260 : #ifdef HAVE_LIBZ
2261 0 : if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz"))
2262 0 : return AH->format;
2263 : #endif
2264 : #ifdef USE_LZ4
2265 0 : if (_fileExistsInDirectory(AH->fSpec, "toc.dat.lz4"))
2266 0 : return AH->format;
2267 : #endif
2268 : #ifdef USE_ZSTD
2269 : if (_fileExistsInDirectory(AH->fSpec, "toc.dat.zst"))
2270 : return AH->format;
2271 : #endif
2272 0 : pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
2273 : AH->fSpec);
2274 : fh = NULL; /* keep compiler quiet */
2275 : }
2276 : else
2277 : {
2278 62 : fh = fopen(AH->fSpec, PG_BINARY_R);
2279 62 : if (!fh)
2280 0 : pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
2281 : }
2282 : }
2283 : else
2284 : {
2285 0 : fh = stdin;
2286 0 : if (!fh)
2287 0 : pg_fatal("could not open input file: %m");
2288 : }
2289 :
2290 62 : if ((cnt = fread(sig, 1, 5, fh)) != 5)
2291 : {
2292 0 : if (ferror(fh))
2293 0 : pg_fatal("could not read input file: %m");
2294 : else
2295 0 : pg_fatal("input file is too short (read %lu, expected 5)",
2296 : (unsigned long) cnt);
2297 : }
2298 :
2299 : /* Save it, just in case we need it later */
2300 62 : memcpy(&AH->lookahead[0], sig, 5);
2301 62 : AH->lookaheadLen = 5;
2302 :
2303 62 : if (strncmp(sig, "PGDMP", 5) == 0)
2304 : {
2305 : /* It's custom format, stop here */
2306 60 : AH->format = archCustom;
2307 60 : AH->readHeader = 1;
2308 : }
2309 : else
2310 : {
2311 : /*
2312 : * *Maybe* we have a tar archive format file or a text dump ... So,
2313 : * read first 512 byte header...
2314 : */
2315 2 : cnt = fread(&AH->lookahead[AH->lookaheadLen], 1, 512 - AH->lookaheadLen, fh);
2316 : /* read failure is checked below */
2317 2 : AH->lookaheadLen += cnt;
2318 :
2319 2 : if (AH->lookaheadLen >= strlen(TEXT_DUMPALL_HEADER) &&
2320 2 : (strncmp(AH->lookahead, TEXT_DUMP_HEADER, strlen(TEXT_DUMP_HEADER)) == 0 ||
2321 2 : strncmp(AH->lookahead, TEXT_DUMPALL_HEADER, strlen(TEXT_DUMPALL_HEADER)) == 0))
2322 : {
2323 : /*
2324 : * looks like it's probably a text format dump. so suggest they
2325 : * try psql
2326 : */
2327 0 : pg_fatal("input file appears to be a text format dump. Please use psql.");
2328 : }
2329 :
2330 2 : if (AH->lookaheadLen != 512)
2331 : {
2332 0 : if (feof(fh))
2333 0 : pg_fatal("input file does not appear to be a valid archive (too short?)");
2334 : else
2335 0 : READ_ERROR_EXIT(fh);
2336 : }
2337 :
2338 2 : if (!isValidTarHeader(AH->lookahead))
2339 0 : pg_fatal("input file does not appear to be a valid archive");
2340 :
2341 2 : AH->format = archTar;
2342 : }
2343 :
2344 : /* Close the file if we opened it */
2345 62 : if (wantClose)
2346 : {
2347 62 : if (fclose(fh) != 0)
2348 0 : pg_fatal("could not close input file: %m");
2349 : /* Forget lookahead, since we'll re-read header after re-opening */
2350 62 : AH->readHeader = 0;
2351 62 : AH->lookaheadLen = 0;
2352 : }
2353 :
2354 62 : return AH->format;
2355 : }
2356 :
2357 :
2358 : /*
2359 : * Allocate an archive handle
2360 : */
2361 : static ArchiveHandle *
2362 720 : _allocAH(const char *FileSpec, const ArchiveFormat fmt,
2363 : const pg_compress_specification compression_spec,
2364 : bool dosync, ArchiveMode mode,
2365 : SetupWorkerPtrType setupWorkerPtr, DataDirSyncMethod sync_method)
2366 : {
2367 : ArchiveHandle *AH;
2368 : CompressFileHandle *CFH;
2369 720 : pg_compress_specification out_compress_spec = {0};
2370 :
2371 720 : pg_log_debug("allocating AH for %s, format %d",
2372 : FileSpec ? FileSpec : "(stdio)", fmt);
2373 :
2374 720 : AH = (ArchiveHandle *) pg_malloc0(sizeof(ArchiveHandle));
2375 :
2376 720 : AH->version = K_VERS_SELF;
2377 :
2378 : /* initialize for backwards compatible string processing */
2379 720 : AH->public.encoding = 0; /* PG_SQL_ASCII */
2380 720 : AH->public.std_strings = false;
2381 :
2382 : /* sql error handling */
2383 720 : AH->public.exit_on_error = true;
2384 720 : AH->public.n_errors = 0;
2385 :
2386 720 : AH->archiveDumpVersion = PG_VERSION;
2387 :
2388 720 : AH->createDate = time(NULL);
2389 :
2390 720 : AH->intSize = sizeof(int);
2391 720 : AH->offSize = sizeof(pgoff_t);
2392 720 : if (FileSpec)
2393 : {
2394 670 : AH->fSpec = pg_strdup(FileSpec);
2395 :
2396 : /*
2397 : * Not used; maybe later....
2398 : *
2399 : * AH->workDir = pg_strdup(FileSpec); for(i=strlen(FileSpec) ; i > 0 ;
2400 : * i--) if (AH->workDir[i-1] == '/')
2401 : */
2402 : }
2403 : else
2404 50 : AH->fSpec = NULL;
2405 :
2406 720 : AH->currUser = NULL; /* unknown */
2407 720 : AH->currSchema = NULL; /* ditto */
2408 720 : AH->currTablespace = NULL; /* ditto */
2409 720 : AH->currTableAm = NULL; /* ditto */
2410 :
2411 720 : AH->toc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2412 :
2413 720 : AH->toc->next = AH->toc;
2414 720 : AH->toc->prev = AH->toc;
2415 :
2416 720 : AH->mode = mode;
2417 720 : AH->compression_spec = compression_spec;
2418 720 : AH->dosync = dosync;
2419 720 : AH->sync_method = sync_method;
2420 :
2421 720 : memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
2422 :
2423 : /* Open stdout with no compression for AH output handle */
2424 720 : out_compress_spec.algorithm = PG_COMPRESSION_NONE;
2425 720 : CFH = InitCompressFileHandle(out_compress_spec);
2426 720 : if (!CFH->open_func(NULL, fileno(stdout), PG_BINARY_A, CFH))
2427 0 : pg_fatal("could not open stdout for appending: %m");
2428 720 : AH->OF = CFH;
2429 :
2430 : /*
2431 : * On Windows, we need to use binary mode to read/write non-text files,
2432 : * which include all archive formats as well as compressed plain text.
2433 : * Force stdin/stdout into binary mode if that is what we are using.
2434 : */
2435 : #ifdef WIN32
2436 : if ((fmt != archNull || compression_spec.algorithm != PG_COMPRESSION_NONE) &&
2437 : (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
2438 : {
2439 : if (mode == archModeWrite)
2440 : _setmode(fileno(stdout), O_BINARY);
2441 : else
2442 : _setmode(fileno(stdin), O_BINARY);
2443 : }
2444 : #endif
2445 :
2446 720 : AH->SetupWorkerPtr = setupWorkerPtr;
2447 :
2448 720 : if (fmt == archUnknown)
2449 86 : AH->format = _discoverArchiveFormat(AH);
2450 : else
2451 634 : AH->format = fmt;
2452 :
2453 720 : switch (AH->format)
2454 : {
2455 180 : case archCustom:
2456 180 : InitArchiveFmt_Custom(AH);
2457 180 : break;
2458 :
2459 300 : case archNull:
2460 300 : InitArchiveFmt_Null(AH);
2461 300 : break;
2462 :
2463 198 : case archDirectory:
2464 198 : InitArchiveFmt_Directory(AH);
2465 198 : break;
2466 :
2467 42 : case archTar:
2468 42 : InitArchiveFmt_Tar(AH);
2469 40 : break;
2470 :
2471 0 : default:
2472 0 : pg_fatal("unrecognized file format \"%d\"", AH->format);
2473 : }
2474 :
2475 718 : return AH;
2476 : }
2477 :
2478 : /*
2479 : * Write out all data (tables & LOs)
2480 : */
2481 : void
2482 206 : WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
2483 : {
2484 : TocEntry *te;
2485 :
2486 206 : if (pstate && pstate->numWorkers > 1)
2487 18 : {
2488 : /*
2489 : * In parallel mode, this code runs in the leader process. We
2490 : * construct an array of candidate TEs, then sort it into decreasing
2491 : * size order, then dispatch each TE to a data-transfer worker. By
2492 : * dumping larger tables first, we avoid getting into a situation
2493 : * where we're down to one job and it's big, losing parallelism.
2494 : */
2495 : TocEntry **tes;
2496 : int ntes;
2497 :
2498 18 : tes = (TocEntry **) pg_malloc(AH->tocCount * sizeof(TocEntry *));
2499 18 : ntes = 0;
2500 9968 : for (te = AH->toc->next; te != AH->toc; te = te->next)
2501 : {
2502 : /* Consider only TEs with dataDumper functions ... */
2503 9950 : if (!te->dataDumper)
2504 8452 : continue;
2505 : /* ... and ignore ones not enabled for dump */
2506 1498 : if ((te->reqs & REQ_DATA) == 0)
2507 0 : continue;
2508 :
2509 1498 : tes[ntes++] = te;
2510 : }
2511 :
2512 18 : if (ntes > 1)
2513 16 : qsort(tes, ntes, sizeof(TocEntry *), TocEntrySizeCompareQsort);
2514 :
2515 1516 : for (int i = 0; i < ntes; i++)
2516 1498 : DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP,
2517 : mark_dump_job_done, NULL);
2518 :
2519 18 : pg_free(tes);
2520 :
2521 : /* Now wait for workers to finish. */
2522 18 : WaitForWorkers(AH, pstate, WFW_ALL_IDLE);
2523 : }
2524 : else
2525 : {
2526 : /* Non-parallel mode: just dump all candidate TEs sequentially. */
2527 12548 : for (te = AH->toc->next; te != AH->toc; te = te->next)
2528 : {
2529 : /* Must have same filter conditions as above */
2530 12360 : if (!te->dataDumper)
2531 11770 : continue;
2532 590 : if ((te->reqs & REQ_DATA) == 0)
2533 6 : continue;
2534 :
2535 584 : WriteDataChunksForTocEntry(AH, te);
2536 : }
2537 : }
2538 206 : }
2539 :
2540 :
2541 : /*
2542 : * Callback function that's invoked in the leader process after a step has
2543 : * been parallel dumped.
2544 : *
2545 : * We don't need to do anything except check for worker failure.
2546 : */
2547 : static void
2548 1498 : mark_dump_job_done(ArchiveHandle *AH,
2549 : TocEntry *te,
2550 : int status,
2551 : void *callback_data)
2552 : {
2553 1498 : pg_log_info("finished item %d %s %s",
2554 : te->dumpId, te->desc, te->tag);
2555 :
2556 1498 : if (status != 0)
2557 0 : pg_fatal("worker process failed: exit code %d",
2558 : status);
2559 1498 : }
2560 :
2561 :
2562 : void
2563 2082 : WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
2564 : {
2565 : StartDataPtrType startPtr;
2566 : EndDataPtrType endPtr;
2567 :
2568 2082 : AH->currToc = te;
2569 :
2570 2082 : if (strcmp(te->desc, "BLOBS") == 0)
2571 : {
2572 34 : startPtr = AH->StartLOsPtr;
2573 34 : endPtr = AH->EndLOsPtr;
2574 : }
2575 : else
2576 : {
2577 2048 : startPtr = AH->StartDataPtr;
2578 2048 : endPtr = AH->EndDataPtr;
2579 : }
2580 :
2581 2082 : if (startPtr != NULL)
2582 2082 : (*startPtr) (AH, te);
2583 :
2584 : /*
2585 : * The user-provided DataDumper routine needs to call AH->WriteData
2586 : */
2587 2082 : te->dataDumper((Archive *) AH, te->dataDumperArg);
2588 :
2589 2082 : if (endPtr != NULL)
2590 2082 : (*endPtr) (AH, te);
2591 :
2592 2082 : AH->currToc = NULL;
2593 2082 : }
2594 :
2595 : void
2596 234 : WriteToc(ArchiveHandle *AH)
2597 : {
2598 : TocEntry *te;
2599 : char workbuf[32];
2600 : int tocCount;
2601 : int i;
2602 :
2603 : /* count entries that will actually be dumped */
2604 234 : tocCount = 0;
2605 25038 : for (te = AH->toc->next; te != AH->toc; te = te->next)
2606 : {
2607 24804 : if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS | REQ_SPECIAL)) != 0)
2608 24798 : tocCount++;
2609 : }
2610 :
2611 : /* printf("%d TOC Entries to save\n", tocCount); */
2612 :
2613 234 : WriteInt(AH, tocCount);
2614 :
2615 25038 : for (te = AH->toc->next; te != AH->toc; te = te->next)
2616 : {
2617 24804 : if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS | REQ_SPECIAL)) == 0)
2618 6 : continue;
2619 :
2620 24798 : WriteInt(AH, te->dumpId);
2621 24798 : WriteInt(AH, te->dataDumper ? 1 : 0);
2622 :
2623 : /* OID is recorded as a string for historical reasons */
2624 24798 : sprintf(workbuf, "%u", te->catalogId.tableoid);
2625 24798 : WriteStr(AH, workbuf);
2626 24798 : sprintf(workbuf, "%u", te->catalogId.oid);
2627 24798 : WriteStr(AH, workbuf);
2628 :
2629 24798 : WriteStr(AH, te->tag);
2630 24798 : WriteStr(AH, te->desc);
2631 24798 : WriteInt(AH, te->section);
2632 :
2633 24798 : if (te->defnLen)
2634 : {
2635 : /*
2636 : * defnLen should only be set for custom format's second call to
2637 : * WriteToc(), which rewrites the TOC in place to update data
2638 : * offsets. Instead of calling the defnDumper a second time
2639 : * (which could involve re-executing queries), just skip writing
2640 : * the entry. While regenerating the definition should
2641 : * theoretically produce the same result as before, it's expensive
2642 : * and feels risky.
2643 : *
2644 : * The custom format only calls WriteToc() a second time if
2645 : * fseeko() is usable (see _CloseArchive() in pg_backup_custom.c),
2646 : * so we can safely use it without checking. For other formats,
2647 : * we fail because one of our assumptions must no longer hold
2648 : * true.
2649 : *
2650 : * XXX This is a layering violation, but the alternative is an
2651 : * awkward and complicated callback infrastructure for this
2652 : * special case. This might be worth revisiting in the future.
2653 : */
2654 450 : if (AH->format != archCustom)
2655 0 : pg_fatal("unexpected TOC entry in WriteToc(): %d %s %s",
2656 : te->dumpId, te->desc, te->tag);
2657 :
2658 450 : if (fseeko(AH->FH, te->defnLen, SEEK_CUR != 0))
2659 0 : pg_fatal("error during file seek: %m");
2660 : }
2661 24348 : else if (te->defnDumper)
2662 : {
2663 5530 : char *defn = te->defnDumper((Archive *) AH, te->defnDumperArg, te);
2664 :
2665 5530 : te->defnLen = WriteStr(AH, defn);
2666 5530 : pg_free(defn);
2667 : }
2668 : else
2669 18818 : WriteStr(AH, te->defn);
2670 :
2671 24798 : WriteStr(AH, te->dropStmt);
2672 24798 : WriteStr(AH, te->copyStmt);
2673 24798 : WriteStr(AH, te->namespace);
2674 24798 : WriteStr(AH, te->tablespace);
2675 24798 : WriteStr(AH, te->tableam);
2676 24798 : WriteInt(AH, te->relkind);
2677 24798 : WriteStr(AH, te->owner);
2678 24798 : WriteStr(AH, "false");
2679 :
2680 : /* Dump list of dependencies */
2681 62612 : for (i = 0; i < te->nDeps; i++)
2682 : {
2683 37814 : sprintf(workbuf, "%d", te->dependencies[i]);
2684 37814 : WriteStr(AH, workbuf);
2685 : }
2686 24798 : WriteStr(AH, NULL); /* Terminate List */
2687 :
2688 24798 : if (AH->WriteExtraTocPtr)
2689 24798 : AH->WriteExtraTocPtr(AH, te);
2690 : }
2691 234 : }
2692 :
2693 : void
2694 212 : ReadToc(ArchiveHandle *AH)
2695 : {
2696 : int i;
2697 : char *tmp;
2698 : DumpId *deps;
2699 : int depIdx;
2700 : int depSize;
2701 : TocEntry *te;
2702 : bool is_supported;
2703 :
2704 212 : AH->tocCount = ReadInt(AH);
2705 212 : AH->maxDumpId = 0;
2706 :
2707 25512 : for (i = 0; i < AH->tocCount; i++)
2708 : {
2709 25300 : te = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2710 25300 : te->dumpId = ReadInt(AH);
2711 :
2712 25300 : if (te->dumpId > AH->maxDumpId)
2713 9770 : AH->maxDumpId = te->dumpId;
2714 :
2715 : /* Sanity check */
2716 25300 : if (te->dumpId <= 0)
2717 0 : pg_fatal("entry ID %d out of range -- perhaps a corrupt TOC",
2718 : te->dumpId);
2719 :
2720 25300 : te->hadDumper = ReadInt(AH);
2721 :
2722 25300 : if (AH->version >= K_VERS_1_8)
2723 : {
2724 25300 : tmp = ReadStr(AH);
2725 25300 : sscanf(tmp, "%u", &te->catalogId.tableoid);
2726 25300 : free(tmp);
2727 : }
2728 : else
2729 0 : te->catalogId.tableoid = InvalidOid;
2730 25300 : tmp = ReadStr(AH);
2731 25300 : sscanf(tmp, "%u", &te->catalogId.oid);
2732 25300 : free(tmp);
2733 :
2734 25300 : te->tag = ReadStr(AH);
2735 25300 : te->desc = ReadStr(AH);
2736 :
2737 25300 : if (AH->version >= K_VERS_1_11)
2738 : {
2739 25300 : te->section = ReadInt(AH);
2740 : }
2741 : else
2742 : {
2743 : /*
2744 : * Rules for pre-8.4 archives wherein pg_dump hasn't classified
2745 : * the entries into sections. This list need not cover entry
2746 : * types added later than 8.4.
2747 : */
2748 0 : if (strcmp(te->desc, "COMMENT") == 0 ||
2749 0 : strcmp(te->desc, "ACL") == 0 ||
2750 0 : strcmp(te->desc, "ACL LANGUAGE") == 0)
2751 0 : te->section = SECTION_NONE;
2752 0 : else if (strcmp(te->desc, "TABLE DATA") == 0 ||
2753 0 : strcmp(te->desc, "BLOBS") == 0 ||
2754 0 : strcmp(te->desc, "BLOB COMMENTS") == 0)
2755 0 : te->section = SECTION_DATA;
2756 0 : else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
2757 0 : strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
2758 0 : strcmp(te->desc, "FK CONSTRAINT") == 0 ||
2759 0 : strcmp(te->desc, "INDEX") == 0 ||
2760 0 : strcmp(te->desc, "RULE") == 0 ||
2761 0 : strcmp(te->desc, "TRIGGER") == 0)
2762 0 : te->section = SECTION_POST_DATA;
2763 : else
2764 0 : te->section = SECTION_PRE_DATA;
2765 : }
2766 :
2767 25300 : te->defn = ReadStr(AH);
2768 25300 : te->dropStmt = ReadStr(AH);
2769 :
2770 25300 : if (AH->version >= K_VERS_1_3)
2771 25300 : te->copyStmt = ReadStr(AH);
2772 :
2773 25300 : if (AH->version >= K_VERS_1_6)
2774 25300 : te->namespace = ReadStr(AH);
2775 :
2776 25300 : if (AH->version >= K_VERS_1_10)
2777 25300 : te->tablespace = ReadStr(AH);
2778 :
2779 25300 : if (AH->version >= K_VERS_1_14)
2780 25300 : te->tableam = ReadStr(AH);
2781 :
2782 25300 : if (AH->version >= K_VERS_1_16)
2783 25300 : te->relkind = ReadInt(AH);
2784 :
2785 25300 : te->owner = ReadStr(AH);
2786 25300 : is_supported = true;
2787 25300 : if (AH->version < K_VERS_1_9)
2788 0 : is_supported = false;
2789 : else
2790 : {
2791 25300 : tmp = ReadStr(AH);
2792 :
2793 25300 : if (strcmp(tmp, "true") == 0)
2794 0 : is_supported = false;
2795 :
2796 25300 : free(tmp);
2797 : }
2798 :
2799 25300 : if (!is_supported)
2800 0 : pg_log_warning("restoring tables WITH OIDS is not supported anymore");
2801 :
2802 : /* Read TOC entry dependencies */
2803 25300 : if (AH->version >= K_VERS_1_5)
2804 : {
2805 25300 : depSize = 100;
2806 25300 : deps = (DumpId *) pg_malloc(sizeof(DumpId) * depSize);
2807 25300 : depIdx = 0;
2808 : for (;;)
2809 : {
2810 64256 : tmp = ReadStr(AH);
2811 64256 : if (!tmp)
2812 25300 : break; /* end of list */
2813 38956 : if (depIdx >= depSize)
2814 : {
2815 0 : depSize *= 2;
2816 0 : deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depSize);
2817 : }
2818 38956 : sscanf(tmp, "%d", &deps[depIdx]);
2819 38956 : free(tmp);
2820 38956 : depIdx++;
2821 : }
2822 :
2823 25300 : if (depIdx > 0) /* We have a non-null entry */
2824 : {
2825 20546 : deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depIdx);
2826 20546 : te->dependencies = deps;
2827 20546 : te->nDeps = depIdx;
2828 : }
2829 : else
2830 : {
2831 4754 : free(deps);
2832 4754 : te->dependencies = NULL;
2833 4754 : te->nDeps = 0;
2834 : }
2835 : }
2836 : else
2837 : {
2838 0 : te->dependencies = NULL;
2839 0 : te->nDeps = 0;
2840 : }
2841 25300 : te->dataLength = 0;
2842 :
2843 25300 : if (AH->ReadExtraTocPtr)
2844 25300 : AH->ReadExtraTocPtr(AH, te);
2845 :
2846 25300 : pg_log_debug("read TOC entry %d (ID %d) for %s %s",
2847 : i, te->dumpId, te->desc, te->tag);
2848 :
2849 : /* link completed entry into TOC circular list */
2850 25300 : te->prev = AH->toc->prev;
2851 25300 : AH->toc->prev->next = te;
2852 25300 : AH->toc->prev = te;
2853 25300 : te->next = AH->toc;
2854 :
2855 : /* special processing immediately upon read for some items */
2856 25300 : if (strcmp(te->desc, "ENCODING") == 0)
2857 212 : processEncodingEntry(AH, te);
2858 25088 : else if (strcmp(te->desc, "STDSTRINGS") == 0)
2859 212 : processStdStringsEntry(AH, te);
2860 24876 : else if (strcmp(te->desc, "SEARCHPATH") == 0)
2861 212 : processSearchPathEntry(AH, te);
2862 : }
2863 212 : }
2864 :
2865 : static void
2866 212 : processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
2867 : {
2868 : /* te->defn should have the form SET client_encoding = 'foo'; */
2869 212 : char *defn = pg_strdup(te->defn);
2870 : char *ptr1;
2871 212 : char *ptr2 = NULL;
2872 : int encoding;
2873 :
2874 212 : ptr1 = strchr(defn, '\'');
2875 212 : if (ptr1)
2876 212 : ptr2 = strchr(++ptr1, '\'');
2877 212 : if (ptr2)
2878 : {
2879 212 : *ptr2 = '\0';
2880 212 : encoding = pg_char_to_encoding(ptr1);
2881 212 : if (encoding < 0)
2882 0 : pg_fatal("unrecognized encoding \"%s\"",
2883 : ptr1);
2884 212 : AH->public.encoding = encoding;
2885 212 : setFmtEncoding(encoding);
2886 : }
2887 : else
2888 0 : pg_fatal("invalid ENCODING item: %s",
2889 : te->defn);
2890 :
2891 212 : free(defn);
2892 212 : }
2893 :
2894 : static void
2895 212 : processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
2896 : {
2897 : /* te->defn should have the form SET standard_conforming_strings = 'x'; */
2898 : char *ptr1;
2899 :
2900 212 : ptr1 = strchr(te->defn, '\'');
2901 212 : if (ptr1 && strncmp(ptr1, "'on'", 4) == 0)
2902 212 : AH->public.std_strings = true;
2903 0 : else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0)
2904 0 : AH->public.std_strings = false;
2905 : else
2906 0 : pg_fatal("invalid STDSTRINGS item: %s",
2907 : te->defn);
2908 212 : }
2909 :
2910 : static void
2911 212 : processSearchPathEntry(ArchiveHandle *AH, TocEntry *te)
2912 : {
2913 : /*
2914 : * te->defn should contain a command to set search_path. We just copy it
2915 : * verbatim for use later.
2916 : */
2917 212 : AH->public.searchpath = pg_strdup(te->defn);
2918 212 : }
2919 :
2920 : static void
2921 0 : StrictNamesCheck(RestoreOptions *ropt)
2922 : {
2923 : const char *missing_name;
2924 :
2925 : Assert(ropt->strict_names);
2926 :
2927 0 : if (ropt->schemaNames.head != NULL)
2928 : {
2929 0 : missing_name = simple_string_list_not_touched(&ropt->schemaNames);
2930 0 : if (missing_name != NULL)
2931 0 : pg_fatal("schema \"%s\" not found", missing_name);
2932 : }
2933 :
2934 0 : if (ropt->tableNames.head != NULL)
2935 : {
2936 0 : missing_name = simple_string_list_not_touched(&ropt->tableNames);
2937 0 : if (missing_name != NULL)
2938 0 : pg_fatal("table \"%s\" not found", missing_name);
2939 : }
2940 :
2941 0 : if (ropt->indexNames.head != NULL)
2942 : {
2943 0 : missing_name = simple_string_list_not_touched(&ropt->indexNames);
2944 0 : if (missing_name != NULL)
2945 0 : pg_fatal("index \"%s\" not found", missing_name);
2946 : }
2947 :
2948 0 : if (ropt->functionNames.head != NULL)
2949 : {
2950 0 : missing_name = simple_string_list_not_touched(&ropt->functionNames);
2951 0 : if (missing_name != NULL)
2952 0 : pg_fatal("function \"%s\" not found", missing_name);
2953 : }
2954 :
2955 0 : if (ropt->triggerNames.head != NULL)
2956 : {
2957 0 : missing_name = simple_string_list_not_touched(&ropt->triggerNames);
2958 0 : if (missing_name != NULL)
2959 0 : pg_fatal("trigger \"%s\" not found", missing_name);
2960 : }
2961 0 : }
2962 :
2963 : /*
2964 : * Determine whether we want to restore this TOC entry.
2965 : *
2966 : * Returns 0 if entry should be skipped, or some combination of the
2967 : * REQ_SCHEMA, REQ_DATA, and REQ_STATS bits if we want to restore schema, data
2968 : * and/or statistics portions of this TOC entry, or REQ_SPECIAL if it's a
2969 : * special entry.
2970 : */
2971 : static int
2972 136492 : _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH)
2973 : {
2974 136492 : int res = REQ_SCHEMA | REQ_DATA;
2975 136492 : RestoreOptions *ropt = AH->public.ropt;
2976 :
2977 : /* These items are treated specially */
2978 136492 : if (strcmp(te->desc, "ENCODING") == 0 ||
2979 135812 : strcmp(te->desc, "STDSTRINGS") == 0 ||
2980 135132 : strcmp(te->desc, "SEARCHPATH") == 0)
2981 2040 : return REQ_SPECIAL;
2982 :
2983 134452 : if (strcmp(te->desc, "STATISTICS DATA") == 0)
2984 : {
2985 24602 : if (!ropt->dumpStatistics)
2986 0 : return 0;
2987 :
2988 24602 : res = REQ_STATS;
2989 : }
2990 :
2991 : /*
2992 : * DATABASE and DATABASE PROPERTIES also have a special rule: they are
2993 : * restored in createDB mode, and not restored otherwise, independently of
2994 : * all else.
2995 : */
2996 134452 : if (strcmp(te->desc, "DATABASE") == 0 ||
2997 133976 : strcmp(te->desc, "DATABASE PROPERTIES") == 0)
2998 : {
2999 626 : if (ropt->createDB)
3000 570 : return REQ_SCHEMA;
3001 : else
3002 56 : return 0;
3003 : }
3004 :
3005 : /*
3006 : * Process exclusions that affect certain classes of TOC entries.
3007 : */
3008 :
3009 : /* If it's an ACL, maybe ignore it */
3010 133826 : if (ropt->aclsSkip && _tocEntryIsACL(te))
3011 0 : return 0;
3012 :
3013 : /* If it's a comment, maybe ignore it */
3014 133826 : if (ropt->no_comments && strcmp(te->desc, "COMMENT") == 0)
3015 0 : return 0;
3016 :
3017 : /* If it's a policy, maybe ignore it */
3018 133826 : if (ropt->no_policies &&
3019 690 : (strcmp(te->desc, "POLICY") == 0 ||
3020 690 : strcmp(te->desc, "ROW SECURITY") == 0))
3021 0 : return 0;
3022 :
3023 : /*
3024 : * If it's a publication or a table part of a publication, maybe ignore
3025 : * it.
3026 : */
3027 133826 : if (ropt->no_publications &&
3028 0 : (strcmp(te->desc, "PUBLICATION") == 0 ||
3029 0 : strcmp(te->desc, "PUBLICATION TABLE") == 0 ||
3030 0 : strcmp(te->desc, "PUBLICATION TABLES IN SCHEMA") == 0))
3031 0 : return 0;
3032 :
3033 : /* If it's a security label, maybe ignore it */
3034 133826 : if (ropt->no_security_labels && strcmp(te->desc, "SECURITY LABEL") == 0)
3035 0 : return 0;
3036 :
3037 : /* If it's a subscription, maybe ignore it */
3038 133826 : if (ropt->no_subscriptions && strcmp(te->desc, "SUBSCRIPTION") == 0)
3039 0 : return 0;
3040 :
3041 : /* Ignore it if section is not to be dumped/restored */
3042 133826 : switch (curSection)
3043 : {
3044 74098 : case SECTION_PRE_DATA:
3045 74098 : if (!(ropt->dumpSections & DUMP_PRE_DATA))
3046 712 : return 0;
3047 73386 : break;
3048 32734 : case SECTION_DATA:
3049 32734 : if (!(ropt->dumpSections & DUMP_DATA))
3050 328 : return 0;
3051 32406 : break;
3052 26994 : case SECTION_POST_DATA:
3053 26994 : if (!(ropt->dumpSections & DUMP_POST_DATA))
3054 448 : return 0;
3055 26546 : break;
3056 0 : default:
3057 : /* shouldn't get here, really, but ignore it */
3058 0 : return 0;
3059 : }
3060 :
3061 : /* Ignore it if rejected by idWanted[] (cf. SortTocFromFile) */
3062 132338 : if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1])
3063 0 : return 0;
3064 :
3065 : /*
3066 : * Check options for selective dump/restore.
3067 : */
3068 132338 : if (strcmp(te->desc, "ACL") == 0 ||
3069 126890 : strcmp(te->desc, "COMMENT") == 0 ||
3070 113304 : strcmp(te->desc, "STATISTICS DATA") == 0 ||
3071 88974 : strcmp(te->desc, "SECURITY LABEL") == 0)
3072 : {
3073 : /* Database properties react to createDB, not selectivity options. */
3074 43364 : if (strncmp(te->tag, "DATABASE ", 9) == 0)
3075 : {
3076 296 : if (!ropt->createDB)
3077 38 : return 0;
3078 : }
3079 43068 : else if (ropt->schemaNames.head != NULL ||
3080 43056 : ropt->schemaExcludeNames.head != NULL ||
3081 43044 : ropt->selTypes)
3082 : {
3083 : /*
3084 : * In a selective dump/restore, we want to restore these dependent
3085 : * TOC entry types only if their parent object is being restored.
3086 : * Without selectivity options, we let through everything in the
3087 : * archive. Note there may be such entries with no parent, eg
3088 : * non-default ACLs for built-in objects. Also, we make
3089 : * per-column ACLs additionally depend on the table's ACL if any
3090 : * to ensure correct restore order, so those dependencies should
3091 : * be ignored in this check.
3092 : *
3093 : * This code depends on the parent having been marked already,
3094 : * which should be the case; if it isn't, perhaps due to
3095 : * SortTocFromFile rearrangement, skipping the dependent entry
3096 : * seems prudent anyway.
3097 : *
3098 : * Ideally we'd handle, eg, table CHECK constraints this way too.
3099 : * But it's hard to tell which of their dependencies is the one to
3100 : * consult.
3101 : */
3102 76 : bool dumpthis = false;
3103 :
3104 196 : for (int i = 0; i < te->nDeps; i++)
3105 : {
3106 136 : TocEntry *pte = getTocEntryByDumpId(AH, te->dependencies[i]);
3107 :
3108 136 : if (!pte)
3109 60 : continue; /* probably shouldn't happen */
3110 76 : if (strcmp(pte->desc, "ACL") == 0)
3111 0 : continue; /* ignore dependency on another ACL */
3112 76 : if (pte->reqs == 0)
3113 60 : continue; /* this object isn't marked, so ignore it */
3114 : /* Found a parent to be dumped, so we want to dump this too */
3115 16 : dumpthis = true;
3116 16 : break;
3117 : }
3118 76 : if (!dumpthis)
3119 60 : return 0;
3120 : }
3121 : }
3122 : else
3123 : {
3124 : /* Apply selective-restore rules for standalone TOC entries. */
3125 88974 : if (ropt->schemaNames.head != NULL)
3126 : {
3127 : /* If no namespace is specified, it means all. */
3128 40 : if (!te->namespace)
3129 4 : return 0;
3130 36 : if (!simple_string_list_member(&ropt->schemaNames, te->namespace))
3131 28 : return 0;
3132 : }
3133 :
3134 88942 : if (ropt->schemaExcludeNames.head != NULL &&
3135 76 : te->namespace &&
3136 36 : simple_string_list_member(&ropt->schemaExcludeNames, te->namespace))
3137 8 : return 0;
3138 :
3139 88934 : if (ropt->selTypes)
3140 : {
3141 156 : if (strcmp(te->desc, "TABLE") == 0 ||
3142 116 : strcmp(te->desc, "TABLE DATA") == 0 ||
3143 76 : strcmp(te->desc, "VIEW") == 0 ||
3144 76 : strcmp(te->desc, "FOREIGN TABLE") == 0 ||
3145 76 : strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
3146 76 : strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0 ||
3147 76 : strcmp(te->desc, "SEQUENCE") == 0 ||
3148 70 : strcmp(te->desc, "SEQUENCE SET") == 0)
3149 : {
3150 92 : if (!ropt->selTable)
3151 60 : return 0;
3152 32 : if (ropt->tableNames.head != NULL &&
3153 32 : !simple_string_list_member(&ropt->tableNames, te->tag))
3154 28 : return 0;
3155 : }
3156 64 : else if (strcmp(te->desc, "INDEX") == 0)
3157 : {
3158 12 : if (!ropt->selIndex)
3159 8 : return 0;
3160 4 : if (ropt->indexNames.head != NULL &&
3161 4 : !simple_string_list_member(&ropt->indexNames, te->tag))
3162 2 : return 0;
3163 : }
3164 52 : else if (strcmp(te->desc, "FUNCTION") == 0 ||
3165 28 : strcmp(te->desc, "AGGREGATE") == 0 ||
3166 28 : strcmp(te->desc, "PROCEDURE") == 0)
3167 : {
3168 24 : if (!ropt->selFunction)
3169 8 : return 0;
3170 16 : if (ropt->functionNames.head != NULL &&
3171 16 : !simple_string_list_member(&ropt->functionNames, te->tag))
3172 12 : return 0;
3173 : }
3174 28 : else if (strcmp(te->desc, "TRIGGER") == 0)
3175 : {
3176 12 : if (!ropt->selTrigger)
3177 8 : return 0;
3178 4 : if (ropt->triggerNames.head != NULL &&
3179 4 : !simple_string_list_member(&ropt->triggerNames, te->tag))
3180 2 : return 0;
3181 : }
3182 : else
3183 16 : return 0;
3184 : }
3185 : }
3186 :
3187 :
3188 : /*
3189 : * Determine whether the TOC entry contains schema and/or data components,
3190 : * and mask off inapplicable REQ bits. If it had a dataDumper, assume
3191 : * it's both schema and data. Otherwise it's probably schema-only, but
3192 : * there are exceptions.
3193 : */
3194 132056 : if (!te->hadDumper)
3195 : {
3196 : /*
3197 : * Special Case: If 'SEQUENCE SET' or anything to do with LOs, then it
3198 : * is considered a data entry. We don't need to check for BLOBS or
3199 : * old-style BLOB COMMENTS entries, because they will have hadDumper =
3200 : * true ... but we do need to check new-style BLOB ACLs, comments,
3201 : * etc.
3202 : */
3203 117696 : if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
3204 116256 : strcmp(te->desc, "BLOB") == 0 ||
3205 116256 : strcmp(te->desc, "BLOB METADATA") == 0 ||
3206 116044 : (strcmp(te->desc, "ACL") == 0 &&
3207 5448 : strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3208 115950 : (strcmp(te->desc, "COMMENT") == 0 &&
3209 13548 : strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3210 115814 : (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3211 0 : strncmp(te->tag, "LARGE OBJECT", 12) == 0))
3212 1882 : res = res & REQ_DATA;
3213 : else
3214 115814 : res = res & ~REQ_DATA;
3215 : }
3216 :
3217 : /*
3218 : * If there's no definition command, there's no schema component. Treat
3219 : * "load via partition root" comments as not schema.
3220 : */
3221 132056 : if (!te->defn || !te->defn[0] ||
3222 99474 : strncmp(te->defn, "-- load via partition root ", 27) == 0)
3223 32606 : res = res & ~REQ_SCHEMA;
3224 :
3225 : /*
3226 : * Special case: <Init> type with <Max OID> tag; this is obsolete and we
3227 : * always ignore it.
3228 : */
3229 132056 : if ((strcmp(te->desc, "<Init>") == 0) && (strcmp(te->tag, "Max OID") == 0))
3230 0 : return 0;
3231 :
3232 : /* Mask it if we don't want data */
3233 132056 : if (!ropt->dumpData)
3234 : {
3235 : /*
3236 : * The sequence_data option overrides dumpData for SEQUENCE SET.
3237 : *
3238 : * In binary-upgrade mode, even with dumpData unset, we do not mask
3239 : * out large objects. (Only large object definitions, comments and
3240 : * other metadata should be generated in binary-upgrade mode, not the
3241 : * actual data, but that need not concern us here.)
3242 : */
3243 8454 : if (!(ropt->sequence_data && strcmp(te->desc, "SEQUENCE SET") == 0) &&
3244 8324 : !(ropt->binary_upgrade &&
3245 6956 : (strcmp(te->desc, "BLOB") == 0 ||
3246 6956 : strcmp(te->desc, "BLOB METADATA") == 0 ||
3247 6950 : (strcmp(te->desc, "ACL") == 0 &&
3248 178 : strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3249 6948 : (strcmp(te->desc, "COMMENT") == 0 &&
3250 166 : strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3251 6942 : (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3252 0 : strncmp(te->tag, "LARGE OBJECT", 12) == 0))))
3253 8310 : res = res & (REQ_SCHEMA | REQ_STATS);
3254 : }
3255 :
3256 : /* Mask it if we don't want schema */
3257 132056 : if (!ropt->dumpSchema)
3258 790 : res = res & (REQ_DATA | REQ_STATS);
3259 :
3260 132056 : return res;
3261 : }
3262 :
3263 : /*
3264 : * Identify which pass we should restore this TOC entry in.
3265 : *
3266 : * See notes with the RestorePass typedef in pg_backup_archiver.h.
3267 : */
3268 : static RestorePass
3269 316724 : _tocEntryRestorePass(TocEntry *te)
3270 : {
3271 : /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3272 316724 : if (strcmp(te->desc, "ACL") == 0 ||
3273 304900 : strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3274 304900 : strcmp(te->desc, "DEFAULT ACL") == 0)
3275 12628 : return RESTORE_PASS_ACL;
3276 304096 : if (strcmp(te->desc, "EVENT TRIGGER") == 0 ||
3277 303842 : strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0)
3278 2124 : return RESTORE_PASS_POST_ACL;
3279 :
3280 : /*
3281 : * Comments need to be emitted in the same pass as their parent objects.
3282 : * ACLs haven't got comments, and neither do matview data objects, but
3283 : * event triggers do. (Fortunately, event triggers haven't got ACLs, or
3284 : * we'd need yet another weird special case.)
3285 : */
3286 301972 : if (strcmp(te->desc, "COMMENT") == 0 &&
3287 38302 : strncmp(te->tag, "EVENT TRIGGER ", 14) == 0)
3288 0 : return RESTORE_PASS_POST_ACL;
3289 :
3290 : /*
3291 : * If statistics data is dependent on materialized view data, it must be
3292 : * deferred to RESTORE_PASS_POST_ACL. Those entries are already marked as
3293 : * SECTION_POST_DATA, and some other stats entries (e.g., index stats)
3294 : * will also be marked as SECTION_POST_DATA. Additionally, our lookahead
3295 : * code in fetchAttributeStats() assumes that we dump all statistics data
3296 : * entries in TOC order. To ensure this assumption holds, we move all
3297 : * statistics data entries in SECTION_POST_DATA to RESTORE_PASS_POST_ACL.
3298 : */
3299 301972 : if (strcmp(te->desc, "STATISTICS DATA") == 0 &&
3300 54492 : te->section == SECTION_POST_DATA)
3301 19388 : return RESTORE_PASS_POST_ACL;
3302 :
3303 : /* All else can be handled in the main pass. */
3304 282584 : return RESTORE_PASS_MAIN;
3305 : }
3306 :
3307 : /*
3308 : * Identify TOC entries that are ACLs.
3309 : *
3310 : * Note: it seems worth duplicating some code here to avoid a hard-wired
3311 : * assumption that these are exactly the same entries that we restore during
3312 : * the RESTORE_PASS_ACL phase.
3313 : */
3314 : static bool
3315 109554 : _tocEntryIsACL(TocEntry *te)
3316 : {
3317 : /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3318 109554 : if (strcmp(te->desc, "ACL") == 0 ||
3319 105522 : strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3320 105522 : strcmp(te->desc, "DEFAULT ACL") == 0)
3321 4300 : return true;
3322 105254 : return false;
3323 : }
3324 :
3325 : /*
3326 : * Issue SET commands for parameters that we want to have set the same way
3327 : * at all times during execution of a restore script.
3328 : */
3329 : static void
3330 830 : _doSetFixedOutputState(ArchiveHandle *AH)
3331 : {
3332 830 : RestoreOptions *ropt = AH->public.ropt;
3333 :
3334 : /*
3335 : * Disable timeouts to allow for slow commands, idle parallel workers, etc
3336 : */
3337 830 : ahprintf(AH, "SET statement_timeout = 0;\n");
3338 830 : ahprintf(AH, "SET lock_timeout = 0;\n");
3339 830 : ahprintf(AH, "SET idle_in_transaction_session_timeout = 0;\n");
3340 830 : ahprintf(AH, "SET transaction_timeout = 0;\n");
3341 :
3342 : /* Select the correct character set encoding */
3343 830 : ahprintf(AH, "SET client_encoding = '%s';\n",
3344 : pg_encoding_to_char(AH->public.encoding));
3345 :
3346 : /* Select the correct string literal syntax */
3347 830 : ahprintf(AH, "SET standard_conforming_strings = %s;\n",
3348 830 : AH->public.std_strings ? "on" : "off");
3349 :
3350 : /* Select the role to be used during restore */
3351 830 : if (ropt && ropt->use_role)
3352 0 : ahprintf(AH, "SET ROLE %s;\n", fmtId(ropt->use_role));
3353 :
3354 : /* Select the dump-time search_path */
3355 830 : if (AH->public.searchpath)
3356 830 : ahprintf(AH, "%s", AH->public.searchpath);
3357 :
3358 : /* Make sure function checking is disabled */
3359 830 : ahprintf(AH, "SET check_function_bodies = false;\n");
3360 :
3361 : /* Ensure that all valid XML data will be accepted */
3362 830 : ahprintf(AH, "SET xmloption = content;\n");
3363 :
3364 : /* Avoid annoying notices etc */
3365 830 : ahprintf(AH, "SET client_min_messages = warning;\n");
3366 830 : if (!AH->public.std_strings)
3367 0 : ahprintf(AH, "SET escape_string_warning = off;\n");
3368 :
3369 : /* Adjust row-security state */
3370 830 : if (ropt && ropt->enable_row_security)
3371 0 : ahprintf(AH, "SET row_security = on;\n");
3372 : else
3373 830 : ahprintf(AH, "SET row_security = off;\n");
3374 :
3375 : /*
3376 : * In --transaction-size mode, we should always be in a transaction when
3377 : * we begin to restore objects.
3378 : */
3379 830 : if (ropt && ropt->txn_size > 0)
3380 : {
3381 144 : if (AH->connection)
3382 144 : StartTransaction(&AH->public);
3383 : else
3384 0 : ahprintf(AH, "\nBEGIN;\n");
3385 144 : AH->txnCount = 0;
3386 : }
3387 :
3388 830 : ahprintf(AH, "\n");
3389 830 : }
3390 :
3391 : /*
3392 : * Issue a SET SESSION AUTHORIZATION command. Caller is responsible
3393 : * for updating state if appropriate. If user is NULL or an empty string,
3394 : * the specification DEFAULT will be used.
3395 : */
3396 : static void
3397 2 : _doSetSessionAuth(ArchiveHandle *AH, const char *user)
3398 : {
3399 2 : PQExpBuffer cmd = createPQExpBuffer();
3400 :
3401 2 : appendPQExpBufferStr(cmd, "SET SESSION AUTHORIZATION ");
3402 :
3403 : /*
3404 : * SQL requires a string literal here. Might as well be correct.
3405 : */
3406 2 : if (user && *user)
3407 2 : appendStringLiteralAHX(cmd, user, AH);
3408 : else
3409 0 : appendPQExpBufferStr(cmd, "DEFAULT");
3410 2 : appendPQExpBufferChar(cmd, ';');
3411 :
3412 2 : if (RestoringToDB(AH))
3413 : {
3414 : PGresult *res;
3415 :
3416 0 : res = PQexec(AH->connection, cmd->data);
3417 :
3418 0 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3419 : /* NOT warn_or_exit_horribly... use -O instead to skip this. */
3420 0 : pg_fatal("could not set session user to \"%s\": %s",
3421 : user, PQerrorMessage(AH->connection));
3422 :
3423 0 : PQclear(res);
3424 : }
3425 : else
3426 2 : ahprintf(AH, "%s\n\n", cmd->data);
3427 :
3428 2 : destroyPQExpBuffer(cmd);
3429 2 : }
3430 :
3431 :
3432 : /*
3433 : * Issue the commands to connect to the specified database.
3434 : *
3435 : * If we're currently restoring right into a database, this will
3436 : * actually establish a connection. Otherwise it puts a \connect into
3437 : * the script output.
3438 : */
3439 : static void
3440 310 : _reconnectToDB(ArchiveHandle *AH, const char *dbname)
3441 : {
3442 310 : if (RestoringToDB(AH))
3443 102 : ReconnectToServer(AH, dbname);
3444 : else
3445 : {
3446 : PQExpBufferData connectbuf;
3447 :
3448 208 : initPQExpBuffer(&connectbuf);
3449 208 : appendPsqlMetaConnect(&connectbuf, dbname);
3450 208 : ahprintf(AH, "%s\n", connectbuf.data);
3451 208 : termPQExpBuffer(&connectbuf);
3452 : }
3453 :
3454 : /*
3455 : * NOTE: currUser keeps track of what the imaginary session user in our
3456 : * script is. It's now effectively reset to the original userID.
3457 : */
3458 310 : free(AH->currUser);
3459 310 : AH->currUser = NULL;
3460 :
3461 : /* don't assume we still know the output schema, tablespace, etc either */
3462 310 : free(AH->currSchema);
3463 310 : AH->currSchema = NULL;
3464 :
3465 310 : free(AH->currTableAm);
3466 310 : AH->currTableAm = NULL;
3467 :
3468 310 : free(AH->currTablespace);
3469 310 : AH->currTablespace = NULL;
3470 :
3471 : /* re-establish fixed state */
3472 310 : _doSetFixedOutputState(AH);
3473 310 : }
3474 :
3475 : /*
3476 : * Become the specified user, and update state to avoid redundant commands
3477 : *
3478 : * NULL or empty argument is taken to mean restoring the session default
3479 : */
3480 : static void
3481 128 : _becomeUser(ArchiveHandle *AH, const char *user)
3482 : {
3483 128 : if (!user)
3484 0 : user = ""; /* avoid null pointers */
3485 :
3486 128 : if (AH->currUser && strcmp(AH->currUser, user) == 0)
3487 126 : return; /* no need to do anything */
3488 :
3489 2 : _doSetSessionAuth(AH, user);
3490 :
3491 : /*
3492 : * NOTE: currUser keeps track of what the imaginary session user in our
3493 : * script is
3494 : */
3495 2 : free(AH->currUser);
3496 2 : AH->currUser = pg_strdup(user);
3497 : }
3498 :
3499 : /*
3500 : * Become the owner of the given TOC entry object. If
3501 : * changes in ownership are not allowed, this doesn't do anything.
3502 : */
3503 : static void
3504 121544 : _becomeOwner(ArchiveHandle *AH, TocEntry *te)
3505 : {
3506 121544 : RestoreOptions *ropt = AH->public.ropt;
3507 :
3508 121544 : if (ropt && (ropt->noOwner || !ropt->use_setsessauth))
3509 121544 : return;
3510 :
3511 0 : _becomeUser(AH, te->owner);
3512 : }
3513 :
3514 :
3515 : /*
3516 : * Issue the commands to select the specified schema as the current schema
3517 : * in the target database.
3518 : */
3519 : static void
3520 121698 : _selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
3521 : {
3522 : PQExpBuffer qry;
3523 :
3524 : /*
3525 : * If there was a SEARCHPATH TOC entry, we're supposed to just stay with
3526 : * that search_path rather than switching to entry-specific paths.
3527 : * Otherwise, it's an old archive that will not restore correctly unless
3528 : * we set the search_path as it's expecting.
3529 : */
3530 121698 : if (AH->public.searchpath)
3531 121698 : return;
3532 :
3533 0 : if (!schemaName || *schemaName == '\0' ||
3534 0 : (AH->currSchema && strcmp(AH->currSchema, schemaName) == 0))
3535 0 : return; /* no need to do anything */
3536 :
3537 0 : qry = createPQExpBuffer();
3538 :
3539 0 : appendPQExpBuffer(qry, "SET search_path = %s",
3540 : fmtId(schemaName));
3541 0 : if (strcmp(schemaName, "pg_catalog") != 0)
3542 0 : appendPQExpBufferStr(qry, ", pg_catalog");
3543 :
3544 0 : if (RestoringToDB(AH))
3545 : {
3546 : PGresult *res;
3547 :
3548 0 : res = PQexec(AH->connection, qry->data);
3549 :
3550 0 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3551 0 : warn_or_exit_horribly(AH,
3552 : "could not set \"search_path\" to \"%s\": %s",
3553 0 : schemaName, PQerrorMessage(AH->connection));
3554 :
3555 0 : PQclear(res);
3556 : }
3557 : else
3558 0 : ahprintf(AH, "%s;\n\n", qry->data);
3559 :
3560 0 : free(AH->currSchema);
3561 0 : AH->currSchema = pg_strdup(schemaName);
3562 :
3563 0 : destroyPQExpBuffer(qry);
3564 : }
3565 :
3566 : /*
3567 : * Issue the commands to select the specified tablespace as the current one
3568 : * in the target database.
3569 : */
3570 : static void
3571 108932 : _selectTablespace(ArchiveHandle *AH, const char *tablespace)
3572 : {
3573 108932 : RestoreOptions *ropt = AH->public.ropt;
3574 : PQExpBuffer qry;
3575 : const char *want,
3576 : *have;
3577 :
3578 : /* do nothing in --no-tablespaces mode */
3579 108932 : if (ropt->noTablespace)
3580 0 : return;
3581 :
3582 108932 : have = AH->currTablespace;
3583 108932 : want = tablespace;
3584 :
3585 : /* no need to do anything for non-tablespace object */
3586 108932 : if (!want)
3587 86672 : return;
3588 :
3589 22260 : if (have && strcmp(want, have) == 0)
3590 21744 : return; /* no need to do anything */
3591 :
3592 516 : qry = createPQExpBuffer();
3593 :
3594 516 : if (strcmp(want, "") == 0)
3595 : {
3596 : /* We want the tablespace to be the database's default */
3597 420 : appendPQExpBufferStr(qry, "SET default_tablespace = ''");
3598 : }
3599 : else
3600 : {
3601 : /* We want an explicit tablespace */
3602 96 : appendPQExpBuffer(qry, "SET default_tablespace = %s", fmtId(want));
3603 : }
3604 :
3605 516 : if (RestoringToDB(AH))
3606 : {
3607 : PGresult *res;
3608 :
3609 46 : res = PQexec(AH->connection, qry->data);
3610 :
3611 46 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3612 0 : warn_or_exit_horribly(AH,
3613 : "could not set \"default_tablespace\" to %s: %s",
3614 0 : fmtId(want), PQerrorMessage(AH->connection));
3615 :
3616 46 : PQclear(res);
3617 : }
3618 : else
3619 470 : ahprintf(AH, "%s;\n\n", qry->data);
3620 :
3621 516 : free(AH->currTablespace);
3622 516 : AH->currTablespace = pg_strdup(want);
3623 :
3624 516 : destroyPQExpBuffer(qry);
3625 : }
3626 :
3627 : /*
3628 : * Set the proper default_table_access_method value for the table.
3629 : */
3630 : static void
3631 107322 : _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam)
3632 : {
3633 107322 : RestoreOptions *ropt = AH->public.ropt;
3634 : PQExpBuffer cmd;
3635 : const char *want,
3636 : *have;
3637 :
3638 : /* do nothing in --no-table-access-method mode */
3639 107322 : if (ropt->noTableAm)
3640 698 : return;
3641 :
3642 106624 : have = AH->currTableAm;
3643 106624 : want = tableam;
3644 :
3645 106624 : if (!want)
3646 92722 : return;
3647 :
3648 13902 : if (have && strcmp(want, have) == 0)
3649 13126 : return;
3650 :
3651 776 : cmd = createPQExpBuffer();
3652 776 : appendPQExpBuffer(cmd, "SET default_table_access_method = %s;", fmtId(want));
3653 :
3654 776 : if (RestoringToDB(AH))
3655 : {
3656 : PGresult *res;
3657 :
3658 48 : res = PQexec(AH->connection, cmd->data);
3659 :
3660 48 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3661 0 : warn_or_exit_horribly(AH,
3662 : "could not set \"default_table_access_method\": %s",
3663 0 : PQerrorMessage(AH->connection));
3664 :
3665 48 : PQclear(res);
3666 : }
3667 : else
3668 728 : ahprintf(AH, "%s\n\n", cmd->data);
3669 :
3670 776 : destroyPQExpBuffer(cmd);
3671 :
3672 776 : free(AH->currTableAm);
3673 776 : AH->currTableAm = pg_strdup(want);
3674 : }
3675 :
3676 : /*
3677 : * Set the proper default table access method for a table without storage.
3678 : * Currently, this is required only for partitioned tables with a table AM.
3679 : */
3680 : static void
3681 1610 : _printTableAccessMethodNoStorage(ArchiveHandle *AH, TocEntry *te)
3682 : {
3683 1610 : RestoreOptions *ropt = AH->public.ropt;
3684 1610 : const char *tableam = te->tableam;
3685 : PQExpBuffer cmd;
3686 :
3687 : /* do nothing in --no-table-access-method mode */
3688 1610 : if (ropt->noTableAm)
3689 6 : return;
3690 :
3691 1604 : if (!tableam)
3692 1536 : return;
3693 :
3694 : Assert(te->relkind == RELKIND_PARTITIONED_TABLE);
3695 :
3696 68 : cmd = createPQExpBuffer();
3697 :
3698 68 : appendPQExpBufferStr(cmd, "ALTER TABLE ");
3699 68 : appendPQExpBuffer(cmd, "%s ", fmtQualifiedId(te->namespace, te->tag));
3700 68 : appendPQExpBuffer(cmd, "SET ACCESS METHOD %s;",
3701 : fmtId(tableam));
3702 :
3703 68 : if (RestoringToDB(AH))
3704 : {
3705 : PGresult *res;
3706 :
3707 0 : res = PQexec(AH->connection, cmd->data);
3708 :
3709 0 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3710 0 : warn_or_exit_horribly(AH,
3711 : "could not alter table access method: %s",
3712 0 : PQerrorMessage(AH->connection));
3713 0 : PQclear(res);
3714 : }
3715 : else
3716 68 : ahprintf(AH, "%s\n\n", cmd->data);
3717 :
3718 68 : destroyPQExpBuffer(cmd);
3719 : }
3720 :
3721 : /*
3722 : * Extract an object description for a TOC entry, and append it to buf.
3723 : *
3724 : * This is used for ALTER ... OWNER TO.
3725 : *
3726 : * If the object type has no owner, do nothing.
3727 : */
3728 : static void
3729 50834 : _getObjectDescription(PQExpBuffer buf, const TocEntry *te)
3730 : {
3731 50834 : const char *type = te->desc;
3732 :
3733 : /* objects that don't require special decoration */
3734 50834 : if (strcmp(type, "COLLATION") == 0 ||
3735 45896 : strcmp(type, "CONVERSION") == 0 ||
3736 45060 : strcmp(type, "DOMAIN") == 0 ||
3737 44624 : strcmp(type, "FOREIGN TABLE") == 0 ||
3738 44552 : strcmp(type, "MATERIALIZED VIEW") == 0 ||
3739 43744 : strcmp(type, "SEQUENCE") == 0 ||
3740 43070 : strcmp(type, "STATISTICS") == 0 ||
3741 42804 : strcmp(type, "TABLE") == 0 ||
3742 28110 : strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
3743 27718 : strcmp(type, "TEXT SEARCH CONFIGURATION") == 0 ||
3744 27406 : strcmp(type, "TYPE") == 0 ||
3745 25992 : strcmp(type, "VIEW") == 0 ||
3746 : /* non-schema-specified objects */
3747 24644 : strcmp(type, "DATABASE") == 0 ||
3748 24408 : strcmp(type, "PROCEDURAL LANGUAGE") == 0 ||
3749 24342 : strcmp(type, "SCHEMA") == 0 ||
3750 23918 : strcmp(type, "EVENT TRIGGER") == 0 ||
3751 23836 : strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
3752 23738 : strcmp(type, "SERVER") == 0 ||
3753 23636 : strcmp(type, "PUBLICATION") == 0 ||
3754 23304 : strcmp(type, "SUBSCRIPTION") == 0)
3755 : {
3756 27732 : appendPQExpBuffer(buf, "%s ", type);
3757 27732 : if (te->namespace && *te->namespace)
3758 26190 : appendPQExpBuffer(buf, "%s.", fmtId(te->namespace));
3759 27732 : appendPQExpBufferStr(buf, fmtId(te->tag));
3760 : }
3761 : /* LOs just have a name, but it's numeric so must not use fmtId */
3762 23102 : else if (strcmp(type, "BLOB") == 0)
3763 : {
3764 0 : appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
3765 : }
3766 :
3767 : /*
3768 : * These object types require additional decoration. Fortunately, the
3769 : * information needed is exactly what's in the DROP command.
3770 : */
3771 23102 : else if (strcmp(type, "AGGREGATE") == 0 ||
3772 22280 : strcmp(type, "FUNCTION") == 0 ||
3773 17522 : strcmp(type, "OPERATOR") == 0 ||
3774 12484 : strcmp(type, "OPERATOR CLASS") == 0 ||
3775 11170 : strcmp(type, "OPERATOR FAMILY") == 0 ||
3776 10046 : strcmp(type, "PROCEDURE") == 0)
3777 : {
3778 : /* Chop "DROP " off the front and make a modifiable copy */
3779 13316 : char *first = pg_strdup(te->dropStmt + 5);
3780 : char *last;
3781 :
3782 : /* point to last character in string */
3783 13316 : last = first + strlen(first) - 1;
3784 :
3785 : /* Strip off any ';' or '\n' at the end */
3786 39948 : while (last >= first && (*last == '\n' || *last == ';'))
3787 26632 : last--;
3788 13316 : *(last + 1) = '\0';
3789 :
3790 13316 : appendPQExpBufferStr(buf, first);
3791 :
3792 13316 : free(first);
3793 13316 : return;
3794 : }
3795 : /* these object types don't have separate owners */
3796 9786 : else if (strcmp(type, "CAST") == 0 ||
3797 9786 : strcmp(type, "CHECK CONSTRAINT") == 0 ||
3798 9642 : strcmp(type, "CONSTRAINT") == 0 ||
3799 5856 : strcmp(type, "DATABASE PROPERTIES") == 0 ||
3800 5824 : strcmp(type, "DEFAULT") == 0 ||
3801 5396 : strcmp(type, "FK CONSTRAINT") == 0 ||
3802 4956 : strcmp(type, "INDEX") == 0 ||
3803 2428 : strcmp(type, "RULE") == 0 ||
3804 1760 : strcmp(type, "TRIGGER") == 0 ||
3805 606 : strcmp(type, "ROW SECURITY") == 0 ||
3806 606 : strcmp(type, "POLICY") == 0 ||
3807 66 : strcmp(type, "USER MAPPING") == 0)
3808 : {
3809 : /* do nothing */
3810 : }
3811 : else
3812 0 : pg_fatal("don't know how to set owner for object type \"%s\"", type);
3813 : }
3814 :
3815 : /*
3816 : * Emit the SQL commands to create the object represented by a TOC entry
3817 : *
3818 : * This now also includes issuing an ALTER OWNER command to restore the
3819 : * object's ownership, if wanted. But note that the object's permissions
3820 : * will remain at default, until the matching ACL TOC entry is restored.
3821 : */
3822 : static void
3823 108932 : _printTocEntry(ArchiveHandle *AH, TocEntry *te, const char *pfx)
3824 : {
3825 108932 : RestoreOptions *ropt = AH->public.ropt;
3826 :
3827 : /*
3828 : * Select owner, schema, tablespace and default AM as necessary. The
3829 : * default access method for partitioned tables is handled after
3830 : * generating the object definition, as it requires an ALTER command
3831 : * rather than SET.
3832 : */
3833 108932 : _becomeOwner(AH, te);
3834 108932 : _selectOutputSchema(AH, te->namespace);
3835 108932 : _selectTablespace(AH, te->tablespace);
3836 108932 : if (te->relkind != RELKIND_PARTITIONED_TABLE)
3837 107322 : _selectTableAccessMethod(AH, te->tableam);
3838 :
3839 : /* Emit header comment for item */
3840 108932 : if (!AH->noTocComments)
3841 : {
3842 : char *sanitized_name;
3843 : char *sanitized_schema;
3844 : char *sanitized_owner;
3845 :
3846 94890 : ahprintf(AH, "--\n");
3847 94890 : if (AH->public.verbose)
3848 : {
3849 2130 : ahprintf(AH, "-- TOC entry %d (class %u OID %u)\n",
3850 : te->dumpId, te->catalogId.tableoid, te->catalogId.oid);
3851 2130 : if (te->nDeps > 0)
3852 : {
3853 : int i;
3854 :
3855 1398 : ahprintf(AH, "-- Dependencies:");
3856 3846 : for (i = 0; i < te->nDeps; i++)
3857 2448 : ahprintf(AH, " %d", te->dependencies[i]);
3858 1398 : ahprintf(AH, "\n");
3859 : }
3860 : }
3861 :
3862 94890 : sanitized_name = sanitize_line(te->tag, false);
3863 94890 : sanitized_schema = sanitize_line(te->namespace, true);
3864 94890 : sanitized_owner = sanitize_line(ropt->noOwner ? NULL : te->owner, true);
3865 :
3866 94890 : ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s",
3867 : pfx, sanitized_name, te->desc, sanitized_schema,
3868 : sanitized_owner);
3869 :
3870 94890 : free(sanitized_name);
3871 94890 : free(sanitized_schema);
3872 94890 : free(sanitized_owner);
3873 :
3874 94890 : if (te->tablespace && strlen(te->tablespace) > 0 && !ropt->noTablespace)
3875 : {
3876 : char *sanitized_tablespace;
3877 :
3878 204 : sanitized_tablespace = sanitize_line(te->tablespace, false);
3879 204 : ahprintf(AH, "; Tablespace: %s", sanitized_tablespace);
3880 204 : free(sanitized_tablespace);
3881 : }
3882 94890 : ahprintf(AH, "\n");
3883 :
3884 94890 : if (AH->PrintExtraTocPtr != NULL)
3885 8352 : AH->PrintExtraTocPtr(AH, te);
3886 94890 : ahprintf(AH, "--\n\n");
3887 : }
3888 :
3889 : /*
3890 : * Actually print the definition. Normally we can just print the defn
3891 : * string if any, but we have four special cases:
3892 : *
3893 : * 1. A crude hack for suppressing AUTHORIZATION clause that old pg_dump
3894 : * versions put into CREATE SCHEMA. Don't mutate the variant for schema
3895 : * "public" that is a comment. We have to do this when --no-owner mode is
3896 : * selected. This is ugly, but I see no other good way ...
3897 : *
3898 : * 2. BLOB METADATA entries need special processing since their defn
3899 : * strings are just lists of OIDs, not complete SQL commands.
3900 : *
3901 : * 3. ACL LARGE OBJECTS entries need special processing because they
3902 : * contain only one copy of the ACL GRANT/REVOKE commands, which we must
3903 : * apply to each large object listed in the associated BLOB METADATA.
3904 : *
3905 : * 4. Entries with a defnDumper need to call it to generate the
3906 : * definition. This is primarily intended to provide a way to save memory
3907 : * for objects that would otherwise need a lot of it (e.g., statistics
3908 : * data).
3909 : */
3910 108932 : if (ropt->noOwner &&
3911 744 : strcmp(te->desc, "SCHEMA") == 0 && strncmp(te->defn, "--", 2) != 0)
3912 : {
3913 4 : ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
3914 : }
3915 108928 : else if (strcmp(te->desc, "BLOB METADATA") == 0)
3916 : {
3917 160 : IssueCommandPerBlob(AH, te, "SELECT pg_catalog.lo_create('", "')");
3918 : }
3919 108768 : else if (strcmp(te->desc, "ACL") == 0 &&
3920 4032 : strncmp(te->tag, "LARGE OBJECTS", 13) == 0)
3921 : {
3922 0 : IssueACLPerBlob(AH, te);
3923 : }
3924 108768 : else if (te->defnLen && AH->format != archTar)
3925 : {
3926 : /*
3927 : * If defnLen is set, the defnDumper has already been called for this
3928 : * TOC entry. We don't normally expect a defnDumper to be called for
3929 : * a TOC entry a second time in _printTocEntry(), but there's an
3930 : * exception. The tar format first calls WriteToc(), which scans the
3931 : * entire TOC, and then it later calls RestoreArchive() to generate
3932 : * restore.sql, which scans the TOC again. There doesn't appear to be
3933 : * a good way to prevent a second defnDumper call in this case without
3934 : * storing the definition in memory, which defeats the purpose. This
3935 : * second defnDumper invocation should generate the same output as the
3936 : * first, but even if it doesn't, the worst-case scenario is that
3937 : * restore.sql might have different statistics data than the archive.
3938 : *
3939 : * In all other cases, encountering a TOC entry a second time in
3940 : * _printTocEntry() is unexpected, so we fail because one of our
3941 : * assumptions must no longer hold true.
3942 : *
3943 : * XXX This is a layering violation, but the alternative is an awkward
3944 : * and complicated callback infrastructure for this special case. This
3945 : * might be worth revisiting in the future.
3946 : */
3947 0 : pg_fatal("unexpected TOC entry in _printTocEntry(): %d %s %s",
3948 : te->dumpId, te->desc, te->tag);
3949 : }
3950 108768 : else if (te->defnDumper)
3951 : {
3952 12882 : char *defn = te->defnDumper((Archive *) AH, te->defnDumperArg, te);
3953 :
3954 12882 : te->defnLen = ahprintf(AH, "%s\n\n", defn);
3955 12882 : pg_free(defn);
3956 : }
3957 95886 : else if (te->defn && strlen(te->defn) > 0)
3958 : {
3959 83798 : ahprintf(AH, "%s\n\n", te->defn);
3960 :
3961 : /*
3962 : * If the defn string contains multiple SQL commands, txn_size mode
3963 : * should count it as N actions not one. But rather than build a full
3964 : * SQL parser, approximate this by counting semicolons. One case
3965 : * where that tends to be badly fooled is function definitions, so
3966 : * ignore them. (restore_toc_entry will count one action anyway.)
3967 : */
3968 83798 : if (ropt->txn_size > 0 &&
3969 6418 : strcmp(te->desc, "FUNCTION") != 0 &&
3970 5890 : strcmp(te->desc, "PROCEDURE") != 0)
3971 : {
3972 5866 : const char *p = te->defn;
3973 5866 : int nsemis = 0;
3974 :
3975 26170 : while ((p = strchr(p, ';')) != NULL)
3976 : {
3977 20304 : nsemis++;
3978 20304 : p++;
3979 : }
3980 5866 : if (nsemis > 1)
3981 2846 : AH->txnCount += nsemis - 1;
3982 : }
3983 : }
3984 :
3985 : /*
3986 : * If we aren't using SET SESSION AUTH to determine ownership, we must
3987 : * instead issue an ALTER OWNER command. Schema "public" is special; when
3988 : * a dump emits a comment in lieu of creating it, we use ALTER OWNER even
3989 : * when using SET SESSION for all other objects. We assume that anything
3990 : * without a DROP command is not a separately ownable object.
3991 : */
3992 108932 : if (!ropt->noOwner &&
3993 108188 : (!ropt->use_setsessauth ||
3994 0 : (strcmp(te->desc, "SCHEMA") == 0 &&
3995 0 : strncmp(te->defn, "--", 2) == 0)) &&
3996 108188 : te->owner && strlen(te->owner) > 0 &&
3997 89078 : te->dropStmt && strlen(te->dropStmt) > 0)
3998 : {
3999 50990 : if (strcmp(te->desc, "BLOB METADATA") == 0)
4000 : {
4001 : /* BLOB METADATA needs special code to handle multiple LOs */
4002 156 : char *cmdEnd = psprintf(" OWNER TO %s", fmtId(te->owner));
4003 :
4004 156 : IssueCommandPerBlob(AH, te, "ALTER LARGE OBJECT ", cmdEnd);
4005 156 : pg_free(cmdEnd);
4006 : }
4007 : else
4008 : {
4009 : /* For all other cases, we can use _getObjectDescription */
4010 : PQExpBufferData temp;
4011 :
4012 50834 : initPQExpBuffer(&temp);
4013 50834 : _getObjectDescription(&temp, te);
4014 :
4015 : /*
4016 : * If _getObjectDescription() didn't fill the buffer, then there
4017 : * is no owner.
4018 : */
4019 50834 : if (temp.data[0])
4020 41048 : ahprintf(AH, "ALTER %s OWNER TO %s;\n\n",
4021 41048 : temp.data, fmtId(te->owner));
4022 50834 : termPQExpBuffer(&temp);
4023 : }
4024 : }
4025 :
4026 : /*
4027 : * Select a partitioned table's default AM, once the table definition has
4028 : * been generated.
4029 : */
4030 108932 : if (te->relkind == RELKIND_PARTITIONED_TABLE)
4031 1610 : _printTableAccessMethodNoStorage(AH, te);
4032 :
4033 : /*
4034 : * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
4035 : * commands, so we can no longer assume we know the current auth setting.
4036 : */
4037 108932 : if (_tocEntryIsACL(te))
4038 : {
4039 4300 : free(AH->currUser);
4040 4300 : AH->currUser = NULL;
4041 : }
4042 108932 : }
4043 :
4044 : /*
4045 : * Sanitize a string to be included in an SQL comment or TOC listing, by
4046 : * replacing any newlines with spaces. This ensures each logical output line
4047 : * is in fact one physical output line, to prevent corruption of the dump
4048 : * (which could, in the worst case, present an SQL injection vulnerability
4049 : * if someone were to incautiously load a dump containing objects with
4050 : * maliciously crafted names).
4051 : *
4052 : * The result is a freshly malloc'd string. If the input string is NULL,
4053 : * return a malloc'ed empty string, unless want_hyphen, in which case return a
4054 : * malloc'ed hyphen.
4055 : *
4056 : * Note that we currently don't bother to quote names, meaning that the name
4057 : * fields aren't automatically parseable. "pg_restore -L" doesn't care because
4058 : * it only examines the dumpId field, but someday we might want to try harder.
4059 : */
static char *
sanitize_line(const char *str, bool want_hyphen)
{
    char       *copy;
    size_t      pos;

    /* A missing input yields a freshly allocated placeholder, never NULL. */
    if (str == NULL)
    {
        if (want_hyphen)
            return pg_strdup("-");
        return pg_strdup("");
    }

    copy = pg_strdup(str);

    /* Overwrite each line-break character in the copy with a blank. */
    for (pos = 0; copy[pos] != '\0'; pos++)
    {
        switch (copy[pos])
        {
            case '\n':
            case '\r':
                copy[pos] = ' ';
                break;
            default:
                break;
        }
    }

    return copy;
}
4079 :
4080 : /*
4081 : * Write the file header for a custom-format archive
4082 : */
4083 : void
4084 206 : WriteHead(ArchiveHandle *AH)
4085 : {
4086 : struct tm crtm;
4087 :
4088 206 : AH->WriteBufPtr(AH, "PGDMP", 5); /* Magic code */
4089 206 : AH->WriteBytePtr(AH, ARCHIVE_MAJOR(AH->version));
4090 206 : AH->WriteBytePtr(AH, ARCHIVE_MINOR(AH->version));
4091 206 : AH->WriteBytePtr(AH, ARCHIVE_REV(AH->version));
4092 206 : AH->WriteBytePtr(AH, AH->intSize);
4093 206 : AH->WriteBytePtr(AH, AH->offSize);
4094 206 : AH->WriteBytePtr(AH, AH->format);
4095 206 : AH->WriteBytePtr(AH, AH->compression_spec.algorithm);
4096 206 : crtm = *localtime(&AH->createDate);
4097 206 : WriteInt(AH, crtm.tm_sec);
4098 206 : WriteInt(AH, crtm.tm_min);
4099 206 : WriteInt(AH, crtm.tm_hour);
4100 206 : WriteInt(AH, crtm.tm_mday);
4101 206 : WriteInt(AH, crtm.tm_mon);
4102 206 : WriteInt(AH, crtm.tm_year);
4103 206 : WriteInt(AH, crtm.tm_isdst);
4104 206 : WriteStr(AH, PQdb(AH->connection));
4105 206 : WriteStr(AH, AH->public.remoteVersionStr);
4106 206 : WriteStr(AH, PG_VERSION);
4107 206 : }
4108 :
4109 : void
4110 212 : ReadHead(ArchiveHandle *AH)
4111 : {
4112 : char *errmsg;
4113 : char vmaj,
4114 : vmin,
4115 : vrev;
4116 : int fmt;
4117 :
4118 : /*
4119 : * If we haven't already read the header, do so.
4120 : *
4121 : * NB: this code must agree with _discoverArchiveFormat(). Maybe find a
4122 : * way to unify the cases?
4123 : */
4124 212 : if (!AH->readHeader)
4125 : {
4126 : char tmpMag[7];
4127 :
4128 212 : AH->ReadBufPtr(AH, tmpMag, 5);
4129 :
4130 212 : if (strncmp(tmpMag, "PGDMP", 5) != 0)
4131 0 : pg_fatal("did not find magic string in file header");
4132 : }
4133 :
4134 212 : vmaj = AH->ReadBytePtr(AH);
4135 212 : vmin = AH->ReadBytePtr(AH);
4136 :
4137 212 : if (vmaj > 1 || (vmaj == 1 && vmin > 0)) /* Version > 1.0 */
4138 212 : vrev = AH->ReadBytePtr(AH);
4139 : else
4140 0 : vrev = 0;
4141 :
4142 212 : AH->version = MAKE_ARCHIVE_VERSION(vmaj, vmin, vrev);
4143 :
4144 212 : if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
4145 0 : pg_fatal("unsupported version (%d.%d) in file header",
4146 : vmaj, vmin);
4147 :
4148 212 : AH->intSize = AH->ReadBytePtr(AH);
4149 212 : if (AH->intSize > 32)
4150 0 : pg_fatal("sanity check on integer size (%lu) failed",
4151 : (unsigned long) AH->intSize);
4152 :
4153 212 : if (AH->intSize > sizeof(int))
4154 0 : pg_log_warning("archive was made on a machine with larger integers, some operations might fail");
4155 :
4156 212 : if (AH->version >= K_VERS_1_7)
4157 212 : AH->offSize = AH->ReadBytePtr(AH);
4158 : else
4159 0 : AH->offSize = AH->intSize;
4160 :
4161 212 : fmt = AH->ReadBytePtr(AH);
4162 :
4163 212 : if (AH->format != fmt)
4164 0 : pg_fatal("expected format (%d) differs from format found in file (%d)",
4165 : AH->format, fmt);
4166 :
4167 212 : if (AH->version >= K_VERS_1_15)
4168 212 : AH->compression_spec.algorithm = AH->ReadBytePtr(AH);
4169 0 : else if (AH->version >= K_VERS_1_2)
4170 : {
4171 : /* Guess the compression method based on the level */
4172 0 : if (AH->version < K_VERS_1_4)
4173 0 : AH->compression_spec.level = AH->ReadBytePtr(AH);
4174 : else
4175 0 : AH->compression_spec.level = ReadInt(AH);
4176 :
4177 0 : if (AH->compression_spec.level != 0)
4178 0 : AH->compression_spec.algorithm = PG_COMPRESSION_GZIP;
4179 : }
4180 : else
4181 0 : AH->compression_spec.algorithm = PG_COMPRESSION_GZIP;
4182 :
4183 212 : errmsg = supports_compression(AH->compression_spec);
4184 212 : if (errmsg)
4185 : {
4186 0 : pg_log_warning("archive is compressed, but this installation does not support compression (%s) -- no data will be available",
4187 : errmsg);
4188 0 : pg_free(errmsg);
4189 : }
4190 :
4191 212 : if (AH->version >= K_VERS_1_4)
4192 : {
4193 : struct tm crtm;
4194 :
4195 212 : crtm.tm_sec = ReadInt(AH);
4196 212 : crtm.tm_min = ReadInt(AH);
4197 212 : crtm.tm_hour = ReadInt(AH);
4198 212 : crtm.tm_mday = ReadInt(AH);
4199 212 : crtm.tm_mon = ReadInt(AH);
4200 212 : crtm.tm_year = ReadInt(AH);
4201 212 : crtm.tm_isdst = ReadInt(AH);
4202 :
4203 : /*
4204 : * Newer versions of glibc have mktime() report failure if tm_isdst is
4205 : * inconsistent with the prevailing timezone, e.g. tm_isdst = 1 when
4206 : * TZ=UTC. This is problematic when restoring an archive under a
4207 : * different timezone setting. If we get a failure, try again with
4208 : * tm_isdst set to -1 ("don't know").
4209 : *
4210 : * XXX with or without this hack, we reconstruct createDate
4211 : * incorrectly when the prevailing timezone is different from
4212 : * pg_dump's. Next time we bump the archive version, we should flush
4213 : * this representation and store a plain seconds-since-the-Epoch
4214 : * timestamp instead.
4215 : */
4216 212 : AH->createDate = mktime(&crtm);
4217 212 : if (AH->createDate == (time_t) -1)
4218 : {
4219 0 : crtm.tm_isdst = -1;
4220 0 : AH->createDate = mktime(&crtm);
4221 0 : if (AH->createDate == (time_t) -1)
4222 0 : pg_log_warning("invalid creation date in header");
4223 : }
4224 : }
4225 :
4226 212 : if (AH->version >= K_VERS_1_4)
4227 : {
4228 212 : AH->archdbname = ReadStr(AH);
4229 : }
4230 :
4231 212 : if (AH->version >= K_VERS_1_10)
4232 : {
4233 212 : AH->archiveRemoteVersion = ReadStr(AH);
4234 212 : AH->archiveDumpVersion = ReadStr(AH);
4235 : }
4236 212 : }
4237 :
4238 :
4239 : /*
4240 : * checkSeek
4241 : * check to see if ftell/fseek can be performed.
4242 : */
4243 : bool
4244 222 : checkSeek(FILE *fp)
4245 : {
4246 : pgoff_t tpos;
4247 :
4248 : /* Check that ftello works on this file */
4249 222 : tpos = ftello(fp);
4250 222 : if (tpos < 0)
4251 2 : return false;
4252 :
4253 : /*
4254 : * Check that fseeko(SEEK_SET) works, too. NB: we used to try to test
4255 : * this with fseeko(fp, 0, SEEK_CUR). But some platforms treat that as a
4256 : * successful no-op even on files that are otherwise unseekable.
4257 : */
4258 220 : if (fseeko(fp, tpos, SEEK_SET) != 0)
4259 0 : return false;
4260 :
4261 220 : return true;
4262 : }
4263 :
4264 :
4265 : /*
4266 : * dumpTimestamp
4267 : */
4268 : static void
4269 140 : dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
4270 : {
4271 : char buf[64];
4272 :
4273 140 : if (strftime(buf, sizeof(buf), PGDUMP_STRFTIME_FMT, localtime(&tim)) != 0)
4274 140 : ahprintf(AH, "-- %s %s\n\n", msg, buf);
4275 140 : }
4276 :
4277 : /*
4278 : * Main engine for parallel restore.
4279 : *
4280 : * Parallel restore is done in three phases. In this first phase,
4281 : * we'll process all SECTION_PRE_DATA TOC entries that are allowed to be
4282 : * processed in the RESTORE_PASS_MAIN pass. (In practice, that's all
4283 : * PRE_DATA items other than ACLs.) Entries we can't process now are
4284 : * added to the pending_list for later phases to deal with.
4285 : */
4286 : static void
4287 10 : restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
4288 : {
4289 : bool skipped_some;
4290 : TocEntry *next_work_item;
4291 :
4292 10 : pg_log_debug("entering restore_toc_entries_prefork");
4293 :
4294 : /* Adjust dependency information */
4295 10 : fix_dependencies(AH);
4296 :
4297 : /*
4298 : * Do all the early stuff in a single connection in the parent. There's no
4299 : * great point in running it in parallel, in fact it will actually run
4300 : * faster in a single connection because we avoid all the connection and
4301 : * setup overhead. Also, pre-9.2 pg_dump versions were not very good
4302 : * about showing all the dependencies of SECTION_PRE_DATA items, so we do
4303 : * not risk trying to process them out-of-order.
4304 : *
4305 : * Stuff that we can't do immediately gets added to the pending_list.
4306 : * Note: we don't yet filter out entries that aren't going to be restored.
4307 : * They might participate in dependency chains connecting entries that
4308 : * should be restored, so we treat them as live until we actually process
4309 : * them.
4310 : *
4311 : * Note: as of 9.2, it should be guaranteed that all PRE_DATA items appear
4312 : * before DATA items, and all DATA items before POST_DATA items. That is
4313 : * not certain to be true in older archives, though, and in any case use
4314 : * of a list file would destroy that ordering (cf. SortTocFromFile). So
4315 : * this loop cannot assume that it holds.
4316 : */
4317 10 : AH->restorePass = RESTORE_PASS_MAIN;
4318 10 : skipped_some = false;
4319 7668 : for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
4320 : {
4321 7658 : bool do_now = true;
4322 :
4323 7658 : if (next_work_item->section != SECTION_PRE_DATA)
4324 : {
4325 : /* DATA and POST_DATA items are just ignored for now */
4326 4548 : if (next_work_item->section == SECTION_DATA ||
4327 1696 : next_work_item->section == SECTION_POST_DATA)
4328 : {
4329 4498 : do_now = false;
4330 4498 : skipped_some = true;
4331 : }
4332 : else
4333 : {
4334 : /*
4335 : * SECTION_NONE items, such as comments, can be processed now
4336 : * if we are still in the PRE_DATA part of the archive. Once
4337 : * we've skipped any items, we have to consider whether the
4338 : * comment's dependencies are satisfied, so skip it for now.
4339 : */
4340 50 : if (skipped_some)
4341 14 : do_now = false;
4342 : }
4343 : }
4344 :
4345 : /*
4346 : * Also skip items that need to be forced into later passes. We need
4347 : * not set skipped_some in this case, since by assumption no main-pass
4348 : * items could depend on these.
4349 : */
4350 7658 : if (_tocEntryRestorePass(next_work_item) != RESTORE_PASS_MAIN)
4351 652 : do_now = false;
4352 :
4353 7658 : if (do_now)
4354 : {
4355 : /* OK, restore the item and update its dependencies */
4356 3120 : pg_log_info("processing item %d %s %s",
4357 : next_work_item->dumpId,
4358 : next_work_item->desc, next_work_item->tag);
4359 :
4360 3120 : (void) restore_toc_entry(AH, next_work_item, false);
4361 :
4362 : /* Reduce dependencies, but don't move anything to ready_heap */
4363 3120 : reduce_dependencies(AH, next_work_item, NULL);
4364 : }
4365 : else
4366 : {
4367 : /* Nope, so add it to pending_list */
4368 4538 : pending_list_append(pending_list, next_work_item);
4369 : }
4370 : }
4371 :
4372 : /*
4373 : * In --transaction-size mode, we must commit the open transaction before
4374 : * dropping the database connection. This also ensures that child workers
4375 : * can see the objects we've created so far.
4376 : */
4377 10 : if (AH->public.ropt->txn_size > 0)
4378 0 : CommitTransaction(&AH->public);
4379 :
4380 : /*
4381 : * Now close parent connection in prep for parallel steps. We do this
4382 : * mainly to ensure that we don't exceed the specified number of parallel
4383 : * connections.
4384 : */
4385 10 : DisconnectDatabase(&AH->public);
4386 :
4387 : /* blow away any transient state from the old connection */
4388 10 : free(AH->currUser);
4389 10 : AH->currUser = NULL;
4390 10 : free(AH->currSchema);
4391 10 : AH->currSchema = NULL;
4392 10 : free(AH->currTablespace);
4393 10 : AH->currTablespace = NULL;
4394 10 : free(AH->currTableAm);
4395 10 : AH->currTableAm = NULL;
4396 10 : }
4397 :
4398 : /*
4399 : * Main engine for parallel restore.
4400 : *
4401 : * Parallel restore is done in three phases. In this second phase,
4402 : * we process entries by dispatching them to parallel worker children
4403 : * (processes on Unix, threads on Windows), each of which connects
4404 : * separately to the database. Inter-entry dependencies are respected,
4405 : * and so is the RestorePass multi-pass structure. When we can no longer
4406 : * make any entries ready to process, we exit. Normally, there will be
4407 : * nothing left to do; but if there is, the third phase will mop up.
4408 : */
4409 : static void
4410 10 : restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
4411 : TocEntry *pending_list)
4412 : {
4413 : binaryheap *ready_heap;
4414 : TocEntry *next_work_item;
4415 :
4416 10 : pg_log_debug("entering restore_toc_entries_parallel");
4417 :
4418 : /* Set up ready_heap with enough room for all known TocEntrys */
4419 10 : ready_heap = binaryheap_allocate(AH->tocCount,
4420 : TocEntrySizeCompareBinaryheap,
4421 : NULL);
4422 :
4423 : /*
4424 : * The pending_list contains all items that we need to restore. Move all
4425 : * items that are available to process immediately into the ready_heap.
4426 : * After this setup, the pending list is everything that needs to be done
4427 : * but is blocked by one or more dependencies, while the ready heap
4428 : * contains items that have no remaining dependencies and are OK to
4429 : * process in the current restore pass.
4430 : */
4431 10 : AH->restorePass = RESTORE_PASS_MAIN;
4432 10 : move_to_ready_heap(pending_list, ready_heap, AH->restorePass);
4433 :
4434 : /*
4435 : * main parent loop
4436 : *
4437 : * Keep going until there is no worker still running AND there is no work
4438 : * left to be done. Note invariant: at top of loop, there should always
4439 : * be at least one worker available to dispatch a job to.
4440 : */
4441 10 : pg_log_info("entering main parallel loop");
4442 :
4443 : for (;;)
4444 : {
4445 : /* Look for an item ready to be dispatched to a worker */
4446 4600 : next_work_item = pop_next_work_item(ready_heap, pstate);
4447 4600 : if (next_work_item != NULL)
4448 : {
4449 : /* If not to be restored, don't waste time launching a worker */
4450 4538 : if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA | REQ_STATS)) == 0)
4451 : {
4452 0 : pg_log_info("skipping item %d %s %s",
4453 : next_work_item->dumpId,
4454 : next_work_item->desc, next_work_item->tag);
4455 : /* Update its dependencies as though we'd completed it */
4456 0 : reduce_dependencies(AH, next_work_item, ready_heap);
4457 : /* Loop around to see if anything else can be dispatched */
4458 0 : continue;
4459 : }
4460 :
4461 4538 : pg_log_info("launching item %d %s %s",
4462 : next_work_item->dumpId,
4463 : next_work_item->desc, next_work_item->tag);
4464 :
4465 : /* Dispatch to some worker */
4466 4538 : DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
4467 : mark_restore_job_done, ready_heap);
4468 : }
4469 62 : else if (IsEveryWorkerIdle(pstate))
4470 : {
4471 : /*
4472 : * Nothing is ready and no worker is running, so we're done with
4473 : * the current pass or maybe with the whole process.
4474 : */
4475 30 : if (AH->restorePass == RESTORE_PASS_LAST)
4476 10 : break; /* No more parallel processing is possible */
4477 :
4478 : /* Advance to next restore pass */
4479 20 : AH->restorePass++;
4480 : /* That probably allows some stuff to be made ready */
4481 20 : move_to_ready_heap(pending_list, ready_heap, AH->restorePass);
4482 : /* Loop around to see if anything's now ready */
4483 20 : continue;
4484 : }
4485 : else
4486 : {
4487 : /*
4488 : * We have nothing ready, but at least one child is working, so
4489 : * wait for some subjob to finish.
4490 : */
4491 : }
4492 :
4493 : /*
4494 : * Before dispatching another job, check to see if anything has
4495 : * finished. We should check every time through the loop so as to
4496 : * reduce dependencies as soon as possible. If we were unable to
4497 : * dispatch any job this time through, wait until some worker finishes
4498 : * (and, hopefully, unblocks some pending item). If we did dispatch
4499 : * something, continue as soon as there's at least one idle worker.
4500 : * Note that in either case, there's guaranteed to be at least one
4501 : * idle worker when we return to the top of the loop. This ensures we
4502 : * won't block inside DispatchJobForTocEntry, which would be
4503 : * undesirable: we'd rather postpone dispatching until we see what's
4504 : * been unblocked by finished jobs.
4505 : */
4506 4570 : WaitForWorkers(AH, pstate,
4507 : next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS);
4508 : }
4509 :
4510 : /* There should now be nothing in ready_heap. */
4511 : Assert(binaryheap_empty(ready_heap));
4512 :
4513 10 : binaryheap_free(ready_heap);
4514 :
4515 10 : pg_log_info("finished main parallel loop");
4516 10 : }
4517 :
4518 : /*
4519 : * Main engine for parallel restore.
4520 : *
4521 : * Parallel restore is done in three phases. In this third phase,
4522 : * we mop up any remaining TOC entries by processing them serially.
4523 : * This phase normally should have nothing to do, but if we've somehow
4524 : * gotten stuck due to circular dependencies or some such, this provides
4525 : * at least some chance of completing the restore successfully.
4526 : */
4527 : static void
4528 10 : restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
4529 : {
4530 10 : RestoreOptions *ropt = AH->public.ropt;
4531 : TocEntry *te;
4532 :
4533 10 : pg_log_debug("entering restore_toc_entries_postfork");
4534 :
4535 : /*
4536 : * Now reconnect the single parent connection.
4537 : */
4538 10 : ConnectDatabaseAhx((Archive *) AH, &ropt->cparams, true);
4539 :
4540 : /* re-establish fixed state */
4541 10 : _doSetFixedOutputState(AH);
4542 :
4543 : /*
4544 : * Make sure there is no work left due to, say, circular dependencies, or
4545 : * some other pathological condition. If so, do it in the single parent
4546 : * connection. We don't sweat about RestorePass ordering; it's likely we
4547 : * already violated that.
4548 : */
4549 10 : for (te = pending_list->pending_next; te != pending_list; te = te->pending_next)
4550 : {
4551 0 : pg_log_info("processing missed item %d %s %s",
4552 : te->dumpId, te->desc, te->tag);
4553 0 : (void) restore_toc_entry(AH, te, false);
4554 : }
4555 10 : }
4556 :
4557 : /*
4558 : * Check if te1 has an exclusive lock requirement for an item that te2 also
4559 : * requires, whether or not te2's requirement is for an exclusive lock.
4560 : */
4561 : static bool
4562 9526 : has_lock_conflicts(TocEntry *te1, TocEntry *te2)
4563 : {
4564 : int j,
4565 : k;
4566 :
4567 12912 : for (j = 0; j < te1->nLockDeps; j++)
4568 : {
4569 11782 : for (k = 0; k < te2->nDeps; k++)
4570 : {
4571 8396 : if (te1->lockDeps[j] == te2->dependencies[k])
4572 206 : return true;
4573 : }
4574 : }
4575 9320 : return false;
4576 : }
4577 :
4578 :
4579 : /*
4580 : * Initialize the header of the pending-items list.
4581 : *
4582 : * This is a circular list with a dummy TocEntry as header, just like the
4583 : * main TOC list; but we use separate list links so that an entry can be in
4584 : * the main TOC list as well as in the pending list.
4585 : */
4586 : static void
4587 10 : pending_list_header_init(TocEntry *l)
4588 : {
4589 10 : l->pending_prev = l->pending_next = l;
4590 10 : }
4591 :
4592 : /* Append te to the end of the pending-list headed by l */
4593 : static void
4594 4538 : pending_list_append(TocEntry *l, TocEntry *te)
4595 : {
4596 4538 : te->pending_prev = l->pending_prev;
4597 4538 : l->pending_prev->pending_next = te;
4598 4538 : l->pending_prev = te;
4599 4538 : te->pending_next = l;
4600 4538 : }
4601 :
4602 : /* Remove te from the pending-list */
4603 : static void
4604 4538 : pending_list_remove(TocEntry *te)
4605 : {
4606 4538 : te->pending_prev->pending_next = te->pending_next;
4607 4538 : te->pending_next->pending_prev = te->pending_prev;
4608 4538 : te->pending_prev = NULL;
4609 4538 : te->pending_next = NULL;
4610 4538 : }
4611 :
4612 :
4613 : /* qsort comparator for sorting TocEntries by dataLength */
4614 : static int
4615 99628 : TocEntrySizeCompareQsort(const void *p1, const void *p2)
4616 : {
4617 99628 : const TocEntry *te1 = *(const TocEntry *const *) p1;
4618 99628 : const TocEntry *te2 = *(const TocEntry *const *) p2;
4619 :
4620 : /* Sort by decreasing dataLength */
4621 99628 : if (te1->dataLength > te2->dataLength)
4622 15284 : return -1;
4623 84344 : if (te1->dataLength < te2->dataLength)
4624 25388 : return 1;
4625 :
4626 : /* For equal dataLengths, sort by dumpId, just to be stable */
4627 58956 : if (te1->dumpId < te2->dumpId)
4628 20116 : return -1;
4629 38840 : if (te1->dumpId > te2->dumpId)
4630 38816 : return 1;
4631 :
4632 24 : return 0;
4633 : }
4634 :
4635 : /* binaryheap comparator for sorting TocEntries by dataLength */
static int
TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg)
{
    /*
     * The qsort comparator expects pointers to the array slots, so pass the
     * addresses of our two element pointers.
     */
    int         qsort_order = TocEntrySizeCompareQsort(&p1, &p2);

    /* return opposite of qsort comparator for max-heap */
    return -qsort_order;
}
4642 :
4643 :
4644 : /*
4645 : * Move all immediately-ready items from pending_list to ready_heap.
4646 : *
4647 : * Items are considered ready if they have no remaining dependencies and
4648 : * they belong in the current restore pass. (See also reduce_dependencies,
4649 : * which applies the same logic one-at-a-time.)
4650 : */
4651 : static void
4652 30 : move_to_ready_heap(TocEntry *pending_list,
4653 : binaryheap *ready_heap,
4654 : RestorePass pass)
4655 : {
4656 : TocEntry *te;
4657 : TocEntry *next_te;
4658 :
4659 5846 : for (te = pending_list->pending_next; te != pending_list; te = next_te)
4660 : {
4661 : /* must save list link before possibly removing te from list */
4662 5816 : next_te = te->pending_next;
4663 :
4664 9986 : if (te->depCount == 0 &&
4665 4170 : _tocEntryRestorePass(te) == pass)
4666 : {
4667 : /* Remove it from pending_list ... */
4668 3528 : pending_list_remove(te);
4669 : /* ... and add to ready_heap */
4670 3528 : binaryheap_add(ready_heap, te);
4671 : }
4672 : }
4673 30 : }
4674 :
4675 : /*
4676 : * Find the next work item (if any) that is capable of being run now,
4677 : * and remove it from the ready_heap.
4678 : *
4679 : * Returns the item, or NULL if nothing is runnable.
4680 : *
4681 : * To qualify, the item must have no remaining dependencies
4682 : * and no requirements for locks that are incompatible with
4683 : * items currently running. Items in the ready_heap are known to have
4684 : * no remaining dependencies, but we have to check for lock conflicts.
4685 : */
4686 : static TocEntry *
4687 4600 : pop_next_work_item(binaryheap *ready_heap,
4688 : ParallelState *pstate)
4689 : {
4690 : /*
4691 : * Search the ready_heap until we find a suitable item. Note that we do a
4692 : * sequential scan through the heap nodes, so even though we will first
4693 : * try to choose the highest-priority item, we might end up picking
4694 : * something with a much lower priority. However, we expect that we will
4695 : * typically be able to pick one of the first few items, which should
4696 : * usually have a relatively high priority.
4697 : */
4698 4806 : for (int i = 0; i < binaryheap_size(ready_heap); i++)
4699 : {
4700 4744 : TocEntry *te = (TocEntry *) binaryheap_get_node(ready_heap, i);
4701 4744 : bool conflicts = false;
4702 :
4703 : /*
4704 : * Check to see if the item would need exclusive lock on something
4705 : * that a currently running item also needs lock on, or vice versa. If
4706 : * so, we don't want to schedule them together.
4707 : */
4708 14084 : for (int k = 0; k < pstate->numWorkers; k++)
4709 : {
4710 9546 : TocEntry *running_te = pstate->te[k];
4711 :
4712 9546 : if (running_te == NULL)
4713 4680 : continue;
4714 9526 : if (has_lock_conflicts(te, running_te) ||
4715 4660 : has_lock_conflicts(running_te, te))
4716 : {
4717 206 : conflicts = true;
4718 206 : break;
4719 : }
4720 : }
4721 :
4722 4744 : if (conflicts)
4723 206 : continue;
4724 :
4725 : /* passed all tests, so this item can run */
4726 4538 : binaryheap_remove_node(ready_heap, i);
4727 4538 : return te;
4728 : }
4729 :
4730 62 : pg_log_debug("no item ready");
4731 62 : return NULL;
4732 : }
4733 :
4734 :
4735 : /*
4736 : * Restore a single TOC item in parallel with others
4737 : *
4738 : * this is run in the worker, i.e. in a thread (Windows) or a separate process
4739 : * (everything else). A worker process executes several such work items during
4740 : * a parallel backup or restore. Once we terminate here and report back that
4741 : * our work is finished, the leader process will assign us a new work item.
4742 : */
4743 : int
4744 4538 : parallel_restore(ArchiveHandle *AH, TocEntry *te)
4745 : {
4746 : int status;
4747 :
4748 : Assert(AH->connection != NULL);
4749 :
4750 : /* Count only errors associated with this TOC entry */
4751 4538 : AH->public.n_errors = 0;
4752 :
4753 : /* Restore the TOC item */
4754 4538 : status = restore_toc_entry(AH, te, true);
4755 :
4756 4538 : return status;
4757 : }
4758 :
4759 :
4760 : /*
4761 : * Callback function that's invoked in the leader process after a step has
4762 : * been parallel restored.
4763 : *
4764 : * Update status and reduce the dependency count of any dependent items.
4765 : */
4766 : static void
4767 4538 : mark_restore_job_done(ArchiveHandle *AH,
4768 : TocEntry *te,
4769 : int status,
4770 : void *callback_data)
4771 : {
4772 4538 : binaryheap *ready_heap = (binaryheap *) callback_data;
4773 :
4774 4538 : pg_log_info("finished item %d %s %s",
4775 : te->dumpId, te->desc, te->tag);
4776 :
4777 4538 : if (status == WORKER_CREATE_DONE)
4778 0 : mark_create_done(AH, te);
4779 4538 : else if (status == WORKER_INHIBIT_DATA)
4780 : {
4781 0 : inhibit_data_for_failed_table(AH, te);
4782 0 : AH->public.n_errors++;
4783 : }
4784 4538 : else if (status == WORKER_IGNORED_ERRORS)
4785 0 : AH->public.n_errors++;
4786 4538 : else if (status != 0)
4787 0 : pg_fatal("worker process failed: exit code %d",
4788 : status);
4789 :
4790 4538 : reduce_dependencies(AH, te, ready_heap);
4791 4538 : }
4792 :
4793 :
4794 : /*
4795 : * Process the dependency information into a form useful for parallel restore.
4796 : *
4797 : * This function takes care of fixing up some missing or badly designed
4798 : * dependencies, and then prepares subsidiary data structures that will be
4799 : * used in the main parallel-restore logic, including:
4800 : * 1. We build the revDeps[] arrays of incoming dependency dumpIds.
4801 : * 2. We set up depCount fields that are the number of as-yet-unprocessed
4802 : * dependencies for each TOC entry.
4803 : *
4804 : * We also identify locking dependencies so that we can avoid trying to
4805 : * schedule conflicting items at the same time.
4806 : */
4807 : static void
4808 10 : fix_dependencies(ArchiveHandle *AH)
4809 : {
4810 : TocEntry *te;
4811 : int i;
4812 :
4813 : /*
4814 : * Initialize the depCount/revDeps/nRevDeps fields, and make sure the TOC
4815 : * items are marked as not being in any parallel-processing list.
4816 : */
4817 7668 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4818 : {
4819 7658 : te->depCount = te->nDeps;
4820 7658 : te->revDeps = NULL;
4821 7658 : te->nRevDeps = 0;
4822 7658 : te->pending_prev = NULL;
4823 7658 : te->pending_next = NULL;
4824 : }
4825 :
4826 : /*
4827 : * POST_DATA items that are shown as depending on a table need to be
4828 : * re-pointed to depend on that table's data, instead. This ensures they
4829 : * won't get scheduled until the data has been loaded.
4830 : */
4831 10 : repoint_table_dependencies(AH);
4832 :
4833 : /*
4834 : * Pre-8.4 versions of pg_dump neglected to set up a dependency from BLOB
4835 : * COMMENTS to BLOBS. Cope. (We assume there's only one BLOBS and only
4836 : * one BLOB COMMENTS in such files.)
4837 : */
4838 10 : if (AH->version < K_VERS_1_11)
4839 : {
4840 0 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4841 : {
4842 0 : if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0)
4843 : {
4844 : TocEntry *te2;
4845 :
4846 0 : for (te2 = AH->toc->next; te2 != AH->toc; te2 = te2->next)
4847 : {
4848 0 : if (strcmp(te2->desc, "BLOBS") == 0)
4849 : {
4850 0 : te->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
4851 0 : te->dependencies[0] = te2->dumpId;
4852 0 : te->nDeps++;
4853 0 : te->depCount++;
4854 0 : break;
4855 : }
4856 : }
4857 0 : break;
4858 : }
4859 : }
4860 : }
4861 :
4862 : /*
4863 : * At this point we start to build the revDeps reverse-dependency arrays,
4864 : * so all changes of dependencies must be complete.
4865 : */
4866 :
4867 : /*
4868 : * Count the incoming dependencies for each item. Also, it is possible
4869 : * that the dependencies list items that are not in the archive at all
4870 : * (that should not happen in 9.2 and later, but is highly likely in older
4871 : * archives). Subtract such items from the depCounts.
4872 : */
4873 7668 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4874 : {
4875 19296 : for (i = 0; i < te->nDeps; i++)
4876 : {
4877 11638 : DumpId depid = te->dependencies[i];
4878 :
4879 11638 : if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4880 9244 : AH->tocsByDumpId[depid]->nRevDeps++;
4881 : else
4882 2394 : te->depCount--;
4883 : }
4884 : }
4885 :
4886 : /*
4887 : * Allocate space for revDeps[] arrays, and reset nRevDeps so we can use
4888 : * it as a counter below.
4889 : */
4890 7668 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4891 : {
4892 7658 : if (te->nRevDeps > 0)
4893 3120 : te->revDeps = (DumpId *) pg_malloc(te->nRevDeps * sizeof(DumpId));
4894 7658 : te->nRevDeps = 0;
4895 : }
4896 :
4897 : /*
4898 : * Build the revDeps[] arrays of incoming-dependency dumpIds. This had
4899 : * better agree with the loops above.
4900 : */
4901 7668 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4902 : {
4903 19296 : for (i = 0; i < te->nDeps; i++)
4904 : {
4905 11638 : DumpId depid = te->dependencies[i];
4906 :
4907 11638 : if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4908 : {
4909 9244 : TocEntry *otherte = AH->tocsByDumpId[depid];
4910 :
4911 9244 : otherte->revDeps[otherte->nRevDeps++] = te->dumpId;
4912 : }
4913 : }
4914 : }
4915 :
4916 : /*
4917 : * Lastly, work out the locking dependencies.
4918 : */
4919 7668 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4920 : {
4921 7658 : te->lockDeps = NULL;
4922 7658 : te->nLockDeps = 0;
4923 7658 : identify_locking_dependencies(AH, te);
4924 : }
4925 10 : }
4926 :
4927 : /*
4928 : * Change dependencies on table items to depend on table data items instead,
4929 : * but only in POST_DATA items.
4930 : *
4931 : * Also, for any item having such dependency(s), set its dataLength to the
4932 : * largest dataLength of the table data items it depends on. This ensures
4933 : * that parallel restore will prioritize larger jobs (index builds, FK
4934 : * constraint checks, etc) over smaller ones, avoiding situations where we
4935 : * end a restore with only one active job working on a large table.
4936 : */
4937 : static void
4938 10 : repoint_table_dependencies(ArchiveHandle *AH)
4939 : {
4940 : TocEntry *te;
4941 : int i;
4942 : DumpId olddep;
4943 :
4944 7668 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4945 : {
4946 7658 : if (te->section != SECTION_POST_DATA)
4947 6012 : continue;
4948 6306 : for (i = 0; i < te->nDeps; i++)
4949 : {
4950 4660 : olddep = te->dependencies[i];
4951 4660 : if (olddep <= AH->maxDumpId &&
4952 4660 : AH->tableDataId[olddep] != 0)
4953 : {
4954 1924 : DumpId tabledataid = AH->tableDataId[olddep];
4955 1924 : TocEntry *tabledatate = AH->tocsByDumpId[tabledataid];
4956 :
4957 1924 : te->dependencies[i] = tabledataid;
4958 1924 : te->dataLength = Max(te->dataLength, tabledatate->dataLength);
4959 1924 : pg_log_debug("transferring dependency %d -> %d to %d",
4960 : te->dumpId, olddep, tabledataid);
4961 : }
4962 : }
4963 : }
4964 10 : }
4965 :
4966 : /*
4967 : * Identify which objects we'll need exclusive lock on in order to restore
4968 : * the given TOC entry (*other* than the one identified by the TOC entry
4969 : * itself). Record their dump IDs in the entry's lockDeps[] array.
4970 : */
4971 : static void
4972 7658 : identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
4973 : {
4974 : DumpId *lockids;
4975 : int nlockids;
4976 : int i;
4977 :
4978 : /*
4979 : * We only care about this for POST_DATA items. PRE_DATA items are not
4980 : * run in parallel, and DATA items are all independent by assumption.
4981 : */
4982 7658 : if (te->section != SECTION_POST_DATA)
4983 6012 : return;
4984 :
4985 : /* Quick exit if no dependencies at all */
4986 1646 : if (te->nDeps == 0)
4987 0 : return;
4988 :
4989 : /*
4990 : * Most POST_DATA items are ALTER TABLEs or some moral equivalent of that,
4991 : * and hence require exclusive lock. However, we know that CREATE INDEX
4992 : * does not. (Maybe someday index-creating CONSTRAINTs will fall in that
4993 : * category too ... but today is not that day.)
4994 : */
4995 1646 : if (strcmp(te->desc, "INDEX") == 0)
4996 294 : return;
4997 :
4998 : /*
4999 : * We assume the entry requires exclusive lock on each TABLE or TABLE DATA
5000 : * item listed among its dependencies. Originally all of these would have
5001 : * been TABLE items, but repoint_table_dependencies would have repointed
5002 : * them to the TABLE DATA items if those are present (which they might not
5003 : * be, eg in a schema-only dump). Note that all of the entries we are
5004 : * processing here are POST_DATA; otherwise there might be a significant
5005 : * difference between a dependency on a table and a dependency on its
5006 : * data, so that closer analysis would be needed here.
5007 : */
5008 1352 : lockids = (DumpId *) pg_malloc(te->nDeps * sizeof(DumpId));
5009 1352 : nlockids = 0;
5010 5442 : for (i = 0; i < te->nDeps; i++)
5011 : {
5012 4090 : DumpId depid = te->dependencies[i];
5013 :
5014 4090 : if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL &&
5015 3168 : ((strcmp(AH->tocsByDumpId[depid]->desc, "TABLE DATA") == 0) ||
5016 1764 : strcmp(AH->tocsByDumpId[depid]->desc, "TABLE") == 0))
5017 1728 : lockids[nlockids++] = depid;
5018 : }
5019 :
5020 1352 : if (nlockids == 0)
5021 : {
5022 626 : free(lockids);
5023 626 : return;
5024 : }
5025 :
5026 726 : te->lockDeps = pg_realloc(lockids, nlockids * sizeof(DumpId));
5027 726 : te->nLockDeps = nlockids;
5028 : }
5029 :
5030 : /*
5031 : * Remove the specified TOC entry from the depCounts of items that depend on
5032 : * it, thereby possibly making them ready-to-run. Any pending item that
5033 : * becomes ready should be moved to the ready_heap, if that's provided.
5034 : */
5035 : static void
5036 7658 : reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
5037 : binaryheap *ready_heap)
5038 : {
5039 : int i;
5040 :
5041 7658 : pg_log_debug("reducing dependencies for %d", te->dumpId);
5042 :
5043 16902 : for (i = 0; i < te->nRevDeps; i++)
5044 : {
5045 9244 : TocEntry *otherte = AH->tocsByDumpId[te->revDeps[i]];
5046 :
5047 : Assert(otherte->depCount > 0);
5048 9244 : otherte->depCount--;
5049 :
5050 : /*
5051 : * It's ready if it has no remaining dependencies, and it belongs in
5052 : * the current restore pass, and it is currently a member of the
5053 : * pending list (that check is needed to prevent double restore in
5054 : * some cases where a list-file forces out-of-order restoring).
5055 : * However, if ready_heap == NULL then caller doesn't want any list
5056 : * memberships changed.
5057 : */
5058 9244 : if (otherte->depCount == 0 &&
5059 6186 : _tocEntryRestorePass(otherte) == AH->restorePass &&
5060 5564 : otherte->pending_prev != NULL &&
5061 : ready_heap != NULL)
5062 : {
5063 : /* Remove it from pending list ... */
5064 1010 : pending_list_remove(otherte);
5065 : /* ... and add to ready_heap */
5066 1010 : binaryheap_add(ready_heap, otherte);
5067 : }
5068 : }
5069 7658 : }
5070 :
5071 : /*
5072 : * Set the created flag on the DATA member corresponding to the given
5073 : * TABLE member
5074 : */
5075 : static void
5076 14768 : mark_create_done(ArchiveHandle *AH, TocEntry *te)
5077 : {
5078 14768 : if (AH->tableDataId[te->dumpId] != 0)
5079 : {
5080 11650 : TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
5081 :
5082 11650 : ted->created = true;
5083 : }
5084 14768 : }
5085 :
5086 : /*
5087 : * Mark the DATA member corresponding to the given TABLE member
5088 : * as not wanted
5089 : */
5090 : static void
5091 0 : inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
5092 : {
5093 0 : pg_log_info("table \"%s\" could not be created, will not restore its data",
5094 : te->tag);
5095 :
5096 0 : if (AH->tableDataId[te->dumpId] != 0)
5097 : {
5098 0 : TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
5099 :
5100 0 : ted->reqs = 0;
5101 : }
5102 0 : }
5103 :
5104 : /*
5105 : * Clone and de-clone routines used in parallel restoration.
5106 : *
5107 : * Enough of the structure is cloned to ensure that there is no
5108 : * conflict between different threads each with their own clone.
5109 : */
5110 : ArchiveHandle *
5111 60 : CloneArchive(ArchiveHandle *AH)
5112 : {
5113 : ArchiveHandle *clone;
5114 :
5115 : /* Make a "flat" copy */
5116 60 : clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle));
5117 60 : memcpy(clone, AH, sizeof(ArchiveHandle));
5118 :
5119 : /* Likewise flat-copy the RestoreOptions, so we can alter them locally */
5120 60 : clone->public.ropt = (RestoreOptions *) pg_malloc(sizeof(RestoreOptions));
5121 60 : memcpy(clone->public.ropt, AH->public.ropt, sizeof(RestoreOptions));
5122 :
5123 : /* Handle format-independent fields */
5124 60 : memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));
5125 :
5126 : /* The clone will have its own connection, so disregard connection state */
5127 60 : clone->connection = NULL;
5128 60 : clone->connCancel = NULL;
5129 60 : clone->currUser = NULL;
5130 60 : clone->currSchema = NULL;
5131 60 : clone->currTableAm = NULL;
5132 60 : clone->currTablespace = NULL;
5133 :
5134 : /* savedPassword must be local in case we change it while connecting */
5135 60 : if (clone->savedPassword)
5136 0 : clone->savedPassword = pg_strdup(clone->savedPassword);
5137 :
5138 : /* clone has its own error count, too */
5139 60 : clone->public.n_errors = 0;
5140 :
5141 : /* clones should not share lo_buf */
5142 60 : clone->lo_buf = NULL;
5143 :
5144 : /*
5145 : * Clone connections disregard --transaction-size; they must commit after
5146 : * each command so that the results are immediately visible to other
5147 : * workers.
5148 : */
5149 60 : clone->public.ropt->txn_size = 0;
5150 :
5151 : /*
5152 : * Connect our new clone object to the database, using the same connection
5153 : * parameters used for the original connection.
5154 : */
5155 60 : ConnectDatabaseAhx((Archive *) clone, &clone->public.ropt->cparams, true);
5156 :
5157 : /* re-establish fixed state */
5158 60 : if (AH->mode == archModeRead)
5159 24 : _doSetFixedOutputState(clone);
5160 : /* in write case, setupDumpWorker will fix up connection state */
5161 :
5162 : /* Let the format-specific code have a chance too */
5163 60 : clone->ClonePtr(clone);
5164 :
5165 : Assert(clone->connection != NULL);
5166 60 : return clone;
5167 : }
5168 :
5169 : /*
5170 : * Release clone-local storage.
5171 : *
5172 : * Note: we assume any clone-local connection was already closed.
5173 : */
5174 : void
5175 60 : DeCloneArchive(ArchiveHandle *AH)
5176 : {
5177 : /* Should not have an open database connection */
5178 : Assert(AH->connection == NULL);
5179 :
5180 : /* Clear format-specific state */
5181 60 : AH->DeClonePtr(AH);
5182 :
5183 : /* Clear state allocated by CloneArchive */
5184 60 : if (AH->sqlparse.curCmd)
5185 6 : destroyPQExpBuffer(AH->sqlparse.curCmd);
5186 :
5187 : /* Clear any connection-local state */
5188 60 : free(AH->currUser);
5189 60 : free(AH->currSchema);
5190 60 : free(AH->currTablespace);
5191 60 : free(AH->currTableAm);
5192 60 : free(AH->savedPassword);
5193 :
5194 60 : free(AH);
5195 60 : }
|