LCOV - code coverage report
Current view: top level - src/backend/storage/large_object - inv_api.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13devel Lines: 283 298 95.0 %
Date: 2019-11-21 15:06:52 Functions: 14 14 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * inv_api.c
       4             :  *    routines for manipulating inversion fs large objects. This file
       5             :  *    contains the user-level large object application interface routines.
       6             :  *
       7             :  *
       8             :  * Note: we access pg_largeobject.data using its C struct declaration.
       9             :  * This is safe because it immediately follows pageno which is an int4 field,
      10             :  * and therefore the data field will always be 4-byte aligned, even if it
      11             :  * is in the short 1-byte-header format.  We have to detoast it since it's
      12             :  * quite likely to be in compressed or short format.  We also need to check
      13             :  * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
      14             :  *
      15             :  * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
      16             :  * does most of the backend code.  We expect that CurrentMemoryContext will
      17             :  * be a short-lived context.  Data that must persist across function calls
      18             :  * is kept either in CacheMemoryContext (the Relation structs) or in the
      19             :  * memory context given to inv_open (for LargeObjectDesc structs).
      20             :  *
      21             :  *
      22             :  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
      23             :  * Portions Copyright (c) 1994, Regents of the University of California
      24             :  *
      25             :  *
      26             :  * IDENTIFICATION
      27             :  *    src/backend/storage/large_object/inv_api.c
      28             :  *
      29             :  *-------------------------------------------------------------------------
      30             :  */
      31             : #include "postgres.h"
      32             : 
      33             : #include <limits.h>
      34             : 
      35             : #include "access/detoast.h"
      36             : #include "access/genam.h"
      37             : #include "access/htup_details.h"
      38             : #include "access/sysattr.h"
      39             : #include "access/table.h"
      40             : #include "access/xact.h"
      41             : #include "catalog/dependency.h"
      42             : #include "catalog/indexing.h"
      43             : #include "catalog/objectaccess.h"
      44             : #include "catalog/pg_largeobject.h"
      45             : #include "catalog/pg_largeobject_metadata.h"
      46             : #include "libpq/libpq-fs.h"
      47             : #include "miscadmin.h"
      48             : #include "storage/large_object.h"
      49             : #include "utils/fmgroids.h"
      50             : #include "utils/rel.h"
      51             : #include "utils/snapmgr.h"
      52             : 
      53             : 
      54             : /*
      55             :  * GUC: backwards-compatibility flag; when true, inv_open skips LO ACL checks
      56             :  */
      57             : bool        lo_compat_privileges;
      58             : 
      59             : /*
      60             :  * All accesses to pg_largeobject and its index make use of a single Relation
      61             :  * reference each, so that we only need to open them once per transaction.
      62             :  * To avoid problems when the first such reference occurs inside a
      63             :  * subtransaction, we execute a slightly klugy maneuver to assign ownership of
      64             :  * the Relation references to TopTransactionResourceOwner.
      65             :  */
      66             : static Relation lo_heap_r = NULL;
      67             : static Relation lo_index_r = NULL;
      68             : 
      69             : 
      70             : /*
      71             :  * Open pg_largeobject and its index (into lo_heap_r/lo_index_r) if not yet open
      72             :  */
      73             : static void
      74        1984 : open_lo_relation(void)
      75             : {
      76             :     ResourceOwner currentOwner;
      77             : 
      78        1984 :     if (lo_heap_r && lo_index_r)
      79        1798 :         return;                 /* both already open in current xact */
      80             : 
      81             :     /* Temporarily switch owners so the top xact owns these relation references */
      82         186 :     currentOwner = CurrentResourceOwner;
      83         186 :     CurrentResourceOwner = TopTransactionResourceOwner;
      84             : 
      85             :     /* Use RowExclusiveLock since we might either read or write */
      86         186 :     if (lo_heap_r == NULL)
      87         186 :         lo_heap_r = table_open(LargeObjectRelationId, RowExclusiveLock);
      88         186 :     if (lo_index_r == NULL)
      89         186 :         lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
      90             : 
      91         186 :     CurrentResourceOwner = currentOwner;    /* restore the caller's owner */
      92             : }
      93             : 
      94             : /*
      95             :  * Clean up at main transaction end: close on commit, then forget both refs
      96             :  */
      97             : void
      98         280 : close_lo_relation(bool isCommit)
      99             : {
     100         280 :     if (lo_heap_r || lo_index_r)
     101             :     {
     102             :         /*
     103             :          * Only bother to close if committing; else abort cleanup will handle
     104             :          * it
     105             :          */
     106         186 :         if (isCommit)
     107             :         {
     108             :             ResourceOwner currentOwner;
     109             : 
     110         140 :             currentOwner = CurrentResourceOwner;
     111         140 :             CurrentResourceOwner = TopTransactionResourceOwner;
     112             : 
     113         140 :             if (lo_index_r)
     114         140 :                 index_close(lo_index_r, NoLock);
     115         140 :             if (lo_heap_r)
     116         140 :                 table_close(lo_heap_r, NoLock);
     117             : 
     118         140 :             CurrentResourceOwner = currentOwner;
     119             :         }
     120         186 :         lo_heap_r = NULL;       /* reset on commit and abort alike */
     121         186 :         lo_index_r = NULL;
     122             :     }
     123         280 : }
     124             : 
     125             : 
     126             : /*
     127             :  * Same as pg_largeobject.c's LargeObjectExists(), except that the snapshot to
     128             :  * read with can be specified.  Returns true if a metadata row matches loid.
     129             :  */
     130             : static bool
     131         254 : myLargeObjectExists(Oid loid, Snapshot snapshot)
     132             : {
     133             :     Relation    pg_lo_meta;
     134             :     ScanKeyData skey[1];
     135             :     SysScanDesc sd;
     136             :     HeapTuple   tuple;
     137         254 :     bool        retval = false;
     138             : 
     139         254 :     ScanKeyInit(&skey[0],
     140             :                 Anum_pg_largeobject_metadata_oid,
     141             :                 BTEqualStrategyNumber, F_OIDEQ,
     142             :                 ObjectIdGetDatum(loid));
     143             : 
     144         254 :     pg_lo_meta = table_open(LargeObjectMetadataRelationId,
     145             :                             AccessShareLock);
     146             : 
     147         254 :     sd = systable_beginscan(pg_lo_meta,
     148             :                             LargeObjectMetadataOidIndexId, true,
     149             :                             snapshot, 1, skey);
     150             : 
     151         254 :     tuple = systable_getnext(sd);
     152         254 :     if (HeapTupleIsValid(tuple))
     153         250 :         retval = true;          /* at least one matching row is visible */
     154             : 
     155         254 :     systable_endscan(sd);
     156             : 
     157         254 :     table_close(pg_lo_meta, AccessShareLock);
     158             : 
     159         254 :     return retval;
     160             : }
     161             : 
     162             : 
     163             : /*
     164             :  * Extract data field from a pg_largeobject tuple, detoasting if needed and
     165             :  * raising an error unless its length is within 0..LOBLKSIZE.  Returns data
     166             :  * pointer (a bytea *), data length, and whether the pointer should be pfree'd.
     167             :  */
     168             : static void
     169        6812 : getdatafield(Form_pg_largeobject tuple,
     170             :              bytea **pdatafield,
     171             :              int *plen,
     172             :              bool *pfreeit)
     173             : {
     174             :     bytea      *datafield;
     175             :     int         len;
     176             :     bool        freeit;
     177             : 
     178        6812 :     datafield = &(tuple->data); /* see note at top of file */
     179        6812 :     freeit = false;
     180        6812 :     if (VARATT_IS_EXTENDED(datafield))
     181             :     {
     182        6704 :         datafield = (bytea *)
     183             :             detoast_attr((struct varlena *) datafield);
     184        6704 :         freeit = true;          /* detoasted copy is ours to release */
     185             :     }
     186        6812 :     len = VARSIZE(datafield) - VARHDRSZ;
     187        6812 :     if (len < 0 || len > LOBLKSIZE)
     188           0 :         ereport(ERROR,
     189             :                 (errcode(ERRCODE_DATA_CORRUPTED),
     190             :                  errmsg("pg_largeobject entry for OID %u, page %d has invalid data field size %d",
     191             :                         tuple->loid, tuple->pageno, len)));
     192        6812 :     *pdatafield = datafield;
     193        6812 :     *plen = len;
     194        6812 :     *pfreeit = freeit;
     195        6812 : }
     196             : 
     197             : 
     198             : /*
     199             :  *  inv_create -- create a new large object
     200             :  *
     201             :  *  Arguments:
     202             :  *    lobjId - OID to use for new large object, or InvalidOid to pick one
     203             :  *
     204             :  *  Returns:
     205             :  *    OID of new object
     206             :  *
     207             :  * If lobjId is not InvalidOid, then an error occurs if the OID is already
     208             :  * in use.
     209             :  */
     210             : Oid
     211          72 : inv_create(Oid lobjId)
     212             : {
     213             :     Oid         lobjId_new;
     214             : 
     215             :     /*
     216             :      * Create a new largeobject with empty data pages
     217             :      */
     218          72 :     lobjId_new = LargeObjectCreate(lobjId);
     219             : 
     220             :     /*
     221             :      * Record a dependency on the owner of the large object.
     222             :      *
     223             :      * The reason why we use LargeObjectRelationId instead of
     224             :      * LargeObjectMetadataRelationId here is to provide backward compatibility
     225             :      * to applications that rely on knowledge of the internal layout of the
     226             :      * system catalogs. The OID of pg_largeobject_metadata and the loid of
     227             :      * pg_largeobject have the same value, so there is no actual difference.
     228             :      */
     229          72 :     recordDependencyOnOwner(LargeObjectRelationId,
     230             :                             lobjId_new, GetUserId());
     231             : 
     232             :     /* Post creation hook for new large object */
     233          72 :     InvokeObjectPostCreateHook(LargeObjectRelationId, lobjId_new, 0);
     234             : 
     235             :     /*
     236             :      * Advance command counter to make new tuple visible to later operations.
     237             :      */
     238          72 :     CommandCounterIncrement();
     239             : 
     240          72 :     return lobjId_new;
     241             : }
     242             : 
     243             : /*
     244             :  *  inv_open -- access an existing large object.
     245             :  *      flags must contain INV_READ and/or INV_WRITE, else an error is raised.
     246             :  *      Returns:
     247             :  *        Large object descriptor, appropriately filled in.  The descriptor
     248             :  *        and subsidiary data are allocated in the specified memory context,
     249             :  *        which must be suitably long-lived for the caller's purposes.
     250             :  */
     251             : LargeObjectDesc *
     252         254 : inv_open(Oid lobjId, int flags, MemoryContext mcxt)
     253             : {
     254             :     LargeObjectDesc *retval;
     255         254 :     Snapshot    snapshot = NULL;
     256         254 :     int         descflags = 0;
     257             : 
     258             :     /*
     259             :      * Historically, no difference is made between (INV_WRITE) and (INV_WRITE
     260             :      * | INV_READ), the caller being allowed to read the large object
     261             :      * descriptor in either case.
     262             :      */
     263         254 :     if (flags & INV_WRITE)
     264         102 :         descflags |= IFS_WRLOCK | IFS_RDLOCK;
     265         254 :     if (flags & INV_READ)
     266         172 :         descflags |= IFS_RDLOCK;
     267             : 
     268         254 :     if (descflags == 0)
     269           0 :         ereport(ERROR,
     270             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     271             :                  errmsg("invalid flags for opening a large object: %d",
     272             :                         flags)));
     273             : 
     274             :     /* Get snapshot.  If write is requested, use an instantaneous snapshot. */
     275         254 :     if (descflags & IFS_WRLOCK)
     276         102 :         snapshot = NULL;
     277             :     else
     278         152 :         snapshot = GetActiveSnapshot();
     279             : 
     280             :     /* Can't use LargeObjectExists here because we need to specify snapshot */
     281         254 :     if (!myLargeObjectExists(lobjId, snapshot))
     282           4 :         ereport(ERROR,
     283             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     284             :                  errmsg("large object %u does not exist", lobjId)));
     285             : 
     286             :     /* Apply permission checks (unless lo_compat_privileges), same snapshot */
     287         250 :     if ((descflags & IFS_RDLOCK) != 0)
     288             :     {
     289         488 :         if (!lo_compat_privileges &&
     290         238 :             pg_largeobject_aclcheck_snapshot(lobjId,
     291             :                                              GetUserId(),
     292             :                                              ACL_SELECT,
     293             :                                              snapshot) != ACLCHECK_OK)
     294          28 :             ereport(ERROR,
     295             :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     296             :                      errmsg("permission denied for large object %u",
     297             :                             lobjId)));
     298             :     }
     299         222 :     if ((descflags & IFS_WRLOCK) != 0)
     300             :     {
     301         164 :         if (!lo_compat_privileges &&
     302          78 :             pg_largeobject_aclcheck_snapshot(lobjId,
     303             :                                              GetUserId(),
     304             :                                              ACL_UPDATE,
     305             :                                              snapshot) != ACLCHECK_OK)
     306           8 :             ereport(ERROR,
     307             :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     308             :                      errmsg("permission denied for large object %u",
     309             :                             lobjId)));
     310             :     }
     311             : 
     312             :     /* OK to create a descriptor; the offset starts at the beginning */
     313         214 :     retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
     314             :                                                     sizeof(LargeObjectDesc));
     315         214 :     retval->id = lobjId;
     316         214 :     retval->subid = GetCurrentSubTransactionId();
     317         214 :     retval->offset = 0;
     318         214 :     retval->flags = descflags;
     319             : 
     320             :     /*
     321             :      * We must register the snapshot in TopTransaction's resowner, because it
     322             :      * must stay alive until the LO is closed rather than until the current
     323             :      * portal shuts down.  Do this last to avoid uselessly leaking the
     324             :      * snapshot if an error is thrown above.
     325             :      */
     326         214 :     if (snapshot)               /* still NULL when opened for writing */
     327         136 :         snapshot = RegisterSnapshotOnOwner(snapshot,
     328             :                                            TopTransactionResourceOwner);
     329         214 :     retval->snapshot = snapshot;
     330             : 
     331         214 :     return retval;
     332             : }
     333             : 
     334             : /*
     335             :  * Closes a large object descriptor previously made by inv_open(): unregisters
     336             :  * its snapshot from the top transaction's owner and pfree's the descriptor.
     337             :  */
     338             : void
     339         202 : inv_close(LargeObjectDesc *obj_desc)
     340             : {
     341             :     Assert(PointerIsValid(obj_desc));
     342             : 
     343         202 :     UnregisterSnapshotFromOwner(obj_desc->snapshot,
     344             :                                 TopTransactionResourceOwner);
     345             : 
     346         202 :     pfree(obj_desc);
     347         202 : }
     348             : 
     349             : /*
     350             :  * Destroys an existing large object (not to be confused with a descriptor!)
     351             :  *
     352             :  * Note: the caller is expected to have done any required permissions check.
     353             :  */
     354             : int
     355          52 : inv_drop(Oid lobjId)
     356             : {
     357             :     ObjectAddress object;
     358             : 
     359             :     /*
     360             :      * Delete any comments and dependencies on the large object (DROP_CASCADE)
     361             :      */
     362          52 :     object.classId = LargeObjectRelationId;
     363          52 :     object.objectId = lobjId;
     364          52 :     object.objectSubId = 0;
     365          52 :     performDeletion(&object, DROP_CASCADE, 0);
     366             : 
     367             :     /*
     368             :      * Advance command counter so that tuple removal will be seen by later
     369             :      * large-object operations in this transaction.
     370             :      */
     371          52 :     CommandCounterIncrement();
     372             : 
     373             :     /* For historical reasons, we always return 1 on success. */
     374          52 :     return 1;
     375             : }
     376             : 
     377             : /*
     378             :  * Determine size of a large object (0 if the scan finds no data pages)
     379             :  *
     380             :  * NOTE: LOs can contain gaps, just like Unix files.  We actually return
     381             :  * the offset of the last byte + 1.
     382             :  */
     383             : static uint64
     384          70 : inv_getsize(LargeObjectDesc *obj_desc)
     385             : {
     386          70 :     uint64      lastbyte = 0;
     387             :     ScanKeyData skey[1];
     388             :     SysScanDesc sd;
     389             :     HeapTuple   tuple;
     390             : 
     391             :     Assert(PointerIsValid(obj_desc));
     392             : 
     393          70 :     open_lo_relation();
     394             : 
     395          70 :     ScanKeyInit(&skey[0],
     396             :                 Anum_pg_largeobject_loid,
     397             :                 BTEqualStrategyNumber, F_OIDEQ,
     398          70 :                 ObjectIdGetDatum(obj_desc->id));
     399             : 
     400          70 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     401             :                                     obj_desc->snapshot, 1, skey);
     402             : 
     403             :     /*
     404             :      * Because the pg_largeobject index is on both loid and pageno, but we
     405             :      * constrain only loid, a backwards scan should visit all pages of the
     406             :      * large object in reverse pageno order.  So, it's sufficient to examine
     407             :      * the first valid tuple (== last valid page).
     408             :      */
     409          70 :     tuple = systable_getnext_ordered(sd, BackwardScanDirection);
     410          70 :     if (HeapTupleIsValid(tuple))
     411             :     {
     412             :         Form_pg_largeobject data;
     413             :         bytea      *datafield;
     414             :         int         len;
     415             :         bool        pfreeit;
     416             : 
     417          64 :         if (HeapTupleHasNulls(tuple))   /* paranoia */
     418           0 :             elog(ERROR, "null field found in pg_largeobject");
     419          64 :         data = (Form_pg_largeobject) GETSTRUCT(tuple);
     420          64 :         getdatafield(data, &datafield, &len, &pfreeit);
     421          64 :         lastbyte = (uint64) data->pageno * LOBLKSIZE + len;    /* page start + bytes in it */
     422          64 :         if (pfreeit)
     423          12 :             pfree(datafield);
     424             :     }
     425             : 
     426          70 :     systable_endscan_ordered(sd);
     427             : 
     428          70 :     return lastbyte;
     429             : }
     430             : 
     431             : int64
     432         148 : inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
     433             : {
     434             :     int64       newoffset;
     435             : 
     436             :     Assert(PointerIsValid(obj_desc));
     437             : 
     438             :     /*
     439             :      * Compute the new offset per whence, validate it, store it in the
     440             :      * descriptor and return it.  Seek/tell need no permission check here.
     441             :      */
     442             : 
     443             :     /*
     444             :      * Note: overflow in the additions is possible, but since we will reject
     445             :      * negative results, we don't need any extra test for that.
     446             :      */
     447         148 :     switch (whence)
     448             :     {
     449             :         case SEEK_SET:
     450          66 :             newoffset = offset;
     451          66 :             break;
     452             :         case SEEK_CUR:
     453          12 :             newoffset = obj_desc->offset + offset;
     454          12 :             break;
     455             :         case SEEK_END:
     456          70 :             newoffset = inv_getsize(obj_desc) + offset;
     457          70 :             break;
     458             :         default:
     459           0 :             ereport(ERROR,
     460             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     461             :                      errmsg("invalid whence setting: %d", whence)));
     462             :             newoffset = 0;      /* keep compiler quiet */
     463             :             break;
     464             :     }
     465             : 
     466             :     /*
     467             :      * use errmsg_internal here because we don't want to expose INT64_FORMAT
     468             :      * in translatable strings; doing better is not worth the trouble
     469             :      */
     470         148 :     if (newoffset < 0 || newoffset > MAX_LARGE_OBJECT_SIZE)
     471           0 :         ereport(ERROR,
     472             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     473             :                  errmsg_internal("invalid large object seek target: " INT64_FORMAT,
     474             :                                  newoffset)));
     475             : 
     476         148 :     obj_desc->offset = newoffset;
     477         148 :     return newoffset;
     478             : }
     479             : 
     480             : int64
     481          32 : inv_tell(LargeObjectDesc *obj_desc)
     482             : {
     483             :     Assert(PointerIsValid(obj_desc));
     484             : 
     485             :     /*
     486             :      * Just report the descriptor's current offset.  We allow seek/tell with
     487             :      * either read or write permission, so no permission check is needed here.
     488             :      */
     489             : 
     490          32 :     return obj_desc->offset;
     491             : }
     492             : 
     493             : int
     494         858 : inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
     495             : {
     496         858 :     int         nread = 0;
     497             :     int64       n;
     498             :     int64       off;
     499             :     int         len;
     500         858 :     int32       pageno = (int32) (obj_desc->offset / LOBLKSIZE);
     501             :     uint64      pageoff;
     502             :     ScanKeyData skey[2];
     503             :     SysScanDesc sd;
     504             :     HeapTuple   tuple;
     505             : 
     506             :     Assert(PointerIsValid(obj_desc));
     507             :     Assert(buf != NULL);
     508             : 
     509         858 :     if ((obj_desc->flags & IFS_RDLOCK) == 0)
     510           0 :         ereport(ERROR,
     511             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     512             :                  errmsg("permission denied for large object %u",
     513             :                         obj_desc->id)));
     514             : 
     515         858 :     if (nbytes <= 0)
     516           6 :         return 0;               /* nothing requested */
     517             : 
     518         852 :     open_lo_relation();
     519             : 
     520         852 :     ScanKeyInit(&skey[0],
     521             :                 Anum_pg_largeobject_loid,
     522             :                 BTEqualStrategyNumber, F_OIDEQ,
     523         852 :                 ObjectIdGetDatum(obj_desc->id));
     524             : 
     525         852 :     ScanKeyInit(&skey[1],
     526             :                 Anum_pg_largeobject_pageno,
     527             :                 BTGreaterEqualStrategyNumber, F_INT4GE,
     528             :                 Int32GetDatum(pageno));
     529             : 
     530         852 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     531             :                                     obj_desc->snapshot, 2, skey);
     532             : 
     533        7724 :     while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     534             :     {
     535             :         Form_pg_largeobject data;
     536             :         bytea      *datafield;
     537             :         bool        pfreeit;
     538             : 
     539        6728 :         if (HeapTupleHasNulls(tuple))   /* paranoia */
     540           0 :             elog(ERROR, "null field found in pg_largeobject");
     541        6728 :         data = (Form_pg_largeobject) GETSTRUCT(tuple);
     542             : 
     543             :         /*
     544             :          * We expect the indexscan will deliver pages in order.  However,
     545             :          * there may be missing pages if the LO contains unwritten "holes". We
     546             :          * want missing sections to read out as zeroes.
     547             :          */
     548        6728 :         pageoff = ((uint64) data->pageno) * LOBLKSIZE;
     549        6728 :         if (pageoff > obj_desc->offset)
     550             :         {
     551           8 :             n = pageoff - obj_desc->offset;
     552           8 :             n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
     553           8 :             MemSet(buf + nread, 0, n);  /* zero-fill the hole before this page */
     554           8 :             nread += n;
     555           8 :             obj_desc->offset += n;
     556             :         }
     557             : 
     558        6728 :         if (nread < nbytes)
     559             :         {
     560             :             Assert(obj_desc->offset >= pageoff);
     561        6724 :             off = (int) (obj_desc->offset - pageoff);
     562             :             Assert(off >= 0 && off < LOBLKSIZE);
     563             : 
     564        6724 :             getdatafield(data, &datafield, &len, &pfreeit);
     565        6724 :             if (len > off)
     566             :             {
     567        6666 :                 n = len - off;
     568        6666 :                 n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
     569        6666 :                 memcpy(buf + nread, VARDATA(datafield) + off, n);
     570        6666 :                 nread += n;
     571        6666 :                 obj_desc->offset += n;
     572             :             }
     573        6724 :             if (pfreeit)
     574        6676 :                 pfree(datafield);
     575             :         }
     576             : 
     577        6728 :         if (nread >= nbytes)
     578         708 :             break;              /* request fully satisfied */
     579             :     }
     580             : 
     581         852 :     systable_endscan_ordered(sd);
     582             : 
     583         852 :     return nread;               /* less than nbytes if the scan ran out of pages */
     584             : }
     585             : 
                       /*
                        * inv_write -- write nbytes bytes from buf into a large object, starting
                        * at obj_desc->offset.
                        *
                        * The descriptor must carry IFS_WRLOCK, else an ERROR is raised.  A request
                        * with nbytes <= 0 returns 0 before any catalog access; a request that
                        * would end past MAX_LARGE_OBJECT_SIZE raises an ERROR.
                        *
                        * The data is stored one page (at most LOBLKSIZE data bytes) per loop
                        * iteration: a page that already exists is rewritten with the new bytes
                        * merged in, a missing page is inserted.  Within a page, any gap between
                        * the existing data and the write position is zero-filled.
                        *
                        * obj_desc->offset is advanced past the written data.  Returns the number
                        * of bytes written, which is nbytes whenever the loop runs to completion.
                        */
     586             : int
     587        1034 : inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
     588             : {
     589        1034 :     int         nwritten = 0;
     590             :     int         n;          /* bytes copied in the current iteration */
     591             :     int         off;        /* byte position within the current page */
     592             :     int         len;        /* valid data length of the current page */
     593        1034 :     int32       pageno = (int32) (obj_desc->offset / LOBLKSIZE);
     594             :     ScanKeyData skey[2];
     595             :     SysScanDesc sd;
     596             :     HeapTuple   oldtuple;
     597             :     Form_pg_largeobject olddata;
     598             :     bool        neednextpage;   /* must we fetch another tuple from the scan? */
     599             :     bytea      *datafield;
     600             :     bool        pfreeit;
     601             :     union
     602             :     {
     603             :         bytea       hdr;
     604             :         /* this is to make the union big enough for a LO data chunk: */
     605             :         char        data[LOBLKSIZE + VARHDRSZ];
     606             :         /* ensure union is aligned well enough: */
     607             :         int32       align_it;
     608             :     }           workbuf;
     609        1034 :     char       *workb = VARDATA(&workbuf.hdr);
     610             :     HeapTuple   newtup;
     611             :     Datum       values[Natts_pg_largeobject];
     612             :     bool        nulls[Natts_pg_largeobject];
     613             :     bool        replace[Natts_pg_largeobject];
     614             :     CatalogIndexState indstate;
     615             : 
     616             :     Assert(PointerIsValid(obj_desc));
     617             :     Assert(buf != NULL);
     618             : 
     619             :     /* enforce writability because snapshot is probably wrong otherwise */
     620        1034 :     if ((obj_desc->flags & IFS_WRLOCK) == 0)
     621           0 :         ereport(ERROR,
     622             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     623             :                  errmsg("permission denied for large object %u",
     624             :                         obj_desc->id)));
     625             : 
     626        1034 :     if (nbytes <= 0)
     627           0 :         return 0;
     628             : 
     629             :     /* this addition can't overflow because nbytes is only int32 */
     630        1034 :     if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
     631           0 :         ereport(ERROR,
     632             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     633             :                  errmsg("invalid large object write request size: %d",
     634             :                         nbytes)));
     635             : 
     636        1034 :     open_lo_relation();
     637             : 
     638        1034 :     indstate = CatalogOpenIndexes(lo_heap_r);
     639             : 
                           /*
                            * Set up to scan this LO's pages with pageno >= the first page we will
                            * write, in index order.
                            */
     640        1034 :     ScanKeyInit(&skey[0],
     641             :                 Anum_pg_largeobject_loid,
     642             :                 BTEqualStrategyNumber, F_OIDEQ,
     643        1034 :                 ObjectIdGetDatum(obj_desc->id));
     644             : 
     645        1034 :     ScanKeyInit(&skey[1],
     646             :                 Anum_pg_largeobject_pageno,
     647             :                 BTGreaterEqualStrategyNumber, F_INT4GE,
     648             :                 Int32GetDatum(pageno));
     649             : 
     650        1034 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     651             :                                     obj_desc->snapshot, 2, skey);
     652             : 
                           /* nothing fetched yet; make the first iteration pull a tuple */
     653        1034 :     oldtuple = NULL;
     654        1034 :     olddata = NULL;
     655        1034 :     neednextpage = true;
     656             : 
     657        7366 :     while (nwritten < nbytes)
     658             :     {
     659             :         /*
     660             :          * If possible, get next pre-existing page of the LO.  We expect the
     661             :          * indexscan will deliver these in order --- but there may be holes.
     662             :          */
     663        5298 :         if (neednextpage)
     664             :         {
     665        1038 :             if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     666             :             {
     667          16 :                 if (HeapTupleHasNulls(oldtuple))    /* paranoia */
     668           0 :                     elog(ERROR, "null field found in pg_largeobject");
     669          16 :                 olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
     670             :                 Assert(olddata->pageno >= pageno);
     671             :             }
     672        1038 :             neednextpage = false;
     673             :         }
     674             : 
     675             :         /*
     676             :          * If we have a pre-existing page, see if it is the page we want to
     677             :          * write, or a later one.
     678             :          */
     679        5298 :         if (olddata != NULL && olddata->pageno == pageno)
     680             :         {
     681             :             /*
     682             :              * Update an existing page with fresh data.
     683             :              *
     684             :              * First, load old data into workbuf
     685             :              */
     686          16 :             getdatafield(olddata, &datafield, &len, &pfreeit);
     687          16 :             memcpy(workb, VARDATA(datafield), len);
     688          16 :             if (pfreeit)
     689          12 :                 pfree(datafield);
     690             : 
     691             :             /*
     692             :              * Fill any hole
     693             :              */
     694          16 :             off = (int) (obj_desc->offset % LOBLKSIZE);
     695          16 :             if (off > len)
     696           0 :                 MemSet(workb + len, 0, off - len);
     697             : 
     698             :             /*
     699             :              * Insert appropriate portion of new data
     700             :              */
     701          16 :             n = LOBLKSIZE - off;
     702          16 :             n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
     703          16 :             memcpy(workb + off, buf + nwritten, n);
     704          16 :             nwritten += n;
     705          16 :             obj_desc->offset += n;
     706          16 :             off += n;
     707             :             /* compute valid length of new page */
     708          16 :             len = (len >= off) ? len : off;
     709          16 :             SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
     710             : 
     711             :             /*
     712             :              * Form and insert updated tuple
                                    *
                                    * Only the data column is flagged in replace[].
     713             :              */
     714          16 :             memset(values, 0, sizeof(values));
     715          16 :             memset(nulls, false, sizeof(nulls));
     716          16 :             memset(replace, false, sizeof(replace));
     717          16 :             values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     718          16 :             replace[Anum_pg_largeobject_data - 1] = true;
     719          16 :             newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
     720             :                                        values, nulls, replace);
     721          16 :             CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
     722             :                                        indstate);
     723          16 :             heap_freetuple(newtup);
     724             : 
     725             :             /*
     726             :              * We're done with this old page.
     727             :              */
     728          16 :             oldtuple = NULL;
     729          16 :             olddata = NULL;
     730          16 :             neednextpage = true;
     731             :         }
     732             :         else
     733             :         {
     734             :             /*
     735             :              * Write a brand new page.
                                    *
                                    * We get here when no pre-existing page matches pageno: either the
                                    * scan returned nothing more, or the page it returned lies further
                                    * on (olddata is then left set so a later iteration can match it).
     736             :              *
     737             :              * First, fill any hole
     738             :              */
     739        5282 :             off = (int) (obj_desc->offset % LOBLKSIZE);
     740        5282 :             if (off > 0)
     741           4 :                 MemSet(workb, 0, off);
     742             : 
     743             :             /*
     744             :              * Insert appropriate portion of new data
     745             :              */
     746        5282 :             n = LOBLKSIZE - off;
     747        5282 :             n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
     748        5282 :             memcpy(workb + off, buf + nwritten, n);
     749        5282 :             nwritten += n;
     750        5282 :             obj_desc->offset += n;
     751             :             /* compute valid length of new page */
     752        5282 :             len = off + n;
     753        5282 :             SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
     754             : 
     755             :             /*
     756             :              * Form and insert new tuple
     757             :              */
     758        5282 :             memset(values, 0, sizeof(values));
     759        5282 :             memset(nulls, false, sizeof(nulls));
     760        5282 :             values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
     761        5282 :             values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
     762        5282 :             values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     763        5282 :             newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
     764        5282 :             CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
     765        5282 :             heap_freetuple(newtup);
     766             :         }
     767        5298 :         pageno++;               /* each iteration writes exactly one page */
     768             :     }
     769             : 
     770        1034 :     systable_endscan_ordered(sd);
     771             : 
     772        1034 :     CatalogCloseIndexes(indstate);
     773             : 
     774             :     /*
     775             :      * Advance command counter so that my tuple updates will be seen by later
     776             :      * large-object operations in this transaction.
     777             :      */
     778        1034 :     CommandCounterIncrement();
     779             : 
     780        1034 :     return nwritten;
     781             : }
     782             : 
                       /*
                        * inv_truncate -- set the length of a large object to len bytes.
                        *
                        * The descriptor must carry IFS_WRLOCK, and len must lie within
                        * 0 .. MAX_LARGE_OBJECT_SIZE; otherwise an ERROR is raised.
                        *
                        * If the page containing the truncation point exists, its data is cut (or
                        * zero-extended) to len % LOBLKSIZE bytes; otherwise a new page of that
                        * length is inserted to mark the end of data.  Every page the scan finds
                        * after the truncation page is deleted.  obj_desc->offset is not changed
                        * here.
                        */
     783             : void
     784          28 : inv_truncate(LargeObjectDesc *obj_desc, int64 len)
     785             : {
     786          28 :     int32       pageno = (int32) (len / LOBLKSIZE);
     787             :     int32       off;            /* data length of the truncation page */
     788             :     ScanKeyData skey[2];
     789             :     SysScanDesc sd;
     790             :     HeapTuple   oldtuple;
     791             :     Form_pg_largeobject olddata;
     792             :     union
     793             :     {
     794             :         bytea       hdr;
     795             :         /* this is to make the union big enough for a LO data chunk: */
     796             :         char        data[LOBLKSIZE + VARHDRSZ];
     797             :         /* ensure union is aligned well enough: */
     798             :         int32       align_it;
     799             :     }           workbuf;
     800          28 :     char       *workb = VARDATA(&workbuf.hdr);
     801             :     HeapTuple   newtup;
     802             :     Datum       values[Natts_pg_largeobject];
     803             :     bool        nulls[Natts_pg_largeobject];
     804             :     bool        replace[Natts_pg_largeobject];
     805             :     CatalogIndexState indstate;
     806             : 
     807             :     Assert(PointerIsValid(obj_desc));
     808             : 
     809             :     /* enforce writability because snapshot is probably wrong otherwise */
     810          28 :     if ((obj_desc->flags & IFS_WRLOCK) == 0)
     811           0 :         ereport(ERROR,
     812             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     813             :                  errmsg("permission denied for large object %u",
     814             :                         obj_desc->id)));
     815             : 
     816             :     /*
     817             :      * use errmsg_internal here because we don't want to expose INT64_FORMAT
     818             :      * in translatable strings; doing better is not worth the trouble
     819             :      */
     820          28 :     if (len < 0 || len > MAX_LARGE_OBJECT_SIZE)
     821           0 :         ereport(ERROR,
     822             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     823             :                  errmsg_internal("invalid large object truncation target: " INT64_FORMAT,
     824             :                                  len)));
     825             : 
     826          28 :     open_lo_relation();
     827             : 
     828          28 :     indstate = CatalogOpenIndexes(lo_heap_r);
     829             : 
     830             :     /*
     831             :      * Set up to find all pages with desired loid and pageno >= target
     832             :      */
     833          28 :     ScanKeyInit(&skey[0],
     834             :                 Anum_pg_largeobject_loid,
     835             :                 BTEqualStrategyNumber, F_OIDEQ,
     836          28 :                 ObjectIdGetDatum(obj_desc->id));
     837             : 
     838          28 :     ScanKeyInit(&skey[1],
     839             :                 Anum_pg_largeobject_pageno,
     840             :                 BTGreaterEqualStrategyNumber, F_INT4GE,
     841             :                 Int32GetDatum(pageno));
     842             : 
     843          28 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     844             :                                     obj_desc->snapshot, 2, skey);
     845             : 
     846             :     /*
     847             :      * If possible, get the page the truncation point is in. The truncation
     848             :      * point may be beyond the end of the LO or in a hole.
     849             :      */
     850          28 :     olddata = NULL;
     851          28 :     if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     852             :     {
     853          16 :         if (HeapTupleHasNulls(oldtuple))    /* paranoia */
     854           0 :             elog(ERROR, "null field found in pg_largeobject");
     855          16 :         olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
     856             :         Assert(olddata->pageno >= pageno);
     857             :     }
     858             : 
     859             :     /*
     860             :      * If we found the page of the truncation point we need to truncate the
     861             :      * data in it.  Otherwise if we're in a hole, we need to create a page to
     862             :      * mark the end of data.
     863             :      */
     864          28 :     if (olddata != NULL && olddata->pageno == pageno)
     865           8 :     {
     866             :         /* First, load old data into workbuf */
     867             :         bytea      *datafield;
     868             :         int         pagelen;
     869             :         bool        pfreeit;
     870             : 
     871           8 :         getdatafield(olddata, &datafield, &pagelen, &pfreeit);
     872           8 :         memcpy(workb, VARDATA(datafield), pagelen);
     873           8 :         if (pfreeit)
     874           4 :             pfree(datafield);
     875             : 
     876             :         /*
     877             :          * Fill any hole
                                *
                                * This applies when the truncation point lies past the data the page
                                * currently holds, so the page grows rather than shrinks.
     878             :          */
     879           8 :         off = len % LOBLKSIZE;
     880           8 :         if (off > pagelen)
     881           4 :             MemSet(workb + pagelen, 0, off - pagelen);
     882             : 
     883             :         /* compute length of new page */
     884           8 :         SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
     885             : 
     886             :         /*
     887             :          * Form and insert updated tuple
                                *
                                * Only the data column is flagged in replace[].
     888             :          */
     889           8 :         memset(values, 0, sizeof(values));
     890           8 :         memset(nulls, false, sizeof(nulls));
     891           8 :         memset(replace, false, sizeof(replace));
     892           8 :         values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     893           8 :         replace[Anum_pg_largeobject_data - 1] = true;
     894           8 :         newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
     895             :                                    values, nulls, replace);
     896           8 :         CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
     897             :                                    indstate);
     898           8 :         heap_freetuple(newtup);
     899             :     }
     900             :     else
     901             :     {
     902             :         /*
     903             :          * If the first page we found was after the truncation point, we're in
     904             :          * a hole that we'll fill, but we need to delete the later page
     905             :          * because the loop below won't visit it again.
     906             :          */
     907          20 :         if (olddata != NULL)
     908             :         {
     909             :             Assert(olddata->pageno > pageno);
     910           8 :             CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
     911             :         }
     912             : 
     913             :         /*
     914             :          * Write a brand new page.
     915             :          *
     916             :          * Fill the hole up to the truncation point
                                *
                                * Note that off can be zero, in which case the page inserted below
                                * carries no data bytes at all.
     917             :          */
     918          20 :         off = len % LOBLKSIZE;
     919          20 :         if (off > 0)
     920          20 :             MemSet(workb, 0, off);
     921             : 
     922             :         /* compute length of new page */
     923          20 :         SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
     924             : 
     925             :         /*
     926             :          * Form and insert new tuple
     927             :          */
     928          20 :         memset(values, 0, sizeof(values));
     929          20 :         memset(nulls, false, sizeof(nulls));
     930          20 :         values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
     931          20 :         values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
     932          20 :         values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     933          20 :         newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
     934          20 :         CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
     935          20 :         heap_freetuple(newtup);
     936             :     }
     937             : 
     938             :     /*
     939             :      * Delete any pages after the truncation point.  If the initial search
     940             :      * didn't find a page, then of course there's nothing more to do.
     941             :      */
     942          28 :     if (olddata != NULL)
     943             :     {
     944          36 :         while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     945             :         {
     946           4 :             CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
     947             :         }
     948             :     }
     949             : 
     950          28 :     systable_endscan_ordered(sd);
     951             : 
     952          28 :     CatalogCloseIndexes(indstate);
     953             : 
     954             :     /*
     955             :      * Advance command counter so that tuple updates will be seen by later
     956             :      * large-object operations in this transaction.
     957             :      */
     958          28 :     CommandCounterIncrement();
     959          28 : }

Generated by: LCOV version 1.13