Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * execReplication.c
4 : * miscellaneous executor routines for logical replication
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/executor/execReplication.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/commit_ts.h"
18 : #include "access/genam.h"
19 : #include "access/gist.h"
20 : #include "access/relscan.h"
21 : #include "access/tableam.h"
22 : #include "access/transam.h"
23 : #include "access/xact.h"
24 : #include "access/heapam.h"
25 : #include "catalog/pg_am_d.h"
26 : #include "commands/trigger.h"
27 : #include "executor/executor.h"
28 : #include "executor/nodeModifyTable.h"
29 : #include "replication/conflict.h"
30 : #include "replication/logicalrelation.h"
31 : #include "storage/lmgr.h"
32 : #include "utils/builtins.h"
33 : #include "utils/lsyscache.h"
34 : #include "utils/rel.h"
35 : #include "utils/snapmgr.h"
36 : #include "utils/syscache.h"
37 : #include "utils/typcache.h"
38 :
39 :
40 : static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
41 : TypeCacheEntry **eq, Bitmapset *columns);
42 :
43 : /*
44 : * Set up a ScanKey for a search in the relation 'rel' for the tuple in
45 : * 'searchslot', which is laid out to match 'rel' (*NOT* idxrel!).
46 : *
47 : * Returns how many columns to use for the index scan.
48 : *
49 : * This is not a generic routine; idxrel must be the primary key, the replica
50 : * identity index, or an index that can be used for a REPLICA IDENTITY FULL
51 : * table. See FindUsableIndexForReplicaIdentityFull() for details.
52 : *
53 : * By definition, the replica identity of a relation meets these limitations;
54 : * note that any other index could also meet them.
55 : */
56 : static int
57 144210 : build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
58 : TupleTableSlot *searchslot)
59 : {
60 : int index_attoff;
61 144210 : int skey_attoff = 0;
62 : Datum indclassDatum;
63 : oidvector *opclass;
64 144210 : int2vector *indkey = &idxrel->rd_index->indkey;
65 :
66 144210 : indclassDatum = SysCacheGetAttrNotNull(INDEXRELID, idxrel->rd_indextuple,
67 : Anum_pg_index_indclass);
68 144210 : opclass = (oidvector *) DatumGetPointer(indclassDatum);
69 :
70 : /* Build scankey for every non-expression attribute in the index. */
71 288466 : for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
72 144256 : index_attoff++)
73 : {
74 : Oid operator;
75 : Oid optype;
76 : Oid opfamily;
77 : RegProcedure regop;
78 144256 : int table_attno = indkey->values[index_attoff];
79 : StrategyNumber eq_strategy;
80 :
81 144256 : if (!AttributeNumberIsValid(table_attno))
82 : {
83 : /*
84 : * XXX: Currently, we don't support expressions in the scan key,
85 : * see code below.
86 : */
87 4 : continue;
88 : }
89 :
90 : /*
91 : * Load the operator info. We need this to get the equality operator
92 : * function for the scan key.
93 : */
94 144252 : optype = get_opclass_input_type(opclass->values[index_attoff]);
95 144252 : opfamily = get_opclass_family(opclass->values[index_attoff]);
96 144252 : eq_strategy = IndexAmTranslateCompareType(COMPARE_EQ, idxrel->rd_rel->relam, opfamily, false);
97 144252 : operator = get_opfamily_member(opfamily, optype,
98 : optype,
99 : eq_strategy);
100 :
101 144252 : if (!OidIsValid(operator))
102 0 : elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
103 : eq_strategy, optype, optype, opfamily);
104 :
105 144252 : regop = get_opcode(operator);
106 :
107 : /* Initialize the scankey. */
108 144252 : ScanKeyInit(&skey[skey_attoff],
109 144252 : index_attoff + 1,
110 : eq_strategy,
111 : regop,
112 144252 : searchslot->tts_values[table_attno - 1]);
113 :
114 144252 : skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
115 :
116 : /* Check for null value. */
117 144252 : if (searchslot->tts_isnull[table_attno - 1])
118 2 : skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
119 :
120 144252 : skey_attoff++;
121 : }
122 :
123 : /* There must always be at least one attribute for the index scan. */
124 : Assert(skey_attoff > 0);
125 :
126 144210 : return skey_attoff;
127 : }
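
/*
 * Illustrative sketch (editor's addition, not part of execReplication.c):
 * what a single hand-written equality ScanKey looks like for an int4 btree
 * key column.  build_replindex_scan_key() above derives the strategy number
 * and comparison procedure from the index opclass rather than hard-coding
 * them as done here.  The function name and value are hypothetical; the
 * extra headers provide BTEqualStrategyNumber and F_INT4EQ.
 */
#include "access/stratnum.h"
#include "utils/fmgroids.h"

static void
example_single_int4_eq_key(ScanKey skey, int32 value)
{
	ScanKeyInit(&skey[0],
				1,						/* first index key column */
				BTEqualStrategyNumber,	/* btree equality strategy */
				F_INT4EQ,				/* regproc of int4eq() */
				Int32GetDatum(value));	/* value to search for */
}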
128 :
129 :
130 : /*
131 : * Helper function to check if it is necessary to re-fetch and lock the tuple
132 : * due to concurrent modifications. This function should be called after
133 : * invoking table_tuple_lock.
134 : */
135 : static bool
136 144528 : should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
137 : {
138 144528 : bool refetch = false;
139 :
140 144528 : switch (res)
141 : {
142 144528 : case TM_Ok:
143 144528 : break;
144 0 : case TM_Updated:
145 : /* XXX: Improve handling here */
146 0 : if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
147 0 : ereport(LOG,
148 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
149 : errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
150 : else
151 0 : ereport(LOG,
152 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
153 : errmsg("concurrent update, retrying")));
154 0 : refetch = true;
155 0 : break;
156 0 : case TM_Deleted:
157 : /* XXX: Improve handling here */
158 0 : ereport(LOG,
159 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
160 : errmsg("concurrent delete, retrying")));
161 0 : refetch = true;
162 0 : break;
163 0 : case TM_Invisible:
164 0 : elog(ERROR, "attempted to lock invisible tuple");
165 : break;
166 0 : default:
167 0 : elog(ERROR, "unexpected table_tuple_lock status: %u", res);
168 : break;
169 : }
170 :
171 144528 : return refetch;
172 : }
173 :
174 : /*
175 : * Search the relation 'rel' for a tuple using the index 'idxoid'.
176 : *
177 : * If a matching tuple is found, lock it with lockmode, fill the slot with its
178 : * contents, and return true. Return false otherwise.
179 : */
180 : bool
181 144208 : RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
182 : LockTupleMode lockmode,
183 : TupleTableSlot *searchslot,
184 : TupleTableSlot *outslot)
185 : {
186 : ScanKeyData skey[INDEX_MAX_KEYS];
187 : int skey_attoff;
188 : IndexScanDesc scan;
189 : SnapshotData snap;
190 : TransactionId xwait;
191 : Relation idxrel;
192 : bool found;
193 144208 : TypeCacheEntry **eq = NULL;
194 : bool isIdxSafeToSkipDuplicates;
195 :
196 : /* Open the index. */
197 144208 : idxrel = index_open(idxoid, RowExclusiveLock);
198 :
199 144208 : isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
200 :
201 144208 : InitDirtySnapshot(snap);
202 :
203 : /* Build scan key. */
204 144208 : skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
205 :
206 : /* Start an index scan. */
207 144208 : scan = index_beginscan(rel, idxrel, &snap, NULL, skey_attoff, 0);
208 :
209 0 : retry:
210 144208 : found = false;
211 :
212 144208 : index_rescan(scan, skey, skey_attoff, NULL, 0);
213 :
214 : /* Try to find the tuple */
215 144208 : while (index_getnext_slot(scan, ForwardScanDirection, outslot))
216 : {
217 : /*
218 : * Avoid expensive equality check if the index is primary key or
219 : * replica identity index.
220 : */
221 144176 : if (!isIdxSafeToSkipDuplicates)
222 : {
223 34 : if (eq == NULL)
224 34 : eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
225 :
226 34 : if (!tuples_equal(outslot, searchslot, eq, NULL))
227 0 : continue;
228 : }
229 :
230 144176 : ExecMaterializeSlot(outslot);
231 :
232 288352 : xwait = TransactionIdIsValid(snap.xmin) ?
233 144176 : snap.xmin : snap.xmax;
234 :
235 : /*
236 : * If the tuple is locked, wait for locking transaction to finish and
237 : * retry.
238 : */
239 144176 : if (TransactionIdIsValid(xwait))
240 : {
241 0 : XactLockTableWait(xwait, NULL, NULL, XLTW_None);
242 0 : goto retry;
243 : }
244 :
245 : /* Found our tuple and it's not locked */
246 144176 : found = true;
247 144176 : break;
248 : }
249 :
250 : /* Found tuple, try to lock it in the lockmode. */
251 144208 : if (found)
252 : {
253 : TM_FailureData tmfd;
254 : TM_Result res;
255 :
256 144176 : PushActiveSnapshot(GetLatestSnapshot());
257 :
258 144176 : res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
259 : outslot,
260 : GetCurrentCommandId(false),
261 : lockmode,
262 : LockWaitBlock,
263 : 0 /* don't follow updates */ ,
264 : &tmfd);
265 :
266 144176 : PopActiveSnapshot();
267 :
268 144176 : if (should_refetch_tuple(res, &tmfd))
269 0 : goto retry;
270 : }
271 :
272 144208 : index_endscan(scan);
273 :
274 : /* Don't release lock until commit. */
275 144208 : index_close(idxrel, NoLock);
276 :
277 144208 : return found;
278 : }
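
/*
 * Illustrative sketch (editor's addition, not part of execReplication.c):
 * a minimal hypothetical caller, similar in spirit to the logical
 * replication apply worker, that locates the local tuple matching a remote
 * one before applying an UPDATE or DELETE.  All names are hypothetical and
 * the relation and slots are assumed to be set up by the caller.
 */
static bool
example_locate_local_tuple(Relation localrel, TupleTableSlot *remoteslot,
						   TupleTableSlot *localslot)
{
	Oid			idxoid = GetRelationIdentityOrPK(localrel);

	/* Without a usable index, fall back to a full sequential scan. */
	if (!OidIsValid(idxoid))
		return RelationFindReplTupleSeq(localrel, LockTupleExclusive,
										remoteslot, localslot);

	/* LockTupleExclusive because the caller intends to modify the tuple. */
	return RelationFindReplTupleByIndex(localrel, idxoid, LockTupleExclusive,
										remoteslot, localslot);
}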
279 :
280 : /*
281 : * Compare the tuples in the slots by checking if they have equal values.
282 : *
283 : * If 'columns' is not null, only the columns specified within it will be
284 : * considered for the equality check, ignoring all other columns.
285 : */
286 : static bool
287 210646 : tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
288 : TypeCacheEntry **eq, Bitmapset *columns)
289 : {
290 : int attrnum;
291 :
292 : Assert(slot1->tts_tupleDescriptor->natts ==
293 : slot2->tts_tupleDescriptor->natts);
294 :
295 210646 : slot_getallattrs(slot1);
296 210646 : slot_getallattrs(slot2);
297 :
298 : /* Check equality of the attributes. */
299 211046 : for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
300 : {
301 : Form_pg_attribute att;
302 : TypeCacheEntry *typentry;
303 :
304 210718 : att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
305 :
306 : /*
307 : * Ignore dropped and generated columns as the publisher doesn't send
308 : * those
309 : */
310 210718 : if (att->attisdropped || att->attgenerated)
311 2 : continue;
312 :
313 : /*
314 : * Ignore columns that are not listed for checking.
315 : */
316 210716 : if (columns &&
317 0 : !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
318 : columns))
319 0 : continue;
320 :
321 : /*
322 : * If one value is NULL and the other is not, then they are certainly
323 : * not equal.
324 : */
325 210716 : if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
326 0 : return false;
327 :
328 : /*
329 : * If both are NULL, they can be considered equal.
330 : */
331 210716 : if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
332 2 : continue;
333 :
334 210714 : typentry = eq[attrnum];
335 210714 : if (typentry == NULL)
336 : {
337 400 : typentry = lookup_type_cache(att->atttypid,
338 : TYPECACHE_EQ_OPR_FINFO);
339 400 : if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
340 0 : ereport(ERROR,
341 : (errcode(ERRCODE_UNDEFINED_FUNCTION),
342 : errmsg("could not identify an equality operator for type %s",
343 : format_type_be(att->atttypid))));
344 400 : eq[attrnum] = typentry;
345 : }
346 :
347 210714 : if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
348 : att->attcollation,
349 210714 : slot1->tts_values[attrnum],
350 210714 : slot2->tts_values[attrnum])))
351 210318 : return false;
352 : }
353 :
354 328 : return true;
355 : }
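
/*
 * Illustrative sketch (editor's addition, not part of execReplication.c):
 * the per-column mechanism tuples_equal() relies on, reduced to comparing
 * two standalone Datums of a given type and collation.  The function name
 * and parameters are hypothetical.
 */
static bool
example_datums_equal(Oid typid, Oid collation, Datum a, Datum b)
{
	TypeCacheEntry *typentry;

	/* Look up (and cache) the type's default equality operator function. */
	typentry = lookup_type_cache(typid, TYPECACHE_EQ_OPR_FINFO);
	if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_FUNCTION),
				 errmsg("could not identify an equality operator for type %s",
						format_type_be(typid))));

	return DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
										  collation, a, b));
}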
356 :
357 : /*
358 : * Search the relation 'rel' for a tuple using a sequential scan.
359 : *
360 : * If a matching tuple is found, lock it with lockmode, fill the slot with its
361 : * contents, and return true. Return false otherwise.
362 : *
363 : * Note that this stops on the first matching tuple.
364 : *
365 : * This can obviously be quite slow on tables that have more than a few rows.
366 : */
367 : bool
368 298 : RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
369 : TupleTableSlot *searchslot, TupleTableSlot *outslot)
370 : {
371 : TupleTableSlot *scanslot;
372 : TableScanDesc scan;
373 : SnapshotData snap;
374 : TypeCacheEntry **eq;
375 : TransactionId xwait;
376 : bool found;
377 298 : TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
378 :
379 : Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
380 :
381 298 : eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
382 :
383 : /* Start a heap scan. */
384 298 : InitDirtySnapshot(snap);
385 298 : scan = table_beginscan(rel, &snap, 0, NULL);
386 298 : scanslot = table_slot_create(rel, NULL);
387 :
388 0 : retry:
389 298 : found = false;
390 :
391 298 : table_rescan(scan, NULL);
392 :
393 : /* Try to find the tuple */
394 210616 : while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
395 : {
396 210612 : if (!tuples_equal(scanslot, searchslot, eq, NULL))
397 210318 : continue;
398 :
399 294 : found = true;
400 294 : ExecCopySlot(outslot, scanslot);
401 :
402 588 : xwait = TransactionIdIsValid(snap.xmin) ?
403 294 : snap.xmin : snap.xmax;
404 :
405 : /*
406 : * If the tuple is locked, wait for locking transaction to finish and
407 : * retry.
408 : */
409 294 : if (TransactionIdIsValid(xwait))
410 : {
411 0 : XactLockTableWait(xwait, NULL, NULL, XLTW_None);
412 0 : goto retry;
413 : }
414 :
415 : /* Found our tuple and it's not locked */
416 294 : break;
417 : }
418 :
419 : /* Found tuple, try to lock it in the lockmode. */
420 298 : if (found)
421 : {
422 : TM_FailureData tmfd;
423 : TM_Result res;
424 :
425 294 : PushActiveSnapshot(GetLatestSnapshot());
426 :
427 294 : res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
428 : outslot,
429 : GetCurrentCommandId(false),
430 : lockmode,
431 : LockWaitBlock,
432 : 0 /* don't follow updates */ ,
433 : &tmfd);
434 :
435 294 : PopActiveSnapshot();
436 :
437 294 : if (should_refetch_tuple(res, &tmfd))
438 0 : goto retry;
439 : }
440 :
441 298 : table_endscan(scan);
442 298 : ExecDropSingleTupleTableSlot(scanslot);
443 :
444 298 : return found;
445 : }
446 :
447 : /*
448 : * Build additional index information necessary for conflict detection.
449 : */
450 : static void
451 62 : BuildConflictIndexInfo(ResultRelInfo *resultRelInfo, Oid conflictindex)
452 : {
453 188 : for (int i = 0; i < resultRelInfo->ri_NumIndices; i++)
454 : {
455 126 : Relation indexRelation = resultRelInfo->ri_IndexRelationDescs[i];
456 126 : IndexInfo *indexRelationInfo = resultRelInfo->ri_IndexRelationInfo[i];
457 :
458 126 : if (conflictindex != RelationGetRelid(indexRelation))
459 64 : continue;
460 :
461 : /*
462 : * This Assert will fail if BuildSpeculativeIndexInfo() is called
463 : * twice for the given index.
464 : */
465 : Assert(indexRelationInfo->ii_UniqueOps == NULL);
466 :
467 62 : BuildSpeculativeIndexInfo(indexRelation, indexRelationInfo);
468 : }
469 62 : }
470 :
471 : /*
472 : * If the tuple is recently dead and was deleted by a transaction with a newer
473 : * commit timestamp than previously recorded, update the associated transaction
474 : * ID, commit time, and origin. This helps ensure that conflict detection uses
475 : * the most recent and relevant deletion metadata.
476 : */
477 : static void
478 2 : update_most_recent_deletion_info(TupleTableSlot *scanslot,
479 : TransactionId oldestxmin,
480 : TransactionId *delete_xid,
481 : TimestampTz *delete_time,
482 : RepOriginId *delete_origin)
483 : {
484 : BufferHeapTupleTableSlot *hslot;
485 : HeapTuple tuple;
486 : Buffer buf;
487 2 : bool recently_dead = false;
488 : TransactionId xmax;
489 : TimestampTz localts;
490 : RepOriginId localorigin;
491 :
492 2 : hslot = (BufferHeapTupleTableSlot *) scanslot;
493 :
494 2 : tuple = ExecFetchSlotHeapTuple(scanslot, false, NULL);
495 2 : buf = hslot->buffer;
496 :
497 2 : LockBuffer(buf, BUFFER_LOCK_SHARE);
498 :
499 : /*
500 : * We do not consider HEAPTUPLE_DEAD status because it indicates either
501 : * tuples whose inserting transaction was aborted (meaning there is no
502 : * commit timestamp or origin), or tuples deleted by a transaction older
503 : * than oldestxmin, making it safe to ignore them during conflict
504 : * detection (See comments atop worker.c for details).
505 : */
506 2 : if (HeapTupleSatisfiesVacuum(tuple, oldestxmin, buf) == HEAPTUPLE_RECENTLY_DEAD)
507 2 : recently_dead = true;
508 :
509 2 : LockBuffer(buf, BUFFER_LOCK_UNLOCK);
510 :
511 2 : if (!recently_dead)
512 0 : return;
513 :
514 2 : xmax = HeapTupleHeaderGetUpdateXid(tuple->t_data);
515 2 : if (!TransactionIdIsValid(xmax))
516 0 : return;
517 :
518 : /* Select the dead tuple with the most recent commit timestamp */
519 4 : if (TransactionIdGetCommitTsData(xmax, &localts, &localorigin) &&
520 2 : TimestampDifferenceExceeds(*delete_time, localts, 0))
521 : {
522 2 : *delete_xid = xmax;
523 2 : *delete_time = localts;
524 2 : *delete_origin = localorigin;
525 : }
526 : }
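
/*
 * Illustrative sketch (editor's addition, not part of execReplication.c):
 * the core lookup performed by update_most_recent_deletion_info() -- fetch
 * the commit timestamp and origin of the transaction that deleted a tuple.
 * It only works when track_commit_timestamp is enabled.  'xmax' is assumed
 * to be the valid update/delete XID of a dead tuple, and the function name
 * is hypothetical; utils/timestamp.h supplies timestamptz_to_str().
 */
#include "utils/timestamp.h"

static void
example_report_delete_metadata(TransactionId xmax)
{
	TimestampTz ts;
	RepOriginId origin;

	if (TransactionIdGetCommitTsData(xmax, &ts, &origin))
		elog(LOG, "tuple deleted by xid %u at %s (origin %u)",
			 xmax, timestamptz_to_str(ts), (unsigned int) origin);
}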
527 :
528 : /*
529 : * Searches the relation 'rel' for the most recently deleted tuple that matches
530 : * the values in 'searchslot' and is not yet removable by VACUUM. The function
531 : * returns the transaction ID, origin, and commit timestamp of the transaction
532 : * that deleted this tuple.
533 : *
534 : * 'oldestxmin' acts as a cutoff transaction ID. Tuples deleted by transactions
535 : * with IDs >= 'oldestxmin' are considered recently dead and are eligible for
536 : * conflict detection.
537 : *
538 : * Instead of stopping at the first match, we scan all matching dead tuples to
539 : * identify the most recent deletion. This is crucial because only the latest
540 : * deletion is relevant for resolving conflicts.
541 : *
542 : * For example, consider a scenario on the subscriber where a row is deleted,
543 : * re-inserted, and then deleted again only on the subscriber:
544 : *
545 : * - (pk, 1) - deleted at 9:00,
546 : * - (pk, 1) - deleted at 9:02,
547 : *
548 : * Now, a remote update arrives: (pk, 1) -> (pk, 2), timestamped at 9:01.
549 : *
550 : * If we mistakenly return the older deletion (9:00), the system may wrongly
551 : * apply the remote update using a last-update-wins strategy. Instead, we must
552 : * recognize the more recent deletion at 9:02 and skip the update. See
553 : * comments atop worker.c for details. Note, as of now, conflict resolution
554 : * is not implemented. Consequently, the system may incorrectly report the
555 : * older tuple as the conflicted one, leading to misleading results.
556 : *
557 : * The commit timestamp of the deleting transaction is used to determine which
558 : * tuple was deleted most recently.
559 : */
560 : bool
561 0 : RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot,
562 : TransactionId oldestxmin,
563 : TransactionId *delete_xid,
564 : RepOriginId *delete_origin,
565 : TimestampTz *delete_time)
566 : {
567 : TupleTableSlot *scanslot;
568 : TableScanDesc scan;
569 : TypeCacheEntry **eq;
570 : Bitmapset *indexbitmap;
571 0 : TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
572 :
573 : Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
574 :
575 0 : *delete_xid = InvalidTransactionId;
576 0 : *delete_origin = InvalidRepOriginId;
577 0 : *delete_time = 0;
578 :
579 : /*
580 : * If the relation has a replica identity key or a primary key that is
581 : * unusable for locating deleted tuples (see
582 : * IsIndexUsableForFindingDeletedTuple), a full table scan becomes
583 : * necessary. In such cases, comparing the entire tuple is not required,
584 : * since the remote tuple might not include all column values. Instead,
585 : * the indexed columns alone are sufficient to identify the target tuple
586 : * (see logicalrep_rel_mark_updatable).
587 : */
588 0 : indexbitmap = RelationGetIndexAttrBitmap(rel,
589 : INDEX_ATTR_BITMAP_IDENTITY_KEY);
590 :
591 : /* fallback to PK if no replica identity */
592 0 : if (!indexbitmap)
593 0 : indexbitmap = RelationGetIndexAttrBitmap(rel,
594 : INDEX_ATTR_BITMAP_PRIMARY_KEY);
595 :
596 0 : eq = palloc0(sizeof(*eq) * searchslot->tts_tupleDescriptor->natts);
597 :
598 : /*
599 : * Start a heap scan using SnapshotAny to identify dead tuples that are
600 : * not visible under a standard MVCC snapshot. Tuples from transactions
601 : * not yet committed or those just committed prior to the scan are
602 : * excluded in update_most_recent_deletion_info().
603 : */
604 0 : scan = table_beginscan(rel, SnapshotAny, 0, NULL);
605 0 : scanslot = table_slot_create(rel, NULL);
606 :
607 0 : table_rescan(scan, NULL);
608 :
609 : /* Try to find the tuple */
610 0 : while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
611 : {
612 0 : if (!tuples_equal(scanslot, searchslot, eq, indexbitmap))
613 0 : continue;
614 :
615 0 : update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid,
616 : delete_time, delete_origin);
617 : }
618 :
619 0 : table_endscan(scan);
620 0 : ExecDropSingleTupleTableSlot(scanslot);
621 :
622 0 : return *delete_time != 0;
623 : }
624 :
625 : /*
626 : * Similar to RelationFindDeletedTupleInfoSeq() but using index scan to locate
627 : * the deleted tuple.
628 : */
629 : bool
630 2 : RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid,
631 : TupleTableSlot *searchslot,
632 : TransactionId oldestxmin,
633 : TransactionId *delete_xid,
634 : RepOriginId *delete_origin,
635 : TimestampTz *delete_time)
636 : {
637 : Relation idxrel;
638 : ScanKeyData skey[INDEX_MAX_KEYS];
639 : int skey_attoff;
640 : IndexScanDesc scan;
641 : TupleTableSlot *scanslot;
642 2 : TypeCacheEntry **eq = NULL;
643 : bool isIdxSafeToSkipDuplicates;
644 2 : TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
645 :
646 : Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
647 : Assert(OidIsValid(idxoid));
648 :
649 2 : *delete_xid = InvalidTransactionId;
650 2 : *delete_time = 0;
651 2 : *delete_origin = InvalidRepOriginId;
652 :
653 2 : isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
654 :
655 2 : scanslot = table_slot_create(rel, NULL);
656 :
657 2 : idxrel = index_open(idxoid, RowExclusiveLock);
658 :
659 : /* Build scan key. */
660 2 : skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
661 :
662 : /*
663 : * Start an index scan using SnapshotAny to identify dead tuples that are
664 : * not visible under a standard MVCC snapshot. Tuples from transactions
665 : * not yet committed or those just committed prior to the scan are
666 : * excluded in update_most_recent_deletion_info().
667 : */
668 2 : scan = index_beginscan(rel, idxrel, SnapshotAny, NULL, skey_attoff, 0);
669 :
670 2 : index_rescan(scan, skey, skey_attoff, NULL, 0);
671 :
672 : /* Try to find the tuple */
673 4 : while (index_getnext_slot(scan, ForwardScanDirection, scanslot))
674 : {
675 : /*
676 : * Avoid expensive equality check if the index is primary key or
677 : * replica identity index.
678 : */
679 2 : if (!isIdxSafeToSkipDuplicates)
680 : {
681 0 : if (eq == NULL)
682 0 : eq = palloc0(sizeof(*eq) * scanslot->tts_tupleDescriptor->natts);
683 :
684 0 : if (!tuples_equal(scanslot, searchslot, eq, NULL))
685 0 : continue;
686 : }
687 :
688 2 : update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid,
689 : delete_time, delete_origin);
690 : }
691 :
692 2 : index_endscan(scan);
693 :
694 2 : index_close(idxrel, NoLock);
695 :
696 2 : ExecDropSingleTupleTableSlot(scanslot);
697 :
698 2 : return *delete_time != 0;
699 : }
700 :
701 : /*
702 : * Find the tuple that violates the passed unique index (conflictindex).
703 : *
704 : * If a conflicting tuple is found, return true; otherwise return false.
705 : *
706 : * We lock the tuple to avoid it being deleted before the caller can fetch
707 : * the required information. Note that if the tuple is deleted before the
708 : * lock is acquired, we retry the search for a conflicting tuple.
709 : */
710 : static bool
711 62 : FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate,
712 : Oid conflictindex, TupleTableSlot *slot,
713 : TupleTableSlot **conflictslot)
714 : {
715 62 : Relation rel = resultRelInfo->ri_RelationDesc;
716 : ItemPointerData conflictTid;
717 : TM_FailureData tmfd;
718 : TM_Result res;
719 :
720 62 : *conflictslot = NULL;
721 :
722 : /*
723 : * Build additional information required to check constraints violations.
724 : * See check_exclusion_or_unique_constraint().
725 : */
726 62 : BuildConflictIndexInfo(resultRelInfo, conflictindex);
727 :
728 62 : retry:
729 62 : if (ExecCheckIndexConstraints(resultRelInfo, slot, estate,
730 : &conflictTid, &slot->tts_tid,
731 62 : list_make1_oid(conflictindex)))
732 : {
733 2 : if (*conflictslot)
734 0 : ExecDropSingleTupleTableSlot(*conflictslot);
735 :
736 2 : *conflictslot = NULL;
737 2 : return false;
738 : }
739 :
740 58 : *conflictslot = table_slot_create(rel, NULL);
741 :
742 58 : PushActiveSnapshot(GetLatestSnapshot());
743 :
744 58 : res = table_tuple_lock(rel, &conflictTid, GetActiveSnapshot(),
745 : *conflictslot,
746 : GetCurrentCommandId(false),
747 : LockTupleShare,
748 : LockWaitBlock,
749 : 0 /* don't follow updates */ ,
750 : &tmfd);
751 :
752 58 : PopActiveSnapshot();
753 :
754 58 : if (should_refetch_tuple(res, &tmfd))
755 0 : goto retry;
756 :
757 58 : return true;
758 : }
759 :
760 : /*
761 : * Check all the unique indexes in 'recheckIndexes' for conflict with the
762 : * tuple in 'remoteslot' and report if found.
763 : */
764 : static void
765 36 : CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
766 : ConflictType type, List *recheckIndexes,
767 : TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
768 : {
769 36 : List *conflicttuples = NIL;
770 : TupleTableSlot *conflictslot;
771 :
772 : /* Check all the unique indexes for conflicts */
773 130 : foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
774 : {
775 122 : if (list_member_oid(recheckIndexes, uniqueidx) &&
776 62 : FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
777 : &conflictslot))
778 : {
779 58 : ConflictTupleInfo *conflicttuple = palloc0_object(ConflictTupleInfo);
780 :
781 58 : conflicttuple->slot = conflictslot;
782 58 : conflicttuple->indexoid = uniqueidx;
783 :
784 58 : GetTupleTransactionInfo(conflictslot, &conflicttuple->xmin,
785 : &conflicttuple->origin, &conflicttuple->ts);
786 :
787 58 : conflicttuples = lappend(conflicttuples, conflicttuple);
788 : }
789 : }
790 :
791 : /* Report the conflict, if found */
792 34 : if (conflicttuples)
793 32 : ReportApplyConflict(estate, resultRelInfo, ERROR,
794 32 : list_length(conflicttuples) > 1 ? CT_MULTIPLE_UNIQUE_CONFLICTS : type,
795 : searchslot, remoteslot, conflicttuples);
796 2 : }
797 :
798 : /*
799 : * Insert tuple represented in the slot to the relation, update the indexes,
800 : * and execute any constraints and per-row triggers.
801 : *
802 : * Caller is responsible for opening the indexes.
803 : */
804 : void
805 152542 : ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
806 : EState *estate, TupleTableSlot *slot)
807 : {
808 152542 : bool skip_tuple = false;
809 152542 : Relation rel = resultRelInfo->ri_RelationDesc;
810 :
811 : /* For now we support only tables. */
812 : Assert(rel->rd_rel->relkind == RELKIND_RELATION);
813 :
814 152542 : CheckCmdReplicaIdentity(rel, CMD_INSERT);
815 :
816 : /* BEFORE ROW INSERT Triggers */
817 152542 : if (resultRelInfo->ri_TrigDesc &&
818 40 : resultRelInfo->ri_TrigDesc->trig_insert_before_row)
819 : {
820 6 : if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
821 2 : skip_tuple = true; /* "do nothing" */
822 : }
823 :
824 152542 : if (!skip_tuple)
825 : {
826 152540 : List *recheckIndexes = NIL;
827 : List *conflictindexes;
828 152540 : bool conflict = false;
829 :
830 : /* Compute stored generated columns */
831 152540 : if (rel->rd_att->constr &&
832 90948 : rel->rd_att->constr->has_generated_stored)
833 8 : ExecComputeStoredGenerated(resultRelInfo, estate, slot,
834 : CMD_INSERT);
835 :
836 : /* Check the constraints of the tuple */
837 152540 : if (rel->rd_att->constr)
838 90948 : ExecConstraints(resultRelInfo, slot, estate);
839 152540 : if (rel->rd_rel->relispartition)
840 134 : ExecPartitionCheck(resultRelInfo, slot, estate, true);
841 :
842 : /* OK, store the tuple and create index entries for it */
843 152540 : simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
844 :
845 152540 : conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
846 :
847 152540 : if (resultRelInfo->ri_NumIndices > 0)
848 111864 : recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
849 : slot, estate, false,
850 : conflictindexes ? true : false,
851 : &conflict,
852 : conflictindexes, false);
853 :
854 : /*
855 : * Check the conflict indexes to fetch the conflicting local tuple and
856 : * report the conflict. We perform this check here rather than running
857 : * an additional index scan before the actual insertion and reporting
858 : * the conflict if any conflicting tuples are found. This avoids the
859 : * overhead of executing an extra scan for each INSERT operation, even
860 : * when no conflict arises, which could introduce significant overhead
861 : * to replication, particularly in cases where conflicts are rare.
862 : *
863 : * XXX OTOH, in case of conflicts this could lead to clean-up effort
864 : * for the dead tuples added to the heap and the indexes. But since
865 : * conflicts should be rare, we prefer to save the performance
866 : * overhead of running an extra scan before each and every
867 : * insertion.
868 : */
869 152540 : if (conflict)
870 32 : CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
871 : recheckIndexes, NULL, slot);
872 :
873 : /* AFTER ROW INSERT Triggers */
874 152510 : ExecARInsertTriggers(estate, resultRelInfo, slot,
875 : recheckIndexes, NULL);
876 :
877 : /*
878 : * XXX we should in theory pass a TransitionCaptureState object to the
879 : * above to capture transition tuples, but after statement triggers
880 : * don't actually get fired by replication yet anyway
881 : */
882 :
883 152510 : list_free(recheckIndexes);
884 : }
885 152512 : }
886 :
887 : /*
888 : * Find the searchslot tuple and update it with data in the slot,
889 : * update the indexes, and execute any constraints and per-row triggers.
890 : *
891 : * Caller is responsible for opening the indexes.
892 : */
893 : void
894 63846 : ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
895 : EState *estate, EPQState *epqstate,
896 : TupleTableSlot *searchslot, TupleTableSlot *slot)
897 : {
898 63846 : bool skip_tuple = false;
899 63846 : Relation rel = resultRelInfo->ri_RelationDesc;
900 63846 : ItemPointer tid = &(searchslot->tts_tid);
901 :
902 : /*
903 : * We support only non-system tables; check_publication_add_relation()
904 : * is responsible for enforcing this.
905 : */
906 : Assert(rel->rd_rel->relkind == RELKIND_RELATION);
907 : Assert(!IsCatalogRelation(rel));
908 :
909 63846 : CheckCmdReplicaIdentity(rel, CMD_UPDATE);
910 :
911 : /* BEFORE ROW UPDATE Triggers */
912 63846 : if (resultRelInfo->ri_TrigDesc &&
913 20 : resultRelInfo->ri_TrigDesc->trig_update_before_row)
914 : {
915 6 : if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
916 : tid, NULL, slot, NULL, NULL, false))
917 4 : skip_tuple = true; /* "do nothing" */
918 : }
919 :
920 63846 : if (!skip_tuple)
921 : {
922 63842 : List *recheckIndexes = NIL;
923 : TU_UpdateIndexes update_indexes;
924 : List *conflictindexes;
925 63842 : bool conflict = false;
926 :
927 : /* Compute stored generated columns */
928 63842 : if (rel->rd_att->constr &&
929 63752 : rel->rd_att->constr->has_generated_stored)
930 4 : ExecComputeStoredGenerated(resultRelInfo, estate, slot,
931 : CMD_UPDATE);
932 :
933 : /* Check the constraints of the tuple */
934 63842 : if (rel->rd_att->constr)
935 63752 : ExecConstraints(resultRelInfo, slot, estate);
936 63842 : if (rel->rd_rel->relispartition)
937 24 : ExecPartitionCheck(resultRelInfo, slot, estate, true);
938 :
939 63842 : simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
940 : &update_indexes);
941 :
942 63842 : conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
943 :
944 63842 : if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
945 40406 : recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
946 : slot, estate, true,
947 : conflictindexes ? true : false,
948 : &conflict, conflictindexes,
949 : (update_indexes == TU_Summarizing));
950 :
951 : /*
952 : * Refer to the comments above the call to CheckAndReportConflict() in
953 : * ExecSimpleRelationInsert to understand why this check is done at
954 : * this point.
955 : */
956 63842 : if (conflict)
957 4 : CheckAndReportConflict(resultRelInfo, estate, CT_UPDATE_EXISTS,
958 : recheckIndexes, searchslot, slot);
959 :
960 : /* AFTER ROW UPDATE Triggers */
961 63838 : ExecARUpdateTriggers(estate, resultRelInfo,
962 : NULL, NULL,
963 : tid, NULL, slot,
964 : recheckIndexes, NULL, false);
965 :
966 63838 : list_free(recheckIndexes);
967 : }
968 63842 : }
969 :
970 : /*
971 : * Find the searchslot tuple and delete it, and execute any constraints
972 : * and per-row triggers.
973 : *
974 : * Caller is responsible for opening the indexes.
975 : */
976 : void
977 80624 : ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
978 : EState *estate, EPQState *epqstate,
979 : TupleTableSlot *searchslot)
980 : {
981 80624 : bool skip_tuple = false;
982 80624 : Relation rel = resultRelInfo->ri_RelationDesc;
983 80624 : ItemPointer tid = &searchslot->tts_tid;
984 :
985 80624 : CheckCmdReplicaIdentity(rel, CMD_DELETE);
986 :
987 : /* BEFORE ROW DELETE Triggers */
988 80624 : if (resultRelInfo->ri_TrigDesc &&
989 20 : resultRelInfo->ri_TrigDesc->trig_delete_before_row)
990 : {
991 0 : skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
992 0 : tid, NULL, NULL, NULL, NULL, false);
993 : }
994 :
995 80624 : if (!skip_tuple)
996 : {
997 : /* OK, delete the tuple */
998 80624 : simple_table_tuple_delete(rel, tid, estate->es_snapshot);
999 :
1000 : /* AFTER ROW DELETE Triggers */
1001 80624 : ExecARDeleteTriggers(estate, resultRelInfo,
1002 : tid, NULL, NULL, false);
1003 : }
1004 80624 : }
1005 :
1006 : /*
1007 : * Check if command can be executed with current replica identity.
1008 : */
1009 : void
1010 422718 : CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
1011 : {
1012 : PublicationDesc pubdesc;
1013 :
1014 : /*
1015 : * Skip checking the replica identity for partitioned tables, because the
1016 : * operations are actually performed on the leaf partitions.
1017 : */
1018 422718 : if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1019 4056 : return;
1020 :
1021 : /* We only need to do checks for UPDATE and DELETE. */
1022 418662 : if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
1023 246032 : return;
1024 :
1025 : /*
1026 : * It is only safe to execute UPDATE/DELETE if the relation does not
1027 : * publish UPDATEs or DELETEs, or all the following conditions are
1028 : * satisfied:
1029 : *
1030 : * 1. All columns referenced in the row filters of the publications that
1031 : * include the relation are valid, i.e. all referenced columns are part of
1032 : * the REPLICA IDENTITY.
1033 : *
1034 : * 2. All columns referenced in the column lists are valid, i.e. all
1035 : * columns of the REPLICA IDENTITY are covered by the column list.
1036 : *
1037 : * 3. All generated columns that are part of the relation's REPLICA
1038 : * IDENTITY are valid, i.e. all of these generated columns are
1039 : * published.
1040 : *
1041 : * XXX We could optimize it by first checking whether any of the
1042 : * publications have a row filter or column list for this relation, or if
1043 : * the relation contains a generated column. If none of these exist and
1044 : * the relation has replica identity then we can avoid building the
1045 : * descriptor but as this happens only one time it doesn't seem worth the
1046 : * additional complexity.
1047 : */
1048 172630 : RelationBuildPublicationDesc(rel, &pubdesc);
1049 172630 : if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
1050 60 : ereport(ERROR,
1051 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1052 : errmsg("cannot update table \"%s\"",
1053 : RelationGetRelationName(rel)),
1054 : errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
1055 172570 : else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
1056 108 : ereport(ERROR,
1057 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1058 : errmsg("cannot update table \"%s\"",
1059 : RelationGetRelationName(rel)),
1060 : errdetail("Column list used by the publication does not cover the replica identity.")));
1061 172462 : else if (cmd == CMD_UPDATE && !pubdesc.gencols_valid_for_update)
1062 24 : ereport(ERROR,
1063 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1064 : errmsg("cannot update table \"%s\"",
1065 : RelationGetRelationName(rel)),
1066 : errdetail("Replica identity must not contain unpublished generated columns.")));
1067 172438 : else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
1068 0 : ereport(ERROR,
1069 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1070 : errmsg("cannot delete from table \"%s\"",
1071 : RelationGetRelationName(rel)),
1072 : errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
1073 172438 : else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
1074 0 : ereport(ERROR,
1075 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1076 : errmsg("cannot delete from table \"%s\"",
1077 : RelationGetRelationName(rel)),
1078 : errdetail("Column list used by the publication does not cover the replica identity.")));
1079 172438 : else if (cmd == CMD_DELETE && !pubdesc.gencols_valid_for_delete)
1080 0 : ereport(ERROR,
1081 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1082 : errmsg("cannot delete from table \"%s\"",
1083 : RelationGetRelationName(rel)),
1084 : errdetail("Replica identity must not contain unpublished generated columns.")));
1085 :
1086 : /* If relation has replica identity we are always good. */
1087 172438 : if (OidIsValid(RelationGetReplicaIndex(rel)))
1088 150018 : return;
1089 :
1090 : /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
1091 22420 : if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
1092 456 : return;
1093 :
1094 : /*
1095 : * This is UPDATE/DELETE and there is no replica identity.
1096 : *
1097 : * Check if the table publishes UPDATEs or DELETEs.
1098 : */
1099 21964 : if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
1100 106 : ereport(ERROR,
1101 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1102 : errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
1103 : RelationGetRelationName(rel)),
1104 : errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
1105 21858 : else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
1106 10 : ereport(ERROR,
1107 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1108 : errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
1109 : RelationGetRelationName(rel)),
1110 : errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
1111 : }
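
/*
 * Illustrative sketch (editor's addition, not part of execReplication.c):
 * a hypothetical helper mirroring the final checks above -- a relation can
 * safely publish row-level UPDATEs and DELETEs only if it has a replica
 * identity index or uses REPLICA IDENTITY FULL.
 */
static bool
example_has_usable_replica_identity(Relation rel)
{
	return OidIsValid(RelationGetReplicaIndex(rel)) ||
		rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL;
}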
1112 :
1113 :
1114 : /*
1115 : * Check if we support writing into specific relkind.
1116 : *
1117 : * The nspname and relname are only needed for error reporting.
1118 : */
1119 : void
1120 1780 : CheckSubscriptionRelkind(char relkind, const char *nspname,
1121 : const char *relname)
1122 : {
1123 1780 : if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
1124 0 : ereport(ERROR,
1125 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1126 : errmsg("cannot use relation \"%s.%s\" as logical replication target",
1127 : nspname, relname),
1128 : errdetail_relkind_not_supported(relkind)));
1129 1780 : }
|