LCOV - code coverage report
Current view: top level - src/backend/executor - execReplication.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 87.2 % 344 300
Test Date: 2026-03-01 14:14:54 Functions: 100.0 % 16 16
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * execReplication.c
       4              :  *    miscellaneous executor routines for logical replication
       5              :  *
       6              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7              :  * Portions Copyright (c) 1994, Regents of the University of California
       8              :  *
       9              :  * IDENTIFICATION
      10              :  *    src/backend/executor/execReplication.c
      11              :  *
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : 
      15              : #include "postgres.h"
      16              : 
      17              : #include "access/amapi.h"
      18              : #include "access/commit_ts.h"
      19              : #include "access/genam.h"
      20              : #include "access/gist.h"
      21              : #include "access/relscan.h"
      22              : #include "access/tableam.h"
      23              : #include "access/transam.h"
      24              : #include "access/xact.h"
      25              : #include "access/heapam.h"
      26              : #include "catalog/pg_am_d.h"
      27              : #include "commands/trigger.h"
      28              : #include "executor/executor.h"
      29              : #include "executor/nodeModifyTable.h"
      30              : #include "replication/conflict.h"
      31              : #include "replication/logicalrelation.h"
      32              : #include "storage/lmgr.h"
      33              : #include "utils/builtins.h"
      34              : #include "utils/lsyscache.h"
      35              : #include "utils/rel.h"
      36              : #include "utils/snapmgr.h"
      37              : #include "utils/syscache.h"
      38              : #include "utils/typcache.h"
      39              : 
      40              : 
      41              : static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
      42              :                          TypeCacheEntry **eq, Bitmapset *columns);
      43              : 
      44              : /*
      45              :  * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
      46              :  * is setup to match 'rel' (*NOT* idxrel!).
      47              :  *
      48              :  * Returns how many columns to use for the index scan.
      49              :  *
      50              :  * This is not a generic routine, idxrel must be PK, RI, or an index that can be
      51              :  * used for a REPLICA IDENTITY FULL table. See FindUsableIndexForReplicaIdentityFull()
      52              :  * for details.
      53              :  *
      54              :  * By definition, replication identity of a rel meets all limitations associated
      55              :  * with that. Note that any other index could also meet these limitations.
      56              :  */
      57              : static int
      58        72119 : build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
      59              :                          TupleTableSlot *searchslot)
      60              : {
      61              :     int         index_attoff;
      62        72119 :     int         skey_attoff = 0;
      63              :     Datum       indclassDatum;
      64              :     oidvector  *opclass;
      65        72119 :     int2vector *indkey = &idxrel->rd_index->indkey;
      66              : 
      67        72119 :     indclassDatum = SysCacheGetAttrNotNull(INDEXRELID, idxrel->rd_indextuple,
      68              :                                            Anum_pg_index_indclass);
      69        72119 :     opclass = (oidvector *) DatumGetPointer(indclassDatum);
      70              : 
      71              :     /* Build scankey for every non-expression attribute in the index. */
      72       144261 :     for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
      73        72142 :          index_attoff++)
      74              :     {
      75              :         Oid         operator;
      76              :         Oid         optype;
      77              :         Oid         opfamily;
      78              :         RegProcedure regop;
      79        72142 :         int         table_attno = indkey->values[index_attoff];
      80              :         StrategyNumber eq_strategy;
      81              : 
      82        72142 :         if (!AttributeNumberIsValid(table_attno))
      83              :         {
      84              :             /*
      85              :              * XXX: Currently, we don't support expressions in the scan key,
      86              :              * see code below.
      87              :              */
      88            2 :             continue;
      89              :         }
      90              : 
      91              :         /*
      92              :          * Load the operator info.  We need this to get the equality operator
      93              :          * function for the scan key.
      94              :          */
      95        72140 :         optype = get_opclass_input_type(opclass->values[index_attoff]);
      96        72140 :         opfamily = get_opclass_family(opclass->values[index_attoff]);
      97        72140 :         eq_strategy = IndexAmTranslateCompareType(COMPARE_EQ, idxrel->rd_rel->relam, opfamily, false);
      98        72140 :         operator = get_opfamily_member(opfamily, optype,
      99              :                                        optype,
     100              :                                        eq_strategy);
     101              : 
     102        72140 :         if (!OidIsValid(operator))
     103            0 :             elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
     104              :                  eq_strategy, optype, optype, opfamily);
     105              : 
     106        72140 :         regop = get_opcode(operator);
     107              : 
     108              :         /* Initialize the scankey. */
     109        72140 :         ScanKeyInit(&skey[skey_attoff],
     110        72140 :                     index_attoff + 1,
     111              :                     eq_strategy,
     112              :                     regop,
     113        72140 :                     searchslot->tts_values[table_attno - 1]);
     114              : 
     115        72140 :         skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
     116              : 
     117              :         /* Check for null value. */
     118        72140 :         if (searchslot->tts_isnull[table_attno - 1])
     119            1 :             skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
     120              : 
     121        72140 :         skey_attoff++;
     122              :     }
     123              : 
     124              :     /* There must always be at least one attribute for the index scan. */
     125              :     Assert(skey_attoff > 0);
     126              : 
     127        72119 :     return skey_attoff;
     128              : }
     129              : 
     130              : 
     131              : /*
     132              :  * Helper function to check if it is necessary to re-fetch and lock the tuple
     133              :  * due to concurrent modifications. This function should be called after
     134              :  * invoking table_tuple_lock.
     135              :  */
     136              : static bool
     137        72281 : should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
     138              : {
     139        72281 :     bool        refetch = false;
     140              : 
     141        72281 :     switch (res)
     142              :     {
     143        72281 :         case TM_Ok:
     144        72281 :             break;
     145            0 :         case TM_Updated:
     146              :             /* XXX: Improve handling here */
     147            0 :             if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
     148            0 :                 ereport(LOG,
     149              :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     150              :                          errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
     151              :             else
     152            0 :                 ereport(LOG,
     153              :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     154              :                          errmsg("concurrent update, retrying")));
     155            0 :             refetch = true;
     156            0 :             break;
     157            0 :         case TM_Deleted:
     158              :             /* XXX: Improve handling here */
     159            0 :             ereport(LOG,
     160              :                     (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     161              :                      errmsg("concurrent delete, retrying")));
     162            0 :             refetch = true;
     163            0 :             break;
     164            0 :         case TM_Invisible:
     165            0 :             elog(ERROR, "attempted to lock invisible tuple");
     166              :             break;
     167            0 :         default:
     168            0 :             elog(ERROR, "unexpected table_tuple_lock status: %u", res);
     169              :             break;
     170              :     }
     171              : 
     172        72281 :     return refetch;
     173              : }
     174              : 
     175              : /*
     176              :  * Search the relation 'rel' for tuple using the index.
     177              :  *
     178              :  * If a matching tuple is found, lock it with lockmode, fill the slot with its
     179              :  * contents, and return true.  Return false otherwise.
     180              :  */
     181              : bool
     182        72118 : RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
     183              :                              LockTupleMode lockmode,
     184              :                              TupleTableSlot *searchslot,
     185              :                              TupleTableSlot *outslot)
     186              : {
     187              :     ScanKeyData skey[INDEX_MAX_KEYS];
     188              :     int         skey_attoff;
     189              :     IndexScanDesc scan;
     190              :     SnapshotData snap;
     191              :     TransactionId xwait;
     192              :     Relation    idxrel;
     193              :     bool        found;
     194        72118 :     TypeCacheEntry **eq = NULL;
     195              :     bool        isIdxSafeToSkipDuplicates;
     196              : 
     197              :     /* Open the index. */
     198        72118 :     idxrel = index_open(idxoid, RowExclusiveLock);
     199              : 
     200        72118 :     isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
     201              : 
     202        72118 :     InitDirtySnapshot(snap);
     203              : 
     204              :     /* Build scan key. */
     205        72118 :     skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
     206              : 
     207              :     /* Start an index scan. */
     208        72118 :     scan = index_beginscan(rel, idxrel, &snap, NULL, skey_attoff, 0);
     209              : 
     210            0 : retry:
     211        72118 :     found = false;
     212              : 
     213        72118 :     index_rescan(scan, skey, skey_attoff, NULL, 0);
     214              : 
     215              :     /* Try to find the tuple */
     216        72118 :     while (index_getnext_slot(scan, ForwardScanDirection, outslot))
     217              :     {
     218              :         /*
     219              :          * Avoid expensive equality check if the index is primary key or
     220              :          * replica identity index.
     221              :          */
     222        72088 :         if (!isIdxSafeToSkipDuplicates)
     223              :         {
     224           17 :             if (eq == NULL)
     225           17 :                 eq = palloc0_array(TypeCacheEntry *, outslot->tts_tupleDescriptor->natts);
     226              : 
     227           17 :             if (!tuples_equal(outslot, searchslot, eq, NULL))
     228            0 :                 continue;
     229              :         }
     230              : 
     231        72088 :         ExecMaterializeSlot(outslot);
     232              : 
     233       144176 :         xwait = TransactionIdIsValid(snap.xmin) ?
     234        72088 :             snap.xmin : snap.xmax;
     235              : 
     236              :         /*
     237              :          * If the tuple is locked, wait for locking transaction to finish and
     238              :          * retry.
     239              :          */
     240        72088 :         if (TransactionIdIsValid(xwait))
     241              :         {
     242            0 :             XactLockTableWait(xwait, NULL, NULL, XLTW_None);
     243            0 :             goto retry;
     244              :         }
     245              : 
     246              :         /* Found our tuple and it's not locked */
     247        72088 :         found = true;
     248        72088 :         break;
     249              :     }
     250              : 
     251              :     /* Found tuple, try to lock it in the lockmode. */
     252        72118 :     if (found)
     253              :     {
     254              :         TM_FailureData tmfd;
     255              :         TM_Result   res;
     256              : 
     257        72088 :         PushActiveSnapshot(GetLatestSnapshot());
     258              : 
     259        72088 :         res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
     260              :                                outslot,
     261              :                                GetCurrentCommandId(false),
     262              :                                lockmode,
     263              :                                LockWaitBlock,
     264              :                                0 /* don't follow updates */ ,
     265              :                                &tmfd);
     266              : 
     267        72088 :         PopActiveSnapshot();
     268              : 
     269        72088 :         if (should_refetch_tuple(res, &tmfd))
     270            0 :             goto retry;
     271              :     }
     272              : 
     273        72118 :     index_endscan(scan);
     274              : 
     275              :     /* Don't release lock until commit. */
     276        72118 :     index_close(idxrel, NoLock);
     277              : 
     278        72118 :     return found;
     279              : }
     280              : 
     281              : /*
     282              :  * Compare the tuples in the slots by checking if they have equal values.
     283              :  *
     284              :  * If 'columns' is not null, only the columns specified within it will be
     285              :  * considered for the equality check, ignoring all other columns.
     286              :  */
     287              : static bool
     288       105325 : tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
     289              :              TypeCacheEntry **eq, Bitmapset *columns)
     290              : {
     291              :     int         attrnum;
     292              : 
     293              :     Assert(slot1->tts_tupleDescriptor->natts ==
     294              :            slot2->tts_tupleDescriptor->natts);
     295              : 
     296       105325 :     slot_getallattrs(slot1);
     297       105325 :     slot_getallattrs(slot2);
     298              : 
     299              :     /* Check equality of the attributes. */
     300       105527 :     for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
     301              :     {
     302              :         Form_pg_attribute att;
     303              :         TypeCacheEntry *typentry;
     304              : 
     305       105362 :         att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
     306              : 
     307              :         /*
     308              :          * Ignore dropped and generated columns as the publisher doesn't send
     309              :          * those
     310              :          */
     311       105362 :         if (att->attisdropped || att->attgenerated)
     312            1 :             continue;
     313              : 
     314              :         /*
     315              :          * Ignore columns that are not listed for checking.
     316              :          */
     317       105361 :         if (columns &&
     318            0 :             !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
     319              :                            columns))
     320            0 :             continue;
     321              : 
     322              :         /*
     323              :          * If one value is NULL and other is not, then they are certainly not
     324              :          * equal
     325              :          */
     326       105361 :         if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
     327            0 :             return false;
     328              : 
     329              :         /*
     330              :          * If both are NULL, they can be considered equal.
     331              :          */
     332       105361 :         if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
     333            1 :             continue;
     334              : 
     335       105360 :         typentry = eq[attrnum];
     336       105360 :         if (typentry == NULL)
     337              :         {
     338          202 :             typentry = lookup_type_cache(att->atttypid,
     339              :                                          TYPECACHE_EQ_OPR_FINFO);
     340          202 :             if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
     341            0 :                 ereport(ERROR,
     342              :                         (errcode(ERRCODE_UNDEFINED_FUNCTION),
     343              :                          errmsg("could not identify an equality operator for type %s",
     344              :                                 format_type_be(att->atttypid))));
     345          202 :             eq[attrnum] = typentry;
     346              :         }
     347              : 
     348       105360 :         if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
     349              :                                             att->attcollation,
     350       105360 :                                             slot1->tts_values[attrnum],
     351       105360 :                                             slot2->tts_values[attrnum])))
     352       105160 :             return false;
     353              :     }
     354              : 
     355          165 :     return true;
     356              : }
     357              : 
     358              : /*
     359              :  * Search the relation 'rel' for tuple using the sequential scan.
     360              :  *
     361              :  * If a matching tuple is found, lock it with lockmode, fill the slot with its
     362              :  * contents, and return true.  Return false otherwise.
     363              :  *
     364              :  * Note that this stops on the first matching tuple.
     365              :  *
     366              :  * This can obviously be quite slow on tables that have more than few rows.
     367              :  */
     368              : bool
     369          150 : RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
     370              :                          TupleTableSlot *searchslot, TupleTableSlot *outslot)
     371              : {
     372              :     TupleTableSlot *scanslot;
     373              :     TableScanDesc scan;
     374              :     SnapshotData snap;
     375              :     TypeCacheEntry **eq;
     376              :     TransactionId xwait;
     377              :     bool        found;
     378          150 :     TupleDesc   desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
     379              : 
     380              :     Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
     381              : 
     382          150 :     eq = palloc0_array(TypeCacheEntry *, outslot->tts_tupleDescriptor->natts);
     383              : 
     384              :     /* Start a heap scan. */
     385          150 :     InitDirtySnapshot(snap);
     386          150 :     scan = table_beginscan(rel, &snap, 0, NULL);
     387          150 :     scanslot = table_slot_create(rel, NULL);
     388              : 
     389            0 : retry:
     390          150 :     found = false;
     391              : 
     392          150 :     table_rescan(scan, NULL);
     393              : 
     394              :     /* Try to find the tuple */
     395       105309 :     while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
     396              :     {
     397       105305 :         if (!tuples_equal(scanslot, searchslot, eq, NULL))
     398       105159 :             continue;
     399              : 
     400          146 :         found = true;
     401          146 :         ExecCopySlot(outslot, scanslot);
     402              : 
     403          292 :         xwait = TransactionIdIsValid(snap.xmin) ?
     404          146 :             snap.xmin : snap.xmax;
     405              : 
     406              :         /*
     407              :          * If the tuple is locked, wait for locking transaction to finish and
     408              :          * retry.
     409              :          */
     410          146 :         if (TransactionIdIsValid(xwait))
     411              :         {
     412            0 :             XactLockTableWait(xwait, NULL, NULL, XLTW_None);
     413            0 :             goto retry;
     414              :         }
     415              : 
     416              :         /* Found our tuple and it's not locked */
     417          146 :         break;
     418              :     }
     419              : 
     420              :     /* Found tuple, try to lock it in the lockmode. */
     421          150 :     if (found)
     422              :     {
     423              :         TM_FailureData tmfd;
     424              :         TM_Result   res;
     425              : 
     426          146 :         PushActiveSnapshot(GetLatestSnapshot());
     427              : 
     428          146 :         res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
     429              :                                outslot,
     430              :                                GetCurrentCommandId(false),
     431              :                                lockmode,
     432              :                                LockWaitBlock,
     433              :                                0 /* don't follow updates */ ,
     434              :                                &tmfd);
     435              : 
     436          146 :         PopActiveSnapshot();
     437              : 
     438          146 :         if (should_refetch_tuple(res, &tmfd))
     439            0 :             goto retry;
     440              :     }
     441              : 
     442          150 :     table_endscan(scan);
     443          150 :     ExecDropSingleTupleTableSlot(scanslot);
     444              : 
     445          150 :     return found;
     446              : }
     447              : 
     448              : /*
     449              :  * Build additional index information necessary for conflict detection.
     450              :  */
     451              : static void
     452           49 : BuildConflictIndexInfo(ResultRelInfo *resultRelInfo, Oid conflictindex)
     453              : {
     454          144 :     for (int i = 0; i < resultRelInfo->ri_NumIndices; i++)
     455              :     {
     456           95 :         Relation    indexRelation = resultRelInfo->ri_IndexRelationDescs[i];
     457           95 :         IndexInfo  *indexRelationInfo = resultRelInfo->ri_IndexRelationInfo[i];
     458              : 
     459           95 :         if (conflictindex != RelationGetRelid(indexRelation))
     460           46 :             continue;
     461              : 
     462              :         /*
     463              :          * This Assert will fail if BuildSpeculativeIndexInfo() is called
     464              :          * twice for the given index.
     465              :          */
     466              :         Assert(indexRelationInfo->ii_UniqueOps == NULL);
     467              : 
     468           49 :         BuildSpeculativeIndexInfo(indexRelation, indexRelationInfo);
     469              :     }
     470           49 : }
     471              : 
     472              : /*
     473              :  * If the tuple is recently dead and was deleted by a transaction with a newer
     474              :  * commit timestamp than previously recorded, update the associated transaction
     475              :  * ID, commit time, and origin. This helps ensure that conflict detection uses
     476              :  * the most recent and relevant deletion metadata.
     477              :  */
     478              : static void
     479            3 : update_most_recent_deletion_info(TupleTableSlot *scanslot,
     480              :                                  TransactionId oldestxmin,
     481              :                                  TransactionId *delete_xid,
     482              :                                  TimestampTz *delete_time,
     483              :                                  ReplOriginId *delete_origin)
     484              : {
     485              :     BufferHeapTupleTableSlot *hslot;
     486              :     HeapTuple   tuple;
     487              :     Buffer      buf;
     488            3 :     bool        recently_dead = false;
     489              :     TransactionId xmax;
     490              :     TimestampTz localts;
     491              :     ReplOriginId localorigin;
     492              : 
     493            3 :     hslot = (BufferHeapTupleTableSlot *) scanslot;
     494              : 
     495            3 :     tuple = ExecFetchSlotHeapTuple(scanslot, false, NULL);
     496            3 :     buf = hslot->buffer;
     497              : 
     498            3 :     LockBuffer(buf, BUFFER_LOCK_SHARE);
     499              : 
     500              :     /*
     501              :      * We do not consider HEAPTUPLE_DEAD status because it indicates either
     502              :      * tuples whose inserting transaction was aborted (meaning there is no
     503              :      * commit timestamp or origin), or tuples deleted by a transaction older
     504              :      * than oldestxmin, making it safe to ignore them during conflict
     505              :      * detection (See comments atop worker.c for details).
     506              :      */
     507            3 :     if (HeapTupleSatisfiesVacuum(tuple, oldestxmin, buf) == HEAPTUPLE_RECENTLY_DEAD)
     508            3 :         recently_dead = true;
     509              : 
     510            3 :     LockBuffer(buf, BUFFER_LOCK_UNLOCK);
     511              : 
     512            3 :     if (!recently_dead)
     513            0 :         return;
     514              : 
     515            3 :     xmax = HeapTupleHeaderGetUpdateXid(tuple->t_data);
     516            3 :     if (!TransactionIdIsValid(xmax))
     517            0 :         return;
     518              : 
     519              :     /* Select the dead tuple with the most recent commit timestamp */
     520            6 :     if (TransactionIdGetCommitTsData(xmax, &localts, &localorigin) &&
     521            3 :         TimestampDifferenceExceeds(*delete_time, localts, 0))
     522              :     {
     523            3 :         *delete_xid = xmax;
     524            3 :         *delete_time = localts;
     525            3 :         *delete_origin = localorigin;
     526              :     }
     527              : }
     528              : 
     529              : /*
     530              :  * Searches the relation 'rel' for the most recently deleted tuple that matches
     531              :  * the values in 'searchslot' and is not yet removable by VACUUM. The function
     532              :  * returns the transaction ID, origin, and commit timestamp of the transaction
     533              :  * that deleted this tuple.
     534              :  *
     535              :  * 'oldestxmin' acts as a cutoff transaction ID. Tuples deleted by transactions
     536              :  * with IDs >= 'oldestxmin' are considered recently dead and are eligible for
     537              :  * conflict detection.
     538              :  *
     539              :  * Instead of stopping at the first match, we scan all matching dead tuples to
     540              :  * identify most recent deletion. This is crucial because only the latest
     541              :  * deletion is relevant for resolving conflicts.
     542              :  *
     543              :  * For example, consider a scenario on the subscriber where a row is deleted,
     544              :  * re-inserted, and then deleted again only on the subscriber:
     545              :  *
     546              :  *   - (pk, 1) - deleted at 9:00,
     547              :  *   - (pk, 1) - deleted at 9:02,
     548              :  *
     549              :  * Now, a remote update arrives: (pk, 1) -> (pk, 2), timestamped at 9:01.
     550              :  *
     551              :  * If we mistakenly return the older deletion (9:00), the system may wrongly
     552              :  * apply the remote update using a last-update-wins strategy. Instead, we must
     553              :  * recognize the more recent deletion at 9:02 and skip the update. See
     554              :  * comments atop worker.c for details. Note, as of now, conflict resolution
     555              :  * is not implemented. Consequently, the system may incorrectly report the
     556              :  * older tuple as the conflicted one, leading to misleading results.
     557              :  *
     558              :  * The commit timestamp of the deleting transaction is used to determine which
     559              :  * tuple was deleted most recently.
     560              :  */
     561              : bool
     562            2 : RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot,
     563              :                                 TransactionId oldestxmin,
     564              :                                 TransactionId *delete_xid,
     565              :                                 ReplOriginId *delete_origin,
     566              :                                 TimestampTz *delete_time)
     567              : {
     568              :     TupleTableSlot *scanslot;
     569              :     TableScanDesc scan;
     570              :     TypeCacheEntry **eq;
     571              :     Bitmapset  *indexbitmap;
     572            2 :     TupleDesc   desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
     573              : 
     574              :     Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
     575              : 
     576            2 :     *delete_xid = InvalidTransactionId;
     577            2 :     *delete_origin = InvalidReplOriginId;
     578            2 :     *delete_time = 0;
     579              : 
     580              :     /*
     581              :      * If the relation has a replica identity key or a primary key that is
     582              :      * unusable for locating deleted tuples (see
     583              :      * IsIndexUsableForFindingDeletedTuple), a full table scan becomes
     584              :      * necessary. In such cases, comparing the entire tuple is not required,
     585              :      * since the remote tuple might not include all column values. Instead,
     586              :      * the indexed columns alone are sufficient to identify the target tuple
     587              :      * (see logicalrep_rel_mark_updatable).
     588              :      */
     589            2 :     indexbitmap = RelationGetIndexAttrBitmap(rel,
     590              :                                              INDEX_ATTR_BITMAP_IDENTITY_KEY);
     591              : 
     592              :     /* fallback to PK if no replica identity */
     593            2 :     if (!indexbitmap)
     594            2 :         indexbitmap = RelationGetIndexAttrBitmap(rel,
     595              :                                                  INDEX_ATTR_BITMAP_PRIMARY_KEY);
     596              : 
     597            2 :     eq = palloc0_array(TypeCacheEntry *, searchslot->tts_tupleDescriptor->natts);
     598              : 
     599              :     /*
     600              :      * Start a heap scan using SnapshotAny to identify dead tuples that are
     601              :      * not visible under a standard MVCC snapshot. Tuples from transactions
     602              :      * not yet committed or those just committed prior to the scan are
     603              :      * excluded in update_most_recent_deletion_info().
     604              :      */
     605            2 :     scan = table_beginscan(rel, SnapshotAny, 0, NULL);
     606            2 :     scanslot = table_slot_create(rel, NULL);
     607              : 
     608            2 :     table_rescan(scan, NULL);
     609              : 
     610              :     /* Try to find the tuple */
     611            5 :     while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
     612              :     {
     613            3 :         if (!tuples_equal(scanslot, searchslot, eq, indexbitmap))
     614            1 :             continue;
     615              : 
     616            2 :         update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid,
     617              :                                          delete_time, delete_origin);
     618              :     }
     619              : 
     620            2 :     table_endscan(scan);
     621            2 :     ExecDropSingleTupleTableSlot(scanslot);
     622              : 
     623            2 :     return *delete_time != 0;
     624              : }
     625              : 
     626              : /*
     627              :  * Similar to RelationFindDeletedTupleInfoSeq() but using index scan to locate
     628              :  * the deleted tuple.
     629              :  */
     630              : bool
     631            1 : RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid,
     632              :                                     TupleTableSlot *searchslot,
     633              :                                     TransactionId oldestxmin,
     634              :                                     TransactionId *delete_xid,
     635              :                                     ReplOriginId *delete_origin,
     636              :                                     TimestampTz *delete_time)
     637              : {
     638              :     Relation    idxrel;
     639              :     ScanKeyData skey[INDEX_MAX_KEYS];
     640              :     int         skey_attoff;
     641              :     IndexScanDesc scan;
     642              :     TupleTableSlot *scanslot;
     643            1 :     TypeCacheEntry **eq = NULL;
     644              :     bool        isIdxSafeToSkipDuplicates;
     645            1 :     TupleDesc   desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
     646              : 
     647              :     Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
     648              :     Assert(OidIsValid(idxoid));
     649              : 
     650            1 :     *delete_xid = InvalidTransactionId;
     651            1 :     *delete_time = 0;
     652            1 :     *delete_origin = InvalidReplOriginId;
     653              : 
     654            1 :     isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
     655              : 
     656            1 :     scanslot = table_slot_create(rel, NULL);
     657              : 
     658            1 :     idxrel = index_open(idxoid, RowExclusiveLock);
     659              : 
     660              :     /* Build scan key. */
     661            1 :     skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
     662              : 
     663              :     /*
     664              :      * Start an index scan using SnapshotAny to identify dead tuples that are
     665              :      * not visible under a standard MVCC snapshot. Tuples from transactions
     666              :      * not yet committed or those just committed prior to the scan are
     667              :      * excluded in update_most_recent_deletion_info().
     668              :      */
     669            1 :     scan = index_beginscan(rel, idxrel, SnapshotAny, NULL, skey_attoff, 0);
     670              : 
     671            1 :     index_rescan(scan, skey, skey_attoff, NULL, 0);
     672              : 
     673              :     /* Try to find the tuple */
     674            2 :     while (index_getnext_slot(scan, ForwardScanDirection, scanslot))
     675              :     {
     676              :         /*
     677              :          * Avoid expensive equality check if the index is primary key or
     678              :          * replica identity index.
     679              :          */
     680            1 :         if (!isIdxSafeToSkipDuplicates)
     681              :         {
     682            0 :             if (eq == NULL)
     683            0 :                 eq = palloc0_array(TypeCacheEntry *, scanslot->tts_tupleDescriptor->natts);
     684              : 
     685            0 :             if (!tuples_equal(scanslot, searchslot, eq, NULL))
     686            0 :                 continue;
     687              :         }
     688              : 
     689            1 :         update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid,
     690              :                                          delete_time, delete_origin);
     691              :     }
     692              : 
     693            1 :     index_endscan(scan);
     694              : 
     695            1 :     index_close(idxrel, NoLock);
     696              : 
     697            1 :     ExecDropSingleTupleTableSlot(scanslot);
     698              : 
     699            1 :     return *delete_time != 0;
     700              : }
     701              : 
     702              : /*
     703              :  * Find the tuple that violates the passed unique index (conflictindex).
     704              :  *
     705              :  * If the conflicting tuple is found return true, otherwise false.
     706              :  *
     707              :  * We lock the tuple to avoid getting it deleted before the caller can fetch
     708              :  * the required information. Note that if the tuple is deleted before a lock
     709              :  * is acquired, we will retry to find the conflicting tuple again.
     710              :  */
     711              : static bool
     712           49 : FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate,
     713              :                   Oid conflictindex, TupleTableSlot *slot,
     714              :                   TupleTableSlot **conflictslot)
     715              : {
     716           49 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     717              :     ItemPointerData conflictTid;
     718              :     TM_FailureData tmfd;
     719              :     TM_Result   res;
     720              : 
     721           49 :     *conflictslot = NULL;
     722              : 
     723              :     /*
     724              :      * Build additional information required to check constraints violations.
     725              :      * See check_exclusion_or_unique_constraint().
     726              :      */
     727           49 :     BuildConflictIndexInfo(resultRelInfo, conflictindex);
     728              : 
     729           49 : retry:
     730           48 :     if (ExecCheckIndexConstraints(resultRelInfo, slot, estate,
     731           49 :                                   &conflictTid, &slot->tts_tid,
     732           49 :                                   list_make1_oid(conflictindex)))
     733              :     {
     734            1 :         if (*conflictslot)
     735            0 :             ExecDropSingleTupleTableSlot(*conflictslot);
     736              : 
     737            1 :         *conflictslot = NULL;
     738            1 :         return false;
     739              :     }
     740              : 
     741           47 :     *conflictslot = table_slot_create(rel, NULL);
     742              : 
     743           47 :     PushActiveSnapshot(GetLatestSnapshot());
     744              : 
     745           47 :     res = table_tuple_lock(rel, &conflictTid, GetActiveSnapshot(),
     746              :                            *conflictslot,
     747              :                            GetCurrentCommandId(false),
     748              :                            LockTupleShare,
     749              :                            LockWaitBlock,
     750              :                            0 /* don't follow updates */ ,
     751              :                            &tmfd);
     752              : 
     753           47 :     PopActiveSnapshot();
     754              : 
     755           47 :     if (should_refetch_tuple(res, &tmfd))
     756            0 :         goto retry;
     757              : 
     758           47 :     return true;
     759              : }
     760              : 
     761              : /*
     762              :  * Check all the unique indexes in 'recheckIndexes' for conflict with the
     763              :  * tuple in 'remoteslot' and report if found.
     764              :  */
     765              : static void
     766           28 : CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
     767              :                        ConflictType type, List *recheckIndexes,
     768              :                        TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
     769              : {
     770           28 :     List       *conflicttuples = NIL;
     771              :     TupleTableSlot *conflictslot;
     772              : 
     773              :     /* Check all the unique indexes for conflicts */
     774          103 :     foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
     775              :     {
     776           97 :         if (list_member_oid(recheckIndexes, uniqueidx) &&
     777           49 :             FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
     778              :                               &conflictslot))
     779              :         {
     780           47 :             ConflictTupleInfo *conflicttuple = palloc0_object(ConflictTupleInfo);
     781              : 
     782           47 :             conflicttuple->slot = conflictslot;
     783           47 :             conflicttuple->indexoid = uniqueidx;
     784              : 
     785           47 :             GetTupleTransactionInfo(conflictslot, &conflicttuple->xmin,
     786              :                                     &conflicttuple->origin, &conflicttuple->ts);
     787              : 
     788           47 :             conflicttuples = lappend(conflicttuples, conflicttuple);
     789              :         }
     790              :     }
     791              : 
     792              :     /* Report the conflict, if found */
     793           27 :     if (conflicttuples)
     794           26 :         ReportApplyConflict(estate, resultRelInfo, ERROR,
     795           26 :                             list_length(conflicttuples) > 1 ? CT_MULTIPLE_UNIQUE_CONFLICTS : type,
     796              :                             searchslot, remoteslot, conflicttuples);
     797            1 : }
     798              : 
     799              : /*
     800              :  * Insert tuple represented in the slot to the relation, update the indexes,
     801              :  * and execute any constraints and per-row triggers.
     802              :  *
     803              :  * Caller is responsible for opening the indexes.
     804              :  */
     805              : void
     806        76303 : ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
     807              :                          EState *estate, TupleTableSlot *slot)
     808              : {
     809        76303 :     bool        skip_tuple = false;
     810        76303 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     811              : 
     812              :     /* For now we support only tables. */
     813              :     Assert(rel->rd_rel->relkind == RELKIND_RELATION);
     814              : 
     815        76303 :     CheckCmdReplicaIdentity(rel, CMD_INSERT);
     816              : 
     817              :     /* BEFORE ROW INSERT Triggers */
     818        76303 :     if (resultRelInfo->ri_TrigDesc &&
     819           20 :         resultRelInfo->ri_TrigDesc->trig_insert_before_row)
     820              :     {
     821            3 :         if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
     822            1 :             skip_tuple = true;  /* "do nothing" */
     823              :     }
     824              : 
     825        76303 :     if (!skip_tuple)
     826              :     {
     827        76302 :         List       *recheckIndexes = NIL;
     828              :         List       *conflictindexes;
     829        76302 :         bool        conflict = false;
     830              : 
     831              :         /* Compute stored generated columns */
     832        76302 :         if (rel->rd_att->constr &&
     833        45497 :             rel->rd_att->constr->has_generated_stored)
     834            4 :             ExecComputeStoredGenerated(resultRelInfo, estate, slot,
     835              :                                        CMD_INSERT);
     836              : 
     837              :         /* Check the constraints of the tuple */
     838        76302 :         if (rel->rd_att->constr)
     839        45497 :             ExecConstraints(resultRelInfo, slot, estate);
     840        76302 :         if (rel->rd_rel->relispartition)
     841           77 :             ExecPartitionCheck(resultRelInfo, slot, estate, true);
     842              : 
     843              :         /* OK, store the tuple and create index entries for it */
     844        76302 :         simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
     845              : 
     846        76302 :         conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
     847              : 
     848        76302 :         if (resultRelInfo->ri_NumIndices > 0)
     849              :         {
     850              :             bits32      flags;
     851              : 
     852        55958 :             if (conflictindexes != NIL)
     853        55954 :                 flags = EIIT_NO_DUPE_ERROR;
     854              :             else
     855            4 :                 flags = 0;
     856        55958 :             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
     857              :                                                    estate, flags,
     858              :                                                    slot, conflictindexes,
     859              :                                                    &conflict);
     860              :         }
     861              : 
     862              :         /*
     863              :          * Checks the conflict indexes to fetch the conflicting local row and
     864              :          * reports the conflict. We perform this check here, instead of
     865              :          * performing an additional index scan before the actual insertion and
     866              :          * reporting the conflict if any conflicting rows are found. This is
     867              :          * to avoid the overhead of executing the extra scan for each INSERT
     868              :          * operation, even when no conflict arises, which could introduce
     869              :          * significant overhead to replication, particularly in cases where
     870              :          * conflicts are rare.
     871              :          *
     872              :          * XXX OTOH, this could lead to clean-up effort for dead tuples added
     873              :          * in heap and index in case of conflicts. But as conflicts shouldn't
     874              :          * be a frequent thing so we preferred to save the performance
     875              :          * overhead of extra scan before each insertion.
     876              :          */
     877        76302 :         if (conflict)
     878           26 :             CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
     879              :                                    recheckIndexes, NULL, slot);
     880              : 
     881              :         /* AFTER ROW INSERT Triggers */
     882        76277 :         ExecARInsertTriggers(estate, resultRelInfo, slot,
     883              :                              recheckIndexes, NULL);
     884              : 
     885              :         /*
     886              :          * XXX we should in theory pass a TransitionCaptureState object to the
     887              :          * above to capture transition tuples, but after statement triggers
     888              :          * don't actually get fired by replication yet anyway
     889              :          */
     890              : 
     891        76277 :         list_free(recheckIndexes);
     892              :     }
     893        76278 : }
     894              : 
     895              : /*
     896              :  * Find the searchslot tuple and update it with data in the slot,
     897              :  * update the indexes, and execute any constraints and per-row triggers.
     898              :  *
     899              :  * Caller is responsible for opening the indexes.
     900              :  */
     901              : void
     902        31921 : ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
     903              :                          EState *estate, EPQState *epqstate,
     904              :                          TupleTableSlot *searchslot, TupleTableSlot *slot)
     905              : {
     906        31921 :     bool        skip_tuple = false;
     907        31921 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     908        31921 :     ItemPointer tid = &(searchslot->tts_tid);
     909              : 
     910              :     /*
     911              :      * We support only non-system tables, with
     912              :      * check_publication_add_relation() accountable.
     913              :      */
     914              :     Assert(rel->rd_rel->relkind == RELKIND_RELATION);
     915              :     Assert(!IsCatalogRelation(rel));
     916              : 
     917        31921 :     CheckCmdReplicaIdentity(rel, CMD_UPDATE);
     918              : 
     919              :     /* BEFORE ROW UPDATE Triggers */
     920        31921 :     if (resultRelInfo->ri_TrigDesc &&
     921           10 :         resultRelInfo->ri_TrigDesc->trig_update_before_row)
     922              :     {
     923            3 :         if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
     924              :                                   tid, NULL, slot, NULL, NULL, false))
     925            2 :             skip_tuple = true;  /* "do nothing" */
     926              :     }
     927              : 
     928        31921 :     if (!skip_tuple)
     929              :     {
     930        31919 :         List       *recheckIndexes = NIL;
     931              :         TU_UpdateIndexes update_indexes;
     932              :         List       *conflictindexes;
     933        31919 :         bool        conflict = false;
     934              : 
     935              :         /* Compute stored generated columns */
     936        31919 :         if (rel->rd_att->constr &&
     937        31875 :             rel->rd_att->constr->has_generated_stored)
     938            2 :             ExecComputeStoredGenerated(resultRelInfo, estate, slot,
     939              :                                        CMD_UPDATE);
     940              : 
     941              :         /* Check the constraints of the tuple */
     942        31919 :         if (rel->rd_att->constr)
     943        31875 :             ExecConstraints(resultRelInfo, slot, estate);
     944        31919 :         if (rel->rd_rel->relispartition)
     945           12 :             ExecPartitionCheck(resultRelInfo, slot, estate, true);
     946              : 
     947        31919 :         simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
     948              :                                   &update_indexes);
     949              : 
     950        31919 :         conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
     951              : 
     952        31919 :         if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
     953              :         {
     954        20154 :             bits32      flags = EIIT_IS_UPDATE;
     955              : 
     956        20154 :             if (conflictindexes != NIL)
     957        20145 :                 flags |= EIIT_NO_DUPE_ERROR;
     958        20154 :             if (update_indexes == TU_Summarizing)
     959            0 :                 flags |= EIIT_ONLY_SUMMARIZING;
     960        20154 :             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
     961              :                                                    estate, flags,
     962              :                                                    slot, conflictindexes,
     963              :                                                    &conflict);
     964              :         }
     965              : 
     966              :         /*
     967              :          * Refer to the comments above the call to CheckAndReportConflict() in
     968              :          * ExecSimpleRelationInsert to understand why this check is done at
     969              :          * this point.
     970              :          */
     971        31919 :         if (conflict)
     972            2 :             CheckAndReportConflict(resultRelInfo, estate, CT_UPDATE_EXISTS,
     973              :                                    recheckIndexes, searchslot, slot);
     974              : 
     975              :         /* AFTER ROW UPDATE Triggers */
     976        31917 :         ExecARUpdateTriggers(estate, resultRelInfo,
     977              :                              NULL, NULL,
     978              :                              tid, NULL, slot,
     979              :                              recheckIndexes, NULL, false);
     980              : 
     981        31917 :         list_free(recheckIndexes);
     982              :     }
     983        31919 : }
     984              : 
     985              : /*
     986              :  * Find the searchslot tuple and delete it, and execute any constraints
     987              :  * and per-row triggers.
     988              :  *
     989              :  * Caller is responsible for opening the indexes.
     990              :  */
     991              : void
     992        40313 : ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
     993              :                          EState *estate, EPQState *epqstate,
     994              :                          TupleTableSlot *searchslot)
     995              : {
     996        40313 :     bool        skip_tuple = false;
     997        40313 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     998        40313 :     ItemPointer tid = &searchslot->tts_tid;
     999              : 
    1000        40313 :     CheckCmdReplicaIdentity(rel, CMD_DELETE);
    1001              : 
    1002              :     /* BEFORE ROW DELETE Triggers */
    1003        40313 :     if (resultRelInfo->ri_TrigDesc &&
    1004           10 :         resultRelInfo->ri_TrigDesc->trig_delete_before_row)
    1005              :     {
    1006            0 :         skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
    1007            0 :                                            tid, NULL, NULL, NULL, NULL, false);
    1008              :     }
    1009              : 
    1010        40313 :     if (!skip_tuple)
    1011              :     {
    1012              :         /* OK, delete the tuple */
    1013        40313 :         simple_table_tuple_delete(rel, tid, estate->es_snapshot);
    1014              : 
    1015              :         /* AFTER ROW DELETE Triggers */
    1016        40313 :         ExecARDeleteTriggers(estate, resultRelInfo,
    1017              :                              tid, NULL, NULL, false);
    1018              :     }
    1019        40313 : }
    1020              : 
    1021              : /*
    1022              :  * Check if command can be executed with current replica identity.
    1023              :  */
    1024              : void
    1025       215471 : CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
    1026              : {
    1027              :     PublicationDesc pubdesc;
    1028              : 
    1029              :     /*
    1030              :      * Skip checking the replica identity for partitioned tables, because the
    1031              :      * operations are actually performed on the leaf partitions.
    1032              :      */
    1033       215471 :     if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
    1034       203217 :         return;
    1035              : 
    1036              :     /* We only need to do checks for UPDATE and DELETE. */
    1037       213012 :     if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
    1038       124678 :         return;
    1039              : 
    1040              :     /*
    1041              :      * It is only safe to execute UPDATE/DELETE if the relation does not
    1042              :      * publish UPDATEs or DELETEs, or all the following conditions are
    1043              :      * satisfied:
    1044              :      *
    1045              :      * 1. All columns, referenced in the row filters from publications which
    1046              :      * the relation is in, are valid - i.e. when all referenced columns are
    1047              :      * part of REPLICA IDENTITY.
    1048              :      *
    1049              :      * 2. All columns, referenced in the column lists are valid - i.e. when
    1050              :      * all columns referenced in the REPLICA IDENTITY are covered by the
    1051              :      * column list.
    1052              :      *
    1053              :      * 3. All generated columns in REPLICA IDENTITY of the relation, are valid
    1054              :      * - i.e. when all these generated columns are published.
    1055              :      *
    1056              :      * XXX We could optimize it by first checking whether any of the
    1057              :      * publications have a row filter or column list for this relation, or if
    1058              :      * the relation contains a generated column. If none of these exist and
    1059              :      * the relation has replica identity then we can avoid building the
    1060              :      * descriptor but as this happens only one time it doesn't seem worth the
    1061              :      * additional complexity.
    1062              :      */
    1063        88334 :     RelationBuildPublicationDesc(rel, &pubdesc);
    1064        88334 :     if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
    1065           30 :         ereport(ERROR,
    1066              :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1067              :                  errmsg("cannot update table \"%s\"",
    1068              :                         RelationGetRelationName(rel)),
    1069              :                  errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
    1070        88304 :     else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
    1071           54 :         ereport(ERROR,
    1072              :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1073              :                  errmsg("cannot update table \"%s\"",
    1074              :                         RelationGetRelationName(rel)),
    1075              :                  errdetail("Column list used by the publication does not cover the replica identity.")));
    1076        88250 :     else if (cmd == CMD_UPDATE && !pubdesc.gencols_valid_for_update)
    1077           12 :         ereport(ERROR,
    1078              :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1079              :                  errmsg("cannot update table \"%s\"",
    1080              :                         RelationGetRelationName(rel)),
    1081              :                  errdetail("Replica identity must not contain unpublished generated columns.")));
    1082        88238 :     else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
    1083            0 :         ereport(ERROR,
    1084              :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1085              :                  errmsg("cannot delete from table \"%s\"",
    1086              :                         RelationGetRelationName(rel)),
    1087              :                  errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
    1088        88238 :     else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
    1089            0 :         ereport(ERROR,
    1090              :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1091              :                  errmsg("cannot delete from table \"%s\"",
    1092              :                         RelationGetRelationName(rel)),
    1093              :                  errdetail("Column list used by the publication does not cover the replica identity.")));
    1094        88238 :     else if (cmd == CMD_DELETE && !pubdesc.gencols_valid_for_delete)
    1095            0 :         ereport(ERROR,
    1096              :                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1097              :                  errmsg("cannot delete from table \"%s\"",
    1098              :                         RelationGetRelationName(rel)),
    1099              :                  errdetail("Replica identity must not contain unpublished generated columns.")));
    1100              : 
    1101              :     /* If relation has replica identity we are always good. */
    1102        88238 :     if (OidIsValid(RelationGetReplicaIndex(rel)))
    1103        75850 :         return;
    1104              : 
    1105              :     /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
    1106        12388 :     if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
    1107          230 :         return;
    1108              : 
    1109              :     /*
    1110              :      * This is UPDATE/DELETE and there is no replica identity.
    1111              :      *
    1112              :      * Check if the table publishes UPDATES or DELETES.
    1113              :      */
    1114        12158 :     if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
    1115           62 :         ereport(ERROR,
    1116              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1117              :                  errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
    1118              :                         RelationGetRelationName(rel)),
    1119              :                  errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
    1120        12096 :     else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
    1121            8 :         ereport(ERROR,
    1122              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1123              :                  errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
    1124              :                         RelationGetRelationName(rel)),
    1125              :                  errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
    1126              : }
    1127              : 
    1128              : 
    1129              : /*
    1130              :  * Check if we support writing into specific relkind of local relation and check
    1131              :  * if it aligns with the relkind of the relation on the publisher.
    1132              :  *
    1133              :  * The nspname and relname are only needed for error reporting.
    1134              :  */
    1135              : void
    1136          964 : CheckSubscriptionRelkind(char localrelkind, char remoterelkind,
    1137              :                          const char *nspname, const char *relname)
    1138              : {
    1139          964 :     if (localrelkind != RELKIND_RELATION &&
    1140           17 :         localrelkind != RELKIND_PARTITIONED_TABLE &&
    1141              :         localrelkind != RELKIND_SEQUENCE)
    1142            0 :         ereport(ERROR,
    1143              :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1144              :                  errmsg("cannot use relation \"%s.%s\" as logical replication target",
    1145              :                         nspname, relname),
    1146              :                  errdetail_relkind_not_supported(localrelkind)));
    1147              : 
    1148              :     /*
    1149              :      * Allow RELKIND_RELATION and RELKIND_PARTITIONED_TABLE to be treated
    1150              :      * interchangeably, but ensure that sequences (RELKIND_SEQUENCE) match
    1151              :      * exactly on both publisher and subscriber.
    1152              :      */
    1153          964 :     if ((localrelkind == RELKIND_SEQUENCE && remoterelkind != RELKIND_SEQUENCE) ||
    1154          947 :         (localrelkind != RELKIND_SEQUENCE && remoterelkind == RELKIND_SEQUENCE))
    1155            0 :         ereport(ERROR,
    1156              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1157              :         /* translator: 3rd and 4th %s are "sequence" or "table" */
    1158              :                 errmsg("relation \"%s.%s\" type mismatch: source \"%s\", target \"%s\"",
    1159              :                        nspname, relname,
    1160              :                        remoterelkind == RELKIND_SEQUENCE ? "sequence" : "table",
    1161              :                        localrelkind == RELKIND_SEQUENCE ? "sequence" : "table"));
    1162          964 : }
        

Generated by: LCOV version 2.0-1