LCOV - code coverage report
Current view: top level - src/bin/pg_dump - pg_backup_custom.c (source / functions) Hit Total Coverage
Test: PostgreSQL 15devel Lines: 191 333 57.4 %
Date: 2021-12-09 04:09:06 Functions: 24 31 77.4 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_backup_custom.c
       4             :  *
       5             :  *  Implements the custom output format.
       6             :  *
       7             :  *  The comments with the routines in this code are a good place to
       8             :  *  understand how to write a new format.
       9             :  *
      10             :  *  See the headers to pg_restore for more details.
      11             :  *
      12             :  * Copyright (c) 2000, Philip Warner
      13             :  *      Rights are granted to use this software in any way so long
      14             :  *      as this notice is not removed.
      15             :  *
      16             :  *  The author is not responsible for loss or damages that may
      17             :  *  and any liability will be limited to the time taken to fix any
      18             :  *  related bug.
      19             :  *
      20             :  *
      21             :  * IDENTIFICATION
      22             :  *      src/bin/pg_dump/pg_backup_custom.c
      23             :  *
      24             :  *-------------------------------------------------------------------------
      25             :  */
      26             : #include "postgres_fe.h"
      27             : 
      28             : #include "common/file_utils.h"
      29             : #include "compress_io.h"
      30             : #include "parallel.h"
      31             : #include "pg_backup_utils.h"
      32             : 
/*--------
 * Routines in the format interface
 *--------
 */

/*
 * Callbacks installed into the ArchiveHandle by InitArchiveFmt_Custom();
 * the archiver core invokes them through the corresponding *Ptr fields.
 */
static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
static void _StartData(ArchiveHandle *AH, TocEntry *te);
static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
static void _EndData(ArchiveHandle *AH, TocEntry *te);
static int  _WriteByte(ArchiveHandle *AH, const int i);
static int  _ReadByte(ArchiveHandle *);
static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
static void _CloseArchive(ArchiveHandle *AH);
static void _ReopenArchive(ArchiveHandle *AH);
static void _PrintTocData(ArchiveHandle *AH, TocEntry *te);
static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);

/* Helpers used by _PrintTocData() to output or skip data blocks. */
static void _PrintData(ArchiveHandle *AH);
static void _skipData(ArchiveHandle *AH);
static void _skipBlobs(ArchiveHandle *AH);

/* Large-object (BLOB) callbacks, plus the restore-side loader. */
static void _StartBlobs(ArchiveHandle *AH, TocEntry *te);
static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
static void _LoadBlobs(ArchiveHandle *AH, bool drop);

/* Installed as PrepParallelRestorePtr, ClonePtr and DeClonePtr. */
static void _PrepParallelRestore(ArchiveHandle *AH);
static void _Clone(ArchiveHandle *AH);
static void _DeClone(ArchiveHandle *AH);

/* Installed as WorkerJobRestorePtr (custom format has no parallel dump). */
static int  _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
      68             : 
/*
 * Per-archive private state for the custom format.  Allocated by
 * InitArchiveFmt_Custom() and stored in AH->formatData.
 */
typedef struct
{
    CompressorState *cs;        /* compressor for the data/BLOB being written */
    int         hasSeek;        /* result of checkSeek() on AH->FH */
    /* lastFilePos is used only when reading, and may be invalid if !hasSeek */
    pgoff_t     lastFilePos;    /* position after last data block we've read */
} lclContext;

/*
 * Per-TOC-entry private state, stored in te->formatData.  dataState holds
 * one of the K_OFFSET_* codes (K_OFFSET_NO_DATA, K_OFFSET_POS_NOT_SET or
 * K_OFFSET_POS_SET).
 */
typedef struct
{
    int         dataState;
    pgoff_t     dataPos;        /* valid only if dataState=K_OFFSET_POS_SET */
} lclTocEntry;
      82             : 
      83             : 
/*------
 * Static declarations
 *------
 */
static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
/* Callers treat a negative result as "file position unknown". */
static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx);

/*
 * Block I/O callbacks handed to the compression layer: the writer is passed
 * to AllocateCompressor(), the reader to ReadDataFromArchive().
 */
static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);
      93             : 
      94             : 
      95             : /*
      96             :  *  Init routine required by ALL formats. This is a global routine
      97             :  *  and should be declared in pg_backup_archiver.h
      98             :  *
      99             :  *  It's task is to create any extra archive context (using AH->formatData),
     100             :  *  and to initialize the supported function pointers.
     101             :  *
     102             :  *  It should also prepare whatever it's input source is for reading/writing,
     103             :  *  and in the case of a read mode connection, it should load the Header & TOC.
     104             :  */
     105             : void
     106          36 : InitArchiveFmt_Custom(ArchiveHandle *AH)
     107             : {
     108             :     lclContext *ctx;
     109             : 
     110             :     /* Assuming static functions, this can be copied for each format. */
     111          36 :     AH->ArchiveEntryPtr = _ArchiveEntry;
     112          36 :     AH->StartDataPtr = _StartData;
     113          36 :     AH->WriteDataPtr = _WriteData;
     114          36 :     AH->EndDataPtr = _EndData;
     115          36 :     AH->WriteBytePtr = _WriteByte;
     116          36 :     AH->ReadBytePtr = _ReadByte;
     117          36 :     AH->WriteBufPtr = _WriteBuf;
     118          36 :     AH->ReadBufPtr = _ReadBuf;
     119          36 :     AH->ClosePtr = _CloseArchive;
     120          36 :     AH->ReopenPtr = _ReopenArchive;
     121          36 :     AH->PrintTocDataPtr = _PrintTocData;
     122          36 :     AH->ReadExtraTocPtr = _ReadExtraToc;
     123          36 :     AH->WriteExtraTocPtr = _WriteExtraToc;
     124          36 :     AH->PrintExtraTocPtr = _PrintExtraToc;
     125             : 
     126          36 :     AH->StartBlobsPtr = _StartBlobs;
     127          36 :     AH->StartBlobPtr = _StartBlob;
     128          36 :     AH->EndBlobPtr = _EndBlob;
     129          36 :     AH->EndBlobsPtr = _EndBlobs;
     130             : 
     131          36 :     AH->PrepParallelRestorePtr = _PrepParallelRestore;
     132          36 :     AH->ClonePtr = _Clone;
     133          36 :     AH->DeClonePtr = _DeClone;
     134             : 
     135             :     /* no parallel dump in the custom archive, only parallel restore */
     136          36 :     AH->WorkerJobDumpPtr = NULL;
     137          36 :     AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom;
     138             : 
     139             :     /* Set up a private area. */
     140          36 :     ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
     141          36 :     AH->formatData = (void *) ctx;
     142             : 
     143             :     /* Initialize LO buffering */
     144          36 :     AH->lo_buf_size = LOBBUFSIZE;
     145          36 :     AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
     146             : 
     147             :     /*
     148             :      * Now open the file
     149             :      */
     150          36 :     if (AH->mode == archModeWrite)
     151             :     {
     152          18 :         if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
     153             :         {
     154          18 :             AH->FH = fopen(AH->fSpec, PG_BINARY_W);
     155          18 :             if (!AH->FH)
     156           0 :                 fatal("could not open output file \"%s\": %m", AH->fSpec);
     157             :         }
     158             :         else
     159             :         {
     160           0 :             AH->FH = stdout;
     161           0 :             if (!AH->FH)
     162           0 :                 fatal("could not open output file: %m");
     163             :         }
     164             : 
     165          18 :         ctx->hasSeek = checkSeek(AH->FH);
     166             :     }
     167             :     else
     168             :     {
     169          18 :         if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
     170             :         {
     171          18 :             AH->FH = fopen(AH->fSpec, PG_BINARY_R);
     172          18 :             if (!AH->FH)
     173           0 :                 fatal("could not open input file \"%s\": %m", AH->fSpec);
     174             :         }
     175             :         else
     176             :         {
     177           0 :             AH->FH = stdin;
     178           0 :             if (!AH->FH)
     179           0 :                 fatal("could not open input file: %m");
     180             :         }
     181             : 
     182          18 :         ctx->hasSeek = checkSeek(AH->FH);
     183             : 
     184          18 :         ReadHead(AH);
     185          18 :         ReadToc(AH);
     186             : 
     187             :         /*
     188             :          * Remember location of first data block (i.e., the point after TOC)
     189             :          * in case we have to search for desired data blocks.
     190             :          */
     191          18 :         ctx->lastFilePos = _getFilePos(AH, ctx);
     192             :     }
     193          36 : }
     194             : 
     195             : /*
     196             :  * Called by the Archiver when the dumper creates a new TOC entry.
     197             :  *
     198             :  * Optional.
     199             :  *
     200             :  * Set up extract format-related TOC data.
     201             : */
     202             : static void
     203        4602 : _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
     204             : {
     205             :     lclTocEntry *ctx;
     206             : 
     207        4602 :     ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
     208        4602 :     if (te->dataDumper)
     209          56 :         ctx->dataState = K_OFFSET_POS_NOT_SET;
     210             :     else
     211        4546 :         ctx->dataState = K_OFFSET_NO_DATA;
     212             : 
     213        4602 :     te->formatData = (void *) ctx;
     214        4602 : }
     215             : 
     216             : /*
     217             :  * Called by the Archiver to save any extra format-related TOC entry
     218             :  * data.
     219             :  *
     220             :  * Optional.
     221             :  *
     222             :  * Use the Archiver routines to write data - they are non-endian, and
     223             :  * maintain other important file information.
     224             :  */
     225             : static void
     226        9196 : _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
     227             : {
     228        9196 :     lclTocEntry *ctx = (lclTocEntry *) te->formatData;
     229             : 
     230        9196 :     WriteOffset(AH, ctx->dataPos, ctx->dataState);
     231        9196 : }
     232             : 
     233             : /*
     234             :  * Called by the Archiver to read any extra format-related TOC data.
     235             :  *
     236             :  * Optional.
     237             :  *
     238             :  * Needs to match the order defined in _WriteExtraToc, and should also
     239             :  * use the Archiver input routines.
     240             :  */
     241             : static void
     242        4598 : _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
     243             : {
     244        4598 :     lclTocEntry *ctx = (lclTocEntry *) te->formatData;
     245             : 
     246        4598 :     if (ctx == NULL)
     247             :     {
     248        4598 :         ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
     249        4598 :         te->formatData = (void *) ctx;
     250             :     }
     251             : 
     252        4598 :     ctx->dataState = ReadOffset(AH, &(ctx->dataPos));
     253             : 
     254             :     /*
     255             :      * Prior to V1.7 (pg7.3), we dumped the data size as an int now we don't
     256             :      * dump it at all.
     257             :      */
     258        4598 :     if (AH->version < K_VERS_1_7)
     259           0 :         ReadInt(AH);
     260        4598 : }
     261             : 
     262             : /*
     263             :  * Called by the Archiver when restoring an archive to output a comment
     264             :  * that includes useful information about the TOC entry.
     265             :  *
     266             :  * Optional.
     267             :  */
     268             : static void
     269         936 : _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
     270             : {
     271         936 :     lclTocEntry *ctx = (lclTocEntry *) te->formatData;
     272             : 
     273         936 :     if (AH->public.verbose)
     274         424 :         ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n",
     275         424 :                  (int64) ctx->dataPos);
     276         936 : }
     277             : 
     278             : /*
     279             :  * Called by the archiver when saving TABLE DATA (not schema). This routine
     280             :  * should save whatever format-specific information is needed to read
     281             :  * the archive back.
     282             :  *
     283             :  * It is called just prior to the dumper's 'DataDumper' routine being called.
     284             :  *
     285             :  * Optional, but strongly recommended.
     286             :  *
     287             :  */
     288             : static void
     289          50 : _StartData(ArchiveHandle *AH, TocEntry *te)
     290             : {
     291          50 :     lclContext *ctx = (lclContext *) AH->formatData;
     292          50 :     lclTocEntry *tctx = (lclTocEntry *) te->formatData;
     293             : 
     294          50 :     tctx->dataPos = _getFilePos(AH, ctx);
     295          50 :     if (tctx->dataPos >= 0)
     296          50 :         tctx->dataState = K_OFFSET_POS_SET;
     297             : 
     298          50 :     _WriteByte(AH, BLK_DATA);   /* Block type */
     299          50 :     WriteInt(AH, te->dumpId);    /* For sanity check */
     300             : 
     301          50 :     ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
     302          50 : }
     303             : 
     304             : /*
     305             :  * Called by archiver when dumper calls WriteData. This routine is
     306             :  * called for both BLOB and TABLE data; it is the responsibility of
     307             :  * the format to manage each kind of data using StartBlob/StartData.
     308             :  *
     309             :  * It should only be called from within a DataDumper routine.
     310             :  *
     311             :  * Mandatory.
     312             :  */
     313             : static void
     314         110 : _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
     315             : {
     316         110 :     lclContext *ctx = (lclContext *) AH->formatData;
     317         110 :     CompressorState *cs = ctx->cs;
     318             : 
     319         110 :     if (dLen > 0)
     320             :         /* WriteDataToArchive() internally throws write errors */
     321         108 :         WriteDataToArchive(AH, cs, data, dLen);
     322         110 : }
     323             : 
     324             : /*
     325             :  * Called by the archiver when a dumper's 'DataDumper' routine has
     326             :  * finished.
     327             :  *
     328             :  * Optional.
     329             :  */
     330             : static void
     331          50 : _EndData(ArchiveHandle *AH, TocEntry *te)
     332             : {
     333          50 :     lclContext *ctx = (lclContext *) AH->formatData;
     334             : 
     335          50 :     EndCompressor(AH, ctx->cs);
     336             :     /* Send the end marker */
     337          50 :     WriteInt(AH, 0);
     338          50 : }
     339             : 
     340             : /*
     341             :  * Called by the archiver when starting to save all BLOB DATA (not schema).
     342             :  * This routine should save whatever format-specific information is needed
     343             :  * to read the BLOBs back into memory.
     344             :  *
     345             :  * It is called just prior to the dumper's DataDumper routine.
     346             :  *
     347             :  * Optional, but strongly recommended.
     348             :  */
     349             : static void
     350           2 : _StartBlobs(ArchiveHandle *AH, TocEntry *te)
     351             : {
     352           2 :     lclContext *ctx = (lclContext *) AH->formatData;
     353           2 :     lclTocEntry *tctx = (lclTocEntry *) te->formatData;
     354             : 
     355           2 :     tctx->dataPos = _getFilePos(AH, ctx);
     356           2 :     if (tctx->dataPos >= 0)
     357           2 :         tctx->dataState = K_OFFSET_POS_SET;
     358             : 
     359           2 :     _WriteByte(AH, BLK_BLOBS);  /* Block type */
     360           2 :     WriteInt(AH, te->dumpId);    /* For sanity check */
     361           2 : }
     362             : 
     363             : /*
     364             :  * Called by the archiver when the dumper calls StartBlob.
     365             :  *
     366             :  * Mandatory.
     367             :  *
     368             :  * Must save the passed OID for retrieval at restore-time.
     369             :  */
     370             : static void
     371           2 : _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
     372             : {
     373           2 :     lclContext *ctx = (lclContext *) AH->formatData;
     374             : 
     375           2 :     if (oid == 0)
     376           0 :         fatal("invalid OID for large object");
     377             : 
     378           2 :     WriteInt(AH, oid);
     379             : 
     380           2 :     ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
     381           2 : }
     382             : 
     383             : /*
     384             :  * Called by the archiver when the dumper calls EndBlob.
     385             :  *
     386             :  * Optional.
     387             :  */
     388             : static void
     389           2 : _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
     390             : {
     391           2 :     lclContext *ctx = (lclContext *) AH->formatData;
     392             : 
     393           2 :     EndCompressor(AH, ctx->cs);
     394             :     /* Send the end marker */
     395           2 :     WriteInt(AH, 0);
     396           2 : }
     397             : 
     398             : /*
     399             :  * Called by the archiver when finishing saving all BLOB DATA.
     400             :  *
     401             :  * Optional.
     402             :  */
     403             : static void
     404           2 : _EndBlobs(ArchiveHandle *AH, TocEntry *te)
     405             : {
     406             :     /* Write out a fake zero OID to mark end-of-blobs. */
     407           2 :     WriteInt(AH, 0);
     408           2 : }
     409             : 
/*
 * Print data for a given TOC entry
 *
 * Locates te's data block in the archive and restores it: BLK_DATA blocks
 * are output via _PrintData(), BLK_BLOBS blocks via _LoadBlobs().  Entries
 * whose dataState is K_OFFSET_NO_DATA are ignored.
 *
 * If the input is seekable and the entry's offset is known, we seek
 * straight to it.  Otherwise we scan forward block by block (starting at
 * ctx->lastFilePos when the input is seekable), recording the offsets of
 * any other entries' blocks we pass so later calls can seek to them.
 *
 * Exits via fatal() if the block cannot be found, carries the wrong dump
 * ID, or has an unrecognized block type.
 */
static void
_PrintTocData(ArchiveHandle *AH, TocEntry *te)
{
    lclContext *ctx = (lclContext *) AH->formatData;
    lclTocEntry *tctx = (lclTocEntry *) te->formatData;
    int         blkType;
    int         id;

    /* Nothing to print for entries without a data block. */
    if (tctx->dataState == K_OFFSET_NO_DATA)
        return;

    if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
    {
        /*
         * We cannot seek directly to the desired block.  Instead, skip over
         * block headers until we find the one we want.  Remember the
         * positions of skipped-over blocks, so that if we later decide we
         * need to read one, we'll be able to seek to it.
         *
         * When our input file is seekable, we can do the search starting from
         * the point after the last data block we scanned in previous
         * iterations of this function.
         */
        if (ctx->hasSeek)
        {
            if (fseeko(AH->FH, ctx->lastFilePos, SEEK_SET) != 0)
                fatal("error during file seek: %m");
        }

        for (;;)
        {
            /* Position of the header we are about to read (negative if unknown). */
            pgoff_t     thisBlkPos = _getFilePos(AH, ctx);

            _readBlockHeader(AH, &blkType, &id);

            /* Stop at end of archive, or at the block we are looking for. */
            if (blkType == EOF || id == te->dumpId)
                break;

            /* Remember the block position, if we got one */
            if (thisBlkPos >= 0)
            {
                TocEntry   *otherte = getTocEntryByDumpId(AH, id);

                if (otherte && otherte->formatData)
                {
                    lclTocEntry *othertctx = (lclTocEntry *) otherte->formatData;

                    /*
                     * Note: on Windows, multiple threads might access/update
                     * the same lclTocEntry concurrently, but that should be
                     * safe as long as we update dataPos before dataState.
                     * Ideally, we'd use pg_write_barrier() to enforce that,
                     * but the needed infrastructure doesn't exist in frontend
                     * code.  But Windows only runs on machines with strong
                     * store ordering, so it should be okay for now.
                     */
                    if (othertctx->dataState == K_OFFSET_POS_NOT_SET)
                    {
                        othertctx->dataPos = thisBlkPos;
                        othertctx->dataState = K_OFFSET_POS_SET;
                    }
                    else if (othertctx->dataPos != thisBlkPos ||
                             othertctx->dataState != K_OFFSET_POS_SET)
                    {
                        /* sanity check */
                        pg_log_warning("data block %d has wrong seek position",
                                       id);
                    }
                }
            }

            /* Not the block we want: move past its contents. */
            switch (blkType)
            {
                case BLK_DATA:
                    _skipData(AH);
                    break;

                case BLK_BLOBS:
                    _skipBlobs(AH);
                    break;

                default:        /* Always have a default */
                    fatal("unrecognized data block type (%d) while searching archive",
                          blkType);
                    break;
            }
        }
    }
    else
    {
        /* We can just seek to the place we need to be. */
        if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
            fatal("error during file seek: %m");

        _readBlockHeader(AH, &blkType, &id);
    }

    /*
     * If we reached EOF without finding the block we want, then either it
     * doesn't exist, or it does but we lack the ability to seek back to it.
     */
    if (blkType == EOF)
    {
        if (!ctx->hasSeek)
            fatal("could not find block ID %d in archive -- "
                  "possibly due to out-of-order restore request, "
                  "which cannot be handled due to non-seekable input file",
                  te->dumpId);
        else
            fatal("could not find block ID %d in archive -- "
                  "possibly corrupt archive",
                  te->dumpId);
    }

    /* Are we sane? */
    if (id != te->dumpId)
        fatal("found unexpected block ID (%d) when reading data -- expected %d",
              id, te->dumpId);

    /* Found our block: output its contents. */
    switch (blkType)
    {
        case BLK_DATA:
            _PrintData(AH);
            break;

        case BLK_BLOBS:
            _LoadBlobs(AH, AH->public.ropt->dropSchema);
            break;

        default:                /* Always have a default */
            fatal("unrecognized data block type %d while restoring archive",
                  blkType);
            break;
    }

    /*
     * If our input file is seekable but lacks data offsets, update our
     * knowledge of where to start future searches from.  (Note that we did
     * not update the current TE's dataState/dataPos.  We could have, but
     * there is no point since it will not be visited again.)
     */
    if (ctx->hasSeek && tctx->dataState == K_OFFSET_POS_NOT_SET)
    {
        pgoff_t     curPos = _getFilePos(AH, ctx);

        if (curPos > ctx->lastFilePos)
            ctx->lastFilePos = curPos;
    }
}
     562             : 
     563             : /*
     564             :  * Print data from current file position.
     565             : */
     566             : static void
     567          52 : _PrintData(ArchiveHandle *AH)
     568             : {
     569          52 :     ReadDataFromArchive(AH, AH->compression, _CustomReadFunc);
     570          52 : }
     571             : 
     572             : static void
     573           2 : _LoadBlobs(ArchiveHandle *AH, bool drop)
     574             : {
     575             :     Oid         oid;
     576             : 
     577           2 :     StartRestoreBlobs(AH);
     578             : 
     579           2 :     oid = ReadInt(AH);
     580           4 :     while (oid != 0)
     581             :     {
     582           2 :         StartRestoreBlob(AH, oid, drop);
     583           2 :         _PrintData(AH);
     584           2 :         EndRestoreBlob(AH, oid);
     585           2 :         oid = ReadInt(AH);
     586             :     }
     587             : 
     588           2 :     EndRestoreBlobs(AH);
     589           2 : }
     590             : 
     591             : /*
     592             :  * Skip the BLOBs from the current file position.
     593             :  * BLOBS are written sequentially as data blocks (see below).
     594             :  * Each BLOB is preceded by its original OID.
     595             :  * A zero OID indicates the end of the BLOBS.
     596             :  */
     597             : static void
     598           0 : _skipBlobs(ArchiveHandle *AH)
     599             : {
     600             :     Oid         oid;
     601             : 
     602           0 :     oid = ReadInt(AH);
     603           0 :     while (oid != 0)
     604             :     {
     605           0 :         _skipData(AH);
     606           0 :         oid = ReadInt(AH);
     607             :     }
     608           0 : }
     609             : 
     610             : /*
     611             :  * Skip data from current file position.
     612             :  * Data blocks are formatted as an integer length, followed by data.
     613             :  * A zero length indicates the end of the block.
     614             : */
     615             : static void
     616           0 : _skipData(ArchiveHandle *AH)
     617             : {
     618           0 :     lclContext *ctx = (lclContext *) AH->formatData;
     619             :     size_t      blkLen;
     620           0 :     char       *buf = NULL;
     621           0 :     int         buflen = 0;
     622             : 
     623           0 :     blkLen = ReadInt(AH);
     624           0 :     while (blkLen != 0)
     625             :     {
     626           0 :         if (ctx->hasSeek)
     627             :         {
     628           0 :             if (fseeko(AH->FH, blkLen, SEEK_CUR) != 0)
     629           0 :                 fatal("error during file seek: %m");
     630             :         }
     631             :         else
     632             :         {
     633           0 :             if (blkLen > buflen)
     634             :             {
     635           0 :                 if (buf)
     636           0 :                     free(buf);
     637           0 :                 buf = (char *) pg_malloc(blkLen);
     638           0 :                 buflen = blkLen;
     639             :             }
     640           0 :             if (fread(buf, 1, blkLen, AH->FH) != blkLen)
     641             :             {
     642           0 :                 if (feof(AH->FH))
     643           0 :                     fatal("could not read from input file: end of file");
     644             :                 else
     645           0 :                     fatal("could not read from input file: %m");
     646             :             }
     647             :         }
     648             : 
     649           0 :         blkLen = ReadInt(AH);
     650             :     }
     651             : 
     652           0 :     if (buf)
     653           0 :         free(buf);
     654           0 : }
     655             : 
     656             : /*
     657             :  * Write a byte of data to the archive.
     658             :  *
     659             :  * Mandatory.
     660             :  *
     661             :  * Called by the archiver to do integer & byte output to the archive.
     662             :  */
     663             : static int
     664      882714 : _WriteByte(ArchiveHandle *AH, const int i)
     665             : {
     666      882714 :     if (fputc(i, AH->FH) == EOF)
     667           0 :         WRITE_ERROR_EXIT;
     668             : 
     669      882714 :     return 1;
     670             : }
     671             : 
     672             : /*
     673             :  * Read a byte of data from the archive.
     674             :  *
     675             :  * Mandatory
     676             :  *
     677             :  * Called by the archiver to read bytes & integers from the archive.
     678             :  * EOF should be treated as a fatal error.
     679             :  */
     680             : static int
     681      442280 : _ReadByte(ArchiveHandle *AH)
     682             : {
     683             :     int         res;
     684             : 
     685      442280 :     res = getc(AH->FH);
     686      442280 :     if (res == EOF)
     687           0 :         READ_ERROR_EXIT(AH->FH);
     688      442280 :     return res;
     689             : }
     690             : 
     691             : /*
     692             :  * Write a buffer of data to the archive.
     693             :  *
     694             :  * Mandatory.
     695             :  *
     696             :  * Called by the archiver to write a block of bytes to the archive.
     697             :  */
     698             : static void
     699       98300 : _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
     700             : {
     701       98300 :     if (fwrite(buf, 1, len, AH->FH) != len)
     702           0 :         WRITE_ERROR_EXIT;
     703       98300 : }
     704             : 
     705             : /*
     706             :  * Read a block of bytes from the archive.
     707             :  *
     708             :  * Mandatory.
     709             :  *
     710             :  * Called by the archiver to read a block of bytes from the archive
     711             :  */
     712             : static void
     713       49212 : _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
     714             : {
     715       49212 :     if (fread(buf, 1, len, AH->FH) != len)
     716           0 :         READ_ERROR_EXIT(AH->FH);
     717       49212 : }
     718             : 
     719             : /*
     720             :  * Close the archive.
     721             :  *
     722             :  * Mandatory.
     723             :  *
     724             :  * When writing the archive, this is the routine that actually starts
     725             :  * the process of saving it to files. No data should be written prior
     726             :  * to this point, since the user could sort the TOC after creating it.
     727             :  *
     728             :  * If an archive is to be written, this routine must call:
     729             :  *      WriteHead           to save the archive header
     730             :  *      WriteToc            to save the TOC entries
     731             :  *      WriteDataChunks     to save all DATA & BLOBs.
     732             :  *
     733             :  */
     734             : static void
     735          36 : _CloseArchive(ArchiveHandle *AH)
     736             : {
     737          36 :     lclContext *ctx = (lclContext *) AH->formatData;
     738             :     pgoff_t     tpos;
     739             : 
     740          36 :     if (AH->mode == archModeWrite)
     741             :     {
     742          18 :         WriteHead(AH);
     743             :         /* Remember TOC's seek position for use below */
     744          18 :         tpos = ftello(AH->FH);
     745          18 :         if (tpos < 0 && ctx->hasSeek)
     746           0 :             fatal("could not determine seek position in archive file: %m");
     747          18 :         WriteToc(AH);
     748          18 :         WriteDataChunks(AH, NULL);
     749             : 
     750             :         /*
     751             :          * If possible, re-write the TOC in order to update the data offset
     752             :          * information.  This is not essential, as pg_restore can cope in most
     753             :          * cases without it; but it can make pg_restore significantly faster
     754             :          * in some situations (especially parallel restore).
     755             :          */
     756          36 :         if (ctx->hasSeek &&
     757          18 :             fseeko(AH->FH, tpos, SEEK_SET) == 0)
     758          18 :             WriteToc(AH);
     759             :     }
     760             : 
     761          36 :     if (fclose(AH->FH) != 0)
     762           0 :         fatal("could not close archive file: %m");
     763             : 
     764             :     /* Sync the output file if one is defined */
     765          36 :     if (AH->dosync && AH->mode == archModeWrite && AH->fSpec)
     766          14 :         (void) fsync_fname(AH->fSpec, false);
     767             : 
     768          36 :     AH->FH = NULL;
     769          36 : }
     770             : 
     771             : /*
     772             :  * Reopen the archive's file handle.
     773             :  *
     774             :  * We close the original file handle, except on Windows.  (The difference
     775             :  * is because on Windows, this is used within a multithreading context,
     776             :  * and we don't want a thread closing the parent file handle.)
     777             :  */
     778             : static void
     779           0 : _ReopenArchive(ArchiveHandle *AH)
     780             : {
     781           0 :     lclContext *ctx = (lclContext *) AH->formatData;
     782             :     pgoff_t     tpos;
     783             : 
     784           0 :     if (AH->mode == archModeWrite)
     785           0 :         fatal("can only reopen input archives");
     786             : 
     787             :     /*
     788             :      * These two cases are user-facing errors since they represent unsupported
     789             :      * (but not invalid) use-cases.  Word the error messages appropriately.
     790             :      */
     791           0 :     if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)
     792           0 :         fatal("parallel restore from standard input is not supported");
     793           0 :     if (!ctx->hasSeek)
     794           0 :         fatal("parallel restore from non-seekable file is not supported");
     795             : 
     796           0 :     tpos = ftello(AH->FH);
     797           0 :     if (tpos < 0)
     798           0 :         fatal("could not determine seek position in archive file: %m");
     799             : 
     800             : #ifndef WIN32
     801           0 :     if (fclose(AH->FH) != 0)
     802           0 :         fatal("could not close archive file: %m");
     803             : #endif
     804             : 
     805           0 :     AH->FH = fopen(AH->fSpec, PG_BINARY_R);
     806           0 :     if (!AH->FH)
     807           0 :         fatal("could not open input file \"%s\": %m", AH->fSpec);
     808             : 
     809           0 :     if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
     810           0 :         fatal("could not set seek position in archive file: %m");
     811           0 : }
     812             : 
     813             : /*
     814             :  * Prepare for parallel restore.
     815             :  *
     816             :  * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
     817             :  * TOC entries' dataLength fields with appropriate values to guide the
     818             :  * ordering of restore jobs.  The source of said data is format-dependent,
     819             :  * as is the exact meaning of the values.
     820             :  *
     821             :  * A format module might also choose to do other setup here.
     822             :  */
     823             : static void
     824           0 : _PrepParallelRestore(ArchiveHandle *AH)
     825             : {
     826           0 :     lclContext *ctx = (lclContext *) AH->formatData;
     827           0 :     TocEntry   *prev_te = NULL;
     828           0 :     lclTocEntry *prev_tctx = NULL;
     829             :     TocEntry   *te;
     830             : 
     831             :     /*
     832             :      * Knowing that the data items were dumped out in TOC order, we can
     833             :      * reconstruct the length of each item as the delta to the start offset of
     834             :      * the next data item.
     835             :      */
     836           0 :     for (te = AH->toc->next; te != AH->toc; te = te->next)
     837             :     {
     838           0 :         lclTocEntry *tctx = (lclTocEntry *) te->formatData;
     839             : 
     840             :         /*
     841             :          * Ignore entries without a known data offset; if we were unable to
     842             :          * seek to rewrite the TOC when creating the archive, this'll be all
     843             :          * of them, and we'll end up with no size estimates.
     844             :          */
     845           0 :         if (tctx->dataState != K_OFFSET_POS_SET)
     846           0 :             continue;
     847             : 
     848             :         /* Compute previous data item's length */
     849           0 :         if (prev_te)
     850             :         {
     851           0 :             if (tctx->dataPos > prev_tctx->dataPos)
     852           0 :                 prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos;
     853             :         }
     854             : 
     855           0 :         prev_te = te;
     856           0 :         prev_tctx = tctx;
     857             :     }
     858             : 
     859             :     /* If OK to seek, we can determine the length of the last item */
     860           0 :     if (prev_te && ctx->hasSeek)
     861             :     {
     862             :         pgoff_t     endpos;
     863             : 
     864           0 :         if (fseeko(AH->FH, 0, SEEK_END) != 0)
     865           0 :             fatal("error during file seek: %m");
     866           0 :         endpos = ftello(AH->FH);
     867           0 :         if (endpos > prev_tctx->dataPos)
     868           0 :             prev_te->dataLength = endpos - prev_tctx->dataPos;
     869             :     }
     870           0 : }
     871             : 
     872             : /*
     873             :  * Clone format-specific fields during parallel restoration.
     874             :  */
     875             : static void
     876           0 : _Clone(ArchiveHandle *AH)
     877             : {
     878           0 :     lclContext *ctx = (lclContext *) AH->formatData;
     879             : 
     880             :     /*
     881             :      * Each thread must have private lclContext working state.
     882             :      */
     883           0 :     AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
     884           0 :     memcpy(AH->formatData, ctx, sizeof(lclContext));
     885           0 :     ctx = (lclContext *) AH->formatData;
     886             : 
     887             :     /* sanity check, shouldn't happen */
     888           0 :     if (ctx->cs != NULL)
     889           0 :         fatal("compressor active");
     890             : 
     891             :     /*
     892             :      * We intentionally do not clone TOC-entry-local state: it's useful to
     893             :      * share knowledge about where the data blocks are across threads.
     894             :      * _PrintTocData has to be careful about the order of operations on that
     895             :      * state, though.
     896             :      *
     897             :      * Note: we do not make a local lo_buf because we expect at most one BLOBS
     898             :      * entry per archive, so no parallelism is possible.
     899             :      */
     900           0 : }
     901             : 
     902             : static void
     903           0 : _DeClone(ArchiveHandle *AH)
     904             : {
     905           0 :     lclContext *ctx = (lclContext *) AH->formatData;
     906             : 
     907           0 :     free(ctx);
     908           0 : }
     909             : 
     910             : /*
     911             :  * This function is executed in the child of a parallel restore from a
     912             :  * custom-format archive and restores the actual data for one TOC entry.
     913             :  */
     914             : static int
     915           0 : _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
     916             : {
     917           0 :     return parallel_restore(AH, te);
     918             : }
     919             : 
     920             : /*--------------------------------------------------
     921             :  * END OF FORMAT CALLBACKS
     922             :  *--------------------------------------------------
     923             :  */
     924             : 
     925             : /*
     926             :  * Get the current position in the archive file.
     927             :  *
     928             :  * With a non-seekable archive file, we may not be able to obtain the
     929             :  * file position.  If so, just return -1.  It's not too important in
     930             :  * that case because we won't be able to rewrite the TOC to fill in
     931             :  * data block offsets anyway.
     932             :  */
     933             : static pgoff_t
     934          70 : _getFilePos(ArchiveHandle *AH, lclContext *ctx)
     935             : {
     936             :     pgoff_t     pos;
     937             : 
     938          70 :     pos = ftello(AH->FH);
     939          70 :     if (pos < 0)
     940             :     {
     941             :         /* Not expected if we found we can seek. */
     942           0 :         if (ctx->hasSeek)
     943           0 :             fatal("could not determine seek position in archive file: %m");
     944             :     }
     945          70 :     return pos;
     946             : }
     947             : 
     948             : /*
     949             :  * Read a data block header. The format changed in V1.3, so we
     950             :  * centralize the code here for simplicity.  Returns *type = EOF
     951             :  * if at EOF.
     952             :  */
     953             : static void
     954          52 : _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
     955             : {
     956             :     int         byt;
     957             : 
     958             :     /*
     959             :      * Note: if we are at EOF with a pre-1.3 input file, we'll fatal() inside
     960             :      * ReadInt rather than returning EOF.  It doesn't seem worth jumping
     961             :      * through hoops to deal with that case better, because no such files are
     962             :      * likely to exist in the wild: only some 7.1 development versions of
     963             :      * pg_dump ever generated such files.
     964             :      */
     965          52 :     if (AH->version < K_VERS_1_3)
     966           0 :         *type = BLK_DATA;
     967             :     else
     968             :     {
     969          52 :         byt = getc(AH->FH);
     970          52 :         *type = byt;
     971          52 :         if (byt == EOF)
     972             :         {
     973           0 :             *id = 0;            /* don't return an uninitialized value */
     974           0 :             return;
     975             :         }
     976             :     }
     977             : 
     978          52 :     *id = ReadInt(AH);
     979             : }
     980             : 
     981             : /*
     982             :  * Callback function for WriteDataToArchive. Writes one block of (compressed)
     983             :  * data to the archive.
     984             :  */
     985             : static void
     986          52 : _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
     987             : {
     988             :     /* never write 0-byte blocks (this should not happen) */
     989          52 :     if (len > 0)
     990             :     {
     991          52 :         WriteInt(AH, len);
     992          52 :         _WriteBuf(AH, buf, len);
     993             :     }
     994          52 : }
     995             : 
     996             : /*
     997             :  * Callback function for ReadDataFromArchive. To keep things simple, we
     998             :  * always read one compressed block at a time.
     999             :  */
    1000             : static size_t
    1001         104 : _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
    1002             : {
    1003             :     size_t      blkLen;
    1004             : 
    1005             :     /* Read length */
    1006         104 :     blkLen = ReadInt(AH);
    1007         104 :     if (blkLen == 0)
    1008          52 :         return 0;
    1009             : 
    1010             :     /* If the caller's buffer is not large enough, allocate a bigger one */
    1011          52 :     if (blkLen > *buflen)
    1012             :     {
    1013           0 :         free(*buf);
    1014           0 :         *buf = (char *) pg_malloc(blkLen);
    1015           0 :         *buflen = blkLen;
    1016             :     }
    1017             : 
    1018             :     /* exits app on read errors */
    1019          52 :     _ReadBuf(AH, *buf, blkLen);
    1020             : 
    1021          52 :     return blkLen;
    1022             : }

Generated by: LCOV version 1.14