LCOV - code coverage report
Current view: top level - src/backend/executor - execReplication.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13beta1 Lines: 160 200 80.0 %
Date: 2020-06-05 19:06:29 Functions: 9 9 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * execReplication.c
       4             :  *    miscellaneous executor routines for logical replication
       5             :  *
       6             :  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *    src/backend/executor/execReplication.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/genam.h"
      18             : #include "access/relscan.h"
      19             : #include "access/tableam.h"
      20             : #include "access/transam.h"
      21             : #include "access/xact.h"
      22             : #include "commands/trigger.h"
      23             : #include "executor/executor.h"
      24             : #include "executor/nodeModifyTable.h"
      25             : #include "nodes/nodeFuncs.h"
      26             : #include "parser/parse_relation.h"
      27             : #include "parser/parsetree.h"
      28             : #include "storage/bufmgr.h"
      29             : #include "storage/lmgr.h"
      30             : #include "utils/builtins.h"
      31             : #include "utils/datum.h"
      32             : #include "utils/lsyscache.h"
      33             : #include "utils/memutils.h"
      34             : #include "utils/rel.h"
      35             : #include "utils/snapmgr.h"
      36             : #include "utils/syscache.h"
      37             : #include "utils/typcache.h"
      38             : 
      39             : 
      40             : /*
      41             :  * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
      42             :  * is setup to match 'rel' (*NOT* idxrel!).
      43             :  *
      44             :  * Returns whether any column contains NULLs.
      45             :  *
      46             :  * This is not generic routine, it expects the idxrel to be replication
      47             :  * identity of a rel and meet all limitations associated with that.
      48             :  */
      49             : static bool
      50         472 : build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
      51             :                          TupleTableSlot *searchslot)
      52             : {
      53             :     int         attoff;
      54             :     bool        isnull;
      55             :     Datum       indclassDatum;
      56             :     oidvector  *opclass;
      57         472 :     int2vector *indkey = &idxrel->rd_index->indkey;
      58         472 :     bool        hasnulls = false;
      59             : 
      60             :     Assert(RelationGetReplicaIndex(rel) == RelationGetRelid(idxrel) ||
      61             :            RelationGetPrimaryKeyIndex(rel) == RelationGetRelid(idxrel));
      62             : 
      63         472 :     indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple,
      64             :                                     Anum_pg_index_indclass, &isnull);
      65             :     Assert(!isnull);
      66         472 :     opclass = (oidvector *) DatumGetPointer(indclassDatum);
      67             : 
      68             :     /* Build scankey for every attribute in the index. */
      69         944 :     for (attoff = 0; attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); attoff++)
      70             :     {
      71             :         Oid         operator;
      72             :         Oid         opfamily;
      73             :         RegProcedure regop;
      74         472 :         int         pkattno = attoff + 1;
      75         472 :         int         mainattno = indkey->values[attoff];
      76         472 :         Oid         optype = get_opclass_input_type(opclass->values[attoff]);
      77             : 
      78             :         /*
      79             :          * Load the operator info.  We need this to get the equality operator
      80             :          * function for the scan key.
      81             :          */
      82         472 :         opfamily = get_opclass_family(opclass->values[attoff]);
      83             : 
      84         472 :         operator = get_opfamily_member(opfamily, optype,
      85             :                                        optype,
      86             :                                        BTEqualStrategyNumber);
      87         472 :         if (!OidIsValid(operator))
      88           0 :             elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
      89             :                  BTEqualStrategyNumber, optype, optype, opfamily);
      90             : 
      91         472 :         regop = get_opcode(operator);
      92             : 
      93             :         /* Initialize the scankey. */
      94         472 :         ScanKeyInit(&skey[attoff],
      95             :                     pkattno,
      96             :                     BTEqualStrategyNumber,
      97             :                     regop,
      98         472 :                     searchslot->tts_values[mainattno - 1]);
      99             : 
     100         472 :         skey[attoff].sk_collation = idxrel->rd_indcollation[attoff];
     101             : 
     102             :         /* Check for null value. */
     103         472 :         if (searchslot->tts_isnull[mainattno - 1])
     104             :         {
     105           0 :             hasnulls = true;
     106           0 :             skey[attoff].sk_flags |= SK_ISNULL;
     107             :         }
     108             :     }
     109             : 
     110         472 :     return hasnulls;
     111             : }
     112             : 
     113             : /*
     114             :  * Search the relation 'rel' for tuple using the index.
     115             :  *
     116             :  * If a matching tuple is found, lock it with lockmode, fill the slot with its
     117             :  * contents, and return true.  Return false otherwise.
     118             :  */
     119             : bool
     120         472 : RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
     121             :                              LockTupleMode lockmode,
     122             :                              TupleTableSlot *searchslot,
     123             :                              TupleTableSlot *outslot)
     124             : {
     125             :     ScanKeyData skey[INDEX_MAX_KEYS];
     126             :     IndexScanDesc scan;
     127             :     SnapshotData snap;
     128             :     TransactionId xwait;
     129             :     Relation    idxrel;
     130             :     bool        found;
     131             : 
     132             :     /* Open the index. */
     133         472 :     idxrel = index_open(idxoid, RowExclusiveLock);
     134             : 
     135             :     /* Start an index scan. */
     136         472 :     InitDirtySnapshot(snap);
     137         472 :     scan = index_beginscan(rel, idxrel, &snap,
     138         472 :                            IndexRelationGetNumberOfKeyAttributes(idxrel),
     139             :                            0);
     140             : 
     141             :     /* Build scan key. */
     142         472 :     build_replindex_scan_key(skey, rel, idxrel, searchslot);
     143             : 
     144         472 : retry:
     145         472 :     found = false;
     146             : 
     147         472 :     index_rescan(scan, skey, IndexRelationGetNumberOfKeyAttributes(idxrel), NULL, 0);
     148             : 
     149             :     /* Try to find the tuple */
     150         472 :     if (index_getnext_slot(scan, ForwardScanDirection, outslot))
     151             :     {
     152         472 :         found = true;
     153         472 :         ExecMaterializeSlot(outslot);
     154             : 
     155         944 :         xwait = TransactionIdIsValid(snap.xmin) ?
     156         472 :             snap.xmin : snap.xmax;
     157             : 
     158             :         /*
     159             :          * If the tuple is locked, wait for locking transaction to finish and
     160             :          * retry.
     161             :          */
     162         472 :         if (TransactionIdIsValid(xwait))
     163             :         {
     164           0 :             XactLockTableWait(xwait, NULL, NULL, XLTW_None);
     165           0 :             goto retry;
     166             :         }
     167             :     }
     168             : 
     169             :     /* Found tuple, try to lock it in the lockmode. */
     170         472 :     if (found)
     171             :     {
     172             :         TM_FailureData tmfd;
     173             :         TM_Result   res;
     174             : 
     175         472 :         PushActiveSnapshot(GetLatestSnapshot());
     176             : 
     177         472 :         res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
     178             :                                outslot,
     179             :                                GetCurrentCommandId(false),
     180             :                                lockmode,
     181             :                                LockWaitBlock,
     182             :                                0 /* don't follow updates */ ,
     183             :                                &tmfd);
     184             : 
     185         472 :         PopActiveSnapshot();
     186             : 
     187         472 :         switch (res)
     188             :         {
     189         472 :             case TM_Ok:
     190         472 :                 break;
     191           0 :             case TM_Updated:
     192             :                 /* XXX: Improve handling here */
     193           0 :                 if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
     194           0 :                     ereport(LOG,
     195             :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     196             :                              errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
     197             :                 else
     198           0 :                     ereport(LOG,
     199             :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     200             :                              errmsg("concurrent update, retrying")));
     201           0 :                 goto retry;
     202           0 :             case TM_Deleted:
     203             :                 /* XXX: Improve handling here */
     204           0 :                 ereport(LOG,
     205             :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     206             :                          errmsg("concurrent delete, retrying")));
     207           0 :                 goto retry;
     208           0 :             case TM_Invisible:
     209           0 :                 elog(ERROR, "attempted to lock invisible tuple");
     210             :                 break;
     211           0 :             default:
     212           0 :                 elog(ERROR, "unexpected table_tuple_lock status: %u", res);
     213             :                 break;
     214             :         }
     215             :     }
     216             : 
     217         472 :     index_endscan(scan);
     218             : 
     219             :     /* Don't release lock until commit. */
     220         472 :     index_close(idxrel, NoLock);
     221             : 
     222         472 :     return found;
     223             : }
     224             : 
     225             : /*
     226             :  * Compare the tuples in the slots by checking if they have equal values.
     227             :  */
     228             : static bool
     229      210440 : tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
     230             :              TypeCacheEntry **eq)
     231             : {
     232             :     int         attrnum;
     233             : 
     234             :     Assert(slot1->tts_tupleDescriptor->natts ==
     235             :            slot2->tts_tupleDescriptor->natts);
     236             : 
     237      210440 :     slot_getallattrs(slot1);
     238      210440 :     slot_getallattrs(slot2);
     239             : 
     240             :     /* Check equality of the attributes. */
     241      210684 :     for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
     242             :     {
     243             :         Form_pg_attribute att;
     244             :         TypeCacheEntry *typentry;
     245             : 
     246             :         /*
     247             :          * If one value is NULL and other is not, then they are certainly not
     248             :          * equal
     249             :          */
     250      210440 :         if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
     251           0 :             return false;
     252             : 
     253             :         /*
     254             :          * If both are NULL, they can be considered equal.
     255             :          */
     256      210440 :         if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
     257           0 :             continue;
     258             : 
     259      210440 :         att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
     260             : 
     261      210440 :         typentry = eq[attrnum];
     262      210440 :         if (typentry == NULL)
     263             :         {
     264         244 :             typentry = lookup_type_cache(att->atttypid,
     265             :                                          TYPECACHE_EQ_OPR_FINFO);
     266         244 :             if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
     267           0 :                 ereport(ERROR,
     268             :                         (errcode(ERRCODE_UNDEFINED_FUNCTION),
     269             :                          errmsg("could not identify an equality operator for type %s",
     270             :                                 format_type_be(att->atttypid))));
     271         244 :             eq[attrnum] = typentry;
     272             :         }
     273             : 
     274      210440 :         if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
     275             :                                             att->attcollation,
     276             :                                             slot1->tts_values[attrnum],
     277             :                                             slot2->tts_values[attrnum])))
     278      210196 :             return false;
     279             :     }
     280             : 
     281         244 :     return true;
     282             : }
     283             : 
     284             : /*
     285             :  * Search the relation 'rel' for tuple using the sequential scan.
     286             :  *
     287             :  * If a matching tuple is found, lock it with lockmode, fill the slot with its
     288             :  * contents, and return true.  Return false otherwise.
     289             :  *
     290             :  * Note that this stops on the first matching tuple.
     291             :  *
     292             :  * This can obviously be quite slow on tables that have more than few rows.
     293             :  */
     294             : bool
     295         244 : RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
     296             :                          TupleTableSlot *searchslot, TupleTableSlot *outslot)
     297             : {
     298             :     TupleTableSlot *scanslot;
     299             :     TableScanDesc scan;
     300             :     SnapshotData snap;
     301             :     TypeCacheEntry **eq;
     302             :     TransactionId xwait;
     303             :     bool        found;
     304         244 :     TupleDesc   desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
     305             : 
     306             :     Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
     307             : 
     308         244 :     eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
     309             : 
     310             :     /* Start a heap scan. */
     311         244 :     InitDirtySnapshot(snap);
     312         244 :     scan = table_beginscan(rel, &snap, 0, NULL);
     313         244 :     scanslot = table_slot_create(rel, NULL);
     314             : 
     315         244 : retry:
     316         244 :     found = false;
     317             : 
     318         244 :     table_rescan(scan, NULL);
     319             : 
     320             :     /* Try to find the tuple */
     321      210440 :     while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
     322             :     {
     323      210440 :         if (!tuples_equal(scanslot, searchslot, eq))
     324      210196 :             continue;
     325             : 
     326         244 :         found = true;
     327         244 :         ExecCopySlot(outslot, scanslot);
     328             : 
     329         488 :         xwait = TransactionIdIsValid(snap.xmin) ?
     330         244 :             snap.xmin : snap.xmax;
     331             : 
     332             :         /*
     333             :          * If the tuple is locked, wait for locking transaction to finish and
     334             :          * retry.
     335             :          */
     336         244 :         if (TransactionIdIsValid(xwait))
     337             :         {
     338           0 :             XactLockTableWait(xwait, NULL, NULL, XLTW_None);
     339           0 :             goto retry;
     340             :         }
     341             : 
     342             :         /* Found our tuple and it's not locked */
     343         244 :         break;
     344             :     }
     345             : 
     346             :     /* Found tuple, try to lock it in the lockmode. */
     347         244 :     if (found)
     348             :     {
     349             :         TM_FailureData tmfd;
     350             :         TM_Result   res;
     351             : 
     352         244 :         PushActiveSnapshot(GetLatestSnapshot());
     353             : 
     354         244 :         res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
     355             :                                outslot,
     356             :                                GetCurrentCommandId(false),
     357             :                                lockmode,
     358             :                                LockWaitBlock,
     359             :                                0 /* don't follow updates */ ,
     360             :                                &tmfd);
     361             : 
     362         244 :         PopActiveSnapshot();
     363             : 
     364         244 :         switch (res)
     365             :         {
     366         244 :             case TM_Ok:
     367         244 :                 break;
     368           0 :             case TM_Updated:
     369             :                 /* XXX: Improve handling here */
     370           0 :                 if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
     371           0 :                     ereport(LOG,
     372             :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     373             :                              errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
     374             :                 else
     375           0 :                     ereport(LOG,
     376             :                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     377             :                              errmsg("concurrent update, retrying")));
     378           0 :                 goto retry;
     379           0 :             case TM_Deleted:
     380             :                 /* XXX: Improve handling here */
     381           0 :                 ereport(LOG,
     382             :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     383             :                          errmsg("concurrent delete, retrying")));
     384           0 :                 goto retry;
     385           0 :             case TM_Invisible:
     386           0 :                 elog(ERROR, "attempted to lock invisible tuple");
     387             :                 break;
     388           0 :             default:
     389           0 :                 elog(ERROR, "unexpected table_tuple_lock status: %u", res);
     390             :                 break;
     391             :         }
     392             :     }
     393             : 
     394         244 :     table_endscan(scan);
     395         244 :     ExecDropSingleTupleTableSlot(scanslot);
     396             : 
     397         244 :     return found;
     398             : }
     399             : 
     400             : /*
     401             :  * Insert tuple represented in the slot to the relation, update the indexes,
     402             :  * and execute any constraints and per-row triggers.
     403             :  *
     404             :  * Caller is responsible for opening the indexes.
     405             :  */
     406             : void
     407         836 : ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
     408             : {
     409         836 :     bool        skip_tuple = false;
     410         836 :     ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
     411         836 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     412             : 
     413             :     /* For now we support only tables. */
     414             :     Assert(rel->rd_rel->relkind == RELKIND_RELATION);
     415             : 
     416         836 :     CheckCmdReplicaIdentity(rel, CMD_INSERT);
     417             : 
     418             :     /* BEFORE ROW INSERT Triggers */
     419         836 :     if (resultRelInfo->ri_TrigDesc &&
     420           8 :         resultRelInfo->ri_TrigDesc->trig_insert_before_row)
     421             :     {
     422           2 :         if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
     423           2 :             skip_tuple = true;  /* "do nothing" */
     424             :     }
     425             : 
     426         836 :     if (!skip_tuple)
     427             :     {
     428         834 :         List       *recheckIndexes = NIL;
     429             : 
     430             :         /* Compute stored generated columns */
     431         834 :         if (rel->rd_att->constr &&
     432         442 :             rel->rd_att->constr->has_generated_stored)
     433           4 :             ExecComputeStoredGenerated(estate, slot, CMD_INSERT);
     434             : 
     435             :         /* Check the constraints of the tuple */
     436         834 :         if (rel->rd_att->constr)
     437         442 :             ExecConstraints(resultRelInfo, slot, estate);
     438         834 :         if (resultRelInfo->ri_PartitionCheck)
     439          46 :             ExecPartitionCheck(resultRelInfo, slot, estate, true);
     440             : 
     441             :         /* OK, store the tuple and create index entries for it */
     442         834 :         simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
     443             : 
     444         834 :         if (resultRelInfo->ri_NumIndices > 0)
     445         440 :             recheckIndexes = ExecInsertIndexTuples(slot, estate, false, NULL,
     446             :                                                    NIL);
     447             : 
     448             :         /* AFTER ROW INSERT Triggers */
     449         834 :         ExecARInsertTriggers(estate, resultRelInfo, slot,
     450             :                              recheckIndexes, NULL);
     451             : 
     452             :         /*
     453             :          * XXX we should in theory pass a TransitionCaptureState object to the
     454             :          * above to capture transition tuples, but after statement triggers
     455             :          * don't actually get fired by replication yet anyway
     456             :          */
     457             : 
     458         834 :         list_free(recheckIndexes);
     459             :     }
     460         836 : }
     461             : 
     462             : /*
     463             :  * Find the searchslot tuple and update it with data in the slot,
     464             :  * update the indexes, and execute any constraints and per-row triggers.
     465             :  *
     466             :  * Caller is responsible for opening the indexes.
     467             :  */
     468             : void
     469         260 : ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate,
     470             :                          TupleTableSlot *searchslot, TupleTableSlot *slot)
     471             : {
     472         260 :     bool        skip_tuple = false;
     473         260 :     ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
     474         260 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     475         260 :     ItemPointer tid = &(searchslot->tts_tid);
     476             : 
     477             :     /* For now we support only tables. */
     478             :     Assert(rel->rd_rel->relkind == RELKIND_RELATION);
     479             : 
     480         260 :     CheckCmdReplicaIdentity(rel, CMD_UPDATE);
     481             : 
     482             :     /* BEFORE ROW UPDATE Triggers */
     483         260 :     if (resultRelInfo->ri_TrigDesc &&
     484           4 :         resultRelInfo->ri_TrigDesc->trig_update_before_row)
     485             :     {
     486           4 :         if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
     487             :                                   tid, NULL, slot))
     488           4 :             skip_tuple = true;  /* "do nothing" */
     489             :     }
     490             : 
     491         260 :     if (!skip_tuple)
     492             :     {
     493         256 :         List       *recheckIndexes = NIL;
     494             :         bool        update_indexes;
     495             : 
     496             :         /* Compute stored generated columns */
     497         256 :         if (rel->rd_att->constr &&
     498         212 :             rel->rd_att->constr->has_generated_stored)
     499           2 :             ExecComputeStoredGenerated(estate, slot, CMD_UPDATE);
     500             : 
     501             :         /* Check the constraints of the tuple */
     502         256 :         if (rel->rd_att->constr)
     503         212 :             ExecConstraints(resultRelInfo, slot, estate);
     504         256 :         if (resultRelInfo->ri_PartitionCheck)
     505          10 :             ExecPartitionCheck(resultRelInfo, slot, estate, true);
     506             : 
     507         256 :         simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
     508             :                                   &update_indexes);
     509             : 
     510         256 :         if (resultRelInfo->ri_NumIndices > 0 && update_indexes)
     511         110 :             recheckIndexes = ExecInsertIndexTuples(slot, estate, false, NULL,
     512             :                                                    NIL);
     513             : 
     514             :         /* AFTER ROW UPDATE Triggers */
     515         256 :         ExecARUpdateTriggers(estate, resultRelInfo,
     516             :                              tid, NULL, slot,
     517             :                              recheckIndexes, NULL);
     518             : 
     519         256 :         list_free(recheckIndexes);
     520             :     }
     521         260 : }
     522             : 
     523             : /*
     524             :  * Find the searchslot tuple and delete it, and execute any constraints
     525             :  * and per-row triggers.
     526             :  *
     527             :  * Caller is responsible for opening the indexes.
     528             :  */
     529             : void
     530         454 : ExecSimpleRelationDelete(EState *estate, EPQState *epqstate,
     531             :                          TupleTableSlot *searchslot)
     532             : {
     533         454 :     bool        skip_tuple = false;
     534         454 :     ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
     535         454 :     Relation    rel = resultRelInfo->ri_RelationDesc;
     536         454 :     ItemPointer tid = &searchslot->tts_tid;
     537             : 
     538         454 :     CheckCmdReplicaIdentity(rel, CMD_DELETE);
     539             : 
     540             :     /* BEFORE ROW DELETE Triggers */
     541         454 :     if (resultRelInfo->ri_TrigDesc &&
     542           0 :         resultRelInfo->ri_TrigDesc->trig_delete_before_row)
     543             :     {
     544           0 :         skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
     545           0 :                                            tid, NULL, NULL);
     546             : 
     547             :     }
     548             : 
     549         454 :     if (!skip_tuple)
     550             :     {
     551             :         /* OK, delete the tuple */
     552         454 :         simple_table_tuple_delete(rel, tid, estate->es_snapshot);
     553             : 
     554             :         /* AFTER ROW DELETE Triggers */
     555         454 :         ExecARDeleteTriggers(estate, resultRelInfo,
     556             :                              tid, NULL, NULL);
     557             :     }
     558         454 : }
     559             : 
     560             : /*
     561             :  * Check if command can be executed with current replica identity.
     562             :  */
     563             : void
     564       93966 : CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
     565             : {
     566             :     PublicationActions *pubactions;
     567             : 
     568             :     /* We only need to do checks for UPDATE and DELETE. */
     569       93966 :     if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
     570       73802 :         return;
     571             : 
     572             :     /* If relation has replica identity we are always good. */
     573       40054 :     if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
     574       19890 :         OidIsValid(RelationGetReplicaIndex(rel)))
     575        4192 :         return;
     576             : 
     577             :     /*
     578             :      * This is either UPDATE OR DELETE and there is no replica identity.
     579             :      *
     580             :      * Check if the table publishes UPDATES or DELETES.
     581             :      */
     582       15972 :     pubactions = GetRelationPublicationActions(rel);
     583       15972 :     if (cmd == CMD_UPDATE && pubactions->pubupdate)
     584           4 :         ereport(ERROR,
     585             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     586             :                  errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
     587             :                         RelationGetRelationName(rel)),
     588             :                  errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
     589       15968 :     else if (cmd == CMD_DELETE && pubactions->pubdelete)
     590           0 :         ereport(ERROR,
     591             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     592             :                  errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
     593             :                         RelationGetRelationName(rel)),
     594             :                  errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
     595             : }
     596             : 
     597             : 
     598             : /*
     599             :  * Check if we support writing into specific relkind.
     600             :  *
     601             :  * The nspname and relname are only needed for error reporting.
     602             :  */
     603             : void
     604         370 : CheckSubscriptionRelkind(char relkind, const char *nspname,
     605             :                          const char *relname)
     606             : {
     607             :     /*
     608             :      * Give a more specific error for foreign tables.
     609             :      */
     610         370 :     if (relkind == RELKIND_FOREIGN_TABLE)
     611           0 :         ereport(ERROR,
     612             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     613             :                  errmsg("cannot use relation \"%s.%s\" as logical replication target",
     614             :                         nspname, relname),
     615             :                  errdetail("\"%s.%s\" is a foreign table.",
     616             :                            nspname, relname)));
     617             : 
     618         370 :     if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
     619           0 :         ereport(ERROR,
     620             :                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     621             :                  errmsg("cannot use relation \"%s.%s\" as logical replication target",
     622             :                         nspname, relname),
     623             :                  errdetail("\"%s.%s\" is not a table.",
     624             :                            nspname, relname)));
     625         370 : }

Generated by: LCOV version 1.13