Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_backup_custom.c
4 : *
5 : * Implements the custom output format.
6 : *
7 : * The comments with the routines in this code are a good place to
8 : * understand how to write a new format.
9 : *
10 : * See the headers to pg_restore for more details.
11 : *
12 : * Copyright (c) 2000, Philip Warner
13 : * Rights are granted to use this software in any way so long
14 : * as this notice is not removed.
15 : *
16 : * The author is not responsible for loss or damages that may
17 : * and any liability will be limited to the time taken to fix any
18 : * related bug.
19 : *
20 : *
21 : * IDENTIFICATION
22 : * src/bin/pg_dump/pg_backup_custom.c
23 : *
24 : *-------------------------------------------------------------------------
25 : */
26 : #include "postgres_fe.h"
27 :
28 : #include "common/file_utils.h"
29 : #include "compress_io.h"
30 : #include "pg_backup_utils.h"
31 :
32 : /*--------
33 : * Routines in the format interface
34 : *--------
35 : */
36 :
37 : static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
38 : static void _StartData(ArchiveHandle *AH, TocEntry *te);
39 : static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
40 : static void _EndData(ArchiveHandle *AH, TocEntry *te);
41 : static int _WriteByte(ArchiveHandle *AH, const int i);
42 : static int _ReadByte(ArchiveHandle *AH);
43 : static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
44 : static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
45 : static void _CloseArchive(ArchiveHandle *AH);
46 : static void _ReopenArchive(ArchiveHandle *AH);
47 : static void _PrintTocData(ArchiveHandle *AH, TocEntry *te);
48 : static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
49 : static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
50 : static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);
51 :
52 : static void _PrintData(ArchiveHandle *AH);
53 : static void _skipData(ArchiveHandle *AH);
54 : static void _skipLOs(ArchiveHandle *AH);
55 :
56 : static void _StartLOs(ArchiveHandle *AH, TocEntry *te);
57 : static void _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid);
58 : static void _EndLO(ArchiveHandle *AH, TocEntry *te, Oid oid);
59 : static void _EndLOs(ArchiveHandle *AH, TocEntry *te);
60 : static void _LoadLOs(ArchiveHandle *AH, bool drop);
61 :
62 : static void _PrepParallelRestore(ArchiveHandle *AH);
63 : static void _Clone(ArchiveHandle *AH);
64 : static void _DeClone(ArchiveHandle *AH);
65 :
66 : static int _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
67 :
68 : typedef struct
69 : {
70 : CompressorState *cs;
71 : int hasSeek;
72 : /* lastFilePos is used only when reading, and may be invalid if !hasSeek */
73 : pgoff_t lastFilePos; /* position after last data block we've read */
74 : } lclContext;
75 :
76 : typedef struct
77 : {
78 : int dataState;
79 : pgoff_t dataPos; /* valid only if dataState=K_OFFSET_POS_SET */
80 : } lclTocEntry;
81 :
82 :
83 : /*------
84 : * Static declarations
85 : *------
86 : */
87 : static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
88 : static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx);
89 :
90 : static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
91 : static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);
92 :
93 :
94 : /*
95 : * Init routine required by ALL formats. This is a global routine
96 : * and should be declared in pg_backup_archiver.h
97 : *
98 : * It's task is to create any extra archive context (using AH->formatData),
99 : * and to initialize the supported function pointers.
100 : *
101 : * It should also prepare whatever its input source is for reading/writing,
102 : * and in the case of a read mode connection, it should load the Header & TOC.
103 : */
104 : void
105 180 : InitArchiveFmt_Custom(ArchiveHandle *AH)
106 : {
107 : lclContext *ctx;
108 :
109 : /* Assuming static functions, this can be copied for each format. */
110 180 : AH->ArchiveEntryPtr = _ArchiveEntry;
111 180 : AH->StartDataPtr = _StartData;
112 180 : AH->WriteDataPtr = _WriteData;
113 180 : AH->EndDataPtr = _EndData;
114 180 : AH->WriteBytePtr = _WriteByte;
115 180 : AH->ReadBytePtr = _ReadByte;
116 180 : AH->WriteBufPtr = _WriteBuf;
117 180 : AH->ReadBufPtr = _ReadBuf;
118 180 : AH->ClosePtr = _CloseArchive;
119 180 : AH->ReopenPtr = _ReopenArchive;
120 180 : AH->PrintTocDataPtr = _PrintTocData;
121 180 : AH->ReadExtraTocPtr = _ReadExtraToc;
122 180 : AH->WriteExtraTocPtr = _WriteExtraToc;
123 180 : AH->PrintExtraTocPtr = _PrintExtraToc;
124 :
125 180 : AH->StartLOsPtr = _StartLOs;
126 180 : AH->StartLOPtr = _StartLO;
127 180 : AH->EndLOPtr = _EndLO;
128 180 : AH->EndLOsPtr = _EndLOs;
129 :
130 180 : AH->PrepParallelRestorePtr = _PrepParallelRestore;
131 180 : AH->ClonePtr = _Clone;
132 180 : AH->DeClonePtr = _DeClone;
133 :
134 : /* no parallel dump in the custom archive, only parallel restore */
135 180 : AH->WorkerJobDumpPtr = NULL;
136 180 : AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom;
137 :
138 : /* Set up a private area. */
139 180 : ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
140 180 : AH->formatData = ctx;
141 :
142 : /*
143 : * Now open the file
144 : */
145 180 : if (AH->mode == archModeWrite)
146 : {
147 88 : if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
148 : {
149 88 : AH->FH = fopen(AH->fSpec, PG_BINARY_W);
150 88 : if (!AH->FH)
151 0 : pg_fatal("could not open output file \"%s\": %m", AH->fSpec);
152 : }
153 : else
154 : {
155 0 : AH->FH = stdout;
156 0 : if (!AH->FH)
157 0 : pg_fatal("could not open output file: %m");
158 : }
159 :
160 88 : ctx->hasSeek = checkSeek(AH->FH);
161 : }
162 : else
163 : {
164 92 : if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
165 : {
166 92 : AH->FH = fopen(AH->fSpec, PG_BINARY_R);
167 92 : if (!AH->FH)
168 0 : pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
169 : }
170 : else
171 : {
172 0 : AH->FH = stdin;
173 0 : if (!AH->FH)
174 0 : pg_fatal("could not open input file: %m");
175 : }
176 :
177 92 : ctx->hasSeek = checkSeek(AH->FH);
178 :
179 92 : ReadHead(AH);
180 92 : ReadToc(AH);
181 :
182 : /*
183 : * Remember location of first data block (i.e., the point after TOC)
184 : * in case we have to search for desired data blocks.
185 : */
186 92 : ctx->lastFilePos = _getFilePos(AH, ctx);
187 : }
188 180 : }
189 :
190 : /*
191 : * Called by the Archiver when the dumper creates a new TOC entry.
192 : *
193 : * Optional.
194 : *
195 : * Set up extract format-related TOC data.
196 : */
197 : static void
198 9792 : _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
199 : {
200 : lclTocEntry *ctx;
201 :
202 9792 : ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
203 9792 : if (te->dataDumper)
204 270 : ctx->dataState = K_OFFSET_POS_NOT_SET;
205 : else
206 9522 : ctx->dataState = K_OFFSET_NO_DATA;
207 :
208 9792 : te->formatData = ctx;
209 9792 : }
210 :
211 : /*
212 : * Called by the Archiver to save any extra format-related TOC entry
213 : * data.
214 : *
215 : * Optional.
216 : *
217 : * Use the Archiver routines to write data - they are non-endian, and
218 : * maintain other important file information.
219 : */
220 : static void
221 12280 : _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
222 : {
223 12280 : lclTocEntry *ctx = (lclTocEntry *) te->formatData;
224 :
225 12280 : WriteOffset(AH, ctx->dataPos, ctx->dataState);
226 12280 : }
227 :
228 : /*
229 : * Called by the Archiver to read any extra format-related TOC data.
230 : *
231 : * Optional.
232 : *
233 : * Needs to match the order defined in _WriteExtraToc, and should also
234 : * use the Archiver input routines.
235 : */
236 : static void
237 12074 : _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
238 : {
239 12074 : lclTocEntry *ctx = (lclTocEntry *) te->formatData;
240 :
241 12074 : if (ctx == NULL)
242 : {
243 12074 : ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
244 12074 : te->formatData = ctx;
245 : }
246 :
247 12074 : ctx->dataState = ReadOffset(AH, &(ctx->dataPos));
248 :
249 : /*
250 : * Prior to V1.7 (pg7.3), we dumped the data size as an int now we don't
251 : * dump it at all.
252 : */
253 12074 : if (AH->version < K_VERS_1_7)
254 0 : ReadInt(AH);
255 12074 : }
256 :
257 : /*
258 : * Called by the Archiver when restoring an archive to output a comment
259 : * that includes useful information about the TOC entry.
260 : *
261 : * Optional.
262 : */
263 : static void
264 2974 : _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
265 : {
266 2974 : lclTocEntry *ctx = (lclTocEntry *) te->formatData;
267 :
268 2974 : if (AH->public.verbose)
269 622 : ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n",
270 622 : (int64) ctx->dataPos);
271 2974 : }
272 :
273 : /*
274 : * Called by the archiver when saving TABLE DATA (not schema). This routine
275 : * should save whatever format-specific information is needed to read
276 : * the archive back.
277 : *
278 : * It is called just prior to the dumper's 'DataDumper' routine being called.
279 : *
280 : * Optional, but strongly recommended.
281 : *
282 : */
283 : static void
284 252 : _StartData(ArchiveHandle *AH, TocEntry *te)
285 : {
286 252 : lclContext *ctx = (lclContext *) AH->formatData;
287 252 : lclTocEntry *tctx = (lclTocEntry *) te->formatData;
288 :
289 252 : tctx->dataPos = _getFilePos(AH, ctx);
290 252 : if (tctx->dataPos >= 0)
291 252 : tctx->dataState = K_OFFSET_POS_SET;
292 :
293 252 : _WriteByte(AH, BLK_DATA); /* Block type */
294 252 : WriteInt(AH, te->dumpId); /* For sanity check */
295 :
296 252 : ctx->cs = AllocateCompressor(AH->compression_spec,
297 : NULL,
298 : _CustomWriteFunc);
299 252 : }
300 :
301 : /*
302 : * Called by archiver when dumper calls WriteData. This routine is
303 : * called for both LO and table data; it is the responsibility of
304 : * the format to manage each kind of data using StartLO/StartData.
305 : *
306 : * It should only be called from within a DataDumper routine.
307 : *
308 : * Mandatory.
309 : */
310 : static void
311 1164 : _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
312 : {
313 1164 : lclContext *ctx = (lclContext *) AH->formatData;
314 1164 : CompressorState *cs = ctx->cs;
315 :
316 1164 : if (dLen > 0)
317 : /* writeData() internally throws write errors */
318 1152 : cs->writeData(AH, cs, data, dLen);
319 1164 : }
320 :
321 : /*
322 : * Called by the archiver when a dumper's 'DataDumper' routine has
323 : * finished.
324 : *
325 : * Mandatory.
326 : */
327 : static void
328 252 : _EndData(ArchiveHandle *AH, TocEntry *te)
329 : {
330 252 : lclContext *ctx = (lclContext *) AH->formatData;
331 :
332 252 : EndCompressor(AH, ctx->cs);
333 252 : ctx->cs = NULL;
334 :
335 : /* Send the end marker */
336 252 : WriteInt(AH, 0);
337 252 : }
338 :
339 : /*
340 : * Called by the archiver when starting to save BLOB DATA (not schema).
341 : * This routine should save whatever format-specific information is needed
342 : * to read the LOs back into memory.
343 : *
344 : * It is called just prior to the dumper's DataDumper routine.
345 : *
346 : * Optional, but strongly recommended.
347 : */
348 : static void
349 12 : _StartLOs(ArchiveHandle *AH, TocEntry *te)
350 : {
351 12 : lclContext *ctx = (lclContext *) AH->formatData;
352 12 : lclTocEntry *tctx = (lclTocEntry *) te->formatData;
353 :
354 12 : tctx->dataPos = _getFilePos(AH, ctx);
355 12 : if (tctx->dataPos >= 0)
356 12 : tctx->dataState = K_OFFSET_POS_SET;
357 :
358 12 : _WriteByte(AH, BLK_BLOBS); /* Block type */
359 12 : WriteInt(AH, te->dumpId); /* For sanity check */
360 12 : }
361 :
362 : /*
363 : * Called by the archiver when the dumper calls StartLO.
364 : *
365 : * Mandatory.
366 : *
367 : * Must save the passed OID for retrieval at restore-time.
368 : */
369 : static void
370 12 : _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
371 : {
372 12 : lclContext *ctx = (lclContext *) AH->formatData;
373 :
374 12 : if (oid == 0)
375 0 : pg_fatal("invalid OID for large object");
376 :
377 12 : WriteInt(AH, oid);
378 :
379 12 : ctx->cs = AllocateCompressor(AH->compression_spec,
380 : NULL,
381 : _CustomWriteFunc);
382 12 : }
383 :
384 : /*
385 : * Called by the archiver when the dumper calls EndLO.
386 : *
387 : * Optional.
388 : */
389 : static void
390 12 : _EndLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
391 : {
392 12 : lclContext *ctx = (lclContext *) AH->formatData;
393 :
394 12 : EndCompressor(AH, ctx->cs);
395 : /* Send the end marker */
396 12 : WriteInt(AH, 0);
397 12 : }
398 :
399 : /*
400 : * Called by the archiver when finishing saving BLOB DATA.
401 : *
402 : * Optional.
403 : */
404 : static void
405 12 : _EndLOs(ArchiveHandle *AH, TocEntry *te)
406 : {
407 : /* Write out a fake zero OID to mark end-of-LOs. */
408 12 : WriteInt(AH, 0);
409 12 : }
410 :
/*
 * Print data for a given TOC entry
 *
 * Positions AH->FH at the data block belonging to "te" and restores it.
 * BLK_DATA blocks go through _PrintData() and BLK_BLOBS blocks go through
 * _LoadLOs().  Entries marked K_OFFSET_NO_DATA are ignored.
 *
 * If the input is seekable and the block's offset is known, we seek
 * straight to it.  Otherwise we scan forward block by block, skipping
 * unrelated blocks and recording their offsets in their own TOC entries.
 * Calls pg_fatal() if the block cannot be found or has an unexpected ID
 * or type.
 */
static void
_PrintTocData(ArchiveHandle *AH, TocEntry *te)
{
	lclContext *ctx = (lclContext *) AH->formatData;
	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
	int			blkType;
	int			id;

	/* Nothing stored for this entry. */
	if (tctx->dataState == K_OFFSET_NO_DATA)
		return;

	if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
	{
		/*
		 * We cannot seek directly to the desired block. Instead, skip over
		 * block headers until we find the one we want. Remember the
		 * positions of skipped-over blocks, so that if we later decide we
		 * need to read one, we'll be able to seek to it.
		 *
		 * When our input file is seekable, we can do the search starting from
		 * the point after the last data block we scanned in previous
		 * iterations of this function.
		 */
		if (ctx->hasSeek)
		{
			if (fseeko(AH->FH, ctx->lastFilePos, SEEK_SET) != 0)
				pg_fatal("error during file seek: %m");
		}

		for (;;)
		{
			/* Position of this block's header (-1 if it can't be determined) */
			pgoff_t		thisBlkPos = _getFilePos(AH, ctx);

			_readBlockHeader(AH, &blkType, &id);

			/* Stop at end of file, or when we reach the wanted block. */
			if (blkType == EOF || id == te->dumpId)
				break;

			/* Remember the block position, if we got one */
			if (thisBlkPos >= 0)
			{
				TocEntry   *otherte = getTocEntryByDumpId(AH, id);

				if (otherte && otherte->formatData)
				{
					lclTocEntry *othertctx = (lclTocEntry *) otherte->formatData;

					/*
					 * Note: on Windows, multiple threads might access/update
					 * the same lclTocEntry concurrently, but that should be
					 * safe as long as we update dataPos before dataState.
					 * Ideally, we'd use pg_write_barrier() to enforce that,
					 * but the needed infrastructure doesn't exist in frontend
					 * code. But Windows only runs on machines with strong
					 * store ordering, so it should be okay for now.
					 */
					if (othertctx->dataState == K_OFFSET_POS_NOT_SET)
					{
						othertctx->dataPos = thisBlkPos;
						othertctx->dataState = K_OFFSET_POS_SET;
					}
					else if (othertctx->dataPos != thisBlkPos ||
							 othertctx->dataState != K_OFFSET_POS_SET)
					{
						/* sanity check */
						pg_log_warning("data block %d has wrong seek position",
									   id);
					}
				}
			}

			/* Skip over the unwanted block's contents to the next header. */
			switch (blkType)
			{
				case BLK_DATA:
					_skipData(AH);
					break;

				case BLK_BLOBS:
					_skipLOs(AH);
					break;

				default:		/* Always have a default */
					pg_fatal("unrecognized data block type (%d) while searching archive",
							 blkType);
					break;
			}
		}
	}
	else
	{
		/* We can just seek to the place we need to be. */
		if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
			pg_fatal("error during file seek: %m");

		_readBlockHeader(AH, &blkType, &id);
	}

	/*
	 * If we reached EOF without finding the block we want, then either it
	 * doesn't exist, or it does but we lack the ability to seek back to it.
	 */
	if (blkType == EOF)
	{
		if (!ctx->hasSeek)
			pg_fatal("could not find block ID %d in archive -- "
					 "possibly due to out-of-order restore request, "
					 "which cannot be handled due to non-seekable input file",
					 te->dumpId);
		else
			pg_fatal("could not find block ID %d in archive -- "
					 "possibly corrupt archive",
					 te->dumpId);
	}

	/* Are we sane? (reachable when we seeked to a recorded offset) */
	if (id != te->dumpId)
		pg_fatal("found unexpected block ID (%d) when reading data -- expected %d",
				 id, te->dumpId);

	/* The file is now positioned just after the block header; restore it. */
	switch (blkType)
	{
		case BLK_DATA:
			_PrintData(AH);
			break;

		case BLK_BLOBS:
			_LoadLOs(AH, AH->public.ropt->dropSchema);
			break;

		default:				/* Always have a default */
			pg_fatal("unrecognized data block type %d while restoring archive",
					 blkType);
			break;
	}

	/*
	 * If our input file is seekable but lacks data offsets, update our
	 * knowledge of where to start future searches from. (Note that we did
	 * not update the current TE's dataState/dataPos. We could have, but
	 * there is no point since it will not be visited again.)
	 */
	if (ctx->hasSeek && tctx->dataState == K_OFFSET_POS_NOT_SET)
	{
		pgoff_t		curPos = _getFilePos(AH, ctx);

		if (curPos > ctx->lastFilePos)
			ctx->lastFilePos = curPos;
	}
}
563 :
564 : /*
565 : * Print data from current file position.
566 : */
567 : static void
568 250 : _PrintData(ArchiveHandle *AH)
569 : {
570 : CompressorState *cs;
571 :
572 250 : cs = AllocateCompressor(AH->compression_spec,
573 : _CustomReadFunc, NULL);
574 250 : cs->readData(AH, cs);
575 250 : EndCompressor(AH, cs);
576 250 : }
577 :
578 : static void
579 12 : _LoadLOs(ArchiveHandle *AH, bool drop)
580 : {
581 : Oid oid;
582 :
583 12 : StartRestoreLOs(AH);
584 :
585 12 : oid = ReadInt(AH);
586 24 : while (oid != 0)
587 : {
588 12 : StartRestoreLO(AH, oid, drop);
589 12 : _PrintData(AH);
590 12 : EndRestoreLO(AH, oid);
591 12 : oid = ReadInt(AH);
592 : }
593 :
594 12 : EndRestoreLOs(AH);
595 12 : }
596 :
597 : /*
598 : * Skip the LOs from the current file position.
599 : * LOs are written sequentially as data blocks (see below).
600 : * Each LO is preceded by its original OID.
601 : * A zero OID indicates the end of the LOs.
602 : */
603 : static void
604 0 : _skipLOs(ArchiveHandle *AH)
605 : {
606 : Oid oid;
607 :
608 0 : oid = ReadInt(AH);
609 0 : while (oid != 0)
610 : {
611 0 : _skipData(AH);
612 0 : oid = ReadInt(AH);
613 : }
614 0 : }
615 :
616 : /*
617 : * Skip data from current file position.
618 : * Data blocks are formatted as an integer length, followed by data.
619 : * A zero length indicates the end of the block.
620 : */
621 : static void
622 0 : _skipData(ArchiveHandle *AH)
623 : {
624 0 : lclContext *ctx = (lclContext *) AH->formatData;
625 : size_t blkLen;
626 0 : char *buf = NULL;
627 0 : int buflen = 0;
628 :
629 0 : blkLen = ReadInt(AH);
630 0 : while (blkLen != 0)
631 : {
632 0 : if (ctx->hasSeek)
633 : {
634 0 : if (fseeko(AH->FH, blkLen, SEEK_CUR) != 0)
635 0 : pg_fatal("error during file seek: %m");
636 : }
637 : else
638 : {
639 0 : if (blkLen > buflen)
640 : {
641 0 : free(buf);
642 0 : buf = (char *) pg_malloc(blkLen);
643 0 : buflen = blkLen;
644 : }
645 0 : if (fread(buf, 1, blkLen, AH->FH) != blkLen)
646 : {
647 0 : if (feof(AH->FH))
648 0 : pg_fatal("could not read from input file: end of file");
649 : else
650 0 : pg_fatal("could not read from input file: %m");
651 : }
652 : }
653 :
654 0 : blkLen = ReadInt(AH);
655 : }
656 :
657 0 : free(buf);
658 0 : }
659 :
660 : /*
661 : * Write a byte of data to the archive.
662 : *
663 : * Mandatory.
664 : *
665 : * Called by the archiver to do integer & byte output to the archive.
666 : */
667 : static int
668 1257410 : _WriteByte(ArchiveHandle *AH, const int i)
669 : {
670 1257410 : if (fputc(i, AH->FH) == EOF)
671 0 : WRITE_ERROR_EXIT;
672 :
673 1257410 : return 1;
674 : }
675 :
676 : /*
677 : * Read a byte of data from the archive.
678 : *
679 : * Mandatory
680 : *
681 : * Called by the archiver to read bytes & integers from the archive.
682 : * EOF should be treated as a fatal error.
683 : */
684 : static int
685 1239480 : _ReadByte(ArchiveHandle *AH)
686 : {
687 : int res;
688 :
689 1239480 : res = getc(AH->FH);
690 1239480 : if (res == EOF)
691 0 : READ_ERROR_EXIT(AH->FH);
692 1239480 : return res;
693 : }
694 :
695 : /*
696 : * Write a buffer of data to the archive.
697 : *
698 : * Mandatory.
699 : *
700 : * Called by the archiver to write a block of bytes to the archive.
701 : */
702 : static void
703 123416 : _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
704 : {
705 123416 : if (fwrite(buf, 1, len, AH->FH) != len)
706 0 : WRITE_ERROR_EXIT;
707 123416 : }
708 :
709 : /*
710 : * Read a block of bytes from the archive.
711 : *
712 : * Mandatory.
713 : *
714 : * Called by the archiver to read a block of bytes from the archive
715 : */
716 : static void
717 122318 : _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
718 : {
719 122318 : if (fread(buf, 1, len, AH->FH) != len)
720 0 : READ_ERROR_EXIT(AH->FH);
721 122318 : }
722 :
723 : /*
724 : * Close the archive.
725 : *
726 : * Mandatory.
727 : *
728 : * When writing the archive, this is the routine that actually starts
729 : * the process of saving it to files. No data should be written prior
730 : * to this point, since the user could sort the TOC after creating it.
731 : *
732 : * If an archive is to be written, this routine must call:
733 : * WriteHead to save the archive header
734 : * WriteToc to save the TOC entries
735 : * WriteDataChunks to save all data & LOs.
736 : *
737 : */
738 : static void
739 180 : _CloseArchive(ArchiveHandle *AH)
740 : {
741 180 : lclContext *ctx = (lclContext *) AH->formatData;
742 : pgoff_t tpos;
743 :
744 180 : if (AH->mode == archModeWrite)
745 : {
746 88 : WriteHead(AH);
747 : /* Remember TOC's seek position for use below */
748 88 : tpos = ftello(AH->FH);
749 88 : if (tpos < 0 && ctx->hasSeek)
750 0 : pg_fatal("could not determine seek position in archive file: %m");
751 88 : WriteToc(AH);
752 88 : WriteDataChunks(AH, NULL);
753 :
754 : /*
755 : * If possible, re-write the TOC in order to update the data offset
756 : * information. This is not essential, as pg_restore can cope in most
757 : * cases without it; but it can make pg_restore significantly faster
758 : * in some situations (especially parallel restore). We can skip this
759 : * step if we're not dumping any data; there are no offsets to update
760 : * in that case.
761 : */
762 116 : if (ctx->hasSeek && AH->public.dopt->dumpData &&
763 28 : fseeko(AH->FH, tpos, SEEK_SET) == 0)
764 28 : WriteToc(AH);
765 : }
766 :
767 180 : if (fclose(AH->FH) != 0)
768 0 : pg_fatal("could not close archive file: %m");
769 :
770 : /* Sync the output file if one is defined */
771 180 : if (AH->dosync && AH->mode == archModeWrite && AH->fSpec)
772 26 : (void) fsync_fname(AH->fSpec, false);
773 :
774 180 : AH->FH = NULL;
775 180 : }
776 :
777 : /*
778 : * Reopen the archive's file handle.
779 : *
780 : * We close the original file handle, except on Windows. (The difference
781 : * is because on Windows, this is used within a multithreading context,
782 : * and we don't want a thread closing the parent file handle.)
783 : */
784 : static void
785 0 : _ReopenArchive(ArchiveHandle *AH)
786 : {
787 0 : lclContext *ctx = (lclContext *) AH->formatData;
788 : pgoff_t tpos;
789 :
790 0 : if (AH->mode == archModeWrite)
791 0 : pg_fatal("can only reopen input archives");
792 :
793 : /*
794 : * These two cases are user-facing errors since they represent unsupported
795 : * (but not invalid) use-cases. Word the error messages appropriately.
796 : */
797 0 : if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)
798 0 : pg_fatal("parallel restore from standard input is not supported");
799 0 : if (!ctx->hasSeek)
800 0 : pg_fatal("parallel restore from non-seekable file is not supported");
801 :
802 0 : tpos = ftello(AH->FH);
803 0 : if (tpos < 0)
804 0 : pg_fatal("could not determine seek position in archive file: %m");
805 :
806 : #ifndef WIN32
807 0 : if (fclose(AH->FH) != 0)
808 0 : pg_fatal("could not close archive file: %m");
809 : #endif
810 :
811 0 : AH->FH = fopen(AH->fSpec, PG_BINARY_R);
812 0 : if (!AH->FH)
813 0 : pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
814 :
815 0 : if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
816 0 : pg_fatal("could not set seek position in archive file: %m");
817 0 : }
818 :
819 : /*
820 : * Prepare for parallel restore.
821 : *
822 : * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
823 : * TOC entries' dataLength fields with appropriate values to guide the
824 : * ordering of restore jobs. The source of said data is format-dependent,
825 : * as is the exact meaning of the values.
826 : *
827 : * A format module might also choose to do other setup here.
828 : */
829 : static void
830 0 : _PrepParallelRestore(ArchiveHandle *AH)
831 : {
832 0 : lclContext *ctx = (lclContext *) AH->formatData;
833 0 : TocEntry *prev_te = NULL;
834 0 : lclTocEntry *prev_tctx = NULL;
835 : TocEntry *te;
836 :
837 : /*
838 : * Knowing that the data items were dumped out in TOC order, we can
839 : * reconstruct the length of each item as the delta to the start offset of
840 : * the next data item.
841 : */
842 0 : for (te = AH->toc->next; te != AH->toc; te = te->next)
843 : {
844 0 : lclTocEntry *tctx = (lclTocEntry *) te->formatData;
845 :
846 : /*
847 : * Ignore entries without a known data offset; if we were unable to
848 : * seek to rewrite the TOC when creating the archive, this'll be all
849 : * of them, and we'll end up with no size estimates.
850 : */
851 0 : if (tctx->dataState != K_OFFSET_POS_SET)
852 0 : continue;
853 :
854 : /* Compute previous data item's length */
855 0 : if (prev_te)
856 : {
857 0 : if (tctx->dataPos > prev_tctx->dataPos)
858 0 : prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos;
859 : }
860 :
861 0 : prev_te = te;
862 0 : prev_tctx = tctx;
863 : }
864 :
865 : /* If OK to seek, we can determine the length of the last item */
866 0 : if (prev_te && ctx->hasSeek)
867 : {
868 : pgoff_t endpos;
869 :
870 0 : if (fseeko(AH->FH, 0, SEEK_END) != 0)
871 0 : pg_fatal("error during file seek: %m");
872 0 : endpos = ftello(AH->FH);
873 0 : if (endpos > prev_tctx->dataPos)
874 0 : prev_te->dataLength = endpos - prev_tctx->dataPos;
875 : }
876 0 : }
877 :
878 : /*
879 : * Clone format-specific fields during parallel restoration.
880 : */
881 : static void
882 0 : _Clone(ArchiveHandle *AH)
883 : {
884 0 : lclContext *ctx = (lclContext *) AH->formatData;
885 :
886 : /*
887 : * Each thread must have private lclContext working state.
888 : */
889 0 : AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
890 0 : memcpy(AH->formatData, ctx, sizeof(lclContext));
891 0 : ctx = (lclContext *) AH->formatData;
892 :
893 : /* sanity check, shouldn't happen */
894 0 : if (ctx->cs != NULL)
895 0 : pg_fatal("compressor active");
896 :
897 : /*
898 : * We intentionally do not clone TOC-entry-local state: it's useful to
899 : * share knowledge about where the data blocks are across threads.
900 : * _PrintTocData has to be careful about the order of operations on that
901 : * state, though.
902 : */
903 0 : }
904 :
905 : static void
906 0 : _DeClone(ArchiveHandle *AH)
907 : {
908 0 : lclContext *ctx = (lclContext *) AH->formatData;
909 :
910 0 : free(ctx);
911 0 : }
912 :
913 : /*
914 : * This function is executed in the child of a parallel restore from a
915 : * custom-format archive and restores the actual data for one TOC entry.
916 : */
917 : static int
918 0 : _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
919 : {
920 0 : return parallel_restore(AH, te);
921 : }
922 :
923 : /*--------------------------------------------------
924 : * END OF FORMAT CALLBACKS
925 : *--------------------------------------------------
926 : */
927 :
928 : /*
929 : * Get the current position in the archive file.
930 : *
931 : * With a non-seekable archive file, we may not be able to obtain the
932 : * file position. If so, just return -1. It's not too important in
933 : * that case because we won't be able to rewrite the TOC to fill in
934 : * data block offsets anyway.
935 : */
936 : static pgoff_t
937 356 : _getFilePos(ArchiveHandle *AH, lclContext *ctx)
938 : {
939 : pgoff_t pos;
940 :
941 356 : pos = ftello(AH->FH);
942 356 : if (pos < 0)
943 : {
944 : /* Not expected if we found we can seek. */
945 0 : if (ctx->hasSeek)
946 0 : pg_fatal("could not determine seek position in archive file: %m");
947 : }
948 356 : return pos;
949 : }
950 :
951 : /*
952 : * Read a data block header. The format changed in V1.3, so we
953 : * centralize the code here for simplicity. Returns *type = EOF
954 : * if at EOF.
955 : */
956 : static void
957 250 : _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
958 : {
959 : int byt;
960 :
961 : /*
962 : * Note: if we are at EOF with a pre-1.3 input file, we'll pg_fatal()
963 : * inside ReadInt rather than returning EOF. It doesn't seem worth
964 : * jumping through hoops to deal with that case better, because no such
965 : * files are likely to exist in the wild: only some 7.1 development
966 : * versions of pg_dump ever generated such files.
967 : */
968 250 : if (AH->version < K_VERS_1_3)
969 0 : *type = BLK_DATA;
970 : else
971 : {
972 250 : byt = getc(AH->FH);
973 250 : *type = byt;
974 250 : if (byt == EOF)
975 : {
976 0 : *id = 0; /* don't return an uninitialized value */
977 0 : return;
978 : }
979 : }
980 :
981 250 : *id = ReadInt(AH);
982 : }
983 :
984 : /*
985 : * Callback function for writeData. Writes one block of (compressed)
986 : * data to the archive.
987 : */
988 : static void
989 668 : _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
990 : {
991 : /* never write 0-byte blocks (this should not happen) */
992 668 : if (len > 0)
993 : {
994 336 : WriteInt(AH, len);
995 336 : _WriteBuf(AH, buf, len);
996 : }
997 668 : }
998 :
999 : /*
1000 : * Callback function for readData. To keep things simple, we
1001 : * always read one compressed block at a time.
1002 : */
1003 : static size_t
1004 572 : _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
1005 : {
1006 : size_t blkLen;
1007 :
1008 : /* Read length */
1009 572 : blkLen = ReadInt(AH);
1010 572 : if (blkLen == 0)
1011 250 : return 0;
1012 :
1013 : /* If the caller's buffer is not large enough, allocate a bigger one */
1014 322 : if (blkLen > *buflen)
1015 : {
1016 2 : free(*buf);
1017 2 : *buf = (char *) pg_malloc(blkLen);
1018 2 : *buflen = blkLen;
1019 : }
1020 :
1021 : /* exits app on read errors */
1022 322 : _ReadBuf(AH, *buf, blkLen);
1023 :
1024 322 : return blkLen;
1025 : }
|