Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * inv_api.c
4 : * routines for manipulating inversion fs large objects. This file
5 : * contains the user-level large object application interface routines.
6 : *
7 : *
8 : * Note: we access pg_largeobject.data using its C struct declaration.
9 : * This is safe because it immediately follows pageno which is an int4 field,
10 : * and therefore the data field will always be 4-byte aligned, even if it
11 : * is in the short 1-byte-header format. We have to detoast it since it's
12 : * quite likely to be in compressed or short format. We also need to check
13 : * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
14 : *
15 : * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
16 : * does most of the backend code. We expect that CurrentMemoryContext will
17 : * be a short-lived context. Data that must persist across function calls
18 : * is kept either in CacheMemoryContext (the Relation structs) or in the
19 : * memory context given to inv_open (for LargeObjectDesc structs).
20 : *
21 : *
22 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
23 : * Portions Copyright (c) 1994, Regents of the University of California
24 : *
25 : *
26 : * IDENTIFICATION
27 : * src/backend/storage/large_object/inv_api.c
28 : *
29 : *-------------------------------------------------------------------------
30 : */
31 : #include "postgres.h"
32 :
33 : #include <limits.h>
34 :
35 : #include "access/detoast.h"
36 : #include "access/genam.h"
37 : #include "access/htup_details.h"
38 : #include "access/sysattr.h"
39 : #include "access/table.h"
40 : #include "access/xact.h"
41 : #include "catalog/dependency.h"
42 : #include "catalog/indexing.h"
43 : #include "catalog/objectaccess.h"
44 : #include "catalog/pg_largeobject.h"
45 : #include "catalog/pg_largeobject_metadata.h"
46 : #include "libpq/libpq-fs.h"
47 : #include "miscadmin.h"
48 : #include "storage/large_object.h"
49 : #include "utils/acl.h"
50 : #include "utils/fmgroids.h"
51 : #include "utils/rel.h"
52 : #include "utils/snapmgr.h"
53 :
54 :
/*
 * GUC: backwards-compatibility flag to suppress LO permission checks
 *
 * When this is true, inv_open() skips its ACL_SELECT / ACL_UPDATE checks
 * on the large object being opened.
 */
bool		lo_compat_privileges;
59 :
/*
 * All accesses to pg_largeobject and its index make use of a single
 * Relation reference.  To guarantee that the relcache entry remains
 * in the cache, on the first reference inside a subtransaction, we
 * execute a slightly klugy maneuver to assign ownership of the
 * Relation reference to TopTransactionResourceOwner.
 *
 * Both pointers are set by open_lo_relation() and reset to NULL by
 * close_lo_relation().
 */
/* pg_largeobject heap, or NULL if not currently open */
static Relation lo_heap_r = NULL;
/* pg_largeobject's index (LargeObjectLOidPNIndexId), or NULL if not open */
static Relation lo_index_r = NULL;
69 :
70 :
71 : /*
72 : * Open pg_largeobject and its index, if not already done in current xact
73 : */
74 : static void
75 3058 : open_lo_relation(void)
76 : {
77 : ResourceOwner currentOwner;
78 :
79 3058 : if (lo_heap_r && lo_index_r)
80 2772 : return; /* already open in current xact */
81 :
82 : /* Arrange for the top xact to own these relation references */
83 286 : currentOwner = CurrentResourceOwner;
84 286 : CurrentResourceOwner = TopTransactionResourceOwner;
85 :
86 : /* Use RowExclusiveLock since we might either read or write */
87 286 : if (lo_heap_r == NULL)
88 286 : lo_heap_r = table_open(LargeObjectRelationId, RowExclusiveLock);
89 286 : if (lo_index_r == NULL)
90 286 : lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
91 :
92 286 : CurrentResourceOwner = currentOwner;
93 : }
94 :
95 : /*
96 : * Clean up at main transaction end
97 : */
98 : void
99 440 : close_lo_relation(bool isCommit)
100 : {
101 440 : if (lo_heap_r || lo_index_r)
102 : {
103 : /*
104 : * Only bother to close if committing; else abort cleanup will handle
105 : * it
106 : */
107 286 : if (isCommit)
108 : {
109 : ResourceOwner currentOwner;
110 :
111 210 : currentOwner = CurrentResourceOwner;
112 210 : CurrentResourceOwner = TopTransactionResourceOwner;
113 :
114 210 : if (lo_index_r)
115 210 : index_close(lo_index_r, NoLock);
116 210 : if (lo_heap_r)
117 210 : table_close(lo_heap_r, NoLock);
118 :
119 210 : CurrentResourceOwner = currentOwner;
120 : }
121 286 : lo_heap_r = NULL;
122 286 : lo_index_r = NULL;
123 : }
124 440 : }
125 :
126 :
127 : /*
128 : * Same as pg_largeobject.c's LargeObjectExists(), except snapshot to
129 : * read with can be specified.
130 : */
131 : static bool
132 464 : myLargeObjectExists(Oid loid, Snapshot snapshot)
133 : {
134 : Relation pg_lo_meta;
135 : ScanKeyData skey[1];
136 : SysScanDesc sd;
137 : HeapTuple tuple;
138 464 : bool retval = false;
139 :
140 464 : ScanKeyInit(&skey[0],
141 : Anum_pg_largeobject_metadata_oid,
142 : BTEqualStrategyNumber, F_OIDEQ,
143 : ObjectIdGetDatum(loid));
144 :
145 464 : pg_lo_meta = table_open(LargeObjectMetadataRelationId,
146 : AccessShareLock);
147 :
148 464 : sd = systable_beginscan(pg_lo_meta,
149 : LargeObjectMetadataOidIndexId, true,
150 : snapshot, 1, skey);
151 :
152 464 : tuple = systable_getnext(sd);
153 464 : if (HeapTupleIsValid(tuple))
154 460 : retval = true;
155 :
156 464 : systable_endscan(sd);
157 :
158 464 : table_close(pg_lo_meta, AccessShareLock);
159 :
160 464 : return retval;
161 : }
162 :
163 :
164 : /*
165 : * Extract data field from a pg_largeobject tuple, detoasting if needed
166 : * and verifying that the length is sane. Returns data pointer (a bytea *),
167 : * data length, and an indication of whether to pfree the data pointer.
168 : */
169 : static void
170 10236 : getdatafield(Form_pg_largeobject tuple,
171 : bytea **pdatafield,
172 : int *plen,
173 : bool *pfreeit)
174 : {
175 : bytea *datafield;
176 : int len;
177 : bool freeit;
178 :
179 10236 : datafield = &(tuple->data); /* see note at top of file */
180 10236 : freeit = false;
181 10236 : if (VARATT_IS_EXTENDED(datafield))
182 : {
183 : datafield = (bytea *)
184 10070 : detoast_attr((struct varlena *) datafield);
185 10070 : freeit = true;
186 : }
187 10236 : len = VARSIZE(datafield) - VARHDRSZ;
188 10236 : if (len < 0 || len > LOBLKSIZE)
189 0 : ereport(ERROR,
190 : (errcode(ERRCODE_DATA_CORRUPTED),
191 : errmsg("pg_largeobject entry for OID %u, page %d has invalid data field size %d",
192 : tuple->loid, tuple->pageno, len)));
193 10236 : *pdatafield = datafield;
194 10236 : *plen = len;
195 10236 : *pfreeit = freeit;
196 10236 : }
197 :
198 :
199 : /*
200 : * inv_create -- create a new large object
201 : *
202 : * Arguments:
203 : * lobjId - OID to use for new large object, or InvalidOid to pick one
204 : *
205 : * Returns:
206 : * OID of new object
207 : *
208 : * If lobjId is not InvalidOid, then an error occurs if the OID is already
209 : * in use.
210 : */
211 : Oid
212 112 : inv_create(Oid lobjId)
213 : {
214 : Oid lobjId_new;
215 :
216 : /*
217 : * Create a new largeobject with empty data pages
218 : */
219 112 : lobjId_new = LargeObjectCreate(lobjId);
220 :
221 : /*
222 : * dependency on the owner of largeobject
223 : *
224 : * The reason why we use LargeObjectRelationId instead of
225 : * LargeObjectMetadataRelationId here is to provide backward compatibility
226 : * to the applications which utilize a knowledge about internal layout of
227 : * system catalogs. OID of pg_largeobject_metadata and loid of
228 : * pg_largeobject are same value, so there are no actual differences here.
229 : */
230 112 : recordDependencyOnOwner(LargeObjectRelationId,
231 : lobjId_new, GetUserId());
232 :
233 : /* Post creation hook for new large object */
234 112 : InvokeObjectPostCreateHook(LargeObjectRelationId, lobjId_new, 0);
235 :
236 : /*
237 : * Advance command counter to make new tuple visible to later operations.
238 : */
239 112 : CommandCounterIncrement();
240 :
241 112 : return lobjId_new;
242 : }
243 :
244 : /*
245 : * inv_open -- access an existing large object.
246 : *
247 : * Returns a large object descriptor, appropriately filled in.
248 : * The descriptor and subsidiary data are allocated in the specified
249 : * memory context, which must be suitably long-lived for the caller's
250 : * purposes. If the returned descriptor has a snapshot associated
251 : * with it, the caller must ensure that it also lives long enough,
252 : * e.g. by calling RegisterSnapshotOnOwner
253 : */
254 : LargeObjectDesc *
255 464 : inv_open(Oid lobjId, int flags, MemoryContext mcxt)
256 : {
257 : LargeObjectDesc *retval;
258 464 : Snapshot snapshot = NULL;
259 464 : int descflags = 0;
260 :
261 : /*
262 : * Historically, no difference is made between (INV_WRITE) and (INV_WRITE
263 : * | INV_READ), the caller being allowed to read the large object
264 : * descriptor in either case.
265 : */
266 464 : if (flags & INV_WRITE)
267 154 : descflags |= IFS_WRLOCK | IFS_RDLOCK;
268 464 : if (flags & INV_READ)
269 340 : descflags |= IFS_RDLOCK;
270 :
271 464 : if (descflags == 0)
272 0 : ereport(ERROR,
273 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
274 : errmsg("invalid flags for opening a large object: %d",
275 : flags)));
276 :
277 : /* Get snapshot. If write is requested, use an instantaneous snapshot. */
278 464 : if (descflags & IFS_WRLOCK)
279 154 : snapshot = NULL;
280 : else
281 310 : snapshot = GetActiveSnapshot();
282 :
283 : /* Can't use LargeObjectExists here because we need to specify snapshot */
284 464 : if (!myLargeObjectExists(lobjId, snapshot))
285 4 : ereport(ERROR,
286 : (errcode(ERRCODE_UNDEFINED_OBJECT),
287 : errmsg("large object %u does not exist", lobjId)));
288 :
289 : /* Apply permission checks, again specifying snapshot */
290 460 : if ((descflags & IFS_RDLOCK) != 0)
291 : {
292 902 : if (!lo_compat_privileges &&
293 442 : pg_largeobject_aclcheck_snapshot(lobjId,
294 : GetUserId(),
295 : ACL_SELECT,
296 : snapshot) != ACLCHECK_OK)
297 42 : ereport(ERROR,
298 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
299 : errmsg("permission denied for large object %u",
300 : lobjId)));
301 : }
302 418 : if ((descflags & IFS_WRLOCK) != 0)
303 : {
304 248 : if (!lo_compat_privileges &&
305 118 : pg_largeobject_aclcheck_snapshot(lobjId,
306 : GetUserId(),
307 : ACL_UPDATE,
308 : snapshot) != ACLCHECK_OK)
309 12 : ereport(ERROR,
310 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
311 : errmsg("permission denied for large object %u",
312 : lobjId)));
313 : }
314 :
315 : /* OK to create a descriptor */
316 406 : retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
317 : sizeof(LargeObjectDesc));
318 406 : retval->id = lobjId;
319 406 : retval->offset = 0;
320 406 : retval->flags = descflags;
321 :
322 : /* caller sets if needed, not used by the functions in this file */
323 406 : retval->subid = InvalidSubTransactionId;
324 :
325 : /*
326 : * The snapshot (if any) is just the currently active snapshot. The
327 : * caller will replace it with a longer-lived copy if needed.
328 : */
329 406 : retval->snapshot = snapshot;
330 :
331 406 : return retval;
332 : }
333 :
334 : /*
335 : * Closes a large object descriptor previously made by inv_open(), and
336 : * releases the long-term memory used by it.
337 : */
338 : void
339 376 : inv_close(LargeObjectDesc *obj_desc)
340 : {
341 : Assert(PointerIsValid(obj_desc));
342 376 : pfree(obj_desc);
343 376 : }
344 :
345 : /*
346 : * Destroys an existing large object (not to be confused with a descriptor!)
347 : *
348 : * Note we expect caller to have done any required permissions check.
349 : */
350 : int
351 82 : inv_drop(Oid lobjId)
352 : {
353 : ObjectAddress object;
354 :
355 : /*
356 : * Delete any comments and dependencies on the large object
357 : */
358 82 : object.classId = LargeObjectRelationId;
359 82 : object.objectId = lobjId;
360 82 : object.objectSubId = 0;
361 82 : performDeletion(&object, DROP_CASCADE, 0);
362 :
363 : /*
364 : * Advance command counter so that tuple removal will be seen by later
365 : * large-object operations in this transaction.
366 : */
367 82 : CommandCounterIncrement();
368 :
369 : /* For historical reasons, we always return 1 on success. */
370 82 : return 1;
371 : }
372 :
373 : /*
374 : * Determine size of a large object
375 : *
376 : * NOTE: LOs can contain gaps, just like Unix files. We actually return
377 : * the offset of the last byte + 1.
378 : */
379 : static uint64
380 104 : inv_getsize(LargeObjectDesc *obj_desc)
381 : {
382 104 : uint64 lastbyte = 0;
383 : ScanKeyData skey[1];
384 : SysScanDesc sd;
385 : HeapTuple tuple;
386 :
387 : Assert(PointerIsValid(obj_desc));
388 :
389 104 : open_lo_relation();
390 :
391 104 : ScanKeyInit(&skey[0],
392 : Anum_pg_largeobject_loid,
393 : BTEqualStrategyNumber, F_OIDEQ,
394 : ObjectIdGetDatum(obj_desc->id));
395 :
396 104 : sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
397 : obj_desc->snapshot, 1, skey);
398 :
399 : /*
400 : * Because the pg_largeobject index is on both loid and pageno, but we
401 : * constrain only loid, a backwards scan should visit all pages of the
402 : * large object in reverse pageno order. So, it's sufficient to examine
403 : * the first valid tuple (== last valid page).
404 : */
405 104 : tuple = systable_getnext_ordered(sd, BackwardScanDirection);
406 104 : if (HeapTupleIsValid(tuple))
407 : {
408 : Form_pg_largeobject data;
409 : bytea *datafield;
410 : int len;
411 : bool pfreeit;
412 :
413 96 : if (HeapTupleHasNulls(tuple)) /* paranoia */
414 0 : elog(ERROR, "null field found in pg_largeobject");
415 96 : data = (Form_pg_largeobject) GETSTRUCT(tuple);
416 96 : getdatafield(data, &datafield, &len, &pfreeit);
417 96 : lastbyte = (uint64) data->pageno * LOBLKSIZE + len;
418 96 : if (pfreeit)
419 18 : pfree(datafield);
420 : }
421 :
422 104 : systable_endscan_ordered(sd);
423 :
424 104 : return lastbyte;
425 : }
426 :
427 : int64
428 220 : inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
429 : {
430 : int64 newoffset;
431 :
432 : Assert(PointerIsValid(obj_desc));
433 :
434 : /*
435 : * We allow seek/tell if you have either read or write permission, so no
436 : * need for a permission check here.
437 : */
438 :
439 : /*
440 : * Note: overflow in the additions is possible, but since we will reject
441 : * negative results, we don't need any extra test for that.
442 : */
443 220 : switch (whence)
444 : {
445 98 : case SEEK_SET:
446 98 : newoffset = offset;
447 98 : break;
448 18 : case SEEK_CUR:
449 18 : newoffset = obj_desc->offset + offset;
450 18 : break;
451 104 : case SEEK_END:
452 104 : newoffset = inv_getsize(obj_desc) + offset;
453 104 : break;
454 0 : default:
455 0 : ereport(ERROR,
456 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
457 : errmsg("invalid whence setting: %d", whence)));
458 : newoffset = 0; /* keep compiler quiet */
459 : break;
460 : }
461 :
462 : /*
463 : * use errmsg_internal here because we don't want to expose INT64_FORMAT
464 : * in translatable strings; doing better is not worth the trouble
465 : */
466 220 : if (newoffset < 0 || newoffset > MAX_LARGE_OBJECT_SIZE)
467 0 : ereport(ERROR,
468 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
469 : errmsg_internal("invalid large object seek target: " INT64_FORMAT,
470 : newoffset)));
471 :
472 220 : obj_desc->offset = newoffset;
473 220 : return newoffset;
474 : }
475 :
476 : int64
477 48 : inv_tell(LargeObjectDesc *obj_desc)
478 : {
479 : Assert(PointerIsValid(obj_desc));
480 :
481 : /*
482 : * We allow seek/tell if you have either read or write permission, so no
483 : * need for a permission check here.
484 : */
485 :
486 48 : return obj_desc->offset;
487 : }
488 :
489 : int
490 1368 : inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
491 : {
492 1368 : int nread = 0;
493 : int64 n;
494 : int64 off;
495 : int len;
496 1368 : int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
497 : uint64 pageoff;
498 : ScanKeyData skey[2];
499 : SysScanDesc sd;
500 : HeapTuple tuple;
501 :
502 : Assert(PointerIsValid(obj_desc));
503 : Assert(buf != NULL);
504 :
505 1368 : if ((obj_desc->flags & IFS_RDLOCK) == 0)
506 0 : ereport(ERROR,
507 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
508 : errmsg("permission denied for large object %u",
509 : obj_desc->id)));
510 :
511 1368 : if (nbytes <= 0)
512 8 : return 0;
513 :
514 1360 : open_lo_relation();
515 :
516 1360 : ScanKeyInit(&skey[0],
517 : Anum_pg_largeobject_loid,
518 : BTEqualStrategyNumber, F_OIDEQ,
519 : ObjectIdGetDatum(obj_desc->id));
520 :
521 1360 : ScanKeyInit(&skey[1],
522 : Anum_pg_largeobject_pageno,
523 : BTGreaterEqualStrategyNumber, F_INT4GE,
524 : Int32GetDatum(pageno));
525 :
526 1360 : sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
527 : obj_desc->snapshot, 2, skey);
528 :
529 10408 : while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
530 : {
531 : Form_pg_largeobject data;
532 : bytea *datafield;
533 : bool pfreeit;
534 :
535 10110 : if (HeapTupleHasNulls(tuple)) /* paranoia */
536 0 : elog(ERROR, "null field found in pg_largeobject");
537 10110 : data = (Form_pg_largeobject) GETSTRUCT(tuple);
538 :
539 : /*
540 : * We expect the indexscan will deliver pages in order. However,
541 : * there may be missing pages if the LO contains unwritten "holes". We
542 : * want missing sections to read out as zeroes.
543 : */
544 10110 : pageoff = ((uint64) data->pageno) * LOBLKSIZE;
545 10110 : if (pageoff > obj_desc->offset)
546 : {
547 12 : n = pageoff - obj_desc->offset;
548 12 : n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
549 12 : MemSet(buf + nread, 0, n);
550 12 : nread += n;
551 12 : obj_desc->offset += n;
552 : }
553 :
554 10110 : if (nread < nbytes)
555 : {
556 : Assert(obj_desc->offset >= pageoff);
557 10104 : off = (int) (obj_desc->offset - pageoff);
558 : Assert(off >= 0 && off < LOBLKSIZE);
559 :
560 10104 : getdatafield(data, &datafield, &len, &pfreeit);
561 10104 : if (len > off)
562 : {
563 10008 : n = len - off;
564 10008 : n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
565 10008 : memcpy(buf + nread, VARDATA(datafield) + off, n);
566 10008 : nread += n;
567 10008 : obj_desc->offset += n;
568 : }
569 10104 : if (pfreeit)
570 10028 : pfree(datafield);
571 : }
572 :
573 10110 : if (nread >= nbytes)
574 1062 : break;
575 : }
576 :
577 1360 : systable_endscan_ordered(sd);
578 :
579 1360 : return nread;
580 : }
581 :
582 : int
583 1552 : inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
584 : {
585 1552 : int nwritten = 0;
586 : int n;
587 : int off;
588 : int len;
589 1552 : int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
590 : ScanKeyData skey[2];
591 : SysScanDesc sd;
592 : HeapTuple oldtuple;
593 : Form_pg_largeobject olddata;
594 : bool neednextpage;
595 : bytea *datafield;
596 : bool pfreeit;
597 : union
598 : {
599 : bytea hdr;
600 : /* this is to make the union big enough for a LO data chunk: */
601 : char data[LOBLKSIZE + VARHDRSZ];
602 : /* ensure union is aligned well enough: */
603 : int32 align_it;
604 : } workbuf;
605 1552 : char *workb = VARDATA(&workbuf.hdr);
606 : HeapTuple newtup;
607 : Datum values[Natts_pg_largeobject];
608 : bool nulls[Natts_pg_largeobject];
609 : bool replace[Natts_pg_largeobject];
610 : CatalogIndexState indstate;
611 :
612 : Assert(PointerIsValid(obj_desc));
613 : Assert(buf != NULL);
614 :
615 : /* enforce writability because snapshot is probably wrong otherwise */
616 1552 : if ((obj_desc->flags & IFS_WRLOCK) == 0)
617 0 : ereport(ERROR,
618 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
619 : errmsg("permission denied for large object %u",
620 : obj_desc->id)));
621 :
622 1552 : if (nbytes <= 0)
623 0 : return 0;
624 :
625 : /* this addition can't overflow because nbytes is only int32 */
626 1552 : if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
627 0 : ereport(ERROR,
628 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
629 : errmsg("invalid large object write request size: %d",
630 : nbytes)));
631 :
632 1552 : open_lo_relation();
633 :
634 1552 : indstate = CatalogOpenIndexes(lo_heap_r);
635 :
636 1552 : ScanKeyInit(&skey[0],
637 : Anum_pg_largeobject_loid,
638 : BTEqualStrategyNumber, F_OIDEQ,
639 : ObjectIdGetDatum(obj_desc->id));
640 :
641 1552 : ScanKeyInit(&skey[1],
642 : Anum_pg_largeobject_pageno,
643 : BTGreaterEqualStrategyNumber, F_INT4GE,
644 : Int32GetDatum(pageno));
645 :
646 1552 : sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
647 : obj_desc->snapshot, 2, skey);
648 :
649 1552 : oldtuple = NULL;
650 1552 : olddata = NULL;
651 1552 : neednextpage = true;
652 :
653 9500 : while (nwritten < nbytes)
654 : {
655 : /*
656 : * If possible, get next pre-existing page of the LO. We expect the
657 : * indexscan will deliver these in order --- but there may be holes.
658 : */
659 7948 : if (neednextpage)
660 : {
661 1558 : if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
662 : {
663 24 : if (HeapTupleHasNulls(oldtuple)) /* paranoia */
664 0 : elog(ERROR, "null field found in pg_largeobject");
665 24 : olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
666 : Assert(olddata->pageno >= pageno);
667 : }
668 1558 : neednextpage = false;
669 : }
670 :
671 : /*
672 : * If we have a pre-existing page, see if it is the page we want to
673 : * write, or a later one.
674 : */
675 7948 : if (olddata != NULL && olddata->pageno == pageno)
676 : {
677 : /*
678 : * Update an existing page with fresh data.
679 : *
680 : * First, load old data into workbuf
681 : */
682 24 : getdatafield(olddata, &datafield, &len, &pfreeit);
683 24 : memcpy(workb, VARDATA(datafield), len);
684 24 : if (pfreeit)
685 18 : pfree(datafield);
686 :
687 : /*
688 : * Fill any hole
689 : */
690 24 : off = (int) (obj_desc->offset % LOBLKSIZE);
691 24 : if (off > len)
692 0 : MemSet(workb + len, 0, off - len);
693 :
694 : /*
695 : * Insert appropriate portion of new data
696 : */
697 24 : n = LOBLKSIZE - off;
698 24 : n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
699 24 : memcpy(workb + off, buf + nwritten, n);
700 24 : nwritten += n;
701 24 : obj_desc->offset += n;
702 24 : off += n;
703 : /* compute valid length of new page */
704 24 : len = (len >= off) ? len : off;
705 24 : SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
706 :
707 : /*
708 : * Form and insert updated tuple
709 : */
710 24 : memset(values, 0, sizeof(values));
711 24 : memset(nulls, false, sizeof(nulls));
712 24 : memset(replace, false, sizeof(replace));
713 24 : values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
714 24 : replace[Anum_pg_largeobject_data - 1] = true;
715 24 : newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
716 : values, nulls, replace);
717 24 : CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
718 : indstate);
719 24 : heap_freetuple(newtup);
720 :
721 : /*
722 : * We're done with this old page.
723 : */
724 24 : oldtuple = NULL;
725 24 : olddata = NULL;
726 24 : neednextpage = true;
727 : }
728 : else
729 : {
730 : /*
731 : * Write a brand new page.
732 : *
733 : * First, fill any hole
734 : */
735 7924 : off = (int) (obj_desc->offset % LOBLKSIZE);
736 7924 : if (off > 0)
737 6 : MemSet(workb, 0, off);
738 :
739 : /*
740 : * Insert appropriate portion of new data
741 : */
742 7924 : n = LOBLKSIZE - off;
743 7924 : n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
744 7924 : memcpy(workb + off, buf + nwritten, n);
745 7924 : nwritten += n;
746 7924 : obj_desc->offset += n;
747 : /* compute valid length of new page */
748 7924 : len = off + n;
749 7924 : SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
750 :
751 : /*
752 : * Form and insert updated tuple
753 : */
754 7924 : memset(values, 0, sizeof(values));
755 7924 : memset(nulls, false, sizeof(nulls));
756 7924 : values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
757 7924 : values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
758 7924 : values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
759 7924 : newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
760 7924 : CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
761 7924 : heap_freetuple(newtup);
762 : }
763 7948 : pageno++;
764 : }
765 :
766 1552 : systable_endscan_ordered(sd);
767 :
768 1552 : CatalogCloseIndexes(indstate);
769 :
770 : /*
771 : * Advance command counter so that my tuple updates will be seen by later
772 : * large-object operations in this transaction.
773 : */
774 1552 : CommandCounterIncrement();
775 :
776 1552 : return nwritten;
777 : }
778 :
779 : void
780 42 : inv_truncate(LargeObjectDesc *obj_desc, int64 len)
781 : {
782 42 : int32 pageno = (int32) (len / LOBLKSIZE);
783 : int32 off;
784 : ScanKeyData skey[2];
785 : SysScanDesc sd;
786 : HeapTuple oldtuple;
787 : Form_pg_largeobject olddata;
788 : union
789 : {
790 : bytea hdr;
791 : /* this is to make the union big enough for a LO data chunk: */
792 : char data[LOBLKSIZE + VARHDRSZ];
793 : /* ensure union is aligned well enough: */
794 : int32 align_it;
795 : } workbuf;
796 42 : char *workb = VARDATA(&workbuf.hdr);
797 : HeapTuple newtup;
798 : Datum values[Natts_pg_largeobject];
799 : bool nulls[Natts_pg_largeobject];
800 : bool replace[Natts_pg_largeobject];
801 : CatalogIndexState indstate;
802 :
803 : Assert(PointerIsValid(obj_desc));
804 :
805 : /* enforce writability because snapshot is probably wrong otherwise */
806 42 : if ((obj_desc->flags & IFS_WRLOCK) == 0)
807 0 : ereport(ERROR,
808 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
809 : errmsg("permission denied for large object %u",
810 : obj_desc->id)));
811 :
812 : /*
813 : * use errmsg_internal here because we don't want to expose INT64_FORMAT
814 : * in translatable strings; doing better is not worth the trouble
815 : */
816 42 : if (len < 0 || len > MAX_LARGE_OBJECT_SIZE)
817 0 : ereport(ERROR,
818 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
819 : errmsg_internal("invalid large object truncation target: " INT64_FORMAT,
820 : len)));
821 :
822 42 : open_lo_relation();
823 :
824 42 : indstate = CatalogOpenIndexes(lo_heap_r);
825 :
826 : /*
827 : * Set up to find all pages with desired loid and pageno >= target
828 : */
829 42 : ScanKeyInit(&skey[0],
830 : Anum_pg_largeobject_loid,
831 : BTEqualStrategyNumber, F_OIDEQ,
832 : ObjectIdGetDatum(obj_desc->id));
833 :
834 42 : ScanKeyInit(&skey[1],
835 : Anum_pg_largeobject_pageno,
836 : BTGreaterEqualStrategyNumber, F_INT4GE,
837 : Int32GetDatum(pageno));
838 :
839 42 : sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
840 : obj_desc->snapshot, 2, skey);
841 :
842 : /*
843 : * If possible, get the page the truncation point is in. The truncation
844 : * point may be beyond the end of the LO or in a hole.
845 : */
846 42 : olddata = NULL;
847 42 : if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
848 : {
849 24 : if (HeapTupleHasNulls(oldtuple)) /* paranoia */
850 0 : elog(ERROR, "null field found in pg_largeobject");
851 24 : olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
852 : Assert(olddata->pageno >= pageno);
853 : }
854 :
855 : /*
856 : * If we found the page of the truncation point we need to truncate the
857 : * data in it. Otherwise if we're in a hole, we need to create a page to
858 : * mark the end of data.
859 : */
860 42 : if (olddata != NULL && olddata->pageno == pageno)
861 12 : {
862 : /* First, load old data into workbuf */
863 : bytea *datafield;
864 : int pagelen;
865 : bool pfreeit;
866 :
867 12 : getdatafield(olddata, &datafield, &pagelen, &pfreeit);
868 12 : memcpy(workb, VARDATA(datafield), pagelen);
869 12 : if (pfreeit)
870 6 : pfree(datafield);
871 :
872 : /*
873 : * Fill any hole
874 : */
875 12 : off = len % LOBLKSIZE;
876 12 : if (off > pagelen)
877 6 : MemSet(workb + pagelen, 0, off - pagelen);
878 :
879 : /* compute length of new page */
880 12 : SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
881 :
882 : /*
883 : * Form and insert updated tuple
884 : */
885 12 : memset(values, 0, sizeof(values));
886 12 : memset(nulls, false, sizeof(nulls));
887 12 : memset(replace, false, sizeof(replace));
888 12 : values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
889 12 : replace[Anum_pg_largeobject_data - 1] = true;
890 12 : newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
891 : values, nulls, replace);
892 12 : CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
893 : indstate);
894 12 : heap_freetuple(newtup);
895 : }
896 : else
897 : {
898 : /*
899 : * If the first page we found was after the truncation point, we're in
900 : * a hole that we'll fill, but we need to delete the later page
901 : * because the loop below won't visit it again.
902 : */
903 30 : if (olddata != NULL)
904 : {
905 : Assert(olddata->pageno > pageno);
906 12 : CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
907 : }
908 :
909 : /*
910 : * Write a brand new page.
911 : *
912 : * Fill the hole up to the truncation point
913 : */
914 30 : off = len % LOBLKSIZE;
915 30 : if (off > 0)
916 30 : MemSet(workb, 0, off);
917 :
918 : /* compute length of new page */
919 30 : SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
920 :
921 : /*
922 : * Form and insert new tuple
923 : */
924 30 : memset(values, 0, sizeof(values));
925 30 : memset(nulls, false, sizeof(nulls));
926 30 : values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
927 30 : values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
928 30 : values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
929 30 : newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
930 30 : CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
931 30 : heap_freetuple(newtup);
932 : }
933 :
934 : /*
935 : * Delete any pages after the truncation point. If the initial search
936 : * didn't find a page, then of course there's nothing more to do.
937 : */
938 42 : if (olddata != NULL)
939 : {
940 30 : while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
941 : {
942 6 : CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
943 : }
944 : }
945 :
946 42 : systable_endscan_ordered(sd);
947 :
948 42 : CatalogCloseIndexes(indstate);
949 :
950 : /*
951 : * Advance command counter so that tuple updates will be seen by later
952 : * large-object operations in this transaction.
953 : */
954 42 : CommandCounterIncrement();
955 42 : }
|