Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * spgdoinsert.c
4 : * implementation of insert algorithm
5 : *
6 : *
7 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994, Regents of the University of California
9 : *
10 : * IDENTIFICATION
11 : * src/backend/access/spgist/spgdoinsert.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : #include "postgres.h"
17 :
18 : #include "access/genam.h"
19 : #include "access/spgist_private.h"
20 : #include "access/spgxlog.h"
21 : #include "access/xloginsert.h"
22 : #include "common/int.h"
23 : #include "common/pg_prng.h"
24 : #include "miscadmin.h"
25 : #include "storage/bufmgr.h"
26 : #include "utils/rel.h"
27 :
28 :
29 : /*
30 : * SPPageDesc tracks all info about a page we are inserting into. In some
31 : * situations it actually identifies a tuple, or even a specific node within
32 : * an inner tuple. But any of the fields can be invalid. If the buffer
33 : * field is valid, it implies we hold pin and exclusive lock on that buffer.
34 : * page pointer should be valid exactly when buffer is.
35 : */
36 : typedef struct SPPageDesc
37 : {
38 : BlockNumber blkno; /* block number, or InvalidBlockNumber */
39 : Buffer buffer; /* page's buffer number, or InvalidBuffer */
40 : Page page; /* pointer to page buffer, or NULL */
41 : OffsetNumber offnum; /* offset of tuple, or InvalidOffsetNumber */
42 : int node; /* node number within inner tuple, or -1 */
43 : } SPPageDesc;
44 :
45 :
46 : /*
47 : * Set the item pointer in the nodeN'th entry in inner tuple tup. This
48 : * is used to update the parent inner tuple's downlink after a move or
49 : * split operation.
50 : */
51 : void
52 12008 : spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN,
53 : BlockNumber blkno, OffsetNumber offset)
54 : {
55 : int i;
56 : SpGistNodeTuple node;
57 :
58 61420 : SGITITERATE(tup, i, node)
59 : {
60 61420 : if (i == nodeN)
61 : {
62 12008 : ItemPointerSet(&node->t_tid, blkno, offset);
63 12008 : return;
64 : }
65 : }
66 :
67 0 : elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
68 : nodeN);
69 : }
70 :
71 : /*
72 : * Form a new inner tuple containing one more node than the given one, with
73 : * the specified label datum, inserted at offset "offset" in the node array.
74 : * The new tuple's prefix is the same as the old one's.
75 : *
76 : * Note that the new node initially has an invalid downlink. We'll find a
77 : * page to point it to later.
78 : */
79 : static SpGistInnerTuple
80 1598 : addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
81 : {
82 : SpGistNodeTuple node,
83 : *nodes;
84 : int i;
85 :
86 : /* if offset is negative, insert at end */
87 1598 : if (offset < 0)
88 0 : offset = tuple->nNodes;
89 1598 : else if (offset > tuple->nNodes)
90 0 : elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");
91 :
92 1598 : nodes = palloc(sizeof(SpGistNodeTuple) * (tuple->nNodes + 1));
93 10286 : SGITITERATE(tuple, i, node)
94 : {
95 8688 : if (i < offset)
96 7444 : nodes[i] = node;
97 : else
98 1244 : nodes[i + 1] = node;
99 : }
100 :
101 1598 : nodes[offset] = spgFormNodeTuple(state, label, false);
102 :
103 1598 : return spgFormInnerTuple(state,
104 1598 : (tuple->prefixSize > 0),
105 754 : SGITDATUM(tuple, state),
106 1598 : tuple->nNodes + 1,
107 : nodes);
108 : }
109 :
110 : /* qsort comparator for sorting OffsetNumbers */
111 : static int
112 5941642 : cmpOffsetNumbers(const void *a, const void *b)
113 : {
114 5941642 : return pg_cmp_u16(*(const OffsetNumber *) a, *(const OffsetNumber *) b);
115 : }
116 :
117 : /*
118 : * Delete multiple tuples from an index page, preserving tuple offset numbers.
119 : *
120 : * The first tuple in the given list is replaced with a dead tuple of type
121 : * "firststate" (REDIRECT/DEAD/PLACEHOLDER); the remaining tuples are replaced
122 : * with dead tuples of type "reststate". If either firststate or reststate
123 : * is REDIRECT, blkno/offnum specify where to link to.
124 : *
125 : * NB: this is used during WAL replay, so beware of trying to make it too
126 : * smart. In particular, it shouldn't use "state" except for calling
127 : * spgFormDeadTuple(). This is also used in a critical section, so no
128 : * pallocs either!
129 : */
130 : void
131 9298 : spgPageIndexMultiDelete(SpGistState *state, Page page,
132 : OffsetNumber *itemnos, int nitems,
133 : int firststate, int reststate,
134 : BlockNumber blkno, OffsetNumber offnum)
135 : {
136 : OffsetNumber firstItem;
137 : OffsetNumber sortednos[MaxIndexTuplesPerPage];
138 9298 : SpGistDeadTuple tuple = NULL;
139 : int i;
140 :
141 9298 : if (nitems == 0)
142 424 : return; /* nothing to do */
143 :
144 : /*
145 : * For efficiency we want to use PageIndexMultiDelete, which requires the
146 : * targets to be listed in sorted order, so we have to sort the itemnos
147 : * array. (This also greatly simplifies the math for reinserting the
148 : * replacement tuples.) However, we must not scribble on the caller's
149 : * array, so we have to make a copy.
150 : */
151 8874 : memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems);
152 8874 : if (nitems > 1)
153 8504 : qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);
154 :
155 8874 : PageIndexMultiDelete(page, sortednos, nitems);
156 :
157 8874 : firstItem = itemnos[0];
158 :
159 855350 : for (i = 0; i < nitems; i++)
160 : {
161 846476 : OffsetNumber itemno = sortednos[i];
162 : int tupstate;
163 :
164 846476 : tupstate = (itemno == firstItem) ? firststate : reststate;
165 846476 : if (tuple == NULL || tuple->tupstate != tupstate)
166 13562 : tuple = spgFormDeadTuple(state, tupstate, blkno, offnum);
167 :
168 846476 : if (PageAddItem(page, (Item) tuple, tuple->size,
169 : itemno, false, false) != itemno)
170 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
171 : tuple->size);
172 :
173 846476 : if (tupstate == SPGIST_REDIRECT)
174 2426 : SpGistPageGetOpaque(page)->nRedirection++;
175 844050 : else if (tupstate == SPGIST_PLACEHOLDER)
176 844050 : SpGistPageGetOpaque(page)->nPlaceholder++;
177 : }
178 : }
179 :
180 : /*
181 : * Update the parent inner tuple's downlink, and mark the parent buffer
182 : * dirty (this must be the last change to the parent page in the current
183 : * WAL action).
184 : */
185 : static void
186 9936 : saveNodeLink(Relation index, SPPageDesc *parent,
187 : BlockNumber blkno, OffsetNumber offnum)
188 : {
189 : SpGistInnerTuple innerTuple;
190 :
191 9936 : innerTuple = (SpGistInnerTuple) PageGetItem(parent->page,
192 9936 : PageGetItemId(parent->page, parent->offnum));
193 :
194 9936 : spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum);
195 :
196 9936 : MarkBufferDirty(parent->buffer);
197 9936 : }
198 :
199 : /*
200 : * Add a leaf tuple to a leaf page where there is known to be room for it
201 : */
202 : static void
203 797976 : addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
204 : SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
205 : {
206 : spgxlogAddLeaf xlrec;
207 :
208 797976 : xlrec.newPage = isNew;
209 797976 : xlrec.storesNulls = isNulls;
210 :
211 : /* these will be filled below as needed */
212 797976 : xlrec.offnumLeaf = InvalidOffsetNumber;
213 797976 : xlrec.offnumHeadLeaf = InvalidOffsetNumber;
214 797976 : xlrec.offnumParent = InvalidOffsetNumber;
215 797976 : xlrec.nodeI = 0;
216 :
217 797976 : START_CRIT_SECTION();
218 :
219 797976 : if (current->offnum == InvalidOffsetNumber ||
220 795924 : SpGistBlockIsRoot(current->blkno))
221 : {
222 : /* Tuple is not part of a chain */
223 19020 : SGLT_SET_NEXTOFFSET(leafTuple, InvalidOffsetNumber);
224 38040 : current->offnum = SpGistPageAddNewItem(state, current->page,
225 19020 : (Item) leafTuple, leafTuple->size,
226 : NULL, false);
227 :
228 19020 : xlrec.offnumLeaf = current->offnum;
229 :
230 : /* Must update parent's downlink if any */
231 19020 : if (parent->buffer != InvalidBuffer)
232 : {
233 2052 : xlrec.offnumParent = parent->offnum;
234 2052 : xlrec.nodeI = parent->node;
235 :
236 2052 : saveNodeLink(index, parent, current->blkno, current->offnum);
237 : }
238 : }
239 : else
240 : {
241 : /*
242 : * Tuple must be inserted into existing chain. We mustn't change the
243 : * chain's head address, but we don't need to chase the entire chain
244 : * to put the tuple at the end; we can insert it second.
245 : *
246 : * Also, it's possible that the "chain" consists only of a DEAD tuple,
247 : * in which case we should replace the DEAD tuple in-place.
248 : */
249 : SpGistLeafTuple head;
250 : OffsetNumber offnum;
251 :
252 778956 : head = (SpGistLeafTuple) PageGetItem(current->page,
253 778956 : PageGetItemId(current->page, current->offnum));
254 778956 : if (head->tupstate == SPGIST_LIVE)
255 : {
256 778956 : SGLT_SET_NEXTOFFSET(leafTuple, SGLT_GET_NEXTOFFSET(head));
257 778956 : offnum = SpGistPageAddNewItem(state, current->page,
258 778956 : (Item) leafTuple, leafTuple->size,
259 : NULL, false);
260 :
261 : /*
262 : * re-get head of list because it could have been moved on page,
263 : * and set new second element
264 : */
265 778956 : head = (SpGistLeafTuple) PageGetItem(current->page,
266 778956 : PageGetItemId(current->page, current->offnum));
267 778956 : SGLT_SET_NEXTOFFSET(head, offnum);
268 :
269 778956 : xlrec.offnumLeaf = offnum;
270 778956 : xlrec.offnumHeadLeaf = current->offnum;
271 : }
272 0 : else if (head->tupstate == SPGIST_DEAD)
273 : {
274 0 : SGLT_SET_NEXTOFFSET(leafTuple, InvalidOffsetNumber);
275 0 : PageIndexTupleDelete(current->page, current->offnum);
276 0 : if (PageAddItem(current->page,
277 : (Item) leafTuple, leafTuple->size,
278 0 : current->offnum, false, false) != current->offnum)
279 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
280 : leafTuple->size);
281 :
282 : /* WAL replay distinguishes this case by equal offnums */
283 0 : xlrec.offnumLeaf = current->offnum;
284 0 : xlrec.offnumHeadLeaf = current->offnum;
285 : }
286 : else
287 0 : elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate);
288 : }
289 :
290 797976 : MarkBufferDirty(current->buffer);
291 :
292 797976 : if (RelationNeedsWAL(index) && !state->isBuild)
293 : {
294 : XLogRecPtr recptr;
295 : int flags;
296 :
297 240668 : XLogBeginInsert();
298 240668 : XLogRegisterData((char *) &xlrec, sizeof(xlrec));
299 240668 : XLogRegisterData((char *) leafTuple, leafTuple->size);
300 :
301 240668 : flags = REGBUF_STANDARD;
302 240668 : if (xlrec.newPage)
303 0 : flags |= REGBUF_WILL_INIT;
304 240668 : XLogRegisterBuffer(0, current->buffer, flags);
305 240668 : if (xlrec.offnumParent != InvalidOffsetNumber)
306 830 : XLogRegisterBuffer(1, parent->buffer, REGBUF_STANDARD);
307 :
308 240668 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF);
309 :
310 240668 : PageSetLSN(current->page, recptr);
311 :
312 : /* update parent only if we actually changed it */
313 240668 : if (xlrec.offnumParent != InvalidOffsetNumber)
314 : {
315 830 : PageSetLSN(parent->page, recptr);
316 : }
317 : }
318 :
319 797976 : END_CRIT_SECTION();
320 797976 : }
321 :
322 : /*
323 : * Count the number and total size of leaf tuples in the chain starting at
324 : * current->offnum. Return number into *nToSplit and total size as function
325 : * result.
326 : *
327 : * Klugy special case when considering the root page (i.e., root is a leaf
328 : * page, but we're about to split for the first time): return fake large
329 : * values to force spgdoinsert() to take the doPickSplit rather than
330 : * moveLeafs code path. moveLeafs is not prepared to deal with root page.
331 : */
332 : static int
333 7960 : checkSplitConditions(Relation index, SpGistState *state,
334 : SPPageDesc *current, int *nToSplit)
335 : {
336 : int i,
337 7960 : n = 0,
338 7960 : totalSize = 0;
339 :
340 7960 : if (SpGistBlockIsRoot(current->blkno))
341 : {
342 : /* return impossible values to force split */
343 82 : *nToSplit = BLCKSZ;
344 82 : return BLCKSZ;
345 : }
346 :
347 7878 : i = current->offnum;
348 806346 : while (i != InvalidOffsetNumber)
349 : {
350 : SpGistLeafTuple it;
351 :
352 : Assert(i >= FirstOffsetNumber &&
353 : i <= PageGetMaxOffsetNumber(current->page));
354 798468 : it = (SpGistLeafTuple) PageGetItem(current->page,
355 : PageGetItemId(current->page, i));
356 798468 : if (it->tupstate == SPGIST_LIVE)
357 : {
358 798468 : n++;
359 798468 : totalSize += it->size + sizeof(ItemIdData);
360 : }
361 0 : else if (it->tupstate == SPGIST_DEAD)
362 : {
363 : /* We could see a DEAD tuple as first/only chain item */
364 : Assert(i == current->offnum);
365 : Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
366 : /* Don't count it in result, because it won't go to other page */
367 : }
368 : else
369 0 : elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
370 :
371 798468 : i = SGLT_GET_NEXTOFFSET(it);
372 : }
373 :
374 7878 : *nToSplit = n;
375 :
376 7878 : return totalSize;
377 : }
378 :
379 : /*
380 : * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
381 : * but the chain has to be moved because there's not enough room to add
382 : * newLeafTuple to its page. We use this method when the chain contains
383 : * very little data so a split would be inefficient. We are sure we can
384 : * fit the chain plus newLeafTuple on one other page.
385 : */
386 : static void
387 2238 : moveLeafs(Relation index, SpGistState *state,
388 : SPPageDesc *current, SPPageDesc *parent,
389 : SpGistLeafTuple newLeafTuple, bool isNulls)
390 : {
391 : int i,
392 : nDelete,
393 : nInsert,
394 : size;
395 : Buffer nbuf;
396 : Page npage;
397 2238 : OffsetNumber r = InvalidOffsetNumber,
398 2238 : startOffset = InvalidOffsetNumber;
399 2238 : bool replaceDead = false;
400 : OffsetNumber *toDelete;
401 : OffsetNumber *toInsert;
402 : BlockNumber nblkno;
403 : spgxlogMoveLeafs xlrec;
404 : char *leafdata,
405 : *leafptr;
406 :
407 : /* This doesn't work on root page */
408 : Assert(parent->buffer != InvalidBuffer);
409 : Assert(parent->buffer != current->buffer);
410 :
411 : /* Locate the tuples to be moved, and count up the space needed */
412 2238 : i = PageGetMaxOffsetNumber(current->page);
413 2238 : toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * i);
414 2238 : toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (i + 1));
415 :
416 2238 : size = newLeafTuple->size + sizeof(ItemIdData);
417 :
418 2238 : nDelete = 0;
419 2238 : i = current->offnum;
420 90922 : while (i != InvalidOffsetNumber)
421 : {
422 : SpGistLeafTuple it;
423 :
424 : Assert(i >= FirstOffsetNumber &&
425 : i <= PageGetMaxOffsetNumber(current->page));
426 88684 : it = (SpGistLeafTuple) PageGetItem(current->page,
427 : PageGetItemId(current->page, i));
428 :
429 88684 : if (it->tupstate == SPGIST_LIVE)
430 : {
431 88684 : toDelete[nDelete] = i;
432 88684 : size += it->size + sizeof(ItemIdData);
433 88684 : nDelete++;
434 : }
435 0 : else if (it->tupstate == SPGIST_DEAD)
436 : {
437 : /* We could see a DEAD tuple as first/only chain item */
438 : Assert(i == current->offnum);
439 : Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
440 : /* We don't want to move it, so don't count it in size */
441 0 : toDelete[nDelete] = i;
442 0 : nDelete++;
443 0 : replaceDead = true;
444 : }
445 : else
446 0 : elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
447 :
448 88684 : i = SGLT_GET_NEXTOFFSET(it);
449 : }
450 :
451 : /* Find a leaf page that will hold them */
452 2238 : nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
453 : size, &xlrec.newPage);
454 2238 : npage = BufferGetPage(nbuf);
455 2238 : nblkno = BufferGetBlockNumber(nbuf);
456 : Assert(nblkno != current->blkno);
457 :
458 2238 : leafdata = leafptr = palloc(size);
459 :
460 2238 : START_CRIT_SECTION();
461 :
462 : /* copy all the old tuples to new page, unless they're dead */
463 2238 : nInsert = 0;
464 2238 : if (!replaceDead)
465 : {
466 90922 : for (i = 0; i < nDelete; i++)
467 : {
468 : SpGistLeafTuple it;
469 :
470 88684 : it = (SpGistLeafTuple) PageGetItem(current->page,
471 88684 : PageGetItemId(current->page, toDelete[i]));
472 : Assert(it->tupstate == SPGIST_LIVE);
473 :
474 : /*
475 : * Update chain link (notice the chain order gets reversed, but we
476 : * don't care). We're modifying the tuple on the source page
477 : * here, but it's okay since we're about to delete it.
478 : */
479 88684 : SGLT_SET_NEXTOFFSET(it, r);
480 :
481 88684 : r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
482 : &startOffset, false);
483 :
484 88684 : toInsert[nInsert] = r;
485 88684 : nInsert++;
486 :
487 : /* save modified tuple into leafdata as well */
488 88684 : memcpy(leafptr, it, it->size);
489 88684 : leafptr += it->size;
490 : }
491 : }
492 :
493 : /* add the new tuple as well */
494 2238 : SGLT_SET_NEXTOFFSET(newLeafTuple, r);
495 2238 : r = SpGistPageAddNewItem(state, npage,
496 2238 : (Item) newLeafTuple, newLeafTuple->size,
497 : &startOffset, false);
498 2238 : toInsert[nInsert] = r;
499 2238 : nInsert++;
500 2238 : memcpy(leafptr, newLeafTuple, newLeafTuple->size);
501 2238 : leafptr += newLeafTuple->size;
502 :
503 : /*
504 : * Now delete the old tuples, leaving a redirection pointer behind for the
505 : * first one, unless we're doing an index build; in which case there can't
506 : * be any concurrent scan so we need not provide a redirect.
507 : */
508 2238 : spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
509 2238 : state->isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
510 : SPGIST_PLACEHOLDER,
511 : nblkno, r);
512 :
513 : /* Update parent's downlink and mark parent page dirty */
514 2238 : saveNodeLink(index, parent, nblkno, r);
515 :
516 : /* Mark the leaf pages too */
517 2238 : MarkBufferDirty(current->buffer);
518 2238 : MarkBufferDirty(nbuf);
519 :
520 2238 : if (RelationNeedsWAL(index) && !state->isBuild)
521 : {
522 : XLogRecPtr recptr;
523 :
524 : /* prepare WAL info */
525 520 : STORE_STATE(state, xlrec.stateSrc);
526 :
527 520 : xlrec.nMoves = nDelete;
528 520 : xlrec.replaceDead = replaceDead;
529 520 : xlrec.storesNulls = isNulls;
530 :
531 520 : xlrec.offnumParent = parent->offnum;
532 520 : xlrec.nodeI = parent->node;
533 :
534 520 : XLogBeginInsert();
535 520 : XLogRegisterData((char *) &xlrec, SizeOfSpgxlogMoveLeafs);
536 520 : XLogRegisterData((char *) toDelete,
537 : sizeof(OffsetNumber) * nDelete);
538 520 : XLogRegisterData((char *) toInsert,
539 : sizeof(OffsetNumber) * nInsert);
540 520 : XLogRegisterData((char *) leafdata, leafptr - leafdata);
541 :
542 520 : XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
543 520 : XLogRegisterBuffer(1, nbuf, REGBUF_STANDARD | (xlrec.newPage ? REGBUF_WILL_INIT : 0));
544 520 : XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
545 :
546 520 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS);
547 :
548 520 : PageSetLSN(current->page, recptr);
549 520 : PageSetLSN(npage, recptr);
550 520 : PageSetLSN(parent->page, recptr);
551 : }
552 :
553 2238 : END_CRIT_SECTION();
554 :
555 : /* Update local free-space cache and release new buffer */
556 2238 : SpGistSetLastUsedPage(index, nbuf);
557 2238 : UnlockReleaseBuffer(nbuf);
558 2238 : }
559 :
560 : /*
561 : * Update previously-created redirection tuple with appropriate destination
562 : *
563 : * We use this when it's not convenient to know the destination first.
564 : * The tuple should have been made with the "impossible" destination of
565 : * the metapage.
566 : */
567 : static void
568 1330 : setRedirectionTuple(SPPageDesc *current, OffsetNumber position,
569 : BlockNumber blkno, OffsetNumber offnum)
570 : {
571 : SpGistDeadTuple dt;
572 :
573 1330 : dt = (SpGistDeadTuple) PageGetItem(current->page,
574 : PageGetItemId(current->page, position));
575 : Assert(dt->tupstate == SPGIST_REDIRECT);
576 : Assert(ItemPointerGetBlockNumber(&dt->pointer) == SPGIST_METAPAGE_BLKNO);
577 1330 : ItemPointerSet(&dt->pointer, blkno, offnum);
578 1330 : }
579 :
580 : /*
581 : * Test to see if the user-defined picksplit function failed to do its job,
582 : * ie, it put all the leaf tuples into the same node.
583 : * If so, randomly divide the tuples into several nodes (all with the same
584 : * label) and return true to select allTheSame mode for this inner tuple.
585 : *
586 : * (This code is also used to forcibly select allTheSame mode for nulls.)
587 : *
588 : * If we know that the leaf tuples wouldn't all fit on one page, then we
589 : * exclude the last tuple (which is the incoming new tuple that forced a split)
590 : * from the check to see if more than one node is used. The reason for this
591 : * is that if the existing tuples are put into only one chain, then even if
592 : * we move them all to an empty page, there would still not be room for the
593 : * new tuple, so we'd get into an infinite loop of picksplit attempts.
594 : * Forcing allTheSame mode dodges this problem by ensuring the old tuples will
595 : * be split across pages. (Exercise for the reader: figure out why this
596 : * fixes the problem even when there is only one old tuple.)
597 : */
598 : static bool
599 5722 : checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig,
600 : bool *includeNew)
601 : {
602 : int theNode;
603 : int limit;
604 : int i;
605 :
606 : /* For the moment, assume we can include the new leaf tuple */
607 5722 : *includeNew = true;
608 :
609 : /* If there's only the new leaf tuple, don't select allTheSame mode */
610 5722 : if (in->nTuples <= 1)
611 44 : return false;
612 :
613 : /* If tuple set doesn't fit on one page, ignore the new tuple in test */
614 5678 : limit = tooBig ? in->nTuples - 1 : in->nTuples;
615 :
616 : /* Check to see if more than one node is populated */
617 5678 : theNode = out->mapTuplesToNodes[0];
618 125760 : for (i = 1; i < limit; i++)
619 : {
620 125456 : if (out->mapTuplesToNodes[i] != theNode)
621 5374 : return false;
622 : }
623 :
624 : /* Nope, so override the picksplit function's decisions */
625 :
626 : /* If the new tuple is in its own node, it can't be included in split */
627 304 : if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode)
628 0 : *includeNew = false;
629 :
630 304 : out->nNodes = 8; /* arbitrary number of child nodes */
631 :
632 : /* Random assignment of tuples to nodes (note we include new tuple) */
633 31912 : for (i = 0; i < in->nTuples; i++)
634 31608 : out->mapTuplesToNodes[i] = i % out->nNodes;
635 :
636 : /* The opclass may not use node labels, but if it does, duplicate 'em */
637 304 : if (out->nodeLabels)
638 : {
639 48 : Datum theLabel = out->nodeLabels[theNode];
640 :
641 48 : out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes);
642 432 : for (i = 0; i < out->nNodes; i++)
643 384 : out->nodeLabels[i] = theLabel;
644 : }
645 :
646 : /* We don't touch the prefix or the leaf tuple datum assignments */
647 :
648 304 : return true;
649 : }
650 :
651 : /*
652 : * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
653 : * but the chain has to be split because there's not enough room to add
654 : * newLeafTuple to its page.
655 : *
656 : * This function splits the leaf tuple set according to picksplit's rules,
657 : * creating one or more new chains that are spread across the current page
658 : * and an additional leaf page (we assume that two leaf pages will be
659 : * sufficient). A new inner tuple is created, and the parent downlink
660 : * pointer is updated to point to that inner tuple instead of the leaf chain.
661 : *
662 : * On exit, current contains the address of the new inner tuple.
663 : *
664 : * Returns true if we successfully inserted newLeafTuple during this function,
665 : * false if caller still has to do it (meaning another picksplit operation is
666 : * probably needed). Failure could occur if the picksplit result is fairly
667 : * unbalanced, or if newLeafTuple is just plain too big to fit on a page.
668 : * Because we force the picksplit result to be at least two chains, each
669 : * cycle will get rid of at least one leaf tuple from the chain, so the loop
670 : * will eventually terminate if lack of balance is the issue. If the tuple
671 : * is too big, we assume that repeated picksplit operations will eventually
672 : * make it small enough by repeated prefix-stripping. A broken opclass could
673 : * make this an infinite loop, though, so spgdoinsert() checks that the
674 : * leaf datums get smaller each time.
675 : */
676 : static bool
677 5722 : doPickSplit(Relation index, SpGistState *state,
678 : SPPageDesc *current, SPPageDesc *parent,
679 : SpGistLeafTuple newLeafTuple,
680 : int level, bool isNulls, bool isNew)
681 : {
682 5722 : bool insertedNew = false;
683 : spgPickSplitIn in;
684 : spgPickSplitOut out;
685 : FmgrInfo *procinfo;
686 : bool includeNew;
687 : int i,
688 : max,
689 : n;
690 : SpGistInnerTuple innerTuple;
691 : SpGistNodeTuple node,
692 : *nodes;
693 : Buffer newInnerBuffer,
694 : newLeafBuffer;
695 : uint8 *leafPageSelect;
696 : int *leafSizes;
697 : OffsetNumber *toDelete;
698 : OffsetNumber *toInsert;
699 5722 : OffsetNumber redirectTuplePos = InvalidOffsetNumber;
700 : OffsetNumber startOffsets[2];
701 : SpGistLeafTuple *oldLeafs;
702 : SpGistLeafTuple *newLeafs;
703 : Datum leafDatums[INDEX_MAX_KEYS];
704 : bool leafIsnulls[INDEX_MAX_KEYS];
705 : int spaceToDelete;
706 : int currentFreeSpace;
707 : int totalLeafSizes;
708 : bool allTheSame;
709 : spgxlogPickSplit xlrec;
710 : char *leafdata,
711 : *leafptr;
712 : SPPageDesc saveCurrent;
713 : int nToDelete,
714 : nToInsert,
715 : maxToInclude;
716 :
717 5722 : in.level = level;
718 :
719 : /*
720 : * Allocate per-leaf-tuple work arrays with max possible size
721 : */
722 5722 : max = PageGetMaxOffsetNumber(current->page);
723 5722 : n = max + 1;
724 5722 : in.datums = (Datum *) palloc(sizeof(Datum) * n);
725 5722 : toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
726 5722 : toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
727 5722 : oldLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
728 5722 : newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
729 5722 : leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
730 :
731 5722 : STORE_STATE(state, xlrec.stateSrc);
732 :
733 : /*
734 : * Form list of leaf tuples which will be distributed as split result;
735 : * also, count up the amount of space that will be freed from current.
736 : * (Note that in the non-root case, we won't actually delete the old
737 : * tuples, only replace them with redirects or placeholders.)
738 : */
739 5722 : nToInsert = 0;
740 5722 : nToDelete = 0;
741 5722 : spaceToDelete = 0;
742 5722 : if (SpGistBlockIsRoot(current->blkno))
743 : {
744 : /*
745 : * We are splitting the root (which up to now is also a leaf page).
746 : * Its tuples are not linked, so scan sequentially to get them all. We
747 : * ignore the original value of current->offnum.
748 : */
749 15418 : for (i = FirstOffsetNumber; i <= max; i++)
750 : {
751 : SpGistLeafTuple it;
752 :
753 15336 : it = (SpGistLeafTuple) PageGetItem(current->page,
754 : PageGetItemId(current->page, i));
755 15336 : if (it->tupstate == SPGIST_LIVE)
756 : {
757 15336 : in.datums[nToInsert] =
758 15336 : isNulls ? (Datum) 0 : SGLTDATUM(it, state);
759 15336 : oldLeafs[nToInsert] = it;
760 15336 : nToInsert++;
761 15336 : toDelete[nToDelete] = i;
762 15336 : nToDelete++;
763 : /* we will delete the tuple altogether, so count full space */
764 15336 : spaceToDelete += it->size + sizeof(ItemIdData);
765 : }
766 : else /* tuples on root should be live */
767 0 : elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
768 : }
769 : }
770 : else
771 : {
772 : /* Normal case, just collect the leaf tuples in the chain */
773 5640 : i = current->offnum;
774 715424 : while (i != InvalidOffsetNumber)
775 : {
776 : SpGistLeafTuple it;
777 :
778 : Assert(i >= FirstOffsetNumber && i <= max);
779 709784 : it = (SpGistLeafTuple) PageGetItem(current->page,
780 : PageGetItemId(current->page, i));
781 709784 : if (it->tupstate == SPGIST_LIVE)
782 : {
783 709784 : in.datums[nToInsert] =
784 709784 : isNulls ? (Datum) 0 : SGLTDATUM(it, state);
785 709784 : oldLeafs[nToInsert] = it;
786 709784 : nToInsert++;
787 709784 : toDelete[nToDelete] = i;
788 709784 : nToDelete++;
789 : /* we will not delete the tuple, only replace with dead */
790 : Assert(it->size >= SGDTSIZE);
791 709784 : spaceToDelete += it->size - SGDTSIZE;
792 : }
793 0 : else if (it->tupstate == SPGIST_DEAD)
794 : {
795 : /* We could see a DEAD tuple as first/only chain item */
796 : Assert(i == current->offnum);
797 : Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
798 0 : toDelete[nToDelete] = i;
799 0 : nToDelete++;
800 : /* replacing it with redirect will save no space */
801 : }
802 : else
803 0 : elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
804 :
805 709784 : i = SGLT_GET_NEXTOFFSET(it);
806 : }
807 : }
808 5722 : in.nTuples = nToInsert;
809 :
810 : /*
811 : * We may not actually insert new tuple because another picksplit may be
812 : * necessary due to too large value, but we will try to allocate enough
813 : * space to include it; and in any case it has to be included in the input
814 : * for the picksplit function. So don't increment nToInsert yet.
815 : */
816 5722 : in.datums[in.nTuples] =
817 5722 : isNulls ? (Datum) 0 : SGLTDATUM(newLeafTuple, state);
818 5722 : oldLeafs[in.nTuples] = newLeafTuple;
819 5722 : in.nTuples++;
820 :
821 5722 : memset(&out, 0, sizeof(out));
822 :
823 5722 : if (!isNulls)
824 : {
825 : /*
826 : * Perform split using user-defined method.
827 : */
828 5722 : procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
829 5722 : FunctionCall2Coll(procinfo,
830 5722 : index->rd_indcollation[0],
831 : PointerGetDatum(&in),
832 : PointerGetDatum(&out));
833 :
834 : /*
835 : * Form new leaf tuples and count up the total space needed.
836 : */
837 5722 : totalLeafSizes = 0;
838 736564 : for (i = 0; i < in.nTuples; i++)
839 : {
840 730842 : if (state->leafTupDesc->natts > 1)
841 62408 : spgDeformLeafTuple(oldLeafs[i],
842 : state->leafTupDesc,
843 : leafDatums,
844 : leafIsnulls,
845 : isNulls);
846 :
847 730842 : leafDatums[spgKeyColumn] = out.leafTupleDatums[i];
848 730842 : leafIsnulls[spgKeyColumn] = false;
849 :
850 730842 : newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
851 : leafDatums,
852 : leafIsnulls);
853 730842 : totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
854 : }
855 : }
856 : else
857 : {
858 : /*
859 : * Perform dummy split that puts all tuples into one node.
860 : * checkAllTheSame will override this and force allTheSame mode.
861 : */
862 0 : out.hasPrefix = false;
863 0 : out.nNodes = 1;
864 0 : out.nodeLabels = NULL;
865 0 : out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
866 :
867 : /*
868 : * Form new leaf tuples and count up the total space needed.
869 : */
870 0 : totalLeafSizes = 0;
871 0 : for (i = 0; i < in.nTuples; i++)
872 : {
873 0 : if (state->leafTupDesc->natts > 1)
874 0 : spgDeformLeafTuple(oldLeafs[i],
875 : state->leafTupDesc,
876 : leafDatums,
877 : leafIsnulls,
878 : isNulls);
879 :
880 : /*
881 : * Nulls tree can contain only null key values.
882 : */
883 0 : leafDatums[spgKeyColumn] = (Datum) 0;
884 0 : leafIsnulls[spgKeyColumn] = true;
885 :
886 0 : newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
887 : leafDatums,
888 : leafIsnulls);
889 0 : totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
890 : }
891 : }
892 :
893 : /*
894 : * Check to see if the picksplit function failed to separate the values,
895 : * ie, it put them all into the same child node. If so, select allTheSame
896 : * mode and create a random split instead. See comments for
897 : * checkAllTheSame as to why we need to know if the new leaf tuples could
898 : * fit on one page.
899 : */
900 5722 : allTheSame = checkAllTheSame(&in, &out,
901 : totalLeafSizes > SPGIST_PAGE_CAPACITY,
902 : &includeNew);
903 :
904 : /*
905 : * If checkAllTheSame decided we must exclude the new tuple, don't
906 : * consider it any further.
907 : */
908 5722 : if (includeNew)
909 5722 : maxToInclude = in.nTuples;
910 : else
911 : {
912 0 : maxToInclude = in.nTuples - 1;
913 0 : totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
914 : }
915 :
916 : /*
917 : * Allocate per-node work arrays. Since checkAllTheSame could replace
918 : * out.nNodes with a value larger than the number of tuples on the input
919 : * page, we can't allocate these arrays before here.
920 : */
921 5722 : nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * out.nNodes);
922 5722 : leafSizes = (int *) palloc0(sizeof(int) * out.nNodes);
923 :
924 : /*
925 : * Form nodes of inner tuple and inner tuple itself
926 : */
927 44292 : for (i = 0; i < out.nNodes; i++)
928 : {
929 38570 : Datum label = (Datum) 0;
930 38570 : bool labelisnull = (out.nodeLabels == NULL);
931 :
932 38570 : if (!labelisnull)
933 3554 : label = out.nodeLabels[i];
934 38570 : nodes[i] = spgFormNodeTuple(state, label, labelisnull);
935 : }
936 5722 : innerTuple = spgFormInnerTuple(state,
937 5722 : out.hasPrefix, out.prefixDatum,
938 : out.nNodes, nodes);
939 5722 : innerTuple->allTheSame = allTheSame;
940 :
941 : /*
942 : * Update nodes[] array to point into the newly formed innerTuple, so that
943 : * we can adjust their downlinks below.
944 : */
945 44292 : SGITITERATE(innerTuple, i, node)
946 : {
947 38570 : nodes[i] = node;
948 : }
949 :
950 : /*
951 : * Re-scan new leaf tuples and count up the space needed under each node.
952 : */
953 736564 : for (i = 0; i < maxToInclude; i++)
954 : {
955 730842 : n = out.mapTuplesToNodes[i];
956 730842 : if (n < 0 || n >= out.nNodes)
957 0 : elog(ERROR, "inconsistent result of SPGiST picksplit function");
958 730842 : leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
959 : }
960 :
961 : /*
962 : * To perform the split, we must insert a new inner tuple, which can't go
963 : * on a leaf page; and unless we are splitting the root page, we must then
964 : * update the parent tuple's downlink to point to the inner tuple. If
965 : * there is room, we'll put the new inner tuple on the same page as the
966 : * parent tuple, otherwise we need another non-leaf buffer. But if the
967 : * parent page is the root, we can't add the new inner tuple there,
968 : * because the root page must have only one inner tuple.
969 : */
970 5722 : xlrec.initInner = false;
971 5722 : if (parent->buffer != InvalidBuffer &&
972 5640 : !SpGistBlockIsRoot(parent->blkno) &&
973 5276 : (SpGistPageGetFreeSpace(parent->page, 1) >=
974 5276 : innerTuple->size + sizeof(ItemIdData)))
975 : {
976 : /* New inner tuple will fit on parent page */
977 4868 : newInnerBuffer = parent->buffer;
978 : }
979 854 : else if (parent->buffer != InvalidBuffer)
980 : {
981 : /* Send tuple to page with next triple parity (see README) */
982 1544 : newInnerBuffer = SpGistGetBuffer(index,
983 772 : GBUF_INNER_PARITY(parent->blkno + 1) |
984 772 : (isNulls ? GBUF_NULLS : 0),
985 772 : innerTuple->size + sizeof(ItemIdData),
986 : &xlrec.initInner);
987 : }
988 : else
989 : {
990 : /* Root page split ... inner tuple will go to root page */
991 82 : newInnerBuffer = InvalidBuffer;
992 : }
993 :
994 : /*
995 : * The new leaf tuples converted from the existing ones should require the
996 : * same or less space, and therefore should all fit onto one page
997 : * (although that's not necessarily the current page, since we can't
998 : * delete the old tuples but only replace them with placeholders).
999 : * However, the incoming new tuple might not also fit, in which case we
1000 : * might need another picksplit cycle to reduce it some more.
1001 : *
1002 : * If there's not room to put everything back onto the current page, then
1003 : * we decide on a per-node basis which tuples go to the new page. (We do
1004 : * it like that because leaf tuple chains can't cross pages, so we must
1005 : * place all leaf tuples belonging to the same parent node on the same
1006 : * page.)
1007 : *
1008 : * If we are splitting the root page (turning it from a leaf page into an
1009 : * inner page), then no leaf tuples can go back to the current page; they
1010 : * must all go somewhere else.
1011 : */
1012 5722 : if (!SpGistBlockIsRoot(current->blkno))
1013 5640 : currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
1014 : else
1015 82 : currentFreeSpace = 0; /* prevent assigning any tuples to current */
1016 :
1017 5722 : xlrec.initDest = false;
1018 :
1019 5722 : if (totalLeafSizes <= currentFreeSpace)
1020 : {
1021 : /* All the leaf tuples will fit on current page */
1022 24 : newLeafBuffer = InvalidBuffer;
1023 : /* mark new leaf tuple as included in insertions, if allowed */
1024 24 : if (includeNew)
1025 : {
1026 24 : nToInsert++;
1027 24 : insertedNew = true;
1028 : }
1029 2970 : for (i = 0; i < nToInsert; i++)
1030 2946 : leafPageSelect[i] = 0; /* signifies current page */
1031 : }
1032 5698 : else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
1033 : {
1034 : /*
1035 : * We're trying to split up a long value by repeated suffixing, but
1036 : * it's not going to fit yet. Don't bother allocating a second leaf
1037 : * buffer that we won't be able to use.
1038 : */
1039 44 : newLeafBuffer = InvalidBuffer;
1040 : Assert(includeNew);
1041 44 : Assert(nToInsert == 0);
1042 : }
1043 : else
1044 : {
1045 : /* We will need another leaf page */
1046 : uint8 *nodePageSelect;
1047 : int curspace;
1048 : int newspace;
1049 :
1050 5654 : newLeafBuffer = SpGistGetBuffer(index,
1051 : GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
1052 5654 : Min(totalLeafSizes,
1053 : SPGIST_PAGE_CAPACITY),
1054 : &xlrec.initDest);
1055 :
1056 : /*
1057 : * Attempt to assign node groups to the two pages. We might fail to
1058 : * do so, even if totalLeafSizes is less than the available space,
1059 : * because we can't split a group across pages.
1060 : */
1061 5654 : nodePageSelect = (uint8 *) palloc(sizeof(uint8) * out.nNodes);
1062 :
1063 5654 : curspace = currentFreeSpace;
1064 5654 : newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1065 44132 : for (i = 0; i < out.nNodes; i++)
1066 : {
1067 38478 : if (leafSizes[i] <= curspace)
1068 : {
1069 24944 : nodePageSelect[i] = 0; /* signifies current page */
1070 24944 : curspace -= leafSizes[i];
1071 : }
1072 : else
1073 : {
1074 13534 : nodePageSelect[i] = 1; /* signifies new leaf page */
1075 13534 : newspace -= leafSizes[i];
1076 : }
1077 : }
1078 5654 : if (curspace >= 0 && newspace >= 0)
1079 : {
1080 : /* Successful assignment, so we can include the new leaf tuple */
1081 5404 : if (includeNew)
1082 : {
1083 5404 : nToInsert++;
1084 5404 : insertedNew = true;
1085 : }
1086 : }
1087 250 : else if (includeNew)
1088 : {
1089 : /* We must exclude the new leaf tuple from the split */
1090 250 : int nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
1091 :
1092 250 : leafSizes[nodeOfNewTuple] -=
1093 250 : newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
1094 :
1095 : /* Repeat the node assignment process --- should succeed now */
1096 250 : curspace = currentFreeSpace;
1097 250 : newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1098 1214 : for (i = 0; i < out.nNodes; i++)
1099 : {
1100 964 : if (leafSizes[i] <= curspace)
1101 : {
1102 326 : nodePageSelect[i] = 0; /* signifies current page */
1103 326 : curspace -= leafSizes[i];
1104 : }
1105 : else
1106 : {
1107 638 : nodePageSelect[i] = 1; /* signifies new leaf page */
1108 638 : newspace -= leafSizes[i];
1109 : }
1110 : }
1111 250 : if (curspace < 0 || newspace < 0)
1112 0 : elog(ERROR, "failed to divide leaf tuple groups across pages");
1113 : }
1114 : else
1115 : {
1116 : /* oops, we already excluded new tuple ... should not get here */
1117 0 : elog(ERROR, "failed to divide leaf tuple groups across pages");
1118 : }
1119 : /* Expand the per-node assignments to be shown per leaf tuple */
1120 733256 : for (i = 0; i < nToInsert; i++)
1121 : {
1122 727602 : n = out.mapTuplesToNodes[i];
1123 727602 : leafPageSelect[i] = nodePageSelect[n];
1124 : }
1125 : }
1126 :
1127 : /* Start preparing WAL record */
1128 5722 : xlrec.nDelete = 0;
1129 5722 : xlrec.initSrc = isNew;
1130 5722 : xlrec.storesNulls = isNulls;
1131 5722 : xlrec.isRootSplit = SpGistBlockIsRoot(current->blkno);
1132 :
1133 5722 : leafdata = leafptr = (char *) palloc(totalLeafSizes);
1134 :
1135 : /* Here we begin making the changes to the target pages */
1136 5722 : START_CRIT_SECTION();
1137 :
1138 : /*
1139 : * Delete old leaf tuples from current buffer, except when we're splitting
1140 : * the root; in that case there's no need because we'll re-init the page
1141 : * below. We do this first to make room for reinserting new leaf tuples.
1142 : */
1143 5722 : if (!SpGistBlockIsRoot(current->blkno))
1144 : {
1145 : /*
1146 : * Init buffer instead of deleting individual tuples, but only if
1147 : * there aren't any other live tuples and only during build; otherwise
1148 : * we need to set a redirection tuple for concurrent scans.
1149 : */
1150 5640 : if (state->isBuild &&
1151 4266 : nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
1152 4266 : PageGetMaxOffsetNumber(current->page))
1153 : {
1154 306 : SpGistInitBuffer(current->buffer,
1155 : SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
1156 306 : xlrec.initSrc = true;
1157 : }
1158 5334 : else if (isNew)
1159 : {
1160 : /* don't expose the freshly init'd buffer as a backup block */
1161 : Assert(nToDelete == 0);
1162 : }
1163 : else
1164 : {
1165 5290 : xlrec.nDelete = nToDelete;
1166 :
1167 5290 : if (!state->isBuild)
1168 : {
1169 : /*
1170 : * Need to create redirect tuple (it will point to new inner
1171 : * tuple) but right now the new tuple's location is not known
1172 : * yet. So, set the redirection pointer to "impossible" value
1173 : * and remember its position to update tuple later.
1174 : */
1175 1330 : if (nToDelete > 0)
1176 1330 : redirectTuplePos = toDelete[0];
1177 1330 : spgPageIndexMultiDelete(state, current->page,
1178 : toDelete, nToDelete,
1179 : SPGIST_REDIRECT,
1180 : SPGIST_PLACEHOLDER,
1181 : SPGIST_METAPAGE_BLKNO,
1182 : FirstOffsetNumber);
1183 : }
1184 : else
1185 : {
1186 : /*
1187 : * During index build there is not concurrent searches, so we
1188 : * don't need to create redirection tuple.
1189 : */
1190 3960 : spgPageIndexMultiDelete(state, current->page,
1191 : toDelete, nToDelete,
1192 : SPGIST_PLACEHOLDER,
1193 : SPGIST_PLACEHOLDER,
1194 : InvalidBlockNumber,
1195 : InvalidOffsetNumber);
1196 : }
1197 : }
1198 : }
1199 :
1200 : /*
1201 : * Put leaf tuples on proper pages, and update downlinks in innerTuple's
1202 : * nodes.
1203 : */
1204 5722 : startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
1205 736270 : for (i = 0; i < nToInsert; i++)
1206 : {
1207 730548 : SpGistLeafTuple it = newLeafs[i];
1208 : Buffer leafBuffer;
1209 : BlockNumber leafBlock;
1210 : OffsetNumber newoffset;
1211 :
1212 : /* Which page is it going to? */
1213 730548 : leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
1214 730548 : leafBlock = BufferGetBlockNumber(leafBuffer);
1215 :
1216 : /* Link tuple into correct chain for its node */
1217 730548 : n = out.mapTuplesToNodes[i];
1218 :
1219 730548 : if (ItemPointerIsValid(&nodes[n]->t_tid))
1220 : {
1221 : Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
1222 708650 : SGLT_SET_NEXTOFFSET(it, ItemPointerGetOffsetNumber(&nodes[n]->t_tid));
1223 : }
1224 : else
1225 21898 : SGLT_SET_NEXTOFFSET(it, InvalidOffsetNumber);
1226 :
1227 : /* Insert it on page */
1228 730548 : newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
1229 730548 : (Item) it, it->size,
1230 730548 : &startOffsets[leafPageSelect[i]],
1231 : false);
1232 730548 : toInsert[i] = newoffset;
1233 :
1234 : /* ... and complete the chain linking */
1235 730548 : ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);
1236 :
1237 : /* Also copy leaf tuple into WAL data */
1238 730548 : memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
1239 730548 : leafptr += newLeafs[i]->size;
1240 : }
1241 :
1242 : /*
1243 : * We're done modifying the other leaf buffer (if any), so mark it dirty.
1244 : * current->buffer will be marked below, after we're entirely done
1245 : * modifying it.
1246 : */
1247 5722 : if (newLeafBuffer != InvalidBuffer)
1248 : {
1249 5654 : MarkBufferDirty(newLeafBuffer);
1250 : }
1251 :
1252 : /* Remember current buffer, since we're about to change "current" */
1253 5722 : saveCurrent = *current;
1254 :
1255 : /*
1256 : * Store the new innerTuple
1257 : */
1258 5722 : if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
1259 : {
1260 : /*
1261 : * new inner tuple goes to parent page
1262 : */
1263 : Assert(current->buffer != parent->buffer);
1264 :
1265 : /* Repoint "current" at the new inner tuple */
1266 4868 : current->blkno = parent->blkno;
1267 4868 : current->buffer = parent->buffer;
1268 4868 : current->page = parent->page;
1269 4868 : xlrec.offnumInner = current->offnum =
1270 4868 : SpGistPageAddNewItem(state, current->page,
1271 4868 : (Item) innerTuple, innerTuple->size,
1272 : NULL, false);
1273 :
1274 : /*
1275 : * Update parent node link and mark parent page dirty
1276 : */
1277 4868 : xlrec.innerIsParent = true;
1278 4868 : xlrec.offnumParent = parent->offnum;
1279 4868 : xlrec.nodeI = parent->node;
1280 4868 : saveNodeLink(index, parent, current->blkno, current->offnum);
1281 :
1282 : /*
1283 : * Update redirection link (in old current buffer)
1284 : */
1285 4868 : if (redirectTuplePos != InvalidOffsetNumber)
1286 1202 : setRedirectionTuple(&saveCurrent, redirectTuplePos,
1287 1202 : current->blkno, current->offnum);
1288 :
1289 : /* Done modifying old current buffer, mark it dirty */
1290 4868 : MarkBufferDirty(saveCurrent.buffer);
1291 : }
1292 854 : else if (parent->buffer != InvalidBuffer)
1293 : {
1294 : /*
1295 : * new inner tuple will be stored on a new page
1296 : */
1297 : Assert(newInnerBuffer != InvalidBuffer);
1298 :
1299 : /* Repoint "current" at the new inner tuple */
1300 772 : current->buffer = newInnerBuffer;
1301 772 : current->blkno = BufferGetBlockNumber(current->buffer);
1302 772 : current->page = BufferGetPage(current->buffer);
1303 772 : xlrec.offnumInner = current->offnum =
1304 772 : SpGistPageAddNewItem(state, current->page,
1305 772 : (Item) innerTuple, innerTuple->size,
1306 : NULL, false);
1307 :
1308 : /* Done modifying new current buffer, mark it dirty */
1309 772 : MarkBufferDirty(current->buffer);
1310 :
1311 : /*
1312 : * Update parent node link and mark parent page dirty
1313 : */
1314 772 : xlrec.innerIsParent = (parent->buffer == current->buffer);
1315 772 : xlrec.offnumParent = parent->offnum;
1316 772 : xlrec.nodeI = parent->node;
1317 772 : saveNodeLink(index, parent, current->blkno, current->offnum);
1318 :
1319 : /*
1320 : * Update redirection link (in old current buffer)
1321 : */
1322 772 : if (redirectTuplePos != InvalidOffsetNumber)
1323 128 : setRedirectionTuple(&saveCurrent, redirectTuplePos,
1324 128 : current->blkno, current->offnum);
1325 :
1326 : /* Done modifying old current buffer, mark it dirty */
1327 772 : MarkBufferDirty(saveCurrent.buffer);
1328 : }
1329 : else
1330 : {
1331 : /*
1332 : * Splitting root page, which was a leaf but now becomes inner page
1333 : * (and so "current" continues to point at it)
1334 : */
1335 : Assert(SpGistBlockIsRoot(current->blkno));
1336 : Assert(redirectTuplePos == InvalidOffsetNumber);
1337 :
1338 82 : SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
1339 82 : xlrec.initInner = true;
1340 82 : xlrec.innerIsParent = false;
1341 :
1342 82 : xlrec.offnumInner = current->offnum =
1343 82 : PageAddItem(current->page, (Item) innerTuple, innerTuple->size,
1344 : InvalidOffsetNumber, false, false);
1345 82 : if (current->offnum != FirstOffsetNumber)
1346 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
1347 : innerTuple->size);
1348 :
1349 : /* No parent link to update, nor redirection to do */
1350 82 : xlrec.offnumParent = InvalidOffsetNumber;
1351 82 : xlrec.nodeI = 0;
1352 :
1353 : /* Done modifying new current buffer, mark it dirty */
1354 82 : MarkBufferDirty(current->buffer);
1355 :
1356 : /* saveCurrent doesn't represent a different buffer */
1357 82 : saveCurrent.buffer = InvalidBuffer;
1358 : }
1359 :
1360 5722 : if (RelationNeedsWAL(index) && !state->isBuild)
1361 : {
1362 : XLogRecPtr recptr;
1363 : int flags;
1364 :
1365 1394 : XLogBeginInsert();
1366 :
1367 1394 : xlrec.nInsert = nToInsert;
1368 1394 : XLogRegisterData((char *) &xlrec, SizeOfSpgxlogPickSplit);
1369 :
1370 1394 : XLogRegisterData((char *) toDelete,
1371 1394 : sizeof(OffsetNumber) * xlrec.nDelete);
1372 1394 : XLogRegisterData((char *) toInsert,
1373 1394 : sizeof(OffsetNumber) * xlrec.nInsert);
1374 1394 : XLogRegisterData((char *) leafPageSelect,
1375 1394 : sizeof(uint8) * xlrec.nInsert);
1376 1394 : XLogRegisterData((char *) innerTuple, innerTuple->size);
1377 1394 : XLogRegisterData(leafdata, leafptr - leafdata);
1378 :
1379 : /* Old leaf page */
1380 1394 : if (BufferIsValid(saveCurrent.buffer))
1381 : {
1382 1374 : flags = REGBUF_STANDARD;
1383 1374 : if (xlrec.initSrc)
1384 44 : flags |= REGBUF_WILL_INIT;
1385 1374 : XLogRegisterBuffer(0, saveCurrent.buffer, flags);
1386 : }
1387 :
1388 : /* New leaf page */
1389 1394 : if (BufferIsValid(newLeafBuffer))
1390 : {
1391 1350 : flags = REGBUF_STANDARD;
1392 1350 : if (xlrec.initDest)
1393 1264 : flags |= REGBUF_WILL_INIT;
1394 1350 : XLogRegisterBuffer(1, newLeafBuffer, flags);
1395 : }
1396 :
1397 : /* Inner page */
1398 1394 : flags = REGBUF_STANDARD;
1399 1394 : if (xlrec.initInner)
1400 40 : flags |= REGBUF_WILL_INIT;
1401 1394 : XLogRegisterBuffer(2, current->buffer, flags);
1402 :
1403 : /* Parent page, if different from inner page */
1404 1394 : if (parent->buffer != InvalidBuffer)
1405 : {
1406 1374 : if (parent->buffer != current->buffer)
1407 128 : XLogRegisterBuffer(3, parent->buffer, REGBUF_STANDARD);
1408 : else
1409 : Assert(xlrec.innerIsParent);
1410 : }
1411 :
1412 : /* Issue the WAL record */
1413 1394 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT);
1414 :
1415 : /* Update page LSNs on all affected pages */
1416 1394 : if (newLeafBuffer != InvalidBuffer)
1417 : {
1418 1350 : Page page = BufferGetPage(newLeafBuffer);
1419 :
1420 1350 : PageSetLSN(page, recptr);
1421 : }
1422 :
1423 1394 : if (saveCurrent.buffer != InvalidBuffer)
1424 : {
1425 1374 : Page page = BufferGetPage(saveCurrent.buffer);
1426 :
1427 1374 : PageSetLSN(page, recptr);
1428 : }
1429 :
1430 1394 : PageSetLSN(current->page, recptr);
1431 :
1432 1394 : if (parent->buffer != InvalidBuffer)
1433 : {
1434 1374 : PageSetLSN(parent->page, recptr);
1435 : }
1436 : }
1437 :
1438 5722 : END_CRIT_SECTION();
1439 :
1440 : /* Update local free-space cache and unlock buffers */
1441 5722 : if (newLeafBuffer != InvalidBuffer)
1442 : {
1443 5654 : SpGistSetLastUsedPage(index, newLeafBuffer);
1444 5654 : UnlockReleaseBuffer(newLeafBuffer);
1445 : }
1446 5722 : if (saveCurrent.buffer != InvalidBuffer)
1447 : {
1448 5640 : SpGistSetLastUsedPage(index, saveCurrent.buffer);
1449 5640 : UnlockReleaseBuffer(saveCurrent.buffer);
1450 : }
1451 :
1452 5722 : return insertedNew;
1453 : }
1454 :
1455 : /*
1456 : * spgMatchNode action: descend to N'th child node of current inner tuple
1457 : */
1458 : static void
1459 18668072 : spgMatchNodeAction(Relation index, SpGistState *state,
1460 : SpGistInnerTuple innerTuple,
1461 : SPPageDesc *current, SPPageDesc *parent, int nodeN)
1462 : {
1463 : int i;
1464 : SpGistNodeTuple node;
1465 :
1466 : /* Release previous parent buffer if any */
1467 18668072 : if (parent->buffer != InvalidBuffer &&
1468 17879272 : parent->buffer != current->buffer)
1469 : {
1470 784702 : SpGistSetLastUsedPage(index, parent->buffer);
1471 784702 : UnlockReleaseBuffer(parent->buffer);
1472 : }
1473 :
1474 : /* Repoint parent to specified node of current inner tuple */
1475 18668072 : parent->blkno = current->blkno;
1476 18668072 : parent->buffer = current->buffer;
1477 18668072 : parent->page = current->page;
1478 18668072 : parent->offnum = current->offnum;
1479 18668072 : parent->node = nodeN;
1480 :
1481 : /* Locate that node */
1482 31271200 : SGITITERATE(innerTuple, i, node)
1483 : {
1484 31271200 : if (i == nodeN)
1485 18668072 : break;
1486 : }
1487 :
1488 18668072 : if (i != nodeN)
1489 0 : elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
1490 : nodeN);
1491 :
1492 : /* Point current to the downlink location, if any */
1493 18668072 : if (ItemPointerIsValid(&node->t_tid))
1494 : {
1495 18665972 : current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
1496 18665972 : current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
1497 : }
1498 : else
1499 : {
1500 : /* Downlink is empty, so we'll need to find a new page */
1501 2100 : current->blkno = InvalidBlockNumber;
1502 2100 : current->offnum = InvalidOffsetNumber;
1503 : }
1504 :
1505 18668072 : current->buffer = InvalidBuffer;
1506 18668072 : current->page = NULL;
1507 18668072 : }
1508 :
1509 : /*
1510 : * spgAddNode action: add a node to the inner tuple at current
1511 : */
1512 : static void
1513 1598 : spgAddNodeAction(Relation index, SpGistState *state,
1514 : SpGistInnerTuple innerTuple,
1515 : SPPageDesc *current, SPPageDesc *parent,
1516 : int nodeN, Datum nodeLabel)
1517 : {
1518 : SpGistInnerTuple newInnerTuple;
1519 : spgxlogAddNode xlrec;
1520 :
1521 : /* Should not be applied to nulls */
1522 : Assert(!SpGistPageStoresNulls(current->page));
1523 :
1524 : /* Construct new inner tuple with additional node */
1525 1598 : newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
1526 :
1527 : /* Prepare WAL record */
1528 1598 : STORE_STATE(state, xlrec.stateSrc);
1529 1598 : xlrec.offnum = current->offnum;
1530 :
1531 : /* we don't fill these unless we need to change the parent downlink */
1532 1598 : xlrec.parentBlk = -1;
1533 1598 : xlrec.offnumParent = InvalidOffsetNumber;
1534 1598 : xlrec.nodeI = 0;
1535 :
1536 : /* we don't fill these unless tuple has to be moved */
1537 1598 : xlrec.offnumNew = InvalidOffsetNumber;
1538 1598 : xlrec.newPage = false;
1539 :
1540 1598 : if (PageGetExactFreeSpace(current->page) >=
1541 1598 : newInnerTuple->size - innerTuple->size)
1542 : {
1543 : /*
1544 : * We can replace the inner tuple by new version in-place
1545 : */
1546 1592 : START_CRIT_SECTION();
1547 :
1548 1592 : PageIndexTupleDelete(current->page, current->offnum);
1549 1592 : if (PageAddItem(current->page,
1550 : (Item) newInnerTuple, newInnerTuple->size,
1551 1592 : current->offnum, false, false) != current->offnum)
1552 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
1553 : newInnerTuple->size);
1554 :
1555 1592 : MarkBufferDirty(current->buffer);
1556 :
1557 1592 : if (RelationNeedsWAL(index) && !state->isBuild)
1558 : {
1559 : XLogRecPtr recptr;
1560 :
1561 710 : XLogBeginInsert();
1562 710 : XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1563 710 : XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
1564 :
1565 710 : XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
1566 :
1567 710 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1568 :
1569 710 : PageSetLSN(current->page, recptr);
1570 : }
1571 :
1572 1592 : END_CRIT_SECTION();
1573 : }
1574 : else
1575 : {
1576 : /*
1577 : * move inner tuple to another page, and update parent
1578 : */
1579 : SpGistDeadTuple dt;
1580 : SPPageDesc saveCurrent;
1581 :
1582 : /*
1583 : * It should not be possible to get here for the root page, since we
1584 : * allow only one inner tuple on the root page, and spgFormInnerTuple
1585 : * always checks that inner tuples don't exceed the size of a page.
1586 : */
1587 6 : if (SpGistBlockIsRoot(current->blkno))
1588 0 : elog(ERROR, "cannot enlarge root tuple any more");
1589 : Assert(parent->buffer != InvalidBuffer);
1590 :
1591 6 : saveCurrent = *current;
1592 :
1593 6 : xlrec.offnumParent = parent->offnum;
1594 6 : xlrec.nodeI = parent->node;
1595 :
1596 : /*
1597 : * obtain new buffer with the same parity as current, since it will be
1598 : * a child of same parent tuple
1599 : */
1600 12 : current->buffer = SpGistGetBuffer(index,
1601 6 : GBUF_INNER_PARITY(current->blkno),
1602 6 : newInnerTuple->size + sizeof(ItemIdData),
1603 : &xlrec.newPage);
1604 6 : current->blkno = BufferGetBlockNumber(current->buffer);
1605 6 : current->page = BufferGetPage(current->buffer);
1606 :
1607 : /*
1608 : * Let's just make real sure new current isn't same as old. Right now
1609 : * that's impossible, but if SpGistGetBuffer ever got smart enough to
1610 : * delete placeholder tuples before checking space, maybe it wouldn't
1611 : * be impossible. The case would appear to work except that WAL
1612 : * replay would be subtly wrong, so I think a mere assert isn't enough
1613 : * here.
1614 : */
1615 6 : if (current->blkno == saveCurrent.blkno)
1616 0 : elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
1617 :
1618 : /*
1619 : * New current and parent buffer will both be modified; but note that
1620 : * parent buffer could be same as either new or old current.
1621 : */
1622 6 : if (parent->buffer == saveCurrent.buffer)
1623 0 : xlrec.parentBlk = 0;
1624 6 : else if (parent->buffer == current->buffer)
1625 0 : xlrec.parentBlk = 1;
1626 : else
1627 6 : xlrec.parentBlk = 2;
1628 :
1629 6 : START_CRIT_SECTION();
1630 :
1631 : /* insert new ... */
1632 6 : xlrec.offnumNew = current->offnum =
1633 6 : SpGistPageAddNewItem(state, current->page,
1634 6 : (Item) newInnerTuple, newInnerTuple->size,
1635 : NULL, false);
1636 :
1637 6 : MarkBufferDirty(current->buffer);
1638 :
1639 : /* update parent's downlink and mark parent page dirty */
1640 6 : saveNodeLink(index, parent, current->blkno, current->offnum);
1641 :
1642 : /*
1643 : * Replace old tuple with a placeholder or redirection tuple. Unless
1644 : * doing an index build, we have to insert a redirection tuple for
1645 : * possible concurrent scans. We can't just delete it in any case,
1646 : * because that could change the offsets of other tuples on the page,
1647 : * breaking downlinks from their parents.
1648 : */
1649 6 : if (state->isBuild)
1650 0 : dt = spgFormDeadTuple(state, SPGIST_PLACEHOLDER,
1651 : InvalidBlockNumber, InvalidOffsetNumber);
1652 : else
1653 6 : dt = spgFormDeadTuple(state, SPGIST_REDIRECT,
1654 6 : current->blkno, current->offnum);
1655 :
1656 6 : PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
1657 6 : if (PageAddItem(saveCurrent.page, (Item) dt, dt->size,
1658 : saveCurrent.offnum,
1659 6 : false, false) != saveCurrent.offnum)
1660 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
1661 : dt->size);
1662 :
1663 6 : if (state->isBuild)
1664 0 : SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
1665 : else
1666 6 : SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;
1667 :
1668 6 : MarkBufferDirty(saveCurrent.buffer);
1669 :
1670 6 : if (RelationNeedsWAL(index) && !state->isBuild)
1671 : {
1672 : XLogRecPtr recptr;
1673 : int flags;
1674 :
1675 6 : XLogBeginInsert();
1676 :
1677 : /* orig page */
1678 6 : XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD);
1679 : /* new page */
1680 6 : flags = REGBUF_STANDARD;
1681 6 : if (xlrec.newPage)
1682 6 : flags |= REGBUF_WILL_INIT;
1683 6 : XLogRegisterBuffer(1, current->buffer, flags);
1684 : /* parent page (if different from orig and new) */
1685 6 : if (xlrec.parentBlk == 2)
1686 6 : XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
1687 :
1688 6 : XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1689 6 : XLogRegisterData((char *) newInnerTuple, newInnerTuple->size);
1690 :
1691 6 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1692 :
1693 : /* we don't bother to check if any of these are redundant */
1694 6 : PageSetLSN(current->page, recptr);
1695 6 : PageSetLSN(parent->page, recptr);
1696 6 : PageSetLSN(saveCurrent.page, recptr);
1697 : }
1698 :
1699 6 : END_CRIT_SECTION();
1700 :
1701 : /* Release saveCurrent if it's not same as current or parent */
1702 6 : if (saveCurrent.buffer != current->buffer &&
1703 6 : saveCurrent.buffer != parent->buffer)
1704 : {
1705 6 : SpGistSetLastUsedPage(index, saveCurrent.buffer);
1706 6 : UnlockReleaseBuffer(saveCurrent.buffer);
1707 : }
1708 : }
1709 1598 : }
1710 :
1711 : /*
1712 : * spgSplitNode action: split inner tuple at current into prefix and postfix
1713 : */
1714 : static void
1715 646 : spgSplitNodeAction(Relation index, SpGistState *state,
1716 : SpGistInnerTuple innerTuple,
1717 : SPPageDesc *current, spgChooseOut *out)
1718 : {
1719 : SpGistInnerTuple prefixTuple,
1720 : postfixTuple;
1721 : SpGistNodeTuple node,
1722 : *nodes;
1723 : BlockNumber postfixBlkno;
1724 : OffsetNumber postfixOffset;
1725 : int i;
1726 : spgxlogSplitTuple xlrec;
1727 646 : Buffer newBuffer = InvalidBuffer;
1728 :
1729 : /* Should not be applied to nulls */
1730 : Assert(!SpGistPageStoresNulls(current->page));
1731 :
1732 : /* Check opclass gave us sane values */
1733 646 : if (out->result.splitTuple.prefixNNodes <= 0 ||
1734 646 : out->result.splitTuple.prefixNNodes > SGITMAXNNODES)
1735 0 : elog(ERROR, "invalid number of prefix nodes: %d",
1736 : out->result.splitTuple.prefixNNodes);
1737 646 : if (out->result.splitTuple.childNodeN < 0 ||
1738 646 : out->result.splitTuple.childNodeN >=
1739 646 : out->result.splitTuple.prefixNNodes)
1740 0 : elog(ERROR, "invalid child node number: %d",
1741 : out->result.splitTuple.childNodeN);
1742 :
1743 : /*
1744 : * Construct new prefix tuple with requested number of nodes. We'll fill
1745 : * in the childNodeN'th node's downlink below.
1746 : */
1747 646 : nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) *
1748 646 : out->result.splitTuple.prefixNNodes);
1749 :
1750 1292 : for (i = 0; i < out->result.splitTuple.prefixNNodes; i++)
1751 : {
1752 646 : Datum label = (Datum) 0;
1753 : bool labelisnull;
1754 :
1755 646 : labelisnull = (out->result.splitTuple.prefixNodeLabels == NULL);
1756 646 : if (!labelisnull)
1757 646 : label = out->result.splitTuple.prefixNodeLabels[i];
1758 646 : nodes[i] = spgFormNodeTuple(state, label, labelisnull);
1759 : }
1760 :
1761 646 : prefixTuple = spgFormInnerTuple(state,
1762 646 : out->result.splitTuple.prefixHasPrefix,
1763 : out->result.splitTuple.prefixPrefixDatum,
1764 : out->result.splitTuple.prefixNNodes,
1765 : nodes);
1766 :
1767 : /* it must fit in the space that innerTuple now occupies */
1768 646 : if (prefixTuple->size > innerTuple->size)
1769 0 : elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");
1770 :
1771 : /*
1772 : * Construct new postfix tuple, containing all nodes of innerTuple with
1773 : * same node datums, but with the prefix specified by the picksplit
1774 : * function.
1775 : */
1776 646 : nodes = palloc(sizeof(SpGistNodeTuple) * innerTuple->nNodes);
1777 2286 : SGITITERATE(innerTuple, i, node)
1778 : {
1779 1640 : nodes[i] = node;
1780 : }
1781 :
1782 646 : postfixTuple = spgFormInnerTuple(state,
1783 646 : out->result.splitTuple.postfixHasPrefix,
1784 : out->result.splitTuple.postfixPrefixDatum,
1785 646 : innerTuple->nNodes, nodes);
1786 :
1787 : /* Postfix tuple is allTheSame if original tuple was */
1788 646 : postfixTuple->allTheSame = innerTuple->allTheSame;
1789 :
1790 : /* prep data for WAL record */
1791 646 : xlrec.newPage = false;
1792 :
1793 : /*
1794 : * If we can't fit both tuples on the current page, get a new page for the
1795 : * postfix tuple. In particular, can't split to the root page.
1796 : *
1797 : * For the space calculation, note that prefixTuple replaces innerTuple
1798 : * but postfixTuple will be a new entry.
1799 : */
1800 646 : if (SpGistBlockIsRoot(current->blkno) ||
1801 634 : SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
1802 634 : prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
1803 : {
1804 : /*
1805 : * Choose page with next triple parity, because postfix tuple is a
1806 : * child of prefix one
1807 : */
1808 168 : newBuffer = SpGistGetBuffer(index,
1809 168 : GBUF_INNER_PARITY(current->blkno + 1),
1810 168 : postfixTuple->size + sizeof(ItemIdData),
1811 : &xlrec.newPage);
1812 : }
1813 :
1814 646 : START_CRIT_SECTION();
1815 :
1816 : /*
1817 : * Replace old tuple by prefix tuple
1818 : */
1819 646 : PageIndexTupleDelete(current->page, current->offnum);
1820 646 : xlrec.offnumPrefix = PageAddItem(current->page,
1821 : (Item) prefixTuple, prefixTuple->size,
1822 : current->offnum, false, false);
1823 646 : if (xlrec.offnumPrefix != current->offnum)
1824 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
1825 : prefixTuple->size);
1826 :
1827 : /*
1828 : * put postfix tuple into appropriate page
1829 : */
1830 646 : if (newBuffer == InvalidBuffer)
1831 : {
1832 478 : postfixBlkno = current->blkno;
1833 478 : xlrec.offnumPostfix = postfixOffset =
1834 478 : SpGistPageAddNewItem(state, current->page,
1835 478 : (Item) postfixTuple, postfixTuple->size,
1836 : NULL, false);
1837 478 : xlrec.postfixBlkSame = true;
1838 : }
1839 : else
1840 : {
1841 168 : postfixBlkno = BufferGetBlockNumber(newBuffer);
1842 168 : xlrec.offnumPostfix = postfixOffset =
1843 168 : SpGistPageAddNewItem(state, BufferGetPage(newBuffer),
1844 168 : (Item) postfixTuple, postfixTuple->size,
1845 : NULL, false);
1846 168 : MarkBufferDirty(newBuffer);
1847 168 : xlrec.postfixBlkSame = false;
1848 : }
1849 :
1850 : /*
1851 : * And set downlink pointer in the prefix tuple to point to postfix tuple.
1852 : * (We can't avoid this step by doing the above two steps in opposite
1853 : * order, because there might not be enough space on the page to insert
1854 : * the postfix tuple first.) We have to update the local copy of the
1855 : * prefixTuple too, because that's what will be written to WAL.
1856 : */
1857 646 : spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1858 : postfixBlkno, postfixOffset);
1859 646 : prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
1860 646 : PageGetItemId(current->page, current->offnum));
1861 646 : spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1862 : postfixBlkno, postfixOffset);
1863 :
1864 646 : MarkBufferDirty(current->buffer);
1865 :
1866 646 : if (RelationNeedsWAL(index) && !state->isBuild)
1867 : {
1868 : XLogRecPtr recptr;
1869 :
1870 612 : XLogBeginInsert();
1871 612 : XLogRegisterData((char *) &xlrec, sizeof(xlrec));
1872 612 : XLogRegisterData((char *) prefixTuple, prefixTuple->size);
1873 612 : XLogRegisterData((char *) postfixTuple, postfixTuple->size);
1874 :
1875 612 : XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
1876 612 : if (newBuffer != InvalidBuffer)
1877 : {
1878 : int flags;
1879 :
1880 162 : flags = REGBUF_STANDARD;
1881 162 : if (xlrec.newPage)
1882 6 : flags |= REGBUF_WILL_INIT;
1883 162 : XLogRegisterBuffer(1, newBuffer, flags);
1884 : }
1885 :
1886 612 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE);
1887 :
1888 612 : PageSetLSN(current->page, recptr);
1889 :
1890 612 : if (newBuffer != InvalidBuffer)
1891 : {
1892 162 : PageSetLSN(BufferGetPage(newBuffer), recptr);
1893 : }
1894 : }
1895 :
1896 646 : END_CRIT_SECTION();
1897 :
1898 : /* Update local free-space cache and release buffer */
1899 646 : if (newBuffer != InvalidBuffer)
1900 : {
1901 168 : SpGistSetLastUsedPage(index, newBuffer);
1902 168 : UnlockReleaseBuffer(newBuffer);
1903 : }
1904 646 : }
1905 :
1906 : /*
1907 : * Insert one item into the index.
1908 : *
1909 : * Returns true on success, false if we failed to complete the insertion
1910 : * (typically because of conflict with a concurrent insert). In the latter
1911 : * case, caller should re-call spgdoinsert() with the same args.
1912 : */
1913 : bool
1914 805778 : spgdoinsert(Relation index, SpGistState *state,
1915 : ItemPointer heapPtr, Datum *datums, bool *isnulls)
1916 : {
1917 805778 : bool result = true;
1918 805778 : TupleDesc leafDescriptor = state->leafTupDesc;
1919 805778 : bool isnull = isnulls[spgKeyColumn];
1920 805778 : int level = 0;
1921 : Datum leafDatums[INDEX_MAX_KEYS];
1922 : int leafSize;
1923 : int bestLeafSize;
1924 805778 : int numNoProgressCycles = 0;
1925 : SPPageDesc current,
1926 : parent;
1927 805778 : FmgrInfo *procinfo = NULL;
1928 :
1929 : /*
1930 : * Look up FmgrInfo of the user-defined choose function once, to save
1931 : * cycles in the loop below.
1932 : */
1933 805778 : if (!isnull)
1934 805688 : procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
1935 :
1936 : /*
1937 : * Prepare the leaf datum to insert.
1938 : *
1939 : * If an optional "compress" method is provided, then call it to form the
1940 : * leaf key datum from the input datum. Otherwise, store the input datum
1941 : * as is. Since we don't use index_form_tuple in this AM, we have to make
1942 : * sure value to be inserted is not toasted; FormIndexDatum doesn't
1943 : * guarantee that. But we assume the "compress" method to return an
1944 : * untoasted value.
1945 : */
1946 805778 : if (!isnull)
1947 : {
1948 805688 : if (OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC)))
1949 : {
1950 79544 : FmgrInfo *compressProcinfo = NULL;
1951 :
1952 79544 : compressProcinfo = index_getprocinfo(index, 1, SPGIST_COMPRESS_PROC);
1953 79544 : leafDatums[spgKeyColumn] =
1954 79544 : FunctionCall1Coll(compressProcinfo,
1955 79544 : index->rd_indcollation[spgKeyColumn],
1956 : datums[spgKeyColumn]);
1957 : }
1958 : else
1959 : {
1960 : Assert(state->attLeafType.type == state->attType.type);
1961 :
1962 726144 : if (state->attType.attlen == -1)
1963 179472 : leafDatums[spgKeyColumn] =
1964 179472 : PointerGetDatum(PG_DETOAST_DATUM(datums[spgKeyColumn]));
1965 : else
1966 546672 : leafDatums[spgKeyColumn] = datums[spgKeyColumn];
1967 : }
1968 : }
1969 : else
1970 90 : leafDatums[spgKeyColumn] = (Datum) 0;
1971 :
1972 : /* Likewise, ensure that any INCLUDE values are not toasted */
1973 898884 : for (int i = spgFirstIncludeColumn; i < leafDescriptor->natts; i++)
1974 : {
1975 93106 : if (!isnulls[i])
1976 : {
1977 86080 : if (TupleDescCompactAttr(leafDescriptor, i)->attlen == -1)
1978 13544 : leafDatums[i] = PointerGetDatum(PG_DETOAST_DATUM(datums[i]));
1979 : else
1980 72536 : leafDatums[i] = datums[i];
1981 : }
1982 : else
1983 7026 : leafDatums[i] = (Datum) 0;
1984 : }
1985 :
1986 : /*
1987 : * Compute space needed for a leaf tuple containing the given data.
1988 : */
1989 805778 : leafSize = SpGistGetLeafTupleSize(leafDescriptor, leafDatums, isnulls);
1990 : /* Account for an item pointer, too */
1991 805778 : leafSize += sizeof(ItemIdData);
1992 :
1993 : /*
1994 : * If it isn't gonna fit, and the opclass can't reduce the datum size by
1995 : * suffixing, bail out now rather than doing a lot of useless work.
1996 : */
1997 805778 : if (leafSize > SPGIST_PAGE_CAPACITY &&
1998 4 : (isnull || !state->config.longValuesOK))
1999 0 : ereport(ERROR,
2000 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2001 : errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
2002 : leafSize - sizeof(ItemIdData),
2003 : SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
2004 : RelationGetRelationName(index)),
2005 : errhint("Values larger than a buffer page cannot be indexed.")));
2006 805778 : bestLeafSize = leafSize;
2007 :
2008 : /* Initialize "current" to the appropriate root page */
2009 805778 : current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
2010 805778 : current.buffer = InvalidBuffer;
2011 805778 : current.page = NULL;
2012 805778 : current.offnum = FirstOffsetNumber;
2013 805778 : current.node = -1;
2014 :
2015 : /* "parent" is invalid for the moment */
2016 805778 : parent.blkno = InvalidBlockNumber;
2017 805778 : parent.buffer = InvalidBuffer;
2018 805778 : parent.page = NULL;
2019 805778 : parent.offnum = InvalidOffsetNumber;
2020 805778 : parent.node = -1;
2021 :
2022 : /*
2023 : * Before entering the loop, try to clear any pending interrupt condition.
2024 : * If a query cancel is pending, we might as well accept it now not later;
2025 : * while if a non-canceling condition is pending, servicing it here avoids
2026 : * having to restart the insertion and redo all the work so far.
2027 : */
2028 805778 : CHECK_FOR_INTERRUPTS();
2029 :
2030 : for (;;)
2031 18668068 : {
2032 19473846 : bool isNew = false;
2033 :
2034 : /*
2035 : * Bail out if query cancel is pending. We must have this somewhere
2036 : * in the loop since a broken opclass could produce an infinite
2037 : * picksplit loop. However, because we'll be holding buffer lock(s)
2038 : * after the first iteration, ProcessInterrupts() wouldn't be able to
2039 : * throw a cancel error here. Hence, if we see that an interrupt is
2040 : * pending, break out of the loop and deal with the situation below.
2041 : * Set result = false because we must restart the insertion if the
2042 : * interrupt isn't a query-cancel-or-die case.
2043 : */
2044 19473846 : if (INTERRUPTS_PENDING_CONDITION())
2045 : {
2046 0 : result = false;
2047 0 : break;
2048 : }
2049 :
2050 19473846 : if (current.blkno == InvalidBlockNumber)
2051 : {
2052 : /*
2053 : * Create a leaf page. If leafSize is too large to fit on a page,
2054 : * we won't actually use the page yet, but it simplifies the API
2055 : * for doPickSplit to always have a leaf page at hand; so just
2056 : * quietly limit our request to a page size.
2057 : */
2058 2096 : current.buffer =
2059 2096 : SpGistGetBuffer(index,
2060 : GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
2061 2096 : Min(leafSize, SPGIST_PAGE_CAPACITY),
2062 : &isNew);
2063 2096 : current.blkno = BufferGetBlockNumber(current.buffer);
2064 : }
2065 19471750 : else if (parent.buffer == InvalidBuffer)
2066 : {
2067 : /* we hold no parent-page lock, so no deadlock is possible */
2068 805778 : current.buffer = ReadBuffer(index, current.blkno);
2069 805778 : LockBuffer(current.buffer, BUFFER_LOCK_EXCLUSIVE);
2070 : }
2071 18665972 : else if (current.blkno != parent.blkno)
2072 : {
2073 : /* descend to a new child page */
2074 1571614 : current.buffer = ReadBuffer(index, current.blkno);
2075 :
2076 : /*
2077 : * Attempt to acquire lock on child page. We must beware of
2078 : * deadlock against another insertion process descending from that
2079 : * page to our parent page (see README). If we fail to get lock,
2080 : * abandon the insertion and tell our caller to start over.
2081 : *
2082 : * XXX this could be improved, because failing to get lock on a
2083 : * buffer is not proof of a deadlock situation; the lock might be
2084 : * held by a reader, or even just background writer/checkpointer
2085 : * process. Perhaps it'd be worth retrying after sleeping a bit?
2086 : */
2087 1571614 : if (!ConditionalLockBuffer(current.buffer))
2088 : {
2089 132 : ReleaseBuffer(current.buffer);
2090 132 : UnlockReleaseBuffer(parent.buffer);
2091 132 : return false;
2092 : }
2093 : }
2094 : else
2095 : {
2096 : /* inner tuple can be stored on the same page as parent one */
2097 17094358 : current.buffer = parent.buffer;
2098 : }
2099 19473714 : current.page = BufferGetPage(current.buffer);
2100 :
2101 : /* should not arrive at a page of the wrong type */
2102 38947338 : if (isnull ? !SpGistPageStoresNulls(current.page) :
2103 19473624 : SpGistPageStoresNulls(current.page))
2104 0 : elog(ERROR, "SPGiST index page %u has wrong nulls flag",
2105 : current.blkno);
2106 :
2107 19473714 : if (SpGistPageIsLeaf(current.page))
2108 : {
2109 : SpGistLeafTuple leafTuple;
2110 : int nToSplit,
2111 : sizeToSplit;
2112 :
2113 805936 : leafTuple = spgFormLeafTuple(state, heapPtr, leafDatums, isnulls);
2114 1611872 : if (leafTuple->size + sizeof(ItemIdData) <=
2115 805936 : SpGistPageGetFreeSpace(current.page, 1))
2116 : {
2117 : /* it fits on page, so insert it and we're done */
2118 797976 : addLeafTuple(index, state, leafTuple,
2119 : ¤t, &parent, isnull, isNew);
2120 805642 : break;
2121 : }
2122 7960 : else if ((sizeToSplit =
2123 7960 : checkSplitConditions(index, state, ¤t,
2124 3664 : &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
2125 3664 : nToSplit < 64 &&
2126 2286 : leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
2127 : {
2128 : /*
2129 : * the amount of data is pretty small, so just move the whole
2130 : * chain to another leaf page rather than splitting it.
2131 : */
2132 : Assert(!isNew);
2133 2238 : moveLeafs(index, state, ¤t, &parent, leafTuple, isnull);
2134 2238 : break; /* we're done */
2135 : }
2136 : else
2137 : {
2138 : /* picksplit */
2139 5722 : if (doPickSplit(index, state, ¤t, &parent,
2140 : leafTuple, level, isnull, isNew))
2141 5428 : break; /* doPickSplit installed new tuples */
2142 :
2143 : /* leaf tuple will not be inserted yet */
2144 294 : pfree(leafTuple);
2145 :
2146 : /*
2147 : * current now describes new inner tuple, go insert into it
2148 : */
2149 : Assert(!SpGistPageIsLeaf(current.page));
2150 294 : goto process_inner_tuple;
2151 : }
2152 : }
2153 : else /* non-leaf page */
2154 : {
2155 : /*
2156 : * Apply the opclass choose function to figure out how to insert
2157 : * the given datum into the current inner tuple.
2158 : */
2159 : SpGistInnerTuple innerTuple;
2160 : spgChooseIn in;
2161 : spgChooseOut out;
2162 :
2163 : /*
2164 : * spgAddNode and spgSplitTuple cases will loop back to here to
2165 : * complete the insertion operation. Just in case the choose
2166 : * function is broken and produces add or split requests
2167 : * repeatedly, check for query cancel (see comments above).
2168 : */
2169 18668072 : process_inner_tuple:
2170 18670316 : if (INTERRUPTS_PENDING_CONDITION())
2171 : {
2172 0 : result = false;
2173 0 : break;
2174 : }
2175 :
2176 18670316 : innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
2177 18670316 : PageGetItemId(current.page, current.offnum));
2178 :
2179 18670316 : in.datum = datums[spgKeyColumn];
2180 18670316 : in.leafDatum = leafDatums[spgKeyColumn];
2181 18670316 : in.level = level;
2182 18670316 : in.allTheSame = innerTuple->allTheSame;
2183 18670316 : in.hasPrefix = (innerTuple->prefixSize > 0);
2184 18670316 : in.prefixDatum = SGITDATUM(innerTuple, state);
2185 18670316 : in.nNodes = innerTuple->nNodes;
2186 18670316 : in.nodeLabels = spgExtractNodeLabels(state, innerTuple);
2187 :
2188 18670316 : memset(&out, 0, sizeof(out));
2189 :
2190 18670316 : if (!isnull)
2191 : {
2192 : /* use user-defined choose method */
2193 18670316 : FunctionCall2Coll(procinfo,
2194 18670316 : index->rd_indcollation[0],
2195 : PointerGetDatum(&in),
2196 : PointerGetDatum(&out));
2197 : }
2198 : else
2199 : {
2200 : /* force "match" action (to insert to random subnode) */
2201 0 : out.resultType = spgMatchNode;
2202 : }
2203 :
2204 18670316 : if (innerTuple->allTheSame)
2205 : {
2206 : /*
2207 : * It's not allowed to do an AddNode at an allTheSame tuple.
2208 : * Opclass must say "match", in which case we choose a random
2209 : * one of the nodes to descend into, or "split".
2210 : */
2211 143204 : if (out.resultType == spgAddNode)
2212 0 : elog(ERROR, "cannot add a node to an allTheSame inner tuple");
2213 143204 : else if (out.resultType == spgMatchNode)
2214 143192 : out.result.matchNode.nodeN =
2215 143192 : pg_prng_uint64_range(&pg_global_prng_state,
2216 143192 : 0, innerTuple->nNodes - 1);
2217 : }
2218 :
2219 18670316 : switch (out.resultType)
2220 : {
2221 18668072 : case spgMatchNode:
2222 : /* Descend to N'th child node */
2223 18668072 : spgMatchNodeAction(index, state, innerTuple,
2224 : ¤t, &parent,
2225 : out.result.matchNode.nodeN);
2226 : /* Adjust level as per opclass request */
2227 18668072 : level += out.result.matchNode.levelAdd;
2228 : /* Replace leafDatum and recompute leafSize */
2229 18668072 : if (!isnull)
2230 : {
2231 18668072 : leafDatums[spgKeyColumn] = out.result.matchNode.restDatum;
2232 18668072 : leafSize = SpGistGetLeafTupleSize(leafDescriptor,
2233 : leafDatums, isnulls);
2234 18668072 : leafSize += sizeof(ItemIdData);
2235 : }
2236 :
2237 : /*
2238 : * Check new tuple size; fail if it can't fit, unless the
2239 : * opclass says it can handle the situation by suffixing.
2240 : *
2241 : * However, the opclass can only shorten the leaf datum,
2242 : * which may not be enough to ever make the tuple fit,
2243 : * since INCLUDE columns might alone use more than a page.
2244 : * Depending on the opclass' behavior, that could lead to
2245 : * an infinite loop --- spgtextproc.c, for example, will
2246 : * just repeatedly generate an empty-string leaf datum
2247 : * once it runs out of data. Actual bugs in opclasses
2248 : * might cause infinite looping, too. To detect such a
2249 : * loop, check to see if we are making progress by
2250 : * reducing the leafSize in each pass. This is a bit
2251 : * tricky though. Because of alignment considerations,
2252 : * the total tuple size might not decrease on every pass.
2253 : * Also, there are edge cases where the choose method
2254 : * might seem to not make progress for a cycle or two.
2255 : * Somewhat arbitrarily, we allow up to 10 no-progress
2256 : * iterations before failing. (This limit should be more
2257 : * than MAXALIGN, to accommodate opclasses that trim one
2258 : * byte from the leaf datum per pass.)
2259 : */
2260 18668072 : if (leafSize > SPGIST_PAGE_CAPACITY)
2261 : {
2262 52 : bool ok = false;
2263 :
2264 52 : if (state->config.longValuesOK && !isnull)
2265 : {
2266 52 : if (leafSize < bestLeafSize)
2267 : {
2268 4 : ok = true;
2269 4 : bestLeafSize = leafSize;
2270 4 : numNoProgressCycles = 0;
2271 : }
2272 48 : else if (++numNoProgressCycles < 10)
2273 44 : ok = true;
2274 : }
2275 52 : if (!ok)
2276 4 : ereport(ERROR,
2277 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2278 : errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
2279 : leafSize - sizeof(ItemIdData),
2280 : SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
2281 : RelationGetRelationName(index)),
2282 : errhint("Values larger than a buffer page cannot be indexed.")));
2283 : }
2284 :
2285 : /*
2286 : * Loop around and attempt to insert the new leafDatum at
2287 : * "current" (which might reference an existing child
2288 : * tuple, or might be invalid to force us to find a new
2289 : * page for the tuple).
2290 : */
2291 18668068 : break;
2292 1598 : case spgAddNode:
2293 : /* AddNode is not sensible if nodes don't have labels */
2294 1598 : if (in.nodeLabels == NULL)
2295 0 : elog(ERROR, "cannot add a node to an inner tuple without node labels");
2296 : /* Add node to inner tuple, per request */
2297 1598 : spgAddNodeAction(index, state, innerTuple,
2298 : ¤t, &parent,
2299 : out.result.addNode.nodeN,
2300 : out.result.addNode.nodeLabel);
2301 :
2302 : /*
2303 : * Retry insertion into the enlarged node. We assume that
2304 : * we'll get a MatchNode result this time.
2305 : */
2306 1598 : goto process_inner_tuple;
2307 : break;
2308 646 : case spgSplitTuple:
2309 : /* Split inner tuple, per request */
2310 646 : spgSplitNodeAction(index, state, innerTuple,
2311 : ¤t, &out);
2312 :
2313 : /* Retry insertion into the split node */
2314 646 : goto process_inner_tuple;
2315 : break;
2316 0 : default:
2317 0 : elog(ERROR, "unrecognized SPGiST choose result: %d",
2318 : (int) out.resultType);
2319 : break;
2320 : }
2321 : }
2322 : } /* end loop */
2323 :
2324 : /*
2325 : * Release any buffers we're still holding. Beware of possibility that
2326 : * current and parent reference same buffer.
2327 : */
2328 805642 : if (current.buffer != InvalidBuffer)
2329 : {
2330 805642 : SpGistSetLastUsedPage(index, current.buffer);
2331 805642 : UnlockReleaseBuffer(current.buffer);
2332 : }
2333 805642 : if (parent.buffer != InvalidBuffer &&
2334 788664 : parent.buffer != current.buffer)
2335 : {
2336 784008 : SpGistSetLastUsedPage(index, parent.buffer);
2337 784008 : UnlockReleaseBuffer(parent.buffer);
2338 : }
2339 :
2340 : /*
2341 : * We do not support being called while some outer function is holding a
2342 : * buffer lock (or any other reason to postpone query cancels). If that
2343 : * were the case, telling the caller to retry would create an infinite
2344 : * loop.
2345 : */
2346 : Assert(INTERRUPTS_CAN_BE_PROCESSED());
2347 :
2348 : /*
2349 : * Finally, check for interrupts again. If there was a query cancel,
2350 : * ProcessInterrupts() will be able to throw the error here. If it was
2351 : * some other kind of interrupt that can just be cleared, return false to
2352 : * tell our caller to retry.
2353 : */
2354 805642 : CHECK_FOR_INTERRUPTS();
2355 :
2356 805642 : return result;
2357 : }
|