LCOV - code coverage report
Current view: top level - src/backend/executor - execReplication.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 87.2 % 343 299
Test Date: 2026-04-16 06:16:20 Functions: 100.0 % 16 16
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * execReplication.c
       4              :  *    miscellaneous executor routines for logical replication
       5              :  *
       6              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7              :  * Portions Copyright (c) 1994, Regents of the University of California
       8              :  *
       9              :  * IDENTIFICATION
      10              :  *    src/backend/executor/execReplication.c
      11              :  *
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : 
      15              : #include "postgres.h"
      16              : 
      17              : #include "access/amapi.h"
      18              : #include "access/commit_ts.h"
      19              : #include "access/genam.h"
      20              : #include "access/gist.h"
      21              : #include "access/relscan.h"
      22              : #include "access/tableam.h"
      23              : #include "access/transam.h"
      24              : #include "access/xact.h"
      25              : #include "access/heapam.h"
      26              : #include "catalog/pg_am_d.h"
      27              : #include "commands/trigger.h"
      28              : #include "executor/executor.h"
      29              : #include "executor/nodeModifyTable.h"
      30              : #include "replication/conflict.h"
      31              : #include "replication/logicalrelation.h"
      32              : #include "storage/lmgr.h"
      33              : #include "utils/builtins.h"
      34              : #include "utils/lsyscache.h"
      35              : #include "utils/rel.h"
      36              : #include "utils/snapmgr.h"
      37              : #include "utils/syscache.h"
      38              : #include "utils/typcache.h"
      39              : 
      40              : 
      41              : static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
      42              :                          TypeCacheEntry **eq, Bitmapset *columns);
      43              : 
      44              : /*
      45              :  * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
      46              :  * is setup to match 'rel' (*NOT* idxrel!).
      47              :  *
      48              :  * Returns how many columns to use for the index scan.
      49              :  *
      50              :  * This is not a generic routine, idxrel must be PK, RI, or an index that can be
      51              :  * used for a REPLICA IDENTITY FULL table. See FindUsableIndexForReplicaIdentityFull()
      52              :  * for details.
      53              :  *
      54              :  * By definition, replication identity of a rel meets all limitations associated
      55              :  * with that. Note that any other index could also meet these limitations.
      56              :  */
      57              : static int
      58        72136 : build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
      59              :                          TupleTableSlot *searchslot)
      60              : {
      61              :     int         index_attoff;
      62        72136 :     int         skey_attoff = 0;
      63              :     Datum       indclassDatum;
      64              :     oidvector  *opclass;
      65        72136 :     int2vector *indkey = &idxrel->rd_index->indkey;
      66              : 
      67        72136 :     indclassDatum = SysCacheGetAttrNotNull(INDEXRELID, idxrel->rd_indextuple,
      68              :                                            Anum_pg_index_indclass);
      69        72136 :     opclass = (oidvector *) DatumGetPointer(indclassDatum);
      70              : 
      71              :     /* Build scankey for every non-expression attribute in the index. */
      72       144305 :     for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
      73        72169 :          index_attoff++)
      74              :     {
      75              :         Oid         operator;
      76              :         Oid         optype;
      77              :         Oid         opfamily;
      78              :         RegProcedure regop;
      79        72169 :         int         table_attno = indkey->values[index_attoff];
      80              :         StrategyNumber eq_strategy;
      81              : 
      82        72169 :         if (!AttributeNumberIsValid(table_attno))
      83              :         {
      84              :             /*
      85              :              * XXX: Currently, we don't support expressions in the scan key,
      86              :              * see code below.
      87              :              */
      88            2 :             continue;
      89              :         }
      90              : 
      91              :         /*
      92              :          * Load the operator info.  We need this to get the equality operator
      93              :          * function for the scan key.
      94              :          */
      95        72167 :         optype = get_opclass_input_type(opclass->values[index_attoff]);
      96        72167 :         opfamily = get_opclass_family(opclass->values[index_attoff]);
      97        72167 :         eq_strategy = IndexAmTranslateCompareType(COMPARE_EQ, idxrel->rd_rel->relam, opfamily, false);
      98        72167 :         operator = get_opfamily_member(opfamily, optype,
      99              :                                        optype,
     100              :                                        eq_strategy);
     101              : 
     102        72167 :         if (!OidIsValid(operator))
     103            0 :             elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
     104              :                  eq_strategy, optype, optype, opfamily);
     105              : 
     106        72167 :         regop = get_opcode(operator);
     107              : 
     108              :         /* Initialize the scankey. */
     109        72167 :         ScanKeyInit(&skey[skey_attoff],
     110        72167 :                     index_attoff + 1,
     111              :                     eq_strategy,
     112              :                     regop,
     113        72167 :                     searchslot->tts_values[table_attno - 1]);
     114              : 
     115        72167 :         skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
     116              : 
     117              :         /* Check for null value. */
     118        72167 :         if (searchslot->tts_isnull[table_attno - 1])
     119            1 :             skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
     120              : 
     121        72167 :         skey_attoff++;
     122              :     }
     123              : 
     124              :     /* There must always be at least one attribute for the index scan. */
     125              :     Assert(skey_attoff > 0);
     126              : 
     127        72136 :     return skey_attoff;
     128              : }
     129              : 
     130              : 
     131              : /*
     132              :  * Helper function to check if it is necessary to re-fetch and lock the tuple
     133              :  * due to concurrent modifications. This function should be called after
     134              :  * invoking table_tuple_lock.
     135              :  */
     136              : static bool
     137        72340 : should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
     138              : {
     139        72340 :     bool        refetch = false;
     140              : 
     141        72340 :     switch (res)
     142              :     {
     143        72340 :         case TM_Ok:
     144        72340 :             break;
     145            0 :         case TM_Updated:
     146              :             /* XXX: Improve handling here */
     147            0 :             if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
     148            0 :                 ereport(LOG,
     149              :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     150              :                          errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
     151              :             else
     152            0 :                 ereport(LOG,
     153              :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     154              :                          errmsg("concurrent update, retrying")));
     155            0 :             refetch = true;
     156            0 :             break;
     157            0 :         case TM_Deleted:
     158              :             /* XXX: Improve handling here */
     159            0 :             ereport(LOG,
     160              :                     (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     161              :                      errmsg("concurrent delete, retrying")));
     162            0 :             refetch = true;
     163            0 :             break;
     164            0 :         case TM_Invisible:
     165            0 :             elog(ERROR, "attempted to lock invisible tuple");
     166              :             break;
     167            0 :         default:
     168            0 :             elog(ERROR, "unexpected table_tuple_lock status: %u", res);
     169              :             break;
     170              :     }
     171              : 
     172        72340 :     return refetch;
     173              : }
     174              : 
     175              : /*
     176              :  * Search the relation 'rel' for tuple using the index.
     177              :  *
     178              :  * If a matching tuple is found, lock it with lockmode, fill the slot with its
     179              :  * contents, and return true.  Return false otherwise.
     180              :  */
     181              : bool
     182        72135 : RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
     183              :                              LockTupleMode lockmode,
     184              :                              TupleTableSlot *searchslot,
     185              :                              TupleTableSlot *outslot)
     186              : {
     187              :     ScanKeyData skey[INDEX_MAX_KEYS];
     188              :     int         skey_attoff;
     189              :     IndexScanDesc scan;
     190              :     SnapshotData snap;
     191              :     TransactionId xwait;
     192              :     Relation    idxrel;
     193              :     bool        found;
     194        72135 :     TypeCacheEntry **eq = NULL;
     195              :     bool        isIdxSafeToSkipDuplicates;
     196              : 
     197              :     /* Open the index. */
     198        72135 :     idxrel = index_open(idxoid, RowExclusiveLock);
     199              : 
     200        72135 :     isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
     201              : 
     202        72135 :     InitDirtySnapshot(snap);
     203              : 
     204              :     /* Build scan key. */
     205        72135 :     skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
     206              : 
     207              :     /* Start an index scan. */
     208        72135 :     scan = index_beginscan(rel, idxrel,
     209              :                            &snap, NULL, skey_attoff, 0, SO_NONE);
     210              : 
     211            0 : retry:
     212        72135 :     found = false;
     213              : 
     214        72135 :     index_rescan(scan, skey, skey_attoff, NULL, 0);
     215              : 
     216              :     /* Try to find the tuple */
     217        72135 :     while (index_getnext_slot(scan, ForwardScanDirection, outslot))
     218              :     {
     219              :         /*
     220              :          * Avoid expensive equality check if the index is primary key or
     221              :          * replica identity index.
     222              :          */
     223        72100 :         if (!isIdxSafeToSkipDuplicates)
     224              :         {
     225           19 :             if (eq == NULL)
     226           19 :                 eq = palloc0_array(TypeCacheEntry *, outslot->tts_tupleDescriptor->natts);
     227              : 
     228           19 :             if (!tuples_equal(outslot, searchslot, eq, NULL))
     229            0 :                 continue;
     230              :         }
     231              : 
     232        72100 :         ExecMaterializeSlot(outslot);
     233              : 
     234       144200 :         xwait = TransactionIdIsValid(snap.xmin) ?
     235        72100 :             snap.xmin : snap.xmax;
     236              : 
     237              :         /*
     238              :          * If the tuple is locked, wait for locking transaction to finish and
     239              :          * retry.
     240              :          */
     241        72100 :         if (TransactionIdIsValid(xwait))
     242              :         {
     243            0 :             XactLockTableWait(xwait, NULL, NULL, XLTW_None);
     244            0 :             goto retry;
     245              :         }
     246              : 
     247              :         /* Found our tuple and it's not locked */
     248        72100 :         found = true;
     249        72100 :         break;
     250              :     }
     251              : 
     252              :     /* Found tuple, try to lock it in the lockmode. */
     253        72135 :     if (found)
     254              :     {
     255              :         TM_FailureData tmfd;
     256              :         TM_Result   res;
     257              : 
     258        72100 :         PushActiveSnapshot(GetLatestSnapshot());
     259              : 
     260        72100 :         res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
     261              :                                outslot,
     262              :                                GetCurrentCommandId(false),
     263              :                                lockmode,
     264              :                                LockWaitBlock,
     265              :                                0 /* don't follow updates */ ,
     266              :                                &tmfd);
     267              : 
     268        72100 :         PopActiveSnapshot();
     269              : 
     270        72100 :         if (should_refetch_tuple(res, &tmfd))
     271            0 :             goto retry;
     272              :     }
     273              : 
     274        72135 :     index_endscan(scan);
     275              : 
     276              :     /* Don't release lock until commit. */
     277        72135 :     index_close(idxrel, NoLock);
     278              : 
     279        72135 :     return found;
     280              : }
     281              : 
     282              : /*
     283              :  * Compare the tuples in the slots by checking if they have equal values.
     284              :  *
     285              :  * If 'columns' is not null, only the columns specified within it will be
     286              :  * considered for the equality check, ignoring all other columns.
     287              :  */
     288              : static bool
     289       105337 : tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
     290              :              TypeCacheEntry **eq, Bitmapset *columns)
     291              : {
     292              :     int         attrnum;
     293              : 
     294              :     Assert(slot1->tts_tupleDescriptor->natts ==
     295              :            slot2->tts_tupleDescriptor->natts);
     296              : 
     297       105337 :     slot_getallattrs(slot1);
     298       105337 :     slot_getallattrs(slot2);
     299              : 
     300              :     /* Check equality of the attributes. */
     301       105555 :     for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
     302              :     {
     303              :         Form_pg_attribute att;
     304              :         TypeCacheEntry *typentry;
     305              : 
     306       105385 :         att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
     307              : 
     308              :         /*
     309              :          * Ignore dropped and generated columns as the publisher doesn't send
     310              :          * those
     311              :          */
     312       105385 :         if (att->attisdropped || att->attgenerated)
     313            1 :             continue;
     314              : 
     315              :         /*
     316              :          * Ignore columns that are not listed for checking.
     317              :          */
     318       105384 :         if (columns &&
     319            0 :             !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
     320              :                            columns))
     321            0 :             continue;
     322              : 
     323              :         /*
     324              :          * If one value is NULL and other is not, then they are certainly not
     325              :          * equal
     326              :          */
     327       105384 :         if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
     328            0 :             return false;
     329              : 
     330              :         /*
     331              :          * If both are NULL, they can be considered equal.
     332              :          */
     333       105384 :         if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
     334            1 :             continue;
     335              : 
     336       105383 :         typentry = eq[attrnum];
     337       105383 :         if (typentry == NULL)
     338              :         {
     339          216 :             typentry = lookup_type_cache(att->atttypid,
     340              :                                          TYPECACHE_EQ_OPR_FINFO);
     341          216 :             if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
     342            0 :                 ereport(ERROR,
     343              :                         (errcode(ERRCODE_UNDEFINED_FUNCTION),
     344              :                          errmsg("could not identify an equality operator for type %s",
     345              :                                 format_type_be(att->atttypid))));
     346          216 :             eq[attrnum] = typentry;
     347              :         }
     348              : 
     349       105383 :         if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
     350              :                                             att->attcollation,
     351       105383 :                                             slot1->tts_values[attrnum],
     352       105383 :                                             slot2->tts_values[attrnum])))
     353       105167 :             return false;
     354              :     }
     355              : 
     356          170 :     return true;
     357              : }
     358              : 
     359              : /*
     360              :  * Search the relation 'rel' for tuple using the sequential scan.
     361              :  *
     362              :  * If a matching tuple is found, lock it with lockmode, fill the slot with its
     363              :  * contents, and return true.  Return false otherwise.
     364              :  *
     365              :  * Note that this stops on the first matching tuple.
     366              :  *
     367              :  * This can obviously be quite slow on tables that have more than few rows.
     368              :  */
     369              : bool
     370          153 : RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
     371              :                          TupleTableSlot *searchslot, TupleTableSlot *outslot)
     372              : {
     373              :     TupleTableSlot *scanslot;
     374              :     TableScanDesc scan;
     375              :     SnapshotData snap;
     376              :     TypeCacheEntry **eq;
     377              :     TransactionId xwait;
     378              :     bool        found;
     379          153 :     TupleDesc   desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
     380              : 
     381              :     Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
     382              : 
     383          153 :     eq = palloc0_array(TypeCacheEntry *, outslot->tts_tupleDescriptor->natts);
     384              : 
     385              :     /* Start a heap scan. */
     386          153 :     InitDirtySnapshot(snap);
     387          153 :     scan = table_beginscan(rel, &snap, 0, NULL,
     388              :                            SO_NONE);
     389          153 :     scanslot = table_slot_create(rel, NULL);
     390              : 
     391            0 : retry:
     392          153 :     found = false;
     393              : 
     394          153 :     table_rescan(scan, NULL);
     395              : 
     396              :     /* Try to find the tuple */
     397       105319 :     while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
     398              :     {
     399       105315 :         if (!tuples_equal(scanslot, searchslot, eq, NULL))
     400       105166 :             continue;
     401              : 
     402          149 :         found = true;
     403          149 :         ExecCopySlot(outslot, scanslot);
     404              : 
     405          298 :         xwait = TransactionIdIsValid(snap.xmin) ?
     406          149 :             snap.xmin : snap.xmax;
     407              : 
     408              :         /*
     409              :          * If the tuple is locked, wait for locking transaction to finish and
     410              :          * retry.
     411              :          */
     412          149 :         if (TransactionIdIsValid(xwait))
     413              :         {
     414            0 :             XactLockTableWait(xwait, NULL, NULL, XLTW_None);
     415            0 :             goto retry;
     416              :         }
     417              : 
     418              :         /* Found our tuple and it's not locked */
     419          149 :         break;
     420              :     }
     421              : 
     422              :     /* Found tuple, try to lock it in the lockmode. */
     423          153 :     if (found)
     424              :     {
     425              :         TM_FailureData tmfd;
     426              :         TM_Result   res;
     427              : 
     428          149 :         PushActiveSnapshot(GetLatestSnapshot());
     429              : 
     430          149 :         res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
     431              :                                outslot,
     432              :                                GetCurrentCommandId(false),
     433              :                                lockmode,
     434              :                                LockWaitBlock,
     435              :                                0 /* don't follow updates */ ,
     436              :                                &tmfd);
     437              : 
     438          149 :         PopActiveSnapshot();
     439              : 
     440          149 :         if (should_refetch_tuple(res, &tmfd))
     441            0 :             goto retry;
     442              :     }
     443              : 
     444          153 :     table_endscan(scan);
     445          153 :     ExecDropSingleTupleTableSlot(scanslot);
     446              : 
     447          153 :     return found;
     448              : }
     449              : 
     450              : /*
     451              :  * Build additional index information necessary for conflict detection.
     452              :  */
     453              : static void
     454           94 : BuildConflictIndexInfo(ResultRelInfo *resultRelInfo, Oid conflictindex)
     455              : {
     456          284 :     for (int i = 0; i < resultRelInfo->ri_NumIndices; i++)
     457              :     {
     458          190 :         Relation    indexRelation = resultRelInfo->ri_IndexRelationDescs[i];
     459          190 :         IndexInfo  *indexRelationInfo = resultRelInfo->ri_IndexRelationInfo[i];
     460              : 
     461          190 :         if (conflictindex != RelationGetRelid(indexRelation))
     462           96 :             continue;
     463              : 
     464              :         /*
     465              :          * This Assert will fail if BuildSpeculativeIndexInfo() is called
     466              :          * twice for the given index.
     467              :          */
     468              :         Assert(indexRelationInfo->ii_UniqueOps == NULL);
     469              : 
     470           94 :         BuildSpeculativeIndexInfo(indexRelation, indexRelationInfo);
     471              :     }
     472           94 : }
     473              : 
     474              : /*
     475              :  * If the tuple is recently dead and was deleted by a transaction with a newer
     476              :  * commit timestamp than previously recorded, update the associated transaction
     477              :  * ID, commit time, and origin. This helps ensure that conflict detection uses
     478              :  * the most recent and relevant deletion metadata.
     479              :  */
     480              : static void
     481            3 : update_most_recent_deletion_info(TupleTableSlot *scanslot,
     482              :                                  TransactionId oldestxmin,
     483              :                                  TransactionId *delete_xid,
     484              :                                  TimestampTz *delete_time,
     485              :                                  ReplOriginId *delete_origin)
     486              : {
     487              :     BufferHeapTupleTableSlot *hslot;
     488              :     HeapTuple   tuple;
     489              :     Buffer      buf;
     490            3 :     bool        recently_dead = false;
     491              :     TransactionId xmax;
     492              :     TimestampTz localts;
     493              :     ReplOriginId localorigin;
     494              : 
     495            3 :     hslot = (BufferHeapTupleTableSlot *) scanslot;
     496              : 
     497            3 :     tuple = ExecFetchSlotHeapTuple(scanslot, false, NULL);
     498            3 :     buf = hslot->buffer;
     499              : 
     500            3 :     LockBuffer(buf, BUFFER_LOCK_SHARE);
     501              : 
     502              :     /*
     503              :      * We do not consider HEAPTUPLE_DEAD status because it indicates either
     504              :      * tuples whose inserting transaction was aborted (meaning there is no
     505              :      * commit timestamp or origin), or tuples deleted by a transaction older
     506              :      * than oldestxmin, making it safe to ignore them during conflict
     507              :      * detection (See comments atop worker.c for details).
     508              :      */
     509            3 :     if (HeapTupleSatisfiesVacuum(tuple, oldestxmin, buf) == HEAPTUPLE_RECENTLY_DEAD)
     510            3 :         recently_dead = true;
     511              : 
     512            3 :     LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     513              : 
     514            3 :     if (!recently_dead)
     515            0 :         return;
     516              : 
     517            3 :     xmax = HeapTupleHeaderGetUpdateXid(tuple->t_data);
     518            3 :     if (!TransactionIdIsValid(xmax))
     519            0 :         return;
     520              : 
     521              :     /* Select the dead tuple with the most recent commit timestamp */
     522            6 :     if (TransactionIdGetCommitTsData(xmax, &localts, &localorigin) &&
     523            3 :         TimestampDifferenceExceeds(*delete_time, localts, 0))
     524              :     {
     525            3 :         *delete_xid = xmax;
     526            3 :         *delete_time = localts;
     527            3 :         *delete_origin = localorigin;
     528              :     }
     529              : }
     530              : 
     531              : /*
     532              :  * Searches the relation 'rel' for the most recently deleted tuple that matches
     533              :  * the values in 'searchslot' and is not yet removable by VACUUM. The function
     534              :  * returns the transaction ID, origin, and commit timestamp of the transaction
     535              :  * that deleted this tuple.
     536              :  *
     537              :  * 'oldestxmin' acts as a cutoff transaction ID. Tuples deleted by transactions
     538              :  * with IDs >= 'oldestxmin' are considered recently dead and are eligible for
     539              :  * conflict detection.
     540              :  *
     541              :  * Instead of stopping at the first match, we scan all matching dead tuples to
     542              :  * identify most recent deletion. This is crucial because only the latest
     543              :  * deletion is relevant for resolving conflicts.
     544              :  *
     545              :  * For example, consider a scenario on the subscriber where a row is deleted,
     546              :  * re-inserted, and then deleted again only on the subscriber:
     547              :  *
     548              :  *   - (pk, 1) - deleted at 9:00,
     549              :  *   - (pk, 1) - deleted at 9:02,
     550              :  *
     551              :  * Now, a remote update arrives: (pk, 1) -> (pk, 2), timestamped at 9:01.
     552              :  *
     553              :  * If we mistakenly return the older deletion (9:00), the system may wrongly
     554              :  * apply the remote update using a last-update-wins strategy. Instead, we must
     555              :  * recognize the more recent deletion at 9:02 and skip the update. See
     556              :  * comments atop worker.c for details. Note, as of now, conflict resolution
     557              :  * is not implemented. Consequently, the system may incorrectly report the
     558              :  * older tuple as the conflicted one, leading to misleading results.
     559              :  *
     560              :  * The commit timestamp of the deleting transaction is used to determine which
     561              :  * tuple was deleted most recently.
     562              :  */
     563              : bool
     564            2 : RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot,
     565              :                                 TransactionId oldestxmin,
     566              :                                 TransactionId *delete_xid,
     567              :                                 ReplOriginId *delete_origin,
     568              :                                 TimestampTz *delete_time)
     569              : {
     570              :     TupleTableSlot *scanslot;
     571              :     TableScanDesc scan;
     572              :     TypeCacheEntry **eq;
     573              :     Bitmapset  *indexbitmap;
     574            2 :     TupleDesc   desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
     575              : 
     576              :     Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
     577              : 
     578            2 :     *delete_xid = InvalidTransactionId;
     579            2 :     *delete_origin = InvalidReplOriginId;
     580            2 :     *delete_time = 0;
     581              : 
     582              :     /*
     583              :      * If the relation has a replica identity key or a primary key that is
     584              :      * unusable for locating deleted tuples (see
     585              :      * IsIndexUsableForFindingDeletedTuple), a full table scan becomes
     586              :      * necessary. In such cases, comparing the entire tuple is not required,
     587              :      * since the remote tuple might not include all column values. Instead,
     588              :      * the indexed columns alone are sufficient to identify the target tuple
     589              :      * (see logicalrep_rel_mark_updatable).
     590              :      */
     591            2 :     indexbitmap = RelationGetIndexAttrBitmap(rel,
     592              :                                              INDEX_ATTR_BITMAP_IDENTITY_KEY);
     593              : 
     594              :     /* fallback to PK if no replica identity */
     595            2 :     if (!indexbitmap)
     596            2 :         indexbitmap = RelationGetIndexAttrBitmap(rel,
     597              :                                                  INDEX_ATTR_BITMAP_PRIMARY_KEY);
     598              : 
     599            2 :     eq = palloc0_array(TypeCacheEntry *, searchslot->tts_tupleDescriptor->natts);
     600              : 
     601              :     /*
     602              :      * Start a heap scan using SnapshotAny to identify dead tuples that are
     603              :      * not visible under a standard MVCC snapshot. Tuples from transactions
     604              :      * not yet committed or those just committed prior to the scan are
     605              :      * excluded in update_most_recent_deletion_info().
     606              :      */
     607            2 :     scan = table_beginscan(rel, SnapshotAny, 0, NULL,
     608              :                            SO_NONE);
     609            2 :     scanslot = table_slot_create(rel, NULL);
     610              : 
     611            2 :     table_rescan(scan, NULL);
     612              : 
     613              :     /* Try to find the tuple */
     614            5 :     while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
     615              :     {
     616            3 :         if (!tuples_equal(scanslot, searchslot, eq, indexbitmap))
     617            1 :             continue;
     618              : 
     619            2 :         update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid,
     620              :                                          delete_time, delete_origin);
     621              :     }
     622              : 
     623            2 :     table_endscan(scan);
     624            2 :     ExecDropSingleTupleTableSlot(scanslot);
     625              : 
     626            2 :     return *delete_time != 0;
     627              : }
     628              : 
     629              : /*
     630              :  * Similar to RelationFindDeletedTupleInfoSeq() but using index scan to locate
     631              :  * the deleted tuple.
     632              :  */
     633              : bool
     634            1 : RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid,
     635              :                                     TupleTableSlot *searchslot,
     636              :                                     TransactionId oldestxmin,
     637              :                                     TransactionId *delete_xid,
     638              :                                     ReplOriginId *delete_origin,
     639              :                                     TimestampTz *delete_time)
     640              : {
     641              :     Relation    idxrel;
     642              :     ScanKeyData skey[INDEX_MAX_KEYS];
     643              :     int         skey_attoff;
     644              :     IndexScanDesc scan;
     645              :     TupleTableSlot *scanslot;
     646            1 :     TypeCacheEntry **eq = NULL;
     647              :     bool        isIdxSafeToSkipDuplicates;
     648            1 :     TupleDesc   desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
     649              : 
     650              :     Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
     651              :     Assert(OidIsValid(idxoid));
     652              : 
     653            1 :     *delete_xid = InvalidTransactionId;
     654            1 :     *delete_time = 0;
     655            1 :     *delete_origin = InvalidReplOriginId;
     656              : 
     657            1 :     isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
     658              : 
     659            1 :     scanslot = table_slot_create(rel, NULL);
     660              : 
     661            1 :     idxrel = index_open(idxoid, RowExclusiveLock);
     662              : 
     663              :     /* Build scan key. */
     664            1 :     skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
     665              : 
     666              :     /*
     667              :      * Start an index scan using SnapshotAny to identify dead tuples that are
     668              :      * not visible under a standard MVCC snapshot. Tuples from transactions
     669              :      * not yet committed or those just committed prior to the scan are
     670              :      * excluded in update_most_recent_deletion_info().
     671              :      */
     672            1 :     scan = index_beginscan(rel, idxrel,
     673              :                            SnapshotAny, NULL, skey_attoff, 0, SO_NONE);
     674              : 
     675            1 :     index_rescan(scan, skey, skey_attoff, NULL, 0);
     676              : 
     677              :     /* Try to find the tuple */
     678            2 :     while (index_getnext_slot(scan, ForwardScanDirection, scanslot))
     679              :     {
     680              :         /*
     681              :          * Avoid expensive equality check if the index is primary key or
     682              :          * replica identity index.
     683              :          */
     684            1 :         if (!isIdxSafeToSkipDuplicates)
     685              :         {
     686            0 :             if (eq == NULL)
     687            0 :                 eq = palloc0_array(TypeCacheEntry *, scanslot->tts_tupleDescriptor->natts);
     688              : 
     689            0 :             if (!tuples_equal(scanslot, searchslot, eq, NULL))
     690            0 :                 continue;
     691              :         }
     692              : 
     693            1 :         update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid,
     694              :                                          delete_time, delete_origin);
     695              :     }
     696              : 
     697            1 :     index_endscan(scan);
     698              : 
     699            1 :     index_close(idxrel, NoLock);
     700              : 
     701            1 :     ExecDropSingleTupleTableSlot(scanslot);
     702              : 
     703            1 :     return *delete_time != 0;
     704              : }
     705              : 
     706              : /*
     707              :  * Find the tuple that violates the passed unique index (conflictindex).
     708              :  *
     709              :  * If the conflicting tuple is found return true, otherwise false.
     710              :  *
     711              :  * We lock the tuple to avoid getting it deleted before the caller can fetch
     712              :  * the required information. Note that if the tuple is deleted before a lock
     713              :  * is acquired, we will retry to find the conflicting tuple again.
     714              :  */
     715              : static bool
     716           94 : FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate,
     717              :                   Oid conflictindex, TupleTableSlot *slot,
     718              :                   TupleTableSlot **conflictslot)
     719              : {
     720           94 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     721              :     ItemPointerData conflictTid;
     722              :     TM_FailureData tmfd;
     723              :     TM_Result   res;
     724              : 
     725           94 :     *conflictslot = NULL;
     726              : 
     727              :     /*
     728              :      * Build additional information required to check constraints violations.
     729              :      * See check_exclusion_or_unique_constraint().
     730              :      */
     731           94 :     BuildConflictIndexInfo(resultRelInfo, conflictindex);
     732              : 
     733           94 : retry:
     734          187 :     if (ExecCheckIndexConstraints(resultRelInfo, slot, estate,
     735           94 :                                   &conflictTid, &slot->tts_tid,
     736              :                                   list_make1_oid(conflictindex)))
     737              :     {
     738            2 :         if (*conflictslot)
     739            0 :             ExecDropSingleTupleTableSlot(*conflictslot);
     740              : 
     741            2 :         *conflictslot = NULL;
     742            2 :         return false;
     743              :     }
     744              : 
     745           91 :     *conflictslot = table_slot_create(rel, NULL);
     746              : 
     747           91 :     PushActiveSnapshot(GetLatestSnapshot());
     748              : 
     749           91 :     res = table_tuple_lock(rel, &conflictTid, GetActiveSnapshot(),
     750              :                            *conflictslot,
     751              :                            GetCurrentCommandId(false),
     752              :                            LockTupleShare,
     753              :                            LockWaitBlock,
     754              :                            0 /* don't follow updates */ ,
     755              :                            &tmfd);
     756              : 
     757           91 :     PopActiveSnapshot();
     758              : 
     759           91 :     if (should_refetch_tuple(res, &tmfd))
     760            0 :         goto retry;
     761              : 
     762           91 :     return true;
     763              : }
     764              : 
     765              : /*
     766              :  * Check all the unique indexes in 'recheckIndexes' for conflict with the
     767              :  * tuple in 'remoteslot' and report if found.
     768              :  */
     769              : static void
     770           50 : CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
     771              :                        ConflictType type, List *recheckIndexes,
     772              :                        TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
     773              : {
     774           50 :     List       *conflicttuples = NIL;
     775              :     TupleTableSlot *conflictslot;
     776              : 
     777              :     /* Check all the unique indexes for conflicts */
     778          192 :     foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
     779              :     {
     780          187 :         if (list_member_oid(recheckIndexes, uniqueidx) &&
     781           94 :             FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
     782              :                               &conflictslot))
     783              :         {
     784           91 :             ConflictTupleInfo *conflicttuple = palloc0_object(ConflictTupleInfo);
     785              : 
     786           91 :             conflicttuple->slot = conflictslot;
     787           91 :             conflicttuple->indexoid = uniqueidx;
     788              : 
     789           91 :             GetTupleTransactionInfo(conflictslot, &conflicttuple->xmin,
     790              :                                     &conflicttuple->origin, &conflicttuple->ts);
     791              : 
     792           91 :             conflicttuples = lappend(conflicttuples, conflicttuple);
     793              :         }
     794              :     }
     795              : 
     796              :     /* Report the conflict, if found */
     797           49 :     if (conflicttuples)
     798           47 :         ReportApplyConflict(estate, resultRelInfo, ERROR,
     799           47 :                             list_length(conflicttuples) > 1 ? CT_MULTIPLE_UNIQUE_CONFLICTS : type,
     800              :                             searchslot, remoteslot, conflicttuples);
     801            2 : }
     802              : 
     803              : /*
     804              :  * Insert tuple represented in the slot to the relation, update the indexes,
     805              :  * and execute any constraints and per-row triggers.
     806              :  *
     807              :  * Caller is responsible for opening the indexes.
     808              :  */
     809              : void
     810        96320 : ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
     811              :                          EState *estate, TupleTableSlot *slot)
     812              : {
     813        96320 :     bool        skip_tuple = false;
     814        96320 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     815              : 
     816              :     /* For now we support only tables. */
     817              :     Assert(rel->rd_rel->relkind == RELKIND_RELATION);
     818              : 
     819        96320 :     CheckCmdReplicaIdentity(rel, CMD_INSERT);
     820              : 
     821              :     /* BEFORE ROW INSERT Triggers */
     822        96320 :     if (resultRelInfo->ri_TrigDesc &&
     823           20 :         resultRelInfo->ri_TrigDesc->trig_insert_before_row)
     824              :     {
     825            3 :         if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
     826            1 :             skip_tuple = true;  /* "do nothing" */
     827              :     }
     828              : 
     829        96320 :     if (!skip_tuple)
     830              :     {
     831        96319 :         List       *recheckIndexes = NIL;
     832              :         List       *conflictindexes;
     833        96319 :         bool        conflict = false;
     834              : 
     835              :         /* Compute stored generated columns */
     836        96319 :         if (rel->rd_att->constr &&
     837        65812 :             rel->rd_att->constr->has_generated_stored)
     838            4 :             ExecComputeStoredGenerated(resultRelInfo, estate, slot,
     839              :                                        CMD_INSERT);
     840              : 
     841              :         /* Check the constraints of the tuple */
     842        96319 :         if (rel->rd_att->constr)
     843        65812 :             ExecConstraints(resultRelInfo, slot, estate);
     844        96319 :         if (rel->rd_rel->relispartition)
     845           96 :             ExecPartitionCheck(resultRelInfo, slot, estate, true);
     846              : 
     847              :         /* OK, store the tuple and create index entries for it */
     848        96319 :         simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
     849              : 
     850        96319 :         conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
     851              : 
     852        96319 :         if (resultRelInfo->ri_NumIndices > 0)
     853              :         {
     854              :             uint32      flags;
     855              : 
     856        80887 :             if (conflictindexes != NIL)
     857        80883 :                 flags = EIIT_NO_DUPE_ERROR;
     858              :             else
     859            4 :                 flags = 0;
     860        80887 :             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
     861              :                                                    estate, flags,
     862              :                                                    slot, conflictindexes,
     863              :                                                    &conflict);
     864              :         }
     865              : 
     866              :         /*
     867              :          * Checks the conflict indexes to fetch the conflicting local row and
     868              :          * reports the conflict. We perform this check here, instead of
     869              :          * performing an additional index scan before the actual insertion and
     870              :          * reporting the conflict if any conflicting rows are found. This is
     871              :          * to avoid the overhead of executing the extra scan for each INSERT
     872              :          * operation, even when no conflict arises, which could introduce
     873              :          * significant overhead to replication, particularly in cases where
     874              :          * conflicts are rare.
     875              :          *
     876              :          * XXX OTOH, this could lead to clean-up effort for dead tuples added
     877              :          * in heap and index in case of conflicts. But as conflicts shouldn't
     878              :          * be a frequent thing so we preferred to save the performance
     879              :          * overhead of extra scan before each insertion.
     880              :          */
     881        96319 :         if (conflict)
     882           47 :             CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
     883              :                                    recheckIndexes, NULL, slot);
     884              : 
     885              :         /* AFTER ROW INSERT Triggers */
     886        96274 :         ExecARInsertTriggers(estate, resultRelInfo, slot,
     887              :                              recheckIndexes, NULL);
     888              : 
     889              :         /*
     890              :          * XXX we should in theory pass a TransitionCaptureState object to the
     891              :          * above to capture transition tuples, but after statement triggers
     892              :          * don't actually get fired by replication yet anyway
     893              :          */
     894              : 
     895        96274 :         list_free(recheckIndexes);
     896              :     }
     897        96275 : }
     898              : 
     899              : /*
     900              :  * Find the searchslot tuple and update it with data in the slot,
     901              :  * update the indexes, and execute any constraints and per-row triggers.
     902              :  *
     903              :  * Caller is responsible for opening the indexes.
     904              :  */
     905              : void
     906        31930 : ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
     907              :                          EState *estate, EPQState *epqstate,
     908              :                          TupleTableSlot *searchslot, TupleTableSlot *slot)
     909              : {
     910        31930 :     bool        skip_tuple = false;
     911        31930 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     912        31930 :     ItemPointer tid = &(searchslot->tts_tid);
     913              : 
     914              :     /*
     915              :      * We support only non-system tables, with
     916              :      * check_publication_add_relation() accountable.
     917              :      */
     918              :     Assert(rel->rd_rel->relkind == RELKIND_RELATION);
     919              :     Assert(!IsCatalogRelation(rel));
     920              : 
     921        31930 :     CheckCmdReplicaIdentity(rel, CMD_UPDATE);
     922              : 
     923              :     /* BEFORE ROW UPDATE Triggers */
     924        31930 :     if (resultRelInfo->ri_TrigDesc &&
     925           10 :         resultRelInfo->ri_TrigDesc->trig_update_before_row)
     926              :     {
     927            3 :         if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
     928              :                                   tid, NULL, slot, NULL, NULL, false))
     929            2 :             skip_tuple = true;  /* "do nothing" */
     930              :     }
     931              : 
     932        31930 :     if (!skip_tuple)
     933              :     {
     934        31928 :         List       *recheckIndexes = NIL;
     935              :         TU_UpdateIndexes update_indexes;
     936              :         List       *conflictindexes;
     937        31928 :         bool        conflict = false;
     938              : 
     939              :         /* Compute stored generated columns */
     940        31928 :         if (rel->rd_att->constr &&
     941        31881 :             rel->rd_att->constr->has_generated_stored)
     942            2 :             ExecComputeStoredGenerated(resultRelInfo, estate, slot,
     943              :                                        CMD_UPDATE);
     944              : 
     945              :         /* Check the constraints of the tuple */
     946        31928 :         if (rel->rd_att->constr)
     947        31881 :             ExecConstraints(resultRelInfo, slot, estate);
     948        31928 :         if (rel->rd_rel->relispartition)
     949           12 :             ExecPartitionCheck(resultRelInfo, slot, estate, true);
     950              : 
     951        31928 :         simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
     952              :                                   &update_indexes);
     953              : 
     954        31928 :         conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
     955              : 
     956        31928 :         if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
     957              :         {
     958        20209 :             uint32      flags = EIIT_IS_UPDATE;
     959              : 
     960        20209 :             if (conflictindexes != NIL)
     961        20200 :                 flags |= EIIT_NO_DUPE_ERROR;
     962        20209 :             if (update_indexes == TU_Summarizing)
     963            0 :                 flags |= EIIT_ONLY_SUMMARIZING;
     964        20209 :             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
     965              :                                                    estate, flags,
     966              :                                                    slot, conflictindexes,
     967              :                                                    &conflict);
     968              :         }
     969              : 
     970              :         /*
     971              :          * Refer to the comments above the call to CheckAndReportConflict() in
     972              :          * ExecSimpleRelationInsert to understand why this check is done at
     973              :          * this point.
     974              :          */
     975        31928 :         if (conflict)
     976            3 :             CheckAndReportConflict(resultRelInfo, estate, CT_UPDATE_EXISTS,
     977              :                                    recheckIndexes, searchslot, slot);
     978              : 
     979              :         /* AFTER ROW UPDATE Triggers */
     980        31925 :         ExecARUpdateTriggers(estate, resultRelInfo,
     981              :                              NULL, NULL,
     982              :                              tid, NULL, slot,
     983              :                              recheckIndexes, NULL, false);
     984              : 
     985        31925 :         list_free(recheckIndexes);
     986              :     }
     987        31927 : }
     988              : 
     989              : /*
     990              :  * Find the searchslot tuple and delete it, and execute any constraints
     991              :  * and per-row triggers.
     992              :  *
     993              :  * Caller is responsible for opening the indexes.
     994              :  */
     995              : void
     996        40319 : ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
     997              :                          EState *estate, EPQState *epqstate,
     998              :                          TupleTableSlot *searchslot)
     999              : {
    1000        40319 :     bool        skip_tuple = false;
    1001        40319 :     Relation    rel = resultRelInfo->ri_RelationDesc;
    1002        40319 :     ItemPointer tid = &searchslot->tts_tid;
    1003              : 
    1004        40319 :     CheckCmdReplicaIdentity(rel, CMD_DELETE);
    1005              : 
    1006              :     /* BEFORE ROW DELETE Triggers */
    1007        40319 :     if (resultRelInfo->ri_TrigDesc &&
    1008           10 :         resultRelInfo->ri_TrigDesc->trig_delete_before_row)
    1009              :     {
    1010            0 :         skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
    1011            0 :                                            tid, NULL, NULL, NULL, NULL, false);
    1012              :     }
    1013              : 
    1014        40319 :     if (!skip_tuple)
    1015              :     {
    1016              :         /* OK, delete the tuple */
    1017        40319 :         simple_table_tuple_delete(rel, tid, estate->es_snapshot);
    1018              : 
    1019              :         /* AFTER ROW DELETE Triggers */
    1020        40319 :         ExecARDeleteTriggers(estate, resultRelInfo,
    1021              :                              tid, NULL, NULL, false);
    1022              :     }
    1023        40319 : }
    1024              : 
    1025              : /*
    1026              :  * Check if command can be executed with current replica identity.
    1027              :  */
    1028              : void
    1029       254095 : CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
    1030              : {
    1031              :     PublicationDesc pubdesc;
    1032              : 
    1033              :     /*
    1034              :      * Skip checking the replica identity for partitioned tables, because the
    1035              :      * operations are actually performed on the leaf partitions.
    1036              :      */
    1037       254095 :     if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
    1038       238297 :         return;
    1039              : 
    1040              :     /* We only need to do checks for UPDATE and DELETE. */
    1041       250004 :     if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
    1042       156838 :         return;
    1043              : 
    1044              :     /*
    1045              :      * It is only safe to execute UPDATE/DELETE if the relation does not
    1046              :      * publish UPDATEs or DELETEs, or all the following conditions are
    1047              :      * satisfied:
    1048              :      *
    1049              :      * 1. All columns, referenced in the row filters from publications which
    1050              :      * the relation is in, are valid - i.e. when all referenced columns are
    1051              :      * part of REPLICA IDENTITY.
    1052              :      *
    1053              :      * 2. All columns, referenced in the column lists are valid - i.e. when
    1054              :      * all columns referenced in the REPLICA IDENTITY are covered by the
    1055              :      * column list.
    1056              :      *
    1057              :      * 3. All generated columns in REPLICA IDENTITY of the relation, are valid
    1058              :      * - i.e. when all these generated columns are published.
    1059              :      *
    1060              :      * XXX We could optimize it by first checking whether any of the
    1061              :      * publications have a row filter or column list for this relation, or if
    1062              :      * the relation contains a generated column. If none of these exist and
    1063              :      * the relation has replica identity then we can avoid building the
    1064              :      * descriptor but as this happens only one time it doesn't seem worth the
    1065              :      * additional complexity.
    1066              :      */
    1067        93166 :     RelationBuildPublicationDesc(rel, &pubdesc);
    1068        93166 :     if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
    1069           40 :         ereport(ERROR,
    1070              :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1071              :                  errmsg("cannot update table \"%s\"",
    1072              :                         RelationGetRelationName(rel)),
    1073              :                  errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
    1074        93126 :     else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
    1075           72 :         ereport(ERROR,
    1076              :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1077              :                  errmsg("cannot update table \"%s\"",
    1078              :                         RelationGetRelationName(rel)),
    1079              :                  errdetail("Column list used by the publication does not cover the replica identity.")));
    1080        93054 :     else if (cmd == CMD_UPDATE && !pubdesc.gencols_valid_for_update)
    1081           16 :         ereport(ERROR,
    1082              :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1083              :                  errmsg("cannot update table \"%s\"",
    1084              :                         RelationGetRelationName(rel)),
    1085              :                  errdetail("Replica identity must not contain unpublished generated columns.")));
    1086        93038 :     else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
    1087            0 :         ereport(ERROR,
    1088              :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1089              :                  errmsg("cannot delete from table \"%s\"",
    1090              :                         RelationGetRelationName(rel)),
    1091              :                  errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
    1092        93038 :     else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
    1093            0 :         ereport(ERROR,
    1094              :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1095              :                  errmsg("cannot delete from table \"%s\"",
    1096              :                         RelationGetRelationName(rel)),
    1097              :                  errdetail("Column list used by the publication does not cover the replica identity.")));
    1098        93038 :     else if (cmd == CMD_DELETE && !pubdesc.gencols_valid_for_delete)
    1099            0 :         ereport(ERROR,
    1100              :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1101              :                  errmsg("cannot delete from table \"%s\"",
    1102              :                         RelationGetRelationName(rel)),
    1103              :                  errdetail("Replica identity must not contain unpublished generated columns.")));
    1104              : 
    1105              :     /* If relation has replica identity we are always good. */
    1106        93038 :     if (OidIsValid(RelationGetReplicaIndex(rel)))
    1107        77120 :         return;
    1108              : 
    1109              :     /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
    1110        15918 :     if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
    1111          248 :         return;
    1112              : 
    1113              :     /*
    1114              :      * This is UPDATE/DELETE and there is no replica identity.
    1115              :      *
    1116              :      * Check if the table publishes UPDATES or DELETES.
    1117              :      */
    1118        15670 :     if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
    1119           81 :         ereport(ERROR,
    1120              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1121              :                  errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
    1122              :                         RelationGetRelationName(rel)),
    1123              :                  errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
    1124        15589 :     else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
    1125           14 :         ereport(ERROR,
    1126              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1127              :                  errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
    1128              :                         RelationGetRelationName(rel)),
    1129              :                  errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
    1130              : }
    1131              : 
    1132              : 
    1133              : /*
    1134              :  * Check if we support writing into specific relkind of local relation and check
    1135              :  * if it aligns with the relkind of the relation on the publisher.
    1136              :  *
    1137              :  * The nspname and relname are only needed for error reporting.
    1138              :  */
    1139              : void
    1140         1049 : CheckSubscriptionRelkind(char localrelkind, char remoterelkind,
    1141              :                          const char *nspname, const char *relname)
    1142              : {
    1143         1049 :     if (localrelkind != RELKIND_RELATION &&
    1144           17 :         localrelkind != RELKIND_PARTITIONED_TABLE &&
    1145              :         localrelkind != RELKIND_SEQUENCE)
    1146            0 :         ereport(ERROR,
    1147              :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1148              :                  errmsg("cannot use relation \"%s.%s\" as logical replication target",
    1149              :                         nspname, relname),
    1150              :                  errdetail_relkind_not_supported(localrelkind)));
    1151              : 
    1152              :     /*
    1153              :      * Allow RELKIND_RELATION and RELKIND_PARTITIONED_TABLE to be treated
    1154              :      * interchangeably, but ensure that sequences (RELKIND_SEQUENCE) match
    1155              :      * exactly on both publisher and subscriber.
    1156              :      */
    1157         1049 :     if ((localrelkind == RELKIND_SEQUENCE && remoterelkind != RELKIND_SEQUENCE) ||
    1158         1032 :         (localrelkind != RELKIND_SEQUENCE && remoterelkind == RELKIND_SEQUENCE))
    1159            0 :         ereport(ERROR,
    1160              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1161              :         /* translator: 3rd and 4th %s are "sequence" or "table" */
    1162              :                 errmsg("relation \"%s.%s\" type mismatch: source \"%s\", target \"%s\"",
    1163              :                        nspname, relname,
    1164              :                        remoterelkind == RELKIND_SEQUENCE ? "sequence" : "table",
    1165              :                        localrelkind == RELKIND_SEQUENCE ? "sequence" : "table"));
    1166         1049 : }
        

Generated by: LCOV version 2.0-1