Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * spgdoinsert.c
4 : * implementation of insert algorithm
5 : *
6 : *
7 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994, Regents of the University of California
9 : *
10 : * IDENTIFICATION
11 : * src/backend/access/spgist/spgdoinsert.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : #include "postgres.h"
17 :
18 : #include "access/genam.h"
19 : #include "access/spgist_private.h"
20 : #include "access/spgxlog.h"
21 : #include "access/xloginsert.h"
22 : #include "common/int.h"
23 : #include "common/pg_prng.h"
24 : #include "miscadmin.h"
25 : #include "storage/bufmgr.h"
26 : #include "utils/rel.h"
27 :
28 :
29 : /*
30 : * SPPageDesc tracks all info about a page we are inserting into. In some
31 : * situations it actually identifies a tuple, or even a specific node within
32 : * an inner tuple. But any of the fields can be invalid. If the buffer
33 : * field is valid, it implies we hold pin and exclusive lock on that buffer.
34 : * page pointer should be valid exactly when buffer is.
35 : */
36 : typedef struct SPPageDesc
37 : {
38 : BlockNumber blkno; /* block number, or InvalidBlockNumber */
39 : Buffer buffer; /* page's buffer number, or InvalidBuffer */
40 : Page page; /* pointer to page buffer, or NULL */
41 : OffsetNumber offnum; /* offset of tuple, or InvalidOffsetNumber */
42 : int node; /* node number within inner tuple, or -1 */
43 : } SPPageDesc;
44 :
45 :
46 : /*
47 : * Set the item pointer in the nodeN'th entry in inner tuple tup. This
48 : * is used to update the parent inner tuple's downlink after a move or
49 : * split operation.
50 : */
51 : void
52 11828 : spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN,
53 : BlockNumber blkno, OffsetNumber offset)
54 : {
55 : int i;
56 : SpGistNodeTuple node;
57 :
58 61008 : SGITITERATE(tup, i, node)
59 : {
60 61008 : if (i == nodeN)
61 : {
62 11828 : ItemPointerSet(&node->t_tid, blkno, offset);
63 11828 : return;
64 : }
65 : }
66 :
67 0 : elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
68 : nodeN);
69 : }
70 :
71 : /*
72 : * Form a new inner tuple containing one more node than the given one, with
73 : * the specified label datum, inserted at offset "offset" in the node array.
74 : * The new tuple's prefix is the same as the old one's.
75 : *
76 : * Note that the new node initially has an invalid downlink. We'll find a
77 : * page to point it to later.
78 : */
79 : static SpGistInnerTuple
80 1582 : addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
81 : {
82 : SpGistNodeTuple node,
83 : *nodes;
84 : int i;
85 :
86 : /* if offset is negative, insert at end */
87 1582 : if (offset < 0)
88 0 : offset = tuple->nNodes;
89 1582 : else if (offset > tuple->nNodes)
90 0 : elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");
91 :
92 1582 : nodes = palloc(sizeof(SpGistNodeTuple) * (tuple->nNodes + 1));
93 10214 : SGITITERATE(tuple, i, node)
94 : {
95 8632 : if (i < offset)
96 7426 : nodes[i] = node;
97 : else
98 1206 : nodes[i + 1] = node;
99 : }
100 :
101 1582 : nodes[offset] = spgFormNodeTuple(state, label, false);
102 :
103 1582 : return spgFormInnerTuple(state,
104 1582 : (tuple->prefixSize > 0),
105 752 : SGITDATUM(tuple, state),
106 1582 : tuple->nNodes + 1,
107 : nodes);
108 : }
109 :
110 : /* qsort comparator for sorting OffsetNumbers */
111 : static int
112 5350480 : cmpOffsetNumbers(const void *a, const void *b)
113 : {
114 5350480 : return pg_cmp_u16(*(const OffsetNumber *) a, *(const OffsetNumber *) b);
115 : }
116 :
117 : /*
118 : * Delete multiple tuples from an index page, preserving tuple offset numbers.
119 : *
120 : * The first tuple in the given list is replaced with a dead tuple of type
121 : * "firststate" (REDIRECT/DEAD/PLACEHOLDER); the remaining tuples are replaced
122 : * with dead tuples of type "reststate". If either firststate or reststate
123 : * is REDIRECT, blkno/offnum specify where to link to.
124 : *
125 : * NB: this is used during WAL replay, so beware of trying to make it too
126 : * smart. In particular, it shouldn't use "state" except for calling
127 : * spgFormDeadTuple(). This is also used in a critical section, so no
128 : * pallocs either!
129 : */
130 : void
131 8402 : spgPageIndexMultiDelete(SpGistState *state, Page page,
132 : OffsetNumber *itemnos, int nitems,
133 : int firststate, int reststate,
134 : BlockNumber blkno, OffsetNumber offnum)
135 : {
136 : OffsetNumber firstItem;
137 : OffsetNumber sortednos[MaxIndexTuplesPerPage];
138 8402 : SpGistDeadTuple tuple = NULL;
139 : int i;
140 :
141 8402 : if (nitems == 0)
142 174 : return; /* nothing to do */
143 :
144 : /*
145 : * For efficiency we want to use PageIndexMultiDelete, which requires the
146 : * targets to be listed in sorted order, so we have to sort the itemnos
147 : * array. (This also greatly simplifies the math for reinserting the
148 : * replacement tuples.) However, we must not scribble on the caller's
149 : * array, so we have to make a copy.
150 : */
151 8228 : memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems);
152 8228 : if (nitems > 1)
153 8096 : qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);
154 :
155 8228 : PageIndexMultiDelete(page, sortednos, nitems);
156 :
157 8228 : firstItem = itemnos[0];
158 :
159 786060 : for (i = 0; i < nitems; i++)
160 : {
161 777832 : OffsetNumber itemno = sortednos[i];
162 : int tupstate;
163 :
164 777832 : tupstate = (itemno == firstItem) ? firststate : reststate;
165 777832 : if (tuple == NULL || tuple->tupstate != tupstate)
166 12808 : tuple = spgFormDeadTuple(state, tupstate, blkno, offnum);
167 :
168 777832 : if (PageAddItem(page, tuple, tuple->size, itemno, false, false) != itemno)
169 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
170 : tuple->size);
171 :
172 777832 : if (tupstate == SPGIST_REDIRECT)
173 2358 : SpGistPageGetOpaque(page)->nRedirection++;
174 775474 : else if (tupstate == SPGIST_PLACEHOLDER)
175 775474 : SpGistPageGetOpaque(page)->nPlaceholder++;
176 : }
177 : }
178 :
179 : /*
180 : * Update the parent inner tuple's downlink, and mark the parent buffer
181 : * dirty (this must be the last change to the parent page in the current
182 : * WAL action).
183 : */
184 : static void
185 9782 : saveNodeLink(Relation index, SPPageDesc *parent,
186 : BlockNumber blkno, OffsetNumber offnum)
187 : {
188 : SpGistInnerTuple innerTuple;
189 :
190 9782 : innerTuple = (SpGistInnerTuple) PageGetItem(parent->page,
191 9782 : PageGetItemId(parent->page, parent->offnum));
192 :
193 9782 : spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum);
194 :
195 9782 : MarkBufferDirty(parent->buffer);
196 9782 : }
197 :
198 : /*
199 : * Add a leaf tuple to a leaf page where there is known to be room for it
200 : */
201 : static void
202 768094 : addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
203 : SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
204 : {
205 : spgxlogAddLeaf xlrec;
206 :
207 768094 : xlrec.newPage = isNew;
208 768094 : xlrec.storesNulls = isNulls;
209 :
210 : /* these will be filled below as needed */
211 768094 : xlrec.offnumLeaf = InvalidOffsetNumber;
212 768094 : xlrec.offnumHeadLeaf = InvalidOffsetNumber;
213 768094 : xlrec.offnumParent = InvalidOffsetNumber;
214 768094 : xlrec.nodeI = 0;
215 :
216 768094 : START_CRIT_SECTION();
217 :
218 768094 : if (current->offnum == InvalidOffsetNumber ||
219 766060 : SpGistBlockIsRoot(current->blkno))
220 : {
221 : /* Tuple is not part of a chain */
222 19002 : SGLT_SET_NEXTOFFSET(leafTuple, InvalidOffsetNumber);
223 38004 : current->offnum = SpGistPageAddNewItem(state, current->page,
224 19002 : leafTuple, leafTuple->size,
225 : NULL, false);
226 :
227 19002 : xlrec.offnumLeaf = current->offnum;
228 :
229 : /* Must update parent's downlink if any */
230 19002 : if (parent->buffer != InvalidBuffer)
231 : {
232 2034 : xlrec.offnumParent = parent->offnum;
233 2034 : xlrec.nodeI = parent->node;
234 :
235 2034 : saveNodeLink(index, parent, current->blkno, current->offnum);
236 : }
237 : }
238 : else
239 : {
240 : /*
241 : * Tuple must be inserted into existing chain. We mustn't change the
242 : * chain's head address, but we don't need to chase the entire chain
243 : * to put the tuple at the end; we can insert it second.
244 : *
245 : * Also, it's possible that the "chain" consists only of a DEAD tuple,
246 : * in which case we should replace the DEAD tuple in-place.
247 : */
248 : SpGistLeafTuple head;
249 : OffsetNumber offnum;
250 :
251 749092 : head = (SpGistLeafTuple) PageGetItem(current->page,
252 749092 : PageGetItemId(current->page, current->offnum));
253 749092 : if (head->tupstate == SPGIST_LIVE)
254 : {
255 749092 : SGLT_SET_NEXTOFFSET(leafTuple, SGLT_GET_NEXTOFFSET(head));
256 749092 : offnum = SpGistPageAddNewItem(state, current->page,
257 749092 : leafTuple, leafTuple->size,
258 : NULL, false);
259 :
260 : /*
261 : * re-get head of list because it could have been moved on page,
262 : * and set new second element
263 : */
264 749092 : head = (SpGistLeafTuple) PageGetItem(current->page,
265 749092 : PageGetItemId(current->page, current->offnum));
266 749092 : SGLT_SET_NEXTOFFSET(head, offnum);
267 :
268 749092 : xlrec.offnumLeaf = offnum;
269 749092 : xlrec.offnumHeadLeaf = current->offnum;
270 : }
271 0 : else if (head->tupstate == SPGIST_DEAD)
272 : {
273 0 : SGLT_SET_NEXTOFFSET(leafTuple, InvalidOffsetNumber);
274 0 : PageIndexTupleDelete(current->page, current->offnum);
275 0 : if (PageAddItem(current->page,
276 : leafTuple, leafTuple->size,
277 0 : current->offnum, false, false) != current->offnum)
278 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
279 : leafTuple->size);
280 :
281 : /* WAL replay distinguishes this case by equal offnums */
282 0 : xlrec.offnumLeaf = current->offnum;
283 0 : xlrec.offnumHeadLeaf = current->offnum;
284 : }
285 : else
286 0 : elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate);
287 : }
288 :
289 768094 : MarkBufferDirty(current->buffer);
290 :
291 768094 : if (RelationNeedsWAL(index) && !state->isBuild)
292 : {
293 : XLogRecPtr recptr;
294 : int flags;
295 :
296 240628 : XLogBeginInsert();
297 240628 : XLogRegisterData(&xlrec, sizeof(xlrec));
298 240628 : XLogRegisterData(leafTuple, leafTuple->size);
299 :
300 240628 : flags = REGBUF_STANDARD;
301 240628 : if (xlrec.newPage)
302 4 : flags |= REGBUF_WILL_INIT;
303 240628 : XLogRegisterBuffer(0, current->buffer, flags);
304 240628 : if (xlrec.offnumParent != InvalidOffsetNumber)
305 830 : XLogRegisterBuffer(1, parent->buffer, REGBUF_STANDARD);
306 :
307 240628 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF);
308 :
309 240628 : PageSetLSN(current->page, recptr);
310 :
311 : /* update parent only if we actually changed it */
312 240628 : if (xlrec.offnumParent != InvalidOffsetNumber)
313 : {
314 830 : PageSetLSN(parent->page, recptr);
315 : }
316 : }
317 :
318 768094 : END_CRIT_SECTION();
319 768094 : }
320 :
321 : /*
322 : * Count the number and total size of leaf tuples in the chain starting at
323 : * current->offnum. Return number into *nToSplit and total size as function
324 : * result.
325 : *
326 : * Klugy special case when considering the root page (i.e., root is a leaf
327 : * page, but we're about to split for the first time): return fake large
328 : * values to force spgdoinsert() to take the doPickSplit rather than
329 : * moveLeafs code path. moveLeafs is not prepared to deal with root page.
330 : */
331 : static int
332 7824 : checkSplitConditions(Relation index, SpGistState *state,
333 : SPPageDesc *current, int *nToSplit)
334 : {
335 : int i,
336 7824 : n = 0,
337 7824 : totalSize = 0;
338 :
339 7824 : if (SpGistBlockIsRoot(current->blkno))
340 : {
341 : /* return impossible values to force split */
342 82 : *nToSplit = BLCKSZ;
343 82 : return BLCKSZ;
344 : }
345 :
346 7742 : i = current->offnum;
347 772922 : while (i != InvalidOffsetNumber)
348 : {
349 : SpGistLeafTuple it;
350 :
351 : Assert(i >= FirstOffsetNumber &&
352 : i <= PageGetMaxOffsetNumber(current->page));
353 765180 : it = (SpGistLeafTuple) PageGetItem(current->page,
354 765180 : PageGetItemId(current->page, i));
355 765180 : if (it->tupstate == SPGIST_LIVE)
356 : {
357 765180 : n++;
358 765180 : totalSize += it->size + sizeof(ItemIdData);
359 : }
360 0 : else if (it->tupstate == SPGIST_DEAD)
361 : {
362 : /* We could see a DEAD tuple as first/only chain item */
363 : Assert(i == current->offnum);
364 : Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
365 : /* Don't count it in result, because it won't go to other page */
366 : }
367 : else
368 0 : elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
369 :
370 765180 : i = SGLT_GET_NEXTOFFSET(it);
371 : }
372 :
373 7742 : *nToSplit = n;
374 :
375 7742 : return totalSize;
376 : }
377 :
378 : /*
379 : * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
380 : * but the chain has to be moved because there's not enough room to add
381 : * newLeafTuple to its page. We use this method when the chain contains
382 : * very little data so a split would be inefficient. We are sure we can
383 : * fit the chain plus newLeafTuple on one other page.
384 : */
385 : static void
386 2282 : moveLeafs(Relation index, SpGistState *state,
387 : SPPageDesc *current, SPPageDesc *parent,
388 : SpGistLeafTuple newLeafTuple, bool isNulls)
389 : {
390 : int i,
391 : nDelete,
392 : nInsert,
393 : size;
394 : Buffer nbuf;
395 : Page npage;
396 2282 : OffsetNumber r = InvalidOffsetNumber,
397 2282 : startOffset = InvalidOffsetNumber;
398 2282 : bool replaceDead = false;
399 : OffsetNumber *toDelete;
400 : OffsetNumber *toInsert;
401 : BlockNumber nblkno;
402 : spgxlogMoveLeafs xlrec;
403 : char *leafdata,
404 : *leafptr;
405 :
406 : /* This doesn't work on root page */
407 : Assert(parent->buffer != InvalidBuffer);
408 : Assert(parent->buffer != current->buffer);
409 :
410 : /* Locate the tuples to be moved, and count up the space needed */
411 2282 : i = PageGetMaxOffsetNumber(current->page);
412 2282 : toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * i);
413 2282 : toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (i + 1));
414 :
415 2282 : size = newLeafTuple->size + sizeof(ItemIdData);
416 :
417 2282 : nDelete = 0;
418 2282 : i = current->offnum;
419 92186 : while (i != InvalidOffsetNumber)
420 : {
421 : SpGistLeafTuple it;
422 :
423 : Assert(i >= FirstOffsetNumber &&
424 : i <= PageGetMaxOffsetNumber(current->page));
425 89904 : it = (SpGistLeafTuple) PageGetItem(current->page,
426 89904 : PageGetItemId(current->page, i));
427 :
428 89904 : if (it->tupstate == SPGIST_LIVE)
429 : {
430 89904 : toDelete[nDelete] = i;
431 89904 : size += it->size + sizeof(ItemIdData);
432 89904 : nDelete++;
433 : }
434 0 : else if (it->tupstate == SPGIST_DEAD)
435 : {
436 : /* We could see a DEAD tuple as first/only chain item */
437 : Assert(i == current->offnum);
438 : Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
439 : /* We don't want to move it, so don't count it in size */
440 0 : toDelete[nDelete] = i;
441 0 : nDelete++;
442 0 : replaceDead = true;
443 : }
444 : else
445 0 : elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
446 :
447 89904 : i = SGLT_GET_NEXTOFFSET(it);
448 : }
449 :
450 : /* Find a leaf page that will hold them */
451 2282 : nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
452 : size, &xlrec.newPage);
453 2282 : npage = BufferGetPage(nbuf);
454 2282 : nblkno = BufferGetBlockNumber(nbuf);
455 : Assert(nblkno != current->blkno);
456 :
457 2282 : leafdata = leafptr = palloc(size);
458 :
459 2282 : START_CRIT_SECTION();
460 :
461 : /* copy all the old tuples to new page, unless they're dead */
462 2282 : nInsert = 0;
463 2282 : if (!replaceDead)
464 : {
465 92186 : for (i = 0; i < nDelete; i++)
466 : {
467 : SpGistLeafTuple it;
468 :
469 89904 : it = (SpGistLeafTuple) PageGetItem(current->page,
470 89904 : PageGetItemId(current->page, toDelete[i]));
471 : Assert(it->tupstate == SPGIST_LIVE);
472 :
473 : /*
474 : * Update chain link (notice the chain order gets reversed, but we
475 : * don't care). We're modifying the tuple on the source page
476 : * here, but it's okay since we're about to delete it.
477 : */
478 89904 : SGLT_SET_NEXTOFFSET(it, r);
479 :
480 89904 : r = SpGistPageAddNewItem(state, npage, it, it->size, &startOffset, false);
481 :
482 89904 : toInsert[nInsert] = r;
483 89904 : nInsert++;
484 :
485 : /* save modified tuple into leafdata as well */
486 89904 : memcpy(leafptr, it, it->size);
487 89904 : leafptr += it->size;
488 : }
489 : }
490 :
491 : /* add the new tuple as well */
492 2282 : SGLT_SET_NEXTOFFSET(newLeafTuple, r);
493 2282 : r = SpGistPageAddNewItem(state, npage, newLeafTuple, newLeafTuple->size, &startOffset, false);
494 2282 : toInsert[nInsert] = r;
495 2282 : nInsert++;
496 2282 : memcpy(leafptr, newLeafTuple, newLeafTuple->size);
497 2282 : leafptr += newLeafTuple->size;
498 :
499 : /*
500 : * Now delete the old tuples, leaving a redirection pointer behind for the
501 : * first one, unless we're doing an index build; in which case there can't
502 : * be any concurrent scan so we need not provide a redirect.
503 : */
504 2282 : spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
505 2282 : state->isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
506 : SPGIST_PLACEHOLDER,
507 : nblkno, r);
508 :
509 : /* Update parent's downlink and mark parent page dirty */
510 2282 : saveNodeLink(index, parent, nblkno, r);
511 :
512 : /* Mark the leaf pages too */
513 2282 : MarkBufferDirty(current->buffer);
514 2282 : MarkBufferDirty(nbuf);
515 :
516 2282 : if (RelationNeedsWAL(index) && !state->isBuild)
517 : {
518 : XLogRecPtr recptr;
519 :
520 : /* prepare WAL info */
521 528 : STORE_STATE(state, xlrec.stateSrc);
522 :
523 528 : xlrec.nMoves = nDelete;
524 528 : xlrec.replaceDead = replaceDead;
525 528 : xlrec.storesNulls = isNulls;
526 :
527 528 : xlrec.offnumParent = parent->offnum;
528 528 : xlrec.nodeI = parent->node;
529 :
530 528 : XLogBeginInsert();
531 528 : XLogRegisterData(&xlrec, SizeOfSpgxlogMoveLeafs);
532 528 : XLogRegisterData(toDelete,
533 : sizeof(OffsetNumber) * nDelete);
534 528 : XLogRegisterData(toInsert,
535 : sizeof(OffsetNumber) * nInsert);
536 528 : XLogRegisterData(leafdata, leafptr - leafdata);
537 :
538 528 : XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
539 528 : XLogRegisterBuffer(1, nbuf, REGBUF_STANDARD | (xlrec.newPage ? REGBUF_WILL_INIT : 0));
540 528 : XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
541 :
542 528 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS);
543 :
544 528 : PageSetLSN(current->page, recptr);
545 528 : PageSetLSN(npage, recptr);
546 528 : PageSetLSN(parent->page, recptr);
547 : }
548 :
549 2282 : END_CRIT_SECTION();
550 :
551 : /* Update local free-space cache and release new buffer */
552 2282 : SpGistSetLastUsedPage(index, nbuf);
553 2282 : UnlockReleaseBuffer(nbuf);
554 2282 : }
555 :
556 : /*
557 : * Update previously-created redirection tuple with appropriate destination
558 : *
559 : * We use this when it's not convenient to know the destination first.
560 : * The tuple should have been made with the "impossible" destination of
561 : * the metapage.
562 : */
563 : static void
564 1276 : setRedirectionTuple(SPPageDesc *current, OffsetNumber position,
565 : BlockNumber blkno, OffsetNumber offnum)
566 : {
567 : SpGistDeadTuple dt;
568 :
569 1276 : dt = (SpGistDeadTuple) PageGetItem(current->page,
570 1276 : PageGetItemId(current->page, position));
571 : Assert(dt->tupstate == SPGIST_REDIRECT);
572 : Assert(ItemPointerGetBlockNumber(&dt->pointer) == SPGIST_METAPAGE_BLKNO);
573 1276 : ItemPointerSet(&dt->pointer, blkno, offnum);
574 1276 : }
575 :
576 : /*
577 : * Test to see if the user-defined picksplit function failed to do its job,
578 : * ie, it put all the leaf tuples into the same node.
579 : * If so, randomly divide the tuples into several nodes (all with the same
580 : * label) and return true to select allTheSame mode for this inner tuple.
581 : *
582 : * (This code is also used to forcibly select allTheSame mode for nulls.)
583 : *
584 : * If we know that the leaf tuples wouldn't all fit on one page, then we
585 : * exclude the last tuple (which is the incoming new tuple that forced a split)
586 : * from the check to see if more than one node is used. The reason for this
587 : * is that if the existing tuples are put into only one chain, then even if
588 : * we move them all to an empty page, there would still not be room for the
589 : * new tuple, so we'd get into an infinite loop of picksplit attempts.
590 : * Forcing allTheSame mode dodges this problem by ensuring the old tuples will
591 : * be split across pages. (Exercise for the reader: figure out why this
592 : * fixes the problem even when there is only one old tuple.)
593 : */
594 : static bool
595 5542 : checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig,
596 : bool *includeNew)
597 : {
598 : int theNode;
599 : int limit;
600 : int i;
601 :
602 : /* For the moment, assume we can include the new leaf tuple */
603 5542 : *includeNew = true;
604 :
605 : /* If there's only the new leaf tuple, don't select allTheSame mode */
606 5542 : if (in->nTuples <= 1)
607 44 : return false;
608 :
609 : /* If tuple set doesn't fit on one page, ignore the new tuple in test */
610 5498 : limit = tooBig ? in->nTuples - 1 : in->nTuples;
611 :
612 : /* Check to see if more than one node is populated */
613 5498 : theNode = out->mapTuplesToNodes[0];
614 115928 : for (i = 1; i < limit; i++)
615 : {
616 115624 : if (out->mapTuplesToNodes[i] != theNode)
617 5194 : return false;
618 : }
619 :
620 : /* Nope, so override the picksplit function's decisions */
621 :
622 : /* If the new tuple is in its own node, it can't be included in split */
623 304 : if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode)
624 0 : *includeNew = false;
625 :
626 304 : out->nNodes = 8; /* arbitrary number of child nodes */
627 :
628 : /* Random assignment of tuples to nodes (note we include new tuple) */
629 32034 : for (i = 0; i < in->nTuples; i++)
630 31730 : out->mapTuplesToNodes[i] = i % out->nNodes;
631 :
632 : /* The opclass may not use node labels, but if it does, duplicate 'em */
633 304 : if (out->nodeLabels)
634 : {
635 48 : Datum theLabel = out->nodeLabels[theNode];
636 :
637 48 : out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes);
638 432 : for (i = 0; i < out->nNodes; i++)
639 384 : out->nodeLabels[i] = theLabel;
640 : }
641 :
642 : /* We don't touch the prefix or the leaf tuple datum assignments */
643 :
644 304 : return true;
645 : }
646 :
647 : /*
648 : * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
649 : * but the chain has to be split because there's not enough room to add
650 : * newLeafTuple to its page.
651 : *
652 : * This function splits the leaf tuple set according to picksplit's rules,
653 : * creating one or more new chains that are spread across the current page
654 : * and an additional leaf page (we assume that two leaf pages will be
655 : * sufficient). A new inner tuple is created, and the parent downlink
656 : * pointer is updated to point to that inner tuple instead of the leaf chain.
657 : *
658 : * On exit, current contains the address of the new inner tuple.
659 : *
660 : * Returns true if we successfully inserted newLeafTuple during this function,
661 : * false if caller still has to do it (meaning another picksplit operation is
662 : * probably needed). Failure could occur if the picksplit result is fairly
663 : * unbalanced, or if newLeafTuple is just plain too big to fit on a page.
664 : * Because we force the picksplit result to be at least two chains, each
665 : * cycle will get rid of at least one leaf tuple from the chain, so the loop
666 : * will eventually terminate if lack of balance is the issue. If the tuple
667 : * is too big, we assume that repeated picksplit operations will eventually
668 : * make it small enough by repeated prefix-stripping. A broken opclass could
669 : * make this an infinite loop, though, so spgdoinsert() checks that the
670 : * leaf datums get smaller each time.
671 : */
672 : static bool
673 5542 : doPickSplit(Relation index, SpGistState *state,
674 : SPPageDesc *current, SPPageDesc *parent,
675 : SpGistLeafTuple newLeafTuple,
676 : int level, bool isNulls, bool isNew)
677 : {
678 5542 : bool insertedNew = false;
679 : spgPickSplitIn in;
680 : spgPickSplitOut out;
681 : FmgrInfo *procinfo;
682 : bool includeNew;
683 : int i,
684 : max,
685 : n;
686 : SpGistInnerTuple innerTuple;
687 : SpGistNodeTuple node,
688 : *nodes;
689 : Buffer newInnerBuffer,
690 : newLeafBuffer;
691 : uint8 *leafPageSelect;
692 : int *leafSizes;
693 : OffsetNumber *toDelete;
694 : OffsetNumber *toInsert;
695 5542 : OffsetNumber redirectTuplePos = InvalidOffsetNumber;
696 : OffsetNumber startOffsets[2];
697 : SpGistLeafTuple *oldLeafs;
698 : SpGistLeafTuple *newLeafs;
699 : Datum leafDatums[INDEX_MAX_KEYS];
700 : bool leafIsnulls[INDEX_MAX_KEYS];
701 : int spaceToDelete;
702 : int currentFreeSpace;
703 : int totalLeafSizes;
704 : bool allTheSame;
705 : spgxlogPickSplit xlrec;
706 : char *leafdata,
707 : *leafptr;
708 : SPPageDesc saveCurrent;
709 : int nToDelete,
710 : nToInsert,
711 : maxToInclude;
712 :
713 5542 : in.level = level;
714 :
715 : /*
716 : * Allocate per-leaf-tuple work arrays with max possible size
717 : */
718 5542 : max = PageGetMaxOffsetNumber(current->page);
719 5542 : n = max + 1;
720 5542 : in.datums = (Datum *) palloc(sizeof(Datum) * n);
721 5542 : toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
722 5542 : toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
723 5542 : oldLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
724 5542 : newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
725 5542 : leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
726 :
727 5542 : STORE_STATE(state, xlrec.stateSrc);
728 :
729 : /*
730 : * Form list of leaf tuples which will be distributed as split result;
731 : * also, count up the amount of space that will be freed from current.
732 : * (Note that in the non-root case, we won't actually delete the old
733 : * tuples, only replace them with redirects or placeholders.)
734 : */
735 5542 : nToInsert = 0;
736 5542 : nToDelete = 0;
737 5542 : spaceToDelete = 0;
738 5542 : if (SpGistBlockIsRoot(current->blkno))
739 : {
740 : /*
741 : * We are splitting the root (which up to now is also a leaf page).
742 : * Its tuples are not linked, so scan sequentially to get them all. We
743 : * ignore the original value of current->offnum.
744 : */
745 15418 : for (i = FirstOffsetNumber; i <= max; i++)
746 : {
747 : SpGistLeafTuple it;
748 :
749 15336 : it = (SpGistLeafTuple) PageGetItem(current->page,
750 15336 : PageGetItemId(current->page, i));
751 15336 : if (it->tupstate == SPGIST_LIVE)
752 : {
753 15336 : in.datums[nToInsert] =
754 15336 : isNulls ? (Datum) 0 : SGLTDATUM(it, state);
755 15336 : oldLeafs[nToInsert] = it;
756 15336 : nToInsert++;
757 15336 : toDelete[nToDelete] = i;
758 15336 : nToDelete++;
759 : /* we will delete the tuple altogether, so count full space */
760 15336 : spaceToDelete += it->size + sizeof(ItemIdData);
761 : }
762 : else /* tuples on root should be live */
763 0 : elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
764 : }
765 : }
766 : else
767 : {
768 : /* Normal case, just collect the leaf tuples in the chain */
769 5460 : i = current->offnum;
770 680736 : while (i != InvalidOffsetNumber)
771 : {
772 : SpGistLeafTuple it;
773 :
774 : Assert(i >= FirstOffsetNumber && i <= max);
775 675276 : it = (SpGistLeafTuple) PageGetItem(current->page,
776 675276 : PageGetItemId(current->page, i));
777 675276 : if (it->tupstate == SPGIST_LIVE)
778 : {
779 675276 : in.datums[nToInsert] =
780 675276 : isNulls ? (Datum) 0 : SGLTDATUM(it, state);
781 675276 : oldLeafs[nToInsert] = it;
782 675276 : nToInsert++;
783 675276 : toDelete[nToDelete] = i;
784 675276 : nToDelete++;
785 : /* we will not delete the tuple, only replace with dead */
786 : Assert(it->size >= SGDTSIZE);
787 675276 : spaceToDelete += it->size - SGDTSIZE;
788 : }
789 0 : else if (it->tupstate == SPGIST_DEAD)
790 : {
791 : /* We could see a DEAD tuple as first/only chain item */
792 : Assert(i == current->offnum);
793 : Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
794 0 : toDelete[nToDelete] = i;
795 0 : nToDelete++;
796 : /* replacing it with redirect will save no space */
797 : }
798 : else
799 0 : elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
800 :
801 675276 : i = SGLT_GET_NEXTOFFSET(it);
802 : }
803 : }
804 5542 : in.nTuples = nToInsert;
805 :
806 : /*
807 : * We may not actually insert new tuple because another picksplit may be
808 : * necessary due to too large value, but we will try to allocate enough
809 : * space to include it; and in any case it has to be included in the input
810 : * for the picksplit function. So don't increment nToInsert yet.
811 : */
812 5542 : in.datums[in.nTuples] =
813 5542 : isNulls ? (Datum) 0 : SGLTDATUM(newLeafTuple, state);
814 5542 : oldLeafs[in.nTuples] = newLeafTuple;
815 5542 : in.nTuples++;
816 :
817 5542 : memset(&out, 0, sizeof(out));
818 :
819 5542 : if (!isNulls)
820 : {
821 : /*
822 : * Perform split using user-defined method.
823 : */
824 5542 : procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
825 5542 : FunctionCall2Coll(procinfo,
826 5542 : index->rd_indcollation[0],
827 : PointerGetDatum(&in),
828 : PointerGetDatum(&out));
829 :
830 : /*
831 : * Form new leaf tuples and count up the total space needed.
832 : */
833 5542 : totalLeafSizes = 0;
834 701696 : for (i = 0; i < in.nTuples; i++)
835 : {
836 696154 : if (state->leafTupDesc->natts > 1)
837 62566 : spgDeformLeafTuple(oldLeafs[i],
838 : state->leafTupDesc,
839 : leafDatums,
840 : leafIsnulls,
841 : isNulls);
842 :
843 696154 : leafDatums[spgKeyColumn] = out.leafTupleDatums[i];
844 696154 : leafIsnulls[spgKeyColumn] = false;
845 :
846 696154 : newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
847 : leafDatums,
848 : leafIsnulls);
849 696154 : totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
850 : }
851 : }
852 : else
853 : {
854 : /*
855 : * Perform dummy split that puts all tuples into one node.
856 : * checkAllTheSame will override this and force allTheSame mode.
857 : */
858 0 : out.hasPrefix = false;
859 0 : out.nNodes = 1;
860 0 : out.nodeLabels = NULL;
861 0 : out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
862 :
863 : /*
864 : * Form new leaf tuples and count up the total space needed.
865 : */
866 0 : totalLeafSizes = 0;
867 0 : for (i = 0; i < in.nTuples; i++)
868 : {
869 0 : if (state->leafTupDesc->natts > 1)
870 0 : spgDeformLeafTuple(oldLeafs[i],
871 : state->leafTupDesc,
872 : leafDatums,
873 : leafIsnulls,
874 : isNulls);
875 :
876 : /*
877 : * Nulls tree can contain only null key values.
878 : */
879 0 : leafDatums[spgKeyColumn] = (Datum) 0;
880 0 : leafIsnulls[spgKeyColumn] = true;
881 :
882 0 : newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
883 : leafDatums,
884 : leafIsnulls);
885 0 : totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
886 : }
887 : }
888 :
889 : /*
890 : * Check to see if the picksplit function failed to separate the values,
891 : * ie, it put them all into the same child node. If so, select allTheSame
892 : * mode and create a random split instead. See comments for
893 : * checkAllTheSame as to why we need to know if the new leaf tuples could
894 : * fit on one page.
895 : */
896 5542 : allTheSame = checkAllTheSame(&in, &out,
897 : totalLeafSizes > SPGIST_PAGE_CAPACITY,
898 : &includeNew);
899 :
900 : /*
901 : * If checkAllTheSame decided we must exclude the new tuple, don't
902 : * consider it any further.
903 : */
904 5542 : if (includeNew)
905 5542 : maxToInclude = in.nTuples;
906 : else
907 : {
908 0 : maxToInclude = in.nTuples - 1;
909 0 : totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
910 : }
911 :
912 : /*
913 : * Allocate per-node work arrays. Since checkAllTheSame could replace
914 : * out.nNodes with a value larger than the number of tuples on the input
915 : * page, we can't allocate these arrays before here.
916 : */
917 5542 : nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * out.nNodes);
918 5542 : leafSizes = (int *) palloc0(sizeof(int) * out.nNodes);
919 :
920 : /*
921 : * Form nodes of inner tuple and inner tuple itself
922 : */
923 43380 : for (i = 0; i < out.nNodes; i++)
924 : {
925 37838 : Datum label = (Datum) 0;
926 37838 : bool labelisnull = (out.nodeLabels == NULL);
927 :
928 37838 : if (!labelisnull)
929 3526 : label = out.nodeLabels[i];
930 37838 : nodes[i] = spgFormNodeTuple(state, label, labelisnull);
931 : }
932 5542 : innerTuple = spgFormInnerTuple(state,
933 5542 : out.hasPrefix, out.prefixDatum,
934 : out.nNodes, nodes);
935 5542 : innerTuple->allTheSame = allTheSame;
936 :
937 : /*
938 : * Update nodes[] array to point into the newly formed innerTuple, so that
939 : * we can adjust their downlinks below.
940 : */
941 43380 : SGITITERATE(innerTuple, i, node)
942 : {
943 37838 : nodes[i] = node;
944 : }
945 :
946 : /*
947 : * Re-scan new leaf tuples and count up the space needed under each node.
948 : */
949 701696 : for (i = 0; i < maxToInclude; i++)
950 : {
951 696154 : n = out.mapTuplesToNodes[i];
952 696154 : if (n < 0 || n >= out.nNodes)
953 0 : elog(ERROR, "inconsistent result of SPGiST picksplit function");
954 696154 : leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
955 : }
956 :
957 : /*
958 : * To perform the split, we must insert a new inner tuple, which can't go
959 : * on a leaf page; and unless we are splitting the root page, we must then
960 : * update the parent tuple's downlink to point to the inner tuple. If
961 : * there is room, we'll put the new inner tuple on the same page as the
962 : * parent tuple, otherwise we need another non-leaf buffer. But if the
963 : * parent page is the root, we can't add the new inner tuple there,
964 : * because the root page must have only one inner tuple.
965 : */
966 5542 : xlrec.initInner = false;
967 5542 : if (parent->buffer != InvalidBuffer &&
968 5460 : !SpGistBlockIsRoot(parent->blkno) &&
969 5100 : (SpGistPageGetFreeSpace(parent->page, 1) >=
970 5100 : innerTuple->size + sizeof(ItemIdData)))
971 : {
972 : /* New inner tuple will fit on parent page */
973 4692 : newInnerBuffer = parent->buffer;
974 : }
975 850 : else if (parent->buffer != InvalidBuffer)
976 : {
977 : /* Send tuple to page with next triple parity (see README) */
978 1536 : newInnerBuffer = SpGistGetBuffer(index,
979 768 : GBUF_INNER_PARITY(parent->blkno + 1) |
980 768 : (isNulls ? GBUF_NULLS : 0),
981 768 : innerTuple->size + sizeof(ItemIdData),
982 : &xlrec.initInner);
983 : }
984 : else
985 : {
986 : /* Root page split ... inner tuple will go to root page */
987 82 : newInnerBuffer = InvalidBuffer;
988 : }
989 :
990 : /*
991 : * The new leaf tuples converted from the existing ones should require the
992 : * same or less space, and therefore should all fit onto one page
993 : * (although that's not necessarily the current page, since we can't
994 : * delete the old tuples but only replace them with placeholders).
995 : * However, the incoming new tuple might not also fit, in which case we
996 : * might need another picksplit cycle to reduce it some more.
997 : *
998 : * If there's not room to put everything back onto the current page, then
999 : * we decide on a per-node basis which tuples go to the new page. (We do
1000 : * it like that because leaf tuple chains can't cross pages, so we must
1001 : * place all leaf tuples belonging to the same parent node on the same
1002 : * page.)
1003 : *
1004 : * If we are splitting the root page (turning it from a leaf page into an
1005 : * inner page), then no leaf tuples can go back to the current page; they
1006 : * must all go somewhere else.
1007 : */
1008 5542 : if (!SpGistBlockIsRoot(current->blkno))
1009 5460 : currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
1010 : else
1011 82 : currentFreeSpace = 0; /* prevent assigning any tuples to current */
1012 :
1013 5542 : xlrec.initDest = false;
1014 :
1015 5542 : if (totalLeafSizes <= currentFreeSpace)
1016 : {
1017 : /* All the leaf tuples will fit on current page */
1018 24 : newLeafBuffer = InvalidBuffer;
1019 : /* mark new leaf tuple as included in insertions, if allowed */
1020 24 : if (includeNew)
1021 : {
1022 24 : nToInsert++;
1023 24 : insertedNew = true;
1024 : }
1025 2970 : for (i = 0; i < nToInsert; i++)
1026 2946 : leafPageSelect[i] = 0; /* signifies current page */
1027 : }
1028 5518 : else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
1029 : {
1030 : /*
1031 : * We're trying to split up a long value by repeated suffixing, but
1032 : * it's not going to fit yet. Don't bother allocating a second leaf
1033 : * buffer that we won't be able to use.
1034 : */
1035 44 : newLeafBuffer = InvalidBuffer;
1036 : Assert(includeNew);
1037 44 : Assert(nToInsert == 0);
1038 : }
1039 : else
1040 : {
1041 : /* We will need another leaf page */
1042 : uint8 *nodePageSelect;
1043 : int curspace;
1044 : int newspace;
1045 :
1046 5474 : newLeafBuffer = SpGistGetBuffer(index,
1047 : GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
1048 5474 : Min(totalLeafSizes,
1049 : SPGIST_PAGE_CAPACITY),
1050 : &xlrec.initDest);
1051 :
1052 : /*
1053 : * Attempt to assign node groups to the two pages. We might fail to
1054 : * do so, even if totalLeafSizes is less than the available space,
1055 : * because we can't split a group across pages.
1056 : */
1057 5474 : nodePageSelect = (uint8 *) palloc(sizeof(uint8) * out.nNodes);
1058 :
1059 5474 : curspace = currentFreeSpace;
1060 5474 : newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1061 43220 : for (i = 0; i < out.nNodes; i++)
1062 : {
1063 37746 : if (leafSizes[i] <= curspace)
1064 : {
1065 24734 : nodePageSelect[i] = 0; /* signifies current page */
1066 24734 : curspace -= leafSizes[i];
1067 : }
1068 : else
1069 : {
1070 13012 : nodePageSelect[i] = 1; /* signifies new leaf page */
1071 13012 : newspace -= leafSizes[i];
1072 : }
1073 : }
1074 5474 : if (curspace >= 0 && newspace >= 0)
1075 : {
1076 : /* Successful assignment, so we can include the new leaf tuple */
1077 5244 : if (includeNew)
1078 : {
1079 5244 : nToInsert++;
1080 5244 : insertedNew = true;
1081 : }
1082 : }
1083 230 : else if (includeNew)
1084 : {
1085 : /* We must exclude the new leaf tuple from the split */
1086 230 : int nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
1087 :
1088 230 : leafSizes[nodeOfNewTuple] -=
1089 230 : newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
1090 :
1091 : /* Repeat the node assignment process --- should succeed now */
1092 230 : curspace = currentFreeSpace;
1093 230 : newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1094 1114 : for (i = 0; i < out.nNodes; i++)
1095 : {
1096 884 : if (leafSizes[i] <= curspace)
1097 : {
1098 288 : nodePageSelect[i] = 0; /* signifies current page */
1099 288 : curspace -= leafSizes[i];
1100 : }
1101 : else
1102 : {
1103 596 : nodePageSelect[i] = 1; /* signifies new leaf page */
1104 596 : newspace -= leafSizes[i];
1105 : }
1106 : }
1107 230 : if (curspace < 0 || newspace < 0)
1108 0 : elog(ERROR, "failed to divide leaf tuple groups across pages");
1109 : }
1110 : else
1111 : {
1112 : /* oops, we already excluded new tuple ... should not get here */
1113 0 : elog(ERROR, "failed to divide leaf tuple groups across pages");
1114 : }
1115 : /* Expand the per-node assignments to be shown per leaf tuple */
1116 698408 : for (i = 0; i < nToInsert; i++)
1117 : {
1118 692934 : n = out.mapTuplesToNodes[i];
1119 692934 : leafPageSelect[i] = nodePageSelect[n];
1120 : }
1121 : }
1122 :
1123 : /* Start preparing WAL record */
1124 5542 : xlrec.nDelete = 0;
1125 5542 : xlrec.initSrc = isNew;
1126 5542 : xlrec.storesNulls = isNulls;
1127 5542 : xlrec.isRootSplit = SpGistBlockIsRoot(current->blkno);
1128 :
1129 5542 : leafdata = leafptr = (char *) palloc(totalLeafSizes);
1130 :
1131 : /* Here we begin making the changes to the target pages */
1132 5542 : START_CRIT_SECTION();
1133 :
1134 : /*
1135 : * Delete old leaf tuples from current buffer, except when we're splitting
1136 : * the root; in that case there's no need because we'll re-init the page
1137 : * below. We do this first to make room for reinserting new leaf tuples.
1138 : */
1139 5542 : if (!SpGistBlockIsRoot(current->blkno))
1140 : {
1141 : /*
1142 : * Init buffer instead of deleting individual tuples, but only if
1143 : * there aren't any other live tuples and only during build; otherwise
1144 : * we need to set a redirection tuple for concurrent scans.
1145 : */
1146 5460 : if (state->isBuild &&
1147 4140 : nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
1148 4140 : PageGetMaxOffsetNumber(current->page))
1149 : {
1150 312 : SpGistInitBuffer(current->buffer,
1151 : SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
1152 312 : xlrec.initSrc = true;
1153 : }
1154 5148 : else if (isNew)
1155 : {
1156 : /* don't expose the freshly init'd buffer as a backup block */
1157 : Assert(nToDelete == 0);
1158 : }
1159 : else
1160 : {
1161 5104 : xlrec.nDelete = nToDelete;
1162 :
1163 5104 : if (!state->isBuild)
1164 : {
1165 : /*
1166 : * Need to create redirect tuple (it will point to new inner
1167 : * tuple) but right now the new tuple's location is not known
1168 : * yet. So, set the redirection pointer to "impossible" value
1169 : * and remember its position to update tuple later.
1170 : */
1171 1276 : if (nToDelete > 0)
1172 1276 : redirectTuplePos = toDelete[0];
1173 1276 : spgPageIndexMultiDelete(state, current->page,
1174 : toDelete, nToDelete,
1175 : SPGIST_REDIRECT,
1176 : SPGIST_PLACEHOLDER,
1177 : SPGIST_METAPAGE_BLKNO,
1178 : FirstOffsetNumber);
1179 : }
1180 : else
1181 : {
1182 : /*
1183 : * During index build there is not concurrent searches, so we
1184 : * don't need to create redirection tuple.
1185 : */
1186 3828 : spgPageIndexMultiDelete(state, current->page,
1187 : toDelete, nToDelete,
1188 : SPGIST_PLACEHOLDER,
1189 : SPGIST_PLACEHOLDER,
1190 : InvalidBlockNumber,
1191 : InvalidOffsetNumber);
1192 : }
1193 : }
1194 : }
1195 :
1196 : /*
1197 : * Put leaf tuples on proper pages, and update downlinks in innerTuple's
1198 : * nodes.
1199 : */
1200 5542 : startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
1201 701422 : for (i = 0; i < nToInsert; i++)
1202 : {
1203 695880 : SpGistLeafTuple it = newLeafs[i];
1204 : Buffer leafBuffer;
1205 : BlockNumber leafBlock;
1206 : OffsetNumber newoffset;
1207 :
1208 : /* Which page is it going to? */
1209 695880 : leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
1210 695880 : leafBlock = BufferGetBlockNumber(leafBuffer);
1211 :
1212 : /* Link tuple into correct chain for its node */
1213 695880 : n = out.mapTuplesToNodes[i];
1214 :
1215 695880 : if (ItemPointerIsValid(&nodes[n]->t_tid))
1216 : {
1217 : Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
1218 674352 : SGLT_SET_NEXTOFFSET(it, ItemPointerGetOffsetNumber(&nodes[n]->t_tid));
1219 : }
1220 : else
1221 21528 : SGLT_SET_NEXTOFFSET(it, InvalidOffsetNumber);
1222 :
1223 : /* Insert it on page */
1224 695880 : newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
1225 695880 : it, it->size,
1226 695880 : &startOffsets[leafPageSelect[i]],
1227 : false);
1228 695880 : toInsert[i] = newoffset;
1229 :
1230 : /* ... and complete the chain linking */
1231 695880 : ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);
1232 :
1233 : /* Also copy leaf tuple into WAL data */
1234 695880 : memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
1235 695880 : leafptr += newLeafs[i]->size;
1236 : }
1237 :
1238 : /*
1239 : * We're done modifying the other leaf buffer (if any), so mark it dirty.
1240 : * current->buffer will be marked below, after we're entirely done
1241 : * modifying it.
1242 : */
1243 5542 : if (newLeafBuffer != InvalidBuffer)
1244 : {
1245 5474 : MarkBufferDirty(newLeafBuffer);
1246 : }
1247 :
1248 : /* Remember current buffer, since we're about to change "current" */
1249 5542 : saveCurrent = *current;
1250 :
1251 : /*
1252 : * Store the new innerTuple
1253 : */
1254 5542 : if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
1255 : {
1256 : /*
1257 : * new inner tuple goes to parent page
1258 : */
1259 : Assert(current->buffer != parent->buffer);
1260 :
1261 : /* Repoint "current" at the new inner tuple */
1262 4692 : current->blkno = parent->blkno;
1263 4692 : current->buffer = parent->buffer;
1264 4692 : current->page = parent->page;
1265 4692 : xlrec.offnumInner = current->offnum =
1266 4692 : SpGistPageAddNewItem(state, current->page,
1267 4692 : innerTuple, innerTuple->size,
1268 : NULL, false);
1269 :
1270 : /*
1271 : * Update parent node link and mark parent page dirty
1272 : */
1273 4692 : xlrec.innerIsParent = true;
1274 4692 : xlrec.offnumParent = parent->offnum;
1275 4692 : xlrec.nodeI = parent->node;
1276 4692 : saveNodeLink(index, parent, current->blkno, current->offnum);
1277 :
1278 : /*
1279 : * Update redirection link (in old current buffer)
1280 : */
1281 4692 : if (redirectTuplePos != InvalidOffsetNumber)
1282 1150 : setRedirectionTuple(&saveCurrent, redirectTuplePos,
1283 1150 : current->blkno, current->offnum);
1284 :
1285 : /* Done modifying old current buffer, mark it dirty */
1286 4692 : MarkBufferDirty(saveCurrent.buffer);
1287 : }
1288 850 : else if (parent->buffer != InvalidBuffer)
1289 : {
1290 : /*
1291 : * new inner tuple will be stored on a new page
1292 : */
1293 : Assert(newInnerBuffer != InvalidBuffer);
1294 :
1295 : /* Repoint "current" at the new inner tuple */
1296 768 : current->buffer = newInnerBuffer;
1297 768 : current->blkno = BufferGetBlockNumber(current->buffer);
1298 768 : current->page = BufferGetPage(current->buffer);
1299 768 : xlrec.offnumInner = current->offnum =
1300 768 : SpGistPageAddNewItem(state, current->page,
1301 768 : innerTuple, innerTuple->size,
1302 : NULL, false);
1303 :
1304 : /* Done modifying new current buffer, mark it dirty */
1305 768 : MarkBufferDirty(current->buffer);
1306 :
1307 : /*
1308 : * Update parent node link and mark parent page dirty
1309 : */
1310 768 : xlrec.innerIsParent = (parent->buffer == current->buffer);
1311 768 : xlrec.offnumParent = parent->offnum;
1312 768 : xlrec.nodeI = parent->node;
1313 768 : saveNodeLink(index, parent, current->blkno, current->offnum);
1314 :
1315 : /*
1316 : * Update redirection link (in old current buffer)
1317 : */
1318 768 : if (redirectTuplePos != InvalidOffsetNumber)
1319 126 : setRedirectionTuple(&saveCurrent, redirectTuplePos,
1320 126 : current->blkno, current->offnum);
1321 :
1322 : /* Done modifying old current buffer, mark it dirty */
1323 768 : MarkBufferDirty(saveCurrent.buffer);
1324 : }
1325 : else
1326 : {
1327 : /*
1328 : * Splitting root page, which was a leaf but now becomes inner page
1329 : * (and so "current" continues to point at it)
1330 : */
1331 : Assert(SpGistBlockIsRoot(current->blkno));
1332 : Assert(redirectTuplePos == InvalidOffsetNumber);
1333 :
1334 82 : SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
1335 82 : xlrec.initInner = true;
1336 82 : xlrec.innerIsParent = false;
1337 :
1338 82 : xlrec.offnumInner = current->offnum =
1339 82 : PageAddItem(current->page, innerTuple, innerTuple->size,
1340 : InvalidOffsetNumber, false, false);
1341 82 : if (current->offnum != FirstOffsetNumber)
1342 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
1343 : innerTuple->size);
1344 :
1345 : /* No parent link to update, nor redirection to do */
1346 82 : xlrec.offnumParent = InvalidOffsetNumber;
1347 82 : xlrec.nodeI = 0;
1348 :
1349 : /* Done modifying new current buffer, mark it dirty */
1350 82 : MarkBufferDirty(current->buffer);
1351 :
1352 : /* saveCurrent doesn't represent a different buffer */
1353 82 : saveCurrent.buffer = InvalidBuffer;
1354 : }
1355 :
1356 5542 : if (RelationNeedsWAL(index) && !state->isBuild)
1357 : {
1358 : XLogRecPtr recptr;
1359 : int flags;
1360 :
1361 1340 : XLogBeginInsert();
1362 :
1363 1340 : xlrec.nInsert = nToInsert;
1364 1340 : XLogRegisterData(&xlrec, SizeOfSpgxlogPickSplit);
1365 :
1366 1340 : XLogRegisterData(toDelete,
1367 1340 : sizeof(OffsetNumber) * xlrec.nDelete);
1368 1340 : XLogRegisterData(toInsert,
1369 1340 : sizeof(OffsetNumber) * xlrec.nInsert);
1370 1340 : XLogRegisterData(leafPageSelect,
1371 1340 : sizeof(uint8) * xlrec.nInsert);
1372 1340 : XLogRegisterData(innerTuple, innerTuple->size);
1373 1340 : XLogRegisterData(leafdata, leafptr - leafdata);
1374 :
1375 : /* Old leaf page */
1376 1340 : if (BufferIsValid(saveCurrent.buffer))
1377 : {
1378 1320 : flags = REGBUF_STANDARD;
1379 1320 : if (xlrec.initSrc)
1380 44 : flags |= REGBUF_WILL_INIT;
1381 1320 : XLogRegisterBuffer(0, saveCurrent.buffer, flags);
1382 : }
1383 :
1384 : /* New leaf page */
1385 1340 : if (BufferIsValid(newLeafBuffer))
1386 : {
1387 1296 : flags = REGBUF_STANDARD;
1388 1296 : if (xlrec.initDest)
1389 1212 : flags |= REGBUF_WILL_INIT;
1390 1296 : XLogRegisterBuffer(1, newLeafBuffer, flags);
1391 : }
1392 :
1393 : /* Inner page */
1394 1340 : flags = REGBUF_STANDARD;
1395 1340 : if (xlrec.initInner)
1396 40 : flags |= REGBUF_WILL_INIT;
1397 1340 : XLogRegisterBuffer(2, current->buffer, flags);
1398 :
1399 : /* Parent page, if different from inner page */
1400 1340 : if (parent->buffer != InvalidBuffer)
1401 : {
1402 1320 : if (parent->buffer != current->buffer)
1403 126 : XLogRegisterBuffer(3, parent->buffer, REGBUF_STANDARD);
1404 : else
1405 : Assert(xlrec.innerIsParent);
1406 : }
1407 :
1408 : /* Issue the WAL record */
1409 1340 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT);
1410 :
1411 : /* Update page LSNs on all affected pages */
1412 1340 : if (newLeafBuffer != InvalidBuffer)
1413 : {
1414 1296 : Page page = BufferGetPage(newLeafBuffer);
1415 :
1416 1296 : PageSetLSN(page, recptr);
1417 : }
1418 :
1419 1340 : if (saveCurrent.buffer != InvalidBuffer)
1420 : {
1421 1320 : Page page = BufferGetPage(saveCurrent.buffer);
1422 :
1423 1320 : PageSetLSN(page, recptr);
1424 : }
1425 :
1426 1340 : PageSetLSN(current->page, recptr);
1427 :
1428 1340 : if (parent->buffer != InvalidBuffer)
1429 : {
1430 1320 : PageSetLSN(parent->page, recptr);
1431 : }
1432 : }
1433 :
1434 5542 : END_CRIT_SECTION();
1435 :
1436 : /* Update local free-space cache and unlock buffers */
1437 5542 : if (newLeafBuffer != InvalidBuffer)
1438 : {
1439 5474 : SpGistSetLastUsedPage(index, newLeafBuffer);
1440 5474 : UnlockReleaseBuffer(newLeafBuffer);
1441 : }
1442 5542 : if (saveCurrent.buffer != InvalidBuffer)
1443 : {
1444 5460 : SpGistSetLastUsedPage(index, saveCurrent.buffer);
1445 5460 : UnlockReleaseBuffer(saveCurrent.buffer);
1446 : }
1447 :
1448 5542 : return insertedNew;
1449 : }
1450 :
1451 : /*
1452 : * spgMatchNode action: descend to N'th child node of current inner tuple
1453 : */
1454 : static void
1455 16338674 : spgMatchNodeAction(Relation index, SpGistState *state,
1456 : SpGistInnerTuple innerTuple,
1457 : SPPageDesc *current, SPPageDesc *parent, int nodeN)
1458 : {
1459 : int i;
1460 : SpGistNodeTuple node;
1461 :
1462 : /* Release previous parent buffer if any */
1463 16338674 : if (parent->buffer != InvalidBuffer &&
1464 15579918 : parent->buffer != current->buffer)
1465 : {
1466 754058 : SpGistSetLastUsedPage(index, parent->buffer);
1467 754058 : UnlockReleaseBuffer(parent->buffer);
1468 : }
1469 :
1470 : /* Repoint parent to specified node of current inner tuple */
1471 16338674 : parent->blkno = current->blkno;
1472 16338674 : parent->buffer = current->buffer;
1473 16338674 : parent->page = current->page;
1474 16338674 : parent->offnum = current->offnum;
1475 16338674 : parent->node = nodeN;
1476 :
1477 : /* Locate that node */
1478 28906328 : SGITITERATE(innerTuple, i, node)
1479 : {
1480 28906328 : if (i == nodeN)
1481 16338674 : break;
1482 : }
1483 :
1484 16338674 : if (i != nodeN)
1485 0 : elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
1486 : nodeN);
1487 :
1488 : /* Point current to the downlink location, if any */
1489 16338674 : if (ItemPointerIsValid(&node->t_tid))
1490 : {
1491 16336592 : current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
1492 16336592 : current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
1493 : }
1494 : else
1495 : {
1496 : /* Downlink is empty, so we'll need to find a new page */
1497 2082 : current->blkno = InvalidBlockNumber;
1498 2082 : current->offnum = InvalidOffsetNumber;
1499 : }
1500 :
1501 16338674 : current->buffer = InvalidBuffer;
1502 16338674 : current->page = NULL;
1503 16338674 : }
1504 :
1505 : /*
1506 : * spgAddNode action: add a node to the inner tuple at current
1507 : */
1508 : static void
1509 1582 : spgAddNodeAction(Relation index, SpGistState *state,
1510 : SpGistInnerTuple innerTuple,
1511 : SPPageDesc *current, SPPageDesc *parent,
1512 : int nodeN, Datum nodeLabel)
1513 : {
1514 : SpGistInnerTuple newInnerTuple;
1515 : spgxlogAddNode xlrec;
1516 :
1517 : /* Should not be applied to nulls */
1518 : Assert(!SpGistPageStoresNulls(current->page));
1519 :
1520 : /* Construct new inner tuple with additional node */
1521 1582 : newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
1522 :
1523 : /* Prepare WAL record */
1524 1582 : STORE_STATE(state, xlrec.stateSrc);
1525 1582 : xlrec.offnum = current->offnum;
1526 :
1527 : /* we don't fill these unless we need to change the parent downlink */
1528 1582 : xlrec.parentBlk = -1;
1529 1582 : xlrec.offnumParent = InvalidOffsetNumber;
1530 1582 : xlrec.nodeI = 0;
1531 :
1532 : /* we don't fill these unless tuple has to be moved */
1533 1582 : xlrec.offnumNew = InvalidOffsetNumber;
1534 1582 : xlrec.newPage = false;
1535 :
1536 1582 : if (PageGetExactFreeSpace(current->page) >=
1537 1582 : newInnerTuple->size - innerTuple->size)
1538 : {
1539 : /*
1540 : * We can replace the inner tuple by new version in-place
1541 : */
1542 1576 : START_CRIT_SECTION();
1543 :
1544 1576 : PageIndexTupleDelete(current->page, current->offnum);
1545 1576 : if (PageAddItem(current->page,
1546 : newInnerTuple, newInnerTuple->size,
1547 1576 : current->offnum, false, false) != current->offnum)
1548 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
1549 : newInnerTuple->size);
1550 :
1551 1576 : MarkBufferDirty(current->buffer);
1552 :
1553 1576 : if (RelationNeedsWAL(index) && !state->isBuild)
1554 : {
1555 : XLogRecPtr recptr;
1556 :
1557 710 : XLogBeginInsert();
1558 710 : XLogRegisterData(&xlrec, sizeof(xlrec));
1559 710 : XLogRegisterData(newInnerTuple, newInnerTuple->size);
1560 :
1561 710 : XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
1562 :
1563 710 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1564 :
1565 710 : PageSetLSN(current->page, recptr);
1566 : }
1567 :
1568 1576 : END_CRIT_SECTION();
1569 : }
1570 : else
1571 : {
1572 : /*
1573 : * move inner tuple to another page, and update parent
1574 : */
1575 : SpGistDeadTuple dt;
1576 : SPPageDesc saveCurrent;
1577 :
1578 : /*
1579 : * It should not be possible to get here for the root page, since we
1580 : * allow only one inner tuple on the root page, and spgFormInnerTuple
1581 : * always checks that inner tuples don't exceed the size of a page.
1582 : */
1583 6 : if (SpGistBlockIsRoot(current->blkno))
1584 0 : elog(ERROR, "cannot enlarge root tuple any more");
1585 : Assert(parent->buffer != InvalidBuffer);
1586 :
1587 6 : saveCurrent = *current;
1588 :
1589 6 : xlrec.offnumParent = parent->offnum;
1590 6 : xlrec.nodeI = parent->node;
1591 :
1592 : /*
1593 : * obtain new buffer with the same parity as current, since it will be
1594 : * a child of same parent tuple
1595 : */
1596 12 : current->buffer = SpGistGetBuffer(index,
1597 6 : GBUF_INNER_PARITY(current->blkno),
1598 6 : newInnerTuple->size + sizeof(ItemIdData),
1599 : &xlrec.newPage);
1600 6 : current->blkno = BufferGetBlockNumber(current->buffer);
1601 6 : current->page = BufferGetPage(current->buffer);
1602 :
1603 : /*
1604 : * Let's just make real sure new current isn't same as old. Right now
1605 : * that's impossible, but if SpGistGetBuffer ever got smart enough to
1606 : * delete placeholder tuples before checking space, maybe it wouldn't
1607 : * be impossible. The case would appear to work except that WAL
1608 : * replay would be subtly wrong, so I think a mere assert isn't enough
1609 : * here.
1610 : */
1611 6 : if (current->blkno == saveCurrent.blkno)
1612 0 : elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
1613 :
1614 : /*
1615 : * New current and parent buffer will both be modified; but note that
1616 : * parent buffer could be same as either new or old current.
1617 : */
1618 6 : if (parent->buffer == saveCurrent.buffer)
1619 0 : xlrec.parentBlk = 0;
1620 6 : else if (parent->buffer == current->buffer)
1621 0 : xlrec.parentBlk = 1;
1622 : else
1623 6 : xlrec.parentBlk = 2;
1624 :
1625 6 : START_CRIT_SECTION();
1626 :
1627 : /* insert new ... */
1628 6 : xlrec.offnumNew = current->offnum =
1629 6 : SpGistPageAddNewItem(state, current->page,
1630 6 : newInnerTuple, newInnerTuple->size,
1631 : NULL, false);
1632 :
1633 6 : MarkBufferDirty(current->buffer);
1634 :
1635 : /* update parent's downlink and mark parent page dirty */
1636 6 : saveNodeLink(index, parent, current->blkno, current->offnum);
1637 :
1638 : /*
1639 : * Replace old tuple with a placeholder or redirection tuple. Unless
1640 : * doing an index build, we have to insert a redirection tuple for
1641 : * possible concurrent scans. We can't just delete it in any case,
1642 : * because that could change the offsets of other tuples on the page,
1643 : * breaking downlinks from their parents.
1644 : */
1645 6 : if (state->isBuild)
1646 0 : dt = spgFormDeadTuple(state, SPGIST_PLACEHOLDER,
1647 : InvalidBlockNumber, InvalidOffsetNumber);
1648 : else
1649 6 : dt = spgFormDeadTuple(state, SPGIST_REDIRECT,
1650 6 : current->blkno, current->offnum);
1651 :
1652 6 : PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
1653 6 : if (PageAddItem(saveCurrent.page, dt, dt->size,
1654 : saveCurrent.offnum,
1655 6 : false, false) != saveCurrent.offnum)
1656 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
1657 : dt->size);
1658 :
1659 6 : if (state->isBuild)
1660 0 : SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
1661 : else
1662 6 : SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;
1663 :
1664 6 : MarkBufferDirty(saveCurrent.buffer);
1665 :
1666 6 : if (RelationNeedsWAL(index) && !state->isBuild)
1667 : {
1668 : XLogRecPtr recptr;
1669 : int flags;
1670 :
1671 6 : XLogBeginInsert();
1672 :
1673 : /* orig page */
1674 6 : XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD);
1675 : /* new page */
1676 6 : flags = REGBUF_STANDARD;
1677 6 : if (xlrec.newPage)
1678 6 : flags |= REGBUF_WILL_INIT;
1679 6 : XLogRegisterBuffer(1, current->buffer, flags);
1680 : /* parent page (if different from orig and new) */
1681 6 : if (xlrec.parentBlk == 2)
1682 6 : XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
1683 :
1684 6 : XLogRegisterData(&xlrec, sizeof(xlrec));
1685 6 : XLogRegisterData(newInnerTuple, newInnerTuple->size);
1686 :
1687 6 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1688 :
1689 : /* we don't bother to check if any of these are redundant */
1690 6 : PageSetLSN(current->page, recptr);
1691 6 : PageSetLSN(parent->page, recptr);
1692 6 : PageSetLSN(saveCurrent.page, recptr);
1693 : }
1694 :
1695 6 : END_CRIT_SECTION();
1696 :
1697 : /* Release saveCurrent if it's not same as current or parent */
1698 6 : if (saveCurrent.buffer != current->buffer &&
1699 6 : saveCurrent.buffer != parent->buffer)
1700 : {
1701 6 : SpGistSetLastUsedPage(index, saveCurrent.buffer);
1702 6 : UnlockReleaseBuffer(saveCurrent.buffer);
1703 : }
1704 : }
1705 1582 : }
1706 :
1707 : /*
1708 : * spgSplitNode action: split inner tuple at current into prefix and postfix
1709 : */
1710 : static void
1711 646 : spgSplitNodeAction(Relation index, SpGistState *state,
1712 : SpGistInnerTuple innerTuple,
1713 : SPPageDesc *current, spgChooseOut *out)
1714 : {
1715 : SpGistInnerTuple prefixTuple,
1716 : postfixTuple;
1717 : SpGistNodeTuple node,
1718 : *nodes;
1719 : BlockNumber postfixBlkno;
1720 : OffsetNumber postfixOffset;
1721 : int i;
1722 : spgxlogSplitTuple xlrec;
1723 646 : Buffer newBuffer = InvalidBuffer;
1724 :
1725 : /* Should not be applied to nulls */
1726 : Assert(!SpGistPageStoresNulls(current->page));
1727 :
1728 : /* Check opclass gave us sane values */
1729 646 : if (out->result.splitTuple.prefixNNodes <= 0 ||
1730 646 : out->result.splitTuple.prefixNNodes > SGITMAXNNODES)
1731 0 : elog(ERROR, "invalid number of prefix nodes: %d",
1732 : out->result.splitTuple.prefixNNodes);
1733 646 : if (out->result.splitTuple.childNodeN < 0 ||
1734 646 : out->result.splitTuple.childNodeN >=
1735 646 : out->result.splitTuple.prefixNNodes)
1736 0 : elog(ERROR, "invalid child node number: %d",
1737 : out->result.splitTuple.childNodeN);
1738 :
1739 : /*
1740 : * Construct new prefix tuple with requested number of nodes. We'll fill
1741 : * in the childNodeN'th node's downlink below.
1742 : */
1743 646 : nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) *
1744 646 : out->result.splitTuple.prefixNNodes);
1745 :
1746 1292 : for (i = 0; i < out->result.splitTuple.prefixNNodes; i++)
1747 : {
1748 646 : Datum label = (Datum) 0;
1749 : bool labelisnull;
1750 :
1751 646 : labelisnull = (out->result.splitTuple.prefixNodeLabels == NULL);
1752 646 : if (!labelisnull)
1753 646 : label = out->result.splitTuple.prefixNodeLabels[i];
1754 646 : nodes[i] = spgFormNodeTuple(state, label, labelisnull);
1755 : }
1756 :
1757 646 : prefixTuple = spgFormInnerTuple(state,
1758 646 : out->result.splitTuple.prefixHasPrefix,
1759 : out->result.splitTuple.prefixPrefixDatum,
1760 : out->result.splitTuple.prefixNNodes,
1761 : nodes);
1762 :
1763 : /* it must fit in the space that innerTuple now occupies */
1764 646 : if (prefixTuple->size > innerTuple->size)
1765 0 : elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");
1766 :
1767 : /*
1768 : * Construct new postfix tuple, containing all nodes of innerTuple with
1769 : * same node datums, but with the prefix specified by the picksplit
1770 : * function.
1771 : */
1772 646 : nodes = palloc(sizeof(SpGistNodeTuple) * innerTuple->nNodes);
1773 2286 : SGITITERATE(innerTuple, i, node)
1774 : {
1775 1640 : nodes[i] = node;
1776 : }
1777 :
1778 646 : postfixTuple = spgFormInnerTuple(state,
1779 646 : out->result.splitTuple.postfixHasPrefix,
1780 : out->result.splitTuple.postfixPrefixDatum,
1781 646 : innerTuple->nNodes, nodes);
1782 :
1783 : /* Postfix tuple is allTheSame if original tuple was */
1784 646 : postfixTuple->allTheSame = innerTuple->allTheSame;
1785 :
1786 : /* prep data for WAL record */
1787 646 : xlrec.newPage = false;
1788 :
1789 : /*
1790 : * If we can't fit both tuples on the current page, get a new page for the
1791 : * postfix tuple. In particular, can't split to the root page.
1792 : *
1793 : * For the space calculation, note that prefixTuple replaces innerTuple
1794 : * but postfixTuple will be a new entry.
1795 : */
1796 646 : if (SpGistBlockIsRoot(current->blkno) ||
1797 634 : SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
1798 634 : prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
1799 : {
1800 : /*
1801 : * Choose page with next triple parity, because postfix tuple is a
1802 : * child of prefix one
1803 : */
1804 168 : newBuffer = SpGistGetBuffer(index,
1805 168 : GBUF_INNER_PARITY(current->blkno + 1),
1806 168 : postfixTuple->size + sizeof(ItemIdData),
1807 : &xlrec.newPage);
1808 : }
1809 :
1810 646 : START_CRIT_SECTION();
1811 :
1812 : /*
1813 : * Replace old tuple by prefix tuple
1814 : */
1815 646 : PageIndexTupleDelete(current->page, current->offnum);
1816 646 : xlrec.offnumPrefix = PageAddItem(current->page,
1817 : prefixTuple, prefixTuple->size,
1818 : current->offnum, false, false);
1819 646 : if (xlrec.offnumPrefix != current->offnum)
1820 0 : elog(ERROR, "failed to add item of size %u to SPGiST index page",
1821 : prefixTuple->size);
1822 :
1823 : /*
1824 : * put postfix tuple into appropriate page
1825 : */
1826 646 : if (newBuffer == InvalidBuffer)
1827 : {
1828 478 : postfixBlkno = current->blkno;
1829 478 : xlrec.offnumPostfix = postfixOffset =
1830 478 : SpGistPageAddNewItem(state, current->page,
1831 478 : postfixTuple, postfixTuple->size,
1832 : NULL, false);
1833 478 : xlrec.postfixBlkSame = true;
1834 : }
1835 : else
1836 : {
1837 168 : postfixBlkno = BufferGetBlockNumber(newBuffer);
1838 168 : xlrec.offnumPostfix = postfixOffset =
1839 168 : SpGistPageAddNewItem(state, BufferGetPage(newBuffer),
1840 168 : postfixTuple, postfixTuple->size,
1841 : NULL, false);
1842 168 : MarkBufferDirty(newBuffer);
1843 168 : xlrec.postfixBlkSame = false;
1844 : }
1845 :
1846 : /*
1847 : * And set downlink pointer in the prefix tuple to point to postfix tuple.
1848 : * (We can't avoid this step by doing the above two steps in opposite
1849 : * order, because there might not be enough space on the page to insert
1850 : * the postfix tuple first.) We have to update the local copy of the
1851 : * prefixTuple too, because that's what will be written to WAL.
1852 : */
1853 646 : spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1854 : postfixBlkno, postfixOffset);
1855 646 : prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
1856 646 : PageGetItemId(current->page, current->offnum));
1857 646 : spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1858 : postfixBlkno, postfixOffset);
1859 :
1860 646 : MarkBufferDirty(current->buffer);
1861 :
1862 646 : if (RelationNeedsWAL(index) && !state->isBuild)
1863 : {
1864 : XLogRecPtr recptr;
1865 :
1866 612 : XLogBeginInsert();
1867 612 : XLogRegisterData(&xlrec, sizeof(xlrec));
1868 612 : XLogRegisterData(prefixTuple, prefixTuple->size);
1869 612 : XLogRegisterData(postfixTuple, postfixTuple->size);
1870 :
1871 612 : XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
1872 612 : if (newBuffer != InvalidBuffer)
1873 : {
1874 : int flags;
1875 :
1876 162 : flags = REGBUF_STANDARD;
1877 162 : if (xlrec.newPage)
1878 6 : flags |= REGBUF_WILL_INIT;
1879 162 : XLogRegisterBuffer(1, newBuffer, flags);
1880 : }
1881 :
1882 612 : recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE);
1883 :
1884 612 : PageSetLSN(current->page, recptr);
1885 :
1886 612 : if (newBuffer != InvalidBuffer)
1887 : {
1888 162 : PageSetLSN(BufferGetPage(newBuffer), recptr);
1889 : }
1890 : }
1891 :
1892 646 : END_CRIT_SECTION();
1893 :
1894 : /* Update local free-space cache and release buffer */
1895 646 : if (newBuffer != InvalidBuffer)
1896 : {
1897 168 : SpGistSetLastUsedPage(index, newBuffer);
1898 168 : UnlockReleaseBuffer(newBuffer);
1899 : }
1900 646 : }
1901 :
1902 : /*
1903 : * Insert one item into the index.
1904 : *
1905 : * Returns true on success, false if we failed to complete the insertion
1906 : * (typically because of conflict with a concurrent insert). In the latter
1907 : * case, caller should re-call spgdoinsert() with the same args.
1908 : */
1909 : bool
1910 775734 : spgdoinsert(Relation index, SpGistState *state,
1911 : const ItemPointerData *heapPtr, const Datum *datums, const bool *isnulls)
1912 : {
1913 775734 : bool result = true;
1914 775734 : TupleDesc leafDescriptor = state->leafTupDesc;
1915 775734 : bool isnull = isnulls[spgKeyColumn];
1916 775734 : int level = 0;
1917 : Datum leafDatums[INDEX_MAX_KEYS];
1918 : int leafSize;
1919 : int bestLeafSize;
1920 775734 : int numNoProgressCycles = 0;
1921 : SPPageDesc current,
1922 : parent;
1923 775734 : FmgrInfo *procinfo = NULL;
1924 :
1925 : /*
1926 : * Look up FmgrInfo of the user-defined choose function once, to save
1927 : * cycles in the loop below.
1928 : */
1929 775734 : if (!isnull)
1930 775644 : procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
1931 :
1932 : /*
1933 : * Prepare the leaf datum to insert.
1934 : *
1935 : * If an optional "compress" method is provided, then call it to form the
1936 : * leaf key datum from the input datum. Otherwise, store the input datum
1937 : * as is. Since we don't use index_form_tuple in this AM, we have to make
1938 : * sure value to be inserted is not toasted; FormIndexDatum doesn't
1939 : * guarantee that. But we assume the "compress" method to return an
1940 : * untoasted value.
1941 : */
1942 775734 : if (!isnull)
1943 : {
1944 775644 : if (OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC)))
1945 : {
1946 79732 : FmgrInfo *compressProcinfo = NULL;
1947 :
1948 79732 : compressProcinfo = index_getprocinfo(index, 1, SPGIST_COMPRESS_PROC);
1949 79732 : leafDatums[spgKeyColumn] =
1950 79732 : FunctionCall1Coll(compressProcinfo,
1951 79732 : index->rd_indcollation[spgKeyColumn],
1952 : datums[spgKeyColumn]);
1953 : }
1954 : else
1955 : {
1956 : Assert(state->attLeafType.type == state->attType.type);
1957 :
1958 695912 : if (state->attType.attlen == -1)
1959 179522 : leafDatums[spgKeyColumn] =
1960 179522 : PointerGetDatum(PG_DETOAST_DATUM(datums[spgKeyColumn]));
1961 : else
1962 516390 : leafDatums[spgKeyColumn] = datums[spgKeyColumn];
1963 : }
1964 : }
1965 : else
1966 90 : leafDatums[spgKeyColumn] = (Datum) 0;
1967 :
1968 : /* Likewise, ensure that any INCLUDE values are not toasted */
1969 869216 : for (int i = spgFirstIncludeColumn; i < leafDescriptor->natts; i++)
1970 : {
1971 93482 : if (!isnulls[i])
1972 : {
1973 86344 : if (TupleDescCompactAttr(leafDescriptor, i)->attlen == -1)
1974 13732 : leafDatums[i] = PointerGetDatum(PG_DETOAST_DATUM(datums[i]));
1975 : else
1976 72612 : leafDatums[i] = datums[i];
1977 : }
1978 : else
1979 7138 : leafDatums[i] = (Datum) 0;
1980 : }
1981 :
1982 : /*
1983 : * Compute space needed for a leaf tuple containing the given data.
1984 : */
1985 775734 : leafSize = SpGistGetLeafTupleSize(leafDescriptor, leafDatums, isnulls);
1986 : /* Account for an item pointer, too */
1987 775734 : leafSize += sizeof(ItemIdData);
1988 :
1989 : /*
1990 : * If it isn't gonna fit, and the opclass can't reduce the datum size by
1991 : * suffixing, bail out now rather than doing a lot of useless work.
1992 : */
1993 775734 : if (leafSize > SPGIST_PAGE_CAPACITY &&
1994 4 : (isnull || !state->config.longValuesOK))
1995 0 : ereport(ERROR,
1996 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1997 : errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
1998 : leafSize - sizeof(ItemIdData),
1999 : SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
2000 : RelationGetRelationName(index)),
2001 : errhint("Values larger than a buffer page cannot be indexed.")));
2002 775734 : bestLeafSize = leafSize;
2003 :
2004 : /* Initialize "current" to the appropriate root page */
2005 775734 : current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
2006 775734 : current.buffer = InvalidBuffer;
2007 775734 : current.page = NULL;
2008 775734 : current.offnum = FirstOffsetNumber;
2009 775734 : current.node = -1;
2010 :
2011 : /* "parent" is invalid for the moment */
2012 775734 : parent.blkno = InvalidBlockNumber;
2013 775734 : parent.buffer = InvalidBuffer;
2014 775734 : parent.page = NULL;
2015 775734 : parent.offnum = InvalidOffsetNumber;
2016 775734 : parent.node = -1;
2017 :
2018 : /*
2019 : * Before entering the loop, try to clear any pending interrupt condition.
2020 : * If a query cancel is pending, we might as well accept it now not later;
2021 : * while if a non-canceling condition is pending, servicing it here avoids
2022 : * having to restart the insertion and redo all the work so far.
2023 : */
2024 775734 : CHECK_FOR_INTERRUPTS();
2025 :
2026 : for (;;)
2027 16338670 : {
2028 17114404 : bool isNew = false;
2029 :
2030 : /*
2031 : * Bail out if query cancel is pending. We must have this somewhere
2032 : * in the loop since a broken opclass could produce an infinite
2033 : * picksplit loop. However, because we'll be holding buffer lock(s)
2034 : * after the first iteration, ProcessInterrupts() wouldn't be able to
2035 : * throw a cancel error here. Hence, if we see that an interrupt is
2036 : * pending, break out of the loop and deal with the situation below.
2037 : * Set result = false because we must restart the insertion if the
2038 : * interrupt isn't a query-cancel-or-die case.
2039 : */
2040 17114404 : if (INTERRUPTS_PENDING_CONDITION())
2041 : {
2042 0 : result = false;
2043 0 : break;
2044 : }
2045 :
2046 17114404 : if (current.blkno == InvalidBlockNumber)
2047 : {
2048 : /*
2049 : * Create a leaf page. If leafSize is too large to fit on a page,
2050 : * we won't actually use the page yet, but it simplifies the API
2051 : * for doPickSplit to always have a leaf page at hand; so just
2052 : * quietly limit our request to a page size.
2053 : */
2054 2078 : current.buffer =
2055 2078 : SpGistGetBuffer(index,
2056 : GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
2057 2078 : Min(leafSize, SPGIST_PAGE_CAPACITY),
2058 : &isNew);
2059 2078 : current.blkno = BufferGetBlockNumber(current.buffer);
2060 : }
2061 17112326 : else if (parent.buffer == InvalidBuffer)
2062 : {
2063 : /* we hold no parent-page lock, so no deadlock is possible */
2064 775734 : current.buffer = ReadBuffer(index, current.blkno);
2065 775734 : LockBuffer(current.buffer, BUFFER_LOCK_EXCLUSIVE);
2066 : }
2067 16336592 : else if (current.blkno != parent.blkno)
2068 : {
2069 : /* descend to a new child page */
2070 1510928 : current.buffer = ReadBuffer(index, current.blkno);
2071 :
2072 : /*
2073 : * Attempt to acquire lock on child page. We must beware of
2074 : * deadlock against another insertion process descending from that
2075 : * page to our parent page (see README). If we fail to get lock,
2076 : * abandon the insertion and tell our caller to start over.
2077 : *
2078 : * XXX this could be improved, because failing to get lock on a
2079 : * buffer is not proof of a deadlock situation; the lock might be
2080 : * held by a reader, or even just background writer/checkpointer
2081 : * process. Perhaps it'd be worth retrying after sleeping a bit?
2082 : */
2083 1510928 : if (!ConditionalLockBuffer(current.buffer))
2084 : {
2085 86 : ReleaseBuffer(current.buffer);
2086 86 : UnlockReleaseBuffer(parent.buffer);
2087 86 : return false;
2088 : }
2089 : }
2090 : else
2091 : {
2092 : /* inner tuple can be stored on the same page as parent one */
2093 14825664 : current.buffer = parent.buffer;
2094 : }
2095 17114318 : current.page = BufferGetPage(current.buffer);
2096 :
2097 : /* should not arrive at a page of the wrong type */
2098 34228546 : if (isnull ? !SpGistPageStoresNulls(current.page) :
2099 17114228 : SpGistPageStoresNulls(current.page))
2100 0 : elog(ERROR, "SPGiST index page %u has wrong nulls flag",
2101 : current.blkno);
2102 :
2103 17114318 : if (SpGistPageIsLeaf(current.page))
2104 : {
2105 : SpGistLeafTuple leafTuple;
2106 : int nToSplit,
2107 : sizeToSplit;
2108 :
2109 775918 : leafTuple = spgFormLeafTuple(state, heapPtr, leafDatums, isnulls);
2110 1551836 : if (leafTuple->size + sizeof(ItemIdData) <=
2111 775918 : SpGistPageGetFreeSpace(current.page, 1))
2112 : {
2113 : /* it fits on page, so insert it and we're done */
2114 768094 : addLeafTuple(index, state, leafTuple,
2115 : ¤t, &parent, isnull, isNew);
2116 775644 : break;
2117 : }
2118 7824 : else if ((sizeToSplit =
2119 7824 : checkSplitConditions(index, state, ¤t,
2120 3788 : &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
2121 3788 : nToSplit < 64 &&
2122 2330 : leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
2123 : {
2124 : /*
2125 : * the amount of data is pretty small, so just move the whole
2126 : * chain to another leaf page rather than splitting it.
2127 : */
2128 : Assert(!isNew);
2129 2282 : moveLeafs(index, state, ¤t, &parent, leafTuple, isnull);
2130 2282 : break; /* we're done */
2131 : }
2132 : else
2133 : {
2134 : /* picksplit */
2135 5542 : if (doPickSplit(index, state, ¤t, &parent,
2136 : leafTuple, level, isnull, isNew))
2137 5268 : break; /* doPickSplit installed new tuples */
2138 :
2139 : /* leaf tuple will not be inserted yet */
2140 274 : pfree(leafTuple);
2141 :
2142 : /*
2143 : * current now describes new inner tuple, go insert into it
2144 : */
2145 : Assert(!SpGistPageIsLeaf(current.page));
2146 274 : goto process_inner_tuple;
2147 : }
2148 : }
2149 : else /* non-leaf page */
2150 : {
2151 : /*
2152 : * Apply the opclass choose function to figure out how to insert
2153 : * the given datum into the current inner tuple.
2154 : */
2155 : SpGistInnerTuple innerTuple;
2156 : spgChooseIn in;
2157 : spgChooseOut out;
2158 :
2159 : /*
2160 : * spgAddNode and spgSplitTuple cases will loop back to here to
2161 : * complete the insertion operation. Just in case the choose
2162 : * function is broken and produces add or split requests
2163 : * repeatedly, check for query cancel (see comments above).
2164 : */
2165 16338674 : process_inner_tuple:
2166 16340902 : if (INTERRUPTS_PENDING_CONDITION())
2167 : {
2168 0 : result = false;
2169 0 : break;
2170 : }
2171 :
2172 16340902 : innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
2173 16340902 : PageGetItemId(current.page, current.offnum));
2174 :
2175 16340902 : in.datum = datums[spgKeyColumn];
2176 16340902 : in.leafDatum = leafDatums[spgKeyColumn];
2177 16340902 : in.level = level;
2178 16340902 : in.allTheSame = innerTuple->allTheSame;
2179 16340902 : in.hasPrefix = (innerTuple->prefixSize > 0);
2180 16340902 : in.prefixDatum = SGITDATUM(innerTuple, state);
2181 16340902 : in.nNodes = innerTuple->nNodes;
2182 16340902 : in.nodeLabels = spgExtractNodeLabels(state, innerTuple);
2183 :
2184 16340902 : memset(&out, 0, sizeof(out));
2185 :
2186 16340902 : if (!isnull)
2187 : {
2188 : /* use user-defined choose method */
2189 16340902 : FunctionCall2Coll(procinfo,
2190 16340902 : index->rd_indcollation[0],
2191 : PointerGetDatum(&in),
2192 : PointerGetDatum(&out));
2193 : }
2194 : else
2195 : {
2196 : /* force "match" action (to insert to random subnode) */
2197 0 : out.resultType = spgMatchNode;
2198 : }
2199 :
2200 16340902 : if (innerTuple->allTheSame)
2201 : {
2202 : /*
2203 : * It's not allowed to do an AddNode at an allTheSame tuple.
2204 : * Opclass must say "match", in which case we choose a random
2205 : * one of the nodes to descend into, or "split".
2206 : */
2207 145176 : if (out.resultType == spgAddNode)
2208 0 : elog(ERROR, "cannot add a node to an allTheSame inner tuple");
2209 145176 : else if (out.resultType == spgMatchNode)
2210 145164 : out.result.matchNode.nodeN =
2211 145164 : pg_prng_uint64_range(&pg_global_prng_state,
2212 145164 : 0, innerTuple->nNodes - 1);
2213 : }
2214 :
2215 16340902 : switch (out.resultType)
2216 : {
2217 16338674 : case spgMatchNode:
2218 : /* Descend to N'th child node */
2219 16338674 : spgMatchNodeAction(index, state, innerTuple,
2220 : ¤t, &parent,
2221 : out.result.matchNode.nodeN);
2222 : /* Adjust level as per opclass request */
2223 16338674 : level += out.result.matchNode.levelAdd;
2224 : /* Replace leafDatum and recompute leafSize */
2225 16338674 : if (!isnull)
2226 : {
2227 16338674 : leafDatums[spgKeyColumn] = out.result.matchNode.restDatum;
2228 16338674 : leafSize = SpGistGetLeafTupleSize(leafDescriptor,
2229 : leafDatums, isnulls);
2230 16338674 : leafSize += sizeof(ItemIdData);
2231 : }
2232 :
2233 : /*
2234 : * Check new tuple size; fail if it can't fit, unless the
2235 : * opclass says it can handle the situation by suffixing.
2236 : *
2237 : * However, the opclass can only shorten the leaf datum,
2238 : * which may not be enough to ever make the tuple fit,
2239 : * since INCLUDE columns might alone use more than a page.
2240 : * Depending on the opclass' behavior, that could lead to
2241 : * an infinite loop --- spgtextproc.c, for example, will
2242 : * just repeatedly generate an empty-string leaf datum
2243 : * once it runs out of data. Actual bugs in opclasses
2244 : * might cause infinite looping, too. To detect such a
2245 : * loop, check to see if we are making progress by
2246 : * reducing the leafSize in each pass. This is a bit
2247 : * tricky though. Because of alignment considerations,
2248 : * the total tuple size might not decrease on every pass.
2249 : * Also, there are edge cases where the choose method
2250 : * might seem to not make progress for a cycle or two.
2251 : * Somewhat arbitrarily, we allow up to 10 no-progress
2252 : * iterations before failing. (This limit should be more
2253 : * than MAXALIGN, to accommodate opclasses that trim one
2254 : * byte from the leaf datum per pass.)
2255 : */
2256 16338674 : if (leafSize > SPGIST_PAGE_CAPACITY)
2257 : {
2258 52 : bool ok = false;
2259 :
2260 52 : if (state->config.longValuesOK && !isnull)
2261 : {
2262 52 : if (leafSize < bestLeafSize)
2263 : {
2264 4 : ok = true;
2265 4 : bestLeafSize = leafSize;
2266 4 : numNoProgressCycles = 0;
2267 : }
2268 48 : else if (++numNoProgressCycles < 10)
2269 44 : ok = true;
2270 : }
2271 52 : if (!ok)
2272 4 : ereport(ERROR,
2273 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2274 : errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
2275 : leafSize - sizeof(ItemIdData),
2276 : SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
2277 : RelationGetRelationName(index)),
2278 : errhint("Values larger than a buffer page cannot be indexed.")));
2279 : }
2280 :
2281 : /*
2282 : * Loop around and attempt to insert the new leafDatum at
2283 : * "current" (which might reference an existing child
2284 : * tuple, or might be invalid to force us to find a new
2285 : * page for the tuple).
2286 : */
2287 16338670 : break;
2288 1582 : case spgAddNode:
2289 : /* AddNode is not sensible if nodes don't have labels */
2290 1582 : if (in.nodeLabels == NULL)
2291 0 : elog(ERROR, "cannot add a node to an inner tuple without node labels");
2292 : /* Add node to inner tuple, per request */
2293 1582 : spgAddNodeAction(index, state, innerTuple,
2294 : ¤t, &parent,
2295 : out.result.addNode.nodeN,
2296 : out.result.addNode.nodeLabel);
2297 :
2298 : /*
2299 : * Retry insertion into the enlarged node. We assume that
2300 : * we'll get a MatchNode result this time.
2301 : */
2302 1582 : goto process_inner_tuple;
2303 : break;
2304 646 : case spgSplitTuple:
2305 : /* Split inner tuple, per request */
2306 646 : spgSplitNodeAction(index, state, innerTuple,
2307 : ¤t, &out);
2308 :
2309 : /* Retry insertion into the split node */
2310 646 : goto process_inner_tuple;
2311 : break;
2312 0 : default:
2313 0 : elog(ERROR, "unrecognized SPGiST choose result: %d",
2314 : (int) out.resultType);
2315 : break;
2316 : }
2317 : }
2318 : } /* end loop */
2319 :
2320 : /*
2321 : * Release any buffers we're still holding. Beware of possibility that
2322 : * current and parent reference same buffer.
2323 : */
2324 775644 : if (current.buffer != InvalidBuffer)
2325 : {
2326 775644 : SpGistSetLastUsedPage(index, current.buffer);
2327 775644 : UnlockReleaseBuffer(current.buffer);
2328 : }
2329 775644 : if (parent.buffer != InvalidBuffer &&
2330 758666 : parent.buffer != current.buffer)
2331 : {
2332 754170 : SpGistSetLastUsedPage(index, parent.buffer);
2333 754170 : UnlockReleaseBuffer(parent.buffer);
2334 : }
2335 :
2336 : /*
2337 : * We do not support being called while some outer function is holding a
2338 : * buffer lock (or any other reason to postpone query cancels). If that
2339 : * were the case, telling the caller to retry would create an infinite
2340 : * loop.
2341 : */
2342 : Assert(INTERRUPTS_CAN_BE_PROCESSED());
2343 :
2344 : /*
2345 : * Finally, check for interrupts again. If there was a query cancel,
2346 : * ProcessInterrupts() will be able to throw the error here. If it was
2347 : * some other kind of interrupt that can just be cleared, return false to
2348 : * tell our caller to retry.
2349 : */
2350 775644 : CHECK_FOR_INTERRUPTS();
2351 :
2352 775644 : return result;
2353 : }
|