Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * execReplication.c
4 : * miscellaneous executor routines for logical replication
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/executor/execReplication.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/gist.h"
19 : #include "access/relscan.h"
20 : #include "access/tableam.h"
21 : #include "access/transam.h"
22 : #include "access/xact.h"
23 : #include "catalog/pg_am_d.h"
24 : #include "commands/trigger.h"
25 : #include "executor/executor.h"
26 : #include "executor/nodeModifyTable.h"
27 : #include "replication/conflict.h"
28 : #include "replication/logicalrelation.h"
29 : #include "storage/lmgr.h"
30 : #include "utils/builtins.h"
31 : #include "utils/lsyscache.h"
32 : #include "utils/rel.h"
33 : #include "utils/snapmgr.h"
34 : #include "utils/syscache.h"
35 : #include "utils/typcache.h"
36 :
37 :
38 : static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
39 : TypeCacheEntry **eq);
40 :
41 : /*
42 : * Set up a ScanKey for a search in the relation 'rel' for a tuple 'searchslot'
43 : * that is set up to match 'rel' (*NOT* idxrel!).
44 : *
45 : * Returns how many columns to use for the index scan.
46 : *
47 : * This is not a generic routine; idxrel must be the PK, the replica identity
48 : * index, or an index that can be used for a REPLICA IDENTITY FULL table. See
49 : * FindUsableIndexForReplicaIdentityFull() for details.
50 : *
51 : * By definition, the replica identity of a relation meets these requirements;
52 : * note that other indexes may meet them as well.
53 : */
54 : static int
55 144198 : build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
56 : TupleTableSlot *searchslot)
57 : {
58 : int index_attoff;
59 144198 : int skey_attoff = 0;
60 : Datum indclassDatum;
61 : oidvector *opclass;
62 144198 : int2vector *indkey = &idxrel->rd_index->indkey;
63 :
64 144198 : indclassDatum = SysCacheGetAttrNotNull(INDEXRELID, idxrel->rd_indextuple,
65 : Anum_pg_index_indclass);
66 144198 : opclass = (oidvector *) DatumGetPointer(indclassDatum);
67 :
68 : /* Build scankey for every non-expression attribute in the index. */
69 288442 : for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
70 144244 : index_attoff++)
71 : {
72 : Oid operator;
73 : Oid optype;
74 : Oid opfamily;
75 : RegProcedure regop;
76 144244 : int table_attno = indkey->values[index_attoff];
77 : StrategyNumber eq_strategy;
78 :
79 144244 : if (!AttributeNumberIsValid(table_attno))
80 : {
81 : /*
82 : * XXX: Currently, we don't support expressions in the scan key,
83 : * see code below.
84 : */
85 4 : continue;
86 : }
87 :
88 : /*
89 : * Load the operator info. We need this to get the equality operator
90 : * function for the scan key.
91 : */
92 144240 : optype = get_opclass_input_type(opclass->values[index_attoff]);
93 144240 : opfamily = get_opclass_family(opclass->values[index_attoff]);
94 144240 : eq_strategy = IndexAmTranslateCompareType(COMPARE_EQ, idxrel->rd_rel->relam, opfamily, false);
95 144240 : operator = get_opfamily_member(opfamily, optype,
96 : optype,
97 : eq_strategy);
98 :
99 144240 : if (!OidIsValid(operator))
100 0 : elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
101 : eq_strategy, optype, optype, opfamily);
102 :
103 144240 : regop = get_opcode(operator);
104 :
105 : /* Initialize the scankey. */
106 144240 : ScanKeyInit(&skey[skey_attoff],
107 144240 : index_attoff + 1,
108 : eq_strategy,
109 : regop,
110 144240 : searchslot->tts_values[table_attno - 1]);
111 :
112 144240 : skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
113 :
114 : /* Check for null value. */
115 144240 : if (searchslot->tts_isnull[table_attno - 1])
116 2 : skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
117 :
118 144240 : skey_attoff++;
119 : }
120 :
121 : /* There must always be at least one attribute for the index scan. */
122 : Assert(skey_attoff > 0);
123 :
124 144198 : return skey_attoff;
125 : }
126 :
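/*
 * Illustrative example (the single-column primary key and the attribute
 * number "id_attno" are assumptions made for this sketch): for a table whose
 * replica identity is a primary key over one int4 column "id", the loop
 * above ends up with skey_attoff == 1 and a scan key equivalent to
 *
 *     ScanKeyInit(&skey[0], 1, eq_strategy, regop,
 *                 searchslot->tts_values[id_attno - 1]);
 *
 * i.e. "index column 1 = the value of the table's id attribute in
 * searchslot", which the index scan in RelationFindReplTupleByIndex() then
 * uses to locate the matching local row.
 */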
127 :
128 : /*
129 : * Helper function to check if it is necessary to re-fetch and lock the tuple
130 : * due to concurrent modifications. This function should be called after
131 : * invoking table_tuple_lock.
132 : */
133 : static bool
134 144496 : should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
135 : {
136 144496 : bool refetch = false;
137 :
138 144496 : switch (res)
139 : {
140 144496 : case TM_Ok:
141 144496 : break;
142 0 : case TM_Updated:
143 : /* XXX: Improve handling here */
144 0 : if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
145 0 : ereport(LOG,
146 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
147 : errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
148 : else
149 0 : ereport(LOG,
150 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
151 : errmsg("concurrent update, retrying")));
152 0 : refetch = true;
153 0 : break;
154 0 : case TM_Deleted:
155 : /* XXX: Improve handling here */
156 0 : ereport(LOG,
157 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
158 : errmsg("concurrent delete, retrying")));
159 0 : refetch = true;
160 0 : break;
161 0 : case TM_Invisible:
162 0 : elog(ERROR, "attempted to lock invisible tuple");
163 : break;
164 0 : default:
165 0 : elog(ERROR, "unexpected table_tuple_lock status: %u", res);
166 : break;
167 : }
168 :
169 144496 : return refetch;
170 : }
171 :
172 : /*
173 : * Search the relation 'rel' for a tuple using the index.
174 : *
175 : * If a matching tuple is found, lock it with lockmode, fill the slot with its
176 : * contents, and return true. Return false otherwise.
177 : */
178 : bool
179 144198 : RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
180 : LockTupleMode lockmode,
181 : TupleTableSlot *searchslot,
182 : TupleTableSlot *outslot)
183 : {
184 : ScanKeyData skey[INDEX_MAX_KEYS];
185 : int skey_attoff;
186 : IndexScanDesc scan;
187 : SnapshotData snap;
188 : TransactionId xwait;
189 : Relation idxrel;
190 : bool found;
191 144198 : TypeCacheEntry **eq = NULL;
192 : bool isIdxSafeToSkipDuplicates;
193 :
194 : /* Open the index. */
195 144198 : idxrel = index_open(idxoid, RowExclusiveLock);
196 :
197 144198 : isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
198 :
199 144198 : InitDirtySnapshot(snap);
200 :
201 : /* Build scan key. */
202 144198 : skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
203 :
204 : /* Start an index scan. */
205 144198 : scan = index_beginscan(rel, idxrel, &snap, NULL, skey_attoff, 0);
206 :
207 144198 : retry:
208 144198 : found = false;
209 :
210 144198 : index_rescan(scan, skey, skey_attoff, NULL, 0);
211 :
212 : /* Try to find the tuple */
213 144198 : while (index_getnext_slot(scan, ForwardScanDirection, outslot))
214 : {
215 : /*
216 : * Avoid the expensive equality check if the index is the primary key or
217 : * the replica identity index.
218 : */
219 144172 : if (!isIdxSafeToSkipDuplicates)
220 : {
221 34 : if (eq == NULL)
222 34 : eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
223 :
224 34 : if (!tuples_equal(outslot, searchslot, eq))
225 0 : continue;
226 : }
227 :
228 144172 : ExecMaterializeSlot(outslot);
229 :
230 288344 : xwait = TransactionIdIsValid(snap.xmin) ?
231 144172 : snap.xmin : snap.xmax;
232 :
233 : /*
234 : * If the tuple is locked, wait for the locking transaction to finish and
235 : * retry.
236 : */
237 144172 : if (TransactionIdIsValid(xwait))
238 : {
239 0 : XactLockTableWait(xwait, NULL, NULL, XLTW_None);
240 0 : goto retry;
241 : }
242 :
243 : /* Found our tuple and it's not locked */
244 144172 : found = true;
245 144172 : break;
246 : }
247 :
248 : /* Found the tuple; try to lock it with the given lockmode. */
249 144198 : if (found)
250 : {
251 : TM_FailureData tmfd;
252 : TM_Result res;
253 :
254 144172 : PushActiveSnapshot(GetLatestSnapshot());
255 :
256 144172 : res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
257 : outslot,
258 : GetCurrentCommandId(false),
259 : lockmode,
260 : LockWaitBlock,
261 : 0 /* don't follow updates */ ,
262 : &tmfd);
263 :
264 144172 : PopActiveSnapshot();
265 :
266 144172 : if (should_refetch_tuple(res, &tmfd))
267 0 : goto retry;
268 : }
269 :
270 144198 : index_endscan(scan);
271 :
272 : /* Don't release lock until commit. */
273 144198 : index_close(idxrel, NoLock);
274 :
275 144198 : return found;
276 : }
277 :
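/*
 * Usage sketch (the caller shown here is an assumption for illustration;
 * the real callers live in the logical replication apply code): a caller
 * applying a remote UPDATE or DELETE typically does something like
 *
 *     found = RelationFindReplTupleByIndex(rel, idxoid, LockTupleExclusive,
 *                                          remoteslot, localslot);
 *     if (found)
 *         ... apply the change to the tuple now held in localslot ...
 *     else
 *         ... treat the row as missing (e.g. report a conflict) ...
 *
 * On success the tuple in localslot is already locked, so it cannot be
 * deleted concurrently before the caller applies its change.
 */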
278 : /*
279 : * Compare the tuples in the slots by checking if they have equal values.
280 : */
281 : static bool
282 210646 : tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
283 : TypeCacheEntry **eq)
284 : {
285 : int attrnum;
286 :
287 : Assert(slot1->tts_tupleDescriptor->natts ==
288 : slot2->tts_tupleDescriptor->natts);
289 :
290 210646 : slot_getallattrs(slot1);
291 210646 : slot_getallattrs(slot2);
292 :
293 : /* Check equality of the attributes. */
294 211046 : for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
295 : {
296 : Form_pg_attribute att;
297 : TypeCacheEntry *typentry;
298 :
299 210718 : att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
300 :
301 : /*
302 : * Ignore dropped and generated columns as the publisher doesn't send
303 : * those
304 : */
305 210718 : if (att->attisdropped || att->attgenerated)
306 2 : continue;
307 :
308 : /*
309 : * If one value is NULL and the other is not, then they are certainly not
310 : * equal.
311 : */
312 210716 : if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
313 0 : return false;
314 :
315 : /*
316 : * If both are NULL, they can be considered equal.
317 : */
318 210716 : if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
319 2 : continue;
320 :
321 210714 : typentry = eq[attrnum];
322 210714 : if (typentry == NULL)
323 : {
324 400 : typentry = lookup_type_cache(att->atttypid,
325 : TYPECACHE_EQ_OPR_FINFO);
326 400 : if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
327 0 : ereport(ERROR,
328 : (errcode(ERRCODE_UNDEFINED_FUNCTION),
329 : errmsg("could not identify an equality operator for type %s",
330 : format_type_be(att->atttypid))));
331 400 : eq[attrnum] = typentry;
332 : }
333 :
334 210714 : if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
335 : att->attcollation,
336 210714 : slot1->tts_values[attrnum],
337 210714 : slot2->tts_values[attrnum])))
338 210318 : return false;
339 : }
340 :
341 328 : return true;
342 : }
343 :
344 : /*
345 : * Search the relation 'rel' for a tuple using a sequential scan.
346 : *
347 : * If a matching tuple is found, lock it with lockmode, fill the slot with its
348 : * contents, and return true. Return false otherwise.
349 : *
350 : * Note that this stops on the first matching tuple.
351 : *
352 : * This can obviously be quite slow on tables that have more than a few rows.
353 : */
354 : bool
355 298 : RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
356 : TupleTableSlot *searchslot, TupleTableSlot *outslot)
357 : {
358 : TupleTableSlot *scanslot;
359 : TableScanDesc scan;
360 : SnapshotData snap;
361 : TypeCacheEntry **eq;
362 : TransactionId xwait;
363 : bool found;
364 298 : TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
365 :
366 : Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
367 :
368 298 : eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
369 :
370 : /* Start a heap scan. */
371 298 : InitDirtySnapshot(snap);
372 298 : scan = table_beginscan(rel, &snap, 0, NULL);
373 298 : scanslot = table_slot_create(rel, NULL);
374 :
375 298 : retry:
376 298 : found = false;
377 :
378 298 : table_rescan(scan, NULL);
379 :
380 : /* Try to find the tuple */
381 210616 : while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
382 : {
383 210612 : if (!tuples_equal(scanslot, searchslot, eq))
384 210318 : continue;
385 :
386 294 : found = true;
387 294 : ExecCopySlot(outslot, scanslot);
388 :
389 588 : xwait = TransactionIdIsValid(snap.xmin) ?
390 294 : snap.xmin : snap.xmax;
391 :
392 : * If the tuple is locked, wait for the locking transaction to finish and
393 : * If the tuple is locked, wait for locking transaction to finish and
394 : * retry.
395 : */
396 294 : if (TransactionIdIsValid(xwait))
397 : {
398 0 : XactLockTableWait(xwait, NULL, NULL, XLTW_None);
399 0 : goto retry;
400 : }
401 :
402 : /* Found our tuple and it's not locked */
403 294 : break;
404 : }
405 :
406 : /* Found the tuple; try to lock it with the given lockmode. */
407 298 : if (found)
408 : {
409 : TM_FailureData tmfd;
410 : TM_Result res;
411 :
412 294 : PushActiveSnapshot(GetLatestSnapshot());
413 :
414 294 : res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
415 : outslot,
416 : GetCurrentCommandId(false),
417 : lockmode,
418 : LockWaitBlock,
419 : 0 /* don't follow updates */ ,
420 : &tmfd);
421 :
422 294 : PopActiveSnapshot();
423 :
424 294 : if (should_refetch_tuple(res, &tmfd))
425 0 : goto retry;
426 : }
427 :
428 298 : table_endscan(scan);
429 298 : ExecDropSingleTupleTableSlot(scanslot);
430 :
431 298 : return found;
432 : }
433 :
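/*
 * Note (based on the function comments above): callers are expected to
 * prefer RelationFindReplTupleByIndex() and use this sequential-scan
 * variant only when no suitable index is available, e.g. for a REPLICA
 * IDENTITY FULL table for which FindUsableIndexForReplicaIdentityFull()
 * finds nothing usable.
 */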
434 : /*
435 : * Build additional index information necessary for conflict detection.
436 : */
437 : static void
438 34 : BuildConflictIndexInfo(ResultRelInfo *resultRelInfo, Oid conflictindex)
439 : {
440 96 : for (int i = 0; i < resultRelInfo->ri_NumIndices; i++)
441 : {
442 62 : Relation indexRelation = resultRelInfo->ri_IndexRelationDescs[i];
443 62 : IndexInfo *indexRelationInfo = resultRelInfo->ri_IndexRelationInfo[i];
444 :
445 62 : if (conflictindex != RelationGetRelid(indexRelation))
446 28 : continue;
447 :
448 : /*
449 : * This Assert will fail if BuildSpeculativeIndexInfo() is called
450 : * twice for the given index.
451 : */
452 : Assert(indexRelationInfo->ii_UniqueOps == NULL);
453 :
454 34 : BuildSpeculativeIndexInfo(indexRelation, indexRelationInfo);
455 : }
456 34 : }
457 :
458 : /*
459 : * Find the tuple that violates the passed unique index (conflictindex).
460 : *
461 : * If a conflicting tuple is found, return true; otherwise return false.
462 : *
463 : * We lock the tuple to prevent it from being deleted before the caller can
464 : * fetch the required information. Note that if the tuple is deleted before a
465 : * lock is acquired, we retry the search for the conflicting tuple.
466 : */
467 : static bool
468 34 : FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate,
469 : Oid conflictindex, TupleTableSlot *slot,
470 : TupleTableSlot **conflictslot)
471 : {
472 34 : Relation rel = resultRelInfo->ri_RelationDesc;
473 : ItemPointerData conflictTid;
474 : TM_FailureData tmfd;
475 : TM_Result res;
476 :
477 34 : *conflictslot = NULL;
478 :
479 : /*
480 : * Build additional information required to check constraints violations.
481 : * See check_exclusion_or_unique_constraint().
482 : */
483 34 : BuildConflictIndexInfo(resultRelInfo, conflictindex);
484 :
485 34 : retry:
486 34 : if (ExecCheckIndexConstraints(resultRelInfo, slot, estate,
487 : &conflictTid, &slot->tts_tid,
488 34 : list_make1_oid(conflictindex)))
489 : {
490 2 : if (*conflictslot)
491 0 : ExecDropSingleTupleTableSlot(*conflictslot);
492 :
493 2 : *conflictslot = NULL;
494 2 : return false;
495 : }
496 :
497 30 : *conflictslot = table_slot_create(rel, NULL);
498 :
499 30 : PushActiveSnapshot(GetLatestSnapshot());
500 :
501 30 : res = table_tuple_lock(rel, &conflictTid, GetActiveSnapshot(),
502 : *conflictslot,
503 : GetCurrentCommandId(false),
504 : LockTupleShare,
505 : LockWaitBlock,
506 : 0 /* don't follow updates */ ,
507 : &tmfd);
508 :
509 30 : PopActiveSnapshot();
510 :
511 30 : if (should_refetch_tuple(res, &tmfd))
512 0 : goto retry;
513 :
514 30 : return true;
515 : }
516 :
517 : /*
518 : * Check all the unique indexes in 'recheckIndexes' for a conflict with the
519 : * tuple in 'remoteslot' and report it if found.
520 : */
521 : static void
522 24 : CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
523 : ConflictType type, List *recheckIndexes,
524 : TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
525 : {
526 24 : List *conflicttuples = NIL;
527 : TupleTableSlot *conflictslot;
528 :
529 : /* Check all the unique indexes for conflicts */
530 78 : foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
531 : {
532 66 : if (list_member_oid(recheckIndexes, uniqueidx) &&
533 34 : FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
534 : &conflictslot))
535 : {
536 30 : ConflictTupleInfo *conflicttuple = palloc0_object(ConflictTupleInfo);
537 :
538 30 : conflicttuple->slot = conflictslot;
539 30 : conflicttuple->indexoid = uniqueidx;
540 :
541 30 : GetTupleTransactionInfo(conflictslot, &conflicttuple->xmin,
542 : &conflicttuple->origin, &conflicttuple->ts);
543 :
544 30 : conflicttuples = lappend(conflicttuples, conflicttuple);
545 : }
546 : }
547 :
548 : /* Report the conflict, if found */
549 22 : if (conflicttuples)
550 20 : ReportApplyConflict(estate, resultRelInfo, ERROR,
551 20 : list_length(conflicttuples) > 1 ? CT_MULTIPLE_UNIQUE_CONFLICTS : type,
552 : searchslot, remoteslot, conflicttuples);
553 2 : }
554 :
555 : /*
556 : * Insert the tuple represented in the slot into the relation, update the
557 : * indexes, and execute any constraints and per-row triggers.
558 : *
559 : * Caller is responsible for opening the indexes.
560 : */
561 : void
562 152504 : ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
563 : EState *estate, TupleTableSlot *slot)
564 : {
565 152504 : bool skip_tuple = false;
566 152504 : Relation rel = resultRelInfo->ri_RelationDesc;
567 :
568 : /* For now we support only tables. */
569 : Assert(rel->rd_rel->relkind == RELKIND_RELATION);
570 :
571 152504 : CheckCmdReplicaIdentity(rel, CMD_INSERT);
572 :
573 : /* BEFORE ROW INSERT Triggers */
574 152504 : if (resultRelInfo->ri_TrigDesc &&
575 38 : resultRelInfo->ri_TrigDesc->trig_insert_before_row)
576 : {
577 6 : if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
578 2 : skip_tuple = true; /* "do nothing" */
579 : }
580 :
581 152504 : if (!skip_tuple)
582 : {
583 152502 : List *recheckIndexes = NIL;
584 : List *conflictindexes;
585 152502 : bool conflict = false;
586 :
587 : /* Compute stored generated columns */
588 152502 : if (rel->rd_att->constr &&
589 90932 : rel->rd_att->constr->has_generated_stored)
590 8 : ExecComputeStoredGenerated(resultRelInfo, estate, slot,
591 : CMD_INSERT);
592 :
593 : /* Check the constraints of the tuple */
594 152502 : if (rel->rd_att->constr)
595 90932 : ExecConstraints(resultRelInfo, slot, estate);
596 152502 : if (rel->rd_rel->relispartition)
597 122 : ExecPartitionCheck(resultRelInfo, slot, estate, true);
598 :
599 : /* OK, store the tuple and create index entries for it */
600 152502 : simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
601 :
602 152502 : conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
603 :
604 152502 : if (resultRelInfo->ri_NumIndices > 0)
605 111824 : recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
606 : slot, estate, false,
607 : conflictindexes ? true : false,
608 : &conflict,
609 : conflictindexes, false);
610 :
611 : /*
612 : * Checks the conflict indexes to fetch the conflicting local tuple
613 : * and reports the conflict. We perform this check here, instead of
614 : * performing an additional index scan before the actual insertion and
615 : * reporting the conflict if any conflicting tuples are found. This is
616 : * to avoid the overhead of executing the extra scan for each INSERT
617 : * operation, even when no conflict arises, which could introduce
618 : * significant overhead to replication, particularly in cases where
619 : * conflicts are rare.
620 : *
621 : * XXX OTOH, this could lead to clean-up effort for dead tuples added
622 : * in the heap and indexes in case of conflicts. But as conflicts shouldn't
623 : * be frequent, we prefer to save the performance overhead of an extra
624 : * scan before each insertion.
625 : */
626 152502 : if (conflict)
627 20 : CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
628 : recheckIndexes, NULL, slot);
629 :
630 : /* AFTER ROW INSERT Triggers */
631 152484 : ExecARInsertTriggers(estate, resultRelInfo, slot,
632 : recheckIndexes, NULL);
633 :
634 : /*
635 : * XXX we should in theory pass a TransitionCaptureState object to the
636 : * above to capture transition tuples, but after statement triggers
637 : * don't actually get fired by replication yet anyway
638 : */
639 :
640 152484 : list_free(recheckIndexes);
641 : }
642 152486 : }
643 :
644 : /*
645 : * Find the searchslot tuple and update it with the data in the slot,
646 : * update the indexes, and execute any constraints and per-row triggers.
647 : *
648 : * Caller is responsible for opening the indexes.
649 : */
650 : void
651 63846 : ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
652 : EState *estate, EPQState *epqstate,
653 : TupleTableSlot *searchslot, TupleTableSlot *slot)
654 : {
655 63846 : bool skip_tuple = false;
656 63846 : Relation rel = resultRelInfo->ri_RelationDesc;
657 63846 : ItemPointer tid = &(searchslot->tts_tid);
658 :
659 : /*
660 : * We support only non-system tables; check_publication_add_relation()
661 : * is responsible for enforcing that.
662 : */
663 : Assert(rel->rd_rel->relkind == RELKIND_RELATION);
664 : Assert(!IsCatalogRelation(rel));
665 :
666 63846 : CheckCmdReplicaIdentity(rel, CMD_UPDATE);
667 :
668 : /* BEFORE ROW UPDATE Triggers */
669 63846 : if (resultRelInfo->ri_TrigDesc &&
670 20 : resultRelInfo->ri_TrigDesc->trig_update_before_row)
671 : {
672 6 : if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
673 : tid, NULL, slot, NULL, NULL))
674 4 : skip_tuple = true; /* "do nothing" */
675 : }
676 :
677 63846 : if (!skip_tuple)
678 : {
679 63842 : List *recheckIndexes = NIL;
680 : TU_UpdateIndexes update_indexes;
681 : List *conflictindexes;
682 63842 : bool conflict = false;
683 :
684 : /* Compute stored generated columns */
685 63842 : if (rel->rd_att->constr &&
686 63752 : rel->rd_att->constr->has_generated_stored)
687 4 : ExecComputeStoredGenerated(resultRelInfo, estate, slot,
688 : CMD_UPDATE);
689 :
690 : /* Check the constraints of the tuple */
691 63842 : if (rel->rd_att->constr)
692 63752 : ExecConstraints(resultRelInfo, slot, estate);
693 63842 : if (rel->rd_rel->relispartition)
694 24 : ExecPartitionCheck(resultRelInfo, slot, estate, true);
695 :
696 63842 : simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
697 : &update_indexes);
698 :
699 63842 : conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
700 :
701 63842 : if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
702 40316 : recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
703 : slot, estate, true,
704 : conflictindexes ? true : false,
705 : &conflict, conflictindexes,
706 : (update_indexes == TU_Summarizing));
707 :
708 : /*
709 : * Refer to the comments above the call to CheckAndReportConflict() in
710 : * ExecSimpleRelationInsert to understand why this check is done at
711 : * this point.
712 : */
713 63842 : if (conflict)
714 4 : CheckAndReportConflict(resultRelInfo, estate, CT_UPDATE_EXISTS,
715 : recheckIndexes, searchslot, slot);
716 :
717 : /* AFTER ROW UPDATE Triggers */
718 63838 : ExecARUpdateTriggers(estate, resultRelInfo,
719 : NULL, NULL,
720 : tid, NULL, slot,
721 : recheckIndexes, NULL, false);
722 :
723 63838 : list_free(recheckIndexes);
724 : }
725 63842 : }
726 :
727 : /*
728 : * Find the searchslot tuple and delete it, and execute any constraints
729 : * and per-row triggers.
730 : *
731 : * Caller is responsible for opening the indexes.
732 : */
733 : void
734 80620 : ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
735 : EState *estate, EPQState *epqstate,
736 : TupleTableSlot *searchslot)
737 : {
738 80620 : bool skip_tuple = false;
739 80620 : Relation rel = resultRelInfo->ri_RelationDesc;
740 80620 : ItemPointer tid = &searchslot->tts_tid;
741 :
742 80620 : CheckCmdReplicaIdentity(rel, CMD_DELETE);
743 :
744 : /* BEFORE ROW DELETE Triggers */
745 80620 : if (resultRelInfo->ri_TrigDesc &&
746 20 : resultRelInfo->ri_TrigDesc->trig_delete_before_row)
747 : {
748 0 : skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
749 0 : tid, NULL, NULL, NULL, NULL);
750 : }
751 :
752 80620 : if (!skip_tuple)
753 : {
754 : /* OK, delete the tuple */
755 80620 : simple_table_tuple_delete(rel, tid, estate->es_snapshot);
756 :
757 : /* AFTER ROW DELETE Triggers */
758 80620 : ExecARDeleteTriggers(estate, resultRelInfo,
759 : tid, NULL, NULL, false);
760 : }
761 80620 : }
762 :
763 : /*
764 : * Check if the command can be executed with the current replica identity.
765 : */
766 : void
767 431208 : CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
768 : {
769 : PublicationDesc pubdesc;
770 :
771 : /*
772 : * Skip checking the replica identity for partitioned tables, because the
773 : * operations are actually performed on the leaf partitions.
774 : */
775 431208 : if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
776 409210 : return;
777 :
778 : /* We only need to do checks for UPDATE and DELETE. */
779 425202 : if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
780 252760 : return;
781 :
782 : /*
783 : * It is only safe to execute UPDATE/DELETE if the relation does not
784 : * publish UPDATEs or DELETEs, or all the following conditions are
785 : * satisfied:
786 : *
787 : * 1. All columns, referenced in the row filters from publications which
788 : * the relation is in, are valid - i.e. when all referenced columns are
789 : * part of REPLICA IDENTITY.
790 : *
791 : * 2. All columns, referenced in the column lists are valid - i.e. when
792 : * all columns referenced in the REPLICA IDENTITY are covered by the
793 : * column list.
794 : *
795 : * 3. All generated columns in the REPLICA IDENTITY of the relation are valid
796 : * - i.e. when all these generated columns are published.
797 : *
798 : * XXX We could optimize it by first checking whether any of the
799 : * publications have a row filter or column list for this relation, or if
800 : * the relation contains a generated column. If none of these exist and
801 : * the relation has a replica identity, then we can avoid building the
802 : * descriptor, but as this happens only once it doesn't seem worth the
803 : * additional complexity.
804 : */
805 172442 : RelationBuildPublicationDesc(rel, &pubdesc);
806 172442 : if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
807 60 : ereport(ERROR,
808 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
809 : errmsg("cannot update table \"%s\"",
810 : RelationGetRelationName(rel)),
811 : errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
812 172382 : else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
813 108 : ereport(ERROR,
814 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
815 : errmsg("cannot update table \"%s\"",
816 : RelationGetRelationName(rel)),
817 : errdetail("Column list used by the publication does not cover the replica identity.")));
818 172274 : else if (cmd == CMD_UPDATE && !pubdesc.gencols_valid_for_update)
819 24 : ereport(ERROR,
820 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
821 : errmsg("cannot update table \"%s\"",
822 : RelationGetRelationName(rel)),
823 : errdetail("Replica identity must not contain unpublished generated columns.")));
824 172250 : else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
825 0 : ereport(ERROR,
826 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
827 : errmsg("cannot delete from table \"%s\"",
828 : RelationGetRelationName(rel)),
829 : errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
830 172250 : else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
831 0 : ereport(ERROR,
832 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
833 : errmsg("cannot delete from table \"%s\"",
834 : RelationGetRelationName(rel)),
835 : errdetail("Column list used by the publication does not cover the replica identity.")));
836 172250 : else if (cmd == CMD_DELETE && !pubdesc.gencols_valid_for_delete)
837 0 : ereport(ERROR,
838 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
839 : errmsg("cannot delete from table \"%s\"",
840 : RelationGetRelationName(rel)),
841 : errdetail("Replica identity must not contain unpublished generated columns.")));
842 :
843 : /* If the relation has a replica identity, we are always good. */
844 172250 : if (OidIsValid(RelationGetReplicaIndex(rel)))
845 149994 : return;
846 :
847 : /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
848 22256 : if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
849 450 : return;
850 :
851 : /*
852 : * This is UPDATE/DELETE and there is no replica identity.
853 : *
854 : * Check if the table publishes UPDATES or DELETES.
855 : */
856 21806 : if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
857 106 : ereport(ERROR,
858 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
859 : errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
860 : RelationGetRelationName(rel)),
861 : errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
862 21700 : else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
863 10 : ereport(ERROR,
864 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
865 : errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
866 : RelationGetRelationName(rel)),
867 : errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
868 : }
869 :
870 :
871 : /*
872 : * Check if we support writing into the given relkind.
873 : *
874 : * The nspname and relname are only needed for error reporting.
875 : */
876 : void
877 1738 : CheckSubscriptionRelkind(char relkind, const char *nspname,
878 : const char *relname)
879 : {
880 1738 : if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
881 0 : ereport(ERROR,
882 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
883 : errmsg("cannot use relation \"%s.%s\" as logical replication target",
884 : nspname, relname),
885 : errdetail_relkind_not_supported(relkind)));
886 1738 : }
|