Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_backup_archiver.c
4 : *
5 : * Private implementation of the archiver routines.
6 : *
7 : * See the headers to pg_restore for more details.
8 : *
9 : * Copyright (c) 2000, Philip Warner
10 : * Rights are granted to use this software in any way so long
11 : * as this notice is not removed.
12 : *
13 : * The author is not responsible for loss or damages that may
14 : * result from its use.
15 : *
16 : *
17 : * IDENTIFICATION
18 : * src/bin/pg_dump/pg_backup_archiver.c
19 : *
20 : *-------------------------------------------------------------------------
21 : */
22 : #include "postgres_fe.h"
23 :
24 : #include <ctype.h>
25 : #include <fcntl.h>
26 : #include <unistd.h>
27 : #include <sys/stat.h>
28 : #include <sys/wait.h>
29 : #ifdef WIN32
30 : #include <io.h>
31 : #endif
32 :
33 : #include "catalog/pg_class_d.h"
34 : #include "common/string.h"
35 : #include "compress_io.h"
36 : #include "dumputils.h"
37 : #include "fe_utils/string_utils.h"
38 : #include "lib/binaryheap.h"
39 : #include "lib/stringinfo.h"
40 : #include "libpq/libpq-fs.h"
41 : #include "parallel.h"
42 : #include "pg_backup_archiver.h"
43 : #include "pg_backup_db.h"
44 : #include "pg_backup_utils.h"
45 :
46 : #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
47 : #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
48 :
49 :
50 : static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
51 : const pg_compress_specification compression_spec,
52 : bool dosync, ArchiveMode mode,
53 : SetupWorkerPtrType setupWorkerPtr,
54 : DataDirSyncMethod sync_method);
55 : static void _getObjectDescription(PQExpBuffer buf, const TocEntry *te);
56 : static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData);
57 : static char *sanitize_line(const char *str, bool want_hyphen);
58 : static void _doSetFixedOutputState(ArchiveHandle *AH);
59 : static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
60 : static void _reconnectToDB(ArchiveHandle *AH, const char *dbname);
61 : static void _becomeUser(ArchiveHandle *AH, const char *user);
62 : static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
63 : static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
64 : static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
65 : static void _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam);
66 : static void _printTableAccessMethodNoStorage(ArchiveHandle *AH,
67 : TocEntry *te);
68 : static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
69 : static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
70 : static void processSearchPathEntry(ArchiveHandle *AH, TocEntry *te);
71 : static int _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH);
72 : static RestorePass _tocEntryRestorePass(TocEntry *te);
73 : static bool _tocEntryIsACL(TocEntry *te);
74 : static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
75 : static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
76 : static bool is_load_via_partition_root(TocEntry *te);
77 : static void buildTocEntryArrays(ArchiveHandle *AH);
78 : static void _moveBefore(TocEntry *pos, TocEntry *te);
79 : static int _discoverArchiveFormat(ArchiveHandle *AH);
80 :
81 : static int RestoringToDB(ArchiveHandle *AH);
82 : static void dump_lo_buf(ArchiveHandle *AH);
83 : static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
84 : static void SetOutput(ArchiveHandle *AH, const char *filename,
85 : const pg_compress_specification compression_spec);
86 : static CompressFileHandle *SaveOutput(ArchiveHandle *AH);
87 : static void RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput);
88 :
89 : static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
90 : static void restore_toc_entries_prefork(ArchiveHandle *AH,
91 : TocEntry *pending_list);
92 : static void restore_toc_entries_parallel(ArchiveHandle *AH,
93 : ParallelState *pstate,
94 : TocEntry *pending_list);
95 : static void restore_toc_entries_postfork(ArchiveHandle *AH,
96 : TocEntry *pending_list);
97 : static void pending_list_header_init(TocEntry *l);
98 : static void pending_list_append(TocEntry *l, TocEntry *te);
99 : static void pending_list_remove(TocEntry *te);
100 : static int TocEntrySizeCompareQsort(const void *p1, const void *p2);
101 : static int TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg);
102 : static void move_to_ready_heap(TocEntry *pending_list,
103 : binaryheap *ready_heap,
104 : RestorePass pass);
105 : static TocEntry *pop_next_work_item(binaryheap *ready_heap,
106 : ParallelState *pstate);
107 : static void mark_dump_job_done(ArchiveHandle *AH,
108 : TocEntry *te,
109 : int status,
110 : void *callback_data);
111 : static void mark_restore_job_done(ArchiveHandle *AH,
112 : TocEntry *te,
113 : int status,
114 : void *callback_data);
115 : static void fix_dependencies(ArchiveHandle *AH);
116 : static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
117 : static void repoint_table_dependencies(ArchiveHandle *AH);
118 : static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
119 : static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
120 : binaryheap *ready_heap);
121 : static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
122 : static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
123 :
124 : static void StrictNamesCheck(RestoreOptions *ropt);
125 :
126 :
127 : /*
128 : * Allocate a new DumpOptions block containing all default values.
129 : */
130 : DumpOptions *
131 80 : NewDumpOptions(void)
132 : {
133 80 : DumpOptions *opts = (DumpOptions *) pg_malloc(sizeof(DumpOptions));
134 :
135 80 : InitDumpOptions(opts);
136 80 : return opts;
137 : }
138 :
139 : /*
140 : * Initialize a DumpOptions struct to all default values
141 : */
142 : void
143 466 : InitDumpOptions(DumpOptions *opts)
144 : {
145 466 : memset(opts, 0, sizeof(DumpOptions));
146 : /* set any fields that shouldn't default to zeroes */
147 466 : opts->include_everything = true;
148 466 : opts->cparams.promptPassword = TRI_DEFAULT;
149 466 : opts->dumpSections = DUMP_UNSECTIONED;
150 466 : }
151 :
/*
 * Create a freshly allocated DumpOptions with options equivalent to those
 * found in the given RestoreOptions.
 *
 * Used by SetArchiveOptions when a caller supplies only RestoreOptions.
 * Connection-parameter strings are deep-copied with pg_strdup, so the two
 * structs have independent lifetimes; the caller owns the returned block.
 */
DumpOptions *
dumpOptionsFromRestoreOptions(RestoreOptions *ropt)
{
    DumpOptions *dopt = NewDumpOptions();

    /* this is the inverse of what's at the end of pg_dump.c's main() */
    /* connection parameters (strings copied, NULLs preserved) */
    dopt->cparams.dbname = ropt->cparams.dbname ? pg_strdup(ropt->cparams.dbname) : NULL;
    dopt->cparams.pgport = ropt->cparams.pgport ? pg_strdup(ropt->cparams.pgport) : NULL;
    dopt->cparams.pghost = ropt->cparams.pghost ? pg_strdup(ropt->cparams.pghost) : NULL;
    dopt->cparams.username = ropt->cparams.username ? pg_strdup(ropt->cparams.username) : NULL;
    dopt->cparams.promptPassword = ropt->cparams.promptPassword;
    /* behavior flags; note some names differ between the two structs */
    dopt->outputClean = ropt->dropSchema;
    dopt->dataOnly = ropt->dataOnly;
    dopt->schemaOnly = ropt->schemaOnly;
    dopt->if_exists = ropt->if_exists;
    dopt->column_inserts = ropt->column_inserts;
    dopt->dumpSections = ropt->dumpSections;
    dopt->aclsSkip = ropt->aclsSkip;
    dopt->outputSuperuser = ropt->superuser;
    dopt->outputCreateDB = ropt->createDB;
    dopt->outputNoOwner = ropt->noOwner;
    dopt->outputNoTableAm = ropt->noTableAm;
    dopt->outputNoTablespaces = ropt->noTablespace;
    dopt->disable_triggers = ropt->disable_triggers;
    dopt->use_setsessauth = ropt->use_setsessauth;
    dopt->disable_dollar_quoting = ropt->disable_dollar_quoting;
    dopt->dump_inserts = ropt->dump_inserts;
    dopt->no_comments = ropt->no_comments;
    dopt->no_publications = ropt->no_publications;
    dopt->no_security_labels = ropt->no_security_labels;
    dopt->no_subscriptions = ropt->no_subscriptions;
    dopt->lockWaitTimeout = ropt->lockWaitTimeout;
    dopt->include_everything = ropt->include_everything;
    dopt->enable_row_security = ropt->enable_row_security;
    dopt->sequence_data = ropt->sequence_data;

    return dopt;
}
194 :
195 :
196 : /*
197 : * Wrapper functions.
198 : *
199 : * The objective is to make writing new formats and dumpers as simple
200 : * as possible, if necessary at the expense of extra function calls etc.
201 : *
202 : */
203 :
204 : /*
205 : * The dump worker setup needs lots of knowledge of the internals of pg_dump,
206 : * so it's defined in pg_dump.c and passed into OpenArchive. The restore worker
207 : * setup doesn't need to know anything much, so it's defined here.
208 : */
209 : static void
210 20 : setupRestoreWorker(Archive *AHX)
211 : {
212 20 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
213 :
214 20 : AH->ReopenPtr(AH);
215 20 : }
216 :
217 :
218 : /* Create a new archive */
219 : /* Public */
220 : Archive *
221 344 : CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
222 : const pg_compress_specification compression_spec,
223 : bool dosync, ArchiveMode mode,
224 : SetupWorkerPtrType setupDumpWorker,
225 : DataDirSyncMethod sync_method)
226 :
227 : {
228 344 : ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression_spec,
229 : dosync, mode, setupDumpWorker, sync_method);
230 :
231 342 : return (Archive *) AH;
232 : }
233 :
234 : /* Open an existing archive */
235 : /* Public */
236 : Archive *
237 76 : OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
238 : {
239 : ArchiveHandle *AH;
240 76 : pg_compress_specification compression_spec = {0};
241 :
242 76 : compression_spec.algorithm = PG_COMPRESSION_NONE;
243 76 : AH = _allocAH(FileSpec, fmt, compression_spec, true,
244 : archModeRead, setupRestoreWorker,
245 : DATA_DIR_SYNC_METHOD_FSYNC);
246 :
247 76 : return (Archive *) AH;
248 : }
249 :
250 : /* Public */
251 : void
252 378 : CloseArchive(Archive *AHX)
253 : {
254 378 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
255 :
256 378 : AH->ClosePtr(AH);
257 :
258 : /* Close the output */
259 378 : errno = 0;
260 378 : if (!EndCompressFileHandle(AH->OF))
261 0 : pg_fatal("could not close output file: %m");
262 378 : }
263 :
264 : /* Public */
265 : void
266 730 : SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt)
267 : {
268 : /* Caller can omit dump options, in which case we synthesize them */
269 730 : if (dopt == NULL && ropt != NULL)
270 80 : dopt = dumpOptionsFromRestoreOptions(ropt);
271 :
272 : /* Save options for later access */
273 730 : AH->dopt = dopt;
274 730 : AH->ropt = ropt;
275 730 : }
276 :
/* Public */

/*
 * Decide which TOC entries will be dumped/restored under the current
 * restore options, recording the verdict in each entry's te->reqs.
 * When writing an archive, additionally sanity-check that entries appear
 * in a valid section order; finally apply --strict-names checking if
 * requested.
 */
void
ProcessArchiveRestoreOptions(Archive *AHX)
{
    ArchiveHandle *AH = (ArchiveHandle *) AHX;
    RestoreOptions *ropt = AH->public.ropt;
    TocEntry   *te;
    teSection   curSection;

    /* Decide which TOC entries will be dumped/restored, and mark them */
    curSection = SECTION_PRE_DATA;
    for (te = AH->toc->next; te != AH->toc; te = te->next)
    {
        /*
         * When writing an archive, we also take this opportunity to check
         * that we have generated the entries in a sane order that respects
         * the section divisions.  When reading, don't complain, since buggy
         * old versions of pg_dump might generate out-of-order archives.
         */
        if (AH->mode != archModeRead)
        {
            switch (te->section)
            {
                case SECTION_NONE:
                    /* ok to be anywhere */
                    break;
                case SECTION_PRE_DATA:
                    /* must not appear after we've left the pre-data section */
                    if (curSection != SECTION_PRE_DATA)
                        pg_log_warning("archive items not in correct section order");
                    break;
                case SECTION_DATA:
                    /* ok in pre-data or data section, not after post-data */
                    if (curSection == SECTION_POST_DATA)
                        pg_log_warning("archive items not in correct section order");
                    break;
                case SECTION_POST_DATA:
                    /* ok no matter which section we were in */
                    break;
                default:
                    pg_fatal("unexpected section code %d",
                             (int) te->section);
                    break;
            }
        }

        /* SECTION_NONE entries don't advance the current section */
        if (te->section != SECTION_NONE)
            curSection = te->section;

        te->reqs = _tocEntryRequired(te, curSection, AH);
    }

    /* Enforce strict names checking */
    if (ropt->strict_names)
        StrictNamesCheck(ropt);
}
331 :
/* Public */

/*
 * RestoreArchive
 *
 * Main driver for restoring an archive's contents.  Depending on the
 * RestoreOptions attached to the archive, output is either written as SQL
 * text (possibly through a compressed output handle) or executed directly
 * against a database connection (ropt->useDB).  Handles, in order:
 * parallel-mode prerequisites, compression capability checks, implied
 * data-only detection, optional DROP statements (--clean / --if-exists),
 * the serial three-pass (main/ACL/post-ACL) or parallel restore of TOC
 * entries, and closing out --single-transaction / --transaction-size
 * transaction blocks.
 */
void
RestoreArchive(Archive *AHX)
{
    ArchiveHandle *AH = (ArchiveHandle *) AHX;
    RestoreOptions *ropt = AH->public.ropt;
    bool        parallel_mode;
    TocEntry   *te;
    CompressFileHandle *sav;

    AH->stage = STAGE_INITIALIZING;

    /*
     * If we're going to do parallel restore, there are some restrictions.
     */
    parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
    if (parallel_mode)
    {
        /* We haven't got round to making this work for all archive formats */
        if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
            pg_fatal("parallel restore is not supported with this archive file format");

        /* Doesn't work if the archive represents dependencies as OIDs */
        if (AH->version < K_VERS_1_8)
            pg_fatal("parallel restore is not supported with archives made by pre-8.0 pg_dump");

        /*
         * It's also not gonna work if we can't reopen the input file, so
         * let's try that immediately.
         */
        AH->ReopenPtr(AH);
    }

    /*
     * Make sure we won't need (de)compression we haven't got
     */
    if (AH->PrintTocDataPtr != NULL)
    {
        for (te = AH->toc->next; te != AH->toc; te = te->next)
        {
            if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
            {
                char       *errmsg = supports_compression(AH->compression_spec);

                if (errmsg)
                    pg_fatal("cannot restore from compressed archive (%s)",
                             errmsg);
                else
                    break;
            }
        }
    }

    /*
     * Prepare index arrays, so we can assume we have them throughout restore.
     * It's possible we already did this, though.
     */
    if (AH->tocsByDumpId == NULL)
        buildTocEntryArrays(AH);

    /*
     * If we're using a DB connection, then connect it.
     */
    if (ropt->useDB)
    {
        pg_log_info("connecting to database for restore");
        if (AH->version < K_VERS_1_3)
            pg_fatal("direct database connections are not supported in pre-1.3 archives");

        /*
         * We don't want to guess at whether the dump will successfully
         * restore; allow the attempt regardless of the version of the restore
         * target.
         */
        AHX->minRemoteVersion = 0;
        AHX->maxRemoteVersion = 9999999;

        ConnectDatabase(AHX, &ropt->cparams, false);

        /*
         * If we're talking to the DB directly, don't send comments since they
         * obscure SQL when displaying errors
         */
        AH->noTocComments = 1;
    }

    /*
     * Work out if we have an implied data-only restore. This can happen if
     * the dump was data only or if the user has used a toc list to exclude
     * all of the schema data. All we do is look for schema entries - if none
     * are found then we set the dataOnly flag.
     *
     * We could scan for wanted TABLE entries, but that is not the same as
     * dataOnly. At this stage, it seems unnecessary (6-Mar-2001).
     */
    if (!ropt->dataOnly)
    {
        int         impliedDataOnly = 1;

        for (te = AH->toc->next; te != AH->toc; te = te->next)
        {
            if ((te->reqs & REQ_SCHEMA) != 0)
            {                   /* It's schema, and it's wanted */
                impliedDataOnly = 0;
                break;
            }
        }
        if (impliedDataOnly)
        {
            ropt->dataOnly = impliedDataOnly;
            pg_log_info("implied data-only restore");
        }
    }

    /*
     * Setup the output file if necessary.
     */
    sav = SaveOutput(AH);
    if (ropt->filename || ropt->compression_spec.algorithm != PG_COMPRESSION_NONE)
        SetOutput(AH, ropt->filename, ropt->compression_spec);

    ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");

    if (AH->archiveRemoteVersion)
        ahprintf(AH, "-- Dumped from database version %s\n",
                 AH->archiveRemoteVersion);
    if (AH->archiveDumpVersion)
        ahprintf(AH, "-- Dumped by pg_dump version %s\n",
                 AH->archiveDumpVersion);

    ahprintf(AH, "\n");

    if (AH->public.verbose)
        dumpTimestamp(AH, "Started on", AH->createDate);

    if (ropt->single_txn)
    {
        if (AH->connection)
            StartTransaction(AHX);
        else
            ahprintf(AH, "BEGIN;\n\n");
    }

    /*
     * Establish important parameter values right away.
     */
    _doSetFixedOutputState(AH);

    AH->stage = STAGE_PROCESSING;

    /*
     * Drop the items at the start, in reverse order
     */
    if (ropt->dropSchema)
    {
        for (te = AH->toc->prev; te != AH->toc; te = te->prev)
        {
            AH->currentTE = te;

            /*
             * In createDB mode, issue a DROP *only* for the database as a
             * whole.  Issuing drops against anything else would be wrong,
             * because at this point we're connected to the wrong database.
             * (The DATABASE PROPERTIES entry, if any, should be treated like
             * the DATABASE entry.)
             */
            if (ropt->createDB)
            {
                if (strcmp(te->desc, "DATABASE") != 0 &&
                    strcmp(te->desc, "DATABASE PROPERTIES") != 0)
                    continue;
            }

            /* Otherwise, drop anything that's selected and has a dropStmt */
            if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
            {
                bool        not_allowed_in_txn = false;

                pg_log_info("dropping %s %s", te->desc, te->tag);

                /*
                 * In --transaction-size mode, we have to temporarily exit our
                 * transaction block to drop objects that can't be dropped
                 * within a transaction.
                 */
                if (ropt->txn_size > 0)
                {
                    if (strcmp(te->desc, "DATABASE") == 0 ||
                        strcmp(te->desc, "DATABASE PROPERTIES") == 0)
                    {
                        not_allowed_in_txn = true;
                        if (AH->connection)
                            CommitTransaction(AHX);
                        else
                            ahprintf(AH, "COMMIT;\n");
                    }
                }

                /* Select owner and schema as necessary */
                _becomeOwner(AH, te);
                _selectOutputSchema(AH, te->namespace);

                /*
                 * Now emit the DROP command, if the object has one.  Note we
                 * don't necessarily emit it verbatim; at this point we add an
                 * appropriate IF EXISTS clause, if the user requested it.
                 */
                if (strcmp(te->desc, "BLOB METADATA") == 0)
                {
                    /* We must generate the per-blob commands */
                    if (ropt->if_exists)
                        IssueCommandPerBlob(AH, te,
                                            "SELECT pg_catalog.lo_unlink(oid) "
                                            "FROM pg_catalog.pg_largeobject_metadata "
                                            "WHERE oid = '", "'");
                    else
                        IssueCommandPerBlob(AH, te,
                                            "SELECT pg_catalog.lo_unlink('",
                                            "')");
                }
                else if (*te->dropStmt != '\0')
                {
                    if (!ropt->if_exists ||
                        strncmp(te->dropStmt, "--", 2) == 0)
                    {
                        /*
                         * Without --if-exists, or if it's just a comment (as
                         * happens for the public schema), print the dropStmt
                         * as-is.
                         */
                        ahprintf(AH, "%s", te->dropStmt);
                    }
                    else
                    {
                        /*
                         * Inject an appropriate spelling of "if exists".  For
                         * old-style large objects, we have a routine that
                         * knows how to do it, without depending on
                         * te->dropStmt; use that.  For other objects we need
                         * to parse the command.
                         */
                        if (strcmp(te->desc, "BLOB") == 0)
                        {
                            DropLOIfExists(AH, te->catalogId.oid);
                        }
                        else
                        {
                            char       *dropStmt = pg_strdup(te->dropStmt);
                            char       *dropStmtOrig = dropStmt;
                            PQExpBuffer ftStmt = createPQExpBuffer();

                            /*
                             * Need to inject IF EXISTS clause after ALTER
                             * TABLE part in ALTER TABLE .. DROP statement
                             */
                            if (strncmp(dropStmt, "ALTER TABLE", 11) == 0)
                            {
                                appendPQExpBufferStr(ftStmt,
                                                     "ALTER TABLE IF EXISTS");
                                dropStmt = dropStmt + 11;
                            }

                            /*
                             * ALTER TABLE..ALTER COLUMN..DROP DEFAULT does
                             * not support the IF EXISTS clause, and therefore
                             * we simply emit the original command for DEFAULT
                             * objects (modulo the adjustment made above).
                             *
                             * Likewise, don't mess with DATABASE PROPERTIES.
                             *
                             * If we used CREATE OR REPLACE VIEW as a means of
                             * quasi-dropping an ON SELECT rule, that should
                             * be emitted unchanged as well.
                             *
                             * For other object types, we need to extract the
                             * first part of the DROP which includes the
                             * object type.  Most of the time this matches
                             * te->desc, so search for that; however for the
                             * different kinds of CONSTRAINTs, we know to
                             * search for hardcoded "DROP CONSTRAINT" instead.
                             */
                            if (strcmp(te->desc, "DEFAULT") == 0 ||
                                strcmp(te->desc, "DATABASE PROPERTIES") == 0 ||
                                strncmp(dropStmt, "CREATE OR REPLACE VIEW", 22) == 0)
                                appendPQExpBufferStr(ftStmt, dropStmt);
                            else
                            {
                                char        buffer[40];
                                char       *mark;

                                if (strcmp(te->desc, "CONSTRAINT") == 0 ||
                                    strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
                                    strcmp(te->desc, "FK CONSTRAINT") == 0)
                                    strcpy(buffer, "DROP CONSTRAINT");
                                else
                                    snprintf(buffer, sizeof(buffer), "DROP %s",
                                             te->desc);

                                mark = strstr(dropStmt, buffer);

                                if (mark)
                                {
                                    *mark = '\0';
                                    appendPQExpBuffer(ftStmt, "%s%s IF EXISTS%s",
                                                      dropStmt, buffer,
                                                      mark + strlen(buffer));
                                }
                                else
                                {
                                    /* complain and emit unmodified command */
                                    pg_log_warning("could not find where to insert IF EXISTS in statement \"%s\"",
                                                   dropStmtOrig);
                                    appendPQExpBufferStr(ftStmt, dropStmt);
                                }
                            }

                            ahprintf(AH, "%s", ftStmt->data);

                            destroyPQExpBuffer(ftStmt);
                            pg_free(dropStmtOrig);
                        }
                    }
                }

                /*
                 * In --transaction-size mode, re-establish the transaction
                 * block if needed; otherwise, commit after every N drops.
                 */
                if (ropt->txn_size > 0)
                {
                    if (not_allowed_in_txn)
                    {
                        if (AH->connection)
                            StartTransaction(AHX);
                        else
                            ahprintf(AH, "BEGIN;\n");
                        AH->txnCount = 0;
                    }
                    else if (++AH->txnCount >= ropt->txn_size)
                    {
                        if (AH->connection)
                        {
                            CommitTransaction(AHX);
                            StartTransaction(AHX);
                        }
                        else
                            ahprintf(AH, "COMMIT;\nBEGIN;\n");
                        AH->txnCount = 0;
                    }
                }
            }
        }

        /*
         * _selectOutputSchema may have set currSchema to reflect the effect
         * of a "SET search_path" command it emitted.  However, by now we may
         * have dropped that schema; or it might not have existed in the first
         * place.  In either case the effective value of search_path will not
         * be what we think.  Forcibly reset currSchema so that we will
         * re-establish the search_path setting when needed (after creating
         * the schema).
         *
         * If we treated users as pg_dump'able objects then we'd need to reset
         * currUser here too.
         */
        free(AH->currSchema);
        AH->currSchema = NULL;
    }

    if (parallel_mode)
    {
        /*
         * In parallel mode, turn control over to the parallel-restore logic.
         */
        ParallelState *pstate;
        TocEntry    pending_list;

        /* The archive format module may need some setup for this */
        if (AH->PrepParallelRestorePtr)
            AH->PrepParallelRestorePtr(AH);

        pending_list_header_init(&pending_list);

        /* This runs PRE_DATA items and then disconnects from the database */
        restore_toc_entries_prefork(AH, &pending_list);
        Assert(AH->connection == NULL);

        /* ParallelBackupStart() will actually fork the processes */
        pstate = ParallelBackupStart(AH);
        restore_toc_entries_parallel(AH, pstate, &pending_list);
        ParallelBackupEnd(AH, pstate);

        /* reconnect the leader and see if we missed something */
        restore_toc_entries_postfork(AH, &pending_list);
        Assert(AH->connection != NULL);
    }
    else
    {
        /*
         * In serial mode, process everything in three phases: normal items,
         * then ACLs, then post-ACL items.  We might be able to skip one or
         * both extra phases in some cases, eg data-only restores.
         */
        bool        haveACL = false;
        bool        havePostACL = false;

        for (te = AH->toc->next; te != AH->toc; te = te->next)
        {
            if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
                continue;       /* ignore if not to be dumped at all */

            switch (_tocEntryRestorePass(te))
            {
                case RESTORE_PASS_MAIN:
                    (void) restore_toc_entry(AH, te, false);
                    break;
                case RESTORE_PASS_ACL:
                    haveACL = true;
                    break;
                case RESTORE_PASS_POST_ACL:
                    havePostACL = true;
                    break;
            }
        }

        if (haveACL)
        {
            for (te = AH->toc->next; te != AH->toc; te = te->next)
            {
                if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
                    _tocEntryRestorePass(te) == RESTORE_PASS_ACL)
                    (void) restore_toc_entry(AH, te, false);
            }
        }

        if (havePostACL)
        {
            for (te = AH->toc->next; te != AH->toc; te = te->next)
            {
                if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
                    _tocEntryRestorePass(te) == RESTORE_PASS_POST_ACL)
                    (void) restore_toc_entry(AH, te, false);
            }
        }
    }

    /*
     * Close out any persistent transaction we may have.  While these two
     * cases are started in different places, we can end both cases here.
     */
    if (ropt->single_txn || ropt->txn_size > 0)
    {
        if (AH->connection)
            CommitTransaction(AHX);
        else
            ahprintf(AH, "COMMIT;\n\n");
    }

    if (AH->public.verbose)
        dumpTimestamp(AH, "Completed on", time(NULL));

    ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n");

    /*
     * Clean up & we're done.
     */
    AH->stage = STAGE_FINALIZING;

    if (ropt->filename || ropt->compression_spec.algorithm != PG_COMPRESSION_NONE)
        RestoreOutput(AH, sav);

    if (ropt->useDB)
        DisconnectDatabase(&AH->public);
}
806 :
807 : /*
808 : * Restore a single TOC item. Used in both parallel and non-parallel restore;
809 : * is_parallel is true if we are in a worker child process.
810 : *
811 : * Returns 0 normally, but WORKER_CREATE_DONE or WORKER_INHIBIT_DATA if
812 : * the parallel parent has to make the corresponding status update.
813 : */
814 : static int
815 52026 : restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
816 : {
817 52026 : RestoreOptions *ropt = AH->public.ropt;
818 52026 : int status = WORKER_OK;
819 : int reqs;
820 : bool defnDumped;
821 :
822 52026 : AH->currentTE = te;
823 :
824 : /* Dump any relevant dump warnings to stderr */
825 52026 : if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
826 : {
827 0 : if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
828 0 : pg_log_warning("warning from original dump file: %s", te->defn);
829 0 : else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
830 0 : pg_log_warning("warning from original dump file: %s", te->copyStmt);
831 : }
832 :
833 : /* Work out what, if anything, we want from this entry */
834 52026 : reqs = te->reqs;
835 :
836 52026 : defnDumped = false;
837 :
838 : /*
839 : * If it has a schema component that we want, then process that
840 : */
841 52026 : if ((reqs & REQ_SCHEMA) != 0)
842 : {
843 43834 : bool object_is_db = false;
844 :
845 : /*
846 : * In --transaction-size mode, must exit our transaction block to
847 : * create a database or set its properties.
848 : */
849 43834 : if (strcmp(te->desc, "DATABASE") == 0 ||
850 43750 : strcmp(te->desc, "DATABASE PROPERTIES") == 0)
851 : {
852 112 : object_is_db = true;
853 112 : if (ropt->txn_size > 0)
854 : {
855 40 : if (AH->connection)
856 40 : CommitTransaction(&AH->public);
857 : else
858 0 : ahprintf(AH, "COMMIT;\n\n");
859 : }
860 : }
861 :
862 : /* Show namespace in log message if available */
863 43834 : if (te->namespace)
864 41544 : pg_log_info("creating %s \"%s.%s\"",
865 : te->desc, te->namespace, te->tag);
866 : else
867 2290 : pg_log_info("creating %s \"%s\"",
868 : te->desc, te->tag);
869 :
870 43834 : _printTocEntry(AH, te, false);
871 43834 : defnDumped = true;
872 :
873 43834 : if (strcmp(te->desc, "TABLE") == 0)
874 : {
875 9074 : if (AH->lastErrorTE == te)
876 : {
877 : /*
878 : * We failed to create the table. If
879 : * --no-data-for-failed-tables was given, mark the
880 : * corresponding TABLE DATA to be ignored.
881 : *
882 : * In the parallel case this must be done in the parent, so we
883 : * just set the return value.
884 : */
885 0 : if (ropt->noDataForFailedTables)
886 : {
887 0 : if (is_parallel)
888 0 : status = WORKER_INHIBIT_DATA;
889 : else
890 0 : inhibit_data_for_failed_table(AH, te);
891 : }
892 : }
893 : else
894 : {
895 : /*
896 : * We created the table successfully. Mark the corresponding
897 : * TABLE DATA for possible truncation.
898 : *
899 : * In the parallel case this must be done in the parent, so we
900 : * just set the return value.
901 : */
902 9074 : if (is_parallel)
903 0 : status = WORKER_CREATE_DONE;
904 : else
905 9074 : mark_create_done(AH, te);
906 : }
907 : }
908 :
909 : /*
910 : * If we created a DB, connect to it. Also, if we changed DB
911 : * properties, reconnect to ensure that relevant GUC settings are
912 : * applied to our session. (That also restarts the transaction block
913 : * in --transaction-size mode.)
914 : */
915 43834 : if (object_is_db)
916 : {
917 112 : pg_log_info("connecting to new database \"%s\"", te->tag);
918 112 : _reconnectToDB(AH, te->tag);
919 : }
920 : }
921 :
922 : /*
923 : * If it has a data component that we want, then process that
924 : */
925 52026 : if ((reqs & REQ_DATA) != 0)
926 : {
927 : /*
928 : * hadDumper will be set if there is genuine data component for this
929 : * node. Otherwise, we need to check the defn field for statements
930 : * that need to be executed in data-only restores.
931 : */
932 8162 : if (te->hadDumper)
933 : {
934 : /*
935 : * If we can output the data, then restore it.
936 : */
937 7118 : if (AH->PrintTocDataPtr != NULL)
938 : {
939 7118 : _printTocEntry(AH, te, true);
940 :
941 7118 : if (strcmp(te->desc, "BLOBS") == 0 ||
942 6982 : strcmp(te->desc, "BLOB COMMENTS") == 0)
943 : {
944 136 : pg_log_info("processing %s", te->desc);
945 :
946 136 : _selectOutputSchema(AH, "pg_catalog");
947 :
948 : /* Send BLOB COMMENTS data to ExecuteSimpleCommands() */
949 136 : if (strcmp(te->desc, "BLOB COMMENTS") == 0)
950 0 : AH->outputKind = OUTPUT_OTHERDATA;
951 :
952 136 : AH->PrintTocDataPtr(AH, te);
953 :
954 136 : AH->outputKind = OUTPUT_SQLCMDS;
955 : }
956 : else
957 : {
958 : bool use_truncate;
959 :
960 6982 : _disableTriggersIfNecessary(AH, te);
961 :
962 : /* Select owner and schema as necessary */
963 6982 : _becomeOwner(AH, te);
964 6982 : _selectOutputSchema(AH, te->namespace);
965 :
966 6982 : pg_log_info("processing data for table \"%s.%s\"",
967 : te->namespace, te->tag);
968 :
969 : /*
970 : * In parallel restore, if we created the table earlier in
971 : * this run (so that we know it is empty) and we are not
972 : * restoring a load-via-partition-root data item then we
973 : * wrap the COPY in a transaction and precede it with a
974 : * TRUNCATE. If wal_level is set to minimal this prevents
975 : * WAL-logging the COPY. This obtains a speedup similar
976 : * to that from using single_txn mode in non-parallel
977 : * restores.
978 : *
979 : * We mustn't do this for load-via-partition-root cases
980 : * because some data might get moved across partition
981 : * boundaries, risking deadlock and/or loss of previously
982 : * loaded data. (We assume that all partitions of a
983 : * partitioned table will be treated the same way.)
984 : */
985 7014 : use_truncate = is_parallel && te->created &&
986 32 : !is_load_via_partition_root(te);
987 :
988 6982 : if (use_truncate)
989 : {
990 : /*
991 : * Parallel restore is always talking directly to a
992 : * server, so no need to see if we should issue BEGIN.
993 : */
994 20 : StartTransaction(&AH->public);
995 :
996 : /*
997 : * Issue TRUNCATE with ONLY so that child tables are
998 : * not wiped.
999 : */
1000 20 : ahprintf(AH, "TRUNCATE TABLE ONLY %s;\n\n",
1001 20 : fmtQualifiedId(te->namespace, te->tag));
1002 : }
1003 :
1004 : /*
1005 : * If we have a copy statement, use it.
1006 : */
1007 6982 : if (te->copyStmt && strlen(te->copyStmt) > 0)
1008 : {
1009 6844 : ahprintf(AH, "%s", te->copyStmt);
1010 6844 : AH->outputKind = OUTPUT_COPYDATA;
1011 : }
1012 : else
1013 138 : AH->outputKind = OUTPUT_OTHERDATA;
1014 :
1015 6982 : AH->PrintTocDataPtr(AH, te);
1016 :
1017 : /*
1018 : * Terminate COPY if needed.
1019 : */
1020 13822 : if (AH->outputKind == OUTPUT_COPYDATA &&
1021 6842 : RestoringToDB(AH))
1022 18 : EndDBCopyMode(&AH->public, te->tag);
1023 6980 : AH->outputKind = OUTPUT_SQLCMDS;
1024 :
1025 : /* close out the transaction started above */
1026 6980 : if (use_truncate)
1027 20 : CommitTransaction(&AH->public);
1028 :
1029 6980 : _enableTriggersIfNecessary(AH, te);
1030 : }
1031 : }
1032 : }
1033 1044 : else if (!defnDumped)
1034 : {
1035 : /* If we haven't already dumped the defn part, do so now */
1036 1044 : pg_log_info("executing %s %s", te->desc, te->tag);
1037 1044 : _printTocEntry(AH, te, false);
1038 : }
1039 : }
1040 :
1041 : /*
1042 : * If we emitted anything for this TOC entry, that counts as one action
1043 : * against the transaction-size limit. Commit if it's time to.
1044 : */
1045 52024 : if ((reqs & (REQ_SCHEMA | REQ_DATA)) != 0 && ropt->txn_size > 0)
1046 : {
1047 3992 : if (++AH->txnCount >= ropt->txn_size)
1048 : {
1049 2 : if (AH->connection)
1050 : {
1051 2 : CommitTransaction(&AH->public);
1052 2 : StartTransaction(&AH->public);
1053 : }
1054 : else
1055 0 : ahprintf(AH, "COMMIT;\nBEGIN;\n\n");
1056 2 : AH->txnCount = 0;
1057 : }
1058 : }
1059 :
1060 52024 : if (AH->public.n_errors > 0 && status == WORKER_OK)
1061 0 : status = WORKER_IGNORED_ERRORS;
1062 :
1063 52024 : return status;
1064 : }
1065 :
1066 : /*
1067 : * Allocate a new RestoreOptions block.
1068 : * This is mainly so we can initialize it, but also for future expansion,
1069 : */
1070 : RestoreOptions *
1071 436 : NewRestoreOptions(void)
1072 : {
1073 : RestoreOptions *opts;
1074 :
1075 436 : opts = (RestoreOptions *) pg_malloc0(sizeof(RestoreOptions));
1076 :
1077 : /* set any fields that shouldn't default to zeroes */
1078 436 : opts->format = archUnknown;
1079 436 : opts->cparams.promptPassword = TRI_DEFAULT;
1080 436 : opts->dumpSections = DUMP_UNSECTIONED;
1081 436 : opts->compression_spec.algorithm = PG_COMPRESSION_NONE;
1082 436 : opts->compression_spec.level = 0;
1083 :
1084 436 : return opts;
1085 : }
1086 :
1087 : static void
1088 6982 : _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
1089 : {
1090 6982 : RestoreOptions *ropt = AH->public.ropt;
1091 :
1092 : /* This hack is only needed in a data-only restore */
1093 6982 : if (!ropt->dataOnly || !ropt->disable_triggers)
1094 6922 : return;
1095 :
1096 60 : pg_log_info("disabling triggers for %s", te->tag);
1097 :
1098 : /*
1099 : * Become superuser if possible, since they are the only ones who can
1100 : * disable constraint triggers. If -S was not given, assume the initial
1101 : * user identity is a superuser. (XXX would it be better to become the
1102 : * table owner?)
1103 : */
1104 60 : _becomeUser(AH, ropt->superuser);
1105 :
1106 : /*
1107 : * Disable them.
1108 : */
1109 60 : ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n",
1110 60 : fmtQualifiedId(te->namespace, te->tag));
1111 : }
1112 :
1113 : static void
1114 6980 : _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
1115 : {
1116 6980 : RestoreOptions *ropt = AH->public.ropt;
1117 :
1118 : /* This hack is only needed in a data-only restore */
1119 6980 : if (!ropt->dataOnly || !ropt->disable_triggers)
1120 6920 : return;
1121 :
1122 60 : pg_log_info("enabling triggers for %s", te->tag);
1123 :
1124 : /*
1125 : * Become superuser if possible, since they are the only ones who can
1126 : * disable constraint triggers. If -S was not given, assume the initial
1127 : * user identity is a superuser. (XXX would it be better to become the
1128 : * table owner?)
1129 : */
1130 60 : _becomeUser(AH, ropt->superuser);
1131 :
1132 : /*
1133 : * Enable them.
1134 : */
1135 60 : ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n",
1136 60 : fmtQualifiedId(te->namespace, te->tag));
1137 : }
1138 :
1139 : /*
1140 : * Detect whether a TABLE DATA TOC item is performing "load via partition
1141 : * root", that is the target table is an ancestor partition rather than the
1142 : * table the TOC item is nominally for.
1143 : *
1144 : * In newer archive files this can be detected by checking for a special
1145 : * comment placed in te->defn. In older files we have to fall back to seeing
1146 : * if the COPY statement targets the named table or some other one. This
1147 : * will not work for data dumped as INSERT commands, so we could give a false
1148 : * negative in that case; fortunately, that's a rarely-used option.
1149 : */
1150 : static bool
1151 32 : is_load_via_partition_root(TocEntry *te)
1152 : {
1153 32 : if (te->defn &&
1154 12 : strncmp(te->defn, "-- load via partition root ", 27) == 0)
1155 12 : return true;
1156 20 : if (te->copyStmt && *te->copyStmt)
1157 : {
1158 12 : PQExpBuffer copyStmt = createPQExpBuffer();
1159 : bool result;
1160 :
1161 : /*
1162 : * Build the initial part of the COPY as it would appear if the
1163 : * nominal target table is the actual target. If we see anything
1164 : * else, it must be a load-via-partition-root case.
1165 : */
1166 12 : appendPQExpBuffer(copyStmt, "COPY %s ",
1167 12 : fmtQualifiedId(te->namespace, te->tag));
1168 12 : result = strncmp(te->copyStmt, copyStmt->data, copyStmt->len) != 0;
1169 12 : destroyPQExpBuffer(copyStmt);
1170 12 : return result;
1171 : }
1172 : /* Assume it's not load-via-partition-root */
1173 8 : return false;
1174 : }
1175 :
1176 : /*
1177 : * This is a routine that is part of the dumper interface, hence the 'Archive*' parameter.
1178 : */
1179 :
1180 : /* Public */
1181 : void
1182 3469476 : WriteData(Archive *AHX, const void *data, size_t dLen)
1183 : {
1184 3469476 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1185 :
1186 3469476 : if (!AH->currToc)
1187 0 : pg_fatal("internal error -- WriteData cannot be called outside the context of a DataDumper routine");
1188 :
1189 3469476 : AH->WriteDataPtr(AH, data, dLen);
1190 3469476 : }
1191 :
1192 : /*
1193 : * Create a new TOC entry. The TOC was designed as a TOC, but is now the
1194 : * repository for all metadata. But the name has stuck.
1195 : *
1196 : * The new entry is added to the Archive's TOC list. Most callers can ignore
1197 : * the result value because nothing else need be done, but a few want to
1198 : * manipulate the TOC entry further.
1199 : */
1200 :
1201 : /* Public */
1202 : TocEntry *
1203 53654 : ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId,
1204 : ArchiveOpts *opts)
1205 : {
1206 53654 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1207 : TocEntry *newToc;
1208 :
1209 53654 : newToc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
1210 :
1211 53654 : AH->tocCount++;
1212 53654 : if (dumpId > AH->maxDumpId)
1213 11088 : AH->maxDumpId = dumpId;
1214 :
1215 53654 : newToc->prev = AH->toc->prev;
1216 53654 : newToc->next = AH->toc;
1217 53654 : AH->toc->prev->next = newToc;
1218 53654 : AH->toc->prev = newToc;
1219 :
1220 53654 : newToc->catalogId = catalogId;
1221 53654 : newToc->dumpId = dumpId;
1222 53654 : newToc->section = opts->section;
1223 :
1224 53654 : newToc->tag = pg_strdup(opts->tag);
1225 53654 : newToc->namespace = opts->namespace ? pg_strdup(opts->namespace) : NULL;
1226 53654 : newToc->tablespace = opts->tablespace ? pg_strdup(opts->tablespace) : NULL;
1227 53654 : newToc->tableam = opts->tableam ? pg_strdup(opts->tableam) : NULL;
1228 53654 : newToc->relkind = opts->relkind;
1229 53654 : newToc->owner = opts->owner ? pg_strdup(opts->owner) : NULL;
1230 53654 : newToc->desc = pg_strdup(opts->description);
1231 53654 : newToc->defn = opts->createStmt ? pg_strdup(opts->createStmt) : NULL;
1232 53654 : newToc->dropStmt = opts->dropStmt ? pg_strdup(opts->dropStmt) : NULL;
1233 53654 : newToc->copyStmt = opts->copyStmt ? pg_strdup(opts->copyStmt) : NULL;
1234 :
1235 53654 : if (opts->nDeps > 0)
1236 : {
1237 17778 : newToc->dependencies = (DumpId *) pg_malloc(opts->nDeps * sizeof(DumpId));
1238 17778 : memcpy(newToc->dependencies, opts->deps, opts->nDeps * sizeof(DumpId));
1239 17778 : newToc->nDeps = opts->nDeps;
1240 : }
1241 : else
1242 : {
1243 35876 : newToc->dependencies = NULL;
1244 35876 : newToc->nDeps = 0;
1245 : }
1246 :
1247 53654 : newToc->dataDumper = opts->dumpFn;
1248 53654 : newToc->dataDumperArg = opts->dumpArg;
1249 53654 : newToc->hadDumper = opts->dumpFn ? true : false;
1250 :
1251 53654 : newToc->formatData = NULL;
1252 53654 : newToc->dataLength = 0;
1253 :
1254 53654 : if (AH->ArchiveEntryPtr != NULL)
1255 9658 : AH->ArchiveEntryPtr(AH, newToc);
1256 :
1257 53654 : return newToc;
1258 : }
1259 :
/*
 * Print a human-readable summary of the archive's TOC (pg_restore -l).
 *
 * Writes archive metadata (creation time, dbname, entry count, compression,
 * format, version info) followed by one line per selected TOC entry.  Output
 * goes to ropt->filename if set, else the current output target.
 */
/* Public */
void
PrintTOCSummary(Archive *AHX)
{
	ArchiveHandle *AH = (ArchiveHandle *) AHX;
	RestoreOptions *ropt = AH->public.ropt;
	TocEntry   *te;
	pg_compress_specification out_compression_spec = {0};
	teSection	curSection;
	CompressFileHandle *sav;
	const char *fmtName;
	char		stamp_str[64];

	/* TOC is always uncompressed */
	out_compression_spec.algorithm = PG_COMPRESSION_NONE;

	/* Save current output target so we can restore it at the end */
	sav = SaveOutput(AH);
	if (ropt->filename)
		SetOutput(AH, ropt->filename, out_compression_spec);

	if (strftime(stamp_str, sizeof(stamp_str), PGDUMP_STRFTIME_FMT,
				 localtime(&AH->createDate)) == 0)
		strcpy(stamp_str, "[unknown]");

	ahprintf(AH, ";\n; Archive created at %s\n", stamp_str);
	ahprintf(AH, "; dbname: %s\n; TOC Entries: %d\n; Compression: %s\n",
			 sanitize_line(AH->archdbname, false),
			 AH->tocCount,
			 get_compress_algorithm_name(AH->compression_spec.algorithm));

	switch (AH->format)
	{
		case archCustom:
			fmtName = "CUSTOM";
			break;
		case archDirectory:
			fmtName = "DIRECTORY";
			break;
		case archTar:
			fmtName = "TAR";
			break;
		default:
			fmtName = "UNKNOWN";
	}

	ahprintf(AH, "; Dump Version: %d.%d-%d\n",
			 ARCHIVE_MAJOR(AH->version), ARCHIVE_MINOR(AH->version), ARCHIVE_REV(AH->version));
	ahprintf(AH, "; Format: %s\n", fmtName);
	ahprintf(AH, "; Integer: %d bytes\n", (int) AH->intSize);
	ahprintf(AH, "; Offset: %d bytes\n", (int) AH->offSize);
	if (AH->archiveRemoteVersion)
		ahprintf(AH, "; Dumped from database version: %s\n",
				 AH->archiveRemoteVersion);
	if (AH->archiveDumpVersion)
		ahprintf(AH, "; Dumped by pg_dump version: %s\n",
				 AH->archiveDumpVersion);

	ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");

	/*
	 * Walk the TOC, tracking the current section (SECTION_NONE entries
	 * inherit the section of the preceding entry) and printing each entry
	 * that is selected, or all entries in verbose mode.
	 */
	curSection = SECTION_PRE_DATA;
	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		if (te->section != SECTION_NONE)
			curSection = te->section;
		if (ropt->verbose ||
			(_tocEntryRequired(te, curSection, AH) & (REQ_SCHEMA | REQ_DATA)) != 0)
		{
			char	   *sanitized_name;
			char	   *sanitized_schema;
			char	   *sanitized_owner;

			/*
			 * Sanitize the strings so that embedded newlines etc. cannot
			 * corrupt the one-line-per-entry listing.
			 */
			sanitized_name = sanitize_line(te->tag, false);
			sanitized_schema = sanitize_line(te->namespace, true);
			sanitized_owner = sanitize_line(te->owner, false);

			ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
					 te->catalogId.tableoid, te->catalogId.oid,
					 te->desc, sanitized_schema, sanitized_name,
					 sanitized_owner);

			free(sanitized_name);
			free(sanitized_schema);
			free(sanitized_owner);
		}
		if (ropt->verbose && te->nDeps > 0)
		{
			int			i;

			ahprintf(AH, ";\tdepends on:");
			for (i = 0; i < te->nDeps; i++)
				ahprintf(AH, " %d", te->dependencies[i]);
			ahprintf(AH, "\n");
		}
	}

	/* Enforce strict names checking */
	if (ropt->strict_names)
		StrictNamesCheck(ropt);

	if (ropt->filename)
		RestoreOutput(AH, sav);
}
1364 :
1365 : /***********
1366 : * Large Object Archival
1367 : ***********/
1368 :
1369 : /* Called by a dumper to signal start of a LO */
1370 : int
1371 148 : StartLO(Archive *AHX, Oid oid)
1372 : {
1373 148 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1374 :
1375 148 : if (!AH->StartLOPtr)
1376 0 : pg_fatal("large-object output not supported in chosen format");
1377 :
1378 148 : AH->StartLOPtr(AH, AH->currToc, oid);
1379 :
1380 148 : return 1;
1381 : }
1382 :
1383 : /* Called by a dumper to signal end of a LO */
1384 : int
1385 148 : EndLO(Archive *AHX, Oid oid)
1386 : {
1387 148 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1388 :
1389 148 : if (AH->EndLOPtr)
1390 148 : AH->EndLOPtr(AH, AH->currToc, oid);
1391 :
1392 148 : return 1;
1393 : }
1394 :
1395 : /**********
1396 : * Large Object Restoration
1397 : **********/
1398 :
1399 : /*
1400 : * Called by a format handler before a group of LOs is restored
1401 : */
1402 : void
1403 32 : StartRestoreLOs(ArchiveHandle *AH)
1404 : {
1405 32 : RestoreOptions *ropt = AH->public.ropt;
1406 :
1407 : /*
1408 : * LOs must be restored within a transaction block, since we need the LO
1409 : * handle to stay open while we write it. Establish a transaction unless
1410 : * there's one being used globally.
1411 : */
1412 32 : if (!(ropt->single_txn || ropt->txn_size > 0))
1413 : {
1414 32 : if (AH->connection)
1415 0 : StartTransaction(&AH->public);
1416 : else
1417 32 : ahprintf(AH, "BEGIN;\n\n");
1418 : }
1419 :
1420 32 : AH->loCount = 0;
1421 32 : }
1422 :
1423 : /*
1424 : * Called by a format handler after a group of LOs is restored
1425 : */
1426 : void
1427 32 : EndRestoreLOs(ArchiveHandle *AH)
1428 : {
1429 32 : RestoreOptions *ropt = AH->public.ropt;
1430 :
1431 32 : if (!(ropt->single_txn || ropt->txn_size > 0))
1432 : {
1433 32 : if (AH->connection)
1434 0 : CommitTransaction(&AH->public);
1435 : else
1436 32 : ahprintf(AH, "COMMIT;\n\n");
1437 : }
1438 :
1439 32 : pg_log_info(ngettext("restored %d large object",
1440 : "restored %d large objects",
1441 : AH->loCount),
1442 : AH->loCount);
1443 32 : }
1444 :
1445 :
/*
 * Called by a format handler to initiate restoration of a LO
 */
void
StartRestoreLO(ArchiveHandle *AH, Oid oid, bool drop)
{
	/* Pre-1.12 archives used a different LO representation */
	bool		old_lo_style = (AH->version < K_VERS_1_12);
	Oid			loOid;

	AH->loCount++;

	/* Initialize the LO Buffer */
	if (AH->lo_buf == NULL)
	{
		/* First time through (in this process) so allocate the buffer */
		AH->lo_buf_size = LOBBUFSIZE;
		AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
	}
	AH->lo_buf_used = 0;

	pg_log_info("restoring large object with OID %u", oid);

	/* With an old archive we must do drop and create logic here */
	if (old_lo_style && drop)
		DropLOIfExists(AH, oid);

	if (AH->connection)
	{
		/* Direct-to-DB restore: create (old style only) and open the LO */
		if (old_lo_style)
		{
			loOid = lo_create(AH->connection, oid);
			/* The LO must be created with exactly the dumped OID */
			if (loOid == 0 || loOid != oid)
				pg_fatal("could not create large object %u: %s",
						 oid, PQerrorMessage(AH->connection));
		}
		AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
		if (AH->loFd == -1)
			pg_fatal("could not open large object %u: %s",
					 oid, PQerrorMessage(AH->connection));
	}
	else
	{
		/* Script output: emit SQL that opens the LO at restore time */
		if (old_lo_style)
			ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
					 oid, INV_WRITE);
		else
			ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
					 oid, INV_WRITE);
	}

	/* From here on, ahwrite() routes data into the LO buffer */
	AH->writingLO = true;
}
1498 :
1499 : void
1500 32 : EndRestoreLO(ArchiveHandle *AH, Oid oid)
1501 : {
1502 32 : if (AH->lo_buf_used > 0)
1503 : {
1504 : /* Write remaining bytes from the LO buffer */
1505 16 : dump_lo_buf(AH);
1506 : }
1507 :
1508 32 : AH->writingLO = false;
1509 :
1510 32 : if (AH->connection)
1511 : {
1512 0 : lo_close(AH->connection, AH->loFd);
1513 0 : AH->loFd = -1;
1514 : }
1515 : else
1516 : {
1517 32 : ahprintf(AH, "SELECT pg_catalog.lo_close(0);\n\n");
1518 : }
1519 32 : }
1520 :
1521 : /***********
1522 : * Sorting and Reordering
1523 : ***********/
1524 :
/*
 * Reorder (and implicitly select) TOC entries according to a user-supplied
 * TOC listing file (ropt->tocFile).  Each non-blank, non-comment line names
 * a dump ID; those entries are marked wanted in ropt->idWanted and moved to
 * the end of the TOC list in file order.
 */
void
SortTocFromFile(Archive *AHX)
{
	ArchiveHandle *AH = (ArchiveHandle *) AHX;
	RestoreOptions *ropt = AH->public.ropt;
	FILE	   *fh;
	StringInfoData linebuf;

	/* Allocate space for the 'wanted' array, and init it */
	ropt->idWanted = (bool *) pg_malloc0(sizeof(bool) * AH->maxDumpId);

	/* Setup the file */
	fh = fopen(ropt->tocFile, PG_BINARY_R);
	if (!fh)
		pg_fatal("could not open TOC file \"%s\": %m", ropt->tocFile);

	initStringInfo(&linebuf);

	while (pg_get_line_buf(fh, &linebuf))
	{
		char	   *cmnt;
		char	   *endptr;
		DumpId		id;
		TocEntry   *te;

		/* Truncate line at comment, if any */
		cmnt = strchr(linebuf.data, ';');
		if (cmnt != NULL)
		{
			cmnt[0] = '\0';
			linebuf.len = cmnt - linebuf.data;
		}

		/* Ignore if all blank */
		if (strspn(linebuf.data, " \t\r\n") == linebuf.len)
			continue;

		/* Get an ID, check it's valid and not already seen */
		id = strtol(linebuf.data, &endptr, 10);
		if (endptr == linebuf.data || id <= 0 || id > AH->maxDumpId ||
			ropt->idWanted[id - 1])
		{
			/* Bad or duplicate ID: warn and keep going, don't fail */
			pg_log_warning("line ignored: %s", linebuf.data);
			continue;
		}

		/* Find TOC entry */
		te = getTocEntryByDumpId(AH, id);
		if (!te)
			pg_fatal("could not find entry for ID %d",
					 id);

		/* Mark it wanted */
		ropt->idWanted[id - 1] = true;

		/*
		 * Move each item to the end of the list as it is selected, so that
		 * they are placed in the desired order.  Any unwanted items will end
		 * up at the front of the list, which may seem unintuitive but it's
		 * what we need.  In an ordinary serial restore that makes no
		 * difference, but in a parallel restore we need to mark unrestored
		 * items' dependencies as satisfied before we start examining
		 * restorable items.  Otherwise they could have surprising
		 * side-effects on the order in which restorable items actually get
		 * restored.
		 */
		_moveBefore(AH->toc, te);
	}

	pg_free(linebuf.data);

	if (fclose(fh) != 0)
		pg_fatal("could not close TOC file: %m");
}
1599 :
1600 : /**********************
1601 : * Convenience functions that look like standard IO functions
1602 : * for writing data when in dump mode.
1603 : **********************/
1604 :
1605 : /* Public */
1606 : void
1607 43244 : archputs(const char *s, Archive *AH)
1608 : {
1609 43244 : WriteData(AH, s, strlen(s));
1610 43244 : }
1611 :
/*
 * printf-style output into the archive, via WriteData().
 *
 * Formats into a malloc'd buffer, growing it as needed, then writes the
 * result.  Returns the number of bytes written.
 */
/* Public */
int
archprintf(Archive *AH, const char *fmt,...)
{
	int			save_errno = errno;
	char	   *p;
	size_t		len = 128;		/* initial assumption about buffer size */
	size_t		cnt;

	for (;;)
	{
		va_list		args;

		/* Allocate work buffer. */
		p = (char *) pg_malloc(len);

		/*
		 * Try to format the data.  errno is restored each attempt in case
		 * the format string uses %m.  va_start/va_end must be redone per
		 * attempt, since a va_list can't be reused after pvsnprintf.
		 */
		errno = save_errno;
		va_start(args, fmt);
		cnt = pvsnprintf(p, len, fmt, args);
		va_end(args);

		if (cnt < len)
			break;				/* success */

		/* Release buffer and loop around to try again with larger len. */
		free(p);
		len = cnt;
	}

	WriteData(AH, p, cnt);
	free(p);
	return (int) cnt;
}
1646 :
1647 :
1648 : /*******************************
1649 : * Stuff below here should be 'private' to the archiver routines
1650 : *******************************/
1651 :
/*
 * Make the given file (or stdout) the current output target.
 *
 * Resolution order: an explicit filename (with "-" meaning stdout), else the
 * already-open AH->FH, else AH->fSpec, else stdout.  The old target is not
 * closed; callers save it with SaveOutput() and restore via RestoreOutput().
 */
static void
SetOutput(ArchiveHandle *AH, const char *filename,
		  const pg_compress_specification compression_spec)
{
	CompressFileHandle *CFH;
	const char *mode;
	int			fn = -1;		/* fd to use; -1 means open by filename */

	if (filename)
	{
		/* "-" is conventional shorthand for stdout */
		if (strcmp(filename, "-") == 0)
			fn = fileno(stdout);
	}
	else if (AH->FH)
		fn = fileno(AH->FH);
	else if (AH->fSpec)
	{
		filename = AH->fSpec;
	}
	else
		fn = fileno(stdout);

	/* Append vs. overwrite; binary mode either way */
	if (AH->mode == archModeAppend)
		mode = PG_BINARY_A;
	else
		mode = PG_BINARY_W;

	CFH = InitCompressFileHandle(compression_spec);

	/* NOTE(review): open_func presumably prefers fn when >= 0 — confirm */
	if (!CFH->open_func(filename, fn, mode, CFH))
	{
		if (filename)
			pg_fatal("could not open output file \"%s\": %m", filename);
		else
			pg_fatal("could not open output file: %m");
	}

	AH->OF = CFH;
}
1691 :
1692 : static CompressFileHandle *
1693 322 : SaveOutput(ArchiveHandle *AH)
1694 : {
1695 322 : return (CompressFileHandle *) AH->OF;
1696 : }
1697 :
1698 : static void
1699 264 : RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput)
1700 : {
1701 264 : errno = 0;
1702 264 : if (!EndCompressFileHandle(AH->OF))
1703 0 : pg_fatal("could not close output file: %m");
1704 :
1705 264 : AH->OF = savedOutput;
1706 264 : }
1707 :
1708 :
1709 :
/*
 * Print formatted text to the output file (usually stdout).
 *
 * Formats into a malloc'd buffer, growing it as needed, then hands the
 * result to ahwrite().  Returns the number of bytes written.
 */
int
ahprintf(ArchiveHandle *AH, const char *fmt,...)
{
	int			save_errno = errno;
	char	   *p;
	size_t		len = 128;		/* initial assumption about buffer size */
	size_t		cnt;

	for (;;)
	{
		va_list		args;

		/* Allocate work buffer. */
		p = (char *) pg_malloc(len);

		/*
		 * Try to format the data.  errno is restored each attempt in case
		 * the format string uses %m.  va_start/va_end must be redone per
		 * attempt, since a va_list can't be reused after pvsnprintf.
		 */
		errno = save_errno;
		va_start(args, fmt);
		cnt = pvsnprintf(p, len, fmt, args);
		va_end(args);

		if (cnt < len)
			break;				/* success */

		/* Release buffer and loop around to try again with larger len. */
		free(p);
		len = cnt;
	}

	ahwrite(p, 1, cnt, AH);
	free(p);
	return (int) cnt;
}
1746 :
1747 : /*
1748 : * Single place for logic which says 'We are restoring to a direct DB connection'.
1749 : */
1750 : static int
1751 3708820 : RestoringToDB(ArchiveHandle *AH)
1752 : {
1753 3708820 : RestoreOptions *ropt = AH->public.ropt;
1754 :
1755 3708820 : return (ropt && ropt->useDB && AH->connection);
1756 : }
1757 :
/*
 * Dump the current contents of the LO data buffer while writing a LO
 */
static void
dump_lo_buf(ArchiveHandle *AH)
{
	if (AH->connection)
	{
		/* Direct-to-DB: push the buffer through libpq's lo_write */
		int			res;

		res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
		pg_log_debug(ngettext("wrote %zu byte of large object data (result = %d)",
							  "wrote %zu bytes of large object data (result = %d)",
							  AH->lo_buf_used),
					 AH->lo_buf_used, res);
		/* We assume there are no short writes, only errors */
		if (res != AH->lo_buf_used)
			warn_or_exit_horribly(AH, "could not write to large object: %s",
								  PQerrorMessage(AH->connection));
	}
	else
	{
		/* Script output: emit a lowrite() call with a bytea literal */
		PQExpBuffer buf = createPQExpBuffer();

		appendByteaLiteralAHX(buf,
							  (const unsigned char *) AH->lo_buf,
							  AH->lo_buf_used,
							  AH);

		/* Hack: turn off writingLO so ahwrite doesn't recurse to here */
		AH->writingLO = false;
		ahprintf(AH, "SELECT pg_catalog.lowrite(0, %s);\n", buf->data);
		AH->writingLO = true;

		destroyPQExpBuffer(buf);
	}
	/* Buffer is now empty either way */
	AH->lo_buf_used = 0;
}
1796 :
1797 :
1798 : /*
1799 : * Write buffer to the output file (usually stdout). This is used for
1800 : * outputting 'restore' scripts etc. It is even possible for an archive
1801 : * format to create a custom output routine to 'fake' a restore if it
1802 : * wants to generate a script (see TAR output).
1803 : */
1804 : void
1805 3704484 : ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
1806 : {
1807 3704484 : int bytes_written = 0;
1808 :
1809 3704484 : if (AH->writingLO)
1810 : {
1811 26 : size_t remaining = size * nmemb;
1812 :
1813 26 : while (AH->lo_buf_used + remaining > AH->lo_buf_size)
1814 : {
1815 0 : size_t avail = AH->lo_buf_size - AH->lo_buf_used;
1816 :
1817 0 : memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
1818 0 : ptr = (const void *) ((const char *) ptr + avail);
1819 0 : remaining -= avail;
1820 0 : AH->lo_buf_used += avail;
1821 0 : dump_lo_buf(AH);
1822 : }
1823 :
1824 26 : memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
1825 26 : AH->lo_buf_used += remaining;
1826 :
1827 26 : bytes_written = size * nmemb;
1828 : }
1829 3704458 : else if (AH->CustomOutPtr)
1830 3534 : bytes_written = AH->CustomOutPtr(AH, ptr, size * nmemb);
1831 :
1832 : /*
1833 : * If we're doing a restore, and it's direct to DB, and we're connected
1834 : * then send it to the DB.
1835 : */
1836 3700924 : else if (RestoringToDB(AH))
1837 8064 : bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
1838 : else
1839 : {
1840 3692860 : CompressFileHandle *CFH = (CompressFileHandle *) AH->OF;
1841 :
1842 3692860 : if (CFH->write_func(ptr, size * nmemb, CFH))
1843 3692860 : bytes_written = size * nmemb;
1844 : }
1845 :
1846 3704484 : if (bytes_written != size * nmemb)
1847 0 : WRITE_ERROR_EXIT;
1848 3704484 : }
1849 :
/*
 * Report an error; then either exit (if exit_on_error) or count it and
 * continue.  Prefixes the message with the current restore stage and TOC
 * entry, but only when they differ from the last error reported, so that
 * repeated errors in the same context aren't re-prefixed.
 */
/* on some error, we may decide to go on... */
void
warn_or_exit_horribly(ArchiveHandle *AH, const char *fmt,...)
{
	va_list		ap;

	/* Announce the stage once per stage change */
	switch (AH->stage)
	{

		case STAGE_NONE:
			/* Do nothing special */
			break;

		case STAGE_INITIALIZING:
			if (AH->stage != AH->lastErrorStage)
				pg_log_info("while INITIALIZING:");
			break;

		case STAGE_PROCESSING:
			if (AH->stage != AH->lastErrorStage)
				pg_log_info("while PROCESSING TOC:");
			break;

		case STAGE_FINALIZING:
			if (AH->stage != AH->lastErrorStage)
				pg_log_info("while FINALIZING:");
			break;
	}
	/* Announce the TOC entry once per entry change */
	if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE)
	{
		pg_log_info("from TOC entry %d; %u %u %s %s %s",
					AH->currentTE->dumpId,
					AH->currentTE->catalogId.tableoid,
					AH->currentTE->catalogId.oid,
					AH->currentTE->desc ? AH->currentTE->desc : "(no desc)",
					AH->currentTE->tag ? AH->currentTE->tag : "(no tag)",
					AH->currentTE->owner ? AH->currentTE->owner : "(no owner)");
	}
	/* Remember the context so the next error in it isn't re-prefixed */
	AH->lastErrorStage = AH->stage;
	AH->lastErrorTE = AH->currentTE;

	va_start(ap, fmt);
	pg_log_generic_v(PG_LOG_ERROR, PG_LOG_PRIMARY, fmt, ap);
	va_end(ap);

	if (AH->public.exit_on_error)
		exit_nicely(1);
	else
		AH->public.n_errors++;
}
1900 :
1901 : #ifdef NOT_USED
1902 :
/* Unlink te from the TOC ring and re-insert it just after "pos". */
/* (Currently compiled out -- see the surrounding #ifdef NOT_USED.) */
static void
_moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
{
	/* Unlink te from list */
	te->prev->next = te->next;
	te->next->prev = te->prev;

	/* and insert it after "pos" */
	te->prev = pos;
	te->next = pos->next;
	pos->next->prev = te;
	pos->next = te;
}
1916 : #endif
1917 :
1918 : static void
1919 0 : _moveBefore(TocEntry *pos, TocEntry *te)
1920 : {
1921 : /* Unlink te from list */
1922 0 : te->prev->next = te->next;
1923 0 : te->next->prev = te->prev;
1924 :
1925 : /* and insert it before "pos" */
1926 0 : te->prev = pos->prev;
1927 0 : te->next = pos;
1928 0 : pos->prev->next = te;
1929 0 : pos->prev = te;
1930 0 : }
1931 :
/*
 * Build index arrays for the TOC list
 *
 * This should be invoked only after we have created or read in all the TOC
 * items.
 *
 * The arrays are indexed by dump ID (so entry zero is unused).  Note that the
 * array entries run only up to maxDumpId.  We might see dependency dump IDs
 * beyond that (if the dump was partial); so always check the array bound
 * before trying to touch an array entry.
 */
static void
buildTocEntryArrays(ArchiveHandle *AH)
{
	DumpId		maxDumpId = AH->maxDumpId;
	TocEntry   *te;

	/* Both arrays are zero-initialized; unused slots stay NULL/0 */
	AH->tocsByDumpId = (TocEntry **) pg_malloc0((maxDumpId + 1) * sizeof(TocEntry *));
	AH->tableDataId = (DumpId *) pg_malloc0((maxDumpId + 1) * sizeof(DumpId));

	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		/* this check is purely paranoia, maxDumpId should be correct */
		if (te->dumpId <= 0 || te->dumpId > maxDumpId)
			pg_fatal("bad dumpId");

		/* tocsByDumpId indexes all TOCs by their dump ID */
		AH->tocsByDumpId[te->dumpId] = te;

		/*
		 * tableDataId provides the TABLE DATA item's dump ID for each TABLE
		 * TOC entry that has a DATA item.  We compute this by reversing the
		 * TABLE DATA item's dependency, knowing that a TABLE DATA item has
		 * just one dependency and it is the TABLE item.
		 */
		if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0)
		{
			DumpId		tableId = te->dependencies[0];

			/*
			 * The TABLE item might not have been in the archive, if this was
			 * a data-only dump; but its dump ID should be less than its data
			 * item's dump ID, so there should be a place for it in the array.
			 */
			if (tableId <= 0 || tableId > maxDumpId)
				pg_fatal("bad table dumpId for TABLE DATA item");

			AH->tableDataId[tableId] = te->dumpId;
		}
	}
}
1983 :
1984 : TocEntry *
1985 21036 : getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
1986 : {
1987 : /* build index arrays if we didn't already */
1988 21036 : if (AH->tocsByDumpId == NULL)
1989 44 : buildTocEntryArrays(AH);
1990 :
1991 21036 : if (id > 0 && id <= AH->maxDumpId)
1992 21036 : return AH->tocsByDumpId[id];
1993 :
1994 0 : return NULL;
1995 : }
1996 :
1997 : int
1998 20708 : TocIDRequired(ArchiveHandle *AH, DumpId id)
1999 : {
2000 20708 : TocEntry *te = getTocEntryByDumpId(AH, id);
2001 :
2002 20708 : if (!te)
2003 9780 : return 0;
2004 :
2005 10928 : return te->reqs;
2006 : }
2007 :
2008 : size_t
2009 12840 : WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
2010 : {
2011 : int off;
2012 :
2013 : /* Save the flag */
2014 12840 : AH->WriteBytePtr(AH, wasSet);
2015 :
2016 : /* Write out pgoff_t smallest byte first, prevents endian mismatch */
2017 115560 : for (off = 0; off < sizeof(pgoff_t); off++)
2018 : {
2019 102720 : AH->WriteBytePtr(AH, o & 0xFF);
2020 102720 : o >>= 8;
2021 : }
2022 12840 : return sizeof(pgoff_t) + 1;
2023 : }
2024 :
/*
 * Read back an offset written by WriteOffset (or, for pre-1.7 archives, by
 * WriteInt).  On return *o holds the offset value and the function result is
 * one of the K_OFFSET_* state codes.
 */
int
ReadOffset(ArchiveHandle *AH, pgoff_t * o)
{
	int			i;
	int			off;
	int			offsetFlg;

	/* Initialize to zero */
	*o = 0;

	/* Check for old version */
	if (AH->version < K_VERS_1_7)
	{
		/* Prior versions wrote offsets using WriteInt */
		i = ReadInt(AH);
		/* -1 means not set */
		if (i < 0)
			return K_OFFSET_POS_NOT_SET;
		else if (i == 0)
			return K_OFFSET_NO_DATA;

		/* Cast to pgoff_t because it was written as an int. */
		*o = (pgoff_t) i;
		return K_OFFSET_POS_SET;
	}

	/*
	 * Read the flag indicating the state of the data pointer. Check if valid
	 * and die if not.
	 *
	 * This used to be handled by a negative or zero pointer, now we use an
	 * extra byte specifically for the state.
	 */
	offsetFlg = AH->ReadBytePtr(AH) & 0xFF;

	switch (offsetFlg)
	{
		case K_OFFSET_POS_NOT_SET:
		case K_OFFSET_NO_DATA:
		case K_OFFSET_POS_SET:
			/* recognized flag value; fall out and read the offset bytes */
			break;

		default:
			pg_fatal("unexpected data offset flag %d", offsetFlg);
	}

	/*
	 * Read the bytes, least-significant first (matching WriteOffset).  The
	 * archive's offSize may exceed our local sizeof(pgoff_t); any excess
	 * high-order bytes must then be zero, else the offset doesn't fit.
	 */
	for (off = 0; off < AH->offSize; off++)
	{
		if (off < sizeof(pgoff_t))
			*o |= ((pgoff_t) (AH->ReadBytePtr(AH))) << (off * 8);
		else
		{
			if (AH->ReadBytePtr(AH) != 0)
				pg_fatal("file offset in dump file is too large");
		}
	}

	return offsetFlg;
}
2088 :
2089 : size_t
2090 300890 : WriteInt(ArchiveHandle *AH, int i)
2091 : {
2092 : int b;
2093 :
2094 : /*
2095 : * This is a bit yucky, but I don't want to make the binary format very
2096 : * dependent on representation, and not knowing much about it, I write out
2097 : * a sign byte. If you change this, don't forget to change the file
2098 : * version #, and modify ReadInt to read the new format AS WELL AS the old
2099 : * formats.
2100 : */
2101 :
2102 : /* SIGN byte */
2103 300890 : if (i < 0)
2104 : {
2105 63372 : AH->WriteBytePtr(AH, 1);
2106 63372 : i = -i;
2107 : }
2108 : else
2109 237518 : AH->WriteBytePtr(AH, 0);
2110 :
2111 1504450 : for (b = 0; b < AH->intSize; b++)
2112 : {
2113 1203560 : AH->WriteBytePtr(AH, i & 0xFF);
2114 1203560 : i >>= 8;
2115 : }
2116 :
2117 300890 : return AH->intSize + 1;
2118 : }
2119 :
2120 : int
2121 228822 : ReadInt(ArchiveHandle *AH)
2122 : {
2123 228822 : int res = 0;
2124 : int bv,
2125 : b;
2126 228822 : int sign = 0; /* Default positive */
2127 228822 : int bitShift = 0;
2128 :
2129 228822 : if (AH->version > K_VERS_1_0)
2130 : /* Read a sign byte */
2131 228822 : sign = AH->ReadBytePtr(AH);
2132 :
2133 1144110 : for (b = 0; b < AH->intSize; b++)
2134 : {
2135 915288 : bv = AH->ReadBytePtr(AH) & 0xFF;
2136 915288 : if (bv != 0)
2137 214540 : res = res + (bv << bitShift);
2138 915288 : bitShift += 8;
2139 : }
2140 :
2141 228822 : if (sign)
2142 49302 : res = -res;
2143 :
2144 228822 : return res;
2145 : }
2146 :
2147 : size_t
2148 235310 : WriteStr(ArchiveHandle *AH, const char *c)
2149 : {
2150 : size_t res;
2151 :
2152 235310 : if (c)
2153 : {
2154 171938 : int len = strlen(c);
2155 :
2156 171938 : res = WriteInt(AH, len);
2157 171938 : AH->WriteBufPtr(AH, c, len);
2158 171938 : res += len;
2159 : }
2160 : else
2161 63372 : res = WriteInt(AH, -1);
2162 :
2163 235310 : return res;
2164 : }
2165 :
2166 : char *
2167 179122 : ReadStr(ArchiveHandle *AH)
2168 : {
2169 : char *buf;
2170 : int l;
2171 :
2172 179122 : l = ReadInt(AH);
2173 179122 : if (l < 0)
2174 49302 : buf = NULL;
2175 : else
2176 : {
2177 129820 : buf = (char *) pg_malloc(l + 1);
2178 129820 : AH->ReadBufPtr(AH, (void *) buf, l);
2179 :
2180 129820 : buf[l] = '\0';
2181 : }
2182 :
2183 179122 : return buf;
2184 : }
2185 :
2186 : static bool
2187 22 : _fileExistsInDirectory(const char *dir, const char *filename)
2188 : {
2189 : struct stat st;
2190 : char buf[MAXPGPATH];
2191 :
2192 22 : if (snprintf(buf, MAXPGPATH, "%s/%s", dir, filename) >= MAXPGPATH)
2193 0 : pg_fatal("directory name too long: \"%s\"", dir);
2194 :
2195 22 : return (stat(buf, &st) == 0 && S_ISREG(st.st_mode));
2196 : }
2197 :
/*
 * Sniff the archive format of AH->fSpec (or stdin) when the caller didn't
 * specify one.  Sets AH->format and returns it; dies via pg_fatal if the
 * input is not a recognizable archive.  Also primes AH->lookahead with any
 * bytes already consumed, so non-seekable input (stdin) can be re-read.
 */
static int
_discoverArchiveFormat(ArchiveHandle *AH)
{
	FILE	   *fh;
	char		sig[6];			/* More than enough */
	size_t		cnt;
	int			wantClose = 0;

	pg_log_debug("attempting to ascertain archive format");

	/* Discard any lookahead buffer from a previous attempt. */
	free(AH->lookahead);

	AH->readHeader = 0;
	AH->lookaheadSize = 512;	/* one tar-block's worth */
	AH->lookahead = pg_malloc0(512);
	AH->lookaheadLen = 0;
	AH->lookaheadPos = 0;

	if (AH->fSpec)
	{
		struct stat st;

		wantClose = 1;

		/*
		 * Check if the specified archive is a directory. If so, check if
		 * there's a "toc.dat" (or "toc.dat.{gz,lz4,zst}") file in it.
		 */
		if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
		{
			AH->format = archDirectory;
			if (_fileExistsInDirectory(AH->fSpec, "toc.dat"))
				return AH->format;
#ifdef HAVE_LIBZ
			if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz"))
				return AH->format;
#endif
#ifdef USE_LZ4
			if (_fileExistsInDirectory(AH->fSpec, "toc.dat.lz4"))
				return AH->format;
#endif
#ifdef USE_ZSTD
			if (_fileExistsInDirectory(AH->fSpec, "toc.dat.zst"))
				return AH->format;
#endif
			pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
					 AH->fSpec);
			fh = NULL;			/* keep compiler quiet */
		}
		else
		{
			fh = fopen(AH->fSpec, PG_BINARY_R);
			if (!fh)
				pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
		}
	}
	else
	{
		/* No file spec: read from stdin. */
		fh = stdin;
		if (!fh)
			pg_fatal("could not open input file: %m");
	}

	/* A custom-format archive starts with the 5-byte magic "PGDMP". */
	if ((cnt = fread(sig, 1, 5, fh)) != 5)
	{
		if (ferror(fh))
			pg_fatal("could not read input file: %m");
		else
			pg_fatal("input file is too short (read %lu, expected 5)",
					 (unsigned long) cnt);
	}

	/* Save it, just in case we need it later */
	memcpy(&AH->lookahead[0], sig, 5);
	AH->lookaheadLen = 5;

	if (strncmp(sig, "PGDMP", 5) == 0)
	{
		/* It's custom format, stop here */
		AH->format = archCustom;
		AH->readHeader = 1;
	}
	else
	{
		/*
		 * *Maybe* we have a tar archive format file or a text dump ... So,
		 * read first 512 byte header...
		 */
		cnt = fread(&AH->lookahead[AH->lookaheadLen], 1, 512 - AH->lookaheadLen, fh);
		/* read failure is checked below */
		AH->lookaheadLen += cnt;

		/* A plain-SQL dump starts with one of the known header comments. */
		if (AH->lookaheadLen >= strlen(TEXT_DUMPALL_HEADER) &&
			(strncmp(AH->lookahead, TEXT_DUMP_HEADER, strlen(TEXT_DUMP_HEADER)) == 0 ||
			 strncmp(AH->lookahead, TEXT_DUMPALL_HEADER, strlen(TEXT_DUMPALL_HEADER)) == 0))
		{
			/*
			 * looks like it's probably a text format dump. so suggest they
			 * try psql
			 */
			pg_fatal("input file appears to be a text format dump. Please use psql.");
		}

		/* A valid tar archive must supply at least one full 512-byte block. */
		if (AH->lookaheadLen != 512)
		{
			if (feof(fh))
				pg_fatal("input file does not appear to be a valid archive (too short?)");
			else
				READ_ERROR_EXIT(fh);
		}

		if (!isValidTarHeader(AH->lookahead))
			pg_fatal("input file does not appear to be a valid archive");

		AH->format = archTar;
	}

	/* Close the file if we opened it */
	if (wantClose)
	{
		if (fclose(fh) != 0)
			pg_fatal("could not close input file: %m");
		/* Forget lookahead, since we'll re-read header after re-opening */
		AH->readHeader = 0;
		AH->lookaheadLen = 0;
	}

	return AH->format;
}
2327 :
2328 :
2329 : /*
2330 : * Allocate an archive handle
2331 : */
static ArchiveHandle *
_allocAH(const char *FileSpec, const ArchiveFormat fmt,
		 const pg_compress_specification compression_spec,
		 bool dosync, ArchiveMode mode,
		 SetupWorkerPtrType setupWorkerPtr, DataDirSyncMethod sync_method)
{
	ArchiveHandle *AH;
	CompressFileHandle *CFH;
	pg_compress_specification out_compress_spec = {0};

	pg_log_debug("allocating AH for %s, format %d",
				 FileSpec ? FileSpec : "(stdio)", fmt);

	/* pg_malloc0 zero-fills, so any field not set below starts as 0/NULL. */
	AH = (ArchiveHandle *) pg_malloc0(sizeof(ArchiveHandle));

	AH->version = K_VERS_SELF;

	/* initialize for backwards compatible string processing */
	AH->public.encoding = 0;	/* PG_SQL_ASCII */
	AH->public.std_strings = false;

	/* sql error handling */
	AH->public.exit_on_error = true;
	AH->public.n_errors = 0;

	AH->archiveDumpVersion = PG_VERSION;

	AH->createDate = time(NULL);

	/* Record this build's integer and offset widths for the archive header. */
	AH->intSize = sizeof(int);
	AH->offSize = sizeof(pgoff_t);
	if (FileSpec)
	{
		AH->fSpec = pg_strdup(FileSpec);

		/*
		 * Not used; maybe later....
		 *
		 * AH->workDir = pg_strdup(FileSpec); for(i=strlen(FileSpec) ; i > 0 ;
		 * i--) if (AH->workDir[i-1] == '/')
		 */
	}
	else
		AH->fSpec = NULL;		/* stdio will be used instead */

	AH->currUser = NULL;		/* unknown */
	AH->currSchema = NULL;		/* ditto */
	AH->currTablespace = NULL;	/* ditto */
	AH->currTableAm = NULL;		/* ditto */

	/* The TOC is kept as a circular doubly-linked list with a dummy header. */
	AH->toc = (TocEntry *) pg_malloc0(sizeof(TocEntry));

	AH->toc->next = AH->toc;
	AH->toc->prev = AH->toc;

	AH->mode = mode;
	AH->compression_spec = compression_spec;
	AH->dosync = dosync;
	AH->sync_method = sync_method;

	memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));

	/* Open stdout with no compression for AH output handle */
	out_compress_spec.algorithm = PG_COMPRESSION_NONE;
	CFH = InitCompressFileHandle(out_compress_spec);
	if (!CFH->open_func(NULL, fileno(stdout), PG_BINARY_A, CFH))
		pg_fatal("could not open stdout for appending: %m");
	AH->OF = CFH;

	/*
	 * On Windows, we need to use binary mode to read/write non-text files,
	 * which include all archive formats as well as compressed plain text.
	 * Force stdin/stdout into binary mode if that is what we are using.
	 */
#ifdef WIN32
	if ((fmt != archNull || compression_spec.algorithm != PG_COMPRESSION_NONE) &&
		(AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
	{
		if (mode == archModeWrite)
			_setmode(fileno(stdout), O_BINARY);
		else
			_setmode(fileno(stdin), O_BINARY);
	}
#endif

	AH->SetupWorkerPtr = setupWorkerPtr;

	/* archUnknown means "sniff the input"; see _discoverArchiveFormat. */
	if (fmt == archUnknown)
		AH->format = _discoverArchiveFormat(AH);
	else
		AH->format = fmt;

	/* Hook up the format-specific method pointers. */
	switch (AH->format)
	{
		case archCustom:
			InitArchiveFmt_Custom(AH);
			break;

		case archNull:
			InitArchiveFmt_Null(AH);
			break;

		case archDirectory:
			InitArchiveFmt_Directory(AH);
			break;

		case archTar:
			InitArchiveFmt_Tar(AH);
			break;

		default:
			pg_fatal("unrecognized file format \"%d\"", fmt);
	}

	return AH;
}
2448 :
2449 : /*
2450 : * Write out all data (tables & LOs)
2451 : */
void
WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
{
	TocEntry   *te;

	if (pstate && pstate->numWorkers > 1)
	{
		/*
		 * In parallel mode, this code runs in the leader process.  We
		 * construct an array of candidate TEs, then sort it into decreasing
		 * size order, then dispatch each TE to a data-transfer worker.  By
		 * dumping larger tables first, we avoid getting into a situation
		 * where we're down to one job and it's big, losing parallelism.
		 */
		TocEntry  **tes;
		int			ntes;

		/* tocCount is an upper bound on candidates, so this cannot overflow */
		tes = (TocEntry **) pg_malloc(AH->tocCount * sizeof(TocEntry *));
		ntes = 0;
		for (te = AH->toc->next; te != AH->toc; te = te->next)
		{
			/* Consider only TEs with dataDumper functions ... */
			if (!te->dataDumper)
				continue;
			/* ... and ignore ones not enabled for dump */
			if ((te->reqs & REQ_DATA) == 0)
				continue;

			tes[ntes++] = te;
		}

		if (ntes > 1)
			qsort(tes, ntes, sizeof(TocEntry *), TocEntrySizeCompareQsort);

		/* mark_dump_job_done just checks each worker's exit status */
		for (int i = 0; i < ntes; i++)
			DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP,
								   mark_dump_job_done, NULL);

		pg_free(tes);

		/* Now wait for workers to finish. */
		WaitForWorkers(AH, pstate, WFW_ALL_IDLE);
	}
	else
	{
		/* Non-parallel mode: just dump all candidate TEs sequentially. */
		for (te = AH->toc->next; te != AH->toc; te = te->next)
		{
			/* Must have same filter conditions as above */
			if (!te->dataDumper)
				continue;
			if ((te->reqs & REQ_DATA) == 0)
				continue;

			WriteDataChunksForTocEntry(AH, te);
		}
	}
}
2510 :
2511 :
2512 : /*
2513 : * Callback function that's invoked in the leader process after a step has
2514 : * been parallel dumped.
2515 : *
2516 : * We don't need to do anything except check for worker failure.
2517 : */
2518 : static void
2519 236 : mark_dump_job_done(ArchiveHandle *AH,
2520 : TocEntry *te,
2521 : int status,
2522 : void *callback_data)
2523 : {
2524 236 : pg_log_info("finished item %d %s %s",
2525 : te->dumpId, te->desc, te->tag);
2526 :
2527 236 : if (status != 0)
2528 0 : pg_fatal("worker process failed: exit code %d",
2529 : status);
2530 236 : }
2531 :
2532 :
2533 : void
2534 598 : WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
2535 : {
2536 : StartDataPtrType startPtr;
2537 : EndDataPtrType endPtr;
2538 :
2539 598 : AH->currToc = te;
2540 :
2541 598 : if (strcmp(te->desc, "BLOBS") == 0)
2542 : {
2543 32 : startPtr = AH->StartLOsPtr;
2544 32 : endPtr = AH->EndLOsPtr;
2545 : }
2546 : else
2547 : {
2548 566 : startPtr = AH->StartDataPtr;
2549 566 : endPtr = AH->EndDataPtr;
2550 : }
2551 :
2552 598 : if (startPtr != NULL)
2553 598 : (*startPtr) (AH, te);
2554 :
2555 : /*
2556 : * The user-provided DataDumper routine needs to call AH->WriteData
2557 : */
2558 598 : te->dataDumper((Archive *) AH, te->dataDumperArg);
2559 :
2560 598 : if (endPtr != NULL)
2561 598 : (*endPtr) (AH, te);
2562 :
2563 598 : AH->currToc = NULL;
2564 598 : }
2565 :
/*
 * Serialize the in-memory TOC into the archive.  The field order written
 * here is the on-disk TOC format and must stay in exact lockstep with
 * ReadToc; any change requires bumping the archive version.
 */
void
WriteToc(ArchiveHandle *AH)
{
	TocEntry   *te;
	char		workbuf[32];
	int			tocCount;
	int			i;

	/* count entries that will actually be dumped */
	tocCount = 0;
	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) != 0)
			tocCount++;
	}

	/* printf("%d TOC Entries to save\n", tocCount); */

	WriteInt(AH, tocCount);

	for (te = AH->toc->next; te != AH->toc; te = te->next)
	{
		/* skip entries not selected for output (same test as the count above) */
		if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) == 0)
			continue;

		WriteInt(AH, te->dumpId);
		WriteInt(AH, te->dataDumper ? 1 : 0);

		/* OID is recorded as a string for historical reasons */
		sprintf(workbuf, "%u", te->catalogId.tableoid);
		WriteStr(AH, workbuf);
		sprintf(workbuf, "%u", te->catalogId.oid);
		WriteStr(AH, workbuf);

		WriteStr(AH, te->tag);
		WriteStr(AH, te->desc);
		WriteInt(AH, te->section);
		WriteStr(AH, te->defn);
		WriteStr(AH, te->dropStmt);
		WriteStr(AH, te->copyStmt);
		WriteStr(AH, te->namespace);
		WriteStr(AH, te->tablespace);
		WriteStr(AH, te->tableam);
		WriteInt(AH, te->relkind);
		WriteStr(AH, te->owner);
		/* historical slot: the long-gone "table WITH OIDS" flag */
		WriteStr(AH, "false");

		/* Dump list of dependencies */
		for (i = 0; i < te->nDeps; i++)
		{
			sprintf(workbuf, "%d", te->dependencies[i]);
			WriteStr(AH, workbuf);
		}
		WriteStr(AH, NULL);		/* Terminate List */

		/* let the format module append its own per-entry data */
		if (AH->WriteExtraTocPtr)
			AH->WriteExtraTocPtr(AH, te);
	}
}
2625 :
/*
 * Deserialize the TOC from an archive into AH's circular TOC list, applying
 * version-dependent rules for fields that were added over time.  Must read
 * fields in exactly the order WriteToc emits them.
 */
void
ReadToc(ArchiveHandle *AH)
{
	int			i;
	char	   *tmp;
	DumpId	   *deps;
	int			depIdx;
	int			depSize;
	TocEntry   *te;
	bool		is_supported;

	AH->tocCount = ReadInt(AH);
	AH->maxDumpId = 0;

	for (i = 0; i < AH->tocCount; i++)
	{
		te = (TocEntry *) pg_malloc0(sizeof(TocEntry));
		te->dumpId = ReadInt(AH);

		/* track the largest ID seen, for buildTocEntryArrays sizing */
		if (te->dumpId > AH->maxDumpId)
			AH->maxDumpId = te->dumpId;

		/* Sanity check */
		if (te->dumpId <= 0)
			pg_fatal("entry ID %d out of range -- perhaps a corrupt TOC",
					 te->dumpId);

		te->hadDumper = ReadInt(AH);

		/* catalog IDs were stored as strings for historical reasons */
		if (AH->version >= K_VERS_1_8)
		{
			tmp = ReadStr(AH);
			sscanf(tmp, "%u", &te->catalogId.tableoid);
			free(tmp);
		}
		else
			te->catalogId.tableoid = InvalidOid;
		tmp = ReadStr(AH);
		sscanf(tmp, "%u", &te->catalogId.oid);
		free(tmp);

		te->tag = ReadStr(AH);
		te->desc = ReadStr(AH);

		if (AH->version >= K_VERS_1_11)
		{
			te->section = ReadInt(AH);
		}
		else
		{
			/*
			 * Rules for pre-8.4 archives wherein pg_dump hasn't classified
			 * the entries into sections. This list need not cover entry
			 * types added later than 8.4.
			 */
			if (strcmp(te->desc, "COMMENT") == 0 ||
				strcmp(te->desc, "ACL") == 0 ||
				strcmp(te->desc, "ACL LANGUAGE") == 0)
				te->section = SECTION_NONE;
			else if (strcmp(te->desc, "TABLE DATA") == 0 ||
					 strcmp(te->desc, "BLOBS") == 0 ||
					 strcmp(te->desc, "BLOB COMMENTS") == 0)
				te->section = SECTION_DATA;
			else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
					 strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
					 strcmp(te->desc, "FK CONSTRAINT") == 0 ||
					 strcmp(te->desc, "INDEX") == 0 ||
					 strcmp(te->desc, "RULE") == 0 ||
					 strcmp(te->desc, "TRIGGER") == 0)
				te->section = SECTION_POST_DATA;
			else
				te->section = SECTION_PRE_DATA;
		}

		te->defn = ReadStr(AH);
		te->dropStmt = ReadStr(AH);

		/* fields below were introduced at various archive versions */
		if (AH->version >= K_VERS_1_3)
			te->copyStmt = ReadStr(AH);

		if (AH->version >= K_VERS_1_6)
			te->namespace = ReadStr(AH);

		if (AH->version >= K_VERS_1_10)
			te->tablespace = ReadStr(AH);

		if (AH->version >= K_VERS_1_14)
			te->tableam = ReadStr(AH);

		if (AH->version >= K_VERS_1_16)
			te->relkind = ReadInt(AH);

		te->owner = ReadStr(AH);

		/*
		 * The next slot historically held the "table WITH OIDS" flag; the
		 * string "true" marks an archive we can no longer fully restore.
		 */
		is_supported = true;
		if (AH->version < K_VERS_1_9)
			is_supported = false;
		else
		{
			tmp = ReadStr(AH);

			if (strcmp(tmp, "true") == 0)
				is_supported = false;

			free(tmp);
		}

		if (!is_supported)
			pg_log_warning("restoring tables WITH OIDS is not supported anymore");

		/* Read TOC entry dependencies */
		if (AH->version >= K_VERS_1_5)
		{
			/* collect IDs into a growable array until the NULL terminator */
			depSize = 100;
			deps = (DumpId *) pg_malloc(sizeof(DumpId) * depSize);
			depIdx = 0;
			for (;;)
			{
				tmp = ReadStr(AH);
				if (!tmp)
					break;		/* end of list */
				if (depIdx >= depSize)
				{
					depSize *= 2;
					deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depSize);
				}
				sscanf(tmp, "%d", &deps[depIdx]);
				free(tmp);
				depIdx++;
			}

			if (depIdx > 0)		/* We have a non-null entry */
			{
				/* shrink the array to the exact size used */
				deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depIdx);
				te->dependencies = deps;
				te->nDeps = depIdx;
			}
			else
			{
				free(deps);
				te->dependencies = NULL;
				te->nDeps = 0;
			}
		}
		else
		{
			te->dependencies = NULL;
			te->nDeps = 0;
		}
		te->dataLength = 0;

		/* let the format module read its own per-entry data */
		if (AH->ReadExtraTocPtr)
			AH->ReadExtraTocPtr(AH, te);

		pg_log_debug("read TOC entry %d (ID %d) for %s %s",
					 i, te->dumpId, te->desc, te->tag);

		/* link completed entry into TOC circular list */
		te->prev = AH->toc->prev;
		AH->toc->prev->next = te;
		AH->toc->prev = te;
		te->next = AH->toc;

		/* special processing immediately upon read for some items */
		if (strcmp(te->desc, "ENCODING") == 0)
			processEncodingEntry(AH, te);
		else if (strcmp(te->desc, "STDSTRINGS") == 0)
			processStdStringsEntry(AH, te);
		else if (strcmp(te->desc, "SEARCHPATH") == 0)
			processSearchPathEntry(AH, te);
	}
}
2797 :
2798 : static void
2799 76 : processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
2800 : {
2801 : /* te->defn should have the form SET client_encoding = 'foo'; */
2802 76 : char *defn = pg_strdup(te->defn);
2803 : char *ptr1;
2804 76 : char *ptr2 = NULL;
2805 : int encoding;
2806 :
2807 76 : ptr1 = strchr(defn, '\'');
2808 76 : if (ptr1)
2809 76 : ptr2 = strchr(++ptr1, '\'');
2810 76 : if (ptr2)
2811 : {
2812 76 : *ptr2 = '\0';
2813 76 : encoding = pg_char_to_encoding(ptr1);
2814 76 : if (encoding < 0)
2815 0 : pg_fatal("unrecognized encoding \"%s\"",
2816 : ptr1);
2817 76 : AH->public.encoding = encoding;
2818 : }
2819 : else
2820 0 : pg_fatal("invalid ENCODING item: %s",
2821 : te->defn);
2822 :
2823 76 : free(defn);
2824 76 : }
2825 :
2826 : static void
2827 76 : processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
2828 : {
2829 : /* te->defn should have the form SET standard_conforming_strings = 'x'; */
2830 : char *ptr1;
2831 :
2832 76 : ptr1 = strchr(te->defn, '\'');
2833 76 : if (ptr1 && strncmp(ptr1, "'on'", 4) == 0)
2834 76 : AH->public.std_strings = true;
2835 0 : else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0)
2836 0 : AH->public.std_strings = false;
2837 : else
2838 0 : pg_fatal("invalid STDSTRINGS item: %s",
2839 : te->defn);
2840 76 : }
2841 :
static void
processSearchPathEntry(ArchiveHandle *AH, TocEntry *te)
{
	/*
	 * te->defn should contain a command to set search_path. We just copy it
	 * verbatim for use later.  No parsing or validation happens here.
	 */
	AH->public.searchpath = pg_strdup(te->defn);
}
2851 :
2852 : static void
2853 0 : StrictNamesCheck(RestoreOptions *ropt)
2854 : {
2855 : const char *missing_name;
2856 :
2857 : Assert(ropt->strict_names);
2858 :
2859 0 : if (ropt->schemaNames.head != NULL)
2860 : {
2861 0 : missing_name = simple_string_list_not_touched(&ropt->schemaNames);
2862 0 : if (missing_name != NULL)
2863 0 : pg_fatal("schema \"%s\" not found", missing_name);
2864 : }
2865 :
2866 0 : if (ropt->tableNames.head != NULL)
2867 : {
2868 0 : missing_name = simple_string_list_not_touched(&ropt->tableNames);
2869 0 : if (missing_name != NULL)
2870 0 : pg_fatal("table \"%s\" not found", missing_name);
2871 : }
2872 :
2873 0 : if (ropt->indexNames.head != NULL)
2874 : {
2875 0 : missing_name = simple_string_list_not_touched(&ropt->indexNames);
2876 0 : if (missing_name != NULL)
2877 0 : pg_fatal("index \"%s\" not found", missing_name);
2878 : }
2879 :
2880 0 : if (ropt->functionNames.head != NULL)
2881 : {
2882 0 : missing_name = simple_string_list_not_touched(&ropt->functionNames);
2883 0 : if (missing_name != NULL)
2884 0 : pg_fatal("function \"%s\" not found", missing_name);
2885 : }
2886 :
2887 0 : if (ropt->triggerNames.head != NULL)
2888 : {
2889 0 : missing_name = simple_string_list_not_touched(&ropt->triggerNames);
2890 0 : if (missing_name != NULL)
2891 0 : pg_fatal("trigger \"%s\" not found", missing_name);
2892 : }
2893 0 : }
2894 :
2895 : /*
2896 : * Determine whether we want to restore this TOC entry.
2897 : *
2898 : * Returns 0 if entry should be skipped, or some combination of the
2899 : * REQ_SCHEMA and REQ_DATA bits if we want to restore schema and/or data
2900 : * portions of this TOC entry, or REQ_SPECIAL if it's a special entry.
2901 : */
2902 : static int
2903 65748 : _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH)
2904 : {
2905 65748 : int res = REQ_SCHEMA | REQ_DATA;
2906 65748 : RestoreOptions *ropt = AH->public.ropt;
2907 :
2908 : /* These items are treated specially */
2909 65748 : if (strcmp(te->desc, "ENCODING") == 0 ||
2910 65368 : strcmp(te->desc, "STDSTRINGS") == 0 ||
2911 64988 : strcmp(te->desc, "SEARCHPATH") == 0)
2912 1140 : return REQ_SPECIAL;
2913 :
2914 : /*
2915 : * DATABASE and DATABASE PROPERTIES also have a special rule: they are
2916 : * restored in createDB mode, and not restored otherwise, independently of
2917 : * all else.
2918 : */
2919 64608 : if (strcmp(te->desc, "DATABASE") == 0 ||
2920 64412 : strcmp(te->desc, "DATABASE PROPERTIES") == 0)
2921 : {
2922 252 : if (ropt->createDB)
2923 196 : return REQ_SCHEMA;
2924 : else
2925 56 : return 0;
2926 : }
2927 :
2928 : /*
2929 : * Process exclusions that affect certain classes of TOC entries.
2930 : */
2931 :
2932 : /* If it's an ACL, maybe ignore it */
2933 64356 : if (ropt->aclsSkip && _tocEntryIsACL(te))
2934 0 : return 0;
2935 :
2936 : /* If it's a comment, maybe ignore it */
2937 64356 : if (ropt->no_comments && strcmp(te->desc, "COMMENT") == 0)
2938 0 : return 0;
2939 :
2940 : /*
2941 : * If it's a publication or a table part of a publication, maybe ignore
2942 : * it.
2943 : */
2944 64356 : if (ropt->no_publications &&
2945 0 : (strcmp(te->desc, "PUBLICATION") == 0 ||
2946 0 : strcmp(te->desc, "PUBLICATION TABLE") == 0 ||
2947 0 : strcmp(te->desc, "PUBLICATION TABLES IN SCHEMA") == 0))
2948 0 : return 0;
2949 :
2950 : /* If it's a security label, maybe ignore it */
2951 64356 : if (ropt->no_security_labels && strcmp(te->desc, "SECURITY LABEL") == 0)
2952 0 : return 0;
2953 :
2954 : /* If it's a subscription, maybe ignore it */
2955 64356 : if (ropt->no_subscriptions && strcmp(te->desc, "SUBSCRIPTION") == 0)
2956 0 : return 0;
2957 :
2958 : /* Ignore it if section is not to be dumped/restored */
2959 64356 : switch (curSection)
2960 : {
2961 41866 : case SECTION_PRE_DATA:
2962 41866 : if (!(ropt->dumpSections & DUMP_PRE_DATA))
2963 700 : return 0;
2964 41166 : break;
2965 9468 : case SECTION_DATA:
2966 9468 : if (!(ropt->dumpSections & DUMP_DATA))
2967 172 : return 0;
2968 9296 : break;
2969 13022 : case SECTION_POST_DATA:
2970 13022 : if (!(ropt->dumpSections & DUMP_POST_DATA))
2971 308 : return 0;
2972 12714 : break;
2973 0 : default:
2974 : /* shouldn't get here, really, but ignore it */
2975 0 : return 0;
2976 : }
2977 :
2978 : /* Ignore it if rejected by idWanted[] (cf. SortTocFromFile) */
2979 63176 : if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1])
2980 0 : return 0;
2981 :
2982 : /*
2983 : * Check options for selective dump/restore.
2984 : */
2985 63176 : if (strcmp(te->desc, "ACL") == 0 ||
2986 58352 : strcmp(te->desc, "COMMENT") == 0 ||
2987 52554 : strcmp(te->desc, "SECURITY LABEL") == 0)
2988 : {
2989 : /* Database properties react to createDB, not selectivity options. */
2990 10622 : if (strncmp(te->tag, "DATABASE ", 9) == 0)
2991 : {
2992 118 : if (!ropt->createDB)
2993 38 : return 0;
2994 : }
2995 10504 : else if (ropt->schemaNames.head != NULL ||
2996 10504 : ropt->schemaExcludeNames.head != NULL ||
2997 10504 : ropt->selTypes)
2998 : {
2999 : /*
3000 : * In a selective dump/restore, we want to restore these dependent
3001 : * TOC entry types only if their parent object is being restored.
3002 : * Without selectivity options, we let through everything in the
3003 : * archive. Note there may be such entries with no parent, eg
3004 : * non-default ACLs for built-in objects. Also, we make
3005 : * per-column ACLs additionally depend on the table's ACL if any
3006 : * to ensure correct restore order, so those dependencies should
3007 : * be ignored in this check.
3008 : *
3009 : * This code depends on the parent having been marked already,
3010 : * which should be the case; if it isn't, perhaps due to
3011 : * SortTocFromFile rearrangement, skipping the dependent entry
3012 : * seems prudent anyway.
3013 : *
3014 : * Ideally we'd handle, eg, table CHECK constraints this way too.
3015 : * But it's hard to tell which of their dependencies is the one to
3016 : * consult.
3017 : */
3018 0 : bool dumpthis = false;
3019 :
3020 0 : for (int i = 0; i < te->nDeps; i++)
3021 : {
3022 0 : TocEntry *pte = getTocEntryByDumpId(AH, te->dependencies[i]);
3023 :
3024 0 : if (!pte)
3025 0 : continue; /* probably shouldn't happen */
3026 0 : if (strcmp(pte->desc, "ACL") == 0)
3027 0 : continue; /* ignore dependency on another ACL */
3028 0 : if (pte->reqs == 0)
3029 0 : continue; /* this object isn't marked, so ignore it */
3030 : /* Found a parent to be dumped, so we want to dump this too */
3031 0 : dumpthis = true;
3032 0 : break;
3033 : }
3034 0 : if (!dumpthis)
3035 0 : return 0;
3036 : }
3037 : }
3038 : else
3039 : {
3040 : /* Apply selective-restore rules for standalone TOC entries. */
3041 52554 : if (ropt->schemaNames.head != NULL)
3042 : {
3043 : /* If no namespace is specified, it means all. */
3044 40 : if (!te->namespace)
3045 4 : return 0;
3046 36 : if (!simple_string_list_member(&ropt->schemaNames, te->namespace))
3047 28 : return 0;
3048 : }
3049 :
3050 52522 : if (ropt->schemaExcludeNames.head != NULL &&
3051 76 : te->namespace &&
3052 36 : simple_string_list_member(&ropt->schemaExcludeNames, te->namespace))
3053 8 : return 0;
3054 :
3055 52514 : if (ropt->selTypes)
3056 : {
3057 156 : if (strcmp(te->desc, "TABLE") == 0 ||
3058 116 : strcmp(te->desc, "TABLE DATA") == 0 ||
3059 76 : strcmp(te->desc, "VIEW") == 0 ||
3060 76 : strcmp(te->desc, "FOREIGN TABLE") == 0 ||
3061 76 : strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
3062 76 : strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0 ||
3063 76 : strcmp(te->desc, "SEQUENCE") == 0 ||
3064 70 : strcmp(te->desc, "SEQUENCE SET") == 0)
3065 : {
3066 92 : if (!ropt->selTable)
3067 60 : return 0;
3068 32 : if (ropt->tableNames.head != NULL &&
3069 32 : !simple_string_list_member(&ropt->tableNames, te->tag))
3070 28 : return 0;
3071 : }
3072 64 : else if (strcmp(te->desc, "INDEX") == 0)
3073 : {
3074 12 : if (!ropt->selIndex)
3075 8 : return 0;
3076 4 : if (ropt->indexNames.head != NULL &&
3077 4 : !simple_string_list_member(&ropt->indexNames, te->tag))
3078 2 : return 0;
3079 : }
3080 52 : else if (strcmp(te->desc, "FUNCTION") == 0 ||
3081 28 : strcmp(te->desc, "AGGREGATE") == 0 ||
3082 28 : strcmp(te->desc, "PROCEDURE") == 0)
3083 : {
3084 24 : if (!ropt->selFunction)
3085 8 : return 0;
3086 16 : if (ropt->functionNames.head != NULL &&
3087 16 : !simple_string_list_member(&ropt->functionNames, te->tag))
3088 12 : return 0;
3089 : }
3090 28 : else if (strcmp(te->desc, "TRIGGER") == 0)
3091 : {
3092 12 : if (!ropt->selTrigger)
3093 8 : return 0;
3094 4 : if (ropt->triggerNames.head != NULL &&
3095 4 : !simple_string_list_member(&ropt->triggerNames, te->tag))
3096 2 : return 0;
3097 : }
3098 : else
3099 16 : return 0;
3100 : }
3101 : }
3102 :
3103 : /*
3104 : * Determine whether the TOC entry contains schema and/or data components,
3105 : * and mask off inapplicable REQ bits. If it had a dataDumper, assume
3106 : * it's both schema and data. Otherwise it's probably schema-only, but
3107 : * there are exceptions.
3108 : */
3109 62954 : if (!te->hadDumper)
3110 : {
3111 : /*
3112 : * Special Case: If 'SEQUENCE SET' or anything to do with LOs, then it
3113 : * is considered a data entry. We don't need to check for BLOBS or
3114 : * old-style BLOB COMMENTS entries, because they will have hadDumper =
3115 : * true ... but we do need to check new-style BLOB ACLs, comments,
3116 : * etc.
3117 : */
3118 55042 : if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
3119 54106 : strcmp(te->desc, "BLOB") == 0 ||
3120 54106 : strcmp(te->desc, "BLOB METADATA") == 0 ||
3121 53914 : (strcmp(te->desc, "ACL") == 0 &&
3122 4824 : strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3123 53826 : (strcmp(te->desc, "COMMENT") == 0 &&
3124 5760 : strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3125 53712 : (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3126 0 : strncmp(te->tag, "LARGE OBJECT", 12) == 0))
3127 1330 : res = res & REQ_DATA;
3128 : else
3129 53712 : res = res & ~REQ_DATA;
3130 : }
3131 :
3132 : /*
3133 : * If there's no definition command, there's no schema component. Treat
3134 : * "load via partition root" comments as not schema.
3135 : */
3136 62954 : if (!te->defn || !te->defn[0] ||
3137 55066 : strncmp(te->defn, "-- load via partition root ", 27) == 0)
3138 7912 : res = res & ~REQ_SCHEMA;
3139 :
3140 : /*
3141 : * Special case: <Init> type with <Max OID> tag; this is obsolete and we
3142 : * always ignore it.
3143 : */
3144 62954 : if ((strcmp(te->desc, "<Init>") == 0) && (strcmp(te->tag, "Max OID") == 0))
3145 0 : return 0;
3146 :
3147 : /* Mask it if we only want schema */
3148 62954 : if (ropt->schemaOnly)
3149 : {
3150 : /*
3151 : * The sequence_data option overrides schemaOnly for SEQUENCE SET.
3152 : *
3153 : * In binary-upgrade mode, even with schemaOnly set, we do not mask
3154 : * out large objects. (Only large object definitions, comments and
3155 : * other metadata should be generated in binary-upgrade mode, not the
3156 : * actual data, but that need not concern us here.)
3157 : */
3158 5038 : if (!(ropt->sequence_data && strcmp(te->desc, "SEQUENCE SET") == 0) &&
3159 4920 : !(ropt->binary_upgrade &&
3160 4426 : (strcmp(te->desc, "BLOB") == 0 ||
3161 4426 : strcmp(te->desc, "BLOB METADATA") == 0 ||
3162 4420 : (strcmp(te->desc, "ACL") == 0 &&
3163 164 : strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3164 4418 : (strcmp(te->desc, "COMMENT") == 0 &&
3165 108 : strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3166 4412 : (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3167 0 : strncmp(te->tag, "LARGE OBJECT", 12) == 0))))
3168 4906 : res = res & REQ_SCHEMA;
3169 : }
3170 :
3171 : /* Mask it if we only want data */
3172 62954 : if (ropt->dataOnly)
3173 290 : res = res & REQ_DATA;
3174 :
3175 62954 : return res;
3176 : }
3177 :
3178 : /*
3179 : * Identify which pass we should restore this TOC entry in.
3180 : *
3181 : * See notes with the RestorePass typedef in pg_backup_archiver.h.
3182 : */
3183 : static RestorePass
3184 143774 : _tocEntryRestorePass(TocEntry *te)
3185 : {
3186 : /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3187 143774 : if (strcmp(te->desc, "ACL") == 0 ||
3188 133652 : strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3189 133652 : strcmp(te->desc, "DEFAULT ACL") == 0)
3190 10854 : return RESTORE_PASS_ACL;
3191 132920 : if (strcmp(te->desc, "EVENT TRIGGER") == 0 ||
3192 132706 : strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0)
3193 1652 : return RESTORE_PASS_POST_ACL;
3194 :
3195 : /*
3196 : * Comments need to be emitted in the same pass as their parent objects.
3197 : * ACLs haven't got comments, and neither do matview data objects, but
3198 : * event triggers do. (Fortunately, event triggers haven't got ACLs, or
3199 : * we'd need yet another weird special case.)
3200 : */
3201 131268 : if (strcmp(te->desc, "COMMENT") == 0 &&
3202 11650 : strncmp(te->tag, "EVENT TRIGGER ", 14) == 0)
3203 0 : return RESTORE_PASS_POST_ACL;
3204 :
3205 : /* All else can be handled in the main pass. */
3206 131268 : return RESTORE_PASS_MAIN;
3207 : }
3208 :
3209 : /*
3210 : * Identify TOC entries that are ACLs.
3211 : *
3212 : * Note: it seems worth duplicating some code here to avoid a hard-wired
3213 : * assumption that these are exactly the same entries that we restore during
3214 : * the RESTORE_PASS_ACL phase.
3215 : */
3216 : static bool
3217 52464 : _tocEntryIsACL(TocEntry *te)
3218 : {
3219 : /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3220 52464 : if (strcmp(te->desc, "ACL") == 0 ||
3221 48944 : strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3222 48944 : strcmp(te->desc, "DEFAULT ACL") == 0)
3223 3764 : return true;
3224 48700 : return false;
3225 : }
3226 :
3227 : /*
3228 : * Issue SET commands for parameters that we want to have set the same way
3229 : * at all times during execution of a restore script.
3230 : */
static void
_doSetFixedOutputState(ArchiveHandle *AH)
{
	RestoreOptions *ropt = AH->public.ropt;

	/*
	 * Disable timeouts to allow for slow commands, idle parallel workers, etc
	 */
	ahprintf(AH, "SET statement_timeout = 0;\n");
	ahprintf(AH, "SET lock_timeout = 0;\n");
	ahprintf(AH, "SET idle_in_transaction_session_timeout = 0;\n");
	ahprintf(AH, "SET transaction_timeout = 0;\n");

	/* Select the correct character set encoding */
	ahprintf(AH, "SET client_encoding = '%s';\n",
			 pg_encoding_to_char(AH->public.encoding));

	/* Select the correct string literal syntax */
	ahprintf(AH, "SET standard_conforming_strings = %s;\n",
			 AH->public.std_strings ? "on" : "off");

	/* Select the role to be used during restore */
	if (ropt && ropt->use_role)
		ahprintf(AH, "SET ROLE %s;\n", fmtId(ropt->use_role));

	/*
	 * Select the dump-time search_path.  If the archive recorded one, it is
	 * emitted verbatim (it already contains the full SET command text).
	 */
	if (AH->public.searchpath)
		ahprintf(AH, "%s", AH->public.searchpath);

	/* Make sure function checking is disabled */
	ahprintf(AH, "SET check_function_bodies = false;\n");

	/* Ensure that all valid XML data will be accepted */
	ahprintf(AH, "SET xmloption = content;\n");

	/* Avoid annoying notices etc */
	ahprintf(AH, "SET client_min_messages = warning;\n");
	if (!AH->public.std_strings)
		ahprintf(AH, "SET escape_string_warning = off;\n");

	/* Adjust row-security state */
	if (ropt && ropt->enable_row_security)
		ahprintf(AH, "SET row_security = on;\n");
	else
		ahprintf(AH, "SET row_security = off;\n");

	/*
	 * In --transaction-size mode, we should always be in a transaction when
	 * we begin to restore objects.  When connected, start a real transaction;
	 * in script mode, emit a BEGIN instead.  Reset the per-transaction object
	 * counter either way.
	 */
	if (ropt && ropt->txn_size > 0)
	{
		if (AH->connection)
			StartTransaction(&AH->public);
		else
			ahprintf(AH, "\nBEGIN;\n");
		AH->txnCount = 0;
	}

	ahprintf(AH, "\n");
}
3292 :
3293 : /*
3294 : * Issue a SET SESSION AUTHORIZATION command. Caller is responsible
3295 : * for updating state if appropriate. If user is NULL or an empty string,
3296 : * the specification DEFAULT will be used.
3297 : */
3298 : static void
3299 2 : _doSetSessionAuth(ArchiveHandle *AH, const char *user)
3300 : {
3301 2 : PQExpBuffer cmd = createPQExpBuffer();
3302 :
3303 2 : appendPQExpBufferStr(cmd, "SET SESSION AUTHORIZATION ");
3304 :
3305 : /*
3306 : * SQL requires a string literal here. Might as well be correct.
3307 : */
3308 2 : if (user && *user)
3309 2 : appendStringLiteralAHX(cmd, user, AH);
3310 : else
3311 0 : appendPQExpBufferStr(cmd, "DEFAULT");
3312 2 : appendPQExpBufferChar(cmd, ';');
3313 :
3314 2 : if (RestoringToDB(AH))
3315 : {
3316 : PGresult *res;
3317 :
3318 0 : res = PQexec(AH->connection, cmd->data);
3319 :
3320 0 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3321 : /* NOT warn_or_exit_horribly... use -O instead to skip this. */
3322 0 : pg_fatal("could not set session user to \"%s\": %s",
3323 : user, PQerrorMessage(AH->connection));
3324 :
3325 0 : PQclear(res);
3326 : }
3327 : else
3328 2 : ahprintf(AH, "%s\n\n", cmd->data);
3329 :
3330 2 : destroyPQExpBuffer(cmd);
3331 2 : }
3332 :
3333 :
3334 : /*
3335 : * Issue the commands to connect to the specified database.
3336 : *
3337 : * If we're currently restoring right into a database, this will
3338 : * actually establish a connection. Otherwise it puts a \connect into
3339 : * the script output.
3340 : */
3341 : static void
3342 112 : _reconnectToDB(ArchiveHandle *AH, const char *dbname)
3343 : {
3344 112 : if (RestoringToDB(AH))
3345 42 : ReconnectToServer(AH, dbname);
3346 : else
3347 : {
3348 : PQExpBufferData connectbuf;
3349 :
3350 70 : initPQExpBuffer(&connectbuf);
3351 70 : appendPsqlMetaConnect(&connectbuf, dbname);
3352 70 : ahprintf(AH, "%s\n", connectbuf.data);
3353 70 : termPQExpBuffer(&connectbuf);
3354 : }
3355 :
3356 : /*
3357 : * NOTE: currUser keeps track of what the imaginary session user in our
3358 : * script is. It's now effectively reset to the original userID.
3359 : */
3360 112 : free(AH->currUser);
3361 112 : AH->currUser = NULL;
3362 :
3363 : /* don't assume we still know the output schema, tablespace, etc either */
3364 112 : free(AH->currSchema);
3365 112 : AH->currSchema = NULL;
3366 :
3367 112 : free(AH->currTableAm);
3368 112 : AH->currTableAm = NULL;
3369 :
3370 112 : free(AH->currTablespace);
3371 112 : AH->currTablespace = NULL;
3372 :
3373 : /* re-establish fixed state */
3374 112 : _doSetFixedOutputState(AH);
3375 112 : }
3376 :
3377 : /*
3378 : * Become the specified user, and update state to avoid redundant commands
3379 : *
3380 : * NULL or empty argument is taken to mean restoring the session default
3381 : */
3382 : static void
3383 120 : _becomeUser(ArchiveHandle *AH, const char *user)
3384 : {
3385 120 : if (!user)
3386 0 : user = ""; /* avoid null pointers */
3387 :
3388 120 : if (AH->currUser && strcmp(AH->currUser, user) == 0)
3389 118 : return; /* no need to do anything */
3390 :
3391 2 : _doSetSessionAuth(AH, user);
3392 :
3393 : /*
3394 : * NOTE: currUser keeps track of what the imaginary session user in our
3395 : * script is
3396 : */
3397 2 : free(AH->currUser);
3398 2 : AH->currUser = pg_strdup(user);
3399 : }
3400 :
3401 : /*
3402 : * Become the owner of the given TOC entry object. If
3403 : * changes in ownership are not allowed, this doesn't do anything.
3404 : */
3405 : static void
3406 59562 : _becomeOwner(ArchiveHandle *AH, TocEntry *te)
3407 : {
3408 59562 : RestoreOptions *ropt = AH->public.ropt;
3409 :
3410 59562 : if (ropt && (ropt->noOwner || !ropt->use_setsessauth))
3411 59562 : return;
3412 :
3413 0 : _becomeUser(AH, te->owner);
3414 : }
3415 :
3416 :
3417 : /*
3418 : * Issue the commands to select the specified schema as the current schema
3419 : * in the target database.
3420 : */
3421 : static void
3422 59698 : _selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
3423 : {
3424 : PQExpBuffer qry;
3425 :
3426 : /*
3427 : * If there was a SEARCHPATH TOC entry, we're supposed to just stay with
3428 : * that search_path rather than switching to entry-specific paths.
3429 : * Otherwise, it's an old archive that will not restore correctly unless
3430 : * we set the search_path as it's expecting.
3431 : */
3432 59698 : if (AH->public.searchpath)
3433 59698 : return;
3434 :
3435 0 : if (!schemaName || *schemaName == '\0' ||
3436 0 : (AH->currSchema && strcmp(AH->currSchema, schemaName) == 0))
3437 0 : return; /* no need to do anything */
3438 :
3439 0 : qry = createPQExpBuffer();
3440 :
3441 0 : appendPQExpBuffer(qry, "SET search_path = %s",
3442 : fmtId(schemaName));
3443 0 : if (strcmp(schemaName, "pg_catalog") != 0)
3444 0 : appendPQExpBufferStr(qry, ", pg_catalog");
3445 :
3446 0 : if (RestoringToDB(AH))
3447 : {
3448 : PGresult *res;
3449 :
3450 0 : res = PQexec(AH->connection, qry->data);
3451 :
3452 0 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3453 0 : warn_or_exit_horribly(AH,
3454 : "could not set search_path to \"%s\": %s",
3455 0 : schemaName, PQerrorMessage(AH->connection));
3456 :
3457 0 : PQclear(res);
3458 : }
3459 : else
3460 0 : ahprintf(AH, "%s;\n\n", qry->data);
3461 :
3462 0 : free(AH->currSchema);
3463 0 : AH->currSchema = pg_strdup(schemaName);
3464 :
3465 0 : destroyPQExpBuffer(qry);
3466 : }
3467 :
3468 : /*
3469 : * Issue the commands to select the specified tablespace as the current one
3470 : * in the target database.
3471 : */
3472 : static void
3473 51996 : _selectTablespace(ArchiveHandle *AH, const char *tablespace)
3474 : {
3475 51996 : RestoreOptions *ropt = AH->public.ropt;
3476 : PQExpBuffer qry;
3477 : const char *want,
3478 : *have;
3479 :
3480 : /* do nothing in --no-tablespaces mode */
3481 51996 : if (ropt->noTablespace)
3482 0 : return;
3483 :
3484 51996 : have = AH->currTablespace;
3485 51996 : want = tablespace;
3486 :
3487 : /* no need to do anything for non-tablespace object */
3488 51996 : if (!want)
3489 38094 : return;
3490 :
3491 13902 : if (have && strcmp(want, have) == 0)
3492 13618 : return; /* no need to do anything */
3493 :
3494 284 : qry = createPQExpBuffer();
3495 :
3496 284 : if (strcmp(want, "") == 0)
3497 : {
3498 : /* We want the tablespace to be the database's default */
3499 240 : appendPQExpBufferStr(qry, "SET default_tablespace = ''");
3500 : }
3501 : else
3502 : {
3503 : /* We want an explicit tablespace */
3504 44 : appendPQExpBuffer(qry, "SET default_tablespace = %s", fmtId(want));
3505 : }
3506 :
3507 284 : if (RestoringToDB(AH))
3508 : {
3509 : PGresult *res;
3510 :
3511 22 : res = PQexec(AH->connection, qry->data);
3512 :
3513 22 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3514 0 : warn_or_exit_horribly(AH,
3515 : "could not set default_tablespace to %s: %s",
3516 0 : fmtId(want), PQerrorMessage(AH->connection));
3517 :
3518 22 : PQclear(res);
3519 : }
3520 : else
3521 262 : ahprintf(AH, "%s;\n\n", qry->data);
3522 :
3523 284 : free(AH->currTablespace);
3524 284 : AH->currTablespace = pg_strdup(want);
3525 :
3526 284 : destroyPQExpBuffer(qry);
3527 : }
3528 :
3529 : /*
3530 : * Set the proper default_table_access_method value for the table.
3531 : */
3532 : static void
3533 50996 : _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam)
3534 : {
3535 50996 : RestoreOptions *ropt = AH->public.ropt;
3536 : PQExpBuffer cmd;
3537 : const char *want,
3538 : *have;
3539 :
3540 : /* do nothing in --no-table-access-method mode */
3541 50996 : if (ropt->noTableAm)
3542 552 : return;
3543 :
3544 50444 : have = AH->currTableAm;
3545 50444 : want = tableam;
3546 :
3547 50444 : if (!want)
3548 41792 : return;
3549 :
3550 8652 : if (have && strcmp(want, have) == 0)
3551 8058 : return;
3552 :
3553 594 : cmd = createPQExpBuffer();
3554 594 : appendPQExpBuffer(cmd, "SET default_table_access_method = %s;", fmtId(want));
3555 :
3556 594 : if (RestoringToDB(AH))
3557 : {
3558 : PGresult *res;
3559 :
3560 22 : res = PQexec(AH->connection, cmd->data);
3561 :
3562 22 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3563 0 : warn_or_exit_horribly(AH,
3564 : "could not set default_table_access_method: %s",
3565 0 : PQerrorMessage(AH->connection));
3566 :
3567 22 : PQclear(res);
3568 : }
3569 : else
3570 572 : ahprintf(AH, "%s\n\n", cmd->data);
3571 :
3572 594 : destroyPQExpBuffer(cmd);
3573 :
3574 594 : free(AH->currTableAm);
3575 594 : AH->currTableAm = pg_strdup(want);
3576 : }
3577 :
3578 : /*
3579 : * Set the proper default table access method for a table without storage.
3580 : * Currently, this is required only for partitioned tables with a table AM.
3581 : */
3582 : static void
3583 1000 : _printTableAccessMethodNoStorage(ArchiveHandle *AH, TocEntry *te)
3584 : {
3585 1000 : RestoreOptions *ropt = AH->public.ropt;
3586 1000 : const char *tableam = te->tableam;
3587 : PQExpBuffer cmd;
3588 :
3589 : /* do nothing in --no-table-access-method mode */
3590 1000 : if (ropt->noTableAm)
3591 6 : return;
3592 :
3593 994 : if (!tableam)
3594 932 : return;
3595 :
3596 : Assert(te->relkind == RELKIND_PARTITIONED_TABLE);
3597 :
3598 62 : cmd = createPQExpBuffer();
3599 :
3600 62 : appendPQExpBufferStr(cmd, "ALTER TABLE ");
3601 62 : appendPQExpBuffer(cmd, "%s ", fmtQualifiedId(te->namespace, te->tag));
3602 62 : appendPQExpBuffer(cmd, "SET ACCESS METHOD %s;",
3603 : fmtId(tableam));
3604 :
3605 62 : if (RestoringToDB(AH))
3606 : {
3607 : PGresult *res;
3608 :
3609 0 : res = PQexec(AH->connection, cmd->data);
3610 :
3611 0 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3612 0 : warn_or_exit_horribly(AH,
3613 : "could not alter table access method: %s",
3614 0 : PQerrorMessage(AH->connection));
3615 0 : PQclear(res);
3616 : }
3617 : else
3618 62 : ahprintf(AH, "%s\n\n", cmd->data);
3619 :
3620 62 : destroyPQExpBuffer(cmd);
3621 : }
3622 :
3623 : /*
3624 : * Extract an object description for a TOC entry, and append it to buf.
3625 : *
3626 : * This is used for ALTER ... OWNER TO.
3627 : *
3628 : * If the object type has no owner, do nothing.
3629 : */
3630 : static void
3631 28790 : _getObjectDescription(PQExpBuffer buf, const TocEntry *te)
3632 : {
3633 28790 : const char *type = te->desc;
3634 :
3635 : /* objects that don't require special decoration */
3636 28790 : if (strcmp(type, "COLLATION") == 0 ||
3637 27066 : strcmp(type, "CONVERSION") == 0 ||
3638 26748 : strcmp(type, "DOMAIN") == 0 ||
3639 26496 : strcmp(type, "FOREIGN TABLE") == 0 ||
3640 26430 : strcmp(type, "MATERIALIZED VIEW") == 0 ||
3641 25792 : strcmp(type, "SEQUENCE") == 0 ||
3642 25340 : strcmp(type, "STATISTICS") == 0 ||
3643 25116 : strcmp(type, "TABLE") == 0 ||
3644 16112 : strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
3645 15902 : strcmp(type, "TEXT SEARCH CONFIGURATION") == 0 ||
3646 15742 : strcmp(type, "TYPE") == 0 ||
3647 14892 : strcmp(type, "VIEW") == 0 ||
3648 : /* non-schema-specified objects */
3649 14204 : strcmp(type, "DATABASE") == 0 ||
3650 14120 : strcmp(type, "PROCEDURAL LANGUAGE") == 0 ||
3651 14060 : strcmp(type, "SCHEMA") == 0 ||
3652 13760 : strcmp(type, "EVENT TRIGGER") == 0 ||
3653 13690 : strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
3654 13604 : strcmp(type, "SERVER") == 0 ||
3655 13514 : strcmp(type, "PUBLICATION") == 0 ||
3656 13272 : strcmp(type, "SUBSCRIPTION") == 0)
3657 : {
3658 15702 : appendPQExpBuffer(buf, "%s ", type);
3659 15702 : if (te->namespace && *te->namespace)
3660 14586 : appendPQExpBuffer(buf, "%s.", fmtId(te->namespace));
3661 15702 : appendPQExpBufferStr(buf, fmtId(te->tag));
3662 : }
3663 : /* LOs just have a name, but it's numeric so must not use fmtId */
3664 13088 : else if (strcmp(type, "BLOB") == 0)
3665 : {
3666 0 : appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
3667 : }
3668 :
3669 : /*
3670 : * These object types require additional decoration. Fortunately, the
3671 : * information needed is exactly what's in the DROP command.
3672 : */
3673 13088 : else if (strcmp(type, "AGGREGATE") == 0 ||
3674 12554 : strcmp(type, "FUNCTION") == 0 ||
3675 9476 : strcmp(type, "OPERATOR") == 0 ||
3676 7706 : strcmp(type, "OPERATOR CLASS") == 0 ||
3677 7136 : strcmp(type, "OPERATOR FAMILY") == 0 ||
3678 6650 : strcmp(type, "PROCEDURE") == 0)
3679 : {
3680 : /* Chop "DROP " off the front and make a modifiable copy */
3681 6590 : char *first = pg_strdup(te->dropStmt + 5);
3682 : char *last;
3683 :
3684 : /* point to last character in string */
3685 6590 : last = first + strlen(first) - 1;
3686 :
3687 : /* Strip off any ';' or '\n' at the end */
3688 19770 : while (last >= first && (*last == '\n' || *last == ';'))
3689 13180 : last--;
3690 6590 : *(last + 1) = '\0';
3691 :
3692 6590 : appendPQExpBufferStr(buf, first);
3693 :
3694 6590 : free(first);
3695 6590 : return;
3696 : }
3697 : /* these object types don't have separate owners */
3698 6498 : else if (strcmp(type, "CAST") == 0 ||
3699 6498 : strcmp(type, "CHECK CONSTRAINT") == 0 ||
3700 6448 : strcmp(type, "CONSTRAINT") == 0 ||
3701 4232 : strcmp(type, "DATABASE PROPERTIES") == 0 ||
3702 4226 : strcmp(type, "DEFAULT") == 0 ||
3703 3900 : strcmp(type, "FK CONSTRAINT") == 0 ||
3704 3576 : strcmp(type, "INDEX") == 0 ||
3705 1660 : strcmp(type, "RULE") == 0 ||
3706 1246 : strcmp(type, "TRIGGER") == 0 ||
3707 522 : strcmp(type, "ROW SECURITY") == 0 ||
3708 522 : strcmp(type, "POLICY") == 0 ||
3709 60 : strcmp(type, "USER MAPPING") == 0)
3710 : {
3711 : /* do nothing */
3712 : }
3713 : else
3714 0 : pg_fatal("don't know how to set owner for object type \"%s\"", type);
3715 : }
3716 :
3717 : /*
3718 : * Emit the SQL commands to create the object represented by a TOC entry
3719 : *
3720 : * This now also includes issuing an ALTER OWNER command to restore the
3721 : * object's ownership, if wanted. But note that the object's permissions
3722 : * will remain at default, until the matching ACL TOC entry is restored.
3723 : */
static void
_printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
{
	RestoreOptions *ropt = AH->public.ropt;

	/*
	 * Select owner, schema, tablespace and default AM as necessary.  The
	 * default access method for partitioned tables is handled after
	 * generating the object definition, as it requires an ALTER command
	 * rather than SET.
	 */
	_becomeOwner(AH, te);
	_selectOutputSchema(AH, te->namespace);
	_selectTablespace(AH, te->tablespace);
	if (te->relkind != RELKIND_PARTITIONED_TABLE)
		_selectTableAccessMethod(AH, te->tableam);

	/* Emit header comment for item (suppressed in no-comments mode) */
	if (!AH->noTocComments)
	{
		const char *pfx;
		char	   *sanitized_name;
		char	   *sanitized_schema;
		char	   *sanitized_owner;

		if (isData)
			pfx = "Data for ";
		else
			pfx = "";

		ahprintf(AH, "--\n");
		if (AH->public.verbose)
		{
			/* verbose mode also reports TOC id, catalog ids, dependencies */
			ahprintf(AH, "-- TOC entry %d (class %u OID %u)\n",
					 te->dumpId, te->catalogId.tableoid, te->catalogId.oid);
			if (te->nDeps > 0)
			{
				int			i;

				ahprintf(AH, "-- Dependencies:");
				for (i = 0; i < te->nDeps; i++)
					ahprintf(AH, " %d", te->dependencies[i]);
				ahprintf(AH, "\n");
			}
		}

		/* sanitize_line flattens newlines so the header stays one line each */
		sanitized_name = sanitize_line(te->tag, false);
		sanitized_schema = sanitize_line(te->namespace, true);
		sanitized_owner = sanitize_line(ropt->noOwner ? NULL : te->owner, true);

		ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s",
				 pfx, sanitized_name, te->desc, sanitized_schema,
				 sanitized_owner);

		free(sanitized_name);
		free(sanitized_schema);
		free(sanitized_owner);

		if (te->tablespace && strlen(te->tablespace) > 0 && !ropt->noTablespace)
		{
			char	   *sanitized_tablespace;

			sanitized_tablespace = sanitize_line(te->tablespace, false);
			ahprintf(AH, "; Tablespace: %s", sanitized_tablespace);
			free(sanitized_tablespace);
		}
		ahprintf(AH, "\n");

		/* format-specific extra TOC info, if the format provides any */
		if (AH->PrintExtraTocPtr != NULL)
			AH->PrintExtraTocPtr(AH, te);
		ahprintf(AH, "--\n\n");
	}

	/*
	 * Actually print the definition.  Normally we can just print the defn
	 * string if any, but we have three special cases:
	 *
	 * 1. A crude hack for suppressing AUTHORIZATION clause that old pg_dump
	 * versions put into CREATE SCHEMA.  Don't mutate the variant for schema
	 * "public" that is a comment.  We have to do this when --no-owner mode is
	 * selected.  This is ugly, but I see no other good way ...
	 *
	 * 2. BLOB METADATA entries need special processing since their defn
	 * strings are just lists of OIDs, not complete SQL commands.
	 *
	 * 3. ACL LARGE OBJECTS entries need special processing because they
	 * contain only one copy of the ACL GRANT/REVOKE commands, which we must
	 * apply to each large object listed in the associated BLOB METADATA.
	 */
	if (ropt->noOwner &&
		strcmp(te->desc, "SCHEMA") == 0 && strncmp(te->defn, "--", 2) != 0)
	{
		ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
	}
	else if (strcmp(te->desc, "BLOB METADATA") == 0)
	{
		IssueCommandPerBlob(AH, te, "SELECT pg_catalog.lo_create('", "')");
	}
	else if (strcmp(te->desc, "ACL") == 0 &&
			 strncmp(te->tag, "LARGE OBJECTS", 13) == 0)
	{
		IssueACLPerBlob(AH, te);
	}
	else
	{
		if (te->defn && strlen(te->defn) > 0)
			ahprintf(AH, "%s\n\n", te->defn);
	}

	/*
	 * If we aren't using SET SESSION AUTH to determine ownership, we must
	 * instead issue an ALTER OWNER command.  Schema "public" is special; when
	 * a dump emits a comment in lieu of creating it, we use ALTER OWNER even
	 * when using SET SESSION for all other objects.  We assume that anything
	 * without a DROP command is not a separately ownable object.
	 */
	if (!ropt->noOwner &&
		(!ropt->use_setsessauth ||
		 (strcmp(te->desc, "SCHEMA") == 0 &&
		  strncmp(te->defn, "--", 2) == 0)) &&
		te->owner && strlen(te->owner) > 0 &&
		te->dropStmt && strlen(te->dropStmt) > 0)
	{
		if (strcmp(te->desc, "BLOB METADATA") == 0)
		{
			/* BLOB METADATA needs special code to handle multiple LOs */
			char	   *cmdEnd = psprintf(" OWNER TO %s", fmtId(te->owner));

			IssueCommandPerBlob(AH, te, "ALTER LARGE OBJECT ", cmdEnd);
			pg_free(cmdEnd);
		}
		else
		{
			/* For all other cases, we can use _getObjectDescription */
			PQExpBufferData temp;

			initPQExpBuffer(&temp);
			_getObjectDescription(&temp, te);

			/*
			 * If _getObjectDescription() didn't fill the buffer, then there
			 * is no owner.
			 */
			if (temp.data[0])
				ahprintf(AH, "ALTER %s OWNER TO %s;\n\n",
						 temp.data, fmtId(te->owner));
			termPQExpBuffer(&temp);
		}
	}

	/*
	 * Select a partitioned table's default AM, once the table definition has
	 * been generated.
	 */
	if (te->relkind == RELKIND_PARTITIONED_TABLE)
		_printTableAccessMethodNoStorage(AH, te);

	/*
	 * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
	 * commands, so we can no longer assume we know the current auth setting.
	 */
	if (_tocEntryIsACL(te))
	{
		free(AH->currUser);
		AH->currUser = NULL;
	}
}
3891 :
3892 : /*
3893 : * Sanitize a string to be included in an SQL comment or TOC listing, by
3894 : * replacing any newlines with spaces. This ensures each logical output line
3895 : * is in fact one physical output line, to prevent corruption of the dump
3896 : * (which could, in the worst case, present an SQL injection vulnerability
3897 : * if someone were to incautiously load a dump containing objects with
3898 : * maliciously crafted names).
3899 : *
3900 : * The result is a freshly malloc'd string. If the input string is NULL,
3901 : * return a malloc'ed empty string, unless want_hyphen, in which case return a
3902 : * malloc'ed hyphen.
3903 : *
3904 : * Note that we currently don't bother to quote names, meaning that the name
3905 : * fields aren't automatically parseable. "pg_restore -L" doesn't care because
3906 : * it only examines the dumpId field, but someday we might want to try harder.
3907 : */
3908 : static char *
3909 150306 : sanitize_line(const char *str, bool want_hyphen)
3910 : {
3911 : char *result;
3912 : char *s;
3913 :
3914 150306 : if (!str)
3915 3950 : return pg_strdup(want_hyphen ? "-" : "");
3916 :
3917 146356 : result = pg_strdup(str);
3918 :
3919 1850704 : for (s = result; *s != '\0'; s++)
3920 : {
3921 1704348 : if (*s == '\n' || *s == '\r')
3922 120 : *s = ' ';
3923 : }
3924 :
3925 146356 : return result;
3926 : }
3927 :
3928 : /*
3929 : * Write the file header for a custom-format archive
3930 : */
void
WriteHead(ArchiveHandle *AH)
{
	struct tm	crtm;

	/* NB: field order here must match ReadHead() exactly. */
	AH->WriteBufPtr(AH, "PGDMP", 5);	/* Magic code */
	AH->WriteBytePtr(AH, ARCHIVE_MAJOR(AH->version));
	AH->WriteBytePtr(AH, ARCHIVE_MINOR(AH->version));
	AH->WriteBytePtr(AH, ARCHIVE_REV(AH->version));
	AH->WriteBytePtr(AH, AH->intSize);	/* sizeof(int) on dumping machine */
	AH->WriteBytePtr(AH, AH->offSize);	/* width of file offsets */
	AH->WriteBytePtr(AH, AH->format);	/* archive format code */
	AH->WriteBytePtr(AH, AH->compression_spec.algorithm);
	/* archive creation time, written field by field from local time */
	crtm = *localtime(&AH->createDate);
	WriteInt(AH, crtm.tm_sec);
	WriteInt(AH, crtm.tm_min);
	WriteInt(AH, crtm.tm_hour);
	WriteInt(AH, crtm.tm_mday);
	WriteInt(AH, crtm.tm_mon);
	WriteInt(AH, crtm.tm_year);
	WriteInt(AH, crtm.tm_isdst);
	WriteStr(AH, PQdb(AH->connection));	/* source database name */
	WriteStr(AH, AH->public.remoteVersionStr);	/* server version */
	WriteStr(AH, PG_VERSION);	/* pg_dump version */
}
3956 :
/*
 * Read and validate the header of a custom-format archive.
 *
 * Fills in AH->version, intSize, offSize, compression_spec, createDate,
 * archdbname, archiveRemoteVersion and archiveDumpVersion from the header,
 * issuing fatal errors for unusable archives and warnings for dubious ones.
 * Must read fields in exactly the order WriteHead() wrote them, with
 * version-dependent adjustments for older archive formats.
 */
void
ReadHead(ArchiveHandle *AH)
{
	char	   *errmsg;
	char		vmaj,
				vmin,
				vrev;
	int			fmt;

	/*
	 * If we haven't already read the header, do so.
	 *
	 * NB: this code must agree with _discoverArchiveFormat().  Maybe find a
	 * way to unify the cases?
	 */
	if (!AH->readHeader)
	{
		char		tmpMag[7];

		AH->ReadBufPtr(AH, tmpMag, 5);

		if (strncmp(tmpMag, "PGDMP", 5) != 0)
			pg_fatal("did not find magic string in file header");
	}

	vmaj = AH->ReadBytePtr(AH);
	vmin = AH->ReadBytePtr(AH);

	/* Version 1.0 archives had no revision byte */
	if (vmaj > 1 || (vmaj == 1 && vmin > 0))	/* Version > 1.0 */
		vrev = AH->ReadBytePtr(AH);
	else
		vrev = 0;

	AH->version = MAKE_ARCHIVE_VERSION(vmaj, vmin, vrev);

	if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
		pg_fatal("unsupported version (%d.%d) in file header",
				 vmaj, vmin);

	AH->intSize = AH->ReadBytePtr(AH);
	if (AH->intSize > 32)
		pg_fatal("sanity check on integer size (%lu) failed",
				 (unsigned long) AH->intSize);

	if (AH->intSize > sizeof(int))
		pg_log_warning("archive was made on a machine with larger integers, some operations might fail");

	/* Offset size was added in format 1.7; older files used intSize */
	if (AH->version >= K_VERS_1_7)
		AH->offSize = AH->ReadBytePtr(AH);
	else
		AH->offSize = AH->intSize;

	fmt = AH->ReadBytePtr(AH);

	if (AH->format != fmt)
		pg_fatal("expected format (%d) differs from format found in file (%d)",
				 AH->format, fmt);

	/*
	 * From 1.15 on, the compression algorithm is stored explicitly; older
	 * formats stored only a compression level, from which we infer gzip.
	 */
	if (AH->version >= K_VERS_1_15)
		AH->compression_spec.algorithm = AH->ReadBytePtr(AH);
	else if (AH->version >= K_VERS_1_2)
	{
		/* Guess the compression method based on the level */
		if (AH->version < K_VERS_1_4)
			AH->compression_spec.level = AH->ReadBytePtr(AH);
		else
			AH->compression_spec.level = ReadInt(AH);

		if (AH->compression_spec.level != 0)
			AH->compression_spec.algorithm = PG_COMPRESSION_GZIP;
	}
	else
		AH->compression_spec.algorithm = PG_COMPRESSION_GZIP;

	errmsg = supports_compression(AH->compression_spec);
	if (errmsg)
	{
		pg_log_warning("archive is compressed, but this installation does not support compression (%s) -- no data will be available",
					   errmsg);
		pg_free(errmsg);
	}

	if (AH->version >= K_VERS_1_4)
	{
		struct tm	crtm;

		crtm.tm_sec = ReadInt(AH);
		crtm.tm_min = ReadInt(AH);
		crtm.tm_hour = ReadInt(AH);
		crtm.tm_mday = ReadInt(AH);
		crtm.tm_mon = ReadInt(AH);
		crtm.tm_year = ReadInt(AH);
		crtm.tm_isdst = ReadInt(AH);

		/*
		 * Newer versions of glibc have mktime() report failure if tm_isdst is
		 * inconsistent with the prevailing timezone, e.g. tm_isdst = 1 when
		 * TZ=UTC.  This is problematic when restoring an archive under a
		 * different timezone setting.  If we get a failure, try again with
		 * tm_isdst set to -1 ("don't know").
		 *
		 * XXX with or without this hack, we reconstruct createDate
		 * incorrectly when the prevailing timezone is different from
		 * pg_dump's.  Next time we bump the archive version, we should flush
		 * this representation and store a plain seconds-since-the-Epoch
		 * timestamp instead.
		 */
		AH->createDate = mktime(&crtm);
		if (AH->createDate == (time_t) -1)
		{
			crtm.tm_isdst = -1;
			AH->createDate = mktime(&crtm);
			if (AH->createDate == (time_t) -1)
				pg_log_warning("invalid creation date in header");
		}
	}

	if (AH->version >= K_VERS_1_4)
	{
		AH->archdbname = ReadStr(AH);
	}

	if (AH->version >= K_VERS_1_10)
	{
		AH->archiveRemoteVersion = ReadStr(AH);
		AH->archiveDumpVersion = ReadStr(AH);
	}
}
4085 :
4086 :
4087 : /*
4088 : * checkSeek
4089 : * check to see if ftell/fseek can be performed.
4090 : */
4091 : bool
4092 96 : checkSeek(FILE *fp)
4093 : {
4094 : pgoff_t tpos;
4095 :
4096 : /* Check that ftello works on this file */
4097 96 : tpos = ftello(fp);
4098 96 : if (tpos < 0)
4099 2 : return false;
4100 :
4101 : /*
4102 : * Check that fseeko(SEEK_SET) works, too. NB: we used to try to test
4103 : * this with fseeko(fp, 0, SEEK_CUR). But some platforms treat that as a
4104 : * successful no-op even on files that are otherwise unseekable.
4105 : */
4106 94 : if (fseeko(fp, tpos, SEEK_SET) != 0)
4107 0 : return false;
4108 :
4109 94 : return true;
4110 : }
4111 :
4112 :
4113 : /*
4114 : * dumpTimestamp
4115 : */
4116 : static void
4117 84 : dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
4118 : {
4119 : char buf[64];
4120 :
4121 84 : if (strftime(buf, sizeof(buf), PGDUMP_STRFTIME_FMT, localtime(&tim)) != 0)
4122 84 : ahprintf(AH, "-- %s %s\n\n", msg, buf);
4123 84 : }
4124 :
4125 : /*
4126 : * Main engine for parallel restore.
4127 : *
4128 : * Parallel restore is done in three phases. In this first phase,
4129 : * we'll process all SECTION_PRE_DATA TOC entries that are allowed to be
4130 : * processed in the RESTORE_PASS_MAIN pass. (In practice, that's all
4131 : * PRE_DATA items other than ACLs.) Entries we can't process now are
4132 : * added to the pending_list for later phases to deal with.
4133 : */
4134 : static void
4135 8 : restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
4136 : {
4137 : bool skipped_some;
4138 : TocEntry *next_work_item;
4139 :
4140 8 : pg_log_debug("entering restore_toc_entries_prefork");
4141 :
4142 : /* Adjust dependency information */
4143 8 : fix_dependencies(AH);
4144 :
4145 : /*
4146 : * Do all the early stuff in a single connection in the parent. There's no
4147 : * great point in running it in parallel, in fact it will actually run
4148 : * faster in a single connection because we avoid all the connection and
4149 : * setup overhead. Also, pre-9.2 pg_dump versions were not very good
4150 : * about showing all the dependencies of SECTION_PRE_DATA items, so we do
4151 : * not risk trying to process them out-of-order.
4152 : *
4153 : * Stuff that we can't do immediately gets added to the pending_list.
4154 : * Note: we don't yet filter out entries that aren't going to be restored.
4155 : * They might participate in dependency chains connecting entries that
4156 : * should be restored, so we treat them as live until we actually process
4157 : * them.
4158 : *
4159 : * Note: as of 9.2, it should be guaranteed that all PRE_DATA items appear
4160 : * before DATA items, and all DATA items before POST_DATA items. That is
4161 : * not certain to be true in older archives, though, and in any case use
4162 : * of a list file would destroy that ordering (cf. SortTocFromFile). So
4163 : * this loop cannot assume that it holds.
4164 : */
4165 8 : AH->restorePass = RESTORE_PASS_MAIN;
4166 8 : skipped_some = false;
4167 200 : for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
4168 : {
4169 192 : bool do_now = true;
4170 :
4171 192 : if (next_work_item->section != SECTION_PRE_DATA)
4172 : {
4173 : /* DATA and POST_DATA items are just ignored for now */
4174 92 : if (next_work_item->section == SECTION_DATA ||
4175 60 : next_work_item->section == SECTION_POST_DATA)
4176 : {
4177 92 : do_now = false;
4178 92 : skipped_some = true;
4179 : }
4180 : else
4181 : {
4182 : /*
4183 : * SECTION_NONE items, such as comments, can be processed now
4184 : * if we are still in the PRE_DATA part of the archive. Once
4185 : * we've skipped any items, we have to consider whether the
4186 : * comment's dependencies are satisfied, so skip it for now.
4187 : */
4188 0 : if (skipped_some)
4189 0 : do_now = false;
4190 : }
4191 : }
4192 :
4193 : /*
4194 : * Also skip items that need to be forced into later passes. We need
4195 : * not set skipped_some in this case, since by assumption no main-pass
4196 : * items could depend on these.
4197 : */
4198 192 : if (_tocEntryRestorePass(next_work_item) != RESTORE_PASS_MAIN)
4199 0 : do_now = false;
4200 :
4201 192 : if (do_now)
4202 : {
4203 : /* OK, restore the item and update its dependencies */
4204 100 : pg_log_info("processing item %d %s %s",
4205 : next_work_item->dumpId,
4206 : next_work_item->desc, next_work_item->tag);
4207 :
4208 100 : (void) restore_toc_entry(AH, next_work_item, false);
4209 :
4210 : /* Reduce dependencies, but don't move anything to ready_heap */
4211 100 : reduce_dependencies(AH, next_work_item, NULL);
4212 : }
4213 : else
4214 : {
4215 : /* Nope, so add it to pending_list */
4216 92 : pending_list_append(pending_list, next_work_item);
4217 : }
4218 : }
4219 :
4220 : /*
4221 : * In --transaction-size mode, we must commit the open transaction before
4222 : * dropping the database connection. This also ensures that child workers
4223 : * can see the objects we've created so far.
4224 : */
4225 8 : if (AH->public.ropt->txn_size > 0)
4226 0 : CommitTransaction(&AH->public);
4227 :
4228 : /*
4229 : * Now close parent connection in prep for parallel steps. We do this
4230 : * mainly to ensure that we don't exceed the specified number of parallel
4231 : * connections.
4232 : */
4233 8 : DisconnectDatabase(&AH->public);
4234 :
4235 : /* blow away any transient state from the old connection */
4236 8 : free(AH->currUser);
4237 8 : AH->currUser = NULL;
4238 8 : free(AH->currSchema);
4239 8 : AH->currSchema = NULL;
4240 8 : free(AH->currTablespace);
4241 8 : AH->currTablespace = NULL;
4242 8 : free(AH->currTableAm);
4243 8 : AH->currTableAm = NULL;
4244 8 : }
4245 :
4246 : /*
4247 : * Main engine for parallel restore.
4248 : *
4249 : * Parallel restore is done in three phases. In this second phase,
4250 : * we process entries by dispatching them to parallel worker children
4251 : * (processes on Unix, threads on Windows), each of which connects
4252 : * separately to the database. Inter-entry dependencies are respected,
4253 : * and so is the RestorePass multi-pass structure. When we can no longer
4254 : * make any entries ready to process, we exit. Normally, there will be
4255 : * nothing left to do; but if there is, the third phase will mop up.
4256 : */
4257 : static void
4258 8 : restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
4259 : TocEntry *pending_list)
4260 : {
4261 : binaryheap *ready_heap;
4262 : TocEntry *next_work_item;
4263 :
4264 8 : pg_log_debug("entering restore_toc_entries_parallel");
4265 :
4266 : /* Set up ready_heap with enough room for all known TocEntrys */
4267 8 : ready_heap = binaryheap_allocate(AH->tocCount,
4268 : TocEntrySizeCompareBinaryheap,
4269 : NULL);
4270 :
4271 : /*
4272 : * The pending_list contains all items that we need to restore. Move all
4273 : * items that are available to process immediately into the ready_heap.
4274 : * After this setup, the pending list is everything that needs to be done
4275 : * but is blocked by one or more dependencies, while the ready heap
4276 : * contains items that have no remaining dependencies and are OK to
4277 : * process in the current restore pass.
4278 : */
4279 8 : AH->restorePass = RESTORE_PASS_MAIN;
4280 8 : move_to_ready_heap(pending_list, ready_heap, AH->restorePass);
4281 :
4282 : /*
4283 : * main parent loop
4284 : *
4285 : * Keep going until there is no worker still running AND there is no work
4286 : * left to be done. Note invariant: at top of loop, there should always
4287 : * be at least one worker available to dispatch a job to.
4288 : */
4289 8 : pg_log_info("entering main parallel loop");
4290 :
4291 : for (;;)
4292 : {
4293 : /* Look for an item ready to be dispatched to a worker */
4294 138 : next_work_item = pop_next_work_item(ready_heap, pstate);
4295 138 : if (next_work_item != NULL)
4296 : {
4297 : /* If not to be restored, don't waste time launching a worker */
4298 92 : if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
4299 : {
4300 0 : pg_log_info("skipping item %d %s %s",
4301 : next_work_item->dumpId,
4302 : next_work_item->desc, next_work_item->tag);
4303 : /* Update its dependencies as though we'd completed it */
4304 0 : reduce_dependencies(AH, next_work_item, ready_heap);
4305 : /* Loop around to see if anything else can be dispatched */
4306 0 : continue;
4307 : }
4308 :
4309 92 : pg_log_info("launching item %d %s %s",
4310 : next_work_item->dumpId,
4311 : next_work_item->desc, next_work_item->tag);
4312 :
4313 : /* Dispatch to some worker */
4314 92 : DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
4315 : mark_restore_job_done, ready_heap);
4316 : }
4317 46 : else if (IsEveryWorkerIdle(pstate))
4318 : {
4319 : /*
4320 : * Nothing is ready and no worker is running, so we're done with
4321 : * the current pass or maybe with the whole process.
4322 : */
4323 24 : if (AH->restorePass == RESTORE_PASS_LAST)
4324 8 : break; /* No more parallel processing is possible */
4325 :
4326 : /* Advance to next restore pass */
4327 16 : AH->restorePass++;
4328 : /* That probably allows some stuff to be made ready */
4329 16 : move_to_ready_heap(pending_list, ready_heap, AH->restorePass);
4330 : /* Loop around to see if anything's now ready */
4331 16 : continue;
4332 : }
4333 : else
4334 : {
4335 : /*
4336 : * We have nothing ready, but at least one child is working, so
4337 : * wait for some subjob to finish.
4338 : */
4339 : }
4340 :
4341 : /*
4342 : * Before dispatching another job, check to see if anything has
4343 : * finished. We should check every time through the loop so as to
4344 : * reduce dependencies as soon as possible. If we were unable to
4345 : * dispatch any job this time through, wait until some worker finishes
4346 : * (and, hopefully, unblocks some pending item). If we did dispatch
4347 : * something, continue as soon as there's at least one idle worker.
4348 : * Note that in either case, there's guaranteed to be at least one
4349 : * idle worker when we return to the top of the loop. This ensures we
4350 : * won't block inside DispatchJobForTocEntry, which would be
4351 : * undesirable: we'd rather postpone dispatching until we see what's
4352 : * been unblocked by finished jobs.
4353 : */
4354 114 : WaitForWorkers(AH, pstate,
4355 : next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS);
4356 : }
4357 :
4358 : /* There should now be nothing in ready_heap. */
4359 : Assert(binaryheap_empty(ready_heap));
4360 :
4361 8 : binaryheap_free(ready_heap);
4362 :
4363 8 : pg_log_info("finished main parallel loop");
4364 8 : }
4365 :
4366 : /*
4367 : * Main engine for parallel restore.
4368 : *
4369 : * Parallel restore is done in three phases. In this third phase,
4370 : * we mop up any remaining TOC entries by processing them serially.
4371 : * This phase normally should have nothing to do, but if we've somehow
4372 : * gotten stuck due to circular dependencies or some such, this provides
4373 : * at least some chance of completing the restore successfully.
4374 : */
4375 : static void
4376 8 : restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
4377 : {
4378 8 : RestoreOptions *ropt = AH->public.ropt;
4379 : TocEntry *te;
4380 :
4381 8 : pg_log_debug("entering restore_toc_entries_postfork");
4382 :
4383 : /*
4384 : * Now reconnect the single parent connection.
4385 : */
4386 8 : ConnectDatabase((Archive *) AH, &ropt->cparams, true);
4387 :
4388 : /* re-establish fixed state */
4389 8 : _doSetFixedOutputState(AH);
4390 :
4391 : /*
4392 : * Make sure there is no work left due to, say, circular dependencies, or
4393 : * some other pathological condition. If so, do it in the single parent
4394 : * connection. We don't sweat about RestorePass ordering; it's likely we
4395 : * already violated that.
4396 : */
4397 8 : for (te = pending_list->pending_next; te != pending_list; te = te->pending_next)
4398 : {
4399 0 : pg_log_info("processing missed item %d %s %s",
4400 : te->dumpId, te->desc, te->tag);
4401 0 : (void) restore_toc_entry(AH, te, false);
4402 : }
4403 8 : }
4404 :
4405 : /*
4406 : * Check if te1 has an exclusive lock requirement for an item that te2 also
4407 : * requires, whether or not te2's requirement is for an exclusive lock.
4408 : */
4409 : static bool
4410 298 : has_lock_conflicts(TocEntry *te1, TocEntry *te2)
4411 : {
4412 : int j,
4413 : k;
4414 :
4415 698 : for (j = 0; j < te1->nLockDeps; j++)
4416 : {
4417 1718 : for (k = 0; k < te2->nDeps; k++)
4418 : {
4419 1318 : if (te1->lockDeps[j] == te2->dependencies[k])
4420 6 : return true;
4421 : }
4422 : }
4423 292 : return false;
4424 : }
4425 :
4426 :
4427 : /*
4428 : * Initialize the header of the pending-items list.
4429 : *
4430 : * This is a circular list with a dummy TocEntry as header, just like the
4431 : * main TOC list; but we use separate list links so that an entry can be in
4432 : * the main TOC list as well as in the pending list.
4433 : */
4434 : static void
4435 8 : pending_list_header_init(TocEntry *l)
4436 : {
4437 8 : l->pending_prev = l->pending_next = l;
4438 8 : }
4439 :
4440 : /* Append te to the end of the pending-list headed by l */
4441 : static void
4442 92 : pending_list_append(TocEntry *l, TocEntry *te)
4443 : {
4444 92 : te->pending_prev = l->pending_prev;
4445 92 : l->pending_prev->pending_next = te;
4446 92 : l->pending_prev = te;
4447 92 : te->pending_next = l;
4448 92 : }
4449 :
4450 : /* Remove te from the pending-list */
4451 : static void
4452 92 : pending_list_remove(TocEntry *te)
4453 : {
4454 92 : te->pending_prev->pending_next = te->pending_next;
4455 92 : te->pending_next->pending_prev = te->pending_prev;
4456 92 : te->pending_prev = NULL;
4457 92 : te->pending_next = NULL;
4458 92 : }
4459 :
4460 :
4461 : /* qsort comparator for sorting TocEntries by dataLength */
4462 : static int
4463 1272 : TocEntrySizeCompareQsort(const void *p1, const void *p2)
4464 : {
4465 1272 : const TocEntry *te1 = *(const TocEntry *const *) p1;
4466 1272 : const TocEntry *te2 = *(const TocEntry *const *) p2;
4467 :
4468 : /* Sort by decreasing dataLength */
4469 1272 : if (te1->dataLength > te2->dataLength)
4470 106 : return -1;
4471 1166 : if (te1->dataLength < te2->dataLength)
4472 224 : return 1;
4473 :
4474 : /* For equal dataLengths, sort by dumpId, just to be stable */
4475 942 : if (te1->dumpId < te2->dumpId)
4476 390 : return -1;
4477 552 : if (te1->dumpId > te2->dumpId)
4478 526 : return 1;
4479 :
4480 26 : return 0;
4481 : }
4482 :
/* binaryheap comparator for sorting TocEntries by dataLength */
static int
TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg)
{
	/* invert the qsort ordering so the heap acts as a max-heap */
	int			qsort_result = TocEntrySizeCompareQsort(&p1, &p2);

	return -qsort_result;
}
4490 :
4491 :
4492 : /*
4493 : * Move all immediately-ready items from pending_list to ready_heap.
4494 : *
4495 : * Items are considered ready if they have no remaining dependencies and
4496 : * they belong in the current restore pass. (See also reduce_dependencies,
4497 : * which applies the same logic one-at-a-time.)
4498 : */
4499 : static void
4500 24 : move_to_ready_heap(TocEntry *pending_list,
4501 : binaryheap *ready_heap,
4502 : RestorePass pass)
4503 : {
4504 : TocEntry *te;
4505 : TocEntry *next_te;
4506 :
4507 116 : for (te = pending_list->pending_next; te != pending_list; te = next_te)
4508 : {
4509 : /* must save list link before possibly removing te from list */
4510 92 : next_te = te->pending_next;
4511 :
4512 132 : if (te->depCount == 0 &&
4513 40 : _tocEntryRestorePass(te) == pass)
4514 : {
4515 : /* Remove it from pending_list ... */
4516 40 : pending_list_remove(te);
4517 : /* ... and add to ready_heap */
4518 40 : binaryheap_add(ready_heap, te);
4519 : }
4520 : }
4521 24 : }
4522 :
4523 : /*
4524 : * Find the next work item (if any) that is capable of being run now,
4525 : * and remove it from the ready_heap.
4526 : *
4527 : * Returns the item, or NULL if nothing is runnable.
4528 : *
4529 : * To qualify, the item must have no remaining dependencies
4530 : * and no requirements for locks that are incompatible with
4531 : * items currently running. Items in the ready_heap are known to have
4532 : * no remaining dependencies, but we have to check for lock conflicts.
4533 : */
4534 : static TocEntry *
4535 138 : pop_next_work_item(binaryheap *ready_heap,
4536 : ParallelState *pstate)
4537 : {
4538 : /*
4539 : * Search the ready_heap until we find a suitable item. Note that we do a
4540 : * sequential scan through the heap nodes, so even though we will first
4541 : * try to choose the highest-priority item, we might end up picking
4542 : * something with a much lower priority. However, we expect that we will
4543 : * typically be able to pick one of the first few items, which should
4544 : * usually have a relatively high priority.
4545 : */
4546 144 : for (int i = 0; i < binaryheap_size(ready_heap); i++)
4547 : {
4548 98 : TocEntry *te = (TocEntry *) binaryheap_get_node(ready_heap, i);
4549 98 : bool conflicts = false;
4550 :
4551 : /*
4552 : * Check to see if the item would need exclusive lock on something
4553 : * that a currently running item also needs lock on, or vice versa. If
4554 : * so, we don't want to schedule them together.
4555 : */
4556 376 : for (int k = 0; k < pstate->numWorkers; k++)
4557 : {
4558 284 : TocEntry *running_te = pstate->te[k];
4559 :
4560 284 : if (running_te == NULL)
4561 132 : continue;
4562 298 : if (has_lock_conflicts(te, running_te) ||
4563 146 : has_lock_conflicts(running_te, te))
4564 : {
4565 6 : conflicts = true;
4566 6 : break;
4567 : }
4568 : }
4569 :
4570 98 : if (conflicts)
4571 6 : continue;
4572 :
4573 : /* passed all tests, so this item can run */
4574 92 : binaryheap_remove_node(ready_heap, i);
4575 92 : return te;
4576 : }
4577 :
4578 46 : pg_log_debug("no item ready");
4579 46 : return NULL;
4580 : }
4581 :
4582 :
4583 : /*
4584 : * Restore a single TOC item in parallel with others
4585 : *
4586 : * this is run in the worker, i.e. in a thread (Windows) or a separate process
4587 : * (everything else). A worker process executes several such work items during
4588 : * a parallel backup or restore. Once we terminate here and report back that
4589 : * our work is finished, the leader process will assign us a new work item.
4590 : */
4591 : int
4592 92 : parallel_restore(ArchiveHandle *AH, TocEntry *te)
4593 : {
4594 : int status;
4595 :
4596 : Assert(AH->connection != NULL);
4597 :
4598 : /* Count only errors associated with this TOC entry */
4599 92 : AH->public.n_errors = 0;
4600 :
4601 : /* Restore the TOC item */
4602 92 : status = restore_toc_entry(AH, te, true);
4603 :
4604 92 : return status;
4605 : }
4606 :
4607 :
4608 : /*
4609 : * Callback function that's invoked in the leader process after a step has
4610 : * been parallel restored.
4611 : *
4612 : * Update status and reduce the dependency count of any dependent items.
4613 : */
4614 : static void
4615 92 : mark_restore_job_done(ArchiveHandle *AH,
4616 : TocEntry *te,
4617 : int status,
4618 : void *callback_data)
4619 : {
4620 92 : binaryheap *ready_heap = (binaryheap *) callback_data;
4621 :
4622 92 : pg_log_info("finished item %d %s %s",
4623 : te->dumpId, te->desc, te->tag);
4624 :
4625 92 : if (status == WORKER_CREATE_DONE)
4626 0 : mark_create_done(AH, te);
4627 92 : else if (status == WORKER_INHIBIT_DATA)
4628 : {
4629 0 : inhibit_data_for_failed_table(AH, te);
4630 0 : AH->public.n_errors++;
4631 : }
4632 92 : else if (status == WORKER_IGNORED_ERRORS)
4633 0 : AH->public.n_errors++;
4634 92 : else if (status != 0)
4635 0 : pg_fatal("worker process failed: exit code %d",
4636 : status);
4637 :
4638 92 : reduce_dependencies(AH, te, ready_heap);
4639 92 : }
4640 :
4641 :
4642 : /*
4643 : * Process the dependency information into a form useful for parallel restore.
4644 : *
4645 : * This function takes care of fixing up some missing or badly designed
4646 : * dependencies, and then prepares subsidiary data structures that will be
4647 : * used in the main parallel-restore logic, including:
4648 : * 1. We build the revDeps[] arrays of incoming dependency dumpIds.
4649 : * 2. We set up depCount fields that are the number of as-yet-unprocessed
4650 : * dependencies for each TOC entry.
4651 : *
4652 : * We also identify locking dependencies so that we can avoid trying to
4653 : * schedule conflicting items at the same time.
4654 : */
4655 : static void
4656 8 : fix_dependencies(ArchiveHandle *AH)
4657 : {
4658 : TocEntry *te;
4659 : int i;
4660 :
4661 : /*
4662 : * Initialize the depCount/revDeps/nRevDeps fields, and make sure the TOC
4663 : * items are marked as not being in any parallel-processing list.
4664 : */
4665 200 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4666 : {
4667 192 : te->depCount = te->nDeps;
4668 192 : te->revDeps = NULL;
4669 192 : te->nRevDeps = 0;
4670 192 : te->pending_prev = NULL;
4671 192 : te->pending_next = NULL;
4672 : }
4673 :
4674 : /*
4675 : * POST_DATA items that are shown as depending on a table need to be
4676 : * re-pointed to depend on that table's data, instead. This ensures they
4677 : * won't get scheduled until the data has been loaded.
4678 : */
4679 8 : repoint_table_dependencies(AH);
4680 :
4681 : /*
4682 : * Pre-8.4 versions of pg_dump neglected to set up a dependency from BLOB
4683 : * COMMENTS to BLOBS. Cope. (We assume there's only one BLOBS and only
4684 : * one BLOB COMMENTS in such files.)
4685 : */
4686 8 : if (AH->version < K_VERS_1_11)
4687 : {
4688 0 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4689 : {
4690 0 : if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0)
4691 : {
4692 : TocEntry *te2;
4693 :
4694 0 : for (te2 = AH->toc->next; te2 != AH->toc; te2 = te2->next)
4695 : {
4696 0 : if (strcmp(te2->desc, "BLOBS") == 0)
4697 : {
4698 0 : te->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
4699 0 : te->dependencies[0] = te2->dumpId;
4700 0 : te->nDeps++;
4701 0 : te->depCount++;
4702 0 : break;
4703 : }
4704 : }
4705 0 : break;
4706 : }
4707 : }
4708 : }
4709 :
4710 : /*
4711 : * At this point we start to build the revDeps reverse-dependency arrays,
4712 : * so all changes of dependencies must be complete.
4713 : */
4714 :
4715 : /*
4716 : * Count the incoming dependencies for each item. Also, it is possible
4717 : * that the dependencies list items that are not in the archive at all
4718 : * (that should not happen in 9.2 and later, but is highly likely in older
4719 : * archives). Subtract such items from the depCounts.
4720 : */
4721 200 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4722 : {
4723 576 : for (i = 0; i < te->nDeps; i++)
4724 : {
4725 384 : DumpId depid = te->dependencies[i];
4726 :
4727 384 : if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4728 384 : AH->tocsByDumpId[depid]->nRevDeps++;
4729 : else
4730 0 : te->depCount--;
4731 : }
4732 : }
4733 :
4734 : /*
4735 : * Allocate space for revDeps[] arrays, and reset nRevDeps so we can use
4736 : * it as a counter below.
4737 : */
4738 200 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4739 : {
4740 192 : if (te->nRevDeps > 0)
4741 104 : te->revDeps = (DumpId *) pg_malloc(te->nRevDeps * sizeof(DumpId));
4742 192 : te->nRevDeps = 0;
4743 : }
4744 :
4745 : /*
4746 : * Build the revDeps[] arrays of incoming-dependency dumpIds. This had
4747 : * better agree with the loops above.
4748 : */
4749 200 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4750 : {
4751 576 : for (i = 0; i < te->nDeps; i++)
4752 : {
4753 384 : DumpId depid = te->dependencies[i];
4754 :
4755 384 : if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4756 : {
4757 384 : TocEntry *otherte = AH->tocsByDumpId[depid];
4758 :
4759 384 : otherte->revDeps[otherte->nRevDeps++] = te->dumpId;
4760 : }
4761 : }
4762 : }
4763 :
4764 : /*
4765 : * Lastly, work out the locking dependencies.
4766 : */
4767 200 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4768 : {
4769 192 : te->lockDeps = NULL;
4770 192 : te->nLockDeps = 0;
4771 192 : identify_locking_dependencies(AH, te);
4772 : }
4773 8 : }
4774 :
4775 : /*
4776 : * Change dependencies on table items to depend on table data items instead,
4777 : * but only in POST_DATA items.
4778 : *
4779 : * Also, for any item having such dependency(s), set its dataLength to the
4780 : * largest dataLength of the table data items it depends on. This ensures
4781 : * that parallel restore will prioritize larger jobs (index builds, FK
4782 : * constraint checks, etc) over smaller ones, avoiding situations where we
4783 : * end a restore with only one active job working on a large table.
4784 : */
4785 : static void
4786 8 : repoint_table_dependencies(ArchiveHandle *AH)
4787 : {
4788 : TocEntry *te;
4789 : int i;
4790 : DumpId olddep;
4791 :
4792 200 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4793 : {
4794 192 : if (te->section != SECTION_POST_DATA)
4795 132 : continue;
4796 320 : for (i = 0; i < te->nDeps; i++)
4797 : {
4798 260 : olddep = te->dependencies[i];
4799 260 : if (olddep <= AH->maxDumpId &&
4800 260 : AH->tableDataId[olddep] != 0)
4801 : {
4802 124 : DumpId tabledataid = AH->tableDataId[olddep];
4803 124 : TocEntry *tabledatate = AH->tocsByDumpId[tabledataid];
4804 :
4805 124 : te->dependencies[i] = tabledataid;
4806 124 : te->dataLength = Max(te->dataLength, tabledatate->dataLength);
4807 124 : pg_log_debug("transferring dependency %d -> %d to %d",
4808 : te->dumpId, olddep, tabledataid);
4809 : }
4810 : }
4811 : }
4812 8 : }
4813 :
4814 : /*
4815 : * Identify which objects we'll need exclusive lock on in order to restore
4816 : * the given TOC entry (*other* than the one identified by the TOC entry
4817 : * itself). Record their dump IDs in the entry's lockDeps[] array.
4818 : */
4819 : static void
4820 192 : identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
4821 : {
4822 : DumpId *lockids;
4823 : int nlockids;
4824 : int i;
4825 :
4826 : /*
4827 : * We only care about this for POST_DATA items. PRE_DATA items are not
4828 : * run in parallel, and DATA items are all independent by assumption.
4829 : */
4830 192 : if (te->section != SECTION_POST_DATA)
4831 132 : return;
4832 :
4833 : /* Quick exit if no dependencies at all */
4834 60 : if (te->nDeps == 0)
4835 0 : return;
4836 :
4837 : /*
4838 : * Most POST_DATA items are ALTER TABLEs or some moral equivalent of that,
4839 : * and hence require exclusive lock. However, we know that CREATE INDEX
4840 : * does not. (Maybe someday index-creating CONSTRAINTs will fall in that
4841 : * category too ... but today is not that day.)
4842 : */
4843 60 : if (strcmp(te->desc, "INDEX") == 0)
4844 0 : return;
4845 :
4846 : /*
4847 : * We assume the entry requires exclusive lock on each TABLE or TABLE DATA
4848 : * item listed among its dependencies. Originally all of these would have
4849 : * been TABLE items, but repoint_table_dependencies would have repointed
4850 : * them to the TABLE DATA items if those are present (which they might not
4851 : * be, eg in a schema-only dump). Note that all of the entries we are
4852 : * processing here are POST_DATA; otherwise there might be a significant
4853 : * difference between a dependency on a table and a dependency on its
4854 : * data, so that closer analysis would be needed here.
4855 : */
4856 60 : lockids = (DumpId *) pg_malloc(te->nDeps * sizeof(DumpId));
4857 60 : nlockids = 0;
4858 320 : for (i = 0; i < te->nDeps; i++)
4859 : {
4860 260 : DumpId depid = te->dependencies[i];
4861 :
4862 260 : if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL &&
4863 260 : ((strcmp(AH->tocsByDumpId[depid]->desc, "TABLE DATA") == 0) ||
4864 136 : strcmp(AH->tocsByDumpId[depid]->desc, "TABLE") == 0))
4865 164 : lockids[nlockids++] = depid;
4866 : }
4867 :
4868 60 : if (nlockids == 0)
4869 : {
4870 0 : free(lockids);
4871 0 : return;
4872 : }
4873 :
4874 60 : te->lockDeps = pg_realloc(lockids, nlockids * sizeof(DumpId));
4875 60 : te->nLockDeps = nlockids;
4876 : }
4877 :
4878 : /*
4879 : * Remove the specified TOC entry from the depCounts of items that depend on
4880 : * it, thereby possibly making them ready-to-run. Any pending item that
4881 : * becomes ready should be moved to the ready_heap, if that's provided.
4882 : */
4883 : static void
4884 192 : reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
4885 : binaryheap *ready_heap)
4886 : {
4887 : int i;
4888 :
4889 192 : pg_log_debug("reducing dependencies for %d", te->dumpId);
4890 :
4891 576 : for (i = 0; i < te->nRevDeps; i++)
4892 : {
4893 384 : TocEntry *otherte = AH->tocsByDumpId[te->revDeps[i]];
4894 :
4895 : Assert(otherte->depCount > 0);
4896 384 : otherte->depCount--;
4897 :
4898 : /*
4899 : * It's ready if it has no remaining dependencies, and it belongs in
4900 : * the current restore pass, and it is currently a member of the
4901 : * pending list (that check is needed to prevent double restore in
4902 : * some cases where a list-file forces out-of-order restoring).
4903 : * However, if ready_heap == NULL then caller doesn't want any list
4904 : * memberships changed.
4905 : */
4906 384 : if (otherte->depCount == 0 &&
4907 148 : _tocEntryRestorePass(otherte) == AH->restorePass &&
4908 148 : otherte->pending_prev != NULL &&
4909 : ready_heap != NULL)
4910 : {
4911 : /* Remove it from pending list ... */
4912 52 : pending_list_remove(otherte);
4913 : /* ... and add to ready_heap */
4914 52 : binaryheap_add(ready_heap, otherte);
4915 : }
4916 : }
4917 192 : }
4918 :
4919 : /*
4920 : * Set the created flag on the DATA member corresponding to the given
4921 : * TABLE member
4922 : */
4923 : static void
4924 9074 : mark_create_done(ArchiveHandle *AH, TocEntry *te)
4925 : {
4926 9074 : if (AH->tableDataId[te->dumpId] != 0)
4927 : {
4928 6762 : TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4929 :
4930 6762 : ted->created = true;
4931 : }
4932 9074 : }
4933 :
4934 : /*
4935 : * Mark the DATA member corresponding to the given TABLE member
4936 : * as not wanted
4937 : */
4938 : static void
4939 0 : inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
4940 : {
4941 0 : pg_log_info("table \"%s\" could not be created, will not restore its data",
4942 : te->tag);
4943 :
4944 0 : if (AH->tableDataId[te->dumpId] != 0)
4945 : {
4946 0 : TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4947 :
4948 0 : ted->reqs = 0;
4949 : }
4950 0 : }
4951 :
4952 : /*
4953 : * Clone and de-clone routines used in parallel restoration.
4954 : *
4955 : * Enough of the structure is cloned to ensure that there is no
4956 : * conflict between different threads each with their own clone.
4957 : */
/*
 * CloneArchive
 *
 * Build a partial copy of the given ArchiveHandle for use by a parallel
 * worker, complete with its own database connection.  The steps below are
 * order-sensitive: the flat memcpy must come first, per-clone state is then
 * reset or re-duplicated, and the format-specific ClonePtr hook runs last,
 * after the clone's connection has been established.
 *
 * The caller is expected to release the result with DeCloneArchive().
 */
ArchiveHandle *
CloneArchive(ArchiveHandle *AH)
{
	ArchiveHandle *clone;

	/* Make a "flat" copy */
	clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle));
	memcpy(clone, AH, sizeof(ArchiveHandle));

	/* Likewise flat-copy the RestoreOptions, so we can alter them locally */
	clone->public.ropt = (RestoreOptions *) pg_malloc(sizeof(RestoreOptions));
	memcpy(clone->public.ropt, AH->public.ropt, sizeof(RestoreOptions));

	/* Handle format-independent fields */
	memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));

	/* The clone will have its own connection, so disregard connection state */
	clone->connection = NULL;
	clone->connCancel = NULL;
	clone->currUser = NULL;
	clone->currSchema = NULL;
	clone->currTableAm = NULL;
	clone->currTablespace = NULL;

	/* savedPassword must be local in case we change it while connecting */
	if (clone->savedPassword)
		clone->savedPassword = pg_strdup(clone->savedPassword);

	/* clone has its own error count, too */
	clone->public.n_errors = 0;

	/* clones should not share lo_buf */
	clone->lo_buf = NULL;

	/*
	 * Clone connections disregard --transaction-size; they must commit after
	 * each command so that the results are immediately visible to other
	 * workers.
	 */
	clone->public.ropt->txn_size = 0;

	/*
	 * Connect our new clone object to the database, using the same connection
	 * parameters used for the original connection.
	 */
	ConnectDatabase((Archive *) clone, &clone->public.ropt->cparams, true);

	/* re-establish fixed state */
	if (AH->mode == archModeRead)
		_doSetFixedOutputState(clone);
	/* in write case, setupDumpWorker will fix up connection state */

	/* Let the format-specific code have a chance too */
	clone->ClonePtr(clone);

	Assert(clone->connection != NULL);
	return clone;
}
5016 :
5017 : /*
5018 : * Release clone-local storage.
5019 : *
5020 : * Note: we assume any clone-local connection was already closed.
5021 : */
5022 : void
5023 52 : DeCloneArchive(ArchiveHandle *AH)
5024 : {
5025 : /* Should not have an open database connection */
5026 : Assert(AH->connection == NULL);
5027 :
5028 : /* Clear format-specific state */
5029 52 : AH->DeClonePtr(AH);
5030 :
5031 : /* Clear state allocated by CloneArchive */
5032 52 : if (AH->sqlparse.curCmd)
5033 6 : destroyPQExpBuffer(AH->sqlparse.curCmd);
5034 :
5035 : /* Clear any connection-local state */
5036 52 : free(AH->currUser);
5037 52 : free(AH->currSchema);
5038 52 : free(AH->currTablespace);
5039 52 : free(AH->currTableAm);
5040 52 : free(AH->savedPassword);
5041 :
5042 52 : free(AH);
5043 52 : }
|