LCOV - code coverage report
Current view: top level - src/backend/storage/large_object - inv_api.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17beta1 Lines: 279 295 94.6 %
Date: 2024-06-14 19:10:57 Functions: 14 14 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * inv_api.c
       4             :  *    routines for manipulating inversion fs large objects. This file
       5             :  *    contains the user-level large object application interface routines.
       6             :  *
       7             :  *
       8             :  * Note: we access pg_largeobject.data using its C struct declaration.
       9             :  * This is safe because it immediately follows pageno which is an int4 field,
      10             :  * and therefore the data field will always be 4-byte aligned, even if it
      11             :  * is in the short 1-byte-header format.  We have to detoast it since it's
      12             :  * quite likely to be in compressed or short format.  We also need to check
      13             :  * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
      14             :  *
      15             :  * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
      16             :  * does most of the backend code.  We expect that CurrentMemoryContext will
      17             :  * be a short-lived context.  Data that must persist across function calls
      18             :  * is kept either in CacheMemoryContext (the Relation structs) or in the
      19             :  * memory context given to inv_open (for LargeObjectDesc structs).
      20             :  *
      21             :  *
      22             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
      23             :  * Portions Copyright (c) 1994, Regents of the University of California
      24             :  *
      25             :  *
      26             :  * IDENTIFICATION
      27             :  *    src/backend/storage/large_object/inv_api.c
      28             :  *
      29             :  *-------------------------------------------------------------------------
      30             :  */
      31             : #include "postgres.h"
      32             : 
      33             : #include <limits.h>
      34             : 
      35             : #include "access/detoast.h"
      36             : #include "access/genam.h"
      37             : #include "access/htup_details.h"
      38             : #include "access/table.h"
      39             : #include "access/xact.h"
      40             : #include "catalog/dependency.h"
      41             : #include "catalog/indexing.h"
      42             : #include "catalog/objectaccess.h"
      43             : #include "catalog/pg_largeobject.h"
      44             : #include "catalog/pg_largeobject_metadata.h"
      45             : #include "libpq/libpq-fs.h"
      46             : #include "miscadmin.h"
      47             : #include "storage/large_object.h"
      48             : #include "utils/acl.h"
      49             : #include "utils/fmgroids.h"
      50             : #include "utils/rel.h"
      51             : #include "utils/snapmgr.h"
      52             : 
      53             : 
      54             : /*
      55             :  * GUC: backwards-compatibility flag to suppress LO permission checks
                     :  *
                     :  * When this is true, inv_open() skips its ACL_SELECT and ACL_UPDATE checks.
      56             :  */
      57             : bool        lo_compat_privileges;
      58             : 
      59             : /*
      60             :  * All accesses to pg_largeobject and its index make use of a single
      61             :  * Relation reference.  To guarantee that the relcache entry remains
      62             :  * in the cache, on the first reference inside a subtransaction, we
      63             :  * execute a slightly klugy maneuver to assign ownership of the
      64             :  * Relation reference to TopTransactionResourceOwner.
                     :  *
                     :  * Both pointers are assigned by open_lo_relation() and reset to NULL by
                     :  * close_lo_relation(); NULL means the relation is not currently open.
      65             :  */
      66             : static Relation lo_heap_r = NULL;
      67             : static Relation lo_index_r = NULL;
      68             : 
      69             : 
      70             : /*
      71             :  * Open pg_largeobject and its index, if not already done in current xact
                     :  *
                     :  * Assigns lo_heap_r and/or lo_index_r when they are still NULL; does
                     :  * nothing if both are already set.  CurrentResourceOwner is switched to
                     :  * TopTransactionResourceOwner only around the open calls and is put back
                     :  * to the caller's value before returning.
      72             :  */
      73             : static void
      74        3058 : open_lo_relation(void)
      75             : {
      76             :     ResourceOwner currentOwner;
      77             : 
      78        3058 :     if (lo_heap_r && lo_index_r)
      79        2766 :         return;                 /* already open in current xact */
      80             : 
      81             :     /* Arrange for the top xact to own these relation references */
      82         292 :     currentOwner = CurrentResourceOwner;
      83         292 :     CurrentResourceOwner = TopTransactionResourceOwner;
      84             : 
      85             :     /* Use RowExclusiveLock since we might either read or write */
      86         292 :     if (lo_heap_r == NULL)
      87         292 :         lo_heap_r = table_open(LargeObjectRelationId, RowExclusiveLock);
      88         292 :     if (lo_index_r == NULL)
      89         292 :         lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
      90             : 
      91         292 :     CurrentResourceOwner = currentOwner;
      92             : }
      93             : 
      94             : /*
      95             :  * Clean up at main transaction end
                     :  *
                     :  * isCommit: true when the transaction is committing.  In that case the
                     :  * index and heap references are closed here, passing NoLock, with
                     :  * TopTransactionResourceOwner temporarily installed as the current owner.
                     :  * When isCommit is false nothing is closed here.  Either way the static
                     :  * pointers are reset to NULL, so the next open_lo_relation() call reopens
                     :  * the relations.
      96             :  */
      97             : void
      98         442 : close_lo_relation(bool isCommit)
      99             : {
     100         442 :     if (lo_heap_r || lo_index_r)
     101             :     {
     102             :         /*
     103             :          * Only bother to close if committing; else abort cleanup will handle
     104             :          * it
     105             :          */
     106         292 :         if (isCommit)
     107             :         {
     108             :             ResourceOwner currentOwner;
     109             : 
     110         210 :             currentOwner = CurrentResourceOwner;
     111         210 :             CurrentResourceOwner = TopTransactionResourceOwner;
     112             : 
     113         210 :             if (lo_index_r)
     114         210 :                 index_close(lo_index_r, NoLock);
     115         210 :             if (lo_heap_r)
     116         210 :                 table_close(lo_heap_r, NoLock);
     117             : 
     118         210 :             CurrentResourceOwner = currentOwner;
     119             :         }
     120         292 :         lo_heap_r = NULL;
     121         292 :         lo_index_r = NULL;
     122             :     }
     123         442 : }
     124             : 
     125             : 
     126             : /*
     127             :  * Same as pg_largeobject.c's LargeObjectExists(), except snapshot to
     128             :  * read with can be specified.
                     :  *
                     :  * Returns true if an index scan of pg_largeobject_metadata for
                     :  * oid = loid, run with the given snapshot, yields a tuple; false
                     :  * otherwise.  snapshot may be NULL (inv_open passes NULL when opening
                     :  * for write).  The catalog is opened and closed with AccessShareLock
                     :  * inside this call.
     129             :  */
     130             : static bool
     131         464 : myLargeObjectExists(Oid loid, Snapshot snapshot)
     132             : {
     133             :     Relation    pg_lo_meta;
     134             :     ScanKeyData skey[1];
     135             :     SysScanDesc sd;
     136             :     HeapTuple   tuple;
     137         464 :     bool        retval = false;
     138             : 
     139         464 :     ScanKeyInit(&skey[0],
     140             :                 Anum_pg_largeobject_metadata_oid,
     141             :                 BTEqualStrategyNumber, F_OIDEQ,
     142             :                 ObjectIdGetDatum(loid));
     143             : 
     144         464 :     pg_lo_meta = table_open(LargeObjectMetadataRelationId,
     145             :                             AccessShareLock);
     146             : 
     147         464 :     sd = systable_beginscan(pg_lo_meta,
     148             :                             LargeObjectMetadataOidIndexId, true,
     149             :                             snapshot, 1, skey);
     150             : 
     151         464 :     tuple = systable_getnext(sd);
     152         464 :     if (HeapTupleIsValid(tuple))
     153         460 :         retval = true;
     154             : 
     155         464 :     systable_endscan(sd);
     156             : 
     157         464 :     table_close(pg_lo_meta, AccessShareLock);
     158             : 
     159         464 :     return retval;
     160             : }
     161             : 
     162             : 
     163             : /*
     164             :  * Extract data field from a pg_largeobject tuple, detoasting if needed
     165             :  * and verifying that the length is sane.  Returns data pointer (a bytea *),
     166             :  * data length, and an indication of whether to pfree the data pointer.
                     :  *
                     :  *  tuple      - pg_largeobject row to examine
                     :  *  pdatafield - receives the data pointer: the result of detoast_attr()
                     :  *               when the stored value is in extended form, otherwise a
                     :  *               pointer into the tuple itself
                     :  *  plen       - receives the payload length, i.e. VARSIZE minus VARHDRSZ
                     :  *  pfreeit    - receives true only when detoast_attr() was called, in
                     :  *               which case the caller should pfree *pdatafield when done
                     :  *
                     :  * Raises ERRCODE_DATA_CORRUPTED if the length is negative or exceeds
                     :  * LOBLKSIZE; the output parameters are not written in that case.
     167             :  */
     168             : static void
     169       10236 : getdatafield(Form_pg_largeobject tuple,
     170             :              bytea **pdatafield,
     171             :              int *plen,
     172             :              bool *pfreeit)
     173             : {
     174             :     bytea      *datafield;
     175             :     int         len;
     176             :     bool        freeit;
     177             : 
     178       10236 :     datafield = &(tuple->data); /* see note at top of file */
     179       10236 :     freeit = false;
     180       10236 :     if (VARATT_IS_EXTENDED(datafield))
     181             :     {
     182             :         datafield = (bytea *)
     183       10070 :             detoast_attr((struct varlena *) datafield);
     184       10070 :         freeit = true;
     185             :     }
     186       10236 :     len = VARSIZE(datafield) - VARHDRSZ;
     187       10236 :     if (len < 0 || len > LOBLKSIZE)
     188           0 :         ereport(ERROR,
     189             :                 (errcode(ERRCODE_DATA_CORRUPTED),
     190             :                  errmsg("pg_largeobject entry for OID %u, page %d has invalid data field size %d",
     191             :                         tuple->loid, tuple->pageno, len)));
     192       10236 :     *pdatafield = datafield;
     193       10236 :     *plen = len;
     194       10236 :     *pfreeit = freeit;
     195       10236 : }
     196             : 
     197             : 
     198             : /*
     199             :  *  inv_create -- create a new large object
     200             :  *
     201             :  *  Arguments:
     202             :  *    lobjId - OID to use for new large object, or InvalidOid to pick one
     203             :  *
     204             :  *  Returns:
     205             :  *    OID of new object
     206             :  *
     207             :  * If lobjId is not InvalidOid, then an error occurs if the OID is already
     208             :  * in use.
                     :  *
                     :  * Besides creating the object, this records an owner dependency for the
                     :  * current user (GetUserId()), invokes the post-create object access
                     :  * hook, and finishes with a CommandCounterIncrement().
     209             :  */
     210             : Oid
     211         112 : inv_create(Oid lobjId)
     212             : {
     213             :     Oid         lobjId_new;
     214             : 
     215             :     /*
     216             :      * Create a new largeobject with empty data pages
     217             :      */
     218         112 :     lobjId_new = LargeObjectCreate(lobjId);
     219             : 
     220             :     /*
     221             :      * dependency on the owner of largeobject
     222             :      *
     223             :      * Note that LO dependencies are recorded using classId
     224             :      * LargeObjectRelationId for backwards-compatibility reasons.  Using
     225             :      * LargeObjectMetadataRelationId instead would simplify matters for the
     226             :      * backend, but it'd complicate pg_dump and possibly break other clients.
     227             :      */
     228         112 :     recordDependencyOnOwner(LargeObjectRelationId,
     229             :                             lobjId_new, GetUserId());
     230             : 
     231             :     /* Post creation hook for new large object */
     232         112 :     InvokeObjectPostCreateHook(LargeObjectRelationId, lobjId_new, 0);
     233             : 
     234             :     /*
     235             :      * Advance command counter to make new tuple visible to later operations.
     236             :      */
     237         112 :     CommandCounterIncrement();
     238             : 
     239         112 :     return lobjId_new;
     240             : }
     241             : 
     242             : /*
     243             :  *  inv_open -- access an existing large object.
     244             :  *
     245             :  * Returns a large object descriptor, appropriately filled in.
     246             :  * The descriptor and subsidiary data are allocated in the specified
     247             :  * memory context, which must be suitably long-lived for the caller's
     248             :  * purposes.  If the returned descriptor has a snapshot associated
     249             :  * with it, the caller must ensure that it also lives long enough,
     250             :  * e.g. by calling RegisterSnapshotOnOwner
                     :  *
                     :  * flags must include INV_READ and/or INV_WRITE, else an error is raised.
                     :  * INV_WRITE yields a descriptor marked for both reading and writing and
                     :  * uses a NULL snapshot for the existence and permission lookups; a
                     :  * read-only open uses the active snapshot.  Errors are also raised if the
                     :  * large object does not exist, or (unless lo_compat_privileges is set)
                     :  * if the current user lacks SELECT privilege on it, or lacks UPDATE
                     :  * privilege when opening for write.
                     :  *
                     :  * The new descriptor starts at offset 0 with subid set to
                     :  * InvalidSubTransactionId.
     251             :  */
     252             : LargeObjectDesc *
     253         464 : inv_open(Oid lobjId, int flags, MemoryContext mcxt)
     254             : {
     255             :     LargeObjectDesc *retval;
     256         464 :     Snapshot    snapshot = NULL;
     257         464 :     int         descflags = 0;
     258             : 
     259             :     /*
     260             :      * Historically, no difference is made between (INV_WRITE) and (INV_WRITE
     261             :      * | INV_READ), the caller being allowed to read the large object
     262             :      * descriptor in either case.
     263             :      */
     264         464 :     if (flags & INV_WRITE)
     265         154 :         descflags |= IFS_WRLOCK | IFS_RDLOCK;
     266         464 :     if (flags & INV_READ)
     267         340 :         descflags |= IFS_RDLOCK;
     268             : 
     269         464 :     if (descflags == 0)
     270           0 :         ereport(ERROR,
     271             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     272             :                  errmsg("invalid flags for opening a large object: %d",
     273             :                         flags)));
     274             : 
     275             :     /* Get snapshot.  If write is requested, use an instantaneous snapshot. */
     276         464 :     if (descflags & IFS_WRLOCK)
     277         154 :         snapshot = NULL;
     278             :     else
     279         310 :         snapshot = GetActiveSnapshot();
     280             : 
     281             :     /* Can't use LargeObjectExists here because we need to specify snapshot */
     282         464 :     if (!myLargeObjectExists(lobjId, snapshot))
     283           4 :         ereport(ERROR,
     284             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     285             :                  errmsg("large object %u does not exist", lobjId)));
     286             : 
     287             :     /* Apply permission checks, again specifying snapshot */
     288         460 :     if ((descflags & IFS_RDLOCK) != 0)
     289             :     {
     290         902 :         if (!lo_compat_privileges &&
     291         442 :             pg_largeobject_aclcheck_snapshot(lobjId,
     292             :                                              GetUserId(),
     293             :                                              ACL_SELECT,
     294             :                                              snapshot) != ACLCHECK_OK)
     295          42 :             ereport(ERROR,
     296             :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     297             :                      errmsg("permission denied for large object %u",
     298             :                             lobjId)));
     299             :     }
     300         418 :     if ((descflags & IFS_WRLOCK) != 0)
     301             :     {
     302         248 :         if (!lo_compat_privileges &&
     303         118 :             pg_largeobject_aclcheck_snapshot(lobjId,
     304             :                                              GetUserId(),
     305             :                                              ACL_UPDATE,
     306             :                                              snapshot) != ACLCHECK_OK)
     307          12 :             ereport(ERROR,
     308             :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     309             :                      errmsg("permission denied for large object %u",
     310             :                             lobjId)));
     311             :     }
     312             : 
     313             :     /* OK to create a descriptor */
     314         406 :     retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
     315             :                                                     sizeof(LargeObjectDesc));
     316         406 :     retval->id = lobjId;
     317         406 :     retval->offset = 0;
     318         406 :     retval->flags = descflags;
     319             : 
     320             :     /* caller sets if needed, not used by the functions in this file */
     321         406 :     retval->subid = InvalidSubTransactionId;
     322             : 
     323             :     /*
     324             :      * The snapshot (if any) is just the currently active snapshot.  The
     325             :      * caller will replace it with a longer-lived copy if needed.
     326             :      */
     327         406 :     retval->snapshot = snapshot;
     328             : 
     329         406 :     return retval;
     330             : }
     331             : 
     332             : /*
     333             :  * Closes a large object descriptor previously made by inv_open(), and
     334             :  * releases the long-term memory used by it.
                     :  *
                     :  * The descriptor is pfree'd, so obj_desc must not be used after this call.
     335             :  */
     336             : void
     337         376 : inv_close(LargeObjectDesc *obj_desc)
     338             : {
     339             :     Assert(PointerIsValid(obj_desc));
     340         376 :     pfree(obj_desc);
     341         376 : }
     342             : 
     343             : /*
     344             :  * Destroys an existing large object (not to be confused with a descriptor!)
     345             :  *
     346             :  * Note we expect caller to have done any required permissions check.
                     :  *
                     :  * lobjId is the OID of the large object to drop.  The removal is done by
                     :  * performDeletion() with DROP_CASCADE on an ObjectAddress whose classId
                     :  * is LargeObjectRelationId, followed by a CommandCounterIncrement().
                     :  * The return value is always 1; presumably failures surface as errors
                     :  * raised by performDeletion() rather than through the result (TODO
                     :  * confirm).
     347             :  */
     348             : int
     349          82 : inv_drop(Oid lobjId)
     350             : {
     351             :     ObjectAddress object;
     352             : 
     353             :     /*
     354             :      * Delete any comments and dependencies on the large object
     355             :      */
     356          82 :     object.classId = LargeObjectRelationId;
     357          82 :     object.objectId = lobjId;
     358          82 :     object.objectSubId = 0;
     359          82 :     performDeletion(&object, DROP_CASCADE, 0);
     360             : 
     361             :     /*
     362             :      * Advance command counter so that tuple removal will be seen by later
     363             :      * large-object operations in this transaction.
     364             :      */
     365          82 :     CommandCounterIncrement();
     366             : 
     367             :     /* For historical reasons, we always return 1 on success. */
     368          82 :     return 1;
     369             : }
     370             : 
     371             : /*
     372             :  * Determine size of a large object
     373             :  *
     374             :  * NOTE: LOs can contain gaps, just like Unix files.  We actually return
     375             :  * the offset of the last byte + 1.
                     :  *
                     :  * The scan uses obj_desc->snapshot.  Returns 0 if no page is found for
                     :  * the object; otherwise the result is pageno * LOBLKSIZE plus the data
                     :  * length of the first tuple returned by the backward scan.
     376             :  */
     377             : static uint64
     378         104 : inv_getsize(LargeObjectDesc *obj_desc)
     379             : {
     380         104 :     uint64      lastbyte = 0;
     381             :     ScanKeyData skey[1];
     382             :     SysScanDesc sd;
     383             :     HeapTuple   tuple;
     384             : 
     385             :     Assert(PointerIsValid(obj_desc));
     386             : 
     387         104 :     open_lo_relation();
     388             : 
     389         104 :     ScanKeyInit(&skey[0],
     390             :                 Anum_pg_largeobject_loid,
     391             :                 BTEqualStrategyNumber, F_OIDEQ,
     392             :                 ObjectIdGetDatum(obj_desc->id));
     393             : 
     394         104 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     395             :                                     obj_desc->snapshot, 1, skey);
     396             : 
     397             :     /*
     398             :      * Because the pg_largeobject index is on both loid and pageno, but we
     399             :      * constrain only loid, a backwards scan should visit all pages of the
     400             :      * large object in reverse pageno order.  So, it's sufficient to examine
     401             :      * the first valid tuple (== last valid page).
     402             :      */
     403         104 :     tuple = systable_getnext_ordered(sd, BackwardScanDirection);
     404         104 :     if (HeapTupleIsValid(tuple))
     405             :     {
     406             :         Form_pg_largeobject data;
     407             :         bytea      *datafield;
     408             :         int         len;
     409             :         bool        pfreeit;
     410             : 
     411          96 :         if (HeapTupleHasNulls(tuple))   /* paranoia */
     412           0 :             elog(ERROR, "null field found in pg_largeobject");
     413          96 :         data = (Form_pg_largeobject) GETSTRUCT(tuple);
     414          96 :         getdatafield(data, &datafield, &len, &pfreeit);
     415          96 :         lastbyte = (uint64) data->pageno * LOBLKSIZE + len;
     416          96 :         if (pfreeit)
     417          18 :             pfree(datafield);
     418             :     }
     419             : 
     420         104 :     systable_endscan_ordered(sd);
     421             : 
     422         104 :     return lastbyte;
     423             : }
     424             : /*
                     :  * inv_seek -- set the current position of a large object descriptor
                     :  *
                     :  * whence selects the base position: SEEK_SET (zero), SEEK_CUR (the
                     :  * descriptor's current offset) or SEEK_END (the size computed by
                     :  * inv_getsize); offset is added to that base.  An error is raised for
                     :  * any other whence value, or if the resulting position is negative or
                     :  * greater than MAX_LARGE_OBJECT_SIZE; obj_desc->offset is not changed in
                     :  * those cases.
                     :  *
                     :  * Returns the new position, which is also stored in obj_desc->offset.
                     :  */
     425             : int64
     426         220 : inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
     427             : {
     428             :     int64       newoffset;
     429             : 
     430             :     Assert(PointerIsValid(obj_desc));
     431             : 
     432             :     /*
     433             :      * We allow seek/tell if you have either read or write permission, so no
     434             :      * need for a permission check here.
     435             :      */
     436             : 
     437             :     /*
     438             :      * Note: overflow in the additions is possible, but since we will reject
     439             :      * negative results, we don't need any extra test for that.
     440             :      */
     441         220 :     switch (whence)
     442             :     {
     443          98 :         case SEEK_SET:
     444          98 :             newoffset = offset;
     445          98 :             break;
     446          18 :         case SEEK_CUR:
     447          18 :             newoffset = obj_desc->offset + offset;
     448          18 :             break;
     449         104 :         case SEEK_END:
     450         104 :             newoffset = inv_getsize(obj_desc) + offset;
     451         104 :             break;
     452           0 :         default:
     453           0 :             ereport(ERROR,
     454             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     455             :                      errmsg("invalid whence setting: %d", whence)));
     456             :             newoffset = 0;      /* keep compiler quiet */
     457             :             break;
     458             :     }
     459             : 
     460             :     /*
     461             :      * use errmsg_internal here because we don't want to expose INT64_FORMAT
     462             :      * in translatable strings; doing better is not worth the trouble
     463             :      */
     464         220 :     if (newoffset < 0 || newoffset > MAX_LARGE_OBJECT_SIZE)
     465           0 :         ereport(ERROR,
     466             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     467             :                  errmsg_internal("invalid large object seek target: " INT64_FORMAT,
     468             :                                  newoffset)));
     469             : 
     470         220 :     obj_desc->offset = newoffset;
     471         220 :     return newoffset;
     472             : }
     473             : /*
                     :  * inv_tell -- report the current position of a large object descriptor
                     :  *
                     :  * Returns obj_desc->offset; the descriptor is not modified.
                     :  */
     474             : int64
     475          48 : inv_tell(LargeObjectDesc *obj_desc)
     476             : {
     477             :     Assert(PointerIsValid(obj_desc));
     478             : 
     479             :     /*
     480             :      * We allow seek/tell if you have either read or write permission, so no
     481             :      * need for a permission check here.
     482             :      */
     483             : 
     484          48 :     return obj_desc->offset;
     485             : }
     486             : /*
                     :  * inv_read -- read up to nbytes bytes at the current position into buf
                     :  *
                     :  * The descriptor must carry IFS_RDLOCK, else a permission error is
                     :  * raised (this is checked before anything else).  Pages are fetched in
                     :  * pageno order under obj_desc->snapshot, starting with the page that
                     :  * contains the current offset.  A gap between the current offset and the
                     :  * start of the next stored page is returned as zeroes.
                     :  *
                     :  * Returns the number of bytes placed in buf.  This can be less than
                     :  * nbytes when the scan runs out of pages first, and is 0 when
                     :  * nbytes <= 0.  obj_desc->offset is advanced by the number of bytes
                     :  * returned.
                     :  */
     487             : int
     488        1368 : inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
     489             : {
     490        1368 :     int         nread = 0;
     491             :     int64       n;
     492             :     int64       off;
     493             :     int         len;
     494        1368 :     int32       pageno = (int32) (obj_desc->offset / LOBLKSIZE);
     495             :     uint64      pageoff;
     496             :     ScanKeyData skey[2];
     497             :     SysScanDesc sd;
     498             :     HeapTuple   tuple;
     499             : 
     500             :     Assert(PointerIsValid(obj_desc));
     501             :     Assert(buf != NULL);
     502             : 
     503        1368 :     if ((obj_desc->flags & IFS_RDLOCK) == 0)
     504           0 :         ereport(ERROR,
     505             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     506             :                  errmsg("permission denied for large object %u",
     507             :                         obj_desc->id)));
     508             : 
     509        1368 :     if (nbytes <= 0)
     510           8 :         return 0;
     511             : 
     512        1360 :     open_lo_relation();
     513             : 
     514        1360 :     ScanKeyInit(&skey[0],
     515             :                 Anum_pg_largeobject_loid,
     516             :                 BTEqualStrategyNumber, F_OIDEQ,
     517             :                 ObjectIdGetDatum(obj_desc->id));
     518             : 
     519        1360 :     ScanKeyInit(&skey[1],
     520             :                 Anum_pg_largeobject_pageno,
     521             :                 BTGreaterEqualStrategyNumber, F_INT4GE,
     522             :                 Int32GetDatum(pageno));
     523             : 
     524        1360 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     525             :                                     obj_desc->snapshot, 2, skey);
     526             : 
     527       10408 :     while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     528             :     {
     529             :         Form_pg_largeobject data;
     530             :         bytea      *datafield;
     531             :         bool        pfreeit;
     532             : 
     533       10110 :         if (HeapTupleHasNulls(tuple))   /* paranoia */
     534           0 :             elog(ERROR, "null field found in pg_largeobject");
     535       10110 :         data = (Form_pg_largeobject) GETSTRUCT(tuple);
     536             : 
     537             :         /*
     538             :          * We expect the indexscan will deliver pages in order.  However,
     539             :          * there may be missing pages if the LO contains unwritten "holes". We
     540             :          * want missing sections to read out as zeroes.
     541             :          */
     542       10110 :         pageoff = ((uint64) data->pageno) * LOBLKSIZE;
     543       10110 :         if (pageoff > obj_desc->offset)
     544             :         {
     545          12 :             n = pageoff - obj_desc->offset;
     546          12 :             n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
     547          12 :             MemSet(buf + nread, 0, n);
     548          12 :             nread += n;
     549          12 :             obj_desc->offset += n;
     550             :         }
     551             : 
     552       10110 :         if (nread < nbytes)
     553             :         {
     554             :             Assert(obj_desc->offset >= pageoff);
     555       10104 :             off = (int) (obj_desc->offset - pageoff);
     556             :             Assert(off >= 0 && off < LOBLKSIZE);
     557             : 
     558       10104 :             getdatafield(data, &datafield, &len, &pfreeit);
     559       10104 :             if (len > off)
     560             :             {
     561       10008 :                 n = len - off;
     562       10008 :                 n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
     563       10008 :                 memcpy(buf + nread, VARDATA(datafield) + off, n);
     564       10008 :                 nread += n;
     565       10008 :                 obj_desc->offset += n;
     566             :             }
     567       10104 :             if (pfreeit)
     568       10028 :                 pfree(datafield);
     569             :         }
     570             : 
     571       10110 :         if (nread >= nbytes)
     572        1062 :             break;
     573             :     }
     574             : 
     575        1360 :     systable_endscan_ordered(sd);
     576             : 
     577        1360 :     return nread;
     578             : }
     579             : /*
                     :  * inv_write -- store nbytes bytes from buf into the large object described
                     :  * by obj_desc, starting at obj_desc->offset.
                     :  *
                     :  * The descriptor must have IFS_WRLOCK set in its flags, else an ERROR is
                     :  * raised.  After that check, a request with nbytes <= 0 returns 0 without
                     :  * doing anything, and a request that would end beyond
                     :  * MAX_LARGE_OBJECT_SIZE raises an ERROR.
                     :  *
                     :  * Data is stored one page (at most LOBLKSIZE bytes) at a time.  A page that
                     :  * the scan finds at the current page number is rewritten with the new bytes
                     :  * overlaid on its old contents; otherwise a new page is inserted.  In both
                     :  * cases any gap before the write position within the page is zero-filled.
                     :  * obj_desc->offset is advanced by the number of bytes stored.
                     :  *
                     :  * Returns the number of bytes written.
                     :  */
     580             : int
     581        1552 : inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
     582             : {
     583        1552 :     int         nwritten = 0;   /* bytes of buf stored so far */
     584             :     int         n;              /* bytes to copy into the current page */
     585             :     int         off;            /* write position within the current page */
     586             :     int         len;            /* valid data length of the current page */
     587        1552 :     int32       pageno = (int32) (obj_desc->offset / LOBLKSIZE);
     588             :     ScanKeyData skey[2];
     589             :     SysScanDesc sd;
     590             :     HeapTuple   oldtuple;
     591             :     Form_pg_largeobject olddata;
     592             :     bool        neednextpage;   /* fetch another tuple from the scan? */
     593             :     bytea      *datafield;
     594             :     bool        pfreeit;
     595             :     union
     596             :     {
     597             :         bytea       hdr;
     598             :         /* this is to make the union big enough for a LO data chunk: */
     599             :         char        data[LOBLKSIZE + VARHDRSZ];
     600             :         /* ensure union is aligned well enough: */
     601             :         int32       align_it;
     602             :     }           workbuf;
     603        1552 :     char       *workb = VARDATA(&workbuf.hdr);
     604             :     HeapTuple   newtup;
     605             :     Datum       values[Natts_pg_largeobject];
     606             :     bool        nulls[Natts_pg_largeobject];
     607             :     bool        replace[Natts_pg_largeobject];
     608             :     CatalogIndexState indstate;
     609             : 
     610             :     Assert(PointerIsValid(obj_desc));
     611             :     Assert(buf != NULL);
     612             : 
     613             :     /* enforce writability because snapshot is probably wrong otherwise */
     614        1552 :     if ((obj_desc->flags & IFS_WRLOCK) == 0)
     615           0 :         ereport(ERROR,
     616             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     617             :                  errmsg("permission denied for large object %u",
     618             :                         obj_desc->id)));
     619             : 
     620        1552 :     if (nbytes <= 0)
     621           0 :         return 0;
     622             : 
     623             :     /* this addition can't overflow because nbytes is only int32 */
     624        1552 :     if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
     625           0 :         ereport(ERROR,
     626             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     627             :                  errmsg("invalid large object write request size: %d",
     628             :                         nbytes)));
     629             : 
     630        1552 :     open_lo_relation();
     631             : 
     632        1552 :     indstate = CatalogOpenIndexes(lo_heap_r);
     633             : 
     634        1552 :     ScanKeyInit(&skey[0],
     635             :                 Anum_pg_largeobject_loid,
     636             :                 BTEqualStrategyNumber, F_OIDEQ,
     637             :                 ObjectIdGetDatum(obj_desc->id));
     638             : 
     639        1552 :     ScanKeyInit(&skey[1],
     640             :                 Anum_pg_largeobject_pageno,
     641             :                 BTGreaterEqualStrategyNumber, F_INT4GE,
     642             :                 Int32GetDatum(pageno));
     643             : 
     644        1552 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     645             :                                     obj_desc->snapshot, 2, skey);
     646             : 
     647        1552 :     oldtuple = NULL;
     648        1552 :     olddata = NULL;
     649        1552 :     neednextpage = true;
     650             : 
     651        9500 :     while (nwritten < nbytes)
     652             :     {
     653             :         /*
     654             :          * If possible, get next pre-existing page of the LO.  We expect the
     655             :          * indexscan will deliver these in order --- but there may be holes.
                     :          *
                     :          * A fetched page that lies beyond the current pageno stays in
                     :          * oldtuple/olddata, and neednextpage stays false, until pageno
                     :          * catches up with it; only the update branch below asks for
                     :          * another tuple.
     656             :          */
     657        7948 :         if (neednextpage)
     658             :         {
     659        1558 :             if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     660             :             {
     661          24 :                 if (HeapTupleHasNulls(oldtuple))    /* paranoia */
     662           0 :                     elog(ERROR, "null field found in pg_largeobject");
     663          24 :                 olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
     664             :                 Assert(olddata->pageno >= pageno);
     665             :             }
     666        1558 :             neednextpage = false;
     667             :         }
     668             : 
     669             :         /*
     670             :          * If we have a pre-existing page, see if it is the page we want to
     671             :          * write, or a later one.
     672             :          */
     673        7948 :         if (olddata != NULL && olddata->pageno == pageno)
     674             :         {
     675             :             /*
     676             :              * Update an existing page with fresh data.
     677             :              *
     678             :              * First, load old data into workbuf
     679             :              */
     680          24 :             getdatafield(olddata, &datafield, &len, &pfreeit);
     681          24 :             memcpy(workb, VARDATA(datafield), len);
     682          24 :             if (pfreeit)
     683          18 :                 pfree(datafield);
     684             : 
     685             :             /*
     686             :              * Fill any hole between the end of the old data and the position
                     :              * at which the new data starts
     687             :              */
     688          24 :             off = (int) (obj_desc->offset % LOBLKSIZE);
     689          24 :             if (off > len)
     690           0 :                 MemSet(workb + len, 0, off - len);
     691             : 
     692             :             /*
     693             :              * Insert appropriate portion of new data: as much as fits in the
                     :              * rest of this page, but no more than remains to be written
     694             :              */
     695          24 :             n = LOBLKSIZE - off;
     696          24 :             n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
     697          24 :             memcpy(workb + off, buf + nwritten, n);
     698          24 :             nwritten += n;
     699          24 :             obj_desc->offset += n;
     700          24 :             off += n;
     701             :             /* compute valid length of new page; old data past off is kept */
     702          24 :             len = (len >= off) ? len : off;
     703          24 :             SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
     704             : 
     705             :             /*
     706             :              * Form and insert updated tuple; only the data column is replaced
     707             :              */
     708          24 :             memset(values, 0, sizeof(values));
     709          24 :             memset(nulls, false, sizeof(nulls));
     710          24 :             memset(replace, false, sizeof(replace));
     711          24 :             values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     712          24 :             replace[Anum_pg_largeobject_data - 1] = true;
     713          24 :             newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
     714             :                                        values, nulls, replace);
     715          24 :             CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
     716             :                                        indstate);
     717          24 :             heap_freetuple(newtup);
     718             : 
     719             :             /*
     720             :              * We're done with this old page.
     721             :              */
     722          24 :             oldtuple = NULL;
     723          24 :             olddata = NULL;
     724          24 :             neednextpage = true;
     725             :         }
     726             :         else
     727             :         {
     728             :             /*
     729             :              * Write a brand new page.
     730             :              *
     731             :              * First, fill any hole (the part of the page that precedes the
                     :              * write position)
     732             :              */
     733        7924 :             off = (int) (obj_desc->offset % LOBLKSIZE);
     734        7924 :             if (off > 0)
     735           6 :                 MemSet(workb, 0, off);
     736             : 
     737             :             /*
     738             :              * Insert appropriate portion of new data
     739             :              */
     740        7924 :             n = LOBLKSIZE - off;
     741        7924 :             n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
     742        7924 :             memcpy(workb + off, buf + nwritten, n);
     743        7924 :             nwritten += n;
     744        7924 :             obj_desc->offset += n;
     745             :             /* compute valid length of new page */
     746        7924 :             len = off + n;
     747        7924 :             SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
     748             : 
     749             :             /*
     750             :              * Form and insert new tuple
     751             :              */
     752        7924 :             memset(values, 0, sizeof(values));
     753        7924 :             memset(nulls, false, sizeof(nulls));
     754        7924 :             values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
     755        7924 :             values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
     756        7924 :             values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     757        7924 :             newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
     758        7924 :             CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
     759        7924 :             heap_freetuple(newtup);
     760             :         }
     761        7948 :         pageno++;
     762             :     }
     763             : 
     764        1552 :     systable_endscan_ordered(sd);
     765             : 
     766        1552 :     CatalogCloseIndexes(indstate);
     767             : 
     768             :     /*
     769             :      * Advance command counter so that my tuple updates will be seen by later
     770             :      * large-object operations in this transaction.
     771             :      */
     772        1552 :     CommandCounterIncrement();
     773             : 
     774        1552 :     return nwritten;
     775             : }
     776             : /*
                     :  * inv_truncate -- make the data of the large object described by obj_desc
                     :  * end at byte position len.
                     :  *
                     :  * The descriptor must have IFS_WRLOCK set in its flags, and len must lie in
                     :  * the range 0 .. MAX_LARGE_OBJECT_SIZE; otherwise an ERROR is raised.
                     :  *
                     :  * If a page exists at the page number containing the truncation point, its
                     :  * data is cut down (or zero-extended) to end exactly at len.  Otherwise a
                     :  * new page holding zeroes up to the truncation point is inserted.  Every
                     :  * later page returned by the scan is deleted.
                     :  *
                     :  * obj_desc->offset is not modified by this function.
                     :  */
     777             : void
     778          42 : inv_truncate(LargeObjectDesc *obj_desc, int64 len)
     779             : {
     780          42 :     int32       pageno = (int32) (len / LOBLKSIZE);
     781             :     int32       off;            /* truncation point's offset within its page */
     782             :     ScanKeyData skey[2];
     783             :     SysScanDesc sd;
     784             :     HeapTuple   oldtuple;
     785             :     Form_pg_largeobject olddata;
     786             :     union
     787             :     {
     788             :         bytea       hdr;
     789             :         /* this is to make the union big enough for a LO data chunk: */
     790             :         char        data[LOBLKSIZE + VARHDRSZ];
     791             :         /* ensure union is aligned well enough: */
     792             :         int32       align_it;
     793             :     }           workbuf;
     794          42 :     char       *workb = VARDATA(&workbuf.hdr);
     795             :     HeapTuple   newtup;
     796             :     Datum       values[Natts_pg_largeobject];
     797             :     bool        nulls[Natts_pg_largeobject];
     798             :     bool        replace[Natts_pg_largeobject];
     799             :     CatalogIndexState indstate;
     800             : 
     801             :     Assert(PointerIsValid(obj_desc));
     802             : 
     803             :     /* enforce writability because snapshot is probably wrong otherwise */
     804          42 :     if ((obj_desc->flags & IFS_WRLOCK) == 0)
     805           0 :         ereport(ERROR,
     806             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     807             :                  errmsg("permission denied for large object %u",
     808             :                         obj_desc->id)));
     809             : 
     810             :     /*
     811             :      * use errmsg_internal here because we don't want to expose INT64_FORMAT
     812             :      * in translatable strings; doing better is not worth the trouble
     813             :      */
     814          42 :     if (len < 0 || len > MAX_LARGE_OBJECT_SIZE)
     815           0 :         ereport(ERROR,
     816             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     817             :                  errmsg_internal("invalid large object truncation target: " INT64_FORMAT,
     818             :                                  len)));
     819             : 
     820          42 :     open_lo_relation();
     821             : 
     822          42 :     indstate = CatalogOpenIndexes(lo_heap_r);
     823             : 
     824             :     /*
     825             :      * Set up to find all pages with desired loid and pageno >= target
     826             :      */
     827          42 :     ScanKeyInit(&skey[0],
     828             :                 Anum_pg_largeobject_loid,
     829             :                 BTEqualStrategyNumber, F_OIDEQ,
     830             :                 ObjectIdGetDatum(obj_desc->id));
     831             : 
     832          42 :     ScanKeyInit(&skey[1],
     833             :                 Anum_pg_largeobject_pageno,
     834             :                 BTGreaterEqualStrategyNumber, F_INT4GE,
     835             :                 Int32GetDatum(pageno));
     836             : 
     837          42 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     838             :                                     obj_desc->snapshot, 2, skey);
     839             : 
     840             :     /*
     841             :      * If possible, get the page the truncation point is in. The truncation
     842             :      * point may be beyond the end of the LO or in a hole.
                     :      *
                     :      * olddata stays NULL when the scan returns no tuple at all; the final
                     :      * deletion loop below relies on that to know there is nothing to delete.
     843             :      */
     844          42 :     olddata = NULL;
     845          42 :     if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     846             :     {
     847          24 :         if (HeapTupleHasNulls(oldtuple))    /* paranoia */
     848           0 :             elog(ERROR, "null field found in pg_largeobject");
     849          24 :         olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
     850             :         Assert(olddata->pageno >= pageno);
     851             :     }
     852             : 
     853             :     /*
     854             :      * If we found the page of the truncation point we need to truncate the
     855             :      * data in it.  Otherwise if we're in a hole, we need to create a page to
     856             :      * mark the end of data.
     857             :      */
     858          42 :     if (olddata != NULL && olddata->pageno == pageno)
     859          12 :     {
     860             :         /* First, load old data into workbuf */
     861             :         bytea      *datafield;
     862             :         int         pagelen;
     863             :         bool        pfreeit;
     864             : 
     865          12 :         getdatafield(olddata, &datafield, &pagelen, &pfreeit);
     866          12 :         memcpy(workb, VARDATA(datafield), pagelen);
     867          12 :         if (pfreeit)
     868           6 :             pfree(datafield);
     869             : 
     870             :         /*
     871             :          * Fill any hole between the end of the old data and the truncation
                     :          * point
     872             :          */
     873          12 :         off = len % LOBLKSIZE;
     874          12 :         if (off > pagelen)
     875           6 :             MemSet(workb + pagelen, 0, off - pagelen);
     876             : 
     877             :         /* compute length of new page; old data past off is simply cut off */
     878          12 :         SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
     879             : 
     880             :         /*
     881             :          * Form and insert updated tuple; only the data column is replaced
     882             :          */
     883          12 :         memset(values, 0, sizeof(values));
     884          12 :         memset(nulls, false, sizeof(nulls));
     885          12 :         memset(replace, false, sizeof(replace));
     886          12 :         values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     887          12 :         replace[Anum_pg_largeobject_data - 1] = true;
     888          12 :         newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
     889             :                                    values, nulls, replace);
     890          12 :         CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
     891             :                                    indstate);
     892          12 :         heap_freetuple(newtup);
     893             :     }
     894             :     else
     895             :     {
     896             :         /*
     897             :          * If the first page we found was after the truncation point, we're in
     898             :          * a hole that we'll fill, but we need to delete the later page
     899             :          * because the loop below won't visit it again.
     900             :          */
     901          30 :         if (olddata != NULL)
     902             :         {
     903             :             Assert(olddata->pageno > pageno);
     904          12 :             CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
     905             :         }
     906             : 
     907             :         /*
     908             :          * Write a brand new page.
     909             :          *
     910             :          * Fill the hole up to the truncation point.  When len is an exact
                     :          * multiple of LOBLKSIZE, off is zero and the page inserted below
                     :          * carries no data bytes at all.
     911             :          */
     912          30 :         off = len % LOBLKSIZE;
     913          30 :         if (off > 0)
     914          30 :             MemSet(workb, 0, off);
     915             : 
     916             :         /* compute length of new page */
     917          30 :         SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
     918             : 
     919             :         /*
     920             :          * Form and insert new tuple
     921             :          */
     922          30 :         memset(values, 0, sizeof(values));
     923          30 :         memset(nulls, false, sizeof(nulls));
     924          30 :         values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
     925          30 :         values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
     926          30 :         values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     927          30 :         newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
     928          30 :         CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
     929          30 :         heap_freetuple(newtup);
     930             :     }
     931             : 
     932             :     /*
     933             :      * Delete any pages after the truncation point.  If the initial search
     934             :      * didn't find a page, then of course there's nothing more to do.
     935             :      */
     936          42 :     if (olddata != NULL)
     937             :     {
     938          30 :         while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     939             :         {
     940           6 :             CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
     941             :         }
     942             :     }
     943             : 
     944          42 :     systable_endscan_ordered(sd);
     945             : 
     946          42 :     CatalogCloseIndexes(indstate);
     947             : 
     948             :     /*
     949             :      * Advance command counter so that tuple updates will be seen by later
     950             :      * large-object operations in this transaction.
     951             :      */
     952          42 :     CommandCounterIncrement();
     953          42 : }

Generated by: LCOV version 1.14