Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * execReplication.c
4 : * miscellaneous executor routines for logical replication
5 : *
6 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/executor/execReplication.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/relscan.h"
19 : #include "access/tableam.h"
20 : #include "access/transam.h"
21 : #include "access/xact.h"
22 : #include "catalog/pg_am_d.h"
23 : #include "commands/trigger.h"
24 : #include "executor/executor.h"
25 : #include "executor/nodeModifyTable.h"
26 : #include "replication/logicalrelation.h"
27 : #include "storage/lmgr.h"
28 : #include "utils/builtins.h"
29 : #include "utils/lsyscache.h"
30 : #include "utils/rel.h"
31 : #include "utils/snapmgr.h"
32 : #include "utils/syscache.h"
33 : #include "utils/typcache.h"
34 :
35 :
36 : static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
37 : TypeCacheEntry **eq);
38 :
39 : /*
40 : * Returns the fixed strategy number, if any, of the equality operator for the
41 : * given index access method, otherwise, InvalidStrategy.
42 : *
43 : * Currently, only Btree and Hash indexes are supported. The other index access
44 : * methods don't have a fixed strategy for equality operation - instead, the
45 : * support routines of each operator class interpret the strategy numbers
46 : * according to the operator class's definition.
47 : */
48 : StrategyNumber
49 144212 : get_equal_strategy_number_for_am(Oid am)
50 : {
51 : int ret;
52 :
53 144212 : switch (am)
54 : {
55 144200 : case BTREE_AM_OID:
56 144200 : ret = BTEqualStrategyNumber;
57 144200 : break;
58 12 : case HASH_AM_OID:
59 12 : ret = HTEqualStrategyNumber;
60 12 : break;
61 0 : default:
62 : /* XXX: Only Btree and Hash indexes are supported */
63 0 : ret = InvalidStrategy;
64 0 : break;
65 : }
66 :
67 144212 : return ret;
68 : }
69 :
70 : /*
71 : * Return the appropriate strategy number which corresponds to the equality
72 : * operator.
73 : */
74 : static StrategyNumber
75 144176 : get_equal_strategy_number(Oid opclass)
76 : {
77 144176 : Oid am = get_opclass_method(opclass);
78 :
79 144176 : return get_equal_strategy_number_for_am(am);
80 : }
81 :
82 : /*
83 : * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
84 : * is setup to match 'rel' (*NOT* idxrel!).
85 : *
86 : * Returns how many columns to use for the index scan.
87 : *
88 : * This is not generic routine, idxrel must be PK, RI, or an index that can be
89 : * used for REPLICA IDENTITY FULL table. See FindUsableIndexForReplicaIdentityFull()
90 : * for details.
91 : *
92 : * By definition, replication identity of a rel meets all limitations associated
93 : * with that. Note that any other index could also meet these limitations.
94 : */
95 : static int
96 144154 : build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
97 : TupleTableSlot *searchslot)
98 : {
99 : int index_attoff;
100 144154 : int skey_attoff = 0;
101 : Datum indclassDatum;
102 : oidvector *opclass;
103 144154 : int2vector *indkey = &idxrel->rd_index->indkey;
104 :
105 144154 : indclassDatum = SysCacheGetAttrNotNull(INDEXRELID, idxrel->rd_indextuple,
106 : Anum_pg_index_indclass);
107 144154 : opclass = (oidvector *) DatumGetPointer(indclassDatum);
108 :
109 : /* Build scankey for every non-expression attribute in the index. */
110 288334 : for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
111 144180 : index_attoff++)
112 : {
113 : Oid operator;
114 : Oid optype;
115 : Oid opfamily;
116 : RegProcedure regop;
117 144180 : int table_attno = indkey->values[index_attoff];
118 : StrategyNumber eq_strategy;
119 :
120 144180 : if (!AttributeNumberIsValid(table_attno))
121 : {
122 : /*
123 : * XXX: Currently, we don't support expressions in the scan key,
124 : * see code below.
125 : */
126 4 : continue;
127 : }
128 :
129 : /*
130 : * Load the operator info. We need this to get the equality operator
131 : * function for the scan key.
132 : */
133 144176 : optype = get_opclass_input_type(opclass->values[index_attoff]);
134 144176 : opfamily = get_opclass_family(opclass->values[index_attoff]);
135 144176 : eq_strategy = get_equal_strategy_number(opclass->values[index_attoff]);
136 :
137 144176 : operator = get_opfamily_member(opfamily, optype,
138 : optype,
139 : eq_strategy);
140 :
141 144176 : if (!OidIsValid(operator))
142 0 : elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
143 : eq_strategy, optype, optype, opfamily);
144 :
145 144176 : regop = get_opcode(operator);
146 :
147 : /* Initialize the scankey. */
148 144176 : ScanKeyInit(&skey[skey_attoff],
149 144176 : index_attoff + 1,
150 : eq_strategy,
151 : regop,
152 144176 : searchslot->tts_values[table_attno - 1]);
153 :
154 144176 : skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
155 :
156 : /* Check for null value. */
157 144176 : if (searchslot->tts_isnull[table_attno - 1])
158 2 : skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
159 :
160 144176 : skey_attoff++;
161 : }
162 :
163 : /* There must always be at least one attribute for the index scan. */
164 : Assert(skey_attoff > 0);
165 :
166 144154 : return skey_attoff;
167 : }
168 :
169 : /*
170 : * Search the relation 'rel' for tuple using the index.
171 : *
172 : * If a matching tuple is found, lock it with lockmode, fill the slot with its
173 : * contents, and return true. Return false otherwise.
174 : */
175 : bool
176 144154 : RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
177 : LockTupleMode lockmode,
178 : TupleTableSlot *searchslot,
179 : TupleTableSlot *outslot)
180 : {
181 : ScanKeyData skey[INDEX_MAX_KEYS];
182 : int skey_attoff;
183 : IndexScanDesc scan;
184 : SnapshotData snap;
185 : TransactionId xwait;
186 : Relation idxrel;
187 : bool found;
188 144154 : TypeCacheEntry **eq = NULL;
189 : bool isIdxSafeToSkipDuplicates;
190 :
191 : /* Open the index. */
192 144154 : idxrel = index_open(idxoid, RowExclusiveLock);
193 :
194 144154 : isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
195 :
196 144154 : InitDirtySnapshot(snap);
197 :
198 : /* Build scan key. */
199 144154 : skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
200 :
201 : /* Start an index scan. */
202 144154 : scan = index_beginscan(rel, idxrel, &snap, skey_attoff, 0);
203 :
204 144154 : retry:
205 144154 : found = false;
206 :
207 144154 : index_rescan(scan, skey, skey_attoff, NULL, 0);
208 :
209 : /* Try to find the tuple */
210 144154 : while (index_getnext_slot(scan, ForwardScanDirection, outslot))
211 : {
212 : /*
213 : * Avoid expensive equality check if the index is primary key or
214 : * replica identity index.
215 : */
216 144138 : if (!isIdxSafeToSkipDuplicates)
217 : {
218 30 : if (eq == NULL)
219 30 : eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
220 :
221 30 : if (!tuples_equal(outslot, searchslot, eq))
222 0 : continue;
223 : }
224 :
225 144138 : ExecMaterializeSlot(outslot);
226 :
227 288276 : xwait = TransactionIdIsValid(snap.xmin) ?
228 144138 : snap.xmin : snap.xmax;
229 :
230 : /*
231 : * If the tuple is locked, wait for locking transaction to finish and
232 : * retry.
233 : */
234 144138 : if (TransactionIdIsValid(xwait))
235 : {
236 0 : XactLockTableWait(xwait, NULL, NULL, XLTW_None);
237 0 : goto retry;
238 : }
239 :
240 : /* Found our tuple and it's not locked */
241 144138 : found = true;
242 144138 : break;
243 : }
244 :
245 : /* Found tuple, try to lock it in the lockmode. */
246 144154 : if (found)
247 : {
248 : TM_FailureData tmfd;
249 : TM_Result res;
250 :
251 144138 : PushActiveSnapshot(GetLatestSnapshot());
252 :
253 144138 : res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
254 : outslot,
255 : GetCurrentCommandId(false),
256 : lockmode,
257 : LockWaitBlock,
258 : 0 /* don't follow updates */ ,
259 : &tmfd);
260 :
261 144138 : PopActiveSnapshot();
262 :
263 144138 : switch (res)
264 : {
265 144138 : case TM_Ok:
266 144138 : break;
267 0 : case TM_Updated:
268 : /* XXX: Improve handling here */
269 0 : if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
270 0 : ereport(LOG,
271 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
272 : errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
273 : else
274 0 : ereport(LOG,
275 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
276 : errmsg("concurrent update, retrying")));
277 0 : goto retry;
278 0 : case TM_Deleted:
279 : /* XXX: Improve handling here */
280 0 : ereport(LOG,
281 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
282 : errmsg("concurrent delete, retrying")));
283 0 : goto retry;
284 0 : case TM_Invisible:
285 0 : elog(ERROR, "attempted to lock invisible tuple");
286 : break;
287 0 : default:
288 0 : elog(ERROR, "unexpected table_tuple_lock status: %u", res);
289 : break;
290 : }
291 : }
292 :
293 144154 : index_endscan(scan);
294 :
295 : /* Don't release lock until commit. */
296 144154 : index_close(idxrel, NoLock);
297 :
298 144154 : return found;
299 : }
300 :
301 : /*
302 : * Compare the tuples in the slots by checking if they have equal values.
303 : */
304 : static bool
305 210564 : tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
306 : TypeCacheEntry **eq)
307 : {
308 : int attrnum;
309 :
310 : Assert(slot1->tts_tupleDescriptor->natts ==
311 : slot2->tts_tupleDescriptor->natts);
312 :
313 210564 : slot_getallattrs(slot1);
314 210564 : slot_getallattrs(slot2);
315 :
316 : /* Check equality of the attributes. */
317 210946 : for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
318 : {
319 : Form_pg_attribute att;
320 : TypeCacheEntry *typentry;
321 :
322 210624 : att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
323 :
324 : /*
325 : * Ignore dropped and generated columns as the publisher doesn't send
326 : * those
327 : */
328 210624 : if (att->attisdropped || att->attgenerated)
329 4 : continue;
330 :
331 : /*
332 : * If one value is NULL and other is not, then they are certainly not
333 : * equal
334 : */
335 210620 : if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
336 0 : return false;
337 :
338 : /*
339 : * If both are NULL, they can be considered equal.
340 : */
341 210620 : if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
342 2 : continue;
343 :
344 210618 : typentry = eq[attrnum];
345 210618 : if (typentry == NULL)
346 : {
347 376 : typentry = lookup_type_cache(att->atttypid,
348 : TYPECACHE_EQ_OPR_FINFO);
349 376 : if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
350 0 : ereport(ERROR,
351 : (errcode(ERRCODE_UNDEFINED_FUNCTION),
352 : errmsg("could not identify an equality operator for type %s",
353 : format_type_be(att->atttypid))));
354 376 : eq[attrnum] = typentry;
355 : }
356 :
357 210618 : if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
358 : att->attcollation,
359 210618 : slot1->tts_values[attrnum],
360 210618 : slot2->tts_values[attrnum])))
361 210242 : return false;
362 : }
363 :
364 322 : return true;
365 : }
366 :
367 : /*
368 : * Search the relation 'rel' for tuple using the sequential scan.
369 : *
370 : * If a matching tuple is found, lock it with lockmode, fill the slot with its
371 : * contents, and return true. Return false otherwise.
372 : *
373 : * Note that this stops on the first matching tuple.
374 : *
375 : * This can obviously be quite slow on tables that have more than few rows.
376 : */
377 : bool
378 292 : RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
379 : TupleTableSlot *searchslot, TupleTableSlot *outslot)
380 : {
381 : TupleTableSlot *scanslot;
382 : TableScanDesc scan;
383 : SnapshotData snap;
384 : TypeCacheEntry **eq;
385 : TransactionId xwait;
386 : bool found;
387 292 : TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
388 :
389 : Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
390 :
391 292 : eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
392 :
393 : /* Start a heap scan. */
394 292 : InitDirtySnapshot(snap);
395 292 : scan = table_beginscan(rel, &snap, 0, NULL);
396 292 : scanslot = table_slot_create(rel, NULL);
397 :
398 292 : retry:
399 292 : found = false;
400 :
401 292 : table_rescan(scan, NULL);
402 :
403 : /* Try to find the tuple */
404 210534 : while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
405 : {
406 210534 : if (!tuples_equal(scanslot, searchslot, eq))
407 210242 : continue;
408 :
409 292 : found = true;
410 292 : ExecCopySlot(outslot, scanslot);
411 :
412 584 : xwait = TransactionIdIsValid(snap.xmin) ?
413 292 : snap.xmin : snap.xmax;
414 :
415 : /*
416 : * If the tuple is locked, wait for locking transaction to finish and
417 : * retry.
418 : */
419 292 : if (TransactionIdIsValid(xwait))
420 : {
421 0 : XactLockTableWait(xwait, NULL, NULL, XLTW_None);
422 0 : goto retry;
423 : }
424 :
425 : /* Found our tuple and it's not locked */
426 292 : break;
427 : }
428 :
429 : /* Found tuple, try to lock it in the lockmode. */
430 292 : if (found)
431 : {
432 : TM_FailureData tmfd;
433 : TM_Result res;
434 :
435 292 : PushActiveSnapshot(GetLatestSnapshot());
436 :
437 292 : res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
438 : outslot,
439 : GetCurrentCommandId(false),
440 : lockmode,
441 : LockWaitBlock,
442 : 0 /* don't follow updates */ ,
443 : &tmfd);
444 :
445 292 : PopActiveSnapshot();
446 :
447 292 : switch (res)
448 : {
449 292 : case TM_Ok:
450 292 : break;
451 0 : case TM_Updated:
452 : /* XXX: Improve handling here */
453 0 : if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
454 0 : ereport(LOG,
455 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
456 : errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
457 : else
458 0 : ereport(LOG,
459 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
460 : errmsg("concurrent update, retrying")));
461 0 : goto retry;
462 0 : case TM_Deleted:
463 : /* XXX: Improve handling here */
464 0 : ereport(LOG,
465 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
466 : errmsg("concurrent delete, retrying")));
467 0 : goto retry;
468 0 : case TM_Invisible:
469 0 : elog(ERROR, "attempted to lock invisible tuple");
470 : break;
471 0 : default:
472 0 : elog(ERROR, "unexpected table_tuple_lock status: %u", res);
473 : break;
474 : }
475 : }
476 :
477 292 : table_endscan(scan);
478 292 : ExecDropSingleTupleTableSlot(scanslot);
479 :
480 292 : return found;
481 : }
482 :
483 : /*
484 : * Insert tuple represented in the slot to the relation, update the indexes,
485 : * and execute any constraints and per-row triggers.
486 : *
487 : * Caller is responsible for opening the indexes.
488 : */
489 : void
490 152460 : ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
491 : EState *estate, TupleTableSlot *slot)
492 : {
493 152460 : bool skip_tuple = false;
494 152460 : Relation rel = resultRelInfo->ri_RelationDesc;
495 :
496 : /* For now we support only tables. */
497 : Assert(rel->rd_rel->relkind == RELKIND_RELATION);
498 :
499 152460 : CheckCmdReplicaIdentity(rel, CMD_INSERT);
500 :
501 : /* BEFORE ROW INSERT Triggers */
502 152460 : if (resultRelInfo->ri_TrigDesc &&
503 38 : resultRelInfo->ri_TrigDesc->trig_insert_before_row)
504 : {
505 6 : if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
506 2 : skip_tuple = true; /* "do nothing" */
507 : }
508 :
509 152460 : if (!skip_tuple)
510 : {
511 152458 : List *recheckIndexes = NIL;
512 :
513 : /* Compute stored generated columns */
514 152458 : if (rel->rd_att->constr &&
515 90884 : rel->rd_att->constr->has_generated_stored)
516 8 : ExecComputeStoredGenerated(resultRelInfo, estate, slot,
517 : CMD_INSERT);
518 :
519 : /* Check the constraints of the tuple */
520 152458 : if (rel->rd_att->constr)
521 90884 : ExecConstraints(resultRelInfo, slot, estate);
522 152458 : if (rel->rd_rel->relispartition)
523 120 : ExecPartitionCheck(resultRelInfo, slot, estate, true);
524 :
525 : /* OK, store the tuple and create index entries for it */
526 152458 : simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
527 :
528 152458 : if (resultRelInfo->ri_NumIndices > 0)
529 111820 : recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
530 : slot, estate, false, false,
531 : NULL, NIL, false);
532 :
533 : /* AFTER ROW INSERT Triggers */
534 152442 : ExecARInsertTriggers(estate, resultRelInfo, slot,
535 : recheckIndexes, NULL);
536 :
537 : /*
538 : * XXX we should in theory pass a TransitionCaptureState object to the
539 : * above to capture transition tuples, but after statement triggers
540 : * don't actually get fired by replication yet anyway
541 : */
542 :
543 152442 : list_free(recheckIndexes);
544 : }
545 152444 : }
546 :
547 : /*
548 : * Find the searchslot tuple and update it with data in the slot,
549 : * update the indexes, and execute any constraints and per-row triggers.
550 : *
551 : * Caller is responsible for opening the indexes.
552 : */
553 : void
554 63826 : ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
555 : EState *estate, EPQState *epqstate,
556 : TupleTableSlot *searchslot, TupleTableSlot *slot)
557 : {
558 63826 : bool skip_tuple = false;
559 63826 : Relation rel = resultRelInfo->ri_RelationDesc;
560 63826 : ItemPointer tid = &(searchslot->tts_tid);
561 :
562 : /* For now we support only tables. */
563 : Assert(rel->rd_rel->relkind == RELKIND_RELATION);
564 :
565 63826 : CheckCmdReplicaIdentity(rel, CMD_UPDATE);
566 :
567 : /* BEFORE ROW UPDATE Triggers */
568 63826 : if (resultRelInfo->ri_TrigDesc &&
569 20 : resultRelInfo->ri_TrigDesc->trig_update_before_row)
570 : {
571 6 : if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
572 : tid, NULL, slot, NULL, NULL))
573 4 : skip_tuple = true; /* "do nothing" */
574 : }
575 :
576 63826 : if (!skip_tuple)
577 : {
578 63822 : List *recheckIndexes = NIL;
579 : TU_UpdateIndexes update_indexes;
580 63822 : TupleTableSlot *oldSlot = NULL;
581 :
582 : /* Compute stored generated columns */
583 63822 : if (rel->rd_att->constr &&
584 63736 : rel->rd_att->constr->has_generated_stored)
585 6 : ExecComputeStoredGenerated(resultRelInfo, estate, slot,
586 : CMD_UPDATE);
587 :
588 : /* Check the constraints of the tuple */
589 63822 : if (rel->rd_att->constr)
590 63736 : ExecConstraints(resultRelInfo, slot, estate);
591 63822 : if (rel->rd_rel->relispartition)
592 22 : ExecPartitionCheck(resultRelInfo, slot, estate, true);
593 :
594 63822 : if (resultRelInfo->ri_TrigDesc &&
595 16 : resultRelInfo->ri_TrigDesc->trig_update_after_row)
596 14 : oldSlot = ExecGetTriggerOldSlot(estate, resultRelInfo);
597 :
598 63822 : simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
599 : &update_indexes, oldSlot);
600 :
601 63822 : if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
602 40204 : recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
603 : slot, estate, true, false,
604 : NULL, NIL,
605 : (update_indexes == TU_Summarizing));
606 :
607 : /* AFTER ROW UPDATE Triggers */
608 63822 : ExecARUpdateTriggers(estate, resultRelInfo,
609 : NULL, NULL,
610 : NULL, oldSlot, slot,
611 : recheckIndexes, NULL, false);
612 :
613 63822 : list_free(recheckIndexes);
614 : }
615 63826 : }
616 :
617 : /*
618 : * Find the searchslot tuple and delete it, and execute any constraints
619 : * and per-row triggers.
620 : *
621 : * Caller is responsible for opening the indexes.
622 : */
623 : void
624 80602 : ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
625 : EState *estate, EPQState *epqstate,
626 : TupleTableSlot *searchslot)
627 : {
628 80602 : bool skip_tuple = false;
629 80602 : Relation rel = resultRelInfo->ri_RelationDesc;
630 80602 : ItemPointer tid = &searchslot->tts_tid;
631 :
632 80602 : CheckCmdReplicaIdentity(rel, CMD_DELETE);
633 :
634 : /* BEFORE ROW DELETE Triggers */
635 80602 : if (resultRelInfo->ri_TrigDesc &&
636 20 : resultRelInfo->ri_TrigDesc->trig_delete_before_row)
637 : {
638 0 : skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
639 0 : tid, NULL, NULL, NULL, NULL);
640 : }
641 :
642 80602 : if (!skip_tuple)
643 : {
644 80602 : TupleTableSlot *oldSlot = NULL;
645 :
646 80602 : if (resultRelInfo->ri_TrigDesc &&
647 20 : resultRelInfo->ri_TrigDesc->trig_delete_after_row)
648 0 : oldSlot = ExecGetTriggerOldSlot(estate, resultRelInfo);
649 :
650 : /* OK, delete the tuple */
651 80602 : simple_table_tuple_delete(rel, tid, estate->es_snapshot, oldSlot);
652 :
653 : /* AFTER ROW DELETE Triggers */
654 80602 : ExecARDeleteTriggers(estate, resultRelInfo,
655 : NULL, oldSlot, NULL, false);
656 : }
657 80602 : }
658 :
659 : /*
660 : * Check if command can be executed with current replica identity.
661 : */
662 : void
663 424458 : CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
664 : {
665 : PublicationDesc pubdesc;
666 :
667 : /*
668 : * Skip checking the replica identity for partitioned tables, because the
669 : * operations are actually performed on the leaf partitions.
670 : */
671 424458 : if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
672 402856 : return;
673 :
674 : /* We only need to do checks for UPDATE and DELETE. */
675 418642 : if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
676 246956 : return;
677 :
678 : /*
679 : * It is only safe to execute UPDATE/DELETE when all columns, referenced
680 : * in the row filters from publications which the relation is in, are
681 : * valid - i.e. when all referenced columns are part of REPLICA IDENTITY
682 : * or the table does not publish UPDATEs or DELETEs.
683 : *
684 : * XXX We could optimize it by first checking whether any of the
685 : * publications have a row filter for this relation. If not and relation
686 : * has replica identity then we can avoid building the descriptor but as
687 : * this happens only one time it doesn't seem worth the additional
688 : * complexity.
689 : */
690 171686 : RelationBuildPublicationDesc(rel, &pubdesc);
691 171686 : if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
692 60 : ereport(ERROR,
693 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
694 : errmsg("cannot update table \"%s\"",
695 : RelationGetRelationName(rel)),
696 : errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
697 171626 : else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
698 108 : ereport(ERROR,
699 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
700 : errmsg("cannot update table \"%s\"",
701 : RelationGetRelationName(rel)),
702 : errdetail("Column list used by the publication does not cover the replica identity.")));
703 171518 : else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
704 0 : ereport(ERROR,
705 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
706 : errmsg("cannot delete from table \"%s\"",
707 : RelationGetRelationName(rel)),
708 : errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
709 171518 : else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
710 0 : ereport(ERROR,
711 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
712 : errmsg("cannot delete from table \"%s\"",
713 : RelationGetRelationName(rel)),
714 : errdetail("Column list used by the publication does not cover the replica identity.")));
715 :
716 : /* If relation has replica identity we are always good. */
717 171518 : if (OidIsValid(RelationGetReplicaIndex(rel)))
718 149672 : return;
719 :
720 : /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
721 21846 : if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
722 412 : return;
723 :
724 : /*
725 : * This is UPDATE/DELETE and there is no replica identity.
726 : *
727 : * Check if the table publishes UPDATES or DELETES.
728 : */
729 21434 : if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
730 96 : ereport(ERROR,
731 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
732 : errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
733 : RelationGetRelationName(rel)),
734 : errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
735 21338 : else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
736 0 : ereport(ERROR,
737 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
738 : errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
739 : RelationGetRelationName(rel)),
740 : errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
741 : }
742 :
743 :
744 : /*
745 : * Check if we support writing into specific relkind.
746 : *
747 : * The nspname and relname are only needed for error reporting.
748 : */
749 : void
750 1586 : CheckSubscriptionRelkind(char relkind, const char *nspname,
751 : const char *relname)
752 : {
753 1586 : if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
754 0 : ereport(ERROR,
755 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
756 : errmsg("cannot use relation \"%s.%s\" as logical replication target",
757 : nspname, relname),
758 : errdetail_relkind_not_supported(relkind)));
759 1586 : }
|