Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_backup_custom.c
4 : *
5 : * Implements the custom output format.
6 : *
7 : * The comments with the routines in this code are a good place to
8 : * understand how to write a new format.
9 : *
10 : * See the headers to pg_restore for more details.
11 : *
12 : * Copyright (c) 2000, Philip Warner
13 : * Rights are granted to use this software in any way so long
14 : * as this notice is not removed.
15 : *
16 : * The author is not responsible for loss or damages that may
17 : * and any liability will be limited to the time taken to fix any
18 : * related bug.
19 : *
20 : *
21 : * IDENTIFICATION
22 : * src/bin/pg_dump/pg_backup_custom.c
23 : *
24 : *-------------------------------------------------------------------------
25 : */
26 : #include "postgres_fe.h"
27 :
28 : #include "common/file_utils.h"
29 : #include "compress_io.h"
30 : #include "parallel.h"
31 : #include "pg_backup_utils.h"
32 :
/*--------
 * Routines in the format interface
 *
 * These static functions are installed into the ArchiveHandle's function
 * pointers by InitArchiveFmt_Custom(); the archiver invokes them through
 * those pointers.
 *--------
 */

static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
static void _StartData(ArchiveHandle *AH, TocEntry *te);
static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
static void _EndData(ArchiveHandle *AH, TocEntry *te);
static int	_WriteByte(ArchiveHandle *AH, const int i);
static int	_ReadByte(ArchiveHandle *AH);
static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
static void _CloseArchive(ArchiveHandle *AH);
static void _ReopenArchive(ArchiveHandle *AH);
static void _PrintTocData(ArchiveHandle *AH, TocEntry *te);
static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);

/* Helpers for restoring or skipping a data block at the current position */
static void _PrintData(ArchiveHandle *AH);
static void _skipData(ArchiveHandle *AH);
static void _skipLOs(ArchiveHandle *AH);

/* Large-object (LO) callbacks and restore helper */
static void _StartLOs(ArchiveHandle *AH, TocEntry *te);
static void _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndLO(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndLOs(ArchiveHandle *AH, TocEntry *te);
static void _LoadLOs(ArchiveHandle *AH, bool drop);

/* Parallel-restore support */
static void _PrepParallelRestore(ArchiveHandle *AH);
static void _Clone(ArchiveHandle *AH);
static void _DeClone(ArchiveHandle *AH);

static int	_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
68 :
/*
 * Per-archive private state of the custom format, stored in AH->formatData.
 */
typedef struct
{
	CompressorState *cs;		/* compressor for the data block or LO
								 * currently being written */
	int			hasSeek;		/* result of checkSeek() on AH->FH */
	/* lastFilePos is used only when reading, and may be invalid if !hasSeek */
	pgoff_t		lastFilePos;	/* position after last data block we've read */
} lclContext;

/*
 * Per-TOC-entry private state, stored in te->formatData.
 */
typedef struct
{
	int			dataState;		/* one of K_OFFSET_NO_DATA,
								 * K_OFFSET_POS_NOT_SET, K_OFFSET_POS_SET */
	pgoff_t		dataPos;		/* valid only if dataState=K_OFFSET_POS_SET */
} lclTocEntry;
82 :
83 :
/*------
 * Static declarations
 *------
 */

/* Read a data block's type byte (v1.3+) and dump ID; type is EOF at EOF */
static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
/* ftello() on AH->FH; may return -1 when the file is not seekable */
static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx);

/* Compressor I/O callbacks: write/read one length-prefixed block */
static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);
93 :
94 :
95 : /*
96 : * Init routine required by ALL formats. This is a global routine
97 : * and should be declared in pg_backup_archiver.h
98 : *
99 : * It's task is to create any extra archive context (using AH->formatData),
100 : * and to initialize the supported function pointers.
101 : *
102 : * It should also prepare whatever its input source is for reading/writing,
103 : * and in the case of a read mode connection, it should load the Header & TOC.
104 : */
105 : void
106 86 : InitArchiveFmt_Custom(ArchiveHandle *AH)
107 : {
108 : lclContext *ctx;
109 :
110 : /* Assuming static functions, this can be copied for each format. */
111 86 : AH->ArchiveEntryPtr = _ArchiveEntry;
112 86 : AH->StartDataPtr = _StartData;
113 86 : AH->WriteDataPtr = _WriteData;
114 86 : AH->EndDataPtr = _EndData;
115 86 : AH->WriteBytePtr = _WriteByte;
116 86 : AH->ReadBytePtr = _ReadByte;
117 86 : AH->WriteBufPtr = _WriteBuf;
118 86 : AH->ReadBufPtr = _ReadBuf;
119 86 : AH->ClosePtr = _CloseArchive;
120 86 : AH->ReopenPtr = _ReopenArchive;
121 86 : AH->PrintTocDataPtr = _PrintTocData;
122 86 : AH->ReadExtraTocPtr = _ReadExtraToc;
123 86 : AH->WriteExtraTocPtr = _WriteExtraToc;
124 86 : AH->PrintExtraTocPtr = _PrintExtraToc;
125 :
126 86 : AH->StartLOsPtr = _StartLOs;
127 86 : AH->StartLOPtr = _StartLO;
128 86 : AH->EndLOPtr = _EndLO;
129 86 : AH->EndLOsPtr = _EndLOs;
130 :
131 86 : AH->PrepParallelRestorePtr = _PrepParallelRestore;
132 86 : AH->ClonePtr = _Clone;
133 86 : AH->DeClonePtr = _DeClone;
134 :
135 : /* no parallel dump in the custom archive, only parallel restore */
136 86 : AH->WorkerJobDumpPtr = NULL;
137 86 : AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom;
138 :
139 : /* Set up a private area. */
140 86 : ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
141 86 : AH->formatData = (void *) ctx;
142 :
143 : /*
144 : * Now open the file
145 : */
146 86 : if (AH->mode == archModeWrite)
147 : {
148 38 : if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
149 : {
150 38 : AH->FH = fopen(AH->fSpec, PG_BINARY_W);
151 38 : if (!AH->FH)
152 0 : pg_fatal("could not open output file \"%s\": %m", AH->fSpec);
153 : }
154 : else
155 : {
156 0 : AH->FH = stdout;
157 0 : if (!AH->FH)
158 0 : pg_fatal("could not open output file: %m");
159 : }
160 :
161 38 : ctx->hasSeek = checkSeek(AH->FH);
162 : }
163 : else
164 : {
165 48 : if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
166 : {
167 48 : AH->FH = fopen(AH->fSpec, PG_BINARY_R);
168 48 : if (!AH->FH)
169 0 : pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
170 : }
171 : else
172 : {
173 0 : AH->FH = stdin;
174 0 : if (!AH->FH)
175 0 : pg_fatal("could not open input file: %m");
176 : }
177 :
178 48 : ctx->hasSeek = checkSeek(AH->FH);
179 :
180 48 : ReadHead(AH);
181 48 : ReadToc(AH);
182 :
183 : /*
184 : * Remember location of first data block (i.e., the point after TOC)
185 : * in case we have to search for desired data blocks.
186 : */
187 48 : ctx->lastFilePos = _getFilePos(AH, ctx);
188 : }
189 86 : }
190 :
191 : /*
192 : * Called by the Archiver when the dumper creates a new TOC entry.
193 : *
194 : * Optional.
195 : *
196 : * Set up extract format-related TOC data.
197 : */
198 : static void
199 6346 : _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
200 : {
201 : lclTocEntry *ctx;
202 :
203 6346 : ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
204 6346 : if (te->dataDumper)
205 228 : ctx->dataState = K_OFFSET_POS_NOT_SET;
206 : else
207 6118 : ctx->dataState = K_OFFSET_NO_DATA;
208 :
209 6346 : te->formatData = (void *) ctx;
210 6346 : }
211 :
212 : /*
213 : * Called by the Archiver to save any extra format-related TOC entry
214 : * data.
215 : *
216 : * Optional.
217 : *
218 : * Use the Archiver routines to write data - they are non-endian, and
219 : * maintain other important file information.
220 : */
221 : static void
222 12680 : _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
223 : {
224 12680 : lclTocEntry *ctx = (lclTocEntry *) te->formatData;
225 :
226 12680 : WriteOffset(AH, ctx->dataPos, ctx->dataState);
227 12680 : }
228 :
229 : /*
230 : * Called by the Archiver to read any extra format-related TOC data.
231 : *
232 : * Optional.
233 : *
234 : * Needs to match the order defined in _WriteExtraToc, and should also
235 : * use the Archiver input routines.
236 : */
237 : static void
238 8202 : _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
239 : {
240 8202 : lclTocEntry *ctx = (lclTocEntry *) te->formatData;
241 :
242 8202 : if (ctx == NULL)
243 : {
244 8202 : ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
245 8202 : te->formatData = (void *) ctx;
246 : }
247 :
248 8202 : ctx->dataState = ReadOffset(AH, &(ctx->dataPos));
249 :
250 : /*
251 : * Prior to V1.7 (pg7.3), we dumped the data size as an int now we don't
252 : * dump it at all.
253 : */
254 8202 : if (AH->version < K_VERS_1_7)
255 0 : ReadInt(AH);
256 8202 : }
257 :
258 : /*
259 : * Called by the Archiver when restoring an archive to output a comment
260 : * that includes useful information about the TOC entry.
261 : *
262 : * Optional.
263 : */
264 : static void
265 2240 : _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
266 : {
267 2240 : lclTocEntry *ctx = (lclTocEntry *) te->formatData;
268 :
269 2240 : if (AH->public.verbose)
270 482 : ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n",
271 482 : (int64) ctx->dataPos);
272 2240 : }
273 :
274 : /*
275 : * Called by the archiver when saving TABLE DATA (not schema). This routine
276 : * should save whatever format-specific information is needed to read
277 : * the archive back.
278 : *
279 : * It is called just prior to the dumper's 'DataDumper' routine being called.
280 : *
281 : * Optional, but strongly recommended.
282 : *
283 : */
284 : static void
285 210 : _StartData(ArchiveHandle *AH, TocEntry *te)
286 : {
287 210 : lclContext *ctx = (lclContext *) AH->formatData;
288 210 : lclTocEntry *tctx = (lclTocEntry *) te->formatData;
289 :
290 210 : tctx->dataPos = _getFilePos(AH, ctx);
291 210 : if (tctx->dataPos >= 0)
292 210 : tctx->dataState = K_OFFSET_POS_SET;
293 :
294 210 : _WriteByte(AH, BLK_DATA); /* Block type */
295 210 : WriteInt(AH, te->dumpId); /* For sanity check */
296 :
297 210 : ctx->cs = AllocateCompressor(AH->compression_spec,
298 : NULL,
299 : _CustomWriteFunc);
300 210 : }
301 :
302 : /*
303 : * Called by archiver when dumper calls WriteData. This routine is
304 : * called for both LO and table data; it is the responsibility of
305 : * the format to manage each kind of data using StartLO/StartData.
306 : *
307 : * It should only be called from within a DataDumper routine.
308 : *
309 : * Mandatory.
310 : */
311 : static void
312 418 : _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
313 : {
314 418 : lclContext *ctx = (lclContext *) AH->formatData;
315 418 : CompressorState *cs = ctx->cs;
316 :
317 418 : if (dLen > 0)
318 : /* writeData() internally throws write errors */
319 406 : cs->writeData(AH, cs, data, dLen);
320 418 : }
321 :
322 : /*
323 : * Called by the archiver when a dumper's 'DataDumper' routine has
324 : * finished.
325 : *
326 : * Mandatory.
327 : */
328 : static void
329 210 : _EndData(ArchiveHandle *AH, TocEntry *te)
330 : {
331 210 : lclContext *ctx = (lclContext *) AH->formatData;
332 :
333 210 : EndCompressor(AH, ctx->cs);
334 210 : ctx->cs = NULL;
335 :
336 : /* Send the end marker */
337 210 : WriteInt(AH, 0);
338 210 : }
339 :
340 : /*
341 : * Called by the archiver when starting to save BLOB DATA (not schema).
342 : * This routine should save whatever format-specific information is needed
343 : * to read the LOs back into memory.
344 : *
345 : * It is called just prior to the dumper's DataDumper routine.
346 : *
347 : * Optional, but strongly recommended.
348 : */
349 : static void
350 12 : _StartLOs(ArchiveHandle *AH, TocEntry *te)
351 : {
352 12 : lclContext *ctx = (lclContext *) AH->formatData;
353 12 : lclTocEntry *tctx = (lclTocEntry *) te->formatData;
354 :
355 12 : tctx->dataPos = _getFilePos(AH, ctx);
356 12 : if (tctx->dataPos >= 0)
357 12 : tctx->dataState = K_OFFSET_POS_SET;
358 :
359 12 : _WriteByte(AH, BLK_BLOBS); /* Block type */
360 12 : WriteInt(AH, te->dumpId); /* For sanity check */
361 12 : }
362 :
363 : /*
364 : * Called by the archiver when the dumper calls StartLO.
365 : *
366 : * Mandatory.
367 : *
368 : * Must save the passed OID for retrieval at restore-time.
369 : */
370 : static void
371 12 : _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
372 : {
373 12 : lclContext *ctx = (lclContext *) AH->formatData;
374 :
375 12 : if (oid == 0)
376 0 : pg_fatal("invalid OID for large object");
377 :
378 12 : WriteInt(AH, oid);
379 :
380 12 : ctx->cs = AllocateCompressor(AH->compression_spec,
381 : NULL,
382 : _CustomWriteFunc);
383 12 : }
384 :
385 : /*
386 : * Called by the archiver when the dumper calls EndLO.
387 : *
388 : * Optional.
389 : */
390 : static void
391 12 : _EndLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
392 : {
393 12 : lclContext *ctx = (lclContext *) AH->formatData;
394 :
395 12 : EndCompressor(AH, ctx->cs);
396 : /* Send the end marker */
397 12 : WriteInt(AH, 0);
398 12 : }
399 :
400 : /*
401 : * Called by the archiver when finishing saving BLOB DATA.
402 : *
403 : * Optional.
404 : */
405 : static void
406 12 : _EndLOs(ArchiveHandle *AH, TocEntry *te)
407 : {
408 : /* Write out a fake zero OID to mark end-of-LOs. */
409 12 : WriteInt(AH, 0);
410 12 : }
411 :
/*
 * Print data for a given TOC entry
 *
 * Finds the data block belonging to "te" and restores it.  BLK_DATA blocks
 * go through _PrintData(); BLK_BLOBS blocks go through _LoadLOs().
 *
 * If the file is seekable and the entry's offset is known, we seek straight
 * to it.  Otherwise we scan block headers forward and record the offsets of
 * other entries' blocks we pass, so later calls can seek to them.
 *
 * Calls pg_fatal() if the block cannot be found, carries an unexpected dump
 * ID, or has an unrecognized type.
 */
static void
_PrintTocData(ArchiveHandle *AH, TocEntry *te)
{
	lclContext *ctx = (lclContext *) AH->formatData;
	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
	int			blkType;
	int			id;

	/* Entry never had a data block: nothing to restore. */
	if (tctx->dataState == K_OFFSET_NO_DATA)
		return;

	if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
	{
		/*
		 * We cannot seek directly to the desired block.  Instead, skip over
		 * block headers until we find the one we want.  Remember the
		 * positions of skipped-over blocks, so that if we later decide we
		 * need to read one, we'll be able to seek to it.
		 *
		 * When our input file is seekable, we can do the search starting from
		 * the point after the last data block we scanned in previous
		 * iterations of this function.
		 */
		if (ctx->hasSeek)
		{
			if (fseeko(AH->FH, ctx->lastFilePos, SEEK_SET) != 0)
				pg_fatal("error during file seek: %m");
		}

		for (;;)
		{
			/* Position of the header we're about to read (-1 if unknown). */
			pgoff_t		thisBlkPos = _getFilePos(AH, ctx);

			_readBlockHeader(AH, &blkType, &id);

			/* Stop at EOF, or once we have reached the block we want. */
			if (blkType == EOF || id == te->dumpId)
				break;

			/* Remember the block position, if we got one */
			if (thisBlkPos >= 0)
			{
				TocEntry   *otherte = getTocEntryByDumpId(AH, id);

				if (otherte && otherte->formatData)
				{
					lclTocEntry *othertctx = (lclTocEntry *) otherte->formatData;

					/*
					 * Note: on Windows, multiple threads might access/update
					 * the same lclTocEntry concurrently, but that should be
					 * safe as long as we update dataPos before dataState.
					 * Ideally, we'd use pg_write_barrier() to enforce that,
					 * but the needed infrastructure doesn't exist in frontend
					 * code.  But Windows only runs on machines with strong
					 * store ordering, so it should be okay for now.
					 */
					if (othertctx->dataState == K_OFFSET_POS_NOT_SET)
					{
						othertctx->dataPos = thisBlkPos;
						othertctx->dataState = K_OFFSET_POS_SET;
					}
					else if (othertctx->dataPos != thisBlkPos ||
							 othertctx->dataState != K_OFFSET_POS_SET)
					{
						/* sanity check */
						pg_log_warning("data block %d has wrong seek position",
									   id);
					}
				}
			}

			/* Not our block: skip its contents to reach the next header. */
			switch (blkType)
			{
				case BLK_DATA:
					_skipData(AH);
					break;

				case BLK_BLOBS:
					_skipLOs(AH);
					break;

				default:		/* Always have a default */
					pg_fatal("unrecognized data block type (%d) while searching archive",
							 blkType);
					break;
			}
		}
	}
	else
	{
		/* We can just seek to the place we need to be. */
		if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
			pg_fatal("error during file seek: %m");

		_readBlockHeader(AH, &blkType, &id);
	}

	/*
	 * If we reached EOF without finding the block we want, then either it
	 * doesn't exist, or it does but we lack the ability to seek back to it.
	 */
	if (blkType == EOF)
	{
		if (!ctx->hasSeek)
			pg_fatal("could not find block ID %d in archive -- "
					 "possibly due to out-of-order restore request, "
					 "which cannot be handled due to non-seekable input file",
					 te->dumpId);
		else
			pg_fatal("could not find block ID %d in archive -- "
					 "possibly corrupt archive",
					 te->dumpId);
	}

	/* Are we sane? */
	if (id != te->dumpId)
		pg_fatal("found unexpected block ID (%d) when reading data -- expected %d",
				 id, te->dumpId);

	/* The file is now positioned just after this block's header. */
	switch (blkType)
	{
		case BLK_DATA:
			_PrintData(AH);
			break;

		case BLK_BLOBS:
			_LoadLOs(AH, AH->public.ropt->dropSchema);
			break;

		default:				/* Always have a default */
			pg_fatal("unrecognized data block type %d while restoring archive",
					 blkType);
			break;
	}

	/*
	 * If our input file is seekable but lacks data offsets, update our
	 * knowledge of where to start future searches from.  (Note that we did
	 * not update the current TE's dataState/dataPos.  We could have, but
	 * there is no point since it will not be visited again.)
	 */
	if (ctx->hasSeek && tctx->dataState == K_OFFSET_POS_NOT_SET)
	{
		pgoff_t		curPos = _getFilePos(AH, ctx);

		/* Only move forward: never rescan ground already covered. */
		if (curPos > ctx->lastFilePos)
			ctx->lastFilePos = curPos;
	}
}
564 :
565 : /*
566 : * Print data from current file position.
567 : */
568 : static void
569 208 : _PrintData(ArchiveHandle *AH)
570 : {
571 : CompressorState *cs;
572 :
573 208 : cs = AllocateCompressor(AH->compression_spec,
574 : _CustomReadFunc, NULL);
575 208 : cs->readData(AH, cs);
576 208 : EndCompressor(AH, cs);
577 208 : }
578 :
579 : static void
580 12 : _LoadLOs(ArchiveHandle *AH, bool drop)
581 : {
582 : Oid oid;
583 :
584 12 : StartRestoreLOs(AH);
585 :
586 12 : oid = ReadInt(AH);
587 24 : while (oid != 0)
588 : {
589 12 : StartRestoreLO(AH, oid, drop);
590 12 : _PrintData(AH);
591 12 : EndRestoreLO(AH, oid);
592 12 : oid = ReadInt(AH);
593 : }
594 :
595 12 : EndRestoreLOs(AH);
596 12 : }
597 :
598 : /*
599 : * Skip the LOs from the current file position.
600 : * LOs are written sequentially as data blocks (see below).
601 : * Each LO is preceded by its original OID.
602 : * A zero OID indicates the end of the LOs.
603 : */
604 : static void
605 0 : _skipLOs(ArchiveHandle *AH)
606 : {
607 : Oid oid;
608 :
609 0 : oid = ReadInt(AH);
610 0 : while (oid != 0)
611 : {
612 0 : _skipData(AH);
613 0 : oid = ReadInt(AH);
614 : }
615 0 : }
616 :
617 : /*
618 : * Skip data from current file position.
619 : * Data blocks are formatted as an integer length, followed by data.
620 : * A zero length indicates the end of the block.
621 : */
622 : static void
623 0 : _skipData(ArchiveHandle *AH)
624 : {
625 0 : lclContext *ctx = (lclContext *) AH->formatData;
626 : size_t blkLen;
627 0 : char *buf = NULL;
628 0 : int buflen = 0;
629 :
630 0 : blkLen = ReadInt(AH);
631 0 : while (blkLen != 0)
632 : {
633 0 : if (ctx->hasSeek)
634 : {
635 0 : if (fseeko(AH->FH, blkLen, SEEK_CUR) != 0)
636 0 : pg_fatal("error during file seek: %m");
637 : }
638 : else
639 : {
640 0 : if (blkLen > buflen)
641 : {
642 0 : free(buf);
643 0 : buf = (char *) pg_malloc(blkLen);
644 0 : buflen = blkLen;
645 : }
646 0 : if (fread(buf, 1, blkLen, AH->FH) != blkLen)
647 : {
648 0 : if (feof(AH->FH))
649 0 : pg_fatal("could not read from input file: end of file");
650 : else
651 0 : pg_fatal("could not read from input file: %m");
652 : }
653 : }
654 :
655 0 : blkLen = ReadInt(AH);
656 : }
657 :
658 0 : free(buf);
659 0 : }
660 :
661 : /*
662 : * Write a byte of data to the archive.
663 : *
664 : * Mandatory.
665 : *
666 : * Called by the archiver to do integer & byte output to the archive.
667 : */
668 : static int
669 1287118 : _WriteByte(ArchiveHandle *AH, const int i)
670 : {
671 1287118 : if (fputc(i, AH->FH) == EOF)
672 0 : WRITE_ERROR_EXIT;
673 :
674 1287118 : return 1;
675 : }
676 :
677 : /*
678 : * Read a byte of data from the archive.
679 : *
680 : * Mandatory
681 : *
682 : * Called by the archiver to read bytes & integers from the archive.
683 : * EOF should be treated as a fatal error.
684 : */
685 : static int
686 836144 : _ReadByte(ArchiveHandle *AH)
687 : {
688 : int res;
689 :
690 836144 : res = getc(AH->FH);
691 836144 : if (res == EOF)
692 0 : READ_ERROR_EXIT(AH->FH);
693 836144 : return res;
694 : }
695 :
696 : /*
697 : * Write a buffer of data to the archive.
698 : *
699 : * Mandatory.
700 : *
701 : * Called by the archiver to write a block of bytes to the archive.
702 : */
703 : static void
704 134206 : _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
705 : {
706 134206 : if (fwrite(buf, 1, len, AH->FH) != len)
707 0 : WRITE_ERROR_EXIT;
708 134206 : }
709 :
710 : /*
711 : * Read a block of bytes from the archive.
712 : *
713 : * Mandatory.
714 : *
715 : * Called by the archiver to read a block of bytes from the archive
716 : */
717 : static void
718 86390 : _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
719 : {
720 86390 : if (fread(buf, 1, len, AH->FH) != len)
721 0 : READ_ERROR_EXIT(AH->FH);
722 86390 : }
723 :
724 : /*
725 : * Close the archive.
726 : *
727 : * Mandatory.
728 : *
729 : * When writing the archive, this is the routine that actually starts
730 : * the process of saving it to files. No data should be written prior
731 : * to this point, since the user could sort the TOC after creating it.
732 : *
733 : * If an archive is to be written, this routine must call:
734 : * WriteHead to save the archive header
735 : * WriteToc to save the TOC entries
736 : * WriteDataChunks to save all data & LOs.
737 : *
738 : */
739 : static void
740 86 : _CloseArchive(ArchiveHandle *AH)
741 : {
742 86 : lclContext *ctx = (lclContext *) AH->formatData;
743 : pgoff_t tpos;
744 :
745 86 : if (AH->mode == archModeWrite)
746 : {
747 38 : WriteHead(AH);
748 : /* Remember TOC's seek position for use below */
749 38 : tpos = ftello(AH->FH);
750 38 : if (tpos < 0 && ctx->hasSeek)
751 0 : pg_fatal("could not determine seek position in archive file: %m");
752 38 : WriteToc(AH);
753 38 : WriteDataChunks(AH, NULL);
754 :
755 : /*
756 : * If possible, re-write the TOC in order to update the data offset
757 : * information. This is not essential, as pg_restore can cope in most
758 : * cases without it; but it can make pg_restore significantly faster
759 : * in some situations (especially parallel restore).
760 : */
761 76 : if (ctx->hasSeek &&
762 38 : fseeko(AH->FH, tpos, SEEK_SET) == 0)
763 38 : WriteToc(AH);
764 : }
765 :
766 86 : if (fclose(AH->FH) != 0)
767 0 : pg_fatal("could not close archive file: %m");
768 :
769 : /* Sync the output file if one is defined */
770 86 : if (AH->dosync && AH->mode == archModeWrite && AH->fSpec)
771 10 : (void) fsync_fname(AH->fSpec, false);
772 :
773 86 : AH->FH = NULL;
774 86 : }
775 :
776 : /*
777 : * Reopen the archive's file handle.
778 : *
779 : * We close the original file handle, except on Windows. (The difference
780 : * is because on Windows, this is used within a multithreading context,
781 : * and we don't want a thread closing the parent file handle.)
782 : */
783 : static void
784 0 : _ReopenArchive(ArchiveHandle *AH)
785 : {
786 0 : lclContext *ctx = (lclContext *) AH->formatData;
787 : pgoff_t tpos;
788 :
789 0 : if (AH->mode == archModeWrite)
790 0 : pg_fatal("can only reopen input archives");
791 :
792 : /*
793 : * These two cases are user-facing errors since they represent unsupported
794 : * (but not invalid) use-cases. Word the error messages appropriately.
795 : */
796 0 : if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)
797 0 : pg_fatal("parallel restore from standard input is not supported");
798 0 : if (!ctx->hasSeek)
799 0 : pg_fatal("parallel restore from non-seekable file is not supported");
800 :
801 0 : tpos = ftello(AH->FH);
802 0 : if (tpos < 0)
803 0 : pg_fatal("could not determine seek position in archive file: %m");
804 :
805 : #ifndef WIN32
806 0 : if (fclose(AH->FH) != 0)
807 0 : pg_fatal("could not close archive file: %m");
808 : #endif
809 :
810 0 : AH->FH = fopen(AH->fSpec, PG_BINARY_R);
811 0 : if (!AH->FH)
812 0 : pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
813 :
814 0 : if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
815 0 : pg_fatal("could not set seek position in archive file: %m");
816 0 : }
817 :
818 : /*
819 : * Prepare for parallel restore.
820 : *
821 : * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
822 : * TOC entries' dataLength fields with appropriate values to guide the
823 : * ordering of restore jobs. The source of said data is format-dependent,
824 : * as is the exact meaning of the values.
825 : *
826 : * A format module might also choose to do other setup here.
827 : */
828 : static void
829 0 : _PrepParallelRestore(ArchiveHandle *AH)
830 : {
831 0 : lclContext *ctx = (lclContext *) AH->formatData;
832 0 : TocEntry *prev_te = NULL;
833 0 : lclTocEntry *prev_tctx = NULL;
834 : TocEntry *te;
835 :
836 : /*
837 : * Knowing that the data items were dumped out in TOC order, we can
838 : * reconstruct the length of each item as the delta to the start offset of
839 : * the next data item.
840 : */
841 0 : for (te = AH->toc->next; te != AH->toc; te = te->next)
842 : {
843 0 : lclTocEntry *tctx = (lclTocEntry *) te->formatData;
844 :
845 : /*
846 : * Ignore entries without a known data offset; if we were unable to
847 : * seek to rewrite the TOC when creating the archive, this'll be all
848 : * of them, and we'll end up with no size estimates.
849 : */
850 0 : if (tctx->dataState != K_OFFSET_POS_SET)
851 0 : continue;
852 :
853 : /* Compute previous data item's length */
854 0 : if (prev_te)
855 : {
856 0 : if (tctx->dataPos > prev_tctx->dataPos)
857 0 : prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos;
858 : }
859 :
860 0 : prev_te = te;
861 0 : prev_tctx = tctx;
862 : }
863 :
864 : /* If OK to seek, we can determine the length of the last item */
865 0 : if (prev_te && ctx->hasSeek)
866 : {
867 : pgoff_t endpos;
868 :
869 0 : if (fseeko(AH->FH, 0, SEEK_END) != 0)
870 0 : pg_fatal("error during file seek: %m");
871 0 : endpos = ftello(AH->FH);
872 0 : if (endpos > prev_tctx->dataPos)
873 0 : prev_te->dataLength = endpos - prev_tctx->dataPos;
874 : }
875 0 : }
876 :
877 : /*
878 : * Clone format-specific fields during parallel restoration.
879 : */
880 : static void
881 0 : _Clone(ArchiveHandle *AH)
882 : {
883 0 : lclContext *ctx = (lclContext *) AH->formatData;
884 :
885 : /*
886 : * Each thread must have private lclContext working state.
887 : */
888 0 : AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
889 0 : memcpy(AH->formatData, ctx, sizeof(lclContext));
890 0 : ctx = (lclContext *) AH->formatData;
891 :
892 : /* sanity check, shouldn't happen */
893 0 : if (ctx->cs != NULL)
894 0 : pg_fatal("compressor active");
895 :
896 : /*
897 : * We intentionally do not clone TOC-entry-local state: it's useful to
898 : * share knowledge about where the data blocks are across threads.
899 : * _PrintTocData has to be careful about the order of operations on that
900 : * state, though.
901 : */
902 0 : }
903 :
904 : static void
905 0 : _DeClone(ArchiveHandle *AH)
906 : {
907 0 : lclContext *ctx = (lclContext *) AH->formatData;
908 :
909 0 : free(ctx);
910 0 : }
911 :
912 : /*
913 : * This function is executed in the child of a parallel restore from a
914 : * custom-format archive and restores the actual data for one TOC entry.
915 : */
916 : static int
917 0 : _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
918 : {
919 0 : return parallel_restore(AH, te);
920 : }
921 :
922 : /*--------------------------------------------------
923 : * END OF FORMAT CALLBACKS
924 : *--------------------------------------------------
925 : */
926 :
927 : /*
928 : * Get the current position in the archive file.
929 : *
930 : * With a non-seekable archive file, we may not be able to obtain the
931 : * file position. If so, just return -1. It's not too important in
932 : * that case because we won't be able to rewrite the TOC to fill in
933 : * data block offsets anyway.
934 : */
935 : static pgoff_t
936 270 : _getFilePos(ArchiveHandle *AH, lclContext *ctx)
937 : {
938 : pgoff_t pos;
939 :
940 270 : pos = ftello(AH->FH);
941 270 : if (pos < 0)
942 : {
943 : /* Not expected if we found we can seek. */
944 0 : if (ctx->hasSeek)
945 0 : pg_fatal("could not determine seek position in archive file: %m");
946 : }
947 270 : return pos;
948 : }
949 :
950 : /*
951 : * Read a data block header. The format changed in V1.3, so we
952 : * centralize the code here for simplicity. Returns *type = EOF
953 : * if at EOF.
954 : */
955 : static void
956 208 : _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
957 : {
958 : int byt;
959 :
960 : /*
961 : * Note: if we are at EOF with a pre-1.3 input file, we'll pg_fatal()
962 : * inside ReadInt rather than returning EOF. It doesn't seem worth
963 : * jumping through hoops to deal with that case better, because no such
964 : * files are likely to exist in the wild: only some 7.1 development
965 : * versions of pg_dump ever generated such files.
966 : */
967 208 : if (AH->version < K_VERS_1_3)
968 0 : *type = BLK_DATA;
969 : else
970 : {
971 208 : byt = getc(AH->FH);
972 208 : *type = byt;
973 208 : if (byt == EOF)
974 : {
975 0 : *id = 0; /* don't return an uninitialized value */
976 0 : return;
977 : }
978 : }
979 :
980 208 : *id = ReadInt(AH);
981 : }
982 :
983 : /*
984 : * Callback function for writeData. Writes one block of (compressed)
985 : * data to the archive.
986 : */
987 : static void
988 418 : _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
989 : {
990 : /* never write 0-byte blocks (this should not happen) */
991 418 : if (len > 0)
992 : {
993 290 : WriteInt(AH, len);
994 290 : _WriteBuf(AH, buf, len);
995 : }
996 418 : }
997 :
998 : /*
999 : * Callback function for readData. To keep things simple, we
1000 : * always read one compressed block at a time.
1001 : */
1002 : static size_t
1003 484 : _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
1004 : {
1005 : size_t blkLen;
1006 :
1007 : /* Read length */
1008 484 : blkLen = ReadInt(AH);
1009 484 : if (blkLen == 0)
1010 208 : return 0;
1011 :
1012 : /* If the caller's buffer is not large enough, allocate a bigger one */
1013 276 : if (blkLen > *buflen)
1014 : {
1015 2 : free(*buf);
1016 2 : *buf = (char *) pg_malloc(blkLen);
1017 2 : *buflen = blkLen;
1018 : }
1019 :
1020 : /* exits app on read errors */
1021 276 : _ReadBuf(AH, *buf, blkLen);
1022 :
1023 276 : return blkLen;
1024 : }
|