LCOV - code coverage report
Current view: top level - src/backend/executor - execReplication.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 227 262 86.6 %
Date: 2024-11-21 08:14:44 Functions: 14 14 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * execReplication.c
       4             :  *    miscellaneous executor routines for logical replication
       5             :  *
       6             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *    src/backend/executor/execReplication.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/genam.h"
      18             : #include "access/relscan.h"
      19             : #include "access/tableam.h"
      20             : #include "access/transam.h"
      21             : #include "access/xact.h"
      22             : #include "catalog/pg_am_d.h"
      23             : #include "commands/trigger.h"
      24             : #include "executor/executor.h"
      25             : #include "executor/nodeModifyTable.h"
      26             : #include "replication/conflict.h"
      27             : #include "replication/logicalrelation.h"
      28             : #include "storage/lmgr.h"
      29             : #include "utils/builtins.h"
      30             : #include "utils/lsyscache.h"
      31             : #include "utils/rel.h"
      32             : #include "utils/snapmgr.h"
      33             : #include "utils/syscache.h"
      34             : #include "utils/typcache.h"
      35             : 
      36             : 
      37             : static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
      38             :                          TypeCacheEntry **eq);
      39             : 
      40             : /*
      41             :  * Returns the fixed strategy number, if any, of the equality operator for the
      42             :  * given index access method, otherwise, InvalidStrategy.
      43             :  *
      44             :  * Currently, only Btree and Hash indexes are supported. The other index access
      45             :  * methods don't have a fixed strategy for equality operation - instead, the
      46             :  * support routines of each operator class interpret the strategy numbers
      47             :  * according to the operator class's definition.
      48             :  */
      49             : StrategyNumber
      50      144232 : get_equal_strategy_number_for_am(Oid am)
      51             : {
      52             :     int         ret;
      53             : 
      54      144232 :     switch (am)
      55             :     {
      56      144220 :         case BTREE_AM_OID:
      57      144220 :             ret = BTEqualStrategyNumber;
      58      144220 :             break;
      59          12 :         case HASH_AM_OID:
      60          12 :             ret = HTEqualStrategyNumber;
      61          12 :             break;
      62           0 :         default:
      63             :             /* XXX: Only Btree and Hash indexes are supported */
      64           0 :             ret = InvalidStrategy;
      65           0 :             break;
      66             :     }
      67             : 
      68      144232 :     return ret;
      69             : }
      70             : 
      71             : /*
      72             :  * Return the appropriate strategy number which corresponds to the equality
      73             :  * operator.
      74             :  */
      75             : static StrategyNumber
      76      144196 : get_equal_strategy_number(Oid opclass)
      77             : {
      78      144196 :     Oid         am = get_opclass_method(opclass);
      79             : 
      80      144196 :     return get_equal_strategy_number_for_am(am);
      81             : }
      82             : 
      83             : /*
      84             :  * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
      85             :  * is setup to match 'rel' (*NOT* idxrel!).
      86             :  *
      87             :  * Returns how many columns to use for the index scan.
      88             :  *
      89             :  * This is not generic routine, idxrel must be PK, RI, or an index that can be
      90             :  * used for REPLICA IDENTITY FULL table. See FindUsableIndexForReplicaIdentityFull()
      91             :  * for details.
      92             :  *
      93             :  * By definition, replication identity of a rel meets all limitations associated
      94             :  * with that. Note that any other index could also meet these limitations.
      95             :  */
      96             : static int
      97      144174 : build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
      98             :                          TupleTableSlot *searchslot)
      99             : {
     100             :     int         index_attoff;
     101      144174 :     int         skey_attoff = 0;
     102             :     Datum       indclassDatum;
     103             :     oidvector  *opclass;
     104      144174 :     int2vector *indkey = &idxrel->rd_index->indkey;
     105             : 
     106      144174 :     indclassDatum = SysCacheGetAttrNotNull(INDEXRELID, idxrel->rd_indextuple,
     107             :                                            Anum_pg_index_indclass);
     108      144174 :     opclass = (oidvector *) DatumGetPointer(indclassDatum);
     109             : 
     110             :     /* Build scankey for every non-expression attribute in the index. */
     111      288374 :     for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
     112      144200 :          index_attoff++)
     113             :     {
     114             :         Oid         operator;
     115             :         Oid         optype;
     116             :         Oid         opfamily;
     117             :         RegProcedure regop;
     118      144200 :         int         table_attno = indkey->values[index_attoff];
     119             :         StrategyNumber eq_strategy;
     120             : 
     121      144200 :         if (!AttributeNumberIsValid(table_attno))
     122             :         {
     123             :             /*
     124             :              * XXX: Currently, we don't support expressions in the scan key,
     125             :              * see code below.
     126             :              */
     127           4 :             continue;
     128             :         }
     129             : 
     130             :         /*
     131             :          * Load the operator info.  We need this to get the equality operator
     132             :          * function for the scan key.
     133             :          */
     134      144196 :         optype = get_opclass_input_type(opclass->values[index_attoff]);
     135      144196 :         opfamily = get_opclass_family(opclass->values[index_attoff]);
     136      144196 :         eq_strategy = get_equal_strategy_number(opclass->values[index_attoff]);
     137             : 
     138      144196 :         operator = get_opfamily_member(opfamily, optype,
     139             :                                        optype,
     140             :                                        eq_strategy);
     141             : 
     142      144196 :         if (!OidIsValid(operator))
     143           0 :             elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
     144             :                  eq_strategy, optype, optype, opfamily);
     145             : 
     146      144196 :         regop = get_opcode(operator);
     147             : 
     148             :         /* Initialize the scankey. */
     149      144196 :         ScanKeyInit(&skey[skey_attoff],
     150      144196 :                     index_attoff + 1,
     151             :                     eq_strategy,
     152             :                     regop,
     153      144196 :                     searchslot->tts_values[table_attno - 1]);
     154             : 
     155      144196 :         skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
     156             : 
     157             :         /* Check for null value. */
     158      144196 :         if (searchslot->tts_isnull[table_attno - 1])
     159           2 :             skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
     160             : 
     161      144196 :         skey_attoff++;
     162             :     }
     163             : 
     164             :     /* There must always be at least one attribute for the index scan. */
     165             :     Assert(skey_attoff > 0);
     166             : 
     167      144174 :     return skey_attoff;
     168             : }
     169             : 
     170             : 
     171             : /*
     172             :  * Helper function to check if it is necessary to re-fetch and lock the tuple
     173             :  * due to concurrent modifications. This function should be called after
     174             :  * invoking table_tuple_lock.
     175             :  */
     176             : static bool
     177      144456 : should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
     178             : {
     179      144456 :     bool        refetch = false;
     180             : 
     181      144456 :     switch (res)
     182             :     {
     183      144456 :         case TM_Ok:
     184      144456 :             break;
     185           0 :         case TM_Updated:
     186             :             /* XXX: Improve handling here */
     187           0 :             if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
     188           0 :                 ereport(LOG,
     189             :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     190             :                          errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
     191             :             else
     192           0 :                 ereport(LOG,
     193             :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     194             :                          errmsg("concurrent update, retrying")));
     195           0 :             refetch = true;
     196           0 :             break;
     197           0 :         case TM_Deleted:
     198             :             /* XXX: Improve handling here */
     199           0 :             ereport(LOG,
     200             :                     (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     201             :                      errmsg("concurrent delete, retrying")));
     202           0 :             refetch = true;
     203           0 :             break;
     204           0 :         case TM_Invisible:
     205           0 :             elog(ERROR, "attempted to lock invisible tuple");
     206             :             break;
     207           0 :         default:
     208           0 :             elog(ERROR, "unexpected table_tuple_lock status: %u", res);
     209             :             break;
     210             :     }
     211             : 
     212      144456 :     return refetch;
     213             : }
     214             : 
     215             : /*
     216             :  * Search the relation 'rel' for tuple using the index.
     217             :  *
     218             :  * If a matching tuple is found, lock it with lockmode, fill the slot with its
     219             :  * contents, and return true.  Return false otherwise.
     220             :  */
     221             : bool
     222      144174 : RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
     223             :                              LockTupleMode lockmode,
     224             :                              TupleTableSlot *searchslot,
     225             :                              TupleTableSlot *outslot)
     226             : {
     227             :     ScanKeyData skey[INDEX_MAX_KEYS];
     228             :     int         skey_attoff;
     229             :     IndexScanDesc scan;
     230             :     SnapshotData snap;
     231             :     TransactionId xwait;
     232             :     Relation    idxrel;
     233             :     bool        found;
     234      144174 :     TypeCacheEntry **eq = NULL;
     235             :     bool        isIdxSafeToSkipDuplicates;
     236             : 
     237             :     /* Open the index. */
     238      144174 :     idxrel = index_open(idxoid, RowExclusiveLock);
     239             : 
     240      144174 :     isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
     241             : 
     242      144174 :     InitDirtySnapshot(snap);
     243             : 
     244             :     /* Build scan key. */
     245      144174 :     skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
     246             : 
     247             :     /* Start an index scan. */
     248      144174 :     scan = index_beginscan(rel, idxrel, &snap, skey_attoff, 0);
     249             : 
     250      144174 : retry:
     251      144174 :     found = false;
     252             : 
     253      144174 :     index_rescan(scan, skey, skey_attoff, NULL, 0);
     254             : 
     255             :     /* Try to find the tuple */
     256      144174 :     while (index_getnext_slot(scan, ForwardScanDirection, outslot))
     257             :     {
     258             :         /*
     259             :          * Avoid expensive equality check if the index is primary key or
     260             :          * replica identity index.
     261             :          */
     262      144150 :         if (!isIdxSafeToSkipDuplicates)
     263             :         {
     264          30 :             if (eq == NULL)
     265          30 :                 eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
     266             : 
     267          30 :             if (!tuples_equal(outslot, searchslot, eq))
     268           0 :                 continue;
     269             :         }
     270             : 
     271      144150 :         ExecMaterializeSlot(outslot);
     272             : 
     273      288300 :         xwait = TransactionIdIsValid(snap.xmin) ?
     274      144150 :             snap.xmin : snap.xmax;
     275             : 
     276             :         /*
     277             :          * If the tuple is locked, wait for locking transaction to finish and
     278             :          * retry.
     279             :          */
     280      144150 :         if (TransactionIdIsValid(xwait))
     281             :         {
     282           0 :             XactLockTableWait(xwait, NULL, NULL, XLTW_None);
     283           0 :             goto retry;
     284             :         }
     285             : 
     286             :         /* Found our tuple and it's not locked */
     287      144150 :         found = true;
     288      144150 :         break;
     289             :     }
     290             : 
     291             :     /* Found tuple, try to lock it in the lockmode. */
     292      144174 :     if (found)
     293             :     {
     294             :         TM_FailureData tmfd;
     295             :         TM_Result   res;
     296             : 
     297      144150 :         PushActiveSnapshot(GetLatestSnapshot());
     298             : 
     299      144150 :         res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
     300             :                                outslot,
     301             :                                GetCurrentCommandId(false),
     302             :                                lockmode,
     303             :                                LockWaitBlock,
     304             :                                0 /* don't follow updates */ ,
     305             :                                &tmfd);
     306             : 
     307      144150 :         PopActiveSnapshot();
     308             : 
     309      144150 :         if (should_refetch_tuple(res, &tmfd))
     310           0 :             goto retry;
     311             :     }
     312             : 
     313      144174 :     index_endscan(scan);
     314             : 
     315             :     /* Don't release lock until commit. */
     316      144174 :     index_close(idxrel, NoLock);
     317             : 
     318      144174 :     return found;
     319             : }
     320             : 
     321             : /*
     322             :  * Compare the tuples in the slots by checking if they have equal values.
     323             :  */
     324             : static bool
     325      210636 : tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
     326             :              TypeCacheEntry **eq)
     327             : {
     328             :     int         attrnum;
     329             : 
     330             :     Assert(slot1->tts_tupleDescriptor->natts ==
     331             :            slot2->tts_tupleDescriptor->natts);
     332             : 
     333      210636 :     slot_getallattrs(slot1);
     334      210636 :     slot_getallattrs(slot2);
     335             : 
     336             :     /* Check equality of the attributes. */
     337      211018 :     for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
     338             :     {
     339             :         Form_pg_attribute att;
     340             :         TypeCacheEntry *typentry;
     341             : 
     342      210696 :         att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
     343             : 
     344             :         /*
     345             :          * Ignore dropped and generated columns as the publisher doesn't send
     346             :          * those
     347             :          */
     348      210696 :         if (att->attisdropped || att->attgenerated)
     349           4 :             continue;
     350             : 
     351             :         /*
     352             :          * If one value is NULL and other is not, then they are certainly not
     353             :          * equal
     354             :          */
     355      210692 :         if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
     356           0 :             return false;
     357             : 
     358             :         /*
     359             :          * If both are NULL, they can be considered equal.
     360             :          */
     361      210692 :         if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
     362           2 :             continue;
     363             : 
     364      210690 :         typentry = eq[attrnum];
     365      210690 :         if (typentry == NULL)
     366             :         {
     367         380 :             typentry = lookup_type_cache(att->atttypid,
     368             :                                          TYPECACHE_EQ_OPR_FINFO);
     369         380 :             if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
     370           0 :                 ereport(ERROR,
     371             :                         (errcode(ERRCODE_UNDEFINED_FUNCTION),
     372             :                          errmsg("could not identify an equality operator for type %s",
     373             :                                 format_type_be(att->atttypid))));
     374         380 :             eq[attrnum] = typentry;
     375             :         }
     376             : 
     377      210690 :         if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
     378             :                                             att->attcollation,
     379      210690 :                                             slot1->tts_values[attrnum],
     380      210690 :                                             slot2->tts_values[attrnum])))
     381      210314 :             return false;
     382             :     }
     383             : 
     384         322 :     return true;
     385             : }
     386             : 
     387             : /*
     388             :  * Search the relation 'rel' for tuple using the sequential scan.
     389             :  *
     390             :  * If a matching tuple is found, lock it with lockmode, fill the slot with its
     391             :  * contents, and return true.  Return false otherwise.
     392             :  *
     393             :  * Note that this stops on the first matching tuple.
     394             :  *
     395             :  * This can obviously be quite slow on tables that have more than few rows.
     396             :  */
     397             : bool
     398         296 : RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
     399             :                          TupleTableSlot *searchslot, TupleTableSlot *outslot)
     400             : {
     401             :     TupleTableSlot *scanslot;
     402             :     TableScanDesc scan;
     403             :     SnapshotData snap;
     404             :     TypeCacheEntry **eq;
     405             :     TransactionId xwait;
     406             :     bool        found;
     407         296 :     TupleDesc   desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
     408             : 
     409             :     Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
     410             : 
     411         296 :     eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
     412             : 
     413             :     /* Start a heap scan. */
     414         296 :     InitDirtySnapshot(snap);
     415         296 :     scan = table_beginscan(rel, &snap, 0, NULL);
     416         296 :     scanslot = table_slot_create(rel, NULL);
     417             : 
     418         296 : retry:
     419         296 :     found = false;
     420             : 
     421         296 :     table_rescan(scan, NULL);
     422             : 
     423             :     /* Try to find the tuple */
     424      210610 :     while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
     425             :     {
     426      210606 :         if (!tuples_equal(scanslot, searchslot, eq))
     427      210314 :             continue;
     428             : 
     429         292 :         found = true;
     430         292 :         ExecCopySlot(outslot, scanslot);
     431             : 
     432         584 :         xwait = TransactionIdIsValid(snap.xmin) ?
     433         292 :             snap.xmin : snap.xmax;
     434             : 
     435             :         /*
     436             :          * If the tuple is locked, wait for locking transaction to finish and
     437             :          * retry.
     438             :          */
     439         292 :         if (TransactionIdIsValid(xwait))
     440             :         {
     441           0 :             XactLockTableWait(xwait, NULL, NULL, XLTW_None);
     442           0 :             goto retry;
     443             :         }
     444             : 
     445             :         /* Found our tuple and it's not locked */
     446         292 :         break;
     447             :     }
     448             : 
     449             :     /* Found tuple, try to lock it in the lockmode. */
     450         296 :     if (found)
     451             :     {
     452             :         TM_FailureData tmfd;
     453             :         TM_Result   res;
     454             : 
     455         292 :         PushActiveSnapshot(GetLatestSnapshot());
     456             : 
     457         292 :         res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
     458             :                                outslot,
     459             :                                GetCurrentCommandId(false),
     460             :                                lockmode,
     461             :                                LockWaitBlock,
     462             :                                0 /* don't follow updates */ ,
     463             :                                &tmfd);
     464             : 
     465         292 :         PopActiveSnapshot();
     466             : 
     467         292 :         if (should_refetch_tuple(res, &tmfd))
     468           0 :             goto retry;
     469             :     }
     470             : 
     471         296 :     table_endscan(scan);
     472         296 :     ExecDropSingleTupleTableSlot(scanslot);
     473             : 
     474         296 :     return found;
     475             : }
     476             : 
     477             : /*
     478             :  * Find the tuple that violates the passed unique index (conflictindex).
     479             :  *
     480             :  * If the conflicting tuple is found return true, otherwise false.
     481             :  *
     482             :  * We lock the tuple to avoid getting it deleted before the caller can fetch
     483             :  * the required information. Note that if the tuple is deleted before a lock
     484             :  * is acquired, we will retry to find the conflicting tuple again.
     485             :  */
     486             : static bool
     487          18 : FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate,
     488             :                   Oid conflictindex, TupleTableSlot *slot,
     489             :                   TupleTableSlot **conflictslot)
     490             : {
     491          18 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     492             :     ItemPointerData conflictTid;
     493             :     TM_FailureData tmfd;
     494             :     TM_Result   res;
     495             : 
     496          18 :     *conflictslot = NULL;
     497             : 
     498          18 : retry:
     499          18 :     if (ExecCheckIndexConstraints(resultRelInfo, slot, estate,
     500             :                                   &conflictTid, &slot->tts_tid,
     501          18 :                                   list_make1_oid(conflictindex)))
     502             :     {
     503           2 :         if (*conflictslot)
     504           0 :             ExecDropSingleTupleTableSlot(*conflictslot);
     505             : 
     506           2 :         *conflictslot = NULL;
     507           2 :         return false;
     508             :     }
     509             : 
     510          14 :     *conflictslot = table_slot_create(rel, NULL);
     511             : 
     512          14 :     PushActiveSnapshot(GetLatestSnapshot());
     513             : 
     514          14 :     res = table_tuple_lock(rel, &conflictTid, GetLatestSnapshot(),
     515             :                            *conflictslot,
     516             :                            GetCurrentCommandId(false),
     517             :                            LockTupleShare,
     518             :                            LockWaitBlock,
     519             :                            0 /* don't follow updates */ ,
     520             :                            &tmfd);
     521             : 
     522          14 :     PopActiveSnapshot();
     523             : 
     524          14 :     if (should_refetch_tuple(res, &tmfd))
     525           0 :         goto retry;
     526             : 
     527          14 :     return true;
     528             : }
     529             : 
     530             : /*
     531             :  * Check all the unique indexes in 'recheckIndexes' for conflict with the
     532             :  * tuple in 'remoteslot' and report if found.
     533             :  */
     534             : static void
     535          18 : CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
     536             :                        ConflictType type, List *recheckIndexes,
     537             :                        TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
     538             : {
     539             :     /* Check all the unique indexes for a conflict */
     540          22 :     foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
     541             :     {
     542             :         TupleTableSlot *conflictslot;
     543             : 
     544          34 :         if (list_member_oid(recheckIndexes, uniqueidx) &&
     545          18 :             FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
     546             :                               &conflictslot))
     547             :         {
     548             :             RepOriginId origin;
     549             :             TimestampTz committs;
     550             :             TransactionId xmin;
     551             : 
     552          14 :             GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs);
     553          14 :             ReportApplyConflict(estate, resultRelInfo, ERROR, type,
     554             :                                 searchslot, conflictslot, remoteslot,
     555             :                                 uniqueidx, xmin, origin, committs);
     556             :         }
     557             :     }
     558           2 : }
     559             : 
     560             : /*
     561             :  * Insert tuple represented in the slot to the relation, update the indexes,
     562             :  * and execute any constraints and per-row triggers.
     563             :  *
     564             :  * Caller is responsible for opening the indexes.
     565             :  */
     566             : void
     567      152496 : ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
     568             :                          EState *estate, TupleTableSlot *slot)
     569             : {
     570      152496 :     bool        skip_tuple = false;
     571      152496 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     572             : 
     573             :     /* For now we support only tables. */
     574             :     Assert(rel->rd_rel->relkind == RELKIND_RELATION);
     575             : 
     576      152496 :     CheckCmdReplicaIdentity(rel, CMD_INSERT);
     577             : 
     578             :     /* BEFORE ROW INSERT Triggers */
     579      152496 :     if (resultRelInfo->ri_TrigDesc &&
     580          38 :         resultRelInfo->ri_TrigDesc->trig_insert_before_row)
     581             :     {
     582           6 :         if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
     583           2 :             skip_tuple = true;  /* "do nothing" */
     584             :     }
     585             : 
     586      152496 :     if (!skip_tuple)
     587             :     {
     588      152494 :         List       *recheckIndexes = NIL;
     589             :         List       *conflictindexes;
     590      152494 :         bool        conflict = false;
     591             : 
     592             :         /* Compute stored generated columns */
     593      152494 :         if (rel->rd_att->constr &&
     594       90890 :             rel->rd_att->constr->has_generated_stored)
     595           8 :             ExecComputeStoredGenerated(resultRelInfo, estate, slot,
     596             :                                        CMD_INSERT);
     597             : 
     598             :         /* Check the constraints of the tuple */
     599      152494 :         if (rel->rd_att->constr)
     600       90890 :             ExecConstraints(resultRelInfo, slot, estate);
     601      152494 :         if (rel->rd_rel->relispartition)
     602         120 :             ExecPartitionCheck(resultRelInfo, slot, estate, true);
     603             : 
     604             :         /* OK, store the tuple and create index entries for it */
     605      152494 :         simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
     606             : 
     607      152494 :         conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
     608             : 
     609      152494 :         if (resultRelInfo->ri_NumIndices > 0)
     610      111840 :             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
     611             :                                                    slot, estate, false,
     612             :                                                    conflictindexes ? true : false,
     613             :                                                    &conflict,
     614             :                                                    conflictindexes, false);
     615             : 
     616             :         /*
     617             :          * Checks the conflict indexes to fetch the conflicting local tuple
     618             :          * and reports the conflict. We perform this check here, instead of
     619             :          * performing an additional index scan before the actual insertion and
     620             :          * reporting the conflict if any conflicting tuples are found. This is
     621             :          * to avoid the overhead of executing the extra scan for each INSERT
     622             :          * operation, even when no conflict arises, which could introduce
     623             :          * significant overhead to replication, particularly in cases where
     624             :          * conflicts are rare.
     625             :          *
     626             :          * XXX OTOH, this could lead to clean-up effort for dead tuples added
     627             :          * in heap and index in case of conflicts. But as conflicts shouldn't
     628             :          * be a frequent thing so we preferred to save the performance
     629             :          * overhead of extra scan before each insertion.
     630             :          */
     631      152494 :         if (conflict)
     632          16 :             CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
     633             :                                    recheckIndexes, NULL, slot);
     634             : 
     635             :         /* AFTER ROW INSERT Triggers */
     636      152480 :         ExecARInsertTriggers(estate, resultRelInfo, slot,
     637             :                              recheckIndexes, NULL);
     638             : 
     639             :         /*
     640             :          * XXX we should in theory pass a TransitionCaptureState object to the
     641             :          * above to capture transition tuples, but after statement triggers
     642             :          * don't actually get fired by replication yet anyway
     643             :          */
     644             : 
     645      152480 :         list_free(recheckIndexes);
     646             :     }
     647      152482 : }
     648             : 
     649             : /*
     650             :  * Find the searchslot tuple and update it with data in the slot,
     651             :  * update the indexes, and execute any constraints and per-row triggers.
     652             :  *
     653             :  * Caller is responsible for opening the indexes.
     654             :  */
     655             : void
     656       63834 : ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
     657             :                          EState *estate, EPQState *epqstate,
     658             :                          TupleTableSlot *searchslot, TupleTableSlot *slot)
     659             : {
     660       63834 :     bool        skip_tuple = false;
     661       63834 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     662       63834 :     ItemPointer tid = &(searchslot->tts_tid);
     663             : 
     664             :     /*
     665             :      * We support only non-system tables, with
     666             :      * check_publication_add_relation() accountable.
     667             :      */
     668             :     Assert(rel->rd_rel->relkind == RELKIND_RELATION);
     669             :     Assert(!IsCatalogRelation(rel));
     670             : 
     671       63834 :     CheckCmdReplicaIdentity(rel, CMD_UPDATE);
     672             : 
     673             :     /* BEFORE ROW UPDATE Triggers */
     674       63834 :     if (resultRelInfo->ri_TrigDesc &&
     675          20 :         resultRelInfo->ri_TrigDesc->trig_update_before_row)
     676             :     {
     677           6 :         if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
     678             :                                   tid, NULL, slot, NULL, NULL))
     679           4 :             skip_tuple = true;  /* "do nothing" */
     680             :     }
     681             : 
     682       63834 :     if (!skip_tuple)
     683             :     {
     684       63830 :         List       *recheckIndexes = NIL;
     685             :         TU_UpdateIndexes update_indexes;
     686             :         List       *conflictindexes;
     687       63830 :         bool        conflict = false;
     688             : 
     689             :         /* Compute stored generated columns */
     690       63830 :         if (rel->rd_att->constr &&
     691       63744 :             rel->rd_att->constr->has_generated_stored)
     692           6 :             ExecComputeStoredGenerated(resultRelInfo, estate, slot,
     693             :                                        CMD_UPDATE);
     694             : 
     695             :         /* Check the constraints of the tuple */
     696       63830 :         if (rel->rd_att->constr)
     697       63744 :             ExecConstraints(resultRelInfo, slot, estate);
     698       63830 :         if (rel->rd_rel->relispartition)
     699          24 :             ExecPartitionCheck(resultRelInfo, slot, estate, true);
     700             : 
     701       63830 :         simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
     702             :                                   &update_indexes);
     703             : 
     704       63830 :         conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
     705             : 
     706       63830 :         if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
     707       40318 :             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
     708             :                                                    slot, estate, true,
     709             :                                                    conflictindexes ? true : false,
     710             :                                                    &conflict, conflictindexes,
     711             :                                                    (update_indexes == TU_Summarizing));
     712             : 
     713             :         /*
     714             :          * Refer to the comments above the call to CheckAndReportConflict() in
     715             :          * ExecSimpleRelationInsert to understand why this check is done at
     716             :          * this point.
     717             :          */
     718       63830 :         if (conflict)
     719           2 :             CheckAndReportConflict(resultRelInfo, estate, CT_UPDATE_EXISTS,
     720             :                                    recheckIndexes, searchslot, slot);
     721             : 
     722             :         /* AFTER ROW UPDATE Triggers */
     723       63828 :         ExecARUpdateTriggers(estate, resultRelInfo,
     724             :                              NULL, NULL,
     725             :                              tid, NULL, slot,
     726             :                              recheckIndexes, NULL, false);
     727             : 
     728       63828 :         list_free(recheckIndexes);
     729             :     }
     730       63832 : }
     731             : 
     732             : /*
     733             :  * Find the searchslot tuple and delete it, and execute any constraints
     734             :  * and per-row triggers.
     735             :  *
     736             :  * Caller is responsible for opening the indexes.
     737             :  */
     738             : void
     739       80608 : ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
     740             :                          EState *estate, EPQState *epqstate,
     741             :                          TupleTableSlot *searchslot)
     742             : {
     743       80608 :     bool        skip_tuple = false;
     744       80608 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     745       80608 :     ItemPointer tid = &searchslot->tts_tid;
     746             : 
     747       80608 :     CheckCmdReplicaIdentity(rel, CMD_DELETE);
     748             : 
     749             :     /* BEFORE ROW DELETE Triggers */
     750       80608 :     if (resultRelInfo->ri_TrigDesc &&
     751          20 :         resultRelInfo->ri_TrigDesc->trig_delete_before_row)
     752             :     {
     753           0 :         skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
     754           0 :                                            tid, NULL, NULL, NULL, NULL);
     755             :     }
     756             : 
     757       80608 :     if (!skip_tuple)
     758             :     {
     759             :         /* OK, delete the tuple */
     760       80608 :         simple_table_tuple_delete(rel, tid, estate->es_snapshot);
     761             : 
     762             :         /* AFTER ROW DELETE Triggers */
     763       80608 :         ExecARDeleteTriggers(estate, resultRelInfo,
     764             :                              tid, NULL, NULL, false);
     765             :     }
     766       80608 : }
     767             : 
     768             : /*
     769             :  * Check if command can be executed with current replica identity.
     770             :  */
     771             : void
     772      427600 : CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
     773             : {
     774             :     PublicationDesc pubdesc;
     775             : 
     776             :     /*
     777             :      * Skip checking the replica identity for partitioned tables, because the
     778             :      * operations are actually performed on the leaf partitions.
     779             :      */
     780      427600 :     if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     781      406380 :         return;
     782             : 
     783             :     /* We only need to do checks for UPDATE and DELETE. */
     784      421608 :     if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
     785      250018 :         return;
     786             : 
     787             :     /*
     788             :      * It is only safe to execute UPDATE/DELETE when all columns, referenced
     789             :      * in the row filters from publications which the relation is in, are
     790             :      * valid - i.e. when all referenced columns are part of REPLICA IDENTITY
     791             :      * or the table does not publish UPDATEs or DELETEs.
     792             :      *
     793             :      * XXX We could optimize it by first checking whether any of the
     794             :      * publications have a row filter for this relation. If not and relation
     795             :      * has replica identity then we can avoid building the descriptor but as
     796             :      * this happens only one time it doesn't seem worth the additional
     797             :      * complexity.
     798             :      */
     799      171590 :     RelationBuildPublicationDesc(rel, &pubdesc);
     800      171590 :     if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
     801          60 :         ereport(ERROR,
     802             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     803             :                  errmsg("cannot update table \"%s\"",
     804             :                         RelationGetRelationName(rel)),
     805             :                  errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
     806      171530 :     else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
     807         108 :         ereport(ERROR,
     808             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     809             :                  errmsg("cannot update table \"%s\"",
     810             :                         RelationGetRelationName(rel)),
     811             :                  errdetail("Column list used by the publication does not cover the replica identity.")));
     812      171422 :     else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
     813           0 :         ereport(ERROR,
     814             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     815             :                  errmsg("cannot delete from table \"%s\"",
     816             :                         RelationGetRelationName(rel)),
     817             :                  errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
     818      171422 :     else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
     819           0 :         ereport(ERROR,
     820             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     821             :                  errmsg("cannot delete from table \"%s\"",
     822             :                         RelationGetRelationName(rel)),
     823             :                  errdetail("Column list used by the publication does not cover the replica identity.")));
     824             : 
     825             :     /* If relation has replica identity we are always good. */
     826      171422 :     if (OidIsValid(RelationGetReplicaIndex(rel)))
     827      149948 :         return;
     828             : 
     829             :     /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
     830       21474 :     if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     831         422 :         return;
     832             : 
     833             :     /*
     834             :      * This is UPDATE/DELETE and there is no replica identity.
     835             :      *
     836             :      * Check if the table publishes UPDATES or DELETES.
     837             :      */
     838       21052 :     if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
     839          96 :         ereport(ERROR,
     840             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     841             :                  errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
     842             :                         RelationGetRelationName(rel)),
     843             :                  errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
     844       20956 :     else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
     845           0 :         ereport(ERROR,
     846             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     847             :                  errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
     848             :                         RelationGetRelationName(rel)),
     849             :                  errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
     850             : }
     851             : 
     852             : 
     853             : /*
     854             :  * Check if we support writing into specific relkind.
     855             :  *
     856             :  * The nspname and relname are only needed for error reporting.
     857             :  */
     858             : void
     859        1618 : CheckSubscriptionRelkind(char relkind, const char *nspname,
     860             :                          const char *relname)
     861             : {
     862        1618 :     if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
     863           0 :         ereport(ERROR,
     864             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     865             :                  errmsg("cannot use relation \"%s.%s\" as logical replication target",
     866             :                         nspname, relname),
     867             :                  errdetail_relkind_not_supported(relkind)));
     868        1618 : }

Generated by: LCOV version 1.14