Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_backup_custom.c
4 : *
5 : * Implements the custom output format.
6 : *
7 : * The comments with the routines in this code are a good place to
8 : * understand how to write a new format.
9 : *
10 : * See the headers to pg_restore for more details.
11 : *
12 : * Copyright (c) 2000, Philip Warner
13 : * Rights are granted to use this software in any way so long
14 : * as this notice is not removed.
15 : *
16 : * The author is not responsible for loss or damages that may
17 : * and any liability will be limited to the time taken to fix any
18 : * related bug.
19 : *
20 : *
21 : * IDENTIFICATION
22 : * src/bin/pg_dump/pg_backup_custom.c
23 : *
24 : *-------------------------------------------------------------------------
25 : */
26 : #include "postgres_fe.h"
27 :
28 : #include "common/file_utils.h"
29 : #include "compress_io.h"
30 : #include "pg_backup_utils.h"
31 :
32 : /*--------
33 : * Routines in the format interface
34 : *--------
35 : */
36 :
37 : static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
38 : static void _StartData(ArchiveHandle *AH, TocEntry *te);
39 : static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
40 : static void _EndData(ArchiveHandle *AH, TocEntry *te);
41 : static int _WriteByte(ArchiveHandle *AH, const int i);
42 : static int _ReadByte(ArchiveHandle *AH);
43 : static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
44 : static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
45 : static void _CloseArchive(ArchiveHandle *AH);
46 : static void _ReopenArchive(ArchiveHandle *AH);
47 : static void _PrintTocData(ArchiveHandle *AH, TocEntry *te);
48 : static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
49 : static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
50 : static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);
51 :
52 : static void _PrintData(ArchiveHandle *AH);
53 : static void _skipData(ArchiveHandle *AH);
54 : static void _skipLOs(ArchiveHandle *AH);
55 :
56 : static void _StartLOs(ArchiveHandle *AH, TocEntry *te);
57 : static void _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid);
58 : static void _EndLO(ArchiveHandle *AH, TocEntry *te, Oid oid);
59 : static void _EndLOs(ArchiveHandle *AH, TocEntry *te);
60 : static void _LoadLOs(ArchiveHandle *AH, bool drop);
61 :
62 : static void _PrepParallelRestore(ArchiveHandle *AH);
63 : static void _Clone(ArchiveHandle *AH);
64 : static void _DeClone(ArchiveHandle *AH);
65 :
66 : static int _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
67 :
68 : typedef struct
69 : {
70 : CompressorState *cs;
71 : int hasSeek;
72 : /* lastFilePos is used only when reading, and may be invalid if !hasSeek */
73 : pgoff_t lastFilePos; /* position after last data block we've read */
74 : } lclContext;
75 :
76 : typedef struct
77 : {
78 : int dataState;
79 : pgoff_t dataPos; /* valid only if dataState=K_OFFSET_POS_SET */
80 : } lclTocEntry;
81 :
82 :
83 : /*------
84 : * Static declarations
85 : *------
86 : */
87 : static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
88 : static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx);
89 :
90 : static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
91 : static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);
92 :
93 :
94 : /*
95 : * Init routine required by ALL formats. This is a global routine
96 : * and should be declared in pg_backup_archiver.h
97 : *
98 : * It's task is to create any extra archive context (using AH->formatData),
99 : * and to initialize the supported function pointers.
100 : *
101 : * It should also prepare whatever its input source is for reading/writing,
102 : * and in the case of a read mode connection, it should load the Header & TOC.
103 : */
104 : void
105 86 : InitArchiveFmt_Custom(ArchiveHandle *AH)
106 : {
107 : lclContext *ctx;
108 :
109 : /* Assuming static functions, this can be copied for each format. */
110 86 : AH->ArchiveEntryPtr = _ArchiveEntry;
111 86 : AH->StartDataPtr = _StartData;
112 86 : AH->WriteDataPtr = _WriteData;
113 86 : AH->EndDataPtr = _EndData;
114 86 : AH->WriteBytePtr = _WriteByte;
115 86 : AH->ReadBytePtr = _ReadByte;
116 86 : AH->WriteBufPtr = _WriteBuf;
117 86 : AH->ReadBufPtr = _ReadBuf;
118 86 : AH->ClosePtr = _CloseArchive;
119 86 : AH->ReopenPtr = _ReopenArchive;
120 86 : AH->PrintTocDataPtr = _PrintTocData;
121 86 : AH->ReadExtraTocPtr = _ReadExtraToc;
122 86 : AH->WriteExtraTocPtr = _WriteExtraToc;
123 86 : AH->PrintExtraTocPtr = _PrintExtraToc;
124 :
125 86 : AH->StartLOsPtr = _StartLOs;
126 86 : AH->StartLOPtr = _StartLO;
127 86 : AH->EndLOPtr = _EndLO;
128 86 : AH->EndLOsPtr = _EndLOs;
129 :
130 86 : AH->PrepParallelRestorePtr = _PrepParallelRestore;
131 86 : AH->ClonePtr = _Clone;
132 86 : AH->DeClonePtr = _DeClone;
133 :
134 : /* no parallel dump in the custom archive, only parallel restore */
135 86 : AH->WorkerJobDumpPtr = NULL;
136 86 : AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom;
137 :
138 : /* Set up a private area. */
139 86 : ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
140 86 : AH->formatData = ctx;
141 :
142 : /*
143 : * Now open the file
144 : */
145 86 : if (AH->mode == archModeWrite)
146 : {
147 38 : if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
148 : {
149 38 : AH->FH = fopen(AH->fSpec, PG_BINARY_W);
150 38 : if (!AH->FH)
151 0 : pg_fatal("could not open output file \"%s\": %m", AH->fSpec);
152 : }
153 : else
154 : {
155 0 : AH->FH = stdout;
156 0 : if (!AH->FH)
157 0 : pg_fatal("could not open output file: %m");
158 : }
159 :
160 38 : ctx->hasSeek = checkSeek(AH->FH);
161 : }
162 : else
163 : {
164 48 : if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
165 : {
166 48 : AH->FH = fopen(AH->fSpec, PG_BINARY_R);
167 48 : if (!AH->FH)
168 0 : pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
169 : }
170 : else
171 : {
172 0 : AH->FH = stdin;
173 0 : if (!AH->FH)
174 0 : pg_fatal("could not open input file: %m");
175 : }
176 :
177 48 : ctx->hasSeek = checkSeek(AH->FH);
178 :
179 48 : ReadHead(AH);
180 48 : ReadToc(AH);
181 :
182 : /*
183 : * Remember location of first data block (i.e., the point after TOC)
184 : * in case we have to search for desired data blocks.
185 : */
186 48 : ctx->lastFilePos = _getFilePos(AH, ctx);
187 : }
188 86 : }
189 :
190 : /*
191 : * Called by the Archiver when the dumper creates a new TOC entry.
192 : *
193 : * Optional.
194 : *
195 : * Set up extract format-related TOC data.
196 : */
197 : static void
198 6416 : _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
199 : {
200 : lclTocEntry *ctx;
201 :
202 6416 : ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
203 6416 : if (te->dataDumper)
204 228 : ctx->dataState = K_OFFSET_POS_NOT_SET;
205 : else
206 6188 : ctx->dataState = K_OFFSET_NO_DATA;
207 :
208 6416 : te->formatData = ctx;
209 6416 : }
210 :
211 : /*
212 : * Called by the Archiver to save any extra format-related TOC entry
213 : * data.
214 : *
215 : * Optional.
216 : *
217 : * Use the Archiver routines to write data - they are non-endian, and
218 : * maintain other important file information.
219 : */
220 : static void
221 12820 : _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
222 : {
223 12820 : lclTocEntry *ctx = (lclTocEntry *) te->formatData;
224 :
225 12820 : WriteOffset(AH, ctx->dataPos, ctx->dataState);
226 12820 : }
227 :
228 : /*
229 : * Called by the Archiver to read any extra format-related TOC data.
230 : *
231 : * Optional.
232 : *
233 : * Needs to match the order defined in _WriteExtraToc, and should also
234 : * use the Archiver input routines.
235 : */
236 : static void
237 8278 : _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
238 : {
239 8278 : lclTocEntry *ctx = (lclTocEntry *) te->formatData;
240 :
241 8278 : if (ctx == NULL)
242 : {
243 8278 : ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
244 8278 : te->formatData = ctx;
245 : }
246 :
247 8278 : ctx->dataState = ReadOffset(AH, &(ctx->dataPos));
248 :
249 : /*
250 : * Prior to V1.7 (pg7.3), we dumped the data size as an int now we don't
251 : * dump it at all.
252 : */
253 8278 : if (AH->version < K_VERS_1_7)
254 0 : ReadInt(AH);
255 8278 : }
256 :
257 : /*
258 : * Called by the Archiver when restoring an archive to output a comment
259 : * that includes useful information about the TOC entry.
260 : *
261 : * Optional.
262 : */
263 : static void
264 2248 : _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
265 : {
266 2248 : lclTocEntry *ctx = (lclTocEntry *) te->formatData;
267 :
268 2248 : if (AH->public.verbose)
269 484 : ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n",
270 484 : (int64) ctx->dataPos);
271 2248 : }
272 :
273 : /*
274 : * Called by the archiver when saving TABLE DATA (not schema). This routine
275 : * should save whatever format-specific information is needed to read
276 : * the archive back.
277 : *
278 : * It is called just prior to the dumper's 'DataDumper' routine being called.
279 : *
280 : * Optional, but strongly recommended.
281 : *
282 : */
283 : static void
284 210 : _StartData(ArchiveHandle *AH, TocEntry *te)
285 : {
286 210 : lclContext *ctx = (lclContext *) AH->formatData;
287 210 : lclTocEntry *tctx = (lclTocEntry *) te->formatData;
288 :
289 210 : tctx->dataPos = _getFilePos(AH, ctx);
290 210 : if (tctx->dataPos >= 0)
291 210 : tctx->dataState = K_OFFSET_POS_SET;
292 :
293 210 : _WriteByte(AH, BLK_DATA); /* Block type */
294 210 : WriteInt(AH, te->dumpId); /* For sanity check */
295 :
296 210 : ctx->cs = AllocateCompressor(AH->compression_spec,
297 : NULL,
298 : _CustomWriteFunc);
299 210 : }
300 :
301 : /*
302 : * Called by archiver when dumper calls WriteData. This routine is
303 : * called for both LO and table data; it is the responsibility of
304 : * the format to manage each kind of data using StartLO/StartData.
305 : *
306 : * It should only be called from within a DataDumper routine.
307 : *
308 : * Mandatory.
309 : */
310 : static void
311 418 : _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
312 : {
313 418 : lclContext *ctx = (lclContext *) AH->formatData;
314 418 : CompressorState *cs = ctx->cs;
315 :
316 418 : if (dLen > 0)
317 : /* writeData() internally throws write errors */
318 406 : cs->writeData(AH, cs, data, dLen);
319 418 : }
320 :
321 : /*
322 : * Called by the archiver when a dumper's 'DataDumper' routine has
323 : * finished.
324 : *
325 : * Mandatory.
326 : */
327 : static void
328 210 : _EndData(ArchiveHandle *AH, TocEntry *te)
329 : {
330 210 : lclContext *ctx = (lclContext *) AH->formatData;
331 :
332 210 : EndCompressor(AH, ctx->cs);
333 210 : ctx->cs = NULL;
334 :
335 : /* Send the end marker */
336 210 : WriteInt(AH, 0);
337 210 : }
338 :
339 : /*
340 : * Called by the archiver when starting to save BLOB DATA (not schema).
341 : * This routine should save whatever format-specific information is needed
342 : * to read the LOs back into memory.
343 : *
344 : * It is called just prior to the dumper's DataDumper routine.
345 : *
346 : * Optional, but strongly recommended.
347 : */
348 : static void
349 12 : _StartLOs(ArchiveHandle *AH, TocEntry *te)
350 : {
351 12 : lclContext *ctx = (lclContext *) AH->formatData;
352 12 : lclTocEntry *tctx = (lclTocEntry *) te->formatData;
353 :
354 12 : tctx->dataPos = _getFilePos(AH, ctx);
355 12 : if (tctx->dataPos >= 0)
356 12 : tctx->dataState = K_OFFSET_POS_SET;
357 :
358 12 : _WriteByte(AH, BLK_BLOBS); /* Block type */
359 12 : WriteInt(AH, te->dumpId); /* For sanity check */
360 12 : }
361 :
362 : /*
363 : * Called by the archiver when the dumper calls StartLO.
364 : *
365 : * Mandatory.
366 : *
367 : * Must save the passed OID for retrieval at restore-time.
368 : */
369 : static void
370 12 : _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
371 : {
372 12 : lclContext *ctx = (lclContext *) AH->formatData;
373 :
374 12 : if (oid == 0)
375 0 : pg_fatal("invalid OID for large object");
376 :
377 12 : WriteInt(AH, oid);
378 :
379 12 : ctx->cs = AllocateCompressor(AH->compression_spec,
380 : NULL,
381 : _CustomWriteFunc);
382 12 : }
383 :
384 : /*
385 : * Called by the archiver when the dumper calls EndLO.
386 : *
387 : * Optional.
388 : */
389 : static void
390 12 : _EndLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
391 : {
392 12 : lclContext *ctx = (lclContext *) AH->formatData;
393 :
394 12 : EndCompressor(AH, ctx->cs);
395 : /* Send the end marker */
396 12 : WriteInt(AH, 0);
397 12 : }
398 :
399 : /*
400 : * Called by the archiver when finishing saving BLOB DATA.
401 : *
402 : * Optional.
403 : */
404 : static void
405 12 : _EndLOs(ArchiveHandle *AH, TocEntry *te)
406 : {
407 : /* Write out a fake zero OID to mark end-of-LOs. */
408 12 : WriteInt(AH, 0);
409 12 : }
410 :
411 : /*
412 : * Print data for a given TOC entry
413 : */
414 : static void
415 208 : _PrintTocData(ArchiveHandle *AH, TocEntry *te)
416 : {
417 208 : lclContext *ctx = (lclContext *) AH->formatData;
418 208 : lclTocEntry *tctx = (lclTocEntry *) te->formatData;
419 : int blkType;
420 : int id;
421 :
422 208 : if (tctx->dataState == K_OFFSET_NO_DATA)
423 0 : return;
424 :
425 208 : if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
426 : {
427 : /*
428 : * We cannot seek directly to the desired block. Instead, skip over
429 : * block headers until we find the one we want. Remember the
430 : * positions of skipped-over blocks, so that if we later decide we
431 : * need to read one, we'll be able to seek to it.
432 : *
433 : * When our input file is seekable, we can do the search starting from
434 : * the point after the last data block we scanned in previous
435 : * iterations of this function.
436 : */
437 0 : if (ctx->hasSeek)
438 : {
439 0 : if (fseeko(AH->FH, ctx->lastFilePos, SEEK_SET) != 0)
440 0 : pg_fatal("error during file seek: %m");
441 : }
442 :
443 : for (;;)
444 0 : {
445 0 : pgoff_t thisBlkPos = _getFilePos(AH, ctx);
446 :
447 0 : _readBlockHeader(AH, &blkType, &id);
448 :
449 0 : if (blkType == EOF || id == te->dumpId)
450 : break;
451 :
452 : /* Remember the block position, if we got one */
453 0 : if (thisBlkPos >= 0)
454 : {
455 0 : TocEntry *otherte = getTocEntryByDumpId(AH, id);
456 :
457 0 : if (otherte && otherte->formatData)
458 : {
459 0 : lclTocEntry *othertctx = (lclTocEntry *) otherte->formatData;
460 :
461 : /*
462 : * Note: on Windows, multiple threads might access/update
463 : * the same lclTocEntry concurrently, but that should be
464 : * safe as long as we update dataPos before dataState.
465 : * Ideally, we'd use pg_write_barrier() to enforce that,
466 : * but the needed infrastructure doesn't exist in frontend
467 : * code. But Windows only runs on machines with strong
468 : * store ordering, so it should be okay for now.
469 : */
470 0 : if (othertctx->dataState == K_OFFSET_POS_NOT_SET)
471 : {
472 0 : othertctx->dataPos = thisBlkPos;
473 0 : othertctx->dataState = K_OFFSET_POS_SET;
474 : }
475 0 : else if (othertctx->dataPos != thisBlkPos ||
476 0 : othertctx->dataState != K_OFFSET_POS_SET)
477 : {
478 : /* sanity check */
479 0 : pg_log_warning("data block %d has wrong seek position",
480 : id);
481 : }
482 : }
483 : }
484 :
485 0 : switch (blkType)
486 : {
487 0 : case BLK_DATA:
488 0 : _skipData(AH);
489 0 : break;
490 :
491 0 : case BLK_BLOBS:
492 0 : _skipLOs(AH);
493 0 : break;
494 :
495 0 : default: /* Always have a default */
496 0 : pg_fatal("unrecognized data block type (%d) while searching archive",
497 : blkType);
498 : break;
499 : }
500 : }
501 : }
502 : else
503 : {
504 : /* We can just seek to the place we need to be. */
505 208 : if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
506 0 : pg_fatal("error during file seek: %m");
507 :
508 208 : _readBlockHeader(AH, &blkType, &id);
509 : }
510 :
511 : /*
512 : * If we reached EOF without finding the block we want, then either it
513 : * doesn't exist, or it does but we lack the ability to seek back to it.
514 : */
515 208 : if (blkType == EOF)
516 : {
517 0 : if (!ctx->hasSeek)
518 0 : pg_fatal("could not find block ID %d in archive -- "
519 : "possibly due to out-of-order restore request, "
520 : "which cannot be handled due to non-seekable input file",
521 : te->dumpId);
522 : else
523 0 : pg_fatal("could not find block ID %d in archive -- "
524 : "possibly corrupt archive",
525 : te->dumpId);
526 : }
527 :
528 : /* Are we sane? */
529 208 : if (id != te->dumpId)
530 0 : pg_fatal("found unexpected block ID (%d) when reading data -- expected %d",
531 : id, te->dumpId);
532 :
533 208 : switch (blkType)
534 : {
535 196 : case BLK_DATA:
536 196 : _PrintData(AH);
537 196 : break;
538 :
539 12 : case BLK_BLOBS:
540 12 : _LoadLOs(AH, AH->public.ropt->dropSchema);
541 12 : break;
542 :
543 0 : default: /* Always have a default */
544 0 : pg_fatal("unrecognized data block type %d while restoring archive",
545 : blkType);
546 : break;
547 : }
548 :
549 : /*
550 : * If our input file is seekable but lacks data offsets, update our
551 : * knowledge of where to start future searches from. (Note that we did
552 : * not update the current TE's dataState/dataPos. We could have, but
553 : * there is no point since it will not be visited again.)
554 : */
555 208 : if (ctx->hasSeek && tctx->dataState == K_OFFSET_POS_NOT_SET)
556 : {
557 0 : pgoff_t curPos = _getFilePos(AH, ctx);
558 :
559 0 : if (curPos > ctx->lastFilePos)
560 0 : ctx->lastFilePos = curPos;
561 : }
562 : }
563 :
564 : /*
565 : * Print data from current file position.
566 : */
567 : static void
568 208 : _PrintData(ArchiveHandle *AH)
569 : {
570 : CompressorState *cs;
571 :
572 208 : cs = AllocateCompressor(AH->compression_spec,
573 : _CustomReadFunc, NULL);
574 208 : cs->readData(AH, cs);
575 208 : EndCompressor(AH, cs);
576 208 : }
577 :
578 : static void
579 12 : _LoadLOs(ArchiveHandle *AH, bool drop)
580 : {
581 : Oid oid;
582 :
583 12 : StartRestoreLOs(AH);
584 :
585 12 : oid = ReadInt(AH);
586 24 : while (oid != 0)
587 : {
588 12 : StartRestoreLO(AH, oid, drop);
589 12 : _PrintData(AH);
590 12 : EndRestoreLO(AH, oid);
591 12 : oid = ReadInt(AH);
592 : }
593 :
594 12 : EndRestoreLOs(AH);
595 12 : }
596 :
597 : /*
598 : * Skip the LOs from the current file position.
599 : * LOs are written sequentially as data blocks (see below).
600 : * Each LO is preceded by its original OID.
601 : * A zero OID indicates the end of the LOs.
602 : */
603 : static void
604 0 : _skipLOs(ArchiveHandle *AH)
605 : {
606 : Oid oid;
607 :
608 0 : oid = ReadInt(AH);
609 0 : while (oid != 0)
610 : {
611 0 : _skipData(AH);
612 0 : oid = ReadInt(AH);
613 : }
614 0 : }
615 :
616 : /*
617 : * Skip data from current file position.
618 : * Data blocks are formatted as an integer length, followed by data.
619 : * A zero length indicates the end of the block.
620 : */
621 : static void
622 0 : _skipData(ArchiveHandle *AH)
623 : {
624 0 : lclContext *ctx = (lclContext *) AH->formatData;
625 : size_t blkLen;
626 0 : char *buf = NULL;
627 0 : int buflen = 0;
628 :
629 0 : blkLen = ReadInt(AH);
630 0 : while (blkLen != 0)
631 : {
632 0 : if (ctx->hasSeek)
633 : {
634 0 : if (fseeko(AH->FH, blkLen, SEEK_CUR) != 0)
635 0 : pg_fatal("error during file seek: %m");
636 : }
637 : else
638 : {
639 0 : if (blkLen > buflen)
640 : {
641 0 : free(buf);
642 0 : buf = (char *) pg_malloc(blkLen);
643 0 : buflen = blkLen;
644 : }
645 0 : if (fread(buf, 1, blkLen, AH->FH) != blkLen)
646 : {
647 0 : if (feof(AH->FH))
648 0 : pg_fatal("could not read from input file: end of file");
649 : else
650 0 : pg_fatal("could not read from input file: %m");
651 : }
652 : }
653 :
654 0 : blkLen = ReadInt(AH);
655 : }
656 :
657 0 : free(buf);
658 0 : }
659 :
660 : /*
661 : * Write a byte of data to the archive.
662 : *
663 : * Mandatory.
664 : *
665 : * Called by the archiver to do integer & byte output to the archive.
666 : */
667 : static int
668 1300778 : _WriteByte(ArchiveHandle *AH, const int i)
669 : {
670 1300778 : if (fputc(i, AH->FH) == EOF)
671 0 : WRITE_ERROR_EXIT;
672 :
673 1300778 : return 1;
674 : }
675 :
676 : /*
677 : * Read a byte of data from the archive.
678 : *
679 : * Mandatory
680 : *
681 : * Called by the archiver to read bytes & integers from the archive.
682 : * EOF should be treated as a fatal error.
683 : */
684 : static int
685 843538 : _ReadByte(ArchiveHandle *AH)
686 : {
687 : int res;
688 :
689 843538 : res = getc(AH->FH);
690 843538 : if (res == EOF)
691 0 : READ_ERROR_EXIT(AH->FH);
692 843538 : return res;
693 : }
694 :
695 : /*
696 : * Write a buffer of data to the archive.
697 : *
698 : * Mandatory.
699 : *
700 : * Called by the archiver to write a block of bytes to the archive.
701 : */
702 : static void
703 135714 : _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
704 : {
705 135714 : if (fwrite(buf, 1, len, AH->FH) != len)
706 0 : WRITE_ERROR_EXIT;
707 135714 : }
708 :
709 : /*
710 : * Read a block of bytes from the archive.
711 : *
712 : * Mandatory.
713 : *
714 : * Called by the archiver to read a block of bytes from the archive
715 : */
716 : static void
717 87192 : _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
718 : {
719 87192 : if (fread(buf, 1, len, AH->FH) != len)
720 0 : READ_ERROR_EXIT(AH->FH);
721 87192 : }
722 :
723 : /*
724 : * Close the archive.
725 : *
726 : * Mandatory.
727 : *
728 : * When writing the archive, this is the routine that actually starts
729 : * the process of saving it to files. No data should be written prior
730 : * to this point, since the user could sort the TOC after creating it.
731 : *
732 : * If an archive is to be written, this routine must call:
733 : * WriteHead to save the archive header
734 : * WriteToc to save the TOC entries
735 : * WriteDataChunks to save all data & LOs.
736 : *
737 : */
738 : static void
739 86 : _CloseArchive(ArchiveHandle *AH)
740 : {
741 86 : lclContext *ctx = (lclContext *) AH->formatData;
742 : pgoff_t tpos;
743 :
744 86 : if (AH->mode == archModeWrite)
745 : {
746 38 : WriteHead(AH);
747 : /* Remember TOC's seek position for use below */
748 38 : tpos = ftello(AH->FH);
749 38 : if (tpos < 0 && ctx->hasSeek)
750 0 : pg_fatal("could not determine seek position in archive file: %m");
751 38 : WriteToc(AH);
752 38 : WriteDataChunks(AH, NULL);
753 :
754 : /*
755 : * If possible, re-write the TOC in order to update the data offset
756 : * information. This is not essential, as pg_restore can cope in most
757 : * cases without it; but it can make pg_restore significantly faster
758 : * in some situations (especially parallel restore).
759 : */
760 76 : if (ctx->hasSeek &&
761 38 : fseeko(AH->FH, tpos, SEEK_SET) == 0)
762 38 : WriteToc(AH);
763 : }
764 :
765 86 : if (fclose(AH->FH) != 0)
766 0 : pg_fatal("could not close archive file: %m");
767 :
768 : /* Sync the output file if one is defined */
769 86 : if (AH->dosync && AH->mode == archModeWrite && AH->fSpec)
770 10 : (void) fsync_fname(AH->fSpec, false);
771 :
772 86 : AH->FH = NULL;
773 86 : }
774 :
775 : /*
776 : * Reopen the archive's file handle.
777 : *
778 : * We close the original file handle, except on Windows. (The difference
779 : * is because on Windows, this is used within a multithreading context,
780 : * and we don't want a thread closing the parent file handle.)
781 : */
782 : static void
783 0 : _ReopenArchive(ArchiveHandle *AH)
784 : {
785 0 : lclContext *ctx = (lclContext *) AH->formatData;
786 : pgoff_t tpos;
787 :
788 0 : if (AH->mode == archModeWrite)
789 0 : pg_fatal("can only reopen input archives");
790 :
791 : /*
792 : * These two cases are user-facing errors since they represent unsupported
793 : * (but not invalid) use-cases. Word the error messages appropriately.
794 : */
795 0 : if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)
796 0 : pg_fatal("parallel restore from standard input is not supported");
797 0 : if (!ctx->hasSeek)
798 0 : pg_fatal("parallel restore from non-seekable file is not supported");
799 :
800 0 : tpos = ftello(AH->FH);
801 0 : if (tpos < 0)
802 0 : pg_fatal("could not determine seek position in archive file: %m");
803 :
804 : #ifndef WIN32
805 0 : if (fclose(AH->FH) != 0)
806 0 : pg_fatal("could not close archive file: %m");
807 : #endif
808 :
809 0 : AH->FH = fopen(AH->fSpec, PG_BINARY_R);
810 0 : if (!AH->FH)
811 0 : pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
812 :
813 0 : if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
814 0 : pg_fatal("could not set seek position in archive file: %m");
815 0 : }
816 :
817 : /*
818 : * Prepare for parallel restore.
819 : *
820 : * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
821 : * TOC entries' dataLength fields with appropriate values to guide the
822 : * ordering of restore jobs. The source of said data is format-dependent,
823 : * as is the exact meaning of the values.
824 : *
825 : * A format module might also choose to do other setup here.
826 : */
827 : static void
828 0 : _PrepParallelRestore(ArchiveHandle *AH)
829 : {
830 0 : lclContext *ctx = (lclContext *) AH->formatData;
831 0 : TocEntry *prev_te = NULL;
832 0 : lclTocEntry *prev_tctx = NULL;
833 : TocEntry *te;
834 :
835 : /*
836 : * Knowing that the data items were dumped out in TOC order, we can
837 : * reconstruct the length of each item as the delta to the start offset of
838 : * the next data item.
839 : */
840 0 : for (te = AH->toc->next; te != AH->toc; te = te->next)
841 : {
842 0 : lclTocEntry *tctx = (lclTocEntry *) te->formatData;
843 :
844 : /*
845 : * Ignore entries without a known data offset; if we were unable to
846 : * seek to rewrite the TOC when creating the archive, this'll be all
847 : * of them, and we'll end up with no size estimates.
848 : */
849 0 : if (tctx->dataState != K_OFFSET_POS_SET)
850 0 : continue;
851 :
852 : /* Compute previous data item's length */
853 0 : if (prev_te)
854 : {
855 0 : if (tctx->dataPos > prev_tctx->dataPos)
856 0 : prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos;
857 : }
858 :
859 0 : prev_te = te;
860 0 : prev_tctx = tctx;
861 : }
862 :
863 : /* If OK to seek, we can determine the length of the last item */
864 0 : if (prev_te && ctx->hasSeek)
865 : {
866 : pgoff_t endpos;
867 :
868 0 : if (fseeko(AH->FH, 0, SEEK_END) != 0)
869 0 : pg_fatal("error during file seek: %m");
870 0 : endpos = ftello(AH->FH);
871 0 : if (endpos > prev_tctx->dataPos)
872 0 : prev_te->dataLength = endpos - prev_tctx->dataPos;
873 : }
874 0 : }
875 :
876 : /*
877 : * Clone format-specific fields during parallel restoration.
878 : */
879 : static void
880 0 : _Clone(ArchiveHandle *AH)
881 : {
882 0 : lclContext *ctx = (lclContext *) AH->formatData;
883 :
884 : /*
885 : * Each thread must have private lclContext working state.
886 : */
887 0 : AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
888 0 : memcpy(AH->formatData, ctx, sizeof(lclContext));
889 0 : ctx = (lclContext *) AH->formatData;
890 :
891 : /* sanity check, shouldn't happen */
892 0 : if (ctx->cs != NULL)
893 0 : pg_fatal("compressor active");
894 :
895 : /*
896 : * We intentionally do not clone TOC-entry-local state: it's useful to
897 : * share knowledge about where the data blocks are across threads.
898 : * _PrintTocData has to be careful about the order of operations on that
899 : * state, though.
900 : */
901 0 : }
902 :
903 : static void
904 0 : _DeClone(ArchiveHandle *AH)
905 : {
906 0 : lclContext *ctx = (lclContext *) AH->formatData;
907 :
908 0 : free(ctx);
909 0 : }
910 :
911 : /*
912 : * This function is executed in the child of a parallel restore from a
913 : * custom-format archive and restores the actual data for one TOC entry.
914 : */
915 : static int
916 0 : _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
917 : {
918 0 : return parallel_restore(AH, te);
919 : }
920 :
921 : /*--------------------------------------------------
922 : * END OF FORMAT CALLBACKS
923 : *--------------------------------------------------
924 : */
925 :
926 : /*
927 : * Get the current position in the archive file.
928 : *
929 : * With a non-seekable archive file, we may not be able to obtain the
930 : * file position. If so, just return -1. It's not too important in
931 : * that case because we won't be able to rewrite the TOC to fill in
932 : * data block offsets anyway.
933 : */
934 : static pgoff_t
935 270 : _getFilePos(ArchiveHandle *AH, lclContext *ctx)
936 : {
937 : pgoff_t pos;
938 :
939 270 : pos = ftello(AH->FH);
940 270 : if (pos < 0)
941 : {
942 : /* Not expected if we found we can seek. */
943 0 : if (ctx->hasSeek)
944 0 : pg_fatal("could not determine seek position in archive file: %m");
945 : }
946 270 : return pos;
947 : }
948 :
949 : /*
950 : * Read a data block header. The format changed in V1.3, so we
951 : * centralize the code here for simplicity. Returns *type = EOF
952 : * if at EOF.
953 : */
954 : static void
955 208 : _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
956 : {
957 : int byt;
958 :
959 : /*
960 : * Note: if we are at EOF with a pre-1.3 input file, we'll pg_fatal()
961 : * inside ReadInt rather than returning EOF. It doesn't seem worth
962 : * jumping through hoops to deal with that case better, because no such
963 : * files are likely to exist in the wild: only some 7.1 development
964 : * versions of pg_dump ever generated such files.
965 : */
966 208 : if (AH->version < K_VERS_1_3)
967 0 : *type = BLK_DATA;
968 : else
969 : {
970 208 : byt = getc(AH->FH);
971 208 : *type = byt;
972 208 : if (byt == EOF)
973 : {
974 0 : *id = 0; /* don't return an uninitialized value */
975 0 : return;
976 : }
977 : }
978 :
979 208 : *id = ReadInt(AH);
980 : }
981 :
982 : /*
983 : * Callback function for writeData. Writes one block of (compressed)
984 : * data to the archive.
985 : */
986 : static void
987 418 : _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
988 : {
989 : /* never write 0-byte blocks (this should not happen) */
990 418 : if (len > 0)
991 : {
992 290 : WriteInt(AH, len);
993 290 : _WriteBuf(AH, buf, len);
994 : }
995 418 : }
996 :
997 : /*
998 : * Callback function for readData. To keep things simple, we
999 : * always read one compressed block at a time.
1000 : */
1001 : static size_t
1002 484 : _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
1003 : {
1004 : size_t blkLen;
1005 :
1006 : /* Read length */
1007 484 : blkLen = ReadInt(AH);
1008 484 : if (blkLen == 0)
1009 208 : return 0;
1010 :
1011 : /* If the caller's buffer is not large enough, allocate a bigger one */
1012 276 : if (blkLen > *buflen)
1013 : {
1014 2 : free(*buf);
1015 2 : *buf = (char *) pg_malloc(blkLen);
1016 2 : *buflen = blkLen;
1017 : }
1018 :
1019 : /* exits app on read errors */
1020 276 : _ReadBuf(AH, *buf, blkLen);
1021 :
1022 276 : return blkLen;
1023 : }
|