LCOV - code coverage report
Current view: top level - src/backend/executor - execReplication.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 233 269 86.6 %
Date: 2025-01-18 04:15:08 Functions: 13 13 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * execReplication.c
       4             :  *    miscellaneous executor routines for logical replication
       5             :  *
       6             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *    src/backend/executor/execReplication.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/genam.h"
      18             : #include "access/gist.h"
      19             : #include "access/relscan.h"
      20             : #include "access/tableam.h"
      21             : #include "access/transam.h"
      22             : #include "access/xact.h"
      23             : #include "catalog/pg_am_d.h"
      24             : #include "commands/trigger.h"
      25             : #include "executor/executor.h"
      26             : #include "executor/nodeModifyTable.h"
      27             : #include "replication/conflict.h"
      28             : #include "replication/logicalrelation.h"
      29             : #include "storage/lmgr.h"
      30             : #include "utils/builtins.h"
      31             : #include "utils/lsyscache.h"
      32             : #include "utils/rel.h"
      33             : #include "utils/snapmgr.h"
      34             : #include "utils/syscache.h"
      35             : #include "utils/typcache.h"
      36             : 
      37             : 
      38             : static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
      39             :                          TypeCacheEntry **eq);  /* forward declaration; defined later in this file */
      40             : 
      41             : /*
      42             :  * Returns the fixed strategy number, if any, of the equality operator for the
      43             :  * given operator class, or InvalidStrategy otherwise.
                     :  *
                     :  * The answer depends on the access method returned by get_opclass_method():
                     :  * btree and hash use their equality strategy constants, GiST obtains the
                     :  * number from GistTranslateStratnum() for COMPARE_EQ, and every other access
                     :  * method yields InvalidStrategy.
      44             :  */
      45             : StrategyNumber
      46      144304 : get_equal_strategy_number(Oid opclass)
      47             : {
      48      144304 :     Oid         am = get_opclass_method(opclass);
      49             :     int         ret;
      50             : 
      51      144304 :     switch (am)
      52             :     {
      53      144244 :         case BTREE_AM_OID:
      54      144244 :             ret = BTEqualStrategyNumber;
      55      144244 :             break;
      56          12 :         case HASH_AM_OID:
      57          12 :             ret = HTEqualStrategyNumber;
      58          12 :             break;
      59          48 :         case GIST_AM_OID:
      60          48 :             ret = GistTranslateStratnum(opclass, COMPARE_EQ);
      61          48 :             break;
      62           0 :         default:
      63           0 :             ret = InvalidStrategy;
      64           0 :             break;
      65             :     }
      66             : 
      67      144304 :     return ret;
      68             : }
      69             : 
      70             : /*
      71             :  * Set up a ScanKey for searching the index 'idxrel' for the tuple held in
      72             :  * 'searchslot', whose columns are laid out to match 'rel' (*NOT* idxrel!).
      73             :  *
      74             :  * Returns how many columns to use for the index scan.
      75             :  *
      76             :  * This is not a generic routine: idxrel must be the PK, RI, or an index that can
      77             :  * be used for a REPLICA IDENTITY FULL table. See FindUsableIndexForReplicaIdentityFull()
      78             :  * for details.
      79             :  *
      80             :  * By definition, replication identity of a rel meets all limitations associated
      81             :  * with that. Note that any other index could also meet these limitations.
                     :  *
                     :  * 'skey' is filled densely from element 0; callers in this file pass an
                     :  * array of INDEX_MAX_KEYS entries.  Index columns whose indkey entry is not
                     :  * a valid attribute number (expressions) get no scan key, so the return
                     :  * value can be smaller than the number of key attributes of the index.
                     :  * The 'rel' argument is not referenced in the body.
                     :  *
                     :  * Raises ERROR if an operator class has no equality strategy or the
                     :  * operator family has no matching equality operator.
      82             :  */
      83             : static int
      84      144194 : build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
      85             :                          TupleTableSlot *searchslot)
      86             : {
      87             :     int         index_attoff;
      88      144194 :     int         skey_attoff = 0;
      89             :     Datum       indclassDatum;
      90             :     oidvector  *opclass;
      91      144194 :     int2vector *indkey = &idxrel->rd_index->indkey;
      92             : 
      93      144194 :     indclassDatum = SysCacheGetAttrNotNull(INDEXRELID, idxrel->rd_indextuple,
      94             :                                            Anum_pg_index_indclass);
      95      144194 :     opclass = (oidvector *) DatumGetPointer(indclassDatum);
      96             : 
      97             :     /* Build a scankey for every non-expression key attribute in the index. */
      98      288434 :     for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
      99      144240 :          index_attoff++)
     100             :     {
     101             :         Oid         operator;
     102             :         Oid         optype;
     103             :         Oid         opfamily;
     104             :         RegProcedure regop;
     105      144240 :         int         table_attno = indkey->values[index_attoff];
     106             :         StrategyNumber eq_strategy;
     107             : 
     108      144240 :         if (!AttributeNumberIsValid(table_attno))
     109             :         {
     110             :             /*
     111             :              * XXX: Currently, we don't support expressions in the scan key,
     112             :              * so such index columns are skipped; see code below.
     113             :              */
     114           4 :             continue;
     115             :         }
     116             : 
     117             :         /*
     118             :          * Load the operator info.  We need this to get the equality operator
     119             :          * function for the scan key.
     120             :          */
     121      144236 :         optype = get_opclass_input_type(opclass->values[index_attoff]);
     122      144236 :         opfamily = get_opclass_family(opclass->values[index_attoff]);
     123      144236 :         eq_strategy = get_equal_strategy_number(opclass->values[index_attoff]);
     124      144236 :         if (!eq_strategy)
     125           0 :             elog(ERROR, "missing equal strategy for opclass %u", opclass->values[index_attoff]);
     126             : 
     127      144236 :         operator = get_opfamily_member(opfamily, optype,
     128             :                                        optype,
     129             :                                        eq_strategy);
     130             : 
     131      144236 :         if (!OidIsValid(operator))
     132           0 :             elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
     133             :                  eq_strategy, optype, optype, opfamily);
     134             : 
     135      144236 :         regop = get_opcode(operator);
     136             : 
     137             :         /*
                     :          * Initialize the scankey: the key column is the 1-based index
                     :          * column, the comparison value comes from the table column it maps to.
                     :          */
     138      144236 :         ScanKeyInit(&skey[skey_attoff],
     139      144236 :                     index_attoff + 1,
     140             :                     eq_strategy,
     141             :                     regop,
     142      144236 :                     searchslot->tts_values[table_attno - 1]);
     143             : 
     144      144236 :         skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
     145             : 
     146             :         /* A NULL search value is flagged as a NULL search on this key. */
     147      144236 :         if (searchslot->tts_isnull[table_attno - 1])
     148           2 :             skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
     149             : 
     150      144236 :         skey_attoff++;
     151             :     }
     152             : 
     153             :     /* There must always be at least one attribute for the index scan. */
     154             :     Assert(skey_attoff > 0);
     155             : 
     156      144194 :     return skey_attoff;
     157             : }
     158             : 
     159             : 
     160             : /*
     161             :  * Helper function to check if it is necessary to re-fetch and lock the tuple
     162             :  * due to concurrent modifications. This function should be called after
     163             :  * invoking table_tuple_lock.
                     :  *
                     :  * 'res' is the table_tuple_lock result and 'tmfd' its failure data; only
                     :  * tmfd->ctid is read, and only in the TM_Updated case.
                     :  *
                     :  * Returns false for TM_Ok.  For TM_Updated and TM_Deleted a LOG message is
                     :  * emitted and true is returned so that the caller retries.  TM_Invisible
                     :  * and any other status are reported with elog(ERROR).
     164             :  */
     165             : static bool
     166      144478 : should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
     167             : {
     168      144478 :     bool        refetch = false;
     169             : 
     170      144478 :     switch (res)
     171             :     {
     172      144478 :         case TM_Ok:
     173      144478 :             break;
     174           0 :         case TM_Updated:
     175             :             /* XXX: Improve handling here */
     176           0 :             if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
     177           0 :                 ereport(LOG,
     178             :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     179             :                          errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
     180             :             else
     181           0 :                 ereport(LOG,
     182             :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     183             :                          errmsg("concurrent update, retrying")));
     184           0 :             refetch = true;
     185           0 :             break;
     186           0 :         case TM_Deleted:
     187             :             /* XXX: Improve handling here */
     188           0 :             ereport(LOG,
     189             :                     (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     190             :                      errmsg("concurrent delete, retrying")));
     191           0 :             refetch = true;
     192           0 :             break;
     193           0 :         case TM_Invisible:
     194           0 :             elog(ERROR, "attempted to lock invisible tuple");
     195             :             break;
     196           0 :         default:
     197           0 :             elog(ERROR, "unexpected table_tuple_lock status: %u", res);
     198             :             break;
     199             :     }
     200             : 
     201      144478 :     return refetch;
     202             : }
     203             : 
     204             : /*
     205             :  * Search the relation 'rel' for a tuple matching 'searchslot', using the
                     :  * index identified by 'idxoid'.
     206             :  *
     207             :  * If a matching tuple is found, lock it with lockmode, fill 'outslot' with its
     208             :  * contents, and return true.  Return false otherwise.
                     :  *
                     :  * The scan uses a dirty snapshot.  After each candidate is fetched,
                     :  * snap.xmin/snap.xmax are inspected; if either is a valid transaction id the
                     :  * function waits for that transaction and restarts the scan.  The scan is
                     :  * also restarted when should_refetch_tuple() reports a concurrent update
                     :  * or delete after table_tuple_lock().
                     :  *
                     :  * The index is opened with RowExclusiveLock and closed with NoLock, so
                     :  * that lock is kept by this function.
     209             :  */
     210             : bool
     211      144194 : RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
     212             :                              LockTupleMode lockmode,
     213             :                              TupleTableSlot *searchslot,
     214             :                              TupleTableSlot *outslot)
     215             : {
     216             :     ScanKeyData skey[INDEX_MAX_KEYS];
     217             :     int         skey_attoff;
     218             :     IndexScanDesc scan;
     219             :     SnapshotData snap;
     220             :     TransactionId xwait;
     221             :     Relation    idxrel;
     222             :     bool        found;
     223      144194 :     TypeCacheEntry **eq = NULL;
     224             :     bool        isIdxSafeToSkipDuplicates;
     225             : 
     226             :     /* Open the index. */
     227      144194 :     idxrel = index_open(idxoid, RowExclusiveLock);
     228             : 
     229      144194 :     isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
     230             : 
     231      144194 :     InitDirtySnapshot(snap);
     232             : 
     233             :     /* Build scan key. */
     234      144194 :     skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
     235             : 
     236             :     /* Start an index scan. */
     237      144194 :     scan = index_beginscan(rel, idxrel, &snap, skey_attoff, 0);
     238             : 
     239      144194 : retry:
     240      144194 :     found = false;
     241             : 
     242      144194 :     index_rescan(scan, skey, skey_attoff, NULL, 0);
     243             : 
     244             :     /* Try to find the tuple */
     245      144194 :     while (index_getnext_slot(scan, ForwardScanDirection, outslot))
     246             :     {
     247             :         /*
     248             :          * Avoid expensive equality check if the index is primary key or
     249             :          * replica identity index.
                     :          *
                     :          * For any other index, compare the whole tuple and skip index
                     :          * matches that differ from 'searchslot'.  The 'eq' cache array is
                     :          * allocated on first use and kept across retries.
     250             :          */
     251      144170 :         if (!isIdxSafeToSkipDuplicates)
     252             :         {
     253          34 :             if (eq == NULL)
     254          34 :                 eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
     255             : 
     256          34 :             if (!tuples_equal(outslot, searchslot, eq))
     257           0 :                 continue;
     258             :         }
     259             : 
     260      144170 :         ExecMaterializeSlot(outslot);
     261             : 
     262      288340 :         xwait = TransactionIdIsValid(snap.xmin) ?
     263      144170 :             snap.xmin : snap.xmax;
     264             : 
     265             :         /*
     266             :          * If the tuple is locked, wait for locking transaction to finish and
     267             :          * retry.
     268             :          */
     269      144170 :         if (TransactionIdIsValid(xwait))
     270             :         {
     271           0 :             XactLockTableWait(xwait, NULL, NULL, XLTW_None);
     272           0 :             goto retry;
     273             :         }
     274             : 
     275             :         /* Found our tuple and it's not locked */
     276      144170 :         found = true;
     277      144170 :         break;
     278             :     }
     279             : 
     280             :     /* Found tuple, try to lock it in the lockmode. */
     281      144194 :     if (found)
     282             :     {
     283             :         TM_FailureData tmfd;
     284             :         TM_Result   res;
     285             : 
     286      144170 :         PushActiveSnapshot(GetLatestSnapshot());
     287             : 
     288      144170 :         res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
     289             :                                outslot,
     290             :                                GetCurrentCommandId(false),
     291             :                                lockmode,
     292             :                                LockWaitBlock,
     293             :                                0 /* don't follow updates */ ,
     294             :                                &tmfd);
     295             : 
     296      144170 :         PopActiveSnapshot();
     297             : 
     298      144170 :         if (should_refetch_tuple(res, &tmfd))
     299           0 :             goto retry;
     300             :     }
     301             : 
     302      144194 :     index_endscan(scan);
     303             : 
     304             :     /* Don't release lock until commit. */
     305      144194 :     index_close(idxrel, NoLock);
     306             : 
     307      144194 :     return found;
     308             : }
     309             : 
     310             : /*
     311             :  * Compare the tuples in the slots by checking if they have equal values.
                     :  *
                     :  * Both slots must have the same number of attributes (asserted); slot1's
                     :  * tuple descriptor drives the comparison.  Dropped and generated columns
                     :  * are skipped, two NULLs count as equal, and non-NULL values are compared
                     :  * with the type's equality function under the attribute's collation.
                     :  *
                     :  * 'eq' is a caller-provided array indexed by attribute offset that caches
                     :  * the type cache entry per column; NULL entries are filled in on first use.
                     :  *
                     :  * Returns true if every compared column is equal, false at the first
                     :  * difference.  Raises ERROR if a column's type has no equality operator.
     312             :  */
     313             : static bool
     314      210646 : tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
     315             :              TypeCacheEntry **eq)
     316             : {
     317             :     int         attrnum;
     318             : 
     319             :     Assert(slot1->tts_tupleDescriptor->natts ==
     320             :            slot2->tts_tupleDescriptor->natts);
     321             : 
     322      210646 :     slot_getallattrs(slot1);
     323      210646 :     slot_getallattrs(slot2);
     324             : 
     325             :     /* Check equality of the attributes. */
     326      211046 :     for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
     327             :     {
     328             :         Form_pg_attribute att;
     329             :         TypeCacheEntry *typentry;
     330             : 
     331      210718 :         att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
     332             : 
     333             :         /*
     334             :          * Ignore dropped and generated columns as the publisher doesn't send
     335             :          * those
     336             :          */
     337      210718 :         if (att->attisdropped || att->attgenerated)
     338           2 :             continue;
     339             : 
     340             :         /*
     341             :          * If one value is NULL and other is not, then they are certainly not
     342             :          * equal
     343             :          */
     344      210716 :         if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
     345           0 :             return false;
     346             : 
     347             :         /*
     348             :          * If both are NULL, they can be considered equal.  (The flags are
                     :          * known to match here, so testing either one is sufficient.)
     349             :          */
     350      210716 :         if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
     351           2 :             continue;
     352             : 
     353      210714 :         typentry = eq[attrnum];
     354      210714 :         if (typentry == NULL)
     355             :         {
     356         400 :             typentry = lookup_type_cache(att->atttypid,
     357             :                                          TYPECACHE_EQ_OPR_FINFO);
     358         400 :             if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
     359           0 :                 ereport(ERROR,
     360             :                         (errcode(ERRCODE_UNDEFINED_FUNCTION),
     361             :                          errmsg("could not identify an equality operator for type %s",
     362             :                                 format_type_be(att->atttypid))));
     363         400 :             eq[attrnum] = typentry;
     364             :         }
     365             : 
     366      210714 :         if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
     367             :                                             att->attcollation,
     368      210714 :                                             slot1->tts_values[attrnum],
     369      210714 :                                             slot2->tts_values[attrnum])))
     370      210318 :             return false;
     371             :     }
     372             : 
     373         328 :     return true;
     374             : }
     375             : 
     376             : /*
     377             :  * Search the relation 'rel' for a tuple equal to 'searchslot' using a
                     :  * sequential scan.
     378             :  *
     379             :  * If a matching tuple is found, lock it with lockmode, fill 'outslot' with its
     380             :  * contents, and return true.  Return false otherwise.
     381             :  *
     382             :  * Note that this stops on the first matching tuple.
     383             :  *
     384             :  * This can obviously be quite slow on tables that have more than a few rows.
                     :  *
                     :  * Candidates are compared with tuples_equal().  As in the index variant,
                     :  * the scan uses a dirty snapshot and is restarted after waiting for a
                     :  * transaction found in snap.xmin/snap.xmax, or when should_refetch_tuple()
                     :  * reports a concurrent update or delete.
     385             :  */
     386             : bool
     387         298 : RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
     388             :                          TupleTableSlot *searchslot, TupleTableSlot *outslot)
     389             : {
     390             :     TupleTableSlot *scanslot;
     391             :     TableScanDesc scan;
     392             :     SnapshotData snap;
     393             :     TypeCacheEntry **eq;
     394             :     TransactionId xwait;
     395             :     bool        found;
     396         298 :     TupleDesc   desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
     397             : 
     398             :     Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
     399             : 
     400         298 :     eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
     401             : 
     402             :     /* Start a table scan; tuples are read into a separate scan slot. */
     403         298 :     InitDirtySnapshot(snap);
     404         298 :     scan = table_beginscan(rel, &snap, 0, NULL);
     405         298 :     scanslot = table_slot_create(rel, NULL);
     406             : 
     407         298 : retry:
     408         298 :     found = false;
     409             : 
     410         298 :     table_rescan(scan, NULL);
     411             : 
     412             :     /* Try to find the tuple */
     413      210616 :     while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
     414             :     {
     415      210612 :         if (!tuples_equal(scanslot, searchslot, eq))
     416      210318 :             continue;
     417             : 
     418         294 :         found = true;
     419         294 :         ExecCopySlot(outslot, scanslot);
     420             : 
     421         588 :         xwait = TransactionIdIsValid(snap.xmin) ?
     422         294 :             snap.xmin : snap.xmax;
     423             : 
     424             :         /*
     425             :          * If the tuple is locked, wait for locking transaction to finish and
     426             :          * retry.  ('found' is reset at the retry label.)
     427             :          */
     428         294 :         if (TransactionIdIsValid(xwait))
     429             :         {
     430           0 :             XactLockTableWait(xwait, NULL, NULL, XLTW_None);
     431           0 :             goto retry;
     432             :         }
     433             : 
     434             :         /* Found our tuple and it's not locked */
     435         294 :         break;
     436             :     }
     437             : 
     438             :     /* Found tuple, try to lock it in the lockmode. */
     439         298 :     if (found)
     440             :     {
     441             :         TM_FailureData tmfd;
     442             :         TM_Result   res;
     443             : 
     444         294 :         PushActiveSnapshot(GetLatestSnapshot());
     445             : 
     446         294 :         res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
     447             :                                outslot,
     448             :                                GetCurrentCommandId(false),
     449             :                                lockmode,
     450             :                                LockWaitBlock,
     451             :                                0 /* don't follow updates */ ,
     452             :                                &tmfd);
     453             : 
     454         294 :         PopActiveSnapshot();
     455             : 
     456         294 :         if (should_refetch_tuple(res, &tmfd))
     457           0 :             goto retry;
     458             :     }
     459             : 
     460         298 :     table_endscan(scan);
     461         298 :     ExecDropSingleTupleTableSlot(scanslot);
     462             : 
     463         298 :     return found;
     464             : }
     465             : 
     466             : /*
     467             :  * Find the tuple that violates the passed unique index (conflictindex).
     468             :  *
     469             :  * If the conflicting tuple is found return true, otherwise false.
                     :  *
                     :  * On a true return, *conflictslot is a slot newly created for the relation
                     :  * and passed to table_tuple_lock() with LockTupleShare for the conflicting
                     :  * TID.  On a false return, *conflictslot is NULL.
     470             :  *
     471             :  * We lock the tuple to avoid getting it deleted before the caller can fetch
     472             :  * the required information. Note that if the tuple is deleted before a lock
     473             :  * is acquired, we will retry to find the conflicting tuple again.
                     :  *
                     :  * NOTE(review): when a retry finds a conflict again, a new slot is created
                     :  * without dropping the one from the previous pass; only the no-conflict
                     :  * branch drops it.  Confirm whether that is intended.
     474             :  */
     475             : static bool
     476          18 : FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate,
     477             :                   Oid conflictindex, TupleTableSlot *slot,
     478             :                   TupleTableSlot **conflictslot)
     479             : {
     480          18 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     481             :     ItemPointerData conflictTid;
     482             :     TM_FailureData tmfd;
     483             :     TM_Result   res;
     484             : 
     485          18 :     *conflictslot = NULL;
     486             : 
     487          18 : retry:
     488          18 :     if (ExecCheckIndexConstraints(resultRelInfo, slot, estate,
     489             :                                   &conflictTid, &slot->tts_tid,
     490          18 :                                   list_make1_oid(conflictindex)))
     491             :     {
     492           2 :         if (*conflictslot)
     493           0 :             ExecDropSingleTupleTableSlot(*conflictslot);
     494             : 
     495           2 :         *conflictslot = NULL;
     496           2 :         return false;
     497             :     }
     498             : 
     499          14 :     *conflictslot = table_slot_create(rel, NULL);
     500             : 
     501          14 :     PushActiveSnapshot(GetLatestSnapshot());
     502             : 
     503          14 :     res = table_tuple_lock(rel, &conflictTid, GetLatestSnapshot(),
     504             :                            *conflictslot,
     505             :                            GetCurrentCommandId(false),
     506             :                            LockTupleShare,
     507             :                            LockWaitBlock,
     508             :                            0 /* don't follow updates */ ,
     509             :                            &tmfd);
     510             : 
     511          14 :     PopActiveSnapshot();
     512             : 
     513          14 :     if (should_refetch_tuple(res, &tmfd))
     514           0 :         goto retry;
     515             : 
     516          14 :     return true;
     517             : }
     518             : 
     519             : /*
     520             :  * Check the unique indexes listed in 'recheckIndexes' for a conflict with the
     521             :  * tuple in 'remoteslot' and report it if found.
                     :  *
                     :  * The loop walks resultRelInfo->ri_onConflictArbiterIndexes and only
                     :  * considers entries that are also members of 'recheckIndexes'.  For each
                     :  * index where FindConflictTuple() finds a conflicting tuple, its
                     :  * transaction info is fetched and ReportApplyConflict() is called at ERROR
                     :  * level with the given conflict 'type'.
     522             :  */
     523             : static void
     524          18 : CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
     525             :                        ConflictType type, List *recheckIndexes,
     526             :                        TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
     527             : {
     528             :     /* Check all the unique indexes for a conflict */
     529          22 :     foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
     530             :     {
     531             :         TupleTableSlot *conflictslot;
     532             : 
     533          34 :         if (list_member_oid(recheckIndexes, uniqueidx) &&
     534          18 :             FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
     535             :                               &conflictslot))
     536             :         {
     537             :             RepOriginId origin;
     538             :             TimestampTz committs;
     539             :             TransactionId xmin;
     540             : 
     541          14 :             GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs);
     542          14 :             ReportApplyConflict(estate, resultRelInfo, ERROR, type,
     543             :                                 searchslot, conflictslot, remoteslot,
     544             :                                 uniqueidx, xmin, origin, committs);
     545             :         }
     546             :     }
     547           2 : }
     548             : 
     549             : /*
     550             :  * Insert tuple represented in the slot to the relation, update the indexes,
     551             :  * and execute any constraints and per-row triggers.
     552             :  *
     553             :  * Caller is responsible for opening the indexes.
     554             :  */
     555             : void
     556      152668 : ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
     557             :                          EState *estate, TupleTableSlot *slot)
     558             : {
     559      152668 :     bool        skip_tuple = false;
     560      152668 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     561             : 
     562             :     /* For now we support only tables. */
     563             :     Assert(rel->rd_rel->relkind == RELKIND_RELATION);
     564             : 
     565      152668 :     CheckCmdReplicaIdentity(rel, CMD_INSERT);
     566             : 
     567             :     /* BEFORE ROW INSERT Triggers */
     568      152668 :     if (resultRelInfo->ri_TrigDesc &&
     569          38 :         resultRelInfo->ri_TrigDesc->trig_insert_before_row)
     570             :     {
     571           6 :         if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
     572           2 :             skip_tuple = true;  /* "do nothing" */
     573             :     }
     574             : 
     575      152668 :     if (!skip_tuple)
     576             :     {
     577      152666 :         List       *recheckIndexes = NIL;
     578             :         List       *conflictindexes;
     579      152666 :         bool        conflict = false;
     580             : 
     581             :         /* Compute stored generated columns */
     582      152666 :         if (rel->rd_att->constr &&
     583       90920 :             rel->rd_att->constr->has_generated_stored)
     584           8 :             ExecComputeStoredGenerated(resultRelInfo, estate, slot,
     585             :                                        CMD_INSERT);
     586             : 
     587             :         /* Check the constraints of the tuple */
     588      152666 :         if (rel->rd_att->constr)
     589       90920 :             ExecConstraints(resultRelInfo, slot, estate);
     590      152666 :         if (rel->rd_rel->relispartition)
     591         120 :             ExecPartitionCheck(resultRelInfo, slot, estate, true);
     592             : 
     593             :         /* OK, store the tuple and create index entries for it */
     594      152666 :         simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
     595             : 
     596      152666 :         conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
     597             : 
     598      152666 :         if (resultRelInfo->ri_NumIndices > 0)
     599      111994 :             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
     600             :                                                    slot, estate, false,
     601             :                                                    conflictindexes ? true : false,
     602             :                                                    &conflict,
     603             :                                                    conflictindexes, false);
     604             : 
     605             :         /*
     606             :          * Checks the conflict indexes to fetch the conflicting local tuple
     607             :          * and reports the conflict. We perform this check here, instead of
     608             :          * performing an additional index scan before the actual insertion and
     609             :          * reporting the conflict if any conflicting tuples are found. This is
     610             :          * to avoid the overhead of executing the extra scan for each INSERT
     611             :          * operation, even when no conflict arises, which could introduce
     612             :          * significant overhead to replication, particularly in cases where
     613             :          * conflicts are rare.
     614             :          *
     615             :          * XXX OTOH, this could lead to clean-up effort for dead tuples added
     616             :          * in heap and index in case of conflicts. But since conflicts shouldn't
     617             :          * be frequent, we preferred to save the performance overhead of an
     618             :          * extra scan before each insertion.
     619             :          */
     620      152666 :         if (conflict)
     621          16 :             CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
     622             :                                    recheckIndexes, NULL, slot);
     623             : 
     624             :         /* AFTER ROW INSERT Triggers */
     625      152652 :         ExecARInsertTriggers(estate, resultRelInfo, slot,
     626             :                              recheckIndexes, NULL);
     627             : 
     628             :         /*
     629             :          * XXX we should in theory pass a TransitionCaptureState object to the
     630             :          * above to capture transition tuples, but after statement triggers
     631             :          * don't actually get fired by replication yet anyway
     632             :          */
     633             : 
     634      152652 :         list_free(recheckIndexes);
     635             :     }
     636      152654 : }
     637             : 
     638             : /*
     639             :  * Find the searchslot tuple and update it with data in the slot,
     640             :  * update the indexes, and execute any constraints and per-row triggers.
     641             :  *
     642             :  * Caller is responsible for opening the indexes.
                     :  *
                     :  * resultRelInfo supplies the target relation, its trigger descriptor and
                     :  * its index information.  The existing row is addressed by
                     :  * searchslot->tts_tid, and slot holds the replacement row.  epqstate is
                     :  * only handed on to ExecBRUpdateTriggers().
                     :  *
                     :  * Steps, in order: BEFORE ROW UPDATE triggers (a false result makes the
                     :  * whole update a no-op), stored generated columns, constraint and
                     :  * partition checks, the table update itself, index insertion (only when
                     :  * the relation has indexes and the table update did not report TU_None),
                     :  * conflict reporting as CT_UPDATE_EXISTS, and finally AFTER ROW UPDATE
                     :  * triggers.
     643             :  */
     644             : void
     645       63844 : ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
     646             :                          EState *estate, EPQState *epqstate,
     647             :                          TupleTableSlot *searchslot, TupleTableSlot *slot)
     648             : {
     649       63844 :     bool        skip_tuple = false;
     650       63844 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     651       63844 :     ItemPointer tid = &(searchslot->tts_tid);   /* address of the row being replaced */
     652             : 
     653             :     /*
     654             :      * We support only non-system tables, with
     655             :      * check_publication_add_relation() accountable.
     656             :      */
     657             :     Assert(rel->rd_rel->relkind == RELKIND_RELATION);
     658             :     Assert(!IsCatalogRelation(rel));
     659             : 
     660       63844 :     CheckCmdReplicaIdentity(rel, CMD_UPDATE);
     661             : 
     662             :     /* BEFORE ROW UPDATE Triggers; a false result means "skip this row" */
     663       63844 :     if (resultRelInfo->ri_TrigDesc &&
     664          20 :         resultRelInfo->ri_TrigDesc->trig_update_before_row)
     665             :     {
     666           6 :         if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
     667             :                                   tid, NULL, slot, NULL, NULL))
     668           4 :             skip_tuple = true;  /* "do nothing" */
     669             :     }
     670             : 
     671       63844 :     if (!skip_tuple)
     672             :     {
     673       63840 :         List       *recheckIndexes = NIL;
     674             :         TU_UpdateIndexes update_indexes;
     675             :         List       *conflictindexes;
     676       63840 :         bool        conflict = false;
     677             : 
     678             :         /* Compute stored generated columns */
     679       63840 :         if (rel->rd_att->constr &&
     680       63750 :             rel->rd_att->constr->has_generated_stored)
     681           4 :             ExecComputeStoredGenerated(resultRelInfo, estate, slot,
     682             :                                        CMD_UPDATE);
     683             : 
     684             :         /* Check the constraints of the tuple */
     685       63840 :         if (rel->rd_att->constr)
     686       63750 :             ExecConstraints(resultRelInfo, slot, estate);
     687       63840 :         if (rel->rd_rel->relispartition)
     688          24 :             ExecPartitionCheck(resultRelInfo, slot, estate, true);
     689             : 
     690       63840 :         simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
     691             :                                   &update_indexes); /* update_indexes is consulted below to decide on index insertion */
     692             : 
     693       63840 :         conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
     694             : 
     695       63840 :         if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
     696       40300 :             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
     697             :                                                    slot, estate, true,
     698             :                                                    conflictindexes ? true : false,
     699             :                                                    &conflict, conflictindexes,
     700             :                                                    (update_indexes == TU_Summarizing)); /* NOTE(review): presumably limits the work to summarizing indexes - confirm against ExecInsertIndexTuples() */
     701             : 
     702             :         /*
     703             :          * Refer to the comments above the call to CheckAndReportConflict() in
     704             :          * ExecSimpleRelationInsert to understand why this check is done at
     705             :          * this point.
     706             :          */
     707       63840 :         if (conflict)
     708           2 :             CheckAndReportConflict(resultRelInfo, estate, CT_UPDATE_EXISTS,
     709             :                                    recheckIndexes, searchslot, slot);
     710             : 
     711             :         /* AFTER ROW UPDATE Triggers */
     712       63838 :         ExecARUpdateTriggers(estate, resultRelInfo,
     713             :                              NULL, NULL,
     714             :                              tid, NULL, slot,
     715             :                              recheckIndexes, NULL, false);
     716             : 
     717       63838 :         list_free(recheckIndexes);
     718             :     }
     719       63842 : }
     720             : 
     721             : /*
     722             :  * Find the searchslot tuple and delete it, and execute any constraints
     723             :  * and per-row triggers.
     724             :  *
     725             :  * Caller is responsible for opening the indexes.
                     :  *
                     :  * The row to remove is addressed by searchslot->tts_tid.  When the
                     :  * relation has BEFORE ROW DELETE triggers and ExecBRDeleteTriggers()
                     :  * returns false, neither the delete nor the AFTER ROW DELETE triggers
                     :  * are executed.  epqstate is only handed on to ExecBRDeleteTriggers().
                     :  *
                     :  * Note that the body below makes no explicit constraint check; apart
                     :  * from CheckCmdReplicaIdentity(), only the per-row triggers are run
                     :  * around the delete itself.
     726             :  */
     727             : void
     728       80620 : ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
     729             :                          EState *estate, EPQState *epqstate,
     730             :                          TupleTableSlot *searchslot)
     731             : {
     732       80620 :     bool        skip_tuple = false;
     733       80620 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     734       80620 :     ItemPointer tid = &searchslot->tts_tid; /* address of the row being deleted */
     735             : 
     736       80620 :     CheckCmdReplicaIdentity(rel, CMD_DELETE);
     737             : 
     738             :     /* BEFORE ROW DELETE Triggers; a false result means "skip this row" */
     739       80620 :     if (resultRelInfo->ri_TrigDesc &&
     740          20 :         resultRelInfo->ri_TrigDesc->trig_delete_before_row)
     741             :     {
     742           0 :         skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
     743           0 :                                            tid, NULL, NULL, NULL, NULL);
     744             :     }
     745             : 
     746       80620 :     if (!skip_tuple)
     747             :     {
     748             :         /* OK, delete the tuple */
     749       80620 :         simple_table_tuple_delete(rel, tid, estate->es_snapshot);
     750             : 
     751             :         /* AFTER ROW DELETE Triggers */
     752       80620 :         ExecARDeleteTriggers(estate, resultRelInfo,
     753             :                              tid, NULL, NULL, false);
     754             :     }
     755       80620 : }
     756             : 
     757             : /*
     758             :  * Check if command can be executed with current replica identity.
                     :  *
                     :  * Returns without doing anything for partitioned tables and for commands
                     :  * other than UPDATE and DELETE.  Otherwise a publication descriptor is
                     :  * built for rel and an ERROR is raised when:
                     :  *  - the descriptor marks the row filter, the column list or the
                     :  *    generated columns as not valid for the command, checked in that
                     :  *    order (ERRCODE_INVALID_COLUMN_REFERENCE); or
                     :  *  - rel has neither a replica identity index nor REPLICA IDENTITY FULL
                     :  *    and the descriptor says the command's action is published
                     :  *    (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE).
                     :  * In every other case the function simply returns.
     759             :  */
     760             : void
     761      428504 : CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
     762             : {
     763             :     PublicationDesc pubdesc;    /* filled in by RelationBuildPublicationDesc() below */
     764             : 
     765             :     /*
     766             :      * Skip checking the replica identity for partitioned tables, because the
     767             :      * operations are actually performed on the leaf partitions.
     768             :      */
     769      428504 :     if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     770      407072 :         return;
     771             : 
     772             :     /* We only need to do checks for UPDATE and DELETE. */
     773      422506 :     if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
     774      250580 :         return;
     775             : 
     776             :     /*
     777             :      * It is only safe to execute UPDATE/DELETE if the relation does not
     778             :      * publish UPDATEs or DELETEs, or all the following conditions are
     779             :      * satisfied:
     780             :      *
     781             :      * 1. All columns, referenced in the row filters from publications which
     782             :      * the relation is in, are valid - i.e. when all referenced columns are
     783             :      * part of REPLICA IDENTITY.
     784             :      *
     785             :      * 2. All columns, referenced in the column lists are valid - i.e. when
     786             :      * all columns referenced in the REPLICA IDENTITY are covered by the
     787             :      * column list.
     788             :      *
     789             :      * 3. All generated columns in REPLICA IDENTITY of the relation, are valid
     790             :      * - i.e. when all these generated columns are published.
     791             :      *
     792             :      * XXX We could optimize it by first checking whether any of the
     793             :      * publications have a row filter or column list for this relation, or if
     794             :      * the relation contains a generated column. If none of these exist and
     795             :      * the relation has replica identity then we can avoid building the
     796             :      * descriptor but as this happens only one time it doesn't seem worth the
     797             :      * additional complexity.
                     :      *
                     :      * The chain below tests the three conditions in the order listed, first
                     :      * for UPDATE and then for DELETE, so only the first failing condition is
                     :      * reported.
     798             :      */
     799      171926 :     RelationBuildPublicationDesc(rel, &pubdesc);
     800      171926 :     if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
     801          60 :         ereport(ERROR,
     802             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     803             :                  errmsg("cannot update table \"%s\"",
     804             :                         RelationGetRelationName(rel)),
     805             :                  errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
     806      171866 :     else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
     807         108 :         ereport(ERROR,
     808             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     809             :                  errmsg("cannot update table \"%s\"",
     810             :                         RelationGetRelationName(rel)),
     811             :                  errdetail("Column list used by the publication does not cover the replica identity.")));
     812      171758 :     else if (cmd == CMD_UPDATE && !pubdesc.gencols_valid_for_update)
     813          12 :         ereport(ERROR,
     814             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     815             :                  errmsg("cannot update table \"%s\"",
     816             :                         RelationGetRelationName(rel)),
     817             :                  errdetail("Replica identity must not contain unpublished generated columns.")));
     818      171746 :     else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
     819           0 :         ereport(ERROR,
     820             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     821             :                  errmsg("cannot delete from table \"%s\"",
     822             :                         RelationGetRelationName(rel)),
     823             :                  errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
     824      171746 :     else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
     825           0 :         ereport(ERROR,
     826             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     827             :                  errmsg("cannot delete from table \"%s\"",
     828             :                         RelationGetRelationName(rel)),
     829             :                  errdetail("Column list used by the publication does not cover the replica identity.")));
     830      171746 :     else if (cmd == CMD_DELETE && !pubdesc.gencols_valid_for_delete)
     831           0 :         ereport(ERROR,
     832             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
     833             :                  errmsg("cannot delete from table \"%s\"",
     834             :                         RelationGetRelationName(rel)),
     835             :                  errdetail("Replica identity must not contain unpublished generated columns.")));
     836             : 
     837             :     /* If relation has replica identity we are always good. */
     838      171746 :     if (OidIsValid(RelationGetReplicaIndex(rel)))
     839      150044 :         return;
     840             : 
     841             :     /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
     842       21702 :     if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
     843         450 :         return;
     844             : 
     845             :     /*
     846             :      * This is UPDATE/DELETE and there is no replica identity.
     847             :      *
     848             :      * Check if the table publishes UPDATES or DELETES.
                     :      *
                     :      * If the matching action is not published, fall off the end of the
                     :      * function and allow the command.
     849             :      */
     850       21252 :     if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
     851         106 :         ereport(ERROR,
     852             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     853             :                  errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
     854             :                         RelationGetRelationName(rel)),
     855             :                  errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
     856       21146 :     else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
     857          10 :         ereport(ERROR,
     858             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     859             :                  errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
     860             :                         RelationGetRelationName(rel)),
     861             :                  errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
     862             : }
     863             : 
     864             : 
     865             : /*
     866             :  * Check if we support writing into specific relkind.
     867             :  *
     868             :  * The nspname and relname are only needed for error reporting.
                     :  *
                     :  * Only RELKIND_RELATION and RELKIND_PARTITIONED_TABLE are accepted; any
                     :  * other relkind raises an ERROR with ERRCODE_WRONG_OBJECT_TYPE whose
                     :  * message names the relation as "nspname.relname" and whose detail comes
                     :  * from errdetail_relkind_not_supported().
     869             :  */
     870             : void
     871        1682 : CheckSubscriptionRelkind(char relkind, const char *nspname,
     872             :                          const char *relname)
     873             : {
     874        1682 :     if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
     875           0 :         ereport(ERROR,
     876             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     877             :                  errmsg("cannot use relation \"%s.%s\" as logical replication target",
     878             :                         nspname, relname),
     879             :                  errdetail_relkind_not_supported(relkind)));
     880        1682 : }

Generated by: LCOV version 1.14