Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * inv_api.c
4 : * routines for manipulating inversion fs large objects. This file
5 : * contains the user-level large object application interface routines.
6 : *
7 : *
8 : * Note: we access pg_largeobject.data using its C struct declaration.
9 : * This is safe because it immediately follows pageno which is an int4 field,
10 : * and therefore the data field will always be 4-byte aligned, even if it
11 : * is in the short 1-byte-header format. We have to detoast it since it's
12 : * quite likely to be in compressed or short format. We also need to check
13 : * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
14 : *
15 : * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
16 : * does most of the backend code. We expect that CurrentMemoryContext will
17 : * be a short-lived context. Data that must persist across function calls
18 : * is kept either in CacheMemoryContext (the Relation structs) or in the
19 : * memory context given to inv_open (for LargeObjectDesc structs).
20 : *
21 : *
22 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
23 : * Portions Copyright (c) 1994, Regents of the University of California
24 : *
25 : *
26 : * IDENTIFICATION
27 : * src/backend/storage/large_object/inv_api.c
28 : *
29 : *-------------------------------------------------------------------------
30 : */
31 : #include "postgres.h"
32 :
33 : #include <limits.h>
34 :
35 : #include "access/detoast.h"
36 : #include "access/genam.h"
37 : #include "access/htup_details.h"
38 : #include "access/table.h"
39 : #include "access/xact.h"
40 : #include "catalog/dependency.h"
41 : #include "catalog/indexing.h"
42 : #include "catalog/objectaccess.h"
43 : #include "catalog/pg_largeobject.h"
44 : #include "libpq/libpq-fs.h"
45 : #include "miscadmin.h"
46 : #include "storage/large_object.h"
47 : #include "utils/acl.h"
48 : #include "utils/fmgroids.h"
49 : #include "utils/rel.h"
50 : #include "utils/snapmgr.h"
51 :
52 :
/*
 * GUC: backwards-compatibility flag to suppress LO permission checks
 *
 * When this is true, inv_open() skips its ACL checks on the large object
 * being opened.
 */
bool		lo_compat_privileges;
57 :
/*
 * All accesses to pg_largeobject and its index make use of a single
 * Relation reference.  To guarantee that the relcache entry remains
 * in the cache, on the first reference inside a subtransaction, we
 * execute a slightly klugy maneuver to assign ownership of the
 * Relation reference to TopTransactionResourceOwner.
 *
 * Both pointers are filled in by open_lo_relation() and reset to NULL by
 * close_lo_relation(); NULL means "not opened in the current transaction".
 */
static Relation lo_heap_r = NULL;	/* pg_largeobject itself */
static Relation lo_index_r = NULL;	/* its (loid, pageno) index */
67 :
68 :
69 : /*
70 : * Open pg_largeobject and its index, if not already done in current xact
71 : */
72 : static void
73 3058 : open_lo_relation(void)
74 : {
75 : ResourceOwner currentOwner;
76 :
77 3058 : if (lo_heap_r && lo_index_r)
78 2766 : return; /* already open in current xact */
79 :
80 : /* Arrange for the top xact to own these relation references */
81 292 : currentOwner = CurrentResourceOwner;
82 292 : CurrentResourceOwner = TopTransactionResourceOwner;
83 :
84 : /* Use RowExclusiveLock since we might either read or write */
85 292 : if (lo_heap_r == NULL)
86 292 : lo_heap_r = table_open(LargeObjectRelationId, RowExclusiveLock);
87 292 : if (lo_index_r == NULL)
88 292 : lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
89 :
90 292 : CurrentResourceOwner = currentOwner;
91 : }
92 :
93 : /*
94 : * Clean up at main transaction end
95 : */
96 : void
97 442 : close_lo_relation(bool isCommit)
98 : {
99 442 : if (lo_heap_r || lo_index_r)
100 : {
101 : /*
102 : * Only bother to close if committing; else abort cleanup will handle
103 : * it
104 : */
105 292 : if (isCommit)
106 : {
107 : ResourceOwner currentOwner;
108 :
109 210 : currentOwner = CurrentResourceOwner;
110 210 : CurrentResourceOwner = TopTransactionResourceOwner;
111 :
112 210 : if (lo_index_r)
113 210 : index_close(lo_index_r, NoLock);
114 210 : if (lo_heap_r)
115 210 : table_close(lo_heap_r, NoLock);
116 :
117 210 : CurrentResourceOwner = currentOwner;
118 : }
119 292 : lo_heap_r = NULL;
120 292 : lo_index_r = NULL;
121 : }
122 442 : }
123 :
124 :
125 : /*
126 : * Extract data field from a pg_largeobject tuple, detoasting if needed
127 : * and verifying that the length is sane. Returns data pointer (a bytea *),
128 : * data length, and an indication of whether to pfree the data pointer.
129 : */
130 : static void
131 10236 : getdatafield(Form_pg_largeobject tuple,
132 : bytea **pdatafield,
133 : int *plen,
134 : bool *pfreeit)
135 : {
136 : bytea *datafield;
137 : int len;
138 : bool freeit;
139 :
140 10236 : datafield = &(tuple->data); /* see note at top of file */
141 10236 : freeit = false;
142 10236 : if (VARATT_IS_EXTENDED(datafield))
143 : {
144 : datafield = (bytea *)
145 10070 : detoast_attr((struct varlena *) datafield);
146 10070 : freeit = true;
147 : }
148 10236 : len = VARSIZE(datafield) - VARHDRSZ;
149 10236 : if (len < 0 || len > LOBLKSIZE)
150 0 : ereport(ERROR,
151 : (errcode(ERRCODE_DATA_CORRUPTED),
152 : errmsg("pg_largeobject entry for OID %u, page %d has invalid data field size %d",
153 : tuple->loid, tuple->pageno, len)));
154 10236 : *pdatafield = datafield;
155 10236 : *plen = len;
156 10236 : *pfreeit = freeit;
157 10236 : }
158 :
159 :
160 : /*
161 : * inv_create -- create a new large object
162 : *
163 : * Arguments:
164 : * lobjId - OID to use for new large object, or InvalidOid to pick one
165 : *
166 : * Returns:
167 : * OID of new object
168 : *
169 : * If lobjId is not InvalidOid, then an error occurs if the OID is already
170 : * in use.
171 : */
172 : Oid
173 112 : inv_create(Oid lobjId)
174 : {
175 : Oid lobjId_new;
176 :
177 : /*
178 : * Create a new largeobject with empty data pages
179 : */
180 112 : lobjId_new = LargeObjectCreate(lobjId);
181 :
182 : /*
183 : * dependency on the owner of largeobject
184 : *
185 : * Note that LO dependencies are recorded using classId
186 : * LargeObjectRelationId for backwards-compatibility reasons. Using
187 : * LargeObjectMetadataRelationId instead would simplify matters for the
188 : * backend, but it'd complicate pg_dump and possibly break other clients.
189 : */
190 112 : recordDependencyOnOwner(LargeObjectRelationId,
191 : lobjId_new, GetUserId());
192 :
193 : /* Post creation hook for new large object */
194 112 : InvokeObjectPostCreateHook(LargeObjectRelationId, lobjId_new, 0);
195 :
196 : /*
197 : * Advance command counter to make new tuple visible to later operations.
198 : */
199 112 : CommandCounterIncrement();
200 :
201 112 : return lobjId_new;
202 : }
203 :
204 : /*
205 : * inv_open -- access an existing large object.
206 : *
207 : * Returns a large object descriptor, appropriately filled in.
208 : * The descriptor and subsidiary data are allocated in the specified
209 : * memory context, which must be suitably long-lived for the caller's
210 : * purposes. If the returned descriptor has a snapshot associated
211 : * with it, the caller must ensure that it also lives long enough,
212 : * e.g. by calling RegisterSnapshotOnOwner
213 : */
214 : LargeObjectDesc *
215 464 : inv_open(Oid lobjId, int flags, MemoryContext mcxt)
216 : {
217 : LargeObjectDesc *retval;
218 464 : Snapshot snapshot = NULL;
219 464 : int descflags = 0;
220 :
221 : /*
222 : * Historically, no difference is made between (INV_WRITE) and (INV_WRITE
223 : * | INV_READ), the caller being allowed to read the large object
224 : * descriptor in either case.
225 : */
226 464 : if (flags & INV_WRITE)
227 154 : descflags |= IFS_WRLOCK | IFS_RDLOCK;
228 464 : if (flags & INV_READ)
229 340 : descflags |= IFS_RDLOCK;
230 :
231 464 : if (descflags == 0)
232 0 : ereport(ERROR,
233 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
234 : errmsg("invalid flags for opening a large object: %d",
235 : flags)));
236 :
237 : /* Get snapshot. If write is requested, use an instantaneous snapshot. */
238 464 : if (descflags & IFS_WRLOCK)
239 154 : snapshot = NULL;
240 : else
241 310 : snapshot = GetActiveSnapshot();
242 :
243 : /* Can't use LargeObjectExists here because we need to specify snapshot */
244 464 : if (!LargeObjectExistsWithSnapshot(lobjId, snapshot))
245 4 : ereport(ERROR,
246 : (errcode(ERRCODE_UNDEFINED_OBJECT),
247 : errmsg("large object %u does not exist", lobjId)));
248 :
249 : /* Apply permission checks, again specifying snapshot */
250 460 : if ((descflags & IFS_RDLOCK) != 0)
251 : {
252 902 : if (!lo_compat_privileges &&
253 442 : pg_largeobject_aclcheck_snapshot(lobjId,
254 : GetUserId(),
255 : ACL_SELECT,
256 : snapshot) != ACLCHECK_OK)
257 42 : ereport(ERROR,
258 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
259 : errmsg("permission denied for large object %u",
260 : lobjId)));
261 : }
262 418 : if ((descflags & IFS_WRLOCK) != 0)
263 : {
264 248 : if (!lo_compat_privileges &&
265 118 : pg_largeobject_aclcheck_snapshot(lobjId,
266 : GetUserId(),
267 : ACL_UPDATE,
268 : snapshot) != ACLCHECK_OK)
269 12 : ereport(ERROR,
270 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
271 : errmsg("permission denied for large object %u",
272 : lobjId)));
273 : }
274 :
275 : /* OK to create a descriptor */
276 406 : retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
277 : sizeof(LargeObjectDesc));
278 406 : retval->id = lobjId;
279 406 : retval->offset = 0;
280 406 : retval->flags = descflags;
281 :
282 : /* caller sets if needed, not used by the functions in this file */
283 406 : retval->subid = InvalidSubTransactionId;
284 :
285 : /*
286 : * The snapshot (if any) is just the currently active snapshot. The
287 : * caller will replace it with a longer-lived copy if needed.
288 : */
289 406 : retval->snapshot = snapshot;
290 :
291 406 : return retval;
292 : }
293 :
294 : /*
295 : * Closes a large object descriptor previously made by inv_open(), and
296 : * releases the long-term memory used by it.
297 : */
298 : void
299 376 : inv_close(LargeObjectDesc *obj_desc)
300 : {
301 : Assert(PointerIsValid(obj_desc));
302 376 : pfree(obj_desc);
303 376 : }
304 :
305 : /*
306 : * Destroys an existing large object (not to be confused with a descriptor!)
307 : *
308 : * Note we expect caller to have done any required permissions check.
309 : */
310 : int
311 82 : inv_drop(Oid lobjId)
312 : {
313 : ObjectAddress object;
314 :
315 : /*
316 : * Delete any comments and dependencies on the large object
317 : */
318 82 : object.classId = LargeObjectRelationId;
319 82 : object.objectId = lobjId;
320 82 : object.objectSubId = 0;
321 82 : performDeletion(&object, DROP_CASCADE, 0);
322 :
323 : /*
324 : * Advance command counter so that tuple removal will be seen by later
325 : * large-object operations in this transaction.
326 : */
327 82 : CommandCounterIncrement();
328 :
329 : /* For historical reasons, we always return 1 on success. */
330 82 : return 1;
331 : }
332 :
333 : /*
334 : * Determine size of a large object
335 : *
336 : * NOTE: LOs can contain gaps, just like Unix files. We actually return
337 : * the offset of the last byte + 1.
338 : */
339 : static uint64
340 104 : inv_getsize(LargeObjectDesc *obj_desc)
341 : {
342 104 : uint64 lastbyte = 0;
343 : ScanKeyData skey[1];
344 : SysScanDesc sd;
345 : HeapTuple tuple;
346 :
347 : Assert(PointerIsValid(obj_desc));
348 :
349 104 : open_lo_relation();
350 :
351 104 : ScanKeyInit(&skey[0],
352 : Anum_pg_largeobject_loid,
353 : BTEqualStrategyNumber, F_OIDEQ,
354 : ObjectIdGetDatum(obj_desc->id));
355 :
356 104 : sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
357 : obj_desc->snapshot, 1, skey);
358 :
359 : /*
360 : * Because the pg_largeobject index is on both loid and pageno, but we
361 : * constrain only loid, a backwards scan should visit all pages of the
362 : * large object in reverse pageno order. So, it's sufficient to examine
363 : * the first valid tuple (== last valid page).
364 : */
365 104 : tuple = systable_getnext_ordered(sd, BackwardScanDirection);
366 104 : if (HeapTupleIsValid(tuple))
367 : {
368 : Form_pg_largeobject data;
369 : bytea *datafield;
370 : int len;
371 : bool pfreeit;
372 :
373 96 : if (HeapTupleHasNulls(tuple)) /* paranoia */
374 0 : elog(ERROR, "null field found in pg_largeobject");
375 96 : data = (Form_pg_largeobject) GETSTRUCT(tuple);
376 96 : getdatafield(data, &datafield, &len, &pfreeit);
377 96 : lastbyte = (uint64) data->pageno * LOBLKSIZE + len;
378 96 : if (pfreeit)
379 18 : pfree(datafield);
380 : }
381 :
382 104 : systable_endscan_ordered(sd);
383 :
384 104 : return lastbyte;
385 : }
386 :
387 : int64
388 220 : inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
389 : {
390 : int64 newoffset;
391 :
392 : Assert(PointerIsValid(obj_desc));
393 :
394 : /*
395 : * We allow seek/tell if you have either read or write permission, so no
396 : * need for a permission check here.
397 : */
398 :
399 : /*
400 : * Note: overflow in the additions is possible, but since we will reject
401 : * negative results, we don't need any extra test for that.
402 : */
403 220 : switch (whence)
404 : {
405 98 : case SEEK_SET:
406 98 : newoffset = offset;
407 98 : break;
408 18 : case SEEK_CUR:
409 18 : newoffset = obj_desc->offset + offset;
410 18 : break;
411 104 : case SEEK_END:
412 104 : newoffset = inv_getsize(obj_desc) + offset;
413 104 : break;
414 0 : default:
415 0 : ereport(ERROR,
416 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
417 : errmsg("invalid whence setting: %d", whence)));
418 : newoffset = 0; /* keep compiler quiet */
419 : break;
420 : }
421 :
422 : /*
423 : * use errmsg_internal here because we don't want to expose INT64_FORMAT
424 : * in translatable strings; doing better is not worth the trouble
425 : */
426 220 : if (newoffset < 0 || newoffset > MAX_LARGE_OBJECT_SIZE)
427 0 : ereport(ERROR,
428 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
429 : errmsg_internal("invalid large object seek target: " INT64_FORMAT,
430 : newoffset)));
431 :
432 220 : obj_desc->offset = newoffset;
433 220 : return newoffset;
434 : }
435 :
436 : int64
437 48 : inv_tell(LargeObjectDesc *obj_desc)
438 : {
439 : Assert(PointerIsValid(obj_desc));
440 :
441 : /*
442 : * We allow seek/tell if you have either read or write permission, so no
443 : * need for a permission check here.
444 : */
445 :
446 48 : return obj_desc->offset;
447 : }
448 :
449 : int
450 1368 : inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
451 : {
452 1368 : int nread = 0;
453 : int64 n;
454 : int64 off;
455 : int len;
456 1368 : int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
457 : uint64 pageoff;
458 : ScanKeyData skey[2];
459 : SysScanDesc sd;
460 : HeapTuple tuple;
461 :
462 : Assert(PointerIsValid(obj_desc));
463 : Assert(buf != NULL);
464 :
465 1368 : if ((obj_desc->flags & IFS_RDLOCK) == 0)
466 0 : ereport(ERROR,
467 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
468 : errmsg("permission denied for large object %u",
469 : obj_desc->id)));
470 :
471 1368 : if (nbytes <= 0)
472 8 : return 0;
473 :
474 1360 : open_lo_relation();
475 :
476 1360 : ScanKeyInit(&skey[0],
477 : Anum_pg_largeobject_loid,
478 : BTEqualStrategyNumber, F_OIDEQ,
479 : ObjectIdGetDatum(obj_desc->id));
480 :
481 1360 : ScanKeyInit(&skey[1],
482 : Anum_pg_largeobject_pageno,
483 : BTGreaterEqualStrategyNumber, F_INT4GE,
484 : Int32GetDatum(pageno));
485 :
486 1360 : sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
487 : obj_desc->snapshot, 2, skey);
488 :
489 10408 : while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
490 : {
491 : Form_pg_largeobject data;
492 : bytea *datafield;
493 : bool pfreeit;
494 :
495 10110 : if (HeapTupleHasNulls(tuple)) /* paranoia */
496 0 : elog(ERROR, "null field found in pg_largeobject");
497 10110 : data = (Form_pg_largeobject) GETSTRUCT(tuple);
498 :
499 : /*
500 : * We expect the indexscan will deliver pages in order. However,
501 : * there may be missing pages if the LO contains unwritten "holes". We
502 : * want missing sections to read out as zeroes.
503 : */
504 10110 : pageoff = ((uint64) data->pageno) * LOBLKSIZE;
505 10110 : if (pageoff > obj_desc->offset)
506 : {
507 12 : n = pageoff - obj_desc->offset;
508 12 : n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
509 12 : MemSet(buf + nread, 0, n);
510 12 : nread += n;
511 12 : obj_desc->offset += n;
512 : }
513 :
514 10110 : if (nread < nbytes)
515 : {
516 : Assert(obj_desc->offset >= pageoff);
517 10104 : off = (int) (obj_desc->offset - pageoff);
518 : Assert(off >= 0 && off < LOBLKSIZE);
519 :
520 10104 : getdatafield(data, &datafield, &len, &pfreeit);
521 10104 : if (len > off)
522 : {
523 10008 : n = len - off;
524 10008 : n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
525 10008 : memcpy(buf + nread, VARDATA(datafield) + off, n);
526 10008 : nread += n;
527 10008 : obj_desc->offset += n;
528 : }
529 10104 : if (pfreeit)
530 10028 : pfree(datafield);
531 : }
532 :
533 10110 : if (nread >= nbytes)
534 1062 : break;
535 : }
536 :
537 1360 : systable_endscan_ordered(sd);
538 :
539 1360 : return nread;
540 : }
541 :
542 : int
543 1552 : inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
544 : {
545 1552 : int nwritten = 0;
546 : int n;
547 : int off;
548 : int len;
549 1552 : int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
550 : ScanKeyData skey[2];
551 : SysScanDesc sd;
552 : HeapTuple oldtuple;
553 : Form_pg_largeobject olddata;
554 : bool neednextpage;
555 : bytea *datafield;
556 : bool pfreeit;
557 : union
558 : {
559 : bytea hdr;
560 : /* this is to make the union big enough for a LO data chunk: */
561 : char data[LOBLKSIZE + VARHDRSZ];
562 : /* ensure union is aligned well enough: */
563 : int32 align_it;
564 : } workbuf;
565 1552 : char *workb = VARDATA(&workbuf.hdr);
566 : HeapTuple newtup;
567 : Datum values[Natts_pg_largeobject];
568 : bool nulls[Natts_pg_largeobject];
569 : bool replace[Natts_pg_largeobject];
570 : CatalogIndexState indstate;
571 :
572 : Assert(PointerIsValid(obj_desc));
573 : Assert(buf != NULL);
574 :
575 : /* enforce writability because snapshot is probably wrong otherwise */
576 1552 : if ((obj_desc->flags & IFS_WRLOCK) == 0)
577 0 : ereport(ERROR,
578 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
579 : errmsg("permission denied for large object %u",
580 : obj_desc->id)));
581 :
582 1552 : if (nbytes <= 0)
583 0 : return 0;
584 :
585 : /* this addition can't overflow because nbytes is only int32 */
586 1552 : if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
587 0 : ereport(ERROR,
588 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
589 : errmsg("invalid large object write request size: %d",
590 : nbytes)));
591 :
592 1552 : open_lo_relation();
593 :
594 1552 : indstate = CatalogOpenIndexes(lo_heap_r);
595 :
596 1552 : ScanKeyInit(&skey[0],
597 : Anum_pg_largeobject_loid,
598 : BTEqualStrategyNumber, F_OIDEQ,
599 : ObjectIdGetDatum(obj_desc->id));
600 :
601 1552 : ScanKeyInit(&skey[1],
602 : Anum_pg_largeobject_pageno,
603 : BTGreaterEqualStrategyNumber, F_INT4GE,
604 : Int32GetDatum(pageno));
605 :
606 1552 : sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
607 : obj_desc->snapshot, 2, skey);
608 :
609 1552 : oldtuple = NULL;
610 1552 : olddata = NULL;
611 1552 : neednextpage = true;
612 :
613 9500 : while (nwritten < nbytes)
614 : {
615 : /*
616 : * If possible, get next pre-existing page of the LO. We expect the
617 : * indexscan will deliver these in order --- but there may be holes.
618 : */
619 7948 : if (neednextpage)
620 : {
621 1558 : if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
622 : {
623 24 : if (HeapTupleHasNulls(oldtuple)) /* paranoia */
624 0 : elog(ERROR, "null field found in pg_largeobject");
625 24 : olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
626 : Assert(olddata->pageno >= pageno);
627 : }
628 1558 : neednextpage = false;
629 : }
630 :
631 : /*
632 : * If we have a pre-existing page, see if it is the page we want to
633 : * write, or a later one.
634 : */
635 7948 : if (olddata != NULL && olddata->pageno == pageno)
636 : {
637 : /*
638 : * Update an existing page with fresh data.
639 : *
640 : * First, load old data into workbuf
641 : */
642 24 : getdatafield(olddata, &datafield, &len, &pfreeit);
643 24 : memcpy(workb, VARDATA(datafield), len);
644 24 : if (pfreeit)
645 18 : pfree(datafield);
646 :
647 : /*
648 : * Fill any hole
649 : */
650 24 : off = (int) (obj_desc->offset % LOBLKSIZE);
651 24 : if (off > len)
652 0 : MemSet(workb + len, 0, off - len);
653 :
654 : /*
655 : * Insert appropriate portion of new data
656 : */
657 24 : n = LOBLKSIZE - off;
658 24 : n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
659 24 : memcpy(workb + off, buf + nwritten, n);
660 24 : nwritten += n;
661 24 : obj_desc->offset += n;
662 24 : off += n;
663 : /* compute valid length of new page */
664 24 : len = (len >= off) ? len : off;
665 24 : SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
666 :
667 : /*
668 : * Form and insert updated tuple
669 : */
670 24 : memset(values, 0, sizeof(values));
671 24 : memset(nulls, false, sizeof(nulls));
672 24 : memset(replace, false, sizeof(replace));
673 24 : values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
674 24 : replace[Anum_pg_largeobject_data - 1] = true;
675 24 : newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
676 : values, nulls, replace);
677 24 : CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
678 : indstate);
679 24 : heap_freetuple(newtup);
680 :
681 : /*
682 : * We're done with this old page.
683 : */
684 24 : oldtuple = NULL;
685 24 : olddata = NULL;
686 24 : neednextpage = true;
687 : }
688 : else
689 : {
690 : /*
691 : * Write a brand new page.
692 : *
693 : * First, fill any hole
694 : */
695 7924 : off = (int) (obj_desc->offset % LOBLKSIZE);
696 7924 : if (off > 0)
697 6 : MemSet(workb, 0, off);
698 :
699 : /*
700 : * Insert appropriate portion of new data
701 : */
702 7924 : n = LOBLKSIZE - off;
703 7924 : n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
704 7924 : memcpy(workb + off, buf + nwritten, n);
705 7924 : nwritten += n;
706 7924 : obj_desc->offset += n;
707 : /* compute valid length of new page */
708 7924 : len = off + n;
709 7924 : SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
710 :
711 : /*
712 : * Form and insert updated tuple
713 : */
714 7924 : memset(values, 0, sizeof(values));
715 7924 : memset(nulls, false, sizeof(nulls));
716 7924 : values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
717 7924 : values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
718 7924 : values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
719 7924 : newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
720 7924 : CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
721 7924 : heap_freetuple(newtup);
722 : }
723 7948 : pageno++;
724 : }
725 :
726 1552 : systable_endscan_ordered(sd);
727 :
728 1552 : CatalogCloseIndexes(indstate);
729 :
730 : /*
731 : * Advance command counter so that my tuple updates will be seen by later
732 : * large-object operations in this transaction.
733 : */
734 1552 : CommandCounterIncrement();
735 :
736 1552 : return nwritten;
737 : }
738 :
739 : void
740 42 : inv_truncate(LargeObjectDesc *obj_desc, int64 len)
741 : {
742 42 : int32 pageno = (int32) (len / LOBLKSIZE);
743 : int32 off;
744 : ScanKeyData skey[2];
745 : SysScanDesc sd;
746 : HeapTuple oldtuple;
747 : Form_pg_largeobject olddata;
748 : union
749 : {
750 : bytea hdr;
751 : /* this is to make the union big enough for a LO data chunk: */
752 : char data[LOBLKSIZE + VARHDRSZ];
753 : /* ensure union is aligned well enough: */
754 : int32 align_it;
755 : } workbuf;
756 42 : char *workb = VARDATA(&workbuf.hdr);
757 : HeapTuple newtup;
758 : Datum values[Natts_pg_largeobject];
759 : bool nulls[Natts_pg_largeobject];
760 : bool replace[Natts_pg_largeobject];
761 : CatalogIndexState indstate;
762 :
763 : Assert(PointerIsValid(obj_desc));
764 :
765 : /* enforce writability because snapshot is probably wrong otherwise */
766 42 : if ((obj_desc->flags & IFS_WRLOCK) == 0)
767 0 : ereport(ERROR,
768 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
769 : errmsg("permission denied for large object %u",
770 : obj_desc->id)));
771 :
772 : /*
773 : * use errmsg_internal here because we don't want to expose INT64_FORMAT
774 : * in translatable strings; doing better is not worth the trouble
775 : */
776 42 : if (len < 0 || len > MAX_LARGE_OBJECT_SIZE)
777 0 : ereport(ERROR,
778 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
779 : errmsg_internal("invalid large object truncation target: " INT64_FORMAT,
780 : len)));
781 :
782 42 : open_lo_relation();
783 :
784 42 : indstate = CatalogOpenIndexes(lo_heap_r);
785 :
786 : /*
787 : * Set up to find all pages with desired loid and pageno >= target
788 : */
789 42 : ScanKeyInit(&skey[0],
790 : Anum_pg_largeobject_loid,
791 : BTEqualStrategyNumber, F_OIDEQ,
792 : ObjectIdGetDatum(obj_desc->id));
793 :
794 42 : ScanKeyInit(&skey[1],
795 : Anum_pg_largeobject_pageno,
796 : BTGreaterEqualStrategyNumber, F_INT4GE,
797 : Int32GetDatum(pageno));
798 :
799 42 : sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
800 : obj_desc->snapshot, 2, skey);
801 :
802 : /*
803 : * If possible, get the page the truncation point is in. The truncation
804 : * point may be beyond the end of the LO or in a hole.
805 : */
806 42 : olddata = NULL;
807 42 : if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
808 : {
809 24 : if (HeapTupleHasNulls(oldtuple)) /* paranoia */
810 0 : elog(ERROR, "null field found in pg_largeobject");
811 24 : olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
812 : Assert(olddata->pageno >= pageno);
813 : }
814 :
815 : /*
816 : * If we found the page of the truncation point we need to truncate the
817 : * data in it. Otherwise if we're in a hole, we need to create a page to
818 : * mark the end of data.
819 : */
820 42 : if (olddata != NULL && olddata->pageno == pageno)
821 12 : {
822 : /* First, load old data into workbuf */
823 : bytea *datafield;
824 : int pagelen;
825 : bool pfreeit;
826 :
827 12 : getdatafield(olddata, &datafield, &pagelen, &pfreeit);
828 12 : memcpy(workb, VARDATA(datafield), pagelen);
829 12 : if (pfreeit)
830 6 : pfree(datafield);
831 :
832 : /*
833 : * Fill any hole
834 : */
835 12 : off = len % LOBLKSIZE;
836 12 : if (off > pagelen)
837 6 : MemSet(workb + pagelen, 0, off - pagelen);
838 :
839 : /* compute length of new page */
840 12 : SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
841 :
842 : /*
843 : * Form and insert updated tuple
844 : */
845 12 : memset(values, 0, sizeof(values));
846 12 : memset(nulls, false, sizeof(nulls));
847 12 : memset(replace, false, sizeof(replace));
848 12 : values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
849 12 : replace[Anum_pg_largeobject_data - 1] = true;
850 12 : newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
851 : values, nulls, replace);
852 12 : CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
853 : indstate);
854 12 : heap_freetuple(newtup);
855 : }
856 : else
857 : {
858 : /*
859 : * If the first page we found was after the truncation point, we're in
860 : * a hole that we'll fill, but we need to delete the later page
861 : * because the loop below won't visit it again.
862 : */
863 30 : if (olddata != NULL)
864 : {
865 : Assert(olddata->pageno > pageno);
866 12 : CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
867 : }
868 :
869 : /*
870 : * Write a brand new page.
871 : *
872 : * Fill the hole up to the truncation point
873 : */
874 30 : off = len % LOBLKSIZE;
875 30 : if (off > 0)
876 30 : MemSet(workb, 0, off);
877 :
878 : /* compute length of new page */
879 30 : SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
880 :
881 : /*
882 : * Form and insert new tuple
883 : */
884 30 : memset(values, 0, sizeof(values));
885 30 : memset(nulls, false, sizeof(nulls));
886 30 : values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
887 30 : values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
888 30 : values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
889 30 : newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
890 30 : CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
891 30 : heap_freetuple(newtup);
892 : }
893 :
894 : /*
895 : * Delete any pages after the truncation point. If the initial search
896 : * didn't find a page, then of course there's nothing more to do.
897 : */
898 42 : if (olddata != NULL)
899 : {
900 30 : while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
901 : {
902 6 : CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
903 : }
904 : }
905 :
906 42 : systable_endscan_ordered(sd);
907 :
908 42 : CatalogCloseIndexes(indstate);
909 :
910 : /*
911 : * Advance command counter so that tuple updates will be seen by later
912 : * large-object operations in this transaction.
913 : */
914 42 : CommandCounterIncrement();
915 42 : }
|