LCOV - code coverage report
Current view: top level - src/backend/executor - execReplication.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 270 331 81.6 %
Date: 2025-08-16 18:17:32 Functions: 15 16 93.8 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * execReplication.c
       4             :  *    miscellaneous executor routines for logical replication
       5             :  *
       6             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *    src/backend/executor/execReplication.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/commit_ts.h"
      18             : #include "access/genam.h"
      19             : #include "access/gist.h"
      20             : #include "access/relscan.h"
      21             : #include "access/tableam.h"
      22             : #include "access/transam.h"
      23             : #include "access/xact.h"
      24             : #include "access/heapam.h"
      25             : #include "catalog/pg_am_d.h"
      26             : #include "commands/trigger.h"
      27             : #include "executor/executor.h"
      28             : #include "executor/nodeModifyTable.h"
      29             : #include "replication/conflict.h"
      30             : #include "replication/logicalrelation.h"
      31             : #include "storage/lmgr.h"
      32             : #include "utils/builtins.h"
      33             : #include "utils/lsyscache.h"
      34             : #include "utils/rel.h"
      35             : #include "utils/snapmgr.h"
      36             : #include "utils/syscache.h"
      37             : #include "utils/typcache.h"
      38             : 
      39             : 
      40             : static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
      41             :                          TypeCacheEntry **eq, Bitmapset *columns);
      42             : 
      43             : /*
      44             :  * Set up the ScanKey array 'skey' for an index scan on 'idxrel', taking the
      45             :  * search values from 'searchslot', which is laid out like 'rel' (*NOT* idxrel!).
      46             :  *
      47             :  * Returns how many columns to use for the index scan, i.e. the number of
      48             :  * 'skey' entries filled in.  Index columns without a valid table attribute
      49             :  * number are skipped.  'rel' itself is not referenced in the body.
      50             :  *
      51             :  * This is not a generic routine: idxrel must be PK, RI, or an index that can be
      52             :  * used for a REPLICA IDENTITY FULL table. See FindUsableIndexForReplicaIdentityFull() for details.
      53             :  * By definition, replication identity of a rel meets all limitations associated
      54             :  * with that. Note that any other index could also meet these limitations.
      55             :  */
      56             : static int
      57      144210 : build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
      58             :                          TupleTableSlot *searchslot)
      59             : {
      60             :     int         index_attoff;
      61      144210 :     int         skey_attoff = 0;    /* next free entry in skey[] */
      62             :     Datum       indclassDatum;
      63             :     oidvector  *opclass;
      64      144210 :     int2vector *indkey = &idxrel->rd_index->indkey;
      65             : 
      66      144210 :     indclassDatum = SysCacheGetAttrNotNull(INDEXRELID, idxrel->rd_indextuple,
      67             :                                            Anum_pg_index_indclass);
      68      144210 :     opclass = (oidvector *) DatumGetPointer(indclassDatum);
      69             : 
      70             :     /* Build scankey for every non-expression attribute in the index. */
      71      288466 :     for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
      72      144256 :          index_attoff++)
      73             :     {
      74             :         Oid         operator;
      75             :         Oid         optype;
      76             :         Oid         opfamily;
      77             :         RegProcedure regop;
      78      144256 :         int         table_attno = indkey->values[index_attoff];   /* table column behind this index column */
      79             :         StrategyNumber eq_strategy;
      80             : 
      81      144256 :         if (!AttributeNumberIsValid(table_attno))
      82             :         {
      83             :             /*
      84             :              * XXX: Currently, we don't support expressions in the scan key,
      85             :              * see code below.
      86             :              */
      87           4 :             continue;
      88             :         }
      89             : 
      90             :         /*
      91             :          * Load the operator info.  We need this to get the equality operator
      92             :          * function for the scan key.
      93             :          */
      94      144252 :         optype = get_opclass_input_type(opclass->values[index_attoff]);
      95      144252 :         opfamily = get_opclass_family(opclass->values[index_attoff]);
      96      144252 :         eq_strategy = IndexAmTranslateCompareType(COMPARE_EQ, idxrel->rd_rel->relam, opfamily, false);
      97      144252 :         operator = get_opfamily_member(opfamily, optype,
      98             :                                        optype,
      99             :                                        eq_strategy);
     100             : 
     101      144252 :         if (!OidIsValid(operator))
     102           0 :             elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
     103             :                  eq_strategy, optype, optype, opfamily);
     104             : 
     105      144252 :         regop = get_opcode(operator);
     106             : 
     107             :         /* Initialize the scankey. */
     108      144252 :         ScanKeyInit(&skey[skey_attoff],
     109      144252 :                     index_attoff + 1,   /* index column number, counted from 1 */
     110             :                     eq_strategy,
     111             :                     regop,
     112      144252 :                     searchslot->tts_values[table_attno - 1]);   /* slot arrays are indexed from 0 */
     113             : 
     114      144252 :         skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
     115             : 
     116             :         /* Check for null value. */
     117      144252 :         if (searchslot->tts_isnull[table_attno - 1])
     118           2 :             skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
     119             : 
     120      144252 :         skey_attoff++;
     121             :     }
     122             : 
     123             :     /* There must always be at least one attribute for the index scan. */
     124             :     Assert(skey_attoff > 0);
     125             : 
     126      144210 :     return skey_attoff;
     127             : }
     128             : 
     129             : 
     130             : /*
     131             :  * Helper function to check if it is necessary to re-fetch and lock the tuple
     132             :  * due to concurrent modifications. This function should be called after
     133             :  * invoking table_tuple_lock.  Returns false for TM_Ok, true (after a LOG message) for TM_Updated and TM_Deleted; any other result raises an ERROR.
     134             :  */
     135             : static bool
     136      144528 : should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
     137             : {
     138      144528 :     bool        refetch = false;
     139             : 
     140      144528 :     switch (res)
     141             :     {
     142      144528 :         case TM_Ok:
     143      144528 :             break;
     144           0 :         case TM_Updated:
     145             :             /* XXX: Improve handling here */
     146           0 :             if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))   /* tmfd is only consulted on this path */
     147           0 :                 ereport(LOG,
     148             :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     149             :                          errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
     150             :             else
     151           0 :                 ereport(LOG,
     152             :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     153             :                          errmsg("concurrent update, retrying")));
     154           0 :             refetch = true;     /* callers in this file jump back to their retry label */
     155           0 :             break;
     156           0 :         case TM_Deleted:
     157             :             /* XXX: Improve handling here */
     158           0 :             ereport(LOG,
     159             :                     (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     160             :                      errmsg("concurrent delete, retrying")));
     161           0 :             refetch = true;
     162           0 :             break;
     163           0 :         case TM_Invisible:
     164           0 :             elog(ERROR, "attempted to lock invisible tuple");
     165             :             break;
     166           0 :         default:
     167           0 :             elog(ERROR, "unexpected table_tuple_lock status: %u", res);
     168             :             break;
     169             :     }
     170             : 
     171      144528 :     return refetch;
     172             : }
     173             : 
     174             : /*
     175             :  * Search the relation 'rel', via the index 'idxoid', for a tuple matching 'searchslot'.
     176             :  * If a matching tuple is found, lock it with lockmode, fill 'outslot' with its
     177             :  * contents, and return true.  Return false otherwise.  The scan is restarted
     178             :  * after waiting on another transaction or when the lock reports a concurrent change.
     179             :  */
     180             : bool
     181      144208 : RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
     182             :                              LockTupleMode lockmode,
     183             :                              TupleTableSlot *searchslot,
     184             :                              TupleTableSlot *outslot)
     185             : {
     186             :     ScanKeyData skey[INDEX_MAX_KEYS];
     187             :     int         skey_attoff;
     188             :     IndexScanDesc scan;
     189             :     SnapshotData snap;
     190             :     TransactionId xwait;
     191             :     Relation    idxrel;
     192             :     bool        found;
     193      144208 :     TypeCacheEntry **eq = NULL;     /* allocated on first use in the loop below */
     194             :     bool        isIdxSafeToSkipDuplicates;
     195             : 
     196             :     /* Open the index. */
     197      144208 :     idxrel = index_open(idxoid, RowExclusiveLock);
     198             : 
     199      144208 :     isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
     200             : 
     201      144208 :     InitDirtySnapshot(snap);    /* snap.xmin/xmax are inspected below to find a transaction to wait for */
     202             : 
     203             :     /* Build scan key. */
     204      144208 :     skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
     205             : 
     206             :     /* Start an index scan. */
     207      144208 :     scan = index_beginscan(rel, idxrel, &snap, NULL, skey_attoff, 0);
     208             : 
     209           0 : retry:      /* re-entered via goto after a wait or a failed lock attempt */
     210      144208 :     found = false;
     211             : 
     212      144208 :     index_rescan(scan, skey, skey_attoff, NULL, 0);
     213             : 
     214             :     /* Try to find the tuple */
     215      144208 :     while (index_getnext_slot(scan, ForwardScanDirection, outslot))
     216             :     {
     217             :         /*
     218             :          * Avoid expensive equality check if the index is primary key or
     219             :          * replica identity index.
     220             :          */
     221      144176 :         if (!isIdxSafeToSkipDuplicates)
     222             :         {
     223          34 :             if (eq == NULL)
     224          34 :                 eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
     225             : 
     226          34 :             if (!tuples_equal(outslot, searchslot, eq, NULL))
     227           0 :                 continue;   /* index entry matched but the tuples differ; keep scanning */
     228             :         }
     229             : 
     230      144176 :         ExecMaterializeSlot(outslot);
     231             : 
     232      288352 :         xwait = TransactionIdIsValid(snap.xmin) ?
     233      144176 :             snap.xmin : snap.xmax;
     234             : 
     235             :         /*
     236             :          * If the tuple is locked, wait for locking transaction to finish and
     237             :          * retry.
     238             :          */
     239      144176 :         if (TransactionIdIsValid(xwait))
     240             :         {
     241           0 :             XactLockTableWait(xwait, NULL, NULL, XLTW_None);
     242           0 :             goto retry;
     243             :         }
     244             : 
     245             :         /* Found our tuple and it's not locked */
     246      144176 :         found = true;
     247      144176 :         break;
     248             :     }
     249             : 
     250             :     /* Found tuple, try to lock it in the lockmode. */
     251      144208 :     if (found)
     252             :     {
     253             :         TM_FailureData tmfd;
     254             :         TM_Result   res;
     255             : 
     256      144176 :         PushActiveSnapshot(GetLatestSnapshot());
     257             : 
     258      144176 :         res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
     259             :                                outslot,
     260             :                                GetCurrentCommandId(false),
     261             :                                lockmode,
     262             :                                LockWaitBlock,
     263             :                                0 /* don't follow updates */ ,
     264             :                                &tmfd);
     265             : 
     266      144176 :         PopActiveSnapshot();
     267             : 
     268      144176 :         if (should_refetch_tuple(res, &tmfd))
     269           0 :             goto retry;
     270             :     }
     271             : 
     272      144208 :     index_endscan(scan);
     273             : 
     274             :     /* Don't release lock until commit. */
     275      144208 :     index_close(idxrel, NoLock);
     276             : 
     277      144208 :     return found;
     278             : }
     279             : 
     280             : /*
     281             :  * Compare the tuples in the slots by checking if they have equal values.
     282             :  * Returns true only if every compared attribute is equal.  'eq' is a per-attribute cache of type cache entries that is filled in lazily here.
     283             :  * If 'columns' is not null, only the columns specified within it will be
     284             :  * considered for the equality check, ignoring all other columns.
     285             :  */
     286             : static bool
     287      210646 : tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
     288             :              TypeCacheEntry **eq, Bitmapset *columns)
     289             : {
     290             :     int         attrnum;
     291             : 
     292             :     Assert(slot1->tts_tupleDescriptor->natts ==
     293             :            slot2->tts_tupleDescriptor->natts);
     294             : 
     295      210646 :     slot_getallattrs(slot1);
     296      210646 :     slot_getallattrs(slot2);
     297             : 
     298             :     /* Check equality of the attributes. */
     299      211046 :     for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
     300             :     {
     301             :         Form_pg_attribute att;
     302             :         TypeCacheEntry *typentry;
     303             : 
     304      210718 :         att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
     305             : 
     306             :         /*
     307             :          * Ignore dropped and generated columns as the publisher doesn't send
     308             :          * those
     309             :          */
     310      210718 :         if (att->attisdropped || att->attgenerated)
     311           2 :             continue;
     312             : 
     313             :         /*
     314             :          * Ignore columns that are not listed for checking.  Members of 'columns' are attribute numbers offset by FirstLowInvalidHeapAttributeNumber.
     315             :          */
     316      210716 :         if (columns &&
     317           0 :             !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
     318             :                            columns))
     319           0 :             continue;
     320             : 
     321             :         /*
     322             :          * If one value is NULL and other is not, then they are certainly not
     323             :          * equal
     324             :          */
     325      210716 :         if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
     326           0 :             return false;
     327             : 
     328             :         /*
     329             :          * The null flags are equal at this point, so either one being set means both are NULL; they can be considered equal.
     330             :          */
     331      210716 :         if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
     332           2 :             continue;
     333             : 
     334      210714 :         typentry = eq[attrnum];
     335      210714 :         if (typentry == NULL)   /* first comparison of this attribute: look up and cache */
     336             :         {
     337         400 :             typentry = lookup_type_cache(att->atttypid,
     338             :                                          TYPECACHE_EQ_OPR_FINFO);
     339         400 :             if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
     340           0 :                 ereport(ERROR,
     341             :                         (errcode(ERRCODE_UNDEFINED_FUNCTION),
     342             :                          errmsg("could not identify an equality operator for type %s",
     343             :                                 format_type_be(att->atttypid))));
     344         400 :             eq[attrnum] = typentry;
     345             :         }
     346             : 
     347      210714 :         if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
     348             :                                             att->attcollation,
     349      210714 :                                             slot1->tts_values[attrnum],
     350      210714 :                                             slot2->tts_values[attrnum])))
     351      210318 :             return false;
     352             :     }
     353             : 
     354         328 :     return true;
     355             : }
     356             : 
     357             : /*
     358             :  * Search the relation 'rel' for a tuple equal to 'searchslot' using a sequential scan.
     359             :  *
     360             :  * If a matching tuple is found, lock it with lockmode, fill 'outslot' with its
     361             :  * contents, and return true.  Return false otherwise.
     362             :  *
     363             :  * Note that this stops on the first matching tuple.
     364             :  *
     365             :  * This can obviously be quite slow on tables that have more than a few rows.
     366             :  */
     367             : bool
     368         298 : RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
     369             :                          TupleTableSlot *searchslot, TupleTableSlot *outslot)
     370             : {
     371             :     TupleTableSlot *scanslot;
     372             :     TableScanDesc scan;
     373             :     SnapshotData snap;
     374             :     TypeCacheEntry **eq;
     375             :     TransactionId xwait;
     376             :     bool        found;
     377         298 :     TupleDesc   desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
     378             : 
     379             :     Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
     380             : 
     381         298 :     eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);    /* per-attribute cache passed to tuples_equal() */
     382             : 
     383             :     /* Start a heap scan. */
     384         298 :     InitDirtySnapshot(snap);
     385         298 :     scan = table_beginscan(rel, &snap, 0, NULL);
     386         298 :     scanslot = table_slot_create(rel, NULL);
     387             : 
     388           0 : retry:      /* re-entered via goto after a wait or a failed lock attempt */
     389         298 :     found = false;
     390             : 
     391         298 :     table_rescan(scan, NULL);
     392             : 
     393             :     /* Try to find the tuple */
     394      210616 :     while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
     395             :     {
     396      210612 :         if (!tuples_equal(scanslot, searchslot, eq, NULL))
     397      210318 :             continue;
     398             : 
     399         294 :         found = true;   /* provisional: reset at the retry label if we have to wait */
     400         294 :         ExecCopySlot(outslot, scanslot);
     401             : 
     402         588 :         xwait = TransactionIdIsValid(snap.xmin) ?
     403         294 :             snap.xmin : snap.xmax;
     404             : 
     405             :         /*
     406             :          * If the tuple is locked, wait for locking transaction to finish and
     407             :          * retry.
     408             :          */
     409         294 :         if (TransactionIdIsValid(xwait))
     410             :         {
     411           0 :             XactLockTableWait(xwait, NULL, NULL, XLTW_None);
     412           0 :             goto retry;
     413             :         }
     414             : 
     415             :         /* Found our tuple and it's not locked */
     416         294 :         break;
     417             :     }
     418             : 
     419             :     /* Found tuple, try to lock it in the lockmode. */
     420         298 :     if (found)
     421             :     {
     422             :         TM_FailureData tmfd;
     423             :         TM_Result   res;
     424             : 
     425         294 :         PushActiveSnapshot(GetLatestSnapshot());
     426             : 
     427         294 :         res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
     428             :                                outslot,
     429             :                                GetCurrentCommandId(false),
     430             :                                lockmode,
     431             :                                LockWaitBlock,
     432             :                                0 /* don't follow updates */ ,
     433             :                                &tmfd);
     434             : 
     435         294 :         PopActiveSnapshot();
     436             : 
     437         294 :         if (should_refetch_tuple(res, &tmfd))
     438           0 :             goto retry;
     439             :     }
     440             : 
     441         298 :     table_endscan(scan);
     442         298 :     ExecDropSingleTupleTableSlot(scanslot);
     443             : 
     444         298 :     return found;
     445             : }
     446             : 
     447             : /*
     448             :  * Build additional index information necessary for conflict detection, for each index of 'resultRelInfo' whose OID is 'conflictindex'.
     449             :  */
     450             : static void
     451          62 : BuildConflictIndexInfo(ResultRelInfo *resultRelInfo, Oid conflictindex)
     452             : {
     453         188 :     for (int i = 0; i < resultRelInfo->ri_NumIndices; i++)
     454             :     {
     455         126 :         Relation    indexRelation = resultRelInfo->ri_IndexRelationDescs[i];
     456         126 :         IndexInfo  *indexRelationInfo = resultRelInfo->ri_IndexRelationInfo[i];
     457             : 
     458         126 :         if (conflictindex != RelationGetRelid(indexRelation))
     459          64 :             continue;   /* not the index we were asked about */
     460             : 
     461             :         /*
     462             :          * This Assert will fail if BuildSpeculativeIndexInfo() is called
     463             :          * twice for the given index.
     464             :          */
     465             :         Assert(indexRelationInfo->ii_UniqueOps == NULL);
     466             : 
     467          62 :         BuildSpeculativeIndexInfo(indexRelation, indexRelationInfo);
     468             :     }
     469          62 : }
     470             : 
     471             : /*
     472             :  * If the tuple in 'scanslot' is recently dead (judged against 'oldestxmin') and was
     473             :  * deleted by a transaction with a newer commit timestamp than '*delete_time', store that
     474             :  * transaction's ID, commit time, and origin in the output arguments; otherwise leave them untouched.
     475             :  * This helps ensure that conflict detection uses the most recent and relevant deletion metadata.
     476             :  */
     477             : static void
     478           2 : update_most_recent_deletion_info(TupleTableSlot *scanslot,
     479             :                                  TransactionId oldestxmin,
     480             :                                  TransactionId *delete_xid,
     481             :                                  TimestampTz *delete_time,
     482             :                                  RepOriginId *delete_origin)
     483             : {
     484             :     BufferHeapTupleTableSlot *hslot;
     485             :     HeapTuple   tuple;
     486             :     Buffer      buf;
     487           2 :     bool        recently_dead = false;
     488             :     TransactionId xmax;
     489             :     TimestampTz localts;
     490             :     RepOriginId localorigin;
     491             : 
     492           2 :     hslot = (BufferHeapTupleTableSlot *) scanslot;  /* NOTE(review): assumes a buffer heap slot -- confirm with callers */
     493             : 
     494           2 :     tuple = ExecFetchSlotHeapTuple(scanslot, false, NULL);
     495           2 :     buf = hslot->buffer;
     496             : 
     497           2 :     LockBuffer(buf, BUFFER_LOCK_SHARE);     /* share lock is held only around the visibility check */
     498             : 
     499             :     /*
     500             :      * We do not consider HEAPTUPLE_DEAD status because it indicates either
     501             :      * tuples whose inserting transaction was aborted (meaning there is no
     502             :      * commit timestamp or origin), or tuples deleted by a transaction older
     503             :      * than oldestxmin, making it safe to ignore them during conflict
     504             :      * detection (See comments atop worker.c for details).
     505             :      */
     506           2 :     if (HeapTupleSatisfiesVacuum(tuple, oldestxmin, buf) == HEAPTUPLE_RECENTLY_DEAD)
     507           2 :         recently_dead = true;
     508             : 
     509           2 :     LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     510             : 
     511           2 :     if (!recently_dead)
     512           0 :         return;
     513             : 
     514           2 :     xmax = HeapTupleHeaderGetUpdateXid(tuple->t_data);
     515           2 :     if (!TransactionIdIsValid(xmax))
     516           0 :         return;
     517             : 
     518             :     /* Select the dead tuple with the most recent commit timestamp */
     519           4 :     if (TransactionIdGetCommitTsData(xmax, &localts, &localorigin) &&
     520           2 :         TimestampDifferenceExceeds(*delete_time, localts, 0))
     521             :     {
     522           2 :         *delete_xid = xmax;
     523           2 :         *delete_time = localts;
     524           2 :         *delete_origin = localorigin;
     525             :     }
     526             : }
     527             : 
     528             : /*
     529             :  * Searches the relation 'rel' for the most recently deleted tuple that matches
     530             :  * the values in 'searchslot' and is not yet removable by VACUUM. The function
     531             :  * returns the transaction ID, origin, and commit timestamp of the transaction
     532             :  * that deleted this tuple.
     533             :  *
     534             :  * 'oldestxmin' acts as a cutoff transaction ID. Tuples deleted by transactions
     535             :  * with IDs >= 'oldestxmin' are considered recently dead and are eligible for
     536             :  * conflict detection.
     537             :  *
     538             :  * Instead of stopping at the first match, we scan all matching dead tuples to
     539             :  * identify the most recent deletion. This is crucial because only the latest
     540             :  * deletion is relevant for resolving conflicts.
     541             :  *
     542             :  * For example, consider a scenario on the subscriber where a row is deleted,
     543             :  * re-inserted, and then deleted again only on the subscriber:
     544             :  *
     545             :  *   - (pk, 1) - deleted at 9:00,
     546             :  *   - (pk, 1) - deleted at 9:02,
     547             :  *
     548             :  * Now, a remote update arrives: (pk, 1) -> (pk, 2), timestamped at 9:01.
     549             :  *
     550             :  * If we mistakenly return the older deletion (9:00), the system may wrongly
     551             :  * apply the remote update using a last-update-wins strategy. Instead, we must
     552             :  * recognize the more recent deletion at 9:02 and skip the update. See
     553             :  * comments atop worker.c for details. Note, as of now, conflict resolution
     554             :  * is not implemented. Consequently, the system may incorrectly report the
     555             :  * older tuple as the conflicted one, leading to misleading results.
     556             :  *
     557             :  * The commit timestamp of the deleting transaction is used to determine which
     558             :  * tuple was deleted most recently.
     559             :  */
     560             : bool
     561           0 : RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot,
     562             :                                 TransactionId oldestxmin,
     563             :                                 TransactionId *delete_xid,
     564             :                                 RepOriginId *delete_origin,
     565             :                                 TimestampTz *delete_time)
     566             : {
     567             :     TupleTableSlot *scanslot;
     568             :     TableScanDesc scan;
     569             :     TypeCacheEntry **eq;
     570             :     Bitmapset  *indexbitmap;
     571           0 :     TupleDesc   desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
     572             : 
     573             :     Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
     574             : 
     575           0 :     *delete_xid = InvalidTransactionId;
     576           0 :     *delete_origin = InvalidRepOriginId;
     577           0 :     *delete_time = 0;
     578             : 
     579             :     /*
     580             :      * If the relation has a replica identity key or a primary key that is
     581             :      * unusable for locating deleted tuples (see
     582             :      * IsIndexUsableForFindingDeletedTuple), a full table scan becomes
     583             :      * necessary. In such cases, comparing the entire tuple is not required,
     584             :      * since the remote tuple might not include all column values. Instead,
     585             :      * the indexed columns alone are sufficient to identify the target tuple
     586             :      * (see logicalrep_rel_mark_updatable).
     587             :      */
     588           0 :     indexbitmap = RelationGetIndexAttrBitmap(rel,
     589             :                                              INDEX_ATTR_BITMAP_IDENTITY_KEY);
     590             : 
     591             :     /* fallback to PK if no replica identity */
     592           0 :     if (!indexbitmap)
     593           0 :         indexbitmap = RelationGetIndexAttrBitmap(rel,
     594             :                                                  INDEX_ATTR_BITMAP_PRIMARY_KEY);
     595             : 
     596           0 :     eq = palloc0(sizeof(*eq) * searchslot->tts_tupleDescriptor->natts);
     597             : 
     598             :     /*
     599             :      * Start a heap scan using SnapshotAny to identify dead tuples that are
     600             :      * not visible under a standard MVCC snapshot. Tuples from transactions
     601             :      * not yet committed or those just committed prior to the scan are
     602             :      * excluded in update_most_recent_deletion_info().
     603             :      */
     604           0 :     scan = table_beginscan(rel, SnapshotAny, 0, NULL);
     605           0 :     scanslot = table_slot_create(rel, NULL);
     606             : 
     607           0 :     table_rescan(scan, NULL);
     608             : 
     609             :     /* Try to find the tuple */
     610           0 :     while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
     611             :     {
     612           0 :         if (!tuples_equal(scanslot, searchslot, eq, indexbitmap))
     613           0 :             continue;
     614             : 
     615           0 :         update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid,
     616             :                                          delete_time, delete_origin);
     617             :     }
     618             : 
     619           0 :     table_endscan(scan);
     620           0 :     ExecDropSingleTupleTableSlot(scanslot);
     621             : 
     622           0 :     return *delete_time != 0;
     623             : }
     624             : 
     625             : /*
     626             :  * Similar to RelationFindDeletedTupleInfoSeq() but using index scan to locate
     627             :  * the deleted tuple.
     628             :  */
     629             : bool
     630           2 : RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid,
     631             :                                     TupleTableSlot *searchslot,
     632             :                                     TransactionId oldestxmin,
     633             :                                     TransactionId *delete_xid,
     634             :                                     RepOriginId *delete_origin,
     635             :                                     TimestampTz *delete_time)
     636             : {
     637             :     Relation    idxrel;
     638             :     ScanKeyData skey[INDEX_MAX_KEYS];
     639             :     int         skey_attoff;
     640             :     IndexScanDesc scan;
     641             :     TupleTableSlot *scanslot;
     642           2 :     TypeCacheEntry **eq = NULL;
     643             :     bool        isIdxSafeToSkipDuplicates;
     644           2 :     TupleDesc   desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
     645             : 
     646             :     Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
     647             :     Assert(OidIsValid(idxoid));
     648             : 
     649           2 :     *delete_xid = InvalidTransactionId;
     650           2 :     *delete_time = 0;
     651           2 :     *delete_origin = InvalidRepOriginId;
     652             : 
     653           2 :     isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
     654             : 
     655           2 :     scanslot = table_slot_create(rel, NULL);
     656             : 
     657           2 :     idxrel = index_open(idxoid, RowExclusiveLock);
     658             : 
     659             :     /* Build scan key. */
     660           2 :     skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
     661             : 
     662             :     /*
     663             :      * Start an index scan using SnapshotAny to identify dead tuples that are
     664             :      * not visible under a standard MVCC snapshot. Tuples from transactions
     665             :      * not yet committed or those just committed prior to the scan are
     666             :      * excluded in update_most_recent_deletion_info().
     667             :      */
     668           2 :     scan = index_beginscan(rel, idxrel, SnapshotAny, NULL, skey_attoff, 0);
     669             : 
     670           2 :     index_rescan(scan, skey, skey_attoff, NULL, 0);
     671             : 
     672             :     /* Try to find the tuple */
     673           4 :     while (index_getnext_slot(scan, ForwardScanDirection, scanslot))
     674             :     {
     675             :         /*
     676             :          * Avoid expensive equality check if the index is primary key or
     677             :          * replica identity index.
     678             :          */
     679           2 :         if (!isIdxSafeToSkipDuplicates)
     680             :         {
     681           0 :             if (eq == NULL)
     682           0 :                 eq = palloc0(sizeof(*eq) * scanslot->tts_tupleDescriptor->natts);
     683             : 
     684           0 :             if (!tuples_equal(scanslot, searchslot, eq, NULL))
     685           0 :                 continue;
     686             :         }
     687             : 
     688           2 :         update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid,
     689             :                                          delete_time, delete_origin);
     690             :     }
     691             : 
     692           2 :     index_endscan(scan);
     693             : 
     694           2 :     index_close(idxrel, NoLock);
     695             : 
     696           2 :     ExecDropSingleTupleTableSlot(scanslot);
     697             : 
     698           2 :     return *delete_time != 0;
     699             : }
     700             : 
     701             : /*
     702             :  * Find the tuple that violates the passed unique index (conflictindex).
     703             :  *
     704             :  * If the conflicting tuple is found return true, otherwise false.
     705             :  *
     706             :  * We lock the tuple to avoid getting it deleted before the caller can fetch
     707             :  * the required information. Note that if the tuple is deleted before a lock
     708             :  * is acquired, we will retry to find the conflicting tuple again.
     709             :  */
     710             : static bool
     711          62 : FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate,
     712             :                   Oid conflictindex, TupleTableSlot *slot,
     713             :                   TupleTableSlot **conflictslot)
     714             : {
     715          62 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     716             :     ItemPointerData conflictTid;
     717             :     TM_FailureData tmfd;
     718             :     TM_Result   res;
     719             : 
     720          62 :     *conflictslot = NULL;
     721             : 
     722             :     /*
     723             :      * Build additional information required to check constraints violations.
     724             :      * See check_exclusion_or_unique_constraint().
     725             :      */
     726          62 :     BuildConflictIndexInfo(resultRelInfo, conflictindex);
     727             : 
     728          62 : retry:
     729          62 :     if (ExecCheckIndexConstraints(resultRelInfo, slot, estate,
     730             :                                   &conflictTid, &slot->tts_tid,
     731          62 :                                   list_make1_oid(conflictindex)))
     732             :     {
     733           2 :         if (*conflictslot)
     734           0 :             ExecDropSingleTupleTableSlot(*conflictslot);
     735             : 
     736           2 :         *conflictslot = NULL;
     737           2 :         return false;
     738             :     }
     739             : 
     740          58 :     *conflictslot = table_slot_create(rel, NULL);
     741             : 
     742          58 :     PushActiveSnapshot(GetLatestSnapshot());
     743             : 
     744          58 :     res = table_tuple_lock(rel, &conflictTid, GetActiveSnapshot(),
     745             :                            *conflictslot,
     746             :                            GetCurrentCommandId(false),
     747             :                            LockTupleShare,
     748             :                            LockWaitBlock,
     749             :                            0 /* don't follow updates */ ,
     750             :                            &tmfd);
     751             : 
     752          58 :     PopActiveSnapshot();
     753             : 
     754          58 :     if (should_refetch_tuple(res, &tmfd))
     755           0 :         goto retry;
     756             : 
     757          58 :     return true;
     758             : }
     759             : 
     760             : /*
     761             :  * Check all the unique indexes in 'recheckIndexes' for conflict with the
     762             :  * tuple in 'remoteslot' and report if found.
     763             :  */
     764             : static void
     765          36 : CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
     766             :                        ConflictType type, List *recheckIndexes,
     767             :                        TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
     768             : {
     769          36 :     List       *conflicttuples = NIL;
     770             :     TupleTableSlot *conflictslot;
     771             : 
     772             :     /* Check all the unique indexes for conflicts */
     773         130 :     foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
     774             :     {
     775         122 :         if (list_member_oid(recheckIndexes, uniqueidx) &&
     776          62 :             FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
     777             :                               &conflictslot))
     778             :         {
     779          58 :             ConflictTupleInfo *conflicttuple = palloc0_object(ConflictTupleInfo);
     780             : 
     781          58 :             conflicttuple->slot = conflictslot;
     782          58 :             conflicttuple->indexoid = uniqueidx;
     783             : 
     784          58 :             GetTupleTransactionInfo(conflictslot, &conflicttuple->xmin,
     785             :                                     &conflicttuple->origin, &conflicttuple->ts);
     786             : 
     787          58 :             conflicttuples = lappend(conflicttuples, conflicttuple);
     788             :         }
     789             :     }
     790             : 
     791             :     /* Report the conflict, if found */
     792          34 :     if (conflicttuples)
     793          32 :         ReportApplyConflict(estate, resultRelInfo, ERROR,
     794          32 :                             list_length(conflicttuples) > 1 ? CT_MULTIPLE_UNIQUE_CONFLICTS : type,
     795             :                             searchslot, remoteslot, conflicttuples);
     796           2 : }
     797             : 
     798             : /*
     799             :  * Insert tuple represented in the slot to the relation, update the indexes,
     800             :  * and execute any constraints and per-row triggers.
     801             :  *
     802             :  * Caller is responsible for opening the indexes.
     803             :  */
     804             : void
     805      152542 : ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
     806             :                          EState *estate, TupleTableSlot *slot)
     807             : {
     808      152542 :     bool        skip_tuple = false;
     809      152542 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     810             : 
     811             :     /* For now we support only tables. */
     812             :     Assert(rel->rd_rel->relkind == RELKIND_RELATION);
     813             : 
     814      152542 :     CheckCmdReplicaIdentity(rel, CMD_INSERT);
     815             : 
     816             :     /* BEFORE ROW INSERT Triggers */
     817      152542 :     if (resultRelInfo->ri_TrigDesc &&
     818          40 :         resultRelInfo->ri_TrigDesc->trig_insert_before_row)
     819             :     {
     820           6 :         if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
     821           2 :             skip_tuple = true;  /* "do nothing" */
     822             :     }
     823             : 
     824      152542 :     if (!skip_tuple)
     825             :     {
     826      152540 :         List       *recheckIndexes = NIL;
     827             :         List       *conflictindexes;
     828      152540 :         bool        conflict = false;
     829             : 
     830             :         /* Compute stored generated columns */
     831      152540 :         if (rel->rd_att->constr &&
     832       90948 :             rel->rd_att->constr->has_generated_stored)
     833           8 :             ExecComputeStoredGenerated(resultRelInfo, estate, slot,
     834             :                                        CMD_INSERT);
     835             : 
     836             :         /* Check the constraints of the tuple */
     837      152540 :         if (rel->rd_att->constr)
     838       90948 :             ExecConstraints(resultRelInfo, slot, estate);
     839      152540 :         if (rel->rd_rel->relispartition)
     840         134 :             ExecPartitionCheck(resultRelInfo, slot, estate, true);
     841             : 
     842             :         /* OK, store the tuple and create index entries for it */
     843      152540 :         simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
     844             : 
     845      152540 :         conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
     846             : 
     847      152540 :         if (resultRelInfo->ri_NumIndices > 0)
     848      111864 :             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
     849             :                                                    slot, estate, false,
     850             :                                                    conflictindexes ? true : false,
     851             :                                                    &conflict,
     852             :                                                    conflictindexes, false);
     853             : 
     854             :         /*
     855             :          * Checks the conflict indexes to fetch the conflicting local tuple
     856             :          * and reports the conflict. We perform this check here, instead of
     857             :          * performing an additional index scan before the actual insertion and
     858             :          * reporting the conflict if any conflicting tuples are found. This is
     859             :          * to avoid the overhead of executing the extra scan for each INSERT
     860             :          * operation, even when no conflict arises, which could introduce
     861             :          * significant overhead to replication, particularly in cases where
     862             :          * conflicts are rare.
     863             :          *
     864             :          * XXX OTOH, this could lead to clean-up effort for dead tuples added
     865             :          * in heap and index in case of conflicts. But as conflicts shouldn't
     866             :          * be a frequent thing so we preferred to save the performance
     867             :          * overhead of extra scan before each insertion.
     868             :          */
     869      152540 :         if (conflict)
     870          32 :             CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
     871             :                                    recheckIndexes, NULL, slot);
     872             : 
     873             :         /* AFTER ROW INSERT Triggers */
     874      152510 :         ExecARInsertTriggers(estate, resultRelInfo, slot,
     875             :                              recheckIndexes, NULL);
     876             : 
     877             :         /*
     878             :          * XXX we should in theory pass a TransitionCaptureState object to the
     879             :          * above to capture transition tuples, but after statement triggers
     880             :          * don't actually get fired by replication yet anyway
     881             :          */
     882             : 
     883      152510 :         list_free(recheckIndexes);
     884             :     }
     885      152512 : }
     886             : 
     887             : /*
     888             :  * Find the searchslot tuple and update it with data in the slot,
     889             :  * update the indexes, and execute any constraints and per-row triggers.
     890             :  *
     891             :  * Caller is responsible for opening the indexes.
     892             :  */
     893             : void
     894       63846 : ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
     895             :                          EState *estate, EPQState *epqstate,
     896             :                          TupleTableSlot *searchslot, TupleTableSlot *slot)
     897             : {
     898       63846 :     bool        skip_tuple = false;
     899       63846 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     900       63846 :     ItemPointer tid = &(searchslot->tts_tid);
     901             : 
     902             :     /*
     903             :      * We support only non-system tables, with
     904             :      * check_publication_add_relation() accountable.
     905             :      */
     906             :     Assert(rel->rd_rel->relkind == RELKIND_RELATION);
     907             :     Assert(!IsCatalogRelation(rel));
     908             : 
     909       63846 :     CheckCmdReplicaIdentity(rel, CMD_UPDATE);
     910             : 
     911             :     /* BEFORE ROW UPDATE Triggers */
     912       63846 :     if (resultRelInfo->ri_TrigDesc &&
     913          20 :         resultRelInfo->ri_TrigDesc->trig_update_before_row)
     914             :     {
     915           6 :         if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
     916             :                                   tid, NULL, slot, NULL, NULL, false))
     917           4 :             skip_tuple = true;  /* "do nothing" */
     918             :     }
     919             : 
     920       63846 :     if (!skip_tuple)
     921             :     {
     922       63842 :         List       *recheckIndexes = NIL;
     923             :         TU_UpdateIndexes update_indexes;
     924             :         List       *conflictindexes;
     925       63842 :         bool        conflict = false;
     926             : 
     927             :         /* Compute stored generated columns */
     928       63842 :         if (rel->rd_att->constr &&
     929       63752 :             rel->rd_att->constr->has_generated_stored)
     930           4 :             ExecComputeStoredGenerated(resultRelInfo, estate, slot,
     931             :                                        CMD_UPDATE);
     932             : 
     933             :         /* Check the constraints of the tuple */
     934       63842 :         if (rel->rd_att->constr)
     935       63752 :             ExecConstraints(resultRelInfo, slot, estate);
     936       63842 :         if (rel->rd_rel->relispartition)
     937          24 :             ExecPartitionCheck(resultRelInfo, slot, estate, true);
     938             : 
     939       63842 :         simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
     940             :                                   &update_indexes);
     941             : 
     942       63842 :         conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
     943             : 
     944       63842 :         if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
     945       40406 :             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
     946             :                                                    slot, estate, true,
     947             :                                                    conflictindexes ? true : false,
     948             :                                                    &conflict, conflictindexes,
     949             :                                                    (update_indexes == TU_Summarizing));
     950             : 
     951             :         /*
     952             :          * Refer to the comments above the call to CheckAndReportConflict() in
     953             :          * ExecSimpleRelationInsert to understand why this check is done at
     954             :          * this point.
     955             :          */
     956       63842 :         if (conflict)
     957           4 :             CheckAndReportConflict(resultRelInfo, estate, CT_UPDATE_EXISTS,
     958             :                                    recheckIndexes, searchslot, slot);
     959             : 
     960             :         /* AFTER ROW UPDATE Triggers */
     961       63838 :         ExecARUpdateTriggers(estate, resultRelInfo,
     962             :                              NULL, NULL,
     963             :                              tid, NULL, slot,
     964             :                              recheckIndexes, NULL, false);
     965             : 
     966       63838 :         list_free(recheckIndexes);
     967             :     }
     968       63842 : }
     969             : 
     970             : /*
     971             :  * Find the searchslot tuple and delete it, and execute any constraints
     972             :  * and per-row triggers.
     973             :  *
     974             :  * Caller is responsible for opening the indexes.
     975             :  */
     976             : void
     977       80624 : ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
     978             :                          EState *estate, EPQState *epqstate,
     979             :                          TupleTableSlot *searchslot)
     980             : {
     981       80624 :     bool        skip_tuple = false;
     982       80624 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     983       80624 :     ItemPointer tid = &searchslot->tts_tid;
     984             : 
     985       80624 :     CheckCmdReplicaIdentity(rel, CMD_DELETE);
     986             : 
     987             :     /* BEFORE ROW DELETE Triggers */
     988       80624 :     if (resultRelInfo->ri_TrigDesc &&
     989          20 :         resultRelInfo->ri_TrigDesc->trig_delete_before_row)
     990             :     {
     991           0 :         skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
     992           0 :                                            tid, NULL, NULL, NULL, NULL, false);
     993             :     }
     994             : 
     995       80624 :     if (!skip_tuple)
     996             :     {
     997             :         /* OK, delete the tuple */
     998       80624 :         simple_table_tuple_delete(rel, tid, estate->es_snapshot);
     999             : 
    1000             :         /* AFTER ROW DELETE Triggers */
    1001       80624 :         ExecARDeleteTriggers(estate, resultRelInfo,
    1002             :                              tid, NULL, NULL, false);
    1003             :     }
    1004       80624 : }
    1005             : 
    1006             : /*
    1007             :  * Check if command can be executed with current replica identity.
    1008             :  */
    1009             : void
    1010      422718 : CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
    1011             : {
    1012             :     PublicationDesc pubdesc;
    1013             : 
    1014             :     /*
    1015             :      * Skip checking the replica identity for partitioned tables, because the
    1016             :      * operations are actually performed on the leaf partitions.
    1017             :      */
    1018      422718 :     if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
    1019      400562 :         return;
    1020             : 
    1021             :     /* We only need to do checks for UPDATE and DELETE. */
    1022      418662 :     if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
    1023      246032 :         return;
    1024             : 
    1025             :     /*
    1026             :      * It is only safe to execute UPDATE/DELETE if the relation does not
    1027             :      * publish UPDATEs or DELETEs, or all the following conditions are
    1028             :      * satisfied:
    1029             :      *
    1030             :      * 1. All columns, referenced in the row filters from publications which
    1031             :      * the relation is in, are valid - i.e. when all referenced columns are
    1032             :      * part of REPLICA IDENTITY.
    1033             :      *
    1034             :      * 2. All columns, referenced in the column lists are valid - i.e. when
    1035             :      * all columns referenced in the REPLICA IDENTITY are covered by the
    1036             :      * column list.
    1037             :      *
    1038             :      * 3. All generated columns in REPLICA IDENTITY of the relation, are valid
    1039             :      * - i.e. when all these generated columns are published.
    1040             :      *
    1041             :      * XXX We could optimize it by first checking whether any of the
    1042             :      * publications have a row filter or column list for this relation, or if
    1043             :      * the relation contains a generated column. If none of these exist and
    1044             :      * the relation has replica identity then we can avoid building the
    1045             :      * descriptor but as this happens only one time it doesn't seem worth the
    1046             :      * additional complexity.
    1047             :      */
    1048      172630 :     RelationBuildPublicationDesc(rel, &pubdesc);
    1049      172630 :     if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
    1050          60 :         ereport(ERROR,
    1051             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1052             :                  errmsg("cannot update table \"%s\"",
    1053             :                         RelationGetRelationName(rel)),
    1054             :                  errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
    1055      172570 :     else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
    1056         108 :         ereport(ERROR,
    1057             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1058             :                  errmsg("cannot update table \"%s\"",
    1059             :                         RelationGetRelationName(rel)),
    1060             :                  errdetail("Column list used by the publication does not cover the replica identity.")));
    1061      172462 :     else if (cmd == CMD_UPDATE && !pubdesc.gencols_valid_for_update)
    1062          24 :         ereport(ERROR,
    1063             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1064             :                  errmsg("cannot update table \"%s\"",
    1065             :                         RelationGetRelationName(rel)),
    1066             :                  errdetail("Replica identity must not contain unpublished generated columns.")));
    1067      172438 :     else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
    1068           0 :         ereport(ERROR,
    1069             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1070             :                  errmsg("cannot delete from table \"%s\"",
    1071             :                         RelationGetRelationName(rel)),
    1072             :                  errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
    1073      172438 :     else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
    1074           0 :         ereport(ERROR,
    1075             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1076             :                  errmsg("cannot delete from table \"%s\"",
    1077             :                         RelationGetRelationName(rel)),
    1078             :                  errdetail("Column list used by the publication does not cover the replica identity.")));
    1079      172438 :     else if (cmd == CMD_DELETE && !pubdesc.gencols_valid_for_delete)
    1080           0 :         ereport(ERROR,
    1081             :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1082             :                  errmsg("cannot delete from table \"%s\"",
    1083             :                         RelationGetRelationName(rel)),
    1084             :                  errdetail("Replica identity must not contain unpublished generated columns.")));
    1085             : 
    1086             :     /* If relation has replica identity we are always good. */
    1087      172438 :     if (OidIsValid(RelationGetReplicaIndex(rel)))
    1088      150018 :         return;
    1089             : 
    1090             :     /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
    1091       22420 :     if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
    1092         456 :         return;
    1093             : 
    1094             :     /*
    1095             :      * This is UPDATE/DELETE and there is no replica identity.
    1096             :      *
    1097             :      * Check if the table publishes UPDATES or DELETES.
    1098             :      */
    1099       21964 :     if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
    1100         106 :         ereport(ERROR,
    1101             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1102             :                  errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
    1103             :                         RelationGetRelationName(rel)),
    1104             :                  errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
    1105       21858 :     else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
    1106          10 :         ereport(ERROR,
    1107             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1108             :                  errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
    1109             :                         RelationGetRelationName(rel)),
    1110             :                  errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
    1111             : }
    1112             : 
    1113             : 
    1114             : /*
    1115             :  * Check if we support writing into specific relkind.
    1116             :  *
    1117             :  * The nspname and relname are only needed for error reporting.
    1118             :  */
    1119             : void
    1120        1780 : CheckSubscriptionRelkind(char relkind, const char *nspname,
    1121             :                          const char *relname)
    1122             : {
    1123        1780 :     if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
    1124           0 :         ereport(ERROR,
    1125             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1126             :                  errmsg("cannot use relation \"%s.%s\" as logical replication target",
    1127             :                         nspname, relname),
    1128             :                  errdetail_relkind_not_supported(relkind)));
    1129        1780 : }

Generated by: LCOV version 1.16