LCOV - code coverage report
Current view: top level - src/bin/pg_dump - pg_backup_custom.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 195 332 58.7 %
Date: 2024-11-21 08:14:44 Functions: 24 31 77.4 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_backup_custom.c
       4             :  *
       5             :  *  Implements the custom output format.
       6             :  *
       7             :  *  The comments with the routines in this code are a good place to
       8             :  *  understand how to write a new format.
       9             :  *
      10             :  *  See the headers to pg_restore for more details.
      11             :  *
      12             :  * Copyright (c) 2000, Philip Warner
      13             :  *      Rights are granted to use this software in any way so long
      14             :  *      as this notice is not removed.
      15             :  *
      16             :  *  The author is not responsible for loss or damages that may
      17             :  *  and any liability will be limited to the time taken to fix any
      18             :  *  related bug.
      19             :  *
      20             :  *
      21             :  * IDENTIFICATION
      22             :  *      src/bin/pg_dump/pg_backup_custom.c
      23             :  *
      24             :  *-------------------------------------------------------------------------
      25             :  */
      26             : #include "postgres_fe.h"
      27             : 
      28             : #include "common/file_utils.h"
      29             : #include "compress_io.h"
      30             : #include "pg_backup_utils.h"
      31             : 
      32             : /*--------
      33             :  * Routines in the format interface
      34             :  *--------
      35             :  */
      36             : 
      37             : static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
      38             : static void _StartData(ArchiveHandle *AH, TocEntry *te);
      39             : static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
      40             : static void _EndData(ArchiveHandle *AH, TocEntry *te);
      41             : static int  _WriteByte(ArchiveHandle *AH, const int i);
      42             : static int  _ReadByte(ArchiveHandle *AH);
      43             : static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
      44             : static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
      45             : static void _CloseArchive(ArchiveHandle *AH);
      46             : static void _ReopenArchive(ArchiveHandle *AH);
      47             : static void _PrintTocData(ArchiveHandle *AH, TocEntry *te);
      48             : static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
      49             : static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
      50             : static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);
      51             : 
      52             : static void _PrintData(ArchiveHandle *AH);
      53             : static void _skipData(ArchiveHandle *AH);
      54             : static void _skipLOs(ArchiveHandle *AH);
      55             : 
      56             : static void _StartLOs(ArchiveHandle *AH, TocEntry *te);
      57             : static void _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid);
      58             : static void _EndLO(ArchiveHandle *AH, TocEntry *te, Oid oid);
      59             : static void _EndLOs(ArchiveHandle *AH, TocEntry *te);
      60             : static void _LoadLOs(ArchiveHandle *AH, bool drop);
      61             : 
      62             : static void _PrepParallelRestore(ArchiveHandle *AH);
      63             : static void _Clone(ArchiveHandle *AH);
      64             : static void _DeClone(ArchiveHandle *AH);
      65             : 
      66             : static int  _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
      67             : 
      68             : typedef struct
      69             : {
      70             :     CompressorState *cs;
      71             :     int         hasSeek;
      72             :     /* lastFilePos is used only when reading, and may be invalid if !hasSeek */
      73             :     pgoff_t     lastFilePos;    /* position after last data block we've read */
      74             : } lclContext;
      75             : 
      76             : typedef struct
      77             : {
      78             :     int         dataState;
      79             :     pgoff_t     dataPos;        /* valid only if dataState=K_OFFSET_POS_SET */
      80             : } lclTocEntry;
      81             : 
      82             : 
      83             : /*------
      84             :  * Static declarations
      85             :  *------
      86             :  */
      87             : static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
      88             : static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx);
      89             : 
      90             : static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
      91             : static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);
      92             : 
      93             : 
      94             : /*
      95             :  *  Init routine required by ALL formats. This is a global routine
      96             :  *  and should be declared in pg_backup_archiver.h
      97             :  *
      98             :  *  Its task is to create any extra archive context (using AH->formatData),
      99             :  *  and to initialize the supported function pointers.
     100             :  *
     101             :  *  It should also prepare whatever its input source is for reading/writing,
     102             :  *  and in the case of a read mode connection, it should load the Header & TOC.
     103             :  */
     104             : void
     105          86 : InitArchiveFmt_Custom(ArchiveHandle *AH)
     106             : {
     107             :     lclContext *ctx;
     108             : 
     109             :     /* Assuming static functions, this can be copied for each format. */
     110          86 :     AH->ArchiveEntryPtr = _ArchiveEntry;
     111          86 :     AH->StartDataPtr = _StartData;
     112          86 :     AH->WriteDataPtr = _WriteData;
     113          86 :     AH->EndDataPtr = _EndData;
     114          86 :     AH->WriteBytePtr = _WriteByte;
     115          86 :     AH->ReadBytePtr = _ReadByte;
     116          86 :     AH->WriteBufPtr = _WriteBuf;
     117          86 :     AH->ReadBufPtr = _ReadBuf;
     118          86 :     AH->ClosePtr = _CloseArchive;
     119          86 :     AH->ReopenPtr = _ReopenArchive;
     120          86 :     AH->PrintTocDataPtr = _PrintTocData;
     121          86 :     AH->ReadExtraTocPtr = _ReadExtraToc;
     122          86 :     AH->WriteExtraTocPtr = _WriteExtraToc;
     123          86 :     AH->PrintExtraTocPtr = _PrintExtraToc;
     124             : 
     125          86 :     AH->StartLOsPtr = _StartLOs;
     126          86 :     AH->StartLOPtr = _StartLO;
     127          86 :     AH->EndLOPtr = _EndLO;
     128          86 :     AH->EndLOsPtr = _EndLOs;
     129             : 
     130          86 :     AH->PrepParallelRestorePtr = _PrepParallelRestore;
     131          86 :     AH->ClonePtr = _Clone;
     132          86 :     AH->DeClonePtr = _DeClone;
     133             : 
     134             :     /* no parallel dump in the custom archive, only parallel restore */
     135          86 :     AH->WorkerJobDumpPtr = NULL;
     136          86 :     AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom;
     137             : 
     138             :     /* Set up a private area. */
     139          86 :     ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
     140          86 :     AH->formatData = (void *) ctx;
     141             : 
     142             :     /*
     143             :      * Now open the file
     144             :      */
     145          86 :     if (AH->mode == archModeWrite)
     146             :     {
     147          38 :         if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
     148             :         {
     149          38 :             AH->FH = fopen(AH->fSpec, PG_BINARY_W);
     150          38 :             if (!AH->FH)
     151           0 :                 pg_fatal("could not open output file \"%s\": %m", AH->fSpec);
     152             :         }
     153             :         else
     154             :         {
     155           0 :             AH->FH = stdout;
     156           0 :             if (!AH->FH)
     157           0 :                 pg_fatal("could not open output file: %m");
     158             :         }
     159             : 
     160          38 :         ctx->hasSeek = checkSeek(AH->FH);
     161             :     }
     162             :     else
     163             :     {
     164          48 :         if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
     165             :         {
     166          48 :             AH->FH = fopen(AH->fSpec, PG_BINARY_R);
     167          48 :             if (!AH->FH)
     168           0 :                 pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
     169             :         }
     170             :         else
     171             :         {
     172           0 :             AH->FH = stdin;
     173           0 :             if (!AH->FH)
     174           0 :                 pg_fatal("could not open input file: %m");
     175             :         }
     176             : 
     177          48 :         ctx->hasSeek = checkSeek(AH->FH);
     178             : 
     179          48 :         ReadHead(AH);
     180          48 :         ReadToc(AH);
     181             : 
     182             :         /*
     183             :          * Remember location of first data block (i.e., the point after TOC)
     184             :          * in case we have to search for desired data blocks.
     185             :          */
     186          48 :         ctx->lastFilePos = _getFilePos(AH, ctx);
     187             :     }
     188          86 : }
     189             : 
     190             : /*
     191             :  * Called by the Archiver when the dumper creates a new TOC entry.
     192             :  *
     193             :  * Optional.
     194             :  *
     195             :  * Set up extra format-related TOC data.
     196             : */
     197             : static void
     198        6404 : _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
     199             : {
     200             :     lclTocEntry *ctx;
     201             : 
     202        6404 :     ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
     203        6404 :     if (te->dataDumper)
     204         228 :         ctx->dataState = K_OFFSET_POS_NOT_SET;
     205             :     else
     206        6176 :         ctx->dataState = K_OFFSET_NO_DATA;
     207             : 
     208        6404 :     te->formatData = (void *) ctx;
     209        6404 : }
     210             : 
     211             : /*
     212             :  * Called by the Archiver to save any extra format-related TOC entry
     213             :  * data.
     214             :  *
     215             :  * Optional.
     216             :  *
     217             :  * Use the Archiver routines to write data - they are non-endian, and
     218             :  * maintain other important file information.
     219             :  */
     220             : static void
     221       12796 : _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
     222             : {
     223       12796 :     lclTocEntry *ctx = (lclTocEntry *) te->formatData;
     224             : 
     225       12796 :     WriteOffset(AH, ctx->dataPos, ctx->dataState);
     226       12796 : }
     227             : 
     228             : /*
     229             :  * Called by the Archiver to read any extra format-related TOC data.
     230             :  *
     231             :  * Optional.
     232             :  *
     233             :  * Needs to match the order defined in _WriteExtraToc, and should also
     234             :  * use the Archiver input routines.
     235             :  */
     236             : static void
     237        8266 : _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
     238             : {
     239        8266 :     lclTocEntry *ctx = (lclTocEntry *) te->formatData;
     240             : 
     241        8266 :     if (ctx == NULL)
     242             :     {
     243        8266 :         ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
     244        8266 :         te->formatData = (void *) ctx;
     245             :     }
     246             : 
     247        8266 :     ctx->dataState = ReadOffset(AH, &(ctx->dataPos));
     248             : 
     249             :     /*
     250             :      * Prior to V1.7 (pg7.3), we dumped the data size as an int; now we don't
     251             :      * dump it at all.
     252             :      */
     253        8266 :     if (AH->version < K_VERS_1_7)
     254           0 :         ReadInt(AH);
     255        8266 : }
     256             : 
     257             : /*
     258             :  * Called by the Archiver when restoring an archive to output a comment
     259             :  * that includes useful information about the TOC entry.
     260             :  *
     261             :  * Optional.
     262             :  */
     263             : static void
     264        2248 : _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
     265             : {
     266        2248 :     lclTocEntry *ctx = (lclTocEntry *) te->formatData;
     267             : 
     268        2248 :     if (AH->public.verbose)
     269         484 :         ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n",
     270         484 :                  (int64) ctx->dataPos);
     271        2248 : }
     272             : 
     273             : /*
     274             :  * Called by the archiver when saving TABLE DATA (not schema). This routine
     275             :  * should save whatever format-specific information is needed to read
     276             :  * the archive back.
     277             :  *
     278             :  * It is called just prior to the dumper's 'DataDumper' routine being called.
     279             :  *
     280             :  * Optional, but strongly recommended.
     281             :  *
     282             :  */
     283             : static void
     284         210 : _StartData(ArchiveHandle *AH, TocEntry *te)
     285             : {
     286         210 :     lclContext *ctx = (lclContext *) AH->formatData;
     287         210 :     lclTocEntry *tctx = (lclTocEntry *) te->formatData;
     288             : 
     289         210 :     tctx->dataPos = _getFilePos(AH, ctx);
     290         210 :     if (tctx->dataPos >= 0)
     291         210 :         tctx->dataState = K_OFFSET_POS_SET;
     292             : 
     293         210 :     _WriteByte(AH, BLK_DATA);   /* Block type */
     294         210 :     WriteInt(AH, te->dumpId);    /* For sanity check */
     295             : 
     296         210 :     ctx->cs = AllocateCompressor(AH->compression_spec,
     297             :                                  NULL,
     298             :                                  _CustomWriteFunc);
     299         210 : }
     300             : 
     301             : /*
     302             :  * Called by archiver when dumper calls WriteData. This routine is
     303             :  * called for both LO and table data; it is the responsibility of
     304             :  * the format to manage each kind of data using StartLO/StartData.
     305             :  *
     306             :  * It should only be called from within a DataDumper routine.
     307             :  *
     308             :  * Mandatory.
     309             :  */
     310             : static void
     311         418 : _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
     312             : {
     313         418 :     lclContext *ctx = (lclContext *) AH->formatData;
     314         418 :     CompressorState *cs = ctx->cs;
     315             : 
     316         418 :     if (dLen > 0)
     317             :         /* writeData() internally throws write errors */
     318         406 :         cs->writeData(AH, cs, data, dLen);
     319         418 : }
     320             : 
     321             : /*
     322             :  * Called by the archiver when a dumper's 'DataDumper' routine has
     323             :  * finished.
     324             :  *
     325             :  * Mandatory.
     326             :  */
     327             : static void
     328         210 : _EndData(ArchiveHandle *AH, TocEntry *te)
     329             : {
     330         210 :     lclContext *ctx = (lclContext *) AH->formatData;
     331             : 
     332         210 :     EndCompressor(AH, ctx->cs);
     333         210 :     ctx->cs = NULL;
     334             : 
     335             :     /* Send the end marker */
     336         210 :     WriteInt(AH, 0);
     337         210 : }
     338             : 
     339             : /*
     340             :  * Called by the archiver when starting to save BLOB DATA (not schema).
     341             :  * This routine should save whatever format-specific information is needed
     342             :  * to read the LOs back into memory.
     343             :  *
     344             :  * It is called just prior to the dumper's DataDumper routine.
     345             :  *
     346             :  * Optional, but strongly recommended.
     347             :  */
     348             : static void
     349          12 : _StartLOs(ArchiveHandle *AH, TocEntry *te)
     350             : {
     351          12 :     lclContext *ctx = (lclContext *) AH->formatData;
     352          12 :     lclTocEntry *tctx = (lclTocEntry *) te->formatData;
     353             : 
     354          12 :     tctx->dataPos = _getFilePos(AH, ctx);
     355          12 :     if (tctx->dataPos >= 0)
     356          12 :         tctx->dataState = K_OFFSET_POS_SET;
     357             : 
     358          12 :     _WriteByte(AH, BLK_BLOBS);  /* Block type */
     359          12 :     WriteInt(AH, te->dumpId);    /* For sanity check */
     360          12 : }
     361             : 
     362             : /*
     363             :  * Called by the archiver when the dumper calls StartLO.
     364             :  *
     365             :  * Mandatory.
     366             :  *
     367             :  * Must save the passed OID for retrieval at restore-time.
     368             :  */
     369             : static void
     370          12 : _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
     371             : {
     372          12 :     lclContext *ctx = (lclContext *) AH->formatData;
     373             : 
     374          12 :     if (oid == 0)
     375           0 :         pg_fatal("invalid OID for large object");
     376             : 
     377          12 :     WriteInt(AH, oid);
     378             : 
     379          12 :     ctx->cs = AllocateCompressor(AH->compression_spec,
     380             :                                  NULL,
     381             :                                  _CustomWriteFunc);
     382          12 : }
     383             : 
     384             : /*
     385             :  * Called by the archiver when the dumper calls EndLO.
     386             :  *
     387             :  * Optional.
     388             :  */
     389             : static void
     390          12 : _EndLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
     391             : {
     392          12 :     lclContext *ctx = (lclContext *) AH->formatData;
     393             : 
     394          12 :     EndCompressor(AH, ctx->cs);
     395             :     /* Send the end marker */
     396          12 :     WriteInt(AH, 0);
     397          12 : }
     398             : 
     399             : /*
     400             :  * Called by the archiver when finishing saving BLOB DATA.
     401             :  *
     402             :  * Optional.
     403             :  */
     404             : static void
     405          12 : _EndLOs(ArchiveHandle *AH, TocEntry *te)
     406             : {
     407             :     /* Write out a fake zero OID to mark end-of-LOs. */
     408          12 :     WriteInt(AH, 0);
     409          12 : }
     410             : 
     411             : /*
     412             :  * Print data for a given TOC entry
     413             :  */
     414             : static void
     415         208 : _PrintTocData(ArchiveHandle *AH, TocEntry *te)
     416             : {
     417         208 :     lclContext *ctx = (lclContext *) AH->formatData;
     418         208 :     lclTocEntry *tctx = (lclTocEntry *) te->formatData;
     419             :     int         blkType;
     420             :     int         id;
     421             : 
     422         208 :     if (tctx->dataState == K_OFFSET_NO_DATA)
     423           0 :         return;
     424             : 
     425         208 :     if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
     426             :     {
     427             :         /*
     428             :          * We cannot seek directly to the desired block.  Instead, skip over
     429             :          * block headers until we find the one we want.  Remember the
     430             :          * positions of skipped-over blocks, so that if we later decide we
     431             :          * need to read one, we'll be able to seek to it.
     432             :          *
     433             :          * When our input file is seekable, we can do the search starting from
     434             :          * the point after the last data block we scanned in previous
     435             :          * iterations of this function.
     436             :          */
     437           0 :         if (ctx->hasSeek)
     438             :         {
     439           0 :             if (fseeko(AH->FH, ctx->lastFilePos, SEEK_SET) != 0)
     440           0 :                 pg_fatal("error during file seek: %m");
     441             :         }
     442             : 
     443             :         for (;;)
     444           0 :         {
     445           0 :             pgoff_t     thisBlkPos = _getFilePos(AH, ctx);
     446             : 
     447           0 :             _readBlockHeader(AH, &blkType, &id);
     448             : 
     449           0 :             if (blkType == EOF || id == te->dumpId)
     450             :                 break;
     451             : 
     452             :             /* Remember the block position, if we got one */
     453           0 :             if (thisBlkPos >= 0)
     454             :             {
     455           0 :                 TocEntry   *otherte = getTocEntryByDumpId(AH, id);
     456             : 
     457           0 :                 if (otherte && otherte->formatData)
     458             :                 {
     459           0 :                     lclTocEntry *othertctx = (lclTocEntry *) otherte->formatData;
     460             : 
     461             :                     /*
     462             :                      * Note: on Windows, multiple threads might access/update
     463             :                      * the same lclTocEntry concurrently, but that should be
     464             :                      * safe as long as we update dataPos before dataState.
     465             :                      * Ideally, we'd use pg_write_barrier() to enforce that,
     466             :                      * but the needed infrastructure doesn't exist in frontend
     467             :                      * code.  But Windows only runs on machines with strong
     468             :                      * store ordering, so it should be okay for now.
     469             :                      */
     470           0 :                     if (othertctx->dataState == K_OFFSET_POS_NOT_SET)
     471             :                     {
     472           0 :                         othertctx->dataPos = thisBlkPos;
     473           0 :                         othertctx->dataState = K_OFFSET_POS_SET;
     474             :                     }
     475           0 :                     else if (othertctx->dataPos != thisBlkPos ||
     476           0 :                              othertctx->dataState != K_OFFSET_POS_SET)
     477             :                     {
     478             :                         /* sanity check */
     479           0 :                         pg_log_warning("data block %d has wrong seek position",
     480             :                                        id);
     481             :                     }
     482             :                 }
     483             :             }
     484             : 
     485           0 :             switch (blkType)
     486             :             {
     487           0 :                 case BLK_DATA:
     488           0 :                     _skipData(AH);
     489           0 :                     break;
     490             : 
     491           0 :                 case BLK_BLOBS:
     492           0 :                     _skipLOs(AH);
     493           0 :                     break;
     494             : 
     495           0 :                 default:        /* Always have a default */
     496           0 :                     pg_fatal("unrecognized data block type (%d) while searching archive",
     497             :                              blkType);
     498             :                     break;
     499             :             }
     500             :         }
     501             :     }
     502             :     else
     503             :     {
     504             :         /* We can just seek to the place we need to be. */
     505         208 :         if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
     506           0 :             pg_fatal("error during file seek: %m");
     507             : 
     508         208 :         _readBlockHeader(AH, &blkType, &id);
     509             :     }
     510             : 
     511             :     /*
     512             :      * If we reached EOF without finding the block we want, then either it
     513             :      * doesn't exist, or it does but we lack the ability to seek back to it.
     514             :      */
     515         208 :     if (blkType == EOF)
     516             :     {
     517           0 :         if (!ctx->hasSeek)
     518           0 :             pg_fatal("could not find block ID %d in archive -- "
     519             :                      "possibly due to out-of-order restore request, "
     520             :                      "which cannot be handled due to non-seekable input file",
     521             :                      te->dumpId);
     522             :         else
     523           0 :             pg_fatal("could not find block ID %d in archive -- "
     524             :                      "possibly corrupt archive",
     525             :                      te->dumpId);
     526             :     }
     527             : 
     528             :     /* Are we sane? */
     529         208 :     if (id != te->dumpId)
     530           0 :         pg_fatal("found unexpected block ID (%d) when reading data -- expected %d",
     531             :                  id, te->dumpId);
     532             : 
     533         208 :     switch (blkType)
     534             :     {
     535         196 :         case BLK_DATA:
     536         196 :             _PrintData(AH);
     537         196 :             break;
     538             : 
     539          12 :         case BLK_BLOBS:
     540          12 :             _LoadLOs(AH, AH->public.ropt->dropSchema);
     541          12 :             break;
     542             : 
     543           0 :         default:                /* Always have a default */
     544           0 :             pg_fatal("unrecognized data block type %d while restoring archive",
     545             :                      blkType);
     546             :             break;
     547             :     }
     548             : 
     549             :     /*
     550             :      * If our input file is seekable but lacks data offsets, update our
     551             :      * knowledge of where to start future searches from.  (Note that we did
     552             :      * not update the current TE's dataState/dataPos.  We could have, but
     553             :      * there is no point since it will not be visited again.)
     554             :      */
     555         208 :     if (ctx->hasSeek && tctx->dataState == K_OFFSET_POS_NOT_SET)
     556             :     {
     557           0 :         pgoff_t     curPos = _getFilePos(AH, ctx);
     558             : 
     559           0 :         if (curPos > ctx->lastFilePos)
     560           0 :             ctx->lastFilePos = curPos;
     561             :     }
     562             : }
     563             : 
     564             : /*
     565             :  * Print data from current file position.
     566             : */
     567             : static void
     568         208 : _PrintData(ArchiveHandle *AH)
     569             : {
     570             :     CompressorState *cs;
     571             : 
     572         208 :     cs = AllocateCompressor(AH->compression_spec,
     573             :                             _CustomReadFunc, NULL);
     574         208 :     cs->readData(AH, cs);
     575         208 :     EndCompressor(AH, cs);
     576         208 : }
     577             : 
     578             : static void
     579          12 : _LoadLOs(ArchiveHandle *AH, bool drop)
     580             : {
     581             :     Oid         oid;
     582             : 
     583          12 :     StartRestoreLOs(AH);
     584             : 
     585          12 :     oid = ReadInt(AH);
     586          24 :     while (oid != 0)
     587             :     {
     588          12 :         StartRestoreLO(AH, oid, drop);
     589          12 :         _PrintData(AH);
     590          12 :         EndRestoreLO(AH, oid);
     591          12 :         oid = ReadInt(AH);
     592             :     }
     593             : 
     594          12 :     EndRestoreLOs(AH);
     595          12 : }
     596             : 
     597             : /*
     598             :  * Skip the LOs from the current file position.
     599             :  * LOs are written sequentially as data blocks (see below).
     600             :  * Each LO is preceded by its original OID.
     601             :  * A zero OID indicates the end of the LOs.
     602             :  */
     603             : static void
     604           0 : _skipLOs(ArchiveHandle *AH)
     605             : {
     606             :     Oid         oid;
     607             : 
     608           0 :     oid = ReadInt(AH);
     609           0 :     while (oid != 0)
     610             :     {
     611           0 :         _skipData(AH);
     612           0 :         oid = ReadInt(AH);
     613             :     }
     614           0 : }
     615             : 
     616             : /*
     617             :  * Skip data from current file position.
     618             :  * Data blocks are formatted as an integer length, followed by data.
     619             :  * A zero length indicates the end of the block.
     620             : */
     621             : static void
     622           0 : _skipData(ArchiveHandle *AH)
     623             : {
     624           0 :     lclContext *ctx = (lclContext *) AH->formatData;
     625             :     size_t      blkLen;
     626           0 :     char       *buf = NULL;
     627           0 :     int         buflen = 0;
     628             : 
     629           0 :     blkLen = ReadInt(AH);
     630           0 :     while (blkLen != 0)
     631             :     {
     632           0 :         if (ctx->hasSeek)
     633             :         {
     634           0 :             if (fseeko(AH->FH, blkLen, SEEK_CUR) != 0)
     635           0 :                 pg_fatal("error during file seek: %m");
     636             :         }
     637             :         else
     638             :         {
     639           0 :             if (blkLen > buflen)
     640             :             {
     641           0 :                 free(buf);
     642           0 :                 buf = (char *) pg_malloc(blkLen);
     643           0 :                 buflen = blkLen;
     644             :             }
     645           0 :             if (fread(buf, 1, blkLen, AH->FH) != blkLen)
     646             :             {
     647           0 :                 if (feof(AH->FH))
     648           0 :                     pg_fatal("could not read from input file: end of file");
     649             :                 else
     650           0 :                     pg_fatal("could not read from input file: %m");
     651             :             }
     652             :         }
     653             : 
     654           0 :         blkLen = ReadInt(AH);
     655             :     }
     656             : 
     657           0 :     free(buf);
     658           0 : }
     659             : 
     660             : /*
     661             :  * Write a byte of data to the archive.
     662             :  *
     663             :  * Mandatory.
     664             :  *
     665             :  * Called by the archiver to do integer & byte output to the archive.
     666             :  */
     667             : static int
     668     1298362 : _WriteByte(ArchiveHandle *AH, const int i)
     669             : {
     670     1298362 :     if (fputc(i, AH->FH) == EOF)
     671           0 :         WRITE_ERROR_EXIT;
     672             : 
     673     1298362 :     return 1;
     674             : }
     675             : 
     676             : /*
     677             :  * Read a byte of data from the archive.
     678             :  *
     679             :  * Mandatory
     680             :  *
     681             :  * Called by the archiver to read bytes & integers from the archive.
     682             :  * EOF should be treated as a fatal error.
     683             :  */
     684             : static int
     685      842330 : _ReadByte(ArchiveHandle *AH)
     686             : {
     687             :     int         res;
     688             : 
     689      842330 :     res = getc(AH->FH);
     690      842330 :     if (res == EOF)
     691           0 :         READ_ERROR_EXIT(AH->FH);
     692      842330 :     return res;
     693             : }
     694             : 
     695             : /*
     696             :  * Write a buffer of data to the archive.
     697             :  *
     698             :  * Mandatory.
     699             :  *
     700             :  * Called by the archiver to write a block of bytes to the archive.
     701             :  */
     702             : static void
     703      135458 : _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
     704             : {
     705      135458 :     if (fwrite(buf, 1, len, AH->FH) != len)
     706           0 :         WRITE_ERROR_EXIT;
     707      135458 : }
     708             : 
     709             : /*
     710             :  * Read a block of bytes from the archive.
     711             :  *
     712             :  * Mandatory.
     713             :  *
     714             :  * Called by the archiver to read a block of bytes from the archive
     715             :  */
     716             : static void
     717       87064 : _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
     718             : {
     719       87064 :     if (fread(buf, 1, len, AH->FH) != len)
     720           0 :         READ_ERROR_EXIT(AH->FH);
     721       87064 : }
     722             : 
     723             : /*
     724             :  * Close the archive.
     725             :  *
     726             :  * Mandatory.
     727             :  *
     728             :  * When writing the archive, this is the routine that actually starts
     729             :  * the process of saving it to files. No data should be written prior
     730             :  * to this point, since the user could sort the TOC after creating it.
     731             :  *
     732             :  * If an archive is to be written, this routine must call:
     733             :  *      WriteHead           to save the archive header
     734             :  *      WriteToc            to save the TOC entries
     735             :  *      WriteDataChunks     to save all data & LOs.
     736             :  *
     737             :  */
     738             : static void
     739          86 : _CloseArchive(ArchiveHandle *AH)
     740             : {
     741          86 :     lclContext *ctx = (lclContext *) AH->formatData;
     742             :     pgoff_t     tpos;
     743             : 
     744          86 :     if (AH->mode == archModeWrite)
     745             :     {
     746          38 :         WriteHead(AH);
     747             :         /* Remember TOC's seek position for use below */
     748          38 :         tpos = ftello(AH->FH);
     749          38 :         if (tpos < 0 && ctx->hasSeek)
     750           0 :             pg_fatal("could not determine seek position in archive file: %m");
     751          38 :         WriteToc(AH);
     752          38 :         WriteDataChunks(AH, NULL);
     753             : 
     754             :         /*
     755             :          * If possible, re-write the TOC in order to update the data offset
     756             :          * information.  This is not essential, as pg_restore can cope in most
     757             :          * cases without it; but it can make pg_restore significantly faster
     758             :          * in some situations (especially parallel restore).
     759             :          */
     760          76 :         if (ctx->hasSeek &&
     761          38 :             fseeko(AH->FH, tpos, SEEK_SET) == 0)
     762          38 :             WriteToc(AH);
     763             :     }
     764             : 
     765          86 :     if (fclose(AH->FH) != 0)
     766           0 :         pg_fatal("could not close archive file: %m");
     767             : 
     768             :     /* Sync the output file if one is defined */
     769          86 :     if (AH->dosync && AH->mode == archModeWrite && AH->fSpec)
     770          10 :         (void) fsync_fname(AH->fSpec, false);
     771             : 
     772          86 :     AH->FH = NULL;
     773          86 : }
     774             : 
     775             : /*
     776             :  * Reopen the archive's file handle.
     777             :  *
     778             :  * We close the original file handle, except on Windows.  (The difference
     779             :  * is because on Windows, this is used within a multithreading context,
     780             :  * and we don't want a thread closing the parent file handle.)
     781             :  */
     782             : static void
     783           0 : _ReopenArchive(ArchiveHandle *AH)
     784             : {
     785           0 :     lclContext *ctx = (lclContext *) AH->formatData;
     786             :     pgoff_t     tpos;
     787             : 
     788           0 :     if (AH->mode == archModeWrite)
     789           0 :         pg_fatal("can only reopen input archives");
     790             : 
     791             :     /*
     792             :      * These two cases are user-facing errors since they represent unsupported
     793             :      * (but not invalid) use-cases.  Word the error messages appropriately.
     794             :      */
     795           0 :     if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)
     796           0 :         pg_fatal("parallel restore from standard input is not supported");
     797           0 :     if (!ctx->hasSeek)
     798           0 :         pg_fatal("parallel restore from non-seekable file is not supported");
     799             : 
     800           0 :     tpos = ftello(AH->FH);
     801           0 :     if (tpos < 0)
     802           0 :         pg_fatal("could not determine seek position in archive file: %m");
     803             : 
     804             : #ifndef WIN32
     805           0 :     if (fclose(AH->FH) != 0)
     806           0 :         pg_fatal("could not close archive file: %m");
     807             : #endif
     808             : 
     809           0 :     AH->FH = fopen(AH->fSpec, PG_BINARY_R);
     810           0 :     if (!AH->FH)
     811           0 :         pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
     812             : 
     813           0 :     if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
     814           0 :         pg_fatal("could not set seek position in archive file: %m");
     815           0 : }
     816             : 
     817             : /*
     818             :  * Prepare for parallel restore.
     819             :  *
     820             :  * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
     821             :  * TOC entries' dataLength fields with appropriate values to guide the
     822             :  * ordering of restore jobs.  The source of said data is format-dependent,
     823             :  * as is the exact meaning of the values.
     824             :  *
     825             :  * A format module might also choose to do other setup here.
     826             :  */
     827             : static void
     828           0 : _PrepParallelRestore(ArchiveHandle *AH)
     829             : {
     830           0 :     lclContext *ctx = (lclContext *) AH->formatData;
     831           0 :     TocEntry   *prev_te = NULL;
     832           0 :     lclTocEntry *prev_tctx = NULL;
     833             :     TocEntry   *te;
     834             : 
     835             :     /*
     836             :      * Knowing that the data items were dumped out in TOC order, we can
     837             :      * reconstruct the length of each item as the delta to the start offset of
     838             :      * the next data item.
     839             :      */
     840           0 :     for (te = AH->toc->next; te != AH->toc; te = te->next)
     841             :     {
     842           0 :         lclTocEntry *tctx = (lclTocEntry *) te->formatData;
     843             : 
     844             :         /*
     845             :          * Ignore entries without a known data offset; if we were unable to
     846             :          * seek to rewrite the TOC when creating the archive, this'll be all
     847             :          * of them, and we'll end up with no size estimates.
     848             :          */
     849           0 :         if (tctx->dataState != K_OFFSET_POS_SET)
     850           0 :             continue;
     851             : 
     852             :         /* Compute previous data item's length */
     853           0 :         if (prev_te)
     854             :         {
     855           0 :             if (tctx->dataPos > prev_tctx->dataPos)
     856           0 :                 prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos;
     857             :         }
     858             : 
     859           0 :         prev_te = te;
     860           0 :         prev_tctx = tctx;
     861             :     }
     862             : 
     863             :     /* If OK to seek, we can determine the length of the last item */
     864           0 :     if (prev_te && ctx->hasSeek)
     865             :     {
     866             :         pgoff_t     endpos;
     867             : 
     868           0 :         if (fseeko(AH->FH, 0, SEEK_END) != 0)
     869           0 :             pg_fatal("error during file seek: %m");
     870           0 :         endpos = ftello(AH->FH);
     871           0 :         if (endpos > prev_tctx->dataPos)
     872           0 :             prev_te->dataLength = endpos - prev_tctx->dataPos;
     873             :     }
     874           0 : }
     875             : 
     876             : /*
     877             :  * Clone format-specific fields during parallel restoration.
     878             :  */
     879             : static void
     880           0 : _Clone(ArchiveHandle *AH)
     881             : {
     882           0 :     lclContext *ctx = (lclContext *) AH->formatData;
     883             : 
     884             :     /*
     885             :      * Each thread must have private lclContext working state.
     886             :      */
     887           0 :     AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
     888           0 :     memcpy(AH->formatData, ctx, sizeof(lclContext));
     889           0 :     ctx = (lclContext *) AH->formatData;
     890             : 
     891             :     /* sanity check, shouldn't happen */
     892           0 :     if (ctx->cs != NULL)
     893           0 :         pg_fatal("compressor active");
     894             : 
     895             :     /*
     896             :      * We intentionally do not clone TOC-entry-local state: it's useful to
     897             :      * share knowledge about where the data blocks are across threads.
     898             :      * _PrintTocData has to be careful about the order of operations on that
     899             :      * state, though.
     900             :      */
     901           0 : }
     902             : 
     903             : static void
     904           0 : _DeClone(ArchiveHandle *AH)
     905             : {
     906           0 :     lclContext *ctx = (lclContext *) AH->formatData;
     907             : 
     908           0 :     free(ctx);
     909           0 : }
     910             : 
     911             : /*
     912             :  * This function is executed in the child of a parallel restore from a
     913             :  * custom-format archive and restores the actual data for one TOC entry.
     914             :  */
     915             : static int
     916           0 : _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
     917             : {
     918           0 :     return parallel_restore(AH, te);
     919             : }
     920             : 
     921             : /*--------------------------------------------------
     922             :  * END OF FORMAT CALLBACKS
     923             :  *--------------------------------------------------
     924             :  */
     925             : 
     926             : /*
     927             :  * Get the current position in the archive file.
     928             :  *
     929             :  * With a non-seekable archive file, we may not be able to obtain the
     930             :  * file position.  If so, just return -1.  It's not too important in
     931             :  * that case because we won't be able to rewrite the TOC to fill in
     932             :  * data block offsets anyway.
     933             :  */
     934             : static pgoff_t
     935         270 : _getFilePos(ArchiveHandle *AH, lclContext *ctx)
     936             : {
     937             :     pgoff_t     pos;
     938             : 
     939         270 :     pos = ftello(AH->FH);
     940         270 :     if (pos < 0)
     941             :     {
     942             :         /* Not expected if we found we can seek. */
     943           0 :         if (ctx->hasSeek)
     944           0 :             pg_fatal("could not determine seek position in archive file: %m");
     945             :     }
     946         270 :     return pos;
     947             : }
     948             : 
     949             : /*
     950             :  * Read a data block header. The format changed in V1.3, so we
     951             :  * centralize the code here for simplicity.  Returns *type = EOF
     952             :  * if at EOF.
     953             :  */
     954             : static void
     955         208 : _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
     956             : {
     957             :     int         byt;
     958             : 
     959             :     /*
     960             :      * Note: if we are at EOF with a pre-1.3 input file, we'll pg_fatal()
     961             :      * inside ReadInt rather than returning EOF.  It doesn't seem worth
     962             :      * jumping through hoops to deal with that case better, because no such
     963             :      * files are likely to exist in the wild: only some 7.1 development
     964             :      * versions of pg_dump ever generated such files.
     965             :      */
     966         208 :     if (AH->version < K_VERS_1_3)
     967           0 :         *type = BLK_DATA;
     968             :     else
     969             :     {
     970         208 :         byt = getc(AH->FH);
     971         208 :         *type = byt;
     972         208 :         if (byt == EOF)
     973             :         {
     974           0 :             *id = 0;            /* don't return an uninitialized value */
     975           0 :             return;
     976             :         }
     977             :     }
     978             : 
     979         208 :     *id = ReadInt(AH);
     980             : }
     981             : 
     982             : /*
     983             :  * Callback function for writeData. Writes one block of (compressed)
     984             :  * data to the archive.
     985             :  */
     986             : static void
     987         418 : _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
     988             : {
     989             :     /* never write 0-byte blocks (this should not happen) */
     990         418 :     if (len > 0)
     991             :     {
     992         290 :         WriteInt(AH, len);
     993         290 :         _WriteBuf(AH, buf, len);
     994             :     }
     995         418 : }
     996             : 
     997             : /*
     998             :  * Callback function for readData. To keep things simple, we
     999             :  * always read one compressed block at a time.
    1000             :  */
    1001             : static size_t
    1002         484 : _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
    1003             : {
    1004             :     size_t      blkLen;
    1005             : 
    1006             :     /* Read length */
    1007         484 :     blkLen = ReadInt(AH);
    1008         484 :     if (blkLen == 0)
    1009         208 :         return 0;
    1010             : 
    1011             :     /* If the caller's buffer is not large enough, allocate a bigger one */
    1012         276 :     if (blkLen > *buflen)
    1013             :     {
    1014           2 :         free(*buf);
    1015           2 :         *buf = (char *) pg_malloc(blkLen);
    1016           2 :         *buflen = blkLen;
    1017             :     }
    1018             : 
    1019             :     /* exits app on read errors */
    1020         276 :     _ReadBuf(AH, *buf, blkLen);
    1021             : 
    1022         276 :     return blkLen;
    1023             : }

Generated by: LCOV version 1.14