LCOV - code coverage report
Current view: top level - src/backend/storage/large_object - inv_api.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 268 284 94.4 %
Date: 2024-11-21 08:14:44 Functions: 13 13 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * inv_api.c
       4             :  *    routines for manipulating inversion fs large objects. This file
       5             :  *    contains the user-level large object application interface routines.
       6             :  *
       7             :  *
       8             :  * Note: we access pg_largeobject.data using its C struct declaration.
       9             :  * This is safe because it immediately follows pageno which is an int4 field,
      10             :  * and therefore the data field will always be 4-byte aligned, even if it
      11             :  * is in the short 1-byte-header format.  We have to detoast it since it's
      12             :  * quite likely to be in compressed or short format.  We also need to check
      13             :  * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
      14             :  *
      15             :  * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
      16             :  * does most of the backend code.  We expect that CurrentMemoryContext will
      17             :  * be a short-lived context.  Data that must persist across function calls
      18             :  * is kept either in CacheMemoryContext (the Relation structs) or in the
      19             :  * memory context given to inv_open (for LargeObjectDesc structs).
      20             :  *
      21             :  *
      22             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
      23             :  * Portions Copyright (c) 1994, Regents of the University of California
      24             :  *
      25             :  *
      26             :  * IDENTIFICATION
      27             :  *    src/backend/storage/large_object/inv_api.c
      28             :  *
      29             :  *-------------------------------------------------------------------------
      30             :  */
      31             : #include "postgres.h"
      32             : 
      33             : #include <limits.h>
      34             : 
      35             : #include "access/detoast.h"
      36             : #include "access/genam.h"
      37             : #include "access/htup_details.h"
      38             : #include "access/table.h"
      39             : #include "access/xact.h"
      40             : #include "catalog/dependency.h"
      41             : #include "catalog/indexing.h"
      42             : #include "catalog/objectaccess.h"
      43             : #include "catalog/pg_largeobject.h"
      44             : #include "libpq/libpq-fs.h"
      45             : #include "miscadmin.h"
      46             : #include "storage/large_object.h"
      47             : #include "utils/acl.h"
      48             : #include "utils/fmgroids.h"
      49             : #include "utils/rel.h"
      50             : #include "utils/snapmgr.h"
      51             : 
      52             : 
      53             : /*
      54             :  * GUC: backwards-compatibility flag to suppress LO permission checks
      55             :  */
      56             : bool        lo_compat_privileges;
      57             : 
      58             : /*
      59             :  * All accesses to pg_largeobject and its index make use of a single
      60             :  * Relation reference.  To guarantee that the relcache entry remains
      61             :  * in the cache, on the first reference inside a subtransaction, we
      62             :  * execute a slightly klugy maneuver to assign ownership of the
      63             :  * Relation reference to TopTransactionResourceOwner.
      64             :  */
      65             : static Relation lo_heap_r = NULL;
      66             : static Relation lo_index_r = NULL;
      67             : 
      68             : 
      69             : /*
      70             :  * Open pg_largeobject and its index, if not already done in current xact
      71             :  */
      72             : static void
      73        3058 : open_lo_relation(void)
      74             : {
      75             :     ResourceOwner currentOwner;
      76             : 
      77        3058 :     if (lo_heap_r && lo_index_r)
      78        2766 :         return;                 /* already open in current xact */
      79             : 
      80             :     /* Arrange for the top xact to own these relation references */
      81         292 :     currentOwner = CurrentResourceOwner;
      82         292 :     CurrentResourceOwner = TopTransactionResourceOwner;
      83             : 
      84             :     /* Use RowExclusiveLock since we might either read or write */
      85         292 :     if (lo_heap_r == NULL)
      86         292 :         lo_heap_r = table_open(LargeObjectRelationId, RowExclusiveLock);
      87         292 :     if (lo_index_r == NULL)
      88         292 :         lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
      89             : 
      90         292 :     CurrentResourceOwner = currentOwner;
      91             : }
      92             : 
      93             : /*
      94             :  * Clean up at main transaction end
      95             :  */
      96             : void
      97         442 : close_lo_relation(bool isCommit)
      98             : {
      99         442 :     if (lo_heap_r || lo_index_r)
     100             :     {
     101             :         /*
     102             :          * Only bother to close if committing; else abort cleanup will handle
     103             :          * it
     104             :          */
     105         292 :         if (isCommit)
     106             :         {
     107             :             ResourceOwner currentOwner;
     108             : 
     109         210 :             currentOwner = CurrentResourceOwner;
     110         210 :             CurrentResourceOwner = TopTransactionResourceOwner;
     111             : 
     112         210 :             if (lo_index_r)
     113         210 :                 index_close(lo_index_r, NoLock);
     114         210 :             if (lo_heap_r)
     115         210 :                 table_close(lo_heap_r, NoLock);
     116             : 
     117         210 :             CurrentResourceOwner = currentOwner;
     118             :         }
     119         292 :         lo_heap_r = NULL;
     120         292 :         lo_index_r = NULL;
     121             :     }
     122         442 : }
     123             : 
     124             : 
     125             : /*
     126             :  * Extract data field from a pg_largeobject tuple, detoasting if needed
     127             :  * and verifying that the length is sane.  Returns data pointer (a bytea *),
     128             :  * data length, and an indication of whether to pfree the data pointer.
     129             :  */
     130             : static void
     131       10236 : getdatafield(Form_pg_largeobject tuple,
     132             :              bytea **pdatafield,
     133             :              int *plen,
     134             :              bool *pfreeit)
     135             : {
     136             :     bytea      *datafield;
     137             :     int         len;
     138             :     bool        freeit;
     139             : 
     140       10236 :     datafield = &(tuple->data); /* see note at top of file */
     141       10236 :     freeit = false;
     142       10236 :     if (VARATT_IS_EXTENDED(datafield))
     143             :     {
     144             :         datafield = (bytea *)
     145       10070 :             detoast_attr((struct varlena *) datafield);
     146       10070 :         freeit = true;
     147             :     }
     148       10236 :     len = VARSIZE(datafield) - VARHDRSZ;
     149       10236 :     if (len < 0 || len > LOBLKSIZE)
     150           0 :         ereport(ERROR,
     151             :                 (errcode(ERRCODE_DATA_CORRUPTED),
     152             :                  errmsg("pg_largeobject entry for OID %u, page %d has invalid data field size %d",
     153             :                         tuple->loid, tuple->pageno, len)));
     154       10236 :     *pdatafield = datafield;
     155       10236 :     *plen = len;
     156       10236 :     *pfreeit = freeit;
     157       10236 : }
     158             : 
     159             : 
     160             : /*
     161             :  *  inv_create -- create a new large object
     162             :  *
     163             :  *  Arguments:
     164             :  *    lobjId - OID to use for new large object, or InvalidOid to pick one
     165             :  *
     166             :  *  Returns:
     167             :  *    OID of new object
     168             :  *
     169             :  * If lobjId is not InvalidOid, then an error occurs if the OID is already
     170             :  * in use.
     171             :  */
     172             : Oid
     173         112 : inv_create(Oid lobjId)
     174             : {
     175             :     Oid         lobjId_new;
     176             : 
     177             :     /*
     178             :      * Create a new largeobject with empty data pages
     179             :      */
     180         112 :     lobjId_new = LargeObjectCreate(lobjId);
     181             : 
     182             :     /*
     183             :      * dependency on the owner of largeobject
     184             :      *
     185             :      * Note that LO dependencies are recorded using classId
     186             :      * LargeObjectRelationId for backwards-compatibility reasons.  Using
     187             :      * LargeObjectMetadataRelationId instead would simplify matters for the
     188             :      * backend, but it'd complicate pg_dump and possibly break other clients.
     189             :      */
     190         112 :     recordDependencyOnOwner(LargeObjectRelationId,
     191             :                             lobjId_new, GetUserId());
     192             : 
     193             :     /* Post creation hook for new large object */
     194         112 :     InvokeObjectPostCreateHook(LargeObjectRelationId, lobjId_new, 0);
     195             : 
     196             :     /*
     197             :      * Advance command counter to make new tuple visible to later operations.
     198             :      */
     199         112 :     CommandCounterIncrement();
     200             : 
     201         112 :     return lobjId_new;
     202             : }
     203             : 
     204             : /*
     205             :  *  inv_open -- access an existing large object.
     206             :  *
     207             :  * Returns a large object descriptor, appropriately filled in.
     208             :  * The descriptor and subsidiary data are allocated in the specified
     209             :  * memory context, which must be suitably long-lived for the caller's
     210             :  * purposes.  If the returned descriptor has a snapshot associated
     211             :  * with it, the caller must ensure that it also lives long enough,
     212             :  * e.g. by calling RegisterSnapshotOnOwner
     213             :  */
     214             : LargeObjectDesc *
     215         464 : inv_open(Oid lobjId, int flags, MemoryContext mcxt)
     216             : {
     217             :     LargeObjectDesc *retval;
     218         464 :     Snapshot    snapshot = NULL;
     219         464 :     int         descflags = 0;
     220             : 
     221             :     /*
     222             :      * Historically, no difference is made between (INV_WRITE) and (INV_WRITE
     223             :      * | INV_READ), the caller being allowed to read the large object
     224             :      * descriptor in either case.
     225             :      */
     226         464 :     if (flags & INV_WRITE)
     227         154 :         descflags |= IFS_WRLOCK | IFS_RDLOCK;
     228         464 :     if (flags & INV_READ)
     229         340 :         descflags |= IFS_RDLOCK;
     230             : 
     231         464 :     if (descflags == 0)
     232           0 :         ereport(ERROR,
     233             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     234             :                  errmsg("invalid flags for opening a large object: %d",
     235             :                         flags)));
     236             : 
     237             :     /* Get snapshot.  If write is requested, use an instantaneous snapshot. */
     238         464 :     if (descflags & IFS_WRLOCK)
     239         154 :         snapshot = NULL;
     240             :     else
     241         310 :         snapshot = GetActiveSnapshot();
     242             : 
     243             :     /* Can't use LargeObjectExists here because we need to specify snapshot */
     244         464 :     if (!LargeObjectExistsWithSnapshot(lobjId, snapshot))
     245           4 :         ereport(ERROR,
     246             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     247             :                  errmsg("large object %u does not exist", lobjId)));
     248             : 
     249             :     /* Apply permission checks, again specifying snapshot */
     250         460 :     if ((descflags & IFS_RDLOCK) != 0)
     251             :     {
     252         902 :         if (!lo_compat_privileges &&
     253         442 :             pg_largeobject_aclcheck_snapshot(lobjId,
     254             :                                              GetUserId(),
     255             :                                              ACL_SELECT,
     256             :                                              snapshot) != ACLCHECK_OK)
     257          42 :             ereport(ERROR,
     258             :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     259             :                      errmsg("permission denied for large object %u",
     260             :                             lobjId)));
     261             :     }
     262         418 :     if ((descflags & IFS_WRLOCK) != 0)
     263             :     {
     264         248 :         if (!lo_compat_privileges &&
     265         118 :             pg_largeobject_aclcheck_snapshot(lobjId,
     266             :                                              GetUserId(),
     267             :                                              ACL_UPDATE,
     268             :                                              snapshot) != ACLCHECK_OK)
     269          12 :             ereport(ERROR,
     270             :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     271             :                      errmsg("permission denied for large object %u",
     272             :                             lobjId)));
     273             :     }
     274             : 
     275             :     /* OK to create a descriptor */
     276         406 :     retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
     277             :                                                     sizeof(LargeObjectDesc));
     278         406 :     retval->id = lobjId;
     279         406 :     retval->offset = 0;
     280         406 :     retval->flags = descflags;
     281             : 
     282             :     /* caller sets if needed, not used by the functions in this file */
     283         406 :     retval->subid = InvalidSubTransactionId;
     284             : 
     285             :     /*
     286             :      * The snapshot (if any) is just the currently active snapshot.  The
     287             :      * caller will replace it with a longer-lived copy if needed.
     288             :      */
     289         406 :     retval->snapshot = snapshot;
     290             : 
     291         406 :     return retval;
     292             : }
     293             : 
     294             : /*
     295             :  * Closes a large object descriptor previously made by inv_open(), and
     296             :  * releases the long-term memory used by it.
     297             :  */
     298             : void
     299         376 : inv_close(LargeObjectDesc *obj_desc)
     300             : {
     301             :     Assert(PointerIsValid(obj_desc));
     302         376 :     pfree(obj_desc);
     303         376 : }
     304             : 
     305             : /*
     306             :  * Destroys an existing large object (not to be confused with a descriptor!)
     307             :  *
     308             :  * Note we expect caller to have done any required permissions check.
     309             :  */
     310             : int
     311          82 : inv_drop(Oid lobjId)
     312             : {
     313             :     ObjectAddress object;
     314             : 
     315             :     /*
     316             :      * Delete any comments and dependencies on the large object
     317             :      */
     318          82 :     object.classId = LargeObjectRelationId;
     319          82 :     object.objectId = lobjId;
     320          82 :     object.objectSubId = 0;
     321          82 :     performDeletion(&object, DROP_CASCADE, 0);
     322             : 
     323             :     /*
     324             :      * Advance command counter so that tuple removal will be seen by later
     325             :      * large-object operations in this transaction.
     326             :      */
     327          82 :     CommandCounterIncrement();
     328             : 
     329             :     /* For historical reasons, we always return 1 on success. */
     330          82 :     return 1;
     331             : }
     332             : 
     333             : /*
     334             :  * Determine size of a large object
     335             :  *
     336             :  * NOTE: LOs can contain gaps, just like Unix files.  We actually return
     337             :  * the offset of the last byte + 1.
     338             :  */
     339             : static uint64
     340         104 : inv_getsize(LargeObjectDesc *obj_desc)
     341             : {
     342         104 :     uint64      lastbyte = 0;
     343             :     ScanKeyData skey[1];
     344             :     SysScanDesc sd;
     345             :     HeapTuple   tuple;
     346             : 
     347             :     Assert(PointerIsValid(obj_desc));
     348             : 
     349         104 :     open_lo_relation();
     350             : 
     351         104 :     ScanKeyInit(&skey[0],
     352             :                 Anum_pg_largeobject_loid,
     353             :                 BTEqualStrategyNumber, F_OIDEQ,
     354             :                 ObjectIdGetDatum(obj_desc->id));
     355             : 
     356         104 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     357             :                                     obj_desc->snapshot, 1, skey);
     358             : 
     359             :     /*
     360             :      * Because the pg_largeobject index is on both loid and pageno, but we
     361             :      * constrain only loid, a backwards scan should visit all pages of the
     362             :      * large object in reverse pageno order.  So, it's sufficient to examine
     363             :      * the first valid tuple (== last valid page).
     364             :      */
     365         104 :     tuple = systable_getnext_ordered(sd, BackwardScanDirection);
     366         104 :     if (HeapTupleIsValid(tuple))
     367             :     {
     368             :         Form_pg_largeobject data;
     369             :         bytea      *datafield;
     370             :         int         len;
     371             :         bool        pfreeit;
     372             : 
     373          96 :         if (HeapTupleHasNulls(tuple))   /* paranoia */
     374           0 :             elog(ERROR, "null field found in pg_largeobject");
     375          96 :         data = (Form_pg_largeobject) GETSTRUCT(tuple);
     376          96 :         getdatafield(data, &datafield, &len, &pfreeit);
     377          96 :         lastbyte = (uint64) data->pageno * LOBLKSIZE + len;
     378          96 :         if (pfreeit)
     379          18 :             pfree(datafield);
     380             :     }
     381             : 
     382         104 :     systable_endscan_ordered(sd);
     383             : 
     384         104 :     return lastbyte;
     385             : }
     386             : 
     387             : int64
     388         220 : inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
     389             : {
     390             :     int64       newoffset;
     391             : 
     392             :     Assert(PointerIsValid(obj_desc));
     393             : 
     394             :     /*
     395             :      * We allow seek/tell if you have either read or write permission, so no
     396             :      * need for a permission check here.
     397             :      */
     398             : 
     399             :     /*
     400             :      * Note: overflow in the additions is possible, but since we will reject
     401             :      * negative results, we don't need any extra test for that.
     402             :      */
     403         220 :     switch (whence)
     404             :     {
     405          98 :         case SEEK_SET:
     406          98 :             newoffset = offset;
     407          98 :             break;
     408          18 :         case SEEK_CUR:
     409          18 :             newoffset = obj_desc->offset + offset;
     410          18 :             break;
     411         104 :         case SEEK_END:
     412         104 :             newoffset = inv_getsize(obj_desc) + offset;
     413         104 :             break;
     414           0 :         default:
     415           0 :             ereport(ERROR,
     416             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     417             :                      errmsg("invalid whence setting: %d", whence)));
     418             :             newoffset = 0;      /* keep compiler quiet */
     419             :             break;
     420             :     }
     421             : 
     422             :     /*
     423             :      * use errmsg_internal here because we don't want to expose INT64_FORMAT
     424             :      * in translatable strings; doing better is not worth the trouble
     425             :      */
     426         220 :     if (newoffset < 0 || newoffset > MAX_LARGE_OBJECT_SIZE)
     427           0 :         ereport(ERROR,
     428             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     429             :                  errmsg_internal("invalid large object seek target: " INT64_FORMAT,
     430             :                                  newoffset)));
     431             : 
     432         220 :     obj_desc->offset = newoffset;
     433         220 :     return newoffset;
     434             : }
     435             : 
     436             : int64
     437          48 : inv_tell(LargeObjectDesc *obj_desc)
     438             : {
     439             :     Assert(PointerIsValid(obj_desc));
     440             : 
     441             :     /*
     442             :      * We allow seek/tell if you have either read or write permission, so no
     443             :      * need for a permission check here.
     444             :      */
     445             : 
     446          48 :     return obj_desc->offset;
     447             : }
     448             : 
     449             : int
     450        1368 : inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
     451             : {
     452        1368 :     int         nread = 0;
     453             :     int64       n;
     454             :     int64       off;
     455             :     int         len;
     456        1368 :     int32       pageno = (int32) (obj_desc->offset / LOBLKSIZE);
     457             :     uint64      pageoff;
     458             :     ScanKeyData skey[2];
     459             :     SysScanDesc sd;
     460             :     HeapTuple   tuple;
     461             : 
     462             :     Assert(PointerIsValid(obj_desc));
     463             :     Assert(buf != NULL);
     464             : 
     465        1368 :     if ((obj_desc->flags & IFS_RDLOCK) == 0)
     466           0 :         ereport(ERROR,
     467             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     468             :                  errmsg("permission denied for large object %u",
     469             :                         obj_desc->id)));
     470             : 
     471        1368 :     if (nbytes <= 0)
     472           8 :         return 0;
     473             : 
     474        1360 :     open_lo_relation();
     475             : 
     476        1360 :     ScanKeyInit(&skey[0],
     477             :                 Anum_pg_largeobject_loid,
     478             :                 BTEqualStrategyNumber, F_OIDEQ,
     479             :                 ObjectIdGetDatum(obj_desc->id));
     480             : 
     481        1360 :     ScanKeyInit(&skey[1],
     482             :                 Anum_pg_largeobject_pageno,
     483             :                 BTGreaterEqualStrategyNumber, F_INT4GE,
     484             :                 Int32GetDatum(pageno));
     485             : 
     486        1360 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     487             :                                     obj_desc->snapshot, 2, skey);
     488             : 
     489       10408 :     while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     490             :     {
     491             :         Form_pg_largeobject data;
     492             :         bytea      *datafield;
     493             :         bool        pfreeit;
     494             : 
     495       10110 :         if (HeapTupleHasNulls(tuple))   /* paranoia */
     496           0 :             elog(ERROR, "null field found in pg_largeobject");
     497       10110 :         data = (Form_pg_largeobject) GETSTRUCT(tuple);
     498             : 
     499             :         /*
     500             :          * We expect the indexscan will deliver pages in order.  However,
     501             :          * there may be missing pages if the LO contains unwritten "holes". We
     502             :          * want missing sections to read out as zeroes.
     503             :          */
     504       10110 :         pageoff = ((uint64) data->pageno) * LOBLKSIZE;
     505       10110 :         if (pageoff > obj_desc->offset)
     506             :         {
     507          12 :             n = pageoff - obj_desc->offset;
     508          12 :             n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
     509          12 :             MemSet(buf + nread, 0, n);
     510          12 :             nread += n;
     511          12 :             obj_desc->offset += n;
     512             :         }
     513             : 
     514       10110 :         if (nread < nbytes)
     515             :         {
     516             :             Assert(obj_desc->offset >= pageoff);
     517       10104 :             off = (int) (obj_desc->offset - pageoff);
     518             :             Assert(off >= 0 && off < LOBLKSIZE);
     519             : 
     520       10104 :             getdatafield(data, &datafield, &len, &pfreeit);
     521       10104 :             if (len > off)
     522             :             {
     523       10008 :                 n = len - off;
     524       10008 :                 n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
     525       10008 :                 memcpy(buf + nread, VARDATA(datafield) + off, n);
     526       10008 :                 nread += n;
     527       10008 :                 obj_desc->offset += n;
     528             :             }
     529       10104 :             if (pfreeit)
     530       10028 :                 pfree(datafield);
     531             :         }
     532             : 
     533       10110 :         if (nread >= nbytes)
     534        1062 :             break;
     535             :     }
     536             : 
     537        1360 :     systable_endscan_ordered(sd);
     538             : 
     539        1360 :     return nread;
     540             : }
     541             : 
     542             : int
     543        1552 : inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
     544             : {
     545        1552 :     int         nwritten = 0;
     546             :     int         n;
     547             :     int         off;
     548             :     int         len;
     549        1552 :     int32       pageno = (int32) (obj_desc->offset / LOBLKSIZE);
     550             :     ScanKeyData skey[2];
     551             :     SysScanDesc sd;
     552             :     HeapTuple   oldtuple;
     553             :     Form_pg_largeobject olddata;
     554             :     bool        neednextpage;
     555             :     bytea      *datafield;
     556             :     bool        pfreeit;
     557             :     union
     558             :     {
     559             :         bytea       hdr;
     560             :         /* this is to make the union big enough for a LO data chunk: */
     561             :         char        data[LOBLKSIZE + VARHDRSZ];
     562             :         /* ensure union is aligned well enough: */
     563             :         int32       align_it;
     564             :     }           workbuf;
     565        1552 :     char       *workb = VARDATA(&workbuf.hdr);
     566             :     HeapTuple   newtup;
     567             :     Datum       values[Natts_pg_largeobject];
     568             :     bool        nulls[Natts_pg_largeobject];
     569             :     bool        replace[Natts_pg_largeobject];
     570             :     CatalogIndexState indstate;
     571             : 
     572             :     Assert(PointerIsValid(obj_desc));
     573             :     Assert(buf != NULL);
     574             : 
     575             :     /* enforce writability because snapshot is probably wrong otherwise */
     576        1552 :     if ((obj_desc->flags & IFS_WRLOCK) == 0)
     577           0 :         ereport(ERROR,
     578             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     579             :                  errmsg("permission denied for large object %u",
     580             :                         obj_desc->id)));
     581             : 
     582        1552 :     if (nbytes <= 0)
     583           0 :         return 0;
     584             : 
     585             :     /* this addition can't overflow because nbytes is only int32 */
     586        1552 :     if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
     587           0 :         ereport(ERROR,
     588             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     589             :                  errmsg("invalid large object write request size: %d",
     590             :                         nbytes)));
     591             : 
     592        1552 :     open_lo_relation();
     593             : 
     594        1552 :     indstate = CatalogOpenIndexes(lo_heap_r);
     595             : 
     596        1552 :     ScanKeyInit(&skey[0],
     597             :                 Anum_pg_largeobject_loid,
     598             :                 BTEqualStrategyNumber, F_OIDEQ,
     599             :                 ObjectIdGetDatum(obj_desc->id));
     600             : 
     601        1552 :     ScanKeyInit(&skey[1],
     602             :                 Anum_pg_largeobject_pageno,
     603             :                 BTGreaterEqualStrategyNumber, F_INT4GE,
     604             :                 Int32GetDatum(pageno));
     605             : 
     606        1552 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     607             :                                     obj_desc->snapshot, 2, skey);
     608             : 
     609        1552 :     oldtuple = NULL;
     610        1552 :     olddata = NULL;
     611        1552 :     neednextpage = true;
     612             : 
     613        9500 :     while (nwritten < nbytes)
     614             :     {
     615             :         /*
     616             :          * If possible, get next pre-existing page of the LO.  We expect the
     617             :          * indexscan will deliver these in order --- but there may be holes.
     618             :          */
     619        7948 :         if (neednextpage)
     620             :         {
     621        1558 :             if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     622             :             {
     623          24 :                 if (HeapTupleHasNulls(oldtuple))    /* paranoia */
     624           0 :                     elog(ERROR, "null field found in pg_largeobject");
     625          24 :                 olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
     626             :                 Assert(olddata->pageno >= pageno);
     627             :             }
     628        1558 :             neednextpage = false;
     629             :         }
     630             : 
     631             :         /*
     632             :          * If we have a pre-existing page, see if it is the page we want to
     633             :          * write, or a later one.
     634             :          */
     635        7948 :         if (olddata != NULL && olddata->pageno == pageno)
     636             :         {
     637             :             /*
     638             :              * Update an existing page with fresh data.
     639             :              *
     640             :              * First, load old data into workbuf
     641             :              */
     642          24 :             getdatafield(olddata, &datafield, &len, &pfreeit);
     643          24 :             memcpy(workb, VARDATA(datafield), len);
     644          24 :             if (pfreeit)
     645          18 :                 pfree(datafield);
     646             : 
     647             :             /*
     648             :              * Fill any hole
     649             :              */
     650          24 :             off = (int) (obj_desc->offset % LOBLKSIZE);
     651          24 :             if (off > len)
     652           0 :                 MemSet(workb + len, 0, off - len);
     653             : 
     654             :             /*
     655             :              * Insert appropriate portion of new data
     656             :              */
     657          24 :             n = LOBLKSIZE - off;
     658          24 :             n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
     659          24 :             memcpy(workb + off, buf + nwritten, n);
     660          24 :             nwritten += n;
     661          24 :             obj_desc->offset += n;
     662          24 :             off += n;
     663             :             /* compute valid length of new page */
     664          24 :             len = (len >= off) ? len : off;
     665          24 :             SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
     666             : 
     667             :             /*
     668             :              * Form and insert updated tuple
     669             :              */
     670          24 :             memset(values, 0, sizeof(values));
     671          24 :             memset(nulls, false, sizeof(nulls));
     672          24 :             memset(replace, false, sizeof(replace));
     673          24 :             values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     674          24 :             replace[Anum_pg_largeobject_data - 1] = true;
     675          24 :             newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
     676             :                                        values, nulls, replace);
     677          24 :             CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
     678             :                                        indstate);
     679          24 :             heap_freetuple(newtup);
     680             : 
     681             :             /*
     682             :              * We're done with this old page.
     683             :              */
     684          24 :             oldtuple = NULL;
     685          24 :             olddata = NULL;
     686          24 :             neednextpage = true;
     687             :         }
     688             :         else
     689             :         {
     690             :             /*
     691             :              * Write a brand new page.
     692             :              *
     693             :              * First, fill any hole
     694             :              */
     695        7924 :             off = (int) (obj_desc->offset % LOBLKSIZE);
     696        7924 :             if (off > 0)
     697           6 :                 MemSet(workb, 0, off);
     698             : 
     699             :             /*
     700             :              * Insert appropriate portion of new data
     701             :              */
     702        7924 :             n = LOBLKSIZE - off;
     703        7924 :             n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
     704        7924 :             memcpy(workb + off, buf + nwritten, n);
     705        7924 :             nwritten += n;
     706        7924 :             obj_desc->offset += n;
     707             :             /* compute valid length of new page */
     708        7924 :             len = off + n;
     709        7924 :             SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
     710             : 
     711             :             /*
     712             :              * Form and insert updated tuple
     713             :              */
     714        7924 :             memset(values, 0, sizeof(values));
     715        7924 :             memset(nulls, false, sizeof(nulls));
     716        7924 :             values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
     717        7924 :             values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
     718        7924 :             values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     719        7924 :             newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
     720        7924 :             CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
     721        7924 :             heap_freetuple(newtup);
     722             :         }
     723        7948 :         pageno++;
     724             :     }
     725             : 
     726        1552 :     systable_endscan_ordered(sd);
     727             : 
     728        1552 :     CatalogCloseIndexes(indstate);
     729             : 
     730             :     /*
     731             :      * Advance command counter so that my tuple updates will be seen by later
     732             :      * large-object operations in this transaction.
     733             :      */
     734        1552 :     CommandCounterIncrement();
     735             : 
     736        1552 :     return nwritten;
     737             : }
     738             : 
     739             : void
     740          42 : inv_truncate(LargeObjectDesc *obj_desc, int64 len)
     741             : {
     742          42 :     int32       pageno = (int32) (len / LOBLKSIZE);
     743             :     int32       off;
     744             :     ScanKeyData skey[2];
     745             :     SysScanDesc sd;
     746             :     HeapTuple   oldtuple;
     747             :     Form_pg_largeobject olddata;
     748             :     union
     749             :     {
     750             :         bytea       hdr;
     751             :         /* this is to make the union big enough for a LO data chunk: */
     752             :         char        data[LOBLKSIZE + VARHDRSZ];
     753             :         /* ensure union is aligned well enough: */
     754             :         int32       align_it;
     755             :     }           workbuf;
     756          42 :     char       *workb = VARDATA(&workbuf.hdr);
     757             :     HeapTuple   newtup;
     758             :     Datum       values[Natts_pg_largeobject];
     759             :     bool        nulls[Natts_pg_largeobject];
     760             :     bool        replace[Natts_pg_largeobject];
     761             :     CatalogIndexState indstate;
     762             : 
     763             :     Assert(PointerIsValid(obj_desc));
     764             : 
     765             :     /* enforce writability because snapshot is probably wrong otherwise */
     766          42 :     if ((obj_desc->flags & IFS_WRLOCK) == 0)
     767           0 :         ereport(ERROR,
     768             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     769             :                  errmsg("permission denied for large object %u",
     770             :                         obj_desc->id)));
     771             : 
     772             :     /*
     773             :      * use errmsg_internal here because we don't want to expose INT64_FORMAT
     774             :      * in translatable strings; doing better is not worth the trouble
     775             :      */
     776          42 :     if (len < 0 || len > MAX_LARGE_OBJECT_SIZE)
     777           0 :         ereport(ERROR,
     778             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     779             :                  errmsg_internal("invalid large object truncation target: " INT64_FORMAT,
     780             :                                  len)));
     781             : 
     782          42 :     open_lo_relation();
     783             : 
     784          42 :     indstate = CatalogOpenIndexes(lo_heap_r);
     785             : 
     786             :     /*
     787             :      * Set up to find all pages with desired loid and pageno >= target
     788             :      */
     789          42 :     ScanKeyInit(&skey[0],
     790             :                 Anum_pg_largeobject_loid,
     791             :                 BTEqualStrategyNumber, F_OIDEQ,
     792             :                 ObjectIdGetDatum(obj_desc->id));
     793             : 
     794          42 :     ScanKeyInit(&skey[1],
     795             :                 Anum_pg_largeobject_pageno,
     796             :                 BTGreaterEqualStrategyNumber, F_INT4GE,
     797             :                 Int32GetDatum(pageno));
     798             : 
     799          42 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     800             :                                     obj_desc->snapshot, 2, skey);
     801             : 
     802             :     /*
     803             :      * If possible, get the page the truncation point is in. The truncation
     804             :      * point may be beyond the end of the LO or in a hole.
     805             :      */
     806          42 :     olddata = NULL;
     807          42 :     if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     808             :     {
     809          24 :         if (HeapTupleHasNulls(oldtuple))    /* paranoia */
     810           0 :             elog(ERROR, "null field found in pg_largeobject");
     811          24 :         olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
     812             :         Assert(olddata->pageno >= pageno);
     813             :     }
     814             : 
     815             :     /*
     816             :      * If we found the page of the truncation point we need to truncate the
     817             :      * data in it.  Otherwise if we're in a hole, we need to create a page to
     818             :      * mark the end of data.
     819             :      */
     820          42 :     if (olddata != NULL && olddata->pageno == pageno)
     821          12 :     {
     822             :         /* First, load old data into workbuf */
     823             :         bytea      *datafield;
     824             :         int         pagelen;
     825             :         bool        pfreeit;
     826             : 
     827          12 :         getdatafield(olddata, &datafield, &pagelen, &pfreeit);
     828          12 :         memcpy(workb, VARDATA(datafield), pagelen);
     829          12 :         if (pfreeit)
     830           6 :             pfree(datafield);
     831             : 
     832             :         /*
     833             :          * Fill any hole
     834             :          */
     835          12 :         off = len % LOBLKSIZE;
     836          12 :         if (off > pagelen)
     837           6 :             MemSet(workb + pagelen, 0, off - pagelen);
     838             : 
     839             :         /* compute length of new page */
     840          12 :         SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
     841             : 
     842             :         /*
     843             :          * Form and insert updated tuple
     844             :          */
     845          12 :         memset(values, 0, sizeof(values));
     846          12 :         memset(nulls, false, sizeof(nulls));
     847          12 :         memset(replace, false, sizeof(replace));
     848          12 :         values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     849          12 :         replace[Anum_pg_largeobject_data - 1] = true;
     850          12 :         newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
     851             :                                    values, nulls, replace);
     852          12 :         CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
     853             :                                    indstate);
     854          12 :         heap_freetuple(newtup);
     855             :     }
     856             :     else
     857             :     {
     858             :         /*
     859             :          * If the first page we found was after the truncation point, we're in
     860             :          * a hole that we'll fill, but we need to delete the later page
     861             :          * because the loop below won't visit it again.
     862             :          */
     863          30 :         if (olddata != NULL)
     864             :         {
     865             :             Assert(olddata->pageno > pageno);
     866          12 :             CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
     867             :         }
     868             : 
     869             :         /*
     870             :          * Write a brand new page.
     871             :          *
     872             :          * Fill the hole up to the truncation point
     873             :          */
     874          30 :         off = len % LOBLKSIZE;
     875          30 :         if (off > 0)
     876          30 :             MemSet(workb, 0, off);
     877             : 
     878             :         /* compute length of new page */
     879          30 :         SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
     880             : 
     881             :         /*
     882             :          * Form and insert new tuple
     883             :          */
     884          30 :         memset(values, 0, sizeof(values));
     885          30 :         memset(nulls, false, sizeof(nulls));
     886          30 :         values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
     887          30 :         values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
     888          30 :         values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     889          30 :         newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
     890          30 :         CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
     891          30 :         heap_freetuple(newtup);
     892             :     }
     893             : 
     894             :     /*
     895             :      * Delete any pages after the truncation point.  If the initial search
     896             :      * didn't find a page, then of course there's nothing more to do.
     897             :      */
     898          42 :     if (olddata != NULL)
     899             :     {
     900          30 :         while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     901             :         {
     902           6 :             CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
     903             :         }
     904             :     }
     905             : 
     906          42 :     systable_endscan_ordered(sd);
     907             : 
     908          42 :     CatalogCloseIndexes(indstate);
     909             : 
     910             :     /*
     911             :      * Advance command counter so that tuple updates will be seen by later
     912             :      * large-object operations in this transaction.
     913             :      */
     914          42 :     CommandCounterIncrement();
     915          42 : }

Generated by: LCOV version 1.14