LCOV - code coverage report
Current view: top level - src/backend/replication/logical - conflict.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 119 127 93.7 %
Date: 2025-01-18 04:15:08 Functions: 7 7 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * conflict.c
       3             :  *     Support routines for logging conflicts.
       4             :  *
       5             :  * Copyright (c) 2024-2025, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/conflict.c
       9             :  *
      10             :  * This file contains the code for logging conflicts on the subscriber during
      11             :  * logical replication.
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres.h"
      16             : 
      17             : #include "access/commit_ts.h"
      18             : #include "access/tableam.h"
      19             : #include "executor/executor.h"
      20             : #include "pgstat.h"
      21             : #include "replication/conflict.h"
      22             : #include "replication/worker_internal.h"
      23             : #include "storage/lmgr.h"
      24             : #include "utils/lsyscache.h"
      25             : 
      26             : static const char *const ConflictTypeNames[] = {
      27             :     [CT_INSERT_EXISTS] = "insert_exists",
      28             :     [CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs",
      29             :     [CT_UPDATE_EXISTS] = "update_exists",
      30             :     [CT_UPDATE_MISSING] = "update_missing",
      31             :     [CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
      32             :     [CT_DELETE_MISSING] = "delete_missing"
      33             : };
      34             : 
      35             : static int  errcode_apply_conflict(ConflictType type);
      36             : static int  errdetail_apply_conflict(EState *estate,
      37             :                                      ResultRelInfo *relinfo,
      38             :                                      ConflictType type,
      39             :                                      TupleTableSlot *searchslot,
      40             :                                      TupleTableSlot *localslot,
      41             :                                      TupleTableSlot *remoteslot,
      42             :                                      Oid indexoid, TransactionId localxmin,
      43             :                                      RepOriginId localorigin,
      44             :                                      TimestampTz localts);
      45             : static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
      46             :                                        ConflictType type,
      47             :                                        TupleTableSlot *searchslot,
      48             :                                        TupleTableSlot *localslot,
      49             :                                        TupleTableSlot *remoteslot,
      50             :                                        Oid indexoid);
      51             : static char *build_index_value_desc(EState *estate, Relation localrel,
      52             :                                     TupleTableSlot *slot, Oid indexoid);
      53             : 
      54             : /*
      55             :  * Get the xmin and commit timestamp data (origin and timestamp) associated
      56             :  * with the provided local tuple.
      57             :  *
      58             :  * Return true if the commit timestamp data was found, false otherwise.
      59             :  */
      60             : bool
      61      144478 : GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
      62             :                         RepOriginId *localorigin, TimestampTz *localts)
      63             : {
      64             :     Datum       xminDatum;
      65             :     bool        isnull;
      66             : 
      67      144478 :     xminDatum = slot_getsysattr(localslot, MinTransactionIdAttributeNumber,
      68             :                                 &isnull);
      69      144478 :     *xmin = DatumGetTransactionId(xminDatum);
      70             :     Assert(!isnull);
      71             : 
      72             :     /*
      73             :      * The commit timestamp data is not available if track_commit_timestamp is
      74             :      * disabled.
      75             :      */
      76      144478 :     if (!track_commit_timestamp)
      77             :     {
      78      144460 :         *localorigin = InvalidRepOriginId;
      79      144460 :         *localts = 0;
      80      144460 :         return false;
      81             :     }
      82             : 
      83          18 :     return TransactionIdGetCommitTsData(*xmin, localts, localorigin);
      84             : }
      85             : 
      86             : /*
      87             :  * This function is used to report a conflict while applying replication
      88             :  * changes.
      89             :  *
      90             :  * 'searchslot' should contain the tuple used to search the local tuple to be
      91             :  * updated or deleted.
      92             :  *
      93             :  * 'localslot' should contain the existing local tuple, if any, that conflicts
      94             :  * with the remote tuple. 'localxmin', 'localorigin', and 'localts' provide the
      95             :  * transaction information related to this existing local tuple.
      96             :  *
      97             :  * 'remoteslot' should contain the remote new tuple, if any.
      98             :  *
      99             :  * The 'indexoid' represents the OID of the unique index that triggered the
     100             :  * constraint violation error. We use this to report the key values for
     101             :  * conflicting tuple.
     102             :  *
     103             :  * The caller must ensure that the index with the OID 'indexoid' is locked so
     104             :  * that we can fetch and display the conflicting key value.
     105             :  */
     106             : void
     107          52 : ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
     108             :                     ConflictType type, TupleTableSlot *searchslot,
     109             :                     TupleTableSlot *localslot, TupleTableSlot *remoteslot,
     110             :                     Oid indexoid, TransactionId localxmin,
     111             :                     RepOriginId localorigin, TimestampTz localts)
     112             : {
     113          52 :     Relation    localrel = relinfo->ri_RelationDesc;
     114             : 
     115             :     Assert(!OidIsValid(indexoid) ||
     116             :            CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
     117             : 
     118          52 :     pgstat_report_subscription_conflict(MySubscription->oid, type);
     119             : 
     120          52 :     ereport(elevel,
     121             :             errcode_apply_conflict(type),
     122             :             errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
     123             :                    get_namespace_name(RelationGetNamespace(localrel)),
     124             :                    RelationGetRelationName(localrel),
     125             :                    ConflictTypeNames[type]),
     126             :             errdetail_apply_conflict(estate, relinfo, type, searchslot,
     127             :                                      localslot, remoteslot, indexoid,
     128             :                                      localxmin, localorigin, localts));
     129          38 : }
     130             : 
     131             : /*
     132             :  * Find all unique indexes to check for a conflict and store them into
     133             :  * ResultRelInfo.
     134             :  */
     135             : void
     136      216524 : InitConflictIndexes(ResultRelInfo *relInfo)
     137             : {
     138      216524 :     List       *uniqueIndexes = NIL;
     139             : 
     140      392304 :     for (int i = 0; i < relInfo->ri_NumIndices; i++)
     141             :     {
     142      175780 :         Relation    indexRelation = relInfo->ri_IndexRelationDescs[i];
     143             : 
     144      175780 :         if (indexRelation == NULL)
     145           0 :             continue;
     146             : 
     147             :         /* Detect conflict only for unique indexes */
     148      175780 :         if (!relInfo->ri_IndexRelationInfo[i]->ii_Unique)
     149          32 :             continue;
     150             : 
     151             :         /* Don't support conflict detection for deferrable index */
     152      175748 :         if (!indexRelation->rd_index->indimmediate)
     153           0 :             continue;
     154             : 
     155      175748 :         uniqueIndexes = lappend_oid(uniqueIndexes,
     156             :                                     RelationGetRelid(indexRelation));
     157             :     }
     158             : 
     159      216524 :     relInfo->ri_onConflictArbiterIndexes = uniqueIndexes;
     160      216524 : }
     161             : 
     162             : /*
     163             :  * Add SQLSTATE error code to the current conflict report.
     164             :  */
     165             : static int
     166          52 : errcode_apply_conflict(ConflictType type)
     167             : {
     168          52 :     switch (type)
     169             :     {
     170          14 :         case CT_INSERT_EXISTS:
     171             :         case CT_UPDATE_EXISTS:
     172          14 :             return errcode(ERRCODE_UNIQUE_VIOLATION);
     173          38 :         case CT_UPDATE_ORIGIN_DIFFERS:
     174             :         case CT_UPDATE_MISSING:
     175             :         case CT_DELETE_ORIGIN_DIFFERS:
     176             :         case CT_DELETE_MISSING:
     177          38 :             return errcode(ERRCODE_T_R_SERIALIZATION_FAILURE);
     178             :     }
     179             : 
     180             :     Assert(false);
     181           0 :     return 0;                   /* silence compiler warning */
     182             : }
     183             : 
     184             : /*
     185             :  * Add an errdetail() line showing conflict detail.
     186             :  *
     187             :  * The DETAIL line comprises of two parts:
     188             :  * 1. Explanation of the conflict type, including the origin and commit
     189             :  *    timestamp of the existing local tuple.
     190             :  * 2. Display of conflicting key, existing local tuple, remote new tuple, and
     191             :  *    replica identity columns, if any. The remote old tuple is excluded as its
     192             :  *    information is covered in the replica identity columns.
     193             :  */
     194             : static int
     195          52 : errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
     196             :                          ConflictType type, TupleTableSlot *searchslot,
     197             :                          TupleTableSlot *localslot, TupleTableSlot *remoteslot,
     198             :                          Oid indexoid, TransactionId localxmin,
     199             :                          RepOriginId localorigin, TimestampTz localts)
     200             : {
     201             :     StringInfoData err_detail;
     202             :     char       *val_desc;
     203             :     char       *origin_name;
     204             : 
     205          52 :     initStringInfo(&err_detail);
     206             : 
     207             :     /* First, construct a detailed message describing the type of conflict */
     208          52 :     switch (type)
     209             :     {
     210          14 :         case CT_INSERT_EXISTS:
     211             :         case CT_UPDATE_EXISTS:
     212             :             Assert(OidIsValid(indexoid));
     213             : 
     214          14 :             if (localts)
     215             :             {
     216           6 :                 if (localorigin == InvalidRepOriginId)
     217           0 :                     appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified locally in transaction %u at %s."),
     218             :                                      get_rel_name(indexoid),
     219             :                                      localxmin, timestamptz_to_str(localts));
     220           6 :                 else if (replorigin_by_oid(localorigin, true, &origin_name))
     221           2 :                     appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by origin \"%s\" in transaction %u at %s."),
     222             :                                      get_rel_name(indexoid), origin_name,
     223             :                                      localxmin, timestamptz_to_str(localts));
     224             : 
     225             :                 /*
     226             :                  * The origin that modified this row has been removed. This
     227             :                  * can happen if the origin was created by a different apply
     228             :                  * worker and its associated subscription and origin were
     229             :                  * dropped after updating the row, or if the origin was
     230             :                  * manually dropped by the user.
     231             :                  */
     232             :                 else
     233           4 :                     appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by a non-existent origin in transaction %u at %s."),
     234             :                                      get_rel_name(indexoid),
     235             :                                      localxmin, timestamptz_to_str(localts));
     236             :             }
     237             :             else
     238           8 :                 appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified in transaction %u."),
     239             :                                  get_rel_name(indexoid), localxmin);
     240             : 
     241          14 :             break;
     242             : 
     243           6 :         case CT_UPDATE_ORIGIN_DIFFERS:
     244           6 :             if (localorigin == InvalidRepOriginId)
     245           2 :                 appendStringInfo(&err_detail, _("Updating the row that was modified locally in transaction %u at %s."),
     246             :                                  localxmin, timestamptz_to_str(localts));
     247           4 :             else if (replorigin_by_oid(localorigin, true, &origin_name))
     248           2 :                 appendStringInfo(&err_detail, _("Updating the row that was modified by a different origin \"%s\" in transaction %u at %s."),
     249             :                                  origin_name, localxmin, timestamptz_to_str(localts));
     250             : 
     251             :             /* The origin that modified this row has been removed. */
     252             :             else
     253           2 :                 appendStringInfo(&err_detail, _("Updating the row that was modified by a non-existent origin in transaction %u at %s."),
     254             :                                  localxmin, timestamptz_to_str(localts));
     255             : 
     256           6 :             break;
     257             : 
     258          10 :         case CT_UPDATE_MISSING:
     259          10 :             appendStringInfo(&err_detail, _("Could not find the row to be updated."));
     260          10 :             break;
     261             : 
     262           4 :         case CT_DELETE_ORIGIN_DIFFERS:
     263           4 :             if (localorigin == InvalidRepOriginId)
     264           2 :                 appendStringInfo(&err_detail, _("Deleting the row that was modified locally in transaction %u at %s."),
     265             :                                  localxmin, timestamptz_to_str(localts));
     266           2 :             else if (replorigin_by_oid(localorigin, true, &origin_name))
     267           2 :                 appendStringInfo(&err_detail, _("Deleting the row that was modified by a different origin \"%s\" in transaction %u at %s."),
     268             :                                  origin_name, localxmin, timestamptz_to_str(localts));
     269             : 
     270             :             /* The origin that modified this row has been removed. */
     271             :             else
     272           0 :                 appendStringInfo(&err_detail, _("Deleting the row that was modified by a non-existent origin in transaction %u at %s."),
     273             :                                  localxmin, timestamptz_to_str(localts));
     274             : 
     275           4 :             break;
     276             : 
     277          18 :         case CT_DELETE_MISSING:
     278          18 :             appendStringInfo(&err_detail, _("Could not find the row to be deleted."));
     279          18 :             break;
     280             :     }
     281             : 
     282          52 :     Assert(err_detail.len > 0);
     283             : 
     284          52 :     val_desc = build_tuple_value_details(estate, relinfo, type, searchslot,
     285             :                                          localslot, remoteslot, indexoid);
     286             : 
     287             :     /*
     288             :      * Next, append the key values, existing local tuple, remote tuple and
     289             :      * replica identity columns after the message.
     290             :      */
     291          52 :     if (val_desc)
     292          52 :         appendStringInfo(&err_detail, "\n%s", val_desc);
     293             : 
     294          52 :     return errdetail_internal("%s", err_detail.data);
     295             : }
     296             : 
     297             : /*
     298             :  * Helper function to build the additional details for conflicting key,
     299             :  * existing local tuple, remote tuple, and replica identity columns.
     300             :  *
     301             :  * If the return value is NULL, it indicates that the current user lacks
     302             :  * permissions to view the columns involved.
     303             :  */
     304             : static char *
     305          52 : build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
     306             :                           ConflictType type,
     307             :                           TupleTableSlot *searchslot,
     308             :                           TupleTableSlot *localslot,
     309             :                           TupleTableSlot *remoteslot,
     310             :                           Oid indexoid)
     311             : {
     312          52 :     Relation    localrel = relinfo->ri_RelationDesc;
     313          52 :     Oid         relid = RelationGetRelid(localrel);
     314          52 :     TupleDesc   tupdesc = RelationGetDescr(localrel);
     315             :     StringInfoData tuple_value;
     316          52 :     char       *desc = NULL;
     317             : 
     318             :     Assert(searchslot || localslot || remoteslot);
     319             : 
     320          52 :     initStringInfo(&tuple_value);
     321             : 
     322             :     /*
     323             :      * Report the conflicting key values in the case of a unique constraint
     324             :      * violation.
     325             :      */
     326          52 :     if (type == CT_INSERT_EXISTS || type == CT_UPDATE_EXISTS)
     327             :     {
     328             :         Assert(OidIsValid(indexoid) && localslot);
     329             : 
     330          14 :         desc = build_index_value_desc(estate, localrel, localslot, indexoid);
     331             : 
     332          14 :         if (desc)
     333          14 :             appendStringInfo(&tuple_value, _("Key %s"), desc);
     334             :     }
     335             : 
     336          52 :     if (localslot)
     337             :     {
     338             :         /*
     339             :          * The 'modifiedCols' only applies to the new tuple, hence we pass
     340             :          * NULL for the existing local tuple.
     341             :          */
     342          24 :         desc = ExecBuildSlotValueDescription(relid, localslot, tupdesc,
     343             :                                              NULL, 64);
     344             : 
     345          24 :         if (desc)
     346             :         {
     347          24 :             if (tuple_value.len > 0)
     348             :             {
     349          14 :                 appendStringInfoString(&tuple_value, "; ");
     350          14 :                 appendStringInfo(&tuple_value, _("existing local tuple %s"),
     351             :                                  desc);
     352             :             }
     353             :             else
     354             :             {
     355          10 :                 appendStringInfo(&tuple_value, _("Existing local tuple %s"),
     356             :                                  desc);
     357             :             }
     358             :         }
     359             :     }
     360             : 
     361          52 :     if (remoteslot)
     362             :     {
     363             :         Bitmapset  *modifiedCols;
     364             : 
     365             :         /*
     366             :          * Although logical replication doesn't maintain the bitmap for the
     367             :          * columns being inserted, we still use it to create 'modifiedCols'
     368             :          * for consistency with other calls to ExecBuildSlotValueDescription.
     369             :          *
     370             :          * Note that generated columns are formed locally on the subscriber.
     371             :          */
     372          30 :         modifiedCols = bms_union(ExecGetInsertedCols(relinfo, estate),
     373          30 :                                  ExecGetUpdatedCols(relinfo, estate));
     374          30 :         desc = ExecBuildSlotValueDescription(relid, remoteslot, tupdesc,
     375             :                                              modifiedCols, 64);
     376             : 
     377          30 :         if (desc)
     378             :         {
     379          30 :             if (tuple_value.len > 0)
     380             :             {
     381          20 :                 appendStringInfoString(&tuple_value, "; ");
     382          20 :                 appendStringInfo(&tuple_value, _("remote tuple %s"), desc);
     383             :             }
     384             :             else
     385             :             {
     386          10 :                 appendStringInfo(&tuple_value, _("Remote tuple %s"), desc);
     387             :             }
     388             :         }
     389             :     }
     390             : 
     391          52 :     if (searchslot)
     392             :     {
     393             :         /*
     394             :          * Note that while index other than replica identity may be used (see
     395             :          * IsIndexUsableForReplicaIdentityFull for details) to find the tuple
     396             :          * when applying update or delete, such an index scan may not result
     397             :          * in a unique tuple and we still compare the complete tuple in such
     398             :          * cases, thus such indexes are not used here.
     399             :          */
     400          40 :         Oid         replica_index = GetRelationIdentityOrPK(localrel);
     401             : 
     402             :         Assert(type != CT_INSERT_EXISTS);
     403             : 
     404             :         /*
     405             :          * If the table has a valid replica identity index, build the index
     406             :          * key value string. Otherwise, construct the full tuple value for
     407             :          * REPLICA IDENTITY FULL cases.
     408             :          */
     409          40 :         if (OidIsValid(replica_index))
     410          36 :             desc = build_index_value_desc(estate, localrel, searchslot, replica_index);
     411             :         else
     412           4 :             desc = ExecBuildSlotValueDescription(relid, searchslot, tupdesc, NULL, 64);
     413             : 
     414          40 :         if (desc)
     415             :         {
     416          40 :             if (tuple_value.len > 0)
     417             :             {
     418          22 :                 appendStringInfoString(&tuple_value, "; ");
     419          44 :                 appendStringInfo(&tuple_value, OidIsValid(replica_index)
     420          18 :                                  ? _("replica identity %s")
     421           4 :                                  : _("replica identity full %s"), desc);
     422             :             }
     423             :             else
     424             :             {
     425          36 :                 appendStringInfo(&tuple_value, OidIsValid(replica_index)
     426          18 :                                  ? _("Replica identity %s")
     427           0 :                                  : _("Replica identity full %s"), desc);
     428             :             }
     429             :         }
     430             :     }
     431             : 
     432          52 :     if (tuple_value.len == 0)
     433           0 :         return NULL;
     434             : 
     435          52 :     appendStringInfoChar(&tuple_value, '.');
     436          52 :     return tuple_value.data;
     437             : }
     438             : 
     439             : /*
     440             :  * Helper functions to construct a string describing the contents of an index
     441             :  * entry. See BuildIndexValueDescription for details.
     442             :  *
     443             :  * The caller must ensure that the index with the OID 'indexoid' is locked so
     444             :  * that we can fetch and display the conflicting key value.
     445             :  */
     446             : static char *
     447          50 : build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot,
     448             :                        Oid indexoid)
     449             : {
     450             :     char       *index_value;
     451             :     Relation    indexDesc;
     452             :     Datum       values[INDEX_MAX_KEYS];
     453             :     bool        isnull[INDEX_MAX_KEYS];
     454          50 :     TupleTableSlot *tableslot = slot;
     455             : 
     456          50 :     if (!tableslot)
     457           0 :         return NULL;
     458             : 
     459             :     Assert(CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
     460             : 
     461          50 :     indexDesc = index_open(indexoid, NoLock);
     462             : 
     463             :     /*
     464             :      * If the slot is a virtual slot, copy it into a heap tuple slot as
     465             :      * FormIndexDatum only works with heap tuple slots.
     466             :      */
     467          50 :     if (TTS_IS_VIRTUAL(slot))
     468             :     {
     469          24 :         tableslot = table_slot_create(localrel, &estate->es_tupleTable);
     470          24 :         tableslot = ExecCopySlot(tableslot, slot);
     471             :     }
     472             : 
     473             :     /*
     474             :      * Initialize ecxt_scantuple for potential use in FormIndexDatum when
     475             :      * index expressions are present.
     476             :      */
     477          50 :     GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
     478             : 
     479             :     /*
     480             :      * The values/nulls arrays passed to BuildIndexValueDescription should be
     481             :      * the results of FormIndexDatum, which are the "raw" input to the index
     482             :      * AM.
     483             :      */
     484          50 :     FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull);
     485             : 
     486          50 :     index_value = BuildIndexValueDescription(indexDesc, values, isnull);
     487             : 
     488          50 :     index_close(indexDesc, NoLock);
     489             : 
     490          50 :     return index_value;
     491             : }

Generated by: LCOV version 1.14