Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * execReplication.c
4 : * miscellaneous executor routines for logical replication
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/executor/execReplication.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "access/genam.h"
18 : #include "access/relscan.h"
19 : #include "access/tableam.h"
20 : #include "access/transam.h"
21 : #include "access/xact.h"
22 : #include "catalog/pg_am_d.h"
23 : #include "commands/trigger.h"
24 : #include "executor/executor.h"
25 : #include "executor/nodeModifyTable.h"
26 : #include "nodes/nodeFuncs.h"
27 : #include "parser/parse_relation.h"
28 : #include "parser/parsetree.h"
29 : #include "replication/logicalrelation.h"
30 : #include "storage/bufmgr.h"
31 : #include "storage/lmgr.h"
32 : #include "utils/builtins.h"
33 : #include "utils/datum.h"
34 : #include "utils/lsyscache.h"
35 : #include "utils/memutils.h"
36 : #include "utils/rel.h"
37 : #include "utils/snapmgr.h"
38 : #include "utils/syscache.h"
39 : #include "utils/typcache.h"
40 :
41 :
/*
 * Forward declaration: tuples_equal() is defined further down in this file
 * and is called by both the index-based and the sequential tuple lookup
 * routines to compare a scanned tuple against the search tuple.
 */
42 : static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
43 : TypeCacheEntry **eq);
44 :
45 : /*
46 : * Returns the fixed strategy number, if any, of the equality operator for the
47 : * given index access method, otherwise, InvalidStrategy.
48 : *
49 : * Currently, only Btree and Hash indexes are supported. The other index access
50 : * methods don't have a fixed strategy for equality operation - instead, the
51 : * support routines of each operator class interpret the strategy numbers
52 : * according to the operator class's definition.
53 : */
54 : StrategyNumber
55 144212 : get_equal_strategy_number_for_am(Oid am)
56 : {
57 : int ret;
58 :
59 144212 : switch (am)
60 : {
61 144200 : case BTREE_AM_OID:
62 144200 : ret = BTEqualStrategyNumber;
63 144200 : break;
64 12 : case HASH_AM_OID:
65 12 : ret = HTEqualStrategyNumber;
66 12 : break;
67 0 : default:
68 : /* XXX: Only Btree and Hash indexes are supported */
69 0 : ret = InvalidStrategy;
70 0 : break;
71 : }
72 :
73 144212 : return ret;
74 : }
75 :
76 : /*
77 : * Return the appropriate strategy number which corresponds to the equality
78 : * operator.
79 : */
80 : static StrategyNumber
81 144176 : get_equal_strategy_number(Oid opclass)
82 : {
83 144176 : Oid am = get_opclass_method(opclass);
84 :
85 144176 : return get_equal_strategy_number_for_am(am);
86 : }
87 :
88 : /*
89 : * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
90 : * is setup to match 'rel' (*NOT* idxrel!).
91 : *
92 : * Returns how many columns to use for the index scan.
93 : *
94 : * This is not generic routine, idxrel must be PK, RI, or an index that can be
95 : * used for REPLICA IDENTITY FULL table. See FindUsableIndexForReplicaIdentityFull()
96 : * for details.
97 : *
98 : * By definition, replication identity of a rel meets all limitations associated
99 : * with that. Note that any other index could also meet these limitations.
100 : */
101 : static int
102 144154 : build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
103 : TupleTableSlot *searchslot)
104 : {
105 : int index_attoff;
106 144154 : int skey_attoff = 0;
107 : Datum indclassDatum;
108 : oidvector *opclass;
109 144154 : int2vector *indkey = &idxrel->rd_index->indkey;
110 :
111 144154 : indclassDatum = SysCacheGetAttrNotNull(INDEXRELID, idxrel->rd_indextuple,
112 : Anum_pg_index_indclass);
113 144154 : opclass = (oidvector *) DatumGetPointer(indclassDatum);
114 :
115 : /* Build scankey for every non-expression attribute in the index. */
116 288334 : for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
117 144180 : index_attoff++)
118 : {
119 : Oid operator;
120 : Oid optype;
121 : Oid opfamily;
122 : RegProcedure regop;
123 144180 : int table_attno = indkey->values[index_attoff];
124 : StrategyNumber eq_strategy;
125 :
126 144180 : if (!AttributeNumberIsValid(table_attno))
127 : {
128 : /*
129 : * XXX: Currently, we don't support expressions in the scan key,
130 : * see code below.
131 : */
132 4 : continue;
133 : }
134 :
135 : /*
136 : * Load the operator info. We need this to get the equality operator
137 : * function for the scan key.
138 : */
139 144176 : optype = get_opclass_input_type(opclass->values[index_attoff]);
140 144176 : opfamily = get_opclass_family(opclass->values[index_attoff]);
141 144176 : eq_strategy = get_equal_strategy_number(opclass->values[index_attoff]);
142 :
143 144176 : operator = get_opfamily_member(opfamily, optype,
144 : optype,
145 : eq_strategy);
146 :
147 144176 : if (!OidIsValid(operator))
148 0 : elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
149 : eq_strategy, optype, optype, opfamily);
150 :
151 144176 : regop = get_opcode(operator);
152 :
153 : /* Initialize the scankey. */
154 144176 : ScanKeyInit(&skey[skey_attoff],
155 144176 : index_attoff + 1,
156 : eq_strategy,
157 : regop,
158 144176 : searchslot->tts_values[table_attno - 1]);
159 :
160 144176 : skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
161 :
162 : /* Check for null value. */
163 144176 : if (searchslot->tts_isnull[table_attno - 1])
164 2 : skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
165 :
166 144176 : skey_attoff++;
167 : }
168 :
169 : /* There must always be at least one attribute for the index scan. */
170 : Assert(skey_attoff > 0);
171 :
172 144154 : return skey_attoff;
173 : }
174 :
175 : /*
176 : * Search the relation 'rel' for tuple using the index.
177 : *
178 : * If a matching tuple is found, lock it with lockmode, fill the slot with its
179 : * contents, and return true. Return false otherwise.
180 : */
181 : bool
182 144154 : RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
183 : LockTupleMode lockmode,
184 : TupleTableSlot *searchslot,
185 : TupleTableSlot *outslot)
186 : {
187 : ScanKeyData skey[INDEX_MAX_KEYS];
188 : int skey_attoff;
189 : IndexScanDesc scan;
190 : SnapshotData snap;
191 : TransactionId xwait;
192 : Relation idxrel;
193 : bool found;
194 144154 : TypeCacheEntry **eq = NULL;
195 : bool isIdxSafeToSkipDuplicates;
196 :
197 : /* Open the index. */
198 144154 : idxrel = index_open(idxoid, RowExclusiveLock);
199 :
200 144154 : isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
201 :
202 144154 : InitDirtySnapshot(snap);
203 :
204 : /* Build scan key. */
205 144154 : skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
206 :
207 : /* Start an index scan. */
208 144154 : scan = index_beginscan(rel, idxrel, &snap, skey_attoff, 0);
209 :
210 144154 : retry:
211 144154 : found = false;
212 :
213 144154 : index_rescan(scan, skey, skey_attoff, NULL, 0);
214 :
215 : /* Try to find the tuple */
216 144154 : while (index_getnext_slot(scan, ForwardScanDirection, outslot))
217 : {
218 : /*
219 : * Avoid expensive equality check if the index is primary key or
220 : * replica identity index.
221 : */
222 144138 : if (!isIdxSafeToSkipDuplicates)
223 : {
224 30 : if (eq == NULL)
225 30 : eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
226 :
227 30 : if (!tuples_equal(outslot, searchslot, eq))
228 0 : continue;
229 : }
230 :
231 144138 : ExecMaterializeSlot(outslot);
232 :
233 288276 : xwait = TransactionIdIsValid(snap.xmin) ?
234 144138 : snap.xmin : snap.xmax;
235 :
236 : /*
237 : * If the tuple is locked, wait for locking transaction to finish and
238 : * retry.
239 : */
240 144138 : if (TransactionIdIsValid(xwait))
241 : {
242 0 : XactLockTableWait(xwait, NULL, NULL, XLTW_None);
243 0 : goto retry;
244 : }
245 :
246 : /* Found our tuple and it's not locked */
247 144138 : found = true;
248 144138 : break;
249 : }
250 :
251 : /* Found tuple, try to lock it in the lockmode. */
252 144154 : if (found)
253 : {
254 : TM_FailureData tmfd;
255 : TM_Result res;
256 :
257 144138 : PushActiveSnapshot(GetLatestSnapshot());
258 :
259 144138 : res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
260 : outslot,
261 : GetCurrentCommandId(false),
262 : lockmode,
263 : LockWaitBlock,
264 : 0 /* don't follow updates */ ,
265 : &tmfd);
266 :
267 144138 : PopActiveSnapshot();
268 :
269 144138 : switch (res)
270 : {
271 144138 : case TM_Ok:
272 144138 : break;
273 0 : case TM_Updated:
274 : /* XXX: Improve handling here */
275 0 : if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
276 0 : ereport(LOG,
277 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
278 : errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
279 : else
280 0 : ereport(LOG,
281 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
282 : errmsg("concurrent update, retrying")));
283 0 : goto retry;
284 0 : case TM_Deleted:
285 : /* XXX: Improve handling here */
286 0 : ereport(LOG,
287 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
288 : errmsg("concurrent delete, retrying")));
289 0 : goto retry;
290 0 : case TM_Invisible:
291 0 : elog(ERROR, "attempted to lock invisible tuple");
292 : break;
293 0 : default:
294 0 : elog(ERROR, "unexpected table_tuple_lock status: %u", res);
295 : break;
296 : }
297 : }
298 :
299 144154 : index_endscan(scan);
300 :
301 : /* Don't release lock until commit. */
302 144154 : index_close(idxrel, NoLock);
303 :
304 144154 : return found;
305 : }
306 :
307 : /*
308 : * Compare the tuples in the slots by checking if they have equal values.
309 : */
310 : static bool
311 210564 : tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
312 : TypeCacheEntry **eq)
313 : {
314 : int attrnum;
315 :
316 : Assert(slot1->tts_tupleDescriptor->natts ==
317 : slot2->tts_tupleDescriptor->natts);
318 :
319 210564 : slot_getallattrs(slot1);
320 210564 : slot_getallattrs(slot2);
321 :
322 : /* Check equality of the attributes. */
323 210946 : for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
324 : {
325 : Form_pg_attribute att;
326 : TypeCacheEntry *typentry;
327 :
328 210624 : att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
329 :
330 : /*
331 : * Ignore dropped and generated columns as the publisher doesn't send
332 : * those
333 : */
334 210624 : if (att->attisdropped || att->attgenerated)
335 4 : continue;
336 :
337 : /*
338 : * If one value is NULL and other is not, then they are certainly not
339 : * equal
340 : */
341 210620 : if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
342 0 : return false;
343 :
344 : /*
345 : * If both are NULL, they can be considered equal.
346 : */
347 210620 : if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
348 2 : continue;
349 :
350 210618 : typentry = eq[attrnum];
351 210618 : if (typentry == NULL)
352 : {
353 376 : typentry = lookup_type_cache(att->atttypid,
354 : TYPECACHE_EQ_OPR_FINFO);
355 376 : if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
356 0 : ereport(ERROR,
357 : (errcode(ERRCODE_UNDEFINED_FUNCTION),
358 : errmsg("could not identify an equality operator for type %s",
359 : format_type_be(att->atttypid))));
360 376 : eq[attrnum] = typentry;
361 : }
362 :
363 210618 : if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
364 : att->attcollation,
365 210618 : slot1->tts_values[attrnum],
366 210618 : slot2->tts_values[attrnum])))
367 210242 : return false;
368 : }
369 :
370 322 : return true;
371 : }
372 :
373 : /*
374 : * Search the relation 'rel' for tuple using the sequential scan.
375 : *
376 : * If a matching tuple is found, lock it with lockmode, fill the slot with its
377 : * contents, and return true. Return false otherwise.
378 : *
379 : * Note that this stops on the first matching tuple.
380 : *
381 : * This can obviously be quite slow on tables that have more than few rows.
382 : */
383 : bool
384 292 : RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
385 : TupleTableSlot *searchslot, TupleTableSlot *outslot)
386 : {
387 : TupleTableSlot *scanslot;
388 : TableScanDesc scan;
389 : SnapshotData snap;
390 : TypeCacheEntry **eq;
391 : TransactionId xwait;
392 : bool found;
393 292 : TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
394 :
395 : Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
396 :
397 292 : eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
398 :
399 : /* Start a heap scan. */
400 292 : InitDirtySnapshot(snap);
401 292 : scan = table_beginscan(rel, &snap, 0, NULL);
402 292 : scanslot = table_slot_create(rel, NULL);
403 :
404 292 : retry:
405 292 : found = false;
406 :
407 292 : table_rescan(scan, NULL);
408 :
409 : /* Try to find the tuple */
410 210534 : while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
411 : {
412 210534 : if (!tuples_equal(scanslot, searchslot, eq))
413 210242 : continue;
414 :
415 292 : found = true;
416 292 : ExecCopySlot(outslot, scanslot);
417 :
418 584 : xwait = TransactionIdIsValid(snap.xmin) ?
419 292 : snap.xmin : snap.xmax;
420 :
421 : /*
422 : * If the tuple is locked, wait for locking transaction to finish and
423 : * retry.
424 : */
425 292 : if (TransactionIdIsValid(xwait))
426 : {
427 0 : XactLockTableWait(xwait, NULL, NULL, XLTW_None);
428 0 : goto retry;
429 : }
430 :
431 : /* Found our tuple and it's not locked */
432 292 : break;
433 : }
434 :
435 : /* Found tuple, try to lock it in the lockmode. */
436 292 : if (found)
437 : {
438 : TM_FailureData tmfd;
439 : TM_Result res;
440 :
441 292 : PushActiveSnapshot(GetLatestSnapshot());
442 :
443 292 : res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
444 : outslot,
445 : GetCurrentCommandId(false),
446 : lockmode,
447 : LockWaitBlock,
448 : 0 /* don't follow updates */ ,
449 : &tmfd);
450 :
451 292 : PopActiveSnapshot();
452 :
453 292 : switch (res)
454 : {
455 292 : case TM_Ok:
456 292 : break;
457 0 : case TM_Updated:
458 : /* XXX: Improve handling here */
459 0 : if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
460 0 : ereport(LOG,
461 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
462 : errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
463 : else
464 0 : ereport(LOG,
465 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
466 : errmsg("concurrent update, retrying")));
467 0 : goto retry;
468 0 : case TM_Deleted:
469 : /* XXX: Improve handling here */
470 0 : ereport(LOG,
471 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
472 : errmsg("concurrent delete, retrying")));
473 0 : goto retry;
474 0 : case TM_Invisible:
475 0 : elog(ERROR, "attempted to lock invisible tuple");
476 : break;
477 0 : default:
478 0 : elog(ERROR, "unexpected table_tuple_lock status: %u", res);
479 : break;
480 : }
481 : }
482 :
483 292 : table_endscan(scan);
484 292 : ExecDropSingleTupleTableSlot(scanslot);
485 :
486 292 : return found;
487 : }
488 :
489 : /*
490 : * Insert tuple represented in the slot to the relation, update the indexes,
491 : * and execute any constraints and per-row triggers.
492 : *
493 : * Caller is responsible for opening the indexes.
494 : */
495 : void
496 152266 : ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
497 : EState *estate, TupleTableSlot *slot)
498 : {
499 152266 : bool skip_tuple = false;
500 152266 : Relation rel = resultRelInfo->ri_RelationDesc;
501 :
502 : /* For now we support only tables. */
503 : Assert(rel->rd_rel->relkind == RELKIND_RELATION);
504 :
505 152266 : CheckCmdReplicaIdentity(rel, CMD_INSERT);
506 :
507 : /* BEFORE ROW INSERT Triggers */
508 152266 : if (resultRelInfo->ri_TrigDesc &&
509 38 : resultRelInfo->ri_TrigDesc->trig_insert_before_row)
510 : {
511 6 : if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
512 2 : skip_tuple = true; /* "do nothing" */
513 : }
514 :
515 152266 : if (!skip_tuple)
516 : {
517 152264 : List *recheckIndexes = NIL;
518 :
519 : /* Compute stored generated columns */
520 152264 : if (rel->rd_att->constr &&
521 90804 : rel->rd_att->constr->has_generated_stored)
522 8 : ExecComputeStoredGenerated(resultRelInfo, estate, slot,
523 : CMD_INSERT);
524 :
525 : /* Check the constraints of the tuple */
526 152264 : if (rel->rd_att->constr)
527 90804 : ExecConstraints(resultRelInfo, slot, estate);
528 152264 : if (rel->rd_rel->relispartition)
529 120 : ExecPartitionCheck(resultRelInfo, slot, estate, true);
530 :
531 : /* OK, store the tuple and create index entries for it */
532 152264 : simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
533 :
534 152264 : if (resultRelInfo->ri_NumIndices > 0)
535 111732 : recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
536 : slot, estate, false, false,
537 : NULL, NIL, false);
538 :
539 : /* AFTER ROW INSERT Triggers */
540 152248 : ExecARInsertTriggers(estate, resultRelInfo, slot,
541 : recheckIndexes, NULL);
542 :
543 : /*
544 : * XXX we should in theory pass a TransitionCaptureState object to the
545 : * above to capture transition tuples, but after statement triggers
546 : * don't actually get fired by replication yet anyway
547 : */
548 :
549 152248 : list_free(recheckIndexes);
550 : }
551 152250 : }
552 :
553 : /*
554 : * Find the searchslot tuple and update it with data in the slot,
555 : * update the indexes, and execute any constraints and per-row triggers.
556 : *
557 : * Caller is responsible for opening the indexes.
558 : */
559 : void
560 63826 : ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
561 : EState *estate, EPQState *epqstate,
562 : TupleTableSlot *searchslot, TupleTableSlot *slot)
563 : {
564 63826 : bool skip_tuple = false;
565 63826 : Relation rel = resultRelInfo->ri_RelationDesc;
566 63826 : ItemPointer tid = &(searchslot->tts_tid);
567 :
568 : /* For now we support only tables. */
569 : Assert(rel->rd_rel->relkind == RELKIND_RELATION);
570 :
571 63826 : CheckCmdReplicaIdentity(rel, CMD_UPDATE);
572 :
573 : /* BEFORE ROW UPDATE Triggers */
574 63826 : if (resultRelInfo->ri_TrigDesc &&
575 20 : resultRelInfo->ri_TrigDesc->trig_update_before_row)
576 : {
577 6 : if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
578 : tid, NULL, slot, NULL, NULL))
579 4 : skip_tuple = true; /* "do nothing" */
580 : }
581 :
582 63826 : if (!skip_tuple)
583 : {
584 63822 : List *recheckIndexes = NIL;
585 : TU_UpdateIndexes update_indexes;
586 :
587 : /* Compute stored generated columns */
588 63822 : if (rel->rd_att->constr &&
589 63736 : rel->rd_att->constr->has_generated_stored)
590 6 : ExecComputeStoredGenerated(resultRelInfo, estate, slot,
591 : CMD_UPDATE);
592 :
593 : /* Check the constraints of the tuple */
594 63822 : if (rel->rd_att->constr)
595 63736 : ExecConstraints(resultRelInfo, slot, estate);
596 63822 : if (rel->rd_rel->relispartition)
597 22 : ExecPartitionCheck(resultRelInfo, slot, estate, true);
598 :
599 63822 : simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
600 : &update_indexes);
601 :
602 63822 : if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
603 40294 : recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
604 : slot, estate, true, false,
605 : NULL, NIL,
606 : (update_indexes == TU_Summarizing));
607 :
608 : /* AFTER ROW UPDATE Triggers */
609 63822 : ExecARUpdateTriggers(estate, resultRelInfo,
610 : NULL, NULL,
611 : tid, NULL, slot,
612 : recheckIndexes, NULL, false);
613 :
614 63822 : list_free(recheckIndexes);
615 : }
616 63826 : }
617 :
618 : /*
619 : * Find the searchslot tuple and delete it, and execute any constraints
620 : * and per-row triggers.
621 : *
622 : * Caller is responsible for opening the indexes.
623 : */
624 : void
625 80602 : ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
626 : EState *estate, EPQState *epqstate,
627 : TupleTableSlot *searchslot)
628 : {
629 80602 : bool skip_tuple = false;
630 80602 : Relation rel = resultRelInfo->ri_RelationDesc;
631 80602 : ItemPointer tid = &searchslot->tts_tid;
632 :
633 80602 : CheckCmdReplicaIdentity(rel, CMD_DELETE);
634 :
635 : /* BEFORE ROW DELETE Triggers */
636 80602 : if (resultRelInfo->ri_TrigDesc &&
637 20 : resultRelInfo->ri_TrigDesc->trig_delete_before_row)
638 : {
639 0 : skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
640 0 : tid, NULL, NULL, NULL, NULL);
641 : }
642 :
643 80602 : if (!skip_tuple)
644 : {
645 : /* OK, delete the tuple */
646 80602 : simple_table_tuple_delete(rel, tid, estate->es_snapshot);
647 :
648 : /* AFTER ROW DELETE Triggers */
649 80602 : ExecARDeleteTriggers(estate, resultRelInfo,
650 : tid, NULL, NULL, false);
651 : }
652 80602 : }
653 :
654 : /*
655 : * Check if command can be executed with current replica identity.
656 : */
657 : void
658 410268 : CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
659 : {
660 : PublicationDesc pubdesc;
661 :
662 : /*
663 : * Skip checking the replica identity for partitioned tables, because the
664 : * operations are actually performed on the leaf partitions.
665 : */
666 410268 : if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
667 388996 : return;
668 :
669 : /* We only need to do checks for UPDATE and DELETE. */
670 404754 : if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
671 233938 : return;
672 :
673 : /*
674 : * It is only safe to execute UPDATE/DELETE when all columns, referenced
675 : * in the row filters from publications which the relation is in, are
676 : * valid - i.e. when all referenced columns are part of REPLICA IDENTITY
677 : * or the table does not publish UPDATEs or DELETEs.
678 : *
679 : * XXX We could optimize it by first checking whether any of the
680 : * publications have a row filter for this relation. If not and relation
681 : * has replica identity then we can avoid building the descriptor but as
682 : * this happens only one time it doesn't seem worth the additional
683 : * complexity.
684 : */
685 170816 : RelationBuildPublicationDesc(rel, &pubdesc);
686 170816 : if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
687 60 : ereport(ERROR,
688 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
689 : errmsg("cannot update table \"%s\"",
690 : RelationGetRelationName(rel)),
691 : errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
692 170756 : else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
693 108 : ereport(ERROR,
694 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
695 : errmsg("cannot update table \"%s\"",
696 : RelationGetRelationName(rel)),
697 : errdetail("Column list used by the publication does not cover the replica identity.")));
698 170648 : else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
699 0 : ereport(ERROR,
700 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
701 : errmsg("cannot delete from table \"%s\"",
702 : RelationGetRelationName(rel)),
703 : errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
704 170648 : else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
705 0 : ereport(ERROR,
706 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
707 : errmsg("cannot delete from table \"%s\"",
708 : RelationGetRelationName(rel)),
709 : errdetail("Column list used by the publication does not cover the replica identity.")));
710 :
711 : /* If relation has replica identity we are always good. */
712 170648 : if (OidIsValid(RelationGetReplicaIndex(rel)))
713 149138 : return;
714 :
715 : /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
716 21510 : if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
717 406 : return;
718 :
719 : /*
720 : * This is UPDATE/DELETE and there is no replica identity.
721 : *
722 : * Check if the table publishes UPDATES or DELETES.
723 : */
724 21104 : if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
725 90 : ereport(ERROR,
726 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
727 : errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
728 : RelationGetRelationName(rel)),
729 : errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
730 21014 : else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
731 0 : ereport(ERROR,
732 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
733 : errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
734 : RelationGetRelationName(rel)),
735 : errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
736 : }
737 :
738 :
739 : /*
740 : * Check if we support writing into specific relkind.
741 : *
742 : * The nspname and relname are only needed for error reporting.
743 : */
744 : void
745 1544 : CheckSubscriptionRelkind(char relkind, const char *nspname,
746 : const char *relname)
747 : {
748 1544 : if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
749 0 : ereport(ERROR,
750 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
751 : errmsg("cannot use relation \"%s.%s\" as logical replication target",
752 : nspname, relname),
753 : errdetail_relkind_not_supported(relkind)));
754 1544 : }
|