Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_backup_custom.c
4 : *
5 : * Implements the custom output format.
6 : *
7 : * The comments with the routines in this code are a good place to
8 : * understand how to write a new format.
9 : *
10 : * See the headers to pg_restore for more details.
11 : *
12 : * Copyright (c) 2000, Philip Warner
13 : * Rights are granted to use this software in any way so long
14 : * as this notice is not removed.
15 : *
16 : * The author is not responsible for loss or damages that may
17 : * and any liability will be limited to the time taken to fix any
18 : * related bug.
19 : *
20 : *
21 : * IDENTIFICATION
22 : * src/bin/pg_dump/pg_backup_custom.c
23 : *
24 : *-------------------------------------------------------------------------
25 : */
26 : #include "postgres_fe.h"
27 :
28 : #include "common/file_utils.h"
29 : #include "compress_io.h"
30 : #include "pg_backup_utils.h"
31 :
32 : /*--------
33 : * Routines in the format interface
34 : *--------
35 : */
36 :
37 : static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
38 : static void _StartData(ArchiveHandle *AH, TocEntry *te);
39 : static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
40 : static void _EndData(ArchiveHandle *AH, TocEntry *te);
41 : static int _WriteByte(ArchiveHandle *AH, const int i);
42 : static int _ReadByte(ArchiveHandle *AH);
43 : static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
44 : static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
45 : static void _CloseArchive(ArchiveHandle *AH);
46 : static void _ReopenArchive(ArchiveHandle *AH);
47 : static void _PrintTocData(ArchiveHandle *AH, TocEntry *te);
48 : static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
49 : static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
50 : static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);
51 :
52 : static void _PrintData(ArchiveHandle *AH);
53 : static void _skipData(ArchiveHandle *AH);
54 : static void _skipLOs(ArchiveHandle *AH);
55 :
56 : static void _StartLOs(ArchiveHandle *AH, TocEntry *te);
57 : static void _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid);
58 : static void _EndLO(ArchiveHandle *AH, TocEntry *te, Oid oid);
59 : static void _EndLOs(ArchiveHandle *AH, TocEntry *te);
60 : static void _LoadLOs(ArchiveHandle *AH, bool drop);
61 :
62 : static void _PrepParallelRestore(ArchiveHandle *AH);
63 : static void _Clone(ArchiveHandle *AH);
64 : static void _DeClone(ArchiveHandle *AH);
65 :
66 : static int _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
67 :
68 : typedef struct
69 : {
70 : CompressorState *cs;
71 : int hasSeek;
72 : /* lastFilePos is used only when reading, and may be invalid if !hasSeek */
73 : pgoff_t lastFilePos; /* position after last data block we've read */
74 : } lclContext;
75 :
76 : typedef struct
77 : {
78 : int dataState;
79 : pgoff_t dataPos; /* valid only if dataState=K_OFFSET_POS_SET */
80 : } lclTocEntry;
81 :
82 :
83 : /*------
84 : * Static declarations
85 : *------
86 : */
87 : static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
88 : static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx);
89 :
90 : static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
91 : static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);
92 :
93 :
94 : /*
95 : * Init routine required by ALL formats. This is a global routine
96 : * and should be declared in pg_backup_archiver.h
97 : *
98 : * It's task is to create any extra archive context (using AH->formatData),
99 : * and to initialize the supported function pointers.
100 : *
101 : * It should also prepare whatever its input source is for reading/writing,
102 : * and in the case of a read mode connection, it should load the Header & TOC.
103 : */
104 : void
105 178 : InitArchiveFmt_Custom(ArchiveHandle *AH)
106 : {
107 : lclContext *ctx;
108 :
109 : /* Assuming static functions, this can be copied for each format. */
110 178 : AH->ArchiveEntryPtr = _ArchiveEntry;
111 178 : AH->StartDataPtr = _StartData;
112 178 : AH->WriteDataPtr = _WriteData;
113 178 : AH->EndDataPtr = _EndData;
114 178 : AH->WriteBytePtr = _WriteByte;
115 178 : AH->ReadBytePtr = _ReadByte;
116 178 : AH->WriteBufPtr = _WriteBuf;
117 178 : AH->ReadBufPtr = _ReadBuf;
118 178 : AH->ClosePtr = _CloseArchive;
119 178 : AH->ReopenPtr = _ReopenArchive;
120 178 : AH->PrintTocDataPtr = _PrintTocData;
121 178 : AH->ReadExtraTocPtr = _ReadExtraToc;
122 178 : AH->WriteExtraTocPtr = _WriteExtraToc;
123 178 : AH->PrintExtraTocPtr = _PrintExtraToc;
124 :
125 178 : AH->StartLOsPtr = _StartLOs;
126 178 : AH->StartLOPtr = _StartLO;
127 178 : AH->EndLOPtr = _EndLO;
128 178 : AH->EndLOsPtr = _EndLOs;
129 :
130 178 : AH->PrepParallelRestorePtr = _PrepParallelRestore;
131 178 : AH->ClonePtr = _Clone;
132 178 : AH->DeClonePtr = _DeClone;
133 :
134 : /* no parallel dump in the custom archive, only parallel restore */
135 178 : AH->WorkerJobDumpPtr = NULL;
136 178 : AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom;
137 :
138 : /* Set up a private area. */
139 178 : ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
140 178 : AH->formatData = ctx;
141 :
142 : /*
143 : * Now open the file
144 : */
145 178 : if (AH->mode == archModeWrite)
146 : {
147 88 : if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
148 : {
149 88 : AH->FH = fopen(AH->fSpec, PG_BINARY_W);
150 88 : if (!AH->FH)
151 0 : pg_fatal("could not open output file \"%s\": %m", AH->fSpec);
152 : }
153 : else
154 : {
155 0 : AH->FH = stdout;
156 0 : if (!AH->FH)
157 0 : pg_fatal("could not open output file: %m");
158 : }
159 :
160 88 : ctx->hasSeek = checkSeek(AH->FH);
161 : }
162 : else
163 : {
164 90 : if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
165 : {
166 90 : AH->FH = fopen(AH->fSpec, PG_BINARY_R);
167 90 : if (!AH->FH)
168 0 : pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
169 : }
170 : else
171 : {
172 0 : AH->FH = stdin;
173 0 : if (!AH->FH)
174 0 : pg_fatal("could not open input file: %m");
175 : }
176 :
177 90 : ctx->hasSeek = checkSeek(AH->FH);
178 :
179 90 : ReadHead(AH);
180 90 : ReadToc(AH);
181 :
182 : /*
183 : * Remember location of first data block (i.e., the point after TOC)
184 : * in case we have to search for desired data blocks.
185 : */
186 90 : ctx->lastFilePos = _getFilePos(AH, ctx);
187 : }
188 178 : }
189 :
190 : /*
191 : * Called by the Archiver when the dumper creates a new TOC entry.
192 : *
193 : * Optional.
194 : *
195 : * Set up extract format-related TOC data.
196 : */
197 : static void
198 10352 : _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
199 : {
200 : lclTocEntry *ctx;
201 :
202 10352 : ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
203 10352 : if (te->dataDumper)
204 442 : ctx->dataState = K_OFFSET_POS_NOT_SET;
205 : else
206 9910 : ctx->dataState = K_OFFSET_NO_DATA;
207 :
208 10352 : te->formatData = ctx;
209 10352 : }
210 :
211 : /*
212 : * Called by the Archiver to save any extra format-related TOC entry
213 : * data.
214 : *
215 : * Optional.
216 : *
217 : * Use the Archiver routines to write data - they are non-endian, and
218 : * maintain other important file information.
219 : */
220 : static void
221 12826 : _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
222 : {
223 12826 : lclTocEntry *ctx = (lclTocEntry *) te->formatData;
224 :
225 12826 : WriteOffset(AH, ctx->dataPos, ctx->dataState);
226 12826 : }
227 :
228 : /*
229 : * Called by the Archiver to read any extra format-related TOC data.
230 : *
231 : * Optional.
232 : *
233 : * Needs to match the order defined in _WriteExtraToc, and should also
234 : * use the Archiver input routines.
235 : */
236 : static void
237 11176 : _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
238 : {
239 11176 : lclTocEntry *ctx = (lclTocEntry *) te->formatData;
240 :
241 11176 : if (ctx == NULL)
242 : {
243 11176 : ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
244 11176 : te->formatData = ctx;
245 : }
246 :
247 11176 : ctx->dataState = ReadOffset(AH, &(ctx->dataPos));
248 :
249 : /*
250 : * Prior to V1.7 (pg7.3), we dumped the data size as an int now we don't
251 : * dump it at all.
252 : */
253 11176 : if (AH->version < K_VERS_1_7)
254 0 : ReadInt(AH);
255 11176 : }
256 :
257 : /*
258 : * Called by the Archiver when restoring an archive to output a comment
259 : * that includes useful information about the TOC entry.
260 : *
261 : * Optional.
262 : */
263 : static void
264 3022 : _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
265 : {
266 3022 : lclTocEntry *ctx = (lclTocEntry *) te->formatData;
267 :
268 3022 : if (AH->public.verbose)
269 658 : ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n",
270 658 : (int64) ctx->dataPos);
271 3022 : }
272 :
273 : /*
274 : * Called by the archiver when saving TABLE DATA (not schema). This routine
275 : * should save whatever format-specific information is needed to read
276 : * the archive back.
277 : *
278 : * It is called just prior to the dumper's 'DataDumper' routine being called.
279 : *
280 : * Optional, but strongly recommended.
281 : *
282 : */
283 : static void
284 398 : _StartData(ArchiveHandle *AH, TocEntry *te)
285 : {
286 398 : lclContext *ctx = (lclContext *) AH->formatData;
287 398 : lclTocEntry *tctx = (lclTocEntry *) te->formatData;
288 :
289 398 : tctx->dataPos = _getFilePos(AH, ctx);
290 398 : if (tctx->dataPos >= 0)
291 398 : tctx->dataState = K_OFFSET_POS_SET;
292 :
293 398 : _WriteByte(AH, BLK_DATA); /* Block type */
294 398 : WriteInt(AH, te->dumpId); /* For sanity check */
295 :
296 398 : ctx->cs = AllocateCompressor(AH->compression_spec,
297 : NULL,
298 : _CustomWriteFunc);
299 398 : }
300 :
301 : /*
302 : * Called by archiver when dumper calls WriteData. This routine is
303 : * called for both LO and table data; it is the responsibility of
304 : * the format to manage each kind of data using StartLO/StartData.
305 : *
306 : * It should only be called from within a DataDumper routine.
307 : *
308 : * Mandatory.
309 : */
310 : static void
311 1266 : _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
312 : {
313 1266 : lclContext *ctx = (lclContext *) AH->formatData;
314 1266 : CompressorState *cs = ctx->cs;
315 :
316 1266 : if (dLen > 0)
317 : /* writeData() internally throws write errors */
318 1248 : cs->writeData(AH, cs, data, dLen);
319 1266 : }
320 :
321 : /*
322 : * Called by the archiver when a dumper's 'DataDumper' routine has
323 : * finished.
324 : *
325 : * Mandatory.
326 : */
327 : static void
328 398 : _EndData(ArchiveHandle *AH, TocEntry *te)
329 : {
330 398 : lclContext *ctx = (lclContext *) AH->formatData;
331 :
332 398 : EndCompressor(AH, ctx->cs);
333 398 : ctx->cs = NULL;
334 :
335 : /* Send the end marker */
336 398 : WriteInt(AH, 0);
337 398 : }
338 :
339 : /*
340 : * Called by the archiver when starting to save BLOB DATA (not schema).
341 : * This routine should save whatever format-specific information is needed
342 : * to read the LOs back into memory.
343 : *
344 : * It is called just prior to the dumper's DataDumper routine.
345 : *
346 : * Optional, but strongly recommended.
347 : */
348 : static void
349 18 : _StartLOs(ArchiveHandle *AH, TocEntry *te)
350 : {
351 18 : lclContext *ctx = (lclContext *) AH->formatData;
352 18 : lclTocEntry *tctx = (lclTocEntry *) te->formatData;
353 :
354 18 : tctx->dataPos = _getFilePos(AH, ctx);
355 18 : if (tctx->dataPos >= 0)
356 18 : tctx->dataState = K_OFFSET_POS_SET;
357 :
358 18 : _WriteByte(AH, BLK_BLOBS); /* Block type */
359 18 : WriteInt(AH, te->dumpId); /* For sanity check */
360 18 : }
361 :
362 : /*
363 : * Called by the archiver when the dumper calls StartLO.
364 : *
365 : * Mandatory.
366 : *
367 : * Must save the passed OID for retrieval at restore-time.
368 : */
369 : static void
370 18 : _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
371 : {
372 18 : lclContext *ctx = (lclContext *) AH->formatData;
373 :
374 18 : if (oid == 0)
375 0 : pg_fatal("invalid OID for large object");
376 :
377 18 : WriteInt(AH, oid);
378 :
379 18 : ctx->cs = AllocateCompressor(AH->compression_spec,
380 : NULL,
381 : _CustomWriteFunc);
382 18 : }
383 :
384 : /*
385 : * Called by the archiver when the dumper calls EndLO.
386 : *
387 : * Optional.
388 : */
389 : static void
390 18 : _EndLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
391 : {
392 18 : lclContext *ctx = (lclContext *) AH->formatData;
393 :
394 18 : EndCompressor(AH, ctx->cs);
395 : /* Send the end marker */
396 18 : WriteInt(AH, 0);
397 18 : }
398 :
399 : /*
400 : * Called by the archiver when finishing saving BLOB DATA.
401 : *
402 : * Optional.
403 : */
404 : static void
405 18 : _EndLOs(ArchiveHandle *AH, TocEntry *te)
406 : {
407 : /* Write out a fake zero OID to mark end-of-LOs. */
408 18 : WriteInt(AH, 0);
409 18 : }
410 :
411 : /*
412 : * Print data for a given TOC entry
413 : */
414 : static void
415 378 : _PrintTocData(ArchiveHandle *AH, TocEntry *te)
416 : {
417 378 : lclContext *ctx = (lclContext *) AH->formatData;
418 378 : lclTocEntry *tctx = (lclTocEntry *) te->formatData;
419 : int blkType;
420 : int id;
421 :
422 378 : if (tctx->dataState == K_OFFSET_NO_DATA)
423 0 : return;
424 :
425 378 : if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
426 : {
427 : /*
428 : * We cannot seek directly to the desired block. Instead, skip over
429 : * block headers until we find the one we want. Remember the
430 : * positions of skipped-over blocks, so that if we later decide we
431 : * need to read one, we'll be able to seek to it.
432 : *
433 : * When our input file is seekable, we can do the search starting from
434 : * the point after the last data block we scanned in previous
435 : * iterations of this function.
436 : */
437 116 : if (ctx->hasSeek)
438 : {
439 116 : if (fseeko(AH->FH, ctx->lastFilePos, SEEK_SET) != 0)
440 0 : pg_fatal("error during file seek: %m");
441 : }
442 :
443 : for (;;)
444 0 : {
445 116 : pgoff_t thisBlkPos = _getFilePos(AH, ctx);
446 :
447 116 : _readBlockHeader(AH, &blkType, &id);
448 :
449 116 : if (blkType == EOF || id == te->dumpId)
450 : break;
451 :
452 : /* Remember the block position, if we got one */
453 0 : if (thisBlkPos >= 0)
454 : {
455 0 : TocEntry *otherte = getTocEntryByDumpId(AH, id);
456 :
457 0 : if (otherte && otherte->formatData)
458 : {
459 0 : lclTocEntry *othertctx = (lclTocEntry *) otherte->formatData;
460 :
461 : /*
462 : * Note: on Windows, multiple threads might access/update
463 : * the same lclTocEntry concurrently, but that should be
464 : * safe as long as we update dataPos before dataState.
465 : * Ideally, we'd use pg_write_barrier() to enforce that,
466 : * but the needed infrastructure doesn't exist in frontend
467 : * code. But Windows only runs on machines with strong
468 : * store ordering, so it should be okay for now.
469 : */
470 0 : if (othertctx->dataState == K_OFFSET_POS_NOT_SET)
471 : {
472 0 : othertctx->dataPos = thisBlkPos;
473 0 : othertctx->dataState = K_OFFSET_POS_SET;
474 : }
475 0 : else if (othertctx->dataPos != thisBlkPos ||
476 0 : othertctx->dataState != K_OFFSET_POS_SET)
477 : {
478 : /* sanity check */
479 0 : pg_log_warning("data block %d has wrong seek position",
480 : id);
481 : }
482 : }
483 : }
484 :
485 0 : switch (blkType)
486 : {
487 0 : case BLK_DATA:
488 0 : _skipData(AH);
489 0 : break;
490 :
491 0 : case BLK_BLOBS:
492 0 : _skipLOs(AH);
493 0 : break;
494 :
495 0 : default: /* Always have a default */
496 0 : pg_fatal("unrecognized data block type (%d) while searching archive",
497 : blkType);
498 : break;
499 : }
500 : }
501 : }
502 : else
503 : {
504 : /* We can just seek to the place we need to be. */
505 262 : if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
506 0 : pg_fatal("error during file seek: %m");
507 :
508 262 : _readBlockHeader(AH, &blkType, &id);
509 : }
510 :
511 : /*
512 : * If we reached EOF without finding the block we want, then either it
513 : * doesn't exist, or it does but we lack the ability to seek back to it.
514 : */
515 378 : if (blkType == EOF)
516 : {
517 0 : if (!ctx->hasSeek)
518 0 : pg_fatal("could not find block ID %d in archive -- "
519 : "possibly due to out-of-order restore request, "
520 : "which cannot be handled due to non-seekable input file",
521 : te->dumpId);
522 : else
523 0 : pg_fatal("could not find block ID %d in archive -- "
524 : "possibly corrupt archive",
525 : te->dumpId);
526 : }
527 :
528 : /* Are we sane? */
529 378 : if (id != te->dumpId)
530 0 : pg_fatal("found unexpected block ID (%d) when reading data -- expected %d",
531 : id, te->dumpId);
532 :
533 378 : switch (blkType)
534 : {
535 360 : case BLK_DATA:
536 360 : _PrintData(AH);
537 360 : break;
538 :
539 18 : case BLK_BLOBS:
540 18 : _LoadLOs(AH, AH->public.ropt->dropSchema);
541 18 : break;
542 :
543 0 : default: /* Always have a default */
544 0 : pg_fatal("unrecognized data block type %d while restoring archive",
545 : blkType);
546 : break;
547 : }
548 :
549 : /*
550 : * If our input file is seekable but lacks data offsets, update our
551 : * knowledge of where to start future searches from. (Note that we did
552 : * not update the current TE's dataState/dataPos. We could have, but
553 : * there is no point since it will not be visited again.)
554 : */
555 378 : if (ctx->hasSeek && tctx->dataState == K_OFFSET_POS_NOT_SET)
556 : {
557 116 : pgoff_t curPos = _getFilePos(AH, ctx);
558 :
559 116 : if (curPos > ctx->lastFilePos)
560 116 : ctx->lastFilePos = curPos;
561 : }
562 : }
563 :
564 : /*
565 : * Print data from current file position.
566 : */
567 : static void
568 378 : _PrintData(ArchiveHandle *AH)
569 : {
570 : CompressorState *cs;
571 :
572 378 : cs = AllocateCompressor(AH->compression_spec,
573 : _CustomReadFunc, NULL);
574 378 : cs->readData(AH, cs);
575 378 : EndCompressor(AH, cs);
576 378 : }
577 :
578 : static void
579 18 : _LoadLOs(ArchiveHandle *AH, bool drop)
580 : {
581 : Oid oid;
582 :
583 18 : StartRestoreLOs(AH);
584 :
585 18 : oid = ReadInt(AH);
586 36 : while (oid != 0)
587 : {
588 18 : StartRestoreLO(AH, oid, drop);
589 18 : _PrintData(AH);
590 18 : EndRestoreLO(AH, oid);
591 18 : oid = ReadInt(AH);
592 : }
593 :
594 18 : EndRestoreLOs(AH);
595 18 : }
596 :
597 : /*
598 : * Skip the LOs from the current file position.
599 : * LOs are written sequentially as data blocks (see below).
600 : * Each LO is preceded by its original OID.
601 : * A zero OID indicates the end of the LOs.
602 : */
603 : static void
604 0 : _skipLOs(ArchiveHandle *AH)
605 : {
606 : Oid oid;
607 :
608 0 : oid = ReadInt(AH);
609 0 : while (oid != 0)
610 : {
611 0 : _skipData(AH);
612 0 : oid = ReadInt(AH);
613 : }
614 0 : }
615 :
616 : /*
617 : * Skip data from current file position.
618 : * Data blocks are formatted as an integer length, followed by data.
619 : * A zero length indicates the end of the block.
620 : */
621 : static void
622 0 : _skipData(ArchiveHandle *AH)
623 : {
624 0 : lclContext *ctx = (lclContext *) AH->formatData;
625 : size_t blkLen;
626 0 : char *buf = NULL;
627 0 : size_t buflen = 0;
628 :
629 0 : blkLen = ReadInt(AH);
630 0 : while (blkLen != 0)
631 : {
632 : /*
633 : * Seeks of less than stdio's buffer size are less efficient than just
634 : * reading the data, at least on common platforms. We don't know the
635 : * buffer size for sure, but 4kB is the usual value. (While pg_dump
636 : * currently tries to avoid producing such short data blocks, older
637 : * dump files often contain them.)
638 : */
639 0 : if (ctx->hasSeek && blkLen >= 4 * 1024)
640 : {
641 0 : if (fseeko(AH->FH, blkLen, SEEK_CUR) != 0)
642 0 : pg_fatal("error during file seek: %m");
643 : }
644 : else
645 : {
646 0 : if (blkLen > buflen)
647 : {
648 0 : free(buf);
649 0 : buflen = Max(blkLen, 4 * 1024);
650 0 : buf = (char *) pg_malloc(buflen);
651 : }
652 0 : if (fread(buf, 1, blkLen, AH->FH) != blkLen)
653 : {
654 0 : if (feof(AH->FH))
655 0 : pg_fatal("could not read from input file: end of file");
656 : else
657 0 : pg_fatal("could not read from input file: %m");
658 : }
659 : }
660 :
661 0 : blkLen = ReadInt(AH);
662 : }
663 :
664 0 : free(buf);
665 0 : }
666 :
667 : /*
668 : * Write a byte of data to the archive.
669 : *
670 : * Mandatory.
671 : *
672 : * Called by the archiver to do integer & byte output to the archive.
673 : */
674 : static int
675 1314396 : _WriteByte(ArchiveHandle *AH, const int i)
676 : {
677 1314396 : if (fputc(i, AH->FH) == EOF)
678 0 : WRITE_ERROR_EXIT;
679 :
680 1314396 : return 1;
681 : }
682 :
683 : /*
684 : * Read a byte of data from the archive.
685 : *
686 : * Mandatory
687 : *
688 : * Called by the archiver to read bytes & integers from the archive.
689 : * EOF should be treated as a fatal error.
690 : */
691 : static int
692 1147114 : _ReadByte(ArchiveHandle *AH)
693 : {
694 : int res;
695 :
696 1147114 : res = getc(AH->FH);
697 1147114 : if (res == EOF)
698 0 : READ_ERROR_EXIT(AH->FH);
699 1147114 : return res;
700 : }
701 :
702 : /*
703 : * Write a buffer of data to the archive.
704 : *
705 : * Mandatory.
706 : *
707 : * Called by the archiver to write a block of bytes to the archive.
708 : */
709 : static void
710 128680 : _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
711 : {
712 128680 : if (fwrite(buf, 1, len, AH->FH) != len)
713 0 : WRITE_ERROR_EXIT;
714 128680 : }
715 :
716 : /*
717 : * Read a block of bytes from the archive.
718 : *
719 : * Mandatory.
720 : *
721 : * Called by the archiver to read a block of bytes from the archive
722 : */
723 : static void
724 112840 : _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
725 : {
726 112840 : if (fread(buf, 1, len, AH->FH) != len)
727 0 : READ_ERROR_EXIT(AH->FH);
728 112840 : }
729 :
730 : /*
731 : * Close the archive.
732 : *
733 : * Mandatory.
734 : *
735 : * When writing the archive, this is the routine that actually starts
736 : * the process of saving it to files. No data should be written prior
737 : * to this point, since the user could sort the TOC after creating it.
738 : *
739 : * If an archive is to be written, this routine must call:
740 : * WriteHead to save the archive header
741 : * WriteToc to save the TOC entries
742 : * WriteDataChunks to save all data & LOs.
743 : *
744 : */
745 : static void
746 178 : _CloseArchive(ArchiveHandle *AH)
747 : {
748 178 : lclContext *ctx = (lclContext *) AH->formatData;
749 : pgoff_t tpos;
750 :
751 178 : if (AH->mode == archModeWrite)
752 : {
753 88 : WriteHead(AH);
754 : /* Remember TOC's seek position for use below */
755 88 : tpos = ftello(AH->FH);
756 88 : if (tpos < 0 && ctx->hasSeek)
757 0 : pg_fatal("could not determine seek position in archive file: %m");
758 88 : WriteToc(AH);
759 88 : WriteDataChunks(AH, NULL);
760 :
761 : /*
762 : * If possible, re-write the TOC in order to update the data offset
763 : * information. This is not essential, as pg_restore can cope in most
764 : * cases without it; but it can make pg_restore significantly faster
765 : * in some situations (especially parallel restore). We can skip this
766 : * step if we're not dumping any data; there are no offsets to update
767 : * in that case.
768 : */
769 106 : if (ctx->hasSeek && AH->public.dopt->dumpData &&
770 18 : fseeko(AH->FH, tpos, SEEK_SET) == 0)
771 18 : WriteToc(AH);
772 : }
773 :
774 178 : if (fclose(AH->FH) != 0)
775 0 : pg_fatal("could not close archive file: %m");
776 :
777 : /* Sync the output file if one is defined */
778 178 : if (AH->dosync && AH->mode == archModeWrite && AH->fSpec)
779 6 : (void) fsync_fname(AH->fSpec, false);
780 :
781 178 : AH->FH = NULL;
782 178 : }
783 :
784 : /*
785 : * Reopen the archive's file handle.
786 : *
787 : * We close the original file handle, except on Windows. (The difference
788 : * is because on Windows, this is used within a multithreading context,
789 : * and we don't want a thread closing the parent file handle.)
790 : */
791 : static void
792 0 : _ReopenArchive(ArchiveHandle *AH)
793 : {
794 0 : lclContext *ctx = (lclContext *) AH->formatData;
795 : pgoff_t tpos;
796 :
797 0 : if (AH->mode == archModeWrite)
798 0 : pg_fatal("can only reopen input archives");
799 :
800 : /*
801 : * These two cases are user-facing errors since they represent unsupported
802 : * (but not invalid) use-cases. Word the error messages appropriately.
803 : */
804 0 : if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)
805 0 : pg_fatal("parallel restore from standard input is not supported");
806 0 : if (!ctx->hasSeek)
807 0 : pg_fatal("parallel restore from non-seekable file is not supported");
808 :
809 0 : tpos = ftello(AH->FH);
810 0 : if (tpos < 0)
811 0 : pg_fatal("could not determine seek position in archive file: %m");
812 :
813 : #ifndef WIN32
814 0 : if (fclose(AH->FH) != 0)
815 0 : pg_fatal("could not close archive file: %m");
816 : #endif
817 :
818 0 : AH->FH = fopen(AH->fSpec, PG_BINARY_R);
819 0 : if (!AH->FH)
820 0 : pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
821 :
822 0 : if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
823 0 : pg_fatal("could not set seek position in archive file: %m");
824 0 : }
825 :
826 : /*
827 : * Prepare for parallel restore.
828 : *
829 : * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
830 : * TOC entries' dataLength fields with appropriate values to guide the
831 : * ordering of restore jobs. The source of said data is format-dependent,
832 : * as is the exact meaning of the values.
833 : *
834 : * A format module might also choose to do other setup here.
835 : */
836 : static void
837 0 : _PrepParallelRestore(ArchiveHandle *AH)
838 : {
839 0 : lclContext *ctx = (lclContext *) AH->formatData;
840 0 : TocEntry *prev_te = NULL;
841 0 : lclTocEntry *prev_tctx = NULL;
842 : TocEntry *te;
843 :
844 : /*
845 : * Knowing that the data items were dumped out in TOC order, we can
846 : * reconstruct the length of each item as the delta to the start offset of
847 : * the next data item.
848 : */
849 0 : for (te = AH->toc->next; te != AH->toc; te = te->next)
850 : {
851 0 : lclTocEntry *tctx = (lclTocEntry *) te->formatData;
852 :
853 : /*
854 : * Ignore entries without a known data offset; if we were unable to
855 : * seek to rewrite the TOC when creating the archive, this'll be all
856 : * of them, and we'll end up with no size estimates.
857 : */
858 0 : if (tctx->dataState != K_OFFSET_POS_SET)
859 0 : continue;
860 :
861 : /* Compute previous data item's length */
862 0 : if (prev_te)
863 : {
864 0 : if (tctx->dataPos > prev_tctx->dataPos)
865 0 : prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos;
866 : }
867 :
868 0 : prev_te = te;
869 0 : prev_tctx = tctx;
870 : }
871 :
872 : /* If OK to seek, we can determine the length of the last item */
873 0 : if (prev_te && ctx->hasSeek)
874 : {
875 : pgoff_t endpos;
876 :
877 0 : if (fseeko(AH->FH, 0, SEEK_END) != 0)
878 0 : pg_fatal("error during file seek: %m");
879 0 : endpos = ftello(AH->FH);
880 0 : if (endpos > prev_tctx->dataPos)
881 0 : prev_te->dataLength = endpos - prev_tctx->dataPos;
882 : }
883 0 : }
884 :
885 : /*
886 : * Clone format-specific fields during parallel restoration.
887 : */
888 : static void
889 0 : _Clone(ArchiveHandle *AH)
890 : {
891 0 : lclContext *ctx = (lclContext *) AH->formatData;
892 :
893 : /*
894 : * Each thread must have private lclContext working state.
895 : */
896 0 : AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
897 0 : memcpy(AH->formatData, ctx, sizeof(lclContext));
898 0 : ctx = (lclContext *) AH->formatData;
899 :
900 : /* sanity check, shouldn't happen */
901 0 : if (ctx->cs != NULL)
902 0 : pg_fatal("compressor active");
903 :
904 : /*
905 : * We intentionally do not clone TOC-entry-local state: it's useful to
906 : * share knowledge about where the data blocks are across threads.
907 : * _PrintTocData has to be careful about the order of operations on that
908 : * state, though.
909 : */
910 0 : }
911 :
912 : static void
913 0 : _DeClone(ArchiveHandle *AH)
914 : {
915 0 : lclContext *ctx = (lclContext *) AH->formatData;
916 :
917 0 : free(ctx);
918 0 : }
919 :
920 : /*
921 : * This function is executed in the child of a parallel restore from a
922 : * custom-format archive and restores the actual data for one TOC entry.
923 : */
924 : static int
925 0 : _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
926 : {
927 0 : return parallel_restore(AH, te);
928 : }
929 :
930 : /*--------------------------------------------------
931 : * END OF FORMAT CALLBACKS
932 : *--------------------------------------------------
933 : */
934 :
935 : /*
936 : * Get the current position in the archive file.
937 : *
938 : * With a non-seekable archive file, we may not be able to obtain the
939 : * file position. If so, just return -1. It's not too important in
940 : * that case because we won't be able to rewrite the TOC to fill in
941 : * data block offsets anyway.
942 : */
943 : static pgoff_t
944 738 : _getFilePos(ArchiveHandle *AH, lclContext *ctx)
945 : {
946 : pgoff_t pos;
947 :
948 738 : pos = ftello(AH->FH);
949 738 : if (pos < 0)
950 : {
951 : /* Not expected if we found we can seek. */
952 0 : if (ctx->hasSeek)
953 0 : pg_fatal("could not determine seek position in archive file: %m");
954 : }
955 738 : return pos;
956 : }
957 :
958 : /*
959 : * Read a data block header. The format changed in V1.3, so we
960 : * centralize the code here for simplicity. Returns *type = EOF
961 : * if at EOF.
962 : */
963 : static void
964 378 : _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
965 : {
966 : int byt;
967 :
968 : /*
969 : * Note: if we are at EOF with a pre-1.3 input file, we'll pg_fatal()
970 : * inside ReadInt rather than returning EOF. It doesn't seem worth
971 : * jumping through hoops to deal with that case better, because no such
972 : * files are likely to exist in the wild: only some 7.1 development
973 : * versions of pg_dump ever generated such files.
974 : */
975 378 : if (AH->version < K_VERS_1_3)
976 0 : *type = BLK_DATA;
977 : else
978 : {
979 378 : byt = getc(AH->FH);
980 378 : *type = byt;
981 378 : if (byt == EOF)
982 : {
983 0 : *id = 0; /* don't return an uninitialized value */
984 0 : return;
985 : }
986 : }
987 :
988 378 : *id = ReadInt(AH);
989 : }
990 :
991 : /*
992 : * Callback function for writeData. Writes one block of (compressed)
993 : * data to the archive.
994 : */
995 : static void
996 422 : _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
997 : {
998 : /* never write 0-byte blocks (this should not happen) */
999 422 : if (len > 0)
1000 : {
1001 422 : WriteInt(AH, len);
1002 422 : _WriteBuf(AH, buf, len);
1003 : }
1004 422 : }
1005 :
1006 : /*
1007 : * Callback function for readData. To keep things simple, we
1008 : * always read one compressed block at a time.
1009 : */
1010 : static size_t
1011 762 : _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
1012 : {
1013 : size_t blkLen;
1014 :
1015 : /* Read length */
1016 762 : blkLen = ReadInt(AH);
1017 762 : if (blkLen == 0)
1018 378 : return 0;
1019 :
1020 : /* If the caller's buffer is not large enough, allocate a bigger one */
1021 384 : if (blkLen > *buflen)
1022 : {
1023 2 : free(*buf);
1024 2 : *buf = (char *) pg_malloc(blkLen);
1025 2 : *buflen = blkLen;
1026 : }
1027 :
1028 : /* exits app on read errors */
1029 384 : _ReadBuf(AH, *buf, blkLen);
1030 :
1031 384 : return blkLen;
1032 : }
|