LCOV - code coverage report
Current view: top level - src/backend/storage/large_object - inv_api.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13beta1 Lines: 286 302 94.7 %
Date: 2020-05-25 06:06:29 Functions: 14 14 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * inv_api.c
       4             :  *    routines for manipulating inversion fs large objects. This file
       5             :  *    contains the user-level large object application interface routines.
       6             :  *
       7             :  *
       8             :  * Note: we access pg_largeobject.data using its C struct declaration.
       9             :  * This is safe because it immediately follows pageno which is an int4 field,
      10             :  * and therefore the data field will always be 4-byte aligned, even if it
      11             :  * is in the short 1-byte-header format.  We have to detoast it since it's
      12             :  * quite likely to be in compressed or short format.  We also need to check
      13             :  * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
      14             :  *
      15             :  * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
      16             :  * does most of the backend code.  We expect that CurrentMemoryContext will
      17             :  * be a short-lived context.  Data that must persist across function calls
      18             :  * is kept either in CacheMemoryContext (the Relation structs) or in the
      19             :  * memory context given to inv_open (for LargeObjectDesc structs).
      20             :  *
      21             :  *
      22             :  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
      23             :  * Portions Copyright (c) 1994, Regents of the University of California
      24             :  *
      25             :  *
      26             :  * IDENTIFICATION
      27             :  *    src/backend/storage/large_object/inv_api.c
      28             :  *
      29             :  *-------------------------------------------------------------------------
      30             :  */
      31             : #include "postgres.h"
      32             : 
      33             : #include <limits.h>
      34             : 
      35             : #include "access/detoast.h"
      36             : #include "access/genam.h"
      37             : #include "access/htup_details.h"
      38             : #include "access/sysattr.h"
      39             : #include "access/table.h"
      40             : #include "access/xact.h"
      41             : #include "catalog/dependency.h"
      42             : #include "catalog/indexing.h"
      43             : #include "catalog/objectaccess.h"
      44             : #include "catalog/pg_largeobject.h"
      45             : #include "catalog/pg_largeobject_metadata.h"
      46             : #include "libpq/libpq-fs.h"
      47             : #include "miscadmin.h"
      48             : #include "storage/large_object.h"
      49             : #include "utils/acl.h"
      50             : #include "utils/fmgroids.h"
      51             : #include "utils/rel.h"
      52             : #include "utils/snapmgr.h"
      53             : 
      54             : 
      55             : /*
      56             :  * GUC: backwards-compatibility flag to suppress LO permission checks
      57             :  */
      58             : bool        lo_compat_privileges;
      59             : 
      60             : /*
      61             :  * All accesses to pg_largeobject and its index make use of a single Relation
      62             :  * reference, so that we only need to open pg_largeobject once per transaction.
      63             :  * To avoid problems when the first such reference occurs inside a
      64             :  * subtransaction, we execute a slightly klugy maneuver to assign ownership of
      65             :  * the Relation reference to TopTransactionResourceOwner.
      66             :  */
      67             : static Relation lo_heap_r = NULL;
      68             : static Relation lo_index_r = NULL;
      69             : 
      70             : 
      71             : /*
      72             :  * Open pg_largeobject and its index, if not already done in current xact
      73             :  */
      74             : static void
      75        1988 : open_lo_relation(void)
      76             : {
      77             :     ResourceOwner currentOwner;
      78             : 
      79        1988 :     if (lo_heap_r && lo_index_r)
      80        1798 :         return;                 /* already open in current xact */
      81             : 
      82             :     /* Arrange for the top xact to own these relation references */
      83         190 :     currentOwner = CurrentResourceOwner;
      84         190 :     CurrentResourceOwner = TopTransactionResourceOwner;
      85             : 
      86             :     /* Use RowExclusiveLock since we might either read or write */
      87         190 :     if (lo_heap_r == NULL)
      88         190 :         lo_heap_r = table_open(LargeObjectRelationId, RowExclusiveLock);
      89         190 :     if (lo_index_r == NULL)
      90         190 :         lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
      91             : 
      92         190 :     CurrentResourceOwner = currentOwner;
      93             : }
      94             : 
      95             : /*
      96             :  * Clean up at main transaction end
      97             :  */
      98             : void
      99         284 : close_lo_relation(bool isCommit)
     100             : {
     101         284 :     if (lo_heap_r || lo_index_r)
     102             :     {
     103             :         /*
     104             :          * Only bother to close if committing; else abort cleanup will handle
     105             :          * it
     106             :          */
     107         190 :         if (isCommit)
     108             :         {
     109             :             ResourceOwner currentOwner;
     110             : 
     111         144 :             currentOwner = CurrentResourceOwner;
     112         144 :             CurrentResourceOwner = TopTransactionResourceOwner;
     113             : 
     114         144 :             if (lo_index_r)
     115         144 :                 index_close(lo_index_r, NoLock);
     116         144 :             if (lo_heap_r)
     117         144 :                 table_close(lo_heap_r, NoLock);
     118             : 
     119         144 :             CurrentResourceOwner = currentOwner;
     120             :         }
     121         190 :         lo_heap_r = NULL;
     122         190 :         lo_index_r = NULL;
     123             :     }
     124         284 : }
     125             : 
     126             : 
     127             : /*
     128             :  * Same as pg_largeobject.c's LargeObjectExists(), except snapshot to
     129             :  * read with can be specified.
     130             :  */
     131             : static bool
     132         258 : myLargeObjectExists(Oid loid, Snapshot snapshot)
     133             : {
     134             :     Relation    pg_lo_meta;
     135             :     ScanKeyData skey[1];
     136             :     SysScanDesc sd;
     137             :     HeapTuple   tuple;
     138         258 :     bool        retval = false;
     139             : 
     140         258 :     ScanKeyInit(&skey[0],
     141             :                 Anum_pg_largeobject_metadata_oid,
     142             :                 BTEqualStrategyNumber, F_OIDEQ,
     143             :                 ObjectIdGetDatum(loid));
     144             : 
     145         258 :     pg_lo_meta = table_open(LargeObjectMetadataRelationId,
     146             :                             AccessShareLock);
     147             : 
     148         258 :     sd = systable_beginscan(pg_lo_meta,
     149             :                             LargeObjectMetadataOidIndexId, true,
     150             :                             snapshot, 1, skey);
     151             : 
     152         258 :     tuple = systable_getnext(sd);
     153         258 :     if (HeapTupleIsValid(tuple))
     154         254 :         retval = true;
     155             : 
     156         258 :     systable_endscan(sd);
     157             : 
     158         258 :     table_close(pg_lo_meta, AccessShareLock);
     159             : 
     160         258 :     return retval;
     161             : }
     162             : 
     163             : 
     164             : /*
     165             :  * Extract data field from a pg_largeobject tuple, detoasting if needed
     166             :  * and verifying that the length is sane.  Returns data pointer (a bytea *),
     167             :  * data length, and an indication of whether to pfree the data pointer.
     168             :  */
     169             : static void
     170        6812 : getdatafield(Form_pg_largeobject tuple,
     171             :              bytea **pdatafield,
     172             :              int *plen,
     173             :              bool *pfreeit)
     174             : {
     175             :     bytea      *datafield;
     176             :     int         len;
     177             :     bool        freeit;
     178             : 
     179        6812 :     datafield = &(tuple->data); /* see note at top of file */
     180        6812 :     freeit = false;
     181        6812 :     if (VARATT_IS_EXTENDED(datafield))
     182             :     {
     183             :         datafield = (bytea *)
     184        6704 :             detoast_attr((struct varlena *) datafield);
     185        6704 :         freeit = true;
     186             :     }
     187        6812 :     len = VARSIZE(datafield) - VARHDRSZ;
     188        6812 :     if (len < 0 || len > LOBLKSIZE)
     189           0 :         ereport(ERROR,
     190             :                 (errcode(ERRCODE_DATA_CORRUPTED),
     191             :                  errmsg("pg_largeobject entry for OID %u, page %d has invalid data field size %d",
     192             :                         tuple->loid, tuple->pageno, len)));
     193        6812 :     *pdatafield = datafield;
     194        6812 :     *plen = len;
     195        6812 :     *pfreeit = freeit;
     196        6812 : }
     197             : 
     198             : 
     199             : /*
     200             :  *  inv_create -- create a new large object
     201             :  *
     202             :  *  Arguments:
     203             :  *    lobjId - OID to use for new large object, or InvalidOid to pick one
     204             :  *
     205             :  *  Returns:
     206             :  *    OID of new object
     207             :  *
     208             :  * If lobjId is not InvalidOid, then an error occurs if the OID is already
     209             :  * in use.
     210             :  */
     211             : Oid
     212          74 : inv_create(Oid lobjId)
     213             : {
     214             :     Oid         lobjId_new;
     215             : 
     216             :     /*
     217             :      * Create a new largeobject with empty data pages
     218             :      */
     219          74 :     lobjId_new = LargeObjectCreate(lobjId);
     220             : 
     221             :     /*
     222             :      * dependency on the owner of largeobject
     223             :      *
     224             :      * The reason why we use LargeObjectRelationId instead of
     225             :      * LargeObjectMetadataRelationId here is to provide backward compatibility
     226             :      * to applications that rely on knowledge of the internal layout of the
     227             :      * system catalogs.  The OID of pg_largeobject_metadata and the loid of
     228             :      * pg_largeobject are the same value, so there is no actual difference here.
     229             :      */
     230          74 :     recordDependencyOnOwner(LargeObjectRelationId,
     231             :                             lobjId_new, GetUserId());
     232             : 
     233             :     /* Post creation hook for new large object */
     234          74 :     InvokeObjectPostCreateHook(LargeObjectRelationId, lobjId_new, 0);
     235             : 
     236             :     /*
     237             :      * Advance command counter to make new tuple visible to later operations.
     238             :      */
     239          74 :     CommandCounterIncrement();
     240             : 
     241          74 :     return lobjId_new;
     242             : }
     243             : 
     244             : /*
     245             :  *  inv_open -- access an existing large object.
     246             :  *
     247             :  *      Returns:
     248             :  *        Large object descriptor, appropriately filled in.  The descriptor
     249             :  *        and subsidiary data are allocated in the specified memory context,
     250             :  *        which must be suitably long-lived for the caller's purposes.
     251             :  */
     252             : LargeObjectDesc *
     253         258 : inv_open(Oid lobjId, int flags, MemoryContext mcxt)
     254             : {
     255             :     LargeObjectDesc *retval;
     256         258 :     Snapshot    snapshot = NULL;
     257         258 :     int         descflags = 0;
     258             : 
     259             :     /*
     260             :      * Historically, no difference is made between (INV_WRITE) and (INV_WRITE
     261             :      * | INV_READ), the caller being allowed to read the large object
     262             :      * descriptor in either case.
     263             :      */
     264         258 :     if (flags & INV_WRITE)
     265         104 :         descflags |= IFS_WRLOCK | IFS_RDLOCK;
     266         258 :     if (flags & INV_READ)
     267         174 :         descflags |= IFS_RDLOCK;
     268             : 
     269         258 :     if (descflags == 0)
     270           0 :         ereport(ERROR,
     271             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     272             :                  errmsg("invalid flags for opening a large object: %d",
     273             :                         flags)));
     274             : 
     275             :     /* Get snapshot.  If write is requested, use an instantaneous snapshot. */
     276         258 :     if (descflags & IFS_WRLOCK)
     277         104 :         snapshot = NULL;
     278             :     else
     279         154 :         snapshot = GetActiveSnapshot();
     280             : 
     281             :     /* Can't use LargeObjectExists here because we need to specify snapshot */
     282         258 :     if (!myLargeObjectExists(lobjId, snapshot))
     283           4 :         ereport(ERROR,
     284             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     285             :                  errmsg("large object %u does not exist", lobjId)));
     286             : 
     287             :     /* Apply permission checks, again specifying snapshot */
     288         254 :     if ((descflags & IFS_RDLOCK) != 0)
     289             :     {
     290         496 :         if (!lo_compat_privileges &&
     291         242 :             pg_largeobject_aclcheck_snapshot(lobjId,
     292             :                                              GetUserId(),
     293             :                                              ACL_SELECT,
     294             :                                              snapshot) != ACLCHECK_OK)
     295          28 :             ereport(ERROR,
     296             :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     297             :                      errmsg("permission denied for large object %u",
     298             :                             lobjId)));
     299             :     }
     300         226 :     if ((descflags & IFS_WRLOCK) != 0)
     301             :     {
     302         168 :         if (!lo_compat_privileges &&
     303          80 :             pg_largeobject_aclcheck_snapshot(lobjId,
     304             :                                              GetUserId(),
     305             :                                              ACL_UPDATE,
     306             :                                              snapshot) != ACLCHECK_OK)
     307           8 :             ereport(ERROR,
     308             :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     309             :                      errmsg("permission denied for large object %u",
     310             :                             lobjId)));
     311             :     }
     312             : 
     313             :     /* OK to create a descriptor */
     314         218 :     retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
     315             :                                                     sizeof(LargeObjectDesc));
     316         218 :     retval->id = lobjId;
     317         218 :     retval->subid = GetCurrentSubTransactionId();
     318         218 :     retval->offset = 0;
     319         218 :     retval->flags = descflags;
     320             : 
     321             :     /*
     322             :      * We must register the snapshot in TopTransaction's resowner, because it
     323             :      * must stay alive until the LO is closed rather than until the current
     324             :      * portal shuts down.  Do this last to avoid uselessly leaking the
     325             :      * snapshot if an error is thrown above.
     326             :      */
     327         218 :     if (snapshot)
     328         138 :         snapshot = RegisterSnapshotOnOwner(snapshot,
     329             :                                            TopTransactionResourceOwner);
     330         218 :     retval->snapshot = snapshot;
     331             : 
     332         218 :     return retval;
     333             : }
     334             : 
     335             : /*
     336             :  * Closes a large object descriptor previously made by inv_open(), and
     337             :  * releases the long-term memory used by it.
     338             :  */
     339             : void
     340         206 : inv_close(LargeObjectDesc *obj_desc)
     341             : {
     342             :     Assert(PointerIsValid(obj_desc));
     343             : 
     344         206 :     UnregisterSnapshotFromOwner(obj_desc->snapshot,
     345             :                                 TopTransactionResourceOwner);
     346             : 
     347         206 :     pfree(obj_desc);
     348         206 : }
     349             : 
     350             : /*
     351             :  * Destroys an existing large object (not to be confused with a descriptor!)
     352             :  *
     353             :  * Note we expect caller to have done any required permissions check.
     354             :  */
     355             : int
     356          52 : inv_drop(Oid lobjId)
     357             : {
     358             :     ObjectAddress object;
     359             : 
     360             :     /*
     361             :      * Delete any comments and dependencies on the large object
     362             :      */
     363          52 :     object.classId = LargeObjectRelationId;
     364          52 :     object.objectId = lobjId;
     365          52 :     object.objectSubId = 0;
     366          52 :     performDeletion(&object, DROP_CASCADE, 0);
     367             : 
     368             :     /*
     369             :      * Advance command counter so that tuple removal will be seen by later
     370             :      * large-object operations in this transaction.
     371             :      */
     372          52 :     CommandCounterIncrement();
     373             : 
     374             :     /* For historical reasons, we always return 1 on success. */
     375          52 :     return 1;
     376             : }
     377             : 
     378             : /*
     379             :  * Determine size of a large object
     380             :  *
     381             :  * NOTE: LOs can contain gaps, just like Unix files.  We actually return
     382             :  * the offset of the last byte + 1.
     383             :  */
     384             : static uint64
     385          72 : inv_getsize(LargeObjectDesc *obj_desc)
     386             : {
     387          72 :     uint64      lastbyte = 0;
     388             :     ScanKeyData skey[1];
     389             :     SysScanDesc sd;
     390             :     HeapTuple   tuple;
     391             : 
     392             :     Assert(PointerIsValid(obj_desc));
     393             : 
     394          72 :     open_lo_relation();
     395             : 
     396          72 :     ScanKeyInit(&skey[0],
     397             :                 Anum_pg_largeobject_loid,
     398             :                 BTEqualStrategyNumber, F_OIDEQ,
     399          72 :                 ObjectIdGetDatum(obj_desc->id));
     400             : 
     401          72 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     402             :                                     obj_desc->snapshot, 1, skey);
     403             : 
     404             :     /*
     405             :      * Because the pg_largeobject index is on both loid and pageno, but we
     406             :      * constrain only loid, a backwards scan should visit all pages of the
     407             :      * large object in reverse pageno order.  So, it's sufficient to examine
     408             :      * the first valid tuple (== last valid page).
     409             :      */
     410          72 :     tuple = systable_getnext_ordered(sd, BackwardScanDirection);
     411          72 :     if (HeapTupleIsValid(tuple))
     412             :     {
     413             :         Form_pg_largeobject data;
     414             :         bytea      *datafield;
     415             :         int         len;
     416             :         bool        pfreeit;
     417             : 
     418          64 :         if (HeapTupleHasNulls(tuple))   /* paranoia */
     419           0 :             elog(ERROR, "null field found in pg_largeobject");
     420          64 :         data = (Form_pg_largeobject) GETSTRUCT(tuple);
     421          64 :         getdatafield(data, &datafield, &len, &pfreeit);
     422          64 :         lastbyte = (uint64) data->pageno * LOBLKSIZE + len;
     423          64 :         if (pfreeit)
     424          12 :             pfree(datafield);
     425             :     }
     426             : 
     427          72 :     systable_endscan_ordered(sd);
     428             : 
     429          72 :     return lastbyte;
     430             : }
     431             : 
     432             : int64
     433         152 : inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
     434             : {
     435             :     int64       newoffset;
     436             : 
     437             :     Assert(PointerIsValid(obj_desc));
     438             : 
     439             :     /*
     440             :      * We allow seek/tell if you have either read or write permission, so no
     441             :      * need for a permission check here.
     442             :      */
     443             : 
     444             :     /*
     445             :      * Note: overflow in the additions is possible, but since we will reject
     446             :      * negative results, we don't need any extra test for that.
     447             :      */
     448         152 :     switch (whence)
     449             :     {
     450          68 :         case SEEK_SET:
     451          68 :             newoffset = offset;
     452          68 :             break;
     453          12 :         case SEEK_CUR:
     454          12 :             newoffset = obj_desc->offset + offset;
     455          12 :             break;
     456          72 :         case SEEK_END:
     457          72 :             newoffset = inv_getsize(obj_desc) + offset;
     458          72 :             break;
     459           0 :         default:
     460           0 :             ereport(ERROR,
     461             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     462             :                      errmsg("invalid whence setting: %d", whence)));
     463             :             newoffset = 0;      /* keep compiler quiet */
     464             :             break;
     465             :     }
     466             : 
     467             :     /*
     468             :      * use errmsg_internal here because we don't want to expose INT64_FORMAT
     469             :      * in translatable strings; doing better is not worth the trouble
     470             :      */
     471         152 :     if (newoffset < 0 || newoffset > MAX_LARGE_OBJECT_SIZE)
     472           0 :         ereport(ERROR,
     473             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     474             :                  errmsg_internal("invalid large object seek target: " INT64_FORMAT,
     475             :                                  newoffset)));
     476             : 
     477         152 :     obj_desc->offset = newoffset;
     478         152 :     return newoffset;
     479             : }
     480             : 
     481             : int64
     482          32 : inv_tell(LargeObjectDesc *obj_desc)
     483             : {
     484             :     Assert(PointerIsValid(obj_desc));
     485             : 
     486             :     /*
     487             :      * We allow seek/tell if you have either read or write permission, so no
     488             :      * need for a permission check here.
     489             :      */
     490             : 
     491          32 :     return obj_desc->offset;
     492             : }
     493             : 
     494             : int
     495         860 : inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
     496             : {
     497         860 :     int         nread = 0;
     498             :     int64       n;
     499             :     int64       off;
     500             :     int         len;
     501         860 :     int32       pageno = (int32) (obj_desc->offset / LOBLKSIZE);
     502             :     uint64      pageoff;
     503             :     ScanKeyData skey[2];
     504             :     SysScanDesc sd;
     505             :     HeapTuple   tuple;
     506             : 
     507             :     Assert(PointerIsValid(obj_desc));
     508             :     Assert(buf != NULL);
     509             : 
     510         860 :     if ((obj_desc->flags & IFS_RDLOCK) == 0)
     511           0 :         ereport(ERROR,
     512             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     513             :                  errmsg("permission denied for large object %u",
     514             :                         obj_desc->id)));
     515             : 
     516         860 :     if (nbytes <= 0)
     517           8 :         return 0;
     518             : 
     519         852 :     open_lo_relation();
     520             : 
     521         852 :     ScanKeyInit(&skey[0],
     522             :                 Anum_pg_largeobject_loid,
     523             :                 BTEqualStrategyNumber, F_OIDEQ,
     524         852 :                 ObjectIdGetDatum(obj_desc->id));
     525             : 
     526         852 :     ScanKeyInit(&skey[1],
     527             :                 Anum_pg_largeobject_pageno,
     528             :                 BTGreaterEqualStrategyNumber, F_INT4GE,
     529             :                 Int32GetDatum(pageno));
     530             : 
     531         852 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     532             :                                     obj_desc->snapshot, 2, skey);
     533             : 
     534        6872 :     while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     535             :     {
     536             :         Form_pg_largeobject data;
     537             :         bytea      *datafield;
     538             :         bool        pfreeit;
     539             : 
     540        6728 :         if (HeapTupleHasNulls(tuple))   /* paranoia */
     541           0 :             elog(ERROR, "null field found in pg_largeobject");
     542        6728 :         data = (Form_pg_largeobject) GETSTRUCT(tuple);
     543             : 
     544             :         /*
     545             :          * We expect the indexscan will deliver pages in order.  However,
     546             :          * there may be missing pages if the LO contains unwritten "holes". We
     547             :          * want missing sections to read out as zeroes.
     548             :          */
     549        6728 :         pageoff = ((uint64) data->pageno) * LOBLKSIZE;
     550        6728 :         if (pageoff > obj_desc->offset)
     551             :         {
     552           8 :             n = pageoff - obj_desc->offset;
     553           8 :             n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
     554           8 :             MemSet(buf + nread, 0, n);
     555           8 :             nread += n;
     556           8 :             obj_desc->offset += n;
     557             :         }
     558             : 
     559        6728 :         if (nread < nbytes)
     560             :         {
     561             :             Assert(obj_desc->offset >= pageoff);
     562        6724 :             off = (int) (obj_desc->offset - pageoff);
     563             :             Assert(off >= 0 && off < LOBLKSIZE);
     564             : 
     565        6724 :             getdatafield(data, &datafield, &len, &pfreeit);
     566        6724 :             if (len > off)
     567             :             {
     568        6666 :                 n = len - off;
     569        6666 :                 n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
     570        6666 :                 memcpy(buf + nread, VARDATA(datafield) + off, n);
     571        6666 :                 nread += n;
     572        6666 :                 obj_desc->offset += n;
     573             :             }
     574        6724 :             if (pfreeit)
     575        6676 :                 pfree(datafield);
     576             :         }
     577             : 
     578        6728 :         if (nread >= nbytes)
     579         708 :             break;
     580             :     }
     581             : 
     582         852 :     systable_endscan_ordered(sd);
     583             : 
     584         852 :     return nread;
     585             : }
     586             : /*
                     :  * inv_write
                     :  *    Write nbytes bytes from buf into the large object described by
                     :  *    obj_desc, starting at byte position obj_desc->offset.
                     :  *
                     :  * Data is handled one page at a time, a page holding at most LOBLKSIZE
                     :  * data bytes in one pg_largeobject tuple.  For each page touched:
                     :  *  - if the scan returns a tuple with that page number, its data is
                     :  *    loaded, any gap between the old end of data and the write position
                     :  *    is zero-filled, the new bytes are copied in, and the tuple is
                     :  *    updated (old bytes beyond the written range are kept);
                     :  *  - otherwise a new tuple is inserted, zero-filled ahead of the write
                     :  *    position.
                     :  *
                     :  * obj_desc->offset is advanced by the number of bytes written, and
                     :  * CommandCounterIncrement() is called before returning.
                     :  *
                     :  * Returns the number of bytes written, or 0 without doing anything
                     :  * further if nbytes <= 0.  Reports ERROR if obj_desc->flags lacks
                     :  * IFS_WRLOCK (checked first), if nbytes + obj_desc->offset exceeds
                     :  * MAX_LARGE_OBJECT_SIZE, or if a fetched tuple contains a null field.
                     :  */
     587             : int
     588        1036 : inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
     589             : {
     590        1036 :     int         nwritten = 0;
     591             :     int         n;
     592             :     int         off;
     593             :     int         len;
     594        1036 :     int32       pageno = (int32) (obj_desc->offset / LOBLKSIZE);
     595             :     ScanKeyData skey[2];
     596             :     SysScanDesc sd;
     597             :     HeapTuple   oldtuple;
     598             :     Form_pg_largeobject olddata;
     599             :     bool        neednextpage;
     600             :     bytea      *datafield;
     601             :     bool        pfreeit;
     602             :     union
     603             :     {
     604             :         bytea       hdr;
     605             :         /* this is to make the union big enough for a LO data chunk: */
     606             :         char        data[LOBLKSIZE + VARHDRSZ];
     607             :         /* ensure union is aligned well enough: */
     608             :         int32       align_it;
     609             :     }           workbuf;
     610        1036 :     char       *workb = VARDATA(&workbuf.hdr);
     611             :     HeapTuple   newtup;
     612             :     Datum       values[Natts_pg_largeobject];
     613             :     bool        nulls[Natts_pg_largeobject];
     614             :     bool        replace[Natts_pg_largeobject];
     615             :     CatalogIndexState indstate;
     616             : 
     617             :     Assert(PointerIsValid(obj_desc));
     618             :     Assert(buf != NULL);
     619             : 
     620             :     /* enforce writability because snapshot is probably wrong otherwise */
     621        1036 :     if ((obj_desc->flags & IFS_WRLOCK) == 0)
     622           0 :         ereport(ERROR,
     623             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     624             :                  errmsg("permission denied for large object %u",
     625             :                         obj_desc->id)));
     626             : 
     627        1036 :     if (nbytes <= 0)
     628           0 :         return 0;
     629             : 
     630             :     /* this addition can't overflow because nbytes is only int32 */
     631        1036 :     if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
     632           0 :         ereport(ERROR,
     633             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     634             :                  errmsg("invalid large object write request size: %d",
     635             :                         nbytes)));
     636             : 
     637        1036 :     open_lo_relation();
     638             : 
     639        1036 :     indstate = CatalogOpenIndexes(lo_heap_r);
     640             : 
     641        1036 :     ScanKeyInit(&skey[0],
     642             :                 Anum_pg_largeobject_loid,
     643             :                 BTEqualStrategyNumber, F_OIDEQ,
     644        1036 :                 ObjectIdGetDatum(obj_desc->id));
     645             : 
     646        1036 :     ScanKeyInit(&skey[1],
     647             :                 Anum_pg_largeobject_pageno,
     648             :                 BTGreaterEqualStrategyNumber, F_INT4GE, /* pageno >= first page to write */
     649             :                 Int32GetDatum(pageno));
     650             : 
     651        1036 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     652             :                                     obj_desc->snapshot, 2, skey);
     653             : 
     654        1036 :     oldtuple = NULL;
     655        1036 :     olddata = NULL;
     656        1036 :     neednextpage = true;
     657             : 
     658        6336 :     while (nwritten < nbytes)
     659             :     {
     660             :         /*
     661             :          * If possible, get next pre-existing page of the LO.  We expect the
     662             :          * indexscan will deliver these in order --- but there may be holes.
                     :          * A fetched page that lies beyond the current pageno is kept in
                     :          * olddata until pageno catches up with it.
     663             :          */
     664        5300 :         if (neednextpage)
     665             :         {
     666        1040 :             if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     667             :             {
     668          16 :                 if (HeapTupleHasNulls(oldtuple))    /* paranoia */
     669           0 :                     elog(ERROR, "null field found in pg_largeobject");
     670          16 :                 olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
     671             :                 Assert(olddata->pageno >= pageno);
     672             :             }
     673        1040 :             neednextpage = false;
     674             :         }
     675             : 
     676             :         /*
     677             :          * If we have a pre-existing page, see if it is the page we want to
     678             :          * write, or a later one.
     679             :          */
     680        5300 :         if (olddata != NULL && olddata->pageno == pageno)
     681             :         {
     682             :             /*
     683             :              * Update an existing page with fresh data.
     684             :              *
     685             :              * First, load old data into workbuf
     686             :              */
     687          16 :             getdatafield(olddata, &datafield, &len, &pfreeit);
     688          16 :             memcpy(workb, VARDATA(datafield), len);
     689          16 :             if (pfreeit)
     690          12 :                 pfree(datafield);
     691             : 
     692             :             /*
     693             :              * Fill any hole
                     :              * (zero the gap between the end of the old data and the
                     :              * position within the page where this write starts)
     694             :              */
     695          16 :             off = (int) (obj_desc->offset % LOBLKSIZE);
     696          16 :             if (off > len)
     697           0 :                 MemSet(workb + len, 0, off - len);
     698             : 
     699             :             /*
     700             :              * Insert appropriate portion of new data
                     :              * (up to the end of this page, or fewer bytes if the
                     :              * request ends sooner)
     701             :              */
     702          16 :             n = LOBLKSIZE - off;
     703          16 :             n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
     704          16 :             memcpy(workb + off, buf + nwritten, n);
     705          16 :             nwritten += n;
     706          16 :             obj_desc->offset += n;
     707          16 :             off += n;
     708             :             /* compute valid length of new page: the larger of old length and end of new data */
     709          16 :             len = (len >= off) ? len : off;
     710          16 :             SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
     711             : 
     712             :             /*
     713             :              * Form and insert updated tuple
                     :              * (only the data column is marked for replacement)
     714             :              */
     715          16 :             memset(values, 0, sizeof(values));
     716          16 :             memset(nulls, false, sizeof(nulls));
     717          16 :             memset(replace, false, sizeof(replace));
     718          16 :             values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     719          16 :             replace[Anum_pg_largeobject_data - 1] = true;
     720          16 :             newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
     721             :                                        values, nulls, replace);
     722          16 :             CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
     723             :                                        indstate);
     724          16 :             heap_freetuple(newtup);
     725             : 
     726             :             /*
     727             :              * We're done with this old page.
     728             :              */
     729          16 :             oldtuple = NULL;
     730          16 :             olddata = NULL;
     731          16 :             neednextpage = true;
     732             :         }
     733             :         else
     734             :         {
     735             :             /*
     736             :              * Write a brand new page.
     737             :              *
     738             :              * First, fill any hole
                     :              * (zero everything ahead of the write position in the page)
     739             :              */
     740        5284 :             off = (int) (obj_desc->offset % LOBLKSIZE);
     741        5284 :             if (off > 0)
     742           4 :                 MemSet(workb, 0, off);
     743             : 
     744             :             /*
     745             :              * Insert appropriate portion of new data
     746             :              */
     747        5284 :             n = LOBLKSIZE - off;
     748        5284 :             n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
     749        5284 :             memcpy(workb + off, buf + nwritten, n);
     750        5284 :             nwritten += n;
     751        5284 :             obj_desc->offset += n;
     752             :             /* compute valid length of new page */
     753        5284 :             len = off + n;
     754        5284 :             SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
     755             : 
     756             :             /*
     757             :              * Form and insert new tuple
     758             :              */
     759        5284 :             memset(values, 0, sizeof(values));
     760        5284 :             memset(nulls, false, sizeof(nulls));
     761        5284 :             values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
     762        5284 :             values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
     763        5284 :             values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     764        5284 :             newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
     765        5284 :             CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
     766        5284 :             heap_freetuple(newtup);
     767             :         }
     768        5300 :         pageno++;
     769             :     }
     770             : 
     771        1036 :     systable_endscan_ordered(sd);
     772             : 
     773        1036 :     CatalogCloseIndexes(indstate);
     774             : 
     775             :     /*
     776             :      * Advance command counter so that my tuple updates will be seen by later
     777             :      * large-object operations in this transaction.
     778             :      */
     779        1036 :     CommandCounterIncrement();
     780             : 
     781        1036 :     return nwritten;
     782             : }
     783             : /*
                     :  * inv_truncate
                     :  *    Make the data of the large object described by obj_desc end at byte
                     :  *    position len.
                     :  *
                     :  * The page containing the truncation point (pageno = len / LOBLKSIZE) ends
                     :  * up holding exactly len % LOBLKSIZE data bytes:
                     :  *  - if the scan returns a tuple with that page number, its data is cut
                     :  *    to that length, or zero-extended to it if it was shorter, and the
                     :  *    tuple is updated;
                     :  *  - otherwise a new tuple with that many zero bytes is inserted, after
                     :  *    first deleting the later page the scan returned, if any.
                     :  * Every further page the scan returns is then deleted.
                     :  *
                     :  * This function does not assign obj_desc->offset.
                     :  * CommandCounterIncrement() is called before returning.
                     :  *
                     :  * Reports ERROR if obj_desc->flags lacks IFS_WRLOCK, if len is negative or
                     :  * exceeds MAX_LARGE_OBJECT_SIZE, or if the first tuple fetched contains a
                     :  * null field.
                     :  */
     784             : void
     785          28 : inv_truncate(LargeObjectDesc *obj_desc, int64 len)
     786             : {
     787          28 :     int32       pageno = (int32) (len / LOBLKSIZE);
     788             :     int32       off;
     789             :     ScanKeyData skey[2];
     790             :     SysScanDesc sd;
     791             :     HeapTuple   oldtuple;
     792             :     Form_pg_largeobject olddata;
     793             :     union
     794             :     {
     795             :         bytea       hdr;
     796             :         /* this is to make the union big enough for a LO data chunk: */
     797             :         char        data[LOBLKSIZE + VARHDRSZ];
     798             :         /* ensure union is aligned well enough: */
     799             :         int32       align_it;
     800             :     }           workbuf;
     801          28 :     char       *workb = VARDATA(&workbuf.hdr);
     802             :     HeapTuple   newtup;
     803             :     Datum       values[Natts_pg_largeobject];
     804             :     bool        nulls[Natts_pg_largeobject];
     805             :     bool        replace[Natts_pg_largeobject];
     806             :     CatalogIndexState indstate;
     807             : 
     808             :     Assert(PointerIsValid(obj_desc));
     809             : 
     810             :     /* enforce writability because snapshot is probably wrong otherwise */
     811          28 :     if ((obj_desc->flags & IFS_WRLOCK) == 0)
     812           0 :         ereport(ERROR,
     813             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     814             :                  errmsg("permission denied for large object %u",
     815             :                         obj_desc->id)));
     816             : 
     817             :     /*
     818             :      * use errmsg_internal here because we don't want to expose INT64_FORMAT
     819             :      * in translatable strings; doing better is not worth the trouble
     820             :      */
     821          28 :     if (len < 0 || len > MAX_LARGE_OBJECT_SIZE)
     822           0 :         ereport(ERROR,
     823             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     824             :                  errmsg_internal("invalid large object truncation target: " INT64_FORMAT,
     825             :                                  len)));
     826             : 
     827          28 :     open_lo_relation();
     828             : 
     829          28 :     indstate = CatalogOpenIndexes(lo_heap_r);
     830             : 
     831             :     /*
     832             :      * Set up to find all pages with desired loid and pageno >= target
     833             :      */
     834          28 :     ScanKeyInit(&skey[0],
     835             :                 Anum_pg_largeobject_loid,
     836             :                 BTEqualStrategyNumber, F_OIDEQ,
     837          28 :                 ObjectIdGetDatum(obj_desc->id));
     838             : 
     839          28 :     ScanKeyInit(&skey[1],
     840             :                 Anum_pg_largeobject_pageno,
     841             :                 BTGreaterEqualStrategyNumber, F_INT4GE,
     842             :                 Int32GetDatum(pageno));
     843             : 
     844          28 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     845             :                                     obj_desc->snapshot, 2, skey);
     846             : 
     847             :     /*
     848             :      * If possible, get the page the truncation point is in. The truncation
     849             :      * point may be beyond the end of the LO or in a hole.
                     :      * olddata stays NULL when the scan finds no page at or after pageno.
     850             :      */
     851          28 :     olddata = NULL;
     852          28 :     if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     853             :     {
     854          16 :         if (HeapTupleHasNulls(oldtuple))    /* paranoia */
     855           0 :             elog(ERROR, "null field found in pg_largeobject");
     856          16 :         olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
     857             :         Assert(olddata->pageno >= pageno);
     858             :     }
     859             : 
     860             :     /*
     861             :      * If we found the page of the truncation point we need to truncate the
     862             :      * data in it.  Otherwise if we're in a hole, we need to create a page to
     863             :      * mark the end of data.
     864             :      */
     865          28 :     if (olddata != NULL && olddata->pageno == pageno)
     866           8 :     {
     867             :         /* First, load old data into workbuf */
     868             :         bytea      *datafield;
     869             :         int         pagelen;
     870             :         bool        pfreeit;
     871             : 
     872           8 :         getdatafield(olddata, &datafield, &pagelen, &pfreeit);
     873           8 :         memcpy(workb, VARDATA(datafield), pagelen);
     874           8 :         if (pfreeit)
     875           4 :             pfree(datafield);
     876             : 
     877             :         /*
     878             :          * Fill any hole
                     :          * (if the page's data ends before the truncation point, extend it
                     :          * with zeros up to that point)
     879             :          */
     880           8 :         off = len % LOBLKSIZE;
     881           8 :         if (off > pagelen)
     882           4 :             MemSet(workb + pagelen, 0, off - pagelen);
     883             : 
     884             :         /* compute length of new page: exactly off bytes, so old data past off is dropped */
     885           8 :         SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
     886             : 
     887             :         /*
     888             :          * Form and insert updated tuple
                     :          * (only the data column is marked for replacement)
     889             :          */
     890           8 :         memset(values, 0, sizeof(values));
     891           8 :         memset(nulls, false, sizeof(nulls));
     892           8 :         memset(replace, false, sizeof(replace));
     893           8 :         values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     894           8 :         replace[Anum_pg_largeobject_data - 1] = true;
     895           8 :         newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
     896             :                                    values, nulls, replace);
     897           8 :         CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
     898             :                                    indstate);
     899           8 :         heap_freetuple(newtup);
     900             :     }
     901             :     else
     902             :     {
     903             :         /*
     904             :          * If the first page we found was after the truncation point, we're in
     905             :          * a hole that we'll fill, but we need to delete the later page
     906             :          * because the loop below won't visit it again.
     907             :          */
     908          20 :         if (olddata != NULL)
     909             :         {
     910             :             Assert(olddata->pageno > pageno);
     911           8 :             CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
     912             :         }
     913             : 
     914             :         /*
     915             :          * Write a brand new page.
     916             :          *
     917             :          * Fill the hole up to the truncation point
                     :          * (when off is 0 nothing is zeroed and the page inserted below
                     :          * carries zero-length data)
     918             :          */
     919          20 :         off = len % LOBLKSIZE;
     920          20 :         if (off > 0)
     921          20 :             MemSet(workb, 0, off);
     922             : 
     923             :         /* compute length of new page */
     924          20 :         SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
     925             : 
     926             :         /*
     927             :          * Form and insert new tuple
     928             :          */
     929          20 :         memset(values, 0, sizeof(values));
     930          20 :         memset(nulls, false, sizeof(nulls));
     931          20 :         values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
     932          20 :         values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
     933          20 :         values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     934          20 :         newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
     935          20 :         CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
     936          20 :         heap_freetuple(newtup);
     937             :     }
     938             : 
     939             :     /*
     940             :      * Delete any pages after the truncation point.  If the initial search
     941             :      * didn't find a page, then of course there's nothing more to do.
     942             :      */
     943          28 :     if (olddata != NULL)
     944             :     {
     945          20 :         while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     946             :         {
     947           4 :             CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
     948             :         }
     949             :     }
     950             : 
     951          28 :     systable_endscan_ordered(sd);
     952             : 
     953          28 :     CatalogCloseIndexes(indstate);
     954             : 
     955             :     /*
     956             :      * Advance command counter so that tuple updates will be seen by later
     957             :      * large-object operations in this transaction.
     958             :      */
     959          28 :     CommandCounterIncrement();
     960          28 : }

Generated by: LCOV version 1.13