LCOV - code coverage report
Current view: top level - src/bin/pg_dump - pg_backup_custom.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13beta1 Lines: 202 328 61.6 %
Date: 2020-05-31 23:07:13 Functions: 24 31 77.4 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_backup_custom.c
       4             :  *
       5             :  *  Implements the custom output format.
       6             :  *
       7             :  *  The comments with the routines in this code are a good place to
       8             :  *  understand how to write a new format.
       9             :  *
      10             :  *  See the headers to pg_restore for more details.
      11             :  *
      12             :  * Copyright (c) 2000, Philip Warner
      13             :  *      Rights are granted to use this software in any way so long
      14             :  *      as this notice is not removed.
      15             :  *
      16             :  *  The author is not responsible for loss or damages that may
      17             :  *  and any liability will be limited to the time taken to fix any
      18             :  *  related bug.
      19             :  *
      20             :  *
      21             :  * IDENTIFICATION
      22             :  *      src/bin/pg_dump/pg_backup_custom.c
      23             :  *
      24             :  *-------------------------------------------------------------------------
      25             :  */
      26             : #include "postgres_fe.h"
      27             : 
      28             : #include "common/file_utils.h"
      29             : #include "compress_io.h"
      30             : #include "parallel.h"
      31             : #include "pg_backup_utils.h"
      32             : 
      33             : /*--------
      34             :  * Routines in the format interface
      35             :  *--------
      36             :  */
      37             : 
      38             : static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
      39             : static void _StartData(ArchiveHandle *AH, TocEntry *te);
      40             : static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
      41             : static void _EndData(ArchiveHandle *AH, TocEntry *te);
      42             : static int  _WriteByte(ArchiveHandle *AH, const int i);
      43             : static int  _ReadByte(ArchiveHandle *);
      44             : static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
      45             : static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
      46             : static void _CloseArchive(ArchiveHandle *AH);
      47             : static void _ReopenArchive(ArchiveHandle *AH);
      48             : static void _PrintTocData(ArchiveHandle *AH, TocEntry *te);
      49             : static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
      50             : static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
      51             : static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);
      52             : 
      53             : static void _PrintData(ArchiveHandle *AH);
      54             : static void _skipData(ArchiveHandle *AH);
      55             : static void _skipBlobs(ArchiveHandle *AH);
      56             : 
      57             : static void _StartBlobs(ArchiveHandle *AH, TocEntry *te);
      58             : static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
      59             : static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
      60             : static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
      61             : static void _LoadBlobs(ArchiveHandle *AH, bool drop);
      62             : 
      63             : static void _PrepParallelRestore(ArchiveHandle *AH);
      64             : static void _Clone(ArchiveHandle *AH);
      65             : static void _DeClone(ArchiveHandle *AH);
      66             : 
      67             : static int  _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
      68             : 
      69             : typedef struct
      70             : {
      71             :     CompressorState *cs;
      72             :     int         hasSeek;
      73             :     pgoff_t     filePos;
      74             :     pgoff_t     dataStart;
      75             : } lclContext;
      76             : 
      77             : typedef struct
      78             : {
      79             :     int         dataState;
      80             :     pgoff_t     dataPos;
      81             : } lclTocEntry;
      82             : 
      83             : 
      84             : /*------
      85             :  * Static declarations
      86             :  *------
      87             :  */
      88             : static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
      89             : static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx);
      90             : 
      91             : static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
      92             : static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);
      93             : 
      94             : 
      95             : /*
      96             :  *  Init routine required by ALL formats. This is a global routine
      97             :  *  and should be declared in pg_backup_archiver.h
      98             :  *
      99             :  *  Its task is to create any extra archive context (using AH->formatData),
     100             :  *  and to initialize the supported function pointers.
     101             :  *
     102             :  *  It should also prepare whatever its input source is for reading/writing,
     103             :  *  and in the case of a read mode connection, it should load the Header & TOC.
     104             :  */
     105             : void
     106          36 : InitArchiveFmt_Custom(ArchiveHandle *AH)
     107             : {
     108             :     lclContext *ctx;
     109             : 
     110             :     /* Assuming static functions, this can be copied for each format. */
     111          36 :     AH->ArchiveEntryPtr = _ArchiveEntry;
     112          36 :     AH->StartDataPtr = _StartData;
     113          36 :     AH->WriteDataPtr = _WriteData;
     114          36 :     AH->EndDataPtr = _EndData;
     115          36 :     AH->WriteBytePtr = _WriteByte;
     116          36 :     AH->ReadBytePtr = _ReadByte;
     117          36 :     AH->WriteBufPtr = _WriteBuf;
     118          36 :     AH->ReadBufPtr = _ReadBuf;
     119          36 :     AH->ClosePtr = _CloseArchive;
     120          36 :     AH->ReopenPtr = _ReopenArchive;
     121          36 :     AH->PrintTocDataPtr = _PrintTocData;
     122          36 :     AH->ReadExtraTocPtr = _ReadExtraToc;
     123          36 :     AH->WriteExtraTocPtr = _WriteExtraToc;
     124          36 :     AH->PrintExtraTocPtr = _PrintExtraToc;
     125             : 
     126          36 :     AH->StartBlobsPtr = _StartBlobs;
     127          36 :     AH->StartBlobPtr = _StartBlob;
     128          36 :     AH->EndBlobPtr = _EndBlob;
     129          36 :     AH->EndBlobsPtr = _EndBlobs;
     130             : 
     131          36 :     AH->PrepParallelRestorePtr = _PrepParallelRestore;
     132          36 :     AH->ClonePtr = _Clone;
     133          36 :     AH->DeClonePtr = _DeClone;
     134             : 
     135             :     /* no parallel dump in the custom archive, only parallel restore */
     136          36 :     AH->WorkerJobDumpPtr = NULL;
     137          36 :     AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom;
     138             : 
     139             :     /* Set up a private area. */
     140          36 :     ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
     141          36 :     AH->formatData = (void *) ctx;
     142             : 
     143             :     /* Initialize LO buffering */
     144          36 :     AH->lo_buf_size = LOBBUFSIZE;
     145          36 :     AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
     146             : 
     147          36 :     ctx->filePos = 0;
     148             : 
     149             :     /*
     150             :      * Now open the file
     151             :      */
     152          36 :     if (AH->mode == archModeWrite)
     153             :     {
     154          18 :         if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
     155             :         {
     156          18 :             AH->FH = fopen(AH->fSpec, PG_BINARY_W);
     157          18 :             if (!AH->FH)
     158           0 :                 fatal("could not open output file \"%s\": %m", AH->fSpec);
     159             :         }
     160             :         else
     161             :         {
     162           0 :             AH->FH = stdout;
     163           0 :             if (!AH->FH)
     164           0 :                 fatal("could not open output file: %m");
     165             :         }
     166             : 
     167          18 :         ctx->hasSeek = checkSeek(AH->FH);
     168             :     }
     169             :     else
     170             :     {
     171          18 :         if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
     172             :         {
     173          18 :             AH->FH = fopen(AH->fSpec, PG_BINARY_R);
     174          18 :             if (!AH->FH)
     175           0 :                 fatal("could not open input file \"%s\": %m", AH->fSpec);
     176             :         }
     177             :         else
     178             :         {
     179           0 :             AH->FH = stdin;
     180           0 :             if (!AH->FH)
     181           0 :                 fatal("could not open input file: %m");
     182             :         }
     183             : 
     184          18 :         ctx->hasSeek = checkSeek(AH->FH);
     185             : 
     186          18 :         ReadHead(AH);
     187          18 :         ReadToc(AH);
     188          18 :         ctx->dataStart = _getFilePos(AH, ctx);
     189             :     }
     190             : 
     191          36 : }
     192             : 
     193             : /*
     194             :  * Called by the Archiver when the dumper creates a new TOC entry.
     195             :  *
     196             :  * Optional.
     197             :  *
     198             :  * Set up extra format-related TOC data.
     199             : */
     200             : static void
     201        3858 : _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
     202             : {
     203             :     lclTocEntry *ctx;
     204             : 
     205        3858 :     ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
     206        3858 :     if (te->dataDumper)
     207          40 :         ctx->dataState = K_OFFSET_POS_NOT_SET;
     208             :     else
     209        3818 :         ctx->dataState = K_OFFSET_NO_DATA;
     210             : 
     211        3858 :     te->formatData = (void *) ctx;
     212        3858 : }
     213             : 
     214             : /*
     215             :  * Called by the Archiver to save any extra format-related TOC entry
     216             :  * data.
     217             :  *
     218             :  * Optional.
     219             :  *
     220             :  * Use the Archiver routines to write data - they are endian-independent, and
     221             :  * maintain other important file information.
     222             :  */
     223             : static void
     224        7708 : _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
     225             : {
     226        7708 :     lclTocEntry *ctx = (lclTocEntry *) te->formatData;
     227             : 
     228        7708 :     WriteOffset(AH, ctx->dataPos, ctx->dataState);
     229        7708 : }
     230             : 
     231             : /*
     232             :  * Called by the Archiver to read any extra format-related TOC data.
     233             :  *
     234             :  * Optional.
     235             :  *
     236             :  * Needs to match the order defined in _WriteExtraToc, and should also
     237             :  * use the Archiver input routines.
     238             :  */
     239             : static void
     240        3854 : _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
     241             : {
     242        3854 :     lclTocEntry *ctx = (lclTocEntry *) te->formatData;
     243             : 
     244        3854 :     if (ctx == NULL)
     245             :     {
     246        3854 :         ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
     247        3854 :         te->formatData = (void *) ctx;
     248             :     }
     249             : 
     250        3854 :     ctx->dataState = ReadOffset(AH, &(ctx->dataPos));
     251             : 
     252             :     /*
     253             :      * Prior to V1.7 (pg7.3), we dumped the data size as an int; now we don't
     254             :      * dump it at all.
     255             :      */
     256        3854 :     if (AH->version < K_VERS_1_7)
     257           0 :         ReadInt(AH);
     258        3854 : }
     259             : 
     260             : /*
     261             :  * Called by the Archiver when restoring an archive to output a comment
     262             :  * that includes useful information about the TOC entry.
     263             :  *
     264             :  * Optional.
     265             :  *
     266             :  */
     267             : static void
     268         762 : _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
     269             : {
     270         762 :     lclTocEntry *ctx = (lclTocEntry *) te->formatData;
     271             : 
     272         762 :     if (AH->public.verbose)
     273         348 :         ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n",
     274         348 :                  (int64) ctx->dataPos);
     275         762 : }
     276             : 
     277             : /*
     278             :  * Called by the archiver when saving TABLE DATA (not schema). This routine
     279             :  * should save whatever format-specific information is needed to read
     280             :  * the archive back.
     281             :  *
     282             :  * It is called just prior to the dumper's 'DataDumper' routine being called.
     283             :  *
     284             :  * Optional, but strongly recommended.
     285             :  *
     286             :  */
     287             : static void
     288          34 : _StartData(ArchiveHandle *AH, TocEntry *te)
     289             : {
     290          34 :     lclContext *ctx = (lclContext *) AH->formatData;
     291          34 :     lclTocEntry *tctx = (lclTocEntry *) te->formatData;
     292             : 
     293          34 :     tctx->dataPos = _getFilePos(AH, ctx);
     294          34 :     tctx->dataState = K_OFFSET_POS_SET;
     295             : 
     296          34 :     _WriteByte(AH, BLK_DATA);   /* Block type */
     297          34 :     WriteInt(AH, te->dumpId);    /* For sanity check */
     298             : 
     299          34 :     ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
     300          34 : }
     301             : 
     302             : /*
     303             :  * Called by archiver when dumper calls WriteData. This routine is
     304             :  * called for both BLOB and TABLE data; it is the responsibility of
     305             :  * the format to manage each kind of data using StartBlob/StartData.
     306             :  *
     307             :  * It should only be called from within a DataDumper routine.
     308             :  *
     309             :  * Mandatory.
     310             :  */
     311             : static void
     312          92 : _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
     313             : {
     314          92 :     lclContext *ctx = (lclContext *) AH->formatData;
     315          92 :     CompressorState *cs = ctx->cs;
     316             : 
     317          92 :     if (dLen > 0)
     318             :         /* WriteDataToArchive() internally throws write errors */
     319          90 :         WriteDataToArchive(AH, cs, data, dLen);
     320          92 : }
     321             : 
     322             : /*
     323             :  * Called by the archiver when a dumper's 'DataDumper' routine has
     324             :  * finished.
     325             :  *
     326             :  * Optional.
     327             :  *
     328             :  */
     329             : static void
     330          34 : _EndData(ArchiveHandle *AH, TocEntry *te)
     331             : {
     332          34 :     lclContext *ctx = (lclContext *) AH->formatData;
     333             : 
     334          34 :     EndCompressor(AH, ctx->cs);
     335             :     /* Send the end marker */
     336          34 :     WriteInt(AH, 0);
     337          34 : }
     338             : 
     339             : /*
     340             :  * Called by the archiver when starting to save all BLOB DATA (not schema).
     341             :  * This routine should save whatever format-specific information is needed
     342             :  * to read the BLOBs back into memory.
     343             :  *
     344             :  * It is called just prior to the dumper's DataDumper routine.
     345             :  *
     346             :  * Optional, but strongly recommended.
     347             :  */
     348             : static void
     349           2 : _StartBlobs(ArchiveHandle *AH, TocEntry *te)
     350             : {
     351           2 :     lclContext *ctx = (lclContext *) AH->formatData;
     352           2 :     lclTocEntry *tctx = (lclTocEntry *) te->formatData;
     353             : 
     354           2 :     tctx->dataPos = _getFilePos(AH, ctx);
     355           2 :     tctx->dataState = K_OFFSET_POS_SET;
     356             : 
     357           2 :     _WriteByte(AH, BLK_BLOBS);  /* Block type */
     358           2 :     WriteInt(AH, te->dumpId);    /* For sanity check */
     359           2 : }
     360             : 
     361             : /*
     362             :  * Called by the archiver when the dumper calls StartBlob.
     363             :  *
     364             :  * Mandatory.
     365             :  *
     366             :  * Must save the passed OID for retrieval at restore-time.
     367             :  */
     368             : static void
     369           2 : _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
     370             : {
     371           2 :     lclContext *ctx = (lclContext *) AH->formatData;
     372             : 
     373           2 :     if (oid == 0)
     374           0 :         fatal("invalid OID for large object");
     375             : 
     376           2 :     WriteInt(AH, oid);
     377             : 
     378           2 :     ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
     379           2 : }
     380             : 
     381             : /*
     382             :  * Called by the archiver when the dumper calls EndBlob.
     383             :  *
     384             :  * Optional.
     385             :  */
     386             : static void
     387           2 : _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
     388             : {
     389           2 :     lclContext *ctx = (lclContext *) AH->formatData;
     390             : 
     391           2 :     EndCompressor(AH, ctx->cs);
     392             :     /* Send the end marker */
     393           2 :     WriteInt(AH, 0);
     394           2 : }
     395             : 
     396             : /*
     397             :  * Called by the archiver when finishing saving all BLOB DATA.
     398             :  *
     399             :  * Optional.
     400             :  */
     401             : static void
     402           2 : _EndBlobs(ArchiveHandle *AH, TocEntry *te)
     403             : {
     404             :     /* Write out a fake zero OID to mark end-of-blobs. */
     405           2 :     WriteInt(AH, 0);
     406           2 : }
     407             : 
     408             : /*
     409             :  * Print data for a given TOC entry
     410             :  */
     411             : static void
     412          36 : _PrintTocData(ArchiveHandle *AH, TocEntry *te)
     413             : {
     414          36 :     lclContext *ctx = (lclContext *) AH->formatData;
     415          36 :     lclTocEntry *tctx = (lclTocEntry *) te->formatData;
     416             :     int         blkType;
     417             :     int         id;
     418             : 
     419          36 :     if (tctx->dataState == K_OFFSET_NO_DATA)
     420           0 :         return;
     421             : 
     422          36 :     if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
     423             :     {
     424             :         /*
     425             :          * We cannot seek directly to the desired block.  Instead, skip over
     426             :          * block headers until we find the one we want.  This could fail if we
     427             :          * are asked to restore items out-of-order.
     428             :          */
     429           0 :         _readBlockHeader(AH, &blkType, &id);
     430             : 
     431           0 :         while (blkType != EOF && id != te->dumpId)
     432             :         {
     433           0 :             switch (blkType)
     434             :             {
     435           0 :                 case BLK_DATA:
     436           0 :                     _skipData(AH);
     437           0 :                     break;
     438             : 
     439           0 :                 case BLK_BLOBS:
     440           0 :                     _skipBlobs(AH);
     441           0 :                     break;
     442             : 
     443           0 :                 default:        /* Always have a default */
     444           0 :                     fatal("unrecognized data block type (%d) while searching archive",
     445             :                           blkType);
     446             :                     break;
     447             :             }
     448           0 :             _readBlockHeader(AH, &blkType, &id);
     449             :         }
     450             :     }
     451             :     else
     452             :     {
     453             :         /* We can just seek to the place we need to be. */
     454          36 :         if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
     455           0 :             fatal("error during file seek: %m");
     456             : 
     457          36 :         _readBlockHeader(AH, &blkType, &id);
     458             :     }
     459             : 
     460             :     /* Produce suitable failure message if we fell off end of file */
     461          36 :     if (blkType == EOF)
     462             :     {
     463           0 :         if (tctx->dataState == K_OFFSET_POS_NOT_SET)
     464           0 :             fatal("could not find block ID %d in archive -- "
     465             :                   "possibly due to out-of-order restore request, "
     466             :                   "which cannot be handled due to lack of data offsets in archive",
     467             :                   te->dumpId);
     468           0 :         else if (!ctx->hasSeek)
     469           0 :             fatal("could not find block ID %d in archive -- "
     470             :                   "possibly due to out-of-order restore request, "
     471             :                   "which cannot be handled due to non-seekable input file",
     472             :                   te->dumpId);
     473             :         else                    /* huh, the dataPos led us to EOF? */
     474           0 :             fatal("could not find block ID %d in archive -- "
     475             :                   "possibly corrupt archive",
     476             :                   te->dumpId);
     477             :     }
     478             : 
     479             :     /* Are we sane? */
     480          36 :     if (id != te->dumpId)
     481           0 :         fatal("found unexpected block ID (%d) when reading data -- expected %d",
     482             :               id, te->dumpId);
     483             : 
     484          36 :     switch (blkType)
     485             :     {
     486          34 :         case BLK_DATA:
     487          34 :             _PrintData(AH);
     488          34 :             break;
     489             : 
     490           2 :         case BLK_BLOBS:
     491           2 :             _LoadBlobs(AH, AH->public.ropt->dropSchema);
     492           2 :             break;
     493             : 
     494           0 :         default:                /* Always have a default */
     495           0 :             fatal("unrecognized data block type %d while restoring archive",
     496             :                   blkType);
     497             :             break;
     498             :     }
     499             : }
     500             : 
     501             : /*
     502             :  * Print data from current file position.
     503             : */
     504             : static void
     505          36 : _PrintData(ArchiveHandle *AH)
     506             : {
     507          36 :     ReadDataFromArchive(AH, AH->compression, _CustomReadFunc);
     508          36 : }
     509             : 
     510             : static void
     511           2 : _LoadBlobs(ArchiveHandle *AH, bool drop)
     512             : {
     513             :     Oid         oid;
     514             : 
     515           2 :     StartRestoreBlobs(AH);
     516             : 
     517           2 :     oid = ReadInt(AH);
     518           4 :     while (oid != 0)
     519             :     {
     520           2 :         StartRestoreBlob(AH, oid, drop);
     521           2 :         _PrintData(AH);
     522           2 :         EndRestoreBlob(AH, oid);
     523           2 :         oid = ReadInt(AH);
     524             :     }
     525             : 
     526           2 :     EndRestoreBlobs(AH);
     527           2 : }
     528             : 
     529             : /*
     530             :  * Skip the BLOBs from the current file position.
     531             :  * BLOBS are written sequentially as data blocks (see below).
     532             :  * Each BLOB is preceded by its original OID.
     533             :  * A zero OID indicates the end of the BLOBs.
     534             :  */
     535             : static void
     536           0 : _skipBlobs(ArchiveHandle *AH)
     537             : {
     538             :     Oid         oid;
     539             : 
     540           0 :     oid = ReadInt(AH);
     541           0 :     while (oid != 0)
     542             :     {
     543           0 :         _skipData(AH);
     544           0 :         oid = ReadInt(AH);
     545             :     }
     546           0 : }
     547             : 
     548             : /*
     549             :  * Skip data from current file position.
     550             :  * Data blocks are formatted as an integer length, followed by data.
     551             :  * A zero length denotes the end of the block.
     552             : */
     553             : static void
     554           0 : _skipData(ArchiveHandle *AH)
     555             : {
     556           0 :     lclContext *ctx = (lclContext *) AH->formatData;
     557             :     size_t      blkLen;
     558           0 :     char       *buf = NULL;
     559           0 :     int         buflen = 0;
     560             :     size_t      cnt;
     561             : 
     562           0 :     blkLen = ReadInt(AH);
     563           0 :     while (blkLen != 0)
     564             :     {
     565           0 :         if (blkLen > buflen)
     566             :         {
     567           0 :             if (buf)
     568           0 :                 free(buf);
     569           0 :             buf = (char *) pg_malloc(blkLen);
     570           0 :             buflen = blkLen;
     571             :         }
     572           0 :         if ((cnt = fread(buf, 1, blkLen, AH->FH)) != blkLen)
     573             :         {
     574           0 :             if (feof(AH->FH))
     575           0 :                 fatal("could not read from input file: end of file");
     576             :             else
     577           0 :                 fatal("could not read from input file: %m");
     578             :         }
     579             : 
     580           0 :         ctx->filePos += blkLen;
     581             : 
     582           0 :         blkLen = ReadInt(AH);
     583             :     }
     584             : 
     585           0 :     if (buf)
     586           0 :         free(buf);
     587           0 : }
     588             : 
     589             : /*
     590             :  * Write a byte of data to the archive.
     591             :  *
     592             :  * Mandatory.
     593             :  *
     594             :  * Called by the archiver to do integer & byte output to the archive.
     595             :  */
     596             : static int
     597      734846 : _WriteByte(ArchiveHandle *AH, const int i)
     598             : {
     599      734846 :     lclContext *ctx = (lclContext *) AH->formatData;
     600             :     int         res;
     601             : 
     602      734846 :     if ((res = fputc(i, AH->FH)) == EOF)
     603           0 :         WRITE_ERROR_EXIT;
     604      734846 :     ctx->filePos += 1;
     605             : 
     606      734846 :     return 1;
     607             : }
     608             : 
     609             : /*
     610             :  * Read a byte of data from the archive.
     611             :  *
     612             :  * Mandatory
     613             :  *
     614             :  * Called by the archiver to read bytes & integers from the archive.
     615             :  * EOF should be treated as a fatal error.
     616             :  */
     617             : static int
     618      368234 : _ReadByte(ArchiveHandle *AH)
     619             : {
     620      368234 :     lclContext *ctx = (lclContext *) AH->formatData;
     621             :     int         res;
     622             : 
     623      368234 :     res = getc(AH->FH);
     624      368234 :     if (res == EOF)
     625           0 :         READ_ERROR_EXIT(AH->FH);
     626      368234 :     ctx->filePos += 1;
     627      368234 :     return res;
     628             : }
     629             : 
     630             : /*
     631             :  * Write a buffer of data to the archive.
     632             :  *
     633             :  * Mandatory.
     634             :  *
     635             :  * Called by the archiver to write a block of bytes to the archive.
     636             :  */
     637             : static void
     638       82072 : _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
     639             : {
     640       82072 :     lclContext *ctx = (lclContext *) AH->formatData;
     641             : 
     642       82072 :     if (fwrite(buf, 1, len, AH->FH) != len)
     643           0 :         WRITE_ERROR_EXIT;
     644       82072 :     ctx->filePos += len;
     645       82072 : }
     646             : 
     647             : /*
     648             :  * Read a block of bytes from the archive.
     649             :  *
     650             :  * Mandatory.
     651             :  *
     652             :  * Called by the archiver to read a block of bytes (exactly len, into buf) from the archive.
     653             :  */
     654             : static void
     655       41090 : _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
     656             : {
     657       41090 :     lclContext *ctx = (lclContext *) AH->formatData;
     658             : 
     659       41090 :     if (fread(buf, 1, len, AH->FH) != len)      /* item size is 1; anything short of len is a failure */
     660           0 :         READ_ERROR_EXIT(AH->FH);
     661       41090 :     ctx->filePos += len;                        /* advance the internally tracked offset by the bytes read */
     662       41090 : }
     663             : 
     664             : /*
     665             :  * Close the archive.
     666             :  *
     667             :  * Mandatory.
     668             :  *
     669             :  * When writing the archive, this is the routine that actually starts
     670             :  * the process of saving it to files. No data should be written prior
     671             :  * to this point, since the user could sort the TOC after creating it.
     672             :  *
     673             :  * If an archive is to be written, this routine must call:
     674             :  *      WriteHead           to save the archive header
     675             :  *      WriteToc            to save the TOC entries
     676             :  *      WriteDataChunks     to save all DATA & BLOBs.
     677             :  *
     678             :  */
     679             : static void
     680          36 : _CloseArchive(ArchiveHandle *AH)
     681             : {
     682          36 :     lclContext *ctx = (lclContext *) AH->formatData;
     683             :     pgoff_t     tpos;           /* only assigned and used in write mode */
     684             : 
     685          36 :     if (AH->mode == archModeWrite)
     686             :     {
     687          18 :         WriteHead(AH);
     688             :         /* Remember TOC's seek position for use below */
     689          18 :         tpos = ftello(AH->FH);
     690          18 :         if (tpos < 0 && ctx->hasSeek)   /* tpos is only used when hasSeek, so failure is tolerated otherwise */
     691           0 :             fatal("could not determine seek position in archive file: %m");
     692          18 :         WriteToc(AH);                   /* first pass over the TOC */
     693          18 :         ctx->dataStart = _getFilePos(AH, ctx);  /* file position right after the TOC */
     694          18 :         WriteDataChunks(AH, NULL);
     695             : 
     696             :         /*
     697             :          * If possible, re-write the TOC in order to update the data offset
     698             :          * information.  This is not essential, as pg_restore can cope in most
     699             :          * cases without it; but it can make pg_restore significantly faster
     700             :          * in some situations (especially parallel restore).
     701             :          */
     702          36 :         if (ctx->hasSeek &&
     703          18 :             fseeko(AH->FH, tpos, SEEK_SET) == 0)        /* a failed seek silently skips the rewrite */
     704          18 :             WriteToc(AH);
     705             :     }
     706             : 
     707          36 :     if (fclose(AH->FH) != 0)            /* done in both read and write mode */
     708           0 :         fatal("could not close archive file: %m");
     709             : 
     710             :     /* Sync the output file if one is defined */
     711          36 :     if (AH->dosync && AH->mode == archModeWrite && AH->fSpec)
     712          14 :         (void) fsync_fname(AH->fSpec, false);   /* return value is explicitly discarded */
     713             : 
     714          36 :     AH->FH = NULL;                      /* the stream is closed; drop the stale pointer */
     715          36 : }
     716             : 
     717             : /*
     718             :  * Reopen the archive's file handle, positioned where the old one was.
     719             :  *
     720             :  * We close the original file handle, except on Windows.  (The difference
     721             :  * is because on Windows, this is used within a multithreading context,
     722             :  * and we don't want a thread closing the parent file handle.)
     723             :  */
     724             : static void
     725           0 : _ReopenArchive(ArchiveHandle *AH)
     726             : {
     727           0 :     lclContext *ctx = (lclContext *) AH->formatData;
     728             :     pgoff_t     tpos;
     729             : 
     730           0 :     if (AH->mode == archModeWrite)
     731           0 :         fatal("can only reopen input archives");
     732             : 
     733             :     /*
     734             :      * These two cases are user-facing errors since they represent unsupported
     735             :      * (but not invalid) use-cases.  Word the error messages appropriately.
     736             :      */
     737           0 :     if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)        /* no file name to reopen by */
     738           0 :         fatal("parallel restore from standard input is not supported");
     739           0 :     if (!ctx->hasSeek)
     740           0 :         fatal("parallel restore from non-seekable file is not supported");
     741             : 
     742           0 :     tpos = ftello(AH->FH);              /* remember the current offset so the new handle can resume there */
     743           0 :     if (tpos < 0)
     744           0 :         fatal("could not determine seek position in archive file: %m");
     745             : 
     746             : #ifndef WIN32
     747           0 :     if (fclose(AH->FH) != 0)
     748           0 :         fatal("could not close archive file: %m");
     749             : #endif
     750             : 
     751           0 :     AH->FH = fopen(AH->fSpec, PG_BINARY_R);     /* replaces AH->FH; on WIN32 the old stream was left open above */
     752           0 :     if (!AH->FH)
     753           0 :         fatal("could not open input file \"%s\": %m", AH->fSpec);
     754             : 
     755           0 :     if (fseeko(AH->FH, tpos, SEEK_SET) != 0)    /* restore the offset saved before reopening */
     756           0 :         fatal("could not set seek position in archive file: %m");
     757           0 : }
     758             : 
     759             : /*
     760             :  * Prepare for parallel restore.
     761             :  *
     762             :  * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
     763             :  * TOC entries' dataLength fields with appropriate values to guide the
     764             :  * ordering of restore jobs.  The source of said data is format-dependent,
     765             :  * as is the exact meaning of the values.
     766             :  *
     767             :  * A format module might also choose to do other setup here.
     768             :  */
     769             : static void
     770           0 : _PrepParallelRestore(ArchiveHandle *AH)
     771             : {
     772           0 :     lclContext *ctx = (lclContext *) AH->formatData;
     773           0 :     TocEntry   *prev_te = NULL;         /* most recent entry seen with a known data offset */
     774           0 :     lclTocEntry *prev_tctx = NULL;      /* format-private data of prev_te; set together with it */
     775             :     TocEntry   *te;
     776             : 
     777             :     /*
     778             :      * Knowing that the data items were dumped out in TOC order, we can
     779             :      * reconstruct the length of each item as the delta to the start offset of
     780             :      * the next data item.
     781             :      */
     782           0 :     for (te = AH->toc->next; te != AH->toc; te = te->next)      /* list is circular; AH->toc is the head */
     783             :     {
     784           0 :         lclTocEntry *tctx = (lclTocEntry *) te->formatData;
     785             : 
     786             :         /*
     787             :          * Ignore entries without a known data offset; if we were unable to
     788             :          * seek to rewrite the TOC when creating the archive, this'll be all
     789             :          * of them, and we'll end up with no size estimates.
     790             :          */
     791           0 :         if (tctx->dataState != K_OFFSET_POS_SET)
     792           0 :             continue;
     793             : 
     794             :         /* Compute previous data item's length */
     795           0 :         if (prev_te)
     796             :         {
     797           0 :             if (tctx->dataPos > prev_tctx->dataPos)     /* non-increasing offsets leave dataLength untouched */
     798           0 :                 prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos;
     799             :         }
     800             : 
     801           0 :         prev_te = te;
     802           0 :         prev_tctx = tctx;
     803             :     }
     804             : 
     805             :     /* If OK to seek, we can determine the length of the last item */
     806           0 :     if (prev_te && ctx->hasSeek)
     807             :     {
     808             :         pgoff_t     endpos;
     809             : 
     810           0 :         if (fseeko(AH->FH, 0, SEEK_END) != 0)   /* moves the stream to end of file; it is not moved back here */
     811           0 :             fatal("error during file seek: %m");
     812           0 :         endpos = ftello(AH->FH);        /* NOTE(review): not checked for failure separately; a negative result would normally just fail the test below */
     813           0 :         if (endpos > prev_tctx->dataPos)
     814           0 :             prev_te->dataLength = endpos - prev_tctx->dataPos;
     815             :     }
     816           0 : }
     817             : 
     818             : /*
     819             :  * Clone format-specific fields during parallel restoration.
     820             :  */
     821             : static void
     822           0 : _Clone(ArchiveHandle *AH)
     823             : {
     824           0 :     lclContext *ctx = (lclContext *) AH->formatData;    /* the context being cloned from */
     825             : 
     826           0 :     AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
     827           0 :     memcpy(AH->formatData, ctx, sizeof(lclContext));    /* byte-wise copy; pointer members are shared, not duplicated */
     828           0 :     ctx = (lclContext *) AH->formatData;                /* from here on, ctx is the new private copy */
     829             : 
     830             :     /* sanity check, shouldn't happen */
     831           0 :     if (ctx->cs != NULL)
     832           0 :         fatal("compressor active");
     833             : 
     834             :     /*
     835             :      * Note: we do not make a local lo_buf because we expect at most one BLOBS
     836             :      * entry per archive, so no parallelism is possible.  Likewise,
     837             :      * TOC-entry-local state isn't an issue because any one TOC entry is
     838             :      * touched by just one worker child.
     839             :      */
     840           0 : }
     841             : 
     842             : static void                     /* Release the format-specific context attached to AH (cf. the copy allocated in _Clone). */
     843           0 : _DeClone(ArchiveHandle *AH)
     844             : {
     845           0 :     lclContext *ctx = (lclContext *) AH->formatData;
     846             : 
     847           0 :     free(ctx);                  /* AH->formatData is not reset here and still holds the freed address */
     848           0 : }
     849             : 
     850             : /*
     851             :  * This function is executed in the child of a parallel restore from a
     852             :  * custom-format archive and restores the actual data for one TOC entry.
     853             :  */
     854             : static int
     855           0 : _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
     856             : {
     857           0 :     return parallel_restore(AH, te);    /* pure delegation; the callee's result is passed through unchanged */
     858             : }
     859             : 
     860             : /*--------------------------------------------------
     861             :  * END OF FORMAT CALLBACKS
     862             :  *--------------------------------------------------
     863             :  */
     864             : 
     865             : /*
     866             :  * Get the current position in the archive file (ftello if seekable, else our own counter).
     867             :  */
     868             : static pgoff_t
     869          72 : _getFilePos(ArchiveHandle *AH, lclContext *ctx)
     870             : {
     871             :     pgoff_t     pos;
     872             : 
     873          72 :     if (ctx->hasSeek)
     874             :     {
     875             :         /*
     876             :          * Prior to 1.7 (pg7.3) we relied on the internally maintained
     877             :          * pointer.  Now we rely on ftello() always, unless the file has been
     878             :          * found to not support it.  For debugging purposes, print a warning
     879             :          * if the internal pointer disagrees, so that we're more likely to
     880             :          * notice if something's broken about the internal position tracking.
     881             :          */
     882          72 :         pos = ftello(AH->FH);
     883          72 :         if (pos < 0)
     884           0 :             fatal("could not determine seek position in archive file: %m");
     885             : 
     886          72 :         if (pos != ctx->filePos)        /* warn only; the ftello() value is still what gets returned */
     887           0 :             pg_log_warning("ftell mismatch with expected position -- ftell used");
     888             :     }
     889             :     else
     890           0 :         pos = ctx->filePos;             /* not seekable: fall back to the internally tracked offset */
     891          72 :     return pos;
     892             : }
     893             : 
     894             : /*
     895             :  * Read a data block header. The format changed in V1.3, so we
     896             :  * centralize the code here for simplicity.  Returns *type = EOF
     897             :  * (and *id = 0) if at EOF; otherwise *type and *id describe the block.
     898             :  */
     899             : static void
     900          36 : _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
     901             : {
     902          36 :     lclContext *ctx = (lclContext *) AH->formatData;
     903             :     int         byt;
     904             : 
     905             :     /*
     906             :      * Note: if we are at EOF with a pre-1.3 input file, we'll fatal() inside
     907             :      * ReadInt rather than returning EOF.  It doesn't seem worth jumping
     908             :      * through hoops to deal with that case better, because no such files are
     909             :      * likely to exist in the wild: only some 7.1 development versions of
     910             :      * pg_dump ever generated such files.
     911             :      */
     912          36 :     if (AH->version < K_VERS_1_3)
     913           0 :         *type = BLK_DATA;               /* no type byte is read for these versions; every block is taken as data */
     914             :     else
     915             :     {
     916          36 :         byt = getc(AH->FH);             /* read directly rather than via _ReadByte so EOF can be returned, not treated as fatal */
     917          36 :         *type = byt;
     918          36 :         if (byt == EOF)
     919             :         {
     920           0 :             *id = 0;            /* don't return an uninitialized value */
     921           0 :             return;
     922             :         }
     923          36 :         ctx->filePos += 1;              /* account for the type byte just consumed */
     924             :     }
     925             : 
     926          36 :     *id = ReadInt(AH);
     927             : }
     928             : 
     929             : /*
     930             :  * Callback function for WriteDataToArchive. Writes one block of (compressed)
     931             :  * data to the archive, as an integer length followed by the block's bytes.
     932             :  */
     933             : static void
     934          36 : _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
     935             : {
     936             :     /* never write 0-byte blocks (this should not happen) */
     937          36 :     if (len > 0)
     938             :     {
     939          36 :         WriteInt(AH, len);              /* NOTE(review): len is a size_t; confirm WriteInt's parameter type can hold it */
     940          36 :         _WriteBuf(AH, buf, len);
     941             :     }
     942          36 : }
     943             : 
     944             : /*
     945             :  * Callback function for ReadDataFromArchive. To keep things simple, we
     946             :  * always read one compressed block at a time.  Returns its length, or 0 if the stored length is 0.
     947             :  */
     948             : static size_t
     949          72 : _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
     950             : {
     951             :     size_t      blkLen;
     952             : 
     953             :     /* Read length */
     954          72 :     blkLen = ReadInt(AH);               /* NOTE(review): if ReadInt can return a negative int, it becomes a huge size_t here; confirm */
     955          72 :     if (blkLen == 0)                    /* presumably the end-of-data marker; nothing is read into *buf */
     956          36 :         return 0;
     957             : 
     958             :     /* If the caller's buffer is not large enough, allocate a bigger one */
     959          36 :     if (blkLen > *buflen)
     960             :     {
     961           0 :         free(*buf);                     /* old contents are discarded, not carried over */
     962           0 :         *buf = (char *) pg_malloc(blkLen);
     963           0 :         *buflen = blkLen;               /* report the new capacity back to the caller */
     964             :     }
     965             : 
     966             :     /* exits app on read errors */
     967          36 :     _ReadBuf(AH, *buf, blkLen);
     968             : 
     969          36 :     return blkLen;
     970             : }

Generated by: LCOV version 1.13