Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * execReplication.c
4 : * miscellaneous executor routines for logical replication
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/executor/execReplication.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/gist.h"
19 : #include "access/relscan.h"
20 : #include "access/tableam.h"
21 : #include "access/transam.h"
22 : #include "access/xact.h"
23 : #include "catalog/pg_am_d.h"
24 : #include "commands/trigger.h"
25 : #include "executor/executor.h"
26 : #include "executor/nodeModifyTable.h"
27 : #include "replication/conflict.h"
28 : #include "replication/logicalrelation.h"
29 : #include "storage/lmgr.h"
30 : #include "utils/builtins.h"
31 : #include "utils/lsyscache.h"
32 : #include "utils/rel.h"
33 : #include "utils/snapmgr.h"
34 : #include "utils/syscache.h"
35 : #include "utils/typcache.h"
36 :
37 :
38 : static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
39 : TypeCacheEntry **eq);
40 :
41 : /*
42 : * Returns the fixed strategy number, if any, of the equality operator for the
43 : * given operator class, otherwise, InvalidStrategy.
44 : */
45 : StrategyNumber
46 144304 : get_equal_strategy_number(Oid opclass)
47 : {
48 144304 : Oid am = get_opclass_method(opclass);
49 : int ret;
50 :
51 144304 : switch (am)
52 : {
53 144244 : case BTREE_AM_OID:
54 144244 : ret = BTEqualStrategyNumber;
55 144244 : break;
56 12 : case HASH_AM_OID:
57 12 : ret = HTEqualStrategyNumber;
58 12 : break;
59 48 : case GIST_AM_OID:
60 48 : ret = GistTranslateStratnum(opclass, COMPARE_EQ);
61 48 : break;
62 0 : default:
63 0 : ret = InvalidStrategy;
64 0 : break;
65 : }
66 :
67 144304 : return ret;
68 : }
69 :
70 : /*
71 : * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
72 : * is setup to match 'rel' (*NOT* idxrel!).
73 : *
74 : * Returns how many columns to use for the index scan.
75 : *
76 : * This is not generic routine, idxrel must be PK, RI, or an index that can be
77 : * used for REPLICA IDENTITY FULL table. See FindUsableIndexForReplicaIdentityFull()
78 : * for details.
79 : *
80 : * By definition, replication identity of a rel meets all limitations associated
81 : * with that. Note that any other index could also meet these limitations.
82 : */
83 : static int
84 144194 : build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
85 : TupleTableSlot *searchslot)
86 : {
87 : int index_attoff;
88 144194 : int skey_attoff = 0;
89 : Datum indclassDatum;
90 : oidvector *opclass;
91 144194 : int2vector *indkey = &idxrel->rd_index->indkey;
92 :
93 144194 : indclassDatum = SysCacheGetAttrNotNull(INDEXRELID, idxrel->rd_indextuple,
94 : Anum_pg_index_indclass);
95 144194 : opclass = (oidvector *) DatumGetPointer(indclassDatum);
96 :
97 : /* Build scankey for every non-expression attribute in the index. */
98 288434 : for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
99 144240 : index_attoff++)
100 : {
101 : Oid operator;
102 : Oid optype;
103 : Oid opfamily;
104 : RegProcedure regop;
105 144240 : int table_attno = indkey->values[index_attoff];
106 : StrategyNumber eq_strategy;
107 :
108 144240 : if (!AttributeNumberIsValid(table_attno))
109 : {
110 : /*
111 : * XXX: Currently, we don't support expressions in the scan key,
112 : * see code below.
113 : */
114 4 : continue;
115 : }
116 :
117 : /*
118 : * Load the operator info. We need this to get the equality operator
119 : * function for the scan key.
120 : */
121 144236 : optype = get_opclass_input_type(opclass->values[index_attoff]);
122 144236 : opfamily = get_opclass_family(opclass->values[index_attoff]);
123 144236 : eq_strategy = get_equal_strategy_number(opclass->values[index_attoff]);
124 144236 : if (!eq_strategy)
125 0 : elog(ERROR, "missing equal strategy for opclass %u", opclass->values[index_attoff]);
126 :
127 144236 : operator = get_opfamily_member(opfamily, optype,
128 : optype,
129 : eq_strategy);
130 :
131 144236 : if (!OidIsValid(operator))
132 0 : elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
133 : eq_strategy, optype, optype, opfamily);
134 :
135 144236 : regop = get_opcode(operator);
136 :
137 : /* Initialize the scankey. */
138 144236 : ScanKeyInit(&skey[skey_attoff],
139 144236 : index_attoff + 1,
140 : eq_strategy,
141 : regop,
142 144236 : searchslot->tts_values[table_attno - 1]);
143 :
144 144236 : skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
145 :
146 : /* Check for null value. */
147 144236 : if (searchslot->tts_isnull[table_attno - 1])
148 2 : skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
149 :
150 144236 : skey_attoff++;
151 : }
152 :
153 : /* There must always be at least one attribute for the index scan. */
154 : Assert(skey_attoff > 0);
155 :
156 144194 : return skey_attoff;
157 : }
158 :
159 :
160 : /*
161 : * Helper function to check if it is necessary to re-fetch and lock the tuple
162 : * due to concurrent modifications. This function should be called after
163 : * invoking table_tuple_lock.
164 : */
165 : static bool
166 144478 : should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
167 : {
168 144478 : bool refetch = false;
169 :
170 144478 : switch (res)
171 : {
172 144478 : case TM_Ok:
173 144478 : break;
174 0 : case TM_Updated:
175 : /* XXX: Improve handling here */
176 0 : if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
177 0 : ereport(LOG,
178 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
179 : errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
180 : else
181 0 : ereport(LOG,
182 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
183 : errmsg("concurrent update, retrying")));
184 0 : refetch = true;
185 0 : break;
186 0 : case TM_Deleted:
187 : /* XXX: Improve handling here */
188 0 : ereport(LOG,
189 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
190 : errmsg("concurrent delete, retrying")));
191 0 : refetch = true;
192 0 : break;
193 0 : case TM_Invisible:
194 0 : elog(ERROR, "attempted to lock invisible tuple");
195 : break;
196 0 : default:
197 0 : elog(ERROR, "unexpected table_tuple_lock status: %u", res);
198 : break;
199 : }
200 :
201 144478 : return refetch;
202 : }
203 :
204 : /*
205 : * Search the relation 'rel' for tuple using the index.
206 : *
207 : * If a matching tuple is found, lock it with lockmode, fill the slot with its
208 : * contents, and return true. Return false otherwise.
209 : */
210 : bool
211 144194 : RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
212 : LockTupleMode lockmode,
213 : TupleTableSlot *searchslot,
214 : TupleTableSlot *outslot)
215 : {
216 : ScanKeyData skey[INDEX_MAX_KEYS];
217 : int skey_attoff;
218 : IndexScanDesc scan;
219 : SnapshotData snap;
220 : TransactionId xwait;
221 : Relation idxrel;
222 : bool found;
223 144194 : TypeCacheEntry **eq = NULL;
224 : bool isIdxSafeToSkipDuplicates;
225 :
226 : /* Open the index. */
227 144194 : idxrel = index_open(idxoid, RowExclusiveLock);
228 :
229 144194 : isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
230 :
231 144194 : InitDirtySnapshot(snap);
232 :
233 : /* Build scan key. */
234 144194 : skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
235 :
236 : /* Start an index scan. */
237 144194 : scan = index_beginscan(rel, idxrel, &snap, skey_attoff, 0);
238 :
239 144194 : retry:
240 144194 : found = false;
241 :
242 144194 : index_rescan(scan, skey, skey_attoff, NULL, 0);
243 :
244 : /* Try to find the tuple */
245 144194 : while (index_getnext_slot(scan, ForwardScanDirection, outslot))
246 : {
247 : /*
248 : * Avoid expensive equality check if the index is primary key or
249 : * replica identity index.
250 : */
251 144170 : if (!isIdxSafeToSkipDuplicates)
252 : {
253 34 : if (eq == NULL)
254 34 : eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
255 :
256 34 : if (!tuples_equal(outslot, searchslot, eq))
257 0 : continue;
258 : }
259 :
260 144170 : ExecMaterializeSlot(outslot);
261 :
262 288340 : xwait = TransactionIdIsValid(snap.xmin) ?
263 144170 : snap.xmin : snap.xmax;
264 :
265 : /*
266 : * If the tuple is locked, wait for locking transaction to finish and
267 : * retry.
268 : */
269 144170 : if (TransactionIdIsValid(xwait))
270 : {
271 0 : XactLockTableWait(xwait, NULL, NULL, XLTW_None);
272 0 : goto retry;
273 : }
274 :
275 : /* Found our tuple and it's not locked */
276 144170 : found = true;
277 144170 : break;
278 : }
279 :
280 : /* Found tuple, try to lock it in the lockmode. */
281 144194 : if (found)
282 : {
283 : TM_FailureData tmfd;
284 : TM_Result res;
285 :
286 144170 : PushActiveSnapshot(GetLatestSnapshot());
287 :
288 144170 : res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
289 : outslot,
290 : GetCurrentCommandId(false),
291 : lockmode,
292 : LockWaitBlock,
293 : 0 /* don't follow updates */ ,
294 : &tmfd);
295 :
296 144170 : PopActiveSnapshot();
297 :
298 144170 : if (should_refetch_tuple(res, &tmfd))
299 0 : goto retry;
300 : }
301 :
302 144194 : index_endscan(scan);
303 :
304 : /* Don't release lock until commit. */
305 144194 : index_close(idxrel, NoLock);
306 :
307 144194 : return found;
308 : }
309 :
310 : /*
311 : * Compare the tuples in the slots by checking if they have equal values.
312 : */
313 : static bool
314 210646 : tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
315 : TypeCacheEntry **eq)
316 : {
317 : int attrnum;
318 :
319 : Assert(slot1->tts_tupleDescriptor->natts ==
320 : slot2->tts_tupleDescriptor->natts);
321 :
322 210646 : slot_getallattrs(slot1);
323 210646 : slot_getallattrs(slot2);
324 :
325 : /* Check equality of the attributes. */
326 211046 : for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
327 : {
328 : Form_pg_attribute att;
329 : TypeCacheEntry *typentry;
330 :
331 210718 : att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
332 :
333 : /*
334 : * Ignore dropped and generated columns as the publisher doesn't send
335 : * those
336 : */
337 210718 : if (att->attisdropped || att->attgenerated)
338 2 : continue;
339 :
340 : /*
341 : * If one value is NULL and other is not, then they are certainly not
342 : * equal
343 : */
344 210716 : if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
345 0 : return false;
346 :
347 : /*
348 : * If both are NULL, they can be considered equal.
349 : */
350 210716 : if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
351 2 : continue;
352 :
353 210714 : typentry = eq[attrnum];
354 210714 : if (typentry == NULL)
355 : {
356 400 : typentry = lookup_type_cache(att->atttypid,
357 : TYPECACHE_EQ_OPR_FINFO);
358 400 : if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
359 0 : ereport(ERROR,
360 : (errcode(ERRCODE_UNDEFINED_FUNCTION),
361 : errmsg("could not identify an equality operator for type %s",
362 : format_type_be(att->atttypid))));
363 400 : eq[attrnum] = typentry;
364 : }
365 :
366 210714 : if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
367 : att->attcollation,
368 210714 : slot1->tts_values[attrnum],
369 210714 : slot2->tts_values[attrnum])))
370 210318 : return false;
371 : }
372 :
373 328 : return true;
374 : }
375 :
376 : /*
377 : * Search the relation 'rel' for tuple using the sequential scan.
378 : *
379 : * If a matching tuple is found, lock it with lockmode, fill the slot with its
380 : * contents, and return true. Return false otherwise.
381 : *
382 : * Note that this stops on the first matching tuple.
383 : *
384 : * This can obviously be quite slow on tables that have more than few rows.
385 : */
386 : bool
387 298 : RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
388 : TupleTableSlot *searchslot, TupleTableSlot *outslot)
389 : {
390 : TupleTableSlot *scanslot;
391 : TableScanDesc scan;
392 : SnapshotData snap;
393 : TypeCacheEntry **eq;
394 : TransactionId xwait;
395 : bool found;
396 298 : TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
397 :
398 : Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
399 :
400 298 : eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
401 :
402 : /* Start a heap scan. */
403 298 : InitDirtySnapshot(snap);
404 298 : scan = table_beginscan(rel, &snap, 0, NULL);
405 298 : scanslot = table_slot_create(rel, NULL);
406 :
407 298 : retry:
408 298 : found = false;
409 :
410 298 : table_rescan(scan, NULL);
411 :
412 : /* Try to find the tuple */
413 210616 : while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
414 : {
415 210612 : if (!tuples_equal(scanslot, searchslot, eq))
416 210318 : continue;
417 :
418 294 : found = true;
419 294 : ExecCopySlot(outslot, scanslot);
420 :
421 588 : xwait = TransactionIdIsValid(snap.xmin) ?
422 294 : snap.xmin : snap.xmax;
423 :
424 : /*
425 : * If the tuple is locked, wait for locking transaction to finish and
426 : * retry.
427 : */
428 294 : if (TransactionIdIsValid(xwait))
429 : {
430 0 : XactLockTableWait(xwait, NULL, NULL, XLTW_None);
431 0 : goto retry;
432 : }
433 :
434 : /* Found our tuple and it's not locked */
435 294 : break;
436 : }
437 :
438 : /* Found tuple, try to lock it in the lockmode. */
439 298 : if (found)
440 : {
441 : TM_FailureData tmfd;
442 : TM_Result res;
443 :
444 294 : PushActiveSnapshot(GetLatestSnapshot());
445 :
446 294 : res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
447 : outslot,
448 : GetCurrentCommandId(false),
449 : lockmode,
450 : LockWaitBlock,
451 : 0 /* don't follow updates */ ,
452 : &tmfd);
453 :
454 294 : PopActiveSnapshot();
455 :
456 294 : if (should_refetch_tuple(res, &tmfd))
457 0 : goto retry;
458 : }
459 :
460 298 : table_endscan(scan);
461 298 : ExecDropSingleTupleTableSlot(scanslot);
462 :
463 298 : return found;
464 : }
465 :
466 : /*
467 : * Find the tuple that violates the passed unique index (conflictindex).
468 : *
469 : * If the conflicting tuple is found return true, otherwise false.
470 : *
471 : * We lock the tuple to avoid getting it deleted before the caller can fetch
472 : * the required information. Note that if the tuple is deleted before a lock
473 : * is acquired, we will retry to find the conflicting tuple again.
474 : */
475 : static bool
476 18 : FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate,
477 : Oid conflictindex, TupleTableSlot *slot,
478 : TupleTableSlot **conflictslot)
479 : {
480 18 : Relation rel = resultRelInfo->ri_RelationDesc;
481 : ItemPointerData conflictTid;
482 : TM_FailureData tmfd;
483 : TM_Result res;
484 :
485 18 : *conflictslot = NULL;
486 :
487 18 : retry:
488 18 : if (ExecCheckIndexConstraints(resultRelInfo, slot, estate,
489 : &conflictTid, &slot->tts_tid,
490 18 : list_make1_oid(conflictindex)))
491 : {
492 2 : if (*conflictslot)
493 0 : ExecDropSingleTupleTableSlot(*conflictslot);
494 :
495 2 : *conflictslot = NULL;
496 2 : return false;
497 : }
498 :
499 14 : *conflictslot = table_slot_create(rel, NULL);
500 :
501 14 : PushActiveSnapshot(GetLatestSnapshot());
502 :
503 14 : res = table_tuple_lock(rel, &conflictTid, GetLatestSnapshot(),
504 : *conflictslot,
505 : GetCurrentCommandId(false),
506 : LockTupleShare,
507 : LockWaitBlock,
508 : 0 /* don't follow updates */ ,
509 : &tmfd);
510 :
511 14 : PopActiveSnapshot();
512 :
513 14 : if (should_refetch_tuple(res, &tmfd))
514 0 : goto retry;
515 :
516 14 : return true;
517 : }
518 :
519 : /*
520 : * Check all the unique indexes in 'recheckIndexes' for conflict with the
521 : * tuple in 'remoteslot' and report if found.
522 : */
523 : static void
524 18 : CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
525 : ConflictType type, List *recheckIndexes,
526 : TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
527 : {
528 : /* Check all the unique indexes for a conflict */
529 22 : foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
530 : {
531 : TupleTableSlot *conflictslot;
532 :
533 34 : if (list_member_oid(recheckIndexes, uniqueidx) &&
534 18 : FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
535 : &conflictslot))
536 : {
537 : RepOriginId origin;
538 : TimestampTz committs;
539 : TransactionId xmin;
540 :
541 14 : GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs);
542 14 : ReportApplyConflict(estate, resultRelInfo, ERROR, type,
543 : searchslot, conflictslot, remoteslot,
544 : uniqueidx, xmin, origin, committs);
545 : }
546 : }
547 2 : }
548 :
549 : /*
550 : * Insert tuple represented in the slot to the relation, update the indexes,
551 : * and execute any constraints and per-row triggers.
552 : *
553 : * Caller is responsible for opening the indexes.
554 : */
555 : void
556 152668 : ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
557 : EState *estate, TupleTableSlot *slot)
558 : {
559 152668 : bool skip_tuple = false;
560 152668 : Relation rel = resultRelInfo->ri_RelationDesc;
561 :
562 : /* For now we support only tables. */
563 : Assert(rel->rd_rel->relkind == RELKIND_RELATION);
564 :
565 152668 : CheckCmdReplicaIdentity(rel, CMD_INSERT);
566 :
567 : /* BEFORE ROW INSERT Triggers */
568 152668 : if (resultRelInfo->ri_TrigDesc &&
569 38 : resultRelInfo->ri_TrigDesc->trig_insert_before_row)
570 : {
571 6 : if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
572 2 : skip_tuple = true; /* "do nothing" */
573 : }
574 :
575 152668 : if (!skip_tuple)
576 : {
577 152666 : List *recheckIndexes = NIL;
578 : List *conflictindexes;
579 152666 : bool conflict = false;
580 :
581 : /* Compute stored generated columns */
582 152666 : if (rel->rd_att->constr &&
583 90920 : rel->rd_att->constr->has_generated_stored)
584 8 : ExecComputeStoredGenerated(resultRelInfo, estate, slot,
585 : CMD_INSERT);
586 :
587 : /* Check the constraints of the tuple */
588 152666 : if (rel->rd_att->constr)
589 90920 : ExecConstraints(resultRelInfo, slot, estate);
590 152666 : if (rel->rd_rel->relispartition)
591 120 : ExecPartitionCheck(resultRelInfo, slot, estate, true);
592 :
593 : /* OK, store the tuple and create index entries for it */
594 152666 : simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
595 :
596 152666 : conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
597 :
598 152666 : if (resultRelInfo->ri_NumIndices > 0)
599 111994 : recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
600 : slot, estate, false,
601 : conflictindexes ? true : false,
602 : &conflict,
603 : conflictindexes, false);
604 :
605 : /*
606 : * Checks the conflict indexes to fetch the conflicting local tuple
607 : * and reports the conflict. We perform this check here, instead of
608 : * performing an additional index scan before the actual insertion and
609 : * reporting the conflict if any conflicting tuples are found. This is
610 : * to avoid the overhead of executing the extra scan for each INSERT
611 : * operation, even when no conflict arises, which could introduce
612 : * significant overhead to replication, particularly in cases where
613 : * conflicts are rare.
614 : *
615 : * XXX OTOH, this could lead to clean-up effort for dead tuples added
616 : * in heap and index in case of conflicts. But as conflicts shouldn't
617 : * be a frequent thing so we preferred to save the performance
618 : * overhead of extra scan before each insertion.
619 : */
620 152666 : if (conflict)
621 16 : CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
622 : recheckIndexes, NULL, slot);
623 :
624 : /* AFTER ROW INSERT Triggers */
625 152652 : ExecARInsertTriggers(estate, resultRelInfo, slot,
626 : recheckIndexes, NULL);
627 :
628 : /*
629 : * XXX we should in theory pass a TransitionCaptureState object to the
630 : * above to capture transition tuples, but after statement triggers
631 : * don't actually get fired by replication yet anyway
632 : */
633 :
634 152652 : list_free(recheckIndexes);
635 : }
636 152654 : }
637 :
638 : /*
639 : * Find the searchslot tuple and update it with data in the slot,
640 : * update the indexes, and execute any constraints and per-row triggers.
641 : *
642 : * Caller is responsible for opening the indexes.
643 : */
644 : void
645 63844 : ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
646 : EState *estate, EPQState *epqstate,
647 : TupleTableSlot *searchslot, TupleTableSlot *slot)
648 : {
649 63844 : bool skip_tuple = false;
650 63844 : Relation rel = resultRelInfo->ri_RelationDesc;
651 63844 : ItemPointer tid = &(searchslot->tts_tid);
652 :
653 : /*
654 : * We support only non-system tables, with
655 : * check_publication_add_relation() accountable.
656 : */
657 : Assert(rel->rd_rel->relkind == RELKIND_RELATION);
658 : Assert(!IsCatalogRelation(rel));
659 :
660 63844 : CheckCmdReplicaIdentity(rel, CMD_UPDATE);
661 :
662 : /* BEFORE ROW UPDATE Triggers */
663 63844 : if (resultRelInfo->ri_TrigDesc &&
664 20 : resultRelInfo->ri_TrigDesc->trig_update_before_row)
665 : {
666 6 : if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
667 : tid, NULL, slot, NULL, NULL))
668 4 : skip_tuple = true; /* "do nothing" */
669 : }
670 :
671 63844 : if (!skip_tuple)
672 : {
673 63840 : List *recheckIndexes = NIL;
674 : TU_UpdateIndexes update_indexes;
675 : List *conflictindexes;
676 63840 : bool conflict = false;
677 :
678 : /* Compute stored generated columns */
679 63840 : if (rel->rd_att->constr &&
680 63750 : rel->rd_att->constr->has_generated_stored)
681 4 : ExecComputeStoredGenerated(resultRelInfo, estate, slot,
682 : CMD_UPDATE);
683 :
684 : /* Check the constraints of the tuple */
685 63840 : if (rel->rd_att->constr)
686 63750 : ExecConstraints(resultRelInfo, slot, estate);
687 63840 : if (rel->rd_rel->relispartition)
688 24 : ExecPartitionCheck(resultRelInfo, slot, estate, true);
689 :
690 63840 : simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
691 : &update_indexes);
692 :
693 63840 : conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
694 :
695 63840 : if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
696 40300 : recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
697 : slot, estate, true,
698 : conflictindexes ? true : false,
699 : &conflict, conflictindexes,
700 : (update_indexes == TU_Summarizing));
701 :
702 : /*
703 : * Refer to the comments above the call to CheckAndReportConflict() in
704 : * ExecSimpleRelationInsert to understand why this check is done at
705 : * this point.
706 : */
707 63840 : if (conflict)
708 2 : CheckAndReportConflict(resultRelInfo, estate, CT_UPDATE_EXISTS,
709 : recheckIndexes, searchslot, slot);
710 :
711 : /* AFTER ROW UPDATE Triggers */
712 63838 : ExecARUpdateTriggers(estate, resultRelInfo,
713 : NULL, NULL,
714 : tid, NULL, slot,
715 : recheckIndexes, NULL, false);
716 :
717 63838 : list_free(recheckIndexes);
718 : }
719 63842 : }
720 :
721 : /*
722 : * Find the searchslot tuple and delete it, and execute any constraints
723 : * and per-row triggers.
724 : *
725 : * Caller is responsible for opening the indexes.
726 : */
727 : void
728 80620 : ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
729 : EState *estate, EPQState *epqstate,
730 : TupleTableSlot *searchslot)
731 : {
732 80620 : bool skip_tuple = false;
733 80620 : Relation rel = resultRelInfo->ri_RelationDesc;
734 80620 : ItemPointer tid = &searchslot->tts_tid;
735 :
736 80620 : CheckCmdReplicaIdentity(rel, CMD_DELETE);
737 :
738 : /* BEFORE ROW DELETE Triggers */
739 80620 : if (resultRelInfo->ri_TrigDesc &&
740 20 : resultRelInfo->ri_TrigDesc->trig_delete_before_row)
741 : {
742 0 : skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
743 0 : tid, NULL, NULL, NULL, NULL);
744 : }
745 :
746 80620 : if (!skip_tuple)
747 : {
748 : /* OK, delete the tuple */
749 80620 : simple_table_tuple_delete(rel, tid, estate->es_snapshot);
750 :
751 : /* AFTER ROW DELETE Triggers */
752 80620 : ExecARDeleteTriggers(estate, resultRelInfo,
753 : tid, NULL, NULL, false);
754 : }
755 80620 : }
756 :
757 : /*
758 : * Check if command can be executed with current replica identity.
759 : */
760 : void
761 428504 : CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
762 : {
763 : PublicationDesc pubdesc;
764 :
765 : /*
766 : * Skip checking the replica identity for partitioned tables, because the
767 : * operations are actually performed on the leaf partitions.
768 : */
769 428504 : if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
770 407072 : return;
771 :
772 : /* We only need to do checks for UPDATE and DELETE. */
773 422506 : if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
774 250580 : return;
775 :
776 : /*
777 : * It is only safe to execute UPDATE/DELETE if the relation does not
778 : * publish UPDATEs or DELETEs, or all the following conditions are
779 : * satisfied:
780 : *
781 : * 1. All columns, referenced in the row filters from publications which
782 : * the relation is in, are valid - i.e. when all referenced columns are
783 : * part of REPLICA IDENTITY.
784 : *
785 : * 2. All columns, referenced in the column lists are valid - i.e. when
786 : * all columns referenced in the REPLICA IDENTITY are covered by the
787 : * column list.
788 : *
789 : * 3. All generated columns in REPLICA IDENTITY of the relation, are valid
790 : * - i.e. when all these generated columns are published.
791 : *
792 : * XXX We could optimize it by first checking whether any of the
793 : * publications have a row filter or column list for this relation, or if
794 : * the relation contains a generated column. If none of these exist and
795 : * the relation has replica identity then we can avoid building the
796 : * descriptor but as this happens only one time it doesn't seem worth the
797 : * additional complexity.
798 : */
799 171926 : RelationBuildPublicationDesc(rel, &pubdesc);
800 171926 : if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
801 60 : ereport(ERROR,
802 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
803 : errmsg("cannot update table \"%s\"",
804 : RelationGetRelationName(rel)),
805 : errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
806 171866 : else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
807 108 : ereport(ERROR,
808 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
809 : errmsg("cannot update table \"%s\"",
810 : RelationGetRelationName(rel)),
811 : errdetail("Column list used by the publication does not cover the replica identity.")));
812 171758 : else if (cmd == CMD_UPDATE && !pubdesc.gencols_valid_for_update)
813 12 : ereport(ERROR,
814 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
815 : errmsg("cannot update table \"%s\"",
816 : RelationGetRelationName(rel)),
817 : errdetail("Replica identity must not contain unpublished generated columns.")));
818 171746 : else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
819 0 : ereport(ERROR,
820 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
821 : errmsg("cannot delete from table \"%s\"",
822 : RelationGetRelationName(rel)),
823 : errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
824 171746 : else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
825 0 : ereport(ERROR,
826 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
827 : errmsg("cannot delete from table \"%s\"",
828 : RelationGetRelationName(rel)),
829 : errdetail("Column list used by the publication does not cover the replica identity.")));
830 171746 : else if (cmd == CMD_DELETE && !pubdesc.gencols_valid_for_delete)
831 0 : ereport(ERROR,
832 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
833 : errmsg("cannot delete from table \"%s\"",
834 : RelationGetRelationName(rel)),
835 : errdetail("Replica identity must not contain unpublished generated columns.")));
836 :
837 : /* If relation has replica identity we are always good. */
838 171746 : if (OidIsValid(RelationGetReplicaIndex(rel)))
839 150044 : return;
840 :
841 : /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
842 21702 : if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
843 450 : return;
844 :
845 : /*
846 : * This is UPDATE/DELETE and there is no replica identity.
847 : *
848 : * Check if the table publishes UPDATES or DELETES.
849 : */
850 21252 : if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
851 106 : ereport(ERROR,
852 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
853 : errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
854 : RelationGetRelationName(rel)),
855 : errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
856 21146 : else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
857 10 : ereport(ERROR,
858 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
859 : errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
860 : RelationGetRelationName(rel)),
861 : errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
862 : }
863 :
864 :
865 : /*
866 : * Check if we support writing into specific relkind.
867 : *
868 : * The nspname and relname are only needed for error reporting.
869 : */
870 : void
871 1682 : CheckSubscriptionRelkind(char relkind, const char *nspname,
872 : const char *relname)
873 : {
874 1682 : if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
875 0 : ereport(ERROR,
876 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
877 : errmsg("cannot use relation \"%s.%s\" as logical replication target",
878 : nspname, relname),
879 : errdetail_relkind_not_supported(relkind)));
880 1682 : }
|