LCOV - code coverage report
Current view: top level - src/backend/executor - execReplication.c (source / functions) Hit Total Coverage
Test: PostgreSQL 12beta2 Lines: 146 185 78.9 %
Date: 2019-06-18 07:06:57 Functions: 9 9 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * execReplication.c
       4             :  *    miscellaneous executor routines for logical replication
       5             :  *
       6             :  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *    src/backend/executor/execReplication.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/genam.h"
      18             : #include "access/relscan.h"
      19             : #include "access/tableam.h"
      20             : #include "access/transam.h"
      21             : #include "access/xact.h"
      22             : #include "commands/trigger.h"
      23             : #include "executor/executor.h"
      24             : #include "executor/nodeModifyTable.h"
      25             : #include "nodes/nodeFuncs.h"
      26             : #include "parser/parse_relation.h"
      27             : #include "parser/parsetree.h"
      28             : #include "storage/bufmgr.h"
      29             : #include "storage/lmgr.h"
      30             : #include "utils/builtins.h"
      31             : #include "utils/datum.h"
      32             : #include "utils/lsyscache.h"
      33             : #include "utils/memutils.h"
      34             : #include "utils/rel.h"
      35             : #include "utils/snapmgr.h"
      36             : #include "utils/syscache.h"
      37             : #include "utils/typcache.h"
      38             : 
      39             : 
      40             : /*
      41             :  * Set up a ScanKey for a search in the relation 'rel' for the tuple held in
      42             :  * 'searchslot', which is set up to match 'rel' (*NOT* idxrel!).
      43             :  *
                     :  * One 'skey' entry is filled per key attribute of 'idxrel'.  Each entry is
                     :  * a BTEqualStrategyNumber comparison against the 'searchslot' value of the
                     :  * column that the index's indkey maps the attribute to, and carries the
                     :  * index's collation for that attribute.  Entries whose search value is
                     :  * NULL additionally get SK_ISNULL set.
                     :  *
      44             :  * Returns true if any of the key columns in 'searchslot' is NULL.
      45             :  *
      46             :  * This is not a generic routine, it expects the idxrel to be replication
      47             :  * identity of a rel and meet all limitations associated with that.
      48             :  */
      49             : static bool
      50         352 : build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
      51             :                          TupleTableSlot *searchslot)
      52             : {
      53             :     int         attoff;
      54             :     bool        isnull;
      55             :     Datum       indclassDatum;
      56             :     oidvector  *opclass;
      57         352 :     int2vector *indkey = &idxrel->rd_index->indkey;
      58         352 :     bool        hasnulls = false;
      59             : 
      60             :     Assert(RelationGetReplicaIndex(rel) == RelationGetRelid(idxrel));
      61             : 
      62         352 :     indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple,
      63             :                                     Anum_pg_index_indclass, &isnull);
      64             :     Assert(!isnull);
      65         352 :     opclass = (oidvector *) DatumGetPointer(indclassDatum);
      66             : 
      67             :     /* Build a scankey entry for every key attribute in the index. */
      68         704 :     for (attoff = 0; attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); attoff++)
      69             :     {
      70             :         Oid         operator;
      71             :         Oid         opfamily;
      72             :         RegProcedure regop;
      73         352 :         int         pkattno = attoff + 1;
      74         352 :         int         mainattno = indkey->values[attoff];
      75         352 :         Oid         optype = get_opclass_input_type(opclass->values[attoff]);
      76             : 
      77             :         /*
      78             :          * Load the operator info.  We need this to get the equality operator
      79             :          * function for the scan key.  The opclass input type is used as both
                     :          * the left and the right operand type of the lookup; a missing
                     :          * operator is reported with elog(ERROR).
      80             :          */
      81         352 :         opfamily = get_opclass_family(opclass->values[attoff]);
      82             : 
      83         352 :         operator = get_opfamily_member(opfamily, optype,
      84             :                                        optype,
      85             :                                        BTEqualStrategyNumber);
      86         352 :         if (!OidIsValid(operator))
      87           0 :             elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
      88             :                  BTEqualStrategyNumber, optype, optype, opfamily);
      89             : 
      90         352 :         regop = get_opcode(operator);
      91             : 
      92             :         /*
                     :          * Initialize the scankey.  pkattno is the 1-based attribute number
                     :          * handed to ScanKeyInit; mainattno is the 1-based position of the
                     :          * search value within searchslot.
                     :          */
      93         352 :         ScanKeyInit(&skey[attoff],
      94             :                     pkattno,
      95             :                     BTEqualStrategyNumber,
      96             :                     regop,
      97         352 :                     searchslot->tts_values[mainattno - 1]);
      98             : 
      99         352 :         skey[attoff].sk_collation = idxrel->rd_indcollation[attoff];
     100             : 
     101             :         /* Check for null value and flag the scankey entry accordingly. */
     102         352 :         if (searchslot->tts_isnull[mainattno - 1])
     103             :         {
     104           0 :             hasnulls = true;
     105           0 :             skey[attoff].sk_flags |= SK_ISNULL;
     106             :         }
     107             :     }
     108             : 
     109         352 :     return hasnulls;
     110             : }
     111             : 
     112             : /*
     113             :  * Search the relation 'rel' for the tuple in 'searchslot' using the index
                     :  * identified by 'idxoid'.
     114             :  *
     115             :  * If a matching tuple is found, lock it with lockmode, fill 'outslot' with
     116             :  * its contents, and return true.  Return false otherwise.
                     :  *
                     :  * The scan uses a snapshot initialized by InitDirtySnapshot().  If, after a
                     :  * tuple has been fetched, that snapshot carries a valid xmin or xmax, we
                     :  * wait for that transaction to finish and rescan.  A TM_Updated or
                     :  * TM_Deleted result from table_tuple_lock() is logged and also causes a
                     :  * rescan; TM_Invisible and any unexpected result raise an ERROR.
                     :  *
                     :  * The index is opened with RowExclusiveLock and closed with NoLock, i.e.
                     :  * the lock is not released here.
     117             :  */
     118             : bool
     119         352 : RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
     120             :                              LockTupleMode lockmode,
     121             :                              TupleTableSlot *searchslot,
     122             :                              TupleTableSlot *outslot)
     123             : {
     124             :     ScanKeyData skey[INDEX_MAX_KEYS];
     125             :     IndexScanDesc scan;
     126             :     SnapshotData snap;
     127             :     TransactionId xwait;
     128             :     Relation    idxrel;
     129             :     bool        found;
     130             : 
     131             :     /* Open the index. */
     132         352 :     idxrel = index_open(idxoid, RowExclusiveLock);
     133             : 
     134             :     /* Start an index scan. */
     135         352 :     InitDirtySnapshot(snap);
     136         352 :     scan = index_beginscan(rel, idxrel, &snap,
     137         352 :                            IndexRelationGetNumberOfKeyAttributes(idxrel),
     138             :                            0);
     139             : 
     140             :     /* Build scan key.  The "has NULLs" return value is not used here. */
     141         352 :     build_replindex_scan_key(skey, rel, idxrel, searchslot);
     142             : 
     143             : retry:
     144         352 :     found = false;
     145             : 
     146         352 :     index_rescan(scan, skey, IndexRelationGetNumberOfKeyAttributes(idxrel), NULL, 0);
     147             : 
     148             :     /* Try to find the tuple; only the first index match is considered. */
     149         352 :     if (index_getnext_slot(scan, ForwardScanDirection, outslot))
     150             :     {
     151         352 :         found = true;
     152         352 :         ExecMaterializeSlot(outslot);
     153             : 
     154         704 :         xwait = TransactionIdIsValid(snap.xmin) ?
     155         352 :             snap.xmin : snap.xmax;
     156             : 
     157             :         /*
     158             :          * If the tuple is locked, wait for locking transaction to finish and
     159             :          * retry.
     160             :          */
     161         352 :         if (TransactionIdIsValid(xwait))
     162             :         {
     163           0 :             XactLockTableWait(xwait, NULL, NULL, XLTW_None);
     164           0 :             goto retry;
     165             :         }
     166             :     }
     167             : 
     168             :     /*
                     :      * Found tuple, try to lock it in the lockmode.
                     :      *
                     :      * NOTE(review): GetLatestSnapshot() is called twice below, once for the
                     :      * active snapshot and once as the table_tuple_lock() argument; confirm
                     :      * that two separate calls are intended.
                     :      */
     169         352 :     if (found)
     170             :     {
     171             :         TM_FailureData tmfd;
     172             :         TM_Result   res;
     173             : 
     174         352 :         PushActiveSnapshot(GetLatestSnapshot());
     175             : 
     176         352 :         res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
     177             :                                outslot,
     178             :                                GetCurrentCommandId(false),
     179             :                                lockmode,
     180             :                                LockWaitBlock,
     181             :                                0 /* don't follow updates */ ,
     182             :                                &tmfd);
     183             : 
     184         352 :         PopActiveSnapshot();
     185             : 
     186         352 :         switch (res)
     187             :         {
     188             :             case TM_Ok:
     189         352 :                 break;
     190             :             case TM_Updated:
     191             :                 /* XXX: Improve handling here */
     192           0 :                 if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
     193           0 :                     ereport(LOG,
     194             :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     195             :                              errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
     196             :                 else
     197           0 :                     ereport(LOG,
     198             :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     199             :                              errmsg("concurrent update, retrying")));
     200           0 :                 goto retry;
     201             :             case TM_Deleted:
     202             :                 /* XXX: Improve handling here */
     203           0 :                 ereport(LOG,
     204             :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     205             :                          errmsg("concurrent delete, retrying")));
     206           0 :                 goto retry;
     207             :             case TM_Invisible:
     208           0 :                 elog(ERROR, "attempted to lock invisible tuple");
     209             :                 break;
     210             :             default:
     211           0 :                 elog(ERROR, "unexpected table_tuple_lock status: %u", res);
     212             :                 break;
     213             :         }
     214             :     }
     215             : 
     216         352 :     index_endscan(scan);
     217             : 
     218             :     /* Don't release lock until commit. */
     219         352 :     index_close(idxrel, NoLock);
     220             : 
     221         352 :     return found;
     222             : }
     223             : 
     224             : /*
     225             :  * Compare the tuples in the slots by checking if they have equal values.
                     :  *
                     :  * Both slots must have the same number of attributes (asserted).  Two
                     :  * NULLs in the same attribute are treated as equal; a NULL against a
                     :  * non-NULL is not.  Non-NULL values are compared with the equality function
                     :  * the type cache reports for the attribute's type, using the attribute's
                     :  * collation from slot1's tuple descriptor.  An ERROR is raised if no
                     :  * equality operator can be identified for the type.
                     :  *
                     :  * Returns true only if every attribute compares equal.
     226             :  */
     227             : static bool
     228      221312 : tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2)
     229             : {
     230             :     int         attrnum;
     231             : 
     232             :     Assert(slot1->tts_tupleDescriptor->natts ==
     233             :            slot2->tts_tupleDescriptor->natts);
     234             : 
     235      221312 :     slot_getallattrs(slot1);
     236      221312 :     slot_getallattrs(slot2);
     237             : 
     238             :     /* Check equality of the attributes, stopping at the first mismatch. */
     239      221596 :     for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
     240             :     {
     241             :         Form_pg_attribute att;
     242             :         TypeCacheEntry *typentry;
     243             : 
     244             :         /*
     245             :          * If one value is NULL and other is not, then they are certainly not
     246             :          * equal
     247             :          */
     248      221312 :         if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
     249           0 :             return false;
     250             : 
     251             :         /*
     252             :          * If both are NULL, they can be considered equal.  (The two flags are
                     :          * known to agree at this point, so either one being set means both
                     :          * are.)
     253             :          */
     254      221312 :         if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
     255           0 :             continue;
     256             : 
     257      221312 :         att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
     258             : 
     259      221312 :         typentry = lookup_type_cache(att->atttypid, TYPECACHE_EQ_OPR_FINFO);
     260      221312 :         if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
     261           0 :             ereport(ERROR,
     262             :                     (errcode(ERRCODE_UNDEFINED_FUNCTION),
     263             :                      errmsg("could not identify an equality operator for type %s",
     264             :                             format_type_be(att->atttypid))));
     265             : 
     266      221312 :         if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
     267             :                                             att->attcollation,
     268             :                                             slot1->tts_values[attrnum],
     269             :                                             slot2->tts_values[attrnum])))
     270      221028 :             return false;
     271             :     }
     272             : 
     273         284 :     return true;
     274             : }
     275             : 
     276             : /*
     277             :  * Search the relation 'rel' for the tuple in 'searchslot' using a sequential
                     :  * scan.
     278             :  *
     279             :  * If a matching tuple is found, lock it with lockmode, fill 'outslot' with
     280             :  * its contents, and return true.  Return false otherwise.
     281             :  *
     282             :  * Note that this stops on the first matching tuple.
     283             :  *
                     :  * The scan uses a snapshot initialized by InitDirtySnapshot().  If, after a
                     :  * match, that snapshot carries a valid xmin or xmax, we wait for that
                     :  * transaction to finish and restart the scan.  A TM_Updated or TM_Deleted
                     :  * result from table_tuple_lock() is logged and also restarts the scan;
                     :  * TM_Invisible and any unexpected result raise an ERROR.
                     :  *
     284             :  * This can obviously be quite slow on tables that have more than a few rows.
     285             :  */
     286             : bool
     287         244 : RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
     288             :                          TupleTableSlot *searchslot, TupleTableSlot *outslot)
     289             : {
     290             :     TupleTableSlot *scanslot;
     291             :     TableScanDesc scan;
     292             :     SnapshotData snap;
     293             :     TransactionId xwait;
     294             :     bool        found;
     295         244 :     TupleDesc   desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
     296             : 
     297             :     Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
     298             : 
     299             :     /* Start a heap scan. */
     300         244 :     InitDirtySnapshot(snap);
     301         244 :     scan = table_beginscan(rel, &snap, 0, NULL);
     302         244 :     scanslot = table_slot_create(rel, NULL);
     303             : 
     304             : retry:
     305         244 :     found = false;
     306             : 
     307         244 :     table_rescan(scan, NULL);
     308             : 
     309             :     /* Try to find the tuple */
     310         244 :     while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
     311             :     {
     312      221312 :         if (!tuples_equal(scanslot, searchslot))
     313      221028 :             continue;
     314             : 
     315         284 :         found = true;
     316         284 :         ExecCopySlot(outslot, scanslot);
     317             : 
     318         568 :         xwait = TransactionIdIsValid(snap.xmin) ?
     319         284 :             snap.xmin : snap.xmax;
     320             : 
     321             :         /*
     322             :          * If the tuple is locked, wait for locking transaction to finish and
     323             :          * retry.
     324             :          */
     325         284 :         if (TransactionIdIsValid(xwait))
     326             :         {
     327           0 :             XactLockTableWait(xwait, NULL, NULL, XLTW_None);
     328           0 :             goto retry;
     329             :         }
                     : 
                     :         /*
                     :          * Found our tuple and it's not locked: stop scanning.  Without this
                     :          * the loop would keep reading the rest of the table and overwrite
                     :          * outslot with any later match.
                     :          */
                     :         break;
     330             :     }
     331             : 
     332             :     /* Found tuple, try to lock it in the lockmode. */
     333         244 :     if (found)
     334             :     {
     335             :         TM_FailureData tmfd;
     336             :         TM_Result   res;
     337             : 
     338         244 :         PushActiveSnapshot(GetLatestSnapshot());
     339             : 
     340         244 :         res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
     341             :                                outslot,
     342             :                                GetCurrentCommandId(false),
     343             :                                lockmode,
     344             :                                LockWaitBlock,
     345             :                                0 /* don't follow updates */ ,
     346             :                                &tmfd);
     347             : 
     348         244 :         PopActiveSnapshot();
     349             : 
     350         244 :         switch (res)
     351             :         {
     352             :             case TM_Ok:
     353         244 :                 break;
     354             :             case TM_Updated:
     355             :                 /* XXX: Improve handling here */
     356           0 :                 if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
     357           0 :                     ereport(LOG,
     358             :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     359             :                              errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
     360             :                 else
     361           0 :                     ereport(LOG,
     362             :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     363             :                              errmsg("concurrent update, retrying")));
     364           0 :                 goto retry;
     365             :             case TM_Deleted:
     366             :                 /* XXX: Improve handling here */
     367           0 :                 ereport(LOG,
     368             :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     369             :                          errmsg("concurrent delete, retrying")));
     370           0 :                 goto retry;
     371             :             case TM_Invisible:
     372           0 :                 elog(ERROR, "attempted to lock invisible tuple");
     373             :                 break;
     374             :             default:
     375           0 :                 elog(ERROR, "unexpected table_tuple_lock status: %u", res);
     376             :                 break;
     377             :         }
     378             :     }
     379             : 
     380         244 :     table_endscan(scan);
     381         244 :     ExecDropSingleTupleTableSlot(scanslot);
     382             : 
     383         244 :     return found;
     384             : }
     385             : 
     386             : /*
     387             :  * Insert tuple represented in the slot to the relation, update the indexes,
     388             :  * and execute any constraints and per-row triggers.
     389             :  *
                     :  * The target relation is taken from estate->es_result_relation_info.  If a
                     :  * BEFORE ROW INSERT trigger returns false, the tuple is skipped and nothing
                     :  * further is done for it.
                     :  *
     390             :  * Caller is responsible for opening the indexes.
     391             :  */
     392             : void
     393         702 : ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
     394             : {
     395         702 :     bool        skip_tuple = false;
     396         702 :     ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
     397         702 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     398             : 
     399             :     /* For now we support only tables. */
     400             :     Assert(rel->rd_rel->relkind == RELKIND_RELATION);
     401             : 
     402         702 :     CheckCmdReplicaIdentity(rel, CMD_INSERT);
     403             : 
     404             :     /* BEFORE ROW INSERT Triggers */
     405         710 :     if (resultRelInfo->ri_TrigDesc &&
     406           8 :         resultRelInfo->ri_TrigDesc->trig_insert_before_row)
     407             :     {
     408           2 :         if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
     409           2 :             skip_tuple = true;  /* "do nothing" */
     410             :     }
     411             : 
     412         702 :     if (!skip_tuple)
     413             :     {
     414         700 :         List       *recheckIndexes = NIL;
     415             : 
     416             :         /* Compute stored generated columns */
     417        1048 :         if (rel->rd_att->constr &&
     418         348 :             rel->rd_att->constr->has_generated_stored)
     419           4 :             ExecComputeStoredGenerated(estate, slot);
     420             : 
     421             :         /* Check the constraints of the tuple, then the partition constraint */
     422         700 :         if (rel->rd_att->constr)
     423         348 :             ExecConstraints(resultRelInfo, slot, estate);
     424         700 :         if (resultRelInfo->ri_PartitionCheck)
     425           0 :             ExecPartitionCheck(resultRelInfo, slot, estate, true);
     426             : 
     427             :         /* OK, store the tuple and create index entries for it */
     428         700 :         simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
     429             : 
     430         700 :         if (resultRelInfo->ri_NumIndices > 0)
     431         346 :             recheckIndexes = ExecInsertIndexTuples(slot, estate, false, NULL,
     432             :                                                    NIL);
     433             : 
     434             :         /* AFTER ROW INSERT Triggers */
     435         700 :         ExecARInsertTriggers(estate, resultRelInfo, slot,
     436             :                              recheckIndexes, NULL);
     437             : 
     438             :         /*
     439             :          * XXX we should in theory pass a TransitionCaptureState object to the
     440             :          * above to capture transition tuples, but after statement triggers
     441             :          * don't actually get fired by replication yet anyway
     442             :          */
     443             : 
     444         700 :         list_free(recheckIndexes);
     445             :     }
     446         702 : }
     447             : 
     448             : /*
     449             :  * Find the searchslot tuple and update it with data in the slot,
     450             :  * update the indexes, and execute any constraints and per-row triggers.
     451             :  *
                     :  * The tuple to replace is identified by searchslot->tts_tid and the target
                     :  * relation is taken from estate->es_result_relation_info.  If a BEFORE ROW
                     :  * UPDATE trigger returns false, the update is skipped.  New index entries
                     :  * are inserted only when simple_table_tuple_update() sets update_indexes.
                     :  *
     452             :  * Caller is responsible for opening the indexes.
     453             :  */
     454             : void
     455         214 : ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate,
     456             :                          TupleTableSlot *searchslot, TupleTableSlot *slot)
     457             : {
     458         214 :     bool        skip_tuple = false;
     459         214 :     ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
     460         214 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     461         214 :     ItemPointer tid = &(searchslot->tts_tid);
     462             : 
     463             :     /* For now we support only tables. */
     464             :     Assert(rel->rd_rel->relkind == RELKIND_RELATION);
     465             : 
     466         214 :     CheckCmdReplicaIdentity(rel, CMD_UPDATE);
     467             : 
     468             :     /* BEFORE ROW UPDATE Triggers */
     469         214 :     if (resultRelInfo->ri_TrigDesc &&
     470           0 :         resultRelInfo->ri_TrigDesc->trig_update_before_row)
     471             :     {
     472           0 :         if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
     473             :                                   tid, NULL, slot))
     474           0 :             skip_tuple = true;  /* "do nothing" */
     475             :     }
     476             : 
     477         214 :     if (!skip_tuple)
     478             :     {
     479         214 :         List       *recheckIndexes = NIL;
     480             :         bool        update_indexes;
     481             : 
     482             :         /* Compute stored generated columns */
     483         384 :         if (rel->rd_att->constr &&
     484         170 :             rel->rd_att->constr->has_generated_stored)
     485           2 :             ExecComputeStoredGenerated(estate, slot);
     486             : 
     487             :         /* Check the constraints of the tuple, then the partition constraint */
     488         214 :         if (rel->rd_att->constr)
     489         170 :             ExecConstraints(resultRelInfo, slot, estate);
     490         214 :         if (resultRelInfo->ri_PartitionCheck)
     491           0 :             ExecPartitionCheck(resultRelInfo, slot, estate, true);
     492             : 
     493         214 :         simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
     494             :                                   &update_indexes);
     495             : 
     496         214 :         if (resultRelInfo->ri_NumIndices > 0 && update_indexes)
     497          82 :             recheckIndexes = ExecInsertIndexTuples(slot, estate, false, NULL,
     498             :                                                    NIL);
     499             : 
     500             :         /* AFTER ROW UPDATE Triggers */
     501         214 :         ExecARUpdateTriggers(estate, resultRelInfo,
     502             :                              tid, NULL, slot,
     503             :                              recheckIndexes, NULL);
     504             : 
     505         214 :         list_free(recheckIndexes);
     506             :     }
     507         214 : }
     508             : 
     509             : /*
     510             :  * Find the searchslot tuple and delete it, and execute any per-row
     511             :  * triggers.
     512             :  *
                     :  * The tuple to delete is identified by searchslot->tts_tid and the target
                     :  * relation is taken from estate->es_result_relation_info.  If a BEFORE ROW
                     :  * DELETE trigger returns false, the delete is skipped.
                     :  *
     513             :  * Caller is responsible for opening the indexes.
     514             :  */
     515             : void
     516         382 : ExecSimpleRelationDelete(EState *estate, EPQState *epqstate,
     517             :                          TupleTableSlot *searchslot)
     518             : {
     519         382 :     bool        skip_tuple = false;
     520         382 :     ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
     521         382 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     522         382 :     ItemPointer tid = &searchslot->tts_tid;
     523             : 
     524         382 :     CheckCmdReplicaIdentity(rel, CMD_DELETE);
     525             : 
     526             :     /* BEFORE ROW DELETE Triggers */
     527         382 :     if (resultRelInfo->ri_TrigDesc &&
     528           0 :         resultRelInfo->ri_TrigDesc->trig_delete_before_row)
     529             :     {
     530           0 :         skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
     531           0 :                                            tid, NULL, NULL);
     532             : 
     533             :     }
     534             : 
     535         382 :     if (!skip_tuple)
     536             :     {
     537             :         /* OK, delete the tuple */
     538         382 :         simple_table_tuple_delete(rel, tid, estate->es_snapshot);
     539             : 
     540             :         /* AFTER ROW DELETE Triggers */
     541         382 :         ExecARDeleteTriggers(estate, resultRelInfo,
     542             :                              tid, NULL, NULL);
     543             :     }
     544         382 : }
     545             : 
     546             : /*
     547             :  * Check if command can be executed with current replica identity.
                     :  *
                     :  * Only CMD_UPDATE and CMD_DELETE are checked; every other command returns
                     :  * immediately.  An ERROR is raised when 'rel' has neither
                     :  * REPLICA_IDENTITY_FULL nor a valid replica identity index and its
                     :  * publication actions include the command in question.
     548             :  */
     549             : void
     550       82460 : CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
     551             : {
     552             :     PublicationActions *pubactions;
     553             : 
     554             :     /* We only need to do checks for UPDATE and DELETE. */
     555       82460 :     if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
     556       68780 :         return;
     557             : 
     558             :     /* If relation has replica identity we are always good. */
     559       27090 :     if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
     560       13410 :         OidIsValid(RelationGetReplicaIndex(rel)))
     561        3778 :         return;
     562             : 
     563             :     /*
     564             :      * This is either UPDATE OR DELETE and there is no replica identity.
     565             :      *
     566             :      * Check if the table publishes UPDATES or DELETES.
     567             :      */
     568        9902 :     pubactions = GetRelationPublicationActions(rel);
     569        9902 :     if (cmd == CMD_UPDATE && pubactions->pubupdate)
     570           0 :         ereport(ERROR,
     571             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     572             :                  errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
     573             :                         RelationGetRelationName(rel)),
     574             :                  errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
     575        9902 :     else if (cmd == CMD_DELETE && pubactions->pubdelete)
     576           0 :         ereport(ERROR,
     577             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     578             :                  errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
     579             :                         RelationGetRelationName(rel)),
     580             :                  errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
     581             : }
     582             : 
     583             : 
     584             : /*
     585             :  * Check if we support writing into specific relkind.
     586             :  *
                     :  * Raises an ERROR with ERRCODE_WRONG_OBJECT_TYPE for any relkind other than
                     :  * RELKIND_RELATION, and returns normally otherwise.
                     :  *
     587             :  * The nspname and relname are only needed for error reporting.
     588             :  */
     589             : void
     590         246 : CheckSubscriptionRelkind(char relkind, const char *nspname,
     591             :                          const char *relname)
     592             : {
     593             :     /*
     594             :      * We currently only support writing to regular tables.  However, give a
     595             :      * more specific error for partitioned and foreign tables.
     596             :      */
     597         246 :     if (relkind == RELKIND_PARTITIONED_TABLE)
     598           0 :         ereport(ERROR,
     599             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     600             :                  errmsg("cannot use relation \"%s.%s\" as logical replication target",
     601             :                         nspname, relname),
     602             :                  errdetail("\"%s.%s\" is a partitioned table.",
     603             :                            nspname, relname)));
     604         246 :     else if (relkind == RELKIND_FOREIGN_TABLE)
     605           0 :         ereport(ERROR,
     606             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     607             :                  errmsg("cannot use relation \"%s.%s\" as logical replication target",
     608             :                         nspname, relname),
     609             :                  errdetail("\"%s.%s\" is a foreign table.",
     610             :                            nspname, relname)));
     611             : 
     612         246 :     if (relkind != RELKIND_RELATION)
     613           0 :         ereport(ERROR,
     614             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     615             :                  errmsg("cannot use relation \"%s.%s\" as logical replication target",
     616             :                         nspname, relname),
     617             :                  errdetail("\"%s.%s\" is not a table.",
     618             :                            nspname, relname)));
     619         246 : }

Generated by: LCOV version 1.13