LCOV - code coverage report
Current view: top level - src/backend/executor - execReplication.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 202 244 82.8 %
Date: 2024-03-28 10:11:15 Functions: 11 11 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * execReplication.c
       4             :  *    miscellaneous executor routines for logical replication
       5             :  *
       6             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *    src/backend/executor/execReplication.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/genam.h"
      18             : #include "access/relscan.h"
      19             : #include "access/tableam.h"
      20             : #include "access/transam.h"
      21             : #include "access/xact.h"
      22             : #include "catalog/pg_am_d.h"
      23             : #include "commands/trigger.h"
      24             : #include "executor/executor.h"
      25             : #include "executor/nodeModifyTable.h"
      26             : #include "replication/logicalrelation.h"
      27             : #include "storage/lmgr.h"
      28             : #include "utils/builtins.h"
      29             : #include "utils/lsyscache.h"
      30             : #include "utils/rel.h"
      31             : #include "utils/snapmgr.h"
      32             : #include "utils/syscache.h"
      33             : #include "utils/typcache.h"
      34             : 
      35             : 
      36             : static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
      37             :                          TypeCacheEntry **eq);
      38             : 
      39             : /*
      40             :  * Returns the fixed strategy number, if any, of the equality operator for the
      41             :  * given index access method, otherwise, InvalidStrategy.
      42             :  *
      43             :  * Currently, only Btree and Hash indexes are supported. The other index access
      44             :  * methods don't have a fixed strategy for equality operation - instead, the
      45             :  * support routines of each operator class interpret the strategy numbers
      46             :  * according to the operator class's definition.
      47             :  */
      48             : StrategyNumber
      49      144212 : get_equal_strategy_number_for_am(Oid am)
      50             : {
      51             :     int         ret;
      52             : 
      53      144212 :     switch (am)
      54             :     {
      55      144200 :         case BTREE_AM_OID:
      56      144200 :             ret = BTEqualStrategyNumber;
      57      144200 :             break;
      58          12 :         case HASH_AM_OID:
      59          12 :             ret = HTEqualStrategyNumber;
      60          12 :             break;
      61           0 :         default:
      62             :             /* XXX: Only Btree and Hash indexes are supported */
      63           0 :             ret = InvalidStrategy;
      64           0 :             break;
      65             :     }
      66             : 
      67      144212 :     return ret;
      68             : }
      69             : 
      70             : /*
      71             :  * Return the appropriate strategy number which corresponds to the equality
      72             :  * operator.
      73             :  */
      74             : static StrategyNumber
      75      144176 : get_equal_strategy_number(Oid opclass)
      76             : {
      77      144176 :     Oid         am = get_opclass_method(opclass);
      78             : 
      79      144176 :     return get_equal_strategy_number_for_am(am);
      80             : }
      81             : 
      82             : /*
      83             :  * Set up a ScanKey for a search in the relation 'rel' for a tuple 'key' that
      84             :  * is set up to match 'rel' (*NOT* idxrel!).
      85             :  *
      86             :  * Returns how many columns to use for the index scan.
      87             :  *
      88             :  * This is not a generic routine: idxrel must be a PK, an RI index, or an index
      89             :  * that can be used for a REPLICA IDENTITY FULL table. See
      90             :  * FindUsableIndexForReplicaIdentityFull() for details.
      91             :  *
      92             :  * By definition, replication identity of a rel meets all limitations associated
      93             :  * with that. Note that any other index could also meet these limitations.
      94             :  */
      95             : static int
      96      144154 : build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
      97             :                          TupleTableSlot *searchslot)
      98             : {
      99             :     int         index_attoff;
     100      144154 :     int         skey_attoff = 0;
     101             :     Datum       indclassDatum;
     102             :     oidvector  *opclass;
     103      144154 :     int2vector *indkey = &idxrel->rd_index->indkey;
     104             : 
     105      144154 :     indclassDatum = SysCacheGetAttrNotNull(INDEXRELID, idxrel->rd_indextuple,
     106             :                                            Anum_pg_index_indclass);
     107      144154 :     opclass = (oidvector *) DatumGetPointer(indclassDatum);
     108             : 
     109             :     /* Build scankey for every non-expression attribute in the index. */
     110      288334 :     for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
     111      144180 :          index_attoff++)
     112             :     {
     113             :         Oid         operator;
     114             :         Oid         optype;
     115             :         Oid         opfamily;
     116             :         RegProcedure regop;
     117      144180 :         int         table_attno = indkey->values[index_attoff];
     118             :         StrategyNumber eq_strategy;
     119             : 
     120      144180 :         if (!AttributeNumberIsValid(table_attno))
     121             :         {
     122             :             /*
     123             :              * XXX: Currently, we don't support expressions in the scan key,
     124             :              * see code below.
     125             :              */
     126           4 :             continue;
     127             :         }
     128             : 
     129             :         /*
     130             :          * Load the operator info.  We need this to get the equality operator
     131             :          * function for the scan key.
     132             :          */
     133      144176 :         optype = get_opclass_input_type(opclass->values[index_attoff]);
     134      144176 :         opfamily = get_opclass_family(opclass->values[index_attoff]);
     135      144176 :         eq_strategy = get_equal_strategy_number(opclass->values[index_attoff]);
     136             : 
     137      144176 :         operator = get_opfamily_member(opfamily, optype,
     138             :                                        optype,
     139             :                                        eq_strategy);
     140             : 
     141      144176 :         if (!OidIsValid(operator))
     142           0 :             elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
     143             :                  eq_strategy, optype, optype, opfamily);
     144             : 
     145      144176 :         regop = get_opcode(operator);
     146             : 
     147             :         /* Initialize the scankey. */
     148      144176 :         ScanKeyInit(&skey[skey_attoff],
     149      144176 :                     index_attoff + 1,
     150             :                     eq_strategy,
     151             :                     regop,
     152      144176 :                     searchslot->tts_values[table_attno - 1]);
     153             : 
     154      144176 :         skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
     155             : 
     156             :         /* Check for null value. */
     157      144176 :         if (searchslot->tts_isnull[table_attno - 1])
     158           2 :             skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
     159             : 
     160      144176 :         skey_attoff++;
     161             :     }
     162             : 
     163             :     /* There must always be at least one attribute for the index scan. */
     164             :     Assert(skey_attoff > 0);
     165             : 
     166      144154 :     return skey_attoff;
     167             : }
     168             : 
     169             : /*
     170             :  * Search the relation 'rel' for a tuple using the index.
     171             :  *
     172             :  * If a matching tuple is found, lock it with lockmode, fill the slot with its
     173             :  * contents, and return true.  Return false otherwise.
     174             :  */
     175             : bool
     176      144154 : RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
     177             :                              LockTupleMode lockmode,
     178             :                              TupleTableSlot *searchslot,
     179             :                              TupleTableSlot *outslot)
     180             : {
     181             :     ScanKeyData skey[INDEX_MAX_KEYS];
     182             :     int         skey_attoff;
     183             :     IndexScanDesc scan;
     184             :     SnapshotData snap;
     185             :     TransactionId xwait;
     186             :     Relation    idxrel;
     187             :     bool        found;
     188      144154 :     TypeCacheEntry **eq = NULL;
     189             :     bool        isIdxSafeToSkipDuplicates;
     190             : 
     191             :     /* Open the index. */
     192      144154 :     idxrel = index_open(idxoid, RowExclusiveLock);
     193             : 
     194      144154 :     isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
     195             : 
     196      144154 :     InitDirtySnapshot(snap);
     197             : 
     198             :     /* Build scan key. */
     199      144154 :     skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
     200             : 
     201             :     /* Start an index scan. */
     202      144154 :     scan = index_beginscan(rel, idxrel, &snap, skey_attoff, 0);
     203             : 
     204      144154 : retry:
     205      144154 :     found = false;
     206             : 
     207      144154 :     index_rescan(scan, skey, skey_attoff, NULL, 0);
     208             : 
     209             :     /* Try to find the tuple */
     210      144154 :     while (index_getnext_slot(scan, ForwardScanDirection, outslot))
     211             :     {
     212             :         /*
     213             :          * Avoid the expensive equality check if the index is the primary key
     214             :          * or replica identity index.
     215             :          */
     216      144138 :         if (!isIdxSafeToSkipDuplicates)
     217             :         {
     218          30 :             if (eq == NULL)
     219          30 :                 eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
     220             : 
     221          30 :             if (!tuples_equal(outslot, searchslot, eq))
     222           0 :                 continue;
     223             :         }
     224             : 
     225      144138 :         ExecMaterializeSlot(outslot);
     226             : 
     227      288276 :         xwait = TransactionIdIsValid(snap.xmin) ?
     228      144138 :             snap.xmin : snap.xmax;
     229             : 
     230             :         /*
     231             :          * If the tuple is locked, wait for locking transaction to finish and
     232             :          * retry.
     233             :          */
     234      144138 :         if (TransactionIdIsValid(xwait))
     235             :         {
     236           0 :             XactLockTableWait(xwait, NULL, NULL, XLTW_None);
     237           0 :             goto retry;
     238             :         }
     239             : 
     240             :         /* Found our tuple and it's not locked */
     241      144138 :         found = true;
     242      144138 :         break;
     243             :     }
     244             : 
     245             :     /* Found tuple, try to lock it in the lockmode. */
     246      144154 :     if (found)
     247             :     {
     248             :         TM_FailureData tmfd;
     249             :         TM_Result   res;
     250             : 
     251      144138 :         PushActiveSnapshot(GetLatestSnapshot());
     252             : 
     253      144138 :         res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
     254             :                                outslot,
     255             :                                GetCurrentCommandId(false),
     256             :                                lockmode,
     257             :                                LockWaitBlock,
     258             :                                0 /* don't follow updates */ ,
     259             :                                &tmfd);
     260             : 
     261      144138 :         PopActiveSnapshot();
     262             : 
     263      144138 :         switch (res)
     264             :         {
     265      144138 :             case TM_Ok:
     266      144138 :                 break;
     267           0 :             case TM_Updated:
     268             :                 /* XXX: Improve handling here */
     269           0 :                 if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
     270           0 :                     ereport(LOG,
     271             :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     272             :                              errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
     273             :                 else
     274           0 :                     ereport(LOG,
     275             :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     276             :                              errmsg("concurrent update, retrying")));
     277           0 :                 goto retry;
     278           0 :             case TM_Deleted:
     279             :                 /* XXX: Improve handling here */
     280           0 :                 ereport(LOG,
     281             :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     282             :                          errmsg("concurrent delete, retrying")));
     283           0 :                 goto retry;
     284           0 :             case TM_Invisible:
     285           0 :                 elog(ERROR, "attempted to lock invisible tuple");
     286             :                 break;
     287           0 :             default:
     288           0 :                 elog(ERROR, "unexpected table_tuple_lock status: %u", res);
     289             :                 break;
     290             :         }
     291             :     }
     292             : 
     293      144154 :     index_endscan(scan);
     294             : 
     295             :     /* Don't release lock until commit. */
     296      144154 :     index_close(idxrel, NoLock);
     297             : 
     298      144154 :     return found;
     299             : }
     300             : 
     301             : /*
     302             :  * Compare the tuples in the slots by checking if they have equal values.
     303             :  */
     304             : static bool
     305      210564 : tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
     306             :              TypeCacheEntry **eq)
     307             : {
     308             :     int         attrnum;
     309             : 
     310             :     Assert(slot1->tts_tupleDescriptor->natts ==
     311             :            slot2->tts_tupleDescriptor->natts);
     312             : 
     313      210564 :     slot_getallattrs(slot1);
     314      210564 :     slot_getallattrs(slot2);
     315             : 
     316             :     /* Check equality of the attributes. */
     317      210946 :     for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
     318             :     {
     319             :         Form_pg_attribute att;
     320             :         TypeCacheEntry *typentry;
     321             : 
     322      210624 :         att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
     323             : 
     324             :         /*
     325             :          * Ignore dropped and generated columns as the publisher doesn't send
     326             :          * those
     327             :          */
     328      210624 :         if (att->attisdropped || att->attgenerated)
     329           4 :             continue;
     330             : 
     331             :         /*
     332             :          * If one value is NULL and other is not, then they are certainly not
     333             :          * equal
     334             :          */
     335      210620 :         if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
     336           0 :             return false;
     337             : 
     338             :         /*
     339             :          * If both are NULL, they can be considered equal.
     340             :          */
     341      210620 :         if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
     342           2 :             continue;
     343             : 
     344      210618 :         typentry = eq[attrnum];
     345      210618 :         if (typentry == NULL)
     346             :         {
     347         376 :             typentry = lookup_type_cache(att->atttypid,
     348             :                                          TYPECACHE_EQ_OPR_FINFO);
     349         376 :             if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
     350           0 :                 ereport(ERROR,
     351             :                         (errcode(ERRCODE_UNDEFINED_FUNCTION),
     352             :                          errmsg("could not identify an equality operator for type %s",
     353             :                                 format_type_be(att->atttypid))));
     354         376 :             eq[attrnum] = typentry;
     355             :         }
     356             : 
     357      210618 :         if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
     358             :                                             att->attcollation,
     359      210618 :                                             slot1->tts_values[attrnum],
     360      210618 :                                             slot2->tts_values[attrnum])))
     361      210242 :             return false;
     362             :     }
     363             : 
     364         322 :     return true;
     365             : }
     366             : 
     367             : /*
     368             :  * Search the relation 'rel' for a tuple using a sequential scan.
     369             :  *
     370             :  * If a matching tuple is found, lock it with lockmode, fill the slot with its
     371             :  * contents, and return true.  Return false otherwise.
     372             :  *
     373             :  * Note that this stops on the first matching tuple.
     374             :  *
     375             :  * This can obviously be quite slow on tables that have more than a few rows.
     376             :  */
     377             : bool
     378         292 : RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
     379             :                          TupleTableSlot *searchslot, TupleTableSlot *outslot)
     380             : {
     381             :     TupleTableSlot *scanslot;
     382             :     TableScanDesc scan;
     383             :     SnapshotData snap;
     384             :     TypeCacheEntry **eq;
     385             :     TransactionId xwait;
     386             :     bool        found;
     387         292 :     TupleDesc   desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
     388             : 
     389             :     Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
     390             : 
     391         292 :     eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
     392             : 
     393             :     /* Start a heap scan. */
     394         292 :     InitDirtySnapshot(snap);
     395         292 :     scan = table_beginscan(rel, &snap, 0, NULL);
     396         292 :     scanslot = table_slot_create(rel, NULL);
     397             : 
     398         292 : retry:
     399         292 :     found = false;
     400             : 
     401         292 :     table_rescan(scan, NULL);
     402             : 
     403             :     /* Try to find the tuple */
     404      210534 :     while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
     405             :     {
     406      210534 :         if (!tuples_equal(scanslot, searchslot, eq))
     407      210242 :             continue;
     408             : 
     409         292 :         found = true;
     410         292 :         ExecCopySlot(outslot, scanslot);
     411             : 
     412         584 :         xwait = TransactionIdIsValid(snap.xmin) ?
     413         292 :             snap.xmin : snap.xmax;
     414             : 
     415             :         /*
     416             :          * If the tuple is locked, wait for locking transaction to finish and
     417             :          * retry.
     418             :          */
     419         292 :         if (TransactionIdIsValid(xwait))
     420             :         {
     421           0 :             XactLockTableWait(xwait, NULL, NULL, XLTW_None);
     422           0 :             goto retry;
     423             :         }
     424             : 
     425             :         /* Found our tuple and it's not locked */
     426         292 :         break;
     427             :     }
     428             : 
     429             :     /* Found tuple, try to lock it in the lockmode. */
     430         292 :     if (found)
     431             :     {
     432             :         TM_FailureData tmfd;
     433             :         TM_Result   res;
     434             : 
     435         292 :         PushActiveSnapshot(GetLatestSnapshot());
     436             : 
     437         292 :         res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
     438             :                                outslot,
     439             :                                GetCurrentCommandId(false),
     440             :                                lockmode,
     441             :                                LockWaitBlock,
     442             :                                0 /* don't follow updates */ ,
     443             :                                &tmfd);
     444             : 
     445         292 :         PopActiveSnapshot();
     446             : 
     447         292 :         switch (res)
     448             :         {
     449         292 :             case TM_Ok:
     450         292 :                 break;
     451           0 :             case TM_Updated:
     452             :                 /* XXX: Improve handling here */
     453           0 :                 if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
     454           0 :                     ereport(LOG,
     455             :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     456             :                              errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
     457             :                 else
     458           0 :                     ereport(LOG,
     459             :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     460             :                              errmsg("concurrent update, retrying")));
     461           0 :                 goto retry;
     462           0 :             case TM_Deleted:
     463             :                 /* XXX: Improve handling here */
     464           0 :                 ereport(LOG,
     465             :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     466             :                          errmsg("concurrent delete, retrying")));
     467           0 :                 goto retry;
     468           0 :             case TM_Invisible:
     469           0 :                 elog(ERROR, "attempted to lock invisible tuple");
     470             :                 break;
     471           0 :             default:
     472           0 :                 elog(ERROR, "unexpected table_tuple_lock status: %u", res);
     473             :                 break;
     474             :         }
     475             :     }
     476             : 
     477         292 :     table_endscan(scan);
     478         292 :     ExecDropSingleTupleTableSlot(scanslot);
     479             : 
     480         292 :     return found;
     481             : }
     482             : 
     483             : /*
     484             :  * Insert the tuple represented in the slot into the relation, update the indexes,
     485             :  * and execute any constraints and per-row triggers.
     486             :  *
     487             :  * Caller is responsible for opening the indexes.
     488             :  */
     489             : void
     490      152460 : ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
     491             :                          EState *estate, TupleTableSlot *slot)
     492             : {
     493      152460 :     bool        skip_tuple = false;
     494      152460 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     495             : 
     496             :     /* For now we support only tables. */
     497             :     Assert(rel->rd_rel->relkind == RELKIND_RELATION);
     498             : 
     499      152460 :     CheckCmdReplicaIdentity(rel, CMD_INSERT);
     500             : 
     501             :     /* BEFORE ROW INSERT Triggers */
     502      152460 :     if (resultRelInfo->ri_TrigDesc &&
     503          38 :         resultRelInfo->ri_TrigDesc->trig_insert_before_row)
     504             :     {
     505           6 :         if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
     506           2 :             skip_tuple = true;  /* "do nothing" */
     507             :     }
     508             : 
     509      152460 :     if (!skip_tuple)
     510             :     {
     511      152458 :         List       *recheckIndexes = NIL;
     512             : 
     513             :         /* Compute stored generated columns */
     514      152458 :         if (rel->rd_att->constr &&
     515       90884 :             rel->rd_att->constr->has_generated_stored)
     516           8 :             ExecComputeStoredGenerated(resultRelInfo, estate, slot,
     517             :                                        CMD_INSERT);
     518             : 
     519             :         /* Check the constraints of the tuple */
     520      152458 :         if (rel->rd_att->constr)
     521       90884 :             ExecConstraints(resultRelInfo, slot, estate);
     522      152458 :         if (rel->rd_rel->relispartition)
     523         120 :             ExecPartitionCheck(resultRelInfo, slot, estate, true);
     524             : 
     525             :         /* OK, store the tuple and create index entries for it */
     526      152458 :         simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
     527             : 
     528      152458 :         if (resultRelInfo->ri_NumIndices > 0)
     529      111820 :             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
     530             :                                                    slot, estate, false, false,
     531             :                                                    NULL, NIL, false);
     532             : 
     533             :         /* AFTER ROW INSERT Triggers */
     534      152442 :         ExecARInsertTriggers(estate, resultRelInfo, slot,
     535             :                              recheckIndexes, NULL);
     536             : 
     537             :         /*
     538             :          * XXX we should in theory pass a TransitionCaptureState object to the
     539             :          * above to capture transition tuples, but after statement triggers
     540             :          * don't actually get fired by replication yet anyway
     541             :          */
     542             : 
     543      152442 :         list_free(recheckIndexes);
     544             :     }
     545      152444 : }
     546             : 
     547             : /*
     548             :  * Find the searchslot tuple and update it with data in the slot,
     549             :  * update the indexes, and execute any constraints and per-row triggers.
     550             :  *
     551             :  * Caller is responsible for opening the indexes.
     552             :  */
     553             : void
     554       63826 : ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
     555             :                          EState *estate, EPQState *epqstate,
     556             :                          TupleTableSlot *searchslot, TupleTableSlot *slot)
     557             : {
     558       63826 :     bool        skip_tuple = false;
     559       63826 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     560       63826 :     ItemPointer tid = &(searchslot->tts_tid);
     561             : 
     562             :     /* For now we support only tables. */
     563             :     Assert(rel->rd_rel->relkind == RELKIND_RELATION);
     564             : 
     565       63826 :     CheckCmdReplicaIdentity(rel, CMD_UPDATE);
     566             : 
     567             :     /* BEFORE ROW UPDATE Triggers */
     568       63826 :     if (resultRelInfo->ri_TrigDesc &&
     569          20 :         resultRelInfo->ri_TrigDesc->trig_update_before_row)
     570             :     {
     571           6 :         if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
     572             :                                   tid, NULL, slot, NULL, NULL))
     573           4 :             skip_tuple = true;  /* "do nothing" */
     574             :     }
     575             : 
     576       63826 :     if (!skip_tuple)
     577             :     {
     578       63822 :         List       *recheckIndexes = NIL;
     579             :         TU_UpdateIndexes update_indexes;
     580       63822 :         TupleTableSlot *oldSlot = NULL;
     581             : 
     582             :         /* Compute stored generated columns */
     583       63822 :         if (rel->rd_att->constr &&
     584       63736 :             rel->rd_att->constr->has_generated_stored)
     585           6 :             ExecComputeStoredGenerated(resultRelInfo, estate, slot,
     586             :                                        CMD_UPDATE);
     587             : 
     588             :         /* Check the constraints of the tuple */
     589       63822 :         if (rel->rd_att->constr)
     590       63736 :             ExecConstraints(resultRelInfo, slot, estate);
     591       63822 :         if (rel->rd_rel->relispartition)
     592          22 :             ExecPartitionCheck(resultRelInfo, slot, estate, true);
     593             : 
     594       63822 :         if (resultRelInfo->ri_TrigDesc &&
     595          16 :             resultRelInfo->ri_TrigDesc->trig_update_after_row)
     596          14 :             oldSlot = ExecGetTriggerOldSlot(estate, resultRelInfo);
     597             : 
     598       63822 :         simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
     599             :                                   &update_indexes, oldSlot);
     600             : 
     601       63822 :         if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
     602       40204 :             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
     603             :                                                    slot, estate, true, false,
     604             :                                                    NULL, NIL,
     605             :                                                    (update_indexes == TU_Summarizing));
     606             : 
     607             :         /* AFTER ROW UPDATE Triggers */
     608       63822 :         ExecARUpdateTriggers(estate, resultRelInfo,
     609             :                              NULL, NULL,
     610             :                              NULL, oldSlot, slot,
     611             :                              recheckIndexes, NULL, false);
     612             : 
     613       63822 :         list_free(recheckIndexes);
     614             :     }
     615       63826 : }
     616             : 
     617             : /*
     618             :  * Find the searchslot tuple and delete it, and execute any constraints
     619             :  * and per-row triggers.
     620             :  *
     621             :  * Caller is responsible for opening the indexes.
     622             :  */
     623             : void
     624       80602 : ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
     625             :                          EState *estate, EPQState *epqstate,
     626             :                          TupleTableSlot *searchslot)
     627             : {
     628       80602 :     bool        skip_tuple = false;
     629       80602 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     630       80602 :     ItemPointer tid = &searchslot->tts_tid;
     631             : 
     632       80602 :     CheckCmdReplicaIdentity(rel, CMD_DELETE);
     633             : 
     634             :     /* BEFORE ROW DELETE Triggers */
     635       80602 :     if (resultRelInfo->ri_TrigDesc &&
     636          20 :         resultRelInfo->ri_TrigDesc->trig_delete_before_row)
     637             :     {
     638           0 :         skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
     639           0 :                                            tid, NULL, NULL, NULL, NULL);
     640             :     }
     641             : 
     642       80602 :     if (!skip_tuple)
     643             :     {
     644       80602 :         TupleTableSlot *oldSlot = NULL;
     645             : 
     646       80602 :         if (resultRelInfo->ri_TrigDesc &&
     647          20 :             resultRelInfo->ri_TrigDesc->trig_delete_after_row)
     648           0 :             oldSlot = ExecGetTriggerOldSlot(estate, resultRelInfo);
     649             : 
     650             :         /* OK, delete the tuple */
     651       80602 :         simple_table_tuple_delete(rel, tid, estate->es_snapshot, oldSlot);
     652             : 
     653             :         /* AFTER ROW DELETE Triggers */
     654       80602 :         ExecARDeleteTriggers(estate, resultRelInfo,
     655             :                              NULL, oldSlot, NULL, false);
     656             :     }
     657       80602 : }
     658             : 
     659             : /*
     660             :  * Check if command can be executed with current replica identity.
     661             :  */
     662             : void
     663      424458 : CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
     664             : {
     665             :     PublicationDesc pubdesc;
     666             : 
     667             :     /*
     668             :      * Skip checking the replica identity for partitioned tables, because the
     669             :      * operations are actually performed on the leaf partitions.
     670             :      */
     671      424458 :     if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     672      402856 :         return;
     673             : 
     674             :     /* We only need to do checks for UPDATE and DELETE. */
     675      418642 :     if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
     676      246956 :         return;
     677             : 
     678             :     /*
     679             :      * It is only safe to execute UPDATE/DELETE when all columns, referenced
     680             :      * in the row filters from publications which the relation is in, are
     681             :      * valid - i.e. when all referenced columns are part of REPLICA IDENTITY
     682             :      * or the table does not publish UPDATEs or DELETEs.
     683             :      *
     684             :      * XXX We could optimize it by first checking whether any of the
     685             :      * publications have a row filter for this relation. If not and relation
     686             :      * has replica identity then we can avoid building the descriptor but as
     687             :      * this happens only one time it doesn't seem worth the additional
     688             :      * complexity.
     689             :      */
     690      171686 :     RelationBuildPublicationDesc(rel, &pubdesc);
     691      171686 :     if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
     692          60 :         ereport(ERROR,
     693             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     694             :                  errmsg("cannot update table \"%s\"",
     695             :                         RelationGetRelationName(rel)),
     696             :                  errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
     697      171626 :     else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
     698         108 :         ereport(ERROR,
     699             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     700             :                  errmsg("cannot update table \"%s\"",
     701             :                         RelationGetRelationName(rel)),
     702             :                  errdetail("Column list used by the publication does not cover the replica identity.")));
     703      171518 :     else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
     704           0 :         ereport(ERROR,
     705             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     706             :                  errmsg("cannot delete from table \"%s\"",
     707             :                         RelationGetRelationName(rel)),
     708             :                  errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
     709      171518 :     else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
     710           0 :         ereport(ERROR,
     711             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     712             :                  errmsg("cannot delete from table \"%s\"",
     713             :                         RelationGetRelationName(rel)),
     714             :                  errdetail("Column list used by the publication does not cover the replica identity.")));
     715             : 
     716             :     /* If relation has replica identity we are always good. */
     717      171518 :     if (OidIsValid(RelationGetReplicaIndex(rel)))
     718      149672 :         return;
     719             : 
     720             :     /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
     721       21846 :     if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     722         412 :         return;
     723             : 
     724             :     /*
     725             :      * This is UPDATE/DELETE and there is no replica identity.
     726             :      *
     727             :      * Check if the table publishes UPDATES or DELETES.
     728             :      */
     729       21434 :     if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
     730          96 :         ereport(ERROR,
     731             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     732             :                  errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
     733             :                         RelationGetRelationName(rel)),
     734             :                  errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
     735       21338 :     else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
     736           0 :         ereport(ERROR,
     737             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     738             :                  errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
     739             :                         RelationGetRelationName(rel)),
     740             :                  errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
     741             : }
     742             : 
     743             : 
     744             : /*
     745             :  * Check if we support writing into specific relkind.
     746             :  *
     747             :  * The nspname and relname are only needed for error reporting.
     748             :  */
     749             : void
     750        1586 : CheckSubscriptionRelkind(char relkind, const char *nspname,
     751             :                          const char *relname)
     752             : {
     753        1586 :     if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
     754           0 :         ereport(ERROR,
     755             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     756             :                  errmsg("cannot use relation \"%s.%s\" as logical replication target",
     757             :                         nspname, relname),
     758             :                  errdetail_relkind_not_supported(relkind)));
     759        1586 : }

Generated by: LCOV version 1.14