LCOV - code coverage report
Current view: top level - src/backend/executor - execReplication.c (source / functions) Hit Total Coverage
Test: PostgreSQL 15devel Lines: 157 195 80.5 %
Date: 2021-12-03 04:09:03 Functions: 9 9 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * execReplication.c
       4             :  *    miscellaneous executor routines for logical replication
       5             :  *
       6             :  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *    src/backend/executor/execReplication.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/genam.h"
      18             : #include "access/relscan.h"
      19             : #include "access/tableam.h"
      20             : #include "access/transam.h"
      21             : #include "access/xact.h"
      22             : #include "commands/trigger.h"
      23             : #include "executor/executor.h"
      24             : #include "executor/nodeModifyTable.h"
      25             : #include "nodes/nodeFuncs.h"
      26             : #include "parser/parse_relation.h"
      27             : #include "parser/parsetree.h"
      28             : #include "storage/bufmgr.h"
      29             : #include "storage/lmgr.h"
      30             : #include "utils/builtins.h"
      31             : #include "utils/datum.h"
      32             : #include "utils/lsyscache.h"
      33             : #include "utils/memutils.h"
      34             : #include "utils/rel.h"
      35             : #include "utils/snapmgr.h"
      36             : #include "utils/syscache.h"
      37             : #include "utils/typcache.h"
      38             : 
      39             : 
      40             : /*
      41             :  * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
      42             :  * is setup to match 'rel' (*NOT* idxrel!).
      43             :  *
      44             :  * Returns whether any column contains NULLs.
      45             :  *
      46             :  * This is not generic routine, it expects the idxrel to be replication
      47             :  * identity of a rel and meet all limitations associated with that.
      48             :  */
      49             : static bool
      50      113002 : build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
      51             :                          TupleTableSlot *searchslot)
      52             : {
      53             :     int         attoff;
      54             :     bool        isnull;
      55             :     Datum       indclassDatum;
      56             :     oidvector  *opclass;
      57      113002 :     int2vector *indkey = &idxrel->rd_index->indkey;
      58      113002 :     bool        hasnulls = false;
      59             : 
      60             :     Assert(RelationGetReplicaIndex(rel) == RelationGetRelid(idxrel) ||
      61             :            RelationGetPrimaryKeyIndex(rel) == RelationGetRelid(idxrel));
      62             : 
      63      113002 :     indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple,
      64             :                                     Anum_pg_index_indclass, &isnull);
      65             :     Assert(!isnull);
      66      113002 :     opclass = (oidvector *) DatumGetPointer(indclassDatum);
      67             : 
      68             :     /* Build scankey for every attribute in the index. */
      69      226004 :     for (attoff = 0; attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); attoff++)
      70             :     {
      71             :         Oid         operator;
      72             :         Oid         opfamily;
      73             :         RegProcedure regop;
      74      113002 :         int         pkattno = attoff + 1;
      75      113002 :         int         mainattno = indkey->values[attoff];
      76      113002 :         Oid         optype = get_opclass_input_type(opclass->values[attoff]);
      77             : 
      78             :         /*
      79             :          * Load the operator info.  We need this to get the equality operator
      80             :          * function for the scan key.
      81             :          */
      82      113002 :         opfamily = get_opclass_family(opclass->values[attoff]);
      83             : 
      84      113002 :         operator = get_opfamily_member(opfamily, optype,
      85             :                                        optype,
      86             :                                        BTEqualStrategyNumber);
      87      113002 :         if (!OidIsValid(operator))
      88           0 :             elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
      89             :                  BTEqualStrategyNumber, optype, optype, opfamily);
      90             : 
      91      113002 :         regop = get_opcode(operator);
      92             : 
      93             :         /* Initialize the scankey. */
      94      113002 :         ScanKeyInit(&skey[attoff],
      95             :                     pkattno,
      96             :                     BTEqualStrategyNumber,
      97             :                     regop,
      98      113002 :                     searchslot->tts_values[mainattno - 1]);
      99             : 
     100      113002 :         skey[attoff].sk_collation = idxrel->rd_indcollation[attoff];
     101             : 
     102             :         /* Check for null value. */
     103      113002 :         if (searchslot->tts_isnull[mainattno - 1])
     104             :         {
     105           0 :             hasnulls = true;
     106           0 :             skey[attoff].sk_flags |= SK_ISNULL;
     107             :         }
     108             :     }
     109             : 
     110      113002 :     return hasnulls;
     111             : }
     112             : 
     113             : /*
     114             :  * Search the relation 'rel' for tuple using the index.
     115             :  *
     116             :  * If a matching tuple is found, lock it with lockmode, fill the slot with its
     117             :  * contents, and return true.  Return false otherwise.
     118             :  */
     119             : bool
     120      113002 : RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
     121             :                              LockTupleMode lockmode,
     122             :                              TupleTableSlot *searchslot,
     123             :                              TupleTableSlot *outslot)
     124             : {
     125             :     ScanKeyData skey[INDEX_MAX_KEYS];
     126             :     IndexScanDesc scan;
     127             :     SnapshotData snap;
     128             :     TransactionId xwait;
     129             :     Relation    idxrel;
     130             :     bool        found;
     131             : 
     132             :     /* Open the index. */
     133      113002 :     idxrel = index_open(idxoid, RowExclusiveLock);
     134             : 
     135             :     /* Start an index scan. */
     136      113002 :     InitDirtySnapshot(snap);
     137      113002 :     scan = index_beginscan(rel, idxrel, &snap,
     138      113002 :                            IndexRelationGetNumberOfKeyAttributes(idxrel),
     139             :                            0);
     140             : 
     141             :     /* Build scan key. */
     142      113002 :     build_replindex_scan_key(skey, rel, idxrel, searchslot);
     143             : 
     144      113002 : retry:
     145      113002 :     found = false;
     146             : 
     147      113002 :     index_rescan(scan, skey, IndexRelationGetNumberOfKeyAttributes(idxrel), NULL, 0);
     148             : 
     149             :     /* Try to find the tuple */
     150      113002 :     if (index_getnext_slot(scan, ForwardScanDirection, outslot))
     151             :     {
     152      112986 :         found = true;
     153      112986 :         ExecMaterializeSlot(outslot);
     154             : 
     155      225972 :         xwait = TransactionIdIsValid(snap.xmin) ?
     156      112986 :             snap.xmin : snap.xmax;
     157             : 
     158             :         /*
     159             :          * If the tuple is locked, wait for locking transaction to finish and
     160             :          * retry.
     161             :          */
     162      112986 :         if (TransactionIdIsValid(xwait))
     163             :         {
     164           0 :             XactLockTableWait(xwait, NULL, NULL, XLTW_None);
     165           0 :             goto retry;
     166             :         }
     167             :     }
     168             : 
     169             :     /* Found tuple, try to lock it in the lockmode. */
     170      113002 :     if (found)
     171             :     {
     172             :         TM_FailureData tmfd;
     173             :         TM_Result   res;
     174             : 
     175      112986 :         PushActiveSnapshot(GetLatestSnapshot());
     176             : 
     177      112986 :         res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
     178             :                                outslot,
     179             :                                GetCurrentCommandId(false),
     180             :                                lockmode,
     181             :                                LockWaitBlock,
     182             :                                0 /* don't follow updates */ ,
     183             :                                &tmfd);
     184             : 
     185      112986 :         PopActiveSnapshot();
     186             : 
     187      112986 :         switch (res)
     188             :         {
     189      112986 :             case TM_Ok:
     190      112986 :                 break;
     191           0 :             case TM_Updated:
     192             :                 /* XXX: Improve handling here */
     193           0 :                 if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
     194           0 :                     ereport(LOG,
     195             :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     196             :                              errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
     197             :                 else
     198           0 :                     ereport(LOG,
     199             :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     200             :                              errmsg("concurrent update, retrying")));
     201           0 :                 goto retry;
     202           0 :             case TM_Deleted:
     203             :                 /* XXX: Improve handling here */
     204           0 :                 ereport(LOG,
     205             :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     206             :                          errmsg("concurrent delete, retrying")));
     207           0 :                 goto retry;
     208           0 :             case TM_Invisible:
     209           0 :                 elog(ERROR, "attempted to lock invisible tuple");
     210             :                 break;
     211           0 :             default:
     212           0 :                 elog(ERROR, "unexpected table_tuple_lock status: %u", res);
     213             :                 break;
     214             :         }
     215             :     }
     216             : 
     217      113002 :     index_endscan(scan);
     218             : 
     219             :     /* Don't release lock until commit. */
     220      113002 :     index_close(idxrel, NoLock);
     221             : 
     222      113002 :     return found;
     223             : }
     224             : 
     225             : /*
     226             :  * Compare the tuples in the slots by checking if they have equal values.
     227             :  */
     228             : static bool
     229      210440 : tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
     230             :              TypeCacheEntry **eq)
     231             : {
     232             :     int         attrnum;
     233             : 
     234             :     Assert(slot1->tts_tupleDescriptor->natts ==
     235             :            slot2->tts_tupleDescriptor->natts);
     236             : 
     237      210440 :     slot_getallattrs(slot1);
     238      210440 :     slot_getallattrs(slot2);
     239             : 
     240             :     /* Check equality of the attributes. */
     241      210684 :     for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
     242             :     {
     243             :         Form_pg_attribute att;
     244             :         TypeCacheEntry *typentry;
     245             : 
     246             :         /*
     247             :          * If one value is NULL and other is not, then they are certainly not
     248             :          * equal
     249             :          */
     250      210440 :         if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
     251           0 :             return false;
     252             : 
     253             :         /*
     254             :          * If both are NULL, they can be considered equal.
     255             :          */
     256      210440 :         if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
     257           0 :             continue;
     258             : 
     259      210440 :         att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
     260             : 
     261      210440 :         typentry = eq[attrnum];
     262      210440 :         if (typentry == NULL)
     263             :         {
     264         244 :             typentry = lookup_type_cache(att->atttypid,
     265             :                                          TYPECACHE_EQ_OPR_FINFO);
     266         244 :             if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
     267           0 :                 ereport(ERROR,
     268             :                         (errcode(ERRCODE_UNDEFINED_FUNCTION),
     269             :                          errmsg("could not identify an equality operator for type %s",
     270             :                                 format_type_be(att->atttypid))));
     271         244 :             eq[attrnum] = typentry;
     272             :         }
     273             : 
     274      210440 :         if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
     275             :                                             att->attcollation,
     276             :                                             slot1->tts_values[attrnum],
     277             :                                             slot2->tts_values[attrnum])))
     278      210196 :             return false;
     279             :     }
     280             : 
     281         244 :     return true;
     282             : }
     283             : 
     284             : /*
     285             :  * Search the relation 'rel' for tuple using the sequential scan.
     286             :  *
     287             :  * If a matching tuple is found, lock it with lockmode, fill the slot with its
     288             :  * contents, and return true.  Return false otherwise.
     289             :  *
     290             :  * Note that this stops on the first matching tuple.
     291             :  *
     292             :  * This can obviously be quite slow on tables that have more than few rows.
     293             :  */
     294             : bool
     295         244 : RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
     296             :                          TupleTableSlot *searchslot, TupleTableSlot *outslot)
     297             : {
     298             :     TupleTableSlot *scanslot;
     299             :     TableScanDesc scan;
     300             :     SnapshotData snap;
     301             :     TypeCacheEntry **eq;
     302             :     TransactionId xwait;
     303             :     bool        found;
     304         244 :     TupleDesc   desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
     305             : 
     306             :     Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
     307             : 
     308         244 :     eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
     309             : 
     310             :     /* Start a heap scan. */
     311         244 :     InitDirtySnapshot(snap);
     312         244 :     scan = table_beginscan(rel, &snap, 0, NULL);
     313         244 :     scanslot = table_slot_create(rel, NULL);
     314             : 
     315         244 : retry:
     316         244 :     found = false;
     317             : 
     318         244 :     table_rescan(scan, NULL);
     319             : 
     320             :     /* Try to find the tuple */
     321      210440 :     while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
     322             :     {
     323      210440 :         if (!tuples_equal(scanslot, searchslot, eq))
     324      210196 :             continue;
     325             : 
     326         244 :         found = true;
     327         244 :         ExecCopySlot(outslot, scanslot);
     328             : 
     329         488 :         xwait = TransactionIdIsValid(snap.xmin) ?
     330         244 :             snap.xmin : snap.xmax;
     331             : 
     332             :         /*
     333             :          * If the tuple is locked, wait for locking transaction to finish and
     334             :          * retry.
     335             :          */
     336         244 :         if (TransactionIdIsValid(xwait))
     337             :         {
     338           0 :             XactLockTableWait(xwait, NULL, NULL, XLTW_None);
     339           0 :             goto retry;
     340             :         }
     341             : 
     342             :         /* Found our tuple and it's not locked */
     343         244 :         break;
     344             :     }
     345             : 
     346             :     /* Found tuple, try to lock it in the lockmode. */
     347         244 :     if (found)
     348             :     {
     349             :         TM_FailureData tmfd;
     350             :         TM_Result   res;
     351             : 
     352         244 :         PushActiveSnapshot(GetLatestSnapshot());
     353             : 
     354         244 :         res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
     355             :                                outslot,
     356             :                                GetCurrentCommandId(false),
     357             :                                lockmode,
     358             :                                LockWaitBlock,
     359             :                                0 /* don't follow updates */ ,
     360             :                                &tmfd);
     361             : 
     362         244 :         PopActiveSnapshot();
     363             : 
     364         244 :         switch (res)
     365             :         {
     366         244 :             case TM_Ok:
     367         244 :                 break;
     368           0 :             case TM_Updated:
     369             :                 /* XXX: Improve handling here */
     370           0 :                 if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
     371           0 :                     ereport(LOG,
     372             :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     373             :                              errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
     374             :                 else
     375           0 :                     ereport(LOG,
     376             :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     377             :                              errmsg("concurrent update, retrying")));
     378           0 :                 goto retry;
     379           0 :             case TM_Deleted:
     380             :                 /* XXX: Improve handling here */
     381           0 :                 ereport(LOG,
     382             :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     383             :                          errmsg("concurrent delete, retrying")));
     384           0 :                 goto retry;
     385           0 :             case TM_Invisible:
     386           0 :                 elog(ERROR, "attempted to lock invisible tuple");
     387             :                 break;
     388           0 :             default:
     389           0 :                 elog(ERROR, "unexpected table_tuple_lock status: %u", res);
     390             :                 break;
     391             :         }
     392             :     }
     393             : 
     394         244 :     table_endscan(scan);
     395         244 :     ExecDropSingleTupleTableSlot(scanslot);
     396             : 
     397         244 :     return found;
     398             : }
     399             : 
     400             : /*
     401             :  * Insert tuple represented in the slot to the relation, update the indexes,
     402             :  * and execute any constraints and per-row triggers.
     403             :  *
     404             :  * Caller is responsible for opening the indexes.
     405             :  */
     406             : void
     407      102952 : ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
     408             :                          EState *estate, TupleTableSlot *slot)
     409             : {
     410      102952 :     bool        skip_tuple = false;
     411      102952 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     412             : 
     413             :     /* For now we support only tables. */
     414             :     Assert(rel->rd_rel->relkind == RELKIND_RELATION);
     415             : 
     416      102952 :     CheckCmdReplicaIdentity(rel, CMD_INSERT);
     417             : 
     418             :     /* BEFORE ROW INSERT Triggers */
     419      102952 :     if (resultRelInfo->ri_TrigDesc &&
     420          34 :         resultRelInfo->ri_TrigDesc->trig_insert_before_row)
     421             :     {
     422           2 :         if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
     423           2 :             skip_tuple = true;  /* "do nothing" */
     424             :     }
     425             : 
     426      102952 :     if (!skip_tuple)
     427             :     {
     428      102950 :         List       *recheckIndexes = NIL;
     429             : 
     430             :         /* Compute stored generated columns */
     431      102950 :         if (rel->rd_att->constr &&
     432      102498 :             rel->rd_att->constr->has_generated_stored)
     433           4 :             ExecComputeStoredGenerated(resultRelInfo, estate, slot,
     434             :                                        CMD_INSERT);
     435             : 
     436             :         /* Check the constraints of the tuple */
     437      102950 :         if (rel->rd_att->constr)
     438      102498 :             ExecConstraints(resultRelInfo, slot, estate);
     439      102950 :         if (rel->rd_rel->relispartition)
     440          64 :             ExecPartitionCheck(resultRelInfo, slot, estate, true);
     441             : 
     442             :         /* OK, store the tuple and create index entries for it */
     443      102950 :         simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
     444             : 
     445      102950 :         if (resultRelInfo->ri_NumIndices > 0)
     446      102498 :             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
     447             :                                                    slot, estate, false, false,
     448             :                                                    NULL, NIL);
     449             : 
     450             :         /* AFTER ROW INSERT Triggers */
     451      102948 :         ExecARInsertTriggers(estate, resultRelInfo, slot,
     452             :                              recheckIndexes, NULL);
     453             : 
     454             :         /*
     455             :          * XXX we should in theory pass a TransitionCaptureState object to the
     456             :          * above to capture transition tuples, but after statement triggers
     457             :          * don't actually get fired by replication yet anyway
     458             :          */
     459             : 
     460      102948 :         list_free(recheckIndexes);
     461             :     }
     462      102950 : }
     463             : 
     464             : /*
     465             :  * Find the searchslot tuple and update it with data in the slot,
     466             :  * update the indexes, and execute any constraints and per-row triggers.
     467             :  *
     468             :  * Caller is responsible for opening the indexes.
     469             :  */
     470             : void
     471       57780 : ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
     472             :                          EState *estate, EPQState *epqstate,
     473             :                          TupleTableSlot *searchslot, TupleTableSlot *slot)
     474             : {
     475       57780 :     bool        skip_tuple = false;
     476       57780 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     477       57780 :     ItemPointer tid = &(searchslot->tts_tid);
     478             : 
     479             :     /* For now we support only tables. */
     480             :     Assert(rel->rd_rel->relkind == RELKIND_RELATION);
     481             : 
     482       57780 :     CheckCmdReplicaIdentity(rel, CMD_UPDATE);
     483             : 
     484             :     /* BEFORE ROW UPDATE Triggers */
     485       57780 :     if (resultRelInfo->ri_TrigDesc &&
     486          18 :         resultRelInfo->ri_TrigDesc->trig_update_before_row)
     487             :     {
     488           4 :         if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
     489             :                                   tid, NULL, slot))
     490           4 :             skip_tuple = true;  /* "do nothing" */
     491             :     }
     492             : 
     493       57780 :     if (!skip_tuple)
     494             :     {
     495       57776 :         List       *recheckIndexes = NIL;
     496             :         bool        update_indexes;
     497             : 
     498             :         /* Compute stored generated columns */
     499       57776 :         if (rel->rd_att->constr &&
     500       57732 :             rel->rd_att->constr->has_generated_stored)
     501           2 :             ExecComputeStoredGenerated(resultRelInfo, estate, slot,
     502             :                                        CMD_UPDATE);
     503             : 
     504             :         /* Check the constraints of the tuple */
     505       57776 :         if (rel->rd_att->constr)
     506       57732 :             ExecConstraints(resultRelInfo, slot, estate);
     507       57776 :         if (rel->rd_rel->relispartition)
     508          10 :             ExecPartitionCheck(resultRelInfo, slot, estate, true);
     509             : 
     510       57776 :         simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
     511             :                                   &update_indexes);
     512             : 
     513       57776 :         if (resultRelInfo->ri_NumIndices > 0 && update_indexes)
     514       45990 :             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
     515             :                                                    slot, estate, true, false,
     516             :                                                    NULL, NIL);
     517             : 
     518             :         /* AFTER ROW UPDATE Triggers */
     519       57776 :         ExecARUpdateTriggers(estate, resultRelInfo,
     520             :                              tid, NULL, slot,
     521             :                              recheckIndexes, NULL);
     522             : 
     523       57776 :         list_free(recheckIndexes);
     524             :     }
     525       57780 : }
     526             : 
     527             : /*
     528             :  * Find the searchslot tuple and delete it, and execute any constraints
     529             :  * and per-row triggers.
     530             :  *
     531             :  * Caller is responsible for opening the indexes.
     532             :  */
     533             : void
     534       55448 : ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
     535             :                          EState *estate, EPQState *epqstate,
     536             :                          TupleTableSlot *searchslot)
     537             : {
     538       55448 :     bool        skip_tuple = false;
     539       55448 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     540       55448 :     ItemPointer tid = &searchslot->tts_tid;
     541             : 
     542       55448 :     CheckCmdReplicaIdentity(rel, CMD_DELETE);
     543             : 
     544             :     /* BEFORE ROW DELETE Triggers */
     545       55448 :     if (resultRelInfo->ri_TrigDesc &&
     546          20 :         resultRelInfo->ri_TrigDesc->trig_delete_before_row)
     547             :     {
     548           0 :         skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
     549           0 :                                            tid, NULL, NULL);
     550             : 
     551             :     }
     552             : 
     553       55448 :     if (!skip_tuple)
     554             :     {
     555             :         /* OK, delete the tuple */
     556       55448 :         simple_table_tuple_delete(rel, tid, estate->es_snapshot);
     557             : 
     558             :         /* AFTER ROW DELETE Triggers */
     559       55448 :         ExecARDeleteTriggers(estate, resultRelInfo,
     560             :                              tid, NULL, NULL);
     561             :     }
     562       55448 : }
     563             : 
     564             : /*
     565             :  * Check if command can be executed with current replica identity.
     566             :  */
     567             : void
     568      318986 : CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
     569             : {
     570             :     PublicationActions *pubactions;
     571             : 
     572             :     /* We only need to do checks for UPDATE and DELETE. */
     573      318986 :     if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
     574      185512 :         return;
     575             : 
     576             :     /* If relation has replica identity we are always good. */
     577      266668 :     if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
     578      133194 :         OidIsValid(RelationGetReplicaIndex(rel)))
     579      116934 :         return;
     580             : 
     581             :     /*
     582             :      * This is either UPDATE OR DELETE and there is no replica identity.
     583             :      *
     584             :      * Check if the table publishes UPDATES or DELETES.
     585             :      */
     586       16540 :     pubactions = GetRelationPublicationActions(rel);
     587       16540 :     if (cmd == CMD_UPDATE && pubactions->pubupdate)
     588          40 :         ereport(ERROR,
     589             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     590             :                  errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
     591             :                         RelationGetRelationName(rel)),
     592             :                  errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
     593       16500 :     else if (cmd == CMD_DELETE && pubactions->pubdelete)
     594           0 :         ereport(ERROR,
     595             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     596             :                  errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
     597             :                         RelationGetRelationName(rel)),
     598             :                  errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
     599             : }
     600             : 
     601             : 
     602             : /*
     603             :  * Check if we support writing into specific relkind.
     604             :  *
     605             :  * The nspname and relname are only needed for error reporting.
     606             :  */
     607             : void
     608         702 : CheckSubscriptionRelkind(char relkind, const char *nspname,
     609             :                          const char *relname)
     610             : {
     611         702 :     if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
     612           0 :         ereport(ERROR,
     613             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     614             :                  errmsg("cannot use relation \"%s.%s\" as logical replication target",
     615             :                         nspname, relname),
     616             :                  errdetail_relkind_not_supported(relkind)));
     617         702 : }

Generated by: LCOV version 1.14