Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * execReplication.c
4 : * miscellaneous executor routines for logical replication
5 : *
6 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/executor/execReplication.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/relscan.h"
19 : #include "access/tableam.h"
20 : #include "access/transam.h"
21 : #include "access/xact.h"
22 : #include "catalog/pg_am_d.h"
23 : #include "commands/trigger.h"
24 : #include "executor/executor.h"
25 : #include "executor/nodeModifyTable.h"
26 : #include "replication/conflict.h"
27 : #include "replication/logicalrelation.h"
28 : #include "storage/lmgr.h"
29 : #include "utils/builtins.h"
30 : #include "utils/lsyscache.h"
31 : #include "utils/rel.h"
32 : #include "utils/snapmgr.h"
33 : #include "utils/syscache.h"
34 : #include "utils/typcache.h"
35 :
36 :
37 : static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
38 : TypeCacheEntry **eq);
39 :
40 : /*
41 : * Returns the fixed strategy number, if any, of the equality operator for the
42 : * given index access method, otherwise, InvalidStrategy.
43 : *
44 : * Currently, only Btree and Hash indexes are supported. The other index access
45 : * methods don't have a fixed strategy for equality operation - instead, the
46 : * support routines of each operator class interpret the strategy numbers
47 : * according to the operator class's definition.
48 : */
49 : StrategyNumber
50 144232 : get_equal_strategy_number_for_am(Oid am)
51 : {
52 : int ret;
53 :
54 144232 : switch (am)
55 : {
56 144220 : case BTREE_AM_OID:
57 144220 : ret = BTEqualStrategyNumber;
58 144220 : break;
59 12 : case HASH_AM_OID:
60 12 : ret = HTEqualStrategyNumber;
61 12 : break;
62 0 : default:
63 : /* XXX: Only Btree and Hash indexes are supported */
64 0 : ret = InvalidStrategy;
65 0 : break;
66 : }
67 :
68 144232 : return ret;
69 : }
70 :
71 : /*
72 : * Return the appropriate strategy number which corresponds to the equality
73 : * operator.
74 : */
75 : static StrategyNumber
76 144196 : get_equal_strategy_number(Oid opclass)
77 : {
78 144196 : Oid am = get_opclass_method(opclass);
79 :
80 144196 : return get_equal_strategy_number_for_am(am);
81 : }
82 :
83 : /*
84 : * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
85 : * is setup to match 'rel' (*NOT* idxrel!).
86 : *
87 : * Returns how many columns to use for the index scan.
88 : *
89 : * This is not generic routine, idxrel must be PK, RI, or an index that can be
90 : * used for REPLICA IDENTITY FULL table. See FindUsableIndexForReplicaIdentityFull()
91 : * for details.
92 : *
93 : * By definition, replication identity of a rel meets all limitations associated
94 : * with that. Note that any other index could also meet these limitations.
95 : */
96 : static int
97 144174 : build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
98 : TupleTableSlot *searchslot)
99 : {
100 : int index_attoff;
101 144174 : int skey_attoff = 0;
102 : Datum indclassDatum;
103 : oidvector *opclass;
104 144174 : int2vector *indkey = &idxrel->rd_index->indkey;
105 :
106 144174 : indclassDatum = SysCacheGetAttrNotNull(INDEXRELID, idxrel->rd_indextuple,
107 : Anum_pg_index_indclass);
108 144174 : opclass = (oidvector *) DatumGetPointer(indclassDatum);
109 :
110 : /* Build scankey for every non-expression attribute in the index. */
111 288374 : for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
112 144200 : index_attoff++)
113 : {
114 : Oid operator;
115 : Oid optype;
116 : Oid opfamily;
117 : RegProcedure regop;
118 144200 : int table_attno = indkey->values[index_attoff];
119 : StrategyNumber eq_strategy;
120 :
121 144200 : if (!AttributeNumberIsValid(table_attno))
122 : {
123 : /*
124 : * XXX: Currently, we don't support expressions in the scan key,
125 : * see code below.
126 : */
127 4 : continue;
128 : }
129 :
130 : /*
131 : * Load the operator info. We need this to get the equality operator
132 : * function for the scan key.
133 : */
134 144196 : optype = get_opclass_input_type(opclass->values[index_attoff]);
135 144196 : opfamily = get_opclass_family(opclass->values[index_attoff]);
136 144196 : eq_strategy = get_equal_strategy_number(opclass->values[index_attoff]);
137 :
138 144196 : operator = get_opfamily_member(opfamily, optype,
139 : optype,
140 : eq_strategy);
141 :
142 144196 : if (!OidIsValid(operator))
143 0 : elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
144 : eq_strategy, optype, optype, opfamily);
145 :
146 144196 : regop = get_opcode(operator);
147 :
148 : /* Initialize the scankey. */
149 144196 : ScanKeyInit(&skey[skey_attoff],
150 144196 : index_attoff + 1,
151 : eq_strategy,
152 : regop,
153 144196 : searchslot->tts_values[table_attno - 1]);
154 :
155 144196 : skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
156 :
157 : /* Check for null value. */
158 144196 : if (searchslot->tts_isnull[table_attno - 1])
159 2 : skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
160 :
161 144196 : skey_attoff++;
162 : }
163 :
164 : /* There must always be at least one attribute for the index scan. */
165 : Assert(skey_attoff > 0);
166 :
167 144174 : return skey_attoff;
168 : }
169 :
170 :
171 : /*
172 : * Helper function to check if it is necessary to re-fetch and lock the tuple
173 : * due to concurrent modifications. This function should be called after
174 : * invoking table_tuple_lock.
175 : */
176 : static bool
177 144456 : should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
178 : {
179 144456 : bool refetch = false;
180 :
181 144456 : switch (res)
182 : {
183 144456 : case TM_Ok:
184 144456 : break;
185 0 : case TM_Updated:
186 : /* XXX: Improve handling here */
187 0 : if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
188 0 : ereport(LOG,
189 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
190 : errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
191 : else
192 0 : ereport(LOG,
193 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
194 : errmsg("concurrent update, retrying")));
195 0 : refetch = true;
196 0 : break;
197 0 : case TM_Deleted:
198 : /* XXX: Improve handling here */
199 0 : ereport(LOG,
200 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
201 : errmsg("concurrent delete, retrying")));
202 0 : refetch = true;
203 0 : break;
204 0 : case TM_Invisible:
205 0 : elog(ERROR, "attempted to lock invisible tuple");
206 : break;
207 0 : default:
208 0 : elog(ERROR, "unexpected table_tuple_lock status: %u", res);
209 : break;
210 : }
211 :
212 144456 : return refetch;
213 : }
214 :
215 : /*
216 : * Search the relation 'rel' for tuple using the index.
217 : *
218 : * If a matching tuple is found, lock it with lockmode, fill the slot with its
219 : * contents, and return true. Return false otherwise.
220 : */
221 : bool
222 144174 : RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
223 : LockTupleMode lockmode,
224 : TupleTableSlot *searchslot,
225 : TupleTableSlot *outslot)
226 : {
227 : ScanKeyData skey[INDEX_MAX_KEYS];
228 : int skey_attoff;
229 : IndexScanDesc scan;
230 : SnapshotData snap;
231 : TransactionId xwait;
232 : Relation idxrel;
233 : bool found;
234 144174 : TypeCacheEntry **eq = NULL;
235 : bool isIdxSafeToSkipDuplicates;
236 :
237 : /* Open the index. */
238 144174 : idxrel = index_open(idxoid, RowExclusiveLock);
239 :
240 144174 : isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
241 :
242 144174 : InitDirtySnapshot(snap);
243 :
244 : /* Build scan key. */
245 144174 : skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
246 :
247 : /* Start an index scan. */
248 144174 : scan = index_beginscan(rel, idxrel, &snap, skey_attoff, 0);
249 :
250 144174 : retry:
251 144174 : found = false;
252 :
253 144174 : index_rescan(scan, skey, skey_attoff, NULL, 0);
254 :
255 : /* Try to find the tuple */
256 144174 : while (index_getnext_slot(scan, ForwardScanDirection, outslot))
257 : {
258 : /*
259 : * Avoid expensive equality check if the index is primary key or
260 : * replica identity index.
261 : */
262 144150 : if (!isIdxSafeToSkipDuplicates)
263 : {
264 30 : if (eq == NULL)
265 30 : eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
266 :
267 30 : if (!tuples_equal(outslot, searchslot, eq))
268 0 : continue;
269 : }
270 :
271 144150 : ExecMaterializeSlot(outslot);
272 :
273 288300 : xwait = TransactionIdIsValid(snap.xmin) ?
274 144150 : snap.xmin : snap.xmax;
275 :
276 : /*
277 : * If the tuple is locked, wait for locking transaction to finish and
278 : * retry.
279 : */
280 144150 : if (TransactionIdIsValid(xwait))
281 : {
282 0 : XactLockTableWait(xwait, NULL, NULL, XLTW_None);
283 0 : goto retry;
284 : }
285 :
286 : /* Found our tuple and it's not locked */
287 144150 : found = true;
288 144150 : break;
289 : }
290 :
291 : /* Found tuple, try to lock it in the lockmode. */
292 144174 : if (found)
293 : {
294 : TM_FailureData tmfd;
295 : TM_Result res;
296 :
297 144150 : PushActiveSnapshot(GetLatestSnapshot());
298 :
299 144150 : res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
300 : outslot,
301 : GetCurrentCommandId(false),
302 : lockmode,
303 : LockWaitBlock,
304 : 0 /* don't follow updates */ ,
305 : &tmfd);
306 :
307 144150 : PopActiveSnapshot();
308 :
309 144150 : if (should_refetch_tuple(res, &tmfd))
310 0 : goto retry;
311 : }
312 :
313 144174 : index_endscan(scan);
314 :
315 : /* Don't release lock until commit. */
316 144174 : index_close(idxrel, NoLock);
317 :
318 144174 : return found;
319 : }
320 :
321 : /*
322 : * Compare the tuples in the slots by checking if they have equal values.
323 : */
324 : static bool
325 210636 : tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
326 : TypeCacheEntry **eq)
327 : {
328 : int attrnum;
329 :
330 : Assert(slot1->tts_tupleDescriptor->natts ==
331 : slot2->tts_tupleDescriptor->natts);
332 :
333 210636 : slot_getallattrs(slot1);
334 210636 : slot_getallattrs(slot2);
335 :
336 : /* Check equality of the attributes. */
337 211018 : for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
338 : {
339 : Form_pg_attribute att;
340 : TypeCacheEntry *typentry;
341 :
342 210696 : att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
343 :
344 : /*
345 : * Ignore dropped and generated columns as the publisher doesn't send
346 : * those
347 : */
348 210696 : if (att->attisdropped || att->attgenerated)
349 4 : continue;
350 :
351 : /*
352 : * If one value is NULL and other is not, then they are certainly not
353 : * equal
354 : */
355 210692 : if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
356 0 : return false;
357 :
358 : /*
359 : * If both are NULL, they can be considered equal.
360 : */
361 210692 : if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
362 2 : continue;
363 :
364 210690 : typentry = eq[attrnum];
365 210690 : if (typentry == NULL)
366 : {
367 380 : typentry = lookup_type_cache(att->atttypid,
368 : TYPECACHE_EQ_OPR_FINFO);
369 380 : if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
370 0 : ereport(ERROR,
371 : (errcode(ERRCODE_UNDEFINED_FUNCTION),
372 : errmsg("could not identify an equality operator for type %s",
373 : format_type_be(att->atttypid))));
374 380 : eq[attrnum] = typentry;
375 : }
376 :
377 210690 : if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
378 : att->attcollation,
379 210690 : slot1->tts_values[attrnum],
380 210690 : slot2->tts_values[attrnum])))
381 210314 : return false;
382 : }
383 :
384 322 : return true;
385 : }
386 :
387 : /*
388 : * Search the relation 'rel' for tuple using the sequential scan.
389 : *
390 : * If a matching tuple is found, lock it with lockmode, fill the slot with its
391 : * contents, and return true. Return false otherwise.
392 : *
393 : * Note that this stops on the first matching tuple.
394 : *
395 : * This can obviously be quite slow on tables that have more than few rows.
396 : */
397 : bool
398 296 : RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
399 : TupleTableSlot *searchslot, TupleTableSlot *outslot)
400 : {
401 : TupleTableSlot *scanslot;
402 : TableScanDesc scan;
403 : SnapshotData snap;
404 : TypeCacheEntry **eq;
405 : TransactionId xwait;
406 : bool found;
407 296 : TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
408 :
409 : Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
410 :
411 296 : eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
412 :
413 : /* Start a heap scan. */
414 296 : InitDirtySnapshot(snap);
415 296 : scan = table_beginscan(rel, &snap, 0, NULL);
416 296 : scanslot = table_slot_create(rel, NULL);
417 :
418 296 : retry:
419 296 : found = false;
420 :
421 296 : table_rescan(scan, NULL);
422 :
423 : /* Try to find the tuple */
424 210610 : while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
425 : {
426 210606 : if (!tuples_equal(scanslot, searchslot, eq))
427 210314 : continue;
428 :
429 292 : found = true;
430 292 : ExecCopySlot(outslot, scanslot);
431 :
432 584 : xwait = TransactionIdIsValid(snap.xmin) ?
433 292 : snap.xmin : snap.xmax;
434 :
435 : /*
436 : * If the tuple is locked, wait for locking transaction to finish and
437 : * retry.
438 : */
439 292 : if (TransactionIdIsValid(xwait))
440 : {
441 0 : XactLockTableWait(xwait, NULL, NULL, XLTW_None);
442 0 : goto retry;
443 : }
444 :
445 : /* Found our tuple and it's not locked */
446 292 : break;
447 : }
448 :
449 : /* Found tuple, try to lock it in the lockmode. */
450 296 : if (found)
451 : {
452 : TM_FailureData tmfd;
453 : TM_Result res;
454 :
455 292 : PushActiveSnapshot(GetLatestSnapshot());
456 :
457 292 : res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
458 : outslot,
459 : GetCurrentCommandId(false),
460 : lockmode,
461 : LockWaitBlock,
462 : 0 /* don't follow updates */ ,
463 : &tmfd);
464 :
465 292 : PopActiveSnapshot();
466 :
467 292 : if (should_refetch_tuple(res, &tmfd))
468 0 : goto retry;
469 : }
470 :
471 296 : table_endscan(scan);
472 296 : ExecDropSingleTupleTableSlot(scanslot);
473 :
474 296 : return found;
475 : }
476 :
477 : /*
478 : * Find the tuple that violates the passed unique index (conflictindex).
479 : *
480 : * If the conflicting tuple is found return true, otherwise false.
481 : *
482 : * We lock the tuple to avoid getting it deleted before the caller can fetch
483 : * the required information. Note that if the tuple is deleted before a lock
484 : * is acquired, we will retry to find the conflicting tuple again.
485 : */
486 : static bool
487 18 : FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate,
488 : Oid conflictindex, TupleTableSlot *slot,
489 : TupleTableSlot **conflictslot)
490 : {
491 18 : Relation rel = resultRelInfo->ri_RelationDesc;
492 : ItemPointerData conflictTid;
493 : TM_FailureData tmfd;
494 : TM_Result res;
495 :
496 18 : *conflictslot = NULL;
497 :
498 18 : retry:
499 18 : if (ExecCheckIndexConstraints(resultRelInfo, slot, estate,
500 : &conflictTid, &slot->tts_tid,
501 18 : list_make1_oid(conflictindex)))
502 : {
503 2 : if (*conflictslot)
504 0 : ExecDropSingleTupleTableSlot(*conflictslot);
505 :
506 2 : *conflictslot = NULL;
507 2 : return false;
508 : }
509 :
510 14 : *conflictslot = table_slot_create(rel, NULL);
511 :
512 14 : PushActiveSnapshot(GetLatestSnapshot());
513 :
514 14 : res = table_tuple_lock(rel, &conflictTid, GetLatestSnapshot(),
515 : *conflictslot,
516 : GetCurrentCommandId(false),
517 : LockTupleShare,
518 : LockWaitBlock,
519 : 0 /* don't follow updates */ ,
520 : &tmfd);
521 :
522 14 : PopActiveSnapshot();
523 :
524 14 : if (should_refetch_tuple(res, &tmfd))
525 0 : goto retry;
526 :
527 14 : return true;
528 : }
529 :
530 : /*
531 : * Check all the unique indexes in 'recheckIndexes' for conflict with the
532 : * tuple in 'remoteslot' and report if found.
533 : */
534 : static void
535 18 : CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
536 : ConflictType type, List *recheckIndexes,
537 : TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
538 : {
539 : /* Check all the unique indexes for a conflict */
540 22 : foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
541 : {
542 : TupleTableSlot *conflictslot;
543 :
544 34 : if (list_member_oid(recheckIndexes, uniqueidx) &&
545 18 : FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
546 : &conflictslot))
547 : {
548 : RepOriginId origin;
549 : TimestampTz committs;
550 : TransactionId xmin;
551 :
552 14 : GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs);
553 14 : ReportApplyConflict(estate, resultRelInfo, ERROR, type,
554 : searchslot, conflictslot, remoteslot,
555 : uniqueidx, xmin, origin, committs);
556 : }
557 : }
558 2 : }
559 :
560 : /*
561 : * Insert tuple represented in the slot to the relation, update the indexes,
562 : * and execute any constraints and per-row triggers.
563 : *
564 : * Caller is responsible for opening the indexes.
565 : */
566 : void
567 152496 : ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
568 : EState *estate, TupleTableSlot *slot)
569 : {
570 152496 : bool skip_tuple = false;
571 152496 : Relation rel = resultRelInfo->ri_RelationDesc;
572 :
573 : /* For now we support only tables. */
574 : Assert(rel->rd_rel->relkind == RELKIND_RELATION);
575 :
576 152496 : CheckCmdReplicaIdentity(rel, CMD_INSERT);
577 :
578 : /* BEFORE ROW INSERT Triggers */
579 152496 : if (resultRelInfo->ri_TrigDesc &&
580 38 : resultRelInfo->ri_TrigDesc->trig_insert_before_row)
581 : {
582 6 : if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
583 2 : skip_tuple = true; /* "do nothing" */
584 : }
585 :
586 152496 : if (!skip_tuple)
587 : {
588 152494 : List *recheckIndexes = NIL;
589 : List *conflictindexes;
590 152494 : bool conflict = false;
591 :
592 : /* Compute stored generated columns */
593 152494 : if (rel->rd_att->constr &&
594 90890 : rel->rd_att->constr->has_generated_stored)
595 8 : ExecComputeStoredGenerated(resultRelInfo, estate, slot,
596 : CMD_INSERT);
597 :
598 : /* Check the constraints of the tuple */
599 152494 : if (rel->rd_att->constr)
600 90890 : ExecConstraints(resultRelInfo, slot, estate);
601 152494 : if (rel->rd_rel->relispartition)
602 120 : ExecPartitionCheck(resultRelInfo, slot, estate, true);
603 :
604 : /* OK, store the tuple and create index entries for it */
605 152494 : simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
606 :
607 152494 : conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
608 :
609 152494 : if (resultRelInfo->ri_NumIndices > 0)
610 111840 : recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
611 : slot, estate, false,
612 : conflictindexes ? true : false,
613 : &conflict,
614 : conflictindexes, false);
615 :
616 : /*
617 : * Checks the conflict indexes to fetch the conflicting local tuple
618 : * and reports the conflict. We perform this check here, instead of
619 : * performing an additional index scan before the actual insertion and
620 : * reporting the conflict if any conflicting tuples are found. This is
621 : * to avoid the overhead of executing the extra scan for each INSERT
622 : * operation, even when no conflict arises, which could introduce
623 : * significant overhead to replication, particularly in cases where
624 : * conflicts are rare.
625 : *
626 : * XXX OTOH, this could lead to clean-up effort for dead tuples added
627 : * in heap and index in case of conflicts. But as conflicts shouldn't
628 : * be a frequent thing so we preferred to save the performance
629 : * overhead of extra scan before each insertion.
630 : */
631 152494 : if (conflict)
632 16 : CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
633 : recheckIndexes, NULL, slot);
634 :
635 : /* AFTER ROW INSERT Triggers */
636 152480 : ExecARInsertTriggers(estate, resultRelInfo, slot,
637 : recheckIndexes, NULL);
638 :
639 : /*
640 : * XXX we should in theory pass a TransitionCaptureState object to the
641 : * above to capture transition tuples, but after statement triggers
642 : * don't actually get fired by replication yet anyway
643 : */
644 :
645 152480 : list_free(recheckIndexes);
646 : }
647 152482 : }
648 :
649 : /*
650 : * Find the searchslot tuple and update it with data in the slot,
651 : * update the indexes, and execute any constraints and per-row triggers.
652 : *
653 : * Caller is responsible for opening the indexes.
654 : */
655 : void
656 63834 : ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
657 : EState *estate, EPQState *epqstate,
658 : TupleTableSlot *searchslot, TupleTableSlot *slot)
659 : {
660 63834 : bool skip_tuple = false;
661 63834 : Relation rel = resultRelInfo->ri_RelationDesc;
662 63834 : ItemPointer tid = &(searchslot->tts_tid);
663 :
664 : /*
665 : * We support only non-system tables, with
666 : * check_publication_add_relation() accountable.
667 : */
668 : Assert(rel->rd_rel->relkind == RELKIND_RELATION);
669 : Assert(!IsCatalogRelation(rel));
670 :
671 63834 : CheckCmdReplicaIdentity(rel, CMD_UPDATE);
672 :
673 : /* BEFORE ROW UPDATE Triggers */
674 63834 : if (resultRelInfo->ri_TrigDesc &&
675 20 : resultRelInfo->ri_TrigDesc->trig_update_before_row)
676 : {
677 6 : if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
678 : tid, NULL, slot, NULL, NULL))
679 4 : skip_tuple = true; /* "do nothing" */
680 : }
681 :
682 63834 : if (!skip_tuple)
683 : {
684 63830 : List *recheckIndexes = NIL;
685 : TU_UpdateIndexes update_indexes;
686 : List *conflictindexes;
687 63830 : bool conflict = false;
688 :
689 : /* Compute stored generated columns */
690 63830 : if (rel->rd_att->constr &&
691 63744 : rel->rd_att->constr->has_generated_stored)
692 6 : ExecComputeStoredGenerated(resultRelInfo, estate, slot,
693 : CMD_UPDATE);
694 :
695 : /* Check the constraints of the tuple */
696 63830 : if (rel->rd_att->constr)
697 63744 : ExecConstraints(resultRelInfo, slot, estate);
698 63830 : if (rel->rd_rel->relispartition)
699 24 : ExecPartitionCheck(resultRelInfo, slot, estate, true);
700 :
701 63830 : simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
702 : &update_indexes);
703 :
704 63830 : conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
705 :
706 63830 : if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
707 40318 : recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
708 : slot, estate, true,
709 : conflictindexes ? true : false,
710 : &conflict, conflictindexes,
711 : (update_indexes == TU_Summarizing));
712 :
713 : /*
714 : * Refer to the comments above the call to CheckAndReportConflict() in
715 : * ExecSimpleRelationInsert to understand why this check is done at
716 : * this point.
717 : */
718 63830 : if (conflict)
719 2 : CheckAndReportConflict(resultRelInfo, estate, CT_UPDATE_EXISTS,
720 : recheckIndexes, searchslot, slot);
721 :
722 : /* AFTER ROW UPDATE Triggers */
723 63828 : ExecARUpdateTriggers(estate, resultRelInfo,
724 : NULL, NULL,
725 : tid, NULL, slot,
726 : recheckIndexes, NULL, false);
727 :
728 63828 : list_free(recheckIndexes);
729 : }
730 63832 : }
731 :
732 : /*
733 : * Find the searchslot tuple and delete it, and execute any constraints
734 : * and per-row triggers.
735 : *
736 : * Caller is responsible for opening the indexes.
737 : */
738 : void
739 80608 : ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
740 : EState *estate, EPQState *epqstate,
741 : TupleTableSlot *searchslot)
742 : {
743 80608 : bool skip_tuple = false;
744 80608 : Relation rel = resultRelInfo->ri_RelationDesc;
745 80608 : ItemPointer tid = &searchslot->tts_tid;
746 :
747 80608 : CheckCmdReplicaIdentity(rel, CMD_DELETE);
748 :
749 : /* BEFORE ROW DELETE Triggers */
750 80608 : if (resultRelInfo->ri_TrigDesc &&
751 20 : resultRelInfo->ri_TrigDesc->trig_delete_before_row)
752 : {
753 0 : skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
754 0 : tid, NULL, NULL, NULL, NULL);
755 : }
756 :
757 80608 : if (!skip_tuple)
758 : {
759 : /* OK, delete the tuple */
760 80608 : simple_table_tuple_delete(rel, tid, estate->es_snapshot);
761 :
762 : /* AFTER ROW DELETE Triggers */
763 80608 : ExecARDeleteTriggers(estate, resultRelInfo,
764 : tid, NULL, NULL, false);
765 : }
766 80608 : }
767 :
768 : /*
769 : * Check if command can be executed with current replica identity.
770 : */
771 : void
772 427600 : CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
773 : {
774 : PublicationDesc pubdesc;
775 :
776 : /*
777 : * Skip checking the replica identity for partitioned tables, because the
778 : * operations are actually performed on the leaf partitions.
779 : */
780 427600 : if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
781 406380 : return;
782 :
783 : /* We only need to do checks for UPDATE and DELETE. */
784 421608 : if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
785 250018 : return;
786 :
787 : /*
788 : * It is only safe to execute UPDATE/DELETE when all columns, referenced
789 : * in the row filters from publications which the relation is in, are
790 : * valid - i.e. when all referenced columns are part of REPLICA IDENTITY
791 : * or the table does not publish UPDATEs or DELETEs.
792 : *
793 : * XXX We could optimize it by first checking whether any of the
794 : * publications have a row filter for this relation. If not and relation
795 : * has replica identity then we can avoid building the descriptor but as
796 : * this happens only one time it doesn't seem worth the additional
797 : * complexity.
798 : */
799 171590 : RelationBuildPublicationDesc(rel, &pubdesc);
800 171590 : if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
801 60 : ereport(ERROR,
802 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
803 : errmsg("cannot update table \"%s\"",
804 : RelationGetRelationName(rel)),
805 : errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
806 171530 : else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
807 108 : ereport(ERROR,
808 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
809 : errmsg("cannot update table \"%s\"",
810 : RelationGetRelationName(rel)),
811 : errdetail("Column list used by the publication does not cover the replica identity.")));
812 171422 : else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
813 0 : ereport(ERROR,
814 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
815 : errmsg("cannot delete from table \"%s\"",
816 : RelationGetRelationName(rel)),
817 : errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
818 171422 : else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
819 0 : ereport(ERROR,
820 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
821 : errmsg("cannot delete from table \"%s\"",
822 : RelationGetRelationName(rel)),
823 : errdetail("Column list used by the publication does not cover the replica identity.")));
824 :
825 : /* If relation has replica identity we are always good. */
826 171422 : if (OidIsValid(RelationGetReplicaIndex(rel)))
827 149948 : return;
828 :
829 : /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
830 21474 : if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
831 422 : return;
832 :
833 : /*
834 : * This is UPDATE/DELETE and there is no replica identity.
835 : *
836 : * Check if the table publishes UPDATES or DELETES.
837 : */
838 21052 : if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
839 96 : ereport(ERROR,
840 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
841 : errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
842 : RelationGetRelationName(rel)),
843 : errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
844 20956 : else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
845 0 : ereport(ERROR,
846 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
847 : errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
848 : RelationGetRelationName(rel)),
849 : errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
850 : }
851 :
852 :
853 : /*
854 : * Check if we support writing into specific relkind.
855 : *
856 : * The nspname and relname are only needed for error reporting.
857 : */
858 : void
859 1618 : CheckSubscriptionRelkind(char relkind, const char *nspname,
860 : const char *relname)
861 : {
862 1618 : if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
863 0 : ereport(ERROR,
864 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
865 : errmsg("cannot use relation \"%s.%s\" as logical replication target",
866 : nspname, relname),
867 : errdetail_relkind_not_supported(relkind)));
868 1618 : }
|