LCOV - code coverage report
Current view: top level - src/backend/executor - execReplication.c (source / functions) Hit Total Coverage
Test: PostgreSQL 16beta1 Lines: 182 220 82.7 %
Date: 2023-06-06 09:15:10 Functions: 9 9 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * execReplication.c
       4             :  *    miscellaneous executor routines for logical replication
       5             :  *
       6             :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *    src/backend/executor/execReplication.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/genam.h"
      18             : #include "access/relscan.h"
      19             : #include "access/tableam.h"
      20             : #include "access/transam.h"
      21             : #include "access/xact.h"
      22             : #include "commands/trigger.h"
      23             : #include "executor/executor.h"
      24             : #include "executor/nodeModifyTable.h"
      25             : #include "nodes/nodeFuncs.h"
      26             : #include "parser/parse_relation.h"
      27             : #include "parser/parsetree.h"
      28             : #include "replication/logicalrelation.h"
      29             : #include "storage/bufmgr.h"
      30             : #include "storage/lmgr.h"
      31             : #include "utils/builtins.h"
      32             : #include "utils/datum.h"
      33             : #include "utils/lsyscache.h"
      34             : #include "utils/memutils.h"
      35             : #include "utils/rel.h"
      36             : #include "utils/snapmgr.h"
      37             : #include "utils/syscache.h"
      38             : #include "utils/typcache.h"
      39             : 
      40             : 
      41             : static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
      42             :                          TypeCacheEntry **eq);
      43             : 
      44             : /*
      45             :  * Set up a ScanKey for a search in the relation 'rel' for the tuple held in
      46             :  * 'searchslot', which is laid out to match 'rel' (*NOT* idxrel!).
      47             :  *
      48             :  * Returns the number of scan keys filled into 'skey' for the index scan.
      49             :  *
      50             :  * This is not a generic routine; it expects the idxrel to be a btree,
      51             :  * non-partial, and to have at least one column reference (i.e. it cannot
      52             :  * consist of only expressions).
      53             :  *
      54             :  * By definition, the replica identity of a rel meets all of these limitations.
      55             :  * Note that any other index could also meet these limitations.
      56             :  */
      57             : static int
      58      144146 : build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
      59             :                          TupleTableSlot *searchslot)
      60             : {
      61             :     int         index_attoff;
      62      144146 :     int         skey_attoff = 0;
      63             :     Datum       indclassDatum;
      64             :     oidvector  *opclass;
      65      144146 :     int2vector *indkey = &idxrel->rd_index->indkey; /* per index column: table attno, invalid for expressions */
      66             : 
      67      144146 :     indclassDatum = SysCacheGetAttrNotNull(INDEXRELID, idxrel->rd_indextuple,
      68             :                                            Anum_pg_index_indclass);
      69      144146 :     opclass = (oidvector *) DatumGetPointer(indclassDatum);
      70             : 
      71             :     /* Build a scankey for every non-expression attribute in the index. */
      72      288318 :     for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
      73      144172 :          index_attoff++)
      74             :     {
      75             :         Oid         operator;
      76             :         Oid         optype;
      77             :         Oid         opfamily;
      78             :         RegProcedure regop;
      79      144172 :         int         table_attno = indkey->values[index_attoff];
      80             : 
      81      144172 :         if (!AttributeNumberIsValid(table_attno))
      82             :         {
      83             :             /*
      84             :              * XXX: Currently, we don't support expressions in the scan key,
      85             :              * see code below.
      86             :              */
      87           4 :             continue;
      88             :         }
      89             : 
      90             :         /*
      91             :          * Load the operator info.  We need this to get the equality operator
      92             :          * function for the scan key.
      93             :          */
      94      144168 :         optype = get_opclass_input_type(opclass->values[index_attoff]);
      95      144168 :         opfamily = get_opclass_family(opclass->values[index_attoff]);
      96             : 
      97      144168 :         operator = get_opfamily_member(opfamily, optype,
      98             :                                        optype,
      99             :                                        BTEqualStrategyNumber);
     100      144168 :         if (!OidIsValid(operator))
     101           0 :             elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
     102             :                  BTEqualStrategyNumber, optype, optype, opfamily);
     103             : 
     104      144168 :         regop = get_opcode(operator);
     105             : 
     106             :         /* Initialize the scankey. */
     107      144168 :         ScanKeyInit(&skey[skey_attoff],
     108      144168 :                     index_attoff + 1, /* 0-based offset -> 1-based attribute number */
     109             :                     BTEqualStrategyNumber,
     110             :                     regop,
     111      144168 :                     searchslot->tts_values[table_attno - 1]); /* slot arrays are indexed from 0 */
     112             : 
     113      144168 :         skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
     114             : 
     115             :         /* A NULL search value needs the NULL-search flags set on the key. */
     116      144168 :         if (searchslot->tts_isnull[table_attno - 1])
     117           2 :             skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
     118             : 
     119      144168 :         skey_attoff++;
     120             :     }
     121             : 
     122             :     /* There must always be at least one attribute for the index scan. */
     123             :     Assert(skey_attoff > 0);
     124             : 
     125      144146 :     return skey_attoff;
     126             : }
     127             : 
     128             : /*
     129             :  * Search 'rel' for a tuple matching 'searchslot' using the index 'idxoid'.
     130             :  *
     131             :  * If a matching tuple is found, lock it with lockmode, fill outslot with its
     132             :  * contents, and return true.  Return false otherwise.
     133             :  */
     134             : bool
     135      144146 : RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
     136             :                              LockTupleMode lockmode,
     137             :                              TupleTableSlot *searchslot,
     138             :                              TupleTableSlot *outslot)
     139             : {
     140             :     ScanKeyData skey[INDEX_MAX_KEYS];
     141             :     int         skey_attoff;
     142             :     IndexScanDesc scan;
     143             :     SnapshotData snap;
     144             :     TransactionId xwait;
     145             :     Relation    idxrel;
     146             :     bool        found;
     147      144146 :     TypeCacheEntry **eq = NULL;
     148             :     bool        isIdxSafeToSkipDuplicates;
     149             : 
     150             :     /* Open the index. */
     151      144146 :     idxrel = index_open(idxoid, RowExclusiveLock);
     152             : 
     153      144146 :     isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
     154             : 
     155      144146 :     InitDirtySnapshot(snap);
     156             : 
     157             :     /* Build scan key. */
     158      144146 :     skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
     159             : 
     160             :     /* Start an index scan. */
     161      144146 :     scan = index_beginscan(rel, idxrel, &snap, skey_attoff, 0);
     162             : 
     163      144146 : retry: /* re-entered after waiting on another transaction or after a failed lock attempt */
     164      144146 :     found = false;
     165             : 
     166      144146 :     index_rescan(scan, skey, skey_attoff, NULL, 0);
     167             : 
     168             :     /* Try to find the tuple */
     169      144146 :     while (index_getnext_slot(scan, ForwardScanDirection, outslot))
     170             :     {
     171             :         /*
     172             :          * Avoid the expensive equality check if the index is the primary
     173             :          * key or the replica identity index.
     174             :          */
     175      144130 :         if (!isIdxSafeToSkipDuplicates)
     176             :         {
     177          22 :             if (eq == NULL)
     178             :             {
     179             : #ifdef USE_ASSERT_CHECKING
     180             :                 /* apply assertions only once for the input idxoid */
     181             :                 IndexInfo  *indexInfo = BuildIndexInfo(idxrel);
     182             : 
     183             :                 Assert(IsIndexUsableForReplicaIdentityFull(indexInfo));
     184             : #endif
     185             : 
     186          22 :                 eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts); /* allocated once, reused across retries */
     187             :             }
     188             : 
     189          22 :             if (!tuples_equal(outslot, searchslot, eq))
     190           0 :                 continue;
     191             :         }
     192             : 
     193      144130 :         ExecMaterializeSlot(outslot);
     194             : 
     195      288260 :         xwait = TransactionIdIsValid(snap.xmin) ?
     196      144130 :             snap.xmin : snap.xmax;
     197             : 
     198             :         /*
     199             :          * If the tuple is locked, wait for the locking transaction to finish
     200             :          * and retry.
     201             :          */
     202      144130 :         if (TransactionIdIsValid(xwait))
     203             :         {
     204           0 :             XactLockTableWait(xwait, NULL, NULL, XLTW_None);
     205           0 :             goto retry;
     206             :         }
     207             : 
     208             :         /* Found our tuple and it's not locked */
     209      144130 :         found = true;
     210      144130 :         break;
     211             :     }
     212             : 
     213             :     /* Found tuple, try to lock it in the lockmode. */
     214      144146 :     if (found)
     215             :     {
     216             :         TM_FailureData tmfd;
     217             :         TM_Result   res;
     218             : 
     219      144130 :         PushActiveSnapshot(GetLatestSnapshot()); /* NOTE(review): GetLatestSnapshot() is called again two lines below */
     220             : 
     221      144130 :         res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
     222             :                                outslot,
     223             :                                GetCurrentCommandId(false),
     224             :                                lockmode,
     225             :                                LockWaitBlock,
     226             :                                0 /* don't follow updates */ ,
     227             :                                &tmfd);
     228             : 
     229      144130 :         PopActiveSnapshot();
     230             : 
     231      144130 :         switch (res)
     232             :         {
     233      144130 :             case TM_Ok:
     234      144130 :                 break;
     235           0 :             case TM_Updated:
     236             :                 /* XXX: Improve handling here */
     237           0 :                 if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
     238           0 :                     ereport(LOG,
     239             :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     240             :                              errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
     241             :                 else
     242           0 :                     ereport(LOG,
     243             :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     244             :                              errmsg("concurrent update, retrying")));
     245           0 :                 goto retry;
     246           0 :             case TM_Deleted:
     247             :                 /* XXX: Improve handling here */
     248           0 :                 ereport(LOG,
     249             :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     250             :                          errmsg("concurrent delete, retrying")));
     251           0 :                 goto retry;
     252           0 :             case TM_Invisible:
     253           0 :                 elog(ERROR, "attempted to lock invisible tuple");
     254             :                 break;
     255           0 :             default:
     256           0 :                 elog(ERROR, "unexpected table_tuple_lock status: %u", res);
     257             :                 break;
     258             :         }
     259             :     }
     260             : 
     261      144146 :     index_endscan(scan);
     262             : 
     263             :     /* Close the index, but don't release its lock until commit. */
     264      144146 :     index_close(idxrel, NoLock);
     265             : 
     266      144146 :     return found;
     267             : }
     268             : 
     269             : /*
     270             :  * Return true if the tuples in the two slots have equal values, else false.
     271             :  */
     272             : static bool
     273      210552 : tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
     274             :              TypeCacheEntry **eq)
     275             : {
     276             :     int         attrnum;
     277             : 
     278             :     Assert(slot1->tts_tupleDescriptor->natts ==
     279             :            slot2->tts_tupleDescriptor->natts);
     280             : 
     281      210552 :     slot_getallattrs(slot1);
     282      210552 :     slot_getallattrs(slot2);
     283             : 
     284             :     /* Check equality of the attributes. */
     285      210910 :     for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
     286             :     {
     287             :         Form_pg_attribute att;
     288             :         TypeCacheEntry *typentry;
     289             : 
     290      210600 :         att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
     291             : 
     292             :         /*
     293             :          * Ignore dropped and generated columns, as the publisher doesn't
     294             :          * send those.
     295             :          */
     296      210600 :         if (att->attisdropped || att->attgenerated)
     297           4 :             continue;
     298             : 
     299             :         /*
     300             :          * If one value is NULL and the other is not, then they are certainly
     301             :          * not equal.
     302             :          */
     303      210596 :         if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
     304           0 :             return false;
     305             : 
     306             :         /*
     307             :          * If both are NULL, they can be considered equal.
     308             :          */
     309      210596 :         if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
     310           2 :             continue;
     311             : 
     312      210594 :         typentry = eq[attrnum]; /* reuse the entry cached by an earlier call, if any */
     313      210594 :         if (typentry == NULL)
     314             :         {
     315         352 :             typentry = lookup_type_cache(att->atttypid,
     316             :                                          TYPECACHE_EQ_OPR_FINFO);
     317         352 :             if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
     318           0 :                 ereport(ERROR,
     319             :                         (errcode(ERRCODE_UNDEFINED_FUNCTION),
     320             :                          errmsg("could not identify an equality operator for type %s",
     321             :                                 format_type_be(att->atttypid))));
     322         352 :             eq[attrnum] = typentry; /* remember it for later calls with the same 'eq' array */
     323             :         }
     324             : 
     325      210594 :         if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
     326             :                                             att->attcollation,
     327      210594 :                                             slot1->tts_values[attrnum],
     328      210594 :                                             slot2->tts_values[attrnum])))
     329      210242 :             return false;
     330             :     }
     331             : 
     332         310 :     return true;
     333             : }
     334             : 
     335             : /*
     336             :  * Search the relation 'rel' for a tuple using a sequential scan.
     337             :  *
     338             :  * If a matching tuple is found, lock it with lockmode, fill outslot with its
     339             :  * contents, and return true.  Return false otherwise.
     340             :  *
     341             :  * Note that this stops on the first matching tuple.
     342             :  *
     343             :  * This can obviously be quite slow on tables that have more than a few rows.
     344             :  */
     345             : bool
     346         288 : RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
     347             :                          TupleTableSlot *searchslot, TupleTableSlot *outslot)
     348             : {
     349             :     TupleTableSlot *scanslot;
     350             :     TableScanDesc scan;
     351             :     SnapshotData snap;
     352             :     TypeCacheEntry **eq;
     353             :     TransactionId xwait;
     354             :     bool        found;
     355         288 :     TupleDesc   desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
     356             : 
     357             :     Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
     358             : 
     359         288 :     eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts); /* one zeroed cache entry per attribute, filled in by tuples_equal() */
     360             : 
     361             :     /* Start a heap scan. */
     362         288 :     InitDirtySnapshot(snap);
     363         288 :     scan = table_beginscan(rel, &snap, 0, NULL);
     364         288 :     scanslot = table_slot_create(rel, NULL);
     365             : 
     366         288 : retry: /* re-entered after waiting on another transaction or after a failed lock attempt */
     367         288 :     found = false;
     368             : 
     369         288 :     table_rescan(scan, NULL);
     370             : 
     371             :     /* Try to find the tuple */
     372      210530 :     while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
     373             :     {
     374      210530 :         if (!tuples_equal(scanslot, searchslot, eq))
     375      210242 :             continue;
     376             : 
     377         288 :         found = true;
     378         288 :         ExecCopySlot(outslot, scanslot);
     379             : 
     380         576 :         xwait = TransactionIdIsValid(snap.xmin) ?
     381         288 :             snap.xmin : snap.xmax;
     382             : 
     383             :         /*
     384             :          * If the tuple is locked, wait for the locking transaction to finish
     385             :          * and retry.
     386             :          */
     387         288 :         if (TransactionIdIsValid(xwait))
     388             :         {
     389           0 :             XactLockTableWait(xwait, NULL, NULL, XLTW_None);
     390           0 :             goto retry;
     391             :         }
     392             : 
     393             :         /* Found our tuple and it's not locked */
     394         288 :         break;
     395             :     }
     396             : 
     397             :     /* Found tuple, try to lock it in the lockmode. */
     398         288 :     if (found)
     399             :     {
     400             :         TM_FailureData tmfd;
     401             :         TM_Result   res;
     402             : 
     403         288 :         PushActiveSnapshot(GetLatestSnapshot()); /* NOTE(review): GetLatestSnapshot() is called again two lines below */
     404             : 
     405         288 :         res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
     406             :                                outslot,
     407             :                                GetCurrentCommandId(false),
     408             :                                lockmode,
     409             :                                LockWaitBlock,
     410             :                                0 /* don't follow updates */ ,
     411             :                                &tmfd);
     412             : 
     413         288 :         PopActiveSnapshot();
     414             : 
     415         288 :         switch (res)
     416             :         {
     417         288 :             case TM_Ok:
     418         288 :                 break;
     419           0 :             case TM_Updated:
     420             :                 /* XXX: Improve handling here */
     421           0 :                 if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
     422           0 :                     ereport(LOG,
     423             :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     424             :                              errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
     425             :                 else
     426           0 :                     ereport(LOG,
     427             :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     428             :                              errmsg("concurrent update, retrying")));
     429           0 :                 goto retry;
     430           0 :             case TM_Deleted:
     431             :                 /* XXX: Improve handling here */
     432           0 :                 ereport(LOG,
     433             :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     434             :                          errmsg("concurrent delete, retrying")));
     435           0 :                 goto retry;
     436           0 :             case TM_Invisible:
     437           0 :                 elog(ERROR, "attempted to lock invisible tuple");
     438             :                 break;
     439           0 :             default:
     440           0 :                 elog(ERROR, "unexpected table_tuple_lock status: %u", res);
     441             :                 break;
     442             :         }
     443             :     }
     444             : 
     445         288 :     table_endscan(scan);
     446         288 :     ExecDropSingleTupleTableSlot(scanslot);
     447             : 
     448         288 :     return found;
     449             : }
     450             : 
     451             : /*
     452             :  * Insert the tuple represented in the slot into the relation, update the
     453             :  * indexes, and execute any constraints and per-row triggers.
     454             :  *
     455             :  * Caller is responsible for opening the indexes.
     456             :  */
     457             : void
     458      151202 : ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
     459             :                          EState *estate, TupleTableSlot *slot)
     460             : {
     461      151202 :     bool        skip_tuple = false;
     462      151202 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     463             : 
     464             :     /* For now we support only tables. */
     465             :     Assert(rel->rd_rel->relkind == RELKIND_RELATION);
     466             : 
     467      151202 :     CheckCmdReplicaIdentity(rel, CMD_INSERT);
     468             : 
     469             :     /* BEFORE ROW INSERT Triggers */
     470      151202 :     if (resultRelInfo->ri_TrigDesc &&
     471          38 :         resultRelInfo->ri_TrigDesc->trig_insert_before_row)
     472             :     {
     473           6 :         if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
     474           2 :             skip_tuple = true;  /* "do nothing" */
     475             :     }
     476             : 
     477      151202 :     if (!skip_tuple)
     478             :     {
     479      151200 :         List       *recheckIndexes = NIL;
     480             : 
     481             :         /* Compute stored generated columns */
     482      151200 :         if (rel->rd_att->constr &&
     483       90802 :             rel->rd_att->constr->has_generated_stored)
     484           8 :             ExecComputeStoredGenerated(resultRelInfo, estate, slot,
     485             :                                        CMD_INSERT);
     486             : 
     487             :         /* Check the constraints of the tuple */
     488      151200 :         if (rel->rd_att->constr)
     489       90802 :             ExecConstraints(resultRelInfo, slot, estate);
     490      151200 :         if (rel->rd_rel->relispartition)
     491         120 :             ExecPartitionCheck(resultRelInfo, slot, estate, true);
     492             : 
     493             :         /* OK, store the tuple and create index entries for it */
     494      151200 :         simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
     495             : 
     496      151200 :         if (resultRelInfo->ri_NumIndices > 0)
     497      110692 :             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
     498             :                                                    slot, estate, false, false,
     499             :                                                    NULL, NIL, false);
     500             : 
     501             :         /* AFTER ROW INSERT Triggers */
     502      151182 :         ExecARInsertTriggers(estate, resultRelInfo, slot,
     503             :                              recheckIndexes, NULL);
     504             : 
     505             :         /*
     506             :          * XXX we should in theory pass a TransitionCaptureState object to the
     507             :          * above to capture transition tuples, but after statement triggers
     508             :          * don't actually get fired by replication yet anyway.
     509             :          */
     510             : 
     511      151182 :         list_free(recheckIndexes);
     512             :     }
     513      151184 : }
     514             : 
     515             : /*
     516             :  * Update the tuple identified by searchslot's TID with the data in the slot,
     517             :  * update the indexes, and execute any constraints and per-row triggers.
     518             :  *
     519             :  * Caller is responsible for opening the indexes.
     520             :  */
     521             : void
     522       63818 : ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
     523             :                          EState *estate, EPQState *epqstate,
     524             :                          TupleTableSlot *searchslot, TupleTableSlot *slot)
     525             : {
     526       63818 :     bool        skip_tuple = false;
     527       63818 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     528       63818 :     ItemPointer tid = &(searchslot->tts_tid); /* the tuple to update; no search is done here */
     529             : 
     530             :     /* For now we support only tables. */
     531             :     Assert(rel->rd_rel->relkind == RELKIND_RELATION);
     532             : 
     533       63818 :     CheckCmdReplicaIdentity(rel, CMD_UPDATE);
     534             : 
     535             :     /* BEFORE ROW UPDATE Triggers */
     536       63818 :     if (resultRelInfo->ri_TrigDesc &&
     537          20 :         resultRelInfo->ri_TrigDesc->trig_update_before_row)
     538             :     {
     539           6 :         if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
     540             :                                   tid, NULL, slot, NULL, NULL))
     541           4 :             skip_tuple = true;  /* "do nothing" */
     542             :     }
     543             : 
     544       63818 :     if (!skip_tuple)
     545             :     {
     546       63814 :         List       *recheckIndexes = NIL;
     547             :         TU_UpdateIndexes update_indexes;
     548             : 
     549             :         /* Compute stored generated columns */
     550       63814 :         if (rel->rd_att->constr &&
     551       63736 :             rel->rd_att->constr->has_generated_stored)
     552           6 :             ExecComputeStoredGenerated(resultRelInfo, estate, slot,
     553             :                                        CMD_UPDATE);
     554             : 
     555             :         /* Check the constraints of the tuple */
     556       63814 :         if (rel->rd_att->constr)
     557       63736 :             ExecConstraints(resultRelInfo, slot, estate);
     558       63814 :         if (rel->rd_rel->relispartition)
     559          22 :             ExecPartitionCheck(resultRelInfo, slot, estate, true);
     560             : 
     561       63814 :         simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
     562             :                                   &update_indexes); /* set by the call; consulted below to decide on index inserts */
     563             : 
     564       63814 :         if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
     565       40442 :             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
     566             :                                                    slot, estate, true, false,
     567             :                                                    NULL, NIL,
     568             :                                                    (update_indexes == TU_Summarizing));
     569             : 
     570             :         /* AFTER ROW UPDATE Triggers */
     571       63814 :         ExecARUpdateTriggers(estate, resultRelInfo,
     572             :                              NULL, NULL,
     573             :                              tid, NULL, slot,
     574             :                              recheckIndexes, NULL, false);
     575             : 
     576       63814 :         list_free(recheckIndexes);
     577             :     }
     578       63818 : }
     579             : 
     580             : /*
     581             :  * Delete the tuple that searchslot identifies (through its tts_tid) and
     582             :  * fire any per-row DELETE triggers.
                     :  *
                     :  * The replica identity check for CMD_DELETE is made first.  When
                     :  * ExecBRDeleteTriggers() returns false the tuple is left in place and the
                     :  * AFTER ROW triggers are not fired.  No constraints are checked here.
     583             :  *
     584             :  * Caller is responsible for opening the indexes.
     585             :  */
     586             : void
     587       80598 : ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
     588             :                          EState *estate, EPQState *epqstate,
     589             :                          TupleTableSlot *searchslot)
     590             : {
     591       80598 :     bool        skip_tuple = false;
     592       80598 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     593       80598 :     ItemPointer tid = &searchslot->tts_tid;
     594             : 
     595       80598 :     CheckCmdReplicaIdentity(rel, CMD_DELETE);
     596             : 
     597             :     /*
                     :      * BEFORE ROW DELETE Triggers.  A false result from
                     :      * ExecBRDeleteTriggers() means the delete must be skipped.
                     :      */
     598       80598 :     if (resultRelInfo->ri_TrigDesc &&
     599          20 :         resultRelInfo->ri_TrigDesc->trig_delete_before_row)
     600             :     {
     601           0 :         skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
     602           0 :                                            tid, NULL, NULL, NULL, NULL);
     603             :     }
     604             : 
     605       80598 :     if (!skip_tuple)
     606             :     {
     607             :         /* OK, delete the tuple, using the executor state's snapshot */
     608       80598 :         simple_table_tuple_delete(rel, tid, estate->es_snapshot);
     609             : 
     610             :         /* AFTER ROW DELETE Triggers */
     611       80598 :         ExecARDeleteTriggers(estate, resultRelInfo,
     612             :                              tid, NULL, NULL, false);
     613             :     }
     614       80598 : }
     615             : 
     616             : /*
     617             :  * Check if command can be executed with current replica identity.
                     :  *
                     :  * Returns without further checks for partitioned tables and for any
                     :  * command other than CMD_UPDATE or CMD_DELETE.  Otherwise the relation's
                     :  * publication descriptor is built and ERROR is raised when either:
                     :  *
                     :  *  - the descriptor flags the publication row filter or column list as
                     :  *    not valid for the command (ERRCODE_INVALID_COLUMN_REFERENCE); or
                     :  *  - the relation has no replica identity index, is not REPLICA IDENTITY
                     :  *    FULL, and publishes the command's action
                     :  *    (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE).
     618             :  */
     619             : void
     620      441146 : CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
     621             : {
     622             :     PublicationDesc pubdesc;
     623             : 
     624             :     /*
     625             :      * Skip checking the replica identity for partitioned tables, because the
     626             :      * operations are actually performed on the leaf partitions.
     627             :      */
     628      441146 :     if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     629      416830 :         return;
     630             : 
     631             :     /* We only need to do checks for UPDATE and DELETE. */
     632      435582 :     if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
     633      261738 :         return;
     634             : 
     635             :     /*
     636             :      * It is only safe to execute UPDATE/DELETE when all columns, referenced
     637             :      * in the row filters from publications which the relation is in, are
     638             :      * valid - i.e. when all referenced columns are part of REPLICA IDENTITY
     639             :      * or the table does not publish UPDATEs or DELETEs.
     640             :      *
     641             :      * XXX We could optimize it by first checking whether any of the
     642             :      * publications have a row filter for this relation. If not and relation
     643             :      * has replica identity then we can avoid building the descriptor but as
     644             :      * this happens only one time it doesn't seem worth the additional
     645             :      * complexity.
                     :      *
                     :      * For each command the row filter flag is tested before the column
                     :      * list flag, so a row filter problem is the one reported when both
                     :      * flags are false.
     646             :      */
     647      173844 :     RelationBuildPublicationDesc(rel, &pubdesc);
     648      173844 :     if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
     649          60 :         ereport(ERROR,
     650             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     651             :                  errmsg("cannot update table \"%s\"",
     652             :                         RelationGetRelationName(rel)),
     653             :                  errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
     654      173784 :     else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
     655         108 :         ereport(ERROR,
     656             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     657             :                  errmsg("cannot update table \"%s\"",
     658             :                         RelationGetRelationName(rel)),
     659             :                  errdetail("Column list used by the publication does not cover the replica identity.")));
     660      173676 :     else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
     661           0 :         ereport(ERROR,
     662             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     663             :                  errmsg("cannot delete from table \"%s\"",
     664             :                         RelationGetRelationName(rel)),
     665             :                  errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
     666      173676 :     else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
     667           0 :         ereport(ERROR,
     668             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     669             :                  errmsg("cannot delete from table \"%s\"",
     670             :                         RelationGetRelationName(rel)),
     671             :                  errdetail("Column list used by the publication does not cover the replica identity.")));
     672             : 
     673             :     /*
                     :      * If relation has replica identity we are always good.  (The row filter
                     :      * and column list checks above have already passed at this point.)
                     :      */
     674      173676 :     if (OidIsValid(RelationGetReplicaIndex(rel)))
     675      149128 :         return;
     676             : 
     677             :     /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
     678       24548 :     if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     679         400 :         return;
     680             : 
     681             :     /*
     682             :      * This is UPDATE/DELETE and there is no replica identity.
     683             :      *
     684             :      * Check if the table publishes UPDATES or DELETES.
                     :      * If it does not publish the action for this command, fall through and
                     :      * return normally.
     685             :      */
     686       24148 :     if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
     687          90 :         ereport(ERROR,
     688             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     689             :                  errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
     690             :                         RelationGetRelationName(rel)),
     691             :                  errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
     692       24058 :     else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
     693           0 :         ereport(ERROR,
     694             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     695             :                  errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
     696             :                         RelationGetRelationName(rel)),
     697             :                  errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
     698             : }
     699             : 
     700             : 
     701             : /*
     702             :  * Check if we support writing into specific relkind.
     703             :  *
                     :  * Only RELKIND_RELATION and RELKIND_PARTITIONED_TABLE are accepted; any
                     :  * other relkind raises ERROR with ERRCODE_WRONG_OBJECT_TYPE.
                     :  *
     704             :  * The nspname and relname are only needed for error reporting.
     705             :  */
     706             : void
     707        1492 : CheckSubscriptionRelkind(char relkind, const char *nspname,
     708             :                          const char *relname)
     709             : {
     710        1492 :     if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
     711           0 :         ereport(ERROR,
     712             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     713             :                  errmsg("cannot use relation \"%s.%s\" as logical replication target",
     714             :                         nspname, relname),
     715             :                  errdetail_relkind_not_supported(relkind)));
     716        1492 : }

Generated by: LCOV version 1.14