LCOV - code coverage report
Current view: top level - src/bin/pg_dump - pg_backup_custom.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13devel Lines: 200 322 62.1 %
Date: 2019-11-15 23:07:02 Functions: 24 31 77.4 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_backup_custom.c
       4             :  *
       5             :  *  Implements the custom output format.
       6             :  *
       7             :  *  The comments with the routines in this code are a good place to
       8             :  *  understand how to write a new format.
       9             :  *
      10             :  *  See the headers to pg_restore for more details.
      11             :  *
      12             :  * Copyright (c) 2000, Philip Warner
      13             :  *      Rights are granted to use this software in any way so long
      14             :  *      as this notice is not removed.
      15             :  *
      16             :  *  The author is not responsible for loss or damages that may
      17             :  *  and any liability will be limited to the time taken to fix any
      18             :  *  related bug.
      19             :  *
      20             :  *
      21             :  * IDENTIFICATION
      22             :  *      src/bin/pg_dump/pg_backup_custom.c
      23             :  *
      24             :  *-------------------------------------------------------------------------
      25             :  */
      26             : #include "postgres_fe.h"
      27             : 
      28             : #include "common/file_utils.h"
      29             : #include "compress_io.h"
      30             : #include "parallel.h"
      31             : #include "pg_backup_utils.h"
      32             : 
      33             : /*--------
      34             :  * Routines in the format interface
      35             :  *--------
      36             :  */
      37             : 
      38             : static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
      39             : static void _StartData(ArchiveHandle *AH, TocEntry *te);
      40             : static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
      41             : static void _EndData(ArchiveHandle *AH, TocEntry *te);
      42             : static int  _WriteByte(ArchiveHandle *AH, const int i);
      43             : static int  _ReadByte(ArchiveHandle *);
      44             : static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
      45             : static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
      46             : static void _CloseArchive(ArchiveHandle *AH);
      47             : static void _ReopenArchive(ArchiveHandle *AH);
      48             : static void _PrintTocData(ArchiveHandle *AH, TocEntry *te);
      49             : static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
      50             : static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
      51             : static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);
      52             : /* Helpers used by _PrintTocData to emit or skip a data block */
      53             : static void _PrintData(ArchiveHandle *AH);
      54             : static void _skipData(ArchiveHandle *AH);
      55             : static void _skipBlobs(ArchiveHandle *AH);
      56             : /* Large-object (BLOB) callbacks, plus the restore-side loader */
      57             : static void _StartBlobs(ArchiveHandle *AH, TocEntry *te);
      58             : static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
      59             : static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
      60             : static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
      61             : static void _LoadBlobs(ArchiveHandle *AH, bool drop);
      62             : /* Installed as AH->PrepParallelRestorePtr, AH->ClonePtr and AH->DeClonePtr */
      63             : static void _PrepParallelRestore(ArchiveHandle *AH);
      64             : static void _Clone(ArchiveHandle *AH);
      65             : static void _DeClone(ArchiveHandle *AH);
      66             : /* Installed as AH->WorkerJobRestorePtr */
      67             : static int  _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
      68             : 
      69             : typedef struct                  /* per-archive private state, kept in AH->formatData */
      70             : {
      71             :     CompressorState *cs;        /* compressor allocated by _StartData/_StartBlob */
      72             :     int         hasSeek;        /* result of checkSeek() on the archive file */
      73             :     pgoff_t     filePos;        /* advanced by every byte/buffer this format reads or writes */
      74             :     pgoff_t     dataStart;      /* _getFilePos() value right after ReadHead/ReadToc (read mode) */
      75             : } lclContext;
      76             : 
      77             : typedef struct                  /* per-TOC-entry private state, kept in te->formatData */
      78             : {
      79             :     int         dataState;      /* one of the K_OFFSET_* values */
      80             :     pgoff_t     dataPos;        /* file position of the entry's data block, when set */
      81             : } lclTocEntry;
      82             : 
      83             : 
      84             : /*------
      85             :  * Static declarations
      86             :  *------
      87             :  */
      88             : static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
      89             : static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx);
      90             : /* Callbacks passed to AllocateCompressor() and ReadDataFromArchive() */
      91             : static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
      92             : static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);
      93             : 
      94             : 
      95             : /*
      96             :  *  Init routine required by ALL formats. This is a global routine
      97             :  *  and should be declared in pg_backup_archiver.h
      98             :  *
      99             :  *  Its task is to create any extra archive context (using AH->formatData),
     100             :  *  and to initialize the supported function pointers.
     101             :  *
     102             :  *  It should also prepare whatever its input source is for reading/writing,
     103             :  *  and in the case of a read mode connection, it should load the Header & TOC.
     104             :  */
     105             : void
     106          36 : InitArchiveFmt_Custom(ArchiveHandle *AH)
     107             : {
     108             :     lclContext *ctx;
     109             : 
     110             :     /* Assuming static functions, this can be copied for each format. */
     111          36 :     AH->ArchiveEntryPtr = _ArchiveEntry;
     112          36 :     AH->StartDataPtr = _StartData;
     113          36 :     AH->WriteDataPtr = _WriteData;
     114          36 :     AH->EndDataPtr = _EndData;
     115          36 :     AH->WriteBytePtr = _WriteByte;
     116          36 :     AH->ReadBytePtr = _ReadByte;
     117          36 :     AH->WriteBufPtr = _WriteBuf;
     118          36 :     AH->ReadBufPtr = _ReadBuf;
     119          36 :     AH->ClosePtr = _CloseArchive;
     120          36 :     AH->ReopenPtr = _ReopenArchive;
     121          36 :     AH->PrintTocDataPtr = _PrintTocData;
     122          36 :     AH->ReadExtraTocPtr = _ReadExtraToc;
     123          36 :     AH->WriteExtraTocPtr = _WriteExtraToc;
     124          36 :     AH->PrintExtraTocPtr = _PrintExtraToc;
     125             : 
     126          36 :     AH->StartBlobsPtr = _StartBlobs;
     127          36 :     AH->StartBlobPtr = _StartBlob;
     128          36 :     AH->EndBlobPtr = _EndBlob;
     129          36 :     AH->EndBlobsPtr = _EndBlobs;
     130             : 
     131          36 :     AH->PrepParallelRestorePtr = _PrepParallelRestore;
     132          36 :     AH->ClonePtr = _Clone;
     133          36 :     AH->DeClonePtr = _DeClone;
     134             : 
     135             :     /* no parallel dump in the custom archive, only parallel restore */
     136          36 :     AH->WorkerJobDumpPtr = NULL;
     137          36 :     AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom;
     138             : 
     139             :     /* Set up a private area. */
     140          36 :     ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
     141          36 :     AH->formatData = (void *) ctx;
     142             : 
     143             :     /* Initialize LO buffering */
     144          36 :     AH->lo_buf_size = LOBBUFSIZE;
     145          36 :     AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
     146             : 
     147          36 :     ctx->filePos = 0;           /* position tracking starts at the beginning of the file */
     148             : 
     149             :     /*
     150             :      * Now open the file (or fall back to stdout/stdin when no name is given)
     151             :      */
     152          36 :     if (AH->mode == archModeWrite)
     153             :     {
     154          18 :         if (AH->fSpec && strcmp(AH->fSpec, "") != 0)     /* non-empty file name */
     155             :         {
     156          18 :             AH->FH = fopen(AH->fSpec, PG_BINARY_W);
     157          36 :             if (!AH->FH)
     158           0 :                 fatal("could not open output file \"%s\": %m", AH->fSpec);
     159             :         }
     160             :         else                    /* no file name: write to stdout */
     161             :         {
     162           0 :             AH->FH = stdout;
     163           0 :             if (!AH->FH)
     164           0 :                 fatal("could not open output file: %m");
     165             :         }
     166             : 
     167          18 :         ctx->hasSeek = checkSeek(AH->FH);
     168             :     }
     169             :     else                        /* any other mode: open the archive for reading */
     170             :     {
     171          18 :         if (AH->fSpec && strcmp(AH->fSpec, "") != 0)     /* non-empty file name */
     172             :         {
     173          18 :             AH->FH = fopen(AH->fSpec, PG_BINARY_R);
     174          36 :             if (!AH->FH)
     175           0 :                 fatal("could not open input file \"%s\": %m", AH->fSpec);
     176             :         }
     177             :         else                    /* no file name: read from stdin */
     178             :         {
     179           0 :             AH->FH = stdin;
     180           0 :             if (!AH->FH)
     181           0 :                 fatal("could not open input file: %m");
     182             :         }
     183             : 
     184          18 :         ctx->hasSeek = checkSeek(AH->FH);
     185             :         /* Load the header and TOC, then note the position that follows them */
     186          18 :         ReadHead(AH);
     187          18 :         ReadToc(AH);
     188          18 :         ctx->dataStart = _getFilePos(AH, ctx);
     189             :     }
     190             : 
     191          36 : }
     192             : 
     193             : /*
     194             :  * Called by the Archiver when the dumper creates a new TOC entry.
     195             :  *
     196             :  * Optional.
     197             :  *
     198             :  * Set up extra format-related TOC data.
     199             : */
     200             : static void
     201        3718 : _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
     202             : {
     203             :     lclTocEntry *ctx;
     204             : 
     205        3718 :     ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
     206        3718 :     if (te->dataDumper)         /* entry has a data dumper: offset not set yet */
     207          38 :         ctx->dataState = K_OFFSET_POS_NOT_SET;
     208             :     else                        /* no data dumper: mark the entry as having no data */
     209        3680 :         ctx->dataState = K_OFFSET_NO_DATA;
     210             : 
     211        3718 :     te->formatData = (void *) ctx;  /* attach the private state to the TOC entry */
     212        3718 : }
     213             : 
     214             : /*
     215             :  * Called by the Archiver to save any extra format-related TOC entry
     216             :  * data.
     217             :  *
     218             :  * Optional.
     219             :  *
     220             :  * Use the Archiver routines to write data - they are endian-independent, and
     221             :  * maintain other important file information.
     222             :  */
     223             : static void
     224        7428 : _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
     225             : {
     226        7428 :     lclTocEntry *ctx = (lclTocEntry *) te->formatData;
     227             :     /* The only extra TOC data is the data offset plus its K_OFFSET_* state */
     228        7428 :     WriteOffset(AH, ctx->dataPos, ctx->dataState);
     229        7428 : }
     230             : 
     231             : /*
     232             :  * Called by the Archiver to read any extra format-related TOC data.
     233             :  *
     234             :  * Optional.
     235             :  *
     236             :  * Needs to match the order defined in _WriteExtraToc, and should also
     237             :  * use the Archiver input routines.
     238             :  */
     239             : static void
     240        3714 : _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
     241             : {
     242        3714 :     lclTocEntry *ctx = (lclTocEntry *) te->formatData;
     243             : 
     244        3714 :     if (ctx == NULL)            /* no private state yet: create it */
     245             :     {
     246        3714 :         ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
     247        3714 :         te->formatData = (void *) ctx;
     248             :     }
     249             :     /* ReadOffset() fills in dataPos and returns the matching state */
     250        3714 :     ctx->dataState = ReadOffset(AH, &(ctx->dataPos));
     251             : 
     252             :     /*
     253             :      * Prior to V1.7 (pg7.3), we dumped the data size as an int; now we don't
     254             :      * dump it at all.
     255             :      */
     256        3714 :     if (AH->version < K_VERS_1_7)
     257           0 :         ReadInt(AH);            /* consume the old size field; value is ignored */
     258        3714 : }
     259             : 
     260             : /*
     261             :  * Called by the Archiver when restoring an archive to output a comment
     262             :  * that includes useful information about the TOC entry.
     263             :  *
     264             :  * Optional.
     265             :  *
     266             :  */
     267             : static void
     268         750 : _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
     269             : {
     270         750 :     lclTocEntry *ctx = (lclTocEntry *) te->formatData;
     271             :     /* The data position is printed as an SQL comment, in verbose mode only */
     272         750 :     if (AH->public.verbose)
     273         346 :         ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n",
     274         346 :                  (int64) ctx->dataPos);
     275         750 : }
     276             : 
     277             : /*
     278             :  * Called by the archiver when saving TABLE DATA (not schema). This routine
     279             :  * should save whatever format-specific information is needed to read
     280             :  * the archive back.
     281             :  *
     282             :  * It is called just prior to the dumper's 'DataDumper' routine being called.
     283             :  *
     284             :  * Optional, but strongly recommended.
     285             :  *
     286             :  */
     287             : static void
     288          32 : _StartData(ArchiveHandle *AH, TocEntry *te)
     289             : {
     290          32 :     lclContext *ctx = (lclContext *) AH->formatData;
     291          32 :     lclTocEntry *tctx = (lclTocEntry *) te->formatData;
     292             :     /* Record where this entry's block begins, before writing its header */
     293          32 :     tctx->dataPos = _getFilePos(AH, ctx);
     294          32 :     tctx->dataState = K_OFFSET_POS_SET;
     295             : 
     296          32 :     _WriteByte(AH, BLK_DATA);   /* Block type */
     297          32 :     WriteInt(AH, te->dumpId);    /* For sanity check */
     298             :     /* Data passed to _WriteData from here on goes through this compressor */
     299          32 :     ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
     300          32 : }
     301             : 
     302             : /*
     303             :  * Called by archiver when dumper calls WriteData. This routine is
     304             :  * called for both BLOB and TABLE data; it is the responsibility of
     305             :  * the format to manage each kind of data using StartBlob/StartData.
     306             :  *
     307             :  * It should only be called from within a DataDumper routine.
     308             :  *
     309             :  * Mandatory.
     310             :  */
     311             : static void
     312          90 : _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
     313             : {
     314          90 :     lclContext *ctx = (lclContext *) AH->formatData;
     315          90 :     CompressorState *cs = ctx->cs;  /* set up by _StartData or _StartBlob */
     316             : 
     317          90 :     if (dLen > 0)               /* zero-length writes are silently ignored */
     318             :         /* WriteDataToArchive() internally throws write errors */
     319          88 :         WriteDataToArchive(AH, cs, data, dLen);
     320             : 
     321          90 :     return;
     322             : }
     323             : 
     324             : /*
     325             :  * Called by the archiver when a dumper's 'DataDumper' routine has
     326             :  * finished.
     327             :  *
     328             :  * Optional.
     329             :  *
     330             :  */
     331             : static void
     332          32 : _EndData(ArchiveHandle *AH, TocEntry *te)
     333             : {
     334          32 :     lclContext *ctx = (lclContext *) AH->formatData;
     335             :     /* Finish with the compressor that _StartData stored in ctx->cs */
     336          32 :     EndCompressor(AH, ctx->cs);
     337             :     /* Send the end marker (a zero length, which readers treat as end of block) */
     338          32 :     WriteInt(AH, 0);
     339          32 : }
     340             : 
     341             : /*
     342             :  * Called by the archiver when starting to save all BLOB DATA (not schema).
     343             :  * This routine should save whatever format-specific information is needed
     344             :  * to read the BLOBs back into memory.
     345             :  *
     346             :  * It is called just prior to the dumper's DataDumper routine.
     347             :  *
     348             :  * Optional, but strongly recommended.
     349             :  */
     350             : static void
     351           2 : _StartBlobs(ArchiveHandle *AH, TocEntry *te)
     352             : {
     353           2 :     lclContext *ctx = (lclContext *) AH->formatData;
     354           2 :     lclTocEntry *tctx = (lclTocEntry *) te->formatData;
     355             :     /* Record where the BLOBS block begins, before writing its header */
     356           2 :     tctx->dataPos = _getFilePos(AH, ctx);
     357           2 :     tctx->dataState = K_OFFSET_POS_SET;
     358             : 
     359           2 :     _WriteByte(AH, BLK_BLOBS);  /* Block type */
     360           2 :     WriteInt(AH, te->dumpId);    /* For sanity check */
     361           2 : }
     362             : 
     363             : /*
     364             :  * Called by the archiver when the dumper calls StartBlob.
     365             :  *
     366             :  * Mandatory.
     367             :  *
     368             :  * Must save the passed OID for retrieval at restore-time.
     369             :  */
     370             : static void
     371           2 : _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
     372             : {
     373           2 :     lclContext *ctx = (lclContext *) AH->formatData;
     374             : 
     375           2 :     if (oid == 0)               /* zero is reserved as the end-of-blobs marker */
     376           0 :         fatal("invalid OID for large object");
     377             : 
     378           2 :     WriteInt(AH, oid);          /* the OID precedes the blob's data */
     379             :     /* Each blob gets its own compressor, released again in _EndBlob */
     380           2 :     ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
     381           2 : }
     382             : 
     383             : /*
     384             :  * Called by the archiver when the dumper calls EndBlob.
     385             :  *
     386             :  * Optional.
     387             :  */
     388             : static void
     389           2 : _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
     390             : {
     391           2 :     lclContext *ctx = (lclContext *) AH->formatData;
     392             :     /* Finish with the compressor that _StartBlob stored in ctx->cs */
     393           2 :     EndCompressor(AH, ctx->cs);
     394             :     /* Send the end marker (a zero length, which readers treat as end of data) */
     395           2 :     WriteInt(AH, 0);
     396           2 : }
     397             : 
     398             : /*
     399             :  * Called by the archiver when finishing saving all BLOB DATA.
     400             :  *
     401             :  * Optional.
     402             :  */
     403             : static void
     404           2 : _EndBlobs(ArchiveHandle *AH, TocEntry *te)
     405             : {
     406             :     /* Write out a fake zero OID to mark end-of-blobs; _LoadBlobs and _skipBlobs stop on it. */
     407           2 :     WriteInt(AH, 0);
     408           2 : }
     409             : 
     410             : /*
     411             :  * Print data for a given TOC entry
     412             :  */
     413             : static void
     414          34 : _PrintTocData(ArchiveHandle *AH, TocEntry *te)
     415             : {
     416          34 :     lclContext *ctx = (lclContext *) AH->formatData;
     417          34 :     lclTocEntry *tctx = (lclTocEntry *) te->formatData;
     418             :     int         blkType;
     419             :     int         id;
     420             : 
     421          34 :     if (tctx->dataState == K_OFFSET_NO_DATA)    /* entry has no data block */
     422           0 :         return;
     423             : 
     424          34 :     if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
     425             :     {
     426             :         /*
     427             :          * We cannot seek directly to the desired block.  Instead, skip over
     428             :          * block headers until we find the one we want.  This could fail if we
     429             :          * are asked to restore items out-of-order.
     430             :          */
     431           0 :         _readBlockHeader(AH, &blkType, &id);
     432             : 
     433           0 :         while (blkType != EOF && id != te->dumpId)   /* stop at EOF or at the wanted block */
     434             :         {
     435           0 :             switch (blkType)
     436             :             {
     437             :                 case BLK_DATA:
     438           0 :                     _skipData(AH);
     439           0 :                     break;
     440             : 
     441             :                 case BLK_BLOBS:
     442           0 :                     _skipBlobs(AH);
     443           0 :                     break;
     444             : 
     445             :                 default:        /* Always have a default */
     446           0 :                     fatal("unrecognized data block type (%d) while searching archive",
     447             :                           blkType);
     448             :                     break;
     449             :             }
     450           0 :             _readBlockHeader(AH, &blkType, &id);     /* header of the next block */
     451             :         }
     452             :     }
     453             :     else
     454             :     {
     455             :         /* We can just seek to the place we need to be. */
     456          34 :         if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
     457           0 :             fatal("error during file seek: %m");
     458             : 
     459          34 :         _readBlockHeader(AH, &blkType, &id);
     460             :     }
     461             : 
     462             :     /* Produce suitable failure message if we fell off end of file */
     463          34 :     if (blkType == EOF)
     464             :     {
     465           0 :         if (tctx->dataState == K_OFFSET_POS_NOT_SET)
     466           0 :             fatal("could not find block ID %d in archive -- "
     467             :                   "possibly due to out-of-order restore request, "
     468             :                   "which cannot be handled due to lack of data offsets in archive",
     469             :                   te->dumpId);
     470           0 :         else if (!ctx->hasSeek)
     471           0 :             fatal("could not find block ID %d in archive -- "
     472             :                   "possibly due to out-of-order restore request, "
     473             :                   "which cannot be handled due to non-seekable input file",
     474             :                   te->dumpId);
     475             :         else                    /* huh, the dataPos led us to EOF? */
     476           0 :             fatal("could not find block ID %d in archive -- "
     477             :                   "possibly corrupt archive",
     478             :                   te->dumpId);
     479             :     }
     480             : 
     481             :     /* Are we sane?  The block header must carry this entry's dump ID. */
     482          34 :     if (id != te->dumpId)
     483           0 :         fatal("found unexpected block ID (%d) when reading data -- expected %d",
     484             :               id, te->dumpId);
     485             :     /* Positioned just past the right header: dispatch on the block type */
     486          34 :     switch (blkType)
     487             :     {
     488             :         case BLK_DATA:
     489          32 :             _PrintData(AH);
     490          32 :             break;
     491             : 
     492             :         case BLK_BLOBS:
     493           2 :             _LoadBlobs(AH, AH->public.ropt->dropSchema);
     494           2 :             break;
     495             : 
     496             :         default:                /* Always have a default */
     497           0 :             fatal("unrecognized data block type %d while restoring archive",
     498             :                   blkType);
     499             :             break;
     500             :     }
     501             : }
     502             : 
     503             : /*
     504             :  * Print data from current file position.
     505             : */
     506             : static void
     507          34 : _PrintData(ArchiveHandle *AH)
     508             : {
     509          34 :     ReadDataFromArchive(AH, AH->compression, _CustomReadFunc);   /* input is supplied by _CustomReadFunc */
     510          34 : }
     511             : 
     512             : static void
     513           2 : _LoadBlobs(ArchiveHandle *AH, bool drop)
     514             : {
     515             :     Oid         oid;
     516             : 
     517           2 :     StartRestoreBlobs(AH);
     518             :     /* Blobs are stored as (OID, data) pairs; a zero OID ends the list */
     519           2 :     oid = ReadInt(AH);
     520           6 :     while (oid != 0)
     521             :     {
     522           2 :         StartRestoreBlob(AH, oid, drop);
     523           2 :         _PrintData(AH);         /* the blob's data follows its OID */
     524           2 :         EndRestoreBlob(AH, oid);
     525           2 :         oid = ReadInt(AH);      /* next OID, or the zero terminator */
     526             :     }
     527             : 
     528           2 :     EndRestoreBlobs(AH);
     529           2 : }
     530             : 
     531             : /*
     532             :  * Skip the BLOBs from the current file position.
     533             :  * BLOBS are written sequentially as data blocks (see below).
     534             :  * Each BLOB is preceded by its original OID.
     535             :  * A zero OID indicates the end of the BLOBs.
     536             :  */
     537             : static void
     538           0 : _skipBlobs(ArchiveHandle *AH)
     539             : {
     540             :     Oid         oid;
     541             : 
     542           0 :     oid = ReadInt(AH);          /* first blob OID, or zero if there are none */
     543           0 :     while (oid != 0)
     544             :     {
     545           0 :         _skipData(AH);          /* discard this blob's data */
     546           0 :         oid = ReadInt(AH);      /* next OID, or the zero terminator */
     547             :     }
     548           0 : }
     549             : 
     550             : /*
     551             :  * Skip data from current file position.
     552             :  * Data blocks are formatted as an integer length, followed by data.
     553             :  * A zero length denotes the end of the block.
     554             : */
     555             : static void
     556           0 : _skipData(ArchiveHandle *AH)
     557             : {
     558           0 :     lclContext *ctx = (lclContext *) AH->formatData;
     559             :     size_t      blkLen;
     560           0 :     char       *buf = NULL;     /* scratch buffer, grown on demand and reused */
     561           0 :     size_t      buflen = 0;     /* same type as blkLen: no mixed-sign compare or narrowing */
     562             :     size_t      cnt;
     563             :     /* Read and discard chunks until the zero-length terminator */
     564           0 :     blkLen = ReadInt(AH);
     565           0 :     while (blkLen != 0)
     566             :     {
     567           0 :         if (blkLen > buflen)    /* current buffer too small for this chunk */
     568             :         {
     569           0 :             if (buf)
     570           0 :                 free(buf);
     571           0 :             buf = (char *) pg_malloc(blkLen);
     572           0 :             buflen = blkLen;
     573             :         }
     574           0 :         if ((cnt = fread(buf, 1, blkLen, AH->FH)) != blkLen)
     575             :         {
     576           0 :             if (feof(AH->FH))
     577           0 :                 fatal("could not read from input file: end of file");
     578             :             else
     579           0 :                 fatal("could not read from input file: %m");
     580             :         }
     581             :         /* fread bypasses _ReadBuf, so account for the bytes here */
     582           0 :         ctx->filePos += blkLen;
     583             : 
     584           0 :         blkLen = ReadInt(AH);   /* length of the next chunk, or zero at end of block */
     585             :     }
     586             : 
     587           0 :     if (buf)
     588           0 :         free(buf);
     589           0 : }
     590             : 
     591             : /*
     592             :  * Write a byte of data to the archive.
     593             :  *
     594             :  * Mandatory.
     595             :  *
     596             :  * Called by the archiver to do integer & byte output to the archive.
     597             :  */
     598             : static int
     599      708474 : _WriteByte(ArchiveHandle *AH, const int i)
     600             : {
     601      708474 :     lclContext *ctx = (lclContext *) AH->formatData;
     602             :     int         res;
     603             : 
     604      708474 :     if ((res = fputc(i, AH->FH)) == EOF)
     605           0 :         WRITE_ERROR_EXIT;
     606      708474 :     ctx->filePos += 1;          /* keep the tracked file position in step */
     607             : 
     608      708474 :     return 1;                   /* always 1 when this point is reached */
     609             : }
     610             : 
     611             : /*
     612             :  * Read a byte of data from the archive.
     613             :  *
     614             :  * Mandatory
     615             :  *
     616             :  * Called by the archiver to read bytes & integers from the archive.
     617             :  * EOF should be treated as a fatal error.
     618             :  */
     619             : static int
     620      355034 : _ReadByte(ArchiveHandle *AH)
     621             : {
     622      355034 :     lclContext *ctx = (lclContext *) AH->formatData;
     623             :     int         res;
     624             : 
     625      355034 :     res = getc(AH->FH);
     626      355034 :     if (res == EOF)             /* end of file or read error */
     627           0 :         READ_ERROR_EXIT(AH->FH);
     628      355034 :     ctx->filePos += 1;          /* keep the tracked file position in step */
     629      355034 :     return res;                 /* the byte just read */
     630             : }
     631             : 
     632             : /*
     633             :  * Write a buffer of data to the archive.
     634             :  *
     635             :  * Mandatory.
     636             :  *
     637             :  * Called by the archiver to write a block of bytes to the archive.
     638             :  */
     639             : static void
     640       79010 : _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
     641             : {
     642       79010 :     lclContext *ctx = (lclContext *) AH->formatData;
     643             : 
     644       79010 :     if (fwrite(buf, 1, len, AH->FH) != len)     /* a short write is an error */
     645           0 :         WRITE_ERROR_EXIT;
     646       79010 :     ctx->filePos += len;        /* keep the tracked file position in step */
     647             : 
     648       79010 :     return;
     649             : }
     650             : 
     651             : /*
     652             :  * Read a block of bytes from the archive.
     653             :  *
     654             :  * Mandatory.
     655             :  *
     656             :  * Called by the archiver to read a block of bytes from the archive
     657             :  */
     658             : static void
     659       39558 : _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
     660             : {
     661       39558 :     lclContext *ctx = (lclContext *) AH->formatData;
     662             : 
     663       39558 :     if (fread(buf, 1, len, AH->FH) != len)
     664           0 :         READ_ERROR_EXIT(AH->FH);
     665       39558 :     ctx->filePos += len;
     666             : 
     667       39558 :     return;
     668             : }
     669             : 
     670             : /*
     671             :  * Close the archive.
     672             :  *
     673             :  * Mandatory.
     674             :  *
     675             :  * When writing the archive, this is the routine that actually starts
     676             :  * the process of saving it to files. No data should be written prior
     677             :  * to this point, since the user could sort the TOC after creating it.
     678             :  *
     679             :  * If an archive is to be written, this routine must call:
     680             :  *      WriteHead           to save the archive header
     681             :  *      WriteToc            to save the TOC entries
     682             :  *      WriteDataChunks     to save all DATA & BLOBs.
     683             :  *
     684             :  */
     685             : static void
     686          36 : _CloseArchive(ArchiveHandle *AH)
     687             : {
     688          36 :     lclContext *ctx = (lclContext *) AH->formatData;
     689             :     pgoff_t     tpos;
     690             : 
     691          36 :     if (AH->mode == archModeWrite)
     692             :     {
     693          18 :         WriteHead(AH);
     694             :         /* Remember TOC's seek position for use below */
     695          18 :         tpos = ftello(AH->FH);
     696          18 :         if (tpos < 0 && ctx->hasSeek)
     697           0 :             fatal("could not determine seek position in archive file: %m");
     698          18 :         WriteToc(AH);
     699          18 :         ctx->dataStart = _getFilePos(AH, ctx);
     700          18 :         WriteDataChunks(AH, NULL);
     701             : 
     702             :         /*
     703             :          * If possible, re-write the TOC in order to update the data offset
     704             :          * information.  This is not essential, as pg_restore can cope in most
     705             :          * cases without it; but it can make pg_restore significantly faster
     706             :          * in some situations (especially parallel restore).
     707             :          */
     708          36 :         if (ctx->hasSeek &&
     709          18 :             fseeko(AH->FH, tpos, SEEK_SET) == 0)
     710          18 :             WriteToc(AH);
     711             :     }
     712             : 
     713          36 :     if (fclose(AH->FH) != 0)
     714           0 :         fatal("could not close archive file: %m");
     715             : 
     716             :     /* Sync the output file if one is defined */
     717          36 :     if (AH->dosync && AH->mode == archModeWrite && AH->fSpec)
     718          14 :         (void) fsync_fname(AH->fSpec, false);
     719             : 
     720          36 :     AH->FH = NULL;
     721          36 : }
     722             : 
     723             : /*
     724             :  * Reopen the archive's file handle.
     725             :  *
     726             :  * We close the original file handle, except on Windows.  (The difference
     727             :  * is because on Windows, this is used within a multithreading context,
     728             :  * and we don't want a thread closing the parent file handle.)
     729             :  */
     730             : static void
     731           0 : _ReopenArchive(ArchiveHandle *AH)
     732             : {
     733           0 :     lclContext *ctx = (lclContext *) AH->formatData;
     734             :     pgoff_t     tpos;
     735             : 
     736           0 :     if (AH->mode == archModeWrite)
     737           0 :         fatal("can only reopen input archives");
     738             : 
     739             :     /*
     740             :      * These two cases are user-facing errors since they represent unsupported
     741             :      * (but not invalid) use-cases.  Word the error messages appropriately.
     742             :      */
     743           0 :     if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)
     744           0 :         fatal("parallel restore from standard input is not supported");
     745           0 :     if (!ctx->hasSeek)
     746           0 :         fatal("parallel restore from non-seekable file is not supported");
     747             : 
     748           0 :     tpos = ftello(AH->FH);
     749           0 :     if (tpos < 0)
     750           0 :         fatal("could not determine seek position in archive file: %m");
     751             : 
     752             : #ifndef WIN32
     753           0 :     if (fclose(AH->FH) != 0)
     754           0 :         fatal("could not close archive file: %m");
     755             : #endif
     756             : 
     757           0 :     AH->FH = fopen(AH->fSpec, PG_BINARY_R);
     758           0 :     if (!AH->FH)
     759           0 :         fatal("could not open input file \"%s\": %m", AH->fSpec);
     760             : 
     761           0 :     if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
     762           0 :         fatal("could not set seek position in archive file: %m");
     763           0 : }
     764             : 
     765             : /*
     766             :  * Prepare for parallel restore.
     767             :  *
     768             :  * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
     769             :  * TOC entries' dataLength fields with appropriate values to guide the
     770             :  * ordering of restore jobs.  The source of said data is format-dependent,
     771             :  * as is the exact meaning of the values.
     772             :  *
     773             :  * A format module might also choose to do other setup here.
     774             :  */
     775             : static void
     776           0 : _PrepParallelRestore(ArchiveHandle *AH)
     777             : {
     778           0 :     lclContext *ctx = (lclContext *) AH->formatData;
     779           0 :     TocEntry   *prev_te = NULL;
     780           0 :     lclTocEntry *prev_tctx = NULL;
     781             :     TocEntry   *te;
     782             : 
     783             :     /*
     784             :      * Knowing that the data items were dumped out in TOC order, we can
     785             :      * reconstruct the length of each item as the delta to the start offset of
     786             :      * the next data item.
     787             :      */
     788           0 :     for (te = AH->toc->next; te != AH->toc; te = te->next)
     789             :     {
     790           0 :         lclTocEntry *tctx = (lclTocEntry *) te->formatData;
     791             : 
     792             :         /*
     793             :          * Ignore entries without a known data offset; if we were unable to
     794             :          * seek to rewrite the TOC when creating the archive, this'll be all
     795             :          * of them, and we'll end up with no size estimates.
     796             :          */
     797           0 :         if (tctx->dataState != K_OFFSET_POS_SET)
     798           0 :             continue;
     799             : 
     800             :         /* Compute previous data item's length */
     801           0 :         if (prev_te)
     802             :         {
     803           0 :             if (tctx->dataPos > prev_tctx->dataPos)
     804           0 :                 prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos;
     805             :         }
     806             : 
     807           0 :         prev_te = te;
     808           0 :         prev_tctx = tctx;
     809             :     }
     810             : 
     811             :     /* If OK to seek, we can determine the length of the last item */
     812           0 :     if (prev_te && ctx->hasSeek)
     813             :     {
     814             :         pgoff_t     endpos;
     815             : 
     816           0 :         if (fseeko(AH->FH, 0, SEEK_END) != 0)
     817           0 :             fatal("error during file seek: %m");
     818           0 :         endpos = ftello(AH->FH);
     819           0 :         if (endpos > prev_tctx->dataPos)
     820           0 :             prev_te->dataLength = endpos - prev_tctx->dataPos;
     821             :     }
     822           0 : }
     823             : 
     824             : /*
     825             :  * Clone format-specific fields during parallel restoration.
     826             :  */
     827             : static void
     828           0 : _Clone(ArchiveHandle *AH)
     829             : {
     830           0 :     lclContext *ctx = (lclContext *) AH->formatData;
     831             : 
     832           0 :     AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
     833           0 :     memcpy(AH->formatData, ctx, sizeof(lclContext));
     834           0 :     ctx = (lclContext *) AH->formatData;
     835             : 
     836             :     /* sanity check, shouldn't happen */
     837           0 :     if (ctx->cs != NULL)
     838           0 :         fatal("compressor active");
     839             : 
     840             :     /*
     841             :      * Note: we do not make a local lo_buf because we expect at most one BLOBS
     842             :      * entry per archive, so no parallelism is possible.  Likewise,
     843             :      * TOC-entry-local state isn't an issue because any one TOC entry is
     844             :      * touched by just one worker child.
     845             :      */
     846           0 : }
     847             : 
     848             : static void
     849           0 : _DeClone(ArchiveHandle *AH)
     850             : {
     851           0 :     lclContext *ctx = (lclContext *) AH->formatData;
     852             : 
     853           0 :     free(ctx);
     854           0 : }
     855             : 
     856             : /*
     857             :  * This function is executed in the child of a parallel restore from a
     858             :  * custom-format archive and restores the actual data for one TOC entry.
     859             :  */
     860             : static int
     861           0 : _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
     862             : {
     863           0 :     return parallel_restore(AH, te);
     864             : }
     865             : 
     866             : /*--------------------------------------------------
     867             :  * END OF FORMAT CALLBACKS
     868             :  *--------------------------------------------------
     869             :  */
     870             : 
     871             : /*
     872             :  * Get the current position in the archive file.
     873             :  */
     874             : static pgoff_t
     875          70 : _getFilePos(ArchiveHandle *AH, lclContext *ctx)
     876             : {
     877             :     pgoff_t     pos;
     878             : 
     879          70 :     if (ctx->hasSeek)
     880             :     {
     881             :         /*
     882             :          * Prior to 1.7 (pg7.3) we relied on the internally maintained
     883             :          * pointer.  Now we rely on ftello() always, unless the file has been
     884             :          * found to not support it.  For debugging purposes, print a warning
     885             :          * if the internal pointer disagrees, so that we're more likely to
     886             :          * notice if something's broken about the internal position tracking.
     887             :          */
     888          70 :         pos = ftello(AH->FH);
     889          70 :         if (pos < 0)
     890           0 :             fatal("could not determine seek position in archive file: %m");
     891             : 
     892          70 :         if (pos != ctx->filePos)
     893           0 :             pg_log_warning("ftell mismatch with expected position -- ftell used");
     894             :     }
     895             :     else
     896           0 :         pos = ctx->filePos;
     897          70 :     return pos;
     898             : }
     899             : 
     900             : /*
     901             :  * Read a data block header. The format changed in V1.3, so we
     902             :  * centralize the code here for simplicity.  Returns *type = EOF
     903             :  * if at EOF.
     904             :  */
     905             : static void
     906          34 : _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
     907             : {
     908          34 :     lclContext *ctx = (lclContext *) AH->formatData;
     909             :     int         byt;
     910             : 
     911             :     /*
     912             :      * Note: if we are at EOF with a pre-1.3 input file, we'll fatal() inside
     913             :      * ReadInt rather than returning EOF.  It doesn't seem worth jumping
     914             :      * through hoops to deal with that case better, because no such files are
     915             :      * likely to exist in the wild: only some 7.1 development versions of
     916             :      * pg_dump ever generated such files.
     917             :      */
     918          34 :     if (AH->version < K_VERS_1_3)
     919           0 :         *type = BLK_DATA;
     920             :     else
     921             :     {
     922          34 :         byt = getc(AH->FH);
     923          34 :         *type = byt;
     924          34 :         if (byt == EOF)
     925             :         {
     926           0 :             *id = 0;            /* don't return an uninitialized value */
     927           0 :             return;
     928             :         }
     929          34 :         ctx->filePos += 1;
     930             :     }
     931             : 
     932          34 :     *id = ReadInt(AH);
     933             : }
     934             : 
     935             : /*
     936             :  * Callback function for WriteDataToArchive. Writes one block of (compressed)
     937             :  * data to the archive.
     938             :  */
     939             : static void
     940          34 : _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
     941             : {
     942             :     /* never write 0-byte blocks (this should not happen) */
     943          34 :     if (len > 0)
     944             :     {
     945          34 :         WriteInt(AH, len);
     946          34 :         _WriteBuf(AH, buf, len);
     947             :     }
     948          34 :     return;
     949             : }
     950             : 
     951             : /*
     952             :  * Callback function for ReadDataFromArchive. To keep things simple, we
     953             :  * always read one compressed block at a time.
     954             :  */
     955             : static size_t
     956          68 : _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
     957             : {
     958             :     size_t      blkLen;
     959             : 
     960             :     /* Read length */
     961          68 :     blkLen = ReadInt(AH);
     962          68 :     if (blkLen == 0)
     963          34 :         return 0;
     964             : 
     965             :     /* If the caller's buffer is not large enough, allocate a bigger one */
     966          34 :     if (blkLen > *buflen)
     967             :     {
     968           0 :         free(*buf);
     969           0 :         *buf = (char *) pg_malloc(blkLen);
     970           0 :         *buflen = blkLen;
     971             :     }
     972             : 
     973             :     /* exits app on read errors */
     974          34 :     _ReadBuf(AH, *buf, blkLen);
     975             : 
     976          34 :     return blkLen;
     977             : }

Generated by: LCOV version 1.13