Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * execReplication.c
4 : * miscellaneous executor routines for logical replication
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/executor/execReplication.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/amapi.h"
18 : #include "access/commit_ts.h"
19 : #include "access/genam.h"
20 : #include "access/gist.h"
21 : #include "access/relscan.h"
22 : #include "access/tableam.h"
23 : #include "access/transam.h"
24 : #include "access/xact.h"
25 : #include "access/heapam.h"
26 : #include "catalog/pg_am_d.h"
27 : #include "commands/trigger.h"
28 : #include "executor/executor.h"
29 : #include "executor/nodeModifyTable.h"
30 : #include "replication/conflict.h"
31 : #include "replication/logicalrelation.h"
32 : #include "storage/lmgr.h"
33 : #include "utils/builtins.h"
34 : #include "utils/lsyscache.h"
35 : #include "utils/rel.h"
36 : #include "utils/snapmgr.h"
37 : #include "utils/syscache.h"
38 : #include "utils/typcache.h"
39 :
40 :
41 : static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
42 : TypeCacheEntry **eq, Bitmapset *columns);
43 :
44 : /*
45 : * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
46 : * is setup to match 'rel' (*NOT* idxrel!).
47 : *
48 : * Returns how many columns to use for the index scan.
49 : *
50 : * This is not a generic routine, idxrel must be PK, RI, or an index that can be
51 : * used for a REPLICA IDENTITY FULL table. See FindUsableIndexForReplicaIdentityFull()
52 : * for details.
53 : *
54 : * By definition, replication identity of a rel meets all limitations associated
55 : * with that. Note that any other index could also meet these limitations.
56 : */
57 : static int
58 144208 : build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
59 : TupleTableSlot *searchslot)
60 : {
61 : int index_attoff;
62 144208 : int skey_attoff = 0;
63 : Datum indclassDatum;
64 : oidvector *opclass;
65 144208 : int2vector *indkey = &idxrel->rd_index->indkey;
66 :
67 144208 : indclassDatum = SysCacheGetAttrNotNull(INDEXRELID, idxrel->rd_indextuple,
68 : Anum_pg_index_indclass);
69 144208 : opclass = (oidvector *) DatumGetPointer(indclassDatum);
70 :
71 : /* Build scankey for every non-expression attribute in the index. */
72 288462 : for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
73 144254 : index_attoff++)
74 : {
75 : Oid operator;
76 : Oid optype;
77 : Oid opfamily;
78 : RegProcedure regop;
79 144254 : int table_attno = indkey->values[index_attoff];
80 : StrategyNumber eq_strategy;
81 :
82 144254 : if (!AttributeNumberIsValid(table_attno))
83 : {
84 : /*
85 : * XXX: Currently, we don't support expressions in the scan key,
86 : * see code below.
87 : */
88 4 : continue;
89 : }
90 :
91 : /*
92 : * Load the operator info. We need this to get the equality operator
93 : * function for the scan key.
94 : */
95 144250 : optype = get_opclass_input_type(opclass->values[index_attoff]);
96 144250 : opfamily = get_opclass_family(opclass->values[index_attoff]);
97 144250 : eq_strategy = IndexAmTranslateCompareType(COMPARE_EQ, idxrel->rd_rel->relam, opfamily, false);
98 144250 : operator = get_opfamily_member(opfamily, optype,
99 : optype,
100 : eq_strategy);
101 :
102 144250 : if (!OidIsValid(operator))
103 0 : elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
104 : eq_strategy, optype, optype, opfamily);
105 :
106 144250 : regop = get_opcode(operator);
107 :
108 : /* Initialize the scankey. */
109 144250 : ScanKeyInit(&skey[skey_attoff],
110 144250 : index_attoff + 1,
111 : eq_strategy,
112 : regop,
113 144250 : searchslot->tts_values[table_attno - 1]);
114 :
115 144250 : skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
116 :
117 : /* Check for null value. */
118 144250 : if (searchslot->tts_isnull[table_attno - 1])
119 2 : skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
120 :
121 144250 : skey_attoff++;
122 : }
123 :
124 : /* There must always be at least one attribute for the index scan. */
125 : Assert(skey_attoff > 0);
126 :
127 144208 : return skey_attoff;
128 : }
129 :
130 :
131 : /*
132 : * Helper function to check if it is necessary to re-fetch and lock the tuple
133 : * due to concurrent modifications. This function should be called after
134 : * invoking table_tuple_lock.
135 : */
136 : static bool
137 144554 : should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
138 : {
139 144554 : bool refetch = false;
140 :
141 144554 : switch (res)
142 : {
143 144554 : case TM_Ok:
144 144554 : break;
145 0 : case TM_Updated:
146 : /* XXX: Improve handling here */
147 0 : if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
148 0 : ereport(LOG,
149 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
150 : errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
151 : else
152 0 : ereport(LOG,
153 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
154 : errmsg("concurrent update, retrying")));
155 0 : refetch = true;
156 0 : break;
157 0 : case TM_Deleted:
158 : /* XXX: Improve handling here */
159 0 : ereport(LOG,
160 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
161 : errmsg("concurrent delete, retrying")));
162 0 : refetch = true;
163 0 : break;
164 0 : case TM_Invisible:
165 0 : elog(ERROR, "attempted to lock invisible tuple");
166 : break;
167 0 : default:
168 0 : elog(ERROR, "unexpected table_tuple_lock status: %u", res);
169 : break;
170 : }
171 :
172 144554 : return refetch;
173 : }
174 :
175 : /*
176 : * Search the relation 'rel' for tuple using the index.
177 : *
178 : * If a matching tuple is found, lock it with lockmode, fill the slot with its
179 : * contents, and return true. Return false otherwise.
180 : */
181 : bool
182 144206 : RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
183 : LockTupleMode lockmode,
184 : TupleTableSlot *searchslot,
185 : TupleTableSlot *outslot)
186 : {
187 : ScanKeyData skey[INDEX_MAX_KEYS];
188 : int skey_attoff;
189 : IndexScanDesc scan;
190 : SnapshotData snap;
191 : TransactionId xwait;
192 : Relation idxrel;
193 : bool found;
194 144206 : TypeCacheEntry **eq = NULL;
195 : bool isIdxSafeToSkipDuplicates;
196 :
197 : /* Open the index. */
198 144206 : idxrel = index_open(idxoid, RowExclusiveLock);
199 :
200 144206 : isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
201 :
202 144206 : InitDirtySnapshot(snap);
203 :
204 : /* Build scan key. */
205 144206 : skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
206 :
207 : /* Start an index scan. */
208 144206 : scan = index_beginscan(rel, idxrel, &snap, NULL, skey_attoff, 0);
209 :
210 0 : retry:
211 144206 : found = false;
212 :
213 144206 : index_rescan(scan, skey, skey_attoff, NULL, 0);
214 :
215 : /* Try to find the tuple */
216 144206 : while (index_getnext_slot(scan, ForwardScanDirection, outslot))
217 : {
218 : /*
219 : * Avoid expensive equality check if the index is primary key or
220 : * replica identity index.
221 : */
222 144178 : if (!isIdxSafeToSkipDuplicates)
223 : {
224 34 : if (eq == NULL)
225 34 : eq = palloc0_array(TypeCacheEntry *, outslot->tts_tupleDescriptor->natts);
226 :
227 34 : if (!tuples_equal(outslot, searchslot, eq, NULL))
228 0 : continue;
229 : }
230 :
231 144178 : ExecMaterializeSlot(outslot);
232 :
233 288356 : xwait = TransactionIdIsValid(snap.xmin) ?
234 144178 : snap.xmin : snap.xmax;
235 :
236 : /*
237 : * If the tuple is locked, wait for locking transaction to finish and
238 : * retry.
239 : */
240 144178 : if (TransactionIdIsValid(xwait))
241 : {
242 0 : XactLockTableWait(xwait, NULL, NULL, XLTW_None);
243 0 : goto retry;
244 : }
245 :
246 : /* Found our tuple and it's not locked */
247 144178 : found = true;
248 144178 : break;
249 : }
250 :
251 : /* Found tuple, try to lock it in the lockmode. */
252 144206 : if (found)
253 : {
254 : TM_FailureData tmfd;
255 : TM_Result res;
256 :
257 144178 : PushActiveSnapshot(GetLatestSnapshot());
258 :
259 144178 : res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
260 : outslot,
261 : GetCurrentCommandId(false),
262 : lockmode,
263 : LockWaitBlock,
264 : 0 /* don't follow updates */ ,
265 : &tmfd);
266 :
267 144178 : PopActiveSnapshot();
268 :
269 144178 : if (should_refetch_tuple(res, &tmfd))
270 0 : goto retry;
271 : }
272 :
273 144206 : index_endscan(scan);
274 :
275 : /* Don't release lock until commit. */
276 144206 : index_close(idxrel, NoLock);
277 :
278 144206 : return found;
279 : }
280 :
281 : /*
282 : * Compare the tuples in the slots by checking if they have equal values.
283 : *
284 : * If 'columns' is not null, only the columns specified within it will be
285 : * considered for the equality check, ignoring all other columns.
286 : */
287 : static bool
288 210648 : tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
289 : TypeCacheEntry **eq, Bitmapset *columns)
290 : {
291 : int attrnum;
292 :
293 : Assert(slot1->tts_tupleDescriptor->natts ==
294 : slot2->tts_tupleDescriptor->natts);
295 :
296 210648 : slot_getallattrs(slot1);
297 210648 : slot_getallattrs(slot2);
298 :
299 : /* Check equality of the attributes. */
300 211052 : for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
301 : {
302 : Form_pg_attribute att;
303 : TypeCacheEntry *typentry;
304 :
305 210724 : att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
306 :
307 : /*
308 : * Ignore dropped and generated columns as the publisher doesn't send
309 : * those
310 : */
311 210724 : if (att->attisdropped || att->attgenerated)
312 2 : continue;
313 :
314 : /*
315 : * Ignore columns that are not listed for checking.
316 : */
317 210722 : if (columns &&
318 0 : !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
319 : columns))
320 0 : continue;
321 :
322 : /*
323 : * If one value is NULL and other is not, then they are certainly not
324 : * equal
325 : */
326 210722 : if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
327 0 : return false;
328 :
329 : /*
330 : * If both are NULL, they can be considered equal.
331 : */
332 210722 : if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
333 2 : continue;
334 :
335 210720 : typentry = eq[attrnum];
336 210720 : if (typentry == NULL)
337 : {
338 404 : typentry = lookup_type_cache(att->atttypid,
339 : TYPECACHE_EQ_OPR_FINFO);
340 404 : if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
341 0 : ereport(ERROR,
342 : (errcode(ERRCODE_UNDEFINED_FUNCTION),
343 : errmsg("could not identify an equality operator for type %s",
344 : format_type_be(att->atttypid))));
345 404 : eq[attrnum] = typentry;
346 : }
347 :
348 210720 : if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
349 : att->attcollation,
350 210720 : slot1->tts_values[attrnum],
351 210720 : slot2->tts_values[attrnum])))
352 210320 : return false;
353 : }
354 :
355 328 : return true;
356 : }
357 :
358 : /*
359 : * Search the relation 'rel' for tuple using the sequential scan.
360 : *
361 : * If a matching tuple is found, lock it with lockmode, fill the slot with its
362 : * contents, and return true. Return false otherwise.
363 : *
364 : * Note that this stops on the first matching tuple.
365 : *
366 : * This can obviously be quite slow on tables that have more than few rows.
367 : */
368 : bool
369 298 : RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
370 : TupleTableSlot *searchslot, TupleTableSlot *outslot)
371 : {
372 : TupleTableSlot *scanslot;
373 : TableScanDesc scan;
374 : SnapshotData snap;
375 : TypeCacheEntry **eq;
376 : TransactionId xwait;
377 : bool found;
378 298 : TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
379 :
380 : Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
381 :
382 298 : eq = palloc0_array(TypeCacheEntry *, outslot->tts_tupleDescriptor->natts);
383 :
384 : /* Start a heap scan. */
385 298 : InitDirtySnapshot(snap);
386 298 : scan = table_beginscan(rel, &snap, 0, NULL);
387 298 : scanslot = table_slot_create(rel, NULL);
388 :
389 0 : retry:
390 298 : found = false;
391 :
392 298 : table_rescan(scan, NULL);
393 :
394 : /* Try to find the tuple */
395 210616 : while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
396 : {
397 210608 : if (!tuples_equal(scanslot, searchslot, eq, NULL))
398 210318 : continue;
399 :
400 290 : found = true;
401 290 : ExecCopySlot(outslot, scanslot);
402 :
403 580 : xwait = TransactionIdIsValid(snap.xmin) ?
404 290 : snap.xmin : snap.xmax;
405 :
406 : /*
407 : * If the tuple is locked, wait for locking transaction to finish and
408 : * retry.
409 : */
410 290 : if (TransactionIdIsValid(xwait))
411 : {
412 0 : XactLockTableWait(xwait, NULL, NULL, XLTW_None);
413 0 : goto retry;
414 : }
415 :
416 : /* Found our tuple and it's not locked */
417 290 : break;
418 : }
419 :
420 : /* Found tuple, try to lock it in the lockmode. */
421 298 : if (found)
422 : {
423 : TM_FailureData tmfd;
424 : TM_Result res;
425 :
426 290 : PushActiveSnapshot(GetLatestSnapshot());
427 :
428 290 : res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
429 : outslot,
430 : GetCurrentCommandId(false),
431 : lockmode,
432 : LockWaitBlock,
433 : 0 /* don't follow updates */ ,
434 : &tmfd);
435 :
436 290 : PopActiveSnapshot();
437 :
438 290 : if (should_refetch_tuple(res, &tmfd))
439 0 : goto retry;
440 : }
441 :
442 298 : table_endscan(scan);
443 298 : ExecDropSingleTupleTableSlot(scanslot);
444 :
445 298 : return found;
446 : }
447 :
448 : /*
449 : * Build additional index information necessary for conflict detection.
450 : */
451 : static void
452 92 : BuildConflictIndexInfo(ResultRelInfo *resultRelInfo, Oid conflictindex)
453 : {
454 268 : for (int i = 0; i < resultRelInfo->ri_NumIndices; i++)
455 : {
456 176 : Relation indexRelation = resultRelInfo->ri_IndexRelationDescs[i];
457 176 : IndexInfo *indexRelationInfo = resultRelInfo->ri_IndexRelationInfo[i];
458 :
459 176 : if (conflictindex != RelationGetRelid(indexRelation))
460 84 : continue;
461 :
462 : /*
463 : * This Assert will fail if BuildSpeculativeIndexInfo() is called
464 : * twice for the given index.
465 : */
466 : Assert(indexRelationInfo->ii_UniqueOps == NULL);
467 :
468 92 : BuildSpeculativeIndexInfo(indexRelation, indexRelationInfo);
469 : }
470 92 : }
471 :
472 : /*
473 : * If the tuple is recently dead and was deleted by a transaction with a newer
474 : * commit timestamp than previously recorded, update the associated transaction
475 : * ID, commit time, and origin. This helps ensure that conflict detection uses
476 : * the most recent and relevant deletion metadata.
477 : */
478 : static void
479 6 : update_most_recent_deletion_info(TupleTableSlot *scanslot,
480 : TransactionId oldestxmin,
481 : TransactionId *delete_xid,
482 : TimestampTz *delete_time,
483 : ReplOriginId *delete_origin)
484 : {
485 : BufferHeapTupleTableSlot *hslot;
486 : HeapTuple tuple;
487 : Buffer buf;
488 6 : bool recently_dead = false;
489 : TransactionId xmax;
490 : TimestampTz localts;
491 : ReplOriginId localorigin;
492 :
493 6 : hslot = (BufferHeapTupleTableSlot *) scanslot;
494 :
495 6 : tuple = ExecFetchSlotHeapTuple(scanslot, false, NULL);
496 6 : buf = hslot->buffer;
497 :
498 6 : LockBuffer(buf, BUFFER_LOCK_SHARE);
499 :
500 : /*
501 : * We do not consider HEAPTUPLE_DEAD status because it indicates either
502 : * tuples whose inserting transaction was aborted (meaning there is no
503 : * commit timestamp or origin), or tuples deleted by a transaction older
504 : * than oldestxmin, making it safe to ignore them during conflict
505 : * detection (See comments atop worker.c for details).
506 : */
507 6 : if (HeapTupleSatisfiesVacuum(tuple, oldestxmin, buf) == HEAPTUPLE_RECENTLY_DEAD)
508 6 : recently_dead = true;
509 :
510 6 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
511 :
512 6 : if (!recently_dead)
513 0 : return;
514 :
515 6 : xmax = HeapTupleHeaderGetUpdateXid(tuple->t_data);
516 6 : if (!TransactionIdIsValid(xmax))
517 0 : return;
518 :
519 : /* Select the dead tuple with the most recent commit timestamp */
520 12 : if (TransactionIdGetCommitTsData(xmax, &localts, &localorigin) &&
521 6 : TimestampDifferenceExceeds(*delete_time, localts, 0))
522 : {
523 6 : *delete_xid = xmax;
524 6 : *delete_time = localts;
525 6 : *delete_origin = localorigin;
526 : }
527 : }
528 :
529 : /*
530 : * Searches the relation 'rel' for the most recently deleted tuple that matches
531 : * the values in 'searchslot' and is not yet removable by VACUUM. The function
532 : * returns the transaction ID, origin, and commit timestamp of the transaction
533 : * that deleted this tuple.
534 : *
535 : * 'oldestxmin' acts as a cutoff transaction ID. Tuples deleted by transactions
536 : * with IDs >= 'oldestxmin' are considered recently dead and are eligible for
537 : * conflict detection.
538 : *
539 : * Instead of stopping at the first match, we scan all matching dead tuples to
540 : * identify most recent deletion. This is crucial because only the latest
541 : * deletion is relevant for resolving conflicts.
542 : *
543 : * For example, consider a scenario on the subscriber where a row is deleted,
544 : * re-inserted, and then deleted again only on the subscriber:
545 : *
546 : * - (pk, 1) - deleted at 9:00,
547 : * - (pk, 1) - deleted at 9:02,
548 : *
549 : * Now, a remote update arrives: (pk, 1) -> (pk, 2), timestamped at 9:01.
550 : *
551 : * If we mistakenly return the older deletion (9:00), the system may wrongly
552 : * apply the remote update using a last-update-wins strategy. Instead, we must
553 : * recognize the more recent deletion at 9:02 and skip the update. See
554 : * comments atop worker.c for details. Note, as of now, conflict resolution
555 : * is not implemented. Consequently, the system may incorrectly report the
556 : * older tuple as the conflicted one, leading to misleading results.
557 : *
558 : * The commit timestamp of the deleting transaction is used to determine which
559 : * tuple was deleted most recently.
560 : */
561 : bool
562 4 : RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot,
563 : TransactionId oldestxmin,
564 : TransactionId *delete_xid,
565 : ReplOriginId *delete_origin,
566 : TimestampTz *delete_time)
567 : {
568 : TupleTableSlot *scanslot;
569 : TableScanDesc scan;
570 : TypeCacheEntry **eq;
571 : Bitmapset *indexbitmap;
572 4 : TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
573 :
574 : Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
575 :
576 4 : *delete_xid = InvalidTransactionId;
577 4 : *delete_origin = InvalidReplOriginId;
578 4 : *delete_time = 0;
579 :
580 : /*
581 : * If the relation has a replica identity key or a primary key that is
582 : * unusable for locating deleted tuples (see
583 : * IsIndexUsableForFindingDeletedTuple), a full table scan becomes
584 : * necessary. In such cases, comparing the entire tuple is not required,
585 : * since the remote tuple might not include all column values. Instead,
586 : * the indexed columns alone are sufficient to identify the target tuple
587 : * (see logicalrep_rel_mark_updatable).
588 : */
589 4 : indexbitmap = RelationGetIndexAttrBitmap(rel,
590 : INDEX_ATTR_BITMAP_IDENTITY_KEY);
591 :
592 : /* fallback to PK if no replica identity */
593 4 : if (!indexbitmap)
594 4 : indexbitmap = RelationGetIndexAttrBitmap(rel,
595 : INDEX_ATTR_BITMAP_PRIMARY_KEY);
596 :
597 4 : eq = palloc0_array(TypeCacheEntry *, searchslot->tts_tupleDescriptor->natts);
598 :
599 : /*
600 : * Start a heap scan using SnapshotAny to identify dead tuples that are
601 : * not visible under a standard MVCC snapshot. Tuples from transactions
602 : * not yet committed or those just committed prior to the scan are
603 : * excluded in update_most_recent_deletion_info().
604 : */
605 4 : scan = table_beginscan(rel, SnapshotAny, 0, NULL);
606 4 : scanslot = table_slot_create(rel, NULL);
607 :
608 4 : table_rescan(scan, NULL);
609 :
610 : /* Try to find the tuple */
611 10 : while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
612 : {
613 6 : if (!tuples_equal(scanslot, searchslot, eq, indexbitmap))
614 2 : continue;
615 :
616 4 : update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid,
617 : delete_time, delete_origin);
618 : }
619 :
620 4 : table_endscan(scan);
621 4 : ExecDropSingleTupleTableSlot(scanslot);
622 :
623 4 : return *delete_time != 0;
624 : }
625 :
626 : /*
627 : * Similar to RelationFindDeletedTupleInfoSeq() but using index scan to locate
628 : * the deleted tuple.
629 : */
630 : bool
631 2 : RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid,
632 : TupleTableSlot *searchslot,
633 : TransactionId oldestxmin,
634 : TransactionId *delete_xid,
635 : ReplOriginId *delete_origin,
636 : TimestampTz *delete_time)
637 : {
638 : Relation idxrel;
639 : ScanKeyData skey[INDEX_MAX_KEYS];
640 : int skey_attoff;
641 : IndexScanDesc scan;
642 : TupleTableSlot *scanslot;
643 2 : TypeCacheEntry **eq = NULL;
644 : bool isIdxSafeToSkipDuplicates;
645 2 : TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
646 :
647 : Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
648 : Assert(OidIsValid(idxoid));
649 :
650 2 : *delete_xid = InvalidTransactionId;
651 2 : *delete_time = 0;
652 2 : *delete_origin = InvalidReplOriginId;
653 :
654 2 : isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
655 :
656 2 : scanslot = table_slot_create(rel, NULL);
657 :
658 2 : idxrel = index_open(idxoid, RowExclusiveLock);
659 :
660 : /* Build scan key. */
661 2 : skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
662 :
663 : /*
664 : * Start an index scan using SnapshotAny to identify dead tuples that are
665 : * not visible under a standard MVCC snapshot. Tuples from transactions
666 : * not yet committed or those just committed prior to the scan are
667 : * excluded in update_most_recent_deletion_info().
668 : */
669 2 : scan = index_beginscan(rel, idxrel, SnapshotAny, NULL, skey_attoff, 0);
670 :
671 2 : index_rescan(scan, skey, skey_attoff, NULL, 0);
672 :
673 : /* Try to find the tuple */
674 4 : while (index_getnext_slot(scan, ForwardScanDirection, scanslot))
675 : {
676 : /*
677 : * Avoid expensive equality check if the index is primary key or
678 : * replica identity index.
679 : */
680 2 : if (!isIdxSafeToSkipDuplicates)
681 : {
682 0 : if (eq == NULL)
683 0 : eq = palloc0_array(TypeCacheEntry *, scanslot->tts_tupleDescriptor->natts);
684 :
685 0 : if (!tuples_equal(scanslot, searchslot, eq, NULL))
686 0 : continue;
687 : }
688 :
689 2 : update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid,
690 : delete_time, delete_origin);
691 : }
692 :
693 2 : index_endscan(scan);
694 :
695 2 : index_close(idxrel, NoLock);
696 :
697 2 : ExecDropSingleTupleTableSlot(scanslot);
698 :
699 2 : return *delete_time != 0;
700 : }
701 :
702 : /*
703 : * Find the tuple that violates the passed unique index (conflictindex).
704 : *
705 : * If the conflicting tuple is found return true, otherwise false.
706 : *
707 : * We lock the tuple to avoid getting it deleted before the caller can fetch
708 : * the required information. Note that if the tuple is deleted before a lock
709 : * is acquired, we will retry to find the conflicting tuple again.
710 : */
711 : static bool
712 92 : FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate,
713 : Oid conflictindex, TupleTableSlot *slot,
714 : TupleTableSlot **conflictslot)
715 : {
716 92 : Relation rel = resultRelInfo->ri_RelationDesc;
717 : ItemPointerData conflictTid;
718 : TM_FailureData tmfd;
719 : TM_Result res;
720 :
721 92 : *conflictslot = NULL;
722 :
723 : /*
724 : * Build additional information required to check constraints violations.
725 : * See check_exclusion_or_unique_constraint().
726 : */
727 92 : BuildConflictIndexInfo(resultRelInfo, conflictindex);
728 :
729 92 : retry:
730 88 : if (ExecCheckIndexConstraints(resultRelInfo, slot, estate,
731 92 : &conflictTid, &slot->tts_tid,
732 92 : list_make1_oid(conflictindex)))
733 : {
734 2 : if (*conflictslot)
735 0 : ExecDropSingleTupleTableSlot(*conflictslot);
736 :
737 2 : *conflictslot = NULL;
738 2 : return false;
739 : }
740 :
741 86 : *conflictslot = table_slot_create(rel, NULL);
742 :
743 86 : PushActiveSnapshot(GetLatestSnapshot());
744 :
745 86 : res = table_tuple_lock(rel, &conflictTid, GetActiveSnapshot(),
746 : *conflictslot,
747 : GetCurrentCommandId(false),
748 : LockTupleShare,
749 : LockWaitBlock,
750 : 0 /* don't follow updates */ ,
751 : &tmfd);
752 :
753 86 : PopActiveSnapshot();
754 :
755 86 : if (should_refetch_tuple(res, &tmfd))
756 0 : goto retry;
757 :
758 86 : return true;
759 : }
760 :
761 : /*
762 : * Check all the unique indexes in 'recheckIndexes' for conflict with the
763 : * tuple in 'remoteslot' and report if found.
764 : */
765 : static void
766 54 : CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
767 : ConflictType type, List *recheckIndexes,
768 : TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
769 : {
770 54 : List *conflicttuples = NIL;
771 : TupleTableSlot *conflictslot;
772 :
773 : /* Check all the unique indexes for conflicts */
774 192 : foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
775 : {
776 180 : if (list_member_oid(recheckIndexes, uniqueidx) &&
777 92 : FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
778 : &conflictslot))
779 : {
780 86 : ConflictTupleInfo *conflicttuple = palloc0_object(ConflictTupleInfo);
781 :
782 86 : conflicttuple->slot = conflictslot;
783 86 : conflicttuple->indexoid = uniqueidx;
784 :
785 86 : GetTupleTransactionInfo(conflictslot, &conflicttuple->xmin,
786 : &conflicttuple->origin, &conflicttuple->ts);
787 :
788 86 : conflicttuples = lappend(conflicttuples, conflicttuple);
789 : }
790 : }
791 :
792 : /* Report the conflict, if found */
793 50 : if (conflicttuples)
794 48 : ReportApplyConflict(estate, resultRelInfo, ERROR,
795 48 : list_length(conflicttuples) > 1 ? CT_MULTIPLE_UNIQUE_CONFLICTS : type,
796 : searchslot, remoteslot, conflicttuples);
797 2 : }
798 :
799 : /*
800 : * Insert tuple represented in the slot to the relation, update the indexes,
801 : * and execute any constraints and per-row triggers.
802 : *
803 : * Caller is responsible for opening the indexes.
804 : */
805 : void
806 162518 : ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
807 : EState *estate, TupleTableSlot *slot)
808 : {
809 162518 : bool skip_tuple = false;
810 162518 : Relation rel = resultRelInfo->ri_RelationDesc;
811 :
812 : /* For now we support only tables. */
813 : Assert(rel->rd_rel->relkind == RELKIND_RELATION);
814 :
815 162518 : CheckCmdReplicaIdentity(rel, CMD_INSERT);
816 :
817 : /* BEFORE ROW INSERT Triggers */
818 162518 : if (resultRelInfo->ri_TrigDesc &&
819 40 : resultRelInfo->ri_TrigDesc->trig_insert_before_row)
820 : {
821 6 : if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
822 2 : skip_tuple = true; /* "do nothing" */
823 : }
824 :
825 162518 : if (!skip_tuple)
826 : {
827 162516 : List *recheckIndexes = NIL;
828 : List *conflictindexes;
829 162516 : bool conflict = false;
830 :
831 : /* Compute stored generated columns */
832 162516 : if (rel->rd_att->constr &&
833 90984 : rel->rd_att->constr->has_generated_stored)
834 8 : ExecComputeStoredGenerated(resultRelInfo, estate, slot,
835 : CMD_INSERT);
836 :
837 : /* Check the constraints of the tuple */
838 162516 : if (rel->rd_att->constr)
839 90984 : ExecConstraints(resultRelInfo, slot, estate);
840 162516 : if (rel->rd_rel->relispartition)
841 150 : ExecPartitionCheck(resultRelInfo, slot, estate, true);
842 :
843 : /* OK, store the tuple and create index entries for it */
844 162516 : simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
845 :
846 162516 : conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
847 :
848 162516 : if (resultRelInfo->ri_NumIndices > 0)
849 121838 : recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
850 : slot, estate, false,
851 : conflictindexes ? true : false,
852 : &conflict,
853 : conflictindexes, false);
854 :
855 : /*
856 : * Checks the conflict indexes to fetch the conflicting local row and
857 : * reports the conflict. We perform this check here, instead of
858 : * performing an additional index scan before the actual insertion and
859 : * reporting the conflict if any conflicting rows are found. This is
860 : * to avoid the overhead of executing the extra scan for each INSERT
861 : * operation, even when no conflict arises, which could introduce
862 : * significant overhead to replication, particularly in cases where
863 : * conflicts are rare.
864 : *
865 : * XXX OTOH, this could lead to clean-up effort for dead tuples added
866 : * in heap and index in case of conflicts. But as conflicts shouldn't
867 : * be a frequent thing so we preferred to save the performance
868 : * overhead of extra scan before each insertion.
869 : */
870 162516 : if (conflict)
871 50 : CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
872 : recheckIndexes, NULL, slot);
873 :
874 : /* AFTER ROW INSERT Triggers */
875 162468 : ExecARInsertTriggers(estate, resultRelInfo, slot,
876 : recheckIndexes, NULL);
877 :
878 : /*
879 : * XXX we should in theory pass a TransitionCaptureState object to the
880 : * above to capture transition tuples, but after statement triggers
881 : * don't actually get fired by replication yet anyway
882 : */
883 :
884 162468 : list_free(recheckIndexes);
885 : }
886 162470 : }
887 :
888 : /*
889 : * Find the searchslot tuple and update it with data in the slot,
890 : * update the indexes, and execute any constraints and per-row triggers.
891 : *
892 : * Caller is responsible for opening the indexes.
893 : */
894 : void
895 63844 : ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
896 : EState *estate, EPQState *epqstate,
897 : TupleTableSlot *searchslot, TupleTableSlot *slot)
898 : {
899 63844 : bool skip_tuple = false;
900 63844 : Relation rel = resultRelInfo->ri_RelationDesc;
901 63844 : ItemPointer tid = &(searchslot->tts_tid);
902 :
903 : /*
904 : * We support only non-system tables, with
905 : * check_publication_add_relation() accountable.
906 : */
907 : Assert(rel->rd_rel->relkind == RELKIND_RELATION);
908 : Assert(!IsCatalogRelation(rel));
909 :
910 63844 : CheckCmdReplicaIdentity(rel, CMD_UPDATE);
911 :
912 : /* BEFORE ROW UPDATE Triggers */
913 63844 : if (resultRelInfo->ri_TrigDesc &&
914 20 : resultRelInfo->ri_TrigDesc->trig_update_before_row)
915 : {
916 6 : if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
917 : tid, NULL, slot, NULL, NULL, false))
918 4 : skip_tuple = true; /* "do nothing" */
919 : }
920 :
921 63844 : if (!skip_tuple)
922 : {
923 63840 : List *recheckIndexes = NIL;
924 : TU_UpdateIndexes update_indexes;
925 : List *conflictindexes;
926 63840 : bool conflict = false;
927 :
928 : /* Compute stored generated columns */
929 63840 : if (rel->rd_att->constr &&
930 63752 : rel->rd_att->constr->has_generated_stored)
931 4 : ExecComputeStoredGenerated(resultRelInfo, estate, slot,
932 : CMD_UPDATE);
933 :
934 : /* Check the constraints of the tuple */
935 63840 : if (rel->rd_att->constr)
936 63752 : ExecConstraints(resultRelInfo, slot, estate);
937 63840 : if (rel->rd_rel->relispartition)
938 24 : ExecPartitionCheck(resultRelInfo, slot, estate, true);
939 :
940 63840 : simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
941 : &update_indexes);
942 :
943 63840 : conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
944 :
945 63840 : if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
946 40358 : recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
947 : slot, estate, true,
948 : conflictindexes ? true : false,
949 : &conflict, conflictindexes,
950 : (update_indexes == TU_Summarizing));
951 :
952 : /*
953 : * Refer to the comments above the call to CheckAndReportConflict() in
954 : * ExecSimpleRelationInsert to understand why this check is done at
955 : * this point.
956 : */
957 63840 : if (conflict)
958 4 : CheckAndReportConflict(resultRelInfo, estate, CT_UPDATE_EXISTS,
959 : recheckIndexes, searchslot, slot);
960 :
961 : /* AFTER ROW UPDATE Triggers */
962 63836 : ExecARUpdateTriggers(estate, resultRelInfo,
963 : NULL, NULL,
964 : tid, NULL, slot,
965 : recheckIndexes, NULL, false);
966 :
967 63836 : list_free(recheckIndexes);
968 : }
969 63840 : }
970 :
971 : /*
972 : * Find the searchslot tuple and delete it, and execute any constraints
973 : * and per-row triggers.
974 : *
975 : * Caller is responsible for opening the indexes.
976 : */
977 : void
978 80624 : ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
979 : EState *estate, EPQState *epqstate,
980 : TupleTableSlot *searchslot)
981 : {
982 80624 : bool skip_tuple = false;
983 80624 : Relation rel = resultRelInfo->ri_RelationDesc;
984 80624 : ItemPointer tid = &searchslot->tts_tid;
985 :
986 80624 : CheckCmdReplicaIdentity(rel, CMD_DELETE);
987 :
988 : /* BEFORE ROW DELETE Triggers */
989 80624 : if (resultRelInfo->ri_TrigDesc &&
990 20 : resultRelInfo->ri_TrigDesc->trig_delete_before_row)
991 : {
992 0 : skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
993 0 : tid, NULL, NULL, NULL, NULL, false);
994 : }
995 :
996 80624 : if (!skip_tuple)
997 : {
998 : /* OK, delete the tuple */
999 80624 : simple_table_tuple_delete(rel, tid, estate->es_snapshot);
1000 :
1001 : /* AFTER ROW DELETE Triggers */
1002 80624 : ExecARDeleteTriggers(estate, resultRelInfo,
1003 : tid, NULL, NULL, false);
1004 : }
1005 80624 : }
1006 :
1007 : /*
1008 : * Check if command can be executed with current replica identity.
1009 : */
1010 : void
1011 439918 : CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
1012 : {
1013 : PublicationDesc pubdesc;
1014 :
1015 : /*
1016 : * Skip checking the replica identity for partitioned tables, because the
1017 : * operations are actually performed on the leaf partitions.
1018 : */
1019 439918 : if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1020 415434 : return;
1021 :
1022 : /* We only need to do checks for UPDATE and DELETE. */
1023 435084 : if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
1024 258448 : return;
1025 :
1026 : /*
1027 : * It is only safe to execute UPDATE/DELETE if the relation does not
1028 : * publish UPDATEs or DELETEs, or all the following conditions are
1029 : * satisfied:
1030 : *
1031 : * 1. All columns, referenced in the row filters from publications which
1032 : * the relation is in, are valid - i.e. when all referenced columns are
1033 : * part of REPLICA IDENTITY.
1034 : *
1035 : * 2. All columns, referenced in the column lists are valid - i.e. when
1036 : * all columns referenced in the REPLICA IDENTITY are covered by the
1037 : * column list.
1038 : *
1039 : * 3. All generated columns in REPLICA IDENTITY of the relation, are valid
1040 : * - i.e. when all these generated columns are published.
1041 : *
1042 : * XXX We could optimize it by first checking whether any of the
1043 : * publications have a row filter or column list for this relation, or if
1044 : * the relation contains a generated column. If none of these exist and
1045 : * the relation has replica identity then we can avoid building the
1046 : * descriptor but as this happens only one time it doesn't seem worth the
1047 : * additional complexity.
1048 : */
1049 176636 : RelationBuildPublicationDesc(rel, &pubdesc);
1050 176636 : if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
1051 60 : ereport(ERROR,
1052 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1053 : errmsg("cannot update table \"%s\"",
1054 : RelationGetRelationName(rel)),
1055 : errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
1056 176576 : else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
1057 108 : ereport(ERROR,
1058 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1059 : errmsg("cannot update table \"%s\"",
1060 : RelationGetRelationName(rel)),
1061 : errdetail("Column list used by the publication does not cover the replica identity.")));
1062 176468 : else if (cmd == CMD_UPDATE && !pubdesc.gencols_valid_for_update)
1063 24 : ereport(ERROR,
1064 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1065 : errmsg("cannot update table \"%s\"",
1066 : RelationGetRelationName(rel)),
1067 : errdetail("Replica identity must not contain unpublished generated columns.")));
1068 176444 : else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
1069 0 : ereport(ERROR,
1070 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1071 : errmsg("cannot delete from table \"%s\"",
1072 : RelationGetRelationName(rel)),
1073 : errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
1074 176444 : else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
1075 0 : ereport(ERROR,
1076 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1077 : errmsg("cannot delete from table \"%s\"",
1078 : RelationGetRelationName(rel)),
1079 : errdetail("Column list used by the publication does not cover the replica identity.")));
1080 176444 : else if (cmd == CMD_DELETE && !pubdesc.gencols_valid_for_delete)
1081 0 : ereport(ERROR,
1082 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1083 : errmsg("cannot delete from table \"%s\"",
1084 : RelationGetRelationName(rel)),
1085 : errdetail("Replica identity must not contain unpublished generated columns.")));
1086 :
1087 : /* If relation has replica identity we are always good. */
1088 176444 : if (OidIsValid(RelationGetReplicaIndex(rel)))
1089 151694 : return;
1090 :
1091 : /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
1092 24750 : if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
1093 458 : return;
1094 :
1095 : /*
1096 : * This is UPDATE/DELETE and there is no replica identity.
1097 : *
1098 : * Check if the table publishes UPDATES or DELETES.
1099 : */
1100 24292 : if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
1101 124 : ereport(ERROR,
1102 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1103 : errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
1104 : RelationGetRelationName(rel)),
1105 : errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
1106 24168 : else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
1107 16 : ereport(ERROR,
1108 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1109 : errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
1110 : RelationGetRelationName(rel)),
1111 : errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
1112 : }
1113 :
1114 :
1115 : /*
1116 : * Check if we support writing into specific relkind of local relation and check
1117 : * if it aligns with the relkind of the relation on the publisher.
1118 : *
1119 : * The nspname and relname are only needed for error reporting.
1120 : */
1121 : void
1122 1866 : CheckSubscriptionRelkind(char localrelkind, char remoterelkind,
1123 : const char *nspname, const char *relname)
1124 : {
1125 1866 : if (localrelkind != RELKIND_RELATION &&
1126 34 : localrelkind != RELKIND_PARTITIONED_TABLE &&
1127 : localrelkind != RELKIND_SEQUENCE)
1128 0 : ereport(ERROR,
1129 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1130 : errmsg("cannot use relation \"%s.%s\" as logical replication target",
1131 : nspname, relname),
1132 : errdetail_relkind_not_supported(localrelkind)));
1133 :
1134 : /*
1135 : * Allow RELKIND_RELATION and RELKIND_PARTITIONED_TABLE to be treated
1136 : * interchangeably, but ensure that sequences (RELKIND_SEQUENCE) match
1137 : * exactly on both publisher and subscriber.
1138 : */
1139 1866 : if ((localrelkind == RELKIND_SEQUENCE && remoterelkind != RELKIND_SEQUENCE) ||
1140 1832 : (localrelkind != RELKIND_SEQUENCE && remoterelkind == RELKIND_SEQUENCE))
1141 0 : ereport(ERROR,
1142 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1143 : /* translator: 3rd and 4th %s are "sequence" or "table" */
1144 : errmsg("relation \"%s.%s\" type mismatch: source \"%s\", target \"%s\"",
1145 : nspname, relname,
1146 : remoterelkind == RELKIND_SEQUENCE ? "sequence" : "table",
1147 : localrelkind == RELKIND_SEQUENCE ? "sequence" : "table"));
1148 1866 : }
|