LCOV - code coverage report
Current view: top level - src/backend/storage/large_object - inv_api.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 279 295 94.6 %
Date: 2023-12-07 07:10:44 Functions: 14 14 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * inv_api.c
       4             :  *    routines for manipulating inversion fs large objects. This file
       5             :  *    contains the user-level large object application interface routines.
       6             :  *
       7             :  *
       8             :  * Note: we access pg_largeobject.data using its C struct declaration.
       9             :  * This is safe because it immediately follows pageno which is an int4 field,
      10             :  * and therefore the data field will always be 4-byte aligned, even if it
      11             :  * is in the short 1-byte-header format.  We have to detoast it since it's
      12             :  * quite likely to be in compressed or short format.  We also need to check
      13             :  * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
      14             :  *
      15             :  * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
      16             :  * does most of the backend code.  We expect that CurrentMemoryContext will
      17             :  * be a short-lived context.  Data that must persist across function calls
      18             :  * is kept either in CacheMemoryContext (the Relation structs) or in the
      19             :  * memory context given to inv_open (for LargeObjectDesc structs).
      20             :  *
      21             :  *
      22             :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
      23             :  * Portions Copyright (c) 1994, Regents of the University of California
      24             :  *
      25             :  *
      26             :  * IDENTIFICATION
      27             :  *    src/backend/storage/large_object/inv_api.c
      28             :  *
      29             :  *-------------------------------------------------------------------------
      30             :  */
      31             : #include "postgres.h"
      32             : 
      33             : #include <limits.h>
      34             : 
      35             : #include "access/detoast.h"
      36             : #include "access/genam.h"
      37             : #include "access/htup_details.h"
      38             : #include "access/sysattr.h"
      39             : #include "access/table.h"
      40             : #include "access/xact.h"
      41             : #include "catalog/dependency.h"
      42             : #include "catalog/indexing.h"
      43             : #include "catalog/objectaccess.h"
      44             : #include "catalog/pg_largeobject.h"
      45             : #include "catalog/pg_largeobject_metadata.h"
      46             : #include "libpq/libpq-fs.h"
      47             : #include "miscadmin.h"
      48             : #include "storage/large_object.h"
      49             : #include "utils/acl.h"
      50             : #include "utils/fmgroids.h"
      51             : #include "utils/rel.h"
      52             : #include "utils/snapmgr.h"
      53             : 
      54             : 
      55             : /*
      56             :  * GUC: backwards-compatibility flag to suppress LO permission checks
      57             :  */
      58             : bool        lo_compat_privileges;
      59             : 
      60             : /*
      61             :  * All accesses to pg_largeobject and its index make use of a single
      62             :  * Relation reference.  To guarantee that the relcache entry remains
      63             :  * in the cache, on the first reference inside a subtransaction, we
      64             :  * execute a slightly klugy maneuver to assign ownership of the
      65             :  * Relation reference to TopTransactionResourceOwner.
      66             :  */
      67             : static Relation lo_heap_r = NULL;
      68             : static Relation lo_index_r = NULL;
      69             : 
      70             : 
      71             : /*
      72             :  * Open pg_largeobject and its index, if not already done in current xact
      73             :  */
      74             : static void
      75        3058 : open_lo_relation(void)
      76             : {
      77             :     ResourceOwner currentOwner;
      78             : 
      79        3058 :     if (lo_heap_r && lo_index_r)
      80        2772 :         return;                 /* already open in current xact */
      81             : 
      82             :     /* Arrange for the top xact to own these relation references */
      83         286 :     currentOwner = CurrentResourceOwner;
      84         286 :     CurrentResourceOwner = TopTransactionResourceOwner;
      85             : 
      86             :     /* Use RowExclusiveLock since we might either read or write */
      87         286 :     if (lo_heap_r == NULL)
      88         286 :         lo_heap_r = table_open(LargeObjectRelationId, RowExclusiveLock);
      89         286 :     if (lo_index_r == NULL)
      90         286 :         lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
      91             : 
      92         286 :     CurrentResourceOwner = currentOwner;
      93             : }
      94             : 
      95             : /*
      96             :  * Clean up at main transaction end
      97             :  */
      98             : void
      99         440 : close_lo_relation(bool isCommit)
     100             : {
     101         440 :     if (lo_heap_r || lo_index_r)
     102             :     {
     103             :         /*
     104             :          * Only bother to close if committing; else abort cleanup will handle
     105             :          * it
     106             :          */
     107         286 :         if (isCommit)
     108             :         {
     109             :             ResourceOwner currentOwner;
     110             : 
     111         210 :             currentOwner = CurrentResourceOwner;
     112         210 :             CurrentResourceOwner = TopTransactionResourceOwner;
     113             : 
     114         210 :             if (lo_index_r)
     115         210 :                 index_close(lo_index_r, NoLock);
     116         210 :             if (lo_heap_r)
     117         210 :                 table_close(lo_heap_r, NoLock);
     118             : 
     119         210 :             CurrentResourceOwner = currentOwner;
     120             :         }
     121         286 :         lo_heap_r = NULL;
     122         286 :         lo_index_r = NULL;
     123             :     }
     124         440 : }
     125             : 
     126             : 
     127             : /*
     128             :  * Same as pg_largeobject.c's LargeObjectExists(), except snapshot to
     129             :  * read with can be specified.
     130             :  */
     131             : static bool
     132         464 : myLargeObjectExists(Oid loid, Snapshot snapshot)
     133             : {
     134             :     Relation    pg_lo_meta;
     135             :     ScanKeyData skey[1];
     136             :     SysScanDesc sd;
     137             :     HeapTuple   tuple;
     138         464 :     bool        retval = false;
     139             : 
     140         464 :     ScanKeyInit(&skey[0],
     141             :                 Anum_pg_largeobject_metadata_oid,
     142             :                 BTEqualStrategyNumber, F_OIDEQ,
     143             :                 ObjectIdGetDatum(loid));
     144             : 
     145         464 :     pg_lo_meta = table_open(LargeObjectMetadataRelationId,
     146             :                             AccessShareLock);
     147             : 
     148         464 :     sd = systable_beginscan(pg_lo_meta,
     149             :                             LargeObjectMetadataOidIndexId, true,
     150             :                             snapshot, 1, skey);
     151             : 
     152         464 :     tuple = systable_getnext(sd);
     153         464 :     if (HeapTupleIsValid(tuple))
     154         460 :         retval = true;
     155             : 
     156         464 :     systable_endscan(sd);
     157             : 
     158         464 :     table_close(pg_lo_meta, AccessShareLock);
     159             : 
     160         464 :     return retval;
     161             : }
     162             : 
     163             : 
     164             : /*
     165             :  * Extract data field from a pg_largeobject tuple, detoasting if needed
     166             :  * and verifying that the length is sane.  Returns data pointer (a bytea *),
     167             :  * data length, and an indication of whether to pfree the data pointer.
     168             :  */
     169             : static void
     170       10236 : getdatafield(Form_pg_largeobject tuple,
     171             :              bytea **pdatafield,
     172             :              int *plen,
     173             :              bool *pfreeit)
     174             : {
     175             :     bytea      *datafield;
     176             :     int         len;
     177             :     bool        freeit;
     178             : 
     179       10236 :     datafield = &(tuple->data); /* see note at top of file */
     180       10236 :     freeit = false;
     181       10236 :     if (VARATT_IS_EXTENDED(datafield))
     182             :     {
     183             :         datafield = (bytea *)
     184       10070 :             detoast_attr((struct varlena *) datafield);
     185       10070 :         freeit = true;
     186             :     }
     187       10236 :     len = VARSIZE(datafield) - VARHDRSZ;
     188       10236 :     if (len < 0 || len > LOBLKSIZE)
     189           0 :         ereport(ERROR,
     190             :                 (errcode(ERRCODE_DATA_CORRUPTED),
     191             :                  errmsg("pg_largeobject entry for OID %u, page %d has invalid data field size %d",
     192             :                         tuple->loid, tuple->pageno, len)));
     193       10236 :     *pdatafield = datafield;
     194       10236 :     *plen = len;
     195       10236 :     *pfreeit = freeit;
     196       10236 : }
     197             : 
     198             : 
     199             : /*
     200             :  *  inv_create -- create a new large object
     201             :  *
     202             :  *  Arguments:
     203             :  *    lobjId - OID to use for new large object, or InvalidOid to pick one
     204             :  *
     205             :  *  Returns:
     206             :  *    OID of new object
     207             :  *
     208             :  * If lobjId is not InvalidOid, then an error occurs if the OID is already
     209             :  * in use.
     210             :  */
     211             : Oid
     212         112 : inv_create(Oid lobjId)
     213             : {
     214             :     Oid         lobjId_new;
     215             : 
     216             :     /*
     217             :      * Create a new largeobject with empty data pages
     218             :      */
     219         112 :     lobjId_new = LargeObjectCreate(lobjId);
     220             : 
     221             :     /*
     222             :      * dependency on the owner of largeobject
     223             :      *
     224             :      * The reason why we use LargeObjectRelationId instead of
     225             :      * LargeObjectMetadataRelationId here is to provide backward compatibility
     226             :      * to the applications which utilize a knowledge about internal layout of
     227             :      * system catalogs. OID of pg_largeobject_metadata and loid of
     228             :      * pg_largeobject are same value, so there are no actual differences here.
     229             :      */
     230         112 :     recordDependencyOnOwner(LargeObjectRelationId,
     231             :                             lobjId_new, GetUserId());
     232             : 
     233             :     /* Post creation hook for new large object */
     234         112 :     InvokeObjectPostCreateHook(LargeObjectRelationId, lobjId_new, 0);
     235             : 
     236             :     /*
     237             :      * Advance command counter to make new tuple visible to later operations.
     238             :      */
     239         112 :     CommandCounterIncrement();
     240             : 
     241         112 :     return lobjId_new;
     242             : }
     243             : 
     244             : /*
     245             :  *  inv_open -- access an existing large object.
     246             :  *
     247             :  * Returns a large object descriptor, appropriately filled in.
     248             :  * The descriptor and subsidiary data are allocated in the specified
     249             :  * memory context, which must be suitably long-lived for the caller's
     250             :  * purposes.  If the returned descriptor has a snapshot associated
     251             :  * with it, the caller must ensure that it also lives long enough,
     252             :  * e.g. by calling RegisterSnapshotOnOwner
     253             :  */
     254             : LargeObjectDesc *
     255         464 : inv_open(Oid lobjId, int flags, MemoryContext mcxt)
     256             : {
     257             :     LargeObjectDesc *retval;
     258         464 :     Snapshot    snapshot = NULL;
     259         464 :     int         descflags = 0;
     260             : 
     261             :     /*
     262             :      * Historically, no difference is made between (INV_WRITE) and (INV_WRITE
     263             :      * | INV_READ), the caller being allowed to read the large object
     264             :      * descriptor in either case.
     265             :      */
     266         464 :     if (flags & INV_WRITE)
     267         154 :         descflags |= IFS_WRLOCK | IFS_RDLOCK;
     268         464 :     if (flags & INV_READ)
     269         340 :         descflags |= IFS_RDLOCK;
     270             : 
     271         464 :     if (descflags == 0)
     272           0 :         ereport(ERROR,
     273             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     274             :                  errmsg("invalid flags for opening a large object: %d",
     275             :                         flags)));
     276             : 
     277             :     /* Get snapshot.  If write is requested, use an instantaneous snapshot. */
     278         464 :     if (descflags & IFS_WRLOCK)
     279         154 :         snapshot = NULL;
     280             :     else
     281         310 :         snapshot = GetActiveSnapshot();
     282             : 
     283             :     /* Can't use LargeObjectExists here because we need to specify snapshot */
     284         464 :     if (!myLargeObjectExists(lobjId, snapshot))
     285           4 :         ereport(ERROR,
     286             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     287             :                  errmsg("large object %u does not exist", lobjId)));
     288             : 
     289             :     /* Apply permission checks, again specifying snapshot */
     290         460 :     if ((descflags & IFS_RDLOCK) != 0)
     291             :     {
     292         902 :         if (!lo_compat_privileges &&
     293         442 :             pg_largeobject_aclcheck_snapshot(lobjId,
     294             :                                              GetUserId(),
     295             :                                              ACL_SELECT,
     296             :                                              snapshot) != ACLCHECK_OK)
     297          42 :             ereport(ERROR,
     298             :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     299             :                      errmsg("permission denied for large object %u",
     300             :                             lobjId)));
     301             :     }
     302         418 :     if ((descflags & IFS_WRLOCK) != 0)
     303             :     {
     304         248 :         if (!lo_compat_privileges &&
     305         118 :             pg_largeobject_aclcheck_snapshot(lobjId,
     306             :                                              GetUserId(),
     307             :                                              ACL_UPDATE,
     308             :                                              snapshot) != ACLCHECK_OK)
     309          12 :             ereport(ERROR,
     310             :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     311             :                      errmsg("permission denied for large object %u",
     312             :                             lobjId)));
     313             :     }
     314             : 
     315             :     /* OK to create a descriptor */
     316         406 :     retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
     317             :                                                     sizeof(LargeObjectDesc));
     318         406 :     retval->id = lobjId;
     319         406 :     retval->offset = 0;
     320         406 :     retval->flags = descflags;
     321             : 
     322             :     /* caller sets if needed, not used by the functions in this file */
     323         406 :     retval->subid = InvalidSubTransactionId;
     324             : 
     325             :     /*
     326             :      * The snapshot (if any) is just the currently active snapshot.  The
     327             :      * caller will replace it with a longer-lived copy if needed.
     328             :      */
     329         406 :     retval->snapshot = snapshot;
     330             : 
     331         406 :     return retval;
     332             : }
     333             : 
     334             : /*
     335             :  * Closes a large object descriptor previously made by inv_open(), and
     336             :  * releases the long-term memory used by it.
     337             :  */
     338             : void
     339         376 : inv_close(LargeObjectDesc *obj_desc)
     340             : {
     341             :     Assert(PointerIsValid(obj_desc));
     342         376 :     pfree(obj_desc);
     343         376 : }
     344             : 
     345             : /*
     346             :  * Destroys an existing large object (not to be confused with a descriptor!)
     347             :  *
     348             :  * Note we expect caller to have done any required permissions check.
     349             :  */
     350             : int
     351          82 : inv_drop(Oid lobjId)
     352             : {
     353             :     ObjectAddress object;
     354             : 
     355             :     /*
     356             :      * Delete any comments and dependencies on the large object
     357             :      */
     358          82 :     object.classId = LargeObjectRelationId;
     359          82 :     object.objectId = lobjId;
     360          82 :     object.objectSubId = 0;
     361          82 :     performDeletion(&object, DROP_CASCADE, 0);
     362             : 
     363             :     /*
     364             :      * Advance command counter so that tuple removal will be seen by later
     365             :      * large-object operations in this transaction.
     366             :      */
     367          82 :     CommandCounterIncrement();
     368             : 
     369             :     /* For historical reasons, we always return 1 on success. */
     370          82 :     return 1;
     371             : }
     372             : 
     373             : /*
     374             :  * Determine size of a large object
     375             :  *
     376             :  * NOTE: LOs can contain gaps, just like Unix files.  We actually return
     377             :  * the offset of the last byte + 1.
     378             :  */
     379             : static uint64
     380         104 : inv_getsize(LargeObjectDesc *obj_desc)
     381             : {
     382         104 :     uint64      lastbyte = 0;
     383             :     ScanKeyData skey[1];
     384             :     SysScanDesc sd;
     385             :     HeapTuple   tuple;
     386             : 
     387             :     Assert(PointerIsValid(obj_desc));
     388             : 
     389         104 :     open_lo_relation();
     390             : 
     391         104 :     ScanKeyInit(&skey[0],
     392             :                 Anum_pg_largeobject_loid,
     393             :                 BTEqualStrategyNumber, F_OIDEQ,
     394             :                 ObjectIdGetDatum(obj_desc->id));
     395             : 
     396         104 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     397             :                                     obj_desc->snapshot, 1, skey);
     398             : 
     399             :     /*
     400             :      * Because the pg_largeobject index is on both loid and pageno, but we
     401             :      * constrain only loid, a backwards scan should visit all pages of the
     402             :      * large object in reverse pageno order.  So, it's sufficient to examine
     403             :      * the first valid tuple (== last valid page).
     404             :      */
     405         104 :     tuple = systable_getnext_ordered(sd, BackwardScanDirection);
     406         104 :     if (HeapTupleIsValid(tuple))
     407             :     {
     408             :         Form_pg_largeobject data;
     409             :         bytea      *datafield;
     410             :         int         len;
     411             :         bool        pfreeit;
     412             : 
     413          96 :         if (HeapTupleHasNulls(tuple))   /* paranoia */
     414           0 :             elog(ERROR, "null field found in pg_largeobject");
     415          96 :         data = (Form_pg_largeobject) GETSTRUCT(tuple);
     416          96 :         getdatafield(data, &datafield, &len, &pfreeit);
     417          96 :         lastbyte = (uint64) data->pageno * LOBLKSIZE + len;
     418          96 :         if (pfreeit)
     419          18 :             pfree(datafield);
     420             :     }
     421             : 
     422         104 :     systable_endscan_ordered(sd);
     423             : 
     424         104 :     return lastbyte;
     425             : }
     426             : 
     427             : int64
     428         220 : inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
     429             : {
     430             :     int64       newoffset;
     431             : 
     432             :     Assert(PointerIsValid(obj_desc));
     433             : 
     434             :     /*
     435             :      * We allow seek/tell if you have either read or write permission, so no
     436             :      * need for a permission check here.
     437             :      */
     438             : 
     439             :     /*
     440             :      * Note: overflow in the additions is possible, but since we will reject
     441             :      * negative results, we don't need any extra test for that.
     442             :      */
     443         220 :     switch (whence)
     444             :     {
     445          98 :         case SEEK_SET:
     446          98 :             newoffset = offset;
     447          98 :             break;
     448          18 :         case SEEK_CUR:
     449          18 :             newoffset = obj_desc->offset + offset;
     450          18 :             break;
     451         104 :         case SEEK_END:
     452         104 :             newoffset = inv_getsize(obj_desc) + offset;
     453         104 :             break;
     454           0 :         default:
     455           0 :             ereport(ERROR,
     456             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     457             :                      errmsg("invalid whence setting: %d", whence)));
     458             :             newoffset = 0;      /* keep compiler quiet */
     459             :             break;
     460             :     }
     461             : 
     462             :     /*
     463             :      * use errmsg_internal here because we don't want to expose INT64_FORMAT
     464             :      * in translatable strings; doing better is not worth the trouble
     465             :      */
     466         220 :     if (newoffset < 0 || newoffset > MAX_LARGE_OBJECT_SIZE)
     467           0 :         ereport(ERROR,
     468             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     469             :                  errmsg_internal("invalid large object seek target: " INT64_FORMAT,
     470             :                                  newoffset)));
     471             : 
     472         220 :     obj_desc->offset = newoffset;
     473         220 :     return newoffset;
     474             : }
     475             : 
     476             : int64
     477          48 : inv_tell(LargeObjectDesc *obj_desc)
     478             : {
     479             :     Assert(PointerIsValid(obj_desc));
     480             : 
     481             :     /*
     482             :      * We allow seek/tell if you have either read or write permission, so no
     483             :      * need for a permission check here.
     484             :      */
     485             : 
     486          48 :     return obj_desc->offset;
     487             : }
     488             : 
     489             : int
     490        1368 : inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
     491             : {
     492        1368 :     int         nread = 0;
     493             :     int64       n;
     494             :     int64       off;
     495             :     int         len;
     496        1368 :     int32       pageno = (int32) (obj_desc->offset / LOBLKSIZE);
     497             :     uint64      pageoff;
     498             :     ScanKeyData skey[2];
     499             :     SysScanDesc sd;
     500             :     HeapTuple   tuple;
     501             : 
     502             :     Assert(PointerIsValid(obj_desc));
     503             :     Assert(buf != NULL);
     504             : 
     505        1368 :     if ((obj_desc->flags & IFS_RDLOCK) == 0)
     506           0 :         ereport(ERROR,
     507             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     508             :                  errmsg("permission denied for large object %u",
     509             :                         obj_desc->id)));
     510             : 
     511        1368 :     if (nbytes <= 0)
     512           8 :         return 0;
     513             : 
     514        1360 :     open_lo_relation();
     515             : 
     516        1360 :     ScanKeyInit(&skey[0],
     517             :                 Anum_pg_largeobject_loid,
     518             :                 BTEqualStrategyNumber, F_OIDEQ,
     519             :                 ObjectIdGetDatum(obj_desc->id));
     520             : 
     521        1360 :     ScanKeyInit(&skey[1],
     522             :                 Anum_pg_largeobject_pageno,
     523             :                 BTGreaterEqualStrategyNumber, F_INT4GE,
     524             :                 Int32GetDatum(pageno));
     525             : 
     526        1360 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     527             :                                     obj_desc->snapshot, 2, skey);
     528             : 
     529       10408 :     while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     530             :     {
     531             :         Form_pg_largeobject data;
     532             :         bytea      *datafield;
     533             :         bool        pfreeit;
     534             : 
     535       10110 :         if (HeapTupleHasNulls(tuple))   /* paranoia */
     536           0 :             elog(ERROR, "null field found in pg_largeobject");
     537       10110 :         data = (Form_pg_largeobject) GETSTRUCT(tuple);
     538             : 
     539             :         /*
     540             :          * We expect the indexscan will deliver pages in order.  However,
     541             :          * there may be missing pages if the LO contains unwritten "holes". We
     542             :          * want missing sections to read out as zeroes.
     543             :          */
     544       10110 :         pageoff = ((uint64) data->pageno) * LOBLKSIZE;
     545       10110 :         if (pageoff > obj_desc->offset)
     546             :         {
     547          12 :             n = pageoff - obj_desc->offset;
     548          12 :             n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
     549          12 :             MemSet(buf + nread, 0, n);
     550          12 :             nread += n;
     551          12 :             obj_desc->offset += n;
     552             :         }
     553             : 
     554       10110 :         if (nread < nbytes)
     555             :         {
     556             :             Assert(obj_desc->offset >= pageoff);
     557       10104 :             off = (int) (obj_desc->offset - pageoff);
     558             :             Assert(off >= 0 && off < LOBLKSIZE);
     559             : 
     560       10104 :             getdatafield(data, &datafield, &len, &pfreeit);
     561       10104 :             if (len > off)
     562             :             {
     563       10008 :                 n = len - off;
     564       10008 :                 n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
     565       10008 :                 memcpy(buf + nread, VARDATA(datafield) + off, n);
     566       10008 :                 nread += n;
     567       10008 :                 obj_desc->offset += n;
     568             :             }
     569       10104 :             if (pfreeit)
     570       10028 :                 pfree(datafield);
     571             :         }
     572             : 
     573       10110 :         if (nread >= nbytes)
     574        1062 :             break;
     575             :     }
     576             : 
     577        1360 :     systable_endscan_ordered(sd);
     578             : 
     579        1360 :     return nread;
     580             : }
     581             : /*
                     :  * inv_write -- write nbytes from buf into a large object, starting at
                     :  * obj_desc->offset.
                     :  *
                     :  * The data is stored one LOBLKSIZE page at a time in lo_heap_r: a page that
                     :  * already exists is loaded into a work buffer, overlaid with the new bytes
                     :  * and updated; a page that does not exist is inserted as a new tuple.  Any
                     :  * gap between the end of a page's old data (or the start of a new page)
                     :  * and the write position is zero-filled.
                     :  *
                     :  * obj_desc->offset is advanced by the number of bytes written, and that
                     :  * count is returned (0 if nbytes <= 0).  Raises an error if
                     :  * obj_desc->flags lacks IFS_WRLOCK, or if nbytes + obj_desc->offset
                     :  * exceeds MAX_LARGE_OBJECT_SIZE.
                     :  */
     582             : int
     583        1552 : inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
     584             : {
     585        1552 :     int         nwritten = 0;   /* bytes consumed from buf so far */
     586             :     int         n;              /* bytes to copy into the current page */
     587             :     int         off;            /* write position within the current page */
     588             :     int         len;            /* valid data length of the current page */
     589        1552 :     int32       pageno = (int32) (obj_desc->offset / LOBLKSIZE);
     590             :     ScanKeyData skey[2];
     591             :     SysScanDesc sd;
     592             :     HeapTuple   oldtuple;
     593             :     Form_pg_largeobject olddata;
     594             :     bool        neednextpage;   /* fetch another tuple from the scan? */
     595             :     bytea      *datafield;
     596             :     bool        pfreeit;
     597             :     union
     598             :     {
     599             :         bytea       hdr;
     600             :         /* this is to make the union big enough for a LO data chunk: */
     601             :         char        data[LOBLKSIZE + VARHDRSZ];
     602             :         /* ensure union is aligned well enough: */
     603             :         int32       align_it;
     604             :     }           workbuf;
     605        1552 :     char       *workb = VARDATA(&workbuf.hdr);
     606             :     HeapTuple   newtup;
     607             :     Datum       values[Natts_pg_largeobject];
     608             :     bool        nulls[Natts_pg_largeobject];
     609             :     bool        replace[Natts_pg_largeobject];
     610             :     CatalogIndexState indstate;
     611             : 
     612             :     Assert(PointerIsValid(obj_desc));
     613             :     Assert(buf != NULL);
     614             : 
     615             :     /* enforce writability because snapshot is probably wrong otherwise */
     616        1552 :     if ((obj_desc->flags & IFS_WRLOCK) == 0)
     617           0 :         ereport(ERROR,
     618             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     619             :                  errmsg("permission denied for large object %u",
     620             :                         obj_desc->id)));
     621             : 
     622        1552 :     if (nbytes <= 0)
     623           0 :         return 0;
     624             : 
     625             :     /* this addition can't overflow because nbytes is only int32 */
     626        1552 :     if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
     627           0 :         ereport(ERROR,
     628             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     629             :                  errmsg("invalid large object write request size: %d",
     630             :                         nbytes)));
     631             : 
     632        1552 :     open_lo_relation();
     633             : 
     634        1552 :     indstate = CatalogOpenIndexes(lo_heap_r);
     635             :     /* Scan keys: loid = obj_desc->id AND pageno >= page of current offset */
     636        1552 :     ScanKeyInit(&skey[0],
     637             :                 Anum_pg_largeobject_loid,
     638             :                 BTEqualStrategyNumber, F_OIDEQ,
     639             :                 ObjectIdGetDatum(obj_desc->id));
     640             : 
     641        1552 :     ScanKeyInit(&skey[1],
     642             :                 Anum_pg_largeobject_pageno,
     643             :                 BTGreaterEqualStrategyNumber, F_INT4GE,
     644             :                 Int32GetDatum(pageno));
     645             : 
     646        1552 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     647             :                                     obj_desc->snapshot, 2, skey);
     648             : 
     649        1552 :     oldtuple = NULL;
     650        1552 :     olddata = NULL;
     651        1552 :     neednextpage = true;
     652             : 
     653        9500 :     while (nwritten < nbytes)
     654             :     {
     655             :         /*
     656             :          * If possible, get next pre-existing page of the LO.  We expect the
     657             :          * indexscan will deliver these in order --- but there may be holes.
     658             :          */
     659        7948 :         if (neednextpage)
     660             :         {
     661        1558 :             if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     662             :             {
     663          24 :                 if (HeapTupleHasNulls(oldtuple))    /* paranoia */
     664           0 :                     elog(ERROR, "null field found in pg_largeobject");
     665          24 :                 olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
     666             :                 Assert(olddata->pageno >= pageno);
     667             :             }
     668        1558 :             neednextpage = false;
     669             :         }
     670             : 
     671             :         /*
     672             :          * If we have a pre-existing page, see if it is the page we want to
     673             :          * write, or a later one.  A later one is kept in olddata until pageno
     674             :          * catches up with it; the pages before it are written as new pages.
     675             :          */
     676        7948 :         if (olddata != NULL && olddata->pageno == pageno)
     677             :         {
     678             :             /*
     679             :              * Update an existing page with fresh data.
     680             :              *
     681             :              * First, load old data into workbuf
     682             :              */
     683          24 :             getdatafield(olddata, &datafield, &len, &pfreeit);
     684          24 :             memcpy(workb, VARDATA(datafield), len);
     685          24 :             if (pfreeit)
     686          18 :                 pfree(datafield);
     687             : 
     688             :             /*
     689             :              * Fill any hole between the old end of data and the write position
     690             :              */
     691          24 :             off = (int) (obj_desc->offset % LOBLKSIZE);
     692          24 :             if (off > len)
     693           0 :                 MemSet(workb + len, 0, off - len);
     694             : 
     695             :             /*
     696             :              * Insert appropriate portion of new data
     697             :              */
     698          24 :             n = LOBLKSIZE - off;
     699          24 :             n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
     700          24 :             memcpy(workb + off, buf + nwritten, n);
     701          24 :             nwritten += n;
     702          24 :             obj_desc->offset += n;
     703          24 :             off += n;
     704             :             /* compute valid length of new page */
     705          24 :             len = (len >= off) ? len : off;
     706          24 :             SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
     707             : 
     708             :             /*
     709             :              * Form and insert updated tuple
     710             :              */
     711          24 :             memset(values, 0, sizeof(values));
     712          24 :             memset(nulls, false, sizeof(nulls));
     713          24 :             memset(replace, false, sizeof(replace));
     714          24 :             values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     715          24 :             replace[Anum_pg_largeobject_data - 1] = true;
     716          24 :             newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
     717             :                                        values, nulls, replace);
     718          24 :             CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
     719             :                                        indstate);
     720          24 :             heap_freetuple(newtup);
     721             : 
     722             :             /*
     723             :              * We're done with this old page.
     724             :              */
     725          24 :             oldtuple = NULL;
     726          24 :             olddata = NULL;
     727          24 :             neednextpage = true;
     728             :         }
     729             :         else
     730             :         {
     731             :             /*
     732             :              * Write a brand new page.
     733             :              *
     734             :              * First, zero-fill the part of the page before the write position
     735             :              */
     736        7924 :             off = (int) (obj_desc->offset % LOBLKSIZE);
     737        7924 :             if (off > 0)
     738           6 :                 MemSet(workb, 0, off);
     739             : 
     740             :             /*
     741             :              * Insert appropriate portion of new data
     742             :              */
     743        7924 :             n = LOBLKSIZE - off;
     744        7924 :             n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
     745        7924 :             memcpy(workb + off, buf + nwritten, n);
     746        7924 :             nwritten += n;
     747        7924 :             obj_desc->offset += n;
     748             :             /* compute valid length of new page */
     749        7924 :             len = off + n;
     750        7924 :             SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
     751             : 
     752             :             /*
     753             :              * Form and insert new tuple
     754             :              */
     755        7924 :             memset(values, 0, sizeof(values));
     756        7924 :             memset(nulls, false, sizeof(nulls));
     757        7924 :             values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
     758        7924 :             values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
     759        7924 :             values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     760        7924 :             newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
     761        7924 :             CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
     762        7924 :             heap_freetuple(newtup);
     763             :         }
     764        7948 :         pageno++;               /* if we loop again, this page was filled to its end */
     765             :     }
     766             : 
     767        1552 :     systable_endscan_ordered(sd);
     768             : 
     769        1552 :     CatalogCloseIndexes(indstate);
     770             : 
     771             :     /*
     772             :      * Advance command counter so that my tuple updates will be seen by later
     773             :      * large-object operations in this transaction.
     774             :      */
     775        1552 :     CommandCounterIncrement();
     776             : 
     777        1552 :     return nwritten;
     778             : }
     778             : /*
                     :  * inv_truncate -- truncate (or extend) a large object so that its data
                     :  * ends at byte offset len.
                     :  *
                     :  * The page containing the truncation point is rewritten so that its data
                     :  * ends at that point, zero-filling if its old data was shorter.  If no such
                     :  * page exists, a new page of zeroes ending at the truncation point is
                     :  * inserted instead.  Every page found after the truncation point is
                     :  * deleted.
                     :  *
                     :  * Raises an error if obj_desc->flags lacks IFS_WRLOCK, or if len is
                     :  * negative or greater than MAX_LARGE_OBJECT_SIZE.
                     :  */
     779             : void
     780          42 : inv_truncate(LargeObjectDesc *obj_desc, int64 len)
     781             : {
     782          42 :     int32       pageno = (int32) (len / LOBLKSIZE);
     783             :     int32       off;            /* truncation point's offset within its page */
     784             :     ScanKeyData skey[2];
     785             :     SysScanDesc sd;
     786             :     HeapTuple   oldtuple;
     787             :     Form_pg_largeobject olddata;
     788             :     union
     789             :     {
     790             :         bytea       hdr;
     791             :         /* this is to make the union big enough for a LO data chunk: */
     792             :         char        data[LOBLKSIZE + VARHDRSZ];
     793             :         /* ensure union is aligned well enough: */
     794             :         int32       align_it;
     795             :     }           workbuf;
     796          42 :     char       *workb = VARDATA(&workbuf.hdr);
     797             :     HeapTuple   newtup;
     798             :     Datum       values[Natts_pg_largeobject];
     799             :     bool        nulls[Natts_pg_largeobject];
     800             :     bool        replace[Natts_pg_largeobject];
     801             :     CatalogIndexState indstate;
     802             : 
     803             :     Assert(PointerIsValid(obj_desc));
     804             : 
     805             :     /* enforce writability because snapshot is probably wrong otherwise */
     806          42 :     if ((obj_desc->flags & IFS_WRLOCK) == 0)
     807           0 :         ereport(ERROR,
     808             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     809             :                  errmsg("permission denied for large object %u",
     810             :                         obj_desc->id)));
     811             : 
     812             :     /*
     813             :      * use errmsg_internal here because we don't want to expose INT64_FORMAT
     814             :      * in translatable strings; doing better is not worth the trouble
     815             :      */
     816          42 :     if (len < 0 || len > MAX_LARGE_OBJECT_SIZE)
     817           0 :         ereport(ERROR,
     818             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     819             :                  errmsg_internal("invalid large object truncation target: " INT64_FORMAT,
     820             :                                  len)));
     821             : 
     822          42 :     open_lo_relation();
     823             : 
     824          42 :     indstate = CatalogOpenIndexes(lo_heap_r);
     825             : 
     826             :     /*
     827             :      * Set up to find all pages with desired loid and pageno >= target
     828             :      */
     829          42 :     ScanKeyInit(&skey[0],
     830             :                 Anum_pg_largeobject_loid,
     831             :                 BTEqualStrategyNumber, F_OIDEQ,
     832             :                 ObjectIdGetDatum(obj_desc->id));
     833             : 
     834          42 :     ScanKeyInit(&skey[1],
     835             :                 Anum_pg_largeobject_pageno,
     836             :                 BTGreaterEqualStrategyNumber, F_INT4GE,
     837             :                 Int32GetDatum(pageno));
     838             : 
     839          42 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     840             :                                     obj_desc->snapshot, 2, skey);
     841             : 
     842             :     /*
     843             :      * If possible, get the page the truncation point is in. The truncation
     844             :      * point may be beyond the end of the LO or in a hole.
     845             :      */
     846          42 :     olddata = NULL;
     847          42 :     if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     848             :     {
     849          24 :         if (HeapTupleHasNulls(oldtuple))    /* paranoia */
     850           0 :             elog(ERROR, "null field found in pg_largeobject");
     851          24 :         olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
     852             :         Assert(olddata->pageno >= pageno);
     853             :     }
     854             : 
     855             :     /*
     856             :      * If we found the page of the truncation point we need to truncate the
     857             :      * data in it.  Otherwise if we're in a hole, we need to create a page to
     858             :      * mark the end of data.
     859             :      */
     860          42 :     if (olddata != NULL && olddata->pageno == pageno)
     861          12 :     {
     862             :         /* First, load old data into workbuf */
     863             :         bytea      *datafield;
     864             :         int         pagelen;
     865             :         bool        pfreeit;
     866             : 
     867          12 :         getdatafield(olddata, &datafield, &pagelen, &pfreeit);
     868          12 :         memcpy(workb, VARDATA(datafield), pagelen);
     869          12 :         if (pfreeit)
     870           6 :             pfree(datafield);
     871             : 
     872             :         /*
     873             :          * Fill any hole (truncation point lies beyond the page's old data)
     874             :          */
     875          12 :         off = len % LOBLKSIZE;
     876          12 :         if (off > pagelen)
     877           6 :             MemSet(workb + pagelen, 0, off - pagelen);
     878             : 
     879             :         /* compute length of new page */
     880          12 :         SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
     881             : 
     882             :         /*
     883             :          * Form and insert updated tuple
     884             :          */
     885          12 :         memset(values, 0, sizeof(values));
     886          12 :         memset(nulls, false, sizeof(nulls));
     887          12 :         memset(replace, false, sizeof(replace));
     888          12 :         values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     889          12 :         replace[Anum_pg_largeobject_data - 1] = true;
     890          12 :         newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
     891             :                                    values, nulls, replace);
     892          12 :         CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
     893             :                                    indstate);
     894          12 :         heap_freetuple(newtup);
     895             :     }
     896             :     else
     897             :     {
     898             :         /*
     899             :          * If the first page we found was after the truncation point, we're in
     900             :          * a hole that we'll fill, but we need to delete the later page
     901             :          * because the loop below won't visit it again.
     902             :          */
     903          30 :         if (olddata != NULL)
     904             :         {
     905             :             Assert(olddata->pageno > pageno);
     906          12 :             CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
     907             :         }
     908             : 
     909             :         /*
     910             :          * Write a brand new page.
     911             :          *
     912             :          * Fill the hole up to the truncation point
     913             :          */
     914          30 :         off = len % LOBLKSIZE;
     915          30 :         if (off > 0)
     916          30 :             MemSet(workb, 0, off);
     917             : 
     918             :         /* compute length of new page (no data bytes at all if off is 0) */
     919          30 :         SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
     920             : 
     921             :         /*
     922             :          * Form and insert new tuple
     923             :          */
     924          30 :         memset(values, 0, sizeof(values));
     925          30 :         memset(nulls, false, sizeof(nulls));
     926          30 :         values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
     927          30 :         values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
     928          30 :         values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     929          30 :         newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
     930          30 :         CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
     931          30 :         heap_freetuple(newtup);
     932             :     }
     933             : 
     934             :     /*
     935             :      * Delete any pages after the truncation point.  If the initial search
     936             :      * didn't find a page, then of course there's nothing more to do.
     937             :      */
     938          42 :     if (olddata != NULL)
     939             :     {
     940          30 :         while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     941             :         {
     942           6 :             CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
     943             :         }
     944             :     }
     945             : 
     946          42 :     systable_endscan_ordered(sd);
     947             : 
     948          42 :     CatalogCloseIndexes(indstate);
     949             : 
     950             :     /*
     951             :      * Advance command counter so that tuple updates will be seen by later
     952             :      * large-object operations in this transaction.
     953             :      */
     954          42 :     CommandCounterIncrement();
     955          42 : }

Generated by: LCOV version 1.14