Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_backup_archiver.c
4 : *
5 : * Private implementation of the archiver routines.
6 : *
7 : * See the headers to pg_restore for more details.
8 : *
9 : * Copyright (c) 2000, Philip Warner
10 : * Rights are granted to use this software in any way so long
11 : * as this notice is not removed.
12 : *
13 : * The author is not responsible for loss or damages that may
14 : * result from its use.
15 : *
16 : *
17 : * IDENTIFICATION
18 : * src/bin/pg_dump/pg_backup_archiver.c
19 : *
20 : *-------------------------------------------------------------------------
21 : */
22 : #include "postgres_fe.h"
23 :
24 : #include <ctype.h>
25 : #include <fcntl.h>
26 : #include <unistd.h>
27 : #include <sys/stat.h>
28 : #include <sys/wait.h>
29 : #ifdef WIN32
30 : #include <io.h>
31 : #endif
32 :
33 : #include "catalog/pg_class_d.h"
34 : #include "common/string.h"
35 : #include "compress_io.h"
36 : #include "dumputils.h"
37 : #include "fe_utils/string_utils.h"
38 : #include "lib/binaryheap.h"
39 : #include "lib/stringinfo.h"
40 : #include "libpq/libpq-fs.h"
41 : #include "parallel.h"
42 : #include "pg_backup_archiver.h"
43 : #include "pg_backup_db.h"
44 : #include "pg_backup_utils.h"
45 :
46 : #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
47 : #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
48 :
49 : /* Prototypes for this file's static helper routines. */
50 : static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
51 : const pg_compress_specification compression_spec,
52 : bool dosync, ArchiveMode mode,
53 : SetupWorkerPtrType setupWorkerPtr,
54 : DataDirSyncMethod sync_method);
55 : static void _getObjectDescription(PQExpBuffer buf, const TocEntry *te);
56 : static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData);
57 : static char *sanitize_line(const char *str, bool want_hyphen);
58 : static void _doSetFixedOutputState(ArchiveHandle *AH);
59 : static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
60 : static void _reconnectToDB(ArchiveHandle *AH, const char *dbname);
61 : static void _becomeUser(ArchiveHandle *AH, const char *user);
62 : static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
63 : static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
64 : static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
65 : static void _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam);
66 : static void _printTableAccessMethodNoStorage(ArchiveHandle *AH,
67 : TocEntry *te);
68 : static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
69 : static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
70 : static void processSearchPathEntry(ArchiveHandle *AH, TocEntry *te);
71 : static int _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH);
72 : static RestorePass _tocEntryRestorePass(TocEntry *te);
73 : static bool _tocEntryIsACL(TocEntry *te);
74 : static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
75 : static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
76 : static bool is_load_via_partition_root(TocEntry *te);
77 : static void buildTocEntryArrays(ArchiveHandle *AH);
78 : static void _moveBefore(TocEntry *pos, TocEntry *te);
79 : static int _discoverArchiveFormat(ArchiveHandle *AH);
80 : /* Output redirection and reporting helpers; RestoreArchive brackets its output with SaveOutput/SetOutput/RestoreOutput. */
81 : static int RestoringToDB(ArchiveHandle *AH);
82 : static void dump_lo_buf(ArchiveHandle *AH);
83 : static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
84 : static void SetOutput(ArchiveHandle *AH, const char *filename,
85 : const pg_compress_specification compression_spec);
86 : static CompressFileHandle *SaveOutput(ArchiveHandle *AH);
87 : static void RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput);
88 : /* Restore drivers: restore_toc_entry handles one item; the prefork/parallel/postfork trio is driven from RestoreArchive in parallel mode. */
89 : static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
90 : static void restore_toc_entries_prefork(ArchiveHandle *AH,
91 : TocEntry *pending_list);
92 : static void restore_toc_entries_parallel(ArchiveHandle *AH,
93 : ParallelState *pstate,
94 : TocEntry *pending_list);
95 : static void restore_toc_entries_postfork(ArchiveHandle *AH,
96 : TocEntry *pending_list);
97 : static void pending_list_header_init(TocEntry *l);
98 : static void pending_list_append(TocEntry *l, TocEntry *te);
99 : static void pending_list_remove(TocEntry *te);
100 : static int TocEntrySizeCompareQsort(const void *p1, const void *p2);
101 : static int TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg);
102 : static void move_to_ready_heap(TocEntry *pending_list,
103 : binaryheap *ready_heap,
104 : RestorePass pass);
105 : static TocEntry *pop_next_work_item(binaryheap *ready_heap,
106 : ParallelState *pstate);
107 : static void mark_dump_job_done(ArchiveHandle *AH,
108 : TocEntry *te,
109 : int status,
110 : void *callback_data);
111 : static void mark_restore_job_done(ArchiveHandle *AH,
112 : TocEntry *te,
113 : int status,
114 : void *callback_data);
115 : static void fix_dependencies(ArchiveHandle *AH);
116 : static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
117 : static void repoint_table_dependencies(ArchiveHandle *AH);
118 : static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
119 : static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
120 : binaryheap *ready_heap);
121 : static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
122 : static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
123 : /* Run by ProcessArchiveRestoreOptions when ropt->strict_names is set. */
124 : static void StrictNamesCheck(RestoreOptions *ropt);
125 :
126 :
127 : /*
128 : * Allocate a new DumpOptions block containing all default values.
129 : */
130 : DumpOptions *
131 80 : NewDumpOptions(void)
132 : {
133 80 : DumpOptions *opts = (DumpOptions *) pg_malloc(sizeof(DumpOptions));
134 :
135 80 : InitDumpOptions(opts);
136 80 : return opts;
137 : }
138 :
139 : /*
140 : * Initialize a DumpOptions struct to all default values
141 : */
142 : void
143 470 : InitDumpOptions(DumpOptions *opts)
144 : {
145 470 : memset(opts, 0, sizeof(DumpOptions));
146 : /* set any fields that shouldn't default to zeroes */
147 470 : opts->include_everything = true;
148 470 : opts->cparams.promptPassword = TRI_DEFAULT;
149 470 : opts->dumpSections = DUMP_UNSECTIONED;
150 470 : }
151 :
152 : /*
153 : * Create a freshly allocated DumpOptions with options equivalent to those
154 : * found in the given RestoreOptions.
155 : */
156 : DumpOptions *
157 80 : dumpOptionsFromRestoreOptions(RestoreOptions *ropt)
158 : {
159 80 : DumpOptions *dopt = NewDumpOptions();
160 :
161 : /* this is the inverse of what's at the end of pg_dump.c's main() */
162 80 : dopt->cparams.dbname = ropt->cparams.dbname ? pg_strdup(ropt->cparams.dbname) : NULL;
163 80 : dopt->cparams.pgport = ropt->cparams.pgport ? pg_strdup(ropt->cparams.pgport) : NULL;
164 80 : dopt->cparams.pghost = ropt->cparams.pghost ? pg_strdup(ropt->cparams.pghost) : NULL;
165 80 : dopt->cparams.username = ropt->cparams.username ? pg_strdup(ropt->cparams.username) : NULL;
166 80 : dopt->cparams.promptPassword = ropt->cparams.promptPassword;
167 80 : dopt->outputClean = ropt->dropSchema;
168 80 : dopt->dataOnly = ropt->dataOnly;
169 80 : dopt->schemaOnly = ropt->schemaOnly;
170 80 : dopt->if_exists = ropt->if_exists;
171 80 : dopt->column_inserts = ropt->column_inserts;
172 80 : dopt->dumpSections = ropt->dumpSections;
173 80 : dopt->aclsSkip = ropt->aclsSkip;
174 80 : dopt->outputSuperuser = ropt->superuser;
175 80 : dopt->outputCreateDB = ropt->createDB;
176 80 : dopt->outputNoOwner = ropt->noOwner;
177 80 : dopt->outputNoTableAm = ropt->noTableAm;
178 80 : dopt->outputNoTablespaces = ropt->noTablespace;
179 80 : dopt->disable_triggers = ropt->disable_triggers;
180 80 : dopt->use_setsessauth = ropt->use_setsessauth;
181 80 : dopt->disable_dollar_quoting = ropt->disable_dollar_quoting;
182 80 : dopt->dump_inserts = ropt->dump_inserts;
183 80 : dopt->no_comments = ropt->no_comments;
184 80 : dopt->no_publications = ropt->no_publications;
185 80 : dopt->no_security_labels = ropt->no_security_labels;
186 80 : dopt->no_subscriptions = ropt->no_subscriptions;
187 80 : dopt->lockWaitTimeout = ropt->lockWaitTimeout;
188 80 : dopt->include_everything = ropt->include_everything;
189 80 : dopt->enable_row_security = ropt->enable_row_security;
190 80 : dopt->sequence_data = ropt->sequence_data;
191 :
192 80 : return dopt;
193 : }
194 :
195 :
196 : /*
197 : * Wrapper functions.
198 : *
199 : * The objective is to make writing new formats and dumpers as simple
200 : * as possible, if necessary at the expense of extra function calls etc.
201 : *
202 : */
203 :
204 : /*
205 : * The dump worker setup needs lots of knowledge of the internals of pg_dump,
206 : * so it's defined in pg_dump.c and passed into OpenArchive. The restore worker
207 : * setup doesn't need to know anything much, so it's defined here.
208 : */
209 : static void
210 20 : setupRestoreWorker(Archive *AHX)
211 : {
212 20 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
213 :
214 20 : AH->ReopenPtr(AH);
215 20 : }
216 :
217 :
218 : /* Create a new archive */
219 : /* Public */
220 : Archive *
221 348 : CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
222 : const pg_compress_specification compression_spec,
223 : bool dosync, ArchiveMode mode,
224 : SetupWorkerPtrType setupDumpWorker,
225 : DataDirSyncMethod sync_method)
226 :
227 : {
228 348 : ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression_spec,
229 : dosync, mode, setupDumpWorker, sync_method);
230 :
231 346 : return (Archive *) AH;
232 : }
233 :
234 : /* Open an existing archive */
235 : /* Public */
236 : Archive *
237 76 : OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
238 : {
239 : ArchiveHandle *AH;
240 76 : pg_compress_specification compression_spec = {0};
241 :
242 76 : compression_spec.algorithm = PG_COMPRESSION_NONE;
243 76 : AH = _allocAH(FileSpec, fmt, compression_spec, true,
244 : archModeRead, setupRestoreWorker,
245 : DATA_DIR_SYNC_METHOD_FSYNC);
246 :
247 76 : return (Archive *) AH;
248 : }
249 :
250 : /* Public */
251 : void
252 382 : CloseArchive(Archive *AHX)
253 : {
254 382 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
255 :
256 382 : AH->ClosePtr(AH);
257 :
258 : /* Close the output */
259 382 : errno = 0;
260 382 : if (!EndCompressFileHandle(AH->OF))
261 0 : pg_fatal("could not close output file: %m");
262 382 : }
263 :
264 : /* Public */
265 : void
266 738 : SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt)
267 : {
268 : /* Caller can omit dump options, in which case we synthesize them */
269 738 : if (dopt == NULL && ropt != NULL)
270 80 : dopt = dumpOptionsFromRestoreOptions(ropt);
271 :
272 : /* Save options for later access */
273 738 : AH->dopt = dopt;
274 738 : AH->ropt = ropt;
275 738 : }
276 :
277 : /* Public */
278 : void
279 376 : ProcessArchiveRestoreOptions(Archive *AHX)
280 : {
281 376 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
282 376 : RestoreOptions *ropt = AH->public.ropt;
283 : TocEntry *te;
284 : teSection curSection;
285 :
286 : /* Decide which TOC entries will be dumped/restored, and mark them */
287 376 : curSection = SECTION_PRE_DATA;
288 81206 : for (te = AH->toc->next; te != AH->toc; te = te->next)
289 : {
290 : /*
291 : * When writing an archive, we also take this opportunity to check
292 : * that we have generated the entries in a sane order that respects
293 : * the section divisions. When reading, don't complain, since buggy
294 : * old versions of pg_dump might generate out-of-order archives.
295 : */
296 80830 : if (AH->mode != archModeRead)
297 : {
298 71020 : switch (te->section)
299 : {
300 16300 : case SECTION_NONE:
301 : /* ok to be anywhere */
302 16300 : break;
303 36084 : case SECTION_PRE_DATA:
304 36084 : if (curSection != SECTION_PRE_DATA)
305 0 : pg_log_warning("archive items not in correct section order");
306 36084 : break;
307 8312 : case SECTION_DATA:
308 8312 : if (curSection == SECTION_POST_DATA)
309 0 : pg_log_warning("archive items not in correct section order");
310 8312 : break;
311 10324 : case SECTION_POST_DATA:
312 : /* ok no matter which section we were in */
313 10324 : break;
314 0 : default:
315 0 : pg_fatal("unexpected section code %d",
316 : (int) te->section);
317 : break;
318 : }
319 9810 : }
320 :
321 80830 : if (te->section != SECTION_NONE)
322 63068 : curSection = te->section;
323 :
324 80830 : te->reqs = _tocEntryRequired(te, curSection, AH);
325 : }
326 :
327 : /* Enforce strict names checking */
328 376 : if (ropt->strict_names)
329 0 : StrictNamesCheck(ropt);
330 376 : }
331 :
332 : /* Public */
333 : void
334 318 : RestoreArchive(Archive *AHX)
335 : {
336 318 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
337 318 : RestoreOptions *ropt = AH->public.ropt;
338 : bool parallel_mode;
339 : TocEntry *te;
340 : CompressFileHandle *sav;
341 :
342 318 : AH->stage = STAGE_INITIALIZING;
343 :
344 : /*
345 : * If we're going to do parallel restore, there are some restrictions.
346 : */
347 318 : parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
348 318 : if (parallel_mode)
349 : {
350 : /* We haven't got round to making this work for all archive formats */
351 8 : if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
352 0 : pg_fatal("parallel restore is not supported with this archive file format");
353 :
354 : /* Doesn't work if the archive represents dependencies as OIDs */
355 8 : if (AH->version < K_VERS_1_8)
356 0 : pg_fatal("parallel restore is not supported with archives made by pre-8.0 pg_dump");
357 :
358 : /*
359 : * It's also not gonna work if we can't reopen the input file, so
360 : * let's try that immediately.
361 : */
362 8 : AH->ReopenPtr(AH);
363 : }
364 :
365 : /*
366 : * Make sure we won't need (de)compression we haven't got
367 : */
368 318 : if (AH->PrintTocDataPtr != NULL)
369 : {
370 55000 : for (te = AH->toc->next; te != AH->toc; te = te->next)
371 : {
372 54870 : if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
373 : {
374 188 : char *errmsg = supports_compression(AH->compression_spec);
375 :
376 188 : if (errmsg)
377 0 : pg_fatal("cannot restore from compressed archive (%s)",
378 : errmsg);
379 : else
380 188 : break;
381 : }
382 : }
383 : }
384 :
385 : /*
386 : * Prepare index arrays, so we can assume we have them throughout restore.
387 : * It's possible we already did this, though.
388 : */
389 318 : if (AH->tocsByDumpId == NULL)
390 314 : buildTocEntryArrays(AH);
391 :
392 : /*
393 : * If we're using a DB connection, then connect it.
394 : */
395 318 : if (ropt->useDB)
396 : {
397 28 : pg_log_info("connecting to database for restore");
398 28 : if (AH->version < K_VERS_1_3)
399 0 : pg_fatal("direct database connections are not supported in pre-1.3 archives");
400 :
401 : /*
402 : * We don't want to guess at whether the dump will successfully
403 : * restore; allow the attempt regardless of the version of the restore
404 : * target.
405 : */
406 28 : AHX->minRemoteVersion = 0;
407 28 : AHX->maxRemoteVersion = 9999999;
408 :
409 28 : ConnectDatabase(AHX, &ropt->cparams, false);
410 :
411 : /*
412 : * If we're talking to the DB directly, don't send comments since they
413 : * obscure SQL when displaying errors
414 : */
415 28 : AH->noTocComments = 1;
416 : }
417 :
418 : /*
419 : * Work out if we have an implied data-only restore. This can happen if
420 : * the dump was data only or if the user has used a toc list to exclude
421 : * all of the schema data. All we do is look for schema entries - if none
422 : * are found then we set the dataOnly flag.
423 : *
424 : * We could scan for wanted TABLE entries, but that is not the same as
425 : * dataOnly. At this stage, it seems unnecessary (6-Mar-2001).
426 : */
427 318 : if (!ropt->dataOnly)
428 : {
429 306 : int impliedDataOnly = 1;
430 :
431 2408 : for (te = AH->toc->next; te != AH->toc; te = te->next)
432 : {
433 2366 : if ((te->reqs & REQ_SCHEMA) != 0)
434 : { /* It's schema, and it's wanted */
435 264 : impliedDataOnly = 0;
436 264 : break;
437 : }
438 : }
439 306 : if (impliedDataOnly)
440 : {
441 42 : ropt->dataOnly = impliedDataOnly;
442 42 : pg_log_info("implied data-only restore");
443 : }
444 : }
445 :
446 : /*
447 : * Setup the output file if necessary.
448 : */
449 318 : sav = SaveOutput(AH);
450 318 : if (ropt->filename || ropt->compression_spec.algorithm != PG_COMPRESSION_NONE)
451 268 : SetOutput(AH, ropt->filename, ropt->compression_spec);
452 :
453 318 : ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
454 :
455 318 : if (AH->archiveRemoteVersion)
456 318 : ahprintf(AH, "-- Dumped from database version %s\n",
457 : AH->archiveRemoteVersion);
458 318 : if (AH->archiveDumpVersion)
459 318 : ahprintf(AH, "-- Dumped by pg_dump version %s\n",
460 : AH->archiveDumpVersion);
461 :
462 318 : ahprintf(AH, "\n");
463 :
464 318 : if (AH->public.verbose)
465 42 : dumpTimestamp(AH, "Started on", AH->createDate);
466 :
467 318 : if (ropt->single_txn)
468 : {
469 0 : if (AH->connection)
470 0 : StartTransaction(AHX);
471 : else
472 0 : ahprintf(AH, "BEGIN;\n\n");
473 : }
474 :
475 : /*
476 : * Establish important parameter values right away.
477 : */
478 318 : _doSetFixedOutputState(AH);
479 :
480 318 : AH->stage = STAGE_PROCESSING;
481 :
482 : /*
483 : * Drop the items at the start, in reverse order
484 : */
485 318 : if (ropt->dropSchema)
486 : {
487 1970 : for (te = AH->toc->prev; te != AH->toc; te = te->prev)
488 : {
489 1944 : AH->currentTE = te;
490 :
491 : /*
492 : * In createDB mode, issue a DROP *only* for the database as a
493 : * whole. Issuing drops against anything else would be wrong,
494 : * because at this point we're connected to the wrong database.
495 : * (The DATABASE PROPERTIES entry, if any, should be treated like
496 : * the DATABASE entry.)
497 : */
498 1944 : if (ropt->createDB)
499 : {
500 730 : if (strcmp(te->desc, "DATABASE") != 0 &&
501 714 : strcmp(te->desc, "DATABASE PROPERTIES") != 0)
502 702 : continue;
503 : }
504 :
505 : /* Otherwise, drop anything that's selected and has a dropStmt */
506 1242 : if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
507 : {
508 588 : bool not_allowed_in_txn = false;
509 :
510 588 : pg_log_info("dropping %s %s", te->desc, te->tag);
511 :
512 : /*
513 : * In --transaction-size mode, we have to temporarily exit our
514 : * transaction block to drop objects that can't be dropped
515 : * within a transaction.
516 : */
517 588 : if (ropt->txn_size > 0)
518 : {
519 24 : if (strcmp(te->desc, "DATABASE") == 0 ||
520 12 : strcmp(te->desc, "DATABASE PROPERTIES") == 0)
521 : {
522 24 : not_allowed_in_txn = true;
523 24 : if (AH->connection)
524 24 : CommitTransaction(AHX);
525 : else
526 0 : ahprintf(AH, "COMMIT;\n");
527 : }
528 : }
529 :
530 : /* Select owner and schema as necessary */
531 588 : _becomeOwner(AH, te);
532 588 : _selectOutputSchema(AH, te->namespace);
533 :
534 : /*
535 : * Now emit the DROP command, if the object has one. Note we
536 : * don't necessarily emit it verbatim; at this point we add an
537 : * appropriate IF EXISTS clause, if the user requested it.
538 : */
539 588 : if (strcmp(te->desc, "BLOB METADATA") == 0)
540 : {
541 : /* We must generate the per-blob commands */
542 8 : if (ropt->if_exists)
543 4 : IssueCommandPerBlob(AH, te,
544 : "SELECT pg_catalog.lo_unlink(oid) "
545 : "FROM pg_catalog.pg_largeobject_metadata "
546 : "WHERE oid = '", "'");
547 : else
548 4 : IssueCommandPerBlob(AH, te,
549 : "SELECT pg_catalog.lo_unlink('",
550 : "')");
551 : }
552 580 : else if (*te->dropStmt != '\0')
553 : {
554 558 : if (!ropt->if_exists ||
555 268 : strncmp(te->dropStmt, "--", 2) == 0)
556 : {
557 : /*
558 : * Without --if-exists, or if it's just a comment (as
559 : * happens for the public schema), print the dropStmt
560 : * as-is.
561 : */
562 292 : ahprintf(AH, "%s", te->dropStmt);
563 : }
564 : else
565 : {
566 : /*
567 : * Inject an appropriate spelling of "if exists". For
568 : * old-style large objects, we have a routine that
569 : * knows how to do it, without depending on
570 : * te->dropStmt; use that. For other objects we need
571 : * to parse the command.
572 : */
573 266 : if (strcmp(te->desc, "BLOB") == 0)
574 : {
575 0 : DropLOIfExists(AH, te->catalogId.oid);
576 : }
577 : else
578 : {
579 266 : char *dropStmt = pg_strdup(te->dropStmt);
580 266 : char *dropStmtOrig = dropStmt;
581 266 : PQExpBuffer ftStmt = createPQExpBuffer();
582 :
583 : /*
584 : * Need to inject IF EXISTS clause after ALTER
585 : * TABLE part in ALTER TABLE .. DROP statement
586 : */
587 266 : if (strncmp(dropStmt, "ALTER TABLE", 11) == 0)
588 : {
589 36 : appendPQExpBufferStr(ftStmt,
590 : "ALTER TABLE IF EXISTS");
591 36 : dropStmt = dropStmt + 11;
592 : }
593 :
594 : /*
595 : * ALTER TABLE..ALTER COLUMN..DROP DEFAULT does
596 : * not support the IF EXISTS clause, and therefore
597 : * we simply emit the original command for DEFAULT
598 : * objects (modulo the adjustment made above).
599 : *
600 : * Likewise, don't mess with DATABASE PROPERTIES.
601 : *
602 : * If we used CREATE OR REPLACE VIEW as a means of
603 : * quasi-dropping an ON SELECT rule, that should
604 : * be emitted unchanged as well.
605 : *
606 : * For other object types, we need to extract the
607 : * first part of the DROP which includes the
608 : * object type. Most of the time this matches
609 : * te->desc, so search for that; however for the
610 : * different kinds of CONSTRAINTs, we know to
611 : * search for hardcoded "DROP CONSTRAINT" instead.
612 : */
613 266 : if (strcmp(te->desc, "DEFAULT") == 0 ||
614 260 : strcmp(te->desc, "DATABASE PROPERTIES") == 0 ||
615 260 : strncmp(dropStmt, "CREATE OR REPLACE VIEW", 22) == 0)
616 6 : appendPQExpBufferStr(ftStmt, dropStmt);
617 : else
618 : {
619 : char buffer[40];
620 : char *mark;
621 :
622 260 : if (strcmp(te->desc, "CONSTRAINT") == 0 ||
623 234 : strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
624 234 : strcmp(te->desc, "FK CONSTRAINT") == 0)
625 30 : strcpy(buffer, "DROP CONSTRAINT");
626 : else
627 230 : snprintf(buffer, sizeof(buffer), "DROP %s",
628 : te->desc);
629 :
630 260 : mark = strstr(dropStmt, buffer);
631 :
632 260 : if (mark)
633 : {
634 260 : *mark = '\0';
635 260 : appendPQExpBuffer(ftStmt, "%s%s IF EXISTS%s",
636 : dropStmt, buffer,
637 260 : mark + strlen(buffer));
638 : }
639 : else
640 : {
641 : /* complain and emit unmodified command */
642 0 : pg_log_warning("could not find where to insert IF EXISTS in statement \"%s\"",
643 : dropStmtOrig);
644 0 : appendPQExpBufferStr(ftStmt, dropStmt);
645 : }
646 : }
647 :
648 266 : ahprintf(AH, "%s", ftStmt->data);
649 :
650 266 : destroyPQExpBuffer(ftStmt);
651 266 : pg_free(dropStmtOrig);
652 : }
653 : }
654 : }
655 :
656 : /*
657 : * In --transaction-size mode, re-establish the transaction
658 : * block if needed; otherwise, commit after every N drops.
659 : */
660 588 : if (ropt->txn_size > 0)
661 : {
662 24 : if (not_allowed_in_txn)
663 : {
664 24 : if (AH->connection)
665 24 : StartTransaction(AHX);
666 : else
667 0 : ahprintf(AH, "BEGIN;\n");
668 24 : AH->txnCount = 0;
669 : }
670 0 : else if (++AH->txnCount >= ropt->txn_size)
671 : {
672 0 : if (AH->connection)
673 : {
674 0 : CommitTransaction(AHX);
675 0 : StartTransaction(AHX);
676 : }
677 : else
678 0 : ahprintf(AH, "COMMIT;\nBEGIN;\n");
679 0 : AH->txnCount = 0;
680 : }
681 : }
682 : }
683 : }
684 :
685 : /*
686 : * _selectOutputSchema may have set currSchema to reflect the effect
687 : * of a "SET search_path" command it emitted. However, by now we may
688 : * have dropped that schema; or it might not have existed in the first
689 : * place. In either case the effective value of search_path will not
690 : * be what we think. Forcibly reset currSchema so that we will
691 : * re-establish the search_path setting when needed (after creating
692 : * the schema).
693 : *
694 : * If we treated users as pg_dump'able objects then we'd need to reset
695 : * currUser here too.
696 : */
697 26 : free(AH->currSchema);
698 26 : AH->currSchema = NULL;
699 : }
700 :
701 318 : if (parallel_mode)
702 : {
703 : /*
704 : * In parallel mode, turn control over to the parallel-restore logic.
705 : */
706 : ParallelState *pstate;
707 : TocEntry pending_list;
708 :
709 : /* The archive format module may need some setup for this */
710 8 : if (AH->PrepParallelRestorePtr)
711 8 : AH->PrepParallelRestorePtr(AH);
712 :
713 8 : pending_list_header_init(&pending_list);
714 :
715 : /* This runs PRE_DATA items and then disconnects from the database */
716 8 : restore_toc_entries_prefork(AH, &pending_list);
717 : Assert(AH->connection == NULL);
718 :
719 : /* ParallelBackupStart() will actually fork the processes */
720 8 : pstate = ParallelBackupStart(AH);
721 8 : restore_toc_entries_parallel(AH, pstate, &pending_list);
722 8 : ParallelBackupEnd(AH, pstate);
723 :
724 : /* reconnect the leader and see if we missed something */
725 8 : restore_toc_entries_postfork(AH, &pending_list);
726 : Assert(AH->connection != NULL);
727 : }
728 : else
729 : {
730 : /*
731 : * In serial mode, process everything in three phases: normal items,
732 : * then ACLs, then post-ACL items. We might be able to skip one or
733 : * both extra phases in some cases, eg data-only restores.
734 : */
735 310 : bool haveACL = false;
736 310 : bool havePostACL = false;
737 :
738 71912 : for (te = AH->toc->next; te != AH->toc; te = te->next)
739 : {
740 71604 : if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
741 2418 : continue; /* ignore if not to be dumped at all */
742 :
743 69186 : switch (_tocEntryRestorePass(te))
744 : {
745 64854 : case RESTORE_PASS_MAIN:
746 64854 : (void) restore_toc_entry(AH, te, false);
747 64852 : break;
748 3776 : case RESTORE_PASS_ACL:
749 3776 : haveACL = true;
750 3776 : break;
751 556 : case RESTORE_PASS_POST_ACL:
752 556 : havePostACL = true;
753 556 : break;
754 : }
755 71602 : }
756 :
757 308 : if (haveACL)
758 : {
759 69864 : for (te = AH->toc->next; te != AH->toc; te = te->next)
760 : {
761 137790 : if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
762 68066 : _tocEntryRestorePass(te) == RESTORE_PASS_ACL)
763 3776 : (void) restore_toc_entry(AH, te, false);
764 : }
765 : }
766 :
767 308 : if (havePostACL)
768 : {
769 41474 : for (te = AH->toc->next; te != AH->toc; te = te->next)
770 : {
771 82122 : if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
772 40724 : _tocEntryRestorePass(te) == RESTORE_PASS_POST_ACL)
773 556 : (void) restore_toc_entry(AH, te, false);
774 : }
775 : }
776 : }
777 :
778 : /*
779 : * Close out any persistent transaction we may have. While these two
780 : * cases are started in different places, we can end both cases here.
781 : */
782 316 : if (ropt->single_txn || ropt->txn_size > 0)
783 : {
784 20 : if (AH->connection)
785 20 : CommitTransaction(AHX);
786 : else
787 0 : ahprintf(AH, "COMMIT;\n\n");
788 : }
789 :
790 316 : if (AH->public.verbose)
791 42 : dumpTimestamp(AH, "Completed on", time(NULL));
792 :
793 316 : ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n");
794 :
795 : /*
796 : * Clean up & we're done.
797 : */
798 316 : AH->stage = STAGE_FINALIZING;
799 :
800 316 : if (ropt->filename || ropt->compression_spec.algorithm != PG_COMPRESSION_NONE)
801 268 : RestoreOutput(AH, sav);
802 :
803 316 : if (ropt->useDB)
804 28 : DisconnectDatabase(&AH->public);
805 316 : }
806 :
807 : /*
808 : * Restore a single TOC item. Used in both parallel and non-parallel restore;
809 : * is_parallel is true if we are in a worker child process.
810 : *
811 : * Returns 0 normally, but WORKER_CREATE_DONE or WORKER_INHIBIT_DATA if
812 : * the parallel parent has to make the corresponding status update.
813 : */
814 : static int
815 69378 : restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
816 : {
817 69378 : RestoreOptions *ropt = AH->public.ropt;
818 69378 : int status = WORKER_OK;
819 : int reqs;
820 : bool defnDumped;
821 :
822 69378 : AH->currentTE = te;
823 :
824 : /* Dump any relevant dump warnings to stderr */
825 69378 : if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
826 : {
827 0 : if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
828 0 : pg_log_warning("warning from original dump file: %s", te->defn);
829 0 : else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
830 0 : pg_log_warning("warning from original dump file: %s", te->copyStmt);
831 : }
832 :
833 : /* Work out what, if anything, we want from this entry */
834 69378 : reqs = te->reqs;
835 :
836 69378 : defnDumped = false;
837 :
838 : /*
839 : * If it has a schema component that we want, then process that
840 : */
841 69378 : if ((reqs & REQ_SCHEMA) != 0)
842 : {
843 60992 : bool object_is_db = false;
844 :
845 : /*
846 : * In --transaction-size mode, must exit our transaction block to
847 : * create a database or set its properties.
848 : */
849 60992 : if (strcmp(te->desc, "DATABASE") == 0 ||
850 60908 : strcmp(te->desc, "DATABASE PROPERTIES") == 0)
851 : {
852 112 : object_is_db = true;
853 112 : if (ropt->txn_size > 0)
854 : {
855 40 : if (AH->connection)
856 40 : CommitTransaction(&AH->public);
857 : else
858 0 : ahprintf(AH, "COMMIT;\n\n");
859 : }
860 : }
861 :
862 : /* Show namespace in log message if available */
863 60992 : if (te->namespace)
864 58612 : pg_log_info("creating %s \"%s.%s\"",
865 : te->desc, te->namespace, te->tag);
866 : else
867 2380 : pg_log_info("creating %s \"%s\"",
868 : te->desc, te->tag);
869 :
870 60992 : _printTocEntry(AH, te, false);
871 60992 : defnDumped = true;
872 :
873 60992 : if (strcmp(te->desc, "TABLE") == 0)
874 : {
875 9330 : if (AH->lastErrorTE == te)
876 : {
877 : /*
878 : * We failed to create the table. If
879 : * --no-data-for-failed-tables was given, mark the
880 : * corresponding TABLE DATA to be ignored.
881 : *
882 : * In the parallel case this must be done in the parent, so we
883 : * just set the return value.
884 : */
885 0 : if (ropt->noDataForFailedTables)
886 : {
887 0 : if (is_parallel)
888 0 : status = WORKER_INHIBIT_DATA;
889 : else
890 0 : inhibit_data_for_failed_table(AH, te);
891 : }
892 : }
893 : else
894 : {
895 : /*
896 : * We created the table successfully. Mark the corresponding
897 : * TABLE DATA for possible truncation.
898 : *
899 : * In the parallel case this must be done in the parent, so we
900 : * just set the return value.
901 : */
902 9330 : if (is_parallel)
903 0 : status = WORKER_CREATE_DONE;
904 : else
905 9330 : mark_create_done(AH, te);
906 : }
907 : }
908 :
909 : /*
910 : * If we created a DB, connect to it. Also, if we changed DB
911 : * properties, reconnect to ensure that relevant GUC settings are
912 : * applied to our session. (That also restarts the transaction block
913 : * in --transaction-size mode.)
914 : */
915 60992 : if (object_is_db)
916 : {
917 112 : pg_log_info("connecting to new database \"%s\"", te->tag);
918 112 : _reconnectToDB(AH, te->tag);
919 : }
920 : }
921 :
922 : /*
923 : * If it has a data component that we want, then process that
924 : */
925 69378 : if ((reqs & REQ_DATA) != 0)
926 : {
927 : /*
928 : * hadDumper will be set if there is genuine data component for this
929 : * node. Otherwise, we need to check the defn field for statements
930 : * that need to be executed in data-only restores.
931 : */
932 8356 : if (te->hadDumper)
933 : {
934 : /*
935 : * If we can output the data, then restore it.
936 : */
937 7346 : if (AH->PrintTocDataPtr != NULL)
938 : {
939 7346 : _printTocEntry(AH, te, true);
940 :
941 7346 : if (strcmp(te->desc, "BLOBS") == 0 ||
942 7210 : strcmp(te->desc, "BLOB COMMENTS") == 0)
943 : {
944 136 : pg_log_info("processing %s", te->desc);
945 :
946 136 : _selectOutputSchema(AH, "pg_catalog");
947 :
948 : /* Send BLOB COMMENTS data to ExecuteSimpleCommands() */
949 136 : if (strcmp(te->desc, "BLOB COMMENTS") == 0)
950 0 : AH->outputKind = OUTPUT_OTHERDATA;
951 :
952 136 : AH->PrintTocDataPtr(AH, te);
953 :
954 136 : AH->outputKind = OUTPUT_SQLCMDS;
955 : }
956 : else
957 : {
958 : bool use_truncate;
959 :
960 7210 : _disableTriggersIfNecessary(AH, te);
961 :
962 : /* Select owner and schema as necessary */
963 7210 : _becomeOwner(AH, te);
964 7210 : _selectOutputSchema(AH, te->namespace);
965 :
966 7210 : pg_log_info("processing data for table \"%s.%s\"",
967 : te->namespace, te->tag);
968 :
969 : /*
970 : * In parallel restore, if we created the table earlier in
971 : * this run (so that we know it is empty) and we are not
972 : * restoring a load-via-partition-root data item then we
973 : * wrap the COPY in a transaction and precede it with a
974 : * TRUNCATE. If wal_level is set to minimal this prevents
975 : * WAL-logging the COPY. This obtains a speedup similar
976 : * to that from using single_txn mode in non-parallel
977 : * restores.
978 : *
979 : * We mustn't do this for load-via-partition-root cases
980 : * because some data might get moved across partition
981 : * boundaries, risking deadlock and/or loss of previously
982 : * loaded data. (We assume that all partitions of a
983 : * partitioned table will be treated the same way.)
984 : */
985 7242 : use_truncate = is_parallel && te->created &&
986 32 : !is_load_via_partition_root(te);
987 :
988 7210 : if (use_truncate)
989 : {
990 : /*
991 : * Parallel restore is always talking directly to a
992 : * server, so no need to see if we should issue BEGIN.
993 : */
994 20 : StartTransaction(&AH->public);
995 :
996 : /*
997 : * Issue TRUNCATE with ONLY so that child tables are
998 : * not wiped.
999 : */
1000 20 : ahprintf(AH, "TRUNCATE TABLE ONLY %s;\n\n",
1001 20 : fmtQualifiedId(te->namespace, te->tag));
1002 : }
1003 :
1004 : /*
1005 : * If we have a copy statement, use it.
1006 : */
1007 7210 : if (te->copyStmt && strlen(te->copyStmt) > 0)
1008 : {
1009 7072 : ahprintf(AH, "%s", te->copyStmt);
1010 7072 : AH->outputKind = OUTPUT_COPYDATA;
1011 : }
1012 : else
1013 138 : AH->outputKind = OUTPUT_OTHERDATA;
1014 :
1015 7210 : AH->PrintTocDataPtr(AH, te);
1016 :
1017 : /*
1018 : * Terminate COPY if needed.
1019 : */
1020 14278 : if (AH->outputKind == OUTPUT_COPYDATA &&
1021 7070 : RestoringToDB(AH))
1022 18 : EndDBCopyMode(&AH->public, te->tag);
1023 7208 : AH->outputKind = OUTPUT_SQLCMDS;
1024 :
1025 : /* close out the transaction started above */
1026 7208 : if (use_truncate)
1027 20 : CommitTransaction(&AH->public);
1028 :
1029 7208 : _enableTriggersIfNecessary(AH, te);
1030 : }
1031 : }
1032 : }
1033 1010 : else if (!defnDumped)
1034 : {
1035 : /* If we haven't already dumped the defn part, do so now */
1036 1010 : pg_log_info("executing %s %s", te->desc, te->tag);
1037 1010 : _printTocEntry(AH, te, false);
1038 : }
1039 : }
1040 :
1041 : /*
1042 : * If we emitted anything for this TOC entry, that counts as one action
1043 : * against the transaction-size limit. Commit if it's time to.
1044 : */
1045 69376 : if ((reqs & (REQ_SCHEMA | REQ_DATA)) != 0 && ropt->txn_size > 0)
1046 : {
1047 3962 : if (++AH->txnCount >= ropt->txn_size)
1048 : {
1049 14 : if (AH->connection)
1050 : {
1051 14 : CommitTransaction(&AH->public);
1052 14 : StartTransaction(&AH->public);
1053 : }
1054 : else
1055 0 : ahprintf(AH, "COMMIT;\nBEGIN;\n\n");
1056 14 : AH->txnCount = 0;
1057 : }
1058 : }
1059 :
1060 69376 : if (AH->public.n_errors > 0 && status == WORKER_OK)
1061 0 : status = WORKER_IGNORED_ERRORS;
1062 :
1063 69376 : return status;
1064 : }
1065 :
1066 : /*
1067 : * Allocate a new RestoreOptions block.
1068 : * This is mainly so we can initialize it, but also for future expansion,
1069 : */
1070 : RestoreOptions *
1071 440 : NewRestoreOptions(void)
1072 : {
1073 : RestoreOptions *opts;
1074 :
1075 440 : opts = (RestoreOptions *) pg_malloc0(sizeof(RestoreOptions));
1076 :
1077 : /* set any fields that shouldn't default to zeroes */
1078 440 : opts->format = archUnknown;
1079 440 : opts->cparams.promptPassword = TRI_DEFAULT;
1080 440 : opts->dumpSections = DUMP_UNSECTIONED;
1081 440 : opts->compression_spec.algorithm = PG_COMPRESSION_NONE;
1082 440 : opts->compression_spec.level = 0;
1083 :
1084 440 : return opts;
1085 : }
1086 :
1087 : static void
1088 7210 : _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
1089 : {
1090 7210 : RestoreOptions *ropt = AH->public.ropt;
1091 :
1092 : /* This hack is only needed in a data-only restore */
1093 7210 : if (!ropt->dataOnly || !ropt->disable_triggers)
1094 7150 : return;
1095 :
1096 60 : pg_log_info("disabling triggers for %s", te->tag);
1097 :
1098 : /*
1099 : * Become superuser if possible, since they are the only ones who can
1100 : * disable constraint triggers. If -S was not given, assume the initial
1101 : * user identity is a superuser. (XXX would it be better to become the
1102 : * table owner?)
1103 : */
1104 60 : _becomeUser(AH, ropt->superuser);
1105 :
1106 : /*
1107 : * Disable them.
1108 : */
1109 60 : ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n",
1110 60 : fmtQualifiedId(te->namespace, te->tag));
1111 : }
1112 :
1113 : static void
1114 7208 : _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
1115 : {
1116 7208 : RestoreOptions *ropt = AH->public.ropt;
1117 :
1118 : /* This hack is only needed in a data-only restore */
1119 7208 : if (!ropt->dataOnly || !ropt->disable_triggers)
1120 7148 : return;
1121 :
1122 60 : pg_log_info("enabling triggers for %s", te->tag);
1123 :
1124 : /*
1125 : * Become superuser if possible, since they are the only ones who can
1126 : * disable constraint triggers. If -S was not given, assume the initial
1127 : * user identity is a superuser. (XXX would it be better to become the
1128 : * table owner?)
1129 : */
1130 60 : _becomeUser(AH, ropt->superuser);
1131 :
1132 : /*
1133 : * Enable them.
1134 : */
1135 60 : ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n",
1136 60 : fmtQualifiedId(te->namespace, te->tag));
1137 : }
1138 :
1139 : /*
1140 : * Detect whether a TABLE DATA TOC item is performing "load via partition
1141 : * root", that is the target table is an ancestor partition rather than the
1142 : * table the TOC item is nominally for.
1143 : *
1144 : * In newer archive files this can be detected by checking for a special
1145 : * comment placed in te->defn. In older files we have to fall back to seeing
1146 : * if the COPY statement targets the named table or some other one. This
1147 : * will not work for data dumped as INSERT commands, so we could give a false
1148 : * negative in that case; fortunately, that's a rarely-used option.
1149 : */
1150 : static bool
1151 32 : is_load_via_partition_root(TocEntry *te)
1152 : {
1153 32 : if (te->defn &&
1154 12 : strncmp(te->defn, "-- load via partition root ", 27) == 0)
1155 12 : return true;
1156 20 : if (te->copyStmt && *te->copyStmt)
1157 : {
1158 12 : PQExpBuffer copyStmt = createPQExpBuffer();
1159 : bool result;
1160 :
1161 : /*
1162 : * Build the initial part of the COPY as it would appear if the
1163 : * nominal target table is the actual target. If we see anything
1164 : * else, it must be a load-via-partition-root case.
1165 : */
1166 12 : appendPQExpBuffer(copyStmt, "COPY %s ",
1167 12 : fmtQualifiedId(te->namespace, te->tag));
1168 12 : result = strncmp(te->copyStmt, copyStmt->data, copyStmt->len) != 0;
1169 12 : destroyPQExpBuffer(copyStmt);
1170 12 : return result;
1171 : }
1172 : /* Assume it's not load-via-partition-root */
1173 8 : return false;
1174 : }
1175 :
1176 : /*
1177 : * This is a routine that is part of the dumper interface, hence the 'Archive*' parameter.
1178 : */
1179 :
1180 : /* Public */
1181 : void
1182 3645168 : WriteData(Archive *AHX, const void *data, size_t dLen)
1183 : {
1184 3645168 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1185 :
1186 3645168 : if (!AH->currToc)
1187 0 : pg_fatal("internal error -- WriteData cannot be called outside the context of a DataDumper routine");
1188 :
1189 3645168 : AH->WriteDataPtr(AH, data, dLen);
1190 3645168 : }
1191 :
1192 : /*
1193 : * Create a new TOC entry. The TOC was designed as a TOC, but is now the
1194 : * repository for all metadata. But the name has stuck.
1195 : *
1196 : * The new entry is added to the Archive's TOC list. Most callers can ignore
1197 : * the result value because nothing else need be done, but a few want to
1198 : * manipulate the TOC entry further.
1199 : */
1200 :
1201 : /* Public */
1202 : TocEntry *
1203 71020 : ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId,
1204 : ArchiveOpts *opts)
1205 : {
1206 71020 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1207 : TocEntry *newToc;
1208 :
1209 71020 : newToc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
1210 :
1211 71020 : AH->tocCount++;
1212 71020 : if (dumpId > AH->maxDumpId)
1213 18420 : AH->maxDumpId = dumpId;
1214 :
1215 71020 : newToc->prev = AH->toc->prev;
1216 71020 : newToc->next = AH->toc;
1217 71020 : AH->toc->prev->next = newToc;
1218 71020 : AH->toc->prev = newToc;
1219 :
1220 71020 : newToc->catalogId = catalogId;
1221 71020 : newToc->dumpId = dumpId;
1222 71020 : newToc->section = opts->section;
1223 :
1224 71020 : newToc->tag = pg_strdup(opts->tag);
1225 71020 : newToc->namespace = opts->namespace ? pg_strdup(opts->namespace) : NULL;
1226 71020 : newToc->tablespace = opts->tablespace ? pg_strdup(opts->tablespace) : NULL;
1227 71020 : newToc->tableam = opts->tableam ? pg_strdup(opts->tableam) : NULL;
1228 71020 : newToc->relkind = opts->relkind;
1229 71020 : newToc->owner = opts->owner ? pg_strdup(opts->owner) : NULL;
1230 71020 : newToc->desc = pg_strdup(opts->description);
1231 71020 : newToc->defn = opts->createStmt ? pg_strdup(opts->createStmt) : NULL;
1232 71020 : newToc->dropStmt = opts->dropStmt ? pg_strdup(opts->dropStmt) : NULL;
1233 71020 : newToc->copyStmt = opts->copyStmt ? pg_strdup(opts->copyStmt) : NULL;
1234 :
1235 71020 : if (opts->nDeps > 0)
1236 : {
1237 25326 : newToc->dependencies = (DumpId *) pg_malloc(opts->nDeps * sizeof(DumpId));
1238 25326 : memcpy(newToc->dependencies, opts->deps, opts->nDeps * sizeof(DumpId));
1239 25326 : newToc->nDeps = opts->nDeps;
1240 : }
1241 : else
1242 : {
1243 45694 : newToc->dependencies = NULL;
1244 45694 : newToc->nDeps = 0;
1245 : }
1246 :
1247 71020 : newToc->dataDumper = opts->dumpFn;
1248 71020 : newToc->dataDumperArg = opts->dumpArg;
1249 71020 : newToc->hadDumper = opts->dumpFn ? true : false;
1250 :
1251 71020 : newToc->formatData = NULL;
1252 71020 : newToc->dataLength = 0;
1253 :
1254 71020 : if (AH->ArchiveEntryPtr != NULL)
1255 9646 : AH->ArchiveEntryPtr(AH, newToc);
1256 :
1257 71020 : return newToc;
1258 : }
1259 :
1260 : /* Public */
1261 : void
1262 8 : PrintTOCSummary(Archive *AHX)
1263 : {
1264 8 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1265 8 : RestoreOptions *ropt = AH->public.ropt;
1266 : TocEntry *te;
1267 8 : pg_compress_specification out_compression_spec = {0};
1268 : teSection curSection;
1269 : CompressFileHandle *sav;
1270 : const char *fmtName;
1271 : char stamp_str[64];
1272 :
1273 : /* TOC is always uncompressed */
1274 8 : out_compression_spec.algorithm = PG_COMPRESSION_NONE;
1275 :
1276 8 : sav = SaveOutput(AH);
1277 8 : if (ropt->filename)
1278 0 : SetOutput(AH, ropt->filename, out_compression_spec);
1279 :
1280 8 : if (strftime(stamp_str, sizeof(stamp_str), PGDUMP_STRFTIME_FMT,
1281 8 : localtime(&AH->createDate)) == 0)
1282 0 : strcpy(stamp_str, "[unknown]");
1283 :
1284 8 : ahprintf(AH, ";\n; Archive created at %s\n", stamp_str);
1285 16 : ahprintf(AH, "; dbname: %s\n; TOC Entries: %d\n; Compression: %s\n",
1286 8 : sanitize_line(AH->archdbname, false),
1287 : AH->tocCount,
1288 : get_compress_algorithm_name(AH->compression_spec.algorithm));
1289 :
1290 8 : switch (AH->format)
1291 : {
1292 6 : case archCustom:
1293 6 : fmtName = "CUSTOM";
1294 6 : break;
1295 2 : case archDirectory:
1296 2 : fmtName = "DIRECTORY";
1297 2 : break;
1298 0 : case archTar:
1299 0 : fmtName = "TAR";
1300 0 : break;
1301 0 : default:
1302 0 : fmtName = "UNKNOWN";
1303 : }
1304 :
1305 8 : ahprintf(AH, "; Dump Version: %d.%d-%d\n",
1306 8 : ARCHIVE_MAJOR(AH->version), ARCHIVE_MINOR(AH->version), ARCHIVE_REV(AH->version));
1307 8 : ahprintf(AH, "; Format: %s\n", fmtName);
1308 8 : ahprintf(AH, "; Integer: %d bytes\n", (int) AH->intSize);
1309 8 : ahprintf(AH, "; Offset: %d bytes\n", (int) AH->offSize);
1310 8 : if (AH->archiveRemoteVersion)
1311 8 : ahprintf(AH, "; Dumped from database version: %s\n",
1312 : AH->archiveRemoteVersion);
1313 8 : if (AH->archiveDumpVersion)
1314 8 : ahprintf(AH, "; Dumped by pg_dump version: %s\n",
1315 : AH->archiveDumpVersion);
1316 :
1317 8 : ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");
1318 :
1319 8 : curSection = SECTION_PRE_DATA;
1320 2288 : for (te = AH->toc->next; te != AH->toc; te = te->next)
1321 : {
1322 : /* This bit must match ProcessArchiveRestoreOptions' marking logic */
1323 2280 : if (te->section != SECTION_NONE)
1324 1704 : curSection = te->section;
1325 2280 : te->reqs = _tocEntryRequired(te, curSection, AH);
1326 : /* Now, should we print it? */
1327 2280 : if (ropt->verbose ||
1328 2280 : (te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0)
1329 : {
1330 : char *sanitized_name;
1331 : char *sanitized_schema;
1332 : char *sanitized_owner;
1333 :
1334 : /*
1335 : */
1336 2240 : sanitized_name = sanitize_line(te->tag, false);
1337 2240 : sanitized_schema = sanitize_line(te->namespace, true);
1338 2240 : sanitized_owner = sanitize_line(te->owner, false);
1339 :
1340 2240 : ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
1341 : te->catalogId.tableoid, te->catalogId.oid,
1342 : te->desc, sanitized_schema, sanitized_name,
1343 : sanitized_owner);
1344 :
1345 2240 : free(sanitized_name);
1346 2240 : free(sanitized_schema);
1347 2240 : free(sanitized_owner);
1348 : }
1349 2280 : if (ropt->verbose && te->nDeps > 0)
1350 : {
1351 : int i;
1352 :
1353 0 : ahprintf(AH, ";\tdepends on:");
1354 0 : for (i = 0; i < te->nDeps; i++)
1355 0 : ahprintf(AH, " %d", te->dependencies[i]);
1356 0 : ahprintf(AH, "\n");
1357 : }
1358 : }
1359 :
1360 : /* Enforce strict names checking */
1361 8 : if (ropt->strict_names)
1362 0 : StrictNamesCheck(ropt);
1363 :
1364 8 : if (ropt->filename)
1365 0 : RestoreOutput(AH, sav);
1366 8 : }
1367 :
1368 : /***********
1369 : * Large Object Archival
1370 : ***********/
1371 :
1372 : /* Called by a dumper to signal start of a LO */
1373 : int
1374 148 : StartLO(Archive *AHX, Oid oid)
1375 : {
1376 148 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1377 :
1378 148 : if (!AH->StartLOPtr)
1379 0 : pg_fatal("large-object output not supported in chosen format");
1380 :
1381 148 : AH->StartLOPtr(AH, AH->currToc, oid);
1382 :
1383 148 : return 1;
1384 : }
1385 :
1386 : /* Called by a dumper to signal end of a LO */
1387 : int
1388 148 : EndLO(Archive *AHX, Oid oid)
1389 : {
1390 148 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1391 :
1392 148 : if (AH->EndLOPtr)
1393 148 : AH->EndLOPtr(AH, AH->currToc, oid);
1394 :
1395 148 : return 1;
1396 : }
1397 :
1398 : /**********
1399 : * Large Object Restoration
1400 : **********/
1401 :
1402 : /*
1403 : * Called by a format handler before a group of LOs is restored
1404 : */
1405 : void
1406 32 : StartRestoreLOs(ArchiveHandle *AH)
1407 : {
1408 32 : RestoreOptions *ropt = AH->public.ropt;
1409 :
1410 : /*
1411 : * LOs must be restored within a transaction block, since we need the LO
1412 : * handle to stay open while we write it. Establish a transaction unless
1413 : * there's one being used globally.
1414 : */
1415 32 : if (!(ropt->single_txn || ropt->txn_size > 0))
1416 : {
1417 32 : if (AH->connection)
1418 0 : StartTransaction(&AH->public);
1419 : else
1420 32 : ahprintf(AH, "BEGIN;\n\n");
1421 : }
1422 :
1423 32 : AH->loCount = 0;
1424 32 : }
1425 :
1426 : /*
1427 : * Called by a format handler after a group of LOs is restored
1428 : */
1429 : void
1430 32 : EndRestoreLOs(ArchiveHandle *AH)
1431 : {
1432 32 : RestoreOptions *ropt = AH->public.ropt;
1433 :
1434 32 : if (!(ropt->single_txn || ropt->txn_size > 0))
1435 : {
1436 32 : if (AH->connection)
1437 0 : CommitTransaction(&AH->public);
1438 : else
1439 32 : ahprintf(AH, "COMMIT;\n\n");
1440 : }
1441 :
1442 32 : pg_log_info(ngettext("restored %d large object",
1443 : "restored %d large objects",
1444 : AH->loCount),
1445 : AH->loCount);
1446 32 : }
1447 :
1448 :
1449 : /*
1450 : * Called by a format handler to initiate restoration of a LO
1451 : */
1452 : void
1453 32 : StartRestoreLO(ArchiveHandle *AH, Oid oid, bool drop)
1454 : {
1455 32 : bool old_lo_style = (AH->version < K_VERS_1_12);
1456 : Oid loOid;
1457 :
1458 32 : AH->loCount++;
1459 :
1460 : /* Initialize the LO Buffer */
1461 32 : if (AH->lo_buf == NULL)
1462 : {
1463 : /* First time through (in this process) so allocate the buffer */
1464 16 : AH->lo_buf_size = LOBBUFSIZE;
1465 16 : AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
1466 : }
1467 32 : AH->lo_buf_used = 0;
1468 :
1469 32 : pg_log_info("restoring large object with OID %u", oid);
1470 :
1471 : /* With an old archive we must do drop and create logic here */
1472 32 : if (old_lo_style && drop)
1473 0 : DropLOIfExists(AH, oid);
1474 :
1475 32 : if (AH->connection)
1476 : {
1477 0 : if (old_lo_style)
1478 : {
1479 0 : loOid = lo_create(AH->connection, oid);
1480 0 : if (loOid == 0 || loOid != oid)
1481 0 : pg_fatal("could not create large object %u: %s",
1482 : oid, PQerrorMessage(AH->connection));
1483 : }
1484 0 : AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
1485 0 : if (AH->loFd == -1)
1486 0 : pg_fatal("could not open large object %u: %s",
1487 : oid, PQerrorMessage(AH->connection));
1488 : }
1489 : else
1490 : {
1491 32 : if (old_lo_style)
1492 0 : ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
1493 : oid, INV_WRITE);
1494 : else
1495 32 : ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
1496 : oid, INV_WRITE);
1497 : }
1498 :
1499 32 : AH->writingLO = true;
1500 32 : }
1501 :
1502 : void
1503 32 : EndRestoreLO(ArchiveHandle *AH, Oid oid)
1504 : {
1505 32 : if (AH->lo_buf_used > 0)
1506 : {
1507 : /* Write remaining bytes from the LO buffer */
1508 16 : dump_lo_buf(AH);
1509 : }
1510 :
1511 32 : AH->writingLO = false;
1512 :
1513 32 : if (AH->connection)
1514 : {
1515 0 : lo_close(AH->connection, AH->loFd);
1516 0 : AH->loFd = -1;
1517 : }
1518 : else
1519 : {
1520 32 : ahprintf(AH, "SELECT pg_catalog.lo_close(0);\n\n");
1521 : }
1522 32 : }
1523 :
1524 : /***********
1525 : * Sorting and Reordering
1526 : ***********/
1527 :
1528 : void
1529 0 : SortTocFromFile(Archive *AHX)
1530 : {
1531 0 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1532 0 : RestoreOptions *ropt = AH->public.ropt;
1533 : FILE *fh;
1534 : StringInfoData linebuf;
1535 :
1536 : /* Allocate space for the 'wanted' array, and init it */
1537 0 : ropt->idWanted = (bool *) pg_malloc0(sizeof(bool) * AH->maxDumpId);
1538 :
1539 : /* Setup the file */
1540 0 : fh = fopen(ropt->tocFile, PG_BINARY_R);
1541 0 : if (!fh)
1542 0 : pg_fatal("could not open TOC file \"%s\": %m", ropt->tocFile);
1543 :
1544 0 : initStringInfo(&linebuf);
1545 :
1546 0 : while (pg_get_line_buf(fh, &linebuf))
1547 : {
1548 : char *cmnt;
1549 : char *endptr;
1550 : DumpId id;
1551 : TocEntry *te;
1552 :
1553 : /* Truncate line at comment, if any */
1554 0 : cmnt = strchr(linebuf.data, ';');
1555 0 : if (cmnt != NULL)
1556 : {
1557 0 : cmnt[0] = '\0';
1558 0 : linebuf.len = cmnt - linebuf.data;
1559 : }
1560 :
1561 : /* Ignore if all blank */
1562 0 : if (strspn(linebuf.data, " \t\r\n") == linebuf.len)
1563 0 : continue;
1564 :
1565 : /* Get an ID, check it's valid and not already seen */
1566 0 : id = strtol(linebuf.data, &endptr, 10);
1567 0 : if (endptr == linebuf.data || id <= 0 || id > AH->maxDumpId ||
1568 0 : ropt->idWanted[id - 1])
1569 : {
1570 0 : pg_log_warning("line ignored: %s", linebuf.data);
1571 0 : continue;
1572 : }
1573 :
1574 : /* Find TOC entry */
1575 0 : te = getTocEntryByDumpId(AH, id);
1576 0 : if (!te)
1577 0 : pg_fatal("could not find entry for ID %d",
1578 : id);
1579 :
1580 : /* Mark it wanted */
1581 0 : ropt->idWanted[id - 1] = true;
1582 :
1583 : /*
1584 : * Move each item to the end of the list as it is selected, so that
1585 : * they are placed in the desired order. Any unwanted items will end
1586 : * up at the front of the list, which may seem unintuitive but it's
1587 : * what we need. In an ordinary serial restore that makes no
1588 : * difference, but in a parallel restore we need to mark unrestored
1589 : * items' dependencies as satisfied before we start examining
1590 : * restorable items. Otherwise they could have surprising
1591 : * side-effects on the order in which restorable items actually get
1592 : * restored.
1593 : */
1594 0 : _moveBefore(AH->toc, te);
1595 : }
1596 :
1597 0 : pg_free(linebuf.data);
1598 :
1599 0 : if (fclose(fh) != 0)
1600 0 : pg_fatal("could not close TOC file: %m");
1601 0 : }
1602 :
1603 : /**********************
1604 : * Convenience functions that look like standard IO functions
1605 : * for writing data when in dump mode.
1606 : **********************/
1607 :
1608 : /* Public */
1609 : void
1610 43244 : archputs(const char *s, Archive *AH)
1611 : {
1612 43244 : WriteData(AH, s, strlen(s));
1613 43244 : }
1614 :
1615 : /* Public */
1616 : int
1617 7024 : archprintf(Archive *AH, const char *fmt,...)
1618 : {
1619 7024 : int save_errno = errno;
1620 : char *p;
1621 7024 : size_t len = 128; /* initial assumption about buffer size */
1622 : size_t cnt;
1623 :
1624 : for (;;)
1625 0 : {
1626 : va_list args;
1627 :
1628 : /* Allocate work buffer. */
1629 7024 : p = (char *) pg_malloc(len);
1630 :
1631 : /* Try to format the data. */
1632 7024 : errno = save_errno;
1633 7024 : va_start(args, fmt);
1634 7024 : cnt = pvsnprintf(p, len, fmt, args);
1635 7024 : va_end(args);
1636 :
1637 7024 : if (cnt < len)
1638 7024 : break; /* success */
1639 :
1640 : /* Release buffer and loop around to try again with larger len. */
1641 0 : free(p);
1642 0 : len = cnt;
1643 : }
1644 :
1645 7024 : WriteData(AH, p, cnt);
1646 7024 : free(p);
1647 7024 : return (int) cnt;
1648 : }
1649 :
1650 :
1651 : /*******************************
1652 : * Stuff below here should be 'private' to the archiver routines
1653 : *******************************/
1654 :
1655 : static void
1656 268 : SetOutput(ArchiveHandle *AH, const char *filename,
1657 : const pg_compress_specification compression_spec)
1658 : {
1659 : CompressFileHandle *CFH;
1660 : const char *mode;
1661 268 : int fn = -1;
1662 :
1663 268 : if (filename)
1664 : {
1665 268 : if (strcmp(filename, "-") == 0)
1666 0 : fn = fileno(stdout);
1667 : }
1668 0 : else if (AH->FH)
1669 0 : fn = fileno(AH->FH);
1670 0 : else if (AH->fSpec)
1671 : {
1672 0 : filename = AH->fSpec;
1673 : }
1674 : else
1675 0 : fn = fileno(stdout);
1676 :
1677 268 : if (AH->mode == archModeAppend)
1678 86 : mode = PG_BINARY_A;
1679 : else
1680 182 : mode = PG_BINARY_W;
1681 :
1682 268 : CFH = InitCompressFileHandle(compression_spec);
1683 :
1684 268 : if (!CFH->open_func(filename, fn, mode, CFH))
1685 : {
1686 0 : if (filename)
1687 0 : pg_fatal("could not open output file \"%s\": %m", filename);
1688 : else
1689 0 : pg_fatal("could not open output file: %m");
1690 : }
1691 :
1692 268 : AH->OF = CFH;
1693 268 : }
1694 :
1695 : static CompressFileHandle *
1696 326 : SaveOutput(ArchiveHandle *AH)
1697 : {
1698 326 : return (CompressFileHandle *) AH->OF;
1699 : }
1700 :
1701 : static void
1702 268 : RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput)
1703 : {
1704 268 : errno = 0;
1705 268 : if (!EndCompressFileHandle(AH->OF))
1706 0 : pg_fatal("could not close output file: %m");
1707 :
1708 268 : AH->OF = savedOutput;
1709 268 : }
1710 :
1711 :
1712 :
1713 : /*
1714 : * Print formatted text to the output file (usually stdout).
1715 : */
1716 : int
1717 380058 : ahprintf(ArchiveHandle *AH, const char *fmt,...)
1718 : {
1719 380058 : int save_errno = errno;
1720 : char *p;
1721 380058 : size_t len = 128; /* initial assumption about buffer size */
1722 : size_t cnt;
1723 :
1724 : for (;;)
1725 19706 : {
1726 : va_list args;
1727 :
1728 : /* Allocate work buffer. */
1729 399764 : p = (char *) pg_malloc(len);
1730 :
1731 : /* Try to format the data. */
1732 399764 : errno = save_errno;
1733 399764 : va_start(args, fmt);
1734 399764 : cnt = pvsnprintf(p, len, fmt, args);
1735 399764 : va_end(args);
1736 :
1737 399764 : if (cnt < len)
1738 380058 : break; /* success */
1739 :
1740 : /* Release buffer and loop around to try again with larger len. */
1741 19706 : free(p);
1742 19706 : len = cnt;
1743 : }
1744 :
1745 380058 : ahwrite(p, 1, cnt, AH);
1746 380058 : free(p);
1747 380058 : return (int) cnt;
1748 : }
1749 :
1750 : /*
1751 : * Single place for logic which says 'We are restoring to a direct DB connection'.
1752 : */
1753 : static int
1754 3981454 : RestoringToDB(ArchiveHandle *AH)
1755 : {
1756 3981454 : RestoreOptions *ropt = AH->public.ropt;
1757 :
1758 3981454 : return (ropt && ropt->useDB && AH->connection);
1759 : }
1760 :
1761 : /*
1762 : * Dump the current contents of the LO data buffer while writing a LO
1763 : */
1764 : static void
1765 16 : dump_lo_buf(ArchiveHandle *AH)
1766 : {
1767 16 : if (AH->connection)
1768 : {
1769 : int res;
1770 :
1771 0 : res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
1772 0 : pg_log_debug(ngettext("wrote %zu byte of large object data (result = %d)",
1773 : "wrote %zu bytes of large object data (result = %d)",
1774 : AH->lo_buf_used),
1775 : AH->lo_buf_used, res);
1776 : /* We assume there are no short writes, only errors */
1777 0 : if (res != AH->lo_buf_used)
1778 0 : warn_or_exit_horribly(AH, "could not write to large object: %s",
1779 0 : PQerrorMessage(AH->connection));
1780 : }
1781 : else
1782 : {
1783 16 : PQExpBuffer buf = createPQExpBuffer();
1784 :
1785 16 : appendByteaLiteralAHX(buf,
1786 : (const unsigned char *) AH->lo_buf,
1787 : AH->lo_buf_used,
1788 : AH);
1789 :
1790 : /* Hack: turn off writingLO so ahwrite doesn't recurse to here */
1791 16 : AH->writingLO = false;
1792 16 : ahprintf(AH, "SELECT pg_catalog.lowrite(0, %s);\n", buf->data);
1793 16 : AH->writingLO = true;
1794 :
1795 16 : destroyPQExpBuffer(buf);
1796 : }
1797 16 : AH->lo_buf_used = 0;
1798 16 : }
1799 :
1800 :
1801 : /*
1802 : * Write buffer to the output file (usually stdout). This is used for
1803 : * outputting 'restore' scripts etc. It is even possible for an archive
1804 : * format to create a custom output routine to 'fake' a restore if it
1805 : * wants to generate a script (see TAR output).
1806 : */
1807 : void
1808 3976762 : ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
1809 : {
1810 3976762 : int bytes_written = 0;
1811 :
1812 3976762 : if (AH->writingLO)
1813 : {
1814 26 : size_t remaining = size * nmemb;
1815 :
1816 26 : while (AH->lo_buf_used + remaining > AH->lo_buf_size)
1817 : {
1818 0 : size_t avail = AH->lo_buf_size - AH->lo_buf_used;
1819 :
1820 0 : memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
1821 0 : ptr = (const void *) ((const char *) ptr + avail);
1822 0 : remaining -= avail;
1823 0 : AH->lo_buf_used += avail;
1824 0 : dump_lo_buf(AH);
1825 : }
1826 :
1827 26 : memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
1828 26 : AH->lo_buf_used += remaining;
1829 :
1830 26 : bytes_written = size * nmemb;
1831 : }
1832 3976736 : else if (AH->CustomOutPtr)
1833 3546 : bytes_written = AH->CustomOutPtr(AH, ptr, size * nmemb);
1834 :
1835 : /*
1836 : * If we're doing a restore, and it's direct to DB, and we're connected
1837 : * then send it to the DB.
1838 : */
1839 3973190 : else if (RestoringToDB(AH))
1840 8036 : bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
1841 : else
1842 : {
1843 3965154 : CompressFileHandle *CFH = (CompressFileHandle *) AH->OF;
1844 :
1845 3965154 : if (CFH->write_func(ptr, size * nmemb, CFH))
1846 3965154 : bytes_written = size * nmemb;
1847 : }
1848 :
1849 3976762 : if (bytes_written != size * nmemb)
1850 0 : WRITE_ERROR_EXIT;
1851 3976762 : }
1852 :
1853 : /* on some error, we may decide to go on... */
1854 : void
1855 0 : warn_or_exit_horribly(ArchiveHandle *AH, const char *fmt,...)
1856 : {
1857 : va_list ap;
1858 :
1859 0 : switch (AH->stage)
1860 : {
1861 :
1862 0 : case STAGE_NONE:
1863 : /* Do nothing special */
1864 0 : break;
1865 :
1866 0 : case STAGE_INITIALIZING:
1867 0 : if (AH->stage != AH->lastErrorStage)
1868 0 : pg_log_info("while INITIALIZING:");
1869 0 : break;
1870 :
1871 0 : case STAGE_PROCESSING:
1872 0 : if (AH->stage != AH->lastErrorStage)
1873 0 : pg_log_info("while PROCESSING TOC:");
1874 0 : break;
1875 :
1876 0 : case STAGE_FINALIZING:
1877 0 : if (AH->stage != AH->lastErrorStage)
1878 0 : pg_log_info("while FINALIZING:");
1879 0 : break;
1880 : }
1881 0 : if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE)
1882 : {
1883 0 : pg_log_info("from TOC entry %d; %u %u %s %s %s",
1884 : AH->currentTE->dumpId,
1885 : AH->currentTE->catalogId.tableoid,
1886 : AH->currentTE->catalogId.oid,
1887 : AH->currentTE->desc ? AH->currentTE->desc : "(no desc)",
1888 : AH->currentTE->tag ? AH->currentTE->tag : "(no tag)",
1889 : AH->currentTE->owner ? AH->currentTE->owner : "(no owner)");
1890 : }
1891 0 : AH->lastErrorStage = AH->stage;
1892 0 : AH->lastErrorTE = AH->currentTE;
1893 :
1894 0 : va_start(ap, fmt);
1895 0 : pg_log_generic_v(PG_LOG_ERROR, PG_LOG_PRIMARY, fmt, ap);
1896 0 : va_end(ap);
1897 :
1898 0 : if (AH->public.exit_on_error)
1899 0 : exit_nicely(1);
1900 : else
1901 0 : AH->public.n_errors++;
1902 0 : }
1903 :
#ifdef NOT_USED

/*
 * Relocate "te" within the doubly-linked TOC list so that it immediately
 * follows "pos".  The AH argument is not used.
 */
static void
_moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
{
	TocEntry   *oldPrev = te->prev;
	TocEntry   *oldNext = te->next;
	TocEntry   *follower;

	/* Unlink te from list */
	oldPrev->next = oldNext;
	oldNext->prev = oldPrev;

	/* and insert it after "pos" (pos->next is read only after unlinking) */
	follower = pos->next;
	te->prev = pos;
	te->next = follower;
	follower->prev = te;
	pos->next = te;
}
#endif
1920 :
1921 : static void
1922 0 : _moveBefore(TocEntry *pos, TocEntry *te)
1923 : {
1924 : /* Unlink te from list */
1925 0 : te->prev->next = te->next;
1926 0 : te->next->prev = te->prev;
1927 :
1928 : /* and insert it before "pos" */
1929 0 : te->prev = pos->prev;
1930 0 : te->next = pos;
1931 0 : pos->prev->next = te;
1932 0 : pos->prev = te;
1933 0 : }
1934 :
1935 : /*
1936 : * Build index arrays for the TOC list
1937 : *
1938 : * This should be invoked only after we have created or read in all the TOC
1939 : * items.
1940 : *
1941 : * The arrays are indexed by dump ID (so entry zero is unused). Note that the
1942 : * array entries run only up to maxDumpId. We might see dependency dump IDs
1943 : * beyond that (if the dump was partial); so always check the array bound
1944 : * before trying to touch an array entry.
1945 : */
1946 : static void
1947 358 : buildTocEntryArrays(ArchiveHandle *AH)
1948 : {
1949 358 : DumpId maxDumpId = AH->maxDumpId;
1950 : TocEntry *te;
1951 :
1952 358 : AH->tocsByDumpId = (TocEntry **) pg_malloc0((maxDumpId + 1) * sizeof(TocEntry *));
1953 358 : AH->tableDataId = (DumpId *) pg_malloc0((maxDumpId + 1) * sizeof(DumpId));
1954 :
1955 81042 : for (te = AH->toc->next; te != AH->toc; te = te->next)
1956 : {
1957 : /* this check is purely paranoia, maxDumpId should be correct */
1958 80684 : if (te->dumpId <= 0 || te->dumpId > maxDumpId)
1959 0 : pg_fatal("bad dumpId");
1960 :
1961 : /* tocsByDumpId indexes all TOCs by their dump ID */
1962 80684 : AH->tocsByDumpId[te->dumpId] = te;
1963 :
1964 : /*
1965 : * tableDataId provides the TABLE DATA item's dump ID for each TABLE
1966 : * TOC entry that has a DATA item. We compute this by reversing the
1967 : * TABLE DATA item's dependency, knowing that a TABLE DATA item has
1968 : * just one dependency and it is the TABLE item.
1969 : */
1970 80684 : if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0)
1971 : {
1972 7892 : DumpId tableId = te->dependencies[0];
1973 :
1974 : /*
1975 : * The TABLE item might not have been in the archive, if this was
1976 : * a data-only dump; but its dump ID should be less than its data
1977 : * item's dump ID, so there should be a place for it in the array.
1978 : */
1979 7892 : if (tableId <= 0 || tableId > maxDumpId)
1980 0 : pg_fatal("bad table dumpId for TABLE DATA item");
1981 :
1982 7892 : AH->tableDataId[tableId] = te->dumpId;
1983 : }
1984 : }
1985 358 : }
1986 :
1987 : TocEntry *
1988 21016 : getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
1989 : {
1990 : /* build index arrays if we didn't already */
1991 21016 : if (AH->tocsByDumpId == NULL)
1992 44 : buildTocEntryArrays(AH);
1993 :
1994 21016 : if (id > 0 && id <= AH->maxDumpId)
1995 21016 : return AH->tocsByDumpId[id];
1996 :
1997 0 : return NULL;
1998 : }
1999 :
2000 : int
2001 20688 : TocIDRequired(ArchiveHandle *AH, DumpId id)
2002 : {
2003 20688 : TocEntry *te = getTocEntryByDumpId(AH, id);
2004 :
2005 20688 : if (!te)
2006 9716 : return 0;
2007 :
2008 10972 : return te->reqs;
2009 : }
2010 :
2011 : size_t
2012 12796 : WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
2013 : {
2014 : int off;
2015 :
2016 : /* Save the flag */
2017 12796 : AH->WriteBytePtr(AH, wasSet);
2018 :
2019 : /* Write out pgoff_t smallest byte first, prevents endian mismatch */
2020 115164 : for (off = 0; off < sizeof(pgoff_t); off++)
2021 : {
2022 102368 : AH->WriteBytePtr(AH, o & 0xFF);
2023 102368 : o >>= 8;
2024 : }
2025 12796 : return sizeof(pgoff_t) + 1;
2026 : }
2027 :
2028 : int
2029 8266 : ReadOffset(ArchiveHandle *AH, pgoff_t * o)
2030 : {
2031 : int i;
2032 : int off;
2033 : int offsetFlg;
2034 :
2035 : /* Initialize to zero */
2036 8266 : *o = 0;
2037 :
2038 : /* Check for old version */
2039 8266 : if (AH->version < K_VERS_1_7)
2040 : {
2041 : /* Prior versions wrote offsets using WriteInt */
2042 0 : i = ReadInt(AH);
2043 : /* -1 means not set */
2044 0 : if (i < 0)
2045 0 : return K_OFFSET_POS_NOT_SET;
2046 0 : else if (i == 0)
2047 0 : return K_OFFSET_NO_DATA;
2048 :
2049 : /* Cast to pgoff_t because it was written as an int. */
2050 0 : *o = (pgoff_t) i;
2051 0 : return K_OFFSET_POS_SET;
2052 : }
2053 :
2054 : /*
2055 : * Read the flag indicating the state of the data pointer. Check if valid
2056 : * and die if not.
2057 : *
2058 : * This used to be handled by a negative or zero pointer, now we use an
2059 : * extra byte specifically for the state.
2060 : */
2061 8266 : offsetFlg = AH->ReadBytePtr(AH) & 0xFF;
2062 :
2063 8266 : switch (offsetFlg)
2064 : {
2065 8266 : case K_OFFSET_POS_NOT_SET:
2066 : case K_OFFSET_NO_DATA:
2067 : case K_OFFSET_POS_SET:
2068 :
2069 8266 : break;
2070 :
2071 0 : default:
2072 0 : pg_fatal("unexpected data offset flag %d", offsetFlg);
2073 : }
2074 :
2075 : /*
2076 : * Read the bytes
2077 : */
2078 74394 : for (off = 0; off < AH->offSize; off++)
2079 : {
2080 66128 : if (off < sizeof(pgoff_t))
2081 66128 : *o |= ((pgoff_t) (AH->ReadBytePtr(AH))) << (off * 8);
2082 : else
2083 : {
2084 0 : if (AH->ReadBytePtr(AH) != 0)
2085 0 : pg_fatal("file offset in dump file is too large");
2086 : }
2087 : }
2088 :
2089 8266 : return offsetFlg;
2090 : }
2091 :
2092 : size_t
2093 300374 : WriteInt(ArchiveHandle *AH, int i)
2094 : {
2095 : int b;
2096 :
2097 : /*
2098 : * This is a bit yucky, but I don't want to make the binary format very
2099 : * dependent on representation, and not knowing much about it, I write out
2100 : * a sign byte. If you change this, don't forget to change the file
2101 : * version #, and modify ReadInt to read the new format AS WELL AS the old
2102 : * formats.
2103 : */
2104 :
2105 : /* SIGN byte */
2106 300374 : if (i < 0)
2107 : {
2108 63294 : AH->WriteBytePtr(AH, 1);
2109 63294 : i = -i;
2110 : }
2111 : else
2112 237080 : AH->WriteBytePtr(AH, 0);
2113 :
2114 1501870 : for (b = 0; b < AH->intSize; b++)
2115 : {
2116 1201496 : AH->WriteBytePtr(AH, i & 0xFF);
2117 1201496 : i >>= 8;
2118 : }
2119 :
2120 300374 : return AH->intSize + 1;
2121 : }
2122 :
2123 : int
2124 228792 : ReadInt(ArchiveHandle *AH)
2125 : {
2126 228792 : int res = 0;
2127 : int bv,
2128 : b;
2129 228792 : int sign = 0; /* Default positive */
2130 228792 : int bitShift = 0;
2131 :
2132 228792 : if (AH->version > K_VERS_1_0)
2133 : /* Read a sign byte */
2134 228792 : sign = AH->ReadBytePtr(AH);
2135 :
2136 1143960 : for (b = 0; b < AH->intSize; b++)
2137 : {
2138 915168 : bv = AH->ReadBytePtr(AH) & 0xFF;
2139 915168 : if (bv != 0)
2140 214466 : res = res + (bv << bitShift);
2141 915168 : bitShift += 8;
2142 : }
2143 :
2144 228792 : if (sign)
2145 49328 : res = -res;
2146 :
2147 228792 : return res;
2148 : }
2149 :
2150 : size_t
2151 234930 : WriteStr(ArchiveHandle *AH, const char *c)
2152 : {
2153 : size_t res;
2154 :
2155 234930 : if (c)
2156 : {
2157 171636 : int len = strlen(c);
2158 :
2159 171636 : res = WriteInt(AH, len);
2160 171636 : AH->WriteBufPtr(AH, c, len);
2161 171636 : res += len;
2162 : }
2163 : else
2164 63294 : res = WriteInt(AH, -1);
2165 :
2166 234930 : return res;
2167 : }
2168 :
2169 : char *
2170 179108 : ReadStr(ArchiveHandle *AH)
2171 : {
2172 : char *buf;
2173 : int l;
2174 :
2175 179108 : l = ReadInt(AH);
2176 179108 : if (l < 0)
2177 49328 : buf = NULL;
2178 : else
2179 : {
2180 129780 : buf = (char *) pg_malloc(l + 1);
2181 129780 : AH->ReadBufPtr(AH, (void *) buf, l);
2182 :
2183 129780 : buf[l] = '\0';
2184 : }
2185 :
2186 179108 : return buf;
2187 : }
2188 :
2189 : static bool
2190 22 : _fileExistsInDirectory(const char *dir, const char *filename)
2191 : {
2192 : struct stat st;
2193 : char buf[MAXPGPATH];
2194 :
2195 22 : if (snprintf(buf, MAXPGPATH, "%s/%s", dir, filename) >= MAXPGPATH)
2196 0 : pg_fatal("directory name too long: \"%s\"", dir);
2197 :
2198 22 : return (stat(buf, &st) == 0 && S_ISREG(st.st_mode));
2199 : }
2200 :
2201 : static int
2202 56 : _discoverArchiveFormat(ArchiveHandle *AH)
2203 : {
2204 : FILE *fh;
2205 : char sig[6]; /* More than enough */
2206 : size_t cnt;
2207 56 : int wantClose = 0;
2208 :
2209 56 : pg_log_debug("attempting to ascertain archive format");
2210 :
2211 56 : free(AH->lookahead);
2212 :
2213 56 : AH->readHeader = 0;
2214 56 : AH->lookaheadSize = 512;
2215 56 : AH->lookahead = pg_malloc0(512);
2216 56 : AH->lookaheadLen = 0;
2217 56 : AH->lookaheadPos = 0;
2218 :
2219 56 : if (AH->fSpec)
2220 : {
2221 : struct stat st;
2222 :
2223 56 : wantClose = 1;
2224 :
2225 : /*
2226 : * Check if the specified archive is a directory. If so, check if
2227 : * there's a "toc.dat" (or "toc.dat.{gz,lz4,zst}") file in it.
2228 : */
2229 56 : if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
2230 : {
2231 22 : AH->format = archDirectory;
2232 22 : if (_fileExistsInDirectory(AH->fSpec, "toc.dat"))
2233 22 : return AH->format;
2234 : #ifdef HAVE_LIBZ
2235 0 : if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz"))
2236 0 : return AH->format;
2237 : #endif
2238 : #ifdef USE_LZ4
2239 0 : if (_fileExistsInDirectory(AH->fSpec, "toc.dat.lz4"))
2240 0 : return AH->format;
2241 : #endif
2242 : #ifdef USE_ZSTD
2243 : if (_fileExistsInDirectory(AH->fSpec, "toc.dat.zst"))
2244 : return AH->format;
2245 : #endif
2246 0 : pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
2247 : AH->fSpec);
2248 : fh = NULL; /* keep compiler quiet */
2249 : }
2250 : else
2251 : {
2252 34 : fh = fopen(AH->fSpec, PG_BINARY_R);
2253 34 : if (!fh)
2254 0 : pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
2255 : }
2256 : }
2257 : else
2258 : {
2259 0 : fh = stdin;
2260 0 : if (!fh)
2261 0 : pg_fatal("could not open input file: %m");
2262 : }
2263 :
2264 34 : if ((cnt = fread(sig, 1, 5, fh)) != 5)
2265 : {
2266 0 : if (ferror(fh))
2267 0 : pg_fatal("could not read input file: %m");
2268 : else
2269 0 : pg_fatal("input file is too short (read %lu, expected 5)",
2270 : (unsigned long) cnt);
2271 : }
2272 :
2273 : /* Save it, just in case we need it later */
2274 34 : memcpy(&AH->lookahead[0], sig, 5);
2275 34 : AH->lookaheadLen = 5;
2276 :
2277 34 : if (strncmp(sig, "PGDMP", 5) == 0)
2278 : {
2279 : /* It's custom format, stop here */
2280 32 : AH->format = archCustom;
2281 32 : AH->readHeader = 1;
2282 : }
2283 : else
2284 : {
2285 : /*
2286 : * *Maybe* we have a tar archive format file or a text dump ... So,
2287 : * read first 512 byte header...
2288 : */
2289 2 : cnt = fread(&AH->lookahead[AH->lookaheadLen], 1, 512 - AH->lookaheadLen, fh);
2290 : /* read failure is checked below */
2291 2 : AH->lookaheadLen += cnt;
2292 :
2293 2 : if (AH->lookaheadLen >= strlen(TEXT_DUMPALL_HEADER) &&
2294 2 : (strncmp(AH->lookahead, TEXT_DUMP_HEADER, strlen(TEXT_DUMP_HEADER)) == 0 ||
2295 2 : strncmp(AH->lookahead, TEXT_DUMPALL_HEADER, strlen(TEXT_DUMPALL_HEADER)) == 0))
2296 : {
2297 : /*
2298 : * looks like it's probably a text format dump. so suggest they
2299 : * try psql
2300 : */
2301 0 : pg_fatal("input file appears to be a text format dump. Please use psql.");
2302 : }
2303 :
2304 2 : if (AH->lookaheadLen != 512)
2305 : {
2306 0 : if (feof(fh))
2307 0 : pg_fatal("input file does not appear to be a valid archive (too short?)");
2308 : else
2309 0 : READ_ERROR_EXIT(fh);
2310 : }
2311 :
2312 2 : if (!isValidTarHeader(AH->lookahead))
2313 0 : pg_fatal("input file does not appear to be a valid archive");
2314 :
2315 2 : AH->format = archTar;
2316 : }
2317 :
2318 : /* Close the file if we opened it */
2319 34 : if (wantClose)
2320 : {
2321 34 : if (fclose(fh) != 0)
2322 0 : pg_fatal("could not close input file: %m");
2323 : /* Forget lookahead, since we'll re-read header after re-opening */
2324 34 : AH->readHeader = 0;
2325 34 : AH->lookaheadLen = 0;
2326 : }
2327 :
2328 34 : return AH->format;
2329 : }
2330 :
2331 :
2332 : /*
2333 : * Allocate an archive handle
2334 : */
2335 : static ArchiveHandle *
2336 424 : _allocAH(const char *FileSpec, const ArchiveFormat fmt,
2337 : const pg_compress_specification compression_spec,
2338 : bool dosync, ArchiveMode mode,
2339 : SetupWorkerPtrType setupWorkerPtr, DataDirSyncMethod sync_method)
2340 : {
2341 : ArchiveHandle *AH;
2342 : CompressFileHandle *CFH;
2343 424 : pg_compress_specification out_compress_spec = {0};
2344 :
2345 424 : pg_log_debug("allocating AH for %s, format %d",
2346 : FileSpec ? FileSpec : "(stdio)", fmt);
2347 :
2348 424 : AH = (ArchiveHandle *) pg_malloc0(sizeof(ArchiveHandle));
2349 :
2350 424 : AH->version = K_VERS_SELF;
2351 :
2352 : /* initialize for backwards compatible string processing */
2353 424 : AH->public.encoding = 0; /* PG_SQL_ASCII */
2354 424 : AH->public.std_strings = false;
2355 :
2356 : /* sql error handling */
2357 424 : AH->public.exit_on_error = true;
2358 424 : AH->public.n_errors = 0;
2359 :
2360 424 : AH->archiveDumpVersion = PG_VERSION;
2361 :
2362 424 : AH->createDate = time(NULL);
2363 :
2364 424 : AH->intSize = sizeof(int);
2365 424 : AH->offSize = sizeof(pgoff_t);
2366 424 : if (FileSpec)
2367 : {
2368 374 : AH->fSpec = pg_strdup(FileSpec);
2369 :
2370 : /*
2371 : * Not used; maybe later....
2372 : *
2373 : * AH->workDir = pg_strdup(FileSpec); for(i=strlen(FileSpec) ; i > 0 ;
2374 : * i--) if (AH->workDir[i-1] == '/')
2375 : */
2376 : }
2377 : else
2378 50 : AH->fSpec = NULL;
2379 :
2380 424 : AH->currUser = NULL; /* unknown */
2381 424 : AH->currSchema = NULL; /* ditto */
2382 424 : AH->currTablespace = NULL; /* ditto */
2383 424 : AH->currTableAm = NULL; /* ditto */
2384 :
2385 424 : AH->toc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2386 :
2387 424 : AH->toc->next = AH->toc;
2388 424 : AH->toc->prev = AH->toc;
2389 :
2390 424 : AH->mode = mode;
2391 424 : AH->compression_spec = compression_spec;
2392 424 : AH->dosync = dosync;
2393 424 : AH->sync_method = sync_method;
2394 :
2395 424 : memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
2396 :
2397 : /* Open stdout with no compression for AH output handle */
2398 424 : out_compress_spec.algorithm = PG_COMPRESSION_NONE;
2399 424 : CFH = InitCompressFileHandle(out_compress_spec);
2400 424 : if (!CFH->open_func(NULL, fileno(stdout), PG_BINARY_A, CFH))
2401 0 : pg_fatal("could not open stdout for appending: %m");
2402 424 : AH->OF = CFH;
2403 :
2404 : /*
2405 : * On Windows, we need to use binary mode to read/write non-text files,
2406 : * which include all archive formats as well as compressed plain text.
2407 : * Force stdin/stdout into binary mode if that is what we are using.
2408 : */
2409 : #ifdef WIN32
2410 : if ((fmt != archNull || compression_spec.algorithm != PG_COMPRESSION_NONE) &&
2411 : (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
2412 : {
2413 : if (mode == archModeWrite)
2414 : _setmode(fileno(stdout), O_BINARY);
2415 : else
2416 : _setmode(fileno(stdin), O_BINARY);
2417 : }
2418 : #endif
2419 :
2420 424 : AH->SetupWorkerPtr = setupWorkerPtr;
2421 :
2422 424 : if (fmt == archUnknown)
2423 56 : AH->format = _discoverArchiveFormat(AH);
2424 : else
2425 368 : AH->format = fmt;
2426 :
2427 424 : switch (AH->format)
2428 : {
2429 86 : case archCustom:
2430 86 : InitArchiveFmt_Custom(AH);
2431 86 : break;
2432 :
2433 284 : case archNull:
2434 284 : InitArchiveFmt_Null(AH);
2435 284 : break;
2436 :
2437 44 : case archDirectory:
2438 44 : InitArchiveFmt_Directory(AH);
2439 44 : break;
2440 :
2441 10 : case archTar:
2442 10 : InitArchiveFmt_Tar(AH);
2443 8 : break;
2444 :
2445 0 : default:
2446 0 : pg_fatal("unrecognized file format \"%d\"", fmt);
2447 : }
2448 :
2449 422 : return AH;
2450 : }
2451 :
2452 : /*
2453 : * Write out all data (tables & LOs)
2454 : */
2455 : void
2456 62 : WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
2457 : {
2458 : TocEntry *te;
2459 :
2460 62 : if (pstate && pstate->numWorkers > 1)
2461 16 : {
2462 : /*
2463 : * In parallel mode, this code runs in the leader process. We
2464 : * construct an array of candidate TEs, then sort it into decreasing
2465 : * size order, then dispatch each TE to a data-transfer worker. By
2466 : * dumping larger tables first, we avoid getting into a situation
2467 : * where we're down to one job and it's big, losing parallelism.
2468 : */
2469 : TocEntry **tes;
2470 : int ntes;
2471 :
2472 16 : tes = (TocEntry **) pg_malloc(AH->tocCount * sizeof(TocEntry *));
2473 16 : ntes = 0;
2474 2034 : for (te = AH->toc->next; te != AH->toc; te = te->next)
2475 : {
2476 : /* Consider only TEs with dataDumper functions ... */
2477 2018 : if (!te->dataDumper)
2478 1782 : continue;
2479 : /* ... and ignore ones not enabled for dump */
2480 236 : if ((te->reqs & REQ_DATA) == 0)
2481 0 : continue;
2482 :
2483 236 : tes[ntes++] = te;
2484 : }
2485 :
2486 16 : if (ntes > 1)
2487 14 : qsort(tes, ntes, sizeof(TocEntry *), TocEntrySizeCompareQsort);
2488 :
2489 252 : for (int i = 0; i < ntes; i++)
2490 236 : DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP,
2491 : mark_dump_job_done, NULL);
2492 :
2493 16 : pg_free(tes);
2494 :
2495 : /* Now wait for workers to finish. */
2496 16 : WaitForWorkers(AH, pstate, WFW_ALL_IDLE);
2497 : }
2498 : else
2499 : {
2500 : /* Non-parallel mode: just dump all candidate TEs sequentially. */
2501 7674 : for (te = AH->toc->next; te != AH->toc; te = te->next)
2502 : {
2503 : /* Must have same filter conditions as above */
2504 7628 : if (!te->dataDumper)
2505 7260 : continue;
2506 368 : if ((te->reqs & REQ_DATA) == 0)
2507 6 : continue;
2508 :
2509 362 : WriteDataChunksForTocEntry(AH, te);
2510 : }
2511 : }
2512 62 : }
2513 :
2514 :
2515 : /*
2516 : * Callback function that's invoked in the leader process after a step has
2517 : * been parallel dumped.
2518 : *
2519 : * We don't need to do anything except check for worker failure.
2520 : */
2521 : static void
2522 236 : mark_dump_job_done(ArchiveHandle *AH,
2523 : TocEntry *te,
2524 : int status,
2525 : void *callback_data)
2526 : {
2527 236 : pg_log_info("finished item %d %s %s",
2528 : te->dumpId, te->desc, te->tag);
2529 :
2530 236 : if (status != 0)
2531 0 : pg_fatal("worker process failed: exit code %d",
2532 : status);
2533 236 : }
2534 :
2535 :
2536 : void
2537 598 : WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
2538 : {
2539 : StartDataPtrType startPtr;
2540 : EndDataPtrType endPtr;
2541 :
2542 598 : AH->currToc = te;
2543 :
2544 598 : if (strcmp(te->desc, "BLOBS") == 0)
2545 : {
2546 32 : startPtr = AH->StartLOsPtr;
2547 32 : endPtr = AH->EndLOsPtr;
2548 : }
2549 : else
2550 : {
2551 566 : startPtr = AH->StartDataPtr;
2552 566 : endPtr = AH->EndDataPtr;
2553 : }
2554 :
2555 598 : if (startPtr != NULL)
2556 598 : (*startPtr) (AH, te);
2557 :
2558 : /*
2559 : * The user-provided DataDumper routine needs to call AH->WriteData
2560 : */
2561 598 : te->dataDumper((Archive *) AH, te->dataDumperArg);
2562 :
2563 598 : if (endPtr != NULL)
2564 598 : (*endPtr) (AH, te);
2565 :
2566 598 : AH->currToc = NULL;
2567 598 : }
2568 :
2569 : void
2570 100 : WriteToc(ArchiveHandle *AH)
2571 : {
2572 : TocEntry *te;
2573 : char workbuf[32];
2574 : int tocCount;
2575 : int i;
2576 :
2577 : /* count entries that will actually be dumped */
2578 100 : tocCount = 0;
2579 16150 : for (te = AH->toc->next; te != AH->toc; te = te->next)
2580 : {
2581 16050 : if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) != 0)
2582 16038 : tocCount++;
2583 : }
2584 :
2585 : /* printf("%d TOC Entries to save\n", tocCount); */
2586 :
2587 100 : WriteInt(AH, tocCount);
2588 :
2589 16150 : for (te = AH->toc->next; te != AH->toc; te = te->next)
2590 : {
2591 16050 : if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) == 0)
2592 12 : continue;
2593 :
2594 16038 : WriteInt(AH, te->dumpId);
2595 16038 : WriteInt(AH, te->dataDumper ? 1 : 0);
2596 :
2597 : /* OID is recorded as a string for historical reasons */
2598 16038 : sprintf(workbuf, "%u", te->catalogId.tableoid);
2599 16038 : WriteStr(AH, workbuf);
2600 16038 : sprintf(workbuf, "%u", te->catalogId.oid);
2601 16038 : WriteStr(AH, workbuf);
2602 :
2603 16038 : WriteStr(AH, te->tag);
2604 16038 : WriteStr(AH, te->desc);
2605 16038 : WriteInt(AH, te->section);
2606 16038 : WriteStr(AH, te->defn);
2607 16038 : WriteStr(AH, te->dropStmt);
2608 16038 : WriteStr(AH, te->copyStmt);
2609 16038 : WriteStr(AH, te->namespace);
2610 16038 : WriteStr(AH, te->tablespace);
2611 16038 : WriteStr(AH, te->tableam);
2612 16038 : WriteInt(AH, te->relkind);
2613 16038 : WriteStr(AH, te->owner);
2614 16038 : WriteStr(AH, "false");
2615 :
2616 : /* Dump list of dependencies */
2617 39046 : for (i = 0; i < te->nDeps; i++)
2618 : {
2619 23008 : sprintf(workbuf, "%d", te->dependencies[i]);
2620 23008 : WriteStr(AH, workbuf);
2621 : }
2622 16038 : WriteStr(AH, NULL); /* Terminate List */
2623 :
2624 16038 : if (AH->WriteExtraTocPtr)
2625 16038 : AH->WriteExtraTocPtr(AH, te);
2626 : }
2627 100 : }
2628 :
2629 : void
2630 76 : ReadToc(ArchiveHandle *AH)
2631 : {
2632 : int i;
2633 : char *tmp;
2634 : DumpId *deps;
2635 : int depIdx;
2636 : int depSize;
2637 : TocEntry *te;
2638 : bool is_supported;
2639 :
2640 76 : AH->tocCount = ReadInt(AH);
2641 76 : AH->maxDumpId = 0;
2642 :
2643 12166 : for (i = 0; i < AH->tocCount; i++)
2644 : {
2645 12090 : te = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2646 12090 : te->dumpId = ReadInt(AH);
2647 :
2648 12090 : if (te->dumpId > AH->maxDumpId)
2649 2674 : AH->maxDumpId = te->dumpId;
2650 :
2651 : /* Sanity check */
2652 12090 : if (te->dumpId <= 0)
2653 0 : pg_fatal("entry ID %d out of range -- perhaps a corrupt TOC",
2654 : te->dumpId);
2655 :
2656 12090 : te->hadDumper = ReadInt(AH);
2657 :
2658 12090 : if (AH->version >= K_VERS_1_8)
2659 : {
2660 12090 : tmp = ReadStr(AH);
2661 12090 : sscanf(tmp, "%u", &te->catalogId.tableoid);
2662 12090 : free(tmp);
2663 : }
2664 : else
2665 0 : te->catalogId.tableoid = InvalidOid;
2666 12090 : tmp = ReadStr(AH);
2667 12090 : sscanf(tmp, "%u", &te->catalogId.oid);
2668 12090 : free(tmp);
2669 :
2670 12090 : te->tag = ReadStr(AH);
2671 12090 : te->desc = ReadStr(AH);
2672 :
2673 12090 : if (AH->version >= K_VERS_1_11)
2674 : {
2675 12090 : te->section = ReadInt(AH);
2676 : }
2677 : else
2678 : {
2679 : /*
2680 : * Rules for pre-8.4 archives wherein pg_dump hasn't classified
2681 : * the entries into sections. This list need not cover entry
2682 : * types added later than 8.4.
2683 : */
2684 0 : if (strcmp(te->desc, "COMMENT") == 0 ||
2685 0 : strcmp(te->desc, "ACL") == 0 ||
2686 0 : strcmp(te->desc, "ACL LANGUAGE") == 0)
2687 0 : te->section = SECTION_NONE;
2688 0 : else if (strcmp(te->desc, "TABLE DATA") == 0 ||
2689 0 : strcmp(te->desc, "BLOBS") == 0 ||
2690 0 : strcmp(te->desc, "BLOB COMMENTS") == 0)
2691 0 : te->section = SECTION_DATA;
2692 0 : else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
2693 0 : strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
2694 0 : strcmp(te->desc, "FK CONSTRAINT") == 0 ||
2695 0 : strcmp(te->desc, "INDEX") == 0 ||
2696 0 : strcmp(te->desc, "RULE") == 0 ||
2697 0 : strcmp(te->desc, "TRIGGER") == 0)
2698 0 : te->section = SECTION_POST_DATA;
2699 : else
2700 0 : te->section = SECTION_PRE_DATA;
2701 : }
2702 :
2703 12090 : te->defn = ReadStr(AH);
2704 12090 : te->dropStmt = ReadStr(AH);
2705 :
2706 12090 : if (AH->version >= K_VERS_1_3)
2707 12090 : te->copyStmt = ReadStr(AH);
2708 :
2709 12090 : if (AH->version >= K_VERS_1_6)
2710 12090 : te->namespace = ReadStr(AH);
2711 :
2712 12090 : if (AH->version >= K_VERS_1_10)
2713 12090 : te->tablespace = ReadStr(AH);
2714 :
2715 12090 : if (AH->version >= K_VERS_1_14)
2716 12090 : te->tableam = ReadStr(AH);
2717 :
2718 12090 : if (AH->version >= K_VERS_1_16)
2719 12090 : te->relkind = ReadInt(AH);
2720 :
2721 12090 : te->owner = ReadStr(AH);
2722 12090 : is_supported = true;
2723 12090 : if (AH->version < K_VERS_1_9)
2724 0 : is_supported = false;
2725 : else
2726 : {
2727 12090 : tmp = ReadStr(AH);
2728 :
2729 12090 : if (strcmp(tmp, "true") == 0)
2730 0 : is_supported = false;
2731 :
2732 12090 : free(tmp);
2733 : }
2734 :
2735 12090 : if (!is_supported)
2736 0 : pg_log_warning("restoring tables WITH OIDS is not supported anymore");
2737 :
2738 : /* Read TOC entry dependencies */
2739 12090 : if (AH->version >= K_VERS_1_5)
2740 : {
2741 12090 : depSize = 100;
2742 12090 : deps = (DumpId *) pg_malloc(sizeof(DumpId) * depSize);
2743 12090 : depIdx = 0;
2744 : for (;;)
2745 : {
2746 29976 : tmp = ReadStr(AH);
2747 29976 : if (!tmp)
2748 12090 : break; /* end of list */
2749 17886 : if (depIdx >= depSize)
2750 : {
2751 0 : depSize *= 2;
2752 0 : deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depSize);
2753 : }
2754 17886 : sscanf(tmp, "%d", &deps[depIdx]);
2755 17886 : free(tmp);
2756 17886 : depIdx++;
2757 : }
2758 :
2759 12090 : if (depIdx > 0) /* We have a non-null entry */
2760 : {
2761 9676 : deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depIdx);
2762 9676 : te->dependencies = deps;
2763 9676 : te->nDeps = depIdx;
2764 : }
2765 : else
2766 : {
2767 2414 : free(deps);
2768 2414 : te->dependencies = NULL;
2769 2414 : te->nDeps = 0;
2770 : }
2771 : }
2772 : else
2773 : {
2774 0 : te->dependencies = NULL;
2775 0 : te->nDeps = 0;
2776 : }
2777 12090 : te->dataLength = 0;
2778 :
2779 12090 : if (AH->ReadExtraTocPtr)
2780 12090 : AH->ReadExtraTocPtr(AH, te);
2781 :
2782 12090 : pg_log_debug("read TOC entry %d (ID %d) for %s %s",
2783 : i, te->dumpId, te->desc, te->tag);
2784 :
2785 : /* link completed entry into TOC circular list */
2786 12090 : te->prev = AH->toc->prev;
2787 12090 : AH->toc->prev->next = te;
2788 12090 : AH->toc->prev = te;
2789 12090 : te->next = AH->toc;
2790 :
2791 : /* special processing immediately upon read for some items */
2792 12090 : if (strcmp(te->desc, "ENCODING") == 0)
2793 76 : processEncodingEntry(AH, te);
2794 12014 : else if (strcmp(te->desc, "STDSTRINGS") == 0)
2795 76 : processStdStringsEntry(AH, te);
2796 11938 : else if (strcmp(te->desc, "SEARCHPATH") == 0)
2797 76 : processSearchPathEntry(AH, te);
2798 : }
2799 76 : }
2800 :
2801 : static void
2802 76 : processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
2803 : {
2804 : /* te->defn should have the form SET client_encoding = 'foo'; */
2805 76 : char *defn = pg_strdup(te->defn);
2806 : char *ptr1;
2807 76 : char *ptr2 = NULL;
2808 : int encoding;
2809 :
2810 76 : ptr1 = strchr(defn, '\'');
2811 76 : if (ptr1)
2812 76 : ptr2 = strchr(++ptr1, '\'');
2813 76 : if (ptr2)
2814 : {
2815 76 : *ptr2 = '\0';
2816 76 : encoding = pg_char_to_encoding(ptr1);
2817 76 : if (encoding < 0)
2818 0 : pg_fatal("unrecognized encoding \"%s\"",
2819 : ptr1);
2820 76 : AH->public.encoding = encoding;
2821 : }
2822 : else
2823 0 : pg_fatal("invalid ENCODING item: %s",
2824 : te->defn);
2825 :
2826 76 : free(defn);
2827 76 : }
2828 :
2829 : static void
2830 76 : processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
2831 : {
2832 : /* te->defn should have the form SET standard_conforming_strings = 'x'; */
2833 : char *ptr1;
2834 :
2835 76 : ptr1 = strchr(te->defn, '\'');
2836 76 : if (ptr1 && strncmp(ptr1, "'on'", 4) == 0)
2837 76 : AH->public.std_strings = true;
2838 0 : else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0)
2839 0 : AH->public.std_strings = false;
2840 : else
2841 0 : pg_fatal("invalid STDSTRINGS item: %s",
2842 : te->defn);
2843 76 : }
2844 :
2845 : static void
2846 76 : processSearchPathEntry(ArchiveHandle *AH, TocEntry *te)
2847 : {
2848 : /*
2849 : * te->defn should contain a command to set search_path. We just copy it
2850 : * verbatim for use later.
2851 : */
2852 76 : AH->public.searchpath = pg_strdup(te->defn);
2853 76 : }
2854 :
2855 : static void
2856 0 : StrictNamesCheck(RestoreOptions *ropt)
2857 : {
2858 : const char *missing_name;
2859 :
2860 : Assert(ropt->strict_names);
2861 :
2862 0 : if (ropt->schemaNames.head != NULL)
2863 : {
2864 0 : missing_name = simple_string_list_not_touched(&ropt->schemaNames);
2865 0 : if (missing_name != NULL)
2866 0 : pg_fatal("schema \"%s\" not found", missing_name);
2867 : }
2868 :
2869 0 : if (ropt->tableNames.head != NULL)
2870 : {
2871 0 : missing_name = simple_string_list_not_touched(&ropt->tableNames);
2872 0 : if (missing_name != NULL)
2873 0 : pg_fatal("table \"%s\" not found", missing_name);
2874 : }
2875 :
2876 0 : if (ropt->indexNames.head != NULL)
2877 : {
2878 0 : missing_name = simple_string_list_not_touched(&ropt->indexNames);
2879 0 : if (missing_name != NULL)
2880 0 : pg_fatal("index \"%s\" not found", missing_name);
2881 : }
2882 :
2883 0 : if (ropt->functionNames.head != NULL)
2884 : {
2885 0 : missing_name = simple_string_list_not_touched(&ropt->functionNames);
2886 0 : if (missing_name != NULL)
2887 0 : pg_fatal("function \"%s\" not found", missing_name);
2888 : }
2889 :
2890 0 : if (ropt->triggerNames.head != NULL)
2891 : {
2892 0 : missing_name = simple_string_list_not_touched(&ropt->triggerNames);
2893 0 : if (missing_name != NULL)
2894 0 : pg_fatal("trigger \"%s\" not found", missing_name);
2895 : }
2896 0 : }
2897 :
2898 : /*
2899 : * Determine whether we want to restore this TOC entry.
2900 : *
2901 : * Returns 0 if entry should be skipped, or some combination of the
2902 : * REQ_SCHEMA and REQ_DATA bits if we want to restore schema and/or data
2903 : * portions of this TOC entry, or REQ_SPECIAL if it's a special entry.
2904 : */
2905 : static int
2906 83110 : _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH)
2907 : {
2908 83110 : int res = REQ_SCHEMA | REQ_DATA;
2909 83110 : RestoreOptions *ropt = AH->public.ropt;
2910 :
2911 : /* These items are treated specially */
2912 83110 : if (strcmp(te->desc, "ENCODING") == 0 ||
2913 82726 : strcmp(te->desc, "STDSTRINGS") == 0 ||
2914 82342 : strcmp(te->desc, "SEARCHPATH") == 0)
2915 1152 : return REQ_SPECIAL;
2916 :
2917 : /*
2918 : * DATABASE and DATABASE PROPERTIES also have a special rule: they are
2919 : * restored in createDB mode, and not restored otherwise, independently of
2920 : * all else.
2921 : */
2922 81958 : if (strcmp(te->desc, "DATABASE") == 0 ||
2923 81762 : strcmp(te->desc, "DATABASE PROPERTIES") == 0)
2924 : {
2925 252 : if (ropt->createDB)
2926 196 : return REQ_SCHEMA;
2927 : else
2928 56 : return 0;
2929 : }
2930 :
2931 : /*
2932 : * Process exclusions that affect certain classes of TOC entries.
2933 : */
2934 :
2935 : /* If it's an ACL, maybe ignore it */
2936 81706 : if (ropt->aclsSkip && _tocEntryIsACL(te))
2937 0 : return 0;
2938 :
2939 : /* If it's a comment, maybe ignore it */
2940 81706 : if (ropt->no_comments && strcmp(te->desc, "COMMENT") == 0)
2941 0 : return 0;
2942 :
2943 : /*
2944 : * If it's a publication or a table part of a publication, maybe ignore
2945 : * it.
2946 : */
2947 81706 : if (ropt->no_publications &&
2948 0 : (strcmp(te->desc, "PUBLICATION") == 0 ||
2949 0 : strcmp(te->desc, "PUBLICATION TABLE") == 0 ||
2950 0 : strcmp(te->desc, "PUBLICATION TABLES IN SCHEMA") == 0))
2951 0 : return 0;
2952 :
2953 : /* If it's a security label, maybe ignore it */
2954 81706 : if (ropt->no_security_labels && strcmp(te->desc, "SECURITY LABEL") == 0)
2955 0 : return 0;
2956 :
2957 : /* If it's a subscription, maybe ignore it */
2958 81706 : if (ropt->no_subscriptions && strcmp(te->desc, "SUBSCRIPTION") == 0)
2959 0 : return 0;
2960 :
2961 : /* Ignore it if section is not to be dumped/restored */
2962 81706 : switch (curSection)
2963 : {
2964 58518 : case SECTION_PRE_DATA:
2965 58518 : if (!(ropt->dumpSections & DUMP_PRE_DATA))
2966 700 : return 0;
2967 57818 : break;
2968 9656 : case SECTION_DATA:
2969 9656 : if (!(ropt->dumpSections & DUMP_DATA))
2970 172 : return 0;
2971 9484 : break;
2972 13532 : case SECTION_POST_DATA:
2973 13532 : if (!(ropt->dumpSections & DUMP_POST_DATA))
2974 312 : return 0;
2975 13220 : break;
2976 0 : default:
2977 : /* shouldn't get here, really, but ignore it */
2978 0 : return 0;
2979 : }
2980 :
2981 : /* Ignore it if rejected by idWanted[] (cf. SortTocFromFile) */
2982 80522 : if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1])
2983 0 : return 0;
2984 :
2985 : /*
2986 : * Check options for selective dump/restore.
2987 : */
2988 80522 : if (strcmp(te->desc, "ACL") == 0 ||
2989 75686 : strcmp(te->desc, "COMMENT") == 0 ||
2990 62496 : strcmp(te->desc, "SECURITY LABEL") == 0)
2991 : {
2992 : /* Database properties react to createDB, not selectivity options. */
2993 18026 : if (strncmp(te->tag, "DATABASE ", 9) == 0)
2994 : {
2995 118 : if (!ropt->createDB)
2996 38 : return 0;
2997 : }
2998 17908 : else if (ropt->schemaNames.head != NULL ||
2999 17908 : ropt->schemaExcludeNames.head != NULL ||
3000 17908 : ropt->selTypes)
3001 : {
3002 : /*
3003 : * In a selective dump/restore, we want to restore these dependent
3004 : * TOC entry types only if their parent object is being restored.
3005 : * Without selectivity options, we let through everything in the
3006 : * archive. Note there may be such entries with no parent, eg
3007 : * non-default ACLs for built-in objects. Also, we make
3008 : * per-column ACLs additionally depend on the table's ACL if any
3009 : * to ensure correct restore order, so those dependencies should
3010 : * be ignored in this check.
3011 : *
3012 : * This code depends on the parent having been marked already,
3013 : * which should be the case; if it isn't, perhaps due to
3014 : * SortTocFromFile rearrangement, skipping the dependent entry
3015 : * seems prudent anyway.
3016 : *
3017 : * Ideally we'd handle, eg, table CHECK constraints this way too.
3018 : * But it's hard to tell which of their dependencies is the one to
3019 : * consult.
3020 : */
3021 0 : bool dumpthis = false;
3022 :
3023 0 : for (int i = 0; i < te->nDeps; i++)
3024 : {
3025 0 : TocEntry *pte = getTocEntryByDumpId(AH, te->dependencies[i]);
3026 :
3027 0 : if (!pte)
3028 0 : continue; /* probably shouldn't happen */
3029 0 : if (strcmp(pte->desc, "ACL") == 0)
3030 0 : continue; /* ignore dependency on another ACL */
3031 0 : if (pte->reqs == 0)
3032 0 : continue; /* this object isn't marked, so ignore it */
3033 : /* Found a parent to be dumped, so we want to dump this too */
3034 0 : dumpthis = true;
3035 0 : break;
3036 : }
3037 0 : if (!dumpthis)
3038 0 : return 0;
3039 : }
3040 : }
3041 : else
3042 : {
3043 : /* Apply selective-restore rules for standalone TOC entries. */
3044 62496 : if (ropt->schemaNames.head != NULL)
3045 : {
3046 : /* If no namespace is specified, it means all. */
3047 40 : if (!te->namespace)
3048 4 : return 0;
3049 36 : if (!simple_string_list_member(&ropt->schemaNames, te->namespace))
3050 28 : return 0;
3051 : }
3052 :
3053 62464 : if (ropt->schemaExcludeNames.head != NULL &&
3054 76 : te->namespace &&
3055 36 : simple_string_list_member(&ropt->schemaExcludeNames, te->namespace))
3056 8 : return 0;
3057 :
3058 62456 : if (ropt->selTypes)
3059 : {
3060 156 : if (strcmp(te->desc, "TABLE") == 0 ||
3061 116 : strcmp(te->desc, "TABLE DATA") == 0 ||
3062 76 : strcmp(te->desc, "VIEW") == 0 ||
3063 76 : strcmp(te->desc, "FOREIGN TABLE") == 0 ||
3064 76 : strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
3065 76 : strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0 ||
3066 76 : strcmp(te->desc, "SEQUENCE") == 0 ||
3067 70 : strcmp(te->desc, "SEQUENCE SET") == 0)
3068 : {
3069 92 : if (!ropt->selTable)
3070 60 : return 0;
3071 32 : if (ropt->tableNames.head != NULL &&
3072 32 : !simple_string_list_member(&ropt->tableNames, te->tag))
3073 28 : return 0;
3074 : }
3075 64 : else if (strcmp(te->desc, "INDEX") == 0)
3076 : {
3077 12 : if (!ropt->selIndex)
3078 8 : return 0;
3079 4 : if (ropt->indexNames.head != NULL &&
3080 4 : !simple_string_list_member(&ropt->indexNames, te->tag))
3081 2 : return 0;
3082 : }
3083 52 : else if (strcmp(te->desc, "FUNCTION") == 0 ||
3084 28 : strcmp(te->desc, "AGGREGATE") == 0 ||
3085 28 : strcmp(te->desc, "PROCEDURE") == 0)
3086 : {
3087 24 : if (!ropt->selFunction)
3088 8 : return 0;
3089 16 : if (ropt->functionNames.head != NULL &&
3090 16 : !simple_string_list_member(&ropt->functionNames, te->tag))
3091 12 : return 0;
3092 : }
3093 28 : else if (strcmp(te->desc, "TRIGGER") == 0)
3094 : {
3095 12 : if (!ropt->selTrigger)
3096 8 : return 0;
3097 4 : if (ropt->triggerNames.head != NULL &&
3098 4 : !simple_string_list_member(&ropt->triggerNames, te->tag))
3099 2 : return 0;
3100 : }
3101 : else
3102 16 : return 0;
3103 : }
3104 : }
3105 :
3106 : /*
3107 : * Determine whether the TOC entry contains schema and/or data components,
3108 : * and mask off inapplicable REQ bits. If it had a dataDumper, assume
3109 : * it's both schema and data. Otherwise it's probably schema-only, but
3110 : * there are exceptions.
3111 : */
3112 80300 : if (!te->hadDumper)
3113 : {
3114 : /*
3115 : * Special Case: If 'SEQUENCE SET' or anything to do with LOs, then it
3116 : * is considered a data entry. We don't need to check for BLOBS or
3117 : * old-style BLOB COMMENTS entries, because they will have hadDumper =
3118 : * true ... but we do need to check new-style BLOB ACLs, comments,
3119 : * etc.
3120 : */
3121 72160 : if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
3122 71264 : strcmp(te->desc, "BLOB") == 0 ||
3123 71264 : strcmp(te->desc, "BLOB METADATA") == 0 ||
3124 71072 : (strcmp(te->desc, "ACL") == 0 &&
3125 4836 : strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3126 70984 : (strcmp(te->desc, "COMMENT") == 0 &&
3127 13152 : strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3128 70870 : (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3129 0 : strncmp(te->tag, "LARGE OBJECT", 12) == 0))
3130 1290 : res = res & REQ_DATA;
3131 : else
3132 70870 : res = res & ~REQ_DATA;
3133 : }
3134 :
3135 : /*
3136 : * If there's no definition command, there's no schema component. Treat
3137 : * "load via partition root" comments as not schema.
3138 : */
3139 80300 : if (!te->defn || !te->defn[0] ||
3140 72184 : strncmp(te->defn, "-- load via partition root ", 27) == 0)
3141 8140 : res = res & ~REQ_SCHEMA;
3142 :
3143 : /*
3144 : * Special case: <Init> type with <Max OID> tag; this is obsolete and we
3145 : * always ignore it.
3146 : */
3147 80300 : if ((strcmp(te->desc, "<Init>") == 0) && (strcmp(te->tag, "Max OID") == 0))
3148 0 : return 0;
3149 :
3150 : /* Mask it if we only want schema */
3151 80300 : if (ropt->schemaOnly)
3152 : {
3153 : /*
3154 : * The sequence_data option overrides schemaOnly for SEQUENCE SET.
3155 : *
3156 : * In binary-upgrade mode, even with schemaOnly set, we do not mask
3157 : * out large objects. (Only large object definitions, comments and
3158 : * other metadata should be generated in binary-upgrade mode, not the
3159 : * actual data, but that need not concern us here.)
3160 : */
3161 5012 : if (!(ropt->sequence_data && strcmp(te->desc, "SEQUENCE SET") == 0) &&
3162 4900 : !(ropt->binary_upgrade &&
3163 4404 : (strcmp(te->desc, "BLOB") == 0 ||
3164 4404 : strcmp(te->desc, "BLOB METADATA") == 0 ||
3165 4398 : (strcmp(te->desc, "ACL") == 0 &&
3166 164 : strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3167 4396 : (strcmp(te->desc, "COMMENT") == 0 &&
3168 108 : strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
3169 4390 : (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3170 0 : strncmp(te->tag, "LARGE OBJECT", 12) == 0))))
3171 4886 : res = res & REQ_SCHEMA;
3172 : }
3173 :
3174 : /* Mask it if we only want data */
3175 80300 : if (ropt->dataOnly)
3176 290 : res = res & REQ_DATA;
3177 :
3178 80300 : return res;
3179 : }
3180 :
3181 : /*
3182 : * Identify which pass we should restore this TOC entry in.
3183 : *
3184 : * See notes with the RestorePass typedef in pg_backup_archiver.h.
3185 : */
3186 : static RestorePass
3187 178356 : _tocEntryRestorePass(TocEntry *te)
3188 : {
3189 : /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3190 178356 : if (strcmp(te->desc, "ACL") == 0 ||
3191 168210 : strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3192 168210 : strcmp(te->desc, "DEFAULT ACL") == 0)
3193 10878 : return RESTORE_PASS_ACL;
3194 167478 : if (strcmp(te->desc, "EVENT TRIGGER") == 0 ||
3195 167264 : strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0)
3196 1652 : return RESTORE_PASS_POST_ACL;
3197 :
3198 : /*
3199 : * Comments need to be emitted in the same pass as their parent objects.
3200 : * ACLs haven't got comments, and neither do matview data objects, but
3201 : * event triggers do. (Fortunately, event triggers haven't got ACLs, or
3202 : * we'd need yet another weird special case.)
3203 : */
3204 165826 : if (strcmp(te->desc, "COMMENT") == 0 &&
3205 26434 : strncmp(te->tag, "EVENT TRIGGER ", 14) == 0)
3206 0 : return RESTORE_PASS_POST_ACL;
3207 :
3208 : /* All else can be handled in the main pass. */
3209 165826 : return RESTORE_PASS_MAIN;
3210 : }
3211 :
3212 : /*
3213 : * Identify TOC entries that are ACLs.
3214 : *
3215 : * Note: it seems worth duplicating some code here to avoid a hard-wired
3216 : * assumption that these are exactly the same entries that we restore during
3217 : * the RESTORE_PASS_ACL phase.
3218 : */
3219 : static bool
3220 69818 : _tocEntryIsACL(TocEntry *te)
3221 : {
3222 : /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3223 69818 : if (strcmp(te->desc, "ACL") == 0 ||
3224 66286 : strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3225 66286 : strcmp(te->desc, "DEFAULT ACL") == 0)
3226 3776 : return true;
3227 66042 : return false;
3228 : }
3229 :
3230 : /*
3231 : * Issue SET commands for parameters that we want to have set the same way
3232 : * at all times during execution of a restore script.
3233 : */
3234 : static void
3235 458 : _doSetFixedOutputState(ArchiveHandle *AH)
3236 : {
3237 458 : RestoreOptions *ropt = AH->public.ropt;
3238 :
3239 : /*
3240 : * Disable timeouts to allow for slow commands, idle parallel workers, etc
3241 : */
3242 458 : ahprintf(AH, "SET statement_timeout = 0;\n");
3243 458 : ahprintf(AH, "SET lock_timeout = 0;\n");
3244 458 : ahprintf(AH, "SET idle_in_transaction_session_timeout = 0;\n");
3245 458 : ahprintf(AH, "SET transaction_timeout = 0;\n");
3246 :
3247 : /* Select the correct character set encoding */
3248 458 : ahprintf(AH, "SET client_encoding = '%s';\n",
3249 : pg_encoding_to_char(AH->public.encoding));
3250 :
3251 : /* Select the correct string literal syntax */
3252 458 : ahprintf(AH, "SET standard_conforming_strings = %s;\n",
3253 458 : AH->public.std_strings ? "on" : "off");
3254 :
3255 : /* Select the role to be used during restore */
3256 458 : if (ropt && ropt->use_role)
3257 0 : ahprintf(AH, "SET ROLE %s;\n", fmtId(ropt->use_role));
3258 :
3259 : /* Select the dump-time search_path */
3260 458 : if (AH->public.searchpath)
3261 458 : ahprintf(AH, "%s", AH->public.searchpath);
3262 :
3263 : /* Make sure function checking is disabled */
3264 458 : ahprintf(AH, "SET check_function_bodies = false;\n");
3265 :
3266 : /* Ensure that all valid XML data will be accepted */
3267 458 : ahprintf(AH, "SET xmloption = content;\n");
3268 :
3269 : /* Avoid annoying notices etc */
3270 458 : ahprintf(AH, "SET client_min_messages = warning;\n");
3271 458 : if (!AH->public.std_strings)
3272 0 : ahprintf(AH, "SET escape_string_warning = off;\n");
3273 :
3274 : /* Adjust row-security state */
3275 458 : if (ropt && ropt->enable_row_security)
3276 0 : ahprintf(AH, "SET row_security = on;\n");
3277 : else
3278 458 : ahprintf(AH, "SET row_security = off;\n");
3279 :
3280 : /*
3281 : * In --transaction-size mode, we should always be in a transaction when
3282 : * we begin to restore objects.
3283 : */
3284 458 : if (ropt && ropt->txn_size > 0)
3285 : {
3286 60 : if (AH->connection)
3287 60 : StartTransaction(&AH->public);
3288 : else
3289 0 : ahprintf(AH, "\nBEGIN;\n");
3290 60 : AH->txnCount = 0;
3291 : }
3292 :
3293 458 : ahprintf(AH, "\n");
3294 458 : }
3295 :
3296 : /*
3297 : * Issue a SET SESSION AUTHORIZATION command. Caller is responsible
3298 : * for updating state if appropriate. If user is NULL or an empty string,
3299 : * the specification DEFAULT will be used.
3300 : */
3301 : static void
3302 2 : _doSetSessionAuth(ArchiveHandle *AH, const char *user)
3303 : {
3304 2 : PQExpBuffer cmd = createPQExpBuffer();
3305 :
3306 2 : appendPQExpBufferStr(cmd, "SET SESSION AUTHORIZATION ");
3307 :
3308 : /*
3309 : * SQL requires a string literal here. Might as well be correct.
3310 : */
3311 2 : if (user && *user)
3312 2 : appendStringLiteralAHX(cmd, user, AH);
3313 : else
3314 0 : appendPQExpBufferStr(cmd, "DEFAULT");
3315 2 : appendPQExpBufferChar(cmd, ';');
3316 :
3317 2 : if (RestoringToDB(AH))
3318 : {
3319 : PGresult *res;
3320 :
3321 0 : res = PQexec(AH->connection, cmd->data);
3322 :
3323 0 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3324 : /* NOT warn_or_exit_horribly... use -O instead to skip this. */
3325 0 : pg_fatal("could not set session user to \"%s\": %s",
3326 : user, PQerrorMessage(AH->connection));
3327 :
3328 0 : PQclear(res);
3329 : }
3330 : else
3331 2 : ahprintf(AH, "%s\n\n", cmd->data);
3332 :
3333 2 : destroyPQExpBuffer(cmd);
3334 2 : }
3335 :
3336 :
3337 : /*
3338 : * Issue the commands to connect to the specified database.
3339 : *
3340 : * If we're currently restoring right into a database, this will
3341 : * actually establish a connection. Otherwise it puts a \connect into
3342 : * the script output.
3343 : */
3344 : static void
3345 112 : _reconnectToDB(ArchiveHandle *AH, const char *dbname)
3346 : {
3347 112 : if (RestoringToDB(AH))
3348 42 : ReconnectToServer(AH, dbname);
3349 : else
3350 : {
3351 : PQExpBufferData connectbuf;
3352 :
3353 70 : initPQExpBuffer(&connectbuf);
3354 70 : appendPsqlMetaConnect(&connectbuf, dbname);
3355 70 : ahprintf(AH, "%s\n", connectbuf.data);
3356 70 : termPQExpBuffer(&connectbuf);
3357 : }
3358 :
3359 : /*
3360 : * NOTE: currUser keeps track of what the imaginary session user in our
3361 : * script is. It's now effectively reset to the original userID.
3362 : */
3363 112 : free(AH->currUser);
3364 112 : AH->currUser = NULL;
3365 :
3366 : /* don't assume we still know the output schema, tablespace, etc either */
3367 112 : free(AH->currSchema);
3368 112 : AH->currSchema = NULL;
3369 :
3370 112 : free(AH->currTableAm);
3371 112 : AH->currTableAm = NULL;
3372 :
3373 112 : free(AH->currTablespace);
3374 112 : AH->currTablespace = NULL;
3375 :
3376 : /* re-establish fixed state */
3377 112 : _doSetFixedOutputState(AH);
3378 112 : }
3379 :
3380 : /*
3381 : * Become the specified user, and update state to avoid redundant commands
3382 : *
3383 : * NULL or empty argument is taken to mean restoring the session default
3384 : */
3385 : static void
3386 120 : _becomeUser(ArchiveHandle *AH, const char *user)
3387 : {
3388 120 : if (!user)
3389 0 : user = ""; /* avoid null pointers */
3390 :
3391 120 : if (AH->currUser && strcmp(AH->currUser, user) == 0)
3392 118 : return; /* no need to do anything */
3393 :
3394 2 : _doSetSessionAuth(AH, user);
3395 :
3396 : /*
3397 : * NOTE: currUser keeps track of what the imaginary session user in our
3398 : * script is
3399 : */
3400 2 : free(AH->currUser);
3401 2 : AH->currUser = pg_strdup(user);
3402 : }
3403 :
3404 : /*
3405 : * Become the owner of the given TOC entry object. If
3406 : * changes in ownership are not allowed, this doesn't do anything.
3407 : */
3408 : static void
3409 77146 : _becomeOwner(ArchiveHandle *AH, TocEntry *te)
3410 : {
3411 77146 : RestoreOptions *ropt = AH->public.ropt;
3412 :
3413 77146 : if (ropt && (ropt->noOwner || !ropt->use_setsessauth))
3414 77146 : return;
3415 :
3416 0 : _becomeUser(AH, te->owner);
3417 : }
3418 :
3419 :
3420 : /*
3421 : * Issue the commands to select the specified schema as the current schema
3422 : * in the target database.
3423 : */
3424 : static void
3425 77282 : _selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
3426 : {
3427 : PQExpBuffer qry;
3428 :
3429 : /*
3430 : * If there was a SEARCHPATH TOC entry, we're supposed to just stay with
3431 : * that search_path rather than switching to entry-specific paths.
3432 : * Otherwise, it's an old archive that will not restore correctly unless
3433 : * we set the search_path as it's expecting.
3434 : */
3435 77282 : if (AH->public.searchpath)
3436 77282 : return;
3437 :
3438 0 : if (!schemaName || *schemaName == '\0' ||
3439 0 : (AH->currSchema && strcmp(AH->currSchema, schemaName) == 0))
3440 0 : return; /* no need to do anything */
3441 :
3442 0 : qry = createPQExpBuffer();
3443 :
3444 0 : appendPQExpBuffer(qry, "SET search_path = %s",
3445 : fmtId(schemaName));
3446 0 : if (strcmp(schemaName, "pg_catalog") != 0)
3447 0 : appendPQExpBufferStr(qry, ", pg_catalog");
3448 :
3449 0 : if (RestoringToDB(AH))
3450 : {
3451 : PGresult *res;
3452 :
3453 0 : res = PQexec(AH->connection, qry->data);
3454 :
3455 0 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3456 0 : warn_or_exit_horribly(AH,
3457 : "could not set \"search_path\" to \"%s\": %s",
3458 0 : schemaName, PQerrorMessage(AH->connection));
3459 :
3460 0 : PQclear(res);
3461 : }
3462 : else
3463 0 : ahprintf(AH, "%s;\n\n", qry->data);
3464 :
3465 0 : free(AH->currSchema);
3466 0 : AH->currSchema = pg_strdup(schemaName);
3467 :
3468 0 : destroyPQExpBuffer(qry);
3469 : }
3470 :
3471 : /*
3472 : * Issue the commands to select the specified tablespace as the current one
3473 : * in the target database.
3474 : */
3475 : static void
3476 69348 : _selectTablespace(ArchiveHandle *AH, const char *tablespace)
3477 : {
3478 69348 : RestoreOptions *ropt = AH->public.ropt;
3479 : PQExpBuffer qry;
3480 : const char *want,
3481 : *have;
3482 :
3483 : /* do nothing in --no-tablespaces mode */
3484 69348 : if (ropt->noTablespace)
3485 0 : return;
3486 :
3487 69348 : have = AH->currTablespace;
3488 69348 : want = tablespace;
3489 :
3490 : /* no need to do anything for non-tablespace object */
3491 69348 : if (!want)
3492 54774 : return;
3493 :
3494 14574 : if (have && strcmp(want, have) == 0)
3495 14154 : return; /* no need to do anything */
3496 :
3497 420 : qry = createPQExpBuffer();
3498 :
3499 420 : if (strcmp(want, "") == 0)
3500 : {
3501 : /* We want the tablespace to be the database's default */
3502 312 : appendPQExpBufferStr(qry, "SET default_tablespace = ''");
3503 : }
3504 : else
3505 : {
3506 : /* We want an explicit tablespace */
3507 108 : appendPQExpBuffer(qry, "SET default_tablespace = %s", fmtId(want));
3508 : }
3509 :
3510 420 : if (RestoringToDB(AH))
3511 : {
3512 : PGresult *res;
3513 :
3514 26 : res = PQexec(AH->connection, qry->data);
3515 :
3516 26 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3517 0 : warn_or_exit_horribly(AH,
3518 : "could not set \"default_tablespace\" to %s: %s",
3519 0 : fmtId(want), PQerrorMessage(AH->connection));
3520 :
3521 26 : PQclear(res);
3522 : }
3523 : else
3524 394 : ahprintf(AH, "%s;\n\n", qry->data);
3525 :
3526 420 : free(AH->currTablespace);
3527 420 : AH->currTablespace = pg_strdup(want);
3528 :
3529 420 : destroyPQExpBuffer(qry);
3530 : }
3531 :
3532 : /*
3533 : * Set the proper default_table_access_method value for the table.
3534 : */
3535 : static void
3536 68318 : _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam)
3537 : {
3538 68318 : RestoreOptions *ropt = AH->public.ropt;
3539 : PQExpBuffer cmd;
3540 : const char *want,
3541 : *have;
3542 :
3543 : /* do nothing in --no-table-access-method mode */
3544 68318 : if (ropt->noTableAm)
3545 554 : return;
3546 :
3547 67764 : have = AH->currTableAm;
3548 67764 : want = tableam;
3549 :
3550 67764 : if (!want)
3551 58886 : return;
3552 :
3553 8878 : if (have && strcmp(want, have) == 0)
3554 8280 : return;
3555 :
3556 598 : cmd = createPQExpBuffer();
3557 598 : appendPQExpBuffer(cmd, "SET default_table_access_method = %s;", fmtId(want));
3558 :
3559 598 : if (RestoringToDB(AH))
3560 : {
3561 : PGresult *res;
3562 :
3563 22 : res = PQexec(AH->connection, cmd->data);
3564 :
3565 22 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3566 0 : warn_or_exit_horribly(AH,
3567 : "could not set \"default_table_access_method\": %s",
3568 0 : PQerrorMessage(AH->connection));
3569 :
3570 22 : PQclear(res);
3571 : }
3572 : else
3573 576 : ahprintf(AH, "%s\n\n", cmd->data);
3574 :
3575 598 : destroyPQExpBuffer(cmd);
3576 :
3577 598 : free(AH->currTableAm);
3578 598 : AH->currTableAm = pg_strdup(want);
3579 : }
3580 :
3581 : /*
3582 : * Set the proper default table access method for a table without storage.
3583 : * Currently, this is required only for partitioned tables with a table AM.
3584 : */
3585 : static void
3586 1030 : _printTableAccessMethodNoStorage(ArchiveHandle *AH, TocEntry *te)
3587 : {
3588 1030 : RestoreOptions *ropt = AH->public.ropt;
3589 1030 : const char *tableam = te->tableam;
3590 : PQExpBuffer cmd;
3591 :
3592 : /* do nothing in --no-table-access-method mode */
3593 1030 : if (ropt->noTableAm)
3594 6 : return;
3595 :
3596 1024 : if (!tableam)
3597 962 : return;
3598 :
3599 : Assert(te->relkind == RELKIND_PARTITIONED_TABLE);
3600 :
3601 62 : cmd = createPQExpBuffer();
3602 :
3603 62 : appendPQExpBufferStr(cmd, "ALTER TABLE ");
3604 62 : appendPQExpBuffer(cmd, "%s ", fmtQualifiedId(te->namespace, te->tag));
3605 62 : appendPQExpBuffer(cmd, "SET ACCESS METHOD %s;",
3606 : fmtId(tableam));
3607 :
3608 62 : if (RestoringToDB(AH))
3609 : {
3610 : PGresult *res;
3611 :
3612 0 : res = PQexec(AH->connection, cmd->data);
3613 :
3614 0 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3615 0 : warn_or_exit_horribly(AH,
3616 : "could not alter table access method: %s",
3617 0 : PQerrorMessage(AH->connection));
3618 0 : PQclear(res);
3619 : }
3620 : else
3621 62 : ahprintf(AH, "%s\n\n", cmd->data);
3622 :
3623 62 : destroyPQExpBuffer(cmd);
3624 : }
3625 :
3626 : /*
3627 : * Extract an object description for a TOC entry, and append it to buf.
3628 : *
3629 : * This is used for ALTER ... OWNER TO.
3630 : *
3631 : * If the object type has no owner, do nothing.
3632 : */
3633 : static void
3634 38518 : _getObjectDescription(PQExpBuffer buf, const TocEntry *te)
3635 : {
3636 38518 : const char *type = te->desc;
3637 :
3638 : /* objects that don't require special decoration */
3639 38518 : if (strcmp(type, "COLLATION") == 0 ||
3640 33626 : strcmp(type, "CONVERSION") == 0 ||
3641 32796 : strcmp(type, "DOMAIN") == 0 ||
3642 32534 : strcmp(type, "FOREIGN TABLE") == 0 ||
3643 32468 : strcmp(type, "MATERIALIZED VIEW") == 0 ||
3644 31830 : strcmp(type, "SEQUENCE") == 0 ||
3645 31428 : strcmp(type, "STATISTICS") == 0 ||
3646 31204 : strcmp(type, "TABLE") == 0 ||
3647 21944 : strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
3648 21618 : strcmp(type, "TEXT SEARCH CONFIGURATION") == 0 ||
3649 21342 : strcmp(type, "TYPE") == 0 ||
3650 20188 : strcmp(type, "VIEW") == 0 ||
3651 : /* non-schema-specified objects */
3652 19178 : strcmp(type, "DATABASE") == 0 ||
3653 19094 : strcmp(type, "PROCEDURAL LANGUAGE") == 0 ||
3654 19034 : strcmp(type, "SCHEMA") == 0 ||
3655 18720 : strcmp(type, "EVENT TRIGGER") == 0 ||
3656 18650 : strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
3657 18564 : strcmp(type, "SERVER") == 0 ||
3658 18474 : strcmp(type, "PUBLICATION") == 0 ||
3659 18172 : strcmp(type, "SUBSCRIPTION") == 0)
3660 : {
3661 20530 : appendPQExpBuffer(buf, "%s ", type);
3662 20530 : if (te->namespace && *te->namespace)
3663 19340 : appendPQExpBuffer(buf, "%s.", fmtId(te->namespace));
3664 20530 : appendPQExpBufferStr(buf, fmtId(te->tag));
3665 : }
3666 : /* LOs just have a name, but it's numeric so must not use fmtId */
3667 17988 : else if (strcmp(type, "BLOB") == 0)
3668 : {
3669 0 : appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
3670 : }
3671 :
3672 : /*
3673 : * These object types require additional decoration. Fortunately, the
3674 : * information needed is exactly what's in the DROP command.
3675 : */
3676 17988 : else if (strcmp(type, "AGGREGATE") == 0 ||
3677 17454 : strcmp(type, "FUNCTION") == 0 ||
3678 14368 : strcmp(type, "OPERATOR") == 0 ||
3679 9402 : strcmp(type, "OPERATOR CLASS") == 0 ||
3680 8124 : strcmp(type, "OPERATOR FAMILY") == 0 ||
3681 7054 : strcmp(type, "PROCEDURE") == 0)
3682 : {
3683 : /* Chop "DROP " off the front and make a modifiable copy */
3684 11116 : char *first = pg_strdup(te->dropStmt + 5);
3685 : char *last;
3686 :
3687 : /* point to last character in string */
3688 11116 : last = first + strlen(first) - 1;
3689 :
3690 : /* Strip off any ';' or '\n' at the end */
3691 33348 : while (last >= first && (*last == '\n' || *last == ';'))
3692 22232 : last--;
3693 11116 : *(last + 1) = '\0';
3694 :
3695 11116 : appendPQExpBufferStr(buf, first);
3696 :
3697 11116 : free(first);
3698 11116 : return;
3699 : }
3700 : /* these object types don't have separate owners */
3701 6872 : else if (strcmp(type, "CAST") == 0 ||
3702 6872 : strcmp(type, "CHECK CONSTRAINT") == 0 ||
3703 6822 : strcmp(type, "CONSTRAINT") == 0 ||
3704 4206 : strcmp(type, "DATABASE PROPERTIES") == 0 ||
3705 4200 : strcmp(type, "DEFAULT") == 0 ||
3706 3924 : strcmp(type, "FK CONSTRAINT") == 0 ||
3707 3600 : strcmp(type, "INDEX") == 0 ||
3708 1668 : strcmp(type, "RULE") == 0 ||
3709 1246 : strcmp(type, "TRIGGER") == 0 ||
3710 522 : strcmp(type, "ROW SECURITY") == 0 ||
3711 522 : strcmp(type, "POLICY") == 0 ||
3712 60 : strcmp(type, "USER MAPPING") == 0)
3713 : {
3714 : /* do nothing */
3715 : }
3716 : else
3717 0 : pg_fatal("don't know how to set owner for object type \"%s\"", type);
3718 : }
3719 :
3720 : /*
3721 : * Emit the SQL commands to create the object represented by a TOC entry
3722 : *
3723 : * This now also includes issuing an ALTER OWNER command to restore the
3724 : * object's ownership, if wanted. But note that the object's permissions
3725 : * will remain at default, until the matching ACL TOC entry is restored.
3726 : */
3727 : static void
3728 69348 : _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
3729 : {
3730 69348 : RestoreOptions *ropt = AH->public.ropt;
3731 :
3732 : /*
3733 : * Select owner, schema, tablespace and default AM as necessary. The
3734 : * default access method for partitioned tables is handled after
3735 : * generating the object definition, as it requires an ALTER command
3736 : * rather than SET.
3737 : */
3738 69348 : _becomeOwner(AH, te);
3739 69348 : _selectOutputSchema(AH, te->namespace);
3740 69348 : _selectTablespace(AH, te->tablespace);
3741 69348 : if (te->relkind != RELKIND_PARTITIONED_TABLE)
3742 68318 : _selectTableAccessMethod(AH, te->tableam);
3743 :
3744 : /* Emit header comment for item */
3745 69348 : if (!AH->noTocComments)
3746 : {
3747 : const char *pfx;
3748 : char *sanitized_name;
3749 : char *sanitized_schema;
3750 : char *sanitized_owner;
3751 :
3752 65224 : if (isData)
3753 7314 : pfx = "Data for ";
3754 : else
3755 57910 : pfx = "";
3756 :
3757 65224 : ahprintf(AH, "--\n");
3758 65224 : if (AH->public.verbose)
3759 : {
3760 1700 : ahprintf(AH, "-- TOC entry %d (class %u OID %u)\n",
3761 : te->dumpId, te->catalogId.tableoid, te->catalogId.oid);
3762 1700 : if (te->nDeps > 0)
3763 : {
3764 : int i;
3765 :
3766 988 : ahprintf(AH, "-- Dependencies:");
3767 2524 : for (i = 0; i < te->nDeps; i++)
3768 1536 : ahprintf(AH, " %d", te->dependencies[i]);
3769 988 : ahprintf(AH, "\n");
3770 : }
3771 : }
3772 :
3773 65224 : sanitized_name = sanitize_line(te->tag, false);
3774 65224 : sanitized_schema = sanitize_line(te->namespace, true);
3775 65224 : sanitized_owner = sanitize_line(ropt->noOwner ? NULL : te->owner, true);
3776 :
3777 65224 : ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s",
3778 : pfx, sanitized_name, te->desc, sanitized_schema,
3779 : sanitized_owner);
3780 :
3781 65224 : free(sanitized_name);
3782 65224 : free(sanitized_schema);
3783 65224 : free(sanitized_owner);
3784 :
3785 65224 : if (te->tablespace && strlen(te->tablespace) > 0 && !ropt->noTablespace)
3786 : {
3787 : char *sanitized_tablespace;
3788 :
3789 204 : sanitized_tablespace = sanitize_line(te->tablespace, false);
3790 204 : ahprintf(AH, "; Tablespace: %s", sanitized_tablespace);
3791 204 : free(sanitized_tablespace);
3792 : }
3793 65224 : ahprintf(AH, "\n");
3794 :
3795 65224 : if (AH->PrintExtraTocPtr != NULL)
3796 5820 : AH->PrintExtraTocPtr(AH, te);
3797 65224 : ahprintf(AH, "--\n\n");
3798 : }
3799 :
3800 : /*
3801 : * Actually print the definition. Normally we can just print the defn
3802 : * string if any, but we have three special cases:
3803 : *
3804 : * 1. A crude hack for suppressing AUTHORIZATION clause that old pg_dump
3805 : * versions put into CREATE SCHEMA. Don't mutate the variant for schema
3806 : * "public" that is a comment. We have to do this when --no-owner mode is
3807 : * selected. This is ugly, but I see no other good way ...
3808 : *
3809 : * 2. BLOB METADATA entries need special processing since their defn
3810 : * strings are just lists of OIDs, not complete SQL commands.
3811 : *
3812 : * 3. ACL LARGE OBJECTS entries need special processing because they
3813 : * contain only one copy of the ACL GRANT/REVOKE commands, which we must
3814 : * apply to each large object listed in the associated BLOB METADATA.
3815 : */
3816 69348 : if (ropt->noOwner &&
3817 592 : strcmp(te->desc, "SCHEMA") == 0 && strncmp(te->defn, "--", 2) != 0)
3818 : {
3819 4 : ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
3820 : }
3821 69344 : else if (strcmp(te->desc, "BLOB METADATA") == 0)
3822 : {
3823 142 : IssueCommandPerBlob(AH, te, "SELECT pg_catalog.lo_create('", "')");
3824 : }
3825 69202 : else if (strcmp(te->desc, "ACL") == 0 &&
3826 3532 : strncmp(te->tag, "LARGE OBJECTS", 13) == 0)
3827 : {
3828 0 : IssueACLPerBlob(AH, te);
3829 : }
3830 69202 : else if (te->defn && strlen(te->defn) > 0)
3831 : {
3832 61868 : ahprintf(AH, "%s\n\n", te->defn);
3833 :
3834 : /*
3835 : * If the defn string contains multiple SQL commands, txn_size mode
3836 : * should count it as N actions not one. But rather than build a full
3837 : * SQL parser, approximate this by counting semicolons. One case
3838 : * where that tends to be badly fooled is function definitions, so
3839 : * ignore them. (restore_toc_entry will count one action anyway.)
3840 : */
3841 61868 : if (ropt->txn_size > 0 &&
3842 3960 : strcmp(te->desc, "FUNCTION") != 0 &&
3843 3438 : strcmp(te->desc, "PROCEDURE") != 0)
3844 : {
3845 3414 : const char *p = te->defn;
3846 3414 : int nsemis = 0;
3847 :
3848 19024 : while ((p = strchr(p, ';')) != NULL)
3849 : {
3850 15610 : nsemis++;
3851 15610 : p++;
3852 : }
3853 3414 : if (nsemis > 1)
3854 2214 : AH->txnCount += nsemis - 1;
3855 : }
3856 : }
3857 :
3858 : /*
3859 : * If we aren't using SET SESSION AUTH to determine ownership, we must
3860 : * instead issue an ALTER OWNER command. Schema "public" is special; when
3861 : * a dump emits a comment in lieu of creating it, we use ALTER OWNER even
3862 : * when using SET SESSION for all other objects. We assume that anything
3863 : * without a DROP command is not a separately ownable object.
3864 : */
3865 69348 : if (!ropt->noOwner &&
3866 68756 : (!ropt->use_setsessauth ||
3867 0 : (strcmp(te->desc, "SCHEMA") == 0 &&
3868 0 : strncmp(te->defn, "--", 2) == 0)) &&
3869 68756 : te->owner && strlen(te->owner) > 0 &&
3870 68028 : te->dropStmt && strlen(te->dropStmt) > 0)
3871 : {
3872 38656 : if (strcmp(te->desc, "BLOB METADATA") == 0)
3873 : {
3874 : /* BLOB METADATA needs special code to handle multiple LOs */
3875 138 : char *cmdEnd = psprintf(" OWNER TO %s", fmtId(te->owner));
3876 :
3877 138 : IssueCommandPerBlob(AH, te, "ALTER LARGE OBJECT ", cmdEnd);
3878 138 : pg_free(cmdEnd);
3879 : }
3880 : else
3881 : {
3882 : /* For all other cases, we can use _getObjectDescription */
3883 : PQExpBufferData temp;
3884 :
3885 38518 : initPQExpBuffer(&temp);
3886 38518 : _getObjectDescription(&temp, te);
3887 :
3888 : /*
3889 : * If _getObjectDescription() didn't fill the buffer, then there
3890 : * is no owner.
3891 : */
3892 38518 : if (temp.data[0])
3893 31646 : ahprintf(AH, "ALTER %s OWNER TO %s;\n\n",
3894 31646 : temp.data, fmtId(te->owner));
3895 38518 : termPQExpBuffer(&temp);
3896 : }
3897 : }
3898 :
3899 : /*
3900 : * Select a partitioned table's default AM, once the table definition has
3901 : * been generated.
3902 : */
3903 69348 : if (te->relkind == RELKIND_PARTITIONED_TABLE)
3904 1030 : _printTableAccessMethodNoStorage(AH, te);
3905 :
3906 : /*
3907 : * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
3908 : * commands, so we can no longer assume we know the current auth setting.
3909 : */
3910 69348 : if (_tocEntryIsACL(te))
3911 : {
3912 3776 : free(AH->currUser);
3913 3776 : AH->currUser = NULL;
3914 : }
3915 69348 : }
3916 :
/*
 * Make a string safe for inclusion in an SQL comment or TOC listing by
 * turning every newline and carriage return into a space.
 *
 * Keeping each logical output line on one physical line protects the dump
 * from corruption; in the worst case a maliciously crafted object name
 * could otherwise turn an incautiously loaded dump into an SQL injection
 * vector.
 *
 * The result is always freshly allocated.  For a NULL input it is "-" when
 * want_hyphen is set and the empty string otherwise.
 *
 * Names are not quoted, so the name fields are not automatically parseable.
 * "pg_restore -L" doesn't care because it only examines the dumpId field,
 * but someday we might want to try harder.
 */
static char *
sanitize_line(const char *str, bool want_hyphen)
{
	char	   *copy;

	if (str == NULL)
		return pg_strdup(want_hyphen ? "-" : "");

	copy = pg_strdup(str);

	for (char *p = copy; *p != '\0'; p++)
	{
		switch (*p)
		{
			case '\n':
			case '\r':
				*p = ' ';
				break;
			default:
				break;
		}
	}

	return copy;
}
3952 :
3953 : /*
3954 : * Write the file header for a custom-format archive
3955 : */
3956 : void
3957 62 : WriteHead(ArchiveHandle *AH)
3958 : {
3959 : struct tm crtm;
3960 :
3961 62 : AH->WriteBufPtr(AH, "PGDMP", 5); /* Magic code */
3962 62 : AH->WriteBytePtr(AH, ARCHIVE_MAJOR(AH->version));
3963 62 : AH->WriteBytePtr(AH, ARCHIVE_MINOR(AH->version));
3964 62 : AH->WriteBytePtr(AH, ARCHIVE_REV(AH->version));
3965 62 : AH->WriteBytePtr(AH, AH->intSize);
3966 62 : AH->WriteBytePtr(AH, AH->offSize);
3967 62 : AH->WriteBytePtr(AH, AH->format);
3968 62 : AH->WriteBytePtr(AH, AH->compression_spec.algorithm);
3969 62 : crtm = *localtime(&AH->createDate);
3970 62 : WriteInt(AH, crtm.tm_sec);
3971 62 : WriteInt(AH, crtm.tm_min);
3972 62 : WriteInt(AH, crtm.tm_hour);
3973 62 : WriteInt(AH, crtm.tm_mday);
3974 62 : WriteInt(AH, crtm.tm_mon);
3975 62 : WriteInt(AH, crtm.tm_year);
3976 62 : WriteInt(AH, crtm.tm_isdst);
3977 62 : WriteStr(AH, PQdb(AH->connection));
3978 62 : WriteStr(AH, AH->public.remoteVersionStr);
3979 62 : WriteStr(AH, PG_VERSION);
3980 62 : }
3981 :
3982 : void
3983 76 : ReadHead(ArchiveHandle *AH)
3984 : {
3985 : char *errmsg;
3986 : char vmaj,
3987 : vmin,
3988 : vrev;
3989 : int fmt;
3990 :
3991 : /*
3992 : * If we haven't already read the header, do so.
3993 : *
3994 : * NB: this code must agree with _discoverArchiveFormat(). Maybe find a
3995 : * way to unify the cases?
3996 : */
3997 76 : if (!AH->readHeader)
3998 : {
3999 : char tmpMag[7];
4000 :
4001 76 : AH->ReadBufPtr(AH, tmpMag, 5);
4002 :
4003 76 : if (strncmp(tmpMag, "PGDMP", 5) != 0)
4004 0 : pg_fatal("did not find magic string in file header");
4005 : }
4006 :
4007 76 : vmaj = AH->ReadBytePtr(AH);
4008 76 : vmin = AH->ReadBytePtr(AH);
4009 :
4010 76 : if (vmaj > 1 || (vmaj == 1 && vmin > 0)) /* Version > 1.0 */
4011 76 : vrev = AH->ReadBytePtr(AH);
4012 : else
4013 0 : vrev = 0;
4014 :
4015 76 : AH->version = MAKE_ARCHIVE_VERSION(vmaj, vmin, vrev);
4016 :
4017 76 : if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
4018 0 : pg_fatal("unsupported version (%d.%d) in file header",
4019 : vmaj, vmin);
4020 :
4021 76 : AH->intSize = AH->ReadBytePtr(AH);
4022 76 : if (AH->intSize > 32)
4023 0 : pg_fatal("sanity check on integer size (%lu) failed",
4024 : (unsigned long) AH->intSize);
4025 :
4026 76 : if (AH->intSize > sizeof(int))
4027 0 : pg_log_warning("archive was made on a machine with larger integers, some operations might fail");
4028 :
4029 76 : if (AH->version >= K_VERS_1_7)
4030 76 : AH->offSize = AH->ReadBytePtr(AH);
4031 : else
4032 0 : AH->offSize = AH->intSize;
4033 :
4034 76 : fmt = AH->ReadBytePtr(AH);
4035 :
4036 76 : if (AH->format != fmt)
4037 0 : pg_fatal("expected format (%d) differs from format found in file (%d)",
4038 : AH->format, fmt);
4039 :
4040 76 : if (AH->version >= K_VERS_1_15)
4041 76 : AH->compression_spec.algorithm = AH->ReadBytePtr(AH);
4042 0 : else if (AH->version >= K_VERS_1_2)
4043 : {
4044 : /* Guess the compression method based on the level */
4045 0 : if (AH->version < K_VERS_1_4)
4046 0 : AH->compression_spec.level = AH->ReadBytePtr(AH);
4047 : else
4048 0 : AH->compression_spec.level = ReadInt(AH);
4049 :
4050 0 : if (AH->compression_spec.level != 0)
4051 0 : AH->compression_spec.algorithm = PG_COMPRESSION_GZIP;
4052 : }
4053 : else
4054 0 : AH->compression_spec.algorithm = PG_COMPRESSION_GZIP;
4055 :
4056 76 : errmsg = supports_compression(AH->compression_spec);
4057 76 : if (errmsg)
4058 : {
4059 0 : pg_log_warning("archive is compressed, but this installation does not support compression (%s) -- no data will be available",
4060 : errmsg);
4061 0 : pg_free(errmsg);
4062 : }
4063 :
4064 76 : if (AH->version >= K_VERS_1_4)
4065 : {
4066 : struct tm crtm;
4067 :
4068 76 : crtm.tm_sec = ReadInt(AH);
4069 76 : crtm.tm_min = ReadInt(AH);
4070 76 : crtm.tm_hour = ReadInt(AH);
4071 76 : crtm.tm_mday = ReadInt(AH);
4072 76 : crtm.tm_mon = ReadInt(AH);
4073 76 : crtm.tm_year = ReadInt(AH);
4074 76 : crtm.tm_isdst = ReadInt(AH);
4075 :
4076 : /*
4077 : * Newer versions of glibc have mktime() report failure if tm_isdst is
4078 : * inconsistent with the prevailing timezone, e.g. tm_isdst = 1 when
4079 : * TZ=UTC. This is problematic when restoring an archive under a
4080 : * different timezone setting. If we get a failure, try again with
4081 : * tm_isdst set to -1 ("don't know").
4082 : *
4083 : * XXX with or without this hack, we reconstruct createDate
4084 : * incorrectly when the prevailing timezone is different from
4085 : * pg_dump's. Next time we bump the archive version, we should flush
4086 : * this representation and store a plain seconds-since-the-Epoch
4087 : * timestamp instead.
4088 : */
4089 76 : AH->createDate = mktime(&crtm);
4090 76 : if (AH->createDate == (time_t) -1)
4091 : {
4092 0 : crtm.tm_isdst = -1;
4093 0 : AH->createDate = mktime(&crtm);
4094 0 : if (AH->createDate == (time_t) -1)
4095 0 : pg_log_warning("invalid creation date in header");
4096 : }
4097 : }
4098 :
4099 76 : if (AH->version >= K_VERS_1_4)
4100 : {
4101 76 : AH->archdbname = ReadStr(AH);
4102 : }
4103 :
4104 76 : if (AH->version >= K_VERS_1_10)
4105 : {
4106 76 : AH->archiveRemoteVersion = ReadStr(AH);
4107 76 : AH->archiveDumpVersion = ReadStr(AH);
4108 : }
4109 76 : }
4110 :
4111 :
4112 : /*
4113 : * checkSeek
4114 : * check to see if ftell/fseek can be performed.
4115 : */
4116 : bool
4117 96 : checkSeek(FILE *fp)
4118 : {
4119 : pgoff_t tpos;
4120 :
4121 : /* Check that ftello works on this file */
4122 96 : tpos = ftello(fp);
4123 96 : if (tpos < 0)
4124 2 : return false;
4125 :
4126 : /*
4127 : * Check that fseeko(SEEK_SET) works, too. NB: we used to try to test
4128 : * this with fseeko(fp, 0, SEEK_CUR). But some platforms treat that as a
4129 : * successful no-op even on files that are otherwise unseekable.
4130 : */
4131 94 : if (fseeko(fp, tpos, SEEK_SET) != 0)
4132 0 : return false;
4133 :
4134 94 : return true;
4135 : }
4136 :
4137 :
4138 : /*
4139 : * dumpTimestamp
4140 : */
4141 : static void
4142 84 : dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
4143 : {
4144 : char buf[64];
4145 :
4146 84 : if (strftime(buf, sizeof(buf), PGDUMP_STRFTIME_FMT, localtime(&tim)) != 0)
4147 84 : ahprintf(AH, "-- %s %s\n\n", msg, buf);
4148 84 : }
4149 :
4150 : /*
4151 : * Main engine for parallel restore.
4152 : *
4153 : * Parallel restore is done in three phases. In this first phase,
4154 : * we'll process all SECTION_PRE_DATA TOC entries that are allowed to be
4155 : * processed in the RESTORE_PASS_MAIN pass. (In practice, that's all
4156 : * PRE_DATA items other than ACLs.) Entries we can't process now are
4157 : * added to the pending_list for later phases to deal with.
4158 : */
4159 : static void
4160 8 : restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
4161 : {
4162 : bool skipped_some;
4163 : TocEntry *next_work_item;
4164 :
4165 8 : pg_log_debug("entering restore_toc_entries_prefork");
4166 :
4167 : /* Adjust dependency information */
4168 8 : fix_dependencies(AH);
4169 :
4170 : /*
4171 : * Do all the early stuff in a single connection in the parent. There's no
4172 : * great point in running it in parallel, in fact it will actually run
4173 : * faster in a single connection because we avoid all the connection and
4174 : * setup overhead. Also, pre-9.2 pg_dump versions were not very good
4175 : * about showing all the dependencies of SECTION_PRE_DATA items, so we do
4176 : * not risk trying to process them out-of-order.
4177 : *
4178 : * Stuff that we can't do immediately gets added to the pending_list.
4179 : * Note: we don't yet filter out entries that aren't going to be restored.
4180 : * They might participate in dependency chains connecting entries that
4181 : * should be restored, so we treat them as live until we actually process
4182 : * them.
4183 : *
4184 : * Note: as of 9.2, it should be guaranteed that all PRE_DATA items appear
4185 : * before DATA items, and all DATA items before POST_DATA items. That is
4186 : * not certain to be true in older archives, though, and in any case use
4187 : * of a list file would destroy that ordering (cf. SortTocFromFile). So
4188 : * this loop cannot assume that it holds.
4189 : */
4190 8 : AH->restorePass = RESTORE_PASS_MAIN;
4191 8 : skipped_some = false;
4192 200 : for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
4193 : {
4194 192 : bool do_now = true;
4195 :
4196 192 : if (next_work_item->section != SECTION_PRE_DATA)
4197 : {
4198 : /* DATA and POST_DATA items are just ignored for now */
4199 92 : if (next_work_item->section == SECTION_DATA ||
4200 60 : next_work_item->section == SECTION_POST_DATA)
4201 : {
4202 92 : do_now = false;
4203 92 : skipped_some = true;
4204 : }
4205 : else
4206 : {
4207 : /*
4208 : * SECTION_NONE items, such as comments, can be processed now
4209 : * if we are still in the PRE_DATA part of the archive. Once
4210 : * we've skipped any items, we have to consider whether the
4211 : * comment's dependencies are satisfied, so skip it for now.
4212 : */
4213 0 : if (skipped_some)
4214 0 : do_now = false;
4215 : }
4216 : }
4217 :
4218 : /*
4219 : * Also skip items that need to be forced into later passes. We need
4220 : * not set skipped_some in this case, since by assumption no main-pass
4221 : * items could depend on these.
4222 : */
4223 192 : if (_tocEntryRestorePass(next_work_item) != RESTORE_PASS_MAIN)
4224 0 : do_now = false;
4225 :
4226 192 : if (do_now)
4227 : {
4228 : /* OK, restore the item and update its dependencies */
4229 100 : pg_log_info("processing item %d %s %s",
4230 : next_work_item->dumpId,
4231 : next_work_item->desc, next_work_item->tag);
4232 :
4233 100 : (void) restore_toc_entry(AH, next_work_item, false);
4234 :
4235 : /* Reduce dependencies, but don't move anything to ready_heap */
4236 100 : reduce_dependencies(AH, next_work_item, NULL);
4237 : }
4238 : else
4239 : {
4240 : /* Nope, so add it to pending_list */
4241 92 : pending_list_append(pending_list, next_work_item);
4242 : }
4243 : }
4244 :
4245 : /*
4246 : * In --transaction-size mode, we must commit the open transaction before
4247 : * dropping the database connection. This also ensures that child workers
4248 : * can see the objects we've created so far.
4249 : */
4250 8 : if (AH->public.ropt->txn_size > 0)
4251 0 : CommitTransaction(&AH->public);
4252 :
4253 : /*
4254 : * Now close parent connection in prep for parallel steps. We do this
4255 : * mainly to ensure that we don't exceed the specified number of parallel
4256 : * connections.
4257 : */
4258 8 : DisconnectDatabase(&AH->public);
4259 :
4260 : /* blow away any transient state from the old connection */
4261 8 : free(AH->currUser);
4262 8 : AH->currUser = NULL;
4263 8 : free(AH->currSchema);
4264 8 : AH->currSchema = NULL;
4265 8 : free(AH->currTablespace);
4266 8 : AH->currTablespace = NULL;
4267 8 : free(AH->currTableAm);
4268 8 : AH->currTableAm = NULL;
4269 8 : }
4270 :
4271 : /*
4272 : * Main engine for parallel restore.
4273 : *
4274 : * Parallel restore is done in three phases. In this second phase,
4275 : * we process entries by dispatching them to parallel worker children
4276 : * (processes on Unix, threads on Windows), each of which connects
4277 : * separately to the database. Inter-entry dependencies are respected,
4278 : * and so is the RestorePass multi-pass structure. When we can no longer
4279 : * make any entries ready to process, we exit. Normally, there will be
4280 : * nothing left to do; but if there is, the third phase will mop up.
4281 : */
4282 : static void
4283 8 : restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
4284 : TocEntry *pending_list)
4285 : {
4286 : binaryheap *ready_heap;
4287 : TocEntry *next_work_item;
4288 :
4289 8 : pg_log_debug("entering restore_toc_entries_parallel");
4290 :
4291 : /* Set up ready_heap with enough room for all known TocEntrys */
4292 8 : ready_heap = binaryheap_allocate(AH->tocCount,
4293 : TocEntrySizeCompareBinaryheap,
4294 : NULL);
4295 :
4296 : /*
4297 : * The pending_list contains all items that we need to restore. Move all
4298 : * items that are available to process immediately into the ready_heap.
4299 : * After this setup, the pending list is everything that needs to be done
4300 : * but is blocked by one or more dependencies, while the ready heap
4301 : * contains items that have no remaining dependencies and are OK to
4302 : * process in the current restore pass.
4303 : */
4304 8 : AH->restorePass = RESTORE_PASS_MAIN;
4305 8 : move_to_ready_heap(pending_list, ready_heap, AH->restorePass);
4306 :
4307 : /*
4308 : * main parent loop
4309 : *
4310 : * Keep going until there is no worker still running AND there is no work
4311 : * left to be done. Note invariant: at top of loop, there should always
4312 : * be at least one worker available to dispatch a job to.
4313 : */
4314 8 : pg_log_info("entering main parallel loop");
4315 :
4316 : for (;;)
4317 : {
4318 : /* Look for an item ready to be dispatched to a worker */
4319 138 : next_work_item = pop_next_work_item(ready_heap, pstate);
4320 138 : if (next_work_item != NULL)
4321 : {
4322 : /* If not to be restored, don't waste time launching a worker */
4323 92 : if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
4324 : {
4325 0 : pg_log_info("skipping item %d %s %s",
4326 : next_work_item->dumpId,
4327 : next_work_item->desc, next_work_item->tag);
4328 : /* Update its dependencies as though we'd completed it */
4329 0 : reduce_dependencies(AH, next_work_item, ready_heap);
4330 : /* Loop around to see if anything else can be dispatched */
4331 0 : continue;
4332 : }
4333 :
4334 92 : pg_log_info("launching item %d %s %s",
4335 : next_work_item->dumpId,
4336 : next_work_item->desc, next_work_item->tag);
4337 :
4338 : /* Dispatch to some worker */
4339 92 : DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
4340 : mark_restore_job_done, ready_heap);
4341 : }
4342 46 : else if (IsEveryWorkerIdle(pstate))
4343 : {
4344 : /*
4345 : * Nothing is ready and no worker is running, so we're done with
4346 : * the current pass or maybe with the whole process.
4347 : */
4348 24 : if (AH->restorePass == RESTORE_PASS_LAST)
4349 8 : break; /* No more parallel processing is possible */
4350 :
4351 : /* Advance to next restore pass */
4352 16 : AH->restorePass++;
4353 : /* That probably allows some stuff to be made ready */
4354 16 : move_to_ready_heap(pending_list, ready_heap, AH->restorePass);
4355 : /* Loop around to see if anything's now ready */
4356 16 : continue;
4357 : }
4358 : else
4359 : {
4360 : /*
4361 : * We have nothing ready, but at least one child is working, so
4362 : * wait for some subjob to finish.
4363 : */
4364 : }
4365 :
4366 : /*
4367 : * Before dispatching another job, check to see if anything has
4368 : * finished. We should check every time through the loop so as to
4369 : * reduce dependencies as soon as possible. If we were unable to
4370 : * dispatch any job this time through, wait until some worker finishes
4371 : * (and, hopefully, unblocks some pending item). If we did dispatch
4372 : * something, continue as soon as there's at least one idle worker.
4373 : * Note that in either case, there's guaranteed to be at least one
4374 : * idle worker when we return to the top of the loop. This ensures we
4375 : * won't block inside DispatchJobForTocEntry, which would be
4376 : * undesirable: we'd rather postpone dispatching until we see what's
4377 : * been unblocked by finished jobs.
4378 : */
4379 114 : WaitForWorkers(AH, pstate,
4380 : next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS);
4381 : }
4382 :
4383 : /* There should now be nothing in ready_heap. */
4384 : Assert(binaryheap_empty(ready_heap));
4385 :
4386 8 : binaryheap_free(ready_heap);
4387 :
4388 8 : pg_log_info("finished main parallel loop");
4389 8 : }
4390 :
4391 : /*
4392 : * Main engine for parallel restore.
4393 : *
4394 : * Parallel restore is done in three phases. In this third phase,
4395 : * we mop up any remaining TOC entries by processing them serially.
4396 : * This phase normally should have nothing to do, but if we've somehow
4397 : * gotten stuck due to circular dependencies or some such, this provides
4398 : * at least some chance of completing the restore successfully.
4399 : */
4400 : static void
4401 8 : restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
4402 : {
4403 8 : RestoreOptions *ropt = AH->public.ropt;
4404 : TocEntry *te;
4405 :
4406 8 : pg_log_debug("entering restore_toc_entries_postfork");
4407 :
4408 : /*
4409 : * Now reconnect the single parent connection.
4410 : */
4411 8 : ConnectDatabase((Archive *) AH, &ropt->cparams, true);
4412 :
4413 : /* re-establish fixed state */
4414 8 : _doSetFixedOutputState(AH);
4415 :
4416 : /*
4417 : * Make sure there is no work left due to, say, circular dependencies, or
4418 : * some other pathological condition. If so, do it in the single parent
4419 : * connection. We don't sweat about RestorePass ordering; it's likely we
4420 : * already violated that.
4421 : */
4422 8 : for (te = pending_list->pending_next; te != pending_list; te = te->pending_next)
4423 : {
4424 0 : pg_log_info("processing missed item %d %s %s",
4425 : te->dumpId, te->desc, te->tag);
4426 0 : (void) restore_toc_entry(AH, te, false);
4427 : }
4428 8 : }
4429 :
4430 : /*
4431 : * Check if te1 has an exclusive lock requirement for an item that te2 also
4432 : * requires, whether or not te2's requirement is for an exclusive lock.
4433 : */
4434 : static bool
4435 306 : has_lock_conflicts(TocEntry *te1, TocEntry *te2)
4436 : {
4437 : int j,
4438 : k;
4439 :
4440 630 : for (j = 0; j < te1->nLockDeps; j++)
4441 : {
4442 764 : for (k = 0; k < te2->nDeps; k++)
4443 : {
4444 440 : if (te1->lockDeps[j] == te2->dependencies[k])
4445 2 : return true;
4446 : }
4447 : }
4448 304 : return false;
4449 : }
4450 :
4451 :
4452 : /*
4453 : * Initialize the header of the pending-items list.
4454 : *
4455 : * This is a circular list with a dummy TocEntry as header, just like the
4456 : * main TOC list; but we use separate list links so that an entry can be in
4457 : * the main TOC list as well as in the pending list.
4458 : */
4459 : static void
4460 8 : pending_list_header_init(TocEntry *l)
4461 : {
4462 8 : l->pending_prev = l->pending_next = l;
4463 8 : }
4464 :
4465 : /* Append te to the end of the pending-list headed by l */
4466 : static void
4467 92 : pending_list_append(TocEntry *l, TocEntry *te)
4468 : {
4469 92 : te->pending_prev = l->pending_prev;
4470 92 : l->pending_prev->pending_next = te;
4471 92 : l->pending_prev = te;
4472 92 : te->pending_next = l;
4473 92 : }
4474 :
4475 : /* Remove te from the pending-list */
4476 : static void
4477 92 : pending_list_remove(TocEntry *te)
4478 : {
4479 92 : te->pending_prev->pending_next = te->pending_next;
4480 92 : te->pending_next->pending_prev = te->pending_prev;
4481 92 : te->pending_prev = NULL;
4482 92 : te->pending_next = NULL;
4483 92 : }
4484 :
4485 :
4486 : /* qsort comparator for sorting TocEntries by dataLength */
4487 : static int
4488 1278 : TocEntrySizeCompareQsort(const void *p1, const void *p2)
4489 : {
4490 1278 : const TocEntry *te1 = *(const TocEntry *const *) p1;
4491 1278 : const TocEntry *te2 = *(const TocEntry *const *) p2;
4492 :
4493 : /* Sort by decreasing dataLength */
4494 1278 : if (te1->dataLength > te2->dataLength)
4495 124 : return -1;
4496 1154 : if (te1->dataLength < te2->dataLength)
4497 186 : return 1;
4498 :
4499 : /* For equal dataLengths, sort by dumpId, just to be stable */
4500 968 : if (te1->dumpId < te2->dumpId)
4501 446 : return -1;
4502 522 : if (te1->dumpId > te2->dumpId)
4503 494 : return 1;
4504 :
4505 28 : return 0;
4506 : }
4507 :
/*
 * binaryheap comparator for sorting TocEntries by dataLength
 *
 * p1 and p2 are the TocEntry pointers themselves; arg is unused.
 */
static int
TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg)
{
	int			qsort_order = TocEntrySizeCompareQsort(&p1, &p2);

	/* the max-heap wants the opposite of the qsort comparator's answer */
	return -qsort_order;
}
4515 :
4516 :
4517 : /*
4518 : * Move all immediately-ready items from pending_list to ready_heap.
4519 : *
4520 : * Items are considered ready if they have no remaining dependencies and
4521 : * they belong in the current restore pass. (See also reduce_dependencies,
4522 : * which applies the same logic one-at-a-time.)
4523 : */
4524 : static void
4525 24 : move_to_ready_heap(TocEntry *pending_list,
4526 : binaryheap *ready_heap,
4527 : RestorePass pass)
4528 : {
4529 : TocEntry *te;
4530 : TocEntry *next_te;
4531 :
4532 116 : for (te = pending_list->pending_next; te != pending_list; te = next_te)
4533 : {
4534 : /* must save list link before possibly removing te from list */
4535 92 : next_te = te->pending_next;
4536 :
4537 132 : if (te->depCount == 0 &&
4538 40 : _tocEntryRestorePass(te) == pass)
4539 : {
4540 : /* Remove it from pending_list ... */
4541 40 : pending_list_remove(te);
4542 : /* ... and add to ready_heap */
4543 40 : binaryheap_add(ready_heap, te);
4544 : }
4545 : }
4546 24 : }
4547 :
4548 : /*
4549 : * Find the next work item (if any) that is capable of being run now,
4550 : * and remove it from the ready_heap.
4551 : *
4552 : * Returns the item, or NULL if nothing is runnable.
4553 : *
4554 : * To qualify, the item must have no remaining dependencies
4555 : * and no requirements for locks that are incompatible with
4556 : * items currently running. Items in the ready_heap are known to have
4557 : * no remaining dependencies, but we have to check for lock conflicts.
4558 : */
4559 : static TocEntry *
4560 138 : pop_next_work_item(binaryheap *ready_heap,
4561 : ParallelState *pstate)
4562 : {
4563 : /*
4564 : * Search the ready_heap until we find a suitable item. Note that we do a
4565 : * sequential scan through the heap nodes, so even though we will first
4566 : * try to choose the highest-priority item, we might end up picking
4567 : * something with a much lower priority. However, we expect that we will
4568 : * typically be able to pick one of the first few items, which should
4569 : * usually have a relatively high priority.
4570 : */
4571 140 : for (int i = 0; i < binaryheap_size(ready_heap); i++)
4572 : {
4573 94 : TocEntry *te = (TocEntry *) binaryheap_get_node(ready_heap, i);
4574 94 : bool conflicts = false;
4575 :
4576 : /*
4577 : * Check to see if the item would need exclusive lock on something
4578 : * that a currently running item also needs lock on, or vice versa. If
4579 : * so, we don't want to schedule them together.
4580 : */
4581 370 : for (int k = 0; k < pstate->numWorkers; k++)
4582 : {
4583 278 : TocEntry *running_te = pstate->te[k];
4584 :
4585 278 : if (running_te == NULL)
4586 124 : continue;
4587 306 : if (has_lock_conflicts(te, running_te) ||
4588 152 : has_lock_conflicts(running_te, te))
4589 : {
4590 2 : conflicts = true;
4591 2 : break;
4592 : }
4593 : }
4594 :
4595 94 : if (conflicts)
4596 2 : continue;
4597 :
4598 : /* passed all tests, so this item can run */
4599 92 : binaryheap_remove_node(ready_heap, i);
4600 92 : return te;
4601 : }
4602 :
4603 46 : pg_log_debug("no item ready");
4604 46 : return NULL;
4605 : }
4606 :
4607 :
4608 : /*
4609 : * Restore a single TOC item in parallel with others
4610 : *
4611 : * this is run in the worker, i.e. in a thread (Windows) or a separate process
4612 : * (everything else). A worker process executes several such work items during
4613 : * a parallel backup or restore. Once we terminate here and report back that
4614 : * our work is finished, the leader process will assign us a new work item.
4615 : */
4616 : int
4617 92 : parallel_restore(ArchiveHandle *AH, TocEntry *te)
4618 : {
4619 : int status;
4620 :
4621 : Assert(AH->connection != NULL);
4622 :
4623 : /* Count only errors associated with this TOC entry */
4624 92 : AH->public.n_errors = 0;
4625 :
4626 : /* Restore the TOC item */
4627 92 : status = restore_toc_entry(AH, te, true);
4628 :
4629 92 : return status;
4630 : }
4631 :
4632 :
4633 : /*
4634 : * Callback function that's invoked in the leader process after a step has
4635 : * been parallel restored.
4636 : *
4637 : * Update status and reduce the dependency count of any dependent items.
4638 : */
4639 : static void
4640 92 : mark_restore_job_done(ArchiveHandle *AH,
4641 : TocEntry *te,
4642 : int status,
4643 : void *callback_data)
4644 : {
4645 92 : binaryheap *ready_heap = (binaryheap *) callback_data;
4646 :
4647 92 : pg_log_info("finished item %d %s %s",
4648 : te->dumpId, te->desc, te->tag);
4649 :
4650 92 : if (status == WORKER_CREATE_DONE)
4651 0 : mark_create_done(AH, te);
4652 92 : else if (status == WORKER_INHIBIT_DATA)
4653 : {
4654 0 : inhibit_data_for_failed_table(AH, te);
4655 0 : AH->public.n_errors++;
4656 : }
4657 92 : else if (status == WORKER_IGNORED_ERRORS)
4658 0 : AH->public.n_errors++;
4659 92 : else if (status != 0)
4660 0 : pg_fatal("worker process failed: exit code %d",
4661 : status);
4662 :
4663 92 : reduce_dependencies(AH, te, ready_heap);
4664 92 : }
4665 :
4666 :
4667 : /*
4668 : * Process the dependency information into a form useful for parallel restore.
4669 : *
4670 : * This function takes care of fixing up some missing or badly designed
4671 : * dependencies, and then prepares subsidiary data structures that will be
4672 : * used in the main parallel-restore logic, including:
4673 : * 1. We build the revDeps[] arrays of incoming dependency dumpIds.
4674 : * 2. We set up depCount fields that are the number of as-yet-unprocessed
4675 : * dependencies for each TOC entry.
4676 : *
4677 : * We also identify locking dependencies so that we can avoid trying to
4678 : * schedule conflicting items at the same time.
4679 : */
4680 : static void
4681 8 : fix_dependencies(ArchiveHandle *AH)
4682 : {
4683 : TocEntry *te;
4684 : int i;
4685 :
4686 : /*
4687 : * Initialize the depCount/revDeps/nRevDeps fields, and make sure the TOC
4688 : * items are marked as not being in any parallel-processing list.
4689 : */
4690 200 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4691 : {
4692 192 : te->depCount = te->nDeps;
4693 192 : te->revDeps = NULL;
4694 192 : te->nRevDeps = 0;
4695 192 : te->pending_prev = NULL;
4696 192 : te->pending_next = NULL;
4697 : }
4698 :
4699 : /*
4700 : * POST_DATA items that are shown as depending on a table need to be
4701 : * re-pointed to depend on that table's data, instead. This ensures they
4702 : * won't get scheduled until the data has been loaded.
4703 : */
4704 8 : repoint_table_dependencies(AH);
4705 :
4706 : /*
4707 : * Pre-8.4 versions of pg_dump neglected to set up a dependency from BLOB
4708 : * COMMENTS to BLOBS. Cope. (We assume there's only one BLOBS and only
4709 : * one BLOB COMMENTS in such files.)
4710 : */
4711 8 : if (AH->version < K_VERS_1_11)
4712 : {
4713 0 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4714 : {
4715 0 : if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0)
4716 : {
4717 : TocEntry *te2;
4718 :
4719 0 : for (te2 = AH->toc->next; te2 != AH->toc; te2 = te2->next)
4720 : {
4721 0 : if (strcmp(te2->desc, "BLOBS") == 0)
4722 : {
4723 0 : te->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
4724 0 : te->dependencies[0] = te2->dumpId;
4725 0 : te->nDeps++;
4726 0 : te->depCount++;
4727 0 : break;
4728 : }
4729 : }
4730 0 : break;
4731 : }
4732 : }
4733 : }
4734 :
4735 : /*
4736 : * At this point we start to build the revDeps reverse-dependency arrays,
4737 : * so all changes of dependencies must be complete.
4738 : */
4739 :
4740 : /*
4741 : * Count the incoming dependencies for each item. Also, it is possible
4742 : * that the dependencies list items that are not in the archive at all
4743 : * (that should not happen in 9.2 and later, but is highly likely in older
4744 : * archives). Subtract such items from the depCounts.
4745 : */
4746 200 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4747 : {
4748 576 : for (i = 0; i < te->nDeps; i++)
4749 : {
4750 384 : DumpId depid = te->dependencies[i];
4751 :
4752 384 : if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4753 384 : AH->tocsByDumpId[depid]->nRevDeps++;
4754 : else
4755 0 : te->depCount--;
4756 : }
4757 : }
4758 :
4759 : /*
4760 : * Allocate space for revDeps[] arrays, and reset nRevDeps so we can use
4761 : * it as a counter below.
4762 : */
4763 200 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4764 : {
4765 192 : if (te->nRevDeps > 0)
4766 104 : te->revDeps = (DumpId *) pg_malloc(te->nRevDeps * sizeof(DumpId));
4767 192 : te->nRevDeps = 0;
4768 : }
4769 :
4770 : /*
4771 : * Build the revDeps[] arrays of incoming-dependency dumpIds. This had
4772 : * better agree with the loops above.
4773 : */
4774 200 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4775 : {
4776 576 : for (i = 0; i < te->nDeps; i++)
4777 : {
4778 384 : DumpId depid = te->dependencies[i];
4779 :
4780 384 : if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4781 : {
4782 384 : TocEntry *otherte = AH->tocsByDumpId[depid];
4783 :
4784 384 : otherte->revDeps[otherte->nRevDeps++] = te->dumpId;
4785 : }
4786 : }
4787 : }
4788 :
4789 : /*
4790 : * Lastly, work out the locking dependencies.
4791 : */
4792 200 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4793 : {
4794 192 : te->lockDeps = NULL;
4795 192 : te->nLockDeps = 0;
4796 192 : identify_locking_dependencies(AH, te);
4797 : }
4798 8 : }
4799 :
4800 : /*
4801 : * Change dependencies on table items to depend on table data items instead,
4802 : * but only in POST_DATA items.
4803 : *
4804 : * Also, for any item having such dependency(s), set its dataLength to the
4805 : * largest dataLength of the table data items it depends on. This ensures
4806 : * that parallel restore will prioritize larger jobs (index builds, FK
4807 : * constraint checks, etc) over smaller ones, avoiding situations where we
4808 : * end a restore with only one active job working on a large table.
4809 : */
4810 : static void
4811 8 : repoint_table_dependencies(ArchiveHandle *AH)
4812 : {
4813 : TocEntry *te;
4814 : int i;
4815 : DumpId olddep;
4816 :
4817 200 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4818 : {
4819 192 : if (te->section != SECTION_POST_DATA)
4820 132 : continue;
4821 320 : for (i = 0; i < te->nDeps; i++)
4822 : {
4823 260 : olddep = te->dependencies[i];
4824 260 : if (olddep <= AH->maxDumpId &&
4825 260 : AH->tableDataId[olddep] != 0)
4826 : {
4827 124 : DumpId tabledataid = AH->tableDataId[olddep];
4828 124 : TocEntry *tabledatate = AH->tocsByDumpId[tabledataid];
4829 :
4830 124 : te->dependencies[i] = tabledataid;
4831 124 : te->dataLength = Max(te->dataLength, tabledatate->dataLength);
4832 124 : pg_log_debug("transferring dependency %d -> %d to %d",
4833 : te->dumpId, olddep, tabledataid);
4834 : }
4835 : }
4836 : }
4837 8 : }
4838 :
4839 : /*
4840 : * Identify which objects we'll need exclusive lock on in order to restore
4841 : * the given TOC entry (*other* than the one identified by the TOC entry
4842 : * itself). Record their dump IDs in the entry's lockDeps[] array.
4843 : */
4844 : static void
4845 192 : identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
4846 : {
4847 : DumpId *lockids;
4848 : int nlockids;
4849 : int i;
4850 :
4851 : /*
4852 : * We only care about this for POST_DATA items. PRE_DATA items are not
4853 : * run in parallel, and DATA items are all independent by assumption.
4854 : */
4855 192 : if (te->section != SECTION_POST_DATA)
4856 132 : return;
4857 :
4858 : /* Quick exit if no dependencies at all */
4859 60 : if (te->nDeps == 0)
4860 0 : return;
4861 :
4862 : /*
4863 : * Most POST_DATA items are ALTER TABLEs or some moral equivalent of that,
4864 : * and hence require exclusive lock. However, we know that CREATE INDEX
4865 : * does not. (Maybe someday index-creating CONSTRAINTs will fall in that
4866 : * category too ... but today is not that day.)
4867 : */
4868 60 : if (strcmp(te->desc, "INDEX") == 0)
4869 0 : return;
4870 :
4871 : /*
4872 : * We assume the entry requires exclusive lock on each TABLE or TABLE DATA
4873 : * item listed among its dependencies. Originally all of these would have
4874 : * been TABLE items, but repoint_table_dependencies would have repointed
4875 : * them to the TABLE DATA items if those are present (which they might not
4876 : * be, eg in a schema-only dump). Note that all of the entries we are
4877 : * processing here are POST_DATA; otherwise there might be a significant
4878 : * difference between a dependency on a table and a dependency on its
4879 : * data, so that closer analysis would be needed here.
4880 : */
4881 60 : lockids = (DumpId *) pg_malloc(te->nDeps * sizeof(DumpId));
4882 60 : nlockids = 0;
4883 320 : for (i = 0; i < te->nDeps; i++)
4884 : {
4885 260 : DumpId depid = te->dependencies[i];
4886 :
4887 260 : if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL &&
4888 260 : ((strcmp(AH->tocsByDumpId[depid]->desc, "TABLE DATA") == 0) ||
4889 136 : strcmp(AH->tocsByDumpId[depid]->desc, "TABLE") == 0))
4890 164 : lockids[nlockids++] = depid;
4891 : }
4892 :
4893 60 : if (nlockids == 0)
4894 : {
4895 0 : free(lockids);
4896 0 : return;
4897 : }
4898 :
4899 60 : te->lockDeps = pg_realloc(lockids, nlockids * sizeof(DumpId));
4900 60 : te->nLockDeps = nlockids;
4901 : }
4902 :
4903 : /*
4904 : * Remove the specified TOC entry from the depCounts of items that depend on
4905 : * it, thereby possibly making them ready-to-run. Any pending item that
4906 : * becomes ready should be moved to the ready_heap, if that's provided.
4907 : */
4908 : static void
4909 192 : reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
4910 : binaryheap *ready_heap)
4911 : {
4912 : int i;
4913 :
4914 192 : pg_log_debug("reducing dependencies for %d", te->dumpId);
4915 :
4916 576 : for (i = 0; i < te->nRevDeps; i++)
4917 : {
4918 384 : TocEntry *otherte = AH->tocsByDumpId[te->revDeps[i]];
4919 :
4920 : Assert(otherte->depCount > 0);
4921 384 : otherte->depCount--;
4922 :
4923 : /*
4924 : * It's ready if it has no remaining dependencies, and it belongs in
4925 : * the current restore pass, and it is currently a member of the
4926 : * pending list (that check is needed to prevent double restore in
4927 : * some cases where a list-file forces out-of-order restoring).
4928 : * However, if ready_heap == NULL then caller doesn't want any list
4929 : * memberships changed.
4930 : */
4931 384 : if (otherte->depCount == 0 &&
4932 148 : _tocEntryRestorePass(otherte) == AH->restorePass &&
4933 148 : otherte->pending_prev != NULL &&
4934 : ready_heap != NULL)
4935 : {
4936 : /* Remove it from pending list ... */
4937 52 : pending_list_remove(otherte);
4938 : /* ... and add to ready_heap */
4939 52 : binaryheap_add(ready_heap, otherte);
4940 : }
4941 : }
4942 192 : }
4943 :
4944 : /*
4945 : * Set the created flag on the DATA member corresponding to the given
4946 : * TABLE member
4947 : */
4948 : static void
4949 9330 : mark_create_done(ArchiveHandle *AH, TocEntry *te)
4950 : {
4951 9330 : if (AH->tableDataId[te->dumpId] != 0)
4952 : {
4953 6990 : TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4954 :
4955 6990 : ted->created = true;
4956 : }
4957 9330 : }
4958 :
4959 : /*
4960 : * Mark the DATA member corresponding to the given TABLE member
4961 : * as not wanted
4962 : */
4963 : static void
4964 0 : inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
4965 : {
4966 0 : pg_log_info("table \"%s\" could not be created, will not restore its data",
4967 : te->tag);
4968 :
4969 0 : if (AH->tableDataId[te->dumpId] != 0)
4970 : {
4971 0 : TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4972 :
4973 0 : ted->reqs = 0;
4974 : }
4975 0 : }
4976 :
4977 : /*
4978 : * Clone and de-clone routines used in parallel restoration.
4979 : *
4980 : * Enough of the structure is cloned to ensure that there is no
4981 : * conflict between different threads each with their own clone.
4982 : */
4983 : ArchiveHandle *
4984 52 : CloneArchive(ArchiveHandle *AH)
4985 : {
4986 : ArchiveHandle *clone;
4987 :
4988 : /* Make a "flat" copy */
4989 52 : clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle));
4990 52 : memcpy(clone, AH, sizeof(ArchiveHandle));
4991 :
4992 : /* Likewise flat-copy the RestoreOptions, so we can alter them locally */
4993 52 : clone->public.ropt = (RestoreOptions *) pg_malloc(sizeof(RestoreOptions));
4994 52 : memcpy(clone->public.ropt, AH->public.ropt, sizeof(RestoreOptions));
4995 :
4996 : /* Handle format-independent fields */
4997 52 : memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));
4998 :
4999 : /* The clone will have its own connection, so disregard connection state */
5000 52 : clone->connection = NULL;
5001 52 : clone->connCancel = NULL;
5002 52 : clone->currUser = NULL;
5003 52 : clone->currSchema = NULL;
5004 52 : clone->currTableAm = NULL;
5005 52 : clone->currTablespace = NULL;
5006 :
5007 : /* savedPassword must be local in case we change it while connecting */
5008 52 : if (clone->savedPassword)
5009 0 : clone->savedPassword = pg_strdup(clone->savedPassword);
5010 :
5011 : /* clone has its own error count, too */
5012 52 : clone->public.n_errors = 0;
5013 :
5014 : /* clones should not share lo_buf */
5015 52 : clone->lo_buf = NULL;
5016 :
5017 : /*
5018 : * Clone connections disregard --transaction-size; they must commit after
5019 : * each command so that the results are immediately visible to other
5020 : * workers.
5021 : */
5022 52 : clone->public.ropt->txn_size = 0;
5023 :
5024 : /*
5025 : * Connect our new clone object to the database, using the same connection
5026 : * parameters used for the original connection.
5027 : */
5028 52 : ConnectDatabase((Archive *) clone, &clone->public.ropt->cparams, true);
5029 :
5030 : /* re-establish fixed state */
5031 52 : if (AH->mode == archModeRead)
5032 20 : _doSetFixedOutputState(clone);
5033 : /* in write case, setupDumpWorker will fix up connection state */
5034 :
5035 : /* Let the format-specific code have a chance too */
5036 52 : clone->ClonePtr(clone);
5037 :
5038 : Assert(clone->connection != NULL);
5039 52 : return clone;
5040 : }
5041 :
5042 : /*
5043 : * Release clone-local storage.
5044 : *
5045 : * Note: we assume any clone-local connection was already closed.
5046 : */
5047 : void
5048 52 : DeCloneArchive(ArchiveHandle *AH)
5049 : {
5050 : /* Should not have an open database connection */
5051 : Assert(AH->connection == NULL);
5052 :
5053 : /* Clear format-specific state */
5054 52 : AH->DeClonePtr(AH);
5055 :
5056 : /* Clear state allocated by CloneArchive */
5057 52 : if (AH->sqlparse.curCmd)
5058 6 : destroyPQExpBuffer(AH->sqlparse.curCmd);
5059 :
5060 : /* Clear any connection-local state */
5061 52 : free(AH->currUser);
5062 52 : free(AH->currSchema);
5063 52 : free(AH->currTablespace);
5064 52 : free(AH->currTableAm);
5065 52 : free(AH->savedPassword);
5066 :
5067 52 : free(AH);
5068 52 : }
|