LCOV - code coverage report
Current view: top level - src/backend/storage/large_object - inv_api.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 94.4 % 286 270
Test Date: 2026-03-10 21:14:59 Functions: 100.0 % 13 13
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * inv_api.c
       4              :  *    routines for manipulating inversion fs large objects. This file
       5              :  *    contains the user-level large object application interface routines.
       6              :  *
       7              :  *
       8              :  * Note: we access pg_largeobject.data using its C struct declaration.
       9              :  * This is safe because it immediately follows pageno which is an int4 field,
      10              :  * and therefore the data field will always be 4-byte aligned, even if it
      11              :  * is in the short 1-byte-header format.  We have to detoast it since it's
      12              :  * quite likely to be in compressed or short format.  We also need to check
      13              :  * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
      14              :  *
      15              :  * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
      16              :  * does most of the backend code.  We expect that CurrentMemoryContext will
      17              :  * be a short-lived context.  Data that must persist across function calls
      18              :  * is kept either in CacheMemoryContext (the Relation structs) or in the
      19              :  * memory context given to inv_open (for LargeObjectDesc structs).
      20              :  *
      21              :  *
      22              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      23              :  * Portions Copyright (c) 1994, Regents of the University of California
      24              :  *
      25              :  *
      26              :  * IDENTIFICATION
      27              :  *    src/backend/storage/large_object/inv_api.c
      28              :  *
      29              :  *-------------------------------------------------------------------------
      30              :  */
      31              : #include "postgres.h"
      32              : 
      33              : #include <limits.h>
      34              : 
      35              : #include "access/detoast.h"
      36              : #include "access/genam.h"
      37              : #include "access/htup_details.h"
      38              : #include "access/table.h"
      39              : #include "access/xact.h"
      40              : #include "catalog/dependency.h"
      41              : #include "catalog/indexing.h"
      42              : #include "catalog/objectaccess.h"
      43              : #include "catalog/pg_largeobject.h"
      44              : #include "libpq/libpq-fs.h"
      45              : #include "miscadmin.h"
      46              : #include "storage/large_object.h"
      47              : #include "utils/acl.h"
      48              : #include "utils/fmgroids.h"
      49              : #include "utils/rel.h"
      50              : #include "utils/snapmgr.h"
      51              : 
      52              : 
      53              : /*
      54              :  * GUC: backwards-compatibility flag; when true, inv_open skips its LO ACL checks
      55              :  */
      56              : bool        lo_compat_privileges;
      57              : 
      58              : /*
      59              :  * All accesses to pg_largeobject (lo_heap_r) and its index (lo_index_r)
      60              :  * make use of a single cached Relation reference each.  To guarantee that
      61              :  * the relcache entries remain in the cache, on the first reference inside
      62              :  * a subtransaction, we execute a slightly klugy maneuver to assign
      63              :  * ownership of the Relation references to TopTransactionResourceOwner.
      64              :  */
      65              : static Relation lo_heap_r = NULL;
      66              : static Relation lo_index_r = NULL;
      67              : 
      68              : 
      69              : /*
      70              :  * Open pg_largeobject and its index, if not already done in current xact
      71              :  */
      72              : static void
      73         1603 : open_lo_relation(void)
      74              : {
      75              :     ResourceOwner currentOwner;
      76              : 
      77         1603 :     if (lo_heap_r && lo_index_r)
      78         1404 :         return;                 /* already open in current xact */
      79              : 
      80              :     /* Arrange for the top xact to own these relation references */
      81          199 :     currentOwner = CurrentResourceOwner;
      82          199 :     CurrentResourceOwner = TopTransactionResourceOwner;
      83              : 
      84              :     /* Use RowExclusiveLock since we might either read or write */
      85          199 :     if (lo_heap_r == NULL)
      86          199 :         lo_heap_r = table_open(LargeObjectRelationId, RowExclusiveLock);
      87          199 :     if (lo_index_r == NULL)
      88          199 :         lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
      89              : 
      90          199 :     CurrentResourceOwner = currentOwner;
      91              : }
      92              : 
      93              : /*
      94              :  * Clean up at main transaction end
      95              :  */
      96              : void
      97          291 : close_lo_relation(bool isCommit)
      98              : {
      99          291 :     if (lo_heap_r || lo_index_r)
     100              :     {
     101              :         /*
     102              :          * Only bother to close if committing; else abort cleanup will handle
     103              :          * it
     104              :          */
     105          199 :         if (isCommit)
     106              :         {
     107              :             ResourceOwner currentOwner;
     108              : 
     109          153 :             currentOwner = CurrentResourceOwner;
     110          153 :             CurrentResourceOwner = TopTransactionResourceOwner;
     111              : 
     112          153 :             if (lo_index_r)
     113          153 :                 index_close(lo_index_r, NoLock);
     114          153 :             if (lo_heap_r)
     115          153 :                 table_close(lo_heap_r, NoLock);
     116              : 
     117          153 :             CurrentResourceOwner = currentOwner;
     118              :         }
     119          199 :         lo_heap_r = NULL;
     120          199 :         lo_index_r = NULL;
     121              :     }
     122          291 : }
     123              : 
     124              : 
     125              : /*
     126              :  * Extract data field from a pg_largeobject tuple, detoasting if needed
     127              :  * and verifying that the length is sane.  Returns data pointer (a bytea *),
     128              :  * data length, and an indication of whether to pfree the data pointer.
     129              :  */
     130              : static void
     131         5168 : getdatafield(Form_pg_largeobject tuple,
     132              :              bytea **pdatafield,
     133              :              int *plen,
     134              :              bool *pfreeit)
     135              : {
     136              :     bytea      *datafield;
     137              :     int         len;
     138              :     bool        freeit;
     139              : 
     140         5168 :     datafield = &(tuple->data); /* see note at top of file */
     141         5168 :     freeit = false;
     142         5168 :     if (VARATT_IS_EXTENDED(datafield))
     143              :     {
     144              :         datafield = (bytea *)
     145         5085 :             detoast_attr((varlena *) datafield);
     146         5085 :         freeit = true;
     147              :     }
     148         5168 :     len = VARSIZE(datafield) - VARHDRSZ;
     149         5168 :     if (len < 0 || len > LOBLKSIZE)
     150            0 :         ereport(ERROR,
     151              :                 (errcode(ERRCODE_DATA_CORRUPTED),
     152              :                  errmsg("pg_largeobject entry for OID %u, page %d has invalid data field size %d",
     153              :                         tuple->loid, tuple->pageno, len)));
     154         5168 :     *pdatafield = datafield;
     155         5168 :     *plen = len;
     156         5168 :     *pfreeit = freeit;
     157         5168 : }
     158              : 
     159              : 
     160              : /*
     161              :  *  inv_create -- create a new large object
     162              :  *
     163              :  *  Arguments:
     164              :  *    lobjId - OID to use for new large object, or InvalidOid to pick one
     165              :  *
     166              :  *  Returns:
     167              :  *    OID of new object
     168              :  *
     169              :  * If lobjId is not InvalidOid, then an error occurs if the OID is already
     170              :  * in use.
     171              :  */
     172              : Oid
     173           84 : inv_create(Oid lobjId)
     174              : {
     175              :     Oid         lobjId_new;
     176              : 
     177              :     /*
     178              :      * Create a new largeobject with empty data pages
     179              :      */
     180           84 :     lobjId_new = LargeObjectCreate(lobjId);
     181              : 
     182              :     /*
     183              :      * dependency on the owner of largeobject
     184              :      *
     185              :      * Note that LO dependencies are recorded using classId
     186              :      * LargeObjectRelationId for backwards-compatibility reasons.  Using
     187              :      * LargeObjectMetadataRelationId instead would simplify matters for the
     188              :      * backend, but it'd complicate pg_dump and possibly break other clients.
     189              :      */
     190           84 :     recordDependencyOnOwner(LargeObjectRelationId,
     191              :                             lobjId_new, GetUserId());
     192              : 
     193              :     /* Post creation hook for new large object */
     194           84 :     InvokeObjectPostCreateHook(LargeObjectRelationId, lobjId_new, 0);
     195              : 
     196              :     /*
     197              :      * Advance command counter to make new tuple visible to later operations.
     198              :      */
     199           84 :     CommandCounterIncrement();
     200              : 
     201           84 :     return lobjId_new;
     202              : }
     203              : 
     204              : /*
     205              :  *  inv_open -- access an existing large object.
     206              :  *
     207              :  * Returns a large object descriptor, appropriately filled in.
     208              :  * The descriptor and subsidiary data are allocated in the specified
     209              :  * memory context, which must be suitably long-lived for the caller's
     210              :  * purposes.  If the returned descriptor has a snapshot associated
     211              :  * with it, the caller must ensure that it also lives long enough,
     212              :  * e.g. by calling RegisterSnapshotOnOwner
     213              :  */
     214              : LargeObjectDesc *
     215          299 : inv_open(Oid lobjId, int flags, MemoryContext mcxt)
     216              : {
     217              :     LargeObjectDesc *retval;
     218          299 :     Snapshot    snapshot = NULL;
     219          299 :     int         descflags = 0;
     220              : 
     221              :     /*
     222              :      * Historically, no difference is made between (INV_WRITE) and (INV_WRITE
     223              :      * | INV_READ), the caller being allowed to read the large object
     224              :      * descriptor in either case.
     225              :      */
     226          299 :     if (flags & INV_WRITE)
     227          111 :         descflags |= IFS_WRLOCK | IFS_RDLOCK;
     228          299 :     if (flags & INV_READ)
     229          203 :         descflags |= IFS_RDLOCK;
     230              : 
     231          299 :     if (descflags == 0)
     232            0 :         ereport(ERROR,
     233              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     234              :                  errmsg("invalid flags for opening a large object: %d",
     235              :                         flags)));
     236              : 
     237              :     /* Get snapshot.  If write is requested, use an instantaneous snapshot. */
     238          299 :     if (descflags & IFS_WRLOCK)
     239          111 :         snapshot = NULL;
     240              :     else
     241          188 :         snapshot = GetActiveSnapshot();
     242              : 
     243              :     /* Can't use LargeObjectExists here because we need to specify snapshot */
     244          299 :     if (!LargeObjectExistsWithSnapshot(lobjId, snapshot))
     245            5 :         ereport(ERROR,
     246              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     247              :                  errmsg("large object %u does not exist", lobjId)));
     248              : 
     249              :     /* Apply permission checks, again specifying snapshot */
     250          294 :     if ((descflags & IFS_RDLOCK) != 0)
     251              :     {
     252          579 :         if (!lo_compat_privileges &&
     253          285 :             pg_largeobject_aclcheck_snapshot(lobjId,
     254              :                                              GetUserId(),
     255              :                                              ACL_SELECT,
     256              :                                              snapshot) != ACLCHECK_OK)
     257           21 :             ereport(ERROR,
     258              :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     259              :                      errmsg("permission denied for large object %u",
     260              :                             lobjId)));
     261              :     }
     262          273 :     if ((descflags & IFS_WRLOCK) != 0)
     263              :     {
     264          192 :         if (!lo_compat_privileges &&
     265           93 :             pg_largeobject_aclcheck_snapshot(lobjId,
     266              :                                              GetUserId(),
     267              :                                              ACL_UPDATE,
     268              :                                              snapshot) != ACLCHECK_OK)
     269           15 :             ereport(ERROR,
     270              :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     271              :                      errmsg("permission denied for large object %u",
     272              :                             lobjId)));
     273              :     }
     274              : 
     275              :     /* OK to create a descriptor */
     276          258 :     retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
     277              :                                                     sizeof(LargeObjectDesc));
     278          258 :     retval->id = lobjId;
     279          258 :     retval->offset = 0;
     280          258 :     retval->flags = descflags;
     281              : 
     282              :     /* caller sets if needed, not used by the functions in this file */
     283          258 :     retval->subid = InvalidSubTransactionId;
     284              : 
     285              :     /*
     286              :      * The snapshot (if any) is just the currently active snapshot.  The
     287              :      * caller will replace it with a longer-lived copy if needed.
     288              :      */
     289          258 :     retval->snapshot = snapshot;
     290              : 
     291          258 :     return retval;
     292              : }
     293              : 
     294              : /*
     295              :  * Closes a large object descriptor previously made by inv_open(), and
     296              :  * releases the long-term memory used by it.
     297              :  */
     298              : void
     299          243 : inv_close(LargeObjectDesc *obj_desc)
     300              : {
     301              :     Assert(obj_desc);
     302          243 :     pfree(obj_desc);
     303          243 : }
     304              : 
     305              : /*
     306              :  * Destroys an existing large object (not to be confused with a descriptor!)
     307              :  *
     308              :  * Note we expect caller to have done any required permissions check.
     309              :  */
     310              : int
     311           44 : inv_drop(Oid lobjId)
     312              : {
     313              :     ObjectAddress object;
     314              : 
     315              :     /*
     316              :      * Delete any comments and dependencies on the large object
     317              :      */
     318           44 :     object.classId = LargeObjectRelationId;
     319           44 :     object.objectId = lobjId;
     320           44 :     object.objectSubId = 0;
     321           44 :     performDeletion(&object, DROP_CASCADE, 0);
     322              : 
     323              :     /*
     324              :      * Advance command counter so that tuple removal will be seen by later
     325              :      * large-object operations in this transaction.
     326              :      */
     327           44 :     CommandCounterIncrement();
     328              : 
     329              :     /* For historical reasons, we always return 1 on success. */
     330           44 :     return 1;
     331              : }
     332              : 
     333              : /*
     334              :  * Determine size of a large object
     335              :  *
     336              :  * NOTE: LOs can contain gaps, just like Unix files.  We actually return
     337              :  * the offset of the last byte + 1.
     338              :  */
     339              : static uint64
     340           72 : inv_getsize(LargeObjectDesc *obj_desc)
     341              : {
     342           72 :     uint64      lastbyte = 0;
     343              :     ScanKeyData skey[1];
     344              :     SysScanDesc sd;
     345              :     HeapTuple   tuple;
     346              : 
     347              :     Assert(obj_desc);
     348              : 
     349           72 :     open_lo_relation();
     350              : 
     351           72 :     ScanKeyInit(&skey[0],
     352              :                 Anum_pg_largeobject_loid,
     353              :                 BTEqualStrategyNumber, F_OIDEQ,
     354              :                 ObjectIdGetDatum(obj_desc->id));
     355              : 
     356           72 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     357              :                                     obj_desc->snapshot, 1, skey);
     358              : 
     359              :     /*
     360              :      * Because the pg_largeobject index is on both loid and pageno, but we
     361              :      * constrain only loid, a backwards scan should visit all pages of the
     362              :      * large object in reverse pageno order.  So, it's sufficient to examine
     363              :      * the first valid tuple (== last valid page).
     364              :      */
     365           72 :     tuple = systable_getnext_ordered(sd, BackwardScanDirection);
     366           72 :     if (HeapTupleIsValid(tuple))
     367              :     {
     368              :         Form_pg_largeobject data;
     369              :         bytea      *datafield;
     370              :         int         len;
     371              :         bool        pfreeit;
     372              : 
     373           60 :         if (HeapTupleHasNulls(tuple))   /* paranoia */
     374            0 :             elog(ERROR, "null field found in pg_largeobject");
     375           60 :         data = (Form_pg_largeobject) GETSTRUCT(tuple);
     376           60 :         getdatafield(data, &datafield, &len, &pfreeit);
     377           60 :         lastbyte = (uint64) data->pageno * LOBLKSIZE + len;
     378           60 :         if (pfreeit)
     379           21 :             pfree(datafield);
     380              :     }
     381              : 
     382           72 :     systable_endscan_ordered(sd);
     383              : 
     384           72 :     return lastbyte;
     385              : }
     386              : 
     387              : int64
     388          153 : inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
     389              : {
     390              :     int64       newoffset;
     391              : 
     392              :     Assert(obj_desc);
     393              : 
     394              :     /*
     395              :      * We allow seek/tell if you have either read or write permission, so no
     396              :      * need for a permission check here.
     397              :      */
     398              : 
     399              :     /*
     400              :      * Note: overflow in the additions is possible, but since we will reject
     401              :      * negative results, we don't need any extra test for that.
     402              :      */
     403          153 :     switch (whence)
     404              :     {
     405           72 :         case SEEK_SET:
     406           72 :             newoffset = offset;
     407           72 :             break;
     408            9 :         case SEEK_CUR:
     409            9 :             newoffset = obj_desc->offset + offset;
     410            9 :             break;
     411           72 :         case SEEK_END:
     412           72 :             newoffset = inv_getsize(obj_desc) + offset;
     413           72 :             break;
     414            0 :         default:
     415            0 :             ereport(ERROR,
     416              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     417              :                      errmsg("invalid whence setting: %d", whence)));
     418              :             newoffset = 0;      /* keep compiler quiet */
     419              :             break;
     420              :     }
     421              : 
     422              :     /*
     423              :      * use errmsg_internal here because we don't want to expose INT64_FORMAT
     424              :      * in translatable strings; doing better is not worth the trouble
     425              :      */
     426          153 :     if (newoffset < 0 || newoffset > MAX_LARGE_OBJECT_SIZE)
     427            0 :         ereport(ERROR,
     428              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     429              :                  errmsg_internal("invalid large object seek target: " INT64_FORMAT,
     430              :                                  newoffset)));
     431              : 
     432          153 :     obj_desc->offset = newoffset;
     433          153 :     return newoffset;
     434              : }
     435              : 
     436              : int64
     437           24 : inv_tell(LargeObjectDesc *obj_desc)
     438              : {
     439              :     Assert(obj_desc);
     440              : 
     441              :     /*
     442              :      * We allow seek/tell if you have either read or write permission, so no
     443              :      * need for a permission check here.
     444              :      */
     445              : 
     446           24 :     return obj_desc->offset;
     447              : }
     448              : 
     449              : int
     450          721 : inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
     451              : {
     452          721 :     int         nread = 0;
     453              :     int64       n;
     454              :     int64       off;
     455              :     int         len;
     456          721 :     int32       pageno = (int32) (obj_desc->offset / LOBLKSIZE);
     457              :     uint64      pageoff;
     458              :     ScanKeyData skey[2];
     459              :     SysScanDesc sd;
     460              :     HeapTuple   tuple;
     461              : 
     462              :     Assert(obj_desc);
     463              :     Assert(buf != NULL);
     464              : 
     465          721 :     if ((obj_desc->flags & IFS_RDLOCK) == 0)
     466            0 :         ereport(ERROR,
     467              :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     468              :                  errmsg("permission denied for large object %u",
     469              :                         obj_desc->id)));
     470              : 
     471          721 :     if (nbytes <= 0)
     472           12 :         return 0;
     473              : 
     474          709 :     open_lo_relation();
     475              : 
     476          709 :     ScanKeyInit(&skey[0],
     477              :                 Anum_pg_largeobject_loid,
     478              :                 BTEqualStrategyNumber, F_OIDEQ,
     479              :                 ObjectIdGetDatum(obj_desc->id));
     480              : 
     481          709 :     ScanKeyInit(&skey[1],
     482              :                 Anum_pg_largeobject_pageno,
     483              :                 BTGreaterEqualStrategyNumber, F_INT4GE,
     484              :                 Int32GetDatum(pageno));
     485              : 
     486          709 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     487              :                                     obj_desc->snapshot, 2, skey);
     488              : 
     489         5250 :     while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     490              :     {
     491              :         Form_pg_largeobject data;
     492              :         bytea      *datafield;
     493              :         bool        pfreeit;
     494              : 
     495         5084 :         if (HeapTupleHasNulls(tuple))   /* paranoia */
     496            0 :             elog(ERROR, "null field found in pg_largeobject");
     497         5084 :         data = (Form_pg_largeobject) GETSTRUCT(tuple);
     498              : 
     499              :         /*
     500              :          * We expect the indexscan will deliver pages in order.  However,
     501              :          * there may be missing pages if the LO contains unwritten "holes". We
     502              :          * want missing sections to read out as zeroes.
     503              :          */
     504         5084 :         pageoff = ((uint64) data->pageno) * LOBLKSIZE;
     505         5084 :         if (pageoff > obj_desc->offset)
     506              :         {
     507            6 :             n = pageoff - obj_desc->offset;
     508            6 :             n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
     509            6 :             MemSet(buf + nread, 0, n);
     510            6 :             nread += n;
     511            6 :             obj_desc->offset += n;
     512              :         }
     513              : 
     514         5084 :         if (nread < nbytes)
     515              :         {
     516              :             Assert(obj_desc->offset >= pageoff);
     517         5081 :             off = (int) (obj_desc->offset - pageoff);
     518              :             Assert(off >= 0 && off < LOBLKSIZE);
     519              : 
     520         5081 :             getdatafield(data, &datafield, &len, &pfreeit);
     521         5081 :             if (len > off)
     522              :             {
     523         5023 :                 n = len - off;
     524         5023 :                 n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
     525         5023 :                 memcpy(buf + nread, VARDATA(datafield) + off, n);
     526         5023 :                 nread += n;
     527         5023 :                 obj_desc->offset += n;
     528              :             }
     529         5081 :             if (pfreeit)
     530         5043 :                 pfree(datafield);
     531              :         }
     532              : 
     533         5084 :         if (nread >= nbytes)
     534          543 :             break;
     535              :     }
     536              : 
     537          709 :     systable_endscan_ordered(sd);
     538              : 
     539          709 :     return nread;
     540              : }
     541              : 
     542              : int
     543          798 : inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
     544              : {
     545          798 :     int         nwritten = 0;
     546              :     int         n;
     547              :     int         off;
     548              :     int         len;
     549          798 :     int32       pageno = (int32) (obj_desc->offset / LOBLKSIZE);
     550              :     ScanKeyData skey[2];
     551              :     SysScanDesc sd;
     552              :     HeapTuple   oldtuple;
     553              :     Form_pg_largeobject olddata;
     554              :     bool        neednextpage;
     555              :     bytea      *datafield;
     556              :     bool        pfreeit;
     557              :     union
     558              :     {
     559              :         alignas(int32) bytea hdr;
     560              :         /* this is to make the union big enough for a LO data chunk: */
     561              :         char        data[LOBLKSIZE + VARHDRSZ];
     562          798 :     }           workbuf = {0};
     563          798 :     char       *workb = VARDATA(&workbuf.hdr);
     564              :     HeapTuple   newtup;
     565              :     Datum       values[Natts_pg_largeobject];
     566              :     bool        nulls[Natts_pg_largeobject];
     567              :     bool        replace[Natts_pg_largeobject];
     568              :     CatalogIndexState indstate;
     569              : 
     570              :     Assert(obj_desc);
     571              :     Assert(buf != NULL);
     572              : 
     573              :     /* enforce writability because snapshot is probably wrong otherwise */
     574          798 :     if ((obj_desc->flags & IFS_WRLOCK) == 0)
     575            0 :         ereport(ERROR,
     576              :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     577              :                  errmsg("permission denied for large object %u",
     578              :                         obj_desc->id)));
     579              : 
     580          798 :     if (nbytes <= 0)
     581            0 :         return 0;
     582              : 
     583              :     /* this addition can't overflow because nbytes is only int32 */
     584          798 :     if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
     585            0 :         ereport(ERROR,
     586              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     587              :                  errmsg("invalid large object write request size: %d",
     588              :                         nbytes)));
     589              : 
     590          798 :     open_lo_relation();
     591              : 
     592          798 :     indstate = CatalogOpenIndexes(lo_heap_r);
     593              : 
     594          798 :     ScanKeyInit(&skey[0],
     595              :                 Anum_pg_largeobject_loid,
     596              :                 BTEqualStrategyNumber, F_OIDEQ,
     597              :                 ObjectIdGetDatum(obj_desc->id));
     598              : 
     599          798 :     ScanKeyInit(&skey[1],
     600              :                 Anum_pg_largeobject_pageno,
     601              :                 BTGreaterEqualStrategyNumber, F_INT4GE,
     602              :                 Int32GetDatum(pageno));
     603              : 
     604          798 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     605              :                                     obj_desc->snapshot, 2, skey);
     606              : 
     607          798 :     oldtuple = NULL;
     608          798 :     olddata = NULL;
     609          798 :     neednextpage = true;
     610              : 
     611         4794 :     while (nwritten < nbytes)
     612              :     {
     613              :         /*
     614              :          * If possible, get next pre-existing page of the LO.  We expect the
     615              :          * indexscan will deliver these in order --- but there may be holes.
     616              :          */
     617         3996 :         if (neednextpage)
     618              :         {
     619          801 :             if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     620              :             {
     621           18 :                 if (HeapTupleHasNulls(oldtuple))    /* paranoia */
     622            0 :                     elog(ERROR, "null field found in pg_largeobject");
     623           18 :                 olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
     624              :                 Assert(olddata->pageno >= pageno);
     625              :             }
     626          801 :             neednextpage = false;
     627              :         }
     628              : 
     629              :         /*
     630              :          * If we have a pre-existing page, see if it is the page we want to
     631              :          * write, or a later one.
     632              :          */
     633         3996 :         if (olddata != NULL && olddata->pageno == pageno)
     634              :         {
     635              :             /*
     636              :              * Update an existing page with fresh data.
     637              :              *
     638              :              * First, load old data into workbuf
     639              :              */
     640           18 :             getdatafield(olddata, &datafield, &len, &pfreeit);
     641           18 :             memcpy(workb, VARDATA(datafield), len);
     642           18 :             if (pfreeit)
     643           15 :                 pfree(datafield);
     644              : 
     645              :             /*
     646              :              * Fill any hole
     647              :              */
     648           18 :             off = (int) (obj_desc->offset % LOBLKSIZE);
     649           18 :             if (off > len)
     650            0 :                 MemSet(workb + len, 0, off - len);
     651              : 
     652              :             /*
     653              :              * Insert appropriate portion of new data
     654              :              */
     655           18 :             n = LOBLKSIZE - off;
     656           18 :             n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
     657           18 :             memcpy(workb + off, buf + nwritten, n);
     658           18 :             nwritten += n;
     659           18 :             obj_desc->offset += n;
     660           18 :             off += n;
     661              :             /* compute valid length of new page */
     662           18 :             len = (len >= off) ? len : off;
     663           18 :             SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
     664              : 
     665              :             /*
     666              :              * Form and insert updated tuple
     667              :              */
     668           18 :             memset(values, 0, sizeof(values));
     669           18 :             memset(nulls, false, sizeof(nulls));
     670           18 :             memset(replace, false, sizeof(replace));
     671           18 :             values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     672           18 :             replace[Anum_pg_largeobject_data - 1] = true;
     673           18 :             newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
     674              :                                        values, nulls, replace);
     675           18 :             CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
     676              :                                        indstate);
     677           18 :             heap_freetuple(newtup);
     678              : 
     679              :             /*
     680              :              * We're done with this old page.
     681              :              */
     682           18 :             oldtuple = NULL;
     683           18 :             olddata = NULL;
     684           18 :             neednextpage = true;
     685              :         }
     686              :         else
     687              :         {
     688              :             /*
     689              :              * Write a brand new page.
     690              :              *
     691              :              * First, fill any hole
     692              :              */
     693         3978 :             off = (int) (obj_desc->offset % LOBLKSIZE);
     694         3978 :             if (off > 0)
     695            3 :                 MemSet(workb, 0, off);
     696              : 
     697              :             /*
     698              :              * Insert appropriate portion of new data
     699              :              */
     700         3978 :             n = LOBLKSIZE - off;
     701         3978 :             n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
     702         3978 :             memcpy(workb + off, buf + nwritten, n);
     703         3978 :             nwritten += n;
     704         3978 :             obj_desc->offset += n;
     705              :             /* compute valid length of new page */
     706         3978 :             len = off + n;
     707         3978 :             SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
     708              : 
     709              :             /*
     710              :              * Form and insert new tuple
     711              :              */
     712         3978 :             memset(values, 0, sizeof(values));
     713         3978 :             memset(nulls, false, sizeof(nulls));
     714         3978 :             values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
     715         3978 :             values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
     716         3978 :             values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     717         3978 :             newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
     718         3978 :             CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
     719         3978 :             heap_freetuple(newtup);
     720              :         }
     721         3996 :         pageno++;
     722              :     }
     723              : 
     724          798 :     systable_endscan_ordered(sd);
     725              : 
     726          798 :     CatalogCloseIndexes(indstate);
     727              : 
     728              :     /*
     729              :      * Advance command counter so that my tuple updates will be seen by later
     730              :      * large-object operations in this transaction.
     731              :      */
     732          798 :     CommandCounterIncrement();
     733              : 
     734          798 :     return nwritten;
     735              : }
     736              : 
     /*
      * inv_truncate -- make the large object behind obj_desc end at byte len.
      *
      * obj_desc->flags must include IFS_WRLOCK, and len must lie in the range
      * 0 .. MAX_LARGE_OBJECT_SIZE; either violation is reported through
      * ereport(ERROR).
      *
      * The page containing the truncation point (page len / LOBLKSIZE) is
      * rewritten to hold exactly len % LOBLKSIZE data bytes, zero-padded if it
      * used to be shorter.  If no such page exists, a new page consisting of
      * that many zero bytes is inserted instead; the insert is done even when
      * the byte count is zero.  Every further page the index scan returns is
      * deleted.  The function finishes with CommandCounterIncrement().
      *
      * obj_desc->offset is not modified here.
      */
     737              : void
     738           24 : inv_truncate(LargeObjectDesc *obj_desc, int64 len)
     739              : {
     740           24 :     int32       pageno = (int32) (len / LOBLKSIZE);
     741              :     int32       off;
     742              :     ScanKeyData skey[2];
     743              :     SysScanDesc sd;
     744              :     HeapTuple   oldtuple;
     745              :     Form_pg_largeobject olddata;
     746              :     union
     747              :     {
     748              :         alignas(int32) bytea hdr;
     749              :         /* this is to make the union big enough for a LO data chunk: */
     750              :         char        data[LOBLKSIZE + VARHDRSZ];
     751           24 :     }           workbuf = {0};
     752           24 :     char       *workb = VARDATA(&workbuf.hdr);
     753              :     HeapTuple   newtup;
     754              :     Datum       values[Natts_pg_largeobject];
     755              :     bool        nulls[Natts_pg_largeobject];
     756              :     bool        replace[Natts_pg_largeobject];
     757              :     CatalogIndexState indstate;
     758              : 
     759              :     Assert(obj_desc);
     760              : 
     761              :     /* enforce writability because snapshot is probably wrong otherwise */
     762           24 :     if ((obj_desc->flags & IFS_WRLOCK) == 0)
     763            0 :         ereport(ERROR,
     764              :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     765              :                  errmsg("permission denied for large object %u",
     766              :                         obj_desc->id)));
     767              : 
     768              :     /*
     769              :      * use errmsg_internal here because we don't want to expose INT64_FORMAT
     770              :      * in translatable strings; doing better is not worth the trouble
     771              :      */
     772           24 :     if (len < 0 || len > MAX_LARGE_OBJECT_SIZE)
     773            0 :         ereport(ERROR,
     774              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     775              :                  errmsg_internal("invalid large object truncation target: " INT64_FORMAT,
     776              :                                  len)));
     777              : 
     778           24 :     open_lo_relation();
     779              : 
     780           24 :     indstate = CatalogOpenIndexes(lo_heap_r);
     781              : 
     782              :     /*
     783              :      * Set up to find all pages with desired loid and pageno >= target
     784              :      */
     785           24 :     ScanKeyInit(&skey[0],
     786              :                 Anum_pg_largeobject_loid,
     787              :                 BTEqualStrategyNumber, F_OIDEQ,
     788              :                 ObjectIdGetDatum(obj_desc->id));
     789              : 
     790           24 :     ScanKeyInit(&skey[1],
     791              :                 Anum_pg_largeobject_pageno,
     792              :                 BTGreaterEqualStrategyNumber, F_INT4GE,
     793              :                 Int32GetDatum(pageno));
     794              : 
     795           24 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     796              :                                     obj_desc->snapshot, 2, skey);
     797              : 
     798              :     /*
     799              :      * If possible, get the page the truncation point is in. The truncation
     800              :      * point may be beyond the end of the LO or in a hole.
     801              :      */
     802           24 :     olddata = NULL;
     803           24 :     if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     804              :     {
     805           15 :         if (HeapTupleHasNulls(oldtuple))    /* paranoia */
     806            0 :             elog(ERROR, "null field found in pg_largeobject");
     807           15 :         olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
     808              :         Assert(olddata->pageno >= pageno);
     809              :     }
     810              : 
     811              :     /*
     812              :      * If we found the page of the truncation point we need to truncate the
     813              :      * data in it.  Otherwise if we're in a hole, we need to create a page to
     814              :      * mark the end of data.
     815              :      */
     816           24 :     if (olddata != NULL && olddata->pageno == pageno)
     817            9 :     {
     818              :         /* First, copy the old page data into workbuf, freeing any temporary copy */
     819              :         bytea      *datafield;
     820              :         int         pagelen;
     821              :         bool        pfreeit;
     822              : 
     823            9 :         getdatafield(olddata, &datafield, &pagelen, &pfreeit);
     824            9 :         memcpy(workb, VARDATA(datafield), pagelen);
     825            9 :         if (pfreeit)
     826            6 :             pfree(datafield);
     827              : 
     828              :         /*
     829              :          * Zero-fill the gap when the new end lies past the old end of data
     830              :          */
     831            9 :         off = len % LOBLKSIZE;
     832            9 :         if (off > pagelen)
     833            3 :             MemSet(workb + pagelen, 0, off - pagelen);
     834              : 
     835              :         /* the rewritten page keeps exactly off data bytes after the header */
     836            9 :         SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
     837              : 
     838              :         /*
     839              :          * Form and insert updated tuple
     840              :          */
     841            9 :         memset(values, 0, sizeof(values));
     842            9 :         memset(nulls, false, sizeof(nulls));
     843            9 :         memset(replace, false, sizeof(replace));
     844            9 :         values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     845            9 :         replace[Anum_pg_largeobject_data - 1] = true;
     846            9 :         newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
     847              :                                    values, nulls, replace);
     848            9 :         CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
     849              :                                    indstate);
     850            9 :         heap_freetuple(newtup);
     851              :     }
     852              :     else
     853              :     {
     854              :         /*
     855              :          * If the first page we found was after the truncation point, we're in
     856              :          * a hole that we'll fill, but we need to delete the later page
     857              :          * because the loop below won't visit it again.
     858              :          */
     859           15 :         if (olddata != NULL)
     860              :         {
     861              :             Assert(olddata->pageno > pageno);
     862            6 :             CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
     863              :         }
     864              : 
     865              :         /*
     866              :          * Write a brand new page.
     867              :          *
     868              :          * Fill the hole up to the truncation point
     869              :          */
     870           15 :         off = len % LOBLKSIZE;
     871           15 :         if (off > 0)
     872           15 :             MemSet(workb, 0, off);
     873              : 
     874              :         /* the new page holds off zero bytes after the header */
     875           15 :         SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
     876              : 
     877              :         /*
     878              :          * Form and insert new tuple
     879              :          */
     880           15 :         memset(values, 0, sizeof(values));
     881           15 :         memset(nulls, false, sizeof(nulls));
     882           15 :         values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
     883           15 :         values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
     884           15 :         values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     885           15 :         newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
     886           15 :         CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
     887           15 :         heap_freetuple(newtup);
     888              :     }
     889              : 
     890              :     /*
     891              :      * Delete any pages after the truncation point.  If the initial search
     892              :      * didn't find a page, then of course there's nothing more to do.
     893              :      */
     894           24 :     if (olddata != NULL)
     895              :     {
     896           18 :         while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     897              :         {
     898            3 :             CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
     899              :         }
     900              :     }
     901              : 
     902           24 :     systable_endscan_ordered(sd);
     903              : 
     904           24 :     CatalogCloseIndexes(indstate);
     905              : 
     906              :     /*
     907              :      * Advance command counter so that tuple updates will be seen by later
     908              :      * large-object operations in this transaction.
     909              :      */
     910           24 :     CommandCounterIncrement();
     911           24 : }
        

Generated by: LCOV version 2.0-1