LCOV - code coverage report
Current view: top level - src/backend/executor - execReplication.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 195 236 82.6 %
Date: 2023-12-02 15:10:53 Functions: 11 11 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * execReplication.c
       4             :  *    miscellaneous executor routines for logical replication
       5             :  *
       6             :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *    src/backend/executor/execReplication.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/genam.h"
      18             : #include "access/relscan.h"
      19             : #include "access/tableam.h"
      20             : #include "access/transam.h"
      21             : #include "access/xact.h"
      22             : #include "catalog/pg_am_d.h"
      23             : #include "commands/trigger.h"
      24             : #include "executor/executor.h"
      25             : #include "executor/nodeModifyTable.h"
      26             : #include "nodes/nodeFuncs.h"
      27             : #include "parser/parse_relation.h"
      28             : #include "parser/parsetree.h"
      29             : #include "replication/logicalrelation.h"
      30             : #include "storage/bufmgr.h"
      31             : #include "storage/lmgr.h"
      32             : #include "utils/builtins.h"
      33             : #include "utils/datum.h"
      34             : #include "utils/lsyscache.h"
      35             : #include "utils/memutils.h"
      36             : #include "utils/rel.h"
      37             : #include "utils/snapmgr.h"
      38             : #include "utils/syscache.h"
      39             : #include "utils/typcache.h"
      40             : 
      41             : 
      42             : static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
      43             :                          TypeCacheEntry **eq);  /* forward declaration */
      44             : 
      45             : /*
      46             :  * Returns the fixed strategy number of the equality operator for the index
      47             :  * access method 'am', or InvalidStrategy if the method has no such number.
      48             :  *
      49             :  * Currently, only Btree and Hash indexes are supported.  The other index
      50             :  * access methods don't have a fixed strategy for the equality operation;
      51             :  * instead, the support routines of each operator class interpret the
      52             :  * strategy numbers according to the operator class's definition.
      53             :  */
      54             : StrategyNumber
      55      144212 : get_equal_strategy_number_for_am(Oid am)
      56             : {
      57             :     int         ret;    /* NOTE(review): int here, StrategyNumber on return */
      58             : 
      59      144212 :     switch (am)
      60             :     {
      61      144200 :         case BTREE_AM_OID:
      62      144200 :             ret = BTEqualStrategyNumber;
      63      144200 :             break;
      64          12 :         case HASH_AM_OID:
      65          12 :             ret = HTEqualStrategyNumber;
      66          12 :             break;
      67           0 :         default:
      68             :             /* XXX: Only Btree and Hash indexes are supported */
      69           0 :             ret = InvalidStrategy;
      70           0 :             break;
      71             :     }
      72             : 
      73      144212 :     return ret;
      74             : }
      75             : 
      76             : /*
      77             :  * Return the equality strategy number for the index access method of operator
      78             :  * class 'opclass', as given by get_equal_strategy_number_for_am().
      79             :  */
      80             : static StrategyNumber
      81      144176 : get_equal_strategy_number(Oid opclass)
      82             : {
      83      144176 :     Oid         am = get_opclass_method(opclass);
      84             : 
      85      144176 :     return get_equal_strategy_number_for_am(am);
      86             : }
      87             : 
      88             : /*
      89             :  * Set up the ScanKey array 'skey' for searching relation 'rel' for the tuple
      90             :  * in 'searchslot', which is laid out to match 'rel' (*NOT* idxrel!).
      91             :  *
      92             :  * Returns the number of scan keys filled in, i.e. the index columns to scan on.
      93             :  *
      94             :  * This is not a generic routine: idxrel must be PK, RI, or an index that can be
      95             :  * used for a REPLICA IDENTITY FULL table. See
      96             :  * FindUsableIndexForReplicaIdentityFull() for details.
      97             :  *
      98             :  * By definition, the replica identity of a rel meets all limitations associated
      99             :  * with that. Note that any other index could also meet these limitations.
     100             :  */
     101             : static int
     102      144154 : build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
     103             :                          TupleTableSlot *searchslot)    /* 'rel' is not referenced in the body */
     104             : {
     105             :     int         index_attoff;
     106      144154 :     int         skey_attoff = 0;    /* next free entry in skey[] */
     107             :     Datum       indclassDatum;
     108             :     oidvector  *opclass;
     109      144154 :     int2vector *indkey = &idxrel->rd_index->indkey;
     110             : 
     111      144154 :     indclassDatum = SysCacheGetAttrNotNull(INDEXRELID, idxrel->rd_indextuple,
     112             :                                            Anum_pg_index_indclass);
     113      144154 :     opclass = (oidvector *) DatumGetPointer(indclassDatum);
     114             : 
     115             :     /* Build a scankey for every non-expression key attribute in the index. */
     116      288334 :     for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
     117      144180 :          index_attoff++)
     118             :     {
     119             :         Oid         operator;
     120             :         Oid         optype;
     121             :         Oid         opfamily;
     122             :         RegProcedure regop;
     123      144180 :         int         table_attno = indkey->values[index_attoff];
     124             :         StrategyNumber eq_strategy;
     125             : 
     126      144180 :         if (!AttributeNumberIsValid(table_attno))
     127             :         {
     128             :             /*
     129             :              * XXX: Currently, we don't support expressions in the scan key,
     130             :              * so such index columns get no scan key; see code below.
     131             :              */
     132           4 :             continue;
     133             :         }
     134             : 
     135             :         /*
     136             :          * Load the operator info.  We need this to get the equality operator
     137             :          * function for the scan key.
     138             :          */
     139      144176 :         optype = get_opclass_input_type(opclass->values[index_attoff]);
     140      144176 :         opfamily = get_opclass_family(opclass->values[index_attoff]);
     141      144176 :         eq_strategy = get_equal_strategy_number(opclass->values[index_attoff]);
     142             : 
     143      144176 :         operator = get_opfamily_member(opfamily, optype,
     144             :                                        optype,
     145             :                                        eq_strategy);
     146             : 
     147      144176 :         if (!OidIsValid(operator))
     148           0 :             elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
     149             :                  eq_strategy, optype, optype, opfamily);
     150             : 
     151      144176 :         regop = get_opcode(operator);
     152             : 
     153             :         /* Initialize the scankey with the search value taken from searchslot. */
     154      144176 :         ScanKeyInit(&skey[skey_attoff],
     155      144176 :                     index_attoff + 1,
     156             :                     eq_strategy,
     157             :                     regop,
     158      144176 :                     searchslot->tts_values[table_attno - 1]);
     159             : 
     160      144176 :         skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
     161             : 
     162             :         /* Check for null value and flag the scankey accordingly. */
     163      144176 :         if (searchslot->tts_isnull[table_attno - 1])
     164           2 :             skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
     165             : 
     166      144176 :         skey_attoff++;
     167             :     }
     168             : 
     169             :     /* There must always be at least one attribute for the index scan. */
     170             :     Assert(skey_attoff > 0);
     171             : 
     172      144154 :     return skey_attoff;
     173             : }
     174             : 
     175             : /*
     176             :  * Search the relation 'rel' for the tuple in 'searchslot' using index 'idxoid'.
     177             :  *
     178             :  * If a matching tuple is found, lock it with lockmode, fill 'outslot' with its
     179             :  * contents, and return true.  Return false otherwise.
     180             :  */
     181             : bool
     182      144154 : RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
     183             :                              LockTupleMode lockmode,
     184             :                              TupleTableSlot *searchslot,
     185             :                              TupleTableSlot *outslot)
     186             : {
     187             :     ScanKeyData skey[INDEX_MAX_KEYS];
     188             :     int         skey_attoff;
     189             :     IndexScanDesc scan;
     190             :     SnapshotData snap;
     191             :     TransactionId xwait;
     192             :     Relation    idxrel;
     193             :     bool        found;
     194      144154 :     TypeCacheEntry **eq = NULL; /* allocated lazily, kept across retries */
     195             :     bool        isIdxSafeToSkipDuplicates;
     196             : 
     197             :     /* Open the index. */
     198      144154 :     idxrel = index_open(idxoid, RowExclusiveLock);
     199             : 
     200      144154 :     isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
     201             : 
     202      144154 :     InitDirtySnapshot(snap);
     203             : 
     204             :     /* Build scan key. */
     205      144154 :     skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
     206             : 
     207             :     /* Start an index scan. */
     208      144154 :     scan = index_beginscan(rel, idxrel, &snap, skey_attoff, 0);
     209             : 
     210      144154 : retry:                          /* each retry resets 'found' and rescans the index */
     211      144154 :     found = false;
     212             : 
     213      144154 :     index_rescan(scan, skey, skey_attoff, NULL, 0);
     214             : 
     215             :     /* Try to find the tuple */
     216      144154 :     while (index_getnext_slot(scan, ForwardScanDirection, outslot))
     217             :     {
     218             :         /*
     219             :          * Avoid expensive equality check if the index is primary key or
     220             :          * replica identity index.
     221             :          */
     222      144138 :         if (!isIdxSafeToSkipDuplicates)
     223             :         {
     224          30 :             if (eq == NULL)
     225          30 :                 eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
     226             : 
     227          30 :             if (!tuples_equal(outslot, searchslot, eq))
     228           0 :                 continue;
     229             :         }
     230             : 
     231      144138 :         ExecMaterializeSlot(outslot);
     232             : 
     233      288276 :         xwait = TransactionIdIsValid(snap.xmin) ?
     234      144138 :             snap.xmin : snap.xmax;
     235             : 
     236             :         /*
     237             :          * If the tuple is locked, wait for the locking transaction to finish
     238             :          * and retry.
     239             :          */
     240      144138 :         if (TransactionIdIsValid(xwait))
     241             :         {
     242           0 :             XactLockTableWait(xwait, NULL, NULL, XLTW_None);
     243           0 :             goto retry;
     244             :         }
     245             : 
     246             :         /* Found our tuple and it's not locked */
     247      144138 :         found = true;
     248      144138 :         break;
     249             :     }
     250             : 
     251             :     /* Found tuple, try to lock it in the lockmode. */
     252      144154 :     if (found)
     253             :     {
     254             :         TM_FailureData tmfd;
     255             :         TM_Result   res;
     256             : 
     257      144138 :         PushActiveSnapshot(GetLatestSnapshot());    /* NOTE(review): GetLatestSnapshot() is called again below */
     258             : 
     259      144138 :         res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
     260             :                                outslot,
     261             :                                GetCurrentCommandId(false),
     262             :                                lockmode,
     263             :                                LockWaitBlock,
     264             :                                0 /* don't follow updates */ ,
     265             :                                &tmfd);
     266             : 
     267      144138 :         PopActiveSnapshot();
     268             : 
     269      144138 :         switch (res)
     270             :         {
     271      144138 :             case TM_Ok:
     272      144138 :                 break;
     273           0 :             case TM_Updated:
     274             :                 /* XXX: Improve handling here; for now log and search again */
     275           0 :                 if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
     276           0 :                     ereport(LOG,
     277             :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     278             :                              errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
     279             :                 else
     280           0 :                     ereport(LOG,
     281             :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     282             :                              errmsg("concurrent update, retrying")));
     283           0 :                 goto retry;
     284           0 :             case TM_Deleted:
     285             :                 /* XXX: Improve handling here; for now log and search again */
     286           0 :                 ereport(LOG,
     287             :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     288             :                          errmsg("concurrent delete, retrying")));
     289           0 :                 goto retry;
     290           0 :             case TM_Invisible:
     291           0 :                 elog(ERROR, "attempted to lock invisible tuple");
     292             :                 break;
     293           0 :             default:
     294           0 :                 elog(ERROR, "unexpected table_tuple_lock status: %u", res);
     295             :                 break;
     296             :         }
     297             :     }
     298             : 
     299      144154 :     index_endscan(scan);
     300             : 
     301             :     /* Don't release lock until commit. */
     302      144154 :     index_close(idxrel, NoLock);
     303             : 
     304      144154 :     return found;
     305             : }
     306             : 
     307             : /*
     308             :  * Compare the tuples in slot1 and slot2; 'eq' caches equality-operator lookups.
     309             :  */
     310             : static bool
     311      210564 : tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
     312             :              TypeCacheEntry **eq)
     313             : {
     314             :     int         attrnum;
     315             : 
     316             :     Assert(slot1->tts_tupleDescriptor->natts ==
     317             :            slot2->tts_tupleDescriptor->natts);
     318             : 
     319      210564 :     slot_getallattrs(slot1);
     320      210564 :     slot_getallattrs(slot2);
     321             : 
     322             :     /* Check equality of the attributes; stop at the first difference. */
     323      210946 :     for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
     324             :     {
     325             :         Form_pg_attribute att;
     326             :         TypeCacheEntry *typentry;
     327             : 
     328      210624 :         att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
     329             : 
     330             :         /*
     331             :          * Ignore dropped and generated columns as the publisher doesn't send
     332             :          * those
     333             :          */
     334      210624 :         if (att->attisdropped || att->attgenerated)
     335           4 :             continue;
     336             : 
     337             :         /*
     338             :          * If one value is NULL and the other is not, then they are certainly
     339             :          * not equal
     340             :          */
     341      210620 :         if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
     342           0 :             return false;
     343             : 
     344             :         /*
     345             :          * If both are NULL, they can be considered equal.
     346             :          */
     347      210620 :         if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
     348           2 :             continue;
     349             : 
     350      210618 :         typentry = eq[attrnum];
     351      210618 :         if (typentry == NULL)   /* first use of this attribute: look it up and cache it */
     352             :         {
     353         376 :             typentry = lookup_type_cache(att->atttypid,
     354             :                                          TYPECACHE_EQ_OPR_FINFO);
     355         376 :             if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
     356           0 :                 ereport(ERROR,
     357             :                         (errcode(ERRCODE_UNDEFINED_FUNCTION),
     358             :                          errmsg("could not identify an equality operator for type %s",
     359             :                                 format_type_be(att->atttypid))));
     360         376 :             eq[attrnum] = typentry;
     361             :         }
     362             : 
     363      210618 :         if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
     364             :                                             att->attcollation,
     365      210618 :                                             slot1->tts_values[attrnum],
     366      210618 :                                             slot2->tts_values[attrnum])))
     367      210242 :             return false;
     368             :     }
     369             : 
     370         322 :     return true;
     371             : }
     372             : 
     373             : /*
     374             :  * Search the relation 'rel' for the tuple in 'searchslot' using a sequential scan.
     375             :  *
     376             :  * If a matching tuple is found, lock it with lockmode, fill 'outslot' with its
     377             :  * contents, and return true.  Return false otherwise.
     378             :  *
     379             :  * Note that this stops on the first matching tuple.
     380             :  *
     381             :  * This can obviously be quite slow on tables that have more than a few rows.
     382             :  */
     383             : bool
     384         292 : RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
     385             :                          TupleTableSlot *searchslot, TupleTableSlot *outslot)
     386             : {
     387             :     TupleTableSlot *scanslot;
     388             :     TableScanDesc scan;
     389             :     SnapshotData snap;
     390             :     TypeCacheEntry **eq;
     391             :     TransactionId xwait;
     392             :     bool        found;
     393         292 :     TupleDesc   desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
     394             : 
     395             :     Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
     396             : 
     397         292 :     eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
     398             : 
     399             :     /* Start a heap scan. */
     400         292 :     InitDirtySnapshot(snap);
     401         292 :     scan = table_beginscan(rel, &snap, 0, NULL);
     402         292 :     scanslot = table_slot_create(rel, NULL);
     403             : 
     404         292 : retry:                          /* each retry resets 'found' and rescans the table */
     405         292 :     found = false;
     406             : 
     407         292 :     table_rescan(scan, NULL);
     408             : 
     409             :     /* Try to find the tuple */
     410      210534 :     while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
     411             :     {
     412      210534 :         if (!tuples_equal(scanslot, searchslot, eq))
     413      210242 :             continue;
     414             : 
     415         292 :         found = true;           /* reset at 'retry' if we have to wait below */
     416         292 :         ExecCopySlot(outslot, scanslot);
     417             : 
     418         584 :         xwait = TransactionIdIsValid(snap.xmin) ?
     419         292 :             snap.xmin : snap.xmax;
     420             : 
     421             :         /*
     422             :          * If the tuple is locked, wait for the locking transaction to finish
     423             :          * and retry.
     424             :          */
     425         292 :         if (TransactionIdIsValid(xwait))
     426             :         {
     427           0 :             XactLockTableWait(xwait, NULL, NULL, XLTW_None);
     428           0 :             goto retry;
     429             :         }
     430             : 
     431             :         /* Found our tuple and it's not locked */
     432         292 :         break;
     433             :     }
     434             : 
     435             :     /* Found tuple, try to lock it in the lockmode. */
     436         292 :     if (found)
     437             :     {
     438             :         TM_FailureData tmfd;
     439             :         TM_Result   res;
     440             : 
     441         292 :         PushActiveSnapshot(GetLatestSnapshot());    /* NOTE(review): GetLatestSnapshot() is called again below */
     442             : 
     443         292 :         res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
     444             :                                outslot,
     445             :                                GetCurrentCommandId(false),
     446             :                                lockmode,
     447             :                                LockWaitBlock,
     448             :                                0 /* don't follow updates */ ,
     449             :                                &tmfd);
     450             : 
     451         292 :         PopActiveSnapshot();
     452             : 
     453         292 :         switch (res)
     454             :         {
     455         292 :             case TM_Ok:
     456         292 :                 break;
     457           0 :             case TM_Updated:
     458             :                 /* XXX: Improve handling here; for now log and search again */
     459           0 :                 if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
     460           0 :                     ereport(LOG,
     461             :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     462             :                              errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
     463             :                 else
     464           0 :                     ereport(LOG,
     465             :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     466             :                              errmsg("concurrent update, retrying")));
     467           0 :                 goto retry;
     468           0 :             case TM_Deleted:
     469             :                 /* XXX: Improve handling here; for now log and search again */
     470           0 :                 ereport(LOG,
     471             :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     472             :                          errmsg("concurrent delete, retrying")));
     473           0 :                 goto retry;
     474           0 :             case TM_Invisible:
     475           0 :                 elog(ERROR, "attempted to lock invisible tuple");
     476             :                 break;
     477           0 :             default:
     478           0 :                 elog(ERROR, "unexpected table_tuple_lock status: %u", res);
     479             :                 break;
     480             :         }
     481             :     }
     482             : 
     483         292 :     table_endscan(scan);
     484         292 :     ExecDropSingleTupleTableSlot(scanslot);
     485             : 
     486         292 :     return found;
     487             : }
     488             : 
     489             : /*
     490             :  * Insert the tuple represented in the slot into the relation, update the
     491             :  * indexes, and execute any constraints and per-row triggers.
     492             :  *
     493             :  * Caller is responsible for opening the indexes.
     494             :  */
     495             : void
     496      152266 : ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
     497             :                          EState *estate, TupleTableSlot *slot)
     498             : {
     499      152266 :     bool        skip_tuple = false;
     500      152266 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     501             : 
     502             :     /* For now we support only tables. */
     503             :     Assert(rel->rd_rel->relkind == RELKIND_RELATION);
     504             : 
     505      152266 :     CheckCmdReplicaIdentity(rel, CMD_INSERT);
     506             : 
     507             :     /* BEFORE ROW INSERT Triggers; a false result means skip the insert */
     508      152266 :     if (resultRelInfo->ri_TrigDesc &&
     509          38 :         resultRelInfo->ri_TrigDesc->trig_insert_before_row)
     510             :     {
     511           6 :         if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
     512           2 :             skip_tuple = true;  /* "do nothing" */
     513             :     }
     514             : 
     515      152266 :     if (!skip_tuple)
     516             :     {
     517      152264 :         List       *recheckIndexes = NIL;
     518             : 
     519             :         /* Compute stored generated columns */
     520      152264 :         if (rel->rd_att->constr &&
     521       90804 :             rel->rd_att->constr->has_generated_stored)
     522           8 :             ExecComputeStoredGenerated(resultRelInfo, estate, slot,
     523             :                                        CMD_INSERT);
     524             : 
     525             :         /* Check the constraints of the tuple, then the partition constraint */
     526      152264 :         if (rel->rd_att->constr)
     527       90804 :             ExecConstraints(resultRelInfo, slot, estate);
     528      152264 :         if (rel->rd_rel->relispartition)
     529         120 :             ExecPartitionCheck(resultRelInfo, slot, estate, true);
     530             : 
     531             :         /* OK, store the tuple and create index entries for it */
     532      152264 :         simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
     533             : 
     534      152264 :         if (resultRelInfo->ri_NumIndices > 0)
     535      111732 :             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
     536             :                                                    slot, estate, false, false,
     537             :                                                    NULL, NIL, false);
     538             : 
     539             :         /* AFTER ROW INSERT Triggers */
     540      152248 :         ExecARInsertTriggers(estate, resultRelInfo, slot,
     541             :                              recheckIndexes, NULL);
     542             : 
     543             :         /*
     544             :          * XXX we should in theory pass a TransitionCaptureState object to the
     545             :          * above to capture transition tuples, but after statement triggers
     546             :          * don't actually get fired by replication yet anyway
     547             :          */
     548             : 
     549      152248 :         list_free(recheckIndexes);
     550             :     }
     551      152250 : }
     552             : 
     553             : /*
     554             :  * Find the searchslot tuple and update it with data in the slot,
     555             :  * update the indexes, and execute any constraints and per-row triggers.
     556             :  *
                     :  * The row to update is addressed by searchslot's tts_tid; slot carries
                     :  * the new row contents.  epqstate is only handed on to the BEFORE ROW
                     :  * trigger call.  The steps run in this order:
                     :  *
                     :  *   1. CheckCmdReplicaIdentity() for CMD_UPDATE.
                     :  *   2. BEFORE ROW UPDATE triggers, if the relation has any.  When
                     :  *      ExecBRUpdateTriggers() returns false the update is skipped and
                     :  *      none of the following steps run.
                     :  *   3. If the tuple descriptor has constraints: stored generated
                     :  *      columns are computed (when present) and ExecConstraints() is
                     :  *      applied to slot.  For a partition, ExecPartitionCheck() is
                     :  *      applied as well.
                     :  *   4. simple_table_tuple_update(), followed by ExecInsertIndexTuples()
                     :  *      when the relation has indexes and update_indexes is not TU_None.
                     :  *   5. AFTER ROW UPDATE triggers, which receive recheckIndexes.
                     :  *
     557             :  * Caller is responsible for opening the indexes.
     558             :  */
     559             : void
     560       63826 : ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
     561             :                          EState *estate, EPQState *epqstate,
     562             :                          TupleTableSlot *searchslot, TupleTableSlot *slot)
     563             : {
     564       63826 :     bool        skip_tuple = false;
     565       63826 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     566       63826 :     ItemPointer tid = &(searchslot->tts_tid);
     567             : 
     568             :     /* For now we support only tables. */
     569             :     Assert(rel->rd_rel->relkind == RELKIND_RELATION);
     570             : 
     571       63826 :     CheckCmdReplicaIdentity(rel, CMD_UPDATE);
     572             : 
     573             :     /*
                     :      * BEFORE ROW UPDATE Triggers.  A false result from
                     :      * ExecBRUpdateTriggers() means the row must not be updated.
                     :      */
     574       63826 :     if (resultRelInfo->ri_TrigDesc &&
     575          20 :         resultRelInfo->ri_TrigDesc->trig_update_before_row)
     576             :     {
     577           6 :         if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
     578             :                                   tid, NULL, slot, NULL, NULL))
     579           4 :             skip_tuple = true;  /* "do nothing" */
     580             :     }
     581             : 
     582       63826 :     if (!skip_tuple)
     583             :     {
     584       63822 :         List       *recheckIndexes = NIL;
     585             :         TU_UpdateIndexes update_indexes;
     586             : 
     587             :         /* Compute stored generated columns */
     588       63822 :         if (rel->rd_att->constr &&
     589       63736 :             rel->rd_att->constr->has_generated_stored)
     590           6 :             ExecComputeStoredGenerated(resultRelInfo, estate, slot,
     591             :                                        CMD_UPDATE);
     592             : 
     593             :         /*
                     :          * Check the constraints of the tuple.  This is done after the
                     :          * generated columns are computed, so the checks see the final
                     :          * contents of slot.
                     :          */
     594       63822 :         if (rel->rd_att->constr)
     595       63736 :             ExecConstraints(resultRelInfo, slot, estate);
     596       63822 :         if (rel->rd_rel->relispartition)
     597          22 :             ExecPartitionCheck(resultRelInfo, slot, estate, true);
     598             :         /* Update the row at tid; the call fills in update_indexes. */
     599       63822 :         simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
     600             :                                   &update_indexes);
     601             :         /*
                     :          * Insert index entries unless the update reported TU_None.  The
                     :          * last argument is true only for TU_Summarizing; presumably it
                     :          * limits the work to summarizing indexes -- confirm against
                     :          * ExecInsertIndexTuples().
                     :          */
     602       63822 :         if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
     603       40294 :             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
     604             :                                                    slot, estate, true, false,
     605             :                                                    NULL, NIL,
     606             :                                                    (update_indexes == TU_Summarizing));
     607             : 
     608             :         /*
                     :          * AFTER ROW UPDATE Triggers.  Called without checking
                     :          * ri_TrigDesc here; recheckIndexes is still NIL when no index
                     :          * entries were inserted above.
                     :          */
     609       63822 :         ExecARUpdateTriggers(estate, resultRelInfo,
     610             :                              NULL, NULL,
     611             :                              tid, NULL, slot,
     612             :                              recheckIndexes, NULL, false);
     613             : 
     614       63822 :         list_free(recheckIndexes);
     615             :     }
     616       63826 : }
     617             : 
     618             : /*
     619             :  * Delete the tuple identified by searchslot and execute any per-row
     620             :  * triggers.
     621             :  *
                     :  * The row to delete is addressed by searchslot's tts_tid.  epqstate is
                     :  * only handed on to the BEFORE ROW trigger call.  The steps run in this
                     :  * order:
                     :  *
                     :  *   1. CheckCmdReplicaIdentity() for CMD_DELETE.
                     :  *   2. BEFORE ROW DELETE triggers, if the relation has any.  When
                     :  *      ExecBRDeleteTriggers() returns false the delete is skipped and
                     :  *      nothing below runs.
                     :  *   3. simple_table_tuple_delete() using the EState's snapshot.
                     :  *   4. AFTER ROW DELETE triggers.
                     :  *
                     :  * No constraint check is performed on this path.
                     :  *
                     :  * NOTE(review): unlike ExecSimpleRelationUpdate(), there is no relkind
                     :  * assertion here -- confirm whether that is intentional.
                     :  *
     622             :  * Caller is responsible for opening the indexes.
     623             :  */
     624             : void
     625       80602 : ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
     626             :                          EState *estate, EPQState *epqstate,
     627             :                          TupleTableSlot *searchslot)
     628             : {
     629       80602 :     bool        skip_tuple = false;
     630       80602 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     631       80602 :     ItemPointer tid = &searchslot->tts_tid;
     632             : 
     633       80602 :     CheckCmdReplicaIdentity(rel, CMD_DELETE);
     634             : 
     635             :     /*
                     :      * BEFORE ROW DELETE Triggers.  A false result from
                     :      * ExecBRDeleteTriggers() means the row must not be deleted.
                     :      */
     636       80602 :     if (resultRelInfo->ri_TrigDesc &&
     637          20 :         resultRelInfo->ri_TrigDesc->trig_delete_before_row)
     638             :     {
     639           0 :         skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
     640           0 :                                            tid, NULL, NULL, NULL, NULL);
     641             :     }
     642             : 
     643       80602 :     if (!skip_tuple)
     644             :     {
     645             :         /* OK, delete the tuple */
     646       80602 :         simple_table_tuple_delete(rel, tid, estate->es_snapshot);
     647             : 
     648             :         /* AFTER ROW DELETE Triggers (called without checking ri_TrigDesc) */
     649       80602 :         ExecARDeleteTriggers(estate, resultRelInfo,
     650             :                              tid, NULL, NULL, false);
     651             :     }
     652       80602 : }
     653             : 
     654             : /*
     655             :  * Check if command can be executed with current replica identity.
                     :  *
                     :  * Returns without doing anything when rel is a partitioned table or when
                     :  * cmd is neither CMD_UPDATE nor CMD_DELETE.  Otherwise the relation's
                     :  * publication descriptor is built and the function either returns
                     :  * normally or raises an ERROR:
                     :  *
                     :  *   - ERRCODE_INVALID_COLUMN_REFERENCE when the descriptor marks the row
                     :  *     filter or the column list as not valid for the command;
                     :  *   - ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE when the relation has
                     :  *     neither a replica identity index nor REPLICA IDENTITY FULL, and
                     :  *     the descriptor says the command's action is published.
     656             :  */
     657             : void
     658      410268 : CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
     659             : {
     660             :     PublicationDesc pubdesc;
     661             : 
     662             :     /*
     663             :      * Skip checking the replica identity for partitioned tables, because the
     664             :      * operations are actually performed on the leaf partitions.
     665             :      */
     666      410268 :     if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     667      388996 :         return;
     668             : 
     669             :     /* We only need to do checks for UPDATE and DELETE. */
     670      404754 :     if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
     671      233938 :         return;
     672             : 
     673             :     /*
     674             :      * It is only safe to execute UPDATE/DELETE when all columns, referenced
     675             :      * in the row filters from publications which the relation is in, are
     676             :      * valid - i.e. when all referenced columns are part of REPLICA IDENTITY
     677             :      * or the table does not publish UPDATEs or DELETEs.
     678             :      *
     679             :      * XXX We could optimize it by first checking whether any of the
     680             :      * publications have a row filter for this relation. If not and relation
     681             :      * has replica identity then we can avoid building the descriptor but as
     682             :      * this happens only one time it doesn't seem worth the additional
     683             :      * complexity.
                     :      *
                     :      * The row filter flag is tested before the column list flag, so when
                     :      * both are invalid the row filter error is the one reported.
     684             :      */
     685      170816 :     RelationBuildPublicationDesc(rel, &pubdesc);
     686      170816 :     if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
     687          60 :         ereport(ERROR,
     688             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     689             :                  errmsg("cannot update table \"%s\"",
     690             :                         RelationGetRelationName(rel)),
     691             :                  errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
     692      170756 :     else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
     693         108 :         ereport(ERROR,
     694             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     695             :                  errmsg("cannot update table \"%s\"",
     696             :                         RelationGetRelationName(rel)),
     697             :                  errdetail("Column list used by the publication does not cover the replica identity.")));
     698      170648 :     else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
     699           0 :         ereport(ERROR,
     700             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     701             :                  errmsg("cannot delete from table \"%s\"",
     702             :                         RelationGetRelationName(rel)),
     703             :                  errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
     704      170648 :     else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
     705           0 :         ereport(ERROR,
     706             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     707             :                  errmsg("cannot delete from table \"%s\"",
     708             :                         RelationGetRelationName(rel)),
     709             :                  errdetail("Column list used by the publication does not cover the replica identity.")));
     710             : 
     711             :     /* If relation has replica identity we are always good. */
     712      170648 :     if (OidIsValid(RelationGetReplicaIndex(rel)))
     713      149138 :         return;
     714             : 
     715             :     /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
     716       21510 :     if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     717         406 :         return;
     718             : 
     719             :     /*
     720             :      * This is UPDATE/DELETE and there is no replica identity.
     721             :      *
     722             :      * Check if the table publishes UPDATES or DELETES.
                     :      *
                     :      * If the command's action is not published, control falls off the end
                     :      * of the function and the command is allowed.
     723             :      */
     724       21104 :     if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
     725          90 :         ereport(ERROR,
     726             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     727             :                  errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
     728             :                         RelationGetRelationName(rel)),
     729             :                  errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
     730       21014 :     else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
     731           0 :         ereport(ERROR,
     732             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     733             :                  errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
     734             :                         RelationGetRelationName(rel)),
     735             :                  errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
     736             : }
     737             : 
     738             : 
     739             : /*
     740             :  * Check if we support writing into specific relkind.
     741             :  *
                     :  * Only RELKIND_RELATION and RELKIND_PARTITIONED_TABLE are accepted; any
                     :  * other relkind raises an ERROR with ERRCODE_WRONG_OBJECT_TYPE, with a
                     :  * detail message supplied by errdetail_relkind_not_supported().
                     :  *
     742             :  * The nspname and relname are only needed for error reporting.
     743             :  */
     744             : void
     745        1544 : CheckSubscriptionRelkind(char relkind, const char *nspname,
     746             :                          const char *relname)
     747             : {
     748        1544 :     if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
     749           0 :         ereport(ERROR,
     750             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     751             :                  errmsg("cannot use relation \"%s.%s\" as logical replication target",
     752             :                         nspname, relname),
     753             :                  errdetail_relkind_not_supported(relkind)));
     754        1544 : }

Generated by: LCOV version 1.14