LCOV - code coverage report
Current view: top level - src/backend/executor - execReplication.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 235 267 88.0 %
Date: 2025-04-22 08:15:34 Functions: 13 13 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * execReplication.c
       4             :  *    miscellaneous executor routines for logical replication
       5             :  *
       6             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *    src/backend/executor/execReplication.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/genam.h"
      18             : #include "access/gist.h"
      19             : #include "access/relscan.h"
      20             : #include "access/tableam.h"
      21             : #include "access/transam.h"
      22             : #include "access/xact.h"
      23             : #include "catalog/pg_am_d.h"
      24             : #include "commands/trigger.h"
      25             : #include "executor/executor.h"
      26             : #include "executor/nodeModifyTable.h"
      27             : #include "replication/conflict.h"
      28             : #include "replication/logicalrelation.h"
      29             : #include "storage/lmgr.h"
      30             : #include "utils/builtins.h"
      31             : #include "utils/lsyscache.h"
      32             : #include "utils/rel.h"
      33             : #include "utils/snapmgr.h"
      34             : #include "utils/syscache.h"
      35             : #include "utils/typcache.h"
      36             : 
      37             : 
      38             : static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
      39             :                          TypeCacheEntry **eq);
      40             : 
      41             : /*
      42             :  * Set up a ScanKey for a search in the relation 'rel' for a tuple 'key' that
      43             :  * is set up to match 'rel' (*NOT* idxrel!).
      44             :  *
      45             :  * Returns how many columns to use for the index scan.
      46             :  *
      47             :  * This is not a generic routine: idxrel must be PK, RI, or an index that can be
      48             :  * used for REPLICA IDENTITY FULL table. See FindUsableIndexForReplicaIdentityFull()
      49             :  * for details.
      50             :  *
      51             :  * By definition, the replica identity of a rel meets all limitations associated
      52             :  * with that. Note that any other index could also meet these limitations.
      53             :  */
      54             : static int
      55      144198 : build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
      56             :                          TupleTableSlot *searchslot)
      57             : {
      58             :     int         index_attoff;
      59      144198 :     int         skey_attoff = 0;
      60             :     Datum       indclassDatum;
      61             :     oidvector  *opclass;
      62      144198 :     int2vector *indkey = &idxrel->rd_index->indkey;
      63             : 
      64      144198 :     indclassDatum = SysCacheGetAttrNotNull(INDEXRELID, idxrel->rd_indextuple,
      65             :                                            Anum_pg_index_indclass);
      66      144198 :     opclass = (oidvector *) DatumGetPointer(indclassDatum);
      67             : 
      68             :     /* Build scankey for every non-expression attribute in the index. */
      69      288442 :     for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
      70      144244 :          index_attoff++)
      71             :     {
      72             :         Oid         operator;
      73             :         Oid         optype;
      74             :         Oid         opfamily;
      75             :         RegProcedure regop;
      76      144244 :         int         table_attno = indkey->values[index_attoff];
      77             :         StrategyNumber eq_strategy;
      78             : 
      79      144244 :         if (!AttributeNumberIsValid(table_attno))
      80             :         {
      81             :             /*
      82             :              * XXX: Currently, we don't support expressions in the scan key,
      83             :              * see code below.
      84             :              */
      85           4 :             continue;
      86             :         }
      87             : 
      88             :         /*
      89             :          * Load the operator info.  We need this to get the equality operator
      90             :          * function for the scan key.
      91             :          */
      92      144240 :         optype = get_opclass_input_type(opclass->values[index_attoff]);
      93      144240 :         opfamily = get_opclass_family(opclass->values[index_attoff]);
      94      144240 :         eq_strategy = IndexAmTranslateCompareType(COMPARE_EQ, idxrel->rd_rel->relam, opfamily, false);
      95      144240 :         operator = get_opfamily_member(opfamily, optype,
      96             :                                        optype,
      97             :                                        eq_strategy);
      98             : 
      99      144240 :         if (!OidIsValid(operator))
     100           0 :             elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
     101             :                  eq_strategy, optype, optype, opfamily);
     102             : 
     103      144240 :         regop = get_opcode(operator);
     104             : 
     105             :         /* Initialize the scankey. */
     106      144240 :         ScanKeyInit(&skey[skey_attoff],
     107      144240 :                     index_attoff + 1,
     108             :                     eq_strategy,
     109             :                     regop,
     110      144240 :                     searchslot->tts_values[table_attno - 1]);
     111             : 
     112      144240 :         skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
     113             : 
     114             :         /* Check for null value. */
     115      144240 :         if (searchslot->tts_isnull[table_attno - 1])
     116           2 :             skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
     117             : 
     118      144240 :         skey_attoff++;
     119             :     }
     120             : 
     121             :     /* There must always be at least one attribute for the index scan. */
     122             :     Assert(skey_attoff > 0);
     123             : 
     124      144198 :     return skey_attoff;
     125             : }
     126             : 
     127             : 
     128             : /*
     129             :  * Helper function to check if it is necessary to re-fetch and lock the tuple
     130             :  * due to concurrent modifications. This function should be called after
     131             :  * invoking table_tuple_lock.
     132             :  */
     133             : static bool
     134      144496 : should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
     135             : {
     136      144496 :     bool        refetch = false;
     137             : 
     138      144496 :     switch (res)
     139             :     {
     140      144496 :         case TM_Ok:
     141      144496 :             break;
     142           0 :         case TM_Updated:
     143             :             /* XXX: Improve handling here */
     144           0 :             if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
     145           0 :                 ereport(LOG,
     146             :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     147             :                          errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
     148             :             else
     149           0 :                 ereport(LOG,
     150             :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     151             :                          errmsg("concurrent update, retrying")));
     152           0 :             refetch = true;
     153           0 :             break;
     154           0 :         case TM_Deleted:
     155             :             /* XXX: Improve handling here */
     156           0 :             ereport(LOG,
     157             :                     (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     158             :                      errmsg("concurrent delete, retrying")));
     159           0 :             refetch = true;
     160           0 :             break;
     161           0 :         case TM_Invisible:
     162           0 :             elog(ERROR, "attempted to lock invisible tuple");
     163             :             break;
     164           0 :         default:
     165           0 :             elog(ERROR, "unexpected table_tuple_lock status: %u", res);
     166             :             break;
     167             :     }
     168             : 
     169      144496 :     return refetch;
     170             : }
     171             : 
     172             : /*
     173             :  * Search the relation 'rel' for a tuple using the index.
     174             :  *
     175             :  * If a matching tuple is found, lock it with lockmode, fill the slot with its
     176             :  * contents, and return true.  Return false otherwise.
     177             :  */
     178             : bool
     179      144198 : RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
     180             :                              LockTupleMode lockmode,
     181             :                              TupleTableSlot *searchslot,
     182             :                              TupleTableSlot *outslot)
     183             : {
     184             :     ScanKeyData skey[INDEX_MAX_KEYS];
     185             :     int         skey_attoff;
     186             :     IndexScanDesc scan;
     187             :     SnapshotData snap;
     188             :     TransactionId xwait;
     189             :     Relation    idxrel;
     190             :     bool        found;
     191      144198 :     TypeCacheEntry **eq = NULL;
     192             :     bool        isIdxSafeToSkipDuplicates;
     193             : 
     194             :     /* Open the index. */
     195      144198 :     idxrel = index_open(idxoid, RowExclusiveLock);
     196             : 
     197      144198 :     isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
     198             : 
     199      144198 :     InitDirtySnapshot(snap);
     200             : 
     201             :     /* Build scan key. */
     202      144198 :     skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
     203             : 
     204             :     /* Start an index scan. */
     205      144198 :     scan = index_beginscan(rel, idxrel, &snap, NULL, skey_attoff, 0);
     206             : 
     207      144198 : retry:
     208      144198 :     found = false;
     209             : 
     210      144198 :     index_rescan(scan, skey, skey_attoff, NULL, 0);
     211             : 
     212             :     /* Try to find the tuple */
     213      144198 :     while (index_getnext_slot(scan, ForwardScanDirection, outslot))
     214             :     {
     215             :         /*
     216             :          * Avoid expensive equality check if the index is primary key or
     217             :          * replica identity index.
     218             :          */
     219      144172 :         if (!isIdxSafeToSkipDuplicates)
     220             :         {
     221          34 :             if (eq == NULL)
     222          34 :                 eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
     223             : 
     224          34 :             if (!tuples_equal(outslot, searchslot, eq))
     225           0 :                 continue;
     226             :         }
     227             : 
     228      144172 :         ExecMaterializeSlot(outslot);
     229             : 
     230      288344 :         xwait = TransactionIdIsValid(snap.xmin) ?
     231      144172 :             snap.xmin : snap.xmax;
     232             : 
     233             :         /*
     234             :          * If the tuple is locked, wait for locking transaction to finish and
     235             :          * retry.
     236             :          */
     237      144172 :         if (TransactionIdIsValid(xwait))
     238             :         {
     239           0 :             XactLockTableWait(xwait, NULL, NULL, XLTW_None);
     240           0 :             goto retry;
     241             :         }
     242             : 
     243             :         /* Found our tuple and it's not locked */
     244      144172 :         found = true;
     245      144172 :         break;
     246             :     }
     247             : 
     248             :     /* Found tuple, try to lock it in the lockmode. */
     249      144198 :     if (found)
     250             :     {
     251             :         TM_FailureData tmfd;
     252             :         TM_Result   res;
     253             : 
     254      144172 :         PushActiveSnapshot(GetLatestSnapshot());
     255             : 
     256      144172 :         res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
     257             :                                outslot,
     258             :                                GetCurrentCommandId(false),
     259             :                                lockmode,
     260             :                                LockWaitBlock,
     261             :                                0 /* don't follow updates */ ,
     262             :                                &tmfd);
     263             : 
     264      144172 :         PopActiveSnapshot();
     265             : 
     266      144172 :         if (should_refetch_tuple(res, &tmfd))
     267           0 :             goto retry;
     268             :     }
     269             : 
     270      144198 :     index_endscan(scan);
     271             : 
     272             :     /* Don't release lock until commit. */
     273      144198 :     index_close(idxrel, NoLock);
     274             : 
     275      144198 :     return found;
     276             : }
     277             : 
     278             : /*
     279             :  * Compare the tuples in the slots by checking if they have equal values.
     280             :  */
     281             : static bool
     282      210646 : tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
     283             :              TypeCacheEntry **eq)
     284             : {
     285             :     int         attrnum;
     286             : 
     287             :     Assert(slot1->tts_tupleDescriptor->natts ==
     288             :            slot2->tts_tupleDescriptor->natts);
     289             : 
     290      210646 :     slot_getallattrs(slot1);
     291      210646 :     slot_getallattrs(slot2);
     292             : 
     293             :     /* Check equality of the attributes. */
     294      211046 :     for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
     295             :     {
     296             :         Form_pg_attribute att;
     297             :         TypeCacheEntry *typentry;
     298             : 
     299      210718 :         att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
     300             : 
     301             :         /*
     302             :          * Ignore dropped and generated columns as the publisher doesn't send
     303             :          * those
     304             :          */
     305      210718 :         if (att->attisdropped || att->attgenerated)
     306           2 :             continue;
     307             : 
     308             :         /*
     309             :          * If one value is NULL and other is not, then they are certainly not
     310             :          * equal
     311             :          */
     312      210716 :         if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
     313           0 :             return false;
     314             : 
     315             :         /*
     316             :          * If both are NULL, they can be considered equal.
     317             :          */
     318      210716 :         if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
     319           2 :             continue;
     320             : 
     321      210714 :         typentry = eq[attrnum];
     322      210714 :         if (typentry == NULL)
     323             :         {
     324         400 :             typentry = lookup_type_cache(att->atttypid,
     325             :                                          TYPECACHE_EQ_OPR_FINFO);
     326         400 :             if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
     327           0 :                 ereport(ERROR,
     328             :                         (errcode(ERRCODE_UNDEFINED_FUNCTION),
     329             :                          errmsg("could not identify an equality operator for type %s",
     330             :                                 format_type_be(att->atttypid))));
     331         400 :             eq[attrnum] = typentry;
     332             :         }
     333             : 
     334      210714 :         if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
     335             :                                             att->attcollation,
     336      210714 :                                             slot1->tts_values[attrnum],
     337      210714 :                                             slot2->tts_values[attrnum])))
     338      210318 :             return false;
     339             :     }
     340             : 
     341         328 :     return true;
     342             : }
     343             : 
     344             : /*
     345             :  * Search the relation 'rel' for a tuple using a sequential scan.
     346             :  *
     347             :  * If a matching tuple is found, lock it with lockmode, fill the slot with its
     348             :  * contents, and return true.  Return false otherwise.
     349             :  *
     350             :  * Note that this stops on the first matching tuple.
     351             :  *
     352             :  * This can obviously be quite slow on tables that have more than a few rows.
     353             :  */
     354             : bool
     355         298 : RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
     356             :                          TupleTableSlot *searchslot, TupleTableSlot *outslot)
     357             : {
     358             :     TupleTableSlot *scanslot;
     359             :     TableScanDesc scan;
     360             :     SnapshotData snap;
     361             :     TypeCacheEntry **eq;
     362             :     TransactionId xwait;
     363             :     bool        found;
     364         298 :     TupleDesc   desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
     365             : 
     366             :     Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
     367             : 
     368         298 :     eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
     369             : 
     370             :     /* Start a heap scan. */
     371         298 :     InitDirtySnapshot(snap);
     372         298 :     scan = table_beginscan(rel, &snap, 0, NULL);
     373         298 :     scanslot = table_slot_create(rel, NULL);
     374             : 
     375         298 : retry:
     376         298 :     found = false;
     377             : 
     378         298 :     table_rescan(scan, NULL);
     379             : 
     380             :     /* Try to find the tuple */
     381      210616 :     while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
     382             :     {
     383      210612 :         if (!tuples_equal(scanslot, searchslot, eq))
     384      210318 :             continue;
     385             : 
     386         294 :         found = true;
     387         294 :         ExecCopySlot(outslot, scanslot);
     388             : 
     389         588 :         xwait = TransactionIdIsValid(snap.xmin) ?
     390         294 :             snap.xmin : snap.xmax;
     391             : 
     392             :         /*
     393             :          * If the tuple is locked, wait for locking transaction to finish and
     394             :          * retry.
     395             :          */
     396         294 :         if (TransactionIdIsValid(xwait))
     397             :         {
     398           0 :             XactLockTableWait(xwait, NULL, NULL, XLTW_None);
     399           0 :             goto retry;
     400             :         }
     401             : 
     402             :         /* Found our tuple and it's not locked */
     403         294 :         break;
     404             :     }
     405             : 
     406             :     /* Found tuple, try to lock it in the lockmode. */
     407         298 :     if (found)
     408             :     {
     409             :         TM_FailureData tmfd;
     410             :         TM_Result   res;
     411             : 
     412         294 :         PushActiveSnapshot(GetLatestSnapshot());
     413             : 
     414         294 :         res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
     415             :                                outslot,
     416             :                                GetCurrentCommandId(false),
     417             :                                lockmode,
     418             :                                LockWaitBlock,
     419             :                                0 /* don't follow updates */ ,
     420             :                                &tmfd);
     421             : 
     422         294 :         PopActiveSnapshot();
     423             : 
     424         294 :         if (should_refetch_tuple(res, &tmfd))
     425           0 :             goto retry;
     426             :     }
     427             : 
     428         298 :     table_endscan(scan);
     429         298 :     ExecDropSingleTupleTableSlot(scanslot);
     430             : 
     431         298 :     return found;
     432             : }
     433             : 
     434             : /*
     435             :  * Build additional index information necessary for conflict detection.
     436             :  */
     437             : static void
     438          34 : BuildConflictIndexInfo(ResultRelInfo *resultRelInfo, Oid conflictindex)
     439             : {
     440          96 :     for (int i = 0; i < resultRelInfo->ri_NumIndices; i++)
     441             :     {
     442          62 :         Relation    indexRelation = resultRelInfo->ri_IndexRelationDescs[i];
     443          62 :         IndexInfo  *indexRelationInfo = resultRelInfo->ri_IndexRelationInfo[i];
     444             : 
     445          62 :         if (conflictindex != RelationGetRelid(indexRelation))
     446          28 :             continue;
     447             : 
     448             :         /*
     449             :          * This Assert will fail if BuildSpeculativeIndexInfo() is called
     450             :          * twice for the given index.
     451             :          */
     452             :         Assert(indexRelationInfo->ii_UniqueOps == NULL);
     453             : 
     454          34 :         BuildSpeculativeIndexInfo(indexRelation, indexRelationInfo);
     455             :     }
     456          34 : }
     457             : 
     458             : /*
     459             :  * Find the tuple that violates the passed unique index (conflictindex).
     460             :  *
     461             :  * If a conflicting tuple is found, return true; otherwise return false.
     462             :  *
     463             :  * We lock the tuple to avoid getting it deleted before the caller can fetch
     464             :  * the required information. Note that if the tuple is deleted before a lock
     465             :  * is acquired, we will retry to find the conflicting tuple again.
     466             :  */
     467             : static bool
     468          34 : FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate,
     469             :                   Oid conflictindex, TupleTableSlot *slot,
     470             :                   TupleTableSlot **conflictslot)
     471             : {
     472          34 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     473             :     ItemPointerData conflictTid;
     474             :     TM_FailureData tmfd;
     475             :     TM_Result   res;
     476             : 
     477          34 :     *conflictslot = NULL;
     478             : 
     479             :     /*
     480             :      * Build additional information required to check constraints violations.
     481             :      * See check_exclusion_or_unique_constraint().
     482             :      */
     483          34 :     BuildConflictIndexInfo(resultRelInfo, conflictindex);
     484             : 
     485          34 : retry:
     486          34 :     if (ExecCheckIndexConstraints(resultRelInfo, slot, estate,
     487             :                                   &conflictTid, &slot->tts_tid,
     488          34 :                                   list_make1_oid(conflictindex)))
     489             :     {
     490           2 :         if (*conflictslot)
     491           0 :             ExecDropSingleTupleTableSlot(*conflictslot);
     492             : 
     493           2 :         *conflictslot = NULL;
     494           2 :         return false;
     495             :     }
     496             : 
     497          30 :     *conflictslot = table_slot_create(rel, NULL);
     498             : 
     499          30 :     PushActiveSnapshot(GetLatestSnapshot());
     500             : 
     501          30 :     res = table_tuple_lock(rel, &conflictTid, GetActiveSnapshot(),
     502             :                            *conflictslot,
     503             :                            GetCurrentCommandId(false),
     504             :                            LockTupleShare,
     505             :                            LockWaitBlock,
     506             :                            0 /* don't follow updates */ ,
     507             :                            &tmfd);
     508             : 
     509          30 :     PopActiveSnapshot();
     510             : 
     511          30 :     if (should_refetch_tuple(res, &tmfd))
     512           0 :         goto retry;
     513             : 
     514          30 :     return true;
     515             : }
     516             : 
     517             : /*
     518             :  * Check all the unique indexes in 'recheckIndexes' for conflict with the
     519             :  * tuple in 'remoteslot' and report if found.
     520             :  */
     521             : static void
     522          24 : CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
     523             :                        ConflictType type, List *recheckIndexes,
     524             :                        TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
     525             : {
     526          24 :     List       *conflicttuples = NIL;
     527             :     TupleTableSlot *conflictslot;
     528             : 
     529             :     /* Check all the unique indexes for conflicts */
     530          78 :     foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
     531             :     {
     532          66 :         if (list_member_oid(recheckIndexes, uniqueidx) &&
     533          34 :             FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
     534             :                               &conflictslot))
     535             :         {
     536          30 :             ConflictTupleInfo *conflicttuple = palloc0_object(ConflictTupleInfo);
     537             : 
     538          30 :             conflicttuple->slot = conflictslot;
     539          30 :             conflicttuple->indexoid = uniqueidx;
     540             : 
     541          30 :             GetTupleTransactionInfo(conflictslot, &conflicttuple->xmin,
     542             :                                     &conflicttuple->origin, &conflicttuple->ts);
     543             : 
     544          30 :             conflicttuples = lappend(conflicttuples, conflicttuple);
     545             :         }
     546             :     }
     547             : 
     548             :     /* Report the conflict, if found */
     549          22 :     if (conflicttuples)
     550          20 :         ReportApplyConflict(estate, resultRelInfo, ERROR,
     551          20 :                             list_length(conflicttuples) > 1 ? CT_MULTIPLE_UNIQUE_CONFLICTS : type,
     552             :                             searchslot, remoteslot, conflicttuples);
     553           2 : }
     554             : 
     555             : /*
     556             :  * Insert the tuple represented in the slot into the relation, update the indexes,
     557             :  * and execute any constraints and per-row triggers.
     558             :  *
     559             :  * Caller is responsible for opening the indexes.
     560             :  */
     561             : void
     562      152504 : ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
     563             :                          EState *estate, TupleTableSlot *slot)
     564             : {
     565      152504 :     bool        skip_tuple = false;
     566      152504 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     567             : 
     568             :     /* For now we support only tables. */
     569             :     Assert(rel->rd_rel->relkind == RELKIND_RELATION);
     570             : 
     571      152504 :     CheckCmdReplicaIdentity(rel, CMD_INSERT);
     572             : 
     573             :     /* BEFORE ROW INSERT Triggers */
     574      152504 :     if (resultRelInfo->ri_TrigDesc &&
     575          38 :         resultRelInfo->ri_TrigDesc->trig_insert_before_row)
     576             :     {
     577           6 :         if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
     578           2 :             skip_tuple = true;  /* "do nothing" */
     579             :     }
     580             : 
     581      152504 :     if (!skip_tuple)
     582             :     {
     583      152502 :         List       *recheckIndexes = NIL;
     584             :         List       *conflictindexes;
     585      152502 :         bool        conflict = false;
     586             : 
     587             :         /* Compute stored generated columns */
     588      152502 :         if (rel->rd_att->constr &&
     589       90932 :             rel->rd_att->constr->has_generated_stored)
     590           8 :             ExecComputeStoredGenerated(resultRelInfo, estate, slot,
     591             :                                        CMD_INSERT);
     592             : 
     593             :         /* Check the constraints of the tuple */
     594      152502 :         if (rel->rd_att->constr)
     595       90932 :             ExecConstraints(resultRelInfo, slot, estate);
     596      152502 :         if (rel->rd_rel->relispartition)
     597         122 :             ExecPartitionCheck(resultRelInfo, slot, estate, true);
     598             : 
     599             :         /* OK, store the tuple and create index entries for it */
     600      152502 :         simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
     601             : 
     602      152502 :         conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
     603             : 
     604      152502 :         if (resultRelInfo->ri_NumIndices > 0)
     605      111824 :             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
     606             :                                                    slot, estate, false,
     607             :                                                    conflictindexes ? true : false,
     608             :                                                    &conflict,
     609             :                                                    conflictindexes, false);
     610             : 
     611             :         /*
     612             :          * Checks the conflict indexes to fetch the conflicting local tuple
     613             :          * and reports the conflict. We perform this check here, instead of
     614             :          * performing an additional index scan before the actual insertion and
     615             :          * reporting the conflict if any conflicting tuples are found. This is
     616             :          * to avoid the overhead of executing the extra scan for each INSERT
     617             :          * operation, even when no conflict arises, which could introduce
     618             :          * significant overhead to replication, particularly in cases where
     619             :          * conflicts are rare.
     620             :          *
     621             :          * XXX OTOH, this could lead to clean-up effort for dead tuples added
     622             :          * in heap and index in case of conflicts. But as conflicts shouldn't
     623             :          * be a frequent thing so we preferred to save the performance
     624             :          * overhead of extra scan before each insertion.
     625             :          */
     626      152502 :         if (conflict)
     627          20 :             CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
     628             :                                    recheckIndexes, NULL, slot);
     629             : 
     630             :         /* AFTER ROW INSERT Triggers */
     631      152484 :         ExecARInsertTriggers(estate, resultRelInfo, slot,
     632             :                              recheckIndexes, NULL);
     633             : 
     634             :         /*
     635             :          * XXX we should in theory pass a TransitionCaptureState object to the
     636             :          * above to capture transition tuples, but after statement triggers
     637             :          * don't actually get fired by replication yet anyway
     638             :          */
     639             : 
     640      152484 :         list_free(recheckIndexes);
     641             :     }
     642      152486 : }
     643             : 
     644             : /*
     645             :  * Find the searchslot tuple and update it with data in the slot,
     646             :  * update the indexes, and execute any constraints and per-row triggers.
     647             :  *
     648             :  * Caller is responsible for opening the indexes.
     649             :  */
     650             : void
     651       63846 : ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
     652             :                          EState *estate, EPQState *epqstate,
     653             :                          TupleTableSlot *searchslot, TupleTableSlot *slot)
     654             : {
     655       63846 :     bool        skip_tuple = false;
     656       63846 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     657       63846 :     ItemPointer tid = &(searchslot->tts_tid);
     658             : 
     659             :     /*
     660             :      * We support only non-system tables, with
     661             :      * check_publication_add_relation() accountable.
     662             :      */
     663             :     Assert(rel->rd_rel->relkind == RELKIND_RELATION);
     664             :     Assert(!IsCatalogRelation(rel));
     665             : 
     666       63846 :     CheckCmdReplicaIdentity(rel, CMD_UPDATE);
     667             : 
     668             :     /* BEFORE ROW UPDATE Triggers */
     669       63846 :     if (resultRelInfo->ri_TrigDesc &&
     670          20 :         resultRelInfo->ri_TrigDesc->trig_update_before_row)
     671             :     {
     672           6 :         if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
     673             :                                   tid, NULL, slot, NULL, NULL))
     674           4 :             skip_tuple = true;  /* "do nothing" */
     675             :     }
     676             : 
     677       63846 :     if (!skip_tuple)
     678             :     {
     679       63842 :         List       *recheckIndexes = NIL;
     680             :         TU_UpdateIndexes update_indexes;
     681             :         List       *conflictindexes;
     682       63842 :         bool        conflict = false;
     683             : 
     684             :         /* Compute stored generated columns */
     685       63842 :         if (rel->rd_att->constr &&
     686       63752 :             rel->rd_att->constr->has_generated_stored)
     687           4 :             ExecComputeStoredGenerated(resultRelInfo, estate, slot,
     688             :                                        CMD_UPDATE);
     689             : 
     690             :         /* Check the constraints of the tuple */
     691       63842 :         if (rel->rd_att->constr)
     692       63752 :             ExecConstraints(resultRelInfo, slot, estate);
     693       63842 :         if (rel->rd_rel->relispartition)
     694          24 :             ExecPartitionCheck(resultRelInfo, slot, estate, true);
     695             : 
     696       63842 :         simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
     697             :                                   &update_indexes);
     698             : 
     699       63842 :         conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
     700             : 
     701       63842 :         if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
     702       40316 :             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
     703             :                                                    slot, estate, true,
     704             :                                                    conflictindexes ? true : false,
     705             :                                                    &conflict, conflictindexes,
     706             :                                                    (update_indexes == TU_Summarizing));
     707             : 
     708             :         /*
     709             :          * Refer to the comments above the call to CheckAndReportConflict() in
     710             :          * ExecSimpleRelationInsert to understand why this check is done at
     711             :          * this point.
     712             :          */
     713       63842 :         if (conflict)
     714           4 :             CheckAndReportConflict(resultRelInfo, estate, CT_UPDATE_EXISTS,
     715             :                                    recheckIndexes, searchslot, slot);
     716             : 
     717             :         /* AFTER ROW UPDATE Triggers */
     718       63838 :         ExecARUpdateTriggers(estate, resultRelInfo,
     719             :                              NULL, NULL,
     720             :                              tid, NULL, slot,
     721             :                              recheckIndexes, NULL, false);
     722             : 
     723       63838 :         list_free(recheckIndexes);
     724             :     }
     725       63842 : }
     726             : 
     727             : /*
     728             :  * Find the searchslot tuple and delete it, and execute any constraints
     729             :  * and per-row triggers.
     730             :  *
     731             :  * Caller is responsible for opening the indexes.
     732             :  */
     733             : void
     734       80620 : ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
     735             :                          EState *estate, EPQState *epqstate,
     736             :                          TupleTableSlot *searchslot)
     737             : {
     738       80620 :     bool        skip_tuple = false;
     739       80620 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     740       80620 :     ItemPointer tid = &searchslot->tts_tid;
     741             : 
     742       80620 :     CheckCmdReplicaIdentity(rel, CMD_DELETE);
     743             : 
     744             :     /* BEFORE ROW DELETE Triggers */
     745       80620 :     if (resultRelInfo->ri_TrigDesc &&
     746          20 :         resultRelInfo->ri_TrigDesc->trig_delete_before_row)
     747             :     {
     748           0 :         skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
     749           0 :                                            tid, NULL, NULL, NULL, NULL);
     750             :     }
     751             : 
     752       80620 :     if (!skip_tuple)
     753             :     {
     754             :         /* OK, delete the tuple */
     755       80620 :         simple_table_tuple_delete(rel, tid, estate->es_snapshot);
     756             : 
     757             :         /* AFTER ROW DELETE Triggers */
     758       80620 :         ExecARDeleteTriggers(estate, resultRelInfo,
     759             :                              tid, NULL, NULL, false);
     760             :     }
     761       80620 : }
     762             : 
     763             : /*
     764             :  * Check if command can be executed with current replica identity.
     765             :  */
     766             : void
     767      431208 : CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
     768             : {
     769             :     PublicationDesc pubdesc;
     770             : 
     771             :     /*
     772             :      * Skip checking the replica identity for partitioned tables, because the
     773             :      * operations are actually performed on the leaf partitions.
     774             :      */
     775      431208 :     if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     776      409210 :         return;
     777             : 
     778             :     /* We only need to do checks for UPDATE and DELETE. */
     779      425202 :     if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
     780      252760 :         return;
     781             : 
     782             :     /*
     783             :      * It is only safe to execute UPDATE/DELETE if the relation does not
     784             :      * publish UPDATEs or DELETEs, or all the following conditions are
     785             :      * satisfied:
     786             :      *
     787             :      * 1. All columns, referenced in the row filters from publications which
     788             :      * the relation is in, are valid - i.e. when all referenced columns are
     789             :      * part of REPLICA IDENTITY.
     790             :      *
     791             :      * 2. All columns, referenced in the column lists are valid - i.e. when
     792             :      * all columns referenced in the REPLICA IDENTITY are covered by the
     793             :      * column list.
     794             :      *
     795             :      * 3. All generated columns in REPLICA IDENTITY of the relation, are valid
     796             :      * - i.e. when all these generated columns are published.
     797             :      *
     798             :      * XXX We could optimize it by first checking whether any of the
     799             :      * publications have a row filter or column list for this relation, or if
     800             :      * the relation contains a generated column. If none of these exist and
     801             :      * the relation has replica identity then we can avoid building the
     802             :      * descriptor but as this happens only one time it doesn't seem worth the
     803             :      * additional complexity.
     804             :      */
     805      172442 :     RelationBuildPublicationDesc(rel, &pubdesc);
     806      172442 :     if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
     807          60 :         ereport(ERROR,
     808             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     809             :                  errmsg("cannot update table \"%s\"",
     810             :                         RelationGetRelationName(rel)),
     811             :                  errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
     812      172382 :     else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
     813         108 :         ereport(ERROR,
     814             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     815             :                  errmsg("cannot update table \"%s\"",
     816             :                         RelationGetRelationName(rel)),
     817             :                  errdetail("Column list used by the publication does not cover the replica identity.")));
     818      172274 :     else if (cmd == CMD_UPDATE && !pubdesc.gencols_valid_for_update)
     819          24 :         ereport(ERROR,
     820             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     821             :                  errmsg("cannot update table \"%s\"",
     822             :                         RelationGetRelationName(rel)),
     823             :                  errdetail("Replica identity must not contain unpublished generated columns.")));
     824      172250 :     else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
     825           0 :         ereport(ERROR,
     826             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     827             :                  errmsg("cannot delete from table \"%s\"",
     828             :                         RelationGetRelationName(rel)),
     829             :                  errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
     830      172250 :     else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
     831           0 :         ereport(ERROR,
     832             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     833             :                  errmsg("cannot delete from table \"%s\"",
     834             :                         RelationGetRelationName(rel)),
     835             :                  errdetail("Column list used by the publication does not cover the replica identity.")));
     836      172250 :     else if (cmd == CMD_DELETE && !pubdesc.gencols_valid_for_delete)
     837           0 :         ereport(ERROR,
     838             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     839             :                  errmsg("cannot delete from table \"%s\"",
     840             :                         RelationGetRelationName(rel)),
     841             :                  errdetail("Replica identity must not contain unpublished generated columns.")));
     842             : 
     843             :     /* If relation has replica identity we are always good. */
     844      172250 :     if (OidIsValid(RelationGetReplicaIndex(rel)))
     845      149994 :         return;
     846             : 
     847             :     /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
     848       22256 :     if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     849         450 :         return;
     850             : 
     851             :     /*
     852             :      * This is UPDATE/DELETE and there is no replica identity.
     853             :      *
     854             :      * Check if the table publishes UPDATES or DELETES.
     855             :      */
     856       21806 :     if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
     857         106 :         ereport(ERROR,
     858             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     859             :                  errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
     860             :                         RelationGetRelationName(rel)),
     861             :                  errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
     862       21700 :     else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
     863          10 :         ereport(ERROR,
     864             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     865             :                  errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
     866             :                         RelationGetRelationName(rel)),
     867             :                  errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
     868             : }
     869             : 
     870             : 
     871             : /*
     872             :  * Check if we support writing into specific relkind.
     873             :  *
     874             :  * The nspname and relname are only needed for error reporting.
     875             :  */
     876             : void
     877        1738 : CheckSubscriptionRelkind(char relkind, const char *nspname,
     878             :                          const char *relname)
     879             : {
     880        1738 :     if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
     881           0 :         ereport(ERROR,
     882             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     883             :                  errmsg("cannot use relation \"%s.%s\" as logical replication target",
     884             :                         nspname, relname),
     885             :                  errdetail_relkind_not_supported(relkind)));
     886        1738 : }

Generated by: LCOV version 1.14