Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * inv_api.c
4 : * routines for manipulating inversion fs large objects. This file
5 : * contains the user-level large object application interface routines.
6 : *
7 : *
8 : * Note: we access pg_largeobject.data using its C struct declaration.
9 : * This is safe because it immediately follows pageno which is an int4 field,
10 : * and therefore the data field will always be 4-byte aligned, even if it
11 : * is in the short 1-byte-header format. We have to detoast it since it's
12 : * quite likely to be in compressed or short format. We also need to check
13 : * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
14 : *
15 : * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
16 : * does most of the backend code. We expect that CurrentMemoryContext will
17 : * be a short-lived context. Data that must persist across function calls
18 : * is kept either in CacheMemoryContext (the Relation structs) or in the
19 : * memory context given to inv_open (for LargeObjectDesc structs).
20 : *
21 : *
22 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
23 : * Portions Copyright (c) 1994, Regents of the University of California
24 : *
25 : *
26 : * IDENTIFICATION
27 : * src/backend/storage/large_object/inv_api.c
28 : *
29 : *-------------------------------------------------------------------------
30 : */
31 : #include "postgres.h"
32 :
33 : #include <limits.h>
34 :
35 : #include "access/detoast.h"
36 : #include "access/genam.h"
37 : #include "access/htup_details.h"
38 : #include "access/table.h"
39 : #include "access/xact.h"
40 : #include "catalog/dependency.h"
41 : #include "catalog/indexing.h"
42 : #include "catalog/objectaccess.h"
43 : #include "catalog/pg_largeobject.h"
44 : #include "catalog/pg_largeobject_metadata.h"
45 : #include "libpq/libpq-fs.h"
46 : #include "miscadmin.h"
47 : #include "storage/large_object.h"
48 : #include "utils/acl.h"
49 : #include "utils/fmgroids.h"
50 : #include "utils/rel.h"
51 : #include "utils/snapmgr.h"
52 :
53 :
54 : /*
55 : * GUC: backwards-compatibility flag to suppress LO permission checks
56 : */
57 : bool lo_compat_privileges; /* when true, inv_open() skips its ACL checks */
58 :
59 : /*
60 : * All accesses to pg_largeobject and its index make use of a single
61 : * Relation reference. To guarantee that the relcache entry remains
62 : * in the cache, on the first reference inside a subtransaction, we
63 : * execute a slightly klugy maneuver to assign ownership of the
64 : * Relation reference to TopTransactionResourceOwner.
65 : */
66 : static Relation lo_heap_r = NULL; /* pg_largeobject; NULL until open_lo_relation() opens it */
67 : static Relation lo_index_r = NULL; /* index LargeObjectLOidPNIndexId; NULL until opened */
68 :
69 :
70 : /*
71 : * Open pg_largeobject and its index, if not already done in current xact
72 : */
73 : static void
74 3058 : open_lo_relation(void)
75 : {
76 : ResourceOwner currentOwner;
77 :
78 3058 : if (lo_heap_r && lo_index_r)
79 2766 : return; /* already open in current xact */
80 :
81 : /* Arrange for the top xact to own these relation references */
82 292 : currentOwner = CurrentResourceOwner;
83 292 : CurrentResourceOwner = TopTransactionResourceOwner;
84 :
85 : /* Use RowExclusiveLock since we might either read or write */
86 292 : if (lo_heap_r == NULL)
87 292 : lo_heap_r = table_open(LargeObjectRelationId, RowExclusiveLock);
88 292 : if (lo_index_r == NULL)
89 292 : lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
90 :
91 292 : CurrentResourceOwner = currentOwner;
92 : }
93 :
94 : /*
95 : * Clean up at main transaction end
96 : */
97 : void
98 442 : close_lo_relation(bool isCommit)
99 : {
100 442 : if (lo_heap_r || lo_index_r)
101 : {
102 : /*
103 : * Only bother to close if committing; else abort cleanup will handle
104 : * it
105 : */
106 292 : if (isCommit)
107 : {
108 : ResourceOwner currentOwner;
109 :
110 210 : currentOwner = CurrentResourceOwner;
111 210 : CurrentResourceOwner = TopTransactionResourceOwner;
112 :
113 210 : if (lo_index_r)
114 210 : index_close(lo_index_r, NoLock);
115 210 : if (lo_heap_r)
116 210 : table_close(lo_heap_r, NoLock);
117 :
118 210 : CurrentResourceOwner = currentOwner;
119 : }
120 292 : lo_heap_r = NULL;
121 292 : lo_index_r = NULL;
122 : }
123 442 : }
124 :
125 :
126 : /*
127 : * Same as pg_largeobject.c's LargeObjectExists(), except snapshot to
128 : * read with can be specified.
129 : */
130 : static bool
131 464 : myLargeObjectExists(Oid loid, Snapshot snapshot)
132 : {
133 : Relation pg_lo_meta;
134 : ScanKeyData skey[1];
135 : SysScanDesc sd;
136 : HeapTuple tuple;
137 464 : bool retval = false;
138 :
139 464 : ScanKeyInit(&skey[0],
140 : Anum_pg_largeobject_metadata_oid,
141 : BTEqualStrategyNumber, F_OIDEQ,
142 : ObjectIdGetDatum(loid));
143 :
144 464 : pg_lo_meta = table_open(LargeObjectMetadataRelationId,
145 : AccessShareLock);
146 :
147 464 : sd = systable_beginscan(pg_lo_meta,
148 : LargeObjectMetadataOidIndexId, true,
149 : snapshot, 1, skey);
150 :
151 464 : tuple = systable_getnext(sd);
152 464 : if (HeapTupleIsValid(tuple))
153 460 : retval = true;
154 :
155 464 : systable_endscan(sd);
156 :
157 464 : table_close(pg_lo_meta, AccessShareLock);
158 :
159 464 : return retval;
160 : }
161 :
162 :
163 : /*
164 : * Extract data field from a pg_largeobject tuple, detoasting if needed
165 : * and verifying that the length is sane. Returns data pointer (a bytea *),
166 : * data length, and an indication of whether to pfree the data pointer.
167 : */
168 : static void
169 10236 : getdatafield(Form_pg_largeobject tuple,
170 : bytea **pdatafield,
171 : int *plen,
172 : bool *pfreeit)
173 : {
174 : bytea *datafield;
175 : int len;
176 : bool freeit;
177 :
178 10236 : datafield = &(tuple->data); /* see note at top of file */
179 10236 : freeit = false;
180 10236 : if (VARATT_IS_EXTENDED(datafield))
181 : {
182 : datafield = (bytea *)
183 10070 : detoast_attr((struct varlena *) datafield);
184 10070 : freeit = true;
185 : }
186 10236 : len = VARSIZE(datafield) - VARHDRSZ;
187 10236 : if (len < 0 || len > LOBLKSIZE)
188 0 : ereport(ERROR,
189 : (errcode(ERRCODE_DATA_CORRUPTED),
190 : errmsg("pg_largeobject entry for OID %u, page %d has invalid data field size %d",
191 : tuple->loid, tuple->pageno, len)));
192 10236 : *pdatafield = datafield;
193 10236 : *plen = len;
194 10236 : *pfreeit = freeit;
195 10236 : }
196 :
197 :
198 : /*
199 : * inv_create -- create a new large object
200 : *
201 : * Arguments:
202 : * lobjId - OID to use for new large object, or InvalidOid to pick one
203 : *
204 : * Returns:
205 : * OID of new object
206 : *
207 : * If lobjId is not InvalidOid, then an error occurs if the OID is already
208 : * in use.
209 : */
210 : Oid
211 112 : inv_create(Oid lobjId)
212 : {
213 : Oid lobjId_new;
214 :
215 : /*
216 : * Create a new largeobject with empty data pages
217 : */
218 112 : lobjId_new = LargeObjectCreate(lobjId);
219 :
220 : /*
221 : * dependency on the owner of largeobject
222 : *
223 : * Note that LO dependencies are recorded using classId
224 : * LargeObjectRelationId for backwards-compatibility reasons. Using
225 : * LargeObjectMetadataRelationId instead would simplify matters for the
226 : * backend, but it'd complicate pg_dump and possibly break other clients.
227 : */
228 112 : recordDependencyOnOwner(LargeObjectRelationId,
229 : lobjId_new, GetUserId());
230 :
231 : /* Post creation hook for new large object */
232 112 : InvokeObjectPostCreateHook(LargeObjectRelationId, lobjId_new, 0);
233 :
234 : /*
235 : * Advance command counter to make new tuple visible to later operations.
236 : */
237 112 : CommandCounterIncrement();
238 :
239 112 : return lobjId_new;
240 : }
241 :
242 : /*
243 : * inv_open -- access an existing large object.
244 : *
245 : * Returns a large object descriptor, appropriately filled in.
246 : * The descriptor and subsidiary data are allocated in the specified
247 : * memory context, which must be suitably long-lived for the caller's
248 : * purposes. If the returned descriptor has a snapshot associated
249 : * with it, the caller must ensure that it also lives long enough,
250 : * e.g. by calling RegisterSnapshotOnOwner
251 : */
252 : LargeObjectDesc *
253 464 : inv_open(Oid lobjId, int flags, MemoryContext mcxt)
254 : {
255 : LargeObjectDesc *retval;
256 464 : Snapshot snapshot = NULL;
257 464 : int descflags = 0;
258 :
259 : /*
260 : * Historically, no difference is made between (INV_WRITE) and (INV_WRITE
261 : * | INV_READ), the caller being allowed to read the large object
262 : * descriptor in either case.
263 : */
264 464 : if (flags & INV_WRITE)
265 154 : descflags |= IFS_WRLOCK | IFS_RDLOCK;
266 464 : if (flags & INV_READ)
267 340 : descflags |= IFS_RDLOCK;
268 :
269 464 : if (descflags == 0)
270 0 : ereport(ERROR,
271 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
272 : errmsg("invalid flags for opening a large object: %d",
273 : flags)));
274 :
275 : /* Get snapshot. If write is requested, use an instantaneous snapshot. */
276 464 : if (descflags & IFS_WRLOCK)
277 154 : snapshot = NULL;
278 : else
279 310 : snapshot = GetActiveSnapshot();
280 :
281 : /* Can't use LargeObjectExists here because we need to specify snapshot */
282 464 : if (!myLargeObjectExists(lobjId, snapshot))
283 4 : ereport(ERROR,
284 : (errcode(ERRCODE_UNDEFINED_OBJECT),
285 : errmsg("large object %u does not exist", lobjId)));
286 :
287 : /* Apply permission checks, again specifying snapshot */
288 460 : if ((descflags & IFS_RDLOCK) != 0)
289 : {
290 902 : if (!lo_compat_privileges &&
291 442 : pg_largeobject_aclcheck_snapshot(lobjId,
292 : GetUserId(),
293 : ACL_SELECT,
294 : snapshot) != ACLCHECK_OK)
295 42 : ereport(ERROR,
296 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
297 : errmsg("permission denied for large object %u",
298 : lobjId)));
299 : }
300 418 : if ((descflags & IFS_WRLOCK) != 0)
301 : {
302 248 : if (!lo_compat_privileges &&
303 118 : pg_largeobject_aclcheck_snapshot(lobjId,
304 : GetUserId(),
305 : ACL_UPDATE,
306 : snapshot) != ACLCHECK_OK)
307 12 : ereport(ERROR,
308 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
309 : errmsg("permission denied for large object %u",
310 : lobjId)));
311 : }
312 :
313 : /* OK to create a descriptor */
314 406 : retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
315 : sizeof(LargeObjectDesc));
316 406 : retval->id = lobjId;
317 406 : retval->offset = 0;
318 406 : retval->flags = descflags;
319 :
320 : /* caller sets if needed, not used by the functions in this file */
321 406 : retval->subid = InvalidSubTransactionId;
322 :
323 : /*
324 : * The snapshot (if any) is just the currently active snapshot. The
325 : * caller will replace it with a longer-lived copy if needed.
326 : */
327 406 : retval->snapshot = snapshot;
328 :
329 406 : return retval;
330 : }
331 :
332 : /*
333 : * Closes a large object descriptor previously made by inv_open(), and
334 : * releases the long-term memory used by it.
335 : */
336 : void
337 376 : inv_close(LargeObjectDesc *obj_desc)
338 : {
339 : Assert(PointerIsValid(obj_desc));
340 376 : pfree(obj_desc);
341 376 : }
342 :
343 : /*
344 : * Destroys an existing large object (not to be confused with a descriptor!)
345 : *
346 : * Note we expect caller to have done any required permissions check.
347 : */
348 : int
349 82 : inv_drop(Oid lobjId)
350 : {
351 : ObjectAddress object;
352 :
353 : /*
354 : * Delete any comments and dependencies on the large object
355 : */
356 82 : object.classId = LargeObjectRelationId;
357 82 : object.objectId = lobjId;
358 82 : object.objectSubId = 0;
359 82 : performDeletion(&object, DROP_CASCADE, 0);
360 :
361 : /*
362 : * Advance command counter so that tuple removal will be seen by later
363 : * large-object operations in this transaction.
364 : */
365 82 : CommandCounterIncrement();
366 :
367 : /* For historical reasons, we always return 1 on success. */
368 82 : return 1;
369 : }
370 :
371 : /*
372 : * Determine size of a large object
373 : *
374 : * NOTE: LOs can contain gaps, just like Unix files. We actually return
375 : * the offset of the last byte + 1.
376 : */
377 : static uint64
378 104 : inv_getsize(LargeObjectDesc *obj_desc)
379 : {
380 104 : uint64 lastbyte = 0;
381 : ScanKeyData skey[1];
382 : SysScanDesc sd;
383 : HeapTuple tuple;
384 :
385 : Assert(PointerIsValid(obj_desc));
386 :
387 104 : open_lo_relation();
388 :
389 104 : ScanKeyInit(&skey[0],
390 : Anum_pg_largeobject_loid,
391 : BTEqualStrategyNumber, F_OIDEQ,
392 : ObjectIdGetDatum(obj_desc->id));
393 :
394 104 : sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
395 : obj_desc->snapshot, 1, skey);
396 :
397 : /*
398 : * Because the pg_largeobject index is on both loid and pageno, but we
399 : * constrain only loid, a backwards scan should visit all pages of the
400 : * large object in reverse pageno order. So, it's sufficient to examine
401 : * the first valid tuple (== last valid page).
402 : */
403 104 : tuple = systable_getnext_ordered(sd, BackwardScanDirection);
404 104 : if (HeapTupleIsValid(tuple))
405 : {
406 : Form_pg_largeobject data;
407 : bytea *datafield;
408 : int len;
409 : bool pfreeit;
410 :
411 96 : if (HeapTupleHasNulls(tuple)) /* paranoia */
412 0 : elog(ERROR, "null field found in pg_largeobject");
413 96 : data = (Form_pg_largeobject) GETSTRUCT(tuple);
414 96 : getdatafield(data, &datafield, &len, &pfreeit);
415 96 : lastbyte = (uint64) data->pageno * LOBLKSIZE + len;
416 96 : if (pfreeit)
417 18 : pfree(datafield);
418 : }
419 :
420 104 : systable_endscan_ordered(sd);
421 :
422 104 : return lastbyte;
423 : }
424 :
425 : int64
426 220 : inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
427 : {
428 : int64 newoffset;
429 :
430 : Assert(PointerIsValid(obj_desc));
431 :
432 : /*
433 : * We allow seek/tell if you have either read or write permission, so no
434 : * need for a permission check here.
435 : */
436 :
437 : /*
438 : * Note: overflow in the additions is possible, but since we will reject
439 : * negative results, we don't need any extra test for that.
440 : */
441 220 : switch (whence)
442 : {
443 98 : case SEEK_SET:
444 98 : newoffset = offset;
445 98 : break;
446 18 : case SEEK_CUR:
447 18 : newoffset = obj_desc->offset + offset;
448 18 : break;
449 104 : case SEEK_END:
450 104 : newoffset = inv_getsize(obj_desc) + offset;
451 104 : break;
452 0 : default:
453 0 : ereport(ERROR,
454 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
455 : errmsg("invalid whence setting: %d", whence)));
456 : newoffset = 0; /* keep compiler quiet */
457 : break;
458 : }
459 :
460 : /*
461 : * use errmsg_internal here because we don't want to expose INT64_FORMAT
462 : * in translatable strings; doing better is not worth the trouble
463 : */
464 220 : if (newoffset < 0 || newoffset > MAX_LARGE_OBJECT_SIZE)
465 0 : ereport(ERROR,
466 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
467 : errmsg_internal("invalid large object seek target: " INT64_FORMAT,
468 : newoffset)));
469 :
470 220 : obj_desc->offset = newoffset;
471 220 : return newoffset;
472 : }
473 :
474 : int64
475 48 : inv_tell(LargeObjectDesc *obj_desc)
476 : {
477 : Assert(PointerIsValid(obj_desc));
478 :
479 : /*
480 : * We allow seek/tell if you have either read or write permission, so no
481 : * need for a permission check here.
482 : */
483 :
484 48 : return obj_desc->offset;
485 : }
486 :
487 : int
488 1368 : inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
489 : {
490 1368 : int nread = 0;
491 : int64 n;
492 : int64 off;
493 : int len;
494 1368 : int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
495 : uint64 pageoff;
496 : ScanKeyData skey[2];
497 : SysScanDesc sd;
498 : HeapTuple tuple;
499 :
500 : Assert(PointerIsValid(obj_desc));
501 : Assert(buf != NULL);
502 :
503 1368 : if ((obj_desc->flags & IFS_RDLOCK) == 0)
504 0 : ereport(ERROR,
505 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
506 : errmsg("permission denied for large object %u",
507 : obj_desc->id)));
508 :
509 1368 : if (nbytes <= 0)
510 8 : return 0;
511 :
512 1360 : open_lo_relation();
513 :
514 1360 : ScanKeyInit(&skey[0],
515 : Anum_pg_largeobject_loid,
516 : BTEqualStrategyNumber, F_OIDEQ,
517 : ObjectIdGetDatum(obj_desc->id));
518 :
519 1360 : ScanKeyInit(&skey[1],
520 : Anum_pg_largeobject_pageno,
521 : BTGreaterEqualStrategyNumber, F_INT4GE,
522 : Int32GetDatum(pageno));
523 :
524 1360 : sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
525 : obj_desc->snapshot, 2, skey);
526 :
527 10408 : while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
528 : {
529 : Form_pg_largeobject data;
530 : bytea *datafield;
531 : bool pfreeit;
532 :
533 10110 : if (HeapTupleHasNulls(tuple)) /* paranoia */
534 0 : elog(ERROR, "null field found in pg_largeobject");
535 10110 : data = (Form_pg_largeobject) GETSTRUCT(tuple);
536 :
537 : /*
538 : * We expect the indexscan will deliver pages in order. However,
539 : * there may be missing pages if the LO contains unwritten "holes". We
540 : * want missing sections to read out as zeroes.
541 : */
542 10110 : pageoff = ((uint64) data->pageno) * LOBLKSIZE;
543 10110 : if (pageoff > obj_desc->offset)
544 : {
545 12 : n = pageoff - obj_desc->offset;
546 12 : n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
547 12 : MemSet(buf + nread, 0, n);
548 12 : nread += n;
549 12 : obj_desc->offset += n;
550 : }
551 :
552 10110 : if (nread < nbytes)
553 : {
554 : Assert(obj_desc->offset >= pageoff);
555 10104 : off = (int) (obj_desc->offset - pageoff);
556 : Assert(off >= 0 && off < LOBLKSIZE);
557 :
558 10104 : getdatafield(data, &datafield, &len, &pfreeit);
559 10104 : if (len > off)
560 : {
561 10008 : n = len - off;
562 10008 : n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
563 10008 : memcpy(buf + nread, VARDATA(datafield) + off, n);
564 10008 : nread += n;
565 10008 : obj_desc->offset += n;
566 : }
567 10104 : if (pfreeit)
568 10028 : pfree(datafield);
569 : }
570 :
571 10110 : if (nread >= nbytes)
572 1062 : break;
573 : }
574 :
575 1360 : systable_endscan_ordered(sd);
576 :
577 1360 : return nread;
578 : }
579 :
580 : int
581 1552 : inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
582 : {
583 1552 : int nwritten = 0;
584 : int n;
585 : int off;
586 : int len;
587 1552 : int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
588 : ScanKeyData skey[2];
589 : SysScanDesc sd;
590 : HeapTuple oldtuple;
591 : Form_pg_largeobject olddata;
592 : bool neednextpage;
593 : bytea *datafield;
594 : bool pfreeit;
595 : union
596 : {
597 : bytea hdr;
598 : /* this is to make the union big enough for a LO data chunk: */
599 : char data[LOBLKSIZE + VARHDRSZ];
600 : /* ensure union is aligned well enough: */
601 : int32 align_it;
602 : } workbuf;
603 1552 : char *workb = VARDATA(&workbuf.hdr);
604 : HeapTuple newtup;
605 : Datum values[Natts_pg_largeobject];
606 : bool nulls[Natts_pg_largeobject];
607 : bool replace[Natts_pg_largeobject];
608 : CatalogIndexState indstate;
609 :
610 : Assert(PointerIsValid(obj_desc));
611 : Assert(buf != NULL);
612 :
613 : /* enforce writability because snapshot is probably wrong otherwise */
614 1552 : if ((obj_desc->flags & IFS_WRLOCK) == 0)
615 0 : ereport(ERROR,
616 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
617 : errmsg("permission denied for large object %u",
618 : obj_desc->id)));
619 :
620 1552 : if (nbytes <= 0)
621 0 : return 0;
622 :
623 : /* this addition can't overflow because nbytes is only int32 */
624 1552 : if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
625 0 : ereport(ERROR,
626 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
627 : errmsg("invalid large object write request size: %d",
628 : nbytes)));
629 :
630 1552 : open_lo_relation();
631 :
632 1552 : indstate = CatalogOpenIndexes(lo_heap_r);
633 :
634 1552 : ScanKeyInit(&skey[0],
635 : Anum_pg_largeobject_loid,
636 : BTEqualStrategyNumber, F_OIDEQ,
637 : ObjectIdGetDatum(obj_desc->id));
638 :
639 1552 : ScanKeyInit(&skey[1],
640 : Anum_pg_largeobject_pageno,
641 : BTGreaterEqualStrategyNumber, F_INT4GE,
642 : Int32GetDatum(pageno));
643 :
644 1552 : sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
645 : obj_desc->snapshot, 2, skey);
646 :
647 1552 : oldtuple = NULL;
648 1552 : olddata = NULL;
649 1552 : neednextpage = true;
650 :
651 9500 : while (nwritten < nbytes)
652 : {
653 : /*
654 : * If possible, get next pre-existing page of the LO. We expect the
655 : * indexscan will deliver these in order --- but there may be holes.
656 : */
657 7948 : if (neednextpage)
658 : {
659 1558 : if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
660 : {
661 24 : if (HeapTupleHasNulls(oldtuple)) /* paranoia */
662 0 : elog(ERROR, "null field found in pg_largeobject");
663 24 : olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
664 : Assert(olddata->pageno >= pageno);
665 : }
666 1558 : neednextpage = false;
667 : }
668 :
669 : /*
670 : * If we have a pre-existing page, see if it is the page we want to
671 : * write, or a later one.
672 : */
673 7948 : if (olddata != NULL && olddata->pageno == pageno)
674 : {
675 : /*
676 : * Update an existing page with fresh data.
677 : *
678 : * First, load old data into workbuf
679 : */
680 24 : getdatafield(olddata, &datafield, &len, &pfreeit);
681 24 : memcpy(workb, VARDATA(datafield), len);
682 24 : if (pfreeit)
683 18 : pfree(datafield);
684 :
685 : /*
686 : * Fill any hole
687 : */
688 24 : off = (int) (obj_desc->offset % LOBLKSIZE);
689 24 : if (off > len)
690 0 : MemSet(workb + len, 0, off - len);
691 :
692 : /*
693 : * Insert appropriate portion of new data
694 : */
695 24 : n = LOBLKSIZE - off;
696 24 : n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
697 24 : memcpy(workb + off, buf + nwritten, n);
698 24 : nwritten += n;
699 24 : obj_desc->offset += n;
700 24 : off += n;
701 : /* compute valid length of new page */
702 24 : len = (len >= off) ? len : off;
703 24 : SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
704 :
705 : /*
706 : * Form and insert updated tuple
707 : */
708 24 : memset(values, 0, sizeof(values));
709 24 : memset(nulls, false, sizeof(nulls));
710 24 : memset(replace, false, sizeof(replace));
711 24 : values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
712 24 : replace[Anum_pg_largeobject_data - 1] = true;
713 24 : newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
714 : values, nulls, replace);
715 24 : CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
716 : indstate);
717 24 : heap_freetuple(newtup);
718 :
719 : /*
720 : * We're done with this old page.
721 : */
722 24 : oldtuple = NULL;
723 24 : olddata = NULL;
724 24 : neednextpage = true;
725 : }
726 : else
727 : {
728 : /*
729 : * Write a brand new page.
730 : *
731 : * First, fill any hole
732 : */
733 7924 : off = (int) (obj_desc->offset % LOBLKSIZE);
734 7924 : if (off > 0)
735 6 : MemSet(workb, 0, off);
736 :
737 : /*
738 : * Insert appropriate portion of new data
739 : */
740 7924 : n = LOBLKSIZE - off;
741 7924 : n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
742 7924 : memcpy(workb + off, buf + nwritten, n);
743 7924 : nwritten += n;
744 7924 : obj_desc->offset += n;
745 : /* compute valid length of new page */
746 7924 : len = off + n;
747 7924 : SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
748 :
749 : /*
750 : * Form and insert updated tuple
751 : */
752 7924 : memset(values, 0, sizeof(values));
753 7924 : memset(nulls, false, sizeof(nulls));
754 7924 : values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
755 7924 : values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
756 7924 : values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
757 7924 : newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
758 7924 : CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
759 7924 : heap_freetuple(newtup);
760 : }
761 7948 : pageno++;
762 : }
763 :
764 1552 : systable_endscan_ordered(sd);
765 :
766 1552 : CatalogCloseIndexes(indstate);
767 :
768 : /*
769 : * Advance command counter so that my tuple updates will be seen by later
770 : * large-object operations in this transaction.
771 : */
772 1552 : CommandCounterIncrement();
773 :
774 1552 : return nwritten;
775 : }
776 :
777 : void
778 42 : inv_truncate(LargeObjectDesc *obj_desc, int64 len)
779 : {
780 42 : int32 pageno = (int32) (len / LOBLKSIZE);
781 : int32 off;
782 : ScanKeyData skey[2];
783 : SysScanDesc sd;
784 : HeapTuple oldtuple;
785 : Form_pg_largeobject olddata;
786 : union
787 : {
788 : bytea hdr;
789 : /* this is to make the union big enough for a LO data chunk: */
790 : char data[LOBLKSIZE + VARHDRSZ];
791 : /* ensure union is aligned well enough: */
792 : int32 align_it;
793 : } workbuf;
794 42 : char *workb = VARDATA(&workbuf.hdr);
795 : HeapTuple newtup;
796 : Datum values[Natts_pg_largeobject];
797 : bool nulls[Natts_pg_largeobject];
798 : bool replace[Natts_pg_largeobject];
799 : CatalogIndexState indstate;
800 :
801 : Assert(PointerIsValid(obj_desc));
802 :
803 : /* enforce writability because snapshot is probably wrong otherwise */
804 42 : if ((obj_desc->flags & IFS_WRLOCK) == 0)
805 0 : ereport(ERROR,
806 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
807 : errmsg("permission denied for large object %u",
808 : obj_desc->id)));
809 :
810 : /*
811 : * use errmsg_internal here because we don't want to expose INT64_FORMAT
812 : * in translatable strings; doing better is not worth the trouble
813 : */
814 42 : if (len < 0 || len > MAX_LARGE_OBJECT_SIZE)
815 0 : ereport(ERROR,
816 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
817 : errmsg_internal("invalid large object truncation target: " INT64_FORMAT,
818 : len)));
819 :
820 42 : open_lo_relation();
821 :
822 42 : indstate = CatalogOpenIndexes(lo_heap_r);
823 :
824 : /*
825 : * Set up to find all pages with desired loid and pageno >= target
826 : */
827 42 : ScanKeyInit(&skey[0],
828 : Anum_pg_largeobject_loid,
829 : BTEqualStrategyNumber, F_OIDEQ,
830 : ObjectIdGetDatum(obj_desc->id));
831 :
832 42 : ScanKeyInit(&skey[1],
833 : Anum_pg_largeobject_pageno,
834 : BTGreaterEqualStrategyNumber, F_INT4GE,
835 : Int32GetDatum(pageno));
836 :
837 42 : sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
838 : obj_desc->snapshot, 2, skey);
839 :
840 : /*
841 : * If possible, get the page the truncation point is in. The truncation
842 : * point may be beyond the end of the LO or in a hole.
843 : */
844 42 : olddata = NULL;
845 42 : if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
846 : {
847 24 : if (HeapTupleHasNulls(oldtuple)) /* paranoia */
848 0 : elog(ERROR, "null field found in pg_largeobject");
849 24 : olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
850 : Assert(olddata->pageno >= pageno);
851 : }
852 :
853 : /*
854 : * If we found the page of the truncation point we need to truncate the
855 : * data in it. Otherwise if we're in a hole, we need to create a page to
856 : * mark the end of data.
857 : */
858 42 : if (olddata != NULL && olddata->pageno == pageno)
859 12 : {
860 : /* First, load old data into workbuf */
861 : bytea *datafield;
862 : int pagelen;
863 : bool pfreeit;
864 :
865 12 : getdatafield(olddata, &datafield, &pagelen, &pfreeit);
866 12 : memcpy(workb, VARDATA(datafield), pagelen);
867 12 : if (pfreeit)
868 6 : pfree(datafield);
869 :
870 : /*
871 : * Fill any hole
872 : */
873 12 : off = len % LOBLKSIZE;
874 12 : if (off > pagelen)
875 6 : MemSet(workb + pagelen, 0, off - pagelen);
876 :
877 : /* compute length of new page */
878 12 : SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
879 :
880 : /*
881 : * Form and insert updated tuple
882 : */
883 12 : memset(values, 0, sizeof(values));
884 12 : memset(nulls, false, sizeof(nulls));
885 12 : memset(replace, false, sizeof(replace));
886 12 : values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
887 12 : replace[Anum_pg_largeobject_data - 1] = true;
888 12 : newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
889 : values, nulls, replace);
890 12 : CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
891 : indstate);
892 12 : heap_freetuple(newtup);
893 : }
894 : else
895 : {
896 : /*
897 : * If the first page we found was after the truncation point, we're in
898 : * a hole that we'll fill, but we need to delete the later page
899 : * because the loop below won't visit it again.
900 : */
901 30 : if (olddata != NULL)
902 : {
903 : Assert(olddata->pageno > pageno);
904 12 : CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
905 : }
906 :
907 : /*
908 : * Write a brand new page.
909 : *
910 : * Fill the hole up to the truncation point
911 : */
912 30 : off = len % LOBLKSIZE;
913 30 : if (off > 0)
914 30 : MemSet(workb, 0, off);
915 :
916 : /* compute length of new page */
917 30 : SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
918 :
919 : /*
920 : * Form and insert new tuple
921 : */
922 30 : memset(values, 0, sizeof(values));
923 30 : memset(nulls, false, sizeof(nulls));
924 30 : values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
925 30 : values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
926 30 : values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
927 30 : newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
928 30 : CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
929 30 : heap_freetuple(newtup);
930 : }
931 :
932 : /*
933 : * Delete any pages after the truncation point. If the initial search
934 : * didn't find a page, then of course there's nothing more to do.
935 : */
936 42 : if (olddata != NULL)
937 : {
938 30 : while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
939 : {
940 6 : CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
941 : }
942 : }
943 :
944 42 : systable_endscan_ordered(sd);
945 :
946 42 : CatalogCloseIndexes(indstate);
947 :
948 : /*
949 : * Advance command counter so that tuple updates will be seen by later
950 : * large-object operations in this transaction.
951 : */
952 42 : CommandCounterIncrement();
953 42 : }
|