LCOV - code coverage report
Current view: top level - src/backend/executor - execReplication.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 293 336 87.2 %
Date: 2026-02-07 07:17:36 Functions: 16 16 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * execReplication.c
       4             :  *    miscellaneous executor routines for logical replication
       5             :  *
       6             :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *    src/backend/executor/execReplication.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/amapi.h"
      18             : #include "access/commit_ts.h"
      19             : #include "access/genam.h"
      20             : #include "access/gist.h"
      21             : #include "access/relscan.h"
      22             : #include "access/tableam.h"
      23             : #include "access/transam.h"
      24             : #include "access/xact.h"
      25             : #include "access/heapam.h"
      26             : #include "catalog/pg_am_d.h"
      27             : #include "commands/trigger.h"
      28             : #include "executor/executor.h"
      29             : #include "executor/nodeModifyTable.h"
      30             : #include "replication/conflict.h"
      31             : #include "replication/logicalrelation.h"
      32             : #include "storage/lmgr.h"
      33             : #include "utils/builtins.h"
      34             : #include "utils/lsyscache.h"
      35             : #include "utils/rel.h"
      36             : #include "utils/snapmgr.h"
      37             : #include "utils/syscache.h"
      38             : #include "utils/typcache.h"
      39             : 
      40             : 
      41             : static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
      42             :                          TypeCacheEntry **eq, Bitmapset *columns);
      43             : 
      44             : /*
      45             :  * Set up the scan keys in 'skey' for an equality search of index 'idxrel', taking
      46             :  * the key values from 'searchslot', which is laid out to match 'rel' (*NOT* idxrel!).
      47             :  *
      48             :  * Returns the number of scan keys built, i.e. how many columns to use for the index scan.
      49             :  *
      50             :  * This is not a generic routine: idxrel must be PK, RI, or an index that can be
      51             :  * used for a REPLICA IDENTITY FULL table. See FindUsableIndexForReplicaIdentityFull()
      52             :  * for details.
      53             :  *
      54             :  * By definition, the replica identity of a rel meets all limitations associated
      55             :  * with that. Note that any other index could also meet these limitations. ('rel' itself is not referenced in the function body.)
      56             :  */
      57             : static int
      58      144208 : build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
      59             :                          TupleTableSlot *searchslot)
      60             : {
      61             :     int         index_attoff;
      62      144208 :     int         skey_attoff = 0;    /* number of scan keys filled in so far */
      63             :     Datum       indclassDatum;
      64             :     oidvector  *opclass;
      65      144208 :     int2vector *indkey = &idxrel->rd_index->indkey;    /* table attnos of the index columns */
      66             : 
      67      144208 :     indclassDatum = SysCacheGetAttrNotNull(INDEXRELID, idxrel->rd_indextuple,
      68             :                                            Anum_pg_index_indclass);
      69      144208 :     opclass = (oidvector *) DatumGetPointer(indclassDatum);    /* one operator class per index column */
      70             : 
      71             :     /* Build scankey for every non-expression attribute in the index. */
      72      288462 :     for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
      73      144254 :          index_attoff++)
      74             :     {
      75             :         Oid         operator;
      76             :         Oid         optype;
      77             :         Oid         opfamily;
      78             :         RegProcedure regop;
      79      144254 :         int         table_attno = indkey->values[index_attoff];
      80             :         StrategyNumber eq_strategy;
      81             : 
      82      144254 :         if (!AttributeNumberIsValid(table_attno))
      83             :         {
      84             :             /*
      85             :              * An invalid attribute number marks an expression column. XXX: Currently,
      86             :              * we don't support expressions in the scan key, so no key is built for it.
      87             :              */
      88           4 :             continue;
      89             :         }
      90             : 
      91             :         /*
      92             :          * Load the operator info: the opclass's input type and operator family, the
      93             :          * index AM's strategy number for equality, and from those the equality operator for the scan key.
      94             :          */
      95      144250 :         optype = get_opclass_input_type(opclass->values[index_attoff]);
      96      144250 :         opfamily = get_opclass_family(opclass->values[index_attoff]);
      97      144250 :         eq_strategy = IndexAmTranslateCompareType(COMPARE_EQ, idxrel->rd_rel->relam, opfamily, false);
      98      144250 :         operator = get_opfamily_member(opfamily, optype,
      99             :                                        optype,
     100             :                                        eq_strategy);
     101             : 
     102      144250 :         if (!OidIsValid(operator))
     103           0 :             elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
     104             :                  eq_strategy, optype, optype, opfamily);
     105             : 
     106      144250 :         regop = get_opcode(operator);
     107             : 
     108             :         /* Initialize the scankey with the search value taken from the table-shaped slot. */
     109      144250 :         ScanKeyInit(&skey[skey_attoff],
     110      144250 :                     index_attoff + 1,    /* 0-based offset -> 1-based index column number */
     111             :                     eq_strategy,
     112             :                     regop,
     113      144250 :                     searchslot->tts_values[table_attno - 1]);    /* 1-based attno -> 0-based slot array */
     114             : 
     115      144250 :         skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
     116             : 
     117             :         /* If the search value is NULL, flag the key as a NULL search. */
     118      144250 :         if (searchslot->tts_isnull[table_attno - 1])
     119           2 :             skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
     120             : 
     121      144250 :         skey_attoff++;
     122             :     }
     123             : 
     124             :     /* There must always be at least one attribute for the index scan. */
     125             :     Assert(skey_attoff > 0);
     126             : 
     127      144208 :     return skey_attoff;
     128             : }
     129             : 
     130             : 
     131             : /*
     132             :  * Helper function to check if it is necessary to re-fetch and lock the tuple
     133             :  * due to concurrent modifications, i.e. whether the caller must retry its search. This function
     134             :  * should be called after invoking table_tuple_lock, with 'res' being its result and 'tmfd' the TM_FailureData passed to it.
     135             :  */
     136             : static bool
     137      144554 : should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
     138             : {
     139      144554 :     bool        refetch = false;
     140             : 
     141      144554 :     switch (res)
     142             :     {
     143      144554 :         case TM_Ok:
     144      144554 :             break;    /* no refetch needed */
     145           0 :         case TM_Updated:
     146             :             /* XXX: Improve handling here; for now, log the reason and ask the caller to retry */
     147           0 :             if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
     148           0 :                 ereport(LOG,
     149             :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     150             :                          errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
     151             :             else
     152           0 :                 ereport(LOG,
     153             :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     154             :                          errmsg("concurrent update, retrying")));
     155           0 :             refetch = true;
     156           0 :             break;
     157           0 :         case TM_Deleted:
     158             :             /* XXX: Improve handling here; for now, log the reason and ask the caller to retry */
     159           0 :             ereport(LOG,
     160             :                     (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     161             :                      errmsg("concurrent delete, retrying")));
     162           0 :             refetch = true;
     163           0 :             break;
     164           0 :         case TM_Invisible:
     165           0 :             elog(ERROR, "attempted to lock invisible tuple");
     166             :             break;
     167           0 :         default:    /* any other status is reported as an error */
     168           0 :             elog(ERROR, "unexpected table_tuple_lock status: %u", res);
     169             :             break;
     170             :     }
     171             : 
     172      144554 :     return refetch;    /* true only for TM_Updated and TM_Deleted */
     173             : }
     174             : 
     175             : /*
     176             :  * Search the relation 'rel' for a tuple matching 'searchslot', using the index 'idxoid'.
     177             :  *
     178             :  * If a matching tuple is found, lock it with lockmode, fill 'outslot' with its
     179             :  * contents, and return true.  Return false otherwise.
     180             :  */
     181             : bool
     182      144206 : RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
     183             :                              LockTupleMode lockmode,
     184             :                              TupleTableSlot *searchslot,
     185             :                              TupleTableSlot *outslot)
     186             : {
     187             :     ScanKeyData skey[INDEX_MAX_KEYS];
     188             :     int         skey_attoff;
     189             :     IndexScanDesc scan;
     190             :     SnapshotData snap;
     191             :     TransactionId xwait;
     192             :     Relation    idxrel;
     193             :     bool        found;
     194      144206 :     TypeCacheEntry **eq = NULL;    /* cache for tuples_equal(), allocated on first use */
     195             :     bool        isIdxSafeToSkipDuplicates;
     196             : 
     197             :     /* Open the index. */
     198      144206 :     idxrel = index_open(idxoid, RowExclusiveLock);
     199             : 
     200      144206 :     isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
     201             : 
     202      144206 :     InitDirtySnapshot(snap);    /* snap.xmin/xmax are examined after each fetch below */
     203             : 
     204             :     /* Build scan key. */
     205      144206 :     skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
     206             : 
     207             :     /* Start an index scan. */
     208      144206 :     scan = index_beginscan(rel, idxrel, &snap, NULL, skey_attoff, 0);
     209             : 
     210           0 : retry:    /* (re)start the search: reset 'found' and rescan with the same keys */
     211      144206 :     found = false;
     212             : 
     213      144206 :     index_rescan(scan, skey, skey_attoff, NULL, 0);
     214             : 
     215             :     /* Try to find the tuple */
     216      144206 :     while (index_getnext_slot(scan, ForwardScanDirection, outslot))
     217             :     {
     218             :         /*
     219             :          * Avoid expensive equality check if the index is primary key or
     220             :          * replica identity index. For any other index, compare all columns and skip index matches that differ.
     221             :          */
     222      144178 :         if (!isIdxSafeToSkipDuplicates)
     223             :         {
     224          34 :             if (eq == NULL)
     225          34 :                 eq = palloc0_array(TypeCacheEntry *, outslot->tts_tupleDescriptor->natts);
     226             : 
     227          34 :             if (!tuples_equal(outslot, searchslot, eq, NULL))
     228           0 :                 continue;
     229             :         }
     230             : 
     231      144178 :         ExecMaterializeSlot(outslot);
     232             : 
     233      288356 :         xwait = TransactionIdIsValid(snap.xmin) ?
     234      144178 :             snap.xmin : snap.xmax;
     235             : 
     236             :         /*
     237             :          * If the tuple is locked, wait for locking transaction to finish and
     238             :          * retry the whole search from the start.
     239             :          */
     240      144178 :         if (TransactionIdIsValid(xwait))
     241             :         {
     242           0 :             XactLockTableWait(xwait, NULL, NULL, XLTW_None);
     243           0 :             goto retry;
     244             :         }
     245             : 
     246             :         /* Found our tuple and it's not locked */
     247      144178 :         found = true;
     248      144178 :         break;
     249             :     }
     250             : 
     251             :     /* Found tuple, try to lock it in the lockmode. */
     252      144206 :     if (found)
     253             :     {
     254             :         TM_FailureData tmfd;
     255             :         TM_Result   res;
     256             : 
     257      144178 :         PushActiveSnapshot(GetLatestSnapshot());    /* popped again right after the lock call */
     258             : 
     259      144178 :         res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
     260             :                                outslot,
     261             :                                GetCurrentCommandId(false),
     262             :                                lockmode,
     263             :                                LockWaitBlock,
     264             :                                0 /* don't follow updates */ ,
     265             :                                &tmfd);
     266             : 
     267      144178 :         PopActiveSnapshot();
     268             : 
     269      144178 :         if (should_refetch_tuple(res, &tmfd))
     270           0 :             goto retry;    /* tuple was concurrently updated or deleted: search again */
     271             :     }
     272             : 
     273      144206 :     index_endscan(scan);
     274             : 
     275             :     /* Close the index with NoLock: don't release lock until commit. */
     276      144206 :     index_close(idxrel, NoLock);
     277             : 
     278      144206 :     return found;
     279             : }
     280             : 
     281             : /*
     282             :  * Compare the tuples in the slots by checking if they have equal values. Two NULLs in the same column count as equal.
     283             :  *
     284             :  * If 'columns' is not null, only the columns specified within it (as attnum - FirstLowInvalidHeapAttributeNumber) will be
     285             :  * considered for the equality check, ignoring all other columns. 'eq' caches one type cache entry per attribute; NULL entries are filled in on first use.
     286             :  */
     287             : static bool
     288      210648 : tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
     289             :              TypeCacheEntry **eq, Bitmapset *columns)
     290             : {
     291             :     int         attrnum;
     292             : 
     293             :     Assert(slot1->tts_tupleDescriptor->natts ==
     294             :            slot2->tts_tupleDescriptor->natts);
     295             : 
     296      210648 :     slot_getallattrs(slot1);
     297      210648 :     slot_getallattrs(slot2);
     298             : 
     299             :     /* Check equality of the attributes; the first difference ends the comparison. */
     300      211052 :     for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
     301             :     {
     302             :         Form_pg_attribute att;
     303             :         TypeCacheEntry *typentry;
     304             : 
     305      210724 :         att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
     306             : 
     307             :         /*
     308             :          * Ignore dropped and generated columns as the publisher doesn't send
     309             :          * those
     310             :          */
     311      210724 :         if (att->attisdropped || att->attgenerated)
     312           2 :             continue;
     313             : 
     314             :         /*
     315             :          * Ignore columns that are not listed for checking.
     316             :          */
     317      210722 :         if (columns &&
     318           0 :             !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
     319             :                            columns))
     320           0 :             continue;
     321             : 
     322             :         /*
     323             :          * If one value is NULL and other is not, then they are certainly not
     324             :          * equal
     325             :          */
     326      210722 :         if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
     327           0 :             return false;
     328             : 
     329             :         /*
     330             :          * If both are NULL, they can be considered equal. (The null flags match here, so testing either one suffices.)
     331             :          */
     332      210722 :         if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
     333           2 :             continue;
     334             : 
     335      210720 :         typentry = eq[attrnum];
     336      210720 :         if (typentry == NULL)    /* not cached yet: look up the type's equality function */
     337             :         {
     338         404 :             typentry = lookup_type_cache(att->atttypid,
     339             :                                          TYPECACHE_EQ_OPR_FINFO);
     340         404 :             if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
     341           0 :                 ereport(ERROR,
     342             :                         (errcode(ERRCODE_UNDEFINED_FUNCTION),
     343             :                          errmsg("could not identify an equality operator for type %s",
     344             :                                 format_type_be(att->atttypid))));
     345         404 :             eq[attrnum] = typentry;    /* remember it for later calls with the same 'eq' */
     346             :         }
     347             : 
     348      210720 :         if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
     349             :                                             att->attcollation,
     350      210720 :                                             slot1->tts_values[attrnum],
     351      210720 :                                             slot2->tts_values[attrnum])))
     352      210320 :             return false;
     353             :     }
     354             : 
     355         328 :     return true;    /* no compared column differed */
     356             : }
     357             : 
     358             : /*
     359             :  * Search the relation 'rel' for a tuple equal to 'searchslot' using a sequential scan.
     360             :  *
     361             :  * If a matching tuple is found, lock it with lockmode, fill 'outslot' with its
     362             :  * contents, and return true.  Return false otherwise.
     363             :  *
     364             :  * Note that this stops on the first matching tuple.
     365             :  *
     366             :  * This can obviously be quite slow on tables that have more than a few rows.
     367             :  */
     368             : bool
     369         298 : RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
     370             :                          TupleTableSlot *searchslot, TupleTableSlot *outslot)
     371             : {
     372             :     TupleTableSlot *scanslot;
     373             :     TableScanDesc scan;
     374             :     SnapshotData snap;
     375             :     TypeCacheEntry **eq;
     376             :     TransactionId xwait;
     377             :     bool        found;
     378         298 :     TupleDesc   desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
     379             : 
     380             :     Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
     381             : 
     382         298 :     eq = palloc0_array(TypeCacheEntry *, outslot->tts_tupleDescriptor->natts);    /* cache for tuples_equal() */
     383             : 
     384             :     /* Start a heap scan; rows are fetched into a scratch slot created for 'rel'. */
     385         298 :     InitDirtySnapshot(snap);
     386         298 :     scan = table_beginscan(rel, &snap, 0, NULL);
     387         298 :     scanslot = table_slot_create(rel, NULL);
     388             : 
     389           0 : retry:    /* (re)start the search: reset 'found' and rescan from the beginning */
     390         298 :     found = false;
     391             : 
     392         298 :     table_rescan(scan, NULL);
     393             : 
     394             :     /* Try to find the tuple */
     395      210616 :     while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
     396             :     {
     397      210608 :         if (!tuples_equal(scanslot, searchslot, eq, NULL))
     398      210318 :             continue;
     399             : 
     400         290 :         found = true;    /* reset at 'retry' if we have to wait below */
     401         290 :         ExecCopySlot(outslot, scanslot);
     402             : 
     403         580 :         xwait = TransactionIdIsValid(snap.xmin) ?
     404         290 :             snap.xmin : snap.xmax;
     405             : 
     406             :         /*
     407             :          * If the tuple is locked, wait for locking transaction to finish and
     408             :          * retry the whole search from the start.
     409             :          */
     410         290 :         if (TransactionIdIsValid(xwait))
     411             :         {
     412           0 :             XactLockTableWait(xwait, NULL, NULL, XLTW_None);
     413           0 :             goto retry;
     414             :         }
     415             : 
     416             :         /* Found our tuple and it's not locked */
     417         290 :         break;
     418             :     }
     419             : 
     420             :     /* Found tuple, try to lock it in the lockmode. */
     421         298 :     if (found)
     422             :     {
     423             :         TM_FailureData tmfd;
     424             :         TM_Result   res;
     425             : 
     426         290 :         PushActiveSnapshot(GetLatestSnapshot());    /* popped again right after the lock call */
     427             : 
     428         290 :         res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
     429             :                                outslot,
     430             :                                GetCurrentCommandId(false),
     431             :                                lockmode,
     432             :                                LockWaitBlock,
     433             :                                0 /* don't follow updates */ ,
     434             :                                &tmfd);
     435             : 
     436         290 :         PopActiveSnapshot();
     437             : 
     438         290 :         if (should_refetch_tuple(res, &tmfd))
     439           0 :             goto retry;    /* tuple was concurrently updated or deleted: search again */
     440             :     }
     441             : 
     442         298 :     table_endscan(scan);
     443         298 :     ExecDropSingleTupleTableSlot(scanslot);
     444             : 
     445         298 :     return found;
     446             : }
     447             : 
     448             : /*
     449             :  * Build additional index information necessary for conflict detection: call BuildSpeculativeIndexInfo() for each index of 'resultRelInfo' whose OID is 'conflictindex'.
     450             :  */
     451             : static void
     452          92 : BuildConflictIndexInfo(ResultRelInfo *resultRelInfo, Oid conflictindex)
     453             : {
     454         268 :     for (int i = 0; i < resultRelInfo->ri_NumIndices; i++)
     455             :     {
     456         176 :         Relation    indexRelation = resultRelInfo->ri_IndexRelationDescs[i];
     457         176 :         IndexInfo  *indexRelationInfo = resultRelInfo->ri_IndexRelationInfo[i];
     458             : 
     459         176 :         if (conflictindex != RelationGetRelid(indexRelation))
     460          84 :             continue;    /* not the index we were asked about */
     461             : 
     462             :         /*
     463             :          * This Assert will fail if BuildSpeculativeIndexInfo() is called
     464             :          * twice for the given index.
     465             :          */
     466             :         Assert(indexRelationInfo->ii_UniqueOps == NULL);
     467             : 
     468          92 :         BuildSpeculativeIndexInfo(indexRelation, indexRelationInfo);
     469             :     }
     470          92 : }
     471             : 
     472             : /*
     473             :  * If the tuple in 'scanslot' is recently dead and was deleted by a transaction with a newer
     474             :  * commit timestamp than previously recorded, update the associated transaction
     475             :  * ID, commit time, and origin. This helps ensure that conflict detection uses
     476             :  * the most recent and relevant deletion metadata. Otherwise '*delete_xid', '*delete_time' and '*delete_origin' are left untouched.
     477             :  */
     478             : static void
     479           6 : update_most_recent_deletion_info(TupleTableSlot *scanslot,
     480             :                                  TransactionId oldestxmin,
     481             :                                  TransactionId *delete_xid,
     482             :                                  TimestampTz *delete_time,
     483             :                                  ReplOriginId *delete_origin)
     484             : {
     485             :     BufferHeapTupleTableSlot *hslot;
     486             :     HeapTuple   tuple;
     487             :     Buffer      buf;
     488           6 :     bool        recently_dead = false;
     489             :     TransactionId xmax;
     490             :     TimestampTz localts;
     491             :     ReplOriginId localorigin;
     492             : 
     493           6 :     hslot = (BufferHeapTupleTableSlot *) scanslot;    /* NOTE(review): assumes a buffer heap tuple slot -- confirm callers */
     494             : 
     495           6 :     tuple = ExecFetchSlotHeapTuple(scanslot, false, NULL);
     496           6 :     buf = hslot->buffer;
     497             : 
     498           6 :     LockBuffer(buf, BUFFER_LOCK_SHARE);    /* held only around the status check below */
     499             : 
     500             :     /*
     501             :      * We do not consider HEAPTUPLE_DEAD status because it indicates either
     502             :      * tuples whose inserting transaction was aborted (meaning there is no
     503             :      * commit timestamp or origin), or tuples deleted by a transaction older
     504             :      * than oldestxmin, making it safe to ignore them during conflict
     505             :      * detection (See comments atop worker.c for details).
     506             :      */
     507           6 :     if (HeapTupleSatisfiesVacuum(tuple, oldestxmin, buf) == HEAPTUPLE_RECENTLY_DEAD)
     508           6 :         recently_dead = true;
     509             : 
     510           6 :     LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     511             : 
     512           6 :     if (!recently_dead)
     513           0 :         return;    /* any other status: nothing to record */
     514             : 
     515           6 :     xmax = HeapTupleHeaderGetUpdateXid(tuple->t_data);
     516           6 :     if (!TransactionIdIsValid(xmax))
     517           0 :         return;    /* no valid update xid: nothing to record */
     518             : 
     519             :     /* Select the dead tuple with the most recent commit timestamp; skip it if no commit timestamp data is available */
     520          12 :     if (TransactionIdGetCommitTsData(xmax, &localts, &localorigin) &&
     521           6 :         TimestampDifferenceExceeds(*delete_time, localts, 0))
     522             :     {
     523           6 :         *delete_xid = xmax;
     524           6 :         *delete_time = localts;
     525           6 :         *delete_origin = localorigin;
     526             :     }
     527             : }
     528             : 
     529             : /*
     530             :  * Searches the relation 'rel' for the most recently deleted tuple that matches
     531             :  * the values in 'searchslot' and is not yet removable by VACUUM. The function
     532             :  * returns the transaction ID, origin, and commit timestamp of the transaction
     533             :  * that deleted this tuple.
     534             :  *
     535             :  * 'oldestxmin' acts as a cutoff transaction ID. Tuples deleted by transactions
     536             :  * with IDs >= 'oldestxmin' are considered recently dead and are eligible for
     537             :  * conflict detection.
     538             :  *
     539             :  * Instead of stopping at the first match, we scan all matching dead tuples to
     540             :  * identify the most recent deletion. This is crucial because only the latest
     541             :  * deletion is relevant for resolving conflicts.
     542             :  *
     543             :  * For example, consider a scenario on the subscriber where a row is deleted,
     544             :  * re-inserted, and then deleted again only on the subscriber:
     545             :  *
     546             :  *   - (pk, 1) - deleted at 9:00,
     547             :  *   - (pk, 1) - deleted at 9:02,
     548             :  *
     549             :  * Now, a remote update arrives: (pk, 1) -> (pk, 2), timestamped at 9:01.
     550             :  *
     551             :  * If we mistakenly return the older deletion (9:00), the system may wrongly
     552             :  * apply the remote update using a last-update-wins strategy. Instead, we must
     553             :  * recognize the more recent deletion at 9:02 and skip the update. See
     554             :  * comments atop worker.c for details. Note, as of now, conflict resolution
     555             :  * is not implemented. Consequently, the system may incorrectly report the
     556             :  * older tuple as the conflicted one, leading to misleading results.
     557             :  *
     558             :  * The commit timestamp of the deleting transaction is used to determine which
     559             :  * tuple was deleted most recently.
     560             :  */
     561             : bool
     562           4 : RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot,
     563             :                                 TransactionId oldestxmin,
     564             :                                 TransactionId *delete_xid,
     565             :                                 ReplOriginId *delete_origin,
     566             :                                 TimestampTz *delete_time)
     567             : {
     568             :     TupleTableSlot *scanslot;
     569             :     TableScanDesc scan;
     570             :     TypeCacheEntry **eq;
     571             :     Bitmapset  *indexbitmap;
     572           4 :     TupleDesc   desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
     573             : 
     574             :     Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
     575             : 
     576           4 :     *delete_xid = InvalidTransactionId;
     577           4 :     *delete_origin = InvalidReplOriginId;
     578           4 :     *delete_time = 0;   /* a still-zero value at return means "not found" */
     579             : 
     580             :     /*
     581             :      * If the relation has a replica identity key or a primary key that is
     582             :      * unusable for locating deleted tuples (see
     583             :      * IsIndexUsableForFindingDeletedTuple), a full table scan becomes
     584             :      * necessary. In such cases, comparing the entire tuple is not required,
     585             :      * since the remote tuple might not include all column values. Instead,
     586             :      * the indexed columns alone are sufficient to identify the target tuple
     587             :      * (see logicalrep_rel_mark_updatable).
     588             :      */
     589           4 :     indexbitmap = RelationGetIndexAttrBitmap(rel,
     590             :                                              INDEX_ATTR_BITMAP_IDENTITY_KEY);
     591             : 
     592             :     /* Fall back to the primary key columns if there is no identity key. */
     593           4 :     if (!indexbitmap)
     594           4 :         indexbitmap = RelationGetIndexAttrBitmap(rel,
     595             :                                                  INDEX_ATTR_BITMAP_PRIMARY_KEY);
     596             : 
     597           4 :     eq = palloc0_array(TypeCacheEntry *, searchslot->tts_tupleDescriptor->natts);
     598             : 
     599             :     /*
     600             :      * Start a sequential table scan using SnapshotAny to identify dead tuples
     601             :      * that are not visible under a standard MVCC snapshot. Tuples from
     602             :      * transactions not yet committed or those just committed prior to the
     603             :      * scan are excluded in update_most_recent_deletion_info().
     604             :      */
     605           4 :     scan = table_beginscan(rel, SnapshotAny, 0, NULL);
     606           4 :     scanslot = table_slot_create(rel, NULL);
     607             : 
     608           4 :     table_rescan(scan, NULL);
     609             : 
     610             :     /* Visit every matching tuple; do not stop at the first match. */
     611          10 :     while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
     612             :     {
     613           6 :         if (!tuples_equal(scanslot, searchslot, eq, indexbitmap))
     614           2 :             continue;
     615             : 
     616           4 :         update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid,
     617             :                                          delete_time, delete_origin);
     618             :     }
     619             : 
     620           4 :     table_endscan(scan);
     621           4 :     ExecDropSingleTupleTableSlot(scanslot);
     622             : 
     623           4 :     return *delete_time != 0;
     624             : }
     625             : 
     626             : /*
     627             :  * Same contract as RelationFindDeletedTupleInfoSeq(), but candidate tuples are
     628             :  * located with an index scan on 'idxoid' instead of a full table scan.
     629             :  */
     630             : bool
     631           2 : RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid,
     632             :                                     TupleTableSlot *searchslot,
     633             :                                     TransactionId oldestxmin,
     634             :                                     TransactionId *delete_xid,
     635             :                                     ReplOriginId *delete_origin,
     636             :                                     TimestampTz *delete_time)
     637             : {
     638             :     Relation    idxrel;
     639             :     ScanKeyData skey[INDEX_MAX_KEYS];
     640             :     int         skey_attoff;
     641             :     IndexScanDesc scan;
     642             :     TupleTableSlot *scanslot;
     643           2 :     TypeCacheEntry **eq = NULL;
     644             :     bool        isIdxSafeToSkipDuplicates;
     645           2 :     TupleDesc   desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
     646             : 
     647             :     Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
     648             :     Assert(OidIsValid(idxoid));
     649             : 
     650           2 :     *delete_xid = InvalidTransactionId;
     651           2 :     *delete_time = 0;
     652           2 :     *delete_origin = InvalidReplOriginId;
     653             : 
     654           2 :     isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
     655             : 
     656           2 :     scanslot = table_slot_create(rel, NULL);
     657             : 
     658           2 :     idxrel = index_open(idxoid, RowExclusiveLock);
     659             : 
     660             :     /* Build the index scan key from the values in searchslot. */
     661           2 :     skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
     662             : 
     663             :     /*
     664             :      * Start an index scan using SnapshotAny to identify dead tuples that are
     665             :      * not visible under a standard MVCC snapshot. Tuples from transactions
     666             :      * not yet committed or those just committed prior to the scan are
     667             :      * excluded in update_most_recent_deletion_info().
     668             :      */
     669           2 :     scan = index_beginscan(rel, idxrel, SnapshotAny, NULL, skey_attoff, 0);
     670             : 
     671           2 :     index_rescan(scan, skey, skey_attoff, NULL, 0);
     672             : 
     673             :     /* Visit every tuple returned by the scan; do not stop at the first. */
     674           4 :     while (index_getnext_slot(scan, ForwardScanDirection, scanslot))
     675             :     {
     676             :         /*
     677             :          * Skip the expensive equality check if the index is the replica
     678             :          * identity or primary key index; eq is allocated on first use only.
     679             :          */
     680           2 :         if (!isIdxSafeToSkipDuplicates)
     681             :         {
     682           0 :             if (eq == NULL)
     683           0 :                 eq = palloc0_array(TypeCacheEntry *, scanslot->tts_tupleDescriptor->natts);
     684             : 
     685           0 :             if (!tuples_equal(scanslot, searchslot, eq, NULL))
     686           0 :                 continue;
     687             :         }
     688             : 
     689           2 :         update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid,
     690             :                                          delete_time, delete_origin);
     691             :     }
     692             : 
     693           2 :     index_endscan(scan);
     694             : 
     695           2 :     index_close(idxrel, NoLock);
     696             : 
     697           2 :     ExecDropSingleTupleTableSlot(scanslot);
     698             : 
     699           2 :     return *delete_time != 0;
     700             : }
     701             : 
     702             : /*
     703             :  * Find the tuple that violates the passed unique index (conflictindex).
     704             :  *
     705             :  * Returns true with *conflictslot set if one is found, else false with NULL.
     706             :  *
     707             :  * We lock the tuple to avoid getting it deleted before the caller can fetch
     708             :  * the required information. Note that if should_refetch_tuple() reports that
     709             :  * the lock attempt must be redone, we retry the search from the start.
     710             :  */
     711             : static bool
     712          92 : FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate,
     713             :                   Oid conflictindex, TupleTableSlot *slot,
     714             :                   TupleTableSlot **conflictslot)
     715             : {
     716          92 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     717             :     ItemPointerData conflictTid;
     718             :     TM_FailureData tmfd;
     719             :     TM_Result   res;
     720             : 
     721          92 :     *conflictslot = NULL;
     722             : 
     723             :     /*
     724             :      * Build additional information required to check constraint violations.
     725             :      * See check_exclusion_or_unique_constraint().
     726             :      */
     727          92 :     BuildConflictIndexInfo(resultRelInfo, conflictindex);
     728             : 
     729          92 : retry:
     730          88 :     if (ExecCheckIndexConstraints(resultRelInfo, slot, estate,
     731          92 :                                   &conflictTid, &slot->tts_tid,
     732          92 :                                   list_make1_oid(conflictindex)))
     733             :     {
     734           2 :         if (*conflictslot)
     735           0 :             ExecDropSingleTupleTableSlot(*conflictslot);
     736             : 
     737           2 :         *conflictslot = NULL;
     738           2 :         return false;
     739             :     }
     740             : 
     741          86 :     *conflictslot = table_slot_create(rel, NULL);
     742             : 
     743          86 :     PushActiveSnapshot(GetLatestSnapshot());
     744             : 
     745          86 :     res = table_tuple_lock(rel, &conflictTid, GetActiveSnapshot(),
     746             :                            *conflictslot,
     747             :                            GetCurrentCommandId(false),
     748             :                            LockTupleShare,
     749             :                            LockWaitBlock,
     750             :                            0 /* don't follow updates */ ,
     751             :                            &tmfd);
     752             : 
     753          86 :     PopActiveSnapshot();
     754             : 
     755          86 :     if (should_refetch_tuple(res, &tmfd))
     756           0 :         goto retry;
     757             : 
     758          86 :     return true;
     759             : }
     760             : 
     761             : /*
     762             :  * Check each arbiter index that is also listed in 'recheckIndexes' for a
     763             :  * conflict with the tuple in 'remoteslot', and report at ERROR level if found.
     764             :  */
     765             : static void
     766          54 : CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
     767             :                        ConflictType type, List *recheckIndexes,
     768             :                        TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
     769             : {
     770          54 :     List       *conflicttuples = NIL;
     771             :     TupleTableSlot *conflictslot;
     772             : 
     773             :     /* Collect one ConflictTupleInfo per index that has a conflicting tuple */
     774         192 :     foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
     775             :     {
     776         180 :         if (list_member_oid(recheckIndexes, uniqueidx) &&
     777          92 :             FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
     778             :                               &conflictslot))
     779             :         {
     780          86 :             ConflictTupleInfo *conflicttuple = palloc0_object(ConflictTupleInfo);
     781             : 
     782          86 :             conflicttuple->slot = conflictslot;
     783          86 :             conflicttuple->indexoid = uniqueidx;
     784             : 
     785          86 :             GetTupleTransactionInfo(conflictslot, &conflicttuple->xmin,
     786             :                                     &conflicttuple->origin, &conflicttuple->ts);
     787             : 
     788          86 :             conflicttuples = lappend(conflicttuples, conflicttuple);
     789             :         }
     790             :     }
     791             : 
     792             :     /* Report the conflict, if found; several tuples override 'type' */
     793          50 :     if (conflicttuples)
     794          48 :         ReportApplyConflict(estate, resultRelInfo, ERROR,
     795          48 :                             list_length(conflicttuples) > 1 ? CT_MULTIPLE_UNIQUE_CONFLICTS : type,
     796             :                             searchslot, remoteslot, conflicttuples);
     797           2 : }
     798             : 
     799             : /*
     800             :  * Insert the tuple represented by the slot into the relation, update the
     801             :  * indexes, and execute any constraints and per-row triggers.
     802             :  *
     803             :  * Caller is responsible for opening the indexes.
     804             :  */
     805             : void
     806      162518 : ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
     807             :                          EState *estate, TupleTableSlot *slot)
     808             : {
     809      162518 :     bool        skip_tuple = false;
     810      162518 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     811             : 
     812             :     /* For now we support only tables. */
     813             :     Assert(rel->rd_rel->relkind == RELKIND_RELATION);
     814             : 
     815      162518 :     CheckCmdReplicaIdentity(rel, CMD_INSERT);
     816             : 
     817             :     /* BEFORE ROW INSERT Triggers */
     818      162518 :     if (resultRelInfo->ri_TrigDesc &&
     819          40 :         resultRelInfo->ri_TrigDesc->trig_insert_before_row)
     820             :     {
     821           6 :         if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
     822           2 :             skip_tuple = true;  /* "do nothing" */
     823             :     }
     824             : 
     825      162518 :     if (!skip_tuple)
     826             :     {
     827      162516 :         List       *recheckIndexes = NIL;
     828             :         List       *conflictindexes;
     829      162516 :         bool        conflict = false;
     830             : 
     831             :         /* Compute stored generated columns */
     832      162516 :         if (rel->rd_att->constr &&
     833       90984 :             rel->rd_att->constr->has_generated_stored)
     834           8 :             ExecComputeStoredGenerated(resultRelInfo, estate, slot,
     835             :                                        CMD_INSERT);
     836             : 
     837             :         /* Check the constraints of the tuple */
     838      162516 :         if (rel->rd_att->constr)
     839       90984 :             ExecConstraints(resultRelInfo, slot, estate);
     840      162516 :         if (rel->rd_rel->relispartition)
     841         150 :             ExecPartitionCheck(resultRelInfo, slot, estate, true);
     842             : 
     843             :         /* OK, store the tuple and create index entries for it */
     844      162516 :         simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
     845             : 
     846      162516 :         conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
     847             : 
     848      162516 :         if (resultRelInfo->ri_NumIndices > 0)
     849      121838 :             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
     850             :                                                    slot, estate, false,
     851             :                                                    conflictindexes ? true : false,
     852             :                                                    &conflict,
     853             :                                                    conflictindexes, false);
     854             : 
     855             :         /*
     856             :          * Check the conflict indexes to fetch the conflicting local row and
     857             :          * report the conflict. We perform this check here, instead of
     858             :          * performing an additional index scan before the actual insertion and
     859             :          * reporting the conflict if any conflicting rows are found. This is
     860             :          * to avoid the overhead of executing the extra scan for each INSERT
     861             :          * operation, even when no conflict arises, which could introduce
     862             :          * significant overhead to replication, particularly in cases where
     863             :          * conflicts are rare.
     864             :          *
     865             :          * XXX OTOH, this could lead to clean-up effort for dead tuples added
     866             :          * in heap and index in case of conflicts. But since conflicts should
     867             :          * not be frequent, we preferred to save the performance overhead of
     868             :          * an extra scan before each insertion.
     869             :          */
     870      162516 :         if (conflict)
     871          50 :             CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
     872             :                                    recheckIndexes, NULL, slot);
     873             : 
     874             :         /* AFTER ROW INSERT Triggers */
     875      162468 :         ExecARInsertTriggers(estate, resultRelInfo, slot,
     876             :                              recheckIndexes, NULL);
     877             : 
     878             :         /*
     879             :          * XXX we should in theory pass a TransitionCaptureState object to the
     880             :          * above to capture transition tuples, but after statement triggers
     881             :          * don't actually get fired by replication yet anyway.
     882             :          */
     883             : 
     884      162468 :         list_free(recheckIndexes);
     885             :     }
     886      162470 : }
     887             : 
     888             : /*
     889             :  * Update the tuple located at searchslot's TID with the data in the slot,
     890             :  * update the indexes, and execute any constraints and per-row triggers.
     891             :  *
     892             :  * Caller is responsible for opening the indexes.
     893             :  */
     894             : void
     895       63844 : ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
     896             :                          EState *estate, EPQState *epqstate,
     897             :                          TupleTableSlot *searchslot, TupleTableSlot *slot)
     898             : {
     899       63844 :     bool        skip_tuple = false;
     900       63844 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     901       63844 :     ItemPointer tid = &(searchslot->tts_tid);
     902             : 
     903             :     /*
     904             :      * We support only non-system tables; check_publication_add_relation()
     905             :      * is responsible for enforcing that.
     906             :      */
     907             :     Assert(rel->rd_rel->relkind == RELKIND_RELATION);
     908             :     Assert(!IsCatalogRelation(rel));
     909             : 
     910       63844 :     CheckCmdReplicaIdentity(rel, CMD_UPDATE);
     911             : 
     912             :     /* BEFORE ROW UPDATE Triggers */
     913       63844 :     if (resultRelInfo->ri_TrigDesc &&
     914          20 :         resultRelInfo->ri_TrigDesc->trig_update_before_row)
     915             :     {
     916           6 :         if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
     917             :                                   tid, NULL, slot, NULL, NULL, false))
     918           4 :             skip_tuple = true;  /* "do nothing" */
     919             :     }
     920             : 
     921       63844 :     if (!skip_tuple)
     922             :     {
     923       63840 :         List       *recheckIndexes = NIL;
     924             :         TU_UpdateIndexes update_indexes;
     925             :         List       *conflictindexes;
     926       63840 :         bool        conflict = false;
     927             : 
     928             :         /* Compute stored generated columns */
     929       63840 :         if (rel->rd_att->constr &&
     930       63752 :             rel->rd_att->constr->has_generated_stored)
     931           4 :             ExecComputeStoredGenerated(resultRelInfo, estate, slot,
     932             :                                        CMD_UPDATE);
     933             : 
     934             :         /* Check the constraints of the tuple */
     935       63840 :         if (rel->rd_att->constr)
     936       63752 :             ExecConstraints(resultRelInfo, slot, estate);
     937       63840 :         if (rel->rd_rel->relispartition)
     938          24 :             ExecPartitionCheck(resultRelInfo, slot, estate, true);
     939             : 
     940       63840 :         simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
     941             :                                   &update_indexes);
     942             : 
     943       63840 :         conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
     944             : 
     945       63840 :         if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
     946       40358 :             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
     947             :                                                    slot, estate, true,
     948             :                                                    conflictindexes ? true : false,
     949             :                                                    &conflict, conflictindexes,
     950             :                                                    (update_indexes == TU_Summarizing));
     951             : 
     952             :         /*
     953             :          * Refer to the comments above the call to CheckAndReportConflict() in
     954             :          * ExecSimpleRelationInsert() to understand why this check is done at
     955             :          * this point, after the index entries have been inserted.
     956             :          */
     957       63840 :         if (conflict)
     958           4 :             CheckAndReportConflict(resultRelInfo, estate, CT_UPDATE_EXISTS,
     959             :                                    recheckIndexes, searchslot, slot);
     960             : 
     961             :         /* AFTER ROW UPDATE Triggers */
     962       63836 :         ExecARUpdateTriggers(estate, resultRelInfo,
     963             :                              NULL, NULL,
     964             :                              tid, NULL, slot,
     965             :                              recheckIndexes, NULL, false);
     966             : 
     967       63836 :         list_free(recheckIndexes);
     968             :     }
     969       63840 : }
     970             : 
     971             : /*
     972             :  * Delete the tuple located at searchslot's TID and fire any per-row DELETE
     973             :  * triggers. No constraints are checked here.
     974             :  *
     975             :  * Caller is responsible for opening the indexes.
     976             :  */
     977             : void
     978       80624 : ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
     979             :                          EState *estate, EPQState *epqstate,
     980             :                          TupleTableSlot *searchslot)
     981             : {
     982       80624 :     bool        skip_tuple = false;
     983       80624 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     984       80624 :     ItemPointer tid = &searchslot->tts_tid;
     985             : 
     986       80624 :     CheckCmdReplicaIdentity(rel, CMD_DELETE);
     987             : 
     988             :     /* BEFORE ROW DELETE Triggers; a false result suppresses the delete */
     989       80624 :     if (resultRelInfo->ri_TrigDesc &&
     990          20 :         resultRelInfo->ri_TrigDesc->trig_delete_before_row)
     991             :     {
     992           0 :         skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
     993           0 :                                            tid, NULL, NULL, NULL, NULL, false);
     994             :     }
     995             : 
     996       80624 :     if (!skip_tuple)
     997             :     {
     998             :         /* OK, delete the tuple */
     999       80624 :         simple_table_tuple_delete(rel, tid, estate->es_snapshot);
    1000             : 
    1001             :         /* AFTER ROW DELETE Triggers */
    1002       80624 :         ExecARDeleteTriggers(estate, resultRelInfo,
    1003             :                              tid, NULL, NULL, false);
    1004             :     }
    1005       80624 : }
    1006             : 
    1007             : /*
    1008             :  * Check if the command can be executed with the relation's current replica identity; raise an ERROR if not.
    1009             :  */
    1010             : void
    1011      439918 : CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
    1012             : {
    1013             :     PublicationDesc pubdesc;
    1014             : 
    1015             :     /*
    1016             :      * Skip checking the replica identity for partitioned tables, because the
    1017             :      * operations are actually performed on the leaf partitions.
    1018             :      */
    1019      439918 :     if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
    1020      415434 :         return;
    1021             : 
    1022             :     /* We only need to do checks for UPDATE and DELETE. */
    1023      435084 :     if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
    1024      258448 :         return;
    1025             : 
    1026             :     /*
    1027             :      * It is only safe to execute UPDATE/DELETE if the relation does not
    1028             :      * publish UPDATEs or DELETEs, or all the following conditions are
    1029             :      * satisfied:
    1030             :      *
    1031             :      * 1. All columns, referenced in the row filters from publications which
    1032             :      * the relation is in, are valid - i.e. when all referenced columns are
    1033             :      * part of REPLICA IDENTITY.
    1034             :      *
    1035             :      * 2. All columns, referenced in the column lists are valid - i.e. when
    1036             :      * all columns referenced in the REPLICA IDENTITY are covered by the
    1037             :      * column list.
    1038             :      *
    1039             :      * 3. All generated columns in REPLICA IDENTITY of the relation, are valid
    1040             :      * - i.e. when all these generated columns are published.
    1041             :      *
    1042             :      * XXX We could optimize it by first checking whether any of the
    1043             :      * publications have a row filter or column list for this relation, or if
    1044             :      * the relation contains a generated column. If none of these exist and
    1045             :      * the relation has replica identity then we can avoid building the
    1046             :      * descriptor but as this happens only one time it doesn't seem worth the
    1047             :      * additional complexity.
    1048             :      */
    1049      176636 :     RelationBuildPublicationDesc(rel, &pubdesc);
    1050      176636 :     if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
    1051          60 :         ereport(ERROR,
    1052             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1053             :                  errmsg("cannot update table \"%s\"",
    1054             :                         RelationGetRelationName(rel)),
    1055             :                  errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
    1056      176576 :     else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
    1057         108 :         ereport(ERROR,
    1058             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1059             :                  errmsg("cannot update table \"%s\"",
    1060             :                         RelationGetRelationName(rel)),
    1061             :                  errdetail("Column list used by the publication does not cover the replica identity.")));
    1062      176468 :     else if (cmd == CMD_UPDATE && !pubdesc.gencols_valid_for_update)
    1063          24 :         ereport(ERROR,
    1064             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1065             :                  errmsg("cannot update table \"%s\"",
    1066             :                         RelationGetRelationName(rel)),
    1067             :                  errdetail("Replica identity must not contain unpublished generated columns.")));
    1068      176444 :     else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
    1069           0 :         ereport(ERROR,
    1070             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1071             :                  errmsg("cannot delete from table \"%s\"",
    1072             :                         RelationGetRelationName(rel)),
    1073             :                  errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
    1074      176444 :     else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
    1075           0 :         ereport(ERROR,
    1076             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1077             :                  errmsg("cannot delete from table \"%s\"",
    1078             :                         RelationGetRelationName(rel)),
    1079             :                  errdetail("Column list used by the publication does not cover the replica identity.")));
    1080      176444 :     else if (cmd == CMD_DELETE && !pubdesc.gencols_valid_for_delete)
    1081           0 :         ereport(ERROR,
    1082             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1083             :                  errmsg("cannot delete from table \"%s\"",
    1084             :                         RelationGetRelationName(rel)),
    1085             :                  errdetail("Replica identity must not contain unpublished generated columns.")));
    1086             : 
    1087             :     /* If the relation has a replica identity index, we are always good. */
    1088      176444 :     if (OidIsValid(RelationGetReplicaIndex(rel)))
    1089      151694 :         return;
    1090             : 
    1091             :     /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
    1092       24750 :     if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
    1093         458 :         return;
    1094             : 
    1095             :     /*
    1096             :      * This is UPDATE/DELETE and there is no replica identity.
    1097             :      *
    1098             :      * That is an error only if the table publishes the command in question.
    1099             :      */
    1100       24292 :     if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
    1101         124 :         ereport(ERROR,
    1102             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1103             :                  errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
    1104             :                         RelationGetRelationName(rel)),
    1105             :                  errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
    1106       24168 :     else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
    1107          16 :         ereport(ERROR,
    1108             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1109             :                  errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
    1110             :                         RelationGetRelationName(rel)),
    1111             :                  errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
    1112             : }
    1113             : 
    1114             : 
    1115             : /*
    1116             :  * Check that localrelkind is a relation, partitioned table, or sequence, and
    1117             :  * that it is a sequence if and only if remoterelkind is one too.
    1118             :  *
    1119             :  * Either failure raises ereport(ERROR); nspname/relname only feed the messages.
    1120             :  */
    1121             : void
    1122        1866 : CheckSubscriptionRelkind(char localrelkind, char remoterelkind,
    1123             :                          const char *nspname, const char *relname)
    1124             : {
    1125        1866 :     if (localrelkind != RELKIND_RELATION &&
    1126          34 :         localrelkind != RELKIND_PARTITIONED_TABLE &&
    1127             :         localrelkind != RELKIND_SEQUENCE)
    1128           0 :         ereport(ERROR,
    1129             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1130             :                  errmsg("cannot use relation \"%s.%s\" as logical replication target",
    1131             :                         nspname, relname),
    1132             :                  errdetail_relkind_not_supported(localrelkind)));
    1133             : 
    1134             :     /*
    1135             :      * Allow RELKIND_RELATION and RELKIND_PARTITIONED_TABLE to be treated
    1136             :      * interchangeably, but ensure that sequences (RELKIND_SEQUENCE) match
    1137             :      * exactly on publisher and subscriber; other remote relkinds pass this check.
    1138             :      */
    1139        1866 :     if ((localrelkind == RELKIND_SEQUENCE && remoterelkind != RELKIND_SEQUENCE) ||
    1140        1832 :         (localrelkind != RELKIND_SEQUENCE && remoterelkind == RELKIND_SEQUENCE))
    1141           0 :         ereport(ERROR,
    1142             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1143             :         /* translator: 3rd and 4th %s are "sequence" or "table" */
    1144             :                 errmsg("relation \"%s.%s\" type mismatch: source \"%s\", target \"%s\"",
    1145             :                        nspname, relname,
    1146             :                        remoterelkind == RELKIND_SEQUENCE ? "sequence" : "table",
    1147             :                        localrelkind == RELKIND_SEQUENCE ? "sequence" : "table"));
    1148        1866 : }

Generated by: LCOV version 1.16